Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dispensable committed Jan 9, 2024
1 parent 5c4060f commit 2fbbb81
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 118 deletions.
20 changes: 3 additions & 17 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,21 @@ jobs:
strategy:
matrix:
go-version: [1.20.x, 1.21.x]
python-version: [3.x]
platform: [ubuntu-latest, macos-latest]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: ${{ matrix.go-version }}

- name: Install Python
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}

- name: Checkout code
uses: actions/checkout@v1

- name: Get test tools
run: go get -u -v golang.org/x/tools/cmd/goimports

- name: Prepare Test
run: pip install --user -r tests/pip-req.txt

- name: Test
run: |
export PATH=${PATH}:`go env GOPATH`/bin
diff <(goimports -d .) <(printf "")
go mod vendor
go get -u -v github.com/douban/gobeansdb
go mod tidy
go install github.com/douban/gobeansdb@latest
make test
- name: Install
Expand Down
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ all:install

export PYTHONPATH=.

.PHONY: test
test:
./misc/gobeansdb_server.sh start
go version
go test github.com/douban/gobeansproxy/config
go test github.com/douban/gobeansproxy/dstore
./misc/gobeansdb_server.sh stop

template:
rm -r /var/lib/gobeansproxy/templates
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ A proxy for [Gobeansdb](https://github.com/douban/gobeansdb).

## Prepare

Supported Go version: > 1.11.0
Supported Go version: > 1.20.0

## Install

```
$ git clone http://github.com/douban/gobeansproxy.git
$ cd gobeansproxy
$ go mod vendor
$ go mod tidy
$ make
```

Expand Down
31 changes: 30 additions & 1 deletion cassandra/prefix_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type PrefixSwitcher struct {
defaultT PrefixSwitchStatus
lock sync.RWMutex
currentTrieMap map[string]string
cstarEnabled bool
}

func (s PrefixSwitchStatus) IsReadOnBeansdb() bool {
Expand Down Expand Up @@ -147,13 +148,21 @@ func GetPrefixSwitchTrieFromCfg(
}

func NewPrefixSwitcher(config *config.CassandraStoreCfg, cqlStore *CassandraStore) (*PrefixSwitcher, error) {
f := new(PrefixSwitcher)

if !config.Enable {
f.defaultT = PrefixSwitchBrw
f.cstarEnabled = false
return f, nil
}

prefixTrie, nowMap, err := GetPrefixSwitchTrieFromCfg(config, cqlStore)
if err != nil {
return nil, err
}

f := new(PrefixSwitcher)
f.trie = prefixTrie
f.cstarEnabled = true

defaultS, err := strToSwitchStatus(config.SwitchToKeyDefault)
if err != nil {
Expand Down Expand Up @@ -196,19 +205,31 @@ func (s *PrefixSwitcher) matchStatus(key string) PrefixSwitchStatus {
}

func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus {
if !s.cstarEnabled {
return PrefixSwitchBrw
}

s.lock.RLock()
defer s.lock.RUnlock()
return s.matchStatus(key)
}

// check key prefix and return bdb read enable c* read enable
func (s *PrefixSwitcher) ReadEnabledOn(key string) (bool, bool) {
if !s.cstarEnabled {
return true, false
}
status := s.GetStatus(key)
return status.IsReadOnBeansdb(), status.IsReadOnCstar()
}

// check keys prefix list and return bdb read keys and c* read keys
func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys []string) {
if !s.cstarEnabled {
bkeys = keys
return
}

s.lock.RLock()
defer s.lock.RUnlock()

Expand All @@ -230,6 +251,9 @@ func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys

// check key prefix and return bdb write enable c* write enable
func (s *PrefixSwitcher) WriteEnabledOn(key string) (bool, bool) {
if !s.cstarEnabled {
return true, false
}
status := s.GetStatus(key)
return status.IsWriteOnBeansdb(), status.IsWriteOnCstar()
}
Expand All @@ -252,6 +276,11 @@ func (s *PrefixSwitcher) LoadStaticCfg(cfgDir string) (*config.CassandraStoreCfg
}

func (s *PrefixSwitcher) LoadCfg(cfg *config.CassandraStoreCfg, cqlStore *CassandraStore) error {
if !cfg.Enable {
logger.Errorf("You can't use prefix switcher when c* backend disabled")
return fmt.Errorf("can't load prefix swicher cfg when cassandra backend disabled")
}

if !cfg.PrefixRWDispatcherCfg.Enable {
logger.Errorf("You can't disable rw dispatcher online")
return fmt.Errorf("You can't disable rw dispathcer online")
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

const (
Version = "v2.0.2-rc5"
Version = "v2.0.3"
)

var (
Expand Down
15 changes: 14 additions & 1 deletion dstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type Storage struct {
}

func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error {
if !pCfg.CassandraStoreCfg.Enable && !pCfg.DStoreConfig.Enable {
return fmt.Errorf("You must enable at least one store engine")
}

if pCfg.CassandraStoreCfg.Enable {
cstar, err := cassandra.NewCassandraStore(&proxyConf.CassandraStoreCfg)
if err != nil {
Expand All @@ -58,6 +62,12 @@ func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error {
}
s.dualWErrHandler = dualWErrHandler
logger.Infof("dual write log send to: %s", s.dualWErrHandler.EFile)
} else {
switcher, err := cassandra.NewPrefixSwitcher(&proxyConf.CassandraStoreCfg, nil)
if err != nil {
return err
}
s.PSwitcher = switcher
}
return nil
}
Expand Down Expand Up @@ -107,7 +117,10 @@ func NewStorageClient(n int, w int, r int,
c.pswitcher = pStoreSwitcher
c.dualWErrHandler = dualEHandler
c.proxyHostName = fmt.Sprintf("%s:%d", proxyConf.Hostname, proxyConf.Port)
c.cstarClusterName = c.cstar.ClusterName
if c.cstar != nil {
// for user disabled cstar store
c.cstarClusterName = c.cstar.ClusterName
}
return c
}

Expand Down
135 changes: 122 additions & 13 deletions dstore/store_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,129 @@
package dstore

import (
"errors"
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
"os/user"
"path"
"path/filepath"
"testing"
"time"

dbcfg "github.com/douban/gobeansdb/gobeansdb"
mc "github.com/douban/gobeansdb/memcache"
yaml "gopkg.in/yaml.v2"

"github.com/douban/gobeansproxy/config"
"github.com/douban/gobeansproxy/utils"
"github.com/stretchr/testify/assert"
)

var testDataDir = flag.String("testDataDir", "/tmp/gobeansdbproxy/bdb/data/", "this dir will be used by gobeansdb and proxy")


func setupSuite(tb testing.TB) func(tb testing.TB) {
user, err := user.Current()
if err != nil {
tb.Fatalf("get username err: %s", err)
}
gopath := os.Getenv("GOPATH")
gobeansdbBin := filepath.Join(gopath, "bin", "gobeansdb")

if _, err := os.Stat(gobeansdbBin); errors.Is(err, os.ErrNotExist) {
tb.Fatalf("gobeansdb binary not exists, %s", gobeansdbBin)
}

projDir := utils.GetProjectHomeDir()

allGobeansdb := []*exec.Cmd{}
for _, p := range []string{"57980", "57981", "57982", "57983"} {
conn, _ := net.DialTimeout("tcp", net.JoinHostPort("localhost", p), time.Second)
if conn != nil {
conn.Close()
tb.Logf("%s port already listening ignore start ...", p)
continue
}

// we modify config when developer run test without container
gobeansdbCfg := fmt.Sprintf("%s/.doubanpde/scripts/bdb/gobeansproxy/%s/conf/", projDir, p)
cfgParsed := dbcfg.DBConfig{}
yfile, err := ioutil.ReadFile(filepath.Join(gobeansdbCfg, "global.yaml"))
if err != nil {
tb.Fatal(err)
}
err = yaml.Unmarshal(yfile, &cfgParsed)
if err != nil {
tb.Fatalf("load cfg %s err: %s", gobeansdbCfg, err)
}
dataPath := filepath.Join(*testDataDir, p, user.Username, "data")
logPath := filepath.Join(*testDataDir, p, user.Username, "log")
for _, pp := range []string{dataPath, logPath} {
err = os.MkdirAll(pp, os.ModePerm)
if err != nil {
tb.Fatalf("create dir %s err: %s", pp, err)
}
}
cfgParsed.ServerConfig.AccessLog = filepath.Join(logPath, "access.log")
cfgParsed.ServerConfig.ErrorLog = filepath.Join(logPath, "error.log")
cfgParsed.HStoreConfig.DBLocalConfig.Home = dataPath
gobeansdbTestCfg := fmt.Sprintf("%s/.doubanpde/scripts/bdb/gobeansproxy/%s/testconf/", projDir, p)
err = os.MkdirAll(gobeansdbTestCfg, os.ModePerm)
if err != nil {
tb.Fatalf("create dir %s err: %s", gobeansdbTestCfg, err)
}
c, err := yaml.Marshal(cfgParsed)
if err != nil {
tb.Fatalf("marshal cfg err: %s", err)
}

dbGlobalCfg := filepath.Join(gobeansdbTestCfg, "global.yaml")
f, err := os.OpenFile(dbGlobalCfg, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
tb.Fatal(err)
}
defer f.Close()
_, err = f.Write(c)
if err != nil {
tb.Fatal(err)
}
routeCfg := filepath.Join(gobeansdbTestCfg, "route.yaml")
rcfg, err := ioutil.ReadFile(filepath.Join(gobeansdbCfg, "route.yaml"))
if err != nil {
tb.Fatal(err)
}
err = ioutil.WriteFile(routeCfg, rcfg, 0644)
if err != nil {
tb.Fatal(err)
}

cmd := exec.Command(
gobeansdbBin,
"-confdir",
gobeansdbTestCfg,
)
if err := cmd.Start(); err != nil {
tb.Fatalf("failed to start %s gobeansdb: %s", p, err)
}
tb.Logf("start %s with pid: %d", cmd, cmd.Process.Pid)
allGobeansdb = append(allGobeansdb, cmd)
}
// wait some time let the server started
time.Sleep(time.Second * 5)

return func(tb testing.TB) {
for _, execCmd := range allGobeansdb {
if err := execCmd.Process.Kill(); err != nil {
tb.Fatalf("failed to kill process %s: %s", execCmd, err)
}
}
}
}

func testClientSet(t *testing.T, c mc.StorageClient, key string, val []byte) {
assert := assert.New(t)
flag := 2
Expand All @@ -28,7 +140,7 @@ func testClientSet(t *testing.T, c mc.StorageClient, key string, val []byte) {

assert.Equal(val, v.Body)
assert.Equal(flag, v.Flag)
assert.Equal(1, len(getHosts))
assert.Equal(2, len(getHosts))
assert.True(hasIntersection(setHosts, getHosts))
}

Expand Down Expand Up @@ -70,9 +182,8 @@ func testStoreClient(t *testing.T, c mc.StorageClient) {

r, _ := c.Get(key1)
assert.Nil(r)
assert.True(len(c.GetSuccessedTargets()) > 0)
assert.True(len(c.GetSuccessedTargets()) > 2)
c.Clean()
assert.True(len(c.GetSuccessedTargets()) == 0)

// set
key2 := "/test/client/2"
Expand Down Expand Up @@ -108,13 +219,6 @@ func testStoreClient(t *testing.T, c mc.StorageClient) {
val4 := make([]byte, 1024*1000)
testClientSet(t, c, key4, val4)

// incr
key5 := "/test/client/5"
v5, _ := c.Incr(key5, 3)
assert.Equal(3, v5)
v5, _ = c.Incr(key5, 5)
assert.Equal(8, v5)

// delete
key6 := "/test/client/6"
val6 := []byte("value 6")
Expand All @@ -125,14 +229,19 @@ func testStoreClient(t *testing.T, c mc.StorageClient) {
assert.Nil(v6)
}

func TestStore(t *testing.T) {
func TestDStoreOnly(t *testing.T) {
teardown := setupSuite(t)
defer teardown(t)

homeDir := utils.GetProjectHomeDir()
confdir := path.Join(homeDir, "conf")
confdir := path.Join(homeDir, ".doubanpde", "scripts", "bdb", "gobeansproxy", "dstore-only", "conf")
proxyConf := &config.Proxy
proxyConf.Load(confdir)

InitGlobalManualScheduler(config.Route, proxyConf.N)
c := NewStorageClient(proxyConf.N, proxyConf.W, proxyConf.R)
storage := new(Storage)
storage.InitStorageEngine(proxyConf)
c := NewStorageClient(proxyConf.N, proxyConf.W, proxyConf.R, storage.cstar, storage.PSwitcher, storage.dualWErrHandler)

testStoreClient(t, c)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 // indirect
golang.org/x/sys v0.10.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
Loading

0 comments on commit 2fbbb81

Please sign in to comment.