[Groonga-commit] droonga/droonga-client-ruby at fae80d7 [master] Support async request

Back to archive index

Kouhei Sutou null+****@clear*****
Tue Dec 17 19:14:37 JST 2013


Kouhei Sutou	2013-12-17 19:14:37 +0900 (Tue, 17 Dec 2013)

  New Revision: fae80d76ba6f31110653d59c4bf58caa2be2ac56
  https://github.com/droonga/droonga-client-ruby/commit/fae80d76ba6f31110653d59c4bf58caa2be2ac56

  Message:
    Support async request
    
    It breaks backward compatibility. Sorry...
    
        client.connection.send(message, :response => :none)
        # ->
        client.connection.send(message)
    
        client.connection.send(message, :response => :one)
        # ->
        client.connection.execute(message)

  Modified files:
    lib/droonga/client/connection/droonga_protocol.rb

  Modified: lib/droonga/client/connection/droonga_protocol.rb (+78 -22)
===================================================================
--- lib/droonga/client/connection/droonga_protocol.rb    2013-12-09 17:25:02 +0900 (d22366f)
+++ lib/droonga/client/connection/droonga_protocol.rb    2013-12-17 19:14:37 +0900 (a6aca48)
@@ -16,6 +16,7 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 require "socket"
+require "thread"
 require "msgpack"
 require "fluent-logger"
 
@@ -23,6 +24,16 @@ module Droonga
   class Client
     module Connection
       class DroongaProtocol
+        class Request
+          def initialize(thread)
+            @thread = thread
+          end
+
+          def wait
+            @thread.join
+          end
+        end
+
         def initialize(options={})
           default_options = {
             :tag             => "droonga",
@@ -38,7 +49,7 @@ module Droonga
           @read_timeout = options[:read_timeout]
         end
 
-        def search(body)
+        def search(body, &block)
           envelope = {
             "id"         => Time.now.to_f.to_s,
             "date"       => Time.now,
@@ -46,38 +57,83 @@ module Droonga
             "type"       => "search",
             "body"       => body,
           }
-          send(envelope, :response => :one)
+          execute(envelope, &block)
         end
 
-        # Sends low level request. Normally, you should use other
-        # convenience methods.
+        # Sends a request message and receives one ore more response
+        # messages.
         #
-        # @param envelope [Hash] Request envelope.
-        # @param options [Hash] The options to send request.
-        # @option options :response [nil, :none, :one] (nil) The response type.
-        #   If you specify `nil`, it is treated as `:one`.
-        # @return The response. TODO: WRITE ME
-        def send(envelope, options={})
-          response_type = options[:response] || :one
-          case response_type
-          when :none
-            @logger.post("message", envelope)
-          when :one
-            receiver = Receiver.new
+        # @overload execute(message, options={})
+        #   Executes the request message synchronously.
+        #
+        #   @param message [Hash] Request message.
+        #   @param options [Hash] The options to executes a request.
+        #      TODO: WRITE ME
+        #
+        #   @return [Object] The response. TODO: WRITE ME
+        #
+        # @overload execute(message, options={}, &block)
+        #   Executes the passed request message asynchronously.
+        #
+        #   @param message [Hash] Request message.
+        #   @param options [Hash] The options to executes a request.
+        #      TODO: WRITE ME
+        #   @yield [response]
+        #      The block is called when response is received.
+        #   @yieldparam [Object] response
+        #      The response.
+        #   @yieldreturn [Boolean]
+        #      true if you want to wait more responses,
+        #      false otherwise.
+        #
+        #   @return [Request] The request object.
+        def execute(message, options={}, &block)
+          receiver = Receiver.new
+          message = message.dup
+          message["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga"
+          send(message, options)
+
+          connect_timeout = options[:connect_timeout] || @connect_timeout
+          read_timeout = options[:read_timeout] || @read_timeout
+          receive_options = {
+            :connect_timeout => connect_timeout,
+            :read_timeout    => read_timeout
+          }
+          sync = block.nil?
+          if sync
             begin
-              envelope = envelope.dup
-              envelope["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga"
-              @logger.post("message", envelope)
-              receiver.receive(:connect_timeout => @connect_timeout,
-                               :read_timeout    => @read_timeout)
+              receiver.receive(receive_options)
             ensure
               receiver.close
             end
           else
-            raise InvalidResponseType.new(response_type)
+            thread = Thread.new do
+              begin
+                loop do
+                  response = receiver.receive(receive_options)
+                  break if response.nil?
+                  continue_p = yield(response)
+                  break unless continue_p
+                end
+              ensure
+                receiver.close
+              end
+            end
+            Request.new(thread)
           end
         end
 
+        # Sends low level request. Normally, you should use other
+        # convenience methods.
+        #
+        # @param envelope [Hash] Request envelope.
+        # @param options [Hash] The options to send request.
+        #   TODO: WRITE ME
+        # @return [void]
+        def send(envelope, options={}, &block)
+          @logger.post("message", envelope)
+        end
+
         # Close the connection. This connection can't be used anymore.
         #
         # @return [void]
-------------- next part --------------
HTML����������������������������...
다운로드 



More information about the Groonga-commit mailing list
Back to archive index