miércoles, 24 de julio de 2019

RabbitMQ Routing

RabbitMQ Routing

  
Referencia: https://www.rabbitmq.com/tutorials/tutorial-four-java.html
https://github.com/jalbertomr/RabbitMQRouting.git

   A diferencia de enviar un mensaje a varios consumidores, podemos enviar mensajes a un subconjunto de los consumidores, o un consumidor puede recibir mensajes de un grupo de tipos de mensajes.

Binding

   Por medio del RoutingKey del binging definimos una ruta entre el exchange y los consumidores. El consumidor se enlaza haciendo un canal especificando el exchange, y el RoutingKey que en este caso es el logLevel. el queue es temporal.
Se hace notar que podemos tener multiples binding o sea de un productor a varios consumidores.

Exchange

  El exchange se especifica de tipo Direct.


   Ejecutamos en varias terminales Consumidores configurados con direfentes características, uno para recibir warning y error y lo envía a archivo. otro para recibir info, warning, error, y otro para recibir info, warning. Y un Productor que genera varios mensajes de diferente nivel de log. Estas configuraciones se hacen desde linea de comandos que especifivan el RoutingKey que recibirá el Consumidor.


public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
           
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
           
            String logLevel = getLogLevel(args);
            String message = getMessage(args);
           
            AMQP.BasicProperties props = null;
            channel.basicPublish(EXCHANGE_NAME, logLevel, props, message.getBytes("UTF-8"));
            System.out.println(" [x] Enviado '" + logLevel + "':'" + message + "'");
        }
    }

    private static String getMessage(String[] args) {
        return args[1];
    }

    private static String getLogLevel(String[] args) {
        return args[0];
    }
}


..
public class ReceiveLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
   
    public static void main(String[] args) throws IOException, TimeoutException {
        if (args.length < 1) {
            System.err.println("Uso: ReceiveLogDirect [info] [warning] [error]");
            System.exit(1);
        }
       
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
       
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
       
        for (String logLevel : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, logLevel);
        }
        System.out.println("[x] Esperando por mensajes. CTRL+C para salir.");
       
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] recibido '" +
                    delivery.getEnvelope().getRoutingKey() +
                    "':'" + message + "'");
        };
        boolean autoack = true;
        channel.basicConsume(queueName, autoack, deliverCallback, consumerTag->{});
    }
}


Ejecutamos Productor y Consumidores

Productor
RabbitMQRouting$ java -jar RoutingEmit.jar error "Ha occurido un error."
 [x] Enviado 'error':'Ha occurido un error.'
RabbitMQRouting$ java -jar RoutingEmit.jar info "Se ha enviado un mensaje 1."
 [x] Enviado 'info':'Se ha enviado un mensaje 1.'
RabbitMQRouting$ java -jar RoutingEmit.jar warning  "evento en warning."
 [x] Enviado 'warning':'evento en warning.'
RabbitMQRouting$ java -jar RoutingEmit.jar error "Ha occurido un error2."
 [x] Enviado 'error':'Ha occurido un error2.'
RabbitMQRouting$ java -jar RoutingEmit.jar info "Se ha enviado un mensaje 2."
 [x] Enviado 'info':'Se ha enviado un mensaje 2.'
RabbitMQRouting$ java -jar RoutingEmit.jar warning  "evento en warning2."
 [x] Enviado 'warning':'evento en warning2.'
RabbitMQRouting$ java -jar RoutingEmit.jar error "Ha occurido un error3."
 [x] Enviado 'error':'Ha occurido un error3.'

Consumidor 1

RabbitMQRouting$ java -jar RoutingReceive.jar warning error > logs_de_rabbit.log
 
Consumidor 2
RabbitMQRouting$ java -jar RoutingReceive.jar info warning
[x] Esperando por mensajes. CTRL+C para salir.
[x] recibido 'info':'Se ha enviado un mensaje 1.'
[x] recibido 'warning':'evento en warning.'
[x] recibido 'info':'Se ha enviado un mensaje 2.'
[x] recibido 'warning':'evento en warning2.'
 
Consumidor 3
RabbitMQRouting$ java -jar RoutingReceive.jar info warning error
[x] Esperando por mensajes. CTRL+C para salir.
[x] recibido 'error':'Ha occurido un error.'
[x] recibido 'info':'Se ha enviado un mensaje 1.'
[x] recibido 'warning':'evento en warning.'
[x] recibido 'error':'Ha occurido un error2.'
[x] recibido 'info':'Se ha enviado un mensaje 2.'
[x] recibido 'warning':'evento en warning2.'
[x] recibido 'error':'Ha occurido un error3.' 
 


Una vez corriendo, podemos indagar algo de información del RabbitMQ admin

Los queues temporales que generan los tres consumidores


Los RoutingKeys de los consumidores,




El exchange nombrado direct_logs, los binds asociados a él hacia los queues temporales.


Los Canales, uno por cada consumidor

Detalle de un channel.


 Las Conexiones.

Detalle de Conexion.

eot

No hay comentarios:

Publicar un comentario