refactor!: consolidate namespace related naming and enterprise integration (#3205)

1. Refactored every client (Rust core, Python, Node/TypeScript) so
“namespace” usage is explicit: code now keeps namespace paths
(namespace_path) separate from namespace clients (namespace_client).
Connections propagate the client, table creation routes through it, and
managed versioning defaults are resolved from namespace metadata. Python
gained LanceNamespaceDBConnection/async counterparts, and the
namespace-focused tests were rewritten to match the clarified API
surface.
2. Synchronized the workspace with Lance 5.0.0-beta.3 (see
https://github.com/lance-format/lance/pull/6186 for the upstream
namespace refactor), updating Cargo/uv lockfiles and ensuring all
bindings align with the new namespace semantics.
3. Added a namespace-backed code path to lancedb.connect() via new
keyword arguments (namespace_client_impl, namespace_client_properties,
plus the existing pushdown-ops flag). When those kwargs are supplied,
connect() delegates to connect_namespace, so users can opt into
namespace clients without changing APIs. (The async helper will gain
parity in a later change)
This commit is contained in:
Jack Ye
2026-04-03 00:09:03 -07:00
committed by GitHub
parent 3ba46135a5
commit e26b22bcca
33 changed files with 2022 additions and 1609 deletions

View File

@@ -101,9 +101,9 @@ impl TableNamesBuilder {
self
}
/// Set the namespace to list tables from
pub fn namespace(mut self, namespace: Vec<String>) -> Self {
self.request.namespace = namespace;
/// Set the namespace path to list tables from
pub fn namespace(mut self, namespace_path: Vec<String>) -> Self {
self.request.namespace_path = namespace_path;
self
}
@@ -131,7 +131,7 @@ impl OpenTableBuilder {
parent,
request: OpenTableRequest {
name,
namespace: vec![],
namespace_path: vec![],
index_cache_size: None,
lance_read_params: None,
location: None,
@@ -206,9 +206,9 @@ impl OpenTableBuilder {
self
}
/// Set the namespace for the table
pub fn namespace(mut self, namespace: Vec<String>) -> Self {
self.request.namespace = namespace;
/// Set the namespace path for the table
pub fn namespace(mut self, namespace_path: Vec<String>) -> Self {
self.request.namespace_path = namespace_path;
self
}
@@ -303,9 +303,9 @@ impl CloneTableBuilder {
self
}
/// Set the target namespace for the cloned table
pub fn target_namespace(mut self, namespace: Vec<String>) -> Self {
self.request.target_namespace = namespace;
/// Set the target namespace path for the cloned table
pub fn target_namespace(mut self, namespace_path: Vec<String>) -> Self {
self.request.target_namespace_path = namespace_path;
self
}
@@ -456,15 +456,15 @@ impl Connection {
&self,
old_name: impl AsRef<str>,
new_name: impl AsRef<str>,
cur_namespace: &[String],
new_namespace: &[String],
cur_namespace_path: &[String],
new_namespace_path: &[String],
) -> Result<()> {
self.internal
.rename_table(
old_name.as_ref(),
new_name.as_ref(),
cur_namespace,
new_namespace,
cur_namespace_path,
new_namespace_path,
)
.await
}
@@ -478,9 +478,11 @@ impl Connection {
///
/// # Arguments
/// * `name` - The name of the table to drop
/// * `namespace` - The namespace to drop the table from
pub async fn drop_table(&self, name: impl AsRef<str>, namespace: &[String]) -> Result<()> {
self.internal.drop_table(name.as_ref(), namespace).await
/// * `namespace_path` - The namespace path to drop the table from
pub async fn drop_table(&self, name: impl AsRef<str>, namespace_path: &[String]) -> Result<()> {
self.internal
.drop_table(name.as_ref(), namespace_path)
.await
}
/// Drop the database
@@ -494,9 +496,9 @@ impl Connection {
/// Drops all tables in the database
///
/// # Arguments
/// * `namespace` - The namespace to drop all tables from. Empty slice represents root namespace.
pub async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
self.internal.drop_all_tables(namespace).await
/// * `namespace_path` - The namespace path to drop all tables from. Empty slice represents root namespace.
pub async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
self.internal.drop_all_tables(namespace_path).await
}
/// List immediate child namespace names in the given namespace
@@ -862,6 +864,21 @@ pub fn connect(uri: &str) -> ConnectBuilder {
ConnectBuilder::new(uri)
}
use std::collections::HashSet;
/// Operations that can be pushed down to the namespace server.
///
/// These operations will be executed on the namespace server instead of locally
/// when enabled via [`ConnectNamespaceBuilder::pushdown_operations`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PushdownOperation {
/// Execute queries on the namespace server via `query_table()` instead of locally.
QueryTable,
/// Execute table creation on the namespace server via `create_table()`
/// instead of using `declare_table` + local write.
CreateTable,
}
pub struct ConnectNamespaceBuilder {
ns_impl: String,
properties: HashMap<String, String>,
@@ -869,7 +886,7 @@ pub struct ConnectNamespaceBuilder {
read_consistency_interval: Option<std::time::Duration>,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
session: Option<Arc<lance::session::Session>>,
server_side_query_enabled: bool,
pushdown_operations: HashSet<PushdownOperation>,
}
impl ConnectNamespaceBuilder {
@@ -881,7 +898,7 @@ impl ConnectNamespaceBuilder {
read_consistency_interval: None,
embedding_registry: None,
session: None,
server_side_query_enabled: false,
pushdown_operations: HashSet::new(),
}
}
@@ -936,15 +953,30 @@ impl ConnectNamespaceBuilder {
self
}
/// Enable server-side query execution.
/// Add operations to push down to the namespace server.
///
/// When enabled, queries will be executed on the namespace server instead of
/// locally. This can improve performance by reducing data transfer and
/// leveraging server-side compute resources.
/// When operations are added, they will be executed on the namespace server
/// instead of locally. This can improve performance by reducing data transfer
/// and leveraging server-side compute resources.
///
/// Default is `false` (queries executed locally).
pub fn server_side_query(mut self, enabled: bool) -> Self {
self.server_side_query_enabled = enabled;
/// Available operations:
/// - [`PushdownOperation::QueryTable`]: Execute queries via `namespace.query_table()`
/// - [`PushdownOperation::CreateTable`]: Execute table creation via `namespace.create_table()`
///
/// By default, no operations are pushed down (all executed locally).
pub fn pushdown_operation(mut self, operation: PushdownOperation) -> Self {
self.pushdown_operations.insert(operation);
self
}
/// Add multiple operations to push down to the namespace server.
///
/// See [`Self::pushdown_operation`] for details.
pub fn pushdown_operations(
mut self,
operations: impl IntoIterator<Item = PushdownOperation>,
) -> Self {
self.pushdown_operations.extend(operations);
self
}
@@ -959,7 +991,7 @@ impl ConnectNamespaceBuilder {
self.storage_options,
self.read_consistency_interval,
self.session,
self.server_side_query_enabled,
self.pushdown_operations,
)
.await?,
);

View File

@@ -111,9 +111,9 @@ impl CreateTableBuilder {
Ok(self)
}
/// Set the namespace for the table
pub fn namespace(mut self, namespace: Vec<String>) -> Self {
self.request.namespace = namespace;
/// Set the namespace path for the table
pub fn namespace(mut self, namespace_path: Vec<String>) -> Self {
self.request.namespace_path = namespace_path;
self
}

View File

@@ -40,8 +40,8 @@ pub trait DatabaseOptions {
/// A request to list names of tables in the database (deprecated, use ListTablesRequest)
#[derive(Clone, Debug, Default)]
pub struct TableNamesRequest {
/// The namespace to list tables in. Empty list represents root namespace.
pub namespace: Vec<String>,
/// The namespace path to list tables in. Empty list represents root namespace.
pub namespace_path: Vec<String>,
/// If present, only return names that come lexicographically after the supplied
/// value.
///
@@ -56,8 +56,8 @@ pub struct TableNamesRequest {
#[derive(Clone)]
pub struct OpenTableRequest {
pub name: String,
/// The namespace to open the table from. Empty list represents root namespace.
pub namespace: Vec<String>,
/// The namespace path to open the table from. Empty list represents root namespace.
pub namespace_path: Vec<String>,
pub index_cache_size: Option<u32>,
pub lance_read_params: Option<ReadParams>,
/// Optional custom location for the table. If not provided, the database will
@@ -76,7 +76,7 @@ impl std::fmt::Debug for OpenTableRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenTableRequest")
.field("name", &self.name)
.field("namespace", &self.namespace)
.field("namespace_path", &self.namespace_path)
.field("index_cache_size", &self.index_cache_size)
.field("lance_read_params", &self.lance_read_params)
.field("location", &self.location)
@@ -115,8 +115,8 @@ impl CreateTableMode {
pub struct CreateTableRequest {
/// The name of the new table
pub name: String,
/// The namespace to create the table in. Empty list represents root namespace.
pub namespace: Vec<String>,
/// The namespace path to create the table in. Empty list represents root namespace.
pub namespace_path: Vec<String>,
/// Initial data to write to the table, can be empty.
pub data: Box<dyn Scannable>,
/// The mode to use when creating the table
@@ -135,7 +135,7 @@ impl CreateTableRequest {
pub fn new(name: String, data: Box<dyn Scannable>) -> Self {
Self {
name,
namespace: vec![],
namespace_path: vec![],
data,
mode: CreateTableMode::default(),
write_options: WriteOptions::default(),
@@ -155,8 +155,8 @@ impl CreateTableRequest {
pub struct CloneTableRequest {
/// The name of the target table to create
pub target_table_name: String,
/// The namespace for the target table. Empty list represents root namespace.
pub target_namespace: Vec<String>,
/// The namespace path for the target table. Empty list represents root namespace.
pub target_namespace_path: Vec<String>,
/// The URI of the source table to clone from.
pub source_uri: String,
/// Optional version of the source table to clone.
@@ -175,7 +175,7 @@ impl CloneTableRequest {
pub fn new(target_table_name: String, source_uri: String) -> Self {
Self {
target_table_name,
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: None,
@@ -251,13 +251,13 @@ pub trait Database:
&self,
cur_name: &str,
new_name: &str,
cur_namespace: &[String],
new_namespace: &[String],
cur_namespace_path: &[String],
new_namespace_path: &[String],
) -> Result<()>;
/// Drop a table in the database
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()>;
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()>;
/// Drop all tables in the database
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()>;
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()>;
fn as_any(&self) -> &dyn std::any::Any;
/// Get the equivalent namespace client of this database

View File

@@ -3,6 +3,7 @@
//! Provides the `ListingDatabase`, a simple database where tables are folders in a directory
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::path::Path;
use std::{collections::HashMap, sync::Arc};
@@ -653,7 +654,7 @@ impl ListingDatabase {
async fn handle_table_exists(
&self,
table_name: &str,
namespace: Vec<String>,
namespace_path: Vec<String>,
mode: CreateTableMode,
data_schema: &arrow_schema::Schema,
) -> Result<Arc<dyn BaseTable>> {
@@ -664,7 +665,7 @@ impl ListingDatabase {
CreateTableMode::ExistOk(callback) => {
let req = OpenTableRequest {
name: table_name.to_string(),
namespace: namespace.clone(),
namespace_path: namespace_path.clone(),
index_cache_size: None,
lance_read_params: None,
location: None,
@@ -751,7 +752,7 @@ impl Database for ListingDatabase {
}
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
if !request.namespace.is_empty() {
if !request.namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
@@ -838,7 +839,7 @@ impl Database for ListingDatabase {
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided
if !request.namespace.is_empty() && request.location.is_none() {
if !request.namespace_path.is_empty() && request.location.is_none() {
return Err(Error::InvalidInput {
message: "Location must be provided when namespace is not empty".into(),
});
@@ -864,13 +865,13 @@ impl Database for ListingDatabase {
match NativeTable::create(
&table_uri,
&request.name,
request.namespace.clone(),
request.namespace_path.clone(),
request.data,
self.store_wrapper.clone(),
Some(write_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
HashSet::new(), // listing database doesn't support server-side queries
)
.await
{
@@ -878,7 +879,7 @@ impl Database for ListingDatabase {
Err(Error::TableAlreadyExists { .. }) => {
self.handle_table_exists(
&request.name,
request.namespace.clone(),
request.namespace_path.clone(),
request.mode,
&data_schema,
)
@@ -889,7 +890,7 @@ impl Database for ListingDatabase {
}
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
if !request.target_namespace.is_empty() {
if !request.target_namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
@@ -944,13 +945,13 @@ impl Database for ListingDatabase {
let cloned_table = NativeTable::open_with_params(
&target_uri,
&request.target_table_name,
request.target_namespace,
request.target_namespace_path,
self.store_wrapper.clone(),
None,
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
None, // managed_versioning - will be queried if namespace_client is provided
HashSet::new(), // listing database doesn't support server-side queries
None, // managed_versioning - will be queried if namespace_client is provided
)
.await?;
@@ -959,7 +960,7 @@ impl Database for ListingDatabase {
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided
if !request.namespace.is_empty() && request.location.is_none() {
if !request.namespace_path.is_empty() && request.location.is_none() {
return Err(Error::InvalidInput {
message: "Location must be provided when namespace is not empty".into(),
});
@@ -1021,12 +1022,12 @@ impl Database for ListingDatabase {
NativeTable::open_with_params(
&table_uri,
&request.name,
request.namespace,
request.namespace_path,
self.store_wrapper.clone(),
Some(read_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
HashSet::new(), // listing database doesn't support server-side queries
request.managed_versioning, // Pass through managed_versioning from request
)
.await?,
@@ -1038,15 +1039,15 @@ impl Database for ListingDatabase {
&self,
_cur_name: &str,
_new_name: &str,
cur_namespace: &[String],
new_namespace: &[String],
cur_namespace_path: &[String],
new_namespace_path: &[String],
) -> Result<()> {
if !cur_namespace.is_empty() {
if !cur_namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
}
if !new_namespace.is_empty() {
if !new_namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
@@ -1056,8 +1057,8 @@ impl Database for ListingDatabase {
})
}
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> {
if !namespace.is_empty() {
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> {
if !namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
@@ -1066,9 +1067,9 @@ impl Database for ListingDatabase {
}
#[allow(deprecated)]
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
// Check if namespace parameter is provided
if !namespace.is_empty() {
if !namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
@@ -1146,7 +1147,7 @@ mod tests {
let source_table = db
.create_table(CreateTableRequest {
name: "source_table".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema.clone())) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1163,7 +1164,7 @@ mod tests {
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_table".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri: source_uri.clone(),
source_version: None,
source_tag: None,
@@ -1208,7 +1209,7 @@ mod tests {
let source_table = db
.create_table(CreateTableRequest {
name: "source_with_data".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1224,7 +1225,7 @@ mod tests {
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_with_data".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: None,
@@ -1268,7 +1269,7 @@ mod tests {
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1284,7 +1285,7 @@ mod tests {
let cloned = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: None,
@@ -1305,7 +1306,7 @@ mod tests {
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1321,7 +1322,7 @@ mod tests {
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: None,
@@ -1346,7 +1347,7 @@ mod tests {
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1362,7 +1363,7 @@ mod tests {
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec!["namespace".to_string()], // Non-empty namespace
target_namespace_path: vec!["namespace".to_string()], // Non-empty namespace
source_uri,
source_version: None,
source_tag: None,
@@ -1387,7 +1388,7 @@ mod tests {
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1403,7 +1404,7 @@ mod tests {
let result = db
.clone_table(CloneTableRequest {
target_table_name: "invalid/name".to_string(), // Invalid name with slash
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: None,
@@ -1423,7 +1424,7 @@ mod tests {
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri: "/nonexistent/table.lance".to_string(),
source_version: None,
source_tag: None,
@@ -1444,7 +1445,7 @@ mod tests {
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1460,7 +1461,7 @@ mod tests {
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: Some(1),
source_tag: Some("v1.0".to_string()),
@@ -1498,7 +1499,7 @@ mod tests {
let source_table = db
.create_table(CreateTableRequest {
name: "versioned_source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch1) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1534,7 +1535,7 @@ mod tests {
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_from_version".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: Some(initial_version),
source_tag: None,
@@ -1573,7 +1574,7 @@ mod tests {
let source_table = db
.create_table(CreateTableRequest {
name: "tagged_source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch1),
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1613,7 +1614,7 @@ mod tests {
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_from_tag".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: Some("v1.0".to_string()),
@@ -1649,7 +1650,7 @@ mod tests {
let source_table = db
.create_table(CreateTableRequest {
name: "independent_source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch1),
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1665,7 +1666,7 @@ mod tests {
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "independent_clone".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: None,
@@ -1725,7 +1726,7 @@ mod tests {
let source_table = db
.create_table(CreateTableRequest {
name: "latest_version_source".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch1),
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1758,7 +1759,7 @@ mod tests {
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_latest".to_string(),
target_namespace: vec![],
target_namespace_path: vec![],
source_uri,
source_version: None,
source_tag: None,
@@ -1812,7 +1813,7 @@ mod tests {
let table = db
.create_table(CreateTableRequest {
name: "test_stable".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch),
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -1863,7 +1864,7 @@ mod tests {
let table = db
.create_table(CreateTableRequest {
name: "test_stable_table_level".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch),
mode: CreateTableMode::Create,
write_options,
@@ -1934,7 +1935,7 @@ mod tests {
let table = db
.create_table(CreateTableRequest {
name: "test_override".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(batch),
mode: CreateTableMode::Create,
write_options,
@@ -2052,7 +2053,7 @@ mod tests {
db.create_table(CreateTableRequest {
name: "table1".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema.clone())) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
@@ -2064,7 +2065,7 @@ mod tests {
db.create_table(CreateTableRequest {
name: "table2".to_string(),
namespace: vec![],
namespace_path: vec![],
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),

View File

@@ -3,7 +3,7 @@
//! Namespace-based database implementation that delegates table management to lance-namespace
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use async_trait::async_trait;
@@ -22,6 +22,7 @@ use lance_namespace_impls::ConnectBuilder;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::connection::PushdownOperation;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
use crate::table::NativeTable;
@@ -42,8 +43,8 @@ pub struct LanceNamespaceDatabase {
session: Option<Arc<lance::session::Session>>,
// database URI
uri: String,
// Whether to enable server-side query execution
server_side_query_enabled: bool,
// Operations to push down to the namespace server
pushdown_operations: HashSet<PushdownOperation>,
}
impl LanceNamespaceDatabase {
@@ -53,7 +54,7 @@ impl LanceNamespaceDatabase {
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
server_side_query_enabled: bool,
pushdown_operations: HashSet<PushdownOperation>,
) -> Result<Self> {
let mut builder = ConnectBuilder::new(ns_impl);
for (key, value) in ns_properties.clone() {
@@ -72,7 +73,7 @@ impl LanceNamespaceDatabase {
read_consistency_interval,
session,
uri: format!("namespace://{}", ns_impl),
server_side_query_enabled,
pushdown_operations,
})
}
}
@@ -82,7 +83,7 @@ impl std::fmt::Debug for LanceNamespaceDatabase {
f.debug_struct("LanceNamespaceDatabase")
.field("storage_options", &self.storage_options)
.field("read_consistency_interval", &self.read_consistency_interval)
.field("server_side_query_enabled", &self.server_side_query_enabled)
.field("pushdown_operations", &self.pushdown_operations)
.finish()
}
}
@@ -138,7 +139,7 @@ impl Database for LanceNamespaceDatabase {
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
let ns_request = ListTablesRequest {
id: Some(request.namespace),
id: Some(request.namespace_path),
page_token: request.start_after,
limit: request.limit.map(|l| l as i32),
..Default::default()
@@ -154,7 +155,7 @@ impl Database for LanceNamespaceDatabase {
}
async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let mut table_id = request.namespace.clone();
let mut table_id = request.namespace_path.clone();
table_id.push(request.name.clone());
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
@@ -191,11 +192,11 @@ impl Database for LanceNamespaceDatabase {
let native_table = NativeTable::open_from_namespace(
self.namespace.clone(),
&request.name,
request.namespace.clone(),
request.namespace_path.clone(),
None,
None,
self.read_consistency_interval,
self.server_side_query_enabled,
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
@@ -205,7 +206,7 @@ impl Database for LanceNamespaceDatabase {
}
}
let mut table_id = request.namespace.clone();
let mut table_id = request.namespace_path.clone();
table_id.push(request.name.clone());
let declare_request = DeclareTableRequest {
@@ -255,12 +256,12 @@ impl Database for LanceNamespaceDatabase {
self.namespace.clone(),
&location,
&request.name,
request.namespace.clone(),
request.namespace_path.clone(),
request.data,
None, // write_store_wrapper not used for namespace connections
write_params,
self.read_consistency_interval,
self.server_side_query_enabled,
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
@@ -272,11 +273,11 @@ impl Database for LanceNamespaceDatabase {
let native_table = NativeTable::open_from_namespace(
self.namespace.clone(),
&request.name,
request.namespace.clone(),
request.namespace_path.clone(),
None, // write_store_wrapper not used for namespace connections
request.lance_read_params,
self.read_consistency_interval,
self.server_side_query_enabled,
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
@@ -294,16 +295,16 @@ impl Database for LanceNamespaceDatabase {
&self,
_cur_name: &str,
_new_name: &str,
_cur_namespace: &[String],
_new_namespace: &[String],
_cur_namespace_path: &[String],
_new_namespace_path: &[String],
) -> Result<()> {
Err(Error::NotSupported {
message: "rename_table is not supported for namespace connections".to_string(),
})
}
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> {
let mut table_id = namespace.to_vec();
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> {
let mut table_id = namespace_path.to_vec();
table_id.push(name.to_string());
let drop_request = DropTableRequest {
@@ -321,17 +322,17 @@ impl Database for LanceNamespaceDatabase {
}
#[allow(deprecated)]
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
let tables = self
.table_names(TableNamesRequest {
namespace: namespace.to_vec(),
namespace_path: namespace_path.to_vec(),
start_after: None,
limit: None,
})
.await?;
for table in tables {
self.drop_table(&table, namespace).await?;
self.drop_table(&table, namespace_path).await?;
}
Ok(())

View File

@@ -362,9 +362,9 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
let mut req = if !request.namespace.is_empty() {
let mut req = if !request.namespace_path.is_empty() {
let namespace_id =
build_namespace_identifier(&request.namespace, &self.client.id_delimiter);
build_namespace_identifier(&request.namespace_path, &self.client.id_delimiter);
self.client
.get(&format!("/v1/namespace/{}/table/list", namespace_id))
} else {
@@ -387,12 +387,12 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.tables;
for table in &tables {
let table_identifier =
build_table_identifier(table, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(table, &request.namespace);
build_table_identifier(table, &request.namespace_path, &self.client.id_delimiter);
let cache_key = build_cache_key(table, &request.namespace_path);
let remote_table = Arc::new(RemoteTable::new(
self.client.clone(),
table.clone(),
request.namespace.clone(),
request.namespace_path.clone(),
table_identifier.clone(),
version.clone(),
));
@@ -442,8 +442,11 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn create_table(&self, mut request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let body = stream_as_body(request.data.scan_as_stream())?;
let identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let identifier = build_table_identifier(
&request.name,
&request.namespace_path,
&self.client.id_delimiter,
);
let req = self
.client
.post(&format!("/v1/table/{}/create/", identifier))
@@ -463,7 +466,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
CreateTableMode::ExistOk(callback) => {
let req = OpenTableRequest {
name: request.name.clone(),
namespace: request.namespace.clone(),
namespace_path: request.namespace_path.clone(),
index_cache_size: None,
lance_read_params: None,
location: None,
@@ -495,13 +498,16 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table_identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(&request.name, &request.namespace);
let table_identifier = build_table_identifier(
&request.name,
&request.namespace_path,
&self.client.id_delimiter,
);
let cache_key = build_cache_key(&request.name, &request.namespace_path);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name.clone(),
request.namespace.clone(),
request.namespace_path.clone(),
table_identifier,
version,
));
@@ -513,7 +519,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
let table_identifier = build_table_identifier(
&request.target_table_name,
&request.target_namespace,
&request.target_namespace_path,
&self.client.id_delimiter,
);
@@ -542,11 +548,11 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
let version = parse_server_version(&request_id, &rsp)?;
let cache_key = build_cache_key(&request.target_table_name, &request.target_namespace);
let cache_key = build_cache_key(&request.target_table_name, &request.target_namespace_path);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.target_table_name.clone(),
request.target_namespace.clone(),
request.target_namespace_path.clone(),
table_identifier,
version,
));
@@ -556,9 +562,12 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
let identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(&request.name, &request.namespace);
let identifier = build_table_identifier(
&request.name,
&request.namespace_path,
&self.client.id_delimiter,
);
let cache_key = build_cache_key(&request.name, &request.namespace_path);
// We describe the table to confirm it exists before moving on.
if let Some(table) = self.table_cache.get(&cache_key).await {
@@ -574,17 +583,17 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
let version = parse_server_version(&request_id, &rsp)?;
let table_identifier = build_table_identifier(
&request.name,
&request.namespace,
&request.namespace_path,
&self.client.id_delimiter,
);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name.clone(),
request.namespace.clone(),
request.namespace_path.clone(),
table_identifier,
version,
));
let cache_key = build_cache_key(&request.name, &request.namespace);
let cache_key = build_cache_key(&request.name, &request.namespace_path);
self.table_cache.insert(cache_key, table.clone()).await;
Ok(table)
}
@@ -594,18 +603,18 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
&self,
current_name: &str,
new_name: &str,
cur_namespace: &[String],
new_namespace: &[String],
cur_namespace_path: &[String],
new_namespace_path: &[String],
) -> Result<()> {
let current_identifier =
build_table_identifier(current_name, cur_namespace, &self.client.id_delimiter);
let current_cache_key = build_cache_key(current_name, cur_namespace);
let new_cache_key = build_cache_key(new_name, new_namespace);
build_table_identifier(current_name, cur_namespace_path, &self.client.id_delimiter);
let current_cache_key = build_cache_key(current_name, cur_namespace_path);
let new_cache_key = build_cache_key(new_name, new_namespace_path);
let mut body = serde_json::json!({ "new_table_name": new_name });
if !new_namespace.is_empty() {
if !new_namespace_path.is_empty() {
body["new_namespace"] = serde_json::Value::Array(
new_namespace
new_namespace_path
.iter()
.map(|s| serde_json::Value::String(s.clone()))
.collect(),
@@ -624,9 +633,9 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
Ok(())
}
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> {
let identifier = build_table_identifier(name, namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(name, namespace);
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> {
let identifier = build_table_identifier(name, namespace_path, &self.client.id_delimiter);
let cache_key = build_cache_key(name, namespace_path);
let req = self.client.post(&format!("/v1/table/{}/drop/", identifier));
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
@@ -634,9 +643,9 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
Ok(())
}
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
// TODO: Implement namespace-aware drop_all_tables
let _namespace = namespace; // Suppress unused warning for now
let _namespace_path = namespace_path; // Suppress unused warning for now
Err(crate::Error::NotSupported {
message: "Dropping all tables is not currently supported in the remote API".to_string(),
})

View File

@@ -19,11 +19,11 @@ pub use lance::dataset::Version;
use lance::dataset::WriteMode;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::{InsertBuilder, WriteParams};
use lance::index::DatasetIndexExt;
use lance::index::vector::VectorIndexParams;
use lance::index::vector::utils::infer_vector_dim;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use lance_datafusion::utils::StreamingWriteSource;
use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
use lance_index::vector::bq::RQBuildParams;
@@ -42,11 +42,13 @@ use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::ManifestNamingScheme;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::format;
use std::path::Path;
use std::sync::Arc;
use crate::connection::PushdownOperation;
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
use crate::database::Database;
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
@@ -1268,10 +1270,9 @@ pub struct NativeTable {
// Optional namespace client for namespace operations (e.g., managed versioning).
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
// Whether to enable server-side query execution via the namespace client.
// When true and namespace_client is set, queries will be executed on the
// namespace server instead of locally.
pub(crate) server_side_query_enabled: bool,
// Operations to push down to the namespace server.
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) pushdown_operations: HashSet<PushdownOperation>,
}
impl std::fmt::Debug for NativeTable {
@@ -1283,7 +1284,7 @@ impl std::fmt::Debug for NativeTable {
.field("uri", &self.uri)
.field("read_consistency_interval", &self.read_consistency_interval)
.field("namespace_client", &self.namespace_client)
.field("server_side_query_enabled", &self.server_side_query_enabled)
.field("pushdown_operations", &self.pushdown_operations)
.finish()
}
}
@@ -1320,7 +1321,18 @@ impl NativeTable {
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, vec![], None, None, None, None, false, None).await
Self::open_with_params(
uri,
&name,
vec![],
None,
None,
None,
None,
HashSet::new(),
None,
)
.await
}
/// Opens an existing Table
@@ -1331,7 +1343,7 @@ impl NativeTable {
/// * `name` The Table name
/// * `params` The [ReadParams] to use when opening the table
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
/// * `pushdown_operations` - Operations to push down to the namespace server
/// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client
/// is provided, the value will be fetched via describe_table.
///
@@ -1347,7 +1359,7 @@ impl NativeTable {
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
pushdown_operations: HashSet<PushdownOperation>,
managed_versioning: Option<bool>,
) -> Result<Self> {
let params = params.unwrap_or_default();
@@ -1417,7 +1429,7 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
pushdown_operations,
})
}
@@ -1443,10 +1455,8 @@ impl NativeTable {
/// * `write_store_wrapper` - Optional wrapper for the object store on write path
/// * `params` - Optional read parameters
/// * `read_consistency_interval` - Optional interval for read consistency
/// * `server_side_query_enabled` - Whether to enable server-side query execution.
/// When true, the namespace_client will be stored and queries will be executed
/// on the namespace server. When false, the namespace is only used for opening
/// the table, and queries are executed locally.
/// * `pushdown_operations` - Operations to push down to the namespace server.
/// When `QueryTable` is included, queries will be executed on the namespace server.
/// * `session` - Optional session for object stores and caching
///
/// # Returns
@@ -1460,7 +1470,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
server_side_query_enabled: bool,
pushdown_operations: HashSet<PushdownOperation>,
session: Option<Arc<lance::session::Session>>,
) -> Result<Self> {
let mut params = params.unwrap_or_default();
@@ -1507,11 +1517,12 @@ impl NativeTable {
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
let id = Self::build_id(&namespace, name);
let stored_namespace_client = if server_side_query_enabled {
Some(namespace_client)
} else {
None
};
let stored_namespace_client =
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
Some(namespace_client)
} else {
None
};
Ok(Self {
name: name.to_string(),
@@ -1521,7 +1532,7 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
pushdown_operations,
})
}
@@ -1562,7 +1573,7 @@ impl NativeTable {
/// * `batches` RecordBatch to be saved in the database.
/// * `params` - Write parameters.
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
/// * `pushdown_operations` - Operations to push down to the namespace server
///
/// # Returns
///
@@ -1577,7 +1588,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
pushdown_operations: HashSet<PushdownOperation>,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
@@ -1610,7 +1621,7 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client,
server_side_query_enabled,
pushdown_operations,
})
}
@@ -1624,7 +1635,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
pushdown_operations: HashSet<PushdownOperation>,
) -> Result<Self> {
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
Self::create(
@@ -1636,7 +1647,7 @@ impl NativeTable {
params,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
pushdown_operations,
)
.await
}
@@ -1659,7 +1670,7 @@ impl NativeTable {
/// * `write_store_wrapper` - Optional wrapper for the object store on write path
/// * `params` - Optional write parameters
/// * `read_consistency_interval` - Optional interval for read consistency
/// * `server_side_query_enabled` - Whether to enable server-side query execution
/// * `pushdown_operations` - Operations to push down to the namespace server
///
/// # Returns
///
@@ -1674,7 +1685,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
server_side_query_enabled: bool,
pushdown_operations: HashSet<PushdownOperation>,
session: Option<Arc<lance::session::Session>>,
) -> Result<Self> {
// Build table_id from namespace + name for the storage options provider
@@ -1726,11 +1737,12 @@ impl NativeTable {
let id = Self::build_id(&namespace, name);
let stored_namespace_client = if server_side_query_enabled {
Some(namespace_client)
} else {
None
};
let stored_namespace_client =
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
Some(namespace_client)
} else {
None
};
Ok(Self {
name: name.to_string(),
@@ -1740,7 +1752,7 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
pushdown_operations,
})
}
@@ -2751,9 +2763,19 @@ mod tests {
vec![Ok(batch.clone())],
batch.schema(),
));
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None, false)
.await
.unwrap();
let table = NativeTable::create(
uri,
"test",
vec![],
reader,
None,
None,
None,
None,
HashSet::new(),
)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
assert_eq!(
@@ -3780,7 +3802,7 @@ mod tests {
TableStatistics {
num_rows: 250,
num_indices: 0,
total_bytes: 2000,
total_bytes: 2300,
fragment_stats: FragmentStatistics {
num_fragments: 11,
num_small_fragments: 11,

View File

@@ -10,7 +10,7 @@ use std::sync::Arc;
use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{CompactionMetrics, IndexRemapperOptions, compact_files};
use lance_index::DatasetIndexExt;
use lance::index::DatasetIndexExt;
use lance_index::optimize::OptimizeOptions;
use log::info;

View File

@@ -4,6 +4,7 @@
use std::sync::Arc;
use super::NativeTable;
use crate::connection::PushdownOperation;
use crate::error::{Error, Result};
use crate::expr::expr_to_sql_string;
use crate::query::{
@@ -40,8 +41,10 @@ pub async fn execute_query(
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// If server-side query is enabled and namespace client is configured, use server-side query execution
if table.server_side_query_enabled
// If QueryTable pushdown is enabled and namespace client is configured, use server-side query execution
if table
.pushdown_operations
.contains(&PushdownOperation::QueryTable)
&& let Some(ref namespace_client) = table.namespace_client
{
return execute_namespace_query(table, namespace_client.clone(), query, options).await;