Skip to content

Commit

Permalink
integrate feature manager for report from v2 or v1
Browse files Browse the repository at this point in the history
  • Loading branch information
pasha-codefresh committed Nov 27, 2023
1 parent 8ad5be8 commit 7646ec7
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 39 deletions.
4 changes: 2 additions & 2 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type eventReporterController struct {
metricsServer *metrics.MetricsServer
}

func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) EventReporterController {
appBroadcaster := reporter.NewBroadcaster()
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager)
appInformer.AddEventHandler(appBroadcaster)
return &eventReporterController{
appBroadcaster: appBroadcaster,
Expand Down
17 changes: 12 additions & 5 deletions event_reporter/reporter/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ type Broadcaster interface {
}

type broadcasterHandler struct {
lock sync.Mutex
subscribers []*subscriber
filter sharding.ApplicationFilterFunction
lock sync.Mutex
subscribers []*subscriber
filter sharding.ApplicationFilterFunction
featureManager *FeatureManager
}

func NewBroadcaster() Broadcaster {
func NewBroadcaster(featureManager *FeatureManager) Broadcaster {
// todo: pass real value here
filter := getApplicationFilter("")
return &broadcasterHandler{
filter: filter,
filter: filter,
featureManager: featureManager,
}
}

Expand All @@ -57,6 +59,11 @@ func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) {
subscribers = append(subscribers, b.subscribers...)
b.lock.Unlock()

if !b.featureManager.ShouldReporterRun() {
log.Infof("filtering application '%s', event reporting is turned off and old one is in use", event.Application.Name)
return
}

if b.filter != nil {
result, expectedShard := b.filter(&event.Application)
if !result {
Expand Down
43 changes: 43 additions & 0 deletions event_reporter/reporter/feature_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package reporter

import (
settings_util "github.com/argoproj/argo-cd/v2/util/settings"
log "github.com/sirupsen/logrus"
"time"
)

type FeatureManager struct {
settingsMgr *settings_util.SettingsManager
shouldRun bool
}

func NewFeatureManager(settingsMgr *settings_util.SettingsManager) *FeatureManager {
return &FeatureManager{settingsMgr: settingsMgr}
}

func (f *FeatureManager) setShouldRun() {
reporterVersion, err := f.settingsMgr.GetCodefreshReporterVersion()
if err != nil {
log.Warnf("Failed to get reporter version: %v", err)
f.shouldRun = false
return
}
f.shouldRun = reporterVersion == string(settings_util.CodefreshV2ReporterVersion)
}

func (f *FeatureManager) Watch() {
f.setShouldRun()
tick := time.Tick(5 * time.Second)
for {
select {
case <-tick:
{
f.setShouldRun()
}
}
}
}

func (f *FeatureManager) ShouldReporterRun() bool {
return f.shouldRun
}
38 changes: 8 additions & 30 deletions event_reporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -66,8 +67,9 @@ type EventReporterServer struct {
db db.ArgoDB

// stopCh is the channel which when closed, will shutdown the Argo CD server
stopCh chan struct{}
serviceSet *EventReporterServerSet
stopCh chan struct{}
serviceSet *EventReporterServerSet
featureManager *reporter.FeatureManager
}

type EventReporterServerSet struct {
Expand Down Expand Up @@ -144,35 +146,14 @@ func (a *EventReporterServer) healthCheck(r *http.Request) error {
// Init starts informers used by the API server
func (a *EventReporterServer) Init(ctx context.Context) {
go a.appInformer.Run(ctx.Done())
go a.featureManager.Watch()
svcSet := newEventReporterServiceSet(a)
a.serviceSet = svcSet
}

func (a *EventReporterServer) RunController(ctx context.Context) {
running := false
controllerCtx, cancel := context.WithCancel(ctx)
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer)
tick := time.Tick(5 * time.Second)

for {
select {
case <-tick:
{
rVersion, err := a.settingsMgr.GetCodefreshReporterVersion()
if !running && rVersion == string(settings_util.CodefreshV2ReporterVersion) {
controllerCtx, cancel = context.WithCancel(ctx)
log.Warnf("Reporter parameter (%s) detected - starting controller", rVersion)
go controller.Run(controllerCtx)
running = true
}
if running == true && err == nil && isOldReporterVersion(rVersion) {
log.Warnf("Stopping reporter because version param changed to %s or missing", settings_util.CodefreshV1ReporterVersion)
cancel()
running = false
}
}
}
}
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager)
go controller.Run(ctx)
}

// newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented
Expand Down Expand Up @@ -256,10 +237,6 @@ func (a *EventReporterServer) Run(ctx context.Context, lns *Listeners) {
<-a.stopCh
}

func isOldReporterVersion(reporterVersion string) bool {
return reporterVersion == "" || reporterVersion == string(settings_util.CodefreshV1ReporterVersion)
}

// NewServer returns a new instance of the Argo CD API server
func NewEventReporterServer(ctx context.Context, opts EventReporterServerOpts) *EventReporterServer {
settingsMgr := settings_util.NewSettingsManager(ctx, opts.KubeClientset, opts.Namespace)
Expand Down Expand Up @@ -302,6 +279,7 @@ func NewEventReporterServer(ctx context.Context, opts EventReporterServerOpts) *
appLister: appLister,
policyEnforcer: policyEnf,
db: dbInstance,
featureManager: reporter.NewFeatureManager(settingsMgr),
}

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
serviceAccountName: event-reporter
containers:
- name: event-reporter
image: docker.io/xeonalex/personal-argocd-dev:reporting-v2-test-v13
image: docker.io/xeonalex/personal-argocd-dev:reporting-v2-test-v20
imagePullPolicy: Always
args:
- /usr/local/bin/event-reporter-server
Expand Down
2 changes: 2 additions & 0 deletions server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
for {
select {
case event := <-eventsChannel:
log.Infof("event channel size is %d", len(eventsChannel))
rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion()
if rVersion == string(settings.CodefreshV2ReporterVersion) {
logCtx.Info("v1 reported disabled skipping event")
Expand All @@ -1129,6 +1130,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing

shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
log.Infof("ignore event for app %s", event.Application.Name)
continue
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
Expand Down
5 changes: 4 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
applicationsetpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/applicationset"
certificatepkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/certificate"
clusterpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/cluster"
eventspkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
gpgkeypkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/gpgkey"
projectpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/project"
repocredspkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/repocreds"
Expand Down Expand Up @@ -766,7 +767,9 @@ func (a *ArgoCDServer) newGRPCServer() (*grpc.Server, application.AppResourceTre
grpc_util.PanicLoggerUnaryServerInterceptor(a.log),
)))
grpcS := grpc.NewServer(sOpts...)


srv := a.serviceSet.ApplicationService.(*application.Server)
eventspkg.RegisterEventingServer(grpcS, srv)
versionpkg.RegisterVersionServiceServer(grpcS, a.serviceSet.VersionService)
clusterpkg.RegisterClusterServiceServer(grpcS, a.serviceSet.ClusterService)
applicationpkg.RegisterApplicationServiceServer(grpcS, a.serviceSet.ApplicationService)
Expand Down

0 comments on commit 7646ec7

Please sign in to comment.