Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(event-reporter): refactoring, variables grouping, code splitting #351

Merged
Merged
5 changes: 4 additions & 1 deletion event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func (c *eventReporterController) Run(ctx context.Context) {
}
trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr)

err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, &reporter.ArgoTrackingMetadata{
AppInstanceLabelKey: &appInstanceLabelKey,
TrackingMethod: &trackingMethod,
})
if err != nil {
return err
}
Expand Down
87 changes: 55 additions & 32 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ type ApplicationEventReporter interface {
a *appv1.Application,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
argoTrackingMetadata *ArgoTrackingMetadata,
) error
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
}
Expand Down Expand Up @@ -113,8 +112,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
a *appv1.Application,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
metricTimer := metricsUtils.NewMetricTimer()

Expand All @@ -141,18 +139,11 @@ func (s *applicationEventReporter) StreamApplicationEvents(

desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, a, nil, logCtx)

syncRevision := utils.GetOperationStateRevision(a)
var applicationVersions *apiclient.ApplicationVersions
if syncRevision != nil {
syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
applicationVersions = syncManifests.GetApplicationVersions()
} else {
applicationVersions = nil
}
applicationVersions := s.resolveApplicationVersions(ctx, a, logCtx)

logCtx.Info("getting parent application name")

parentAppIdentity := utils.GetParentAppIdentity(a, appInstanceLabelKey, trackingMethod)
parentAppIdentity := utils.GetParentAppIdentity(a, *argoTrackingMetadata.AppInstanceLabelKey, *argoTrackingMetadata.TrackingMethod)

if utils.IsChildApp(parentAppIdentity) {
logCtx.Info("processing as child application")
Expand All @@ -165,27 +156,29 @@ func (s *applicationEventReporter) StreamApplicationEvents(
}

rs := utils.GetAppAsResource(a)
utils.SetHealthStatusIfMissing(rs)

parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, nil, logCtx)

// helm app hasnt revision
// TODO: add check if it helm application
parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logCtx, parentApplicationEntity)
if err != nil {
logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming")
}

utils.SetHealthStatusIfMissing(rs)
err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, eventProcessingStartedAt, parentDesiredManifests, appTree, manifestGenErr, a, parentAppSyncRevisionsMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
err = s.processResource(ctx, *rs, logCtx, eventProcessingStartedAt, parentDesiredManifests, manifestGenErr, a, applicationVersions, &ReportedEntityParentApp{
app: parentApplicationEntity,
appTree: appTree,
revisionsMetadata: parentAppSyncRevisionsMetadata,
}, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
}
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, metricTimer.Duration())
} else {
logCtx.Info("processing as root application")
// will get here only for root applications (not managed as a resource by another application)
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, appInstanceLabelKey, trackingMethod, applicationVersions)
logCtx.Info("processing as root application")
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, applicationVersions, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name)
return fmt.Errorf("failed to get application event: %w", err)
Expand Down Expand Up @@ -216,7 +209,11 @@ func (s *applicationEventReporter) StreamApplicationEvents(
s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name)
continue
}
err := s.processResource(ctx, rs, a, logCtx, eventProcessingStartedAt, desiredManifests, appTree, manifestGenErr, nil, revisionsMetadata, appInstanceLabelKey, trackingMethod, nil)
err := s.processResource(ctx, rs, logCtx, eventProcessingStartedAt, desiredManifests, manifestGenErr, nil, nil, &ReportedEntityParentApp{
app: a,
appTree: appTree,
revisionsMetadata: revisionsMetadata,
}, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
Expand All @@ -225,6 +222,16 @@ func (s *applicationEventReporter) StreamApplicationEvents(
return nil
}

func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Context, a *appv1.Application, logCtx *log.Entry) *apiclient.ApplicationVersions {
syncRevision := utils.GetOperationStateRevision(a)
if syncRevision == nil {
return nil
}

syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
return syncManifests.GetApplicationVersions()
}

func (s *applicationEventReporter) getAppForResourceReporting(
rs appv1.ResourceStatus,
ctx context.Context,
Expand Down Expand Up @@ -252,17 +259,14 @@ func (s *applicationEventReporter) getAppForResourceReporting(
func (s *applicationEventReporter) processResource(
ctx context.Context,
rs appv1.ResourceStatus,
parentApplication *appv1.Application,
logCtx *log.Entry,
appEventProcessingStartedAt string,
desiredManifests *apiclient.ManifestResponse,
appTree *appv1.ApplicationTree,
manifestGenErr bool,
originalApplication *appv1.Application,
revisionsMetadata *utils.AppSyncRevisionsMetadata,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
applicationVersions *apiclient.ApplicationVersions,
originalApplication *appv1.Application, // passed onlu if resource is app
applicationVersions *apiclient.ApplicationVersions, // passed onlu if resource is app
reportedEntityParentApp *ReportedEntityParentApp,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
metricsEventType := metrics.MetricResourceEventType
if utils.IsApp(rs) {
Expand All @@ -277,25 +281,44 @@ func (s *applicationEventReporter) processResource(
// get resource desired state
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)

actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, parentApplication, originalApplication)
actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, reportedEntityParentApp.app, originalApplication)
if err != nil {
return err
}
if actualState == nil {
return nil
}

parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, parentApplication, revisionsMetadata)
parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, reportedEntityParentApp.app, reportedEntityParentApp.revisionsMetadata)

var originalAppRevisionMetadata *utils.AppSyncRevisionsMetadata = nil

if originalApplication != nil {
originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logCtx, originalApplication)
}

ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, appEventProcessingStartedAt, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
ev, err := getResourceEventPayload(
appEventProcessingStartedAt,
&ReportedResource{
rs: &rs,
actualState: actualState,
desiredState: desiredState,
manifestGenErr: manifestGenErr,
rsAsAppInfo: &ReportedResourceAsApp{
app: originalApplication,
revisionsMetadata: originalAppRevisionMetadata,
applicationVersions: applicationVersions,
},
},
&ReportedEntityParentApp{
app: parentApplicationToReport,
appTree: reportedEntityParentApp.appTree,
revisionsMetadata: revisionMetadataToReport,
},
argoTrackingMetadata,
)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, parentApplication.Name)
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, reportedEntityParentApp.app.Name)
logCtx.WithError(err).Warn("failed to get event payload, resuming")
return nil
}
Expand All @@ -307,7 +330,7 @@ func (s *applicationEventReporter) processResource(
appName = appRes.Name
} else {
utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event")
appName = parentApplication.Name
appName = reportedEntityParentApp.app.Name
}

if err := s.codefreshClient.SendEvent(ctx, appName, ev); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions event_reporter/reporter/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ import (
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/pkg/codefresh"
"github.com/argoproj/argo-cd/v2/util/argo"
)

const (
Expand Down Expand Up @@ -226,7 +224,7 @@ func TestStreamApplicationEvent(t *testing.T) {
assert.Equal(t, *app, actualApp)
return nil
}
_ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel)
_ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, getMockedArgoTrackingMetadata())
})
}

Expand Down
Loading
Loading