Skip to content

Commit

Permalink
[SELC-5887] added configuration for send fd event
Browse files Browse the repository at this point in the history
  • Loading branch information
flaminiaScarciofolo committed Oct 30, 2024
1 parent 6c1fe53 commit fb49f8d
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.client.EventHubFdRestClient;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.mapper.NotificationMapper;
Expand Down Expand Up @@ -78,6 +79,10 @@ public class UserInstitutionCdcService {
@Inject
EventHubRestClient eventHubRestClient;

@RestClient
@Inject
EventHubFdRestClient eventHubFdRestClient;

private final NotificationMapper notificationMapper;


Expand All @@ -87,6 +92,7 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
@ConfigProperty(name = "user-cdc.retry.max-backoff") Integer retryMaxBackOff,
@ConfigProperty(name = "user-cdc.retry") Integer maxRetry,
@ConfigProperty(name = "user-cdc.send-events.watch.enabled") Boolean sendEventsEnabled,
@ConfigProperty(name = "user-cdc.send-events-fd.watch.enabled") Boolean sendFdEventsEnabled,
UserInstitutionRepository userInstitutionRepository,
TelemetryClient telemetryClient,
TableClient tableClient, NotificationMapper notificationMapper) {
Expand All @@ -100,10 +106,10 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
this.tableClient = tableClient;
this.notificationMapper = notificationMapper;
telemetryClient.getContext().getOperation().setName(OPERATION_NAME);
initOrderStream(sendEventsEnabled);
initOrderStream(sendEventsEnabled, sendFdEventsEnabled);
}

private void initOrderStream(Boolean sendEventsEnabled) {
private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnabled) {
log.info("Starting initOrderStream ... ");

//Retrieve last resumeToken for watching collection at specific operation
Expand Down Expand Up @@ -149,6 +155,17 @@ private void initOrderStream(Boolean sendEventsEnabled) {
});
}

if (sendFdEventsEnabled) {
publisher.subscribe().with(
this::consumerToSendUserEventForFD,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
Quarkus.asyncExit();
});
}


log.info("Completed initOrderStream ... ");
}

Expand Down Expand Up @@ -234,19 +251,22 @@ public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> d
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Uni.createFrom().item(UserUtils.retrieveFdProductIfItChanged(userInstitutionChanged.getProducts(), List.of(PROD_FD, PROD_FD_GARANTITO)))
.map(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)))
.onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> eventHubRestClient.sendMessage(fdUserNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
)
.onItem().ifNotNull().transform(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)))
.onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> {
log.info("Sending message to EventHubFdRestClient ... ");
return eventHubFdRestClient.sendMessage(fdUserNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D)));
}
))
.subscribe().with(
result -> {
log.info("SendEvents successfully performed from UserInstitution document having id: {}", document.getDocumentKey().toJson());
log.info("SendFdEvents successfully performed from UserInstitution document having id: {}", document.getDocumentKey().toJson());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(EVENTS_USER_INSTITUTION_SUCCESS, 1D));
},
failure -> {
log.error("Error during SendEvents from UserInstitution document having id: {} , message: {}", document.getDocumentKey().toJson(), failure.getMessage());
log.error("Error during SendFdEvents from UserInstitution document having id: {} , message: {}", document.getDocumentKey().toJson(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.model.FdUserNotificationToSend;
import it.pagopa.selfcare.user.model.NotificationUserType;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
import it.pagopa.selfcare.user.model.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
import org.openapi.quarkus.user_registry_json.model.CertifiableFieldResourceOfstring;
import org.openapi.quarkus.user_registry_json.model.UserResource;

import javax.swing.text.html.Option;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

@Mapper(componentModel = "cdi", imports = UUID.class)
Expand All @@ -20,13 +21,7 @@ public interface NotificationMapper {
@Mapping(target = "productId", source = "product.productId")
@Mapping(target = "createdAt", source = "product.createdAt")
@Mapping(target = "updatedAt", expression = "java((null == product.getUpdatedAt()) ? product.getCreatedAt() : product.getUpdatedAt())")
@Mapping(target = "user.role", source = "product.role")
@Mapping(target = "user.productRole", source = "product.productRole")
@Mapping(target = "user.relationshipStatus", source = "product.status")
@Mapping(target = "user.userId", source = "userResource.id", ignore = true)
@Mapping(target = "user.name", source = "userResource.name.value")
@Mapping(target = "user.familyName", source = "userResource.familyName.value")
@Mapping(target = "user.email", source = "userResource.email.value")
@Mapping(target = "user", expression = "java(mapUser(userResource, userInstitution.getUserMailUuid(), product))")
@Mapping(target = "id", expression = "java(toUniqueIdNotification(userInstitution, product))")
@Mapping(target = "eventType", expression = "java(it.pagopa.selfcare.user.model.constants.QueueEvent.UPDATE)")
UserNotificationToSend toUserNotificationToSend(UserInstitution userInstitution, OnboardedProduct product, UserResource userResource);
Expand All @@ -41,13 +36,29 @@ default String toUniqueIdNotification(UserInstitution userInstitution, Onboarded
@Mapping(target = "product", source = "product.productId")
@Mapping(target = "createdAt", source = "product.createdAt")
@Mapping(target = "updatedAt", expression = "java((null == product.getUpdatedAt()) ? product.getCreatedAt() : product.getUpdatedAt())")
@Mapping(target = "user.role", source = "product.role")
@Mapping(target = "user.productRole", source = "product.productRole")
@Mapping(target = "user.relationshipStatus", source = "product.status")
@Mapping(target = "user.userId", source = "userResource.id", ignore = true)
@Mapping(target = "user.name", source = "userResource.name.value")
@Mapping(target = "user.familyName", source = "userResource.familyName.value")
@Mapping(target = "user.email", source = "userResource.email.value")
@Mapping(target = "user", expression = "java(mapUser(userResource, userInstitutionChanged.getUserMailUuid(), product))")
@Mapping(target = "type", source = "type")
FdUserNotificationToSend toFdUserNotificationToSend(UserInstitution userInstitutionChanged, OnboardedProduct product, UserResource userResource, NotificationUserType type);


@Named("mapUser")
default UserToNotify mapUser(UserResource userResource, String userMailUuid, OnboardedProduct onboardedProduct) {
UserToNotify userToNotify = new UserToNotify();
userToNotify.setUserId(Optional.ofNullable(userResource.getId()).map(UUID::toString).orElse(null));
userToNotify.setName(Optional.ofNullable(userResource.getName()).map(CertifiableFieldResourceOfstring::getValue).orElse(null));
userToNotify.setFamilyName(Optional.ofNullable(userResource.getFamilyName()).map(CertifiableFieldResourceOfstring::getValue).orElse(null));
userToNotify.setEmail(Optional.ofNullable(userMailUuid).map(mailUuid -> retrieveMailFromWorkContacts(userResource, mailUuid)).orElse(null));
userToNotify.setProductRole(onboardedProduct.getProductRole());
userToNotify.setRole(Optional.ofNullable(onboardedProduct.getRole()).map(Enum::name).orElse(null));
userToNotify.setRelationshipStatus(onboardedProduct.getStatus());
return userToNotify;
}

default String retrieveMailFromWorkContacts(UserResource userResource, String userMailUuid) {
return Optional.ofNullable(userResource.getWorkContacts())
.flatMap(stringWorkContactResourceMap -> Optional.ofNullable(stringWorkContactResourceMap.get(userMailUuid))
.flatMap(workContactResource -> Optional.ofNullable(workContactResource.getEmail())
.map(CertifiableFieldResourceOfstring::getValue)))
.orElse(null);
}
}
11 changes: 8 additions & 3 deletions apps/user-cdc/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ quarkus.mongodb.connection-string = ${MONGODB-CONNECTION-STRING}
quarkus.mongodb.database = selcUser

#False for pnpg use case because we must not send events
user-cdc.send-events.watch.enabled=${USER_CDC_SEND_EVENTS_WATCH_ENABLED:false}
user-cdc.send-events.watch.enabled=${USER_CDC_SEND_EVENTS_WATCH_ENABLED:true}
user-cdc.send-events-fd.watch.enabled=${USER_CDC_SEND_EVENTS_FD_WATCH_ENABLED:true}
user-cdc.appinsights.connection-string=${APPLICATIONINSIGHTS_CONNECTION_STRING:InstrumentationKey=00000000-0000-0000-0000-000000000000}
user-cdc.table.name=${START_AT_TABLE_NAME:CdCStartAt}
user-cdc.storage.connection-string=${STORAGE_CONNECTION_STRING:UseDevelopmentStorage=true;}
Expand All @@ -23,6 +24,10 @@ quarkus.openapi-generator.codegen.spec.user_registry_json.additional-model-type-
quarkus.openapi-generator.user_registry_json.auth.api_key.api-key = ${USER-REGISTRY-API-KEY:example-api-key}
quarkus.rest-client."org.openapi.quarkus.user_registry_json.api.UserApi".url=${USER_REGISTRY_URL:http://localhost:8080}

quarkus.rest-client.event-hub.url=${EVENT_HUB_BASE_PATH:test}
quarkus.rest-client.event-hub.url=${EVENT_HUB_BASE_PATH:test}${EVENT_HUB_SC_USERS_TOPIC:sc-users}
eventhub.rest-client.keyName=${SHARED_ACCESS_KEY_NAME:test}
eventhub.rest-client.key=${EVENTHUB-SC-USERS-SELFCARE-WO-KEY-LC:test}
eventhub.rest-client.key=${EVENTHUB-SC-USERS-SELFCARE-WO-KEY-LC:test}

quarkus.rest-client.event-hub-fd.url=${EVENT_HUB_BASE_PATH:test}${EVENT_HUB_SELFCARE_FD_TOPIC:selfcare-fd}
eventhubfd.rest-client.keyName=${FD_SHARED_ACCESS_KEY_NAME:test}
eventhubfd.rest-client.key=${EVENTHUB-SELFCARE-FD-FD-KEY-LC:test}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package it.pagopa.selfcare.user.event.mapper;

import it.pagopa.selfcare.onboarding.common.PartyRole;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserToNotify;
import it.pagopa.selfcare.user.model.constants.OnboardedProductState;
import org.junit.jupiter.api.Test;
import org.openapi.quarkus.user_registry_json.model.CertifiableFieldResourceOfstring;
import org.openapi.quarkus.user_registry_json.model.UserResource;
import org.openapi.quarkus.user_registry_json.model.WorkContactResource;

import static org.junit.jupiter.api.Assertions.*;
import java.util.Map;
import java.util.UUID;

class NotificationMapperTest {

@Test
void mapUser_withValidData_shouldMapFieldsCorrectly() {
UserResource userResource = new UserResource();
userResource.setId(UUID.randomUUID());
userResource.setName(new CertifiableFieldResourceOfstring().value("John"));
userResource.setFamilyName(new CertifiableFieldResourceOfstring().value("Doe"));
userResource.setWorkContacts(Map.of("mailUuid", new WorkContactResource().email(new CertifiableFieldResourceOfstring().value("[email protected]"))));
OnboardedProduct onboardedProduct = new OnboardedProduct();
onboardedProduct.setProductRole("Admin");
onboardedProduct.setRole(PartyRole.MANAGER);
onboardedProduct.setStatus(OnboardedProductState.ACTIVE);

UserToNotify result = new NotificationMapperImpl().mapUser(userResource, "mailUuid", onboardedProduct);

assertEquals(userResource.getId().toString(), result.getUserId());
assertEquals("John", result.getName());
assertEquals("Doe", result.getFamilyName());
assertEquals("[email protected]", result.getEmail());
assertEquals("Admin", result.getProductRole());
assertEquals("MANAGER", result.getRole());
assertEquals(OnboardedProductState.ACTIVE, result.getRelationshipStatus());
}

@Test
void mapUser_withNullFields_shouldHandleNullValues() {
UserResource userResource = new UserResource();
OnboardedProduct onboardedProduct = new OnboardedProduct();

UserToNotify result = new NotificationMapperImpl().mapUser(userResource, null, onboardedProduct);

assertNull(result.getUserId());
assertNull(result.getName());
assertNull(result.getFamilyName());
assertNull(result.getEmail());
assertNull(result.getProductRole());
assertNull(result.getRole());
assertNull(result.getRelationshipStatus());
}

@Test
void retrieveMailFromWorkContacts_withValidMailUuid_shouldReturnEmail() {
UserResource userResource = new UserResource();
userResource.setWorkContacts(Map.of("mailUuid", new WorkContactResource().email(new CertifiableFieldResourceOfstring().value("[email protected]"))));

String result = new NotificationMapperImpl().retrieveMailFromWorkContacts(userResource, "mailUuid");

assertEquals("[email protected]", result);
}

@Test
void retrieveMailFromWorkContacts_withInvalidMailUuid_shouldReturnNull() {
UserResource userResource = new UserResource();
userResource.setWorkContacts(Map.of("mailUuid", new WorkContactResource().email(new CertifiableFieldResourceOfstring().value("[email protected]"))));

String result = new NotificationMapperImpl().retrieveMailFromWorkContacts(userResource, "invalidUuid");

assertNull(result);
}

@Test
void retrieveMailFromWorkContacts_withNullWorkContacts_shouldReturnNull() {
UserResource userResource = new UserResource();

String result = new NotificationMapperImpl().retrieveMailFromWorkContacts(userResource, "mailUuid");

assertNull(result);
}
}
Loading

0 comments on commit fb49f8d

Please sign in to comment.