Kouhei Sutou 2019-04-12 15:26:03 +0900 (Fri, 12 Apr 2019) Revision: fb51480071090c1a84da407c123fd84197beab10 https://github.com/groonga/groonga/commit/fb51480071090c1a84da407c123fd84197beab10 Message: logical_select: add support for window function over shards Added files: include/groonga/window_function_executor.h lib/grn_window_function_executor.h lib/mrb/mrb_window_function_executor.c lib/window_function_executor.c Copied files: lib/mrb/mrb_window_function_executor.h (from lib/grn_window_function.h) Modified files: include/groonga.h include/groonga/Makefile.am include/groonga/window_function.h lib/c_sources.am lib/ctx_impl_mrb.c lib/grn_window_function.h lib/mrb/sources.am lib/window_function.c lib/window_functions.c plugins/sharding/dynamic_columns.rb plugins/sharding/logical_range_filter.rb plugins/sharding/logical_select.rb test/command/suite/sharding/logical_select/cache/columns/window/group_keys.expected test/command/suite/sharding/logical_select/cache/columns/window/sort_keys.expected test/command/suite/sharding/logical_select/columns/stage/initial/range.expected Modified: include/groonga.h (+1 -0) =================================================================== --- include/groonga.h 2019-04-10 14:52:22 +0900 (266de1fcb) +++ include/groonga.h 2019-04-12 15:26:03 +0900 (88de19f31) @@ -62,6 +62,7 @@ #include "groonga/type.h" #include "groonga/util.h" #include "groonga/window_function.h" +#include "groonga/window_function_executor.h" #include "groonga/windows.h" #include "groonga/windows_event_logger.h" #include "groonga/vector.h" Modified: include/groonga/Makefile.am (+1 -0) =================================================================== --- include/groonga/Makefile.am 2019-04-10 14:52:22 +0900 (3987dc29a) +++ include/groonga/Makefile.am 2019-04-12 15:26:03 +0900 (e2b047d06) @@ -50,6 +50,7 @@ groonga_include_HEADERS = \ normalizer.h \ util.h \ window_function.h \ + window_function_executor.h \ windows.h \ windows_event_logger.h \ vector.h \ Modified: include/groonga/window_function.h (+15 -6) =================================================================== --- include/groonga/window_function.h 2019-04-10 14:52:22 +0900 (404fd04bb) +++ include/groonga/window_function.h 2019-04-12 15:26:03 +0900 (fa8068557) @@ -1,5 +1,6 @@ /* Copyright(C) 2016 Brazil + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -38,8 +39,15 @@ GRN_API grn_rc grn_window_set_direction(grn_ctx *ctx, grn_window_direction direction); GRN_API grn_obj *grn_window_get_table(grn_ctx *ctx, grn_window *window); -GRN_API grn_bool grn_window_is_sorted(grn_ctx *ctx, - grn_window *window); +GRN_API grn_obj *grn_window_get_output_column(grn_ctx *ctx, + grn_window *window); +GRN_API size_t grn_window_get_n_arguments(grn_ctx *ctx, + grn_window *window); +GRN_API grn_obj *grn_window_get_argument(grn_ctx *ctx, + grn_window *window, + size_t i); +GRN_API bool grn_window_is_sorted(grn_ctx *ctx, + grn_window *window); GRN_API size_t grn_window_get_size(grn_ctx *ctx, grn_window *window); @@ -51,17 +59,18 @@ typedef struct _grn_window_definition { } grn_window_definition; typedef grn_rc grn_window_function_func(grn_ctx *ctx, - grn_obj *output_column, + grn_obj *first_output_column, grn_window *window, - grn_obj **args, - int n_args); + grn_obj **first_args, + int first_n_args); GRN_API grn_obj *grn_window_function_create(grn_ctx *ctx, const char *name, int name_size, grn_window_function_func func); - +/* Deprecated since 9.0.2. + Use grn_window_function_executor() instead. */ GRN_API grn_rc grn_table_apply_window_function(grn_ctx *ctx, grn_obj *table, grn_obj *output_column, Added: include/groonga/window_function_executor.h (+64 -0) 100644 =================================================================== --- /dev/null +++ include/groonga/window_function_executor.h 2019-04-12 15:26:03 +0900 (2612b73ef) @@ -0,0 +1,64 @@ +/* + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _grn_window_function_executor grn_window_function_executor; + +GRN_API grn_window_function_executor * +grn_window_function_executor_open(grn_ctx *ctx); +GRN_API grn_rc +grn_window_function_executor_close(grn_ctx *ctx, + grn_window_function_executor *executor); +GRN_API grn_rc +grn_window_function_executor_add_table(grn_ctx *ctx, + grn_window_function_executor *executor, + grn_obj *table); +GRN_API grn_rc +grn_window_function_executor_set_source(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *source, + size_t source_size); +GRN_API grn_rc +grn_window_function_executor_set_sort_keys(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *sort_keys, + size_t sort_keys_size); +GRN_API grn_rc +grn_window_function_executor_set_group_keys(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *group_keys, + size_t group_keys_size); + +GRN_API grn_rc +grn_window_function_executor_set_output_column_name(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *name, + size_t name_size); + +GRN_API grn_rc +grn_window_function_executor_execute(grn_ctx *ctx, + grn_window_function_executor *executor); + +#ifdef __cplusplus +} +#endif Modified: lib/c_sources.am (+2 -0) =================================================================== --- lib/c_sources.am 2019-04-10 14:52:22 +0900 (806aa674d) +++ lib/c_sources.am 2019-04-12 15:26:03 +0900 (1c84f4979) @@ -131,5 +131,7 @@ libgroonga_c_sources = \ file_reader.c \ window_function.c \ grn_window_function.h \ + window_function_executor.c \ + grn_window_function_executor.h \ window_functions.c \ grn_window_functions.h Modified: lib/ctx_impl_mrb.c (+3 -1) =================================================================== --- lib/ctx_impl_mrb.c 2019-04-10 14:52:22 +0900 (9800ed1f3) +++ lib/ctx_impl_mrb.c 2019-04-12 15:26:03 +0900 (e762b3e59) @@ -1,7 +1,7 @@ /* -*- c-basic-offset: 2 -*- */ /* Copyright(C) 2013-2018 Brazil - Copyright(C) 2018 Kouhei Sutou <kou****@clear*****> + Copyright(C) 2018-2019 Kouhei Sutou <kou****@clear*****> This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -71,6 +71,7 @@ # include "mrb/mrb_eval_context.h" # include "mrb/mrb_thread.h" # include "mrb/mrb_window_definition.h" +# include "mrb/mrb_window_function_executor.h" # include "mrb/mrb_locale_output.h" # include "mrb/mrb_output_columns.h" @@ -206,6 +207,7 @@ mrb_groonga_init(mrb_state *mrb, mrb_value self) grn_mrb_eval_context_init(ctx); grn_mrb_thread_init(ctx); grn_mrb_window_definition_init(ctx); + grn_mrb_window_function_executor_init(ctx); grn_mrb_locale_output_init(ctx); grn_mrb_output_columns_init(ctx); Modified: lib/grn_window_function.h (+29 -8) =================================================================== --- lib/grn_window_function.h 2019-04-10 14:52:22 +0900 (e51798487) +++ lib/grn_window_function.h 2019-04-12 15:26:03 +0900 (799202f1d) @@ -1,6 +1,7 @@ /* -*- c-basic-offset: 2 -*- */ /* Copyright(C) 2016-2017 Brazil + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -17,23 +18,43 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include "grn_db.h" + #pragma once -struct _grn_window { +typedef struct { grn_obj *table; - grn_obj *grouped_table; - grn_obj ids; - size_t n_ids; + grn_obj *window_function_call; + grn_proc *window_function; + grn_obj *arguments; + grn_obj *output_column; + grn_obj *ids; ssize_t current_index; +} grn_window_shard; + +struct _grn_window { + grn_window_shard *shards; + size_t n_shards; + ssize_t current_shard; grn_window_direction direction; - grn_bool is_sorted; + bool is_sorted; }; grn_rc grn_window_init(grn_ctx *ctx, - grn_window *window, - grn_obj *table, - grn_bool is_sorted); + grn_window *window); grn_rc grn_window_fin(grn_ctx *ctx, grn_window *window); +grn_rc grn_window_reset(grn_ctx *ctx, grn_window *window); +grn_rc grn_window_add_record(grn_ctx *ctx, + grn_window *window, + grn_obj *table, + grn_id record_id, + grn_obj *window_function_call, + grn_obj *output_column); +bool grn_window_is_empty(grn_ctx *ctx, grn_window *window); +grn_rc grn_window_set_is_sorted(grn_ctx *ctx, + grn_window *window, + bool is_sorted); +grn_rc grn_window_execute(grn_ctx *ctx, grn_window *window); #ifdef __cplusplus } Added: lib/grn_window_function_executor.h (+62 -0) 100644 =================================================================== --- /dev/null +++ lib/grn_window_function_executor.h 2019-04-12 15:26:03 +0900 (cf14f2fb8) @@ -0,0 +1,62 @@ +/* -*- c-basic-offset: 2 -*- */ +/* + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#pragma once + +#include "grn_window_function.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct _grn_window_function_executor { + grn_obj tables; + grn_obj source; + grn_obj sort_keys; + grn_obj group_keys; + grn_obj output_column_name; + struct { + grn_table_sort_key *sort_keys; + size_t n_sort_keys; + grn_table_sort_key *group_keys; + size_t n_group_keys; + grn_table_sort_key *window_sort_keys; + size_t n_window_sort_keys; + grn_obj *sorted; + } context; + struct { + size_t n; + grn_obj *previous; + grn_obj *current; + } values; + grn_obj window_function_calls; + grn_obj output_columns; + grn_window window; +}; + +grn_rc +grn_window_function_executor_init(grn_ctx *ctx, + grn_window_function_executor *executor); +grn_rc +grn_window_function_executor_fin(grn_ctx *ctx, + grn_window_function_executor *executor); + + +#ifdef __cplusplus +} +#endif Added: lib/mrb/mrb_window_function_executor.c (+203 -0) 100644 =================================================================== --- /dev/null +++ lib/mrb/mrb_window_function_executor.c 2019-04-12 15:26:03 +0900 (44532e25a) @@ -0,0 +1,203 @@ +/* -*- c-basic-offset: 2 -*- */ +/* + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "../grn_ctx_impl.h" + +#ifdef GRN_WITH_MRUBY +#include <mruby.h> +#include <mruby/class.h> +#include <mruby/data.h> + +#include "mrb_converter.h" +#include "mrb_ctx.h" +#include "mrb_window_function_executor.h" + +static void +mrb_grn_window_function_executor_free(mrb_state *mrb, void *data) +{ + grn_window_function_executor *executor = data; + + if (!executor) { + return; + } + + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor_close(ctx, executor); +} + +static struct mrb_data_type mrb_grn_window_function_executor_type = { + "Groonga::WindowFunctionExecutor", + mrb_grn_window_function_executor_free +}; + +static mrb_value +mrb_grn_window_function_executor_initialize(mrb_state *mrb, mrb_value self) +{ + DATA_TYPE(self) = &mrb_grn_window_function_executor_type; + + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor *executor = + grn_window_function_executor_open(ctx); + grn_mrb_ctx_check(mrb); + DATA_PTR(self) = executor; + + return self; +} + +static mrb_value +mrb_grn_window_function_executor_close(mrb_state *mrb, mrb_value self) +{ + grn_window_function_executor *executor = DATA_PTR(self); + if (executor) { + mrb_grn_window_function_executor_free(mrb, executor); + DATA_PTR(self) = NULL; + } + + return mrb_nil_value(); +} + +static mrb_value +mrb_grn_window_function_executor_set_source(mrb_state *mrb, mrb_value self) +{ + char *source; + mrb_int source_size; + mrb_get_args(mrb, "s!", &source, &source_size); + + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor *executor = DATA_PTR(self); + grn_window_function_executor_set_source(ctx, executor, source, source_size); + grn_mrb_ctx_check(mrb); + + return mrb_nil_value(); +} + +static mrb_value +mrb_grn_window_function_executor_add_table(mrb_state *mrb, mrb_value self) +{ + mrb_value mrb_table; + mrb_get_args(mrb, "o", &mrb_table); + + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor *executor = DATA_PTR(self); + grn_obj *table = GRN_MRB_DATA_PTR(mrb_table); + grn_window_function_executor_add_table(ctx, executor, table); + grn_mrb_ctx_check(mrb); + + return mrb_nil_value(); +} + +static mrb_value +mrb_grn_window_function_executor_set_sort_keys(mrb_state *mrb, mrb_value self) +{ + char *keys; + mrb_int keys_size; + mrb_get_args(mrb, "s!", &keys, &keys_size); + + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor *executor = DATA_PTR(self); + grn_window_function_executor_set_sort_keys(ctx, executor, keys, keys_size); + grn_mrb_ctx_check(mrb); + + return mrb_nil_value(); +} + +static mrb_value +mrb_grn_window_function_executor_set_group_keys(mrb_state *mrb, mrb_value self) +{ + char *keys; + mrb_int keys_size; + mrb_get_args(mrb, "s!", &keys, &keys_size); + + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor *executor = DATA_PTR(self); + grn_window_function_executor_set_group_keys(ctx, executor, keys, keys_size); + grn_mrb_ctx_check(mrb); + + return mrb_nil_value(); +} + +static mrb_value +mrb_grn_window_function_executor_set_output_column_name(mrb_state *mrb, + mrb_value self) +{ + char *name; + mrb_int name_size; + mrb_get_args(mrb, "s!", &name, &name_size); + + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor *executor = DATA_PTR(self); + grn_window_function_executor_set_output_column_name(ctx, + executor, + name, + name_size); + grn_mrb_ctx_check(mrb); + + return mrb_nil_value(); +} + +static mrb_value +mrb_grn_window_function_executor_execute(mrb_state *mrb, mrb_value self) +{ + grn_ctx *ctx = (grn_ctx *)mrb->ud; + grn_window_function_executor *executor = DATA_PTR(self); + grn_window_function_executor_execute(ctx, executor); + grn_mrb_ctx_check(mrb); + + return mrb_nil_value(); +} + +void +grn_mrb_window_function_executor_init(grn_ctx *ctx) +{ + grn_mrb_data *data = &(ctx->impl->mrb); + mrb_state *mrb = data->state; + struct RClass *module = data->module; + struct RClass *klass; + + klass = mrb_define_class_under(mrb, module, "WindowFunctionExecutor", + mrb->object_class); + MRB_SET_INSTANCE_TT(klass, MRB_TT_DATA); + + mrb_define_method(mrb, klass, "initialize", + mrb_grn_window_function_executor_initialize, + MRB_ARGS_NONE()); + + mrb_define_method(mrb, klass, "close", + mrb_grn_window_function_executor_close, MRB_ARGS_NONE()); + + mrb_define_method(mrb, klass, "add_table", + mrb_grn_window_function_executor_add_table, + MRB_ARGS_REQ(1)); + mrb_define_method(mrb, klass, "source=", + mrb_grn_window_function_executor_set_source, + MRB_ARGS_REQ(1)); + mrb_define_method(mrb, klass, "sort_keys=", + mrb_grn_window_function_executor_set_sort_keys, + MRB_ARGS_REQ(1)); + mrb_define_method(mrb, klass, "group_keys=", + mrb_grn_window_function_executor_set_group_keys, + MRB_ARGS_REQ(1)); + mrb_define_method(mrb, klass, "output_column_name=", + mrb_grn_window_function_executor_set_output_column_name, + MRB_ARGS_REQ(1)); + + mrb_define_method(mrb, klass, "execute", + mrb_grn_window_function_executor_execute, + MRB_ARGS_NONE()); +} +#endif Copied: lib/mrb/mrb_window_function_executor.h (+10 -18) 54% =================================================================== --- lib/grn_window_function.h 2019-04-10 14:52:22 +0900 (e51798487) +++ lib/mrb/mrb_window_function_executor.h 2019-04-12 15:26:03 +0900 (a0ea38406) @@ -1,11 +1,10 @@ /* -*- c-basic-offset: 2 -*- */ /* - Copyright(C) 2016-2017 Brazil + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. + License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -19,22 +18,15 @@ #pragma once -struct _grn_window { - grn_obj *table; - grn_obj *grouped_table; - grn_obj ids; - size_t n_ids; - ssize_t current_index; - grn_window_direction direction; - grn_bool is_sorted; -}; - -grn_rc grn_window_init(grn_ctx *ctx, - grn_window *window, - grn_obj *table, - grn_bool is_sorted); -grn_rc grn_window_fin(grn_ctx *ctx, grn_window *window); +#include "../grn_ctx.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void grn_mrb_window_function_executor_init(grn_ctx *ctx); #ifdef __cplusplus } #endif + Modified: lib/mrb/sources.am (+2 -0) =================================================================== --- lib/mrb/sources.am 2019-04-10 14:52:22 +0900 (4077112ef) +++ lib/mrb/sources.am 2019-04-12 15:26:03 +0900 (aaf99e7a2) @@ -93,5 +93,7 @@ libgrnmrb_la_SOURCES = \ mrb_void.h \ mrb_window_definition.c \ mrb_window_definition.h \ + mrb_window_function_executor.c \ + mrb_window_function_executor.h \ mrb_writer.c \ mrb_writer.h Modified: lib/window_function.c (+453 -238) =================================================================== --- lib/window_function.c 2019-04-10 14:52:22 +0900 (f62164958) +++ lib/window_function.c 2019-04-12 15:26:03 +0900 (b291f1319) @@ -1,6 +1,7 @@ /* -*- c-basic-offset: 2 -*- */ /* Copyright(C) 2016-2017 Brazil + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -23,20 +24,46 @@ #include <string.h> +static void +grn_window_shard_init(grn_ctx *ctx, + grn_window_shard *shard, + grn_obj *table, + grn_obj *window_function_call, + grn_obj *output_column) +{ + shard->table = table; + shard->window_function_call = window_function_call; + grn_expr *expr = (grn_expr *)(window_function_call); + shard->window_function = (grn_proc *)(expr->codes[0].value); + shard->arguments = grn_obj_open(ctx, GRN_UVECTOR, 0, GRN_ID_NIL); + int32_t n = expr->codes_curr - 1; + for (int32_t i = 1; i < n; i++) { + /* TODO: Check op. */ + GRN_PTR_PUT(ctx, shard->arguments, expr->codes[i].value); + } + shard->output_column = output_column; + shard->ids = grn_obj_open(ctx, GRN_UVECTOR, 0, grn_obj_id(ctx, table)); + shard->current_index = -1; +} + +static void +grn_window_shard_fin(grn_ctx *ctx, + grn_window_shard *shard) +{ + grn_obj_close(ctx, shard->arguments); + grn_obj_close(ctx, shard->ids); +} + grn_rc grn_window_init(grn_ctx *ctx, - grn_window *window, - grn_obj *table, - grn_bool is_sorted) + grn_window *window) { GRN_API_ENTER; - window->table = table; - GRN_RECORD_INIT(&(window->ids), GRN_OBJ_VECTOR, grn_obj_id(ctx, table)); - window->n_ids = 0; - window->current_index = 0; + window->shards = NULL; + window->n_shards = 0; + window->current_shard = -1; window->direction = GRN_WINDOW_DIRECTION_ASCENDING; - window->is_sorted = is_sorted; GRN_API_RETURN(GRN_SUCCESS); } @@ -46,37 +73,50 @@ grn_window_fin(grn_ctx *ctx, grn_window *window) { GRN_API_ENTER; - GRN_OBJ_FIN(ctx, &(window->ids)); + grn_window_reset(ctx, window); - GRN_API_RETURN(GRN_SUCCESS); + GRN_API_RETURN(ctx->rc); } grn_id grn_window_next(grn_ctx *ctx, grn_window *window) { - grn_id next_id; - GRN_API_ENTER; if (!window) { GRN_API_RETURN(GRN_ID_NIL); } + if (window->current_shard < 0) { + GRN_API_RETURN(GRN_ID_NIL); + } + + grn_window_shard *shard = &(window->shards[window->current_shard]); if (window->direction == GRN_WINDOW_DIRECTION_ASCENDING) { - if (window->current_index >= window->n_ids) { - GRN_API_RETURN(GRN_ID_NIL); + if (shard->current_index >= GRN_RECORD_VECTOR_SIZE(shard->ids)) { + if (window->current_shard + 1 < window->n_shards) { + window->current_shard++; + shard = &(window->shards[window->current_shard]); + } else { + GRN_API_RETURN(GRN_ID_NIL); + } } } else { - if (window->current_index < 0) { - GRN_API_RETURN(GRN_ID_NIL); + if (shard->current_index < 0) { + if (window->current_shard > 0) { + window->current_shard--; + shard = &(window->shards[window->current_shard]); + } else { + GRN_API_RETURN(GRN_ID_NIL); + } } } - next_id = GRN_RECORD_VALUE_AT(&(window->ids), window->current_index); + grn_id next_id = GRN_RECORD_VALUE_AT(shard->ids, shard->current_index); if (window->direction == GRN_WINDOW_DIRECTION_ASCENDING) { - window->current_index++; + shard->current_index++; } else { - window->current_index--; + shard->current_index--; } GRN_API_RETURN(next_id); @@ -93,9 +133,17 @@ grn_window_rewind(grn_ctx *ctx, grn_window *window) } if (window->direction == GRN_WINDOW_DIRECTION_ASCENDING) { - window->current_index = 0; + window->current_shard = 0; + for (size_t i = 0; i < window->n_shards; i++) { + grn_window_shard *shard = &(window->shards[i]); + shard->current_index = 0; + } } else { - window->current_index = window->n_ids - 1; + window->current_shard = window->n_shards - 1; + for (size_t i = 0; i < window->n_shards; i++) { + grn_window_shard *shard = &(window->shards[i]); + shard->current_index = GRN_RECORD_VECTOR_SIZE(shard->ids) - 1; + } } GRN_API_RETURN(GRN_SUCCESS); @@ -107,11 +155,74 @@ grn_window_get_table(grn_ctx *ctx, grn_window *window) GRN_API_ENTER; if (!window) { - ERR(GRN_INVALID_ARGUMENT, "[window][rewind] window is NULL"); + ERR(GRN_INVALID_ARGUMENT, "[window][table][get] window is NULL"); + GRN_API_RETURN(NULL); + } + + if (window->current_shard < 0) { GRN_API_RETURN(NULL); } - GRN_API_RETURN(window->table); + grn_window_shard *shard = &(window->shards[window->current_shard]); + GRN_API_RETURN(shard->table); +} + +grn_obj * +grn_window_get_output_column(grn_ctx *ctx, grn_window *window) +{ + GRN_API_ENTER; + + if (!window) { + ERR(GRN_INVALID_ARGUMENT, "[window][output-column][get] window is NULL"); + GRN_API_RETURN(NULL); + } + + if (window->current_shard < 0) { + GRN_API_RETURN(NULL); + } + + grn_window_shard *shard = &(window->shards[window->current_shard]); + GRN_API_RETURN(shard->output_column); +} + +size_t +grn_window_get_n_arguments(grn_ctx *ctx, grn_window *window) +{ + GRN_API_ENTER; + + if (!window) { + ERR(GRN_INVALID_ARGUMENT, "[window][n-arguments][get] window is NULL"); + GRN_API_RETURN(0); + } + + if (window->current_shard < 0) { + GRN_API_RETURN(0); + } + + grn_window_shard *shard = &(window->shards[window->current_shard]); + GRN_API_RETURN(GRN_PTR_VECTOR_SIZE(shard->arguments)); +} + +grn_obj * +grn_window_get_argument(grn_ctx *ctx, grn_window *window, size_t i) +{ + GRN_API_ENTER; + + if (!window) { + ERR(GRN_INVALID_ARGUMENT, "[window][argument][get] window is NULL"); + GRN_API_RETURN(NULL); + } + + if (window->current_shard < 0) { + GRN_API_RETURN(NULL); + } + + grn_window_shard *shard = &(window->shards[window->current_shard]); + if (i < GRN_PTR_VECTOR_SIZE(shard->arguments)) { + GRN_API_RETURN(GRN_PTR_VALUE_AT(shard->arguments, i)); + } else { + GRN_API_RETURN(NULL); + } } grn_rc @@ -121,58 +232,199 @@ grn_window_set_direction(grn_ctx *ctx, { GRN_API_ENTER; + const char *tag = "[window][direction][set]"; if (!window) { - ERR(GRN_INVALID_ARGUMENT, "[window][set][direction] window is NULL"); + ERR(GRN_INVALID_ARGUMENT, "%s window is NULL", tag); GRN_API_RETURN(ctx->rc); } switch (direction) { case GRN_WINDOW_DIRECTION_ASCENDING : window->direction = direction; - window->current_index = 0; break; case GRN_WINDOW_DIRECTION_DESCENDING : window->direction = direction; - window->current_index = window->n_ids - 1; break; default : ERR(GRN_INVALID_ARGUMENT, - "[window][set][direction] direction must be " + "%s direction must be " "GRN_WINDOW_DIRECTION_ASCENDING(%d) or " "GRN_WINDOW_DIRECTION_DESCENDING(%d): %d", + tag, GRN_WINDOW_DIRECTION_ASCENDING, GRN_WINDOW_DIRECTION_DESCENDING, direction); GRN_API_RETURN(ctx->rc); break; } + grn_window_rewind(ctx, window); GRN_API_RETURN(GRN_SUCCESS); } -static grn_inline void +grn_rc grn_window_reset(grn_ctx *ctx, grn_window *window) { - GRN_BULK_REWIND(&(window->ids)); + GRN_API_ENTER; + for (size_t i = 0; i < window->n_shards; i++) { + grn_window_shard *shard = &(window->shards[i]); + grn_window_shard_fin(ctx, shard); + } + if (window->shards) { + GRN_FREE(window->shards); + window->shards = NULL; + window->n_shards = 0; + window->current_shard = -1; + } + GRN_API_RETURN(ctx->rc); +} + +static grn_bool +grn_expr_is_window_function_call(grn_ctx *ctx, + grn_obj *window_function_call) +{ + grn_expr *expr = (grn_expr *)window_function_call; + grn_expr_code *func; + grn_expr_code *call; + + func = &(expr->codes[0]); + call = &(expr->codes[expr->codes_curr - 1]); + + if (func->op != GRN_OP_PUSH) { + return GRN_FALSE; + } + if (!grn_obj_is_window_function_proc(ctx, func->value)) { + return GRN_FALSE; + } + + if (call->op != GRN_OP_CALL) { + return GRN_FALSE; + } + if (call->nargs != (expr->codes_curr - 1)) { + return GRN_FALSE; + } + + return GRN_TRUE; +} + +static bool +grn_window_add_record_validate(grn_ctx *ctx, + grn_window *window, + grn_obj *table, + grn_obj *window_function_call, + grn_obj *output_column, + const char *tag) +{ + if (!table) { + ERR(GRN_INVALID_ARGUMENT, "%s table is NULL", tag); + return false; + } + + if (!grn_expr_is_window_function_call(ctx, window_function_call)) { + grn_obj inspected; + GRN_TEXT_INIT(&inspected, 0); + grn_inspect(ctx, &inspected, window_function_call); + ERR(GRN_INVALID_ARGUMENT, + "%s must be window function call: %.*s", + tag, + (int)GRN_TEXT_LEN(&inspected), + GRN_TEXT_VALUE(&inspected)); + GRN_OBJ_FIN(ctx, &inspected); + return false; + } + + if (!output_column) { + ERR(GRN_INVALID_ARGUMENT, "%s output column is NULL", tag); + return false; + } + + return true; } -static grn_inline void +grn_rc grn_window_add_record(grn_ctx *ctx, grn_window *window, - grn_id record_id) + grn_obj *table, + grn_id record_id, + grn_obj *window_function_call, + grn_obj *output_column) { - GRN_RECORD_PUT(ctx, &(window->ids), record_id); + GRN_API_ENTER; + const char *tag = "[window][record][add]"; + if (window->n_shards == 0) { + if (!grn_window_add_record_validate(ctx, + window, + table, + window_function_call, + output_column, + tag)) { + GRN_API_RETURN(ctx->rc); + } + window->shards = GRN_MALLOCN(grn_window_shard, 1); + grn_window_shard_init(ctx, + &(window->shards[0]), + table, + window_function_call, + output_column); + window->current_shard = 0; + window->n_shards = 1; + } else if (window->shards[window->n_shards - 1].table != table) { + if (!grn_window_add_record_validate(ctx, + window, + table, + window_function_call, + output_column, + tag)) { + GRN_API_RETURN(ctx->rc); + } + const size_t new_n_shards = window->n_shards + 1; + grn_window_shard *shards = + GRN_REALLOC(window->shards, sizeof(grn_window_shard) * new_n_shards); + if (!shards) { + grn_rc rc = ctx->rc; + if (rc == GRN_SUCCESS) { + rc = GRN_NO_MEMORY_AVAILABLE; + } + char message[GRN_CTX_MSGSIZE]; + grn_strcpy(message, GRN_CTX_MSGSIZE, ctx->errbuf); + ERR(rc, + "%s failed to expand shards: %s", + tag, + message); + GRN_API_RETURN(ctx->rc); + } + window->shards = shards; + window->n_shards = new_n_shards; + grn_window_shard_init(ctx, + &(window->shards[window->n_shards - 1]), + table, + window_function_call, + output_column); + } + GRN_RECORD_PUT(ctx, + window->shards[window->n_shards - 1].ids, + record_id); + GRN_API_RETURN(ctx->rc); } -static grn_inline grn_bool +bool grn_window_is_empty(grn_ctx *ctx, grn_window *window) { - return GRN_BULK_VSIZE(&(window->ids)) == 0; + GRN_API_ENTER; + bool is_empty = true; + for (size_t i = 0; i < window->n_shards; i++) { + grn_window_shard *shard = &(window->shards[i]); + if (GRN_RECORD_VECTOR_SIZE(shard->ids) > 0) { + is_empty = false; + break; + } + } + GRN_API_RETURN(is_empty); } -grn_bool +bool grn_window_is_sorted(grn_ctx *ctx, grn_window *window) { GRN_API_ENTER; @@ -185,13 +437,32 @@ grn_window_is_sorted(grn_ctx *ctx, grn_window *window) GRN_API_RETURN(window->is_sorted); } +grn_rc +grn_window_set_is_sorted(grn_ctx *ctx, grn_window *window, bool is_sorted) +{ + GRN_API_ENTER; + + if (!window) { + ERR(GRN_INVALID_ARGUMENT, "[window][is-sorted][set] window is NULL"); + GRN_API_RETURN(ctx->rc); + } + + window->is_sorted = is_sorted; + + GRN_API_RETURN(ctx->rc); +} + size_t grn_window_get_size(grn_ctx *ctx, grn_window *window) { GRN_API_ENTER; - - GRN_API_RETURN(window->n_ids); + size_t n_ids = 0; + for (size_t i = 0; i < window->n_shards; i++) { + grn_window_shard *shard = &(window->shards[i]); + n_ids += GRN_RECORD_VECTOR_SIZE(shard->ids); + } + GRN_API_RETURN(n_ids); } grn_obj * @@ -231,70 +502,29 @@ grn_window_function_create(grn_ctx *ctx, GRN_API_RETURN(window_function); } -static grn_bool -grn_expr_is_window_function_call(grn_ctx *ctx, - grn_obj *window_function_call) -{ - grn_expr *expr = (grn_expr *)window_function_call; - grn_expr_code *func; - grn_expr_code *call; - - func = &(expr->codes[0]); - call = &(expr->codes[expr->codes_curr - 1]); - - if (func->op != GRN_OP_PUSH) { - return GRN_FALSE; - } - if (!grn_obj_is_window_function_proc(ctx, func->value)) { - return GRN_FALSE; - } - - if (call->op != GRN_OP_CALL) { - return GRN_FALSE; - } - if (call->nargs != (expr->codes_curr - 1)) { - return GRN_FALSE; - } - - return GRN_TRUE; -} - -static grn_rc -grn_expr_call_window_function(grn_ctx *ctx, - grn_obj *output_column, - grn_window *window, - grn_obj *window_function_call) +grn_rc +grn_window_execute(grn_ctx *ctx, grn_window *window) { - grn_rc rc; - grn_expr *expr = (grn_expr *)window_function_call; - grn_proc *proc; - int32_t i, n; - grn_obj args; - - proc = (grn_proc *)(expr->codes[0].value); + GRN_API_ENTER; - GRN_PTR_INIT(&args, GRN_OBJ_VECTOR, GRN_ID_NIL); - n = expr->codes_curr - 1; - for (i = 1; i < n; i++) { - /* TODO: Check op. */ - GRN_PTR_PUT(ctx, &args, expr->codes[i].value); - } - window->n_ids = GRN_BULK_VSIZE(&(window->ids)) / sizeof(grn_id); - if (window->direction == GRN_WINDOW_DIRECTION_ASCENDING) { - window->current_index = 0; - } else { - window->current_index = window->n_ids - 1; + if (window->n_shards == 0) { + GRN_API_RETURN(ctx->rc); } - rc = proc->callbacks.window_function(ctx, - output_column, - window, - (grn_obj **)GRN_BULK_HEAD(&args), - GRN_BULK_VSIZE(&args) / sizeof(grn_obj *)); - GRN_OBJ_FIN(ctx, &args); - return rc; + grn_window_rewind(ctx, window); + grn_window_shard *shard = &(window->shards[window->current_shard]); + grn_window_function_func *window_function_func = + shard->window_function->callbacks.window_function; + grn_rc rc = window_function_func(ctx, + shard->output_column, + window, + (grn_obj **)GRN_BULK_HEAD(shard->arguments), + GRN_PTR_VECTOR_SIZE(shard->arguments)); + + GRN_API_RETURN(rc); } +/* Deprecated since 9.0.2. */ grn_rc grn_table_apply_window_function(grn_ctx *ctx, grn_obj *table, @@ -310,163 +540,148 @@ grn_table_apply_window_function(grn_ctx *ctx, GRN_API_RETURN(ctx->rc); } - if (!grn_expr_is_window_function_call(ctx, window_function_call)) { - grn_obj inspected; - GRN_TEXT_INIT(&inspected, 0); - grn_inspect(ctx, &inspected, window_function_call); - ERR(GRN_INVALID_ARGUMENT, - "[table][apply][window-function] must be window function call: %.*s", - (int)GRN_TEXT_LEN(&inspected), - GRN_TEXT_VALUE(&inspected)); - GRN_OBJ_FIN(ctx, &inspected); + const size_t n_sort_keys = definition->n_group_keys + definition->n_sort_keys; + grn_table_sort_key *sort_keys = GRN_MALLOCN(grn_table_sort_key, n_sort_keys); + if (!sort_keys) { + grn_rc rc = ctx->rc; + char errbuf[GRN_CTX_MSGSIZE]; + if (rc == GRN_SUCCESS) { + rc = GRN_NO_MEMORY_AVAILABLE; + } + grn_strcpy(errbuf, GRN_CTX_MSGSIZE, ctx->errbuf); + ERR(rc, + "[table][apply][window-function] " + "failed to allocate internal sort keys: %s", + errbuf); GRN_API_RETURN(ctx->rc); } - { - size_t n_sort_keys; - grn_table_sort_key *sort_keys; - grn_obj *sorted; - grn_window window; - - n_sort_keys = definition->n_group_keys + definition->n_sort_keys; - sort_keys = GRN_MALLOCN(grn_table_sort_key, n_sort_keys); - if (!sort_keys) { - grn_rc rc = ctx->rc; - char errbuf[GRN_CTX_MSGSIZE]; - if (rc == GRN_SUCCESS) { - rc = GRN_NO_MEMORY_AVAILABLE; - } - grn_strcpy(errbuf, GRN_CTX_MSGSIZE, ctx->errbuf); - ERR(rc, - "[table][apply][window-function] " - "failed to allocate internal sort keys: %s", - errbuf); - GRN_API_RETURN(ctx->rc); - } - { - size_t i; - for (i = 0; i < definition->n_group_keys; i++) { - sort_keys[i] = definition->group_keys[i]; - } - for (i = 0; i < definition->n_sort_keys; i++) { - sort_keys[i + definition->n_group_keys] = definition->sort_keys[i]; - } + for (size_t i = 0; i < definition->n_group_keys; i++) { + sort_keys[i] = definition->group_keys[i]; + } + for (size_t i = 0; i < definition->n_sort_keys; i++) { + sort_keys[i + definition->n_group_keys] = definition->sort_keys[i]; + } + + grn_obj *sorted = grn_table_create(ctx, + NULL, 0, NULL, + GRN_OBJ_TABLE_NO_KEY, + NULL, + table); + if (!sorted) { + grn_rc rc = ctx->rc; + char errbuf[GRN_CTX_MSGSIZE]; + if (rc == GRN_SUCCESS) { + rc = GRN_NO_MEMORY_AVAILABLE; } - sorted = grn_table_create(ctx, - NULL, 0, NULL, - GRN_OBJ_TABLE_NO_KEY, - NULL, - table); - if (!sorted) { - grn_rc rc = ctx->rc; - char errbuf[GRN_CTX_MSGSIZE]; - if (rc == GRN_SUCCESS) { - rc = GRN_NO_MEMORY_AVAILABLE; - } - grn_strcpy(errbuf, GRN_CTX_MSGSIZE, ctx->errbuf); - GRN_FREE(sort_keys); - ERR(rc, - "[table][apply][window-function] " - "failed to allocate table to store sorted result: %s", - errbuf); - GRN_API_RETURN(ctx->rc); + grn_strcpy(errbuf, GRN_CTX_MSGSIZE, ctx->errbuf); + GRN_FREE(sort_keys); + ERR(rc, + "[table][apply][window-function] " + "failed to allocate table to store sorted result: %s", + errbuf); + GRN_API_RETURN(ctx->rc); + } + grn_table_sort(ctx, + table, + 0, -1, + sorted, + sort_keys, n_sort_keys); + + grn_window window; + grn_window_init(ctx, &window); + grn_window_set_is_sorted(ctx, &window, definition->n_sort_keys > 0); + if (definition->n_group_keys > 0) { + grn_obj *previous_values = GRN_MALLOCN(grn_obj, definition->n_group_keys); + grn_obj *current_values = GRN_MALLOCN(grn_obj, definition->n_group_keys); + + const size_t n = definition->n_group_keys; + for (size_t i = 0; i < n; i++) { + GRN_VOID_INIT(&(previous_values[i])); + GRN_VOID_INIT(&(current_values[i])); } - grn_table_sort(ctx, - table, - 0, -1, - sorted, - sort_keys, n_sort_keys); - - grn_window_init(ctx, &window, table, definition->n_sort_keys > 0); - if (definition->n_group_keys > 0) { - grn_obj *previous_values; - grn_obj *current_values; - size_t i, n; - - previous_values = GRN_MALLOCN(grn_obj, definition->n_group_keys); - current_values = GRN_MALLOCN(grn_obj, definition->n_group_keys); - n = definition->n_group_keys; - - for (i = 0; i < n; i++) { - GRN_VOID_INIT(&(previous_values[i])); - GRN_VOID_INIT(&(current_values[i])); - } - GRN_TABLE_EACH_BEGIN(ctx, sorted, cursor, id) { - void *value; - grn_id record_id; - grn_bool is_group_key_changed = GRN_FALSE; - - grn_table_cursor_get_value(ctx, cursor, &value); - record_id = *((grn_id *)value); - - for (i = 0; i < n; i++) { - size_t reverse_i = n - i - 1; - grn_obj *previous_value = &(previous_values[reverse_i]); - grn_obj *current_value = &(current_values[reverse_i]); - grn_obj *group_key = definition->group_keys[reverse_i].key; - - if (is_group_key_changed) { - GRN_BULK_REWIND(previous_value); - grn_obj_get_value(ctx, group_key, record_id, previous_value); - } else { - GRN_BULK_REWIND(current_value); - grn_obj_get_value(ctx, group_key, record_id, current_value); - if ((GRN_BULK_VSIZE(current_value) != - GRN_BULK_VSIZE(previous_value)) || - (memcmp(GRN_BULK_HEAD(current_value), - GRN_BULK_HEAD(previous_value), - GRN_BULK_VSIZE(current_value)) != 0)) { - is_group_key_changed = GRN_TRUE; - grn_bulk_write_from(ctx, - previous_value, - GRN_BULK_HEAD(current_value), - 0, - GRN_BULK_VSIZE(current_value)); - } + GRN_TABLE_EACH_BEGIN(ctx, sorted, cursor, id) { + void *value; + grn_table_cursor_get_value(ctx, cursor, &value); + const grn_id record_id = *((grn_id *)value); + + bool is_group_key_changed = false; + for (size_t i = 0; i < n; i++) { + const size_t reverse_i = n - i - 1; + grn_obj *previous_value = &(previous_values[reverse_i]); + grn_obj *current_value = &(current_values[reverse_i]); + grn_obj *group_key = definition->group_keys[reverse_i].key; + + if (is_group_key_changed) { + GRN_BULK_REWIND(previous_value); + grn_obj_get_value(ctx, group_key, record_id, previous_value); + } else { + GRN_BULK_REWIND(current_value); + grn_obj_get_value(ctx, group_key, record_id, current_value); + if ((GRN_BULK_VSIZE(current_value) != + GRN_BULK_VSIZE(previous_value)) || + (memcmp(GRN_BULK_HEAD(current_value), + GRN_BULK_HEAD(previous_value), + GRN_BULK_VSIZE(current_value)) != 0)) { + is_group_key_changed = true; + grn_bulk_write_from(ctx, + previous_value, + GRN_BULK_HEAD(current_value), + 0, + GRN_BULK_VSIZE(current_value)); } } + } - if (is_group_key_changed && !grn_window_is_empty(ctx, &window)) { - grn_expr_call_window_function(ctx, - output_column, - &window, - window_function_call); - grn_window_reset(ctx, &window); + if (is_group_key_changed && !grn_window_is_empty(ctx, &window)) { + grn_window_execute(ctx, &window); + if (ctx->rc != GRN_SUCCESS) { + break; } - grn_window_add_record(ctx, &window, record_id); - } GRN_TABLE_EACH_END(ctx, cursor); - grn_expr_call_window_function(ctx, - output_column, - &window, - window_function_call); - - for (i = 0; i < definition->n_group_keys; i++) { - GRN_OBJ_FIN(ctx, &(previous_values[i])); - GRN_OBJ_FIN(ctx, &(current_values[i])); + grn_window_reset(ctx, &window); + } + grn_window_add_record(ctx, + &window, + table, + record_id, + window_function_call, + output_column); + if (ctx->rc != GRN_SUCCESS) { + break; } - GRN_FREE(previous_values); - GRN_FREE(current_values); - } else { - GRN_TABLE_EACH_BEGIN(ctx, sorted, cursor, id) { - void *value; - grn_id record_id; - - grn_table_cursor_get_value(ctx, cursor, &value); - record_id = *((grn_id *)value); - grn_window_add_record(ctx, &window, record_id); - } GRN_TABLE_EACH_END(ctx, cursor); - grn_expr_call_window_function(ctx, - output_column, - &window, - window_function_call); + } GRN_TABLE_EACH_END(ctx, cursor); + for (size_t i = 0; i < definition->n_group_keys; i++) { + GRN_OBJ_FIN(ctx, &(previous_values[i])); + GRN_OBJ_FIN(ctx, &(current_values[i])); } - grn_window_fin(ctx, &window); + GRN_FREE(previous_values); + GRN_FREE(current_values); + } else { + GRN_TABLE_EACH_BEGIN(ctx, sorted, cursor, id) { + void *value; + grn_table_cursor_get_value(ctx, cursor, &value); + const grn_id record_id = *((grn_id *)value); + grn_window_add_record(ctx, + &window, + table, + record_id, + window_function_call, + output_column); + if (ctx->rc != GRN_SUCCESS) { + break; + } + } GRN_TABLE_EACH_END(ctx, cursor); + } + if (ctx->rc == GRN_SUCCESS && !grn_window_is_empty(ctx, &window)) { + grn_window_execute(ctx, &window); + } - grn_obj_close(ctx, sorted); + grn_window_fin(ctx, &window); - GRN_FREE(sort_keys); - } + grn_obj_close(ctx, sorted); + + GRN_FREE(sort_keys); GRN_API_RETURN(ctx->rc); } Added: lib/window_function_executor.c (+574 -0) 100644 =================================================================== --- /dev/null +++ lib/window_function_executor.c 2019-04-12 15:26:03 +0900 (f3cfaecc3) @@ -0,0 +1,574 @@ +/* -*- c-basic-offset: 2 -*- */ +/* + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "grn_ctx.h" +#include "grn_window_function_executor.h" + +grn_rc +grn_window_function_executor_init(grn_ctx *ctx, + grn_window_function_executor *executor) +{ + GRN_API_ENTER; + + GRN_PTR_INIT(&(executor->tables), GRN_OBJ_VECTOR, GRN_ID_NIL); + GRN_TEXT_INIT(&(executor->source), 0); + GRN_TEXT_INIT(&(executor->sort_keys), 0); + GRN_TEXT_INIT(&(executor->group_keys), 0); + GRN_TEXT_INIT(&(executor->output_column_name), 0); + executor->context.sort_keys = NULL; + executor->context.n_sort_keys = 0; + executor->context.group_keys = NULL; + executor->context.n_group_keys = 0; + executor->context.window_sort_keys = NULL; + executor->context.n_window_sort_keys = 0; + executor->context.sorted = NULL; + executor->values.n = 0; + executor->values.previous = NULL; + executor->values.current = NULL; + GRN_PTR_INIT(&(executor->window_function_calls), GRN_OBJ_VECTOR, GRN_ID_NIL); + GRN_PTR_INIT(&(executor->output_columns), GRN_OBJ_VECTOR, GRN_ID_NIL); + grn_window_init(ctx, &(executor->window)); + + GRN_API_RETURN(ctx->rc); +} + +static void +grn_window_function_executor_rewind(grn_ctx *ctx, + grn_window_function_executor *executor) +{ + grn_obj *window_function_calls = &(executor->window_function_calls); + const size_t n_calls = GRN_PTR_VECTOR_SIZE(window_function_calls); + for (size_t i = 0; i < n_calls; i++) { + grn_obj *window_function_call = GRN_PTR_VALUE_AT(window_function_calls, i); + if (window_function_call) { + grn_obj_close(ctx, window_function_call); + } + } + GRN_BULK_REWIND(window_function_calls); + + grn_obj *output_columns = &(executor->output_columns); + const size_t n_output_columns = GRN_PTR_VECTOR_SIZE(output_columns); + for (size_t i = 0; i < n_output_columns; i++) { + grn_obj *output_column = GRN_PTR_VALUE_AT(output_columns, i); + if (grn_obj_is_accessor(ctx, output_column)) { + grn_obj_close(ctx, output_column); + } + } + GRN_BULK_REWIND(output_columns); +} + +grn_rc +grn_window_function_executor_fin(grn_ctx *ctx, + grn_window_function_executor *executor) +{ + GRN_API_ENTER; + + if (!executor) { + GRN_API_RETURN(GRN_SUCCESS); + } + + grn_window_fin(ctx, &(executor->window)); + + grn_window_function_executor_rewind(ctx, executor); + GRN_OBJ_FIN(ctx, &(executor->output_columns)); + GRN_OBJ_FIN(ctx, &(executor->window_function_calls)); + + if (executor->values.n > 0) { + for (size_t i = 0; i < executor->values.n; i++) { + GRN_OBJ_FIN(ctx, &(executor->values.previous[i])); + GRN_OBJ_FIN(ctx, &(executor->values.current[i])); + } + GRN_FREE(executor->values.previous); + GRN_FREE(executor->values.current); + } + + if (executor->context.sorted) { + grn_obj_close(ctx, executor->context.sorted); + } + if (executor->context.window_sort_keys) { + GRN_FREE(executor->context.window_sort_keys); + } + if (executor->context.group_keys) { + grn_table_sort_key_close(ctx, + executor->context.group_keys, + executor->context.n_group_keys); + } + if (executor->context.sort_keys) { + grn_table_sort_key_close(ctx, + executor->context.sort_keys, + executor->context.n_sort_keys); + } + + GRN_OBJ_FIN(ctx, &(executor->output_column_name)); + GRN_OBJ_FIN(ctx, &(executor->group_keys)); + GRN_OBJ_FIN(ctx, &(executor->sort_keys)); + GRN_OBJ_FIN(ctx, &(executor->source)); + GRN_OBJ_FIN(ctx, &(executor->tables)); + + GRN_API_RETURN(GRN_SUCCESS); +} + +grn_window_function_executor * +grn_window_function_executor_open(grn_ctx *ctx) +{ + GRN_API_ENTER; + + grn_window_function_executor *executor; + + executor = GRN_CALLOC(sizeof(grn_window_function_executor)); + if (!executor) { + char errbuf[GRN_CTX_MSGSIZE]; + grn_strcpy(errbuf, GRN_CTX_MSGSIZE, ctx->errbuf); + ERR(ctx->rc, + "[window-function-executor][open] failed to allocate: %s", + errbuf); + GRN_API_RETURN(NULL); + } + + grn_window_function_executor_init(ctx, executor); + + if (ctx->rc != GRN_SUCCESS) { + GRN_FREE(executor); + executor = NULL; + } + + GRN_API_RETURN(executor); +} + +grn_rc +grn_window_function_executor_close(grn_ctx *ctx, + grn_window_function_executor *executor) +{ + GRN_API_ENTER; + + if (!executor) { + GRN_API_RETURN(GRN_SUCCESS); + } + + grn_window_function_executor_fin(ctx, executor); + GRN_FREE(executor); + + GRN_API_RETURN(GRN_SUCCESS); +} + +grn_rc +grn_window_function_executor_add_table(grn_ctx *ctx, + grn_window_function_executor *executor, + grn_obj *table) +{ + GRN_API_ENTER; + + if (!executor) { + ERR(GRN_INVALID_ARGUMENT, + "[window-function-executor][table][add] executor is NULL"); + GRN_API_RETURN(ctx->rc); + } + + grn_window_function_executor_rewind(ctx, executor); + + GRN_PTR_PUT(ctx, &(executor->tables), table); + + GRN_API_RETURN(ctx->rc); +} + +grn_rc +grn_window_function_executor_set_source(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *source, + size_t source_size) +{ + GRN_API_ENTER; + + if (!executor) { + ERR(GRN_INVALID_ARGUMENT, + "[window-function-executor][source][set] executor is NULL"); + GRN_API_RETURN(ctx->rc); + } + + GRN_TEXT_SET(ctx, &(executor->source), source, source_size); + + GRN_API_RETURN(ctx->rc); +} + +grn_rc +grn_window_function_executor_set_sort_keys(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *sort_keys, + size_t sort_keys_size) +{ + GRN_API_ENTER; + + if (!executor) { + ERR(GRN_INVALID_ARGUMENT, + "[window-function-executor][sort-keys][set] executor is NULL"); + GRN_API_RETURN(ctx->rc); + } + + GRN_TEXT_SET(ctx, &(executor->sort_keys), sort_keys, sort_keys_size); + + GRN_API_RETURN(ctx->rc); +} + +grn_rc +grn_window_function_executor_set_group_keys(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *group_keys, + size_t group_keys_size) +{ + GRN_API_ENTER; + + if (!executor) { + ERR(GRN_INVALID_ARGUMENT, + "[window-function-executor][group-keys][set] executor is NULL"); + GRN_API_RETURN(ctx->rc); + } + + GRN_TEXT_SET(ctx, &(executor->group_keys), group_keys, group_keys_size); + + GRN_API_RETURN(ctx->rc); +} + +grn_rc +grn_window_function_executor_set_output_column_name(grn_ctx *ctx, + grn_window_function_executor *executor, + const char *name, + size_t name_size) +{ + GRN_API_ENTER; + + if (!executor) { + ERR(GRN_INVALID_ARGUMENT, + "[window-function-executor][output-column-name][set] executor is NULL"); + GRN_API_RETURN(ctx->rc); + } + + GRN_TEXT_SET(ctx, &(executor->output_column_name), name, name_size); + + GRN_API_RETURN(ctx->rc); +} + +grn_rc +grn_window_function_executor_execute(grn_ctx *ctx, + grn_window_function_executor *executor) +{ + const char *tag = "[window-function-executor][execute]"; + + GRN_API_ENTER; + + if (!executor) { + ERR(GRN_INVALID_ARGUMENT, + "%s executor is NULL", + tag); + GRN_API_RETURN(ctx->rc); + } + + grn_obj *source = &(executor->source); + if (GRN_TEXT_LEN(source) == 0) { + ERR(GRN_INVALID_ARGUMENT, + "%s no source", + tag); + GRN_API_RETURN(ctx->rc); + } + + grn_obj *output_column_name = &(executor->output_column_name); + if (GRN_TEXT_LEN(output_column_name) == 0) { + ERR(GRN_INVALID_ARGUMENT, + "%s no output column", + tag); + GRN_API_RETURN(ctx->rc); + } + + const size_t n_tables = GRN_PTR_VECTOR_SIZE(&(executor->tables)); + if (n_tables == 0) { + GRN_API_RETURN(ctx->rc); + } + + grn_window_function_executor_rewind(ctx, executor); + + for (size_t i = 0; i < n_tables; i++) { + grn_obj *table = GRN_PTR_VALUE_AT(&(executor->tables), i); + + grn_obj *output_column = grn_obj_column(ctx, + table, + GRN_TEXT_VALUE(output_column_name), + GRN_TEXT_LEN(output_column_name)); + if (!output_column) { + char table_name[GRN_TABLE_MAX_KEY_SIZE]; + int table_name_size; + table_name_size = grn_obj_name(ctx, + table, + table_name, + GRN_TABLE_MAX_KEY_SIZE); + if (table_name_size == 0) { + grn_strcpy(table_name, GRN_TABLE_MAX_KEY_SIZE, "(anonymous)"); + table_name_size = strlen(table_name); + } + ERR(GRN_INVALID_ARGUMENT, + "%s output column doesn't exist: <%.*s>: <%.*s>", + tag, + table_name_size, + table_name, + (int)GRN_TEXT_LEN(output_column_name), + GRN_TEXT_VALUE(output_column_name)); + GRN_API_RETURN(ctx->rc); + } + GRN_PTR_PUT(ctx, &(executor->output_columns), output_column); + + grn_obj *window_function_call; + grn_obj *record; + GRN_EXPR_CREATE_FOR_QUERY(ctx, table, window_function_call, record); + if (!window_function_call) { + char message[GRN_CTX_MSGSIZE]; + grn_strcpy(message, GRN_CTX_MSGSIZE, ctx->errbuf); + ERR(GRN_INVALID_ARGUMENT, + "%s failed to create expression to compile window function call: %s", + tag, + message); + GRN_API_RETURN(ctx->rc); + } + GRN_PTR_PUT(ctx, &(executor->window_function_calls), window_function_call); + grn_expr_parse(ctx, + window_function_call, + GRN_TEXT_VALUE(source), + GRN_TEXT_LEN(source), + NULL, + GRN_OP_MATCH, + GRN_OP_AND, + GRN_EXPR_SYNTAX_SCRIPT); + if (ctx->rc != GRN_SUCCESS) { + char message[GRN_CTX_MSGSIZE]; + grn_strcpy(message, GRN_CTX_MSGSIZE, ctx->errbuf); + ERR(ctx->rc, + "%s failed to parse window function call: <%.*s>: %s", + tag, + (int)GRN_TEXT_LEN(source), + GRN_TEXT_VALUE(source), + message); + GRN_API_RETURN(ctx->rc); + } + + unsigned int n_sort_keys = 0; + grn_table_sort_key *sort_keys = NULL; + if (GRN_TEXT_LEN(&(executor->sort_keys)) > 0) { + sort_keys = + grn_table_sort_key_from_str(ctx, + GRN_TEXT_VALUE(&(executor->sort_keys)), + GRN_TEXT_LEN(&(executor->sort_keys)), + table, + &n_sort_keys); + if (!sort_keys) { + ERR(ctx->rc, + "%s failed to parse sort keys: <%.*s>", + tag, + (int)GRN_TEXT_LEN(&(executor->sort_keys)), + GRN_TEXT_VALUE(&(executor->sort_keys))); + GRN_API_RETURN(ctx->rc); + } + if (executor->context.sort_keys) { + grn_table_sort_key_close(ctx, + executor->context.sort_keys, + executor->context.n_sort_keys); + } + executor->context.sort_keys = sort_keys; + executor->context.n_sort_keys = n_sort_keys; + } + + unsigned int n_group_keys = 0; + grn_table_sort_key *group_keys = NULL; + if (GRN_TEXT_LEN(&(executor->group_keys)) > 0) { + group_keys = + grn_table_sort_key_from_str(ctx, + GRN_TEXT_VALUE(&(executor->group_keys)), + GRN_TEXT_LEN(&(executor->group_keys)), + table, + &n_group_keys); + if (!group_keys) { + ERR(ctx->rc, + "%s failed to parse group keys: <%.*s>", + tag, + (int)GRN_TEXT_LEN(&(executor->group_keys)), + GRN_TEXT_VALUE(&(executor->group_keys))); + GRN_API_RETURN(ctx->rc); + } + if (executor->context.group_keys) { + grn_table_sort_key_close(ctx, + executor->context.group_keys, + executor->context.n_group_keys); + } + executor->context.group_keys = group_keys; + executor->context.n_group_keys = n_group_keys; + } + + const size_t n_window_sort_keys = n_sort_keys + n_group_keys; + if (executor->context.n_window_sort_keys < n_window_sort_keys) { + if (executor->context.window_sort_keys) { + GRN_FREE(executor->context.window_sort_keys); + } + executor->context.window_sort_keys = + GRN_MALLOCN(grn_table_sort_key, n_window_sort_keys); + if (!executor->context.window_sort_keys) { + grn_rc rc = ctx->rc; + char message[GRN_CTX_MSGSIZE]; + if (rc == GRN_SUCCESS) { + rc = GRN_NO_MEMORY_AVAILABLE; + } + grn_strcpy(message, GRN_CTX_MSGSIZE, ctx->errbuf); + ERR(rc, + "%s failed to allocate internal sort keys: %s", + tag, + message); + GRN_API_RETURN(ctx->rc); + } + executor->context.n_window_sort_keys = n_window_sort_keys; + } + grn_table_sort_key *window_sort_keys = executor->context.window_sort_keys; + for (size_t j = 0; j < n_group_keys; j++) { + window_sort_keys[j] = group_keys[j]; + } + for (size_t j = 0; j < n_sort_keys; j++) { + window_sort_keys[j + n_group_keys] = sort_keys[j]; + } + + grn_obj *sorted = grn_table_create(ctx, + NULL, 0, NULL, + GRN_OBJ_TABLE_NO_KEY, + NULL, + table); + if (!sorted) { + grn_rc rc = ctx->rc; + char errbuf[GRN_CTX_MSGSIZE]; + if (rc == GRN_SUCCESS) { + rc = GRN_NO_MEMORY_AVAILABLE; + } + grn_strcpy(errbuf, GRN_CTX_MSGSIZE, ctx->errbuf); + ERR(rc, + "%s failed to allocate table to store sorted result: %s", + tag, + errbuf); + GRN_API_RETURN(ctx->rc); + } + if (executor->context.sorted) { + grn_obj_close(ctx, executor->context.sorted); + executor->context.sorted = sorted; + } + grn_table_sort(ctx, + table, + 0, -1, + sorted, + window_sort_keys, + n_window_sort_keys); + + grn_window_set_is_sorted(ctx, &(executor->window), n_sort_keys > 0); + if (n_group_keys > 0) { + if (executor->values.n == 0) { + executor->values.n = n_group_keys; + executor->values.previous = GRN_MALLOCN(grn_obj, n_group_keys); + executor->values.current = GRN_MALLOCN(grn_obj, n_group_keys); + for (size_t j = 0; j < n_group_keys; j++) { + GRN_VOID_INIT(&(executor->values.previous[j])); + GRN_VOID_INIT(&(executor->values.current[j])); + } + } + if (n_group_keys != executor->values.n) { + ERR(GRN_INVALID_ARGUMENT, + "%s the number of group keys in tables is erratic: " + "<%u>: <%" GRN_FMT_SIZE ">", + tag, + n_group_keys, + executor->values.n); + GRN_API_RETURN(ctx->rc); + } + + GRN_TABLE_EACH_BEGIN(ctx, sorted, cursor, id) { + void *value; + grn_table_cursor_get_value(ctx, cursor, &value); + grn_id record_id = *((grn_id *)value); + + bool is_group_key_changed = false; + for (size_t j = 0; j < n_group_keys; j++) { + size_t reverse_j = n_group_keys - j - 1; + grn_obj *previous_value = &(executor->values.previous[reverse_j]); + grn_obj *current_value = &(executor->values.current[reverse_j]); + grn_obj *group_key = group_keys[reverse_j].key; + + if (is_group_key_changed) { + GRN_BULK_REWIND(previous_value); + grn_obj_get_value(ctx, group_key, record_id, previous_value); + } else { + GRN_BULK_REWIND(current_value); + grn_obj_get_value(ctx, group_key, record_id, current_value); + if ((GRN_BULK_VSIZE(current_value) != + GRN_BULK_VSIZE(previous_value)) || + (memcmp(GRN_BULK_HEAD(current_value), + GRN_BULK_HEAD(previous_value), + GRN_BULK_VSIZE(current_value)) != 0)) { + is_group_key_changed = true; + grn_bulk_write_from(ctx, + previous_value, + GRN_BULK_HEAD(current_value), + 0, + GRN_BULK_VSIZE(current_value)); + } + } + } + + if (is_group_key_changed && + !grn_window_is_empty(ctx, &(executor->window))) { + grn_window_execute(ctx, &(executor->window)); + if (ctx->rc != GRN_SUCCESS) { + break; + } + grn_window_reset(ctx, &(executor->window)); + } + grn_window_add_record(ctx, + &(executor->window), + table, + record_id, + window_function_call, + output_column); + if (ctx->rc != GRN_SUCCESS) { + break; + } + } GRN_TABLE_EACH_END(ctx, cursor); + } else { + GRN_TABLE_EACH_BEGIN(ctx, sorted, cursor, id) { + void *value; + grn_id record_id; + + grn_table_cursor_get_value(ctx, cursor, &value); + record_id = *((grn_id *)value); + grn_window_add_record(ctx, + &(executor->window), + table, + record_id, + window_function_call, + output_column); + if (ctx->rc != GRN_SUCCESS) { + break; + } + } GRN_TABLE_EACH_END(ctx, cursor); + } + } + if (ctx->rc == GRN_SUCCESS && + !grn_window_is_empty(ctx, &(executor->window))) { + grn_window_execute(ctx, &(executor->window)); + grn_window_reset(ctx, &(executor->window)); + } + + GRN_API_RETURN(ctx->rc); +} Modified: lib/window_functions.c (+202 -195) =================================================================== --- lib/window_functions.c 2019-04-10 14:52:22 +0900 (bfcec66d0) +++ lib/window_functions.c 2019-04-12 15:26:03 +0900 (9d23bde81) @@ -1,6 +1,7 @@ /* -*- c-basic-offset: 2 -*- */ /* Copyright(C) 2016-2018 Brazil + Copyright(C) 2019 Kouhei Sutou <kou****@clear*****> This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -21,10 +22,10 @@ static grn_rc window_record_number(grn_ctx *ctx, - grn_obj *output_column, + grn_obj *first_output_column, grn_window *window, - grn_obj **args, - int n_args) + grn_obj **first_args, + int first_n_args) { grn_id id; uint32_t nth_record = 1; @@ -33,6 +34,7 @@ window_record_number(grn_ctx *ctx, GRN_UINT32_INIT(&value, 0); while ((id = grn_window_next(ctx, window))) { GRN_UINT32_SET(ctx, &value, nth_record); + grn_obj *output_column = grn_window_get_output_column(ctx, window); grn_obj_set_value(ctx, output_column, id, &value, GRN_OBJ_SET); nth_record++; } @@ -43,28 +45,25 @@ window_record_number(grn_ctx *ctx, static grn_rc window_sum(grn_ctx *ctx, - grn_obj *output_column, + grn_obj *first_output_column, grn_window *window, - grn_obj **args, - int n_args) + grn_obj **first_args, + int first_n_args) { - grn_id id; - grn_obj *target; - - if (n_args != 1) { + if (first_n_args != 1) { GRN_PLUGIN_ERROR(ctx, GRN_INVALID_ARGUMENT, "window_sum(): wrong number of arguments (%d for 1)", - n_args); + first_n_args); return ctx->rc; } - target = args[0]; - if (!(grn_obj_is_scalar_column(ctx, target) || - grn_obj_is_accessor(ctx, target))) { + grn_obj *first_target = first_args[0]; + if (!(grn_obj_is_scalar_column(ctx, first_target) || + grn_obj_is_accessor(ctx, first_target))) { grn_obj inspected; GRN_TEXT_INIT(&inspected, 0); - grn_inspect(ctx, &inspected, target); + grn_inspect(ctx, &inspected, first_target); GRN_PLUGIN_ERROR(ctx, GRN_INVALID_ARGUMENT, "window_sum(): " @@ -76,223 +75,225 @@ window_sum(grn_ctx *ctx, return ctx->rc; } - { - const grn_id output_column_range_id = grn_obj_get_range(ctx, output_column); - const grn_id target_range_id = grn_obj_get_range(ctx, target); - grn_obj sum; - grn_obj value; - - switch (target_range_id) { - case GRN_DB_INT8 : - case GRN_DB_INT16 : - case GRN_DB_INT32 : - case GRN_DB_INT64 : - case GRN_DB_UINT8 : - case GRN_DB_UINT16 : - case GRN_DB_UINT32 : - case GRN_DB_UINT64 : - case GRN_DB_FLOAT : - break; - default : - { - grn_obj inspected; - GRN_TEXT_INIT(&inspected, 0); - grn_inspect(ctx, &inspected, target); - GRN_PLUGIN_ERROR(ctx, - GRN_INVALID_ARGUMENT, - "window_sum(): " - "the target column must be number column: <%.*s>", - (int)GRN_TEXT_LEN(&inspected), - GRN_TEXT_VALUE(&inspected)); - GRN_OBJ_FIN(ctx, &inspected); - return ctx->rc; - } - break; + const grn_id target_range_id = + grn_obj_get_range(ctx, first_target); + switch (target_range_id) { + case GRN_DB_INT8 : + case GRN_DB_INT16 : + case GRN_DB_INT32 : + case GRN_DB_INT64 : + case GRN_DB_UINT8 : + case GRN_DB_UINT16 : + case GRN_DB_UINT32 : + case GRN_DB_UINT64 : + case GRN_DB_FLOAT : + break; + default : + { + grn_obj inspected; + GRN_TEXT_INIT(&inspected, 0); + grn_inspect(ctx, &inspected, first_target); + GRN_PLUGIN_ERROR(ctx, + GRN_INVALID_ARGUMENT, + "window_sum(): " + "the target column must be number column: <%.*s>", + (int)GRN_TEXT_LEN(&inspected), + GRN_TEXT_VALUE(&inspected)); + GRN_OBJ_FIN(ctx, &inspected); + return ctx->rc; } + break; + } - switch (output_column_range_id) { - case GRN_DB_INT8 : - case GRN_DB_INT16 : - case GRN_DB_INT32 : - case GRN_DB_INT64 : - GRN_INT64_INIT(&sum, 0); - break; - case GRN_DB_UINT8 : - case GRN_DB_UINT16 : - case GRN_DB_UINT32 : - case GRN_DB_UINT64 : - GRN_UINT64_INIT(&sum, 0); - break; - case GRN_DB_FLOAT : - GRN_FLOAT_INIT(&sum, 0); - break; - default : - { - grn_obj inspected; - GRN_TEXT_INIT(&inspected, 0); - grn_inspect(ctx, &inspected, output_column); - GRN_PLUGIN_ERROR(ctx, - GRN_INVALID_ARGUMENT, - "window_sum(): " - "the output column must be number column: <%.*s>", - (int)GRN_TEXT_LEN(&inspected), - GRN_TEXT_VALUE(&inspected)); - GRN_OBJ_FIN(ctx, &inspected); - return ctx->rc; - } - break; + grn_obj sum; + const grn_id output_column_range_id = + grn_obj_get_range(ctx, first_output_column); + switch (output_column_range_id) { + case GRN_DB_INT8 : + case GRN_DB_INT16 : + case GRN_DB_INT32 : + case GRN_DB_INT64 : + GRN_INT64_INIT(&sum, 0); + break; + case GRN_DB_UINT8 : + case GRN_DB_UINT16 : + case GRN_DB_UINT32 : + case GRN_DB_UINT64 : + GRN_UINT64_INIT(&sum, 0); + break; + case GRN_DB_FLOAT : + GRN_FLOAT_INIT(&sum, 0); + break; + default : + { + grn_obj inspected; + GRN_TEXT_INIT(&inspected, 0); + grn_inspect(ctx, &inspected, first_output_column); + GRN_PLUGIN_ERROR(ctx, + GRN_INVALID_ARGUMENT, + "window_sum(): " + "the output column must be number column: <%.*s>", + (int)GRN_TEXT_LEN(&inspected), + GRN_TEXT_VALUE(&inspected)); + GRN_OBJ_FIN(ctx, &inspected); + return ctx->rc; } - GRN_VOID_INIT(&value); + break; + } - if (grn_window_is_sorted(ctx, window)) { - while ((id = grn_window_next(ctx, window))) { - GRN_BULK_REWIND(&value); - grn_obj_get_value(ctx, target, id, &value); - switch (target_range_id) { - case GRN_DB_INT8 : - GRN_INT64_SET(ctx, - &sum, - GRN_INT64_VALUE(&sum) + GRN_INT8_VALUE(&value)); - break; - case GRN_DB_INT16 : - GRN_INT64_SET(ctx, - &sum, - GRN_INT64_VALUE(&sum) + GRN_INT16_VALUE(&value)); - break; - case GRN_DB_INT32 : - GRN_INT64_SET(ctx, - &sum, - GRN_INT64_VALUE(&sum) + GRN_INT32_VALUE(&value)); - break; - case GRN_DB_INT64 : - GRN_INT64_SET(ctx, - &sum, - GRN_INT64_VALUE(&sum) + GRN_INT64_VALUE(&value)); - break; - case GRN_DB_UINT8 : - GRN_UINT64_SET(ctx, - &sum, - GRN_UINT64_VALUE(&sum) + GRN_UINT8_VALUE(&value)); - break; - case GRN_DB_UINT16 : - GRN_UINT64_SET(ctx, - &sum, - GRN_UINT64_VALUE(&sum) + GRN_UINT16_VALUE(&value)); - break; - case GRN_DB_UINT32 : - GRN_UINT64_SET(ctx, - &sum, - GRN_UINT64_VALUE(&sum) + GRN_UINT32_VALUE(&value)); - break; - case GRN_DB_UINT64 : - GRN_UINT64_SET(ctx, - &sum, - GRN_UINT64_VALUE(&sum) + GRN_UINT64_VALUE(&value)); - break; - case GRN_DB_FLOAT : - GRN_FLOAT_SET(ctx, - &sum, - GRN_FLOAT_VALUE(&sum) + GRN_FLOAT_VALUE(&value)); - break; - default : - break; - } - grn_obj_set_value(ctx, output_column, id, &sum, GRN_OBJ_SET); - } - } else { - int64_t sum_raw_int64 = 0; - uint64_t sum_raw_uint64 = 0; - double sum_raw_double = 0.0; + grn_obj value; + GRN_VOID_INIT(&value); - while ((id = grn_window_next(ctx, window))) { - GRN_BULK_REWIND(&value); - grn_obj_get_value(ctx, target, id, &value); - switch (target_range_id) { - case GRN_DB_INT8 : - sum_raw_int64 += GRN_INT8_VALUE(&value); - break; - case GRN_DB_INT16 : - sum_raw_int64 += GRN_INT16_VALUE(&value); - break; - case GRN_DB_INT32 : - sum_raw_int64 += GRN_INT32_VALUE(&value); - break; - case GRN_DB_INT64 : - sum_raw_int64 += GRN_INT64_VALUE(&value); - break; - case GRN_DB_UINT8 : - sum_raw_uint64 += GRN_UINT8_VALUE(&value); - break; - case GRN_DB_UINT16 : - sum_raw_uint64 += GRN_UINT16_VALUE(&value); - break; - case GRN_DB_UINT32 : - sum_raw_uint64 += GRN_UINT32_VALUE(&value); - break; - case GRN_DB_UINT64 : - sum_raw_uint64 += GRN_UINT64_VALUE(&value); - break; - case GRN_DB_FLOAT : - sum_raw_double += GRN_FLOAT_VALUE(&value); - break; - default : - break; - } + if (grn_window_is_sorted(ctx, window)) { + grn_id id; + while ((id = grn_window_next(ctx, window))) { + GRN_BULK_REWIND(&value); + grn_obj *target = grn_window_get_argument(ctx, window, 0); + grn_obj_get_value(ctx, target, id, &value); + switch (target_range_id) { + case GRN_DB_INT8 : + GRN_INT64_SET(ctx, + &sum, + GRN_INT64_VALUE(&sum) + GRN_INT8_VALUE(&value)); + break; + case GRN_DB_INT16 : + GRN_INT64_SET(ctx, + &sum, + GRN_INT64_VALUE(&sum) + GRN_INT16_VALUE(&value)); + break; + case GRN_DB_INT32 : + GRN_INT64_SET(ctx, + &sum, + GRN_INT64_VALUE(&sum) + GRN_INT32_VALUE(&value)); + break; + case GRN_DB_INT64 : + GRN_INT64_SET(ctx, + &sum, + GRN_INT64_VALUE(&sum) + GRN_INT64_VALUE(&value)); + break; + case GRN_DB_UINT8 : + GRN_UINT64_SET(ctx, + &sum, + GRN_UINT64_VALUE(&sum) + GRN_UINT8_VALUE(&value)); + break; + case GRN_DB_UINT16 : + GRN_UINT64_SET(ctx, + &sum, + GRN_UINT64_VALUE(&sum) + GRN_UINT16_VALUE(&value)); + break; + case GRN_DB_UINT32 : + GRN_UINT64_SET(ctx, + &sum, + GRN_UINT64_VALUE(&sum) + GRN_UINT32_VALUE(&value)); + break; + case GRN_DB_UINT64 : + GRN_UINT64_SET(ctx, + &sum, + GRN_UINT64_VALUE(&sum) + GRN_UINT64_VALUE(&value)); + break; + case GRN_DB_FLOAT : + GRN_FLOAT_SET(ctx, + &sum, + GRN_FLOAT_VALUE(&sum) + GRN_FLOAT_VALUE(&value)); + break; + default : + break; } + grn_obj *output_column = grn_window_get_output_column(ctx, window); + grn_obj_set_value(ctx, output_column, id, &sum, GRN_OBJ_SET); + } + } else { + int64_t sum_raw_int64 = 0; + uint64_t sum_raw_uint64 = 0; + double sum_raw_double = 0.0; - switch (output_column_range_id) { + grn_id id; + while ((id = grn_window_next(ctx, window))) { + GRN_BULK_REWIND(&value); + grn_obj *target = grn_window_get_argument(ctx, window, 0); + grn_obj_get_value(ctx, target, id, &value); + switch (target_range_id) { case GRN_DB_INT8 : + sum_raw_int64 += GRN_INT8_VALUE(&value); + break; case GRN_DB_INT16 : + sum_raw_int64 += GRN_INT16_VALUE(&value); + break; case GRN_DB_INT32 : + sum_raw_int64 += GRN_INT32_VALUE(&value); + break; case GRN_DB_INT64 : - GRN_INT64_SET(ctx, &sum, sum_raw_int64); + sum_raw_int64 += GRN_INT64_VALUE(&value); break; case GRN_DB_UINT8 : + sum_raw_uint64 += GRN_UINT8_VALUE(&value); + break; case GRN_DB_UINT16 : + sum_raw_uint64 += GRN_UINT16_VALUE(&value); + break; case GRN_DB_UINT32 : + sum_raw_uint64 += GRN_UINT32_VALUE(&value); + break; case GRN_DB_UINT64 : - GRN_UINT64_SET(ctx, &sum, sum_raw_uint64); + sum_raw_uint64 += GRN_UINT64_VALUE(&value); break; case GRN_DB_FLOAT : - GRN_FLOAT_SET(ctx, &sum, sum_raw_double); + sum_raw_double += GRN_FLOAT_VALUE(&value); + break; + default : break; } + } - grn_window_rewind(ctx, window); - while ((id = grn_window_next(ctx, window))) { - grn_obj_set_value(ctx, output_column, id, &sum, GRN_OBJ_SET); - } + switch (output_column_range_id) { + case GRN_DB_INT8 : + case GRN_DB_INT16 : + case GRN_DB_INT32 : + case GRN_DB_INT64 : + GRN_INT64_SET(ctx, &sum, sum_raw_int64); + break; + case GRN_DB_UINT8 : + case GRN_DB_UINT16 : + case GRN_DB_UINT32 : + case GRN_DB_UINT64 : + GRN_UINT64_SET(ctx, &sum, sum_raw_uint64); + break; + case GRN_DB_FLOAT : + GRN_FLOAT_SET(ctx, &sum, sum_raw_double); + break; } - GRN_OBJ_FIN(ctx, &value); - GRN_OBJ_FIN(ctx, &sum); + grn_window_rewind(ctx, window); + while ((id = grn_window_next(ctx, window))) { + grn_obj *output_column = grn_window_get_output_column(ctx, window); + grn_obj_set_value(ctx, output_column, id, &sum, GRN_OBJ_SET); + } } + GRN_OBJ_FIN(ctx, &value); + GRN_OBJ_FIN(ctx, &sum); + return GRN_SUCCESS; } static grn_rc window_count(grn_ctx *ctx, - grn_obj *output_column, + grn_obj *first_output_column, grn_window *window, - grn_obj **args, - int n_args) + grn_obj **first_args, + int first_n_args) { - grn_id id; - grn_id output_column_range_id; - grn_obj n_records; - uint32_t n_records_raw = 0; - - - if (n_args != 0) { + if (first_n_args != 0) { GRN_PLUGIN_ERROR(ctx, GRN_INVALID_ARGUMENT, "window_count(): wrong number of arguments (%d for 0)", - n_args); + first_n_args); return ctx->rc; } - output_column_range_id = grn_obj_get_range(ctx, output_column); + grn_obj n_records; + grn_id output_column_range_id; + output_column_range_id = grn_obj_get_range(ctx, first_output_column); switch (output_column_range_id) { case GRN_DB_INT8 : case GRN_DB_INT16 : @@ -313,7 +314,7 @@ window_count(grn_ctx *ctx, { grn_obj inspected; GRN_TEXT_INIT(&inspected, 0); - grn_inspect(ctx, &inspected, output_column); + grn_inspect(ctx, &inspected, first_output_column); GRN_PLUGIN_ERROR(ctx, GRN_INVALID_ARGUMENT, "window_count(): " @@ -327,6 +328,8 @@ window_count(grn_ctx *ctx, } if (grn_window_is_sorted(ctx, window)) { + uint32_t n_records_raw = 0; + grn_id id; while ((id = grn_window_next(ctx, window))) { n_records_raw++; switch (output_column_range_id) { @@ -348,9 +351,12 @@ window_count(grn_ctx *ctx, default : break; } + grn_obj *output_column = grn_window_get_output_column(ctx, window); grn_obj_set_value(ctx, output_column, id, &n_records, GRN_OBJ_SET); } } else { + uint32_t n_records_raw = 0; + grn_id id; while ((id = grn_window_next(ctx, window))) { n_records_raw++; } @@ -375,6 +381,7 @@ window_count(grn_ctx *ctx, grn_window_rewind(ctx, window); while ((id = grn_window_next(ctx, window))) { + grn_obj *output_column = grn_window_get_output_column(ctx, window); grn_obj_set_value(ctx, output_column, id, &n_records, GRN_OBJ_SET); } } Modified: plugins/sharding/dynamic_columns.rb (+61 -8) =================================================================== --- plugins/sharding/dynamic_columns.rb 2019-04-10 14:52:22 +0900 (9c76f19d2) +++ plugins/sharding/dynamic_columns.rb 2019-04-12 15:26:03 +0900 (2796cc31d) @@ -66,6 +66,18 @@ module Groonga each_output(&block) end + def apply_initial(targets) + apply(@initial_contexts, targets) + end + + def apply_filtered(targets) + apply(@filtered_contexts, targets) + end + + def apply_output(targets) + apply(@output_contexts, targets) + end + def empty? @initial_contexts.empty? and @filtered_contexts.empty? and @@ -91,6 +103,30 @@ module Groonga end key end + + private + def apply(contexts, targets) + window_function_contexts = [] + normal_contexts = [] + contexts.each do |context| + if context.window_function? + window_function_contexts << context + else + normal_contexts << context + end + end + + result_sets = [] + targets.each do |result_set, condition| + normal_contexts.each do |context| + context.apply(result_set, condition) + end + result_sets << result_set + end + window_function_contexts.each do |context| + context.apply_window_function(result_sets) + end + end end class DynamicColumnExecuteContexts @@ -156,6 +192,11 @@ module Groonga @window_group_keys = parse_keys(arguments["window.group_keys"]) end + def window_function? + (not @window_sort_keys.empty?) or + (not @window_group_keys.empty?) + end + def apply(table, condition=nil) column = table.create_column(@label, @flags, @type) return if table.empty? @@ -163,19 +204,31 @@ module Groonga expression = Expression.create(table) begin expression.parse(@value) - if @window_sort_keys.empty? and @window_group_keys.empty? - expression.condition = condition if condition - table.apply_expression(column, expression) - else - table.apply_window_function(column, expression, - :sort_keys => @window_sort_keys, - :group_keys => @window_group_keys) - end + expression.condition = condition if condition + table.apply_expression(column, expression) ensure expression.close end end + def apply_window_function(tables) + executor = WindowFunctionExecutor.new + begin + executor.source = @value + executor.sort_keys = @window_sort_keys.join(", ") + executor.group_keys = @window_group_keys.join(", ") + executor.output_column_name = @label + tables.each do |table| + column = table.create_column(@label, @flags, @type) + return if table.empty? + executor.add_table(table) + end + executor.execute + ensure + executor.close + end + end + private def parse_type(type_raw) return nil if type_raw.nil? Modified: plugins/sharding/logical_range_filter.rb (+7 -10) =================================================================== --- plugins/sharding/logical_range_filter.rb 2019-04-10 14:52:22 +0900 (f53779bfe) +++ plugins/sharding/logical_range_filter.rb 2019-04-12 15:26:03 +0900 (5636e648c) @@ -192,12 +192,9 @@ module Groonga if****@conte*****_sets.empty? result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC, :key_type => first_shard.table) - @context.dynamic_columns.each_initial do |dynamic_column| - dynamic_column.apply(result_set) - end - @context.dynamic_columns.each_filtered do |dynamic_column| - dynamic_column.apply(result_set) - end + targets = [[result_set, nil]] + @context.dynamic_columns.apply_initial(targets) + @context.dynamic_columns.apply_filtered(targets) @context.temporary_tables << result_set @context.result_sets << result_set end @@ -481,7 +478,7 @@ module Groonga range_index = nil end - @context.dynamic_columns.each_initial do |dynamic_column| + if****@conte*****_columns.have_initial? if @target_table ==****@shard***** if @cover_type == :all @target_table = @target_table.select_all @@ -496,7 +493,7 @@ module Groonga end @temporary_tables << @target_table end - dynamic_column.apply(@target_table) + @context.dynamic_columns.apply_initial([[@target_table, nil]]) end execute_filter(range_index, expression_builder) @@ -917,12 +914,12 @@ module Groonga end def sort_result_set(result_set) - @context.dynamic_columns.each_filtered do |dynamic_column| + if****@conte*****_columns.have_filtered? if result_set ==****@shard***** result_set = result_set.select_all @temporary_tables << result_set end - dynamic_column.apply(result_set) + @context.dynamic_columns.apply_filtered([[result_set, nil]]) end unless @post_filter.nil? Modified: plugins/sharding/logical_select.rb (+57 -33) =================================================================== --- plugins/sharding/logical_select.rb 2019-04-10 14:52:22 +0900 (8b9c845f7) +++ plugins/sharding/logical_select.rb 2019-04-12 15:26:03 +0900 (5dfa30e26) @@ -338,6 +338,8 @@ module Groonga attr_reader :load_values attr_reader :dynamic_columns attr_reader :result_sets + attr_reader :shard_targets + attr_reader :shard_results attr_reader :plain_drilldown attr_reader :labeled_drilldowns attr_reader :temporary_tables @@ -360,6 +362,8 @@ module Groonga @dynamic_columns = DynamicColumns.parse(@input) @result_sets = [] + @shard_targets = [] + @shard_results = [] @plain_drilldown = PlainDrilldownExecuteContext.new(@input) @labeled_drilldowns = LabeledDrilldowns.parse(@input) @@ -609,7 +613,7 @@ module Groonga enumerator.each do |shard, shard_range| first_shard ||= shard shard_executor = ShardExecutor.new(@context, shard, shard_range) - shard_executor.execute + shard_executor.execute_pre end if first_shard.nil? message = @@ -618,17 +622,35 @@ module Groonga "shard_key: <#{enumerator.shard_key_name}>" raise InvalidArgument, message end - if****@conte*****_sets.empty? + + if****@conte*****_columns.have_initial? + targets = [] + @context.shard_targets.each do |_, target_table| + targets << [target_table, nil] + end + @context.dynamic_columns.apply_initial(targets) + end + @context.shard_targets.each do |shard_executor, target_table| + shard_executor.execute + end + + if****@conte*****_results.empty? result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC, :key_type => first_shard.table) @context.temporary_tables << result_set - @context.dynamic_columns.each_initial do |dynamic_column| - dynamic_column.apply(result_set) + targets = [[result_set, nil]] + @context.dynamic_columns.apply_initial(targets) + @context.dynamic_columns.apply_filtered(targets) + @context.result_sets << result_set + else + targets = [] + @context.shard_results.each do |_, result_set, condition| + targets << [result_set, condition] end - @context.dynamic_columns.each_filtered do |dynamic_column| - dynamic_column.apply(result_set) + @context.dynamic_columns.apply_filtered(targets) + @context.shard_results.each do |shard_executor, result_set, _| + shard_executor.execute_post(result_set) end - @context.result_sets << result_set end end @@ -691,9 +713,7 @@ module Groonga end end result_set = group_result.table - drilldown.dynamic_columns.each_initial do |dynamic_column| - dynamic_column.apply(result_set) - end + drilldown.dynamic_columns.apply_initial([[result_set, nil]]) result_set = apply_drilldown_filter(drilldown, result_set) if drilldown.sort_keys.empty? drilldown.result_set = result_set @@ -745,6 +765,8 @@ module Groonga @post_filter =****@conte*****_filter @sort_keys =****@conte*****_keys @result_sets =****@conte*****_sets + @shard_targets =****@conte*****_targets + @shard_results =****@conte*****_results @temporary_tables =****@conte*****_tables @target_range =****@conte*****_range @@ -752,7 +774,7 @@ module Groonga @cover_type = @target_range.cover_type(@shard_range) end - def execute + def execute_pre return if @cover_type == :none return if @target_table.empty? @@ -763,7 +785,7 @@ module Groonga raise InvalidArgument, message end - @context.dynamic_columns.each_initial do |dynamic_column| + if****@conte*****_columns.have_initial? if @target_table ==****@shard***** if @cover_type == :all @target_table = @target_table.select_all @@ -777,10 +799,12 @@ module Groonga end @temporary_tables << @target_table end - dynamic_column.apply(@target_table) end + @shard_targets << [self, @target_table] + end - create_expression_builder(shard_key) do |expression_builder| + def execute + create_expression_builder(@shard.key) do |expression_builder| case @cover_type when :all filter_shard_all(expression_builder) @@ -800,11 +824,26 @@ module Groonga end end + def execute_post(result_set) + if @post_filter + result_set = apply_post_filter(result_set) + @temporary_tables << result_set + end + + if @sort_keys.empty? + @result_sets << result_set + else + sorted_result_set = result_set.sort(@sort_keys) + @temporary_tables << sorted_result_set + @result_sets << sorted_result_set + end + end + private def filter_shard_all(expression_builder) if****@query*****? and****@filte*****? @temporary_tables.delete(@target_table) - add_result_set(@target_table, nil) + add_result(@target_table, nil) else filter_table do |expression| expression_builder.build_all(expression) @@ -836,7 +875,7 @@ module Groonga table = @target_table expression = create_expression(table) yield(expression) - add_result_set(table.select(expression), expression) + add_result(table.select(expression), expression) end def apply_post_filter(table) @@ -845,7 +884,7 @@ module Groonga table.select(expression) end - def add_result_set(result_set, condition) + def add_result(result_set, condition) query_logger.log(:size, ":", "select(#{result_set.size})[#{@shard.table_name}]") @@ -863,22 +902,7 @@ module Groonga @temporary_tables << result_set end - @context.dynamic_columns.each_filtered do |dynamic_column| - dynamic_column.apply(result_set, condition) - end - - if @post_filter - result_set = apply_post_filter(result_set) - @temporary_tables << result_set - end - - if @sort_keys.empty? - @result_sets << result_set - else - sorted_result_set = result_set.sort(@sort_keys) - @temporary_tables << sorted_result_set - @result_sets << sorted_result_set - end + @shard_results << [self, result_set, condition] end def query_logger Modified: test/command/suite/sharding/logical_select/cache/columns/window/group_keys.expected (+2 -2) =================================================================== --- test/command/suite/sharding/logical_select/cache/columns/window/group_keys.expected 2019-04-10 14:52:22 +0900 (680ab3e33) +++ test/command/suite/sharding/logical_select/cache/columns/window/group_keys.expected 2019-04-12 15:26:03 +0900 (365d4dcc5) @@ -111,11 +111,11 @@ logical_select Logs --shard_key timestamp --columns[sum].stage initial --c ], [ 200, - 200 + 400 ], [ 200, - 200 + 400 ], [ 300, Modified: test/command/suite/sharding/logical_select/cache/columns/window/sort_keys.expected (+4 -4) =================================================================== --- test/command/suite/sharding/logical_select/cache/columns/window/sort_keys.expected 2019-04-10 14:52:22 +0900 (306c93220) +++ test/command/suite/sharding/logical_select/cache/columns/window/sort_keys.expected 2019-04-12 15:26:03 +0900 (5d98ab774) @@ -60,11 +60,11 @@ logical_select Logs --shard_key timestamp --columns[sum].stage initial --c ], [ 300, - 300 + 600 ], [ 400, - 700 + 1000 ] ] ] @@ -101,11 +101,11 @@ logical_select Logs --shard_key timestamp --columns[sum].stage initial --c ], [ 300, - 700 + 1000 ], [ 400, - 400 + 700 ] ] ] Modified: test/command/suite/sharding/logical_select/columns/stage/initial/range.expected (+5 -5) =================================================================== --- test/command/suite/sharding/logical_select/columns/stage/initial/range.expected 2019-04-10 14:52:22 +0900 (3a329f74b) +++ test/command/suite/sharding/logical_select/columns/stage/initial/range.expected 2019-04-12 15:26:03 +0900 (ac9100880) @@ -71,27 +71,27 @@ logical_select Logs --shard_key timestamp --min "2017/03/15 01:00:00" --mi [ 2, 900, - 900 + 2730 ], [ 1, 520, - 1030 + 2730 ], [ 2, 510, - 1030 + 2730 ], [ 1, 500, - 800 + 2730 ], [ 2, 300, - 800 + 2730 ] ] ]