[Groonga-commit] droonga/fluent-plugin-droonga at 0422504 [master] Use serverengine gem to manage workers

Back to archive index

Kouhei Sutou null+****@clear*****
Tue Aug 20 14:09:55 JST 2013


Kouhei Sutou	2013-08-20 14:09:55 +0900 (Tue, 20 Aug 2013)

  New Revision: 04225040c57c6b67f551901dd3465dc1f967550c
  https://github.com/droonga/fluent-plugin-droonga/commit/04225040c57c6b67f551901dd3465dc1f967550c

  Message:
    Use serverengine gem to manage workers

  Added files:
    lib/droonga/engine.rb
    lib/droonga/server.rb
  Modified files:
    fluent-plugin-droonga.gemspec
    lib/droonga/worker.rb
    lib/fluent/plugin/out_droonga.rb

  Modified: fluent-plugin-droonga.gemspec (+1 -0)
===================================================================
--- fluent-plugin-droonga.gemspec    2013-08-19 16:08:45 +0900 (09b1d18)
+++ fluent-plugin-droonga.gemspec    2013-08-20 14:09:55 +0900 (1b89081)
@@ -31,6 +31,7 @@ Gem::Specification.new do |gem|
   gem.add_dependency "rroonga", ">= 3.0.3"
   gem.add_dependency "groonga-command", ">= 1.0.3"
   gem.add_dependency "fluent-logger"
+  gem.add_dependency "serverengine"
   gem.add_development_dependency "rake"
   gem.add_development_dependency "bundler"
   gem.add_development_dependency "test-unit"

  Added: lib/droonga/engine.rb (+111 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/engine.rb    2013-08-20 14:09:55 +0900 (53bc70c)
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 droonga project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "serverengine"
+require "msgpack"
+require "cool.io"
+
+require "droonga/server"
+require "droonga/worker"
+
+module Droonga
+  class Engine
+    DEFAULT_OPTIONS = {
+      :database   => "droonga/db",
+      :queue_name => "DroongaQueue",
+      :handlers   => ["proxy"],
+      :n_workers  => 1,
+    }
+
+    def initialize(options={})
+      @options = DEFAULT_OPTIONS.merge(options)
+    end
+
+    def start
+      @message_input, @message_output = IO.pipe
+      @message_input.sync = true
+      @message_output.sync = true
+      start_supervisor
+      start_emitter
+    end
+
+    def shutdown
+      $log.trace("engine: shutdown: start")
+      shutdown_emitter
+      shutdown_supervisor
+      @message_input.close unless @message_input.closed?
+      @message_output.close unless @message_output.closed?
+      $log.trace("engine: shutdown: done")
+    end
+
+    def emit(tag, time, record)
+      $log.trace("tag: <#{tag}>")
+      @emitter.write(MessagePack.pack([tag, time, record]))
+      @loop_breaker.signal
+    end
+
+    private
+    def start_emitter
+      @loop = Coolio::Loop.new
+      @emitter = Coolio::IO.new(@message_output)
+      @emitter.on_write_complete do
+        $log.trace("emitter: written")
+      end
+      @emitter.attach(@loop)
+      @loop_breaker = Coolio::AsyncWatcher.new
+      @loop_breaker.attach(@loop)
+      @emitter_thread = Thread.new do
+        @loop.run
+      end
+    end
+
+    def shutdown_emitter
+      $log.trace("emitter: shutdown: start")
+      @emitter.close
+      $log.trace("emitter: shutdown: emitter: closed")
+      @loop.stop
+      @loop_breaker.signal
+      $log.trace("emitter: shutdown: loop: stopped")
+      @emitter_thread.join
+      $log.trace("emitter: shutdown: done")
+    end
+
+    def start_supervisor
+      @supervisor = ServerEngine::Supervisor.new(Server, Worker) do
+        force_options = {
+          :worker_type   => "process",
+          :workers       => @options[:n_workers],
+          :message_input => @message_input,
+          :log_level     => $log.level,
+        }
+        @options.merge(force_options)
+      end
+      @supervisor.logger = $log
+      @supervisor_thread = Thread.new do
+        @supervisor.main
+      end
+    end
+
+    def shutdown_supervisor
+      $log.trace("supervisor: shutdown: start")
+      @supervisor.stop(true)
+      $log.trace("supervisor: shutdown: stopped")
+      @supervisor_thread.join
+      $log.trace("supervisor: shutdown: done")
+    end
+  end
+end

  Added: lib/droonga/server.rb (+128 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/server.rb    2013-08-20 14:09:55 +0900 (7119de5)
@@ -0,0 +1,128 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 droonga project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "msgpack"
+require "cool.io"
+require "groonga"
+
+require "droonga/job_queue"
+
+module Droonga
+  module Server
+    class Receiver
+      def initialize(input)
+        @input = input
+      end
+
+      def run
+        $log.trace
+        @io = Coolio::IO.new(@input)
+        unpacker = MessagePack::Unpacker.new
+        @io.on_read do |data|
+          $log.trace("receiver: received: <#{data.bytesize}>")
+          unpacker.feed_each(data) do |message|
+            yield message
+          end
+        end
+        @loop = Coolio::Loop.new
+        @loop.attach(@io)
+        @loop_breaker = Coolio::AsyncWatcher.new
+        @loop.attach(@loop_breaker)
+        @running = true
+        @loop.run
+      end
+
+      def stop
+        unless @running
+          $log.trace("receiver: stop: not needed")
+          return
+        end
+
+        $log.trace("receiver: stop: start")
+        @io.close
+        $log.trace("receiver: stop: closed")
+        @loop.stop
+        @running = false
+        @loop_breaker.signal
+        $log.trace("receiver: stop: done")
+      end
+    end
+
+    def initialize
+      super
+      @name = config[:name]
+      @context = Groonga::Context.new
+      @message_input = config[:message_input]
+      @database_name = config[:database] || "droonga/db"
+      @queue_name = config[:queue_name] || "DroongaQueue"
+      Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
+    end
+
+    def before_run
+      @database =****@conte*****_database(@database_name)
+      @context.encoding = :none
+
+      @receiver = Receiver.new(@message_input)
+      @receiver_thread = Thread.new do
+        @receiver.run do |message|
+          $log.trace("received: start")
+          packed_message = message.to_msgpack
+          queue = @context[@queue_name]
+          queue.push do |record|
+            record.message = packed_message
+          end
+          $log.trace("received: done")
+        end
+      end
+    end
+
+    def after_run
+      $log.trace("server: after_run: start")
+
+      $log.trace("server: after_run: receiver: start")
+      @receiver_thread.join
+      $log.trace("server: after_run: receiver: done")
+
+      $log.trace("server: after_run: groonga: start")
+      @database.close
+      @context.close
+      @database = @context = nil
+      $log.trace("server: after_run: groonga: done")
+
+      $log.trace("server: after_run: done")
+    end
+
+    def stop(stop_graceful)
+      $log.trace("server: stop: start")
+
+      $log.trace("server: stop: receiver: stop: start")
+      @receiver.stop
+      $log.trace("server: stop: receiver: stop: done")
+
+      $log.trace("server: stop: queue: unblock: start")
+      queue = @context[@queue_name]
+      3.times do |i|
+        super
+        queue.unblock
+        sleep(i ** 2 * 0.1)
+      end
+      $log.trace("server: stop: queue: unblock: done")
+
+      $log.trace("server: stop: done")
+    end
+  end
+end

  Modified: lib/droonga/worker.rb (+25 -54)
===================================================================
--- lib/droonga/worker.rb    2013-08-19 16:08:45 +0900 (ae9110a)
+++ lib/droonga/worker.rb    2013-08-20 14:09:55 +0900 (91287f1)
@@ -26,40 +26,50 @@ require "droonga/catalog"
 require "droonga/proxy"
 
 module Droonga
-  class Worker
+  module Worker
     attr_reader :context, :envelope, :name
 
-    def initialize(options={})
-      @pool = []
+    def initialize
       @handlers = []
       @outputs = {}
-      @name = options[:name]
-      @database_name = options[:database] || "droonga/db"
-      @queue_name = options[:queue_name] || "DroongaQueue"
-      Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
-      @handler_names = options[:handlers] || ["proxy"]
+      @name = config[:name]
+      @database_name = config[:database] || "droonga/db"
+      @queue_name = config[:queue_name] || "DroongaQueue"
+      @handler_names = config[:handlers] || ["proxy"]
       load_handlers
-      @pool_size = options[:pool_size] || 1
-      @pool = spawn
       prepare
     end
 
-    def shutdown
-      shutdown_workers
+    def run
+      $log.trace("worker: run: start")
+      @queue = @context[@queue_name]
+      @running = true
+      while @running
+        $log.trace("worker: run: pull_message: start")
+        message = pull_message
+        $log.trace("worker: run: pull_message: done")
+        next unless message
+        body, command, arguments = parse_message(message)
+        handler = find_handler(command)
+        handler.handle(command, body, *arguments) if handler
+      end
       @handlers.each do |handler|
         handler.shutdown
       end
       @outputs.each do |dest, output|
         output[:logger].close if output[:logger]
       end
+      @queue = nil
       @database.close
       @context.close
       @database = @context = nil
+      $log.trace("worker: run: done")
     end
 
-    def dispatch(*message)
-      body, type, arguments = parse_message(message)
-      post_or_push(message, body, "type" => type, "arguments" => arguments)
+    def stop
+      $log.trace("worker: stop: start")
+      @running = false
+      $log.trace("worker: stop: done")
     end
 
     def add_handler(name)
@@ -192,9 +202,7 @@ module Droonga
 
     def pull_message
       packed_message = nil
-      @status = :IDLE
       @queue.pull do |record|
-        @status = :BUSY
         if record
           packed_message = record.message
           record.delete
@@ -204,43 +212,6 @@ module Droonga
       MessagePack.unpack(packed_message)
     end
 
-    def start
-      @finish = false
-      @status = :IDLE
-      # TODO: doesn't work
-      Signal.trap(:TERM) do
-        @finish = true
-        exit! 0 if @status == :IDLE
-      end
-      @queue = @context[@queue_name]
-      while !@finish
-        message = pull_message
-        next unless message
-        body, command, arguments = parse_message(message)
-        handler = find_handler(command)
-        handler.handle(command, body, *arguments) if handler
-      end
-    end
-
-    def spawn
-      pool = []
-      @pool_size.times do
-        pid = Process.fork
-        if pid
-          pool << pid
-          next
-        end
-        # child process
-        begin
-          prepare
-          start
-          shutdown
-          exit! 0
-        end
-      end
-      pool
-    end
-
     def load_handlers
       @handler_names.each do |handler_name|
         plugin = Droonga::Plugin.new("handler", handler_name)

  Modified: lib/fluent/plugin/out_droonga.rb (+6 -5)
===================================================================
--- lib/fluent/plugin/out_droonga.rb    2013-08-19 16:08:45 +0900 (5b985a1)
+++ lib/fluent/plugin/out_droonga.rb    2013-08-20 14:09:55 +0900 (f89205e)
@@ -15,7 +15,7 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/worker"
+require "droonga/engine"
 
 module Fluent
   class DroongaOutput < Output
@@ -31,21 +31,22 @@ module Fluent
 
     def start
       super
-      @worker = Droonga::Worker.new(:database => @database,
+      @engine = Droonga::Engine.new(:database => @database,
                                     :queue_name => @queue_name,
-                                    :pool_size => @n_workers,
+                                    :n_workers => @n_workers,
                                     :handlers => @handlers,
                                     :name => @name)
+      @engine.start
     end
 
     def shutdown
+      @engine.shutdown
       super
-      @worker.shutdown
     end
 
     def emit(tag, es, chain)
       es.each do |time, record|
-        @worker.dispatch(tag, time, record)
+        @engine.emit(tag, time, record)
       end
       chain.next
     end
-------------- next part --------------
HTML����������������������������...
다운로드 



More information about the Groonga-commit mailing list
Back to archive index