diff --git a/go/api/base_client.go b/go/api/base_client.go index ed59831973..61f8bef4bc 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1515,3 +1515,23 @@ func (client *baseClient) ZRevRankWithScore(key string, member string) (Result[i } return handleLongAndDoubleOrNullResponse(result) } + +func (client *baseClient) XTrim(key string, options *options.XTrimOptions) (Result[int64], error) { + xTrimArgs, err := options.ToArgs() + if err != nil { + return CreateNilInt64Result(), err + } + result, err := client.executeCommand(C.XTrim, append([]string{key}, xTrimArgs...)) + if err != nil { + return CreateNilInt64Result(), err + } + return handleLongResponse(result) +} + +func (client *baseClient) XLen(key string) (Result[int64], error) { + result, err := client.executeCommand(C.XLen, []string{key}) + if err != nil { + return CreateNilInt64Result(), err + } + return handleLongResponse(result) +} diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index 2a07c0ad2c..95a8c69d33 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -85,36 +85,34 @@ func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions { } // Match exactly on the threshold. -func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions { - xto.exact = triStateBoolTrue - return xto +func (xTrimOptions *XTrimOptions) SetExactTrimming() *XTrimOptions { + xTrimOptions.exact = triStateBoolTrue + return xTrimOptions } // Trim in a near-exact manner, which is more efficient. -func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { - xto.exact = triStateBoolFalse - return xto +func (xTrimOptions *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { + xTrimOptions.exact = triStateBoolFalse + return xTrimOptions } // Max number of stream entries to be trimmed for non-exact match. -func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { - xto.exact = triStateBoolFalse - xto.limit = limit - return xto +func (xTrimOptions *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { + xTrimOptions.exact = triStateBoolFalse + xTrimOptions.limit = limit + return xTrimOptions } -func (xto *XTrimOptions) ToArgs() ([]string, error) { - args := []string{} - args = append(args, xto.method) - if xto.exact == triStateBoolTrue { +func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) { + args := []string{xTrimOptions.method} + if xTrimOptions.exact == triStateBoolTrue { args = append(args, "=") - } else if xto.exact == triStateBoolFalse { + } else if xTrimOptions.exact == triStateBoolFalse { args = append(args, "~") } - args = append(args, xto.threshold) - if xto.limit > 0 { - args = append(args, "LIMIT", utils.IntToString(xto.limit)) + args = append(args, xTrimOptions.threshold) + if xTrimOptions.limit > 0 { + args = append(args, "LIMIT", utils.IntToString(xTrimOptions.limit)) } - var err error - return args, err + return args, nil } diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 1696a168c2..5bc1f20856 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -49,4 +49,56 @@ type StreamCommands interface { // // [valkey.io]: https://valkey.io/commands/xadd/ XAddWithOptions(key string, values [][]string, options *options.XAddOptions) (Result[string], error) + + // Trims the stream by evicting older entries. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // options - Stream trim options + // + // Return value: + // Result[int64] - The number of entries deleted from the stream. + // + // For example: + // xAddResult, err = client.XAddWithOptions( + // "key1", + // [][]string{{field1, "foo4"}, {field2, "bar4"}}, + // options.NewXAddOptions().SetTrimOptions( + // options.NewXTrimOptionsWithMinId(id).SetExactTrimming(), + // ), + // ) + // xTrimResult, err := client.XTrim( + // "key1", + // options.NewXTrimOptionsWithMaxLen(1).SetExactTrimming(), + // ) + // fmt.Println(xTrimResult.Value()) // Output: 1 + // + // [valkey.io]: https://valkey.io/commands/xtrim/ + XTrim(key string, options *options.XTrimOptions) (Result[int64], error) + + // Returns the number of entries in the stream stored at `key`. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // + // Return value: + // Result[int64] - The number of entries in the stream. If `key` does not exist, return 0. + // + // For example: + // xAddResult, err = client.XAddWithOptions( + // "key1", + // [][]string{{field1, "foo4"}, {field2, "bar4"}}, + // options.NewXAddOptions().SetTrimOptions( + // options.NewXTrimOptionsWithMinId(id).SetExactTrimming(), + // ), + // ) + // xLenResult, err = client.XLen("key1") + // fmt.Println(xLenResult.Value()) // Output: 2 + // + // [valkey.io]: https://valkey.io/commands/xlen/ + XLen(key string) (Result[int64], error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index f6efdcfe3f..b63ee159ba 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4554,3 +4554,101 @@ func (suite *GlideTestSuite) TestZRevRank() { assert.IsType(suite.T(), &api.RequestError{}, err) }) } + +func (suite *GlideTestSuite) Test_XAdd_XLen_XTrim() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key1 := uuid.NewString() + key2 := uuid.NewString() + field1 := uuid.NewString() + field2 := uuid.NewString() + t := suite.T() + xAddResult, err := client.XAddWithOptions( + key1, + [][]string{{field1, "foo"}, {field2, "bar"}}, + options.NewXAddOptions().SetDontMakeNewStream(), + ) + assert.NoError(t, err) + assert.True(t, xAddResult.IsNil()) + + xAddResult, err = client.XAddWithOptions( + key1, + [][]string{{field1, "foo1"}, {field2, "bar1"}}, + options.NewXAddOptions().SetId("0-1"), + ) + assert.NoError(t, err) + assert.Equal(t, xAddResult.Value(), "0-1") + + xAddResult, err = client.XAdd( + key1, + [][]string{{field1, "foo2"}, {field2, "bar2"}}, + ) + assert.NoError(t, err) + assert.False(t, xAddResult.IsNil()) + + xLenResult, err := client.XLen(key1) + assert.NoError(t, err) + assert.Equal(t, xLenResult.Value(), int64(2)) + + // Trim the first entry. + xAddResult, err = client.XAddWithOptions( + key1, + [][]string{{field1, "foo3"}, {field2, "bar2"}}, + options.NewXAddOptions().SetTrimOptions( + options.NewXTrimOptionsWithMaxLen(2).SetExactTrimming(), + ), + ) + assert.NotNil(t, xAddResult.Value()) + assert.NoError(t, err) + id := xAddResult.Value() + xLenResult, err = client.XLen(key1) + assert.NoError(t, err) + assert.Equal(t, xLenResult.Value(), int64(2)) + + // Trim the second entry. + xAddResult, err = client.XAddWithOptions( + key1, + [][]string{{field1, "foo4"}, {field2, "bar4"}}, + options.NewXAddOptions().SetTrimOptions( + options.NewXTrimOptionsWithMinId(id).SetExactTrimming(), + ), + ) + assert.NoError(t, err) + assert.NotNil(t, xAddResult.Value()) + xLenResult, err = client.XLen(key1) + assert.NoError(t, err) + assert.Equal(t, xLenResult.Value(), int64(2)) + + // Test xtrim to remove 1 element + xTrimResult, err := client.XTrim( + key1, + options.NewXTrimOptionsWithMaxLen(1).SetExactTrimming(), + ) + assert.NoError(t, err) + assert.Equal(t, xTrimResult.Value(), int64(1)) + xLenResult, err = client.XLen(key1) + assert.NoError(t, err) + assert.Equal(t, xLenResult.Value(), int64(1)) + + // Key does not exist - returns 0 + xTrimResult, err = client.XTrim( + key2, + options.NewXTrimOptionsWithMaxLen(1).SetExactTrimming(), + ) + assert.NoError(t, err) + assert.Equal(t, xTrimResult.Value(), int64(0)) + xLenResult, err = client.XLen(key2) + assert.NoError(t, err) + assert.Equal(t, xLenResult.Value(), int64(0)) + + // Throw Exception: Key exists - but it is not a stream + setResult, err := client.Set(key2, "xtrimtest") + assert.NoError(t, err) + assert.Equal(t, setResult.Value(), "OK") + _, err = client.XTrim(key2, options.NewXTrimOptionsWithMinId("0-1")) + assert.NotNil(t, err) + assert.IsType(t, &api.RequestError{}, err) + _, err = client.XLen(key2) + assert.NotNil(t, err) + assert.IsType(t, &api.RequestError{}, err) + }) +}