diff --git a/Makefile.am b/Makefile.am index ca103440b..27074c96e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -85,7 +85,10 @@ memcached_SOURCES = \ cmdlog.h \ lqdetect.c \ lqdetect.h \ - trace.h + trace.h \ + cmd_in_second.c \ + cmd_in_second.h + memcached_LDFLAGS =-R '$(libdir)' memcached_CFLAGS = @PROFILER_FLAGS@ ${AM_CFLAGS} memcached_DEPENDENCIES = libmcd_util.la diff --git a/cmd_in_second.c b/cmd_in_second.c new file mode 100644 index 000000000..167cbd424 --- /dev/null +++ b/cmd_in_second.c @@ -0,0 +1,393 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * arcus-memcached - Arcus memory cache server + * Copyright 2015 JaM2in Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cmd_in_second.h" + +#define LOG_LENGTH 400 +#define IP_LENGTH 16 +#define KEY_LENGTH 256 +#define CMD_STR_LEN 30 + +static EXTENSION_LOGGER_DESCRIPTOR *mc_logger; + +typedef enum cmd_in_second_state { + NOT_STARTED, + ON_LOGGING, + ON_FLUSHING, + STOPPED +} state; + +typedef struct cmd_in_second_log { + char key[KEY_LENGTH]; + char client_ip[IP_LENGTH]; +} cmd_in_second_log; + +typedef struct cmd_in_second_buffer { + cmd_in_second_log* ring; + int front; + int rear; + int capacity; +} cmd_in_second_buffer; + +typedef struct cmd_in_second_timer { + struct timeval* ring; + int front; + int rear; + int capacity; + int last_elem_idx; + int circular_counter; +} cmd_in_second_timer; + +typedef struct cmd_in_second_flush_thread { + pthread_t tid; + pthread_attr_t attr; + pthread_cond_t cond; + pthread_mutex_t lock; + bool sleep; +} flush_thread; + +struct cmd_in_second_global { + int operation; + char cmd_str[50]; + struct cmd_in_second_buffer buffer; + cmd_in_second_timer timer; + int bulk_limit; + int log_per_timer; + state cur_state; + flush_thread flush; + pthread_mutex_t lock; +}; +static struct cmd_in_second_global cmd_in_second; + +static bool is_bulk_cmd() +{ + bool timer_empty = cmd_in_second.timer.front == cmd_in_second.timer.rear; + + if (timer_empty) { + return false; + } + + struct timeval* front_time = &cmd_in_second.timer.ring[cmd_in_second.timer.front]; + struct timeval* last_time = &cmd_in_second.timer.ring[cmd_in_second.timer.last_elem_idx]; + + return last_time->tv_sec - front_time->tv_sec <= 1; +} + +static void do_flush_sleep() { + struct timeval now; + struct timespec timeout; + + pthread_mutex_lock(&cmd_in_second.flush.lock); + gettimeofday(&now, NULL); + + now.tv_usec += 50000; + + if (now.tv_usec >= 1000000) { + now.tv_sec += 1; + now.tv_usec -= 1000000; + } + + timeout.tv_sec = now.tv_sec; + timeout.tv_nsec = now.tv_usec * 1000; + + + cmd_in_second.flush.sleep = true; + pthread_cond_timedwait(&cmd_in_second.flush.cond, &cmd_in_second.flush.lock, &timeout); + cmd_in_second.flush.sleep = false; + + pthread_mutex_unlock(&cmd_in_second.flush.lock); +} + +static void do_flush_wakeup(){ + pthread_mutex_lock(&cmd_in_second.flush.lock); + if (cmd_in_second.flush.sleep) { + pthread_cond_signal(&cmd_in_second.flush.cond); + } + pthread_mutex_unlock(&cmd_in_second.flush.lock); +} + +static void* do_flush_write() +{ + int fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644); + + cmd_in_second_buffer* buffer = &cmd_in_second.buffer; + cmd_in_second_timer* timer = &cmd_in_second.timer; + + while(1) { + if (fd < 0) { + mc_logger->log(EXTENSION_LOG_WARNING, NULL, + "Can't open cmd_in_second.log"); + break; + } + + if (cmd_in_second.cur_state == STOPPED) { + break; + } + + if (cmd_in_second.cur_state != ON_FLUSHING) { + do_flush_sleep(); + continue; + } + + + char* log_str = (char*)malloc(LOG_LENGTH * cmd_in_second.bulk_limit * sizeof(char)); + + if (log_str == NULL) { + break; + } + + const size_t cmd_len = strlen(cmd_in_second.cmd_str); + const int whitespaces = 3; + + size_t expected_write_length = 0; + int circular_log_counter = 0; + + bool buffer_empty = buffer->front == buffer->rear; + + while (!buffer_empty) { + + cmd_in_second_log front = buffer->ring[buffer->front]; + buffer->front = (buffer->front+1) % buffer->capacity; + buffer_empty = buffer->front == buffer->rear; + + char time_str[50] = ""; + + if (circular_log_counter == 0) { + + struct timeval* front_time = &timer->ring[timer->front]; + struct tm* lt = localtime((time_t*)&front_time->tv_sec); + + timer->front = (timer->front+1) % timer->capacity; + + if (lt == NULL) { + mc_logger->log(EXTENSION_LOG_WARNING, NULL, + "localtime failed"); + continue; + } + + sprintf(time_str, "%04d-%02d-%02d %02d:%02d:%02d.%06d\n", lt ->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, + lt->tm_hour, lt->tm_min, lt->tm_sec, (int)front_time->tv_usec); + expected_write_length += 27; + } + + char log[LOG_LENGTH] = ""; + snprintf(log, LOG_LENGTH, "%s%s %s %s\n", time_str, cmd_in_second.cmd_str, front.key, front.client_ip); + strncat(log_str, log, LOG_LENGTH); + + expected_write_length += cmd_len + strlen(front.key) + strlen(front.client_ip) + whitespaces; + circular_log_counter = (circular_log_counter+1) % cmd_in_second.log_per_timer; + } + + int written_length = write(fd, log_str, expected_write_length); + free(log_str); + + if (written_length != expected_write_length) { + mc_logger->log(EXTENSION_LOG_WARNING, NULL, + "write length is difference to expectation"); + break; + } + + break; + } + + if (fd >= 0) { + close(fd); + } + + free(buffer->ring); + buffer->ring = NULL; + + free(timer->ring); + timer->ring = NULL; + + cmd_in_second.cur_state = NOT_STARTED; + + return NULL; +} + +static void do_buffer_add(const char* key, const char* client_ip) +{ + + cmd_in_second_buffer* buffer = &cmd_in_second.buffer; + + snprintf(buffer->ring[buffer->rear].key, + KEY_LENGTH, "%s", key); + snprintf(buffer->ring[buffer->rear].client_ip, + IP_LENGTH, "%s", client_ip); + + buffer->rear = (buffer->rear+1) % buffer->capacity; + + bool buffer_full = (buffer->rear+1) % buffer->capacity == + buffer->front; + + if (buffer_full) { + if (is_bulk_cmd()) { + cmd_in_second.cur_state = ON_FLUSHING; + do_flush_wakeup(); + return; + } + buffer->front = (buffer->front+1) % buffer->capacity; + } +} + +static void do_timer_add() +{ + struct cmd_in_second_timer* timer = &cmd_in_second.timer; + + if (gettimeofday(&timer->ring[timer->rear], NULL) == -1) { + mc_logger->log(EXTENSION_LOG_WARNING, NULL, + "gettimeofday failed"); + return; + } + + timer->rear = (timer->rear+1) % timer->capacity; + timer->last_elem_idx = timer->rear; + bool timer_full = (timer->rear+1) % timer->capacity == timer->front; + + if (timer_full) { + timer->front = (timer->front+1) % timer->capacity; + } +} + +void cmd_in_second_write(int operation, const char* key, const char* client_ip) +{ + pthread_mutex_lock(&cmd_in_second.lock); + + if (cmd_in_second.cur_state != ON_LOGGING || + cmd_in_second.operation != operation) { + pthread_mutex_unlock(&cmd_in_second.lock); + return; + } + + if (cmd_in_second.timer.circular_counter == 0) { + do_timer_add(); + } + + do_buffer_add(key, client_ip); + cmd_in_second.timer.circular_counter = (cmd_in_second.timer.circular_counter+1) % + cmd_in_second.log_per_timer; + + pthread_mutex_unlock(&cmd_in_second.lock); + return; +} + +void cmd_in_second_init(EXTENSION_LOGGER_DESCRIPTOR *logger) +{ + mc_logger = logger; + cmd_in_second.cur_state = NOT_STARTED; + + pthread_mutex_init(&cmd_in_second.lock, NULL); + pthread_attr_init(&cmd_in_second.flush.attr); + pthread_mutex_init(&cmd_in_second.flush.lock, NULL); + pthread_cond_init(&cmd_in_second.flush.cond, NULL); + + cmd_in_second.buffer.ring = NULL; + cmd_in_second.timer.ring = NULL; +} + +int cmd_in_second_start(int operation, const char cmd_str[], int bulk_limit) +{ + + pthread_mutex_lock(&cmd_in_second.lock); + + if (cmd_in_second.cur_state != NOT_STARTED) { + pthread_mutex_unlock(&cmd_in_second.lock); + return CMD_IN_SECOND_STARTED_ALREADY; + } + + int fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644); + + if (fd < 0) { + pthread_mutex_unlock(&cmd_in_second.lock); + return CMD_IN_SECOND_FILE_FAILED; + } + + close(fd); + + if (pthread_attr_init(&cmd_in_second.flush.attr) != 0 || + pthread_attr_setdetachstate(&cmd_in_second.flush.attr, PTHREAD_CREATE_DETACHED) != 0 || + (pthread_create(&cmd_in_second.flush.tid, &cmd_in_second.flush.attr, + do_flush_write, NULL)) != 0) + { + pthread_mutex_unlock(&cmd_in_second.lock); + return CMD_IN_SECOND_THREAD_FAILED; + } + + cmd_in_second.operation = operation; + cmd_in_second.bulk_limit = bulk_limit; + snprintf(cmd_in_second.cmd_str, CMD_STR_LEN, "%s", cmd_str); + + cmd_in_second.buffer.front = 0; + cmd_in_second.buffer.rear = 0; + cmd_in_second.buffer.capacity = bulk_limit+1; + cmd_in_second.buffer.ring = (cmd_in_second_log*)malloc(cmd_in_second.buffer.capacity * + sizeof(cmd_in_second_log)); + + if (cmd_in_second.buffer.ring == NULL) { + pthread_mutex_unlock(&cmd_in_second.lock); + return CMD_IN_SECOND_NO_MEM; + } + + cmd_in_second.log_per_timer = bulk_limit / 10 + (bulk_limit % 10 != 0); + + cmd_in_second.timer.front = 0; + cmd_in_second.timer.rear = 0; + cmd_in_second.timer.circular_counter = 0; + cmd_in_second.timer.last_elem_idx = 0; + cmd_in_second.timer.capacity = 12; + cmd_in_second.timer.ring = (struct timeval*) malloc (cmd_in_second.timer.capacity * + sizeof(struct timeval)); + + if (cmd_in_second.timer.ring == NULL) { + free(cmd_in_second.buffer.ring); + pthread_mutex_unlock(&cmd_in_second.lock); + return CMD_IN_SECOND_NO_MEM; + } + + cmd_in_second.cur_state = ON_LOGGING; + + pthread_mutex_unlock(&cmd_in_second.lock); + + return CMD_IN_SECOND_START; +} + +void cmd_in_second_stop(bool* already_stop) { + + pthread_mutex_lock(&cmd_in_second.lock); + + if (cmd_in_second.cur_state != ON_LOGGING) { + *already_stop = true; + pthread_mutex_unlock(&cmd_in_second.lock); + return; + } + + *already_stop = false; + + cmd_in_second.cur_state = STOPPED; + + pthread_mutex_unlock(&cmd_in_second.lock); +} diff --git a/cmd_in_second.h b/cmd_in_second.h new file mode 100644 index 000000000..6490e84f2 --- /dev/null +++ b/cmd_in_second.h @@ -0,0 +1,21 @@ +#ifndef __CMD_IN_SECOND_ +#define __CMD_IN_SECOND_ +#endif + +#include +#include +#include +#include + +typedef enum { + CMD_IN_SECOND_START, + CMD_IN_SECOND_STARTED_ALREADY, + CMD_IN_SECOND_NO_MEM, + CMD_IN_SECOND_THREAD_FAILED, + CMD_IN_SECOND_FILE_FAILED, +} CMD_IN_SECOND_START_CODE; + +void cmd_in_second_init(EXTENSION_LOGGER_DESCRIPTOR *mc_logger); +int cmd_in_second_start(int operation, const char cmd[], int bulk_limit); +void cmd_in_second_write(int operation, const char* key, const char* client_ip); +void cmd_in_second_stop(bool* already_stop); diff --git a/include/memcached/types.h b/include/memcached/types.h index 5e12291ff..104cf2da1 100644 --- a/include/memcached/types.h +++ b/include/memcached/types.h @@ -38,7 +38,7 @@ struct iovec { #define SUPPORT_BOP_SMGET #define JHPARK_OLD_SMGET_INTERFACE #define MAX_EFLAG_COMPARE_COUNT 100 - +#define CMD_IN_SECOND #ifdef __cplusplus extern "C" { @@ -106,6 +106,19 @@ extern "C" { OPERATION_MGET /**< Retrieve with mget semantics */ } ENGINE_RETRIEVE_OPERATION; + typedef enum { + OPERATION_INCR = 31, + OPERATION_DECR, + } ENGINE_INCR_DECR_OPERATION; + + typedef enum { + OPERATION_DELETE = 51 + } ENGINE_DELETION_OPERATION; + + typedef enum { + OPERATION_GETATTR = 61, + OPERATION_SETATTR = 61, + } ENGINE_ITEM_ATTR_OPERATION; /* collection operation */ typedef enum { /* list operation */ @@ -142,7 +155,9 @@ extern "C" { // SUPPORT_BOP_MGET OPERATION_BOP_MGET, /**< B+tree operation with mget(multiple get) element semantics */ // SUPPORT_BOP_SMGET - OPERATION_BOP_SMGET /**< B+tree operation with smget(sort-merge get) element semantics */ + OPERATION_BOP_SMGET, /**< B+tree operation with smget(sort-merge get) element semantics */ + OPERATION_BOP_INCR, + OPERATION_BOP_DECR } ENGINE_COLL_OPERATION; /* item type */ diff --git a/memcached.c b/memcached.c index 22b72262e..4bbbd680d 100644 --- a/memcached.c +++ b/memcached.c @@ -36,6 +36,7 @@ #ifdef ENABLE_ZK_INTEGRATION #include "arcus_zk.h" #endif +#include "cmd_in_second.h" #if defined(ENABLE_SASL) || defined(ENABLE_ISASL) #define SASL_ENABLED @@ -1629,7 +1630,9 @@ static void process_lop_insert_complete(conn *c) { if (settings.detail_enabled) { stats_prefix_record_lop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } - +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_INSERT, c->coll_key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { if (! lqdetect_lop_insert(c->client_ip, c->coll_key, c->coll_index)) { @@ -1692,6 +1695,9 @@ static void process_sop_insert_complete(conn *c) { stats_prefix_record_sop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_INSERT, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: STATS_HITS(c, sop_insert, c->coll_key, c->coll_nkey); @@ -1746,6 +1752,9 @@ static void process_sop_delete_complete(conn *c) { (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_DELETE, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: STATS_ELEM_HITS(c, sop_delete, c->coll_key, c->coll_nkey); @@ -1793,6 +1802,9 @@ static void process_sop_exist_complete(conn *c) { stats_prefix_record_sop_exist(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_EXIST, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -1864,6 +1876,9 @@ static void process_mop_insert_complete(conn *c) { if (settings.detail_enabled) { stats_prefix_record_mop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_MOP_INSERT, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -1914,6 +1929,9 @@ static void process_mop_update_complete(conn *c) { if (settings.detail_enabled) { stats_prefix_record_mop_update(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_MOP_UPDATE, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -1998,6 +2016,9 @@ static void process_mop_delete_complete(conn *c) { (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_MOP_DELETE, c->coll_key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { if (! lqdetect_mop_delete(c->client_ip, c->coll_key, del_count, @@ -2117,6 +2138,9 @@ static void process_mop_get_complete(conn *c) stats_prefix_record_mop_get(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_MOP_GET, c->coll_key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { if (! lqdetect_mop_get(c->client_ip, c->coll_key, elem_count, @@ -2291,6 +2315,9 @@ static void process_bop_insert_complete(conn *c) { stats_prefix_record_bop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_INSERT, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: STATS_HITS(c, bop_insert, c->coll_key, c->coll_nkey); @@ -2393,6 +2420,9 @@ static void process_bop_update_complete(conn *c) stats_prefix_record_bop_update(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_UPDATE, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -2486,6 +2516,9 @@ static void process_bop_mget_complete(conn *c) { stats_prefix_record_bop_get(key_tokens[k].value, key_tokens[k].length, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_MGET, c->coll_key, c->client_ip); +#endif if (ret == ENGINE_SUCCESS) { do { @@ -3072,6 +3105,9 @@ static void process_mget_complete(conn *c) if (settings.detail_enabled) { stats_prefix_record_get(key, nkey, NULL != it); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_MGET, key, c->client_ip); +#endif if (it) { /* get_item_info() always returns true. */ (void)mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo); @@ -3658,13 +3694,21 @@ static void complete_incr_bin(conn *c) { } if (settings.detail_enabled) { - if (incr) { + if (incr) { stats_prefix_record_incr(key, nkey); - } else { + } else { stats_prefix_record_decr(key, nkey); - } + } } +#ifdef CMD_IN_SECOND + if (incr) { + cmd_in_second_write(OPERATION_INCR, key, c->client_ip); + } else { + cmd_in_second_write(OPERATION_DECR, key, c->client_ip); + } +#endif + switch (ret) { case ENGINE_SUCCESS: rsp->message.body.value = htonll(rsp->message.body.value); @@ -3917,6 +3961,9 @@ static void process_bin_get(conn *c) { if (settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) { stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_GET, key, c->client_ip); +#endif } static void append_bin_stats(const char *key, const uint16_t klen, @@ -4496,6 +4543,9 @@ static void process_bin_lop_create(conn *c) { stats_prefix_record_lop_create(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_CREATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: STATS_OKS(c, lop_create, key, nkey); @@ -4563,6 +4613,9 @@ static void process_bin_lop_prepare_nread(conn *c) { if (settings.detail_enabled && ret != ENGINE_SUCCESS) { stats_prefix_record_lop_insert(key, nkey, false); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_INSERT, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -4631,6 +4684,10 @@ static void process_bin_lop_insert_complete(conn *c) { stats_prefix_record_lop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_INSERT, c->coll_key, c->client_ip); +#endif + switch (ret) { case ENGINE_SUCCESS: STATS_HITS(c, lop_insert, c->coll_key, c->coll_nkey); @@ -4710,6 +4767,9 @@ static void process_bin_lop_delete(conn *c) { stats_prefix_record_lop_delete(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_DELETE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -4799,6 +4859,9 @@ static void process_bin_lop_get(conn *c) { stats_prefix_record_lop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_GET, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -4915,6 +4978,9 @@ static void process_bin_sop_create(conn *c) { if (settings.detail_enabled) { stats_prefix_record_sop_create(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_CREATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -4985,13 +5051,23 @@ static void process_bin_sop_prepare_nread(conn *c) { } if (settings.detail_enabled && ret != ENGINE_SUCCESS) { - if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) + if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { stats_prefix_record_sop_insert(key, nkey, false); - else if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) + } else if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) { stats_prefix_record_sop_delete(key, nkey, false); - else + } else { stats_prefix_record_sop_exist(key, nkey, false); + } } +#ifdef CMD_IN_SECOND + if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { + cmd_in_second_write(OPERATION_SOP_INSERT, key, c->client_ip); + } else if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) { + cmd_in_second_write(OPERATION_SOP_DELETE, key, c->client_ip); + } else { + cmd_in_second_write(OPERATION_SOP_EXIST, key, c->client_ip); + } +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5081,6 +5157,9 @@ static void process_bin_sop_insert_complete(conn *c) { if (settings.detail_enabled) { stats_prefix_record_sop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_INSERT, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5141,7 +5220,9 @@ static void process_bin_sop_delete_complete(conn *c) { stats_prefix_record_sop_delete(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } - +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_DELETE, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: STATS_ELEM_HITS(c, sop_delete, c->coll_key, c->coll_nkey); @@ -5190,6 +5271,9 @@ static void process_bin_sop_exist_complete(conn *c) { if (settings.detail_enabled) { stats_prefix_record_sop_exist(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_EXIST, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5286,6 +5370,9 @@ static void process_bin_sop_get(conn *c) { stats_prefix_record_sop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_GET, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5412,6 +5499,9 @@ static void process_bin_bop_create(conn *c) { if (settings.detail_enabled) { stats_prefix_record_bop_create(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_CREATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5492,6 +5582,9 @@ static void process_bin_bop_prepare_nread(conn *c) { if (settings.detail_enabled && ret != ENGINE_SUCCESS) { stats_prefix_record_bop_insert(key, nkey, false); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_INSERT, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5571,6 +5664,9 @@ static void process_bin_bop_insert_complete(conn *c) { if (settings.detail_enabled) { stats_prefix_record_bop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_INSERT, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5651,6 +5747,9 @@ static void process_bin_bop_update_complete(conn *c) { (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_UPDATE, c->coll_key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: STATS_ELEM_HITS(c, bop_update, c->coll_key, c->coll_nkey); @@ -5764,6 +5863,9 @@ static void process_bin_bop_update_prepare_nread(conn *c) { stats_prefix_record_bop_update(key, nkey, false); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_UPDATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: c->ritem = ((value_item *)elem)->ptr; @@ -5841,6 +5943,9 @@ static void process_bin_bop_delete(conn *c) { stats_prefix_record_bop_delete(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_DELETE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -5946,6 +6051,9 @@ static void process_bin_bop_get(conn *c) { stats_prefix_record_bop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_GET, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -6078,6 +6186,9 @@ static void process_bin_bop_count(conn *c) { if (settings.detail_enabled) { stats_prefix_record_bop_count(key, nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_COUNT, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -6721,6 +6832,9 @@ static void process_bin_getattr(conn *c) { if (settings.detail_enabled) { stats_prefix_record_getattr(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_GETATTR, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -6858,6 +6972,9 @@ static void process_bin_setattr(conn *c) { if (settings.detail_enabled) { stats_prefix_record_setattr(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SETATTR, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -7343,6 +7460,9 @@ static void process_bin_update(conn *c) { if (settings.detail_enabled) { stats_prefix_record_set(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SET, key, c->client_ip); +#endif ENGINE_ERROR_CODE ret; ret = mc_engine.v1->allocate(mc_engine.v0, c, &it, key, nkey, vlen+2, @@ -7428,6 +7548,9 @@ static void process_bin_append_prepend(conn *c) { if (settings.detail_enabled) { stats_prefix_record_set(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SET, key, c->client_ip); +#endif ENGINE_ERROR_CODE ret; ret = mc_engine.v1->allocate(mc_engine.v0, c, &it, key, nkey, vlen+2, @@ -7599,6 +7722,9 @@ static void process_bin_delete(conn *c) { if (settings.detail_enabled) { stats_prefix_record_delete(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_DELETE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -8519,6 +8645,9 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, if (settings.detail_enabled) { stats_prefix_record_get(key, nkey, NULL != it); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_GET, key, c->client_ip); +#endif if (it) { if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { @@ -8733,6 +8862,9 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken if (settings.detail_enabled) { stats_prefix_record_set(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SET, key, c->client_ip); +#endif ENGINE_ERROR_CODE ret; ret = mc_engine.v1->allocate(mc_engine.v0, c, &it, key, nkey, vlen, @@ -8825,6 +8957,15 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt stats_prefix_record_decr(key, nkey); } } +#ifdef CMD_IN_SECOND + if (incr) { + stats_prefix_record_incr(key, nkey); + cmd_in_second_write(OPERATION_INCR, key, c->client_ip); + } else { + stats_prefix_record_decr(key, nkey); + cmd_in_second_write(OPERATION_DECR, key, c->client_ip); + } +#endif ENGINE_ERROR_CODE ret; uint64_t cas; @@ -8915,8 +9056,11 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken } if (settings.detail_enabled) { - stats_prefix_record_delete(key, nkey); + stats_prefix_record_delete(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_DELETE, key, c->client_ip); +#endif ENGINE_ERROR_CODE ret; ret = mc_engine.v1->remove(mc_engine.v0, c, key, nkey, 0, 0); @@ -9949,8 +10093,11 @@ static void process_lop_get(conn *c, char *key, size_t nkey, } if (settings.detail_enabled) { - stats_prefix_record_lop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); + stats_prefix_record_lop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_GET, key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { @@ -10057,8 +10204,11 @@ static void process_lop_prepare_nread(conn *c, int cmd, size_t vlen, } if (settings.detail_enabled && ret != ENGINE_SUCCESS) { - stats_prefix_record_lop_insert(key, nkey, false); + stats_prefix_record_lop_insert(key, nkey, false); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_INSERT, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -10100,8 +10250,11 @@ static void process_lop_create(conn *c, char *key, size_t nkey, item_attr *attrp } if (settings.detail_enabled) { - stats_prefix_record_lop_create(key, nkey); + stats_prefix_record_lop_create(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_CREATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -10139,8 +10292,11 @@ static void process_lop_delete(conn *c, char *key, size_t nkey, } if (settings.detail_enabled) { - stats_prefix_record_lop_delete(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); + stats_prefix_record_lop_delete(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_LOP_DELETE, key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { @@ -10367,8 +10523,11 @@ static void process_sop_get(conn *c, char *key, size_t nkey, uint32_t count, } if (settings.detail_enabled) { - stats_prefix_record_sop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); + stats_prefix_record_sop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_GET, key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { @@ -10481,12 +10640,13 @@ static void process_sop_prepare_nread(conn *c, int cmd, size_t vlen, char *key, } if (settings.detail_enabled && ret != ENGINE_SUCCESS) { - if (cmd == (int)OPERATION_SOP_INSERT) + if (cmd == (int)OPERATION_SOP_INSERT) { stats_prefix_record_sop_insert(key, nkey, false); - else if (cmd == (int)OPERATION_SOP_DELETE) + } else if (cmd == (int)OPERATION_SOP_DELETE) { stats_prefix_record_sop_delete(key, nkey, false); - else + } else { stats_prefix_record_sop_exist(key, nkey, false); + } } switch (ret) { @@ -10541,6 +10701,9 @@ static void process_sop_create(conn *c, char *key, size_t nkey, item_attr *attrp if (settings.detail_enabled) { stats_prefix_record_sop_create(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SOP_CREATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -10762,6 +10925,9 @@ static void process_bop_get(conn *c, char *key, size_t nkey, if (settings.detail_enabled) { stats_prefix_record_bop_get(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_GET, key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { @@ -10878,6 +11044,9 @@ static void process_bop_count(conn *c, char *key, size_t nkey, if (settings.detail_enabled) { stats_prefix_record_bop_count(key, nkey, (ret==ENGINE_SUCCESS)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_COUNT, key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { @@ -10928,6 +11097,9 @@ static void process_bop_position(conn *c, char *key, size_t nkey, if (settings.detail_enabled) { stats_prefix_record_bop_position(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_POSITION, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -10987,6 +11159,9 @@ static void process_bop_pwg(conn *c, char *key, size_t nkey, const bkey_range *b if (settings.detail_enabled) { stats_prefix_record_bop_pwg(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_PWG, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -11100,6 +11275,9 @@ static void process_bop_gbp(conn *c, char *key, size_t nkey, ENGINE_BTREE_ORDER if (settings.detail_enabled) { stats_prefix_record_bop_gbp(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_GBP, key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { @@ -11210,6 +11388,9 @@ static void process_bop_update_prepare_nread(conn *c, int cmd, char *key, size_t if (settings.detail_enabled && ret != ENGINE_SUCCESS) { stats_prefix_record_bop_update(key, nkey, false); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_UPDATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -11252,6 +11433,9 @@ static void process_bop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, if (settings.detail_enabled && ret != ENGINE_SUCCESS) { stats_prefix_record_bop_insert(key, nkey, false); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_INSERT, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -11394,6 +11578,9 @@ static void process_bop_create(conn *c, char *key, size_t nkey, item_attr *attrp if (settings.detail_enabled) { stats_prefix_record_bop_create(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_CREATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -11435,6 +11622,9 @@ static void process_bop_delete(conn *c, char *key, size_t nkey, if (settings.detail_enabled) { stats_prefix_record_bop_delete(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_BOP_DELETE, key, c->client_ip); +#endif #ifdef DETECT_LONG_QUERY if (lqdetect_in_use && ret == ENGINE_SUCCESS) { @@ -11488,7 +11678,20 @@ static void process_bop_arithmetic(conn *c, char *key, size_t nkey, bkey_range * c->ewouldblock = true; ret = ENGINE_SUCCESS; } +#ifdef CMD_IN_SECOND + if (incr) { + cmd_in_second_write(OPERATION_BOP_INCR, key, c->client_ip); + if (settings.detail_enabled) { + stats_prefix_record_bop_incr(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); + } + } else { + cmd_in_second_write(OPERATION_BOP_DECR, key, c->client_ip); + if (settings.detail_enabled) { + stats_prefix_record_bop_decr(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); + } + } +#else if (settings.detail_enabled) { if (incr) { stats_prefix_record_bop_incr(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); @@ -11496,8 +11699,9 @@ static void process_bop_arithmetic(conn *c, char *key, size_t nkey, bkey_range * stats_prefix_record_bop_decr(key, nkey, (ret==ENGINE_SUCCESS || ret==ENGINE_ELEM_ENOENT)); } } +#endif - switch (ret) { + switch (ret) { case ENGINE_SUCCESS: if (incr) { STATS_ELEM_HITS(c, bop_incr, key, nkey); @@ -11744,6 +11948,9 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, if (settings.detail_enabled && ret != ENGINE_SUCCESS) { stats_prefix_record_mop_insert(key, nkey, false); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_MOP_INSERT, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -11834,6 +12041,9 @@ static void process_mop_create(conn *c, char *key, size_t nkey, item_attr *attrp if (settings.detail_enabled) { stats_prefix_record_mop_create(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_MOP_CREATE, key, c->client_ip); +#endif switch (ret) { case ENGINE_SUCCESS: @@ -12851,6 +13061,9 @@ static void process_getattr_command(conn *c, token_t *tokens, const size_t ntoke if (settings.detail_enabled) { stats_prefix_record_getattr(key, nkey); } +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_GETATTR, key, c->client_ip); +#endif } switch (ret) { @@ -13006,6 +13219,9 @@ static void process_setattr_command(conn *c, token_t *tokens, const size_t ntoke if (settings.detail_enabled) { stats_prefix_record_setattr(key, nkey); +#ifdef CMD_IN_SECOND + cmd_in_second_write(OPERATION_SETATTR, key, c->client_ip); +#endif } } @@ -13030,6 +13246,225 @@ static void process_setattr_command(conn *c, token_t *tokens, const size_t ntoke } } +#ifdef CMD_IN_SECOND +static void process_second_command(conn *c, token_t *tokens, const size_t ntokens) { + + assert(c != NULL); + + if (ntokens == 3) { + if (strcmp("stop", tokens[SUBCOMMAND_TOKEN].value) != 0) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + bool already_stop = false; + cmd_in_second_stop(&already_stop); + + if (already_stop) { + out_string(c, "cmd_in_second already stopped"); + return; + } + + out_string(c, "cmd_in_second stopped"); + return; + } + + + int operation_enum = 0; + char cmd[20] = ""; + + bool is_collection_cmd = true; + + if (strcmp("lop", tokens[SUBCOMMAND_TOKEN].value) == 0) { + if (strcmp("create", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "lop create"); + operation_enum = OPERATION_LOP_CREATE; + } else if (strcmp("insert", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "lop insert"); + operation_enum = OPERATION_LOP_INSERT; + } else if (strcmp("delete", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "lop delete"); + operation_enum = OPERATION_LOP_DELETE; + } else if (strcmp("get", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "lop get"); + operation_enum = OPERATION_LOP_GET; + } else { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + } else if (strcmp("sop", tokens[SUBCOMMAND_TOKEN].value) == 0) { + if (strcmp("create", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "sop create"); + operation_enum = OPERATION_SOP_CREATE; + } else if (strcmp("insert", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "sop insert"); + operation_enum = OPERATION_SOP_INSERT; + } else if (strcmp("delete", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "sop delete"); + operation_enum = OPERATION_SOP_DELETE; + } else if (strcmp("exist", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "sop exist"); + operation_enum = OPERATION_SOP_EXIST; + } else if (strcmp("get", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "sop get"); + operation_enum = OPERATION_SOP_GET; + } else { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + } else if (strcmp("mop", tokens[SUBCOMMAND_TOKEN].value) == 0) { + if (strcmp("create", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "mop create"); + operation_enum = OPERATION_MOP_CREATE; + } else if (strcmp("insert", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "mop insert"); + operation_enum = OPERATION_MOP_INSERT; + } else if (strcmp("delete", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "mop delete"); + operation_enum = OPERATION_MOP_DELETE; + } else if (strcmp("update", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "mop update"); + operation_enum = OPERATION_MOP_UPDATE; + } else if (strcmp("get", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "mop get"); + operation_enum = OPERATION_MOP_GET; + } else { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + } else if (strcmp("bop", tokens[SUBCOMMAND_TOKEN].value) == 0) { + if (strcmp("create", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop create"); + operation_enum = OPERATION_BOP_CREATE; + } else if (strcmp("insert", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop insert"); + operation_enum = OPERATION_BOP_INSERT; + } else if (strcmp("upsert", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop upsert"); + operation_enum = OPERATION_BOP_UPSERT; + } else if (strcmp("update", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop update"); + operation_enum = OPERATION_BOP_UPDATE; + } else if (strcmp("delete", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop delete"); + operation_enum = OPERATION_BOP_DELETE; + } else if (strcmp("count", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop count"); + operation_enum = OPERATION_BOP_COUNT; + } else if (strcmp("incr", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop incr"); + operation_enum = OPERATION_BOP_INCR; + } else if (strcmp("decr", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop decr"); + operation_enum = OPERATION_BOP_DECR; + } else if (strcmp("get", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop get"); + operation_enum = OPERATION_BOP_GET; + } else if (strcmp("mget", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop mget"); + operation_enum = OPERATION_BOP_MGET; + } else if (strcmp("smget", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop smget"); + operation_enum = OPERATION_BOP_SMGET; + } else if (strcmp("position", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop position"); + operation_enum = OPERATION_BOP_POSITION; + } else if (strcmp("gbp", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop gbp"); + operation_enum = OPERATION_BOP_GBP; + } else if (strcmp("pwg", tokens[SUBCOMMAND_TOKEN+1].value) == 0) { + sprintf(cmd, "bop pwg"); + operation_enum = OPERATION_BOP_PWG; + } else { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + } else { /* key value command */ + if (strcmp("set", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "set"); + operation_enum = OPERATION_SET; + } else if (strcmp("add", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "add"); + operation_enum = OPERATION_ADD; + } else if (strcmp("replace", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "replace"); + operation_enum = OPERATION_REPLACE; + } else if (strcmp("append", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "append"); + operation_enum = OPERATION_APPEND; + } else if (strcmp("prepend", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "prepend"); + operation_enum = OPERATION_PREPEND; + } else if (strcmp("delete", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "delete"); + operation_enum = OPERATION_DELETE; + } else if (strcmp("get", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "get"); + operation_enum = OPERATION_GET; + } else if (strcmp("gets", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "get"); + operation_enum = OPERATION_GETS; + } else if (strcmp("mget", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "mget"); + operation_enum = OPERATION_MGET; + } else if (strcmp("cas", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "cas"); + operation_enum = OPERATION_CAS; + } else if (strcmp("incr", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "incr"); + operation_enum = OPERATION_INCR; + } else if (strcmp("decr", tokens[SUBCOMMAND_TOKEN].value) == 0) { + sprintf(cmd, "decr"); + operation_enum = OPERATION_DECR; + } else { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + is_collection_cmd = false; + } + + if ((is_collection_cmd && ntokens != 5) || + (!is_collection_cmd && ntokens != 4)) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + int cmd_idx = SUBCOMMAND_TOKEN; + int cnt_idx = cmd_idx + 1 + is_collection_cmd; + + int cnt_to_log= 0; + + if (!safe_strtol(tokens[cnt_idx].value, &cnt_to_log) || cnt_to_log <= 0) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + int start_code = cmd_in_second_start(operation_enum, cmd, cnt_to_log); + + char response[100] = ""; + + switch (start_code) { + case CMD_IN_SECOND_STARTED_ALREADY: + sprintf(response, "cmd in second already started"); + break; + case CMD_IN_SECOND_NO_MEM: + sprintf(response, "SERVER_ERROR out of memory"); + break; + case CMD_IN_SECOND_THREAD_FAILED: + sprintf(response, "cmd_in_second failed to create a flush thread"); + break; + case CMD_IN_SECOND_FILE_FAILED: + sprintf(response, "cmd_in_second failed to create a log file"); + break; + case CMD_IN_SECOND_START: + settings.detail_enabled = true; + sprintf(response, "%s will be logged", cmd); + break; + } + out_string(c, response); +} +#endif + static void process_command(conn *c, char *command, int cmdlen) { /* One more token is reserved in tokens strucure @@ -13151,6 +13586,12 @@ static void process_command(conn *c, char *command, int cmdlen) { process_config_command(c, tokens, ntokens); } +#ifdef CMD_IN_SECOND + else if ((ntokens >= 3 && ntokens <= 5) && (strcmp(tokens[COMMAND_TOKEN].value, "cmd_in_second") == 0)) + { + process_second_command(c, tokens, ntokens); + } +#endif #ifdef ENABLE_ZK_INTEGRATION else if ((ntokens >= 3) && (strcmp(tokens[COMMAND_TOKEN].value, "zkensemble") == 0)) { process_zkensemble_command(c, tokens, ntokens); @@ -15718,6 +16159,10 @@ int main (int argc, char **argv) { cmdlog_init(settings.port, mc_logger); #endif +#ifdef CMD_IN_SECOND + cmd_in_second_init(mc_logger); +#endif + #ifdef DETECT_LONG_QUERY /* initialise long query detection */ if (lqdetect_init() == -1) { diff --git a/t/bulk.t b/t/bulk.t new file mode 100644 index 000000000..8db53268e --- /dev/null +++ b/t/bulk.t @@ -0,0 +1,193 @@ +#!/usr/bin/perl + +use strict; +use Test::More tests => 20781; +use Time::HiRes qw(gettimeofday time); +use FindBin qw($Bin); +use lib "$Bin/lib"; +use MemcachedTest; +use Cwd; + +my $engine = shift; +my $server = get_memcached($engine); +my $sock = $server->sock; + +my $start = 0; +my $on_logging = 1; + +my $bulk_size = 3461; + +#mem_cmd_is($sock, $cmd, $val, $rst); +# +sub request_log { + my ($whole_cmd, $cnt, $state) = @_; + + if ($cnt <= 0) { + my $rst= "CLIENT_ERROR bad command line format"; + mem_cmd_is($sock, "cmd_in_second $whole_cmd $cnt", "", $rst); + return; + } + + if ($state == $start){ + my $rst = "$whole_cmd will be logged"; + mem_cmd_is($sock, "cmd_in_second $whole_cmd $cnt", "", $rst); + return; + } + + if ($state == $on_logging) { + my $rst = "cmd in second already started"; + mem_cmd_is($sock, "cmd_in_second $whole_cmd $cnt", "", $rst); + } +} + +sub stop_log { + request_log("bop insert", 1000, $start); + mem_cmd_is($sock, "cmd_in_second stop", "", "cmd_in_second stopped"); + mem_cmd_is($sock, "cmd_in_second stop", "", "cmd_in_second already stopped"); +} + +sub do_bulk_coll_insert { + + + my ($collection, $key, $count) = @_; + my $start_time = time; + + my $cmd = "insert"; + + for (my $index=0; $index < $count; $index++) { + my $val = "datum$index"; + my $vleng = length($val); + + my $whole_cmd; + + if ($collection eq "sop") { + $whole_cmd = "$collection $cmd $key $vleng"; + } + + elsif ($collection eq "bop" or $collection eq "lop") { + $whole_cmd = "$collection $cmd $key $index $vleng"; + } + + elsif ($collection eq "mop") { + my $field = "f$index"; + $whole_cmd = "$collection $cmd $key $field $vleng"; + } + + my $rst = "STORED"; + + if ($index == 0) { + my $create = "create 13 0 0"; + + if ($collection eq "sop") { + $whole_cmd = "$collection $cmd $key $vleng $create"; + } + + elsif ($collection eq "bop" or $collection eq "lop") { + $whole_cmd = "$collection $cmd $key $index $vleng $create"; + } + + elsif ($collection eq "mop") { + my $field = "f$index"; + $whole_cmd = "$collection $cmd $key $field $vleng $create"; + } + + $rst = "CREATED_STORED"; + } + + if ($count >= 10 and $count < $bulk_size) { + if ($index % (int($count/10)) == 0) { + request_log("mop insert", 1000, $on_logging); + } + } + + mem_cmd_is($sock, $whole_cmd, $val, $rst); + } + + + my $end_time = time; + + cmp_ok($end_time - $start_time, "<=", 1000, "all commands are done in a second"); + sleep(1); + + file_check() +} + + +sub wrong_cmd_test{ + + request_log("sop insert", 0, $start); + request_log("sop insert", -1, $start); + + my $bad_format = "CLIENT_ERROR bad command line format"; + mem_cmd_is($sock, "cmd_in_second lop upsert 1000", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second sop upsert 1000", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second mop upsert 1000", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second bop cas 1000", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second bop insert", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second bop insert blahblah", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second set 1000 blahblah", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second bop", "", $bad_format); + mem_cmd_is($sock, "cmd_in_second stop blahblah", "", $bad_format); + + my $unknown = "ERROR unknown command"; + mem_cmd_is($sock, "cmd_in_second bop insert 1000 blahblah", "", $unknown); + mem_cmd_is($sock, "cmd_in_second bop insert 1000 1000", "", $unknown); + +} + + +sub slow_cmd { + + my $request_cnt = 10000; + request_log("sop exist", $request_cnt, $start); + unlink "cmd_in_second.log" or die "can't remove old file\n"; + + for (my $i=0; $i < 5; $i++) { + for (my $j = 0; $j < $bulk_size; $j++) { + my $val = "datum$j"; + my $vleng = length($val); + + mem_cmd_is($sock, "sop exist skey $vleng", $val, "EXIST"); + sleep(0.001); + } + } + + if (-e "cmd_in_second.log") { + die "log made by non-bulk command"; + } +} + +sub file_check { + + my $file_handle; + open($file_handle, "cmd_in_second.log") or die "log file not exist\n"; + + if (-s "cmd_in_second.log" == 0) { + die "empty log"; + } + + my $line_cnt; + close($file_handle); +} + + +wrong_cmd_test(); +sleep(0.3); + +stop_log(); + +sleep(0.3); +#extremely small cases +request_log("bop insert", 1, $start); +do_bulk_coll_insert("bop", "bkey1", 1); +sleep(0.3); + +request_log("bop insert", 9, $start); +do_bulk_coll_insert("bop", "bkey2", 9); + +sleep(0.3); +request_log("bop insert", $bulk_size, $start); +do_bulk_coll_insert("bop", "bkey3", $bulk_size); + +#slow_cmd(); +release_memcached($engine, $server);