Skip to content

Commit

Permalink
INTERNAL: Move io reset from where calls memcached_vdo() to where fai…
Browse files Browse the repository at this point in the history
…ls inside of memcached_vdo() naver#167
  • Loading branch information
uhm0311 committed Apr 26, 2022
1 parent ed30ca2 commit 9adb488
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 0 deletions.
11 changes: 11 additions & 0 deletions libmemcached/auto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ static memcached_return_t text_incr_decr(memcached_st *ptr,
#endif
/* Send command header */
memcached_return_t rc= memcached_vdo(instance, vector, 7, true);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (ptr->flags.no_reply or rc != MEMCACHED_SUCCESS)
#else
if (ptr->flags.no_reply or memcached_failed(rc))
#endif
{
return rc;
}
Expand Down Expand Up @@ -214,6 +218,12 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
#ifdef ENABLE_REPLICATION
do_action:
#endif
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc= memcached_vdo(instance, vector, 3, true);
if (no_reply or rc != MEMCACHED_SUCCESS) {
return rc;
}
#else
memcached_return_t rc;
if (memcached_failed(rc= memcached_vdo(instance, vector, 3, true)))
{
Expand All @@ -223,6 +233,7 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,

if (no_reply)
return MEMCACHED_SUCCESS;
#endif

rc= memcached_response(instance, (char*)value, sizeof(*value), NULL);
#ifdef ENABLE_REPLICATION
Expand Down
55 changes: 55 additions & 0 deletions libmemcached/collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,11 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif

return rc;
}
Expand Down Expand Up @@ -1043,10 +1046,13 @@ static memcached_return_t do_coll_create(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -1186,8 +1192,11 @@ static memcached_return_t internal_coll_piped_insert(memcached_st *ptr,
rc= memcached_vdo(instance, vector, 7, to_write);
if (rc != MEMCACHED_SUCCESS)
{
#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif
return rc;
}

Expand Down Expand Up @@ -1261,8 +1270,11 @@ static memcached_return_t internal_coll_piped_exist(memcached_st *ptr,
rc= memcached_vdo(instance, vector, 6, to_write);
if (rc != MEMCACHED_SUCCESS)
{
#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif
return rc;
}

Expand Down Expand Up @@ -1426,8 +1438,11 @@ static memcached_return_t do_coll_insert(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif

return rc;
}
Expand Down Expand Up @@ -1656,10 +1671,13 @@ static memcached_return_t do_coll_delete(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -1966,10 +1984,13 @@ static memcached_return_t do_coll_get(memcached_st *ptr,
mkey_buffer= NULL;
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2339,10 +2360,13 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2442,10 +2466,13 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2555,10 +2582,13 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2927,10 +2957,13 @@ static memcached_return_t do_coll_exist(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -3506,10 +3539,13 @@ static memcached_return_t do_coll_update(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -3765,7 +3801,25 @@ static memcached_return_t do_coll_count(memcached_st *ptr,
memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);

rc= memcached_vdo(instance, vector, 4, to_write);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (rc == MEMCACHED_SUCCESS && to_write == false) {
rc= MEMCACHED_BUFFERED;
}
if (rc != MEMCACHED_SUCCESS || ptr->flags.no_reply || ptr->flags.piped) {
return rc;
}

char response[MEMCACHED_DEFAULT_COMMAND_SIZE];
rc= memcached_coll_response(instance, response, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->collection_result);

if (rc == MEMCACHED_COUNT)
{
*count= ptr->collection_result.collection_count;
/* reset collection because it is used in memcached_coll_result_reset(collection_result.cc)*/
ptr->collection_result.collection_count= 0;
rc= MEMCACHED_SUCCESS;
}
#else
if (rc == MEMCACHED_SUCCESS)
{
if (to_write == false)
Expand Down Expand Up @@ -3795,6 +3849,7 @@ static memcached_return_t do_coll_count(memcached_st *ptr,
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down
1 change: 1 addition & 0 deletions libmemcached/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#define POOL_UPDATE_SERVERLIST 1
#define POOL_MORE_CONCURRENCY 1
#define KETAMA_HASH_COLLSION 1
#define MEMCACHED_VDO_ERROR_HANDLING 1

/* Public defines */
#define MEMCACHED_DEFAULT_PORT 11211
Expand Down
15 changes: 15 additions & 0 deletions libmemcached/delete.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,14 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr,
#endif
}
}
#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
else
{
memcached_io_reset(instance);
return rc;
}
#endif

return rc;
}
Expand Down Expand Up @@ -184,10 +187,15 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,
memcached_io_write(instance, NULL, 0, true);
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc= memcached_vdo(instance, vector, 3, to_write);
if (rc != MEMCACHED_SUCCESS) {
#else
memcached_return_t rc= MEMCACHED_SUCCESS;
if ((rc= memcached_vdo(instance, vector, 3, to_write)) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
#endif
return rc;
}

Expand All @@ -205,6 +213,12 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,

replica= memcached_server_instance_fetch(ptr, server_key);

#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (memcached_vdo(replica, vector, 3, to_write) == MEMCACHED_SUCCESS)
{
memcached_server_response_decrement(replica);
}
#else
if (memcached_vdo(replica, vector, 3, to_write) != MEMCACHED_SUCCESS)
{
memcached_io_reset(replica);
Expand All @@ -213,6 +227,7 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,
{
memcached_server_response_decrement(replica);
}
#endif
}
}

Expand Down
6 changes: 6 additions & 0 deletions libmemcached/do.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ memcached_return_t memcached_do(memcached_server_write_instance_st ptr,
if (sent_length == -1 || (size_t)sent_length != command_length)
{
rc= MEMCACHED_WRITE_FAILURE;
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_io_reset(ptr);
#endif
return rc;
}
if ((ptr->root->flags.no_reply) == 0)
Expand Down Expand Up @@ -114,6 +117,9 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr,
rc= MEMCACHED_WRITE_FAILURE;
WATCHPOINT_ERROR(rc);
WATCHPOINT_ERRNO(errno);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_io_reset(ptr);
#endif
return rc;
}
if ((ptr->root->flags.no_reply) == 0 and (ptr->root->flags.piped == false))
Expand Down
11 changes: 11 additions & 0 deletions libmemcached/exist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ static memcached_return_t ascii_exist(memcached_st *memc,
#else
memcached_return_t rc= memcached_vdo(instance, vector, 8, true);
#endif

if (rc == MEMCACHED_SUCCESS)
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
Expand All @@ -110,8 +111,11 @@ static memcached_return_t ascii_exist(memcached_st *memc,
#endif
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif

return rc;
}
Expand Down Expand Up @@ -148,12 +152,19 @@ static memcached_return_t binary_exist(memcached_st *memc,
memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, server_key);

/* write the header */
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc= memcached_vdo(instance, vector, 3, true);
if (rc != MEMCACHED_SUCCESS) {
return rc;
}
#else
memcached_return_t rc;
if ((rc= memcached_vdo(instance, vector, 3, true)) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc;
}
#endif

rc= memcached_response(instance, NULL, 0, NULL);

Expand Down
8 changes: 8 additions & 0 deletions libmemcached/flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,19 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr,
request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc = memcached_do(instance, request.bytes, sizeof(request.bytes), true);
if (rc != MEMCACHED_SUCCESS)
{
return rc;
}
#else
if (memcached_do(instance, request.bytes, sizeof(request.bytes), true) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
return MEMCACHED_WRITE_FAILURE;
}
#endif
}

for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
Expand Down
4 changes: 4 additions & 0 deletions libmemcached/get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,13 @@ static memcached_return_t ascii_get_by_key(memcached_st *ptr,
};

rc= memcached_vdo(instance, vector, 4, true);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (rc != MEMCACHED_SUCCESS) {
#else
if (memcached_failed(rc))
{
memcached_io_reset(instance);
#endif
memcached_set_error(*ptr, rc, MEMCACHED_AT);
}
return rc;
Expand Down
Loading

0 comments on commit 9adb488

Please sign in to comment.