RabbitMQ Work Queues (EIP Competing Consumers Pattern)
referencias
https://www.rabbitmq.com/tutorials/tutorial-two-java.html
https://github.com/jalbertomr/RabbitMQWorkQueues.git
Este tipo de trabajo de queues corresponde al Enterprise Integration Pattern Competing Consumers Pattern.
El productor se modifica un poco del ejemplo anterior, se le agrega un parametro que sera el mensaje que se enviara desde linea de comandos.
package com.bext;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class NewTask {
private final static String TASK_QUEUE_NAME = "task-queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME,false, false,false, null);
String message = String.join(" ", args[0]);
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] enviado '" + message + "'");
}
}
}
El consumidor
package com.bext;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
public final static String TASK_QUEUE_NAME = "task-queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println("Esperando por mensajes... CTRL+C para salir.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] recibido: '" + message +"'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Hecho.");
}
};
boolean autoAck = true; // acknowledgment
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag->{});
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if ( ch == '.') Thread.sleep(1000);
}
}
}
Para ejecutar en terminal el los consumers requeridos, exportamos el proyecto como runnable jar file.
ejecutando 3 Workers (Consumers) y un NewTask (Producer).
Si observamos, los workers se distribuyen alternadamente los mensajes que envia nuestro productor, si creamos otro Consumidor se captará también los mensajes, esto nos habla de escalabilidad.
Por default RabbitMQ envia secuencialmente los mensajes a los consumidores, esto es en promedio cada consumidor recibirá el mismo número de mensajes. (Round Robin).
Message Acknowledgment
Hasta el momento no pasa nada si un Consumidor toma un mensaje pero no lo procesa totalmente, por cualquier razón controlada o no controlada, es te mensaje se perderá y el efecto que se esperaba de él. Para solventar esto se usa el Knowledgment, esto es, cuando el Consumidor termina totalmente de procesar el mensaje envía un Acknowledgement al broker message para que el broker se despreocupe por reenviarlo nuevamente. de lo contrario si no recibe un Aknowledgement por parte del Consumidor, no lo borrara de su queue y lo enviará nuevamente. El mensaje de Acknowledgment debe hacerse por el mismo canal por el que se recibio originalemente el mensaje si no se generará una excepción (channel-level protocol exceptión).
Si en nuestro ejemplo Consumidor (Worker) lo modificamos para que por parametro le indiquemos si será AutoAck o No, al pasarle cualquier parámetro desde linea de comandos el autoacknowledge será deshabilitado. Ahora necesitamos una manera de obligar al Consumidor que salga para que el procesamiento del mensaje quede incompleto, esto lo haremos al tener en contenido del mensaje algún '*' .
Hacemos el ejercicio de enviar un mensaje que obligara al Consumidor abortar, esto con AutoAck, esto hará que el mensaje no se pierda y quede disponible nuevamente. utilizaremos la linea de comando para monitorear el estado de RabbitMQ.
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
Tenemos 0 mensajes listos y 0 con señal unacknowledge.
Enviamos un mensaje para que sea abortado y monitoriamos el estatus
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1*
[x] enviado 'mensaje1*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0
Tenemos un mensaje en espera, ahora corremos el Consumidor en modo autoacknowledge, este se abortará, pero el mensaje estará aún disponible para un nuevo intento.
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0
Para limpiar el mensaje, lo haremos desde el admin de RabbitMQ con requeue con NO para que lo elimine.
Verificamos en linea de comando
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
Ahora, sin autoacknowledge, el Consumidor no indicará que ya proceso el mensaje por que se abortará con el mensaje.
El Consumidor en modo noautoack
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1*'
Enviamos el mensaje abortivo
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1*
[x] enviado 'mensaje1*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
El mensaje no esta disponible para volver a ser enviado.
El mismo ejercicio pero ejecutando primero el Productor y después el Consumidor.
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1*
[x] enviado 'mensaje1*'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0
Consumidor
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1*'
Estatus
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
Ejercicio con tres mensajes
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
[x] enviado 'mensaje1.'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje2..
[x] enviado 'mensaje2..'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 2 0
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje3*
[x] enviado 'mensaje3*'
Consumidor NOAUTOACK
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje2..'
[x] Hecho.
[x] recibido: 'mensaje3*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
Con NOAUTOACK los mensajes y el estatus simplemente pierden cuidado en mantenerlos nuevamente listos para ser disponibles nuevamente.
Ejercicio con tres mensajes con Consumidor AUTOACK.
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
[x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje2..
[x] enviado 'mensaje2..'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 2
El Consumidor con AUTOACK a tomado los dos mensajes pero aun espera el AUTOACK.
Enviamos un mensaje abortivo. El Consumidor aborta pero los mensajes estan nuevamente disponibles.
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje3*
[x] enviado 'mensaje3*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 3 0
Para limpiar los mensajes ahora ejecutaremos el Consumidor en modo noAutoAck
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar NOAUTOACK
Esperando por mensajes... CTRL+C para salir.
se proporciono un parametro, autoAck DEShabilitado.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje2..'
[x] Hecho.
[x] recibido: 'mensaje3*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
Observamos que al enviar mensajes con AUTOACK en el consumidor los mensajes son enviados, pero su contador de messages_unacknowledged se incrementa y si abortamos el consumidor quedan los mensajes nuevamente disponibles. Debemos encontrar el modo de enviar la señal de ACK por parte del consumidor al Broker message avisando que ya proceso el mensaje. algo parecido al commit en base de Datos. para enviar esta señal lo hacemos por el channel después de haber hecho el trabajo con el mensaje.
https://www.rabbitmq.com/confirms.html
En el Consumidor agragamos el basickAck.
...
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] recibido: '" + message +"'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.basicAck(0, true);
System.out.println("[x] Hecho.");
}
...
Ahora Probamos que no se rehabiliten los mensajes 1, 2 al abortar el tercero.
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
[x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje2.
[x] enviado 'mensaje2.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje3*
[x] enviado 'mensaje3*'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 1 0
El consumidor
RabbitMQWorkQueues$ java -jar WorkQueuWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje2.'
[x] Hecho.
[x] recibido: 'mensaje3*'
Bien, ahora, ya tenemos los dos primeros mensajes procesador y liberados, y el tercero que fue abortado, disponible para ser reprocesado nuevamente.
nota: La forma adecuada de hacer el basicAck.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
Message Durability
Cuando el servidor RabbitMQ se cae, los mensajes que estan esperando ser consumidos se pierden, Así que existe una alternativa para darle persistencia a estos mensajes, no en un 100%, pero funcional.
Para esto hay que declarar al queue Durable, para que tenga efecto en el queue, debe de hacerse en cambio en uno que no tenga el mismo nombre, o eliminar y crear nuevamente el queue, esto debe aplicarse tanto al Productor como al Consumidor.
..
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false,false, null);
..
En el Productor especificamos el tipo de mensaje como Persistente
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Y probamos la Persistencia de los mensajes, generando algunos, tirando el servidor RabbitMQ y levantandolo para consultar los mensajes.
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
[x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje12..
[x] enviado 'mensaje12..'
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 2 0
RabbitMQWorkQueues$ sudo systemctl stop rabbitmq-server.service
[sudo] password for bext:
RabbitMQWorkQueues$ sudo systemctl start rabbitmq-server.service
Ya levantado, checamos los mensajes.
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 2 0
Ahora Consumimos los mensajes
RabbitMQWorkQueues$ java -jar WorkQueuesWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje12..'
[x] Hecho.
^Cbext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$
Checamos status
RabbitMQWorkQueues$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
task-queue 0 0
Fair Dispatch (consumo equilibrado)
Hablemos de como se consumen los mensajes por un grupo de consumidores. haremos un ejercicio donde creamos un grupo de mensajes en una terminal.
en otra terminal corremos un consumidor, y luego en otra terminal otro consumidor.
Productor
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
[x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje10..........
[x] enviado 'mensaje10..........'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
[x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje10..........
[x] enviado 'mensaje10..........'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje1.
^[[A [x] enviado 'mensaje1.'
RabbitMQWorkQueues$ java -jar WorkQueuesNewTask.jar mensaje10..........
[x] enviado 'mensaje10..........'
bext@bext-VPCF13WFX:~/eclipse-workspace/RabbitMQWorkQueues$
Consumidor 1
RabbitMQWorkQueues$ java -jar WorkQueuWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje10..........'
[x] Hecho.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje10..........'
[x] Hecho.
[x] recibido: 'mensaje1.'
[x] Hecho.
[x] recibido: 'mensaje10..........'
[x] Hecho.
Consumidor 2
RabbitMQWorkQueues$ java -jar WorkQueuWorker.jar
Esperando por mensajes... CTRL+C para salir.
No se proporciono un parametro, autoAck Habilitado
En este caso observamos que todos los mensajes que estaban en el queue fueron consumidos por un solo consumidor, esto es por que fueron asignados al consumidor1 en el momento que se ejecuto. el Consumidor2 no le toco ninguno.
Otro Ejemplo, Con los dos Consumidores en linea, liberaremos mensajes que tardan 1 seg en terminar de procesarse y de 10 seg en procesarse alternadamente. Veamos como los consumen.
Observamos que el Consumidor1 toma todos los mensajes1 de 1 seg, y el Consumidor2 toma todos los de 10 seg. Este comportamiento hace que un consumidor siempre tenga la carga más pesada.
Para resolver esto, hay que indicarle al channel que solo precarge un mensaje, manipulando de esta manera el Qos (quality of service).
En el consumidor (Worker) indicamos el basicQos.
...
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
int prefetchCount = 1;
channel.basicQos(prefetchCount);
...
Realizamos el mismo ejercicio anterior, Dos consumidores en linea y generamos mensajes alternados de 1 y 10 seg. en diferentes terminales.
Ahora vemos que los mensajes se distribuyen equitativemente entre los dos consumidores.
Repitiendo el ejemplo donde cargamos el queue con mensajes alternados de 1 y 10 seg. y después corremos el consumidor1 y el consumidor2.
Ahora vemos que tambien se disrtribuye la carga equitativamente los mensajes entre los consumidores.
eot