Skip to content

Commit

Permalink
[Exporter] Use Go SDK to list clusters
Browse files Browse the repository at this point in the history
This helps us with the following:

- use paginated output, which is more efficient
- automatically filter non-interactive clusters instead of doing it ourselves

P.S. We still have usages of the 2.0 API coming from mounts and some other places
  • Loading branch information
alexott committed Jan 9, 2025
1 parent 6011b67 commit 296f600
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 93 deletions.
17 changes: 9 additions & 8 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestImportingMounts(t *testing.T) {
{
Method: "POST",
ReuseRequest: true,
Resource: "/api/2.0/clusters/events",
Resource: "/api/2.1/clusters/events",
Response: clusters.EventsResponse{
Events: []clusters.ClusterEvent{},
},
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestImportingUsersGroupsSecretScopes(t *testing.T) {
},
{
Method: "GET",
Resource: "/api/2.0/clusters/list",
Resource: "/api/2.1/clusters/list?filter_by.cluster_sources=UI&filter_by.cluster_sources=API&page_size=100",
Response: clusters.ClusterList{},
},
{
Expand Down Expand Up @@ -808,7 +808,7 @@ func TestImportingNoResourcesError(t *testing.T) {
},
{
Method: "GET",
Resource: "/api/2.0/clusters/list",
Resource: "/api/2.1/clusters/list?filter_by.cluster_sources=UI&filter_by.cluster_sources=API&page_size=100",
Response: clusters.ClusterList{},
},
{
Expand Down Expand Up @@ -862,7 +862,7 @@ func TestImportingClusters(t *testing.T) {
},
{
Method: "GET",
Resource: "/api/2.0/clusters/list",
Resource: "/api/2.1/clusters/list?filter_by.cluster_sources=UI&filter_by.cluster_sources=API&page_size=100",
Response: getJSONObject("test-data/clusters-list-response.json"),
ReuseRequest: true,
},
Expand Down Expand Up @@ -934,9 +934,10 @@ func TestImportingClusters(t *testing.T) {
Response: getJSONObject("test-data/get-cluster-awscluster-response.json"),
},
{
Method: "GET",
Resource: "/api/2.0/libraries/cluster-status?cluster_id=awscluster",
Response: getJSONObject("test-data/libraries-cluster-status-test2.json"),
Method: "GET",
Resource: "/api/2.0/libraries/cluster-status?cluster_id=awscluster",
Response: getJSONObject("test-data/libraries-cluster-status-test2.json"),
ReuseRequest: true,
},
{
Method: "GET",
Expand Down Expand Up @@ -1588,7 +1589,7 @@ func TestImportingSecrets(t *testing.T) {
},
{
Method: "GET",
Resource: "/api/2.0/clusters/list",
Resource: "/api/2.1/clusters/list?filter_by.cluster_sources=UI&filter_by.cluster_sources=API&page_size=100",
Response: clusters.ClusterList{},
},
{
Expand Down
91 changes: 91 additions & 0 deletions exporter/impl_compute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package exporter

import (
"log"
"strings"

sdk_compute "github.com/databricks/databricks-sdk-go/service/compute"
)

// listClusters emits a databricks_cluster resource for every interactive
// cluster (created via the UI or the API) returned by the paginated
// clusters/list endpoint, skipping terraform-managed clusters, clusters
// whose name doesn't match the configured filter, and clusters that have
// been inactive since before the configured cutoff.
func listClusters(ic *importContext) error {
	cutoffMs := ic.getLastActiveMs()
	req := sdk_compute.ListClustersRequest{
		// Server-side filter: only interactive clusters; job/pipeline/etc.
		// clusters are excluded by the backend.
		FilterBy: &sdk_compute.ListClustersFilterBy{
			ClusterSources: []sdk_compute.ClusterSource{
				sdk_compute.ClusterSourceUi,
				sdk_compute.ClusterSourceApi,
			},
		},
		PageSize: 100,
	}
	scanned := 0
	it := ic.workspaceClient.Clusters.List(ic.Context, req)
	for it.HasNext(ic.Context) {
		c, err := it.Next(ic.Context)
		if err != nil {
			return err
		}
		scanned++

		switch {
		case strings.HasPrefix(c.ClusterName, "terraform-"):
			log.Printf("[INFO] Skipping terraform-specific cluster %s", c.ClusterName)
			continue
		case !ic.MatchesName(c.ClusterName):
			log.Printf("[INFO] Skipping %s because it doesn't match %s", c.ClusterName, ic.match)
			continue
		case c.LastRestartedTime > 0 && c.LastRestartedTime < cutoffMs:
			log.Printf("[INFO] Old inactive cluster %s", c.ClusterName)
			continue
		}
		ic.Emit(&resource{
			Resource: "databricks_cluster",
			ID:       c.ClusterId,
		})
		// Periodic progress marker; count includes skipped clusters.
		if scanned%50 == 0 {
			log.Printf("[INFO] Scanned %d clusters", scanned)
		}
	}
	return nil
}

// importCluster emits every resource referenced from a cluster spec:
// the AWS instance profile, instance pools (regular and driver),
// cluster policy, init scripts, secrets referenced from Spark conf and
// env vars, and the single-user principal (user/SP plus, for
// new-"kind" clusters, a possible group by display name).
// A nil spec is a no-op.
func (ic *importContext) importCluster(c *sdk_compute.ClusterSpec) {
	if c == nil {
		return
	}
	// emitID emits a resource by ID, skipping empty IDs.
	emitID := func(resourceType, id string) {
		if id != "" {
			ic.Emit(&resource{Resource: resourceType, ID: id})
		}
	}
	if c.AwsAttributes != nil {
		emitID("databricks_instance_profile", c.AwsAttributes.InstanceProfileArn)
	}
	// set enable_elastic_disk to false, and remove aws/gcp/azure_attributes
	emitID("databricks_instance_pool", c.InstancePoolId)
	emitID("databricks_instance_pool", c.DriverInstancePoolId)
	emitID("databricks_cluster_policy", c.PolicyId)
	ic.emitInitScripts(c.InitScripts)
	ic.emitSecretsFromSecretsPathMap(c.SparkConf)
	ic.emitSecretsFromSecretsPathMap(c.SparkEnvVars)
	ic.emitUserOrServicePrincipal(c.SingleUserName)
	// Only clusters with a non-empty "kind" may have a group as the
	// single-user principal; look it up by display name.
	if c.Kind.String() != "" && c.SingleUserName != "" {
		ic.Emit(&resource{
			Resource:  "databricks_group",
			Attribute: "display_name",
			Value:     c.SingleUserName,
		})
	}
}
37 changes: 1 addition & 36 deletions exporter/importables.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/databricks/databricks-sdk-go/service/vectorsearch"
sdk_workspace "github.com/databricks/databricks-sdk-go/service/workspace"
tfcatalog "github.com/databricks/terraform-provider-databricks/catalog"
"github.com/databricks/terraform-provider-databricks/clusters"
"github.com/databricks/terraform-provider-databricks/common"
"github.com/databricks/terraform-provider-databricks/jobs"
"github.com/databricks/terraform-provider-databricks/mws"
Expand Down Expand Up @@ -329,41 +328,7 @@ var resourcesMap map[string]importable = map[string]importable{
{Path: "init_scripts.workspace.destination", Resource: "databricks_repo", Match: "path",
MatchType: MatchPrefix, SearchValueTransformFunc: appendEndingSlashToDirName},
},
List: func(ic *importContext) error {
clusters, err := clusters.NewClustersAPI(ic.Context, ic.Client).List()
if err != nil {
return err
}
lastActiveMs := ic.getLastActiveMs()
nonInteractiveClusters := []string{"JOB", "MODELS", "PIPELINE_MAINTENANCE", "PIPELINE", "SQL"}
for offset, c := range clusters {
if slices.Contains(nonInteractiveClusters, string(c.ClusterSource)) {
// TODO: Should we check cluster name as well?
// jobRunClusterNameRegex = regexp.MustCompile(`^job-\d+-run-\d+$`)
// jobRunClusterNameRegex.MatchString(c.ClusterName)
log.Printf("[INFO] Skipping non-interactive cluster %s", c.ClusterID)
continue
}
if strings.HasPrefix(c.ClusterName, "terraform-") {
log.Printf("[INFO] Skipping terraform-specific cluster %s", c.ClusterName)
continue
}
if !ic.MatchesName(c.ClusterName) {
log.Printf("[INFO] Skipping %s because it doesn't match %s", c.ClusterName, ic.match)
continue
}
if c.LastActivityTime > 0 && c.LastActivityTime < lastActiveMs {
log.Printf("[INFO] Older inactive cluster %s", c.ClusterName)
continue
}
ic.Emit(&resource{
Resource: "databricks_cluster",
ID: c.ClusterID,
})
log.Printf("[INFO] Scanned %d of %d clusters", offset+1, len(clusters))
}
return nil
},
List: listClusters,
Import: func(ic *importContext, r *resource) error {
var c compute.ClusterSpec
s := ic.Resources["databricks_cluster"].Schema
Expand Down
4 changes: 2 additions & 2 deletions exporter/importables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestClusterListFails(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.0/clusters/list",
Resource: "/api/2.1/clusters/list?filter_by.cluster_sources=UI&filter_by.cluster_sources=API&page_size=100",
Status: 404,
Response: apierr.NotFound("nope"),
},
Expand All @@ -417,7 +417,7 @@ func TestClusterList_NoNameMatch(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.0/clusters/list",
Resource: "/api/2.1/clusters/list?filter_by.cluster_sources=UI&filter_by.cluster_sources=API&page_size=100",
Response: clusters.ClusterList{
Clusters: []clusters.ClusterInfo{
{
Expand Down
5 changes: 0 additions & 5 deletions exporter/test-data/clusters-list-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
"cluster_id": "23443",
"cluster_name": "terraform-abc"
},
{
"cluster_id": "23443",
"cluster_name": "job-34234234-run-32423432",
"cluster_source": "JOB"
},
{
"autotermination_minutes": 120,
"azure_attributes": {
Expand Down
42 changes: 0 additions & 42 deletions exporter/util_compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,48 +32,6 @@ func (ic *importContext) emitInitScripts(initScripts []compute.InitScriptInfo) {
}
}

func (ic *importContext) importCluster(c *compute.ClusterSpec) {
if c == nil {
return
}
if c.AwsAttributes != nil && c.AwsAttributes.InstanceProfileArn != "" {
ic.Emit(&resource{
Resource: "databricks_instance_profile",
ID: c.AwsAttributes.InstanceProfileArn,
})
}
if c.InstancePoolId != "" {
// set enable_elastic_disk to false, and remove aws/gcp/azure_attributes
ic.Emit(&resource{
Resource: "databricks_instance_pool",
ID: c.InstancePoolId,
})
}
if c.DriverInstancePoolId != "" {
ic.Emit(&resource{
Resource: "databricks_instance_pool",
ID: c.DriverInstancePoolId,
})
}
if c.PolicyId != "" {
ic.Emit(&resource{
Resource: "databricks_cluster_policy",
ID: c.PolicyId,
})
}
ic.emitInitScripts(c.InitScripts)
ic.emitSecretsFromSecretsPathMap(c.SparkConf)
ic.emitSecretsFromSecretsPathMap(c.SparkEnvVars)
ic.emitUserOrServicePrincipal(c.SingleUserName)
if c.Kind.String() != "" && c.SingleUserName != "" {
ic.Emit(&resource{
Resource: "databricks_group",
Attribute: "display_name",
Value: c.SingleUserName,
})
}
}

func (ic *importContext) emitSecretsFromSecretPathString(v string) {
if res := secretPathRegex.FindStringSubmatch(v); res != nil {
ic.Emit(&resource{
Expand Down

0 comments on commit 296f600

Please sign in to comment.