Skip to content

Commit

Permalink
FEATURE: Add multi store APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
ing-eoking committed Jul 17, 2024
1 parent 3151b64 commit 7bd4c60
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 3 deletions.
1 change: 1 addition & 0 deletions libmemcached/memcached.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ static inline bool _memcached_init(memcached_st *self)
self->configure.filename= NULL;

self->flags.piped= false;
self->flags.bulk= false;
#ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION
self->server_manager= NULL;
self->logfile= NULL;
Expand Down
1 change: 1 addition & 0 deletions libmemcached/memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ struct memcached_st {
bool no_block:1; // Don't block
bool no_reply:1;
bool piped:1;
bool bulk:1;
bool randomize_replica_read:1;
bool support_cas:1;
bool tcp_nodelay:1;
Expand Down
187 changes: 185 additions & 2 deletions libmemcached/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
return MEMCACHED_BUFFERED;
}

if (noreply)
if (noreply or ptr->flags.bulk)
{
return MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -349,7 +349,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,

if (rc == MEMCACHED_SUCCESS)
{
if (ptr->flags.no_reply)
if (ptr->flags.no_reply or ptr->flags.bulk)
{
rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -428,6 +428,100 @@ static inline memcached_return_t memcached_send(memcached_st *ptr,
return rc;
}

static inline memcached_return_t memcached_send_bulk(memcached_st *ptr,
const char * const *group_keys, const size_t *group_key_length,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations,
uint32_t flags,
uint64_t cas,
memcached_return_t *results,
memcached_storage_action_t verb)
{
arcus_server_check_for_update(ptr);

memcached_return_t rc;

if (memcached_failed(rc= initialize_query(ptr)))
{
return rc;
}

if (not keys or not key_length)
{
return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
memcached_literal_param("key (length) list is null"));
}

if (not values or not value_length)
{
return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
memcached_literal_param("value (length) list is null"));
}

bool original_buffered = ptr->flags.buffer_requests;
char result[MEMCACHED_DEFAULT_COMMAND_SIZE];
uint32_t server_key;
memcached_server_write_instance_st instance;

/* buffered requests are incompatible with bulks. just flush them.. */
if (original_buffered) {
for (server_key = 0; server_key < memcached_server_count(ptr); server_key++)
{
instance= memcached_server_instance_fetch(ptr, server_key);
memcached_io_write(instance, NULL, 0, true);
rc = memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
}
ptr->flags.buffer_requests = false;
}

ptr->flags.bulk = true;

for (int i = 0; i < (int)number_of_keys; i++)
{
if (memcached_failed(memcached_validate_key_length(key_length[i], ptr->flags.binary_protocol)) ||
memcached_failed(memcached_key_test(*ptr, (const char **)&keys[i], &key_length[i], 1)))
{
return MEMCACHED_BAD_KEY_PROVIDED;
}

if (ptr->flags.binary_protocol)
{
results[i] = memcached_send_binary(ptr, group_keys[i], group_key_length[i],
keys[i], key_length[i], values[i], value_length[i],
expirations ? expirations[i] : 0,
flags, cas, verb);
}
else
{
results[i] = memcached_send_ascii(ptr, group_keys[i], group_key_length[i],
keys[i], key_length[i], values[i], value_length[i],
expirations ? expirations[i] : 0,
flags, cas, verb);
}
}

//ptr->flags.buffer_requests = original_buffered;
ptr->flags.bulk = false;

if (ptr->flags.no_reply) {
return rc;
}

for (int i = 0; i < (int)number_of_keys; i++)
{
if (results[i] == MEMCACHED_SUCCESS)
{
server_key= memcached_generate_hash_with_redistribution(ptr, group_keys[i], group_key_length[i]);
instance= memcached_server_instance_fetch(ptr, server_key);

results[i] = memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
}
}

return rc;
}

memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
const char *value, size_t value_length,
Expand Down Expand Up @@ -605,3 +699,92 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
return rc;
}

memcached_return_t memcached_set_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, SET_OP);
return rc;
}

memcached_return_t memcached_add_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, ADD_OP);
return rc;
}

memcached_return_t memcached_replace_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, REPLACE_OP);
return rc;
}

memcached_return_t memcached_prepend_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, PREPEND_OP);
return rc;
}

memcached_return_t memcached_append_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, APPEND_OP);
return rc;
}

memcached_return_t memcached_cas_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results)
{
memcached_return_t rc;
rc= memcached_send_bulk(ptr, keys, key_length,
keys, key_length, number_of_keys,
values, value_length,
expirations, flags, 0, results, CAS_OP);
return rc;
}
50 changes: 49 additions & 1 deletion libmemcached/storage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -129,6 +129,54 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
uint32_t flags,
uint64_t cas);

LIBMEMCACHED_API
memcached_return_t memcached_set_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_add_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_replace_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_prepend_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_append_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_cas_bulk(memcached_st *ptr,
const char * const *keys, const size_t *key_length,
size_t number_of_keys,
const char * const *values, const size_t *value_length,
time_t *expirations, uint32_t flags,
memcached_return_t *results);

#ifdef __cplusplus
}
#endif
Expand Down

0 comments on commit 7bd4c60

Please sign in to comment.