RabbitMQ RPC Request/Reply Pattern
referencias:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
https://github.com/jalbertomr/RabbitMQRPC.git
Cuando queremos ejecutar una función que esta ubicada en una máquina remota y esperamos el resultado, se apega a un patrón conocido como Remote Procedure Call (RPC).
Veremos que con RabbitMQ podemos construir un sistema RPC con los elementos que nos proporciona. Tendremos un cliente y un servidor RPC escalable. El proceso remoto será una función que calcula números fibonacci.
El Cliente manda a llamar la función remota con call y blockea hasta que recibe la respuesta. (una alternativa es usar una secuencia de procesos de forma asincrona por aquello del non-blocking).
Se necesita una forma de decirle al servidor por que queue regresará la respuesta, esto se lo indicamos al cliente al momento de hacer la publicación por medio de propiedades enviada en el parametro de la función de publicación. podiramos utlizar también el default-queue.
Existen varias propiedades que salen de AMQP.Properties, entre ellas la de replayTo donde indicamos el queue, y correlationId, que nos servira para identificar el mensaje que le corresponde al cliente al emitirlo.
CorrelationId
Se crea un solo queue por cliente, El cliente recibirá una respuesta que tomará del queue de respuesta y verificará que el correlationId sea el mismo, o sea, el que le corresponde, de no ser así, simplemente lo ignorará. este no se debe perder pues esta en un queue que lo retoma.
En Resumen la esta implementación de RPC con RabbitMQ funciona de la siguiente manera:
El cliente genera para cada request propiedades unicas como el queue temporal y exclusivo que es en donde espera la respuesta, y el correlationId. Este mensaje en el request es enviado a un queue "rpc_queue". El server o RPC worker espera algún request en el "rpc_queue", al llegar genera la respuesta y la envía el queue temporal que le mando el cliente en las propiedades del mensaje.
El cliente espera por la respuesta en el queue temporal exclusivo, cuando aparece esta respuesta verifica que sea la misma correlationId para regresar el mensaje.
Server
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); ) {
boolean durable = false;
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(RPC_QUEUE_NAME, durable, exclusive, autoDelete, null /*arguments*/);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println("[x] Esperando peticiones RPC...");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println("[.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println("[.] " + e.toString());
} finally {
String exchange = "";
String routingKey = delivery.getProperties().getReplyTo();
channel.basicPublish(exchange, routingKey, replyProps, response.getBytes("UTF-8"));
boolean multiple = false;
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), multiple);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized(monitor) {
monitor.notify();
}
}
};
boolean autoAck = false;
channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag->{} );
// Wait and be prepared to consume the message from RPC client.
while(true) {
synchronized(monitor) {
try {
monitor.wait();
} catch (InterruptedException e2) {
e2.printStackTrace();
}
}
}
}
}
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
}
Cliente
El Bloqueo se hace por BlockingQueue.
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] args) {
try( RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0 ; i < 32; i++ ) {
String i_str = Integer.toString(i);
System.out.println("[x] Peticion de fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println("[.] Se obtiene '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
private String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
String exchange = "";
String routingKey = requestQueueName;
channel.basicPublish(exchange, routingKey, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
boolean autoAck = true;
String ctag = channel.basicConsume( replyQueueName, autoAck,
(consumerTag, delivery)-> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(),"UTF-8"));
}
}, consumerTag->{}
);
String result = response.take();
channel.basicCancel(ctag);
return result;
}
@Override
public void close() throws IOException {
connection.close();
}
}
La Prueba
Ejecutando Tres Workers, para las 32 peticiones de trabajo del cliente, el trabajo se escala y el tiempo de respuesta es menor.
eot
No hay comentarios:
Publicar un comentario