Skip to content

Commit

Permalink
Enable storing of Run Events as Record
Browse files Browse the repository at this point in the history
All Events related to taskrun are stored when we are done with Runs
and in a single List.
This can be controlled by flag passed to watcher "store-event". Put
it to false disable storing of eventlist.
Record Name of EventList is stored as `results.tekton.dev/eventlist`
in TaskRun and PipelineRun.
  • Loading branch information
khrm authored and tekton-robot committed Jul 3, 2024
1 parent 31db5c9 commit 1e174ec
Showing 15 changed files with 312 additions and 13 deletions.
2 changes: 2 additions & 0 deletions cmd/watcher/main.go
Original file line number Diff line number Diff line change
@@ -72,6 +72,7 @@ var (
checkOwner = flag.Bool("check_owner", true, "If enabled, owner references will be checked while deleting objects")
updateLogTimeout = flag.Duration("update_log_timeout", 300*time.Second, "How log the Watcher waits for the UpdateLog operation for storing logs to complete before it aborts.")
dynamicReconcileTimeout = flag.Duration("dynamic_reconcile_timeout", 30*time.Second, "How long the Watcher waits for the dynamic reconciler to complete before it aborts.")
storeEvent = flag.Bool("store_event", false, "If enabled, events related to runs will also be stored")
)

func main() {
@@ -106,6 +107,7 @@ func main() {
CheckOwner: *checkOwner,
UpdateLogTimeout: updateLogTimeout,
DynamicReconcileTimeout: dynamicReconcileTimeout,
StoreEvent: *storeEvent,
}
log.Printf("dynamic reconcile timeout %s and update log timeout is %s", cfg.DynamicReconcileTimeout.String(), cfg.UpdateLogTimeout.String())

2 changes: 1 addition & 1 deletion config/base/100-watcher-serviceaccount.yaml
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ rules:
# Watcher currently get config from APISever, so will
# fail to start if it does not have this permission.
- apiGroups: [""]
resources: ["configmaps", "pods"]
resources: ["configmaps", "pods", "events"]
verbs: ["get", "list", "watch"]
# Required to read logs, when logs API is enabled
- apiGroups: [""]
2 changes: 2 additions & 0 deletions docs/api/README.md
Original file line number Diff line number Diff line change
@@ -225,6 +225,8 @@ Possible values for `data_type` and `summary.type` (for Result) are:
- `tekton.dev/v1beta1.TaskRun` or `TASK_RUN`
- `tekton.dev/v1beta1.PipelineRun` or `PIPELINE_RUN`
- `results.tekton.dev/v1alpha2.Log`
- `results.tekton.dev/v1alpha3.Log`
- `results.tekton.dev/v1.EventList`

#### The `data` field in Record

3 changes: 3 additions & 0 deletions pkg/apis/v1alpha3/types.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@ import (
"k8s.io/apimachinery/pkg/types"
)

// EventListRecordType represents the API resource type for EventSet records.
const EventListRecordType = "results.tekton.dev/v1.EventList"

// LogRecordType represents the API resource type for Tekton log records.
const LogRecordType = "results.tekton.dev/v1alpha3.Log"

3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/annotation/annotation.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,9 @@ const (
// Log identifier.
Log = "results.tekton.dev/log"

// EventList identifier.
EventList = "results.tekton.dev/eventlist"

// ResultAnnotations is an annotation that integrators should add to objects in order to store
// arbitrary keys/values into the Result.Annotations field.
ResultAnnotations = "results.tekton.dev/resultAnnotations"
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/config.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,8 @@ type Config struct {

// DynamicReconcileTimeout is the time we provide for the dynamic reconciler to process an event
DynamicReconcileTimeout *time.Duration
// Whether to Store Events related to Taskrun and Pipelineruns
StoreEvent bool
}

// GetDisableAnnotationUpdate returns whether annotation updates should be
129 changes: 128 additions & 1 deletion pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ package dynamic
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"sync"
@@ -38,10 +39,12 @@ import (
"github.com/tektoncd/results/pkg/watcher/results"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
@@ -54,6 +57,9 @@ var (
// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
// KubeClientSet allows us to talk to the k8s for core APIs
KubeClientSet kubernetes.Interface

resultsClient *results.Client
objectClient ObjectClient
cfg *reconciler.Config
@@ -79,9 +85,10 @@ type IsReadyForDeletion func(ctx context.Context, object results.Object) (bool,
type AfterDeletion func(ctx context.Context, object results.Object) error

// NewDynamicReconciler creates a new dynamic Reconciler.
func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
func NewDynamicReconciler(kubeClientSet kubernetes.Interface, rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
return &Reconciler{
resultsClient: results.NewClient(rc, lc),
KubeClientSet: kubeClientSet,
objectClient: oc,
cfg: cfg,
// Always true predicate.
@@ -227,6 +234,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
}
}

// CreateEvents if enabled
if r.cfg.StoreEvent {
if err := r.storeEvents(ctx, o); err != nil {
logger.Errorw("Error storing eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
if ctxCancel != nil {
ctxCancel()
}
return err
}
logger.Debugw("Successfully store eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)
}
logger = logger.With(zap.String("results.tekton.dev/result", res.Name),
zap.String("results.tekton.dev/record", rec.Name))
logger.Debugw("Record has been successfully upserted into API server", timeTakenField)
@@ -584,3 +611,103 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,

return nil
}

// storeEvents streams logs to the API server
func (r *Reconciler) storeEvents(ctx context.Context, o results.Object) error {
logger := logging.FromContext(ctx)
condition := o.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
GVK := o.GetObjectKind().GroupVersionKind()
if !GVK.Empty() &&
(GVK.Kind == "TaskRun" || GVK.Kind == "PipelineRun") &&
condition != nil &&
!condition.IsUnknown() {

rec, err := r.resultsClient.GetEventListRecord(ctx, o)
if err != nil {
return err
}

if rec != nil {
// It means we have already stored events
eventListName := rec.GetName()
// Update Events annotation if it doesn't exist
return r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: eventListName})
}

events, err := r.KubeClientSet.CoreV1().Events(o.GetNamespace()).List(ctx, metav1.ListOptions{
FieldSelector: "involvedObject.uid=" + string(o.GetUID()),
})
if err != nil {
logger.Errorf("Failed to store events - retrieve",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

tr, ok := o.(*pipelinev1beta1.TaskRun)

if ok {
podName := tr.Status.PodName
podEvents, err := r.KubeClientSet.CoreV1().Events(o.GetNamespace()).List(ctx, metav1.ListOptions{
FieldSelector: "involvedObject.name=" + podName,
})
if err != nil {
logger.Errorf("Failed to fetch taskrun pod events",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("podname", podName),
zap.String("err", err.Error()),
)
}
if podEvents != nil && len(podEvents.Items) > 0 {
events.Items = append(events.Items, podEvents.Items...)
}

}

data := filterEventList(events)
eventList, err := json.Marshal(data)
if err != nil {
logger.Errorf("Failed to store events - marshal",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

rec, err = r.resultsClient.PutEventList(ctx, o, eventList)
if err != nil {
return err
}

if err := r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: rec.GetName()}); err != nil {
return err
}

}

return nil
}

func filterEventList(events *v1.EventList) *v1.EventList {
if events == nil || len(events.Items) == 0 {
return events
}

for i, event := range events.Items {
// Only taking Name, Namespace and CreationTimeStamp for ObjectMeta
events.Items[i].ObjectMeta = metav1.ObjectMeta{
Name: event.Name,
Namespace: event.Namespace,
CreationTimestamp: event.CreationTimestamp,
}
}

return events
}
37 changes: 34 additions & 3 deletions pkg/watcher/reconciler/dynamic/dynamic_test.go
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sTest "k8s.io/client-go/kubernetes/fake"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/controller"
@@ -138,9 +139,12 @@ func TestReconcile_TaskRun(t *testing.T) {
cfg := &reconciler.Config{
DisableAnnotationUpdate: true,
RequeueInterval: 1 * time.Second,
StoreEvent: true,
}

r := NewDynamicReconciler(resultsClient, logsClient, trclient, cfg)
client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(client, resultsClient, logsClient, trclient, cfg)
if err := r.Reconcile(ctx, taskrun); err != nil {
t.Fatal(err)
}
@@ -164,10 +168,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if err != nil {
t.Fatalf("Error parsing result uid: %v", err)
}
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID())).String())
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID()+"eventlist")).String())
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record: %v", err)
}
eventListName := watcherresults.FormatEventListName(resultName, uid, taskrun)
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist %s: record: %v", eventListName, err)
}
})

// Enable Annotation Updates, re-reconcile
@@ -204,6 +212,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record '%s': %v", logRecordName, err)
}
eventListName := tr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}

})

t.Run("delete object once grace period elapses", func(t *testing.T) {
@@ -428,8 +444,13 @@ func TestReconcile_PipelineRun(t *testing.T) {
if _, err := prclient.Create(ctx, pipelinerun, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
cfg := &reconciler.Config{
StoreEvent: true,
}

client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(resultsClient, logsClient, prclient, nil)
r := NewDynamicReconciler(client, resultsClient, logsClient, prclient, cfg)
if err := r.Reconcile(ctx, pipelinerun); err != nil {
t.Fatal(err)
}
@@ -474,6 +495,16 @@ func TestReconcile_PipelineRun(t *testing.T) {
t.Fatalf("Error getting log record: %v", err)
}
})
t.Run("EventList", func(t *testing.T) {

eventListName := pr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}
})

// We don't do the same exhaustive feature testing as TaskRuns here -
// since everything is handled as a generic object testing TaskRuns should
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
@@ -48,6 +49,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(pipelineRunLister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
pipelineRunLister: pipelineRunLister,
6 changes: 5 additions & 1 deletion pkg/watcher/reconciler/pipelinerun/reconciler.go
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
@@ -43,6 +44,9 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

// kubeClientSet allows us to talk to the k8s for core APIs
kubeClientSet kubernetes.Interface

resultsClient pb.ResultsClient
logsClient pb.LogsClient
pipelineRunLister pipelinev1beta1listers.PipelineRunLister
@@ -86,7 +90,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
PipelineRunInterface: r.pipelineClient.TektonV1beta1().PipelineRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.kubeClientSet, r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
// Tell the dynamic reconciler to wait until all underlying TaskRuns are
// ready for deletion before deleting the PipelineRun. This guarantees
// that the TaskRuns will not be deleted before their final state being
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
@@ -47,6 +48,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(lister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
lister: lister,
Loading

0 comments on commit 1e174ec

Please sign in to comment.