Project Reactor Essentials 1/2
Project Reactor Essentials Tests
https://github.com/jalbertomr/project-reactor-essentials-test
A Walk through Project Reactor with test
-
Dependencies
-
setup Tests
-
Mono.just, subscribe(), log()
-
subscribe( Consumer, errorConsumer)
-
subscribe( Consumer, errorConsumer, CompleteConsumer)
-
subscribe( Consumer, errorConsumer, CompleteConsumer, subscriptionConsumer) Subscription::cancel Subscriptino::request(n)
-
Mono doOnNext twice
-
Mono doOnNext twice but emptied Mono between
-
Mono doOnErrorTest, Mono.error
-
Mono onErrorResume
-
Mono onErrorResume disable doOnError onErrorReturn
-
Flux subscriber
-
Flux.range()
-
flux.fromIterable( List.of(...) )
-
flux.fromIterable( List.of(...), subscribe( consumer, errorConsumer, completedConsumer) Completed!
-
flux.fromItetable( List.of(...), subscribe( consumer, errorConsumer, completedConsumer) request(n) NOT Completed!
-
backpressure overriding subscription
-
backpressure lambdas alternative BaseSubscriber
-
Flux.interval simple do..XX.. activated when main Thread finish Flux fluxInterval = Flux.interval(Duration.ofMillis(100)) .log() .doOnCancel(() -> log.info("doOnCancel")) //not called by main thread stopped .doOnError( err -> log.error("doOnError {}", err.getMessage())) //not called by main thread stopped .doFinally(signalType -> log.info("doFinally {}", signalType)) //not called by main thread stopped .doOnTerminate(() -> log.info("doOnTerminate")) //not called by main thread stopped .doAfterTerminate(() -> log.info("doAfterTerminate")); //not called by main thread stopped
fluxInterval.subscribe(aLong -> log.info("interval: {}", aLong), Throwable::printStackTrace, () -> log.info("Completed!")); Thread.sleep(1000); // to break the flux.interval
-
Flux.interval simple do..XX.. activated when subscribe.dispose()
Flux<Long> fluxInterval = Flux.interval(Duration.ofMillis(100)) .log() .doOnCancel(() -> log.info("doOnCancel")) //called when subscribe.dispose .doOnError( err -> log.error("doOnError {}", err.getMessage())) //not called by main thread stopped or subscription disposed .doFinally(signalType -> log.info("doFinally {}", signalType)) //called when subscribe.dispose .doOnTerminate(() -> log.info("doOnTerminate")) //not called by main thread stopped or subscription disposed .doAfterTerminate(() -> log.info("doAfterTerminate")); //not called by main thread stopped or subscription disposed Disposable subscribe = fluxInterval.subscribe(aLong -> log.info("interval: {}", aLong), Throwable::printStackTrace, () -> log.info("Completed!")); Thread.sleep(300); subscribe.dispose(); Thread.sleep(1000); // to break the flux.interval
-
Flux.interval disposed by subscriber programmatically (BaseSubscriber)
-
Flux.interval terminated by flux.take(x), do..XX.. events triggered
Flux<Long> fluxInterval = Flux.interval(Duration.ofMillis(100)) .log() .take(3) .doOnCancel(() -> log.info("doOnCancel")) //Not called .doOnError(err -> log.error("doOnError {}", err.getMessage())) //Not called .doOnTerminate(() -> log.info("doOnTerminate")) //called 1st when flux finish .doAfterTerminate(() -> log.info("doAfterTerminate")) //called 2nd when flux finish .doFinally(signalType -> log.info("doFinally signal: {}", signalType)); //called 3rd when flux finish fluxInterval.subscribe(aLong -> log.info("interval: {}", aLong), Throwable::printStackTrace, () -> log.info("Completed!")); log.info("--------StepVerifier---------"); StepVerifier.create(fluxInterval) .expectSubscription() .expectNext(0L,1L,2L) .expectComplete() .verify(); Thread.sleep(1000); // to break the flux.interval
-
Flux.interval canceled by BaseSubscriber - cancel, do..XX.. events triggered
-
interval withVirtualTime(Duration), thenAwait(Duration)
-
.expectNoEvent(Duration)
-
limitRate( int prefechRate)
-
Hot Flux, ConnectableFlux -- Flux.create()...publish()
-
Hot Flux, ConnectableFlux -- StepVerifier .then(publish::connect).thenConsumeWhile(i -> i < 5).expectNext(5,6...
-
Hot Flux, ConnectableFlux -- autoConnect( minSubscribers)
-
OperatorsTest - subscribeOn [Schedulers.[single,boudendElastic]] - publishOn [Schedulers.[single, boundedElastic]]
-
multiplePublishOnSchedulers, multipleSubscribeSchedulres, multiplePublishOnSubscribeOnSchedulers
-
fromCallable (block IO)
-
switchIfEmpty test previos to defer
-
Mono.defer(() -> {...})
-
Mono.defer atomicLong
-
Flux.concat(...), fluxA,concatWith( fluxB), flux.combineLatest(...)
-
Flux.merge( fluxA, fluxB), FluxA.mergeWith( fluxB)...
-
Flux.mergeSequential( fluxA, fluxB), Flux.concat(WithErro), Flux.concatDelayError(1, fluxA, fluxB,...)
-
Flux.map, Flux.flatMap
-
Flux.flatMap delaying flux
-
Flux.flatMapSequential
-
Flux.zip( fluxA, fluxB,...) fluxA.zipWith( fluxB)
BlockHound to check if blocking code.
Executions Mono<T> Test
@Slf4j
public class MonoTest {
@Test
public void monoSubscriberTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log();
mono.subscribe();
log.info("Mono: {}", mono);
}
}
@Test
public void monoSubscriberTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log();
mono.subscribe();
StepVerifier.create(mono)
.expectNext("MonoHasJustThis")
.verifyComplete();
log.info("Mono: {}", mono);
}
@Test
public void monoSubscriberConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log();
mono.subscribe( t -> log.info("t: {}", t));
log.info("--------StepVerifier---------");
StepVerifier.create(mono)
.expectNext("MonoHasJustThis")
.verifyComplete();
}
@Test
public void monoSubscriberConsumerErrorTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log()
.map( s -> {throw new RuntimeException("test subscribe with error flow");});
mono.subscribe( t -> log.info("t: {}", t), t -> log.error("error in the flow"));
log.info("--------StepVerifier---------");
StepVerifier.create(mono)
.expectError( RuntimeException.class)
.verify();
}
@Test
public void monoSubscriberConsumerErrorConsumerCompleteConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase);
mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"));
log.info("--------StepVerifier---------");
StepVerifier.create(mono)
.expectNext("MonoHasJustThis".toUpperCase())
.verifyComplete();
}
@Test
public void monoSubscriberConsumerErrorConsumerCompleteConsumerSubscriptionConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase);
mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"),
Subscription::cancel);
@Test
public void monoSubscriberConsumerErrorConsumerCompleteConsumerSubscriptionConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase);
mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"),
subscription -> subscription.request(3));
log.info("--------StepVerifier---------");
StepVerifier.create(mono)
.expectNext("MonoHasJustThis".toUpperCase())
.verifyComplete();
}
@Test
public void monoDoOnMethodsTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase)
.doOnSubscribe( subscription -> log.info("doOnSubscribe - {}", subscription))
.doOnRequest( value -> log.info("doOnRequestThis - {}", value))
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnSuccess(s -> log.info("doOnSuccess - {}", s));
mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"));
public void monoDoOnMethodsTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase)
.doOnSubscribe( subscription -> log.info("doOnSubscribe - {}", subscription))
.doOnRequest( value -> log.info("doOnRequestThis - {}", value))
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnSuccess(s -> log.info("doOnSuccess - {}", s));
mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"));
@Test
public void monoDoOnMethodsTest() {
Mono<Object> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase)
.doOnSubscribe( subscription -> log.info("doOnSubscribe - {}", subscription))
.doOnRequest( value -> log.info("doOnRequestThis - {}", value))
.doOnNext( s -> log.info("doOnNext - {}", s))
.flatMap(s -> Mono.empty())
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnSuccess(s -> log.info("doOnSuccess - {}", s));
@Test
public void monoDoOnErrorTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.log();
log.info("--------StepVerifier---------");
StepVerifier.create(monoError)
.expectError(IllegalArgumentException.class)
.verify();
}
@Test
public void monoDoOnErrorNopTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.doOnNext(o -> log.info("This doOnNext is not executed {}: ", o))
.log();
log.info("--------StepVerifier---------");
StepVerifier.create(monoError)
.expectError(IllegalArgumentException.class)
.verify();
}
@Test
public void monoDoOnErrorResumeTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.onErrorResume(throwable -> {
log.info("in onErrorResume: {}", throwable.getMessage());
return Mono.just("Fixed the flow");
})
.log();
log.info("--------StepVerifier---------");
StepVerifier.create(monoError)
.expectNext("Fixed the flow")
.verifyComplete();
}
@Test
public void monoDoOnErrorResumeDisableDoOnErrorTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.onErrorResume(throwable -> {
log.info("in onErrorResume: {}", throwable.getMessage());
return Mono.just("Fixed the flow");
})
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.log();
log.info("--------StepVerifier---------");
StepVerifier.create(monoError)
.expectNext("Fixed the flow")
.verifyComplete();
}
@Test
public void monoDoOnErrorReturnTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.onErrorReturn("Returned by OnErrorReturn")
.onErrorResume(throwable -> {
log.info("in onErrorResume: {}", throwable.getMessage());
return Mono.just("Fixed the flow");
})
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.log();
log.info("--------StepVerifier---------");
StepVerifier.create(monoError)
.expectNext("Returned by OnErrorReturn")
.verifyComplete();
}
No hay comentarios:
Publicar un comentario