Necessary fixes for _bulk endpoint (#1202)
For our V2 ingest processor, we need a better implementation of the `_bulk`
API, one that can handle more situations. Some shortcuts we took earlier
are no longer viable.

Changes:
* We used to take bulk payload lines and process them in pairs
(an action/metadata line followed by a document line), but that pairing
doesn't always hold: a `delete` action is not followed by any document.
So the whole `BulkForEach` had to be rewritten (see the example payload
after this list).
* Extending `NDJSON` type validation so that single-line JSON is also
valid NDJSON 😉 (a sketch of the idea follows the list)
* Adding some tests 
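
For illustration, here is a minimal `_bulk` payload in the standard Elasticsearch NDJSON format (index names and documents are made up) where a `delete` action breaks the action/document pairing:

```
{ "index": { "_index": "logs", "_id": "1" } }
{ "message": "first document" }
{ "delete": { "_index": "logs", "_id": "2" } }
{ "index": { "_index": "logs", "_id": "3" } }
{ "message": "second document" }
```

Processing this payload naively in pairs would misread line 4 (an `index` action) as the document belonging to the `delete` on line 3, and leave line 5 orphaned.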
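
A minimal sketch of the relaxed validation rule (a hypothetical helper, not the repo's actual `NDJSON` implementation): input counts as valid NDJSON when every non-empty line parses as JSON, so a single-line JSON document passes too.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// isValidNDJSON reports whether every non-empty line of s parses as JSON.
// Under this rule, a single-line JSON document is also valid NDJSON.
func isValidNDJSON(s string) bool {
	scanner := bufio.NewScanner(strings.NewReader(s))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue // tolerate blank lines, such as the trailing newline _bulk expects
		}
		if !json.Valid([]byte(line)) {
			return false
		}
	}
	return scanner.Err() == nil
}

func main() {
	fmt.Println(isValidNDJSON(`{"single": "line"}`))   // true: one-line JSON is valid NDJSON
	fmt.Println(isValidNDJSON("{\"a\":1}\n{\"b\":2}")) // true: classic multi-line NDJSON
	fmt.Println(isValidNDJSON("not json"))             // false
}
```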


With the sample datasets loaded and Kibana operational, every single
request now ends with code 200 🎊
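
For manual verification, a standard Elasticsearch-style `_bulk` request can be replayed against the endpoint (the host, port, and file name here are assumptions, not from this commit):

```
curl -s -H "Content-Type: application/x-ndjson" -X POST "http://localhost:9200/_bulk" --data-binary "@payload.ndjson"
```

The bulk body must be NDJSON terminated by a newline, which is why the rewritten `SplitBulk` below appends a trailing `'\n'` to a non-empty Elastic request body.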
mieciu authored Jan 20, 2025
1 parent 3521a43 commit 7f158aa
Showing 5 changed files with 266 additions and 76 deletions.
56 changes: 34 additions & 22 deletions quesma/quesma/functionality/bulk/bulk.go
@@ -71,21 +71,21 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
     ingestStatsEnabled bool, esBackendConn *backend_connectors.ElasticsearchBackendConnector, phoneHomeClient diag.PhoneHomeClient, tableResolver table_resolver.TableResolver) (results []BulkItem, err error) {
     defer recovery.LogPanic()

-    bulkSize := len(bulk) / 2 // we divided payload by 2 so that we don't take into account the `action_and_meta_data` line, ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
-    maybeLogBatchSize(bulkSize)
+    maxBulkSize := len(bulk)
+    maybeLogBatchSize(maxBulkSize)

     // The returned results should be in the same order as the input request, however splitting the bulk might change the order.
     // Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
-    results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, tableResolver)
+    results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver)
     if err != nil {
         return []BulkItem{}, err
     }

     // we fail if there are some documents to insert into Clickhouse but ingest processor is not available
-    if len(clickhouseDocumentsToInsert) > 0 && ip == nil {
+    if len(clickhouseBulkEntries) > 0 && ip == nil {

         indexes := make(map[string]struct{})
-        for index := range clickhouseDocumentsToInsert {
+        for index := range clickhouseBulkEntries {
             indexes[index] = struct{}{}
         }

@@ -104,16 +104,25 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
     }

     if ip != nil {
-        sendToClickhouse(ctx, clickhouseDocumentsToInsert, phoneHomeClient, ingestStatsEnabled, ip)
+        sendToClickhouse(ctx, clickhouseBulkEntries, phoneHomeClient, ingestStatsEnabled, ip)
     }

-    return results, nil
+    // Here we filter out empty results so that final response does not contain empty elements
+    // WARNING: We could have `SplitBulk` returning properly-sized results,
+    // however at the time of writing this it would've been too much work.
+    nonEmptyResults := make([]BulkItem, 0, len(results))
+    for _, result := range results {
+        if result != (BulkItem{}) {
+            nonEmptyResults = append(nonEmptyResults, result)
+        }
+    }
+    return nonEmptyResults, nil
 }

-func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bulkSize int, tableResolver table_resolver.TableResolver) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) {
-    results := make([]BulkItem, bulkSize)
+func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, maxBulkSize int, tableResolver table_resolver.TableResolver) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) {
+    results := make([]BulkItem, maxBulkSize)

-    clickhouseDocumentsToInsert := make(map[string][]BulkRequestEntry, bulkSize)
+    clickhouseBulkEntries := make(map[string][]BulkRequestEntry, maxBulkSize)
     var elasticRequestBody []byte
     var elasticBulkEntries []BulkRequestEntry
@@ -189,13 +198,14 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
             elasticRequestBody = append(elasticRequestBody, opBytes...)
             elasticRequestBody = append(elasticRequestBody, '\n')

-            documentBytes, err := document.Bytes()
-            if err != nil {
-                return err
+            if operation != "delete" {
+                documentBytes, err := document.Bytes()
+                if err != nil {
+                    return err
+                }
+                elasticRequestBody = append(elasticRequestBody, documentBytes...)
+                elasticRequestBody = append(elasticRequestBody, '\n')
             }
-            elasticRequestBody = append(elasticRequestBody, documentBytes...)
-            elasticRequestBody = append(elasticRequestBody, '\n')

             elasticBulkEntries = append(elasticBulkEntries, entryWithResponse)

         case *quesma_api.ConnectorDecisionClickhouse:
@@ -207,7 +217,7 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
                 return fmt.Errorf("unsupported bulk operation type: %s. Operation: %v, Document: %v", operation, rawOp, document)
             }

-            clickhouseDocumentsToInsert[index] = append(clickhouseDocumentsToInsert[index], entryWithResponse)
+            clickhouseBulkEntries[index] = append(clickhouseBulkEntries[index], entryWithResponse)

         default:
             return fmt.Errorf("unsupported connector type: %T", connector)
@@ -217,8 +227,10 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul

         return nil
     })
-
-    return results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err
+    if len(elasticRequestBody) != 0 {
+        elasticRequestBody = append(elasticRequestBody, '\n')
+    }
+    return results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err
 }

 func sendToElastic(elasticRequestBody []byte, esBackendConn *backend_connectors.ElasticsearchBackendConnector, elasticBulkEntries []BulkRequestEntry) error {
@@ -255,8 +267,8 @@ func sendToElastic(elasticRequestBody []byte, esBackendConn *backend_connectors.
     return nil
 }

-func sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[string][]BulkRequestEntry, emptyPhoneHomeClient diag.PhoneHomeClient, ingestStatsEnabled bool, ip *ingest.IngestProcessor) {
-    for indexName, documents := range clickhouseDocumentsToInsert {
+func sendToClickhouse(ctx context.Context, clickhouseBulkEntries map[string][]BulkRequestEntry, emptyPhoneHomeClient diag.PhoneHomeClient, ingestStatsEnabled bool, ip *ingest.IngestProcessor) {
+    for indexName, documents := range clickhouseBulkEntries {
         emptyPhoneHomeClient.IngestCounters().Add(indexName, int64(len(documents)))

         for _, document := range documents {
@@ -331,7 +343,7 @@ func maybeLogBatchSize(batchSize int) {
     mutex.Lock()
     defer mutex.Unlock()
     if _, alreadyLogged := loggedBatchSizes[batchSize]; !alreadyLogged {
-        logger.Info().Msgf("Ingesting via _bulk API, batch size=%d documents", batchSize)
+        logger.Info().Msgf("Ingesting via _bulk API, batch size=%d lines", batchSize)
         loggedBatchSizes[batchSize] = struct{}{}
     }
 }