Skip to content

Commit

Permalink
[SELC-5981] changed user-cdc business logic to eveluate if mail is ch…
Browse files Browse the repository at this point in the history
…anged
  • Loading branch information
flaminiaScarciofolo committed Nov 13, 2024
1 parent e84cfae commit 24bd176
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.client.EventHubFdRestClient;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.mapper.NotificationMapper;
import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository;
import it.pagopa.selfcare.user.model.FdUserNotificationToSend;
import it.pagopa.selfcare.user.model.NotificationUserType;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.TrackEventInput;
import it.pagopa.selfcare.user.model.constants.OnboardedProductState;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,6 +37,7 @@
import org.openapi.quarkus.user_registry_json.api.UserApi;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.concurrent.TimeoutException;

Expand All @@ -50,6 +52,8 @@
import static it.pagopa.selfcare.user.model.constants.EventsName.EVENT_USER_CDC_NAME;
import static it.pagopa.selfcare.user.model.constants.EventsName.FD_EVENT_USER_CDC_NAME;
import static java.util.Arrays.asList;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.nullsLast;

@Startup
@Slf4j
Expand All @@ -72,6 +76,8 @@ public class UserInstitutionCdcService {
private final Integer retryMinBackOff;
private final Integer retryMaxBackOff;
private final Integer maxRetry;
private final boolean sendEventsEnabled;
private final boolean sendFdEventsEnabled;


@RestClient
Expand Down Expand Up @@ -107,11 +113,13 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
this.telemetryClient = telemetryClient;
this.tableClient = tableClient;
this.notificationMapper = notificationMapper;
this.sendEventsEnabled = sendEventsEnabled;
this.sendFdEventsEnabled = sendFdEventsEnabled;
telemetryClient.getContext().getOperation().setName(OPERATION_NAME);
initOrderStream(sendEventsEnabled, sendFdEventsEnabled);
initOrderStream();
}

private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnabled) {
private void initOrderStream() {
log.info("Starting initOrderStream ... ");

//Retrieve last resumeToken for watching collection at specific operation
Expand Down Expand Up @@ -140,12 +148,35 @@ private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnab

Multi<ChangeStreamDocument<UserInstitution>> publisher = dataCollection.watch(pipeline, UserInstitution.class, options);
publisher.subscribe().with(
this::consumerUserInstitutionRepositoryEvent,
document -> propagateDocumentToConsumers(document, publisher),
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});
log.info("Completed initOrderStream ... ");
}

public void propagateDocumentToConsumers(ChangeStreamDocument<UserInstitution> document, Multi<ChangeStreamDocument<UserInstitution>> publisher) {
assert document.getFullDocument() != null;
assert document.getDocumentKey() != null;
UserInstitution userInstitutionChanged = document.getFullDocument();

boolean hasActiveFdProduct = userInstitutionChanged.getProducts().stream()
.anyMatch(product -> (PROD_FD.getValue().equals(product.getProductId()) || PROD_FD_GARANTITO.getValue().equals(product.getProductId()))
&& OnboardedProductState.ACTIVE.equals(product.getStatus()));

boolean userMailIsChanged = isUserMailChanged(userInstitutionChanged);

if (Boolean.FALSE.equals(userMailIsChanged)) {
publisher.subscribe().with(
this::consumerUserInstitutionRepositoryEvent,
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});
}

if (Boolean.TRUE.equals(sendEventsEnabled)) {
publisher.subscribe().with(
Expand All @@ -157,18 +188,27 @@ private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnab
});
}

if (Boolean.TRUE.equals(sendFdEventsEnabled)) {
if (Boolean.TRUE.equals(sendFdEventsEnabled) && hasActiveFdProduct) {
publisher.subscribe().with(
this::consumerToSendUserEventForFD,
subscription -> consumerToSendUserEventForFD(document, userMailIsChanged),
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
Quarkus.asyncExit();
});
}
}


log.info("Completed initOrderStream ... ");
private boolean isUserMailChanged(UserInstitution userInstitutionChanged) {
OffsetDateTime maxProductUpdateAt = null;
if (Objects.nonNull(userInstitutionChanged.getProducts()) && !userInstitutionChanged.getProducts().isEmpty()) {
maxProductUpdateAt = userInstitutionChanged.getProducts().stream()
.max(Comparator.comparing(OnboardedProduct::getUpdatedAt, nullsLast(naturalOrder())))
.map(OnboardedProduct::getUpdatedAt)
.orElse(null);
}
OffsetDateTime maxUserMailUpdateAt = userInstitutionChanged.getUserMailUpdatedAt();
return Objects.nonNull(maxProductUpdateAt) && Objects.nonNull(maxUserMailUpdateAt) && maxUserMailUpdateAt.isAfter(maxProductUpdateAt);
}

private ReactiveMongoCollection<UserInstitution> getCollection() {
Expand Down Expand Up @@ -241,7 +281,7 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu
});
}

public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> document) {
public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> document, boolean isUserMailChanged) {

if (Objects.nonNull(document.getFullDocument()) && Objects.nonNull(document.getDocumentKey())) {
UserInstitution userInstitutionChanged = document.getFullDocument();
Expand All @@ -251,9 +291,9 @@ public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> d
userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Uni.createFrom().item(UserUtils.retrieveFdProductIfItChanged(userInstitutionChanged.getProducts(), List.of(PROD_FD.getValue(), PROD_FD_GARANTITO.getValue())))
.onItem().ifNotNull().transform(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)))
.onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> {
.onItem().transformToMulti(userResource -> Multi.createFrom().iterable(UserUtils.retrieveFdProduct(userInstitutionChanged.getProducts(), List.of(PROD_FD.getValue(), PROD_FD_GARANTITO.getValue()), isUserMailChanged))
.onItem().transformToUniAndMerge(onboardedProduct -> {
FdUserNotificationToSend fdUserNotificationToSend = notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct));
log.info("Sending message to EventHubFdRestClient ... ");
return eventHubFdRestClient.sendMessage(fdUserNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import lombok.experimental.FieldNameConstants;
import org.bson.types.ObjectId;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -24,5 +25,6 @@ public class UserInstitution extends ReactivePanacheMongoEntity {
private String institutionRootName;
private List<OnboardedProduct> products = new ArrayList<>();
private String userMailUuid;
private OffsetDateTime userMailUpdatedAt;

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.mongodb.MongoTestResource;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import it.pagopa.selfcare.user.client.EventHubFdRestClient;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.UserInstitutionCdcService;
import it.pagopa.selfcare.user.event.entity.UserInfo;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository;
import it.pagopa.selfcare.user.model.FdUserNotificationToSend;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
Expand All @@ -32,6 +32,7 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.UUID;

import static it.pagopa.selfcare.user.event.UserInstitutionCdcService.USERS_FIELD_LIST_WITHOUT_FISCAL_CODE;
import static it.pagopa.selfcare.user.model.NotificationUserType.*;
Expand All @@ -56,6 +57,9 @@ public class UserInstitutionCdcServiceTest {
@InjectMock
EventHubFdRestClient eventHubFdRestClient;

@InjectMock
UserInstitutionRepository userInstitutionRepository;


@Test
void consumerToSendScUserEvent() {
Expand Down Expand Up @@ -89,7 +93,29 @@ void consumerToSendUserEventForFDSendACTIVE_USER() {
when(eventHubFdRestClient.sendMessage(argumentCaptor.capture()))
.thenReturn(Uni.createFrom().nullItem());

userInstitutionCdcService.consumerToSendUserEventForFD(document);
userInstitutionCdcService.consumerToSendUserEventForFD(document, false);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubFdRestClient, times(1)).
sendMessage(any(FdUserNotificationToSend.class));
Assertions.assertEquals(ACTIVE_USER, argumentCaptor.getValue().getType());
}

@Test
void consumerToSendUserEventForFDSendACTIVE_USER_mailChanged() {
UserInstitution userInstitution = dummyUserInstitution(false, OnboardedProductState.ACTIVE);
ChangeStreamDocument<UserInstitution> document = Mockito.mock(ChangeStreamDocument.class);
when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));
ArgumentCaptor<FdUserNotificationToSend> argumentCaptor = ArgumentCaptor.forClass(FdUserNotificationToSend.class);
when(eventHubFdRestClient.sendMessage(argumentCaptor.capture()))
.thenReturn(Uni.createFrom().nullItem());

userInstitutionCdcService.consumerToSendUserEventForFD(document, true);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubFdRestClient, times(1)).
Expand All @@ -112,7 +138,7 @@ void consumerToSendUserEventForFDSendSUSPEND_USER() {
when(eventHubFdRestClient.sendMessage(argumentCaptor.capture()))
.thenReturn(Uni.createFrom().nullItem());

userInstitutionCdcService.consumerToSendUserEventForFD(document);
userInstitutionCdcService.consumerToSendUserEventForFD(document, false);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubFdRestClient, times(1)).
Expand All @@ -134,7 +160,7 @@ void consumerToSendUserEventForFDSendDELETE_USER() {
when(eventHubFdRestClient.sendMessage(argumentCaptor.capture()))
.thenReturn(Uni.createFrom().nullItem());

userInstitutionCdcService.consumerToSendUserEventForFD(document);
userInstitutionCdcService.consumerToSendUserEventForFD(document, false);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubFdRestClient, times(1)).
Expand All @@ -153,7 +179,7 @@ void consumerToSendUserEventForFDNotSend() {
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));

userInstitutionCdcService.consumerToSendUserEventForFD(document);
userInstitutionCdcService.consumerToSendUserEventForFD(document, false);
verify(userRegistryApi, times(1)).
findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId());
verify(eventHubFdRestClient, times(0)).
Expand All @@ -167,13 +193,14 @@ UserResource dummyUserResource() {
return userResource;
}

UserInstitution dummyUserInstitution(boolean sendForFd, OnboardedProductState state){
UserInstitution dummyUserInstitution(boolean sendForFd, OnboardedProductState state) {
UserInstitution userInstitution = new UserInstitution();
userInstitution.setId(ObjectId.get());
if(sendForFd) {
userInstitution.setUserMailUpdatedAt(OffsetDateTime.of(2023, 1, 3, 0, 0, 0, 0, ZoneOffset.UTC));
if (sendForFd) {
userInstitution.setProducts(List.of(dummyOnboardedProduct("example-1", state, 2, "prod-fd"),
dummyOnboardedProduct("example-2", OnboardedProductState.ACTIVE, 1, "prod-io")));
}else {
} else {
userInstitution.setProducts(List.of(dummyOnboardedProduct("example-1", OnboardedProductState.ACTIVE, 2, "prod-io"),
dummyOnboardedProduct("example-2", OnboardedProductState.ACTIVE, 1, "prod-fd")));
}
Expand All @@ -191,4 +218,86 @@ OnboardedProduct dummyOnboardedProduct(String productRole, OnboardedProductState
return onboardedProduct;
}


@Test
void propagateDocumentToConsumers_withChangeUserMailFalse() {
ChangeStreamDocument<UserInstitution> document = mock(ChangeStreamDocument.class);

Multi<ChangeStreamDocument<UserInstitution>> publisher = Multi.createFrom().item(document);

UserInstitution userInstitution = new UserInstitution();
userInstitution.setId(new ObjectId());
OnboardedProduct product = new OnboardedProduct();
product.setProductId("prod-io");
product.setStatus(OnboardedProductState.ACTIVE);
userInstitution.setProducts(List.of(product));

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));

when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

userInstitutionCdcService.propagateDocumentToConsumers(document, publisher);
verify(eventHubFdRestClient, times(0)).sendMessage(any(FdUserNotificationToSend.class));
verify(eventHubRestClient, times(1)).sendMessage(any(UserNotificationToSend.class));
verify(userInstitutionRepository, times(1)).updateUser(any());
}

@Test
void propagateDocumentToConsumers_withChangeUserMailWithFd() {
ChangeStreamDocument<UserInstitution> document = mock(ChangeStreamDocument.class);

Multi<ChangeStreamDocument<UserInstitution>> publisher = Multi.createFrom().item(document);

UserInstitution userInstitution = new UserInstitution();
userInstitution.setId(new ObjectId());
OnboardedProduct product = new OnboardedProduct();
product.setProductId("prod-fd");
product.setStatus(OnboardedProductState.ACTIVE);
userInstitution.setProducts(List.of(product));
userInstitution.setUserMailUpdatedAt(OffsetDateTime.of(LocalDate.now(), LocalTime.now(), ZoneOffset.UTC));

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));

when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

userInstitutionCdcService.propagateDocumentToConsumers(document, publisher);
verify(eventHubFdRestClient, times(1)).sendMessage(any(FdUserNotificationToSend.class));
verify(eventHubRestClient, times(1)).sendMessage(any(UserNotificationToSend.class));
verify(userInstitutionRepository, times(1)).updateUser(any());
}


@Test
void propagateDocumentToConsumers_withChangeUserMailTrueWithoutFd() {

ChangeStreamDocument<UserInstitution> document = mock(ChangeStreamDocument.class);

Multi<ChangeStreamDocument<UserInstitution>> publisher = Multi.createFrom().item(document);

UserInstitution userInstitution = new UserInstitution();
userInstitution.setId(new ObjectId());
OnboardedProduct product = new OnboardedProduct();
product.setProductId("prod-io");
product.setStatus(OnboardedProductState.ACTIVE);
userInstitution.setProducts(List.of(product));
userInstitution.setUserMailUpdatedAt(OffsetDateTime.of(LocalDate.now(), LocalTime.now(), ZoneOffset.UTC));

UserResource userResource = dummyUserResource();
when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()))
.thenReturn(Uni.createFrom().item(userResource));

when(document.getFullDocument()).thenReturn(userInstitution);
when(document.getDocumentKey()).thenReturn(new BsonDocument());

userInstitutionCdcService.propagateDocumentToConsumers(document, publisher);
verify(eventHubFdRestClient, times(0)).sendMessage(any(FdUserNotificationToSend.class));
verify(eventHubRestClient, times(1)).sendMessage(any(UserNotificationToSend.class));
verify(userInstitutionRepository, times(1)).updateUser(any());
}
}
Loading

0 comments on commit 24bd176

Please sign in to comment.