-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
493 lines (493 loc) · 23.9 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
'use strict';
// Compiler-emitted helper that drives a generator function as if it were an
// async function: every yielded value is awaited through the promise
// constructor `P` (the global Promise when `P` is falsy), and the returned
// promise settles with the generator's return value or its thrown error.
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    var PromiseCtor = P || Promise;
    return new PromiseCtor(function (settle, fail) {
        // Instantiate the generator with the caller's `this` and arguments.
        var iterator = generator.apply(thisArg, _arguments || []);
        // Resume the generator with a fulfilled value.
        function onFulfilled(value) {
            try {
                advance(iterator.next(value));
            }
            catch (err) {
                fail(err);
            }
        }
        // Re-throw a rejection inside the generator so its try/catch can see it.
        function onRejected(reason) {
            try {
                advance(iterator["throw"](reason));
            }
            catch (err) {
                fail(err);
            }
        }
        // Either finish with the return value or wait on the yielded value.
        function advance(state) {
            if (state.done) {
                settle(state.value);
                return;
            }
            new PromiseCtor(function (adopt) { adopt(state.value); }).then(onFulfilled, onRejected);
        }
        advance(iterator.next());
    });
};
// Import Axios XHR client
var axios = require('axios');
// Import Moment.js
var moment = require('moment-timezone');
// RFC3339 time format used by Appengine/Datastore
var rfc3339 = 'YYYY-MM-DDTHH:mm:ssZ';
// Stores the last time block addresses were queried for in updateBloom
var blockAddressQueriedAt = null;
// Hanzo Ethereum Webhook
var ethereumWebhook = 'https://api.hanzo.io/ethereum/webhook';
// SECURITY: the webhook password should be supplied through the
// ETHEREUM_WEBHOOK_PASSWORD environment variable. The literal below is kept
// only as a backward-compatible fallback; it is a credential committed to
// source control and should be rotated and then removed.
var ethereumWebhookPassword = process.env.ETHEREUM_WEBHOOK_PASSWORD || '3NRD2H3EbnrX4fFPBvHqUxsQjMMdVpbGXRn2jFggnq66bEEczjF3GK4r66JX3veY6WJUrxCSpB2AKsNRBHuDTHZkXBrY258tCpa4xMJPnyrCh5dZaPD5TvCC8BSHgEeMwkaN6Vgcme783fFBeS9eY88NpAgH84XbLL5W5AXahLa2ZSJy4VT8nkRVpSNPE32KGE4Jp3uhuHPUd7eKdYjrX9x8aukgQKtuyCNKdxhh4jw8ZzYZ2JUbgMmTtjduFswc';
// This function updates a bloom filter with new addresses
/**
 * Adds the watched block addresses of `network` to the bloom filter.
 *
 * The first call loads every 'blockaddress' entity of the network; later
 * calls only ask for entities whose CreatedAt is at or after the cutoff
 * remembered in the module-level `blockAddressQueriedAt`.
 *
 * @param {{add: function(*): void}} bloom - Filter that receives each Address.
 * @param {Object} datastore - Datastore client (createQuery / runQuery).
 * @param {string} network - Value matched against the entities' Type field.
 * @returns {Promise<void>} Resolves once all returned addresses are added;
 *     rejects if the query fails (the cutoff is then left unchanged).
 */
function updateBloom(bloom, datastore, network) {
    return __awaiter(this, void 0, void 0, function* () {
        // Query all the blockaddresses
        var query = datastore.createQuery('blockaddress').filter('Type', '=', network);
        if (blockAddressQueriedAt) {
            query = query.filter('CreatedAt', '>=', blockAddressQueriedAt);
            console.log(`Checking Addresses Created After '${blockAddressQueriedAt}'`);
        }
        console.log(`Start Getting '${network}' Block Addresses`);
        // Take the cutoff BEFORE the query runs. Taking it afterwards (as the
        // code used to) means an address created while the query was in
        // flight is older than the cutoff and is never picked up again.
        var queryStartedAt = moment().toDate();
        // Get all the results
        var [results, qInfo] = (yield datastore.runQuery(query));
        console.log(`Found ${results.length} Block Addresses`);
        console.log('Additional Query Info:\n', JSON.stringify(qInfo));
        // Only move the cutoff forward; overlapping calls may finish out of order.
        if (!blockAddressQueriedAt || queryStartedAt > blockAddressQueriedAt) {
            blockAddressQueriedAt = queryStartedAt;
        }
        // Start building the bloom filter from the results
        for (var result of results) {
            console.log(`Adding BlockAddress ${result.Address} to Bloom Filter`);
            bloom.add(result.Address);
        }
    });
}
// function strip0x(str) {
// return str.replace(/^0x/, '')
// }
// This function converts an array into a Datastore compatible array
/**
 * Wraps every element of `array` in a single-key object named
 * `<type>Value` and returns the wrapped list under a `values` property.
 *
 * @param {Array} array - Elements to wrap.
 * @param {string} type - Prefix of the wrapper key, e.g. 'string'.
 * @returns {{values: Array<Object>}} The wrapped elements.
 */
function toDatastoreArray(array, type) {
    return {
        values: array.map((element) => ({ [`${type}Value`]: element })),
    };
}
/**
 * Builds the Datastore entity for a freshly fetched block, saves it under
 * the key ['block', '<network>/<number>'] and, once saved, posts a
 * 'block.reading' event to the webhook.
 *
 * Save and webhook failures are logged, not rethrown, so the returned
 * promise always fulfils.
 *
 * @param {Object} datastore - Datastore client (key / save).
 * @param {string} network - Network name; stored as Type and used in the id.
 * @param {Object} result - Block object as returned by the node.
 * @returns {Array} `[id, data, promise]` - the entity id, the entity data and
 *     the promise of the save + webhook chain.
 */
function saveReadingBlock(datastore, network, result) {
    var now = moment().toDate();
    // Convert to the Go Compatible Datastore Representation
    var blockId = `${network}/${result.number}`;
    var entity = {
        Id_: blockId,
        EthereumBlockNumber: result.number,
        EthereumBlockHash: result.hash,
        EthereumBlockParentHash: result.parentHash,
        EthereumBlockNonce: result.nonce,
        EthereumBlockSha3Uncles: result.sha3Uncles,
        EthereumBlockLogsBloom: result.logsBloom,
        EthereumBlockTransactionsRoot: result.transactionsRoot,
        EthereumBlockStateRoot: result.stateRoot,
        EthereumBlockMiner: result.miner,
        EthereumBlockDifficulty: result.difficulty.toString(10),
        EthereumBlockTotalDifficulty: result.totalDifficulty.toString(10),
        EthereumBlockExtraData: result.extraData,
        EthereumBlockSize: result.size,
        EthereumBlockGasLimit: result.gasLimit,
        EthereumBlockGasUsed: result.gasUsed,
        EthereumBlockTimeStamp: result.timestamp,
        EthereumBlockUncles: toDatastoreArray(result.uncles, 'string'),
        Type: network,
        // Status is intentionally not written: the pending/confirmed flow for
        // blocks is disabled to save calls.
        UpdatedAt: now,
        CreatedAt: now,
    };
    console.log(`Saving New Block #${blockId} In Reading Status`);

    // Posts the 'block.reading' event; failures are only logged.
    var notifyWebhook = () => {
        var payload = {
            name: 'block.reading',
            type: network,
            password: ethereumWebhookPassword,
            dataId: entity.Id_,
            dataKind: 'block',
            data: entity,
        };
        return axios.post(ethereumWebhook, payload).then(() => {
            console.log(`Successfully Issued New Block #${entity.EthereumBlockNumber} Webhook Event`);
        }).catch((webhookError) => {
            console.log(`Error Issuing New Block #${entity.EthereumBlockNumber} Webhook Event:\n`, webhookError);
        });
    };

    // Save the data to the key
    var record = {
        key: datastore.key(['block', blockId]),
        data: entity,
    };
    var saved = datastore.save(record).then((saveResult) => {
        console.log(`Reading Block #${entity.EthereumBlockNumber} Saved:\n`, JSON.stringify(saveResult));
        console.log(`Issuing New Block #${entity.EthereumBlockNumber} Webhook Event`);
        return notifyWebhook();
    }).catch((saveError) => {
        console.log(`Error Saving New Block #${entity.EthereumBlockNumber}:\n`, saveError);
    });
    return [blockId, entity, saved];
}
/**
 * Flags an already-saved block entity as 'pending', persists it under
 * ['block', data.Id_] and then posts a 'block.pending' webhook event.
 *
 * `data` is mutated in place (Status and UpdatedAt). Save and webhook
 * failures are logged and swallowed.
 *
 * @param {Object} datastore - Datastore client (key / save).
 * @param {Object} data - Block entity data as produced by saveReadingBlock.
 * @returns {Promise<void>} Fulfils when the save + webhook chain has finished.
 */
function updatePendingBlock(datastore, data) {
    console.log(`Updating Reading Block #'${data.Id_}' To Pending Status`);
    // Update the block status to pending
    data.Status = 'pending';
    data.UpdatedAt = moment().toDate();

    // Posts the 'block.pending' event; failures are only logged.
    var notifyWebhook = () => {
        var payload = {
            name: 'block.pending',
            type: data.Type,
            password: ethereumWebhookPassword,
            dataId: data.Id_,
            dataKind: 'block',
            data: data,
        };
        return axios.post(ethereumWebhook, payload).then(() => {
            console.log(`Successfully Issued Pending Block #${data.EthereumBlockNumber} Webhook Event`);
        }).catch((webhookError) => {
            console.log(`Error Issuing Pending Block #${data.EthereumBlockNumber} Webhook Event:\n`, webhookError);
        });
    };

    // Save the data to the key
    var record = {
        key: datastore.key(['block', data.Id_]),
        data: data,
    };
    return datastore.save(record).then((saveResult) => {
        console.log(`Pending Block #${data.EthereumBlockNumber} Updated:\n`, JSON.stringify(saveResult));
        console.log(`Issuing Pending Block #${data.EthereumBlockNumber} Webhook Event`);
        return notifyWebhook();
    }).catch((saveError) => {
        console.log(`Error Updating Reading Block #${data.EthereumBlockNumber}:\n`, saveError);
    });
}
/**
 * Loads the block entity ['block', '<network>/<number>'], marks it
 * 'confirmed' with the given confirmation count, saves it back and posts a
 * 'block.confirmed' webhook event.
 *
 * A missing entity is logged and skipped. Lookup, save and webhook failures
 * are logged and swallowed.
 *
 * @param {Object} datastore - Datastore client (key / get / save).
 * @param {string} network - Network name used in the entity id and event type.
 * @param {number} number - Block number to confirm.
 * @param {number} confirmations - Value stored in the Confirmations field.
 * @returns {Promise<void>} Fulfils when the chain has finished.
 */
function getAndUpdateConfirmedBlock(datastore, network, number, confirmations) {
    var blockId = `${network}/${number}`;
    var blockKey = datastore.key(['block', blockId]);
    console.log(`Fetching Pending Block #'${number}'`);

    // Posts the 'block.confirmed' event; failures are only logged.
    var notifyWebhook = (entity) => {
        var payload = {
            name: 'block.confirmed',
            type: network,
            password: ethereumWebhookPassword,
            dataId: entity.Id_,
            dataKind: 'block',
            data: entity,
        };
        return axios.post(ethereumWebhook, payload).then(() => {
            console.log(`Successfully Issued Confirmed Block #${entity.EthereumBlockNumber} Webhook Event`);
        }).catch((webhookError) => {
            console.log(`Error Issuing Confirmed Block #${entity.EthereumBlockNumber} Webhook Event:\n`, webhookError);
        });
    };

    // Stamps the entity as confirmed and writes it back.
    var confirmAndSave = (entity) => {
        entity.Confirmations = confirmations;
        entity.UpdatedAt = moment().toDate();
        entity.Status = 'confirmed';
        console.log(`Updating Pending Block #${number} To Confirmed Status`);
        // Save the data to the key
        return datastore.save({ key: blockKey, data: entity }).then((saveResult) => {
            console.log(`Confirmed Block #${entity.EthereumBlockNumber} Updated:\n`, JSON.stringify(saveResult));
            console.log(`Issuing Confirmed Block #${entity.EthereumBlockNumber} Webhook Event`);
            return notifyWebhook(entity);
        }).catch((saveError) => {
            console.log(`Error Saving Confirmed Block #${entity.EthereumBlockNumber}:\n`, saveError);
        });
    };

    // Get the pending block to confirm
    return datastore.get(blockKey).then(([entity]) => {
        if (entity) {
            return confirmAndSave(entity);
        }
        console.log(`Pending Block #${number} Not Found`);
    }).catch((lookupError) => {
        console.log(`Error Getting Pending Block #${number}:\n`, lookupError);
    });
}
/**
 * Confirms through an exact 'blockaddress' query that `address` is watched
 * on `network`; if it is, stores the transaction as a 'pending'
 * 'blocktransaction' entity keyed by '<network>/<address>/<hash>' and posts
 * a 'blocktransaction.pending' webhook event.
 *
 * An unwatched address is logged and skipped. Query, save and webhook
 * failures are logged and swallowed.
 *
 * @param {Object} datastore - Datastore client (createQuery / runQuery / key / save).
 * @param {Object} transaction - Transaction object as returned by the node.
 * @param {string} network - Network name; stored as Type and used in the id.
 * @param {string} address - The watched address this record is filed under.
 * @param {string} usage - Stored as Usage (callers pass 'receiver' or 'sender').
 * @returns {Promise<void>} Fulfils when the chain has finished.
 */
function savePendingBlockTransaction(datastore, transaction, network, address, usage) {
    var watchQuery = datastore.createQuery('blockaddress')
        .filter('Type', '=', network)
        .filter('Address', '=', address);
    console.log(`Checking If Address ${address} Is Being Watched`);

    // Posts the 'blocktransaction.pending' event; failures are only logged.
    var notifyWebhook = (entity) => {
        var payload = {
            name: 'blocktransaction.pending',
            type: network,
            password: ethereumWebhookPassword,
            dataId: entity.Id_,
            dataKind: 'blocktransaction',
            data: entity,
        };
        return axios.post(ethereumWebhook, payload).then(() => {
            console.log(`Successfully Issued Pending Block Transaction ${transaction.hash} Webhook Event`);
        }).catch((webhookError) => {
            console.log(`Error Issuing Pending Block Transaction ${transaction.hash} Webhook Event:\n`, webhookError);
        });
    };

    // Builds and stores the pending transaction entity.
    var storePending = () => {
        var now = moment().toDate();
        // Convert to the Go Compatible Datastore Representation
        var entityId = `${network}/${address}/${transaction.hash}`;
        var entity = {
            Id_: entityId,
            EthereumTransactionHash: transaction.hash,
            EthereumTransactionNonce: transaction.nonce,
            EthereumTransactionBlockHash: transaction.blockHash,
            EthereumTransactionBlockNumber: transaction.blockNumber,
            EthereumTransactionTransactionIndex: transaction.transactionIndex,
            EthereumTransactionFrom: transaction.from,
            EthereumTransactionTo: transaction.to,
            EthereumTransactionValue: transaction.value.toString(10),
            EthereumTransactionGasPrice: transaction.gasPrice.toString(10),
            EthereumTransactionGas: transaction.gas.toString(10),
            EthereumTransactionInput: transaction.input,
            Address: address,
            Usage: usage,
            Type: network,
            Status: 'pending',
            UpdatedAt: now,
            CreatedAt: now,
        };
        console.log(`Saving New Block Transaction with Id '${entityId}' In Pending Status`);
        // Save the data to the key
        var record = {
            key: datastore.key(['blocktransaction', entityId]),
            data: entity,
        };
        return datastore.save(record).then((saveResult) => {
            console.log(`Pending Block Transaction ${transaction.hash} Saved:\n`, JSON.stringify(saveResult));
            console.log(`Issuing Pending Block Transaction ${transaction.hash} Webhook Event`);
            return notifyWebhook(entity);
        }).catch((saveError) => {
            console.log(`Error Saving New Block Transaction ${transaction.hash}:\n`, saveError);
        });
    };

    // Get all the results
    return datastore.runQuery(watchQuery).then(([matches, queryInfo]) => {
        var isWatched = Boolean(matches && matches[0]);
        if (isWatched) {
            return storePending();
        }
        console.log(`Address ${address} Not Found:\n`, queryInfo);
    }).catch((queryError) => {
        console.log(`Address ${address} Not Found Due to Error:\n`, queryError);
    });
}
/**
 * Confirms every stored 'blocktransaction' entity that belongs to block
 * `number` on `network`: fetches each transaction's receipt from the node,
 * copies the receipt fields onto the entity, marks it 'confirmed', saves it
 * and posts a 'blocktransaction.confirmed' webhook event.
 *
 * A transaction whose receipt comes back empty is logged and left
 * untouched. Query, receipt, save and webhook failures are logged and
 * swallowed, so the returned promise always fulfils.
 *
 * @param {Object} web3 - web3 instance; only web3.eth.getTransactionReceipt is used.
 * @param {Object} datastore - Datastore client (createQuery / runQuery / key / save).
 * @param {string} network - Network name matched against Type.
 * @param {number} number - Block number matched against EthereumTransactionBlockNumber.
 * @param {number} confirmations - Value stored in the Confirmations field.
 * @returns {Promise<Array|undefined>} Fulfils with one entry per transaction
 *     found, or undefined when none were found or an error was logged.
 */
function getAndUpdateConfirmedBlockTransaction(web3, datastore, network, number, confirmations) {
    var query = datastore.createQuery('blocktransaction').filter('Type', '=', network).filter('EthereumTransactionBlockNumber', '=', number);
    console.log(`Fetching Pending Block Transactions From Block #${number}`);
    // Get all the results
    return datastore.runQuery(query).then((resultsAndQInfo) => {
        var [results, qInfo] = resultsAndQInfo;
        if (!results || !results.length) {
            console.log(`Block #${number} Has No Block Transactions:\n`, qInfo);
            return;
        }
        // Loop over the block transactions
        var ps = results.map((transaction) => {
            var id = transaction.Id_;
            var key = datastore.key(['blocktransaction', id]);
            console.log(`Fetching Pending Block Transaction '${transaction.EthereumTransactionHash}' Receipt`);
            return new Promise((resolve, reject) => {
                web3.eth.getTransactionReceipt(transaction.EthereumTransactionHash, (error, receipt) => {
                    console.log(error, JSON.stringify(receipt));
                    if (error) {
                        return reject(error);
                    }
                    // The node answers with an empty receipt when it has none
                    // for this hash. Dereferencing it would throw inside this
                    // callback, outside any promise, and leave this promise
                    // unsettled - so skip the transaction and keep it pending.
                    if (!receipt) {
                        console.log(`No Receipt Found For Block Transaction ${transaction.EthereumTransactionHash}, Leaving It Pending`);
                        return resolve();
                    }
                    transaction.EthereumTransactionReceiptBlockHash = receipt.blockHash;
                    transaction.EthereumTransactionReceiptBlockNumber = receipt.blockNumber;
                    transaction.EthereumTransactionReceiptTransactionHash = receipt.transactionHash;
                    transaction.EthereumTransactionReceiptTransactionIndex = receipt.transactionIndex;
                    transaction.EthereumTransactionReceiptFrom = receipt.from;
                    transaction.EthereumTransactionReceiptTo = receipt.to;
                    transaction.EthereumTransactionReceiptCumulativeGasUsed = receipt.cumulativeGasUsed;
                    transaction.EthereumTransactionReceiptGasUsed = receipt.gasUsed;
                    transaction.EthereumTransactionReceiptContractAddress = receipt.contractAddress;
                    transaction.Confirmations = confirmations;
                    transaction.UpdatedAt = moment().toDate();
                    transaction.Status = 'confirmed';
                    console.log(`Updating Pending Block Transaction with Id '${id}' To Confirmed Status`);
                    // Save the data to the key, then announce it
                    return resolve(datastore.save({
                        key: key,
                        data: transaction,
                    }).then((result) => {
                        console.log(`Confirmed Block Transaction ${transaction.EthereumTransactionHash} Saved:\n`, JSON.stringify(result));
                        console.log(`Issuing Confirmed Block Transaction ${transaction.EthereumTransactionHash} Webhook Event`);
                        return axios.post(ethereumWebhook, {
                            name: 'blocktransaction.confirmed',
                            type: network,
                            password: ethereumWebhookPassword,
                            dataId: transaction.Id_,
                            dataKind: 'blocktransaction',
                            data: transaction,
                        }).then((result) => {
                            console.log(`Successfully Issued Confirmed Block Transaction ${transaction.EthereumTransactionHash} Webhook Event`);
                        }).catch((error) => {
                            console.log(`Error Issuing Confirmed Block Transaction ${transaction.EthereumTransactionHash} Webhook Event:\n`, error);
                        });
                    }).catch((error) => {
                        console.log(`Error Updating Pending Block Transaction ${transaction.EthereumTransactionHash}:\n`, error);
                    }));
                });
            });
        });
        return Promise.all(ps);
    }).catch((error) => {
        console.log(`No Block Transactions From for Block #${number} Due To Error:\n`, error);
    });
}
// Import the Bloomfilter
var { BloomFilter } = require('bloomfilter');
// Imports the Google Cloud client library
var Datastore = require('@google-cloud/datastore');
/**
 * Reads a non-negative integer setting from the environment.
 *
 * Environment values are always strings; leaving them unparsed stores a
 * string in numeric fields and turns a typo into NaN arithmetic.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, blank,
 *     or not a non-negative integer.
 * @returns {number} The parsed value or `fallback`.
 */
function readIntegerEnv(name, fallback) {
    var raw = process.env[name];
    if (raw === undefined || raw.trim() === '') {
        return fallback;
    }
    var parsed = Number(raw);
    return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback;
}
// How many confirmations does it take to confirm? (default: 12)
var confirmations = readIntegerEnv('CONFIRMATIONS', 12);
// How many concurrent blocks can it be processing? (default: 10)
var inflightLimit = readIntegerEnv('INFLIGHT_LIMIT', 10);
/**
 * Starts the Ethereum block reader.
 *
 * Creates the Datastore client and the web3 connection to the node chosen
 * by process.env.ENVIRONMENT, loads watched addresses into a bloom filter,
 * works out which block to resume from, and then installs two timers:
 * `check` refreshes the chain height every second and `run` fetches the
 * next unread block while fewer than `inflightLimit` blocks are in flight.
 *
 * Set START_BLOCK to a non-negative integer to force the starting block
 * instead of resuming from the newest block stored in Datastore.
 *
 * @returns {Promise<void>} Fulfils once the timers are installed, or early
 *     when the node cannot be reached; rejects if the initial Datastore
 *     queries fail.
 */
function main() {
    return __awaiter(this, void 0, void 0, function* () {
        // Initialize the Bloomfilter for a 1*10^-6 error rate for 1 million entries)
        var bloom = new BloomFilter(4096 * 4096 * 2, 20);
        // Your Google Cloud Platform project ID
        var projectId = 'crowdstart-us';
        // Instantiates a client
        var datastore = Datastore({
            projectId: projectId,
            namespace: '_blockchains'
        });
        var isProduction = process.env.ENVIRONMENT === 'production';
        // Determine ethereum network
        var network = isProduction ? 'ethereum' : 'ethereum-ropsten';
        // Determine geth/parity node URI
        // GETH PROD: 'http://35.193.184.247:13264'
        // PARITY PROD: 'http://35.192.92.62:13264'
        var nodeURI = isProduction ? 'http://35.193.184.247:13264' : 'http://35.192.74.139:13264';
        // Import Web3
        var Web3 = require('web3');
        console.log('Connecting to', nodeURI);
        var web3 = new Web3(new Web3.providers.HttpProvider(nodeURI, 1000000));
        // Ensure a connection was actually established
        if (!web3.isConnected()) {
            console.log('Could Not Connected');
            console.log(`Are you running 'sudo geth --cache=1024 --rpc --rpcaddr 0.0.0.0 --rpcport 13264 --syncmode=fast --rpccorsdomain "*" in your geth node?'`);
            return;
        }
        // Report current full block
        console.log('Current FullBlock Is', web3.eth.blockNumber);
        // Load addresses into bloom filter
        console.log(`Starting Reader For '${network}' Using Node '${nodeURI}'`);
        console.log('Initializing Bloom Filter');
        yield updateBloom(bloom, datastore, network);
        console.log('Connected');
        // Report Syncing Status.
        // The callback is node-style (error, sync): `sync` is true when a sync
        // starts, a progress object while it runs and false once it stops.
        var lastBlockData = {};
        web3.eth.isSyncing((error, sync) => {
            if (error) {
                console.log('Error Getting Syncing Status:\n', error);
                return;
            }
            if (sync === true) {
                console.log('Syncing Started');
                return;
            }
            if (!sync) {
                console.log('Syncing Complete');
                return;
            }
            if (lastBlockData.currentBlock != sync.currentBlock) {
                console.log(`Currently @ ${sync.currentBlock}, Syncing From ${sync.startingBlock} To ${sync.highestBlock}`);
                lastBlockData = sync;
            }
        });
        var lastBlock = undefined;
        // Query to find the latest block read
        var query = datastore.createQuery('block').filter('Type', '=', network).order('EthereumBlockNumber', { descending: true }).limit(1);
        console.log('Finding Block To Resume At');
        // Get all the results
        var [results, qInfo] = (yield datastore.runQuery(query));
        if (results[0]) {
            lastBlock = results[0].EthereumBlockNumber;
            console.log(`Resuming From Block #${lastBlock}`);
        }
        else {
            lastBlock = 'latest';
            console.log(`Resuming From 'latest'`);
        }
        console.log('Additional Query Info:\n', JSON.stringify(qInfo));
        // A hard-coded block number used to overwrite the resume point here,
        // silently discarding the value found above. Forcing a start block is
        // now an explicit, opt-in setting.
        var startBlockOverride = Number(process.env.START_BLOCK);
        if (process.env.START_BLOCK && Number.isInteger(startBlockOverride) && startBlockOverride >= 0) {
            lastBlock = startBlockOverride;
            console.log(`Overriding Resume Point With START_BLOCK #${lastBlock}`);
        }
        console.log('Start Watching For New Blocks');
        // Start watching for new blocks
        // NOTE(review): this filter is created but never watched or stopped in
        // this function; blocks are actually discovered by polling in check().
        var filter = web3.eth.filter({
            fromBlock: lastBlock,
            toBlock: 'latest',
        });
        // currentNumber is the last block handed to run(); starting one below
        // lastBlock makes the first fetch be lastBlock itself.
        var lastNumber = lastBlock === 'latest' ? web3.eth.blockNumber : lastBlock - 1;
        var currentNumber = lastNumber;
        // Highest block known to exist; refreshed by check()
        var blockNumber = lastNumber;
        // Number of blocks currently being processed
        var inflight = 0;
        function run() {
            // Ignore if inflight limit reached or blocknumber reached
            if (inflight >= inflightLimit || currentNumber >= blockNumber) {
                if (currentNumber > blockNumber) {
                    console.log(`Current Number ${currentNumber} > Block Number ${blockNumber}`);
                    currentNumber = blockNumber;
                }
                return;
            }
            console.log(`\nInflight Requests: ${inflight}\nCurrent Block #${currentNumber}\nTarget Block #${blockNumber}\n`);
            inflight++;
            currentNumber++;
            var number = currentNumber;
            console.log(`Fetching New Block #${number}`);
            web3.eth.getBlock(number, true, (error, result) => {
                if (error) {
                    console.log(`Error Fetching Block #${number}:\n`, error);
                    // Release the slot; without this every failed fetch leaks one
                    // and the reader stalls once inflightLimit slots have leaked.
                    // NOTE(review): the block itself is not retried.
                    inflight--;
                    return;
                }
                // Parity skipped?
                if (!result) {
                    console.log(`Block #${number} returned null, Ancient Block/Parity Issue?`);
                    inflight--;
                    return;
                }
                console.log(`Fetched Block #${result.number}`);
                var [, , readingBlockPromise] = saveReadingBlock(datastore, network, result);
                // After 10s, refresh the bloom filter and record the
                // transactions that touch a watched address.
                setTimeout(function () {
                    __awaiter(this, void 0, void 0, function* () {
                        yield updateBloom(bloom, datastore, network);
                        // Iterate through transactions looking for ones we care about
                        for (var transaction of result.transactions) {
                            console.log(`Processing Block Transaction ${transaction.hash}`);
                            var toAddress = transaction.to;
                            var fromAddress = transaction.from;
                            console.log(`Checking Addresses\nTo: ${toAddress}\nFrom: ${fromAddress}`);
                            if (bloom.test(toAddress)) {
                                console.log(`Receiver Address ${toAddress}`);
                                // Do the actual query and fetch
                                savePendingBlockTransaction(datastore, transaction, network, toAddress, 'receiver');
                            }
                            if (bloom.test(fromAddress)) {
                                console.log(`Sender Address ${fromAddress}`);
                                // Do the actual query and fetch
                                savePendingBlockTransaction(datastore, transaction, network, fromAddress, 'sender');
                            }
                        }
                    }).catch((error) => {
                        // Previously a failed bloom refresh was an unhandled rejection.
                        console.log(`Error Processing Transactions For Block #${number}:\n`, error);
                    });
                }, 10000);
                // Block-level pending/confirmed updates are disabled to save
                // calls; only the block transactions of the block that is now
                // `confirmations` deep are confirmed.
                readingBlockPromise.then(() => {
                    return new Promise((resolve, reject) => {
                        setTimeout(function () {
                            // Free the slot first so a failure below cannot leak it
                            inflight--;
                            try {
                                // It is cheaper on calls to just update the blocktransactions instead
                                var confirmationBlock = result.number - confirmations;
                                resolve(getAndUpdateConfirmedBlockTransaction(web3, datastore, network, confirmationBlock, confirmations));
                            }
                            catch (error) {
                                reject(error);
                            }
                        }, 12000);
                    });
                }).catch((error) => {
                    console.log(`Error Confirming Block Transactions For Block #${number}:\n`, error);
                });
            });
        }
        setInterval(run, 1);
        function check() {
            web3.eth.getBlockNumber((error, n) => {
                if (error) {
                    console.log(`Error getting blockNumber\n`, error);
                    return;
                }
                blockNumber = n;
            });
        }
        setInterval(check, 1000);
    });
}
// Start the reader. A start-up failure is reported and reflected in the exit
// code instead of being left as an unhandled promise rejection.
main().catch((error) => {
    console.log('Fatal Error Starting Reader:\n', error);
    process.exitCode = 1;
});
//# sourceMappingURL=index.js.map