-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueues.js
109 lines (94 loc) · 2.69 KB
/
queues.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
const amqp = require('amqplib/callback_api');
const submitPublish = require('./submitPublish');
const submitRetract = require('./submitRetract');
/**
 * Messaging configuration shared by every function in this module.
 *
 * - `exchange`: name of the exchange the channels assert.
 * - `IRI_Q` / `STATUS_Q`: one entry per logical queue, each holding the
 *   routing `key` and the `channel` used to talk to the broker.
 *
 * Important: each `channel` starts out as an empty placeholder object and is
 * only overwritten (through setChannel) inside the asynchronous channel-open
 * callbacks, so it is not usable until that callback has run.
 *
 * The binding is `const` because the object itself is never replaced; only
 * its `channel` properties are mutated.
 */
const mqConfig = {
  exchange: "editor_doc_publish",
  IRI_Q: {
    key: "iriQ",
    channel: {},
  },
  STATUS_Q: {
    key: "statusQ",
    channel: {},
  },
};
/**
 * Fatal-error handler: writes the error to stderr and terminates the
 * process with a non-zero exit status.
 *
 * @param {*} err - The error (or any value) to report before exiting.
 */
function bail(err) {
  const FAILURE_EXIT_CODE = 1;

  console.error(err);
  process.exit(FAILURE_EXIT_CODE);
}
/**
 * Returns the exchange name configured in mqConfig.
 *
 * @returns {string} The value of `mqConfig.exchange`.
 */
function getExchange() {
  const { exchange } = mqConfig;
  return exchange;
}
/**
 * Looks up the channel currently stored for a queue entry of mqConfig.
 *
 * @param {string} qName - Key of the queue entry in mqConfig (e.g. 'IRI_Q').
 * @returns {*} Whatever is stored in that entry's `channel` property.
 * @throws {TypeError} If mqConfig has no entry named `qName`.
 */
function getChannel(qName) {
  const { channel } = mqConfig[qName];
  return channel;
}
/**
 * Stores a channel on a queue entry of mqConfig, replacing the previous value.
 *
 * @param {string} qName - Key of the queue entry in mqConfig (e.g. 'STATUS_Q').
 * @param {*} ch - The channel to remember for that queue.
 * @throws {TypeError} If mqConfig has no entry named `qName`.
 */
function setChannel(qName, ch) {
  const entry = mqConfig[qName];
  entry.channel = ch;
}
/**
 * Returns the routing key configured for a queue entry of mqConfig.
 *
 * @param {string} qName - Key of the queue entry in mqConfig (e.g. 'IRI_Q').
 * @returns {string} The entry's `key` property.
 * @throws {TypeError} If mqConfig has no entry named `qName`.
 */
function getQKey(qName) {
  const { key } = mqConfig[qName];
  return key;
}
// Publisher
/**
 * Opens the publisher channel for the status queue.
 *
 * Creates a channel on `conn`, stores it in mqConfig under 'STATUS_Q' (so
 * getChannel('STATUS_Q') returns it from then on) and asserts the durable
 * 'direct' exchange. Nothing is published here; to send a message use
 *   getChannel('STATUS_Q').publish(getExchange(), getQKey('STATUS_Q'), Buffer.from(msg));
 *
 * @param {object} conn - Connection object exposing `createChannel(callback)`
 *   with an `(err, channel)` callback.
 */
function publisherStatusQ(conn) {
  const qName = 'STATUS_Q';
  const ex = getExchange();

  conn.createChannel((err, channel) => {
    if (err != null) {
      bail(err);
      // bail() terminates the process; the return keeps an undefined channel
      // from being stored or used should bail ever be made non-fatal.
      return;
    }

    setChannel(qName, channel);
    channel.assertExchange(ex, 'direct', { durable: true });
    console.log(" %s publisher channel opened", qName);
  });
}
/**
 * Dispatches a decoded queue message to the handler matching its action.
 *
 * - action 'publish' -> submitPublish.toPortal(qObj)
 * - action 'retract' -> submitRetract.toPortal(qObj)
 *
 * Anything else is logged and ignored: a payload that is not an object
 * (e.g. the JSON literal `null`, which would otherwise make the
 * destructuring throw) or an object whose action is not recognised.
 *
 * @param {{action: string}} qObj - Parsed message; the whole object is handed
 *   to the selected handler.
 */
function dispatch(qObj) {
  if (qObj === null || typeof qObj !== 'object') {
    console.warn(' [!] dispatch: ignoring message that is not an object: %j', qObj);
    return;
  }

  const { action } = qObj;
  switch (action) {
    case 'publish':
      submitPublish.toPortal(qObj);
      break;
    case 'retract':
      submitRetract.toPortal(qObj);
      break;
    default:
      console.warn(' [!] dispatch: ignoring message with unknown action: %j', action);
  }
}
// Consumer
/**
 * Opens the consumer channel for the IRI queue and starts consuming.
 *
 * Creates a channel on `conn`, stores it in mqConfig under 'IRI_Q', asserts
 * the durable 'direct' exchange and the durable queue 'editor_iri_q', binds
 * the queue with the IRI_Q routing key, and passes every JSON message it
 * receives to dispatch(). Messages are consumed with `noAck: true`, so they
 * are never acknowledged explicitly.
 *
 * Failures to open the channel or to assert the queue are fatal (bail).
 * A message whose body is not valid JSON is logged and dropped.
 *
 * @param {object} conn - Connection object exposing `createChannel(callback)`
 *   with an `(err, channel)` callback.
 */
function consumerIriQ(conn) {
  const qName = 'IRI_Q';
  const ex = getExchange();
  const key = getQKey(qName);

  // Handles a single delivery from the broker.
  const onMessage = (msg) => {
    // amqplib invokes the consumer callback with null when the broker
    // cancels the consumer; there is no message to read in that case.
    if (msg == null) {
      console.error(" %s consumer was cancelled by the broker", qName);
      return;
    }

    const body = msg.content.toString();
    console.log(" [x] %s: '%s'", msg.fields.routingKey, body);

    let qObj;
    try {
      qObj = JSON.parse(body);
    } catch (parseErr) {
      // Only the parse is guarded: one malformed payload must not take the
      // consumer down, while errors raised by dispatch() still propagate.
      console.error(" [!] %s: discarding message that is not valid JSON: %s", qName, parseErr.message);
      return;
    }
    dispatch(qObj);
  };

  conn.createChannel((err, channel) => {
    if (err != null) {
      bail(err);
      return;
    }

    // Keep the channel retrievable through getChannel('IRI_Q'), mirroring
    // what the status publisher does for its own channel.
    setChannel(qName, channel);
    channel.assertExchange(ex, 'direct', { durable: true });
    channel.assertQueue('editor_iri_q', { exclusive: false, durable: true }, (qErr, q) => {
      if (qErr != null) {
        bail(qErr);
        return;
      }

      console.log(" %s consumer channel opened.", qName);
      console.log(' [*] Waiting for messages. To exit press CTRL+C');
      channel.bindQueue(q.queue, ex, key);
      channel.consume(q.queue, onMessage, { noAck: true });
    });
  });
}
/**
 * Connects to the RabbitMQ broker at amqp://localhost when this module is
 * loaded and, once connected, starts the IRI consumer and opens the status
 * publisher channel. A connection failure is fatal (bail).
 *
 * NOTE(review): with amqplib's callback API the connection is delivered to
 * the callback; `rabbit` only holds whatever `amqp.connect` itself returns,
 * which is presumably `undefined` - confirm before relying on the export.
 */
const rabbit = amqp.connect('amqp://localhost', (err, conn) => {
  // Check the error first so "CONNECTED" is never logged for a failed attempt.
  if (err != null) {
    bail(err);
    return;
  }

  console.log(" AMQP CONNECTED");
  consumerIriQ(conn);
  publisherStatusQ(conn);
});
// Public surface of this module: the value returned by amqp.connect plus the
// read-only accessors for the messaging configuration.
module.exports = { rabbit, getExchange, getChannel, getQKey };