Skip to content

Commit

Permalink
[SELC-4195] Feat: Added notification service to sent Kafka events and…
Browse files Browse the repository at this point in the history
… notification emails (#58)
  • Loading branch information
flaminiaScarciofolo authored Feb 21, 2024
1 parent e8192d1 commit 0684553
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
@NoArgsConstructor(access = AccessLevel.NONE)
public class TemplateMailConstant {

private static final String ACTIVATE_SUBJECT = "Il tuo ruolo è stato riabilitato";
private static final String DELETE_SUBJECT = "Il tuo ruolo è stato rimosso";
private static final String SUSPEND_SUBJECT = "Il tuo ruolo è sospeso";
public static final String ACTIVATE_SUBJECT = "Il tuo ruolo è stato riabilitato";
public static final String DELETE_SUBJECT = "Il tuo ruolo è stato rimosso";
public static final String SUSPEND_SUBJECT = "Il tuo ruolo è sospeso";
public static final String ACTIVATE_TEMPLATE = "user_activated.ftlh";
public static final String DELETE_TEMPLATE = "user_deleted.ftlh";
public static final String SUSPEND_TEMPLATE = "user_suspended.ftlh";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package it.pagopa.selfcare.user.model;
import lombok.Builder;
import lombok.Getter;

@Builder
@Getter
public class LoggedUser {
String uid;
String name;
String familyName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package it.pagopa.selfcare.user.model.notification;

import it.pagopa.selfcare.product.entity.Product;
import it.pagopa.selfcare.user.entity.UserInstitution;
import lombok.Builder;
import lombok.Getter;
import org.openapi.quarkus.user_registry_json.model.UserResource;

@Builder
@Getter
public class PrepareNotificationData {
private UserInstitution userInstitution;
private UserResource userResource;
private Product product;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package it.pagopa.selfcare.user.service;

import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.product.entity.Product;
import it.pagopa.selfcare.user.constant.OnboardedProductState;
import it.pagopa.selfcare.user.entity.UserInstitution;
import it.pagopa.selfcare.user.model.notification.UserNotificationToSend;
import org.openapi.quarkus.user_registry_json.model.UserResource;

public interface UserNotificationService {
Uni<UserNotificationToSend> sendKafkaNotification(UserNotificationToSend userNotificationToSend, String userId);
Uni<Void> sendEmailNotification(UserResource user, UserInstitution institution, Product product, OnboardedProductState status, String loggedUserName, String loggedUserSurname);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package it.pagopa.selfcare.user.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import freemarker.template.Configuration;
import freemarker.template.Template;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.MutinyEmitter;
import it.pagopa.selfcare.product.entity.Product;
import it.pagopa.selfcare.product.entity.ProductRole;
import it.pagopa.selfcare.user.conf.CloudTemplateLoader;
import it.pagopa.selfcare.user.constant.OnboardedProductState;
import it.pagopa.selfcare.user.entity.OnboardedProduct;
import it.pagopa.selfcare.user.entity.UserInstitution;
import it.pagopa.selfcare.user.exception.InvalidRequestException;
import it.pagopa.selfcare.user.model.notification.UserNotificationToSend;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static it.pagopa.selfcare.user.constant.TemplateMailConstant.*;

import org.openapi.quarkus.user_registry_json.model.UserResource;
import org.openapi.quarkus.user_registry_json.model.WorkContactResource;


@Slf4j
@ApplicationScoped
public class UserNotificationServiceImpl implements UserNotificationService {

public static final String ERROR_DURING_SEND_DATA_LAKE_NOTIFICATION_FOR_USER = "error during send dataLake notification for user {}";

@Inject
@Channel("sc-users")
MutinyEmitter<String> usersEmitter;

private final ObjectMapper objectMapper;

private final MailService mailService;

private final Configuration freemarkerConfig;

public UserNotificationServiceImpl(Configuration freemarkerConfig, CloudTemplateLoader cloudTemplateLoader, ObjectMapper objectMapper, MailService mailService) {
this.mailService = mailService;
this.freemarkerConfig = freemarkerConfig;
freemarkerConfig.setTemplateLoader(cloudTemplateLoader);
this.objectMapper = objectMapper;
}

private String convertNotificationToJson(UserNotificationToSend userNotificationToSend) {
try {
return objectMapper.writeValueAsString(userNotificationToSend);
} catch (JsonProcessingException e) {
log.warn(ERROR_DURING_SEND_DATA_LAKE_NOTIFICATION_FOR_USER, userNotificationToSend.getUser().getUserId());
throw new InvalidRequestException(ERROR_DURING_SEND_DATA_LAKE_NOTIFICATION_FOR_USER);
}
}

@Override
public Uni<UserNotificationToSend> sendKafkaNotification(UserNotificationToSend userNotificationToSend, String userId) {
String message = convertNotificationToJson(userNotificationToSend);
return usersEmitter.sendMessage(Message.of(message))
.onItem().invoke(() -> log.info("sent dataLake notification for user : {}", userId))
.onFailure().invoke(throwable -> log.warn("error during send dataLake notification for user {}: {} ", userId, throwable.getMessage(), throwable))
.replaceWith(userNotificationToSend);
}

@Override
public Uni<Void> sendEmailNotification(UserResource user, UserInstitution institution, Product product, OnboardedProductState status, String loggedUserName, String loggedUserSurname) {
log.info("sendMailNotification {}", status.name());
return switch (status) {
case ACTIVE ->
buildAndSendEmailNotification(user, institution, product, ACTIVATE_TEMPLATE, ACTIVATE_SUBJECT, loggedUserName, loggedUserSurname);
case SUSPENDED ->
buildAndSendEmailNotification(user, institution, product, DELETE_TEMPLATE, DELETE_SUBJECT, loggedUserName, loggedUserSurname);
case DELETED ->
buildAndSendEmailNotification(user, institution, product, SUSPEND_TEMPLATE, SUSPEND_SUBJECT, loggedUserName, loggedUserSurname);
case PENDING, TOBEVALIDATED, REJECTED -> Uni.createFrom().voidItem();
};
}

private Map<String, String> buildEmailDataModel(UserInstitution institution, Product product, String loggedUserName, String loggedUserSurname) {
Optional<OnboardedProduct> productDb = institution.getProducts().stream().filter(p -> StringUtils.equals(p.getProductId(), product.getId())).findFirst();

Optional<String> roleLabel = Optional.empty();
if (productDb.isPresent()) {
roleLabel = product.getRoleMappings().values().stream()
.flatMap(productRoleInfo -> productRoleInfo.getRoles().stream())
.filter(productRole -> productRole.getCode().equals(productDb.get().getProductRole()))
.map(ProductRole::getLabel).findAny();
}

Map<String, String> dataModel = new HashMap<>();
dataModel.put("productName", product.getTitle());
dataModel.put("productRole", roleLabel.orElse("no_role_found"));
dataModel.put("institutionName", institution.getInstitutionDescription());
dataModel.put("requesterName", loggedUserName);
dataModel.put("requesterSurname", loggedUserSurname);
return dataModel;
}

private Uni<Void> buildAndSendEmailNotification(org.openapi.quarkus.user_registry_json.model.UserResource user, UserInstitution institution, Product product, String templateName, String subject, String loggedUserName, String loggedUserSurname) {
return Uni.createFrom().voidItem().onItem().transformToUni(Unchecked.function(x -> {
String email = retrieveMail(user, institution);
Map<String, String> dataModel = buildEmailDataModel(institution, product, loggedUserName, loggedUserSurname);
return Uni.createFrom().item(getContent(templateName, dataModel))
.onItem().transformToUni(content -> mailService.sendMail(email, content.toString(), subject));
}));
}

private StringWriter getContent(String templateName, Map<String, String> dataModel) {
StringWriter stringWriter = null;
try {
Template template = freemarkerConfig.getTemplate(templateName);
stringWriter = new StringWriter();
template.process(dataModel, stringWriter);
} catch (Exception e) {
log.error("Unable to fetch template {}", templateName, e);
}
return stringWriter;
}

private static String retrieveMail(UserResource user, UserInstitution institution) {
WorkContactResource certEmail = user.getWorkContacts().getOrDefault(institution.getUserMailUuid(), null);
String email;
if (certEmail == null || certEmail.getEmail() == null || StringUtils.isBlank(certEmail.getEmail().getValue())) {
throw new InvalidRequestException("Missing mail for userId: " + user.getId());
} else {
email = certEmail.getEmail().getValue();
}
return email;
}
}
Original file line number Diff line number Diff line change
@@ -1,49 +1,37 @@
package it.pagopa.selfcare.user.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;
import it.pagopa.selfcare.user.constant.QueueEvent;
import it.pagopa.selfcare.user.entity.filter.UserInstitutionFilter;
import it.pagopa.selfcare.user.exception.InvalidRequestException;
import it.pagopa.selfcare.user.model.notification.UserNotificationToSend;
import it.pagopa.selfcare.user.util.UserUtils;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.gradle.internal.impldep.org.apache.commons.lang.StringUtils;
import org.openapi.quarkus.user_registry_json.model.MutableUserFieldsDto;
import java.util.ArrayList;
import java.util.List;

import org.openapi.quarkus.user_registry_json.api.UserApi;


@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class UserRegistryServiceImpl implements UserRegistryService {
public static final String ERROR_DURING_SEND_DATA_LAKE_NOTIFICATION_FOR_USER = "error during send dataLake notification for user {}";
private static final String USERS_FIELD_LIST_WITHOUT_FISCAL_CODE = "name,familyName,email,workContacts";


private final ObjectMapper objectMapper;
private final UserInstitutionService userInstitutionService;
private final UserUtils userUtils;
private final UserNotificationService userNotificationService;


@RestClient
@Inject
private UserApi userRegistryApi;

@Inject
@Channel("sc-users")
private MutinyEmitter<String> usersEmitter;

@Override
public Uni<List<UserNotificationToSend>> updateUserRegistryAndSendNotificationToQueue(MutableUserFieldsDto userDto, String userId, String institutionId) {
Expand All @@ -57,25 +45,7 @@ public Uni<List<UserNotificationToSend>> updateUserRegistryAndSendNotificationTo
.onItem().transformToMulti(response -> userInstitutionService.findAllWithFilter(userInstitutionFilter.constructMap()))
.onItem().transformToMultiAndMerge(userInstitution -> userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId())
.onItem().transformToMulti(userResource -> Multi.createFrom().iterable(userUtils.buildUsersNotificationResponse(userInstitution, userResource, QueueEvent.UPDATE))))
.onItem().transformToUniAndMerge(notification -> sendUserNotification(notification, userId))
.onItem().transformToUniAndMerge(notification -> userNotificationService.sendKafkaNotification(notification, userId))
.collect().asList();
}


private String convertNotificationToJson(UserNotificationToSend userNotificationToSend) {
try {
return objectMapper.writeValueAsString(userNotificationToSend);
} catch (JsonProcessingException e) {
log.warn(ERROR_DURING_SEND_DATA_LAKE_NOTIFICATION_FOR_USER, userNotificationToSend.getUser().getUserId());
throw new InvalidRequestException(ERROR_DURING_SEND_DATA_LAKE_NOTIFICATION_FOR_USER);
}
}

private Uni<UserNotificationToSend> sendUserNotification(UserNotificationToSend userNotificationToSend, String userId) {
String message = convertNotificationToJson(userNotificationToSend);
return usersEmitter.sendMessage(Message.of(message))
.onItem().invoke(() -> log.info("sent dataLake notification for user : {}", userId))
.onFailure().invoke(throwable -> log.warn("error during send dataLake notification for user {}: {} ", userId, throwable.getMessage(), throwable))
.replaceWith(userNotificationToSend);
}
}
Loading

0 comments on commit 0684553

Please sign in to comment.