Skip to content

Commit

Permalink
fix edge case with empty batch and msg too big (#134)
Browse files Browse the repository at this point in the history
* fix edge case with empty batch and msg too big

* go imports

* make linter happy

* have batcher operate quicker

* Add retry if ledger has not been cleanedup yet
  • Loading branch information
stlava authored Aug 20, 2024
1 parent c355207 commit 82f3aaf
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 35 deletions.
35 changes: 34 additions & 1 deletion itests/common.bash
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,35 @@ _check_lsn() {
return 0
}

_wait_for_ledger_signal() {
bifrost_logs=$(docker logs bifrost 2>&1)

if ! echo "${bifrost_logs}" | grep -q 'Got a signal 29'; then
return 1
fi

return 0
}

_check_ledger() {
docker kill -s SIGIO bifrost # dump ledger to stdout
_retry _wait_for_ledger_signal
bifrost_logs=$(docker logs bifrost 2>&1)

if ! echo "${bifrost_logs}" | grep -q 'No ledger entries to dump'; then
if ! echo "${bifrost_logs}" | grep -q 'entry:'; then
log "ledger did not dump as expected"
return 1
else
log "ledger had entries in it"
return 2
fi
fi

log "ledger was empty"
return 0
}

_gather_test_output() {
log "Exporting messages test output from containers"
# Copy output directories from the containers to the host
Expand Down Expand Up @@ -216,6 +245,11 @@ _verify() {
FAILED=1
_retry _check_lsn
FAILED=0

log "Verifying ledger is empty"
FAILED=1
_retry _check_ledger
FAILED=0
}

_profile() {
Expand Down Expand Up @@ -269,7 +303,6 @@ teardown() {
_end_timer

# Print current state of the ledger for debugging
TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose kill -s IO bifrost # dump ledger to stdout
TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose logs bifrost
TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose logs data-poller

Expand Down
3 changes: 2 additions & 1 deletion itests/containers/defaults.env
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ CREATE_SLOT=true
AWS_ACCESS_KEY_ID=DUMMYACCESSKEYID
AWS_SECRET_ACCESS_KEY=DUMMYSECRETACCESSKEY
AWS_REGION=us-east-1
NO_MARSHAL_OLD_VALUE=true
NO_MARSHAL_OLD_VALUE=true
BATCHER_TICK_RATE=100
7 changes: 7 additions & 0 deletions itests/tests/kafka/test_big_record/envfile.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Bifrost
WORKERS=1
KAFKA_PARTITION_METHOD=transaction-constant

# Verifier
KAFKA_PARTITION_COUNT=1
EXPECTED_COUNT=5
5 changes: 5 additions & 0 deletions itests/tests/kafka/test_big_record/golden/test.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"1111"}},"id":{"new":{"q":"false","t":"integer","v":"1"}},"last_name":{"new":{"q":"true","t":"text","v":"1111"}}}}
{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"3333"}},"id":{"new":{"q":"false","t":"integer","v":"3"}},"last_name":{"new":{"q":"true","t":"text","v":"3333"}}}}
{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"4444"}},"id":{"new":{"q":"false","t":"integer","v":"4"}},"last_name":{"new":{"q":"true","t":"text","v":"4444"}}}}
{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"6666"}},"id":{"new":{"q":"false","t":"integer","v":"6"}},"last_name":{"new":{"q":"true","t":"text","v":"6666"}}}}
{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"8888"}},"id":{"new":{"q":"false","t":"integer","v":"8"}},"last_name":{"new":{"q":"true","t":"text","v":"8888"}}}}
18 changes: 18 additions & 0 deletions itests/tests/kafka/test_big_record/input/001.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TABLE customers (id serial primary key, first_name text, last_name text);
ALTER TABLE customers REPLICA IDENTITY FULL;

INSERT INTO customers (first_name, last_name) VALUES ('1111', '1111');
INSERT INTO customers (first_name, last_name) VALUES ('2222', repeat('2', 1048576));
INSERT INTO customers (first_name, last_name) VALUES ('3333', '3333');

BEGIN;
INSERT INTO customers (first_name, last_name) VALUES ('4444', '4444');
INSERT INTO customers (first_name, last_name) VALUES ('5555', repeat('5', 1048576));
INSERT INTO customers (first_name, last_name) VALUES ('6666', '6666');
COMMIT;

BEGIN;
INSERT INTO customers (first_name, last_name) VALUES ('7777', repeat('7', 1048576));
COMMIT;

INSERT INTO customers (first_name, last_name) VALUES ('8888', '8888');
5 changes: 5 additions & 0 deletions itests/tests/kafka/test_big_record/test.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
load ../../../common

@test "kafka/test_big_record" {
do_test "kafka_topic_wait"
}
22 changes: 14 additions & 8 deletions transport/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"os"
"time"

"github.com/cevaris/ordered_map"

"github.com/Nextdoor/pg-bifrost.git/marshaller"
"github.com/Nextdoor/pg-bifrost.git/shutdown"
"github.com/Nextdoor/pg-bifrost.git/stats"
Expand Down Expand Up @@ -63,6 +65,7 @@ type Batcher struct {
inputChan <-chan *marshaller.MarshalledMessage // receive single MarshalledMessages
outputChans []chan transport.Batch // one output channel per Transporter worker
txnsSeenChan chan<- []*progress.Seen // channel to report transactions seen to ProgressTracker
txnsWritten chan<- *ordered_map.OrderedMap // channel to report empty batches with commit messages
statsChan chan stats.Stat

tickRate time.Duration // controls frequency that batcher looks for input. This should be non-zero to avoid CPU spin.
Expand All @@ -84,6 +87,7 @@ type Batcher struct {
func NewBatcher(shutdownHandler shutdown.ShutdownHandler,
inputChan <-chan *marshaller.MarshalledMessage,
txnsSeenChan chan<- []*progress.Seen,
txnsWritten chan<- *ordered_map.OrderedMap,
statsChan chan stats.Stat,

tickRate int, // number of milliseconds that batcher will wait to check for input.
Expand All @@ -110,6 +114,7 @@ func NewBatcher(shutdownHandler shutdown.ShutdownHandler,
inputChan,
outputChans,
txnsSeenChan,
txnsWritten,
statsChan,
time.Duration(tickRate) * time.Millisecond,
batchFactory,
Expand Down Expand Up @@ -345,14 +350,9 @@ func (b *Batcher) handleTicker() bool {
log.Debugf("flushing %s", key)
curBatch := b.batches[key]

// If the batch is empty then don't send it. This could be the case
// when a new batch was created but nothing added to it.
if !curBatch.IsEmpty() {
ok := b.sendBatch(curBatch)

if !ok {
return false
}
ok := b.sendBatch(curBatch)
if !ok {
return false
}

b.statsChan <- stats.NewStatCount("batcher", "batch_closed_early", 1, time.Now().UnixNano())
Expand All @@ -376,6 +376,12 @@ func (b *Batcher) sendBatch(batch transport.Batch) bool {
}
}

// On empty batches report their transactions as written because they may contain COMMITTs
if batch.IsEmpty() {
b.txnsWritten <- batch.GetTransactions()
return true
}

ok, err := batch.Close()
if !ok {
log.Error(err)
Expand Down
Loading

0 comments on commit 82f3aaf

Please sign in to comment.