[Groonga-commit] droonga/droonga-engine at dbb241c [master] Extract droonga-engine-service implementation code as a class

Back to archive index

Kouhei Sutou null+****@clear*****
Tue May 27 13:04:59 JST 2014

Kouhei Sutou	2014-05-27 13:04:59 +0900 (Tue, 27 May 2014)

  New Revision: dbb241c53f61d9ef1d7b59e253f3d3bb105d2e97

    Extract droonga-engine-service implementation code as a class

  Added files:
  Modified files:

  Modified: bin/droonga-engine (+1 -1)
--- bin/droonga-engine    2014-05-27 12:54:12 +0900 (c4abf44)
+++ bin/droonga-engine    2014-05-27 13:04:59 +0900 (a4c9041)
@@ -17,4 +17,4 @@
 require "droonga/command/droonga_engine"

  Modified: bin/droonga-engine-service (+2 -2)
--- bin/droonga-engine-service    2014-05-27 12:54:12 +0900 (b1fdeaf)
+++ bin/droonga-engine-service    2014-05-27 13:04:59 +0900 (a1d9a48)
@@ -15,6 +15,6 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-require "droonga/command/droonga_engine"
+require "droonga/command/droonga_engine_service"

  Modified: lib/droonga/command/droonga_engine.rb (+73 -321)
--- lib/droonga/command/droonga_engine.rb    2014-05-27 12:54:12 +0900 (378a08b)
+++ lib/droonga/command/droonga_engine.rb    2014-05-27 13:04:59 +0900 (23fcd53)
@@ -17,20 +17,85 @@ require "optparse"
 require "socket"
 require "ipaddr"
 require "fileutils"
-require "pathname"
+require "coolio"
 require "droonga/path"
-require "droonga/engine"
 require "droonga/serf"
-require "droonga/event_loop"
-require "droonga/fluent_message_receiver"
-require "droonga/internal_fluent_message_receiver"
-require "droonga/plugin_loader"
 module Droonga
   module Command
-    module DroongaEngine
-      class SupervisorConfiguration
+    class DroongaEngine
+      class << self
+        def run(command_line_arguments)
+          new.run(command_line_arguments)
+        end
+      end
+      def initialize
+        @configuration = Configuration.new
+        @log_output = nil
+      end
+      def run(command_line_arguments)
+        parse_command_line_arguments!(command_line_arguments)
+        ensure_path
+        if****@confi*****?
+          Process.daemon
+        end
+        open_log_file do
+          write_pid_file do
+            run_main_loop
+          end
+        end
+      end
+      private
+      def parse_command_line_arguments!(command_line_arguments)
+        parser = OptionParser.new
+        @configuration.add_command_line_options(parser)
+        parser.parse!(command_line_arguments)
+      end
+      def ensure_path
+        Droonga::Path.base
+      end
+      def run_main_loop
+        main_loop = MainLoop.new(@configuration)
+        main_loop.run
+      end
+      def open_log_file
+        if****@confi*****_file
+          File.open(@configuration.log_file, "a") do |file|
+            @log_output = file
+            yield
+          end
+        else
+          yield
+        end
+      end
+      def write_pid_file
+        if****@confi*****_file
+          File.open(@configuration.pid_file, "w") do |file|
+            file.puts(Process.pid)
+          end
+          begin
+            yield
+          ensure
+            FileUtils.rm_f(@configuration.pid_file)
+          end
+        else
+          yield
+        end
+      end
+      class Configuration
         DEFAULT_HOST = Socket.gethostname
         DEFAULT_PORT = 10031
@@ -151,77 +216,6 @@ module Droonga
-      class Supervisor
-        class << self
-          def run(command_line_arguments)
-            new.run(command_line_arguments)
-          end
-        end
-        def initialize
-          @configuration = SupervisorConfiguration.new
-          @log_output = nil
-        end
-        def run(command_line_arguments)
-          parse_command_line_arguments!(command_line_arguments)
-          ensure_path
-          if****@confi*****?
-            Process.daemon
-          end
-          open_log_file do
-            write_pid_file do
-              run_main_loop
-            end
-          end
-        end
-        private
-        def parse_command_line_arguments!(command_line_arguments)
-          parser = OptionParser.new
-          @configuration.add_command_line_options(parser)
-          parser.parse!(command_line_arguments)
-        end
-        def ensure_path
-          Droonga::Path.base
-        end
-        def run_main_loop
-          main_loop = MainLoop.new(@configuration)
-          main_loop.run
-        end
-        def open_log_file
-          if****@confi*****_file
-            File.open(@configuration.log_file, "a") do |file|
-              @log_output = file
-              yield
-            end
-          else
-            yield
-          end
-        end
-        def write_pid_file
-          if****@confi*****_file
-            File.open(@configuration.pid_file, "w") do |file|
-              file.puts(Process.pid)
-            end
-            begin
-              yield
-            ensure
-              FileUtils.rm_f(@configuration.pid_file)
-            end
-          else
-            yield
-          end
-        end
-      end
       class MainLoop
         def initialize(configuration)
           @configuration = configuration
@@ -390,248 +384,6 @@ module Droonga
-      class Service
-        class << self
-          def run(command_line_arguments)
-            new.run(command_line_arguments)
-          end
-        end
-        include Loggable
-        def initialize
-          @engine_name = nil
-          @listen_fd = nil
-          @heartbeat_fd = nil
-          @contrtol_read_fd = nil
-          @contrtol_write_fd = nil
-          @contrtol_write_closed = false
-        end
-        def run(command_line_arguments)
-          create_new_process_group
-          parse_command_line_arguments!(command_line_arguments)
-          PluginLoader.load_all
-          control_write_io = IO.new(@control_write_fd)
-          begin
-            run_services
-          rescue
-            logger.exception("failed to run services", $!)
-          ensure
-            unless @control_write_closed
-              control_write_io.write("finish\n")
-              control_write_io.close
-            end
-          end
-          true
-        end
-        private
-        def create_new_process_group
-          begin
-            Process.setsid
-          rescue SystemCallError, NotImplementedError
-          end
-        end
-        def parse_command_line_arguments!(command_line_arguments)
-          parser = OptionParser.new
-          add_internal_options(parser)
-          parser.parse!(command_line_arguments)
-        end
-        def add_internal_options(parser)
-          parser.separator("")
-          parser.separator("Internal:")
-          parser.on("--engine-name=NAME",
-                    "Use NAME as the name of the engine") do |name|
-            @engine_name = name
-          end
-          parser.on("--listen-fd=FD", Integer,
-                    "Use FD as the listen file descriptor") do |fd|
-            @listen_fd = fd
-          end
-          parser.on("--heartbeat-fd=FD", Integer,
-                    "Use FD as the heartbeat file descriptor") do |fd|
-            @heartbeat_fd = fd
-          end
-          parser.on("--control-read-fd=FD", Integer,
-                    "Use FD to read control messages from the service") do |fd|
-            @control_read_fd = fd
-          end
-          parser.on("--control-write-fd=FD", Integer,
-                    "Use FD to write control messages from the service") do |fd|
-            @control_write_fd = fd
-          end
-        end
-        def host
-          @engine_name.split(":", 2).first
-        end
-        def run_services
-          @stopping = false
-          @engine = nil
-          @receiver = nil
-          @loop = Coolio::Loop.default
-          run_internal_message_receiver
-          run_engine
-          run_receiver
-          run_control_io
-          @loop.run
-        end
-        def run_internal_message_receiver
-          @internal_message_receiver = create_internal_message_receiver
-          host, port = @internal_message_receiver.start
-          tag = @engine_name.split("/", 2).last.split(".", 2).first
-          @internal_engine_name = "#{host}:#{port}/#{tag}"
-        end
-        def create_internal_message_receiver
-          InternalFluentMessageReceiver.new(@loop, host) do |tag, time, record|
-            on_message(tag, time, record)
-          end
-        end
-        def shutdown_internal_message_receiver
-          return if @internal_message_receiver.nil?
-          @internal_message_receiver, receiver = nil, @internal_message_receiver
-          receiver.shutdown
-        end
-        def run_engine
-          @engine = Engine.new(@loop, @engine_name, @internal_engine_name)
-          @engine.start
-        end
-        def run_receiver
-          @receiver = create_receiver
-          @receiver.start
-        end
-        def shutdown_receiver
-          return if****@recei*****?
-          @receiver, receiver = nil, @receiver
-          receiver.shutdown
-        end
-        def run_control_io
-          @control_read = Coolio::IO.new(IO.new(@control_read_fd))
-          @control_read_fd = nil
-          on_read = lambda do |data|
-            # TODO: should buffer data to handle half line received case
-            data.each_line do |line|
-              case line
-              when "stop-graceful\n"
-                stop_graceful
-              when "stop-immediately\n"
-                stop_immediately
-              end
-            end
-          end
-          @control_read.on_read do |data|
-            on_read.call(data)
-          end
-          read_on_close = lambda do
-            if @control_read
-              @control_read = nil
-              stop_immediately
-            end
-          end
-          @control_read.on_close do
-            read_on_close.call
-          end
-          @loop.attach(@control_read)
-          @control_write = Coolio::IO.new(IO.new(@control_write_fd))
-          @control_write_fd = nil
-          write_on_close = lambda do
-            if @control_write
-              @control_write = nil
-              stop_immediately
-            end
-            @control_write_closed = true
-          end
-          @control_write.on_close do
-            write_on_close.call
-          end
-          @loop.attach(@control_write)
-          @control_write.write("ready\n")
-        end
-        def shutdown_control_io
-          if @control_write
-            @control_write, control_write = nil, @control_write
-            control_write.detach
-          end
-          if @control_read
-            @control_read, control_read = nil, @control_read
-            control_read.close
-          end
-        end
-        def create_receiver
-          options = {
-            :listen_fd => @listen_fd,
-            :heartbeat_fd => @heartbeat_fd,
-          }
-          FluentMessageReceiver.new(@loop, options) do |tag, time, record|
-            on_message(tag, time, record)
-          end
-        end
-        def on_message(tag, time, record)
-          prefix, type, *arguments = tag.split(/\./)
-          if type.nil? or type.empty? or type == "message"
-            message = record
-          else
-            message = {
-              "type" => type,
-              "arguments" => arguments,
-              "body" => record
-            }
-          end
-          reply_to = message["replyTo"]
-          if reply_to.is_a? String
-            message["replyTo"] = {
-              "type" => "#{message["type"]}.result",
-              "to" => reply_to
-            }
-          end
-          @engine.process(message)
-        end
-        def stop_graceful
-          return if @stopping
-          @stopping = true
-          shutdown_receiver
-          @engine.stop_graceful do
-            shutdown_control_io
-            shutdown_internal_message_receiver
-          end
-        end
-        # It may be called after stop_graceful.
-        def stop_immediately
-          shutdown_control_io
-          shutdown_receiver if @receiver
-          shutdown_internal_message_receiver
-          @engine.stop_immediately
-          @loop.stop
-        end
-        def log_tag
-          "service"
-        end
-      end

  Added: lib/droonga/command/droonga_engine_service.rb (+269 -0) 100644
--- /dev/null
+++ lib/droonga/command/droonga_engine_service.rb    2014-05-27 13:04:59 +0900 (48638bc)
@@ -0,0 +1,269 @@
+# Copyright (C) 2014 Droonga Project
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# Lesser General Public License for more details.
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+require "optparse"
+require "coolio"
+require "droonga/engine"
+require "droonga/fluent_message_receiver"
+require "droonga/internal_fluent_message_receiver"
+require "droonga/plugin_loader"
+module Droonga
+  module Command
+    class DroongaEngineService
+      class << self
+        def run(command_line_arguments)
+          new.run(command_line_arguments)
+        end
+      end
+      include Loggable
+      def initialize
+        @engine_name = nil
+        @listen_fd = nil
+        @heartbeat_fd = nil
+        @contrtol_read_fd = nil
+        @contrtol_write_fd = nil
+        @contrtol_write_closed = false
+      end
+      def run(command_line_arguments)
+        create_new_process_group
+        parse_command_line_arguments!(command_line_arguments)
+        PluginLoader.load_all
+        control_write_io = IO.new(@control_write_fd)
+        begin
+          run_services
+        rescue
+          logger.exception("failed to run services", $!)
+        ensure
+          unless @control_write_closed
+            control_write_io.write("finish\n")
+            control_write_io.close
+          end
+        end
+        true
+      end
+      private
+      def create_new_process_group
+        begin
+          Process.setsid
+        rescue SystemCallError, NotImplementedError
+        end
+      end
+      def parse_command_line_arguments!(command_line_arguments)
+        parser = OptionParser.new
+        add_internal_options(parser)
+        parser.parse!(command_line_arguments)
+      end
+      def add_internal_options(parser)
+        parser.separator("")
+        parser.separator("Internal:")
+        parser.on("--engine-name=NAME",
+                  "Use NAME as the name of the engine") do |name|
+          @engine_name = name
+        end
+        parser.on("--listen-fd=FD", Integer,
+                  "Use FD as the listen file descriptor") do |fd|
+          @listen_fd = fd
+        end
+        parser.on("--heartbeat-fd=FD", Integer,
+                  "Use FD as the heartbeat file descriptor") do |fd|
+          @heartbeat_fd = fd
+        end
+        parser.on("--control-read-fd=FD", Integer,
+                  "Use FD to read control messages from the service") do |fd|
+          @control_read_fd = fd
+        end
+        parser.on("--control-write-fd=FD", Integer,
+                  "Use FD to write control messages from the service") do |fd|
+          @control_write_fd = fd
+        end
+      end
+      def host
+        @engine_name.split(":", 2).first
+      end
+      def run_services
+        @stopping = false
+        @engine = nil
+        @receiver = nil
+        @loop = Coolio::Loop.default
+        run_internal_message_receiver
+        run_engine
+        run_receiver
+        run_control_io
+        @loop.run
+      end
+      def run_internal_message_receiver
+        @internal_message_receiver = create_internal_message_receiver
+        host, port = @internal_message_receiver.start
+        tag = @engine_name.split("/", 2).last.split(".", 2).first
+        @internal_engine_name = "#{host}:#{port}/#{tag}"
+      end
+      def create_internal_message_receiver
+        InternalFluentMessageReceiver.new(@loop, host) do |tag, time, record|
+          on_message(tag, time, record)
+        end
+      end
+      def shutdown_internal_message_receiver
+        return if @internal_message_receiver.nil?
+        @internal_message_receiver, receiver = nil, @internal_message_receiver
+        receiver.shutdown
+      end
+      def run_engine
+        @engine = Engine.new(@loop, @engine_name, @internal_engine_name)
+        @engine.start
+      end
+      def run_receiver
+        @receiver = create_receiver
+        @receiver.start
+      end
+      def shutdown_receiver
+        return if****@recei*****?
+        @receiver, receiver = nil, @receiver
+        receiver.shutdown
+      end
+      def run_control_io
+        @control_read = Coolio::IO.new(IO.new(@control_read_fd))
+        @control_read_fd = nil
+        on_read = lambda do |data|
+          # TODO: should buffer data to handle half line received case
+          data.each_line do |line|
+            case line
+            when "stop-graceful\n"
+              stop_graceful
+            when "stop-immediately\n"
+              stop_immediately
+            end
+          end
+        end
+        @control_read.on_read do |data|
+          on_read.call(data)
+        end
+        read_on_close = lambda do
+          if @control_read
+            @control_read = nil
+            stop_immediately
+          end
+        end
+        @control_read.on_close do
+          read_on_close.call
+        end
+        @loop.attach(@control_read)
+        @control_write = Coolio::IO.new(IO.new(@control_write_fd))
+        @control_write_fd = nil
+        write_on_close = lambda do
+          if @control_write
+            @control_write = nil
+            stop_immediately
+          end
+          @control_write_closed = true
+        end
+        @control_write.on_close do
+          write_on_close.call
+        end
+        @loop.attach(@control_write)
+        @control_write.write("ready\n")
+      end
+      def shutdown_control_io
+        if @control_write
+          @control_write, control_write = nil, @control_write
+          control_write.detach
+        end
+        if @control_read
+          @control_read, control_read = nil, @control_read
+          control_read.close
+        end
+      end
+      def create_receiver
+        options = {
+          :listen_fd => @listen_fd,
+          :heartbeat_fd => @heartbeat_fd,
+        }
+        FluentMessageReceiver.new(@loop, options) do |tag, time, record|
+          on_message(tag, time, record)
+        end
+      end
+      def on_message(tag, time, record)
+        prefix, type, *arguments = tag.split(/\./)
+        if type.nil? or type.empty? or type == "message"
+          message = record
+        else
+          message = {
+            "type" => type,
+            "arguments" => arguments,
+            "body" => record
+          }
+        end
+        reply_to = message["replyTo"]
+        if reply_to.is_a? String
+          message["replyTo"] = {
+            "type" => "#{message["type"]}.result",
+            "to" => reply_to
+          }
+        end
+        @engine.process(message)
+      end
+      def stop_graceful
+        return if @stopping
+        @stopping = true
+        shutdown_receiver
+        @engine.stop_graceful do
+          shutdown_control_io
+          shutdown_internal_message_receiver
+        end
+      end
+      # It may be called after stop_graceful.
+      def stop_immediately
+        shutdown_control_io
+        shutdown_receiver if @receiver
+        shutdown_internal_message_receiver
+        @engine.stop_immediately
+        @loop.stop
+      end
+      def log_tag
+        "droonga-engine-service"
+      end
+    end
+  end

  Modified: lib/droonga/internal_fluent_message_receiver.rb (+1 -0)
--- lib/droonga/internal_fluent_message_receiver.rb    2014-05-27 12:54:12 +0900 (e7a6bb0)
+++ lib/droonga/internal_fluent_message_receiver.rb    2014-05-27 13:04:59 +0900 (b005767)
@@ -14,6 +14,7 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 require "socket"
+require "ipaddr"
 require "droonga/fluent_message_receiver"
-------------- next part --------------

More information about the Groonga-commit mailing list
Back to archive index