diff --git a/README.md b/README.md index 93e94a05..00fa0fde 100644 --- a/README.md +++ b/README.md @@ -303,8 +303,7 @@ any other filter calls which would run afterwards, as well as canceling invocation of the service method. Note: You must actually return false, not just a "falsey" value such as nil. -__After Filters__ – There is no request shortcutting since the after -filter runs after the request. Duh. +__After Filters__ – No request shortcutting. #### Filter options @@ -415,6 +414,44 @@ Many different options can be passed to the `.client` call above (such as `:timeout => 600`). See the `lib/protobuf/rpc/client.rb` and `lib/protobuf/rpc/service.rb` files for more documentation. +### Dynamic Discovery (ZMQ Only) +It is possible to setup the RPC server and client in a way that +allows servers to be dynamically discovered by the client. + +#### In the client +```ruby +ServiceDirectory.start do |config| + config.port = 53000 +end + +# If your server also runs this code, it will default to the +# given port when sending beacons and have its own service +# directory. You can prevent this code from running on the +# server if needed: +unless defined? ::Protobuf::CLI + ServiceDirectory.start do |config| + config.port = 53000 + end +end +``` + +#### Starting the server +``` +$ rpc_server --broadcast-beacons --beacon-port 53000 ... +``` + +The client will listen on the specified port for beacons broadcast +by servers. Each beacon includes a list of services provided by the +broadcasting server. The client randomly selects a server for the +desired service each time a request is made. + +__CAUTION:__ When running multiple environments on a single network, +e.g., qa and staging, be sure that each environment is setup with +a unique beacon port; otherwise, clients in one environment _will_ +make requests to servers in the other environment. + +Check out {Protobuf::ServiceDirectory} for more details. + ## 3. RPC Interop The main reason I wrote this gem was to provide a ruby implementation diff --git a/lib/protobuf.rb b/lib/protobuf.rb index 3c8edc90..24523879 100644 --- a/lib/protobuf.rb +++ b/lib/protobuf.rb @@ -15,7 +15,7 @@ module Protobuf # Default is Socket as it has no external dependencies. DEFAULT_CONNECTOR = :socket - module_function + module_function # Client Host # @@ -53,14 +53,14 @@ def self.connector_type=(type) # the Garbage Collector when handling an rpc request. # Once the request is completed, the GC is enabled again. # This optomization provides a huge boost in speed to rpc requests. - def self.gc_pause_server_request? - return @_gc_pause_server_request unless @_gc_pause_server_request.nil? - gc_pause_server_request = false - end + def self.gc_pause_server_request? + return @_gc_pause_server_request unless @_gc_pause_server_request.nil? + gc_pause_server_request = false + end - def self.gc_pause_server_request=(value) - @_gc_pause_server_request = !!value - end + def self.gc_pause_server_request=(value) + @_gc_pause_server_request = !!value + end # Print Deprecation Warnings # @@ -72,14 +72,14 @@ def self.gc_pause_server_request=(value) # ENV['PB_IGNORE_DEPRECATIONS'] to a non-empty value. # # The rpc_server option will override the ENV setting. - def self.print_deprecation_warnings? - return @_print_deprecation_warnings unless @_print_deprecation_warnings.nil? - print_deprecation_warnings = ENV.key?('PB_IGNORE_DEPRECATIONS') ? false : true - end - - def self.print_deprecation_warnings=(value) - @_print_deprecation_warnings = !!value - end + def self.print_deprecation_warnings? + return @_print_deprecation_warnings unless @_print_deprecation_warnings.nil? + print_deprecation_warnings = ENV.key?('PB_IGNORE_DEPRECATIONS') ? false : true + end + + def self.print_deprecation_warnings=(value) + @_print_deprecation_warnings = !!value + end end diff --git a/lib/protobuf/cli.rb b/lib/protobuf/cli.rb index af4e2060..6dc0b909 100644 --- a/lib/protobuf/cli.rb +++ b/lib/protobuf/cli.rb @@ -9,7 +9,7 @@ module Protobuf class CLI < ::Thor include ::Thor::Actions - attr_accessor :runner, :mode, :start_aborted + attr_accessor :runner, :mode default_task :start @@ -29,6 +29,10 @@ class CLI < ::Thor option :evented, :type => :boolean, :aliases => %w(-m), :desc => 'Evented Mode for server and client connections (uses EventMachine).' option :zmq, :type => :boolean, :aliases => %w(-z), :desc => 'ZeroMQ Socket Mode for server and client connections.' + option :beacon_address, :type => :string, :desc => 'Broadcast beacons to this address (defaul: value of ServiceDirectory.address)' + option :beacon_interval, :type => :numeric, :desc => 'Broadcast beacons every N seconds. (default: 5)' + option :beacon_port, :type => :numeric, :desc => 'Broadcast beacons to this port (default: value of ServiceDirectory.port)' + option :broadcast_beacons, :type => :boolean, :desc => 'Broadcast beacons for dynamic discovery (Currently only available with ZeroMQ).' option :debug, :type => :boolean, :default => false, :aliases => %w(-d), :desc => 'Debug Mode. Override log level to DEBUG.' option :gc_pause_request, :type => :boolean, :default => false, :desc => 'Enable/Disable GC pause during request.' option :print_deprecation_warnings, :type => :boolean, :default => nil, :desc => 'Cause use of deprecated fields to be printed or ignored.' @@ -36,26 +40,25 @@ class CLI < ::Thor option :worker_port, :type => :numeric, :default => nil, :desc => "Port for 'backend' where workers connect (defaults to port + 1)" def start(app_file) - debug_say 'Configuring the rpc_server process' - @start_aborted = false + debug_say('Configuring the rpc_server process') configure_logger configure_traps - configure_server_mode - require_protobuf! + configure_runner_mode + create_runner + configure_process_name(app_file) configure_gc configure_deprecation_warnings - run_if_no_abort { require_application!(app_file) } - run_if_no_abort { configure_process_name(app_file) } - run_if_no_abort { start_server! } + require_application(app_file) unless exit_requested? + start_server unless exit_requested? rescue => e - say_and_exit!('ERROR: RPC Server failed to start.', e) + say_and_exit('ERROR: RPC Server failed to start.', e) end desc 'version', 'Print ruby and protoc versions and exit.' def version - say "Ruby Protobuf v#{::Protobuf::VERSION}, protoc v#{::Protobuf::PROTOC_VERSION}" + say("Ruby Protobuf v#{::Protobuf::VERSION}, protoc v#{::Protobuf::PROTOC_VERSION}") end no_tasks do @@ -71,7 +74,7 @@ def configure_deprecation_warnings # If we pause during request we don't need to pause in serialization def configure_gc - debug_say 'Configuring gc' + debug_say('Configuring gc') if defined?(JRUBY_VERSION) # GC.enable/disable are noop's on Jruby @@ -83,7 +86,7 @@ def configure_gc # Setup the protobuf logger. def configure_logger - debug_say 'Configuring logger' + debug_say('Configuring logger') ::Protobuf::Logger.configure({ :file => options.log || STDOUT, :level => options.debug? ? ::Logger::DEBUG : options.level }) @@ -94,100 +97,110 @@ def configure_logger # Re-write the $0 var to have a nice process name in ps. def configure_process_name(app_file) - debug_say 'Configuring process name' - $0 = "rpc_server --#{@mode} #{options.host}:#{options.port} #{app_file}" + debug_say('Configuring process name') + $0 = "rpc_server --#{@runner_mode} #{options.host}:#{options.port} #{app_file}" end # Configure the mode of the server and the runner class. - def configure_server_mode - debug_say 'Configuring runner mode' - if options.zmq? && ! options.evented? && ! options.socket? - server_zmq! - elsif options.evented? && ! options.zmq? && ! options.socket? - server_evented! - elsif (env_server_type = ENV["PB_SERVER_TYPE"]) - case - when env_server_type =~ /zmq/i then - server_zmq! - when env_server_type =~ /socket/i then - server_socket! - when env_server_type =~ /evented/i then - server_evented! + def configure_runner_mode + debug_say('Configuring runner mode') + + if multi_mode? + say('WARNING: You have provided multiple mode options. Defaulting to socket mode.', :yellow) + @runner_mode = :socket + elsif options.zmq? + @runner_mode = :zmq + elsif options.evented? + @runner_mode = :evented + else + case server_type = ENV["PB_SERVER_TYPE"] + when nil, /socket/i + @runner_mode = :socket + when /zmq/i + @runner_mode = :zmq + when /evented/i + @runner_mode = :evented else - say "WARNING: You have provided incorrect option 'PB_SERVER_TYPE=#{env_server_type}'. Defaulting to socket mode.", :yellow - server_socket! + say "WARNING: You have provided incorrect option 'PB_SERVER_TYPE=#{server_type}'. Defaulting to socket mode.", :yellow + @runner_mode = :socket end - else - say 'WARNING: You have provided multiple mode options. Defaulting to socket mode.', :yellow if multi_mode? - server_socket! end end # Configure signal traps. # TODO add signal handling for hot-reloading the application. def configure_traps - debug_say 'Configuring traps' - [:INT, :QUIT, :TERM].each do |signal| - debug_say "Registering signal trap for #{signal}", :blue + debug_say('Configuring traps') + + exit_signals = [:INT, :TERM] + exit_signals << :QUIT unless defined?(JRUBY_VERSION) + + exit_signals.each do |signal| + debug_say("Registering trap for exit signal #{signal}", :blue) + trap(signal) do - ::Protobuf::Logger.info { 'RPC Server shutting down...' } - @start_aborted = true - @runner.stop - ::Protobuf::Logger.info { 'Shutdown complete' } + @exit_requested = true + shutdown_server end end end + # Create the runner for the configured mode + def create_runner + debug_say("Creating #{@runner_mode} runner") + @runner = case @runner_mode + when :evented + create_evented_runner + when :zmq + create_zmq_runner + when :socket + create_socket_runner + else + say_and_exit("Unknown runner mode: #{@runner_mode}") + end + end + # Say something if we're in debug mode. def debug_say(message, color = :yellow) say(message, color) if options.debug? end + def exit_requested? + !!@exit_requested + end + # Internal helper to determine if the modes are multi-set which is not valid. def multi_mode? - (options.zmq? && (options.evented? || options.socket?)) \ - && (options.evented? && (options.evented? || options.socket?)) \ - && (options.zmq? && (options.evented? || options.socket?)) \ + [ + options.zmq?, + options.evented?, + options.socket?, + ].count(true) > 1 end # Require the application file given, exiting if the file doesn't exist. - def require_application!(app_file) - debug_say 'Requiring app file' + def require_application(app_file) + debug_say('Requiring app file') require app_file rescue LoadError => e - say_and_exit!("Failed to load application file #{app_file}", e) + say_and_exit("Failed to load application file #{app_file}", e) end - # Loads protobuf in the given mode, exiting if somehow the mode is wrong. - def require_protobuf! - require "protobuf/#{@mode}.rb" - rescue LoadError => e - puts e.message, *(e.backtrace) - say_and_exit!("Failed to load protobuf runner #{@mode}", e) - end + def runner_options + # Symbolize keys + opt = options.inject({}) { |h, (k, v)| h[k.to_sym] = v; h } + + opt[:workers_only] = (!!ENV['PB_WORKERS_ONLY']) || options.workers_only - def run_if_no_abort - yield unless @start_aborted + opt end - def runner_options - { - :host => options.host, - :port => options.port, - :backlog => options.backlog, - :threshold => options.threshold, - :threads => options.threads, - :worker_port => options.worker_port || (options.port + 1), - :workers_only => !!ENV['PB_WORKERS_ONLY'] || options.workers_only - } - end - - def say_and_exit!(message, exception = nil) + def say_and_exit(message, exception = nil) message = set_color(message, :red) if ::Protobuf::Logger.file == STDOUT ::Protobuf::Logger.error { message } if exception - $stderr.puts "[#{exception.class.name}] #{exception.message}" + $stderr.puts "[#{exception.class.name}] #{exception.message}" $stderr.puts exception.backtrace.join("\n") ::Protobuf::Logger.error { "[#{exception.class.name}] #{exception.message}" } @@ -197,35 +210,42 @@ def say_and_exit!(message, exception = nil) exit(1) end - def server_evented! - @mode = :evented - @runner = ::Protobuf::Rpc::EventedRunner + def create_evented_runner + require 'protobuf/evented' + + @runner = ::Protobuf::Rpc::EventedRunner.new(runner_options) + end + + def create_socket_runner + require 'protobuf/socket' + + @runner = ::Protobuf::Rpc::SocketRunner.new(runner_options) end - def server_socket! - @mode = :socket - @runner = ::Protobuf::Rpc::SocketRunner + def create_zmq_runner + require 'protobuf/zmq' + + @runner = ::Protobuf::Rpc::ZmqRunner.new(runner_options) end - def server_zmq! - @mode = :zmq - @runner = ::Protobuf::Rpc::ZmqRunner + def shutdown_server + ::Protobuf::Logger.info { 'RPC Server shutting down...' } + @runner.try(:stop) + ::Protobuf::Rpc::ServiceDirectory.instance.stop + ::Protobuf::Logger.info { 'Shutdown complete' } end # Start the runner and log the relevant options. - def start_server! - @runner.register_signals + def start_server + debug_say('Running server') - debug_say 'Invoking server start' - @runner.run(runner_options) do - ::Protobuf::Logger.info { - "pid #{::Process.pid} -- #{@mode} RPC Server listening at #{options.host}:#{options.port}" + @runner.run do + ::Protobuf::Logger.info { + "pid #{::Process.pid} -- #{@runner_mode} RPC Server listening at #{options.host}:#{options.port}" } end end - end - end end diff --git a/lib/protobuf/rpc/connectors/base.rb b/lib/protobuf/rpc/connectors/base.rb index e00c3889..eb7a2f52 100644 --- a/lib/protobuf/rpc/connectors/base.rb +++ b/lib/protobuf/rpc/connectors/base.rb @@ -17,7 +17,7 @@ module Connectors :request => nil, # The request object sent by the client :request_type => nil, # The request type expected by the client :response_type => nil, # The response type expected by the client - :timeout => 300, # The default timeout for the request, also handled by client.rb + :timeout => 15, # The default timeout for the request, also handled by client.rb :client_host => nil # The hostname or address of this client } diff --git a/lib/protobuf/rpc/connectors/zmq.rb b/lib/protobuf/rpc/connectors/zmq.rb index 993ac3e1..ac221164 100644 --- a/lib/protobuf/rpc/connectors/zmq.rb +++ b/lib/protobuf/rpc/connectors/zmq.rb @@ -1,23 +1,38 @@ require 'protobuf/rpc/connectors/base' +require 'protobuf/rpc/service_directory' module Protobuf module Rpc module Connectors class Zmq < Base + + ## + # Included Modules + # + include Protobuf::Rpc::Connectors::Common include Protobuf::Logger::LogMethods + ## + # Class Constants + # + + CLIENT_RETRIES = (ENV['PB_CLIENT_RETRIES'] || 3) + + ## + # Instance methods + # + + # Start the request/response cycle. We implement the Lazy Pirate + # req/reply reliability pattern as laid out in the ZMQ Guide, Chapter 4. + # + # @see http://zguide.zeromq.org/php:chapter4#Client-side-Reliability-Lazy-Pirate-Pattern + # def send_request - timeout_wrap do - setup_connection - connect_to_rpc_server - post_init - read_response - end + setup_connection + poll_send_data ensure - @socket.close if @socket - @zmq_context.terminate if @zmq_context - @zmq_context = nil + close_connection end def log_signature @@ -26,45 +41,158 @@ def log_signature private + ## + # Private Instance methods + # + def close_connection - return if @error - zmq_error_check(@socket.close) - zmq_error_check(@zmq_context.terminate) - log_debug { sign_message("Connector closed") } + socket_close + zmq_context_terminate end + # Establish a request socket connection to the remote rpc_server. + # Set the socket option LINGER to 0 so that we don't wait + # for queued messages to be accepted when the socket/context are + # asked to close/terminate. + # def connect_to_rpc_server - return if @error - log_debug { sign_message("Establishing connection: #{options[:host]}:#{options[:port]}") } - @zmq_context = ::ZMQ::Context.new - @socket = @zmq_context.socket(::ZMQ::REQ) - zmq_error_check(@socket.connect("tcp://#{options[:host]}:#{options[:port]}")) - log_debug { sign_message("Connection established #{options[:host]}:#{options[:port]}") } + return if error? + + server_uri = lookup_server_uri + log_debug { sign_message("Establishing connection: #{server_uri}") } + socket.setsockopt(::ZMQ::LINGER, 0) + zmq_error_check(socket.connect(server_uri), :socket_connect) + zmq_error_check(poller.register_readable(socket), :poller_register_readable) + log_debug { sign_message("Connection established to #{server_uri}") } end - # Method to determine error state, must be used with Connector api + # Method to determine error state, must be used with Connector API. + # def error? - !!@error + !! @error + end + + # Lookup a server uri for the requested service in the service + # directory. If the service directory is not running, default + # to the host and port in the options + # + def lookup_server_uri + if service_directory.running? + listing = service_directory.lookup(service) + host, port = listing.address, listing.port if listing + end + + host, port = options[:host], options[:port] unless host && port + + "tcp://#{host}:#{port}" + end + + # Trying a number of times, attempt to get a response from the server. + # If we haven't received a legitimate response in the CLIENT_RETRIES number + # of retries, fail the request. + # + def poll_send_data + return if error? + + poll_timeout = (options[:timeout].to_f / CLIENT_RETRIES.to_f) * 1000 + + CLIENT_RETRIES.times do |n| + connect_to_rpc_server + log_debug { sign_message("Sending Request (attempt #{n + 1}, #{socket})") } + send_data + log_debug { sign_message("Request sending complete (attempt #{n + 1}, #{socket})") } + + if poller.poll(poll_timeout) == 1 + read_response + return + else + close_connection + end + end + + fail(:RPC_FAILED, "The server took longer than #{options[:timeout]} seconds to respond") end + def poller + @poller ||= ::ZMQ::Poller.new + end + + # Read the string response from the available readable. This will be + # the current @socket. Calls `parse_response` to invoke the success or + # failed callbacks, depending on the state of the communication + # and response data. + # def read_response - return if @error + return if error? + @response_data = '' - zmq_error_check(@socket.recv_string(@response_data)) + zmq_error_check(socket.recv_string(@response_data), :socket_recv_string) + parse_response end + # Send the request data to the remote rpc_server. + # def send_data - return if @error - log_debug { sign_message("Sending Request: #{@request_data}") } + return if error? + @stats.request_size = @request_data.size - zmq_error_check(@socket.send_string(@request_data)) - log_debug { sign_message("write closed") } + zmq_error_check(socket.send_string(@request_data), :socket_send_string) + end + + # The service we're attempting to connect to + # + def service + options[:service] + end + + # Alias for ::Protobuf::Rpc::ServiceDirectory.instance + def service_directory + ::Protobuf::Rpc::ServiceDirectory.instance + end + + # Setup a ZMQ request socket in the current zmq context. + # + def socket + @socket ||= zmq_context.socket(::ZMQ::REQ) end - def zmq_error_check(return_code) - raise "Last API call failed at #{caller(1)}" unless return_code >= 0 + def socket_close + if socket + log_debug { sign_message("Closing Socket") } + zmq_error_check(socket.close, :socket_close) + log_debug { sign_message("Socket closed") } + @socket = nil + end + end + + # Return the ZMQ Context to use for this process. + # If the context does not exist, create it, then register + # an exit block to ensure the context is terminated correctly. + # + def zmq_context + @zmq_context ||= ::ZMQ::Context.new + end + + # Terminate the zmq_context (if any). + # + def zmq_context_terminate + log_debug { sign_message("Terminating ZMQ Context") } + @zmq_context.try(:terminate) + @zmq_context = nil + log_debug { sign_message("ZMQ Context terminated") } end + + def zmq_error_check(return_code, source) + unless ::ZMQ::Util.resultcode_ok?(return_code || -1) + raise <<-ERROR + Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}". + + #{caller(1).join($/)} + ERROR + end + end + end end end diff --git a/lib/protobuf/rpc/dynamic_discovery.pb.rb b/lib/protobuf/rpc/dynamic_discovery.pb.rb new file mode 100644 index 00000000..51d0d79e --- /dev/null +++ b/lib/protobuf/rpc/dynamic_discovery.pb.rb @@ -0,0 +1,49 @@ +## +# This file is auto-generated. DO NOT EDIT! +# +require 'protobuf/message' + +module Protobuf + + module Rpc + + module DynamicDiscovery + + ## + # Enum Classes + # + class BeaconType < ::Protobuf::Enum + define :HEARTBEAT, 0 + define :FLATLINE, 1 + end + + + ## + # Message Classes + # + class Server < ::Protobuf::Message; end + class Beacon < ::Protobuf::Message; end + + ## + # Message Fields + # + class Server + optional ::Protobuf::Field::StringField, :uuid, 1 + optional ::Protobuf::Field::StringField, :address, 2 + optional ::Protobuf::Field::StringField, :port, 3 + optional ::Protobuf::Field::Int32Field, :ttl, 4 + repeated ::Protobuf::Field::StringField, :services, 5 + end + + class Beacon + optional ::Protobuf::Rpc::DynamicDiscovery::BeaconType, :beacon_type, 1 + optional ::Protobuf::Rpc::DynamicDiscovery::Server, :server, 2 + end + + + end + + end + +end + diff --git a/lib/protobuf/rpc/error/client_error.rb b/lib/protobuf/rpc/error/client_error.rb index 5b65ed3b..19d161af 100644 --- a/lib/protobuf/rpc/error/client_error.rb +++ b/lib/protobuf/rpc/error/client_error.rb @@ -2,30 +2,30 @@ module Protobuf module Rpc - + class InvalidRequestProto < PbError def initialize(message='Invalid request type given') super message, 'INVALID_REQUEST_PROTO' end end - + class BadResponseProto < PbError def initialize(message='Bad response type from server') super message, 'BAD_RESPONSE_PROTO' end end - + class UnkownHost < PbError def initialize(message='Unknown host or port') super message, 'UNKNOWN_HOST' end end - + class IOError < PbError def initialize(message='IO Error occurred') super message, 'IO_ERROR' end end - + end end diff --git a/lib/protobuf/rpc/error/server_error.rb b/lib/protobuf/rpc/error/server_error.rb index f0191776..a37f0a37 100644 --- a/lib/protobuf/rpc/error/server_error.rb +++ b/lib/protobuf/rpc/error/server_error.rb @@ -2,42 +2,42 @@ module Protobuf module Rpc - + class BadRequestData < PbError def initialize message='Unable to parse request' super message, 'BAD_REQUEST_DATA' end end - + class BadRequestProto < PbError def initialize message='Request is of wrong type' super message, 'BAD_REQUEST_PROTO' end end - + class ServiceNotFound < PbError def initialize message='Service class not found' super message, 'SERVICE_NOT_FOUND' end end - + class MethodNotFound < PbError def initialize message='Service method not found' super message, 'METHOD_NOT_FOUND' end end - + class RpcError < PbError def initialize message='RPC exception occurred' super message, 'RPC_ERROR' end end - + class RpcFailed < PbError def initialize message='RPC failed' super message, 'RPC_FAILED' end end - + end end diff --git a/lib/protobuf/rpc/rpc.pb.rb b/lib/protobuf/rpc/rpc.pb.rb index d7b12fca..fc3d69bb 100644 --- a/lib/protobuf/rpc/rpc.pb.rb +++ b/lib/protobuf/rpc/rpc.pb.rb @@ -4,22 +4,13 @@ require 'protobuf/message' module Protobuf + module Socketrpc + ## # Enum Classes # - class ErrorReason < ::Protobuf::Enum; end - - ## - # Message Classes - # - class Request < ::Protobuf::Message; end - class Response < ::Protobuf::Message; end - - ## - # Enum Values - # - class ErrorReason + class ErrorReason < ::Protobuf::Enum define :BAD_REQUEST_DATA, 0 define :BAD_REQUEST_PROTO, 1 define :SERVICE_NOT_FOUND, 2 @@ -32,6 +23,13 @@ class ErrorReason define :IO_ERROR, 9 end + + ## + # Message Classes + # + class Request < ::Protobuf::Message; end + class Response < ::Protobuf::Message; end + ## # Message Fields # @@ -49,5 +47,8 @@ class Response optional ::Protobuf::Socketrpc::ErrorReason, :error_reason, 4 end + end + end + diff --git a/lib/protobuf/rpc/servers/evented_runner.rb b/lib/protobuf/rpc/servers/evented_runner.rb index 0825647d..38acade0 100644 --- a/lib/protobuf/rpc/servers/evented_runner.rb +++ b/lib/protobuf/rpc/servers/evented_runner.rb @@ -2,22 +2,27 @@ module Protobuf module Rpc class EventedRunner - def self.register_signals - # Noop + def initialize(options) + @options = options end - def self.run(options) + def run # Startup and run the rpc server ::EventMachine.schedule do - ::EventMachine.start_server(options[:host], options[:port], ::Protobuf::Rpc::Evented::Server) + ::EventMachine.start_server( + @options[:host], + @options[:port], + ::Protobuf::Rpc::Evented::Server + ) end # Join or start the reactor - yield if block_given? + yield if block_given? + ::EM.reactor_running? ? ::EM.reactor_thread.join : ::EM.run end - def self.stop + def stop ::EventMachine.stop_event_loop if ::EventMachine.reactor_running? end diff --git a/lib/protobuf/rpc/servers/socket/server.rb b/lib/protobuf/rpc/servers/socket/server.rb index 1d67f4ee..50298ad3 100644 --- a/lib/protobuf/rpc/servers/socket/server.rb +++ b/lib/protobuf/rpc/servers/socket/server.rb @@ -11,12 +11,16 @@ class Server AUTO_COLLECT_TIMEOUT = 5 # seconds - def self.cleanup? + def initialize(options) + @options = options + end + + def cleanup? # every 10 connections run a cleanup routine after closing the response @threads.size > (@threshold - 1) && (@threads.size % @threshold) == 0 end - def self.cleanup_threads + def cleanup_threads log_debug { sign_message("Thread cleanup - #{@threads.size} - start") } @threads = @threads.select do |t| @@ -32,11 +36,11 @@ def self.cleanup_threads log_debug { sign_message("Thread cleanup - #{@threads.size} - complete") } end - def self.log_signature + def log_signature @_log_signature ||= "server-#{self.class.name}" end - def self.new_worker(socket) + def new_worker(socket) Thread.new(socket) do |sock| ::Protobuf::Rpc::Socket::Worker.new(sock) do |s| s.close @@ -44,12 +48,12 @@ def self.new_worker(socket) end end - def self.run(options = {}) + def run log_debug { sign_message("Run") } - host = options[:host] - port = options[:port] - backlog = options[:backlog] - @threshold = options[:threshold] + host = @options[:host] + port = @options[:port] + backlog = @options[:backlog] + @threshold = @options[:threshold] @threads = [] @server = ::TCPServer.new(host, port) @@ -62,8 +66,9 @@ def self.run(options = {}) while running? log_debug { sign_message("Waiting for connections") } + ready_cnxns = IO.select(@listen_fds, [], [], AUTO_COLLECT_TIMEOUT) rescue nil - if ready_cnxns = IO.select(@listen_fds, [], [], AUTO_COLLECT_TIMEOUT) + if ready_cnxns cnxns = ready_cnxns.first cnxns.each do |client| case @@ -96,16 +101,15 @@ def self.run(options = {}) raise #if running? end - def self.running? - @running + def running? + !!@running end - def self.stop + def stop @running = false - @server.close if @server + @server.try(:close) end end - end end end diff --git a/lib/protobuf/rpc/servers/socket_runner.rb b/lib/protobuf/rpc/servers/socket_runner.rb index d35cdcbb..bd70aad2 100644 --- a/lib/protobuf/rpc/servers/socket_runner.rb +++ b/lib/protobuf/rpc/servers/socket_runner.rb @@ -2,30 +2,33 @@ module Protobuf module Rpc class SocketRunner - def self.register_signals - # noop - end + def initialize(options) + @options = case + when options.is_a?(OpenStruct) then + options.marshal_dump + when options.is_a?(Hash) then + options + when options.respond_to?(:to_hash) then + options.to_hash + else + raise "Cannot parser Socket Server - server options" + end - def self.run(server) - server_config = case - when server.is_a?(OpenStruct) then - server.marshal_dump - when server.is_a?(Hash) then - server - when server.respond_to?(:to_hash) then - server.to_hash - else - raise "Cannot parser Socket Server - server options" - end + @server = ::Protobuf::Rpc::Socket::Server.new(@options) + end - yield if block_given? - ::Protobuf::Rpc::Socket::Server.run(server_config) + def run + yield if block_given? + @server.run end - def self.stop - ::Protobuf::Rpc::Socket::Server.stop + def running? + @server.running? end + def stop + @server.stop + end end end end diff --git a/lib/protobuf/rpc/servers/zmq/broker.rb b/lib/protobuf/rpc/servers/zmq/broker.rb index 3b005861..430d12d0 100644 --- a/lib/protobuf/rpc/servers/zmq/broker.rb +++ b/lib/protobuf/rpc/servers/zmq/broker.rb @@ -1,128 +1,138 @@ -require 'resolv' -require 'protobuf/rpc/servers/zmq/util' module Protobuf module Rpc module Zmq class Broker include ::Protobuf::Rpc::Zmq::Util - attr_reader :frontend, :backend, :poller, :context, :available_workers, :options, :expected_worker_count - - ## - # Constructor - # - def initialize(options = {}) - @available_workers = [] - @options = options.dup - @expected_worker_count = @options[:threads] - @context = ::ZMQ::Context.new - @poller = ::ZMQ::Poller.new - setup_backend - end - - ## - # Instance Methods - # - def poll - if frontend.nil? - if local_workers_have_started? - # only open the front end when the workers are done booting - log_info { "Starting frontend socket in broker, all workers ready!" } - setup_frontend - end - else - # Start checking the poller after startup - if available_workers.size > 0 - poller.register(frontend, ::ZMQ::POLLIN) if poller.size < 2 - else - poller.delete(frontend) - end - end - poller.poll(1000) - poller.readables.each do |socket| - case socket - when backend then - move_to_frontend(socket) - when frontend then - move_to_backend(socket) + def initialize(server) + @server = server + + init_zmq_context + init_backend_socket + init_frontend_socket + init_shutdown_socket + init_poller + rescue + teardown + raise + end + + def join + @thread.try(:join) + end + + def run + @idle_workers = [] + + catch(:shutdown) do + while @poller.poll > 0 + @poller.readables.each do |readable| + case readable + when @frontend_socket + process_frontend + when @backend_socket + process_backend + when @shutdown_socket + throw :shutdown + end + end end end + ensure + teardown end - def setup_backend - host = options[:host] - port = options[:worker_port] + def start + log_debug { sign_message("starting broker") } - zmq_backend = context.socket(::ZMQ::ROUTER) - zmq_error_check(zmq_backend.bind(bind_address(host, port))) + @thread = Thread.new { self.run } - @backend = zmq_backend - @poller.register(@backend, ::ZMQ::POLLIN) + self end - def setup_frontend - host = options[:host] - port = options[:port] - - zmq_frontend = context.socket(::ZMQ::ROUTER) - zmq_error_check(zmq_frontend.bind(bind_address(host, port))) - - @frontend = zmq_frontend - @poller.register(@frontend, ::ZMQ::POLLIN) + def shutdown_uri + "inproc://#{object_id}" end - def teardown - frontend.try(:close) - backend.try(:close) - context.try(:terminate) + def signal_shutdown + socket = @zmq_context.socket(ZMQ::PAIR) + zmq_error_check(socket.connect(shutdown_uri)) + zmq_error_check(socket.send_string ".") + zmq_error_check(socket.close) end private - def local_workers_have_started? - @local_workers_have_started ||= available_workers.size >= expected_worker_count - end + def init_backend_socket + @backend_socket = @zmq_context.socket(ZMQ::ROUTER) + zmq_error_check(@backend_socket.bind(@server.backend_uri)) + end - def move_to_backend(socket) - message_array = [] - zmq_error_check(socket.recv_strings(message_array)) + def init_frontend_socket + @frontend_socket = @zmq_context.socket(ZMQ::ROUTER) + zmq_error_check(@frontend_socket.bind(@server.frontend_uri)) + end - backend_message_set = [ - available_workers.shift, # Worker UUID for router - "", - message_array[0], # Client UUID for return value - "", - message_array[2] # Client Message payload (request) - ] + def init_poller + @poller = ZMQ::Poller.new + @poller.register_readable(@frontend_socket) + @poller.register_readable(@backend_socket) + @poller.register_readable(@shutdown_socket) + end - zmq_error_check(backend.send_strings(backend_message_set)) - end + def init_shutdown_socket + @shutdown_socket = @zmq_context.socket(ZMQ::PAIR) + zmq_error_check(@shutdown_socket.bind(shutdown_uri)) + end - def move_to_frontend(socket) - message_array = [] - zmq_error_check(socket.recv_strings(message_array)) + def init_zmq_context + @zmq_context = ZMQ::Context.new + end - # Push UUID of socket on the available workers queue - available_workers << message_array[0] + def process_backend + worker, ignore, *frames = read_from_backend - # messages should be [ "uuid of socket", "", "READY_MESSAGE || uuid of client socket"] - if message_array[2] == ::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE - log_info { "Worker #{available_workers.size} of #{expected_worker_count} ready!" } - else - frontend_message_set = [ - message_array[2], # client UUID - "", - message_array[4] # Reply payload - ] + @idle_workers << worker - zmq_error_check(frontend.send_strings(frontend_message_set)) - end + unless frames == [::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE] + write_to_frontend(frames) + end + end + + def process_frontend + if @idle_workers.any? + frames = read_from_frontend + write_to_backend([@idle_workers.shift, ""] + frames) end + end + + def read_from_backend + [].tap do |frames| + zmq_error_check(@backend_socket.recv_strings(frames)) + end + end - def bind_address(host, port) - "tcp://#{resolve_ip(host)}:#{port}" + def read_from_frontend + [].tap do |frames| + zmq_error_check(@frontend_socket.recv_strings(frames)) end + end + + def teardown + @frontend_socket.try(:close) + @backend_socket.try(:close) + @shutdown_socket.try(:close) + @zmq_context.try(:terminate) + end + + def write_to_backend(frames) + zmq_error_check(@backend_socket.send_strings(frames)) + end + + def write_to_frontend(frames) + zmq_error_check(@frontend_socket.send_strings(frames)) + end end end end diff --git a/lib/protobuf/rpc/servers/zmq/server.rb b/lib/protobuf/rpc/servers/zmq/server.rb index b8d11ee6..39736683 100644 --- a/lib/protobuf/rpc/servers/zmq/server.rb +++ b/lib/protobuf/rpc/servers/zmq/server.rb @@ -1,6 +1,8 @@ -require 'protobuf/rpc/servers/zmq/broker' -require 'protobuf/rpc/servers/zmq/worker' require 'protobuf/rpc/servers/zmq/util' +require 'protobuf/rpc/servers/zmq/worker' +require 'protobuf/rpc/servers/zmq/broker' +require 'protobuf/rpc/dynamic_discovery.pb' +require 'securerandom' module Protobuf module Rpc @@ -8,69 +10,287 @@ module Zmq class Server include ::Protobuf::Rpc::Zmq::Util - ## - # Class Methods - # - def self.run(options = {}) - @options = options + DEFAULT_OPTIONS = { + :beacon_interval => 5, + :broadcast_beacons => false + } - unless options[:workers_only] - log_debug { sign_message("initializing broker") } - @broker = ::Protobuf::Rpc::Zmq::Broker.new(options) - end + attr_accessor :options - local_worker_threads = options[:threads] - log_debug { sign_message("starting server workers") } + def initialize(options) + @options = DEFAULT_OPTIONS.merge(options) + @workers = [] - @running = true - local_worker_threads.times do - self.start_worker + init_zmq_context + init_beacon_socket if broadcast_beacons? + init_shutdown_socket + rescue + teardown + raise + end + + def backend_ip + frontend_ip + end + + def backend_port + options[:worker_port] || frontend_port + 1 + end + + def backend_uri + "tcp://#{backend_ip}:#{backend_port}" + end + + def beacon_interval + [options[:beacon_interval].to_i, 1].max + end + + def beacon_ip + unless @beacon_ip + unless address = options[:beacon_address] + address = ::Protobuf::Rpc::ServiceDirectory.address + end + + @beacon_ip = resolve_ip(address) end - log_debug { sign_message("server started") } - while self.running? do - if options[:workers_only] - sleep 5 - Thread.pass - else - @broker.poll + @beacon_ip + end + + def beacon_port + unless @beacon_port + unless port = options[:beacon_port] + port = ::Protobuf::Rpc::ServiceDirectory.port end + + @beacon_port = port.to_i + end + + @beacon_port + end + + def beacon_uri + "udp://#{beacon_ip}:#{beacon_port}" + end + + def broadcast_beacons? + !brokerless? && options[:broadcast_beacons] + end + + def broadcast_flatline + flatline = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new( + :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE, + :server => self.to_proto + ) + + @beacon_socket.send flatline.serialize_to_string, 0 + end + + def broadcast_heartbeat + @last_beacon = Time.now.to_i + + heartbeat = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new( + :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT, + :server => self.to_proto + ) + + @beacon_socket.send(heartbeat.serialize_to_string, 0) + + log_debug { sign_message("sent heartbeat to #{beacon_uri}") } + end + + def broadcast_heartbeat? + Time.now.to_i >= next_beacon && broadcast_beacons? + end + + def brokerless? + !!options[:workers_only] + end + + def frontend_ip + @frontend_ip ||= resolve_ip(options[:host]) + end + + def frontend_port + options[:port] + end + + def frontend_uri + "tcp://#{frontend_ip}:#{frontend_port}" + end + + def maintenance_timeout + 1_000 * (next_maintenance - Time.now.to_i) + end + + def next_maintenance + cycles = [next_reaping] + cycles << next_beacon if broadcast_beacons? + + cycles.min + end + + def minimum_timeout + 100 + end + + def next_beacon + if @last_beacon.nil? + 0 + else + @last_beacon + beacon_interval end + end + + def next_reaping + if @last_reaping.nil? + 0 + else + @last_reaping + reaping_interval + end + end + + def reap_dead_workers + @last_reaping = Time.now.to_i + + @workers.keep_if do |worker| + worker.alive? or worker.join && false + end + end + + def reap_dead_workers? + Time.now.to_i >= next_reaping + end + + def reaping_interval + 5 + end + + def run + @running = true + + start_broker unless brokerless? + start_missing_workers + wait_for_shutdown_signal + broadcast_flatline if broadcast_beacons? + stop_workers + stop_broker unless brokerless? ensure - @broker.teardown if @broker + @running = false + teardown end - def self.running? + def running? !!@running end - def self.start_worker - @threads << Thread.new(@options) { |options| - begin - ::Protobuf::Rpc::Zmq::Worker.new(options).run - rescue => e - message = "Worker Failed, spawning new worker: #{e.inspect}\n #{e.backtrace.join($/)}" - $stderr.puts message - log_error { message } + def shutdown_uri + "inproc://#{object_id}" + end - retry if ::Protobuf::Rpc::Zmq::Server.running? - end - } + def signal_shutdown + socket = @zmq_context.socket ZMQ::PAIR + zmq_error_check(socket.connect shutdown_uri) + zmq_error_check(socket.send_string ".") + zmq_error_check(socket.close) end - def self.stop - @running = false + def start_broker + @broker = ::Protobuf::Rpc::Zmq::Broker.new(self).start + end + + def start_missing_workers + missing_workers = total_workers - @workers.size + + if missing_workers > 0 + missing_workers.times { start_worker } + log_debug { sign_message("#{total_workers} workers started") } + end + end - @threads.each do |t| - t.join(5) || t.kill + def start_worker + @workers << ::Protobuf::Rpc::Zmq::Worker.new(self).start + end + + def stop + signal_shutdown + end + + def stop_broker + @broker.signal_shutdown + @broker.join + end + + def stop_workers + @workers.each(&:signal_shutdown) + Thread.pass until reap_dead_workers.empty? + end + + def teardown + @shutdown_socket.try(:close) + @beacon_socket.try(:close) + @zmq_context.try(:terminate) + @last_reaping = @last_beacon = @timeout = nil + end + + def total_workers + @total_workers ||= [@options[:threads].to_i, 1].max + end + + def timeout + if @timeout.nil? + @timeout = 0 + else + @timeout = [minimum_timeout, maintenance_timeout].max + end + end + + def to_proto + @proto ||= ::Protobuf::Rpc::DynamicDiscovery::Server.new( + :uuid => uuid, + :address => frontend_ip, + :port => frontend_port.to_s, + :ttl => (beacon_interval * 1.5).ceil, + :services => ::Protobuf::Rpc::Service.implemented_services + ) + end + + def uuid + @uuid ||= SecureRandom.uuid + end + + def wait_for_shutdown_signal + poller = ZMQ::Poller.new + poller.register_readable(@shutdown_socket) + + # If the poller returns 1, a shutdown signal has been received. + # If the poller returns -1, something went wrong. + while poller.poll(timeout) === 0 + if reap_dead_workers? + reap_dead_workers + start_missing_workers + end + + broadcast_heartbeat if broadcast_heartbeat? end end - def self.threads - @threads + private + + def init_beacon_socket + @beacon_socket = UDPSocket.new + @beacon_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_BROADCAST, true) + @beacon_socket.connect(beacon_ip, beacon_port) + end + + def init_shutdown_socket + @shutdown_socket = @zmq_context.socket(ZMQ::PAIR) + zmq_error_check(@shutdown_socket.bind shutdown_uri) end - @threads ||= [] + def init_zmq_context + @zmq_context = ZMQ::Context.new + end end end end diff --git a/lib/protobuf/rpc/servers/zmq/util.rb b/lib/protobuf/rpc/servers/zmq/util.rb index d8ff4508..a684239b 100644 --- a/lib/protobuf/rpc/servers/zmq/util.rb +++ b/lib/protobuf/rpc/servers/zmq/util.rb @@ -1,29 +1,41 @@ +require 'resolv' + module Protobuf module Rpc module Zmq - WORKER_READY_MESSAGE = "WORKER_READY" + WORKER_READY_MESSAGE = "\1" module Util include ::Protobuf::Logger::LogMethods + def self.included(base) base.extend(::Protobuf::Rpc::Zmq::Util) end - def zmq_error_check(return_code) - raise "Last API call failed with \"#{::ZMQ::Util.error_string}\"#{$/}#{$/}#{caller(1)}" unless return_code >= 0 + def zmq_error_check(return_code, source = nil) + unless ::ZMQ::Util.resultcode_ok?(return_code) + raise <<-ERROR + Last ZMQ API call #{source ? "to #{source}" : ""} failed with "#{::ZMQ::Util.error_string}". + + #{caller(1).join($/)} + ERROR + end end def log_signature - @_log_signature ||= "server-#{self.class}-#{object_id}" + unless @_log_signature + name = (self.class == Class ? self.name : self.class.name) + @_log_signature = "[server-#{name}-#{object_id}]" + end + + @_log_signature end def resolve_ip(hostname) ::Resolv.getaddress(hostname) end - end - end end end diff --git a/lib/protobuf/rpc/servers/zmq/worker.rb b/lib/protobuf/rpc/servers/zmq/worker.rb index c0c69053..b648e4fe 100644 --- a/lib/protobuf/rpc/servers/zmq/worker.rb +++ b/lib/protobuf/rpc/servers/zmq/worker.rb @@ -1,9 +1,9 @@ require 'protobuf/rpc/server' require 'protobuf/rpc/servers/zmq/util' + module Protobuf module Rpc module Zmq - class Worker include ::Protobuf::Rpc::Server include ::Protobuf::Rpc::Zmq::Util @@ -11,63 +11,126 @@ class Worker ## # Constructor # - def initialize(options = {}) - host = options[:host] - port = options[:worker_port] - - @zmq_context = ::ZMQ::Context.new - @socket = @zmq_context.socket(::ZMQ::REQ) - zmq_error_check(@socket.connect("tcp://#{resolve_ip(host)}:#{port}")) - - @poller = ::ZMQ::Poller.new - @poller.register(@socket, ::ZMQ::POLLIN) - - # Send request to broker telling it we are ready - zmq_error_check(@socket.send_string(::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE)) + def initialize(server) + @server = server + init_zmq_context + init_backend_socket + init_shutdown_socket + rescue + teardown + raise end ## # Instance Methods # - def handle_request(socket) - message_array = [] - zmq_error_check(socket.recv_strings(message_array)) + def alive? + @thread.try(:alive?) || false + end - @request_data = message_array[2] - @client_address = message_array[0] - log_debug { sign_message("handling request") } unless @request_data.nil? + def join + @thread.try(:join) + end + + def process_request + @client_address, empty, @request_data = read_from_backend + + unless @request_data.nil? + log_debug { sign_message("handling request") } + handle_client + end + end + + def read_from_backend + [].tap do |frames| + zmq_error_check(@backend_socket.recv_strings(frames)) + end end def run - while ::Protobuf::Rpc::Zmq::Server.running? do - # poll for 1_000 milliseconds then continue looping - # This lets us see whether we need to die - @poller.poll(1_000) - @poller.readables.each do |socket| - initialize_request! - handle_request(socket) - handle_client unless @request_data.nil? + poller = ::ZMQ::Poller.new + poller.register_readable(@backend_socket) + poller.register_readable(@shutdown_socket) + + # Send request to broker telling it we are ready + write_to_backend([::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE]) + + catch(:shutdown) do + while poller.poll > 0 + poller.readables.each do |readable| + case readable + when @backend_socket + initialize_request! + process_request + when @shutdown_socket + throw :shutdown + end + end end end ensure - @socket.close - @zmq_context.terminate + teardown end def send_data - response_data = @response.to_s # to_s is aliases as serialize_to_string in Message + data = @response.serialize_to_string - response_message_set = [ - @client_address, # client uuid address - "", - response_data - ] + @stats.response_size = data.size - @stats.response_size = response_data.size - zmq_error_check(@socket.send_strings(response_message_set)) + write_to_backend([@client_address, "", data]) + end + + def shutdown_uri + "inproc://#{object_id}" + end + + def signal_shutdown + socket = @zmq_context.socket ZMQ::PAIR + zmq_error_check(socket.connect(shutdown_uri)) + zmq_error_check(socket.send_string(".")) + zmq_error_check(socket.close) + end + + def start + @thread = Thread.new do + begin + self.run + rescue => e + message = "Worker failed: #{e.inspect}\n #{e.backtrace.join($/)}" + $stderr.puts(message) + log_error { message } + end + end + + self + end + + def teardown + @backend_socket.try(:close) + @shutdown_socket.try(:close) + @zmq_context.try(:terminate) end - end + def write_to_backend(frames) + zmq_error_check(@backend_socket.send_strings(frames)) + end + + private + + def init_zmq_context + @zmq_context = ZMQ::Context.new + end + + def init_backend_socket + @backend_socket = @zmq_context.socket(ZMQ::REQ) + zmq_error_check(@backend_socket.connect(@server.backend_uri)) + end + + def init_shutdown_socket + @shutdown_socket = @zmq_context.socket(ZMQ::PAIR) + zmq_error_check(@shutdown_socket.bind(shutdown_uri)) + end + end end end end diff --git a/lib/protobuf/rpc/servers/zmq_runner.rb b/lib/protobuf/rpc/servers/zmq_runner.rb index b2f4db20..656c634a 100644 --- a/lib/protobuf/rpc/servers/zmq_runner.rb +++ b/lib/protobuf/rpc/servers/zmq_runner.rb @@ -1,35 +1,46 @@ +require 'ostruct' + module Protobuf module Rpc class ZmqRunner include ::Protobuf::Logger::LogMethods - def self.register_signals - trap(:TTIN) do - log_info { "TTIN received: Starting new worker" } - ::Protobuf::Rpc::Zmq::Server.start_worker - log_info { "Worker count : #{::Protobuf::Rpc::Zmq::Server.threads.size}" } - end - end + def initialize(options) + @options = case + when options.is_a?(OpenStruct) then + options.marshal_dump + when options.respond_to?(:to_hash) then + options.to_hash + else + raise "Cannot parser Zmq Server - server options" + end - def self.run(server) - server_config = case - when server.is_a?(OpenStruct) then - server.marshal_dump - when server.respond_to?(:to_hash) then - server.to_hash - else - raise "Cannot parser Zmq Server - server options" - end + end - yield if block_given? + def run + @server = ::Protobuf::Rpc::Zmq::Server.new(@options) + register_signals + yield if block_given? + @server.run + end - ::Protobuf::Rpc::Zmq::Server.run(server_config) + def running? + @server.try :running? end - def self.stop - ::Protobuf::Rpc::Zmq::Server.stop + def stop + @server.try :stop end + private + + def register_signals + trap(:TTIN) do + log_info { "TTIN received: Starting new worker" } + @server.start_worker + log_info { "Worker count : #{::Protobuf::Rpc::Zmq::Server.threads.size}" } + end + end end end end diff --git a/lib/protobuf/rpc/service.rb b/lib/protobuf/rpc/service.rb index 86320a4c..c708de58 100644 --- a/lib/protobuf/rpc/service.rb +++ b/lib/protobuf/rpc/service.rb @@ -96,6 +96,18 @@ def self.rpc_method?(name) rpcs.key?(name) end + # An array of defined service classes that contain implementation + # code + def self.implemented_services + classes = (self.subclasses || []).select do |subclass| + subclass.rpcs.any? do |(name, method)| + subclass.method_defined? name + end + end + + classes.map &:name + end + ## # Instance Methods # diff --git a/lib/protobuf/rpc/service_directory.rb b/lib/protobuf/rpc/service_directory.rb new file mode 100644 index 00000000..7ef89197 --- /dev/null +++ b/lib/protobuf/rpc/service_directory.rb @@ -0,0 +1,206 @@ +require 'delegate' +require 'singleton' +require 'socket' +require 'thread' +require 'timeout' + +require 'protobuf/rpc/dynamic_discovery.pb' + +module Protobuf + module Rpc + class ServiceDirectory + include ::Singleton + include ::Protobuf::Logger::LogMethods + + DEFAULT_ADDRESS = "255.255.255.255" + DEFAULT_PORT = 53000 + DEFAULT_TIMEOUT = 1 + + class Listing < Delegator + attr_reader :expires_at + + def initialize(server) + @server = server + @expires_at = Time.now.to_i + ttl + end + + def current? + !expired? + end + + def expired? + Time.now.to_i >= @expires_at + end + + def ttl + [super.to_i, 3].max + end + + def __getobj__ + @server + end + end + + # Class Methods + # + class << self + attr_writer :address, :port + end + + def self.address + @address ||= DEFAULT_ADDRESS + end + + def self.port + @port ||= DEFAULT_PORT + end + + def self.start + yield(self) if block_given? + self.instance.start + end + + def self.stop + self.instance.stop + end + + # Instance Methods + # + def initialize + @listings = {} + @mutex = Mutex.new + end + + def add_listing_for(server) + if server && server.uuid + + log_debug do + action = @listings[server.uuid] ? "Updating" : "Adding"; + sign_message("#{action} server: #{server.inspect}") + end + + @mutex.synchronize do + @listings[server.uuid] = Listing.new(server) + end + + else + log_info { sign_message("Cannot add server without uuid: #{server.inspect}") } + end + end + + def lookup(service) + @mutex.synchronize do + listings = @listings.values.select do |listing| + listing.services.any? do |listed_service| + listing.current? && listed_service == service.to_s + end + end + + listings.sample + end + end + + def remove_expired_listings + @mutex.synchronize do + @listings.delete_if do |uuid, listing| + listing.expired? + end + end + end + + def remove_listing_for(server) + if server && server.uuid + log_debug { sign_message("Removing server: #{server.inspect}") } + + @mutex.synchronize do + @listings.delete(server.uuid) + end + + else + log_info { sign_message("Cannot remove server without uuid: #{server.inspect}") } + end + end + + def restart + stop + start + end + + def running? + !!@thread.try(:alive?) + end + + def start + unless running? + init_socket + log_info { sign_message("listening to udp://#{self.class.address}:#{self.class.port}") } + @thread = Thread.new { self.send(:run) } + end + + self + end + + def stop + log_info { sign_message("Stopping directory") } + + @mutex.synchronize do + @thread.try(:kill) + @thread = nil + @listings = {} + end + + @socket.try(:close) + @socket = nil + end + + def wait_for(service, timeout = DEFAULT_TIMEOUT) + log_debug { sign_message("waiting for #{service}") } + Timeout.timeout(timeout) do + sleep(timeout / 10.0) until listing = lookup(service) + listing + end + rescue + log_info { sign_message("no listing found for #{service}") } + nil + end + + private + + def init_socket + @socket = UDPSocket.new + @socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, true) + @socket.bind(self.class.address, self.class.port.to_i) + end + + def process_beacon(beacon) + case beacon.beacon_type + when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT + add_listing_for(beacon.server) + when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE + remove_listing_for(beacon.server) + end + end + + def run + loop do + process_beacon(wait_for_beacon) + remove_expired_listings + end + rescue => e + log_debug { sign_message("error: (#{e.class}) #{e.message}") } + retry + end + + def wait_for_beacon + data, addr = @socket.recvfrom(2048) + + ::Protobuf::Rpc::DynamicDiscovery::Beacon.new.tap do |beacon| + beacon.parse_from_string(data) rescue nil + + # Favor the address captured by the socket + beacon.try(:server).try(:address=, addr[3]) + end + end + end + end +end diff --git a/proto/dynamic_discovery.proto b/proto/dynamic_discovery.proto new file mode 100644 index 00000000..dcecca5f --- /dev/null +++ b/proto/dynamic_discovery.proto @@ -0,0 +1,44 @@ +// Copyright (c) 2013 MoneyDesktop, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Authors: Devin Christensen +// +// Protobufs needed for dynamic discovery zmq server and client. + +package protobuf.rpc.dynamicDiscovery; + +enum BeaconType { + HEARTBEAT = 0; + FLATLINE = 1; +} + +message Server { + optional string uuid = 1; + optional string address = 2; + optional string port = 3; + optional int32 ttl = 4; + repeated string services = 5; +} + +message Beacon { + optional BeaconType beacon_type = 1; + optional Server server = 2; +} + diff --git a/spec/benchmark/tasks.rb b/spec/benchmark/tasks.rb index e573b2ca..75267453 100644 --- a/spec/benchmark/tasks.rb +++ b/spec/benchmark/tasks.rb @@ -32,9 +32,7 @@ def em_client_em_server(number_tests, test_length, global_bench = nil) EM.stop if EM.reactor_running? EventMachine.fiber_run do - StubServer.new do |server| - client = ::Test::ResourceService.client - + StubServer.new(:server => Protobuf::Rpc::Evented::Server) do |server| benchmark_wrapper(global_bench) do |bench| bench.report("ES / EC") do (1..number_tests.to_i).each { client.find(:name => "Test Name" * test_length.to_i, :active => true) } diff --git a/spec/functional/socket_server_spec.rb b/spec/functional/socket_server_spec.rb index 02ce00aa..2af158d1 100644 --- a/spec/functional/socket_server_spec.rb +++ b/spec/functional/socket_server_spec.rb @@ -4,14 +4,14 @@ describe 'Functional Socket Client' do before(:all) do load "protobuf/socket.rb" - Thread.abort_on_exception = true - server = OpenStruct.new(:host => "127.0.0.1", :port => 9399, :backlog => 100, :threshold => 100) - @server_thread = Thread.new(server) { |s| Protobuf::Rpc::SocketRunner.run(s) } - Thread.pass until Protobuf::Rpc::Socket::Server.running? + @options = OpenStruct.new(:host => "127.0.0.1", :port => 9399, :backlog => 100, :threshold => 100) + @runner = ::Protobuf::Rpc::SocketRunner.new(@options) + @server_thread = Thread.new(@runner) { |runner| runner.run } + Thread.pass until @runner.running? end after(:all) do - Protobuf::Rpc::SocketRunner.stop + @runner.stop @server_thread.join end @@ -56,3 +56,4 @@ error.message.should =~ /expected request.*ResourceFindRequest.*Resource instead/i end end + diff --git a/spec/functional/zmq_server_spec.rb b/spec/functional/zmq_server_spec.rb index ef02c16c..e45345cb 100644 --- a/spec/functional/zmq_server_spec.rb +++ b/spec/functional/zmq_server_spec.rb @@ -1,24 +1,29 @@ require 'spec_helper' require 'spec/support/test/resource_service' +require 'protobuf/rpc/service_directory' describe 'Functional ZMQ Client' do - before(:all) do - load "protobuf/zmq.rb" - Thread.abort_on_exception = true - server = OpenStruct.new(:host => "127.0.0.1", - :port => 9399, - :worker_port => 9400, - :backlog => 100, - :threshold => 100, - :threads => 5) + let(:options) {{ + :host => "127.0.0.1", + :port => 9399, + :worker_port => 9400, + :backlog => 100, + :threshold => 100, + :threads => 5 + }} + + let(:server) { ::Protobuf::Rpc::Zmq::Server.new(options) } + let(:server_thread) { Thread.new(server) { |server| server.run } } - @server_thread = Thread.new(server) { |s| Protobuf::Rpc::ZmqRunner.run(s) } - Thread.pass until Protobuf::Rpc::Zmq::Server.running? + before do + load "protobuf/zmq.rb" + server_thread.abort_on_exception = true + Thread.pass until server.running? end - after(:all) do - ::Protobuf::Rpc::Zmq::Server.stop - @server_thread.try(:join) + after do + server.stop + server_thread.join end it 'runs fine when required fields are set' do @@ -38,27 +43,51 @@ }.to_not raise_error end - it 'calls the on_failure callback when a message is malformed' do - error = nil - request = ::Test::ResourceFindRequest.new(:active => true) - client = ::Test::ResourceService.client + context 'when a message is malformed' do + it 'calls the on_failure callback' do + error = nil + StubServer.new(:server => Protobuf::Rpc::Zmq::Server) do + request = ::Test::ResourceFindRequest.new(:active => true) + client = ::Test::ResourceService.client + + client.find(request) do |c| + c.on_success { raise "shouldn't pass" } + c.on_failure {|e| error = e } + end + end + error.message.should match(/name.*required/) + end + end + + context 'when the request type is wrong' do + it 'calls the on_failure callback' do + error = nil + StubServer.new(:server => Protobuf::Rpc::Zmq::Server) do + request = ::Test::Resource.new(:name => 'Test Name') + client = ::Test::ResourceService.client - client.find(request) do |c| - c.on_success { raise "shouldn't pass"} - c.on_failure {|e| error = e} + client.find(request) do |c| + c.on_success { raise "shouldn't pass" } + c.on_failure {|e| error = e} + end + end + error.message.should match(/expected request.*ResourceFindRequest.*Resource instead/i) end - error.message.should =~ /name.*required/ end - it 'calls the on_failure callback when the request type is wrong' do - error = nil - request = ::Test::Resource.new(:name => 'Test Name') - client = ::Test::ResourceService.client + context 'when the server takes too long to respond' do + it 'responds with a timeout error' do + error = nil + StubServer.new(:server => Protobuf::Rpc::Zmq::Server) do + client = ::Test::ResourceService.client(:timeout => 1) - client.find(request) do |c| - c.on_success { raise "shouldn't pass"} - c.on_failure {|e| error = e} + client.find_with_sleep(:sleep => 2) do |c| + c.on_success { raise "shouldn't pass" } + c.on_failure { |e| error = e } + end + end + error.message.should match(/The server took longer than 1 seconds to respond/i) end - error.message.should =~ /expected request.*ResourceFindRequest.*Resource instead/i end + end diff --git a/spec/lib/protobuf/cli_spec.rb b/spec/lib/protobuf/cli_spec.rb index 2bfef524..dd6d404d 100644 --- a/spec/lib/protobuf/cli_spec.rb +++ b/spec/lib/protobuf/cli_spec.rb @@ -7,10 +7,14 @@ File.expand_path('../../../support/test_app_file.rb', __FILE__) end - before do - ::Protobuf::Rpc::SocketRunner.stub(:run) - ::Protobuf::Rpc::ZmqRunner.stub(:run) - ::Protobuf::Rpc::EventedRunner.stub(:run) + let(:sock_runner) { double "SocketRunner", run: nil, register_signals: nil } + let(:zmq_runner) { double "ZmqRunner", run: nil, register_signals: nil } + let(:evented_runner) { double "EventedRunner", run: nil, register_signals: nil } + + before(:each) do + ::Protobuf::Rpc::SocketRunner.stub(:new) { sock_runner } + ::Protobuf::Rpc::ZmqRunner.stub(:new) { zmq_runner } + ::Protobuf::Rpc::EventedRunner.stub(:new) { evented_runner } end describe '#start' do @@ -22,9 +26,9 @@ let(:test_args) { [ '--host=123.123.123.123' ] } it 'sends the host option to the runner' do - ::Protobuf::Rpc::SocketRunner.should_receive(:run) do |options| + ::Protobuf::Rpc::SocketRunner.should_receive(:new) do |options| options[:host].should eq '123.123.123.123' - end + end.and_return(sock_runner) described_class.start(args) end end @@ -33,9 +37,9 @@ let(:test_args) { [ '--port=12345' ] } it 'sends the port option to the runner' do - ::Protobuf::Rpc::SocketRunner.should_receive(:run) do |options| + ::Protobuf::Rpc::SocketRunner.should_receive(:new) do |options| options[:port].should eq 12345 - end + end.and_return(sock_runner) described_class.start(args) end end @@ -44,9 +48,9 @@ let(:test_args) { [ '--threads=500' ] } it 'sends the threads option to the runner' do - ::Protobuf::Rpc::SocketRunner.should_receive(:run) do |options| + ::Protobuf::Rpc::SocketRunner.should_receive(:new) do |options| options[:threads].should eq 500 - end + end.and_return(sock_runner) described_class.start(args) end end @@ -55,9 +59,9 @@ let(:test_args) { [ '--backlog=500' ] } it 'sends the backlog option to the runner' do - ::Protobuf::Rpc::SocketRunner.should_receive(:run) do |options| + ::Protobuf::Rpc::SocketRunner.should_receive(:new) do |options| options[:backlog].should eq 500 - end + end.and_return(sock_runner) described_class.start(args) end end @@ -66,9 +70,9 @@ let(:test_args) { [ '--threshold=500' ] } it 'sends the backlog option to the runner' do - ::Protobuf::Rpc::SocketRunner.should_receive(:run) do |options| + ::Protobuf::Rpc::SocketRunner.should_receive(:new) do |options| options[:threshold].should eq 500 - end + end.and_return(sock_runner) described_class.start(args) end end @@ -92,16 +96,18 @@ it 'sets both request and serialization pausing to false' do described_class.start(args) - ::Protobuf.gc_pause_server_request?.should be_false + ::Protobuf.should_not be_gc_pause_server_request end end - context 'request pausing' do - let(:test_args) { [ '--gc_pause_request' ] } + unless defined?(JRUBY_VERSION) + context 'request pausing' do + let(:test_args) { [ '--gc_pause_request' ] } - it 'sets the configuration option to GC pause server request' do - described_class.start(args) - ::Protobuf.gc_pause_server_request?.should be_true + it 'sets the configuration option to GC pause server request' do + described_class.start(args) + ::Protobuf.should be_gc_pause_server_request + end end end end @@ -156,18 +162,18 @@ let(:runner) { ::Protobuf::Rpc::SocketRunner } before do - ::Protobuf::Rpc::EventedRunner.should_not_receive(:run) - ::Protobuf::Rpc::ZmqRunner.should_not_receive(:run) + ::Protobuf::Rpc::EventedRunner.should_not_receive(:new) + ::Protobuf::Rpc::ZmqRunner.should_not_receive(:new) end it 'is activated by the --socket switch' do - runner.should_receive(:run) + runner.should_receive(:new) described_class.start(args) end it 'is activated by PB_SERVER_TYPE=Socket ENV variable' do ENV['PB_SERVER_TYPE'] = "Socket" - runner.should_receive(:run) + runner.should_receive(:new).and_return(sock_runner) described_class.start(args) ENV.delete('PB_SERVER_TYPE') end @@ -183,18 +189,18 @@ let(:runner) { ::Protobuf::Rpc::EventedRunner } before do - ::Protobuf::Rpc::SocketRunner.should_not_receive(:run) - ::Protobuf::Rpc::ZmqRunner.should_not_receive(:run) + ::Protobuf::Rpc::SocketRunner.should_not_receive(:new) + ::Protobuf::Rpc::ZmqRunner.should_not_receive(:new) end it 'is activated by the --evented switch' do - runner.should_receive(:run) + runner.should_receive(:new).and_return(evented_runner) described_class.start(args) end it 'is activated by PB_SERVER_TYPE=Evented ENV variable' do ENV['PB_SERVER_TYPE'] = "Evented" - runner.should_receive(:run) + runner.should_receive(:new).and_return(evented_runner) described_class.start(args) ENV.delete('PB_SERVER_TYPE') end @@ -204,29 +210,29 @@ ::Protobuf.connector_type.should == :evented end end - + context 'zmq workers only' do let(:test_args) { [ '--workers_only', '--zmq' ] } let(:runner) { ::Protobuf::Rpc::ZmqRunner } before do - ::Protobuf::Rpc::SocketRunner.should_not_receive(:run) - ::Protobuf::Rpc::EventedRunner.should_not_receive(:run) + ::Protobuf::Rpc::SocketRunner.should_not_receive(:new) + ::Protobuf::Rpc::EventedRunner.should_not_receive(:new) end it 'is activated by the --workers_only switch' do - runner.should_receive(:run) do |options| + runner.should_receive(:new) do |options| options[:workers_only].should be_true - end + end.and_return(zmq_runner) described_class.start(args) end it 'is activated by PB_WORKERS_ONLY=1 ENV variable' do ENV['PB_WORKERS_ONLY'] = "1" - runner.should_receive(:run) do |options| + runner.should_receive(:new) do |options| options[:workers_only].should be_true - end + end.and_return(zmq_runner) described_class.start(args) ENV.delete('PB_WORKERS_ONLY') @@ -238,14 +244,14 @@ let(:runner) { ::Protobuf::Rpc::ZmqRunner } before do - ::Protobuf::Rpc::SocketRunner.should_not_receive(:run) - ::Protobuf::Rpc::EventedRunner.should_not_receive(:run) + ::Protobuf::Rpc::SocketRunner.should_not_receive(:new) + ::Protobuf::Rpc::EventedRunner.should_not_receive(:new) end it 'is activated by the --worker_port switch' do - runner.should_receive(:run) do |options| + runner.should_receive(:new) do |options| options[:worker_port].should eq(1234) - end + end.and_return(zmq_runner) described_class.start(args) end @@ -256,18 +262,18 @@ let(:runner) { ::Protobuf::Rpc::ZmqRunner } before do - ::Protobuf::Rpc::SocketRunner.should_not_receive(:run) - ::Protobuf::Rpc::EventedRunner.should_not_receive(:run) + ::Protobuf::Rpc::SocketRunner.should_not_receive(:new) + ::Protobuf::Rpc::EventedRunner.should_not_receive(:new) end it 'is activated by the --zmq switch' do - runner.should_receive(:run) + runner.should_receive(:new) described_class.start(args) end it 'is activated by PB_SERVER_TYPE=Zmq ENV variable' do ENV['PB_SERVER_TYPE'] = "Zmq" - runner.should_receive(:run) + runner.should_receive(:new) described_class.start(args) ENV.delete('PB_SERVER_TYPE') end diff --git a/spec/lib/protobuf/enum_spec.rb b/spec/lib/protobuf/enum_spec.rb index daa16dc0..afacc4ee 100644 --- a/spec/lib/protobuf/enum_spec.rb +++ b/spec/lib/protobuf/enum_spec.rb @@ -6,7 +6,7 @@ before(:all) do Test::EnumTestType.define(:MINUS_ONE, -1) - Test::EnumTestType.define(name, tag) + Test::EnumTestType.define(:THREE, 3) end describe '.define' do diff --git a/spec/lib/protobuf/rpc/client_spec.rb b/spec/lib/protobuf/rpc/client_spec.rb index 5a89d41d..c38af459 100644 --- a/spec/lib/protobuf/rpc/client_spec.rb +++ b/spec/lib/protobuf/rpc/client_spec.rb @@ -6,7 +6,7 @@ load 'protobuf/evented.rb' end - context "when using fiber based calls" do + context "when using fiber based calls", :skip => true do it "waits for response" do EventMachine.fiber_run do StubServer.new(:delay => 1) do |server| diff --git a/spec/lib/protobuf/rpc/connectors/zmq_spec.rb b/spec/lib/protobuf/rpc/connectors/zmq_spec.rb index eeca9fc8..2fd0afb9 100644 --- a/spec/lib/protobuf/rpc/connectors/zmq_spec.rb +++ b/spec/lib/protobuf/rpc/connectors/zmq_spec.rb @@ -2,6 +2,16 @@ require 'protobuf/zmq' describe ::Protobuf::Rpc::Connectors::Zmq do + subject { described_class.new(options) } + + let(:options) {{ + :service => "Test::Service", + :method => "find", + :timeout => 3, + :host => "127.0.0.1", + :port => "9400" + }} + let(:socket_mock) do sm = mock(::ZMQ::Socket) sm.stub(:connect).and_return(0) @@ -14,9 +24,41 @@ zc end - before(:each) do + before do ::ZMQ::Context.stub(:new).and_return(zmq_context_mock) end + describe "#lookup_server_uri" do + let(:service_directory) { double('ServiceDirectory', :running? => running? ) } + let(:listing) { double('Listing', :address => '127.0.0.2', :port => 9399) } + let(:running?) { true } + + before do + subject.stub(:service_directory) { service_directory } + end + + context "when the service directory is running" do + it "searches the service directory" do + service_directory.should_receive(:lookup).and_return(listing) + subject.send(:lookup_server_uri).should eq "tcp://127.0.0.2:9399" + end + + it "defaults to the options" do + service_directory.should_receive(:lookup) { nil } + subject.send(:lookup_server_uri).should eq "tcp://127.0.0.1:9400" + end + end + + context "when the service directory is not running" do + let(:running?) { false } + + it "does not search the directory" do + service_directory.should_not_receive(:lookup) + subject.send(:lookup_server_uri).should eq "tcp://127.0.0.1:9400" + end + end + + end + pending end diff --git a/spec/lib/protobuf/rpc/servers/evented_server_spec.rb b/spec/lib/protobuf/rpc/servers/evented_server_spec.rb index 9ed8b546..558044ce 100644 --- a/spec/lib/protobuf/rpc/servers/evented_server_spec.rb +++ b/spec/lib/protobuf/rpc/servers/evented_server_spec.rb @@ -12,7 +12,8 @@ it "Runner provides a stop method" do runner_class = described_class.to_s.gsub(/Evented::Server/, "EventedRunner").constantize - runner_class.respond_to?(:stop).should be_true + runner = runner_class.new({}) + runner.respond_to?(:stop).should be_true end end diff --git a/spec/lib/protobuf/rpc/servers/socket_server_spec.rb b/spec/lib/protobuf/rpc/servers/socket_server_spec.rb index 7f7e1d5e..84de8c13 100644 --- a/spec/lib/protobuf/rpc/servers/socket_server_spec.rb +++ b/spec/lib/protobuf/rpc/servers/socket_server_spec.rb @@ -12,23 +12,24 @@ before(:all) do load 'protobuf/socket.rb' Thread.abort_on_exception = true - server = OpenStruct.new(:server => "127.0.0.1", :port => 9399, :backlog => 100, :threshold => 100) - @server_thread = Thread.new(server) { |s| Protobuf::Rpc::SocketRunner.run(s) } - Thread.pass until Protobuf::Rpc::Socket::Server.running? + @options = OpenStruct.new(:host => "127.0.0.1", :port => 9399, :backlog => 100, :threshold => 100) + @runner = ::Protobuf::Rpc::SocketRunner.new(@options) + @server = @runner.instance_variable_get(:@server) + @server_thread = Thread.new(@runner) { |runner| runner.run } + Thread.pass until @server.running? end after(:all) do - Protobuf::Rpc::SocketRunner.stop + @server.stop @server_thread.join end it "Runner provides a stop method" do - runner_class = described_class.to_s.gsub(/Evented::Server/, "EventedRunner").constantize - runner_class.respond_to?(:stop).should be_true + @runner.should respond_to(:stop) end it "provides a stop method" do - described_class.respond_to?(:stop).should be_true + @server.should respond_to(:stop) end it "provides a Runner class" do @@ -37,7 +38,7 @@ end it "signals the Server is running" do - described_class.running?.should be_true + @server.should be_running end end diff --git a/spec/lib/protobuf/rpc/servers/zmq/broker_spec.rb b/spec/lib/protobuf/rpc/servers/zmq/broker_spec.rb deleted file mode 100644 index 6aef8433..00000000 --- a/spec/lib/protobuf/rpc/servers/zmq/broker_spec.rb +++ /dev/null @@ -1,31 +0,0 @@ -require 'spec_helper' - -describe ::Protobuf::Rpc::Zmq::Broker do - before(:each) do - load 'protobuf/zmq.rb' - end - - after(:each) do - subject.teardown - end - - subject do - described_class.new({ :host => '127.0.0.1', :port => 9399, :worker_port => 9400 }) - end - - it 'sets up a context' do - subject.context.should be_a(::ZMQ::Context) - end - - it 'sets up a backend socket' do - subject.backend.should be_a(::ZMQ::Socket) - end - - it 'sets up a polling object' do - subject.poller.should be_a(::ZMQ::Poller) - end - - describe '#poll' do - # no unit tests for this method - end -end diff --git a/spec/lib/protobuf/rpc/servers/zmq/server_spec.rb b/spec/lib/protobuf/rpc/servers/zmq/server_spec.rb index 33937a30..80ab752e 100644 --- a/spec/lib/protobuf/rpc/servers/zmq/server_spec.rb +++ b/spec/lib/protobuf/rpc/servers/zmq/server_spec.rb @@ -2,40 +2,45 @@ require 'protobuf/rpc/servers/zmq/server' describe Protobuf::Rpc::Zmq::Server do - before(:each) do + subject { described_class.new(options) } + + let(:options) {{ + :host => '127.0.0.1', + :port => 9399, + :worker_port => 9400, + :workers_only => true + }} + + before do load 'protobuf/zmq.rb' end + after do + subject.teardown + end + describe '.running?' do it 'returns true if running' do - described_class.instance_variable_set(:@running, true) - described_class.running?.should be_true + subject.instance_variable_set(:@running, true) + subject.running?.should be_true end it 'returns false if not running' do - described_class.instance_variable_set(:@running, false) - described_class.running?.should be_false + subject.instance_variable_set(:@running, false) + subject.running?.should be_false end end describe '.stop' do - # keep threads instance variable from retaining any thread mocks we've - # created (breaks tests down the line, otherwise) - after(:each) do - described_class.instance_variable_set(:@threads, []) - end - - it 'lets all threads stop' do - thread_mock = double(Thread) - thread_mock.should_receive(:join).and_return(thread_mock) - described_class.instance_variable_set(:@threads, [thread_mock]) - described_class.stop + it 'signals shutdown' do + subject.should_receive(:signal_shutdown) + subject.stop end it 'sets running to false' do - described_class.instance_variable_set(:@threads, []) - described_class.stop - described_class.instance_variable_get(:@running).should be_false + subject.instance_variable_set(:@workers, []) + subject.stop + subject.instance_variable_get(:@running).should be_false end end end diff --git a/spec/lib/protobuf/rpc/servers/zmq/util_spec.rb b/spec/lib/protobuf/rpc/servers/zmq/util_spec.rb index 90a2ef7c..dad46a46 100644 --- a/spec/lib/protobuf/rpc/servers/zmq/util_spec.rb +++ b/spec/lib/protobuf/rpc/servers/zmq/util_spec.rb @@ -13,26 +13,26 @@ class UtilTest describe '#zmq_error_check' do it 'raises when the error code is less than 0' do expect { - subject.zmq_error_check(-1) - }.to raise_error + subject.zmq_error_check(-1, :test) + }.to raise_error(/test/) end it 'retrieves the error string from ZeroMQ' do ZMQ::Util.stub(:error_string).and_return('an error from zmq') expect { - subject.zmq_error_check(-1) + subject.zmq_error_check(-1, :test) }.to raise_error(RuntimeError, /an error from zmq/i) end it 'does nothing if the error code is > 0' do expect { - subject.zmq_error_check(1) + subject.zmq_error_check(1, :test) }.to_not raise_error end it 'does nothing if the error code is == 0' do expect { - subject.zmq_error_check(0) + subject.zmq_error_check(0, :test) }.to_not raise_error end end diff --git a/spec/lib/protobuf/rpc/service_directory_spec.rb b/spec/lib/protobuf/rpc/service_directory_spec.rb new file mode 100644 index 00000000..e4c858d8 --- /dev/null +++ b/spec/lib/protobuf/rpc/service_directory_spec.rb @@ -0,0 +1,183 @@ +require 'spec_helper' + +require 'protobuf/rpc/service_directory' + +describe ::Protobuf::Rpc::ServiceDirectory do + let(:instance) { ::Protobuf::Rpc::ServiceDirectory.instance } + + def listings + instance.instance_variable_get(:@listings) + end + + def duration + start = Time.now.to_f + yield + Time.now.to_f - start + end + + after do + instance.stop + end + + it "is a singleton" do + instance.should be_a_kind_of(Singleton) + end + + describe "#lookup" do + let(:server) { double('server', :uuid => '123', + :services => ['Known::Service'], + :address => "0.0.0.0", + :port => 9999, + :ttl => 15) } + let(:listing) { ::Protobuf::Rpc::ServiceDirectory::Listing.new(server) } + + it "returns a listing for the given service" do + instance.add_listing_for(server) + instance.lookup("Known::Service").should eq listing + end + + it "returns random listings" do + instance.add_listing_for double(:uuid => 1, :ttl => 5, :services => ["Test"]) + instance.add_listing_for double(:uuid => 2, :ttl => 5, :services => ["Test"]) + + uuids = 100.times.map { instance.lookup("Test").uuid } + uuids.count(1).should be_within(25).of(50) + uuids.count(2).should be_within(25).of(50) + end + + it "does not return expired listings" do + instance.instance_variable_set(:@listings, { + '1' => double(:current? => false, :services => ["Test"]), + }) + + instance.lookup("Test").should be_nil + end + end + + describe "#remove_expired_listings" do + before do + instance.instance_variable_set(:@listings, { + '1' => double(:expired? => true), + '2' => double(:expired? => true), + '3' => double(:expired? => false), + }) + end + + it "removes expired listings" do + expect { + instance.remove_expired_listings + }.to change(listings, :size).from(3).to(1) + listings.keys.should eq ['3'] + end + end + + describe "#start" do + it "creates a thread" do + Thread.should_receive(:new) + instance.start + end + + it "initializes the socket" do + instance.should_receive :init_socket + instance.start + end + + it "calls #run" do + instance.should_receive(:run) + instance.start + sleep 0.01 + end + + it "changes the running state" do + expect { + instance.start + }.to change(instance, :running?).from(false).to(true) + end + end + + describe "#wait_for" do + it "returns a listing for the given service" do + server = double(:uuid => 1, :ttl => 5, :services => ["Test"]) + instance.add_listing_for server + instance.lookup("Test").should eq server + end + + it "depends on #lookup" do + instance.stub(:lookup).with("Hayoob!") { "yup" } + instance.wait_for("Hayoob!").should eq "yup" + end + + it "waits for the service to appear" do + server = double(:uuid => 1, :ttl => 5, :services => ["Test"]) + + t = Thread.new do + sleep 0.5 + instance.add_listing_for server + end + + duration { instance.wait_for("Test") }.should be_within(0.01).of(0.5) + t.join + end + + it "returns nil if the service doesn't appear withint the timeout period" do + server = double(:uuid => 1, :ttl => 5, :services => ["Test"]) + + t = Thread.new do + sleep 0.5 + instance.add_listing_for server + end + + instance.wait_for("Test", 0.1).should be_nil + t.join + end + end + + describe "a running service directory" do + let(:socket) { UDPSocket.new } + + def thread + instance.instance_variable_get(:@thread) + end + + before do + described_class.start do |config| + config.address = "127.0.0.1" + config.port = 33333 + end + + socket.connect described_class.address, described_class.port + end + + context "receiving a heartbeat" do + let(:server) { ::Protobuf::Rpc::DynamicDiscovery::Server.new(:uuid => 'heartbeat', :address => '127.0.0.1') } + let(:beacon) { ::Protobuf::Rpc::DynamicDiscovery::Beacon.new( + :server => server, + :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT + )} + let(:payload) { beacon.serialize_to_string } + + it "adds a listing" do + instance.should_receive(:add_listing_for).with(server) + instance.should_receive(:remove_expired_listings) + socket.send(payload, 0) + sleep 0.01 + end + end + + context "receiving a flatline" do + let(:server) { ::Protobuf::Rpc::DynamicDiscovery::Server.new(:uuid => 'flatline', :address => '127.0.0.1') } + let(:beacon) { ::Protobuf::Rpc::DynamicDiscovery::Beacon.new( + :server => server, + :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE + )} + let(:payload) { beacon.serialize_to_string } + + it "removes a listing" do + instance.should_receive(:remove_listing_for).with(server) + instance.should_receive(:remove_expired_listings) + socket.send(payload, 0) + sleep 0.01 + end + end + end +end diff --git a/spec/support/server.rb b/spec/support/server.rb index d961a0c9..7fa49c2d 100644 --- a/spec/support/server.rb +++ b/spec/support/server.rb @@ -53,8 +53,10 @@ def initialize(options = {}) def start case - when @options.server == Protobuf::Rpc::Evented::Server then start_em_server - when @options.server == Protobuf::Rpc::Zmq::Server then start_zmq_server + when @options.server == Protobuf::Rpc::Evented::Server + start_em_server + when @options.server == Protobuf::Rpc::Zmq::Server + start_zmq_server else start_socket_server end @@ -67,18 +69,25 @@ def start end def start_em_server - @server_handle = EventMachine.start_server(@options.host, @options.port, StubProtobufServerFactory.build(@options.delay)) + @server_handle = EventMachine.start_server( + @options.host, + @options.port, + StubProtobufServerFactory.build(@options.delay) + ) end def start_socket_server - @sock_server = Thread.new(@options) { |opt| Protobuf::Rpc::SocketRunner.run(opt) } - @sock_server.abort_on_exception = true # Set for testing purposes - Thread.pass until Protobuf::Rpc::Socket::Server.running? + @sock_runner = ::Protobuf::Rpc::SocketRunner.new(opt) + @sock_thread = Thread.new(@sock_runner) { |runner| runner.run } + @sock_thread.abort_on_exception = true # Set for testing purposes + Thread.pass until @sock_runner.running? end def start_zmq_server - @zmq_server = Thread.new(@options) { |opt| Protobuf::Rpc::ZmqRunner.run(opt) } - Thread.pass until Protobuf::Rpc::Zmq::Server.running? + @zmq_runnger = ::Protobuf::Rpc::ZmqRunner.new(opt) + @zmq_thread = Thread.new(@zmq_runner) { |runner| runner.run } + @zmq_thread.abort_on_exception = true # Set for testing purposes + Thread.pass until @zmq_runner.running? end def stop @@ -86,11 +95,11 @@ def stop when @options.server == Protobuf::Rpc::Evented::Server then EventMachine.stop_server(@server_handle) if @server_handle when @options.server == Protobuf::Rpc::Zmq::Server then - Protobuf::Rpc::ZmqRunner.stop - @zmq_server.join if @zmq_server + @zmq_runner.try :stop + @zmq_thread.join if @zmq_thread else - Protobuf::Rpc::SocketRunner.stop - @sock_server.join if @sock_server + @sock_runner.stop + @sock_thread.join if @sock_thread end @running = false diff --git a/spec/support/test/resource.pb.rb b/spec/support/test/resource.pb.rb index 1582721f..52514c74 100644 --- a/spec/support/test/resource.pb.rb +++ b/spec/support/test/resource.pb.rb @@ -21,6 +21,7 @@ class StatusType < ::Protobuf::Enum # Message Classes # class ResourceFindRequest < ::Protobuf::Message; end + class ResourceSleepRequest < ::Protobuf::Message; end class Resource < ::Protobuf::Message; end class Searchable < ::Protobuf::Message class SearchType < ::Protobuf::Enum @@ -44,6 +45,10 @@ class ResourceFindRequest optional ::Protobuf::Field::BoolField, :active, 2 end + class ResourceSleepRequest + optional ::Protobuf::Field::Int32Field, :sleep, 1 + end + class Resource required ::Protobuf::Field::StringField, :name, 1 optional ::Protobuf::Field::Int64Field, :date_created, 2 @@ -93,6 +98,7 @@ class NestedLevelOne # class ResourceService < ::Protobuf::Rpc::Service rpc :find, ::Test::ResourceFindRequest, ::Test::Resource + rpc :find_with_sleep, ::Test::ResourceSleepRequest, ::Test::Resource end end diff --git a/spec/support/test/resource.proto b/spec/support/test/resource.proto index c7b1a420..4f62d5bc 100644 --- a/spec/support/test/resource.proto +++ b/spec/support/test/resource.proto @@ -12,6 +12,10 @@ message ResourceFindRequest { optional bool active = 2; } +message ResourceSleepRequest { + optional int32 sleep = 1; +} + message Resource { extensions 100 to max; @@ -78,4 +82,5 @@ extend Nested.NestedLevelOne { service ResourceService { rpc Find (ResourceFindRequest) returns (Resource); + rpc FindWithSleep (ResourceSleepRequest) returns (Resource); } diff --git a/spec/support/test/resource_service.rb b/spec/support/test/resource_service.rb index f1dab010..33072ba4 100644 --- a/spec/support/test/resource_service.rb +++ b/spec/support/test/resource_service.rb @@ -10,5 +10,12 @@ def find response.status = request.active ? 1 : 0 end + # request -> Test::ResourceSleepRequest + # response -> Test::Resource + def find_with_sleep + sleep (request.sleep || 1) + response.name = 'Request should have timed out' + end + end end