diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1f8011ce..e992a3ad 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 # Change History
 
+## December 20 2024: v8.0.0-beta.2
+
+- **New Features**
+  - [CLIENT-2820] Actively refresh pool connections that will be idle before the next tend.
+
+- **Fixes**
+  - [CLIENT-3218] Fix FilterExpression encoding in Batch commands.
+
 ## December 13 2024: v8.0.0-beta.1
 
 Major breaking release. This release supports Multi-Record Transactions. Please note that this is a beta release and will be subject to breaking changes.
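The CLIENT-3218 entry above concerns how a BatchPolicy FilterExpression is encoded for batch commands (see the command.go changes later in this patch). Below is a minimal usage sketch of the affected path, assuming a local server and placeholder namespace, set, and bin names; it is not part of the patch itself.

package main

import (
	"log"

	as "github.com/aerospike/aerospike-client-go/v8"
)

func main() {
	client, err := as.NewClient("127.0.0.1", 3000)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	keys := make([]*as.Key, 0, 10)
	for i := 0; i < 10; i++ {
		key, err := as.NewKey("test", "demo", i)
		if err != nil {
			log.Fatal(err)
		}
		keys = append(keys, key)
	}

	policy := as.NewBatchPolicy()
	// The batch-level filter expression whose encoding CLIENT-3218 fixes.
	policy.FilterExpression = as.ExpEq(as.ExpIntBin("status"), as.ExpIntVal(1))

	// Records that do not match are expected back as nil entries
	// (exact behavior depends on the batch policy).
	records, err := client.BatchGet(policy, keys, "status", "name")
	if err != nil {
		log.Println(err)
	}
	for i, rec := range records {
		log.Printf("key %d: %v", i, rec)
	}
}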
diff --git a/batch_index_command_get.go b/batch_index_command_get.go
index e949b7b1..0a7d0012 100644
--- a/batch_index_command_get.go
+++ b/batch_index_command_get.go
@@ -16,12 +16,13 @@ package aerospike
 
 import (
 	"github.com/aerospike/aerospike-client-go/v8/types"
+	Buffer "github.com/aerospike/aerospike-client-go/v8/utils/buffer"
 )
 
 type batchIndexCommandGet struct {
 	batchCommandOperate
 
-	indexRecords []*BatchRead
+	records []*BatchRead
 }
 
 func newBatchIndexCommandGet(
@@ -31,19 +32,24 @@ func newBatchIndexCommandGet(
 	records []*BatchRead,
 	isOperation bool,
 ) batchIndexCommandGet {
-	recIfcs := make([]BatchRecordIfc, len(records))
-	for i := range records {
-		recIfcs[i] = records[i]
-	}
-
 	res := batchIndexCommandGet{
-		batchCommandOperate: newBatchCommandOperate(client, batch, policy, recIfcs),
-		indexRecords:        records,
+		batchCommandOperate: newBatchCommandOperate(client, batch, policy, nil),
+		records:             records,
 	}
 	res.txn = policy.Txn
 	return res
 }
 
+func (cmd *batchIndexCommandGet) writeBuffer(ifc command) Error {
+	attr, err := cmd.setBatchOperateRead(cmd.client, cmd.policy, cmd.records, cmd.batch)
+	cmd.attr = attr
+	return err
+}
+
+func (cmd *batchIndexCommandGet) isRead() bool {
+	return true
+}
+
 func (cmd *batchIndexCommandGet) cloneBatchCommand(batch *batchNode) batcher {
 	res := *cmd
 	res.batch = batch
@@ -60,7 +66,7 @@ func (cmd *batchIndexCommandGet) Execute() Error {
 }
 
 func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
-	for _, br := range cmd.indexRecords {
+	for _, br := range cmd.records {
 		var ops []*Operation
 		if br.headerOnly() {
 			ops = []*Operation{GetHeaderOp()}
@@ -96,3 +102,149 @@ func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
 	}
 	return nil
 }
+
+// Parse all results in the batch. Add records to shared list.
+// If the record was not found, the bins will be nil.
+func (cmd *batchIndexCommandGet) parseRecordResults(ifc command, receiveSize int) (bool, Error) {
+	// Parse each message response and add it to the result array
+	cmd.dataOffset = 0
+	for cmd.dataOffset < receiveSize {
+		if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
+			return false, err
+		}
+		resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)
+
+		info3 := int(cmd.dataBuffer[3])
+
+		// If cmd is the end marker of the response, do not proceed further
+		if resultCode == 0 && (info3&_INFO3_LAST) == _INFO3_LAST {
+			return false, nil
+		}
+
+		generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
+		expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
+		batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
+		fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
+		opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
+
+		err := cmd.parseFieldsBatch(resultCode, fieldCount, cmd.records[batchIndex])
+		if err != nil {
+			return false, err
+		}
+
+		if resultCode != 0 {
+			if resultCode == types.FILTERED_OUT {
+				cmd.filteredOutCnt++
+			}
+
+			// If it looks like the error is on the first record and the message is marked as last part,
+			// the error is for the whole command and not just for the first batchIndex
+			lastMessage := (info3 & _INFO3_LAST) == _INFO3_LAST
+			if resultCode != 0 && lastMessage && receiveSize == int(_MSG_REMAINING_HEADER_SIZE) {
+				return false, newError(resultCode).setNode(cmd.node)
+			}
+
+			if resultCode == types.UDF_BAD_RESPONSE {
+				rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
+				if err != nil {
+					cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
+					return false, err
+				}
+
+				// for UDF failures
+				var msg any
+				if rec != nil {
+					msg = rec.Bins["FAILURE"]
+				}
+
+				// Need to store record because failure bin contains an error message.
+				cmd.records[batchIndex].setRecord(rec)
+				if msg, ok := msg.(string); ok && len(msg) > 0 {
+					cmd.records[batchIndex].setErrorWithMsg(cmd.node, resultCode, msg, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
+				} else {
+					cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
+				}
+
+				// If cmd is the end marker of the response, do not proceed further
+				// if (info3 & _INFO3_LAST) == _INFO3_LAST {
+				if lastMessage {
+					return false, nil
+				}
+				continue
+			}
+
+			cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
+
+			// If cmd is the end marker of the response, do not proceed further
+			if (info3 & _INFO3_LAST) == _INFO3_LAST {
+				return false, nil
+			}
+			continue
+		}
+
+		if resultCode == 0 {
+			if cmd.objects == nil {
+				rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
+				if err != nil {
+					cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
+					return false, err
+				}
+				cmd.records[batchIndex].setRecord(rec)
+			} else if batchObjectParser != nil {
+				// mark it as found
+				cmd.objectsFound[batchIndex] = true
+				if err := batchObjectParser(cmd, batchIndex, opCount, fieldCount, generation, expiration); err != nil {
+					return false, err
+				}
+			}
+		}
+	}
+
+	return true, nil
+}
+
+// Parses the record bins from the given byte buffer.
+// Returns the parsed record, or an error if parsing fails.
+func (cmd *batchIndexCommandGet) parseRecord(key *Key, opCount int, generation, expiration uint32) (*Record, Error) {
+	bins := make(BinMap, opCount)
+
+	for i := 0; i < opCount; i++ {
+		if err := cmd.readBytes(8); err != nil {
+			return nil, err
+		}
+		opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
+		particleType := int(cmd.dataBuffer[5])
+		nameSize := int(cmd.dataBuffer[7])
+
+		if err := cmd.readBytes(nameSize); err != nil {
+			return nil, err
+		}
+		name := string(cmd.dataBuffer[:nameSize])
+
+		particleBytesSize := opSize - (4 + nameSize)
+		if err := cmd.readBytes(particleBytesSize); err != nil {
+			return nil, err
+		}
+		value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize)
+		if err != nil {
+			return nil, err
+		}
+
+		if cmd.isOperation {
+			if prev, ok := bins[name]; ok {
+				if prev2, ok := prev.(OpResults); ok {
+					bins[name] = append(prev2, value)
+				} else {
+					bins[name] = OpResults{prev, value}
+				}
+			} else {
+				bins[name] = value
+			}
+		} else {
+			bins[name] = value
+		}
+	}
+
+	return newRecord(cmd.node, key, bins, generation, expiration), nil
+}
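parseRecord above collects repeated results for the same bin name into the client's OpResults slice when the command carries operations. The following standalone sketch restates that merging rule with a local stand-in type; the names and values are illustrative only.

package main

import "fmt"

// opResults mirrors the idea of the client's OpResults: all values returned
// for the same bin name, in operation order. This is a local stand-in type.
type opResults []interface{}

// mergeBin applies the same rule as parseRecord: the first value is stored
// directly; any further value for the same bin turns the entry into a slice.
func mergeBin(bins map[string]interface{}, name string, value interface{}) {
	prev, ok := bins[name]
	if !ok {
		bins[name] = value
		return
	}
	if prevList, ok := prev.(opResults); ok {
		bins[name] = append(prevList, value)
		return
	}
	bins[name] = opResults{prev, value}
}

func main() {
	bins := map[string]interface{}{}
	// e.g. two operations touching the same bin in one batch record
	mergeBin(bins, "scores", 10)
	mergeBin(bins, "scores", 25)
	mergeBin(bins, "name", "alice")
	fmt.Println(bins) // map[name:alice scores:[10 25]]
}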
diff --git a/command.go b/command.go
index 18a51773..57aec92f 100644
--- a/command.go
+++ b/command.go
@@ -1258,6 +1258,161 @@ func (cmd *baseCommand) setBatchOperateIfcOffsets(
 
 }
 
+func (cmd *baseCommand) setBatchOperateRead(
+	client *Client,
+	policy *BatchPolicy,
+	records []*BatchRead,
+	batch *batchNode,
+) (*batchAttr, Error) {
+	offsets := newBatchOffsetsNative(batch)
+	return cmd.setBatchOperateReadOffsets(client, policy, records, offsets)
+}
+
+func (cmd *baseCommand) setBatchOperateReadOffsets(
+	client *Client,
+	policy *BatchPolicy,
+	records []*BatchRead,
+	offsets BatchOffsets,
+) (*batchAttr, Error) {
+	max := offsets.size()
+	txn := policy.Txn
+	var versions []*uint64
+
+	// Estimate buffer size
+	cmd.begin()
+
+	if txn != nil {
+		versions = make([]*uint64, max)
+
+		for i := 0; i < max; i++ {
+			offset := offsets.get(i)
+			record := records[offset]
+			versions[i] = txn.GetReadVersion(record.key())
+		}
+	}
+
+	fieldCount := 1
+	predSize := 0
+	if policy.FilterExpression != nil {
+		var err Error
+		predSize, err = cmd.estimateExpressionSize(policy.FilterExpression)
+		if err != nil {
+			return nil, err
+		}
+		if predSize > 0 {
+			fieldCount++
+		}
+	}
+	cmd.dataOffset += predSize
+
+	cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 5
+
+	var prev BatchRecordIfc
+	var verPrev *uint64
+	for i := 0; i < max; i++ {
+		record := records[offsets.get(i)]
+		key := record.key()
+
+		var ver *uint64
+		if len(versions) > 0 {
+			ver = versions[i]
+		}
+
+		cmd.dataOffset += len(key.digest) + 4
+
+		// Try reference equality in hope that namespace/set for all keys is set from fixed variables.
+		// if !policy.SendKey && prev != nil && prev.key().namespace == key.namespace && (prev.key().setName == key.setName) && record.equals(prev) {
+		if canRepeat(policy, key, record, prev, ver, verPrev) {
+			// Can set repeat previous namespace/bin names to save space.
+			cmd.dataOffset++
+		} else {
+			// Must write full header and namespace/set/bin names.
+			cmd.dataOffset += 12 // header(4) + ttl(4) + fieldCount(2) + opCount(2) = 12
+			cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE)
+			cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE)
+			cmd.sizeTxnBatch(txn, ver, record.BatchRec().hasWrite)
+			if sz, err := record.size(&policy.BasePolicy); err != nil {
+				return nil, err
+			} else {
+				cmd.dataOffset += sz
+			}
+
+			prev = record
+			verPrev = ver
+		}
+
+	}
+
+	if err := cmd.sizeBuffer(policy.compress()); err != nil {
+		return nil, err
+	}
+
+	cmd.writeBatchHeader(policy, fieldCount)
+
+	if policy.FilterExpression != nil {
+		if err := cmd.writeFilterExpression(policy.FilterExpression, predSize); err != nil {
+			return nil, err
+		}
+	}
+
+	// Write real field size.
+	fieldSizeOffset := cmd.dataOffset
+	cmd.writeFieldHeader(0, BATCH_INDEX)
+
+	cmd.WriteUint32(uint32(max))
+
+	cmd.WriteByte(cmd.getBatchFlags(policy))
+
+	attr := &batchAttr{}
+	prev = nil
+	verPrev = nil
+	for i := 0; i < max; i++ {
+		index := offsets.get(i)
+		cmd.WriteUint32(uint32(index))
+
+		record := records[index]
+
+		var ver *uint64
+		if len(versions) > 0 {
+			ver = versions[i]
+		}
+
+		key := record.key()
+		if _, err := cmd.Write(key.digest[:]); err != nil {
+			return nil, newCommonError(err)
+		}
+
+		// Try reference equality in hope that namespace/set for all keys is set from fixed variables.
+		// if !policy.SendKey && prev != nil && prev.key().namespace == key.namespace && prev.key().setName == key.setName && record.equals(prev) {
+		if canRepeat(policy, key, record, prev, ver, verPrev) {
+			// Can set repeat previous namespace/bin names to save space.
+			cmd.WriteByte(_BATCH_MSG_REPEAT) // repeat
+		} else {
+			// Write full message.
+			attr.setBatchRead(client.getUsableBatchReadPolicy(record.Policy))
+			if len(record.BinNames) > 0 {
+				cmd.writeBatchBinNames(key, txn, ver, record.BinNames, attr, attr.filterExp)
+			} else if record.Ops != nil {
+				attr.adjustRead(record.Ops)
+				cmd.writeBatchOperations(key, txn, ver, record.Ops, attr, attr.filterExp)
+			} else {
+				attr.adjustReadForAllBins(record.ReadAllBins)
+				cmd.writeBatchRead(key, txn, ver, attr, attr.filterExp, 0)
+			}
+
+			prev = record
+			verPrev = ver
+		}
+	}
+
+	cmd.WriteUint32At(uint32(cmd.dataOffset)-uint32(_MSG_TOTAL_HEADER_SIZE)-4, fieldSizeOffset)
+	cmd.end()
+	cmd.markCompressed(policy)
+
+	return attr, nil
+
+}
+
 func (cmd *baseCommand) setBatchOperate(
 	policy *BatchPolicy,
 	keys []*Key,
@@ -2043,7 +2198,13 @@ func (cmd *baseCommand) writeBatchFieldsTxn(
 	}
 
 	if filter != nil {
-		filter.pack(cmd)
+		expSize, err := filter.size()
+		if err != nil {
+			return err
+		}
+		if err := cmd.writeFilterExpression(filter, expSize); err != nil {
+			return err
+		}
 	}
 
 	if attr.sendKey && key.hasValueToSend() {
@@ -2088,7 +2249,13 @@ func (cmd *baseCommand) writeBatchFieldsReg(
 	cmd.writeBatchFields(key, fieldCount, opCount)
 
 	if filter != nil {
-		filter.pack(cmd)
+		expSize, err := filter.size()
+		if err != nil {
+			return err
+		}
+		if err := cmd.writeFilterExpression(filter, expSize); err != nil {
+			return err
+		}
 	}
 
 	if attr.sendKey && key.hasValueToSend() {
@@ -2958,7 +3125,13 @@ func (cmd *baseCommand) writeKeyAttr(
 	cmd.writeKeyWithPolicy(policy.GetBasePolicy(), key, attr.hasWrite)
 
 	if filterExp != nil {
-		filterExp.pack(cmd)
+		expSize, err := filterExp.size()
+		if err != nil {
+			return err
+		}
+		if err := cmd.writeFilterExpression(filterExp, expSize); err != nil {
+			return err
+		}
 	}
 	return nil
 }
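setBatchOperateReadOffsets above sizes each batch entry as either a one-byte repeat marker or a full entry with a 12-byte header plus namespace and set-name fields. The sketch below restates that arithmetic on plain values; the 20-byte digest and 5-byte field header are assumed protocol constants for illustration, not values taken from this patch.

package main

import "fmt"

// Constants assumed for illustration: an Aerospike key digest is 20 bytes and
// a field header is 5 bytes (4-byte length plus 1-byte type).
const (
	digestSize      = 20
	fieldHeaderSize = 5
)

// estimateBatchEntry mirrors the sizing logic of setBatchOperateReadOffsets for
// one record: every entry pays digest plus index, then either a single repeat
// byte or a full header with namespace and set-name fields plus the record payload.
func estimateBatchEntry(namespace, setName string, repeat bool, recordPayload int) int {
	size := digestSize + 4 // digest + 4-byte batch index
	if repeat {
		return size + 1 // repeat marker only
	}
	size += 12 // header(4) + ttl(4) + fieldCount(2) + opCount(2)
	size += len(namespace) + fieldHeaderSize
	size += len(setName) + fieldHeaderSize
	return size + recordPayload
}

func main() {
	full := estimateBatchEntry("test", "demo", false, 64)
	repeated := estimateBatchEntry("test", "demo", true, 0)
	fmt.Println(full, repeated) // a repeated entry costs only 25 bytes here
}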
diff --git a/connection.go b/connection.go
index 255f11f9..6af6cf7e 100644
--- a/connection.go
+++ b/connection.go
@@ -453,6 +453,11 @@ func selectWithinRange[T int | uint | int64 | uint64](min, val, max T) T {
 	return val
 }
 
+// willBeIdleIn returns true if the connection will be idle before the next tend interval.
+func (ctn *Connection) willBeIdleIn(tendInterval time.Duration) bool {
+	return ctn.idleTimeout > 0 && time.Now().Add(tendInterval).After(ctn.idleDeadline)
+}
+
 // refresh extends the idle deadline of the connection.
 func (ctn *Connection) refresh() {
 	now := time.Now()
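willBeIdleIn above only fires when an idle timeout is configured; the core test is whether the connection's idle deadline falls before now plus the tend interval. A standalone restatement on plain values follows; the one-second tend is the client default, and the deadlines are made up.

package main

import (
	"fmt"
	"time"
)

// willBeIdleIn restates the predicate added to Connection above on plain
// values: a connection is worth refreshing when its idle deadline falls
// before "now + tendInterval".
func willBeIdleIn(idleDeadline time.Time, tendInterval time.Duration) bool {
	return time.Now().Add(tendInterval).After(idleDeadline)
}

func main() {
	tend := time.Second // default ClientPolicy.TendInterval is one second
	// Deadline 500ms away: the connection would expire before the next tend.
	fmt.Println(willBeIdleIn(time.Now().Add(500*time.Millisecond), tend)) // true
	// Deadline 10s away: still fresh at the next tend.
	fmt.Println(willBeIdleIn(time.Now().Add(10*time.Second), tend)) // false
}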
diff --git a/connection_heap.go b/connection_heap.go
index cc6cb102..ce496186 100644
--- a/connection_heap.go
+++ b/connection_heap.go
@@ -17,6 +17,7 @@ package aerospike
 import (
 	"runtime"
 	"sync"
+	"time"
 )
 
 // singleConnectionHeap is a non-blocking LIFO heap.
@@ -142,6 +143,49 @@ func (h *singleConnectionHeap) DropIdleTail() bool {
 	return false
 }
 
+// RefreshIdleTail refreshes the tail connection if it would go idle before the next tend.
+// It returns true if a tail connection was taken out of the heap for an asynchronous refresh.
+func (h *singleConnectionHeap) RefreshIdleTail(tendInterval time.Duration) bool {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+
+	// the heap has been cleaned up
+	if h.data == nil {
+		return false
+	}
+
+	// if heap is not empty
+	if h.full || (h.tail != h.head) {
+		conn := h.data[(h.tail+1)%h.size]
+
+		if conn.IsConnected() && conn.willBeIdleIn(tendInterval+time.Second) {
+			h.tail = (h.tail + 1) % h.size
+			h.data[h.tail] = nil
+			h.full = false
+
+			// refresh in a goroutine asynchronously
+			go func() {
+				timeout := time.Second
+				deadline := time.Now().Add(timeout)
+				conn.SetTimeout(deadline, timeout)
+				conn.refresh()
+				if _, err := conn.RequestInfo("build"); err == nil {
+					// return to the pool
+					conn.refresh()
+					h.Offer(conn)
+				} else {
+					// drop on error
+					conn.Close()
+				}
+			}()
+
+			return true
+		}
+	}
+
+	return false
+}
+
 // Len returns the number of connections in the heap
 func (h *singleConnectionHeap) Len() int {
 	cnt := 0
@@ -190,15 +234,17 @@ func newConnectionHeap(minSize, maxSize int) *connectionHeap {
 	// will be >= 1
 	perHeapSize := maxSize / heapCount
 
-	heaps := make([]singleConnectionHeap, heapCount)
-	for i := range heaps {
-		heaps[i] = *newSingleConnectionHeap(perHeapSize)
-	}
-
 	// add a heap for the remainder
 	remainder := maxSize - heapCount*perHeapSize
-	if remainder > 0 {
-		heaps = append(heaps, *newSingleConnectionHeap(remainder))
+
+	heaps := make([]singleConnectionHeap, heapCount)
+	for i := range heaps {
+		rem := 0
+		if remainder > 0 {
+			rem = 1
+			remainder--
+		}
+		heaps[i] = *newSingleConnectionHeap(perHeapSize + rem)
 	}
 
 	return &connectionHeap{
@@ -240,20 +286,30 @@ func (h *connectionHeap) Poll(hint byte) (res *Connection) {
 // DropIdle closes all idle connections.
 // It will only drop connections if there are
 // at least ClientPolicy.MinConnectionPerNode available
-func (h *connectionHeap) DropIdle() {
+func (h *connectionHeap) DropIdle(tendInterval time.Duration) {
 	// decide how many conns are allowed to drop
 	// in minSize is 0, up to all connection can
 	// be closed if idle
 	excessCount := h.LenAll() - h.minSize
-	if excessCount <= 0 {
-		return
+	excessDropped := false
+
+	if excessCount > 0 {
+	MAIN_LOOP:
+		for i := 0; i < len(h.heaps); i++ {
+			for h.heaps[i].DropIdleTail() {
+				excessCount--
+				if excessCount == 0 {
+					excessDropped = true
+					break MAIN_LOOP
+				}
+			}
+		}
 	}
 
-	for i := 0; i < len(h.heaps); i++ {
-		for h.heaps[i].DropIdleTail() {
-			excessCount--
-			if excessCount == 0 {
-				return
+	// Now refresh the rest if they are approaching idle
+	if excessDropped || h.LenAll() <= h.minSize {
+		for i := 0; i < len(h.heaps); i++ {
+			for h.heaps[i].RefreshIdleTail(tendInterval) {
 			}
 		}
 	}
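Besides RefreshIdleTail, connection_heap.go also changes how newConnectionHeap spreads the connection queue across the per-CPU heaps: the remainder is now distributed one connection at a time over the first heaps instead of going into an extra heap. A small sketch of the resulting sizes; the heap count of 8 is just an example value.

package main

import "fmt"

// distributeHeapSizes mirrors the new newConnectionHeap sizing: the remainder
// left over after an even split is handed out one connection per heap.
func distributeHeapSizes(maxSize, heapCount int) []int {
	perHeapSize := maxSize / heapCount
	remainder := maxSize - heapCount*perHeapSize

	sizes := make([]int, heapCount)
	for i := range sizes {
		rem := 0
		if remainder > 0 {
			rem = 1
			remainder--
		}
		sizes[i] = perHeapSize + rem
	}
	return sizes
}

func main() {
	// e.g. a 100-connection queue split over 8 heaps: four heaps of 13, four of 12.
	fmt.Println(distributeHeapSizes(100, 8)) // [13 13 13 13 12 12 12 12]
}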
diff --git a/node.go b/node.go
index 02389d95..b596579b 100644
--- a/node.go
+++ b/node.go
@@ -399,7 +399,9 @@ func (nd *Node) refreshFailed(e Error) {
 // if that connection is idle, it drops it and takes the next one until it picks
 // a fresh connection or exhaust the queue.
 func (nd *Node) dropIdleConnections() {
-	nd.connections.DropIdle()
+	if nd.cluster != nil {
+		nd.connections.DropIdle(nd.cluster.clientPolicy.TendInterval)
+	}
 }
 
 // GetConnection gets a connection to the node.
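The refresh behaviour added in this patch is driven entirely by ClientPolicy settings: IdleTimeout decides when a connection counts as idle, TendInterval decides how far ahead willBeIdleIn looks, and MinConnectionsPerNode is the floor the pool tries to keep warm. A hedged configuration sketch follows; the values are examples, not recommendations.

package main

import (
	"log"
	"time"

	as "github.com/aerospike/aerospike-client-go/v8"
)

func main() {
	policy := as.NewClientPolicy()
	policy.MinConnectionsPerNode = 10     // pool floor the tend loop tries to keep warm
	policy.IdleTimeout = 55 * time.Second // keep below the server's proto-fd-idle-ms
	policy.TendInterval = time.Second     // drives how far ahead willBeIdleIn looks

	client, err := as.NewClientWithPolicy(policy, "127.0.0.1", 3000)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}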