Skip to content

Commit

Permalink
Merge pull request #217 from localshred/abrandoned/ping_port_health_c…
Browse files Browse the repository at this point in the history
…heck

Abrandoned/ping port health check
  • Loading branch information
abrandoned committed Sep 13, 2014
2 parents c4f1bad + 6df180f commit 7aafd57
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 23 deletions.
25 changes: 24 additions & 1 deletion lib/protobuf/rpc/connectors/zmq.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'thread_safe'
require 'protobuf/rpc/connectors/base'
require 'protobuf/rpc/service_directory'

Expand Down Expand Up @@ -30,6 +31,10 @@ def self.zmq_context
@zmq_contexts[Process.pid]
end

def self.ping_port_responses
@ping_port_responses ||= ::ThreadSafe::Cache.new
end

##
# Instance methods
#
Expand Down Expand Up @@ -105,7 +110,7 @@ def error?
# to the host and port in the options
#
def lookup_server_uri
5.times do
15.times do
service_directory.all_listings_for(service).each do |listing|
host = listing.try(:address)
port = listing.try(:port)
Expand All @@ -115,13 +120,31 @@ def lookup_server_uri
host = options[:host]
port = options[:port]
return "tcp://#{host}:#{port}" if host_alive?(host)

sleep (5.0/100.0)
end

raise "Host not found for service #{service}"
end

def host_alive?(host)
return true unless ping_port_enabled?

if (last_response = self.class.ping_port_responses[host])
if (Time.now.to_i - last_response[:at]) <= 2
return last_response[:ping_port_open]
end
end

ping_port_open = ping_port_open?(host)
self.class.ping_port_responses[host] = {
:at => Time.now.to_i,
:ping_port_open => ping_port_open
}
ping_port_open
end

def ping_port_open?(host)
socket = TCPSocket.new(host, ping_port.to_i)
socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, [1,0].pack('ii'))
Expand Down
29 changes: 10 additions & 19 deletions lib/protobuf/rpc/servers/zmq/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,17 @@ def run
@idle_workers = []

loop do
unless local_queue.empty?
process_local_queue
end

process_local_queue
rc = @poller.poll(500)

# The server was shutdown and no requests are pending
break if rc == 0 && !running?

# Something went wrong
break if rc == -1

@poller.readables.each do |readable|
case readable
when @frontend_socket
process_frontend
when @backend_socket
process_backend
end
end
process_backend if @poller.readables.include?(@backend_socket)
process_local_queue # Fair ordering so queued requests get in before new requests
process_frontend if @poller.readables.include?(@frontend_socket)
end
ensure
teardown
Expand Down Expand Up @@ -107,15 +98,15 @@ def process_frontend

if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE
if local_queue.size < local_queue_max_size
write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE])
write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE])
else
write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE])
write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE])
end
else
if @idle_workers.empty?
local_queue.push([address, "", message ] + frames)
local_queue << [address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, message ].concat(frames)
else
write_to_backend([@idle_workers.shift, ""] + [address, "", message ] + frames)
write_to_backend([@idle_workers.shift, ::Protobuf::Rpc::Zmq::EMPTY_STRING].concat([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, message ]).concat(frames))
end
end
end
Expand All @@ -124,7 +115,7 @@ def process_local_queue
return if local_queue.empty?
return if @idle_workers.empty?

write_to_backend([@idle_workers.shift, ""] + local_queue.pop)
write_to_backend([@idle_workers.shift, ::Protobuf::Rpc::Zmq::EMPTY_STRING].concat(local_queue.shift))
process_local_queue
end

Expand All @@ -143,7 +134,7 @@ def read_from_frontend
def teardown
@frontend_socket.try(:close)
@backend_socket.try(:close)
@zmq_context.try(:terminate)
@zmq_context.try(:terminate) unless inproc?
end

def write_to_backend(frames)
Expand Down
1 change: 1 addition & 0 deletions lib/protobuf/rpc/servers/zmq/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Zmq
CHECK_AVAILABLE_MESSAGE = "\3"
NO_WORKERS_AVAILABLE = "\4"
WORKERS_AVAILABLE = "\5"
EMPTY_STRING = ""

module Util
include ::Protobuf::Logging
Expand Down
4 changes: 2 additions & 2 deletions lib/protobuf/rpc/servers/zmq/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def process_request

gc_pause do
encoded_response = handle_request(data)
write_to_backend([client_address, "", encoded_response])
write_to_backend([client_address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, encoded_response])
end
end

Expand Down Expand Up @@ -93,7 +93,7 @@ def read_from_backend

def teardown
@backend_socket.try(:close)
@zmq_context.try(:terminate)
@zmq_context.try(:terminate) unless inproc?
end

def write_to_backend(frames)
Expand Down
2 changes: 1 addition & 1 deletion lib/protobuf/rpc/service_directory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def reset
end

def run
sweep_interval = 1 # sweep expired listings every 1 second
sweep_interval = 5 # sweep expired listings every 5 seconds
next_sweep = Time.now.to_i + sweep_interval

loop do
Expand Down
1 change: 1 addition & 0 deletions protobuf.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require "protobuf/version"
s.add_dependency 'activesupport', '>= 3.2'
s.add_dependency 'middleware'
s.add_dependency 'thor'
s.add_dependency 'thread_safe'

s.add_development_dependency 'ffi-rzmq'
s.add_development_dependency 'pry-nav'
Expand Down

0 comments on commit 7aafd57

Please sign in to comment.