-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathclient.go
263 lines (233 loc) · 6.52 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// Copyright 2015 The rpcmq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpcmq
import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/streadway/amqp"
)
// Client is the RPC client side of rpcmq: it publishes procedure calls to the
// broker and collects the replies sent back by the servers.
type Client struct {
	// Names handed to NewClient: the queue that carries the calls (also
	// used as routing key when publishing), the queue that carries the
	// replies, and the exchange name and kind to declare.
	msgsName, repliesName, exchangeName, exchangeKind string

	// ac wraps the AMQP connection and channel used by the client.
	ac *amqpClient

	// Queues as returned by the broker when they are declared in setup.
	msgsQueue, repliesQueue amqp.Queue

	// mandatory is passed as the mandatory flag of every publishing. It is
	// false only for fanout exchanges.
	mandatory bool

	// deliveries is the stream of replies consumed from the replies queue.
	deliveries <-chan amqp.Delivery

	// results is the unbuffered channel exposed through Results.
	results chan Result

	// DeliveryMode allows to configure the delivery mode followed by the
	// broker. NewClient sets it to Persistent.
	DeliveryMode DeliveryMode

	// TLSConfig allows to configure the TLS parameters used to connect to
	// the broker via amqps. It is read when Init is called.
	TLSConfig *tls.Config
}
// A Result contains the data returned by the invoked procedure or an error
// message, in case that it finished with error. The UUID allows to link the
// result with the procedure call.
//
// Results are decoded from the JSON body of the replies consumed by the
// client.
type Result struct {
	// UUID identifies the call this result belongs to.
	UUID string
	// Data is the output returned by the invoked procedure.
	Data []byte
	// Err is the error message reported by the procedure, if any.
	Err string
}
// rpcMsg is the payload that Call encodes as JSON and publishes to the
// broker in order to invoke a remote procedure.
type rpcMsg struct {
	// Method is the name of the remote procedure to invoke.
	Method string
	// Data is the input passed to the procedure.
	Data []byte
}
// NewClient returns a reference to a Client object. The parameter uri is the
// network address of the broker, while msgsQueue and repliesQueue are the
// names of the queues used to exchange messages between clients and servers.
// The parameters exchange and kind determine the name and the type of the
// exchange that will be declared. When kind is "fanout" the msgsQueue name is
// ignored (it is replaced by the empty string).
//
// The returned client publishes with the Persistent delivery mode unless
// DeliveryMode is changed. Init must be called before using the client.
func NewClient(uri, msgsQueue, repliesQueue, exchange, kind string) *Client {
	// With a fanout exchange the routing key is not really used, so the
	// mandatory flag only makes sense for the remaining exchange kinds.
	mandatory := kind != "fanout"
	if !mandatory {
		// In fanout mode queue names must be unique, so the given
		// name is dropped.
		msgsQueue = ""
	}

	c := new(Client)
	c.msgsName = msgsQueue
	c.repliesName = repliesQueue
	c.exchangeName = exchange
	c.exchangeKind = kind
	c.mandatory = mandatory
	c.ac = newAmqpClient(uri)
	c.results = make(chan Result)
	c.DeliveryMode = Persistent

	// Let the underlying AMQP client invoke setup on its own.
	c.ac.setupFunc = c.setup
	return c
}
// Init initializes the Client object. It hands the configured TLSConfig to
// the underlying AMQP client, initializes it and, if that succeeds, runs
// setup to declare the exchange and queues used under the hood. The first
// error found is returned.
func (c *Client) Init() error {
	c.ac.tlsConfig = c.TLSConfig

	err := c.ac.init()
	if err == nil {
		err = c.setup()
	}
	return err
}
// setup declares the exchange and the queues used by the client, starts
// consuming from the replies queue and launches the goroutine that processes
// the incoming replies. It returns the first error found, prefixed with the
// name of the failing step.
func (c *Client) setup() error {
	// Exchange: durable, not auto-deleted, not internal, noWait disabled,
	// no extra arguments.
	if err := c.ac.channel.ExchangeDeclare(c.exchangeName, c.exchangeKind, true, false, false, false, nil); err != nil {
		return fmt.Errorf("ExchangeDeclare: %v", err)
	}

	var err error

	// The messages queue is declared and bound only in non-fanout mode.
	if c.exchangeKind != "fanout" {
		// Queue: durable, not auto-deleted, not exclusive, noWait
		// disabled, no extra arguments.
		if c.msgsQueue, err = c.ac.channel.QueueDeclare(c.msgsName, true, false, false, false, nil); err != nil {
			return fmt.Errorf("QueueDeclare: %v", err)
		}

		// The queue name doubles as the binding key.
		queue := c.msgsQueue.Name
		if err = c.ac.channel.QueueBind(queue, queue, c.exchangeName, false, nil); err != nil {
			return fmt.Errorf("QueueBind: %v", err)
		}
	}

	// Replies queue: same flags as the messages queue.
	if c.repliesQueue, err = c.ac.channel.QueueDeclare(c.repliesName, true, false, false, false, nil); err != nil {
		return fmt.Errorf("QueueDeclare: %v", err)
	}

	// A fresh consumer tag is generated on every setup.
	if c.ac.consumerTag, err = uuid(); err != nil {
		return fmt.Errorf("UUID: %v", err)
	}

	// Consumer: manual acks (autoAck disabled), not exclusive, noLocal and
	// noWait disabled, no extra arguments.
	if c.deliveries, err = c.ac.channel.Consume(c.repliesQueue.Name, c.ac.consumerTag, false, false, false, false, nil); err != nil {
		return fmt.Errorf("Queue Consume: %v", err)
	}

	go c.getDeliveries()
	return nil
}
// getDeliveries processes the replies received from the broker until the
// deliveries channel is closed, and then signals it through c.ac.done.
//
// Replies without correlation id, or whose body cannot be decoded as a
// Result, are rejected without requeueing and logged. Any other reply is
// first handed over through the results channel and then acknowledged.
func (c *Client) getDeliveries() {
	for delivery := range c.deliveries {
		var res Result

		// The body is only decoded when the correlation id is present.
		if delivery.CorrelationId == "" || json.Unmarshal(delivery.Body, &res) != nil {
			delivery.Nack(false, false) // drop message
			logf("dropped message: %+v", delivery)
			continue
		}

		// Blocks until the result is read from the channel returned
		// by Results; the ack is sent afterwards.
		c.results <- res
		delivery.Ack(false)
	}

	c.ac.done <- true
}
// Shutdown shuts down the client by delegating to the shutdown method of the
// underlying AMQP client. It is intended to be graceful, so that all replies
// sent by the RPC servers to the client are received by the latter.
// NOTE(review): that guarantee depends on amqpClient.shutdown, which is not
// visible here; confirm against its implementation.
func (c *Client) Shutdown() {
	c.ac.shutdown()
}
// Call invokes the remote procedure specified by the parameter method, being
// the parameter data the input passed to it. On the other hand, ttl is the
// time that this task will remain in the queue before being considered dead;
// it is sent to the broker as a whole number of milliseconds, so any
// sub-millisecond remainder is truncated. If ttl is 0 (or negative), the
// message will not expire.
//
// The returned id can be used to identify the result corresponding to each
// invocation. A non-nil error is returned when the id cannot be generated,
// the message cannot be encoded or published, the broker nacks the
// publishing, or the ack/nack channels are closed before a confirmation is
// received. Calls are serialized: Call blocks until the publishing is
// confirmed.
func (c *Client) Call(method string, data []byte, ttl time.Duration) (id string, err error) {
	id, err = uuid()
	if err != nil {
		return "", fmt.Errorf("UUID: %v", err)
	}

	msg := &rpcMsg{
		Method: method,
		Data:   data,
	}
	body, err := json.Marshal(msg)
	if err != nil {
		return "", fmt.Errorf("Marshal: %v", err)
	}

	// The expiration is expressed in milliseconds. Use integer division:
	// going through float64 seconds can come out one millisecond short
	// for some durations because of rounding before truncation.
	expiration := ""
	if ttl > 0 {
		expiration = fmt.Sprintf("%d", int64(ttl/time.Millisecond))
	}

	// guarantee that the received ack/nack corresponds with this publishing
	c.ac.mu.Lock()
	defer c.ac.mu.Unlock()

	err = c.ac.channel.Publish(
		c.exchangeName, // exchange
		c.msgsName,     // key
		c.mandatory,    // mandatory
		false,          // immediate
		amqp.Publishing{ // msg
			CorrelationId: id,
			ReplyTo:       c.repliesQueue.Name,
			ContentType:   "application/json",
			Body:          body,
			DeliveryMode:  uint8(c.DeliveryMode),
			Expiration:    expiration,
		},
	)
	if err != nil {
		return "", err
	}

	// Wait for the confirmation of this publishing. A receive that
	// reports a closed channel falls through to the "missing ack/nack"
	// error below.
	select {
	case _, ok := <-c.ac.acks:
		if ok {
			return id, nil
		}
	case tag, ok := <-c.ac.nacks:
		if ok {
			logf("nack recived (%v)", tag)
			return "", errors.New("nack received")
		}
	}

	logf("missing ack/nack")
	return "", errors.New("missing ack/nack")
}
// Results exposes, as a receive-only channel, the results returned by the
// invoked procedures.
func (c *Client) Results() <-chan Result {
	// The bidirectional channel converts implicitly to receive-only.
	return c.results
}