diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 332b57398..dcf157348 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -10,7 +10,6 @@ through a namespace abstraction. from __future__ import annotations -import asyncio import sys from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union @@ -25,6 +24,21 @@ if TYPE_CHECKING: from datetime import timedelta import pyarrow as pa +from lance_namespace_urllib3_client.models.json_arrow_data_type import JsonArrowDataType +from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField +from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema +from lance_namespace_urllib3_client.models.query_table_request import QueryTableRequest +from lance_namespace_urllib3_client.models.query_table_request_columns import ( + QueryTableRequestColumns, +) +from lance_namespace_urllib3_client.models.query_table_request_full_text_query import ( + QueryTableRequestFullTextQuery, +) +from lance_namespace_urllib3_client.models.query_table_request_vector import ( + QueryTableRequestVector, +) +from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery +from lancedb._lancedb import connect_namespace_client as _connect_namespace_client from lancedb.background_loop import LOOP from lancedb.db import AsyncConnection, DBConnection from lancedb.namespace_utils import ( @@ -41,64 +55,18 @@ from lance_namespace import ( ListNamespacesResponse, ListTablesResponse, ListTablesRequest, - DescribeTableRequest, DescribeNamespaceRequest, DropTableRequest, ListNamespacesRequest, CreateNamespaceRequest, DropNamespaceRequest, - DeclareTableRequest, - CreateTableRequest, ) from lancedb.table import AsyncTable, LanceTable, Table from lancedb.util import validate_table_name -from lancedb.common import DATA, sanitize_uri +from lancedb.common import DATA from lancedb.pydantic import LanceModel from lancedb.embeddings import EmbeddingFunctionConfig -from ._lancedb import Session, connect as lancedb_connect - - -def _make_temp_async_connection( - uri: str, - read_consistency_interval: Optional[timedelta], - storage_options: Optional[Dict[str, str]], - session: Optional[Session], -) -> AsyncConnection: - read_consistency_interval_secs = ( - read_consistency_interval.total_seconds() - if read_consistency_interval is not None - else None - ) - - async def do_connect(): - return await lancedb_connect( - sanitize_uri(uri), - None, - None, - None, - read_consistency_interval_secs, - None, - storage_options, - session, - ) - - return AsyncConnection(LOOP.run(do_connect())) - - -from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema -from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField -from lance_namespace_urllib3_client.models.json_arrow_data_type import JsonArrowDataType -from lance_namespace_urllib3_client.models.query_table_request import QueryTableRequest -from lance_namespace_urllib3_client.models.query_table_request_vector import ( - QueryTableRequestVector, -) -from lance_namespace_urllib3_client.models.query_table_request_columns import ( - QueryTableRequestColumns, -) -from lance_namespace_urllib3_client.models.query_table_request_full_text_query import ( - QueryTableRequestFullTextQuery, -) -from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery +from ._lancedb import Session def _query_to_namespace_request( @@ -453,6 +421,23 @@ class LanceNamespaceDBConnection(DBConnection): ) self._namespace_client_impl = namespace_client_impl self._namespace_client_properties = namespace_client_properties + self._inner = AsyncConnection( + _connect_namespace_client( + namespace_client, + read_consistency_interval=( + read_consistency_interval.total_seconds() + if read_consistency_interval is not None + else None + ), + storage_options=self.storage_options or None, + session=session, + namespace_client_pushdown_operations=( + list(self._namespace_client_pushdown_operations) + ), + namespace_client_impl=namespace_client_impl, + namespace_client_properties=namespace_client_properties, + ) + ) @override def serialize(self) -> str: @@ -526,13 +511,10 @@ class LanceNamespaceDBConnection(DBConnection): if mode.lower() not in ["create", "overwrite"]: raise ValueError("mode must be either 'create' or 'overwrite'") validate_table_name(name) - - table_id = namespace_path + [name] - - if "CreateTable" in self._namespace_client_pushdown_operations: - return self._create_table_server_side( - name=name, - data=data, + async_table = LOOP.run( + self._inner.create_table( + name, + data, schema=schema, mode=mode, exist_ok=exist_ok, @@ -542,148 +524,17 @@ class LanceNamespaceDBConnection(DBConnection): namespace_path=namespace_path, storage_options=storage_options, ) - - # Local create path: declare_table + local write - # Step 1: Get the table location and storage options from namespace - # In overwrite mode, if table exists, use describe_table to get - # existing location. Otherwise, call create_empty_table to reserve - # a new location - location = None - namespace_storage_options = None - if mode.lower() == "overwrite": - # Try to describe the table first to see if it exists - try: - describe_request = DescribeTableRequest(id=table_id) - describe_response = self._namespace_client.describe_table( - describe_request - ) - location = describe_response.location - namespace_storage_options = describe_response.storage_options - except Exception: - # Table doesn't exist, will create a new one below - pass - - if location is None: - # Table doesn't exist or mode is "create", reserve a new location - declare_request = DeclareTableRequest( - id=table_id, - location=None, - properties=self.storage_options if self.storage_options else None, - ) - declare_response = self._namespace_client.declare_table(declare_request) - - if not declare_response.location: - raise ValueError( - "Table location is missing from declare_table response" - ) - - location = declare_response.location - namespace_storage_options = declare_response.storage_options - - # Merge storage options: self.storage_options < user options < namespace options - merged_storage_options = dict(self.storage_options) - if storage_options: - merged_storage_options.update(storage_options) - if namespace_storage_options: - merged_storage_options.update(namespace_storage_options) - - # Step 2: Create the dataset at the namespace-declared physical location. - # - # The namespace has already resolved the logical table identifier - # (namespace_path + name) to an exact storage location. Create a - # short-lived AsyncConnection rooted at that physical location and use it - # only to perform the low-level dataset write. - # - # The sync LanceTable wrapper that we return should still be bound to the - # original namespace connection so that namespace-aware operations keep - # using the logical table id, namespace client, and connection storage - # options. - temp_async_conn = _make_temp_async_connection( - location, - self.read_consistency_interval, - merged_storage_options, - self.session, - ) - - async_table = LOOP.run( - temp_async_conn.create_table( - name, - data, - schema=schema, - mode=mode, - exist_ok=exist_ok, - on_bad_vectors=on_bad_vectors, - fill_value=fill_value, - embedding_functions=embedding_functions, - namespace_path=[], - storage_options=merged_storage_options, - location=location, - namespace_client=self._namespace_client, - ) ) return LanceTable( self, name, namespace_path=namespace_path, - location=location, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, _async=async_table, ) - def _create_table_server_side( - self, - name: str, - data: Optional[DATA], - schema: Optional[Union[pa.Schema, LanceModel]], - mode: str, - exist_ok: bool, - on_bad_vectors: str, - fill_value: float, - embedding_functions: Optional[List[EmbeddingFunctionConfig]], - namespace_path: Optional[List[str]], - storage_options: Optional[Dict[str, str]], - ) -> Table: - """Create a table using server-side namespace.create_table().""" - if namespace_path is None: - namespace_path = [] - table_id = namespace_path + [name] - - arrow_ipc_bytes = _data_to_arrow_ipc( - data=data, - schema=schema, - embedding_functions=embedding_functions, - on_bad_vectors=on_bad_vectors, - fill_value=fill_value, - ) - - merged = dict(self.storage_options or {}) - if storage_options: - merged.update(storage_options) - request = CreateTableRequest( - id=table_id, - mode=_normalize_create_table_mode(mode), - properties=merged or None, - ) - - try: - self._namespace_client.create_table(request, arrow_ipc_bytes) - except Exception as e: - if exist_ok and "already exists" in str(e).lower(): - return self.open_table( - name, - namespace_path=namespace_path, - storage_options=storage_options, - ) - raise - - return self.open_table( - name, - namespace_path=namespace_path, - storage_options=storage_options, - ) - @override def open_table( self, @@ -695,30 +546,22 @@ class LanceNamespaceDBConnection(DBConnection): ) -> Table: if namespace_path is None: namespace_path = [] - table_id = namespace_path + [name] - request = DescribeTableRequest(id=table_id) - response = self._namespace_client.describe_table(request) + async_table = LOOP.run( + self._inner.open_table( + name, + namespace_path=namespace_path, + storage_options=storage_options, + index_cache_size=index_cache_size, + ) + ) - # Merge storage options: self.storage_options < user options < namespace options - merged_storage_options = dict(self.storage_options) - if storage_options: - merged_storage_options.update(storage_options) - if response.storage_options: - merged_storage_options.update(response.storage_options) - - # Pass managed_versioning to avoid redundant describe_table call in Rust. - # Convert None to False since we already have the answer from describe_table. - managed_versioning = response.managed_versioning is True - - # Note: storage_options_provider is auto-created in Rust from namespace_client - return self._lance_table_from_uri( + return LanceTable( + self, name, - response.location, namespace_path=namespace_path, - storage_options=merged_storage_options, - index_cache_size=index_cache_size, namespace_client=self._namespace_client, - managed_versioning=managed_versioning, + pushdown_operations=self._namespace_client_pushdown_operations, + _async=async_table, ) @override @@ -944,31 +787,20 @@ class LanceNamespaceDBConnection(DBConnection): ) -> LanceTable: # Open a table directly from the namespace-resolved physical location. # - # The namespace has already resolved the logical table identifier to a - # concrete storage location. Create a short-lived AsyncConnection rooted - # at that physical location and use it only for the low-level open. - # - # The returned sync LanceTable remains bound to the original namespace - # connection so that logical namespace metadata and namespace-aware helper - # behavior remain intact. + # Open the table through the Rust namespace-backed connection. The Rust + # layer keeps the logical namespace path and namespace client intact. if namespace_path is None: namespace_path = [] - temp_async_conn = _make_temp_async_connection( - table_uri, - self.read_consistency_interval, - storage_options if storage_options is not None else {}, - self.session, - ) async_table = LOOP.run( - temp_async_conn.open_table( + self._inner.open_table( name, - namespace_path=[], + namespace_path=namespace_path, storage_options=storage_options, index_cache_size=index_cache_size, - location=table_uri, - namespace_client=None, - managed_versioning=False, + location=None, + namespace_client=namespace_client, + managed_versioning=managed_versioning, ) ) @@ -1047,6 +879,23 @@ class AsyncLanceNamespaceDBConnection: self._namespace_client_pushdown_operations = set( namespace_client_pushdown_operations or [] ) + self._inner = AsyncConnection( + _connect_namespace_client( + namespace_client, + read_consistency_interval=( + read_consistency_interval.total_seconds() + if read_consistency_interval is not None + else None + ), + storage_options=self.storage_options or None, + session=session, + namespace_client_pushdown_operations=( + list(self._namespace_client_pushdown_operations) + ), + namespace_client_impl=None, + namespace_client_properties=None, + ) + ) async def table_names( self, @@ -1098,145 +947,16 @@ class AsyncLanceNamespaceDBConnection: if mode.lower() not in ["create", "overwrite"]: raise ValueError("mode must be either 'create' or 'overwrite'") validate_table_name(name) - - table_id = namespace_path + [name] - - if "CreateTable" in self._namespace_client_pushdown_operations: - return await self._create_table_server_side( - name=name, - data=data, - schema=schema, - mode=mode, - exist_ok=exist_ok, - on_bad_vectors=on_bad_vectors, - fill_value=fill_value, - embedding_functions=embedding_functions, - namespace_path=namespace_path, - storage_options=storage_options, - ) - - # Local create path: declare_table + local write - # Step 1: Get the table location and storage options from namespace - location = None - namespace_storage_options = None - if mode.lower() == "overwrite": - # Try to describe the table first to see if it exists - try: - describe_request = DescribeTableRequest(id=table_id) - describe_response = self._namespace_client.describe_table( - describe_request - ) - location = describe_response.location - namespace_storage_options = describe_response.storage_options - except Exception: - # Table doesn't exist, will create a new one below - pass - - if location is None: - # Table doesn't exist or mode is "create", reserve a new location - declare_request = DeclareTableRequest( - id=table_id, - location=None, - properties=self.storage_options if self.storage_options else None, - ) - declare_response = self._namespace_client.declare_table(declare_request) - - if not declare_response.location: - raise ValueError( - "Table location is missing from declare_table response" - ) - - location = declare_response.location - namespace_storage_options = declare_response.storage_options - - # Merge storage options: self.storage_options < user options < namespace options - merged_storage_options = dict(self.storage_options) - if storage_options: - merged_storage_options.update(storage_options) - if namespace_storage_options: - merged_storage_options.update(namespace_storage_options) - - # Step 2: Create the dataset through a short-lived AsyncConnection - # rooted at the namespace-declared physical location. - def _create_table(): - temp_async_conn = _make_temp_async_connection( - location, - self.read_consistency_interval, - merged_storage_options, - self.session, - ) - - return LOOP.run( - temp_async_conn.create_table( - name, - data, - schema=schema, - mode=mode, - exist_ok=exist_ok, - on_bad_vectors=on_bad_vectors, - fill_value=fill_value, - embedding_functions=embedding_functions, - namespace_path=[], - storage_options=merged_storage_options, - location=location, - namespace_client=self._namespace_client, - ) - ) - - return await asyncio.to_thread(_create_table) - - async def _create_table_server_side( - self, - name: str, - data: Optional[DATA], - schema: Optional[Union[pa.Schema, LanceModel]], - mode: str, - exist_ok: bool, - on_bad_vectors: str, - fill_value: float, - embedding_functions: Optional[List[EmbeddingFunctionConfig]], - namespace_path: Optional[List[str]], - storage_options: Optional[Dict[str, str]], - ) -> AsyncTable: - """Create a table using server-side namespace.create_table().""" - if namespace_path is None: - namespace_path = [] - table_id = namespace_path + [name] - - def _prepare_and_create(): - arrow_ipc_bytes = _data_to_arrow_ipc( - data=data, - schema=schema, - embedding_functions=embedding_functions, - on_bad_vectors=on_bad_vectors, - fill_value=fill_value, - ) - - merged = dict(self.storage_options or {}) - if storage_options: - merged.update(storage_options) - request = CreateTableRequest( - id=table_id, - mode=_normalize_create_table_mode(mode), - properties=merged or None, - ) - - self._namespace_client.create_table(request, arrow_ipc_bytes) - - try: - await asyncio.to_thread(_prepare_and_create) - except Exception as e: - if exist_ok and "already exists" in str(e).lower(): - return await self.open_table( - name, - namespace_path=namespace_path, - storage_options=storage_options, - ) - raise - - return await self.open_table( + return await self._inner.create_table( name, + data, + schema=schema, + mode=mode, + exist_ok=exist_ok, + on_bad_vectors=on_bad_vectors, + fill_value=fill_value, namespace_path=namespace_path, + embedding_functions=embedding_functions, storage_options=storage_options, ) @@ -1251,44 +971,12 @@ class AsyncLanceNamespaceDBConnection: """Open an existing table from the namespace.""" if namespace_path is None: namespace_path = [] - table_id = namespace_path + [name] - request = DescribeTableRequest(id=table_id) - response = self._namespace_client.describe_table(request) - - # Merge storage options: self.storage_options < user options < namespace options - merged_storage_options = dict(self.storage_options) - if storage_options: - merged_storage_options.update(storage_options) - if response.storage_options: - merged_storage_options.update(response.storage_options) - - # Capture managed_versioning from describe response. - # Convert None to False since we already have the answer from describe_table. - managed_versioning = response.managed_versioning is True - - # Open the table from the namespace-resolved physical location through a - # short-lived AsyncConnection. - def _open_table(): - temp_async_conn = _make_temp_async_connection( - response.location, - self.read_consistency_interval, - merged_storage_options, - self.session, - ) - - return LOOP.run( - temp_async_conn.open_table( - name, - namespace_path=[], - storage_options=merged_storage_options, - index_cache_size=index_cache_size, - location=response.location, - namespace_client=None, - managed_versioning=False, - ) - ) - - return await asyncio.to_thread(_open_table) + return await self._inner.open_table( + name, + namespace_path=namespace_path, + storage_options=storage_options, + index_cache_size=index_cache_size, + ) async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None): """Drop a table from the namespace.""" diff --git a/python/src/connection.rs b/python/src/connection.rs index f19bfba97..a9ec8e537 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -1,11 +1,17 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The LanceDB Authors -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow}; use lancedb::{ connection::Connection as LanceConnection, + connection::PushdownOperation, + database::namespace::LanceNamespaceDatabase, database::{CreateTableMode, Database, ReadConsistency}, }; use pyo3::{ @@ -39,6 +45,29 @@ impl Connection { } } +fn parse_pushdown_operations( + operations: Option>, +) -> PyResult> { + let mut parsed = HashSet::new(); + for operation in operations.unwrap_or_default() { + match operation.as_str() { + "QueryTable" => { + parsed.insert(PushdownOperation::QueryTable); + } + "CreateTable" => { + parsed.insert(PushdownOperation::CreateTable); + } + _ => { + return Err(PyValueError::new_err(format!( + "Invalid pushdown operation: {}", + operation + ))); + } + } + } + Ok(parsed) +} + impl Connection { fn parse_create_mode_str(mode: &str) -> PyResult { match mode { @@ -538,6 +567,51 @@ pub fn connect( }) } +#[pyfunction] +#[pyo3(signature = ( + namespace_client, + read_consistency_interval=None, + storage_options=None, + session=None, + namespace_client_pushdown_operations=None, + namespace_client_impl=None, + namespace_client_properties=None, +))] +#[allow(clippy::too_many_arguments)] +pub fn connect_namespace_client( + py: Python<'_>, + namespace_client: Py, + read_consistency_interval: Option, + storage_options: Option>, + session: Option, + namespace_client_pushdown_operations: Option>, + namespace_client_impl: Option, + namespace_client_properties: Option>, +) -> PyResult { + let namespace_client = extract_namespace_arc(py, namespace_client)?; + let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64); + let pushdown_operations = parse_pushdown_operations(namespace_client_pushdown_operations)?; + let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string()); + let ns_properties = namespace_client_properties.unwrap_or_default(); + let storage_options = storage_options.unwrap_or_default(); + let session = session.map(|s| s.inner.clone()); + + let database = LanceNamespaceDatabase::from_namespace_client( + namespace_client, + ns_impl, + ns_properties, + storage_options, + read_consistency_interval, + session, + pushdown_operations, + ); + + Ok(Connection::new(LanceConnection::new( + Arc::new(database), + Arc::new(lancedb::embeddings::MemoryRegistry::new()), + ))) +} + #[derive(FromPyObject)] pub struct PyClientConfig { user_agent: String, diff --git a/python/src/lib.rs b/python/src/lib.rs index e6294cd14..7dd52bdc2 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: Copyright The LanceDB Authors use arrow::RecordBatchStream; -use connection::{Connection, connect}; +use connection::{Connection, connect, connect_namespace_client}; use env_logger::Env; use expr::{PyExpr, expr_col, expr_func, expr_lit}; use index::IndexConfig; @@ -58,6 +58,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(connect, m)?)?; + m.add_function(wrap_pyfunction!(connect_namespace_client, m)?)?; m.add_function(wrap_pyfunction!(permutation::async_permutation_builder, m)?)?; m.add_function(wrap_pyfunction!(util::validate_table_name, m)?)?; m.add_function(wrap_pyfunction!(query::fts_query_to_json, m)?)?; diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 6b0d19054..13a8e3967 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -52,6 +52,27 @@ pub struct LanceNamespaceDatabase { } impl LanceNamespaceDatabase { + pub fn from_namespace_client( + namespace: Arc, + ns_impl: String, + ns_properties: HashMap, + storage_options: HashMap, + read_consistency_interval: Option, + session: Option>, + pushdown_operations: HashSet, + ) -> Self { + Self { + namespace, + storage_options, + read_consistency_interval, + session, + uri: format!("namespace://{}", ns_impl), + pushdown_operations, + ns_impl, + ns_properties, + } + } + pub async fn connect( ns_impl: &str, ns_properties: HashMap,