Skip to content

Commit

Permalink
bench: add benchmark test functionality for Tarantool
Browse files Browse the repository at this point in the history
User setup Tarantool single node or cluster and try to understand
"How many specific traffic Tarantool can handle on this hardware"
The same official things are for redis, postgresql and aerospike.

Cartridge bench module makes some load for Tarantool.

user@cartridge-cli % ./cartridge bench
Tarantool 2.8.2 (Binary) f4897ffe-98dd-40fc-a6f2-21ca8bb52fe7

Parameters:
        URL: 127.0.0.1:3301
        user: guest
        connections: 10
        simultaneous requests: 10
        duration: 10 seconds
        key size: 10 bytes
        data size: 20 bytes
Data schema
|       key             |       value
------------------------------------------
|       random(10)      |       random(20)
Benchmark start
...
Benchmark stop

Results:
        Success operations:  1169481
        Failed  operations:  0
        Request count:  1170485
        Time (seconds):  10.000551801
        Requests per second:  117042

Part of tarantool#645
  • Loading branch information
Kirill-Churkin authored and LeonidVas committed Dec 7, 2021
1 parent 479a592 commit f04035c
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Tarantool benchmark tool (early alpha, API can be changed in the near future).
- Ability to reverse search in ``cartridge enter`` and ``cartridge connect`` commands.
- Added support for functionality from Golang 1.17.

Expand Down
138 changes: 138 additions & 0 deletions cli/bench/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package bench

import (
bctx "context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/common"
"github.com/tarantool/cartridge-cli/cli/context"
)

// printResults outputs benchmark foramatted results.
func printResults(results Results) {
fmt.Printf("\nResults:\n")
fmt.Printf("\tSuccess operations: %d\n", results.successResultCount)
fmt.Printf("\tFailed operations: %d\n", results.failedResultCount)
fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount)
fmt.Printf("\tTime (seconds): %f\n", results.duration)
fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond)
}

// spacePreset prepares space for a benchmark.
func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error {
dropBenchmarkSpace(tarantoolConnection)
return createBenchmarkSpace(tarantoolConnection)
}

// incrementRequest increases the counter of successful/failed requests depending on the presence of an error.
func incrementRequest(err error, results *Results) {
if err == nil {
results.successResultCount++
} else {
results.failedResultCount++
}
results.handledRequestsCount++
}

// requestsLoop continuously executes the insert query until the benchmark time runs out.
func requestsLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
for {
select {
case <-backgroundCtx.Done():
return
default:
_, err := tarantoolConnection.Exec(
tarantool.Insert(
benchSpaceName,
[]interface{}{common.RandomString(ctx.KeySize), common.RandomString(ctx.DataSize)}))
incrementRequest(err, results)
}
}
}

// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads through the same connection.
func connectionLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
var connectionWait sync.WaitGroup
for i := 0; i < ctx.SimultaneousRequests; i++ {
connectionWait.Add(1)
go func() {
defer connectionWait.Done()
requestsLoop(ctx, tarantoolConnection, results, backgroundCtx)
}()
}

connectionWait.Wait()
}

// Main benchmark function.
func Run(ctx context.BenchCtx) error {
rand.Seed(time.Now().UnixNano())

// Connect to tarantool and preset space for benchmark.
tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
if err != nil {
return fmt.Errorf(
"Couldn't connect to Tarantool %s.",
ctx.URL)
}
defer tarantoolConnection.Close()

printConfig(ctx, tarantoolConnection)

if err := spacePreset(ctx, tarantoolConnection); err != nil {
return err
}

/// Сreate a "connectionPool" before starting the benchmark to exclude the connection establishment time from measurements.
connectionPool := make([]*tarantool.Connection, ctx.Connections)
for i := 0; i < ctx.Connections; i++ {
connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
if err != nil {
return err
}
defer connectionPool[i].Close()
}

fmt.Println("Benchmark start")
fmt.Println("...")

// The "context" will be used to stop all "connectionLoop" when the time is out.
backgroundCtx, cancel := bctx.WithCancel(bctx.Background())
var waitGroup sync.WaitGroup
results := Results{}

startTime := time.Now()
timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second)))

// Start detached connections.
for i := 0; i < ctx.Connections; i++ {
waitGroup.Add(1)
go func(connection *tarantool.Connection) {
defer waitGroup.Done()
connectionLoop(ctx, connection, &results, backgroundCtx)
}(connectionPool[i])
}
// Sends "signal" to all "connectionLoop" and waits for them to complete.
<-timer.C
cancel()
waitGroup.Wait()

results.duration = time.Since(startTime).Seconds()
results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration)

dropBenchmarkSpace(tarantoolConnection)
fmt.Println("Benchmark stop")

printResults(results)
return nil
}
34 changes: 34 additions & 0 deletions cli/bench/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package bench

import (
"fmt"
"os"
"text/tabwriter"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/context"
)

var (
benchSpaceName = "__benchmark_space__"
benchSpacePrimaryIndexName = "__bench_primary_key__"
)

// printConfig output formatted config parameters.
func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) {
fmt.Printf("%s\n", tarantoolConnection.Greeting().Version)
fmt.Printf("Parameters:\n")
fmt.Printf("\tURL: %s\n", ctx.URL)
fmt.Printf("\tuser: %s\n", ctx.User)
fmt.Printf("\tconnections: %d\n", ctx.Connections)
fmt.Printf("\tsimultaneous requests: %d\n", ctx.SimultaneousRequests)
fmt.Printf("\tduration: %d seconds\n", ctx.Duration)
fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize)
fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize)
fmt.Printf("Data schema\n")
w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "|\tkey\t|\tvalue\n")
fmt.Fprintf(w, "|\t------------------------------\t|\t------------------------------\n")
fmt.Fprintf(w, "|\trandom(%d)\t|\trandom(%d)\n", ctx.KeySize, ctx.DataSize)
w.Flush()
}
56 changes: 56 additions & 0 deletions cli/bench/space.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package bench

import (
"fmt"
"reflect"

"github.com/FZambia/tarantool"
)

// createBenchmarkSpace creates benchmark space with formatting and primary index.
func createBenchmarkSpace(tarantoolConnection *tarantool.Connection) error {
// Creating space.
createCommand := "return box.schema.space.create(...).name"
_, err := tarantoolConnection.Exec(tarantool.Eval(createCommand, []interface{}{benchSpaceName, map[string]bool{"if_not_exists": true}}))
if err != nil {
return err
}

// Formatting space.
formatCommand := fmt.Sprintf("box.space.%s:format", benchSpaceName)
_, err = tarantoolConnection.Exec(tarantool.Call(formatCommand, [][]map[string]string{
{
{"name": "key", "type": "string"},
{"name": "value", "type": "string"},
},
}))
if err != nil {
return err
}

// Creating primary index.
createIndexCommand := fmt.Sprintf("box.space.%s:create_index", benchSpaceName)
_, err = tarantoolConnection.Exec(tarantool.Call(createIndexCommand, []interface{}{
benchSpacePrimaryIndexName,
map[string]interface{}{
"parts": []string{"key"},
"if_not_exists": true,
},
}))
return err
}

// dropBenchmarkSpace deletes benchmark space.
func dropBenchmarkSpace(tarantoolConnection *tarantool.Connection) error {
checkCommand := fmt.Sprintf("return box.space.%s.index[0].name", benchSpaceName)
indexName, err := tarantoolConnection.Exec(tarantool.Eval(checkCommand, []interface{}{}))
if err != nil {
return err
}
if reflect.ValueOf(indexName.Data).Index(0).Elem().String() == benchSpacePrimaryIndexName {
dropCommand := fmt.Sprintf("box.space.%s:drop", benchSpaceName)
_, err := tarantoolConnection.Exec(tarantool.Call(dropCommand, []interface{}{}))
return err
}
return nil
}
10 changes: 10 additions & 0 deletions cli/bench/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package bench

// Results describes set of benchmark results.
type Results struct {
handledRequestsCount int // Count of all executed requests.
successResultCount int // Count of successful request in all connections.
failedResultCount int // Count of failed request in all connections.
duration float64 // Benchmark duration.
requestsPerSecond int // Cumber of requests per second - the main measured value.
}
33 changes: 33 additions & 0 deletions cli/commands/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package commands

import (
"github.com/apex/log"
"github.com/spf13/cobra"
"github.com/tarantool/cartridge-cli/cli/bench"
)

func init() {
var benchCmd = &cobra.Command{
Use: "bench",
Short: "Util for running benchmarks for Tarantool",
Long: "Benchmark utility that simulates running commands done by N clients at the same time sending M simultaneous queries",
Run: func(cmd *cobra.Command, args []string) {
if err := bench.Run(ctx.Bench); err != nil {
log.Fatalf(err.Error())
}
},
}
rootCmd.AddCommand(benchCmd)

configureFlags(benchCmd)

benchCmd.Flags().StringVar(&ctx.Bench.URL, "url", "127.0.0.1:3301", "Tarantool address")
benchCmd.Flags().StringVar(&ctx.Bench.User, "user", "guest", "Tarantool user for connection")
benchCmd.Flags().StringVar(&ctx.Bench.Password, "password", "", "Tarantool password for connection")

benchCmd.Flags().IntVar(&ctx.Bench.Connections, "connections", 10, "Number of concurrent connections")
benchCmd.Flags().IntVar(&ctx.Bench.SimultaneousRequests, "requests", 10, "Number of simultaneous requests per connection")
benchCmd.Flags().IntVar(&ctx.Bench.Duration, "duration", 10, "Duration of benchmark test (seconds)")
benchCmd.Flags().IntVar(&ctx.Bench.KeySize, "keysize", 10, "Size of key part of benchmark data (bytes)")
benchCmd.Flags().IntVar(&ctx.Bench.DataSize, "datasize", 20, "Size of value part of benchmark data (bytes)")
}
12 changes: 12 additions & 0 deletions cli/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Ctx struct {
Replicasets ReplicasetsCtx
Connect ConnectCtx
Failover FailoverCtx
Bench BenchCtx
}

type ProjectCtx struct {
Expand Down Expand Up @@ -173,3 +174,14 @@ type FailoverCtx struct {
ParamsJSON string
ProviderParamsJSON string
}

type BenchCtx struct {
URL string // URL - the URL of the tarantool used for testing
User string // User - username to connect to the tarantool.
Password string // Password to connect to the tarantool.
Connections int // Connections describes the number of connection to be used in the test.
SimultaneousRequests int // SimultaneousRequests describes the number of parallel requests from one connection.
Duration int // Duration describes test duration in seconds.
KeySize int // DataSize describes the size of key part of benchmark data (bytes).
DataSize int // DataSize describes the size of value part of benchmark data (bytes).
}
38 changes: 38 additions & 0 deletions test/integration/bench/test_bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
import signal
import socket
from subprocess import PIPE, STDOUT, Popen
from threading import Thread

import tenacity
from utils import consume_lines, run_command_and_get_output


@tenacity.retry(stop=tenacity.stop_after_delay(15), wait=tenacity.wait_fixed(1))
def wait_for_connect():
socket.create_connection(('127.0.0.1', 3301))


def test_bench(cartridge_cmd, request, tmpdir):
base_cmd = [cartridge_cmd, 'bench', '--duration=1']
tarantool_cmd = [
"tarantool",
"-e", f"""box.cfg{{listen="127.0.0.1:3301",work_dir=[[{tmpdir}]]}}""",
"-e", """box.schema.user.grant("guest","super",nil,nil,{if_not_exists=true})"""
]

env = os.environ.copy()
process = Popen(tarantool_cmd, stdout=PIPE, stderr=STDOUT, env=env)
thread = Thread(target=consume_lines, args=["3301", process.stdout])
thread.start()

def kill():
process.send_signal(signal.SIGKILL)
if thread is not None:
thread.join(5)
request.addfinalizer(kill)

wait_for_connect()

rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir)
assert rc == 0
8 changes: 8 additions & 0 deletions test/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import glob
import gzip
import json
import logging
import os
import re
import shutil
Expand Down Expand Up @@ -1438,3 +1439,10 @@ def get_tarantool_installer_cmd(package_manager):
return f"curl -L https://tarantool.io/installer.sh | \
VER={short_version} bash -s -- --type {tarantool_type} \
&& {package_manager} install -y tarantool"


def consume_lines(port, pipe):
logger = logging.getLogger(f'localhost:{port}')
with pipe:
for line in iter(pipe.readline, b''):
logger.warning(line.rstrip().decode('utf-8'))

0 comments on commit f04035c

Please sign in to comment.