From 7bd4c6074671ba12c4aa34137de37d35581c9650 Mon Sep 17 00:00:00 2001 From: yeoncheol-kim Date: Mon, 15 Jul 2024 14:08:39 +0900 Subject: [PATCH] FEATURE: Add multi store APIs --- libmemcached/memcached.cc | 1 + libmemcached/memcached.h | 1 + libmemcached/storage.cc | 187 +++++++++++++++++++++++++++++++++++++- libmemcached/storage.h | 50 +++++++++- 4 files changed, 236 insertions(+), 3 deletions(-) diff --git a/libmemcached/memcached.cc b/libmemcached/memcached.cc index 8f2652fe..9ae2806e 100644 --- a/libmemcached/memcached.cc +++ b/libmemcached/memcached.cc @@ -176,6 +176,7 @@ static inline bool _memcached_init(memcached_st *self) self->configure.filename= NULL; self->flags.piped= false; + self->flags.bulk= false; #ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION self->server_manager= NULL; self->logfile= NULL; diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 0cbcef17..0c6781a3 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -136,6 +136,7 @@ struct memcached_st { bool no_block:1; // Don't block bool no_reply:1; bool piped:1; + bool bulk:1; bool randomize_replica_read:1; bool support_cas:1; bool tcp_nodelay:1; diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index bce3425e..227a41b5 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -227,7 +227,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, return MEMCACHED_BUFFERED; } - if (noreply) + if (noreply or ptr->flags.bulk) { return MEMCACHED_SUCCESS; } @@ -349,7 +349,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, if (rc == MEMCACHED_SUCCESS) { - if (ptr->flags.no_reply) + if (ptr->flags.no_reply or ptr->flags.bulk) { rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; } @@ -428,6 +428,100 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, return rc; } +static inline memcached_return_t memcached_send_bulk(memcached_st *ptr, + const char * const *group_keys, const size_t *group_key_length, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, + uint32_t flags, + uint64_t cas, + memcached_return_t *results, + memcached_storage_action_t verb) +{ + arcus_server_check_for_update(ptr); + + memcached_return_t rc; + + if (memcached_failed(rc= initialize_query(ptr))) + { + return rc; + } + + if (not keys or not key_length) + { + return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("key (length) list is null")); + } + + if (not values or not value_length) + { + return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("value (length) list is null")); + } + + bool original_buffered = ptr->flags.buffer_requests; + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + uint32_t server_key; + memcached_server_write_instance_st instance; + + /* buffered requests are incompatible with bulks. just flush them.. */ + if (original_buffered) { + for (server_key = 0; server_key < memcached_server_count(ptr); server_key++) + { + instance= memcached_server_instance_fetch(ptr, server_key); + memcached_io_write(instance, NULL, 0, true); + rc = memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + } + ptr->flags.buffer_requests = false; + } + + ptr->flags.bulk = true; + + for (int i = 0; i < (int)number_of_keys; i++) + { + if (memcached_failed(memcached_validate_key_length(key_length[i], ptr->flags.binary_protocol)) || + memcached_failed(memcached_key_test(*ptr, (const char **)&keys[i], &key_length[i], 1))) + { + return MEMCACHED_BAD_KEY_PROVIDED; + } + + if (ptr->flags.binary_protocol) + { + results[i] = memcached_send_binary(ptr, group_keys[i], group_key_length[i], + keys[i], key_length[i], values[i], value_length[i], + expirations ? expirations[i] : 0, + flags, cas, verb); + } + else + { + results[i] = memcached_send_ascii(ptr, group_keys[i], group_key_length[i], + keys[i], key_length[i], values[i], value_length[i], + expirations ? expirations[i] : 0, + flags, cas, verb); + } + } + + //ptr->flags.buffer_requests = original_buffered; + ptr->flags.bulk = false; + + if (ptr->flags.no_reply) { + return rc; + } + + for (int i = 0; i < (int)number_of_keys; i++) + { + if (results[i] == MEMCACHED_SUCCESS) + { + server_key= memcached_generate_hash_with_redistribution(ptr, group_keys[i], group_key_length[i]); + instance= memcached_server_instance_fetch(ptr, server_key); + + results[i] = memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + } + } + + return rc; +} memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length, const char *value, size_t value_length, @@ -605,3 +699,92 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, return rc; } +memcached_return_t memcached_set_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, SET_OP); + return rc; +} + +memcached_return_t memcached_add_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, ADD_OP); + return rc; +} + +memcached_return_t memcached_replace_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, REPLACE_OP); + return rc; +} + +memcached_return_t memcached_prepend_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, PREPEND_OP); + return rc; +} + + memcached_return_t memcached_append_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, APPEND_OP); + return rc; +} + +memcached_return_t memcached_cas_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results) +{ + memcached_return_t rc; + rc= memcached_send_bulk(ptr, keys, key_length, + keys, key_length, number_of_keys, + values, value_length, + expirations, flags, 0, results, CAS_OP); + return rc; +} diff --git a/libmemcached/storage.h b/libmemcached/storage.h index 215f4ebb..a0774f95 100644 --- a/libmemcached/storage.h +++ b/libmemcached/storage.h @@ -1,5 +1,5 @@ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -129,6 +129,54 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, uint32_t flags, uint64_t cas); +LIBMEMCACHED_API +memcached_return_t memcached_set_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_add_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_replace_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_prepend_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_append_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_cas_bulk(memcached_st *ptr, + const char * const *keys, const size_t *key_length, + size_t number_of_keys, + const char * const *values, const size_t *value_length, + time_t *expirations, uint32_t flags, + memcached_return_t *results); + #ifdef __cplusplus } #endif