Skip to content

Commit

Permalink
Merge branch 'master' into 2-8-stable
Browse files Browse the repository at this point in the history
  • Loading branch information
localshred committed Jun 20, 2013
2 parents 2822455 + ab8d49f commit 80fc96e
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 209 deletions.
11 changes: 10 additions & 1 deletion lib/protobuf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
require 'pp'
require 'stringio'
require 'active_support/core_ext/object/blank'
require 'active_support/core_ext/object/try'
require 'active_support/version'

if ActiveSupport::VERSION::MAJOR > 2
require 'active_support/core_ext/object/try'
else
require 'active_support/core_ext/module/delegation'
require 'active_support/core_ext/kernel/reporting'
require 'active_support/core_ext/try'
end

require 'active_support/inflector'
require 'active_support/json'

Expand Down
6 changes: 1 addition & 5 deletions lib/protobuf/field/float_field.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ module Protobuf
module Field
class FloatField < BaseField
def self.default; 0.0; end
def self.max; 1.0/0; end
def self.min; -1.0/0; end
def max; 1.0/0; end
def min; -1.0/0; end

def wire_type
WireType::FIXED32
Expand All @@ -22,7 +18,7 @@ def encode(value)
end

def acceptable?(val)
(val > min || val < max) rescue false
val.respond_to?(:to_f)
end
end
end
Expand Down
21 changes: 9 additions & 12 deletions lib/protobuf/rpc/connectors/zmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ class Zmq < Base

CLIENT_RETRIES = (ENV['PB_CLIENT_RETRIES'] || 3)

##
# Class Methods
#
def self.zmq_context
@zmq_context ||= ZMQ::Context.new
end

##
# Instance methods
#
Expand Down Expand Up @@ -47,7 +54,6 @@ def log_signature

def close_connection
socket_close
zmq_context_terminate
end

# Establish a request socket connection to the remote rpc_server.
Expand Down Expand Up @@ -106,7 +112,7 @@ def poll_send_data
read_response
return
else
close_connection
socket_close
end
end

Expand Down Expand Up @@ -171,16 +177,7 @@ def socket_close
# an exit block to ensure the context is terminated correctly.
#
def zmq_context
@zmq_context ||= ::ZMQ::Context.new
end

# Terminate the zmq_context (if any).
#
def zmq_context_terminate
log_debug { sign_message("Terminating ZMQ Context") }
@zmq_context.try(:terminate)
@zmq_context = nil
log_debug { sign_message("ZMQ Context terminated") }
self.class.zmq_context
end

def zmq_error_check(return_code, source)
Expand Down
64 changes: 17 additions & 47 deletions lib/protobuf/rpc/servers/zmq/broker.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

module Protobuf
module Rpc
module Zmq
Expand All @@ -11,55 +10,33 @@ def initialize(server)
init_zmq_context
init_backend_socket
init_frontend_socket
init_shutdown_socket
init_poller
rescue
teardown
raise
end

def join
@thread.try(:join)
end

def run
@idle_workers = []

catch(:shutdown) do
while @poller.poll > 0
@poller.readables.each do |readable|
case readable
when @frontend_socket
process_frontend
when @backend_socket
process_backend
when @shutdown_socket
throw :shutdown
end
while running?
break if @poller.poll(500) < 0

@poller.readables.each do |readable|
case readable
when @frontend_socket
process_frontend
when @backend_socket
process_backend
end
end
end
ensure
teardown
end

def start
log_debug { sign_message("starting broker") }

@thread = Thread.new { self.run }

self
end

def shutdown_uri
"inproc://#{object_id}"
end

def signal_shutdown
socket = @zmq_context.socket(ZMQ::PAIR)
zmq_error_check(socket.connect(shutdown_uri))
zmq_error_check(socket.send_string ".")
zmq_error_check(socket.close)
def running?
@server.running? || @server.workers.any?
end

private
Expand All @@ -78,12 +55,6 @@ def init_poller
@poller = ZMQ::Poller.new
@poller.register_readable(@frontend_socket)
@poller.register_readable(@backend_socket)
@poller.register_readable(@shutdown_socket)
end

def init_shutdown_socket
@shutdown_socket = @zmq_context.socket(ZMQ::PAIR)
zmq_error_check(@shutdown_socket.bind(shutdown_uri))
end

def init_zmq_context
Expand All @@ -108,21 +79,20 @@ def process_frontend
end

def read_from_backend
[].tap do |frames|
zmq_error_check(@backend_socket.recv_strings(frames))
end
frames = []
zmq_error_check(@backend_socket.recv_strings(frames))
frames
end

def read_from_frontend
[].tap do |frames|
zmq_error_check(@frontend_socket.recv_strings(frames))
end
frames = []
zmq_error_check(@frontend_socket.recv_strings(frames))
frames
end

def teardown
@frontend_socket.try(:close)
@backend_socket.try(:close)
@shutdown_socket.try(:close)
@zmq_context.try(:terminate)
end

Expand Down
81 changes: 36 additions & 45 deletions lib/protobuf/rpc/servers/zmq/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@ class Server
:broadcast_beacons => false
}

attr_accessor :options
attr_accessor :options, :workers

def initialize(options)
@options = DEFAULT_OPTIONS.merge(options)
@workers = []

init_zmq_context
init_beacon_socket if broadcast_beacons?
init_shutdown_socket
init_shutdown_pipe
rescue
teardown
raise
end

def add_worker
@total_workers = total_workers + 1
end

def backend_ip
frontend_ip
end
Expand Down Expand Up @@ -120,7 +124,7 @@ def frontend_uri
end

def maintenance_timeout
1_000 * (next_maintenance - Time.now.to_i)
next_maintenance - Time.now.to_i
end

def next_maintenance
Expand All @@ -131,7 +135,7 @@ def next_maintenance
end

def minimum_timeout
100
0.1
end

def next_beacon
Expand Down Expand Up @@ -173,8 +177,8 @@ def run
start_missing_workers
wait_for_shutdown_signal
broadcast_flatline if broadcast_beacons?
stop_workers
stop_broker unless brokerless?
Thread.pass until reap_dead_workers.empty?
@broker.join unless brokerless?
ensure
@running = false
teardown
Expand All @@ -184,21 +188,6 @@ def running?
!!@running
end

def shutdown_uri
"inproc://#{object_id}"
end

def signal_shutdown
socket = @zmq_context.socket ZMQ::PAIR
zmq_error_check(socket.connect shutdown_uri)
zmq_error_check(socket.send_string ".")
zmq_error_check(socket.close)
end

def start_broker
@broker = ::Protobuf::Rpc::Zmq::Broker.new(self).start
end

def start_missing_workers
missing_workers = total_workers - @workers.size

Expand All @@ -208,26 +197,14 @@ def start_missing_workers
end
end

def start_worker
@workers << ::Protobuf::Rpc::Zmq::Worker.new(self).start
end

def stop
signal_shutdown
end

def stop_broker
@broker.signal_shutdown
@broker.join
end

def stop_workers
@workers.each(&:signal_shutdown)
Thread.pass until reap_dead_workers.empty?
@running = false
@shutdown_w.write('.')
end

def teardown
@shutdown_socket.try(:close)
@shutdown_r.try(:close)
@shutdown_w.try(:close)
@beacon_socket.try(:close)
@zmq_context.try(:terminate)
@last_reaping = @last_beacon = @timeout = nil
Expand Down Expand Up @@ -260,12 +237,9 @@ def uuid
end

def wait_for_shutdown_signal
poller = ZMQ::Poller.new
poller.register_readable(@shutdown_socket)
loop do
break if IO.select([@shutdown_r], nil, nil, timeout)

# If the poller returns 1, a shutdown signal has been received.
# If the poller returns -1, something went wrong.
while poller.poll(timeout) === 0
if reap_dead_workers?
reap_dead_workers
start_missing_workers
Expand All @@ -283,14 +257,31 @@ def init_beacon_socket
@beacon_socket.connect(beacon_ip, beacon_port)
end

def init_shutdown_socket
@shutdown_socket = @zmq_context.socket(ZMQ::PAIR)
zmq_error_check(@shutdown_socket.bind shutdown_uri)
def init_shutdown_pipe
@shutdown_r, @shutdown_w = IO.pipe
end

def init_zmq_context
@zmq_context = ZMQ::Context.new
end

def start_broker
@broker = Thread.new(self) do |server|
::Protobuf::Rpc::Zmq::Broker.new(server).run
end
end

def start_worker
@workers << Thread.new(self) do |server|
begin
::Protobuf::Rpc::Zmq::Worker.new(server).run
rescue => e
message = "Worker failed: #{e.inspect}\n #{e.backtrace.join($/)}"
$stderr.puts(message)
log_error { message }
end
end
end
end
end
end
Expand Down
Loading

0 comments on commit 80fc96e

Please sign in to comment.