WebFlux Non Blocking API @RestController @GetMapping return [1,2,3,4]
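import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class FluxMonoController {

    // Emits 1..4 and logs every Reactive Streams signal.
    @GetMapping("/flux")
    Flux<Integer> flux() {
        return Flux.just(1, 2, 3, 4)
                .log();
    }
}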
Running the app
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.2.RELEASE)
2019-12-10 19:13:17.830 INFO 12520 --- [ main] com.bext.RoadReactiveSpringApplication : Starting RoadReactiveSpringApplication on DESKTOP-NLF0058 with PID 12520 (D:\proy\roadReactiveSpring\build\classes\java\main started by bext in D:\proy\roadReactiveSpring)
2019-12-10 19:13:17.832 INFO 12520 --- [ main] com.bext.RoadReactiveSpringApplication : No active profile set, falling back to default profiles: default
2019-12-10 19:13:18.187 INFO 12520 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive MongoDB repositories in DEFAULT mode.
2019-12-10 19:13:18.202 INFO 12520 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 10ms. Found 0 Reactive MongoDB repository interfaces.
2019-12-10 19:13:18.206 INFO 12520 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data MongoDB repositories in DEFAULT mode.
2019-12-10 19:13:18.207 INFO 12520 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 1ms. Found 0 MongoDB repository interfaces.
2019-12-10 19:13:18.712 INFO 12520 --- [ main] org.mongodb.driver.cluster : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
2019-12-10 19:13:18.749 INFO 12520 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:775}] to localhost:27017
2019-12-10 19:13:18.753 INFO 12520 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 2, 1]}, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2480000}
2019-12-10 19:13:18.926 INFO 12520 --- [ main] org.mongodb.driver.cluster : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
2019-12-10 19:13:19.324 INFO 12520 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2019-12-10 19:13:19.327 INFO 12520 --- [ main] com.bext.RoadReactiveSpringApplication : Started RoadReactiveSpringApplication in 1.725 seconds (JVM running for 2.33)
2019-12-10 19:13:19.384 INFO 12520 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:776}] to localhost:27017
2019-12-10 19:13:19.385 INFO 12520 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 2, 1]}, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=1257200}
2019-12-10 19:13:24.711 INFO 12520 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2019-12-10 19:13:24.712 INFO 12520 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | request(unbounded)
2019-12-10 19:13:24.712 INFO 12520 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(1)
2019-12-10 19:13:24.712 INFO 12520 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(2)
2019-12-10 19:13:24.713 INFO 12520 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(3)
2019-12-10 19:13:24.713 INFO 12520 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(4)
2019-12-10 19:13:24.713 INFO 12520 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onComplete()
From the command line
C:\Users\bext>curl localhost:8080/flux
[1,2,3,4]
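The log above follows the Reactive Streams signal order: onSubscribe, then request, then one onNext per element, then onComplete. The following is a minimal sketch of that order using only the JDK's java.util.concurrent.Flow interfaces, not Reactor. The class SignalTrace and its methods just and trace are hypothetical names introduced for illustration, and the publisher is deliberately simplified (synchronous, single subscriber, no validation of the requested amount).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration built on the JDK Flow interfaces (not Reactor).
// It only reproduces the order of signals seen in the log.
class SignalTrace {

    /** Emits the given items synchronously, honouring the requested demand. */
    static Flow.Publisher<Integer> just(int... items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                long remaining = n;
                // Emit while there is demand and there are items left.
                while (!done && remaining > 0 && index < items.length) {
                    subscriber.onNext(items[index++]);
                    remaining--;
                }
                // Signal completion exactly once, after the last item.
                if (!done && index == items.length) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with unbounded demand and records every signal in order. */
    static List<String> trace(Flow.Publisher<Integer> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                signals.add("request(unbounded)");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError(" + error + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        trace(just(1, 2, 3, 4)).forEach(System.out::println);
    }
}
```

Running main prints the seven signals in the same order as the reactor.Flux.Array.1 log lines, without the timestamps and thread names.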
WebFlux Non Blocking API .delayElements( Duration.ofSeconds(1))
@RestController
public class FluxMonoController {

    @GetMapping("/flux")
    Flux<Integer> flux() {
        return Flux.just(1, 2, 3, 4)
                .delayElements(Duration.ofSeconds(1)) // requires import java.time.Duration
                .log();
    }
}
Running the application
2019-12-10 19:18:04.464 INFO 16152 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-12-10 19:18:04.465 INFO 16152 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(unbounded)
2019-12-10 19:18:05.470 INFO 16152 --- [ parallel-1] reactor.Flux.ConcatMap.1 : onNext(1)
2019-12-10 19:18:06.471 INFO 16152 --- [ parallel-2] reactor.Flux.ConcatMap.1 : onNext(2)
2019-12-10 19:18:07.473 INFO 16152 --- [ parallel-3] reactor.Flux.ConcatMap.1 : onNext(3)
2019-12-10 19:18:08.474 INFO 16152 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onNext(4)
2019-12-10 19:18:08.474 INFO 16152 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onComplete()
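Each onNext signal arrives one second after the previous one, and the elements are now emitted on Reactor's parallel scheduler threads instead of the Netty event loop thread.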
From the command line
C:\Users\bext>curl localhost:8080/flux
[1,2,3,4]
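After approximately 4 seconds, curl prints all the results at once: with the default application/json response the elements are collected and written as a single JSON array, so nothing is sent until the last element has been emitted.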
WebFlux Non Blocking API .delayElements( Duration.ofSeconds(1)) MediaType.APPLICATION_STREAM_JSON_VALUE
With the application/stream+json media type, each element is written as soon as it is emitted, so the results appear one by one in the browser.
@GetMapping(value = "/fluxstream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Integer> fluxStream() {
    return Flux.just(1, 2, 3, 4)
            .delayElements(Duration.ofSeconds(1))
            .log();
}
Output in the browser:
1 (after a second)
2 (after a second)
3 (after a second)
4 (after a second)
Running the application and opening /fluxstream in the browser shows one new value each second.
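The difference between /flux and /fluxstream comes down to how the response body is framed. A JSON array cannot be closed until the last element is known, while a stream of newline-separated JSON values can be flushed element by element. The sketch below shows only the two wire formats in plain Java; WireFormats, asJsonArray and asJsonLines are hypothetical names for illustration and are not Spring's encoders.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not Spring code: it only shows the two wire formats.
class WireFormats {

    /** application/json: one JSON array, complete only after the last element. */
    static String asJsonArray(List<Integer> values) {
        return values.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(",", "[", "]"));
    }

    /** application/stream+json: one JSON value per line; each line is a complete chunk. */
    static List<String> asJsonLines(List<Integer> values) {
        return values.stream()
                .map(value -> value + "\n")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4);
        // Single chunk, as curl received it from /flux.
        System.out.println(asJsonArray(values));
        // Four independent chunks, as the browser received them from /fluxstream.
        asJsonLines(values).forEach(System.out::print);
    }
}
```

Because every line produced by asJsonLines is a valid JSON document on its own, a client can parse each chunk as it arrives instead of waiting for a closing bracket.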
JUnit Test Reactive API using WebTestClient
With Spring's WebTestClient we can test the Flux endpoint; the client is auto-configured by the @WebFluxTest annotation.
With StepVerifier we check the expected result in our JUnit test (use @WebFluxTest when testing a REST controller).
@RunWith(SpringRunner.class)
@WebFluxTest
class FluxMonoControllerTest {

    @Autowired
    WebTestClient webTestClient;

    // Approach 1: read the body as a Flux and verify each element with StepVerifier.
    @Test
    public void flux_webClient01() {
        Flux<Integer> fluxInteger = webTestClient.get().uri("/flux")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .returnResult(Integer.class)
                .getResponseBody();

        StepVerifier.create(fluxInteger)
                .expectSubscription()
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .expectNext(4)
                .verifyComplete();
    }

    // Approach 2: check the content type and the number of elements.
    @Test
    public void flux_webClient02() {
        webTestClient.get().uri("/flux")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON)
                .expectBodyList(Integer.class)
                .hasSize(4);
    }

    // Approach 3: fetch the whole list and compare it with the expected one.
    @Test
    public void flux_webClient03() {
        List<Integer> expectedListInteger = Arrays.asList(1, 2, 3, 4);

        EntityExchangeResult<List<Integer>> entityExchangeResult = webTestClient.get().uri("/flux")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(Integer.class)
                .returnResult();

        assertEquals(expectedListInteger, entityExchangeResult.getResponseBody());
    }

    // Approach 4: same comparison, done inside consumeWith.
    @Test
    public void flux_webClient04() {
        List<Integer> expectedListInteger = Arrays.asList(1, 2, 3, 4);

        webTestClient.get().uri("/flux")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(Integer.class)
                .consumeWith(response ->
                        assertEquals(expectedListInteger, response.getResponseBody()));
    }
}
Running these tests
2019-12-10 19:39:16.665 INFO 4916 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Starting FluxMonoControllerTest on DESKTOP-NLF0058 with PID 4916 (started by bext in D:\proy\roadReactiveSpring)
2019-12-10 19:39:16.667 INFO 4916 --- [ Test worker] c.b.controller.FluxMonoControllerTest : No active profile set, falling back to default profiles: default
2019-12-10 19:39:17.510 INFO 4916 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Started FluxMonoControllerTest in 1.047 seconds (JVM running for 2.097)
2019-12-10 19:39:17.924 INFO 4916 --- [ parallel-1] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-12-10 19:39:17.927 INFO 4916 --- [ parallel-1] reactor.Flux.ConcatMap.1 : request(unbounded)
2019-12-10 19:39:18.934 INFO 4916 --- [ parallel-2] reactor.Flux.ConcatMap.1 : onNext(1)
2019-12-10 19:39:19.934 INFO 4916 --- [ parallel-3] reactor.Flux.ConcatMap.1 : onNext(2)
2019-12-10 19:39:20.936 INFO 4916 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onNext(3)
2019-12-10 19:39:21.937 INFO 4916 --- [ parallel-5] reactor.Flux.ConcatMap.1 : onNext(4)
2019-12-10 19:39:21.938 INFO 4916 --- [ parallel-5] reactor.Flux.ConcatMap.1 : onComplete()
2019-12-10 19:39:22.035 INFO 4916 --- [ parallel-6] reactor.Flux.ConcatMap.2 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-12-10 19:39:22.036 INFO 4916 --- [ parallel-6] reactor.Flux.ConcatMap.2 : request(unbounded)
2019-12-10 19:39:23.036 INFO 4916 --- [ parallel-7] reactor.Flux.ConcatMap.2 : onNext(1)
2019-12-10 19:39:24.038 INFO 4916 --- [ parallel-8] reactor.Flux.ConcatMap.2 : onNext(2)
2019-12-10 19:39:25.040 INFO 4916 --- [ parallel-9] reactor.Flux.ConcatMap.2 : onNext(3)
2019-12-10 19:39:26.040 INFO 4916 --- [ parallel-10] reactor.Flux.ConcatMap.2 : onNext(4)
2019-12-10 19:39:26.040 INFO 4916 --- [ parallel-10] reactor.Flux.ConcatMap.2 : onComplete()
2019-12-10 19:39:26.051 INFO 4916 --- [ parallel-11] reactor.Flux.ConcatMap.3 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-12-10 19:39:26.051 INFO 4916 --- [ parallel-11] reactor.Flux.ConcatMap.3 : request(unbounded)
2019-12-10 19:39:27.054 INFO 4916 --- [ parallel-12] reactor.Flux.ConcatMap.3 : onNext(1)
2019-12-10 19:39:28.054 INFO 4916 --- [ parallel-1] reactor.Flux.ConcatMap.3 : onNext(2)
2019-12-10 19:39:29.057 INFO 4916 --- [ parallel-2] reactor.Flux.ConcatMap.3 : onNext(3)
2019-12-10 19:39:30.057 INFO 4916 --- [ parallel-3] reactor.Flux.ConcatMap.3 : onNext(4)
2019-12-10 19:39:30.057 INFO 4916 --- [ parallel-3] reactor.Flux.ConcatMap.3 : onComplete()
2019-12-10 19:39:30.067 INFO 4916 --- [ parallel-4] reactor.Flux.ConcatMap.4 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-12-10 19:39:30.068 INFO 4916 --- [ parallel-4] reactor.Flux.ConcatMap.4 : request(unbounded)
2019-12-10 19:39:31.070 INFO 4916 --- [ parallel-5] reactor.Flux.ConcatMap.4 : onNext(1)
2019-12-10 19:39:32.072 INFO 4916 --- [ parallel-6] reactor.Flux.ConcatMap.4 : onNext(2)
2019-12-10 19:39:33.072 INFO 4916 --- [ parallel-7] reactor.Flux.ConcatMap.4 : onNext(3)
2019-12-10 19:39:34.073 INFO 4916 --- [ parallel-8] reactor.Flux.ConcatMap.4 : onNext(4)
2019-12-10 19:39:34.073 INFO 4916 --- [ parallel-8] reactor.Flux.ConcatMap.4 : onComplete()
Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.0.1/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 21s
5 actionable tasks: 4 executed, 1 up-to-date
07:39:34 p. m.: Tasks execution finished ':cleanTest :test --tests "com.bext.controller.FluxMonoControllerTest"'.
JUnit Test infinite Non Blocking Sequence API using WebTestClient
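// Assumes /fluxstream is backed by Flux.interval(Duration.ofSeconds(1)),
// as in the full controller listing later in this post.
@Test
void fluxStream() {
    Flux<Long> fluxLong = webTestClient.get().uri("/fluxstream")
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .expectStatus().isOk()
            .returnResult(Long.class)
            .getResponseBody();

    // The sequence never completes, so verify three values and then cancel.
    StepVerifier.create(fluxLong)
            .expectSubscription()
            .expectNext(0L)
            .expectNext(1L)
            .expectNext(2L)
            .thenCancel()
            .verify();
}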
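An infinite sequence never sends onComplete, so the test has to stop it by cancelling the subscription after the values it cares about. As a rough sketch of that idea, the following uses the JDK's java.util.concurrent.Flow interfaces rather than Reactor or StepVerifier. TakeThenCancel, counter and take are hypothetical names for illustration, the publisher is synchronous and single-subscriber, and take assumes a limit of at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration with JDK Flow interfaces (not Reactor or StepVerifier).
class TakeThenCancel {

    /** An endless counter 0, 1, 2, ... that emits only while demand is outstanding. */
    static Flow.Publisher<Long> counter() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long next = 0;
            private long demand = 0;
            private boolean cancelled = false;
            private boolean emitting = false;

            @Override
            public void request(long n) {
                demand += n;
                if (emitting) {
                    return; // re-entrant call from onNext: the outer loop drains the new demand
                }
                emitting = true;
                while (!cancelled && demand > 0) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    /** Requests one element at a time and cancels once limit (at least 1) elements arrived. */
    static List<Long> take(Flow.Publisher<Long> publisher, int limit) {
        List<Long> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Long>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Long item) {
                received.add(item);
                if (received.size() == limit) {
                    subscription.cancel(); // plays the role of thenCancel()
                } else {
                    subscription.request(1);
                }
            }

            @Override
            public void onError(Throwable error) {
                throw new IllegalStateException(error);
            }

            @Override
            public void onComplete() {
                // never called: the counter is infinite
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(take(counter(), 3)); // prints [0, 1, 2]
    }
}
```

Without the cancel call the loop in request would keep emitting for as long as the subscriber keeps requesting, which is why the real test ends with thenCancel() instead of verifyComplete().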
Running this Test
2019-12-10 20:18:40.975 INFO 20132 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Starting FluxMonoControllerTest on DESKTOP-NLF0058 with PID 20132 (started by bext in D:\proy\roadReactiveSpring)
2019-12-10 20:18:40.977 INFO 20132 --- [ Test worker] c.b.controller.FluxMonoControllerTest : No active profile set, falling back to default profiles: default
2019-12-10 20:18:41.714 INFO 20132 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Started FluxMonoControllerTest in 0.917 seconds (JVM running for 2.188)
2019-12-10 20:18:42.034 INFO 20132 --- [ parallel-1] reactor.Flux.Interval.1 : onSubscribe(FluxInterval.IntervalRunnable)
2019-12-10 20:18:42.037 INFO 20132 --- [ parallel-1] reactor.Flux.Interval.1 : request(1)
2019-12-10 20:18:43.039 INFO 20132 --- [ parallel-2] reactor.Flux.Interval.1 : onNext(0)
2019-12-10 20:18:43.085 INFO 20132 --- [ Test worker] reactor.Flux.Interval.1 : request(31)
2019-12-10 20:18:44.039 INFO 20132 --- [ parallel-2] reactor.Flux.Interval.1 : onNext(1)
2019-12-10 20:18:45.040 INFO 20132 --- [ parallel-2] reactor.Flux.Interval.1 : onNext(2)
2019-12-10 20:18:45.041 INFO 20132 --- [ parallel-2] reactor.Flux.Interval.1 : cancel()
BUILD SUCCESSFUL in 6s
5 actionable tasks: 2 executed, 3 up-to-date
08:18:45 p. m.: Tasks execution finished ':cleanTest :test --tests "com.bext.controller.FluxMonoControllerTest.fluxStream"'.
JUnit Test simple Non Blocking API Mono using WebTestClient
@RestController
public class FluxMonoController {

    @GetMapping("/flux")
    public Flux<Integer> flux() {
        return Flux.just(1, 2, 3, 4)
                .delayElements(Duration.ofSeconds(1))
                .log();
    }

    @GetMapping(value = "/fluxstream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Long> fluxStream() {
        return Flux.interval(Duration.ofSeconds(1)) // generates an infinite sequence of successive values
                .log();
    }

    @GetMapping("/mono")
    public Mono<Integer> mono() {
        return Mono.just(1)
                .log();
    }
}
@Test
void mono_WebClient01() {
    Integer expectedValue = 1; // autoboxing; new Integer(1) is deprecated since Java 9

    EntityExchangeResult<Integer> entityExchangeResult = webTestClient.get().uri("/mono")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().isOk()
            .expectBody(Integer.class)
            .returnResult();

    assertEquals(expectedValue, entityExchangeResult.getResponseBody());
}

@Test
void mono_WebClient02() {
    Integer expectedValue = 1;

    webTestClient.get().uri("/mono")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().isOk()
            .expectBody(Integer.class)
            .consumeWith(response ->
                    assertEquals(expectedValue, response.getResponseBody()));
}
Running this test
2019-12-10 20:21:20.903 INFO 20092 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Starting FluxMonoControllerTest on DESKTOP-NLF0058 with PID 20092 (started by bext in D:\proy\roadReactiveSpring)
2019-12-10 20:21:20.905 INFO 20092 --- [ Test worker] c.b.controller.FluxMonoControllerTest : No active profile set, falling back to default profiles: default
2019-12-10 20:21:21.720 INFO 20092 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Started FluxMonoControllerTest in 1.004 seconds (JVM running for 2.131)
2019-12-10 20:21:22.034 INFO 20092 --- [ parallel-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2019-12-10 20:21:22.045 INFO 20092 --- [ parallel-1] reactor.Mono.Just.1 : | request(unbounded)
2019-12-10 20:21:22.046 INFO 20092 --- [ parallel-1] reactor.Mono.Just.1 : | onNext(1)
2019-12-10 20:21:22.059 INFO 20092 --- [ parallel-1] reactor.Mono.Just.1 : | onComplete()
BUILD SUCCESSFUL in 4s
5 actionable tasks: 4 executed, 1 up-to-date
08:21:22 p. m.: Tasks execution finished ':cleanTest :test --tests "com.bext.controller.FluxMonoControllerTest.mono_WebClient01"'.
2019-12-10 20:22:14.344 INFO 10988 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Starting FluxMonoControllerTest on DESKTOP-NLF0058 with PID 10988 (started by bext in D:\proy\roadReactiveSpring)
2019-12-10 20:22:14.347 INFO 10988 --- [ Test worker] c.b.controller.FluxMonoControllerTest : No active profile set, falling back to default profiles: default
2019-12-10 20:22:15.059 INFO 10988 --- [ Test worker] c.b.controller.FluxMonoControllerTest : Started FluxMonoControllerTest in 0.889 seconds (JVM running for 2.125)
2019-12-10 20:22:15.363 INFO 10988 --- [ parallel-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2019-12-10 20:22:15.365 INFO 10988 --- [ parallel-1] reactor.Mono.Just.1 : | request(unbounded)
2019-12-10 20:22:15.366 INFO 10988 --- [ parallel-1] reactor.Mono.Just.1 : | onNext(1)
2019-12-10 20:22:15.376 INFO 10988 --- [ parallel-1] reactor.Mono.Just.1 : | onComplete()
BUILD SUCCESSFUL in 3s
5 actionable tasks: 2 executed, 3 up-to-date
08:22:15 p. m.: Tasks execution finished ':cleanTest :test --tests "com.bext.controller.FluxMonoControllerTest.mono_WebClient02"'.
Simple Non Blocking API Flux Mono using Handler and Router
Handler
package com.bext.handler;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class SimpleHandlerFunction {

    public Mono<ServerResponse> flux(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Flux.just(1, 2, 3, 4).log(), Integer.class);
    }

    public Mono<ServerResponse> fluxString(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Flux.just("Elemento1", "Elemento2", "Elemento3", "Elemento4").log(),
                        String.class);
    }

    public Mono<ServerResponse> mono(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(1).log(), Integer.class);
    }

    public Mono<ServerResponse> monoString(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just("Unico").log(), String.class);
    }
}

Router
package com.bext.router;

import com.bext.handler.SimpleHandlerFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;

@Configuration
public class RouterFunctionConfig {

    // Maps each GET path under /functional to the matching handler method.
    @Bean
    public RouterFunction<ServerResponse> route(SimpleHandlerFunction simpleHandlerFunction) {
        return RouterFunctions
                .route(GET("/functional/flux").and(accept(MediaType.APPLICATION_JSON)),
                        simpleHandlerFunction::flux)
                .andRoute(GET("/functional/fluxString").and(accept(MediaType.APPLICATION_JSON)),
                        simpleHandlerFunction::fluxString)
                .andRoute(GET("/functional/mono").and(accept(MediaType.APPLICATION_JSON)),
                        simpleHandlerFunction::mono)
                .andRoute(GET("/functional/monoString").and(accept(MediaType.APPLICATION_JSON)),
                        simpleHandlerFunction::monoString);
    }
}
Running the Application
2019-12-10 20:32:42.252 INFO 15000 --- [ main] com.bext.RoadReactiveSpringApplication : Starting RoadReactiveSpringApplication on DESKTOP-NLF0058 with PID 15000 (D:\proy\roadReactiveSpring\build\classes\java\main started by bext in D:\proy\roadReactiveSpring)
2019-12-10 20:32:42.255 INFO 15000 --- [ main] com.bext.RoadReactiveSpringApplication : No active profile set, falling back to default profiles: default
2019-12-10 20:32:42.614 INFO 15000 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive MongoDB repositories in DEFAULT mode.
2019-12-10 20:32:42.625 INFO 15000 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 7ms. Found 0 Reactive MongoDB repository interfaces.
2019-12-10 20:32:42.627 INFO 15000 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data MongoDB repositories in DEFAULT mode.
2019-12-10 20:32:42.629 INFO 15000 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 1ms. Found 0 MongoDB repository interfaces.
2019-12-10 20:32:43.031 INFO 15000 --- [ main] org.mongodb.driver.cluster : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
2019-12-10 20:32:43.066 INFO 15000 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:783}] to localhost:27017
2019-12-10 20:32:43.069 INFO 15000 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 2, 1]}, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=1546700}
2019-12-10 20:32:43.185 INFO 15000 --- [ main] org.mongodb.driver.cluster : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
2019-12-10 20:32:43.552 INFO 15000 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2019-12-10 20:32:43.556 INFO 15000 --- [ main] com.bext.RoadReactiveSpringApplication : Started RoadReactiveSpringApplication in 1.57 seconds (JVM running for 2.278)
2019-12-10 20:32:43.592 INFO 15000 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:784}] to localhost:27017
2019-12-10 20:32:43.594 INFO 15000 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 2, 1]}, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=1314500}
2019-12-10 20:33:03.918 INFO 15000 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2019-12-10 20:33:03.919 INFO 15000 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | request(unbounded)
2019-12-10 20:33:03.919 INFO 15000 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(1)
2019-12-10 20:33:03.919 INFO 15000 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(2)
2019-12-10 20:33:03.919 INFO 15000 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(3)
2019-12-10 20:33:03.919 INFO 15000 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onNext(4)
2019-12-10 20:33:03.919 INFO 15000 --- [ctor-http-nio-2] reactor.Flux.Array.1 : | onComplete()
2019-12-10 20:33:06.201 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2019-12-10 20:33:06.201 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | request(1)
2019-12-10 20:33:06.201 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | onNext(Elemento1)
2019-12-10 20:33:06.204 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | request(127)
2019-12-10 20:33:06.204 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | onNext(Elemento2)
2019-12-10 20:33:06.204 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | onNext(Elemento3)
2019-12-10 20:33:06.204 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | onNext(Elemento4)
2019-12-10 20:33:06.204 INFO 15000 --- [ctor-http-nio-3] reactor.Flux.Array.2 : | onComplete()
2019-12-10 20:33:11.608 INFO 15000 --- [ctor-http-nio-4] reactor.Mono.Just.3 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2019-12-10 20:33:11.608 INFO 15000 --- [ctor-http-nio-4] reactor.Mono.Just.3 : | request(unbounded)
2019-12-10 20:33:11.608 INFO 15000 --- [ctor-http-nio-4] reactor.Mono.Just.3 : | onNext(1)
2019-12-10 20:33:11.613 INFO 15000 --- [ctor-http-nio-4] reactor.Mono.Just.3 : | onComplete()
2019-12-10 20:33:16.211 INFO 15000 --- [ctor-http-nio-1] reactor.Mono.Just.4 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2019-12-10 20:33:16.211 INFO 15000 --- [ctor-http-nio-1] reactor.Mono.Just.4 : | request(unbounded)
2019-12-10 20:33:16.211 INFO 15000 --- [ctor-http-nio-1] reactor.Mono.Just.4 : | onNext(Unico)
2019-12-10 20:33:16.211 INFO 15000 --- [ctor-http-nio-1] reactor.Mono.Just.4 : | onComplete()
Test on Command line
C:\Users\bext>curl localhost:8080/functional/flux
[1,2,3,4]
C:\Users\bext>curl localhost:8080/functional/fluxString
Elemento1Elemento2Elemento3Elemento4
C:\Users\bext>curl localhost:8080/functional/mono
1
C:\Users\bext>curl localhost:8080/functional/monoString
Unico
Note that /functional/fluxString returns the four strings concatenated rather than as a JSON array: String elements are written to the response as raw text instead of being serialized as JSON, even though the content type is application/json.
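JUnit Test of a Functional Endpoint using WebTestClient: RestController (@WebFluxTest) vs. Functional (@SpringBootTest @AutoConfigureWebTestClient) Approach
For functional endpoints we use @SpringBootTest and @AutoConfigureWebTestClient instead of @WebFluxTest. The @WebFluxTest slice is limited to annotated controllers and related web beans, so it would not register the handler @Component; @SpringBootTest loads the full application context.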
package com.bext.handler;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureWebTestClient
public class SampleHandlerFunctionTest {

    @Autowired
    WebTestClient webTestClient;

    @Test
    public void flux_webClient01() {
        Flux<Integer> fluxInteger = webTestClient.get().uri("/functional/flux")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .returnResult(Integer.class)
                .getResponseBody();

        StepVerifier.create(fluxInteger)
                .expectSubscription()
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .expectNext(4)
                .verifyComplete();
    }

    @Test
    public void fluxString_webClient01() {
        Flux<String> fluxString = webTestClient.get().uri("/functional/fluxString")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .returnResult(String.class)
                .getResponseBody();

        // The four strings arrive concatenated in a single element, as seen with curl.
        StepVerifier.create(fluxString)
                .expectSubscription()
                .expectNext("Elemento1Elemento2Elemento3Elemento4")
                .verifyComplete();
    }

    @Test
    void mono_WebClient02() {
        Integer expectedValue = 1; // autoboxing; new Integer(1) is deprecated since Java 9

        webTestClient.get().uri("/functional/mono")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBody(Integer.class)
                .consumeWith(response ->
                        assertEquals(expectedValue, response.getResponseBody()));
    }
}
Running this Test
2019-12-10 20:52:56.273 INFO 20308 --- [ Test worker] c.b.handler.SampleHandlerFunctionTest : Starting SampleHandlerFunctionTest on DESKTOP-NLF0058 with PID 20308 (started by bext in D:\proy\ReactiveSpringRoad)
2019-12-10 20:52:56.275 INFO 20308 --- [ Test worker] c.b.handler.SampleHandlerFunctionTest : No active profile set, falling back to default profiles: default
2019-12-10 20:52:57.368 INFO 20308 --- [ Test worker] c.b.handler.SampleHandlerFunctionTest : Started SampleHandlerFunctionTest in 1.319 seconds (JVM running for 2.624)
2019-12-10 20:52:57.663 INFO 20308 --- [ parallel-1] reactor.Flux.Array.1 : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2019-12-10 20:52:57.664 INFO 20308 --- [ parallel-1] reactor.Flux.Array.1 : | request(1)
2019-12-10 20:52:57.665 INFO 20308 --- [ parallel-1] reactor.Flux.Array.1 : | onNext(Elemento1)
2019-12-10 20:52:57.705 INFO 20308 --- [ Test worker] reactor.Flux.Array.1 : | request(31)
2019-12-10 20:52:57.705 INFO 20308 --- [ Test worker] reactor.Flux.Array.1 : | onNext(Elemento2)
2019-12-10 20:52:57.705 INFO 20308 --- [ Test worker] reactor.Flux.Array.1 : | onNext(Elemento3)
2019-12-10 20:52:57.705 INFO 20308 --- [ Test worker] reactor.Flux.Array.1 : | onNext(Elemento4)
2019-12-10 20:52:57.706 INFO 20308 --- [ Test worker] reactor.Flux.Array.1 : | onComplete()
2019-12-10 20:52:57.734 INFO 20308 --- [ parallel-2] reactor.Flux.Array.2 : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2019-12-10 20:52:57.734 INFO 20308 --- [ parallel-2] reactor.Flux.Array.2 : | request(unbounded)
2019-12-10 20:52:57.734 INFO 20308 --- [ parallel-2] reactor.Flux.Array.2 : | onNext(1)
2019-12-10 20:52:57.734 INFO 20308 --- [ parallel-2] reactor.Flux.Array.2 : | onNext(2)
2019-12-10 20:52:57.734 INFO 20308 --- [ parallel-2] reactor.Flux.Array.2 : | onNext(3)
2019-12-10 20:52:57.734 INFO 20308 --- [ parallel-2] reactor.Flux.Array.2 : | onNext(4)
2019-12-10 20:52:57.734 INFO 20308 --- [ parallel-2] reactor.Flux.Array.2 : | onComplete()
2019-12-10 20:52:57.762 INFO 20308 --- [ parallel-3] reactor.Mono.Just.3 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2019-12-10 20:52:57.762 INFO 20308 --- [ parallel-3] reactor.Mono.Just.3 : | request(unbounded)
2019-12-10 20:52:57.762 INFO 20308 --- [ parallel-3] reactor.Mono.Just.3 : | onNext(1)
2019-12-10 20:52:57.763 INFO 20308 --- [ parallel-3] reactor.Mono.Just.3 : | onComplete()
BUILD SUCCESSFUL in 4s
5 actionable tasks: 2 executed, 3 up-to-date
08:52:57 p. m.: Tasks execution finished ':cleanTest :test --tests "com.bext.handler.SampleHandlerFunctionTest"'.