From f2dd92295d1e6276ae1bc967c95e40e0444580e7 Mon Sep 17 00:00:00 2001 From: Kirill-Churkin Date: Thu, 24 Feb 2022 15:41:12 +0300 Subject: [PATCH] bench: add the ability to set a load profile Patch for load profile with additional select and update operations and with support for benchmark space pre-filling. Added: - Flag "insert" - percentage of insert operations to bench space. - Flag "select" - percentage of select operations from bench space. - Flag "update" - percentage of update operations in bench space. - Flag "fill" - number of records to pre-fill the space. ``` user@cartridge-cli % ./cartridge bench --select=30 --update=30 --insert=40 --fill=100000 Tarantool 2.8.3 (Binary) f4897ffe-98dd-40fc-a6f2-21ca8bb52fe7 Parameters: URL: 127.0.0.1:3301 user: guest connections: 10 simultaneous requests: 10 duration: 10 seconds key size: 10 bytes data size: 20 bytes insert: 40 percentages select: 30 percentages update: 30 percentages Data schema | key | value | ------------------------------ | ------------------------------ | random(10) | random(20) The pre-filling of the space has started, because the insert operation is not specified or there was an explicit instruction for pre-filling. ... Pre-filling is finished. Number of records: 100000 Benchmark start. ... Benchmark stop. Results: Success operations: 1332979 Failed operations: 0 Request count: 1334004 Time (seconds): 10.000733 Requests per second: 133390 ``` Part of #645 --- CHANGELOG.md | 8 +++ cli/bench/bench.go | 103 +++++++++++++++++++++++---- cli/bench/config.go | 5 ++ cli/bench/requests.go | 73 +++++++++++++++++++ cli/bench/space.go | 83 +++++++++++++++++++++ cli/bench/types.go | 34 +++++++++ cli/commands/bench.go | 6 ++ cli/context/context.go | 4 ++ test/integration/bench/test_bench.py | 8 +++ 9 files changed, 312 insertions(+), 12 deletions(-) create mode 100644 cli/bench/requests.go diff --git a/CHANGELOG.md b/CHANGELOG.md index df6418f26..434e0eb12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. rpm policy [[1](http://ftp.rpm.org/max-rpm/ch-rpm-file-format.html)]). For example, dashes in RPM version (like `1.2.3-0`) is no longer supported. +### Added + +- Tarantool benchmark tool update (select and update operations) + * Flag "insert" - percentage of insert operations to bench space. + * Flag "select" - percentage of select operations from bench space. + * Flag "update" - percentage of update operations in bench space. + * Flag "fill" - number of records to pre-fill the space. + ## [2.11.0] - 2022-01-26 ### Changed diff --git a/cli/bench/bench.go b/cli/bench/bench.go index 5b1e32851..3fdec6fca 100644 --- a/cli/bench/bench.go +++ b/cli/bench/bench.go @@ -8,7 +8,6 @@ import ( "time" "github.com/FZambia/tarantool" - "github.com/tarantool/cartridge-cli/cli/common" "github.com/tarantool/cartridge-cli/cli/context" ) @@ -22,6 +21,17 @@ func printResults(results Results) { fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond) } +// verifyOperationsPercentage checks that the amount of operations percentage is 100. +func verifyOperationsPercentage(ctx *context.BenchCtx) error { + entire_percentage := ctx.InsertCount + ctx.SelectCount + ctx.UpdateCount + if entire_percentage != 100 { + return fmt.Errorf( + "The number of operations as a percentage should be equal to 100, " + + "note that by default the percentage of inserts is 100") + } + return nil +} + // spacePreset prepares space for a benchmark. func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error { dropBenchmarkSpace(tarantoolConnection) @@ -29,7 +39,7 @@ func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection } // incrementRequest increases the counter of successful/failed requests depending on the presence of an error. -func incrementRequest(err error, results *Results) { +func (results *Results) incrementRequestsCounters(err error) { if err == nil { results.successResultCount++ } else { @@ -39,39 +49,62 @@ func incrementRequest(err error, results *Results) { } // requestsLoop continuously executes the insert query until the benchmark time runs out. -func requestsLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) { +func requestsLoop(requestsSequence *RequestsSequence, backgroundCtx bctx.Context) { for { select { case <-backgroundCtx.Done(): return default: - _, err := tarantoolConnection.Exec( - tarantool.Insert( - benchSpaceName, - []interface{}{common.RandomString(ctx.KeySize), common.RandomString(ctx.DataSize)})) - incrementRequest(err, results) + request := requestsSequence.getNext() + request.operation(request) } } } -// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads through the same connection. -func connectionLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) { +// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads +// through the same connection. +func connectionLoop( + ctx *context.BenchCtx, + requestsSequence RequestsSequence, + backgroundCtx bctx.Context, +) { var connectionWait sync.WaitGroup for i := 0; i < ctx.SimultaneousRequests; i++ { connectionWait.Add(1) go func() { defer connectionWait.Done() - requestsLoop(ctx, tarantoolConnection, results, backgroundCtx) + requestsLoop(&requestsSequence, backgroundCtx) }() } connectionWait.Wait() } +// preFillBenchmarkSpaceIfRequired fills benchmark space +// if insert count = 0 or PreFillingCount flag is explicitly specified. +func preFillBenchmarkSpaceIfRequired(ctx context.BenchCtx, connectionPool []*tarantool.Connection) error { + if ctx.InsertCount == 0 || ctx.PreFillingCount != PreFillingCount { + fmt.Println("\nThe pre-filling of the space has started,\n" + + "because the insert operation is not specified\n" + + "or there was an explicit instruction for pre-filling.") + fmt.Println("...") + filledCount, err := fillBenchmarkSpace(ctx, connectionPool) + if err != nil { + return err + } + fmt.Printf("Pre-filling is finished. Number of records: %d\n\n", filledCount) + } + return nil +} + // Main benchmark function. func Run(ctx context.BenchCtx) error { rand.Seed(time.Now().UnixNano()) + if err := verifyOperationsPercentage(&ctx); err != nil { + return err + } + // Connect to tarantool and preset space for benchmark. tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{ User: ctx.User, @@ -103,6 +136,10 @@ func Run(ctx context.BenchCtx) error { defer connectionPool[i].Close() } + if err := preFillBenchmarkSpaceIfRequired(ctx, connectionPool); err != nil { + return err + } + fmt.Println("Benchmark start") fmt.Println("...") @@ -110,6 +147,11 @@ func Run(ctx context.BenchCtx) error { backgroundCtx, cancel := bctx.WithCancel(bctx.Background()) var waitGroup sync.WaitGroup results := Results{} + getRandomTupleCommand := fmt.Sprintf( + "box.space.%s.index.%s:random", + benchSpaceName, + benchSpacePrimaryIndexName, + ) startTime := time.Now() timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second))) @@ -119,7 +161,44 @@ func Run(ctx context.BenchCtx) error { waitGroup.Add(1) go func(connection *tarantool.Connection) { defer waitGroup.Done() - connectionLoop(ctx, connection, &results, backgroundCtx) + requestsSequence := RequestsSequence{ + []RequestsGenerator{ + { + Request{ + insertOperation, + ctx, + connection, + &results, + nil, + }, + ctx.InsertCount, + }, + { + Request{ + selectOperation, + ctx, + connection, + &results, + &getRandomTupleCommand, + }, + ctx.SelectCount, + }, + { + Request{ + updateOperation, + ctx, + connection, + &results, + &getRandomTupleCommand, + }, + ctx.UpdateCount, + }, + }, + 0, + ctx.InsertCount, + sync.Mutex{}, + } + connectionLoop(&ctx, requestsSequence, backgroundCtx) }(connectionPool[i]) } // Sends "signal" to all "connectionLoop" and waits for them to complete. diff --git a/cli/bench/config.go b/cli/bench/config.go index dae0c8005..f82f3b7a8 100644 --- a/cli/bench/config.go +++ b/cli/bench/config.go @@ -12,6 +12,7 @@ import ( var ( benchSpaceName = "__benchmark_space__" benchSpacePrimaryIndexName = "__bench_primary_key__" + PreFillingCount = 1000000 ) // printConfig output formatted config parameters. @@ -25,6 +26,10 @@ func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection fmt.Printf("\tduration: %d seconds\n", ctx.Duration) fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize) fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize) + fmt.Printf("\tinsert: %d percentages\n", ctx.InsertCount) + fmt.Printf("\tselect: %d percentages\n", ctx.SelectCount) + fmt.Printf("\tupdate: %d percentages\n\n", ctx.UpdateCount) + fmt.Printf("Data schema\n") w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) fmt.Fprintf(w, "|\tkey\t|\tvalue\n") diff --git a/cli/bench/requests.go b/cli/bench/requests.go new file mode 100644 index 000000000..ebd20cca2 --- /dev/null +++ b/cli/bench/requests.go @@ -0,0 +1,73 @@ +package bench + +import ( + "math/rand" + "reflect" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/common" +) + +// insertOperation execute insert operation. +func insertOperation(request Request) { + _, err := request.tarantoolConnection.Exec( + tarantool.Insert( + benchSpaceName, + []interface{}{ + common.RandomString(request.ctx.KeySize), + common.RandomString(request.ctx.DataSize), + })) + request.results.incrementRequestsCounters(err) +} + +// selectOperation execute select operation. +func selectOperation(request Request) { + _, err := request.tarantoolConnection.Exec(tarantool.Call( + *request.getRandomTupleCommand, + []interface{}{rand.Int()})) + request.results.incrementRequestsCounters(err) +} + +// updateOperation execute update operation. +func updateOperation(request Request) { + getRandomTupleResponse, err := request.tarantoolConnection.Exec( + tarantool.Call(*request.getRandomTupleCommand, + []interface{}{rand.Int()})) + if err == nil { + data := getRandomTupleResponse.Data + if len(data) > 0 { + key := reflect.ValueOf(data[0]).Index(0).Elem().String() + _, err := request.tarantoolConnection.Exec( + tarantool.Update( + benchSpaceName, + benchSpacePrimaryIndexName, + []interface{}{key}, + []tarantool.Op{tarantool.Op( + tarantool.OpAssign( + 2, + common.RandomString(request.ctx.DataSize)))})) + request.results.incrementRequestsCounters(err) + } + } +} + +// getNext return next operation in operations sequence. +func (requestsSequence *RequestsSequence) getNext() Request { + // If at the moment the number of remaining requests = 0, + // then find a new generator, which requests count > 0. + // If new generator has requests count = 0, then repeat. + requestsSequence.findNewRequestsGeneratorMutex.Lock() + for requestsSequence.currentCounter == 0 { + // Increase the index, which means logical switching to a new generator. + requestsSequence.currentRequestIndex++ + requestsSequence.currentRequestIndex %= len(requestsSequence.requests) + // Get new generator by index. + nextRequestsGenerator := requestsSequence.requests[requestsSequence.currentRequestIndex] + // Get requests count for new operation. + requestsSequence.currentCounter = nextRequestsGenerator.count + } + // Logical taking of a single request. + requestsSequence.currentCounter-- + requestsSequence.findNewRequestsGeneratorMutex.Unlock() + return requestsSequence.requests[requestsSequence.currentRequestIndex].request +} diff --git a/cli/bench/space.go b/cli/bench/space.go index 27211b244..e3074a5da 100644 --- a/cli/bench/space.go +++ b/cli/bench/space.go @@ -1,10 +1,14 @@ package bench import ( + bctx "context" "fmt" "reflect" + "sync" "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/common" + "github.com/tarantool/cartridge-cli/cli/context" ) // createBenchmarkSpace creates benchmark space with formatting and primary index. @@ -54,3 +58,82 @@ func dropBenchmarkSpace(tarantoolConnection *tarantool.Connection) error { } return nil } + +// fillBenchmarkSpace fills benchmark space +// with a PreFillingCount number of records +// using connectionPool for fast filling. +func fillBenchmarkSpace(ctx context.BenchCtx, connectionPool []*tarantool.Connection) (int, error) { + var insertMutex sync.Mutex + var waitGroup sync.WaitGroup + filledCount := 0 + errorChan := make(chan error, ctx.Connections) + backgroundCtx, cancel := bctx.WithCancel(bctx.Background()) + + for i := 0; i < ctx.Connections; i++ { + waitGroup.Add(1) + go func(tarantoolConnection *tarantool.Connection) { + defer waitGroup.Done() + for filledCount < ctx.PreFillingCount && len(errorChan) == 0 { + select { + case <-backgroundCtx.Done(): + return + default: + // Lock mutex for checking extra iteration and increment counter. + insertMutex.Lock() + if filledCount == ctx.PreFillingCount { + insertMutex.Unlock() + return + } + filledCount++ + insertMutex.Unlock() + _, err := tarantoolConnection.Exec(tarantool.Insert( + benchSpaceName, + []interface{}{ + common.RandomString(ctx.KeySize), + common.RandomString(ctx.DataSize), + }, + )) + if err != nil { + fmt.Println(err) + errorChan <- err + return + } + } + } + }(connectionPool[i]) + } + + // Thread for checking error in channel. + go func() { + for { + select { + case <-backgroundCtx.Done(): + return + default: + if len(errorChan) > 0 { + // Stop "insert" threads + cancel() + return + } + } + } + }() + + waitGroup.Wait() + // Stop all threads + // If "error" thread stopped others "insert" threads, "error" thread stops itself + // If "insert" threads successfully completed, then need to stop "error" thread + cancel() + + // Check if we have an error + if len(errorChan) > 0 { + err := <-errorChan + close(errorChan) + return filledCount, fmt.Errorf( + "Error during space pre-filling: %s.", + err.Error()) + } + close(errorChan) + + return filledCount, nil +} diff --git a/cli/bench/types.go b/cli/bench/types.go index 8398b2ab5..08f3abc16 100644 --- a/cli/bench/types.go +++ b/cli/bench/types.go @@ -1,5 +1,12 @@ package bench +import ( + "sync" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/context" +) + // Results describes set of benchmark results. type Results struct { handledRequestsCount int // Count of all executed requests. @@ -8,3 +15,30 @@ type Results struct { duration float64 // Benchmark duration. requestsPerSecond int // Cumber of requests per second - the main measured value. } + +// Request describes various types of requests. +type Request struct { + operation func(Request) // insertOperation, selectOperation or updateOperation. + ctx context.BenchCtx + tarantoolConnection *tarantool.Connection + results *Results + getRandomTupleCommand *string // for selectOperation or updateOperation. +} + +// RequestsGenerator data structure for abstraction of a renewable heap of identical requests. +type RequestsGenerator struct { + request Request // Request with specified operation. + count int // Count of requests. +} + +// RequestsSequence data structure for abstraction for the constant issuance of new requests. +type RequestsSequence struct { + requests []RequestsGenerator + // currentRequestIndex describes what type of request will be issued by the sequence. + currentRequestIndex int + // currentCounter describes how many requests of the same type + // are left to issue from RequestsPool. + currentCounter int + // findNewRequestsGeneratorMutex provides thread-safe search for new generator. + findNewRequestsGeneratorMutex sync.Mutex +} diff --git a/cli/commands/bench.go b/cli/commands/bench.go index 8e408a6e8..93ba4bb40 100644 --- a/cli/commands/bench.go +++ b/cli/commands/bench.go @@ -30,4 +30,10 @@ func init() { benchCmd.Flags().IntVar(&ctx.Bench.Duration, "duration", 10, "Duration of benchmark test (seconds)") benchCmd.Flags().IntVar(&ctx.Bench.KeySize, "keysize", 10, "Size of key part of benchmark data (bytes)") benchCmd.Flags().IntVar(&ctx.Bench.DataSize, "datasize", 20, "Size of value part of benchmark data (bytes)") + + benchCmd.Flags().IntVar(&ctx.Bench.InsertCount, "insert", 100, "percentage of inserts") + benchCmd.Flags().IntVar(&ctx.Bench.SelectCount, "select", 0, "percentage of selects") + benchCmd.Flags().IntVar(&ctx.Bench.UpdateCount, "update", 0, "percentage of updates") + benchCmd.Flags().IntVar(&ctx.Bench.PreFillingCount, "fill", bench.PreFillingCount, "number of records to pre-fill the space") + } diff --git a/cli/context/context.go b/cli/context/context.go index 02433b54a..b98daf171 100644 --- a/cli/context/context.go +++ b/cli/context/context.go @@ -185,4 +185,8 @@ type BenchCtx struct { Duration int // Duration describes test duration in seconds. KeySize int // DataSize describes the size of key part of benchmark data (bytes). DataSize int // DataSize describes the size of value part of benchmark data (bytes). + InsertCount int // InsertCount describes the number of insert operations as a percentage. + SelectCount int // SelectCount describes the number of select operations as a percentage. + UpdateCount int // UpdateCount describes the number of update operations as a percentage. + PreFillingCount int // PreFillingCount describes the number of records to pre-fill the space. } diff --git a/test/integration/bench/test_bench.py b/test/integration/bench/test_bench.py index c0a8cb247..f2ef36bea 100644 --- a/test/integration/bench/test_bench.py +++ b/test/integration/bench/test_bench.py @@ -36,3 +36,11 @@ def kill(): rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir) assert rc == 0 + + base_cmd = [cartridge_cmd, 'bench', '--duration=1', '--fill=1000'] + rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir) + assert rc == 0 + + base_cmd = [cartridge_cmd, 'bench', '--duration=1', '--insert=0', '--select=50', '--update=50'] + rc, output = run_command_and_get_output(base_cmd, cwd=tmpdir) + assert rc == 0