-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbcoordinator.cpp
127 lines (111 loc) · 4.48 KB
/
dbcoordinator.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#include "dbcoordinator.hpp"

#include <stdexcept>
//#define DEBUG
/**
 * Builds a coordinator that shards the key space across workerNums workers.
 *
 * Each worker i is given the contiguous partition
 * [i * keysPerWorker, (i + 1) * keysPerWorker - 1], stored in
 * p.ceiling (first key) and p.floor (last key).
 *
 * keysPerWorker is maxKeyNums / workerNums rounded UP. mapTransaction routes a
 * key to worker `key / keysPerWorker`, so rounding down would send the last
 * `maxKeyNums % workerNums` keys to index workerNums, one past the end of
 * workers[]. With rounding up, every key below maxKeyNums lands on a valid
 * worker; the final partition may extend past maxKeyNums - 1 as a result.
 *
 * @param workerNums number of workers (and worker threads) to allocate; must be > 0
 * @param maxKeyNums number of keys to partition; must be > 0
 * @throws std::invalid_argument if either argument is not positive
 */
dbcoordinator::dbcoordinator(int workerNums, int maxKeyNums)
    : workerNums(workerNums),
      maxKeyNums(maxKeyNums),
      // The guard keeps this initializer from dividing by zero; the invalid
      // case is rejected in the body before anything is allocated.
      keysPerWorker(workerNums > 0
                        ? maxKeyNums / workerNums + (maxKeyNums % workerNums != 0 ? 1 : 0)
                        : 0)
{
    if (workerNums <= 0) {
        throw std::invalid_argument("dbcoordinator: workerNums must be positive");
    }
    if (maxKeyNums <= 0) {
        // A zero partition size would make mapTransaction divide by zero.
        throw std::invalid_argument("dbcoordinator: maxKeyNums must be positive");
    }

    workers = new dbworker[workerNums];
    worker_threads = new pthread_t[workerNums];

    int lo = 0;
    for (int i = 0; i < workerNums; i++) {
        workers[i].p.ceiling = lo;
        workers[i].p.floor = lo + keysPerWorker - 1;
        lo += keysPerWorker;
    }
}
/**
 * Creates the XML-RPC registry and the Abyss server object.
 *
 * rpc_method is wrapped in an xmlrpc_c::methodPtr and registered under the
 * name "db.ExcuteTransaction". The server is constructed for port 8080 but is
 * not started here.
 *
 * @param rpc_method handler object to expose over XML-RPC
 */
void dbcoordinator::setupRPC(ExcuteTransaction* rpc_method)
{
    dbRegistry = new xmlrpc_c::registry();

    xmlrpc_c::methodPtr const handler(rpc_method);
    dbRegistry->addMethod("db.ExcuteTransaction", handler);

    // Collect the server options in a named object, then build the server.
    xmlrpc_c::serverAbyss::constrOpt serverOptions;
    serverOptions.registryP(dbRegistry);
    serverOptions.portNumber(8080);

    dbServer = new xmlrpc_c::serverAbyss(serverOptions);
}
/**
 * Starts one pthread per worker.
 *
 * Thread n runs dbworker::worker_routine and receives a pointer to workers[n]
 * as its argument. If pthread_create reports an error, the return code is
 * printed and the process exits with status -1.
 */
void dbcoordinator::setupWorkers()
{
    for (int idx = 0; idx < workerNums; ++idx) {
        cout << "Coordinator: creating worker thread " << idx << endl;

        const int status = pthread_create(&worker_threads[idx],
                                          NULL,
                                          dbworker::worker_routine,
                                          static_cast<void *>(&workers[idx]));
        if (status != 0) {
            cout << "ERROR; return code from pthread_create() is " << status << endl;
            exit(-1);
        }
    }
}
/**
 * Splits a transaction into per-worker sub-transactions.
 *
 * The owning worker of a key is `key / keysPerWorker`.
 *  - GET and PUT operations are forwarded unchanged to the owner of their key.
 *  - GETRANGE [key, key2] is cut at partition boundaries, so each touched
 *    worker receives only the sub-range that falls inside its partition.
 *  - Operations with any other code are dropped.
 *
 * Keys are not range-checked here.
 *
 * @param transreq the transaction to split
 * @return map from worker index to the operations that worker must run; only
 *         workers that received at least one operation appear as keys
 */
map<int, TransactionReq> dbcoordinator::mapTransaction(TransactionReq transreq)
{
    map<int, TransactionReq> perWorker;

    for (int opIdx = 0; opIdx < transreq.size(); ++opIdx) {
        const InMemDB::TransReq::Op &op = transreq.Operation(opIdx);

        switch (op.code()) {
        case InMemDB::TransReq_Op_OpCode_GET:
            perWorker[op.key() / keysPerWorker].addOperation(op.code(), op.key());
            break;

        case InMemDB::TransReq_Op_OpCode_PUT:
            perWorker[op.key() / keysPerWorker].addOperation(op.code(), op.key(), op.value());
            break;

        case InMemDB::TransReq_Op_OpCode_GETRANGE: {
            // Start at the first key of the partition that contains op.key()
            // and advance one partition at a time until past op.key2().
            int sliceStart = op.key() / keysPerWorker * keysPerWorker;
            while (sliceStart <= op.key2()) {
                const int sliceEnd = sliceStart + keysPerWorker - 1;

                // Clamp the partition to the requested range.
                int rangeLo = (sliceStart < op.key()) ? op.key() : sliceStart;
                int rangeHi = (sliceEnd > op.key2()) ? op.key2() : sliceEnd;

                perWorker[sliceStart / keysPerWorker].addOperation(op.code(), rangeLo, rangeHi);
                sliceStart += keysPerWorker;
            }
            break;
        }

        default:
            break;
        }
    }

    return perWorker;
}
/**
 * XML-RPC entry point: runs one transaction across the workers it touches.
 *
 * The single string parameter is parsed into a TransactionReq and split per
 * worker with mapTransaction. The call then goes through four phases, each
 * visiting the involved workers in ascending index order (the iteration order
 * of the map):
 *   1. lock each worker's work_mutex and, under its buf_mutex, publish a
 *      pointer to that worker's sub-request in trans_req;
 *   2. signal each worker's work_cv;
 *   3. for each worker, wait on buf_cv until isComplete is true, reset the
 *      flag, merge trans_rsp into the combined response and clear it;
 *   4. store the serialized response in *retvalP and unlock every work_mutex.
 *
 * NOTE(review): acquiring work_mutex in a consistent ascending order is
 * presumably what keeps concurrent requests from deadlocking - confirm.
 * NOTE(review): trans_req still points into the local map after this function
 * returns; assumes workers only read it after being signalled - confirm.
 *
 * @param paramList XML-RPC parameters; exactly one string is expected
 * @param retvalP   receives the serialized TransactionRsp
 */
void ExcuteTransaction::execute(xmlrpc_c::paramList const& paramList,
                                xmlrpc_c::value * const retvalP) {
    typedef map<int, TransactionReq> RequestsByWorker;

    TransactionReq transreq(paramList.getString(0));
    paramList.verifyEnd(1);

    RequestsByWorker transmap = rpc_method.mapTransaction(transreq);
#ifdef DEBUG
    cout << "hit site num: " << transmap.size() << endl;
#endif

    // Phase 1: reserve every involved worker and hand it its sub-request.
    for (RequestsByWorker::iterator entry = transmap.begin(); entry != transmap.end(); ++entry) {
        dbworker &worker = rpc_method.workers[entry->first];
        pthread_mutex_lock(&worker.work_mutex);
#ifdef DEBUG
        std::cout << "Acquire mutex of worker " << entry->first << ".\n";
#endif
        pthread_mutex_lock(&worker.buf_mutex);
        worker.trans_req = &entry->second;
        pthread_mutex_unlock(&worker.buf_mutex);
    }

    // Phase 2: wake the workers only after all of them have been reserved.
    for (RequestsByWorker::iterator entry = transmap.begin(); entry != transmap.end(); ++entry) {
#ifdef DEBUG
        std::cout << "Wake up worker " << entry->first << ".\n";
#endif
        pthread_cond_signal(&rpc_method.workers[entry->first].work_cv);
    }

    // Phase 3: collect each worker's partial response.
    TransactionRsp transrsp;
    for (RequestsByWorker::iterator entry = transmap.begin(); entry != transmap.end(); ++entry) {
        dbworker &worker = rpc_method.workers[entry->first];
        pthread_mutex_lock(&worker.buf_mutex);
        // Loop guards against wake-ups that arrive before the flag is set.
        while (!worker.isComplete) {
            pthread_cond_wait(&worker.buf_cv, &worker.buf_mutex);
        }
        worker.isComplete = false;
        transrsp.mergeFrom(worker.trans_rsp);
        worker.trans_rsp.clear();
#ifdef DEBUG
        std::cout << "get result from worker:" << entry->first << endl;
#endif
        pthread_mutex_unlock(&worker.buf_mutex);
    }

    *retvalP = transrsp.toString();

    // Phase 4: release the workers for the next transaction.
    for (RequestsByWorker::iterator entry = transmap.begin(); entry != transmap.end(); ++entry) {
#ifdef DEBUG
        std::cout << "Release mutex of worker " << entry->first << ".\n";
#endif
        pthread_mutex_unlock(&rpc_method.workers[entry->first].work_mutex);
    }
}