martes, 23 de julio de 2019

RabbitMQ Publish/Subscribe

RabbitMQ Publish/Subscribe



referencia: https://www.rabbitmq.com/tutorials/tutorial-three-java.html
https://github.com/jalbertomr/RabbitMQPublishSubscribe.git

   Se muestra una aplicación donde se generan mensajes tipo log, donde se hacen llegar a los consumidores. Este es un broadcast a varios Consumidores.

   Los elementos que participan en una comunicación en AMQP son el Productor, Exchange, Bind, Queue, Consumer. El Exchange en un intermediario entre el productor y los queues dando opciones de comportamiento en los enlaces, estos pueden ser del tipo:

- Direct.
- Topic.
- Headers.
- fanout.

  En este caso utilizamos el fanout para hacer como broadcast a los consumidores. Al exchange le asignaremos un nombre "log". puede haber exchanges sin nombre, cuando se le asigna "". en este caso el mensaje será entregado por medio del routingKey. si se proporciona.

   En caso del Queue, se asigna como temporal, ya que no importa almacenar todos los mensajes a lo largo del tiempo, solo interesa los últimos mensajes que pudieran generarse. en este caso se generan Queues Temporales. este tipo de queue se eliminan cuando se desconecta el Consumidor.

  El bind se hace del exchange "log" al queue temporal.
  El RoutingKey no es necesaria cuando el exchange es de tipo fanout, ya que se envian a todos los consumidores no habiendo necesidad de distingirlo.

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";
   
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
           
            String message =  args.length < 1 ? "info: EmitLog RabbitMQ"
                                              : String.join(" ", args);
            String routingKey = "";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("[x] Enviado '"+ message + "'");
        }
    }
}

public class ReceiveLog {
    private static final String EXCHANGE_NAME = "logs";
   
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
       
        System.out.println("[x] Esperando por Mensajes. CTRL+C para salir.");
       
        DeliverCallback deliverCallback = (coneumerTag, delivery) -> {
            String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("[x] Recivido '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->{});
    }
}




Ejecutamos el un Consumidor en terminal y otro que envie datos a archivo. uno tomara datos de un queue temporal y el otro de otro queue temporal.

RabbitMQPublishSubscribe$ java -jar PublishSubscribeReceive.jar
[x] Esperando por Mensajes. CTRL+C para salir.
[x] Recivido 'info: EmitLog RabbitMQ'
[x] Recivido 'Algo Bueno pasa'
[x] Recivido 'Algo Bueno PAsara'
[x] Recivido 'Algo Bueno esta pasando'
[x] Recivido 'Algo Bueno pasa'
[x] Recivido 'Algo Bueno esta pasando'

RabbitMQPublishSubscribe$ java -jar PublishSubscribeReceive.jar > log_de_rabbitmq.log

Los mensajes son generados por el Productor en otra terminal

RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar
[x] Enviado 'info: EmitLog RabbitMQ'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno pasa
[x] Enviado 'Algo Bueno pasa'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno esta pasando
[x] Enviado 'Algo Bueno esta pasando'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno pasa
[x] Enviado 'Algo Bueno pasa'
RabbitMQPublishSubscribe$ java -jar PublishSubscribeEmit.jar Algo Bueno esta pasando
[x] Enviado 'Algo Bueno esta pasando'

Revisamos los exchanges, observamos el que creamos llamado logs de tipo fanout
RabbitMQPublishSubscribe$ sudo rabbitmqctl list_exchanges
Listing exchanges
amq.match headers
 direct
amq.rabbitmq.trace topic
amq.rabbitmq.log topic
amq.headers headers
amq.topic topic
amq.fanout fanout
amq.direct direct
logs fanout

Revisamos los binds
RabbitMQPublishSubscribe$ sudo rabbitmqctl list_bindings 
Listing bindings
 exchange amq.gen-VRS1QdByMqGtnASZ4CkSSw queue amq.gen-VRS1QdByMqGtnASZ4CkSSw []
 exchange amq.gen-mDIBZZRQgEqE4zgVlfXtcw queue amq.gen-mDIBZZRQgEqE4zgVlfXtcw []
 exchange task-queue queue task-queue []
logs exchange amq.gen-VRS1QdByMqGtnASZ4CkSSw queue  []
logs exchange amq.gen-mDIBZZRQgEqE4zgVlfXtcw queue  []

Los queues
RabbitMQPublishSubscribe$ sudo rabbitmqctl list_queues
Listing queues
task-queue 0
amq.gen-mDIBZZRQgEqE4zgVlfXtcw 0
amq.gen-VRS1QdByMqGtnASZ4CkSSw 0



eot

No hay comentarios:

Publicar un comentario