From 0df403496188a00a4f7f89f0990bcd5fece5bca1 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 7 Feb 2024 10:05:40 -0400 Subject: [PATCH] feat(cluster-connection): implement conterpart for cluster connections api --- internal/provider/cluster_connection.go | 367 ++++++++++++++++++++++++ internal/provider/provider.go | 1 + internal/scylla/client.go | 8 +- internal/scylla/endpoints.go | 58 +++- internal/scylla/errors.go | 20 +- internal/scylla/model/model.go | 31 ++ 6 files changed, 478 insertions(+), 7 deletions(-) create mode 100644 internal/provider/cluster_connection.go diff --git a/internal/provider/cluster_connection.go b/internal/provider/cluster_connection.go new file mode 100644 index 0000000..62a8b12 --- /dev/null +++ b/internal/provider/cluster_connection.go @@ -0,0 +1,367 @@ +package provider + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + + "github.com/scylladb/terraform-provider-scylladbcloud/internal/scylla" + "github.com/scylladb/terraform-provider-scylladbcloud/internal/scylla/model" +) + +const ( + clusterConnectionRetryTimeout = 40 * time.Minute + clusterConnectionDeleteTimeout = 90 * time.Minute + clusterConnectionRetryDelay = 5 * time.Second +) + +func ResourceClusterConnection() *schema.Resource { + return &schema.Resource{ + CreateContext: resourceClusterConnectionCreate, + ReadContext: resourceClusterConnectionRead, + UpdateContext: resourceClusterConnectionUpdate, + DeleteContext: resourceClusterConnectionDelete, + + Importer: &schema.ResourceImporter{ + StateContext: schema.ImportStatePassthroughContext, + }, + + Timeouts: &schema.ResourceTimeout{ + Create: schema.DefaultTimeout(clusterConnectionRetryTimeout), + Update: schema.DefaultTimeout(clusterConnectionRetryTimeout), + Delete: schema.DefaultTimeout(clusterConnectionDeleteTimeout), + }, + + Schema: map[string]*schema.Schema{ + "id": { + Description: "Cluster connection ID", + Computed: true, + Type: schema.TypeString, + }, + "region": { + Description: "Region name", + Computed: true, + Type: schema.TypeString, + }, + "name": { + Description: "Cluster Connection Name", + Optional: true, + Type: schema.TypeString, + }, + "cluster_id": { + Description: "Cluster ID", + Required: true, + Type: schema.TypeInt, + }, + "datacenter": { + Description: "Cluster datacenter name", + Required: true, + ForceNew: true, + Type: schema.TypeString, + }, + "cidrlist": { + Description: "List of CIDRs to route to the cluster connection", + Required: true, + Type: schema.TypeList, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "type": { + Description: "Connection Type", + Required: true, + ForceNew: true, + Type: schema.TypeString, + }, + "status": { + Description: "Connection Status", + Computed: true, + Optional: true, + Type: schema.TypeString, + }, + "external_id": { + Description: "ID of the cloud resource that represents connection", + Computed: true, + Type: schema.TypeString, + }, + "data": { + Description: "Connection Data", + Required: true, + Type: schema.TypeMap, + Elem: &schema.Schema{ + Required: true, + Computed: true, + Type: schema.TypeString, + }, + }, + }, + } +} + +func resourceClusterConnectionCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + var ( + c = meta.(*scylla.Client) + dcName = d.Get("datacenter").(string) + cidrListVal, cidrListOK = d.GetOk("cidrlist") + clusterID = d.Get("cluster_id").(int) + r = &model.ClusterConnectionCreateRequest{ + Name: d.Get("name").(string), + Data: convertData(d.Get("data").(map[string]interface{})), + Type: d.Get("type").(string), + } + p *scylla.CloudProvider + ) + + dcs, err := c.ListDataCenters(ctx, int64(clusterID)) + if err != nil { + return diag.Errorf("error reading clusters: %s", err) + } + + for _, dc := range dcs { + if strings.EqualFold(dc.Name, dcName) { + r.ClusterDCID = dc.ID + p = c.Meta.ProviderByID(dc.CloudProviderID) + if p == nil { + return diag.Errorf("unable to find cloud provider with id=%d", dc.CloudProviderID) + } + break + } + } + + if r.ClusterDCID == 0 { + return diag.Errorf("unable to find %q datacenter", dcName) + } + + if !cidrListOK { + return diag.Errorf(`"cidrlist" is required for %q cloud`, p.CloudProvider.Name) + } + + if len(cidrListVal.([]any)) == 0 { + return diag.Errorf(`"cidrlist" cannot be empty`) + } + + r.CIDRList, err = CIDRList(cidrListVal) + if err != nil { + return diag.Errorf(`"cidrlist" must be a list of strings`) + } + + conn, err := c.CreateClusterConnection(ctx, int64(clusterID), r) + if err != nil { + return diag.Errorf("error creating cluster connection: %s", err) + } + d.SetId(strconv.FormatInt(conn.ID, 10)) + err = waitForClusterConnection(ctx, c, int64(clusterID), conn.ID, "ACTIVE") + if err != nil { + return diag.Errorf(err.Error()) + } + conn, err = c.GetClusterConnection(ctx, int64(clusterID), conn.ID) + if err != nil { + return diag.Errorf("error reading cluster connection %d: %s", conn.ID, err) + } + _ = d.Set("external_id", conn.ExternalID) + return nil +} + +func resourceClusterConnectionRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + var ( + c = meta.(*scylla.Client) + connIDStr = d.Id() + dc *model.Datacenter + ) + + if connIDStr == "" { + return nil + } + + connectionID, err := strconv.ParseInt(connIDStr, 10, 64) + if err != nil { + return diag.Errorf("failed to parse connection id %q: %s", connIDStr, err) + } + + clusterID := int64(d.Get("cluster_id").(int)) + + cluster, connection, err := getClusterAndConnection(ctx, c, clusterID, connectionID) + if err != nil { + return diag.Errorf(err.Error()) + } + + if connection == nil || cluster == nil { + d.SetId("") + _ = d.Set("cluster_id", 0) + return nil + } + + for id := range cluster.Datacenters { + if cluster.Datacenters[id].ID == connection.ClusterDCID { + dc = &cluster.Datacenters[id] + break + } + } + + if dc == nil { + d.SetId("") + _ = d.Set("cluster_id", 0) + return nil + } + + _ = d.Set("datacenter", dc.Name) + _ = d.Set("external_id", connection.ExternalID) + _ = d.Set("region", dc.Region.ExternalID) + _ = d.Set("cluster_id", connection.ClusterID) + _ = d.Set("cidrlist", connection.CIDRList) + _ = d.Set("name", connection.Name) + _ = d.Set("data", convertFromData(connection.Data)) + _ = d.Set("type", connection.Type) + return nil +} + +func resourceClusterConnectionUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + var ( + c = meta.(*scylla.Client) + clusterID = d.Get("cluster_id").(int) + ) + connID, err := strconv.ParseInt(d.Id(), 10, 64) + if err != nil { + return diag.Errorf("failed to parse connection id %q: %s", d.Id(), err) + } + req := model.ClusterConnectionUpdateRequest{ + Name: d.Get("name").(string), + CIDRList: convertListOfString(d.Get("cidrlist").([]interface{})), + Status: d.Get("status").(string), + } + + err = c.UpdateClusterConnections(ctx, int64(clusterID), connID, &req) + if err != nil { + return diag.Errorf("error updating cluster connection: %s", err) + } + err = waitForClusterConnection(ctx, c, int64(clusterID), connID, req.Status) + if err != nil { + return diag.Errorf(err.Error()) + } + + conn, err := c.GetClusterConnection(ctx, int64(clusterID), connID) + if err != nil { + return diag.Errorf("error reading cluster connection %d: %s", connID, err) + } + _ = d.Set("external_id", conn.ExternalID) + return nil +} + +func resourceClusterConnectionDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + var ( + c = meta.(*scylla.Client) + clusterID = d.Get("cluster_id").(int) + connIDStr = d.Id() + ) + + if connIDStr == "" { + return nil + } + + connID, err := strconv.ParseInt(connIDStr, 10, 64) + if err != nil { + return diag.Errorf("failed to parse connection id %q: %s", connIDStr, err) + } + + if err = c.DeleteClusterConnection(ctx, int64(clusterID), connID); err != nil { + if scylla.IsClusterConnectionDeletedErr(err) { + return nil // cluster was already deleted + } + return diag.Errorf("error deleting cluster connection: %s", err) + } + err = waitForClusterConnection(ctx, c, int64(clusterID), connID, "DELETED") + if err != nil { + return diag.Errorf("error waiting for cluster connection to become deleted: %s", err) + } + return nil +} + +func convertData(mapVal map[string]interface{}) map[string]string { + out := make(map[string]string, len(mapVal)) + for key, val := range mapVal { + out[key] = val.(string) + } + return out +} + +func convertFromData(mapVal map[string]string) map[string]interface{} { + out := make(map[string]interface{}, len(mapVal)) + for key, val := range mapVal { + out[key] = val + } + return out +} + +func convertListOfString(mapVal []interface{}) []string { + out := make([]string, len(mapVal)) + for key, val := range mapVal { + out[key] = val.(string) + } + return out +} + +func waitForClusterConnection(ctx context.Context, c *scylla.Client, clusterID, connectionID int64, targetStatus string) error { + stateConf := &retry.StateChangeConf{ + Pending: []string{"PENDING", "INIT", "DELETING"}, + Target: []string{targetStatus}, + Refresh: func() (interface{}, string, error) { + conn, err := c.GetClusterConnection(context.Background(), clusterID, connectionID) + switch { + case err == nil: + return conn, conn.Status, nil + case scylla.IsNotFound(err), scylla.IsClusterConnectionDeletedErr(err): + return nil, "DELETED", nil + default: + return nil, "", err + } + }, + Delay: clusterConnectionRetryDelay, + Timeout: clusterConnectionRetryTimeout, + } + + _, err := stateConf.WaitForStateContext(ctx) + if err != nil { + return fmt.Errorf("error waiting for cluster connection to become %q: %s", targetStatus, err) + } + return nil +} + +func getClusterAndConnection(ctx context.Context, c *scylla.Client, clusterID, connectionID int64) (cluster *model.Cluster, connection *model.ClusterConnection, err error) { + if clusterID != 0 { + cluster, err = c.GetCluster(ctx, clusterID) + if err != nil { + if scylla.IsNotFound(err) { + return nil, nil, nil + } + return nil, nil, fmt.Errorf("error reading cluster %d: %s", clusterID, err) + } + + connection, err = c.GetClusterConnection(ctx, cluster.ID, connectionID) + if err != nil && !scylla.IsNotFound(err) && !scylla.IsClusterConnectionDeletedErr(err) { + return nil, nil, fmt.Errorf("error reading cluster connection %d: %s", connectionID, err) + } + return cluster, connection, nil + } + clusters, err := c.ListClusters(ctx) + if err != nil { + return nil, nil, fmt.Errorf("error reading cluster list: %s", err) + } + + for i := range clusters { + cluster = &clusters[i] + connection, err = c.GetClusterConnection(ctx, cluster.ID, connectionID) + if err == nil { + return cluster, connection, nil + } + if !scylla.IsNotFound(err) && !scylla.IsClusterConnectionDeletedErr(err) { + return nil, nil, fmt.Errorf("error reading cluster connection %d: %s", connectionID, err) + } + } + return nil, nil, nil +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 2791b36..225615c 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -46,6 +46,7 @@ func New(_ context.Context) (*schema.Provider, error) { "scylladbcloud_cluster": ResourceCluster(), "scylladbcloud_allowlist_rule": ResourceAllowlistRule(), "scylladbcloud_vpc_peering": ResourceVPCPeering(), + "scylladbcloud_cluster_connection": ResourceClusterConnection(), "scylladbcloud_serverless_cluster": ResourceServerlessCluster(), }, } diff --git a/internal/scylla/client.go b/internal/scylla/client.go index 907f8b3..64cb193 100644 --- a/internal/scylla/client.go +++ b/internal/scylla/client.go @@ -226,7 +226,7 @@ func (c *Client) callAPI(ctx context.Context, method, path string, reqBody, resT "error": err.Error(), "body": buf.String(), }) - err = makeError("failed to unmarshal data: "+err.Error(), c.ErrCodes, resp) + err = makeError("failed to unmarshal data: "+err.Error(), c.ErrCodes, resp, buf.String()) return err } @@ -237,7 +237,7 @@ func (c *Client) callAPI(ctx context.Context, method, path string, reqBody, resT } if data.Error != "" { - err = makeError(data.Error, c.ErrCodes, resp) + err = makeError(data.Error, c.ErrCodes, resp, buf.String()) tflog.Trace(ctx, "api returned error: "+err.Error(), map[string]interface{}{ "code": resp.StatusCode, "status": resp.Status, @@ -263,6 +263,10 @@ func (c *Client) post(ctx context.Context, path string, requestBody, resultType return c.callAPI(ctx, http.MethodPost, path, requestBody, resultType) } +func (c *Client) put(ctx context.Context, path string, requestBody, resultType interface{}) error { + return c.callAPI(ctx, http.MethodPut, path, requestBody, resultType) +} + func (c *Client) delete(ctx context.Context, path string) error { return c.callAPI(ctx, http.MethodDelete, path, nil, nil) } diff --git a/internal/scylla/endpoints.go b/internal/scylla/endpoints.go index fca913e..8efdfae 100644 --- a/internal/scylla/endpoints.go +++ b/internal/scylla/endpoints.go @@ -69,8 +69,8 @@ func (c *Client) Bundle(ctx context.Context, clusterID int64) ([]byte, error) { return raw, nil } -func (c *Client) Connect(ctx context.Context, clusterID int64) (*model.ClusterConnection, error) { - var result model.ClusterConnection +func (c *Client) Connect(ctx context.Context, clusterID int64) (*model.ClusterConnectionInformation, error) { + var result model.ClusterConnectionInformation path := fmt.Sprintf("/account/%d/cluster/connect", c.AccountID) @@ -259,7 +259,59 @@ func (c *Client) DeleteClusterVPCPeering(ctx context.Context, clusterID, peerID return c.delete(ctx, path) } -func fix_sf3112(c *model.ClusterConnection) { +func (c *Client) CreateClusterConnection(ctx context.Context, clusterID int64, req *model.ClusterConnectionCreateRequest) (*model.ClusterConnection, error) { + var result struct { + ID int64 `json:"id"` + ConnectionID int64 `json:"connectionID"` + } + + path := fmt.Sprintf("/account/%d/cluster/%d/network/vpc/connection", c.AccountID, clusterID) + + if err := c.post(ctx, path, req, &result); err != nil { + return nil, err + } + return c.GetClusterConnection(ctx, clusterID, result.ConnectionID) +} + +func (c *Client) GetClusterConnection(ctx context.Context, clusterID, connectionID int64) (*model.ClusterConnection, error) { + var result model.ClusterConnection + + path := fmt.Sprintf("/account/%d/cluster/%d/network/vpc/connection/%d", c.AccountID, clusterID, connectionID) + + if err := c.get(ctx, path, &result); err != nil { + return nil, err + } + return &result, nil +} + +func (c *Client) ListClusterConnections(ctx context.Context, clusterID int64) ([]model.ClusterConnection, error) { + var result []model.ClusterConnection + + path := fmt.Sprintf("/account/%d/cluster/%d/network/vpc/connection", c.AccountID, clusterID) + + if err := c.get(ctx, path, &result); err != nil { + return nil, err + } + + return result, nil +} + +func (c *Client) UpdateClusterConnections(ctx context.Context, clusterID, connectionID int64, req *model.ClusterConnectionUpdateRequest) error { + path := fmt.Sprintf("/account/%d/cluster/%d/network/vpc/connection/%d", c.AccountID, clusterID, connectionID) + if err := c.put(ctx, path, req, nil); err != nil { + return err + } + + return nil +} + +func (c *Client) DeleteClusterConnection(ctx context.Context, clusterID, connectionID int64) error { + path := fmt.Sprintf("/account/%d/cluster/%d/network/vpc/connection/%d", c.AccountID, clusterID, connectionID) + + return c.delete(ctx, path) +} + +func fix_sf3112(c *model.ClusterConnectionInformation) { for i := range c.Datacenters { dc := &c.Datacenters[i] diff --git a/internal/scylla/errors.go b/internal/scylla/errors.go index e844746..f20c7f3 100644 --- a/internal/scylla/errors.go +++ b/internal/scylla/errors.go @@ -7,6 +7,13 @@ import ( "strconv" ) +func IsClusterConnectionDeletedErr(err error) bool { + if e := new(APIError); errors.As(err, &e) && e.Code == "083002" { + return true + } + return false +} + func IsDeletedErr(err error) bool { if e := new(APIError); errors.As(err, &e) && e.Code == "040001" { return true @@ -14,6 +21,13 @@ func IsDeletedErr(err error) bool { return false } +func IsNotFound(err error) bool { + if e := new(APIError); errors.As(err, &e) && e.StatusCode == http.StatusNotFound { + return true + } + return false +} + // APIError represents an error that occurred while calling the API. type APIError struct { URL string @@ -21,9 +35,10 @@ type APIError struct { Message string Method string StatusCode int + Body string } -func makeError(text string, errCodes map[string]string, r *http.Response) *APIError { +func makeError(text string, errCodes map[string]string, r *http.Response, body string) *APIError { var err APIError if _, e := strconv.Atoi(text); e == nil { err.Code = text @@ -46,9 +61,10 @@ func makeError(text string, errCodes map[string]string, r *http.Response) *APIEr err.StatusCode = r.StatusCode } err.Method = r.Request.Method + err.Body = body return &err } func (err *APIError) Error() string { - return fmt.Sprintf("Error %q: %s (http status %d, method %s url %q)", err.Code, err.Message, err.StatusCode, err.Method, err.URL) + return fmt.Sprintf("Error %q: %s (http status %d, method %s url %q)\n%s", err.Code, err.Message, err.StatusCode, err.Method, err.URL, err.Body) } diff --git a/internal/scylla/model/model.go b/internal/scylla/model/model.go index 5d80b5f..eb0ca19 100644 --- a/internal/scylla/model/model.go +++ b/internal/scylla/model/model.go @@ -246,7 +246,38 @@ func (vp *VPCPeering) NetworkLink() string { return "projects/" + vp.ProjectID + "/global/networks/" + vp.NetworkName } +type ClusterConnectionCreateRequest struct { + Name string `json:"name"` + CIDRList []string `json:"cidrList"` + ClusterDCID int64 `json:"clusterDCID"` + Data map[string]string `json:"data"` + Type string `json:"type"` +} + +type ClusterConnectionUpdateRequest struct { + Name string `json:"name"` + CIDRList []string `json:"cidrList"` + Status string `json:"status"` +} + type ClusterConnection struct { + ID int64 `json:"id"` + Name string `json:"name"` + ExternalID string `json:"externalId"` + AwaitingData map[string]string `json:"awaitingData"` + AwaitingForClient bool `json:"awaitingForClient"` + CIDRList []string `json:"cidrList"` + ClusterDCID int64 `json:"clusterDCID"` + ClusterVPCID int64 `json:"clusterVPCID"` + ClusterID int64 `json:"clusterId"` + Data map[string]string `json:"data"` + Stage string `json:"stage"` + StageMessage string `json:"stageMessage"` + Status string `json:"status"` + Type string `json:"type"` +} + +type ClusterConnectionInformation struct { BroadcastType string `json:"broadcastType"` Credentials struct { Username string `json:"username"`