diff --git a/lib/protobuf/rpc/connectors/zmq.rb b/lib/protobuf/rpc/connectors/zmq.rb index a8499b1c..f097c450 100644 --- a/lib/protobuf/rpc/connectors/zmq.rb +++ b/lib/protobuf/rpc/connectors/zmq.rb @@ -1,3 +1,4 @@ +require 'thread_safe' require 'protobuf/rpc/connectors/base' require 'protobuf/rpc/service_directory' @@ -30,6 +31,10 @@ def self.zmq_context @zmq_contexts[Process.pid] end + def self.ping_port_responses + @ping_port_responses ||= ::ThreadSafe::Cache.new + end + ## # Instance methods # @@ -105,7 +110,7 @@ def error? # to the host and port in the options # def lookup_server_uri - 5.times do + 15.times do service_directory.all_listings_for(service).each do |listing| host = listing.try(:address) port = listing.try(:port) @@ -115,6 +120,8 @@ def lookup_server_uri host = options[:host] port = options[:port] return "tcp://#{host}:#{port}" if host_alive?(host) + + sleep (5.0/100.0) end raise "Host not found for service #{service}" @@ -122,6 +129,22 @@ def lookup_server_uri def host_alive?(host) return true unless ping_port_enabled? + + if (last_response = self.class.ping_port_responses[host]) + if (Time.now.to_i - last_response[:at]) <= 2 + return last_response[:ping_port_open] + end + end + + ping_port_open = ping_port_open?(host) + self.class.ping_port_responses[host] = { + :at => Time.now.to_i, + :ping_port_open => ping_port_open + } + ping_port_open + end + + def ping_port_open?(host) socket = TCPSocket.new(host, ping_port.to_i) socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1) socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, [1,0].pack('ii')) diff --git a/lib/protobuf/rpc/servers/zmq/broker.rb b/lib/protobuf/rpc/servers/zmq/broker.rb index 93ef6e5e..5e86148c 100644 --- a/lib/protobuf/rpc/servers/zmq/broker.rb +++ b/lib/protobuf/rpc/servers/zmq/broker.rb @@ -25,26 +25,17 @@ def run @idle_workers = [] loop do - unless local_queue.empty? - process_local_queue - end - + process_local_queue rc = @poller.poll(500) # The server was shutdown and no requests are pending break if rc == 0 && !running? - # Something went wrong break if rc == -1 - @poller.readables.each do |readable| - case readable - when @frontend_socket - process_frontend - when @backend_socket - process_backend - end - end + process_backend if @poller.readables.include?(@backend_socket) + process_local_queue # Fair ordering so queued requests get in before new requests + process_frontend if @poller.readables.include?(@frontend_socket) end ensure teardown @@ -107,15 +98,15 @@ def process_frontend if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE if local_queue.size < local_queue_max_size - write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE]) + write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE]) else - write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE]) + write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE]) end else if @idle_workers.empty? - local_queue.push([address, "", message ] + frames) + local_queue << [address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, message ].concat(frames) else - write_to_backend([@idle_workers.shift, ""] + [address, "", message ] + frames) + write_to_backend([@idle_workers.shift, ::Protobuf::Rpc::Zmq::EMPTY_STRING].concat([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, message ]).concat(frames)) end end end @@ -124,7 +115,7 @@ def process_local_queue return if local_queue.empty? return if @idle_workers.empty? - write_to_backend([@idle_workers.shift, ""] + local_queue.pop) + write_to_backend([@idle_workers.shift, ::Protobuf::Rpc::Zmq::EMPTY_STRING].concat(local_queue.shift)) process_local_queue end @@ -143,7 +134,7 @@ def read_from_frontend def teardown @frontend_socket.try(:close) @backend_socket.try(:close) - @zmq_context.try(:terminate) + @zmq_context.try(:terminate) unless inproc? end def write_to_backend(frames) diff --git a/lib/protobuf/rpc/servers/zmq/util.rb b/lib/protobuf/rpc/servers/zmq/util.rb index af7dcde5..f16bbcfd 100644 --- a/lib/protobuf/rpc/servers/zmq/util.rb +++ b/lib/protobuf/rpc/servers/zmq/util.rb @@ -9,6 +9,7 @@ module Zmq CHECK_AVAILABLE_MESSAGE = "\3" NO_WORKERS_AVAILABLE = "\4" WORKERS_AVAILABLE = "\5" + EMPTY_STRING = "" module Util include ::Protobuf::Logging diff --git a/lib/protobuf/rpc/servers/zmq/worker.rb b/lib/protobuf/rpc/servers/zmq/worker.rb index 6b3d5582..e0c28357 100644 --- a/lib/protobuf/rpc/servers/zmq/worker.rb +++ b/lib/protobuf/rpc/servers/zmq/worker.rb @@ -31,7 +31,7 @@ def process_request gc_pause do encoded_response = handle_request(data) - write_to_backend([client_address, "", encoded_response]) + write_to_backend([client_address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, encoded_response]) end end @@ -93,7 +93,7 @@ def read_from_backend def teardown @backend_socket.try(:close) - @zmq_context.try(:terminate) + @zmq_context.try(:terminate) unless inproc? end def write_to_backend(frames) diff --git a/lib/protobuf/rpc/service_directory.rb b/lib/protobuf/rpc/service_directory.rb index 868e53be..cddc7a53 100644 --- a/lib/protobuf/rpc/service_directory.rb +++ b/lib/protobuf/rpc/service_directory.rb @@ -219,7 +219,7 @@ def reset end def run - sweep_interval = 1 # sweep expired listings every 1 second + sweep_interval = 5 # sweep expired listings every 5 seconds next_sweep = Time.now.to_i + sweep_interval loop do diff --git a/protobuf.gemspec b/protobuf.gemspec index 0071c58a..2e3b254f 100644 --- a/protobuf.gemspec +++ b/protobuf.gemspec @@ -22,6 +22,7 @@ require "protobuf/version" s.add_dependency 'activesupport', '>= 3.2' s.add_dependency 'middleware' s.add_dependency 'thor' + s.add_dependency 'thread_safe' s.add_development_dependency 'ffi-rzmq' s.add_development_dependency 'pry-nav'