Skip to content

Commit

Permalink
feat!: VEC-273 0.9.1 support (#11)
Browse files Browse the repository at this point in the history
* !feat: VEC-273 0.9.1 support
- replace ScheduleDelay with Schedule
- replace merge.Parallel with merge.IndexParallel
- add merge.ReIndexParallel
- add applyDefaults to IndexGet and IndexList
  • Loading branch information
Jesse S authored Sep 6, 2024
1 parent b7ab11d commit 97c1df4
Show file tree
Hide file tree
Showing 18 changed files with 792 additions and 381 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ jobs:
echo "$FEATURES_CONF" > docker/tls/config/features.conf
echo "$FEATURES_CONF" > docker/mtls/config/features.conf
echo "$FEATURES_CONF" > docker/auth/config/features.conf
# - name: Login to Aerospike Jfrog
# run: |
# docker login aerospike.jfrog.io --username ${{ secrets.JFROG_USERNAME }} --password ${{ secrets.JFROG_ACCESS_TOKEN }}
- name: Login to Aerospike Jfrog
run: |
docker login aerospike.jfrog.io --username ${{ secrets.JFROG_USERNAME }} --password ${{ secrets.JFROG_ACCESS_TOKEN }}
- name: Run tests
run: |
make coverage
Expand Down
124 changes: 64 additions & 60 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewClient(
tlsConfig *tls.Config,
logger *slog.Logger,
) (*Client, error) {
logger = logger.WithGroup("avs.admin")
logger = logger.WithGroup("avs")
logger.Info("creating new client")

channelProvider, err := newChannelProvider(
Expand Down Expand Up @@ -625,11 +625,7 @@ func (c *Client) WaitForIndexCompletion(
return err
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: indexName,
}

indexStatusReq := createIndexStatusRequest(namespace, indexName)
timer := time.NewTimer(waitInterval)
startTime := time.Now()
unmergedZeroCount := 0
Expand All @@ -638,7 +634,7 @@ func (c *Client) WaitForIndexCompletion(
defer timer.Stop()

for {
indexStatus, err := conn.indexClient.GetStatus(ctx, indexID)
indexStatus, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
logger.ErrorContext(ctx, "failed to wait for index completion", slog.Any("error", err))
return err
Expand Down Expand Up @@ -693,7 +689,7 @@ type IndexCreateOpts struct {
//
// ctx (context.Context): The context for the operation.
// namespace (string): The namespace of the index.
// name (string): The name of the index.
// indexName (string): The name of the index.
// vectorField (string): The field to create the index on.
// dimensions (uint32): The number of dimensions in the vector.
// vectorDistanceMetric (protos.VectorDistanceMetric): The distance metric to use for the index.
Expand All @@ -705,13 +701,13 @@ type IndexCreateOpts struct {
func (c *Client) IndexCreate(
ctx context.Context,
namespace string,
name string,
indexName string,
vectorField string,
dimensions uint32,
vectorDistanceMetric protos.VectorDistanceMetric,
opts *IndexCreateOpts,
) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))
logger.DebugContext(ctx, "creating index")

var (
Expand Down Expand Up @@ -741,7 +737,7 @@ func (c *Client) IndexCreate(
indexDef := &protos.IndexDefinition{
Id: &protos.IndexId{
Namespace: namespace,
Name: name,
Name: indexName,
},
Dimensions: dimensions,
VectorDistanceMetric: vectorDistanceMetric,
Expand Down Expand Up @@ -783,7 +779,11 @@ func (c *Client) IndexCreateFromIndexDef(
return NewAVSError(msg, err)
}

_, err = conn.indexClient.Create(ctx, indexDef)
indexCreateReq := &protos.IndexCreateRequest{
Definition: indexDef,
}

_, err = conn.indexClient.Create(ctx, indexCreateReq)
if err != nil {
msg := "failed to create index"
logger.Error(msg, slog.Any("error", err))
Expand Down Expand Up @@ -813,11 +813,11 @@ func (c *Client) IndexCreateFromIndexDef(
func (c *Client) IndexUpdate(
ctx context.Context,
namespace string,
name string,
indexName string,
metadata map[string]string,
hnswParams *protos.HnswIndexUpdate,
) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))

logger.DebugContext(ctx, "updating index")

Expand All @@ -832,7 +832,7 @@ func (c *Client) IndexUpdate(
indexUpdate := &protos.IndexUpdateRequest{
IndexId: &protos.IndexId{
Namespace: namespace,
Name: name,
Name: indexName,
},
Labels: metadata,
Update: &protos.IndexUpdateRequest_HnswIndexUpdate{
Expand All @@ -857,13 +857,13 @@ func (c *Client) IndexUpdate(
//
// ctx (context.Context): The context for the operation.
// namespace (string): The namespace of the index to drop.
// name (string): The name of the index to drop.
// indexName (string): The name of the index to drop.
//
// Returns:
//
// error: An error if the index drop fails, otherwise nil.
func (c *Client) IndexDrop(ctx context.Context, namespace, name string) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
func (c *Client) IndexDrop(ctx context.Context, namespace, indexName string) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))
logger.DebugContext(ctx, "dropping index")

conn, err := c.channelProvider.GetRandomConn()
Expand All @@ -874,12 +874,14 @@ func (c *Client) IndexDrop(ctx context.Context, namespace, name string) error {
return NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
indexDropReq := &protos.IndexDropRequest{
IndexId: &protos.IndexId{
Namespace: namespace,
Name: indexName,
},
}

_, err = conn.indexClient.Drop(ctx, indexID)
_, err = conn.indexClient.Drop(ctx, indexDropReq)
if err != nil {
msg := "failed to drop index"

Expand All @@ -891,7 +893,7 @@ func (c *Client) IndexDrop(ctx context.Context, namespace, name string) error {
ctx, cancel := context.WithTimeout(ctx, indexTimeoutDuration)
defer cancel()

return c.waitForIndexDrop(ctx, namespace, name, indexWaitDuration)
return c.waitForIndexDrop(ctx, namespace, indexName, indexWaitDuration)
}

// IndexList returns a list of all Aerospike Vector Indexes.
Expand All @@ -904,7 +906,7 @@ func (c *Client) IndexDrop(ctx context.Context, namespace, name string) error {
//
// *protos.IndexDefinitionList: A list of index definitions.
// error: An error if the list retrieval fails, otherwise nil.
func (c *Client) IndexList(ctx context.Context) (*protos.IndexDefinitionList, error) {
func (c *Client) IndexList(ctx context.Context, applyDefaults bool) (*protos.IndexDefinitionList, error) {
c.logger.DebugContext(ctx, "listing indexes")

conn, err := c.channelProvider.GetRandomConn()
Expand All @@ -916,7 +918,11 @@ func (c *Client) IndexList(ctx context.Context) (*protos.IndexDefinitionList, er
return nil, NewAVSErrorFromGrpc(msg, err)
}

indexList, err := conn.indexClient.List(ctx, nil)
indexListReq := &protos.IndexListRequest{
ApplyDefaults: &applyDefaults,
}

indexList, err := conn.indexClient.List(ctx, indexListReq)
if err != nil {
msg := "failed to get indexes"

Expand All @@ -934,14 +940,20 @@ func (c *Client) IndexList(ctx context.Context) (*protos.IndexDefinitionList, er
//
// ctx (context.Context): The context for the operation.
// namespace (string): The namespace of the index.
// name (string): The name of the index.
// indexName (string): The name of the index.
// applyDefaults (bool): Whether to apply server default values to the index definition.
//
// Returns:
//
// *protos.IndexDefinition: The index definition.
// error: An error if the retrieval fails, otherwise nil.
func (c *Client) IndexGet(ctx context.Context, namespace, name string) (*protos.IndexDefinition, error) {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
func (c *Client) IndexGet(
ctx context.Context,
namespace,
indexName string,
applyDefaults bool,
) (*protos.IndexDefinition, error) {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))
logger.DebugContext(ctx, "getting index")

conn, err := c.channelProvider.GetRandomConn()
Expand All @@ -952,12 +964,15 @@ func (c *Client) IndexGet(ctx context.Context, namespace, name string) (*protos.
return nil, NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
indexGetReq := &protos.IndexGetRequest{
IndexId: &protos.IndexId{
Namespace: namespace,
Name: indexName,
},
ApplyDefaults: &applyDefaults,
}

indexDef, err := conn.indexClient.Get(ctx, indexID)
indexDef, err := conn.indexClient.Get(ctx, indexGetReq)
if err != nil {
msg := "failed to get index"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand All @@ -974,14 +989,14 @@ func (c *Client) IndexGet(ctx context.Context, namespace, name string) (*protos.
//
// ctx (context.Context): The context for the operation.
// namespace (string): The namespace of the index.
// name (string): The name of the index.
// indexName (string): The name of the index.
//
// Returns:
//
// *protos.IndexStatusResponse: The index status.
// error: An error if the retrieval fails, otherwise nil.
func (c *Client) IndexGetStatus(ctx context.Context, namespace, name string) (*protos.IndexStatusResponse, error) {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
func (c *Client) IndexGetStatus(ctx context.Context, namespace, indexName string) (*protos.IndexStatusResponse, error) {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))
logger.DebugContext(ctx, "getting index status")

conn, err := c.channelProvider.GetRandomConn()
Expand All @@ -992,12 +1007,9 @@ func (c *Client) IndexGetStatus(ctx context.Context, namespace, name string) (*p
return nil, NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
}
indexStatusReq := createIndexStatusRequest(namespace, indexName)

indexStatus, err := conn.indexClient.GetStatus(ctx, indexID)
indexStatus, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
msg := "failed to get index status"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand All @@ -1014,16 +1026,16 @@ func (c *Client) IndexGetStatus(ctx context.Context, namespace, name string) (*p
//
// ctx (context.Context): The context for the operation.
// namespace (string): The namespace of the index.
// name (string): The name of the index.
// indexName (string): The name of the index.
// cutoffTime (time.Time): The cutoff time for the garbage collection.
//
// Returns:
//
// error: An error if the garbage collection fails, otherwise nil.
func (c *Client) GcInvalidVertices(ctx context.Context, namespace, name string, cutoffTime time.Time) error {
func (c *Client) GcInvalidVertices(ctx context.Context, namespace, indexName string, cutoffTime time.Time) error {
logger := c.logger.With(
slog.String("namespace", namespace),
slog.String("name", name),
slog.String("indexName", indexName),
slog.Any("cutoffTime", cutoffTime),
)

Expand All @@ -1040,7 +1052,7 @@ func (c *Client) GcInvalidVertices(ctx context.Context, namespace, name string,
gcRequest := &protos.GcInvalidVerticesRequest{
IndexId: &protos.IndexId{
Namespace: namespace,
Name: name,
Name: indexName,
},
CutoffTimestamp: cutoffTime.Unix(),
}
Expand Down Expand Up @@ -1543,10 +1555,10 @@ func (c *Client) getConnection(nodeID *protos.NodeId) (*connection, error) {
// The amount of time to wait between each call is defined by waitInterval.
func (c *Client) waitForIndexCreation(ctx context.Context,
namespace,
name string,
indexName string,
waitInterval time.Duration,
) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))

conn, err := c.channelProvider.GetRandomConn()
if err != nil {
Expand All @@ -1556,17 +1568,13 @@ func (c *Client) waitForIndexCreation(ctx context.Context,
return NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
}

indexStatusReq := createIndexStatusRequest(namespace, indexName)
timer := time.NewTimer(waitInterval)

defer timer.Stop()

for {
_, err := conn.indexClient.GetStatus(ctx, indexID)
_, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
code := status.Code(err)
if code == codes.Unavailable || code == codes.NotFound {
Expand Down Expand Up @@ -1600,8 +1608,8 @@ func (c *Client) waitForIndexCreation(ctx context.Context,

// waitForIndexDrop waits for an index to be dropped and blocks until it is. The
// amount of time to wait between each call is defined by waitInterval.
func (c *Client) waitForIndexDrop(ctx context.Context, namespace, name string, waitInterval time.Duration) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
func (c *Client) waitForIndexDrop(ctx context.Context, namespace, indexName string, waitInterval time.Duration) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))

conn, err := c.channelProvider.GetRandomConn()
if err != nil {
Expand All @@ -1611,17 +1619,13 @@ func (c *Client) waitForIndexDrop(ctx context.Context, namespace, name string, w
return NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
}

indexStatusReq := createIndexStatusRequest(namespace, indexName)
timer := time.NewTimer(waitInterval)

defer timer.Stop()

for {
_, err := conn.indexClient.GetStatus(ctx, indexID)
_, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
code := status.Code(err)
if code == codes.Unavailable || code == codes.NotFound {
Expand Down
2 changes: 1 addition & 1 deletion docker/auth/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
timeout: 20s
retries: 20
avs:
image: aerospike/aerospike-vector-search:0.9.0
image: aerospike.jfrog.io/docker/aerospike/aerospike-vector-search-private:0.9.1-SNAPSHOT
depends_on:
aerospike:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion docker/mtls/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
timeout: 20s
retries: 20
avs:
image: aerospike/aerospike-vector-search:0.9.0
image: aerospike.jfrog.io/docker/aerospike/aerospike-vector-search-private:0.9.1-SNAPSHOT
depends_on:
aerospike:
condition: service_healthy
Expand Down
6 changes: 3 additions & 3 deletions docker/multi-node-LB/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
depends_on:
aerospike:
condition: service_healthy
image: aerospike/aerospike-vector-search:0.9.0
image: aerospike.jfrog.io/docker/aerospike/aerospike-vector-search-private:0.9.1-SNAPSHOT
volumes:
- ./config/aerospike-vector-search-1.yml:/etc/aerospike-vector-search/aerospike-vector-search.yml
- ./config/features.conf:/etc/aerospike-vector-search/features.conf
Expand All @@ -35,7 +35,7 @@ services:
depends_on:
aerospike:
condition: service_healthy
image: aerospike/aerospike-vector-search:0.9.0
image: aerospike.jfrog.io/docker/aerospike/aerospike-vector-search-private:0.9.1-SNAPSHOT
volumes:
- ./config/aerospike-vector-search-2.yml:/etc/aerospike-vector-search/aerospike-vector-search.yml
- ./config/features.conf:/etc/aerospike-vector-search/features.conf
Expand All @@ -50,7 +50,7 @@ services:
depends_on:
aerospike:
condition: service_healthy
image: aerospike/aerospike-vector-search:0.9.0
image: aerospike.jfrog.io/docker/aerospike/aerospike-vector-search-private:0.9.1-SNAPSHOT
volumes:
- ./config/aerospike-vector-search-3.yml:/etc/aerospike-vector-search/aerospike-vector-search.yml
- ./config/features.conf:/etc/aerospike-vector-search/features.conf
Expand Down
Loading

0 comments on commit 97c1df4

Please sign in to comment.