Skip to content

Commit

Permalink
remove requirement for unique_id
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Sep 14, 2024
1 parent d4a2b66 commit 4b09a09
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
36 changes: 25 additions & 11 deletions dbt/include/snowflake/macros/materializations/merge.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,32 @@


{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %}
{% set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') %}
{#-- Add additional incremental_predicates if it is safe to do so --#}
{% if config.get("event_time") -%}
{% if config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ config.event_time ~ " >= " ~ config.__dbt_internal_microbatch_event_time_start) %}
{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.__dbt_internal_microbatch_event_time_end %}
{% endif %}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}

{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

{% set dml = default__get_incremental_delete_insert_sql(arg_dict) %}
{% do return(dml) %}
delete from {{ target }} DBT_INTERNAL_TARGET
using {{ source }}
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# install latest changes in dbt-core
git+https://github.com/dbt-labs/dbt-core.git@patch-microbatch-event-time#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git@microbatch-chunked-backfill#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git@use-patch-microbatch-time-method#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git
Expand Down
11 changes: 11 additions & 0 deletions tests/functional/adapter/test_incremental_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,18 @@
)


# No requirement for a unique_id for snowflake microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
select * from {{ ref('input_model') }}
"""


class TestSnowflakeMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
Expand Down

0 comments on commit 4b09a09

Please sign in to comment.