Skip to content

Commit

Permalink
[FEAT] kv and os compression
Browse files Browse the repository at this point in the history
 Fix #624
  • Loading branch information
aricart committed Oct 26, 2023
1 parent 17a02a6 commit 9bd89b3
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 4 deletions.
8 changes: 8 additions & 0 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import {
Republish,
RetentionPolicy,
StorageType,
StoreCompression,
StreamConfig,
StreamInfo,
StreamSource,
Expand Down Expand Up @@ -270,6 +271,9 @@ export class Bucket implements KV, KvRemove {
if (opts.metadata) {
sc.metadata = opts.metadata;
}
if (opts.compression) {
sc.compression = opts.compression;
}

const nci = (this.js as unknown as { nc: NatsConnectionImpl }).nc;
const have = nci.getServerVersion();
Expand Down Expand Up @@ -943,6 +947,10 @@ export class KvStatusImpl implements KvStatus {
get metadata(): Record<string, string> {
return this.si.config.metadata ?? {};
}

get compression(): StoreCompression {
return this.si.config.compression || StoreCompression.None;
}
}

class KvStoredEntryImpl implements KvEntry {
Expand Down
8 changes: 8 additions & 0 deletions jetstream/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
DiscardPolicy,
PurgeResponse,
StorageType,
StoreCompression,
StreamConfig,
StreamInfo,
StreamInfoRequestOptions,
Expand Down Expand Up @@ -105,6 +106,10 @@ export class ObjectStoreStatusImpl implements ObjectStoreStatus {
get metadata(): Record<string, string> | undefined {
return this.si.config.metadata;
}

get compression(): StoreCompression {
return this.si.config.compression || StoreCompression.None;
}
}

export type ServerObjectStoreMeta = {
Expand Down Expand Up @@ -789,6 +794,9 @@ export class ObjectStoreImpl implements ObjectStore {
if (opts.metadata) {
sc.metadata = opts.metadata;
}
if (opts.compression) {
sc.compression = opts.compression;
}

try {
await this.jsm.streams.info(sc.name);
Expand Down
20 changes: 17 additions & 3 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
KvOptions,
nanos,
StorageType,
StoreCompression,
} from "../mod.ts";

import {
Expand Down Expand Up @@ -1584,13 +1585,11 @@ Deno.test("kv - mirror cross domain", async () => {
const buf: KvEntry[] = [];
m.set(bucket, buf);

const done = (async () => {
return (async () => {
for await (const e of iter) {
buf.push(e);
}
})().then();

return done;
}

watch(kv, "test", "name").then();
Expand Down Expand Up @@ -1932,3 +1931,18 @@ Deno.test("kv - republish header handling", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - compression", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const js = nc.jetstream();
const s2 = await js.views.kv("compressed", {
compression: StoreCompression.S2,
});
let status = await s2.status();
assertEquals(status.compression, StoreCompression.S2);

const none = await js.views.kv("none");
status = await none.status();
assertEquals(status.compression, StoreCompression.None);
await cleanup(ns, nc);
});
22 changes: 21 additions & 1 deletion jetstream/tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ import {
} from "https://deno.land/[email protected]/assert/mod.ts";
import { DataBuffer } from "../../nats-base-client/databuffer.ts";
import { crypto } from "https://deno.land/[email protected]/crypto/mod.ts";
import { ObjectInfo, ObjectStoreMeta, StorageType } from "../mod.ts";
import {
ObjectInfo,
ObjectStoreMeta,
StorageType,
StoreCompression,
} from "../mod.ts";
import { Empty, headers, nanos, StringCodec } from "../../src/mod.ts";
import { equals } from "https://deno.land/[email protected]/bytes/mod.ts";
import { SHA256 } from "../../nats-base-client/sha256.js";
Expand Down Expand Up @@ -1099,3 +1104,18 @@ Deno.test("objectstore - stream metadata and entry metadata", async () => {

await cleanup(ns, nc);
});

Deno.test("os - compression", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const js = nc.jetstream();
const s2 = await js.views.os("compressed", {
compression: StoreCompression.S2,
});
let status = await s2.status();
assertEquals(status.compression, StoreCompression.S2);

const none = await js.views.os("none");
status = await none.status();
assertEquals(status.compression, StoreCompression.None);
await cleanup(ns, nc);
});
17 changes: 17 additions & 0 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
ReplayPolicy,
Republish,
StorageType,
StoreCompression,
StreamAlternate,
StreamConfig,
StreamInfo,
Expand Down Expand Up @@ -1017,6 +1018,12 @@ export interface KvLimits {
* FIXME: remove this on 1.8
*/
backingStore: StorageType;

/**
* Sets the compression level of the KV. This feature is only supported in
* servers 2.10.x and better.
*/
compression?: StoreCompression;
}

export interface KvStatus extends KvLimits {
Expand Down Expand Up @@ -1365,6 +1372,11 @@ export type ObjectStoreStatus = {
* 2.10.x and better.
*/
metadata?: Record<string, string> | undefined;
/**
* Compression level of the stream. This feature is only supported in
* servers 2.10.x and better.
*/
compression: StoreCompression;
};
/**
* @deprecated {@link ObjectStoreStatus}
Expand Down Expand Up @@ -1402,6 +1414,11 @@ export type ObjectStoreOptions = {
* 2.10.x and better.
*/
metadata?: Record<string, string>;
/**
* Sets the compression level of the stream. This feature is only supported in
* servers 2.10.x and better.
*/
compression?: StoreCompression;
};
/**
* An object that allows reading the object stored under a specified name.
Expand Down

0 comments on commit 9bd89b3

Please sign in to comment.