diff --git a/docs/src/js/classes/Branches.md b/docs/src/js/classes/Branches.md index ea4d72f07..4629620ac 100644 --- a/docs/src/js/classes/Branches.md +++ b/docs/src/js/classes/Branches.md @@ -16,15 +16,21 @@ to the branch; writes on it do not affect `main`. ### checkout() ```ts -checkout(name): Promise +checkout(name, version?): Promise
``` Check out an existing branch and return a handle scoped to it. +With `version` set, the returned handle is pinned to that version of the +branch (a read-only, detached view); otherwise it tracks the branch's +latest and stays writable. + #### Parameters * **name**: `string` +* **version?**: `number` + #### Returns `Promise`<[`Table`](Table.md)> diff --git a/docs/src/js/interfaces/OpenTableOptions.md b/docs/src/js/interfaces/OpenTableOptions.md index 3c7f1e817..7e361b653 100644 --- a/docs/src/js/interfaces/OpenTableOptions.md +++ b/docs/src/js/interfaces/OpenTableOptions.md @@ -55,3 +55,17 @@ Options already set on the connection will be inherited by the table, but can be overridden here. The available options are described at https://docs.lancedb.com/storage/ + +*** + +### version? + +```ts +optional version: number; +``` + +Open the table pinned to this version, producing a read-only view. + +Composes with [OpenTableOptions.branch](OpenTableOptions.md#branch): when both are set, opens +that branch at the version; otherwise opens `main` at the version. Call +`checkoutLatest` to return to a writable state. diff --git a/nodejs/__test__/remote.test.ts b/nodejs/__test__/remote.test.ts index cbddf9673..a2a67014d 100644 --- a/nodejs/__test__/remote.test.ts +++ b/nodejs/__test__/remote.test.ts @@ -191,6 +191,34 @@ describe("remote connection", () => { ); }); + it("allows version on remote but rejects a non-main branch", async () => { + await withMockDatabase( + (_req, res) => { + // describe (table open + version validation) always succeeds + const body = JSON.stringify({ + name: "t", + version: 2, + schema: { fields: [] }, + }); + res.writeHead(200, { "Content-Type": "application/json" }).end(body); + }, + async (db) => { + // version-only (and "main" + version) is allowed: remote supports + // version time-travel even though it has no branches + await db.openTable("t", undefined, { version: 2 }); + await db.openTable("t", undefined, { branch: "main", version: 2 }); + + // a non-main branch is rejected, with or without a version + await expect( + db.openTable("t", undefined, { branch: "exp" }), + ).rejects.toThrow(/branching/); + await expect( + db.openTable("t", undefined, { branch: "exp", version: 2 }), + ).rejects.toThrow(/branching/); + }, + ); + }); + describe("TlsConfig", () => { it("should create TlsConfig with all fields", () => { const tlsConfig: TlsConfig = { diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 6364382c5..6fe5d9240 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -133,6 +133,78 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( expect(await (await db.openTable("some_table")).countRows()).toBe(1); }); + it("should open a branch at a version isolated from main and HEAD", async () => { + const db = await connect(tmpDir.name); + // main: a single fork-point row + const t = await db.createTable("bv_table", [{ id: 0 }]); + const mainV1 = await t.version(); + + // fork "exp", then advance exp AND main independently past the fork so + // they diverge while sharing version numbers + const exp = await (await t.branches()).create("exp"); + await exp.add([{ id: 1 }]); // exp: {0, 1} + const expV2 = await exp.version(); + await exp.add([{ id: 2 }]); // exp HEAD: {0, 1, 2} + await t.add([{ id: 100 }, { id: 101 }, { id: 102 }]); // main HEAD: {0,100,101,102} + expect(await t.version()).toBe(expV2); + + // open exp at the shared version: the data must be exp's, not main's. + // count alone cannot prove this (main@v2 also exists), so assert + // provenance by content. + const pinned = await db.openTable("bv_table", undefined, { + branch: "exp", + version: expV2, + }); + expect(await pinned.countRows()).toBe(2); // not exp HEAD (3), not main@v2 (4) + expect(await pinned.countRows("id = 1")).toBe(1); // exp's post-fork row + expect(await pinned.countRows("id = 100")).toBe(0); // main's rows invisible + + // the same coordinate is reachable directly via branches().checkout(name, version) + const pinnedDirect = await (await t.branches()).checkout("exp", expV2); + expect(await pinnedDirect.countRows()).toBe(2); + + // the HEADs are unaffected + expect( + await ( + await db.openTable("bv_table", undefined, { branch: "exp" }) + ).countRows(), + ).toBe(3); + expect(await (await db.openTable("bv_table")).countRows()).toBe(4); + + // version-only (no branch) time-travels main itself: its fork-point + // version holds only main's first row, and the shared version number + // resolves to main's data, not the branch's ("opens main at the version") + const oldMain = await db.openTable("bv_table", undefined, { + version: mainV1, + }); + expect(await oldMain.countRows()).toBe(1); + const sharedOnMain = await db.openTable("bv_table", undefined, { + version: expV2, + }); + expect(await sharedOnMain.countRows()).toBe(4); // main@v2, not exp@v2 (2) + + // detached head: writing to a pinned version is rejected + await expect(pinned.add([{ id: 9 }])).rejects.toThrow( + /cannot be modified/, + ); + + // a nonexistent version is rejected -- on main, and on a branch (a + // distinct resolution path, on the branch's manifests) + await expect( + db.openTable("bv_table", undefined, { version: 9999 }), + ).rejects.toThrow(); + await expect( + db.openTable("bv_table", undefined, { branch: "exp", version: 9999 }), + ).rejects.toThrow(); + + // checkoutLatest re-attaches the pinned handle to the BRANCH's HEAD + // (writable again), not main's HEAD (4), and not staying pinned (2) + await pinned.checkoutLatest(); + expect(await pinned.countRows()).toBe(3); // exp HEAD + await pinned.add([{ id: 3 }]); + expect(await pinned.countRows()).toBe(4); // writable again + }); + it("rejects invalid branch inputs", async () => { const branches = await table.branches(); await expect(branches.create("")).rejects.toThrow("non-empty"); diff --git a/nodejs/lancedb/connection.ts b/nodejs/lancedb/connection.ts index 5ce155789..ef03e276b 100644 --- a/nodejs/lancedb/connection.ts +++ b/nodejs/lancedb/connection.ts @@ -90,6 +90,14 @@ export interface OpenTableOptions { * Reads and writes on the returned table operate in the branch's context. */ branch?: string; + /** + * Open the table pinned to this version, producing a read-only view. + * + * Composes with {@link OpenTableOptions.branch}: when both are set, opens + * that branch at the version; otherwise opens `main` at the version. Call + * `checkoutLatest` to return to a writable state. + */ + version?: number; /** * Configuration for object storage. * @@ -489,9 +497,18 @@ export class LocalConnection extends Connection { options?.indexCacheSize, ); - const table = new LocalTable(innerTable); - if (options?.branch != null) { - return (await table.branches()).checkout(options.branch); + let table: Table = new LocalTable(innerTable); + // "main" is the default branch, so treat it as no branch. On a real branch, + // scope and pin in one step (yielding "version V of branch B"); otherwise + // pin the version, if any, against main. + const branch = + options?.branch != null && options.branch !== "main" + ? options.branch + : undefined; + if (branch != null) { + table = await (await table.branches()).checkout(branch, options?.version); + } else if (options?.version != null) { + await table.checkout(options.version); } return table; } diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index fce768f74..497d809ea 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -1290,9 +1290,15 @@ export class Branches { return new LocalTable(await this.#inner.create(name, fromRef, fromVersion)); } - /** Check out an existing branch and return a handle scoped to it. */ - async checkout(name: string): Promise
{ - return new LocalTable(await this.#inner.checkout(name)); + /** + * Check out an existing branch and return a handle scoped to it. + * + * With `version` set, the returned handle is pinned to that version of the + * branch (a read-only, detached view); otherwise it tracks the branch's + * latest and stays writable. + */ + async checkout(name: string, version?: number): Promise
{ + return new LocalTable(await this.#inner.checkout(name, version)); } /** Delete a branch. */ diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 2d674c67c..7613a18c8 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -1194,8 +1194,18 @@ impl Branches { } #[napi] - pub async fn checkout(&self, name: String) -> napi::Result
{ - let table = self.inner.checkout_branch(&name).await.default_error()?; + pub async fn checkout(&self, name: String, version: Option) -> napi::Result
{ + let version = version + .map(|v| { + u64::try_from(v) + .map_err(|_| napi::Error::from_reason("version must be a non-negative integer")) + }) + .transpose()?; + let table = self + .inner + .checkout_branch(&name, version) + .await + .default_error()?; Ok(Table::new(table)) } diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index db9afda35..7f0aaf9be 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -249,7 +249,7 @@ class Branches: from_ref: Optional[str] = None, from_version: Optional[int] = None, ) -> Table: ... - async def checkout(self, name: str) -> Table: ... + async def checkout(self, name: str, version: Optional[int] = None) -> Table: ... async def delete(self, name: str) -> None: ... class IndexConfig: diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index d7ec073ed..5ec697d4e 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -417,6 +417,7 @@ class DBConnection(EnforceOverrides): storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, branch: Optional[str] = None, + version: Optional[int] = None, ) -> Table: """Open a Lance Table in the database. @@ -448,6 +449,11 @@ class DBConnection(EnforceOverrides): branch: str, optional If provided, open a handle scoped to this branch instead of the default branch. Reads and writes operate in the branch's context. + version: int, optional + If provided, open the table pinned to this version, producing a + read-only handle. Composes with ``branch``: when both are given, + opens that branch at the version; otherwise opens ``main`` at the + version. Call ``checkout_latest`` to return to a writable state. Returns ------- @@ -963,6 +969,7 @@ class LanceDBConnection(DBConnection): storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, branch: Optional[str] = None, + version: Optional[int] = None, ) -> LanceTable: """Open a table in the database. @@ -976,6 +983,11 @@ class LanceDBConnection(DBConnection): branch: str, optional If provided, open a handle scoped to this branch instead of the default branch. Reads and writes operate in the branch's context. + version: int, optional + If provided, open the table pinned to this version, producing a + read-only handle. Composes with ``branch``: when both are given, + opens that branch at the version; otherwise opens ``main`` at the + version. Call ``checkout_latest`` to return to a writable state. Returns ------- @@ -1011,7 +1023,9 @@ class LanceDBConnection(DBConnection): ) if branch is not None: - return tbl.branches.checkout(branch) + tbl = tbl.branches.checkout(branch, version) + elif version is not None: + tbl.checkout(version) return tbl def clone_table( @@ -1654,6 +1668,7 @@ class AsyncConnection(object): namespace_client: Optional[Any] = None, managed_versioning: Optional[bool] = None, branch: Optional[str] = None, + version: Optional[int] = None, ) -> AsyncTable: """Open a Lance Table in the database. @@ -1692,6 +1707,11 @@ class AsyncConnection(object): branch: str, optional If provided, open a handle scoped to this branch instead of the default branch. Reads and writes operate in the branch's context. + version: int, optional + If provided, open the table pinned to this version, producing a + read-only handle. Composes with ``branch``: when both are given, + opens that branch at the version; otherwise opens ``main`` at the + version. Call ``checkout_latest`` to return to a writable state. Returns ------- @@ -1709,8 +1729,12 @@ class AsyncConnection(object): managed_versioning=managed_versioning, ) tbl = AsyncTable(table) - if branch is not None: - return await tbl.branches.checkout(branch) + # "main" is the default branch, so treat it as no branch: remote rejects + # every branch checkout (even "main"), and the version still applies. + if branch is not None and branch != "main": + tbl = await tbl.branches.checkout(branch, version) + elif version is not None: + await tbl.checkout(version) return tbl async def clone_table( diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index ba6749787..ec34c3c4e 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -545,6 +545,7 @@ class LanceNamespaceDBConnection(DBConnection): storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, branch: Optional[str] = None, + version: Optional[int] = None, ) -> Table: if namespace_path is None: namespace_path = [] @@ -572,7 +573,9 @@ class LanceNamespaceDBConnection(DBConnection): _async=async_table, ) if branch is not None: - return tbl.branches.checkout(branch) + tbl = tbl.branches.checkout(branch, version) + elif version is not None: + tbl.checkout(version) return tbl @override @@ -979,6 +982,7 @@ class AsyncLanceNamespaceDBConnection: storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, branch: Optional[str] = None, + version: Optional[int] = None, ) -> AsyncTable: """Open an existing table from the namespace.""" if namespace_path is None: @@ -995,8 +999,12 @@ class AsyncLanceNamespaceDBConnection: table_id = namespace_path + [name] raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}") raise - if branch is not None: - return await tbl.branches.checkout(branch) + # "main" is the default branch, so treat it as no branch (mirrors the + # sync remote path); the version still applies. + if branch is not None and branch != "main": + tbl = await tbl.branches.checkout(branch, version) + elif version is not None: + await tbl.checkout(version) return tbl async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None): diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index 02fb5942f..4e5655fae 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -384,6 +384,7 @@ class RemoteDBConnection(DBConnection): storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, branch: Optional[str] = None, + version: Optional[int] = None, ) -> Table: """Open a Lance Table in the database. @@ -394,6 +395,14 @@ class RemoteDBConnection(DBConnection): namespace_path: List[str], optional The namespace to open the table from. None or empty list represents root namespace. + branch: str, optional + Branching is not yet supported on remote tables, so only the + default branch is accepted (``None`` or ``"main"``); any other + value raises ``NotImplementedError``. + version: int, optional + If provided, open the table pinned to this version, producing a + read-only handle. Call ``checkout_latest`` to return to a writable + state. Returns ------- @@ -401,7 +410,9 @@ class RemoteDBConnection(DBConnection): """ from .table import RemoteTable - if branch is not None: + # Remote supports version time-travel but not branches: reject a non-main + # branch, but allow a version-only open (or "main"). + if branch is not None and branch != "main": raise NotImplementedError("branching is not yet supported on remote tables") if namespace_path is None: @@ -413,12 +424,15 @@ class RemoteDBConnection(DBConnection): ) table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path)) - return RemoteTable( + tbl = RemoteTable( table, self.db_name, connection_state=self.serialize, namespace_path=namespace_path, ) + if version is not None: + tbl.checkout(version) + return tbl def clone_table( self, diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index a2ecf4dbc..a854ef2d5 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -5864,9 +5864,19 @@ class Branches: ) return self._wrap(async_table) - def checkout(self, name: str) -> "LanceTable": - """Check out an existing branch and return a handle scoped to it.""" - async_table = LOOP.run(self._table.branches.checkout(name)) + def checkout(self, name: str, version: Optional[int] = None) -> "LanceTable": + """Check out an existing branch and return a handle scoped to it. + + Parameters + ---------- + name: str + Name of the branch to check out. + version: int, optional + A specific version on the branch to pin. When set, the returned + handle is a read-only view of that version; when omitted it tracks + the branch's latest and stays writable. + """ + async_table = LOOP.run(self._table.branches.checkout(name, version)) return self._wrap(async_table) def delete(self, name: str) -> None: @@ -5991,10 +6001,19 @@ class AsyncBranches: inner = await self._table.branches.create(name, from_ref, from_version) return AsyncTable(inner) - async def checkout(self, name: str) -> "AsyncTable": - """Check out an existing branch and return a handle scoped to it.""" - inner = await self._table.branches.checkout(name) - return AsyncTable(inner) + async def checkout(self, name: str, version: Optional[int] = None) -> "AsyncTable": + """Check out an existing branch and return a handle scoped to it. + + Parameters + ---------- + name: str + Name of the branch to check out. + version: int, optional + A specific version on the branch to pin. When set, the returned + handle is a read-only view of that version; when omitted it tracks + the branch's latest and stays writable. + """ + return AsyncTable(await self._table.branches.checkout(name, version)) async def delete(self, name: str) -> None: """Delete a branch.""" diff --git a/python/python/tests/test_remote_db.py b/python/python/tests/test_remote_db.py index 4cc184c77..da2956428 100644 --- a/python/python/tests/test_remote_db.py +++ b/python/python/tests/test_remote_db.py @@ -154,6 +154,52 @@ async def test_async_checkout(): assert await table.count_rows() == 300 +def test_remote_open_table_branch_and_version(): + def handler(request): + # describe (table open + version validation) always succeeds + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + request.wfile.write( + json.dumps({"version": 2, "schema": {"fields": []}}).encode() + ) + + with mock_lancedb_connection(handler) as db: + # version-only (and "main" + version) is allowed: remote supports + # version time-travel even though it has no branches + assert db.open_table("test", version=2) is not None + assert db.open_table("test", branch="main", version=2) is not None + + # a non-main branch is rejected, with or without a version + with pytest.raises(NotImplementedError, match="branching"): + db.open_table("test", branch="exp") + with pytest.raises(NotImplementedError, match="branching"): + db.open_table("test", branch="exp", version=2) + + +@pytest.mark.asyncio +async def test_async_remote_open_table_branch_and_version(): + def handler(request): + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + request.wfile.write( + json.dumps({"version": 2, "schema": {"fields": []}}).encode() + ) + + async with mock_lancedb_connection_async(handler) as db: + # version-only (and "main" + version) is allowed: "main" is the default + # branch, so it must not hit the unsupported remote branch path + assert await db.open_table("test", version=2) is not None + assert await db.open_table("test", branch="main", version=2) is not None + + # a non-main branch is rejected, with or without a version + with pytest.raises(NotImplementedError, match="branching"): + await db.open_table("test", branch="exp") + with pytest.raises(NotImplementedError, match="branching"): + await db.open_table("test", branch="exp", version=2) + + def test_table_len_sync(): def handler(request): if request.path == "/v1/table/test/create/?mode=create": diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index adad831b5..92570df7a 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -998,6 +998,70 @@ def test_open_table_with_branch(tmp_path): assert db.open_table("t").count_rows() == 1 +def test_open_table_with_branch_version(tmp_path): + db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(0)) + + # main: a single fork-point row + t = db.create_table("t", [{"i": 0}]) + main_v1 = t.version + + # fork "exp", then advance exp AND main independently past the fork so they + # diverge while sharing version numbers + exp = t.branches.create("exp") + exp.add([{"i": 1}]) # exp: {0, 1} + exp_v2 = exp.version + exp.add([{"i": 2}]) # exp HEAD: {0, 1, 2} + t.add([{"i": 100}, {"i": 101}, {"i": 102}]) # main HEAD: {0, 100, 101, 102} + assert exp_v2 == t.version, "branch and main must share the version number" + + # open exp at the shared version: the data must be exp's, not main's. count + # alone cannot prove this (main@v2 also exists), so assert provenance by + # content. + pinned = db.open_table("t", branch="exp", version=exp_v2) + assert pinned.current_branch() == "exp" + assert pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4) + assert pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible + assert pinned.count_rows("i = 100") == 0 # main's divergent rows are invisible + + # the same coordinate is reachable directly via branches.checkout(name, version) + pinned_direct = t.branches.checkout("exp", exp_v2) + assert pinned_direct.current_branch() == "exp" + assert pinned_direct.count_rows() == 2 + + # the HEADs are unaffected + assert db.open_table("t", branch="exp").count_rows() == 3 + assert db.open_table("t").count_rows() == 4 + + # version-only (no branch) time-travels main itself: its fork-point version + # holds only main's first row, and the shared version number resolves to + # main's data, not the branch's ("opens main at the version") + old_main = db.open_table("t", version=main_v1) + assert old_main.current_branch() is None + assert old_main.count_rows() == 1 + shared_on_main = db.open_table("t", version=exp_v2) + assert shared_on_main.current_branch() is None + assert shared_on_main.count_rows() == 4 + + # detached head: writing to a pinned version is rejected + with pytest.raises((ValueError, RuntimeError), match="cannot be modified"): + pinned.add([{"i": 9}]) + + # a nonexistent version is rejected -- on main, and on a branch (a distinct + # resolution path, on the branch's manifests) + with pytest.raises((ValueError, RuntimeError)): + db.open_table("t", version=9999) + with pytest.raises((ValueError, RuntimeError)): + db.open_table("t", branch="exp", version=9999) + + # checkout_latest re-attaches the pinned handle to the BRANCH's HEAD + # (writable again), not main's HEAD, and not staying pinned + pinned.checkout_latest() + assert pinned.current_branch() == "exp" + assert pinned.count_rows() == 3 # exp HEAD, not main's 4 + pinned.add([{"i": 3}]) + assert pinned.count_rows() == 4 # writable again + + @pytest.mark.asyncio async def test_async_namespace_open_table_with_branch(tmp_path): pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace @@ -1012,6 +1076,64 @@ async def test_async_namespace_open_table_with_branch(tmp_path): assert await opened.count_rows() == 2 +def test_namespace_open_table_with_branch_version(tmp_path): + pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace + db = lancedb.connect_namespace("dir", {"root": str(tmp_path)}) + db.create_namespace(["ns1"]) + t = db.create_table("t", [{"i": 0}], namespace_path=["ns1"]) + + # fork "exp", then advance exp AND main past the fork so they diverge while + # sharing version numbers + exp = t.branches.create("exp") + exp.add([{"i": 1}]) + exp_v2 = exp.version + exp.add([{"i": 2}]) + t.add([{"i": 100}, {"i": 101}, {"i": 102}]) + assert exp_v2 == t.version, "branch and main must share the version number" + + # open_table(branch=, version=) on the namespace connection reads the + # branch's data at that version, not main's + pinned = db.open_table("t", namespace_path=["ns1"], branch="exp", version=exp_v2) + assert pinned.current_branch() == "exp" + assert pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4) + assert pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible + assert pinned.count_rows("i = 100") == 0 # main's divergent rows are invisible + assert db.open_table("t", namespace_path=["ns1"], branch="exp").count_rows() == 3 + + +@pytest.mark.asyncio +async def test_async_namespace_open_table_with_branch_version(tmp_path): + pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace + db = lancedb.connect_namespace_async("dir", {"root": str(tmp_path)}) + await db.create_namespace(["ns1"]) + t = await db.create_table("t", [{"i": 0}], namespace_path=["ns1"]) + + # fork "exp", then advance exp AND main past the fork so they diverge while + # sharing version numbers + exp = await t.branches.create("exp") + await exp.add([{"i": 1}]) + exp_v2 = await exp.version() + await exp.add([{"i": 2}]) + await t.add([{"i": 100}, {"i": 101}, {"i": 102}]) + assert exp_v2 == await t.version(), "branch and main must share the version number" + + # open_table(branch=, version=) on the async namespace connection reads the + # branch's data at that version, not main's + pinned = await db.open_table( + "t", namespace_path=["ns1"], branch="exp", version=exp_v2 + ) + assert pinned.current_branch() == "exp" + assert await pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4) + assert await pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible + assert await pinned.count_rows("i = 100") == 0 # main's rows are invisible + assert ( + await ( + await db.open_table("t", namespace_path=["ns1"], branch="exp") + ).count_rows() + == 3 + ) + + def test_branch_to_lance_targets_branch(tmp_path): pytest.importorskip("lance") db = lancedb.connect(tmp_path) @@ -1057,6 +1179,70 @@ async def test_async_branches(tmp_path): assert "exp" not in await table.branches.list() +@pytest.mark.asyncio +async def test_async_open_table_with_branch_version(tmp_path): + db = await lancedb.connect_async(tmp_path, read_consistency_interval=timedelta(0)) + + # main: a single fork-point row + t = await db.create_table("t", [{"i": 0}]) + main_v1 = await t.version() + + # fork "exp", then advance exp AND main independently past the fork so they + # diverge while sharing version numbers + exp = await t.branches.create("exp") + await exp.add([{"i": 1}]) # exp: {0, 1} + exp_v2 = await exp.version() + await exp.add([{"i": 2}]) # exp HEAD: {0, 1, 2} + await t.add([{"i": 100}, {"i": 101}, {"i": 102}]) # main HEAD: {0, 100, 101, 102} + assert exp_v2 == await t.version(), "branch and main must share the version number" + + # open exp at the shared version: the data must be exp's, not main's. count + # alone cannot prove this (main@v2 also exists), so assert provenance by + # content. + pinned = await db.open_table("t", branch="exp", version=exp_v2) + assert pinned.current_branch() == "exp" + assert await pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4) + assert await pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible + assert await pinned.count_rows("i = 100") == 0 # main's rows are invisible + + # the same coordinate is reachable directly via branches.checkout(name, version) + pinned_direct = await t.branches.checkout("exp", exp_v2) + assert pinned_direct.current_branch() == "exp" + assert await pinned_direct.count_rows() == 2 + + # the HEADs are unaffected + assert await (await db.open_table("t", branch="exp")).count_rows() == 3 + assert await (await db.open_table("t")).count_rows() == 4 + + # version-only (no branch) time-travels main itself: its fork-point version + # holds only main's first row, and the shared version number resolves to + # main's data, not the branch's ("opens main at the version") + old_main = await db.open_table("t", version=main_v1) + assert old_main.current_branch() is None + assert await old_main.count_rows() == 1 + shared_on_main = await db.open_table("t", version=exp_v2) + assert shared_on_main.current_branch() is None + assert await shared_on_main.count_rows() == 4 + + # detached head: writing to a pinned version is rejected + with pytest.raises((ValueError, RuntimeError), match="cannot be modified"): + await pinned.add([{"i": 9}]) + + # a nonexistent version is rejected -- on main, and on a branch + with pytest.raises((ValueError, RuntimeError)): + await db.open_table("t", version=9999) + with pytest.raises((ValueError, RuntimeError)): + await db.open_table("t", branch="exp", version=9999) + + # checkout_latest re-attaches the pinned handle to the BRANCH's HEAD + # (writable again), not main's HEAD, and not staying pinned + await pinned.checkout_latest() + assert pinned.current_branch() == "exp" + assert await pinned.count_rows() == 3 # exp HEAD, not main's 4 + await pinned.add([{"i": 3}]) + assert await pinned.count_rows() == 4 # writable again + + @patch("lancedb.table.AsyncTable.create_index") def test_create_index_method(mock_create_index, mem_db: DBConnection): table = mem_db.create_table( diff --git a/python/src/table.rs b/python/src/table.rs index cd85d9ad6..04e5bea5a 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -1321,10 +1321,15 @@ impl Branches { }) } - pub fn checkout(self_: PyRef<'_, Self>, name: String) -> PyResult> { + #[pyo3(signature = (name, version=None))] + pub fn checkout( + self_: PyRef<'_, Self>, + name: String, + version: Option, + ) -> PyResult> { let inner = self_.inner.clone(); future_into_py(self_.py(), async move { - let table = inner.checkout_branch(&name).await.infer_error()?; + let table = inner.checkout_branch(&name, version).await.infer_error()?; Ok(Table::new(table)) }) } diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index a478abfea..53b61641b 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use lance::dataset::ReadParams; +use lance::dataset::refs::MAIN_BRANCH; use lance_namespace::models::{ CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest, DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest, @@ -120,6 +121,7 @@ pub struct OpenTableBuilder { request: OpenTableRequest, embedding_registry: Arc, branch: Option, + version: Option, } impl OpenTableBuilder { @@ -141,6 +143,7 @@ impl OpenTableBuilder { }, embedding_registry, branch: None, + version: None, } } @@ -269,13 +272,39 @@ impl OpenTableBuilder { self } + /// Open the table pinned to a specific version, producing a read-only "view". + /// + /// Composes with [`Self::branch`]: when a branch is also set, this opens that + /// branch at the given version; otherwise it opens `main` at that version. + /// The returned table is a detached head, so operations that modify the table + /// will fail until [`Table::checkout_latest`] is called. + /// + /// ``` + /// # use lancedb::Connection; + /// # async fn f(conn: &Connection) -> Result<(), Box> { + /// let table = conn.open_table("t").branch("exp").version(3).execute().await?; + /// # Ok(()) + /// # } + /// ``` + pub fn version(mut self, version: u64) -> Self { + self.version = Some(version); + self + } + /// Open the table pub async fn execute(self) -> Result
{ let table = self.parent.open_table(self.request).await?; let table = Table::new_with_embedding_registry(table, self.parent, self.embedding_registry); - match self.branch { - Some(branch) => table.checkout_branch(&branch).await, - None => Ok(table), + // "main" is the default branch, so treat it as no branch. + let branch = self.branch.filter(|b| b.as_str() != MAIN_BRANCH); + match branch { + Some(branch) => table.checkout_branch(&branch, self.version).await, + None => { + if let Some(version) = self.version { + table.checkout(version).await?; + } + Ok(table) + } } } } diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index c11584157..e83014d5d 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -983,6 +983,49 @@ mod tests { assert_eq!(table.name(), "table1"); } + #[tokio::test] + async fn test_open_table_branch_and_version() { + // Remote supports version time-travel but not branches. A version-only + // open (or one on the default "main" branch) must succeed; a non-main + // branch must be rejected, with or without a version. + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.url().path(), "/v1/table/t/describe/"); + http::Response::builder() + .status(200) + .body( + r#"{"table": "t", "version": 2, "schema": {"fields": [ + {"name": "a", "type": { "type": "int32" }, "nullable": false} + ]}}"#, + ) + .unwrap() + }); + + // version-only: allowed (open + checkout(version) both round-trip) + conn.open_table("t").version(2).execute().await.unwrap(); + + // "main" is the default branch, so it counts as no branch + conn.open_table("t") + .branch("main") + .version(2) + .execute() + .await + .unwrap(); + + // a non-main branch is rejected, with or without a version + assert!(matches!( + conn.open_table("t").branch("exp").execute().await, + Err(Error::NotSupported { .. }) + )); + assert!(matches!( + conn.open_table("t") + .branch("exp") + .version(2) + .execute() + .await, + Err(Error::NotSupported { .. }) + )); + } + #[tokio::test] async fn test_open_table_not_found() { let conn = Connection::new_with_handler(|_| { diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 4118e0f11..0e5105ff0 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -633,6 +633,23 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { ) -> Result>; /// Check out an existing branch and return a handle scoped to it. async fn checkout_branch(&self, name: &str) -> Result>; + /// Check out an existing branch at an optional version, returning a handle. + /// + /// `None` tracks the branch's latest; `Some(v)` pins it to that version + /// (read-only). The default implementation composes [`Self::checkout_branch`] + /// and [`Self::checkout`]; implementations may override it to resolve the + /// `(branch, version)` coordinate in a single manifest read. + async fn checkout_branch_version( + &self, + name: &str, + version: Option, + ) -> Result> { + let branch = self.checkout_branch(name).await?; + if let Some(version) = version { + branch.checkout(version).await?; + } + Ok(branch) + } /// List the branches of the table. async fn list_branches(&self) -> Result>; /// Delete a branch. @@ -1654,8 +1671,20 @@ impl Table { } /// Check out an existing branch and return a handle scoped to it. - pub async fn checkout_branch(&self, name: &str) -> Result { - let inner = self.inner.checkout_branch(name).await?; + /// + /// With `version` set, the returned handle is pinned to that version of the + /// branch: a read-only, detached view (as with [`Self::checkout`]). With + /// `version` as `None` it tracks the branch's latest and stays writable. + /// + /// ``` + /// # use lancedb::Table; + /// # async fn f(table: &Table) -> Result<(), Box> { + /// let exp_at_v3 = table.checkout_branch("exp", Some(3)).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn checkout_branch(&self, name: &str, version: Option) -> Result { + let inner = self.inner.checkout_branch_version(name, version).await?; Ok(Self { inner, database: self.database.clone(), @@ -2757,6 +2786,29 @@ impl BaseTable for NativeTable { Ok(Arc::new(self.with_dataset(dataset))) } + async fn checkout_branch_version( + &self, + name: &str, + version: Option, + ) -> Result> { + let Some(version) = version else { + return self.checkout_branch(name).await; + }; + Self::validate_branch_name(name, "branch name")?; + // Resolve (branch, version) in a single manifest read. + let branch_ds = self + .dataset + .get() + .await? + .checkout_version((name, version)) + .await?; + let dataset = dataset::DatasetConsistencyWrapper::new_time_travel( + branch_ds, + self.read_consistency_interval, + ); + Ok(Arc::new(self.with_dataset(dataset))) + } + async fn list_branches(&self) -> Result> { Ok(self.dataset.get().await?.list_branches().await?) } @@ -3538,7 +3590,7 @@ mod tests { assert!(branches.contains_key("exp")); // checking out the branch from the main handle sees the branch's latest data - let checked_out = table.checkout_branch("exp").await.unwrap(); + let checked_out = table.checkout_branch("exp", None).await.unwrap(); assert_eq!(checked_out.current_branch().as_deref(), Some("exp")); assert_eq!(checked_out.count_rows(None).await.unwrap(), 2); @@ -3558,6 +3610,186 @@ mod tests { assert!(!branches.contains_key("exp")); } + #[tokio::test] + async fn test_branch_version_checkout() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + + // main: a single fork-point row (i = 0) + let table = conn + .create_table("my_table", sample_rows(vec![0])) + .execute() + .await + .unwrap(); + let fork_point = table.version().await.unwrap(); + + // Fork "exp", then advance exp AND main independently past the fork so + // they diverge while sharing version numbers. + let branch = table.create_branch("exp", fork_point).await.unwrap(); + let exp_fork = branch.version().await.unwrap(); // exp's shallow-clone version + branch.add(sample_rows(vec![1])).execute().await.unwrap(); // exp: {0, 1} + let exp_v2 = branch.version().await.unwrap(); + branch.add(sample_rows(vec![2])).execute().await.unwrap(); // exp HEAD: {0, 1, 2} + + // main's own commit reaches the SAME version number with different data + table + .add(sample_rows(vec![100, 101, 102])) + .execute() + .await + .unwrap(); // main HEAD: {0, 100, 101, 102} + let main_v2 = table.version().await.unwrap(); + assert_eq!( + exp_v2, main_v2, + "branch and main must share the version number for this test to mean anything" + ); + + // Open exp at the shared version. The data must be exp's, not main's: + // count alone cannot prove this (main@v2 differs), so assert provenance + // by content. + let pinned = conn + .open_table("my_table") + .branch("exp") + .version(exp_v2) + .execute() + .await + .unwrap(); + assert_eq!(pinned.current_branch().as_deref(), Some("exp")); + // isolated from exp's HEAD (3 rows) and from main@v2 (4 rows) + assert_eq!(pinned.count_rows(None).await.unwrap(), 2); + // exp's post-fork row is visible; main's divergent rows are not + assert_eq!( + pinned.count_rows(Some("i = 1".to_string())).await.unwrap(), + 1 + ); + assert_eq!( + pinned + .count_rows(Some("i = 100".to_string())) + .await + .unwrap(), + 0 + ); + + // the same coordinate is reachable directly via checkout_branch(name, version) + let pinned_direct = table.checkout_branch("exp", Some(exp_v2)).await.unwrap(); + assert_eq!(pinned_direct.current_branch().as_deref(), Some("exp")); + assert_eq!(pinned_direct.count_rows(None).await.unwrap(), 2); + + // the HEADs are unaffected + let head = conn + .open_table("my_table") + .branch("exp") + .execute() + .await + .unwrap(); + assert_eq!(head.count_rows(None).await.unwrap(), 3); + assert_eq!(table.count_rows(None).await.unwrap(), 4); + + // a pinned version is a detached head: writes are rejected + assert!(pinned.add(sample_rows(vec![9])).execute().await.is_err()); + + // version-only (no branch) time-travels main itself: its fork-point + // version holds only main's first row, and the shared version number + // resolves to main's data, not the branch's ("opens main at the version") + let old_main = conn + .open_table("my_table") + .version(fork_point) + .execute() + .await + .unwrap(); + assert_eq!(old_main.current_branch(), None); + assert_eq!(old_main.count_rows(None).await.unwrap(), 1); + let shared_on_main = conn + .open_table("my_table") + .version(exp_v2) + .execute() + .await + .unwrap(); + assert_eq!(shared_on_main.current_branch(), None); + assert_eq!(shared_on_main.count_rows(None).await.unwrap(), 4); + + // a nonexistent version is rejected + assert!( + conn.open_table("my_table") + .version(9999) + .execute() + .await + .is_err() + ); + + // a nonexistent version on a branch is rejected too: this resolves on + // the branch's path, a distinct miss from the main lookup above + assert!( + conn.open_table("my_table") + .branch("exp") + .version(9999) + .execute() + .await + .is_err() + ); + + // opening the branch at its fork point (the shallow-clone manifest) + // shows just the cloned state: main's fork-point row + let exp_at_fork = conn + .open_table("my_table") + .branch("exp") + .version(exp_fork) + .execute() + .await + .unwrap(); + assert_eq!(exp_at_fork.current_branch().as_deref(), Some("exp")); + assert_eq!(exp_at_fork.count_rows(None).await.unwrap(), 1); + + // checkout_latest re-attaches the pinned handle to the BRANCH's HEAD + // (writable again), not main's HEAD, and not staying pinned + pinned.checkout_latest().await.unwrap(); + assert_eq!(pinned.current_branch().as_deref(), Some("exp")); + assert_eq!(pinned.count_rows(None).await.unwrap(), 3); // exp HEAD, not main's 4 + pinned.add(sample_rows(vec![3])).execute().await.unwrap(); + assert_eq!(pinned.count_rows(None).await.unwrap(), 4); // writable again + } + + #[tokio::test] + async fn test_branch_version_two_branches() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + + let table = conn + .create_table("my_table", sample_rows(vec![0])) + .execute() + .await + .unwrap(); + let fork_point = table.version().await.unwrap(); + + // two branches off the same point, each advanced once so they reach the + // SAME version number with divergent data + let exp1 = table.create_branch("exp1", fork_point).await.unwrap(); + let exp2 = table.create_branch("exp2", fork_point).await.unwrap(); + exp1.add(sample_rows(vec![10])).execute().await.unwrap(); + exp2.add(sample_rows(vec![20])).execute().await.unwrap(); + let v1 = exp1.version().await.unwrap(); + let v2 = exp2.version().await.unwrap(); + assert_eq!(v1, v2, "both branches must reach the same version number"); + + // that shared version number resolves to each branch's own data + let at1 = table.checkout_branch("exp1", Some(v1)).await.unwrap(); + assert_eq!(at1.count_rows(Some("i = 10".to_string())).await.unwrap(), 1); + assert_eq!(at1.count_rows(Some("i = 20".to_string())).await.unwrap(), 0); + let at2 = table.checkout_branch("exp2", Some(v2)).await.unwrap(); + assert_eq!(at2.count_rows(Some("i = 20".to_string())).await.unwrap(), 1); + assert_eq!(at2.count_rows(Some("i = 10".to_string())).await.unwrap(), 0); + } + #[tokio::test] async fn test_branch_name_validation() { let tmp_dir = tempdir().unwrap(); @@ -3575,7 +3807,7 @@ mod tests { Err(Error::InvalidInput { .. }) )); assert!(matches!( - table.checkout_branch("").await, + table.checkout_branch("", None).await, Err(Error::InvalidInput { .. }) )); assert!(matches!( @@ -4014,6 +4246,19 @@ mod tests { Box::new(RecordBatchIterator::new(vec![batch], schema)) } + /// A single-batch reader holding the given `i` (Int32) values. Lets a test + /// write distinguishable rows so it can assert data provenance, not row count. + fn sample_rows(values: Vec) -> Box { + let batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(values))], + ) + .unwrap(); + let schema = batch.schema().clone(); + + Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema)) + } + #[tokio::test] async fn test_create_scalar_index() { let tmp_dir = tempdir().unwrap(); diff --git a/rust/lancedb/src/table/dataset.rs b/rust/lancedb/src/table/dataset.rs index 1ff11198e..7ccca744a 100644 --- a/rust/lancedb/src/table/dataset.rs +++ b/rust/lancedb/src/table/dataset.rs @@ -76,6 +76,23 @@ impl DatasetConsistencyWrapper { } } + /// Create a new wrapper pinned to the dataset's current version. + /// + /// `dataset` must already be checked out at the desired version; this pins + /// to `dataset.version()` without re-resolving. The wrapper is read-only + /// (time-travel) until [`as_latest`](Self::as_latest) re-attaches it to the + /// latest version. + pub fn new_time_travel(dataset: Dataset, read_consistency_interval: Option) -> Self { + let version = dataset.version().version; + let wrapper = Self::new_latest(dataset, read_consistency_interval); + wrapper + .state + .lock() + .unwrap_or_else(|e| e.into_inner()) + .pinned_version = Some(version); + wrapper + } + /// The MemWAL `ShardWriter` cache co-located with this dataset. pub(crate) fn shard_writer(&self) -> &Arc { &self.shard_writer