Skip to content

Commit

Permalink
Refactored schema config resource (#222)
Browse files Browse the repository at this point in the history
  • Loading branch information
beevital authored Nov 14, 2023
1 parent b600ed4 commit ff77a75
Show file tree
Hide file tree
Showing 9 changed files with 861 additions and 827 deletions.
13 changes: 7 additions & 6 deletions fivetran/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ const Version = "1.1.1" // Current provider version

func Provider() *schema.Provider {
var resourceMap = map[string]*schema.Resource{
"fivetran_group": resourceGroup(),
"fivetran_group_users": resourceGroupUsers(),
"fivetran_destination": resourceDestination(),
"fivetran_connector": resourceConnector(),
"fivetran_connector_schedule": resourceConnectorSchedule(),
"fivetran_connector_schema_config": connector_schema.ResourceSchemaConfig(),
"fivetran_group": resourceGroup(),
"fivetran_group_users": resourceGroupUsers(),
"fivetran_destination": resourceDestination(),
"fivetran_connector": resourceConnector(),
"fivetran_connector_schedule": resourceConnectorSchedule(),
//"fivetran_connector_schema_config": connector_schema.ResourceSchemaConfig(),
"fivetran_connector_schema_config": connector_schema.ResourceSchemaConfigNew(),
"fivetran_dbt_transformation": resourceDbtTransformation(),
"fivetran_dbt_project": resourceDbtProject(),
"fivetran_webhook": resourceWebhook(),
Expand Down
285 changes: 233 additions & 52 deletions fivetran/tests/mock/resource_connector_schema_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ func TestResourceEmptyDefaultSchemaMock(t *testing.T) {

Check: resource.ComposeAggregateTestCheckFunc(
func(s *terraform.State) error {
assertEqual(t, schemaEmptyDefaultReloadHandler.Interactions, 1) // 1 reload on create
assertEqual(t, schemaEmptyDefaultGetHandler.Interactions, 2) // 1 read attempt before reload, 1 read after create
assertEqual(t, schemaEmptyDefaultReloadHandler.Interactions, 1)
assertEqual(t, schemaEmptyDefaultGetHandler.Interactions, 1)
assertEqual(t, schemaEmptyDefaultPatchHandler.Interactions, 0)
assertNotEmpty(t, schemaEmptyDefaultData) // schema initialised
return nil
Expand All @@ -561,6 +561,7 @@ func TestResourceEmptyDefaultSchemaMock(t *testing.T) {
return nil
},
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema_change_handling", "BLOCK_ALL"),
resource.TestCheckNoResourceAttr("fivetran_connector_schema_config.test_schema", "schema.0"),
),
}

Expand Down Expand Up @@ -749,7 +750,7 @@ func setupMockClientHashedAlignmentSchemaResource(t *testing.T) {
tablesMap := schema1["tables"].(map[string]interface{})
table1 := tablesMap["table_1"].(map[string]interface{})

assertEqual(t, len(table1), 2)
assertEqual(t, len(table1), 1)

assertNotEmpty(t, table1["columns"])

Expand All @@ -767,66 +768,217 @@ func setupMockClientHashedAlignmentSchemaResource(t *testing.T) {
)
}

func setupMockClientConsistentWithUpstreamResource(t *testing.T) {
mockClient.Reset()
schemaConsistentWithUpstreamData = nil
func TestSyncModeMock(t *testing.T) {
setupMockClientConsistentWithUpstreamResource := func(t *testing.T) {
mockClient.Reset()
schemaConsistentWithUpstreamData = createMapFromJsonString(t, `
{
"enable_new_by_default": true,
"schema_change_handling": "BLOCK_ALL",
"schemas": {
"schema_1": {
"name_in_destination": "schema_1",
"enabled": true,
"tables": {
"table_1": {
"name_in_destination": "table_1",
"enabled": true,
"sync_mode": "SOFT_DELETE",
"enabled_patch_settings": {
"allowed": true
}
},
"table_2": {
"name_in_destination": "table_2",
"enabled": true,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
}
},
"table_3": {
"name_in_destination": "table_3",
"enabled": false,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
}
}
}
}
}
}
`)

schemaConsistentWithUpstreamGetHandler = mockClient.When(http.MethodGet, "/v1/connectors/connector_id/schemas").ThenCall(
func(req *http.Request) (*http.Response, error) {
return fivetranSuccessResponse(t, req, http.StatusOK, "Success", schemaConsistentWithUpstreamData), nil
},
)

schemaConsistentWithUpstreamPathchHandler = mockClient.When(http.MethodPatch, "/v1/connectors/connector_id/schemas/").ThenCall(
func(req *http.Request) (*http.Response, error) {
body := requestBodyToJson(t, req)
fmt.Print(body)
syncMode := body["schemas"].(map[string]interface{})["schema_1"].(map[string]interface{})["tables"].(map[string]interface{})["table_1"].(map[string]interface{})["sync_mode"].(string)
assertEqual(t, syncMode, "HISTORY")

schemaConsistentWithUpstreamGetHandler = mockClient.When(http.MethodGet, "/v1/connectors/connector_id/schemas").ThenCall(
func(req *http.Request) (*http.Response, error) {
if nil == schemaHashedAlignmentData {
schemaConsistentWithUpstreamData = createMapFromJsonString(t, `
{
"enable_new_by_default": true,
"schema_change_handling": "BLOCK_ALL",
"schemas": {
"schema_1": {
"name_in_destination": "schema_1",
"enabled": true,
"tables": {
"table_1": {
"name_in_destination": "table_1",
"enabled": true,
"sync_mode": "SOFT_DELETE",
"enabled_patch_settings": {
"allowed": true
}
},
"table_2": {
"name_in_destination": "table_2",
"enabled": true,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
}
},
"table_3": {
"name_in_destination": "table_3",
"enabled": false,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
{
"enable_new_by_default": true,
"schema_change_handling": "BLOCK_ALL",
"schemas": {
"schema_1": {
"name_in_destination": "schema_1",
"enabled": true,
"tables": {
"table_1": {
"name_in_destination": "table_1",
"enabled": true,
"sync_mode": "HISTORY",
"enabled_patch_settings": {
"allowed": true
}
},
"table_2": {
"name_in_destination": "table_2",
"enabled": true,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
}
},
"table_3": {
"name_in_destination": "table_3",
"enabled": false,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
}
}
}
}
}
}
`)

return fivetranSuccessResponse(t, req, http.StatusOK, "Success", schemaConsistentWithUpstreamData), nil
},
)
}

step1 := resource.TestStep{
Config: `
resource "fivetran_connector_schema_config" "test_schema" {
provider = fivetran-provider
connector_id = "connector_id"
schema_change_handling = "BLOCK_ALL"
schema {
name = "schema_1"
enabled = true
table {
name = "table_1"
enabled = true
sync_mode = "HISTORY"
}
table {
name = "table_2"
enabled = true
}
}
`)
}
return fivetranSuccessResponse(t, req, http.StatusOK, "Success", schemaConsistentWithUpstreamData), nil
},
)
}`,

schemaConsistentWithUpstreamPathchHandler = mockClient.When(http.MethodPatch, "/v1/connectors/connector_id/schemas/").ThenCall(
func(req *http.Request) (*http.Response, error) {
body := requestBodyToJson(t, req)
fmt.Print(body)
return fivetranSuccessResponse(t, req, http.StatusOK, "Success", schemaConsistentWithUpstreamData), nil
Check: resource.ComposeAggregateTestCheckFunc(
func(s *terraform.State) error {
assertEqual(t, schemaConsistentWithUpstreamGetHandler.Interactions, 1) // 1 read attempt before reload, 1 read after create
assertNotEmpty(t, schemaConsistentWithUpstreamData) // schema initialised
return nil
},
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema_change_handling", "BLOCK_ALL"),
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema.0.table.0.sync_mode", "HISTORY"),
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema.0.table.1.sync_mode", ""),
),
}

resource.Test(
t,
resource.TestCase{
PreCheck: func() {
setupMockClientConsistentWithUpstreamResource(t)
},
Providers: testProviders,
CheckDestroy: func(s *terraform.State) error {
// there is no possibility to destroy schema config - it alsways exists within the connector
return nil
},

Steps: []resource.TestStep{
step1,
},
},
)
}

func TestConsistentWithUpstreamSchemaMock(t *testing.T) {
setupMockClientConsistentWithUpstreamResource := func(t *testing.T) {
mockClient.Reset()
schemaConsistentWithUpstreamData = nil

schemaConsistentWithUpstreamGetHandler = mockClient.When(http.MethodGet, "/v1/connectors/connector_id/schemas").ThenCall(
func(req *http.Request) (*http.Response, error) {
if nil == schemaHashedAlignmentData {
schemaConsistentWithUpstreamData = createMapFromJsonString(t, `
{
"enable_new_by_default": true,
"schema_change_handling": "BLOCK_ALL",
"schemas": {
"schema_1": {
"name_in_destination": "schema_1",
"enabled": true,
"tables": {
"table_1": {
"name_in_destination": "table_1",
"enabled": true,
"sync_mode": "SOFT_DELETE",
"enabled_patch_settings": {
"allowed": true
}
},
"table_2": {
"name_in_destination": "table_2",
"enabled": true,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
}
},
"table_3": {
"name_in_destination": "table_3",
"enabled": false,
"sync_mode": "LIVE",
"enabled_patch_settings": {
"allowed": true
}
}
}
}
}
}
`)
}
return fivetranSuccessResponse(t, req, http.StatusOK, "Success", schemaConsistentWithUpstreamData), nil
},
)

schemaConsistentWithUpstreamPathchHandler = mockClient.When(http.MethodPatch, "/v1/connectors/connector_id/schemas/").ThenCall(
func(req *http.Request) (*http.Response, error) {
body := requestBodyToJson(t, req)
fmt.Print(body)
return fivetranSuccessResponse(t, req, http.StatusOK, "Success", schemaConsistentWithUpstreamData), nil
},
)
}

step1 := resource.TestStep{
Config: `
resource "fivetran_connector_schema_config" "test_schema" {
Expand All @@ -850,8 +1002,8 @@ func TestConsistentWithUpstreamSchemaMock(t *testing.T) {

Check: resource.ComposeAggregateTestCheckFunc(
func(s *terraform.State) error {
assertEqual(t, schemaConsistentWithUpstreamGetHandler.Interactions, 2) // 1 read attempt before reload, 1 read after create
assertNotEmpty(t, schemaConsistentWithUpstreamData) // schema initialised
assertEqual(t, schemaConsistentWithUpstreamGetHandler.Interactions, 1)
assertNotEmpty(t, schemaConsistentWithUpstreamData)
return nil
},
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema_change_handling", "BLOCK_ALL"),
Expand Down Expand Up @@ -888,6 +1040,34 @@ func TestConsistentWithUpstreamSchemaMock(t *testing.T) {
),
}

step3 := resource.TestStep{
Config: `
resource "fivetran_connector_schema_config" "test_schema" {
provider = fivetran-provider
connector_id = "connector_id"
schema_change_handling = "BLOCK_ALL"
schema {
name = "schema_1"
enabled = true
table {
name = "table_1"
enabled = true
}
table {
name = "table_2"
enabled = true
sync_mode = "LIVE"
}
}
}`,

Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema_change_handling", "BLOCK_ALL"),
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema.0.table.0.sync_mode", ""),
resource.TestCheckResourceAttr("fivetran_connector_schema_config.test_schema", "schema.0.table.1.sync_mode", "LIVE"),
),
}

resource.Test(
t,
resource.TestCase{
Expand All @@ -903,6 +1083,7 @@ func TestConsistentWithUpstreamSchemaMock(t *testing.T) {
Steps: []resource.TestStep{
step1,
step2,
step3,
},
},
)
Expand All @@ -919,7 +1100,7 @@ func TestResourceHashedAlignmentSchemaMock(t *testing.T) {

Check: resource.ComposeAggregateTestCheckFunc(
func(s *terraform.State) error {
assertEqual(t, schemaHashedAlignmentGetHandler.Interactions, 2) // 1 read attempt before reload, 1 read after create
assertEqual(t, schemaHashedAlignmentGetHandler.Interactions, 1) // 1 read attempt before reload, 1 read after create
assertEqual(t, schemaHashedAlignmentPatchHandler.Interactions, 1) // Update hashed for column
assertNotEmpty(t, schemaHashedAlignmentData) // schema initialised
return nil
Expand Down
Loading

0 comments on commit ff77a75

Please sign in to comment.