refactor(python): use namespace-backed rust connection

This commit is contained in:
Jack Ye
2026-04-18 17:09:13 -07:00
parent e34fe84c7f
commit f54f5600ad
2 changed files with 57 additions and 27 deletions

View File

@@ -38,6 +38,7 @@ from lance_namespace_urllib3_client.models.query_table_request_vector import (
QueryTableRequestVector,
)
from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery
from lance_namespace.errors import TableNotFoundError
from lancedb._lancedb import connect_namespace_client as _connect_namespace_client
from lancedb.background_loop import LOOP
from lancedb.db import AsyncConnection, DBConnection
@@ -546,14 +547,20 @@ class LanceNamespaceDBConnection(DBConnection):
) -> Table:
if namespace_path is None:
namespace_path = []
async_table = LOOP.run(
self._inner.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
try:
async_table = LOOP.run(
self._inner.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
)
)
except RuntimeError as e:
if "Table not found" in str(e):
table_id = namespace_path + [name]
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
raise
return LanceTable(
self,
@@ -971,12 +978,18 @@ class AsyncLanceNamespaceDBConnection:
"""Open an existing table from the namespace."""
if namespace_path is None:
namespace_path = []
return await self._inner.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
try:
return await self._inner.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
except RuntimeError as e:
if "Table not found" in str(e):
table_id = namespace_path + [name]
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
raise
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
"""Drop a table from the namespace."""

View File

@@ -184,22 +184,15 @@ impl Database for LanceNamespaceDatabase {
async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let mut table_id = request.namespace_path.clone();
table_id.push(request.name.clone());
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
match request.mode {
CreateTableMode::Create => {
if describe_result.is_ok() {
return Err(Error::TableAlreadyExists {
name: request.name.clone(),
});
}
}
CreateTableMode::Create => {}
CreateTableMode::Overwrite => {
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
if describe_result.is_ok() {
// Drop the existing table - must succeed
let drop_request = DropTableRequest {
@@ -215,6 +208,11 @@ impl Database for LanceNamespaceDatabase {
}
}
CreateTableMode::ExistOk(_) => {
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
if describe_result.is_ok() {
let native_table = NativeTable::open_from_namespace(
self.namespace.clone(),
@@ -242,7 +240,26 @@ impl Database for LanceNamespaceDatabase {
};
let (location, initial_storage_options, managed_versioning) = {
let response = self.namespace.declare_table(declare_request).await?;
let response = self
.namespace
.declare_table(declare_request)
.await
.map_err(|e| {
let err_str = e.to_string();
if matches!(request.mode, CreateTableMode::Create)
&& (err_str.contains("already exists")
|| err_str.contains("TableAlreadyExists")
|| err_str.contains("table already exists"))
{
Error::TableAlreadyExists {
name: request.name.clone(),
}
} else {
Error::Runtime {
message: format!("Failed to declare table: {}", e),
}
}
})?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;