diff --git a/memcached.c b/memcached.c index c7b65f4d7..45ff6a33a 100644 --- a/memcached.c +++ b/memcached.c @@ -528,6 +528,17 @@ static bool conn_reset_buffersize(conn *c) } } + if (c->rlsize != RITEM_LIST_INITIAL) { + void *ptr = malloc(sizeof(struct iovec) * RITEM_LIST_INITIAL); + if (ptr != NULL) { + free(c->rlist); + c->rlist = ptr; + c->rlsize = RITEM_LIST_INITIAL; + } else { + ret = false; + } + } + return ret; } @@ -555,6 +566,7 @@ static int conn_constructor(void *buffer, void *unused1, int unused2) free(c->suffixlist); free(c->iov); free(c->msglist); + free(c->rlist); mc_logger->log(EXTENSION_LOG_WARNING, NULL, "Failed to allocate buffers for connection\n"); return 1; @@ -583,6 +595,7 @@ static void conn_destructor(void *buffer, void *unused) free(c->suffixlist); free(c->iov); free(c->msglist); + free(c->rlist); LOCK_STATS(); mc_stats.conn_structs--; @@ -655,11 +668,8 @@ conn *conn_new(const int sfd, STATE_FUNC init_state, c->rbytes = c->wbytes = 0; c->wcurr = c->wbuf; c->rcurr = c->rbuf; - c->rtype = CONN_RTYPE_NONE; - c->rindex = 0; /* used when rtype is HINFO or EINFO */ - c->ritem = 0; - c->rlbytes = 0; - c->rltotal = 0; /* used when read with multiple mem blocks */ + c->rlcurr = c->rlused = c->rlbytes = 0; /* conn_nread() loads rlist[rlcurr] only when rlbytes == 0 */ + c->ritem = NULL; #ifdef SCAN_COMMAND c->pcurr = c->ilist; #endif @@ -1010,85 +1020,89 @@ static void conn_shrink(conn *c) c->iov = newbuf; c->iovsize = IOV_LIST_INITIAL; } + /* TODO check error condition? */ + } + + if (c->rlsize > RITEM_LIST_HIGHWAT) { + struct iovec *newbuf = (struct iovec *) realloc((void *)c->rlist, RITEM_LIST_INITIAL * sizeof(c->rlist[0])); + if (newbuf) { + c->rlist = newbuf; + c->rlsize = RITEM_LIST_INITIAL; + } /* TODO check return value */ } } -static void ritem_set_first(conn *c, int rtype, int vleng) -{ - c->rtype = rtype; - if (c->rtype == CONN_RTYPE_MBLCK) { - c->membk = MBLCK_GET_HEADBLK(&c->memblist); - c->ritem = MBLCK_GET_BODYPTR(c->membk); - c->rlbytes = vleng < MBLCK_GET_BODYLEN(&c->memblist) - ? 
vleng : MBLCK_GET_BODYLEN(&c->memblist); - c->rltotal = vleng; - } - else if (c->rtype == CONN_RTYPE_HINFO) { - if (c->hinfo.naddnl == 0) { - c->ritem = (char*)c->hinfo.value; - c->rlbytes = vleng; - c->rltotal = 0; - } else { - if (c->hinfo.nvalue > 0) { - c->ritem = (char*)c->hinfo.value; - c->rlbytes = vleng < c->hinfo.nvalue - ? vleng : c->hinfo.nvalue; - c->rindex = 0; - } else { - c->ritem = c->hinfo.addnl[0]->ptr; - c->rlbytes = vleng < c->hinfo.addnl[0]->len - ? vleng : c->hinfo.addnl[0]->len; - c->rindex = 1; - } - c->rltotal = vleng; - } +static int add_ritem(conn *c, void *buf, int len) { + if (c->rlused >= c->rlsize) { + struct iovec *new_rlist = (struct iovec *)realloc(c->rlist, + (c->rlsize * 2) * sizeof(struct iovec)); + if (! new_rlist) + return -1; + c->rlist = new_rlist; + c->rlsize *= 2; } - else if (c->rtype == CONN_RTYPE_EINFO) { - if (c->einfo.naddnl == 0) { - c->ritem = (char*)c->einfo.value; - c->rlbytes = vleng; - c->rltotal = 0; - } else { - if (c->einfo.nvalue > 0) { - c->ritem = (char*)c->einfo.value; - c->rlbytes = vleng < c->einfo.nvalue - ? vleng : c->einfo.nvalue; - c->rindex = 0; - } else { - c->ritem = c->einfo.addnl[0]->ptr; - c->rlbytes = vleng < c->einfo.addnl[0]->len - ? vleng : c->einfo.addnl[0]->len; - c->rindex = 1; - } - c->rltotal = vleng; - } + if (len != 0) { + c->rlist[c->rlused].iov_base = (void *)buf; + c->rlist[c->rlused].iov_len = len; + c->rlused++; } + return 0; } -static void ritem_set_next(conn *c) -{ - assert(c->rltotal > 0); - if (c->rtype == CONN_RTYPE_MBLCK) { - c->membk = MBLCK_GET_NEXTBLK(c->membk); - c->ritem = MBLCK_GET_BODYPTR(c->membk); - c->rlbytes = c->rltotal < MBLCK_GET_BODYLEN(&c->memblist) - ? 
c->rltotal : MBLCK_GET_BODYLEN(&c->memblist); +static int add_ritem_mblck(conn *c, mblck_list_t *memblist) { + uint32_t rltotal = MBLCK_GET_ITEMCNT(memblist); + uint32_t rlbytes; + for (mblck_node_t *membk = MBLCK_GET_HEADBLK(memblist); + membk != NULL && rltotal > 0; + membk = MBLCK_GET_NEXTBLK(membk)) { + rlbytes = rltotal < MBLCK_GET_BODYLEN(memblist) + ? rltotal : MBLCK_GET_BODYLEN(memblist); + if (add_ritem(c, MBLCK_GET_BODYPTR(membk), rlbytes) != 0) + return -1; + rltotal -= rlbytes; } - else if (c->rtype == CONN_RTYPE_HINFO) { - c->ritem = c->hinfo.addnl[c->rindex]->ptr; - c->rlbytes = c->rltotal < c->hinfo.addnl[c->rindex]->len - ? c->rltotal : c->hinfo.addnl[c->rindex]->len; - c->rindex += 1; + return 0; +} + +static int add_ritem_hinfo(conn *c, item_info *hinfo) { + uint32_t rltotal = hinfo->nbytes; + uint32_t rlbytes; + if (c->protocol == binary_prot) rltotal -= 2; + rlbytes = rltotal < hinfo->nvalue + ? rltotal : hinfo->nvalue; + if (add_ritem(c, (void *)hinfo->value, rlbytes) != 0) + return -1; + rltotal -= rlbytes; + for (int i = 0; i < hinfo->naddnl && rltotal > 0; i++) { + rlbytes = rltotal < hinfo->addnl[i]->len + ? rltotal : hinfo->addnl[i]->len; + if (add_ritem(c, hinfo->addnl[i]->ptr, rlbytes) != 0) + return -1; + rltotal -= rlbytes; } - else if (c->rtype == CONN_RTYPE_EINFO) { - c->ritem = c->einfo.addnl[c->rindex]->ptr; - c->rlbytes = c->rltotal < c->einfo.addnl[c->rindex]->len - ? c->rltotal : c->einfo.addnl[c->rindex]->len; - c->rindex += 1; + return 0; +} + +static int add_ritem_einfo(conn *c, eitem_info *einfo) { + uint32_t rltotal = einfo->nbytes; + uint32_t rlbytes; + if (c->protocol == binary_prot) rltotal -= 2; + rlbytes = rltotal < einfo->nvalue + ? rltotal : einfo->nvalue; + if (add_ritem(c, (void *)einfo->value, rlbytes) != 0) + return -1; + rltotal -= rlbytes; + for (int i = 0; i < einfo->naddnl && rltotal > 0; i++) { + rlbytes = rltotal < einfo->addnl[i]->len + ? 
rltotal : einfo->addnl[i]->len; + if (add_ritem(c, einfo->addnl[i]->ptr, rlbytes) != 0) + return -1; + rltotal -= rlbytes; } + return 0; } /** @@ -4125,9 +4139,15 @@ static void bin_read_chunk(conn *c, enum bin_substates next_substate, uint32_t c } /* preserve the header in the buffer.. */ - c->ritem = c->rcurr + sizeof(protocol_binary_request_header); - c->rlbytes = chunk; - c->rltotal = 0; + if (add_ritem(c, c->rcurr + sizeof(protocol_binary_request_header), chunk) != 0) + { + if (settings.verbose) { + mc_logger->log(EXTENSION_LOG_WARNING, c, + "%d: Failed to grow rlist. closing connection\n", c->sfd); + } + conn_set_state(c, conn_closing); + return; + } conn_set_state(c, conn_nread); } @@ -4242,9 +4262,10 @@ static void process_bin_sasl_auth(conn *c) memcpy(data->data, key, nkey); c->item = data; - c->ritem = data->data + nkey; - c->rlbytes = vlen; - c->rltotal = 0; + if (add_ritem(c, data->data + nkey, vlen) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + return; + } conn_set_state(c, conn_nread); c->substate = bin_reading_sasl_auth_data; } @@ -4303,7 +4324,6 @@ static void process_bin_complete_sasl_auth(conn *c) free(c->item); c->item = NULL; - c->ritem = NULL; if (settings.verbose) { mc_logger->log(EXTENSION_LOG_INFO, c, @@ -4475,28 +4495,28 @@ static void process_bin_lop_prepare_nread(conn *c) ret = mc_engine.v1->list_elem_alloc(mc_engine.v0, c, key, nkey, vlen+2, &elem); } if (ret == ENGINE_SUCCESS) { - mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = OPERATION_LOP_INSERT; - c->coll_index = req->message.body.index; - if (req->message.body.create) { - c->coll_attrp = &c->coll_attr_space; /* create if not exist */ - c->coll_attrp->flags = req->message.body.flags; - c->coll_attrp->exptime = realtime(req->message.body.exptime); - c->coll_attrp->maxcount = req->message.body.maxcount; - 
c->coll_attrp->readable = 1; + mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); + if (add_ritem_einfo(c, &c->einfo) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; } else { - c->coll_attrp = NULL; + c->coll_index = req->message.body.index; + if (req->message.body.create) { + c->coll_attrp = &c->coll_attr_space; /* create if not exist */ + c->coll_attrp->flags = req->message.body.flags; + c->coll_attrp->exptime = realtime(req->message.body.exptime); + c->coll_attrp->maxcount = req->message.body.maxcount; + c->coll_attrp->readable = 1; + } else { + c->coll_attrp = NULL; + } + conn_set_state(c, conn_nread); + c->substate = bin_reading_lop_nread_complete; } - conn_set_state(c, conn_nread); - c->substate = bin_reading_lop_nread_complete; } else { - if (settings.detail_enabled) { - stats_prefix_record_lop_insert(key, nkey, false); - } - STATS_CMD_NOKEY(c, lop_insert); if (ret == ENGINE_E2BIG) write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); else if (ret == ENGINE_ENOMEM) @@ -4504,6 +4524,12 @@ static void process_bin_lop_prepare_nread(conn *c) else handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } + if (ret != ENGINE_SUCCESS) { + if (settings.detail_enabled) { + stats_prefix_record_lop_insert(key, nkey, false); + } + STATS_CMD_NOKEY(c, lop_insert); + } } static void process_bin_lop_insert_complete(conn *c) @@ -4862,42 +4888,58 @@ static void process_bin_sop_prepare_nread(conn *c) } } if (ret == ENGINE_SUCCESS) { - if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { - mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - } else { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; - } c->coll_eitem = (void *)elem; c->coll_ecount = 1; if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { - protocol_binary_request_sop_insert* req = binary_get_request(c); - if (req->message.body.create) { - 
req->message.body.exptime = ntohl(req->message.body.exptime); - req->message.body.maxcount = ntohl(req->message.body.maxcount); - } c->coll_op = OPERATION_SOP_INSERT; - if (req->message.body.create) { - c->coll_attrp = &c->coll_attr_space; /* create if not exist */ - c->coll_attrp->flags = req->message.body.flags; - c->coll_attrp->exptime = realtime(req->message.body.exptime); - c->coll_attrp->maxcount = req->message.body.maxcount; - c->coll_attrp->readable = 1; - } else { - c->coll_attrp = NULL; + mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); + if (add_ritem_einfo(c, &c->einfo) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; + } + } else { + if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) { + c->coll_op = OPERATION_SOP_DELETE; + } else { /* PROTOCOL_BINARY_CMD_SOP_EXIST */ + c->coll_op = OPERATION_SOP_EXIST; + } + if (add_ritem(c, ((value_item *)elem)->ptr, vlen) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; } - } else if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) { - protocol_binary_request_sop_delete* req = binary_get_request(c); - c->coll_op = OPERATION_SOP_DELETE; - c->coll_drop = (req->message.body.drop ? 
true : false); - } else { /* PROTOCOL_BINARY_CMD_SOP_EXIST */ - c->coll_op = OPERATION_SOP_EXIST; } - conn_set_state(c, conn_nread); - c->substate = bin_reading_sop_nread_complete; + if (ret == ENGINE_SUCCESS) { + if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { + protocol_binary_request_sop_insert* req = binary_get_request(c); + if (req->message.body.create) { + req->message.body.exptime = ntohl(req->message.body.exptime); + req->message.body.maxcount = ntohl(req->message.body.maxcount); + } + if (req->message.body.create) { + c->coll_attrp = &c->coll_attr_space; /* create if not exist */ + c->coll_attrp->flags = req->message.body.flags; + c->coll_attrp->exptime = realtime(req->message.body.exptime); + c->coll_attrp->maxcount = req->message.body.maxcount; + c->coll_attrp->readable = 1; + } else { + c->coll_attrp = NULL; + } + } else if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) { + protocol_binary_request_sop_delete* req = binary_get_request(c); + c->coll_drop = (req->message.body.drop ? 
true : false); + } + conn_set_state(c, conn_nread); + c->substate = bin_reading_sop_nread_complete; + } } else { + if (ret == ENGINE_E2BIG) + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); + else if (ret == ENGINE_ENOMEM) + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + else + handle_unexpected_errorcode_bin(c, __func__, ret, vlen); + } + if (ret != ENGINE_SUCCESS) { if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { if (settings.detail_enabled) stats_prefix_record_sop_insert(key, nkey, false); @@ -4911,12 +4953,6 @@ static void process_bin_sop_prepare_nread(conn *c) stats_prefix_record_sop_exist(key, nkey, false); STATS_CMD_NOKEY(c, sop_exist); } - if (ret == ENGINE_E2BIG) - write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); - else if (ret == ENGINE_ENOMEM) - write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); - else - handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } } @@ -5333,37 +5369,37 @@ static void process_bin_bop_prepare_nread(conn *c) req->message.body.neflag, vlen+2, &elem); } if (ret == ENGINE_SUCCESS) { - mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); - if (c->einfo.nscore == 0) { - memcpy((void*)c->einfo.score, req->message.body.bkey, sizeof(uint64_t)); - } else { - memcpy((void*)c->einfo.score, req->message.body.bkey, c->einfo.nscore); - } - if (c->einfo.neflag > 0) { - memcpy((void*)c->einfo.eflag, req->message.body.eflag, c->einfo.neflag); - } - - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = (c->cmd == PROTOCOL_BINARY_CMD_BOP_INSERT ? 
OPERATION_BOP_INSERT : OPERATION_BOP_UPSERT); - if (req->message.body.create) { - c->coll_attrp = &c->coll_attr_space; /* create if not exist */ - c->coll_attrp->flags = req->message.body.flags; - c->coll_attrp->exptime = realtime(req->message.body.exptime); - c->coll_attrp->maxcount = req->message.body.maxcount; - c->coll_attrp->readable = 1; + mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); + if (add_ritem_einfo(c, &c->einfo) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; } else { - c->coll_attrp = NULL; + if (c->einfo.nscore == 0) { + memcpy((void*)c->einfo.score, req->message.body.bkey, sizeof(uint64_t)); + } else { + memcpy((void*)c->einfo.score, req->message.body.bkey, c->einfo.nscore); + } + if (c->einfo.neflag > 0) { + memcpy((void*)c->einfo.eflag, req->message.body.eflag, c->einfo.neflag); + } + + if (req->message.body.create) { + c->coll_attrp = &c->coll_attr_space; /* create if not exist */ + c->coll_attrp->flags = req->message.body.flags; + c->coll_attrp->exptime = realtime(req->message.body.exptime); + c->coll_attrp->maxcount = req->message.body.maxcount; + c->coll_attrp->readable = 1; + } else { + c->coll_attrp = NULL; + } + conn_set_state(c, conn_nread); + c->substate = bin_reading_bop_nread_complete; } - conn_set_state(c, conn_nread); - c->substate = bin_reading_bop_nread_complete; } else { - if (settings.detail_enabled) { - stats_prefix_record_bop_insert(key, nkey, false); - } - STATS_CMD_NOKEY(c, bop_insert); if (ret == ENGINE_E2BIG) write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); else if (ret == ENGINE_ENOMEM) @@ -5371,6 +5407,12 @@ static void process_bin_bop_prepare_nread(conn *c) else handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } + if (ret != ENGINE_SUCCESS) { + if (settings.detail_enabled) { + stats_prefix_record_bop_insert(key, nkey, false); + } + STATS_CMD_NOKEY(c, bop_insert); + } } static void process_bin_bop_insert_complete(conn *c) @@ 
-5572,19 +5614,17 @@ static void process_bin_bop_update_prepare_nread(conn *c) ((value_item*)elem)->len = vlen + 2; } if (ret == ENGINE_SUCCESS) { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = OPERATION_BOP_UPDATE; - conn_set_state(c, conn_nread); - c->substate = bin_reading_bop_update_nread_complete; - } else { - if (settings.detail_enabled) { - stats_prefix_record_bop_update(key, nkey, false); + if (add_ritem(c, ((value_item *)elem)->ptr, vlen) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; + } else { + conn_set_state(c, conn_nread); + c->substate = bin_reading_bop_update_nread_complete; } - STATS_CMD_NOKEY(c, bop_update); + } else { /* ret == ENGINE_E2BIG || ret == ENGINE_ENOMEM */ if (ret == ENGINE_E2BIG) write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); @@ -5593,6 +5633,12 @@ static void process_bin_bop_update_prepare_nread(conn *c) else handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } + if (ret != ENGINE_SUCCESS) { + if (settings.detail_enabled) { + stats_prefix_record_bop_update(key, nkey, false); + } + STATS_CMD_NOKEY(c, bop_update); + } } static void process_bin_bop_delete(conn *c) @@ -6044,12 +6090,15 @@ static void process_bin_bop_prepare_nread_keys(conn *c) if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 0; c->coll_op = (c->cmd==PROTOCOL_BINARY_CMD_BOP_MGET ? 
OPERATION_BOP_MGET : OPERATION_BOP_SMGET); - conn_set_state(c, conn_nread); - c->substate = bin_reading_bop_nread_keys_complete; + if (add_ritem_mblck(c, &c->memblist) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + } else { + conn_set_state(c, conn_nread); + c->substate = bin_reading_bop_nread_keys_complete; + } } else { /* ret == ENGINE_EBADVALUE || ret == ENGINE_ENOMEM */ if (ret == ENGINE_EBADVALUE) @@ -7117,26 +7166,29 @@ static void process_bin_update(conn *c) realtime(req->message.body.expiration), c->binary_header.request.cas); if (ret == ENGINE_SUCCESS) { + if (c->cmd == PROTOCOL_BINARY_CMD_ADD) + c->store_op = OPERATION_ADD; + else if (c->cmd == PROTOCOL_BINARY_CMD_SET) + c->store_op = OPERATION_SET; + else if (c->cmd == PROTOCOL_BINARY_CMD_REPLACE) + c->store_op = OPERATION_REPLACE; + else + assert(0); + + if (c->binary_header.request.cas != 0) { + c->store_op = OPERATION_CAS; + } + if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { mc_engine.v1->release(mc_engine.v0, c, it); write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); ret = ENGINE_ENOMEM; + } else if (add_ritem_hinfo(c, &c->hinfo) != 0) { + mc_engine.v1->release(mc_engine.v0, c, it); + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; } else { - if (c->cmd == PROTOCOL_BINARY_CMD_ADD) - c->store_op = OPERATION_ADD; - else if (c->cmd == PROTOCOL_BINARY_CMD_SET) - c->store_op = OPERATION_SET; - else if (c->cmd == PROTOCOL_BINARY_CMD_REPLACE) - c->store_op = OPERATION_REPLACE; - else - assert(0); - - if (c->binary_header.request.cas != 0) { - c->store_op = OPERATION_CAS; - } - c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); conn_set_state(c, conn_nread); c->substate = bin_read_set_value; } @@ -7198,19 +7250,20 @@ static void process_bin_append_prepend(conn *c) ret = mc_engine.v1->allocate(mc_engine.v0, c, &it, key, nkey, vlen+2, 0, 0, c->binary_header.request.cas); if (ret == ENGINE_SUCCESS) {
+ if (c->cmd == PROTOCOL_BINARY_CMD_APPEND) + c->store_op = OPERATION_APPEND; + else if (c->cmd == PROTOCOL_BINARY_CMD_PREPEND) + c->store_op = OPERATION_PREPEND; + else + assert(0); if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { mc_engine.v1->release(mc_engine.v0, c, it); write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + } else if (add_ritem_hinfo(c, &c->hinfo) != 0) { + mc_engine.v1->release(mc_engine.v0, c, it); + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); } else { - if (c->cmd == PROTOCOL_BINARY_CMD_APPEND) - c->store_op = OPERATION_APPEND; - else if (c->cmd == PROTOCOL_BINARY_CMD_PREPEND) - c->store_op = OPERATION_PREPEND; - else - assert(0); - - c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); + c->item = it; /* set only on success: the failure paths above have already released 'it' */ conn_set_state(c, conn_nread); c->substate = bin_read_set_value; } @@ -7529,6 +7582,8 @@ static int try_read_command_binary(conn *c) c->msgcurr = 0; c->msgused = 0; c->iovused = 0; + c->rlcurr = 0; + c->rlused = 0; int ret = add_msghdr(c); assert(ret == 0); @@ -8397,11 +8452,17 @@ static void process_prepare_nread_keys(conn *c, uint32_t vlen, uint32_t kcnt, bo } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); c->coll_op = (return_cas ? 
OPERATION_MGETS : OPERATION_MGET); - conn_set_state(c, conn_nread); + if (add_ritem_mblck(c, &c->memblist) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + conn_set_state(c, conn_nread); + } } else { out_string(c, "SERVER_ERROR out of memory"); + } + if (ret != ENGINE_SUCCESS) { c->sbytes = vlen; c->write_and_go = conn_swallow; } @@ -8485,14 +8546,17 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken ret = mc_engine.v1->allocate(mc_engine.v0, c, &it, key, nkey, vlen, htonl(flags), realtime(exptime), req_cas_id); if (ret == ENGINE_SUCCESS) { + c->store_op = store_op; if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { mc_engine.v1->release(mc_engine.v0, c, it); out_string(c, "SERVER_ERROR error getting item data"); ret = ENGINE_ENOMEM; + } else if (add_ritem_hinfo(c, &c->hinfo) != 0) { + mc_engine.v1->release(mc_engine.v0, c, it); + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; } else { c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); - c->store_op = store_op; conn_set_state(c, conn_nread); } } else { @@ -9543,10 +9607,9 @@ static void process_extension_command(conn *c, token_t *tokens, size_t ntokens) conn_set_state(c, conn_new_cmd); } } + } else if (add_ritem(c, ptr, nbytes) != 0) { + out_string(c, "SERVER_ERROR out of memory"); } else { - c->ritem = ptr; - c->rlbytes = nbytes; - c->rltotal = 0; c->ascii_cmd = cmd; /* NOT SUPPORTED YET! 
*/ conn_set_state(c, conn_nread); @@ -10259,21 +10322,27 @@ static void process_lop_prepare_nread(conn *c, int cmd, size_t vlen, ret = mc_engine.v1->list_elem_alloc(mc_engine.v0, c, key, nkey, vlen, &elem); } if (ret == ENGINE_SUCCESS) { - mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = OPERATION_LOP_INSERT; - c->coll_index = index; - conn_set_state(c, conn_nread); + mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + c->coll_index = index; + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); + else handle_unexpected_errorcode_ascii(c, __func__, ret); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { stats_prefix_record_lop_insert(key, nkey, false); } STATS_CMD_NOKEY(c, lop_insert); - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); - else handle_unexpected_errorcode_ascii(c, __func__, ret); if (ret != ENGINE_DISCONNECT) { /* swallow the data line */ @@ -10655,19 +10724,28 @@ static void process_sop_prepare_nread(conn *c, int cmd, size_t vlen, char *key, } } if (ret == ENGINE_SUCCESS) { - if (cmd == (int)OPERATION_SOP_INSERT) { - mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - } else { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; - } c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; - conn_set_state(c, conn_nread); + if (cmd == (int)OPERATION_SOP_INSERT) { + 
mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + } else if (add_ritem(c, ((value_item *)elem)->ptr, vlen)){ + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + if (ret == ENGINE_SUCCESS) { + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); + else handle_unexpected_errorcode_ascii(c, __func__, ret); + } + if (ret != ENGINE_SUCCESS) { if (cmd == (int)OPERATION_SOP_INSERT) { if (settings.detail_enabled) stats_prefix_record_sop_insert(key, nkey, false); @@ -10681,10 +10759,6 @@ static void process_sop_prepare_nread(conn *c, int cmd, size_t vlen, char *key, stats_prefix_record_sop_exist(key, nkey, false); STATS_CMD_NOKEY(c, sop_exist); } - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); - else handle_unexpected_errorcode_ascii(c, __func__, ret); - if (ret != ENGINE_DISCONNECT) { /* swallow the data line */ c->sbytes = vlen; @@ -11333,21 +11407,24 @@ static void process_bop_update_prepare_nread(conn *c, int cmd, ((value_item*)elem)->len = vlen; } if (ret == ENGINE_SUCCESS) { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; - conn_set_state(c, conn_nread); + if (add_ritem(c, ((value_item *)elem)->ptr, vlen) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else out_string(c, "SERVER_ERROR out of memory"); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { 
stats_prefix_record_bop_update(key, nkey, false); } STATS_CMD_NOKEY(c, bop_update); - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ c->sbytes = vlen; if (c->state == conn_write) { @@ -11373,24 +11450,29 @@ static void process_bop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, nbkey, neflag, vlen, &elem); } if (ret == ENGINE_SUCCESS) { - mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); - memcpy((void*)c->einfo.score, bkey, (c->einfo.nscore==0 ? sizeof(uint64_t) : c->einfo.nscore)); - if (c->einfo.neflag > 0) - memcpy((void*)c->einfo.eflag, eflag, c->einfo.neflag); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; /* OPERATION_BOP_INSERT | OPERATION_BOP_UPSERT */ - conn_set_state(c, conn_nread); + mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + memcpy((void*)c->einfo.score, bkey, (c->einfo.nscore==0 ? 
sizeof(uint64_t) : c->einfo.nscore)); + if (c->einfo.neflag > 0) + memcpy((void*)c->einfo.eflag, eflag, c->einfo.neflag); + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); + else handle_unexpected_errorcode_ascii(c, __func__, ret); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { stats_prefix_record_bop_insert(key, nkey, false); } STATS_CMD_NOKEY(c, bop_insert); - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); - else handle_unexpected_errorcode_ascii(c, __func__, ret); - if (ret != ENGINE_DISCONNECT) { /* swallow the data line */ c->sbytes = vlen; @@ -11469,12 +11551,20 @@ static void process_bop_prepare_nread_keys(conn *c, int cmd, uint32_t vlen, uint } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 0; c->coll_op = cmd; - conn_set_state(c, conn_nread); + if (add_ritem_mblck(c, &c->memblist) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + conn_set_state(c, conn_nread); + } } else { + /* ret == ENGINE_ENOMEM */ + out_string(c, "SERVER_ERROR out of memory"); + } + if (ret != ENGINE_SUCCESS) { #ifdef SUPPORT_BOP_MGET if (cmd == OPERATION_BOP_MGET) STATS_CMD_NOKEY(c, bop_mget); @@ -11483,9 +11573,6 @@ static void process_bop_prepare_nread_keys(conn *c, int cmd, uint32_t vlen, uint if (cmd == OPERATION_BOP_SMGET) STATS_CMD_NOKEY(c, bop_smget); #endif - /* ret == ENGINE_ENOMEM */ - out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ assert(c->state == conn_write); c->sbytes = vlen; @@ -11831,20 +11918,29 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, ((value_item*)elem)->len = vlen; } if (ret == 
ENGINE_SUCCESS) { - if (cmd == OPERATION_MOP_INSERT) { - mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_MAP, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - } else { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; - } c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; - c->coll_field = *field; - conn_set_state(c, conn_nread); + if (cmd == OPERATION_MOP_INSERT) { + mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_MAP, elem, &c->einfo); + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + } else if (add_ritem(c, ((value_item *)elem)->ptr, vlen)){ + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + if (ret == ENGINE_SUCCESS) { + c->coll_field = *field; + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); + else handle_unexpected_errorcode_ascii(c, __func__, ret); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { stats_prefix_record_mop_insert(key, nkey, false); } @@ -11853,10 +11949,6 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, } else if (cmd == OPERATION_MOP_UPDATE) { STATS_CMD_NOKEY(c, mop_update); } - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); - else handle_unexpected_errorcode_ascii(c, __func__, ret); - if (ret != ENGINE_DISCONNECT) { /* swallow the data line */ c->sbytes = vlen; @@ -11879,20 +11971,25 @@ static void process_mop_prepare_nread_fields(conn *c, int cmd, char *key, size_t } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, flen); c->coll_ecount = 1; c->coll_op = cmd; - c->coll_lenkeys = flen; - conn_set_state(c, conn_nread); + 
if (add_ritem_mblck(c, &c->memblist) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + c->coll_lenkeys = flen; + conn_set_state(c, conn_nread); + } } else { + /* ret == ENGINE_ENOMEM */ + out_string(c, "SERVER_ERROR out of memory"); + } + if (ret != ENGINE_SUCCESS) { if (cmd == OPERATION_MOP_DELETE) { STATS_CMD_NOKEY(c, mop_delete); } else if (cmd == OPERATION_MOP_GET) { STATS_CMD_NOKEY(c, mop_get); } - /* ret == ENGINE_ENOMEM */ - out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ c->sbytes = flen; if (c->state == conn_write) { @@ -13079,6 +13176,8 @@ static void process_command_ascii(conn *c, char *command, int cmdlen) c->msgcurr = 0; c->msgused = 0; c->iovused = 0; + c->rlcurr = 0; + c->rlused = 0; if (add_msghdr(c) != 0) { out_string(c, "SERVER_ERROR out of memory preparing response"); return; @@ -13767,7 +13866,7 @@ bool conn_nread(conn *c) { ssize_t res; - if (c->rlbytes == 0) { + if (c->rlcurr == c->rlused) { complete_nread(c); /* complete_nread eventually calls write functions @@ -13785,6 +13884,11 @@ bool conn_nread(conn *c) return true; } + if (c->rlbytes == 0) { + c->ritem = c->rlist[c->rlcurr].iov_base; + c->rlbytes = c->rlist[c->rlcurr].iov_len; + } + /* first check if we have leftovers in the conn_read buffer */ while (c->rbytes > 0) { int tocopy = c->rbytes > c->rlbytes ? 
c->rlbytes : c->rbytes; @@ -13795,14 +13899,14 @@ bool conn_nread(conn *c) c->rlbytes -= tocopy; c->rcurr += tocopy; c->rbytes -= tocopy; - if (c->rltotal > 0) { /* string block read */ - c->rltotal -= tocopy; - if (c->rlbytes == 0 && c->rltotal > 0) { - ritem_set_next(c); - continue; + if (c->rlbytes == 0) { /* string block read */ + c->rlcurr++; + if (c->rlcurr < c->rlused) { + c->ritem = c->rlist[c->rlcurr].iov_base; + c->rlbytes = c->rlist[c->rlcurr].iov_len; } } - if (c->rlbytes == 0) { + if (c->rlcurr == c->rlused) { return true; } } @@ -13816,11 +13920,8 @@ bool conn_nread(conn *c) } c->ritem += res; c->rlbytes -= res; - if (c->rltotal > 0) { - c->rltotal -= res; - if (c->rlbytes == 0 && c->rltotal > 0) { - ritem_set_next(c); - } + if (c->rlbytes == 0) { + c->rlcurr++; } return true; } diff --git a/memcached.h b/memcached.h index b2a49df1d..bd72a5aef 100644 --- a/memcached.h +++ b/memcached.h @@ -76,11 +76,15 @@ /** Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 +/** Initial size of list of items being read. */ +#define RITEM_LIST_INITIAL 1 + /** High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT 8192 #define ITEM_LIST_HIGHWAT 400 #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 +#define RITEM_LIST_HIGHWAT 400 /* Binary protocol stuff */ #define MIN_BIN_PKT_LENGTH 16 @@ -261,12 +265,6 @@ typedef bool (*STATE_FUNC)(conn *); /** * The structure representing a connection into memcached. 
*/ -/* rtype in connection */ -#define CONN_RTYPE_NONE 0 -#define CONN_RTYPE_MBLCK 1 -#define CONN_RTYPE_HINFO 2 -#define CONN_RTYPE_EINFO 3 - struct conn { int sfd; short nevents; @@ -290,13 +288,16 @@ struct conn { STATE_FUNC write_and_go; void *write_and_free; /** free this memory after finishing writing */ - int rtype; /* CONN_RTYPE_XXXXX */ - int rindex; /* used when rtype is HINFO or EINFO */ - char *ritem; /** when we read in an item's value, it goes here */ - uint32_t rlbytes; + /** data for nread state */ + struct iovec *rlist; /** list to read data except commands if needed */ + uint32_t rlcurr; /** element in rlist[] being read now */ + uint32_t rlsize; /** total allocated size of rlist */ + uint32_t rlused; /** number of elements used in rlist[] */ + + char *ritem; /** data pointer being read currently */ + uint32_t rlbytes; /** data length being read currently */ + /* use memory blocks */ - uint32_t rltotal; /* Used when read data with memory block */ - mblck_node_t *membk; /* current memory block pointer */ mblck_list_t memblist; /* (key or field) string memory block list */ /* hash item and elem item info */