Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microbatch strategy #1179

Merged
merged 11 commits into from
Sep 18, 2024
2 changes: 1 addition & 1 deletion dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str):
return response

def valid_incremental_strategies(self):
return ["append", "merge", "delete+insert"]
return ["append", "merge", "delete+insert", "microbatch"]

def debug_query(self):
"""Override for DebugTask method"""
Expand Down
18 changes: 18 additions & 0 deletions dbt/include/snowflake/macros/materializations/merge.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,21 @@
{% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}


{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %}
{% set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') %}
{#-- Add additional incremental_predicates if it is safe to do so --#}
{% if model.config.event_time -%}
{% if model.config.event_time_start -%}
{% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " >= " ~ model.__dbt_internal_microbatch_event_time_start) %}
{% endif %}
{% if model.config.event_time_start -%}
{% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.__dbt_internal_microbatch_event_time_start) %}
{% endif %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

{% set dml = default__get_incremental_delete_insert_sql(arg_dict) %}
{% do return(dml) %}
{% endmacro %}
4 changes: 2 additions & 2 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# install latest changes in dbt-core
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git@patch-microbatch-event-time#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-adapters.git@use-patch-microbatch-time-method#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git

# dev
Expand Down
13 changes: 13 additions & 0 deletions tests/functional/adapter/test_incremental_microbatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)


class TestSnowflakeMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"
Loading