Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(event-handler): more appropriate variable naming #349

Merged
merged 6 commits into from
Oct 25, 2024
8 changes: 4 additions & 4 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *eventReporterController) Run(ctx context.Context) {

// sendIfPermitted is a helper to send the application to the client's streaming channel if the
// caller has RBAC privileges permissions to view it
sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error {
sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, eventProcessingStartedAt string, ignoreResourceCache bool) error {
if eventType == watch.Bookmark {
return nil // ignore this event
}
Expand All @@ -76,7 +76,7 @@ func (c *eventReporterController) Run(ctx context.Context) {
}
trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr)

err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
if err != nil {
return err
}
Expand Down Expand Up @@ -105,9 +105,9 @@ func (c *eventReporterController) Run(ctx context.Context) {
c.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricAppEventType, event.Application.Name)
continue
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
eventProcessingStartedAt := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
err := sendIfPermitted(ctx, event.Application, event.Type, eventProcessingStartedAt, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
Expand Down
19 changes: 19 additions & 0 deletions event_reporter/metrics/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package metrics_utils

import (
"time"
)

type MetricTimer struct {
startAt time.Time
}

func NewMetricTimer() *MetricTimer {
return &MetricTimer{
startAt: time.Now(),
}
}

func (m *MetricTimer) Duration() time.Duration {
return time.Since(m.startAt)
}
29 changes: 14 additions & 15 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/watch"

appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
metricsUtils "github.com/argoproj/argo-cd/v2/event_reporter/metrics/utils"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)
Expand All @@ -45,7 +46,7 @@ type ApplicationEventReporter interface {
StreamApplicationEvents(
ctx context.Context,
a *appv1.Application,
ts string,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
Expand Down Expand Up @@ -110,14 +111,14 @@ func (r *applicationEventReporter) getDesiredManifests(ctx context.Context, a *a
func (s *applicationEventReporter) StreamApplicationEvents(
ctx context.Context,
a *appv1.Application,
ts string,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
) error {
startTime := time.Now()
logCtx := log.WithField("app", a.Name)
metricTimer := metricsUtils.NewMetricTimer()

logCtx := log.WithField("app", a.Name)
logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events")

project := a.Spec.GetProject()
Expand Down Expand Up @@ -175,17 +176,16 @@ func (s *applicationEventReporter) StreamApplicationEvents(
}

utils.SetHealthStatusIfMissing(rs)
err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, parentDesiredManifests, appTree, manifestGenErr, a, parentAppSyncRevisionsMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, eventProcessingStartedAt, parentDesiredManifests, appTree, manifestGenErr, a, parentAppSyncRevisionsMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
}
reconcileDuration := time.Since(startTime)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, reconcileDuration)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, metricTimer.Duration())
} else {
logCtx.Info("processing as root application")
// will get here only for root applications (not managed as a resource by another application)
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, ts, appInstanceLabelKey, trackingMethod, applicationVersions)
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, appInstanceLabelKey, trackingMethod, applicationVersions)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name)
return fmt.Errorf("failed to get application event: %w", err)
Expand All @@ -196,13 +196,12 @@ func (s *applicationEventReporter) StreamApplicationEvents(
return nil
}

utils.LogWithAppStatus(a, logCtx, ts).Info("sending root application event")
utils.LogWithAppStatus(a, logCtx, eventProcessingStartedAt).Info("sending root application event")
if err := s.codefreshClient.SendEvent(ctx, a.Name, appEvent); err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventDeliveryErrorType, a.Name)
return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err)
}
reconcileDuration := time.Since(startTime)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, reconcileDuration)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, metricTimer.Duration())
}

revisionsMetadata, _ := s.getApplicationRevisionsMetadata(ctx, logCtx, a)
Expand All @@ -217,7 +216,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name)
continue
}
err := s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, appTree, manifestGenErr, nil, revisionsMetadata, appInstanceLabelKey, trackingMethod, nil)
err := s.processResource(ctx, rs, a, logCtx, eventProcessingStartedAt, desiredManifests, appTree, manifestGenErr, nil, revisionsMetadata, appInstanceLabelKey, trackingMethod, nil)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
Expand Down Expand Up @@ -255,7 +254,7 @@ func (s *applicationEventReporter) processResource(
rs appv1.ResourceStatus,
parentApplication *appv1.Application,
logCtx *log.Entry,
ts string,
appEventProcessingStartedAt string,
desiredManifests *apiclient.ManifestResponse,
appTree *appv1.ApplicationTree,
manifestGenErr bool,
Expand Down Expand Up @@ -294,7 +293,7 @@ func (s *applicationEventReporter) processResource(
originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logCtx, originalApplication)
}

ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, ts, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, appEventProcessingStartedAt, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, parentApplication.Name)
logCtx.WithError(err).Warn("failed to get event payload, resuming")
Expand All @@ -304,7 +303,7 @@ func (s *applicationEventReporter) processResource(
appRes := appv1.Application{}
appName := ""
if utils.IsApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil {
utils.LogWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event")
utils.LogWithAppStatus(&appRes, logCtx, appEventProcessingStartedAt).Info("streaming resource event")
appName = appRes.Name
} else {
utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event")
Expand Down
8 changes: 4 additions & 4 deletions event_reporter/reporter/event_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func getResourceEventPayload(
desiredState *apiclient.Manifest,
apptree *appv1.ApplicationTree,
manifestGenErr bool,
ts string,
appEventProcessingStartedAt string,
originalApplication *appv1.Application, // passed when rs is application
revisionsMetadata *utils.AppSyncRevisionsMetadata,
originalAppRevisionsMetadata *utils.AppSyncRevisionsMetadata, // passed when rs is application
Expand Down Expand Up @@ -194,7 +194,7 @@ func getResourceEventPayload(
}

payload := events.EventPayload{
Timestamp: ts,
Timestamp: appEventProcessingStartedAt,
Object: object,
Source: &source,
Errors: errors,
Expand All @@ -215,7 +215,7 @@ func (s *applicationEventReporter) getApplicationEventPayload(
ctx context.Context,
a *appv1.Application,
appTree *appv1.ApplicationTree,
ts string,
eventProcessingStartedAt string,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
applicationVersions *apiclient.ApplicationVersions,
Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *applicationEventReporter) getApplicationEventPayload(
errors = append(errors, parseAggregativeHealthErrorsOfApplication(a, appTree)...)

payload := events.EventPayload{
Timestamp: ts,
Timestamp: eventProcessingStartedAt,
Object: object,
Source: source,
Errors: errors,
Expand Down
Loading