Kouhei Sutou
null+****@clear*****
Tue Dec 17 19:14:37 JST 2013
Kouhei Sutou 2013-12-17 19:14:37 +0900 (Tue, 17 Dec 2013) New Revision: fae80d76ba6f31110653d59c4bf58caa2be2ac56 https://github.com/droonga/droonga-client-ruby/commit/fae80d76ba6f31110653d59c4bf58caa2be2ac56 Message: Support async request It breaks backward compatibility. Sorry... client.connection.send(message, :response => :none) # -> client.connection.send(message) client.connection.send(message, :response => :one) # -> client.connection.execute(message) Modified files: lib/droonga/client/connection/droonga_protocol.rb Modified: lib/droonga/client/connection/droonga_protocol.rb (+78 -22) =================================================================== --- lib/droonga/client/connection/droonga_protocol.rb 2013-12-09 17:25:02 +0900 (d22366f) +++ lib/droonga/client/connection/droonga_protocol.rb 2013-12-17 19:14:37 +0900 (a6aca48) @@ -16,6 +16,7 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA require "socket" +require "thread" require "msgpack" require "fluent-logger" @@ -23,6 +24,16 @@ module Droonga class Client module Connection class DroongaProtocol + class Request + def initialize(thread) + @thread = thread + end + + def wait + @thread.join + end + end + def initialize(options={}) default_options = { :tag => "droonga", @@ -38,7 +49,7 @@ module Droonga @read_timeout = options[:read_timeout] end - def search(body) + def search(body, &block) envelope = { "id" => Time.now.to_f.to_s, "date" => Time.now, @@ -46,38 +57,83 @@ module Droonga "type" => "search", "body" => body, } - send(envelope, :response => :one) + execute(envelope, &block) end - # Sends low level request. Normally, you should use other - # convenience methods. + # Sends a request message and receives one ore more response + # messages. # - # @param envelope [Hash] Request envelope. - # @param options [Hash] The options to send request. - # @option options :response [nil, :none, :one] (nil) The response type. - # If you specify `nil`, it is treated as `:one`. - # @return The response. TODO: WRITE ME - def send(envelope, options={}) - response_type = options[:response] || :one - case response_type - when :none - @logger.post("message", envelope) - when :one - receiver = Receiver.new + # @overload execute(message, options={}) + # Executes the request message synchronously. + # + # @param message [Hash] Request message. + # @param options [Hash] The options to executes a request. + # TODO: WRITE ME + # + # @return [Object] The response. TODO: WRITE ME + # + # @overload execute(message, options={}, &block) + # Executes the passed request message asynchronously. + # + # @param message [Hash] Request message. + # @param options [Hash] The options to executes a request. + # TODO: WRITE ME + # @yield [response] + # The block is called when response is received. + # @yieldparam [Object] response + # The response. + # @yieldreturn [Boolean] + # true if you want to wait more responses, + # false otherwise. + # + # @return [Request] The request object. + def execute(message, options={}, &block) + receiver = Receiver.new + message = message.dup + message["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga" + send(message, options) + + connect_timeout = options[:connect_timeout] || @connect_timeout + read_timeout = options[:read_timeout] || @read_timeout + receive_options = { + :connect_timeout => connect_timeout, + :read_timeout => read_timeout + } + sync = block.nil? + if sync begin - envelope = envelope.dup - envelope["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga" - @logger.post("message", envelope) - receiver.receive(:connect_timeout => @connect_timeout, - :read_timeout => @read_timeout) + receiver.receive(receive_options) ensure receiver.close end else - raise InvalidResponseType.new(response_type) + thread = Thread.new do + begin + loop do + response = receiver.receive(receive_options) + break if response.nil? + continue_p = yield(response) + break unless continue_p + end + ensure + receiver.close + end + end + Request.new(thread) end end + # Sends low level request. Normally, you should use other + # convenience methods. + # + # @param envelope [Hash] Request envelope. + # @param options [Hash] The options to send request. + # TODO: WRITE ME + # @return [void] + def send(envelope, options={}, &block) + @logger.post("message", envelope) + end + # Close the connection. This connection can't be used anymore. # # @return [void] -------------- next part -------------- HTML����������������������������...다운로드