Project Reactor Essentials 2/3
Flux<T> Tests
/**
 * Subscribes twice to a four-element {@code Flux<String>} built with
 * {@code Flux.just}: once with a bare {@code subscribe()} and once through
 * {@code StepVerifier}, which asserts the emitted values and the completion.
 */
@Test
public void fluxSubsccriberTest() {
    Flux<String> words = Flux.just("Flow", "from", "flux", ".just()").log();

    // First subscription: no consumer is supplied, so only log() reports the signals.
    words.subscribe();

    log.info("--------StepVerifier---------");

    // Second subscription: StepVerifier checks every element and the onComplete signal.
    StepVerifier.create(words)
            .expectNext("Flow", "from", "flux", ".just()")
            .verifyComplete();
}
Output:
[main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[main] INFO reactor.Flux.Array.1 - | request(unbounded)
[main] INFO reactor.Flux.Array.1 - | onNext(Flow)
[main] INFO reactor.Flux.Array.1 - | onNext(from)
[main] INFO reactor.Flux.Array.1 - | onNext(flux)
[main] INFO reactor.Flux.Array.1 - | onNext(.just())
[main] INFO reactor.Flux.Array.1 - | onComplete()
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[main] INFO reactor.Flux.Array.1 - | request(unbounded)
[main] INFO reactor.Flux.Array.1 - | onNext(Flow)
[main] INFO reactor.Flux.Array.1 - | onNext(from)
[main] INFO reactor.Flux.Array.1 - | onNext(flux)
[main] INFO reactor.Flux.Array.1 - | onNext(.just())
[main] INFO reactor.Flux.Array.1 - | onComplete()
/**
 * Emits the integers 1 to 5 with {@code Flux.range}, logs each one from a
 * consumer lambda, then verifies the same sequence with {@code StepVerifier}.
 */
@Test
public void fluxSubscribeNumbersTest() {
    Flux<Integer> numbers = Flux.range(1, 5).log();

    // Consumer-only subscription: each element is written to the test log.
    numbers.subscribe(value -> log.info("flux integer {}", value));

    log.info("--------StepVerifier---------");

    // A new subscription replays the range, so all five values are expected again.
    StepVerifier.create(numbers)
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
}
Output:
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO com.bext.reactor.FluxTest - flux integer 1
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO com.bext.reactor.FluxTest - flux integer 2
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO com.bext.reactor.FluxTest - flux integer 3
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO com.bext.reactor.FluxTest - flux integer 4
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO com.bext.reactor.FluxTest - flux integer 5
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onComplete()
/**
 * Builds a {@code Flux<Integer>} from a {@code List} with
 * {@code Flux.fromIterable} and subscribes with the three-callback form
 * (value consumer, error consumer, completion callback), then verifies the
 * same sequence with {@code StepVerifier}.
 */
@Test
public void fluxSubscribeFromIterableTest() {
    Flux<Integer> fluxInteger = Flux.fromIterable(List.of(1, 2, 3, 4, 5))
            .log();

    // The original snippet also contained a stray "Flux.concat();" statement
    // whose result was never used; it has been removed.
    fluxInteger.subscribe(
            integer -> log.info("flux integer {}", integer),
            Throwable::printStackTrace,
            () -> log.info("Completed!"));

    log.info("--------StepVerifier---------");
    StepVerifier.create(fluxInteger)
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
}
Output:
[main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
[main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
[main] INFO reactor.Flux.Iterable.1 - | onNext(1)
[main] INFO com.bext.reactor.FluxTest - flux integer 1
[main] INFO reactor.Flux.Iterable.1 - | onNext(2)
[main] INFO com.bext.reactor.FluxTest - flux integer 2
[main] INFO reactor.Flux.Iterable.1 - | onNext(3)
[main] INFO com.bext.reactor.FluxTest - flux integer 3
[main] INFO reactor.Flux.Iterable.1 - | onNext(4)
[main] INFO com.bext.reactor.FluxTest - flux integer 4
[main] INFO reactor.Flux.Iterable.1 - | onNext(5)
[main] INFO com.bext.reactor.FluxTest - flux integer 5
[main] INFO reactor.Flux.Iterable.1 - | onComplete()
[main] INFO com.bext.reactor.FluxTest - Completed!
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
[main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
[main] INFO reactor.Flux.Iterable.1 - | onNext(1)
[main] INFO reactor.Flux.Iterable.1 - | onNext(2)
[main] INFO reactor.Flux.Iterable.1 - | onNext(3)
[main] INFO reactor.Flux.Iterable.1 - | onNext(4)
[main] INFO reactor.Flux.Iterable.1 - | onNext(5)
[main] INFO reactor.Flux.Iterable.1 - | onComplete()
/**
 * Shows error propagation: the mapping function throws an
 * {@code IndexOutOfBoundsException} when it sees the value 4, so subscribers
 * receive 1, 2, 3 followed by an error instead of a completion.
 */
@Test
public void fluxSubscriberNumberErrorTest() {
    Flux<Integer> failingRange = Flux.range(1, 5)
            .log()
            .map(value -> {
                // Every value except 4 passes through unchanged.
                if (value != 4) {
                    return value;
                }
                throw new IndexOutOfBoundsException(" index == 4, Error");
            });

    // The error consumer prints the stack trace; the completion callback is never reached.
    failingRange.subscribe(
            value -> log.info("flux integer {}", value),
            Throwable::printStackTrace,
            () -> log.info("Complete!"));

    log.info("--------StepVerifier---------");

    StepVerifier.create(failingRange)
            .expectNext(1, 2, 3)
            .verifyError(IndexOutOfBoundsException.class);
}
Output:
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO com.bext.reactor.FluxTest - flux integer 1
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO com.bext.reactor.FluxTest - flux integer 2
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO com.bext.reactor.FluxTest - flux integer 3
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | cancel()
java.lang.IndexOutOfBoundsException: index == 4, Error
at com.bext.reactor.FluxTest.lambda$fluxSubscriberNumberErrorTest$3(FluxTest.java:71)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:130)
at reactor.co.........
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | cancel()
@Test
public void fluxSubscriberNumberBackPressureTest() {
Flux<Integer> fluxInteger = Flux.range(1, 10)
.log();
fluxInteger.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
private int count = 0;
private final int requestCount = 3;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(requestCount);
}
@Override
public void onNext(Integer integer) {
count++;
if (count % requestCount == 0) {
subscription.request(requestCount);
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
Output:
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
/**
 * Demonstrates manual back-pressure with a hand-written Reactive Streams
 * {@code Subscriber}: it requests {@code requestCount} elements on
 * subscription and {@code requestCount} more after every
 * {@code requestCount}-th element it receives. The same flux is then
 * verified with {@code StepVerifier}.
 */
@Test
public void fluxSubscriberNumberBackPressureTest() {
    Flux<Integer> fluxInteger = Flux.range(1, 10)
            .log();

    fluxInteger.subscribe(new Subscriber<Integer>() {
        private Subscription subscription;
        private int count = 0;
        private final int requestCount = 3;

        @Override
        public void onSubscribe(Subscription s) {
            // Keep the subscription so onNext can ask for further batches.
            this.subscription = s;
            s.request(requestCount);
        }

        @Override
        public void onNext(Integer integer) {
            count++;
            // A full batch has been received: ask for the next one.
            if (count % requestCount == 0) {
                subscription.request(requestCount);
            }
        }

        @Override
        public void onError(Throwable t) {
            // Report the failure instead of silently dropping it.
            log.error("back-pressure subscriber received an error", t);
        }

        @Override
        public void onComplete() {
            // Nothing to do; log() already reports the completion signal.
        }
    });

    log.info("--------StepVerifier---------");
    StepVerifier.create(fluxInteger)
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .verifyComplete();
}
Output:
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
/**
 * Same back-pressure scenario as the hand-written {@code Subscriber}
 * variant, but implemented with Reactor's {@code BaseSubscriber}: the hooks
 * request {@code requestCount} elements on subscription and again after
 * every {@code requestCount}-th element.
 */
@Test
public void fluxSubscriberNumberBackPressureBaseSubscribeTest() {
    Flux<Integer> fluxInteger = Flux.range(1, 10)
            .log();

    fluxInteger.subscribe(new BaseSubscriber<Integer>() {
        private int count = 0;
        private final int requestCount = 3;

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            // Use the shared batch size rather than a second hard-coded 3.
            request(requestCount);
        }

        @Override
        protected void hookOnNext(Integer value) {
            count++;
            // A full batch has been received: ask for the next one.
            if (count % requestCount == 0) {
                request(requestCount);
            }
        }
    });

    log.info("--------StepVerifier---------");
    StepVerifier.create(fluxInteger)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .verifyComplete();
}
Output:
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
/**
 * Subscribes to an endless {@code Flux.interval} ticking every 100 ms and
 * lets the test method end while it is still running. None of the
 * lifecycle callbacks registered below is triggered in that situation.
 *
 * @throws InterruptedException if the sleeping main thread is interrupted
 */
@Test
public void intervalTest() throws InterruptedException {
    Duration period = Duration.ofMillis(100);

    Flux<Long> ticks = Flux.interval(period)
            .log()
            // None of the following callbacks fires: the test just ends, nothing cancels or terminates the flux.
            .doOnCancel(() -> log.info("doOnCancel"))
            .doOnError(failure -> log.error("doOnError {}", failure.getMessage()))
            .doFinally(signal -> log.info("doFinally {}", signal))
            .doOnTerminate(() -> log.info("doOnTerminate"))
            .doAfterTerminate(() -> log.info("doAfterTerminate"));

    ticks.subscribe(
            tick -> log.info("interval: {}", tick),
            Throwable::printStackTrace,
            () -> log.info("Completed!"));

    // Keep the main thread alive for a second so the interval can emit before the test returns.
    Thread.sleep(1000);
}
Output:
[main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
[main] INFO reactor.Flux.Interval.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 0
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 1
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 2
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 3
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 4
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 5
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(6)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 6
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(7)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 7
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(8)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 8
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(9)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 9
/**
 * Subscribes to a 100 ms {@code Flux.interval} and disposes the returned
 * {@code Disposable} after 300 ms. Disposing cancels the subscription, which
 * triggers {@code doOnCancel} and {@code doFinally} but none of the
 * terminate/error callbacks.
 *
 * @throws InterruptedException if the sleeping main thread is interrupted
 */
@Test
public void intervalDisposedTest() throws InterruptedException {
    Duration period = Duration.ofMillis(100);

    Flux<Long> ticks = Flux.interval(period)
            .log()
            // Fires when the subscription is disposed.
            .doOnCancel(() -> log.info("doOnCancel"))
            // Does not fire: disposing is not an error.
            .doOnError(failure -> log.error("doOnError {}", failure.getMessage()))
            // Fires when the subscription is disposed (signal type: cancel).
            .doFinally(signal -> log.info("doFinally {}", signal))
            // Neither terminate callback fires: a cancelled flux does not terminate.
            .doOnTerminate(() -> log.info("doOnTerminate"))
            .doAfterTerminate(() -> log.info("doAfterTerminate"));

    Disposable handle = ticks.subscribe(
            tick -> log.info("interval: {}", tick),
            Throwable::printStackTrace,
            () -> log.info("Completed!"));

    // Let a few ticks through, then cancel from the main thread.
    Thread.sleep(300);
    handle.dispose();

    // Keep the main thread alive a little longer before the test returns.
    Thread.sleep(1000);
}
Output:
[main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
[main] INFO reactor.Flux.Interval.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 0
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 1
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 2
[main] INFO com.bext.reactor.FluxTest - doOnCancel
[main] INFO reactor.Flux.Interval.1 - cancel()
[main] INFO com.bext.reactor.FluxTest - doFinally cancel
/**
 * Subscribes to a 100 ms {@code Flux.interval} with a {@code BaseSubscriber}
 * that requests one element at a time and disposes itself as soon as it
 * receives the value 4. Disposing triggers {@code doOnCancel} and
 * {@code doFinally}; the error and terminate callbacks stay silent.
 *
 * @throws InterruptedException if the sleeping main thread is interrupted
 */
@Test
public void intervalDisposedBySubscriberProgrammaticallyTest() throws InterruptedException {
    Flux<Long> fluxInterval = Flux.interval(Duration.ofMillis(100))
            .log()
            .doOnCancel(() -> log.info("doOnCancel")) // called when the subscriber disposes itself
            .doOnError(err -> log.error("doOnError {}", err.getMessage())) // not called: no error occurs
            .doFinally(signalType -> log.info("doFinally {}", signalType)) // called when the subscriber disposes itself
            .doOnTerminate(() -> log.info("doOnTerminate")) // not called: the flux is cancelled, not terminated
            .doAfterTerminate(() -> log.info("doAfterTerminate")); // not called: the flux is cancelled, not terminated

    // Typed subscriber instead of the raw BaseSubscriber; the pass-through
    // dispose() override of the original snippet added nothing and was dropped.
    BaseSubscriber<Long> subscriber = new BaseSubscriber<Long>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);
            log.info("hookOnSubscribe");
        }

        @Override
        protected void hookOnNext(Long value) {
            // Request first, then dispose, exactly as the original did.
            request(1);
            if (value == 4L) {
                dispose();
            }
        }
    };

    fluxInterval.subscribe(subscriber);

    // Keep the main thread alive long enough for the value 4 to arrive.
    Thread.sleep(1000);
}
Output:
[main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
[main] INFO reactor.Flux.Interval.1 - request(1)
[main] INFO com.bext.reactor.FluxTest - hookOnSubscribe
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO com.bext.reactor.FluxTest - doOnCancel
[parallel-1] INFO reactor.Flux.Interval.1 - cancel()
[parallel-1] INFO com.bext.reactor.FluxTest - doFinally cancel
/**
 * Limits a 100 ms {@code Flux.interval} to three elements with
 * {@code take(3)} so that it completes normally, and shows which lifecycle
 * callbacks run on completion. The flux is subscribed twice: once with
 * logging callbacks and once through {@code StepVerifier}.
 *
 * <p>Order observed in the captured output for each subscription:
 * {@code doOnTerminate}, then the subscriber's completion callback, then
 * {@code doFinally}, then {@code doAfterTerminate}.
 *
 * @throws InterruptedException if the sleeping main thread is interrupted
 */
@Test
public void intervalTerminatedTest() throws InterruptedException {
    Flux<Long> fluxInterval = Flux.interval(Duration.ofMillis(100))
            .log()
            .take(3)
            .doOnCancel(() -> log.info("doOnCancel")) // not called
            .doOnError(err -> log.error("doOnError {}", err.getMessage())) // not called
            .doOnTerminate(() -> log.info("doOnTerminate")) // logged first, before the subscriber's "Completed!"
            .doAfterTerminate(() -> log.info("doAfterTerminate")) // logged last, after doFinally
            .doFinally(signalType -> log.info("doFinally signal: {}", signalType)); // logged right after "Completed!"

    fluxInterval.subscribe(
            aLong -> log.info("interval: {}", aLong),
            Throwable::printStackTrace,
            () -> log.info("Completed!"));

    log.info("--------StepVerifier---------");
    StepVerifier.create(fluxInterval)
            .expectSubscription()
            .expectNext(0L, 1L, 2L)
            .verifyComplete();

    // Keep the main thread alive a little longer before the test returns.
    Thread.sleep(1000);
}
Output:
[main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
[main] INFO reactor.Flux.Interval.1 - request(unbounded)
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
[main] INFO reactor.Flux.Interval.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 0
[parallel-2] INFO reactor.Flux.Interval.1 - onNext(0)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 1
[parallel-2] INFO reactor.Flux.Interval.1 - onNext(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
[parallel-1] INFO com.bext.reactor.FluxTest - interval: 2
[parallel-1] INFO reactor.Flux.Interval.1 - cancel()
[parallel-1] INFO com.bext.reactor.FluxTest - doOnTerminate
[parallel-1] INFO com.bext.reactor.FluxTest - Completed!
[parallel-1] INFO com.bext.reactor.FluxTest - doFinally signal: onComplete
[parallel-1] INFO com.bext.reactor.FluxTest - doAfterTerminate
[parallel-2] INFO reactor.Flux.Interval.1 - onNext(2)
[parallel-2] INFO reactor.Flux.Interval.1 - cancel()
[parallel-2] INFO com.bext.reactor.FluxTest - doOnTerminate
[parallel-2] INFO com.bext.reactor.FluxTest - doFinally signal: onComplete
[parallel-2] INFO com.bext.reactor.FluxTest - doAfterTerminate
/**
 * Subscribes to a 100 ms {@code Flux.interval} with a {@code BaseSubscriber}
 * that requests one element at a time and calls {@code cancel()} as soon as
 * it receives the value 4. Cancelling triggers {@code doOnCancel} and
 * {@code doFinally}; the error and terminate callbacks stay silent.
 *
 * @throws InterruptedException if the sleeping main thread is interrupted
 */
@Test
public void intervalSubscribeCancelProgrammaticallyBaseSubscriberTest() throws InterruptedException {
    Flux<Long> fluxInterval = Flux.interval(Duration.ofMillis(100))
            .log()
            .doOnCancel(() -> log.info("doOnCancel")) // called when the subscriber cancels
            .doOnError(err -> log.error("doOnError {}", err.getMessage())) // not called: no error occurs
            .doOnTerminate(() -> log.info("doOnTerminate")) // not called: the flux is cancelled, not terminated
            .doAfterTerminate(() -> log.info("doAfterTerminate")) // not called: the flux is cancelled, not terminated
            .doFinally(signalType -> log.info("doFinally {}", signalType)); // called when the subscriber cancels

    // Typed subscriber instead of the raw BaseSubscriber, so the hook
    // receives a Long and no string conversion is needed.
    BaseSubscriber<Long> subscriber = new BaseSubscriber<Long>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);
            log.info("hookOnSubscribe");
        }

        @Override
        protected void hookOnNext(Long value) {
            // Request first, then cancel, exactly as the original did.
            request(1);
            if (value == 4L) {
                cancel();
            }
        }
    };

    fluxInterval.subscribe(subscriber);

    // Keep the main thread alive long enough for the value 4 to arrive.
    Thread.sleep(1000);
}
Output:
[main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
[main] INFO reactor.Flux.Interval.1 - request(1)
[main] INFO com.bext.reactor.FluxTest - hookOnSubscribe
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
[parallel-1] INFO reactor.Flux.Interval.1 - request(1)
[parallel-1] INFO com.bext.reactor.FluxTest - doOnCancel
[parallel-1] INFO reactor.Flux.Interval.1 - cancel()
[parallel-1] INFO com.bext.reactor.FluxTest - doFinally cancel
/**
 * Verifies a one-day interval without really waiting, using
 * {@code StepVerifier.withVirtualTime}. The flux is supplied by
 * {@code fluxIntervalDuration()}, and the verification cancels after the
 * second element.
 */
@Test
public void intervalWithVirtualTest() {
    Duration oneDay = Duration.ofDays(1);

    StepVerifier.withVirtualTime(() -> fluxIntervalDuration())
            .expectSubscription()
            // Move the virtual clock forward by one day.
            .thenAwait(oneDay)
            .expectNoEvent(Duration.ofHours(24))
            .expectNext(0L)
            // Another virtual day for the next element.
            .thenAwait(oneDay)
            .expectNext(1L)
            // Only the first two elements are checked; stop there.
            .thenCancel()
            .verify();
}
/**
 * Creates a flux that ticks once per day, is limited to ten elements and
 * logs its signals.
 *
 * @return a new, not yet subscribed {@code Flux<Long>}
 */
private Flux<Long> fluxIntervalDuration() {
    Flux<Long> dailyTicks = Flux.interval(Duration.ofDays(1));
    Flux<Long> firstTen = dailyTicks.take(10);
    return firstTen.log();
}
Output:
[main] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
[main] INFO reactor.Flux.Take.1 - request(unbounded)
[main] INFO reactor.Flux.Take.1 - onNext(0)
[main] INFO reactor.Flux.Take.1 - onNext(1)
[main] INFO reactor.Flux.Take.1 - cancel()
/**
 * Applies {@code limitRate(3)} to a range of ten integers and verifies that
 * all ten elements still reach the subscriber; log() is placed before
 * limitRate, so it records the requests sent to the range.
 */
@Test
public void fluxRangelimitRateTest() {
    Flux<Integer> rateLimited = Flux.range(1, 10)
            .log()
            .limitRate(3);

    log.info("--------StepVerifier---------");

    StepVerifier.create(rateLimited)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .verifyComplete();
}
Output:
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | request(3)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
/**
 * Turns a delayed range into a {@code ConnectableFlux} with {@code publish()}
 * and verifies it with {@code StepVerifier}: the verifier subscribes first,
 * then connects the source, skips the values below 5 and expects 5 to 10.
 *
 * @throws InterruptedException declared for the manual variant kept in the comment below
 */
@Test
public void connectableFlux_Hot_Test() throws InterruptedException {
    ConnectableFlux<Integer> publish = Flux.range(1, 10)
            .log()
            .delayElements(Duration.ofMillis(100))
            .publish();

    // Manual variant, kept for reference (connect first, subscribe late):
    //   publish.connect();
    //   Thread.sleep(200); log.info("Thread sleeping for 200 mS");
    //   publish.subscribe(i -> log.info("1.- subscribe element {}", i));
    //   Thread.sleep(300); log.info("Thread sleeping for 300 mS");
    //   publish.subscribe(i -> log.info("2.- subscribe element {}", i));

    log.info("--------StepVerifier---------");

    StepVerifier.create(publish)
            // Nothing is emitted until the connectable flux is connected.
            .then(() -> publish.connect())
            .thenConsumeWhile(value -> value < 5)
            .expectNext(5, 6, 7, 8, 9, 10)
            .verifyComplete();

    // Thread.sleep(600);  // only needed by the manual variant above
}
Output:
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(32)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onComplete()
/**
 * Uses {@code publish().autoConnect(2)} so that the source is connected once
 * two subscribers are present: {@code StepVerifier} is the first and the
 * extra {@code subscribe()} issued from the {@code then} step is the second.
 */
@Test
public void connectableFluxAutoConnectTest() {
    Flux<Integer> fluxAutoconnect = Flux.range(1, 5)
            .log()
            .delayElements(Duration.ofMillis(100))
            .publish()
            .autoConnect(2);

    log.info("--------StepVerifier---------");

    StepVerifier.create(fluxAutoconnect)
            // Second subscriber: reaches the autoConnect threshold of 2.
            .then(() -> fluxAutoconnect.subscribe())
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
}
Output:
[main] INFO com.bext.reactor.FluxTest - --------StepVerifier---------
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(32)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onComplete()
eot
No hay comentarios:
Publicar un comentario