[Groonga-commit] droonga/fluent-plugin-droonga at 0340752 [master] Add Executor class to support synchronous execution.

Back to archive index

Daijiro MORI null+****@clear*****
Wed Aug 21 12:31:31 JST 2013


Daijiro MORI	2013-08-21 12:31:31 +0900 (Wed, 21 Aug 2013)

  New Revision: 03407526accdb79d0350fdc8e23b8b710bac9ade
  https://github.com/droonga/fluent-plugin-droonga/commit/03407526accdb79d0350fdc8e23b8b710bac9ade

  Message:
    Add Executor class to support synchronous execution.

  Copied files:
    lib/droonga/executor.rb
      (from lib/droonga/worker.rb)
  Modified files:
    lib/droonga/engine.rb
    lib/droonga/proxy.rb
    lib/droonga/server.rb
    lib/droonga/worker.rb

  Modified: lib/droonga/engine.rb (+27 -12)
===================================================================
--- lib/droonga/engine.rb    2013-08-20 14:09:55 +0900 (53bc70c)
+++ lib/droonga/engine.rb    2013-08-21 12:31:31 +0900 (f913f48)
@@ -21,6 +21,7 @@ require "cool.io"
 
 require "droonga/server"
 require "droonga/worker"
+require "droonga/executor"
 
 module Droonga
   class Engine
@@ -29,6 +30,7 @@ module Droonga
       :queue_name => "DroongaQueue",
       :handlers   => ["proxy"],
       :n_workers  => 1,
+      :with_server  => false
     }
 
     def initialize(options={})
@@ -36,26 +38,38 @@ module Droonga
     end
 
     def start
-      @message_input, @message_output = IO.pipe
-      @message_input.sync = true
-      @message_output.sync = true
-      start_supervisor
-      start_emitter
+      if @options[:n_workers] > 0 || @options[:with_server]
+        @message_input, @message_output = IO.pipe
+        @message_input.sync = true
+        @message_output.sync = true
+        start_supervisor
+      end
+      if @options[:with_server]
+        start_emitter
+      else
+        @executor = Executor.new(@options)
+      end
     end
 
     def shutdown
       $log.trace("engine: shutdown: start")
-      shutdown_emitter
-      shutdown_supervisor
-      @message_input.close unless @message_input.closed?
-      @message_output.close unless @message_output.closed?
+      shutdown_emitter if @emitter
+      if @supervisor
+        shutdown_supervisor
+        @message_input.close unless @message_input.closed?
+        @message_output.close unless @message_output.closed?
+      end
       $log.trace("engine: shutdown: done")
     end
 
     def emit(tag, time, record)
       $log.trace("tag: <#{tag}>")
-      @emitter.write(MessagePack.pack([tag, time, record]))
-      @loop_breaker.signal
+      if @executor
+        @executor.dispatch(tag, time, record)
+      else
+        @emitter.write(MessagePack.pack([tag, time, record]))
+        @loop_breaker.signal
+      end
     end
 
     private
@@ -85,7 +99,8 @@ module Droonga
     end
 
     def start_supervisor
-      @supervisor = ServerEngine::Supervisor.new(Server, Worker) do
+      server = @options[:with_server] ? Server : nil
+      @supervisor = ServerEngine::Supervisor.new(server, Worker) do
         force_options = {
           :worker_type   => "process",
           :workers       => @options[:n_workers],

  Copied: lib/droonga/executor.rb (+18 -61) 78%
===================================================================
--- lib/droonga/worker.rb    2013-08-20 14:09:55 +0900 (91287f1)
+++ lib/droonga/executor.rb    2013-08-21 12:31:31 +0900 (3d2720b)
@@ -26,51 +26,21 @@ require "droonga/catalog"
 require "droonga/proxy"
 
 module Droonga
-  module Worker
+  class Executor
     attr_reader :context, :envelope, :name
 
-    def initialize
-      @handlers = []
-      @outputs = {}
-      @name = config[:name]
-      @database_name = config[:database] || "droonga/db"
-      @queue_name = config[:queue_name] || "DroongaQueue"
-      @handler_names = config[:handlers] || ["proxy"]
-      load_handlers
-      prepare
-    end
-
-    def run
-      $log.trace("worker: run: start")
-      @queue = @context[@queue_name]
-      @running = true
-      while @running
-        $log.trace("worker: run: pull_message: start")
-        message = pull_message
-        $log.trace("worker: run: pull_message: done")
-        next unless message
-        body, command, arguments = parse_message(message)
-        handler = find_handler(command)
-        handler.handle(command, body, *arguments) if handler
-      end
-      @handlers.each do |handler|
-        handler.shutdown
-      end
-      @outputs.each do |dest, output|
-        output[:logger].close if output[:logger]
-      end
-      @queue = nil
-      @database.close
-      @context.close
-      @database = @context = nil
-      $log.trace("worker: run: done")
-    end
-
-    def stop
-      $log.trace("worker: stop: start")
-      @running = false
-      $log.trace("worker: stop: done")
-    end
+    def initialize(options={})
+       @handlers = []
+       @outputs = {}
+       @name = options[:name]
+       @database_name = options[:database] || "droonga/db"
+       @queue_name = options[:queue_name] || "DroongaQueue"
+       Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
+       @handler_names = options[:handlers] || ["proxy"]
+       load_handlers
+       @pool_size = options[:pool_size] || 1
+       prepare
+     end
 
     def add_handler(name)
       plugin = HandlerPlugin.new(name)
@@ -81,29 +51,16 @@ module Droonga
       envelope["via"].push(route)
     end
 
+    def dispatch(*message)
+      body, type, arguments = parse_message(message)
+      post_or_push(message, body, "type" => type, "arguments" => arguments)
+    end
+
     def post(body, destination=nil)
       post_or_push(nil, body, destination)
     end
 
     private
-    def shutdown_workers
-      @pool.each do |pid|
-        Process.kill(:TERM, pid)
-      end
-      queue = @context[@queue_name]
-      3.times do |i|
-        break if****@pool*****?
-        queue.unblock
-        @pool.reject! do |pid|
-          not Process.waitpid(pid, Process::WNOHANG).nil?
-        end
-        sleep(i ** 2 * 0.1)
-      end
-      @pool.each do |pid|
-        Process.kill(:KILL, pid)
-      end
-    end
-
     def post_or_push(message, body, destination)
       route = nil
       unless destination

  Modified: lib/droonga/proxy.rb (+1 -0)
===================================================================
--- lib/droonga/proxy.rb    2013-08-20 14:09:55 +0900 (6527e43)
+++ lib/droonga/proxy.rb    2013-08-21 12:31:31 +0900 (8138eee)
@@ -245,6 +245,7 @@ module Droonga
           if command
             # TODO: should be controllable for each command respectively.
             synchronous = !n_of_expects.zero?
+            # TODO: check if asynchronous execution is available.
             message = {
               "task"=>task,
               "name"=>name,

  Modified: lib/droonga/server.rb (+3 -5)
===================================================================
--- lib/droonga/server.rb    2013-08-20 14:09:55 +0900 (7119de5)
+++ lib/droonga/server.rb    2013-08-21 12:31:31 +0900 (1b8186f)
@@ -20,6 +20,7 @@ require "cool.io"
 require "groonga"
 
 require "droonga/job_queue"
+require "droonga/executor"
 
 module Droonga
   module Server
@@ -70,6 +71,7 @@ module Droonga
       @database_name = config[:database] || "droonga/db"
       @queue_name = config[:queue_name] || "DroongaQueue"
       Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
+      @executor = Executor.new(config)
     end
 
     def before_run
@@ -80,11 +82,7 @@ module Droonga
       @receiver_thread = Thread.new do
         @receiver.run do |message|
           $log.trace("received: start")
-          packed_message = message.to_msgpack
-          queue = @context[@queue_name]
-          queue.push do |record|
-            record.message = packed_message
-          end
+          @executor.dispatch(*message)
           $log.trace("received: done")
         end
       end

  Modified: lib/droonga/worker.rb (+1 -0)
===================================================================
--- lib/droonga/worker.rb    2013-08-20 14:09:55 +0900 (91287f1)
+++ lib/droonga/worker.rb    2013-08-21 12:31:31 +0900 (47a7626)
@@ -36,6 +36,7 @@ module Droonga
       @database_name = config[:database] || "droonga/db"
       @queue_name = config[:queue_name] || "DroongaQueue"
       @handler_names = config[:handlers] || ["proxy"]
+      @pool_size = config[:n_workers]
       load_handlers
       prepare
     end
-------------- next part --------------
HTML����������������������������...
다운로드 



More information about the Groonga-commit mailing list
Back to archive index