diff --git a/Cargo.lock b/Cargo.lock index dbc13605b..aafa75496 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3212,8 +3212,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] name = "fsst" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow-array", "rand 0.9.4", @@ -4426,8 +4426,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a" [[package]] name = "lance" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arc-swap", "arrow", @@ -4497,8 +4497,8 @@ dependencies = [ [[package]] name = "lance-arrow" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow-array", "arrow-buffer", @@ -4518,8 +4518,8 @@ dependencies = [ [[package]] name = "lance-bitpacking" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrayref", "paste", @@ -4528,8 +4528,8 @@ dependencies = [ [[package]] name = "lance-core" -version = "7.0.0-beta.10" -source = 
"git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow-array", "arrow-buffer", @@ -4564,8 +4564,8 @@ dependencies = [ [[package]] name = "lance-datafusion" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow", "arrow-array", @@ -4595,8 +4595,8 @@ dependencies = [ [[package]] name = "lance-datagen" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow", "arrow-array", @@ -4614,8 +4614,8 @@ dependencies = [ [[package]] name = "lance-encoding" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow-arith", "arrow-array", @@ -4650,8 +4650,8 @@ dependencies = [ [[package]] name = "lance-file" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow-arith", "arrow-array", @@ -4682,8 +4682,8 @@ dependencies = [ [[package]] name = 
"lance-index" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arc-swap", "arrow", @@ -4747,8 +4747,8 @@ dependencies = [ [[package]] name = "lance-io" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow", "arrow-arith", @@ -4790,8 +4790,8 @@ dependencies = [ [[package]] name = "lance-linalg" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow-array", "arrow-buffer", @@ -4807,8 +4807,8 @@ dependencies = [ [[package]] name = "lance-namespace" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow", "async-trait", @@ -4820,8 +4820,8 @@ dependencies = [ [[package]] name = "lance-namespace-impls" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow", "arrow-ipc", @@ -4870,8 +4870,8 @@ 
dependencies = [ [[package]] name = "lance-table" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow", "arrow-array", @@ -4910,8 +4910,8 @@ dependencies = [ [[package]] name = "lance-testing" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "arrow-array", "arrow-schema", @@ -4922,8 +4922,8 @@ dependencies = [ [[package]] name = "lance-tokenizer" -version = "7.0.0-beta.10" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a" +version = "7.0.0-beta.12" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e" dependencies = [ "jieba-rs", "lindera", diff --git a/Cargo.toml b/Cargo.toml index 54f1c744e..55f5e8b73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,20 +13,20 @@ categories = ["database-implementations"] rust-version = "1.91.0" [workspace.dependencies] -lance = { "version" = "=7.0.0-beta.10", default-features = false, "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-core = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-datagen = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-file = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-io = { "version" = "=7.0.0-beta.10", 
default-features = false, "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-index = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-linalg = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-namespace = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-namespace-impls = { "version" = "=7.0.0-beta.10", default-features = false, "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-table = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-testing = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-datafusion = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-encoding = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } -lance-arrow = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" } +lance = { "version" = "=7.0.0-beta.12", default-features = false, "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-core = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-datagen = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-file = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-io = { "version" = "=7.0.0-beta.12", default-features = false, "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-index = { 
"version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-linalg = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-namespace = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-namespace-impls = { "version" = "=7.0.0-beta.12", default-features = false, "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-table = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-testing = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-datafusion = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-encoding = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } +lance-arrow = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" } ahash = "0.8" # Note that this one does not include pyarrow arrow = { version = "58.0.0", optional = false } diff --git a/docs/src/js/classes/Table.md b/docs/src/js/classes/Table.md index ca940e819..45fa13362 100644 --- a/docs/src/js/classes/Table.md +++ b/docs/src/js/classes/Table.md @@ -690,6 +690,49 @@ of the given query *** +### setLsmWriteSpec() + +```ts +abstract setLsmWriteSpec(spec): Promise +``` + +Install an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) on this table, selecting Lance's MemWAL +LSM-style write path for future `mergeInsert` calls. + +`LsmWriteSpec` chooses one of three sharding strategies via `specType`: + +- `"bucket"` — hash-bucket writes by the single-column unenforced primary + key (`column` and `numBuckets` required). +- `"identity"` — shard by the raw value of a scalar `column`. 
+- `"unsharded"` — route every write to a single shard. + +All variants require the table to have an unenforced primary key +([Table#setUnenforcedPrimaryKey](Table.md#setunenforcedprimarykey)); bucket sharding additionally +requires it to be the single column being bucketed. + +#### Parameters + +* **spec**: [`LsmWriteSpec`](../interfaces/LsmWriteSpec.md) + The sharding spec to install. + +#### Returns + +`Promise`<`void`> + +#### Example + +```ts +await table.setUnenforcedPrimaryKey("id"); +await table.setLsmWriteSpec({ + specType: "bucket", + column: "id", + numBuckets: 16, + maintainedIndexes: ["id_idx"], +}); +``` + +*** + ### setUnenforcedPrimaryKey() ```ts @@ -818,6 +861,23 @@ Return the table as an arrow table *** +### unsetLsmWriteSpec() + +```ts +abstract unsetLsmWriteSpec(): Promise +``` + +Remove the [LsmWriteSpec](../interfaces/LsmWriteSpec.md) from this table, reverting to the standard +`mergeInsert` write path. + +Errors if no spec is currently set. + +#### Returns + +`Promise`<`void`> + +*** + ### update() #### update(opts) diff --git a/docs/src/js/globals.md b/docs/src/js/globals.md index fd2def7ab..0f582b10f 100644 --- a/docs/src/js/globals.md +++ b/docs/src/js/globals.md @@ -80,6 +80,7 @@ - [IvfRqOptions](interfaces/IvfRqOptions.md) - [ListNamespacesOptions](interfaces/ListNamespacesOptions.md) - [ListNamespacesResponse](interfaces/ListNamespacesResponse.md) +- [LsmWriteSpec](interfaces/LsmWriteSpec.md) - [MergeResult](interfaces/MergeResult.md) - [OpenTableOptions](interfaces/OpenTableOptions.md) - [OptimizeOptions](interfaces/OptimizeOptions.md) diff --git a/docs/src/js/interfaces/LsmWriteSpec.md b/docs/src/js/interfaces/LsmWriteSpec.md new file mode 100644 index 000000000..017e819dc --- /dev/null +++ b/docs/src/js/interfaces/LsmWriteSpec.md @@ -0,0 +1,64 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / LsmWriteSpec + +# Interface: LsmWriteSpec + +Specification selecting Lance's MemWAL LSM-style 
write path for +`mergeInsert`. + +`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`, +`column` and `numBuckets` are required; for `"identity"`, `column` is +required. + +## Properties + +### column? + +```ts +optional column: string; +``` + +Bucket and identity variants: the sharding column. + +*** + +### maintainedIndexes? + +```ts +optional maintainedIndexes: string[]; +``` + +Names of indexes the MemWAL should keep up to date during writes. + +*** + +### numBuckets? + +```ts +optional numBuckets: number; +``` + +Bucket variant: the number of buckets, in `[1, 1024]`. + +*** + +### specType + +```ts +specType: "bucket" | "identity" | "unsharded"; +``` + +One of `"bucket"`, `"identity"`, or `"unsharded"`. + +*** + +### writerConfigDefaults? + +```ts +optional writerConfigDefaults: Record; +``` + +Default `ShardWriter` configuration recorded in the MemWAL index. diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 882733df6..23f65da4e 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -2397,3 +2397,81 @@ describe("setUnenforcedPrimaryKey", () => { await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow(); }); }); + +describe("setLsmWriteSpec / unsetLsmWriteSpec", () => { + let tmpDir: tmp.DirResult; + + beforeEach(() => { + tmpDir = tmp.dirSync({ unsafeCleanup: true }); + }); + afterEach(() => tmpDir.removeCallback()); + + async function makeTable(conn: Connection): Promise { + return await conn.createEmptyTable( + "t", + new arrow.Schema([new arrow.Field("id", new arrow.Int64(), false)]), + ); + } + + it("installs and removes a bucket spec", async () => { + const conn = await connect(tmpDir.name); + const table = await makeTable(conn); + + await table.setUnenforcedPrimaryKey("id"); + await table.setLsmWriteSpec({ + specType: "bucket", + column: "id", + numBuckets: 4, + }); + await table.unsetLsmWriteSpec(); + // A second unset errors — there is no spec left to remove. 
+ await expect(table.unsetLsmWriteSpec()).rejects.toThrow(); + // A fresh spec can be installed after unset. + await table.setLsmWriteSpec({ + specType: "bucket", + column: "id", + numBuckets: 8, + }); + }); + + it("installs an unsharded spec", async () => { + const conn = await connect(tmpDir.name); + const table = await makeTable(conn); + + await table.setUnenforcedPrimaryKey("id"); + await table.setLsmWriteSpec({ specType: "unsharded" }); + await table.unsetLsmWriteSpec(); + }); + + it("installs an identity spec", async () => { + const conn = await connect(tmpDir.name); + const table = await makeTable(conn); + + await table.setUnenforcedPrimaryKey("id"); + await table.setLsmWriteSpec({ specType: "identity", column: "id" }); + await table.unsetLsmWriteSpec(); + }); + + it("rejects an invalid spec", async () => { + const conn = await connect(tmpDir.name); + const table = await makeTable(conn); + + await table.setUnenforcedPrimaryKey("id"); + // num_buckets out of range. + await expect( + table.setLsmWriteSpec({ + specType: "bucket", + column: "id", + numBuckets: 0, + }), + ).rejects.toThrow(); + // Column mismatch. + await expect( + table.setLsmWriteSpec({ + specType: "bucket", + column: "missing", + numBuckets: 4, + }), + ).rejects.toThrow(); + }); +}); diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index f1a36722f..fc6d7777a 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -113,6 +113,7 @@ export { UpdateOptions, OptimizeOptions, Version, + LsmWriteSpec, ColumnAlteration, } from "./table"; diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 56cc127ce..59816a413 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -106,6 +106,27 @@ export interface Version { metadata: Record; } +/** + * Specification selecting Lance's MemWAL LSM-style write path for + * `mergeInsert`. + * + * `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. 
For `"bucket"`, + * `column` and `numBuckets` are required; for `"identity"`, `column` is + * required. + */ +export interface LsmWriteSpec { + /** One of `"bucket"`, `"identity"`, or `"unsharded"`. */ + specType: "bucket" | "identity" | "unsharded"; + /** Bucket and identity variants: the sharding column. */ + column?: string; + /** Bucket variant: the number of buckets, in `[1, 1024]`. */ + numBuckets?: number; + /** Names of indexes the MemWAL should keep up to date during writes. */ + maintainedIndexes?: string[]; + /** Default `ShardWriter` configuration recorded in the MemWAL index. */ + writerConfigDefaults?: Record; +} + /** * A Table is a collection of Records in a LanceDB Database. * @@ -461,6 +482,42 @@ export abstract class Table { * @returns {Promise} */ abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise; + /** + * Install an {@link LsmWriteSpec} on this table, selecting Lance's MemWAL + * LSM-style write path for future `mergeInsert` calls. + * + * `LsmWriteSpec` chooses one of three sharding strategies via `specType`: + * + * - `"bucket"` — hash-bucket writes by the single-column unenforced primary + * key (`column` and `numBuckets` required). + * - `"identity"` — shard by the raw value of a scalar `column`. + * - `"unsharded"` — route every write to a single shard. + * + * All variants require the table to have an unenforced primary key + * ({@link Table#setUnenforcedPrimaryKey}); bucket sharding additionally + * requires it to be the single column being bucketed. + * @param {LsmWriteSpec} spec The sharding spec to install. 
+ * @returns {Promise} + * @example + * ```ts + * await table.setUnenforcedPrimaryKey("id"); + * await table.setLsmWriteSpec({ + * specType: "bucket", + * column: "id", + * numBuckets: 16, + * maintainedIndexes: ["id_idx"], + * }); + * ``` + */ + abstract setLsmWriteSpec(spec: LsmWriteSpec): Promise; + /** + * Remove the {@link LsmWriteSpec} from this table, reverting to the standard + * `mergeInsert` write path. + * + * Errors if no spec is currently set. + * @returns {Promise} + */ + abstract unsetLsmWriteSpec(): Promise; /** Retrieve the version of the table */ abstract version(): Promise; @@ -914,6 +971,14 @@ export class LocalTable extends Table { return await this.inner.setUnenforcedPrimaryKey(cols); } + async setLsmWriteSpec(spec: LsmWriteSpec): Promise { + return await this.inner.setLsmWriteSpec(spec); + } + + async unsetLsmWriteSpec(): Promise { + return await this.inner.unsetLsmWriteSpec(); + } + async version(): Promise { return await this.inner.version(); } diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 94e7bf630..29bf7bba4 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -352,6 +352,23 @@ impl Table { .default_error() } + #[napi(catch_unwind)] + pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> napi::Result<()> { + let native_spec = lancedb::table::LsmWriteSpec::try_from(spec)?; + self.inner_ref()? + .set_lsm_write_spec(native_spec) + .await + .default_error() + } + + #[napi(catch_unwind)] + pub async fn unset_lsm_write_spec(&self) -> napi::Result<()> { + self.inner_ref()? + .unset_lsm_write_spec() + .await + .default_error() + } + #[napi(catch_unwind)] pub async fn version(&self) -> napi::Result { self.inner_ref()? @@ -546,6 +563,63 @@ impl From for IndexConfig { } } +/// Specification selecting Lance's MemWAL LSM-style write path for +/// `mergeInsert`. +/// +/// `specType` must be `"bucket"`, `"identity"`, or `"unsharded"`. 
For +/// `"bucket"`, `column` and `numBuckets` are required; for `"identity"`, +/// `column` is required. +#[napi(object)] +#[derive(Clone, Debug)] +pub struct LsmWriteSpec { + /// One of `"bucket"`, `"identity"`, or `"unsharded"`. + pub spec_type: String, + /// Bucket and identity variants: the sharding column. + pub column: Option, + /// Bucket variant: the number of buckets, in `[1, 1024]`. + pub num_buckets: Option, + /// Names of indexes the MemWAL should keep up to date during writes. + pub maintained_indexes: Option>, + /// Default `ShardWriter` configuration recorded in the MemWAL index. + pub writer_config_defaults: Option>, +} + +impl TryFrom for lancedb::table::LsmWriteSpec { + type Error = napi::Error; + + fn try_from(value: LsmWriteSpec) -> napi::Result { + let maintained = value.maintained_indexes.unwrap_or_default(); + let writer_config_defaults = value.writer_config_defaults.unwrap_or_default(); + let spec = match value.spec_type.as_str() { + "bucket" => { + let column = value.column.ok_or_else(|| { + napi::Error::from_reason("LsmWriteSpec bucket requires `column`") + })?; + let num_buckets = value.num_buckets.ok_or_else(|| { + napi::Error::from_reason("LsmWriteSpec bucket requires `numBuckets`") + })?; + Self::bucket(column, num_buckets) + } + "identity" => { + let column = value.column.ok_or_else(|| { + napi::Error::from_reason("LsmWriteSpec identity requires `column`") + })?; + Self::identity(column) + } + "unsharded" => Self::unsharded(), + other => { + return Err(napi::Error::from_reason(format!( + "LsmWriteSpec `specType` must be 'bucket', 'identity', or 'unsharded', got '{}'", + other + ))); + } + }; + Ok(spec + .with_maintained_indexes(maintained) + .with_writer_config_defaults(writer_config_defaults)) + } +} + /// Statistics about a compaction operation. 
#[napi(object)] #[derive(Clone, Debug)] diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 2ba2b6fb8..db28e0fc8 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -218,6 +218,8 @@ class Table: async def initial_storage_options(self) -> Optional[Dict[str, str]]: ... async def latest_storage_options(self) -> Optional[Dict[str, str]]: ... async def set_unenforced_primary_key(self, columns: List[str]) -> None: ... + async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ... + async def unset_lsm_write_spec(self) -> None: ... @property def tags(self) -> Tags: ... def query(self) -> Query: ... @@ -419,6 +421,37 @@ class MergeResult: num_deleted_rows: int num_attempts: int +class LsmWriteSpec: + """Specification selecting Lance's MemWAL LSM-style write path for + `merge_insert`.""" + + @staticmethod + def bucket(column: str, num_buckets: int) -> "LsmWriteSpec": ... + @staticmethod + def identity(column: str) -> "LsmWriteSpec": ... + @staticmethod + def unsharded() -> "LsmWriteSpec": ... + def with_maintained_indexes(self, indexes: List[str]) -> "LsmWriteSpec": + """Return a copy of this spec asking the MemWAL to keep the named + indexes up to date as rows are appended.""" + ... + def with_writer_config_defaults(self, defaults: Dict[str, str]) -> "LsmWriteSpec": + """Return a copy of this spec recording the given default + `ShardWriter` configuration in the MemWAL index.""" + ... + @property + def spec_type(self) -> str: + """One of 'bucket', 'identity', or 'unsharded'.""" + ... + @property + def column(self) -> Optional[str]: ... + @property + def num_buckets(self) -> Optional[int]: ... + @property + def maintained_indexes(self) -> List[str]: ... + @property + def writer_config_defaults(self) -> Dict[str, str]: ... 
+ class AddColumnsResult: version: int diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 64aca21f1..6f81d1628 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -14,6 +14,7 @@ from lancedb._lancedb import ( DeleteResult, DropColumnsResult, IndexConfig, + LsmWriteSpec, MergeResult, UpdateResult, ) @@ -659,6 +660,14 @@ class RemoteTable(Table): """Not supported on LanceDB Cloud.""" return LOOP.run(self._table.set_unenforced_primary_key(columns)) + def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None: + """Not supported on LanceDB Cloud.""" + return LOOP.run(self._table.set_lsm_write_spec(spec)) + + def unset_lsm_write_spec(self) -> None: + """Not supported on LanceDB Cloud.""" + return LOOP.run(self._table.unset_lsm_write_spec()) + def drop_index(self, index_name: str): return LOOP.run(self._table.drop_index(index_name)) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index bc3b22b1c..6c4b3eff9 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -154,6 +154,7 @@ if TYPE_CHECKING: AlterColumnsResult, DeleteResult, DropColumnsResult, + LsmWriteSpec, MergeResult, UpdateResult, ) @@ -3268,6 +3269,16 @@ class LanceTable(Table): [`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key].""" return LOOP.run(self._table.set_unenforced_primary_key(columns)) + def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None: + """Install an LsmWriteSpec. See + [`AsyncTable.set_lsm_write_spec`][lancedb.AsyncTable.set_lsm_write_spec].""" + return LOOP.run(self._table.set_lsm_write_spec(spec)) + + def unset_lsm_write_spec(self) -> None: + """Remove the LsmWriteSpec. 
See + [`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec].""" + return LOOP.run(self._table.unset_lsm_write_spec()) + def uses_v2_manifest_paths(self) -> bool: """ Check if the table is using the new v2 manifest paths. @@ -3838,6 +3849,44 @@ class AsyncTable: columns = list(columns) await self._inner.set_unenforced_primary_key(columns) + async def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None: + """Install an LsmWriteSpec on this table. + + The spec selects Lance's MemWAL LSM-style write path for future + `merge_insert` calls. ``LsmWriteSpec`` chooses one of three sharding + strategies: + + - ``LsmWriteSpec.bucket(column, num_buckets)`` — hash-bucket writes by + the single-column unenforced primary key. + - ``LsmWriteSpec.identity(column)`` — shard by the raw value of a + scalar column. + - ``LsmWriteSpec.unsharded()`` — route every write to a single shard. + + All variants require the table to have an unenforced primary key set + via [`set_unenforced_primary_key`]; bucket sharding additionally + requires it to be the single column being bucketed. + + Parameters + ---------- + spec : LsmWriteSpec + The sharding spec to install. + + Examples + -------- + >>> from lancedb._lancedb import LsmWriteSpec + >>> # table.set_unenforced_primary_key("id") + >>> # table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 16)) + """ + await self._inner.set_lsm_write_spec(spec) + + async def unset_lsm_write_spec(self) -> None: + """Remove the LsmWriteSpec from this table. + + Reverts to the standard `merge_insert` write path. Errors if no spec + is currently set. 
+ """ + await self._inner.unset_lsm_write_spec() + @property def name(self) -> str: """The name of the table.""" diff --git a/python/python/tests/test_lsm_write_spec.py b/python/python/tests/test_lsm_write_spec.py new file mode 100644 index 000000000..b81153994 --- /dev/null +++ b/python/python/tests/test_lsm_write_spec.py @@ -0,0 +1,149 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors + +"""Tests for installing and clearing an LsmWriteSpec via +`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`. +""" + +from datetime import timedelta + +import lancedb +import pyarrow as pa +import pytest +from lancedb._lancedb import LsmWriteSpec + +SCHEMA = pa.schema( + [ + pa.field("id", pa.utf8(), nullable=False), + pa.field("v", pa.int32(), nullable=False), + ] +) + + +def _batch(ids, vs): + return pa.RecordBatch.from_arrays( + [pa.array(ids, type=pa.utf8()), pa.array(vs, type=pa.int32())], + schema=SCHEMA, + ) + + +def _reader(ids, vs): + return pa.RecordBatchReader.from_batches(SCHEMA, [_batch(ids, vs)]) + + +def _make_table(tmp_path): + db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0)) + table = db.create_table("t", _reader(["seed"], [0])) + return db, table + + +def test_set_lsm_write_spec_validates(tmp_path): + _db, table = _make_table(tmp_path) + + # No PK set yet. + with pytest.raises(Exception, match="primary key"): + table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4)) + + table.set_unenforced_primary_key("id") + + # Column mismatch. + with pytest.raises(Exception, match="match"): + table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4)) + + # Out-of-range num_buckets. + with pytest.raises(Exception, match="num_buckets"): + table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0)) + with pytest.raises(Exception, match="num_buckets"): + table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1025)) + + # Happy path then mutation rejected. 
+ table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4)) + with pytest.raises(Exception, match="mutation"): + table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8)) + + +def test_unset_lsm_write_spec(tmp_path): + _db, table = _make_table(tmp_path) + + # unset errors when no spec is set. + with pytest.raises(Exception, match="no LSM write spec"): + table.unset_lsm_write_spec() + + # Install a spec, then remove it; afterwards a fresh spec can be set. + table.set_unenforced_primary_key("id") + table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4)) + table.unset_lsm_write_spec() + # A second unset errors — there is no spec left to remove. + with pytest.raises(Exception, match="no LSM write spec"): + table.unset_lsm_write_spec() + table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8)) + + +def test_set_unsharded_spec(tmp_path): + _db, table = _make_table(tmp_path) + # Lance MemWAL still requires a primary key on the dataset; Unsharded + # just skips per-row hashing. + table.set_unenforced_primary_key("id") + table.set_lsm_write_spec(LsmWriteSpec.unsharded()) + table.unset_lsm_write_spec() + + +def test_lsm_write_spec_repr(): + s = LsmWriteSpec.bucket("id", 4) + assert s.spec_type == "bucket" + assert s.column == "id" + assert s.num_buckets == 4 + assert s.maintained_indexes == [] + assert "bucket" in repr(s) + assert "id" in repr(s) + assert "4" in repr(s) + + u = LsmWriteSpec.unsharded() + assert u.spec_type == "unsharded" + assert u.column is None + assert u.num_buckets is None + assert "unsharded" in repr(u) + + +def test_lsm_write_spec_with_maintained_indexes(): + s = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(["idx_a", "idx_b"]) + assert s.maintained_indexes == ["idx_a", "idx_b"] + + +@pytest.mark.asyncio +async def test_async_set_unset_lsm_write_spec(tmp_path): + db = await lancedb.connect_async( + tmp_path, read_consistency_interval=timedelta(seconds=0) + ) + table = await db.create_table( + "t", + pa.RecordBatchReader.from_batches(SCHEMA, 
[_batch(["seed"], [0])]), + ) + + await table.set_unenforced_primary_key("id") + await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4)) + await table.unset_lsm_write_spec() + # A second unset errors. + with pytest.raises(Exception, match="no LSM write spec"): + await table.unset_lsm_write_spec() + + +def test_set_identity_spec(tmp_path): + _db, table = _make_table(tmp_path) + # Identity sharding still requires an unenforced primary key on the + # table; it shards by the raw value of the given column. + table.set_unenforced_primary_key("id") + table.set_lsm_write_spec(LsmWriteSpec.identity("v")) + table.unset_lsm_write_spec() + + +def test_lsm_write_spec_identity_and_writer_config_defaults(): + s = LsmWriteSpec.identity("v") + assert s.spec_type == "identity" + assert s.column == "v" + assert s.num_buckets is None + assert "identity" in repr(s) + + s = s.with_writer_config_defaults({"durable_write": "false"}) + assert s.writer_config_defaults == {"durable_write": "false"} + assert "durable_write" in repr(s) diff --git a/python/src/lib.rs b/python/src/lib.rs index d0e933dba..b37a88226 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -15,8 +15,8 @@ use pyo3::{ use query::{FTSQuery, HybridQuery, Query, VectorQuery}; use session::Session; use table::{ - AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, MergeResult, - Table, UpdateResult, + AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec, + MergeResult, Table, UpdateResult, }; pub mod arrow; @@ -52,6 +52,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/python/src/table.rs b/python/src/table.rs index 0c6298779..546bec555 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -171,6 +171,141 @@ impl From for MergeResult { } } +/// Specification selecting 
Lance's MemWAL LSM-style write path for +/// `merge_insert`. +/// +/// Constructed via the `bucket(...)`, `identity(...)`, or `unsharded()` +/// classmethods, then optionally chain `with_maintained_indexes(...)` and +/// `with_writer_config_defaults(...)`. +#[pyclass(from_py_object)] +#[derive(Clone, Debug)] +pub struct LsmWriteSpec { + inner: lancedb::table::LsmWriteSpec, +} + +#[pymethods] +impl LsmWriteSpec { + /// Hash-bucket sharding by the unenforced primary key column. + #[staticmethod] + pub fn bucket(column: String, num_buckets: u32) -> Self { + Self { + inner: lancedb::table::LsmWriteSpec::bucket(column, num_buckets), + } + } + + /// Identity sharding — shard by the raw value of `column`. + #[staticmethod] + pub fn identity(column: String) -> Self { + Self { + inner: lancedb::table::LsmWriteSpec::identity(column), + } + } + + /// No sharding — every `merge_insert` call writes to a single + /// MemWAL shard. + #[staticmethod] + pub fn unsharded() -> Self { + Self { + inner: lancedb::table::LsmWriteSpec::unsharded(), + } + } + + /// Replace the list of indexes the MemWAL should keep up to date as + /// rows are appended. Each name must reference an index that + /// already exists on the table at the time `set_lsm_write_spec` + /// is called. + pub fn with_maintained_indexes(&self, indexes: Vec) -> Self { + Self { + inner: self.inner.clone().with_maintained_indexes(indexes), + } + } + + /// Replace the default `ShardWriter` configuration recorded in the + /// MemWAL index, so every writer starts from the same defaults. 
+ pub fn with_writer_config_defaults(&self, defaults: HashMap) -> Self { + Self { + inner: self.inner.clone().with_writer_config_defaults(defaults), + } + } + + pub fn __repr__(&self) -> String { + match &self.inner { + lancedb::table::LsmWriteSpec::Bucket { + column, + num_buckets, + maintained_indexes, + writer_config_defaults, + } => format!( + "LsmWriteSpec.bucket(column={:?}, num_buckets={}, maintained_indexes={:?}, writer_config_defaults={:?})", + column, num_buckets, maintained_indexes, writer_config_defaults, + ), + lancedb::table::LsmWriteSpec::Identity { + column, + maintained_indexes, + writer_config_defaults, + } => format!( + "LsmWriteSpec.identity(column={:?}, maintained_indexes={:?}, writer_config_defaults={:?})", + column, maintained_indexes, writer_config_defaults, + ), + lancedb::table::LsmWriteSpec::Unsharded { + maintained_indexes, + writer_config_defaults, + } => format!( + "LsmWriteSpec.unsharded(maintained_indexes={:?}, writer_config_defaults={:?})", + maintained_indexes, writer_config_defaults, + ), + } + } + + /// Discriminator string identifying the variant ("bucket", "identity", + /// or "unsharded"). + #[getter] + pub fn spec_type(&self) -> &'static str { + match &self.inner { + lancedb::table::LsmWriteSpec::Bucket { .. } => "bucket", + lancedb::table::LsmWriteSpec::Identity { .. } => "identity", + lancedb::table::LsmWriteSpec::Unsharded { .. } => "unsharded", + } + } + + /// Bucket and identity variants: the sharding column. `None` for unsharded. + #[getter] + pub fn column(&self) -> Option { + match &self.inner { + lancedb::table::LsmWriteSpec::Bucket { column, .. } + | lancedb::table::LsmWriteSpec::Identity { column, .. } => Some(column.clone()), + lancedb::table::LsmWriteSpec::Unsharded { .. } => None, + } + } + + /// Bucket variant only: the number of buckets. + #[getter] + pub fn num_buckets(&self) -> Option { + match &self.inner { + lancedb::table::LsmWriteSpec::Bucket { num_buckets, .. 
} => Some(*num_buckets), + _ => None, + } + } + + /// Names of indexes the MemWAL should keep up to date during writes. + #[getter] + pub fn maintained_indexes(&self) -> Vec { + self.inner.maintained_indexes().to_vec() + } + + /// Default `ShardWriter` configuration recorded by this spec. + #[getter] + pub fn writer_config_defaults(&self) -> HashMap { + self.inner.writer_config_defaults().clone() + } +} + +impl From for lancedb::table::LsmWriteSpec { + fn from(spec: LsmWriteSpec) -> Self { + spec.inner + } +} + #[pyclass(get_all, from_py_object)] #[derive(Clone, Debug)] pub struct AddColumnsResult { @@ -818,6 +953,24 @@ impl Table { }) } + pub fn set_lsm_write_spec<'a>( + self_: PyRef<'a, Self>, + spec: LsmWriteSpec, + ) -> PyResult> { + let inner = self_.inner_ref()?.clone(); + let native_spec = lancedb::table::LsmWriteSpec::from(spec); + future_into_py(self_.py(), async move { + inner.set_lsm_write_spec(native_spec).await.infer_error() + }) + } + + pub fn unset_lsm_write_spec(self_: PyRef<'_, Self>) -> PyResult> { + let inner = self_.inner_ref()?.clone(); + future_into_py(self_.py(), async move { + inner.unset_lsm_write_spec().await.infer_error() + }) + } + pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult> { let inner = self_.inner_ref()?.clone(); future_into_py(self_.py(), async move { diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index ad9a7a303..2d8f626cc 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -1673,6 +1673,18 @@ impl BaseTable for RemoteTable { }) } + async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> { + Err(Error::NotSupported { + message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(), + }) + } + + async fn unset_lsm_write_spec(&self) -> Result<()> { + Err(Error::NotSupported { + message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(), + }) + } + async fn tags(&self) -> Result> { 
Ok(Box::new(RemoteTags { inner: self })) } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index d32403a4a..9398fb4e1 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -273,6 +273,176 @@ pub trait Tags: Send + Sync { pub use self::merge::MergeResult; +/// Specification selecting Lance's MemWAL LSM-style write path for +/// `merge_insert`. +/// +/// Construct via [`LsmWriteSpec::bucket`], [`LsmWriteSpec::identity`], or +/// [`LsmWriteSpec::unsharded`], then optionally chain +/// [`LsmWriteSpec::with_maintained_indexes`] (indexes the MemWAL keeps up to +/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default +/// `ShardWriter` configuration recorded in the MemWAL index). +/// +/// All variants require the table to have an unenforced primary key. +/// +/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with +/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch +/// onto the MemWAL writer is a follow-up. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum LsmWriteSpec { + /// Hash-bucket sharding by the unenforced primary key column. + /// + /// `column` must equal the table's currently-set single-column + /// unenforced primary key. `num_buckets` must be in `[1, 1024]`. + /// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's + /// `bucket(column, num_buckets)` value is stable across processes. + Bucket { + column: String, + num_buckets: u32, + /// Names of indexes (already created on the table) that the + /// MemWAL should maintain in-memory as rows are appended. + maintained_indexes: Vec, + /// Default `ShardWriter` configuration recorded in the MemWAL index. + writer_config_defaults: HashMap, + }, + /// Identity sharding — shard by the raw value of `column`. + /// + /// Use this when the data is already partitioned by `column`; each + /// distinct value of `column` becomes its own shard. 
+ Identity { + column: String, + /// Names of indexes (already created on the table) that the + /// MemWAL should maintain in-memory as rows are appended. + maintained_indexes: Vec, + /// Default `ShardWriter` configuration recorded in the MemWAL index. + writer_config_defaults: HashMap, + }, + /// No sharding — every `merge_insert` call writes to a single MemWAL shard. + Unsharded { + /// Names of indexes (already created on the table) that the + /// MemWAL should maintain in-memory as rows are appended. + maintained_indexes: Vec, + /// Default `ShardWriter` configuration recorded in the MemWAL index. + writer_config_defaults: HashMap, + }, +} + +impl LsmWriteSpec { + /// Construct a hash-bucket sharding spec with no maintained indexes. + pub fn bucket(column: impl Into, num_buckets: u32) -> Self { + Self::Bucket { + column: column.into(), + num_buckets, + maintained_indexes: Vec::new(), + writer_config_defaults: HashMap::new(), + } + } + + /// Construct an identity-sharding spec (shard by the raw value of + /// `column`) with no maintained indexes. + pub fn identity(column: impl Into) -> Self { + Self::Identity { + column: column.into(), + maintained_indexes: Vec::new(), + writer_config_defaults: HashMap::new(), + } + } + + /// Construct an unsharded spec with no maintained indexes. + pub fn unsharded() -> Self { + Self::Unsharded { + maintained_indexes: Vec::new(), + writer_config_defaults: HashMap::new(), + } + } + + /// Replace the list of indexes the MemWAL should keep up to date as + /// rows are appended. Each name must reference an index that already + /// exists on the table at the time `set_lsm_write_spec` is called. + pub fn with_maintained_indexes(mut self, indexes: I) -> Self + where + I: IntoIterator, + S: Into, + { + let v: Vec = indexes.into_iter().map(Into::into).collect(); + match &mut self { + Self::Bucket { + maintained_indexes, .. + } + | Self::Identity { + maintained_indexes, .. + } + | Self::Unsharded { + maintained_indexes, .. 
+ } => *maintained_indexes = v, + } + self + } + + /// Replace the default `ShardWriter` configuration recorded in the MemWAL + /// index, so every writer starts from the same defaults. Keys are + /// `ShardWriter` config field names (`Duration` knobs use a `_ms` suffix); + /// values are their string encodings. + pub fn with_writer_config_defaults(mut self, defaults: I) -> Self + where + I: IntoIterator, + K: Into, + V: Into, + { + let m: HashMap = defaults + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + match &mut self { + Self::Bucket { + writer_config_defaults, + .. + } + | Self::Identity { + writer_config_defaults, + .. + } + | Self::Unsharded { + writer_config_defaults, + .. + } => *writer_config_defaults = m, + } + self + } + + /// Borrow the list of index names this spec asks MemWAL to maintain. + pub fn maintained_indexes(&self) -> &[String] { + match self { + Self::Bucket { + maintained_indexes, .. + } + | Self::Identity { + maintained_indexes, .. + } + | Self::Unsharded { + maintained_indexes, .. + } => maintained_indexes, + } + } + + /// Borrow the default `ShardWriter` configuration recorded by this spec. + pub fn writer_config_defaults(&self) -> &HashMap { + match self { + Self::Bucket { + writer_config_defaults, + .. + } + | Self::Identity { + writer_config_defaults, + .. + } + | Self::Unsharded { + writer_config_defaults, + .. + } => writer_config_defaults, + } + } +} + /// A trait for anything "table-like". This is used for both native tables (which target /// Lance datasets) and remote tables (which target LanceDB cloud) /// @@ -360,6 +530,29 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { message: "set_unenforced_primary_key is not supported on this table type".into(), }) } + /// Install an [`LsmWriteSpec`] on this table. + /// + /// The spec selects Lance's MemWAL LSM-style write path for future + /// `merge_insert` calls. + /// + /// The default implementation returns `NotSupported`. 
Implementations + /// that support the MemWAL LSM write path must override this. + async fn set_lsm_write_spec(&self, _spec: LsmWriteSpec) -> Result<()> { + Err(Error::NotSupported { + message: "set_lsm_write_spec is not supported on this table type".into(), + }) + } + /// Remove the [`LsmWriteSpec`] from this table. + /// + /// Errors if no spec is currently set. + /// + /// The default implementation returns `NotSupported`. Implementations + /// that support the MemWAL LSM write path must override this. + async fn unset_lsm_write_spec(&self) -> Result<()> { + Err(Error::NotSupported { + message: "unset_lsm_write_spec is not supported on this table type".into(), + }) + } /// Gets the table tag manager. async fn tags(&self) -> Result>; /// Optimize the dataset. @@ -1100,6 +1293,46 @@ impl Table { self.inner.set_unenforced_primary_key(&borrowed).await } + /// Install an [`LsmWriteSpec`] on this table, selecting Lance's MemWAL + /// LSM-style write path for future `merge_insert` calls. + /// + /// [`LsmWriteSpec`] chooses one of three sharding strategies: + /// + /// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column + /// unenforced primary key. + /// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column. + /// - [`LsmWriteSpec::unsharded`] — route every write to a single shard. + /// + /// All variants require the table to have an unenforced primary key + /// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally + /// requires it to be the single column being bucketed.
+ /// + /// # Example + /// + /// ``` + /// # use lancedb::table::{LsmWriteSpec, Table}; + /// # async fn example(table: &Table) -> Result<(), Box> { + /// table.set_unenforced_primary_key(["id"]).await?; + /// table + /// .set_lsm_write_spec( + /// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]), + /// ) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> { + self.inner.set_lsm_write_spec(spec).await + } + + /// Remove the [`LsmWriteSpec`] from this table, reverting to the standard + /// `merge_insert` write path. + /// + /// Errors if no spec is currently set. + pub async fn unset_lsm_write_spec(&self) -> Result<()> { + self.inner.unset_lsm_write_spec().await + } + /// Retrieve the version of the table /// /// LanceDb supports versioning. Every operation that modifies the table increases @@ -2510,6 +2743,14 @@ impl BaseTable for NativeTable { primary_key::set_unenforced_primary_key(self, columns).await } + async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> { + merge::lsm::set_lsm_write_spec(self, spec).await + } + + async fn unset_lsm_write_spec(&self) -> Result<()> { + merge::lsm::unset_lsm_write_spec(self).await + } + /// Delete rows from the table async fn delete(&self, predicate: &str) -> Result { // Delegate to the submodule implementation @@ -4052,6 +4293,249 @@ mod tests { assert_eq!(pk[0].name, "id"); } + #[tokio::test] + async fn test_set_lsm_write_spec() { + use arrow_array::StringArray; + use lance::dataset::mem_wal::DatasetMemWalExt; + + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); 
+ let reader: Box = + Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())); + + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + let table = conn.create_table("t", reader).execute().await.unwrap(); + + // Reject when no PK is set. + let err = table + .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4)) + .await + .expect_err("should reject without PK"); + assert!(matches!(err, Error::Lance { .. }), "got {:?}", err); + + // Set PK, then a mismatched column on the spec must be rejected. + table.set_unenforced_primary_key(["id"]).await.unwrap(); + let err = table + .set_lsm_write_spec(LsmWriteSpec::bucket("name", 4)) + .await + .expect_err("should reject column != PK"); + assert!(matches!(err, Error::Lance { .. }), "got {:?}", err); + + // Reject num_buckets out of range. + for bad in [0u32, 1025] { + let err = table + .set_lsm_write_spec(LsmWriteSpec::bucket("id", bad)) + .await + .expect_err("should reject"); + assert!(matches!(err, Error::Lance { .. }), "got {:?}", err); + } + + // Happy path: install spec; verify MemWAL details record it. + table + .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4)) + .await + .unwrap(); + + let native_tbl = table.as_native().unwrap(); + let dataset = native_tbl.dataset.get().await.unwrap(); + let details = dataset + .mem_wal_index_details() + .await + .unwrap() + .expect("MemWAL index should be initialized"); + assert_eq!(details.num_shards, 4); + assert_eq!(details.sharding_specs.len(), 1); + let installed = &details.sharding_specs[0]; + assert_eq!(installed.fields.len(), 1); + let f = &installed.fields[0]; + assert_eq!(f.transform.as_deref(), Some("bucket")); + assert_eq!( + f.parameters.get("num_buckets").map(String::as_str), + Some("4") + ); + // Bucket parameters must hold only `num_buckets`. + assert_eq!(f.parameters.len(), 1); + + // Mutation rejected. 
+ let err = table + .set_lsm_write_spec(LsmWriteSpec::bucket("id", 8)) + .await + .expect_err("mutation should be rejected"); + assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err); + } + + #[tokio::test] + async fn test_set_lsm_write_spec_unsharded() { + use lance::dataset::mem_wal::DatasetMemWalExt; + + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::Int64Array::from(vec![1]))], + ) + .unwrap(); + let reader: Box = + Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())); + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + let table = conn.create_table("t", reader).execute().await.unwrap(); + + // Lance's MemWAL still requires *some* unenforced primary key on + // the dataset; Unsharded just skips the per-row hashing step. 
+ table.set_unenforced_primary_key(["id"]).await.unwrap(); + table + .set_lsm_write_spec(LsmWriteSpec::unsharded()) + .await + .unwrap(); + + let dataset = table.as_native().unwrap().dataset.get().await.unwrap(); + let details = dataset + .mem_wal_index_details() + .await + .unwrap() + .expect("MemWAL index should be initialized"); + assert_eq!(details.num_shards, 1); + assert_eq!(details.sharding_specs.len(), 1); + let f = &details.sharding_specs[0].fields[0]; + assert_eq!(f.transform.as_deref(), Some("unsharded")); + assert!(f.source_ids.is_empty()); + } + + #[tokio::test] + async fn test_set_lsm_write_spec_identity() { + use lance::dataset::mem_wal::DatasetMemWalExt; + + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("region", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + let reader: Box = + Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())); + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + let table = conn.create_table("t", reader).execute().await.unwrap(); + + table.set_unenforced_primary_key(["id"]).await.unwrap(); + table + .set_lsm_write_spec( + LsmWriteSpec::identity("region") + .with_writer_config_defaults([("durable_write", "false")]), + ) + .await + .unwrap(); + + let dataset = table.as_native().unwrap().dataset.get().await.unwrap(); + let details = dataset + .mem_wal_index_details() + .await + .unwrap() + .expect("MemWAL index should be initialized"); + // Identity sharding records an open-ended shard count. 
+ assert_eq!(details.num_shards, 0); + assert_eq!(details.sharding_specs.len(), 1); + let f = &details.sharding_specs[0].fields[0]; + assert_eq!(f.transform.as_deref(), Some("identity")); + // Writer config defaults round-trip into the MemWAL index. + assert_eq!( + details + .writer_config_defaults + .get("durable_write") + .map(String::as_str), + Some("false") + ); + } + + #[tokio::test] + async fn test_unset_lsm_write_spec() { + use lance::dataset::mem_wal::DatasetMemWalExt; + + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::Int64Array::from(vec![1]))], + ) + .unwrap(); + let reader: Box = + Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())); + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + let table = conn.create_table("t", reader).execute().await.unwrap(); + + // unset errors when no spec is set. + table.unset_lsm_write_spec().await.unwrap_err(); + + // Install a spec, then unset it. + table.set_unenforced_primary_key(["id"]).await.unwrap(); + table + .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4)) + .await + .unwrap(); + { + let dataset = table.as_native().unwrap().dataset.get().await.unwrap(); + assert!(dataset.mem_wal_index_details().await.unwrap().is_some()); + } + + table.unset_lsm_write_spec().await.unwrap(); + { + let dataset = table.as_native().unwrap().dataset.get().await.unwrap(); + assert!(dataset.mem_wal_index_details().await.unwrap().is_none()); + } + + // A second unset errors; a fresh spec can still be installed afterwards. 
+ table.unset_lsm_write_spec().await.unwrap_err(); + table + .set_lsm_write_spec(LsmWriteSpec::bucket("id", 8)) + .await + .unwrap(); + { + let dataset = table.as_native().unwrap().dataset.get().await.unwrap(); + assert!(dataset.mem_wal_index_details().await.unwrap().is_some()); + } + } + #[tokio::test] pub async fn test_stats() { let tmp_dir = tempdir().unwrap(); diff --git a/rust/lancedb/src/table/merge.rs b/rust/lancedb/src/table/merge.rs index d8805acb8..def78aa4f 100644 --- a/rust/lancedb/src/table/merge.rs +++ b/rust/lancedb/src/table/merge.rs @@ -16,6 +16,8 @@ use crate::error::{Error, Result}; use super::{BaseTable, NativeTable}; +pub(crate) mod lsm; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct MergeResult { // The commit version associated with the operation. diff --git a/rust/lancedb/src/table/merge/lsm.rs b/rust/lancedb/src/table/merge/lsm.rs new file mode 100644 index 000000000..51d04f5e0 --- /dev/null +++ b/rust/lancedb/src/table/merge/lsm.rs @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +//! MemWAL LSM write-path spec management. +//! +//! [`set_lsm_write_spec`] installs a [`super::super::LsmWriteSpec`] on a +//! table, which selects Lance's MemWAL LSM-style write path for future +//! `merge_insert` calls. [`unset_lsm_write_spec`] removes it. The actual +//! `merge_insert` dispatch and writer are a follow-up. + +use lance::dataset::mem_wal::DatasetMemWalExt; +use lance::index::DatasetIndexExt; + +use crate::error::{Error, Result}; +use crate::table::{LsmWriteSpec, NativeTable}; + +// ============================================================================= +// set_lsm_write_spec +// ============================================================================= + +/// Install an [`LsmWriteSpec`] on the table. 
+/// +/// The bucket / unsharded sharding spec is constructed and validated by Lance's +/// [`InitializeMemWalBuilder`](lance::dataset::mem_wal::InitializeMemWalBuilder). +#[allow(clippy::redundant_pub_crate)] +pub(crate) async fn set_lsm_write_spec(table: &NativeTable, spec: LsmWriteSpec) -> Result<()> { + table.dataset.ensure_mutable()?; + + { + let dataset = table.dataset.get().await?; + if dataset.mem_wal_index_details().await?.is_some() { + return Err(Error::InvalidInput { + message: "set_lsm_write_spec: an LSM write spec is already set on this table; mutation is not supported".into(), + }); + } + } + + let mut dataset = (*table.dataset.get().await?).clone(); + let mut builder = dataset.initialize_mem_wal(); + let (maintained_indexes, writer_config_defaults) = match spec { + LsmWriteSpec::Bucket { + column, + num_buckets, + maintained_indexes, + writer_config_defaults, + } => { + builder = builder.bucket_sharding(column, num_buckets); + (maintained_indexes, writer_config_defaults) + } + LsmWriteSpec::Identity { + column, + maintained_indexes, + writer_config_defaults, + } => { + builder = builder.identity_sharding(column); + (maintained_indexes, writer_config_defaults) + } + LsmWriteSpec::Unsharded { + maintained_indexes, + writer_config_defaults, + } => { + builder = builder.unsharded(); + (maintained_indexes, writer_config_defaults) + } + }; + builder = builder.maintained_indexes(maintained_indexes); + for (key, value) in writer_config_defaults { + builder = builder.add_writer_config_default(key, value); + } + builder.execute().await?; + table.dataset.update(dataset); + Ok(()) +} + +// ============================================================================= +// unset_lsm_write_spec +// ============================================================================= + +/// Remove the [`LsmWriteSpec`] from the table by dropping the MemWAL index. +/// +/// Errors if no spec is currently set. 
+#[allow(clippy::redundant_pub_crate)] +pub(crate) async fn unset_lsm_write_spec(table: &NativeTable) -> Result<()> { + table.dataset.ensure_mutable()?; + + { + let dataset = table.dataset.get().await?; + if dataset.mem_wal_index_details().await?.is_none() { + return Err(Error::InvalidInput { + message: "unset_lsm_write_spec: no LSM write spec is set on this table".into(), + }); + } + } + + let mut dataset = (*table.dataset.get().await?).clone(); + dataset + .drop_index(lance_index::mem_wal::MEM_WAL_INDEX_NAME) + .await?; + table.dataset.update(dataset); + Ok(()) +}