Skip to content

Commit

Permalink
SQS source plugin implementation (#5274)
Browse files Browse the repository at this point in the history
* initial poc of new sqs-source plugin

Signed-off-by: Jeremy Michael <[email protected]>

* modified README

Signed-off-by: Jeremy Michael <[email protected]>

* Changed parser to format described in issue

Signed-off-by: Jeremy Michael <[email protected]>

* Renamed sqs-source-new to sqs-source and removed deprecated sqs-source implementation

Signed-off-by: Jeremy Michael <[email protected]>

* Update README.md

Signed-off-by: Jeremy Michael <[email protected]>

* Update README.md

Signed-off-by: Jeremy Michael <[email protected]>

* Add queueUrl and sentTimestamp metadata to SQS events

Signed-off-by: Jeremy Michael <[email protected]>

* Assign individual BufferAccumulators to each SQS worker

Signed-off-by: Jeremy Michael <[email protected]>

* added unit tests and addressed comments in design doc

Signed-off-by: Jeremy Michael <[email protected]>

* added additional unit tests

Signed-off-by: Jeremy Michael <[email protected]>

* Fix region loading check

Signed-off-by: Jeremy Michael <[email protected]>

* addressed PR comments

Signed-off-by: Jeremy Michael <[email protected]>

* addressed PR comments - modified Buffer and parser implementation

Signed-off-by: Jeremy Michael <[email protected]>

* added copyright headers to tests and updated metadata naming

Signed-off-by: Jeremy Michael <[email protected]>

* some cleanup and converted all metadata to lowerCamelCase

Signed-off-by: Jeremy Michael <[email protected]>

---------

Signed-off-by: Jeremy Michael <[email protected]>
Signed-off-by: Jeremy Michael <[email protected]>
Co-authored-by: Jeremy Michael <[email protected]>
  • Loading branch information
jmsusanto and Jeremy Michael authored Jan 9, 2025
1 parent 0d03cc9 commit 2aa376e
Show file tree
Hide file tree
Showing 36 changed files with 1,847 additions and 1,159 deletions.
78 changes: 15 additions & 63 deletions data-prepper-plugins/sqs-source/README.md
Original file line number Diff line number Diff line change
@@ -1,70 +1,22 @@
# SQS Source
# SQS Source

This source allows Data Prepper to use SQS as a source. It uses SQS for notifications
of which data are new and loads those messages to push out events.
This source allows Data Prepper to use SQS as a source. It reads messages from specified SQS queues and processes them into events.

### Example:
## Example Configuration

The following configuration shows a minimum configuration for reading and Sqs messages and push out events.

```
```yaml
sqs-pipeline:
source:
sqs:
acknowledgments: true
queue_urls:
- https://sqs.us-east-1.amazonaws.com/895099421235/MyQueue-1
- https://sqs.us-east-1.amazonaws.com/895099421235/MyQueue-2
- https://sqs.us-east-1.amazonaws.com/895099421235/MyQueue-3
number_of_threads : 1
batch_size : 10
visibility_timeout: PT30S
wait_time : PT20S
queues:
- url: <SQS_QUEUE_URL_1>
batch_size: 10
workers: 1
- url: <SQS_QUEUE_URL_2>
batch_size: 10
workers: 1
aws:
sts_region: us-east-1
sts_role_arn: arn:aws:iam::895099421235:role/test-arn
```

## Configuration Options

All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").

* `queue_url or queue_urls` (Required) : The SQS configuration. See [SQS Configuration](#sqs_configuration) for details.

* `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details.

* `acknowledgments` (Optional) : Enables End-to-end acknowledgments. If set to `true`, sqs message is deleted only after all events from the sqs message are successfully acknowledged by all sinks. Default value `false`.

### <a name="sqs_configuration">SQS Configuration</a>

* `number_of_threads` (Optional) : define no of threads for sqs queue processing. default to 1.
* `batch_size` (Optional) : define batch size for sqs messages processing. default to 10.
* `polling_frequency` (Optional) : Duration - A delay to place between reading and processing a batch of SQS messages and making a subsequent request. Defaults to 0 seconds.
* `visibility_timeout` (Optional) : Duration - The visibility timeout to apply to messages read from the SQS queue. Defaults to null
* `wait_time` (Optional) : Duration - The time to wait for long-polling on the SQS API. Defaults to null.

### <a name="aws_configuration">AWS Configuration</a>

The AWS configuration is the same for both SQS.

* `sts_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin.

## Metrics

* `sqsMessagesReceived` - The number of SQS messages received from the queue by the SQS Source.
* `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the SQS Source.
* `sqsMessagesFailed` - The number of SQS messages that the SQS Source failed to parse.
* `sqsMessagesDeleteFailed` - The number of SQS messages that the SQS Source failed to delete from the SQS queue.
* `acknowledgementSetCallbackCounter` - The number of SQS messages processed by SQS Source and successfully acknowledge by sink.

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:sqs-source:integrationTest -Dtests.sqs.source.aws.region=<your-aws-region> -Dtests.sqs.source.queue.url=<your-queue-url>
```
region: <AWS_REGION>
sts_role_arn: <IAM_ROLE_ARN>
sink:
- stdout:
45 changes: 12 additions & 33 deletions data-prepper-plugins/sqs-source/build.gradle
Original file line number Diff line number Diff line change
@@ -1,49 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation libs.armeria.core
implementation project(':data-prepper-plugins:aws-sqs-common')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation libs.armeria.core
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:sqs'
implementation 'software.amazon.awssdk:arns'
implementation 'software.amazon.awssdk:sts'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:blocking-buffer')
}
test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.sqs.source.aws.region', System.getProperty('tests.sqs.source.aws.region')
systemProperty 'tests.sqs.source.queue.url', System.getProperty('tests.sqs.source.queue.url')

filter {
includeTestsMatching '*IT'
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

class AwsAuthenticationAdapter {
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final SqsSourceConfig sqsSourceConfig;


AwsAuthenticationAdapter(
final AwsCredentialsSupplier awsCredentialsSupplier,
final SqsSourceConfig sqsSourceConfig) {
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sqsSourceConfig = sqsSourceConfig;
}

AwsCredentialsProvider getCredentialsProvider() {
final AwsAuthenticationOptions awsAuthenticationOptions = sqsSourceConfig.getAwsAuthenticationOptions();

final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
.withRegion(awsAuthenticationOptions.getAwsRegion())
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
.build();

return awsCredentialsSupplier.getProvider(options);
}
}
Loading

0 comments on commit 2aa376e

Please sign in to comment.