Wednesday, March 30, 2022

Spring WebFlux Profile Service



Create a Spring Boot application with Spring Initializr, selecting the Reactive Web, Lombok, Reactive MongoDB, and Okta dependencies.

Seed the repository with some data when the ApplicationReadyEvent fires, using an ApplicationListener.



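The Initializer class seeds the repository when the ApplicationReadyEvent is received.

import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
// Profile and ProfileRepository are imported from the project's own packages.

@Log4j2
@Component
public class Initializer implements ApplicationListener<ApplicationReadyEvent> {

    private final ProfileRepository profileRepository;

    public Initializer(ProfileRepository profileRepository) {
        this.profileRepository = profileRepository;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // Cold pipeline: nothing is saved until someone subscribes.
        Flux<Profile> profiles = Flux
                .just("Hugo", "Paco", "Luis", "Daisy")
                .map(email -> new Profile(null, email))
                .flatMap(this.profileRepository::save);

        // Delete everything, then save the sample profiles, then read them back and log each one.
        this.profileRepository.deleteAll()
                .thenMany(profiles)
                .thenMany(this.profileRepository.findAll())
                .subscribe(log::info);
    }
}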

When the application runs, the log shows:

2022-03-30 11:51:41.987  INFO 11260 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:1}] to localhost:27017
2022-03-30 11:51:41.987  INFO 11260 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:2}] to localhost:27017
2022-03-30 11:51:41.989  INFO 11260 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=34528800}
2022-03-30 11:51:42.057  INFO 11260 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-03-30 11:51:42.067  INFO 11260 --- [           main] c.b.p.ProfileserviceApplication          : Started ProfileserviceApplication in 1.635 seconds (JVM running for 2.77)
2022-03-30 11:51:42.137  INFO 11260 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:3}] to localhost:27017
2022-03-30 11:51:42.339  INFO 11260 --- [ntLoopGroup-3-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27017
2022-03-30 11:51:42.339  INFO 11260 --- [ntLoopGroup-3-5] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:5, serverValue:5}] to localhost:27017
2022-03-30 11:51:42.379  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01b, email=Hugo)
2022-03-30 11:51:42.380  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01e, email=Daisy)
2022-03-30 11:51:42.380  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01d, email=Luis)
2022-03-30 11:51:42.380  INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer   : Profile(id=624498aed79edb6e1c86d01c, email=Paco)

Using a new Subscriber<> in .subscribe(...)

        this.profileRepository.deleteAll()
                .thenMany(profiles)
                .thenMany(this.profileRepository.findAll())
                .subscribe(new Subscriber<Profile>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        log.info("onSubscribe: {}", s);
                        // Request unbounded demand; there is no need to block or to
                        // re-subscribe to profiles just to count them.
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Profile profile) {
                        log.info("onNext: {}", profile);
                    }

                    @Override
                    public void onError(Throwable t) {
                        // Log the cause as well, not only the signal name.
                        log.error("onError", t);
                    }

                    @Override
                    public void onComplete() {
                        log.info("onComplete: Completed!");
                    }
                });
    }

When it runs with .subscribe(new Subscriber<Profile>() {...}), the log shows:

2022-03-30 12:40:06.846  INFO 9676 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:45}] to localhost:27017
2022-03-30 12:40:06.846  INFO 9676 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:44}] to localhost:27017
2022-03-30 12:40:06.847  INFO 9676 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=36455100}
2022-03-30 12:40:06.905  INFO 9676 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-03-30 12:40:06.912  INFO 9676 --- [           main] c.b.p.ProfileserviceApplication          : Started ProfileserviceApplication in 1.55 seconds (JVM running for 2.239)
2022-03-30 12:40:06.940  INFO 9676 --- [           main] c.bext.profileservice.init.Initializer   : onSubscribe: reactor.core.publisher.StrictSubscriber@546ed2a0
2022-03-30 12:40:06.993  INFO 9676 --- [ntLoopGroup-3-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:47}] to localhost:27017
2022-03-30 12:40:06.993  INFO 9676 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:46}] to localhost:27017
2022-03-30 12:40:07.004  INFO 9676 --- [ntLoopGroup-3-5] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:5, serverValue:48}] to localhost:27017
2022-03-30 12:40:07.004  INFO 9676 --- [ntLoopGroup-3-6] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:6, serverValue:49}] to localhost:27017
2022-03-30 12:40:07.062  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b1, email=Paco)
2022-03-30 12:40:07.062  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b2, email=Luis)
2022-03-30 12:40:07.063  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b0, email=Hugo)
2022-03-30 12:40:07.063  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onNext: Profile(id=6244a40744fe4b14684364b3, email=Daisy)
2022-03-30 12:40:07.063  INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : onComplete: Completed!
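
The request(n) call inside onSubscribe is what lets a subscriber decide how many items it is willing to receive. As a rough illustration only, here is a minimal sketch of that contract using the JDK's java.util.concurrent.Flow interfaces instead of Reactor. The class DemandSketch and its methods fromList and collect are hypothetical names made up for this example, and the publisher is deliberately simplified (synchronous, and it ignores non-positive requests).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class: a tiny synchronous publisher that emits only what was requested.
final class DemandSketch {

    /** Publishes the given items, honoring Subscription.request(n). */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> {
            Iterator<T> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items per request.
                    for (long i = 0; i < n && it.hasNext() && !done; i++) {
                        subscriber.onNext(it.next());
                    }
                    // Complete once the source is exhausted.
                    if (!it.hasNext() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    /** Subscribes, requests `demand` items once, and returns the signals that arrived. */
    static List<String> collect(List<String> items, long demand) {
        List<String> signals = new ArrayList<>();
        fromList(items).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(String item) { signals.add("onNext: " + item); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }
}
```

With four names and a demand of 2, collect returns only the first two onNext signals and no onComplete; with a demand of Long.MAX_VALUE it returns all four items followed by onComplete, which matches the shape of the log above.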

Using a new Consumer in .subscribe(...)


        this.profileRepository.deleteAll()
                .thenMany(profiles)
                .thenMany(this.profileRepository.findAll())
                .subscribe(new Consumer<Profile>() {
                    @Override
                    public void accept(Profile profile) {
                        log.info("Consumer.accept: {}", profile);
                    }
                });
    }

When it runs, the log shows:

2022-03-30 12:50:05.221  INFO 15496 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:56}] to localhost:27017
2022-03-30 12:50:05.221  INFO 15496 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:55}] to localhost:27017
2022-03-30 12:50:05.222  INFO 15496 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=35209600}
2022-03-30 12:50:05.279  INFO 15496 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-03-30 12:50:05.286  INFO 15496 --- [           main] c.b.p.ProfileserviceApplication          : Started ProfileserviceApplication in 1.553 seconds (JVM running for 2.215)
2022-03-30 12:50:05.349  INFO 15496 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:57}] to localhost:27017
2022-03-30 12:50:05.400  INFO 15496 --- [ntLoopGroup-3-5] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:5, serverValue:59}] to localhost:27017
2022-03-30 12:50:05.401  INFO 15496 --- [ntLoopGroup-3-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:58}] to localhost:27017
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81d8, email=Hugo)
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81db, email=Daisy)
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81da, email=Luis)
2022-03-30 12:50:05.437  INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer   : Consumer.accept: Profile(id=6244a65dadd7a11c908f81d9, email=Paco)


Event Sourcing 


When a new record is created, an event is published so that other components in the same JVM can react to it, somewhat like CQRS.

This requires an ApplicationEvent that carries the profile:


public class ProfileCreatedEvent extends ApplicationEvent {
    public ProfileCreatedEvent(Profile source) {
        super(source);
    }
}

The event is published when the profile is created:

...
    public Mono<Profile> create(String email) {
        return this.profileRepository.save(new Profile(null, email))
                .doOnSuccess(profile -> this.applicationEventPublisher.publishEvent(new ProfileCreatedEvent(profile)));
    }
...
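
To make the idea of "publish only after the save succeeded" concrete, here is a plain-Java analogy that needs neither Spring nor Reactor. It is only a sketch under assumptions: EventOnSaveSketch, its in-memory save method, and the listener list are hypothetical stand-ins for the repository and the ApplicationEventPublisher, and CompletableFuture.thenApply plays the role that doOnSuccess plays in the Mono pipeline.

```java
import java.util.ArrayList;
import java.util.EventObject;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical demo class; not part of the profile service itself.
final class EventOnSaveSketch {

    static final class Profile {
        final String id;
        final String email;

        Profile(String id, String email) {
            this.id = id;
            this.email = email;
        }
    }

    /** Same shape as ProfileCreatedEvent: the saved profile is the event source. */
    static final class ProfileCreatedEvent extends EventObject {
        ProfileCreatedEvent(Profile source) {
            super(source);
        }
    }

    private final List<Consumer<ProfileCreatedEvent>> listeners = new ArrayList<>();
    private int nextId = 1;

    void addListener(Consumer<ProfileCreatedEvent> listener) {
        listeners.add(listener);
    }

    /** Stand-in for repository.save(...): assigns an id and completes immediately. */
    private CompletableFuture<Profile> save(Profile profile) {
        return CompletableFuture.completedFuture(new Profile("id-" + nextId++, profile.email));
    }

    /** Publishes the event only after the save has produced a result. */
    CompletableFuture<Profile> create(String email) {
        return save(new Profile(null, email))
                .thenApply(saved -> {
                    ProfileCreatedEvent event = new ProfileCreatedEvent(saved);
                    listeners.forEach(listener -> listener.accept(event));
                    return saved;
                });
    }
}
```

A listener registered with addListener sees the saved profile, with its generated id, as the event source. If the save fails, the thenApply stage is skipped and no event is published.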

The Controller

The ProfileController exposes the service over HTTP so that its functionality can be tested:

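import java.net.URI;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/profiles")
public class ProfileController {

    private final ProfileService profileService;

    public ProfileController(ProfileService profileService) {
        this.profileService = profileService;
    }

    @GetMapping
    Flux<Profile> all() {
        return this.profileService.all();
    }

    // GET only; a bare @RequestMapping would match every HTTP method.
    @GetMapping("/{id}")
    Mono<Profile> byId(@PathVariable("id") String id) {
        return this.profileService.byId(id);
    }

    // Responds with 201 Created and a Location header pointing at the new profile.
    @PostMapping
    Mono<ResponseEntity<Object>> create(@RequestBody Profile profile) {
        return this.profileService.create(profile.getEmail())
                .map(profileSaved -> ResponseEntity.created(URI.create("/profiles/" + profileSaved.getId()))
                        .build());
    }
}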

Test the Controller


To test the profile controller, the Okta dependency must be disabled; otherwise the browser shows a login page. Comment it out in the pom.xml:

<!--
<dependency>
    <groupId>com.okta.spring</groupId>
    <artifactId>okta-spring-boot-starter</artifactId>
    <version>2.1.5</version>
</dependency>
-->

Run the application and check the responses of the endpoints:

C:\WINDOWS\system32>curl http://localhost:8080/profiles
[{"id":"6244d52fe3281a6063300c8f","email":"Hugo"},{"id":"6244d52fe3281a6063300c92","email":"Daisy"},{"id":"6244d52fe3281a6063300c91","email":"Luis"},{"id":"6244d52fe3281a6063300c90","email":"Paco"}]

C:\WINDOWS\system32>curl http://localhost:8080/profiles/6244d52fe3281a6063300c92
{"id":"6244d52fe3281a6063300c92","email":"Daisy"}

bext@DESKTOP-NLF0058 MINGW64 /e/Descargas/Reactor/Imagenes
$ curl \
> -X POST \
> -H "Content-Type: application/json" \
> -d '{"email":"newProfile"}' http://localhost:8080/profiles
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18    0     0  100    18      0    818 --:--:-- --:--:-- --:--:--   857

C:\WINDOWS\system32>curl http://localhost:8080/profiles/
[{"id":"6244d52fe3281a6063300c8f","email":"Hugo"},{"id":"6244d52fe3281a6063300c92","email":"Daisy"},{"id":"6244d52fe3281a6063300c91","email":"Luis"},{"id":"6244d52fe3281a6063300c90","email":"Paco"},{"id":"6244df9ee3281a6063300c94","email":"newProfile"}]
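
The same POST can also be sent from Java with the JDK's built-in java.net.http client (Java 11 or later). This is a minimal sketch, assuming the service is running on localhost:8080 as in the curl session above. CreateProfileClient and its two methods are hypothetical names, and the JSON body is built by naive string concatenation, so it is only safe for values that need no escaping.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class for exercising POST /profiles.
final class CreateProfileClient {

    /** Builds the same request that the curl command sends. */
    static HttpRequest createRequest(String baseUrl, String email) {
        // Naive JSON: fine for simple values, not for input that contains quotes.
        String json = "{\"email\":\"" + email + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/profiles"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** Sends the request and returns the HTTP status code. */
    static int postProfile(String baseUrl, String email) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(createRequest(baseUrl, email), HttpResponse.BodyHandlers.ofString());
        // The controller builds its answer with ResponseEntity.created(...),
        // so the Location header should point at the new profile.
        System.out.println(response.headers().firstValue("Location").orElse("(no Location header)"));
        return response.statusCode();
    }
}
```

Calling CreateProfileClient.postProfile("http://localhost:8080", "newProfile") against the running service should return the status produced by ResponseEntity.created(...), that is 201.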

ProfileCreatedEventPublisher


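The ProfileCreatedEventPublisher is the glue between the producer of profile-created events and their consumers. As an ApplicationListener it puts every ProfileCreatedEvent into a deque; as a Consumer<FluxSink<ProfileCreatedEvent>> it takes the events from that deque and emits them through the FluxSink, so every subscriber of the Flux created from this Consumer receives them.
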
Figure: Service Send Event and WebSocket endpoint




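import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.FluxSink;

@Component
public class ProfileCreatedEventPublisher implements ApplicationListener<ProfileCreatedEvent>, Consumer<FluxSink<ProfileCreatedEvent>> {

    private final BlockingDeque<ProfileCreatedEvent> profileEventsDeque = new LinkedBlockingDeque<>();
    private final Executor executor;

    public ProfileCreatedEventPublisher(Executor executor) {
        this.executor = executor;
    }

    // Called by Flux.create(...): drains the deque on the executor and pushes each event into the sink.
    @Override
    public void accept(FluxSink<ProfileCreatedEvent> profileCreatedEventFluxSink) {
        this.executor.execute(() -> {
            while (true) {
                try {
                    // Blocks until an event is available.
                    ProfileCreatedEvent profileCreatedEvent = profileEventsDeque.take();
                    profileCreatedEventFluxSink.next(profileCreatedEvent);
                } catch (InterruptedException e) {
                    ReflectionUtils.rethrowRuntimeException(e);
                }
            }
        });
    }

    // Called by Spring whenever a ProfileCreatedEvent is published.
    @Override
    public void onApplicationEvent(ProfileCreatedEvent profileCreatedEvent) {
        this.profileEventsDeque.offer(profileCreatedEvent);
    }
}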
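
The hand-off through the deque can be tried in isolation. The following is a minimal plain-Java sketch of the same bridge, with no Spring or Reactor involved: DequeBridgeSketch, publish, start, and forwardAll are hypothetical names for this example, and a plain Consumer stands in for the FluxSink. Unlike the class above, the worker here restores the interrupt flag and leaves its loop when interrupted, which is one common way to let such a thread shut down.

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demo class: producer -> deque -> worker thread -> sink.
final class DequeBridgeSketch<E> {

    private final BlockingDeque<E> deque = new LinkedBlockingDeque<>();

    /** Producer side: queues the event and returns immediately. */
    void publish(E event) {
        deque.offer(event);
    }

    /** Consumer side: drains the deque on a worker thread and hands each event to the sink. */
    Thread start(Consumer<E> sink) {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    sink.accept(deque.take()); // blocks until an event is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and end the loop
            }
        }, "profile-events");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    /** Demo helper: publishes the given events and waits until the sink has seen them all. */
    static List<String> forwardAll(List<String> events) {
        DequeBridgeSketch<String> bridge = new DequeBridgeSketch<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(events.size());
        Thread worker = bridge.start(event -> {
            received.add(event);
            latch.countDown();
        });
        events.forEach(bridge::publish);
        try {
            latch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.interrupt(); // stop the worker thread
        }
        return received;
    }
}
```

Because offer appends at the tail and take removes from the head, the events reach the sink in the order in which they were published.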

Testing the ServiceSendEventControler from a browser over HTTP


Publish the event over WebSocket to an HTML client


Add the WebSocket code

@Configuration
public class WebSocketConfiguration {
    private final ProfileCreatedEventPublisher profileCreatedEventPublisher;

    public WebSocketConfiguration(ProfileCreatedEventPublisher profileCreatedEventPublisher) {
        this.profileCreatedEventPublisher = profileCreatedEventPublisher;
    }

    @Bean
    WebSocketHandler handler() {
        // share() lets every WebSocket session subscribe to the same stream of events.
        Flux<ProfileCreatedEvent> share = Flux.create(profileCreatedEventPublisher).share();
        return new WebSocketHandler() {
            @Override
            public Mono<Void> handle(WebSocketSession wsSession) {
                // For now each event is sent as its toString() representation.
                Flux<WebSocketMessage> map = share
                        .map(profileCreatedEvent -> wsSession.textMessage(profileCreatedEvent.toString()));
                return wsSession.send(map);
            }
        };
    }

    @Bean
    HandlerMapping handlerMapping() {
        return new SimpleUrlHandlerMapping() {
            {
                setOrder(10);
                setUrlMap(Collections.singletonMap("/ws/profiles", handler()));
            }
        };
    }

    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

The HTML client

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Profile notification Client</title>
</head>
<body>
<script>
    var socket = new WebSocket('ws://localhost:8080/ws/profiles');
    socket.addEventListener('message', function (ev) {
        window.alert('message from server: ' + ev.data);
    })
</script>
</body>
</html>

Running the code: the browser listens on the WebSocket endpoint while a new profile is created, which triggers the profile-created event.




Now convert the message to JSON text.

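import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import lombok.SneakyThrows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import reactor.core.publisher.Flux;

@Configuration
public class WebSocketConfiguration {
    private final ProfileCreatedEventPublisher profileCreatedEventPublisher;
    private final ObjectMapper objectMapper;

    public WebSocketConfiguration(ProfileCreatedEventPublisher profileCreatedEventPublisher, ObjectMapper objectMapper) {
        this.profileCreatedEventPublisher = profileCreatedEventPublisher;
        this.objectMapper = objectMapper;
    }

    // Serializes the event with Jackson; @SneakyThrows hides the checked JsonProcessingException.
    @SneakyThrows
    private String jsonFrom(ProfileCreatedEvent profileCreatedEvent) {
        return objectMapper.writeValueAsString(profileCreatedEvent);
    }

    @Bean
    WebSocketHandler handler() {
        Flux<ProfileCreatedEvent> share = Flux.create(profileCreatedEventPublisher).share();
        return session -> {
            Flux<WebSocketMessage> map = share
                    .map(this::jsonFrom)
                    .map(session::textMessage);
            return session.send(map);
        };
    }

    @Bean
    HandlerMapping handlerMapping() {
        return new SimpleUrlHandlerMapping() {
            {
                setOrder(10);
                setUrlMap(Collections.singletonMap("/ws/profiles", handler()));
            }
        };
    }

    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}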

Running the code: the browser listens on the WebSocket endpoint while a new profile is created, which triggers the profile-created event.


Browser inspector



The HTTP connection is upgraded to the WebSocket protocol.
