Skip to content

Commit

Permalink
FEATURE: Add multi store APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
ing-eoking committed Jul 17, 2024
1 parent 3151b64 commit 873cdfe
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 5 deletions.
1 change: 1 addition & 0 deletions libmemcached/memcached.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ static inline bool _memcached_init(memcached_st *self)
self->configure.filename= NULL;

self->flags.piped= false;
self->flags.bulk= false;
#ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION
self->server_manager= NULL;
self->logfile= NULL;
Expand Down
1 change: 1 addition & 0 deletions libmemcached/memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ struct memcached_st {
bool no_block:1; // Don't block
bool no_reply:1;
bool piped:1;
bool bulk:1;
bool randomize_replica_read:1;
bool support_cas:1;
bool tcp_nodelay:1;
Expand Down
176 changes: 172 additions & 4 deletions libmemcached/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
{ value_length, value }
};

flush= (bool) ((ptr->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
/* do not buffer requests internally during bulk operations. */
flush= (bool) ((!ptr->flags.bulk && ptr->flags.buffer_requests && verb == SET_OP) ? 0 : 1);

uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
memcached_server_write_instance_st server= memcached_server_instance_fetch(ptr, server_key);
Expand Down Expand Up @@ -227,7 +228,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
return MEMCACHED_BUFFERED;
}

if (noreply)
if (noreply or ptr->flags.bulk)
{
return MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -310,8 +311,9 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,
{ 2, "\r\n" }
};

/* do not buffer requests internally during bulk operations. */
bool to_write;
if (ptr->flags.buffer_requests && verb == SET_OP)
if (!ptr->flags.bulk && ptr->flags.buffer_requests && verb == SET_OP)
{
to_write= false;
}
Expand Down Expand Up @@ -349,7 +351,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,

if (rc == MEMCACHED_SUCCESS)
{
if (ptr->flags.no_reply)
if (ptr->flags.no_reply or ptr->flags.bulk)
{
rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -428,6 +430,83 @@ static inline memcached_return_t memcached_send(memcached_st *ptr,
return rc;
}

static inline memcached_return_t memcached_send_bulk(memcached_st *ptr,
const char * const *group_keys, const size_t *group_key_length,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations,
uint32_t flags,
uint64_t cas,
memcached_return_t *results,
memcached_storage_action_t verb)
{
arcus_server_check_for_update(ptr);

memcached_return_t rc;
if (memcached_failed(rc= initialize_query(ptr)))
{
return rc;
}

if (not keys or not key_length)
{
return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
memcached_literal_param("key (length) list is null"));
}

if (not values or not value_length)
{
return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
memcached_literal_param("value (length) list is null"));
}

ptr->flags.bulk = true;

for (int i = 0; i < (int)number_of_keys; i++)
{
if (memcached_failed(memcached_validate_key_length(key_length[i], ptr->flags.binary_protocol)) ||
memcached_failed(memcached_key_test(*ptr, (const char **)&keys[i], &key_length[i], 1)))
{
return MEMCACHED_BAD_KEY_PROVIDED;
}

if (ptr->flags.binary_protocol)
{
results[i] = memcached_send_binary(ptr, group_keys[i], group_key_length[i],
keys[i], key_length[i], values[i], value_length[i],
expirations ? expirations[i] : 0,
flags, cas, verb);
}
else
{
results[i] = memcached_send_ascii(ptr, group_keys[i], group_key_length[i],
keys[i], key_length[i], values[i], value_length[i],
expirations ? expirations[i] : 0,
flags, cas, verb);
}
}

ptr->flags.bulk = false;

if (ptr->flags.no_reply) {
return rc;
}

for (int i = 0; i < (int)number_of_keys; i++)
{
if (results[i] == MEMCACHED_SUCCESS)
{
uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_keys[i], group_key_length[i]);
memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);

char result[MEMCACHED_DEFAULT_COMMAND_SIZE];
results[i] = memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
}
}

return rc;
}

memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
const char *value, size_t value_length,
Expand Down Expand Up @@ -605,3 +684,92 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
return rc;
}

memcached_return_t memcached_set_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, SET_OP);
return rc;
}

memcached_return_t memcached_add_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, ADD_OP);
return rc;
}

memcached_return_t memcached_replace_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, REPLACE_OP);
return rc;
}

memcached_return_t memcached_prepend_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, PREPEND_OP);
return rc;
}

memcached_return_t memcached_append_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, APPEND_OP);
return rc;
}

memcached_return_t memcached_cas_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, CAS_OP);
return rc;
}
50 changes: 49 additions & 1 deletion libmemcached/storage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -129,6 +129,54 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
uint32_t flags,
uint64_t cas);

LIBMEMCACHED_API
memcached_return_t memcached_set_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_add_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_replace_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_prepend_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_append_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_cas_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

#ifdef __cplusplus
}
#endif
Expand Down

0 comments on commit 873cdfe

Please sign in to comment.