From 45e6c5cc8a443f51b6a47ad215cb911e7e0b41be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Sat, 23 Dec 2023 03:54:23 +0100 Subject: [PATCH] refactoring refactoring --- api.go | 38 ++++++++++++++++++++++++++++++++++++++ docker-compose.yaml | 8 +------- examples/go.mod | 9 +++++---- examples/go.sum | 18 ++++++++---------- examples/windowed/main.go | 4 ++-- go.mod | 9 +++++---- go.sum | 19 +++++++++---------- interceptor.go | 9 ++++++++- processor.go | 18 +++++++++++++++++- processors/aggregator.go | 8 +++++--- store.go | 14 ++++++-------- stores/pebble/store.go | 6 +++++- task.go | 10 +++++----- task_manager.go | 2 +- worker.go | 19 ++++++++----------- 15 files changed, 123 insertions(+), 68 deletions(-) create mode 100644 api.go diff --git a/api.go b/api.go new file mode 100644 index 0000000..9239d59 --- /dev/null +++ b/api.go @@ -0,0 +1,38 @@ +package kstreams + +import "github.com/twmb/franz-go/pkg/kgo" + +type Store interface { + Init() error + Flush() error + Close() error +} + +func NewChangeLoggingKeyValueStateStore[K, V any](client *kgo.Client) *ChangeLoggingKeyValueStateStore[K, V] { + return &ChangeLoggingKeyValueStateStore[K, V]{} +} + +type ChangeLoggingKeyValueStateStore[K, V any] struct { + client *kgo.Client +} + +func (s *ChangeLoggingKeyValueStateStore[K, V]) Init() error { + return nil +} + +func (s *ChangeLoggingKeyValueStateStore[K, V]) Flush() error { + return nil +} + +func (s *ChangeLoggingKeyValueStateStore[K, V]) Close() error { + return nil +} + +func (t *ChangeLoggingKeyValueStateStore[K, V]) Set(k K, v V) error { + return nil +} + +func (t *ChangeLoggingKeyValueStateStore[K, V]) Get(k K) (V, error) { + result := new(V) + return *result, nil +} diff --git a/docker-compose.yaml b/docker-compose.yaml index b27a7bd..67449ca 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,11 +1,5 @@ version: "3.9" services: - minio: - command: server /data --console-address ":9001" - image: minio/minio:latest - ports: - - 9000:9000 - - 9001:9001 redpanda: command: - redpanda @@ -17,6 +11,6 @@ services: - --overprovisioned - --node-id - "0" - image: docker.vectorized.io/vectorized/redpanda:v22.3.9 + image: redpandadata/redpanda:v23.3.1 ports: - 9092:9092 diff --git a/examples/go.mod b/examples/go.mod index baea0d7..fcbb534 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -19,24 +19,25 @@ require ( github.com/cockroachdb/pebble v0.0.0-20221222183300-eb5e1039627d // indirect github.com/cockroachdb/redact v1.0.8 // indirect github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/compress v1.16.3 // indirect github.com/kr/pretty v0.2.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect - github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.12.0 // indirect github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/twmb/franz-go v1.11.0 // indirect + github.com/twmb/franz-go v1.13.5 // indirect github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.2.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.3.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index af14e9b..baa3877 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -245,8 +245,8 @@ github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.9.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -293,8 +293,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -354,13 +354,13 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/twmb/franz-go v1.2.3-0.20211104052441-7952375c09c0/go.mod h1:e5ZOdNswX/wv+jebWNX49yc9U7zgR18Xovj9ckk6mx8= -github.com/twmb/franz-go v1.11.0 h1:yva9TXAgqI62rcdxFQjQPWxa0DmFoZfYEAUcYeSiIMM= -github.com/twmb/franz-go v1.11.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro= +github.com/twmb/franz-go v1.13.5 h1:7Hk47eZ7XRb4yWXQZk1GZU4BthkrKuZUfKOuP9Sgp24= +github.com/twmb/franz-go v1.13.5/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058 h1:zzK73ySQ/KakIQM5qIPKfZnBEdvjNeAsryL1cPbyXNY= github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058/go.mod h1:fuA2THFeFx/ms1w1R432uxWJqq7o3Q+olvJR4NFpWcM= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211104051938-70808186d5f7/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -github.com/twmb/franz-go/pkg/kmsg v1.2.0 h1:jYWh2qFw5lDbNv5Gvu/sMKagzICxuA5L6m1W2Oe7XUo= -github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s= +github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= @@ -398,7 +398,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= -golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -470,7 +469,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/examples/windowed/main.go b/examples/windowed/main.go index b31838f..e73073f 100644 --- a/examples/windowed/main.go +++ b/examples/windowed/main.go @@ -57,9 +57,9 @@ func main() { "my-agg-store", ) kstreams.RegisterStore(t, s, "my-agg-store") - kstreams.RegisterProcessor(t, p, "my-agg-processor", "sensor-data", "my-agg-store") + kstreams.MustRegisterProcessor(t, p, "my-agg-processor", "sensor-data", "my-agg-store") - kstreams.RegisterSink(t, "custom-agg-out", "message-count", serde.JSONSerializer[processors.WindowKey[string]](), serde.JSONSerializer[float64](), "my-agg-processor") + kstreams.MustRegisterSink(t, "custom-agg-out", "message-count", serde.JSONSerializer[processors.WindowKey[string]](), serde.JSONSerializer[float64](), "my-agg-processor") app := kstreams.New(t.MustBuild(), "my-app", kstreams.WithWorkersCount(1), kstreams.WithLogr(zerologr.New(log))) diff --git a/go.mod b/go.mod index c532724..9ded775 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,12 @@ go 1.19 require ( github.com/alecthomas/assert/v2 v2.2.0 + github.com/davecgh/go-spew v1.1.0 github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a github.com/hashicorp/go-multierror v1.1.1 - github.com/twmb/franz-go v1.11.0 + github.com/twmb/franz-go v1.13.5 github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058 - github.com/twmb/franz-go/pkg/kmsg v1.2.0 + github.com/twmb/franz-go/pkg/kmsg v1.4.0 golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) @@ -17,6 +18,6 @@ require ( github.com/alecthomas/repr v0.1.0 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect - github.com/klauspost/compress v1.15.9 // indirect - github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/klauspost/compress v1.16.3 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect ) diff --git a/go.sum b/go.sum index 95e8ee6..fdd8d1a 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ github.com/alecthomas/assert/v2 v2.2.0 h1:f6L/b7KE2bfA+9O4FL3CM/xJccDEwPVYd5fALB github.com/alecthomas/assert/v2 v2.2.0/go.mod h1:b/+1DI2Q6NckYi+3mXyH3wFb8qG37K/DuK80n7WefXA= github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE= github.com/alecthomas/repr v0.1.0/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a h1:FXCefK0m04/byLiHiCNNbibt/fWqclW7lAxTygj0IK4= github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -23,35 +24,33 @@ github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aW github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/twmb/franz-go v1.2.3-0.20211104052441-7952375c09c0/go.mod h1:e5ZOdNswX/wv+jebWNX49yc9U7zgR18Xovj9ckk6mx8= -github.com/twmb/franz-go v1.11.0 h1:yva9TXAgqI62rcdxFQjQPWxa0DmFoZfYEAUcYeSiIMM= -github.com/twmb/franz-go v1.11.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro= +github.com/twmb/franz-go v1.13.5 h1:7Hk47eZ7XRb4yWXQZk1GZU4BthkrKuZUfKOuP9Sgp24= +github.com/twmb/franz-go v1.13.5/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058 h1:zzK73ySQ/KakIQM5qIPKfZnBEdvjNeAsryL1cPbyXNY= github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058/go.mod h1:fuA2THFeFx/ms1w1R432uxWJqq7o3Q+olvJR4NFpWcM= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211104051938-70808186d5f7/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -github.com/twmb/franz-go/pkg/kmsg v1.2.0 h1:jYWh2qFw5lDbNv5Gvu/sMKagzICxuA5L6m1W2Oe7XUo= -github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s= +github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd h1:zVFyTKZN/Q7mNRWSs1GOYnHM9NiFSJ54YVRsD0rNWT4= golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/interceptor.go b/interceptor.go index b12c050..1e6ba4a 100644 --- a/interceptor.go +++ b/interceptor.go @@ -1,5 +1,12 @@ package kstreams -type ProcessorInterceptor[K any, V any, Kout any, Vout any] func(ctx ProcessorContext[Kout, Vout], k K, v V, processor Processor[K, V, Kout, Vout]) +import "context" + +type ProcessorInterceptor[K any, V any, Kout any, Vout any] func(ctx context.Context, k K, v V, processor Processor[K, V, Kout, Vout]) // How to capture Forwarded stuff here? +func init() { + + // f := func(ctx context.Context, k string, v string, proc Processor[string, string, string, string]) { + // } +} diff --git a/processor.go b/processor.go index dc9d2aa..2e0347f 100644 --- a/processor.go +++ b/processor.go @@ -1,6 +1,10 @@ package kstreams -import "context" +import ( + "context" + + "github.com/twmb/franz-go/pkg/kgo" +) // Processor is a low-level interface. The implementation can retain the // ProcessorContext passed into Init and use it to access state stores and @@ -14,5 +18,17 @@ type Processor[Kin any, Vin any, Kout any, Vout any] interface { Process(ctx context.Context, k Kin, v Vin) error } +type Input[Kin any, Vin any] struct { +} + +type Record[K, V any] struct { + Key K + Value V + X kgo.Record +} + +type Output[Kout any, Vout any] struct { +} + // ProcessorBuilder creates an actual processor for a specific TopicPartition. type ProcessorBuilder[Kin any, Vin any, Kout any, Vout any] func() Processor[Kin, Vin, Kout, Vout] diff --git a/processors/aggregator.go b/processors/aggregator.go index 6b45a7b..1489f1d 100644 --- a/processors/aggregator.go +++ b/processors/aggregator.go @@ -37,6 +37,8 @@ func NewWindowedAggregator[Kin, Vin, State, Vout any]( keySerde kstreams.SerDe[Kin], stateSerde kstreams.SerDe[State], storeName string, + // TODO add trigger - when to call finalize. + // TODO maybe introduce interface for aggregator, so people can plug custom aggregators? ) ( kstreams.ProcessorBuilder[Kin, Vin, WindowKey[Kin], Vout], kstreams.StoreBuilder, @@ -159,8 +161,8 @@ func (t *WindowedKeyValueStore[K, V]) Init() error { return t.store.Init() } -func (t *WindowedKeyValueStore[K, V]) Flush(ctx context.Context) error { - return t.store.Flush(ctx) +func (t *WindowedKeyValueStore[K, V]) Flush() error { + return t.store.Flush() } func (t *WindowedKeyValueStore[K, V]) Close() error { @@ -224,7 +226,7 @@ func WindowKeyDeserializer[K any](deserializer kstreams.Deserializer[K]) kstream return func(b []byte) (key WindowKey[K], err error) { length := binary.BigEndian.Uint16(b) if len(b) < int(length)+1+8 { - return WindowKey[K]{}, fmt.Errorf("eof") + return WindowKey[K]{}, fmt.Errorf("unexpected eof") } b = b[2:] diff --git a/store.go b/store.go index d22c5fa..ab17ff1 100644 --- a/store.go +++ b/store.go @@ -5,12 +5,6 @@ import ( "errors" ) -type Store interface { - Init() error - Flush(context.Context) error - Close() error -} - var ( ErrKeyNotFound = errors.New("store: key not found") ) @@ -60,14 +54,18 @@ func (t *KeyValueStore[K, V]) Init() error { return t.store.Init() } -func (t *KeyValueStore[K, V]) Flush(ctx context.Context) error { - return t.store.Flush(ctx) +func (t *KeyValueStore[K, V]) Flush() error { + return t.store.Flush() } func (t *KeyValueStore[K, V]) Close() error { return t.store.Close() } +func (t *KeyValueStore[K, V]) Checkpoint(ctx context.Context, id string) error { + return nil +} + func (t *KeyValueStore[K, V]) Set(k K, v V) error { key, err := t.keySerializer(k) if err != nil { diff --git a/stores/pebble/store.go b/stores/pebble/store.go index d9ad745..3e220cf 100644 --- a/stores/pebble/store.go +++ b/stores/pebble/store.go @@ -17,10 +17,14 @@ func (s *pebbleStore) Init() error { return nil } -func (s *pebbleStore) Flush(ctx context.Context) error { +func (s *pebbleStore) Flush() error { return s.db.Flush() } +func (s *pebbleStore) Checkpoint(ctx context.Context, id string) error { + return nil +} + func (s *pebbleStore) Close() error { if err := s.db.Flush(); err != nil { return err diff --git a/task.go b/task.go index 12f9478..d913a8b 100644 --- a/task.go +++ b/task.go @@ -16,7 +16,7 @@ type Task struct { topics []string partition int32 - committableOffsets map[string]int64 // Topic => offset + committableOffsets map[string]kgo.EpochOffset // Topic => offset processors map[string]Node @@ -31,7 +31,7 @@ func NewTask(topics []string, partition int32, rootNodes map[string]RecordProces stores: stores, topics: topics, partition: partition, - committableOffsets: map[string]int64{}, + committableOffsets: map[string]kgo.EpochOffset{}, processors: processors, sinks: sinks, processorsToStores: processorToStore, @@ -48,7 +48,7 @@ func (t *Task) Process(ctx context.Context, records ...*kgo.Record) error { if err := p.Process(ctx, record); err != nil { return fmt.Errorf("failed to process record: %w", err) } - t.committableOffsets[record.Topic] = record.Offset + 1 + t.committableOffsets[record.Topic] = kgo.EpochOffset{Epoch: record.LeaderEpoch, Offset: record.Offset + 1} } return nil @@ -76,7 +76,7 @@ func (t *Task) Close(ctx context.Context) error { return err.ErrorOrNil() } -func (t *Task) GetOffsetsToCommit() map[string]int64 { +func (t *Task) GetOffsetsToCommit() map[string]kgo.EpochOffset { return t.committableOffsets } @@ -91,7 +91,7 @@ func (t *Task) Flush(ctx context.Context) error { var err *multierror.Error for _, store := range t.stores { - err = multierror.Append(err, store.Flush(ctx)) + err = multierror.Append(err, store.Flush()) } for _, sink := range t.sinks { diff --git a/task_manager.go b/task_manager.go index 9a84b58..62edfe7 100644 --- a/task_manager.go +++ b/task_manager.go @@ -178,7 +178,7 @@ func (t *TaskManager) commit(ctx context.Context) error { if _, ok := data[topic]; !ok { data[topic] = make(map[int32]kgo.EpochOffset) } - data[topic][task.partition] = kgo.EpochOffset{Offset: offset} + data[topic][task.partition] = kgo.EpochOffset{Epoch: offset.Epoch, Offset: offset.Offset} } } errCh := make(chan error, 1) diff --git a/worker.go b/worker.go index 87ebb16..adc6181 100644 --- a/worker.go +++ b/worker.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/davecgh/go-spew/spew" "github.com/go-logr/logr" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" @@ -93,7 +94,7 @@ func NewWorker(log logr.Logger, name string, t *Topology, group string, brokers tm.client = client - sr := &Worker{ + w := &Worker{ name: name, log: log.WithValues("worker", name), group: group, @@ -107,8 +108,8 @@ func NewWorker(log logr.Logger, name string, t *Topology, group string, brokers taskManager: tm, commitInterval: commitInterval, } - sr.closed.Add(1) - return sr, nil + w.closed.Add(1) + return w, nil } func (r *Worker) changeState(newState RoutineState) { @@ -161,7 +162,6 @@ func (r *Worker) handleRunning() { } if !errors.Is(f.Err(), context.DeadlineExceeded) { - var fetches []kgo.FetchTopicPartition for _, fetchError := range f.Errors() { if errors.Is(fetchError.Err, context.DeadlineExceeded) { continue @@ -169,18 +169,13 @@ func (r *Worker) handleRunning() { r.log.Error(fetchError.Err, "fetch error", "topic", fetchError.Topic, "partition", fetchError.Partition) if fetchError.Err != nil { r.err = fmt.Errorf("fetch error on topic %s, partition %d: %w", fetchError.Topic, fetchError.Partition, fetchError.Err) + spew.Dump(fetchError) r.changeState(StateCloseRequested) return } } f.EachPartition(func(fetch kgo.FetchTopicPartition) { - if fetch.Err == nil { - fetches = append(fetches, fetch) - } - }) - - for _, fetch := range fetches { r.log.V(1).Info("Processing", "topic", fetch.Topic, "partition", fetch.Partition) task, err := r.taskManager.TaskFor(fetch.Topic, fetch.Partition) if err != nil { @@ -205,7 +200,9 @@ func (r *Worker) handleRunning() { } } r.log.V(2).Info("Processed", "topic", fetch.Topic, "partition", fetch.Partition) - } + + }) + } commitCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)