Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature_support_multi-endpoints #1147

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type InstanceInfo struct {
RouterIP string `json:"router-ip"`
FwdMode string `json:"fwd-mode"`
ArpMode string `json:"arp-mode"`
DbURL string `json:"db-url"`
DbURL []string `json:"db-url"`
PluginMode string `json:"plugin-mode"`
HostPvtNW int `json:"host-pvt-nw"`
VxlanUDPPort int `json:"vxlan-port"`
Expand Down
8 changes: 4 additions & 4 deletions netmaster/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type MasterDaemon struct {
ListenURL string // URL where netmaster listens for ext requests
ControlURL string // URL where netmaster listens for ctrl pkts
ClusterStoreDriver string // state store driver name
ClusterStoreURL string // state store endpoint
ClusterStoreURL []string // state store endpoint
ClusterMode string // cluster scheduler used docker/kubernetes/mesos etc
NetworkMode string // network mode (vlan or vxlan)
NetForwardMode string // forwarding mode (bridge or routing)
Expand Down Expand Up @@ -81,7 +81,7 @@ func (d *MasterDaemon) Init() {
// initialize state driver
d.stateDriver, err = utils.NewStateDriver(d.ClusterStoreDriver, &core.InstanceInfo{DbURL: d.ClusterStoreURL})
if err != nil {
log.Fatalf("Failed to init state-store: driver %q, URLs %q. Error: %s", d.ClusterStoreDriver, d.ClusterStoreURL, err)
log.Fatalf("Failed to init state-store: driver %q, URLs %v. Error: %s", d.ClusterStoreDriver, d.ClusterStoreURL, err)
}

// Initialize resource manager
Expand All @@ -91,9 +91,9 @@ func (d *MasterDaemon) Init() {
}

// Create an objdb client
d.objdbClient, err = objdb.InitClient(d.ClusterStoreDriver, []string{d.ClusterStoreURL})
d.objdbClient, err = objdb.InitClient(d.ClusterStoreDriver, d.ClusterStoreURL)
if err != nil {
log.Fatalf("Error connecting to state store: driver %q, URLs %q. Err: %v", d.ClusterStoreDriver, d.ClusterStoreURL, err)
log.Fatalf("Error connecting to state store: driver %q, URLs %v. Err: %v", d.ClusterStoreDriver, d.ClusterStoreURL, err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions netmaster/docknet/docknet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (

// initStateDriver initialize etcd state driver
func initStateDriver() (core.StateDriver, error) {
instInfo := core.InstanceInfo{DbURL: "etcd://127.0.0.1:2379"}
instInfo := core.InstanceInfo{DbURL: []string{"etcd://127.0.0.1:2379"}}

return utils.NewStateDriver(utils.EtcdNameStr, &instInfo)
return utils.NewStateDriver(utils.EtcdNameStr, &instInfo),nil
}

// getDocknetState gets docknet oper state
Expand Down
4 changes: 2 additions & 2 deletions netmaster/k8snetwork/networkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ const (

// initStateDriver initialize etcd state driver
func initStateDriver() (core.StateDriver, error) {
instInfo := core.InstanceInfo{DbURL: "etcd://127.0.0.1:2379"}
instInfo := core.InstanceInfo{DbURL: []string{"etcd://127.0.0.1:2379"}}

return utils.NewStateDriver(utils.EtcdNameStr, &instInfo)
return utils.NewStateDriver(utils.EtcdNameStr, &instInfo),nil
}

// cleanupState cleans up default tenant and other global state
Expand Down
2 changes: 1 addition & 1 deletion netmaster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func initNetMaster(ctx *cli.Context) (*daemon.MasterDaemon, error) {
ListenURL: externalAddress,
ControlURL: internalAddress,
ClusterStoreDriver: dbConfigs.StoreDriver,
ClusterStoreURL: dbConfigs.StoreURL, //TODO: support more than one url
ClusterStoreURL: dbConfigs.StoreURL,
ClusterMode: netConfigs.Mode,
NetworkMode: netConfigs.NetworkMode,
NetForwardMode: netConfigs.ForwardMode,
Expand Down
4 changes: 2 additions & 2 deletions netmaster/objApi/objapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ var stateStore core.StateDriver

// initStateDriver initialize etcd state driver
func initStateDriver() (core.StateDriver, error) {
instInfo := core.InstanceInfo{DbURL: "etcd://127.0.0.1:2379"}
instInfo := core.InstanceInfo{DbURL: []string{"etcd://127.0.0.1:2379"}}

return utils.NewStateDriver(utils.EtcdNameStr, &instInfo)
return utils.NewStateDriver(utils.EtcdNameStr, &instInfo),nil
}

type restAPIFunc func(r *http.Request) (interface{}, error)
Expand Down
2 changes: 1 addition & 1 deletion netplugin/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewAgent(pluginConfig *plugin.Config) *Agent {
netPlugin := &plugin.NetPlugin{}

// init cluster state
err := cluster.Init(pluginConfig.Drivers.State, []string{opts.DbURL})
err := cluster.Init(pluginConfig.Drivers.State, opts.DbURL)
if err != nil {
log.Fatalf("Error initializing cluster. Err: %v", err)
}
Expand Down
16 changes: 8 additions & 8 deletions netplugin/agent/state_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,11 @@ func processEpgEvent(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, ID str
func processReinit(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, newCfg *mastercfg.GlobConfig) {

// parse store URL
parts := strings.Split(opts.DbURL, "://")
if len(parts) < 2 {
log.Fatalf("Invalid cluster-store-url %s", opts.DbURL)
//parts := strings.Split(opts.DbURL, "://")
if len(opts.DbURL) == 0 {
log.Fatalf("Invalid cluster-store-url %v", opts.DbURL)
}
stateStore := parts[0]
stateStore := strings.Split(opts.DbURL[0], "://")[0]
// initialize the config
pluginConfig := plugin.Config{
Drivers: plugin.Drivers{
Expand Down Expand Up @@ -412,11 +412,11 @@ func processGlobalConfigUpdEvent(netPlugin *plugin.NetPlugin, opts core.Instance
func processARPModeChange(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, arpMode string) {

// parse store URL
parts := strings.Split(opts.DbURL, "://")
if len(parts) < 2 {
log.Fatalf("Invalid cluster-store-url %s", opts.DbURL)
//parts := strings.Split(opts.DbURL, "://")
if len(opts.DbURL) == 0 {
log.Fatalf("Invalid cluster-store-url %v", opts.DbURL)
}
stateStore := parts[0]
stateStore := strings.Split(opts.DbURL[0], "://")[0]
// initialize the config
pluginConfig := plugin.Config{
Drivers: plugin.Drivers{
Expand Down
4 changes: 2 additions & 2 deletions state/consulstatedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (d *ConsulStateDriver) Init(instInfo *core.InstanceInfo) error {
var err error
var endpoint *url.URL

if instInfo == nil || instInfo.DbURL == "" {
if instInfo == nil || len(instInfo.DbURL) == 0 {
return errors.New("no consul config found")
}
endpoint, err = url.Parse(instInfo.DbURL)
endpoint, err = url.Parse(instInfo.DbURL[0])
if err != nil {
return err
}
Expand Down
27 changes: 16 additions & 11 deletions state/etcdstatedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,26 @@ func (d *EtcdStateDriver) Init(instInfo *core.InstanceInfo) error {
var err error
var endpoint *url.URL

if instInfo == nil || instInfo.DbURL == "" {
if instInfo == nil || len(instInfo.DbURL) == 0 {
return errors.New("no etcd config found")
}
endpoint, err = url.Parse(instInfo.DbURL)
if err != nil {
return err
}
if endpoint.Scheme == "etcd" {
endpoint.Scheme = "http"
} else if endpoint.Scheme != "http" && endpoint.Scheme != "https" {
return core.Errorf("invalid etcd URL scheme %q", endpoint.Scheme)

for _,dburl := range instInfo.DbURL {

endpoint, err = url.Parse(dburl)
if err != nil {
return err
}

if endpoint.Scheme == "etcd" {
endpoint.Scheme = "http"
} else if endpoint.Scheme != "http" && endpoint.Scheme != "https" {
return core.Errorf("invalid etcd URL scheme %q", endpoint.Scheme)
}

}
// TODO: support multi-endpoints
etcdConfig := client.Config{
Endpoints: []string{endpoint.String()},
Endpoints: instInfo.DbURL,
}

d.Client, err = client.New(etcdConfig)
Expand Down
22 changes: 19 additions & 3 deletions test/integration/npcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package integration
import (
"os"
"time"

"strings"
log "github.com/Sirupsen/logrus"

"github.com/contiv/netplugin/contivmodel"
Expand All @@ -27,6 +27,9 @@ import (
"github.com/contiv/netplugin/netplugin/agent"
"github.com/contiv/netplugin/netplugin/plugin"
"github.com/contiv/netplugin/utils/netutils"
"github.com/contiv/netplugin/utils"
"net/url"
"fmt"
)

// NPCluster holds a new neplugin/netmaster cluster stats
Expand All @@ -40,6 +43,7 @@ type NPCluster struct {
// NewNPCluster creates a new cluster of netplugin/netmaster
func NewNPCluster(its *integTestSuite) (*NPCluster, error) {
// get local host name
storeURL := []string{}
hostLabel, err := os.Hostname()
if err != nil {
log.Fatalf("Failed to fetch hostname. Error: %s", err)
Expand All @@ -51,13 +55,25 @@ func NewNPCluster(its *integTestSuite) (*NPCluster, error) {
log.Fatalf("Error getting local address. Err: %v", err)
}


for _, endpoint := range utils.FilterEmpty(strings.Split(its.clusterStoreURL, ",")) {
_, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid endpoint: %v", endpoint)
}
// TODO: support multi-endpoints
storeURL = append(storeURL,endpoint)
log.Infof("Using state db endpoints: %v", storeURL)

}

// create master daemon
md := &daemon.MasterDaemon{
ListenURL: "0.0.0.0:9999",
ControlURL: "0.0.0.0:9999",
ClusterMode: "test",
ClusterStoreDriver: its.clusterStoreDriver,
ClusterStoreURL: its.clusterStoreURL,
ClusterStoreURL: storeURL,
NetworkMode: its.encap,
NetForwardMode: its.fwdMode,
NetInfraType: its.fabricMode,
Expand All @@ -73,7 +89,7 @@ func NewNPCluster(its *integTestSuite) (*NPCluster, error) {
CtrlIP: localIP,
VtepIP: localIP,
UplinkIntf: []string{"eth2"},
DbURL: its.clusterStoreURL,
DbURL: storeURL,
PluginMode: "test",
},
}
Expand Down
11 changes: 6 additions & 5 deletions utils/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func BuildDBFlags(binary string) []cli.Flag {
// DBConfigs validated db configs
type DBConfigs struct {
StoreDriver string
StoreURL string
StoreURL []string
}

// BuildLogFlags CLI logging flags for given binary
Expand Down Expand Up @@ -177,8 +177,9 @@ func InitLogging(binary string, ctx *cli.Context) error {
// ValidateDBOptions returns error if db options are not valid
func ValidateDBOptions(binary string, ctx *cli.Context) (*DBConfigs, error) {
var storeDriver string
var storeURL string
var storeURLs string

storeURL := []string{}
etcdURLs := ctx.String("etcd")
consulURLs := ctx.String("consul")

Expand All @@ -201,12 +202,12 @@ func ValidateDBOptions(binary string, ctx *cli.Context) (*DBConfigs, error) {
return nil, fmt.Errorf("invalid %s %v endpoint: %v", binary, storeDriver, endpoint)
}
// TODO: support multi-endpoints
storeURL = endpoint
storeURL = append(storeURL,endpoint)
logrus.Infof("Using %s state db endpoints: %v: %v", binary, storeDriver, storeURL)
break

}

if storeURL == "" {
if len(storeURL) == 0 {
return nil, fmt.Errorf("invalid %s %s endpoints: empty", binary, storeDriver)
}

Expand Down