diff --git a/Cargo.lock b/Cargo.lock index 978996a1..16ffc63f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5071,6 +5071,7 @@ dependencies = [ "futures", "lance-core", "lance-io", + "lance-namespace", "lancedb", "pin-project", "pyo3", diff --git a/python/Cargo.toml b/python/Cargo.toml index 189b4f18..c1460dac 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -18,6 +18,7 @@ arrow = { version = "56.2", features = ["pyarrow"] } async-trait = "0.1" lancedb = { path = "../rust/lancedb", default-features = false } lance-core.workspace = true +lance-namespace.workspace = true lance-io.workspace = true env_logger.workspace = true pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39"] } diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index d4269510..2cf04a1b 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -5,6 +5,13 @@ import pyarrow as pa from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS from .io import StorageOptionsProvider +from lance_namespace import ( + ListNamespacesResponse, + CreateNamespaceResponse, + DropNamespaceResponse, + DescribeNamespaceResponse, + ListTablesResponse, +) from .remote import ClientConfig class Session: @@ -26,18 +33,38 @@ class Connection(object): async def close(self): ... async def list_namespaces( self, - namespace: Optional[List[str]], - page_token: Optional[str], - limit: Optional[int], - ) -> List[str]: ... - async def create_namespace(self, namespace: List[str]) -> None: ... - async def drop_namespace(self, namespace: List[str]) -> None: ... + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListNamespacesResponse: ... + async def create_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> CreateNamespaceResponse: ... + async def drop_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + behavior: Optional[str] = None, + ) -> DropNamespaceResponse: ... + async def describe_namespace( + self, + namespace: List[str], + ) -> DescribeNamespaceResponse: ... + async def list_tables( + self, + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListTablesResponse: ... async def table_names( self, namespace: Optional[List[str]], start_after: Optional[str], limit: Optional[int], - ) -> list[str]: ... + ) -> list[str]: ... # Deprecated: Use list_tables instead async def create_table( self, name: str, diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 38a7a334..4b41f023 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -22,6 +22,13 @@ from lancedb.embeddings.registry import EmbeddingFunctionRegistry from lancedb.common import data_to_reader, sanitize_uri, validate_schema from lancedb.background_loop import LOOP +from lance_namespace import ( + ListNamespacesResponse, + CreateNamespaceResponse, + DropNamespaceResponse, + DescribeNamespaceResponse, + ListTablesResponse, +) from . import __version__ from ._lancedb import connect as lancedb_connect # type: ignore @@ -48,6 +55,12 @@ if TYPE_CHECKING: from .io import StorageOptionsProvider from ._lancedb import Session +from .namespace_utils import ( + _normalize_create_namespace_mode, + _normalize_drop_namespace_mode, + _normalize_drop_namespace_behavior, +) + class DBConnection(EnforceOverrides): """An active LanceDB connection interface.""" @@ -56,8 +69,8 @@ class DBConnection(EnforceOverrides): self, namespace: Optional[List[str]] = None, page_token: Optional[str] = None, - limit: int = 10, - ) -> Iterable[str]: + limit: Optional[int] = None, + ) -> ListNamespacesResponse: """List immediate child namespace names in the given namespace. Parameters @@ -66,43 +79,119 @@ class DBConnection(EnforceOverrides): The parent namespace to list namespaces in. Empty list represents root namespace. page_token: str, optional - The token to use for pagination. If not present, start from the beginning. - limit: int, default 10 - The size of the page to return. + Token for pagination. Use the token from a previous response + to get the next page of results. + limit: int, optional + The maximum number of results to return. Returns ------- - Iterable of str - List of immediate child namespace names + ListNamespacesResponse + Response containing namespace names and optional page_token for pagination. """ if namespace is None: namespace = [] - return [] + return ListNamespacesResponse(namespaces=[], page_token=None) - def create_namespace(self, namespace: List[str]) -> None: + def create_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> CreateNamespaceResponse: """Create a new namespace. Parameters ---------- namespace: List[str] The namespace identifier to create. + mode: str, optional + Creation mode - "create" (fail if exists), "exist_ok" (skip if exists), + or "overwrite" (replace if exists). Case insensitive. + properties: Dict[str, str], optional + Properties to set on the namespace. + + Returns + ------- + CreateNamespaceResponse + Response containing the properties of the created namespace. """ raise NotImplementedError( "Namespace operations are not supported for this connection type" ) - def drop_namespace(self, namespace: List[str]) -> None: + def drop_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + behavior: Optional[str] = None, + ) -> DropNamespaceResponse: """Drop a namespace. Parameters ---------- namespace: List[str] The namespace identifier to drop. + mode: str, optional + Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive. + behavior: str, optional + Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE"). + Case insensitive. + + Returns + ------- + DropNamespaceResponse + Response containing properties and transaction_id if applicable. """ raise NotImplementedError( "Namespace operations are not supported for this connection type" ) + def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse: + """Describe a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to describe. + + Returns + ------- + DescribeNamespaceResponse + Response containing the namespace properties. + """ + raise NotImplementedError( + "Namespace operations are not supported for this connection type" + ) + + def list_tables( + self, + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListTablesResponse: + """List all tables in this database with pagination support. + + Parameters + ---------- + namespace: List[str], optional + The namespace to list tables in. + None or empty list represents root namespace. + page_token: str, optional + Token for pagination. Use the token from a previous response + to get the next page of results. + limit: int, optional + The maximum number of results to return. + + Returns + ------- + ListTablesResponse + Response containing table names and optional page_token for pagination. + """ + raise NotImplementedError( + "list_tables is not supported for this connection type" + ) + @abstractmethod def table_names( self, @@ -557,8 +646,8 @@ class LanceDBConnection(DBConnection): self, namespace: Optional[List[str]] = None, page_token: Optional[str] = None, - limit: int = 10, - ) -> Iterable[str]: + limit: Optional[int] = None, + ) -> ListNamespacesResponse: """List immediate child namespace names in the given namespace. Parameters @@ -567,14 +656,15 @@ class LanceDBConnection(DBConnection): The parent namespace to list namespaces in. None or empty list represents root namespace. page_token: str, optional - The token to use for pagination. If not present, start from the beginning. - limit: int, default 10 - The size of the page to return. + Token for pagination. Use the token from a previous response + to get the next page of results. + limit: int, optional + The maximum number of results to return. Returns ------- - Iterable of str - List of immediate child namespace names + ListNamespacesResponse + Response containing namespace names and optional page_token for pagination. """ if namespace is None: namespace = [] @@ -585,26 +675,111 @@ class LanceDBConnection(DBConnection): ) @override - def create_namespace(self, namespace: List[str]) -> None: + def create_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> CreateNamespaceResponse: """Create a new namespace. Parameters ---------- namespace: List[str] The namespace identifier to create. + mode: str, optional + Creation mode - "create" (fail if exists), "exist_ok" (skip if exists), + or "overwrite" (replace if exists). Case insensitive. + properties: Dict[str, str], optional + Properties to set on the namespace. + + Returns + ------- + CreateNamespaceResponse + Response containing the properties of the created namespace. """ - LOOP.run(self._conn.create_namespace(namespace=namespace)) + return LOOP.run( + self._conn.create_namespace( + namespace=namespace, mode=mode, properties=properties + ) + ) @override - def drop_namespace(self, namespace: List[str]) -> None: + def drop_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + behavior: Optional[str] = None, + ) -> DropNamespaceResponse: """Drop a namespace. Parameters ---------- namespace: List[str] The namespace identifier to drop. + mode: str, optional + Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive. + behavior: str, optional + Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE"). + Case insensitive. + + Returns + ------- + DropNamespaceResponse + Response containing properties and transaction_id if applicable. """ - return LOOP.run(self._conn.drop_namespace(namespace=namespace)) + return LOOP.run( + self._conn.drop_namespace(namespace=namespace, mode=mode, behavior=behavior) + ) + + @override + def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse: + """Describe a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to describe. + + Returns + ------- + DescribeNamespaceResponse + Response containing the namespace properties. + """ + return LOOP.run(self._conn.describe_namespace(namespace=namespace)) + + @override + def list_tables( + self, + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListTablesResponse: + """List all tables in this database with pagination support. + + Parameters + ---------- + namespace: List[str], optional + The namespace to list tables in. + None or empty list represents root namespace. + page_token: str, optional + Token for pagination. Use the token from a previous response + to get the next page of results. + limit: int, optional + The maximum number of results to return. + + Returns + ------- + ListTablesResponse + Response containing table names and optional page_token for pagination. + """ + if namespace is None: + namespace = [] + return LOOP.run( + self._conn.list_tables( + namespace=namespace, page_token=page_token, limit=limit + ) + ) @override def table_names( @@ -616,6 +791,9 @@ class LanceDBConnection(DBConnection): ) -> Iterable[str]: """Get the names of all tables in the database. The names are sorted. + .. deprecated:: + Use :meth:`list_tables` instead, which provides proper pagination support. + Parameters ---------- namespace: List[str], optional @@ -630,6 +808,13 @@ class LanceDBConnection(DBConnection): Iterator of str. A list of table names. """ + import warnings + + warnings.warn( + "table_names() is deprecated, use list_tables() instead", + DeprecationWarning, + stacklevel=2, + ) if namespace is None: namespace = [] return LOOP.run( @@ -944,8 +1129,8 @@ class AsyncConnection(object): self, namespace: Optional[List[str]] = None, page_token: Optional[str] = None, - limit: int = 10, - ) -> Iterable[str]: + limit: Optional[int] = None, + ) -> ListNamespacesResponse: """List immediate child namespace names in the given namespace. Parameters @@ -955,39 +1140,128 @@ class AsyncConnection(object): None or empty list represents root namespace. page_token: str, optional The token to use for pagination. If not present, start from the beginning. - limit: int, default 10 - The size of the page to return. + limit: int, optional + The maximum number of results to return. Returns ------- - Iterable of str - List of immediate child namespace names (not full paths) + ListNamespacesResponse + Response containing namespace names and optional pagination token """ if namespace is None: namespace = [] - return await self._inner.list_namespaces( + result = await self._inner.list_namespaces( namespace=namespace, page_token=page_token, limit=limit ) + return ListNamespacesResponse(**result) - async def create_namespace(self, namespace: List[str]) -> None: + async def create_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> CreateNamespaceResponse: """Create a new namespace. Parameters ---------- namespace: List[str] The namespace identifier to create. - """ - await self._inner.create_namespace(namespace) + mode: str, optional + Creation mode - "create", "exist_ok", or "overwrite". Case insensitive. + properties: Dict[str, str], optional + Properties to associate with the namespace - async def drop_namespace(self, namespace: List[str]) -> None: + Returns + ------- + CreateNamespaceResponse + Response containing namespace properties + """ + result = await self._inner.create_namespace( + namespace, + mode=_normalize_create_namespace_mode(mode), + properties=properties, + ) + return CreateNamespaceResponse(**result) + + async def drop_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + behavior: Optional[str] = None, + ) -> DropNamespaceResponse: """Drop a namespace. Parameters ---------- namespace: List[str] The namespace identifier to drop. + mode: str, optional + Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive. + behavior: str, optional + Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE"). + Case insensitive. + + Returns + ------- + DropNamespaceResponse + Response containing properties and transaction_id if applicable. """ - await self._inner.drop_namespace(namespace) + result = await self._inner.drop_namespace( + namespace, + mode=_normalize_drop_namespace_mode(mode), + behavior=_normalize_drop_namespace_behavior(behavior), + ) + return DropNamespaceResponse(**result) + + async def describe_namespace( + self, namespace: List[str] + ) -> DescribeNamespaceResponse: + """Describe a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to describe. + + Returns + ------- + DescribeNamespaceResponse + Response containing the namespace properties. + """ + result = await self._inner.describe_namespace(namespace) + return DescribeNamespaceResponse(**result) + + async def list_tables( + self, + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListTablesResponse: + """List all tables in this database with pagination support. + + Parameters + ---------- + namespace: List[str], optional + The namespace to list tables in. + None or empty list represents root namespace. + page_token: str, optional + Token for pagination. Use the token from a previous response + to get the next page of results. + limit: int, optional + The maximum number of results to return. + + Returns + ------- + ListTablesResponse + Response containing table names and optional page_token for pagination. + """ + if namespace is None: + namespace = [] + result = await self._inner.list_tables( + namespace=namespace, page_token=page_token, limit=limit + ) + return ListTablesResponse(**result) async def table_names( self, @@ -998,6 +1272,9 @@ class AsyncConnection(object): ) -> Iterable[str]: """List all tables in this database, in sorted order + .. deprecated:: + Use :meth:`list_tables` instead, which provides proper pagination support. + Parameters ---------- namespace: List[str], optional @@ -1016,6 +1293,13 @@ class AsyncConnection(object): ------- Iterable of str """ + import warnings + + warnings.warn( + "table_names() is deprecated, use list_tables() instead", + DeprecationWarning, + stacklevel=2, + ) if namespace is None: namespace = [] return await self._inner.table_names( diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index ea8e5fcc..6165372c 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -23,7 +23,29 @@ from datetime import timedelta import pyarrow as pa from lancedb.db import DBConnection, LanceDBConnection +from lancedb.namespace_utils import ( + _normalize_create_namespace_mode, + _normalize_drop_namespace_mode, + _normalize_drop_namespace_behavior, +) from lancedb.io import StorageOptionsProvider +from lance_namespace import ( + LanceNamespace, + connect as namespace_connect, + CreateNamespaceResponse, + DescribeNamespaceResponse, + DropNamespaceResponse, + ListNamespacesResponse, + ListTablesResponse, + ListTablesRequest, + DescribeTableRequest, + DescribeNamespaceRequest, + DropTableRequest, + ListNamespacesRequest, + CreateNamespaceRequest, + DropNamespaceRequest, + CreateEmptyTableRequest, +) from lancedb.table import AsyncTable, LanceTable, Table from lancedb.util import validate_table_name from lancedb.common import DATA @@ -31,19 +53,9 @@ from lancedb.pydantic import LanceModel from lancedb.embeddings import EmbeddingFunctionConfig from ._lancedb import Session -from lance_namespace import LanceNamespace, connect as namespace_connect -from lance_namespace_urllib3_client.models import ( - ListTablesRequest, - DescribeTableRequest, - DropTableRequest, - ListNamespacesRequest, - CreateNamespaceRequest, - DropNamespaceRequest, - CreateEmptyTableRequest, - JsonArrowSchema, - JsonArrowField, - JsonArrowDataType, -) +from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema +from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField +from lance_namespace_urllib3_client.models.json_arrow_data_type import JsonArrowDataType def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType: @@ -241,6 +253,19 @@ class LanceNamespaceDBConnection(DBConnection): *, namespace: Optional[List[str]] = None, ) -> Iterable[str]: + """ + List table names in the database. + + .. deprecated:: + Use :meth:`list_tables` instead, which provides proper pagination support. + """ + import warnings + + warnings.warn( + "table_names() is deprecated, use list_tables() instead", + DeprecationWarning, + stacklevel=2, + ) if namespace is None: namespace = [] request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) @@ -433,8 +458,8 @@ class LanceNamespaceDBConnection(DBConnection): self, namespace: Optional[List[str]] = None, page_token: Optional[str] = None, - limit: int = 10, - ) -> Iterable[str]: + limit: Optional[int] = None, + ) -> ListNamespacesResponse: """ List child namespaces under the given namespace. @@ -444,14 +469,15 @@ class LanceNamespaceDBConnection(DBConnection): The parent namespace to list children from. If None, lists root-level namespaces. page_token : Optional[str] - Pagination token for listing results. - limit : int + Token for pagination. Use the token from a previous response + to get the next page of results. + limit : int, optional Maximum number of namespaces to return. Returns ------- - Iterable[str] - Names of child namespaces. + ListNamespacesResponse + Response containing namespace names and optional page_token for pagination. """ if namespace is None: namespace = [] @@ -459,10 +485,18 @@ class LanceNamespaceDBConnection(DBConnection): id=namespace, page_token=page_token, limit=limit ) response = self._ns.list_namespaces(request) - return response.namespaces if response.namespaces else [] + return ListNamespacesResponse( + namespaces=response.namespaces if response.namespaces else [], + page_token=response.page_token, + ) @override - def create_namespace(self, namespace: List[str]) -> None: + def create_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> CreateNamespaceResponse: """ Create a new namespace. @@ -470,12 +504,34 @@ class LanceNamespaceDBConnection(DBConnection): ---------- namespace : List[str] The namespace path to create. + mode : str, optional + Creation mode - "create" (fail if exists), "exist_ok" (skip if exists), + or "overwrite" (replace if exists). Case insensitive. + properties : Dict[str, str], optional + Properties to set on the namespace. + + Returns + ------- + CreateNamespaceResponse + Response containing the properties of the created namespace. """ - request = CreateNamespaceRequest(id=namespace) - self._ns.create_namespace(request) + request = CreateNamespaceRequest( + id=namespace, + mode=_normalize_create_namespace_mode(mode), + properties=properties, + ) + response = self._ns.create_namespace(request) + return CreateNamespaceResponse( + properties=response.properties if hasattr(response, "properties") else None + ) @override - def drop_namespace(self, namespace: List[str]) -> None: + def drop_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + behavior: Optional[str] = None, + ) -> DropNamespaceResponse: """ Drop a namespace. @@ -483,9 +539,87 @@ class LanceNamespaceDBConnection(DBConnection): ---------- namespace : List[str] The namespace path to drop. + mode : str, optional + Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive. + behavior : str, optional + Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE"). + Case insensitive. + + Returns + ------- + DropNamespaceResponse + Response containing properties and transaction_id if applicable. """ - request = DropNamespaceRequest(id=namespace) - self._ns.drop_namespace(request) + request = DropNamespaceRequest( + id=namespace, + mode=_normalize_drop_namespace_mode(mode), + behavior=_normalize_drop_namespace_behavior(behavior), + ) + response = self._ns.drop_namespace(request) + return DropNamespaceResponse( + properties=( + response.properties if hasattr(response, "properties") else None + ), + transaction_id=( + response.transaction_id if hasattr(response, "transaction_id") else None + ), + ) + + @override + def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse: + """ + Describe a namespace. + + Parameters + ---------- + namespace : List[str] + The namespace identifier to describe. + + Returns + ------- + DescribeNamespaceResponse + Response containing the namespace properties. + """ + request = DescribeNamespaceRequest(id=namespace) + response = self._ns.describe_namespace(request) + return DescribeNamespaceResponse( + properties=response.properties if hasattr(response, "properties") else None + ) + + @override + def list_tables( + self, + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListTablesResponse: + """ + List all tables in this database with pagination support. + + Parameters + ---------- + namespace : List[str], optional + The namespace to list tables in. + None or empty list represents root namespace. + page_token : str, optional + Token for pagination. Use the token from a previous response + to get the next page of results. + limit : int, optional + The maximum number of results to return. + + Returns + ------- + ListTablesResponse + Response containing table names and optional page_token for pagination. + """ + if namespace is None: + namespace = [] + request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) + response = self._ns.list_tables(request) + return ListTablesResponse( + tables=response.tables if response.tables else [], + page_token=response.page_token, + ) def _lance_table_from_uri( self, @@ -563,7 +697,19 @@ class AsyncLanceNamespaceDBConnection: *, namespace: Optional[List[str]] = None, ) -> Iterable[str]: - """List table names in the namespace.""" + """ + List table names in the namespace. + + .. deprecated:: + Use :meth:`list_tables` instead, which provides proper pagination support. + """ + import warnings + + warnings.warn( + "table_names() is deprecated, use list_tables() instead", + DeprecationWarning, + stacklevel=2, + ) if namespace is None: namespace = [] request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) @@ -771,8 +917,8 @@ class AsyncLanceNamespaceDBConnection: self, namespace: Optional[List[str]] = None, page_token: Optional[str] = None, - limit: int = 10, - ) -> Iterable[str]: + limit: Optional[int] = None, + ) -> ListNamespacesResponse: """ List child namespaces under the given namespace. @@ -782,14 +928,15 @@ class AsyncLanceNamespaceDBConnection: The parent namespace to list children from. If None, lists root-level namespaces. page_token : Optional[str] - Pagination token for listing results. - limit : int + Token for pagination. Use the token from a previous response + to get the next page of results. + limit : int, optional Maximum number of namespaces to return. Returns ------- - Iterable[str] - Names of child namespaces. + ListNamespacesResponse + Response containing namespace names and optional page_token for pagination. """ if namespace is None: namespace = [] @@ -797,9 +944,17 @@ class AsyncLanceNamespaceDBConnection: id=namespace, page_token=page_token, limit=limit ) response = self._ns.list_namespaces(request) - return response.namespaces if response.namespaces else [] + return ListNamespacesResponse( + namespaces=response.namespaces if response.namespaces else [], + page_token=response.page_token, + ) - async def create_namespace(self, namespace: List[str]) -> None: + async def create_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> CreateNamespaceResponse: """ Create a new namespace. @@ -807,11 +962,33 @@ class AsyncLanceNamespaceDBConnection: ---------- namespace : List[str] The namespace path to create. - """ - request = CreateNamespaceRequest(id=namespace) - self._ns.create_namespace(request) + mode : str, optional + Creation mode - "create" (fail if exists), "exist_ok" (skip if exists), + or "overwrite" (replace if exists). Case insensitive. + properties : Dict[str, str], optional + Properties to set on the namespace. - async def drop_namespace(self, namespace: List[str]) -> None: + Returns + ------- + CreateNamespaceResponse + Response containing the properties of the created namespace. + """ + request = CreateNamespaceRequest( + id=namespace, + mode=_normalize_create_namespace_mode(mode), + properties=properties, + ) + response = self._ns.create_namespace(request) + return CreateNamespaceResponse( + properties=response.properties if hasattr(response, "properties") else None + ) + + async def drop_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + behavior: Optional[str] = None, + ) -> DropNamespaceResponse: """ Drop a namespace. @@ -819,9 +996,87 @@ class AsyncLanceNamespaceDBConnection: ---------- namespace : List[str] The namespace path to drop. + mode : str, optional + Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive. + behavior : str, optional + Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE"). + Case insensitive. + + Returns + ------- + DropNamespaceResponse + Response containing properties and transaction_id if applicable. """ - request = DropNamespaceRequest(id=namespace) - self._ns.drop_namespace(request) + request = DropNamespaceRequest( + id=namespace, + mode=_normalize_drop_namespace_mode(mode), + behavior=_normalize_drop_namespace_behavior(behavior), + ) + response = self._ns.drop_namespace(request) + return DropNamespaceResponse( + properties=( + response.properties if hasattr(response, "properties") else None + ), + transaction_id=( + response.transaction_id if hasattr(response, "transaction_id") else None + ), + ) + + async def describe_namespace( + self, namespace: List[str] + ) -> DescribeNamespaceResponse: + """ + Describe a namespace. + + Parameters + ---------- + namespace : List[str] + The namespace identifier to describe. + + Returns + ------- + DescribeNamespaceResponse + Response containing the namespace properties. + """ + request = DescribeNamespaceRequest(id=namespace) + response = self._ns.describe_namespace(request) + return DescribeNamespaceResponse( + properties=response.properties if hasattr(response, "properties") else None + ) + + async def list_tables( + self, + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListTablesResponse: + """ + List all tables in this database with pagination support. + + Parameters + ---------- + namespace : List[str], optional + The namespace to list tables in. + None or empty list represents root namespace. + page_token : str, optional + Token for pagination. Use the token from a previous response + to get the next page of results. + limit : int, optional + The maximum number of results to return. + + Returns + ------- + ListTablesResponse + Response containing table names and optional page_token for pagination. + """ + if namespace is None: + namespace = [] + request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) + response = self._ns.list_tables(request) + return ListTablesResponse( + tables=response.tables if response.tables else [], + page_token=response.page_token, + ) def connect_namespace( diff --git a/python/python/lancedb/namespace_utils.py b/python/python/lancedb/namespace_utils.py new file mode 100644 index 00000000..eeeac514 --- /dev/null +++ b/python/python/lancedb/namespace_utils.py @@ -0,0 +1,27 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors + +"""Utility functions for namespace operations.""" + +from typing import Optional + + +def _normalize_create_namespace_mode(mode: Optional[str]) -> Optional[str]: + """Normalize create namespace mode to lowercase (API expects lowercase).""" + if mode is None: + return None + return mode.lower() + + +def _normalize_drop_namespace_mode(mode: Optional[str]) -> Optional[str]: + """Normalize drop namespace mode to uppercase (API expects uppercase).""" + if mode is None: + return None + return mode.upper() + + +def _normalize_drop_namespace_behavior(behavior: Optional[str]) -> Optional[str]: + """Normalize drop namespace behavior to uppercase (API expects uppercase).""" + if behavior is None: + return None + return behavior.upper() diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index 28c7c635..6c1fb0a4 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -23,6 +23,13 @@ import pyarrow as pa from ..common import DATA from ..db import DBConnection, LOOP from ..embeddings import EmbeddingFunctionConfig +from lance_namespace import ( + CreateNamespaceResponse, + DescribeNamespaceResponse, + DropNamespaceResponse, + ListNamespacesResponse, + ListTablesResponse, +) from ..pydantic import LanceModel from ..table import Table from ..util import validate_table_name @@ -106,8 +113,8 @@ class RemoteDBConnection(DBConnection): self, namespace: Optional[List[str]] = None, page_token: Optional[str] = None, - limit: int = 10, - ) -> Iterable[str]: + limit: Optional[int] = None, + ) -> ListNamespacesResponse: """List immediate child namespace names in the given namespace. Parameters @@ -116,14 +123,15 @@ class RemoteDBConnection(DBConnection): The parent namespace to list namespaces in. None or empty list represents root namespace. page_token: str, optional - The token to use for pagination. If not present, start from the beginning. - limit: int, default 10 - The size of the page to return. + Token for pagination. Use the token from a previous response + to get the next page of results. + limit: int, optional + The maximum number of results to return. Returns ------- - Iterable of str - List of immediate child namespace names + ListNamespacesResponse + Response containing namespace names and optional page_token for pagination. """ if namespace is None: namespace = [] @@ -134,26 +142,111 @@ class RemoteDBConnection(DBConnection): ) @override - def create_namespace(self, namespace: List[str]) -> None: + def create_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> CreateNamespaceResponse: """Create a new namespace. Parameters ---------- namespace: List[str] The namespace identifier to create. + mode: str, optional + Creation mode - "create" (fail if exists), "exist_ok" (skip if exists), + or "overwrite" (replace if exists). Case insensitive. + properties: Dict[str, str], optional + Properties to set on the namespace. + + Returns + ------- + CreateNamespaceResponse + Response containing the properties of the created namespace. """ - LOOP.run(self._conn.create_namespace(namespace=namespace)) + return LOOP.run( + self._conn.create_namespace( + namespace=namespace, mode=mode, properties=properties + ) + ) @override - def drop_namespace(self, namespace: List[str]) -> None: + def drop_namespace( + self, + namespace: List[str], + mode: Optional[str] = None, + behavior: Optional[str] = None, + ) -> DropNamespaceResponse: """Drop a namespace. Parameters ---------- namespace: List[str] The namespace identifier to drop. + mode: str, optional + Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive. + behavior: str, optional + Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE"). + Case insensitive. + + Returns + ------- + DropNamespaceResponse + Response containing properties and transaction_id if applicable. """ - return LOOP.run(self._conn.drop_namespace(namespace=namespace)) + return LOOP.run( + self._conn.drop_namespace(namespace=namespace, mode=mode, behavior=behavior) + ) + + @override + def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse: + """Describe a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to describe. + + Returns + ------- + DescribeNamespaceResponse + Response containing the namespace properties. + """ + return LOOP.run(self._conn.describe_namespace(namespace=namespace)) + + @override + def list_tables( + self, + namespace: Optional[List[str]] = None, + page_token: Optional[str] = None, + limit: Optional[int] = None, + ) -> ListTablesResponse: + """List all tables in this database with pagination support. + + Parameters + ---------- + namespace: List[str], optional + The namespace to list tables in. + None or empty list represents root namespace. + page_token: str, optional + Token for pagination. Use the token from a previous response + to get the next page of results. + limit: int, optional + The maximum number of results to return. + + Returns + ------- + ListTablesResponse + Response containing table names and optional page_token for pagination. + """ + if namespace is None: + namespace = [] + return LOOP.run( + self._conn.list_tables( + namespace=namespace, page_token=page_token, limit=limit + ) + ) @override def table_names( @@ -165,6 +258,9 @@ class RemoteDBConnection(DBConnection): ) -> Iterable[str]: """List the names of all tables in the database. + .. deprecated:: + Use :meth:`list_tables` instead, which provides proper pagination support. + Parameters ---------- namespace: List[str], default [] @@ -179,6 +275,13 @@ class RemoteDBConnection(DBConnection): ------- An iterator of table names. """ + import warnings + + warnings.warn( + "table_names() is deprecated, use list_tables() instead", + DeprecationWarning, + stacklevel=2, + ) if namespace is None: namespace = [] return LOOP.run( diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index 3aadba0b..e7c2beb9 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -892,7 +892,7 @@ def test_local_namespace_operations(tmp_path): db = lancedb.connect(tmp_path) # Test list_namespaces returns empty list for root namespace - namespaces = list(db.list_namespaces()) + namespaces = db.list_namespaces().namespaces assert namespaces == [] # Test list_namespaces with non-empty namespace raises NotImplementedError @@ -900,7 +900,7 @@ def test_local_namespace_operations(tmp_path): NotImplementedError, match="Namespace operations are not supported for listing database", ): - list(db.list_namespaces(namespace=["test"])) + db.list_namespaces(namespace=["test"]) def test_local_create_namespace_not_supported(tmp_path): diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index aaf2b326..70bc046b 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -279,13 +279,13 @@ class TestNamespaceConnection: db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) # Initially no namespaces - assert len(list(db.list_namespaces())) == 0 + assert len(db.list_namespaces().namespaces) == 0 # Create a namespace db.create_namespace(["test_namespace"]) # Verify namespace exists - namespaces = list(db.list_namespaces()) + namespaces = db.list_namespaces().namespaces assert "test_namespace" in namespaces assert len(namespaces) == 1 @@ -322,7 +322,7 @@ class TestNamespaceConnection: db.drop_namespace(["test_namespace"]) # Verify namespace no longer exists - namespaces = list(db.list_namespaces()) + namespaces = db.list_namespaces().namespaces assert len(namespaces) == 0 def test_namespace_with_tables_cannot_be_dropped(self): @@ -570,13 +570,13 @@ class TestAsyncNamespaceConnection: # Initially no namespaces namespaces = await db.list_namespaces() - assert len(list(namespaces)) == 0 + assert len(namespaces.namespaces) == 0 # Create a namespace await db.create_namespace(["test_namespace"]) # Verify namespace exists - namespaces = list(await db.list_namespaces()) + namespaces = (await db.list_namespaces()).namespaces assert "test_namespace" in namespaces assert len(namespaces) == 1 @@ -608,7 +608,7 @@ class TestAsyncNamespaceConnection: await db.drop_namespace(["test_namespace"]) # Verify namespace no longer exists - namespaces = list(await db.list_namespaces()) + namespaces = (await db.list_namespaces()).namespaces assert len(namespaces) == 0 async def test_drop_all_tables_async(self): diff --git a/python/src/connection.rs b/python/src/connection.rs index 981b9cb6..92b77f55 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -10,8 +10,9 @@ use lancedb::{ }; use pyo3::{ exceptions::{PyRuntimeError, PyValueError}, - pyclass, pyfunction, pymethods, Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, - Python, + pyclass, pyfunction, pymethods, + types::{PyDict, PyDictMethods}, + Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, Python, }; use pyo3_async_runtimes::tokio::future_into_py; @@ -292,40 +293,155 @@ impl Connection { limit: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); - future_into_py(self_.py(), async move { - use lancedb::database::ListNamespacesRequest; + let py = self_.py(); + future_into_py(py, async move { + use lance_namespace::models::ListNamespacesRequest; let request = ListNamespacesRequest { - namespace, + id: if namespace.is_empty() { + None + } else { + Some(namespace) + }, page_token, - limit, + limit: limit.map(|l| l as i32), }; - inner.list_namespaces(request).await.infer_error() + let response = inner.list_namespaces(request).await.infer_error()?; + Python::with_gil(|py| -> PyResult> { + let dict = PyDict::new(py); + dict.set_item("namespaces", response.namespaces)?; + dict.set_item("page_token", response.page_token)?; + Ok(dict.unbind()) + }) }) } - #[pyo3(signature = (namespace,))] + #[pyo3(signature = (namespace, mode=None, properties=None))] pub fn create_namespace( self_: PyRef<'_, Self>, namespace: Vec, + mode: Option, + properties: Option>, ) -> PyResult> { let inner = self_.get_inner()?.clone(); - future_into_py(self_.py(), async move { - use lancedb::database::CreateNamespaceRequest; - let request = CreateNamespaceRequest { namespace }; - inner.create_namespace(request).await.infer_error() + let py = self_.py(); + future_into_py(py, async move { + use lance_namespace::models::{create_namespace_request, CreateNamespaceRequest}; + let mode_enum = mode.and_then(|m| match m.to_lowercase().as_str() { + "create" => Some(create_namespace_request::Mode::Create), + "exist_ok" => Some(create_namespace_request::Mode::ExistOk), + "overwrite" => Some(create_namespace_request::Mode::Overwrite), + _ => None, + }); + let request = CreateNamespaceRequest { + id: if namespace.is_empty() { + None + } else { + Some(namespace) + }, + mode: mode_enum, + properties, + }; + let response = inner.create_namespace(request).await.infer_error()?; + Python::with_gil(|py| -> PyResult> { + let dict = PyDict::new(py); + dict.set_item("properties", response.properties)?; + Ok(dict.unbind()) + }) + }) + } + + #[pyo3(signature = (namespace, mode=None, behavior=None))] + pub fn drop_namespace( + self_: PyRef<'_, Self>, + namespace: Vec, + mode: Option, + behavior: Option, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + let py = self_.py(); + future_into_py(py, async move { + use lance_namespace::models::{drop_namespace_request, DropNamespaceRequest}; + let mode_enum = mode.and_then(|m| match m.to_uppercase().as_str() { + "SKIP" => Some(drop_namespace_request::Mode::Skip), + "FAIL" => Some(drop_namespace_request::Mode::Fail), + _ => None, + }); + let behavior_enum = behavior.and_then(|b| match b.to_uppercase().as_str() { + "RESTRICT" => Some(drop_namespace_request::Behavior::Restrict), + "CASCADE" => Some(drop_namespace_request::Behavior::Cascade), + _ => None, + }); + let request = DropNamespaceRequest { + id: if namespace.is_empty() { + None + } else { + Some(namespace) + }, + mode: mode_enum, + behavior: behavior_enum, + }; + let response = inner.drop_namespace(request).await.infer_error()?; + Python::with_gil(|py| -> PyResult> { + let dict = PyDict::new(py); + dict.set_item("properties", response.properties)?; + dict.set_item("transaction_id", response.transaction_id)?; + Ok(dict.unbind()) + }) }) } #[pyo3(signature = (namespace,))] - pub fn drop_namespace( + pub fn describe_namespace( self_: PyRef<'_, Self>, namespace: Vec, ) -> PyResult> { let inner = self_.get_inner()?.clone(); - future_into_py(self_.py(), async move { - use lancedb::database::DropNamespaceRequest; - let request = DropNamespaceRequest { namespace }; - inner.drop_namespace(request).await.infer_error() + let py = self_.py(); + future_into_py(py, async move { + use lance_namespace::models::DescribeNamespaceRequest; + let request = DescribeNamespaceRequest { + id: if namespace.is_empty() { + None + } else { + Some(namespace) + }, + }; + let response = inner.describe_namespace(request).await.infer_error()?; + Python::with_gil(|py| -> PyResult> { + let dict = PyDict::new(py); + dict.set_item("properties", response.properties)?; + Ok(dict.unbind()) + }) + }) + } + + #[pyo3(signature = (namespace=vec![], page_token=None, limit=None))] + pub fn list_tables( + self_: PyRef<'_, Self>, + namespace: Vec, + page_token: Option, + limit: Option, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + let py = self_.py(); + future_into_py(py, async move { + use lance_namespace::models::ListTablesRequest; + let request = ListTablesRequest { + id: if namespace.is_empty() { + None + } else { + Some(namespace) + }, + page_token, + limit: limit.map(|l| l as i32), + }; + let response = inner.list_tables(request).await.infer_error()?; + Python::with_gil(|py| -> PyResult> { + let dict = PyDict::new(py); + dict.set_item("tables", response.tables)?; + dict.set_item("page_token", response.page_token)?; + Ok(dict.unbind()) + }) }) } } diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 20320e9e..963bce6d 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -9,6 +9,11 @@ use std::sync::Arc; use arrow_array::RecordBatchReader; use arrow_schema::{Field, SchemaRef}; use lance::dataset::ReadParams; +use lance_namespace::models::{ + CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest, + DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest, + ListNamespacesResponse, ListTablesRequest, ListTablesResponse, +}; #[cfg(feature = "aws")] use object_store::aws::AwsCredential; @@ -17,9 +22,8 @@ use crate::database::listing::{ ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS, }; use crate::database::{ - CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode, - CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, - OpenTableRequest, ReadConsistency, TableNamesRequest, + CloneTableRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database, + DatabaseOptions, OpenTableRequest, ReadConsistency, TableNamesRequest, }; use crate::embeddings::{ EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings, @@ -74,6 +78,7 @@ impl TableNamesBuilder { } /// Execute the table names operation + #[allow(deprecated)] pub async fn execute(self) -> Result> { self.parent.clone().table_names(self.request).await } @@ -768,20 +773,42 @@ impl Connection { } /// List immediate child namespace names in the given namespace - pub async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result> { + pub async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> Result { self.internal.list_namespaces(request).await } /// Create a new namespace - pub async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> { + pub async fn create_namespace( + &self, + request: CreateNamespaceRequest, + ) -> Result { self.internal.create_namespace(request).await } /// Drop a namespace - pub async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> { + pub async fn drop_namespace( + &self, + request: DropNamespaceRequest, + ) -> Result { self.internal.drop_namespace(request).await } + /// Describe a namespace + pub async fn describe_namespace( + &self, + request: DescribeNamespaceRequest, + ) -> Result { + self.internal.describe_namespace(request).await + } + + /// List tables with pagination support + pub async fn list_tables(&self, request: ListTablesRequest) -> Result { + self.internal.list_tables(request).await + } + /// Get the in-memory embedding registry. /// It's important to note that the embedding registry is not persisted across connections. /// So if a table contains embeddings, you will need to make sure that you are using a connection that has the same embedding functions registered diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index 5009b5fd..5e594aaf 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -24,6 +24,11 @@ use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use futures::stream; use lance::dataset::ReadParams; use lance_datafusion::utils::StreamingWriteSource; +use lance_namespace::models::{ + CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest, + DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest, + ListNamespacesResponse, ListTablesRequest, ListTablesResponse, +}; use lance_namespace::LanceNamespace; use crate::arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt}; @@ -37,32 +42,7 @@ pub trait DatabaseOptions { fn serialize_into_map(&self, map: &mut HashMap); } -/// A request to list namespaces in the database -#[derive(Clone, Debug, Default)] -pub struct ListNamespacesRequest { - /// The parent namespace to list namespaces in. Empty list represents root namespace. - pub namespace: Vec, - /// If present, only return names that come lexicographically after the supplied value. - pub page_token: Option, - /// The maximum number of namespace names to return - pub limit: Option, -} - -/// A request to create a namespace -#[derive(Clone, Debug)] -pub struct CreateNamespaceRequest { - /// The namespace identifier to create - pub namespace: Vec, -} - -/// A request to drop a namespace -#[derive(Clone, Debug)] -pub struct DropNamespaceRequest { - /// The namespace identifier to drop - pub namespace: Vec, -} - -/// A request to list names of tables in the database +/// A request to list names of tables in the database (deprecated, use ListTablesRequest) #[derive(Clone, Debug, Default)] pub struct TableNamesRequest { /// The namespace to list tables in. Empty list represents root namespace. @@ -268,13 +248,30 @@ pub trait Database: /// Get the read consistency of the database async fn read_consistency(&self) -> Result; /// List immediate child namespace names in the given namespace - async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result>; + async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> Result; /// Create a new namespace - async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()>; + async fn create_namespace( + &self, + request: CreateNamespaceRequest, + ) -> Result; /// Drop a namespace - async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()>; + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result; + /// Describe a namespace (get its properties) + async fn describe_namespace( + &self, + request: DescribeNamespaceRequest, + ) -> Result; /// List the names of tables in the database + /// + /// # Deprecated + /// Use `list_tables` instead for pagination support + #[deprecated(note = "Use list_tables instead")] async fn table_names(&self, request: TableNamesRequest) -> Result>; + /// List tables in the database with pagination support + async fn list_tables(&self, request: ListTablesRequest) -> Result; /// Create a table in the database async fn create_table(&self, request: CreateTableRequest) -> Result>; /// Clone a table in the database. diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 471e9a44..05b11e2f 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -24,10 +24,15 @@ use crate::io::object_store::MirroringObjectStoreWrapper; use crate::table::NativeTable; use crate::utils::validate_table_name; +use lance_namespace::models::{ + CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest, + DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest, + ListNamespacesResponse, ListTablesRequest, ListTablesResponse, +}; + use super::{ - BaseTable, CloneTableRequest, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, - Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, - TableNamesRequest, + BaseTable, CloneTableRequest, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, + OpenTableRequest, TableNamesRequest, }; /// File extension to indicate a lance table @@ -663,14 +668,20 @@ impl ListingDatabase { #[async_trait::async_trait] impl Database for ListingDatabase { - async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result> { - if !request.namespace.is_empty() { + async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> Result { + if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { return Err(Error::NotSupported { message: "Namespace operations are not supported for listing database".into(), }); } - Ok(Vec::new()) + Ok(ListNamespacesResponse { + namespaces: Vec::new(), + page_token: None, + }) } fn uri(&self) -> &str { @@ -689,13 +700,28 @@ impl Database for ListingDatabase { } } - async fn create_namespace(&self, _request: CreateNamespaceRequest) -> Result<()> { + async fn create_namespace( + &self, + _request: CreateNamespaceRequest, + ) -> Result { Err(Error::NotSupported { message: "Namespace operations are not supported for listing database".into(), }) } - async fn drop_namespace(&self, _request: DropNamespaceRequest) -> Result<()> { + async fn drop_namespace( + &self, + _request: DropNamespaceRequest, + ) -> Result { + Err(Error::NotSupported { + message: "Namespace operations are not supported for listing database".into(), + }) + } + + async fn describe_namespace( + &self, + _request: DescribeNamespaceRequest, + ) -> Result { Err(Error::NotSupported { message: "Namespace operations are not supported for listing database".into(), }) @@ -736,6 +762,57 @@ impl Database for ListingDatabase { Ok(f) } + async fn list_tables(&self, request: ListTablesRequest) -> Result { + if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), + }); + } + let mut f = self + .object_store + .read_dir(self.base_path.clone()) + .await? + .iter() + .map(Path::new) + .filter(|path| { + let is_lance = path + .extension() + .and_then(|e| e.to_str()) + .map(|e| e == LANCE_EXTENSION); + is_lance.unwrap_or(false) + }) + .filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from))) + .collect::>(); + f.sort(); + + // Handle pagination with page_token + if let Some(ref page_token) = request.page_token { + let index = f + .iter() + .position(|name| name.as_str() > page_token.as_str()) + .unwrap_or(f.len()); + f.drain(0..index); + } + + // Determine if there's a next page + let next_page_token = if let Some(limit) = request.limit { + if f.len() > limit as usize { + let token = f[limit as usize].clone(); + f.truncate(limit as usize); + Some(token) + } else { + None + } + } else { + None + }; + + Ok(ListTablesResponse { + tables: f, + page_token: next_page_token, + }) + } + async fn create_table(&self, request: CreateTableRequest) -> Result> { // When namespace is not empty, location must be provided if !request.namespace.is_empty() && request.location.is_none() { @@ -951,6 +1028,7 @@ impl Database for ListingDatabase { self.drop_tables(vec![name.to_string()]).await } + #[allow(deprecated)] async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> { // Check if namespace parameter is provided if !namespace.is_empty() { @@ -1037,6 +1115,7 @@ mod tests { .unwrap(); // Verify both tables exist + #[allow(deprecated)] let table_names = db.table_names(TableNamesRequest::default()).await.unwrap(); assert!(table_names.contains(&"source_table".to_string())); assert!(table_names.contains(&"cloned_table".to_string())); @@ -1733,6 +1812,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1786,6 +1866,7 @@ mod tests { mode: CreateTableMode::Create, write_options, location: None, + namespace_client: None, }) .await .unwrap(); @@ -1859,6 +1940,7 @@ mod tests { mode: CreateTableMode::Create, write_options, location: None, + namespace_client: None, }) .await .unwrap(); diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 8bec116d..88a45f8f 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -10,8 +10,10 @@ use async_trait::async_trait; use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider}; use lance_namespace::{ models::{ - CreateEmptyTableRequest, CreateNamespaceRequest, DescribeTableRequest, - DropNamespaceRequest, DropTableRequest, ListNamespacesRequest, ListTablesRequest, + CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse, + DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest, + DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest, + ListNamespacesResponse, ListTablesRequest, ListTablesResponse, }, LanceNamespace, }; @@ -22,11 +24,8 @@ use crate::database::ReadConsistency; use crate::error::{Error, Result}; use super::{ - listing::ListingDatabase, BaseTable, CloneTableRequest, - CreateNamespaceRequest as DbCreateNamespaceRequest, CreateTableMode, - CreateTableRequest as DbCreateTableRequest, Database, - DropNamespaceRequest as DbDropNamespaceRequest, - ListNamespacesRequest as DbListNamespacesRequest, OpenTableRequest, TableNamesRequest, + listing::ListingDatabase, BaseTable, CloneTableRequest, CreateTableMode, + CreateTableRequest as DbCreateTableRequest, Database, OpenTableRequest, TableNamesRequest, }; /// A database implementation that uses lance-namespace for table management @@ -154,68 +153,29 @@ impl Database for LanceNamespaceDatabase { } } - async fn list_namespaces(&self, request: DbListNamespacesRequest) -> Result> { - let ns_request = ListNamespacesRequest { - id: if request.namespace.is_empty() { - None - } else { - Some(request.namespace) - }, - page_token: request.page_token, - limit: request.limit.map(|l| l as i32), - }; - - let response = self - .namespace - .list_namespaces(ns_request) - .await - .map_err(|e| Error::Runtime { - message: format!("Failed to list namespaces: {}", e), - })?; - - Ok(response.namespaces) + async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> Result { + Ok(self.namespace.list_namespaces(request).await?) } - async fn create_namespace(&self, request: DbCreateNamespaceRequest) -> Result<()> { - let ns_request = CreateNamespaceRequest { - id: if request.namespace.is_empty() { - None - } else { - Some(request.namespace) - }, - mode: None, - properties: None, - }; - - self.namespace - .create_namespace(ns_request) - .await - .map_err(|e| Error::Runtime { - message: format!("Failed to create namespace: {}", e), - })?; - - Ok(()) + async fn create_namespace( + &self, + request: CreateNamespaceRequest, + ) -> Result { + Ok(self.namespace.create_namespace(request).await?) } - async fn drop_namespace(&self, request: DbDropNamespaceRequest) -> Result<()> { - let ns_request = DropNamespaceRequest { - id: if request.namespace.is_empty() { - None - } else { - Some(request.namespace) - }, - mode: None, - behavior: None, - }; + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result { + Ok(self.namespace.drop_namespace(request).await?) + } - self.namespace - .drop_namespace(ns_request) - .await - .map_err(|e| Error::Runtime { - message: format!("Failed to drop namespace: {}", e), - })?; - - Ok(()) + async fn describe_namespace( + &self, + request: DescribeNamespaceRequest, + ) -> Result { + Ok(self.namespace.describe_namespace(request).await?) } async fn table_names(&self, request: TableNamesRequest) -> Result> { @@ -229,17 +189,15 @@ impl Database for LanceNamespaceDatabase { limit: request.limit.map(|l| l as i32), }; - let response = - self.namespace - .list_tables(ns_request) - .await - .map_err(|e| Error::Runtime { - message: format!("Failed to list tables: {}", e), - })?; + let response = self.namespace.list_tables(ns_request).await?; Ok(response.tables) } + async fn list_tables(&self, request: ListTablesRequest) -> Result { + Ok(self.namespace.list_tables(request).await?) + } + async fn create_table(&self, request: DbCreateTableRequest) -> Result> { // Extract user-provided storage options from request let user_storage_options = request @@ -451,6 +409,7 @@ impl Database for LanceNamespaceDatabase { Ok(()) } + #[allow(deprecated)] async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> { let tables = self .table_names(TableNamesRequest { @@ -477,7 +436,6 @@ impl Database for LanceNamespaceDatabase { mod tests { use super::*; use crate::connect_namespace; - use crate::database::CreateNamespaceRequest; use crate::query::ExecutableQuery; use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray}; use arrow_schema::{DataType, Field, Schema}; @@ -590,7 +548,9 @@ mod tests { // Create a child namespace first conn.create_namespace(CreateNamespaceRequest { - namespace: vec!["test_ns".into()], + id: Some(vec!["test_ns".into()]), + mode: None, + properties: None, }) .await .expect("Failed to create namespace"); @@ -649,7 +609,9 @@ mod tests { // Create a child namespace first conn.create_namespace(CreateNamespaceRequest { - namespace: vec!["test_ns".into()], + id: Some(vec!["test_ns".into()]), + mode: None, + properties: None, }) .await .expect("Failed to create namespace"); @@ -711,7 +673,9 @@ mod tests { // Create a child namespace first conn.create_namespace(CreateNamespaceRequest { - namespace: vec!["test_ns".into()], + id: Some(vec!["test_ns".into()]), + mode: None, + properties: None, }) .await .expect("Failed to create namespace"); @@ -793,7 +757,9 @@ mod tests { // Create a child namespace first conn.create_namespace(CreateNamespaceRequest { - namespace: vec!["test_ns".into()], + id: Some(vec!["test_ns".into()]), + mode: None, + properties: None, }) .await .expect("Failed to create namespace"); @@ -847,7 +813,9 @@ mod tests { // Create a child namespace first conn.create_namespace(CreateNamespaceRequest { - namespace: vec!["test_ns".into()], + id: Some(vec!["test_ns".into()]), + mode: None, + properties: None, }) .await .expect("Failed to create namespace"); @@ -926,7 +894,9 @@ mod tests { // Create a child namespace first conn.create_namespace(CreateNamespaceRequest { - namespace: vec!["test_ns".into()], + id: Some(vec!["test_ns".into()]), + mode: None, + properties: None, }) .await .expect("Failed to create namespace"); @@ -958,7 +928,9 @@ mod tests { // Create a child namespace first conn.create_namespace(CreateNamespaceRequest { - namespace: vec!["test_ns".into()], + id: Some(vec!["test_ns".into()]), + mode: None, + properties: None, }) .await .expect("Failed to create namespace"); diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index 3b1fbb37..1cdfe1d7 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -10,13 +10,17 @@ use http::StatusCode; use lance_io::object_store::StorageOptions; use moka::future::Cache; use reqwest::header::CONTENT_TYPE; -use serde::Deserialize; use tokio::task::spawn_blocking; +use lance_namespace::models::{ + CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest, + DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest, + ListNamespacesResponse, ListTablesRequest, ListTablesResponse, +}; + use crate::database::{ - CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode, - CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, - OpenTableRequest, ReadConsistency, TableNamesRequest, + CloneTableRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database, + DatabaseOptions, OpenTableRequest, ReadConsistency, TableNamesRequest, }; use crate::error::Result; use crate::table::BaseTable; @@ -180,11 +184,6 @@ impl RemoteDatabaseOptionsBuilder { } } -#[derive(Deserialize)] -struct ListTablesResponse { - tables: Vec, -} - #[derive(Debug)] pub struct RemoteDatabase { client: RestfulLanceDbClient, @@ -337,7 +336,6 @@ impl Database for RemoteDatabase { self.client .get(&format!("/v1/namespace/{}/table/list", namespace_id)) } else { - // TODO: use new API for all listing operations once stable self.client.get("/v1/table/") }; @@ -371,6 +369,44 @@ impl Database for RemoteDatabase { Ok(tables) } + async fn list_tables(&self, request: ListTablesRequest) -> Result { + let namespace_parts = request.id.as_deref().unwrap_or(&[]); + let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter); + let mut req = self + .client + .get(&format!("/v1/namespace/{}/table/list", namespace_id)); + + if let Some(limit) = request.limit { + req = req.query(&[("limit", limit)]); + } + if let Some(ref page_token) = request.page_token { + req = req.query(&[("page_token", page_token)]); + } + + let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let version = parse_server_version(&request_id, &rsp)?; + let response: ListTablesResponse = rsp.json().await.err_to_http(request_id)?; + + // Cache the tables for future use + let namespace_vec = namespace_parts.to_vec(); + for table in &response.tables { + let table_identifier = + build_table_identifier(table, &namespace_vec, &self.client.id_delimiter); + let cache_key = build_cache_key(table, &namespace_vec); + let remote_table = Arc::new(RemoteTable::new( + self.client.clone(), + table.clone(), + namespace_vec.clone(), + table_identifier.clone(), + version.clone(), + )); + self.table_cache.insert(cache_key, remote_table).await; + } + + Ok(response) + } + async fn create_table(&self, request: CreateTableRequest) -> Result> { let data = match request.data { CreateTableData::Data(data) => data, @@ -591,53 +627,101 @@ impl Database for RemoteDatabase { }) } - async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result> { - let namespace_id = - build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); + async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> Result { + let namespace_parts = request.id.as_deref().unwrap_or(&[]); + let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter); let mut req = self .client .get(&format!("/v1/namespace/{}/list", namespace_id)); if let Some(limit) = request.limit { req = req.query(&[("limit", limit)]); } - if let Some(page_token) = request.page_token { + if let Some(ref page_token) = request.page_token { req = req.query(&[("page_token", page_token)]); } let (request_id, resp) = self.client.send(req).await?; let resp = self.client.check_response(&request_id, resp).await?; - #[derive(Deserialize)] - struct ListNamespacesResponse { - namespaces: Vec, - } - - let parsed: ListNamespacesResponse = resp.json().await.map_err(|e| Error::Runtime { - message: format!("Failed to parse namespace response: {}", e), - })?; - Ok(parsed.namespaces) + resp.json().await.err_to_http(request_id) } - async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> { - let namespace_id = - build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); - let req = self + async fn create_namespace( + &self, + request: CreateNamespaceRequest, + ) -> Result { + let namespace_parts = request.id.as_deref().unwrap_or(&[]); + let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter); + let mut req = self .client .post(&format!("/v1/namespace/{}/create", namespace_id)); + + // Build request body with mode and properties if present + #[derive(serde::Serialize)] + struct CreateNamespaceRequestBody { + #[serde(skip_serializing_if = "Option::is_none")] + mode: Option, + #[serde(skip_serializing_if = "Option::is_none")] + properties: Option>, + } + + let body = CreateNamespaceRequestBody { + mode: request.mode.as_ref().map(|m| format!("{:?}", m)), + properties: request.properties, + }; + + req = req.json(&body); let (request_id, resp) = self.client.send(req).await?; - self.client.check_response(&request_id, resp).await?; - Ok(()) + let resp = self.client.check_response(&request_id, resp).await?; + + resp.json().await.err_to_http(request_id) } - async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> { - let namespace_id = - build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); - let req = self + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result { + let namespace_parts = request.id.as_deref().unwrap_or(&[]); + let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter); + let mut req = self .client .post(&format!("/v1/namespace/{}/drop", namespace_id)); + + // Build request body with mode and behavior if present + #[derive(serde::Serialize)] + struct DropNamespaceRequestBody { + #[serde(skip_serializing_if = "Option::is_none")] + mode: Option, + #[serde(skip_serializing_if = "Option::is_none")] + behavior: Option, + } + + let body = DropNamespaceRequestBody { + mode: request.mode.as_ref().map(|m| format!("{:?}", m)), + behavior: request.behavior.as_ref().map(|b| format!("{:?}", b)), + }; + + req = req.json(&body); let (request_id, resp) = self.client.send(req).await?; - self.client.check_response(&request_id, resp).await?; - Ok(()) + let resp = self.client.check_response(&request_id, resp).await?; + + resp.json().await.err_to_http(request_id) + } + + async fn describe_namespace( + &self, + request: DescribeNamespaceRequest, + ) -> Result { + let namespace_parts = request.id.as_deref().unwrap_or(&[]); + let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter); + let req = self + .client + .get(&format!("/v1/namespace/{}/describe", namespace_id)); + + let (request_id, resp) = self.client.send(req).await?; + let resp = self.client.check_response(&request_id, resp).await?; + + resp.json().await.err_to_http(request_id) } fn as_any(&self) -> &dyn std::any::Any {