mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-13 18:10:41 +00:00
refactor(rust): align namespace client naming
This commit is contained in:
@@ -10,7 +10,7 @@ use std::{
|
||||
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
|
||||
use lancedb::{
|
||||
connection::Connection as LanceConnection,
|
||||
connection::PushdownOperation,
|
||||
connection::NamespaceClientPushdownOperation,
|
||||
database::namespace::LanceNamespaceDatabase,
|
||||
database::{CreateTableMode, Database, ReadConsistency},
|
||||
};
|
||||
@@ -45,17 +45,17 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_pushdown_operations(
|
||||
fn parse_namespace_client_pushdown_operations(
|
||||
operations: Option<Vec<String>>,
|
||||
) -> PyResult<HashSet<PushdownOperation>> {
|
||||
) -> PyResult<HashSet<NamespaceClientPushdownOperation>> {
|
||||
let mut parsed = HashSet::new();
|
||||
for operation in operations.unwrap_or_default() {
|
||||
match operation.as_str() {
|
||||
"QueryTable" => {
|
||||
parsed.insert(PushdownOperation::QueryTable);
|
||||
parsed.insert(NamespaceClientPushdownOperation::QueryTable);
|
||||
}
|
||||
"CreateTable" => {
|
||||
parsed.insert(PushdownOperation::CreateTable);
|
||||
parsed.insert(NamespaceClientPushdownOperation::CreateTable);
|
||||
}
|
||||
_ => {
|
||||
return Err(PyValueError::new_err(format!(
|
||||
@@ -590,7 +590,8 @@ pub fn connect_namespace_client(
|
||||
) -> PyResult<Connection> {
|
||||
let namespace_client = extract_namespace_arc(py, namespace_client)?;
|
||||
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
|
||||
let pushdown_operations = parse_pushdown_operations(namespace_client_pushdown_operations)?;
|
||||
let namespace_client_pushdown_operations =
|
||||
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
|
||||
let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string());
|
||||
let ns_properties = namespace_client_properties.unwrap_or_default();
|
||||
let storage_options = storage_options.unwrap_or_default();
|
||||
@@ -603,7 +604,7 @@ pub fn connect_namespace_client(
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
pushdown_operations,
|
||||
namespace_client_pushdown_operations,
|
||||
);
|
||||
|
||||
Ok(Connection::new(LanceConnection::new(
|
||||
|
||||
@@ -915,7 +915,7 @@ use std::collections::HashSet;
|
||||
/// These operations will be executed on the namespace server instead of locally
|
||||
/// when enabled via [`ConnectNamespaceBuilder::pushdown_operations`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum PushdownOperation {
|
||||
pub enum NamespaceClientPushdownOperation {
|
||||
/// Execute queries on the namespace server via `query_table()` instead of locally.
|
||||
QueryTable,
|
||||
/// Execute table creation on the namespace server via `create_table()`
|
||||
@@ -931,7 +931,7 @@ pub struct ConnectNamespaceBuilder {
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
}
|
||||
|
||||
impl ConnectNamespaceBuilder {
|
||||
@@ -1029,11 +1029,11 @@ impl ConnectNamespaceBuilder {
|
||||
/// and leveraging server-side compute resources.
|
||||
///
|
||||
/// Available operations:
|
||||
/// - [`PushdownOperation::QueryTable`]: Execute queries via `namespace.query_table()`
|
||||
/// - [`PushdownOperation::CreateTable`]: Execute table creation via `namespace.create_table()`
|
||||
/// - [`NamespaceClientPushdownOperation::QueryTable`]: Execute queries via `namespace.query_table()`
|
||||
/// - [`NamespaceClientPushdownOperation::CreateTable`]: Execute table creation via `namespace.create_table()`
|
||||
///
|
||||
/// By default, no operations are pushed down (all executed locally).
|
||||
pub fn pushdown_operation(mut self, operation: PushdownOperation) -> Self {
|
||||
pub fn pushdown_operation(mut self, operation: NamespaceClientPushdownOperation) -> Self {
|
||||
self.pushdown_operations.insert(operation);
|
||||
self
|
||||
}
|
||||
@@ -1043,7 +1043,7 @@ impl ConnectNamespaceBuilder {
|
||||
/// See [`Self::pushdown_operation`] for details.
|
||||
pub fn pushdown_operations(
|
||||
mut self,
|
||||
operations: impl IntoIterator<Item = PushdownOperation>,
|
||||
operations: impl IntoIterator<Item = NamespaceClientPushdownOperation>,
|
||||
) -> Self {
|
||||
self.pushdown_operations.extend(operations);
|
||||
self
|
||||
|
||||
@@ -22,7 +22,7 @@ use lance_namespace_impls::ConnectBuilder;
|
||||
use lance_table::io::commit::CommitHandler;
|
||||
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
|
||||
|
||||
use crate::connection::PushdownOperation;
|
||||
use crate::connection::NamespaceClientPushdownOperation;
|
||||
use crate::database::ReadConsistency;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::NativeTable;
|
||||
@@ -44,7 +44,7 @@ pub struct LanceNamespaceDatabase {
|
||||
// database URI
|
||||
uri: String,
|
||||
// Operations to push down to the namespace server
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
// Namespace implementation type (e.g., "dir", "rest")
|
||||
ns_impl: String,
|
||||
// Namespace properties used to construct the namespace client
|
||||
@@ -53,23 +53,23 @@ pub struct LanceNamespaceDatabase {
|
||||
|
||||
impl LanceNamespaceDatabase {
|
||||
pub fn from_namespace_client(
|
||||
namespace: Arc<dyn LanceNamespace>,
|
||||
ns_impl: String,
|
||||
ns_properties: HashMap<String, String>,
|
||||
namespace_client: Arc<dyn LanceNamespace>,
|
||||
namespace_client_impl: String,
|
||||
namespace_client_properties: HashMap<String, String>,
|
||||
storage_options: HashMap<String, String>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
) -> Self {
|
||||
Self {
|
||||
namespace,
|
||||
namespace: namespace_client,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
uri: format!("namespace://{}", ns_impl),
|
||||
pushdown_operations,
|
||||
ns_impl,
|
||||
ns_properties,
|
||||
uri: format!("namespace://{}", namespace_client_impl),
|
||||
pushdown_operations: namespace_client_pushdown_operations,
|
||||
ns_impl: namespace_client_impl,
|
||||
ns_properties: namespace_client_properties,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ impl LanceNamespaceDatabase {
|
||||
storage_options: HashMap<String, String>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
) -> Result<Self> {
|
||||
let mut builder = ConnectBuilder::new(ns_impl);
|
||||
for (key, value) in ns_properties.clone() {
|
||||
|
||||
@@ -47,7 +47,7 @@ use std::format;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::connection::PushdownOperation;
|
||||
use crate::connection::NamespaceClientPushdownOperation;
|
||||
|
||||
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
|
||||
use crate::database::Database;
|
||||
@@ -1272,7 +1272,7 @@ pub struct NativeTable {
|
||||
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
// Operations to push down to the namespace server.
|
||||
// pub(crate) so query.rs can access the field for server-side query execution.
|
||||
pub(crate) pushdown_operations: HashSet<PushdownOperation>,
|
||||
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NativeTable {
|
||||
@@ -1359,7 +1359,7 @@ impl NativeTable {
|
||||
params: Option<ReadParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
managed_versioning: Option<bool>,
|
||||
) -> Result<Self> {
|
||||
let params = params.unwrap_or_default();
|
||||
@@ -1470,7 +1470,7 @@ impl NativeTable {
|
||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
params: Option<ReadParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
) -> Result<Self> {
|
||||
let mut params = params.unwrap_or_default();
|
||||
@@ -1518,7 +1518,7 @@ impl NativeTable {
|
||||
let id = Self::build_id(&namespace, name);
|
||||
|
||||
let stored_namespace_client =
|
||||
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
|
||||
if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
|
||||
Some(namespace_client)
|
||||
} else {
|
||||
None
|
||||
@@ -1588,7 +1588,7 @@ impl NativeTable {
|
||||
params: Option<WriteParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
) -> Result<Self> {
|
||||
// Default params uses format v1.
|
||||
let params = params.unwrap_or(WriteParams {
|
||||
@@ -1635,7 +1635,7 @@ impl NativeTable {
|
||||
params: Option<WriteParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
) -> Result<Self> {
|
||||
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
|
||||
Self::create(
|
||||
@@ -1685,7 +1685,7 @@ impl NativeTable {
|
||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
params: Option<WriteParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
pushdown_operations: HashSet<PushdownOperation>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
) -> Result<Self> {
|
||||
// Build table_id from namespace + name for the storage options provider
|
||||
@@ -1738,7 +1738,7 @@ impl NativeTable {
|
||||
let id = Self::build_id(&namespace, name);
|
||||
|
||||
let stored_namespace_client =
|
||||
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
|
||||
if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
|
||||
Some(namespace_client)
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::NativeTable;
|
||||
use crate::connection::PushdownOperation;
|
||||
use crate::connection::NamespaceClientPushdownOperation;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::expr::expr_to_sql_string;
|
||||
use crate::query::{
|
||||
@@ -44,7 +44,7 @@ pub async fn execute_query(
|
||||
// If QueryTable pushdown is enabled and namespace client is configured, use server-side query execution
|
||||
if table
|
||||
.pushdown_operations
|
||||
.contains(&PushdownOperation::QueryTable)
|
||||
.contains(&NamespaceClientPushdownOperation::QueryTable)
|
||||
&& let Some(ref namespace_client) = table.namespace_client
|
||||
{
|
||||
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
|
||||
|
||||
Reference in New Issue
Block a user