Skip to content

Commit

Permalink
addtional params
Browse files Browse the repository at this point in the history
  • Loading branch information
yokawasa committed Jul 17, 2021
1 parent b23d06c commit 88913e1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
30 changes: 28 additions & 2 deletions src/main/java/com/github/yokawasa/kinesis/KinesisConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import org.apache.commons.lang3.ObjectUtils;

/**
Expand All @@ -21,6 +22,11 @@ public static void main(String... args) {
config.dumpKinesisConfig();
}

/**
* Idle time between record reads in milliseconds.
*/
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;

/**
* Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
* will be regarded as having problems and it's shards will be assigned to other workers.
Expand All @@ -38,6 +44,7 @@ public static void main(String... args) {
private String streamName;
private Region region;
private String initialPositionInStream; // LATEST or TRIM_HORIZON.
private long idleTimeBetweenReadsInMillis;
private long failoverTimeMillis;
private int maxRecords;

Expand All @@ -56,6 +63,9 @@ public KinesisConfig() {
value = System.getenv("KINESIS_INITIAL_POSITION_IN_STREAM");
this.initialPositionInStream = value !=null ? value : "LATEST";

value = System.getenv("KINESIS_IDLETIME_BETWEEN_READS_MILLIS");
this.idleTimeBetweenReadsInMillis = value !=null ? Long.valueOf(value) : DEFAULT_IDLETIME_BETWEEN_READS_MILLIS;

value = System.getenv("KINESIS_FAILOVER_TIME_MILLIS");
this.failoverTimeMillis = value !=null ? Long.valueOf(value) : DEFAULT_FAILOVER_TIME_MILLIS;

Expand Down Expand Up @@ -87,8 +97,22 @@ public Region getRegion() {
/**
* @return Name of the initial Position In Stream
*/
public String getInitialPositionInStream() {
return this.initialPositionInStream;
public InitialPositionInStream getInitialPositionInStream() {
switch (this.initialPositionInStream) {
case "LATEST":
return InitialPositionInStream.LATEST;
case "TRIM_HORIZON":
return InitialPositionInStream.TRIM_HORIZON;
default:
throw new IllegalArgumentException("Invalid InitialPosition");
}
}

/**
* @return Idle time between record reads in milliseconds
*/
public long getIdleTimeBetweenReadsInMillis() {
return idleTimeBetweenReadsInMillis;
}

/**
Expand Down Expand Up @@ -116,6 +140,8 @@ public void dumpKinesisConfig() {
this.streamName,System.getenv("KINESIS_STREAM_NAME")));
log.info(String.format("region: %s [env KINESIS_REGION: %s]",
this.region.toString(),System.getenv("KINESIS_REGION")));
log.info(String.format("idleTimeBetweenReadsInMillis: %d [env KINESIS_IDLETIME_BETWEEN_READS_MILLIS: %s]",
this.idleTimeBetweenReadsInMillis,System.getenv("KINESIS_IDLETIME_BETWEEN_READS_MILLIS")));
log.info(String.format("initialPositionInStream: %s [env KINESIS_INITIAL_POSITION_IN_STREAM: %s]",
this.initialPositionInStream,System.getenv("KINESIS_INITIAL_POSITION_IN_STREAM")));
log.info(String.format("failoverTimeMillis: %d [env KINESIS_FAILOVER_TIME_MILLIS: %s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
Expand Down Expand Up @@ -108,16 +110,23 @@ private void run() {
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.leaseManagementConfig()
.failoverTimeMillis(this.config.getFailoverTimeMillis()),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig().retrievalSpecificConfig(
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(
new PollingConfig(
this.config.getStreamName(),
this.kinesisClient
this.kinesisClient
)
.maxRecords(this.config.getMaxRecords())
.idleTimeBetweenReadsInMillis(this.config.getIdleTimeBetweenReadsInMillis())
)
.initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(this.config.getInitialPositionInStream())
)
);

/**
Expand Down

0 comments on commit 88913e1

Please sign in to comment.