mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-18 12:30:41 +00:00
feat: support setting LSM write spec for a table (#3396)
## Summary Split out from #3354 Adds `LsmWriteSpec` and `Table::set_lsm_write_spec` / `unset_lsm_write_spec` to install and clear the spec that selects Lance's MemWAL LSM-style write path for `merge_insert`. `LsmWriteSpec` offers three sharding strategies, all built on Lance's `InitializeMemWalBuilder`: - `LsmWriteSpec::bucket(column, num_buckets)` — hash-bucket sharding by the single-column unenforced primary key. - `LsmWriteSpec::identity(column)` — identity sharding by the raw value of a scalar column. - `LsmWriteSpec::unsharded()` — a single MemWAL shard. Each can be refined with `with_maintained_indexes(...)` (indexes the MemWAL keeps up to date as rows are appended) and `with_writer_config_defaults(...)` (default `ShardWriter` configuration recorded in the MemWAL index, so every writer starts from the same defaults). All variants require the table to have an unenforced primary key. - `set_lsm_write_spec` installs the spec by initializing the MemWAL index; `unset_lsm_write_spec` removes it (dropping the MemWAL index), reverting to the standard `merge_insert` path. `unset` is idempotent. - Bindings: Python (`LsmWriteSpec.bucket` / `.identity` / `.unsharded`, `set_lsm_write_spec` / `unset_lsm_write_spec`) and TypeScript (`setLsmWriteSpec` with `specType` `"bucket"` / `"identity"` / `"unsharded"`). `RemoteTable` returns `NotSupported`. The actual `merge_insert` LSM dispatch and `ShardWriter` write path are a follow-up — this PR only installs and clears the spec.
This commit is contained in:
68
Cargo.lock
generated
68
Cargo.lock
generated
@@ -3212,8 +3212,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.4",
|
||||
@@ -4426,8 +4426,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4497,8 +4497,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4518,8 +4518,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4528,8 +4528,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4564,8 +4564,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4595,8 +4595,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4614,8 +4614,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4650,8 +4650,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4682,8 +4682,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4747,8 +4747,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4790,8 +4790,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4807,8 +4807,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -4820,8 +4820,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -4870,8 +4870,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4910,8 +4910,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -4922,8 +4922,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-tokenizer"
|
||||
version = "7.0.0-beta.10"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.10#4739057aa428bc6b36077a84fc0ee08c6540053a"
|
||||
version = "7.0.0-beta.12"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.12#2b182f4f3fbcd97b662059b2c2981984e8c10c8e"
|
||||
dependencies = [
|
||||
"jieba-rs",
|
||||
"lindera",
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.0.0-beta.10", default-features = false, "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.10", default-features = false, "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.10", default-features = false, "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.10", "tag" = "v7.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.0.0-beta.12", default-features = false, "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.12", default-features = false, "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.12", default-features = false, "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.12", "tag" = "v7.0.0-beta.12", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
@@ -690,6 +690,49 @@ of the given query
|
||||
|
||||
***
|
||||
|
||||
### setLsmWriteSpec()
|
||||
|
||||
```ts
|
||||
abstract setLsmWriteSpec(spec): Promise<void>
|
||||
```
|
||||
|
||||
Install an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) on this table, selecting Lance's MemWAL
|
||||
LSM-style write path for future `mergeInsert` calls.
|
||||
|
||||
`LsmWriteSpec` chooses one of three sharding strategies via `specType`:
|
||||
|
||||
- `"bucket"` — hash-bucket writes by the single-column unenforced primary
|
||||
key (`column` and `numBuckets` required).
|
||||
- `"identity"` — shard by the raw value of a scalar `column`.
|
||||
- `"unsharded"` — route every write to a single shard.
|
||||
|
||||
All variants require the table to have an unenforced primary key
|
||||
([Table#setUnenforcedPrimaryKey](Table.md#setunenforcedprimarykey)); bucket sharding additionally
|
||||
requires it to be the single column being bucketed.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **spec**: [`LsmWriteSpec`](../interfaces/LsmWriteSpec.md)
|
||||
The sharding spec to install.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
#### Example
|
||||
|
||||
```ts
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 16,
|
||||
maintainedIndexes: ["id_idx"],
|
||||
});
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### setUnenforcedPrimaryKey()
|
||||
|
||||
```ts
|
||||
@@ -818,6 +861,23 @@ Return the table as an arrow table
|
||||
|
||||
***
|
||||
|
||||
### unsetLsmWriteSpec()
|
||||
|
||||
```ts
|
||||
abstract unsetLsmWriteSpec(): Promise<void>
|
||||
```
|
||||
|
||||
Remove the [LsmWriteSpec](../interfaces/LsmWriteSpec.md) from this table, reverting to the standard
|
||||
`mergeInsert` write path.
|
||||
|
||||
Errors if no spec is currently set.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### update()
|
||||
|
||||
#### update(opts)
|
||||
|
||||
@@ -80,6 +80,7 @@
|
||||
- [IvfRqOptions](interfaces/IvfRqOptions.md)
|
||||
- [ListNamespacesOptions](interfaces/ListNamespacesOptions.md)
|
||||
- [ListNamespacesResponse](interfaces/ListNamespacesResponse.md)
|
||||
- [LsmWriteSpec](interfaces/LsmWriteSpec.md)
|
||||
- [MergeResult](interfaces/MergeResult.md)
|
||||
- [OpenTableOptions](interfaces/OpenTableOptions.md)
|
||||
- [OptimizeOptions](interfaces/OptimizeOptions.md)
|
||||
|
||||
64
docs/src/js/interfaces/LsmWriteSpec.md
Normal file
64
docs/src/js/interfaces/LsmWriteSpec.md
Normal file
@@ -0,0 +1,64 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / LsmWriteSpec
|
||||
|
||||
# Interface: LsmWriteSpec
|
||||
|
||||
Specification selecting Lance's MemWAL LSM-style write path for
|
||||
`mergeInsert`.
|
||||
|
||||
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
|
||||
`column` and `numBuckets` are required; for `"identity"`, `column` is
|
||||
required.
|
||||
|
||||
## Properties
|
||||
|
||||
### column?
|
||||
|
||||
```ts
|
||||
optional column: string;
|
||||
```
|
||||
|
||||
Bucket and identity variants: the sharding column.
|
||||
|
||||
***
|
||||
|
||||
### maintainedIndexes?
|
||||
|
||||
```ts
|
||||
optional maintainedIndexes: string[];
|
||||
```
|
||||
|
||||
Names of indexes the MemWAL should keep up to date during writes.
|
||||
|
||||
***
|
||||
|
||||
### numBuckets?
|
||||
|
||||
```ts
|
||||
optional numBuckets: number;
|
||||
```
|
||||
|
||||
Bucket variant: the number of buckets, in `[1, 1024]`.
|
||||
|
||||
***
|
||||
|
||||
### specType
|
||||
|
||||
```ts
|
||||
specType: "bucket" | "identity" | "unsharded";
|
||||
```
|
||||
|
||||
One of `"bucket"`, `"identity"`, or `"unsharded"`.
|
||||
|
||||
***
|
||||
|
||||
### writerConfigDefaults?
|
||||
|
||||
```ts
|
||||
optional writerConfigDefaults: Record<string, string>;
|
||||
```
|
||||
|
||||
Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
@@ -2397,3 +2397,81 @@ describe("setUnenforcedPrimaryKey", () => {
|
||||
await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
});
|
||||
afterEach(() => tmpDir.removeCallback());
|
||||
|
||||
async function makeTable(conn: Connection): Promise<Table> {
|
||||
return await conn.createEmptyTable(
|
||||
"t",
|
||||
new arrow.Schema([new arrow.Field("id", new arrow.Int64(), false)]),
|
||||
);
|
||||
}
|
||||
|
||||
it("installs and removes a bucket spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 4,
|
||||
});
|
||||
await table.unsetLsmWriteSpec();
|
||||
// A second unset errors — there is no spec left to remove.
|
||||
await expect(table.unsetLsmWriteSpec()).rejects.toThrow();
|
||||
// A fresh spec can be installed after unset.
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 8,
|
||||
});
|
||||
});
|
||||
|
||||
it("installs an unsharded spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({ specType: "unsharded" });
|
||||
await table.unsetLsmWriteSpec();
|
||||
});
|
||||
|
||||
it("installs an identity spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({ specType: "identity", column: "id" });
|
||||
await table.unsetLsmWriteSpec();
|
||||
});
|
||||
|
||||
it("rejects an invalid spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
// num_buckets out of range.
|
||||
await expect(
|
||||
table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 0,
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
// Column mismatch.
|
||||
await expect(
|
||||
table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "missing",
|
||||
numBuckets: 4,
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -113,6 +113,7 @@ export {
|
||||
UpdateOptions,
|
||||
OptimizeOptions,
|
||||
Version,
|
||||
LsmWriteSpec,
|
||||
ColumnAlteration,
|
||||
} from "./table";
|
||||
|
||||
|
||||
@@ -106,6 +106,27 @@ export interface Version {
|
||||
metadata: Record<string, string>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specification selecting Lance's MemWAL LSM-style write path for
|
||||
* `mergeInsert`.
|
||||
*
|
||||
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
|
||||
* `column` and `numBuckets` are required; for `"identity"`, `column` is
|
||||
* required.
|
||||
*/
|
||||
export interface LsmWriteSpec {
|
||||
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
|
||||
specType: "bucket" | "identity" | "unsharded";
|
||||
/** Bucket and identity variants: the sharding column. */
|
||||
column?: string;
|
||||
/** Bucket variant: the number of buckets, in `[1, 1024]`. */
|
||||
numBuckets?: number;
|
||||
/** Names of indexes the MemWAL should keep up to date during writes. */
|
||||
maintainedIndexes?: string[];
|
||||
/** Default `ShardWriter` configuration recorded in the MemWAL index. */
|
||||
writerConfigDefaults?: Record<string, string>;
|
||||
}
|
||||
|
||||
/**
|
||||
* A Table is a collection of Records in a LanceDB Database.
|
||||
*
|
||||
@@ -461,6 +482,42 @@ export abstract class Table {
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise<void>;
|
||||
/**
|
||||
* Install an {@link LsmWriteSpec} on this table, selecting Lance's MemWAL
|
||||
* LSM-style write path for future `mergeInsert` calls.
|
||||
*
|
||||
* `LsmWriteSpec` chooses one of three sharding strategies via `specType`:
|
||||
*
|
||||
* - `"bucket"` — hash-bucket writes by the single-column unenforced primary
|
||||
* key (`column` and `numBuckets` required).
|
||||
* - `"identity"` — shard by the raw value of a scalar `column`.
|
||||
* - `"unsharded"` — route every write to a single shard.
|
||||
*
|
||||
* All variants require the table to have an unenforced primary key
|
||||
* ({@link Table#setUnenforcedPrimaryKey}); bucket sharding additionally
|
||||
* requires it to be the single column being bucketed.
|
||||
* @param {LsmWriteSpec} spec The sharding spec to install.
|
||||
* @returns {Promise<void>}
|
||||
* @example
|
||||
* ```ts
|
||||
* await table.setUnenforcedPrimaryKey("id");
|
||||
* await table.setLsmWriteSpec({
|
||||
* specType: "bucket",
|
||||
* column: "id",
|
||||
* numBuckets: 16,
|
||||
* maintainedIndexes: ["id_idx"],
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
abstract setLsmWriteSpec(spec: LsmWriteSpec): Promise<void>;
|
||||
/**
|
||||
* Remove the {@link LsmWriteSpec} from this table, reverting to the standard
|
||||
* `mergeInsert` write path.
|
||||
*
|
||||
* Errors if no spec is currently set.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract unsetLsmWriteSpec(): Promise<void>;
|
||||
/** Retrieve the version of the table */
|
||||
|
||||
abstract version(): Promise<number>;
|
||||
@@ -914,6 +971,14 @@ export class LocalTable extends Table {
|
||||
return await this.inner.setUnenforcedPrimaryKey(cols);
|
||||
}
|
||||
|
||||
async setLsmWriteSpec(spec: LsmWriteSpec): Promise<void> {
|
||||
return await this.inner.setLsmWriteSpec(spec);
|
||||
}
|
||||
|
||||
async unsetLsmWriteSpec(): Promise<void> {
|
||||
return await this.inner.unsetLsmWriteSpec();
|
||||
}
|
||||
|
||||
async version(): Promise<number> {
|
||||
return await this.inner.version();
|
||||
}
|
||||
|
||||
@@ -352,6 +352,23 @@ impl Table {
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> napi::Result<()> {
|
||||
let native_spec = lancedb::table::LsmWriteSpec::try_from(spec)?;
|
||||
self.inner_ref()?
|
||||
.set_lsm_write_spec(native_spec)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn unset_lsm_write_spec(&self) -> napi::Result<()> {
|
||||
self.inner_ref()?
|
||||
.unset_lsm_write_spec()
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn version(&self) -> napi::Result<i64> {
|
||||
self.inner_ref()?
|
||||
@@ -546,6 +563,63 @@ impl From<lancedb::index::IndexConfig> for IndexConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `mergeInsert`.
|
||||
///
|
||||
/// `specType` must be `"bucket"`, `"identity"`, or `"unsharded"`. For
|
||||
/// `"bucket"`, `column` and `numBuckets` are required; for `"identity"`,
|
||||
/// `column` is required.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LsmWriteSpec {
|
||||
/// One of `"bucket"`, `"identity"`, or `"unsharded"`.
|
||||
pub spec_type: String,
|
||||
/// Bucket and identity variants: the sharding column.
|
||||
pub column: Option<String>,
|
||||
/// Bucket variant: the number of buckets, in `[1, 1024]`.
|
||||
pub num_buckets: Option<u32>,
|
||||
/// Names of indexes the MemWAL should keep up to date during writes.
|
||||
pub maintained_indexes: Option<Vec<String>>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
pub writer_config_defaults: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
impl TryFrom<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
|
||||
type Error = napi::Error;
|
||||
|
||||
fn try_from(value: LsmWriteSpec) -> napi::Result<Self> {
|
||||
let maintained = value.maintained_indexes.unwrap_or_default();
|
||||
let writer_config_defaults = value.writer_config_defaults.unwrap_or_default();
|
||||
let spec = match value.spec_type.as_str() {
|
||||
"bucket" => {
|
||||
let column = value.column.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec bucket requires `column`")
|
||||
})?;
|
||||
let num_buckets = value.num_buckets.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec bucket requires `numBuckets`")
|
||||
})?;
|
||||
Self::bucket(column, num_buckets)
|
||||
}
|
||||
"identity" => {
|
||||
let column = value.column.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec identity requires `column`")
|
||||
})?;
|
||||
Self::identity(column)
|
||||
}
|
||||
"unsharded" => Self::unsharded(),
|
||||
other => {
|
||||
return Err(napi::Error::from_reason(format!(
|
||||
"LsmWriteSpec `specType` must be 'bucket', 'identity', or 'unsharded', got '{}'",
|
||||
other
|
||||
)));
|
||||
}
|
||||
};
|
||||
Ok(spec
|
||||
.with_maintained_indexes(maintained)
|
||||
.with_writer_config_defaults(writer_config_defaults))
|
||||
}
|
||||
}
|
||||
|
||||
/// Statistics about a compaction operation.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
@@ -218,6 +218,8 @@ class Table:
|
||||
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
|
||||
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
|
||||
async def unset_lsm_write_spec(self) -> None: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
def query(self) -> Query: ...
|
||||
@@ -419,6 +421,37 @@ class MergeResult:
|
||||
num_deleted_rows: int
|
||||
num_attempts: int
|
||||
|
||||
class LsmWriteSpec:
|
||||
"""Specification selecting Lance's MemWAL LSM-style write path for
|
||||
`merge_insert`."""
|
||||
|
||||
@staticmethod
|
||||
def bucket(column: str, num_buckets: int) -> "LsmWriteSpec": ...
|
||||
@staticmethod
|
||||
def identity(column: str) -> "LsmWriteSpec": ...
|
||||
@staticmethod
|
||||
def unsharded() -> "LsmWriteSpec": ...
|
||||
def with_maintained_indexes(self, indexes: List[str]) -> "LsmWriteSpec":
|
||||
"""Return a copy of this spec asking the MemWAL to keep the named
|
||||
indexes up to date as rows are appended."""
|
||||
...
|
||||
def with_writer_config_defaults(self, defaults: Dict[str, str]) -> "LsmWriteSpec":
|
||||
"""Return a copy of this spec recording the given default
|
||||
`ShardWriter` configuration in the MemWAL index."""
|
||||
...
|
||||
@property
|
||||
def spec_type(self) -> str:
|
||||
"""One of 'bucket', 'identity', or 'unsharded'."""
|
||||
...
|
||||
@property
|
||||
def column(self) -> Optional[str]: ...
|
||||
@property
|
||||
def num_buckets(self) -> Optional[int]: ...
|
||||
@property
|
||||
def maintained_indexes(self) -> List[str]: ...
|
||||
@property
|
||||
def writer_config_defaults(self) -> Dict[str, str]: ...
|
||||
|
||||
class AddColumnsResult:
|
||||
version: int
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ from lancedb._lancedb import (
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
LsmWriteSpec,
|
||||
MergeResult,
|
||||
UpdateResult,
|
||||
)
|
||||
@@ -659,6 +660,14 @@ class RemoteTable(Table):
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.set_unenforced_primary_key(columns))
|
||||
|
||||
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.set_lsm_write_spec(spec))
|
||||
|
||||
def unset_lsm_write_spec(self) -> None:
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.unset_lsm_write_spec())
|
||||
|
||||
def drop_index(self, index_name: str):
|
||||
return LOOP.run(self._table.drop_index(index_name))
|
||||
|
||||
|
||||
@@ -154,6 +154,7 @@ if TYPE_CHECKING:
|
||||
AlterColumnsResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
LsmWriteSpec,
|
||||
MergeResult,
|
||||
UpdateResult,
|
||||
)
|
||||
@@ -3268,6 +3269,16 @@ class LanceTable(Table):
|
||||
[`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]."""
|
||||
return LOOP.run(self._table.set_unenforced_primary_key(columns))
|
||||
|
||||
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
|
||||
"""Install an LsmWriteSpec. See
|
||||
[`AsyncTable.set_lsm_write_spec`][lancedb.AsyncTable.set_lsm_write_spec]."""
|
||||
return LOOP.run(self._table.set_lsm_write_spec(spec))
|
||||
|
||||
def unset_lsm_write_spec(self) -> None:
|
||||
"""Remove the LsmWriteSpec. See
|
||||
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
|
||||
return LOOP.run(self._table.unset_lsm_write_spec())
|
||||
|
||||
def uses_v2_manifest_paths(self) -> bool:
|
||||
"""
|
||||
Check if the table is using the new v2 manifest paths.
|
||||
@@ -3838,6 +3849,44 @@ class AsyncTable:
|
||||
columns = list(columns)
|
||||
await self._inner.set_unenforced_primary_key(columns)
|
||||
|
||||
async def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
|
||||
"""Install an LsmWriteSpec on this table.
|
||||
|
||||
The spec selects Lance's MemWAL LSM-style write path for future
|
||||
`merge_insert` calls. ``LsmWriteSpec`` chooses one of three sharding
|
||||
strategies:
|
||||
|
||||
- ``LsmWriteSpec.bucket(column, num_buckets)`` — hash-bucket writes by
|
||||
the single-column unenforced primary key.
|
||||
- ``LsmWriteSpec.identity(column)`` — shard by the raw value of a
|
||||
scalar column.
|
||||
- ``LsmWriteSpec.unsharded()`` — route every write to a single shard.
|
||||
|
||||
All variants require the table to have an unenforced primary key set
|
||||
via [`set_unenforced_primary_key`]; bucket sharding additionally
|
||||
requires it to be the single column being bucketed.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
spec : LsmWriteSpec
|
||||
The sharding spec to install.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> from lancedb._lancedb import LsmWriteSpec
|
||||
>>> # table.set_unenforced_primary_key("id")
|
||||
>>> # table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 16))
|
||||
"""
|
||||
await self._inner.set_lsm_write_spec(spec)
|
||||
|
||||
async def unset_lsm_write_spec(self) -> None:
|
||||
"""Remove the LsmWriteSpec from this table.
|
||||
|
||||
Reverts to the standard `merge_insert` write path. Errors if no spec
|
||||
is currently set.
|
||||
"""
|
||||
await self._inner.unset_lsm_write_spec()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table."""
|
||||
|
||||
149
python/python/tests/test_lsm_write_spec.py
Normal file
149
python/python/tests/test_lsm_write_spec.py
Normal file
@@ -0,0 +1,149 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
"""Tests for installing and clearing an LsmWriteSpec via
|
||||
`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`.
|
||||
"""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
from lancedb._lancedb import LsmWriteSpec
|
||||
|
||||
SCHEMA = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.utf8(), nullable=False),
|
||||
pa.field("v", pa.int32(), nullable=False),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _batch(ids, vs):
|
||||
return pa.RecordBatch.from_arrays(
|
||||
[pa.array(ids, type=pa.utf8()), pa.array(vs, type=pa.int32())],
|
||||
schema=SCHEMA,
|
||||
)
|
||||
|
||||
|
||||
def _reader(ids, vs):
|
||||
return pa.RecordBatchReader.from_batches(SCHEMA, [_batch(ids, vs)])
|
||||
|
||||
|
||||
def _make_table(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
|
||||
table = db.create_table("t", _reader(["seed"], [0]))
|
||||
return db, table
|
||||
|
||||
|
||||
def test_set_lsm_write_spec_validates(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
|
||||
# No PK set yet.
|
||||
with pytest.raises(Exception, match="primary key"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
|
||||
table.set_unenforced_primary_key("id")
|
||||
|
||||
# Column mismatch.
|
||||
with pytest.raises(Exception, match="match"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
|
||||
|
||||
# Out-of-range num_buckets.
|
||||
with pytest.raises(Exception, match="num_buckets"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
|
||||
with pytest.raises(Exception, match="num_buckets"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1025))
|
||||
|
||||
# Happy path then mutation rejected.
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
with pytest.raises(Exception, match="mutation"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
||||
|
||||
|
||||
def test_unset_lsm_write_spec(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
|
||||
# unset errors when no spec is set.
|
||||
with pytest.raises(Exception, match="no LSM write spec"):
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
# Install a spec, then remove it; afterwards a fresh spec can be set.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
table.unset_lsm_write_spec()
|
||||
# A second unset errors — there is no spec left to remove.
|
||||
with pytest.raises(Exception, match="no LSM write spec"):
|
||||
table.unset_lsm_write_spec()
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
||||
|
||||
|
||||
def test_set_unsharded_spec(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
# Lance MemWAL still requires a primary key on the dataset; Unsharded
|
||||
# just skips per-row hashing.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_lsm_write_spec_repr():
|
||||
s = LsmWriteSpec.bucket("id", 4)
|
||||
assert s.spec_type == "bucket"
|
||||
assert s.column == "id"
|
||||
assert s.num_buckets == 4
|
||||
assert s.maintained_indexes == []
|
||||
assert "bucket" in repr(s)
|
||||
assert "id" in repr(s)
|
||||
assert "4" in repr(s)
|
||||
|
||||
u = LsmWriteSpec.unsharded()
|
||||
assert u.spec_type == "unsharded"
|
||||
assert u.column is None
|
||||
assert u.num_buckets is None
|
||||
assert "unsharded" in repr(u)
|
||||
|
||||
|
||||
def test_lsm_write_spec_with_maintained_indexes():
|
||||
s = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(["idx_a", "idx_b"])
|
||||
assert s.maintained_indexes == ["idx_a", "idx_b"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_set_unset_lsm_write_spec(tmp_path):
|
||||
db = await lancedb.connect_async(
|
||||
tmp_path, read_consistency_interval=timedelta(seconds=0)
|
||||
)
|
||||
table = await db.create_table(
|
||||
"t",
|
||||
pa.RecordBatchReader.from_batches(SCHEMA, [_batch(["seed"], [0])]),
|
||||
)
|
||||
|
||||
await table.set_unenforced_primary_key("id")
|
||||
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
await table.unset_lsm_write_spec()
|
||||
# A second unset errors.
|
||||
with pytest.raises(Exception, match="no LSM write spec"):
|
||||
await table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_set_identity_spec(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
# Identity sharding still requires an unenforced primary key on the
|
||||
# table; it shards by the raw value of the given column.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.identity("v"))
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_lsm_write_spec_identity_and_writer_config_defaults():
|
||||
s = LsmWriteSpec.identity("v")
|
||||
assert s.spec_type == "identity"
|
||||
assert s.column == "v"
|
||||
assert s.num_buckets is None
|
||||
assert "identity" in repr(s)
|
||||
|
||||
s = s.with_writer_config_defaults({"durable_write": "false"})
|
||||
assert s.writer_config_defaults == {"durable_write": "false"}
|
||||
assert "durable_write" in repr(s)
|
||||
@@ -15,8 +15,8 @@ use pyo3::{
|
||||
use query::{FTSQuery, HybridQuery, Query, VectorQuery};
|
||||
use session::Session;
|
||||
use table::{
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, MergeResult,
|
||||
Table, UpdateResult,
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
|
||||
MergeResult, Table, UpdateResult,
|
||||
};
|
||||
|
||||
pub mod arrow;
|
||||
@@ -52,6 +52,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<AlterColumnsResult>()?;
|
||||
m.add_class::<AddResult>()?;
|
||||
m.add_class::<MergeResult>()?;
|
||||
m.add_class::<LsmWriteSpec>()?;
|
||||
m.add_class::<DeleteResult>()?;
|
||||
m.add_class::<DropColumnsResult>()?;
|
||||
m.add_class::<UpdateResult>()?;
|
||||
|
||||
@@ -171,6 +171,141 @@ impl From<lancedb::table::MergeResult> for MergeResult {
|
||||
}
|
||||
}
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `merge_insert`.
|
||||
///
|
||||
/// Constructed via the `bucket(...)`, `identity(...)`, or `unsharded()`
|
||||
/// classmethods, then optionally chain `with_maintained_indexes(...)` and
|
||||
/// `with_writer_config_defaults(...)`.
|
||||
#[pyclass(from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LsmWriteSpec {
|
||||
inner: lancedb::table::LsmWriteSpec,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl LsmWriteSpec {
|
||||
/// Hash-bucket sharding by the unenforced primary key column.
|
||||
#[staticmethod]
|
||||
pub fn bucket(column: String, num_buckets: u32) -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::bucket(column, num_buckets),
|
||||
}
|
||||
}
|
||||
|
||||
/// Identity sharding — shard by the raw value of `column`.
|
||||
#[staticmethod]
|
||||
pub fn identity(column: String) -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::identity(column),
|
||||
}
|
||||
}
|
||||
|
||||
/// No sharding — every `merge_insert` call writes to a single
|
||||
/// MemWAL shard.
|
||||
#[staticmethod]
|
||||
pub fn unsharded() -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::unsharded(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the list of indexes the MemWAL should keep up to date as
|
||||
/// rows are appended. Each name must reference an index that
|
||||
/// already exists on the table at the time `set_lsm_write_spec`
|
||||
/// is called.
|
||||
pub fn with_maintained_indexes(&self, indexes: Vec<String>) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone().with_maintained_indexes(indexes),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the default `ShardWriter` configuration recorded in the
|
||||
/// MemWAL index, so every writer starts from the same defaults.
|
||||
pub fn with_writer_config_defaults(&self, defaults: HashMap<String, String>) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone().with_writer_config_defaults(defaults),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn __repr__(&self) -> String {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket {
|
||||
column,
|
||||
num_buckets,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.bucket(column={:?}, num_buckets={}, maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
column, num_buckets, maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
lancedb::table::LsmWriteSpec::Identity {
|
||||
column,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.identity(column={:?}, maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
column, maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
lancedb::table::LsmWriteSpec::Unsharded {
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.unsharded(maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Discriminator string identifying the variant ("bucket", "identity",
|
||||
/// or "unsharded").
|
||||
#[getter]
|
||||
pub fn spec_type(&self) -> &'static str {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { .. } => "bucket",
|
||||
lancedb::table::LsmWriteSpec::Identity { .. } => "identity",
|
||||
lancedb::table::LsmWriteSpec::Unsharded { .. } => "unsharded",
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket and identity variants: the sharding column. `None` for unsharded.
|
||||
#[getter]
|
||||
pub fn column(&self) -> Option<String> {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { column, .. }
|
||||
| lancedb::table::LsmWriteSpec::Identity { column, .. } => Some(column.clone()),
|
||||
lancedb::table::LsmWriteSpec::Unsharded { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket variant only: the number of buckets.
|
||||
#[getter]
|
||||
pub fn num_buckets(&self) -> Option<u32> {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { num_buckets, .. } => Some(*num_buckets),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Names of indexes the MemWAL should keep up to date during writes.
|
||||
#[getter]
|
||||
pub fn maintained_indexes(&self) -> Vec<String> {
|
||||
self.inner.maintained_indexes().to_vec()
|
||||
}
|
||||
|
||||
/// Default `ShardWriter` configuration recorded by this spec.
|
||||
#[getter]
|
||||
pub fn writer_config_defaults(&self) -> HashMap<String, String> {
|
||||
self.inner.writer_config_defaults().clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
|
||||
fn from(spec: LsmWriteSpec) -> Self {
|
||||
spec.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddColumnsResult {
|
||||
@@ -818,6 +953,24 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_lsm_write_spec<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
spec: LsmWriteSpec,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
let native_spec = lancedb::table::LsmWriteSpec::from(spec);
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.set_lsm_write_spec(native_spec).await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn unset_lsm_write_spec(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.unset_lsm_write_spec().await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
|
||||
@@ -1673,6 +1673,18 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
Ok(Box::new(RemoteTags { inner: self }))
|
||||
}
|
||||
|
||||
@@ -273,6 +273,176 @@ pub trait Tags: Send + Sync {
|
||||
|
||||
pub use self::merge::MergeResult;
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `merge_insert`.
|
||||
///
|
||||
/// Construct via [`LsmWriteSpec::bucket`], [`LsmWriteSpec::identity`], or
|
||||
/// [`LsmWriteSpec::unsharded`], then optionally chain
|
||||
/// [`LsmWriteSpec::with_maintained_indexes`] (indexes the MemWAL keeps up to
|
||||
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
|
||||
/// `ShardWriter` configuration recorded in the MemWAL index).
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key.
|
||||
///
|
||||
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
|
||||
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
|
||||
/// onto the MemWAL writer is a follow-up.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum LsmWriteSpec {
|
||||
/// Hash-bucket sharding by the unenforced primary key column.
|
||||
///
|
||||
/// `column` must equal the table's currently-set single-column
|
||||
/// unenforced primary key. `num_buckets` must be in `[1, 1024]`.
|
||||
/// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
|
||||
/// `bucket(column, num_buckets)` value is stable across processes.
|
||||
Bucket {
|
||||
column: String,
|
||||
num_buckets: u32,
|
||||
/// Names of indexes (already created on the table) that the
|
||||
/// MemWAL should maintain in-memory as rows are appended.
|
||||
maintained_indexes: Vec<String>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
writer_config_defaults: HashMap<String, String>,
|
||||
},
|
||||
/// Identity sharding — shard by the raw value of `column`.
|
||||
///
|
||||
/// Use this when the data is already partitioned by `column`; each
|
||||
/// distinct value of `column` becomes its own shard.
|
||||
Identity {
|
||||
column: String,
|
||||
/// Names of indexes (already created on the table) that the
|
||||
/// MemWAL should maintain in-memory as rows are appended.
|
||||
maintained_indexes: Vec<String>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
writer_config_defaults: HashMap<String, String>,
|
||||
},
|
||||
/// No sharding — every `merge_insert` call writes to a single MemWAL shard.
|
||||
Unsharded {
|
||||
/// Names of indexes (already created on the table) that the
|
||||
/// MemWAL should maintain in-memory as rows are appended.
|
||||
maintained_indexes: Vec<String>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
writer_config_defaults: HashMap<String, String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl LsmWriteSpec {
|
||||
/// Construct a hash-bucket sharding spec with no maintained indexes.
|
||||
pub fn bucket(column: impl Into<String>, num_buckets: u32) -> Self {
|
||||
Self::Bucket {
|
||||
column: column.into(),
|
||||
num_buckets,
|
||||
maintained_indexes: Vec::new(),
|
||||
writer_config_defaults: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct an identity-sharding spec (shard by the raw value of
|
||||
/// `column`) with no maintained indexes.
|
||||
pub fn identity(column: impl Into<String>) -> Self {
|
||||
Self::Identity {
|
||||
column: column.into(),
|
||||
maintained_indexes: Vec::new(),
|
||||
writer_config_defaults: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct an unsharded spec with no maintained indexes.
|
||||
pub fn unsharded() -> Self {
|
||||
Self::Unsharded {
|
||||
maintained_indexes: Vec::new(),
|
||||
writer_config_defaults: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the list of indexes the MemWAL should keep up to date as
|
||||
/// rows are appended. Each name must reference an index that already
|
||||
/// exists on the table at the time `set_lsm_write_spec` is called.
|
||||
pub fn with_maintained_indexes<I, S>(mut self, indexes: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: Into<String>,
|
||||
{
|
||||
let v: Vec<String> = indexes.into_iter().map(Into::into).collect();
|
||||
match &mut self {
|
||||
Self::Bucket {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Identity {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
maintained_indexes, ..
|
||||
} => *maintained_indexes = v,
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Replace the default `ShardWriter` configuration recorded in the MemWAL
|
||||
/// index, so every writer starts from the same defaults. Keys are
|
||||
/// `ShardWriter` config field names (`Duration` knobs use a `_ms` suffix);
|
||||
/// values are their string encodings.
|
||||
pub fn with_writer_config_defaults<I, K, V>(mut self, defaults: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = (K, V)>,
|
||||
K: Into<String>,
|
||||
V: Into<String>,
|
||||
{
|
||||
let m: HashMap<String, String> = defaults
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k.into(), v.into()))
|
||||
.collect();
|
||||
match &mut self {
|
||||
Self::Bucket {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Identity {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
writer_config_defaults,
|
||||
..
|
||||
} => *writer_config_defaults = m,
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Borrow the list of index names this spec asks MemWAL to maintain.
|
||||
pub fn maintained_indexes(&self) -> &[String] {
|
||||
match self {
|
||||
Self::Bucket {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Identity {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
maintained_indexes, ..
|
||||
} => maintained_indexes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrow the default `ShardWriter` configuration recorded by this spec.
|
||||
pub fn writer_config_defaults(&self) -> &HashMap<String, String> {
|
||||
match self {
|
||||
Self::Bucket {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Identity {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
writer_config_defaults,
|
||||
..
|
||||
} => writer_config_defaults,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for anything "table-like". This is used for both native tables (which target
|
||||
/// Lance datasets) and remote tables (which target LanceDB cloud)
|
||||
///
|
||||
@@ -360,6 +530,29 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
message: "set_unenforced_primary_key is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Install an [`LsmWriteSpec`] on this table.
|
||||
///
|
||||
/// The spec selects Lance's MemWAL LSM-style write path for future
|
||||
/// `merge_insert` calls.
|
||||
///
|
||||
/// The default implementation returns `NotSupported`. Implementations
|
||||
/// that support the MemWAL LSM write path must override this.
|
||||
async fn set_lsm_write_spec(&self, _spec: LsmWriteSpec) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_lsm_write_spec is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Remove the [`LsmWriteSpec`] from this table.
|
||||
///
|
||||
/// This is a no-op if no spec is currently set.
|
||||
///
|
||||
/// The default implementation returns `NotSupported`. Implementations
|
||||
/// that support the MemWAL LSM write path must override this.
|
||||
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "unset_lsm_write_spec is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Gets the table tag manager.
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
||||
/// Optimize the dataset.
|
||||
@@ -1100,6 +1293,46 @@ impl Table {
|
||||
self.inner.set_unenforced_primary_key(&borrowed).await
|
||||
}
|
||||
|
||||
/// Install an [`LsmWriteSpec`] on this table, selecting Lance's MemWAL
|
||||
/// LSM-style write path for future `merge_insert` calls.
|
||||
///
|
||||
/// [`LsmWriteSpec`] chooses one of three sharding strategies:
|
||||
///
|
||||
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column
|
||||
/// unenforced primary key.
|
||||
/// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
|
||||
/// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key
|
||||
/// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally
|
||||
/// requires it to be the single column being bucketed.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::table::{LsmWriteSpec, Table};
|
||||
/// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// table.set_unenforced_primary_key(["id"]).await?;
|
||||
/// table
|
||||
/// .set_lsm_write_spec(
|
||||
/// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
|
||||
/// )
|
||||
/// .await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
|
||||
self.inner.set_lsm_write_spec(spec).await
|
||||
}
|
||||
|
||||
/// Remove the [`LsmWriteSpec`] from this table, reverting to the standard
|
||||
/// `merge_insert` write path.
|
||||
///
|
||||
/// Errors if no spec is currently set.
|
||||
pub async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
self.inner.unset_lsm_write_spec().await
|
||||
}
|
||||
|
||||
/// Retrieve the version of the table
|
||||
///
|
||||
/// LanceDb supports versioning. Every operation that modifies the table increases
|
||||
@@ -2510,6 +2743,14 @@ impl BaseTable for NativeTable {
|
||||
primary_key::set_unenforced_primary_key(self, columns).await
|
||||
}
|
||||
|
||||
async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
|
||||
merge::lsm::set_lsm_write_spec(self, spec).await
|
||||
}
|
||||
|
||||
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
merge::lsm::unset_lsm_write_spec(self).await
|
||||
}
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
// Delegate to the submodule implementation
|
||||
@@ -4052,6 +4293,249 @@ mod tests {
|
||||
assert_eq!(pk[0].name, "id");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec() {
|
||||
use arrow_array::StringArray;
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("name", DataType::Utf8, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Reject when no PK is set.
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.expect_err("should reject without PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Set PK, then a mismatched column on the spec must be rejected.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("name", 4))
|
||||
.await
|
||||
.expect_err("should reject column != PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Reject num_buckets out of range.
|
||||
for bad in [0u32, 1025] {
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", bad))
|
||||
.await
|
||||
.expect_err("should reject");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
}
|
||||
|
||||
// Happy path: install spec; verify MemWAL details record it.
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let native_tbl = table.as_native().unwrap();
|
||||
let dataset = native_tbl.dataset.get().await.unwrap();
|
||||
let details = dataset
|
||||
.mem_wal_index_details()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("MemWAL index should be initialized");
|
||||
assert_eq!(details.num_shards, 4);
|
||||
assert_eq!(details.sharding_specs.len(), 1);
|
||||
let installed = &details.sharding_specs[0];
|
||||
assert_eq!(installed.fields.len(), 1);
|
||||
let f = &installed.fields[0];
|
||||
assert_eq!(f.transform.as_deref(), Some("bucket"));
|
||||
assert_eq!(
|
||||
f.parameters.get("num_buckets").map(String::as_str),
|
||||
Some("4")
|
||||
);
|
||||
// Bucket parameters must hold only `num_buckets`.
|
||||
assert_eq!(f.parameters.len(), 1);
|
||||
|
||||
// Mutation rejected.
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
|
||||
.await
|
||||
.expect_err("mutation should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec_unsharded() {
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Lance's MemWAL still requires *some* unenforced primary key on
|
||||
// the dataset; Unsharded just skips the per-row hashing step.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::unsharded())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
let details = dataset
|
||||
.mem_wal_index_details()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("MemWAL index should be initialized");
|
||||
assert_eq!(details.num_shards, 1);
|
||||
assert_eq!(details.sharding_specs.len(), 1);
|
||||
let f = &details.sharding_specs[0].fields[0];
|
||||
assert_eq!(f.transform.as_deref(), Some("unsharded"));
|
||||
assert!(f.source_ids.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec_identity() {
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("region", DataType::Utf8, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(
|
||||
LsmWriteSpec::identity("region")
|
||||
.with_writer_config_defaults([("durable_write", "false")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
let details = dataset
|
||||
.mem_wal_index_details()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("MemWAL index should be initialized");
|
||||
// Identity sharding records an open-ended shard count.
|
||||
assert_eq!(details.num_shards, 0);
|
||||
assert_eq!(details.sharding_specs.len(), 1);
|
||||
let f = &details.sharding_specs[0].fields[0];
|
||||
assert_eq!(f.transform.as_deref(), Some("identity"));
|
||||
// Writer config defaults round-trip into the MemWAL index.
|
||||
assert_eq!(
|
||||
details
|
||||
.writer_config_defaults
|
||||
.get("durable_write")
|
||||
.map(String::as_str),
|
||||
Some("false")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unset_lsm_write_spec() {
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// unset errors when no spec is set.
|
||||
table.unset_lsm_write_spec().await.unwrap_err();
|
||||
|
||||
// Install a spec, then unset it.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
|
||||
}
|
||||
|
||||
table.unset_lsm_write_spec().await.unwrap();
|
||||
{
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
assert!(dataset.mem_wal_index_details().await.unwrap().is_none());
|
||||
}
|
||||
|
||||
// A second unset errors; a fresh spec can still be installed afterwards.
|
||||
table.unset_lsm_write_spec().await.unwrap_err();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_stats() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
|
||||
@@ -16,6 +16,8 @@ use crate::error::{Error, Result};
|
||||
|
||||
use super::{BaseTable, NativeTable};
|
||||
|
||||
pub(crate) mod lsm;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct MergeResult {
|
||||
// The commit version associated with the operation.
|
||||
|
||||
101
rust/lancedb/src/table/merge/lsm.rs
Normal file
101
rust/lancedb/src/table/merge/lsm.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! MemWAL LSM write-path spec management.
|
||||
//!
|
||||
//! [`set_lsm_write_spec`] installs a [`super::super::LsmWriteSpec`] on a
|
||||
//! table, which selects Lance's MemWAL LSM-style write path for future
|
||||
//! `merge_insert` calls. [`unset_lsm_write_spec`] removes it. The actual
|
||||
//! `merge_insert` dispatch and writer are a follow-up.
|
||||
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
use lance::index::DatasetIndexExt;
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::{LsmWriteSpec, NativeTable};
|
||||
|
||||
// =============================================================================
|
||||
// set_lsm_write_spec
|
||||
// =============================================================================
|
||||
|
||||
/// Install an [`LsmWriteSpec`] on the table.
|
||||
///
|
||||
/// The bucket / unsharded sharding spec is constructed and validated by Lance's
|
||||
/// [`InitializeMemWalBuilder`](lance::dataset::mem_wal::InitializeMemWalBuilder).
|
||||
#[allow(clippy::redundant_pub_crate)]
|
||||
pub(crate) async fn set_lsm_write_spec(table: &NativeTable, spec: LsmWriteSpec) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
{
|
||||
let dataset = table.dataset.get().await?;
|
||||
if dataset.mem_wal_index_details().await?.is_some() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_lsm_write_spec: an LSM write spec is already set on this table; mutation is not supported".into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let mut builder = dataset.initialize_mem_wal();
|
||||
let (maintained_indexes, writer_config_defaults) = match spec {
|
||||
LsmWriteSpec::Bucket {
|
||||
column,
|
||||
num_buckets,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.bucket_sharding(column, num_buckets);
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
LsmWriteSpec::Identity {
|
||||
column,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.identity_sharding(column);
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
LsmWriteSpec::Unsharded {
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.unsharded();
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
};
|
||||
builder = builder.maintained_indexes(maintained_indexes);
|
||||
for (key, value) in writer_config_defaults {
|
||||
builder = builder.add_writer_config_default(key, value);
|
||||
}
|
||||
builder.execute().await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// unset_lsm_write_spec
|
||||
// =============================================================================
|
||||
|
||||
/// Remove the [`LsmWriteSpec`] from the table by dropping the MemWAL index.
|
||||
///
|
||||
/// Errors if no spec is currently set.
|
||||
#[allow(clippy::redundant_pub_crate)]
|
||||
pub(crate) async fn unset_lsm_write_spec(table: &NativeTable) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
{
|
||||
let dataset = table.dataset.get().await?;
|
||||
if dataset.mem_wal_index_details().await?.is_none() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "unset_lsm_write_spec: no LSM write spec is set on this table".into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
dataset
|
||||
.drop_index(lance_index::mem_wal::MEM_WAL_INDEX_NAME)
|
||||
.await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user