Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Increase number of connectivity options
Browse files Browse the repository at this point in the history
  • Loading branch information
squall0gd authored and Michał Stachowski committed May 25, 2017
1 parent 894f89f commit 8c29070
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 155 deletions.
166 changes: 120 additions & 46 deletions cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/intelsdi-x/snap/control/plugin"
Expand All @@ -34,18 +35,26 @@ import (

const (
name = "cassandra"
version = 5
version = 6
pluginType = plugin.PublisherPluginType

serverAddrRuleKey = "server"
sslOptionsRuleKey = "ssl"
usernameRuleKey = "username"
passwordRuleKey = "password"
keyPathRuleKey = "keyPath"
certPathRuleKey = "certPath"
caPathRuleKey = "caPath"
certPathRuleKey = "certPath"
connectionTimeoutRuleKey = "connectionTimeout"
createKeyspaceRuleKey = "createKeyspace"
enableServerCertVerRuleKey = "serverCertVerification"
ignorePeerAddrRuleKey = "ignorePeerAddr"
initialHostLookupRuleKey = "initialHostLookup"
keyPathRuleKey = "keyPath"
keyspaceNameRuleKey = "keyspaceName"
passwordRuleKey = "password"
portRuleKey = "port"
serverAddrRuleKey = "server"
sslOptionsRuleKey = "ssl"
tableNameRuleKey = "tableName"
tagIndexRuleKey = "tagIndex"
timeoutRuleKey = "timeout"
usernameRuleKey = "username"
)

// Meta returns a plugin meta data
Expand All @@ -70,51 +79,91 @@ func (cas *CassandraPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error)
cp := cpolicy.New()
config := cpolicy.NewPolicyNode()

serverAddrRule, err := cpolicy.NewStringRule(serverAddrRuleKey, true)
caPathRule, err := cpolicy.NewStringRule(caPathRuleKey, false, "")
handleErr(err)
serverAddrRule.Description = "Cassandra server"
config.Add(serverAddrRule)
caPathRule.Description = "Path to the CA certificate for the Cassandra server"
config.Add(caPathRule)

useSslOptionsRule, err := cpolicy.NewBoolRule(sslOptionsRuleKey, false, false)
certPathRule, err := cpolicy.NewStringRule(certPathRuleKey, false, "")
handleErr(err)
useSslOptionsRule.Description = "Not required, if true, use ssl options to connect to the Cassandra, default: false"
config.Add(useSslOptionsRule)
certPathRule.Description = "Path to the self signed certificate for the Cassandra client"
config.Add(certPathRule)

usernameRule, err := cpolicy.NewStringRule(usernameRuleKey, false, "")
connectionTimeoutRule, err := cpolicy.NewIntegerRule(connectionTimeoutRuleKey, false, 2)
handleErr(err)
usernameRule.Description = "Name of a user used to authenticate to Cassandra"
config.Add(usernameRule)
connectionTimeoutRule.Description = "Initial connection timeout in seconds, default: 2"
config.Add(connectionTimeoutRule)

passwordRule, err := cpolicy.NewStringRule(passwordRuleKey, false, "")
createKeyspaceRule, err := cpolicy.NewBoolRule(createKeyspaceRuleKey, false, true)
handleErr(err)
passwordRule.Description = "Password used to authenticate to the Cassandra"
config.Add(passwordRule)
createKeyspaceRule.Description = "Create keyspace if it's not exist, default: true"
config.Add(createKeyspaceRule)

enableServerCertVerRule, err := cpolicy.NewBoolRule(enableServerCertVerRuleKey, false, true)
handleErr(err)
enableServerCertVerRule.Description = "If true, verify a hostname and a server key, default: true"
config.Add(enableServerCertVerRule)

ignorePeerAddrRule, err := cpolicy.NewBoolRule(ignorePeerAddrRuleKey, false, false)
handleErr(err)
ignorePeerAddrRule.Description = "Turn off cluster hosts tracking, default: false"
config.Add(ignorePeerAddrRule)

initialHostLookupRule, err := cpolicy.NewBoolRule(initialHostLookupRuleKey, false, true)
handleErr(err)
initialHostLookupRule.Description = "Lookup for cluster hosts information, default: true"
config.Add(initialHostLookupRule)

keyPathRule, err := cpolicy.NewStringRule(keyPathRuleKey, false, "")
handleErr(err)
keyPathRule.Description = "Path to the private key for the Cassandra client"
config.Add(keyPathRule)

certPathRule, err := cpolicy.NewStringRule(certPathRuleKey, false, "")
keyspaceNameRule, err := cpolicy.NewStringRule(keyspaceNameRuleKey, false, "snap")
handleErr(err)
certPathRule.Description = "Path to the self signed certificate for the Cassandra client"
config.Add(certPathRule)
keyspaceNameRule.Description = "Keyspace name, default: snap"
config.Add(keyspaceNameRule)

caPathRule, err := cpolicy.NewStringRule(caPathRuleKey, false, "")
passwordRule, err := cpolicy.NewStringRule(passwordRuleKey, false, "")
handleErr(err)
caPathRule.Description = "Path to the CA certificate for the Cassandra server"
config.Add(caPathRule)
passwordRule.Description = "Password used to authenticate to the Cassandra"
config.Add(passwordRule)

enableServerCertVerRule, err := cpolicy.NewBoolRule(enableServerCertVerRuleKey, false, true)
portRule, err := cpolicy.NewIntegerRule(portRuleKey, false, 9042)
handleErr(err)
enableServerCertVerRule.Description = "If true, verify a hostname and a server key, default: true"
config.Add(enableServerCertVerRule)
portRule.Description = "Cassandra server port, default: 9042"
config.Add(portRule)

serverAddrRule, err := cpolicy.NewStringRule(serverAddrRuleKey, true)
handleErr(err)
serverAddrRule.Description = "Cassandra server"
config.Add(serverAddrRule)

useSslOptionsRule, err := cpolicy.NewBoolRule(sslOptionsRuleKey, false, false)
handleErr(err)
useSslOptionsRule.Description = "Not required, if true, use ssl options to connect to the Cassandra, default: false"
config.Add(useSslOptionsRule)

tableNameRule, err := cpolicy.NewStringRule(tableNameRuleKey, false, "metrics")
handleErr(err)
tableNameRule.Description = "Table name, default: metrics"
config.Add(tableNameRule)

tagIndexRule, err := cpolicy.NewStringRule(tagIndexRuleKey, false, "")
handleErr(err)
tagIndexRule.Description = "Name of tags to be indexed separated by a comma"
config.Add(tagIndexRule)

timeoutRule, err := cpolicy.NewIntegerRule(timeoutRuleKey, false, 2)
handleErr(err)
timeoutRule.Description = "Connection timeout in seconds, default: 2"
config.Add(timeoutRule)

usernameRule, err := cpolicy.NewStringRule(usernameRuleKey, false, "")
handleErr(err)
usernameRule.Description = "Name of a user used to authenticate to Cassandra"
config.Add(usernameRule)

cp.Add([]string{""}, config)
return cp, nil
}
Expand All @@ -140,23 +189,7 @@ func (cas *CassandraPublisher) Publish(contentType string, content []byte, confi

// Only initialize client once if possible
if cas.client == nil {
// Get all values for a new client.
useSslOptions, ok := getValueForKey(config, sslOptionsRuleKey).(bool)
checkAssertion(ok, sslOptionsRuleKey)

var sslOptions *sslOptions
if useSslOptions {
logger.Debug("using ssl options")
sslOptions = getSslOptions(config)
}

serverAddr, ok := getValueForKey(config, serverAddrRuleKey).(string)
checkAssertion(ok, serverAddrRuleKey)

co := clientOptions{
server: serverAddr,
ssl: sslOptions,
}
co := prepareClientOptions(config)

// Initialize a new client.
tagIndex, ok := getValueForKey(config, tagIndexRuleKey).(string)
Expand All @@ -173,6 +206,47 @@ func (cas *CassandraPublisher) Close() {
}
}

func prepareClientOptions(config map[string]ctypes.ConfigValue) clientOptions {
serverAddr, ok := getValueForKey(config, serverAddrRuleKey).(string)
checkAssertion(ok, serverAddrRuleKey)
serverPort, ok := getValueForKey(config, portRuleKey).(int)
checkAssertion(ok, portRuleKey)
timeout, ok := getValueForKey(config, timeoutRuleKey).(int)
checkAssertion(ok, timeoutRuleKey)
connTimeout, ok := getValueForKey(config, connectionTimeoutRuleKey).(int)
checkAssertion(ok, connectionTimeoutRuleKey)
initialHostLookup, ok := getValueForKey(config, initialHostLookupRuleKey).(bool)
checkAssertion(ok, initialHostLookupRuleKey)
ignorePeerAddr, ok := getValueForKey(config, ignorePeerAddrRuleKey).(bool)
checkAssertion(ok, ignorePeerAddrRuleKey)
keyspaceName, ok := getValueForKey(config, keyspaceNameRuleKey).(string)
checkAssertion(ok, keyspaceNameRuleKey)
createKeyspace, ok := getValueForKey(config, createKeyspaceRuleKey).(bool)
checkAssertion(ok, createKeyspaceRuleKey)
useSslOptions, ok := getValueForKey(config, sslOptionsRuleKey).(bool)
checkAssertion(ok, sslOptionsRuleKey)
tableName, ok := getValueForKey(config, tableNameRuleKey).(string)
checkAssertion(ok, tableNameRuleKey)

var sslOptions *sslOptions
if useSslOptions {
sslOptions = getSslOptions(config)
}

return clientOptions{
server: serverAddr,
port: serverPort,
timeout: time.Duration(timeout) * time.Second,
connectionTimeout: time.Duration(connTimeout) * time.Second,
initialHostLookup: initialHostLookup,
ignorePeerAddr: ignorePeerAddr,
keyspace: keyspaceName,
createKeyspace: createKeyspace,
ssl: sslOptions,
tableName: tableName,
}
}

func getValueForKey(cfg map[string]ctypes.ConfigValue, key string) interface{} {
if cfg == nil {
log.Error("Configuration of a plugin not found")
Expand Down Expand Up @@ -223,7 +297,7 @@ func getSslOptions(cfg map[string]ctypes.ConfigValue) *sslOptions {

func handleErr(e error) {
if e != nil {
log.Fatal(e.Error())
log.Fatalf("%s", e.Error())
}
}

Expand Down
20 changes: 18 additions & 2 deletions cassandra/cassandra_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,19 @@ import (
)

const (
connectionTimeout = 2
shouldCreateKeyspace = true
enableServerCertVerification = false
ignorePeerAddr = false
initialHostLookup = true
keyspaceName = "snap"
tableName = "foo"
password = "password"
port = 9042
serverAddress = "127.0.0.1"
sslOptionsFlag = true
timeout = 2
username = "username"
password = "password"
enableServerCertVerification = false
)

func TestCassandraPublish(t *testing.T) {
Expand All @@ -54,9 +62,17 @@ func TestCassandraPublish(t *testing.T) {
log.Fatal("SNAP_CASSANDRA_HOST is not set")
}

config[connectionTimeoutRuleKey] = ctypes.ConfigValueInt{Value: connectionTimeout}
config[createKeyspaceRuleKey] = ctypes.ConfigValueBool{Value: shouldCreateKeyspace}
config[ignorePeerAddrRuleKey] = ctypes.ConfigValueBool{Value: ignorePeerAddr}
config[initialHostLookupRuleKey] = ctypes.ConfigValueBool{Value: initialHostLookup}
config[keyspaceNameRuleKey] = ctypes.ConfigValueStr{Value: keyspaceName}
config[portRuleKey] = ctypes.ConfigValueInt{Value: port}
config[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: hostip}
config[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: false}
config[tagIndexRuleKey] = ctypes.ConfigValueStr{Value: "experimentId,mode,year"}
config[timeoutRuleKey] = ctypes.ConfigValueInt{Value: timeout}
config[tableNameRuleKey] = ctypes.ConfigValueStr{Value: tableName}

Convey("Publish integer metric", func() {
tags := map[string]string{core.STD_TAG_PLUGIN_RUNNING_ON: "hostname", "experimentId": "101"}
Expand Down
35 changes: 21 additions & 14 deletions cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import (
)

const (
enableServerCertVerification = false
keyspaceName = "snap"
password = "password"
path = "/some/path"
serverAddress = "127.0.0.1"
sslOptionsFlag = true
tableName = "metrics"
username = "username"
password = "password"
path = "/some/path"
enableServerCertVerification = false
)

func TestCassandraDBPlugin(t *testing.T) {
Expand Down Expand Up @@ -125,14 +127,16 @@ func TestSslOptions(t *testing.T) {

// Prepare test config with ssl options.
testConfig := make(map[string]ctypes.ConfigValue)
testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress}
testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag}
testConfig[usernameRuleKey] = ctypes.ConfigValueStr{Value: username}
testConfig[passwordRuleKey] = ctypes.ConfigValueStr{Value: password}
testConfig[caPathRuleKey] = ctypes.ConfigValueStr{Value: path}
testConfig[certPathRuleKey] = ctypes.ConfigValueStr{Value: path}
testConfig[keyPathRuleKey] = ctypes.ConfigValueStr{Value: path}
testConfig[enableServerCertVerRuleKey] = ctypes.ConfigValueBool{Value: enableServerCertVerification}
testConfig[keyPathRuleKey] = ctypes.ConfigValueStr{Value: path}
testConfig[keyspaceName] = ctypes.ConfigValueStr{Value: keyspaceName}
testConfig[passwordRuleKey] = ctypes.ConfigValueStr{Value: password}
testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress}
testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag}
testConfig[tableNameRuleKey] = ctypes.ConfigValueStr{Value: tableName}
testConfig[usernameRuleKey] = ctypes.ConfigValueStr{Value: username}

cfg, errs := configPolicy.Get([]string{""}).Process(testConfig)
Convey("So config policy should return a config after processing testConfig with valid ssl options", func() {
Expand All @@ -147,9 +151,10 @@ func TestSslOptions(t *testing.T) {
Convey("So received ssl options struct should have proper values for all keys", func() {
So(reflect.DeepEqual(expectedSslOptions, receivedSslOptions), ShouldBeTrue)
})
config := prepareClientOptions(testConfig)

// Prepare cluster for a given address.
cluster := createCluster(serverAddress)
cluster := createCluster(config)
Convey("So while creating cluster it should not be nil", func() {
So(cluster, ShouldNotBeNil)
})
Expand All @@ -168,14 +173,16 @@ func TestSslOptions(t *testing.T) {

// Prepare test config with invalid ssl options.
testConfig = make(map[string]ctypes.ConfigValue)
testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress}
testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag}
testConfig[usernameRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[passwordRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[caPathRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[certPathRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[keyPathRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[enableServerCertVerRuleKey] = ctypes.ConfigValueStr{Value: ""}
testConfig[keyPathRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[passwordRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[serverAddrRuleKey] = ctypes.ConfigValueStr{Value: serverAddress}
testConfig[sslOptionsRuleKey] = ctypes.ConfigValueBool{Value: sslOptionsFlag}
testConfig[usernameRuleKey] = ctypes.ConfigValueInt{Value: 0}
testConfig[tableNameRuleKey] = ctypes.ConfigValueStr{Value: tableName}
testConfig[keyspaceName] = ctypes.ConfigValueStr{Value: keyspaceName}

cfg, errs = configPolicy.Get([]string{""}).Process(testConfig)
Convey("So config policy should not return a config after processing testConfig with invalid ssl options", func() {
Expand Down
Loading

0 comments on commit 8c29070

Please sign in to comment.