Skip to content

Commit

Permalink
Merge pull request #5 from bdewitt/master
Browse files Browse the repository at this point in the history
New server/client API introduced
  • Loading branch information
localshred committed Jan 2, 2012
2 parents f598e5c + 9c15f47 commit 48f8daa
Show file tree
Hide file tree
Showing 39 changed files with 1,227 additions and 611 deletions.
6 changes: 5 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ PATH
specs:
protobuf (1.0.1)
eventmachine (~> 0.12.10)
json_pure

GEM
remote: http://rubygems.org/
specs:
diff-lcs (1.1.2)
diff-lcs (1.1.3)
eventmachine (0.12.10)
eventmachine (0.12.10-java)
json_pure (1.6.3)
rake (0.8.7)
rspec (2.7.0)
rspec-core (~> 2.7.0)
Expand All @@ -20,6 +23,7 @@ GEM
rspec-mocks (2.7.0)

PLATFORMS
java
ruby

DEPENDENCIES
Expand Down
71 changes: 38 additions & 33 deletions bin/rpc_server
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,21 @@
require 'optparse'
require 'ostruct'
require 'logger'
require 'protobuf'
require 'protobuf/rpc/server'

[:INT, :QUIT, :TERM].each do |sig|
trap(sig) do
EventMachine.stop_event_loop if EventMachine.reactor_running?
Protobuf::Logger.info 'Shutdown complete'
$stdout.puts 'Shutdown complete'
end
end
require 'protobuf/version'
require 'protobuf/rpc/servers/evented_runner'
require 'protobuf/rpc/servers/socket_runner'

# Default options
server = OpenStruct.new({
:app => nil,
:env => ENV['RPC_SERVER_ENV'] || 'development',
:host => '127.0.0.1',
:port => 9595,
:backlog => 100,
:threshold => 100,
:log => File.expand_path('./protobuf.log'),
:level => ::Logger::INFO,
:runner => Protobuf::Rpc::EventedRunner,
:debug => false
})

Expand All @@ -47,6 +43,23 @@ parser = OptionParser.new do |opts|
opts.on("-v N", "--level=N", Integer, "Log level to use, 0-5 (see http://www.ruby-doc.org/stdlib/libdoc/logger/rdoc/)") do |v|
server.level = v.to_i
end

opts.on("-b N", "--backlog=N", Integer, "Backlog for listening socket when using Socket Server") do |v|
server.backlog = v.to_i
end

opts.on("-t N", "--threshold=N", Integer, "Multi-threaded Socket Server cleanup threshold") do |v|
server.threshold = v.to_i
end

opts.on("-c", "--client_socket", "Socket Mode for client connections (No EventMachine)") do |v|
Protobuf::ConnectorType = "Socket"
end

opts.on("-s", "--socket", "Socket Server Mode (No EventMachine)") do |v|
Protobuf::ServerType = "SocketServer"
server.runner = Protobuf::Rpc::SocketRunner
end

opts.on("-d", "--[no-]debug", "Debug Mode. Override log level to DEBUG.") do |v|
server.debug = v
Expand All @@ -67,49 +80,41 @@ parser = OptionParser.new do |opts|
end
end

parser.parse!
require 'protobuf'

[:INT, :QUIT, :TERM].each do |sig|
trap(sig) do
server.runner.stop
end
end

begin
parser.parse!

if ARGV.empty?
raise 'You must specify an app file to use.'
puts 'You must specify an app file to use.'
puts parser.help
exit
else
server.app = ARGV.pop
raise 'Invalid app file specified (%s).' % server.app unless File.exists?(server.app)
end

# Configure the Protobuf::Logger
Protobuf::Logger.configure :file => server.log, :level => server.debug ? ::Logger::DEBUG : server.level

# Output the server opts
Protobuf::Logger.debug 'Debugging options:'
Protobuf::Logger.debug server.inspect

# Ensure errors thrown within EM are caught and logged appropriately
EventMachine.error_handler do |error|
if error.message == 'no acceptor'
raise 'Failed binding to %s:%d (%s)' % [server.host, server.port, error.message]
else
Protobuf::Logger.error error.message
Protobuf::Logger.error error.backtrace.join("\n")
end
end

# Set the name of the process
$0 = 'rpc_server %s:%d %s' % [server.host, server.port, server.app]

# Require the given application file
require server.app

# Startup and run the rpc server
EM.schedule do
EventMachine.start_server(server.host, server.port, Protobuf::Rpc::Server) && \
Protobuf::Logger.info('RPC Server listening at %s:%d in %s' % [server.host, server.port, server.env])
end

# Join or start the reactor
EM.reactor_running? ? EM.reactor_thread.join : EM.run
server.runner.run(server)
rescue
msg = 'ERROR: RPC Server failed to start. %s' % $!.message
msg = 'ERROR: RPC Server failed to start. %s' % $!.inspect
$stderr.puts msg, *($!.backtrace)
Protobuf::Logger.error msg
Protobuf::Logger.error $!.backtrace.join("\n")
Expand Down
25 changes: 22 additions & 3 deletions lib/protobuf.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
require 'eventmachine'
require 'protobuf/ext/eventmachine'
require 'logger'
require 'socket'
require 'pp'
require 'stringio'

module Protobuf
end

# When setting up a client
unless defined?(Protobuf::ConnectorType) && Protobuf::ConnectorType == "Socket"
Protobuf::ConnectorType = "EventMachine"
require 'eventmachine'
require 'protobuf/ext/eventmachine'
require 'protobuf/rpc/connectors/eventmachine'
end

# For running the rpc_server
unless defined?(Protobuf::ServerType) && Protobuf::ServerType == "SocketServer"
Protobuf::ServerType = "EventedServer"
require 'eventmachine'
require 'protobuf/ext/eventmachine'
require 'protobuf/rpc/servers/evented_server'
end

require 'protobuf/rpc/client'
require 'protobuf/rpc/server'
require 'protobuf/rpc/connectors/socket'
require 'protobuf/rpc/service'
require 'protobuf/rpc/servers/socket_server'
14 changes: 6 additions & 8 deletions lib/protobuf/common/logger.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
require 'logger'

module Protobuf
class Logger < ::Logger

class << self
attr_accessor :file, :level

# One-line file/level configuration
def configure options
def configure(options)
self.file = options[:file] if options[:file]
self.level = options[:level] if options[:level]
end

def configured?
! instance.nil?
end

# Use to reset the instance
def reset_device!
self.file = self.level = @__instance = nil
Expand Down Expand Up @@ -58,7 +52,11 @@ module LogMethods
Protobuf::Logger.__send__(m, *params, &block)
end
end

def self.included(base)
base.extend(LogMethods)
end
end

end
end
end
17 changes: 8 additions & 9 deletions lib/protobuf/compiler/visitors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,14 @@ def required_message_from_proto(proto_file)
end

def create_files(filename, out_dir, file_create)
begin
$: << File.expand_path(out_dir)
Class.new.class_eval(to_s) # check the message
$:.delete File.expand_path(out_dir)
rescue LoadError
puts "Error creating file #{filename}"
puts $!.message
exit 1
end
$: << File.expand_path(out_dir)
Class.new.class_eval(to_s) # check the message
$:.delete File.expand_path(out_dir)
rescue LoadError
puts "Error creating file #{filename}"
puts $!.message
exit 1
else

file = File.basename(filename)
message_module = Util.module_to_path(package.map{|p| p.to_s.capitalize}.join('::'))
Expand Down
6 changes: 2 additions & 4 deletions lib/protobuf/ext/eventmachine.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
require 'fiber'

# Method and concept from em-synchrony
# Method from em-synchrony
# https://github.com/igrigorik/em-synchrony
#
# A convenience method for wrapping EM.run body within
# a Ruby Fiber such that async operations can be transparently
# paused and resumed based on IO scheduling
module EventMachine
def self.fiber_run(blk=nil, tail=nil, &block)
blk ||= block
context = Proc.new{ Fiber.new{ blk.call }.resume }

context = Proc.new{ Fiber.new{ (b = blk || block) and b.call }.resume }
self.run(context, tail)
end
end
4 changes: 1 addition & 3 deletions lib/protobuf/message/message.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
require 'pp'
require 'stringio'
require 'protobuf/descriptor/descriptor'
require 'protobuf/message/decoder'
require 'protobuf/message/encoder'
Expand Down Expand Up @@ -27,7 +25,7 @@ def include_tag?(tag)
end
end

class <<self
class << self
include Protoable

# Reserve field numbers for extensions. Don't use this method directly.
Expand Down
12 changes: 6 additions & 6 deletions lib/protobuf/rpc/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ class Buffer

MODES = [:read, :write]

def initialize mode=:read, data=''
def initialize(mode=:read, data='')
@data = data.is_a?(Protobuf::Message) ? data.serialize_to_string : data.to_s
@flush = false
self.mode = mode
end

def mode= mode
if MODES.include? mode
def mode=(mode)
if MODES.include?(mode)
@mode = mode
else
@mode = :read
end
end

def write force_mode=true
def write(force_mode=true)
if force_mode and reading?
mode = :write
elsif not force_mode and reading?
Expand All @@ -32,7 +32,7 @@ def write force_mode=true
'%d-%s' % [@size, @data]
end

def << data
def <<(data)
@data << data
if reading?
get_data_size
Expand Down Expand Up @@ -71,4 +71,4 @@ def check_for_flush

end
end
end
end
Loading

0 comments on commit 48f8daa

Please sign in to comment.