From 111ba6ec55a7815ef89d2fb107f14f104f524a4a Mon Sep 17 00:00:00 2001
From: Prakhar Srivastava
Date: Wed, 11 Oct 2023 21:02:52 +0200
Subject: [PATCH] added yml config and better schema generation

---
 meltano.yml                         | 72 +++++++++++++++++++++--------
 target_s3/formats/format_parquet.py | 61 +++++++++++++-----------
 target_s3/sinks.py                  |  2 +-
 3 files changed, 89 insertions(+), 46 deletions(-)

diff --git a/meltano.yml b/meltano.yml
index b9df142..7b78dd3 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -1,25 +1,61 @@
 version: 1
 send_anonymous_usage_stats: false
-project_id: "target-s3"
+project_id: target-s3
 default_environment: test
 environments:
-  - name: test
+- name: test
 plugins:
   extractors: []
   loaders:
-    - name: "target-s3"
-      namespace: "target_s3"
-      pip_url: -e .
-      capabilities:
-        - about
-        - stream-maps
-        - record-flattening
-      config:
-        start_date: "2010-01-01T00:00:00Z"
-      settings:
-        # TODO: To configure using Meltano, declare settings and their types here:
-        - name: username
-        - name: password
-          kind: password
-        - name: start_date
-          value: "2010-01-01T00:00:00Z"
+  - name: target-s3
+    namespace: target_s3
+    pip_url: -e .
+    capabilities:
+    - about
+    - stream-maps
+    - record-flattening
+    settings:
+    - name: format.format_type
+    - name: format.format_parquet.validate
+      kind: boolean
+      value: false
+    - name: format.format_parquet.get_schema_from_tap
+      kind: boolean
+      value: false
+    - name: cloud_provider.cloud_provider_type
+      value: aws
+    - name: cloud_provider.aws.aws_access_key_id
+      kind: password
+    - name: cloud_provider.aws.aws_secret_access_key
+      kind: password
+    - name: cloud_provider.aws.aws_session_token
+      kind: password
+    - name: cloud_provider.aws.aws_region
+      kind: password
+    - name: cloud_provider.aws.aws_profile_name
+      kind: password
+    - name: cloud_provider.aws.aws_bucket
+      kind: password
+    - name: cloud_provider.aws.aws_endpoint_override
+    - name: prefix
+    - name: stream_name_path_override
+    - name: include_process_date
+      kind: boolean
+      value: false
+    - name: append_date_to_prefix
+      kind: boolean
+      value: true
+    - name: partition_name_enabled
+      kind: boolean
+      value: false
+    - name: append_date_to_prefix_grain
+      value: day
+    - name: append_date_to_filename
+      kind: boolean
+      value: true
+    - name: append_date_to_filename_grain
+      value: microsecond
+    - name: flatten_records
+      kind: boolean
+      value: false
+
diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py
index 3edac49..71a36cd 100644
--- a/target_s3/formats/format_parquet.py
+++ b/target_s3/formats/format_parquet.py
@@ -16,6 +16,8 @@ def __init__(self, config, context) -> None:
             cloud_provider_config_type,
             cloud_provider_config.get(cloud_provider_config_type, None),
         )
+        self.stream_schema = context.get("stream_schema", {})
+        self.parquet_schema = None
 
     def create_filesystem(
         self,
@@ -160,7 +162,7 @@ def sanitize(self, value):
             return None
         return value
 
-    def create_batch_schema(self) -> pyarrow.schema:
+    def create_schema(self) -> pyarrow.schema:
         """Generates schema from the records schema present in the tap.
         This is effective way to declare schema instead of relying on pyarrow to
         detect schema type.
@@ -269,44 +271,49 @@ def get_schema_from_object(properties: dict, level: int = 0):
                 fields.append(pyarrow.field(key, pyarrow.struct(inner_fields)))
             return fields
 
-        properties = self.context["batch_schema"].get("properties")
-        schema = pyarrow.schema(get_schema_from_object(properties=properties))
-        return schema
+        properties = self.stream_schema.get("properties")
+        parquet_schema = pyarrow.schema(get_schema_from_object(properties=properties))
+        self.parquet_schema = parquet_schema
+        return parquet_schema
 
     def create_dataframe(self) -> Table:
         """Creates a pyarrow Table object from the record set."""
         try:
-            fields = set()
-            for d in self.records:
-                fields = fields.union(d.keys())
             format_parquet = self.format.get("format_parquet", None)
-            if format_parquet and format_parquet.get("validate", None) == True:
-                # NOTE: we may could use schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
-                # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
-                schema = dict()
-                input = {
-                    f: [
-                        self.validate(schema, self.sanitize(f), row.get(f))
-                        for row in self.records
-                    ]
-                    for f in fields
-                }
-            else:
-                input = {
-                    f: [self.sanitize(row.get(f)) for row in self.records]
-                    for f in fields
-                }
             if format_parquet and format_parquet.get("get_schema_from_tap", False):
+                parquet_schema = self.parquet_schema if self.parquet_schema else self.create_schema()
+                fields = set([property.name for property in parquet_schema])
+                input = {
+                    f: [self.sanitize(row.get(f)) for row in self.records]
+                    for f in fields
+                }
+
                 ret = Table.from_pydict(
-                    mapping=input, schema=self.create_batch_schema()
+                    mapping=input, schema=parquet_schema
                 )
             else:
+                fields = set()
+                for d in self.records:
+                    fields = fields.union(d.keys())
+                if format_parquet and format_parquet.get("validate", None) == True:
+                    # NOTE: we could use the schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
+                    # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
+                    schema = dict()
+                    input = {
+                        f: [
+                            self.validate(schema, self.sanitize(f), row.get(f))
+                            for row in self.records
+                        ]
+                        for f in fields
+                    }
+                else:
+                    input = {
+                        f: [self.sanitize(row.get(f)) for row in self.records]
+                        for f in fields
+                    }
                 ret = Table.from_pydict(mapping=input)
         except Exception as e:
-            self.logger.info(self.records)
             self.logger.error("Failed to create parquet dataframe.")
             self.logger.error(e)
             raise e
diff --git a/target_s3/sinks.py b/target_s3/sinks.py
index 6d59582..98c4259 100644
--- a/target_s3/sinks.py
+++ b/target_s3/sinks.py
@@ -44,7 +44,7 @@ def process_batch(self, context: dict) -> None:
         # add stream name to context
         context["stream_name"] = self.stream_name
         context["logger"] = self.logger
-        context["batch_schema"] = self.schema
+        context["stream_schema"] = self.schema
         # creates new object for each batch
         format_type_client = format_type_factory(
             FORMAT_TYPE[self.format_type], self.config, context
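
Reviewer note, not part of the patch: below is a minimal, self-contained sketch
of the idea behind the new get_schema_from_tap path, assuming a flat stream
schema whose "type" entries are lists, as in typical Singer SCHEMA messages.
schema_from_properties and TYPE_MAP are illustrative names, not code from this
repository; the patch's real get_schema_from_object() additionally recurses
into nested objects and builds pyarrow.struct() fields for them. The point of
the change: deriving one explicit pyarrow schema from the tap and reusing it
for every batch keeps Parquet column types stable even when a column is
all-null in a given batch, which per-batch inference cannot guarantee.

import pyarrow

# Hypothetical "properties" block, shaped like a Singer SCHEMA message.
stream_schema = {
    "properties": {
        "id": {"type": ["integer", "null"]},
        "name": {"type": ["string", "null"]},
        "created_at": {"type": ["string", "null"], "format": "date-time"},
    }
}

# Simplified JSON-type -> pyarrow-type mapping (assumed, not exhaustive).
TYPE_MAP = {"integer": pyarrow.int64(), "string": pyarrow.string()}

def schema_from_properties(properties: dict) -> pyarrow.Schema:
    """Build an explicit pyarrow schema from JSON-schema properties."""
    fields = []
    for name, spec in properties.items():
        # Pick the first non-null JSON type; default to string.
        json_type = next((t for t in spec.get("type", []) if t != "null"), "string")
        fields.append(pyarrow.field(name, TYPE_MAP.get(json_type, pyarrow.string())))
    return pyarrow.schema(fields)

parquet_schema = schema_from_properties(stream_schema["properties"])

# Passing the schema to from_pydict() pins column types; "created_at" stays a
# string column even though this batch only contains nulls for it.
records = [{"id": 1, "name": "a", "created_at": None}]
table = pyarrow.Table.from_pydict(
    mapping={f.name: [row.get(f.name) for row in records] for f in parquet_schema},
    schema=parquet_schema,
)
print(table.schema)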