diff --git a/README.md b/README.md index 1bfae33..6e1f25a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com). "format": { "format_type": "json", "format_parquet": { - "validate": "[true/false]" + "validate": "[true|false]" }, "format_json": {}, "format_csv": {} diff --git a/pyproject.toml b/pyproject.toml index 214729f..ade29c0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "target-s3" -version = "1.0.3" +version = "1.0.6" description = "`target-s3` is a Singer target for s3, built with the Meltano Singer SDK." authors = ["crowemi"] keywords = [ diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index 1b56363..c924b28 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -40,10 +40,22 @@ def create_filesystem( raise e def validate(self, schema: dict, field, value) -> dict: - """Validates data elements.""" + """ + Validates data elements against a given schema and field. If the field is not in the schema, it will be added. + If the value does not match the expected type in the schema, it will be cast to the expected type. + The method returns the validated value. + + :param schema: A dictionary representing the schema to validate against. + :param field: The field to validate. + :param value: The value to validate. + :return: The validated value. + """ def unpack_dict(record) -> dict: ret = dict() + # set empty dictionaries to type string + if len(record) == 0: + ret = {"type": type(str())} for field in record: if isinstance(record[field], dict): ret[field] = unpack_dict(record[field]) @@ -120,7 +132,7 @@ def validate_list(value, fields): else: expected_type = schema[field].get("type") if not isinstance(value, expected_type): - # if the values don't match try to cast current value to expected type, this souldn't happen, + # if the values don't match try to cast current value to expected type, this shouldn't happen, # an error will occur during target instantiation. value = expected_type(value) @@ -128,13 +140,26 @@ def validate_list(value, fields): # add new entry for field if isinstance(value, dict): schema[field] = {"type": type(value), "fields": unpack_dict(value)} + validate_dict(value, schema[field].get("fields")) elif isinstance(value, list): schema[field] = {"type": type(value), "fields": unpack_list(value)} + validate_list(value, schema[field].get("fields")) else: schema[field] = {"type": type(value)} + expected_type = schema[field].get("type") + if not isinstance(value, expected_type): + # if the values don't match try to cast current value to expected type, this shouldn't happen, + # an error will occur during target instantiation. + value = expected_type(value) return value + def sanitize(self, value): + if isinstance(value, dict) and not value: + # pyarrow can't process empty struct + return None + return value + def create_dataframe(self) -> Table: """Creates a pyarrow Table object from the record set.""" try: @@ -144,13 +169,21 @@ def create_dataframe(self) -> Table: format_parquet = self.format.get("format_parquet", None) if format_parquet and format_parquet.get("validate", None) == True: + # NOTE: we may could use schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html + # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it. schema = dict() input = { - f: [self.validate(schema, f, row.get(f)) for row in self.records] + f: [ + self.validate(schema, self.sanitize(f), row.get(f)) + for row in self.records + ] for f in fields } else: - input = {f: [row.get(f) for row in self.records] for f in fields} + input = { + f: [self.sanitize(row.get(f)) for row in self.records] + for f in fields + } ret = Table.from_pydict(mapping=input) except Exception as e: