From c49136f4990338a42bc3eccac035a6e5f15232ea Mon Sep 17 00:00:00 2001 From: Adi Suresh Date: Tue, 13 Aug 2024 14:37:43 -0500 Subject: [PATCH] Add flags for Iceberg and Lake Formation and Security Lake as a data source type. (#2858) Previously, Iceberg catalog was set as the default catalog. This poses problems as the behavior to fall back to default Spark catalog is only correct in some versions of Iceberg. Rather than always opt into Iceberg, Iceberg should be an option. Additionally, the Lake Formation flag enabled Lake Formation for the EMR job. This did not work as expected because EMR system space does not work with Flint. Instead Lake Formation can be enabled using the Iceberg catalog implementation. This changes adds Security Lake as a data source type. Security Lake as a data source is simply specific options set on top of the base S3Glue data source. --------- Signed-off-by: Adi Suresh --- .../spark/data/constants/SparkConstants.java | 29 ++- .../parameter/SparkSubmitParameters.java | 4 + .../SparkSubmitParametersBuilder.java | 14 +- .../dispatcher/SparkQueryDispatcherTest.java | 59 +---- .../SparkSubmitParametersBuilderTest.java | 2 + ...3GlueDataSourceSparkParameterComposer.java | 84 ++++++- .../config/AsyncExecutorServiceModule.java | 6 +- .../AsyncQueryExecutorServiceSpec.java | 67 ++++- ...eDataSourceSparkParameterComposerTest.java | 230 +++++++++++++++++- .../sql/datasource/model/DataSourceType.java | 3 +- .../glue/GlueDataSourceFactory.java | 17 ++ .../glue/SecurityLakeDataSourceFactory.java | 57 +++++ .../glue/GlueDataSourceFactoryTest.java | 63 +++++ .../glue/SecurityLakeSourceFactoryTest.java | 141 +++++++++++ .../ppl/admin/connectors/s3glue_connector.rst | 7 +- .../connectors/security_lake_connector.rst | 78 ++++++ .../org/opensearch/sql/plugin/SQLPlugin.java | 2 + 17 files changed, 760 insertions(+), 103 deletions(-) create mode 100644 datasources/src/main/java/org/opensearch/sql/datasources/glue/SecurityLakeDataSourceFactory.java create mode 100644 datasources/src/test/java/org/opensearch/sql/datasources/glue/SecurityLakeSourceFactoryTest.java create mode 100644 docs/user/ppl/admin/connectors/security_lake_connector.rst diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 5b25bc175a..e87dbba03e 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -92,18 +92,35 @@ public class SparkConstants { public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL"; public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog"; + public static final String SPARK_CATALOG_CATALOG_IMPL = SPARK_CATALOG + ".catalog-impl"; + public static final String SPARK_CATALOG_CLIENT_REGION = SPARK_CATALOG + ".client.region"; + public static final String SPARK_CATALOG_CLIENT_FACTORY = SPARK_CATALOG + ".client.factory"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN = + SPARK_CATALOG + ".client.assume-role.arn"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION = + SPARK_CATALOG + ".client.assume-role.region"; + public static final String SPARK_CATALOG_LF_SESSION_TAG_KEY = + SPARK_CATALOG + ".client.assume-role.tags.LakeFormationAuthorizedCaller"; + public static final String SPARK_CATALOG_GLUE_ACCOUNT_ID = SPARK_CATALOG + ".glue.account-id"; + public static final String SPARK_CATALOG_GLUE_LF_ENABLED = + SPARK_CATALOG + ".glue.lakeformation-enabled"; + public static final String ICEBERG_SESSION_CATALOG = "org.apache.iceberg.spark.SparkSessionCatalog"; public static final String ICEBERG_SPARK_EXTENSION = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"; - public static final String ICEBERG_SPARK_RUNTIME_PACKAGE = - "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"; - public static final String SPARK_CATALOG_CATALOG_IMPL = - "spark.sql.catalog.spark_catalog.catalog-impl"; + public static final String ICEBERG_SPARK_JARS = + "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30"; public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog"; + public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY = + "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"; + public static final String ICEBERG_LF_CLIENT_FACTORY = + "org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory"; + // The following option is needed in Iceberg 1.5 when reading timestamp types that do not + // contain timezone in parquet files. The timezone is assumed to be GMT. + public static final String ICEBERG_TS_WO_TZ = + "spark.sql.iceberg.handle-timestamp-without-timezone"; - public static final String EMR_LAKEFORMATION_OPTION = - "spark.emr-serverless.lakeformation.enabled"; public static final String FLINT_ACCELERATE_USING_COVERING_INDEX = "spark.flint.optimizer.covering.enabled"; } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java index 2e142ed117..84fd49b712 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java @@ -30,6 +30,10 @@ public void deleteConfigItem(String key) { config.remove(key); } + public String getConfigItem(String key) { + return config.get(key); + } + @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java index 3fe7d99373..d9d5859f64 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java @@ -27,20 +27,13 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; import static org.opensearch.sql.spark.data.constants.SparkConstants.PPL_STANDALONE_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE; @@ -71,7 +64,6 @@ private void setDefaultConfigs() { setConfigItem( HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE); setConfigItem( SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE); @@ -85,12 +77,8 @@ private void setDefaultConfigs() { setConfigItem(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); setConfigItem(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); - setConfigItem( - SPARK_SQL_EXTENSIONS_KEY, - ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + setConfigItem(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); setConfigItem(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); - setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); - setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); } private void setConfigItem(String key, String value) { diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 592309cb75..ee840e8b4c 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -203,39 +203,6 @@ void testDispatchSelectQuery() { verifyNoInteractions(flintIndexMetadataService); } - @Test - void testDispatchSelectQueryWithLakeFormation() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); - } - @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); @@ -1085,7 +1052,6 @@ private String constructExpectedSparkSubmitParameterString(String query, String + getConfParam( "spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", "spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory", - "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", "spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT", "spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots", "spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/", @@ -1097,10 +1063,8 @@ private String constructExpectedSparkSubmitParameterString(String query, String "spark.datasource.flint.scheme=SCHEMA", "spark.datasource.flint.auth=basic", "spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", - "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", - "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", - "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", - "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog") + "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") + getConfParam("spark.flint.job.query=" + query) + (jobType != null ? getConfParam("spark.flint.job.type=" + jobType) : "") + getConfParam( @@ -1149,25 +1113,6 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() { .build(); } - private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() { - - Map properties = new HashMap<>(); - properties.put("glue.auth.type", "iam_role"); - properties.put( - "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); - properties.put( - "glue.indexstore.opensearch.uri", - "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); - properties.put("glue.indexstore.opensearch.auth", "awssigv4"); - properties.put("glue.indexstore.opensearch.region", "eu-west-1"); - properties.put("glue.lakeformation.enabled", "true"); - return new DataSourceMetadata.Builder() - .setName(MY_GLUE) - .setConnector(DataSourceType.S3GLUE) - .setProperties(properties) - .build(); - } - private DataSourceMetadata constructPrometheusDataSourceType() { return new DataSourceMetadata.Builder() .setName("my_prometheus") diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java index 8947cb61f7..8fb975d187 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.parameter; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -93,6 +94,7 @@ public void testOverrideConfigItem() { params.setConfigItem(SPARK_JARS_KEY, "Overridden"); String result = params.toString(); + assertEquals("Overridden", params.getConfigItem(SPARK_JARS_KEY)); assertTrue(result.contains(String.format("%s=Overridden", SPARK_JARS_KEY))); } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java index 26dbf3529a..189e140416 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java @@ -5,15 +5,16 @@ package org.opensearch.sql.spark.parameter; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ICEBERG_ENABLED; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_LAKEFORMATION_OPTION; import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_ACCELERATE_USING_COVERING_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DATA_SOURCE_KEY; @@ -25,19 +26,50 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_PPL_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_ASSUME_ROLE_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_LF_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_JARS; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_TS_WO_TZ; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_ACCOUNT_ID; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_LF_ENABLED; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_LF_SESSION_TAG_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; +import com.amazonaws.arn.Arn; import java.net.URI; import java.net.URISyntaxException; +import java.util.Optional; import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSetting; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +@RequiredArgsConstructor public class S3GlueDataSourceSparkParameterComposer implements DataSourceSparkParameterComposer { public static final String FLINT_BASIC_AUTH = "basic"; + public static final String FALSE = "false"; + public static final String TRUE = "true"; + + private final SparkExecutionEngineConfigClusterSettingLoader settingLoader; @Override public void compose( @@ -45,7 +77,16 @@ public void compose( SparkSubmitParameters params, DispatchQueryRequest dispatchQueryRequest, AsyncQueryRequestContext context) { - String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final Optional maybeClusterSettings = + settingLoader.load(); + if (!maybeClusterSettings.isPresent()) { + throw new RuntimeException("No cluster settings present"); + } + final SparkExecutionEngineConfigClusterSetting clusterSetting = maybeClusterSettings.get(); + final String region = clusterSetting.getRegion(); + + final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final String accountId = Arn.fromString(roleArn).getAccountId(); params.setConfigItem(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn); params.setConfigItem(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn); @@ -53,11 +94,40 @@ public void compose( params.setConfigItem("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); params.setConfigItem(FLINT_DATA_SOURCE_KEY, metadata.getName()); - final boolean lakeFormationEnabled = - BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); - params.setConfigItem(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled)); - params.setConfigItem( - FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled)); + final boolean icebergEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_ICEBERG_ENABLED)); + if (icebergEnabled) { + params.setConfigItem( + SPARK_JAR_PACKAGES_KEY, + params.getConfigItem(SPARK_JAR_PACKAGES_KEY) + "," + ICEBERG_SPARK_JARS); + params.setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); + params.setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); + params.setConfigItem( + SPARK_SQL_EXTENSIONS_KEY, + ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + + params.setConfigItem(SPARK_CATALOG_CLIENT_REGION, region); + params.setConfigItem(SPARK_CATALOG_GLUE_ACCOUNT_ID, accountId); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN, roleArn); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION, region); + params.setConfigItem(ICEBERG_TS_WO_TZ, TRUE); + + final boolean lakeFormationEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); + if (lakeFormationEnabled) { + final String sessionTag = metadata.getProperties().get(GLUE_LAKEFORMATION_SESSION_TAG); + if (StringUtils.isBlank(sessionTag)) { + throw new IllegalArgumentException(GLUE_LAKEFORMATION_SESSION_TAG + " is required"); + } + + params.setConfigItem(FLINT_ACCELERATE_USING_COVERING_INDEX, FALSE); + params.setConfigItem(SPARK_CATALOG_GLUE_LF_ENABLED, TRUE); + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_LF_CLIENT_FACTORY); + params.setConfigItem(SPARK_CATALOG_LF_SESSION_TAG_KEY, sessionTag); + } else { + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_ASSUME_ROLE_CLIENT_FACTORY); + } + } setFlintIndexStoreHost( params, diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 05f7d1095c..9cc69b2fb7 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -159,7 +159,11 @@ public FlintIndexStateModelService flintIndexStateModelService( public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider( Settings settings, SparkExecutionEngineConfigClusterSettingLoader clusterSettingLoader) { SparkParameterComposerCollection collection = new SparkParameterComposerCollection(); - collection.register(DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + collection.register( + DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); + collection.register( + DataSourceType.SECURITY_LAKE, + new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader)); return new SparkSubmitParametersBuilderProvider(collection); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index d8e3b80175..641b083d53 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -18,12 +18,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.net.URL; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -42,6 +47,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; @@ -58,6 +64,7 @@ import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.config.OpenSearchSparkSubmitParameterModifier; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.DatasourceEmbeddedQueryIdProvider; import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; @@ -100,6 +107,10 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { public static final String MYS3_DATASOURCE = "mys3"; public static final String MYGLUE_DATASOURCE = "my_glue"; + public static final String ACCOUNT_ID = "accountId"; + public static final String APPLICATION_ID = "appId"; + public static final String REGION = "us-west-2"; + public static final String ROLE_ARN = "roleArn"; protected ClusterService clusterService; protected org.opensearch.sql.common.setting.Settings pluginSettings; @@ -262,7 +273,13 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( SparkParameterComposerCollection sparkParameterComposerCollection = new SparkParameterComposerCollection(); sparkParameterComposerCollection.register( - DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + DataSourceType.S3GLUE, + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader())); + sparkParameterComposerCollection.register( + DataSourceType.SECURITY_LAKE, + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader())); SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider = new SparkSubmitParametersBuilderProvider(sparkParameterComposerCollection); QueryHandlerFactory queryHandlerFactory = @@ -372,14 +389,56 @@ public EMRServerlessClient getClient(String accountId) { public SparkExecutionEngineConfig sparkExecutionEngineConfig( AsyncQueryRequestContext asyncQueryRequestContext) { return SparkExecutionEngineConfig.builder() - .applicationId("appId") - .region("us-west-2") - .executionRoleARN("roleArn") + .applicationId(APPLICATION_ID) + .region(REGION) + .executionRoleARN(ROLE_ARN) .sparkSubmitParameterModifier(new OpenSearchSparkSubmitParameterModifier("")) .clusterName("myCluster") .build(); } + public static class TestSettings extends org.opensearch.sql.common.setting.Settings { + final Map values; + + public TestSettings() { + values = new HashMap<>(); + } + + /** Get Setting Value. */ + @Override + public T getSettingValue(Key key) { + return (T) values.get(key); + } + + @Override + public List getSettings() { + return values.keySet().stream().map(Key::getKeyValue).collect(Collectors.toList()); + } + + public void putSettingValue(Key key, T value) { + values.put(key, value); + } + } + + public SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APPLICATION_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final TestSettings settings = new TestSettings(); + settings.putSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG, jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + public void enableSession(boolean enabled) { // doNothing } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java index 55e62d52f0..3e12aa78d0 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java @@ -7,21 +7,28 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; @ExtendWith(MockitoExtension.class) @@ -33,18 +40,20 @@ class S3GlueDataSourceSparkParameterComposerTest { public static final String PASSWORD = "PASSWORD"; public static final String REGION = "REGION"; public static final String TRUE = "true"; - public static final String ROLE_ARN = "ROLE_ARN"; + public static final String ROLE_ARN = "arn:aws:iam::123456789012:role/ROLE_NAME"; + public static final String APP_ID = "APP_ID"; + public static final String CLUSTER_NAME = "CLUSTER_NAME"; + public static final String ACCOUNT_ID = "123456789012"; + public static final String SESSION_TAG = "SESSION_TAG"; private static final String COMMON_EXPECTED_PARAMS = " --class org.apache.spark.sql.FlintJob " + getConfList( - "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.hive.metastore.glue.role.arn=ROLE_ARN", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", "spark.flint.datasource.name=DATASOURCE_NAME", - "spark.emr-serverless.lakeformation.enabled=true", - "spark.flint.optimizer.covering.enabled=false", "spark.datasource.flint.host=test.host.com", "spark.datasource.flint.port=9200", "spark.datasource.flint.scheme=https"); @@ -57,7 +66,7 @@ public void testBasicAuth() { getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -79,7 +88,7 @@ public void testComposeWithSigV4Auth() { getDataSourceMetadata(AuthenticationType.AWSSIGV4AUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -99,7 +108,7 @@ public void testComposeWithNoAuth() { getDataSourceMetadata(AuthenticationType.NOAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -120,7 +129,8 @@ public void testComposeWithBadUri() { assertThrows( IllegalArgumentException.class, () -> - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -128,6 +138,174 @@ public void testComposeWithBadUri() { new NullAsyncQueryRequestContext())); } + @Test + public void testIcebergEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.jars.packages=package,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.sql.iceberg.handle-timestamp-without-timezone=true", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.AssumeRoleAwsClientFactory", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + sparkSubmitParameters.setConfigItem(SPARK_JAR_PACKAGES_KEY, "package"); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG, SESSION_TAG) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.jars.packages=package,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.sql.iceberg.handle-timestamp-without-timezone=true", + "spark.flint.optimizer.covering.enabled=false", + "spark.sql.catalog.spark_catalog.glue.lakeformation-enabled=true", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory", + "spark.sql.catalog.spark_catalog.client.assume-role.tags.LakeFormationAuthorizedCaller=SESSION_TAG", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + sparkSubmitParameters.setConfigItem(SPARK_JAR_PACKAGES_KEY, "package"); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabledNoSessionTag() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()); + assertThrows( + IllegalArgumentException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + + @Test + public void testNoClusterConfigAvailable() { + DataSourceMetadata dataSourceMetadata = + getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)).thenReturn(null); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + new SparkExecutionEngineConfigClusterSettingLoader(settings)); + + assertThrows( + RuntimeException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + private DataSourceMetadata getDataSourceMetadata( AuthenticationType authenticationType, String uri) { return new DataSourceMetadata.Builder() @@ -140,10 +318,20 @@ private DataSourceMetadata getDataSourceMetadata( .build(); } + private DataSourceMetadata getDataSourceMetadata(Map properties) { + return new DataSourceMetadata.Builder() + .setConnector(DataSourceType.S3GLUE) + .setName("DATASOURCE_NAME") + .setDescription("DESCRIPTION") + .setResultIndex("RESULT_INDEX") + .setDataSourceStatus(DataSourceStatus.ACTIVE) + .setProperties(properties) + .build(); + } + private Map getProperties(AuthenticationType authType, String uri) { return ImmutableMap.builder() .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) - .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, uri) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, authType.getName()) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) @@ -152,6 +340,26 @@ private Map getProperties(AuthenticationType authType, String ur .build(); } + private SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APP_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)) + .thenReturn(jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + private static String getConfList(String... params) { return Arrays.stream(params) .map(param -> String.format(" --conf %s ", param)) diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index c727c3c531..c74964fc00 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -15,12 +15,13 @@ public class DataSourceType { public static DataSourceType OPENSEARCH = new DataSourceType("OPENSEARCH"); public static DataSourceType SPARK = new DataSourceType("SPARK"); public static DataSourceType S3GLUE = new DataSourceType("S3GLUE"); + public static DataSourceType SECURITY_LAKE = new DataSourceType("SECURITY_LAKE"); // Map from uppercase DataSourceType name to DataSourceType object private static Map knownValues = new HashMap<>(); static { - register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE); + register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE, SECURITY_LAKE); } private final String name; diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java index e0c13ff005..11a33a2969 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -29,7 +31,9 @@ public class GlueDataSourceFactory implements DataSourceFactory { "glue.indexstore.opensearch.auth.password"; public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION = "glue.indexstore.opensearch.region"; + public static final String GLUE_ICEBERG_ENABLED = "glue.iceberg.enabled"; public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled"; + public static final String GLUE_LAKEFORMATION_SESSION_TAG = "glue.lakeformation.session_tag"; @Override public DataSourceType getDataSourceType() { @@ -76,5 +80,18 @@ private void validateGlueDataSourceConfiguration(Map dataSourceM DatasourceValidationUtils.validateHost( dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_URI), pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + + // validate Lake Formation config + if (BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_ENABLED))) { + if (!BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_ICEBERG_ENABLED))) { + throw new IllegalArgumentException( + "Lake Formation can only be enabled when Iceberg is enabled."); + } + + if (StringUtils.isBlank(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_SESSION_TAG))) { + throw new IllegalArgumentException( + "Lake Formation session tag must be specified when enabling Lake Formation"); + } + } } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/SecurityLakeDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/SecurityLakeDataSourceFactory.java new file mode 100644 index 0000000000..0f336a08d1 --- /dev/null +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/SecurityLakeDataSourceFactory.java @@ -0,0 +1,57 @@ +package org.opensearch.sql.datasources.glue; + +import java.util.Map; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +public class SecurityLakeDataSourceFactory extends GlueDataSourceFactory { + + private final Settings pluginSettings; + + public static final String TRUE = "true"; + + public SecurityLakeDataSourceFactory(final Settings pluginSettings) { + super(pluginSettings); + this.pluginSettings = pluginSettings; + } + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.SECURITY_LAKE; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + validateProperties(metadata.getProperties()); + metadata.getProperties().put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE); + metadata.getProperties().put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE); + return super.createDataSource(metadata); + } + + private void validateProperties(Map properties) { + // validate Lake Formation config + if (properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED) != null + && !BooleanUtils.toBoolean(properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED))) { + throw new IllegalArgumentException( + GlueDataSourceFactory.GLUE_ICEBERG_ENABLED + + " cannot be false when using Security Lake data source."); + } + + if (properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED) != null + && !BooleanUtils.toBoolean( + properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED))) { + throw new IllegalArgumentException( + GLUE_LAKEFORMATION_ENABLED + " cannot be false when using Security Lake data source."); + } + + if (StringUtils.isBlank(properties.get(GLUE_LAKEFORMATION_SESSION_TAG))) { + throw new IllegalArgumentException( + GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG + + " must be specified when using Security Lake data source"); + } + } +} diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java index 52f8ec9cd1..2833717265 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java @@ -210,4 +210,67 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { Assertions.assertEquals( "Invalid flint host in properties.", illegalArgumentException.getMessage()); } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoIceberg() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "false"); + properties.put("glue.lakeformation.session_tag", "session_tag"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation can only be enabled when Iceberg is enabled.", + illegalArgumentException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoSessionTags() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "true"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation session tag must be specified when enabling Lake Formation", + illegalArgumentException.getMessage()); + } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/SecurityLakeSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/SecurityLakeSourceFactoryTest.java new file mode 100644 index 0000000000..561d549826 --- /dev/null +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/SecurityLakeSourceFactoryTest.java @@ -0,0 +1,141 @@ +package org.opensearch.sql.datasources.glue; + +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +@ExtendWith(MockitoExtension.class) +public class SecurityLakeSourceFactoryTest { + + @Mock private Settings settings; + + @Test + void testGetConnectorType() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + Assertions.assertEquals( + DataSourceType.SECURITY_LAKE, securityLakeDataSourceFactory.getDataSourceType()); + } + + @Test + @SneakyThrows + void testCreateSecurityLakeDataSource() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.session_tag", "session_tag"); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + DataSource dataSource = securityLakeDataSourceFactory.createDataSource(metadata); + Assertions.assertEquals(DataSourceType.SECURITY_LAKE, dataSource.getConnectorType()); + + Assertions.assertEquals( + properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED), + SecurityLakeDataSourceFactory.TRUE); + Assertions.assertEquals( + properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED), + SecurityLakeDataSourceFactory.TRUE); + } + + @Test + @SneakyThrows + void testCreateSecurityLakeDataSourceIcebergCannotBeDisabled() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.iceberg.enabled", "false"); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> securityLakeDataSourceFactory.createDataSource(metadata)); + } + + @Test + @SneakyThrows + void testCreateSecurityLakeDataSourceLakeFormationCannotBeDisabled() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.iceberg.enabled", "true"); + properties.put("glue.lakeformation.enabled", "false"); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> securityLakeDataSourceFactory.createDataSource(metadata)); + } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoSessionTags() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.iceberg.enabled", "true"); + properties.put("glue.lakeformation.enabled", "true"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> securityLakeDataSourceFactory.createDataSource(metadata)); + } +} diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index 5e91df70e5..48f19a9d1e 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -42,7 +42,9 @@ Glue Connector Properties. * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] -* ``glue.lakeformation.enabled`` determines whether to enable lakeformation for queries. Default value is ``"false"`` if not specified +* ``glue.iceberg.enabled`` determines whether to enable Iceberg for the session. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.enabled`` determines whether to enable Lake Formation for queries when Iceberg is also enabled. If Iceberg is not enabled, then this property has no effect. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.session_tag`` what session tag to use when assuming the data source role. This property is required when both Iceberg and Lake Formation are enabled. Sample Glue dataSource configuration ======================================== @@ -71,8 +73,7 @@ Glue datasource configuration:: "glue.auth.role_arn": "role_arn", "glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200", "glue.indexstore.opensearch.auth" :"awssigv4", - "glue.indexstore.opensearch.auth.region" :"awssigv4", - "glue.lakeformation.enabled": "true" + "glue.indexstore.opensearch.auth.region" :"us-east-1" }, "resultIndex": "query_execution_result" }] diff --git a/docs/user/ppl/admin/connectors/security_lake_connector.rst b/docs/user/ppl/admin/connectors/security_lake_connector.rst new file mode 100644 index 0000000000..6afddca131 --- /dev/null +++ b/docs/user/ppl/admin/connectors/security_lake_connector.rst @@ -0,0 +1,78 @@ +.. highlight:: sh + +==================== +Security Lake Connector +==================== + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 1 + + +Introduction +============ + +Security Lake connector provides a way to query Security Lake tables. + +Required resources for Security Lake Connector +======================================== +* ``EMRServerless Spark Execution Engine Config Setting``: Since we execute s3Glue queries on top of spark execution engine, we require this configuration. + More details: `ExecutionEngine Config <../../../interfaces/asyncqueryinterface.rst#id2>`_ +* ``S3``: This is where the data lies. +* ``Glue``: Metadata store: Glue takes care of table metadata. +* ``Lake Formation``: AWS service that performs authorization on Security Lake tables +* ``Security Lake``: AWS service that orchestrates creation of S3 files, Glue tables, and Lake Formation permissions. +* ``Opensearch IndexStore``: Index for s3 data lies in opensearch and also acts as temporary buffer for query results. + +We currently only support emr-serverless as spark execution engine and Glue as metadata store. we will add more support in future. + +Glue Connector Properties. + +* ``resultIndex`` is a new parameter specific to glue connector. Stores the results of queries executed on the data source. If unavailable, it defaults to .query_execution_result. +* ``glue.auth.type`` [Required] + * This parameters provides the authentication type information required for execution engine to connect to glue. + * S3 Glue connector currently only supports ``iam_role`` authentication and the below parameters is required. + * ``glue.auth.role_arn`` +* ``glue.indexstore.opensearch.*`` [Required] + * This parameters provides the Opensearch domain host information for glue connector. This opensearch instance is used for writing index data back and also + * ``glue.indexstore.opensearch.uri`` [Required] + * ``glue.indexstore.opensearch.auth`` [Required] + * Accepted values include ["noauth", "basicauth", "awssigv4"] + * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` + * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` + * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] +* ``glue.lakeformation.session_tag`` [Required] + * What session tag to use when assuming the data source role. + +Sample Glue dataSource configuration +======================================== + +Glue datasource configuration:: + + [{ + "name" : "my_sl", + "connector": "security_lake", + "properties" : { + "glue.auth.type": "iam_role", + "glue.auth.role_arn": "role_arn", + "glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200", + "glue.indexstore.opensearch.auth" :"awssigv4", + "glue.indexstore.opensearch.auth.region" :"us-east-1", + "glue.lakeformation.session_tag": "sesson_tag" + }, + "resultIndex": "query_execution_result" + }] + +Sample Security Lake datasource queries APIS +===================================== + +Sample Queries + +* Select Query : ``select * from mysl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 limit 1`` +* Create Covering Index Query: ``create index srcip_time on mysl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 (src_endpoint.ip, time) WITH (auto_refresh=true)`` + +These queries would work only top of async queries. Documentation: `Async Query APIs <../../../interfaces/asyncqueryinterface.rst>`_ + +Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index a1b1e32955..971ef5e928 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -60,6 +60,7 @@ import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.datasources.glue.SecurityLakeDataSourceFactory; import org.opensearch.sql.datasources.model.transport.*; import org.opensearch.sql.datasources.rest.RestDataSourceQueryAction; import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; @@ -326,6 +327,7 @@ private DataSourceServiceImpl createDataSourceService() { .add(new PrometheusStorageFactory(pluginSettings)) .add(new SparkStorageFactory(this.client, pluginSettings)) .add(new GlueDataSourceFactory(pluginSettings)) + .add(new SecurityLakeDataSourceFactory(pluginSettings)) .build(), dataSourceMetadataStorage, dataSourceUserAuthorizationHelper);