diff --git a/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java b/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java
index 9ea23804a3..b642c7ad65 100644
--- a/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java
+++ b/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java
@@ -42,7 +42,7 @@
public abstract class ConnectorRoutesTest extends CamelQuarkusTestSupport {
- private static final String KAFKA_SOURCE_MOCK = "direct:kafka-source-mock";
+ public static final String KAFKA_SOURCE_MOCK = "direct:kafka-source-mock";
@Inject
protected ConnectorConfig connectorConfig;
diff --git a/connector-email/pom.xml b/connector-email/pom.xml
index cb07982dc2..366af3d5ae 100644
--- a/connector-email/pom.xml
+++ b/connector-email/pom.xml
@@ -66,6 +66,12 @@
+
+ io.quarkus
+ quarkus-junit5-mockito
+ test
+
+
org.apache.camel.quarkus
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/CloudEventHistoryBuilder.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/CloudEventHistoryBuilder.java
new file mode 100644
index 0000000000..a8bfa0eaf3
--- /dev/null
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/CloudEventHistoryBuilder.java
@@ -0,0 +1,42 @@
+package com.redhat.cloud.notifications.connector.email;
+
+import com.redhat.cloud.notifications.connector.OutgoingCloudEventBuilder;
+import io.vertx.core.json.JsonObject;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.redhat.cloud.notifications.connector.ExchangeProperty.SUCCESSFUL;
+
+@ApplicationScoped
+@Alternative
+@Priority(0) // The value doesn't matter.
+public class CloudEventHistoryBuilder extends OutgoingCloudEventBuilder {
+
+ public static final String TOTAL_RECIPIENTS_KEY = "total_recipients";
+ public static final String TOTAL_FAILURE_RECIPIENTS_KEY = "total_failure_recipients";
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ int totalRecipients = exchange.getProperty(TOTAL_RECIPIENTS_KEY, Integer.class);
+ Optional> recipientsWithError = Optional.ofNullable(exchange.getProperty("TODO", Set.class));
+ exchange.setProperty(SUCCESSFUL, recipientsWithError.isEmpty() || recipientsWithError.get().size() == 0);
+ super.process(exchange);
+
+ Message in = exchange.getIn();
+ JsonObject cloudEvent = new JsonObject(in.getBody(String.class));
+ JsonObject data = new JsonObject(cloudEvent.getString("data"));
+ data.getJsonObject("details").put(TOTAL_RECIPIENTS_KEY, totalRecipients);
+
+ if (recipientsWithError.isPresent()) {
+ data.getJsonObject("details").put(TOTAL_FAILURE_RECIPIENTS_KEY, recipientsWithError.get().size());
+ }
+
+ cloudEvent.put("data", data.encode());
+ in.setBody(cloudEvent.encode());
+ }
+}
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java
index 630928fd8e..cffba152d5 100644
--- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java
@@ -3,14 +3,12 @@
import com.redhat.cloud.notifications.connector.CloudEventDataExtractor;
import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
import com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty;
-import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
-import io.vertx.core.json.JsonArray;
+import com.redhat.cloud.notifications.connector.email.model.EmailNotification;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.camel.Exchange;
-import java.util.List;
import java.util.Set;
import static java.util.stream.Collectors.toSet;
@@ -30,38 +28,21 @@ public class EmailCloudEventDataExtractor extends CloudEventDataExtractor {
@Override
public void extract(final Exchange exchange, final JsonObject cloudEventData) {
- final List recipientSettings = cloudEventData.getJsonArray("recipient_settings")
- .stream()
- .map(JsonObject.class::cast)
- .map(jsonSetting -> jsonSetting.mapTo(RecipientSettings.class))
- .toList();
-
- final Set subscribers = cloudEventData.getJsonArray("subscribers", JsonArray.of())
- .stream()
- .map(String.class::cast)
- .collect(toSet());
-
- final Set unsubscribers = cloudEventData.getJsonArray("unsubscribers", JsonArray.of())
- .stream()
- .map(String.class::cast)
- .collect(toSet());
-
- final JsonObject recipientsAuthorizationCriterion = cloudEventData.getJsonObject("recipients_authorization_criterion");
-
- final Set emails = recipientSettings.stream()
+ EmailNotification emailNotification = cloudEventData.mapTo(EmailNotification.class);
+ final Set emails = emailNotification.recipientSettings().stream()
.filter(settings -> settings.getEmails() != null)
.flatMap(settings -> settings.getEmails().stream())
.collect(toSet());
- exchange.setProperty(ExchangeProperty.RENDERED_BODY, cloudEventData.getString("email_body"));
- exchange.setProperty(ExchangeProperty.RENDERED_SUBJECT, cloudEventData.getString("email_subject"));
- exchange.setProperty(ExchangeProperty.RECIPIENT_SETTINGS, recipientSettings);
- exchange.setProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, cloudEventData.getBoolean("subscribed_by_default"));
- exchange.setProperty(ExchangeProperty.SUBSCRIBERS, subscribers);
- exchange.setProperty(ExchangeProperty.UNSUBSCRIBERS, unsubscribers);
- exchange.setProperty(ExchangeProperty.AUTHORIZATION_CRITERIA, recipientsAuthorizationCriterion);
+ exchange.setProperty(ExchangeProperty.RENDERED_BODY, emailNotification.emailBody());
+ exchange.setProperty(ExchangeProperty.RENDERED_SUBJECT, emailNotification.emailSubject());
+ exchange.setProperty(ExchangeProperty.RECIPIENT_SETTINGS, emailNotification.recipientSettings());
+ exchange.setProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, emailNotification.subscribedByDefault());
+ exchange.setProperty(ExchangeProperty.SUBSCRIBERS, emailNotification.subscribers());
+ exchange.setProperty(ExchangeProperty.UNSUBSCRIBERS, emailNotification.unsubscribers());
+ exchange.setProperty(ExchangeProperty.AUTHORIZATION_CRITERION, emailNotification.externalAuthorizationCriterion());
exchange.setProperty(ExchangeProperty.EMAIL_RECIPIENTS, emails);
- exchange.setProperty(ExchangeProperty.EMAIL_SENDER, cloudEventData.getString("email_sender"));
+ exchange.setProperty(ExchangeProperty.EMAIL_SENDER, emailNotification.emailSender());
exchange.setProperty(ExchangeProperty.USE_EMAIL_BOP_V1_SSL, emailConnectorConfig.isEnableBopServiceWithSslChecks());
}
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java
index e705bb29ab..bfca6ef626 100644
--- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java
@@ -49,5 +49,5 @@ public class ExchangeProperty {
public static final String USE_EMAIL_BOP_V1_SSL = "use_email_bop_V1_ssl";
- public static final String AUTHORIZATION_CRITERIA = "authorization_criteria";
+ public static final String AUTHORIZATION_CRITERION = "authorization_criterion";
}
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/model/EmailNotification.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/model/EmailNotification.java
new file mode 100644
index 0000000000..2ad62be612
--- /dev/null
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/model/EmailNotification.java
@@ -0,0 +1,39 @@
+package com.redhat.cloud.notifications.connector.email.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
+import io.vertx.core.json.JsonObject;
+import java.util.Collection;
+
+/**
+ * Represents the data structure that the email connector is expecting.
+ * @param emailBody the rendered body of the email to be sent.
+ * @param emailSubject the rendered subject of the email to be sent.
+ * @param emailSender the sender that will appear in the email when
+ * the user receives it.
+ * @param orgId the organization ID associated with the
+ * triggered event.
+ * @param recipientSettings the collection of recipient settings extracted
+ * from both the event and the related endpoints
+ * to the event.
+ * @param subscribers the list of usernames who subscribed to the
+ * event type.
+ * @param unsubscribers the list of usernames who unsubscribed from the
+ * event type.
+ * @param subscribedByDefault true if the event type is subscribed by
+ * default.
+ * @param externalAuthorizationCriterion forward received authorization criterion.
+ *
+ */
+public record EmailNotification(
+ @JsonProperty("email_body") String emailBody,
+ @JsonProperty("email_subject") String emailSubject,
+ @JsonProperty("email_sender") String emailSender,
+ @JsonProperty("orgId") String orgId,
+ @JsonProperty("recipient_settings") Collection recipientSettings,
+ @JsonProperty("subscribers") Collection subscribers,
+ @JsonProperty("unsubscribers") Collection unsubscribers,
+ @JsonProperty("subscribed_by_default") boolean subscribedByDefault,
+ @JsonProperty("external_authorization_criterion") JsonObject externalAuthorizationCriterion
+) { }
+
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsQuery.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsQuery.java
index 60c6ccf02d..6ecbdab896 100644
--- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsQuery.java
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsQuery.java
@@ -24,5 +24,5 @@ public class RecipientsQuery {
public boolean subscribedByDefault;
- public JsonObject authorizationCriteria;
+ public JsonObject authorizationCriterion;
}
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverRequestPreparer.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverRequestPreparer.java
index 966c678343..998ae98065 100644
--- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverRequestPreparer.java
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverRequestPreparer.java
@@ -26,7 +26,7 @@ public void process(final Exchange exchange) throws JsonProcessingException {
List recipientSettings = exchange.getProperty(ExchangeProperty.RECIPIENT_SETTINGS, List.class);
Set subscribers = exchange.getProperty(ExchangeProperty.SUBSCRIBERS, Set.class);
Set unsubscribers = exchange.getProperty(ExchangeProperty.UNSUBSCRIBERS, Set.class);
- JsonObject authorizationCriteria = exchange.getProperty(ExchangeProperty.AUTHORIZATION_CRITERIA, JsonObject.class);
+ JsonObject authorizationCriterion = exchange.getProperty(ExchangeProperty.AUTHORIZATION_CRITERION, JsonObject.class);
boolean subscribedByDefault = exchange.getProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, boolean.class);
final String orgId = exchange.getProperty(ORG_ID, String.class);
@@ -37,7 +37,7 @@ public void process(final Exchange exchange) throws JsonProcessingException {
recipientsQuery.orgId = orgId;
recipientsQuery.recipientSettings = Set.copyOf(recipientSettings);
recipientsQuery.subscribedByDefault = subscribedByDefault;
- recipientsQuery.authorizationCriteria = authorizationCriteria;
+ recipientsQuery.authorizationCriterion = authorizationCriterion;
// Serialize the payload.
exchange.getMessage().setBody(objectMapper.writeValueAsString(recipientsQuery));
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java
index 7a3693f59c..3562e23172 100644
--- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java
@@ -16,6 +16,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY;
import static java.util.stream.Collectors.toSet;
@ApplicationScoped
@@ -49,6 +50,7 @@ public void process(final Exchange exchange) throws JsonProcessingException {
emails.removeAll(forbiddenEmail);
}
recipientsList.addAll(emails);
+ exchange.setProperty(TOTAL_RECIPIENTS_KEY, recipientsList.size());
// We have to remove one from the limit, because a default recipient (like noreply@redhat.com) will be automatically added
exchange.setProperty(ExchangeProperty.FILTERED_USERS, partition(recipientsList, emailConnectorConfig.getMaxRecipientsPerEmail() - 1));
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java
new file mode 100644
index 0000000000..2f97467c7d
--- /dev/null
+++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java
@@ -0,0 +1,59 @@
+package com.redhat.cloud.notifications.connector.email;
+
+import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
+import com.redhat.cloud.notifications.connector.http.SslTrustAllManager;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.mockito.InjectSpy;
+import jakarta.inject.Inject;
+import org.apache.camel.Endpoint;
+import org.apache.camel.quarkus.test.CamelQuarkusTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.when;
+
+@QuarkusTest
+public class BuildBopEndpointTest extends CamelQuarkusTestSupport {
+
+ @InjectSpy
+ EmailConnectorConfig emailConnectorConfig;
+
+ @Inject
+ EmailRouteBuilder emailRouteBuilder;
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ /**
+ * Disables the route builder to ensure that the Camel Context does not get
+ * started before the routes have been advised. More information is
+ * available at the dkulp's Apache Camel Test documentation page.
+ * @return {@code false} in order to stop the Camel Context from booting
+ * before the routes have been advised.
+ */
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ /**
+ * Tests that the function under test creates the BOP endpoint with the
+ * {@link SslTrustAllManager} class as the SSL context parameters, and that
+ * that class is essentially a NOOP class.
+ * @throws Exception if the endpoint could not be created.
+ */
+ @Test
+ void testBuildBOPEndpoint() throws Exception {
+ String initialBopUrl = emailConnectorConfig.getBopURL();
+ when(emailConnectorConfig.getBopURL()).thenReturn("https://test.com");
+
+ Endpoint bopEndpoint = this.emailRouteBuilder.setUpBOPEndpointV1().resolve(this.context);
+ Assertions.assertEquals(this.emailConnectorConfig.getBopURL(), bopEndpoint.getEndpointBaseUri(), "the base URI of the endpoint is not the same as the one set through the properties");
+
+ final String bopEndpointURI = bopEndpoint.getEndpointUri();
+ Assertions.assertTrue(bopEndpointURI.contains("trustManager%3Dcom.redhat.cloud.notifications.connector.http.SslTrustAllManager"), "the endpoint does not contain a reference to the SslTrustAllManager");
+ Assertions.assertTrue(bopEndpointURI.contains("x509HostnameVerifier=NO_OP"), "the base URI does not contain a mention to the NO_OP hostname verifier");
+ }
+}
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java
index 9169a965bf..55e6db142c 100644
--- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java
+++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java
@@ -1,53 +1,291 @@
package com.redhat.cloud.notifications.connector.email;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.redhat.cloud.notifications.MockServerLifecycleManager;
import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
-import com.redhat.cloud.notifications.connector.http.SslTrustAllManager;
+import com.redhat.cloud.notifications.connector.email.model.EmailNotification;
+import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
+import com.redhat.cloud.notifications.connector.email.model.settings.User;
+import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
+import io.vertx.core.json.JsonObject;
import jakarta.inject.Inject;
-import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AdviceWithRouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.quarkus.test.CamelQuarkusTestSupport;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockserver.mock.action.ExpectationResponseCallback;
+import org.mockserver.model.HttpRequest;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.redhat.cloud.notifications.connector.ConnectorRoutesTest.KAFKA_SOURCE_MOCK;
+import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.CONNECTOR_TO_ENGINE;
+import static com.redhat.cloud.notifications.connector.EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR;
+import static com.redhat.cloud.notifications.connector.EngineToConnectorRouteBuilder.KAFKA_REINJECTION;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventFilter.X_RH_NOTIFICATIONS_CONNECTOR_HEADER;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventProcessor.CLOUD_EVENT_DATA;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventProcessor.CLOUD_EVENT_ID;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventProcessor.CLOUD_EVENT_TYPE;
+import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_FAILURE_RECIPIENTS_KEY;
+import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY;
+import static com.redhat.cloud.notifications.connector.email.constants.Routes.SEND_EMAIL_BOP;
+import static com.redhat.cloud.notifications.connector.email.constants.Routes.SPLIT_AND_SEND;
+import static org.apache.camel.builder.AdviceWith.adviceWith;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockserver.model.HttpResponse.response;
@QuarkusTest
+@QuarkusTestResource(TestLifecycleManager.class)
public class EmailRouteBuilderTest extends CamelQuarkusTestSupport {
@Inject
EmailConnectorConfig emailConnectorConfig;
@Inject
- EmailRouteBuilder emailRouteBuilder;
-
- @Override
- public boolean isUseAdviceWith() {
- return true;
- }
+ ObjectMapper objectMapper;
- /**
- * Disables the route builder to ensure that the Camel Context does not get
- * started before the routes have been advised. More information is
- * available at the dkulp's Apache Camel Test documentation page.
- * @return {@code false} in order to stop the Camel Context from booting
- * before the routes have been advised.
- */
@Override
public boolean isUseRouteBuilder() {
return false;
}
- /**
- * Tests that the function under test creates the BOP endpoint with the
- * {@link SslTrustAllManager} class as the SSL context parameters, and that
- * that class is essentially a NOOP class.
- * @throws Exception if the endpoint could not be created.
- */
+ static boolean camelRoutesInitialised = false;
+
+ static MockEndpoint splitRoute;
+ static MockEndpoint bopRoute;
+ static MockEndpoint kafkaConnectorToEngine;
+ static MockEndpoint kafkaEngineToConnector;
+
+ void initCamelRoutes() throws Exception {
+ adviceWith(SPLIT_AND_SEND, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpoints(
+ "direct:" + SEND_EMAIL_BOP
+ );
+ }
+ });
+
+ adviceWith(emailConnectorConfig.getConnectorName(), context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpoints(
+ "direct:" + CONNECTOR_TO_ENGINE,
+ "direct:" + SPLIT_AND_SEND
+ );
+ }
+ });
+
+ adviceWith(CONNECTOR_TO_ENGINE, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpoints("kafka:" + emailConnectorConfig.getOutgoingKafkaTopic());
+ }
+ });
+
+ adviceWith(SPLIT_AND_SEND, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpointsAndSkip(
+ "direct:" + SEND_EMAIL_BOP
+ );
+ }
+ });
+
+ adviceWith(ENGINE_TO_CONNECTOR, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() {
+ replaceFromWith(KAFKA_SOURCE_MOCK);
+ }
+ });
+
+ adviceWith(ENGINE_TO_CONNECTOR, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpoints(KAFKA_SOURCE_MOCK);
+ }
+ });
+
+ adviceWith(KAFKA_REINJECTION, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() {
+ // because we will change kafka endpoint to an internal endpoint, exchange properties will be sent as well to this endpoint
+ // we have to remove them to be sure that properties will be loaded from kafka header and message only in case of re-injection
+ weaveByToUri("kafka:*").before().removeProperties("*", "camel*");
+ weaveByToUri("kafka:*").replace().to(KAFKA_SOURCE_MOCK);
+ }
+ });
+
+ splitRoute = getMockEndpoint("mock:direct:" + SPLIT_AND_SEND);
+ bopRoute = getMockEndpoint("mock:direct:" + SEND_EMAIL_BOP);
+ kafkaConnectorToEngine = getMockEndpoint("mock:kafka:" + emailConnectorConfig.getOutgoingKafkaTopic());
+ kafkaEngineToConnector = getMockEndpoint("mock:" + KAFKA_SOURCE_MOCK);
+ }
+
+ void initMocks(ExpectationResponseCallback verifyEmptyRequest, ExpectationResponseCallback bopResponse) throws Exception {
+ if (bopResponse == null) {
+ bopResponse = req -> response().withStatusCode(200);
+ }
+
+ MockServerLifecycleManager.getClient().reset();
+ getMockHttpRequest("/internal/recipients-resolver", "PUT", verifyEmptyRequest);
+ getMockBOPRequest("/v1/sendEmails", "POST", bopResponse);
+ if (!camelRoutesInitialised) {
+ initCamelRoutes();
+ camelRoutesInitialised = true;
+ }
+
+ splitRoute.reset();
+ bopRoute.reset();
+ kafkaConnectorToEngine.reset();
+ kafkaEngineToConnector.reset();
+ }
+
@Test
- void testBuildBOPEndpoint() throws Exception {
- try (Endpoint bopEndpoint = this.emailRouteBuilder.setUpBOPEndpointV1().resolve(this.context)) {
- Assertions.assertEquals(this.emailConnectorConfig.getBopURL(), bopEndpoint.getEndpointBaseUri(), "the base URI of the endpoint is not the same as the one set through the properties");
+ void testEmptyRecipients() throws Exception {
+
+ ExpectationResponseCallback verifyEmptyRequest = req -> response().withBody("[]").withStatusCode(200);
+
+ initMocks(verifyEmptyRequest, null);
+
+ splitRoute.expectedMessageCount(0);
+ kafkaConnectorToEngine.expectedMessageCount(1);
+
+ buildCloudEventAndSendIt(null);
+
+ splitRoute.assertIsSatisfied();
+ kafkaConnectorToEngine.assertIsSatisfied();
+ }
- final String bopEndpointURI = bopEndpoint.getEndpointUri();
- Assertions.assertTrue(bopEndpointURI.contains("trustManager%3Dcom.redhat.cloud.notifications.connector.http.SslTrustAllManager"), "the endpoint does not contain a reference to the SslTrustAllManager");
- Assertions.assertTrue(bopEndpointURI.contains("x509HostnameVerifier=NO_OP"), "the base URI does not contain a mention to the NO_OP hostname verifier");
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWithRecipients(boolean emailsInternalOnlyEnabled) throws Exception {
+ try {
+ emailConnectorConfig.setEmailsInternalOnlyEnabled(emailsInternalOnlyEnabled);
+ Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7");
+ String strUsers = objectMapper.writeValueAsString(users);
+ ExpectationResponseCallback verifyEmptyRequest = req -> response().withBody(strUsers).withStatusCode(200);
+
+ initMocks(verifyEmptyRequest, null);
+
+ Set additionalEmails = Set.of("redhat_user@redhat.com", "external_user@noway.com");
+ int usersAndRecipientsTotalNumber = users.size() + additionalEmails.size();
+
+ splitRoute.expectedMessageCount(1);
+ bopRoute.expectedMessageCount(3);
+ kafkaConnectorToEngine.expectedMessageCount(1);
+
+ buildCloudEventAndSendIt(additionalEmails);
+
+ splitRoute.assertIsSatisfied();
+ bopRoute.assertIsSatisfied();
+ kafkaConnectorToEngine.assertIsSatisfied(2000);
+
+ checkRecipientsAndHistory(usersAndRecipientsTotalNumber, usersAndRecipientsTotalNumber, bopRoute, kafkaConnectorToEngine, 0, emailsInternalOnlyEnabled, "external_user@noway.com");
+ } finally {
+ emailConnectorConfig.setEmailsInternalOnlyEnabled(false);
+ }
+ }
+
+ private static void checkRecipientsAndHistory(int usersAndRecipientsTotalNumber, int recipientsReceivedByBopTotalNumber, MockEndpoint bopRoute, MockEndpoint kafkaEndpoint, int totalRecipientsFailure, boolean emailsInternalOnlyEnabled, String filteredRecipient) {
+ // check recipients sent to bop
+ List receivedExchanges = bopRoute.getReceivedExchanges();
+ Set receivedEmails = new HashSet<>();
+ for (Exchange receivedExchange : receivedExchanges) {
+ Set receivedEmailsOnExchangeMsg = receivedExchange.getIn().getBody(Set.class);
+ assertTrue(receivedEmailsOnExchangeMsg.size() <= 3);
+ receivedEmails.addAll(receivedEmailsOnExchangeMsg);
+ }
+
+ if (emailsInternalOnlyEnabled) {
+ assertFalse(receivedEmails.contains(filteredRecipient));
+ assertEquals(recipientsReceivedByBopTotalNumber - 1, receivedEmails.size());
+ } else {
+ assertTrue(receivedEmails.contains(filteredRecipient));
+ assertEquals(recipientsReceivedByBopTotalNumber, receivedEmails.size());
+ }
+
+ // check metrics sent to engine
+ Exchange kafkaMessage = kafkaEndpoint.getReceivedExchanges().stream().findFirst().get();
+ JsonObject payload = new JsonObject(kafkaMessage.getIn().getBody(String.class));
+ JsonObject data = new JsonObject(payload.getString("data"));
+ if (totalRecipientsFailure == 0) {
+ assertTrue(data.getBoolean("successful"));
+ assertNull(data.getJsonObject("details").getInteger(TOTAL_FAILURE_RECIPIENTS_KEY));
+ } else {
+ assertFalse(data.getBoolean("successful"));
+ assertEquals(totalRecipientsFailure, data.getJsonObject("details").getInteger(TOTAL_FAILURE_RECIPIENTS_KEY));
}
+ if (emailsInternalOnlyEnabled) {
+ assertEquals(usersAndRecipientsTotalNumber - 1, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY));
+ } else {
+ assertEquals(usersAndRecipientsTotalNumber, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY));
+ }
+ }
+
+ private HttpRequest getMockHttpRequest(String path, String method, ExpectationResponseCallback expectationResponseCallback) {
+ HttpRequest postReq = new HttpRequest()
+ .withPath(path)
+ .withMethod(method);
+ MockServerLifecycleManager.getClient()
+ .withSecure(false)
+ .when(postReq)
+ .respond(expectationResponseCallback);
+ return postReq;
+ }
+
+ private HttpRequest getMockBOPRequest(String path, String method, ExpectationResponseCallback expectationResponseCallback) {
+ HttpRequest postReq = new HttpRequest()
+ .withPath(path)
+ .withMethod(method);
+ MockServerLifecycleManager.getClient()
+ .withSecure(false)
+ .when(postReq)
+ .respond(expectationResponseCallback);
+ return postReq;
+ }
+
+ private void buildCloudEventAndSendIt(Set emailRecipients) {
+ final JsonObject cloudEvent = generateIncomingCloudEvent(emailRecipients);
+
+ final Map headers = new HashMap<>();
+ headers.put(X_RH_NOTIFICATIONS_CONNECTOR_HEADER, emailConnectorConfig.getConnectorName());
+ template.sendBodyAndHeaders(KAFKA_SOURCE_MOCK, cloudEvent.encode(), headers);
+ }
+
+ private JsonObject generateIncomingCloudEvent(Set emailRecipients) {
+ RecipientSettings recipientSettings = new RecipientSettings(false, false, null, null, emailRecipients);
+
+ final EmailNotification emailNotification = new EmailNotification(
+ "test email body",
+ "test email subject",
+ "Not used",
+ "123456",
+ List.of(recipientSettings),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ false,
+ null
+ );
+ final JsonObject payload = JsonObject.mapFrom(emailNotification);
+
+ final String cloudEventId = UUID.randomUUID().toString();
+
+ final JsonObject cloudEvent = new JsonObject();
+ cloudEvent.put(CLOUD_EVENT_ID, cloudEventId);
+ cloudEvent.put(CLOUD_EVENT_TYPE, "com.redhat.console.notification.toCamel." + emailConnectorConfig.getConnectorName());
+ cloudEvent.put(CLOUD_EVENT_DATA, JsonObject.mapFrom(payload));
+ return cloudEvent;
}
}
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/RecipientsListTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/RecipientsListTest.java
deleted file mode 100644
index a02cf45eb4..0000000000
--- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/RecipientsListTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package com.redhat.cloud.notifications.connector.email;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.redhat.cloud.notifications.MockServerLifecycleManager;
-import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
-import com.redhat.cloud.notifications.connector.email.model.settings.User;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusTest;
-import jakarta.inject.Inject;
-import org.apache.camel.Exchange;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.AdviceWithRouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.quarkus.test.CamelQuarkusTestSupport;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockserver.mock.action.ExpectationResponseCallback;
-import org.mockserver.model.HttpRequest;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.SUCCESS;
-import static com.redhat.cloud.notifications.connector.EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR;
-import static com.redhat.cloud.notifications.connector.ExchangeProperty.START_TIME;
-import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.EMAIL_RECIPIENTS;
-import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.RECIPIENT_SETTINGS;
-import static com.redhat.cloud.notifications.connector.email.constants.Routes.SEND_EMAIL_BOP;
-import static com.redhat.cloud.notifications.connector.email.constants.Routes.SPLIT_AND_SEND;
-import static org.apache.camel.builder.AdviceWith.adviceWith;
-import static org.apache.camel.test.junit5.TestSupport.createExchangeWithBody;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockserver.model.HttpResponse.response;
-
-@QuarkusTest
-@QuarkusTestResource(TestLifecycleManager.class)
-public class RecipientsListTest extends CamelQuarkusTestSupport {
-
- @Inject
- EmailConnectorConfig emailConnectorConfig;
-
- @Inject
- ProducerTemplate producerTemplate;
-
- @Inject
- ObjectMapper objectMapper;
-
- @Override
- public boolean isUseRouteBuilder() {
- return false;
- }
-
- Exchange test(ExpectationResponseCallback verifyEmptyRequest) throws Exception {
-
- getMockHttpRequest("/internal/recipients-resolver", "PUT", verifyEmptyRequest);
-
- Exchange exchange = createExchangeWithBody(context, new HashSet<>());
- exchange.setProperty(EMAIL_RECIPIENTS, new HashSet<>());
- exchange.setProperty(RECIPIENT_SETTINGS, new ArrayList<>());
- exchange.setProperty(START_TIME, System.currentTimeMillis());
-
- adviceWith(SPLIT_AND_SEND, context(), new AdviceWithRouteBuilder() {
- @Override
- public void configure() throws Exception {
- mockEndpointsAndSkip(
- "direct:" + SEND_EMAIL_BOP
- );
- }
- });
-
- adviceWith(emailConnectorConfig.getConnectorName(), context(), new AdviceWithRouteBuilder() {
- @Override
- public void configure() throws Exception {
- mockEndpoints(
- "direct:" + SUCCESS,
- "direct:" + SPLIT_AND_SEND
- );
- }
- });
-
- return exchange;
-
- }
-
- @Test
- void testEmpty() throws Exception {
-
- ExpectationResponseCallback verifyEmptyRequest = req -> response().withBody("[]").withStatusCode(200);
-
- Exchange exchange = test(verifyEmptyRequest);
- MockEndpoint successEndpoint = getMockEndpoint("mock:direct:" + SUCCESS);
- MockEndpoint splitRoute = getMockEndpoint("mock:direct:" + SPLIT_AND_SEND);
-
- splitRoute.expectedMessageCount(0);
- successEndpoint.expectedMessageCount(1);
-
- producerTemplate.send("seda:" + ENGINE_TO_CONNECTOR, exchange);
- splitRoute.assertIsSatisfied();
- successEndpoint.assertIsSatisfied();
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testNotEmpty(boolean emailsInternalOnlyEnabled) throws Exception {
- try {
- emailConnectorConfig.setEmailsInternalOnlyEnabled(emailsInternalOnlyEnabled);
- Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7");
- String strUsers = objectMapper.writeValueAsString(users);
- ExpectationResponseCallback verifyEmptyRequest = req -> response().withBody(strUsers).withStatusCode(200);
-
- Exchange exchange = test(verifyEmptyRequest);
- Set emailRecipients = new HashSet<>();
- emailRecipients.add("redhat_user@redhat.com");
- emailRecipients.add("external_user@noway.com");
- exchange.setProperty(EMAIL_RECIPIENTS, emailRecipients);
- int usersAndRecipientsTotalNumber = emailRecipients.size() + users.size();
-
- MockEndpoint successEndpoint = getMockEndpoint("mock:direct:" + SUCCESS);
- MockEndpoint splitRoute = getMockEndpoint("mock:direct:" + SPLIT_AND_SEND);
- MockEndpoint bopRoute = getMockEndpoint("mock:direct:" + SEND_EMAIL_BOP);
-
- splitRoute.expectedMessageCount(1);
- successEndpoint.expectedMessageCount(1);
- bopRoute.expectedMessageCount(3);
-
- producerTemplate.send("seda:" + ENGINE_TO_CONNECTOR, exchange);
- splitRoute.assertIsSatisfied();
- successEndpoint.assertIsSatisfied();
- bopRoute.assertIsSatisfied();
- List receivedExchanges = bopRoute.getReceivedExchanges();
- Set receivedEmails = new HashSet<>();
- for (Exchange receivedExchange : receivedExchanges) {
- Set receivedEmailsOnExchangeMsg = receivedExchange.getIn().getBody(Set.class);
- assertTrue(receivedEmailsOnExchangeMsg.size() <= 3);
- receivedEmails.addAll(receivedEmailsOnExchangeMsg);
- }
- if (emailsInternalOnlyEnabled) {
- assertFalse(receivedEmails.contains("external_user@noway.com"));
- assertEquals(usersAndRecipientsTotalNumber - 1, receivedEmails.size());
- } else {
- assertTrue(receivedEmails.contains("external_user@noway.com"));
- assertEquals(usersAndRecipientsTotalNumber, receivedEmails.size());
- }
- } finally {
- emailConnectorConfig.setEmailsInternalOnlyEnabled(false);
- }
- }
-
- private HttpRequest getMockHttpRequest(String path, String method, ExpectationResponseCallback expectationResponseCallback) {
- HttpRequest postReq = new HttpRequest()
- .withPath(path)
- .withMethod(method);
- MockServerLifecycleManager.getClient()
- .withSecure(false)
- .when(postReq)
- .respond(expectationResponseCallback);
- return postReq;
- }
-}
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java
index 78921537aa..9e3418f885 100644
--- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java
+++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java
@@ -15,6 +15,7 @@ public Map start() {
MockServerLifecycleManager.start();
Map properties = new HashMap<>();
properties.put("notifications.connector.recipients-resolver.url", getMockServerUrl());
+ properties.put("notifications.connector.user-provider.bop.url", getMockServerUrl());
return properties;
}
diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailProcessor.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailProcessor.java
index 901faff94f..9cbd618403 100644
--- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailProcessor.java
+++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailProcessor.java
@@ -53,7 +53,7 @@ public class EmailProcessor extends SystemEndpointTypeProcessor {
SubscriptionRepository subscriptionRepository;
@Inject
- ExternalAuthorizationCriterionExtractor externalAuthorizationCriteriaExtractor;
+ ExternalAuthorizationCriterionExtractor externalAuthorizationCriterionExtractor;
@Override
public void process(final Event event, final List endpoints) {
@@ -123,7 +123,7 @@ public void process(final Event event, final List endpoints, final boo
subscribers,
unsubscribers,
event.getEventType().isSubscribedByDefault(),
- externalAuthorizationCriteriaExtractor.extract(event)
+ externalAuthorizationCriterionExtractor.extract(event)
);
Log.debugf("[org_id: %s] Sending email notification to connector", emailNotification);
diff --git a/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/RecipientsResolverResource.java b/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/RecipientsResolverResource.java
index 90d4c0fa0a..3b53a7b358 100644
--- a/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/RecipientsResolverResource.java
+++ b/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/RecipientsResolverResource.java
@@ -31,6 +31,6 @@ public Set getRecipients(@NotNull @Valid RecipientsQuery recipientsQuery)
recipientsQuery.subscribers,
recipientsQuery.unsubscribers,
recipientsQuery.subscribedByDefault,
- recipientsQuery.externalAuthorizationCriteria);
+ recipientsQuery.externalAuthorizationCriterion);
}
}
diff --git a/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/pojo/RecipientsQuery.java b/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/pojo/RecipientsQuery.java
index 7954c056ff..791749392f 100644
--- a/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/pojo/RecipientsQuery.java
+++ b/recipients-resolver/src/main/java/com/redhat/cloud/notifications/recipients/rest/pojo/RecipientsQuery.java
@@ -26,5 +26,5 @@ public class RecipientsQuery {
public boolean subscribedByDefault;
- public ExternalAuthorizationCriterion externalAuthorizationCriteria;
+ public ExternalAuthorizationCriterion externalAuthorizationCriterion;
}