[Groonga-commit] droonga/fluent-plugin-droonga at 65ee7e9 [master] Use forwarder in executor

Back to archive index

Kouhei Sutou null+****@clear*****
Mon Nov 25 00:03:44 JST 2013

Kouhei Sutou	2013-11-25 00:03:44 +0900 (Mon, 25 Nov 2013)

  New Revision: 65ee7e9724f073e773a33d333e02e20574f660da

    Use forwarder in executor

  Modified files:

  Modified: lib/droonga/executor.rb (+7 -84)
--- lib/droonga/executor.rb    2013-11-24 23:53:30 +0900 (f21d629)
+++ lib/droonga/executor.rb    2013-11-25 00:03:44 +0900 (d483ea1)
@@ -15,11 +15,9 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-require "fluent-logger"
-require "fluent/logger/fluent_logger"
 require "groonga"
-require "droonga/plugin_loader"
+require "droonga/forwarder"
 require "droonga/dispatcher"
 require "droonga/distributor"
@@ -28,7 +26,6 @@ module Droonga
     attr_reader :context, :envelope, :name
     def initialize(options={})
-      @outputs = {}
       @options = options
       @name = options[:name]
       @database_name = options[:database]
@@ -40,9 +37,7 @@ module Droonga
     def shutdown
       $log.trace("#{log_tag}: shutdown: start")
-      @outputs.each do |dest, output|
-        output[:logger].close if output[:logger]
-      end
+      @forwarder.shutdown
       if @database
@@ -103,7 +98,10 @@ module Droonga
         synchronous = destination["synchronous"]
       if receiver
-        output(receiver, body, command, arguments)
+        @forwarder.forward(envelope, body,
+                           "type" => command,
+                           "to" => receiver,
+                           "arguments" => arguments)
         if command == "dispatcher"
           @dispatcher.handle(body, arguments)
@@ -121,51 +119,6 @@ module Droonga
       route.is_a?(String) || route.is_a?(Hash)
-    def output(receiver, body, command, arguments)
-      $log.trace("#{log_tag}: output: start")
-      unless receiver.is_a?(String) && command.is_a?(String)
-        $log.trace("#{log_tag}: output: abort: invalid argument",
-                   :receiver => receiver,
-                   :command  => command)
-        return
-      end
-      unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
-        raise "format: hostname:port/tag(?params)"
-      end
-      host = $1
-      port = $2
-      tag  = $3
-      params = $4
-      output = get_output(host, port, params)
-      unless output
-        $log.trace("#{log_tag}: output: abort: no output",
-                   :host   => host,
-                   :port   => port,
-                   :params => params)
-        return
-      end
-      if command =~ /\.result$/
-        message = {
-          inReplyTo: envelope["id"],
-          statusCode: 200,
-          type: command,
-          body: body
-        }
-      else
-        message = envelope.merge(
-          body: body,
-          type: command,
-          arguments: arguments
-        )
-      end
-      output_tag = "#{tag}.message"
-      log_info = "<#{receiver}>:<#{output_tag}>"
-      $log.trace("#{log_tag}: output: post: start: #{log_info}")
-      output.post(output_tag, message)
-      $log.trace("#{log_tag}: output: post: done: #{log_info}")
-      $log.trace("#{log_tag}: output: done")
-    end
     def parse_message(message)
       @message = message # TODO: remove me
       tag, time, record = message
@@ -190,37 +143,7 @@ module Droonga
       @dispatcher = Dispatcher.new(self, name)
       @distributor = Distributor.new(@dispatcher, @options)
-    end
-    def get_output(host, port, params)
-      host_port = "#{host}:#{port}"
-      @outputs[host_port] ||= {}
-      output = @outputs[host_port]
-      has_connection_id = (not params.nil? \
-                           and params =~ /[\?&;]connection_id=([^&;]+)/)
-      if output[:logger].nil? or has_connection_id
-        connection_id = $1
-        if not has_connection_id or output[:connection_id] != connection_id
-          output[:connection_id] = connection_id
-          logger = create_logger(:host => host, :port => port.to_i)
-          # output[:logger] should be closed if it exists beforehand?
-          output[:logger] = logger
-        end
-      end
-      has_client_session_id = (not params.nil? \
-                               and params =~ /[\?&;]client_session_id=([^&;]+)/)
-      if has_client_session_id
-        client_session_id = $1
-        # some generic way to handle client_session_id is expected
-      end
-      output[:logger]
-    end
-    def create_logger(options)
-      Fluent::Logger::FluentLogger.new(nil, options)
+      @forwarder = Forwarder.new
     def log_tag

  Modified: lib/droonga/forwarder.rb (+0 -1)
--- lib/droonga/forwarder.rb    2013-11-24 23:53:30 +0900 (5ba0d2f)
+++ lib/droonga/forwarder.rb    2013-11-25 00:03:44 +0900 (1769664)
@@ -37,7 +37,6 @@ module Droonga
       command = destination["type"]
       receiver = destination["to"]
       arguments = destination["arguments"]
-      synchronous = destination["synchronous"]
       output(receiver, envelope, body, command, arguments)
       $log.trace("#{log_tag}: post: done")
-------------- next part --------------

More information about the Groonga-commit mailing list
Back to archive index