fix(python): use namespace-backed rust connection for namespace tables (#3286)

So far, I have been using a hacky approach to create and open
namespace-backed tables: getting the table's location and using a temporary
lancedb connection to create or open it. This worked for features
like credentials vending but no longer fully works for the managed
versioning feature. Recently, geneva tests have been failing here and
there, and various patches have not addressed the root cause. This PR
fixes the root cause and implements a proper Rust binding for it.
Specifically:

- build a real Rust namespace-backed connection from the Python
namespace client
- route namespace table create/open through that connection instead of
resolved-location temp connections
- keep namespace client naming consistent in the Rust bridge and
preserve federated namespace + DuckDB behavior
This commit is contained in:
Jack Ye
2026-04-18 21:17:52 -07:00
committed by GitHub
parent d715bbb588
commit f909df3e87
7 changed files with 287 additions and 415 deletions

View File

@@ -915,7 +915,7 @@ use std::collections::HashSet;
/// These operations will be executed on the namespace server instead of locally
/// when enabled via [`ConnectNamespaceBuilder::pushdown_operations`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PushdownOperation {
pub enum NamespaceClientPushdownOperation {
/// Execute queries on the namespace server via `query_table()` instead of locally.
QueryTable,
/// Execute table creation on the namespace server via `create_table()`
@@ -931,7 +931,7 @@ pub struct ConnectNamespaceBuilder {
read_consistency_interval: Option<std::time::Duration>,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
}
impl ConnectNamespaceBuilder {
@@ -1029,11 +1029,11 @@ impl ConnectNamespaceBuilder {
/// and leveraging server-side compute resources.
///
/// Available operations:
/// - [`PushdownOperation::QueryTable`]: Execute queries via `namespace.query_table()`
/// - [`PushdownOperation::CreateTable`]: Execute table creation via `namespace.create_table()`
/// - [`NamespaceClientPushdownOperation::QueryTable`]: Execute queries via `namespace.query_table()`
/// - [`NamespaceClientPushdownOperation::CreateTable`]: Execute table creation via `namespace.create_table()`
///
/// By default, no operations are pushed down (all executed locally).
pub fn pushdown_operation(mut self, operation: PushdownOperation) -> Self {
pub fn pushdown_operation(mut self, operation: NamespaceClientPushdownOperation) -> Self {
self.pushdown_operations.insert(operation);
self
}
@@ -1043,7 +1043,7 @@ impl ConnectNamespaceBuilder {
/// See [`Self::pushdown_operation`] for details.
pub fn pushdown_operations(
mut self,
operations: impl IntoIterator<Item = PushdownOperation>,
operations: impl IntoIterator<Item = NamespaceClientPushdownOperation>,
) -> Self {
self.pushdown_operations.extend(operations);
self

View File

@@ -22,10 +22,11 @@ use lance_namespace_impls::ConnectBuilder;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::connection::PushdownOperation;
use crate::connection::NamespaceClientPushdownOperation;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
use crate::table::NativeTable;
use lance::dataset::WriteMode;
use super::{
BaseTable, CloneTableRequest, CreateTableMode, CreateTableRequest as DbCreateTableRequest,
@@ -44,7 +45,7 @@ pub struct LanceNamespaceDatabase {
// database URI
uri: String,
// Operations to push down to the namespace server
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
// Namespace implementation type (e.g., "dir", "rest")
ns_impl: String,
// Namespace properties used to construct the namespace client
@@ -52,13 +53,34 @@ pub struct LanceNamespaceDatabase {
}
impl LanceNamespaceDatabase {
/// Builds a [`LanceNamespaceDatabase`] around an already-constructed namespace client.
///
/// The arguments are stored as given; this constructor performs no I/O and cannot fail.
/// The database URI is derived from the implementation name as `namespace://<impl>`.
///
/// * `namespace_client` - client stored as the backing `namespace`
/// * `namespace_client_impl` - implementation name; stored as `ns_impl` and used in the URI
/// * `namespace_client_properties` - stored as `ns_properties`
/// * `storage_options` - stored unchanged
/// * `read_consistency_interval` - stored unchanged
/// * `session` - optional session, stored unchanged
/// * `namespace_client_pushdown_operations` - stored as `pushdown_operations`
pub fn from_namespace_client(
    namespace_client: Arc<dyn LanceNamespace>,
    namespace_client_impl: String,
    namespace_client_properties: HashMap<String, String>,
    storage_options: HashMap<String, String>,
    read_consistency_interval: Option<std::time::Duration>,
    session: Option<Arc<lance::session::Session>>,
    namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Self {
    // Derive the URI first, while the implementation name can still be borrowed;
    // the name itself is moved into `ns_impl` below.
    let uri = format!("namespace://{}", namespace_client_impl);

    Self {
        uri,
        ns_impl: namespace_client_impl,
        ns_properties: namespace_client_properties,
        namespace: namespace_client,
        pushdown_operations: namespace_client_pushdown_operations,
        storage_options,
        read_consistency_interval,
        session,
    }
}
pub async fn connect(
ns_impl: &str,
ns_properties: HashMap<String, String>,
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Result<Self> {
let mut builder = ConnectBuilder::new(ns_impl);
for (key, value) in ns_properties.clone() {
@@ -163,37 +185,23 @@ impl Database for LanceNamespaceDatabase {
async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let mut table_id = request.namespace_path.clone();
table_id.push(request.name.clone());
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
let mut existing_table = None;
match request.mode {
CreateTableMode::Create => {
if describe_result.is_ok() {
return Err(Error::TableAlreadyExists {
name: request.name.clone(),
});
}
}
CreateTableMode::Create => {}
CreateTableMode::Overwrite => {
if describe_result.is_ok() {
// Drop the existing table - must succeed
let drop_request = DropTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
self.namespace
.drop_table(drop_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to drop existing table for overwrite: {}", e),
})?;
}
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
existing_table = self.namespace.describe_table(describe_request).await.ok();
}
CreateTableMode::ExistOk(_) => {
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
if describe_result.is_ok() {
let native_table = NativeTable::open_from_namespace(
self.namespace.clone(),
@@ -221,21 +229,55 @@ impl Database for LanceNamespaceDatabase {
};
let (location, initial_storage_options, managed_versioning) = {
let response = self.namespace.declare_table(declare_request).await?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts, response.managed_versioning)
if let Some(response) = existing_table {
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from describe_table response".to_string(),
})?;
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts, response.managed_versioning)
} else {
let response = self
.namespace
.declare_table(declare_request)
.await
.map_err(|e| {
let err_str = e.to_string();
if matches!(request.mode, CreateTableMode::Create)
&& (err_str.contains("already exists")
|| err_str.contains("TableAlreadyExists")
|| err_str.contains("table already exists"))
{
Error::TableAlreadyExists {
name: request.name.clone(),
}
} else {
Error::Runtime {
message: format!("Failed to declare table: {}", e),
}
}
})?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts, response.managed_versioning)
}
};
// Build write params with storage options and commit handler
let mut params = request.write_options.lance_write_params.unwrap_or_default();
if matches!(request.mode, CreateTableMode::Overwrite) {
params.mode = WriteMode::Overwrite;
}
// Set up storage options if provided
if let Some(storage_opts) = initial_storage_options {
let store_params = params

View File

@@ -47,7 +47,7 @@ use std::format;
use std::path::Path;
use std::sync::Arc;
use crate::connection::PushdownOperation;
use crate::connection::NamespaceClientPushdownOperation;
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
use crate::database::Database;
@@ -1272,7 +1272,7 @@ pub struct NativeTable {
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
// Operations to push down to the namespace server.
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) pushdown_operations: HashSet<PushdownOperation>,
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
}
impl std::fmt::Debug for NativeTable {
@@ -1359,7 +1359,7 @@ impl NativeTable {
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
managed_versioning: Option<bool>,
) -> Result<Self> {
let params = params.unwrap_or_default();
@@ -1470,7 +1470,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
session: Option<Arc<lance::session::Session>>,
) -> Result<Self> {
let mut params = params.unwrap_or_default();
@@ -1518,7 +1518,7 @@ impl NativeTable {
let id = Self::build_id(&namespace, name);
let stored_namespace_client =
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
Some(namespace_client)
} else {
None
@@ -1588,7 +1588,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
@@ -1635,7 +1635,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Result<Self> {
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
Self::create(
@@ -1685,7 +1685,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
pushdown_operations: HashSet<PushdownOperation>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
session: Option<Arc<lance::session::Session>>,
) -> Result<Self> {
// Build table_id from namespace + name for the storage options provider
@@ -1738,7 +1738,7 @@ impl NativeTable {
let id = Self::build_id(&namespace, name);
let stored_namespace_client =
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
Some(namespace_client)
} else {
None

View File

@@ -4,7 +4,7 @@
use std::sync::Arc;
use super::NativeTable;
use crate::connection::PushdownOperation;
use crate::connection::NamespaceClientPushdownOperation;
use crate::error::{Error, Result};
use crate::expr::expr_to_sql_string;
use crate::query::{
@@ -44,7 +44,7 @@ pub async fn execute_query(
// If QueryTable pushdown is enabled and namespace client is configured, use server-side query execution
if table
.pushdown_operations
.contains(&PushdownOperation::QueryTable)
.contains(&NamespaceClientPushdownOperation::QueryTable)
&& let Some(ref namespace_client) = table.namespace_client
{
return execute_namespace_query(table, namespace_client.clone(), query, options).await;