diff --git a/_data-prepper/managing-data-prepper/configuring-data-prepper.md b/_data-prepper/managing-data-prepper/configuring-data-prepper.md index bcff65ed4c..d6750daba4 100644 --- a/_data-prepper/managing-data-prepper/configuring-data-prepper.md +++ b/_data-prepper/managing-data-prepper/configuring-data-prepper.md @@ -128,6 +128,7 @@ extensions: region: sts_role_arn: refresh_interval: + disable_refresh: false : ... ``` @@ -148,7 +149,8 @@ Option | Required | Type | Description secret_id | Yes | String | The AWS secret name or ARN. | region | No | String | The AWS region of the secret. Defaults to `us-east-1`. sts_role_arn | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to the AWS Secrets Manager. Defaults to `null`, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). -refresh_interval | No | Duration | The refreshment interval for AWS secrets extension plugin to poll new secret values. Defaults to `PT1H`. See [Automatically refreshing secrets](#automatically-refreshing-secrets) for details. +refresh_interval | No | Duration | The refreshment interval for the AWS Secrets extension plugin to poll new secret values. Defaults to `PT1H`. For more information, see [Automatically refreshing secrets](#automatically-refreshing-secrets). +disable_refresh | No | Boolean | Disables regular polling on the latest secret values inside the AWS secrets extension plugin. Defaults to `false`. When set to `true`, `refresh_interval` will not be used. #### Reference secrets ß diff --git a/_data-prepper/managing-data-prepper/extensions/extensions.md b/_data-prepper/managing-data-prepper/extensions/extensions.md new file mode 100644 index 0000000000..8cbfc602c7 --- /dev/null +++ b/_data-prepper/managing-data-prepper/extensions/extensions.md @@ -0,0 +1,15 @@ +--- +layout: default +title: Extensions +parent: Managing Data Prepper +has_children: true +nav_order: 18 +--- + +# Extensions + +Data Prepper extensions provide Data Prepper functionality outside of core Data Prepper pipeline components. +Many extensions provide configuration options that give Data Prepper administrators greater flexibility over Data Prepper's functionality. + +Extension configurations can be configured in the `data-prepper-config.yaml` file under the `extensions:` YAML block. + diff --git a/_data-prepper/managing-data-prepper/extensions/geoip_service.md b/_data-prepper/managing-data-prepper/extensions/geoip_service.md new file mode 100644 index 0000000000..53c21a08ff --- /dev/null +++ b/_data-prepper/managing-data-prepper/extensions/geoip_service.md @@ -0,0 +1,67 @@ +--- +layout: default +title: geoip_service +nav_order: 5 +parent: Extensions +grand_parent: Managing Data Prepper +--- + +# geoip_service + +The `geoip_service` extension configures all [`geoip`]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/processors/geoip) processors in Data Prepper. + +## Usage + +You can configure the GeoIP service that Data Prepper uses for the `geoip` processor. +By default, the GeoIP service comes with the [`maxmind`](#maxmind) option configured. + +The following example shows how to configure the `geoip_service` in the `data-prepper-config.yaml` file: + +``` +extensions: + geoip_service: + maxmind: + database_refresh_interval: PT1H + cache_count: 16_384 +``` + +## maxmind + +The GeoIP service supports the MaxMind [GeoIP and GeoLite](https://dev.maxmind.com/geoip) databases. +By default, Data Prepper will use all three of the following [MaxMind GeoLite2](https://dev.maxmind.com/geoip/geolite2-free-geolocation-data) databases: + +* City +* Country +* ASN + +The service also downloads databases automatically to keep Data Prepper up to date with changes from MaxMind. + +You can use the following options to configure the `maxmind` extension. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`databases` | No | [database](#database) | The database configuration. +`database_refresh_interval` | No | Duration | How frequently to check for updates from MaxMind. This can be any duration in the range of 15 minutes to 30 days. Default is `PT7D`. +`cache_count` | No | Integer | The maximum cache count by number of items in the cache, with a range of 100--100,000. Default is `4096`. +`database_destination` | No | String | The name of the directory in which to store downloaded databases. Default is `{data-prepper.dir}/data/geoip`. +`aws` | No | [aws](#aws) | Configures the AWS credentials for downloading the database from Amazon Simple Storage Service (Amazon S3). +`insecure` | No | Boolean | When `true`, this options allows you to download database files over HTTP. Default is `false`. + +## database + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`city` | No | String | The URL of the city in which the database resides. Can be an HTTP URL for a manifest file, an MMDB file, or an S3 URL. +`country` | No | String | The URL of the country in which the database resides. Can be an HTTP URL for a manifest file, an MMDB file, or an S3 URL. +`asn` | No | String | The URL of the Autonomous System Number (ASN) of where the database resides. Can be an HTTP URL for a manifest file, an MMDB file, or an S3 URL. +`enterprise` | No | String | The URL of the enterprise in which the database resides. Can be an HTTP URL for a manifest file, an MMDB file, or an S3 URL. + + +## aws + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`region` | No | String | The AWS Region to use for the credentials. Default is the [standard SDK behavior for determining the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). +`sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon S3. Default is `null`, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). +`aws_sts_header_overrides` | No | Map | A map of header overrides that the AWS Identity and Access Management (IAM) role assumes when downloading from Amazon S3. +`sts_external_id` | No | String | An STS external ID used when Data Prepper assumes the STS role. For more information, see the `ExternalID` documentation in the [STS AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) API reference. diff --git a/_data-prepper/pipelines/configuration/processors/date.md b/_data-prepper/pipelines/configuration/processors/date.md index 27b571df04..7ac1040c26 100644 --- a/_data-prepper/pipelines/configuration/processors/date.md +++ b/_data-prepper/pipelines/configuration/processors/date.md @@ -9,24 +9,32 @@ nav_order: 50 # date -The `date` processor adds a default timestamp to an event, parses timestamp fields, and converts timestamp information to the International Organization for Standardization (ISO) 8601 format. This timestamp information can be used as an event timestamp. +The `date` processor adds a default timestamp to an event, parses timestamp fields, and converts timestamp information to the International Organization for Standardization (ISO) 8601 format. This timestamp information can be used as an event timestamp. ## Configuration The following table describes the options you can use to configure the `date` processor. + Option | Required | Type | Description :--- | :--- | :--- | :--- -match | Conditionally | List | List of `key` and `patterns` where patterns is a list. The list of match can have exactly one `key` and `patterns`. There is no default value. This option cannot be defined at the same time as `from_time_received`. Include multiple date processors in your pipeline if both options should be used. -from_time_received | Conditionally | Boolean | A boolean that is used for adding default timestamp to event data from event metadata which is the time when source receives the event. Default value is `false`. This option cannot be defined at the same time as `match`. Include multiple date processors in your pipeline if both options should be used. -destination | No | String | Field to store the timestamp parsed by date processor. It can be used with both `match` and `from_time_received`. Default value is `@timestamp`. -source_timezone | No | String | Time zone used to parse dates. It is used in case the zone or offset cannot be extracted from the value. If the zone or offset are part of the value, then timezone is ignored. Find all the available timezones [the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List) in the **TZ database name** column. -destination_timezone | No | String | Timezone used for storing timestamp in `destination` field. The available timezone values are the same as `source_timestamp`. -locale | No | String | Locale is used for parsing dates. It's commonly used for parsing month names(`MMM`). It can have language, country and variant fields using IETF BCP 47 or String representation of [Locale](https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object. For example `en-US` for IETF BCP 47 and `en_US` for string representation of Locale. Full list of locale fields which includes language, country and variant can be found [the language subtag registry](https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). Default value is `Locale.ROOT`. +`match` | Conditionally | [Match](#Match) | The date match configuration. This option cannot be defined at the same time as `from_time_received`. There is no default value. +`from_time_received` | Conditionally | Boolean | When `true`, the timestamp from the event metadata, which is the time at which the source receives the event, is added to the event data. This option cannot be defined at the same time as `match`. Default is `false`. +`date_when` | No | String | Specifies under what condition the `date` processor should perform matching. Default is no condition. +`to_origination_metadata` | No | Boolean | When `true`, the matched time is also added to the event's metadata as an instance of `Instant`. Default is `false`. +`destination` | No | String | The field used to store the timestamp parsed by the date processor. Can be used with both `match` and `from_time_received`. Default is `@timestamp`. +`output_format` | No | String | Determines the format of the timestamp added to an event. Default is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. +`source_timezone` | No | String | The time zone used to parse dates, including when the zone or offset cannot be extracted from the value. If the zone or offset are part of the value, then the time zone is ignored. A list of all the available time zones is contained in the **TZ database name** column of [the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List). +`destination_timezone` | No | String | The time zone used for storing the timestamp in the `destination` field. A list of all the available time zones is contained in the **TZ database name** column of [the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List). +`locale` | No | String | The location used for parsing dates. Commonly used for parsing month names (`MMM`). The value can contain language, country, or variant fields in IETF BCP 47, such as `en-US`, or a string representation of the [locale](https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object, such as `en_US`. A full list of locale fields, including language, country, and variant, can be found in [the language subtag registry](https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). Default is `Locale.ROOT`. + - +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`key` | Yes | String | Represents the event key against which to match patterns. Required if `match` is configured. +`patterns` | Yes | List | A list of possible patterns that the timestamp value of the key can have. The patterns are based on a sequence of letters and symbols. The `patterns` support all the patterns listed in the Java [DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) reference. The timestamp value also supports `epoch_second`, `epoch_milli`, and `epoch_nano` values, which represent the timestamp as the number of seconds, milliseconds, and nanoseconds since the epoch. Epoch values always use the UTC time zone. ## Metrics @@ -40,5 +48,29 @@ The following table describes common [Abstract processor](https://github.com/ope The `date` processor includes the following custom metrics. -* `dateProcessingMatchSuccessCounter`: Returns the number of records that match with at least one pattern specified by the `match configuration` option. -* `dateProcessingMatchFailureCounter`: Returns the number of records that did not match any of the patterns specified by the `patterns match` configuration option. \ No newline at end of file +* `dateProcessingMatchSuccessCounter`: Returns the number of records that match at least one pattern specified by the `match configuration` option. +* `dateProcessingMatchFailureCounter`: Returns the number of records that did not match any of the patterns specified by the `patterns match` configuration option. + +## Example: Add the default timestamp to an event +The following `date` processor configuration can be used to add a default timestamp in the `@timestamp` filed applied to all events: + +```yaml +- date: + from_time_received: true + destination: "@timestamp" +``` + +## Example: Parse a timestamp to convert its format and time zone +The following `date` processor configuration can be used to parse the value of the timestamp applied to `dd/MMM/yyyy:HH:mm:ss` and write it in `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` format: + +```yaml +- date: + match: + - key: timestamp + patterns: ["dd/MMM/yyyy:HH:mm:ss"] + destination: "@timestamp" + output_format: "yyyy-MM-dd'T'HH:mm:ss.SSSXXX" + source_timezone: "America/Los_Angeles" + destination_timezone: "America/Chicago" + locale: "en_US" +``` diff --git a/_data-prepper/pipelines/configuration/processors/decompress.md b/_data-prepper/pipelines/configuration/processors/decompress.md new file mode 100644 index 0000000000..d03c236ac5 --- /dev/null +++ b/_data-prepper/pipelines/configuration/processors/decompress.md @@ -0,0 +1,49 @@ +--- +layout: default +title: decompress +parent: Processors +grand_parent: Pipelines +nav_order: 40 +--- + +# decompress + +The `decompress` processor decompresses any Base64-encoded compressed fields inside of an event. + +## Configuration + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`keys` | Yes | List | The fields in the event that will be decompressed. +`type` | Yes | Enum | The type of decompression to use for the `keys` in the event. Only `gzip` is supported. +`decompress_when` | No | String| A [Data Prepper conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) that determines when the `decompress` processor will run on certain events. +`tags_on_failure` | No | List | A list of strings with which to tag events when the processor fails to decompress the `keys` inside an event. Defaults to `_decompression_failure`. + +## Usage + +The following example shows the `decompress` processor used in `pipelines.yaml`: + +```yaml +processor: + - decompress: + decompress_when: '/some_key == null' + keys: [ "base_64_gzip_key" ] + type: gzip +``` + +## Metrics + +The following table describes common [abstract processor](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java) metrics. + +| Metric name | Type | Description | +| ------------- | ---- | -----------| +| `recordsIn` | Counter | The ingress of records to a pipeline component. | +| `recordsOut` | Counter | The egress of records from a pipeline component. | +| `timeElapsed` | Timer | The time elapsed during execution of a pipeline component. | + +### Counter + +The `decompress` processor accounts for the following metrics: + +* `processingErrors`: The number of processing errors that have occurred in the `decompress` processor. + diff --git a/_data-prepper/pipelines/configuration/processors/delete-entries.md b/_data-prepper/pipelines/configuration/processors/delete-entries.md index 0546ed67c4..33c54a0b29 100644 --- a/_data-prepper/pipelines/configuration/processors/delete-entries.md +++ b/_data-prepper/pipelines/configuration/processors/delete-entries.md @@ -3,7 +3,7 @@ layout: default title: delete_entries parent: Processors grand_parent: Pipelines -nav_order: 51 +nav_order: 41 --- # delete_entries diff --git a/_data-prepper/pipelines/configuration/processors/dissect.md b/_data-prepper/pipelines/configuration/processors/dissect.md index 2d32ba47ae..a8258bee4e 100644 --- a/_data-prepper/pipelines/configuration/processors/dissect.md +++ b/_data-prepper/pipelines/configuration/processors/dissect.md @@ -3,7 +3,7 @@ layout: default title: dissect parent: Processors grand_parent: Pipelines -nav_order: 52 +nav_order: 45 --- # dissect diff --git a/_data-prepper/pipelines/configuration/processors/drop-events.md b/_data-prepper/pipelines/configuration/processors/drop-events.md index d030f14a27..1f601c9743 100644 --- a/_data-prepper/pipelines/configuration/processors/drop-events.md +++ b/_data-prepper/pipelines/configuration/processors/drop-events.md @@ -3,7 +3,7 @@ layout: default title: drop_events parent: Processors grand_parent: Pipelines -nav_order: 53 +nav_order: 46 --- # drop_events diff --git a/_data-prepper/pipelines/configuration/processors/flatten.md b/_data-prepper/pipelines/configuration/processors/flatten.md new file mode 100644 index 0000000000..43793c2b83 --- /dev/null +++ b/_data-prepper/pipelines/configuration/processors/flatten.md @@ -0,0 +1,239 @@ +--- +layout: default +title: flatten +parent: Processors +grand_parent: Pipelines +nav_order: 48 +--- + +# flatten + +The `flatten` processor transforms nested objects inside of events into flattened structures. + +## Configuration + +The following table describes configuration options for the `flatten` processor. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`source` | Yes | String | The source key on which to perform the operation. If set to an empty string (`""`), then the processor uses the root of the event as the source. +`target` | Yes | String | The target key to put into the flattened fields. If set to an empty string (`""`), then the processor uses the root of the event as the target. +`exclude_keys` | No | List | The keys from the source field that should be excluded from processing. Default is an empty list (`[]`). +`remove_processed_fields` | No | Boolean | When `true`, the processor removes all processed fields from the source. Default is `false`. +`remove_list_indices` | No | Boolean | When `true`, the processor converts the fields from the source map into lists and puts the lists into the target field. Default is `false`. +`flatten_when` | No | String | A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as `/some-key == "test"'`, that determines whether the `flatten` processor will be run on the event. Default is `null`, which means that all events will be processed unless otherwise stated. +`tags_on_failure` | No | List | A list of tags to add to the event metadata when the event fails to process. + +## Usage + +The following examples show how the `flatten` processor can be used in Data Prepper pipelines. + +### Minimum configuration + +The following example shows only the parameters that are required for using the `flatten` processor, `source` and `target`: + +```yaml +... + processor: + - flatten: + source: "key2" + target: "flattened-key2" +... +``` +{% include copy.html %} + +For example, when the input event contains the following nested objects: + +```json +{ + "key1": "val1", + "key2": { + "key3": { + "key4": "val2" + } + } +} +``` + +The `flatten` processor creates a flattened structure under the `flattened-key2` object, as shown in the following output: + +```json +{ + "key1": "val1", + "key2": { + "key3": { + "key4": "val2" + } + }, + "flattened-key2": { + "key3.key4": "val2" + } +} +``` + +### Remove processed fields + +Use the `remove_processed_fields` option when flattening all of an event's nested objects. This removes all the event's processed fields, as shown in the following example: + +```yaml +... + processor: + - flatten: + source: "" # empty string represents root of event + target: "" # empty string represents root of event + remove_processed_fields: true +... +``` + +For example, when the input event contains the following nested objects: + +```json +{ + "key1": "val1", + "key2": { + "key3": { + "key4": "val2" + } + }, + "list1": [ + { + "list2": [ + { + "name": "name1", + "value": "value1" + }, + { + "name": "name2", + "value": "value2" + } + ] + } + ] +} +``` + + +The `flatten` processor creates a flattened structure in which all processed fields are absent, as shown in the following output: + +```json +{ + "key1": "val1", + "key2.key3.key4": "val2", + "list1[0].list2[0].name": "name1", + "list1[0].list2[0].value": "value1", + "list1[0].list2[1].name": "name2", + "list1[0].list2[1].value": "value2", +} +``` + +### Exclude specific keys from flattening + +Use the `exclude_keys` option to prevent specific keys from being flattened in the output, as shown in the following example, where the `key2` value is excluded: + +```yaml +... + processor: + - flatten: + source: "" # empty string represents root of event + target: "" # empty string represents root of event + remove_processed_fields: true + exclude_keys: ["key2"] +... +``` + +For example, when the input event contains the following nested objects: + +```json +{ + "key1": "val1", + "key2": { + "key3": { + "key4": "val2" + } + }, + "list1": [ + { + "list2": [ + { + "name": "name1", + "value": "value1" + }, + { + "name": "name2", + "value": "value2" + } + ] + } + ] +} +``` + +All other nested objects in the input event, excluding the `key2` key, will be flattened, as shown in the following example: + +```json +{ + "key1": "val1", + "key2": { + "key3": { + "key4": "val2" + } + }, + "list1[0].list2[0].name": "name1", + "list1[0].list2[0].value": "value1", + "list1[0].list2[1].name": "name2", + "list1[0].list2[1].value": "value2", +} +``` + +### Remove list indexes + +Use the `remove_list_indices` option to convert the fields from the source map into lists and put the lists into the target field, as shown in the following example: + +```yaml +... + processor: + - flatten: + source: "" # empty string represents root of event + target: "" # empty string represents root of event + remove_processed_fields: true + remove_list_indices: true +... +``` + +For example, when the input event contains the following nested objects: + +```json +{ + "key1": "val1", + "key2": { + "key3": { + "key4": "val2" + } + }, + "list1": [ + { + "list2": [ + { + "name": "name1", + "value": "value1" + }, + { + "name": "name2", + "value": "value2" + } + ] + } + ] +} +``` + +The processor removes all indexes from the output and places them into the source map as a flattened, structured list, as shown in the following example: + +```json +{ + "key1": "val1", + "key2.key3.key4": "val2", + "list1[].list2[].name": ["name1","name2"], + "list1[].list2[].value": ["value1","value2"] +} +``` diff --git a/_data-prepper/pipelines/configuration/processors/geoip.md b/_data-prepper/pipelines/configuration/processors/geoip.md new file mode 100644 index 0000000000..b7418c66c6 --- /dev/null +++ b/_data-prepper/pipelines/configuration/processors/geoip.md @@ -0,0 +1,67 @@ +--- +layout: default +title: geoip +parent: Processors +grand_parent: Pipelines +nav_order: 49 +--- + +# geoip + +The `geoip` processor enriches events with geographic information extracted from IP addresses contained in the events. +By default, Data Prepper uses the [MaxMind GeoLite2](https://dev.maxmind.com/geoip/geolite2-free-geolocation-data) geolocation database. +Data Prepper administrators can configure the databases using the [`geoip_service`]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/extensions/geoip_service) extension configuration. + +## Usage + +You can configure the `geoip` processor to work on entries. + +The minimal configuration requires at least one entry, and each entry at least one source field. + +The following configuration extracts all available geolocation data from the IP address provided in the field named `clientip`. +It will write the geolocation data to a new field named `geo`, the default source when none is configured: + +``` +my-pipeline: + processor: + - geoip: + entries: + - source: clientip +``` + +The following example excludes Autonomous System Number (ASN) fields and puts the geolocation data into a field named `clientlocation`: + +``` +my-pipeline: + processor: + - geoip: + entries: + - source: clientip + target: clientlocation + include_fields: [asn, asn_organization, network] +``` + + +## Configuration + +You can use the following options to configure the `geoip` processor. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`entries` | Yes | [entry](#entry) list | The list of entries marked for enrichment. +`geoip_when` | No | String | Specifies under what condition the `geoip` processor should perform matching. Default is no condition. +`tags_on_no_valid_ip` | No | String | The tags to add to the event metadata if the source field is not a valid IP address. This includes the localhost IP address. +`tags_on_ip_not_found` | No | String | The tags to add to the event metadata if the `geoip` processor is unable to find a location for the IP address. +`tags_on_engine_failure` | No | String | The tags to add to the event metadata if the `geoip` processor is unable to enrich an event due to an engine failure. + +## entry + +The following parameters allow you to configure a single geolocation entry. Each entry corresponds to a single IP address. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`source` | Yes | String | The key of the source field containing the IP address to geolocate. +`target` | No | String | The key of the target field in which to save the geolocation data. Default is `geo`. +`include_fields` | No | String list | The list of geolocation fields to include in the `target` object. By default, this is all the fields provided by the configured databases. +`exclude_fields` | No | String list | The list of geolocation fields to exclude from the `target` object. + diff --git a/_data-prepper/pipelines/configuration/processors/grok.md b/_data-prepper/pipelines/configuration/processors/grok.md index d1eea278d2..16f72c4968 100644 --- a/_data-prepper/pipelines/configuration/processors/grok.md +++ b/_data-prepper/pipelines/configuration/processors/grok.md @@ -3,7 +3,7 @@ layout: default title: Grok parent: Processors grand_parent: Pipelines -nav_order: 54 +nav_order: 50 --- # Grok @@ -15,26 +15,25 @@ The Grok processor uses pattern matching to structure and extract important keys The following table describes options you can use with the Grok processor to structure your data and make your data easier to query. Option | Required | Type | Description -:--- | :--- | :--- | :--- -break_on_match | No | Boolean | Specifies whether to match all patterns or stop once the first successful match is found. Default value is `true`. -grok_when | No | String | Specifies under what condition the `Grok` processor should perform matching. Default is no condition. -keep_empty_captures | No | Boolean | Enables the preservation of `null` captures. Default value is `false`. -keys_to_overwrite | No | List | Specifies which existing keys will be overwritten if there is a capture with the same key value. Default value is `[]`. -match | No | Map | Specifies which keys to match specific patterns against. Default value is an empty body. -named_captures_only | No | Boolean | Specifies whether to keep only named captures. Default value is `true`. -pattern_definitions | No | Map | Allows for custom pattern use inline. Default value is an empty body. -patterns_directories | No | List | Specifies the path of directories that contain customer pattern files. Default value is an empty list. -pattern_files_glob | No | String | Specifies which pattern files to use from the directories specified for `pattern_directories`. Default value is `*`. -target_key | No | String | Specifies a parent-level key used to store all captures. Default value is `null`. -timeout_millis | No | Integer | The maximum amount of time during which matching occurs. Setting to `0` disables the timeout. Default value is `30,000`. - - +:--- | :--- |:--- | :--- +`break_on_match` | No | Boolean | Specifies whether to match all patterns (`true`) or stop once the first successful match is found (`false`). Default is `true`. +`grok_when` | No | String | Specifies under what condition the `grok` processor should perform matching. Default is no condition. +`keep_empty_captures` | No | Boolean | Enables the preservation of `null` captures from the processed output. Default is `false`. +`keys_to_overwrite` | No | List | Specifies which existing keys will be overwritten if there is a capture with the same key value. Default is `[]`. +`match` | No | Map | Specifies which keys should match specific patterns. Default is an empty response body. +`named_captures_only` | No | Boolean | Specifies whether to keep only named captures. Default is `true`. +`pattern_definitions` | No | Map | Allows for a custom pattern that can be used inline inside the response body. Default is an empty response body. +`patterns_directories` | No | List | Specifies which directory paths contain the custom pattern files. Default is an empty list. +`pattern_files_glob` | No | String | Specifies which pattern files to use from the directories specified for `pattern_directories`. Default is `*`. +`target_key` | No | String | Specifies a parent-level key used to store all captures. Default value is `null`. +`timeout_millis` | No | Integer | The maximum amount of time during which matching occurs. Setting to `0` prevents any matching from occurring. Default is `30,000`. +`performance_metadata` | No | Boolean | Whether or not to add the performance metadata to events. Default is `false`. For more information, see [Grok performance metadata](#grok-performance-metadata). + ## Conditional grok -The Grok processor can be configured to run conditionally by using the `grok_when` option. The following is an example Grok processor configuration that uses `grok_when`: +The `grok` processor can be configured to run conditionally by using the `grok_when` option. The following is an example Grok processor configuration that uses `grok_when`: + ``` processor: - grok: @@ -46,8 +45,36 @@ processor: match: message: ['%{IPV6:clientip} %{WORD:request} %{POSINT:bytes}'] ``` +{% include copy.html %} + The `grok_when` option can take a conditional expression. This expression is detailed in the [Expression syntax](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) documentation. +## Grok performance metadata + +When the `performance_metadata` option is set to `true`, the `grok` processor adds the following metadata keys to each event: + +* `_total_grok_processing_time`: The total amount of time, in milliseconds, that the `grok` processor takes to match the event. This is the sum of the processing time based on all of the `grok` processors that ran on the event and have the `performance_metadata` option enabled. +* `_total_grok_patterns_attempted`: The total number of `grok` pattern match attempts across all `grok` processors that ran on the event. + +To include Grok performance metadata when the event is sent to the sink inside the pipeline, use the `add_entries` processor to describe the metadata you want to include, as shown in the following example: + + +```yaml +processor: + - grok: + performance_metadata: true + match: + log: "%{COMMONAPACHELOG"} + - add_entries: + entries: + - add_when: 'getMetadata("_total_grok_patterns_attempted") != null' + key: "grok_patterns_attempted" + value_expression: 'getMetadata("_total_grok_patterns_attempted")' + - add_when: 'getMetadata("_total_grok_processing_time") != null' + key: "grok_time_spent" + value_expression: 'getMetadata("_total_grok_processing_time")' +``` + ## Metrics The following table describes common [Abstract processor](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java) metrics. diff --git a/_data-prepper/pipelines/configuration/processors/list-to-map.md b/_data-prepper/pipelines/configuration/processors/list-to-map.md index 4b137f5ce8..15a90ffc24 100644 --- a/_data-prepper/pipelines/configuration/processors/list-to-map.md +++ b/_data-prepper/pipelines/configuration/processors/list-to-map.md @@ -16,10 +16,12 @@ The following table describes the configuration options used to generate target Option | Required | Type | Description :--- | :--- | :--- | :--- -`key` | Yes | String | The key of the fields to be extracted as keys in the generated mappings. `source` | Yes | String | The list of objects with `key` fields to be converted into keys for the generated map. `target` | No | String | The target for the generated map. When not specified, the generated map will be placed in the root node. +`key` | Conditionally | String | The key of the fields to be extracted as keys in the generated mappings. Must be specified if `use_source_key` is `false`. +`use_source_key` | No | Boolean | When `true`, keys in the generated map will use original keys from the source. Default is `false`. `value_key` | No | String | When specified, values given a `value_key` in objects contained in the source list will be extracted and converted into the value specified by this option based on the generated map. When not specified, objects contained in the source list retain their original value when mapped. +`extract_value` | No | Boolean | When `true`, object values from the source list will be extracted and added to the generated map. When `false`, object values from the source list are added to the generated map as they appear in the source list. Default is `false` `flatten` | No | Boolean | When `true`, values in the generated map output flatten into single items based on the `flattened_element`. Otherwise, objects mapped to values from the generated map appear as lists. `flattened_element` | Conditionally | String | The element to keep, either `first` or `last`, when `flatten` is set to `true`. @@ -302,4 +304,52 @@ Some objects in the response may have more than one element in their values, as "val-c" ] } +``` + +### Example: `use_source_key` and `extract_value` set to `true` + +The following example `pipeline.yaml` file sets `flatten` to `false`, causing the processor to output values from the generated map as a list: + +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - list_to_map: + source: "mylist" + use_source_key: true + extract_value: true + sink: + - stdout: +``` +{% include copy.html %} + +Object values from `mylist` are extracted and added to fields with the source keys `name` and `value`, as shown in the following response: + +```json +{ + "mylist": [ + { + "name": "a", + "value": "val-a" + }, + { + "name": "b", + "value": "val-b1" + }, + { + "name": "b", + "value": "val-b2" + }, + { + "name": "c", + "value": "val-c" + } + ], + "name": ["a", "b", "b", "c"], + "value": ["val-a", "val-b1", "val-b2", "val-c"] +} ``` \ No newline at end of file diff --git a/_data-prepper/pipelines/configuration/processors/map-to-list.md b/_data-prepper/pipelines/configuration/processors/map-to-list.md new file mode 100644 index 0000000000..f3393e6c46 --- /dev/null +++ b/_data-prepper/pipelines/configuration/processors/map-to-list.md @@ -0,0 +1,277 @@ +--- +layout: default +title: map_to_list +parent: Processors +grand_parent: Pipelines +nav_order: 63 +--- + +# map_to_list + +The `map_to_list` processor converts a map of key-value pairs to a list of objects. Each object contains the key and value in separate fields. + +## Configuration + +The following table describes the configuration options for the `map_to_list` processor. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`source` | Yes | String | The source map used to perform the mapping operation. When set to an empty string (`""`), it will use the root of the event as the `source`. +`target` | Yes | String | The target for the generated list. +`key_name` | No | String | The name of the field in which to store the original key. Default is `key`. +`value_name` | No | String | The name of the field in which to store the original value. Default is `value`. +`exclude_keys` | No | List | The keys in the source map that will be excluded from processing. Default is an empty list (`[]`). +`remove_processed_fields` | No | Boolean | When `true`, the processor will remove the processed fields from the source map. Default is `false`. +`convert_field_to_list` | No | Boolean | If `true`, the processor will convert the fields from the source map into lists and place them in fields in the target list. Default is `false`. +`map_to_list_when` | No | String | A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as `/some-key == "test"'`, that will be evaluated to determine whether the processor will be run on the event. Default is `null`. All events will be processed unless otherwise stated. +`tags_on_failure` | No | List | A list of tags to add to the event metadata when the event fails to process. + +## Usage + +The following examples show how the `map_to_list` processor can be used in your pipeline. + +### Example: Minimum configuration + +The following example shows the `map_to_list` processor with only the required parameters, `source` and `target`, configured: + +```yaml +... + processor: + - map_to_list: + source: "my-map" + target: "my-list" +... +``` +{% include copy.html %} + +When the input event contains the following data: + +```json +{ + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + + +The processed event will contain the following output: + +```json +{ + "my-list": [ + { + "key": "key1", + "value": "value1" + }, + { + "key": "key2", + "value": "value2" + }, + { + "key": "key3", + "value": "value3" + } + ], + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +### Example: Custom key name and value name + +The following example shows how to configure a custom key name and value name: + +```yaml +... + processor: + - map_to_list: + source: "my-map" + target: "my-list" + key_name: "name" + value_name: "data" +... +``` +{% include copy.html %} + +When the input event contains the following data: + +```json +{ + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +The processed event will contain the following output: + +```json +{ + "my-list": [ + { + "name": "key1", + "data": "value1" + }, + { + "name": "key2", + "data": "value2" + }, + { + "name": "key3", + "data": "value3" + } + ], + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +### Example: Exclude specific keys from processing and remove any processed fields + +The following example shows how to exclude specific keys and remove any processed fields from the output: + +```yaml +... + processor: + - map_to_list: + source: "my-map" + target: "my-list" + exclude_keys: ["key1"] + remove_processed_fields: true +... +``` +{% include copy.html %} + +When the input event contains the following data: +```json +{ + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +The processed event will remove the "key2" and "key3" fields, but the "my-map" object, "key1", will remain, as shown in the following output: + +```json +{ + "my-list": [ + { + "key": "key2", + "value": "value2" + }, + { + "key": "key3", + "value": "value3" + } + ], + "my-map": { + "key1": "value1" + } +} +``` + +### Example: Use convert_field_to_list + +The following example shows how to use the `convert_field_to_list` option in the processor: + +```yaml +... + processor: + - map_to_list: + source: "my-map" + target: "my-list" + convert_field_to_list: true +... +``` +{% include copy.html %} + +When the input event contains the following data: + +```json +{ + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +The processed event will convert all fields into lists, as shown in the following output: + +```json +{ + "my-list": [ + ["key1", "value1"], + ["key2", "value2"], + ["key3", "value3"] + ], + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +### Example: Use the event root as the source + +The following example shows how you can use an event's root as the source by setting the `source` setting to an empty string (`""`): + +```yaml +... + processor: + - map_to_list: + source: "" + target: "my-list" +... +``` +{% include copy.html %} + +When the input event contains the following data: + +```json +{ + "key1": "value1", + "key2": "value2", + "key3": "value3" +} +``` + +The processed event will contain the following output: + +```json +{ + "my-list": [ + { + "key": "key1", + "value": "value1" + }, + { + "key": "key2", + "value": "value2" + }, + { + "key": "key3", + "value": "value3" + } + ], + "key1": "value1", + "key2": "value2", + "key3": "value3" +} +``` diff --git a/_data-prepper/pipelines/configuration/processors/mutate-event.md b/_data-prepper/pipelines/configuration/processors/mutate-event.md index 032bc89fcd..b0172c36ae 100644 --- a/_data-prepper/pipelines/configuration/processors/mutate-event.md +++ b/_data-prepper/pipelines/configuration/processors/mutate-event.md @@ -19,3 +19,4 @@ Mutate event processors allow you to modify events in Data Prepper. The followin + diff --git a/_data-prepper/pipelines/configuration/processors/obfuscate.md b/_data-prepper/pipelines/configuration/processors/obfuscate.md index 4c33d8baab..13d906acb3 100644 --- a/_data-prepper/pipelines/configuration/processors/obfuscate.md +++ b/_data-prepper/pipelines/configuration/processors/obfuscate.md @@ -67,6 +67,8 @@ Use the following configuration options with the `obfuscate` processor. | `source` | Yes | The source field to obfuscate. | | `target` | No | The new field in which to store the obfuscated value. This leaves the original source field unchanged. When no `target` is provided, the source field updates with the obfuscated value. | | `patterns` | No | A list of regex patterns that allow you to obfuscate specific parts of a field. Only parts that match the regex pattern will obfuscate. When not provided, the processor obfuscates the whole field. | +| `obfuscate_when` | No | Specifies under what condition the Obfuscate processor should perform matching. Default is no condition. | +| `tags_on_match_failure` | No | The tag to add to an event if the obfuscate processor fails to match the pattern. | | `action` | No | The obfuscation action. As of Data Prepper 2.3, only the `mask` action is supported. | You can customize the `mask` action with the following optional configuration options. diff --git a/_data-prepper/pipelines/configuration/processors/parse-ion.md b/_data-prepper/pipelines/configuration/processors/parse-ion.md new file mode 100644 index 0000000000..0edd446c42 --- /dev/null +++ b/_data-prepper/pipelines/configuration/processors/parse-ion.md @@ -0,0 +1,56 @@ +--- +layout: default +title: parse_ion +parent: Processors +grand_parent: Pipelines +nav_order: 79 +--- + +# parse_ion + +The `parse_ion` processor parses [Amazon Ion](https://amazon-ion.github.io/ion-docs/) data. + +## Configuration + +You can configure the `parse_ion` processor with the following options. + +| Option | Required | Type | Description | +| :--- | :--- | :--- | :--- | +| `source` | No | String | The field in the `event` that is parsed. Default value is `message`. | +| `destination` | No | String | The destination field of the parsed JSON. Defaults to the root of the `event`. Cannot be `""`, `/`, or any white-space-only `string` because these are not valid `event` fields. | +| `pointer` | No | String | A JSON pointer to the field to be parsed. There is no `pointer` by default, meaning that the entire `source` is parsed. The `pointer` can access JSON array indexes as well. If the JSON pointer is invalid, then the entire `source` data is parsed into the outgoing `event`. If the key that is pointed to already exists in the `event` and the `destination` is the root, then the pointer uses the entire path of the key. | +| `tags_on_failure` | No | String | A list of strings that specify the tags to be set in the event that the processors fails or an unknown exception occurs while parsing. + +## Usage + +The following examples show how to use the `parse_ion` processor in your pipeline. + +### Example: Minimum configuration + +The following example shows the minimum configuration for the `parse_ion` processor: + +```yaml +parse-json-pipeline: + source: + stdin: + processor: + - parse_json: + source: "my_ion" + sink: + - stdout: +``` +{% include copy.html %} + +When the input event contains the following data: + +``` +{"my_ion": "{ion_value1: \"hello\", ion_value2: \"world\"}"} +``` + +The processor parses the event into the following output: + +``` +{"ion_value1": "hello", "ion_value2" : "world"} +``` + + diff --git a/_data-prepper/pipelines/configuration/processors/split-event.md b/_data-prepper/pipelines/configuration/processors/split-event.md new file mode 100644 index 0000000000..f059fe5b95 --- /dev/null +++ b/_data-prepper/pipelines/configuration/processors/split-event.md @@ -0,0 +1,52 @@ +--- +layout: default +title: split-event +parent: Processors +grand_parent: Pipelines +nav_order: 96 +--- + +# split-event + +The `split-event` processor is used to split events based on a delimiter and generates multiple events from a user-specified field. + +## Configuration + +The following table describes the configuration options for the `split-event` processor. + +| Option | Type | Description | +|------------------|---------|-----------------------------------------------------------------------------------------------| +| `field` | String | The event field to be split. | +| `delimiter_regex`| String | The regular expression used as the delimiter for splitting the field. | +| `delimiter` | String | The delimiter used for splitting the field. If not specified, the default delimiter is used. | + +# Usage + +To use the `split-event` processor, add the following to your `pipelines.yaml` file: + +``` +split-event-pipeline: + source: + http: + processor: + - split_event: + field: query + delimiter: ' ' + sink: + - stdout: +``` +{% include copy.html %} + +When an event contains the following example input: + +``` +{"query" : "open source", "some_other_field" : "abc" } +``` + +The input will be split into multiple events based on the `query` field, with the delimiter set as white space, as shown in the following example: + +``` +{"query" : "open", "some_other_field" : "abc" } +{"query" : "source", "some_other_field" : "abc" } +``` + diff --git a/_data-prepper/pipelines/configuration/sinks/opensearch.md b/_data-prepper/pipelines/configuration/sinks/opensearch.md index b4861f68fd..d485fbb2b9 100644 --- a/_data-prepper/pipelines/configuration/sinks/opensearch.md +++ b/_data-prepper/pipelines/configuration/sinks/opensearch.md @@ -50,45 +50,82 @@ pipeline: The following table describes options you can configure for the `opensearch` sink. + +Option | Required | Type | Description +:--- | :--- |:---| :--- +`hosts` | Yes | List | A list of OpenSearch hosts to write to, such as `["https://localhost:9200", "https://remote-cluster:9200"]`. +`cert` | No | String | The path to the security certificate. For example, `"config/root-ca.pem"` if the cluster uses the OpenSearch Security plugin. +`username` | No | String | The username for HTTP basic authentication. +`password` | No | String | The password for HTTP basic authentication. +`aws` | No | AWS | The [AWS](#aws) configuration. +[max_retries](#configure-max_retries) | No | Integer | The maximum number of times that the `opensearch` sink should try to push data to the OpenSearch server before considering it to be a failure. Defaults to `Integer.MAX_VALUE`. When not provided, the sink will try to push data to the OpenSearch server indefinitely and exponential backoff will increase the waiting time before a retry. +`aws_sigv4` | No | Boolean | **Deprecated in Data Prepper 2.7.** Default is `false`. Whether to use AWS Identity and Access Management (IAM) signing to connect to an Amazon OpenSearch Service domain. For your access key, secret key, and optional session token, Data Prepper uses the default credential chain (environment variables, Java system properties, `~/.aws/credential`). +`aws_region` | No | String | **Deprecated in Data Prepper 2.7.** The AWS Region (for example, `"us-east-1"`) for the domain when you are connecting to Amazon OpenSearch Service. +`aws_sts_role_arn` | No | String | **Deprecated in Data Prepper 2.7.** The IAM role that the plugin uses to sign requests sent to Amazon OpenSearch Service. If this information is not provided, then the plugin uses the default credentials. +`socket_timeout` | No | Integer | The timeout value, in milliseconds, when waiting for data to be returned (the maximum period of inactivity between two consecutive data packets). A timeout value of `0` is interpreted as an infinite timeout. If this timeout value is negative or not set, then the underlying Apache HttpClient will rely on operating system settings to manage socket timeouts. +`connect_timeout` | No | Integer| The timeout value, in milliseconds, when requesting a connection from the connection manager. A timeout value of `0` is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient will rely on operating system settings to manage connection timeouts. +`insecure` | No | Boolean | Whether or not to verify SSL certificates. If set to `true`, then certificate authority (CA) certificate verification is disabled and insecure HTTP requests are sent instead. Default is `false`. +`proxy` | No | String | The address of the [forward HTTP proxy server](https://en.wikipedia.org/wiki/Proxy_server). The format is `"<hostname or IP>:<port>"` (for example, `"example.com:8100"`, `"http://example.com:8100"`, `"112.112.112.112:8100"`). The port number cannot be omitted. +`index` | Conditionally | String | The name of the export index. Only required when the `index_type` is `custom`. The index can be a plain string, such as `my-index-name`, contain [Java date-time patterns](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html), such as `my-index-${yyyy.MM.dd}` or `my-${yyyy-MM-dd-HH}-index`, be formatted using field values, such as `my-index-${/my_field}`, or use [Data Prepper expressions](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as `my-index-${getMetadata(\"my_metadata_field\"}`. All formatting options can be combined to provide flexibility when creating static, dynamic, and rolling indexes. +`index_type` | No | String | Tells the sink plugin what type of data it is handling. Valid values are `custom`, `trace-analytics-raw`, `trace-analytics-service-map`, or `management-disabled`. Default is `custom`. +`template_type` | No | String | Defines what type of OpenSearch template to use. Available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. The `index-template` option uses composable [index templates]({{site.url}}{{site.baseurl}}/opensearch/index-templates/), which are available through the OpenSearch `_index_template` API. Composable index types offer more flexibility than the default and are necessary when an OpenSearch cluster contains existing index templates. Composable templates are available for all versions of OpenSearch and some later versions of Elasticsearch. When `distribution_version` is set to `es6`, Data Prepper enforces the `template_type` as `v1`. +`template_file` | No | String | The path to a JSON [index template]({{site.url}}{{site.baseurl}}/opensearch/index-templates/) file, such as `/your/local/template-file.json`, when `index_type` is set to `custom`. For an example template file, see [otel-v1-apm-span-index-template.json](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/resources/otel-v1-apm-span-index-template.json). If you supply a template file, then it must match the template format specified by the `template_type` parameter. +`template_content` | No | JSON | Contains all the inline JSON found inside of the index [index template]({{site.url}}{{site.baseurl}}/opensearch/index-templates/). For an example of template content, see [the example template content](#example_template_content). +`document_id_field` | No | String | **Deprecated in Data Prepper 2.7 in favor of `document_id`.** The field from the source data to use for the OpenSearch document ID (for example, `"my-field"`) if `index_type` is `custom`. +`document_id` | No | String | A format string to use as the `_id` in OpenSearch documents. To specify a single field in an event, use `${/my_field}`. You can also use Data Prepper expressions to construct the `document_id`, for example, `${getMetadata(\"some_metadata_key\")}`. These options can be combined into more complex formats, such as `${/my_field}-test-${getMetadata(\"some_metadata_key\")}`. +`document_version` | No | String | A format string to use as the `_version` in OpenSearch documents. To specify a single field in an event, use `${/my_field}`. You can also use Data Prepper expressions to construct the `document_version`, for example, `${getMetadata(\"some_metadata_key\")}`. These options can be combined into more complex versions, such as `${/my_field}${getMetadata(\"some_metadata_key\")}`. The `document_version` format must evaluate to a long type and can only be used when `document_version_type` is set to either `external` or `external_gte`. +`document_version_type` | No | String | The document version type for index operations. Must be one of `external`, `external_gte`, or `internal`. If set to `external` or `external_gte`, then `document_version` is required. +`dlq_file` | No | String | The path to your preferred dead letter queue file (such as `/your/local/dlq-file`). Data Prepper writes to this file when it fails to index a document on the OpenSearch cluster. +`dlq` | No | N/A | [DLQ configurations]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/dlq/). +`bulk_size` | No | Integer (long) | The maximum size (in MiB) of bulk requests sent to the OpenSearch cluster. Values below `0` indicate an unlimited size. If a single document exceeds the maximum bulk request size, then Data Prepper sends each request individually. Default value is `5`. +`ism_policy_file` | No | String | The absolute file path for an Index State Management (ISM) policy JSON file. This policy file is effective only when there is no built-in policy file for the index type. For example, the `custom` index type is currently the only type without a built-in policy file, so it will use this policy file if it is provided through this parameter. For more information about the policy JSON file, see [ISM policies]({{site.url}}{{site.baseurl}}/im-plugin/ism/policies/). +`number_of_shards` | No | Integer | The number of primary shards that an index should have on the destination OpenSearch server. This parameter is effective only when `template_file` is either explicitly provided in the sink configuration or built in. If this parameter is set, then it will override the value in the index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/). +`number_of_replicas` | No | Integer | The number of replica shards that each primary shard should have on the destination OpenSearch server. For example, if you have 4 primary shards and set `number_of_replicas` to `3`, then the index has 12 replica shards. This parameter is effective only when `template_file` is either explicitly provided in the sink configuration or built in. If this parameter is set, then it will override the value in the index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/). +`distribution_version` | No | String | Indicates whether the backend version of the sink is Elasticsearch 6 or later. `es6` represents Elasticsearch 6. `default` represents the latest compatible backend version, such as Elasticsearch 7.x, OpenSearch 1.x, or OpenSearch 2.x. Default is `default`. +`enable_request_compression` | No | Boolean | Whether to enable compression when sending requests to OpenSearch. When `distribution_version` is set to `es6`, default is `false`. For all other distribution versions, default is `true`. +`action` | No | String | The OpenSearch bulk action to use for documents. Must be one of `create`, `index`, `update`, `upsert`, or `delete`. Default is `index`. +`actions` | No | List | A [list of actions](#actions) that can be used as an alternative to `action`, which reads as a switch case statement that conditionally determines the bulk action to take for an event. +`flush_timeout` | No | Long | A long class that contains the amount of time, in milliseconds, to try packing a bulk request up to the `bulk_size` before flushing the request. If this timeout expires before a bulk request has reached the `bulk_size`, the request will be flushed. Set to `-1` to disable the flush timeout and instead flush whatever is present at the end of each batch. Default is `60,000`, or 1 minute. +`normalize_index` | No | Boolean | If true, then the OpenSearch sink will try to create dynamic index names. Index names with format options specified in `${})` are valid according to the [index naming restrictions]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/#index-naming-restrictions). Any invalid characters will be removed. Default value is `false`. +`routing` | No | String | A string used as a hash for generating the `shard_id` for a document when it is stored in OpenSearch. Each incoming record is searched. When present, the string is used as the routing field for the document. When not present, the default routing mechanism (`document_id`) is used by OpenSearch when storing the document. Supports formatting with fields in events and [Data Prepper expressions]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/expression-syntax/), such as `${/my_field}-test-${getMetadata(\"some_metadata_key\")}`. +`document_root_key` | No | String | The key in the event that will be used as the root in the document. The default is the root of the event. If the key does not exist, then the entire event is written as the document. If `document_root_key` is of a basic value type, such as a string or integer, then the document will have a structure of `{"data": }`. +`serverless` | No | Boolean | Determines whether the OpenSearch backend is Amazon OpenSearch Serverless. Set this value to `true` when the destination for the `opensearch` sink is an Amazon OpenSearch Serverless collection. Default is `false`. +`serverless_options` | No | Object | The network configuration options available when the backend of the `opensearch` sink is set to Amazon OpenSearch Serverless. For more information, see [Serverless options](#serverless-options). + + + +## aws + Option | Required | Type | Description :--- | :--- | :--- | :--- -hosts | Yes | List | List of OpenSearch hosts to write to (for example, `["https://localhost:9200", "https://remote-cluster:9200"]`). -cert | No | String | Path to the security certificate (for example, `"config/root-ca.pem"`) if the cluster uses the OpenSearch Security plugin. -username | No | String | Username for HTTP basic authentication. -password | No | String | Password for HTTP basic authentication. -aws_sigv4 | No | Boolean | Default value is false. Whether to use AWS Identity and Access Management (IAM) signing to connect to an Amazon OpenSearch Service domain. For your access key, secret key, and optional session token, Data Prepper uses the default credential chain (environment variables, Java system properties, `~/.aws/credential`, etc.). -aws_region | No | String | The AWS region (for example, `"us-east-1"`) for the domain if you are connecting to Amazon OpenSearch Service. -aws_sts_role_arn | No | String | IAM role that the plugin uses to sign requests sent to Amazon OpenSearch Service. If this information is not provided, the plugin uses the default credentials. -[max_retries](#configure-max_retries) | No | Integer | The maximum number of times the OpenSearch sink should try to push data to the OpenSearch server before considering it to be a failure. Defaults to `Integer.MAX_VALUE`. If not provided, the sink will try to push data to the OpenSearch server indefinitely because the default value is high and exponential backoff would increase the waiting time before retry. -socket_timeout | No | Integer | The timeout, in milliseconds, waiting for data to return (or the maximum period of inactivity between two consecutive data packets). A timeout value of zero is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient would rely on operating system settings for managing socket timeouts. -connect_timeout | No | Integer | The timeout in milliseconds used when requesting a connection from the connection manager. A timeout value of zero is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient would rely on operating system settings for managing connection timeouts. -insecure | No | Boolean | Whether or not to verify SSL certificates. If set to true, certificate authority (CA) certificate verification is disabled and insecure HTTP requests are sent instead. Default value is `false`. -proxy | No | String | The address of a [forward HTTP proxy server](https://en.wikipedia.org/wiki/Proxy_server). The format is "<host name or IP>:<port>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Port number cannot be omitted. -index | Conditionally | String | Name of the export index. Applicable and required only when the `index_type` is `custom`. -index_type | No | String | This index type tells the Sink plugin what type of data it is handling. Valid values: `custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management-disabled`. Default value is `custom`. -template_type | No | String | Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. The `index-template` option uses composable [index templates]({{site.url}}{{site.baseurl}}/opensearch/index-templates/) which are available through OpenSearch's `_index_template` API. Composable index types offer more flexibility than the default and are necessary when an OpenSearch cluster has already existing index templates. Composable templates are available for all versions of OpenSearch and some later versions of Elasticsearch. When `distribution_version` is set to `es6`, Data Prepper enforces the `template_type` as `v1`. -template_file | No | String | The path to a JSON [index template]({{site.url}}{{site.baseurl}}/opensearch/index-templates/) file such as `/your/local/template-file.json` when `index_type` is set to `custom`. For an example template file, see [otel-v1-apm-span-index-template.json](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/resources/otel-v1-apm-span-index-template.json). If you supply a template file it must match the template format specified by the `template_type` parameter. -document_id_field | No | String | The field from the source data to use for the OpenSearch document ID (for example, `"my-field"`) if `index_type` is `custom`. -dlq_file | No | String | The path to your preferred dead letter queue file (for example, `/your/local/dlq-file`). Data Prepper writes to this file when it fails to index a document on the OpenSearch cluster. -dlq | No | N/A | DLQ configurations. See [Dead Letter Queues]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/dlq/) for details. If the `dlq_file` option is also available, the sink will fail. -bulk_size | No | Integer (long) | The maximum size (in MiB) of bulk requests sent to the OpenSearch cluster. Values below 0 indicate an unlimited size. If a single document exceeds the maximum bulk request size, Data Prepper sends it individually. Default value is 5. -ism_policy_file | No | String | The absolute file path for an ISM (Index State Management) policy JSON file. This policy file is effective only when there is no built-in policy file for the index type. For example, `custom` index type is currently the only one without a built-in policy file, thus it would use the policy file here if it's provided through this parameter. For more information, see [ISM policies]({{site.url}}{{site.baseurl}}/im-plugin/ism/policies/). -number_of_shards | No | Integer | The number of primary shards that an index should have on the destination OpenSearch server. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. If this parameter is set, it would override the value in index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/). -number_of_replicas | No | Integer | The number of replica shards each primary shard should have on the destination OpenSearch server. For example, if you have 4 primary shards and set number_of_replicas to 3, the index has 12 replica shards. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. If this parameter is set, it would override the value in index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/). -distribution_version | No | String | Indicates whether the sink backend version is Elasticsearch 6 or later. `es6` represents Elasticsearch 6. `default` represents the latest compatible backend version, such as Elasticsearch 7.x, OpenSearch 1.x, or OpenSearch 2.x. Default is `default`. -enable_request_compression | No | Boolean | Whether to enable compression when sending requests to OpenSearch. When `distribution_version` is set to `es6`, default is `false`. For all other distribution versions, default is `true`. -serverless | No | Boolean | Determines whether the OpenSearch backend is Amazon OpenSearch Serverless. Set this value to `true` when the destination for the `opensearch` sink is an Amazon OpenSearch Serverless collection. Default is `false`. -serverless_options | No | Object | The network configuration options available when the backend of the `opensearch` sink is set to Amazon OpenSearch Serverless. For more information, see [Serverless options](#serverless-options). - -### Serverless options +`region` | No | String | The AWS Region to use for credentials. Defaults to [standard SDK behavior to determine the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). +`sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to `null`, which will use [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). +`sts_header_overrides` | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. +`sts_external_id` | No | String | The external ID to attach to AssumeRole requests from AWS STS. +`serverless` | No | Boolean | **Deprecated in Data Prepper 2.7. Use this option with the `aws` configuration instead.** Determines whether the OpenSearch backend is Amazon OpenSearch Serverless. Set this value to `true` when the destination for the `opensearch` sink is an Amazon OpenSearch Serverless collection. Default is `false`. +`serverless_options` | No | Object | **Deprecated in Data Prepper 2.7. Use this option with the `aws` configuration instead.** The network configuration options available when the backend of the `opensearch` sink is set to Amazon OpenSearch Serverless. For more information, see [Serverless options](#serverless-options). + + +## actions + + +The following options can be used inside the `actions` option. + +Option | Required | Type | Description +:--- |:---| :--- | :--- +`type` | Yes | String | The type of bulk action to use if the `when` condition evaluates to true. Must be either `create`, `index`, `update`, `upsert`, or `delete`. +`when` | No | String | A [Data Prepper expression]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/expression-syntax/) that conditionally evaluates whether an event will be sent to OpenSearch using the bulk action configured in `type`. When empty, the bulk action will be chosen automatically when the event is sent to OpenSearch. + + +## Serverless options The following options can be used in the `serverless_options` object. Option | Required | Type | Description :--- | :--- | :---| :--- -network_policy_name | Yes | String | The name of the network policy to create. -collection_name | Yes | String | The name of the Amazon OpenSearch Serverless collection to configure. -vpce_id | Yes | String | The virtual private cloud (VPC) endpoint to which the source connects. +`network_policy_name` | Yes | String | The name of the network policy to create. +`collection_name` | Yes | String | The name of the Amazon OpenSearch Serverless collection to configure. +`vpce_id` | Yes | String | The virtual private cloud (VPC) endpoint to which the source connects. ### Configure max_retries @@ -191,7 +228,6 @@ If your domain uses a master user in the internal user database, specify the mas sink: opensearch: hosts: ["https://your-fgac-amazon-opensearch-service-endpoint"] - aws_sigv4: false username: "master-username" password: "master-password" ``` @@ -302,3 +338,53 @@ log-pipeline: sts_role_arn: "arn:aws:iam:::role/PipelineRole" region: "us-east-1" ``` + +### Example with template_content and actions + +The following example pipeline contains both `template_content` and a list of conditional `actions`: + +```yaml +log-pipeline: + source: + http: + processor: + - date: + from_time_received: true + destination: "@timestamp" + sink: + - opensearch: + hosts: [ "https://" ] + index: "my-serverless-index" + template_type: index-template + template_content: > + { + "template" : { + "mappings" : { + "properties" : { + "Data" : { + "type" : "binary" + }, + "EncodedColors" : { + "type" : "binary" + }, + "Type" : { + "type" : "keyword" + }, + "LargeDouble" : { + "type" : "double" + } + } + } + } + } + # index is the default case + actions: + - type: "delete" + when: '/operation == "delete"' + - type: "update" + when: '/operation == "update"' + - type: "index" + aws: + sts_role_arn: "arn:aws:iam:::role/PipelineRole" + region: "us-east-1" +``` diff --git a/_data-prepper/pipelines/configuration/sinks/s3.md b/_data-prepper/pipelines/configuration/sinks/s3.md index cb881e814a..c752bf6b3d 100644 --- a/_data-prepper/pipelines/configuration/sinks/s3.md +++ b/_data-prepper/pipelines/configuration/sinks/s3.md @@ -8,7 +8,22 @@ nav_order: 55 # s3 -The `s3` sink saves batches of events to [Amazon Simple Storage Service (Amazon S3)](https://aws.amazon.com/s3/) objects. +The `s3` sink saves and writes batches of Data Prepper events to Amazon Simple Storage Service (Amazon S3) objects. The configured `codec` determines how the `s3` sink serializes the data into Amazon S3. + +The `s3` sink uses the following format when batching events: + +``` +${pathPrefix}events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}-${currentTimeInNanos}-${uniquenessId}.${codecSuppliedExtension} +``` + +When a batch of objects is written to S3, the objects are formatted similarly to the following: + +``` +my-logs/2023/06/09/06/events-2023-06-09T06-00-01-1686290401871214927-ae15b8fa-512a-59c2-b917-295a0eff97c8.json +``` + + +For more information about how to configure an object, see the [Object key](#object-key-configuration) section. ## Usage @@ -22,14 +37,12 @@ pipeline: aws: region: us-east-1 sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper - sts_header_overrides: max_retries: 5 - bucket: - name: bucket_name - object_key: - path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/ + bucket: mys3bucket + object_key: + path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/ threshold: - event_count: 2000 + event_count: 10000 maximum_size: 50mb event_collect_timeout: 15s codec: @@ -37,17 +50,37 @@ pipeline: buffer_type: in_memory ``` +## IAM permissions + +In order to use the `s3` sink, configure AWS Identity and Access Management (IAM) to grant Data Prepper permissions to write to Amazon S3. You can use a configuration similar to the following JSON configuration: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "s3-access", + "Effect": "Allow", + "Action": [ + "s3:PutObject" + ], + "Resource": "arn:aws:s3:::/*" + } + ] +} +``` + ## Configuration Use the following options when customizing the `s3` sink. Option | Required | Type | Description :--- | :--- | :--- | :--- -`bucket` | Yes | String | The object from which the data is retrieved and then stored. The `name` must match the name of your object store. -`codec` | Yes | [Buffer type](#buffer-type) | Determines the buffer type. +`bucket` | Yes | String | The name of the S3 bucket to which the sink writes. +`codec` | Yes | [Codec](#codec) | The codec that determines how the data is serialized in the S3 object. `aws` | Yes | AWS | The AWS configuration. See [aws](#aws) for more information. `threshold` | Yes | [Threshold](#threshold-configuration) | Configures when to write an object to S3. -`object_key` | No | Sets the `path_prefix` and the `file_pattern` of the object store. Defaults to the S3 object `events-%{yyyy-MM-dd'T'hh-mm-ss}` found inside the root directory of the bucket. +`object_key` | No | [Object key](#object-key-configuration) | Sets the `path_prefix` of the object in S3. Defaults to the S3 object `events-%{yyyy-MM-dd'T'hh-mm-ss}` found in the root directory of the bucket. `compression` | No | String | The compression algorithm to apply: `none`, `gzip`, or `snappy`. Default is `none`. `buffer_type` | No | [Buffer type](#buffer-type) | Determines the buffer type. `max_retries` | No | Integer | The maximum number of times a single request should retry when ingesting data to S3. Defaults to `5`. @@ -59,33 +92,34 @@ Option | Required | Type | Description `region` | No | String | The AWS Region to use for credentials. Defaults to [standard SDK behavior to determine the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). `sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to `null`, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). `sts_header_overrides` | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. -`sts_external_id` | No | String | The external ID to attach to AssumeRole requests from AWS STS. +`sts_external_id` | No | String | An STS external ID used when Data Prepper assumes the role. For more information, see the `ExternalId` documentation in the [STS AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) API reference. + ## Threshold configuration -Use the following options to set ingestion thresholds for the `s3` sink. +Use the following options to set ingestion thresholds for the `s3` sink. When any of these conditions are met, Data Prepper will write events to an S3 object. Option | Required | Type | Description :--- | :--- | :--- | :--- -`event_count` | Yes | Integer | The maximum number of events the S3 bucket can ingest. -`maximum_size` | Yes | String | The maximum number of bytes that the S3 bucket can ingest after compression. Defaults to `50mb`. -`event_collect_timeout` | Yes | String | Sets the time period during which events are collected before ingestion. All values are strings that represent duration, either an ISO_8601 notation string, such as `PT20.345S`, or a simple notation, such as `60s` or `1500ms`. +`event_count` | Yes | Integer | The number of Data Prepper events to accumulate before writing an object to S3. +`maximum_size` | No | String | The maximum number of bytes to accumulate before writing an object to S3. Default is `50mb`. +`event_collect_timeout` | Yes | String | The maximum amount of time before Data Prepper writes an event to S3. The value should be either an ISO-8601 duration, such as `PT2M30S`, or a simple notation, such as `60s` or `1500ms`. ## Buffer type -`buffer_type` is an optional configuration that records stored events temporarily before flushing them into an S3 bucket. The default value is `in_memory`. Use one of the following options: +`buffer_type` is an optional configuration that determines how Data Prepper temporarily stores data before writing an object to S3. The default value is `in_memory`. Use one of the following options: - `in_memory`: Stores the record in memory. -- `local_file`: Flushes the record into a file on your machine. +- `local_file`: Flushes the record into a file on your local machine. This uses your machine's temporary directory. - `multipart`: Writes using the [S3 multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html). Every 10 MB is written as a part. ## Object key configuration Option | Required | Type | Description :--- | :--- | :--- | :--- -`path_prefix` | Yes | String | The S3 key prefix path to use. Accepts date-time formatting. For example, you can use `%{yyyy}/%{MM}/%{dd}/%{HH}/` to create hourly folders in S3. By default, events write to the root of the bucket. +`path_prefix` | No | String | The S3 key prefix path to use for objects written to S3. Accepts date-time formatting. For example, you can use `%{yyyy}/%{MM}/%{dd}/%{HH}/` to create hourly folders in S3. The prefix path should end with `/`. By default, Data Prepper writes objects to the root of the S3 bucket. ## codec @@ -156,3 +190,49 @@ Option | Required | Type | Description `schema` | Yes | String | The Avro [schema declaration](https://avro.apache.org/docs/current/specification/#schema-declaration). Not required if `auto_schema` is set to true. `auto_schema` | No | Boolean | When set to `true`, automatically generates the Avro [schema declaration](https://avro.apache.org/docs/current/specification/#schema-declaration) from the first event. +### Setting a schema with Parquet + +The following example shows you how to configure the `s3` sink to write Parquet data into a Parquet file using a schema for [VPC Flow Logs](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html#flow-log-records): + +``` +pipeline: + ... + sink: + - s3: + aws: + region: us-east-1 + sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper + bucket: mys3bucket + object_key: + path_prefix: vpc-flow-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/ + codec: + parquet: + schema: > + { + "type" : "record", + "namespace" : "org.opensearch.dataprepper.examples", + "name" : "VpcFlowLog", + "fields" : [ + { "name" : "version", "type" : ["null", "string"]}, + { "name" : "srcport", "type": ["null", "int"]}, + { "name" : "dstport", "type": ["null", "int"]}, + { "name" : "accountId", "type" : ["null", "string"]}, + { "name" : "interfaceId", "type" : ["null", "string"]}, + { "name" : "srcaddr", "type" : ["null", "string"]}, + { "name" : "dstaddr", "type" : ["null", "string"]}, + { "name" : "start", "type": ["null", "int"]}, + { "name" : "end", "type": ["null", "int"]}, + { "name" : "protocol", "type": ["null", "int"]}, + { "name" : "packets", "type": ["null", "int"]}, + { "name" : "bytes", "type": ["null", "int"]}, + { "name" : "action", "type": ["null", "string"]}, + { "name" : "logStatus", "type" : ["null", "string"]} + ] + } + threshold: + event_count: 500000000 + maximum_size: 20mb + event_collect_timeout: PT15M + buffer_type: in_memory +``` + diff --git a/_data-prepper/pipelines/configuration/sources/dynamo-db.md b/_data-prepper/pipelines/configuration/sources/dynamo-db.md index 597e835151..f75489f103 100644 --- a/_data-prepper/pipelines/configuration/sources/dynamo-db.md +++ b/_data-prepper/pipelines/configuration/sources/dynamo-db.md @@ -31,6 +31,7 @@ cdc-pipeline: s3_prefix: "myprefix" stream: start_position: "LATEST" # Read latest data from streams (Default) + view_on_remove: NEW_IMAGE aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role" @@ -84,12 +85,112 @@ Option | Required | Type | Description The following option lets you customize how the pipeline reads events from the DynamoDB table. -Option | Required | Type | Description +Option | Required | Type | Description :--- | :--- | :--- | :--- `start_position` | No | String | The position from where the source starts reading stream events when the DynamoDB stream option is enabled. `LATEST` starts reading events from the most recent stream record. +`view_on_remove` | No | Enum | The [stream record view](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to use for REMOVE events from DynamoDB streams. Must be either `NEW_IMAGE` or `OLD_IMAGE` . Defaults to `NEW_IMAGE`. If the `OLD_IMAGE` option is used and the old image can not be found, the source will find the `NEW_IMAGE`. + +## Exposed metadata attributes + +The following metadata will be added to each event that is processed by the `dynamodb` source. These metadata attributes can be accessed using the [expression syntax `getMetadata` function](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/#getmetadata). + +* `primary_key`: The primary key of the DynamoDB item. For tables that only contain a partition key, this value provides the partition key. For tables that contain both a partition and sort key, the `primary_key` attribute will be equal to the partition and sort key, separated by a `|`, for example, `partition_key|sort_key`. +* `partition_key`: The partition key of the DynamoDB item. +* `sort_key`: The sort key of the DynamoDB item. This will be null if the table does not contain a sort key. +* `dynamodb_timestamp`: The timestamp of the DynamoDB item. This will be the export time for export items and the DynamoDB stream event time for stream items. This timestamp is used by sinks to emit an `EndtoEndLatency` metric for DynamoDB stream events that tracks the latency between a change occurring in the DynamoDB table and that change being applied to the sink. +* `document_version`: Uses the `dynamodb_timestamp` to modify break ties between stream items that are received in the same second. Recommend for use with the `opensearch` sink's `document_version` setting. +* `opensearch_action`: A default value for mapping DynamoDB event actions to OpenSearch actions. This action will be `index` for export items, and `INSERT` or `MODIFY` for stream events, and `REMOVE` stream events when the OpenSearch action is `delete`. +* `dynamodb_event_name`: The exact event type for the item. Will be `null` for export items and either `INSERT`, `MODIFY`, or `REMOVE` for stream events. +* `table_name`: The name of the DynamoDB table that an event came from. + + +## Permissions + +The following are the minimum required permissions for running DynamoDB as a source: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "allowDescribeTable", + "Effect": "Allow", + "Action": [ + "dynamodb:DescribeTable" + ], + "Resource": [ + "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table" + ] + }, + { + "Sid": "allowRunExportJob", + "Effect": "Allow", + "Action": [ + "dynamodb:DescribeContinuousBackups", + "dynamodb:ExportTableToPointInTime" + ], + "Resource": [ + "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table" + ] + }, + { + "Sid": "allowCheckExportjob", + "Effect": "Allow", + "Action": [ + "dynamodb:DescribeExport" + ], + "Resource": [ + "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/export/*" + ] + }, + { + "Sid": "allowReadFromStream", + "Effect": "Allow", + "Action": [ + "dynamodb:DescribeStream", + "dynamodb:GetRecords", + "dynamodb:GetShardIterator" + ], + "Resource": [ + "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/stream/*" + ] + }, + { + "Sid": "allowReadAndWriteToS3ForExport", + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:AbortMultipartUpload", + "s3:PutObject", + "s3:PutObjectAcl" + ], + "Resource": [ + "arn:aws:s3:::my-bucket/*" + ] + } + ] +} +``` + +When performing an export, the `"Sid": "allowReadFromStream"` section is not required. If only reading from DynamoDB streams, the +`"Sid": "allowReadAndWriteToS3ForExport"`, `"Sid": "allowCheckExportjob"`, and ` "Sid": "allowRunExportJob"` sections are not required. + +## Metrics +The `dynamodb` source includes the following metrics. +### Counters +* `exportJobSuccess`: The number of export jobs that have been submitted successfully. +* `exportJobFailure`: The number of export job submission attempts that have failed. +* `exportS3ObjectsTotal`: The total number of export data files found in S3. +* `exportS3ObjectsProcessed`: The total number of export data files that have been processed successfully from S3. +* `exportRecordsTotal`: The total number of records found in the export. +* `exportRecordsProcessed`: The total number of export records that have been processed successfully. +* `exportRecordsProcessingErrors`: The number of export record processing errors. +* `changeEventsProcessed`: The number of change events processed from DynamoDB streams. +* `changeEventsProcessingErrors`: The number of processing errors for change events from DynamoDB streams. +* `shardProgress`: The incremented shard progress when DynamoDB streams are being read correctly. This being`0` for any significant amount of time means there is a problem with the pipeline that has streams enabled. diff --git a/_data-prepper/pipelines/configuration/sources/s3.md b/_data-prepper/pipelines/configuration/sources/s3.md index 7dc31caade..e88ab2eb02 100644 --- a/_data-prepper/pipelines/configuration/sources/s3.md +++ b/_data-prepper/pipelines/configuration/sources/s3.md @@ -8,7 +8,10 @@ nav_order: 20 # s3 source -`s3` is a source plugin that reads events from [Amazon Simple Storage Service (Amazon S3)](https://aws.amazon.com/s3/) objects. It requires an [Amazon Simple Queue Service (Amazon SQS)](https://aws.amazon.com/sqs/) queue that receives [S3 Event Notifications](https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html). After Amazon SQS is configured, the `s3` source receives messages from Amazon SQS. When the SQS message indicates that an S3 object was created, the `s3` source loads the S3 objects and then parses them using the configured [codec](#codec). You can also configure the `s3` source to use [Amazon S3 Select](https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html) instead of Data Prepper to parse S3 objects. +`s3` is a source plugin that reads events from [Amazon Simple Storage Service (Amazon S3)](https://aws.amazon.com/s3/) objects. You can configure the source to either use an [Amazon Simple Queue Service (Amazon SQS)](https://aws.amazon.com/sqs/) queue or scan an S3 bucket: + +- To use Amazon SQS notifications, configure S3 event notifications on your S3 bucket. After Amazon SQS is configured, the `s3` source receives messages from Amazon SQS. When the SQS message indicates that an S3 object has been created, the `s3` source loads the S3 objects and then parses them using the configured [codec](#codec). +- To use an S3 bucket, configure the `s3` source to use Amazon S3 Select instead of Data Prepper to parse S3 objects. ## IAM permissions @@ -86,19 +89,26 @@ Option | Required | Type | Description :--- | :--- | :--- | :--- `notification_type` | Yes | String | Must be `sqs`. `notification_source` | No | String | Determines how notifications are received by SQS. Must be `s3` or `eventbridge`. `s3` represents notifications that are directly sent from Amazon S3 to Amazon SQS or fanout notifications from Amazon S3 to Amazon Simple Notification Service (Amazon SNS) to Amazon SQS. `eventbridge` represents notifications from [Amazon EventBridge](https://aws.amazon.com/eventbridge/) and [Amazon Security Lake](https://aws.amazon.com/security-lake/). Default is `s3`. -`compression` | No | String | The compression algorithm to apply: `none`, `gzip`, or `automatic`. Default is `none`. +`compression` | No | String | The compression algorithm to apply: `none`, `gzip`, `snappy`, or `automatic`. Default is `none`. `codec` | Yes | Codec | The [codec](#codec) to apply. `sqs` | Yes | SQS | The SQS configuration. See [sqs](#sqs) for more information. `aws` | Yes | AWS | The AWS configuration. See [aws](#aws) for more information. `on_error` | No | String | Determines how to handle errors in Amazon SQS. Can be either `retain_messages` or `delete_messages`. `retain_messages` leaves the message in the Amazon SQS queue and tries to send the message again. This is recommended for dead-letter queues. `delete_messages` deletes failed messages. Default is `retain_messages`. -buffer_timeout | No | Duration | The amount of time allowed for writing events to the Data Prepper buffer before timeout occurs. Any events that the Amazon S3 source cannot write to the buffer during the set amount of time are discarded. Default is `10s`. +`buffer_timeout` | No | Duration | The amount of time allowed for writing events to the Data Prepper buffer before timeout occurs. Any events that the Amazon S3 source cannot write to the buffer during the specified amount of time are discarded. Default is `10s`. `records_to_accumulate` | No | Integer | The number of messages that accumulate before being written to the buffer. Default is `100`. `metadata_root_key` | No | String | The base key for adding S3 metadata to each event. The metadata includes the key and bucket for each S3 object. Default is `s3/`. +`default_bucket_owner` | No | String | The AWS account ID for the owner of an S3 bucket. For more information, see [Cross-account S3 access](#s3_bucket_ownership). +`bucket_owners` | No | Map | A map of bucket names that includes the IDs of the accounts that own the buckets. For more information, see [Cross-account S3 access](#s3_bucket_ownership). `disable_bucket_ownership_validation` | No | Boolean | When `true`, the S3 source does not attempt to validate that the bucket is owned by the expected account. The expected account is the same account that owns the Amazon SQS queue. Default is `false`. `acknowledgments` | No | Boolean | When `true`, enables `s3` sources to receive [end-to-end acknowledgments]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/pipelines#end-to-end-acknowledgments) when events are received by OpenSearch sinks. `s3_select` | No | [s3_select](#s3_select) | The Amazon S3 Select configuration. `scan` | No | [scan](#scan) | The S3 scan configuration. `delete_s3_objects_on_read` | No | Boolean | When `true`, the S3 scan attempts to delete S3 objects after all events from the S3 object are successfully acknowledged by all sinks. `acknowledgments` should be enabled when deleting S3 objects. Default is `false`. +<<<<<<< HEAD +======= +`workers` | No | Integer | Configures the number of worker threads that the source uses to read data from S3. Leaving this value at the default unless your S3 objects are less than 1MB. Performance may decrease for larger S3 objects. This setting only affects SQS-based sources. Default is `1`. + +>>>>>>> 48651b0d (Data Prepper 2.7 documentation (#6763)) ## sqs @@ -112,7 +122,7 @@ Option | Required | Type | Description `visibility_timeout` | No | Duration | The visibility timeout to apply to messages read from the Amazon SQS queue. This should be set to the amount of time that Data Prepper may take to read all the S3 objects in a batch. Default is `30s`. `wait_time` | No | Duration | The amount of time to wait for long polling on the Amazon SQS API. Default is `20s`. `poll_delay` | No | Duration | A delay placed between the reading and processing of a batch of Amazon SQS messages and making a subsequent request. Default is `0s`. -`visibility_duplication_protection` | No | Boolean | If set to `true`, Data Prepper attempts to avoid duplicate processing by extending the visibility timeout of SQS messages. Until the data reaches the sink, Data Prepper will regularly call `ChangeMessageVisibility` to avoid reading the S3 object again. To use this feature, you need to grant permissions to `ChangeMessageVisibility` on the IAM role. Default is `false`. +`visibility_duplication_protection` | No | Boolean | If set to `true`, Data Prepper attempts to avoid duplicate processing by extending the visibility timeout of SQS messages. Until the data reaches the sink, Data Prepper will regularly call `ChangeMessageVisibility` to avoid rereading of the S3 object. To use this feature, you need to grant permissions to `sqs:ChangeMessageVisibility` on the IAM role. Default is `false`. `visibility_duplicate_protection_timeout` | No | Duration | Sets the maximum total length of time that a message will not be processed when using `visibility_duplication_protection`. Defaults to two hours. @@ -123,6 +133,7 @@ Option | Required | Type | Description `region` | No | String | The AWS Region to use for credentials. Defaults to [standard SDK behavior to determine the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). `sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to `null`, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). `aws_sts_header_overrides` | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. +`sts_external_id` | No | String | An STS external ID used when Data Prepper assumes the STS role. For more information, see the `ExternalID` documentation in the [STS AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) API reference. ## codec @@ -154,9 +165,6 @@ Option | Required | Type | Description `header` | No | String list | The header containing the column names used to parse CSV data. `detect_header` | No | Boolean | Whether the first line of the Amazon S3 object should be interpreted as a header. Default is `true`. - - - ## Using `s3_select` with the `s3` source When configuring `s3_select` to parse Amazon S3 objects, use the following options: @@ -198,16 +206,18 @@ Option | Required | Type | Description `start_time` | No | String | The time from which to start scanning objects modified after the given `start_time`. This should follow [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, for example, `023-01-23T10:00:00`. If `end_time` is configured along with `start_time`, all objects after `start_time` and before `end_time` will be processed. `start_time` and `range` cannot be used together. `end_time` | No | String | The time after which no objects will be scanned after the given `end_time`. This should follow [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, for example, `023-01-23T10:00:00`. If `start_time` is configured along with `end_time`, all objects after `start_time` and before `end_time` will be processed. `end_time` and `range` cannot be used together. `range` | No | String | The time range from which objects are scanned from all buckets. Supports ISO_8601 notation strings, such as `PT20.345S` or `PT15M`, and notation strings for seconds (`60s`) and milliseconds (`1600ms`). `start_time` and `end_time` cannot be used with `range`. Range `P12H` scans all the objects modified in the last 12 hours from the time pipeline started. -`buckets` | Yes | List | A list of [buckets](#bucket) to scan. +`buckets` | Yes | List | A list of [scan buckets](#scan-bucket) to scan. `scheduling` | No | List | The configuration for scheduling periodic scans on all buckets. `start_time`, `end_time` and `range` can not be used if scheduling is configured. -### bucket + +### scan bucket + Option | Required | Type | Description :--- | :--- |:-----| :--- `bucket` | Yes | Map | Provides options for each bucket. -You can configure the following options inside the [bucket](#bucket) setting. +You can configure the following options in the `bucket` setting map. Option | Required | Type | Description :--- | :--- | :--- | :--- @@ -244,13 +254,17 @@ The `s3` source includes the following metrics: * `s3ObjectsNotFound`: The number of S3 objects that the `s3` source failed to read due to an S3 "Not Found" error. These are also counted toward `s3ObjectsFailed`. * `s3ObjectsAccessDenied`: The number of S3 objects that the `s3` source failed to read due to an "Access Denied" or "Forbidden" error. These are also counted toward `s3ObjectsFailed`. * `s3ObjectsSucceeded`: The number of S3 objects that the `s3` source successfully read. +* `s3ObjectNoRecordsFound`: The number of S3 objects that resulted in 0 records being added to the buffer by the `s3` source. +* `s3ObjectsDeleted`: The number of S3 objects deleted by the `s3` source. +* `s3ObjectsDeleteFailed`: The number of S3 objects that the `s3` source failed to delete. +* `s3ObjectsEmpty`: The number of S3 objects that are considered empty because they have a size of `0`. These objects will be skipped by the `s3` source. * `sqsMessagesReceived`: The number of Amazon SQS messages received from the queue by the `s3` source. * `sqsMessagesDeleted`: The number of Amazon SQS messages deleted from the queue by the `s3` source. * `sqsMessagesFailed`: The number of Amazon SQS messages that the `s3` source failed to parse. -* `s3ObjectNoRecordsFound` -- The number of S3 objects that resulted in 0 records added to the buffer by the `s3` source. * `sqsMessagesDeleteFailed` -- The number of SQS messages that the `s3` source failed to delete from the SQS queue. -* `s3ObjectsDeleted` -- The number of S3 objects deleted by the `s3` source. -* `s3ObjectsDeleteFailed` -- The number of S3 objects that the `s3` source failed to delete. +* `sqsVisibilityTimeoutChangedCount`: The number of times that the `s3` source changed the visibility timeout for an SQS message. This includes multiple visibility timeout changes on the same message. +* `sqsVisibilityTimeoutChangeFailedCount`: The number of times that the `s3` source failed to change the visibility timeout for an SQS message. This includes multiple visibility timeout change failures on the same message. +* `acknowledgementSetCallbackCounter`: The number of times that the `s3` source received an acknowledgment from Data Prepper. ### Timers diff --git a/_data-prepper/pipelines/expression-syntax.md b/_data-prepper/pipelines/expression-syntax.md index 8257ab8978..be0be6f792 100644 --- a/_data-prepper/pipelines/expression-syntax.md +++ b/_data-prepper/pipelines/expression-syntax.md @@ -230,7 +230,7 @@ The `length()` function takes one argument of the JSON pointer type and returns ### `hasTags()` -The `hastags()` function takes one or more string type arguments and returns `true` if all the arguments passed are present in an event's tags. When an argument does not exist in the event's tags, the function returns `false`. For example, if you use the expression `hasTags("tag1")` and the event contains `tag1`, Data Prepper returns `true`. If you use the expression `hasTags("tag2")` but the event only contains a `tag1` tag, Data Prepper returns `false`. +The `hasTags()` function takes one or more string type arguments and returns `true` if all of the arguments passed are present in an event's tags. When an argument does not exist in the event's tags, the function returns `false`. For example, if you use the expression `hasTags("tag1")` and the event contains `tag1`, Data Prepper returns `true`. If you use the expression `hasTags("tag2")` but the event only contains `tag1`, Data Prepper returns `false`. ### `getMetadata()` @@ -245,3 +245,21 @@ The `contains()` function takes two string arguments and determines whether eith The `cidrContains()` function takes two or more arguments. The first argument is a JSON pointer, which represents the key to the IP address that is checked. It supports both IPv4 and IPv6 addresses. Every argument that comes after the key is a string type that represents CIDR blocks that are checked against. If the IP address in the first argument is in the range of any of the given CIDR blocks, the function returns `true`. If the IP address is not in the range of the CIDR blocks, the function returns `false`. For example, `cidrContains(/sourceIp,"192.0.2.0/24","10.0.1.0/16")` will return `true` if the `sourceIp` field indicated in the JSON pointer has a value of `192.0.2.5`. + +### `join()` + +The `join()` function joins elements of a list to form a string. The function takes a JSON pointer, which represents the key to a list or a map where values are of the list type, and joins the lists as strings using commas (`,`), the default delimiter between strings. + +If `{"source": [1, 2, 3]}` is the input data, as shown in the following example: + + +```json +{"source": {"key1": [1, 2, 3], "key2": ["a", "b", "c"]}} +``` + +Then `join(/source)` will return `"1,2,3"` in the following format: + +```json +{"key1": "1,2,3", "key2": "a,b,c"} +``` +You can also specify a delimiter other than the default inside the expression. For example, `join("-", /source)` joins each `source` field using a hyphen (`-`) as the delimiter.