Kouhei Sutou
null+****@clear*****
Mon Nov 25 00:03:44 JST 2013
Kouhei Sutou 2013-11-25 00:03:44 +0900 (Mon, 25 Nov 2013) New Revision: 65ee7e9724f073e773a33d333e02e20574f660da https://github.com/droonga/fluent-plugin-droonga/commit/65ee7e9724f073e773a33d333e02e20574f660da Message: Use forwarder in executor Modified files: lib/droonga/executor.rb lib/droonga/forwarder.rb Modified: lib/droonga/executor.rb (+7 -84) =================================================================== --- lib/droonga/executor.rb 2013-11-24 23:53:30 +0900 (f21d629) +++ lib/droonga/executor.rb 2013-11-25 00:03:44 +0900 (d483ea1) @@ -15,11 +15,9 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "fluent-logger" -require "fluent/logger/fluent_logger" require "groonga" -require "droonga/plugin_loader" +require "droonga/forwarder" require "droonga/dispatcher" require "droonga/distributor" @@ -28,7 +26,6 @@ module Droonga attr_reader :context, :envelope, :name def initialize(options={}) - @outputs = {} @options = options @name = options[:name] @database_name = options[:database] @@ -40,9 +37,7 @@ module Droonga def shutdown $log.trace("#{log_tag}: shutdown: start") @distributor.shutdown - @outputs.each do |dest, output| - output[:logger].close if output[:logger] - end + @forwarder.shutdown if @database @database.close @context.close @@ -103,7 +98,10 @@ module Droonga synchronous = destination["synchronous"] end if receiver - output(receiver, body, command, arguments) + @forwarder.forward(envelope, body, + "type" => command, + "to" => receiver, + "arguments" => arguments) else if command == "dispatcher" @dispatcher.handle(body, arguments) @@ -121,51 +119,6 @@ module Droonga route.is_a?(String) || route.is_a?(Hash) end - def output(receiver, body, command, arguments) - $log.trace("#{log_tag}: output: start") - unless receiver.is_a?(String) && command.is_a?(String) - $log.trace("#{log_tag}: output: abort: invalid argument", - :receiver => receiver, - :command => command) - return - end - unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ - raise "format: hostname:port/tag(?params)" - end - host = $1 - port = $2 - tag = $3 - params = $4 - output = get_output(host, port, params) - unless output - $log.trace("#{log_tag}: output: abort: no output", - :host => host, - :port => port, - :params => params) - return - end - if command =~ /\.result$/ - message = { - inReplyTo: envelope["id"], - statusCode: 200, - type: command, - body: body - } - else - message = envelope.merge( - body: body, - type: command, - arguments: arguments - ) - end - output_tag = "#{tag}.message" - log_info = "<#{receiver}>:<#{output_tag}>" - $log.trace("#{log_tag}: output: post: start: #{log_info}") - output.post(output_tag, message) - $log.trace("#{log_tag}: output: post: done: #{log_info}") - $log.trace("#{log_tag}: output: done") - end - def parse_message(message) @message = message # TODO: remove me tag, time, record = message @@ -190,37 +143,7 @@ module Droonga end @dispatcher = Dispatcher.new(self, name) @distributor = Distributor.new(@dispatcher, @options) - end - - def get_output(host, port, params) - host_port = "#{host}:#{port}" - @outputs[host_port] ||= {} - output = @outputs[host_port] - - has_connection_id = (not params.nil? \ - and params =~ /[\?&;]connection_id=([^&;]+)/) - if output[:logger].nil? or has_connection_id - connection_id = $1 - if not has_connection_id or output[:connection_id] != connection_id - output[:connection_id] = connection_id - logger = create_logger(:host => host, :port => port.to_i) - # output[:logger] should be closed if it exists beforehand? - output[:logger] = logger - end - end - - has_client_session_id = (not params.nil? \ - and params =~ /[\?&;]client_session_id=([^&;]+)/) - if has_client_session_id - client_session_id = $1 - # some generic way to handle client_session_id is expected - end - - output[:logger] - end - - def create_logger(options) - Fluent::Logger::FluentLogger.new(nil, options) + @forwarder = Forwarder.new end def log_tag Modified: lib/droonga/forwarder.rb (+0 -1) =================================================================== --- lib/droonga/forwarder.rb 2013-11-24 23:53:30 +0900 (5ba0d2f) +++ lib/droonga/forwarder.rb 2013-11-25 00:03:44 +0900 (1769664) @@ -37,7 +37,6 @@ module Droonga command = destination["type"] receiver = destination["to"] arguments = destination["arguments"] - synchronous = destination["synchronous"] output(receiver, envelope, body, command, arguments) $log.trace("#{log_tag}: post: done") end -------------- next part -------------- HTML����������������������������... 다운로드