-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathsimple-benchmark
executable file
·142 lines (108 loc) · 4.48 KB
/
simple-benchmark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/python
import os
import sys
import time
import threading
import argparse
from memsql.common import database
parser = argparse.ArgumentParser()
parser.add_argument("--host", default=None, help="The hostname of the MemSQL node to connect to")
parser.add_argument("--port", default=None, type=int, help="The port of the MemSQL node to connect to")
parser.add_argument("--user", default="root", help="The user of the MemSQL node to connect to")
parser.add_argument("--password", default="", help="The password of the MemSQL node to connect to")
parser.add_argument("--database", default="simple_benchmark", help="The database to use - note: this database should not exist")
parser.add_argument("--num-workers", type=int, default=10, help="The number of insert threads")
parser.add_argument("--time", type=int, default=30, help="The number of seconds to run the benchmark for")
options = parser.parse_args()
HOST = None
PORT = None
LINK_MEMSQL_ADDR = os.getenv("MEMSQL_PORT_3306_TCP_ADDR", "127.0.0.1")
LINK_MEMSQL_PORT = int(os.getenv("MEMSQL_PORT_3306_TCP_PORT", 3306))
TABLE = "tbl"
BATCH_SIZE = 5000
# Pre-generate the workload query
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (TABLE, ",".join(["(1)"] * BATCH_SIZE))
def get_connection(host=None, port=None, db=options.database):
""" Returns a new connection to the database. """
if host is None:
host = HOST
if port is None:
port = PORT
return database.connect(
host=host,
port=port,
user=options.user,
password=options.password,
database=db)
class InsertWorker(threading.Thread):
""" A simple thread which inserts empty rows in a loop. """
def __init__(self, stopping):
super(InsertWorker, self).__init__()
self.stopping = stopping
self.daemon = True
self.exception = None
def run(self):
with get_connection() as conn:
while not self.stopping.is_set():
conn.execute(QUERY_TEXT)
def test_connection():
try:
with get_connection(db="information_schema") as conn:
conn.ping()
except database.MySQLError:
print("Unable to connect to MemSQL with provided connection details.")
print("Please verify that MemSQL is running @ %s:%s" % (HOST, PORT))
sys.exit(1)
def setup_test_db():
""" Create a database and table for this benchmark to use. """
with get_connection(db="information_schema") as conn:
print('Creating database %s' % options.database)
conn.query('CREATE DATABASE %s' % options.database)
conn.query('USE %s' % options.database)
conn.query('CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE)
def warmup():
print('Warming up workload')
with get_connection() as conn:
conn.execute(QUERY_TEXT)
def run_benchmark():
""" Run a set of InsertWorkers and record their performance. """
stopping = threading.Event()
workers = [ InsertWorker(stopping) for _ in range(options.num_workers) ]
print('Launching %d workers' % options.num_workers)
print('Workload will take approximately %d seconds.' % options.time)
[ worker.start() for worker in workers ]
time.sleep(options.time)
print('Stopping workload')
stopping.set()
[ worker.join() for worker in workers ]
with get_connection() as conn:
count = conn.get("SELECT COUNT(*) AS count FROM %s" % TABLE).count
print("%d rows inserted using %d threads" % (count, options.num_workers))
print("%.1f rows per second" % (count / float(options.time)))
def cleanup():
""" Cleanup the database this benchmark is using. """
try:
with get_connection() as conn:
conn.query('DROP DATABASE IF EXISTS %s' % options.database)
except database.MySQLError:
pass
if __name__ == '__main__':
# try using docker link host/port if the user doesn't specify otherwise
if options.host is None and options.port is None:
try:
with get_connection(host=LINK_MEMSQL_ADDR, port=LINK_MEMSQL_PORT, db="information_schema") as conn:
conn.ping()
options.host = LINK_MEMSQL_ADDR
options.port = LINK_MEMSQL_PORT
except database.MySQLError:
pass
HOST = options.host or "127.0.0.1"
PORT = options.port or 3306
cleanup()
try:
test_connection()
setup_test_db()
warmup()
run_benchmark()
except KeyboardInterrupt:
print("Interrupted... exiting...")