martes, 1 de marzo de 2022

RSocket Spring Boot

 RSocket Spring Boot



Request Response



Test




>rsc --debug --request --data "{\"message\":\"MyMessage\"}" --route request-response --stacktrace tcp://localhost:7000

2022-03-01 19:49:58.863 DEBUG 2744 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2022-03-01 19:49:58.863 DEBUG 2744 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 53
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65                                  |ponse           |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 22 7d                            |ssage"}         |
+--------+-------------------------------------------------+----------------+
2022-03-01 19:49:58.872 DEBUG 2744 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 73
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 20 4d 79 4d 65 73 73 61 67 65 22 2c 22 63 |e: MyMessage","c|
|00000030| 72 65 61 74 65 64 22 3a 31 36 34 36 31 38 35 37 |reated":16461857|
|00000040| 39 38 7d                                        |98}             |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message: MyMessage","created":1646185798}

Console Output:

2022-03-02 12:21:36.746  INFO 27908 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7000
2022-03-02 12:21:36.756  INFO 27908 --- [           main] c.b.r.RsocketsgetstartApplication        : Started RsocketsgetstartApplication in 1.371 seconds (JVM running for 2.07)
2022-03-02 12:21:42.468  INFO 27908 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Received request-response message Message(message=MyMessage, created=1646245302)
2022-03-02 12:21:42.501 ERROR 27908 --- [ctor-http-nio-2] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:550) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]

To Handle the Exception

@MessageMapping("request-response")
Mono<Message> requestResponse(final Message message){
Hooks.onErrorDropped(error->log.warn("Exception happened: {}", error.getMessage()));
log.info("Received request-response message {}", message);
return Mono.just(new Message("in controller message: " + message.getMessage()));
}
Console output:

2022-03-02 12:25:19.156  INFO 25896 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7000
2022-03-02 12:25:19.165  INFO 25896 --- [           main] c.b.r.RsocketsgetstartApplication        : Started RsocketsgetstartApplication in 1.388 seconds (JVM running for 2.191)
2022-03-02 12:25:26.668  INFO 25896 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Received request-response message Message(message=MyMessage, created=1646245526)
2022-03-02 12:25:26.701  WARN 25896 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Exception happened: java.util.concurrent.CancellationException: Disposed

Request Response with @DestinationVariable

@MessageMapping("request-response.{name}")
String requestResponse(@DestinationVariable String name){
log.info("Received request-response.{name} : {}", name);
return "request-response: " + name;
}

>rsc --debug --request  --route request-response.MYDATA --stacktrace tcp://localhost:7000
2022-03-02 16:47:25.121 DEBUG 19236 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2022-03-02 16:47:25.121 DEBUG 19236 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 37
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 18 17 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 2e 4d 59 44 41 54 41             |ponse.MYDATA    |
+--------+-------------------------------------------------+----------------+
Data:

2022-03-02 16:47:25.130 DEBUG 19236 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 30
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 72 65 71 75 65 73 74 2d 72 65 73 70 6f 6e 73 65 |request-response|
|00000010| 3a 20 4d 59 44 41 54 41                         |: MYDATA        |
+--------+-------------------------------------------------+----------------+
request-response: MYDATA

Fire And Forget


@MessageMapping("fire-and-forget")
public Mono<Void> fireAndforget(final Message message){
log.info("-> fire-and-forget request: {}", message);
return Mono.empty();
}
Test

@Test
void fireAndForgetTest(){
//send a fire-and-forget message
Mono<Void> response = requester
.route("fire-and-forget")
.data( new Message("MyMessage"))
.retrieveMono(Void.class);
//assert that the result is a completed mono
StepVerifier
.create(response)
.verifyComplete();
}

>rsc --debug --fnf --data "{\"message\":\"MyMessage\"}" --route fire-and-forget --stacktrace tcp://localhost:7000
2022-03-01 20:40:19.672 DEBUG 24816 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2022-03-01 20:40:19.673 DEBUG 24816 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: REQUEST_FNF Flags: 0b100000000 Length: 52
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo|
|00000010| 72 67 65 74                                     |rget            |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 22 7d                            |ssage"}         |
+--------+-------------------------------------------------+----------------+

console output

2022-03-02 12:58:13.357  INFO 27692 --- [ctor-http-nio-3] c.b.r.RsocketsgetstartApplication        : -> fire-and-forget request: Message(message=MyMessage, created=1646247493)
2022-03-02 12:58:13.368  WARN 27692 --- [ctor-http-nio-3] c.b.r.RsocketsgetstartApplication        : Exception happened: java.util.concurrent.CancellationException: Disposed

Request Stream


@MessageMapping("stream-request")
public Flux<Message> stream(final Message message){
Hooks.onErrorDropped(error->log.warn("Exception happened: {}", error.getMessage()));
log.info("-> stream-request: {}", message);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message("in controller message:" + message.getMessage() + "request #: " + index))
.log();
}
Test

@Test
void requestStreamTest(){
//send a request stream
Flux<Message> streamResp = requester
.route("stream-request")
.data( new Message("MyMessage"))
.retrieveFlux(Message.class);

// verify that the response is a flux stream
StepVerifier
.create(streamResp)
.consumeNextWith( message ->
assertThat(message.getMessage()).isEqualTo("in controller message: MyMessage request #: 0"))
.expectNextCount(0)
.consumeNextWith( message ->
assertThat(message.getMessage()).isEqualTo("in controller message: MyMessage request #: 1"))
.thenCancel()
.verify();
}

>rsc --debug --stream --data "{\"message\":\"MyMessage\"}" --route stream-request --stacktrace tcp://localhost:7000
2022-03-02 12:54:25.267 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2022-03-02 12:54:25.267 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 55 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0f 0e 73 74 72 65 61 6d 2d 72 65 71 75 |.....stream-requ|
|00000010| 65 73 74                                        |est             |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 22 7d                            |ssage"}         |
+--------+-------------------------------------------------+----------------+
2022-03-02 12:54:26.457 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 30 22 2c 22 63 72 65 61 74 65 |st #: 0","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 36 7d       |d":1646247266}  |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 0","created":1646247266}
2022-03-02 12:54:27.446 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 31 22 2c 22 63 72 65 61 74 65 |st #: 1","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 37 7d       |d":1646247267}  |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 1","created":1646247267}
2022-03-02 12:54:28.452 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 32 22 2c 22 63 72 65 61 74 65 |st #: 2","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 38 7d       |d":1646247268}  |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 2","created":1646247268}
2022-03-02 12:54:29.448 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 33 22 2c 22 63 72 65 61 74 65 |st #: 3","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 39 7d       |d":1646247269}  |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 3","created":1646247269}
2022-03-02 12:54:30.446 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 34 22 2c 22 63 72 65 61 74 65 |st #: 4","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 37 30 7d       |d":1646247270}  |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 4","created":1646247270}

The stream flows indefinitly so cancel rsc. ctl-c.

Console output

2022-03-02 12:52:34.417  INFO 27692 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7000
2022-03-02 12:52:34.426  INFO 27692 --- [           main] c.b.r.RsocketsgetstartApplication        : Started RsocketsgetstartApplication in 1.321 seconds (JVM running for 2.031)
2022-03-02 12:54:25.421  INFO 27692 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : -> stream-request: Message(message=MyMessage, created=1646247265)
2022-03-02 12:54:25.437  INFO 27692 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onSubscribe(FluxMap.MapSubscriber)
2022-03-02 12:54:25.438  INFO 27692 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : request(1)
2022-03-02 12:54:26.443  INFO 27692 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=in controller message:MyMessagerequest #: 0, created=1646247266))
2022-03-02 12:54:26.456  INFO 27692 --- [     parallel-2] reactor.Flux.Map.1                       : request(9223372036854775806)
2022-03-02 12:54:27.446  INFO 27692 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=in controller message:MyMessagerequest #: 1, created=1646247267))
2022-03-02 12:54:28.451  INFO 27692 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=in controller message:MyMessagerequest #: 2, created=1646247268))
2022-03-02 12:54:29.448  INFO 27692 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=in controller message:MyMessagerequest #: 3, created=1646247269))
2022-03-02 12:54:30.445  INFO 27692 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=in controller message:MyMessagerequest #: 4, created=1646247270))
2022-03-02 12:54:30.493  WARN 27692 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Exception happened: java.util.concurrent.CancellationException: Disposed
2022-03-02 12:54:30.494  INFO 27692 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : cancel()

Request Stream @DestinationVariable limited elements


@MessageMapping("request-stream.{name}")
public Flux<Message> requestResponselux(@DestinationVariable String name){
Hooks.onErrorDropped(error-> log.warn("Exception happened: {}", error.getMessage()));
log.info("Received request-responseFlux.{name} : {}", name);
return Flux
.fromStream(Stream.generate(() -> new Message("MyMessage-" + name + " @" + Instant.now()) ))
.take(4)
.delayElements(Duration.ofSeconds(1))
.log();
}
>rsc --debug --stream  --route request-responseFlux.MYDATA --stacktrace tcp://localhost:7000
2022-03-02 17:33:56.931 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
2022-03-02 17:33:56.932 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 45 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 1c 1b 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 46 6c 75 78 2e 4d 59 44 41 54 41 |ponseFlux.MYDATA|
+--------+-------------------------------------------------+----------------+
Data:
2022-03-02 17:33:58.105 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 86
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 33 3a 35 |22-03-02T23:33:5|
|00000030| 37 2e 30 35 37 34 39 36 5a 22 2c 22 63 72 65 61 |7.057496Z","crea|
|00000040| 74 65 64 22 3a 31 36 34 36 32 36 34 30 33 37 7d |ted":1646264037}|
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:33:57.057496Z","created":1646264037}
2022-03-02 17:33:59.110 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 89
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 33 3a 35 |22-03-02T23:33:5|
|00000030| 38 2e 31 30 34 36 36 33 39 30 30 5a 22 2c 22 63 |8.104663900Z","c|
|00000040| 72 65 61 74 65 64 22 3a 31 36 34 36 32 36 34 30 |reated":16462640|
|00000050| 33 38 7d                                        |38}             |
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:33:58.104663900Z","created":1646264038}
2022-03-02 17:34:00.126 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 89
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 33 3a 35 |22-03-02T23:33:5|
|00000030| 39 2e 31 30 39 30 31 38 39 30 30 5a 22 2c 22 63 |9.109018900Z","c|
|00000040| 72 65 61 74 65 64 22 3a 31 36 34 36 32 36 34 30 |reated":16462640|
|00000050| 33 39 7d                                        |39}             |
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:33:59.109018900Z","created":1646264039}
2022-03-02 17:34:01.138 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 89
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 34 3a 30 |22-03-02T23:34:0|
|00000030| 30 2e 31 32 36 32 35 37 31 30 30 5a 22 2c 22 63 |0.126257100Z","c|
|00000040| 72 65 61 74 65 64 22 3a 31 36 34 36 32 36 34 30 |reated":16462640|
|00000050| 34 30 7d                                        |40}             |
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:34:00.126257100Z","created":1646264040}
2022-03-02 17:34:01.139 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
Data:

console output

2022-03-02 17:33:34.371  INFO 15640 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7000
2022-03-02 17:33:34.385  INFO 15640 --- [           main] c.b.r.RsocketsgetstartApplication        : Started RsocketsgetstartApplication in 1.42 seconds (JVM running for 2.148)
2022-03-02 17:33:57.027  INFO 15640 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Received request-responseFlux.{name} : MYDATA
2022-03-02 17:33:57.060  INFO 15640 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1                 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2022-03-02 17:33:57.060  INFO 15640 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1                 : request(1)
2022-03-02 17:33:58.077  INFO 15640 --- [     parallel-2] reactor.Flux.ConcatMap.1                 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:33:57.057496Z, created=1646264037))
2022-03-02 17:33:58.103  INFO 15640 --- [     parallel-2] reactor.Flux.ConcatMap.1                 : request(9223372036854775806)
2022-03-02 17:33:59.106  INFO 15640 --- [     parallel-3] reactor.Flux.ConcatMap.1                 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:33:58.104663900Z, created=1646264038))
2022-03-02 17:34:00.122  INFO 15640 --- [     parallel-4] reactor.Flux.ConcatMap.1                 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:33:59.109018900Z, created=1646264039))
2022-03-02 17:34:01.133  INFO 15640 --- [     parallel-5] reactor.Flux.ConcatMap.1                 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:34:00.126257100Z, created=1646264040))
2022-03-02 17:34:01.138  INFO 15640 --- [     parallel-5] reactor.Flux.ConcatMap.1                 : onComplete()
2022-03-02 17:34:01.146  WARN 15640 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Exception happened: java.util.concurrent.CancellationException: Disposed

Request Channel


@MessageMapping("request-channel")
public Flux<Message> channel(final Flux<Integer> settings){
Hooks.onErrorDropped(error->log.warn("Exception happened: {}", error.getMessage()));
log.info("Received request-channel");

return settings
.doOnNext(setting -> log.info("Requested interval is {} seconds.", setting))
.doOnCancel(() -> log.warn("The client cancel the channel."))
.switchMap(setting -> Flux.interval(Duration.ofSeconds( setting)))
.map(index -> new Message("channel generating response #" + index))
.log();
}

>rsc --debug --channel --data - --route request-channel --stacktrace tcp://localhost:7000
2022-03-02 13:51:46.926 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

1  has been typed 1 from keyboard to generate one message each 1 second.
2022-03-02 13:51:54.167 DEBUG 27436 --- [oundedElastic-1] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: REQUEST_CHANNEL Flags: 0b100000000 Length: 34 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 72 65 71 75 65 73 74 2d 63 68 61 |.....request-cha|
|00000010| 6e 6e 65 6c                                     |nnel            |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31                                              |1               |
+--------+-------------------------------------------------+----------------+
2022-03-02 13:51:54.245 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 9223372036854775807
Data:

2022-03-02 13:51:55.265 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 30 22 2c 22 63 72 65 |esponse #0","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 31 35 |ated":1646250715|
|00000040| 7d                                              |}               |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #0","created":1646250715}
2022-03-02 13:51:56.254 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 31 22 2c 22 63 72 65 |esponse #1","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 31 36 |ated":1646250716|
|00000040| 7d                                              |}               |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #1","created":1646250716}
52022-03-02 13:51:57.242 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 32 22 2c 22 63 72 65 |esponse #2","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 31 37 |ated":1646250717|
|00000040| 7d                                              |}               |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #2","created":1646250717}
5  has been typed 5 from keyboard to generate one message each 1 second.
2022-03-02 13:51:57.521 DEBUG 27436 --- [oundedElastic-1] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100100000 Length: 30
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 72 65 71 75 65 73 74 2d 63 68 61 |.....request-cha|
|00000010| 6e 6e 65 6c                                     |nnel            |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 35                                              |5               |
+--------+-------------------------------------------------+----------------+
2022-03-02 13:52:02.535 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 30 22 2c 22 63 72 65 |esponse #0","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 32 32 |ated":1646250722|
|00000040| 7d                                              |}               |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #0","created":1646250722}
2022-03-02 13:52:06.940 DEBUG 27436 --- [     parallel-1] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14
Data:

2022-03-02 13:52:06.941 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14
Data:

2022-03-02 13:52:07.006 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14
Data:

2022-03-02 13:52:07.006 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14
Data:

2022-03-02 13:52:07.524 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 31 22 2c 22 63 72 65 |esponse #1","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 32 37 |ated":1646250727|
|00000040| 7d                                              |}               |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #1","created":1646250727}
2022-03-02 13:52:09.163 DEBUG 27436 --- [oundedElastic-1] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
Data:

Console output

2022-03-02 13:51:32.354  INFO 3492 --- [           main] c.b.r.RsocketsgetstartApplication        : No active profile set, falling back to 1 default profile: "default"
2022-03-02 13:51:33.417  INFO 3492 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7000
2022-03-02 13:51:33.427  INFO 3492 --- [           main] c.b.r.RsocketsgetstartApplication        : Started RsocketsgetstartApplication in 1.352 seconds (JVM running for 2.046)
2022-03-02 13:51:54.191  INFO 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Received request-channel
2022-03-02 13:51:54.230  INFO 3492 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onSubscribe(FluxMap.MapSubscriber)
2022-03-02 13:51:54.231  INFO 3492 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : request(1)
2022-03-02 13:51:54.240  INFO 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Requested interval is 1 seconds.
2022-03-02 13:51:55.252  INFO 3492 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=channel generating response #0, created=1646250715))
2022-03-02 13:51:55.265  INFO 3492 --- [     parallel-2] reactor.Flux.Map.1                       : request(9223372036854775806)
2022-03-02 13:51:56.254  INFO 3492 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=channel generating response #1, created=1646250716))
2022-03-02 13:51:57.241  INFO 3492 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Message(message=channel generating response #2, created=1646250717))
2022-03-02 13:51:57.522  INFO 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Requested interval is 5 seconds.
2022-03-02 13:52:02.532  INFO 3492 --- [     parallel-3] reactor.Flux.Map.1                       : onNext(Message(message=channel generating response #0, created=1646250722))
2022-03-02 13:52:07.524  INFO 3492 --- [     parallel-3] reactor.Flux.Map.1                       : onNext(Message(message=channel generating response #1, created=1646250727))
2022-03-02 13:52:09.170  WARN 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : Exception happened: java.util.concurrent.CancellationException: Disposed
2022-03-02 13:52:09.172  INFO 3492 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : cancel()
2022-03-02 13:52:09.172  WARN 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication        : The client cancel the channel.

eot

No hay comentarios:

Publicar un comentario