diff --git a/libmemcached/memcached.cc b/libmemcached/memcached.cc index 8f2652fe..9ae2806e 100644 --- a/libmemcached/memcached.cc +++ b/libmemcached/memcached.cc @@ -176,6 +176,7 @@ static inline bool _memcached_init(memcached_st *self) self->configure.filename= NULL; self->flags.piped= false; + self->flags.bulk= false; #ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION self->server_manager= NULL; self->logfile= NULL; diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 0cbcef17..0c6781a3 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -136,6 +136,7 @@ struct memcached_st { bool no_block:1; // Don't block bool no_reply:1; bool piped:1; + bool bulk:1; bool randomize_replica_read:1; bool support_cas:1; bool tcp_nodelay:1; diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index bce3425e..ec46a509 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -170,7 +170,8 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, { value_length, value } }; - flush= (bool) ((ptr->flags.buffer_requests && verb == SET_OP) ? 0 : 1); + /* do not buffer requests internally during bulk operations. */ + flush= (bool) ((!ptr->flags.bulk && ptr->flags.buffer_requests && verb == SET_OP) ? 0 : 1); uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); memcached_server_write_instance_st server= memcached_server_instance_fetch(ptr, server_key); @@ -227,7 +228,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, return MEMCACHED_BUFFERED; } - if (noreply) + if (noreply or ptr->flags.bulk) { return MEMCACHED_SUCCESS; } @@ -310,8 +311,9 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, { 2, "\r\n" } }; + /* do not buffer requests internally during bulk operations. */ bool to_write; - if (ptr->flags.buffer_requests && verb == SET_OP) + if (!ptr->flags.bulk && ptr->flags.buffer_requests && verb == SET_OP) { to_write= false; } @@ -349,7 +351,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, if (rc == MEMCACHED_SUCCESS) { - if (ptr->flags.no_reply) + if (ptr->flags.no_reply or ptr->flags.bulk) { rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; } @@ -428,6 +430,83 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, return rc; } +static inline memcached_return_t memcached_send_bulk(memcached_st *ptr, + const char * const *group_keys, const size_t *group_key_length, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, + uint32_t flags, + uint64_t cas, + memcached_return_t *results, + memcached_storage_action_t verb) +{ + arcus_server_check_for_update(ptr); + + memcached_return_t rc; + if (memcached_failed(rc= initialize_query(ptr))) + { + return rc; + } + + if (not keys or not key_length) + { + return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("key (length) list is null")); + } + + if (not values or not value_length) + { + return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("value (length) list is null")); + } + + ptr->flags.bulk = true; + + for (int i = 0; i < (int)number_of_keys; i++) + { + if (memcached_failed(memcached_validate_key_length(key_length[i], ptr->flags.binary_protocol)) || + memcached_failed(memcached_key_test(*ptr, (const char **)&keys[i], &key_length[i], 1))) + { + return MEMCACHED_BAD_KEY_PROVIDED; + } + + if (ptr->flags.binary_protocol) + { + results[i] = memcached_send_binary(ptr, group_keys[i], group_key_length[i], + keys[i], key_length[i], values[i], value_length[i], + expirations ? expirations[i] : 0, + flags, cas, verb); + } + else + { + results[i] = memcached_send_ascii(ptr, group_keys[i], group_key_length[i], + keys[i], key_length[i], values[i], value_length[i], + expirations ? expirations[i] : 0, + flags, cas, verb); + } + } + + ptr->flags.bulk = false; + + if (ptr->flags.no_reply) { + return rc; + } + + for (int i = 0; i < (int)number_of_keys; i++) + { + if (results[i] == MEMCACHED_SUCCESS) + { + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_keys[i], group_key_length[i]); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + results[i] = memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + } + } + + return rc; +} memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length, const char *value, size_t value_length, @@ -605,3 +684,92 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, return rc; } +memcached_return_t memcached_set_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, SET_OP); + return rc; +} + +memcached_return_t memcached_add_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, ADD_OP); + return rc; +} + +memcached_return_t memcached_replace_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, REPLACE_OP); + return rc; +} + +memcached_return_t memcached_prepend_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, PREPEND_OP); + return rc; +} + + memcached_return_t memcached_append_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, APPEND_OP); + return rc; +} + +memcached_return_t memcached_cas_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, CAS_OP); + return rc; +} diff --git a/libmemcached/storage.h b/libmemcached/storage.h index 215f4ebb..a0774f95 100644 --- a/libmemcached/storage.h +++ b/libmemcached/storage.h @@ -1,5 +1,5 @@ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -129,6 +129,54 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, uint32_t flags, uint64_t cas); +LIBMEMCACHED_API +memcached_return_t memcached_set_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_add_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_replace_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_prepend_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_append_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_cas_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + #ifdef __cplusplus } #endif