refactor!: use namespace models directly for namespace operations (#2806)

1. Use generated models in lance-namespace for request response models
to avoid multiple layers of conversions
2. Make sure the API is consistent with the namespace spec
3. Deprecate the table_names API in favor of the list_tables API in
namespace that allows full pagination support without the need to have
sorted table names
4. Add describe_namespace API which was a miss in the original
implementation
This commit is contained in:
Jack Ye
2025-12-02 22:41:04 -08:00
committed by GitHub
parent 9d638cb3c7
commit d1efc6ad8a
15 changed files with 1250 additions and 274 deletions

1
Cargo.lock generated
View File

@@ -5071,6 +5071,7 @@ dependencies = [
"futures", "futures",
"lance-core", "lance-core",
"lance-io", "lance-io",
"lance-namespace",
"lancedb", "lancedb",
"pin-project", "pin-project",
"pyo3", "pyo3",

View File

@@ -18,6 +18,7 @@ arrow = { version = "56.2", features = ["pyarrow"] }
async-trait = "0.1" async-trait = "0.1"
lancedb = { path = "../rust/lancedb", default-features = false } lancedb = { path = "../rust/lancedb", default-features = false }
lance-core.workspace = true lance-core.workspace = true
lance-namespace.workspace = true
lance-io.workspace = true lance-io.workspace = true
env_logger.workspace = true env_logger.workspace = true
pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39"] } pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39"] }

View File

@@ -5,6 +5,13 @@ import pyarrow as pa
from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS
from .io import StorageOptionsProvider from .io import StorageOptionsProvider
from lance_namespace import (
ListNamespacesResponse,
CreateNamespaceResponse,
DropNamespaceResponse,
DescribeNamespaceResponse,
ListTablesResponse,
)
from .remote import ClientConfig from .remote import ClientConfig
class Session: class Session:
@@ -26,18 +33,38 @@ class Connection(object):
async def close(self): ... async def close(self): ...
async def list_namespaces( async def list_namespaces(
self, self,
namespace: Optional[List[str]], namespace: Optional[List[str]] = None,
page_token: Optional[str], page_token: Optional[str] = None,
limit: Optional[int], limit: Optional[int] = None,
) -> List[str]: ... ) -> ListNamespacesResponse: ...
async def create_namespace(self, namespace: List[str]) -> None: ... async def create_namespace(
async def drop_namespace(self, namespace: List[str]) -> None: ... self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse: ...
async def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse: ...
async def describe_namespace(
self,
namespace: List[str],
) -> DescribeNamespaceResponse: ...
async def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse: ...
async def table_names( async def table_names(
self, self,
namespace: Optional[List[str]], namespace: Optional[List[str]],
start_after: Optional[str], start_after: Optional[str],
limit: Optional[int], limit: Optional[int],
) -> list[str]: ... ) -> list[str]: ... # Deprecated: Use list_tables instead
async def create_table( async def create_table(
self, self,
name: str, name: str,

View File

@@ -22,6 +22,13 @@ from lancedb.embeddings.registry import EmbeddingFunctionRegistry
from lancedb.common import data_to_reader, sanitize_uri, validate_schema from lancedb.common import data_to_reader, sanitize_uri, validate_schema
from lancedb.background_loop import LOOP from lancedb.background_loop import LOOP
from lance_namespace import (
ListNamespacesResponse,
CreateNamespaceResponse,
DropNamespaceResponse,
DescribeNamespaceResponse,
ListTablesResponse,
)
from . import __version__ from . import __version__
from ._lancedb import connect as lancedb_connect # type: ignore from ._lancedb import connect as lancedb_connect # type: ignore
@@ -48,6 +55,12 @@ if TYPE_CHECKING:
from .io import StorageOptionsProvider from .io import StorageOptionsProvider
from ._lancedb import Session from ._lancedb import Session
from .namespace_utils import (
_normalize_create_namespace_mode,
_normalize_drop_namespace_mode,
_normalize_drop_namespace_behavior,
)
class DBConnection(EnforceOverrides): class DBConnection(EnforceOverrides):
"""An active LanceDB connection interface.""" """An active LanceDB connection interface."""
@@ -56,8 +69,8 @@ class DBConnection(EnforceOverrides):
self, self,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
page_token: Optional[str] = None, page_token: Optional[str] = None,
limit: int = 10, limit: Optional[int] = None,
) -> Iterable[str]: ) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace. """List immediate child namespace names in the given namespace.
Parameters Parameters
@@ -66,43 +79,119 @@ class DBConnection(EnforceOverrides):
The parent namespace to list namespaces in. The parent namespace to list namespaces in.
Empty list represents root namespace. Empty list represents root namespace.
page_token: str, optional page_token: str, optional
The token to use for pagination. If not present, start from the beginning. Token for pagination. Use the token from a previous response
limit: int, default 10 to get the next page of results.
The size of the page to return. limit: int, optional
The maximum number of results to return.
Returns Returns
------- -------
Iterable of str ListNamespacesResponse
List of immediate child namespace names Response containing namespace names and optional page_token for pagination.
""" """
if namespace is None: if namespace is None:
namespace = [] namespace = []
return [] return ListNamespacesResponse(namespaces=[], page_token=None)
def create_namespace(self, namespace: List[str]) -> None: def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace. """Create a new namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to create. The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
""" """
raise NotImplementedError( raise NotImplementedError(
"Namespace operations are not supported for this connection type" "Namespace operations are not supported for this connection type"
) )
def drop_namespace(self, namespace: List[str]) -> None: def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace. """Drop a namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to drop. The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
""" """
raise NotImplementedError( raise NotImplementedError(
"Namespace operations are not supported for this connection type" "Namespace operations are not supported for this connection type"
) )
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
raise NotImplementedError(
"Namespace operations are not supported for this connection type"
)
def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
raise NotImplementedError(
"list_tables is not supported for this connection type"
)
@abstractmethod @abstractmethod
def table_names( def table_names(
self, self,
@@ -557,8 +646,8 @@ class LanceDBConnection(DBConnection):
self, self,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
page_token: Optional[str] = None, page_token: Optional[str] = None,
limit: int = 10, limit: Optional[int] = None,
) -> Iterable[str]: ) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace. """List immediate child namespace names in the given namespace.
Parameters Parameters
@@ -567,14 +656,15 @@ class LanceDBConnection(DBConnection):
The parent namespace to list namespaces in. The parent namespace to list namespaces in.
None or empty list represents root namespace. None or empty list represents root namespace.
page_token: str, optional page_token: str, optional
The token to use for pagination. If not present, start from the beginning. Token for pagination. Use the token from a previous response
limit: int, default 10 to get the next page of results.
The size of the page to return. limit: int, optional
The maximum number of results to return.
Returns Returns
------- -------
Iterable of str ListNamespacesResponse
List of immediate child namespace names Response containing namespace names and optional page_token for pagination.
""" """
if namespace is None: if namespace is None:
namespace = [] namespace = []
@@ -585,26 +675,111 @@ class LanceDBConnection(DBConnection):
) )
@override @override
def create_namespace(self, namespace: List[str]) -> None: def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace. """Create a new namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to create. The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
""" """
LOOP.run(self._conn.create_namespace(namespace=namespace)) return LOOP.run(
self._conn.create_namespace(
namespace=namespace, mode=mode, properties=properties
)
)
@override @override
def drop_namespace(self, namespace: List[str]) -> None: def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace. """Drop a namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to drop. The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
""" """
return LOOP.run(self._conn.drop_namespace(namespace=namespace)) return LOOP.run(
self._conn.drop_namespace(namespace=namespace, mode=mode, behavior=behavior)
)
@override
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
return LOOP.run(self._conn.describe_namespace(namespace=namespace))
@override
def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
return LOOP.run(
self._conn.list_tables(
namespace=namespace, page_token=page_token, limit=limit
)
)
@override @override
def table_names( def table_names(
@@ -616,6 +791,9 @@ class LanceDBConnection(DBConnection):
) -> Iterable[str]: ) -> Iterable[str]:
"""Get the names of all tables in the database. The names are sorted. """Get the names of all tables in the database. The names are sorted.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
Parameters Parameters
---------- ----------
namespace: List[str], optional namespace: List[str], optional
@@ -630,6 +808,13 @@ class LanceDBConnection(DBConnection):
Iterator of str. Iterator of str.
A list of table names. A list of table names.
""" """
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None: if namespace is None:
namespace = [] namespace = []
return LOOP.run( return LOOP.run(
@@ -944,8 +1129,8 @@ class AsyncConnection(object):
self, self,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
page_token: Optional[str] = None, page_token: Optional[str] = None,
limit: int = 10, limit: Optional[int] = None,
) -> Iterable[str]: ) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace. """List immediate child namespace names in the given namespace.
Parameters Parameters
@@ -955,39 +1140,128 @@ class AsyncConnection(object):
None or empty list represents root namespace. None or empty list represents root namespace.
page_token: str, optional page_token: str, optional
The token to use for pagination. If not present, start from the beginning. The token to use for pagination. If not present, start from the beginning.
limit: int, default 10 limit: int, optional
The size of the page to return. The maximum number of results to return.
Returns Returns
------- -------
Iterable of str ListNamespacesResponse
List of immediate child namespace names (not full paths) Response containing namespace names and optional pagination token
""" """
if namespace is None: if namespace is None:
namespace = [] namespace = []
return await self._inner.list_namespaces( result = await self._inner.list_namespaces(
namespace=namespace, page_token=page_token, limit=limit namespace=namespace, page_token=page_token, limit=limit
) )
return ListNamespacesResponse(**result)
async def create_namespace(self, namespace: List[str]) -> None: async def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace. """Create a new namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to create. The namespace identifier to create.
""" mode: str, optional
await self._inner.create_namespace(namespace) Creation mode - "create", "exist_ok", or "overwrite". Case insensitive.
properties: Dict[str, str], optional
Properties to associate with the namespace
async def drop_namespace(self, namespace: List[str]) -> None: Returns
-------
CreateNamespaceResponse
Response containing namespace properties
"""
result = await self._inner.create_namespace(
namespace,
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
return CreateNamespaceResponse(**result)
async def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace. """Drop a namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to drop. The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
""" """
await self._inner.drop_namespace(namespace) result = await self._inner.drop_namespace(
namespace,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
return DropNamespaceResponse(**result)
async def describe_namespace(
self, namespace: List[str]
) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
result = await self._inner.describe_namespace(namespace)
return DescribeNamespaceResponse(**result)
async def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
result = await self._inner.list_tables(
namespace=namespace, page_token=page_token, limit=limit
)
return ListTablesResponse(**result)
async def table_names( async def table_names(
self, self,
@@ -998,6 +1272,9 @@ class AsyncConnection(object):
) -> Iterable[str]: ) -> Iterable[str]:
"""List all tables in this database, in sorted order """List all tables in this database, in sorted order
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
Parameters Parameters
---------- ----------
namespace: List[str], optional namespace: List[str], optional
@@ -1016,6 +1293,13 @@ class AsyncConnection(object):
------- -------
Iterable of str Iterable of str
""" """
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None: if namespace is None:
namespace = [] namespace = []
return await self._inner.table_names( return await self._inner.table_names(

View File

@@ -23,7 +23,29 @@ from datetime import timedelta
import pyarrow as pa import pyarrow as pa
from lancedb.db import DBConnection, LanceDBConnection from lancedb.db import DBConnection, LanceDBConnection
from lancedb.namespace_utils import (
_normalize_create_namespace_mode,
_normalize_drop_namespace_mode,
_normalize_drop_namespace_behavior,
)
from lancedb.io import StorageOptionsProvider from lancedb.io import StorageOptionsProvider
from lance_namespace import (
LanceNamespace,
connect as namespace_connect,
CreateNamespaceResponse,
DescribeNamespaceResponse,
DropNamespaceResponse,
ListNamespacesResponse,
ListTablesResponse,
ListTablesRequest,
DescribeTableRequest,
DescribeNamespaceRequest,
DropTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
CreateEmptyTableRequest,
)
from lancedb.table import AsyncTable, LanceTable, Table from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name from lancedb.util import validate_table_name
from lancedb.common import DATA from lancedb.common import DATA
@@ -31,19 +53,9 @@ from lancedb.pydantic import LanceModel
from lancedb.embeddings import EmbeddingFunctionConfig from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session from ._lancedb import Session
from lance_namespace import LanceNamespace, connect as namespace_connect from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema
from lance_namespace_urllib3_client.models import ( from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField
ListTablesRequest, from lance_namespace_urllib3_client.models.json_arrow_data_type import JsonArrowDataType
DescribeTableRequest,
DropTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
CreateEmptyTableRequest,
JsonArrowSchema,
JsonArrowField,
JsonArrowDataType,
)
def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType: def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType:
@@ -241,6 +253,19 @@ class LanceNamespaceDBConnection(DBConnection):
*, *,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
) -> Iterable[str]: ) -> Iterable[str]:
"""
List table names in the database.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None: if namespace is None:
namespace = [] namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
@@ -433,8 +458,8 @@ class LanceNamespaceDBConnection(DBConnection):
self, self,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
page_token: Optional[str] = None, page_token: Optional[str] = None,
limit: int = 10, limit: Optional[int] = None,
) -> Iterable[str]: ) -> ListNamespacesResponse:
""" """
List child namespaces under the given namespace. List child namespaces under the given namespace.
@@ -444,14 +469,15 @@ class LanceNamespaceDBConnection(DBConnection):
The parent namespace to list children from. The parent namespace to list children from.
If None, lists root-level namespaces. If None, lists root-level namespaces.
page_token : Optional[str] page_token : Optional[str]
Pagination token for listing results. Token for pagination. Use the token from a previous response
limit : int to get the next page of results.
limit : int, optional
Maximum number of namespaces to return. Maximum number of namespaces to return.
Returns Returns
------- -------
Iterable[str] ListNamespacesResponse
Names of child namespaces. Response containing namespace names and optional page_token for pagination.
""" """
if namespace is None: if namespace is None:
namespace = [] namespace = []
@@ -459,10 +485,18 @@ class LanceNamespaceDBConnection(DBConnection):
id=namespace, page_token=page_token, limit=limit id=namespace, page_token=page_token, limit=limit
) )
response = self._ns.list_namespaces(request) response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else [] return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
)
@override @override
def create_namespace(self, namespace: List[str]) -> None: def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
""" """
Create a new namespace. Create a new namespace.
@@ -470,12 +504,34 @@ class LanceNamespaceDBConnection(DBConnection):
---------- ----------
namespace : List[str] namespace : List[str]
The namespace path to create. The namespace path to create.
mode : str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties : Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
""" """
request = CreateNamespaceRequest(id=namespace) request = CreateNamespaceRequest(
self._ns.create_namespace(request) id=namespace,
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@override @override
def drop_namespace(self, namespace: List[str]) -> None: def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
""" """
Drop a namespace. Drop a namespace.
@@ -483,9 +539,87 @@ class LanceNamespaceDBConnection(DBConnection):
---------- ----------
namespace : List[str] namespace : List[str]
The namespace path to drop. The namespace path to drop.
mode : str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior : str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
""" """
request = DropNamespaceRequest(id=namespace) request = DropNamespaceRequest(
self._ns.drop_namespace(request) id=namespace,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
),
transaction_id=(
response.transaction_id if hasattr(response, "transaction_id") else None
),
)
@override
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
"""
Describe a namespace.
Parameters
----------
namespace : List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@override
def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""
List all tables in this database with pagination support.
Parameters
----------
namespace : List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token : str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit : int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
)
def _lance_table_from_uri( def _lance_table_from_uri(
self, self,
@@ -563,7 +697,19 @@ class AsyncLanceNamespaceDBConnection:
*, *,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
) -> Iterable[str]: ) -> Iterable[str]:
"""List table names in the namespace.""" """
List table names in the namespace.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None: if namespace is None:
namespace = [] namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
@@ -771,8 +917,8 @@ class AsyncLanceNamespaceDBConnection:
self, self,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
page_token: Optional[str] = None, page_token: Optional[str] = None,
limit: int = 10, limit: Optional[int] = None,
) -> Iterable[str]: ) -> ListNamespacesResponse:
""" """
List child namespaces under the given namespace. List child namespaces under the given namespace.
@@ -782,14 +928,15 @@ class AsyncLanceNamespaceDBConnection:
The parent namespace to list children from. The parent namespace to list children from.
If None, lists root-level namespaces. If None, lists root-level namespaces.
page_token : Optional[str] page_token : Optional[str]
Pagination token for listing results. Token for pagination. Use the token from a previous response
limit : int to get the next page of results.
limit : int, optional
Maximum number of namespaces to return. Maximum number of namespaces to return.
Returns Returns
------- -------
Iterable[str] ListNamespacesResponse
Names of child namespaces. Response containing namespace names and optional page_token for pagination.
""" """
if namespace is None: if namespace is None:
namespace = [] namespace = []
@@ -797,9 +944,17 @@ class AsyncLanceNamespaceDBConnection:
id=namespace, page_token=page_token, limit=limit id=namespace, page_token=page_token, limit=limit
) )
response = self._ns.list_namespaces(request) response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else [] return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
)
async def create_namespace(self, namespace: List[str]) -> None: async def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
""" """
Create a new namespace. Create a new namespace.
@@ -807,11 +962,33 @@ class AsyncLanceNamespaceDBConnection:
---------- ----------
namespace : List[str] namespace : List[str]
The namespace path to create. The namespace path to create.
""" mode : str, optional
request = CreateNamespaceRequest(id=namespace) Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
self._ns.create_namespace(request) or "overwrite" (replace if exists). Case insensitive.
properties : Dict[str, str], optional
Properties to set on the namespace.
async def drop_namespace(self, namespace: List[str]) -> None: Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
request = CreateNamespaceRequest(
id=namespace,
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
async def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
""" """
Drop a namespace. Drop a namespace.
@@ -819,9 +996,87 @@ class AsyncLanceNamespaceDBConnection:
---------- ----------
namespace : List[str] namespace : List[str]
The namespace path to drop. The namespace path to drop.
mode : str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior : str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
""" """
request = DropNamespaceRequest(id=namespace) request = DropNamespaceRequest(
self._ns.drop_namespace(request) id=namespace,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
),
transaction_id=(
response.transaction_id if hasattr(response, "transaction_id") else None
),
)
async def describe_namespace(
self, namespace: List[str]
) -> DescribeNamespaceResponse:
"""
Describe a namespace.
Parameters
----------
namespace : List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
async def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""
List all tables in this database with pagination support.
Parameters
----------
namespace : List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token : str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit : int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
)
def connect_namespace( def connect_namespace(

View File

@@ -0,0 +1,27 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Utility functions for namespace operations."""
from typing import Optional
def _normalize_create_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize create namespace mode to lowercase (API expects lowercase)."""
if mode is None:
return None
return mode.lower()
def _normalize_drop_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize drop namespace mode to uppercase (API expects uppercase)."""
if mode is None:
return None
return mode.upper()
def _normalize_drop_namespace_behavior(behavior: Optional[str]) -> Optional[str]:
"""Normalize drop namespace behavior to uppercase (API expects uppercase)."""
if behavior is None:
return None
return behavior.upper()

View File

@@ -23,6 +23,13 @@ import pyarrow as pa
from ..common import DATA from ..common import DATA
from ..db import DBConnection, LOOP from ..db import DBConnection, LOOP
from ..embeddings import EmbeddingFunctionConfig from ..embeddings import EmbeddingFunctionConfig
from lance_namespace import (
CreateNamespaceResponse,
DescribeNamespaceResponse,
DropNamespaceResponse,
ListNamespacesResponse,
ListTablesResponse,
)
from ..pydantic import LanceModel from ..pydantic import LanceModel
from ..table import Table from ..table import Table
from ..util import validate_table_name from ..util import validate_table_name
@@ -106,8 +113,8 @@ class RemoteDBConnection(DBConnection):
self, self,
namespace: Optional[List[str]] = None, namespace: Optional[List[str]] = None,
page_token: Optional[str] = None, page_token: Optional[str] = None,
limit: int = 10, limit: Optional[int] = None,
) -> Iterable[str]: ) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace. """List immediate child namespace names in the given namespace.
Parameters Parameters
@@ -116,14 +123,15 @@ class RemoteDBConnection(DBConnection):
The parent namespace to list namespaces in. The parent namespace to list namespaces in.
None or empty list represents root namespace. None or empty list represents root namespace.
page_token: str, optional page_token: str, optional
The token to use for pagination. If not present, start from the beginning. Token for pagination. Use the token from a previous response
limit: int, default 10 to get the next page of results.
The size of the page to return. limit: int, optional
The maximum number of results to return.
Returns Returns
------- -------
Iterable of str ListNamespacesResponse
List of immediate child namespace names Response containing namespace names and optional page_token for pagination.
""" """
if namespace is None: if namespace is None:
namespace = [] namespace = []
@@ -134,26 +142,111 @@ class RemoteDBConnection(DBConnection):
) )
@override @override
def create_namespace(self, namespace: List[str]) -> None: def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace. """Create a new namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to create. The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
""" """
LOOP.run(self._conn.create_namespace(namespace=namespace)) return LOOP.run(
self._conn.create_namespace(
namespace=namespace, mode=mode, properties=properties
)
)
@override @override
def drop_namespace(self, namespace: List[str]) -> None: def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace. """Drop a namespace.
Parameters Parameters
---------- ----------
namespace: List[str] namespace: List[str]
The namespace identifier to drop. The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
""" """
return LOOP.run(self._conn.drop_namespace(namespace=namespace)) return LOOP.run(
self._conn.drop_namespace(namespace=namespace, mode=mode, behavior=behavior)
)
@override
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
return LOOP.run(self._conn.describe_namespace(namespace=namespace))
@override
def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
return LOOP.run(
self._conn.list_tables(
namespace=namespace, page_token=page_token, limit=limit
)
)
@override @override
def table_names( def table_names(
@@ -165,6 +258,9 @@ class RemoteDBConnection(DBConnection):
) -> Iterable[str]: ) -> Iterable[str]:
"""List the names of all tables in the database. """List the names of all tables in the database.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
Parameters Parameters
---------- ----------
namespace: List[str], default [] namespace: List[str], default []
@@ -179,6 +275,13 @@ class RemoteDBConnection(DBConnection):
------- -------
An iterator of table names. An iterator of table names.
""" """
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None: if namespace is None:
namespace = [] namespace = []
return LOOP.run( return LOOP.run(

View File

@@ -892,7 +892,7 @@ def test_local_namespace_operations(tmp_path):
db = lancedb.connect(tmp_path) db = lancedb.connect(tmp_path)
# Test list_namespaces returns empty list for root namespace # Test list_namespaces returns empty list for root namespace
namespaces = list(db.list_namespaces()) namespaces = db.list_namespaces().namespaces
assert namespaces == [] assert namespaces == []
# Test list_namespaces with non-empty namespace raises NotImplementedError # Test list_namespaces with non-empty namespace raises NotImplementedError
@@ -900,7 +900,7 @@ def test_local_namespace_operations(tmp_path):
NotImplementedError, NotImplementedError,
match="Namespace operations are not supported for listing database", match="Namespace operations are not supported for listing database",
): ):
list(db.list_namespaces(namespace=["test"])) db.list_namespaces(namespace=["test"])
def test_local_create_namespace_not_supported(tmp_path): def test_local_create_namespace_not_supported(tmp_path):

View File

@@ -279,13 +279,13 @@ class TestNamespaceConnection:
db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Initially no namespaces # Initially no namespaces
assert len(list(db.list_namespaces())) == 0 assert len(db.list_namespaces().namespaces) == 0
# Create a namespace # Create a namespace
db.create_namespace(["test_namespace"]) db.create_namespace(["test_namespace"])
# Verify namespace exists # Verify namespace exists
namespaces = list(db.list_namespaces()) namespaces = db.list_namespaces().namespaces
assert "test_namespace" in namespaces assert "test_namespace" in namespaces
assert len(namespaces) == 1 assert len(namespaces) == 1
@@ -322,7 +322,7 @@ class TestNamespaceConnection:
db.drop_namespace(["test_namespace"]) db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists # Verify namespace no longer exists
namespaces = list(db.list_namespaces()) namespaces = db.list_namespaces().namespaces
assert len(namespaces) == 0 assert len(namespaces) == 0
def test_namespace_with_tables_cannot_be_dropped(self): def test_namespace_with_tables_cannot_be_dropped(self):
@@ -570,13 +570,13 @@ class TestAsyncNamespaceConnection:
# Initially no namespaces # Initially no namespaces
namespaces = await db.list_namespaces() namespaces = await db.list_namespaces()
assert len(list(namespaces)) == 0 assert len(namespaces.namespaces) == 0
# Create a namespace # Create a namespace
await db.create_namespace(["test_namespace"]) await db.create_namespace(["test_namespace"])
# Verify namespace exists # Verify namespace exists
namespaces = list(await db.list_namespaces()) namespaces = (await db.list_namespaces()).namespaces
assert "test_namespace" in namespaces assert "test_namespace" in namespaces
assert len(namespaces) == 1 assert len(namespaces) == 1
@@ -608,7 +608,7 @@ class TestAsyncNamespaceConnection:
await db.drop_namespace(["test_namespace"]) await db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists # Verify namespace no longer exists
namespaces = list(await db.list_namespaces()) namespaces = (await db.list_namespaces()).namespaces
assert len(namespaces) == 0 assert len(namespaces) == 0
async def test_drop_all_tables_async(self): async def test_drop_all_tables_async(self):

View File

@@ -10,8 +10,9 @@ use lancedb::{
}; };
use pyo3::{ use pyo3::{
exceptions::{PyRuntimeError, PyValueError}, exceptions::{PyRuntimeError, PyValueError},
pyclass, pyfunction, pymethods, Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, pyclass, pyfunction, pymethods,
Python, types::{PyDict, PyDictMethods},
Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, Python,
}; };
use pyo3_async_runtimes::tokio::future_into_py; use pyo3_async_runtimes::tokio::future_into_py;
@@ -292,40 +293,155 @@ impl Connection {
limit: Option<u32>, limit: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> { ) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone(); let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move { let py = self_.py();
use lancedb::database::ListNamespacesRequest; future_into_py(py, async move {
use lance_namespace::models::ListNamespacesRequest;
let request = ListNamespacesRequest { let request = ListNamespacesRequest {
namespace, id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
page_token, page_token,
limit, limit: limit.map(|l| l as i32),
}; };
inner.list_namespaces(request).await.infer_error() let response = inner.list_namespaces(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("namespaces", response.namespaces)?;
dict.set_item("page_token", response.page_token)?;
Ok(dict.unbind())
})
}) })
} }
#[pyo3(signature = (namespace,))] #[pyo3(signature = (namespace, mode=None, properties=None))]
pub fn create_namespace( pub fn create_namespace(
self_: PyRef<'_, Self>, self_: PyRef<'_, Self>,
namespace: Vec<String>, namespace: Vec<String>,
mode: Option<String>,
properties: Option<std::collections::HashMap<String, String>>,
) -> PyResult<Bound<'_, PyAny>> { ) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone(); let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move { let py = self_.py();
use lancedb::database::CreateNamespaceRequest; future_into_py(py, async move {
let request = CreateNamespaceRequest { namespace }; use lance_namespace::models::{create_namespace_request, CreateNamespaceRequest};
inner.create_namespace(request).await.infer_error() let mode_enum = mode.and_then(|m| match m.to_lowercase().as_str() {
"create" => Some(create_namespace_request::Mode::Create),
"exist_ok" => Some(create_namespace_request::Mode::ExistOk),
"overwrite" => Some(create_namespace_request::Mode::Overwrite),
_ => None,
});
let request = CreateNamespaceRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
mode: mode_enum,
properties,
};
let response = inner.create_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("properties", response.properties)?;
Ok(dict.unbind())
})
})
}
#[pyo3(signature = (namespace, mode=None, behavior=None))]
pub fn drop_namespace(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
mode: Option<String>,
behavior: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::{drop_namespace_request, DropNamespaceRequest};
let mode_enum = mode.and_then(|m| match m.to_uppercase().as_str() {
"SKIP" => Some(drop_namespace_request::Mode::Skip),
"FAIL" => Some(drop_namespace_request::Mode::Fail),
_ => None,
});
let behavior_enum = behavior.and_then(|b| match b.to_uppercase().as_str() {
"RESTRICT" => Some(drop_namespace_request::Behavior::Restrict),
"CASCADE" => Some(drop_namespace_request::Behavior::Cascade),
_ => None,
});
let request = DropNamespaceRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
mode: mode_enum,
behavior: behavior_enum,
};
let response = inner.drop_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("properties", response.properties)?;
dict.set_item("transaction_id", response.transaction_id)?;
Ok(dict.unbind())
})
}) })
} }
#[pyo3(signature = (namespace,))] #[pyo3(signature = (namespace,))]
pub fn drop_namespace( pub fn describe_namespace(
self_: PyRef<'_, Self>, self_: PyRef<'_, Self>,
namespace: Vec<String>, namespace: Vec<String>,
) -> PyResult<Bound<'_, PyAny>> { ) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone(); let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move { let py = self_.py();
use lancedb::database::DropNamespaceRequest; future_into_py(py, async move {
let request = DropNamespaceRequest { namespace }; use lance_namespace::models::DescribeNamespaceRequest;
inner.drop_namespace(request).await.infer_error() let request = DescribeNamespaceRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
};
let response = inner.describe_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("properties", response.properties)?;
Ok(dict.unbind())
})
})
}
#[pyo3(signature = (namespace=vec![], page_token=None, limit=None))]
pub fn list_tables(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
page_token: Option<String>,
limit: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::ListTablesRequest;
let request = ListTablesRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
page_token,
limit: limit.map(|l| l as i32),
};
let response = inner.list_tables(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("tables", response.tables)?;
dict.set_item("page_token", response.page_token)?;
Ok(dict.unbind())
})
}) })
} }
} }

View File

@@ -9,6 +9,11 @@ use std::sync::Arc;
use arrow_array::RecordBatchReader; use arrow_array::RecordBatchReader;
use arrow_schema::{Field, SchemaRef}; use arrow_schema::{Field, SchemaRef};
use lance::dataset::ReadParams; use lance::dataset::ReadParams;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
#[cfg(feature = "aws")] #[cfg(feature = "aws")]
use object_store::aws::AwsCredential; use object_store::aws::AwsCredential;
@@ -17,9 +22,8 @@ use crate::database::listing::{
ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS, ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS,
}; };
use crate::database::{ use crate::database::{
CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode, CloneTableRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, DatabaseOptions, OpenTableRequest, ReadConsistency, TableNamesRequest,
OpenTableRequest, ReadConsistency, TableNamesRequest,
}; };
use crate::embeddings::{ use crate::embeddings::{
EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings, EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings,
@@ -74,6 +78,7 @@ impl TableNamesBuilder {
} }
/// Execute the table names operation /// Execute the table names operation
#[allow(deprecated)]
pub async fn execute(self) -> Result<Vec<String>> { pub async fn execute(self) -> Result<Vec<String>> {
self.parent.clone().table_names(self.request).await self.parent.clone().table_names(self.request).await
} }
@@ -768,20 +773,42 @@ impl Connection {
} }
/// List immediate child namespace names in the given namespace /// List immediate child namespace names in the given namespace
pub async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> { pub async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
self.internal.list_namespaces(request).await self.internal.list_namespaces(request).await
} }
/// Create a new namespace /// Create a new namespace
pub async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> { pub async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
self.internal.create_namespace(request).await self.internal.create_namespace(request).await
} }
/// Drop a namespace /// Drop a namespace
pub async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> { pub async fn drop_namespace(
&self,
request: DropNamespaceRequest,
) -> Result<DropNamespaceResponse> {
self.internal.drop_namespace(request).await self.internal.drop_namespace(request).await
} }
/// Describe a namespace
pub async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
self.internal.describe_namespace(request).await
}
/// List tables with pagination support
pub async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
self.internal.list_tables(request).await
}
/// Get the in-memory embedding registry. /// Get the in-memory embedding registry.
/// It's important to note that the embedding registry is not persisted across connections. /// It's important to note that the embedding registry is not persisted across connections.
/// So if a table contains embeddings, you will need to make sure that you are using a connection that has the same embedding functions registered /// So if a table contains embeddings, you will need to make sure that you are using a connection that has the same embedding functions registered

View File

@@ -24,6 +24,11 @@ use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream; use futures::stream;
use lance::dataset::ReadParams; use lance::dataset::ReadParams;
use lance_datafusion::utils::StreamingWriteSource; use lance_datafusion::utils::StreamingWriteSource;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use lance_namespace::LanceNamespace; use lance_namespace::LanceNamespace;
use crate::arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt}; use crate::arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt};
@@ -37,32 +42,7 @@ pub trait DatabaseOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>); fn serialize_into_map(&self, map: &mut HashMap<String, String>);
} }
/// A request to list namespaces in the database /// A request to list names of tables in the database (deprecated, use ListTablesRequest)
#[derive(Clone, Debug, Default)]
pub struct ListNamespacesRequest {
/// The parent namespace to list namespaces in. Empty list represents root namespace.
pub namespace: Vec<String>,
/// If present, only return names that come lexicographically after the supplied value.
pub page_token: Option<String>,
/// The maximum number of namespace names to return
pub limit: Option<u32>,
}
/// A request to create a namespace
#[derive(Clone, Debug)]
pub struct CreateNamespaceRequest {
/// The namespace identifier to create
pub namespace: Vec<String>,
}
/// A request to drop a namespace
#[derive(Clone, Debug)]
pub struct DropNamespaceRequest {
/// The namespace identifier to drop
pub namespace: Vec<String>,
}
/// A request to list names of tables in the database
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct TableNamesRequest { pub struct TableNamesRequest {
/// The namespace to list tables in. Empty list represents root namespace. /// The namespace to list tables in. Empty list represents root namespace.
@@ -268,13 +248,30 @@ pub trait Database:
/// Get the read consistency of the database /// Get the read consistency of the database
async fn read_consistency(&self) -> Result<ReadConsistency>; async fn read_consistency(&self) -> Result<ReadConsistency>;
/// List immediate child namespace names in the given namespace /// List immediate child namespace names in the given namespace
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>>; async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse>;
/// Create a new namespace /// Create a new namespace
async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()>; async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse>;
/// Drop a namespace /// Drop a namespace
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()>; async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse>;
/// Describe a namespace (get its properties)
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse>;
/// List the names of tables in the database /// List the names of tables in the database
///
/// # Deprecated
/// Use `list_tables` instead for pagination support
#[deprecated(note = "Use list_tables instead")]
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>>; async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>>;
/// List tables in the database with pagination support
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse>;
/// Create a table in the database /// Create a table in the database
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>>; async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>>;
/// Clone a table in the database. /// Clone a table in the database.

View File

@@ -24,10 +24,15 @@ use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::NativeTable; use crate::table::NativeTable;
use crate::utils::validate_table_name; use crate::utils::validate_table_name;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use super::{ use super::{
BaseTable, CloneTableRequest, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, BaseTable, CloneTableRequest, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, OpenTableRequest, TableNamesRequest,
TableNamesRequest,
}; };
/// File extension to indicate a lance table /// File extension to indicate a lance table
@@ -663,14 +668,20 @@ impl ListingDatabase {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Database for ListingDatabase { impl Database for ListingDatabase {
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> { async fn list_namespaces(
if !request.namespace.is_empty() { &self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return Err(Error::NotSupported { return Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(), message: "Namespace operations are not supported for listing database".into(),
}); });
} }
Ok(Vec::new()) Ok(ListNamespacesResponse {
namespaces: Vec::new(),
page_token: None,
})
} }
fn uri(&self) -> &str { fn uri(&self) -> &str {
@@ -689,13 +700,28 @@ impl Database for ListingDatabase {
} }
} }
async fn create_namespace(&self, _request: CreateNamespaceRequest) -> Result<()> { async fn create_namespace(
&self,
_request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
Err(Error::NotSupported { Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(), message: "Namespace operations are not supported for listing database".into(),
}) })
} }
async fn drop_namespace(&self, _request: DropNamespaceRequest) -> Result<()> { async fn drop_namespace(
&self,
_request: DropNamespaceRequest,
) -> Result<DropNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
}
async fn describe_namespace(
&self,
_request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
Err(Error::NotSupported { Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(), message: "Namespace operations are not supported for listing database".into(),
}) })
@@ -736,6 +762,57 @@ impl Database for ListingDatabase {
Ok(f) Ok(f)
} }
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
}
let mut f = self
.object_store
.read_dir(self.base_path.clone())
.await?
.iter()
.map(Path::new)
.filter(|path| {
let is_lance = path
.extension()
.and_then(|e| e.to_str())
.map(|e| e == LANCE_EXTENSION);
is_lance.unwrap_or(false)
})
.filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from)))
.collect::<Vec<String>>();
f.sort();
// Handle pagination with page_token
if let Some(ref page_token) = request.page_token {
let index = f
.iter()
.position(|name| name.as_str() > page_token.as_str())
.unwrap_or(f.len());
f.drain(0..index);
}
// Determine if there's a next page
let next_page_token = if let Some(limit) = request.limit {
if f.len() > limit as usize {
let token = f[limit as usize].clone();
f.truncate(limit as usize);
Some(token)
} else {
None
}
} else {
None
};
Ok(ListTablesResponse {
tables: f,
page_token: next_page_token,
})
}
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> { async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided // When namespace is not empty, location must be provided
if !request.namespace.is_empty() && request.location.is_none() { if !request.namespace.is_empty() && request.location.is_none() {
@@ -951,6 +1028,7 @@ impl Database for ListingDatabase {
self.drop_tables(vec![name.to_string()]).await self.drop_tables(vec![name.to_string()]).await
} }
#[allow(deprecated)]
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> { async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
// Check if namespace parameter is provided // Check if namespace parameter is provided
if !namespace.is_empty() { if !namespace.is_empty() {
@@ -1037,6 +1115,7 @@ mod tests {
.unwrap(); .unwrap();
// Verify both tables exist // Verify both tables exist
#[allow(deprecated)]
let table_names = db.table_names(TableNamesRequest::default()).await.unwrap(); let table_names = db.table_names(TableNamesRequest::default()).await.unwrap();
assert!(table_names.contains(&"source_table".to_string())); assert!(table_names.contains(&"source_table".to_string()));
assert!(table_names.contains(&"cloned_table".to_string())); assert!(table_names.contains(&"cloned_table".to_string()));
@@ -1733,6 +1812,7 @@ mod tests {
mode: CreateTableMode::Create, mode: CreateTableMode::Create,
write_options: Default::default(), write_options: Default::default(),
location: None, location: None,
namespace_client: None,
}) })
.await .await
.unwrap(); .unwrap();
@@ -1786,6 +1866,7 @@ mod tests {
mode: CreateTableMode::Create, mode: CreateTableMode::Create,
write_options, write_options,
location: None, location: None,
namespace_client: None,
}) })
.await .await
.unwrap(); .unwrap();
@@ -1859,6 +1940,7 @@ mod tests {
mode: CreateTableMode::Create, mode: CreateTableMode::Create,
write_options, write_options,
location: None, location: None,
namespace_client: None,
}) })
.await .await
.unwrap(); .unwrap();

View File

@@ -10,8 +10,10 @@ use async_trait::async_trait;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider}; use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider};
use lance_namespace::{ use lance_namespace::{
models::{ models::{
CreateEmptyTableRequest, CreateNamespaceRequest, DescribeTableRequest, CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
DropNamespaceRequest, DropTableRequest, ListNamespacesRequest, ListTablesRequest, DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
}, },
LanceNamespace, LanceNamespace,
}; };
@@ -22,11 +24,8 @@ use crate::database::ReadConsistency;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use super::{ use super::{
listing::ListingDatabase, BaseTable, CloneTableRequest, listing::ListingDatabase, BaseTable, CloneTableRequest, CreateTableMode,
CreateNamespaceRequest as DbCreateNamespaceRequest, CreateTableMode, CreateTableRequest as DbCreateTableRequest, Database, OpenTableRequest, TableNamesRequest,
CreateTableRequest as DbCreateTableRequest, Database,
DropNamespaceRequest as DbDropNamespaceRequest,
ListNamespacesRequest as DbListNamespacesRequest, OpenTableRequest, TableNamesRequest,
}; };
/// A database implementation that uses lance-namespace for table management /// A database implementation that uses lance-namespace for table management
@@ -154,68 +153,29 @@ impl Database for LanceNamespaceDatabase {
} }
} }
async fn list_namespaces(&self, request: DbListNamespacesRequest) -> Result<Vec<String>> { async fn list_namespaces(
let ns_request = ListNamespacesRequest { &self,
id: if request.namespace.is_empty() { request: ListNamespacesRequest,
None ) -> Result<ListNamespacesResponse> {
} else { Ok(self.namespace.list_namespaces(request).await?)
Some(request.namespace)
},
page_token: request.page_token,
limit: request.limit.map(|l| l as i32),
};
let response = self
.namespace
.list_namespaces(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to list namespaces: {}", e),
})?;
Ok(response.namespaces)
} }
async fn create_namespace(&self, request: DbCreateNamespaceRequest) -> Result<()> { async fn create_namespace(
let ns_request = CreateNamespaceRequest { &self,
id: if request.namespace.is_empty() { request: CreateNamespaceRequest,
None ) -> Result<CreateNamespaceResponse> {
} else { Ok(self.namespace.create_namespace(request).await?)
Some(request.namespace)
},
mode: None,
properties: None,
};
self.namespace
.create_namespace(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to create namespace: {}", e),
})?;
Ok(())
} }
async fn drop_namespace(&self, request: DbDropNamespaceRequest) -> Result<()> { async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
let ns_request = DropNamespaceRequest { Ok(self.namespace.drop_namespace(request).await?)
id: if request.namespace.is_empty() { }
None
} else {
Some(request.namespace)
},
mode: None,
behavior: None,
};
self.namespace async fn describe_namespace(
.drop_namespace(ns_request) &self,
.await request: DescribeNamespaceRequest,
.map_err(|e| Error::Runtime { ) -> Result<DescribeNamespaceResponse> {
message: format!("Failed to drop namespace: {}", e), Ok(self.namespace.describe_namespace(request).await?)
})?;
Ok(())
} }
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> { async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
@@ -229,17 +189,15 @@ impl Database for LanceNamespaceDatabase {
limit: request.limit.map(|l| l as i32), limit: request.limit.map(|l| l as i32),
}; };
let response = let response = self.namespace.list_tables(ns_request).await?;
self.namespace
.list_tables(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to list tables: {}", e),
})?;
Ok(response.tables) Ok(response.tables)
} }
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
Ok(self.namespace.list_tables(request).await?)
}
async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> { async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
// Extract user-provided storage options from request // Extract user-provided storage options from request
let user_storage_options = request let user_storage_options = request
@@ -451,6 +409,7 @@ impl Database for LanceNamespaceDatabase {
Ok(()) Ok(())
} }
#[allow(deprecated)]
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> { async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
let tables = self let tables = self
.table_names(TableNamesRequest { .table_names(TableNamesRequest {
@@ -477,7 +436,6 @@ impl Database for LanceNamespaceDatabase {
mod tests { mod tests {
use super::*; use super::*;
use crate::connect_namespace; use crate::connect_namespace;
use crate::database::CreateNamespaceRequest;
use crate::query::ExecutableQuery; use crate::query::ExecutableQuery;
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray}; use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field, Schema}; use arrow_schema::{DataType, Field, Schema};
@@ -590,7 +548,9 @@ mod tests {
// Create a child namespace first // Create a child namespace first
conn.create_namespace(CreateNamespaceRequest { conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()], id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
}) })
.await .await
.expect("Failed to create namespace"); .expect("Failed to create namespace");
@@ -649,7 +609,9 @@ mod tests {
// Create a child namespace first // Create a child namespace first
conn.create_namespace(CreateNamespaceRequest { conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()], id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
}) })
.await .await
.expect("Failed to create namespace"); .expect("Failed to create namespace");
@@ -711,7 +673,9 @@ mod tests {
// Create a child namespace first // Create a child namespace first
conn.create_namespace(CreateNamespaceRequest { conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()], id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
}) })
.await .await
.expect("Failed to create namespace"); .expect("Failed to create namespace");
@@ -793,7 +757,9 @@ mod tests {
// Create a child namespace first // Create a child namespace first
conn.create_namespace(CreateNamespaceRequest { conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()], id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
}) })
.await .await
.expect("Failed to create namespace"); .expect("Failed to create namespace");
@@ -847,7 +813,9 @@ mod tests {
// Create a child namespace first // Create a child namespace first
conn.create_namespace(CreateNamespaceRequest { conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()], id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
}) })
.await .await
.expect("Failed to create namespace"); .expect("Failed to create namespace");
@@ -926,7 +894,9 @@ mod tests {
// Create a child namespace first // Create a child namespace first
conn.create_namespace(CreateNamespaceRequest { conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()], id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
}) })
.await .await
.expect("Failed to create namespace"); .expect("Failed to create namespace");
@@ -958,7 +928,9 @@ mod tests {
// Create a child namespace first // Create a child namespace first
conn.create_namespace(CreateNamespaceRequest { conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()], id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
}) })
.await .await
.expect("Failed to create namespace"); .expect("Failed to create namespace");

View File

@@ -10,13 +10,17 @@ use http::StatusCode;
use lance_io::object_store::StorageOptions; use lance_io::object_store::StorageOptions;
use moka::future::Cache; use moka::future::Cache;
use reqwest::header::CONTENT_TYPE; use reqwest::header::CONTENT_TYPE;
use serde::Deserialize;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use crate::database::{ use crate::database::{
CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode, CloneTableRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, DatabaseOptions, OpenTableRequest, ReadConsistency, TableNamesRequest,
OpenTableRequest, ReadConsistency, TableNamesRequest,
}; };
use crate::error::Result; use crate::error::Result;
use crate::table::BaseTable; use crate::table::BaseTable;
@@ -180,11 +184,6 @@ impl RemoteDatabaseOptionsBuilder {
} }
} }
#[derive(Deserialize)]
struct ListTablesResponse {
tables: Vec<String>,
}
#[derive(Debug)] #[derive(Debug)]
pub struct RemoteDatabase<S: HttpSend = Sender> { pub struct RemoteDatabase<S: HttpSend = Sender> {
client: RestfulLanceDbClient<S>, client: RestfulLanceDbClient<S>,
@@ -337,7 +336,6 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
self.client self.client
.get(&format!("/v1/namespace/{}/table/list", namespace_id)) .get(&format!("/v1/namespace/{}/table/list", namespace_id))
} else { } else {
// TODO: use new API for all listing operations once stable
self.client.get("/v1/table/") self.client.get("/v1/table/")
}; };
@@ -371,6 +369,44 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
Ok(tables) Ok(tables)
} }
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.get(&format!("/v1/namespace/{}/table/list", namespace_id));
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
if let Some(ref page_token) = request.page_token {
req = req.query(&[("page_token", page_token)]);
}
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let response: ListTablesResponse = rsp.json().await.err_to_http(request_id)?;
// Cache the tables for future use
let namespace_vec = namespace_parts.to_vec();
for table in &response.tables {
let table_identifier =
build_table_identifier(table, &namespace_vec, &self.client.id_delimiter);
let cache_key = build_cache_key(table, &namespace_vec);
let remote_table = Arc::new(RemoteTable::new(
self.client.clone(),
table.clone(),
namespace_vec.clone(),
table_identifier.clone(),
version.clone(),
));
self.table_cache.insert(cache_key, remote_table).await;
}
Ok(response)
}
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> { async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let data = match request.data { let data = match request.data {
CreateTableData::Data(data) => data, CreateTableData::Data(data) => data,
@@ -591,53 +627,101 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}) })
} }
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> { async fn list_namespaces(
let namespace_id = &self,
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self let mut req = self
.client .client
.get(&format!("/v1/namespace/{}/list", namespace_id)); .get(&format!("/v1/namespace/{}/list", namespace_id));
if let Some(limit) = request.limit { if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]); req = req.query(&[("limit", limit)]);
} }
if let Some(page_token) = request.page_token { if let Some(ref page_token) = request.page_token {
req = req.query(&[("page_token", page_token)]); req = req.query(&[("page_token", page_token)]);
} }
let (request_id, resp) = self.client.send(req).await?; let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?; let resp = self.client.check_response(&request_id, resp).await?;
#[derive(Deserialize)] resp.json().await.err_to_http(request_id)
struct ListNamespacesResponse {
namespaces: Vec<String>,
}
let parsed: ListNamespacesResponse = resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse namespace response: {}", e),
})?;
Ok(parsed.namespaces)
} }
async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> { async fn create_namespace(
let namespace_id = &self,
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); request: CreateNamespaceRequest,
let req = self ) -> Result<CreateNamespaceResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client .client
.post(&format!("/v1/namespace/{}/create", namespace_id)); .post(&format!("/v1/namespace/{}/create", namespace_id));
// Build request body with mode and properties if present
#[derive(serde::Serialize)]
struct CreateNamespaceRequestBody {
#[serde(skip_serializing_if = "Option::is_none")]
mode: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
properties: Option<HashMap<String, String>>,
}
let body = CreateNamespaceRequestBody {
mode: request.mode.as_ref().map(|m| format!("{:?}", m)),
properties: request.properties,
};
req = req.json(&body);
let (request_id, resp) = self.client.send(req).await?; let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?; let resp = self.client.check_response(&request_id, resp).await?;
Ok(())
resp.json().await.err_to_http(request_id)
} }
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> { async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
let namespace_id = let namespace_parts = request.id.as_deref().unwrap_or(&[]);
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let req = self let mut req = self
.client .client
.post(&format!("/v1/namespace/{}/drop", namespace_id)); .post(&format!("/v1/namespace/{}/drop", namespace_id));
// Build request body with mode and behavior if present
#[derive(serde::Serialize)]
struct DropNamespaceRequestBody {
#[serde(skip_serializing_if = "Option::is_none")]
mode: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
behavior: Option<String>,
}
let body = DropNamespaceRequestBody {
mode: request.mode.as_ref().map(|m| format!("{:?}", m)),
behavior: request.behavior.as_ref().map(|b| format!("{:?}", b)),
};
req = req.json(&body);
let (request_id, resp) = self.client.send(req).await?; let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?; let resp = self.client.check_response(&request_id, resp).await?;
Ok(())
resp.json().await.err_to_http(request_id)
}
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let req = self
.client
.get(&format!("/v1/namespace/{}/describe", namespace_id));
let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?;
resp.json().await.err_to_http(request_id)
} }
fn as_any(&self) -> &dyn std::any::Any { fn as_any(&self) -> &dyn std::any::Any {