viernes, 18 de marzo de 2022

Project Reactor Essentials 3/3

 Project Reactor Essentials 3/3




Operators Test


/**
 * Pushes {@code Flux.range(1, 4)} through two logging {@code map} stages with
 * {@code subscribeOn(Schedulers.single())} placed between them, and verifies
 * that 1..4 arrive in order followed by completion.
 * Each stage logs the name of the thread it runs on.
 */
@Test
public void subscribeOnSchedulersSingleTest() {
    Flux<Integer> numbers = Flux.range(1, 4)
            .map(value -> {
                String threadName = Thread.currentThread().getName();
                log.info("1st map - {} - on thread {}", value, threadName);
                return value;
            })
            .subscribeOn(Schedulers.single())
            .map(value -> {
                String threadName = Thread.currentThread().getName();
                log.info("2nd map - {} - on thread {}", value, threadName);
                return value;
            });

    StepVerifier.create(numbers)
            .expectSubscription()
            .expectNext(1, 2, 3, 4)
            .expectComplete()
            .verify();
}
Output:

[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread single-1

/**
 * Same pipeline shape as the single-scheduler variant, but the
 * {@code subscribeOn} between the two logging {@code map} stages uses
 * {@code Schedulers.boundedElastic()}. Verifies 1..4 in order, then completion.
 */
@Test
public void subscribeOnSchedulersBoundedElasticTest() {
    Flux<Integer> numbers = Flux.range(1, 4)
            .map(value -> {
                String threadName = Thread.currentThread().getName();
                log.info("1st map - {} - on thread {}", value, threadName);
                return value;
            })
            .subscribeOn(Schedulers.boundedElastic())
            .map(value -> {
                String threadName = Thread.currentThread().getName();
                log.info("2nd map - {} - on thread {}", value, threadName);
                return value;
            });

    StepVerifier.create(numbers)
            .expectSubscription()
            .expectNext(1, 2, 3, 4)
            .expectComplete()
            .verify();
}
Output:

[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-1

/**
 * Chains three logging {@code map} stages over {@code Flux.range(1, 6)} with a
 * {@code publishOn(Schedulers.single())} before the second and before the
 * third stage, then verifies 1..6 in order followed by completion.
 * Each stage logs the thread it runs on.
 */
@Test
public void publishOnSchedulersSingleTest() {
    Flux<Integer> publishedFlux = Flux.range(1, 6)
            .map(item -> {
                String threadName = Thread.currentThread().getName();
                log.info("1st map - {} - on thread {}", item, threadName);
                return item;
            })
            .publishOn(Schedulers.single())
            .map(item -> {
                String threadName = Thread.currentThread().getName();
                log.info("2nd map - {} - on thread {}", item, threadName);
                return item;
            })
            .publishOn(Schedulers.single())
            .map(item -> {
                String threadName = Thread.currentThread().getName();
                log.info("3th map - {} - on thread {}", item, threadName);
                return item;
            });

    StepVerifier.create(publishedFlux)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6)
            .expectComplete()
            .verify();
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread main
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread single-1
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread main
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread single-1
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread main
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread single-1

/**
 * Chains three logging {@code map} stages over {@code Flux.range(1, 6)} with a
 * {@code publishOn(Schedulers.boundedElastic())} before the second and before
 * the third stage, then verifies 1..6 in order followed by completion.
 * Each stage logs the thread it runs on.
 */
@Test
public void publishOnSchedulersBoundedElasticTest() {
    Flux<Integer> publishedFlux = Flux.range(1, 6)
            .map(item -> {
                String threadName = Thread.currentThread().getName();
                log.info("1st map - {} - on thread {}", item, threadName);
                return item;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(item -> {
                String threadName = Thread.currentThread().getName();
                log.info("2nd map - {} - on thread {}", item, threadName);
                return item;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(item -> {
                String threadName = Thread.currentThread().getName();
                log.info("3th map - {} - on thread {}", item, threadName);
                return item;
            });

    StepVerifier.create(publishedFlux)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6)
            .expectComplete()
            .verify();
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread main
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-2
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread main
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-1
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread main
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-1

/**
 * Alternates {@code publishOn(Schedulers.boundedElastic())} and
 * {@code publishOn(Schedulers.single())} in front of four logging {@code map}
 * stages over {@code Flux.range(1, 8)}, then verifies 1..8 in order followed
 * by completion. Each stage logs the thread it runs on.
 */
@Test
public void multiplePublishOnSchedulersTest() {
    Flux<Integer> pipeline = Flux.range(1, 8)
            .publishOn(Schedulers.boundedElastic())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("1st map - {} - on thread {}", n, threadName);
                return n;
            })
            .publishOn(Schedulers.single())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("2nd map - {} - on thread {}", n, threadName);
                return n;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("3th map - {} - on thread {}", n, threadName);
                return n;
            })
            .publishOn(Schedulers.single())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("4th map - {} - on thread {}", n, threadName);
                return n;
            });

    StepVerifier.create(pipeline)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8)
            .expectComplete()
            .verify();
}
Output:

[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread single-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 1 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 3 - on thread single-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread single-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 8 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 7 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 8 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 7 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 4 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 8 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 6 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 7 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 8 - on thread single-1

/**
 * Alternates {@code subscribeOn(Schedulers.boundedElastic())} and
 * {@code subscribeOn(Schedulers.single())} in front of four logging
 * {@code map} stages over {@code Flux.range(1, 8)}, then verifies 1..8 in
 * order followed by completion. Each stage logs the thread it runs on.
 */
@Test
public void multipleSubscribeOnSchedulersTest() {
    Flux<Integer> pipeline = Flux.range(1, 8)
            .subscribeOn(Schedulers.boundedElastic())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("1st map - {} - on thread {}", n, threadName);
                return n;
            })
            .subscribeOn(Schedulers.single())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("2nd map - {} - on thread {}", n, threadName);
                return n;
            })
            .subscribeOn(Schedulers.boundedElastic())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("3th map - {} - on thread {}", n, threadName);
                return n;
            })
            .subscribeOn(Schedulers.single())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("4th map - {} - on thread {}", n, threadName);
                return n;
            });

    StepVerifier.create(pipeline)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8)
            .expectComplete()
            .verify();
}
Output:

[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 8 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 8 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 8 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 8 - on thread boundedElastic-2

/**
 * Mixes both operators over {@code Flux.range(1, 8)}:
 * {@code publishOn(Schedulers.boundedElastic())} before the 1st and 3rd
 * logging {@code map} stages, {@code subscribeOn(Schedulers.single())} before
 * the 2nd and 4th. Verifies 1..8 in order followed by completion; each stage
 * logs the thread it runs on.
 */
@Test
public void multiplePublishOnSubscribeOnSchedulers2Test() {
    Flux<Integer> pipeline = Flux.range(1, 8)
            .publishOn(Schedulers.boundedElastic())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("1st map - {} - on thread {}", n, threadName);
                return n;
            })
            .subscribeOn(Schedulers.single())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("2nd map - {} - on thread {}", n, threadName);
                return n;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("3th map - {} - on thread {}", n, threadName);
                return n;
            })
            .subscribeOn(Schedulers.single())
            .map(n -> {
                String threadName = Thread.currentThread().getName();
                log.info("4th map - {} - on thread {}", n, threadName);
                return n;
            });

    StepVerifier.create(pipeline)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8)
            .expectComplete()
            .verify();
}
Output:

[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 1 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 3 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 4 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 7 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 7 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 5 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 8 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 8 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 6 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 7 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 7 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 8 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 8 - on thread boundedElastic-1

/**
 * Wraps a blocking {@code Files.readAllLines} call in
 * {@code Mono.fromCallable} and subscribes to it on
 * {@code Schedulers.boundedElastic()}, then verifies that exactly one
 * non-empty list of lines is emitted before completion.
 *
 * <p>{@code assertNext} is used instead of {@code thenConsumeWhile}: the
 * latter also accepts zero elements, so the assertion inside it could be
 * skipped without failing the test.
 *
 * <p>NOTE(review): {@code data.txt} is a relative path, presumably resolved
 * against the test's working directory; confirm the file is present there.
 */
@Test
public void subscribeOnIOTest() {
    Mono<List<String>> listMono = Mono.fromCallable(() -> Files.readAllLines(Path.of("data.txt")))
            .log()
            .subscribeOn(Schedulers.boundedElastic());

    StepVerifier.create(listMono)
            .expectSubscription()
            // Requires one onNext signal and runs the assertions on its payload.
            .assertNext(lines -> {
                Assertions.assertFalse(lines.isEmpty());
                log.info("lines size: {}", lines.size());
            })
            .verifyComplete();
}
Output:

[boundedElastic-1] INFO reactor.Mono.Callable.1 - | onSubscribe([Fuseable] Operators.MonoSubscriber)
[boundedElastic-1] INFO reactor.Mono.Callable.1 - | request(unbounded)
[boundedElastic-1] INFO reactor.Mono.Callable.1 - | onNext([first line, second line, third line, fourth line end.])
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - lines size: 4
[boundedElastic-1] INFO reactor.Mono.Callable.1 - | onComplete()

/**
 * Applies {@code switchIfEmpty} to the empty flux returned by
 * {@code fluxEmpty()} and verifies that the seven fallback words are emitted
 * in order, followed by completion.
 */
@Test
public void switchIfEmptyTest() {
    Flux<Object> fallback = Flux.just("It", "was", "empty", "but", "now", "not", "anymore");
    Flux<Object> switched = fluxEmpty()
            .switchIfEmpty(fallback)
            .log();

    StepVerifier.create(switched)
            .expectSubscription()
            .expectNext("It", "was", "empty", "but", "now", "not", "anymore")
            .verifyComplete();
}

/**
 * Supplies the empty source used by {@code switchIfEmptyTest}.
 *
 * @return the flux produced by {@code Flux.empty()}
 */
private Flux<Object> fluxEmpty() {
    Flux<Object> nothing = Flux.empty();
    return nothing;
}


Output:

[main] INFO reactor.Flux.SwitchIfEmpty.1 - onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - request(unbounded)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(It)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(was)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(empty)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(but)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(now)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(not)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(anymore)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onComplete()

/**
 * Contrasts an eagerly built {@code Mono.just(System.currentTimeMillis())}
 * with the same expression wrapped in {@code Mono.defer}.
 *
 * <p>The deferred mono is subscribed four times, 100 ms apart, logging each
 * tick. A final subscription stores its tick in an {@code AtomicLong}. The
 * eager mono, which was previously declared but never used, is then
 * subscribed to check that its tick is earlier than the latest deferred one.
 *
 * @throws InterruptedException if a {@code Thread.sleep} is interrupted
 */
@Test
public void deferTest() throws InterruptedException {
    // Timestamp is evaluated right here, while the Mono is being built.
    Mono<Long> monoTick = Mono.just(System.currentTimeMillis());
    // Timestamp is evaluated inside the supplier, on every subscription.
    Mono<Long> defer = Mono.defer(() -> Mono.just(System.currentTimeMillis()));

    defer.subscribe(tick -> log.info("tick {}", tick));
    Thread.sleep(100);
    defer.subscribe(tick -> log.info("tick {}", tick));
    Thread.sleep(100);
    defer.subscribe(tick -> log.info("tick {}", tick));
    Thread.sleep(100);
    defer.subscribe(tick -> log.info("tick {}", tick));

    AtomicLong atomicLong = new AtomicLong();
    defer.subscribe(atomicLong::set);
    Assertions.assertTrue(atomicLong.get() > 0);

    // About 300 ms of sleeping separates the eager capture from the last
    // deferred one, so the eager tick must be the smaller of the two.
    AtomicLong eagerTick = new AtomicLong();
    monoTick.subscribe(eagerTick::set);
    Assertions.assertTrue(eagerTick.get() < atomicLong.get());
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331088
[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331197
[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331305
[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331414

/**
 * Concatenates two two-element fluxes with the static {@code Flux.concat}
 * and verifies the elements arrive as "a", "b", "c", "d" followed by
 * completion.
 */
@Test
public void fluxConcatTest() {
    Flux<String> first = Flux.just("a", "b");
    Flux<String> second = Flux.just("c", "d");

    Flux<String> joined = Flux.concat(first, second)
            .log();

    StepVerifier.create(joined)
            .expectSubscription()
            .expectNext("a", "b", "c", "d")
            .verifyComplete();
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(b)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(c)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(d)
[main] INFO reactor.Flux.ConcatArray.1 - onComplete()

/**
 * Appends one flux to another with the instance operator {@code concatWith}
 * and verifies the elements arrive as "a", "b", "c", "d" followed by
 * completion.
 */
@Test
public void fluxConcatWithTest() {
    Flux<String> first = Flux.just("a", "b");
    Flux<String> second = Flux.just("c", "d");

    Flux<String> joined = first
            .concatWith(second)
            .log();

    StepVerifier.create(joined)
            .expectSubscription()
            .expectNext("a", "b", "c", "d")
            .verifyComplete();
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(b)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(c)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(d)
[main] INFO reactor.Flux.ConcatArray.1 - onComplete()

/**
 * Combines a letter flux (one element every 100 ms) with a digit flux (one
 * element every 190 ms) via {@code Flux.combineLatest}, upper-casing the
 * letter and appending the digit.
 *
 * <p>The combined flux is subscribed once for logging and once through
 * {@code StepVerifier}, which expects "A1", "B1", "C1", "C2", "D2" and then
 * completion. The expected order depends on the two delays.
 *
 * @throws InterruptedException if the trailing {@code Thread.sleep} is interrupted
 */
@Test
public void fluxCombineLatestTest() throws InterruptedException {
    Flux<String> letters = Flux.just("a", "b", "c", "d")
            .delayElements(Duration.ofMillis(100))
            .log();
    Flux<String> digits = Flux.just("1", "2")
            .delayElements(Duration.ofMillis(190))
            .log();

    Flux<String> combined =
            Flux.combineLatest(letters, digits, (letter, digit) -> letter.toUpperCase() + digit);

    combined.subscribe(log::info);

    StepVerifier.create(combined)
            .expectSubscription()
            .expectNext("A1", "B1", "C1", "C2", "D2")
            .verifyComplete();

    Thread.sleep(500);
}
Output:

[main] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.1 - request(32)
[main] INFO reactor.Flux.ConcatMap.2 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.2 - request(32)
[main] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.1 - request(32)
[main] INFO reactor.Flux.ConcatMap.2 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.2 - request(32)
[parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(a)
[parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(a)
[parallel-2] INFO reactor.Flux.ConcatMap.2 - onNext(1)
[parallel-2] INFO com.bext.reactor.OperatorsTest - A1
[parallel-4] INFO reactor.Flux.ConcatMap.2 - onNext(1)
[parallel-5] INFO reactor.Flux.ConcatMap.1 - onNext(b)
[parallel-5] INFO com.bext.reactor.OperatorsTest - B1
[parallel-6] INFO reactor.Flux.ConcatMap.1 - onNext(b)
[parallel-9] INFO reactor.Flux.ConcatMap.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - C1
[parallel-10] INFO reactor.Flux.ConcatMap.1 - onNext(c)
[parallel-8] INFO reactor.Flux.ConcatMap.2 - onNext(2)
[parallel-7] INFO reactor.Flux.ConcatMap.2 - onNext(2)
[parallel-7] INFO com.bext.reactor.OperatorsTest - C2
[parallel-8] INFO reactor.Flux.ConcatMap.2 - onComplete()
[parallel-7] INFO reactor.Flux.ConcatMap.2 - onComplete()
[parallel-11] INFO reactor.Flux.ConcatMap.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - D2
[parallel-11] INFO reactor.Flux.ConcatMap.1 - onComplete()
[parallel-11] INFO reactor.Flux.ConcatMap.1 - cancel()
[parallel-11] INFO reactor.Flux.ConcatMap.2 - cancel()
[parallel-12] INFO reactor.Flux.ConcatMap.1 - onNext(d)
[parallel-12] INFO reactor.Flux.ConcatMap.1 - onComplete()
[parallel-12] INFO reactor.Flux.ConcatMap.1 - cancel()
[parallel-12] INFO reactor.Flux.ConcatMap.2 - cancel()

/**
 * Merges a letter flux (one element every 100 ms) with a digit flux (one
 * element every 190 ms) using the static {@code Flux.merge}.
 *
 * <p>The merged flux is subscribed once for logging and once through
 * {@code StepVerifier}, which expects "a", "1", "b", "c", "2", "d" and then
 * completion. The expected interleaving depends on the two delays.
 *
 * @throws InterruptedException if the trailing {@code Thread.sleep} is interrupted
 */
@Test
public void fluxMergeTest() throws InterruptedException {
    Flux<String> letters = Flux.just("a", "b", "c", "d")
            .delayElements(Duration.ofMillis(100));
    Flux<String> digits = Flux.just("1", "2")
            .delayElements(Duration.ofMillis(190));

    Flux<String> merged = Flux.merge(letters, digits)
            .log();

    merged.subscribe(log::info);

    StepVerifier.create(merged)
            .expectSubscription()
            .expectNext("a", "1", "b", "c", "2", "d")
            .expectComplete()
            .verify();

    Thread.sleep(600);
}
Output:

[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-1] INFO com.bext.reactor.OperatorsTest - a
[parallel-3] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-2] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-2] INFO com.bext.reactor.OperatorsTest - 1
[parallel-4] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-5] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-5] INFO com.bext.reactor.OperatorsTest - b
[parallel-6] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-9] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - c
[parallel-10] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-7] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-7] INFO com.bext.reactor.OperatorsTest - 2
[parallel-8] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-11] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - d
[parallel-11] INFO reactor.Flux.Merge.1 - onComplete()
[parallel-12] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-12] INFO reactor.Flux.Merge.1 - onComplete()

/**
 * Merges a letter flux (one element every 100 ms) and a digit flux (one
 * element every 190 ms) with the instance operator {@code mergeWith}, once
 * as letters-with-digits and once as digits-with-letters.
 *
 * <p>The first merge is also subscribed for logging. Both merges are
 * verified to emit "a", "1", "b", "c", "2", "d" and then complete; the
 * expected interleaving depends on the two delays.
 *
 * @throws InterruptedException if the trailing {@code Thread.sleep} is interrupted
 */
@Test
public void fluxMergeWithTest() throws InterruptedException {
    Flux<String> letters = Flux.just("a", "b", "c", "d")
            .delayElements(Duration.ofMillis(100));
    Flux<String> digits = Flux.just("1", "2")
            .delayElements(Duration.ofMillis(190));

    Flux<String> lettersThenDigits = letters.mergeWith(digits).log();
    Flux<String> digitsThenLetters = digits.mergeWith(letters).log();

    lettersThenDigits.subscribe(log::info);

    StepVerifier.create(lettersThenDigits)
            .expectSubscription()
            .expectNext("a", "1", "b", "c", "2", "d")
            .expectComplete()
            .verify();

    StepVerifier.create(digitsThenLetters)
            .expectSubscription()
            .expectNext("a", "1", "b", "c", "2", "d")
            .expectComplete()
            .verify();

    Thread.sleep(600);
}
Output:

[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-1] INFO com.bext.reactor.OperatorsTest - a
[parallel-3] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-2] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-2] INFO com.bext.reactor.OperatorsTest - 1
[parallel-5] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-4] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-5] INFO com.bext.reactor.OperatorsTest - b
[parallel-6] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-9] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - c
[parallel-10] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-7] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-7] INFO com.bext.reactor.OperatorsTest - 2
[parallel-8] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-11] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - d
[parallel-11] INFO reactor.Flux.Merge.1 - onComplete()
[parallel-12] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-12] INFO reactor.Flux.Merge.1 - onComplete()
[main] INFO reactor.Flux.Merge.2 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.2 - request(unbounded)
[parallel-2] INFO reactor.Flux.Merge.2 - onNext(a)
[parallel-1] INFO reactor.Flux.Merge.2 - onNext(1)
[parallel-3] INFO reactor.Flux.Merge.2 - onNext(b)
[parallel-5] INFO reactor.Flux.Merge.2 - onNext(c)
[parallel-4] INFO reactor.Flux.Merge.2 - onNext(2)
[parallel-6] INFO reactor.Flux.Merge.2 - onNext(d)
[parallel-6] INFO reactor.Flux.Merge.2 - onComplete()

/**
 * Merges a letter flux (one element every 100 ms) with a digit flux (one
 * element every 190 ms) using {@code Flux.mergeSequential}.
 *
 * <p>The merged flux is subscribed once for logging and once through
 * {@code StepVerifier}, which expects all letters before all digits
 * ("a", "b", "c", "d", "1", "2") and then completion.
 *
 * @throws InterruptedException if the trailing {@code Thread.sleep} is interrupted
 */
@Test
public void fluxMergeSecuentialTest() throws InterruptedException {
    Flux<String> letters = Flux.just("a", "b", "c", "d")
            .delayElements(Duration.ofMillis(100));
    Flux<String> digits = Flux.just("1", "2")
            .delayElements(Duration.ofMillis(190));

    Flux<String> merged = Flux.mergeSequential(letters, digits)
            .log();

    merged.subscribe(log::info);

    StepVerifier.create(merged)
            .expectSubscription()
            .expectNext("a", "b", "c", "d", "1", "2")
            .expectComplete()
            .verify();

    Thread.sleep(600);
}
Output:

[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(a)
[parallel-1] INFO com.bext.reactor.OperatorsTest - a
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(a)
[parallel-5] INFO reactor.Flux.MergeSequential.1 - onNext(b)
[parallel-5] INFO com.bext.reactor.OperatorsTest - b
[parallel-6] INFO reactor.Flux.MergeSequential.1 - onNext(b)
[parallel-9] INFO reactor.Flux.MergeSequential.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - c
[parallel-10] INFO reactor.Flux.MergeSequential.1 - onNext(c)
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - d
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onNext(1)
[parallel-11] INFO com.bext.reactor.OperatorsTest - 1
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onNext(2)
[parallel-11] INFO com.bext.reactor.OperatorsTest - 2
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onComplete()
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onNext(d)
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onNext(1)
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onNext(2)
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onComplete()

/**
 * Concatenates a flux whose {@code map} throws
 * {@code IllegalArgumentException} on "b" with a second flux of "c", "d",
 * using the static {@code Flux.concat}. Verifies that "a" is emitted and the
 * sequence then terminates with {@code IllegalArgumentException}.
 */
@Test
public void fluxConcatWithErrorTest() {
    Flux<String> failing = Flux.just("a", "b")
            .map(letter -> {
                if (!letter.equals("b")) {
                    return letter;
                }
                throw new IllegalArgumentException();
            });
    Flux<String> healthy = Flux.just("c", "d");

    Flux<String> joined = Flux.concat(failing, healthy)
            .log();

    StepVerifier.create(joined)
            .expectSubscription()
            .expectNext("a")
            .verifyError(IllegalArgumentException.class);
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.IllegalArgumentException)
[main] ERROR reactor.Flux.ConcatArray.1 - 
java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxConcatWithErrorTest$34(OperatorsTest.java:397)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2191)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2065)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8389)
at..... org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

/**
 * Concatenates a source that fails on "b" with a healthy source using
 * {@code Flux.concatDelayError} and asserts that "a", "c" and "d" are all
 * delivered before the {@link IllegalArgumentException} is signalled.
 */
@Test
public void fluxConcatDelayErrorTest() {
    // First source: passes "a" through, then throws when it sees "b".
    Flux<String> failingSource = Flux.just("a", "b")
            .map(letter -> {
                if (!letter.equals("b")) {
                    return letter;
                }
                throw new IllegalArgumentException();
            });
    // Second source: never fails.
    Flux<String> healthySource = Flux.just("c", "d");

    Flux<String> concatenated = Flux.concatDelayError(failingSource, healthySource).log();

    StepVerifier.create(concatenated)
            .expectSubscription()
            .expectNext("a")
            .expectNext("c", "d")
            .verifyError(IllegalArgumentException.class);
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArrayDelayErrorSubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(c)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(d)
[main] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.IllegalArgumentException)
[main] ERROR reactor.Flux.ConcatArray.1 - 
java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxConcatDelayErrorTest$35(OperatorsTest.java:417)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2191)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2065)
... (intermediate frames omitted) ...
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

/**
 * Merges a failing source (subscribed twice) with a healthy source using
 * {@code Flux.mergeDelayError} and asserts that "a", "c", "d", "a" are
 * emitted before the stream terminates with an error.
 */
@Test
public void fluxMergeDelayErrorTest() {
    // Passes "a" through, throws on "b", and logs the failure via doOnError.
    Flux<String> failingSource = Flux.just("a", "b")
            .map(letter -> {
                if (!letter.equals("b")) {
                    return letter;
                }
                throw new IllegalArgumentException();
            })
            .doOnError(err -> log.error("fluxA.doOnError reporting error {}", err.getMessage()));

    Flux<String> healthySource = Flux.just("c", "d");

    // The failing source appears twice, so it is subscribed (and fails) twice.
    Flux<String> merged = Flux.mergeDelayError(1, failingSource, healthySource, failingSource).log();

    StepVerifier.create(merged)
            .expectSubscription()
            .expectNext("a")
            .expectNext("c", "d")
            .expectNext("a")
            .verifyError();
}
Output:

[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[main] INFO reactor.Flux.Merge.1 - onNext(a)
[main] ERROR com.bext.reactor.OperatorsTest - fluxA.doOnError reporting error null
[main] INFO reactor.Flux.Merge.1 - onNext(c)
[main] INFO reactor.Flux.Merge.1 - onNext(d)
[main] INFO reactor.Flux.Merge.1 - onNext(a)
[main] ERROR com.bext.reactor.OperatorsTest - fluxA.doOnError reporting error null
[main] ERROR reactor.Flux.Merge.1 - onError(reactor.core.Exceptions$CompositeException: Multiple exceptions)
[main] ERROR reactor.Flux.Merge.1 - 
reactor.core.Exceptions$CompositeException: Multiple exceptions
at reactor.core.Exceptions.multiple(Exceptions.java:121)
at reactor.core.Exceptions.addThrowable(Exceptions.java:93)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:851)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:983)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
... (intermediate frames omitted) ...
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Suppressed: java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxMergeDelayErrorTest$36(OperatorsTest.java:437)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
... 87 more
Suppressed: java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxMergeDelayErrorTest$36(OperatorsTest.java:437)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
... 87 more

/**
 * Shows that {@code map} with a publisher-returning function yields a
 * {@code Flux<Flux<String>>}: each inner flux has to be consumed explicitly,
 * here by turning it into a stream and logging every element.
 */
@Test
public void fluxMapTest() {
    Flux<String> letters = Flux.just("a", "b");

    Flux<Flux<String>> nestedNames = letters
            .map(letter -> letter.toUpperCase())
            .map(upper -> findByName(upper))
            .log();

    nestedNames.subscribe(inner -> {
        inner.toStream().forEach(found -> log.info("{}", found));
    });
}
/**
 * Looks up the names for the given key.
 *
 * @param name lookup key; {@code "A"} selects the delayed result
 * @return "Abel" and "Andrea", each delayed by 100 ms, when {@code name} is
 *         {@code "A"}; otherwise "Beto" and "Bety" with no delay applied
 */
public Flux<String> findByName(String name){
    if (name.equals("A")) {
        return Flux.just("Abel","Andrea").delayElements(Duration.ofMillis(100));
    }
    return Flux.just("Beto","Bety");
}
Output:

[main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
[main] INFO reactor.Flux.MapFuseable.1 - | onNext(FluxConcatMap)
[main] INFO com.bext.reactor.OperatorsTest - Abel
[main] INFO com.bext.reactor.OperatorsTest - Andrea
[main] INFO reactor.Flux.MapFuseable.1 - | onNext(FluxArray)
[main] INFO com.bext.reactor.OperatorsTest - Beto
[main] INFO com.bext.reactor.OperatorsTest - Bety
[main] INFO reactor.Flux.MapFuseable.1 - | onComplete()

/**
 * Flattens the per-letter lookups with {@code flatMap} and asserts that the
 * undelayed "Beto"/"Bety" results arrive before the delayed "Abel"/"Andrea".
 * The pipeline is subscribed twice: once by a logging subscriber and once by
 * the {@code StepVerifier}.
 */
@Test
public void fluxFlatMapTest() throws InterruptedException{
    Flux<String> letters = Flux.just("a","b");

    Flux<String> flattenedNames = letters
            .map(letter -> letter.toUpperCase())
            .flatMap(upper -> findByName(upper))
            .log();

    // Independent logging subscription.
    flattenedNames.subscribe(found -> log.info("{}", found));

    StepVerifier.create(flattenedNames)
            .expectSubscription()
            .expectNext("Beto","Bety")
            .expectNext("Abel","Andrea")
            .verifyComplete();

    // Pause before the test method returns (kept from the original listing).
    Thread.sleep(500);
}
Output:

[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(Beto)
[main] INFO com.bext.reactor.OperatorsTest - Beto
[main] INFO reactor.Flux.FlatMap.1 - onNext(Bety)
[main] INFO com.bext.reactor.OperatorsTest - Bety
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(Beto)
[main] INFO reactor.Flux.FlatMap.1 - onNext(Bety)
[parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(Abel)
[parallel-1] INFO com.bext.reactor.OperatorsTest - Abel
[parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(Abel)
[parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(Andrea)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Andrea
[parallel-3] INFO reactor.Flux.FlatMap.1 - onComplete()
[parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(Andrea)
[parallel-4] INFO reactor.Flux.FlatMap.1 - onComplete()

/**
 * Flattens the per-letter lookups with {@code flatMapSequential} and asserts
 * that results come out in source order ("Abel", "Andrea", "Beto", "Bety")
 * even though the first lookup is the delayed one. The pipeline is subscribed
 * twice: once by a logging subscriber and once by the {@code StepVerifier}.
 */
@Test
public void fluxFlatMapSequentialTest() throws InterruptedException{
    Flux<String> letters = Flux.just("a","b");

    Flux<String> orderedNames = letters
            .map(letter -> letter.toUpperCase())
            .flatMapSequential(upper -> findByName(upper))
            .log();

    // Independent logging subscription.
    orderedNames.subscribe(found -> log.info("{}", found));

    StepVerifier.create(orderedNames)
            .expectSubscription()
            .expectNext("Abel","Andrea")
            .expectNext("Beto","Bety")
            .verifyComplete();

    // Pause before the test method returns (kept from the original listing).
    Thread.sleep(500);
}

/**
 * Looks up the names for the given key.
 *
 * @param name lookup key; {@code "A"} selects the delayed result
 * @return "Beto" and "Bety" with no delay applied unless {@code name} is
 *         {@code "A"}, in which case "Abel" and "Andrea", each delayed by 100 ms
 */
public Flux<String> findByName(String name){
    if (!name.equals("A")) {
        return Flux.just("Beto","Bety");
    }
    return Flux.just("Abel","Andrea").delayElements(Duration.ofMillis(100));
}
Output:

[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(Abel)
[parallel-1] INFO com.bext.reactor.OperatorsTest - Abel
[parallel-2] INFO reactor.Flux.MergeSequential.1 - onNext(Abel)
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(Andrea)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Andrea
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(Beto)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Beto
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(Bety)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Bety
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onComplete()
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onNext(Andrea)
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onNext(Beto)
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onNext(Bety)
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onComplete()

/**
 * Zips three sources position by position with {@code Flux.zip} and converts
 * each resulting tuple into an {@code Animal}.
 */
@Test
public void fluxZipTest(){
    Flux<String> fluxAnimal = Flux.just("dog", "bird");
    Flux<String> fluxAction = Flux.just("run", "fly");
    Flux<Integer> fluxLegs = Flux.just(4, 2);

    // Tuple -> Animal is a synchronous 1:1 transformation, so map is the right
    // operator; wrapping the value in Flux.just only to flatMap it adds nothing.
    Flux<Animal> fluxCharacteristics = Flux.zip(fluxAnimal, fluxAction, fluxLegs)
            .map(tuple -> new Animal(tuple.getT1(), tuple.getT2(), tuple.getT3()));

    fluxCharacteristics.subscribe(animal -> log.info(animal.toString()));

    StepVerifier.create(fluxCharacteristics)
            .expectSubscription()
            .expectNext(new Animal("dog", "run", 4), new Animal("bird", "fly", 2))
            .verifyComplete();
}
@AllArgsConstructor
@Getter
@ToString
@EqualsAndHashCode
class Animal{
private String name;
private String action;
private int legs;
}

Output:

[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=dog, action=run, legs=4)
[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=bird, action=fly, legs=2)

/**
 * Pairs animal names with leg counts using {@code zipWith} and converts each
 * pair into an {@code Animal} whose action is left {@code null}.
 */
@Test
public void fluxZipWithTest(){
    Flux<String> fluxAnimal = Flux.just("dog", "bird");
    Flux<Integer> fluxLegs = Flux.just(4, 2);

    // Tuple -> Animal is a synchronous 1:1 transformation, so map is the right
    // operator; wrapping the value in Flux.just only to flatMap it adds nothing.
    Flux<Animal> fluxCharacteristics = fluxAnimal.zipWith(fluxLegs)
            .map(tuple -> new Animal(tuple.getT1(), null, tuple.getT2()));

    fluxCharacteristics.subscribe(animal -> log.info(animal.toString()));

    StepVerifier.create(fluxCharacteristics)
            .expectSubscription()
            .expectNext(new Animal("dog", null, 4), new Animal("bird", null, 2))
            .verifyComplete();
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=dog, action=null, legs=4)
[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=bird, action=null, legs=2)

eot

No hay comentarios:

Publicar un comentario