mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-05 05:10:41 +00:00
fix: address review comments on branch support
This commit is contained in:
@@ -978,12 +978,13 @@ class AsyncLanceNamespaceDBConnection:
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
) -> AsyncTable:
|
||||
"""Open an existing table from the namespace."""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
try:
|
||||
return await self._inner.open_table(
|
||||
tbl = await self._inner.open_table(
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
@@ -994,6 +995,9 @@ class AsyncLanceNamespaceDBConnection:
|
||||
table_id = namespace_path + [name]
|
||||
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
|
||||
raise
|
||||
if branch is not None:
|
||||
return await tbl.branches.checkout(branch)
|
||||
return tbl
|
||||
|
||||
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
"""Drop a table from the namespace."""
|
||||
|
||||
@@ -982,6 +982,19 @@ def test_open_table_with_branch(tmp_path):
|
||||
assert db.open_table("t").count_rows() == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_namespace_open_table_with_branch(tmp_path):
|
||||
db = lancedb.connect_namespace_async("dir", {"root": str(tmp_path)})
|
||||
await db.create_namespace(["ns1"])
|
||||
table = await db.create_table("t", [{"id": 1}], namespace_path=["ns1"])
|
||||
branch = await table.branches.create("exp")
|
||||
await branch.add([{"id": 2}])
|
||||
|
||||
# open_table(branch=...) on the async namespace connection must work
|
||||
opened = await db.open_table("t", namespace_path=["ns1"], branch="exp")
|
||||
assert await opened.count_rows() == 2
|
||||
|
||||
|
||||
def test_branch_to_lance_targets_branch(tmp_path):
|
||||
pytest.importorskip("lance")
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
@@ -740,6 +740,64 @@ mod tests {
|
||||
assert!(table_names.contains(&"test_table".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_branch_query_under_pushdown_stays_local() {
|
||||
// With QueryTable pushdown enabled, a query on the main branch routes to
|
||||
// the namespace server, but a branch handle must run locally: the
|
||||
// server-side request carries no branch and would return main's rows.
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.pushdown_operation(NamespaceClientPushdownOperation::QueryTable)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
conn.create_namespace(CreateNamespaceRequest {
|
||||
id: Some(vec!["test_ns".into()]),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("Failed to create namespace");
|
||||
|
||||
// main has 5 rows
|
||||
let table = conn
|
||||
.create_table("ref_test", create_test_data())
|
||||
.namespace(vec!["test_ns".into()])
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let main_version = table.version().await.unwrap();
|
||||
|
||||
// fork a branch off main, then add 5 more rows so it differs from main
|
||||
let branch = table
|
||||
.create_branch("exp", main_version)
|
||||
.await
|
||||
.expect("Failed to create branch");
|
||||
branch
|
||||
.add(create_test_data())
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to append to branch");
|
||||
|
||||
// the branch query must run locally and see the branch's 10 rows --
|
||||
// not get routed to the server (which carries no branch) and see main's 5
|
||||
let results = branch
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to query branch")
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.expect("Failed to collect results");
|
||||
let count: usize = results.iter().map(|b| b.num_rows()).sum();
|
||||
assert_eq!(count, 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_describe_table() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
|
||||
@@ -41,11 +41,14 @@ pub async fn execute_query(
|
||||
query: &AnyQuery,
|
||||
options: QueryExecutionOptions,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
// If QueryTable pushdown is enabled and namespace client is configured, use server-side query execution
|
||||
// QueryTable pushdown runs the query server-side, but only on the main
|
||||
// branch: the namespace request carries no branch yet, so a branch handle
|
||||
// must fall through to local execution.
|
||||
if table
|
||||
.pushdown_operations
|
||||
.contains(&NamespaceClientPushdownOperation::QueryTable)
|
||||
&& let Some(ref namespace_client) = table.namespace_client
|
||||
&& table.dataset.current_branch().is_none()
|
||||
{
|
||||
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user