Merge remote-tracking branch 'origin/main' into xuanwo/bump-lance-7-2-beta-3

# Conflicts:
#	Cargo.lock
#	Cargo.toml
This commit is contained in:
Xuanwo
2026-06-01 17:13:38 +08:00
28 changed files with 2110 additions and 97 deletions

149
Cargo.lock generated
View File

@@ -568,7 +568,7 @@ dependencies = [
"bytes",
"fastrand",
"hex",
"http 1.4.0",
"http 1.4.1",
"sha1 0.10.6",
"time",
"tokio",
@@ -631,7 +631,7 @@ dependencies = [
"bytes-utils",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"http-body 0.4.6",
"http-body 1.0.1",
"percent-encoding",
@@ -661,7 +661,7 @@ dependencies = [
"bytes",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"http-body-util",
"regex-lite",
"tracing",
@@ -686,7 +686,7 @@ dependencies = [
"bytes",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"regex-lite",
"tracing",
]
@@ -710,7 +710,7 @@ dependencies = [
"bytes",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"regex-lite",
"tracing",
]
@@ -740,7 +740,7 @@ dependencies = [
"hex",
"hmac 0.13.0",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"lru",
"percent-encoding",
@@ -769,7 +769,7 @@ dependencies = [
"bytes",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"regex-lite",
"tracing",
]
@@ -793,7 +793,7 @@ dependencies = [
"bytes",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"regex-lite",
"tracing",
]
@@ -818,7 +818,7 @@ dependencies = [
"aws-types",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"regex-lite",
"tracing",
]
@@ -840,7 +840,7 @@ dependencies = [
"hex",
"hmac 0.13.0",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"p256",
"percent-encoding",
"ring",
@@ -873,7 +873,7 @@ dependencies = [
"bytes",
"crc-fast",
"hex",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"md-5 0.11.0",
@@ -907,7 +907,7 @@ dependencies = [
"bytes-utils",
"futures-core",
"futures-util",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"percent-encoding",
@@ -928,7 +928,7 @@ dependencies = [
"h2 0.3.27",
"h2 0.4.14",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"http-body 0.4.6",
"hyper 0.14.32",
"hyper 1.9.0",
@@ -976,20 +976,21 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.11.1"
version = "1.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0504b1ab12debb5959e5165ee5fe97dd387e7aa7ea6a477bfd7635dfe769a4f5"
checksum = "b8e6f5caf6fea86f8c2206541ab5857cfcda9013426cdbe8fa0098b9e2d32182"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-http-client",
"aws-smithy-observability",
"aws-smithy-runtime-api",
"aws-smithy-schema",
"aws-smithy-types",
"bytes",
"fastrand",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"http-body 0.4.6",
"http-body 1.0.1",
"http-body-util",
@@ -1001,16 +1002,16 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime-api"
version = "1.12.0"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71a13df6ada0aafbf21a73bdfcdf9324cfa9df77d96b8446045be3cde61b42e"
checksum = "dc117c179ecf39a62a0a3f49f600e9ac26a7ad7dd172177999f83933af776c32"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api-macros",
"aws-smithy-types",
"bytes",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"pin-project-lite",
"tokio",
"tracing",
@@ -1029,17 +1030,28 @@ dependencies = [
]
[[package]]
name = "aws-smithy-types"
version = "1.4.7"
name = "aws-smithy-schema"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c"
checksum = "7442cb268338f0eb8278140a107c046756aa01093d8ef5e99628d34ae09c94f5"
dependencies = [
"aws-smithy-runtime-api",
"aws-smithy-types",
"http 1.4.1",
]
[[package]]
name = "aws-smithy-types"
version = "1.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "056b66dbce2f81cc0c1e2b05bb402eb58f8a3530479d650efadd5bbae9a4050b"
dependencies = [
"base64-simd",
"bytes",
"bytes-utils",
"futures-core",
"http 0.2.12",
"http 1.4.0",
"http 1.4.1",
"http-body 0.4.6",
"http-body 1.0.1",
"http-body-util",
@@ -1087,7 +1099,7 @@ dependencies = [
"axum-core",
"bytes",
"futures-util",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.9.0",
@@ -1120,7 +1132,7 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"mime",
@@ -3685,7 +3697,7 @@ dependencies = [
"fnv",
"futures-core",
"futures-sink",
"http 1.4.0",
"http 1.4.1",
"indexmap 2.14.0",
"slab",
"tokio",
@@ -3791,7 +3803,7 @@ checksum = "629d8f3bbeda9d148036d6b0de0a3ab947abd08ce90626327fc3547a49d59d97"
dependencies = [
"dirs",
"futures",
"http 1.4.0",
"http 1.4.1",
"indicatif",
"libc",
"log",
@@ -3814,7 +3826,7 @@ checksum = "430b33fa84f92796d4d263070b6c0d3ca219df7b9a0e1853ee431029b1612bcd"
dependencies = [
"async-trait",
"bytes",
"http 1.4.0",
"http 1.4.1",
"more-asserts",
"serde",
"thiserror 2.0.18",
@@ -3868,9 +3880,9 @@ dependencies = [
[[package]]
name = "http"
version = "1.4.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a"
checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0"
dependencies = [
"bytes",
"itoa",
@@ -3894,7 +3906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http 1.4.0",
"http 1.4.1",
]
[[package]]
@@ -3905,7 +3917,7 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"pin-project-lite",
]
@@ -3972,7 +3984,7 @@ dependencies = [
"futures-channel",
"futures-core",
"h2 0.4.14",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"httparse",
"httpdate",
@@ -4004,7 +4016,7 @@ version = "0.27.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f"
dependencies = [
"http 1.4.0",
"http 1.4.1",
"hyper 1.9.0",
"hyper-util",
"rustls 0.23.40",
@@ -4025,7 +4037,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"hyper 1.9.0",
"ipnet",
@@ -4932,7 +4944,7 @@ dependencies = [
"chrono",
"deepsize",
"futures",
"http 1.4.0",
"http 1.4.1",
"io-uring",
"lance-arrow",
"lance-core",
@@ -5153,7 +5165,7 @@ dependencies = [
"futures",
"half",
"hf-hub",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"lance",
"lance-arrow",
@@ -5445,9 +5457,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.29"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
checksum = "616ec5685824bcc94416c6d4a7a446eea774a31efd7062c8480ba6fd06d7a6e5"
[[package]]
name = "loom"
@@ -6017,7 +6029,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"http 1.4.0",
"http 1.4.1",
"http-body-util",
"httparse",
"humantime",
@@ -6130,7 +6142,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"futures",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"jiff",
"log",
@@ -6155,7 +6167,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d6f81ba6960e3fae1882f253b114b21d7e444e1534f209c7737a79f6243eb6f"
dependencies = [
"futures",
"http 1.4.0",
"http 1.4.1",
"mea",
"opendal-core",
]
@@ -6199,7 +6211,7 @@ checksum = "0030644366ef5d8cbe3a4a5822bf99a4aafddc1666e9d24b44d158d9062fc76a"
dependencies = [
"base64 0.22.1",
"bytes",
"http 1.4.0",
"http 1.4.1",
"log",
"opendal-core",
"opendal-service-azure-common",
@@ -6220,7 +6232,7 @@ checksum = "6dea4908d490143a9b0b7f7a790e139ff829b06a023f670455ed3d44f664b361"
dependencies = [
"base64 0.22.1",
"bytes",
"http 1.4.0",
"http 1.4.1",
"log",
"opendal-core",
"opendal-service-azure-common",
@@ -6238,7 +6250,7 @@ version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b489f13c42e69d69bdd72952b634356ec43a7881a20259b38b540fcecdf4051"
dependencies = [
"http 1.4.0",
"http 1.4.1",
"opendal-core",
]
@@ -6250,7 +6262,7 @@ checksum = "48de101aac565ed06af4b47903c24eafd249075553ec1fb18256751c45148d47"
dependencies = [
"async-trait",
"bytes",
"http 1.4.0",
"http 1.4.1",
"log",
"opendal-core",
"percent-encoding",
@@ -6271,7 +6283,7 @@ checksum = "c4922661976a1d40794a2adfbdb888cc3c23097690f825a92f773af38908a848"
dependencies = [
"bytes",
"hf-xet",
"http 1.4.0",
"http 1.4.1",
"log",
"opendal-core",
"percent-encoding",
@@ -6287,7 +6299,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "328fa55e8888cbdfe00826bfea2a79042422b720e8369e9e021e46121dea5ace"
dependencies = [
"bytes",
"http 1.4.0",
"http 1.4.1",
"log",
"opendal-core",
"quick-xml 0.39.4",
@@ -6306,7 +6318,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"crc32c",
"http 1.4.0",
"http 1.4.1",
"log",
"md-5 0.11.0",
"opendal-core",
@@ -7714,7 +7726,7 @@ checksum = "57ac2757f3140aa2e213b554148ae0b52733e624fc6723f0cc6bb3d440176c95"
dependencies = [
"anyhow",
"form_urlencoded",
"http 1.4.0",
"http 1.4.1",
"log",
"percent-encoding",
"reqsign-core",
@@ -7732,7 +7744,7 @@ dependencies = [
"anyhow",
"bytes",
"form_urlencoded",
"http 1.4.0",
"http 1.4.1",
"log",
"percent-encoding",
"quick-xml 0.39.4",
@@ -7754,7 +7766,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"form_urlencoded",
"http 1.4.0",
"http 1.4.1",
"jsonwebtoken",
"log",
"pem",
@@ -7779,7 +7791,7 @@ dependencies = [
"futures",
"hex",
"hmac 0.12.1",
"http 1.4.0",
"http 1.4.1",
"jiff",
"log",
"percent-encoding",
@@ -7806,7 +7818,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35cc609b49c69e76ecaceb775a03f792d1ed3e7755ab3548d4534fd801e3242e"
dependencies = [
"form_urlencoded",
"http 1.4.0",
"http 1.4.1",
"jsonwebtoken",
"log",
"percent-encoding",
@@ -7831,7 +7843,7 @@ dependencies = [
"futures-core",
"futures-util",
"h2 0.4.14",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.9.0",
@@ -7875,7 +7887,7 @@ dependencies = [
"bytes",
"futures-core",
"futures-util",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.9.0",
@@ -7929,7 +7941,7 @@ checksum = "199dda04a536b532d0cc04d7979e39b1c763ea749bf91507017069c00b96056f"
dependencies = [
"anyhow",
"async-trait",
"http 1.4.0",
"http 1.4.1",
"reqwest 0.13.3",
"thiserror 2.0.18",
"tower-service",
@@ -8417,9 +8429,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.149"
version = "1.0.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86"
checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9"
dependencies = [
"itoa",
"memchr",
@@ -8564,6 +8576,12 @@ dependencies = [
"digest 0.11.3",
]
[[package]]
name = "sha1_smol"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "sha2"
version = "0.10.9"
@@ -9472,7 +9490,7 @@ checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
dependencies = [
"bitflags 2.11.1",
"bytes",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"pin-project-lite",
@@ -9492,7 +9510,7 @@ dependencies = [
"bytes",
"futures-core",
"futures-util",
"http 1.4.0",
"http 1.4.1",
"http-body 1.0.1",
"http-body-util",
"pin-project-lite",
@@ -9781,13 +9799,14 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.23.1"
version = "1.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76"
checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7"
dependencies = [
"getrandom 0.4.2",
"js-sys",
"serde_core",
"sha1_smol",
"wasm-bindgen",
]
@@ -10512,7 +10531,7 @@ dependencies = [
"clap",
"crc32fast",
"futures",
"http 1.4.0",
"http 1.4.1",
"hyper 1.9.0",
"lazy_static",
"more-asserts",
@@ -10586,7 +10605,7 @@ dependencies = [
"chrono",
"clap",
"gearhash",
"http 1.4.0",
"http 1.4.1",
"itertools 0.14.0",
"lazy_static",
"more-asserts",

View File

@@ -76,6 +76,57 @@ the query optimizer chooses a suboptimal path.
***
### useLsmWrite()
```ts
useLsmWrite(useLsmWrite): MergeInsertBuilder
```
Controls whether the merge uses the MemWAL LSM write path.
By default (unset), a `mergeInsert` on a table with an LSM write spec is
routed through Lance's MemWAL shard writer, and a table without one uses
the standard path. Pass `false` to force the standard path even when a
spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
is installed.
#### Parameters
* **useLsmWrite**: `boolean`
Whether to use the LSM write path.
#### Returns
[`MergeInsertBuilder`](MergeInsertBuilder.md)
***
### validateSingleShard()
```ts
validateSingleShard(validateSingleShard): MergeInsertBuilder
```
Controls how an LSM merge checks that its input targets a single shard.
When a table has an LSM write spec, every row in a `mergeInsert` call must
route to the same shard. When `true` (the default), every row is inspected
to verify this. When `false`, only the first row is inspected and the
shard it routes to is used for the whole input — a faster path for callers
that have already pre-sharded their input. Has no effect on tables without
an LSM write spec.
#### Parameters
* **validateSingleShard**: `boolean`
Whether to check every row routes to one shard. Defaults to `true`.
#### Returns
[`MergeInsertBuilder`](MergeInsertBuilder.md)
***
### whenMatchedUpdateAll()
```ts

View File

@@ -187,6 +187,25 @@ Any attempt to use the table after it is closed will result in an error.
***
### closeLsmWriters()
```ts
abstract closeLsmWriters(): Promise<void>
```
Drain and close any cached MemWAL shard writers held for this table.
When an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) is installed, `mergeInsert` opens MemWAL
shard writers and caches them for reuse across calls. This closes them,
flushing pending data; writers reopen lazily on the next `mergeInsert`.
It is a no-op when no writers are cached.
#### Returns
`Promise`&lt;`void`&gt;
***
### countRows()
```ts

View File

@@ -11,7 +11,10 @@ Specification selecting Lance's MemWAL LSM-style write path for
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
`column` and `numBuckets` are required; for `"identity"`, `column` is
required.
required and must be a deterministic function of the unenforced primary
key (every row with a given primary key must always produce the same
`column` value, or upserts of that key can land in different shards and a
stale version can win).
## Properties

View File

@@ -32,6 +32,14 @@ numInsertedRows: number;
***
### numRows
```ts
numRows: number;
```
***
### numUpdatedRows
```ts

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>7.1.0-beta.4</lance-core.version>
<lance-core.version>7.2.0-beta.1</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -2625,3 +2625,97 @@ describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
).rejects.toThrow();
});
});
// Exercises `mergeInsert` on a table that has an LSM write spec installed.
describe("LSM merge insert", () => {
  let tmpDir: tmp.DirResult;

  beforeEach(() => {
    tmpDir = tmp.dirSync({ unsafeCleanup: true });
  });

  afterEach(() => {
    tmpDir.removeCallback();
  });

  /**
   * Creates table "t" seeded with two rows, marks `id` as the unenforced
   * primary key, and installs a single-bucket LSM write spec.
   */
  async function bucketTable(conn: Connection): Promise<Table> {
    // The primary key column must be non-nullable.
    const schema = new arrow.Schema([
      new arrow.Field("id", new arrow.Utf8(), false),
      new arrow.Field("value", new arrow.Float64(), true),
    ]);
    const table = await conn.createEmptyTable("t", schema);

    const seedRows = [
      { id: "a", value: 1 },
      { id: "b", value: 2 },
    ];
    await table.add(seedRows);
    await table.setUnenforcedPrimaryKey("id");

    // numBuckets = 1: every row routes to the single bucket.
    const spec = { specType: "bucket", column: "id", numBuckets: 1 };
    await table.setLsmWriteSpec(spec);
    return table;
  }

  it("routes merge_insert through the shard writer", async () => {
    const conn = await connect(tmpDir.name);
    const table = await bucketTable(conn);

    const upsert = table
      .mergeInsert("id")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll();
    const outcome = await upsert.execute([
      { id: "c", value: 3 },
      { id: "d", value: 4 },
    ]);

    // LSM path: rows go to the MemWAL, so only numRows is populated.
    expect(outcome.numRows).toBe(2);
    expect(outcome.version).toBe(0);
    expect(outcome.numInsertedRows).toBe(0);

    await table.closeLsmWriters();
  });

  it("falls back to the standard path with useLsmWrite(false)", async () => {
    const conn = await connect(tmpDir.name);
    const table = await bucketTable(conn);

    const insertOnly = table
      .mergeInsert("id")
      .whenNotMatchedInsertAll()
      .useLsmWrite(false);
    const outcome = await insertOnly.execute([
      { id: "b", value: 9 },
      { id: "e", value: 5 },
    ]);

    // Standard path commits: id="e" inserted ("b" already exists).
    expect(outcome.numInsertedRows).toBe(1);
    const total = await table.countRows();
    expect(total).toBe(3);
  });

  it("supports validateSingleShard(false)", async () => {
    const conn = await connect(tmpDir.name);
    const table = await bucketTable(conn);

    const upsert = table
      .mergeInsert("id")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .validateSingleShard(false);
    const outcome = await upsert.execute([{ id: "f", value: 6 }]);

    expect(outcome.numRows).toBe(1);
  });

  it("rejects a non-upsert merge under an LSM spec", async () => {
    const conn = await connect(tmpDir.name);
    const table = await bucketTable(conn);

    // Insert-only builder (no whenMatchedUpdateAll) left on the default LSM path.
    const pending = table
      .mergeInsert("id")
      .whenNotMatchedInsertAll()
      .execute([{ id: "g", value: 7 }]);

    await expect(pending).rejects.toThrow();
  });
});

View File

@@ -87,6 +87,41 @@ export class MergeInsertBuilder {
this.#schema,
);
}
/**
 * Selects whether this merge goes through the MemWAL LSM write path.
 *
 * When this is never called, a `mergeInsert` on a table with an LSM write
 * spec is routed through Lance's MemWAL shard writer, and a table without a
 * spec uses the standard path. Passing `false` forces the standard path even
 * if a spec is set. Passing `true` requires a spec — `mergeInsert` rejects
 * if none is installed.
 *
 * @param useLsmWrite - Whether to use the LSM write path.
 * @returns A new builder wrapping the updated native builder and the same schema.
 */
useLsmWrite(useLsmWrite: boolean): MergeInsertBuilder {
  const nativeBuilder = this.#native.useLsmWrite(useLsmWrite);
  return new MergeInsertBuilder(nativeBuilder, this.#schema);
}
/**
 * Selects how an LSM merge verifies that its input targets a single shard.
 *
 * On a table with an LSM write spec, every row of a `mergeInsert` call must
 * route to the same shard. With `true` (the default) every row is inspected
 * to verify this. With `false` only the first row is inspected and the shard
 * it routes to is used for the whole input — a faster path for callers that
 * have already pre-sharded their input. Has no effect on tables without an
 * LSM write spec.
 *
 * @param validateSingleShard - Whether to check every row routes to one shard. Defaults to `true`.
 * @returns A new builder wrapping the updated native builder and the same schema.
 */
validateSingleShard(validateSingleShard: boolean): MergeInsertBuilder {
  const nativeBuilder = this.#native.validateSingleShard(validateSingleShard);
  return new MergeInsertBuilder(nativeBuilder, this.#schema);
}
/**
* Executes the merge insert operation
*

View File

@@ -161,7 +161,10 @@ export interface Version {
*
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
* `column` and `numBuckets` are required; for `"identity"`, `column` is
* required.
* required and must be a deterministic function of the unenforced primary
* key (every row with a given primary key must always produce the same
* `column` value, or upserts of that key can land in different shards and a
* stale version can win).
*/
export interface LsmWriteSpec {
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
@@ -567,6 +570,16 @@ export abstract class Table {
* @returns {Promise<void>}
*/
abstract unsetLsmWriteSpec(): Promise<void>;
/**
 * Drain and close any cached MemWAL shard writers held for this table.
 *
 * When an {@link LsmWriteSpec} is installed, `mergeInsert` opens MemWAL
 * shard writers and caches them for reuse across calls. This closes them,
 * flushing pending data; writers reopen lazily on the next `mergeInsert`,
 * so the table can keep being used for merges after this call.
 * It is a no-op when no writers are cached.
 * @returns {Promise<void>}
 */
abstract closeLsmWriters(): Promise<void>;
/** Retrieve the version of the table */
abstract version(): Promise<number>;
@@ -1041,6 +1054,10 @@ export class LocalTable extends Table {
return await this.inner.unsetLsmWriteSpec();
}
/** Closes cached MemWAL shard writers by delegating to `this.inner`. */
async closeLsmWriters(): Promise<void> {
  await this.inner.closeLsmWriters();
}
async version(): Promise<number> {
return await this.inner.version();
}

View File

@@ -50,6 +50,20 @@ impl NativeMergeInsertBuilder {
this
}
/// Returns a clone of this builder after forwarding `use_lsm_write` to the
/// wrapped builder's `use_lsm_write` setter. `self` is left untouched.
#[napi]
pub fn use_lsm_write(&self, use_lsm_write: bool) -> Self {
    let mut updated = self.clone();
    updated.inner.use_lsm_write(use_lsm_write);
    updated
}
/// Returns a clone of this builder after forwarding `validate_single_shard`
/// to the wrapped builder's `validate_single_shard` setter. `self` is left
/// untouched.
#[napi]
pub fn validate_single_shard(&self, validate_single_shard: bool) -> Self {
    let mut updated = self.clone();
    updated.inner.validate_single_shard(validate_single_shard);
    updated
}
#[napi(catch_unwind)]
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
let data = ipc_file_to_batches(buf.to_vec())

View File

@@ -391,6 +391,11 @@ impl Table {
.default_error()
}
/// Forwards to the wrapped table's `close_lsm_writers` and maps its result
/// through `default_error`.
///
/// # Errors
///
/// Returns early with the error from `inner_ref` if that lookup fails;
/// otherwise returns whatever error the forwarded call produces.
#[napi(catch_unwind)]
pub async fn close_lsm_writers(&self) -> napi::Result<()> {
    let table = self.inner_ref()?;
    table.close_lsm_writers().await.default_error()
}
#[napi(catch_unwind)]
pub async fn version(&self) -> napi::Result<i64> {
self.inner_ref()?
@@ -940,6 +945,7 @@ pub struct MergeResult {
pub num_updated_rows: i64,
pub num_deleted_rows: i64,
pub num_attempts: i64,
pub num_rows: i64,
}
impl From<lancedb::table::MergeResult> for MergeResult {
@@ -950,6 +956,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_updated_rows: value.num_updated_rows as i64,
num_deleted_rows: value.num_deleted_rows as i64,
num_attempts: value.num_attempts as i64,
num_rows: value.num_rows as i64,
}
}
}

View File

@@ -220,6 +220,7 @@ class Table:
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
async def unset_lsm_write_spec(self) -> None: ...
async def close_lsm_writers(self) -> None: ...
@property
def tags(self) -> Tags: ...
def query(self) -> Query: ...
@@ -420,6 +421,7 @@ class MergeResult:
num_inserted_rows: int
num_deleted_rows: int
num_attempts: int
num_rows: int
class LsmWriteSpec:
"""Specification selecting Lance's MemWAL LSM-style write path for

View File

@@ -34,6 +34,8 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._use_lsm_write = None
self._validate_single_shard = None
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -96,6 +98,46 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def use_lsm_write(self, use_lsm_write: bool) -> LanceMergeInsertBuilder:
    """
    Choose whether the merge goes through the MemWAL LSM write path.

    If this is never called, a `merge_insert` on a table with an LSM write
    spec is routed through Lance's MemWAL shard writer, while a table
    without one uses the standard path. `False` forces the standard path
    even when a spec is set. `True` requires a spec — `merge_insert`
    raises an error if none is installed.

    Parameters
    ----------
    use_lsm_write: bool
        Whether to use the LSM write path.

    Returns
    -------
    LanceMergeInsertBuilder
        This same builder, so calls can be chained.
    """
    self._use_lsm_write = use_lsm_write
    return self
def validate_single_shard(
    self, validate_single_shard: bool
) -> LanceMergeInsertBuilder:
    """
    Choose how an LSM merge verifies that its input targets a single shard.

    On a table with an LSM write spec, every row in a `merge_insert` call
    must route to the same shard. With `True` (the default) every row is
    inspected to verify this. With `False` only the first row is inspected
    and the shard it routes to is used for the whole input — a faster path
    for callers that have already pre-sharded their input.

    Has no effect on tables without an LSM write spec.

    Parameters
    ----------
    validate_single_shard: bool
        Whether to check every row routes to one shard. Defaults to `True`.

    Returns
    -------
    LanceMergeInsertBuilder
        This same builder, so calls can be chained.
    """
    self._validate_single_shard = validate_single_shard
    return self
def execute(
self,
new_data: DATA,

View File

@@ -792,6 +792,10 @@ class RemoteTable(Table):
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.unset_lsm_write_spec())
def close_lsm_writers(self) -> None:
    """No-op on LanceDB Cloud (no local shard writers).

    Runs the wrapped async table's ``close_lsm_writers`` on ``LOOP``.
    """
    pending = self._table.close_lsm_writers()
    return LOOP.run(pending)
def drop_index(self, index_name: str):
return LOOP.run(self._table.drop_index(index_name))

View File

@@ -1251,7 +1251,7 @@ class Table(ABC):
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -3601,6 +3601,11 @@ class LanceTable(Table):
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
return LOOP.run(self._table.unset_lsm_write_spec())
def close_lsm_writers(self) -> None:
    """Close cached MemWAL shard writers.

    Synchronous wrapper that runs the wrapped async table's method on
    ``LOOP``. See
    [`AsyncTable.close_lsm_writers`][lancedb.AsyncTable.close_lsm_writers].
    """
    pending = self._table.close_lsm_writers()
    return LOOP.run(pending)
def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
@@ -4209,6 +4214,16 @@ class AsyncTable:
"""
await self._inner.unset_lsm_write_spec()
async def close_lsm_writers(self) -> None:
    """Drain and close any cached MemWAL shard writers for this table.

    With an LSM write spec installed, `merge_insert` opens MemWAL shard
    writers and keeps them cached for reuse across calls. This method
    closes them, flushing pending data; writers reopen lazily on the next
    `merge_insert`. It is a no-op when no writers are cached.
    """
    inner = self._inner
    await inner.close_lsm_writers()
@property
def name(self) -> str:
"""The name of the table."""
@@ -4659,7 +4674,7 @@ class AsyncTable:
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -5039,6 +5054,8 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
use_lsm_write=merge._use_lsm_write,
validate_single_shard=merge._validate_single_shard,
),
)

View File

@@ -57,7 +57,7 @@ async def test_upsert_async(mem_db_async):
await table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=1, num_deleted_rows=0)
# num_inserted_rows=1, num_deleted_rows=0, num_rows=2)
# --8<-- [end:upsert_basic_async]
assert await table.count_rows() == 3
assert res.version == 2
@@ -86,7 +86,7 @@ def test_insert_if_not_exists(mem_db):
table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=0,
# num_inserted_rows=1, num_deleted_rows=0)
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
# --8<-- [end:insert_if_not_exists]
assert table.count_rows() == 3
assert res.version == 2
@@ -116,7 +116,7 @@ async def test_insert_if_not_exists_async(mem_db_async):
await table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=0,
# num_inserted_rows=1, num_deleted_rows=0)
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
# --8<-- [end:insert_if_not_exists]
assert await table.count_rows() == 3
assert res.version == 2
@@ -150,7 +150,7 @@ def test_replace_range(mem_db):
table.count_rows("doc_id = 1") # 1
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=0, num_deleted_rows=1)
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
# --8<-- [end:insert_if_not_exists]
assert table.count_rows("doc_id = 1") == 1
assert res.version == 2
@@ -185,7 +185,7 @@ async def test_replace_range_async(mem_db_async):
await table.count_rows("doc_id = 1") # 1
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=0, num_deleted_rows=1)
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
# --8<-- [end:insert_if_not_exists]
assert await table.count_rows("doc_id = 1") == 1
assert res.version == 2

View File

@@ -0,0 +1,196 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for the MemWAL LSM ``merge_insert`` dispatch."""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
from lancedb._lancedb import LsmWriteSpec
# Two non-nullable int64 columns, `id` and `value`; the schema `_reader` builds
# its batches against.
SCHEMA = pa.schema(
[
pa.field("id", pa.int64(), nullable=False),
pa.field("value", pa.int64(), nullable=False),
]
)
# Non-nullable int64 `id` plus non-nullable utf8 `region`; the schema
# `_region_reader` builds its batches against.
REGION_SCHEMA = pa.schema(
[
pa.field("id", pa.int64(), nullable=False),
pa.field("region", pa.utf8(), nullable=False),
]
)
def _reader(ids):
    """Return a one-batch reader over ``SCHEMA``.

    ``id`` takes the given ids and ``value`` is each row's position
    (0, 1, 2, ...).
    """
    id_column = pa.array(ids, type=pa.int64())
    value_column = pa.array(list(range(len(ids))), type=pa.int64())
    batch = pa.RecordBatch.from_arrays([id_column, value_column], schema=SCHEMA)
    return pa.RecordBatchReader.from_batches(SCHEMA, [batch])
def _region_reader(rows):
    """Return a one-batch reader over ``REGION_SCHEMA``.

    Each element of ``rows`` is indexed as ``(id, region)``.
    """
    id_column = pa.array([row[0] for row in rows], type=pa.int64())
    region_column = pa.array([row[1] for row in rows], type=pa.utf8())
    batch = pa.RecordBatch.from_arrays(
        [id_column, region_column], schema=REGION_SCHEMA
    )
    return pa.RecordBatchReader.from_batches(REGION_SCHEMA, [batch])
def _bucket_table(tmp_path):
    """A table with ``id`` as the primary key and a single-bucket LSM spec."""
    no_staleness = timedelta(seconds=0)
    db = lancedb.connect(tmp_path, read_consistency_interval=no_staleness)
    table = db.create_table("t", _reader([1, 2, 3]))
    table.set_unenforced_primary_key("id")
    # num_buckets = 1: every row routes to the single bucket.
    spec = LsmWriteSpec.bucket("id", 1)
    table.set_lsm_write_spec(spec)
    return table
def test_lsm_merge_insert_bucket(tmp_path):
    """An upsert on a bucket-spec table reports only ``num_rows``."""
    table = _bucket_table(tmp_path)
    # Empty `on` defaults to the primary key.
    upsert = (
        table.merge_insert([])
        .when_matched_update_all()
        .when_not_matched_insert_all()
    )
    result = upsert.execute(_reader([3, 4, 5]))
    # LSM path: rows go to the MemWAL, so only num_rows is populated.
    assert result.num_rows == 3
    assert result.version == 0
    assert result.num_inserted_rows == 0
    assert result.num_updated_rows == 0
def test_lsm_merge_insert_unsharded(tmp_path):
    """An upsert on an unsharded-spec table reports every input row."""
    no_staleness = timedelta(seconds=0)
    db = lancedb.connect(tmp_path, read_consistency_interval=no_staleness)
    table = db.create_table("t", _reader([1, 2, 3]))
    table.set_unenforced_primary_key("id")
    table.set_lsm_write_spec(LsmWriteSpec.unsharded())

    upsert = (
        table.merge_insert("id")
        .when_matched_update_all()
        .when_not_matched_insert_all()
    )
    result = upsert.execute(_reader([10, 11, 12, 13]))
    assert result.num_rows == 4
def test_lsm_merge_insert_identity(tmp_path):
    """An upsert on an identity-spec table where all rows share one value."""
    no_staleness = timedelta(seconds=0)
    db = lancedb.connect(tmp_path, read_consistency_interval=no_staleness)
    table = db.create_table("t", _region_reader([(1, "us"), (2, "us")]))
    table.set_unenforced_primary_key("id")
    table.set_lsm_write_spec(LsmWriteSpec.identity("region"))

    # All rows share one identity value, so they route to one shard.
    upsert = (
        table.merge_insert([])
        .when_matched_update_all()
        .when_not_matched_insert_all()
    )
    result = upsert.execute(_region_reader([(3, "us"), (4, "us")]))
    assert result.num_rows == 2
def test_lsm_merge_insert_use_lsm_write_false(tmp_path):
    """Opting out of the LSM path makes the merge commit to the table."""
    table = _bucket_table(tmp_path)  # rows id = 1, 2, 3
    # use_lsm_write(False) opts out: the standard path runs and commits.
    insert_only = (
        table.merge_insert("id")
        .when_not_matched_insert_all()
        .use_lsm_write(False)
    )
    result = insert_only.execute(_reader([3, 4, 5]))
    assert result.num_inserted_rows == 2
    assert table.count_rows() == 5
def test_lsm_merge_insert_validate_single_shard_off(tmp_path):
    """An LSM upsert still succeeds with ``validate_single_shard(False)``."""
    table = _bucket_table(tmp_path)
    upsert = table.merge_insert([])
    upsert = upsert.when_matched_update_all()
    upsert = upsert.when_not_matched_insert_all()
    upsert = upsert.validate_single_shard(False)
    result = upsert.execute(_reader([6, 7, 8]))
    assert result.num_rows == 3
def test_lsm_merge_insert_use_lsm_write_true_requires_spec(tmp_path):
    """Forcing the LSM path on a table without a write spec must fail."""
    # The table gets a primary key, but no LSM write spec is installed.
    no_staleness = timedelta(seconds=0)
    db = lancedb.connect(tmp_path, read_consistency_interval=no_staleness)
    table = db.create_table("t", _reader([1, 2, 3]))
    table.set_unenforced_primary_key("id")
    # The whole builder chain stays inside the context manager so an error
    # raised at any step is matched, exactly as with a single chained call.
    with pytest.raises(Exception, match="use_lsm_write"):
        forced = table.merge_insert("id")
        forced = forced.when_matched_update_all()
        forced = forced.when_not_matched_insert_all()
        forced = forced.use_lsm_write(True)
        forced.execute(_reader([4]))
def test_lsm_merge_insert_rejects_on_not_primary_key(tmp_path):
    """An LSM upsert keyed on a non-primary-key column is rejected."""
    table = _bucket_table(tmp_path)
    with pytest.raises(Exception, match="primary key"):
        wrong_key = table.merge_insert("value")
        wrong_key = wrong_key.when_matched_update_all()
        wrong_key = wrong_key.when_not_matched_insert_all()
        wrong_key.execute(_reader([1]))
def test_lsm_merge_insert_rejects_non_upsert(tmp_path):
    """The LSM path only accepts the full upsert shape."""
    table = _bucket_table(tmp_path)
    # Without when_matched_update_all this is insert-only, not an upsert.
    with pytest.raises(Exception, match="upsert"):
        insert_only = table.merge_insert([])
        insert_only = insert_only.when_not_matched_insert_all()
        insert_only.execute(_reader([4]))
def test_lsm_close_writers(tmp_path):
    """Closing the LSM writers does not prevent a later upsert."""
    table = _bucket_table(tmp_path)
    first = table.merge_insert([])
    first = first.when_matched_update_all()
    first = first.when_not_matched_insert_all()
    first.execute(_reader([7, 8]))
    table.close_lsm_writers()
    # The next merge_insert lazily reopens the writer.
    second = table.merge_insert([])
    second = second.when_matched_update_all()
    second = second.when_not_matched_insert_all()
    result = second.execute(_reader([9]))
    assert result.num_rows == 1
@pytest.mark.asyncio
async def test_async_lsm_merge_insert(tmp_path):
    """Async API: LSM upsert on a single-bucket table, then close the writers."""
    no_staleness = timedelta(seconds=0)
    db = await lancedb.connect_async(
        tmp_path, read_consistency_interval=no_staleness
    )
    table = await db.create_table("t", _reader([1, 2, 3]))
    await table.set_unenforced_primary_key("id")
    await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1))
    upsert = table.merge_insert([])
    upsert = upsert.when_matched_update_all()
    upsert = upsert.when_not_matched_insert_all()
    result = await upsert.execute(_reader([3, 4, 5]))
    assert result.num_rows == 3
    await table.close_lsm_writers()

View File

@@ -143,18 +143,20 @@ pub struct MergeResult {
pub num_inserted_rows: u64,
pub num_deleted_rows: u64,
pub num_attempts: u32,
pub num_rows: u64,
}
#[pymethods]
impl MergeResult {
pub fn __repr__(&self) -> String {
format!(
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={})",
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={}, num_rows={})",
self.version,
self.num_updated_rows,
self.num_inserted_rows,
self.num_deleted_rows,
self.num_attempts
self.num_attempts,
self.num_rows
)
}
}
@@ -167,6 +169,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_inserted_rows: result.num_inserted_rows,
num_deleted_rows: result.num_deleted_rows,
num_attempts: result.num_attempts,
num_rows: result.num_rows,
}
}
}
@@ -194,6 +197,12 @@ impl LsmWriteSpec {
}
/// Identity sharding — shard by the raw value of `column`.
///
/// `column` must be a deterministic function of the unenforced primary
/// key: every row with a given primary key must always produce the same
/// `column` value, or upserts of that key can land in different shards
/// and a stale version can win. Typically `column` is the primary key
/// itself or a stable attribute of it.
#[staticmethod]
pub fn identity(column: String) -> Self {
Self {
@@ -933,6 +942,12 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(use_lsm_write) = parameters.use_lsm_write {
builder.use_lsm_write(use_lsm_write);
}
if let Some(validate_single_shard) = parameters.validate_single_shard {
builder.validate_single_shard(validate_single_shard);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -971,6 +986,13 @@ impl Table {
})
}
/// Close the cached LSM shard writers held by the underlying table.
///
/// Hands the inner table's `close_lsm_writers` future to `future_into_py`;
/// its result is passed through `infer_error` before being returned.
pub fn close_lsm_writers(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
    let table = self_.inner_ref()?.clone();
    let py = self_.py();
    future_into_py(py, async move {
        let closed = table.close_lsm_writers().await;
        closed.infer_error()
    })
}
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
@@ -1124,6 +1146,8 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
use_lsm_write: Option<bool>,
validate_single_shard: Option<bool>,
}
#[pyclass]

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.94.0"
channel = "1.95.0"

View File

@@ -75,7 +75,7 @@ reqwest = { version = "0.12.0", default-features = false, features = [
"stream",
], optional = true }
http = { version = "1", optional = true } # Matching what is in reqwest
uuid = { version = "1.7.0", features = ["v4"] }
uuid = { version = "1.7.0", features = ["v4", "v5"] }
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
polars = { version = ">=0.37,<0.40.0", optional = true }
hf-hub = { version = "0.4.1", optional = true, default-features = false, features = [

View File

@@ -464,11 +464,9 @@ mod tests {
let mut iter = ids.into_iter().map(|o| o.unwrap());
while let Some(first) = iter.next() {
let rows_left_in_clump = if first == 4470 { 19 } else { 29 };
let mut expected_next = first + 1;
for _ in 0..rows_left_in_clump {
for expected_next in (first + 1)..=(first + rows_left_in_clump) {
let next = iter.next().unwrap();
assert_eq!(next, expected_next);
expected_next += 1;
}
}
}

View File

@@ -908,6 +908,15 @@ mod tests {
use serial_test::serial;
use std::time::Duration;
// Process-wide lock for the tests below that mutate environment variables.
// `cargo test` runs tests in parallel, and several of them read and write
// the same process-global `LANCEDB_USER_ID*` variables, so without this
// lock they would race with each other.
static ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());

/// Acquires `ENV_MUTEX`, recovering the guard if the mutex is poisoned.
fn lock_env() -> std::sync::MutexGuard<'static, ()> {
    match ENV_MUTEX.lock() {
        Ok(guard) => guard,
        // A panic while the lock was held poisons it; the guarded value is
        // `()`, so there is no state to repair and the guard is reused.
        Err(poisoned) => poisoned.into_inner(),
    }
}
#[test]
fn test_timeout_config_default() {
let config = TimeoutConfig::default();
@@ -1166,6 +1175,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_none() {
let _guard = lock_env();
let config = ClientConfig::default();
// Clear env vars that might be set from other tests
// SAFETY: This is only called in tests
@@ -1179,6 +1189,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
@@ -1194,6 +1205,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env_key() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::remove_var("LANCEDB_USER_ID");
@@ -1215,6 +1227,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_direct_takes_precedence() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
@@ -1233,6 +1246,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_empty_env_ignored() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "");

View File

@@ -1805,6 +1805,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
num_inserted_rows: 0,
num_updated_rows: 0,
num_attempts: 0,
num_rows: 0,
});
}

View File

@@ -366,6 +366,14 @@ impl LsmWriteSpec {
/// Construct an identity-sharding spec (shard by the raw value of
/// `column`) with no maintained indexes.
///
/// `column` must be a deterministic function of the unenforced primary
/// key: every row with a given primary key must always produce the same
/// `column` value. MemWAL dedups upserts by primary key but tracks
/// generations per shard, so if the same key is written with two
/// different `column` values its versions land in different shards and a
/// stale value can win. Typically `column` is the primary key itself, or
/// a stable attribute of it (e.g. a tenant id).
pub fn identity(column: impl Into<String>) -> Self {
Self::Identity {
column: column.into(),
@@ -580,6 +588,13 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
message: "unset_lsm_write_spec is not supported on this table type".into(),
})
}
/// Drain and close any cached MemWAL shard writers for this table.
///
/// The default implementation is a no-op; table types that maintain
/// MemWAL shard writers override it.
///
/// # Errors
///
/// The default implementation never fails: it always returns `Ok(())`.
async fn close_lsm_writers(&self) -> Result<()> {
    Ok(())
}
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -1386,6 +1401,16 @@ impl Table {
self.inner.unset_lsm_write_spec().await
}
/// Drain and close any cached MemWAL shard writers held for this table.
///
/// When an [`LsmWriteSpec`] is installed, `merge_insert` opens MemWAL shard
/// writers and caches them for reuse across calls. This closes them,
/// flushing pending data; writers reopen lazily on the next `merge_insert`.
/// It is a no-op when no writers are cached.
///
/// # Errors
///
/// Returns whatever error the underlying table implementation's
/// `close_lsm_writers` reports; this wrapper adds no handling of its own.
pub async fn close_lsm_writers(&self) -> Result<()> {
    self.inner.close_lsm_writers().await
}
/// Retrieve the version of the table
///
/// LanceDb supports versioning. Every operation that modifies the table increases
@@ -2829,6 +2854,10 @@ impl BaseTable for NativeTable {
merge::lsm::unset_lsm_write_spec(self).await
}
/// Close the cached MemWAL shard writers for this native table.
///
/// Delegates to `merge::lsm::close_lsm_writers` and returns its result.
async fn close_lsm_writers(&self) -> Result<()> {
    merge::lsm::close_lsm_writers(self).await
}
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
delete::execute_delete(self, predicate).await
@@ -3015,11 +3044,12 @@ impl BaseTable for NativeTable {
let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
let min = sorted_sizes.first().copied().unwrap_or(0);
let max = sorted_sizes.last().copied().unwrap_or(0);
let mean = if num_fragments == 0 {
0
} else {
sorted_sizes.iter().copied().sum::<usize>() / num_fragments
};
let mean = sorted_sizes
.iter()
.copied()
.sum::<usize>()
.checked_div(num_fragments)
.unwrap_or(0);
let frag_stats = FragmentStatistics {
num_fragments,

View File

@@ -870,8 +870,10 @@ mod tests {
.await
.unwrap();
// Should return no rows at all, summed across every returned batch.
assert!(result[0].num_rows() <= 1);
assert_eq!(
result.iter().map(|batch| batch.num_rows()).sum::<usize>(),
0
);
}
#[tokio::test]

View File

@@ -8,6 +8,7 @@ use std::{
use lance::{Dataset, dataset::refs};
use crate::table::merge::lsm::ShardWriterCache;
use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
/// A wrapper around a [Dataset] that provides consistency checks.
@@ -18,6 +19,10 @@ use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
pub struct DatasetConsistencyWrapper {
state: Arc<Mutex<DatasetState>>,
consistency: ConsistencyMode,
/// The single MemWAL `ShardWriter` for this dataset, co-located so it is
/// cached for the session and shares the dataset's lifecycle. A dataset
/// writes to one shard at a time. Shared by `Arc` across clones.
shard_writer: Arc<ShardWriterCache>,
}
/// The current dataset and whether it is pinned to a specific version.
@@ -67,9 +72,15 @@ impl DatasetConsistencyWrapper {
pinned_version: None,
})),
consistency,
shard_writer: Arc::new(ShardWriterCache::default()),
}
}
/// The MemWAL `ShardWriter` cache co-located with this dataset.
///
/// Returns a borrow of the `Arc` stored on this wrapper; the call itself
/// does not clone it.
pub(crate) fn shard_writer(&self) -> &Arc<ShardWriterCache> {
    &self.shard_writer
}
/// Get the current dataset.
///
/// Behavior depends on the consistency mode:

View File

@@ -41,6 +41,16 @@ pub struct MergeResult {
/// A value of 1 means the operation succeeded on the first try.
#[serde(default)]
pub num_attempts: u32,
/// Total number of rows written.
///
/// On the standard `merge_insert` path this equals
/// `num_inserted_rows + num_updated_rows`. On the MemWAL LSM write path the
/// insert/update breakdown is not known until compaction; in that mode
/// `num_inserted_rows`, `num_updated_rows`, `num_deleted_rows`, `version`
/// and `num_attempts` are all `0` and this field holds the total number of
/// rows written through the shard writer.
#[serde(default)]
pub num_rows: u64,
}
/// A builder used to create and run a merge insert operation
@@ -57,6 +67,8 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) use_lsm_write: Option<bool>,
pub(crate) validate_single_shard: bool,
}
impl MergeInsertBuilder {
@@ -71,6 +83,8 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
use_lsm_write: None,
validate_single_shard: true,
}
}
@@ -150,6 +164,34 @@ impl MergeInsertBuilder {
self
}
/// Controls whether `merge_insert` uses the MemWAL LSM write path.
///
/// By default (unset), a `merge_insert` on a table with an
/// [`LsmWriteSpec`](super::LsmWriteSpec) installed is routed through
/// Lance's MemWAL shard writer, and a table without one uses the standard
/// path. Calling this with `false` forces the standard path even when a
/// spec is set. Calling it with `true` requires a spec — `merge_insert`
/// errors if none is installed.
///
/// The choice is stored as `Some(use_lsm_write)`, which distinguishes an
/// explicit setting from the unset default. Returns `self` so builder
/// calls can be chained.
pub fn use_lsm_write(&mut self, use_lsm_write: bool) -> &mut Self {
    self.use_lsm_write = Some(use_lsm_write);
    self
}
/// Controls how an LSM `merge_insert` checks that its input targets a
/// single shard.
///
/// When a table has an LSM write spec, every row in a `merge_insert` call
/// must route to the same shard. When `true` (the default), every row is
/// inspected to verify this. When `false`, only the first row is inspected
/// and the shard it routes to is used for the whole input — a faster path
/// for callers that have already pre-sharded their input.
///
/// Has no effect on tables without an LSM write spec.
///
/// Returns `self` so builder calls can be chained.
pub fn validate_single_shard(&mut self, validate_single_shard: bool) -> &mut Self {
    self.validate_single_shard = validate_single_shard;
    self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
@@ -167,6 +209,23 @@ pub(crate) async fn execute_merge_insert(
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
match lsm::lsm_dispatch_decision(table, &params).await? {
lsm::LsmDispatch::Lsm(plan) => {
let future =
lsm::execute_lsm_merge_insert(table, plan, params.validate_single_shard, new_data);
return match params.timeout {
Some(timeout) => match tokio::time::timeout(timeout, future).await {
Ok(result) => result,
Err(_) => Err(Error::Runtime {
message: "merge insert timed out".to_string(),
}),
},
None => future.await,
};
}
lsm::LsmDispatch::Standard => {}
}
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -219,6 +278,7 @@ pub(crate) async fn execute_merge_insert(
num_inserted_rows: stats.num_inserted_rows,
num_deleted_rows: stats.num_deleted_rows,
num_attempts: stats.num_attempts,
num_rows: stats.num_inserted_rows + stats.num_updated_rows,
})
}
@@ -327,3 +387,366 @@ mod tests {
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
}
#[cfg(test)]
mod lsm_tests {
use std::sync::Arc;
use arrow_array::{
Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use tempfile::{TempDir, tempdir};
use crate::connect;
use crate::error::Error;
use crate::table::{LsmWriteSpec, Table};
/// Builds a single-batch reader over `[id: Int64, value: Int64]` rows, where
/// `value` counts up from `0` alongside the supplied ids.
fn id_value_reader(ids: Vec<i64>) -> Box<dyn RecordBatchReader + Send> {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Int64, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    // Take the row count before `ids` is moved into its array.
    let row_count = ids.len() as i64;
    let id_column = Int64Array::from(ids);
    let value_column = Int64Array::from_iter_values(0..row_count);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(id_column), Arc::new(value_column)],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
    Box::new(reader)
}
/// Builds a single-batch reader over `[id: Int64, region: Utf8]` rows.
fn id_region_reader(rows: Vec<(i64, &str)>) -> Box<dyn RecordBatchReader + Send> {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    // Split the row tuples into one vector per column in a single pass.
    let (ids, regions): (Vec<i64>, Vec<&str>) = rows.into_iter().unzip();
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(ids)),
            Arc::new(StringArray::from(regions)),
        ],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
    Box::new(reader)
}
/// Builds a reader over `[id: Int64, region: Utf8]` rows that yields one
/// record batch per inner vector of `batches`.
fn id_region_multi_reader(batches: Vec<Vec<(i64, &str)>>) -> Box<dyn RecordBatchReader + Send> {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    let records: Vec<_> = batches
        .into_iter()
        .map(|rows| {
            // Split the row tuples into one vector per column.
            let (ids, regions): (Vec<i64>, Vec<&str>) = rows.into_iter().unzip();
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int64Array::from(ids)),
                    Arc::new(StringArray::from(regions)),
                ],
            )
            .unwrap();
            Ok(batch)
        })
        .collect();
    let reader = RecordBatchIterator::new(records, schema);
    Box::new(reader)
}
/// Creates table `t` in `dir` with `[id, value]` rows for ids 1, 2 and 3,
/// and marks `id` as its unenforced primary key.
async fn id_value_table(dir: &TempDir) -> Table {
    let uri = dir.path().to_str().unwrap();
    let conn = connect(uri).execute().await.unwrap();
    let seed_rows = id_value_reader(vec![1, 2, 3]);
    let table = conn.create_table("t", seed_rows).execute().await.unwrap();
    table.set_unenforced_primary_key(["id"]).await.unwrap();
    table
}
#[tokio::test]
async fn lsm_merge_insert_bucket() {
    let tmp = tempdir().unwrap();
    let table = id_value_table(&tmp).await;
    // With a single bucket, every row is routed to that one bucket.
    let spec = LsmWriteSpec::bucket("id", 1);
    table.set_lsm_write_spec(spec).await.unwrap();
    // Passing no `on` columns makes the merge key default to the primary key.
    let mut upsert = table.merge_insert(&[]);
    upsert.when_matched_update_all(None);
    upsert.when_not_matched_insert_all();
    let incoming = id_value_reader(vec![3, 4, 5]);
    let outcome = upsert.execute(incoming).await.unwrap();
    // The LSM path writes to the MemWAL; the insert/update breakdown is
    // unknown until compaction, so `num_rows` is the only populated field.
    assert_eq!(outcome.num_rows, 3);
    assert_eq!(outcome.version, 0);
    assert_eq!(outcome.num_inserted_rows, 0);
    assert_eq!(outcome.num_updated_rows, 0);
}
#[tokio::test]
async fn lsm_merge_insert_unsharded() {
    let tmp = tempdir().unwrap();
    let table = id_value_table(&tmp).await;
    let spec = LsmWriteSpec::unsharded();
    table.set_lsm_write_spec(spec).await.unwrap();
    // Upsert keyed explicitly on the primary key column.
    let mut upsert = table.merge_insert(&["id"]);
    upsert.when_matched_update_all(None);
    upsert.when_not_matched_insert_all();
    let incoming = id_value_reader(vec![10, 11, 12, 13]);
    let outcome = upsert.execute(incoming).await.unwrap();
    assert_eq!(outcome.num_rows, 4);
}
#[tokio::test]
async fn lsm_merge_insert_identity() {
    let tmp = tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();
    let conn = connect(uri).execute().await.unwrap();
    let seed_rows = id_region_reader(vec![(1, "us"), (2, "us")]);
    let table = conn.create_table("t", seed_rows).execute().await.unwrap();
    table.set_unenforced_primary_key(["id"]).await.unwrap();
    let spec = LsmWriteSpec::identity("region");
    table.set_lsm_write_spec(spec).await.unwrap();
    // Every incoming row has the same identity value, hence a single shard.
    let mut upsert = table.merge_insert(&[]);
    upsert.when_matched_update_all(None);
    upsert.when_not_matched_insert_all();
    let incoming = id_region_reader(vec![(3, "us"), (4, "us")]);
    let outcome = upsert.execute(incoming).await.unwrap();
    assert_eq!(outcome.num_rows, 2);
}
#[tokio::test]
async fn lsm_merge_insert_use_lsm_write_false_falls_back() {
    let tmp = tempdir().unwrap();
    let table = id_value_table(&tmp).await;
    let spec = LsmWriteSpec::bucket("id", 1);
    table.set_lsm_write_spec(spec).await.unwrap();
    // Opting out with use_lsm_write(false) runs the standard path, which
    // commits straight to the table.
    let mut insert_only = table.merge_insert(&["id"]);
    insert_only.when_not_matched_insert_all();
    insert_only.use_lsm_write(false);
    let incoming = id_value_reader(vec![3, 4, 5]);
    let outcome = insert_only.execute(incoming).await.unwrap();
    assert_eq!(outcome.num_inserted_rows, 2);
    let total_rows = table.count_rows(None).await.unwrap();
    assert_eq!(total_rows, 5);
}
#[tokio::test]
async fn lsm_merge_insert_rejects_on_not_primary_key() {
    let tmp = tempdir().unwrap();
    let table = id_value_table(&tmp).await;
    let spec = LsmWriteSpec::bucket("id", 1);
    table.set_lsm_write_spec(spec).await.unwrap();
    // `value` is not the primary key, so the LSM upsert must be refused.
    let mut upsert = table.merge_insert(&["value"]);
    upsert.when_matched_update_all(None);
    upsert.when_not_matched_insert_all();
    let incoming = id_value_reader(vec![1]);
    let err = upsert.execute(incoming).await.unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_rejects_non_upsert() {
    let tmp = tempdir().unwrap();
    let table = id_value_table(&tmp).await;
    let spec = LsmWriteSpec::bucket("id", 1);
    table.set_lsm_write_spec(spec).await.unwrap();
    // Without when_matched_update_all this is insert-only, which is not the
    // upsert shape.
    let mut insert_only = table.merge_insert(&[]);
    insert_only.when_not_matched_insert_all();
    let incoming = id_value_reader(vec![4]);
    let err = insert_only.execute(incoming).await.unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_close_writers_then_reopen() {
    let tmp = tempdir().unwrap();
    let table = id_value_table(&tmp).await;
    let spec = LsmWriteSpec::bucket("id", 1);
    table.set_lsm_write_spec(spec).await.unwrap();
    let mut first = table.merge_insert(&[]);
    first.when_matched_update_all(None);
    first.when_not_matched_insert_all();
    first.execute(id_value_reader(vec![7, 8])).await.unwrap();
    table.close_lsm_writers().await.unwrap();
    // The next merge_insert lazily reopens the writer.
    let mut second = table.merge_insert(&[]);
    second.when_matched_update_all(None);
    second.when_not_matched_insert_all();
    let outcome = second.execute(id_value_reader(vec![9])).await.unwrap();
    assert_eq!(outcome.num_rows, 1);
}
#[tokio::test]
async fn lsm_merge_insert_multi_batch() {
    let tmp = tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();
    let conn = connect(uri).execute().await.unwrap();
    let seed_rows = id_region_reader(vec![(1, "us")]);
    let table = conn.create_table("t", seed_rows).execute().await.unwrap();
    table.set_unenforced_primary_key(["id"]).await.unwrap();
    let spec = LsmWriteSpec::identity("region");
    table.set_lsm_write_spec(spec).await.unwrap();
    // Several batches that all route to one shard are written together.
    let mut same_shard = table.merge_insert(&[]);
    same_shard.when_matched_update_all(None);
    same_shard.when_not_matched_insert_all();
    let us_only = id_region_multi_reader(vec![
        vec![(2, "us"), (3, "us")],
        vec![(4, "us")],
    ]);
    let outcome = same_shard.execute(us_only).await.unwrap();
    assert_eq!(outcome.num_rows, 3);
    // Batches routing to different shards are rejected; validation runs
    // before any write, so no partial write is left behind.
    let mut mixed_shards = table.merge_insert(&[]);
    mixed_shards.when_matched_update_all(None);
    mixed_shards.when_not_matched_insert_all();
    let us_then_eu = id_region_multi_reader(vec![
        vec![(5, "us")],
        vec![(6, "eu")],
    ]);
    let err = mixed_shards.execute(us_then_eu).await.unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_use_lsm_write_true_requires_spec() {
    let tmp = tempdir().unwrap();
    // The fixture table has a primary key but no LSM write spec installed.
    let table = id_value_table(&tmp).await;
    let mut forced = table.merge_insert(&["id"]);
    forced.when_matched_update_all(None);
    forced.when_not_matched_insert_all();
    forced.use_lsm_write(true);
    let incoming = id_value_reader(vec![4]);
    let err = forced.execute(incoming).await.unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_rejects_second_shard() {
    let tmp = tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();
    let conn = connect(uri).execute().await.unwrap();
    let seed_rows = id_region_reader(vec![(1, "us")]);
    let table = conn.create_table("t", seed_rows).execute().await.unwrap();
    table.set_unenforced_primary_key(["id"]).await.unwrap();
    let spec = LsmWriteSpec::identity("region");
    table.set_lsm_write_spec(spec).await.unwrap();
    // The first merge_insert opens the single writer, for shard "us".
    let mut into_us = table.merge_insert(&[]);
    into_us.when_matched_update_all(None);
    into_us.when_not_matched_insert_all();
    let us_row = id_region_reader(vec![(2, "us")]);
    into_us.execute(us_row).await.unwrap();
    // While that writer is open, input routing to another shard is refused.
    let mut into_eu = table.merge_insert(&[]);
    into_eu.when_matched_update_all(None);
    into_eu.when_not_matched_insert_all();
    let eu_row = id_region_reader(vec![(3, "eu")]);
    let err = into_eu.execute(eu_row).await.unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
    // Once the writer is closed, a different shard can be written.
    table.close_lsm_writers().await.unwrap();
    let mut into_eu_again = table.merge_insert(&[]);
    into_eu_again.when_matched_update_all(None);
    into_eu_again.when_not_matched_insert_all();
    let eu_retry = id_region_reader(vec![(4, "eu")]);
    into_eu_again.execute(eu_retry).await.unwrap();
}
}

File diff suppressed because it is too large Load Diff