From b51c86d55ef7d6142617ab16c181c8523f02216b Mon Sep 17 00:00:00 2001
From: Prakhar Srivastava
Date: Sun, 8 Oct 2023 17:36:17 +0200
Subject: [PATCH 1/3] added parquet schema generation from tap

---
 target_s3/formats/format_parquet.py | 129 +++++++++++++++++++++++++++-
 target_s3/sinks.py                  |   2 +
 target_s3/target.py                 |  33 +++++++
 3 files changed, 162 insertions(+), 2 deletions(-)

diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py
index c924b28..3edac49 100644
--- a/target_s3/formats/format_parquet.py
+++ b/target_s3/formats/format_parquet.py
@@ -160,6 +160,119 @@ def sanitize(self, value):
             return None
         return value
 
+    def create_batch_schema(self) -> pyarrow.schema:
+        """Generates a schema from the stream schema provided by the tap.
+        This is a more reliable way to declare the schema than relying on
+        pyarrow's type inference.
+
+        Note: at level 0 (the outermost level) any key with a date/time format
+        has already been converted to a datetime object by the base target
+        class, so strings at level 0 are mapped to temporal types.
+
+        :return: schema built from the stream's schema definition
+        :rtype: pyarrow.schema
+        """
+
+        # TODO: handle non-nullable types; all fields are nullable by default
+        def get_schema_from_array(items: dict, level: int):
+            """Returns the item schema for an array.
+
+            :param items: items definition of the array
+            :type items: dict
+            :param level: depth level of the array in the jsonschema
+            :type level: int
+            :return: detected datatype for all items of the array.
+            :rtype: pyarrow datatype
+            """
+            type = items.get("type")
+            properties = items.get("properties")
+            items = items.get("items")
+            if "integer" in type:
+                return pyarrow.int64()
+            elif "number" in type:
+                return pyarrow.float64()
+            elif "string" in type:
+                return pyarrow.string()
+            elif "array" in type:
+                return pyarrow.list_(get_schema_from_array(items=items, level=level))
+            elif "object" in type:
+                return pyarrow.struct(
+                    get_schema_from_object(properties=properties, level=level + 1)
+                )
+            else:
+                return pyarrow.null()
+
+        def get_schema_from_object(properties: dict, level: int = 0):
+            """Returns the schema for an object.
+
+            :param properties: properties definition of the object
+            :type properties: dict
+            :param level: depth level of the object in the jsonschema
+            :type level: int
+            :return: detected fields for the properties of the object.
+            :rtype: list of pyarrow.Field
+            """
+            fields = []
+            for key, val in properties.items():
+                type = val["type"]
+                format = val.get("format")
+                if "integer" in type:
+                    fields.append(pyarrow.field(key, pyarrow.int64()))
+                elif "number" in type:
+                    fields.append(pyarrow.field(key, pyarrow.float64()))
+                elif "string" in type:
+                    if format and level == 0:
+                        # handle the explicit datetime conversion that the base
+                        # target applies only at level 0 of a record
+                        if format == "date":
+                            fields.append(pyarrow.field(key, pyarrow.date64()))
+                        elif format == "time":
+                            fields.append(pyarrow.field(key, pyarrow.time64("us")))
+                        else:
+                            fields.append(
+                                pyarrow.field(key, pyarrow.timestamp("s", tz="utc"))
+                            )
+                    else:
+                        fields.append(pyarrow.field(key, pyarrow.string()))
+                elif "array" in type:
+                    items = val.get("items")
+                    if items:
+                        item_type = get_schema_from_array(items=items, level=level)
+                        if item_type == pyarrow.null():
+                            self.logger.warn(
+                                f"""
+                                key: {key} resolves to a list of null. This is only
+                                correct if every item really is null; otherwise declare
+                                an explicit item type for the list."""
+                            )
+                        fields.append(pyarrow.field(key, pyarrow.list_(item_type)))
+                    else:
+                        self.logger.warn(
+                            f"""
+                            key: {key} is an array without an item type. Defaulting to
+                            a list of null; declare an explicit item type unless every
+                            item really is null."""
+                        )
+                        fields.append(pyarrow.field(key, pyarrow.list_(pyarrow.null())))
+                elif "object" in type:
+                    prop = val.get("properties")
+                    inner_fields = get_schema_from_object(
+                        properties=prop, level=level + 1
+                    )
+                    if not inner_fields:
+                        self.logger.warn(
+                            f"""
+                            key: {key} is an object with no properties defined. Writing
+                            the parquet file may fail because parquet does not support
+                            empty complex types [array, struct]."""
+                        )
+                    fields.append(pyarrow.field(key, pyarrow.struct(inner_fields)))
+            return fields
+
+        properties = self.context["batch_schema"].get("properties")
+        schema = pyarrow.schema(get_schema_from_object(properties=properties))
+        return schema
+
     def create_dataframe(self) -> Table:
         """Creates a pyarrow Table object from the record set."""
         try:
@@ -185,7 +298,13 @@ def create_dataframe(self) -> Table:
                     for f in fields
             }
 
-            ret = Table.from_pydict(mapping=input)
+            if format_parquet and format_parquet.get("get_schema_from_tap", False):
+                ret = Table.from_pydict(
+                    mapping=input, schema=self.create_batch_schema()
+                )
+            else:
+                ret = Table.from_pydict(mapping=input)
+
         except Exception as e:
             self.logger.info(self.records)
             self.logger.error("Failed to create parquet dataframe.")
@@ -208,7 +327,13 @@ def _write(self, contents: str = None) -> None:
                 filesystem=self.file_system,
             ).write_table(df)
         except Exception as e:
-            self.logger.error("Failed to write parquet file to S3.")
+            self.logger.error(e)
+            if isinstance(e, pyarrow.lib.ArrowNotImplementedError):
+                self.logger.error(
+                    """Failed to write parquet file to S3. Complex types [array, object] in the schema cannot be left without a type definition."""
+                )
+            else:
+                self.logger.error("Failed to write parquet file to S3.")
             raise e
 
     def run(self) -> None:
diff --git a/target_s3/sinks.py b/target_s3/sinks.py
index 27c05e7..6d59582 100644
--- a/target_s3/sinks.py
+++ b/target_s3/sinks.py
@@ -30,6 +30,7 @@ def __init__(
         super().__init__(target, stream_name, schema, key_properties)
         # what type of file are we building?
         self.format_type = self.config.get("format", None).get("format_type", None)
+        self.schema = schema
         if self.format_type:
             if self.format_type not in FORMAT_TYPE:
                 raise Exception(
@@ -43,6 +44,7 @@ def process_batch(self, context: dict) -> None:
         # add stream name to context
         context["stream_name"] = self.stream_name
         context["logger"] = self.logger
+        context["batch_schema"] = self.schema
         # creates new object for each batch
         format_type_client = format_type_factory(
             FORMAT_TYPE[self.format_type], self.config, context
diff --git a/target_s3/target.py b/target_s3/target.py
index 7e3bc61..6b6e2c1 100644
--- a/target_s3/target.py
+++ b/target_s3/target.py
@@ -38,6 +38,17 @@ class Targets3(Target):
                     required=False,
                     default=False,
                 ),
+                th.Property(
+                    "get_schema_from_tap",
+                    th.BooleanType,
+                    required=False,
+                    default=False,
+                    description="Set true if you want to declare schema of the\
+                        resulting parquet file based on taps. Doesn't \
+                        work with 'anyOf' types or when complex data is\
+                        not defined at element level. Doesn't work with \
+                        validate option for now."
+                ),
             ),
             required=False,
         ),
@@ -165,6 +176,28 @@ class Targets3(Target):
 
     default_sink_class = s3Sink
 
+    def deserialize_json(self, line: str) -> dict:
+        """Override the base target's method to skip the Decimal cast;
+        only applied when the parquet schema is generated from the tap schema.
+
+        :param line: serialized record from the stream
+        :type line: str
+        :return: deserialized record
+        :rtype: dict
+        """
+        try:
+            self.format = self.config.get("format", None)
+            format_parquet = self.format.get("format_parquet", None)
+            if format_parquet and format_parquet.get("get_schema_from_tap", False):
+                return json.loads(line)  # type: ignore[no-any-return]
+            else:
+                return json.loads(  # type: ignore[no-any-return]
+                    line, parse_float=decimal.Decimal
+                )
+        except json.decoder.JSONDecodeError as exc:
+            self.logger.error("Unable to parse:\n%s", line, exc_info=exc)
+            raise
+
 
 if __name__ == "__main__":
     Targets3.cli()

From 8d1e41344d7886da30c543d92fe6889618d2d05c Mon Sep 17 00:00:00 2001
From: Prakhar Srivastava
Date: Mon, 9 Oct 2023 12:00:40 +0200
Subject: [PATCH 2/3] fix imports and linting

---
 target_s3/formats/format_base.py | 56 +++++++++++++++++++++++++-------
 target_s3/target.py              |  4 ++-
 2 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/target_s3/formats/format_base.py b/target_s3/formats/format_base.py
index 216232a..ed10062 100644
--- a/target_s3/formats/format_base.py
+++ b/target_s3/formats/format_base.py
@@ -117,23 +117,55 @@ def create_key(self) -> str:
             grain = DATE_GRAIN[self.config["append_date_to_prefix_grain"].lower()]
             partition_name_enabled = False
             if self.config["partition_name_enabled"]:
-                partition_name_enabled = self.config["partition_name_enabled"]
-            folder_path += self.create_folder_structure(batch_start, grain, partition_name_enabled)
+                partition_name_enabled = self.config["partition_name_enabled"]
+            folder_path += self.create_folder_structure(
+                batch_start, grain, partition_name_enabled
+            )
         if self.config["append_date_to_filename"]:
             grain = DATE_GRAIN[self.config["append_date_to_filename_grain"].lower()]
             file_name += f"{self.create_file_structure(batch_start, grain)}"
 
         return f"{folder_path}{file_name}"
 
-    def create_folder_structure(self, batch_start: datetime, grain: int, partition_name_enabled: bool) -> str:
+    def create_folder_structure(
+        self, batch_start: datetime, grain: int, partition_name_enabled: bool
+    ) -> str:
         ret = ""
"" - ret += f"{'month=' if partition_name_enabled else ''}{batch_start.month:02}/" if grain <= DATE_GRAIN["month"] else "" - ret += f"{'day=' if partition_name_enabled else ''}{batch_start.day:02}/" if grain <= DATE_GRAIN["day"] else "" - ret += f"{'hour=' if partition_name_enabled else ''}{batch_start.hour:02}/" if grain <= DATE_GRAIN["hour"] else "" - ret += f"{'minute=' if partition_name_enabled else ''}{batch_start.minute:02}/" if grain <= DATE_GRAIN["minute"] else "" - ret += f"{'second=' if partition_name_enabled else ''}{batch_start.second:02}/" if grain <= DATE_GRAIN["second"] else "" - ret += f"{'microsecond=' if partition_name_enabled else ''}{batch_start.microsecond}/" if grain <= DATE_GRAIN["microsecond"] else "" + ret += ( + f"{'year=' if partition_name_enabled else ''}{batch_start.year}/" + if grain <= DATE_GRAIN["year"] + else "" + ) + ret += ( + f"{'month=' if partition_name_enabled else ''}{batch_start.month:02}/" + if grain <= DATE_GRAIN["month"] + else "" + ) + ret += ( + f"{'day=' if partition_name_enabled else ''}{batch_start.day:02}/" + if grain <= DATE_GRAIN["day"] + else "" + ) + ret += ( + f"{'hour=' if partition_name_enabled else ''}{batch_start.hour:02}/" + if grain <= DATE_GRAIN["hour"] + else "" + ) + ret += ( + f"{'minute=' if partition_name_enabled else ''}{batch_start.minute:02}/" + if grain <= DATE_GRAIN["minute"] + else "" + ) + ret += ( + f"{'second=' if partition_name_enabled else ''}{batch_start.second:02}/" + if grain <= DATE_GRAIN["second"] + else "" + ) + ret += ( + f"{'microsecond=' if partition_name_enabled else ''}{batch_start.microsecond}/" + if grain <= DATE_GRAIN["microsecond"] + else "" + ) return ret def create_file_structure(self, batch_start: datetime, grain: int) -> str: @@ -144,7 +176,9 @@ def create_file_structure(self, batch_start: datetime, grain: int) -> str: ret += f"-{batch_start.hour:02}" if grain <= DATE_GRAIN["hour"] else "" ret += f"{batch_start.minute:02}" if grain <= DATE_GRAIN["minute"] else "" ret += f"{batch_start.second:02}" if grain <= DATE_GRAIN["second"] else "" - ret += f"{batch_start.microsecond}" if grain <= DATE_GRAIN["microsecond"] else "" + ret += ( + f"{batch_start.microsecond}" if grain <= DATE_GRAIN["microsecond"] else "" + ) return ret def flatten_key(self, k, parent_key, sep) -> str: diff --git a/target_s3/target.py b/target_s3/target.py index 6b6e2c1..99c08c4 100644 --- a/target_s3/target.py +++ b/target_s3/target.py @@ -1,6 +1,8 @@ """s3 target class.""" from __future__ import annotations +import decimal +import json from singer_sdk.target_base import Target from singer_sdk import typing as th @@ -47,7 +49,7 @@ class Targets3(Target): resulting parquet file based on taps. Doesn't \ work with 'anyOf' types or when complex data is\ not defined at element level. Doesn't work with \ - validate option for now." 
+                        validate option for now.",
                 ),
             ),
             required=False,

From 111ba6ec55a7815ef89d2fb107f14f104f524a4a Mon Sep 17 00:00:00 2001
From: Prakhar Srivastava
Date: Wed, 11 Oct 2023 21:02:52 +0200
Subject: [PATCH 3/3] added yml config and better schema generation

---
 meltano.yml                         | 72 +++++++++++++++++++++--------
 target_s3/formats/format_parquet.py | 61 +++++++++++++-----------
 target_s3/sinks.py                  |  2 +-
 3 files changed, 89 insertions(+), 46 deletions(-)

diff --git a/meltano.yml b/meltano.yml
index b9df142..7b78dd3 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -1,25 +1,61 @@
 version: 1
 send_anonymous_usage_stats: false
-project_id: "target-s3"
+project_id: target-s3
 default_environment: test
 environments:
-  - name: test
+- name: test
 plugins:
   extractors: []
   loaders:
-  - name: "target-s3"
-    namespace: "target_s3"
-    pip_url: -e .
-    capabilities:
-    - about
-    - stream-maps
-    - record-flattening
-    config:
-      start_date: "2010-01-01T00:00:00Z"
-    settings:
-    # TODO: To configure using Meltano, declare settings and their types here:
-    - name: username
-    - name: password
-      kind: password
-    - name: start_date
-      value: "2010-01-01T00:00:00Z"
+  - name: target-s3
+    namespace: target_s3
+    pip_url: -e .
+    capabilities:
+    - about
+    - stream-maps
+    - record-flattening
+    settings:
+    - name: format.format_type
+    - name: format.format_parquet.validate
+      kind: boolean
+      value: false
+    - name: format.format_parquet.get_schema_from_tap
+      kind: boolean
+      value: false
+    - name: cloud_provider.cloud_provider_type
+      value: aws
+    - name: cloud_provider.aws.aws_access_key_id
+      kind: password
+    - name: cloud_provider.aws.aws_secret_access_key
+      kind: password
+    - name: cloud_provider.aws.aws_session_token
+      kind: password
+    - name: cloud_provider.aws.aws_region
+      kind: password
+    - name: cloud_provider.aws.aws_profile_name
+      kind: password
+    - name: cloud_provider.aws.aws_bucket
+      kind: password
+    - name: cloud_provider.aws.aws_endpoint_override
+    - name: prefix
+    - name: stream_name_path_override
+    - name: include_process_date
+      kind: boolean
+      value: false
+    - name: append_date_to_prefix
+      kind: boolean
+      value: true
+    - name: partition_name_enabled
+      kind: boolean
+      value: false
+    - name: append_date_to_prefix_grain
+      value: day
+    - name: append_date_to_filename
+      kind: boolean
+      value: true
+    - name: append_date_to_filename_grain
+      value: microsecond
+    - name: flatten_records
+      kind: boolean
+      value: false
+
diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py
index 3edac49..71a36cd 100644
--- a/target_s3/formats/format_parquet.py
+++ b/target_s3/formats/format_parquet.py
@@ -16,6 +16,8 @@ def __init__(self, config, context) -> None:
             cloud_provider_config_type,
             cloud_provider_config.get(cloud_provider_config_type, None),
         )
+        self.stream_schema = context.get("stream_schema", {})
+        self.parquet_schema = None
 
     def create_filesystem(
         self,
@@ -160,7 +162,7 @@ def sanitize(self, value):
             return None
         return value
 
-    def create_batch_schema(self) -> pyarrow.schema:
+    def create_schema(self) -> pyarrow.schema:
         """Generates a schema from the stream schema provided by the tap.
         This is a more reliable way to declare the schema than relying on
         pyarrow's type inference.
@@ -269,44 +271,49 @@ def get_schema_from_object(properties: dict, level: int = 0):
                     fields.append(pyarrow.field(key, pyarrow.struct(inner_fields)))
             return fields
 
-        properties = self.context["batch_schema"].get("properties")
-        schema = pyarrow.schema(get_schema_from_object(properties=properties))
-        return schema
+        properties = self.stream_schema.get("properties")
+        parquet_schema = pyarrow.schema(get_schema_from_object(properties=properties))
+        self.parquet_schema = parquet_schema
+        return parquet_schema
 
     def create_dataframe(self) -> Table:
         """Creates a pyarrow Table object from the record set."""
         try:
-            fields = set()
-            for d in self.records:
-                fields = fields.union(d.keys())
-
             format_parquet = self.format.get("format_parquet", None)
-            if format_parquet and format_parquet.get("validate", None) == True:
-                # NOTE: we may could use schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
-                # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
-                schema = dict()
-                input = {
-                    f: [
-                        self.validate(schema, self.sanitize(f), row.get(f))
-                        for row in self.records
-                    ]
-                    for f in fields
-                }
-            else:
-                input = {
-                    f: [self.sanitize(row.get(f)) for row in self.records]
-                    for f in fields
-                }
-
             if format_parquet and format_parquet.get("get_schema_from_tap", False):
+                parquet_schema = self.parquet_schema if self.parquet_schema else self.create_schema()
+                fields = set([property.name for property in parquet_schema])
+                input = {
+                    f: [self.sanitize(row.get(f)) for row in self.records]
+                    for f in fields
+                }
+
                 ret = Table.from_pydict(
-                    mapping=input, schema=self.create_batch_schema()
+                    mapping=input, schema=parquet_schema
                 )
             else:
+                fields = set()
+                for d in self.records:
+                    fields = fields.union(d.keys())
+                if format_parquet and format_parquet.get("validate", None) == True:
+                    # NOTE: we might be able to use the stream schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
+                    # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
+                    schema = dict()
+                    input = {
+                        f: [
+                            self.validate(schema, self.sanitize(f), row.get(f))
+                            for row in self.records
+                        ]
+                        for f in fields
+                    }
+                else:
+                    input = {
+                        f: [self.sanitize(row.get(f)) for row in self.records]
+                        for f in fields
+                    }
                 ret = Table.from_pydict(mapping=input)
 
         except Exception as e:
-            self.logger.info(self.records)
             self.logger.error("Failed to create parquet dataframe.")
             self.logger.error(e)
             raise e
diff --git a/target_s3/sinks.py b/target_s3/sinks.py
index 6d59582..98c4259 100644
--- a/target_s3/sinks.py
+++ b/target_s3/sinks.py
@@ -44,7 +44,7 @@ def process_batch(self, context: dict) -> None:
         # add stream name to context
         context["stream_name"] = self.stream_name
         context["logger"] = self.logger
-        context["batch_schema"] = self.schema
+        context["stream_schema"] = self.schema
         # creates new object for each batch
         format_type_client = format_type_factory(
             FORMAT_TYPE[self.format_type], self.config, context
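Reviewer note, not part of the patches: the sketch below is a minimal, standalone illustration of the idea the series implements, i.e. declaring a pyarrow schema derived from the tap's JSON schema instead of letting pyarrow infer types, and passing that schema into Table.from_pydict(). The stream schema, field names, and sample records are invented for the example; the pyarrow schema shown is hand-written to mirror the mapping create_schema() performs (integer -> int64, string with a date-time format at level 0 -> timestamp, array -> list_ of the item type), assuming the get_schema_from_tap behavior described in this series.

    # Illustrative only: mirrors the JSON-schema-to-pyarrow mapping used by the patch.
    import pyarrow

    # Hypothetical stream schema a tap might emit for a stream.
    stream_schema = {
        "properties": {
            "id": {"type": ["integer"]},
            "created_at": {"type": ["string", "null"], "format": "date-time"},
            "tags": {"type": ["array", "null"], "items": {"type": ["string"]}},
        }
    }

    # What create_schema() would build for the schema above.
    schema = pyarrow.schema(
        [
            pyarrow.field("id", pyarrow.int64()),
            pyarrow.field("created_at", pyarrow.timestamp("s", tz="utc")),
            pyarrow.field("tags", pyarrow.list_(pyarrow.string())),
        ]
    )

    # Invented sample records; missing values stay None and remain nullable.
    records = [
        {"id": 1, "created_at": None, "tags": ["a", "b"]},
        {"id": 2, "created_at": None, "tags": None},
    ]

    # Columns are keyed by the declared fields, as in the patched create_dataframe().
    columns = {f.name: [row.get(f.name) for row in records] for f in schema}
    table = pyarrow.Table.from_pydict(mapping=columns, schema=schema)
    print(table.schema)

The point of passing an explicit schema is that empty or all-null batches keep the declared column types instead of collapsing to null, which is what the ArrowNotImplementedError handling in _write() is guarding against.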