Skip to content

Commit

Permalink
[SELC-5887] First implementation for send event for FD from user-cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
flaminiaScarciofolo committed Oct 28, 2024
1 parent deb015e commit 6c1fe53
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.mapper.NotificationMapper;
import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository;
import it.pagopa.selfcare.user.model.NotificationUserType;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.TrackEventInput;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -52,6 +55,9 @@ public class UserInstitutionCdcService {
private static final String COLLECTION_NAME = "userInstitutions";
private static final String OPERATION_NAME = "USER-CDC-UserInfoUpdate";
public static final String USERS_FIELD_LIST_WITHOUT_FISCAL_CODE = "name,familyName,email,workContacts";
private static final String PROD_FD = "prod-fd";
private static final String PROD_FD_GARANTITO = "prod-fd-garantito";


private final TelemetryClient telemetryClient;

Expand Down Expand Up @@ -103,7 +109,7 @@ private void initOrderStream(Boolean sendEventsEnabled) {
//Retrieve last resumeToken for watching collection at specific operation
String resumeToken = null;

if(!ConfigUtils.getProfiles().contains("test")) {
if (!ConfigUtils.getProfiles().contains("test")) {
try {
TableEntity cdcStartAtEntity = tableClient.getEntity(CDC_START_AT_PARTITION_KEY, CDC_START_AT_ROW_KEY);
if (Objects.nonNull(cdcStartAtEntity))
Expand All @@ -117,7 +123,7 @@ private void initOrderStream(Boolean sendEventsEnabled) {
ReactiveMongoCollection<UserInstitution> dataCollection = getCollection();
ChangeStreamOptions options = new ChangeStreamOptions()
.fullDocument(FullDocument.UPDATE_LOOKUP);
if(Objects.nonNull(resumeToken))
if (Objects.nonNull(resumeToken))
options = options.resumeAfter(BsonDocument.parse(resumeToken));

Bson match = Aggregates.match(Filters.in("operationType", asList("update", "replace", "insert")));
Expand All @@ -129,11 +135,11 @@ private void initOrderStream(Boolean sendEventsEnabled) {
this::consumerUserInstitutionRepositoryEvent,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});

if(sendEventsEnabled) {
if (sendEventsEnabled) {
publisher.subscribe().with(
this::consumerToSendScUserEvent,
failure -> {
Expand Down Expand Up @@ -196,13 +202,13 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu

userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Multi.createFrom().iterable(UserUtils.groupingProductAndReturnMinStateProduct(userInstitutionChanged.getProducts()))
.map(onboardedProduct -> notificationMapper.toUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource))
.onItem().transformToUniAndMerge(userNotificationToSend -> eventHubRestClient.sendMessage(userNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.toUni()
)
.subscribe().with(
Expand All @@ -216,6 +222,44 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu
});
}

public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> document) {

assert document.getFullDocument() != null;
assert document.getDocumentKey() != null;
UserInstitution userInstitutionChanged = document.getFullDocument();

log.info("Starting consumerToSendUserEventForFd ... ");

userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Uni.createFrom().item(UserUtils.retrieveFdProductIfItChanged(userInstitutionChanged.getProducts(), List.of(PROD_FD, PROD_FD_GARANTITO)))
.map(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)))
.onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> eventHubRestClient.sendMessage(fdUserNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
)
.subscribe().with(
result -> {
log.info("SendEvents successfully performed from UserInstitution document having id: {}", document.getDocumentKey().toJson());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(EVENTS_USER_INSTITUTION_SUCCESS, 1D));
},
failure -> {
log.error("Error during SendEvents from UserInstitution document having id: {} , message: {}", document.getDocumentKey().toJson(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
});
}

private NotificationUserType evaluateType(OnboardedProduct onboardedProduct) {
return switch (onboardedProduct.getStatus()) {
case ACTIVE -> NotificationUserType.ACTIVE_USER;
case SUSPENDED -> NotificationUserType.SUSPEND_USER;
case DELETED -> NotificationUserType.DELETE_USER;
default -> null;
};
}

private boolean checkIfIsRetryableException(Throwable throwable) {
return throwable instanceof TimeoutException ||
(throwable instanceof ClientWebApplicationException webApplicationException && webApplicationException.getResponse().getStatus() == 429);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.model.FdUserNotificationToSend;
import it.pagopa.selfcare.user.model.NotificationUserType;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
import org.mapstruct.Mapper;
Expand Down Expand Up @@ -33,4 +35,19 @@ public interface NotificationMapper {
default String toUniqueIdNotification(UserInstitution userInstitution, OnboardedProduct product) {
return UserUtils.uniqueIdNotification(userInstitution.getId().toHexString(), product.getProductId(), product.getProductRole());
}

@Mapping(target = "id", expression = "java(toUniqueIdNotification(userInstitutionChanged, product))")
@Mapping(target = "onboardingTokenId", source = "product.tokenId")
@Mapping(target = "product", source = "product.productId")
@Mapping(target = "createdAt", source = "product.createdAt")
@Mapping(target = "updatedAt", expression = "java((null == product.getUpdatedAt()) ? product.getCreatedAt() : product.getUpdatedAt())")
@Mapping(target = "user.role", source = "product.role")
@Mapping(target = "user.productRole", source = "product.productRole")
@Mapping(target = "user.relationshipStatus", source = "product.status")
@Mapping(target = "user.userId", source = "userResource.id", ignore = true)
@Mapping(target = "user.name", source = "userResource.name.value")
@Mapping(target = "user.familyName", source = "userResource.familyName.value")
@Mapping(target = "user.email", source = "userResource.email.value")
@Mapping(target = "type", source = "type")
FdUserNotificationToSend toFdUserNotificationToSend(UserInstitution userInstitutionChanged, OnboardedProduct product, UserResource userResource, NotificationUserType type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.UserInstitutionCdcService;
import it.pagopa.selfcare.user.event.entity.UserInfo;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.model.FdUserNotificationToSend;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
import it.pagopa.selfcare.user.model.constants.OnboardedProductState;
import jakarta.inject.Inject;
import org.bson.BsonDocument;
import org.bson.types.ObjectId;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.openapi.quarkus.user_registry_json.api.UserApi;
import org.openapi.quarkus.user_registry_json.model.UserResource;
Expand All @@ -28,6 +33,7 @@
import java.util.List;

import static it.pagopa.selfcare.user.event.UserInstitutionCdcService.USERS_FIELD_LIST_WITHOUT_FISCAL_CODE;
import static it.pagopa.selfcare.user.model.NotificationUserType.*;
import static org.mockito.Mockito.*;

@QuarkusTest
Expand All @@ -45,9 +51,10 @@ public class UserInstitutionCdcServiceTest {
@InjectMock
EventHubRestClient eventHubRestClient;


@Test
void consumerToSendScUserEvent() {
UserInstitution userInstitution = dummyUserInstitution();
UserInstitution userInstitution = dummyUserInstitution(false, null);
ChangeStreamDocument<UserInstitution> document = Mockito.mock(ChangeStreamDocument.class);
when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());
Expand All @@ -60,27 +67,120 @@ void consumerToSendScUserEvent() {
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubRestClient, times(2)).
sendMessage(any());
sendMessage(any(UserNotificationToSend.class));
}

@Test
void consumerToSendUserEventForFDSendACTIVE_USER() {
UserInstitution userInstitution = dummyUserInstitution(true, OnboardedProductState.ACTIVE);
ChangeStreamDocument<UserInstitution> document = Mockito.mock(ChangeStreamDocument.class);
when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));
ArgumentCaptor<FdUserNotificationToSend> argumentCaptor = ArgumentCaptor.forClass(FdUserNotificationToSend.class);
when(eventHubRestClient.sendMessage(argumentCaptor.capture()))
.thenReturn(Uni.createFrom().nullItem());

userInstitutionCdcService.consumerToSendUserEventForFD(document);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubRestClient, times(1)).
sendMessage(any(FdUserNotificationToSend.class));
Assertions.assertEquals(ACTIVE_USER, argumentCaptor.getValue().getType());
}

@Test
void consumerToSendUserEventForFDSendSUSPEND_USER() {
UserInstitution userInstitution = dummyUserInstitution(true, OnboardedProductState.SUSPENDED);
ChangeStreamDocument<UserInstitution> document = Mockito.mock(ChangeStreamDocument.class);
when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));
ArgumentCaptor<FdUserNotificationToSend> argumentCaptor = ArgumentCaptor.forClass(FdUserNotificationToSend.class);

when(eventHubRestClient.sendMessage(argumentCaptor.capture()))
.thenReturn(Uni.createFrom().nullItem());

userInstitutionCdcService.consumerToSendUserEventForFD(document);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubRestClient, times(1)).
sendMessage(any(FdUserNotificationToSend.class));
Assertions.assertEquals(SUSPEND_USER, argumentCaptor.getValue().getType());
}

@Test
void consumerToSendUserEventForFDSendDELETE_USER() {
UserInstitution userInstitution = dummyUserInstitution(true, OnboardedProductState.DELETED);
ChangeStreamDocument<UserInstitution> document = Mockito.mock(ChangeStreamDocument.class);
when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));
ArgumentCaptor<FdUserNotificationToSend> argumentCaptor = ArgumentCaptor.forClass(FdUserNotificationToSend.class);
when(eventHubRestClient.sendMessage(argumentCaptor.capture()))
.thenReturn(Uni.createFrom().nullItem());

userInstitutionCdcService.consumerToSendUserEventForFD(document);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubRestClient, times(1)).
sendMessage(any(FdUserNotificationToSend.class));
Assertions.assertEquals(DELETE_USER, argumentCaptor.getValue().getType());
}



@Test
void consumerToSendUserEventForFDNotSend() {
UserInstitution userInstitution = dummyUserInstitution(false, null);
ChangeStreamDocument<UserInstitution> document = Mockito.mock(ChangeStreamDocument.class);
when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));

userInstitutionCdcService.consumerToSendUserEventForFD(document);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubRestClient, times(0)).
sendMessage(any(UserNotificationToSend.class));
}


UserResource dummyUserResource() {
UserResource userResource = new UserResource();

return userResource;
}

UserInstitution dummyUserInstitution() {
UserInstitution dummyUserInstitution(boolean sendForFd, OnboardedProductState state){
UserInstitution userInstitution = new UserInstitution();
userInstitution.setId(ObjectId.get());
userInstitution.setProducts(List.of(dummyOnboardedProduct("example-1", OnboardedProductState.ACTIVE, 1),
dummyOnboardedProduct("example-2", OnboardedProductState.DELETED, 1)));
if(sendForFd) {
userInstitution.setProducts(List.of(dummyOnboardedProduct("example-1", state, 2, "prod-fd"),
dummyOnboardedProduct("example-2", OnboardedProductState.ACTIVE, 1, "prod-io")));
}else {
userInstitution.setProducts(List.of(dummyOnboardedProduct("example-1", OnboardedProductState.ACTIVE, 2, "prod-io"),
dummyOnboardedProduct("example-2", OnboardedProductState.ACTIVE, 1, "prod-fd")));
}
return userInstitution;

}

OnboardedProduct dummyOnboardedProduct(String productRole, OnboardedProductState state, int day) {
OnboardedProduct dummyOnboardedProduct(String productRole, OnboardedProductState state, int day, String productId) {
OnboardedProduct onboardedProduct = new OnboardedProduct();
onboardedProduct.setProductId("productId");
onboardedProduct.setProductId(productId);
onboardedProduct.setProductRole(productRole);
onboardedProduct.setCreatedAt(OffsetDateTime.of(2024, 1, day, 0, 0, 0, 0, ZoneOffset.UTC));
onboardedProduct.setUpdatedAt(OffsetDateTime.of(2024, 1, day, 0, 0, 0, 0, ZoneOffset.UTC));
Expand Down
Loading

0 comments on commit 6c1fe53

Please sign in to comment.