mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-02 02:20:41 +00:00
Thread an optional partition_by through the client: CreateMaterializedViewRequest -> REST body -> pyo3 binding -> Python create_materialized_view/create_view kwarg (sync + async). The server partitions the view's table function by the named source column -- by IVF index clusters if the column is indexed (image-dedup), else by distinct value. Unifies Geneva's partition_by + partition_by_indexed_column into one knob. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1046 lines
37 KiB
Rust
1046 lines
37 KiB
Rust
// SPDX-License-Identifier: Apache-2.0
|
|
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
|
|
|
use std::{
|
|
collections::{HashMap, HashSet},
|
|
sync::Arc,
|
|
time::Duration,
|
|
};
|
|
|
|
use crate::{
|
|
error::PythonErrorExt,
|
|
namespace::{create_namespace_storage_options_provider, extract_namespace_arc},
|
|
runtime::future_into_py,
|
|
table::Table,
|
|
};
|
|
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
|
|
use lancedb::{
|
|
connection::Connection as LanceConnection,
|
|
connection::NamespaceClientPushdownOperation,
|
|
database::namespace::LanceNamespaceDatabase,
|
|
database::{
|
|
CreateFunctionRequest, CreateMaterializedViewRequest, CreateTableMode, Database,
|
|
ReadConsistency, RefreshMaterializedViewRequest,
|
|
},
|
|
};
|
|
use pyo3::{
|
|
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
|
|
exceptions::{PyRuntimeError, PyValueError},
|
|
pyclass, pyfunction, pymethods,
|
|
types::{PyDict, PyDictMethods},
|
|
};
|
|
|
|
/// A registered function, as returned by `list_functions`.
|
|
#[pyclass(get_all)]
|
|
#[derive(Clone)]
|
|
pub struct FunctionInfo {
|
|
pub name: String,
|
|
pub language: String,
|
|
pub return_type: String,
|
|
pub description: String,
|
|
}
|
|
|
|
/// A registered materialized view definition.
|
|
#[pyclass(get_all)]
|
|
#[derive(Clone)]
|
|
pub struct MaterializedViewInfo {
|
|
pub name: String,
|
|
pub source_table: String,
|
|
pub projection: Vec<String>,
|
|
pub udf_columns: Vec<String>,
|
|
pub filter: Option<String>,
|
|
pub auto_refresh: bool,
|
|
}
|
|
|
|
/// One inflight server-side job.
|
|
#[pyclass(get_all)]
|
|
#[derive(Clone)]
|
|
pub struct JobInfo {
|
|
pub table: String,
|
|
pub job_id: String,
|
|
pub job_type: String,
|
|
pub state: String,
|
|
pub column: Option<String>,
|
|
pub age_seconds: Option<i64>,
|
|
pub command: Option<String>,
|
|
pub units_done: Option<i64>,
|
|
pub units_total: Option<i64>,
|
|
pub committed: bool,
|
|
pub rows_skipped: u64,
|
|
pub error: Option<String>,
|
|
}
|
|
|
|
/// The plan a REFRESH MATERIALIZED VIEW would execute (EXPLAIN REFRESH).
|
|
#[pyclass(get_all)]
|
|
#[derive(Clone)]
|
|
pub struct MvRefreshPlan {
|
|
pub table_name: String,
|
|
pub has_work: bool,
|
|
pub source_version: u64,
|
|
pub last_refreshed_version: Option<u64>,
|
|
pub full_refresh: bool,
|
|
pub rebuild: bool,
|
|
pub units_total: u64,
|
|
}
|
|
|
|
#[pyclass]
|
|
pub struct Connection {
|
|
inner: Option<LanceConnection>,
|
|
}
|
|
|
|
impl Connection {
|
|
pub(crate) fn new(inner: LanceConnection) -> Self {
|
|
Self { inner: Some(inner) }
|
|
}
|
|
|
|
pub(crate) fn get_inner(&self) -> PyResult<&LanceConnection> {
|
|
self.inner
|
|
.as_ref()
|
|
.ok_or_else(|| PyRuntimeError::new_err("Connection is closed"))
|
|
}
|
|
}
|
|
|
|
fn parse_namespace_client_pushdown_operations(
|
|
operations: Option<Vec<String>>,
|
|
) -> PyResult<HashSet<NamespaceClientPushdownOperation>> {
|
|
let mut parsed = HashSet::new();
|
|
for operation in operations.unwrap_or_default() {
|
|
match operation.as_str() {
|
|
"QueryTable" => {
|
|
parsed.insert(NamespaceClientPushdownOperation::QueryTable);
|
|
}
|
|
"CreateTable" => {
|
|
parsed.insert(NamespaceClientPushdownOperation::CreateTable);
|
|
}
|
|
_ => {
|
|
return Err(PyValueError::new_err(format!(
|
|
"Invalid pushdown operation: {}",
|
|
operation
|
|
)));
|
|
}
|
|
}
|
|
}
|
|
Ok(parsed)
|
|
}
|
|
|
|
impl Connection {
|
|
fn parse_create_mode_str(mode: &str) -> PyResult<CreateTableMode> {
|
|
match mode {
|
|
"create" => Ok(CreateTableMode::Create),
|
|
"overwrite" => Ok(CreateTableMode::Overwrite),
|
|
"exist_ok" => Ok(CreateTableMode::exist_ok(|builder| builder)),
|
|
_ => Err(PyValueError::new_err(format!("Invalid mode {}", mode))),
|
|
}
|
|
}
|
|
|
|
pub fn database(&self) -> PyResult<Arc<dyn Database>> {
|
|
Ok(self.get_inner()?.database().clone())
|
|
}
|
|
}
|
|
|
|
#[pymethods]
|
|
impl Connection {
|
|
fn __repr__(&self) -> String {
|
|
match &self.inner {
|
|
Some(inner) => inner.to_string(),
|
|
None => "ClosedConnection".to_string(),
|
|
}
|
|
}
|
|
|
|
fn is_open(&self) -> bool {
|
|
self.inner.is_some()
|
|
}
|
|
|
|
fn close(&mut self) {
|
|
self.inner.take();
|
|
}
|
|
|
|
#[getter]
|
|
pub fn uri(&self) -> PyResult<String> {
|
|
self.get_inner().map(|inner| inner.uri().to_string())
|
|
}
|
|
|
|
#[pyo3(signature = ())]
|
|
pub fn get_read_consistency_interval(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
Ok(match inner.read_consistency().await.infer_error()? {
|
|
ReadConsistency::Manual => None,
|
|
ReadConsistency::Eventual(duration) => Some(duration.as_secs_f64()),
|
|
ReadConsistency::Strong => Some(0.0_f64),
|
|
})
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (namespace_path=None, start_after=None, limit=None))]
|
|
pub fn table_names(
|
|
self_: PyRef<'_, Self>,
|
|
namespace_path: Option<Vec<String>>,
|
|
start_after: Option<String>,
|
|
limit: Option<u32>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let mut op = inner.table_names();
|
|
op = op.namespace(namespace_path.unwrap_or_default());
|
|
if let Some(start_after) = start_after {
|
|
op = op.start_after(start_after);
|
|
}
|
|
if let Some(limit) = limit {
|
|
op = op.limit(limit);
|
|
}
|
|
future_into_py(self_.py(), async move { op.execute().await.infer_error() })
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[pyo3(signature = (name, mode, data, namespace_path=None, storage_options=None, location=None, namespace_client=None))]
|
|
pub fn create_table<'a>(
|
|
self_: PyRef<'a, Self>,
|
|
name: String,
|
|
mode: &str,
|
|
data: Bound<'_, PyAny>,
|
|
namespace_path: Option<Vec<String>>,
|
|
storage_options: Option<HashMap<String, String>>,
|
|
location: Option<String>,
|
|
namespace_client: Option<Py<PyAny>>,
|
|
) -> PyResult<Bound<'a, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
|
|
let mode = Self::parse_create_mode_str(mode)?;
|
|
|
|
let batches: Box<dyn arrow::array::RecordBatchReader + Send> =
|
|
Box::new(ArrowArrayStreamReader::from_pyarrow_bound(&data)?);
|
|
|
|
let ns_path = namespace_path.clone().unwrap_or_default();
|
|
let mut builder = inner.create_table(name.clone(), batches).mode(mode);
|
|
|
|
builder = builder.namespace(ns_path.clone());
|
|
if let Some(storage_options) = storage_options {
|
|
builder = builder.storage_options(storage_options);
|
|
}
|
|
|
|
// Auto-create storage options provider from namespace_client
|
|
if let Some(ns_obj) = namespace_client {
|
|
let ns_client = extract_namespace_arc(py, ns_obj)?;
|
|
// Create table_id by combining namespace_path with table name
|
|
let mut table_id = ns_path;
|
|
table_id.push(name);
|
|
let provider = create_namespace_storage_options_provider(ns_client, table_id);
|
|
builder = builder.storage_options_provider(provider);
|
|
}
|
|
|
|
if let Some(location) = location {
|
|
builder = builder.location(location);
|
|
}
|
|
|
|
future_into_py(self_.py(), async move {
|
|
let table = builder.execute().await.infer_error()?;
|
|
Ok(Table::new(table))
|
|
})
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[pyo3(signature = (name, mode, schema, namespace_path=None, storage_options=None, location=None, namespace_client=None))]
|
|
pub fn create_empty_table<'a>(
|
|
self_: PyRef<'a, Self>,
|
|
name: String,
|
|
mode: &str,
|
|
schema: Bound<'_, PyAny>,
|
|
namespace_path: Option<Vec<String>>,
|
|
storage_options: Option<HashMap<String, String>>,
|
|
location: Option<String>,
|
|
namespace_client: Option<Py<PyAny>>,
|
|
) -> PyResult<Bound<'a, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
|
|
let mode = Self::parse_create_mode_str(mode)?;
|
|
|
|
let schema = Schema::from_pyarrow_bound(&schema)?;
|
|
|
|
let ns_path = namespace_path.clone().unwrap_or_default();
|
|
let mut builder = inner
|
|
.create_empty_table(name.clone(), Arc::new(schema))
|
|
.mode(mode);
|
|
|
|
builder = builder.namespace(ns_path.clone());
|
|
if let Some(storage_options) = storage_options {
|
|
builder = builder.storage_options(storage_options);
|
|
}
|
|
|
|
// Auto-create storage options provider from namespace_client
|
|
if let Some(ns_obj) = namespace_client {
|
|
let ns_client = extract_namespace_arc(py, ns_obj)?;
|
|
// Create table_id by combining namespace_path with table name
|
|
let mut table_id = ns_path;
|
|
table_id.push(name);
|
|
let provider = create_namespace_storage_options_provider(ns_client, table_id);
|
|
builder = builder.storage_options_provider(provider);
|
|
}
|
|
|
|
if let Some(location) = location {
|
|
builder = builder.location(location);
|
|
}
|
|
|
|
future_into_py(self_.py(), async move {
|
|
let table = builder.execute().await.infer_error()?;
|
|
Ok(Table::new(table))
|
|
})
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[pyo3(signature = (name, namespace_path=None, storage_options=None, index_cache_size=None, location=None, namespace_client=None, managed_versioning=None))]
|
|
pub fn open_table(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
namespace_path: Option<Vec<String>>,
|
|
storage_options: Option<HashMap<String, String>>,
|
|
index_cache_size: Option<u32>,
|
|
location: Option<String>,
|
|
namespace_client: Option<Py<PyAny>>,
|
|
managed_versioning: Option<bool>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
|
|
let ns_path = namespace_path.clone().unwrap_or_default();
|
|
let mut builder = inner.open_table(name.clone());
|
|
builder = builder.namespace(ns_path.clone());
|
|
if let Some(storage_options) = storage_options {
|
|
builder = builder.storage_options(storage_options);
|
|
}
|
|
|
|
// Auto-create storage options provider from namespace_client
|
|
if let Some(ns_obj) = namespace_client {
|
|
let ns_client = extract_namespace_arc(py, ns_obj)?;
|
|
// Create table_id by combining namespace_path with table name
|
|
let mut table_id = ns_path;
|
|
table_id.push(name);
|
|
let provider = create_namespace_storage_options_provider(ns_client.clone(), table_id);
|
|
builder = builder.storage_options_provider(provider);
|
|
builder = builder.namespace_client(ns_client);
|
|
}
|
|
|
|
if let Some(index_cache_size) = index_cache_size {
|
|
builder = builder.index_cache_size(index_cache_size);
|
|
}
|
|
if let Some(location) = location {
|
|
builder = builder.location(location);
|
|
}
|
|
// Pass managed_versioning if provided to avoid redundant describe_table call
|
|
if let Some(enabled) = managed_versioning {
|
|
builder = builder.managed_versioning(enabled);
|
|
}
|
|
|
|
future_into_py(self_.py(), async move {
|
|
let table = builder.execute().await.infer_error()?;
|
|
Ok(Table::new(table))
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (target_table_name, source_uri, target_namespace_path=None, source_version=None, source_tag=None, is_shallow=true))]
|
|
pub fn clone_table(
|
|
self_: PyRef<'_, Self>,
|
|
target_table_name: String,
|
|
source_uri: String,
|
|
target_namespace_path: Option<Vec<String>>,
|
|
source_version: Option<u64>,
|
|
source_tag: Option<String>,
|
|
is_shallow: bool,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
|
|
let mut builder = inner.clone_table(target_table_name, source_uri);
|
|
builder = builder.target_namespace(target_namespace_path.unwrap_or_default());
|
|
if let Some(version) = source_version {
|
|
builder = builder.source_version(version);
|
|
}
|
|
if let Some(tag) = source_tag {
|
|
builder = builder.source_tag(tag);
|
|
}
|
|
builder = builder.is_shallow(is_shallow);
|
|
|
|
future_into_py(self_.py(), async move {
|
|
let table = builder.execute().await.infer_error()?;
|
|
Ok(Table::new(table))
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (name, language, return_type, body, options=None))]
|
|
pub fn create_function(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
language: String,
|
|
return_type: String,
|
|
body: String,
|
|
options: Option<HashMap<String, String>>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
inner
|
|
.create_function(CreateFunctionRequest {
|
|
name,
|
|
language,
|
|
return_type,
|
|
body,
|
|
options: options.unwrap_or_default(),
|
|
})
|
|
.await
|
|
.infer_error()
|
|
})
|
|
}
|
|
|
|
pub fn list_functions(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
let functions = inner.list_functions().await.infer_error()?;
|
|
Ok(functions
|
|
.into_iter()
|
|
.map(|f| FunctionInfo {
|
|
name: f.name,
|
|
language: f.language,
|
|
return_type: f.return_type,
|
|
description: f.description,
|
|
})
|
|
.collect::<Vec<_>>())
|
|
})
|
|
}
|
|
|
|
pub fn drop_function(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
inner.drop_function(&name).await.infer_error()
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (name, query, auto_refresh=false, with_no_data=false, partition_by=None))]
|
|
pub fn create_materialized_view(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
query: String,
|
|
auto_refresh: bool,
|
|
with_no_data: bool,
|
|
partition_by: Option<String>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
inner
|
|
.create_materialized_view(CreateMaterializedViewRequest {
|
|
name,
|
|
query,
|
|
auto_refresh,
|
|
with_no_data,
|
|
partition_by,
|
|
})
|
|
.await
|
|
.infer_error()
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (name, src_version=None, num_workers=None, max_workers=None))]
|
|
pub fn refresh_materialized_view(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
src_version: Option<u64>,
|
|
num_workers: Option<u32>,
|
|
max_workers: Option<u32>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
inner
|
|
.refresh_materialized_view(RefreshMaterializedViewRequest {
|
|
name,
|
|
src_version,
|
|
num_workers,
|
|
max_workers,
|
|
})
|
|
.await
|
|
.infer_error()
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (name, full=false, src_version=None))]
|
|
pub fn explain_refresh_materialized_view(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
full: bool,
|
|
src_version: Option<u64>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
let p = inner
|
|
.explain_refresh_materialized_view(&name, full, src_version)
|
|
.await
|
|
.infer_error()?;
|
|
Ok(MvRefreshPlan {
|
|
table_name: p.table_name,
|
|
has_work: p.has_work,
|
|
source_version: p.source_version,
|
|
last_refreshed_version: p.last_refreshed_version,
|
|
full_refresh: p.full_refresh,
|
|
rebuild: p.rebuild,
|
|
units_total: p.units_total,
|
|
})
|
|
})
|
|
}
|
|
|
|
pub fn alter_materialized_view(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
auto_refresh: bool,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
inner
|
|
.alter_materialized_view(&name, auto_refresh)
|
|
.await
|
|
.infer_error()
|
|
})
|
|
}
|
|
|
|
pub fn drop_materialized_view(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
inner.drop_materialized_view(&name).await.infer_error()
|
|
})
|
|
}
|
|
|
|
pub fn list_materialized_views(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
let views = inner.list_materialized_views().await.infer_error()?;
|
|
Ok(views
|
|
.into_iter()
|
|
.map(|v| MaterializedViewInfo {
|
|
name: v.name,
|
|
source_table: v.source_table,
|
|
projection: v.projection,
|
|
udf_columns: v.udf_columns,
|
|
filter: v.filter,
|
|
auto_refresh: v.auto_refresh,
|
|
})
|
|
.collect::<Vec<_>>())
|
|
})
|
|
}
|
|
|
|
pub fn list_jobs(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
let jobs = inner.list_jobs().await.infer_error()?;
|
|
Ok(jobs
|
|
.into_iter()
|
|
.map(|j| JobInfo {
|
|
table: j.table,
|
|
job_id: j.job_id,
|
|
job_type: j.job_type,
|
|
state: j.state,
|
|
column: j.column,
|
|
age_seconds: j.age_seconds,
|
|
command: j.command,
|
|
units_done: j.units_done,
|
|
units_total: j.units_total,
|
|
committed: j.committed,
|
|
rows_skipped: j.rows_skipped,
|
|
error: j.error,
|
|
})
|
|
.collect::<Vec<_>>())
|
|
})
|
|
}
|
|
|
|
pub fn cancel_job(self_: PyRef<'_, Self>, job_id: String) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
future_into_py(self_.py(), async move {
|
|
inner.cancel_job(&job_id).await.infer_error()
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))]
|
|
pub fn rename_table(
|
|
self_: PyRef<'_, Self>,
|
|
cur_name: String,
|
|
new_name: String,
|
|
cur_namespace_path: Option<Vec<String>>,
|
|
new_namespace_path: Option<Vec<String>>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let cur_ns_path = cur_namespace_path.unwrap_or_default();
|
|
let new_ns_path = new_namespace_path.unwrap_or_default();
|
|
future_into_py(self_.py(), async move {
|
|
inner
|
|
.rename_table(cur_name, new_name, &cur_ns_path, &new_ns_path)
|
|
.await
|
|
.infer_error()
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (name, namespace_path=None))]
|
|
pub fn drop_table(
|
|
self_: PyRef<'_, Self>,
|
|
name: String,
|
|
namespace_path: Option<Vec<String>>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let ns_path = namespace_path.unwrap_or_default();
|
|
future_into_py(self_.py(), async move {
|
|
inner.drop_table(name, &ns_path).await.infer_error()
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (namespace_path=None,))]
|
|
pub fn drop_all_tables(
|
|
self_: PyRef<'_, Self>,
|
|
namespace_path: Option<Vec<String>>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let ns_path = namespace_path.unwrap_or_default();
|
|
future_into_py(self_.py(), async move {
|
|
inner.drop_all_tables(&ns_path).await.infer_error()
|
|
})
|
|
}
|
|
|
|
// Namespace management methods
|
|
|
|
#[pyo3(signature = (namespace_path=None, page_token=None, limit=None))]
|
|
pub fn list_namespaces(
|
|
self_: PyRef<'_, Self>,
|
|
namespace_path: Option<Vec<String>>,
|
|
page_token: Option<String>,
|
|
limit: Option<u32>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
future_into_py(py, async move {
|
|
use lance_namespace::models::ListNamespacesRequest;
|
|
let request = ListNamespacesRequest {
|
|
id: namespace_path,
|
|
page_token,
|
|
limit: limit.map(|l| l as i32),
|
|
..Default::default()
|
|
};
|
|
let response = inner.list_namespaces(request).await.infer_error()?;
|
|
Python::attach(|py| -> PyResult<Py<PyDict>> {
|
|
let dict = PyDict::new(py);
|
|
dict.set_item("namespaces", response.namespaces)?;
|
|
dict.set_item("page_token", response.page_token)?;
|
|
Ok(dict.unbind())
|
|
})
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (namespace_path, mode=None, properties=None))]
|
|
pub fn create_namespace(
|
|
self_: PyRef<'_, Self>,
|
|
namespace_path: Vec<String>,
|
|
mode: Option<String>,
|
|
properties: Option<std::collections::HashMap<String, String>>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
future_into_py(py, async move {
|
|
use lance_namespace::models::CreateNamespaceRequest;
|
|
// Mode is now a string field
|
|
let mode_str = mode
|
|
.map(|m| match m.to_lowercase().as_str() {
|
|
"create" => Ok("Create".to_string()),
|
|
"exist_ok" => Ok("ExistOk".to_string()),
|
|
"overwrite" => Ok("Overwrite".to_string()),
|
|
_ => Err(PyValueError::new_err(format!(
|
|
"Invalid mode {:?}: expected one of 'create', 'exist_ok', 'overwrite'",
|
|
m
|
|
))),
|
|
})
|
|
.transpose()?;
|
|
let request = CreateNamespaceRequest {
|
|
id: Some(namespace_path),
|
|
mode: mode_str,
|
|
properties,
|
|
..Default::default()
|
|
};
|
|
let response = inner.create_namespace(request).await.infer_error()?;
|
|
Python::attach(|py| -> PyResult<Py<PyDict>> {
|
|
let dict = PyDict::new(py);
|
|
dict.set_item("properties", response.properties)?;
|
|
Ok(dict.unbind())
|
|
})
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (namespace_path, mode=None, behavior=None))]
|
|
pub fn drop_namespace(
|
|
self_: PyRef<'_, Self>,
|
|
namespace_path: Vec<String>,
|
|
mode: Option<String>,
|
|
behavior: Option<String>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
future_into_py(py, async move {
|
|
use lance_namespace::models::DropNamespaceRequest;
|
|
// Mode and Behavior are now string fields
|
|
let mode_str = mode
|
|
.map(|m| match m.to_uppercase().as_str() {
|
|
"SKIP" => Ok("Skip".to_string()),
|
|
"FAIL" => Ok("Fail".to_string()),
|
|
_ => Err(PyValueError::new_err(format!(
|
|
"Invalid mode {:?}: expected one of 'skip', 'fail'",
|
|
m
|
|
))),
|
|
})
|
|
.transpose()?;
|
|
let behavior_str = behavior
|
|
.map(|b| match b.to_uppercase().as_str() {
|
|
"RESTRICT" => Ok("Restrict".to_string()),
|
|
"CASCADE" => Ok("Cascade".to_string()),
|
|
_ => Err(PyValueError::new_err(format!(
|
|
"Invalid behavior {:?}: expected one of 'restrict', 'cascade'",
|
|
b
|
|
))),
|
|
})
|
|
.transpose()?;
|
|
let request = DropNamespaceRequest {
|
|
id: Some(namespace_path),
|
|
mode: mode_str,
|
|
behavior: behavior_str,
|
|
..Default::default()
|
|
};
|
|
let response = inner.drop_namespace(request).await.infer_error()?;
|
|
Python::attach(|py| -> PyResult<Py<PyDict>> {
|
|
let dict = PyDict::new(py);
|
|
dict.set_item("properties", response.properties)?;
|
|
dict.set_item("transaction_id", response.transaction_id)?;
|
|
Ok(dict.unbind())
|
|
})
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (namespace_path,))]
|
|
pub fn describe_namespace(
|
|
self_: PyRef<'_, Self>,
|
|
namespace_path: Vec<String>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
future_into_py(py, async move {
|
|
use lance_namespace::models::DescribeNamespaceRequest;
|
|
let request = DescribeNamespaceRequest {
|
|
id: Some(namespace_path),
|
|
..Default::default()
|
|
};
|
|
let response = inner.describe_namespace(request).await.infer_error()?;
|
|
Python::attach(|py| -> PyResult<Py<PyDict>> {
|
|
let dict = PyDict::new(py);
|
|
dict.set_item("properties", response.properties)?;
|
|
Ok(dict.unbind())
|
|
})
|
|
})
|
|
}
|
|
|
|
#[pyo3(signature = (namespace_path=None, page_token=None, limit=None))]
|
|
pub fn list_tables(
|
|
self_: PyRef<'_, Self>,
|
|
namespace_path: Option<Vec<String>>,
|
|
page_token: Option<String>,
|
|
limit: Option<u32>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
future_into_py(py, async move {
|
|
use lance_namespace::models::ListTablesRequest;
|
|
let request = ListTablesRequest {
|
|
id: namespace_path,
|
|
page_token,
|
|
limit: limit.map(|l| l as i32),
|
|
..Default::default()
|
|
};
|
|
let response = inner.list_tables(request).await.infer_error()?;
|
|
Python::attach(|py| -> PyResult<Py<PyDict>> {
|
|
let dict = PyDict::new(py);
|
|
dict.set_item("tables", response.tables)?;
|
|
dict.set_item("page_token", response.page_token)?;
|
|
Ok(dict.unbind())
|
|
})
|
|
})
|
|
}
|
|
|
|
/// Get the configuration for constructing an equivalent namespace client.
|
|
/// Returns a dict with:
|
|
/// - "impl": "dir" for DirectoryNamespace, "rest" for RestNamespace
|
|
/// - "properties": configuration properties for the namespace
|
|
#[pyo3(signature = ())]
|
|
pub fn namespace_client_config(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
|
let inner = self_.get_inner()?.clone();
|
|
let py = self_.py();
|
|
future_into_py(py, async move {
|
|
let (impl_type, properties) = inner.namespace_client_config().await.infer_error()?;
|
|
Python::attach(|py| -> PyResult<Py<PyDict>> {
|
|
let dict = PyDict::new(py);
|
|
dict.set_item("impl", impl_type)?;
|
|
dict.set_item("properties", properties)?;
|
|
Ok(dict.unbind())
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
#[pyfunction]
|
|
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None, oauth_config=None))]
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub fn connect(
|
|
py: Python<'_>,
|
|
uri: String,
|
|
api_key: Option<String>,
|
|
region: Option<String>,
|
|
host_override: Option<String>,
|
|
read_consistency_interval: Option<f64>,
|
|
client_config: Option<PyClientConfig>,
|
|
storage_options: Option<HashMap<String, String>>,
|
|
session: Option<crate::session::Session>,
|
|
manifest_enabled: bool,
|
|
namespace_client_properties: Option<HashMap<String, String>>,
|
|
oauth_config: Option<crate::oauth::PyOAuthConfig>,
|
|
) -> PyResult<Bound<'_, PyAny>> {
|
|
future_into_py(py, async move {
|
|
let mut builder = lancedb::connect(&uri);
|
|
if let Some(api_key) = api_key {
|
|
builder = builder.api_key(&api_key);
|
|
}
|
|
if let Some(region) = region {
|
|
builder = builder.region(®ion);
|
|
}
|
|
if let Some(host_override) = host_override {
|
|
builder = builder.host_override(&host_override);
|
|
}
|
|
if let Some(read_consistency_interval) = read_consistency_interval {
|
|
let read_consistency_interval = Duration::from_secs_f64(read_consistency_interval);
|
|
builder = builder.read_consistency_interval(read_consistency_interval);
|
|
}
|
|
if let Some(storage_options) = storage_options {
|
|
builder = builder.storage_options(storage_options);
|
|
}
|
|
if manifest_enabled {
|
|
builder = builder.manifest_enabled(true);
|
|
}
|
|
if let Some(namespace_client_properties) = namespace_client_properties {
|
|
builder = builder.namespace_client_properties(namespace_client_properties);
|
|
}
|
|
#[cfg(feature = "remote")]
|
|
if let Some(client_config) = client_config {
|
|
builder = builder.client_config(client_config.into());
|
|
}
|
|
if let Some(oauth_config) = oauth_config {
|
|
let config: lancedb::remote::oauth::OAuthConfig =
|
|
oauth_config.try_into().infer_error()?;
|
|
builder = builder.oauth_config(config);
|
|
}
|
|
if let Some(session) = session {
|
|
builder = builder.session(session.inner.clone());
|
|
}
|
|
Ok(Connection::new(builder.execute().await.infer_error()?))
|
|
})
|
|
}
|
|
|
|
#[pyfunction]
|
|
#[pyo3(signature = (
|
|
namespace_client,
|
|
read_consistency_interval=None,
|
|
storage_options=None,
|
|
session=None,
|
|
namespace_client_pushdown_operations=None,
|
|
namespace_client_impl=None,
|
|
namespace_client_properties=None,
|
|
))]
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub fn connect_namespace_client(
|
|
py: Python<'_>,
|
|
namespace_client: Py<PyAny>,
|
|
read_consistency_interval: Option<f64>,
|
|
storage_options: Option<HashMap<String, String>>,
|
|
session: Option<crate::session::Session>,
|
|
namespace_client_pushdown_operations: Option<Vec<String>>,
|
|
namespace_client_impl: Option<String>,
|
|
namespace_client_properties: Option<HashMap<String, String>>,
|
|
) -> PyResult<Connection> {
|
|
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
|
|
let namespace_client_pushdown_operations =
|
|
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
|
|
let ns_properties = namespace_client_properties.unwrap_or_default();
|
|
let storage_options = storage_options.unwrap_or_default();
|
|
let session = session.map(|s| s.inner.clone());
|
|
|
|
// Prefer building the namespace natively from (impl, properties) so the
|
|
// read-freshness provider installed
|
|
let database = if build_namespace_natively(namespace_client_impl.as_deref(), &ns_properties) {
|
|
let ns_impl = namespace_client_impl.expect("impl present per build_namespace_natively");
|
|
crate::runtime::block_on(LanceNamespaceDatabase::connect(
|
|
&ns_impl,
|
|
ns_properties,
|
|
storage_options,
|
|
read_consistency_interval,
|
|
session,
|
|
namespace_client_pushdown_operations,
|
|
))
|
|
.infer_error()?
|
|
} else {
|
|
let namespace_client = extract_namespace_arc(py, namespace_client)?;
|
|
LanceNamespaceDatabase::from_namespace_client(
|
|
namespace_client,
|
|
namespace_client_impl.unwrap_or_else(|| "python".to_string()),
|
|
ns_properties,
|
|
storage_options,
|
|
read_consistency_interval,
|
|
session,
|
|
namespace_client_pushdown_operations,
|
|
)
|
|
};
|
|
|
|
Ok(Connection::new(LanceConnection::new(
|
|
Arc::new(database),
|
|
Arc::new(lancedb::embeddings::MemoryRegistry::new()),
|
|
)))
|
|
}
|
|
|
|
/// Whether to build the namespace natively (from impl + properties) instead of
|
|
/// wrapping a pre-built client. Native construction is required for the
|
|
/// read-freshness provider to be installed
|
|
fn build_namespace_natively(
|
|
namespace_client_impl: Option<&str>,
|
|
namespace_client_properties: &HashMap<String, String>,
|
|
) -> bool {
|
|
matches!(namespace_client_impl, Some("rest")) && !namespace_client_properties.is_empty()
|
|
}
|
|
|
|
#[derive(FromPyObject)]
|
|
pub struct PyClientConfig {
|
|
user_agent: String,
|
|
retry_config: Option<PyClientRetryConfig>,
|
|
timeout_config: Option<PyClientTimeoutConfig>,
|
|
extra_headers: Option<HashMap<String, String>>,
|
|
id_delimiter: Option<String>,
|
|
tls_config: Option<PyClientTlsConfig>,
|
|
header_provider: Option<Py<PyAny>>,
|
|
user_id: Option<String>,
|
|
}
|
|
|
|
#[derive(FromPyObject)]
|
|
pub struct PyClientRetryConfig {
|
|
retries: Option<u8>,
|
|
connect_retries: Option<u8>,
|
|
read_retries: Option<u8>,
|
|
backoff_factor: Option<f32>,
|
|
backoff_jitter: Option<f32>,
|
|
statuses: Option<Vec<u16>>,
|
|
}
|
|
|
|
#[derive(FromPyObject)]
|
|
pub struct PyClientTimeoutConfig {
|
|
timeout: Option<Duration>,
|
|
connect_timeout: Option<Duration>,
|
|
read_timeout: Option<Duration>,
|
|
pool_idle_timeout: Option<Duration>,
|
|
}
|
|
|
|
#[derive(FromPyObject)]
|
|
pub struct PyClientTlsConfig {
|
|
cert_file: Option<String>,
|
|
key_file: Option<String>,
|
|
ssl_ca_cert: Option<String>,
|
|
assert_hostname: bool,
|
|
}
|
|
|
|
#[cfg(feature = "remote")]
|
|
impl From<PyClientRetryConfig> for lancedb::remote::RetryConfig {
|
|
fn from(value: PyClientRetryConfig) -> Self {
|
|
Self {
|
|
retries: value.retries,
|
|
connect_retries: value.connect_retries,
|
|
read_retries: value.read_retries,
|
|
backoff_factor: value.backoff_factor,
|
|
backoff_jitter: value.backoff_jitter,
|
|
statuses: value.statuses,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(feature = "remote")]
|
|
impl From<PyClientTimeoutConfig> for lancedb::remote::TimeoutConfig {
|
|
fn from(value: PyClientTimeoutConfig) -> Self {
|
|
Self {
|
|
timeout: value.timeout,
|
|
connect_timeout: value.connect_timeout,
|
|
read_timeout: value.read_timeout,
|
|
pool_idle_timeout: value.pool_idle_timeout,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(feature = "remote")]
|
|
impl From<PyClientTlsConfig> for lancedb::remote::TlsConfig {
|
|
fn from(value: PyClientTlsConfig) -> Self {
|
|
Self {
|
|
cert_file: value.cert_file,
|
|
key_file: value.key_file,
|
|
ssl_ca_cert: value.ssl_ca_cert,
|
|
assert_hostname: value.assert_hostname,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(feature = "remote")]
|
|
impl From<PyClientConfig> for lancedb::remote::ClientConfig {
|
|
fn from(value: PyClientConfig) -> Self {
|
|
use crate::header::PyHeaderProvider;
|
|
|
|
let header_provider = value.header_provider.map(|provider| {
|
|
let py_provider = PyHeaderProvider::new(provider);
|
|
Arc::new(py_provider) as Arc<dyn lancedb::remote::HeaderProvider>
|
|
});
|
|
|
|
Self {
|
|
user_agent: value.user_agent,
|
|
retry_config: value.retry_config.map(Into::into).unwrap_or_default(),
|
|
timeout_config: value.timeout_config.map(Into::into).unwrap_or_default(),
|
|
extra_headers: value.extra_headers.unwrap_or_default(),
|
|
id_delimiter: value.id_delimiter,
|
|
tls_config: value.tls_config.map(Into::into),
|
|
header_provider,
|
|
user_id: value.user_id,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
|
|
pairs
|
|
.iter()
|
|
.map(|(k, v)| (k.to_string(), v.to_string()))
|
|
.collect()
|
|
}
|
|
|
|
#[test]
|
|
fn native_build_only_for_rest_with_properties() {
|
|
let rest = props(&[("uri", "http://localhost:10024")]);
|
|
|
|
// rest + non-empty properties -> build natively (installs the
|
|
// read-freshness provider so checkout_latest() busts the server cache).
|
|
assert!(build_namespace_natively(Some("rest"), &rest));
|
|
|
|
// dir is local (no server cache) -> wrap the pre-built client unchanged.
|
|
assert!(!build_namespace_natively(
|
|
Some("dir"),
|
|
&props(&[("root", "/tmp")])
|
|
));
|
|
|
|
// No impl: only a pre-built client was handed in -> wrap it as-is.
|
|
assert!(!build_namespace_natively(None, &rest));
|
|
|
|
// rest but no properties: nothing to build a connection from -> wrap.
|
|
assert!(!build_namespace_natively(Some("rest"), &HashMap::new()));
|
|
}
|
|
}
|