Wednesday, March 30, 2022

Spring WebFlux Profile Service


Create a Spring Boot application with Spring Initializr, adding the Reactive Web, Lombok, Reactive MongoDB and Okta dependencies.

Initialize the repository with some data on ApplicationReadyEvent, using an ApplicationListener.



The Initializer class populates the repository when the ApplicationReadyEvent is fired.
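import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Log4j2
@Component
public class Initializer implements ApplicationListener<ApplicationReadyEvent> {

    private final ProfileRepository profileRepository;

    public Initializer(ProfileRepository profileRepository) {
        this.profileRepository = profileRepository;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // Cold Flux: nothing is saved until something subscribes to it.
        Flux<Profile> profiles = Flux
                .just("Hugo", "Paco", "Luis", "Daisy")
                .map(email -> new Profile(null, email))
                .flatMap(this.profileRepository::save);

        // Delete everything, then save the sample profiles, then read them back and log them.
        this.profileRepository.deleteAll()
                .thenMany(profiles)
                .thenMany(this.profileRepository.findAll())
                .subscribe(log::info);
    }
}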

When it runs:

2022-03-30 11:51:41.987  INFO 11260 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:1}] to localhost:27017
2022-03-30 11:51:41.987  INFO 11260 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:2}] to localhost:27017
2022-03-30 11:51:41.989  INFO 11260 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=34528800}
2022-03-30 11:51:42.057  INFO 11260 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-03-30 11:51:42.067  INFO 11260 --- [           main] c.b.p.ProfileserviceApplication          : Started ProfileserviceApplication in 1.635 seconds (JVM running for 2.77)
2022-03-30 11:51:42.137  INFO 11260 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:3}] to localhost:27017
2022-03-30 11:51:42.339  INFO 11260 --- [ntLoopGroup-3-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27017
2022-03-30 11:51:42.339  INFO 11260 --- [ntLoopGroup-3-5] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:5, serverValue:5}] to localhost:27017
2022-03-30 11:51:42.379  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01b, email=Hugo)
2022-03-30 11:51:42.380  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01e, email=Daisy)
2022-03-30 11:51:42.380  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01d, email=Luis)
2022-03-30 11:51:42.380  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01c, email=Paco)

Using new Subscriber<> on .subscribe(...)

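// Subscriber and Subscription come from org.reactivestreams.
this.profileRepository.deleteAll()
        .thenMany(profiles)
        .thenMany(this.profileRepository.findAll())
        .subscribe(new Subscriber<Profile>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe: {}", s);
                // Request unbounded demand. Calling profiles.count().block() here would
                // subscribe to profiles a second time (saving them again) and block the thread.
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Profile profile) {
                log.info("onNext: {}", profile);
            }

            @Override
            public void onError(Throwable t) {
                // Pass the throwable so the stack trace is logged.
                log.error("onError", t);
            }

            @Override
            public void onComplete() {
                log.info("onComplete: Completed!");
            }
        });
}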

When it runs with .subscribe(new Subscriber<>() {...}):

2022-03-30 12:40:06.846  INFO 9676 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:45}] to localhost:27017
2022-03-30 12:40:06.846  INFO 9676 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:44}] to localhost:27017
2022-03-30 12:40:06.847  INFO 9676 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=36455100}
2022-03-30 12:40:06.905  INFO 9676 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-03-30 12:40:06.912  INFO 9676 --- [           main] c.b.p.ProfileserviceApplication          : Started ProfileserviceApplication in 1.55 seconds (JVM running for 2.239)
2022-03-30 12:40:06.940  INFO 9676 --- [           main] c.bext.profileservice.init.Initializer   : onSubscribe: reactor.core.publisher.StrictSubscriber@546ed2a0
2022-03-30 12:40:06.993  INFO 9676 --- [ntLoopGroup-3-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:47}] to localhost:27017
2022-03-30 12:40:06.993  INFO 9676 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:46}] to localhost:27017
2022-03-30 12:40:07.004  INFO 9676 --- [ntLoopGroup-3-5] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:5, serverValue:48}] to localhost:27017
2022-03-30 12:40:07.004  INFO 9676 --- [ntLoopGroup-3-6] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:6, serverValue:49}] to localhost:27017
2022-03-30 12:40:07.062  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b1, email=Paco)
2022-03-30 12:40:07.062  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b2, email=Luis)
2022-03-30 12:40:07.063  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b0, email=Hugo)
2022-03-30 12:40:07.063  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b3, email=Daisy)
2022-03-30 12:40:07.063  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onComplete: Completed!
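
The callback order seen in this log (onSubscribe, then one onNext per element, then onComplete) is the Reactive Streams contract, and it can be tried without Spring or MongoDB. The following is only a sketch under assumptions: it uses the JDK's own java.util.concurrent.Flow interfaces and SubmissionPublisher instead of Reactor, and the class name SubscriberLifecycleSketch and its run method are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SubscriberLifecycleSketch {

    // Publishes the given items and returns the signals one subscriber saw, in order.
    static List<String> run(List<String> items) {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    signals.add("onSubscribe");
                    // Unbounded demand: no need to know the element count up front.
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String item) {
                    signals.add("onNext: " + item);
                }

                @Override
                public void onError(Throwable t) {
                    signals.add("onError: " + t);
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete");
                    done.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return signals;
    }

    public static void main(String[] args) {
        run(List.of("Hugo", "Paco", "Luis", "Daisy")).forEach(System.out::println);
    }
}
```

Nothing is delivered until the subscriber calls request(n), which is why onSubscribe is where the demand is declared.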

Using new Consumer<> on .subscribe(...)


this.profileRepository.deleteAll()
        .thenMany(profiles)
        .thenMany(this.profileRepository.findAll())
        .subscribe(new Consumer<Profile>() {
            @Override
            public void accept(Profile profile) {
                log.info("Consumer.accept: {}", profile);
            }
        });
}
When it runs:

2022-03-30 12:50:05.221  INFO 15496 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:56}] to localhost:27017
2022-03-30 12:50:05.221  INFO 15496 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:55}] to localhost:27017
2022-03-30 12:50:05.222  INFO 15496 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=35209600}
2022-03-30 12:50:05.279  INFO 15496 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-03-30 12:50:05.286  INFO 15496 --- [           main] c.b.p.ProfileserviceApplication          : Started ProfileserviceApplication in 1.553 seconds (JVM running for 2.215)
2022-03-30 12:50:05.349  INFO 15496 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:57}] to localhost:27017
2022-03-30 12:50:05.400  INFO 15496 --- [ntLoopGroup-3-5] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:5, serverValue:59}] to localhost:27017
2022-03-30 12:50:05.401  INFO 15496 --- [ntLoopGroup-3-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:58}] to localhost:27017
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81d8, email=Hugo)
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81db, email=Daisy)
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81da, email=Luis)
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81d9, email=Paco)


Event Sourcing 


When a new record is created, an event is published so that other components in the same JVM can react to it, somewhat like CQRS.

An ApplicationEvent that carries the profile is needed:


public class ProfileCreatedEvent extends ApplicationEvent {
    public ProfileCreatedEvent(Profile source) {
        super(source);
    }
}

The event is published when the profile is created:

...
public Mono<Profile> create(String email) {
    return this.profileRepository.save(new Profile(null, email))
            .doOnSuccess(profile -> this.applicationEventPublisher.publishEvent(new ProfileCreatedEvent(profile)));
}
...
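
To make the "save, then notify listeners in the same JVM" idea concrete without Spring, here is a minimal sketch. It is an illustration under assumptions, not how Spring's ApplicationEventPublisher is implemented: ProfileCreated, SimplePublisher and create are hypothetical names, the "store" is just a list, and the event extends java.util.EventObject (the same JDK base class that Spring's ApplicationEvent extends).

```java
import java.util.ArrayList;
import java.util.EventObject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class InJvmEventSketch {

    // Stand-in for ProfileCreatedEvent; the event source is just the email here.
    static class ProfileCreated extends EventObject {
        ProfileCreated(String email) {
            super(email);
        }
    }

    // Hypothetical minimal publisher: delivers each event synchronously to every listener.
    static class SimplePublisher {
        private final List<Consumer<ProfileCreated>> listeners = new CopyOnWriteArrayList<>();

        void addListener(Consumer<ProfileCreated> listener) {
            listeners.add(listener);
        }

        void publish(ProfileCreated event) {
            listeners.forEach(listener -> listener.accept(event));
        }
    }

    // "Saves" the profile first and publishes the event only afterwards,
    // mirroring save(...).doOnSuccess(...) in the service method.
    static String create(String email, List<String> store, SimplePublisher publisher) {
        store.add(email);
        publisher.publish(new ProfileCreated(email));
        return email;
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        SimplePublisher publisher = new SimplePublisher();
        publisher.addListener(e -> System.out.println("profile created: " + e.getSource()));
        create("newProfile", store, publisher);
    }
}
```

The component that writes the record does not need to know who listens; listeners register with the publisher and are called after the write succeeds.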

The Controller

The ProfileController exposes the service over HTTP so that its functionality can be tested.

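import java.net.URI;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/profiles")
public class ProfileController {

    private final ProfileService profileService;

    public ProfileController(ProfileService profileService) {
        this.profileService = profileService;
    }

    @GetMapping
    Flux<Profile> all() {
        return this.profileService.all();
    }

    // @GetMapping restricts this endpoint to GET; a bare @RequestMapping matches every HTTP method.
    @GetMapping("/{id}")
    Mono<Profile> byId(@PathVariable("id") String id) {
        return this.profileService.byId(id);
    }

    // Answers 201 Created with a Location header pointing at the new profile.
    @PostMapping
    Mono<ResponseEntity<Object>> create(@RequestBody Profile profile) {
        return this.profileService.create(profile.getEmail())
                .map(profileSaved -> ResponseEntity.created(URI.create("/profiles/" + profileSaved.getId()))
                        .build());
    }
}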

Test the Controller


To test the profile controller, the Okta dependency has to be disabled (here it is commented out in pom.xml); otherwise the browser shows a login window.

<!--
<dependency>
<groupId>com.okta.spring</groupId>
<artifactId>okta-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
-->

Run the application and check the responses of the endpoints:

C:\WINDOWS\system32>curl http://localhost:8080/profiles
[{"id":"6244d52fe3281a6063300c8f","email":"Hugo"},{"id":"6244d52fe3281a6063300c92","email":"Daisy"},{"id":"6244d52fe3281a6063300c91","email":"Luis"},{"id":"6244d52fe3281a6063300c90","email":"Paco"}]

C:\WINDOWS\system32>curl http://localhost:8080/profiles/6244d52fe3281a6063300c92
{"id":"6244d52fe3281a6063300c92","email":"Daisy"}

bext@DESKTOP-NLF0058 MINGW64 /e/Descargas/Reactor/Imagenes
$ curl \
> -X POST \
> -H "Content-Type: application/json" \
> -d '{"email":"newProfile"}' http://localhost:8080/profiles
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18    0     0  100    18      0    818 --:--:-- --:--:-- --:--:--   857

C:\WINDOWS\system32>curl http://localhost:8080/profiles/
[{"id":"6244d52fe3281a6063300c8f","email":"Hugo"},{"id":"6244d52fe3281a6063300c92","email":"Daisy"},{"id":"6244d52fe3281a6063300c91","email":"Luis"},{"id":"6244d52fe3281a6063300c90","email":"Paco"},{"id":"6244df9ee3281a6063300c94","email":"newProfile"}]
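
The same POST can be issued from Java with the JDK 11+ java.net.http.HttpClient. This is a hedged sketch: ProfileClientSketch and createProfileRequest are names invented for this example, and the base URL assumes the application is running locally on port 8080 as in the curl calls.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class ProfileClientSketch {

    // Builds the equivalent of the curl POST; nothing is sent by this method.
    static HttpRequest createProfileRequest(String baseUrl, String email) {
        // No JSON escaping is done here, so this is only suitable for simple values.
        String json = "{\"email\":\"" + email + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/profiles"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                createProfileRequest("http://localhost:8080", "newProfile"),
                HttpResponse.BodyHandlers.ofString());
        // The controller builds its answer with ResponseEntity.created(...),
        // so a 201 status and a Location header are expected.
        System.out.println(response.statusCode() + " "
                + response.headers().firstValue("Location").orElse("(no Location header)"));
    }
}
```

Running main requires the service to be up; building the request alone does not.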

ProfileCreatedEventPublisher


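The ProfileCreatedEventPublisher is the glue between the producer of profile-created events and their consumers. As an ApplicationListener it puts each event on a deque; as a Consumer<FluxSink<ProfileCreatedEvent>> it takes events off the deque on an executor thread and pushes them into the FluxSink, so every Flux created from this Consumer receives the ProfileCreatedEvents.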
Service Send Event and WebSocket endPoint




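import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.FluxSink;

@Component
public class ProfileCreatedEventPublisher implements ApplicationListener<ProfileCreatedEvent>, Consumer<FluxSink<ProfileCreatedEvent>> {

    private final BlockingDeque<ProfileCreatedEvent> profileEventsDeque = new LinkedBlockingDeque<>();
    private final Executor executor;

    public ProfileCreatedEventPublisher(Executor executor) {
        this.executor = executor;
    }

    // Called by Flux.create(...): drains the deque into the sink on an executor thread.
    @Override
    public void accept(FluxSink<ProfileCreatedEvent> profileCreatedEventFluxSink) {
        this.executor.execute(() -> {
            while (true) {
                try {
                    // take() blocks until an event is available.
                    ProfileCreatedEvent profileCreatedEvent = profileEventsDeque.take();
                    profileCreatedEventFluxSink.next(profileCreatedEvent);
                } catch (InterruptedException e) {
                    ReflectionUtils.rethrowRuntimeException(e);
                }
            }
        });
    }

    // Called by Spring for every published ProfileCreatedEvent.
    @Override
    public void onApplicationEvent(ProfileCreatedEvent profileCreatedEvent) {
        this.profileEventsDeque.offer(profileCreatedEvent);
    }
}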
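
The bridge pattern used by this class (producers offer to a blocking queue, one worker thread takes from it and pushes into a sink) can be tried in isolation. The sketch below uses only the JDK and makes some assumptions: QueueBridgeSketch, drainTo and demo are hypothetical names, events are plain strings, and a Consumer<String> plays the role of the FluxSink.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class QueueBridgeSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: plays the role of onApplicationEvent.
    void offer(String event) {
        queue.offer(event);
    }

    // Consumer side: a worker blocks on take() and forwards each event to the sink.
    void drainTo(Consumer<String> sink, ExecutorService executor) {
        executor.execute(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    sink.accept(queue.take());
                }
            } catch (InterruptedException e) {
                // shutdownNow() interrupts take(); restore the flag and leave the loop.
                Thread.currentThread().interrupt();
            }
        });
    }

    // Sends the events through the bridge and returns what the sink received.
    static List<String> demo(List<String> events) {
        QueueBridgeSketch bridge = new QueueBridgeSketch();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(events.size());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        bridge.drainTo(event -> {
            received.add(event);
            latch.countDown();
        }, executor);
        events.forEach(bridge::offer);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("created: Hugo", "created: Paco")));
    }
}
```

Unlike the while (true) loop above, this worker stops when it is interrupted, which lets the executor shut down cleanly.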

Testing the ServiceSendEventControler from a browser over HTTP


Publish the event over WebSocket to an HTML client


Add WebSocket Code
    
@Configuration
public class WebSocketConfiguration {
    private final ProfileCreatedEventPublisher profileCreatedEventPublisher;

    public WebSocketConfiguration(ProfileCreatedEventPublisher profileCreatedEventPublisher) {
        this.profileCreatedEventPublisher = profileCreatedEventPublisher;
    }

    @Bean
    WebSocketHandler handler() {
        Flux<ProfileCreatedEvent> share = Flux.create(profileCreatedEventPublisher).share();
        return new WebSocketHandler() {
            @Override
            public Mono<Void> handle(WebSocketSession wsSession) {
                Flux<WebSocketMessage> map = share
                        .map(profileCreatedEvent -> wsSession.textMessage(profileCreatedEvent.toString()));
                return wsSession.send(map);
            }
        };
    }

    @Bean
    HandlerMapping handlerMapping() {
        return new SimpleUrlHandlerMapping() {
            {
                setOrder(10);
                setUrlMap(Collections.singletonMap("/ws/profiles", handler()));
            }
        };
    }

    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

The HTML client

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Profile notification Client</title>
</head>
<body>
<script>
    var socket = new WebSocket('ws://localhost:8080/ws/profiles');
    socket.addEventListener('message', function (ev) {
        window.alert('message from server: ' + ev.data);
    })
</script>
</body>
</html>
Running the code: the browser listens on the WebSocket endpoint while a profile is created, which triggers the ProfileCreatedEvent.




Now convert the message to JSON text.

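import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import lombok.SneakyThrows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import reactor.core.publisher.Flux;

@Configuration
public class WebSocketConfiguration {
    private final ProfileCreatedEventPublisher profileCreatedEventPublisher;
    private final ObjectMapper objectMapper;

    public WebSocketConfiguration(ProfileCreatedEventPublisher profileCreatedEventPublisher, ObjectMapper objectMapper) {
        this.profileCreatedEventPublisher = profileCreatedEventPublisher;
        this.objectMapper = objectMapper;
    }

    // @SneakyThrows hides the checked JsonProcessingException so the method can be used in map(...).
    @SneakyThrows
    private String jsonFrom(ProfileCreatedEvent profileCreatedEvent) {
        return objectMapper.writeValueAsString(profileCreatedEvent);
    }

    @Bean
    WebSocketHandler handler() {
        // share() lets every WebSocket session subscribe to the same stream of events.
        Flux<ProfileCreatedEvent> share = Flux.create(profileCreatedEventPublisher).share();
        return session -> {
            Flux<WebSocketMessage> map = share
                    .map(this::jsonFrom)
                    .map(session::textMessage);
            return session.send(map);
        };
    }

    @Bean
    HandlerMapping handlerMapping() {
        return new SimpleUrlHandlerMapping() {
            {
                setOrder(10);
                setUrlMap(Collections.singletonMap("/ws/profiles", handler()));
            }
        };
    }

    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}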

Running the code: the browser listens on the WebSocket endpoint while a profile is created, and the ProfileCreatedEvent now arrives as JSON.


Browser inspector



The HTTP connection is upgraded to the WebSocket protocol.

Friday, March 18, 2022

Project Reactor Essentials 3/3





Operators Test


@Test
public void subscribeOnSchedulersSingleTest() {
    // subscribeOn moves the subscription, and with it the whole chain, to the scheduler thread.
    Flux<Integer> fluxSubscribeOn = Flux.range(1, 4)
            .map(i -> {
                log.info("1st map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.single())
            .map(i -> {
                log.info("2nd map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            });

    StepVerifier.create(fluxSubscribeOn)
            .expectSubscription()
            .expectNext(1, 2, 3, 4)
            .verifyComplete();
}
Output:

[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread single-1

@Test
public void subscribeOnSchedulersBoundedElasticTest() {
    Flux<Integer> fluxSubscribeOn = Flux.range(1, 4)
            .map(i -> {
                log.info("1st map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("2nd map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            });

    StepVerifier.create(fluxSubscribeOn)
            .expectSubscription()
            .expectNext(1, 2, 3, 4)
            .verifyComplete();
}
Output:

[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-1

@Test
public void publishOnSchedulersSingleTest() {
    // publishOn only affects the operators after it: the 1st map stays on the calling thread.
    Flux<Integer> fluxSubscribeOn = Flux.range(1, 6)
            .map(i -> {
                log.info("1st map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.single())
            .map(i -> {
                log.info("2nd map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.single())
            .map(i -> {
                log.info("3th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            });

    StepVerifier.create(fluxSubscribeOn)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6)
            .verifyComplete();
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread main
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread single-1
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread main
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread single-1
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread main
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread single-1
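
The thread hop visible in this output (first stage on the calling thread, later stages on single-1) can be imitated with a plain executor. This is only an analogy under assumptions, not how Reactor implements publishOn: ThreadHopSketch and run are hypothetical names, and a single-thread ExecutorService whose thread is named "single-1" stands in for Schedulers.single().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadHopSketch {

    // Runs stage 1 on the calling thread and stage 2 on a dedicated thread named "single-1".
    static List<String> run(int count) {
        ExecutorService single = Executors.newSingleThreadExecutor(r -> new Thread(r, "single-1"));
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                int value = i;
                // Stage 1: executed by the caller, like the map before publishOn.
                String firstStage = value + ": 1st on " + Thread.currentThread().getName();
                // Stage 2: handed to the executor, like the map after publishOn.
                pending.add(single.submit(
                        () -> firstStage + ", 2nd on " + Thread.currentThread().getName()));
            }
            List<String> lines = new ArrayList<>();
            for (Future<String> future : pending) {
                lines.add(future.get());
            }
            return lines;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        run(6).forEach(System.out::println);
    }
}
```

Because the executor has a single thread, the second stage processes the values in submission order, which matches the ordered "2nd map" lines in the log.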

@Test
public void publishOnSchedulersBoundedElasticTest() {
    Flux<Integer> fluxSubscribeOn = Flux.range(1, 6)
            .map(i -> {
                log.info("1st map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("2nd map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("3th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            });

    StepVerifier.create(fluxSubscribeOn)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6)
            .verifyComplete();
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread main
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-2
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread main
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread main
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-1
[main] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread main
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-1

@Test
public void multiplePublishOnSchedulersTest() {
    Flux<Integer> fluxSubscribeOn = Flux.range(1, 8)
            .publishOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("1st map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.single())
            .map(i -> {
                log.info("2nd map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("3th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.single())
            .map(i -> {
                log.info("4th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            });

    StepVerifier.create(fluxSubscribeOn)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8)
            .verifyComplete();
}
Output:

[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread single-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 1 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 2 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 3 - on thread single-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread boundedElastic-2
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread single-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 8 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 7 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 2nd map - 8 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 7 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 4 - on thread single-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 8 - on thread boundedElastic-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 5 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 6 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 7 - on thread single-1
[single-1] INFO com.bext.reactor.OperatorsTest - 4th map - 8 - on thread single-1

@Test
public void multipleSubscribeOnSchedulersTest() {
    // With several subscribeOn calls, only the one closest to the source decides the thread.
    Flux<Integer> fluxSubscribeOn = Flux.range(1, 8)
            .subscribeOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("1st map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.single())
            .map(i -> {
                log.info("2nd map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("3th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.single())
            .map(i -> {
                log.info("4th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            });

    StepVerifier.create(fluxSubscribeOn)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8)
            .verifyComplete();
}
Output:

[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 3 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 5 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 6 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 7 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 8 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 8 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 3th map - 8 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 4th map - 8 - on thread boundedElastic-2

@Test
public void multiplePublishOnSubscribeOnSchedulers2Test() {
    Flux<Integer> fluxSubscribeOn = Flux.range(1, 8)
            .publishOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("1st map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.single())
            .map(i -> {
                log.info("2nd map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .publishOn(Schedulers.boundedElastic())
            .map(i -> {
                log.info("3th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.single())
            .map(i -> {
                log.info("4th map - {} - on thread {}", i, Thread.currentThread().getName());
                return i;
            });

    StepVerifier.create(fluxSubscribeOn)
            .expectSubscription()
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8)
            .verifyComplete();
}
Output:

[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 1 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 2 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 3 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 1 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 3 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 1 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 4 - on thread boundedElastic-2
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 4 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 5 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 2 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 5 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 3 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 6 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 3 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 6 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 4 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 4 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 7 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 5 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 7 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 5 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 1st map - 8 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 6 - on thread boundedElastic-1
[boundedElastic-2] INFO com.bext.reactor.OperatorsTest - 2nd map - 8 - on thread boundedElastic-2
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 6 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 7 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 7 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 3th map - 8 - on thread boundedElastic-1
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - 4th map - 8 - on thread boundedElastic-1

@Test
public void subscribeOnIOTest() {
Mono<List<String>> listMono = Mono.fromCallable(() -> Files.readAllLines(Path.of("data.txt")))
.log()
.subscribeOn(Schedulers.boundedElastic());

StepVerifier.create(listMono)
.expectSubscription()
.thenConsumeWhile(lines -> {
Assertions.assertFalse(lines.isEmpty());
log.info("lines size: {}", lines.size());
return true;
})
.verifyComplete();
}
Output:

[boundedElastic-1] INFO reactor.Mono.Callable.1 - | onSubscribe([Fuseable] Operators.MonoSubscriber)
[boundedElastic-1] INFO reactor.Mono.Callable.1 - | request(unbounded)
[boundedElastic-1] INFO reactor.Mono.Callable.1 - | onNext([first line, second line, third line, fourth line end.])
[boundedElastic-1] INFO com.bext.reactor.OperatorsTest - lines size: 4
[boundedElastic-1] INFO reactor.Mono.Callable.1 - | onComplete()

@Test
public void switchIfEmptyTest() {
Flux<Object> flux = fluxEmpty()
.switchIfEmpty(Flux.just("It", "was", "empty", "but", "now", "not", "anymore"))
.log();

StepVerifier.create(flux)
.expectSubscription()
.expectNext("It", "was", "empty", "but", "now", "not", "anymore")
.expectComplete()
.verify();

}

private Flux<Object> fluxEmpty() {
return Flux.empty();
}


Output:

[main] INFO reactor.Flux.SwitchIfEmpty.1 - onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - request(unbounded)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(It)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(was)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(empty)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(but)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(now)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(not)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onNext(anymore)
[main] INFO reactor.Flux.SwitchIfEmpty.1 - onComplete()

@Test
public void deferTest() throws InterruptedException {
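// Mono.just captures the timestamp once, when this line runs (declared only for contrast; never subscribed).
Mono<Long> monoTick = Mono.just(System.currentTimeMillis());
// Mono.defer re-runs the supplier on every subscription, so each subscriber sees a fresh timestamp.
Mono<Long> defer = Mono.defer(() -> Mono.just(System.currentTimeMillis()));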

defer.subscribe(tick -> log.info("tick {}", tick));
Thread.sleep(100);
defer.subscribe(tick -> log.info("tick {}", tick));
Thread.sleep(100);
defer.subscribe(tick -> log.info("tick {}", tick));
Thread.sleep(100);
defer.subscribe(tick -> log.info("tick {}", tick));

AtomicLong atomicLong = new AtomicLong();
defer.subscribe(atomicLong::set);
Assertions.assertTrue(atomicLong.get() > 0);
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331088
[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331197
[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331305
[main] INFO com.bext.reactor.OperatorsTest - tick 1647630331414
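
The increasing ticks above come from the supplier being re-evaluated on every subscription. As a rough plain-Java analogy (not Reactor code; the class name `DeferSketch` and the `fakeClock` helper are hypothetical, and the fake clock simply counts 1, 2, 3 so the result is deterministic), the difference between capturing a value once and reading it per subscriber can be sketched like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class DeferSketch {

    // Reads the clock once, up front, and hands that same value to every "subscriber"
    // (analogous to Mono.just(System.currentTimeMillis())).
    static List<Long> eagerReads(Supplier<Long> clock, int subscribers) {
        long captured = clock.get();
        List<Long> seen = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            seen.add(captured);
        }
        return seen;
    }

    // Re-reads the clock for every "subscriber"
    // (analogous to Mono.defer(() -> Mono.just(System.currentTimeMillis()))).
    static List<Long> deferredReads(Supplier<Long> clock, int subscribers) {
        List<Long> seen = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            seen.add(clock.get());
        }
        return seen;
    }

    // Hypothetical deterministic clock: returns 1, 2, 3, ... on successive calls.
    static Supplier<Long> fakeClock() {
        AtomicLong ticks = new AtomicLong();
        return ticks::incrementAndGet;
    }

    public static void main(String[] args) {
        System.out.println(eagerReads(fakeClock(), 3));    // [1, 1, 1]
        System.out.println(deferredReads(fakeClock(), 3)); // [1, 2, 3]
    }
}
```

With a real clock the eager variant would print the same timestamp for every subscriber, while the deferred variant prints a new one each time, which is what the four ticks in the output show.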

@Test
public void fluxConcatTest() {
Flux<String> fluxA = Flux.just("a", "b");
Flux<String> fluxB = Flux.just("c", "d");

Flux<String> fluxConcat = Flux.concat(fluxA, fluxB).log();

StepVerifier.create(fluxConcat)
.expectSubscription()
.expectNext("a", "b", "c", "d")
.expectComplete()
.verify();
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(b)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(c)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(d)
[main] INFO reactor.Flux.ConcatArray.1 - onComplete()

@Test
public void fluxConcatWithTest() {
Flux<String> fluxA = Flux.just("a", "b");
Flux<String> fluxB = Flux.just("c", "d");

Flux<String> fluxConcatWith = fluxA.concatWith(fluxB).log();

StepVerifier.create(fluxConcatWith)
.expectSubscription()
.expectNext("a", "b", "c", "d")
.expectComplete()
.verify();
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(b)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(c)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(d)
[main] INFO reactor.Flux.ConcatArray.1 - onComplete()

@Test
public void fluxCombineLatestTest() throws InterruptedException {

Flux<String> fluxA = Flux.just("a", "b", "c", "d").delayElements(Duration.ofMillis(100)).log();
Flux<String> fluxB = Flux.just("1", "2").delayElements(Duration.ofMillis(190)).log();

Flux<String> fluxCombineLatest = Flux.combineLatest(fluxA, fluxB, (fa, fb) -> fa.toUpperCase() + fb);

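// Extra, independent subscription: together with StepVerifier below, every source is subscribed twice,
// which is why each log() signal shows up twice in the output.
fluxCombineLatest.subscribe(log::info);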

StepVerifier.create(fluxCombineLatest)
.expectSubscription()
.expectNext("A1", "B1", "C1", "C2", "D2")
.expectComplete()
.verify();

Thread.sleep(500);
}
Output:

[main] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.1 - request(32)
[main] INFO reactor.Flux.ConcatMap.2 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.2 - request(32)
[main] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.1 - request(32)
[main] INFO reactor.Flux.ConcatMap.2 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
[main] INFO reactor.Flux.ConcatMap.2 - request(32)
[parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(a)
[parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(a)
[parallel-2] INFO reactor.Flux.ConcatMap.2 - onNext(1)
[parallel-2] INFO com.bext.reactor.OperatorsTest - A1
[parallel-4] INFO reactor.Flux.ConcatMap.2 - onNext(1)
[parallel-5] INFO reactor.Flux.ConcatMap.1 - onNext(b)
[parallel-5] INFO com.bext.reactor.OperatorsTest - B1
[parallel-6] INFO reactor.Flux.ConcatMap.1 - onNext(b)
[parallel-9] INFO reactor.Flux.ConcatMap.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - C1
[parallel-10] INFO reactor.Flux.ConcatMap.1 - onNext(c)
[parallel-8] INFO reactor.Flux.ConcatMap.2 - onNext(2)
[parallel-7] INFO reactor.Flux.ConcatMap.2 - onNext(2)
[parallel-7] INFO com.bext.reactor.OperatorsTest - C2
[parallel-8] INFO reactor.Flux.ConcatMap.2 - onComplete()
[parallel-7] INFO reactor.Flux.ConcatMap.2 - onComplete()
[parallel-11] INFO reactor.Flux.ConcatMap.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - D2
[parallel-11] INFO reactor.Flux.ConcatMap.1 - onComplete()
[parallel-11] INFO reactor.Flux.ConcatMap.1 - cancel()
[parallel-11] INFO reactor.Flux.ConcatMap.2 - cancel()
[parallel-12] INFO reactor.Flux.ConcatMap.1 - onNext(d)
[parallel-12] INFO reactor.Flux.ConcatMap.1 - onComplete()
[parallel-12] INFO reactor.Flux.ConcatMap.1 - cancel()
[parallel-12] INFO reactor.Flux.ConcatMap.2 - cancel()
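
To see why the expected sequence is A1, B1, C1, C2, D2, it may help to replay the two sources on a timeline. The following is a plain-Java model, not Reactor's implementation: it assumes that `delayElements(d)` delivers the k-th element at roughly k * d milliseconds and ignores scheduler jitter. The names `CombineLatestSketch`, `Event` and `delayed` are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class CombineLatestSketch {

    // One emission on the modelled timeline: when it arrives, from which source, and its value.
    static final class Event {
        final long atMillis;
        final int source;
        final String value;

        Event(long atMillis, int source, String value) {
            this.atMillis = atMillis;
            this.source = source;
            this.value = value;
        }
    }

    // Models Flux.just(values).delayElements(delay): element k (1-based) arrives at k * delay.
    static List<Event> delayed(int source, long delayMillis, String... values) {
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            events.add(new Event((i + 1) * delayMillis, source, values[i]));
        }
        return events;
    }

    // Replays both sources in arrival order and combines the latest value of each.
    static List<String> combineLatest(List<Event> a, List<Event> b) {
        List<Event> timeline = new ArrayList<>(a);
        timeline.addAll(b);
        timeline.sort(Comparator.comparingLong((Event e) -> e.atMillis));

        String latestA = null;
        String latestB = null;
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            if (e.source == 0) {
                latestA = e.value;
            } else {
                latestB = e.value;
            }
            // Nothing is emitted until every source has produced at least one value.
            if (latestA != null && latestB != null) {
                out.add(latestA.toUpperCase() + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> fluxA = delayed(0, 100, "a", "b", "c", "d");
        List<Event> fluxB = delayed(1, 190, "1", "2");
        System.out.println(combineLatest(fluxA, fluxB)); // [A1, B1, C1, C2, D2]
    }
}
```

In this model "a" arrives at 100 ms with nothing to pair with, "1" arrives at 190 ms and produces A1, and every later arrival is combined with the most recent value from the other source.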

@Test
public void fluxMergeTest() throws InterruptedException {
Flux<String> fluxA = Flux.just("a", "b","c", "d").delayElements(Duration.ofMillis(100));
Flux<String> fluxB = Flux.just("1", "2").delayElements(Duration.ofMillis(190));

Flux<String> fluxMerge = Flux.merge(fluxA, fluxB).log();

fluxMerge.subscribe(log::info);

StepVerifier.create(fluxMerge)
.expectSubscription()
.expectNext("a","1","b","c","2","d")
.verifyComplete();

Thread.sleep(600);
}
Output:

[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-1] INFO com.bext.reactor.OperatorsTest - a
[parallel-3] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-2] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-2] INFO com.bext.reactor.OperatorsTest - 1
[parallel-4] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-5] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-5] INFO com.bext.reactor.OperatorsTest - b
[parallel-6] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-9] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - c
[parallel-10] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-7] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-7] INFO com.bext.reactor.OperatorsTest - 2
[parallel-8] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-11] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - d
[parallel-11] INFO reactor.Flux.Merge.1 - onComplete()
[parallel-12] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-12] INFO reactor.Flux.Merge.1 - onComplete()

@Test
public void fluxMergeWithTest() throws InterruptedException {
Flux<String> fluxA = Flux.just("a", "b","c", "d").delayElements(Duration.ofMillis(100));
Flux<String> fluxB = Flux.just("1", "2").delayElements(Duration.ofMillis(190));

Flux<String> fluxAMergeWithB = fluxA.mergeWith(fluxB).log();
Flux<String> fluxBMergeWithA = fluxB.mergeWith(fluxA).log();

fluxAMergeWithB.subscribe(log::info);

StepVerifier.create( fluxAMergeWithB)
.expectSubscription()
.expectNext("a","1","b","c","2","d")
.verifyComplete();

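// merge interleaves by arrival time, not by argument order, so B.mergeWith(A) yields the same sequence.
StepVerifier.create( fluxBMergeWithA)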
.expectSubscription()
.expectNext("a","1","b","c","2","d")
.verifyComplete();

Thread.sleep(600);
}
Output:

[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-1] INFO com.bext.reactor.OperatorsTest - a
[parallel-3] INFO reactor.Flux.Merge.1 - onNext(a)
[parallel-2] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-2] INFO com.bext.reactor.OperatorsTest - 1
[parallel-5] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-4] INFO reactor.Flux.Merge.1 - onNext(1)
[parallel-5] INFO com.bext.reactor.OperatorsTest - b
[parallel-6] INFO reactor.Flux.Merge.1 - onNext(b)
[parallel-9] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - c
[parallel-10] INFO reactor.Flux.Merge.1 - onNext(c)
[parallel-7] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-7] INFO com.bext.reactor.OperatorsTest - 2
[parallel-8] INFO reactor.Flux.Merge.1 - onNext(2)
[parallel-11] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - d
[parallel-11] INFO reactor.Flux.Merge.1 - onComplete()
[parallel-12] INFO reactor.Flux.Merge.1 - onNext(d)
[parallel-12] INFO reactor.Flux.Merge.1 - onComplete()
[main] INFO reactor.Flux.Merge.2 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.2 - request(unbounded)
[parallel-2] INFO reactor.Flux.Merge.2 - onNext(a)
[parallel-1] INFO reactor.Flux.Merge.2 - onNext(1)
[parallel-3] INFO reactor.Flux.Merge.2 - onNext(b)
[parallel-5] INFO reactor.Flux.Merge.2 - onNext(c)
[parallel-4] INFO reactor.Flux.Merge.2 - onNext(2)
[parallel-6] INFO reactor.Flux.Merge.2 - onNext(d)
[parallel-6] INFO reactor.Flux.Merge.2 - onComplete()

@Test
public void fluxMergeSequentialTest() throws InterruptedException {
Flux<String> fluxA = Flux.just("a", "b","c", "d").delayElements(Duration.ofMillis(100));
Flux<String> fluxB = Flux.just("1", "2").delayElements(Duration.ofMillis(190));

Flux<String> fluxMerge = Flux.mergeSequential(fluxA, fluxB).log();

fluxMerge.subscribe(log::info);

StepVerifier.create(fluxMerge)
.expectSubscription()
.expectNext("a","b","c","d","1","2")
.verifyComplete();

Thread.sleep(600);
}
Output:

[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(a)
[parallel-1] INFO com.bext.reactor.OperatorsTest - a
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(a)
[parallel-5] INFO reactor.Flux.MergeSequential.1 - onNext(b)
[parallel-5] INFO com.bext.reactor.OperatorsTest - b
[parallel-6] INFO reactor.Flux.MergeSequential.1 - onNext(b)
[parallel-9] INFO reactor.Flux.MergeSequential.1 - onNext(c)
[parallel-9] INFO com.bext.reactor.OperatorsTest - c
[parallel-10] INFO reactor.Flux.MergeSequential.1 - onNext(c)
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onNext(d)
[parallel-11] INFO com.bext.reactor.OperatorsTest - d
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onNext(1)
[parallel-11] INFO com.bext.reactor.OperatorsTest - 1
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onNext(2)
[parallel-11] INFO com.bext.reactor.OperatorsTest - 2
[parallel-11] INFO reactor.Flux.MergeSequential.1 - onComplete()
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onNext(d)
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onNext(1)
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onNext(2)
[parallel-12] INFO reactor.Flux.MergeSequential.1 - onComplete()
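
The contrast between the merge output (a, 1, b, c, 2, d) and the mergeSequential output (a, b, c, d, 1, 2) can be reproduced with a small plain-Java model. This is only a sketch under the assumption that `delayElements(d)` delivers the k-th element at about k * d milliseconds; it is not how Reactor implements either operator, and `MergeOrderSketch`, `Event` and `delayed` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MergeOrderSketch {

    // One emission on the modelled timeline.
    static final class Event {
        final long atMillis;
        final String value;

        Event(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // Models Flux.just(values).delayElements(delay): element k (1-based) arrives at k * delay.
    static List<Event> delayed(long delayMillis, String... values) {
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            events.add(new Event((i + 1) * delayMillis, values[i]));
        }
        return events;
    }

    // merge: values are forwarded in the order they arrive, whatever their source.
    static List<String> merge(List<List<Event>> sources) {
        List<Event> all = new ArrayList<>();
        for (List<Event> source : sources) {
            all.addAll(source);
        }
        all.sort(Comparator.comparingLong((Event e) -> e.atMillis));
        List<String> out = new ArrayList<>();
        for (Event e : all) {
            out.add(e.value);
        }
        return out;
    }

    // mergeSequential: values of later sources are held back until earlier sources finish,
    // so the output follows source order regardless of arrival time.
    static List<String> mergeSequential(List<List<Event>> sources) {
        List<String> out = new ArrayList<>();
        for (List<Event> source : sources) {
            for (Event e : source) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Event>> sources = List.of(
                delayed(100, "a", "b", "c", "d"),
                delayed(190, "1", "2"));
        System.out.println(merge(sources));           // [a, 1, b, c, 2, d]
        System.out.println(mergeSequential(sources)); // [a, b, c, d, 1, 2]
    }
}
```

This also matches the log above, where "1" and "2" are released on the same thread immediately after "d": they had already arrived and were waiting for the first source to complete.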

@Test
public void fluxConcatWithErrorTest() {
Flux<String> fluxA = Flux.just("a", "b")
.map( t -> {
if (t.equals("b")){
throw new IllegalArgumentException();
}
return t;
});
Flux<String> fluxB = Flux.just("c", "d");

Flux<String> fluxConcat = Flux.concat(fluxA, fluxB).log();

StepVerifier.create(fluxConcat)
.expectSubscription()
.expectNext("a")
.expectError(IllegalArgumentException.class)
.verify();
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.IllegalArgumentException)
[main] ERROR reactor.Flux.ConcatArray.1 - 
java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxConcatWithErrorTest$34(OperatorsTest.java:397)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2191)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2065)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8389)
at..... org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

@Test
public void fluxConcatDelayErrorTest() {
Flux<String> fluxA = Flux.just("a", "b")
.map( t -> {
if (t.equals("b")){
throw new IllegalArgumentException();
}
return t;
});
Flux<String> fluxB = Flux.just("c", "d");

Flux<String> fluxConcat = Flux.concatDelayError(fluxA, fluxB).log();

StepVerifier.create(fluxConcat)
.expectSubscription()
.expectNext("a","c","d")
.expectError(IllegalArgumentException.class)
.verify();
}
Output:

[main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArrayDelayErrorSubscriber)
[main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(a)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(c)
[main] INFO reactor.Flux.ConcatArray.1 - onNext(d)
[main] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.IllegalArgumentException)
[main] ERROR reactor.Flux.ConcatArray.1 - 
java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxConcatDelayErrorTest$35(OperatorsTest.java:417)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2191)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2065)
......
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
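
The two concat tests differ only in when the error reaches the subscriber. A plain-Java sketch of that difference follows; it is a simplified model, not Reactor's code. Assumptions: sources are plain lists, the hypothetical `failOnB` step is applied to every source for brevity (in the tests only fluxA is mapped), and only the first error is kept.

```java
import java.util.ArrayList;
import java.util.List;

final class ConcatErrorSketch {

    // Outcome of draining the concatenated sources: what was emitted and the terminal error, if any.
    static final class Result {
        final List<String> emitted = new ArrayList<>();
        RuntimeException error; // null means normal completion
    }

    // Hypothetical per-element step mirroring the map(...) in the tests: fails on "b".
    static String failOnB(String t) {
        if (t.equals("b")) {
            throw new IllegalArgumentException();
        }
        return t;
    }

    static Result concat(boolean delayError, List<List<String>> sources) {
        Result result = new Result();
        for (List<String> source : sources) {
            try {
                for (String t : source) {
                    result.emitted.add(failOnB(t));
                }
            } catch (RuntimeException e) {
                if (result.error == null) {
                    result.error = e; // this sketch keeps only the first error
                }
                if (!delayError) {
                    return result; // concat: terminate at once, later sources never run
                }
                // concatDelayError: abandon the failed source and move on to the next one
            }
        }
        return result; // concatDelayError: the remembered error is reported only here
    }

    public static void main(String[] args) {
        List<List<String>> sources = List.of(List.of("a", "b"), List.of("c", "d"));
        System.out.println(concat(false, sources).emitted); // [a]
        System.out.println(concat(true, sources).emitted);  // [a, c, d]
    }
}
```

In both cases the result carries an IllegalArgumentException; the delayed variant just lets "c" and "d" through first, as in the output above.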

@Test
public void fluxMergeDelayErrorTest() {
Flux<String> fluxA = Flux.just("a", "b")
.map( t -> {
if (t.equals("b")){
throw new IllegalArgumentException();
}
return t;
}).doOnError( err -> log.error("fluxA.doOnError reporting error {}", err.getMessage()));

Flux<String> fluxB = Flux.just("c", "d");

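// The first argument of mergeDelayError is the prefetch; fluxA is merged twice, so two errors are collected.
Flux<String> fluxMerge = Flux.mergeDelayError(1, fluxA, fluxB, fluxA).log();

StepVerifier.create(fluxMerge)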
.expectSubscription()
.expectNext("a","c","d","a")
.expectError()
.verify();
}
Output:

[main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.Merge.1 - request(unbounded)
[main] INFO reactor.Flux.Merge.1 - onNext(a)
[main] ERROR com.bext.reactor.OperatorsTest - fluxA.doOnError reporting error null
[main] INFO reactor.Flux.Merge.1 - onNext(c)
[main] INFO reactor.Flux.Merge.1 - onNext(d)
[main] INFO reactor.Flux.Merge.1 - onNext(a)
[main] ERROR com.bext.reactor.OperatorsTest - fluxA.doOnError reporting error null
[main] ERROR reactor.Flux.Merge.1 - onError(reactor.core.Exceptions$CompositeException: Multiple exceptions)
[main] ERROR reactor.Flux.Merge.1 - 
reactor.core.Exceptions$CompositeException: Multiple exceptions
at reactor.core.Exceptions.multiple(Exceptions.java:121)
at reactor.core.Exceptions.addThrowable(Exceptions.java:93)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:851)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:983)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
at reactor.co........ com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Suppressed: java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxMergeDelayErrorTest$36(OperatorsTest.java:437)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
... 87 more
Suppressed: java.lang.IllegalArgumentException
at com.bext.reactor.OperatorsTest.lambda$fluxMergeDelayErrorTest$36(OperatorsTest.java:437)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
... 87 more

@Test
public void fluxMapTest() {
Flux<String> flux = Flux.just("a", "b");
Flux<Flux<String>> fluxFluxString = flux.map(String::toUpperCase)
.map(this::findByName)
.log();

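// map only wraps each name in an inner Flux; toStream() then blocks until that inner Flux completes.
fluxFluxString.subscribe(o -> o.toStream().forEach(s -> log.info("{}", s)));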
}
public Flux<String> findByName(String name) {
return name.equals("A")
? Flux.just("Abel", "Andrea").delayElements(Duration.ofMillis(100))
: Flux.just("Beto", "Bety");
}
Output:

[main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
[main] INFO reactor.Flux.MapFuseable.1 - | onNext(FluxConcatMap)
[main] INFO com.bext.reactor.OperatorsTest - Abel
[main] INFO com.bext.reactor.OperatorsTest - Andrea
[main] INFO reactor.Flux.MapFuseable.1 - | onNext(FluxArray)
[main] INFO com.bext.reactor.OperatorsTest - Beto
[main] INFO com.bext.reactor.OperatorsTest - Bety
[main] INFO reactor.Flux.MapFuseable.1 - | onComplete()

@Test
public void fluxFlatMapTest() throws InterruptedException{
Flux<String> flux = Flux.just("a","b");
Flux<String> fluxString = flux.map(String::toUpperCase)
.flatMap( this::findByName)
.log();

fluxString.subscribe( o -> log.info("{}", o));

StepVerifier.create(fluxString)
.expectSubscription()
.expectNext("Beto","Bety","Abel","Andrea")
.verifyComplete();

Thread.sleep(500);
}
Output:

[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(Beto)
[main] INFO com.bext.reactor.OperatorsTest - Beto
[main] INFO reactor.Flux.FlatMap.1 - onNext(Bety)
[main] INFO com.bext.reactor.OperatorsTest - Bety
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(Beto)
[main] INFO reactor.Flux.FlatMap.1 - onNext(Bety)
[parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(Abel)
[parallel-1] INFO com.bext.reactor.OperatorsTest - Abel
[parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(Abel)
[parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(Andrea)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Andrea
[parallel-3] INFO reactor.Flux.FlatMap.1 - onComplete()
[parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(Andrea)
[parallel-4] INFO reactor.Flux.FlatMap.1 - onComplete()

@Test
public void fluxFlatMapSequentialTest() throws InterruptedException{
Flux<String> flux = Flux.just("a","b");
Flux<String> fluxString = flux.map(String::toUpperCase)
.flatMapSequential( this::findByName)
.log();

fluxString.subscribe( o -> log.info("{}", o));

StepVerifier.create(fluxString)
.expectSubscription()
.expectNext("Abel","Andrea","Beto","Bety")
.verifyComplete();

Thread.sleep(500);
}

Output:

[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[main] INFO reactor.Flux.MergeSequential.1 - onSubscribe(FluxMergeSequential.MergeSequentialMain)
[main] INFO reactor.Flux.MergeSequential.1 - request(unbounded)
[parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(Abel)
[parallel-1] INFO com.bext.reactor.OperatorsTest - Abel
[parallel-2] INFO reactor.Flux.MergeSequential.1 - onNext(Abel)
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(Andrea)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Andrea
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(Beto)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Beto
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onNext(Bety)
[parallel-3] INFO com.bext.reactor.OperatorsTest - Bety
[parallel-3] INFO reactor.Flux.MergeSequential.1 - onComplete()
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onNext(Andrea)
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onNext(Beto)
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onNext(Bety)
[parallel-4] INFO reactor.Flux.MergeSequential.1 - onComplete()

@Test
public void fluxZipTest(){
Flux<String> fluxAnimal = Flux.just("dog", "bird");
Flux<String> fluxAction = Flux.just("run", "fly");
Flux<Integer> fluxLegs = Flux.just(4, 2);

Flux<Animal> fluxCharacteristics = Flux.zip(fluxAnimal, fluxAction, fluxLegs)
.map(tuple -> new Animal(tuple.getT1(), tuple.getT2(), tuple.getT3()));

fluxCharacteristics.subscribe( animal -> log.info(animal.toString()));

StepVerifier.create(fluxCharacteristics)
.expectSubscription()
.expectNext(new Animal("dog", "run", 4), new Animal("bird","fly",2))
.verifyComplete();
}
@AllArgsConstructor
@Getter
@ToString
@EqualsAndHashCode
class Animal{
private String name;
private String action;
private int legs;
}

Output:

[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=dog, action=run, legs=4)
[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=bird, action=fly, legs=2)

@Test
public void fluxZipWithTest(){
Flux<String> fluxAnimal = Flux.just("dog", "bird");
Flux<String> fluxAction = Flux.just("run", "fly");
Flux<Integer> fluxLegs = Flux.just(4, 2);

// zipWith pairs exactly two sources; fluxAction is not used here, so action stays null.
Flux<Animal> fluxCharacteristics = fluxAnimal.zipWith(fluxLegs)
.map(tuple -> new Animal(tuple.getT1(), null, tuple.getT2()));

fluxCharacteristics.subscribe( animal -> log.info(animal.toString()));

StepVerifier.create(fluxCharacteristics)
.expectSubscription()
.expectNext(new Animal("dog", null, 4), new Animal("bird",null,2))
.verifyComplete();
}
Output:

[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=dog, action=null, legs=4)
[main] INFO com.bext.reactor.OperatorsTest - OperatorsTest.Animal(name=bird, action=null, legs=2)
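
Both zip tests pair elements by position. A minimal plain-Java sketch of that pairing is shown below; it works on lists rather than on a Flux, and `ZipSketch` and its `zip` helper are hypothetical names. It also shows one property the tests do not exercise: the result stops as soon as the shorter input runs out, which is how zip behaves when one source completes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {

    // Pairs the i-th element of each list and stops at the end of the shorter list.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combine) {
        int pairs = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(combine.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> animals = List.of("dog", "bird", "fish");
        List<Integer> legs = List.of(4, 2);
        // "fish" has no partner, so it is dropped.
        System.out.println(zip(animals, legs, (String name, Integer n) -> name + ":" + n)); // [dog:4, bird:2]
    }
}
```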
