Skip to content

Commit

Permalink
bench: add the ability to set a load profile
Browse files Browse the repository at this point in the history
Patch for load profile with additional select and update operations and with support for benchmark space pre-filling.

Added:
- Flag "insert" - percentage of insert operations to bench space.
- Flag "select" - percentage of select operations from bench space.
- Flag "update" - percentage of update operations in bench space.
- Flag "fill" - number of records to pre-fill the space.

```
user@cartridge-cli % ./cartridge bench --select=30 --update=30 --insert=40 --fill=100000
Tarantool 2.8.3 (Binary) f4897ffe-98dd-40fc-a6f2-21ca8bb52fe7

Parameters:
        URL: 127.0.0.1:3301
        user: guest
        connections: 10
        simultaneous requests: 10
        duration: 10 seconds
        key size: 10 bytes
        data size: 20 bytes
        insert: 40 percentages
        select: 30 percentages
        update: 30 percentages

Data schema
| key                            | value
| ------------------------------ | ------------------------------
| random(10)                     | random(20)

The pre-filling of the space has started,
because the insert operation is not specified
or there was an explicit instruction for pre-filling.
...
Pre-filling is finished. Number of records: 100000

Benchmark start.
...
Benchmark stop.

Results:
        Success operations: 1332979
        Failed  operations: 0
        Request count: 1334004
        Time (seconds): 10.000733
        Requests per second: 133390
```

Part of tarantool#645
  • Loading branch information
Kirill-Churkin committed Feb 24, 2022
1 parent 0a1d653 commit f2dd922
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 12 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
rpm policy [[1](http://ftp.rpm.org/max-rpm/ch-rpm-file-format.html)]).
For example, dashes in RPM version (like `1.2.3-0`) is no longer supported.

### Added

- Tarantool benchmark tool update (select and update operations)
* Flag "insert" - percentage of insert operations to bench space.
* Flag "select" - percentage of select operations from bench space.
* Flag "update" - percentage of update operations in bench space.
* Flag "fill" - number of records to pre-fill the space.

## [2.11.0] - 2022-01-26

### Changed
Expand Down
103 changes: 91 additions & 12 deletions cli/bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/common"
"github.com/tarantool/cartridge-cli/cli/context"
)

Expand All @@ -22,14 +21,25 @@ func printResults(results Results) {
fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond)
}

// verifyOperationsPercentage checks that the amount of operations percentage is 100.
func verifyOperationsPercentage(ctx *context.BenchCtx) error {
entire_percentage := ctx.InsertCount + ctx.SelectCount + ctx.UpdateCount
if entire_percentage != 100 {
return fmt.Errorf(
"The number of operations as a percentage should be equal to 100, " +
"note that by default the percentage of inserts is 100")
}
return nil
}

// spacePreset prepares space for a benchmark.
func spacePreset(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) error {
dropBenchmarkSpace(tarantoolConnection)
return createBenchmarkSpace(tarantoolConnection)
}

// incrementRequest increases the counter of successful/failed requests depending on the presence of an error.
func incrementRequest(err error, results *Results) {
func (results *Results) incrementRequestsCounters(err error) {
if err == nil {
results.successResultCount++
} else {
Expand All @@ -39,39 +49,62 @@ func incrementRequest(err error, results *Results) {
}

// requestsLoop continuously executes the insert query until the benchmark time runs out.
func requestsLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
func requestsLoop(requestsSequence *RequestsSequence, backgroundCtx bctx.Context) {
for {
select {
case <-backgroundCtx.Done():
return
default:
_, err := tarantoolConnection.Exec(
tarantool.Insert(
benchSpaceName,
[]interface{}{common.RandomString(ctx.KeySize), common.RandomString(ctx.DataSize)}))
incrementRequest(err, results)
request := requestsSequence.getNext()
request.operation(request)
}
}
}

// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads through the same connection.
func connectionLoop(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection, results *Results, backgroundCtx bctx.Context) {
// connectionLoop runs "ctx.SimultaneousRequests" requests execution threads
// through the same connection.
func connectionLoop(
ctx *context.BenchCtx,
requestsSequence RequestsSequence,
backgroundCtx bctx.Context,
) {
var connectionWait sync.WaitGroup
for i := 0; i < ctx.SimultaneousRequests; i++ {
connectionWait.Add(1)
go func() {
defer connectionWait.Done()
requestsLoop(ctx, tarantoolConnection, results, backgroundCtx)
requestsLoop(&requestsSequence, backgroundCtx)
}()
}

connectionWait.Wait()
}

// preFillBenchmarkSpaceIfRequired fills benchmark space
// if insert count = 0 or PreFillingCount flag is explicitly specified.
func preFillBenchmarkSpaceIfRequired(ctx context.BenchCtx, connectionPool []*tarantool.Connection) error {
if ctx.InsertCount == 0 || ctx.PreFillingCount != PreFillingCount {
fmt.Println("\nThe pre-filling of the space has started,\n" +
"because the insert operation is not specified\n" +
"or there was an explicit instruction for pre-filling.")
fmt.Println("...")
filledCount, err := fillBenchmarkSpace(ctx, connectionPool)
if err != nil {
return err
}
fmt.Printf("Pre-filling is finished. Number of records: %d\n\n", filledCount)
}
return nil
}

// Main benchmark function.
func Run(ctx context.BenchCtx) error {
rand.Seed(time.Now().UnixNano())

if err := verifyOperationsPercentage(&ctx); err != nil {
return err
}

// Connect to tarantool and preset space for benchmark.
tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Expand Down Expand Up @@ -103,13 +136,22 @@ func Run(ctx context.BenchCtx) error {
defer connectionPool[i].Close()
}

if err := preFillBenchmarkSpaceIfRequired(ctx, connectionPool); err != nil {
return err
}

fmt.Println("Benchmark start")
fmt.Println("...")

// The "context" will be used to stop all "connectionLoop" when the time is out.
backgroundCtx, cancel := bctx.WithCancel(bctx.Background())
var waitGroup sync.WaitGroup
results := Results{}
getRandomTupleCommand := fmt.Sprintf(
"box.space.%s.index.%s:random",
benchSpaceName,
benchSpacePrimaryIndexName,
)

startTime := time.Now()
timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second)))
Expand All @@ -119,7 +161,44 @@ func Run(ctx context.BenchCtx) error {
waitGroup.Add(1)
go func(connection *tarantool.Connection) {
defer waitGroup.Done()
connectionLoop(ctx, connection, &results, backgroundCtx)
requestsSequence := RequestsSequence{
[]RequestsGenerator{
{
Request{
insertOperation,
ctx,
connection,
&results,
nil,
},
ctx.InsertCount,
},
{
Request{
selectOperation,
ctx,
connection,
&results,
&getRandomTupleCommand,
},
ctx.SelectCount,
},
{
Request{
updateOperation,
ctx,
connection,
&results,
&getRandomTupleCommand,
},
ctx.UpdateCount,
},
},
0,
ctx.InsertCount,
sync.Mutex{},
}
connectionLoop(&ctx, requestsSequence, backgroundCtx)
}(connectionPool[i])
}
// Sends "signal" to all "connectionLoop" and waits for them to complete.
Expand Down
5 changes: 5 additions & 0 deletions cli/bench/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
var (
benchSpaceName = "__benchmark_space__"
benchSpacePrimaryIndexName = "__bench_primary_key__"
PreFillingCount = 1000000
)

// printConfig output formatted config parameters.
Expand All @@ -25,6 +26,10 @@ func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection
fmt.Printf("\tduration: %d seconds\n", ctx.Duration)
fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize)
fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize)
fmt.Printf("\tinsert: %d percentages\n", ctx.InsertCount)
fmt.Printf("\tselect: %d percentages\n", ctx.SelectCount)
fmt.Printf("\tupdate: %d percentages\n\n", ctx.UpdateCount)

fmt.Printf("Data schema\n")
w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "|\tkey\t|\tvalue\n")
Expand Down
73 changes: 73 additions & 0 deletions cli/bench/requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package bench

import (
"math/rand"
"reflect"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/common"
)

// insertOperation execute insert operation.
func insertOperation(request Request) {
_, err := request.tarantoolConnection.Exec(
tarantool.Insert(
benchSpaceName,
[]interface{}{
common.RandomString(request.ctx.KeySize),
common.RandomString(request.ctx.DataSize),
}))
request.results.incrementRequestsCounters(err)
}

// selectOperation execute select operation.
func selectOperation(request Request) {
_, err := request.tarantoolConnection.Exec(tarantool.Call(
*request.getRandomTupleCommand,
[]interface{}{rand.Int()}))
request.results.incrementRequestsCounters(err)
}

// updateOperation execute update operation.
func updateOperation(request Request) {
getRandomTupleResponse, err := request.tarantoolConnection.Exec(
tarantool.Call(*request.getRandomTupleCommand,
[]interface{}{rand.Int()}))
if err == nil {
data := getRandomTupleResponse.Data
if len(data) > 0 {
key := reflect.ValueOf(data[0]).Index(0).Elem().String()
_, err := request.tarantoolConnection.Exec(
tarantool.Update(
benchSpaceName,
benchSpacePrimaryIndexName,
[]interface{}{key},
[]tarantool.Op{tarantool.Op(
tarantool.OpAssign(
2,
common.RandomString(request.ctx.DataSize)))}))
request.results.incrementRequestsCounters(err)
}
}
}

// getNext return next operation in operations sequence.
func (requestsSequence *RequestsSequence) getNext() Request {
// If at the moment the number of remaining requests = 0,
// then find a new generator, which requests count > 0.
// If new generator has requests count = 0, then repeat.
requestsSequence.findNewRequestsGeneratorMutex.Lock()
for requestsSequence.currentCounter == 0 {
// Increase the index, which means logical switching to a new generator.
requestsSequence.currentRequestIndex++
requestsSequence.currentRequestIndex %= len(requestsSequence.requests)
// Get new generator by index.
nextRequestsGenerator := requestsSequence.requests[requestsSequence.currentRequestIndex]
// Get requests count for new operation.
requestsSequence.currentCounter = nextRequestsGenerator.count
}
// Logical taking of a single request.
requestsSequence.currentCounter--
requestsSequence.findNewRequestsGeneratorMutex.Unlock()
return requestsSequence.requests[requestsSequence.currentRequestIndex].request
}
83 changes: 83 additions & 0 deletions cli/bench/space.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package bench

import (
bctx "context"
"fmt"
"reflect"
"sync"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/common"
"github.com/tarantool/cartridge-cli/cli/context"
)

// createBenchmarkSpace creates benchmark space with formatting and primary index.
Expand Down Expand Up @@ -54,3 +58,82 @@ func dropBenchmarkSpace(tarantoolConnection *tarantool.Connection) error {
}
return nil
}

// fillBenchmarkSpace fills benchmark space
// with a PreFillingCount number of records
// using connectionPool for fast filling.
func fillBenchmarkSpace(ctx context.BenchCtx, connectionPool []*tarantool.Connection) (int, error) {
var insertMutex sync.Mutex
var waitGroup sync.WaitGroup
filledCount := 0
errorChan := make(chan error, ctx.Connections)
backgroundCtx, cancel := bctx.WithCancel(bctx.Background())

for i := 0; i < ctx.Connections; i++ {
waitGroup.Add(1)
go func(tarantoolConnection *tarantool.Connection) {
defer waitGroup.Done()
for filledCount < ctx.PreFillingCount && len(errorChan) == 0 {
select {
case <-backgroundCtx.Done():
return
default:
// Lock mutex for checking extra iteration and increment counter.
insertMutex.Lock()
if filledCount == ctx.PreFillingCount {
insertMutex.Unlock()
return
}
filledCount++
insertMutex.Unlock()
_, err := tarantoolConnection.Exec(tarantool.Insert(
benchSpaceName,
[]interface{}{
common.RandomString(ctx.KeySize),
common.RandomString(ctx.DataSize),
},
))
if err != nil {
fmt.Println(err)
errorChan <- err
return
}
}
}
}(connectionPool[i])
}

// Thread for checking error in channel.
go func() {
for {
select {
case <-backgroundCtx.Done():
return
default:
if len(errorChan) > 0 {
// Stop "insert" threads
cancel()
return
}
}
}
}()

waitGroup.Wait()
// Stop all threads
// If "error" thread stopped others "insert" threads, "error" thread stops itself
// If "insert" threads successfully completed, then need to stop "error" thread
cancel()

// Check if we have an error
if len(errorChan) > 0 {
err := <-errorChan
close(errorChan)
return filledCount, fmt.Errorf(
"Error during space pre-filling: %s.",
err.Error())
}
close(errorChan)

return filledCount, nil
}
Loading

0 comments on commit f2dd922

Please sign in to comment.