RSocket Spring Boot
Request Response
Test
>rsc --debug --request --data "{\"message\":\"MyMessage\"}" --route request-response --stacktrace tcp://localhost:7000
2022-03-01 19:49:58.863 DEBUG 2744 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
2022-03-01 19:49:58.863 DEBUG 2744 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 53
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 |ponse |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 22 7d |ssage"} |
+--------+-------------------------------------------------+----------------+
2022-03-01 19:49:58.872 DEBUG 2744 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 73
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 20 4d 79 4d 65 73 73 61 67 65 22 2c 22 63 |e: MyMessage","c|
|00000030| 72 65 61 74 65 64 22 3a 31 36 34 36 31 38 35 37 |reated":16461857|
|00000040| 39 38 7d |98} |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message: MyMessage","created":1646185798}
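The Metadata dump in the REQUEST_RESPONSE frame above can be read by hand. A minimal sketch, assuming the layout of the RSocket composite metadata and routing extensions: the first byte has its high bit set and carries a well-known MIME id (0x7E, message/x.rsocket.routing.v0), the next three bytes are a big-endian payload length, and each route tag is prefixed by a one-byte length. The class and method names below are hypothetical and are not part of rsc or Spring.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class RoutingMetadataSketch {

    // Well-known MIME id assumed for message/x.rsocket.routing.v0.
    static final int ROUTING_MIME_ID = 0x7E;

    /** Decodes the route tags from a single composite-metadata entry. */
    static List<String> decodeRoutes(byte[] metadata) {
        int first = metadata[0] & 0xFF;
        if ((first & 0x80) == 0 || (first & 0x7F) != ROUTING_MIME_ID) {
            throw new IllegalArgumentException("not a well-known routing entry");
        }
        // 24-bit big-endian length of the entry payload.
        int payloadLength = ((metadata[1] & 0xFF) << 16)
                | ((metadata[2] & 0xFF) << 8)
                | (metadata[3] & 0xFF);
        int pos = 4;
        int end = pos + payloadLength;
        if (end > metadata.length) {
            throw new IllegalArgumentException("truncated metadata");
        }
        List<String> routes = new ArrayList<>();
        while (pos < end) {
            int tagLength = metadata[pos++] & 0xFF; // one-byte length before each tag
            routes.add(new String(metadata, pos, tagLength, StandardCharsets.UTF_8));
            pos += tagLength;
        }
        return routes;
    }

    /** Parses a space-separated hex dump such as "fe 00 00 11". */
    static byte[] hex(String dump) {
        String[] parts = dump.trim().split("\\s+");
        byte[] out = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            out[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // Bytes copied from the Metadata dump of the REQUEST_RESPONSE frame.
        byte[] metadata = hex("fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 70 6f 6e 73 65");
        System.out.println(decodeRoutes(metadata)); // prints [request-response]
    }
}
```

Here 0x11 (17) is the entry length and 0x10 (16) is the length of the route string "request-response".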
Console Output:
2022-03-02 12:21:36.746 INFO 27908 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 7000
2022-03-02 12:21:36.756 INFO 27908 --- [ main] c.b.r.RsocketsgetstartApplication : Started RsocketsgetstartApplication in 1.371 seconds (JVM running for 2.07)
2022-03-02 12:21:42.468 INFO 27908 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Received request-response message Message(message=MyMessage, created=1646245302)
2022-03-02 12:21:42.501 ERROR 27908 --- [ctor-http-nio-2] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:550) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]
To Handle the Exception
@MessageMapping("request-response")
Mono<Message> requestResponse(final Message message){
Hooks.onErrorDropped(error->log.warn("Exception happened: {}", error.getMessage()));
log.info("Received request-response message {}", message);
return Mono.just(new Message("in controller message: " + message.getMessage()));
}
Console output:
2022-03-02 12:25:19.156 INFO 25896 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 7000
2022-03-02 12:25:19.165 INFO 25896 --- [ main] c.b.r.RsocketsgetstartApplication : Started RsocketsgetstartApplication in 1.388 seconds (JVM running for 2.191)
2022-03-02 12:25:26.668 INFO 25896 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Received request-response message Message(message=MyMessage, created=1646245526)
2022-03-02 12:25:26.701 WARN 25896 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Exception happened: java.util.concurrent.CancellationException: Disposed
Request Response with @DestinationVariable
@MessageMapping("request-response.{name}")
String requestResponse(@DestinationVariable String name){
log.info("Received request-response.{name} : {}", name);
return "request-response: " + name;
}
>rsc --debug --request --route request-response.MYDATA --stacktrace tcp://localhost:7000
2022-03-02 16:47:25.121 DEBUG 19236 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
2022-03-02 16:47:25.121 DEBUG 19236 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 37
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 18 17 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 2e 4d 59 44 41 54 41 |ponse.MYDATA |
+--------+-------------------------------------------------+----------------+
Data:
2022-03-02 16:47:25.130 DEBUG 19236 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 30
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 72 65 71 75 65 73 74 2d 72 65 73 70 6f 6e 73 65 |request-response|
|00000010| 3a 20 4d 59 44 41 54 41 |: MYDATA |
+--------+-------------------------------------------------+----------------+
request-response: MYDATA
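The Flags values printed by the frame logger are bit fields. A minimal sketch of how they might be read, assuming the 10-bit flag layout of the RSocket protocol for PAYLOAD-style frames (0x100 metadata present, 0x40 complete, 0x20 next). Other frame types reuse the low bits differently, so this is only a reading aid for the NEXT, COMPLETE and NEXT_COMPLETE frames and for the metadata bit on requests. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class FrameFlagsSketch {

    static final int IGNORE = 0x200;   // frame can be ignored if not understood
    static final int METADATA = 0x100; // metadata is present
    static final int FOLLOWS = 0x80;   // more fragments follow
    static final int COMPLETE = 0x40;  // stream completes with this frame
    static final int NEXT = 0x20;      // frame carries a payload

    /** Lists the flag names set in a PAYLOAD-style flags value. */
    static List<String> describe(int flags) {
        List<String> names = new ArrayList<>();
        if ((flags & IGNORE) != 0) names.add("IGNORE");
        if ((flags & METADATA) != 0) names.add("METADATA");
        if ((flags & FOLLOWS) != 0) names.add("FOLLOWS");
        if ((flags & COMPLETE) != 0) names.add("COMPLETE");
        if ((flags & NEXT) != 0) names.add("NEXT");
        return names;
    }

    public static void main(String[] args) {
        System.out.println(describe(0b100000000)); // prints [METADATA]
        System.out.println(describe(0b1100000));   // prints [COMPLETE, NEXT]
    }
}
```

Read this way, the request frame carries only the metadata bit (the route), and the single response frame both delivers the payload and completes the stream.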
Fire And Forget
@MessageMapping("fire-and-forget")
public Mono<Void> fireAndforget(final Message message){
log.info("-> fire-and-forget request: {}", message);
return Mono.empty();
}
Test
@Test
void fireAndForgetTest(){
//send a fire-and-forget message
Mono<Void> response = requester
.route("fire-and-forget")
.data( new Message("MyMessage"))
.send(); // fire-and-forget (REQUEST_FNF); retrieveMono(Void.class) would send REQUEST_RESPONSE instead
//assert that the result is a completed mono
StepVerifier
.create(response)
.verifyComplete();
}
>rsc --debug --fnf --data "{\"message\":\"MyMessage\"}" --route fire-and-forget --stacktrace tcp://localhost:7000
2022-03-01 20:40:19.672 DEBUG 24816 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
2022-03-01 20:40:19.673 DEBUG 24816 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_FNF Flags: 0b100000000 Length: 52
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo|
|00000010| 72 67 65 74 |rget |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 22 7d |ssage"} |
+--------+-------------------------------------------------+----------------+
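The Length value on each frame can be cross-checked against the dumps. A minimal sketch of the arithmetic, inferred from the logged frames and the RSocket frame layout: a 6-byte header (stream id plus type and flags), a 4-byte initial request N on REQUEST_STREAM and REQUEST_CHANNEL, and a 3-byte metadata length when the metadata flag is set. The class and parameter names are hypothetical.

```java
final class FrameLengthSketch {

    static final int HEADER_BYTES = 6;          // 4-byte stream id + 2 bytes of type and flags
    static final int REQUEST_N_BYTES = 4;       // initial request N on stream and channel requests
    static final int METADATA_LENGTH_BYTES = 3; // 24-bit metadata length field

    /** Computes the frame length as the frame logger appears to report it. */
    static int frameLength(boolean hasRequestN, boolean hasMetadata, int metadataBytes, int dataBytes) {
        int length = HEADER_BYTES;
        if (hasRequestN) {
            length += REQUEST_N_BYTES;
        }
        if (hasMetadata) {
            length += METADATA_LENGTH_BYTES + metadataBytes;
        }
        return length + dataBytes;
    }

    public static void main(String[] args) {
        // REQUEST_FNF: 20 bytes of routing metadata, 23 bytes of JSON data.
        System.out.println(frameLength(false, true, 20, 23)); // prints 52
        // REQUEST_RESPONSE: 21 bytes of routing metadata, 23 bytes of JSON data.
        System.out.println(frameLength(false, true, 21, 23)); // prints 53
    }
}
```

Both results match the Length values in the logged REQUEST_FNF and REQUEST_RESPONSE frames.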
Console output:
2022-03-02 12:58:13.357 INFO 27692 --- [ctor-http-nio-3] c.b.r.RsocketsgetstartApplication : -> fire-and-forget request: Message(message=MyMessage, created=1646247493)
2022-03-02 12:58:13.368 WARN 27692 --- [ctor-http-nio-3] c.b.r.RsocketsgetstartApplication : Exception happened: java.util.concurrent.CancellationException: Disposed
Request Stream
@MessageMapping("stream-request")
public Flux<Message> stream(final Message message){
Hooks.onErrorDropped(error->log.warn("Exception happened: {}", error.getMessage()));
log.info("-> stream-request: {}", message);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message("in controller message:" + message.getMessage() + "request #: " + index))
.log();
}
Test
@Test
void requestStreamTest(){
//send a request stream
Flux<Message> streamResp = requester
.route("stream-request")
.data( new Message("MyMessage"))
.retrieveFlux(Message.class);
// verify that the response is a flux stream
StepVerifier
.create(streamResp)
// expected text mirrors the handler's string concatenation, which adds no spaces around the payload
.consumeNextWith( message ->
assertThat(message.getMessage()).isEqualTo("in controller message:MyMessagerequest #: 0"))
.consumeNextWith( message ->
assertThat(message.getMessage()).isEqualTo("in controller message:MyMessagerequest #: 1"))
.thenCancel()
.verify();
}
>rsc --debug --stream --data "{\"message\":\"MyMessage\"}" --route stream-request --stacktrace tcp://localhost:7000
2022-03-02 12:54:25.267 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
2022-03-02 12:54:25.267 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 55 InitialRequestN: 9223372036854775807
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0f 0e 73 74 72 65 61 6d 2d 72 65 71 75 |.....stream-requ|
|00000010| 65 73 74 |est |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 22 7d |ssage"} |
+--------+-------------------------------------------------+----------------+
2022-03-02 12:54:26.457 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 30 22 2c 22 63 72 65 61 74 65 |st #: 0","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 36 7d |d":1646247266} |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 0","created":1646247266}
2022-03-02 12:54:27.446 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 31 22 2c 22 63 72 65 61 74 65 |st #: 1","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 37 7d |d":1646247267} |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 1","created":1646247267}
2022-03-02 12:54:28.452 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 32 22 2c 22 63 72 65 61 74 65 |st #: 2","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 38 7d |d":1646247268} |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 2","created":1646247268}
2022-03-02 12:54:29.448 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 33 22 2c 22 63 72 65 61 74 65 |st #: 3","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 36 39 7d |d":1646247269} |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 3","created":1646247269}
2022-03-02 12:54:30.446 DEBUG 19260 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 84
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 69 6e 20 63 |{"message":"in c|
|00000010| 6f 6e 74 72 6f 6c 6c 65 72 20 6d 65 73 73 61 67 |ontroller messag|
|00000020| 65 3a 4d 79 4d 65 73 73 61 67 65 72 65 71 75 65 |e:MyMessagereque|
|00000030| 73 74 20 23 3a 20 34 22 2c 22 63 72 65 61 74 65 |st #: 4","create|
|00000040| 64 22 3a 31 36 34 36 32 34 37 32 37 30 7d |d":1646247270} |
+--------+-------------------------------------------------+----------------+
{"message":"in controller message:MyMessagerequest #: 4","created":1646247270}
The stream flows indefinitely, so cancel rsc with Ctrl-C.
Console output
2022-03-02 12:52:34.417 INFO 27692 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 7000
2022-03-02 12:52:34.426 INFO 27692 --- [ main] c.b.r.RsocketsgetstartApplication : Started RsocketsgetstartApplication in 1.321 seconds (JVM running for 2.031)
2022-03-02 12:54:25.421 INFO 27692 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : -> stream-request: Message(message=MyMessage, created=1646247265)
2022-03-02 12:54:25.437 INFO 27692 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onSubscribe(FluxMap.MapSubscriber)
2022-03-02 12:54:25.438 INFO 27692 --- [ctor-http-nio-2] reactor.Flux.Map.1 : request(1)
2022-03-02 12:54:26.443 INFO 27692 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=in controller message:MyMessagerequest #: 0, created=1646247266))
2022-03-02 12:54:26.456 INFO 27692 --- [ parallel-2] reactor.Flux.Map.1 : request(9223372036854775806)
2022-03-02 12:54:27.446 INFO 27692 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=in controller message:MyMessagerequest #: 1, created=1646247267))
2022-03-02 12:54:28.451 INFO 27692 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=in controller message:MyMessagerequest #: 2, created=1646247268))
2022-03-02 12:54:29.448 INFO 27692 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=in controller message:MyMessagerequest #: 3, created=1646247269))
2022-03-02 12:54:30.445 INFO 27692 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=in controller message:MyMessagerequest #: 4, created=1646247270))
2022-03-02 12:54:30.493 WARN 27692 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Exception happened: java.util.concurrent.CancellationException: Disposed
2022-03-02 12:54:30.494 INFO 27692 --- [ctor-http-nio-2] reactor.Flux.Map.1 : cancel()
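The created field in each Message looks like a Unix timestamp in seconds. A minimal sketch that converts one value from the console output above, assuming epoch seconds and assuming the log timestamps come from a machine running at UTC-6 (both inferred from the logs, not stated anywhere in them). The class name is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;

final class CreatedTimestampSketch {

    /** Interprets the created field as seconds since the Unix epoch (an assumption). */
    static Instant toInstant(long createdEpochSeconds) {
        return Instant.ofEpochSecond(createdEpochSeconds);
    }

    public static void main(String[] args) {
        Instant created = toInstant(1646247265L);
        System.out.println(created); // prints 2022-03-02T18:54:25Z
        // Assumed UTC-6 offset, chosen because it lines up with the 12:54:25 log entry.
        System.out.println(created.atOffset(ZoneOffset.ofHours(-6)).toLocalTime()); // prints 12:54:25
    }
}
```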
Request Stream with @DestinationVariable (limited elements)
@MessageMapping("request-responseFlux.{name}")
public Flux<Message> requestResponseFlux(@DestinationVariable String name){
Hooks.onErrorDropped(error-> log.warn("Exception happened: {}", error.getMessage()));
log.info("Received request-responseFlux.{name} : {}", name);
return Flux
.fromStream(Stream.generate(() -> new Message("MyMessage-" + name + " @" + Instant.now()) ))
.take(4)
.delayElements(Duration.ofSeconds(1))
.log();
}
>rsc --debug --stream --route request-responseFlux.MYDATA --stacktrace tcp://localhost:7000
2022-03-02 17:33:56.931 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
2022-03-02 17:33:56.932 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 45 InitialRequestN: 9223372036854775807
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 1c 1b 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 46 6c 75 78 2e 4d 59 44 41 54 41 |ponseFlux.MYDATA|
+--------+-------------------------------------------------+----------------+
Data:
2022-03-02 17:33:58.105 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 86
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 33 3a 35 |22-03-02T23:33:5|
|00000030| 37 2e 30 35 37 34 39 36 5a 22 2c 22 63 72 65 61 |7.057496Z","crea|
|00000040| 74 65 64 22 3a 31 36 34 36 32 36 34 30 33 37 7d |ted":1646264037}|
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:33:57.057496Z","created":1646264037}
2022-03-02 17:33:59.110 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 89
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 33 3a 35 |22-03-02T23:33:5|
|00000030| 38 2e 31 30 34 36 36 33 39 30 30 5a 22 2c 22 63 |8.104663900Z","c|
|00000040| 72 65 61 74 65 64 22 3a 31 36 34 36 32 36 34 30 |reated":16462640|
|00000050| 33 38 7d |38} |
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:33:58.104663900Z","created":1646264038}
2022-03-02 17:34:00.126 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 89
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 33 3a 35 |22-03-02T23:33:5|
|00000030| 39 2e 31 30 39 30 31 38 39 30 30 5a 22 2c 22 63 |9.109018900Z","c|
|00000040| 72 65 61 74 65 64 22 3a 31 36 34 36 32 36 34 30 |reated":16462640|
|00000050| 33 39 7d |39} |
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:33:59.109018900Z","created":1646264039}
2022-03-02 17:34:01.138 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 89
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 4d 79 4d 65 |{"message":"MyMe|
|00000010| 73 73 61 67 65 2d 4d 59 44 41 54 41 20 40 32 30 |ssage-MYDATA @20|
|00000020| 32 32 2d 30 33 2d 30 32 54 32 33 3a 33 34 3a 30 |22-03-02T23:34:0|
|00000030| 30 2e 31 32 36 32 35 37 31 30 30 5a 22 2c 22 63 |0.126257100Z","c|
|00000040| 72 65 61 74 65 64 22 3a 31 36 34 36 32 36 34 30 |reated":16462640|
|00000050| 34 30 7d |40} |
+--------+-------------------------------------------------+----------------+
{"message":"MyMessage-MYDATA @2022-03-02T23:34:00.126257100Z","created":1646264040}
2022-03-02 17:34:01.139 DEBUG 17876 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
Data:
Console output:
2022-03-02 17:33:34.371 INFO 15640 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 7000
2022-03-02 17:33:34.385 INFO 15640 --- [ main] c.b.r.RsocketsgetstartApplication : Started RsocketsgetstartApplication in 1.42 seconds (JVM running for 2.148)
2022-03-02 17:33:57.027 INFO 15640 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Received request-responseFlux.{name} : MYDATA
2022-03-02 17:33:57.060 INFO 15640 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2022-03-02 17:33:57.060 INFO 15640 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(1)
2022-03-02 17:33:58.077 INFO 15640 --- [ parallel-2] reactor.Flux.ConcatMap.1 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:33:57.057496Z, created=1646264037))
2022-03-02 17:33:58.103 INFO 15640 --- [ parallel-2] reactor.Flux.ConcatMap.1 : request(9223372036854775806)
2022-03-02 17:33:59.106 INFO 15640 --- [ parallel-3] reactor.Flux.ConcatMap.1 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:33:58.104663900Z, created=1646264038))
2022-03-02 17:34:00.122 INFO 15640 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:33:59.109018900Z, created=1646264039))
2022-03-02 17:34:01.133 INFO 15640 --- [ parallel-5] reactor.Flux.ConcatMap.1 : onNext(Message(message=MyMessage-MYDATA @2022-03-02T23:34:00.126257100Z, created=1646264040))
2022-03-02 17:34:01.138 INFO 15640 --- [ parallel-5] reactor.Flux.ConcatMap.1 : onComplete()
2022-03-02 17:34:01.146 WARN 15640 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Exception happened: java.util.concurrent.CancellationException: Disposed
Request Channel
@MessageMapping("request-channel")
public Flux<Message> channel(final Flux<Integer> settings){
Hooks.onErrorDropped(error->log.warn("Exception happened: {}", error.getMessage()));
log.info("Received request-channel");
return settings
.doOnNext(setting -> log.info("Requested interval is {} seconds.", setting))
.doOnCancel(() -> log.warn("The client cancelled the channel."))
.switchMap(setting -> Flux.interval(Duration.ofSeconds( setting)))
.map(index -> new Message("channel generating response #" + index))
.log();
}
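The handler above relies on switchMap: each new setting cancels the running interval and starts a fresh one, so the response index restarts at 0. A minimal sketch of that timing, written as a plain-Java model rather than with Reactor, assuming each interval emits its first tick one full period after the setting arrives. The class name, the method name and the sample times are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SwitchMapTimelineSketch {

    /**
     * Models the emissions of settings.switchMap(s -> interval(s seconds)).
     * Each row of settings is {arrivalMillis, periodSeconds}, ordered by arrival time.
     * Emissions are listed up to, but not including, horizonMillis.
     */
    static List<String> simulate(long[][] settings, long horizonMillis) {
        List<String> emissions = new ArrayList<>();
        for (int i = 0; i < settings.length; i++) {
            long periodMillis = settings[i][1] * 1000;
            if (periodMillis <= 0) {
                throw new IllegalArgumentException("period must be positive");
            }
            // The current interval is cancelled as soon as the next setting arrives.
            long until = (i + 1 < settings.length) ? settings[i + 1][0] : horizonMillis;
            long index = 0; // a fresh interval always restarts at 0
            for (long t = settings[i][0] + periodMillis; t < until; t += periodMillis) {
                emissions.add(t + "ms #" + index++);
            }
        }
        return emissions;
    }

    public static void main(String[] args) {
        // A 1-second setting at t=0, replaced by a 5-second setting at t=3500 ms.
        long[][] settings = { {0, 1}, {3500, 5} };
        System.out.println(simulate(settings, 9000));
        // prints [1000ms #0, 2000ms #1, 3000ms #2, 8500ms #0]
    }
}
```

The model shows three responses one second apart, then a single response #0 five seconds after the setting changes.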
>rsc --debug --channel --data - --route request-channel --stacktrace tcp://localhost:7000
2022-03-02 13:51:46.926 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
1 was typed on the keyboard to request one message every second.
2022-03-02 13:51:54.167 DEBUG 27436 --- [oundedElastic-1] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_CHANNEL Flags: 0b100000000 Length: 34 InitialRequestN: 9223372036854775807
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 72 65 71 75 65 73 74 2d 63 68 61 |.....request-cha|
|00000010| 6e 6e 65 6c |nnel |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 |1 |
+--------+-------------------------------------------------+----------------+
2022-03-02 13:51:54.245 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 9223372036854775807
Data:
2022-03-02 13:51:55.265 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 30 22 2c 22 63 72 65 |esponse #0","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 31 35 |ated":1646250715|
|00000040| 7d |} |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #0","created":1646250715}
2022-03-02 13:51:56.254 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 31 22 2c 22 63 72 65 |esponse #1","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 31 36 |ated":1646250716|
|00000040| 7d |} |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #1","created":1646250716}
2022-03-02 13:51:57.242 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 32 22 2c 22 63 72 65 |esponse #2","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 31 37 |ated":1646250717|
|00000040| 7d |} |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #2","created":1646250717}
5 was typed on the keyboard to request one message every 5 seconds.
2022-03-02 13:51:57.521 DEBUG 27436 --- [oundedElastic-1] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100100000 Length: 30
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 72 65 71 75 65 73 74 2d 63 68 61 |.....request-cha|
|00000010| 6e 6e 65 6c |nnel |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 |5 |
+--------+-------------------------------------------------+----------------+
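The Metadata bytes in this frame can be read by hand. As a minimal sketch, assuming the entry follows the RSocket composite metadata and routing extensions (the class and method names here are hypothetical, not part of rsc or Spring): `fe` is the well-known MIME id 0x7E (`message/x.rsocket.routing.v0`) with the high bit set, `00 00 10` is a 24-bit length of 16, and `0f` is the one-byte length of the 15-character route that follows.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class RoutingMetadataSketch {

    // Well-known MIME type id for message/x.rsocket.routing.v0 (composite metadata extension).
    static final int ROUTING_MIME_ID = 0x7E;

    /** Decodes the route tags of a single routing entry, as dumped by the frame logger. */
    static List<String> decodeRoutes(byte[] metadata) {
        int first = metadata[0] & 0xFF;
        // High bit set means the low 7 bits carry a well-known MIME type id.
        if ((first & 0x80) == 0 || (first & 0x7F) != ROUTING_MIME_ID) {
            throw new IllegalArgumentException("not a well-known routing entry");
        }
        // Next three bytes: unsigned 24-bit big-endian length of the entry payload.
        int length = ((metadata[1] & 0xFF) << 16)
                | ((metadata[2] & 0xFF) << 8)
                | (metadata[3] & 0xFF);
        List<String> routes = new ArrayList<>();
        int pos = 4;
        int end = 4 + length;
        while (pos < end) {
            int tagLength = metadata[pos++] & 0xFF; // each tag has a one-byte length prefix
            routes.add(new String(metadata, pos, tagLength, StandardCharsets.UTF_8));
            pos += tagLength;
        }
        return routes;
    }

    public static void main(String[] args) {
        // Bytes copied from the Metadata dump of the NEXT frame above.
        byte[] fromLog = {
            (byte) 0xfe, 0x00, 0x00, 0x10, 0x0f,
            0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2d,
            0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c
        };
        System.out.println(decodeRoutes(fromLog)); // prints [request-channel]
    }
}
```

The same decoding applied to the request-response test at the top of the post (`fe 00 00 11 10 ...`) yields the route `request-response`.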
2022-03-02 13:52:02.535 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 30 22 2c 22 63 72 65 |esponse #0","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 32 32 |ated":1646250722|
|00000040| 7d |} |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #0","created":1646250722}
2022-03-02 13:52:06.940 DEBUG 27436 --- [ parallel-1] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14
Data:
2022-03-02 13:52:06.941 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14
Data:
2022-03-02 13:52:07.006 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14
Data:
2022-03-02 13:52:07.006 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14
Data:
2022-03-02 13:52:07.524 DEBUG 27436 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 71
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 63 68 61 6e |{"message":"chan|
|00000010| 6e 65 6c 20 67 65 6e 65 72 61 74 69 6e 67 20 72 |nel generating r|
|00000020| 65 73 70 6f 6e 73 65 20 23 31 22 2c 22 63 72 65 |esponse #1","cre|
|00000030| 61 74 65 64 22 3a 31 36 34 36 32 35 30 37 32 37 |ated":1646250727|
|00000040| 7d |} |
+--------+-------------------------------------------------+----------------+
{"message":"channel generating response #1","created":1646250727}
2022-03-02 13:52:09.163 DEBUG 27436 --- [oundedElastic-1] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
Data:
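The Flags values printed for each frame above are bit masks. A small sketch for reading them, assuming the flag positions from the RSocket protocol frame header (Ignore 0x200, Metadata 0x100, and for payload frames Follows 0x80, Complete 0x40, Next 0x20; for KEEPALIVE, 0x80 means "respond"). The class and method names are hypothetical helpers, not an rsc or rsocket-java API.

```java
import java.util.ArrayList;
import java.util.List;

final class FrameFlagsSketch {

    static final int IGNORE = 0x200;
    static final int METADATA = 0x100;
    static final int FOLLOWS = 0x80;   // payload frames: more fragments follow
    static final int COMPLETE = 0x40;  // payload frames: stream completed
    static final int NEXT = 0x20;      // payload frames: data/metadata present
    static final int RESPOND = 0x80;   // KEEPALIVE frames: peer should reply

    /** Parses the "0b..." notation used in the frame log into an int. */
    static int parse(String text) {
        String digits = text.startsWith("0b") ? text.substring(2) : text;
        return Integer.parseInt(digits, 2);
    }

    /** Names the flags set on a payload-type frame (NEXT, COMPLETE, NEXT_COMPLETE). */
    static List<String> payloadFlags(String text) {
        int flags = parse(text);
        List<String> names = new ArrayList<>();
        if ((flags & IGNORE) != 0) names.add("IGNORE");
        if ((flags & METADATA) != 0) names.add("METADATA");
        if ((flags & FOLLOWS) != 0) names.add("FOLLOWS");
        if ((flags & COMPLETE) != 0) names.add("COMPLETE");
        if ((flags & NEXT) != 0) names.add("NEXT");
        return names;
    }

    /** True when a KEEPALIVE frame asks the other side to answer with its own KEEPALIVE. */
    static boolean keepaliveWantsReply(String text) {
        return (parse(text) & RESPOND) != 0;
    }

    public static void main(String[] args) {
        System.out.println(payloadFlags("0b100100000"));       // prints [METADATA, NEXT]
        System.out.println(payloadFlags("0b100000"));          // prints [NEXT]
        System.out.println(payloadFlags("0b1000000"));         // prints [COMPLETE]
        System.out.println(keepaliveWantsReply("0b10000000")); // prints true
        System.out.println(keepaliveWantsReply("0b0"));        // prints false
    }
}
```

Read this way, the KEEPALIVE exchange above is two round trips: each side sends one frame with the respond bit set and receives a plain `0b0` frame back.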
Console output
2022-03-02 13:51:32.354 INFO 3492 --- [ main] c.b.r.RsocketsgetstartApplication : No active profile set, falling back to 1 default profile: "default"
2022-03-02 13:51:33.417 INFO 3492 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 7000
2022-03-02 13:51:33.427 INFO 3492 --- [ main] c.b.r.RsocketsgetstartApplication : Started RsocketsgetstartApplication in 1.352 seconds (JVM running for 2.046)
2022-03-02 13:51:54.191 INFO 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Received request-channel
2022-03-02 13:51:54.230 INFO 3492 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onSubscribe(FluxMap.MapSubscriber)
2022-03-02 13:51:54.231 INFO 3492 --- [ctor-http-nio-2] reactor.Flux.Map.1 : request(1)
2022-03-02 13:51:54.240 INFO 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Requested interval is 1 seconds.
2022-03-02 13:51:55.252 INFO 3492 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=channel generating response #0, created=1646250715))
2022-03-02 13:51:55.265 INFO 3492 --- [ parallel-2] reactor.Flux.Map.1 : request(9223372036854775806)
2022-03-02 13:51:56.254 INFO 3492 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=channel generating response #1, created=1646250716))
2022-03-02 13:51:57.241 INFO 3492 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Message(message=channel generating response #2, created=1646250717))
2022-03-02 13:51:57.522 INFO 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Requested interval is 5 seconds.
2022-03-02 13:52:02.532 INFO 3492 --- [ parallel-3] reactor.Flux.Map.1 : onNext(Message(message=channel generating response #0, created=1646250722))
2022-03-02 13:52:07.524 INFO 3492 --- [ parallel-3] reactor.Flux.Map.1 : onNext(Message(message=channel generating response #1, created=1646250727))
2022-03-02 13:52:09.170 WARN 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : Exception happened: java.util.concurrent.CancellationException: Disposed
2022-03-02 13:52:09.172 INFO 3492 --- [ctor-http-nio-2] reactor.Flux.Map.1 : cancel()
2022-03-02 13:52:09.172 WARN 3492 --- [ctor-http-nio-2] c.b.r.RsocketsgetstartApplication : The client cancel the channel.
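The interval change can be checked from the `created` values alone. This sketch assumes they are Unix epoch seconds (an inference from their size and from how they track the log timestamps; the post does not say so) and uses a hypothetical helper class to print the gap between consecutive messages.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class CreatedGapsSketch {

    /** Returns the difference in seconds between each pair of consecutive timestamps. */
    static List<Long> gaps(long[] created) {
        List<Long> result = new ArrayList<>();
        for (int i = 1; i < created.length; i++) {
            result.add(created[i] - created[i - 1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // "created" values copied from the five onNext lines in the console output above.
        long[] created = {1646250715L, 1646250716L, 1646250717L, 1646250722L, 1646250727L};

        // Interpreted as epoch seconds, the first value is a UTC instant.
        System.out.println(Instant.ofEpochSecond(created[0])); // prints 2022-03-02T19:51:55Z

        // Two 1-second gaps, then 5-second gaps after "Requested interval is 5 seconds."
        System.out.println(gaps(created)); // prints [1, 1, 5, 5]
    }
}
```

The UTC instant is six hours ahead of the 13:51:55 log line, which is consistent with the logs being printed in a UTC-6 local time zone.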
eot