Skip to content

Commit

Permalink
Fix kafka reconnect backoff to config
Browse files Browse the repository at this point in the history
  • Loading branch information
yma96 committed Jul 12, 2024
1 parent 7d07944 commit 2db8881
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ public class KafkaStreamBooter

private static final long DEFAULT_KAFKA_STREAM_CLOSE_TIMEOUT = 5 * 60 * 3000L;

private static final long RECONNECT_BACKOFF_MS = 60 * 1000L;

private static final long RECONNECT_BACKOFF_MAX_MS = 30 * 60 * 1000L;

private KafkaStreams streams;

@Override
Expand Down Expand Up @@ -137,8 +133,8 @@ private Properties setKafkaProps()
props.putIfAbsent( StreamsConfig.APPLICATION_ID_CONFIG, config.getGroup() );
props.putIfAbsent( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers() );
props.putIfAbsent( StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, config.getRecordsPerPartition() );
props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS );
props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS );
props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, config.getReconnectBackoff() );
props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.getReconnectBackoffMax() );

logger.info( "Kafka props: {}", props );
return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class KafkaConfig

private static final Integer DEFAULT_RECORDS_PER_PARTITION = 1000;

private static final Long DEFAULT_RECONNECT_BACKOFF_MS = 60 * 1000L;

private static final Long DEFAULT_RECONNECT_BACKOFF_MAX_MS = 30 * 60 * 1000L;

private static final boolean DEFAULT_ENABLED = true;

private static final boolean DEFAULT_TRACE = false;
Expand All @@ -58,6 +62,10 @@ public class KafkaConfig

private Boolean tracing;

private Long reconnectBackoff;

private Long reconnectBackoffMax;

public KafkaConfig()
{
}
Expand Down Expand Up @@ -140,6 +148,16 @@ public boolean isTracing()
return tracing == null ? DEFAULT_TRACE : tracing;
}

public Long getReconnectBackoff()
{
return reconnectBackoff == null ? DEFAULT_RECONNECT_BACKOFF_MS : reconnectBackoff;
}

public Long getReconnectBackoffMax()
{
return reconnectBackoffMax == null ? DEFAULT_RECONNECT_BACKOFF_MAX_MS : reconnectBackoffMax;
}

@ConfigName( "kafka.trace" )
public void setTracing( boolean tracing )
{
Expand Down

0 comments on commit 2db8881

Please sign in to comment.