From 4b09a09b3c004cea31cc20903768209a3dc64e2c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 13 Sep 2024 23:05:18 -0400 Subject: [PATCH] remove requirement for unique_id --- .../macros/materializations/merge.sql | 36 +++++++++++++------ dev-requirements.txt | 2 +- .../adapter/test_incremental_microbatch.py | 11 ++++++ 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index 9d082878a..57c58afdd 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -51,18 +51,32 @@ {% macro snowflake__get_incremental_microbatch_sql(arg_dict) %} - {% set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') %} - {#-- Add additional incremental_predicates if it is safe to do so --#} - {% if config.get("event_time") -%} - {% if config.get("__dbt_internal_microbatch_event_time_start") -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ config.event_time ~ " >= " ~ config.__dbt_internal_microbatch_event_time_start) %} - {% endif %} - {% if model.config.__dbt_internal_microbatch_event_time_end -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.__dbt_internal_microbatch_event_time_end %} - {% endif %} + {%- set target = arg_dict["target_relation"] -%} + {%- set source = arg_dict["temp_relation"] -%} + {%- set dest_columns = arg_dict["dest_columns"] -%} + {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%} + + {#-- Add additional incremental_predicates to filter for batch --#} + {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %} + {% endif %} + {% if model.config.__dbt_internal_microbatch_event_time_end -%} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %} {% endif %} {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %} - {% set dml = default__get_incremental_delete_insert_sql(arg_dict) %} - {% do return(dml) %} + delete from {{ target }} DBT_INTERNAL_TARGET + using {{ source }} + where ( + {% for predicate in incremental_predicates %} + {%- if not loop.first %}and {% endif -%} {{ predicate }} + {% endfor %} + ); + + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) {% endmacro %} diff --git a/dev-requirements.txt b/dev-requirements.txt index 0ae3f286e..4d7460996 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ # install latest changes in dbt-core -git+https://github.com/dbt-labs/dbt-core.git@patch-microbatch-event-time#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@microbatch-chunked-backfill#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-adapters.git git+https://github.com/dbt-labs/dbt-adapters.git@use-patch-microbatch-time-method#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py index 26046f12f..bbb57f96c 100644 --- a/tests/functional/adapter/test_incremental_microbatch.py +++ b/tests/functional/adapter/test_incremental_microbatch.py @@ -4,7 +4,18 @@ ) +# No requirement for a unique_id for snowflake microbatch! +_microbatch_model_no_unique_id_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }} +select * from {{ ref('input_model') }} +""" + + class TestSnowflakeMicrobatch(BaseMicrobatch): + @pytest.fixture(scope="class") + def microbatch_model_sql(self) -> str: + return _microbatch_model_no_unique_id_sql + @pytest.fixture(scope="class") def insert_two_rows_sql(self, project) -> str: test_schema_relation = project.adapter.Relation.create(