mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-22 14:30:41 +00:00
## Summary

Split out from #3354.

Adds `LsmWriteSpec` and `Table::set_lsm_write_spec` / `unset_lsm_write_spec` to install and clear the spec that selects Lance's MemWAL LSM-style write path for `merge_insert`.

`LsmWriteSpec` offers three sharding strategies, all built on Lance's `InitializeMemWalBuilder`:

- `LsmWriteSpec::bucket(column, num_buckets)` — hash-bucket sharding by the single-column unenforced primary key.
- `LsmWriteSpec::identity(column)` — identity sharding by the raw value of a scalar column.
- `LsmWriteSpec::unsharded()` — a single MemWAL shard.

Each can be refined with `with_maintained_indexes(...)` (indexes the MemWAL keeps up to date as rows are appended) and `with_writer_config_defaults(...)` (default `ShardWriter` configuration recorded in the MemWAL index, so every writer starts from the same defaults). All variants require the table to have an unenforced primary key.

- `set_lsm_write_spec` installs the spec by initializing the MemWAL index; `unset_lsm_write_spec` removes it (dropping the MemWAL index), reverting to the standard `merge_insert` path. `unset` is idempotent.
- Bindings: Python (`LsmWriteSpec.bucket` / `.identity` / `.unsharded`, `set_lsm_write_spec` / `unset_lsm_write_spec`) and TypeScript (`setLsmWriteSpec` with `specType` `"bucket"` / `"identity"` / `"unsharded"`). `RemoteTable` returns `NotSupported`.

The actual `merge_insert` LSM dispatch and `ShardWriter` write path are a follow-up — this PR only installs and clears the spec.
150 lines
4.7 KiB
Python
150 lines
4.7 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
|
|
|
"""Tests for installing and clearing an LsmWriteSpec via
|
|
`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`.
|
|
"""
|
|
|
|
from datetime import timedelta
|
|
|
|
import lancedb
|
|
import pyarrow as pa
|
|
import pytest
|
|
from lancedb._lancedb import LsmWriteSpec
|
|
|
|
# Two-column schema shared by every test: a non-null utf8 ``id`` column (the
# tests install it as the unenforced primary key) and a non-null int32 ``v``.
SCHEMA = pa.schema(
    [
        pa.field(name, dtype, nullable=False)
        for name, dtype in (("id", pa.utf8()), ("v", pa.int32()))
    ]
)
|
|
|
|
|
|
def _batch(ids, vs):
    """Build one RecordBatch conforming to ``SCHEMA`` from parallel lists.

    ``ids`` populate the utf8 ``id`` column and ``vs`` the int32 ``v`` column.
    """
    id_col = pa.array(ids, type=pa.utf8())
    v_col = pa.array(vs, type=pa.int32())
    return pa.RecordBatch.from_arrays([id_col, v_col], schema=SCHEMA)
|
|
|
|
|
|
def _reader(ids, vs):
    """Wrap a single ``_batch(ids, vs)`` batch in a ``SCHEMA`` RecordBatchReader."""
    batches = [_batch(ids, vs)]
    return pa.RecordBatchReader.from_batches(SCHEMA, batches)
|
|
|
|
|
|
def _make_table(tmp_path):
    """Open a database at ``tmp_path`` and create table ``"t"`` with one seed row.

    The connection uses a zero read-consistency interval. The table holds the
    single row ``id="seed"``, ``v=0``.

    Returns:
        A ``(db, table)`` tuple.
    """
    zero = timedelta(seconds=0)
    connection = lancedb.connect(tmp_path, read_consistency_interval=zero)
    seeded = connection.create_table("t", _reader(["seed"], [0]))
    return connection, seeded
|
|
|
|
|
|
def test_set_lsm_write_spec_validates(tmp_path):
    """set_lsm_write_spec rejects invalid specs and any change to an installed one."""
    _, table = _make_table(tmp_path)

    # Without a primary key the spec cannot be installed.
    with pytest.raises(Exception, match="primary key"):
        table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))

    table.set_unenforced_primary_key("id")

    # Once "id" is the key, each of these is rejected: a bucket column that
    # does not match the key, then two out-of-range num_buckets values.
    rejected = [
        ("v", 4, "match"),
        ("id", 0, "num_buckets"),
        ("id", 1025, "num_buckets"),
    ]
    for column, buckets, pattern in rejected:
        with pytest.raises(Exception, match=pattern):
            table.set_lsm_write_spec(LsmWriteSpec.bucket(column, buckets))

    # A valid spec installs; replacing it with a different one is refused.
    table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
    with pytest.raises(Exception, match="mutation"):
        table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
|
|
|
|
|
def test_unset_lsm_write_spec(tmp_path):
    """unset_lsm_write_spec removes an installed spec and errors when none exists."""
    _, table = _make_table(tmp_path)

    def expect_nothing_to_unset():
        # Removing a spec that is not installed must raise.
        with pytest.raises(Exception, match="no LSM write spec"):
            table.unset_lsm_write_spec()

    # Nothing has been installed yet.
    expect_nothing_to_unset()

    # Install a spec and remove it. Nothing is left behind, so a second unset
    # fails, but a fresh spec can be installed afterwards.
    table.set_unenforced_primary_key("id")
    table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
    table.unset_lsm_write_spec()
    expect_nothing_to_unset()
    table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
|
|
|
|
|
def test_set_unsharded_spec(tmp_path):
    """An unsharded spec can be installed and then removed."""
    table = _make_table(tmp_path)[1]
    # Lance MemWAL still needs a primary key on the dataset; the unsharded
    # variant only skips per-row hashing.
    table.set_unenforced_primary_key("id")
    spec = LsmWriteSpec.unsharded()
    table.set_lsm_write_spec(spec)
    table.unset_lsm_write_spec()
|
|
|
|
|
|
def test_lsm_write_spec_repr():
    """Bucket and unsharded specs expose their fields and mention them in repr."""
    bucket = LsmWriteSpec.bucket("id", 4)
    assert bucket.spec_type == "bucket"
    assert bucket.column == "id"
    assert bucket.num_buckets == 4
    assert bucket.maintained_indexes == []
    bucket_repr = repr(bucket)
    for token in ("bucket", "id", "4"):
        assert token in bucket_repr

    # Unsharded specs carry no column and no bucket count.
    unsharded = LsmWriteSpec.unsharded()
    assert unsharded.spec_type == "unsharded"
    assert unsharded.column is None
    assert unsharded.num_buckets is None
    assert "unsharded" in repr(unsharded)
|
|
|
|
|
|
def test_lsm_write_spec_with_maintained_indexes():
    """with_maintained_indexes records the given index names, in order."""
    names = ["idx_a", "idx_b"]
    spec = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(names)
    assert spec.maintained_indexes == ["idx_a", "idx_b"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_set_unset_lsm_write_spec(tmp_path):
    """Async API: install, remove and reinstall a spec; a redundant unset errors.

    Mirrors ``test_unset_lsm_write_spec`` through ``lancedb.connect_async``.
    """
    db = await lancedb.connect_async(
        tmp_path, read_consistency_interval=timedelta(seconds=0)
    )
    # Reuse the shared reader helper so the seed data matches the sync tests.
    table = await db.create_table("t", _reader(["seed"], [0]))

    await table.set_unenforced_primary_key("id")
    await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
    await table.unset_lsm_write_spec()
    # A second unset errors: there is no spec left to remove.
    with pytest.raises(Exception, match="no LSM write spec"):
        await table.unset_lsm_write_spec()
    # Parity with the sync test: after removal a fresh spec can be installed.
    await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
|
|
|
|
|
def test_set_identity_spec(tmp_path):
    """An identity spec on column ``v`` can be installed and then removed."""
    _, table = _make_table(tmp_path)
    # Identity sharding still requires an unenforced primary key on the table;
    # the spec itself shards by the raw value of ``v``.
    table.set_unenforced_primary_key("id")
    spec = LsmWriteSpec.identity("v")
    table.set_lsm_write_spec(spec)
    table.unset_lsm_write_spec()
|
|
|
|
|
|
def test_lsm_write_spec_identity_and_writer_config_defaults():
    """Identity specs expose their fields; writer-config defaults round-trip."""
    identity = LsmWriteSpec.identity("v")
    assert identity.spec_type == "identity"
    assert identity.column == "v"
    assert identity.num_buckets is None
    assert "identity" in repr(identity)

    # The returned spec carries the defaults and mentions them in its repr.
    defaults = {"durable_write": "false"}
    configured = identity.with_writer_config_defaults(defaults)
    assert configured.writer_config_defaults == {"durable_write": "false"}
    assert "durable_write" in repr(configured)
|