Skip to content

Commit

Permalink
Removing IAggregationCommand interface
Browse files Browse the repository at this point in the history
  • Loading branch information
g-duval committed Nov 20, 2024
1 parent 63c27b8 commit 03d24d4
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.redhat.cloud.notifications.ingress.Metadata;
import com.redhat.cloud.notifications.ingress.Parser;
import com.redhat.cloud.notifications.ingress.Payload;
import com.redhat.cloud.notifications.models.IAggregationCommand;
import com.redhat.cloud.notifications.models.AggregationCommand;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.PushGateway;
Expand Down Expand Up @@ -82,11 +82,11 @@ public void processDailyEmail() {
} else {
aggregationOrgConfigRepository.createMissingDefaultConfiguration(defaultDailyDigestTime);
}
List<IAggregationCommand> aggregationCommands = processAggregateEmailsWithOrgPref(now, registry);
List<AggregationCommand> aggregationCommands = processAggregateEmailsWithOrgPref(now, registry);
Log.infof("found %s commands", aggregationCommands.size());
Log.debugf("Aggregation commands: %s", aggregationCommands);

aggregationCommands.stream().collect(Collectors.groupingBy(IAggregationCommand::getOrgId))
aggregationCommands.stream().collect(Collectors.groupingBy(AggregationCommand::getOrgId))
.values().forEach(this::sendIt);

List<String> orgIdsToUpdate = aggregationCommands.stream().map(aggregationCommand -> aggregationCommand.getOrgId()).collect(Collectors.toList());
Expand Down Expand Up @@ -139,9 +139,9 @@ protected LocalDateTime computeScheduleExecutionTime() {
}
}

List<IAggregationCommand> processAggregateEmailsWithOrgPref(LocalDateTime endTime, CollectorRegistry registry) {
List<AggregationCommand> processAggregateEmailsWithOrgPref(LocalDateTime endTime, CollectorRegistry registry) {

List<IAggregationCommand> pendingAggregationCommands;
List<AggregationCommand> pendingAggregationCommands;
if (aggregatorConfig.isAggregationBasedOnEventEnabled()) {
pendingAggregationCommands = emailAggregationResources.getApplicationsWithPendingAggregationAccordingOrgPref(endTime);
} else {
Expand All @@ -157,7 +157,7 @@ List<IAggregationCommand> processAggregateEmailsWithOrgPref(LocalDateTime endTim
return pendingAggregationCommands;
}

private void sendIt(List<IAggregationCommand> aggregationCommands) {
private void sendIt(List<AggregationCommand> aggregationCommands) {

List<Event> eventList = new ArrayList<>();
aggregationCommands.stream().forEach(aggregationCommand -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import com.redhat.cloud.notifications.models.AggregationCommand;
import com.redhat.cloud.notifications.models.EmailAggregationKey;
import com.redhat.cloud.notifications.models.EventAggregationCommand;
import com.redhat.cloud.notifications.models.EventAggregationCriteria;
import com.redhat.cloud.notifications.models.IAggregationCommand;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
Expand All @@ -25,7 +23,7 @@ public class EmailAggregationRepository {
@Inject
EntityManager entityManager;

public List<IAggregationCommand> getApplicationsWithPendingAggregationAccordinfOrgPref(LocalDateTime now) {
public List<AggregationCommand> getApplicationsWithPendingAggregationAccordinfOrgPref(LocalDateTime now) {
// Must takes every EmailAggregation supposed to be processed on last 15 min
// it covers cases when aggregation job may be run with few minutes late (ie: 05:01, 07,32)
String query = "SELECT DISTINCT ea.orgId, ea.bundleName, ea.applicationName, acp.lastRun FROM EmailAggregation ea, AggregationOrgConfig acp WHERE " +
Expand All @@ -47,7 +45,7 @@ public List<IAggregationCommand> getApplicationsWithPendingAggregationAccordinfO
.collect(toList());
}

public List<IAggregationCommand> getApplicationsWithPendingAggregationAccordingOrgPref(LocalDateTime now) {
public List<AggregationCommand> getApplicationsWithPendingAggregationAccordingOrgPref(LocalDateTime now) {
String query = "SELECT DISTINCT ev.orgId, ev.bundleId, ev.applicationId, acp.lastRun, bu.name, ap.name FROM Event ev " +
"join Application ap on ev.applicationId = ap.id join Bundle bu on ev.bundleId = bu.id " +
"left join AggregationOrgConfig acp on ev.orgId = acp.orgId " +
Expand All @@ -61,7 +59,7 @@ public List<IAggregationCommand> getApplicationsWithPendingAggregationAccordingO

List<Object[]> records = hqlQuery.getResultList();
return records.stream()
.map(emailAggregationRecord -> new EventAggregationCommand(
.map(emailAggregationRecord -> new AggregationCommand(
new EventAggregationCriteria((String) emailAggregationRecord[0], (UUID) emailAggregationRecord[1], (UUID) emailAggregationRecord[2], (String) emailAggregationRecord[4], (String) emailAggregationRecord[5]),
(LocalDateTime) emailAggregationRecord[3],
now,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.redhat.cloud.notifications.ingress.Parser;
import com.redhat.cloud.notifications.models.AggregationCommand;
import com.redhat.cloud.notifications.models.AggregationOrgConfig;
import com.redhat.cloud.notifications.models.IAggregationCommand;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.quarkus.test.common.QuarkusTestResource;
Expand Down Expand Up @@ -259,7 +258,7 @@ void shouldProcessFourAggregations() {
helpers.addEmailAggregation("someOrgId", "unknown-bundle", "unknown-application", "somePolicyId", "someHostId");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

final List<IAggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(4, emailAggregations.size());
}
Expand All @@ -271,7 +270,7 @@ void shouldProcessOneAggregationOnly() {
helpers.addEmailAggregation("someOrgId", "rhel", "policies", "somePolicyId", "someHostId");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

final List<IAggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(1, emailAggregations.size());

Expand All @@ -289,7 +288,7 @@ void shouldNotIncreaseAggregationsWhenPolicyIdIsDifferent() {
helpers.addEmailAggregation("shouldBeIgnoredOrgId", "someRhel", "somePolicies", "policyId1", "someHostId");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

final List<IAggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(1, emailAggregations.size());
}
Expand All @@ -301,15 +300,15 @@ void shouldNotIncreaseAggregationsWhenHostIdIsDifferent() {
helpers.addEmailAggregation("shouldBeIgnoredOrgId", "someRhel", "somePolicies", "somePolicyId", "hostId2");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

List<IAggregationCommand> emailAggregations = null;
List<AggregationCommand> emailAggregations = null;
emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
assertEquals(1, emailAggregations.size());
}

@Test
void shouldProcessZeroAggregations() {
helpers.addAggregationOrgConfig(someOrgIdToProceed);
final List<IAggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = testee.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(0, emailAggregations.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import com.redhat.cloud.notifications.ingress.Action;
import com.redhat.cloud.notifications.ingress.Event;
import com.redhat.cloud.notifications.ingress.Parser;
import com.redhat.cloud.notifications.models.AggregationCommand;
import com.redhat.cloud.notifications.models.AggregationOrgConfig;
import com.redhat.cloud.notifications.models.Application;
import com.redhat.cloud.notifications.models.EventAggregationCommand;
import com.redhat.cloud.notifications.models.EventAggregationCriteria;
import com.redhat.cloud.notifications.models.EventType;
import com.redhat.cloud.notifications.models.IAggregationCommand;
import com.redhat.cloud.notifications.models.SubscriptionType;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
Expand Down Expand Up @@ -98,14 +98,17 @@ void initAggregationParameters() {
dailyEmailAggregationJob.defaultDailyDigestTime = LocalTime.now(ZoneOffset.UTC);
}

List<EventAggregationCommand> getRecordsFromKafka() {
List<EventAggregationCommand> aggregationCommands = new ArrayList<>();
List<AggregationCommand> getRecordsFromKafka() {
List<AggregationCommand> aggregationCommands = new ArrayList<>();

InMemorySink<String> results = connector.sink(DailyEmailAggregationJob.EGRESS_CHANNEL);
for (Message message : results.received()) {
Action action = Parser.decode(String.valueOf(message.getPayload()));
for (Event event : action.getEvents()) {
aggregationCommands.add(objectMapper.convertValue(event.getPayload().getAdditionalProperties(), EventAggregationCommand.class));
AggregationCommand aggCommand = objectMapper.convertValue(event.getPayload().getAdditionalProperties(), AggregationCommand.class);
EventAggregationCriteria aggregationCriteria = objectMapper.convertValue(event.getPayload().getAdditionalProperties().get("aggregationKey"), EventAggregationCriteria.class);
aggCommand.setAggregationKey(aggregationCriteria);
aggregationCommands.add(aggCommand);
}
}

Expand All @@ -123,7 +126,7 @@ void shouldSentFourAggregationsToKafkaTopic() {

dailyEmailAggregationJob.processDailyEmail();

List<EventAggregationCommand> listCommand = getRecordsFromKafka();
List<AggregationCommand> listCommand = getRecordsFromKafka();
assertEquals(4, listCommand.size());
checkAggCommand(listCommand, "anotherOrgId", "rhel", "policies");
checkAggCommand(listCommand, "anotherOrgId", "rhel", "unknown-application");
Expand All @@ -145,7 +148,7 @@ void shouldSentTwoAggregationsToKafkaTopic() {
// Because we added time preferences for orgId someOrgId two hours in the past, those messages must be ignored
dailyEmailAggregationJob.processDailyEmail();

List<EventAggregationCommand> listCommand = getRecordsFromKafka();
List<AggregationCommand> listCommand = getRecordsFromKafka();
assertEquals(2, listCommand.size());

checkAggCommand(listCommand, "anotherOrgId", "rhel", "policies");
Expand Down Expand Up @@ -177,13 +180,13 @@ void shouldSentTwoAggregationsToKafkaTopic() {
checkAggCommand(listCommand, "someOrgId", "rhel", "unknown-application");
}

private void checkAggCommand(List<EventAggregationCommand> commands, String orgId, String bundleName, String applicationName) {
private void checkAggCommand(List<AggregationCommand> commands, String orgId, String bundleName, String applicationName) {
Application application = resourceHelpers.findApp(bundleName, applicationName);

assertTrue(commands.stream().anyMatch(
com -> orgId.equals(com.getAggregationKey().getOrgId()) &&
application.getBundleId().equals(com.getAggregationKey().getBundleId()) &&
application.getId().equals(com.getAggregationKey().getApplicationId()) &&
application.getBundleId().equals(((EventAggregationCriteria) com.getAggregationKey()).getBundleId()) &&
application.getId().equals(((EventAggregationCriteria) com.getAggregationKey()).getApplicationId()) &&
DAILY.equals(com.getSubscriptionType())
));
}
Expand Down Expand Up @@ -284,7 +287,7 @@ void shouldProcessFourAggregations() {
addEventEmailAggregation("someOrgId", "unknown-bundle", "unknown-application", "somePolicyId", "someHostId");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

final List<IAggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(4, emailAggregations.size());
}
Expand All @@ -296,15 +299,15 @@ void shouldProcessOneAggregationOnly() {
addEventEmailAggregation("someOrgId", "rhel", "policies", "somePolicyId", "someHostId");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

final List<IAggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(1, emailAggregations.size());

final EventAggregationCommand aggregationCommand = (EventAggregationCommand) emailAggregations.get(0);
final AggregationCommand aggregationCommand = (AggregationCommand) emailAggregations.get(0);
assertEquals("someOrgId", aggregationCommand.getAggregationKey().getOrgId());
Application application = resourceHelpers.findApp("rhel", "policies");
assertEquals(application.getBundleId(), aggregationCommand.getAggregationKey().getBundleId());
assertEquals(application.getId(), aggregationCommand.getAggregationKey().getApplicationId());
assertEquals(application.getBundleId(), ((EventAggregationCriteria) aggregationCommand.getAggregationKey()).getBundleId());
assertEquals(application.getId(), ((EventAggregationCriteria) aggregationCommand.getAggregationKey()).getApplicationId());
assertEquals(DAILY, aggregationCommand.getSubscriptionType());
}

Expand All @@ -315,7 +318,7 @@ void shouldNotIncreaseAggregationsWhenPolicyIdIsDifferent() {
addEventEmailAggregation("shouldBeIgnoredOrgId", "some-rhel", "some-policies", "policyId1", "someHostId");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

final List<IAggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(1, emailAggregations.size());
}
Expand All @@ -327,14 +330,14 @@ void shouldNotIncreaseAggregationsWhenHostIdIsDifferent() {
addEventEmailAggregation("shouldBeIgnoredOrgId", "some-rhel", "some-policies", "somePolicyId", "hostId2");
helpers.addAggregationOrgConfig(someOrgIdToProceed);

List<IAggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
List<AggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
assertEquals(1, emailAggregations.size());
}

@Test
void shouldProcessZeroAggregations() {
helpers.addAggregationOrgConfig(someOrgIdToProceed);
final List<IAggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());
final List<AggregationCommand> emailAggregations = dailyEmailAggregationJob.processAggregateEmailsWithOrgPref(LocalDateTime.now(UTC), new CollectorRegistry());

assertEquals(0, emailAggregations.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import com.redhat.cloud.notifications.TestLifecycleManager;
import com.redhat.cloud.notifications.helpers.ResourceHelpers;
import com.redhat.cloud.notifications.models.AggregationCommand;
import com.redhat.cloud.notifications.models.AggregationOrgConfig;
import com.redhat.cloud.notifications.models.Application;
import com.redhat.cloud.notifications.models.EmailAggregation;
import com.redhat.cloud.notifications.models.EmailAggregationKey;
import com.redhat.cloud.notifications.models.Event;
import com.redhat.cloud.notifications.models.EventAggregationCommand;
import com.redhat.cloud.notifications.models.EventAggregationCriteria;
import com.redhat.cloud.notifications.models.EventType;
import com.redhat.cloud.notifications.models.IAggregationCommand;
import com.redhat.cloud.notifications.models.SubscriptionType;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
Expand Down Expand Up @@ -76,7 +76,7 @@ void testApplicationsWithPendingAggregationAccordinfOrgPref() {

EmailAggregationKey key = new EmailAggregationKey(ORG_ID, BUNDLE_NAME, APP_NAME);

List<IAggregationCommand> keys = emailAggregationResources.getApplicationsWithPendingAggregationAccordinfOrgPref(end);
List<AggregationCommand> keys = emailAggregationResources.getApplicationsWithPendingAggregationAccordinfOrgPref(end);
assertEquals(4, keys.size());
assertEquals(ORG_ID, keys.get(0).getOrgId());
assertEquals(BUNDLE_NAME, keys.get(0).getAggregationKey().getBundle());
Expand Down Expand Up @@ -108,11 +108,11 @@ void testApplicationsWithPendingAggregationAccordingOrgPref() {
addEventEmailAggregation(ORG_ID, "other-bundle", APP_NAME, PAYLOAD2);
addEventEmailAggregation(ORG_ID, BUNDLE_NAME, "other-app", PAYLOAD2);

List<IAggregationCommand> keys = emailAggregationResources.getApplicationsWithPendingAggregationAccordingOrgPref(end);
List<AggregationCommand> keys = emailAggregationResources.getApplicationsWithPendingAggregationAccordingOrgPref(end);
assertEquals(4, keys.size());
Application application = resourceHelpers.findApp(BUNDLE_NAME, APP_NAME);

List<IAggregationCommand> matchedKeys = keys.stream().filter(k -> ORG_ID.equals(k.getOrgId())).filter(k -> ((EventAggregationCommand) k).getAggregationKey().getApplicationId().equals(application.getId())).collect(Collectors.toList());
List<AggregationCommand> matchedKeys = keys.stream().filter(k -> ORG_ID.equals(k.getOrgId())).filter(k -> (((EventAggregationCriteria) k.getAggregationKey()).getApplicationId().equals(application.getId()))).collect(Collectors.toList());
assertEquals(1, matchedKeys.size());
assertEquals(BUNDLE_NAME, matchedKeys.get(0).getAggregationKey().getBundle());
assertEquals(APP_NAME, matchedKeys.get(0).getAggregationKey().getApplication());
Expand All @@ -122,7 +122,7 @@ void testApplicationsWithPendingAggregationAccordingOrgPref() {

keys = emailAggregationResources.getApplicationsWithPendingAggregationAccordingOrgPref(end);
assertEquals(3, keys.size());
matchedKeys = keys.stream().filter(k -> ORG_ID.equals(k.getOrgId())).filter(k -> ((EventAggregationCommand) k).getAggregationKey().getApplicationId().equals(application.getId())).collect(Collectors.toList());
matchedKeys = keys.stream().filter(k -> ORG_ID.equals(k.getOrgId())).filter(k -> (((EventAggregationCriteria) k.getAggregationKey()).getApplicationId().equals(application.getId()))).collect(Collectors.toList());
assertEquals(0, matchedKeys.size());
}

Expand Down
Loading

0 comments on commit 03d24d4

Please sign in to comment.