diff --git a/python/pyproject.toml b/python/pyproject.toml index 2d311956..21b435b3 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -59,7 +59,7 @@ tests = [ "polars>=0.19, <=1.3.0", "tantivy", "pyarrow-stubs", - "pylance>=1.0.0b2", + "pylance>=1.0.0b4", "requests", "datafusion", ] diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index 98a28724..ce1c1f4b 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -20,7 +20,12 @@ from .remote.db import RemoteDBConnection from .schema import vector from .table import AsyncTable, Table from ._lancedb import Session -from .namespace import connect_namespace, LanceNamespaceDBConnection +from .namespace import ( + connect_namespace, + connect_namespace_async, + LanceNamespaceDBConnection, + AsyncLanceNamespaceDBConnection, +) def connect( @@ -36,7 +41,7 @@ def connect( session: Optional[Session] = None, **kwargs: Any, ) -> DBConnection: - """Connect to a LanceDB database. YAY! + """Connect to a LanceDB database. Parameters ---------- @@ -224,7 +229,9 @@ __all__ = [ "connect", "connect_async", "connect_namespace", + "connect_namespace_async", "AsyncConnection", + "AsyncLanceNamespaceDBConnection", "AsyncTable", "URI", "sanitize_uri", diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 0825a3cc..13171d71 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -10,6 +10,7 @@ through a namespace abstraction. 
from __future__ import annotations +import asyncio import sys from typing import Dict, Iterable, List, Optional, Union @@ -23,7 +24,7 @@ import pyarrow as pa from lancedb.db import DBConnection, LanceDBConnection from lancedb.io import StorageOptionsProvider -from lancedb.table import LanceTable, Table +from lancedb.table import AsyncTable, LanceTable, Table from lancedb.util import validate_table_name from lancedb.common import DATA from lancedb.pydantic import LanceModel @@ -497,6 +498,294 @@ class LanceNamespaceDBConnection(DBConnection): ) +class AsyncLanceNamespaceDBConnection: + """ + An async LanceDB connection that uses a namespace for table management. + + This connection delegates table URI resolution to a lance_namespace instance, + while providing async methods for all operations. + """ + + def __init__( + self, + namespace: LanceNamespace, + *, + read_consistency_interval: Optional[timedelta] = None, + storage_options: Optional[Dict[str, str]] = None, + session: Optional[Session] = None, + ): + """ + Initialize an async namespace-based LanceDB connection. + + Parameters + ---------- + namespace : LanceNamespace + The namespace instance to use for table management + read_consistency_interval : Optional[timedelta] + The interval at which to check for updates to the table from other + processes. If None, then consistency is not checked. 
+ storage_options : Optional[Dict[str, str]] + Additional options for the storage backend + session : Optional[Session] + A session to use for this connection + """ + self._ns = namespace + self.read_consistency_interval = read_consistency_interval + self.storage_options = storage_options or {} + self.session = session + + async def table_names( + self, + page_token: Optional[str] = None, + limit: int = 10, + *, + namespace: List[str] = [], + ) -> Iterable[str]: + """List table names in the namespace.""" + request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) + response = self._ns.list_tables(request) + return response.tables if response.tables else [] + + async def create_table( + self, + name: str, + data: Optional[DATA] = None, + schema: Optional[Union[pa.Schema, LanceModel]] = None, + mode: str = "create", + exist_ok: bool = False, + on_bad_vectors: str = "error", + fill_value: float = 0.0, + embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, + *, + namespace: List[str] = [], + storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, + data_storage_version: Optional[str] = None, + enable_v2_manifest_paths: Optional[bool] = None, + ) -> AsyncTable: + """Create a new table in the namespace.""" + if mode.lower() not in ["create", "overwrite"]: + raise ValueError("mode must be either 'create' or 'overwrite'") + validate_table_name(name) + + # Get location from namespace + table_id = namespace + [name] + + # Step 1: Get the table location and storage options from namespace + location = None + namespace_storage_options = None + if mode.lower() == "overwrite": + # Try to describe the table first to see if it exists + try: + describe_request = DescribeTableRequest(id=table_id) + describe_response = self._ns.describe_table(describe_request) + location = describe_response.location + namespace_storage_options = describe_response.storage_options + except Exception: + # 
Table doesn't exist, will create a new one below + pass + + if location is None: + # Table doesn't exist or mode is "create", reserve a new location + create_empty_request = CreateEmptyTableRequest( + id=table_id, + location=None, + properties=self.storage_options if self.storage_options else None, + ) + create_empty_response = self._ns.create_empty_table(create_empty_request) + + if not create_empty_response.location: + raise ValueError( + "Table location is missing from create_empty_table response" + ) + + location = create_empty_response.location + namespace_storage_options = create_empty_response.storage_options + + # Merge storage options: self.storage_options < user options < namespace options + merged_storage_options = dict(self.storage_options) + if storage_options: + merged_storage_options.update(storage_options) + if namespace_storage_options: + merged_storage_options.update(namespace_storage_options) + + # Step 2: Create table using LanceTable.create with the location + # Run the sync operation in a thread + def _create_table(): + temp_conn = LanceDBConnection( + location, + read_consistency_interval=self.read_consistency_interval, + storage_options=merged_storage_options, + session=self.session, + ) + + # Create a storage options provider if not provided by user + if ( + storage_options_provider is None + and namespace_storage_options is not None + ): + provider = LanceNamespaceStorageOptionsProvider( + namespace=self._ns, + table_id=table_id, + ) + else: + provider = storage_options_provider + + return LanceTable.create( + temp_conn, + name, + data, + schema, + mode=mode, + exist_ok=exist_ok, + on_bad_vectors=on_bad_vectors, + fill_value=fill_value, + embedding_functions=embedding_functions, + namespace=namespace, + storage_options=merged_storage_options, + storage_options_provider=provider, + location=location, + ) + + lance_table = await asyncio.to_thread(_create_table) + # Get the underlying async table from LanceTable + return lance_table._table + 
+ async def open_table( + self, + name: str, + *, + namespace: List[str] = [], + storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, + index_cache_size: Optional[int] = None, + ) -> AsyncTable: + """Open an existing table from the namespace.""" + table_id = namespace + [name] + request = DescribeTableRequest(id=table_id) + response = self._ns.describe_table(request) + + # Merge storage options: self.storage_options < user options < namespace options + merged_storage_options = dict(self.storage_options) + if storage_options: + merged_storage_options.update(storage_options) + if response.storage_options: + merged_storage_options.update(response.storage_options) + + # Create a storage options provider if not provided by user + if storage_options_provider is None and response.storage_options is not None: + storage_options_provider = LanceNamespaceStorageOptionsProvider( + namespace=self._ns, + table_id=table_id, + ) + + # Open table in a thread + def _open_table(): + temp_conn = LanceDBConnection( + response.location, + read_consistency_interval=self.read_consistency_interval, + storage_options=merged_storage_options, + session=self.session, + ) + + return LanceTable.open( + temp_conn, + name, + namespace=namespace, + storage_options=merged_storage_options, + storage_options_provider=storage_options_provider, + index_cache_size=index_cache_size, + location=response.location, + ) + + lance_table = await asyncio.to_thread(_open_table) + return lance_table._table + + async def drop_table(self, name: str, namespace: List[str] = []): + """Drop a table from the namespace.""" + table_id = namespace + [name] + request = DropTableRequest(id=table_id) + self._ns.drop_table(request) + + async def rename_table( + self, + cur_name: str, + new_name: str, + cur_namespace: List[str] = [], + new_namespace: List[str] = [], + ): + """Rename is not supported for namespace connections.""" + raise NotImplementedError( + 
"rename_table is not supported for namespace connections" + ) + + async def drop_database(self): + """Deprecated method.""" + raise NotImplementedError( + "drop_database is deprecated, use drop_all_tables instead" + ) + + async def drop_all_tables(self, namespace: List[str] = []): + """Drop all tables in the namespace.""" + table_names = await self.table_names(namespace=namespace) + for table_name in table_names: + await self.drop_table(table_name, namespace=namespace) + + async def list_namespaces( + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, + ) -> Iterable[str]: + """ + List child namespaces under the given namespace. + + Parameters + ---------- + namespace : Optional[List[str]] + The parent namespace to list children from. + If None, lists root-level namespaces. + page_token : Optional[str] + Pagination token for listing results. + limit : int + Maximum number of namespaces to return. + + Returns + ------- + Iterable[str] + Names of child namespaces. + """ + request = ListNamespacesRequest( + id=namespace, page_token=page_token, limit=limit + ) + response = self._ns.list_namespaces(request) + return response.namespaces if response.namespaces else [] + + async def create_namespace(self, namespace: List[str]) -> None: + """ + Create a new namespace. + + Parameters + ---------- + namespace : List[str] + The namespace path to create. + """ + request = CreateNamespaceRequest(id=namespace) + self._ns.create_namespace(request) + + async def drop_namespace(self, namespace: List[str]) -> None: + """ + Drop a namespace. + + Parameters + ---------- + namespace : List[str] + The namespace path to drop. 
+ """ + request = DropNamespaceRequest(id=namespace) + self._ns.drop_namespace(request) + + def connect_namespace( impl: str, properties: Dict[str, str], @@ -541,3 +830,62 @@ def connect_namespace( storage_options=storage_options, session=session, ) + + +def connect_namespace_async( + impl: str, + properties: Dict[str, str], + *, + read_consistency_interval: Optional[timedelta] = None, + storage_options: Optional[Dict[str, str]] = None, + session: Optional[Session] = None, +) -> AsyncLanceNamespaceDBConnection: + """ + Connect to a LanceDB database through a namespace (returns async connection). + + This function is synchronous but returns an AsyncLanceNamespaceDBConnection + that provides async methods for all database operations. + + Parameters + ---------- + impl : str + The namespace implementation to use. For example: + - "dir" for DirectoryNamespace + - "rest" for REST-based namespace + - Full module path for custom implementations + properties : Dict[str, str] + Configuration properties for the namespace implementation. + Different namespace implementations have different config properties. + For example, use DirectoryNamespace with {"root": "/path/to/directory"} + read_consistency_interval : Optional[timedelta] + The interval at which to check for updates to the table from other + processes. If None, then consistency is not checked. + storage_options : Optional[Dict[str, str]] + Additional options for the storage backend + session : Optional[Session] + A session to use for this connection + + Returns + ------- + AsyncLanceNamespaceDBConnection + An async namespace-based connection to LanceDB + + Examples + -------- + >>> import lancedb + >>> # This function is sync, but returns an async connection + >>> db = lancedb.connect_namespace_async("dir", {"root": "/path/to/db"}) + >>> # Use async methods on the connection + >>> async def use_db(): + ... tables = await db.table_names() + ... 
table = await db.create_table("my_table", schema=schema) + """ + namespace = namespace_connect(impl, properties) + + # Return the async namespace-based connection + return AsyncLanceNamespaceDBConnection( + namespace, + read_consistency_interval=read_consistency_interval, + storage_options=storage_options, + session=session, + ) diff --git a/python/python/lancedb/query.py b/python/python/lancedb/query.py index 0367e9dc..1d32051a 100644 --- a/python/python/lancedb/query.py +++ b/python/python/lancedb/query.py @@ -14,6 +14,7 @@ from typing import ( Literal, Optional, Tuple, + Type, TypeVar, Union, Any, diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 5bdade6d..e8b37ea0 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -1717,6 +1717,7 @@ class LanceTable(Table): ): self._conn = connection self._namespace = namespace + self._location = location # Store location for use in _dataset_path if _async is not None: self._table = _async else: @@ -1794,6 +1795,10 @@ class LanceTable(Table): @cached_property def _dataset_path(self) -> str: # Cacheable since it's deterministic + # If table was opened with explicit location (e.g., from namespace), + # use that location directly instead of constructing from base URI + if self._location is not None: + return self._location return _table_path(self._conn.uri, self.name) def to_lance(self, **kwargs) -> lance.LanceDataset: @@ -2681,6 +2686,7 @@ class LanceTable(Table): self = cls.__new__(cls) self._conn = db self._namespace = namespace + self._location = location if data_storage_version is not None: warnings.warn( diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index 0fee1ab0..aaf2b326 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -423,3 +423,218 @@ class TestNamespaceConnection: db.drop_table("same_name_table", namespace=["namespace_b"]) db.drop_namespace(["namespace_a"]) 
db.drop_namespace(["namespace_b"]) + + +@pytest.mark.asyncio +class TestAsyncNamespaceConnection: + """Test async namespace-based LanceDB connection using DirectoryNamespace.""" + + def setup_method(self): + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + + def teardown_method(self): + """Clean up test fixtures.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + async def test_connect_namespace_async(self): + """Test connecting to LanceDB through DirectoryNamespace asynchronously.""" + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + + # Should be an AsyncLanceNamespaceDBConnection + assert isinstance(db, lancedb.AsyncLanceNamespaceDBConnection) + + # Initially no tables in root + table_names = await db.table_names() + assert len(list(table_names)) == 0 + + async def test_create_table_async(self): + """Test creating a table asynchronously through namespace.""" + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + + # Create a child namespace first + await db.create_namespace(["test_ns"]) + + # Define schema for empty table + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + pa.field("text", pa.string()), + ] + ) + + # Create empty table in child namespace + table = await db.create_table( + "test_table", schema=schema, namespace=["test_ns"] + ) + assert table is not None + assert isinstance(table, lancedb.AsyncTable) + + # Table should appear in child namespace + table_names = await db.table_names(namespace=["test_ns"]) + assert "test_table" in list(table_names) + + async def test_open_table_async(self): + """Test opening an existing table asynchronously through namespace.""" + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + + # Create a child namespace first + await db.create_namespace(["test_ns"]) + + # Create a table with schema in child namespace + schema = pa.schema( + [ + pa.field("id", pa.int64()), + 
pa.field("vector", pa.list_(pa.float32(), 2)), + ] + ) + await db.create_table("test_table", schema=schema, namespace=["test_ns"]) + + # Open the table + table = await db.open_table("test_table", namespace=["test_ns"]) + assert table is not None + assert isinstance(table, lancedb.AsyncTable) + + # Test write operation - add data to the table + test_data = [ + {"id": 1, "vector": [1.0, 2.0]}, + {"id": 2, "vector": [3.0, 4.0]}, + {"id": 3, "vector": [5.0, 6.0]}, + ] + await table.add(test_data) + + # Test read operation - query the table + result = await table.to_arrow() + assert len(result) == 3 + assert result.schema.field("id").type == pa.int64() + assert result.schema.field("vector").type == pa.list_(pa.float32(), 2) + + # Verify data content + result_df = result.to_pandas() + assert result_df["id"].tolist() == [1, 2, 3] + assert [v.tolist() for v in result_df["vector"]] == [ + [1.0, 2.0], + [3.0, 4.0], + [5.0, 6.0], + ] + + # Test update operation + await table.update({"id": 20}, where="id = 2") + result = await table.to_arrow() + result_df = result.to_pandas().sort_values("id").reset_index(drop=True) + assert result_df["id"].tolist() == [1, 3, 20] + + # Test delete operation + await table.delete("id = 1") + result = await table.to_arrow() + assert len(result) == 2 + result_df = result.to_pandas().sort_values("id").reset_index(drop=True) + assert result_df["id"].tolist() == [3, 20] + + async def test_drop_table_async(self): + """Test dropping a table asynchronously through namespace.""" + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + + # Create a child namespace first + await db.create_namespace(["test_ns"]) + + # Create tables in child namespace + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + ] + ) + await db.create_table("table1", schema=schema, namespace=["test_ns"]) + await db.create_table("table2", schema=schema, namespace=["test_ns"]) + + # Verify both tables exist in child 
namespace + table_names = list(await db.table_names(namespace=["test_ns"])) + assert "table1" in table_names + assert "table2" in table_names + assert len(table_names) == 2 + + # Drop one table + await db.drop_table("table1", namespace=["test_ns"]) + + # Verify only table2 remains + table_names = list(await db.table_names(namespace=["test_ns"])) + assert "table1" not in table_names + assert "table2" in table_names + assert len(table_names) == 1 + + async def test_namespace_operations_async(self): + """Test namespace management operations asynchronously.""" + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + + # Initially no namespaces + namespaces = await db.list_namespaces() + assert len(list(namespaces)) == 0 + + # Create a namespace + await db.create_namespace(["test_namespace"]) + + # Verify namespace exists + namespaces = list(await db.list_namespaces()) + assert "test_namespace" in namespaces + assert len(namespaces) == 1 + + # Create table in namespace + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + ] + ) + table = await db.create_table( + "test_table", schema=schema, namespace=["test_namespace"] + ) + assert table is not None + + # Verify table exists in namespace + tables_in_namespace = list(await db.table_names(namespace=["test_namespace"])) + assert "test_table" in tables_in_namespace + assert len(tables_in_namespace) == 1 + + # Drop table from namespace + await db.drop_table("test_table", namespace=["test_namespace"]) + + # Verify table no longer exists in namespace + tables_in_namespace = list(await db.table_names(namespace=["test_namespace"])) + assert len(tables_in_namespace) == 0 + + # Drop namespace + await db.drop_namespace(["test_namespace"]) + + # Verify namespace no longer exists + namespaces = list(await db.list_namespaces()) + assert len(namespaces) == 0 + + async def test_drop_all_tables_async(self): + """Test dropping all tables asynchronously through 
namespace.""" + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + + # Create a child namespace first + await db.create_namespace(["test_ns"]) + + # Create multiple tables in child namespace + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + ] + ) + for i in range(3): + await db.create_table(f"table{i}", schema=schema, namespace=["test_ns"]) + + # Verify tables exist in child namespace + table_names = await db.table_names(namespace=["test_ns"]) + assert len(list(table_names)) == 3 + + # Drop all tables in child namespace + await db.drop_all_tables(namespace=["test_ns"]) + + # Verify all tables are gone from child namespace + table_names = await db.table_names(namespace=["test_ns"]) + assert len(list(table_names)) == 0