mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-05 05:10:41 +00:00
feat: support checking out a version on a branch
This commit is contained in:
@@ -16,15 +16,21 @@ to the branch; writes on it do not affect `main`.
|
||||
### checkout()
|
||||
|
||||
```ts
|
||||
checkout(name): Promise<Table>
|
||||
checkout(name, version?): Promise<Table>
|
||||
```
|
||||
|
||||
Check out an existing branch and return a handle scoped to it.
|
||||
|
||||
With `version` set, the returned handle is pinned to that version of the
|
||||
branch (a read-only, detached view); otherwise it tracks the branch's
|
||||
latest and stays writable.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **name**: `string`
|
||||
|
||||
* **version?**: `number`
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`Table`](Table.md)>
|
||||
|
||||
@@ -55,3 +55,17 @@ Options already set on the connection will be inherited by the table,
|
||||
but can be overridden here.
|
||||
|
||||
The available options are described at https://docs.lancedb.com/storage/
|
||||
|
||||
***
|
||||
|
||||
### version?
|
||||
|
||||
```ts
|
||||
optional version: number;
|
||||
```
|
||||
|
||||
Open the table pinned to this version, producing a read-only view.
|
||||
|
||||
Composes with [OpenTableOptions.branch](OpenTableOptions.md#branch): when both are set, opens
|
||||
that branch at the version; otherwise opens `main` at the version. Call
|
||||
`checkoutLatest` to return to a writable state.
|
||||
|
||||
@@ -191,6 +191,34 @@ describe("remote connection", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("allows version on remote but rejects a non-main branch", async () => {
|
||||
await withMockDatabase(
|
||||
(_req, res) => {
|
||||
// describe (table open + version validation) always succeeds
|
||||
const body = JSON.stringify({
|
||||
name: "t",
|
||||
version: 2,
|
||||
schema: { fields: [] },
|
||||
});
|
||||
res.writeHead(200, { "Content-Type": "application/json" }).end(body);
|
||||
},
|
||||
async (db) => {
|
||||
// version-only (and "main" + version) is allowed: remote supports
|
||||
// version time-travel even though it has no branches
|
||||
await db.openTable("t", undefined, { version: 2 });
|
||||
await db.openTable("t", undefined, { branch: "main", version: 2 });
|
||||
|
||||
// a non-main branch is rejected, with or without a version
|
||||
await expect(
|
||||
db.openTable("t", undefined, { branch: "exp" }),
|
||||
).rejects.toThrow(/branching/);
|
||||
await expect(
|
||||
db.openTable("t", undefined, { branch: "exp", version: 2 }),
|
||||
).rejects.toThrow(/branching/);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe("TlsConfig", () => {
|
||||
it("should create TlsConfig with all fields", () => {
|
||||
const tlsConfig: TlsConfig = {
|
||||
|
||||
@@ -133,6 +133,78 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
expect(await (await db.openTable("some_table")).countRows()).toBe(1);
|
||||
});
|
||||
|
||||
it("should open a branch at a version isolated from main and HEAD", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
// main: a single fork-point row
|
||||
const t = await db.createTable("bv_table", [{ id: 0 }]);
|
||||
const mainV1 = await t.version();
|
||||
|
||||
// fork "exp", then advance exp AND main independently past the fork so
|
||||
// they diverge while sharing version numbers
|
||||
const exp = await (await t.branches()).create("exp");
|
||||
await exp.add([{ id: 1 }]); // exp: {0, 1}
|
||||
const expV2 = await exp.version();
|
||||
await exp.add([{ id: 2 }]); // exp HEAD: {0, 1, 2}
|
||||
await t.add([{ id: 100 }, { id: 101 }, { id: 102 }]); // main HEAD: {0,100,101,102}
|
||||
expect(await t.version()).toBe(expV2);
|
||||
|
||||
// open exp at the shared version: the data must be exp's, not main's.
|
||||
// count alone cannot prove this (main@v2 also exists), so assert
|
||||
// provenance by content.
|
||||
const pinned = await db.openTable("bv_table", undefined, {
|
||||
branch: "exp",
|
||||
version: expV2,
|
||||
});
|
||||
expect(await pinned.countRows()).toBe(2); // not exp HEAD (3), not main@v2 (4)
|
||||
expect(await pinned.countRows("id = 1")).toBe(1); // exp's post-fork row
|
||||
expect(await pinned.countRows("id = 100")).toBe(0); // main's rows invisible
|
||||
|
||||
// the same coordinate is reachable directly via branches().checkout(name, version)
|
||||
const pinnedDirect = await (await t.branches()).checkout("exp", expV2);
|
||||
expect(await pinnedDirect.countRows()).toBe(2);
|
||||
|
||||
// the HEADs are unaffected
|
||||
expect(
|
||||
await (
|
||||
await db.openTable("bv_table", undefined, { branch: "exp" })
|
||||
).countRows(),
|
||||
).toBe(3);
|
||||
expect(await (await db.openTable("bv_table")).countRows()).toBe(4);
|
||||
|
||||
// version-only (no branch) time-travels main itself: its fork-point
|
||||
// version holds only main's first row, and the shared version number
|
||||
// resolves to main's data, not the branch's ("opens main at the version")
|
||||
const oldMain = await db.openTable("bv_table", undefined, {
|
||||
version: mainV1,
|
||||
});
|
||||
expect(await oldMain.countRows()).toBe(1);
|
||||
const sharedOnMain = await db.openTable("bv_table", undefined, {
|
||||
version: expV2,
|
||||
});
|
||||
expect(await sharedOnMain.countRows()).toBe(4); // main@v2, not exp@v2 (2)
|
||||
|
||||
// detached head: writing to a pinned version is rejected
|
||||
await expect(pinned.add([{ id: 9 }])).rejects.toThrow(
|
||||
/cannot be modified/,
|
||||
);
|
||||
|
||||
// a nonexistent version is rejected -- on main, and on a branch (a
|
||||
// distinct resolution path, on the branch's manifests)
|
||||
await expect(
|
||||
db.openTable("bv_table", undefined, { version: 9999 }),
|
||||
).rejects.toThrow();
|
||||
await expect(
|
||||
db.openTable("bv_table", undefined, { branch: "exp", version: 9999 }),
|
||||
).rejects.toThrow();
|
||||
|
||||
// checkoutLatest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
// (writable again), not main's HEAD (4), and not staying pinned (2)
|
||||
await pinned.checkoutLatest();
|
||||
expect(await pinned.countRows()).toBe(3); // exp HEAD
|
||||
await pinned.add([{ id: 3 }]);
|
||||
expect(await pinned.countRows()).toBe(4); // writable again
|
||||
});
|
||||
|
||||
it("rejects invalid branch inputs", async () => {
|
||||
const branches = await table.branches();
|
||||
await expect(branches.create("")).rejects.toThrow("non-empty");
|
||||
|
||||
@@ -90,6 +90,14 @@ export interface OpenTableOptions {
|
||||
* Reads and writes on the returned table operate in the branch's context.
|
||||
*/
|
||||
branch?: string;
|
||||
/**
|
||||
* Open the table pinned to this version, producing a read-only view.
|
||||
*
|
||||
* Composes with {@link OpenTableOptions.branch}: when both are set, opens
|
||||
* that branch at the version; otherwise opens `main` at the version. Call
|
||||
* `checkoutLatest` to return to a writable state.
|
||||
*/
|
||||
version?: number;
|
||||
/**
|
||||
* Configuration for object storage.
|
||||
*
|
||||
@@ -489,9 +497,18 @@ export class LocalConnection extends Connection {
|
||||
options?.indexCacheSize,
|
||||
);
|
||||
|
||||
const table = new LocalTable(innerTable);
|
||||
if (options?.branch != null) {
|
||||
return (await table.branches()).checkout(options.branch);
|
||||
let table: Table = new LocalTable(innerTable);
|
||||
// "main" is the default branch, so treat it as no branch. On a real branch,
|
||||
// scope and pin in one step (yielding "version V of branch B"); otherwise
|
||||
// pin the version, if any, against main.
|
||||
const branch =
|
||||
options?.branch != null && options.branch !== "main"
|
||||
? options.branch
|
||||
: undefined;
|
||||
if (branch != null) {
|
||||
table = await (await table.branches()).checkout(branch, options?.version);
|
||||
} else if (options?.version != null) {
|
||||
await table.checkout(options.version);
|
||||
}
|
||||
return table;
|
||||
}
|
||||
|
||||
@@ -1290,9 +1290,15 @@ export class Branches {
|
||||
return new LocalTable(await this.#inner.create(name, fromRef, fromVersion));
|
||||
}
|
||||
|
||||
/** Check out an existing branch and return a handle scoped to it. */
|
||||
async checkout(name: string): Promise<Table> {
|
||||
return new LocalTable(await this.#inner.checkout(name));
|
||||
/**
|
||||
* Check out an existing branch and return a handle scoped to it.
|
||||
*
|
||||
* With `version` set, the returned handle is pinned to that version of the
|
||||
* branch (a read-only, detached view); otherwise it tracks the branch's
|
||||
* latest and stays writable.
|
||||
*/
|
||||
async checkout(name: string, version?: number): Promise<Table> {
|
||||
return new LocalTable(await this.#inner.checkout(name, version));
|
||||
}
|
||||
|
||||
/** Delete a branch. */
|
||||
|
||||
@@ -1194,8 +1194,18 @@ impl Branches {
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn checkout(&self, name: String) -> napi::Result<Table> {
|
||||
let table = self.inner.checkout_branch(&name).await.default_error()?;
|
||||
pub async fn checkout(&self, name: String, version: Option<i64>) -> napi::Result<Table> {
|
||||
let version = version
|
||||
.map(|v| {
|
||||
u64::try_from(v)
|
||||
.map_err(|_| napi::Error::from_reason("version must be a non-negative integer"))
|
||||
})
|
||||
.transpose()?;
|
||||
let table = self
|
||||
.inner
|
||||
.checkout_branch(&name, version)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(Table::new(table))
|
||||
}
|
||||
|
||||
|
||||
@@ -249,7 +249,7 @@ class Branches:
|
||||
from_ref: Optional[str] = None,
|
||||
from_version: Optional[int] = None,
|
||||
) -> Table: ...
|
||||
async def checkout(self, name: str) -> Table: ...
|
||||
async def checkout(self, name: str, version: Optional[int] = None) -> Table: ...
|
||||
async def delete(self, name: str) -> None: ...
|
||||
|
||||
class IndexConfig:
|
||||
|
||||
@@ -417,6 +417,7 @@ class DBConnection(EnforceOverrides):
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> Table:
|
||||
"""Open a Lance Table in the database.
|
||||
|
||||
@@ -448,6 +449,11 @@ class DBConnection(EnforceOverrides):
|
||||
branch: str, optional
|
||||
If provided, open a handle scoped to this branch instead of the
|
||||
default branch. Reads and writes operate in the branch's context.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Composes with ``branch``: when both are given,
|
||||
opens that branch at the version; otherwise opens ``main`` at the
|
||||
version. Call ``checkout_latest`` to return to a writable state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -963,6 +969,7 @@ class LanceDBConnection(DBConnection):
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> LanceTable:
|
||||
"""Open a table in the database.
|
||||
|
||||
@@ -976,6 +983,11 @@ class LanceDBConnection(DBConnection):
|
||||
branch: str, optional
|
||||
If provided, open a handle scoped to this branch instead of the
|
||||
default branch. Reads and writes operate in the branch's context.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Composes with ``branch``: when both are given,
|
||||
opens that branch at the version; otherwise opens ``main`` at the
|
||||
version. Call ``checkout_latest`` to return to a writable state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -1011,7 +1023,9 @@ class LanceDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
if branch is not None:
|
||||
return tbl.branches.checkout(branch)
|
||||
tbl = tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
def clone_table(
|
||||
@@ -1654,6 +1668,7 @@ class AsyncConnection(object):
|
||||
namespace_client: Optional[Any] = None,
|
||||
managed_versioning: Optional[bool] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> AsyncTable:
|
||||
"""Open a Lance Table in the database.
|
||||
|
||||
@@ -1692,6 +1707,11 @@ class AsyncConnection(object):
|
||||
branch: str, optional
|
||||
If provided, open a handle scoped to this branch instead of the
|
||||
default branch. Reads and writes operate in the branch's context.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Composes with ``branch``: when both are given,
|
||||
opens that branch at the version; otherwise opens ``main`` at the
|
||||
version. Call ``checkout_latest`` to return to a writable state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -1709,8 +1729,12 @@ class AsyncConnection(object):
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
tbl = AsyncTable(table)
|
||||
if branch is not None:
|
||||
return await tbl.branches.checkout(branch)
|
||||
# "main" is the default branch, so treat it as no branch: remote rejects
|
||||
# every branch checkout (even "main"), and the version still applies.
|
||||
if branch is not None and branch != "main":
|
||||
tbl = await tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
await tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
async def clone_table(
|
||||
|
||||
@@ -545,6 +545,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> Table:
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
@@ -572,7 +573,9 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
_async=async_table,
|
||||
)
|
||||
if branch is not None:
|
||||
return tbl.branches.checkout(branch)
|
||||
tbl = tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
@override
|
||||
@@ -979,6 +982,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> AsyncTable:
|
||||
"""Open an existing table from the namespace."""
|
||||
if namespace_path is None:
|
||||
@@ -995,8 +999,12 @@ class AsyncLanceNamespaceDBConnection:
|
||||
table_id = namespace_path + [name]
|
||||
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
|
||||
raise
|
||||
if branch is not None:
|
||||
return await tbl.branches.checkout(branch)
|
||||
# "main" is the default branch, so treat it as no branch (mirrors the
|
||||
# sync remote path); the version still applies.
|
||||
if branch is not None and branch != "main":
|
||||
tbl = await tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
await tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
|
||||
@@ -384,6 +384,7 @@ class RemoteDBConnection(DBConnection):
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> Table:
|
||||
"""Open a Lance Table in the database.
|
||||
|
||||
@@ -394,6 +395,14 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from.
|
||||
None or empty list represents root namespace.
|
||||
branch: str, optional
|
||||
Branching is not yet supported on remote tables, so only the
|
||||
default branch is accepted (``None`` or ``"main"``); any other
|
||||
value raises ``NotImplementedError``.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Call ``checkout_latest`` to return to a writable
|
||||
state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -401,7 +410,9 @@ class RemoteDBConnection(DBConnection):
|
||||
"""
|
||||
from .table import RemoteTable
|
||||
|
||||
if branch is not None:
|
||||
# Remote supports version time-travel but not branches: reject a non-main
|
||||
# branch, but allow a version-only open (or "main").
|
||||
if branch is not None and branch != "main":
|
||||
raise NotImplementedError("branching is not yet supported on remote tables")
|
||||
|
||||
if namespace_path is None:
|
||||
@@ -413,12 +424,15 @@ class RemoteDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path))
|
||||
return RemoteTable(
|
||||
tbl = RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=namespace_path,
|
||||
)
|
||||
if version is not None:
|
||||
tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
|
||||
@@ -5864,9 +5864,19 @@ class Branches:
|
||||
)
|
||||
return self._wrap(async_table)
|
||||
|
||||
def checkout(self, name: str) -> "LanceTable":
|
||||
"""Check out an existing branch and return a handle scoped to it."""
|
||||
async_table = LOOP.run(self._table.branches.checkout(name))
|
||||
def checkout(self, name: str, version: Optional[int] = None) -> "LanceTable":
|
||||
"""Check out an existing branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the branch to check out.
|
||||
version: int, optional
|
||||
A specific version on the branch to pin. When set, the returned
|
||||
handle is a read-only view of that version; when omitted it tracks
|
||||
the branch's latest and stays writable.
|
||||
"""
|
||||
async_table = LOOP.run(self._table.branches.checkout(name, version))
|
||||
return self._wrap(async_table)
|
||||
|
||||
def delete(self, name: str) -> None:
|
||||
@@ -5991,10 +6001,19 @@ class AsyncBranches:
|
||||
inner = await self._table.branches.create(name, from_ref, from_version)
|
||||
return AsyncTable(inner)
|
||||
|
||||
async def checkout(self, name: str) -> "AsyncTable":
|
||||
"""Check out an existing branch and return a handle scoped to it."""
|
||||
inner = await self._table.branches.checkout(name)
|
||||
return AsyncTable(inner)
|
||||
async def checkout(self, name: str, version: Optional[int] = None) -> "AsyncTable":
|
||||
"""Check out an existing branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the branch to check out.
|
||||
version: int, optional
|
||||
A specific version on the branch to pin. When set, the returned
|
||||
handle is a read-only view of that version; when omitted it tracks
|
||||
the branch's latest and stays writable.
|
||||
"""
|
||||
return AsyncTable(await self._table.branches.checkout(name, version))
|
||||
|
||||
async def delete(self, name: str) -> None:
|
||||
"""Delete a branch."""
|
||||
|
||||
@@ -154,6 +154,52 @@ async def test_async_checkout():
|
||||
assert await table.count_rows() == 300
|
||||
|
||||
|
||||
def test_remote_open_table_branch_and_version():
|
||||
def handler(request):
|
||||
# describe (table open + version validation) always succeeds
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(
|
||||
json.dumps({"version": 2, "schema": {"fields": []}}).encode()
|
||||
)
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
# version-only (and "main" + version) is allowed: remote supports
|
||||
# version time-travel even though it has no branches
|
||||
assert db.open_table("test", version=2) is not None
|
||||
assert db.open_table("test", branch="main", version=2) is not None
|
||||
|
||||
# a non-main branch is rejected, with or without a version
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
db.open_table("test", branch="exp")
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
db.open_table("test", branch="exp", version=2)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_remote_open_table_branch_and_version():
|
||||
def handler(request):
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(
|
||||
json.dumps({"version": 2, "schema": {"fields": []}}).encode()
|
||||
)
|
||||
|
||||
async with mock_lancedb_connection_async(handler) as db:
|
||||
# version-only (and "main" + version) is allowed: "main" is the default
|
||||
# branch, so it must not hit the unsupported remote branch path
|
||||
assert await db.open_table("test", version=2) is not None
|
||||
assert await db.open_table("test", branch="main", version=2) is not None
|
||||
|
||||
# a non-main branch is rejected, with or without a version
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
await db.open_table("test", branch="exp")
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
await db.open_table("test", branch="exp", version=2)
|
||||
|
||||
|
||||
def test_table_len_sync():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=create":
|
||||
|
||||
@@ -998,6 +998,70 @@ def test_open_table_with_branch(tmp_path):
|
||||
assert db.open_table("t").count_rows() == 1
|
||||
|
||||
|
||||
def test_open_table_with_branch_version(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(0))
|
||||
|
||||
# main: a single fork-point row
|
||||
t = db.create_table("t", [{"i": 0}])
|
||||
main_v1 = t.version
|
||||
|
||||
# fork "exp", then advance exp AND main independently past the fork so they
|
||||
# diverge while sharing version numbers
|
||||
exp = t.branches.create("exp")
|
||||
exp.add([{"i": 1}]) # exp: {0, 1}
|
||||
exp_v2 = exp.version
|
||||
exp.add([{"i": 2}]) # exp HEAD: {0, 1, 2}
|
||||
t.add([{"i": 100}, {"i": 101}, {"i": 102}]) # main HEAD: {0, 100, 101, 102}
|
||||
assert exp_v2 == t.version, "branch and main must share the version number"
|
||||
|
||||
# open exp at the shared version: the data must be exp's, not main's. count
|
||||
# alone cannot prove this (main@v2 also exists), so assert provenance by
|
||||
# content.
|
||||
pinned = db.open_table("t", branch="exp", version=exp_v2)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert pinned.count_rows("i = 100") == 0 # main's divergent rows are invisible
|
||||
|
||||
# the same coordinate is reachable directly via branches.checkout(name, version)
|
||||
pinned_direct = t.branches.checkout("exp", exp_v2)
|
||||
assert pinned_direct.current_branch() == "exp"
|
||||
assert pinned_direct.count_rows() == 2
|
||||
|
||||
# the HEADs are unaffected
|
||||
assert db.open_table("t", branch="exp").count_rows() == 3
|
||||
assert db.open_table("t").count_rows() == 4
|
||||
|
||||
# version-only (no branch) time-travels main itself: its fork-point version
|
||||
# holds only main's first row, and the shared version number resolves to
|
||||
# main's data, not the branch's ("opens main at the version")
|
||||
old_main = db.open_table("t", version=main_v1)
|
||||
assert old_main.current_branch() is None
|
||||
assert old_main.count_rows() == 1
|
||||
shared_on_main = db.open_table("t", version=exp_v2)
|
||||
assert shared_on_main.current_branch() is None
|
||||
assert shared_on_main.count_rows() == 4
|
||||
|
||||
# detached head: writing to a pinned version is rejected
|
||||
with pytest.raises((ValueError, RuntimeError), match="cannot be modified"):
|
||||
pinned.add([{"i": 9}])
|
||||
|
||||
# a nonexistent version is rejected -- on main, and on a branch (a distinct
|
||||
# resolution path, on the branch's manifests)
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
db.open_table("t", version=9999)
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
db.open_table("t", branch="exp", version=9999)
|
||||
|
||||
# checkout_latest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
# (writable again), not main's HEAD, and not staying pinned
|
||||
pinned.checkout_latest()
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert pinned.count_rows() == 3 # exp HEAD, not main's 4
|
||||
pinned.add([{"i": 3}])
|
||||
assert pinned.count_rows() == 4 # writable again
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_namespace_open_table_with_branch(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
@@ -1012,6 +1076,64 @@ async def test_async_namespace_open_table_with_branch(tmp_path):
|
||||
assert await opened.count_rows() == 2
|
||||
|
||||
|
||||
def test_namespace_open_table_with_branch_version(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
db = lancedb.connect_namespace("dir", {"root": str(tmp_path)})
|
||||
db.create_namespace(["ns1"])
|
||||
t = db.create_table("t", [{"i": 0}], namespace_path=["ns1"])
|
||||
|
||||
# fork "exp", then advance exp AND main past the fork so they diverge while
|
||||
# sharing version numbers
|
||||
exp = t.branches.create("exp")
|
||||
exp.add([{"i": 1}])
|
||||
exp_v2 = exp.version
|
||||
exp.add([{"i": 2}])
|
||||
t.add([{"i": 100}, {"i": 101}, {"i": 102}])
|
||||
assert exp_v2 == t.version, "branch and main must share the version number"
|
||||
|
||||
# open_table(branch=, version=) on the namespace connection reads the
|
||||
# branch's data at that version, not main's
|
||||
pinned = db.open_table("t", namespace_path=["ns1"], branch="exp", version=exp_v2)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert pinned.count_rows("i = 100") == 0 # main's divergent rows are invisible
|
||||
assert db.open_table("t", namespace_path=["ns1"], branch="exp").count_rows() == 3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_namespace_open_table_with_branch_version(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
db = lancedb.connect_namespace_async("dir", {"root": str(tmp_path)})
|
||||
await db.create_namespace(["ns1"])
|
||||
t = await db.create_table("t", [{"i": 0}], namespace_path=["ns1"])
|
||||
|
||||
# fork "exp", then advance exp AND main past the fork so they diverge while
|
||||
# sharing version numbers
|
||||
exp = await t.branches.create("exp")
|
||||
await exp.add([{"i": 1}])
|
||||
exp_v2 = await exp.version()
|
||||
await exp.add([{"i": 2}])
|
||||
await t.add([{"i": 100}, {"i": 101}, {"i": 102}])
|
||||
assert exp_v2 == await t.version(), "branch and main must share the version number"
|
||||
|
||||
# open_table(branch=, version=) on the async namespace connection reads the
|
||||
# branch's data at that version, not main's
|
||||
pinned = await db.open_table(
|
||||
"t", namespace_path=["ns1"], branch="exp", version=exp_v2
|
||||
)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert await pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert await pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert await pinned.count_rows("i = 100") == 0 # main's rows are invisible
|
||||
assert (
|
||||
await (
|
||||
await db.open_table("t", namespace_path=["ns1"], branch="exp")
|
||||
).count_rows()
|
||||
== 3
|
||||
)
|
||||
|
||||
|
||||
def test_branch_to_lance_targets_branch(tmp_path):
|
||||
pytest.importorskip("lance")
|
||||
db = lancedb.connect(tmp_path)
|
||||
@@ -1057,6 +1179,70 @@ async def test_async_branches(tmp_path):
|
||||
assert "exp" not in await table.branches.list()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_open_table_with_branch_version(tmp_path):
|
||||
db = await lancedb.connect_async(tmp_path, read_consistency_interval=timedelta(0))
|
||||
|
||||
# main: a single fork-point row
|
||||
t = await db.create_table("t", [{"i": 0}])
|
||||
main_v1 = await t.version()
|
||||
|
||||
# fork "exp", then advance exp AND main independently past the fork so they
|
||||
# diverge while sharing version numbers
|
||||
exp = await t.branches.create("exp")
|
||||
await exp.add([{"i": 1}]) # exp: {0, 1}
|
||||
exp_v2 = await exp.version()
|
||||
await exp.add([{"i": 2}]) # exp HEAD: {0, 1, 2}
|
||||
await t.add([{"i": 100}, {"i": 101}, {"i": 102}]) # main HEAD: {0, 100, 101, 102}
|
||||
assert exp_v2 == await t.version(), "branch and main must share the version number"
|
||||
|
||||
# open exp at the shared version: the data must be exp's, not main's. count
|
||||
# alone cannot prove this (main@v2 also exists), so assert provenance by
|
||||
# content.
|
||||
pinned = await db.open_table("t", branch="exp", version=exp_v2)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert await pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert await pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert await pinned.count_rows("i = 100") == 0 # main's rows are invisible
|
||||
|
||||
# the same coordinate is reachable directly via branches.checkout(name, version)
|
||||
pinned_direct = await t.branches.checkout("exp", exp_v2)
|
||||
assert pinned_direct.current_branch() == "exp"
|
||||
assert await pinned_direct.count_rows() == 2
|
||||
|
||||
# the HEADs are unaffected
|
||||
assert await (await db.open_table("t", branch="exp")).count_rows() == 3
|
||||
assert await (await db.open_table("t")).count_rows() == 4
|
||||
|
||||
# version-only (no branch) time-travels main itself: its fork-point version
|
||||
# holds only main's first row, and the shared version number resolves to
|
||||
# main's data, not the branch's ("opens main at the version")
|
||||
old_main = await db.open_table("t", version=main_v1)
|
||||
assert old_main.current_branch() is None
|
||||
assert await old_main.count_rows() == 1
|
||||
shared_on_main = await db.open_table("t", version=exp_v2)
|
||||
assert shared_on_main.current_branch() is None
|
||||
assert await shared_on_main.count_rows() == 4
|
||||
|
||||
# detached head: writing to a pinned version is rejected
|
||||
with pytest.raises((ValueError, RuntimeError), match="cannot be modified"):
|
||||
await pinned.add([{"i": 9}])
|
||||
|
||||
# a nonexistent version is rejected -- on main, and on a branch
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
await db.open_table("t", version=9999)
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
await db.open_table("t", branch="exp", version=9999)
|
||||
|
||||
# checkout_latest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
# (writable again), not main's HEAD, and not staying pinned
|
||||
await pinned.checkout_latest()
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert await pinned.count_rows() == 3 # exp HEAD, not main's 4
|
||||
await pinned.add([{"i": 3}])
|
||||
assert await pinned.count_rows() == 4 # writable again
|
||||
|
||||
|
||||
@patch("lancedb.table.AsyncTable.create_index")
|
||||
def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
table = mem_db.create_table(
|
||||
|
||||
@@ -1321,10 +1321,15 @@ impl Branches {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn checkout(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
|
||||
#[pyo3(signature = (name, version=None))]
|
||||
pub fn checkout(
|
||||
self_: PyRef<'_, Self>,
|
||||
name: String,
|
||||
version: Option<u64>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let table = inner.checkout_branch(&name).await.infer_error()?;
|
||||
let table = inner.checkout_branch(&name, version).await.infer_error()?;
|
||||
Ok(Table::new(table))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::sync::Arc;
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::SchemaRef;
|
||||
use lance::dataset::ReadParams;
|
||||
use lance::dataset::refs::MAIN_BRANCH;
|
||||
use lance_namespace::models::{
|
||||
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
|
||||
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
|
||||
@@ -120,6 +121,7 @@ pub struct OpenTableBuilder {
|
||||
request: OpenTableRequest,
|
||||
embedding_registry: Arc<dyn EmbeddingRegistry>,
|
||||
branch: Option<String>,
|
||||
version: Option<u64>,
|
||||
}
|
||||
|
||||
impl OpenTableBuilder {
|
||||
@@ -141,6 +143,7 @@ impl OpenTableBuilder {
|
||||
},
|
||||
embedding_registry,
|
||||
branch: None,
|
||||
version: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,13 +272,39 @@ impl OpenTableBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table pinned to a specific version, producing a read-only "view".
|
||||
///
|
||||
/// Composes with [`Self::branch`]: when a branch is also set, this opens that
|
||||
/// branch at the given version; otherwise it opens `main` at that version.
|
||||
/// The returned table is a detached head, so operations that modify the table
|
||||
/// will fail until [`Table::checkout_latest`] is called.
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::Connection;
|
||||
/// # async fn f(conn: &Connection) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let table = conn.open_table("t").branch("exp").version(3).execute().await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn version(mut self, version: u64) -> Self {
|
||||
self.version = Some(version);
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table
|
||||
pub async fn execute(self) -> Result<Table> {
|
||||
let table = self.parent.open_table(self.request).await?;
|
||||
let table = Table::new_with_embedding_registry(table, self.parent, self.embedding_registry);
|
||||
match self.branch {
|
||||
Some(branch) => table.checkout_branch(&branch).await,
|
||||
None => Ok(table),
|
||||
// "main" is the default branch, so treat it as no branch.
|
||||
let branch = self.branch.filter(|b| b.as_str() != MAIN_BRANCH);
|
||||
match branch {
|
||||
Some(branch) => table.checkout_branch(&branch, self.version).await,
|
||||
None => {
|
||||
if let Some(version) = self.version {
|
||||
table.checkout(version).await?;
|
||||
}
|
||||
Ok(table)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -983,6 +983,49 @@ mod tests {
|
||||
assert_eq!(table.name(), "table1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table_branch_and_version() {
|
||||
// Remote supports version time-travel but not branches. A version-only
|
||||
// open (or one on the default "main" branch) must succeed; a non-main
|
||||
// branch must be rejected, with or without a version.
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.url().path(), "/v1/table/t/describe/");
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(
|
||||
r#"{"table": "t", "version": 2, "schema": {"fields": [
|
||||
{"name": "a", "type": { "type": "int32" }, "nullable": false}
|
||||
]}}"#,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
// version-only: allowed (open + checkout(version) both round-trip)
|
||||
conn.open_table("t").version(2).execute().await.unwrap();
|
||||
|
||||
// "main" is the default branch, so it counts as no branch
|
||||
conn.open_table("t")
|
||||
.branch("main")
|
||||
.version(2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// a non-main branch is rejected, with or without a version
|
||||
assert!(matches!(
|
||||
conn.open_table("t").branch("exp").execute().await,
|
||||
Err(Error::NotSupported { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
conn.open_table("t")
|
||||
.branch("exp")
|
||||
.version(2)
|
||||
.execute()
|
||||
.await,
|
||||
Err(Error::NotSupported { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table_not_found() {
|
||||
let conn = Connection::new_with_handler(|_| {
|
||||
|
||||
@@ -633,6 +633,23 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Check out an existing branch at an optional version, returning a handle.
|
||||
///
|
||||
/// `None` tracks the branch's latest; `Some(v)` pins it to that version
|
||||
/// (read-only). The default implementation composes [`Self::checkout_branch`]
|
||||
/// and [`Self::checkout`]; implementations may override it to resolve the
|
||||
/// `(branch, version)` coordinate in a single manifest read.
|
||||
async fn checkout_branch_version(
|
||||
&self,
|
||||
name: &str,
|
||||
version: Option<u64>,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
let branch = self.checkout_branch(name).await?;
|
||||
if let Some(version) = version {
|
||||
branch.checkout(version).await?;
|
||||
}
|
||||
Ok(branch)
|
||||
}
|
||||
/// List the branches of the table.
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>>;
|
||||
/// Delete a branch.
|
||||
@@ -1654,8 +1671,20 @@ impl Table {
|
||||
}
|
||||
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
pub async fn checkout_branch(&self, name: &str) -> Result<Self> {
|
||||
let inner = self.inner.checkout_branch(name).await?;
|
||||
///
|
||||
/// With `version` set, the returned handle is pinned to that version of the
|
||||
/// branch: a read-only, detached view (as with [`Self::checkout`]). With
|
||||
/// `version` as `None` it tracks the branch's latest and stays writable.
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::Table;
|
||||
/// # async fn f(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let exp_at_v3 = table.checkout_branch("exp", Some(3)).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn checkout_branch(&self, name: &str, version: Option<u64>) -> Result<Self> {
|
||||
let inner = self.inner.checkout_branch_version(name, version).await?;
|
||||
Ok(Self {
|
||||
inner,
|
||||
database: self.database.clone(),
|
||||
@@ -2757,6 +2786,29 @@ impl BaseTable for NativeTable {
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn checkout_branch_version(
|
||||
&self,
|
||||
name: &str,
|
||||
version: Option<u64>,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
let Some(version) = version else {
|
||||
return self.checkout_branch(name).await;
|
||||
};
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
// Resolve (branch, version) in a single manifest read.
|
||||
let branch_ds = self
|
||||
.dataset
|
||||
.get()
|
||||
.await?
|
||||
.checkout_version((name, version))
|
||||
.await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_time_travel(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
|
||||
Ok(self.dataset.get().await?.list_branches().await?)
|
||||
}
|
||||
@@ -3538,7 +3590,7 @@ mod tests {
|
||||
assert!(branches.contains_key("exp"));
|
||||
|
||||
// checking out the branch from the main handle sees the branch's latest data
|
||||
let checked_out = table.checkout_branch("exp").await.unwrap();
|
||||
let checked_out = table.checkout_branch("exp", None).await.unwrap();
|
||||
assert_eq!(checked_out.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(checked_out.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
@@ -3558,6 +3610,186 @@ mod tests {
|
||||
assert!(!branches.contains_key("exp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_version_checkout() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// main: a single fork-point row (i = 0)
|
||||
let table = conn
|
||||
.create_table("my_table", sample_rows(vec![0]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let fork_point = table.version().await.unwrap();
|
||||
|
||||
// Fork "exp", then advance exp AND main independently past the fork so
|
||||
// they diverge while sharing version numbers.
|
||||
let branch = table.create_branch("exp", fork_point).await.unwrap();
|
||||
let exp_fork = branch.version().await.unwrap(); // exp's shallow-clone version
|
||||
branch.add(sample_rows(vec![1])).execute().await.unwrap(); // exp: {0, 1}
|
||||
let exp_v2 = branch.version().await.unwrap();
|
||||
branch.add(sample_rows(vec![2])).execute().await.unwrap(); // exp HEAD: {0, 1, 2}
|
||||
|
||||
// main's own commit reaches the SAME version number with different data
|
||||
table
|
||||
.add(sample_rows(vec![100, 101, 102]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap(); // main HEAD: {0, 100, 101, 102}
|
||||
let main_v2 = table.version().await.unwrap();
|
||||
assert_eq!(
|
||||
exp_v2, main_v2,
|
||||
"branch and main must share the version number for this test to mean anything"
|
||||
);
|
||||
|
||||
// Open exp at the shared version. The data must be exp's, not main's:
|
||||
// count alone cannot prove this (main@v2 differs), so assert provenance
|
||||
// by content.
|
||||
let pinned = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.version(exp_v2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(pinned.current_branch().as_deref(), Some("exp"));
|
||||
// isolated from exp's HEAD (3 rows) and from main@v2 (4 rows)
|
||||
assert_eq!(pinned.count_rows(None).await.unwrap(), 2);
|
||||
// exp's post-fork row is visible; main's divergent rows are not
|
||||
assert_eq!(
|
||||
pinned.count_rows(Some("i = 1".to_string())).await.unwrap(),
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
pinned
|
||||
.count_rows(Some("i = 100".to_string()))
|
||||
.await
|
||||
.unwrap(),
|
||||
0
|
||||
);
|
||||
|
||||
// the same coordinate is reachable directly via checkout_branch(name, version)
|
||||
let pinned_direct = table.checkout_branch("exp", Some(exp_v2)).await.unwrap();
|
||||
assert_eq!(pinned_direct.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(pinned_direct.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// the HEADs are unaffected
|
||||
let head = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(head.count_rows(None).await.unwrap(), 3);
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 4);
|
||||
|
||||
// a pinned version is a detached head: writes are rejected
|
||||
assert!(pinned.add(sample_rows(vec![9])).execute().await.is_err());
|
||||
|
||||
// version-only (no branch) time-travels main itself: its fork-point
|
||||
// version holds only main's first row, and the shared version number
|
||||
// resolves to main's data, not the branch's ("opens main at the version")
|
||||
let old_main = conn
|
||||
.open_table("my_table")
|
||||
.version(fork_point)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(old_main.current_branch(), None);
|
||||
assert_eq!(old_main.count_rows(None).await.unwrap(), 1);
|
||||
let shared_on_main = conn
|
||||
.open_table("my_table")
|
||||
.version(exp_v2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(shared_on_main.current_branch(), None);
|
||||
assert_eq!(shared_on_main.count_rows(None).await.unwrap(), 4);
|
||||
|
||||
// a nonexistent version is rejected
|
||||
assert!(
|
||||
conn.open_table("my_table")
|
||||
.version(9999)
|
||||
.execute()
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// a nonexistent version on a branch is rejected too: this resolves on
|
||||
// the branch's path, a distinct miss from the main lookup above
|
||||
assert!(
|
||||
conn.open_table("my_table")
|
||||
.branch("exp")
|
||||
.version(9999)
|
||||
.execute()
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// opening the branch at its fork point (the shallow-clone manifest)
|
||||
// shows just the cloned state: main's fork-point row
|
||||
let exp_at_fork = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.version(exp_fork)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(exp_at_fork.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(exp_at_fork.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// checkout_latest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
// (writable again), not main's HEAD, and not staying pinned
|
||||
pinned.checkout_latest().await.unwrap();
|
||||
assert_eq!(pinned.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(pinned.count_rows(None).await.unwrap(), 3); // exp HEAD, not main's 4
|
||||
pinned.add(sample_rows(vec![3])).execute().await.unwrap();
|
||||
assert_eq!(pinned.count_rows(None).await.unwrap(), 4); // writable again
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_version_two_branches() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table("my_table", sample_rows(vec![0]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let fork_point = table.version().await.unwrap();
|
||||
|
||||
// two branches off the same point, each advanced once so they reach the
|
||||
// SAME version number with divergent data
|
||||
let exp1 = table.create_branch("exp1", fork_point).await.unwrap();
|
||||
let exp2 = table.create_branch("exp2", fork_point).await.unwrap();
|
||||
exp1.add(sample_rows(vec![10])).execute().await.unwrap();
|
||||
exp2.add(sample_rows(vec![20])).execute().await.unwrap();
|
||||
let v1 = exp1.version().await.unwrap();
|
||||
let v2 = exp2.version().await.unwrap();
|
||||
assert_eq!(v1, v2, "both branches must reach the same version number");
|
||||
|
||||
// that shared version number resolves to each branch's own data
|
||||
let at1 = table.checkout_branch("exp1", Some(v1)).await.unwrap();
|
||||
assert_eq!(at1.count_rows(Some("i = 10".to_string())).await.unwrap(), 1);
|
||||
assert_eq!(at1.count_rows(Some("i = 20".to_string())).await.unwrap(), 0);
|
||||
let at2 = table.checkout_branch("exp2", Some(v2)).await.unwrap();
|
||||
assert_eq!(at2.count_rows(Some("i = 20".to_string())).await.unwrap(), 1);
|
||||
assert_eq!(at2.count_rows(Some("i = 10".to_string())).await.unwrap(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_name_validation() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
@@ -3575,7 +3807,7 @@ mod tests {
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
table.checkout_branch("").await,
|
||||
table.checkout_branch("", None).await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
@@ -4014,6 +4246,19 @@ mod tests {
|
||||
Box::new(RecordBatchIterator::new(vec![batch], schema))
|
||||
}
|
||||
|
||||
/// A single-batch reader holding the given `i` (Int32) values. Lets a test
|
||||
/// write distinguishable rows so it can assert data provenance, not row count.
|
||||
fn sample_rows(values: Vec<i32>) -> Box<dyn arrow_array::RecordBatchReader + Send> {
|
||||
let batch = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
|
||||
vec![Arc::new(Int32Array::from(values))],
|
||||
)
|
||||
.unwrap();
|
||||
let schema = batch.schema().clone();
|
||||
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_scalar_index() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
|
||||
@@ -76,6 +76,23 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new wrapper pinned to the dataset's current version.
|
||||
///
|
||||
/// `dataset` must already be checked out at the desired version; this pins
|
||||
/// to `dataset.version()` without re-resolving. The wrapper is read-only
|
||||
/// (time-travel) until [`as_latest`](Self::as_latest) re-attaches it to the
|
||||
/// latest version.
|
||||
pub fn new_time_travel(dataset: Dataset, read_consistency_interval: Option<Duration>) -> Self {
|
||||
let version = dataset.version().version;
|
||||
let wrapper = Self::new_latest(dataset, read_consistency_interval);
|
||||
wrapper
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.pinned_version = Some(version);
|
||||
wrapper
|
||||
}
|
||||
|
||||
/// The MemWAL `ShardWriter` cache co-located with this dataset.
|
||||
pub(crate) fn shard_writer(&self) -> &Arc<ShardWriterCache> {
|
||||
&self.shard_writer
|
||||
|
||||
Reference in New Issue
Block a user