Skip to content

Commit

Permalink
feature/databricks-sql-warehouse-compatibility (#121)
Browse files Browse the repository at this point in the history
* feature/databricks-sql-warehouse-compatibility

* run databricks sql

* changelog pr ref and schema update

* changes to incremental strategy selection

* databricks different schemas

* sql warehouse specific test runs

* adjustment for databricks sql and all other destinations

* unique schema name to ensure drop won't conflict

* changelog reword

* docs regen

* validations and docs regen

* variable adjustment

* change cleanup

* changelog update

* update readme & changelog

* Update models/fivetran_platform__audit_table.sql

Co-authored-by: fivetran-catfritz <[email protected]>

* Update README.md

* docs regen after review

* spark removal from macro

* Apply suggestions from code review

Jamie review notes

Co-authored-by: Jamie Rodriguez <[email protected]>

* changelog for fileformat addition

---------

Co-authored-by: fivetran-catfritz <[email protected]>
Co-authored-by: Jamie Rodriguez <[email protected]>
  • Loading branch information
3 people authored Apr 4, 2024
1 parent 1856dd4 commit acb8b61
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .buildkite/hooks/pre-command
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export CI_DATABRICKS_DBT_HOST=$(gcloud secrets versions access latest --secret="
export CI_DATABRICKS_DBT_HTTP_PATH=$(gcloud secrets versions access latest --secret="CI_DATABRICKS_DBT_HTTP_PATH" --project="dbt-package-testing-363917")
export CI_DATABRICKS_DBT_TOKEN=$(gcloud secrets versions access latest --secret="CI_DATABRICKS_DBT_TOKEN" --project="dbt-package-testing-363917")
export CI_DATABRICKS_DBT_CATALOG=$(gcloud secrets versions access latest --secret="CI_DATABRICKS_DBT_CATALOG" --project="dbt-package-testing-363917")
export CI_DATABRICKS_SQL_DBT_HTTP_PATH=$(gcloud secrets versions access latest --secret="CI_DATABRICKS_SQL_DBT_HTTP_PATH" --project="dbt-package-testing-363917")
export CI_DATABRICKS_SQL_DBT_TOKEN=$(gcloud secrets versions access latest --secret="CI_DATABRICKS_SQL_DBT_TOKEN" --project="dbt-package-testing-363917")
export CI_SQLSERVER_DBT_SERVER=$(gcloud secrets versions access latest --secret="CI_SQLSERVER_DBT_SERVER" --project="dbt-package-testing-363917")
export CI_SQLSERVER_DBT_DATABASE=$(gcloud secrets versions access latest --secret="CI_SQLSERVER_DBT_DATABASE" --project="dbt-package-testing-363917")
export CI_SQLSERVER_DBT_USER=$(gcloud secrets versions access latest --secret="CI_SQLSERVER_DBT_USER" --project="dbt-package-testing-363917")
Expand Down
15 changes: 15 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ steps:
commands: |
bash .buildkite/scripts/run_models.sh databricks
- label: ":databricks: :database: Run Tests - Databricks SQL Warehouse"
key: "run_dbt_databricks_sql"
plugins:
- docker#v3.13.0:
image: "python:3.8"
shell: [ "/bin/bash", "-e", "-c" ]
environment:
- "BASH_ENV=/tmp/.bashrc"
- "CI_DATABRICKS_DBT_HOST"
- "CI_DATABRICKS_SQL_DBT_HTTP_PATH"
- "CI_DATABRICKS_SQL_DBT_TOKEN"
- "CI_DATABRICKS_DBT_CATALOG"
commands: |
bash .buildkite/scripts/run_models.sh databricks-sql
- label: ":azure: Run Tests - SQLServer"
key: "run_dbt_sqlserver"
plugins:
Expand Down
17 changes: 17 additions & 0 deletions .buildkite/scripts/run_models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ db=$1
echo `pwd`
cd integration_tests
dbt deps
if [ "$db" = "databricks-sql" ]; then
# Databricks SQL Warehouse test sequence.
# Every invocation overrides fivetran_platform_schema to sqlw_tests (the schema
# configured for the databricks-sql profile target) so that no step falls back
# to the project default schema.
dbt seed --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db" --full-refresh
dbt compile --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db"
# Default configuration: full refresh, then a second run over the existing models.
dbt run --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db" --full-refresh
dbt run --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db"
dbt test --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db"
# Usage pricing enabled.
dbt run --vars '{fivetran_platform_schema: sqlw_tests, fivetran_platform__usage_pricing: true}' --target "$db" --full-refresh
dbt run --vars '{fivetran_platform_schema: sqlw_tests, fivetran_platform__usage_pricing: true}' --target "$db"
dbt test --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db"
# Credits pricing disabled, usage pricing enabled.
dbt run --vars '{fivetran_platform_schema: sqlw_tests, fivetran_platform__credits_pricing: false, fivetran_platform__usage_pricing: true}' --target "$db" --full-refresh
dbt run --vars '{fivetran_platform_schema: sqlw_tests, fivetran_platform__credits_pricing: false, fivetran_platform__usage_pricing: true}' --target "$db"
dbt test --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db"
# Usage pricing, destination membership and user sources all disabled.
dbt run --vars '{fivetran_platform_schema: sqlw_tests, fivetran_platform__usage_pricing: false, fivetran_platform_using_destination_membership: false, fivetran_platform_using_user: false}' --target "$db" --full-refresh
dbt run --vars '{fivetran_platform_schema: sqlw_tests, fivetran_platform__usage_pricing: false, fivetran_platform_using_destination_membership: false, fivetran_platform_using_user: false}' --target "$db"
dbt test --vars '{fivetran_platform_schema: sqlw_tests}' --target "$db"
else
dbt seed --target "$db" --full-refresh
dbt compile --target "$db"
dbt run --target "$db" --full-refresh
Expand All @@ -50,6 +66,7 @@ dbt test --target "$db"
dbt run --vars '{fivetran_platform__usage_pricing: false, fivetran_platform_using_destination_membership: false, fivetran_platform_using_user: false}' --target "$db" --full-refresh
dbt run --vars '{fivetran_platform__usage_pricing: false, fivetran_platform_using_destination_membership: false, fivetran_platform_using_user: false}' --target "$db"
dbt test --target "$db"
fi
if [ "$1" != "sqlserver" ]; then
dbt run-operation fivetran_utils.drop_schemas_automation --target "$db"
fi
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
# dbt_fivetran_log v1.7.1
[PR #121](https://github.com/fivetran/dbt_fivetran_log/pull/121) includes the following updates:

## Bug Fixes
- Users leveraging the Databricks SQL Warehouse runtime were previously unable to run the `fivetran_platform__audit_table` model due to an incompatible incremental strategy. As such, the following updates have been made:
- A new macro `is_databricks_sql_warehouse()` has been added to determine if a SQL Warehouse runtime for Databricks is being used. This macro will return a boolean of `true` if the runtime is determined to be SQL Warehouse and `false` if it is any other runtime or a non-Databricks destination.
- The above macro is used in determining the incremental strategy within the `fivetran_platform__audit_table` model. For Databricks SQL Warehouses, there will be **no** incremental strategy used. All other destinations and runtimes are not impacted by this change.
- For the SQL Warehouse runtime, the best incremental strategy we could elect to use is the `merge` strategy. However, we do not have full confidence in the resulting data integrity of the output model when leveraging this strategy. Therefore, we opted for the model to be materialized as a non-incremental `table` for the time being.
- The file format of the model has changed to `delta` for SQL Warehouse users. For all other destinations the `parquet` file format is still used.

## Features
- Updated README incremental model section to revise descriptions and add information for Databricks SQL Warehouse.

## Under the Hood
- Added integration testing pipeline for Databricks SQL Warehouse.
- Applied modifications to the integration testing pipeline to account for jobs being run on both Databricks All Purpose Cluster and SQL Warehouse runtimes.

# dbt_fivetran_log v1.7.0
[PR #119](https://github.com/fivetran/dbt_fivetran_log/pull/119) includes the following updates:

Expand Down
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ dispatch:
```
### Database Incremental Strategies
Some of the end models in this package are materialized incrementally. We have chosen `insert_overwrite` as the default strategy for **BigQuery** and **Databricks** databases, as it is only available for these dbt adapters. For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy.
Models in this package that are materialized incrementally are configured to work with the different strategies available to each supported warehouse.
`insert_overwrite` is our preferred incremental strategy because it will be able to properly handle updates to records that exist outside the immediate incremental window. That is, because it leverages partitions, `insert_overwrite` will appropriately update existing rows that have been changed upstream instead of inserting duplicates of them--all without requiring a full table scan.
For **BigQuery** and **Databricks All Purpose Cluster runtime** destinations, we have chosen `insert_overwrite` as the default strategy, which benefits from the partitioning capability.
> For Databricks SQL Warehouse destinations, models are materialized as tables without support for incremental runs.

`delete+insert` is our second-choice as it resembles `insert_overwrite` but lacks partitions. This strategy works most of the time and appropriately handles incremental loads that do not contain changes to past records. However, if a past record has been updated and is outside of the incremental window, `delete+insert` will insert a duplicate record. 😱
> Because of this, we highly recommend that **Snowflake**, **Redshift**, and **Postgres** users periodically run a `--full-refresh` to ensure a high level of data quality and remove any possible duplicates.
For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy.

> Regardless of strategy, we recommend that users periodically run a `--full-refresh` to ensure a high level of data quality.

## Step 2: Installing the Package
Include the following Fivetran Platform package version range in your `packages.yml`
Expand Down
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
config-version: 2
name: 'fivetran_log'
version: '1.7.0'
version: '1.7.1'
require-dbt-version: [">=1.3.0", "<2.0.0"]

models:
Expand Down
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/run_results.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions integration_tests/ci/sample.profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ integration_tests:
threads: 2
token: "{{ env_var('CI_DATABRICKS_DBT_TOKEN') }}"
type: databricks
databricks-sql:
catalog: "{{ env_var('CI_DATABRICKS_DBT_CATALOG') }}"
host: "{{ env_var('CI_DATABRICKS_DBT_HOST') }}"
http_path: "{{ env_var('CI_DATABRICKS_SQL_DBT_HTTP_PATH') }}"
schema: sqlw_tests
threads: 2
token: "{{ env_var('CI_DATABRICKS_SQL_DBT_TOKEN') }}"
type: databricks
sqlserver:
type: sqlserver
driver: 'ODBC Driver 18 for SQL Server'
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
version: '1.7.0'
version: '1.7.1'

config-version: 2
profile: 'integration_tests'
Expand All @@ -10,7 +10,7 @@ dispatch:

vars:
fivetran_log:
fivetran_platform_schema: fivetran_platform_integration_tests
fivetran_platform_schema: "fivetran_platform_integration_tests"
fivetran_platform_account_identifier: "account"
fivetran_platform_incremental_mar_identifier: "incremental_mar"
fivetran_platform_connector_identifier: "connector"
Expand All @@ -24,7 +24,7 @@ vars:

models:
fivetran_log:
+schema: fivetran_platform
+schema: "{{ 'sqlw_tests' if target.name == 'databricks-sql' else 'fivetran_platform' }}"

seeds:
fivetran_log_integration_tests:
Expand Down
15 changes: 15 additions & 0 deletions macros/is_databricks_sql_warehouse.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{#
    Reports whether the given dbt target is a Databricks SQL Warehouse.

    Args:
        target: the dbt `target` object; its `type` and `http_path` are read.

    Returns:
        True when `target.type` is exactly 'databricks' and `target.http_path`
        contains a "/sql/<anything>/warehouses/" segment; False for every
        other Databricks runtime and for all non-Databricks targets.
#}
{% macro is_databricks_sql_warehouse(target) %}
    {# Exact comparison on purpose: `in ('databricks')` would be a substring
       test against a plain string, because parentheses without a trailing
       comma do not create a tuple. #}
    {% if target.type != 'databricks' %}
        {{ return(False) }}
    {% endif %}

    {# Only the HTTP path is inspected to identify the runtime. #}
    {% set warehouse_path_pattern = "/sql/.+/warehouses/" %}
    {% if modules.re.search(warehouse_path_pattern, target.http_path) %}
        {{ return(True) }}
    {% endif %}

    {{ return(False) }}
{% endmacro %}
6 changes: 3 additions & 3 deletions models/fivetran_platform__audit_table.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{{ config(
materialized='incremental',
materialized='table' if is_databricks_sql_warehouse(target) else 'incremental',
unique_key='unique_table_sync_key',
partition_by={
'field': 'sync_start_day',
'data_type': 'date'
} if target.type == 'bigquery' else ['sync_start_day'],
cluster_by = ['sync_start_day'],
incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
file_format='parquet'
incremental_strategy='insert_overwrite' if target.type in ('bigquery','spark', 'databricks') else 'delete+insert',
file_format='delta' if is_databricks_sql_warehouse(target) else 'parquet'
) }}

with sync_log as (
Expand Down

0 comments on commit acb8b61

Please sign in to comment.