Compare commits

...

3 Commits

Author SHA1 Message Date
Jack Ye
b0170ea86a fix: table_names error at root namespace (#2842)
Root namespace should be passed in as an empty vector, not None.
2025-12-02 23:53:29 -08:00
Jack Ye
d1efc6ad8a refactor!: use namespace models directly for namespace operations (#2806)
1. Use generated models in lance-namespace for request response models
to avoid multiple layers of conversions
2. Make sure the API is consistent with the namespace spec
3. Deprecate the table_names API in favor of the list_tables API in
namespace that allows full pagination support without the need to have
sorted table names
4. Add the describe_namespace API, which was missing from the original
implementation
2025-12-02 22:41:04 -08:00
Jack Ye
9d638cb3c7 feat: support namespace server side query (#2811)
Currently, a table in a namespace is still backed by a `NativeTable`,
which means that after getting the location of the table and the optional
storage options override from `namespace.describe_table`, everything works
like a normal local table. However, a namespace also supports `query_table`,
which is exactly the same API as for a remote table. This PR adds a
`server_side_query` capability: when enabled, it runs the query by
calling `namespace.query_table`. For a namespace that implements the
operation (e.g. a REST namespace), this could hit a backend server that
could execute the query faster (e.g. using a distributed engine).
2025-12-02 21:04:12 -08:00
16 changed files with 1779 additions and 285 deletions

1
Cargo.lock generated
View File

@@ -5071,6 +5071,7 @@ dependencies = [
"futures",
"lance-core",
"lance-io",
"lance-namespace",
"lancedb",
"pin-project",
"pyo3",

View File

@@ -18,6 +18,7 @@ arrow = { version = "56.2", features = ["pyarrow"] }
async-trait = "0.1"
lancedb = { path = "../rust/lancedb", default-features = false }
lance-core.workspace = true
lance-namespace.workspace = true
lance-io.workspace = true
env_logger.workspace = true
pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39"] }

View File

@@ -5,6 +5,13 @@ import pyarrow as pa
from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS
from .io import StorageOptionsProvider
from lance_namespace import (
ListNamespacesResponse,
CreateNamespaceResponse,
DropNamespaceResponse,
DescribeNamespaceResponse,
ListTablesResponse,
)
from .remote import ClientConfig
class Session:
@@ -26,18 +33,38 @@ class Connection(object):
async def close(self): ...
async def list_namespaces(
self,
namespace: Optional[List[str]],
page_token: Optional[str],
limit: Optional[int],
) -> List[str]: ...
async def create_namespace(self, namespace: List[str]) -> None: ...
async def drop_namespace(self, namespace: List[str]) -> None: ...
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListNamespacesResponse: ...
async def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse: ...
async def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse: ...
async def describe_namespace(
self,
namespace: List[str],
) -> DescribeNamespaceResponse: ...
async def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse: ...
async def table_names(
self,
namespace: Optional[List[str]],
start_after: Optional[str],
limit: Optional[int],
) -> list[str]: ...
) -> list[str]: ... # Deprecated: Use list_tables instead
async def create_table(
self,
name: str,

View File

@@ -22,6 +22,13 @@ from lancedb.embeddings.registry import EmbeddingFunctionRegistry
from lancedb.common import data_to_reader, sanitize_uri, validate_schema
from lancedb.background_loop import LOOP
from lance_namespace import (
ListNamespacesResponse,
CreateNamespaceResponse,
DropNamespaceResponse,
DescribeNamespaceResponse,
ListTablesResponse,
)
from . import __version__
from ._lancedb import connect as lancedb_connect # type: ignore
@@ -48,6 +55,12 @@ if TYPE_CHECKING:
from .io import StorageOptionsProvider
from ._lancedb import Session
from .namespace_utils import (
_normalize_create_namespace_mode,
_normalize_drop_namespace_mode,
_normalize_drop_namespace_behavior,
)
class DBConnection(EnforceOverrides):
"""An active LanceDB connection interface."""
@@ -56,8 +69,8 @@ class DBConnection(EnforceOverrides):
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
limit: Optional[int] = None,
) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace.
Parameters
@@ -66,43 +79,119 @@ class DBConnection(EnforceOverrides):
The parent namespace to list namespaces in.
Empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
Iterable of str
List of immediate child namespace names
ListNamespacesResponse
Response containing namespace names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
return []
return ListNamespacesResponse(namespaces=[], page_token=None)
def create_namespace(self, namespace: List[str]) -> None:
def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
raise NotImplementedError(
"Namespace operations are not supported for this connection type"
)
def drop_namespace(self, namespace: List[str]) -> None:
def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
raise NotImplementedError(
"Namespace operations are not supported for this connection type"
)
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
raise NotImplementedError(
"Namespace operations are not supported for this connection type"
)
def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
raise NotImplementedError(
"list_tables is not supported for this connection type"
)
@abstractmethod
def table_names(
self,
@@ -557,8 +646,8 @@ class LanceDBConnection(DBConnection):
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
limit: Optional[int] = None,
) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace.
Parameters
@@ -567,14 +656,15 @@ class LanceDBConnection(DBConnection):
The parent namespace to list namespaces in.
None or empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
Iterable of str
List of immediate child namespace names
ListNamespacesResponse
Response containing namespace names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
@@ -585,26 +675,111 @@ class LanceDBConnection(DBConnection):
)
@override
def create_namespace(self, namespace: List[str]) -> None:
def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
LOOP.run(self._conn.create_namespace(namespace=namespace))
return LOOP.run(
self._conn.create_namespace(
namespace=namespace, mode=mode, properties=properties
)
)
@override
def drop_namespace(self, namespace: List[str]) -> None:
def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
return LOOP.run(self._conn.drop_namespace(namespace=namespace))
return LOOP.run(
self._conn.drop_namespace(namespace=namespace, mode=mode, behavior=behavior)
)
@override
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
return LOOP.run(self._conn.describe_namespace(namespace=namespace))
@override
def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
return LOOP.run(
self._conn.list_tables(
namespace=namespace, page_token=page_token, limit=limit
)
)
@override
def table_names(
@@ -616,6 +791,9 @@ class LanceDBConnection(DBConnection):
) -> Iterable[str]:
"""Get the names of all tables in the database. The names are sorted.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
Parameters
----------
namespace: List[str], optional
@@ -630,6 +808,13 @@ class LanceDBConnection(DBConnection):
Iterator of str.
A list of table names.
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None:
namespace = []
return LOOP.run(
@@ -944,8 +1129,8 @@ class AsyncConnection(object):
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
limit: Optional[int] = None,
) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace.
Parameters
@@ -955,39 +1140,128 @@ class AsyncConnection(object):
None or empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
limit: int, optional
The maximum number of results to return.
Returns
-------
Iterable of str
List of immediate child namespace names (not full paths)
ListNamespacesResponse
Response containing namespace names and optional pagination token
"""
if namespace is None:
namespace = []
return await self._inner.list_namespaces(
result = await self._inner.list_namespaces(
namespace=namespace, page_token=page_token, limit=limit
)
return ListNamespacesResponse(**result)
async def create_namespace(self, namespace: List[str]) -> None:
async def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
"""
await self._inner.create_namespace(namespace)
mode: str, optional
Creation mode - "create", "exist_ok", or "overwrite". Case insensitive.
properties: Dict[str, str], optional
Properties to associate with the namespace
async def drop_namespace(self, namespace: List[str]) -> None:
Returns
-------
CreateNamespaceResponse
Response containing namespace properties
"""
result = await self._inner.create_namespace(
namespace,
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
return CreateNamespaceResponse(**result)
async def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
await self._inner.drop_namespace(namespace)
result = await self._inner.drop_namespace(
namespace,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
return DropNamespaceResponse(**result)
async def describe_namespace(
self, namespace: List[str]
) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
result = await self._inner.describe_namespace(namespace)
return DescribeNamespaceResponse(**result)
async def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
result = await self._inner.list_tables(
namespace=namespace, page_token=page_token, limit=limit
)
return ListTablesResponse(**result)
async def table_names(
self,
@@ -998,6 +1272,9 @@ class AsyncConnection(object):
) -> Iterable[str]:
"""List all tables in this database, in sorted order
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
Parameters
----------
namespace: List[str], optional
@@ -1016,6 +1293,13 @@ class AsyncConnection(object):
-------
Iterable of str
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None:
namespace = []
return await self._inner.table_names(

View File

@@ -23,7 +23,29 @@ from datetime import timedelta
import pyarrow as pa
from lancedb.db import DBConnection, LanceDBConnection
from lancedb.namespace_utils import (
_normalize_create_namespace_mode,
_normalize_drop_namespace_mode,
_normalize_drop_namespace_behavior,
)
from lancedb.io import StorageOptionsProvider
from lance_namespace import (
LanceNamespace,
connect as namespace_connect,
CreateNamespaceResponse,
DescribeNamespaceResponse,
DropNamespaceResponse,
ListNamespacesResponse,
ListTablesResponse,
ListTablesRequest,
DescribeTableRequest,
DescribeNamespaceRequest,
DropTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
CreateEmptyTableRequest,
)
from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name
from lancedb.common import DATA
@@ -31,19 +53,9 @@ from lancedb.pydantic import LanceModel
from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
from lance_namespace import LanceNamespace, connect as namespace_connect
from lance_namespace_urllib3_client.models import (
ListTablesRequest,
DescribeTableRequest,
DropTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
CreateEmptyTableRequest,
JsonArrowSchema,
JsonArrowField,
JsonArrowDataType,
)
from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema
from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField
from lance_namespace_urllib3_client.models.json_arrow_data_type import JsonArrowDataType
def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType:
@@ -241,6 +253,19 @@ class LanceNamespaceDBConnection(DBConnection):
*,
namespace: Optional[List[str]] = None,
) -> Iterable[str]:
"""
List table names in the database.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
@@ -433,8 +458,8 @@ class LanceNamespaceDBConnection(DBConnection):
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
limit: Optional[int] = None,
) -> ListNamespacesResponse:
"""
List child namespaces under the given namespace.
@@ -444,14 +469,15 @@ class LanceNamespaceDBConnection(DBConnection):
The parent namespace to list children from.
If None, lists root-level namespaces.
page_token : Optional[str]
Pagination token for listing results.
limit : int
Token for pagination. Use the token from a previous response
to get the next page of results.
limit : int, optional
Maximum number of namespaces to return.
Returns
-------
Iterable[str]
Names of child namespaces.
ListNamespacesResponse
Response containing namespace names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
@@ -459,10 +485,18 @@ class LanceNamespaceDBConnection(DBConnection):
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else []
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
)
@override
def create_namespace(self, namespace: List[str]) -> None:
def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""
Create a new namespace.
@@ -470,12 +504,34 @@ class LanceNamespaceDBConnection(DBConnection):
----------
namespace : List[str]
The namespace path to create.
mode : str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties : Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
request = CreateNamespaceRequest(id=namespace)
self._ns.create_namespace(request)
request = CreateNamespaceRequest(
id=namespace,
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@override
def drop_namespace(self, namespace: List[str]) -> None:
def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""
Drop a namespace.
@@ -483,9 +539,87 @@ class LanceNamespaceDBConnection(DBConnection):
----------
namespace : List[str]
The namespace path to drop.
mode : str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior : str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
request = DropNamespaceRequest(id=namespace)
self._ns.drop_namespace(request)
request = DropNamespaceRequest(
id=namespace,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
),
transaction_id=(
response.transaction_id if hasattr(response, "transaction_id") else None
),
)
@override
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
"""
Describe a namespace.
Parameters
----------
namespace : List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@override
def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""
List all tables in this database with pagination support.
Parameters
----------
namespace : List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token : str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit : int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
)
def _lance_table_from_uri(
self,
@@ -563,7 +697,19 @@ class AsyncLanceNamespaceDBConnection:
*,
namespace: Optional[List[str]] = None,
) -> Iterable[str]:
"""List table names in the namespace."""
"""
List table names in the namespace.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
@@ -771,8 +917,8 @@ class AsyncLanceNamespaceDBConnection:
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
limit: Optional[int] = None,
) -> ListNamespacesResponse:
"""
List child namespaces under the given namespace.
@@ -782,14 +928,15 @@ class AsyncLanceNamespaceDBConnection:
The parent namespace to list children from.
If None, lists root-level namespaces.
page_token : Optional[str]
Pagination token for listing results.
limit : int
Token for pagination. Use the token from a previous response
to get the next page of results.
limit : int, optional
Maximum number of namespaces to return.
Returns
-------
Iterable[str]
Names of child namespaces.
ListNamespacesResponse
Response containing namespace names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
@@ -797,9 +944,17 @@ class AsyncLanceNamespaceDBConnection:
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else []
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
)
async def create_namespace(self, namespace: List[str]) -> None:
async def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""
Create a new namespace.
@@ -807,11 +962,33 @@ class AsyncLanceNamespaceDBConnection:
----------
namespace : List[str]
The namespace path to create.
"""
request = CreateNamespaceRequest(id=namespace)
self._ns.create_namespace(request)
mode : str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties : Dict[str, str], optional
Properties to set on the namespace.
async def drop_namespace(self, namespace: List[str]) -> None:
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
request = CreateNamespaceRequest(
id=namespace,
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
async def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""
Drop a namespace.
@@ -819,9 +996,87 @@ class AsyncLanceNamespaceDBConnection:
----------
namespace : List[str]
The namespace path to drop.
mode : str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior : str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
request = DropNamespaceRequest(id=namespace)
self._ns.drop_namespace(request)
request = DropNamespaceRequest(
id=namespace,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
),
transaction_id=(
response.transaction_id if hasattr(response, "transaction_id") else None
),
)
async def describe_namespace(
self, namespace: List[str]
) -> DescribeNamespaceResponse:
"""
Describe a namespace.
Parameters
----------
namespace : List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
async def list_tables(
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""
List all tables in this database with pagination support.
Parameters
----------
namespace : List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token : str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit : int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
)
def connect_namespace(

View File

@@ -0,0 +1,27 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Utility functions for namespace operations."""
from typing import Optional
def _normalize_create_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize create namespace mode to lowercase (API expects lowercase)."""
if mode is None:
return None
return mode.lower()
def _normalize_drop_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize drop namespace mode to uppercase (API expects uppercase)."""
if mode is None:
return None
return mode.upper()
def _normalize_drop_namespace_behavior(behavior: Optional[str]) -> Optional[str]:
"""Normalize drop namespace behavior to uppercase (API expects uppercase)."""
if behavior is None:
return None
return behavior.upper()

View File

@@ -23,6 +23,13 @@ import pyarrow as pa
from ..common import DATA
from ..db import DBConnection, LOOP
from ..embeddings import EmbeddingFunctionConfig
from lance_namespace import (
CreateNamespaceResponse,
DescribeNamespaceResponse,
DropNamespaceResponse,
ListNamespacesResponse,
ListTablesResponse,
)
from ..pydantic import LanceModel
from ..table import Table
from ..util import validate_table_name
@@ -106,8 +113,8 @@ class RemoteDBConnection(DBConnection):
self,
namespace: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
limit: Optional[int] = None,
) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace.
Parameters
@@ -116,14 +123,15 @@ class RemoteDBConnection(DBConnection):
The parent namespace to list namespaces in.
None or empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
Iterable of str
List of immediate child namespace names
ListNamespacesResponse
Response containing namespace names and optional page_token for pagination.
"""
if namespace is None:
namespace = []
@@ -134,26 +142,111 @@ class RemoteDBConnection(DBConnection):
)
@override
def create_namespace(self, namespace: List[str]) -> None:
def create_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
LOOP.run(self._conn.create_namespace(namespace=namespace))
return LOOP.run(
self._conn.create_namespace(
namespace=namespace, mode=mode, properties=properties
)
)
@override
def drop_namespace(self, namespace: List[str]) -> None:
def drop_namespace(
self,
namespace: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
return LOOP.run(self._conn.drop_namespace(namespace=namespace))
return LOOP.run(
self._conn.drop_namespace(namespace=namespace, mode=mode, behavior=behavior)
)
@override
def describe_namespace(self, namespace: List[str]) -> DescribeNamespaceResponse:
    """Fetch the description of a namespace.

    Parameters
    ----------
    namespace: List[str]
        Identifier of the namespace to describe.

    Returns
    -------
    DescribeNamespaceResponse
        Response containing the namespace properties.
    """
    # Build the coroutine on the async connection, then drive it to
    # completion on the shared background loop.
    pending = self._conn.describe_namespace(namespace=namespace)
    return LOOP.run(pending)
@override
def list_tables(
    self,
    namespace: Optional[List[str]] = None,
    page_token: Optional[str] = None,
    limit: Optional[int] = None,
) -> ListTablesResponse:
    """Return one page of the tables in this database.

    Parameters
    ----------
    namespace: List[str], optional
        Namespace whose tables are listed.
        None or an empty list stands for the root namespace.
    page_token: str, optional
        Pagination token. Pass the token from a previous response
        to fetch the page that follows it.
    limit: int, optional
        Upper bound on the number of results returned.

    Returns
    -------
    ListTablesResponse
        Response holding the table names and, optionally, a page_token
        for pagination.
    """
    # The root namespace is always forwarded as an empty list, never as None.
    target = [] if namespace is None else namespace
    pending = self._conn.list_tables(
        namespace=target, page_token=page_token, limit=limit
    )
    return LOOP.run(pending)
@override
def table_names(
@@ -165,6 +258,9 @@ class RemoteDBConnection(DBConnection):
) -> Iterable[str]:
"""List the names of all tables in the database.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
Parameters
----------
namespace: List[str], default []
@@ -179,6 +275,13 @@ class RemoteDBConnection(DBConnection):
-------
An iterator of table names.
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace is None:
namespace = []
return LOOP.run(

View File

@@ -892,7 +892,7 @@ def test_local_namespace_operations(tmp_path):
db = lancedb.connect(tmp_path)
# Test list_namespaces returns empty list for root namespace
namespaces = list(db.list_namespaces())
namespaces = db.list_namespaces().namespaces
assert namespaces == []
# Test list_namespaces with non-empty namespace raises NotImplementedError
@@ -900,7 +900,7 @@ def test_local_namespace_operations(tmp_path):
NotImplementedError,
match="Namespace operations are not supported for listing database",
):
list(db.list_namespaces(namespace=["test"]))
db.list_namespaces(namespace=["test"])
def test_local_create_namespace_not_supported(tmp_path):

View File

@@ -279,13 +279,13 @@ class TestNamespaceConnection:
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Initially no namespaces
assert len(list(db.list_namespaces())) == 0
assert len(db.list_namespaces().namespaces) == 0
# Create a namespace
db.create_namespace(["test_namespace"])
# Verify namespace exists
namespaces = list(db.list_namespaces())
namespaces = db.list_namespaces().namespaces
assert "test_namespace" in namespaces
assert len(namespaces) == 1
@@ -322,7 +322,7 @@ class TestNamespaceConnection:
db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists
namespaces = list(db.list_namespaces())
namespaces = db.list_namespaces().namespaces
assert len(namespaces) == 0
def test_namespace_with_tables_cannot_be_dropped(self):
@@ -570,13 +570,13 @@ class TestAsyncNamespaceConnection:
# Initially no namespaces
namespaces = await db.list_namespaces()
assert len(list(namespaces)) == 0
assert len(namespaces.namespaces) == 0
# Create a namespace
await db.create_namespace(["test_namespace"])
# Verify namespace exists
namespaces = list(await db.list_namespaces())
namespaces = (await db.list_namespaces()).namespaces
assert "test_namespace" in namespaces
assert len(namespaces) == 1
@@ -608,7 +608,7 @@ class TestAsyncNamespaceConnection:
await db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists
namespaces = list(await db.list_namespaces())
namespaces = (await db.list_namespaces()).namespaces
assert len(namespaces) == 0
async def test_drop_all_tables_async(self):

View File

@@ -10,8 +10,9 @@ use lancedb::{
};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pyfunction, pymethods, Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult,
Python,
pyclass, pyfunction, pymethods,
types::{PyDict, PyDictMethods},
Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
@@ -292,40 +293,155 @@ impl Connection {
limit: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
use lancedb::database::ListNamespacesRequest;
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::ListNamespacesRequest;
let request = ListNamespacesRequest {
namespace,
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
page_token,
limit,
limit: limit.map(|l| l as i32),
};
inner.list_namespaces(request).await.infer_error()
let response = inner.list_namespaces(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("namespaces", response.namespaces)?;
dict.set_item("page_token", response.page_token)?;
Ok(dict.unbind())
})
})
}
#[pyo3(signature = (namespace,))]
#[pyo3(signature = (namespace, mode=None, properties=None))]
pub fn create_namespace(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
mode: Option<String>,
properties: Option<std::collections::HashMap<String, String>>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
use lancedb::database::CreateNamespaceRequest;
let request = CreateNamespaceRequest { namespace };
inner.create_namespace(request).await.infer_error()
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::{create_namespace_request, CreateNamespaceRequest};
let mode_enum = mode.and_then(|m| match m.to_lowercase().as_str() {
"create" => Some(create_namespace_request::Mode::Create),
"exist_ok" => Some(create_namespace_request::Mode::ExistOk),
"overwrite" => Some(create_namespace_request::Mode::Overwrite),
_ => None,
});
let request = CreateNamespaceRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
mode: mode_enum,
properties,
};
let response = inner.create_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("properties", response.properties)?;
Ok(dict.unbind())
})
})
}
#[pyo3(signature = (namespace, mode=None, behavior=None))]
pub fn drop_namespace(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
mode: Option<String>,
behavior: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::{drop_namespace_request, DropNamespaceRequest};
let mode_enum = mode.and_then(|m| match m.to_uppercase().as_str() {
"SKIP" => Some(drop_namespace_request::Mode::Skip),
"FAIL" => Some(drop_namespace_request::Mode::Fail),
_ => None,
});
let behavior_enum = behavior.and_then(|b| match b.to_uppercase().as_str() {
"RESTRICT" => Some(drop_namespace_request::Behavior::Restrict),
"CASCADE" => Some(drop_namespace_request::Behavior::Cascade),
_ => None,
});
let request = DropNamespaceRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
mode: mode_enum,
behavior: behavior_enum,
};
let response = inner.drop_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("properties", response.properties)?;
dict.set_item("transaction_id", response.transaction_id)?;
Ok(dict.unbind())
})
})
}
#[pyo3(signature = (namespace,))]
pub fn drop_namespace(
pub fn describe_namespace(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
use lancedb::database::DropNamespaceRequest;
let request = DropNamespaceRequest { namespace };
inner.drop_namespace(request).await.infer_error()
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::DescribeNamespaceRequest;
let request = DescribeNamespaceRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
};
let response = inner.describe_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("properties", response.properties)?;
Ok(dict.unbind())
})
})
}
#[pyo3(signature = (namespace=vec![], page_token=None, limit=None))]
pub fn list_tables(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
page_token: Option<String>,
limit: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::ListTablesRequest;
let request = ListTablesRequest {
id: if namespace.is_empty() {
None
} else {
Some(namespace)
},
page_token,
limit: limit.map(|l| l as i32),
};
let response = inner.list_tables(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
let dict = PyDict::new(py);
dict.set_item("tables", response.tables)?;
dict.set_item("page_token", response.page_token)?;
Ok(dict.unbind())
})
})
}
}

View File

@@ -9,6 +9,11 @@ use std::sync::Arc;
use arrow_array::RecordBatchReader;
use arrow_schema::{Field, SchemaRef};
use lance::dataset::ReadParams;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
#[cfg(feature = "aws")]
use object_store::aws::AwsCredential;
@@ -17,9 +22,8 @@ use crate::database::listing::{
ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::database::{
CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode,
CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest,
OpenTableRequest, ReadConsistency, TableNamesRequest,
CloneTableRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
DatabaseOptions, OpenTableRequest, ReadConsistency, TableNamesRequest,
};
use crate::embeddings::{
EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings,
@@ -74,6 +78,7 @@ impl TableNamesBuilder {
}
/// Execute the table names operation
#[allow(deprecated)]
pub async fn execute(self) -> Result<Vec<String>> {
self.parent.clone().table_names(self.request).await
}
@@ -408,6 +413,7 @@ impl OpenTableBuilder {
index_cache_size: None,
lance_read_params: None,
location: None,
namespace_client: None,
},
embedding_registry,
}
@@ -767,20 +773,42 @@ impl Connection {
}
/// List immediate child namespace names in the given namespace
pub async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> {
pub async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
self.internal.list_namespaces(request).await
}
/// Create a new namespace
pub async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> {
pub async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
self.internal.create_namespace(request).await
}
/// Drop a namespace
pub async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> {
pub async fn drop_namespace(
&self,
request: DropNamespaceRequest,
) -> Result<DropNamespaceResponse> {
self.internal.drop_namespace(request).await
}
/// Describe a namespace.
///
/// Forwards `request` to the underlying database implementation and returns
/// its [`DescribeNamespaceResponse`] unchanged.
pub async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
self.internal.describe_namespace(request).await
}
/// List tables with pagination support.
///
/// Forwards `request` to the underlying database implementation and returns
/// its [`ListTablesResponse`] unchanged.
pub async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
self.internal.list_tables(request).await
}
/// Get the in-memory embedding registry.
/// It's important to note that the embedding registry is not persisted across connections.
/// So if a table contains embeddings, you will need to make sure that you are using a connection that has the same embedding functions registered
@@ -1086,6 +1114,7 @@ pub struct ConnectNamespaceBuilder {
read_consistency_interval: Option<std::time::Duration>,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
session: Option<Arc<lance::session::Session>>,
server_side_query_enabled: bool,
}
impl ConnectNamespaceBuilder {
@@ -1097,6 +1126,7 @@ impl ConnectNamespaceBuilder {
read_consistency_interval: None,
embedding_registry: None,
session: None,
server_side_query_enabled: false,
}
}
@@ -1151,6 +1181,18 @@ impl ConnectNamespaceBuilder {
self
}
/// Enable server-side query execution.
///
/// When enabled, queries will be executed on the namespace server instead of
/// locally. This can improve performance by reducing data transfer and
/// leveraging server-side compute resources.
///
/// Default is `false` (queries executed locally).
///
/// Returns the builder so that further options can be chained.
pub fn server_side_query(mut self, enabled: bool) -> Self {
// Only records the flag; it is handed to the database when `execute` runs.
self.server_side_query_enabled = enabled;
self
}
/// Execute the connection
pub async fn execute(self) -> Result<Connection> {
use crate::database::namespace::LanceNamespaceDatabase;
@@ -1162,6 +1204,7 @@ impl ConnectNamespaceBuilder {
self.storage_options,
self.read_consistency_interval,
self.session,
self.server_side_query_enabled,
)
.await?,
);

View File

@@ -24,6 +24,12 @@ use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
use lance::dataset::ReadParams;
use lance_datafusion::utils::StreamingWriteSource;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use lance_namespace::LanceNamespace;
use crate::arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt};
use crate::error::Result;
@@ -36,32 +42,7 @@ pub trait DatabaseOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
}
/// A request to list namespaces in the database
#[derive(Clone, Debug, Default)]
pub struct ListNamespacesRequest {
/// The parent namespace to list namespaces in. Empty list represents root namespace.
pub namespace: Vec<String>,
/// If present, only return names that come lexicographically after the supplied value.
pub page_token: Option<String>,
/// The maximum number of namespace names to return
pub limit: Option<u32>,
}
/// A request to create a namespace
#[derive(Clone, Debug)]
pub struct CreateNamespaceRequest {
/// The namespace identifier to create
pub namespace: Vec<String>,
}
/// A request to drop a namespace
#[derive(Clone, Debug)]
pub struct DropNamespaceRequest {
/// The namespace identifier to drop
pub namespace: Vec<String>,
}
/// A request to list names of tables in the database
/// A request to list names of tables in the database (deprecated, use ListTablesRequest)
#[derive(Clone, Debug, Default)]
pub struct TableNamesRequest {
/// The namespace to list tables in. Empty list represents root namespace.
@@ -77,7 +58,7 @@ pub struct TableNamesRequest {
}
/// A request to open a table
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct OpenTableRequest {
pub name: String,
/// The namespace to open the table from. Empty list represents root namespace.
@@ -87,6 +68,22 @@ pub struct OpenTableRequest {
/// Optional custom location for the table. If not provided, the database will
/// derive a location based on its URI and the table name.
pub location: Option<String>,
/// Optional namespace client for server-side query execution.
/// When set, queries will be executed on the namespace server instead of locally.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
}
/// Hand-written `Debug` implementation for [`OpenTableRequest`].
///
/// Prints the struct name followed by the fields listed below, each by name.
impl std::fmt::Debug for OpenTableRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenTableRequest")
.field("name", &self.name)
.field("namespace", &self.namespace)
.field("index_cache_size", &self.index_cache_size)
.field("lance_read_params", &self.lance_read_params)
.field("location", &self.location)
.field("namespace_client", &self.namespace_client)
.finish()
}
}
pub type TableBuilderCallback = Box<dyn FnOnce(OpenTableRequest) -> OpenTableRequest + Send>;
@@ -170,6 +167,9 @@ pub struct CreateTableRequest {
/// Optional custom location for the table. If not provided, the database will
/// derive a location based on its URI and the table name.
pub location: Option<String>,
/// Optional namespace client for server-side query execution.
/// When set, queries will be executed on the namespace server instead of locally.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
}
impl CreateTableRequest {
@@ -181,6 +181,7 @@ impl CreateTableRequest {
mode: CreateTableMode::default(),
write_options: WriteOptions::default(),
location: None,
namespace_client: None,
}
}
}
@@ -247,13 +248,30 @@ pub trait Database:
/// Get the read consistency of the database
async fn read_consistency(&self) -> Result<ReadConsistency>;
/// List immediate child namespace names in the given namespace
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>>;
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse>;
/// Create a new namespace
async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()>;
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse>;
/// Drop a namespace
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()>;
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse>;
/// Describe a namespace (get its properties)
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse>;
/// List the names of tables in the database
///
/// # Deprecated
/// Use `list_tables` instead for pagination support
#[deprecated(note = "Use list_tables instead")]
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>>;
/// List tables in the database with pagination support
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse>;
/// Create a table in the database
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>>;
/// Clone a table in the database.

View File

@@ -24,10 +24,15 @@ use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::NativeTable;
use crate::utils::validate_table_name;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use super::{
BaseTable, CloneTableRequest, CreateNamespaceRequest, CreateTableMode, CreateTableRequest,
Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
TableNamesRequest,
BaseTable, CloneTableRequest, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, TableNamesRequest,
};
/// File extension to indicate a lance table
@@ -641,6 +646,7 @@ impl ListingDatabase {
index_cache_size: None,
lance_read_params: None,
location: None,
namespace_client: None,
};
let req = (callback)(req);
let table = self.open_table(req).await?;
@@ -662,14 +668,20 @@ impl ListingDatabase {
#[async_trait::async_trait]
impl Database for ListingDatabase {
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> {
if !request.namespace.is_empty() {
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
});
}
Ok(Vec::new())
Ok(ListNamespacesResponse {
namespaces: Vec::new(),
page_token: None,
})
}
fn uri(&self) -> &str {
@@ -688,13 +700,28 @@ impl Database for ListingDatabase {
}
}
async fn create_namespace(&self, _request: CreateNamespaceRequest) -> Result<()> {
async fn create_namespace(
&self,
_request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
}
async fn drop_namespace(&self, _request: DropNamespaceRequest) -> Result<()> {
async fn drop_namespace(
&self,
_request: DropNamespaceRequest,
) -> Result<DropNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
}
async fn describe_namespace(
&self,
_request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
@@ -735,6 +762,57 @@ impl Database for ListingDatabase {
Ok(f)
}
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
}
let mut f = self
.object_store
.read_dir(self.base_path.clone())
.await?
.iter()
.map(Path::new)
.filter(|path| {
let is_lance = path
.extension()
.and_then(|e| e.to_str())
.map(|e| e == LANCE_EXTENSION);
is_lance.unwrap_or(false)
})
.filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from)))
.collect::<Vec<String>>();
f.sort();
// Handle pagination with page_token
if let Some(ref page_token) = request.page_token {
let index = f
.iter()
.position(|name| name.as_str() > page_token.as_str())
.unwrap_or(f.len());
f.drain(0..index);
}
// Determine if there's a next page
let next_page_token = if let Some(limit) = request.limit {
if f.len() > limit as usize {
let token = f[limit as usize].clone();
f.truncate(limit as usize);
Some(token)
} else {
None
}
} else {
None
};
Ok(ListTablesResponse {
tables: f,
page_token: next_page_token,
})
}
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided
if !request.namespace.is_empty() && request.location.is_none() {
@@ -768,6 +846,7 @@ impl Database for ListingDatabase {
self.store_wrapper.clone(),
Some(write_params),
self.read_consistency_interval,
request.namespace_client,
)
.await
{
@@ -839,6 +918,7 @@ impl Database for ListingDatabase {
self.store_wrapper.clone(),
None,
self.read_consistency_interval,
None,
)
.await?;
@@ -910,6 +990,7 @@ impl Database for ListingDatabase {
self.store_wrapper.clone(),
Some(read_params),
self.read_consistency_interval,
request.namespace_client,
)
.await?,
);
@@ -947,6 +1028,7 @@ impl Database for ListingDatabase {
self.drop_tables(vec![name.to_string()]).await
}
#[allow(deprecated)]
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
// Check if namespace parameter is provided
if !namespace.is_empty() {
@@ -1011,6 +1093,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1032,6 +1115,7 @@ mod tests {
.unwrap();
// Verify both tables exist
#[allow(deprecated)]
let table_names = db.table_names(TableNamesRequest::default()).await.unwrap();
assert!(table_names.contains(&"source_table".to_string()));
assert!(table_names.contains(&"cloned_table".to_string()));
@@ -1075,6 +1159,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1133,6 +1218,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1168,6 +1254,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1207,6 +1294,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1246,6 +1334,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1300,6 +1389,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1357,6 +1447,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1442,6 +1533,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1528,6 +1620,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1621,6 +1714,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1718,6 +1812,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1771,6 +1866,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options,
location: None,
namespace_client: None,
})
.await
.unwrap();
@@ -1844,6 +1940,7 @@ mod tests {
mode: CreateTableMode::Create,
write_options,
location: None,
namespace_client: None,
})
.await
.unwrap();

View File

@@ -10,8 +10,10 @@ use async_trait::async_trait;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider};
use lance_namespace::{
models::{
CreateEmptyTableRequest, CreateNamespaceRequest, DescribeTableRequest,
DropNamespaceRequest, DropTableRequest, ListNamespacesRequest, ListTablesRequest,
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
},
LanceNamespace,
};
@@ -22,11 +24,8 @@ use crate::database::ReadConsistency;
use crate::error::{Error, Result};
use super::{
listing::ListingDatabase, BaseTable, CloneTableRequest,
CreateNamespaceRequest as DbCreateNamespaceRequest, CreateTableMode,
CreateTableRequest as DbCreateTableRequest, Database,
DropNamespaceRequest as DbDropNamespaceRequest,
ListNamespacesRequest as DbListNamespacesRequest, OpenTableRequest, TableNamesRequest,
listing::ListingDatabase, BaseTable, CloneTableRequest, CreateTableMode,
CreateTableRequest as DbCreateTableRequest, Database, OpenTableRequest, TableNamesRequest,
};
/// A database implementation that uses lance-namespace for table management
@@ -40,6 +39,8 @@ pub struct LanceNamespaceDatabase {
session: Option<Arc<lance::session::Session>>,
// database URI
uri: String,
// Whether to enable server-side query execution
server_side_query_enabled: bool,
}
impl LanceNamespaceDatabase {
@@ -49,6 +50,7 @@ impl LanceNamespaceDatabase {
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
server_side_query_enabled: bool,
) -> Result<Self> {
let mut builder = ConnectBuilder::new(ns_impl);
for (key, value) in ns_properties.clone() {
@@ -67,6 +69,7 @@ impl LanceNamespaceDatabase {
read_consistency_interval,
session,
uri: format!("namespace://{}", ns_impl),
server_side_query_enabled,
})
}
}
@@ -76,6 +79,7 @@ impl std::fmt::Debug for LanceNamespaceDatabase {
f.debug_struct("LanceNamespaceDatabase")
.field("storage_options", &self.storage_options)
.field("read_consistency_interval", &self.read_consistency_interval)
.field("server_side_query_enabled", &self.server_side_query_enabled)
.finish()
}
}
@@ -149,92 +153,47 @@ impl Database for LanceNamespaceDatabase {
}
}
async fn list_namespaces(&self, request: DbListNamespacesRequest) -> Result<Vec<String>> {
let ns_request = ListNamespacesRequest {
id: if request.namespace.is_empty() {
None
} else {
Some(request.namespace)
},
page_token: request.page_token,
limit: request.limit.map(|l| l as i32),
};
let response = self
.namespace
.list_namespaces(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to list namespaces: {}", e),
})?;
Ok(response.namespaces)
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
Ok(self.namespace.list_namespaces(request).await?)
}
async fn create_namespace(&self, request: DbCreateNamespaceRequest) -> Result<()> {
let ns_request = CreateNamespaceRequest {
id: if request.namespace.is_empty() {
None
} else {
Some(request.namespace)
},
mode: None,
properties: None,
};
self.namespace
.create_namespace(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to create namespace: {}", e),
})?;
Ok(())
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
Ok(self.namespace.create_namespace(request).await?)
}
async fn drop_namespace(&self, request: DbDropNamespaceRequest) -> Result<()> {
let ns_request = DropNamespaceRequest {
id: if request.namespace.is_empty() {
None
} else {
Some(request.namespace)
},
mode: None,
behavior: None,
};
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
Ok(self.namespace.drop_namespace(request).await?)
}
self.namespace
.drop_namespace(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to drop namespace: {}", e),
})?;
Ok(())
/// Describe a namespace by delegating to the wrapped namespace client.
///
/// The request is passed through unchanged; `?` propagates a failure from the
/// client, converting it into this function's error type.
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
Ok(self.namespace.describe_namespace(request).await?)
}
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
let ns_request = ListTablesRequest {
id: if request.namespace.is_empty() {
None
} else {
Some(request.namespace)
},
id: Some(request.namespace),
page_token: request.start_after,
limit: request.limit.map(|l| l as i32),
};
let response =
self.namespace
.list_tables(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to list tables: {}", e),
})?;
let response = self.namespace.list_tables(ns_request).await?;
Ok(response.tables)
}
/// Lists tables by handing `request` unchanged to the wrapped namespace client.
///
/// Any error from the client is converted into this crate's error type by `?`.
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
    let response = self.namespace.list_tables(request).await?;
    Ok(response)
}
async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
// Extract user-provided storage options from request
let user_storage_options = request
@@ -290,6 +249,10 @@ impl Database for LanceNamespaceDatabase {
)
.await?;
let namespace_client = self
.server_side_query_enabled
.then(|| self.namespace.clone());
return listing_db
.open_table(OpenTableRequest {
name: request.name.clone(),
@@ -297,6 +260,7 @@ impl Database for LanceNamespaceDatabase {
index_cache_size: None,
lance_read_params: None,
location: Some(location),
namespace_client,
})
.await;
}
@@ -333,12 +297,16 @@ impl Database for LanceNamespaceDatabase {
let listing_db = self
.create_listing_database(
&location,
table_id,
table_id.clone(),
user_storage_options,
create_empty_response.storage_options.as_ref(),
)
.await?;
let namespace_client = self
.server_side_query_enabled
.then(|| self.namespace.clone());
let create_request = DbCreateTableRequest {
name: request.name,
namespace: request.namespace,
@@ -346,7 +314,9 @@ impl Database for LanceNamespaceDatabase {
mode: request.mode,
write_options: request.write_options,
location: Some(location),
namespace_client,
};
listing_db.create_table(create_request).await
}
@@ -380,19 +350,25 @@ impl Database for LanceNamespaceDatabase {
let listing_db = self
.create_listing_database(
&location,
table_id,
table_id.clone(),
user_storage_options,
response.storage_options.as_ref(),
)
.await?;
let namespace_client = self
.server_side_query_enabled
.then(|| self.namespace.clone());
let open_request = OpenTableRequest {
name: request.name.clone(),
namespace: request.namespace.clone(),
index_cache_size: request.index_cache_size,
lance_read_params: request.lance_read_params,
location: Some(location),
namespace_client,
};
listing_db.open_table(open_request).await
}
@@ -429,6 +405,7 @@ impl Database for LanceNamespaceDatabase {
Ok(())
}
#[allow(deprecated)]
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
let tables = self
.table_names(TableNamesRequest {
@@ -455,7 +432,6 @@ impl Database for LanceNamespaceDatabase {
mod tests {
use super::*;
use crate::connect_namespace;
use crate::database::CreateNamespaceRequest;
use crate::query::ExecutableQuery;
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field, Schema};
@@ -568,7 +544,9 @@ mod tests {
// Create a child namespace first
conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()],
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
})
.await
.expect("Failed to create namespace");
@@ -627,7 +605,9 @@ mod tests {
// Create a child namespace first
conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()],
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
})
.await
.expect("Failed to create namespace");
@@ -689,7 +669,9 @@ mod tests {
// Create a child namespace first
conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()],
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
})
.await
.expect("Failed to create namespace");
@@ -771,7 +753,9 @@ mod tests {
// Create a child namespace first
conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()],
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
})
.await
.expect("Failed to create namespace");
@@ -825,7 +809,9 @@ mod tests {
// Create a child namespace first
conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()],
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
})
.await
.expect("Failed to create namespace");
@@ -904,7 +890,9 @@ mod tests {
// Create a child namespace first
conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()],
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
})
.await
.expect("Failed to create namespace");
@@ -936,7 +924,9 @@ mod tests {
// Create a child namespace first
conn.create_namespace(CreateNamespaceRequest {
namespace: vec!["test_ns".into()],
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
})
.await
.expect("Failed to create namespace");
@@ -977,4 +967,46 @@ mod tests {
let open_result = conn.open_table("drop_test").execute().await;
assert!(open_result.is_err());
}
#[tokio::test]
async fn test_table_names_at_root() {
    // Regression test: listing table names with an empty (root) namespace must work.
    // A previous bug converted the empty namespace to `None`.
    let tmp_dir = tempdir().unwrap();
    let properties = HashMap::from([(
        "root".to_string(),
        tmp_dir.path().to_str().unwrap().to_string(),
    )]);
    let conn = connect_namespace("dir", properties)
        .execute()
        .await
        .expect("Failed to connect to namespace");

    // Create two tables directly under the root namespace; the handles are kept
    // alive until the end of the test.
    let mut _tables = Vec::new();
    for (name, failure) in [
        ("table1", "Failed to create table1 at root"),
        ("table2", "Failed to create table2 at root"),
    ] {
        let table = conn
            .create_table(name, create_test_data())
            .execute()
            .await
            .expect(failure);
        _tables.push(table);
    }

    // No namespace is set on the builder, which means "root".
    let table_names = conn
        .table_names()
        .execute()
        .await
        .expect("Failed to list tables at root");

    for expected in ["table1", "table2"] {
        assert!(table_names.contains(&expected.to_string()));
    }
    assert_eq!(table_names.len(), 2);
}
}

View File

@@ -10,13 +10,17 @@ use http::StatusCode;
use lance_io::object_store::StorageOptions;
use moka::future::Cache;
use reqwest::header::CONTENT_TYPE;
use serde::Deserialize;
use tokio::task::spawn_blocking;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use crate::database::{
CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode,
CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest,
OpenTableRequest, ReadConsistency, TableNamesRequest,
CloneTableRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
DatabaseOptions, OpenTableRequest, ReadConsistency, TableNamesRequest,
};
use crate::error::Result;
use crate::table::BaseTable;
@@ -180,11 +184,6 @@ impl RemoteDatabaseOptionsBuilder {
}
}
#[derive(Deserialize)]
struct ListTablesResponse {
tables: Vec<String>,
}
#[derive(Debug)]
pub struct RemoteDatabase<S: HttpSend = Sender> {
client: RestfulLanceDbClient<S>,
@@ -337,7 +336,6 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
self.client
.get(&format!("/v1/namespace/{}/table/list", namespace_id))
} else {
// TODO: use new API for all listing operations once stable
self.client.get("/v1/table/")
};
@@ -371,6 +369,44 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
Ok(tables)
}
/// Lists the tables of a namespace through `GET /v1/namespace/{id}/table/list`.
///
/// `request.id` selects the namespace; a missing id is treated as an empty path.
/// `limit` and `page_token` are forwarded as query parameters when present.
/// Every table named in the response is also inserted into `table_cache`.
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
    let namespace_parts = request.id.as_deref().unwrap_or(&[]);
    let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
    let mut req = self
        .client
        .get(&format!("/v1/namespace/{}/table/list", namespace_id));

    if let Some(limit) = request.limit {
        req = req.query(&[("limit", limit)]);
    }
    if let Some(ref page_token) = request.page_token {
        req = req.query(&[("page_token", page_token)]);
    }

    let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
    let rsp = self.client.check_response(&request_id, rsp).await?;
    // The server version must be read before `json()` consumes the response.
    let version = parse_server_version(&request_id, &rsp)?;
    let response: ListTablesResponse = rsp.json().await.err_to_http(request_id)?;

    // Cache the tables for future use
    let namespace_vec = namespace_parts.to_vec();
    for table in &response.tables {
        let table_identifier =
            build_table_identifier(table, &namespace_vec, &self.client.id_delimiter);
        let cache_key = build_cache_key(table, &namespace_vec);
        let remote_table = Arc::new(RemoteTable::new(
            self.client.clone(),
            table.clone(),
            namespace_vec.clone(),
            // Moved, not cloned: the identifier is not used again in this iteration.
            table_identifier,
            version.clone(),
        ));
        self.table_cache.insert(cache_key, remote_table).await;
    }

    Ok(response)
}
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let data = match request.data {
CreateTableData::Data(data) => data,
@@ -417,6 +453,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
index_cache_size: None,
lance_read_params: None,
location: None,
namespace_client: None,
};
let req = (callback)(req);
self.open_table(req).await
@@ -590,53 +627,101 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
})
}
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> {
let namespace_id =
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter);
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.get(&format!("/v1/namespace/{}/list", namespace_id));
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
if let Some(page_token) = request.page_token {
if let Some(ref page_token) = request.page_token {
req = req.query(&[("page_token", page_token)]);
}
let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?;
#[derive(Deserialize)]
struct ListNamespacesResponse {
namespaces: Vec<String>,
}
let parsed: ListNamespacesResponse = resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse namespace response: {}", e),
})?;
Ok(parsed.namespaces)
resp.json().await.err_to_http(request_id)
}
async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> {
let namespace_id =
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter);
let req = self
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.post(&format!("/v1/namespace/{}/create", namespace_id));
// Build request body with mode and properties if present
#[derive(serde::Serialize)]
struct CreateNamespaceRequestBody {
#[serde(skip_serializing_if = "Option::is_none")]
mode: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
properties: Option<HashMap<String, String>>,
}
let body = CreateNamespaceRequestBody {
mode: request.mode.as_ref().map(|m| format!("{:?}", m)),
properties: request.properties,
};
req = req.json(&body);
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
Ok(())
let resp = self.client.check_response(&request_id, resp).await?;
resp.json().await.err_to_http(request_id)
}
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> {
let namespace_id =
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter);
let req = self
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.post(&format!("/v1/namespace/{}/drop", namespace_id));
// Build request body with mode and behavior if present
#[derive(serde::Serialize)]
struct DropNamespaceRequestBody {
#[serde(skip_serializing_if = "Option::is_none")]
mode: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
behavior: Option<String>,
}
let body = DropNamespaceRequestBody {
mode: request.mode.as_ref().map(|m| format!("{:?}", m)),
behavior: request.behavior.as_ref().map(|b| format!("{:?}", b)),
};
req = req.json(&body);
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
Ok(())
let resp = self.client.check_response(&request_id, resp).await?;
resp.json().await.err_to_http(request_id)
}
/// Describes a namespace through `GET /v1/namespace/{id}/describe`.
///
/// `request.id` selects the namespace; a missing id is treated as an empty path.
/// The response body is deserialized from JSON into the returned model.
async fn describe_namespace(
    &self,
    request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
    let namespace_id = build_namespace_identifier(
        request.id.as_deref().unwrap_or(&[]),
        &self.client.id_delimiter,
    );
    let url = format!("/v1/namespace/{}/describe", namespace_id);

    let (request_id, raw) = self.client.send(self.client.get(&url)).await?;
    let checked = self.client.check_response(&request_id, raw).await?;
    checked.json().await.err_to_http(request_id)
}
fn as_any(&self) -> &dyn std::any::Any {

View File

@@ -40,6 +40,11 @@ use lance_index::vector::pq::PQBuildParams;
use lance_index::vector::sq::builder::SQBuildParams;
use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_namespace::models::{
QueryTableRequest as NsQueryTableRequest, QueryTableRequestFullTextQuery,
QueryTableRequestVector, StringFtsQuery,
};
use lance_namespace::LanceNamespace;
use lance_table::format::Manifest;
use lance_table::io::commit::ManifestNamingScheme;
use log::info;
@@ -1480,7 +1485,7 @@ impl NativeTableExt for Arc<dyn BaseTable> {
}
/// A table in a LanceDB database.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct NativeTable {
name: String,
namespace: Vec<String>,
@@ -1490,6 +1495,22 @@ pub struct NativeTable {
// This comes from the connection options. We store here so we can pass down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
// Optional namespace client for server-side query execution.
// When set, queries will be executed on the namespace server instead of locally.
namespace_client: Option<Arc<dyn LanceNamespace>>,
}
/// Manual `Debug` output listing the table's name, namespace, id, uri,
/// read-consistency interval and namespace client.
impl std::fmt::Debug for NativeTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fields not named here are left out of the debug output.
        let Self {
            name,
            namespace,
            id,
            uri,
            read_consistency_interval,
            namespace_client,
            ..
        } = self;

        let mut out = f.debug_struct("NativeTable");
        out.field("name", name);
        out.field("namespace", namespace);
        out.field("id", id);
        out.field("uri", uri);
        out.field("read_consistency_interval", read_consistency_interval);
        out.field("namespace_client", namespace_client);
        out.finish()
    }
}
impl std::fmt::Display for NativeTable {
@@ -1524,7 +1545,7 @@ impl NativeTable {
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, vec![], None, None, None).await
Self::open_with_params(uri, &name, vec![], None, None, None, None).await
}
/// Opens an existing Table
@@ -1534,10 +1555,12 @@ impl NativeTable {
/// * `base_path` - The base path where the table is located
/// * `name` The Table name
/// * `params` The [ReadParams] to use when opening the table
/// * `namespace_client` - Optional namespace client for server-side query execution
///
/// # Returns
///
/// * A [NativeTable] object.
#[allow(clippy::too_many_arguments)]
pub async fn open_with_params(
uri: &str,
name: &str,
@@ -1545,6 +1568,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
@@ -1575,9 +1599,18 @@ impl NativeTable {
uri: uri.to_string(),
dataset,
read_consistency_interval,
namespace_client,
})
}
/// Set the namespace client for server-side query execution.
///
/// When set, queries will be executed on the namespace server instead of locally.
pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
self.namespace_client = Some(namespace_client);
self
}
fn get_table_name(uri: &str) -> Result<String> {
let path = Path::new(uri);
let name = path
@@ -1614,10 +1647,12 @@ impl NativeTable {
/// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
/// * `batches` RecordBatch to be saved in the database.
/// * `params` - Write parameters.
/// * `namespace_client` - Optional namespace client for server-side query execution
///
/// # Returns
///
/// * A [TableImpl] object.
#[allow(clippy::too_many_arguments)]
pub async fn create(
uri: &str,
name: &str,
@@ -1626,6 +1661,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
@@ -1657,9 +1693,11 @@ impl NativeTable {
uri: uri.to_string(),
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn create_empty(
uri: &str,
name: &str,
@@ -1668,6 +1706,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
) -> Result<Self> {
let batches = RecordBatchIterator::new(vec![], schema);
Self::create(
@@ -1678,6 +1717,7 @@ impl NativeTable {
write_store_wrapper,
params,
read_consistency_interval,
namespace_client,
)
.await
}
@@ -2035,6 +2075,278 @@ impl NativeTable {
Ok(DatasetRecordBatchStream::new(inner))
}
/// Execute a query on the namespace server instead of locally.
async fn namespace_query(
&self,
namespace_client: Arc<dyn LanceNamespace>,
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// Build table_id from namespace + table name
let mut table_id = self.namespace.clone();
table_id.push(self.name.clone());
// Convert AnyQuery to namespace QueryTableRequest
let mut ns_request = self.convert_to_namespace_query(query)?;
// Set the table ID on the request
ns_request.id = Some(table_id);
// Call the namespace query_table API
let response_bytes = namespace_client
.query_table(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to execute server-side query: {}", e),
})?;
// Parse the Arrow IPC response into a RecordBatchStream
self.parse_arrow_ipc_response(response_bytes).await
}
/// Convert a QueryFilter to a SQL string for the namespace API.
fn filter_to_sql(&self, filter: &QueryFilter) -> Result<String> {
match filter {
QueryFilter::Sql(sql) => Ok(sql.clone()),
QueryFilter::Substrait(_) => Err(Error::NotSupported {
message: "Substrait filters are not supported for server-side queries".to_string(),
}),
QueryFilter::Datafusion(_) => Err(Error::NotSupported {
message: "Datafusion expression filters are not supported for server-side queries. Use SQL filter instead.".to_string(),
}),
}
}
/// Convert an AnyQuery to the namespace QueryTableRequest format.
fn convert_to_namespace_query(&self, query: &AnyQuery) -> Result<NsQueryTableRequest> {
match query {
AnyQuery::VectorQuery(vq) => {
// Extract the query vector(s)
let vector = self.extract_query_vector(&vq.query_vector)?;
// Convert filter to SQL string
let filter = match &vq.base.filter {
Some(f) => Some(self.filter_to_sql(f)?),
None => None,
};
// Convert select to columns list
let columns = match &vq.base.select {
Select::All => None,
Select::Columns(cols) => Some(cols.clone()),
Select::Dynamic(_) => {
return Err(Error::NotSupported {
message:
"Dynamic column selection is not supported for server-side queries"
.to_string(),
});
}
};
// Check for unsupported features
if vq.base.reranker.is_some() {
return Err(Error::NotSupported {
message: "Reranker is not supported for server-side queries".to_string(),
});
}
// Convert FTS query if present
let full_text_query = vq.base.full_text_search.as_ref().map(|fts| {
let columns = fts.columns();
let columns_vec = if columns.is_empty() {
None
} else {
Some(columns.into_iter().collect())
};
Box::new(QueryTableRequestFullTextQuery {
string_query: Some(Box::new(StringFtsQuery {
query: fts.query.to_string(),
columns: columns_vec,
})),
structured_query: None,
})
});
Ok(NsQueryTableRequest {
id: None, // Will be set in namespace_query
k: vq.base.limit.unwrap_or(10) as i32,
vector: Box::new(vector),
vector_column: vq.column.clone(),
filter,
columns,
offset: vq.base.offset.map(|o| o as i32),
distance_type: vq.distance_type.map(|dt| dt.to_string()),
nprobes: Some(vq.minimum_nprobes as i32),
ef: vq.ef.map(|e| e as i32),
refine_factor: vq.refine_factor.map(|r| r as i32),
lower_bound: vq.lower_bound,
upper_bound: vq.upper_bound,
prefilter: Some(vq.base.prefilter),
fast_search: Some(vq.base.fast_search),
with_row_id: Some(vq.base.with_row_id),
bypass_vector_index: Some(!vq.use_index),
full_text_query,
version: None,
})
}
AnyQuery::Query(q) => {
// For non-vector queries, pass an empty vector (similar to remote table implementation)
if q.reranker.is_some() {
return Err(Error::NotSupported {
message: "Reranker is not supported for server-side query execution"
.to_string(),
});
}
let filter = q
.filter
.as_ref()
.map(|f| self.filter_to_sql(f))
.transpose()?;
let columns = match &q.select {
Select::All => None,
Select::Columns(cols) => Some(cols.clone()),
Select::Dynamic(_) => {
return Err(Error::NotSupported {
message: "Dynamic columns are not supported for server-side query"
.to_string(),
});
}
};
// Handle full text search if present
let full_text_query = q.full_text_search.as_ref().map(|fts| {
let columns_vec = if fts.columns().is_empty() {
None
} else {
Some(fts.columns().iter().cloned().collect())
};
Box::new(QueryTableRequestFullTextQuery {
string_query: Some(Box::new(StringFtsQuery {
query: fts.query.to_string(),
columns: columns_vec,
})),
structured_query: None,
})
});
// Empty vector for non-vector queries
let vector = Box::new(QueryTableRequestVector {
single_vector: Some(vec![]),
multi_vector: None,
});
Ok(NsQueryTableRequest {
id: None, // Will be set by caller
vector,
k: q.limit.unwrap_or(10) as i32,
filter,
columns,
prefilter: Some(q.prefilter),
offset: q.offset.map(|o| o as i32),
ef: None,
refine_factor: None,
distance_type: None,
nprobes: None,
vector_column: None, // No vector column for plain queries
with_row_id: Some(q.with_row_id),
bypass_vector_index: Some(true), // No vector index for plain queries
full_text_query,
version: None,
fast_search: None,
lower_bound: None,
upper_bound: None,
})
}
}
}
/// Extract query vector(s) from Arrow arrays into the namespace format.
fn extract_query_vector(
&self,
query_vectors: &[Arc<dyn arrow_array::Array>],
) -> Result<QueryTableRequestVector> {
if query_vectors.is_empty() {
return Err(Error::InvalidInput {
message: "Query vector is required for vector search".to_string(),
});
}
// Handle single vector case
if query_vectors.len() == 1 {
let arr = &query_vectors[0];
let single_vector = self.array_to_f32_vec(arr)?;
Ok(QueryTableRequestVector {
single_vector: Some(single_vector),
multi_vector: None,
})
} else {
// Handle multi-vector case
let multi_vector: Result<Vec<Vec<f32>>> = query_vectors
.iter()
.map(|arr| self.array_to_f32_vec(arr))
.collect();
Ok(QueryTableRequestVector {
single_vector: None,
multi_vector: Some(multi_vector?),
})
}
}
/// Convert an Arrow array to a Vec<f32>.
fn array_to_f32_vec(&self, arr: &Arc<dyn arrow_array::Array>) -> Result<Vec<f32>> {
// Handle FixedSizeList (common for vectors)
if let Some(fsl) = arr
.as_any()
.downcast_ref::<arrow_array::FixedSizeListArray>()
{
let values = fsl.values();
if let Some(f32_arr) = values.as_any().downcast_ref::<arrow_array::Float32Array>() {
return Ok(f32_arr.values().to_vec());
}
}
// Handle direct Float32Array
if let Some(f32_arr) = arr.as_any().downcast_ref::<arrow_array::Float32Array>() {
return Ok(f32_arr.values().to_vec());
}
Err(Error::InvalidInput {
message: "Query vector must be Float32 type".to_string(),
})
}
/// Parse Arrow IPC response from the namespace server.
async fn parse_arrow_ipc_response(
&self,
bytes: bytes::Bytes,
) -> Result<DatasetRecordBatchStream> {
use arrow_ipc::reader::StreamReader;
use std::io::Cursor;
let cursor = Cursor::new(bytes);
let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime {
message: format!("Failed to parse Arrow IPC response: {}", e),
})?;
// Collect all record batches
let schema = reader.schema();
let batches: Vec<_> = reader
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| Error::Runtime {
message: format!("Failed to read Arrow IPC batches: {}", e),
})?;
// Create a stream from the batches
let stream = futures::stream::iter(batches.into_iter().map(Ok));
let record_batch_stream = Box::pin(
datafusion_physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream),
);
Ok(DatasetRecordBatchStream::new(record_batch_stream))
}
/// Check whether the table uses V2 manifest paths.
///
/// See [Self::migrate_manifest_paths_v2] and [ManifestNamingScheme] for
@@ -2466,6 +2778,12 @@ impl BaseTable for NativeTable {
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// If namespace client is configured, use server-side query execution
if let Some(ref namespace_client) = self.namespace_client {
return self
.namespace_query(namespace_client.clone(), query, options)
.await;
}
self.generic_query(query, options).await
}
@@ -2934,7 +3252,7 @@ mod tests {
let batches = make_test_batches();
let batches = Box::new(batches) as Box<dyn RecordBatchReader + Send>;
let table = NativeTable::create(uri, "test", vec![], batches, None, None, None)
let table = NativeTable::create(uri, "test", vec![], batches, None, None, None, None)
.await
.unwrap();
@@ -4574,4 +4892,91 @@ mod tests {
assert_eq!(result.len(), 1);
assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
}
#[tokio::test]
async fn test_convert_to_namespace_query_vector() {
    // Write a small dataset to disk and open it as a native table.
    let tmp_dir = tempdir().unwrap();
    let dataset_path = tmp_dir.path().join("test_ns_query.lance");
    let dataset_uri = dataset_path.to_str().unwrap();
    Dataset::write(make_test_batches(), dataset_uri, None)
        .await
        .unwrap();
    let table = NativeTable::open(dataset_uri).await.unwrap();

    // Vector query with limit, offset, SQL filter and an explicit column list.
    let query_vector: Arc<dyn Array> = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));
    let any_query = AnyQuery::VectorQuery(VectorQueryRequest {
        base: QueryRequest {
            limit: Some(10),
            offset: Some(5),
            filter: Some(QueryFilter::Sql("id > 0".to_string())),
            select: Select::Columns(vec!["id".to_string()]),
            ..Default::default()
        },
        column: Some("vector".to_string()),
        query_vector: vec![query_vector],
        minimum_nprobes: 20,
        distance_type: Some(crate::DistanceType::L2),
        ..Default::default()
    });

    let ns_request = table.convert_to_namespace_query(&any_query).unwrap();

    assert_eq!(ns_request.k, 10);
    assert_eq!(ns_request.offset, Some(5));
    assert_eq!(ns_request.filter.as_deref(), Some("id > 0"));
    assert_eq!(ns_request.columns, Some(vec!["id".to_string()]));
    assert_eq!(ns_request.vector_column.as_deref(), Some("vector"));
    assert_eq!(ns_request.distance_type.as_deref(), Some("l2"));
    // A single query array must come back as `single_vector` with the same values.
    assert_eq!(
        ns_request.vector.single_vector,
        Some(vec![1.0, 2.0, 3.0, 4.0])
    );
}
#[tokio::test]
async fn test_convert_to_namespace_query_plain_query() {
    // Write a small dataset to disk and open it as a native table.
    let tmp_dir = tempdir().unwrap();
    let dataset_path = tmp_dir.path().join("test_ns_plain.lance");
    let dataset_uri = dataset_path.to_str().unwrap();
    Dataset::write(make_test_batches(), dataset_uri, None)
        .await
        .unwrap();
    let table = NativeTable::open(dataset_uri).await.unwrap();

    // Plain (non-vector) query with filter, projection and row ids.
    let any_query = AnyQuery::Query(QueryRequest {
        limit: Some(20),
        offset: Some(5),
        filter: Some(QueryFilter::Sql("id > 5".to_string())),
        select: Select::Columns(vec!["id".to_string()]),
        with_row_id: true,
        ..Default::default()
    });

    let ns_request = table.convert_to_namespace_query(&any_query).unwrap();

    assert_eq!(ns_request.k, 20);
    assert_eq!(ns_request.offset, Some(5));
    assert_eq!(ns_request.filter.as_deref(), Some("id > 5"));
    assert_eq!(ns_request.columns, Some(vec!["id".to_string()]));
    assert_eq!(ns_request.with_row_id, Some(true));
    assert_eq!(ns_request.bypass_vector_index, Some(true));
    // Plain queries carry no vector column ...
    assert!(ns_request.vector_column.is_none());
    // ... and an empty placeholder vector.
    assert!(ns_request.vector.single_vector.as_ref().unwrap().is_empty());
}
}