miércoles, 9 de marzo de 2022

Project Reactor Essentials 1/3

 Project Reactor Essentials 1/2


https://github.com/jalbertomr/project-reactor-essentials-test


link





Project Reactor Essentials Tests

https://github.com/jalbertomr/project-reactor-essentials-test

Commits

A Walk through Project Reactor with test

  • Dependencies

  • setup Tests

  • Mono.just, subscribe(), log()

  • subscribe( Consumer, errorConsumer)

  • subscribe( Consumer, errorConsumer, CompleteConsumer)

  • subscribe( Consumer, errorConsumer, CompleteConsumer, subscriptionConsumer) Subscription::cancel Subscriptino::request(n)

  • Mono doOnNext twice

  • Mono doOnNext twice but emptied Mono between

  • Mono doOnErrorTest, Mono.error

  • Mono onErrorResume

  • Mono onErrorResume disable doOnError onErrorReturn

  • Flux subscriber

  • Flux.range()

  • flux.fromIterable( List.of(...) )

  • flux.fromIterable( List.of(...), subscribe( consumer, errorConsumer, completedConsumer) Completed!

  • flux.fromItetable( List.of(...), subscribe( consumer, errorConsumer, completedConsumer) request(n) NOT Completed!

  • backpressure overriding subscription

  • backpressure lambdas alternative BaseSubscriber

  • Flux.interval simple do..XX.. activated when main Thread finish Flux fluxInterval = Flux.interval(Duration.ofMillis(100)) .log() .doOnCancel(() -> log.info("doOnCancel")) //not called by main thread stopped .doOnError( err -> log.error("doOnError {}", err.getMessage())) //not called by main thread stopped .doFinally(signalType -> log.info("doFinally {}", signalType)) //not called by main thread stopped .doOnTerminate(() -> log.info("doOnTerminate")) //not called by main thread stopped .doAfterTerminate(() -> log.info("doAfterTerminate")); //not called by main thread stopped

      fluxInterval.subscribe(aLong -> log.info("interval: {}", aLong), Throwable::printStackTrace, () -> log.info("Completed!"));
    
      Thread.sleep(1000);  // to break the flux.interval
    
  • Flux.interval simple do..XX.. activated when subscribe.dispose()

      Flux<Long> fluxInterval = Flux.interval(Duration.ofMillis(100))
              .log()
              .doOnCancel(() -> log.info("doOnCancel"))                        //called when subscribe.dispose
              .doOnError( err -> log.error("doOnError {}", err.getMessage()))  //not called by main thread stopped or subscription disposed
              .doFinally(signalType -> log.info("doFinally {}", signalType))   //called when subscribe.dispose
              .doOnTerminate(() -> log.info("doOnTerminate"))                  //not called by main thread stopped or subscription disposed
              .doAfterTerminate(() -> log.info("doAfterTerminate"));           //not called by main thread stopped or subscription disposed
    
      Disposable subscribe = fluxInterval.subscribe(aLong -> log.info("interval: {}", aLong), Throwable::printStackTrace, () -> log.info("Completed!"));
      Thread.sleep(300);
      subscribe.dispose();
    
      Thread.sleep(1000);  // to break the flux.interval
    
  • Flux.interval disposed by subscriber programmatically (BaseSubscriber)

  • Flux.interval terminated by flux.take(x), do..XX.. events triggered

     Flux<Long> fluxInterval = Flux.interval(Duration.ofMillis(100))
              .log()
              .take(3)
              .doOnCancel(() -> log.info("doOnCancel"))                        //Not called
              .doOnError(err -> log.error("doOnError {}", err.getMessage()))   //Not called
              .doOnTerminate(() -> log.info("doOnTerminate"))                           //called 1st when flux finish
              .doAfterTerminate(() -> log.info("doAfterTerminate"))                     //called 2nd when flux finish
              .doFinally(signalType -> log.info("doFinally signal: {}", signalType));   //called 3rd when flux finish
    
      fluxInterval.subscribe(aLong -> log.info("interval: {}", aLong), Throwable::printStackTrace, () -> log.info("Completed!"));
    
      log.info("--------StepVerifier---------");
    
      StepVerifier.create(fluxInterval)
              .expectSubscription()
              .expectNext(0L,1L,2L)
              .expectComplete()
              .verify();
    
      Thread.sleep(1000);  // to break the flux.interval
    
  • Flux.interval canceled by BaseSubscriber - cancel, do..XX.. events triggered

  • interval withVirtualTime(Duration), thenAwait(Duration)

  • .expectNoEvent(Duration)

  • limitRate( int prefechRate)

  • Hot Flux, ConnectableFlux -- Flux.create()...publish()

  • Hot Flux, ConnectableFlux -- StepVerifier .then(publish::connect).thenConsumeWhile(i -> i < 5).expectNext(5,6...

  • Hot Flux, ConnectableFlux -- autoConnect( minSubscribers)

  • OperatorsTest - subscribeOn [Schedulers.[single,boudendElastic]] - publishOn [Schedulers.[single, boundedElastic]]

  • multiplePublishOnSchedulers, multipleSubscribeSchedulres, multiplePublishOnSubscribeOnSchedulers

  • fromCallable (block IO)

  • switchIfEmpty test previos to defer

  • Mono.defer(() -> {...})

  • Mono.defer atomicLong

  • Flux.concat(...), fluxA,concatWith( fluxB), flux.combineLatest(...)

  • Flux.merge( fluxA, fluxB), FluxA.mergeWith( fluxB)...

  • Flux.mergeSequential( fluxA, fluxB), Flux.concat(WithErro), Flux.concatDelayError(1, fluxA, fluxB,...)

  • Flux.map, Flux.flatMap

  • Flux.flatMap delaying flux

  • Flux.flatMapSequential

  • Flux.zip( fluxA, fluxB,...) fluxA.zipWith( fluxB)

  • BlockHound to check if blocking code.

Executions Mono<T> Test


Mono.just("...").log()
subscribe()
@Slf4j
public class MonoTest {
@Test
public void monoSubscriberTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log();
mono.subscribe();

log.info("Mono: {}", mono);
}
}
output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | onComplete()
[main] INFO com.bext.reactor.MonoTest - Mono: MonoLogFuseable

Mono Test
@Test
public void monoSubscriberTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log();
mono.subscribe();

StepVerifier.create(mono)
.expectNext("MonoHasJustThis")
.verifyComplete();
log.info("Mono: {}", mono);
}
verifyComplete call subscribe;

output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | onComplete()
[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | onComplete()
[main] INFO com.bext.reactor.MonoTest - Mono: MonoLogFuseable

subscribe( consumer )


@Test
public void monoSubscriberConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log();
mono.subscribe( t -> log.info("t: {}", t));

log.info("--------StepVerifier---------");

StepVerifier.create(mono)
.expectNext("MonoHasJustThis")
.verifyComplete();
}
output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO com.bext.reactor.MonoTest - t: MonoHasJustThis
[main] INFO reactor.Mono.Just.1 - | onComplete()
[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | onComplete()

subscribe( consumer, errorConsumer)

@Test
public void monoSubscriberConsumerErrorTest() {
Mono<String> mono = Mono.just("MonoHasJustThis").log()
.map( s -> {throw new RuntimeException("test subscribe with error flow");});

mono.subscribe( t -> log.info("t: {}", t), t -> log.error("error in the flow"));

log.info("--------StepVerifier---------");

StepVerifier.create(mono)
.expectError( RuntimeException.class)
.verify();
}
output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | cancel()
[main] ERROR com.bext.reactor.MonoTest - error in the flow
[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | cancel()

subscribe( consumer, errorConsumer, completeConsumer

@Test
public void monoSubscriberConsumerErrorConsumerCompleteConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase);

mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"));

log.info("--------StepVerifier---------");

StepVerifier.create(mono)
.expectNext("MonoHasJustThis".toUpperCase())
.verifyComplete();
}
output

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO com.bext.reactor.MonoTest - t: MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - Complete!
[main] INFO reactor.Mono.Just.1 - | onComplete()
[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | onComplete()

subscribe( consumer, errorConsumer, completeConsumer, subscriptionConsumer)

Cancel previous to next test code, just to see the output
@Test
public void monoSubscriberConsumerErrorConsumerCompleteConsumerSubscriptionConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase);

mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"),
Subscription::cancel);
output

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | cancel()

using subscriptionConsumer parameter request(3)
@Test
public void monoSubscriberConsumerErrorConsumerCompleteConsumerSubscriptionConsumerTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase);

mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"),
subscription -> subscription.request(3));

log.info("--------StepVerifier---------");

StepVerifier.create(mono)
.expectNext("MonoHasJustThis".toUpperCase())
.verifyComplete();
}
output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(3)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO com.bext.reactor.MonoTest - t: MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - Complete!
[main] INFO reactor.Mono.Just.1 - | onComplete()
[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO reactor.Mono.Just.1 - | onComplete()

Mono doOnMethods

@Test
public void monoDoOnMethodsTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase)
.doOnSubscribe( subscription -> log.info("doOnSubscribe - {}", subscription))
.doOnRequest( value -> log.info("doOnRequestThis - {}", value))
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnSuccess(s -> log.info("doOnSuccess - {}", s));

mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"));

output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO com.bext.reactor.MonoTest - doOnSubscribe - reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber@37654521
[main] INFO com.bext.reactor.MonoTest - doOnRequestThis - 9223372036854775807
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO com.bext.reactor.MonoTest - doOnNext - MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - doOnSuccess - MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - t: MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - Complete!
[main] INFO reactor.Mono.Just.1 - | onComplete()

Mono calling two doOnNext

  The mono is intact so doOnNext will execute the sentence, in this case twice.
public void monoDoOnMethodsTest() {
Mono<String> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase)
.doOnSubscribe( subscription -> log.info("doOnSubscribe - {}", subscription))
.doOnRequest( value -> log.info("doOnRequestThis - {}", value))
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnSuccess(s -> log.info("doOnSuccess - {}", s));

mono.subscribe( t -> log.info("t: {}", t),
Throwable::printStackTrace,
() -> log.info("Complete!"));

Output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO com.bext.reactor.MonoTest - doOnSubscribe - reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber@15aab8c6
[main] INFO com.bext.reactor.MonoTest - doOnRequestThis - 9223372036854775807
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO com.bext.reactor.MonoTest - doOnNext - MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - doOnNext - MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - doOnSuccess - MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - t: MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - Complete!
[main] INFO reactor.Mono.Just.1 - | onComplete()

  Eliminating the Mono, after the first doOnNext mono is emptied, so the next doOnNext don´t execute the sentence.

@Test
public void monoDoOnMethodsTest() {
Mono<Object> mono = Mono.just("MonoHasJustThis")
.log()
.map(String::toUpperCase)
.doOnSubscribe( subscription -> log.info("doOnSubscribe - {}", subscription))
.doOnRequest( value -> log.info("doOnRequestThis - {}", value))
.doOnNext( s -> log.info("doOnNext - {}", s))
.flatMap(s -> Mono.empty())
.doOnNext( s -> log.info("doOnNext - {}", s))
.doOnSuccess(s -> log.info("doOnSuccess - {}", s));

Output:

[main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[main] INFO com.bext.reactor.MonoTest - doOnSubscribe - reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@76b1e9b8
[main] INFO com.bext.reactor.MonoTest - doOnRequestThis - 9223372036854775807
[main] INFO reactor.Mono.Just.1 - | request(unbounded)
[main] INFO reactor.Mono.Just.1 - | onNext(MonoHasJustThis)
[main] INFO com.bext.reactor.MonoTest - doOnNext - MONOHASJUSTTHIS
[main] INFO com.bext.reactor.MonoTest - doOnSuccess - null
[main] INFO com.bext.reactor.MonoTest - Complete!
[main] INFO reactor.Mono.Just.1 - | onComplete()

Mono doOnError

@Test
public void monoDoOnErrorTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.log();

log.info("--------StepVerifier---------");

StepVerifier.create(monoError)
.expectError(IllegalArgumentException.class)
.verify();
}

Output:

[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.PeekFuseable.1 - | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
[main] INFO reactor.Mono.PeekFuseable.1 - | request(unbounded)
[main] ERROR com.bext.reactor.MonoTest - doOnError: Mono Error
[main] ERROR reactor.Mono.PeekFuseable.1 - | onError(java.lang.IllegalArgumentException: Mono Error)
[main] ERROR reactor.Mono.PeekFuseable.1 - 
java.lang.IllegalArgumentException: Mono Error
at com.bext.reactor.MonoTest.monoDoOnErrorNopTest(MonoTest.java:117)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

Mono doOnError and after doOnNext is not executed

@Test
public void monoDoOnErrorNopTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.doOnNext(o -> log.info("This doOnNext is not executed {}: ", o))
.log();

log.info("--------StepVerifier---------");

StepVerifier.create(monoError)
.expectError(IllegalArgumentException.class)
.verify();
}
Output:

[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.PeekFuseable.1 - | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
[main] INFO reactor.Mono.PeekFuseable.1 - | request(unbounded)
[main] ERROR com.bext.reactor.MonoTest - doOnError: Mono Error
[main] ERROR reactor.Mono.PeekFuseable.1 - | onError(java.lang.IllegalArgumentException: Mono Error)
[main] ERROR reactor.Mono.PeekFuseable.1 - 
java.lang.IllegalArgumentException: Mono Error

Mono onErrorResume

@Test
public void monoDoOnErrorResumeTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.onErrorResume(throwable -> {
log.info("in onErrorResume: {}", throwable.getMessage());
return Mono.just("Fixed the flow");
})
.log();

log.info("--------StepVerifier---------");

StepVerifier.create(monoError)
.expectNext("Fixed the flow")
.verifyComplete();
}
Output:

[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
[main] INFO reactor.Mono.OnErrorResume.1 - request(unbounded)
[main] ERROR com.bext.reactor.MonoTest - doOnError: Mono Error
[main] INFO com.bext.reactor.MonoTest - in onErrorResume: Mono Error
[main] INFO reactor.Mono.OnErrorResume.1 - onNext(Fixed the flow)
[main] INFO reactor.Mono.OnErrorResume.1 - onComplete()

Mono onErrorResume disables doOnError after it

@Test
public void monoDoOnErrorResumeDisableDoOnErrorTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.onErrorResume(throwable -> {
log.info("in onErrorResume: {}", throwable.getMessage());
return Mono.just("Fixed the flow");
})
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.log();

log.info("--------StepVerifier---------");

StepVerifier.create(monoError)
.expectNext("Fixed the flow")
.verifyComplete();
}
Output:

[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.PeekTerminal.1 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
[main] INFO reactor.Mono.PeekTerminal.1 - | request(unbounded)
[main] INFO com.bext.reactor.MonoTest - in onErrorResume: Mono Error
[main] INFO reactor.Mono.PeekTerminal.1 - | onNext(Fixed the flow)
[main] INFO reactor.Mono.PeekTerminal.1 - | onComplete()

@Test
public void monoDoOnErrorReturnTest() {
Mono<Object> monoError = Mono.error(new IllegalArgumentException("Mono Error"))
.onErrorReturn("Returned by OnErrorReturn")
.onErrorResume(throwable -> {
log.info("in onErrorResume: {}", throwable.getMessage());
return Mono.just("Fixed the flow");
})
.doOnError(throwable -> log.error("doOnError: {}", throwable.getMessage()))
.log();

log.info("--------StepVerifier---------");

StepVerifier.create(monoError)
.expectNext("Returned by OnErrorReturn")
.verifyComplete();
}
Output:

[main] INFO com.bext.reactor.MonoTest - --------StepVerifier---------
[main] INFO reactor.Mono.PeekTerminal.1 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
[main] INFO reactor.Mono.PeekTerminal.1 - | request(unbounded)
[main] INFO reactor.Mono.PeekTerminal.1 - | onNext(Returned by OnErrorReturn)
[main] INFO reactor.Mono.PeekTerminal.1 - | onComplete()


eot

No hay comentarios:

Publicar un comentario