Daijiro MORI
null+****@clear*****
Wed Aug 21 12:31:31 JST 2013
Daijiro MORI 2013-08-21 12:31:31 +0900 (Wed, 21 Aug 2013) New Revision: 03407526accdb79d0350fdc8e23b8b710bac9ade https://github.com/droonga/fluent-plugin-droonga/commit/03407526accdb79d0350fdc8e23b8b710bac9ade Message: Add Executor class to support synchronous execution. Copied files: lib/droonga/executor.rb (from lib/droonga/worker.rb) Modified files: lib/droonga/engine.rb lib/droonga/proxy.rb lib/droonga/server.rb lib/droonga/worker.rb Modified: lib/droonga/engine.rb (+27 -12) =================================================================== --- lib/droonga/engine.rb 2013-08-20 14:09:55 +0900 (53bc70c) +++ lib/droonga/engine.rb 2013-08-21 12:31:31 +0900 (f913f48) @@ -21,6 +21,7 @@ require "cool.io" require "droonga/server" require "droonga/worker" +require "droonga/executor" module Droonga class Engine @@ -29,6 +30,7 @@ module Droonga :queue_name => "DroongaQueue", :handlers => ["proxy"], :n_workers => 1, + :with_server => false } def initialize(options={}) @@ -36,26 +38,38 @@ module Droonga end def start - @message_input, @message_output = IO.pipe - @message_input.sync = true - @message_output.sync = true - start_supervisor - start_emitter + if @options[:n_workers] > 0 || @options[:with_server] + @message_input, @message_output = IO.pipe + @message_input.sync = true + @message_output.sync = true + start_supervisor + end + if @options[:with_server] + start_emitter + else + @executor = Executor.new(@options) + end end def shutdown $log.trace("engine: shutdown: start") - shutdown_emitter - shutdown_supervisor - @message_input.close unless @message_input.closed? - @message_output.close unless @message_output.closed? + shutdown_emitter if @emitter + if @supervisor + shutdown_supervisor + @message_input.close unless @message_input.closed? + @message_output.close unless @message_output.closed? + end $log.trace("engine: shutdown: done") end def emit(tag, time, record) $log.trace("tag: <#{tag}>") - @emitter.write(MessagePack.pack([tag, time, record])) - @loop_breaker.signal + if @executor + @executor.dispatch(tag, time, record) + else + @emitter.write(MessagePack.pack([tag, time, record])) + @loop_breaker.signal + end end private @@ -85,7 +99,8 @@ module Droonga end def start_supervisor - @supervisor = ServerEngine::Supervisor.new(Server, Worker) do + server = @options[:with_server] ? Server : nil + @supervisor = ServerEngine::Supervisor.new(server, Worker) do force_options = { :worker_type => "process", :workers => @options[:n_workers], Copied: lib/droonga/executor.rb (+18 -61) 78% =================================================================== --- lib/droonga/worker.rb 2013-08-20 14:09:55 +0900 (91287f1) +++ lib/droonga/executor.rb 2013-08-21 12:31:31 +0900 (3d2720b) @@ -26,51 +26,21 @@ require "droonga/catalog" require "droonga/proxy" module Droonga - module Worker + class Executor attr_reader :context, :envelope, :name - def initialize - @handlers = [] - @outputs = {} - @name = config[:name] - @database_name = config[:database] || "droonga/db" - @queue_name = config[:queue_name] || "DroongaQueue" - @handler_names = config[:handlers] || ["proxy"] - load_handlers - prepare - end - - def run - $log.trace("worker: run: start") - @queue = @context[@queue_name] - @running = true - while @running - $log.trace("worker: run: pull_message: start") - message = pull_message - $log.trace("worker: run: pull_message: done") - next unless message - body, command, arguments = parse_message(message) - handler = find_handler(command) - handler.handle(command, body, *arguments) if handler - end - @handlers.each do |handler| - handler.shutdown - end - @outputs.each do |dest, output| - output[:logger].close if output[:logger] - end - @queue = nil - @database.close - @context.close - @database = @context = nil - $log.trace("worker: run: done") - end - - def stop - $log.trace("worker: stop: start") - @running = false - $log.trace("worker: stop: done") - end + def initialize(options={}) + @handlers = [] + @outputs = {} + @name = options[:name] + @database_name = options[:database] || "droonga/db" + @queue_name = options[:queue_name] || "DroongaQueue" + Droonga::JobQueue.ensure_schema(@database_name, @queue_name) + @handler_names = options[:handlers] || ["proxy"] + load_handlers + @pool_size = options[:pool_size] || 1 + prepare + end def add_handler(name) plugin = HandlerPlugin.new(name) @@ -81,29 +51,16 @@ module Droonga envelope["via"].push(route) end + def dispatch(*message) + body, type, arguments = parse_message(message) + post_or_push(message, body, "type" => type, "arguments" => arguments) + end + def post(body, destination=nil) post_or_push(nil, body, destination) end private - def shutdown_workers - @pool.each do |pid| - Process.kill(:TERM, pid) - end - queue = @context[@queue_name] - 3.times do |i| - break if****@pool*****? - queue.unblock - @pool.reject! do |pid| - not Process.waitpid(pid, Process::WNOHANG).nil? - end - sleep(i ** 2 * 0.1) - end - @pool.each do |pid| - Process.kill(:KILL, pid) - end - end - def post_or_push(message, body, destination) route = nil unless destination Modified: lib/droonga/proxy.rb (+1 -0) =================================================================== --- lib/droonga/proxy.rb 2013-08-20 14:09:55 +0900 (6527e43) +++ lib/droonga/proxy.rb 2013-08-21 12:31:31 +0900 (8138eee) @@ -245,6 +245,7 @@ module Droonga if command # TODO: should be controllable for each command respectively. synchronous = !n_of_expects.zero? + # TODO: check if asynchronous execution is available. message = { "task"=>task, "name"=>name, Modified: lib/droonga/server.rb (+3 -5) =================================================================== --- lib/droonga/server.rb 2013-08-20 14:09:55 +0900 (7119de5) +++ lib/droonga/server.rb 2013-08-21 12:31:31 +0900 (1b8186f) @@ -20,6 +20,7 @@ require "cool.io" require "groonga" require "droonga/job_queue" +require "droonga/executor" module Droonga module Server @@ -70,6 +71,7 @@ module Droonga @database_name = config[:database] || "droonga/db" @queue_name = config[:queue_name] || "DroongaQueue" Droonga::JobQueue.ensure_schema(@database_name, @queue_name) + @executor = Executor.new(config) end def before_run @@ -80,11 +82,7 @@ module Droonga @receiver_thread = Thread.new do @receiver.run do |message| $log.trace("received: start") - packed_message = message.to_msgpack - queue = @context[@queue_name] - queue.push do |record| - record.message = packed_message - end + @executor.dispatch(*message) $log.trace("received: done") end end Modified: lib/droonga/worker.rb (+1 -0) =================================================================== --- lib/droonga/worker.rb 2013-08-20 14:09:55 +0900 (91287f1) +++ lib/droonga/worker.rb 2013-08-21 12:31:31 +0900 (47a7626) @@ -36,6 +36,7 @@ module Droonga @database_name = config[:database] || "droonga/db" @queue_name = config[:queue_name] || "DroongaQueue" @handler_names = config[:handlers] || ["proxy"] + @pool_size = config[:n_workers] load_handlers prepare end -------------- next part -------------- HTML����������������������������...다운로드