Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
refactoring
  • Loading branch information
birdayz authored Dec 23, 2023
1 parent da88277 commit 45e6c5c
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 68 deletions.
38 changes: 38 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package kstreams

import "github.com/twmb/franz-go/pkg/kgo"

type Store interface {
Init() error
Flush() error
Close() error
}

func NewChangeLoggingKeyValueStateStore[K, V any](client *kgo.Client) *ChangeLoggingKeyValueStateStore[K, V] {
return &ChangeLoggingKeyValueStateStore[K, V]{}
}

type ChangeLoggingKeyValueStateStore[K, V any] struct {
client *kgo.Client

Check failure on line 16 in api.go

View workflow job for this annotation

GitHub Actions / lint

field `client` is unused (unused)
}

func (s *ChangeLoggingKeyValueStateStore[K, V]) Init() error {
return nil
}

func (s *ChangeLoggingKeyValueStateStore[K, V]) Flush() error {
return nil
}

func (s *ChangeLoggingKeyValueStateStore[K, V]) Close() error {
return nil
}

func (t *ChangeLoggingKeyValueStateStore[K, V]) Set(k K, v V) error {
return nil
}

func (t *ChangeLoggingKeyValueStateStore[K, V]) Get(k K) (V, error) {
result := new(V)
return *result, nil
}
8 changes: 1 addition & 7 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
version: "3.9"
services:
minio:
command: server /data --console-address ":9001"
image: minio/minio:latest
ports:
- 9000:9000
- 9001:9001
redpanda:
command:
- redpanda
Expand All @@ -17,6 +11,6 @@ services:
- --overprovisioned
- --node-id
- "0"
image: docker.vectorized.io/vectorized/redpanda:v22.3.9
image: redpandadata/redpanda:v23.3.1
ports:
- 9092:9092
9 changes: 5 additions & 4 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,25 @@ require (
github.com/cockroachdb/pebble v0.0.0-20221222183300-eb5e1039627d // indirect
github.com/cockroachdb/redact v1.0.8 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/twmb/franz-go v1.11.0 // indirect
github.com/twmb/franz-go v1.13.5 // indirect
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.2.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.3.0 // indirect
Expand Down
18 changes: 8 additions & 10 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.9.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY=
github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -293,8 +293,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -354,13 +354,13 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/twmb/franz-go v1.2.3-0.20211104052441-7952375c09c0/go.mod h1:e5ZOdNswX/wv+jebWNX49yc9U7zgR18Xovj9ckk6mx8=
github.com/twmb/franz-go v1.11.0 h1:yva9TXAgqI62rcdxFQjQPWxa0DmFoZfYEAUcYeSiIMM=
github.com/twmb/franz-go v1.11.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro=
github.com/twmb/franz-go v1.13.5 h1:7Hk47eZ7XRb4yWXQZk1GZU4BthkrKuZUfKOuP9Sgp24=
github.com/twmb/franz-go v1.13.5/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk=
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058 h1:zzK73ySQ/KakIQM5qIPKfZnBEdvjNeAsryL1cPbyXNY=
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058/go.mod h1:fuA2THFeFx/ms1w1R432uxWJqq7o3Q+olvJR4NFpWcM=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211104051938-70808186d5f7/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.2.0 h1:jYWh2qFw5lDbNv5Gvu/sMKagzICxuA5L6m1W2Oe7XUo=
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s=
github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
Expand Down Expand Up @@ -398,7 +398,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -470,7 +469,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
4 changes: 2 additions & 2 deletions examples/windowed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func main() {
"my-agg-store",
)
kstreams.RegisterStore(t, s, "my-agg-store")
kstreams.RegisterProcessor(t, p, "my-agg-processor", "sensor-data", "my-agg-store")
kstreams.MustRegisterProcessor(t, p, "my-agg-processor", "sensor-data", "my-agg-store")

kstreams.RegisterSink(t, "custom-agg-out", "message-count", serde.JSONSerializer[processors.WindowKey[string]](), serde.JSONSerializer[float64](), "my-agg-processor")
kstreams.MustRegisterSink(t, "custom-agg-out", "message-count", serde.JSONSerializer[processors.WindowKey[string]](), serde.JSONSerializer[float64](), "my-agg-processor")

app := kstreams.New(t.MustBuild(), "my-app", kstreams.WithWorkersCount(1), kstreams.WithLogr(zerologr.New(log)))

Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ go 1.19

require (
github.com/alecthomas/assert/v2 v2.2.0
github.com/davecgh/go-spew v1.1.0
github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a
github.com/hashicorp/go-multierror v1.1.1
github.com/twmb/franz-go v1.11.0
github.com/twmb/franz-go v1.13.5
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058
github.com/twmb/franz-go/pkg/kmsg v1.2.0
github.com/twmb/franz-go/pkg/kmsg v1.4.0
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
Expand All @@ -17,6 +18,6 @@ require (
github.com/alecthomas/repr v0.1.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
)
19 changes: 9 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/alecthomas/assert/v2 v2.2.0 h1:f6L/b7KE2bfA+9O4FL3CM/xJccDEwPVYd5fALB
github.com/alecthomas/assert/v2 v2.2.0/go.mod h1:b/+1DI2Q6NckYi+3mXyH3wFb8qG37K/DuK80n7WefXA=
github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE=
github.com/alecthomas/repr v0.1.0/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a h1:FXCefK0m04/byLiHiCNNbibt/fWqclW7lAxTygj0IK4=
github.com/go-logr/logr v1.2.4-0.20221203165057-4da5305ff29a/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand All @@ -23,35 +24,33 @@ github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aW
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY=
github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/twmb/franz-go v1.2.3-0.20211104052441-7952375c09c0/go.mod h1:e5ZOdNswX/wv+jebWNX49yc9U7zgR18Xovj9ckk6mx8=
github.com/twmb/franz-go v1.11.0 h1:yva9TXAgqI62rcdxFQjQPWxa0DmFoZfYEAUcYeSiIMM=
github.com/twmb/franz-go v1.11.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro=
github.com/twmb/franz-go v1.13.5 h1:7Hk47eZ7XRb4yWXQZk1GZU4BthkrKuZUfKOuP9Sgp24=
github.com/twmb/franz-go v1.13.5/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk=
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058 h1:zzK73ySQ/KakIQM5qIPKfZnBEdvjNeAsryL1cPbyXNY=
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220215213838-c67ef7e57058/go.mod h1:fuA2THFeFx/ms1w1R432uxWJqq7o3Q+olvJR4NFpWcM=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211104051938-70808186d5f7/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.2.0 h1:jYWh2qFw5lDbNv5Gvu/sMKagzICxuA5L6m1W2Oe7XUo=
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s=
github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd h1:zVFyTKZN/Q7mNRWSs1GOYnHM9NiFSJ54YVRsD0rNWT4=
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
9 changes: 8 additions & 1 deletion interceptor.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package kstreams

type ProcessorInterceptor[K any, V any, Kout any, Vout any] func(ctx ProcessorContext[Kout, Vout], k K, v V, processor Processor[K, V, Kout, Vout])
import "context"

type ProcessorInterceptor[K any, V any, Kout any, Vout any] func(ctx context.Context, k K, v V, processor Processor[K, V, Kout, Vout])

// How to capture Forwarded stuff here?
func init() {

// f := func(ctx context.Context, k string, v string, proc Processor[string, string, string, string]) {
// }
}
18 changes: 17 additions & 1 deletion processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package kstreams

import "context"
import (
"context"

"github.com/twmb/franz-go/pkg/kgo"
)

// Processor is a low-level interface. The implementation can retain the
// ProcessorContext passed into Init and use it to access state stores and
Expand All @@ -14,5 +18,17 @@ type Processor[Kin any, Vin any, Kout any, Vout any] interface {
Process(ctx context.Context, k Kin, v Vin) error
}

type Input[Kin any, Vin any] struct {
}

type Record[K, V any] struct {
Key K
Value V
X kgo.Record
}

type Output[Kout any, Vout any] struct {
}

// ProcessorBuilder creates an actual processor for a specific TopicPartition.
type ProcessorBuilder[Kin any, Vin any, Kout any, Vout any] func() Processor[Kin, Vin, Kout, Vout]
8 changes: 5 additions & 3 deletions processors/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func NewWindowedAggregator[Kin, Vin, State, Vout any](
keySerde kstreams.SerDe[Kin],
stateSerde kstreams.SerDe[State],
storeName string,
// TODO add trigger - when to call finalize.
// TODO maybe introduce interface for aggregator, so people can plug custom aggregators?
) (
kstreams.ProcessorBuilder[Kin, Vin, WindowKey[Kin], Vout],
kstreams.StoreBuilder,
Expand Down Expand Up @@ -159,8 +161,8 @@ func (t *WindowedKeyValueStore[K, V]) Init() error {
return t.store.Init()
}

func (t *WindowedKeyValueStore[K, V]) Flush(ctx context.Context) error {
return t.store.Flush(ctx)
func (t *WindowedKeyValueStore[K, V]) Flush() error {
return t.store.Flush()
}

func (t *WindowedKeyValueStore[K, V]) Close() error {
Expand Down Expand Up @@ -224,7 +226,7 @@ func WindowKeyDeserializer[K any](deserializer kstreams.Deserializer[K]) kstream
return func(b []byte) (key WindowKey[K], err error) {
length := binary.BigEndian.Uint16(b)
if len(b) < int(length)+1+8 {
return WindowKey[K]{}, fmt.Errorf("eof")
return WindowKey[K]{}, fmt.Errorf("unexpected eof")
}

b = b[2:]
Expand Down
14 changes: 6 additions & 8 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"errors"
)

type Store interface {
Init() error
Flush(context.Context) error
Close() error
}

var (
ErrKeyNotFound = errors.New("store: key not found")
)
Expand Down Expand Up @@ -60,14 +54,18 @@ func (t *KeyValueStore[K, V]) Init() error {
return t.store.Init()
}

func (t *KeyValueStore[K, V]) Flush(ctx context.Context) error {
return t.store.Flush(ctx)
func (t *KeyValueStore[K, V]) Flush() error {
return t.store.Flush()
}

func (t *KeyValueStore[K, V]) Close() error {
return t.store.Close()
}

func (t *KeyValueStore[K, V]) Checkpoint(ctx context.Context, id string) error {
return nil
}

func (t *KeyValueStore[K, V]) Set(k K, v V) error {
key, err := t.keySerializer(k)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion stores/pebble/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ func (s *pebbleStore) Init() error {
return nil
}

func (s *pebbleStore) Flush(ctx context.Context) error {
func (s *pebbleStore) Flush() error {
return s.db.Flush()
}

func (s *pebbleStore) Checkpoint(ctx context.Context, id string) error {
return nil
}

func (s *pebbleStore) Close() error {
if err := s.db.Flush(); err != nil {
return err
Expand Down
Loading

0 comments on commit 45e6c5c

Please sign in to comment.