Skip to content

Commit

Permalink
Not signinig the request for iam auth disabled cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Amanda Xiang <[email protected]>
  • Loading branch information
Amanda Xiang committed Jan 2, 2025
1 parent e525d90 commit 0fa8341
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
import org.opensearch.dataprepper.plugins.source.neptune.configuration.AwsConfig;
import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.signer.NoOpSigner;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.neptunedata.NeptunedataClient;
import software.amazon.awssdk.services.sts.StsClient;
Expand All @@ -20,12 +24,28 @@

public class NeptuneDataClientFactory {
public static NeptunedataClient provideNeptuneDataClient(final NeptuneSourceConfig sourceConfig) {
final AwsConfig awsConfig = sourceConfig.getAwsConfig();
return NeptunedataClient
.builder()
.region(Region.of(sourceConfig.getRegion()))
.credentialsProvider(getAwsCredentials(Region.of(sourceConfig.getRegion()), awsConfig.getAwsStsRoleArn(), awsConfig.getAwsStsExternalId()))
.endpointOverride(URI.create(String.format("https://%s:%s", sourceConfig.getHost(), sourceConfig.getPort()))).build();
final URI endpoint = URI.create(String.format("https://%s:%s", sourceConfig.getHost(), sourceConfig.getPort()));
if (sourceConfig.isIamAuth()) {
final AwsConfig awsConfig = sourceConfig.getAwsConfig();
return NeptunedataClient.builder()
.endpointOverride(endpoint)
.region(Region.of(sourceConfig.getRegion()))
.credentialsProvider(getAwsCredentials(Region.of(sourceConfig.getRegion()), awsConfig.getAwsStsRoleArn(), awsConfig.getAwsStsExternalId()))
.build();
} else {
final ClientOverrideConfiguration clientOverrideConfiguration =
// Do not sign the request
ClientOverrideConfiguration.builder()
.putAdvancedOption(SdkAdvancedClientOption.SIGNER, new NoOpSigner())
.build();

return NeptunedataClient.builder()
.endpointOverride(endpoint)
.region(Region.of(sourceConfig.getRegion()))
.overrideConfiguration(clientOverrideConfiguration)
.credentialsProvider(AnonymousCredentialsProvider.create())
.build();
}
}

private static AwsCredentialsProvider getAwsCredentials(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public NeptuneStreamClient(final NeptuneSourceConfig config, final int batchSize
this.retryCount = 0;
}


public void setStreamPosition(final long commitNum, final long opNum) {
streamPositionInfo = new StreamPosition(commitNum, opNum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class NeptuneSourceConfig {
@JsonProperty("region")
private String region;
@JsonProperty("iam_auth")
private boolean iamAuth = false;
private boolean iamAuth;

@JsonProperty("trust_store_file_path")
private String trustStoreFilePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ Optional<AcknowledgementSet> createAcknowledgementSet(final StreamCheckpoint che
}

void shutdown() {
monitoringTask.cancel(true);
if (monitoringTask != null) {
monitoringTask.cancel(true);
}
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
this.executorService.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public boolean onNeptuneStreamException(final Exception exception, final StreamP
LOG.warn("Stream is corrupt, stopping the worker and resetting the stream.");
this.isUnrecoverableError = true;
} else {
LOG.info("Error fetching stream data, stopping processing");
LOG.info("Error fetching stream data, stopping processing: {}", exception.getMessage());
}
return false;
}
Expand Down

0 comments on commit 0fa8341

Please sign in to comment.