viernes, 26 de julio de 2019

Install Minikube ubuntu 18.04

Install Minikube ubuntu 18.04

referencias: https://kubernetes.io/docs/setup/learning-environment/minikube/

bext@bext-VPCF13WFX:~$ curl -Lo minikube https://storage.googleapis.com/minikube/releases/v1.2.0/minikube-linux-amd64
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 39.8M  100 39.8M    0     0   455k      0  0:01:29  0:01:29 --:--:--  498k
 
bext@bext-VPCF13WFX:~$ chmod +x minikube
bext@bext-VPCF13WFX:~$ sudo cp minikube /usr/local/bin
[sudo] password for bext: 
bext@bext-VPCF13WFX:~$ rm minikube
bext@bext-VPCF13WFX:~$ 
 
Podemos eliminar el borrar minikube, pero tendremos que agregarlo al PATH.

1.- Arrancamos minikube y creamos un cluster

bext@bext-VPCF13WFX:~$ minikube start -p micluster
😄  minikube v1.2.0 on linux (amd64)
🔥  Creating virtualbox VM (CPUs=2, Memory=2048MB, Disk=20000MB) ...
🐳  Configuring environment for Kubernetes v1.15.0 on Docker 18.09.6
💾  Downloading kubeadm v1.15.0
💾  Downloading kubelet v1.15.0
🚜  Pulling images ...
🚀  Launching Kubernetes ... 
⌛  Verifying: apiserver proxy etcd scheduler controller dns
🏄  Done! kubectl is now configured to use "micluster"
bext@bext-VPCF13WFX:~
 
 
 
Se ha creado un minikube context llamado minikube. esto se hace automáticamente, manualmente se hace con
kubectl config use-context minikube

o se pasa como parametro
kubectl get pods --context=minikube

Podemos correr el dashboard especificamos el nombre del cluster ya que lo creamos con nombre micluster.

bext@bext-VPCF13WFX:~$ minikube dashboard -p micluster
🔌  Enabling dashboard ...
🤔  Verifying dashboard health ...
🚀  Launching proxy ...
🤔  Verifying proxy health ...
🎉  Opening http://127.0.0.1:38919/api/v1/namespaces/kube-system/services/http:kubernetes-dashboard:/proxy/ in your default browser...


Probamos borrando el cluster
bext@bext-VPCF13WFX:~$ minikube delete
🙄  "minikube" cluster does not exist
🙄  "minikube" profile does not exist
bext@bext-VPCF13WFX:~$ minikube delete -p micluster
🔥  Deleting "micluster" from virtualbox ...
💔  The "micluster" cluster has been deleted.
bext@bext-VPCF13WFX:~$ 


Cremos de nuevo pero con nombre de cluster por default minikube

bext@bext-VPCF13WFX:~$ minikube start
😄  minikube v1.2.0 on linux (amd64)
🔥  Creating virtualbox VM (CPUs=2, Memory=2048MB, Disk=20000MB) ...
🐳  Configuring environment for Kubernetes v1.15.0 on Docker 18.09.6
🚜  Pulling images ...
🚀  Launching Kubernetes ... 
⌛  Verifying: apiserver proxy etcd scheduler controller dns
🏄  Done! kubectl is now configured to use "minikube"


minikube usa la imagen boot2docker con user y passwork:
user: docker
pass: tcuser

2.-  Interactuamos con el cluster usando kubectl, usamos la imagen echoserver y la exponemos al puerto 8080

bext@bext-VPCF13WFX:~$ kubectl run prueba-minikube --image=k8s.gcr.io/echoserver:1.10 --port=8080
kubectl run --generator=deployment/apps.v1 is DEPRECATED and will be removed in a future version. Use kubectl run --generator=run-pod/v1 or kubectl create instead.
deployment.apps/prueba-minikube created

 3.- Para acceder a prueba-minikube debemos exponerlo como servicio.

bext@bext-VPCF13WFX:~$ kubectl expose deployment prueba-minikube --type=NodePort
service/prueba-minikube exposed

4.- Debemos esperar un poco a que se esponga el servicio

bext@bext-VPCF13WFX:~$ kubectl get pod
NAME                             READY   STATUS    RESTARTS   AGE
prueba-minikube-69d8887f-fl5fm   1/1     Running   0          7m6s
bext@bext-VPCF13WFX:~$ 



5.- obtenemos la url de donde se expuso el servicio prueba-minikube, y la accesamos.

bext@bext-VPCF13WFX:~$ minikube service prueba-minikube --url
http://192.168.99.116:30950

bext@bext-VPCF13WFX:~$ curl http://192.168.99.116:30950


Hostname: prueba-minikube-69d8887f-fl5fm

Pod Information:
 -no pod information available-

Server values:
 server_version=nginx: 1.13.3 - lua: 10008

Request Information:
 client_address=172.17.0.1
 method=GET
 real path=/
 query=
 request_version=1.1
 request_scheme=http
 request_uri=http://192.168.99.116:8080/

Request Headers:
 accept=*/*
 host=192.168.99.116:30950
 user-agent=curl/7.58.0

Request Body:
 -no body in request-

bext@bext-VPCF13WFX:~$ 


6.- Borramos el servicio prueba-minikube

bext@bext-VPCF13WFX:~$ kubectl delete services prueba-minikube
service "prueba-minikube" deleted

7.- Borramos el deployment prueba-minikube

bext@bext-VPCF13WFX:~$ kubectl delete deployment prueba-minikube
deployment.extensions "prueba-minikube" deleted

8.- Detemenos el cluster local minikube

bext@bext-VPCF13WFX:~$ minikube stop
✋  Stopping "minikube" in virtualbox ...
🛑  "minikube" stopped.
 
9.- Borramos el cluster local minikube

bext@bext-VPCF13WFX:~$ minikube delete
🔥  Deleting "minikube" from virtualbox ...
💔  The "minikube" cluster has been deleted.

eot

miércoles, 24 de julio de 2019

RabbitMQ RPC Request/Reply Pattern

RabbitMQ RPC Request/Reply Pattern



referencias:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
https://github.com/jalbertomr/RabbitMQRPC.git

  Cuando queremos ejecutar una función que esta ubicada en una máquina remota y esperamos el resultado, se apega a un patrón conocido como Remote Procedure Call (RPC).

  Veremos que con RabbitMQ podemos construir un sistema RPC con los elementos que nos proporciona. Tendremos un cliente y un servidor RPC escalable. El proceso remoto será una función que calcula números fibonacci.

El Cliente manda a llamar la función remota con call y blockea hasta que recibe la respuesta. (una alternativa es usar una secuencia de procesos de forma asincrona por aquello del non-blocking).

Se necesita una forma de decirle al servidor por que queue regresará la respuesta, esto se lo indicamos al cliente al momento de hacer la publicación por medio de propiedades enviada en el parametro de la función de publicación. podiramos utlizar también el default-queue.

  Existen varias propiedades que salen de AMQP.Properties, entre ellas la de replayTo donde indicamos el queue, y correlationId, que nos servira para identificar el mensaje que le corresponde al cliente al emitirlo.

CorrelationId

  Se crea un solo queue por cliente, El cliente recibirá una respuesta que tomará del queue de respuesta y verificará que el correlationId sea el mismo, o sea, el que le corresponde, de no ser así, simplemente lo ignorará. este no se debe perder pues esta en un queue que lo retoma.

En Resumen la esta implementación de RPC con RabbitMQ funciona de la siguiente manera:
   El cliente genera para cada request propiedades unicas como el queue temporal y exclusivo que es en donde espera la respuesta, y el correlationId. Este mensaje en el request es enviado a un queue "rpc_queue". El server o RPC worker espera algún request en el "rpc_queue", al llegar genera la respuesta y la envía el queue temporal que le mando el cliente en las propiedades del mensaje.
El cliente espera por la respuesta en el queue temporal exclusivo, cuando aparece esta respuesta verifica que sea la misma correlationId para regresar el mensaje.


Server

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();    ) {
            boolean durable = false;
            boolean exclusive = false;
            boolean autoDelete = false;
            channel.queueDeclare(RPC_QUEUE_NAME, durable, exclusive, autoDelete, null /*arguments*/);
            channel.queuePurge(RPC_QUEUE_NAME);
           
            channel.basicQos(1);
           
            System.out.println("[x] Esperando peticiones RPC...");
           
            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
               AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                       .Builder()
                       .correlationId(delivery.getProperties().getCorrelationId())
                       .build();
              
               String response = "";
              
               try {
                  String message = new String(delivery.getBody(), "UTF-8");
                  int n = Integer.parseInt(message);
                 
                  System.out.println("[.] fib(" + message + ")");
                  response += fib(n);
               } catch (RuntimeException e) {
                  System.out.println("[.] " + e.toString());
               } finally {
                  String exchange = "";
                  String routingKey = delivery.getProperties().getReplyTo();
                  channel.basicPublish(exchange, routingKey, replyProps, response.getBytes("UTF-8"));
                  boolean multiple = false;
                  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), multiple);
                // RabbitMq consumer worker thread notifies the RPC server owner thread
                  synchronized(monitor) {
                      monitor.notify();
                  }
               }
            };
            boolean autoAck = false;
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag->{} );
           
            // Wait and be prepared to consume the message from RPC client.
            while(true) {
                synchronized(monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e2) {
                        e2.printStackTrace();
                    }
                }
            }
        }
    }

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }
}


Cliente

 El Bloqueo se hace por BlockingQueue.

public class RPCClient implements AutoCloseable {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
   
    public RPCClient() throws IOException, TimeoutException {
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("localhost");
      
       connection = factory.newConnection();
       channel = connection.createChannel();
    }
   
    public static void main(String[] args) {
        try( RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 0 ; i < 32; i++ ) {
                String i_str = Integer.toString(i);
                System.out.println("[x] Peticion de fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println("[.] Se obtiene '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    private String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();
       
        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        String exchange = "";
        String routingKey = requestQueueName;
        channel.basicPublish(exchange, routingKey, props, message.getBytes("UTF-8"));
       
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        boolean autoAck = true;
        String ctag = channel.basicConsume( replyQueueName, autoAck,
                (consumerTag, delivery)-> {
                    if  (delivery.getProperties().getCorrelationId().equals(corrId)) {
                        response.offer(new String(delivery.getBody(),"UTF-8"));
                    }
                }, consumerTag->{}
                );
        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    @Override
    public void close() throws IOException {
        connection.close();
    }
}


La Prueba

  Ejecutando Tres Workers, para las 32 peticiones de trabajo del cliente, el trabajo se escala y el tiempo de respuesta es menor.




eot

RabbitMQ Topics

RabbitMQ Topics


Referencia:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
https://github.com/jalbertomr/RabbitMQTopics.git

  Declaramos el exchange como tipo topic, y practicamente nada más, la naturaleza de filtro del routingKey hace el trabajo por si solo.


public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
   
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
           
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
           
            String routingKey = getRoutingKey(args);
            String message = getMessage(args);
           
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Enviado '" + routingKey + "':'" + message + "'");
        };
    }

    private static String getMessage(String[] args) {
        return args[1];
    }

    private static String getRoutingKey(String[] args) {
        return args[0];
    }
}


..

public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
   
    public static void main(String[] args) throws IOException, TimeoutException {
        if (args.length < 1) {
            System.err.println("Uso: ReceiveLogsTopic [binding key]...");
            System.exit(1);
        }
       
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
       
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
       
        for (String bindingKey : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }
        System.out.println("[x] Esperando por mensajes. CTRL+C para salir.");
       
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] recibido '" +
                    delivery.getEnvelope().getRoutingKey() +
                    "':'" + message + "'");
        };
        boolean autoack = true;
        channel.basicConsume(queueName, autoack, deliverCallback, consumerTag->{});
    }
}


Ejecución de ejemplo



Otro ejemplo con otra definicion de topicos




eot

RabbitMQ Routing

RabbitMQ Routing

  
Referencia: https://www.rabbitmq.com/tutorials/tutorial-four-java.html
https://github.com/jalbertomr/RabbitMQRouting.git

   A diferencia de enviar un mensaje a varios consumidores, podemos enviar mensajes a un subconjunto de los consumidores, o un consumidor puede recibir mensajes de un grupo de tipos de mensajes.

Binding

   Por medio del RoutingKey del binging definimos una ruta entre el exchange y los consumidores. El consumidor se enlaza haciendo un canal especificando el exchange, y el RoutingKey que en este caso es el logLevel. el queue es temporal.
Se hace notar que podemos tener multiples binding o sea de un productor a varios consumidores.

Exchange

  El exchange se especifica de tipo Direct.


   Ejecutamos en varias terminales Consumidores configurados con direfentes características, uno para recibir warning y error y lo envía a archivo. otro para recibir info, warning, error, y otro para recibir info, warning. Y un Productor que genera varios mensajes de diferente nivel de log. Estas configuraciones se hacen desde linea de comandos que especifivan el RoutingKey que recibirá el Consumidor.


public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
           
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
           
            String logLevel = getLogLevel(args);
            String message = getMessage(args);
           
            AMQP.BasicProperties props = null;
            channel.basicPublish(EXCHANGE_NAME, logLevel, props, message.getBytes("UTF-8"));
            System.out.println(" [x] Enviado '" + logLevel + "':'" + message + "'");
        }
    }

    private static String getMessage(String[] args) {
        return args[1];
    }

    private static String getLogLevel(String[] args) {
        return args[0];
    }
}


..
public class ReceiveLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
   
    public static void main(String[] args) throws IOException, TimeoutException {
        if (args.length < 1) {
            System.err.println("Uso: ReceiveLogDirect [info] [warning] [error]");
            System.exit(1);
        }
       
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
       
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
       
        for (String logLevel : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, logLevel);
        }
        System.out.println("[x] Esperando por mensajes. CTRL+C para salir.");
       
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] recibido '" +
                    delivery.getEnvelope().getRoutingKey() +
                    "':'" + message + "'");
        };
        boolean autoack = true;
        channel.basicConsume(queueName, autoack, deliverCallback, consumerTag->{});
    }
}


Ejecutamos Productor y Consumidores

Productor
RabbitMQRouting$ java -jar RoutingEmit.jar error "Ha occurido un error."
 [x] Enviado 'error':'Ha occurido un error.'
RabbitMQRouting$ java -jar RoutingEmit.jar info "Se ha enviado un mensaje 1."
 [x] Enviado 'info':'Se ha enviado un mensaje 1.'
RabbitMQRouting$ java -jar RoutingEmit.jar warning  "evento en warning."
 [x] Enviado 'warning':'evento en warning.'
RabbitMQRouting$ java -jar RoutingEmit.jar error "Ha occurido un error2."
 [x] Enviado 'error':'Ha occurido un error2.'
RabbitMQRouting$ java -jar RoutingEmit.jar info "Se ha enviado un mensaje 2."
 [x] Enviado 'info':'Se ha enviado un mensaje 2.'
RabbitMQRouting$ java -jar RoutingEmit.jar warning  "evento en warning2."
 [x] Enviado 'warning':'evento en warning2.'
RabbitMQRouting$ java -jar RoutingEmit.jar error "Ha occurido un error3."
 [x] Enviado 'error':'Ha occurido un error3.'

Consumidor 1

RabbitMQRouting$ java -jar RoutingReceive.jar warning error > logs_de_rabbit.log
 
Consumidor 2
RabbitMQRouting$ java -jar RoutingReceive.jar info warning
[x] Esperando por mensajes. CTRL+C para salir.
[x] recibido 'info':'Se ha enviado un mensaje 1.'
[x] recibido 'warning':'evento en warning.'
[x] recibido 'info':'Se ha enviado un mensaje 2.'
[x] recibido 'warning':'evento en warning2.'
 
Consumidor 3
RabbitMQRouting$ java -jar RoutingReceive.jar info warning error
[x] Esperando por mensajes. CTRL+C para salir.
[x] recibido 'error':'Ha occurido un error.'
[x] recibido 'info':'Se ha enviado un mensaje 1.'
[x] recibido 'warning':'evento en warning.'
[x] recibido 'error':'Ha occurido un error2.'
[x] recibido 'info':'Se ha enviado un mensaje 2.'
[x] recibido 'warning':'evento en warning2.'
[x] recibido 'error':'Ha occurido un error3.' 
 


Una vez corriendo, podemos indagar algo de información del RabbitMQ admin

Los queues temporales que generan los tres consumidores


Los RoutingKeys de los consumidores,




El exchange nombrado direct_logs, los binds asociados a él hacia los queues temporales.


Los Canales, uno por cada consumidor

Detalle de un channel.


 Las Conexiones.

Detalle de Conexion.

eot

martes, 23 de julio de 2019

RabbitMQ Publish/Subscribe

RabbitMQ Publish/Subscribe



referencia: https://www.rabbitmq.com/tutorials/tutorial-three-java.html
https://github.com/jalbertomr/RabbitMQPublishSubscribe.git

   Se muestra una aplicación donde se generan mensajes tipo log, donde se hacen llegar a los consumidores. Este es un broadcast a varios Consumidores.

   Los elementos que participan en una comunicación en AMQP son el Productor, Exchange, Bind, Queue, Consumer. El Exchange en un intermediario entre el productor y los queues dando opciones de comportamiento en los enlaces, estos pueden ser del tipo:

- Direct.
- Topic.
- Headers.
- fanout.

  En este caso utilizamos el fanout para hacer como broadcast a los consumidores. Al exchange le asignaremos un nombre "log". puede haber exchanges sin nombre, cuando se le asigna "". en este caso el mensaje será entregado por medio del routingKey. si se proporciona.

   En caso del Queue, se asigna como temporal, ya que no importa almacenar todos los mensajes a lo largo del tiempo, solo interesa los últimos mensajes que pudieran generarse. en este caso se generan Queues Temporales. este tipo de queue se eliminan cuando se desconecta el Consumidor.

  El bind se hace del exchange "log" al queue temporal.
  El RoutingKey no es necesaria cuando el exchange es de tipo fanout, ya que se envian a todos los consumidores no habiendo necesidad de distingirlo.

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";
   
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
           
            String message =  args.length < 1 ? "info: EmitLog RabbitMQ"
                                              : String.join(" ", args);
            String routingKey = "";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("[x] Enviado '"+ message + "'");
        }
    }
}

public class ReceiveLog {
    private static final String EXCHANGE_NAME = "logs";
   
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
       
        System.out.println("[x] Esperando por Mensajes. CTRL+C para salir.");
       
        DeliverCallback deliverCallback = (coneumerTag, delivery) -> {
            String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("[x] Recivido '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->{});
    }
}




Ejecutamos el un Consumidor en terminal y otro que envie datos a archivo. uno tomara datos de un queue temporal y el otro de otro queue temporal.

RabbitMQPublishSubscribe$ java -jar PublishSubscribeReceive.jar
[x] Esperando por Mensajes. CTRL+C para salir.
[x] Recivido 'info: EmitLog RabbitMQ'
[x] Recivido 'Algo Bueno pasa'
[x] Recivido 'Algo Bueno PAsara'
[x] Recivido 'Algo Bueno esta pasando'
[x] Recivido 'Algo Bueno pasa'
[x] Recivido 'Algo Bueno esta pasando'

RabbitMQPublishSubscribe$ java -jar PublishSubscribeReceive.jar > log_de_rabbitmq.log

Los mensajes son generados por el Productor en otra terminal

RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar
[x] Enviado 'info: EmitLog RabbitMQ'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno pasa
[x] Enviado 'Algo Bueno pasa'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno esta pasando
[x] Enviado 'Algo Bueno esta pasando'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno pasa
[x] Enviado 'Algo Bueno pasa'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno esta pasando
[x] Enviado 'Algo Bueno esta pasando'

Revisamos los exchanges, observamos el que creamos llamado logs de tipo fanout
RabbitMQPublishSubscribe$ sudo rabbitmqctl list_exchanges
Listing exchanges
amq.match headers
 direct
amq.rabbitmq.trace topic
amq.rabbitmq.log topic
amq.headers headers
amq.topic topic
amq.fanout fanout
amq.direct direct
logs fanout

Revisamos los binds
RabbitMQPublishSubscribe$ sudo rabbitmqctl list_bindings 
Listing bindings
 exchange amq.gen-VRS1QdByMqGtnASZ4CkSSw queue amq.gen-VRS1QdByMqGtnASZ4CkSSw []
 exchange amq.gen-mDIBZZRQgEqE4zgVlfXtcw queue amq.gen-mDIBZZRQgEqE4zgVlfXtcw []
 exchange task-queue queue task-queue []
logs exchange amq.gen-VRS1QdByMqGtnASZ4CkSSw queue  []
logs exchange amq.gen-mDIBZZRQgEqE4zgVlfXtcw queue  []

Los queues
RabbitMQPublishSubscribe$ sudo rabbitmqctl list_queues
Listing queues
task-queue 0
amq.gen-mDIBZZRQgEqE4zgVlfXtcw 0
amq.gen-VRS1QdByMqGtnASZ4CkSSw 0



eot

lunes, 22 de julio de 2019

RabbitMQ Work Queues (EIP Competing Consumers Pattern)

RabbitMQ Work Queues (EIP Competing Consumers Pattern)

referencias https://www.rabbitmq.com/tutorials/tutorial-two-java.html
https://github.com/jalbertomr/RabbitMQWorkQueues.git



Este tipo de trabajo de queues corresponde al Enterprise Integration Pattern Competing Consumers Pattern.


El productor se modifica un poco del ejemplo anterior, se le agrega un parametro que sera el mensaje que se enviara desde linea de comandos.

package com.bext;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class NewTask {
    private final static String TASK_QUEUE_NAME = "task-queue";
   
    public static void main(String[] args) throws Exception{
       
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME,false, false,false, null);
            String message = String.join(" ", args[0]);
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] enviado '" + message + "'");
        }
    }
}


El consumidor

package com.bext;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {
    public final static String TASK_QUEUE_NAME = "task-queue";
   
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println("Esperando por mensajes... CTRL+C para salir.");
       
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] recibido: '" + message +"'");
            try {
              doWork(message);   
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("[x] Hecho.");
            }
        };
        boolean autoAck = true; // acknowledgment
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag->{});
    }
   
    private static void doWork(String task) throws InterruptedException {
        for (char ch: task.toCharArray()) {
            if ( ch == '.') Thread.sleep(1000);
        }
    }
}


Para ejecutar en terminal el los consumers requeridos, exportamos el proyecto como runnable jar file.


ejecutando 3 Workers (Consumers) y un NewTask (Producer).


  Si observamos, los workers se distribuyen alternadamente los mensajes que  envia nuestro productor, si creamos otro Consumidor se captará también los mensajes, esto nos habla de escalabilidad.

   Por default RabbitMQ envia secuencialmente los mensajes a los consumidores, esto es en promedio cada consumidor recibirá el mismo número de mensajes. (Round Robin).

Message Acknowledgment

   Hasta el momento no pasa nada si un Consumidor toma un mensaje pero no lo procesa totalmente, por cualquier razón controlada o no controlada, es te mensaje se perderá y el efecto que se esperaba de él. Para solventar esto se usa el Knowledgment, esto es, cuando el Consumidor termina totalmente de procesar el mensaje envía un Acknowledgement al broker message para que el broker se despreocupe por reenviarlo nuevamente. de lo contrario si no recibe un Aknowledgement por parte del Consumidor, no lo borrara de su queue y lo enviará nuevamente. El mensaje de Acknowledgment debe hacerse por el mismo canal por el que se recibio originalemente el mensaje si no se generará una excepción (channel-level protocol exceptión).

Si en nuestro ejemplo Consumidor (Worker) lo modificamos para que por parametro le indiquemos si será AutoAck o No, al pasarle cualquier parámetro desde linea de comandos el autoacknowledge será deshabilitado. Ahora necesitamos una manera de obligar al Consumidor que salga para que el procesamiento del mensaje quede incompleto, esto lo haremos al tener en contenido del mensaje algún '*' .

Hacemos el ejercicio de enviar un mensaje que obligara al Consumidor abortar, esto con AutoAck, esto hará que el mensaje no se pierda y quede disponible nuevamente. utilizaremos la linea de comando para monitorear el estado de RabbitMQ.

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0

Tenemos 0 mensajes listos y 0 con señal unacknowledge.

Enviamos un mensaje para que sea abortado y monitoriamos el estatus

RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1*
 [x] enviado 'mensaje1*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0

Tenemos un mensaje en espera, ahora corremos el Consumidor en modo autoacknowledge, este se abortará, pero el mensaje estará aún disponible para un nuevo intento.

RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1*'

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0

  Para limpiar el mensaje, lo haremos desde el admin de RabbitMQ con requeue con NO para que lo elimine.


Verificamos en linea de comando

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0

Ahora, sin autoacknowledge, el Consumidor no indicará que ya proceso el mensaje por que se abortará con el mensaje.

El Consumidor en modo noautoack

RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1*'

Enviamos el mensaje abortivo

RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1*
 [x] enviado 'mensaje1*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0

El mensaje no esta disponible para volver a ser enviado.

  El mismo ejercicio pero ejecutando primero el Productor y después el Consumidor.

bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1*
 [x] enviado 'mensaje1*'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0

Consumidor
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1*'

Estatus
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0

Ejercicio con tres mensajes

RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
 [x] enviado 'mensaje1.'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje2..
 [x] enviado 'mensaje2..'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 2 0
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje3*
 [x] enviado 'mensaje3*'
 
Consumidor NOAUTOACK

RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje2..'
[x] Hecho.
[x] recibido: 'mensaje3*'

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0

Con NOAUTOACK los mensajes y el estatus simplemente pierden cuidado en mantenerlos nuevamente listos para ser disponibles nuevamente.

Ejercicio con tres mensajes con Consumidor AUTOACK.

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
 [x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje2..
 [x] enviado 'mensaje2..'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 2

El Consumidor con AUTOACK a tomado los dos mensajes pero aun espera el AUTOACK.

Enviamos un mensaje abortivo. El Consumidor aborta pero los mensajes estan nuevamente disponibles.

RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje3*
 [x] enviado 'mensaje3*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 3 0
 
Para limpiar los mensajes ahora ejecutaremos el Consumidor en modo noAutoAck

RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje2..'
[x] Hecho.
[x] recibido: 'mensaje3*'

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0

Observamos que al enviar mensajes con AUTOACK en el consumidor los mensajes son enviados, pero su contador de messages_unacknowledged se incrementa y si abortamos el consumidor quedan los mensajes nuevamente disponibles. Debemos encontrar el modo de enviar la señal de ACK por parte del consumidor al Broker message avisando que ya proceso el mensaje. algo parecido al commit en base de Datos. para enviar esta señal lo hacemos por el channel después de haber hecho el trabajo con el mensaje.

https://www.rabbitmq.com/confirms.html
En el Consumidor agragamos el basickAck.
         ...
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] recibido: '" + message +"'");
            try {
              doWork(message);   
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(0, true);
                System.out.println("[x] Hecho.");
            }

         ...
  
Ahora Probamos que no se rehabiliten los mensajes 1, 2 al abortar el tercero.

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
 [x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje2.
 [x] enviado 'mensaje2.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje3*
 [x] enviado 'mensaje3*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0

El consumidor

RabbitMQWorkQueues$ java -jar WorkQueuWorker.jar 
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje2.'
[x] Hecho.
[x] recibido: 'mensaje3*'

 Bien, ahora, ya tenemos los dos primeros mensajes procesador y liberados, y el tercero que fue abortado, disponible para ser reprocesado nuevamente.

nota: La forma adecuada de hacer el basicAck.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

Message Durability

  Cuando el servidor RabbitMQ se cae, los mensajes que estan esperando ser consumidos se pierden, Así que existe una alternativa para darle persistencia a estos mensajes, no en un 100%, pero funcional.
   Para esto hay que declarar al queue Durable, para que tenga efecto en el queue, debe de hacerse en cambio en uno que no tenga el mismo nombre, o eliminar y crear nuevamente el queue, esto debe aplicarse tanto al Productor como al Consumidor.
..
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false,false, null);
..
          
En el Productor especificamos el tipo de mensaje como Persistente

channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Y probamos la Persistencia de los mensajes, generando algunos, tirando el servidor RabbitMQ y levantandolo para consultar los mensajes.

RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
 [x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje12..
 [x] enviado 'mensaje12..'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 2 0
RabbitMQWorkQueues$ sudo systemctl stop rabbitmq-server.service
[sudo] password for bext: 
RabbitMQWorkQueues$ sudo systemctl start rabbitmq-server.service

Ya levantado, checamos los mensajes.

RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 2 0

Ahora Consumimos los mensajes
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje12..'
[x] Hecho.
^Cbext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ 

Checamos status
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0

Fair Dispatch (consumo equilibrado)

   Hablemos de como se consumen los mensajes por un grupo de consumidores. haremos un ejercicio donde creamos un grupo de mensajes en una terminal.
en otra terminal corremos un consumidor, y luego en otra terminal otro consumidor.

Productor
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
 [x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje10..........
 [x] enviado 'mensaje10..........'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
 [x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje10..........
 [x] enviado 'mensaje10..........'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
^[[A [x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje10..........
 [x] enviado 'mensaje10..........'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ 

Consumidor 1
RabbitMQWorkQueues$ java -jar WorkQueuWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje10..........'
[x] Hecho.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje10..........'
[x] Hecho.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje10..........'
[x] Hecho.
 
Consumidor 2
RabbitMQWorkQueues$ java -jar WorkQueuWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado

En este caso observamos que todos los mensajes que estaban en el queue fueron consumidos por un solo consumidor, esto es por que fueron asignados al consumidor1 en el momento que se ejecuto. el Consumidor2 no le toco ninguno.


Otro Ejemplo, Con los dos Consumidores en linea, liberaremos mensajes que tardan 1 seg en terminar de procesarse y de 10 seg en procesarse alternadamente. Veamos como los consumen.



Observamos que el Consumidor1 toma todos los mensajes1 de 1 seg, y el Consumidor2 toma todos los de 10 seg. Este comportamiento hace que un consumidor siempre tenga la carga más pesada.

  Para resolver esto, hay que indicarle al channel que solo precarge un mensaje, manipulando de esta manera el Qos (quality of service).

  En el consumidor (Worker) indicamos el basicQos.
...
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

...

Realizamos el mismo ejercicio anterior, Dos consumidores en linea y generamos mensajes alternados de 1 y 10 seg. en diferentes terminales.


Ahora vemos que los mensajes se distribuyen equitativemente entre los dos consumidores.

Repitiendo el ejemplo donde cargamos el queue con mensajes alternados de 1 y 10 seg. y después corremos el consumidor1 y el consumidor2.


Ahora vemos que tambien se disrtribuye la carga equitativamente los mensajes entre los consumidores.



eot

RabbitMQ simplest Producer -> queue -> Consumer, Maven Parent, maven modules

 RabbitMQ simplest Producer -> queue -> Consumer

Prerequisitos, tener corriendo el servidor rabbitMQ.
referencias https://www.rabbitmq.com/getstarted.html





 Crearemos este proyecto como maven, con dos modulos y un parent. un módulo sera el productor y el otro el consumidor. desde el proyecto parent maven podremos constuir ambos modulos (proyectos).

1.- Creamos el proyecto parent con nombre RabbitMQSimplest, el packaging se le indica como pom. el groupId, el artifactId, en este caso le indicaremos propiedades de que tipo de versión java usar.

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

y sus dependencias requeridas por rabbitMQ.

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.bext</groupId>
        <artifactId>RabbitMQSimplest</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>RabbitMQSimplestReceive</artifactId>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>

</project>


y así queda creado el proyecto parent.

2.- Creamos los dos subproyectos, como maven modules, sobre el parent proyect con new maven module, como cualquier otro proyecto.


De esta manera tenemos el proyecto del Productor y Consumidor agrupado. lo podemos correr desde el ide o desde linea de comandos.

bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQSimplest$ mvn clean
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO] 
[INFO] RabbitMQSimplest                                                   [pom]
[INFO] RabbitMQSimplestSend                                               [jar]
[INFO] RabbitMQSimplestReceive                                            [jar]
[INFO] 
[INFO] ---------------------< com.bext:RabbitMQSimplest >----------------------
[INFO] Building RabbitMQSimplest 0.0.1-SNAPSHOT                           [1/3]
[INFO] --------------------------------[ pom ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ RabbitMQSimplest ---
[INFO] 
[INFO] -------------------< com.bext:RabbitMQSimplestSend >--------------------
[INFO] Building RabbitMQSimplestSend 0.0.1-SNAPSHOT                       [2/3]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ RabbitMQSimplestSend ---
[INFO] Deleting /home/bext/eclipse-workspace/RabbitMQSimplest/RabbitMQSimplestSend/target
[INFO] 
[INFO] ------------------< com.bext:RabbitMQSimplestReceive >------------------
[INFO] Building RabbitMQSimplestReceive 0.0.1-SNAPSHOT                    [3/3]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ RabbitMQSimplestReceive ---
[INFO] Deleting /home/bext/eclipse-workspace/RabbitMQSimplest/RabbitMQSimplestReceive/target
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for RabbitMQSimplest 0.0.1-SNAPSHOT:
[INFO] 
[INFO] RabbitMQSimplest ................................... SUCCESS [  0.293 s]
[INFO] RabbitMQSimplestSend ............................... SUCCESS [  0.006 s]
[INFO] RabbitMQSimplestReceive ............................ SUCCESS [  0.005 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  0.535 s
[INFO] Finished at: 2019-07-22T17:16:53-05:00
[INFO] ------------------------------------------------------------------------
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQSimplest$ 

bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQSimplest$ mvn install
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO] 
[INFO] RabbitMQSimplest                                                   [pom]
[INFO] RabbitMQSimplestSend                                               [jar]
[INFO] RabbitMQSimplestReceive                                            [jar]
[INFO] 
[INFO] ---------------------< com.bext:RabbitMQSimplest >----------------------
[INFO] Building RabbitMQSimplest 0.0.1-SNAPSHOT                           [1/3]
[INFO] --------------------------------[ pom ]---------------------------------
[INFO] 
[INFO] --- maven-install-plugin:2.4:install (default-install) @ RabbitMQSimplest ---
[INFO] Installing /home/bext/eclipse-workspace/RabbitMQSimplest/pom.xml to /home/bext/.m2/repository/com/bext/RabbitMQSimplest/0.0.1-SNAPSHOT/RabbitMQSimplest-0.0.1-SNAPSHOT.pom
[INFO] 
[INFO] -------------------< com.bext:RabbitMQSimplestSend >--------------------
[INFO] Building RabbitMQSimplestSend 0.0.1-SNAPSHOT                       [2/3]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ RabbitMQSimplestSend ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ RabbitMQSimplestSend ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ RabbitMQSimplestSend ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ RabbitMQSimplestSend ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ RabbitMQSimplestSend ---
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ RabbitMQSimplestSend ---
[INFO] 
[INFO] --- maven-install-plugin:2.4:install (default-install) @ RabbitMQSimplestSend ---
[INFO] Installing /home/bext/eclipse-workspace/RabbitMQSimplest/RabbitMQSimplestSend/target/RabbitMQSimplestSend-0.0.1-SNAPSHOT.jar to /home/bext/.m2/repository/com/bext/RabbitMQSimplestSend/0.0.1-SNAPSHOT/RabbitMQSimplestSend-0.0.1-SNAPSHOT.jar
[INFO] Installing /home/bext/eclipse-workspace/RabbitMQSimplest/RabbitMQSimplestSend/pom.xml to /home/bext/.m2/repository/com/bext/RabbitMQSimplestSend/0.0.1-SNAPSHOT/RabbitMQSimplestSend-0.0.1-SNAPSHOT.pom
[INFO] 
[INFO] ------------------< com.bext:RabbitMQSimplestReceive >------------------
[INFO] Building RabbitMQSimplestReceive 0.0.1-SNAPSHOT                    [3/3]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ RabbitMQSimplestReceive ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ RabbitMQSimplestReceive ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ RabbitMQSimplestReceive ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ RabbitMQSimplestReceive ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ RabbitMQSimplestReceive ---
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ RabbitMQSimplestReceive ---
[INFO] 
[INFO] --- maven-install-plugin:2.4:install (default-install) @ RabbitMQSimplestReceive ---
[INFO] Installing /home/bext/eclipse-workspace/RabbitMQSimplest/RabbitMQSimplestReceive/target/RabbitMQSimplestReceive-0.0.1-SNAPSHOT.jar to /home/bext/.m2/repository/com/bext/RabbitMQSimplestReceive/0.0.1-SNAPSHOT/RabbitMQSimplestReceive-0.0.1-SNAPSHOT.jar
[INFO] Installing /home/bext/eclipse-workspace/RabbitMQSimplest/RabbitMQSimplestReceive/pom.xml to /home/bext/.m2/repository/com/bext/RabbitMQSimplestReceive/0.0.1-SNAPSHOT/RabbitMQSimplestReceive-0.0.1-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for RabbitMQSimplest 0.0.1-SNAPSHOT:
[INFO] 
[INFO] RabbitMQSimplest ................................... SUCCESS [  0.502 s]
[INFO] RabbitMQSimplestSend ............................... SUCCESS [  1.235 s]
[INFO] RabbitMQSimplestReceive ............................ SUCCESS [  0.038 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.981 s
[INFO] Finished at: 2019-07-22T17:18:30-05:00
[INFO] ------------------------------------------------------------------------

El Productor


public class Send {
    private final static String QUEUE_NAME = "simple-queue";
   
    public static void main(String[] args) throws Exception{
       
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection = factory.newConnection();
               Channel    channel = connection.createChannel()) {
            boolean durable = false;
            boolean exclusive = false;
            boolean autoDelete = false;
            Map<String,Object> arguments = null;
            channel.queueDeclare(QUEUE_NAME,durable, exclusive, autoDelete,      arguments);
           
            String message = "simple mensaje";
            String exchange = "";
            String routingKey = QUEUE_NAME;
            AMQP.BasicProperties props = null;
            channel.basicPublish(exchange, routingKey, props, message.getBytes());
           
            System.out.println(" [x] enviado '" + message + "'");
        }
    }
}


  En el código creamos una ConnectionFactory que implica la creación de sockets, negociación de protocolo según la versión, autenticación. Se conecta al host que tiene el broker rabbitmq, si esta en otra máquina se pondrá la respectiva IP.

  Se crea un canal en un try-with-resources que cierran el canal implicitamente ya que la connexión y el canal implementan java.io.Closeable.

Declaramos un queue por el que publicaremos el mensaje. esto dentro del try.
la creación del queue es idempotente, así que se creara solo si no existe, el mensaje es un arreglo de bytes por lo que podemos meter lo que queramos.

El Consumidor


public class Receive {
    public final static String QUEUE_NAME = "simple-queue";
   
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
       
        boolean durable = false;
        boolean exclusive = false;
        boolean autoDelete = false;
        Map<String,Object> arguments = null;
        channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, arguments);
       
        System.out.println("Esperando por mensajes... CTRL+C para salir.");
       
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] recibido: '" + message +"'");
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag->{});
    }
}

  En el Consumidor hacemos lo mismo al principio que en el Productor, es decir creamos la conexión el canal, declaramos el mismo queue donde el productor deposita sus mensajes, y lo consumimos, indicandole una callback functión, que permite manejarlo asincronamente, cuyos parametros toman información del mensaje y con esta tomamos el cuerpo del mensaje y lo desplegamos.

La Prueba


  Para hacer nuestra prueba, correremos tres consumidores, cada uno en una terminal, y despues en otra terminal enviaremos varios mensajes. Vemos que los consumidores van tomando alternadamente los mensajes.


En RabbitMQ Admin vemos que se crea el Queue, que hay tres canales y no un exchange.

Cuando terminamos El consumidor su conexión y canal se cierra.

RabbitMQSimplest$ sudo rabbitmqctl list_queues
Listing queues
simple-queue 0 
 







eot