From 8c29070dfe93f2895de128ade68a767a90906796 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Stachowski?= Date: Thu, 6 Apr 2017 12:54:38 +0200 Subject: [PATCH] Increase number of connectivity options --- cassandra/cassandra.go | 166 +++++++++++++++------ cassandra/cassandra_integration_test.go | 20 ++- cassandra/cassandra_test.go | 35 +++-- cassandra/client.go | 189 +++++++++++++----------- glide.lock | 12 +- glide.yaml | 2 +- 6 files changed, 269 insertions(+), 155 deletions(-) diff --git a/cassandra/cassandra.go b/cassandra/cassandra.go index 36fa560..daef190 100644 --- a/cassandra/cassandra.go +++ b/cassandra/cassandra.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "strings" + "time" log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap/control/plugin" @@ -34,18 +35,26 @@ import ( const ( name = "cassandra" - version = 5 + version = 6 pluginType = plugin.PublisherPluginType - serverAddrRuleKey = "server" - sslOptionsRuleKey = "ssl" - usernameRuleKey = "username" - passwordRuleKey = "password" - keyPathRuleKey = "keyPath" - certPathRuleKey = "certPath" caPathRuleKey = "caPath" + certPathRuleKey = "certPath" + connectionTimeoutRuleKey = "connectionTimeout" + createKeyspaceRuleKey = "createKeyspace" enableServerCertVerRuleKey = "serverCertVerification" + ignorePeerAddrRuleKey = "ignorePeerAddr" + initialHostLookupRuleKey = "initialHostLookup" + keyPathRuleKey = "keyPath" + keyspaceNameRuleKey = "keyspaceName" + passwordRuleKey = "password" + portRuleKey = "port" + serverAddrRuleKey = "server" + sslOptionsRuleKey = "ssl" + tableNameRuleKey = "tableName" tagIndexRuleKey = "tagIndex" + timeoutRuleKey = "timeout" + usernameRuleKey = "username" ) // Meta returns a plugin meta data @@ -70,51 +79,91 @@ func (cas *CassandraPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) cp := cpolicy.New() config := cpolicy.NewPolicyNode() - serverAddrRule, err := cpolicy.NewStringRule(serverAddrRuleKey, true) + caPathRule, err := cpolicy.NewStringRule(caPathRuleKey, false, "") handleErr(err) - serverAddrRule.Description = "Cassandra server" - config.Add(serverAddrRule) + caPathRule.Description = "Path to the CA certificate for the Cassandra server" + config.Add(caPathRule) - useSslOptionsRule, err := cpolicy.NewBoolRule(sslOptionsRuleKey, false, false) + certPathRule, err := cpolicy.NewStringRule(certPathRuleKey, false, "") handleErr(err) - useSslOptionsRule.Description = "Not required, if true, use ssl options to connect to the Cassandra, default: false" - config.Add(useSslOptionsRule) + certPathRule.Description = "Path to the self signed certificate for the Cassandra client" + config.Add(certPathRule) - usernameRule, err := cpolicy.NewStringRule(usernameRuleKey, false, "") + connectionTimeoutRule, err := cpolicy.NewIntegerRule(connectionTimeoutRuleKey, false, 2) handleErr(err) - usernameRule.Description = "Name of a user used to authenticate to Cassandra" - config.Add(usernameRule) + connectionTimeoutRule.Description = "Initial connection timeout in seconds, default: 2" + config.Add(connectionTimeoutRule) - passwordRule, err := cpolicy.NewStringRule(passwordRuleKey, false, "") + createKeyspaceRule, err := cpolicy.NewBoolRule(createKeyspaceRuleKey, false, true) handleErr(err) - passwordRule.Description = "Password used to authenticate to the Cassandra" - config.Add(passwordRule) + createKeyspaceRule.Description = "Create keyspace if it's not exist, default: true" + config.Add(createKeyspaceRule) + + enableServerCertVerRule, err := cpolicy.NewBoolRule(enableServerCertVerRuleKey, false, true) + handleErr(err) + enableServerCertVerRule.Description = "If true, verify a hostname and a server key, default: true" + config.Add(enableServerCertVerRule) + + ignorePeerAddrRule, err := cpolicy.NewBoolRule(ignorePeerAddrRuleKey, false, false) + handleErr(err) + ignorePeerAddrRule.Description = "Turn off cluster hosts tracking, default: false" + config.Add(ignorePeerAddrRule) + + initialHostLookupRule, err := cpolicy.NewBoolRule(initialHostLookupRuleKey, false, true) + handleErr(err) + initialHostLookupRule.Description = "Lookup for cluster hosts information, default: true" + config.Add(initialHostLookupRule) keyPathRule, err := cpolicy.NewStringRule(keyPathRuleKey, false, "") handleErr(err) keyPathRule.Description = "Path to the private key for the Cassandra client" config.Add(keyPathRule) - certPathRule, err := cpolicy.NewStringRule(certPathRuleKey, false, "") + keyspaceNameRule, err := cpolicy.NewStringRule(keyspaceNameRuleKey, false, "snap") handleErr(err) - certPathRule.Description = "Path to the self signed certificate for the Cassandra client" - config.Add(certPathRule) + keyspaceNameRule.Description = "Keyspace name, default: snap" + config.Add(keyspaceNameRule) - caPathRule, err := cpolicy.NewStringRule(caPathRuleKey, false, "") + passwordRule, err := cpolicy.NewStringRule(passwordRuleKey, false, "") handleErr(err) - caPathRule.Description = "Path to the CA certificate for the Cassandra server" - config.Add(caPathRule) + passwordRule.Description = "Password used to authenticate to the Cassandra" + config.Add(passwordRule) - enableServerCertVerRule, err := cpolicy.NewBoolRule(enableServerCertVerRuleKey, false, true) + portRule, err := cpolicy.NewIntegerRule(portRuleKey, false, 9042) handleErr(err) - enableServerCertVerRule.Description = "If true, verify a hostname and a server key, default: true" - config.Add(enableServerCertVerRule) + portRule.Description = "Cassandra server port, default: 9042" + config.Add(portRule) + + serverAddrRule, err := cpolicy.NewStringRule(serverAddrRuleKey, true) + handleErr(err) + serverAddrRule.Description = "Cassandra server" + config.Add(serverAddrRule) + + useSslOptionsRule, err := cpolicy.NewBoolRule(sslOptionsRuleKey, false, false) + handleErr(err) + useSslOptionsRule.Description = "Not required, if true, use ssl options to connect to the Cassandra, default: false" + config.Add(useSslOptionsRule) + + tableNameRule, err := cpolicy.NewStringRule(tableNameRuleKey, false, "metrics") + handleErr(err) + tableNameRule.Description = "Table name, default: metrics" + config.Add(tableNameRule) tagIndexRule, err := cpolicy.NewStringRule(tagIndexRuleKey, false, "") handleErr(err) tagIndexRule.Description = "Name of tags to be indexed separated by a comma" config.Add(tagIndexRule) + timeoutRule, err := cpolicy.NewIntegerRule(timeoutRuleKey, false, 2) + handleErr(err) + timeoutRule.Description = "Connection timeout in seconds, default: 2" + config.Add(timeoutRule) + + usernameRule, err := cpolicy.NewStringRule(usernameRuleKey, false, "") + handleErr(err) + usernameRule.Description = "Name of a user used to authenticate to Cassandra" + config.Add(usernameRule) + cp.Add([]string{""}, config) return cp, nil } @@ -140,23 +189,7 @@ func (cas *CassandraPublisher) Publish(contentType string, content []byte, confi // Only initialize client once if possible if cas.client == nil { - // Get all values for a new client. - useSslOptions, ok := getValueForKey(config, sslOptionsRuleKey).(bool) - checkAssertion(ok, sslOptionsRuleKey) - - var sslOptions *sslOptions - if useSslOptions { - logger.Debug("using ssl options") - sslOptions = getSslOptions(config) - } - - serverAddr, ok := getValueForKey(config, serverAddrRuleKey).(string) - checkAssertion(ok, serverAddrRuleKey) - - co := clientOptions{ - server: serverAddr, - ssl: sslOptions, - } + co := prepareClientOptions(config) // Initialize a new client. tagIndex, ok := getValueForKey(config, tagIndexRuleKey).(string) @@ -173,6 +206,47 @@ func (cas *CassandraPublisher) Close() { } } +func prepareClientOptions(config map[string]ctypes.ConfigValue) clientOptions { + serverAddr, ok := getValueForKey(config, serverAddrRuleKey).(string) + checkAssertion(ok, serverAddrRuleKey) + serverPort, ok := getValueForKey(config, portRuleKey).(int) + checkAssertion(ok, portRuleKey) + timeout, ok := getValueForKey(config, timeoutRuleKey).(int) + checkAssertion(ok, timeoutRuleKey) + connTimeout, ok := getValueForKey(config, connectionTimeoutRuleKey).(int) + checkAssertion(ok, connectionTimeoutRuleKey) + initialHostLookup, ok := getValueForKey(config, initialHostLookupRuleKey).(bool) + checkAssertion(ok, initialHostLookupRuleKey) + ignorePeerAddr, ok := getValueForKey(config, ignorePeerAddrRuleKey).(bool) + checkAssertion(ok, ignorePeerAddrRuleKey) + keyspaceName, ok := getValueForKey(config, keyspaceNameRuleKey).(string) + checkAssertion(ok, keyspaceNameRuleKey) + createKeyspace, ok := getValueForKey(config, createKeyspaceRuleKey).(bool) + checkAssertion(ok, createKeyspaceRuleKey) + useSslOptions, ok := getValueForKey(config, sslOptionsRuleKey).(bool) + checkAssertion(ok, sslOptionsRuleKey) + tableName, ok := getValueForKey(config, tableNameRuleKey).(string) + checkAssertion(ok, tableNameRuleKey) + + var sslOptions *sslOptions + if useSslOptions { + sslOptions = getSslOptions(config) + } + + return clientOptions{ + server: serverAddr, + port: serverPort, + timeout: time.Duration(timeout) * time.Second, + connectionTimeout: time.Duration(connTimeout) * time.Second, + initialHostLookup: initialHostLookup, + ignorePeerAddr: ignorePeerAddr, + keyspace: keyspaceName, + createKeyspace: createKeyspace, + ssl: sslOptions, + tableName: tableName, + } +} + func getValueForKey(cfg map[string]ctypes.ConfigValue, key string) interface{} { if cfg == nil { log.Error("Configuration of a plugin not found") @@ -223,7 +297,7 @@ func getSslOptions(cfg map[string]ctypes.ConfigValue) *sslOptions { func handleErr(e error) { if e != nil { - log.Fatal(e.Error()) + log.Fatalf("%s", e.Error()) } } diff --git a/cassandra/cassandra_integration_test.go b/cassandra/cassandra_integration_test.go index 338dbb2..342b356 100644 --- a/cassandra/cassandra_integration_test.go +++ b/cassandra/cassandra_integration_test.go @@ -35,11 +35,19 @@ import ( ) const ( + connectionTimeout = 2 + shouldCreateKeyspace = true + enableServerCertVerification = false + ignorePeerAddr = false + initialHostLookup = true + keyspaceName = "snap" + tableName = "foo" + password = "password" + port = 9042 serverAddress = "127.0.0.1" sslOptionsFlag = true + timeout = 2 username = "username" - password = "password" - enableServerCertVerification = false ) func TestCassandraPublish(t *testing.T) { @@ -54,9 +62,17 @@ func TestCassandraPublish(t *testing.T) { log.Fatal("SNAP_CASSANDRA_HOST is not set") } + config[connectionTimeoutRuleKey] = ctypes.ConfigValueInt{Value: connectionTimeout} + config[createKeyspaceRuleKey] = ctypes.ConfigValueBool{Value: shouldCreateKeyspace} + config[ignorePeerAddrRuleKey] = ctypes.ConfigValueBool{Value: ignorePeerAddr} + config[initialHostLookupRuleKey] = ctypes.ConfigValueBool{Value: initialHostLookup} + config[keyspaceNameRuleKey] = ctypes.ConfigValueStr{Value: keyspaceName} + config[portRuleKey] = ctypes.ConfigValueInt{Value: port} config[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: hostip} config[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: false} config[tagIndexRuleKey] = ctypes.ConfigValueStr{Value: "experimentId,mode,year"} + config[timeoutRuleKey] = ctypes.ConfigValueInt{Value: timeout} + config[tableNameRuleKey] = ctypes.ConfigValueStr{Value: tableName} Convey("Publish integer metric", func() { tags := map[string]string{core.STD_TAG_PLUGIN_RUNNING_ON: "hostname", "experimentId": "101"} diff --git a/cassandra/cassandra_test.go b/cassandra/cassandra_test.go index bf5bcb9..dd6d61e 100644 --- a/cassandra/cassandra_test.go +++ b/cassandra/cassandra_test.go @@ -31,12 +31,14 @@ import ( ) const ( + enableServerCertVerification = false + keyspaceName = "snap" + password = "password" + path = "/some/path" serverAddress = "127.0.0.1" sslOptionsFlag = true + tableName = "metrics" username = "username" - password = "password" - path = "/some/path" - enableServerCertVerification = false ) func TestCassandraDBPlugin(t *testing.T) { @@ -125,14 +127,16 @@ func TestSslOptions(t *testing.T) { // Prepare test config with ssl options. testConfig := make(map[string]ctypes.ConfigValue) - testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress} - testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag} - testConfig[usernameRuleKey] = ctypes.ConfigValueStr{Value: username} - testConfig[passwordRuleKey] = ctypes.ConfigValueStr{Value: password} testConfig[caPathRuleKey] = ctypes.ConfigValueStr{Value: path} testConfig[certPathRuleKey] = ctypes.ConfigValueStr{Value: path} - testConfig[keyPathRuleKey] = ctypes.ConfigValueStr{Value: path} testConfig[enableServerCertVerRuleKey] = ctypes.ConfigValueBool{Value: enableServerCertVerification} + testConfig[keyPathRuleKey] = ctypes.ConfigValueStr{Value: path} + testConfig[keyspaceName] = ctypes.ConfigValueStr{Value: keyspaceName} + testConfig[passwordRuleKey] = ctypes.ConfigValueStr{Value: password} + testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress} + testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag} + testConfig[tableNameRuleKey] = ctypes.ConfigValueStr{Value: tableName} + testConfig[usernameRuleKey] = ctypes.ConfigValueStr{Value: username} cfg, errs := configPolicy.Get([]string{""}).Process(testConfig) Convey("So config policy should return a config after processing testConfig with valid ssl options", func() { @@ -147,9 +151,10 @@ func TestSslOptions(t *testing.T) { Convey("So received ssl options struct should have proper values for all keys", func() { So(reflect.DeepEqual(expectedSslOptions, receivedSslOptions), ShouldBeTrue) }) + config := prepareClientOptions(testConfig) // Prepare cluster for a given address. - cluster := createCluster(serverAddress) + cluster := createCluster(config) Convey("So while creating cluster it should not be nil", func() { So(cluster, ShouldNotBeNil) }) @@ -168,14 +173,16 @@ func TestSslOptions(t *testing.T) { // Prepare test config with invalid ssl options. testConfig = make(map[string]ctypes.ConfigValue) - testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress} - testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag} - testConfig[usernameRuleKey] = ctypes.ConfigValueInt{Value: 0} - testConfig[passwordRuleKey] = ctypes.ConfigValueInt{Value: 0} testConfig[caPathRuleKey] = ctypes.ConfigValueInt{Value: 0} testConfig[certPathRuleKey] = ctypes.ConfigValueInt{Value: 0} - testConfig[keyPathRuleKey] = ctypes.ConfigValueInt{Value: 0} testConfig[enableServerCertVerRuleKey] = ctypes.ConfigValueStr{Value: ""} + testConfig[keyPathRuleKey] = ctypes.ConfigValueInt{Value: 0} + testConfig[passwordRuleKey] = ctypes.ConfigValueInt{Value: 0} + testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress} + testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag} + testConfig[usernameRuleKey] = ctypes.ConfigValueInt{Value: 0} + testConfig[tableNameRuleKey] = ctypes.ConfigValueStr{Value: tableName} + testConfig[keyspaceName] = ctypes.ConfigValueStr{Value: keyspaceName} cfg, errs = configPolicy.Get([]string{""}).Process(testConfig) Convey("So config policy should not return a config after processing testConfig with invalid ssl options", func() { diff --git a/cassandra/client.go b/cassandra/client.go index 9093574..52f86dc 100644 --- a/cassandra/client.go +++ b/cassandra/client.go @@ -36,25 +36,40 @@ var ( cassaLog = log.WithField("_module", "snap-cassandra-clinet") ErrInvalidDataType = errors.New("Invalid data type value found - %v") - createKeyspaceCQL = "CREATE KEYSPACE IF NOT EXISTS snap WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};" - createTableCQL = "CREATE TABLE IF NOT EXISTS snap.metrics (ns text, ver int, host text, time timestamp, valType text, doubleVal double, strVal text, boolVal boolean, tags map, PRIMARY KEY ((ns, ver, host), time),) WITH CLUSTERING ORDER BY (time DESC);" - createTagTableCQL = "CREATE TABLE IF NOT EXISTS snap.tags (key text, val text, time timestamp, ns text, ver int, host text, valType text, doubleVal double, strVal text, boolVal boolean, tags map, PRIMARY KEY ((key, val), time),) WITH CLUSTERING ORDER BY (time DESC);" + createKeyspaceCQL = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};" + createTableCQL = "CREATE TABLE IF NOT EXISTS %s.%s (ns text, ver int, host text, time timestamp, valType text, doubleVal double, strVal text, boolVal boolean, tags map, PRIMARY KEY ((ns, ver, host), time),) WITH CLUSTERING ORDER BY (time DESC);" + createTagTableCQL = "CREATE TABLE IF NOT EXISTS %s.tags (key text, val text, time timestamp, ns text, ver int, host text, valType text, doubleVal double, strVal text, boolVal boolean, tags map, PRIMARY KEY ((key, val), time),) WITH CLUSTERING ORDER BY (time DESC);" + insertMetricsCQL = `INSERT INTO %s.%s (ns, ver, host, time, valtype, %s, tags) VALUES (?, ?, ?, ? ,?, ?, ?)` + insertTagsCQL = `INSERT INTO %s.tags (key, val, time, ns, ver, host, valtype, %s, tags) VALUES (?, ?, ?, ? ,?, ?, ?, ?, ?)` ) // NewCassaClient creates a new instance of a cassandra client. func NewCassaClient(co clientOptions, tagIndex string) *cassaClient { - return &cassaClient{session: getInstance(co), tagsIndex: tagIndex} + return &cassaClient{session: getInstance(co), keyspace: co.keyspace, tableName: co.tableName, tagsIndex: tagIndex} } // cassaClient contains a long running Cassandra CQL session type cassaClient struct { session *gocql.Session tagsIndex string + keyspace string + tableName string } type clientOptions struct { server string - ssl *sslOptions + port int + + timeout time.Duration + connectionTimeout time.Duration + initialHostLookup bool + ignorePeerAddr bool + + createKeyspace bool + keyspace string + tableName string + + ssl *sslOptions } // sslOptions contains configuration for encrypted communication between the app and the server @@ -84,14 +99,14 @@ func (cc *cassaClient) saveMetrics(mts []plugin.MetricType) error { var err error for _, m := range mts { // insert data into metrics table - err = worker(cc.session, m) + err = worker(cc.session, cc.keyspace, cc.tableName, m) if err != nil { errs = append(errs, err.Error()) } // inserts data into tags table if tagIndex config exists vtags := getValidTagIndex(m.Tags(), cc.tagsIndex) - err = tagWorker(cc.session, m, vtags) + err = tagWorker(cc.session, cc.keyspace, m, vtags) if err != nil { errs = append(errs, err.Error()) } @@ -102,8 +117,44 @@ func (cc *cassaClient) saveMetrics(mts []plugin.MetricType) error { return err } +func executeMetricsQuery(keyspace, tableName, insertColumn string, s *gocql.Session, m plugin.MetricType, value interface{}) error { + queryStr := fmt.Sprintf(insertMetricsCQL, keyspace, tableName, insertColumn) + query := s.Query(queryStr, + m.Namespace().String(), + m.Version(), + m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], + m.Timestamp(), + insertColumn, + value, + m.Tags()) + + if err := query.Exec(); err != nil { + return err + } + return nil +} + +func executeTagsQuery(keyspace, insertColumn, tag string, s *gocql.Session, m plugin.MetricType, value interface{}) error { + queryStr := fmt.Sprintf(insertTagsCQL, keyspace, insertColumn) + query := s.Query(queryStr, + tag, + m.Tags()[tag], + time.Now(), + m.Namespace().String(), + m.Version(), + m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], + insertColumn, + value, + m.Tags()) + + if err := query.Exec(); err != nil { + return err + } + return nil +} + // works insert data into Cassandra DB metrics table only when the data is valid -func worker(s *gocql.Session, m plugin.MetricType) error { +func worker(s *gocql.Session, keyspace, tableName string, m plugin.MetricType) error { value, err := convert(m.Data()) if err != nil { cassaLog.WithFields(log.Fields{ @@ -114,46 +165,25 @@ func worker(s *gocql.Session, m plugin.MetricType) error { switch value.(type) { case float64: - if err = s.Query(`INSERT INTO snap.metrics (ns, ver, host, time, valtype, doubleVal, tags) VALUES (?, ?, ?, ? ,?, ?, ?)`, - m.Namespace().String(), - m.Version(), - m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], - m.Timestamp(), - "doubleval", - value, - m.Tags()).Exec(); err != nil { + err := executeMetricsQuery(keyspace, tableName, "doubleVal", s, m, value) + if err != nil { cassaLog.WithFields(log.Fields{ "err": err, - }).Error("Cassandra client insertion error") - return err + }).Error("Cassandra client insertion error ") } case string: - if err = s.Query(`INSERT INTO snap.metrics (ns, ver, host, time, valtype, strVal, tags) VALUES (?, ?, ?, ? ,?, ?, ?)`, - m.Namespace().String(), - m.Version(), - m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], - m.Timestamp(), - "strval", - value, - m.Tags()).Exec(); err != nil { + err := executeMetricsQuery(keyspace, tableName, "strVal", s, m, value) + if err != nil { cassaLog.WithFields(log.Fields{ "err": err, - }).Error("Cassandra client insertion error") - return err + }).Error("Cassandra client insertion error ") } case bool: - if err = s.Query(`INSERT INTO snap.metrics (ns, ver, host, time, valtype, boolVal, tags) VALUES (?, ?, ?, ? ,?, ?, ?)`, - m.Namespace().String(), - m.Version(), - m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], - m.Timestamp(), - "boolval", - value, - m.Tags()).Exec(); err != nil { + err := executeMetricsQuery(keyspace, tableName, "boolVal", s, m, value) + if err != nil { cassaLog.WithFields(log.Fields{ "err": err, - }).Error("Cassandra client insertion error") - return err + }).Error("Cassandra client insertion error ") } default: return fmt.Errorf(ErrInvalidDataType.Error(), value) @@ -162,7 +192,7 @@ func worker(s *gocql.Session, m plugin.MetricType) error { } // tagWorker insert data into Cassandra DB tags only when the tags array is not empty. -func tagWorker(s *gocql.Session, m plugin.MetricType, tags []string) error { +func tagWorker(s *gocql.Session, keyspace string, m plugin.MetricType, tags []string) error { if len(tags) == 0 { return nil } @@ -178,56 +208,29 @@ func tagWorker(s *gocql.Session, m plugin.MetricType, tags []string) error { switch value.(type) { case float64: for _, v := range tags { - if err = s.Query(`INSERT INTO snap.tags (key, val, time, ns, ver, host, valtype, doubleVal, tags) VALUES (?, ?, ?, ? ,?, ?, ?, ?, ?)`, - v, - m.Tags()[v], - time.Now(), - m.Namespace().String(), - m.Version(), - m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], - "doubleval", - value, - m.Tags()).Exec(); err != nil { + err := executeTagsQuery(keyspace, "doubleVal", v, s, m, value) + if err != nil { cassaLog.WithFields(log.Fields{ "err": err, - }).Error("Cassandra client insertion error") - return err + }).Error("Cassandra client insertion error ") } } case string: for _, v := range tags { - if err = s.Query(`INSERT INTO snap.tags (key, val, time, ns, ver, host, valtype, strVal, tags) VALUES (?, ?, ?, ? ,?, ?, ?, ?, ?)`, - v, - m.Tags()[v], - time.Now(), - m.Namespace().String(), - m.Version(), - m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], - "strval", - value, - m.Tags()).Exec(); err != nil { + err := executeTagsQuery(keyspace, "strVal", v, s, m, value) + if err != nil { cassaLog.WithFields(log.Fields{ "err": err, - }).Error("Cassandra client insertion error") - return err + }).Error("Cassandra client insertion error ") } } case bool: for _, v := range tags { - if err = s.Query(`INSERT INTO snap.tags (key, val, time, ns, ver, host, valtype, boolVal, tags) VALUES (?, ?, ?, ? ,?, ?, ?, ?, ?)`, - v, - m.Tags()[v], - time.Now(), - m.Namespace().String(), - m.Version(), - m.Tags()[core.STD_TAG_PLUGIN_RUNNING_ON], - "boolval", - value, - m.Tags()).Exec(); err != nil { + err := executeTagsQuery(keyspace, "boolVal", v, s, m, value) + if err != nil { cassaLog.WithFields(log.Fields{ "err": err, - }).Error("Cassandra client insertion error") - return err + }).Error("Cassandra client insertion error ") } } default: @@ -277,21 +280,27 @@ func convert(i interface{}) (interface{}, error) { return num, err } -func createCluster(server string) *gocql.ClusterConfig { - cluster := gocql.NewCluster(server) +func createCluster(config clientOptions) *gocql.ClusterConfig { + cluster := gocql.NewCluster(config.server) cluster.Consistency = gocql.One cluster.ProtoVersion = 4 - return cluster -} -func getSession(co clientOptions) *gocql.Session { - cluster := createCluster(co.server) + cluster.Timeout = config.timeout + cluster.ConnectTimeout = config.connectionTimeout + + cluster.DisableInitialHostLookup = !config.initialHostLookup + cluster.IgnorePeerAddr = config.ignorePeerAddr - if co.ssl != nil { - cluster = addSslOptions(cluster, co.ssl) + if config.ssl != nil { + cluster = addSslOptions(cluster, config.ssl) } - session := initializeSession(cluster) + return cluster +} + +func getSession(co clientOptions) *gocql.Session { + cluster := createCluster(co) + session := initializeSession(cluster, co) return session } @@ -322,21 +331,23 @@ func addSslOptions(cluster *gocql.ClusterConfig, options *sslOptions) *gocql.Clu return cluster } -func initializeSession(cluster *gocql.ClusterConfig) *gocql.Session { +func initializeSession(cluster *gocql.ClusterConfig, co clientOptions) *gocql.Session { session, err := cluster.CreateSession() if err != nil { log.Fatal(err.Error()) } - if err := session.Query(createKeyspaceCQL).Exec(); err != nil { - log.Fatal(err.Error()) + if co.createKeyspace { + if err := session.Query(fmt.Sprintf(createKeyspaceCQL, co.keyspace)).Exec(); err != nil { + log.Fatal(err.Error()) + } } - if err := session.Query(createTableCQL).Exec(); err != nil { + if err := session.Query(fmt.Sprintf(createTableCQL, co.keyspace, co.tableName)).Exec(); err != nil { log.Fatal(err.Error()) } - if err := session.Query(createTagTableCQL).Exec(); err != nil { + if err := session.Query(fmt.Sprintf(createTagTableCQL, co.keyspace)).Exec(); err != nil { log.Fatal(err.Error()) } return session diff --git a/glide.lock b/glide.lock index 0883fc6..7915ece 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ -hash: 336023be1e3b912d3f202165b07319d4190e4c59edd016e65355cbc412fe7f22 -updated: 2016-11-16T19:32:00.959436057-08:00 +hash: e3649f79d3a330272250bab0c35dec63d4fde8f9d47063cd00247177d94fc56a +updated: 2017-04-06T12:00:49.756312993+02:00 imports: - name: github.com/gocql/gocql - version: 03ae28ccfc4fa086afea7fcec681bf7ff60f0346 + version: f8b6dcaf4f8a770652c11e7e94b517fcb706fce8 subpackages: - internal/lru - internal/murmur @@ -47,6 +47,12 @@ imports: - convey - convey/gotest - convey/reporting +- name: golang.org/x/net + version: 04557861f124410b768b1ba5bb3a91b705afbfc6 + subpackages: + - context + - http2 + - trace - name: golang.org/x/sys version: c8bc69bc2db9c57ccf979550bc69655df5039a8a subpackages: diff --git a/glide.yaml b/glide.yaml index fa90f67..249a139 100644 --- a/glide.yaml +++ b/glide.yaml @@ -3,7 +3,7 @@ import: - package: github.com/Sirupsen/logrus version: be52937128b38f1d99787bb476c789e2af1147f1 - package: github.com/gocql/gocql - version: 03ae28ccfc4fa086afea7fcec681bf7ff60f0346 + version: f8b6dcaf4f8a770652c11e7e94b517fcb706fce8 subpackages: - internal/lru - internal/murmur