diff --git a/publisher/kafka.go b/publisher/kafka.go index bc7b088d..d9d26054 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -13,43 +13,41 @@ import ( pb "github.com/raystack/raccoon/proto" ) -// KafkaProducer Produce data to kafka synchronously -type KafkaProducer interface { - // ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed. - ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error -} - func NewKafka() (*Kafka, error) { kp, err := newKafkaClient(config.PublisherKafka.ToKafkaConfigMap()) if err != nil { return &Kafka{}, err } return &Kafka{ - kp: kp, - flushInterval: config.PublisherKafka.FlushInterval, - topicFormat: config.EventDistribution.PublisherPattern, + kp: kp, + flushInterval: config.PublisherKafka.FlushInterval, + topicFormat: config.EventDistribution.PublisherPattern, + deliveryChannelSize: config.Worker.DeliveryChannelSize, }, nil } -func NewKafkaFromClient(client Client, flushInterval int, topicFormat string) *Kafka { +func NewKafkaFromClient(client Client, flushInterval int, topicFormat string, deliveryChannelSize int) *Kafka { return &Kafka{ - kp: client, - flushInterval: flushInterval, - topicFormat: topicFormat, + kp: client, + flushInterval: flushInterval, + topicFormat: topicFormat, + deliveryChannelSize: deliveryChannelSize, } } type Kafka struct { - kp Client - flushInterval int - topicFormat string + kp Client + flushInterval int + topicFormat string + deliveryChannelSize int } // ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed. // DeliveryChannel needs to be exclusive. DeliveryChannel is exposed for recyclability purpose. -func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error { +func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string) error { errors := make([]error, len(events)) totalProcessed := 0 + deliveryChannel := make(chan kafka.Event, pr.deliveryChannelSize) for order, event := range events { topic := fmt.Sprintf(pr.topicFormat, event.Type) message := &kafka.Message{ diff --git a/publisher/kafka_test.go b/publisher/kafka_test.go index cc6286d9..3dde3d52 100644 --- a/publisher/kafka_test.go +++ b/publisher/kafka_test.go @@ -31,7 +31,7 @@ func TestProducer_Close(suite *testing.T) { client := &mockClient{} client.On("Flush", 10).Return(0) client.On("Close").Return() - kp := NewKafkaFromClient(client, 10, "%s") + kp := NewKafkaFromClient(client, 10, "%s", 1) kp.Close() client.AssertExpectations(t) }) @@ -55,9 +55,9 @@ func TestKafka_ProduceBulk(suite *testing.T) { } }() }) - kp := NewKafkaFromClient(client, 10, "%s") + kp := NewKafkaFromClient(client, 10, "%s", 1) - err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2)) + err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1) assert.NoError(t, err) }) }) @@ -79,9 +79,9 @@ func TestKafka_ProduceBulk(suite *testing.T) { }() }).Once() client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once() - kp := NewKafkaFromClient(client, 10, "%s") + kp := NewKafkaFromClient(client, 10, "%s", 1) - err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2)) + err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1) assert.Len(t, err.(BulkError).Errors, 3) assert.Error(t, err.(BulkError).Errors[0]) assert.Empty(t, err.(BulkError).Errors[1]) @@ -91,9 +91,9 @@ func TestKafka_ProduceBulk(suite *testing.T) { t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) { client := &mockClient{} client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once() - kp := NewKafkaFromClient(client, 10, "%s") + kp := NewKafkaFromClient(client, 10, "%s", 1) - err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) + err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1") assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic) }) }) @@ -115,9 +115,9 @@ func TestKafka_ProduceBulk(suite *testing.T) { } }() }).Once() - kp := NewKafkaFromClient(client, 10, "%s") + kp := NewKafkaFromClient(client, 10, "%s", 1) - err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) + err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1") assert.NotEmpty(t, err) assert.Len(t, err.(BulkError).Errors, 2) assert.Equal(t, "buffer full", err.(BulkError).Errors[0].Error()) diff --git a/worker/mocks.go b/worker/mocks.go index 599ada79..ca19e60d 100644 --- a/worker/mocks.go +++ b/worker/mocks.go @@ -1,7 +1,6 @@ package worker import ( - kafka "github.com/confluentinc/confluent-kafka-go/kafka" pb "github.com/raystack/raccoon/proto" mock "github.com/stretchr/testify/mock" ) @@ -12,8 +11,8 @@ type mockKafkaPublisher struct { } // ProduceBulk provides a mock function with given fields: events, deliveryChannel -func (m *mockKafkaPublisher) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error { - mock := m.Called(events, connGroup, deliveryChannel) +func (m *mockKafkaPublisher) ProduceBulk(events []*pb.Event, connGroup string) error { + mock := m.Called(events, connGroup) return mock.Error(0) } diff --git a/worker/worker.go b/worker/worker.go index 5cb6ecff..c3275ea5 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,75 +5,100 @@ import ( "sync" "time" - "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/raystack/raccoon/collection" "github.com/raystack/raccoon/logger" "github.com/raystack/raccoon/metrics" + pb "github.com/raystack/raccoon/proto" "github.com/raystack/raccoon/publisher" ) +// Producer produces data to sink +type Producer interface { + // ProduceBulk message to a sink. Blocks until all messages are sent. Returns slice of error. + ProduceBulk(events []*pb.Event, connGroup string) error +} + // Pool spawn goroutine as much as Size that will listen to EventsChannel. On Close, wait for all data in EventsChannel to be processed. type Pool struct { Size int deliveryChannelSize int EventsChannel <-chan collection.CollectRequest - kafkaProducer publisher.KafkaProducer + producer Producer wg sync.WaitGroup } // CreateWorkerPool create new Pool struct given size and EventsChannel worker. -func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, kafkaProducer publisher.KafkaProducer) *Pool { +func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, producer Producer) *Pool { return &Pool{ Size: size, deliveryChannelSize: deliveryChannelSize, EventsChannel: eventsChannel, - kafkaProducer: kafkaProducer, + producer: producer, wg: sync.WaitGroup{}, } } -// StartWorkers initialize worker pool as much as Pool.Size -func (w *Pool) StartWorkers() { - w.wg.Add(w.Size) - for i := 0; i < w.Size; i++ { - go func(workerName string) { - logger.Info("Running worker: " + workerName) - deliveryChan := make(chan kafka.Event, w.deliveryChannelSize) - for request := range w.EventsChannel { - metrics.Histogram("batch_idle_in_channel_milliseconds", (time.Now().Sub(request.TimePushed)).Milliseconds(), map[string]string{"worker": workerName}) - batchReadTime := time.Now() - //@TODO - Should add integration tests to prove that the worker receives the same message that it produced, on the delivery channel it created +func (w *Pool) newWorker(name string) { - err := w.kafkaProducer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group, deliveryChan) + logger.Info("Running worker: " + name) + for request := range w.EventsChannel { - produceTime := time.Since(batchReadTime) - metrics.Histogram("kafka_producebulk_tt_ms", produceTime.Milliseconds(), map[string]string{}) + metrics.Histogram( + "batch_idle_in_channel_milliseconds", + time.Since(request.TimePushed).Milliseconds(), + map[string]string{"worker": name}) - if request.AckFunc != nil { - request.AckFunc(err) - } + batchReadTime := time.Now() + //@TODO - Should add integration tests to prove that the worker receives the same message that it produced, on the delivery channel it created + + err := w.producer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group) + + // TODO(turtledev): instrument this for individual sinks + // produceTime := time.Since(batchReadTime) + // metrics.Histogram("kafka_producebulk_tt_ms", produceTime.Milliseconds(), map[string]string{}) - totalErr := 0 + if request.AckFunc != nil { + request.AckFunc(err) + } + + totalErr := 0 + if err != nil { + // WARN(turtledev): this can panic if returned error is not of + // type publisher.BulkError + for _, err := range err.(publisher.BulkError).Errors { if err != nil { - for _, err := range err.(publisher.BulkError).Errors { - if err != nil { - logger.Errorf("[worker] Fail to publish message to kafka %v", err) - totalErr++ - } - } - } - lenBatch := int64(len(request.GetEvents())) - logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr))) - if lenBatch > 0 { - eventTimingMs := time.Since(request.GetSentTime().AsTime()).Milliseconds() / lenBatch - metrics.Histogram("event_processing_duration_milliseconds", eventTimingMs, map[string]string{"conn_group": request.ConnectionIdentifier.Group}) - now := time.Now() - metrics.Histogram("worker_processing_duration_milliseconds", (now.Sub(batchReadTime).Milliseconds())/lenBatch, map[string]string{"worker": workerName}) - metrics.Histogram("server_processing_latency_milliseconds", (now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch, map[string]string{"conn_group": request.ConnectionIdentifier.Group}) + logger.Errorf("[worker] Fail to publish message to kafka %v", err) + totalErr++ } } - w.wg.Done() - }(fmt.Sprintf("worker-%d", i)) + } + lenBatch := int64(len(request.GetEvents())) + logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr))) + if lenBatch > 0 { + eventTimingMs := time.Since(request.GetSentTime().AsTime()).Milliseconds() / lenBatch + metrics.Histogram( + "event_processing_duration_milliseconds", + eventTimingMs, + map[string]string{"conn_group": request.ConnectionIdentifier.Group}) + now := time.Now() + metrics.Histogram( + "worker_processing_duration_milliseconds", + (now.Sub(batchReadTime).Milliseconds())/lenBatch, + map[string]string{"worker": name}) + metrics.Histogram( + "server_processing_latency_milliseconds", + (now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch, + map[string]string{"conn_group": request.ConnectionIdentifier.Group}) + } + } + w.wg.Done() +} + +// StartWorkers initialize worker pool as much as Pool.Size +func (w *Pool) StartWorkers() { + w.wg.Add(w.Size) + for i := 0; i < w.Size; i++ { + w.newWorker(fmt.Sprintf("worker-%d", i)) } } diff --git a/worker/worker_test.go b/worker/worker_test.go index 3937f92a..ce32e510 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -36,7 +36,7 @@ func TestWorker(t *testing.T) { Size: 1, deliveryChannelSize: 0, EventsChannel: bc, - kafkaProducer: &kp, + producer: &kp, wg: sync.WaitGroup{}, } worker.StartWorkers() @@ -63,7 +63,7 @@ func TestWorker(t *testing.T) { Size: 1, deliveryChannelSize: 100, EventsChannel: bc, - kafkaProducer: &kp, + producer: &kp, wg: sync.WaitGroup{}, } worker.StartWorkers()