null+****@clear*****
null+****@clear*****
2012年 4月 3日 (火) 13:22:17 JST
Kouhei Sutou 2012-04-03 13:22:17 +0900 (Tue, 03 Apr 2012) New Revision: abbc5d90af2ff02bf718ec516b58ad85423079d8 Log: groonga: extract main loop code Modified files: src/groonga.c Modified: src/groonga.c (+51 -45) =================================================================== --- src/groonga.c 2012-04-03 12:56:00 +0900 (9e67506) +++ src/groonga.c 2012-04-03 13:22:17 +0900 (b85ee67) @@ -813,6 +813,56 @@ static grn_mutex q_mutex; static grn_cond q_cond; static uint32_t nthreads = 0, nfthreads = 0, max_nfthreads; +static void +run_server_loop(grn_ctx *ctx, grn_com_event *ev) +{ + while (!grn_com_event_poll(ctx, ev, 1000) && grn_gctx.stat != GRN_CTX_QUIT) { + grn_edge *edge; + while ((edge = (grn_edge *)grn_com_queue_deque(ctx, &ctx_old))) { + grn_obj *msg; + while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) { + grn_msg_close(&edge->ctx, msg); + } + while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->recv_new))) { + grn_msg_close(ctx, msg); + } + grn_ctx_fin(&edge->ctx); + if (edge->com->has_sid && edge->com->opaque == edge) { + grn_com_close(ctx, edge->com); + } + grn_edges_delete(ctx, edge); + } + // todo : log stat + } + for (;;) { + MUTEX_LOCK(q_mutex); + if (nthreads == nfthreads) { break; } + MUTEX_UNLOCK(q_mutex); + usleep(1000); + } + { + grn_edge *edge; + GRN_HASH_EACH(ctx, grn_edges, id, NULL, NULL, &edge, { + grn_obj *obj; + while ((obj = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) { + grn_msg_close(&edge->ctx, obj); + } + while ((obj = (grn_obj *)grn_com_queue_deque(ctx, &edge->recv_new))) { + grn_msg_close(ctx, obj); + } + grn_ctx_fin(&edge->ctx); + if (edge->com->has_sid) { + grn_com_close(ctx, edge->com); + } + grn_edges_delete(ctx, edge); + }); + } + { + grn_com *com; + GRN_HASH_EACH(ctx, ev->hash, id, NULL, NULL, &com, { grn_com_close(ctx, com); }); + } +} + static grn_rc run_server(grn_ctx *ctx, grn_obj *db, grn_com_event *ev, grn_edge_dispatcher_func dispatcher, grn_handler_func handler) @@ -825,51 +875,7 @@ run_server(grn_ctx *ctx, grn_obj *db, grn_com_event *ev, ev->opaque = db; grn_edges_init(ctx, dispatcher); if (!grn_com_sopen(ctx, ev, bind_address, port, handler, he)) { - while (!grn_com_event_poll(ctx, ev, 1000) && grn_gctx.stat != GRN_CTX_QUIT) { - grn_edge *edge; - while ((edge = (grn_edge *)grn_com_queue_deque(ctx, &ctx_old))) { - grn_obj *msg; - while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) { - grn_msg_close(&edge->ctx, msg); - } - while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->recv_new))) { - grn_msg_close(ctx, msg); - } - grn_ctx_fin(&edge->ctx); - if (edge->com->has_sid && edge->com->opaque == edge) { - grn_com_close(ctx, edge->com); - } - grn_edges_delete(ctx, edge); - } - // todo : log stat - } - for (;;) { - MUTEX_LOCK(q_mutex); - if (nthreads == nfthreads) { break; } - MUTEX_UNLOCK(q_mutex); - usleep(1000); - } - { - grn_edge *edge; - GRN_HASH_EACH(ctx, grn_edges, id, NULL, NULL, &edge, { - grn_obj *obj; - while ((obj = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) { - grn_msg_close(&edge->ctx, obj); - } - while ((obj = (grn_obj *)grn_com_queue_deque(ctx, &edge->recv_new))) { - grn_msg_close(ctx, obj); - } - grn_ctx_fin(&edge->ctx); - if (edge->com->has_sid) { - grn_com_close(ctx, edge->com); - } - grn_edges_delete(ctx, edge); - }); - } - { - grn_com *com; - GRN_HASH_EACH(ctx, ev->hash, id, NULL, NULL, &com, { grn_com_close(ctx, com); }); - } + run_server_loop(ctx, ev); rc = 0; } else { fprintf(stderr, "grn_com_sopen failed (%s:%d): %s\n",