Skip to content

Commit

Permalink
Close connection if any queue in the topology fails to init. (#14)
Browse files Browse the repository at this point in the history
Co-authored-by: Philip Pålsson <[email protected]>
  • Loading branch information
emilklasson and Philip Pålsson authored Jun 29, 2020
1 parent 76c5ea6 commit 2969215
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions src/Insurello.RabbitMqClient/MqClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ module MqClient =
| Json of string
| Binary of byte array

[<RequireQualifiedAccess>]
type CorrelationId =
| Generate
| Id of string
Expand Down Expand Up @@ -176,13 +177,8 @@ module MqClient =

| :? System.ArgumentException as ex -> Error ex.Message

let private closeConnection: IConnection -> unit =
fun connection -> if connection.IsOpen then connection.Close() else ()

let private closeConnectionAsync: System.TimeSpan -> IConnection -> unit =
fun waitTimeout connection ->
Async.Start
(async { closeConnection connection }, (new System.Threading.CancellationTokenSource(waitTimeout)).Token)
let private closeConnection: int -> IConnection -> unit =
fun timeout connection -> if connection.IsOpen then connection.Close(timeout) else ()

let private createChannel: ChannelConfig -> ExceptionCallback -> IConnection -> Result<IModel, string> =
fun config exCallback connection ->
Expand Down Expand Up @@ -350,7 +346,7 @@ module MqClient =
let exCallback =
(fun ex context connection ->
logError (ex, "Unhandled exception on channel in context {$c}", context)
closeConnectionAsync (System.TimeSpan.FromSeconds 3.0) connection)
closeConnection 3000 connection)

createChannel
{ withConfirmSelect = true
Expand All @@ -360,15 +356,16 @@ module MqClient =
{ withConfirmSelect = false
prefetchCount = prefetchCount } exCallback connection
|> Result.map (fun rpcChannel ->
Model
{ channelConsumer = AsyncEventingBasicConsumer channel
(connection,
Model
{ channelConsumer = AsyncEventingBasicConsumer channel

rpcConsumer = AsyncEventingBasicConsumer rpcChannel
rpcConsumer = AsyncEventingBasicConsumer rpcChannel

pendingRequests =
System.Collections.Concurrent.ConcurrentDictionary<string, Result<ReceivedMessage, string> System.Threading.Tasks.TaskCompletionSource>
() })))
|> Result.bind (fun model ->
pendingRequests =
System.Collections.Concurrent.ConcurrentDictionary<string, Result<ReceivedMessage, string> System.Threading.Tasks.TaskCompletionSource>
() }))))
|> Result.bind (fun (connection, model) ->
let declareAQueue = declareQueue model
let bindAQueue = bindQueueToExchange model

Expand All @@ -381,6 +378,9 @@ module MqClient =
declareAQueue queueTopology
|> Result.bind bindAQueue
|> Result.map consumeAQueue) prevResult) (Ok model)
|> Result.mapError (fun error ->
closeConnection 3000 connection
error)
|> Result.map initReplyQueue)

/// Will publish with confirm.
Expand Down Expand Up @@ -437,8 +437,8 @@ module MqClient =
MessageId = messageId,
CorrelationId =
(match message.CorrelationId with
| Generate -> ""
| Id correlationId -> correlationId),
| CorrelationId.Generate -> ""
| CorrelationId.Id correlationId -> correlationId),
Headers =
(message.Headers
|> Map.map (fun _ v -> v :> obj)
Expand Down

0 comments on commit 2969215

Please sign in to comment.