Skip to content

Commit

Permalink
in_kafka: formatting adjustments
Browse files Browse the repository at this point in the history
Signed-off-by: CoreidCC <[email protected]>
  • Loading branch information
coreidcc committed Jan 9, 2025
1 parent 9525f31 commit b9a3d68
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
rd_kafka_message_destroy(rkm);


if(!ctx->enable_auto_commit) {
if (!ctx->enable_auto_commit) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
}
Expand Down Expand Up @@ -248,24 +248,23 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}

/* Set the kafka poll timeout dependend on wether we run in our own
* or in the main event thread.
* a) run in main event thread:
* -> minimize the delay we might create
* b) run in our own thread:
* -> optimize for throuput and relay on 'fetch.wait.max.ms'
* which is set to 500 by default default. wa algin our
* timeout with what is set for 'fetch.wait.max.ms'
*/
/* Set the kafka poll timeout dependend on wether we run in our own */
/* or in the main event thread. */
/* a) run in main event thread: */
/* -> minimize the delay we might create */
/* b) run in our own thread: */
/* -> optimize for throuput and relay on 'fetch.wait.max.ms' */
/* which is set to 500 by default default. wa algin our */
/* timeout with what is set for 'fetch.wait.max.ms' */
ctx->poll_timeount_ms = 1;
if (ins->is_threaded) {
ctx->poll_timeount_ms = 550; // ensure kafa triggers timeout

// align our timeout with what was configured for fetch.wait.max.ms
/* align our timeout with what was configured for fetch.wait.max.ms */
dsize = sizeof(conf_val);
res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize);
if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) {
// add 50ms so kafa triggers timout
/* add 50ms so kafa triggers timeout */
ctx->poll_timeount_ms = atoi(conf_val) + 50;
}
}
Expand Down

0 comments on commit b9a3d68

Please sign in to comment.