Spring WebFlux Profile Service
Create a Spring Boot application with the Reactive Web, Lombok, Reactive MongoDB, and Okta dependencies using Spring Initializr.
Initialize the repository with some data when the ApplicationReadyEvent fires, using an ApplicationListener
The Initializer class seeds the repository in response to the ApplicationReadyEvent.
/**
 * Seeds the profile repository once the application is ready.
 *
 * <p>On {@link ApplicationReadyEvent} the repository is emptied, four sample
 * profiles are saved, and every stored profile is then read back and logged.
 */
@Log4j2
@Component
public class Initializer implements ApplicationListener<ApplicationReadyEvent> {

    private final ProfileRepository profileRepository;

    public Initializer(ProfileRepository profileRepository) {
        this.profileRepository = profileRepository;
    }

    /**
     * Wipes the repository, stores the sample profiles and logs what is found afterwards.
     *
     * @param event the application-ready notification (not inspected)
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // Lazy pipeline: nothing is saved until the chain below is subscribed.
        Flux<Profile> seededProfiles = Flux
                .just("Hugo","Paco","Luis","Daisy")
                .flatMap(email -> this.profileRepository.save(new Profile(null, email)));

        // delete -> save the samples -> read everything back, logging each profile.
        this.profileRepository
                .deleteAll()
                .thenMany(seededProfiles)
                .thenMany(this.profileRepository.findAll())
                .subscribe(profile -> log.info(profile));
    }
}
When it runs:
2022-03-30 11:51:41.987 INFO 11260 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:1}] to localhost:27017
2022-03-30 11:51:41.987 INFO 11260 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:2}] to localhost:27017
2022-03-30 11:51:41.989 INFO 11260 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=34528800}
2022-03-30 11:51:42.057 INFO 11260 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
2022-03-30 11:51:42.067 INFO 11260 --- [ main] c.b.p.ProfileserviceApplication : Started ProfileserviceApplication in 1.635 seconds (JVM running for 2.77)
2022-03-30 11:51:42.137 INFO 11260 --- [ntLoopGroup-3-3] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:3}] to localhost:27017
2022-03-30 11:51:42.339 INFO 11260 --- [ntLoopGroup-3-4] org.mongodb.driver.connection : Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27017
2022-03-30 11:51:42.339 INFO 11260 --- [ntLoopGroup-3-5] org.mongodb.driver.connection : Opened connection [connectionId{localValue:5, serverValue:5}] to localhost:27017
2022-03-30 11:51:42.379 INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer : Profile(id=624498aed79edb6e1c86d01b, email=Hugo)
2022-03-30 11:51:42.380 INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer : Profile(id=624498aed79edb6e1c86d01e, email=Daisy)
2022-03-30 11:51:42.380 INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer : Profile(id=624498aed79edb6e1c86d01d, email=Luis)
2022-03-30 11:51:42.380 INFO 11260 --- [ntLoopGroup-3-5] c.bext.profileservice.init.Initializer : Profile(id=624498aed79edb6e1c86d01c, email=Paco)
Using a new Subscriber<> in .subscribe(...)
// Delete everything, run the `profiles` pipeline, then read back all profiles and
// observe every reactive-streams signal through an explicit Subscriber.
this.profileRepository.deleteAll()
        .thenMany(profiles)
        .thenMany(this.profileRepository.findAll())
        .subscribe(new Subscriber<Profile>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe: {}", s);
                // Request unbounded demand. Computing the demand with
                // profiles.count().block() would block the subscribing thread and
                // subscribe to `profiles` a second time, outside this pipeline.
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Profile profile) {
                log.info("onNext: {}", profile);
            }

            @Override
            public void onError(Throwable t) {
                // Pass the throwable along so the failure cause is not lost.
                log.error("onError", t);
            }

            @Override
            public void onComplete() {
                log.info("onComplete: Completed!");
            }
        });
}
When it runs with .subscribe(new Subscriber<>(...)):
2022-03-30 12:40:06.846 INFO 9676 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:45}] to localhost:27017
2022-03-30 12:40:06.846 INFO 9676 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:44}] to localhost:27017
2022-03-30 12:40:06.847 INFO 9676 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=36455100}
2022-03-30 12:40:06.905 INFO 9676 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
2022-03-30 12:40:06.912 INFO 9676 --- [ main] c.b.p.ProfileserviceApplication : Started ProfileserviceApplication in 1.55 seconds (JVM running for 2.239)
2022-03-30 12:40:06.940 INFO 9676 --- [ main] c.bext.profileservice.init.Initializer : onSubscribe: reactor.core.publisher.StrictSubscriber@546ed2a0
2022-03-30 12:40:06.993 INFO 9676 --- [ntLoopGroup-3-4] org.mongodb.driver.connection : Opened connection [connectionId{localValue:4, serverValue:47}] to localhost:27017
2022-03-30 12:40:06.993 INFO 9676 --- [ntLoopGroup-3-3] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:46}] to localhost:27017
2022-03-30 12:40:07.004 INFO 9676 --- [ntLoopGroup-3-5] org.mongodb.driver.connection : Opened connection [connectionId{localValue:5, serverValue:48}] to localhost:27017
2022-03-30 12:40:07.004 INFO 9676 --- [ntLoopGroup-3-6] org.mongodb.driver.connection : Opened connection [connectionId{localValue:6, serverValue:49}] to localhost:27017
2022-03-30 12:40:07.062 INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : onNext: Profile(id=6244a40744fe4b14684364b1, email=Paco)
2022-03-30 12:40:07.062 INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : onNext: Profile(id=6244a40744fe4b14684364b2, email=Luis)
2022-03-30 12:40:07.063 INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : onNext: Profile(id=6244a40744fe4b14684364b0, email=Hugo)
2022-03-30 12:40:07.063 INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : onNext: Profile(id=6244a40744fe4b14684364b3, email=Daisy)
2022-03-30 12:40:07.063 INFO 9676 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : onComplete: Completed!
Using a new Consumer in .subscribe(...)
// Consumer invoked once for every profile emitted by the pipeline below.
Consumer<Profile> profileLogger = new Consumer<Profile>() {
    @Override
    public void accept(Profile profile) {
        log.info("Consumer.accept: {}" , profile);
    }
};

// delete -> run `profiles` -> read everything back, handing each profile to the consumer.
this.profileRepository
        .deleteAll()
        .thenMany(profiles)
        .thenMany(this.profileRepository.findAll())
        .subscribe(profileLogger);
}
When it runs:
2022-03-30 12:50:05.221 INFO 15496 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:56}] to localhost:27017
2022-03-30 12:50:05.221 INFO 15496 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:55}] to localhost:27017
2022-03-30 12:50:05.222 INFO 15496 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=35209600}
2022-03-30 12:50:05.279 INFO 15496 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
2022-03-30 12:50:05.286 INFO 15496 --- [ main] c.b.p.ProfileserviceApplication : Started ProfileserviceApplication in 1.553 seconds (JVM running for 2.215)
2022-03-30 12:50:05.349 INFO 15496 --- [ntLoopGroup-3-3] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:57}] to localhost:27017
2022-03-30 12:50:05.400 INFO 15496 --- [ntLoopGroup-3-5] org.mongodb.driver.connection : Opened connection [connectionId{localValue:5, serverValue:59}] to localhost:27017
2022-03-30 12:50:05.401 INFO 15496 --- [ntLoopGroup-3-4] org.mongodb.driver.connection : Opened connection [connectionId{localValue:4, serverValue:58}] to localhost:27017
2022-03-30 12:50:05.437 INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : Consumer.accept: Profile(id=6244a65dadd7a11c908f81d8, email=Hugo)
2022-03-30 12:50:05.437 INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : Consumer.accept: Profile(id=6244a65dadd7a11c908f81db, email=Daisy)
2022-03-30 12:50:05.437 INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : Consumer.accept: Profile(id=6244a65dadd7a11c908f81da, email=Luis)
2022-03-30 12:50:05.437 INFO 15496 --- [ntLoopGroup-3-4] c.bext.profileservice.init.Initializer : Consumer.accept: Profile(id=6244a65dadd7a11c908f81d9, email=Paco)
Event Sourcing
When a new record is created, an event is published so that other components in the same JVM can react to it, somewhat like CQRS.
An ApplicationEvent carrying the profile is needed:
/**
 * Application event announcing that a {@link Profile} was created.
 * The profile is passed to {@link ApplicationEvent} as the event source.
 */
public class ProfileCreatedEvent extends ApplicationEvent {

    /**
     * @param source the profile this event is about
     */
    public ProfileCreatedEvent(Profile source) {
        super(source);
    }

}
It is published when the profile is created.
...
/**
 * Saves a new profile for the given email and publishes a
 * {@link ProfileCreatedEvent} once the save succeeds.
 *
 * @param email the email of the profile to create
 * @return a Mono emitting the saved profile
 */
public Mono<Profile> create(String email){
    Profile unsaved = new Profile(null, email);
    return this.profileRepository
            .save(unsaved)
            .doOnSuccess(saved -> {
                ProfileCreatedEvent created = new ProfileCreatedEvent(saved);
                this.applicationEventPublisher.publishEvent(created);
            });
}
...
The Controller
The ProfileController exposes the service over HTTP so its functionality can be tested.
/**
 * REST endpoints under {@code /profiles}, delegating to {@link ProfileService}.
 */
@RestController
@RequestMapping("/profiles")
public class ProfileController {

    private final ProfileService profileService;

    public ProfileController(ProfileService profileService) {
        this.profileService = profileService;
    }

    /**
     * {@code GET /profiles}
     *
     * @return whatever {@code profileService.all()} emits
     */
    @GetMapping
    Flux<Profile> all(){
        return this.profileService.all();
    }

    /**
     * {@code GET /profiles/{id}}
     *
     * <p>Mapped with {@code @GetMapping}: a bare {@code @RequestMapping} would
     * match every HTTP method (POST, PUT, DELETE, ...) for this read-only lookup.
     *
     * @param id the id taken from the path
     * @return whatever {@code profileService.byId(id)} emits
     */
    @GetMapping("/{id}")
    Mono<Profile> byId(@PathVariable("id") String id){
        return this.profileService.byId(id);
    }

    /**
     * {@code POST /profiles}: creates a profile from the email of the request body.
     *
     * @param profile request body; only its email is used
     * @return a "created" response whose location is {@code /profiles/{id}} of the saved profile
     */
    @PostMapping
    Mono<ResponseEntity<Object>> create(@RequestBody Profile profile) {
        return this.profileService.create( profile.getEmail())
                .map(profileSaved -> ResponseEntity.created(URI.create("/profiles/" + profileSaved.getId()))
                        .build());
    }
}
Test the Controller
To test the profile controller, it is necessary to disable the Okta dependency; otherwise a login window is shown in the browser.
<!--
<dependency>
<groupId>com.okta.spring</groupId>
<artifactId>okta-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
-->
Run the application and check the responses of the endpoints:
C:\WINDOWS\system32>curl http://localhost:8080/profiles
[{"id":"6244d52fe3281a6063300c8f","email":"Hugo"},{"id":"6244d52fe3281a6063300c92","email":"Daisy"},{"id":"6244d52fe3281a6063300c91","email":"Luis"},{"id":"6244d52fe3281a6063300c90","email":"Paco"}]
C:\WINDOWS\system32>curl http://localhost:8080/profiles/6244d52fe3281a6063300c92
{"id":"6244d52fe3281a6063300c92","email":"Daisy"}
bext@DESKTOP-NLF0058 MINGW64 /e/Descargas/Reactor/Imagenes
$ curl \
> -X POST \
> -H "Content-Type: application/json" \
> -d '{"email":"newProfile"}' http://localhost:8080/profiles
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 18 0 0 100 18 0 818 --:--:-- --:--:-- --:--:-- 857
C:\WINDOWS\system32>curl http://localhost:8080/profiles/
[{"id":"6244d52fe3281a6063300c8f","email":"Hugo"},{"id":"6244d52fe3281a6063300c92","email":"Daisy"},{"id":"6244d52fe3281a6063300c91","email":"Luis"},{"id":"6244d52fe3281a6063300c90","email":"Paco"},{"id":"6244df9ee3281a6063300c94","email":"newProfile"}]
ProfileCreatedEventPublisher
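The ProfileCreatedEventPublisher works as the glue between the producer of profile-created events and their consumers: as an ApplicationListener it puts each ProfileCreatedEvent in a deque, and as a Consumer of FluxSink<ProfileCreatedEvent> it takes the events from the deque and pushes them into the sink, so they are available to everyone that subscribes to a Flux created from this Consumer.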
Service Send Event and WebSocket endPoint
/**
 * Bridges Spring application events to a Reactor {@link FluxSink}.
 *
 * <p>As an {@link ApplicationListener} it queues every {@link ProfileCreatedEvent}
 * in a deque; as a {@code Consumer<FluxSink<ProfileCreatedEvent>>} it starts a
 * worker on the injected {@link Executor} that moves queued events into the sink.
 */
@Component
public class ProfileCreatedEventPublisher implements ApplicationListener<ProfileCreatedEvent>, Consumer<FluxSink<ProfileCreatedEvent>> {

    private final BlockingDeque<ProfileCreatedEvent> profileEventsDeque = new LinkedBlockingDeque<>();
    private final Executor executor;

    public ProfileCreatedEventPublisher( Executor executor) {
        this.executor = executor;
    }

    /**
     * Starts a worker that forwards queued events to the given sink until the
     * sink is cancelled or the worker thread is interrupted.
     *
     * @param profileCreatedEventFluxSink the sink receiving the events
     */
    @Override
    public void accept(FluxSink<ProfileCreatedEvent> profileCreatedEventFluxSink) {
        this.executor.execute(() -> {
            try {
                // Stop once the downstream cancelled instead of looping forever.
                while (!profileCreatedEventFluxSink.isCancelled()) {
                    ProfileCreatedEvent profileCreatedEvent = profileEventsDeque.take();
                    if (profileCreatedEventFluxSink.isCancelled()) {
                        // The sink went away while we were blocked in take(): put the
                        // event back so a worker serving a live sink can deliver it.
                        profileEventsDeque.offerFirst(profileCreatedEvent);
                        return;
                    }
                    profileCreatedEventFluxSink.next(profileCreatedEvent);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and terminate the sink explicitly.
                Thread.currentThread().interrupt();
                profileCreatedEventFluxSink.error(e);
            }
        });
    }

    /**
     * Queues the event for delivery by a worker started in {@link #accept(FluxSink)}.
     *
     * @param profileCreatedEvent the event to queue
     */
    @Override
    public void onApplicationEvent(ProfileCreatedEvent profileCreatedEvent) {
        this.profileEventsDeque.offer(profileCreatedEvent);
    }
}
Testing the ServiceSendEventController from a browser over HTTP
Publish the event over WebSocket to an HTML client
Add WebSocket Code
/**
 * WebSocket wiring: sends each {@link ProfileCreatedEvent}, rendered with
 * {@code toString()}, as a text message on {@code /ws/profiles}.
 */
@Configuration
public class WebSocketConfiguration {

    private final ProfileCreatedEventPublisher profileCreatedEventPublisher;

    public WebSocketConfiguration(ProfileCreatedEventPublisher profileCreatedEventPublisher) {
        this.profileCreatedEventPublisher = profileCreatedEventPublisher;
    }

    /**
     * Handler that sends every event of one shared Flux to each connected session.
     */
    @Bean
    WebSocketHandler handler() {
        // A single shared Flux created from the publisher, reused by every session.
        Flux<ProfileCreatedEvent> events = Flux.create(profileCreatedEventPublisher).share();
        return wsSession -> wsSession.send(
                events.map(event -> wsSession.textMessage(event.toString())));
    }

    /**
     * Maps {@code /ws/profiles} to the handler, with order 10.
     */
    @Bean
    HandlerMapping handlerMapping() {
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(10);
        mapping.setUrlMap(Collections.singletonMap("/ws/profiles", handler()));
        return mapping;
    }

    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
The html client
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Profile notification Client</title>
</head>
<body>
<script>
    // Open a WebSocket to the profiles endpoint and show every received message in an alert.
    var socket = new WebSocket('ws://localhost:8080/ws/profiles');
    socket.addEventListener('message', (ev) => {
        window.alert('message from server: ' + ev.data);
    });
</script>
</body>
</html>
Running the code: the browser listens on the WebSocket endpoint while a profile-created event is generated.
/**
 * WebSocket wiring: sends each {@link ProfileCreatedEvent}, serialized with
 * the injected {@link ObjectMapper}, as a text message on {@code /ws/profiles}.
 */
@Configuration
public class WebSocketConfiguration {

    private final ProfileCreatedEventPublisher profileCreatedEventPublisher;
    private final ObjectMapper objectMapper;

    public WebSocketConfiguration(ProfileCreatedEventPublisher profileCreatedEventPublisher, ObjectMapper objectMapper) {
        this.profileCreatedEventPublisher = profileCreatedEventPublisher;
        this.objectMapper = objectMapper;
    }

    /**
     * Serializes the event with the object mapper; {@code @SneakyThrows} lets the
     * checked serialization exception propagate without being declared.
     */
    @SneakyThrows
    private String jsonFrom(ProfileCreatedEvent profileCreatedEvent) {
        return objectMapper.writeValueAsString(profileCreatedEvent);
    }

    /**
     * Handler that sends every event of one shared Flux, as JSON text, to each session.
     */
    @Bean
    WebSocketHandler handler() {
        // A single shared Flux created from the publisher, reused by every session.
        Flux<ProfileCreatedEvent> events = Flux.create(profileCreatedEventPublisher).share();
        return session -> {
            Flux<String> payloads = events.map(event -> jsonFrom(event));
            Flux<WebSocketMessage> outgoing = payloads.map(payload -> session.textMessage(payload));
            return session.send(outgoing);
        };
    }

    /**
     * Maps {@code /ws/profiles} to the handler, with order 10.
     */
    @Bean
    HandlerMapping handlerMapping() {
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(10);
        mapping.setUrlMap(Collections.singletonMap("/ws/profiles", handler()));
        return mapping;
    }

    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
Running the code: the browser listens on the WebSocket endpoint while a profile-created event is generated.
Browser inspector
The HTTP connection is upgraded to the WebSocket protocol.
eot
No hay comentarios:
Publicar un comentario