Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RHCLOUD-37053] Simplify email connector internal logic #3227

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.ClientWebApplicationException;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
Expand Down Expand Up @@ -39,20 +40,10 @@ public class HttpExceptionProcessor extends ExceptionProcessor {

@Override
protected void process(Throwable t, Exchange exchange) {
if (t instanceof HttpOperationFailedException e) {
exchange.setProperty(HTTP_STATUS_CODE, e.getStatusCode());
if (e.getStatusCode() >= 300 && e.getStatusCode() < 400) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange);
} else if (e.getStatusCode() >= 400 && e.getStatusCode() < 500 && e.getStatusCode() != SC_TOO_MANY_REQUESTS) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX);
logHttpError(connectorConfig.getClientErrorLogLevel(), e, exchange);
} else if (e.getStatusCode() == SC_TOO_MANY_REQUESTS || e.getStatusCode() >= 500) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange);
} else {
logHttpError(ERROR, e, exchange);
}
if (t instanceof ClientWebApplicationException e) {
manageReturnedStatusCode(exchange, e.getResponse().getStatus(), e.getResponse().readEntity(String.class));
} else if (t instanceof HttpOperationFailedException e) {
manageReturnedStatusCode(exchange, e.getStatusCode(), e.getResponseBody());
} else if (t instanceof ConnectTimeoutException) {
exchange.setProperty(HTTP_ERROR_TYPE, CONNECT_TIMEOUT);
} else if (t instanceof SocketTimeoutException) {
Expand All @@ -70,16 +61,32 @@ protected void process(Throwable t, Exchange exchange) {
}
}

private void logHttpError(Logger.Level level, HttpOperationFailedException e, Exchange exchange) {
private void manageReturnedStatusCode(Exchange exchange, int statusCode, String responseBody) {
exchange.setProperty(HTTP_STATUS_CODE, statusCode);
if (statusCode >= 300 && statusCode < 400) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange);
} else if (statusCode >= 400 && statusCode < 500 && statusCode != SC_TOO_MANY_REQUESTS) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX);
logHttpError(connectorConfig.getClientErrorLogLevel(), statusCode, responseBody, exchange);
} else if (statusCode == SC_TOO_MANY_REQUESTS || statusCode >= 500) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange);
} else {
logHttpError(ERROR, statusCode, responseBody, exchange);
}
}

private void logHttpError(Logger.Level level, int statusCode, String responseBody, Exchange exchange) {
Log.logf(
level,
HTTP_ERROR_LOG_MSG,
getRouteId(exchange),
getOrgId(exchange),
getExchangeId(exchange),
getTargetUrl(exchange),
e.getStatusCode(),
e.getResponseBody()
statusCode,
responseBody
);
}
}
17 changes: 17 additions & 0 deletions connector-email/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@
<scope>test</scope>
</dependency>

<!-- Retries -->
<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>

<!-- MockServer -->
<dependency>
<groupId>org.mock-server</groupId>
Expand All @@ -87,6 +94,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public void extract(final Exchange exchange, final JsonObject cloudEventData) {
exchange.setProperty(ExchangeProperty.EMAIL_RECIPIENTS, emails);
exchange.setProperty(ExchangeProperty.EMAIL_SENDER, emailNotification.emailSender());

exchange.setProperty(ExchangeProperty.USE_EMAIL_BOP_V1_SSL, emailConnectorConfig.isEnableBopServiceWithSslChecks());
exchange.setProperty(ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE, emailConnectorConfig.useSimplifiedEmailRoute(emailNotification.orgId()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.redhat.cloud.notifications.connector.email;

import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
import com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty;
import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
import com.redhat.cloud.notifications.connector.email.model.settings.User;
import com.redhat.cloud.notifications.connector.email.processors.bop.BOPManager;
import com.redhat.cloud.notifications.connector.email.processors.recipientsresolver.ExternalRecipientsResolver;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.quarkus.logging.Log;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID;
import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY;
import static java.util.stream.Collectors.toSet;

@ApplicationScoped
public class EmailManagementProcessor implements Processor {

@Inject
EmailConnectorConfig emailConnectorConfig;

@Inject
ExternalRecipientsResolver externalRecipientsResolver;

@Inject
BOPManager bopManager;

@Inject
MeterRegistry meterRegistry;

static final String BOP_RESPONSE_TIME_METRIC = "email.bop.response.time";
static final String RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC = "email.recipients_resolver.response.time";

@Override
public void process(final Exchange exchange) {
// fetch recipients
Set<String> recipientsList = fetchRecipients(exchange);

if (recipientsList.isEmpty()) {
Log.infof("Skipped Email notification because the recipients list was empty [orgId=$%s, historyId=%s]", exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class));
} else {
// send to bop
sendToBop(exchange, recipientsList);
}
}

private void sendToBop(Exchange exchange, Set<String> recipientsList) {
// split recipient list and send it ot BOP
g-duval marked this conversation as resolved.
Show resolved Hide resolved
List<List<String>> packedRecipients = partition(recipientsList, emailConnectorConfig.getMaxRecipientsPerEmail() - 1);
final String subject = exchange.getProperty(ExchangeProperty.RENDERED_SUBJECT, String.class);
final String body = exchange.getProperty(ExchangeProperty.RENDERED_BODY, String.class);
final String sender = exchange.getProperty(ExchangeProperty.EMAIL_SENDER, String.class);

for (int i = 0; i < packedRecipients.size(); i++) {
final Timer.Sample bopResponseTimeMetric = Timer.start(meterRegistry);
bopManager.sendToBop(packedRecipients.get(i), subject, body, sender);
bopResponseTimeMetric.stop(meterRegistry.timer(BOP_RESPONSE_TIME_METRIC));
Log.infof("Sent Email notification %d/%d [orgId=%s, historyId=%s]", i + 1, packedRecipients.size(), exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class));
}
}

private static List<List<String>> partition(Set<String> collection, int n) {
AtomicInteger counter = new AtomicInteger();
return collection.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / n))
.values().stream().toList();
}

private Set<String> fetchRecipients(Exchange exchange) {
List<RecipientSettings> recipientSettings = exchange.getProperty(ExchangeProperty.RECIPIENT_SETTINGS, List.class);
Set<String> subscribers = exchange.getProperty(ExchangeProperty.SUBSCRIBERS, Set.class);
Set<String> unsubscribers = exchange.getProperty(ExchangeProperty.UNSUBSCRIBERS, Set.class);
JsonObject recipientsAuthorizationCriterion = exchange.getProperty(ExchangeProperty.RECIPIENTS_AUTHORIZATION_CRITERION, JsonObject.class);

boolean subscribedByDefault = exchange.getProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, boolean.class);
final String orgId = exchange.getProperty(ORG_ID, String.class);

final Timer.Sample recipientsResolverResponseTimeMetric = Timer.start(meterRegistry);
Set<String> recipientsList = externalRecipientsResolver.recipientUsers(
orgId,
Set.copyOf(recipientSettings),
subscribers,
unsubscribers,
subscribedByDefault,
recipientsAuthorizationCriterion)
.stream().map(User::getEmail).filter(email -> email != null && !email.isBlank()).collect(toSet());
recipientsResolverResponseTimeMetric.stop(meterRegistry.timer(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC));

Set<String> emails = exchange.getProperty(ExchangeProperty.EMAIL_RECIPIENTS, Set.of(), Set.class);
if (emailConnectorConfig.isEmailsInternalOnlyEnabled()) {
Set<String> forbiddenEmail = emails.stream().filter(email -> !email.trim().toLowerCase().endsWith("@redhat.com")).collect(Collectors.toSet());
if (!forbiddenEmail.isEmpty()) {
Log.warnf(" %s emails are forbidden for message historyId: %s ", forbiddenEmail, exchange.getProperty(com.redhat.cloud.notifications.connector.ExchangeProperty.ID, String.class));
}
emails.removeAll(forbiddenEmail);
}
recipientsList.addAll(emails);
exchange.setProperty(TOTAL_RECIPIENTS_KEY, recipientsList.size());
return recipientsList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
import org.apache.camel.support.jsse.KeyStoreParameters;
import org.apache.camel.support.jsse.SSLContextParameters;
import org.apache.camel.support.jsse.TrustManagersParameters;
import org.apache.http.conn.ssl.NoopHostnameVerifier;

import java.util.Set;

import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.SUCCESS;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.FILTERED_USERS;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_EMAIL_BOP_V1_SSL;
import static com.redhat.cloud.notifications.connector.http.SslTrustAllManager.getSslContextParameters;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE;
import static org.apache.camel.LoggingLevel.DEBUG;
import static org.apache.camel.LoggingLevel.INFO;
import static org.apache.camel.builder.endpoint.dsl.HttpEndpointBuilderFactory.HttpEndpointBuilder;
Expand Down Expand Up @@ -61,6 +59,9 @@ public class EmailRouteBuilder extends EngineToConnectorRouteBuilder {
@Inject
EmailMetricsProcessor emailMetricsProcessor;

@Inject
EmailManagementProcessor emailManagementProcessor;

/**
* Configures the flow for this connector.
*/
Expand All @@ -74,23 +75,27 @@ public void configureRoutes() {
.to(direct(ENTRYPOINT));
}

/*
* Prepares the payload accepted by BOP and sends the request to
* the service.
*/
final HttpEndpointBuilder bopEndpointV1 = setUpBOPEndpointV1();

from(seda(ENGINE_TO_CONNECTOR))
.routeId(emailConnectorConfig.getConnectorName())
.process(recipientsResolverRequestPreparer)
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(setupRecipientResolverEndpoint())
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.process(recipientsResolverResponseProcessor)
.choice().when(shouldSkipEmail())
.log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.otherwise()
.to(direct(Routes.SPLIT_AND_SEND))
.choice()
.when(shouldUseSimplifiedEmailManagement())
.process(emailManagementProcessor)
.endChoice()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each when and otherwise cases have to be closed explicitly when a sub choice is involve.

.otherwise()
.process(recipientsResolverRequestPreparer)
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(setupRecipientResolverEndpoint())
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.process(recipientsResolverResponseProcessor)
.choice()
.when(shouldSkipEmail())
.log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.endChoice()
.otherwise()
.to(direct(Routes.SPLIT_AND_SEND))
.endChoice()
.end()
.endChoice()
.end()
.to(direct(SUCCESS));

Expand All @@ -105,16 +110,10 @@ public void configureRoutes() {
// Clear all the headers that may come from the previous route.
.removeHeaders("*")
.process(this.BOPRequestPreparer)
.choice().when(shouldUseBopEmailServiceWithSslChecks())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The involved config flag was turned into a constant a while ago.

.log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]")
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(emailConnectorConfig.getBopURL())
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.otherwise()
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(bopEndpointV1)
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.end()
.log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]")
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(emailConnectorConfig.getBopURL())
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.log(INFO, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.process(emailMetricsProcessor);
}
Expand All @@ -123,27 +122,8 @@ private Predicate shouldSkipEmail() {
return exchange -> exchange.getProperty(FILTERED_USERS, Set.class).isEmpty();
}

private Predicate shouldUseBopEmailServiceWithSslChecks() {
return exchange -> exchange.getProperty(USE_EMAIL_BOP_V1_SSL, Boolean.class);
}

/**
* Creates the endpoint for the BOP service. It makes Apache Camel trust
* BOP service's certificate.
* @return the created endpoint.
*/
protected HttpEndpointBuilder setUpBOPEndpointV1() {
// Remove the schema from the url to avoid the
// "ResolveEndpointFailedException", which complaints about specifying
// the schema twice.
final String fullURL = this.emailConnectorConfig.getBopURL();
if (fullURL.startsWith("https")) {
return https(fullURL.replace("https://", ""))
.sslContextParameters(getSslContextParameters())
.x509HostnameVerifier(NoopHostnameVerifier.INSTANCE);
} else {
return http(fullURL.replace("http://", ""));
}
private Predicate shouldUseSimplifiedEmailManagement() {
return exchange -> exchange.getProperty(USE_SIMPLIFIED_EMAIL_ROUTE, false, Boolean.class);
}

private HttpEndpointBuilder setupRecipientResolverEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.redhat.cloud.notifications.connector.email.config;

import com.redhat.cloud.notifications.connector.http.HttpConnectorConfig;
import com.redhat.cloud.notifications.unleash.UnleashContextBuilder;
import io.quarkus.runtime.LaunchMode;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -74,13 +75,13 @@ public class EmailConnectorConfig extends HttpConnectorConfig {
@ConfigProperty(name = RECIPIENTS_RESOLVER_TRUST_STORE_TYPE)
Optional<String> recipientsResolverTrustStoreType;

private String enableBopEmailServiceWithSslChecks;
private String toggleKafkaIncomingHighVolumeTopic;
private String toggleUseSimplifiedEmailRoute;

@PostConstruct
void emailConnectorPostConstruct() {
enableBopEmailServiceWithSslChecks = toggleRegistry.register("enable-bop-service-ssl-checks", true);
toggleKafkaIncomingHighVolumeTopic = toggleRegistry.register("kafka-incoming-high-volume-topic", true);
toggleUseSimplifiedEmailRoute = toggleRegistry.register("use-simplified-email-route", true);
}

@Override
Expand All @@ -102,7 +103,7 @@ protected Map<String, Object> getLoggedConfiguration() {
config.put(RECIPIENTS_RESOLVER_USER_SERVICE_URL, recipientsResolverServiceURL);
config.put(MAX_RECIPIENTS_PER_EMAIL, maxRecipientsPerEmail);
config.put(NOTIFICATIONS_EMAILS_INTERNAL_ONLY_ENABLED, emailsInternalOnlyEnabled);
config.put(enableBopEmailServiceWithSslChecks, isEnableBopServiceWithSslChecks());
config.put(toggleUseSimplifiedEmailRoute, useSimplifiedEmailRoute(null));
config.put(toggleKafkaIncomingHighVolumeTopic, isIncomingKafkaHighVolumeTopicEnabled());

/*
Expand Down Expand Up @@ -182,8 +183,12 @@ public Optional<String> getRecipientsResolverTrustStoreType() {
return recipientsResolverTrustStoreType;
}

public boolean isEnableBopServiceWithSslChecks() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove constant config

return true;
public boolean useSimplifiedEmailRoute(String orgId) {
if (unleashEnabled) {
return unleash.isEnabled(toggleUseSimplifiedEmailRoute, UnleashContextBuilder.buildUnleashContextWithOrgId(orgId), false);
} else {
return false;
}
}

/**
Expand Down
Loading
Loading