mirror of
https://github.com/lancedb/lancedb.git
synced 2026-03-26 18:40:42 +00:00
Compare commits
4 Commits
sophon/ind
...
python-v0.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f31561c5bb | ||
|
|
e0c5ceac03 | ||
|
|
e93bb3355a | ||
|
|
b75991eb07 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -4855,16 +4855,20 @@ version = "0.30.0-beta.3"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"lance-core",
|
||||
"lance-io",
|
||||
"lance-namespace",
|
||||
"lance-namespace-impls",
|
||||
"lancedb",
|
||||
"pin-project",
|
||||
"pyo3",
|
||||
"pyo3-async-runtimes",
|
||||
"pyo3-build-config",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -52,14 +52,21 @@ plugins:
|
||||
options:
|
||||
docstring_style: numpy
|
||||
heading_level: 3
|
||||
show_source: true
|
||||
show_symbol_type_in_heading: true
|
||||
show_signature_annotations: true
|
||||
show_root_heading: true
|
||||
show_docstring_examples: true
|
||||
show_docstring_attributes: false
|
||||
show_docstring_other_parameters: true
|
||||
show_symbol_type_heading: true
|
||||
show_labels: false
|
||||
show_if_no_docstring: true
|
||||
show_source: false
|
||||
members_order: source
|
||||
docstring_section_style: list
|
||||
signature_crossrefs: true
|
||||
separate_signature: true
|
||||
filters:
|
||||
- "!^_"
|
||||
import:
|
||||
# for cross references
|
||||
- https://arrow.apache.org/docs/objects.inv
|
||||
@@ -113,7 +120,7 @@ markdown_extensions:
|
||||
emoji_index: !!python/name:material.extensions.emoji.twemoji
|
||||
emoji_generator: !!python/name:material.extensions.emoji.to_svg
|
||||
- markdown.extensions.toc:
|
||||
toc_depth: 3
|
||||
toc_depth: 4
|
||||
permalink: true
|
||||
permalink_title: Anchor link to this section
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.30.0-beta.3"
|
||||
current_version = "0.30.0-beta.4"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.30.0-beta.3"
|
||||
version = "0.30.0-beta.4"
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
license.workspace = true
|
||||
@@ -16,9 +16,11 @@ crate-type = ["cdylib"]
|
||||
[dependencies]
|
||||
arrow = { version = "57.2", features = ["pyarrow"] }
|
||||
async-trait = "0.1"
|
||||
bytes = "1"
|
||||
lancedb = { path = "../rust/lancedb", default-features = false }
|
||||
lance-core.workspace = true
|
||||
lance-namespace.workspace = true
|
||||
lance-namespace-impls.workspace = true
|
||||
lance-io.workspace = true
|
||||
env_logger.workspace = true
|
||||
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
|
||||
@@ -28,6 +30,8 @@ pyo3-async-runtimes = { version = "0.26", features = [
|
||||
] }
|
||||
pin-project = "1.1.5"
|
||||
futures.workspace = true
|
||||
serde = "1"
|
||||
serde_json = "1"
|
||||
snafu.workspace = true
|
||||
tokio = { version = "1.40", features = ["sync"] }
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ repository = "https://github.com/lancedb/lancedb"
|
||||
|
||||
[project.optional-dependencies]
|
||||
pylance = [
|
||||
"pylance>=1.0.0b14",
|
||||
"pylance>=4.0.0b7",
|
||||
]
|
||||
tests = [
|
||||
"aiohttp",
|
||||
@@ -59,9 +59,9 @@ tests = [
|
||||
"polars>=0.19, <=1.3.0",
|
||||
"tantivy",
|
||||
"pyarrow-stubs",
|
||||
"pylance>=1.0.0b14,<3.0.0",
|
||||
"pylance>=4.0.0b7",
|
||||
"requests",
|
||||
"datafusion<52",
|
||||
"datafusion>=52,<53",
|
||||
]
|
||||
dev = [
|
||||
"ruff",
|
||||
|
||||
@@ -8,7 +8,7 @@ from abc import abstractmethod
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
|
||||
|
||||
if sys.version_info >= (3, 12):
|
||||
from typing import override
|
||||
@@ -1541,6 +1541,8 @@ class AsyncConnection(object):
|
||||
storage_options_provider: Optional["StorageOptionsProvider"] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
location: Optional[str] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
managed_versioning: Optional[bool] = None,
|
||||
) -> AsyncTable:
|
||||
"""Open a Lance Table in the database.
|
||||
|
||||
@@ -1573,6 +1575,9 @@ class AsyncConnection(object):
|
||||
The explicit location (URI) of the table. If provided, the table will be
|
||||
opened from this location instead of deriving it from the database URI
|
||||
and table name.
|
||||
managed_versioning: bool, optional
|
||||
Whether managed versioning is enabled for this table. If provided,
|
||||
avoids a redundant describe_table call when namespace_client is set.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -1587,6 +1592,8 @@ class AsyncConnection(object):
|
||||
storage_options_provider=storage_options_provider,
|
||||
index_cache_size=index_cache_size,
|
||||
location=location,
|
||||
namespace_client=namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
return AsyncTable(table)
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from typing import Dict, Iterable, List, Optional, Union
|
||||
from typing import Any, Dict, Iterable, List, Optional, Union
|
||||
|
||||
if sys.version_info >= (3, 12):
|
||||
from typing import override
|
||||
@@ -240,7 +240,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
session : Optional[Session]
|
||||
A session to use for this connection
|
||||
"""
|
||||
self._ns = namespace
|
||||
self._namespace_client = namespace
|
||||
self.read_consistency_interval = read_consistency_interval
|
||||
self.storage_options = storage_options or {}
|
||||
self.session = session
|
||||
@@ -269,7 +269,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
if namespace is None:
|
||||
namespace = []
|
||||
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
|
||||
response = self._ns.list_tables(request)
|
||||
response = self._namespace_client.list_tables(request)
|
||||
return response.tables if response.tables else []
|
||||
|
||||
@override
|
||||
@@ -309,7 +309,9 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
# Try to describe the table first to see if it exists
|
||||
try:
|
||||
describe_request = DescribeTableRequest(id=table_id)
|
||||
describe_response = self._ns.describe_table(describe_request)
|
||||
describe_response = self._namespace_client.describe_table(
|
||||
describe_request
|
||||
)
|
||||
location = describe_response.location
|
||||
namespace_storage_options = describe_response.storage_options
|
||||
except Exception:
|
||||
@@ -323,7 +325,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
location=None,
|
||||
properties=self.storage_options if self.storage_options else None,
|
||||
)
|
||||
declare_response = self._ns.declare_table(declare_request)
|
||||
declare_response = self._namespace_client.declare_table(declare_request)
|
||||
|
||||
if not declare_response.location:
|
||||
raise ValueError(
|
||||
@@ -353,7 +355,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
# Only create if namespace returned storage_options (not None)
|
||||
if storage_options_provider is None and namespace_storage_options is not None:
|
||||
storage_options_provider = LanceNamespaceStorageOptionsProvider(
|
||||
namespace=self._ns,
|
||||
namespace=self._namespace_client,
|
||||
table_id=table_id,
|
||||
)
|
||||
|
||||
@@ -371,6 +373,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
storage_options=merged_storage_options,
|
||||
storage_options_provider=storage_options_provider,
|
||||
location=location,
|
||||
namespace_client=self._namespace_client,
|
||||
)
|
||||
|
||||
return tbl
|
||||
@@ -389,7 +392,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
namespace = []
|
||||
table_id = namespace + [name]
|
||||
request = DescribeTableRequest(id=table_id)
|
||||
response = self._ns.describe_table(request)
|
||||
response = self._namespace_client.describe_table(request)
|
||||
|
||||
# Merge storage options: self.storage_options < user options < namespace options
|
||||
merged_storage_options = dict(self.storage_options)
|
||||
@@ -402,10 +405,14 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
# Only create if namespace returned storage_options (not None)
|
||||
if storage_options_provider is None and response.storage_options is not None:
|
||||
storage_options_provider = LanceNamespaceStorageOptionsProvider(
|
||||
namespace=self._ns,
|
||||
namespace=self._namespace_client,
|
||||
table_id=table_id,
|
||||
)
|
||||
|
||||
# Pass managed_versioning to avoid redundant describe_table call in Rust.
|
||||
# Convert None to False since we already have the answer from describe_table.
|
||||
managed_versioning = response.managed_versioning is True
|
||||
|
||||
return self._lance_table_from_uri(
|
||||
name,
|
||||
response.location,
|
||||
@@ -413,6 +420,8 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
storage_options=merged_storage_options,
|
||||
storage_options_provider=storage_options_provider,
|
||||
index_cache_size=index_cache_size,
|
||||
namespace_client=self._namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
|
||||
@override
|
||||
@@ -422,7 +431,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
namespace = []
|
||||
table_id = namespace + [name]
|
||||
request = DropTableRequest(id=table_id)
|
||||
self._ns.drop_table(request)
|
||||
self._namespace_client.drop_table(request)
|
||||
|
||||
@override
|
||||
def rename_table(
|
||||
@@ -484,7 +493,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
request = ListNamespacesRequest(
|
||||
id=namespace, page_token=page_token, limit=limit
|
||||
)
|
||||
response = self._ns.list_namespaces(request)
|
||||
response = self._namespace_client.list_namespaces(request)
|
||||
return ListNamespacesResponse(
|
||||
namespaces=response.namespaces if response.namespaces else [],
|
||||
page_token=response.page_token,
|
||||
@@ -520,7 +529,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
mode=_normalize_create_namespace_mode(mode),
|
||||
properties=properties,
|
||||
)
|
||||
response = self._ns.create_namespace(request)
|
||||
response = self._namespace_client.create_namespace(request)
|
||||
return CreateNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
)
|
||||
@@ -555,7 +564,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
mode=_normalize_drop_namespace_mode(mode),
|
||||
behavior=_normalize_drop_namespace_behavior(behavior),
|
||||
)
|
||||
response = self._ns.drop_namespace(request)
|
||||
response = self._namespace_client.drop_namespace(request)
|
||||
return DropNamespaceResponse(
|
||||
properties=(
|
||||
response.properties if hasattr(response, "properties") else None
|
||||
@@ -581,7 +590,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
Response containing the namespace properties.
|
||||
"""
|
||||
request = DescribeNamespaceRequest(id=namespace)
|
||||
response = self._ns.describe_namespace(request)
|
||||
response = self._namespace_client.describe_namespace(request)
|
||||
return DescribeNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
)
|
||||
@@ -615,7 +624,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
if namespace is None:
|
||||
namespace = []
|
||||
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
|
||||
response = self._ns.list_tables(request)
|
||||
response = self._namespace_client.list_tables(request)
|
||||
return ListTablesResponse(
|
||||
tables=response.tables if response.tables else [],
|
||||
page_token=response.page_token,
|
||||
@@ -630,6 +639,8 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
storage_options_provider: Optional[StorageOptionsProvider] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
managed_versioning: Optional[bool] = None,
|
||||
) -> LanceTable:
|
||||
# Open a table directly from a URI using the location parameter
|
||||
# Note: storage_options should already be merged by the caller
|
||||
@@ -643,6 +654,8 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
# Open the table using the temporary connection with the location parameter
|
||||
# Pass namespace_client to enable managed versioning support
|
||||
# Pass managed_versioning to avoid redundant describe_table call
|
||||
return LanceTable.open(
|
||||
temp_conn,
|
||||
name,
|
||||
@@ -651,6 +664,8 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
storage_options_provider=storage_options_provider,
|
||||
index_cache_size=index_cache_size,
|
||||
location=table_uri,
|
||||
namespace_client=namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
|
||||
|
||||
@@ -685,7 +700,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
session : Optional[Session]
|
||||
A session to use for this connection
|
||||
"""
|
||||
self._ns = namespace
|
||||
self._namespace_client = namespace
|
||||
self.read_consistency_interval = read_consistency_interval
|
||||
self.storage_options = storage_options or {}
|
||||
self.session = session
|
||||
@@ -713,7 +728,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
if namespace is None:
|
||||
namespace = []
|
||||
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
|
||||
response = self._ns.list_tables(request)
|
||||
response = self._namespace_client.list_tables(request)
|
||||
return response.tables if response.tables else []
|
||||
|
||||
async def create_table(
|
||||
@@ -750,7 +765,9 @@ class AsyncLanceNamespaceDBConnection:
|
||||
# Try to describe the table first to see if it exists
|
||||
try:
|
||||
describe_request = DescribeTableRequest(id=table_id)
|
||||
describe_response = self._ns.describe_table(describe_request)
|
||||
describe_response = self._namespace_client.describe_table(
|
||||
describe_request
|
||||
)
|
||||
location = describe_response.location
|
||||
namespace_storage_options = describe_response.storage_options
|
||||
except Exception:
|
||||
@@ -764,7 +781,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
location=None,
|
||||
properties=self.storage_options if self.storage_options else None,
|
||||
)
|
||||
declare_response = self._ns.declare_table(declare_request)
|
||||
declare_response = self._namespace_client.declare_table(declare_request)
|
||||
|
||||
if not declare_response.location:
|
||||
raise ValueError(
|
||||
@@ -797,7 +814,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
and namespace_storage_options is not None
|
||||
):
|
||||
provider = LanceNamespaceStorageOptionsProvider(
|
||||
namespace=self._ns,
|
||||
namespace=self._namespace_client,
|
||||
table_id=table_id,
|
||||
)
|
||||
else:
|
||||
@@ -817,6 +834,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
storage_options=merged_storage_options,
|
||||
storage_options_provider=provider,
|
||||
location=location,
|
||||
namespace_client=self._namespace_client,
|
||||
)
|
||||
|
||||
lance_table = await asyncio.to_thread(_create_table)
|
||||
@@ -837,7 +855,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
namespace = []
|
||||
table_id = namespace + [name]
|
||||
request = DescribeTableRequest(id=table_id)
|
||||
response = self._ns.describe_table(request)
|
||||
response = self._namespace_client.describe_table(request)
|
||||
|
||||
# Merge storage options: self.storage_options < user options < namespace options
|
||||
merged_storage_options = dict(self.storage_options)
|
||||
@@ -849,10 +867,14 @@ class AsyncLanceNamespaceDBConnection:
|
||||
# Create a storage options provider if not provided by user
|
||||
if storage_options_provider is None and response.storage_options is not None:
|
||||
storage_options_provider = LanceNamespaceStorageOptionsProvider(
|
||||
namespace=self._ns,
|
||||
namespace=self._namespace_client,
|
||||
table_id=table_id,
|
||||
)
|
||||
|
||||
# Capture managed_versioning from describe response.
|
||||
# Convert None to False since we already have the answer from describe_table.
|
||||
managed_versioning = response.managed_versioning is True
|
||||
|
||||
# Open table in a thread
|
||||
def _open_table():
|
||||
temp_conn = LanceDBConnection(
|
||||
@@ -870,6 +892,8 @@ class AsyncLanceNamespaceDBConnection:
|
||||
storage_options_provider=storage_options_provider,
|
||||
index_cache_size=index_cache_size,
|
||||
location=response.location,
|
||||
namespace_client=self._namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
|
||||
lance_table = await asyncio.to_thread(_open_table)
|
||||
@@ -881,7 +905,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
namespace = []
|
||||
table_id = namespace + [name]
|
||||
request = DropTableRequest(id=table_id)
|
||||
self._ns.drop_table(request)
|
||||
self._namespace_client.drop_table(request)
|
||||
|
||||
async def rename_table(
|
||||
self,
|
||||
@@ -943,7 +967,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
request = ListNamespacesRequest(
|
||||
id=namespace, page_token=page_token, limit=limit
|
||||
)
|
||||
response = self._ns.list_namespaces(request)
|
||||
response = self._namespace_client.list_namespaces(request)
|
||||
return ListNamespacesResponse(
|
||||
namespaces=response.namespaces if response.namespaces else [],
|
||||
page_token=response.page_token,
|
||||
@@ -978,7 +1002,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
mode=_normalize_create_namespace_mode(mode),
|
||||
properties=properties,
|
||||
)
|
||||
response = self._ns.create_namespace(request)
|
||||
response = self._namespace_client.create_namespace(request)
|
||||
return CreateNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
)
|
||||
@@ -1012,7 +1036,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
mode=_normalize_drop_namespace_mode(mode),
|
||||
behavior=_normalize_drop_namespace_behavior(behavior),
|
||||
)
|
||||
response = self._ns.drop_namespace(request)
|
||||
response = self._namespace_client.drop_namespace(request)
|
||||
return DropNamespaceResponse(
|
||||
properties=(
|
||||
response.properties if hasattr(response, "properties") else None
|
||||
@@ -1039,7 +1063,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
Response containing the namespace properties.
|
||||
"""
|
||||
request = DescribeNamespaceRequest(id=namespace)
|
||||
response = self._ns.describe_namespace(request)
|
||||
response = self._namespace_client.describe_namespace(request)
|
||||
return DescribeNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
)
|
||||
@@ -1072,7 +1096,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
if namespace is None:
|
||||
namespace = []
|
||||
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
|
||||
response = self._ns.list_tables(request)
|
||||
response = self._namespace_client.list_tables(request)
|
||||
return ListTablesResponse(
|
||||
tables=response.tables if response.tables else [],
|
||||
page_token=response.page_token,
|
||||
|
||||
@@ -1746,6 +1746,8 @@ class LanceTable(Table):
|
||||
storage_options_provider: Optional["StorageOptionsProvider"] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
location: Optional[str] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
managed_versioning: Optional[bool] = None,
|
||||
_async: AsyncTable = None,
|
||||
):
|
||||
if namespace is None:
|
||||
@@ -1753,6 +1755,7 @@ class LanceTable(Table):
|
||||
self._conn = connection
|
||||
self._namespace = namespace
|
||||
self._location = location # Store location for use in _dataset_path
|
||||
self._namespace_client = namespace_client
|
||||
if _async is not None:
|
||||
self._table = _async
|
||||
else:
|
||||
@@ -1764,6 +1767,8 @@ class LanceTable(Table):
|
||||
storage_options_provider=storage_options_provider,
|
||||
index_cache_size=index_cache_size,
|
||||
location=location,
|
||||
namespace_client=namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1806,6 +1811,8 @@ class LanceTable(Table):
|
||||
storage_options_provider: Optional["StorageOptionsProvider"] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
location: Optional[str] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
managed_versioning: Optional[bool] = None,
|
||||
):
|
||||
if namespace is None:
|
||||
namespace = []
|
||||
@@ -1817,6 +1824,8 @@ class LanceTable(Table):
|
||||
storage_options_provider=storage_options_provider,
|
||||
index_cache_size=index_cache_size,
|
||||
location=location,
|
||||
namespace_client=namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
|
||||
# check the dataset exists
|
||||
@@ -1848,6 +1857,16 @@ class LanceTable(Table):
|
||||
"Please install with `pip install pylance`."
|
||||
)
|
||||
|
||||
if self._namespace_client is not None:
|
||||
table_id = self._namespace + [self.name]
|
||||
return lance.dataset(
|
||||
version=self.version,
|
||||
storage_options=self._conn.storage_options,
|
||||
namespace=self._namespace_client,
|
||||
table_id=table_id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
return lance.dataset(
|
||||
self._dataset_path,
|
||||
version=self.version,
|
||||
@@ -2713,6 +2732,7 @@ class LanceTable(Table):
|
||||
data_storage_version: Optional[str] = None,
|
||||
enable_v2_manifest_paths: Optional[bool] = None,
|
||||
location: Optional[str] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
):
|
||||
"""
|
||||
Create a new table.
|
||||
@@ -2773,6 +2793,7 @@ class LanceTable(Table):
|
||||
self._conn = db
|
||||
self._namespace = namespace
|
||||
self._location = location
|
||||
self._namespace_client = namespace_client
|
||||
|
||||
if data_storage_version is not None:
|
||||
warnings.warn(
|
||||
|
||||
@@ -326,6 +326,24 @@ def test_add_struct(mem_db: DBConnection):
|
||||
table = mem_db.create_table("test2", schema=schema)
|
||||
table.add(data)
|
||||
|
||||
struct_type = pa.struct(
|
||||
[
|
||||
("b", pa.int64()),
|
||||
("a", pa.int64()),
|
||||
]
|
||||
)
|
||||
expected = pa.table(
|
||||
{
|
||||
"s_list": [
|
||||
[
|
||||
pa.scalar({"b": 1, "a": 2}, type=struct_type),
|
||||
pa.scalar({"b": 4, "a": None}, type=struct_type),
|
||||
]
|
||||
],
|
||||
}
|
||||
)
|
||||
assert table.to_arrow() == expected
|
||||
|
||||
|
||||
def test_add_subschema(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
|
||||
@@ -17,7 +17,8 @@ use pyo3::{
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
use crate::{
|
||||
error::PythonErrorExt, storage_options::py_object_to_storage_options_provider, table::Table,
|
||||
error::PythonErrorExt, namespace::extract_namespace_arc,
|
||||
storage_options::py_object_to_storage_options_provider, table::Table,
|
||||
};
|
||||
|
||||
#[pyclass]
|
||||
@@ -182,7 +183,8 @@ impl Connection {
|
||||
})
|
||||
}
|
||||
|
||||
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None, namespace_client=None, managed_versioning=None))]
|
||||
pub fn open_table(
|
||||
self_: PyRef<'_, Self>,
|
||||
name: String,
|
||||
@@ -191,11 +193,13 @@ impl Connection {
|
||||
storage_options_provider: Option<Py<PyAny>>,
|
||||
index_cache_size: Option<u32>,
|
||||
location: Option<String>,
|
||||
namespace_client: Option<Py<PyAny>>,
|
||||
managed_versioning: Option<bool>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.get_inner()?.clone();
|
||||
|
||||
let mut builder = inner.open_table(name);
|
||||
builder = builder.namespace(namespace);
|
||||
builder = builder.namespace(namespace.clone());
|
||||
if let Some(storage_options) = storage_options {
|
||||
builder = builder.storage_options(storage_options);
|
||||
}
|
||||
@@ -209,6 +213,20 @@ impl Connection {
|
||||
if let Some(location) = location {
|
||||
builder = builder.location(location);
|
||||
}
|
||||
// Extract namespace client from Python object if provided
|
||||
let ns_client = if let Some(ns_obj) = namespace_client {
|
||||
let py = self_.py();
|
||||
Some(extract_namespace_arc(py, ns_obj)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if let Some(ns_client) = ns_client {
|
||||
builder = builder.namespace_client(ns_client);
|
||||
}
|
||||
// Pass managed_versioning if provided to avoid redundant describe_table call
|
||||
if let Some(enabled) = managed_versioning {
|
||||
builder = builder.managed_versioning(enabled);
|
||||
}
|
||||
|
||||
future_into_py(self_.py(), async move {
|
||||
let table = builder.execute().await.infer_error()?;
|
||||
|
||||
@@ -23,6 +23,7 @@ pub mod connection;
|
||||
pub mod error;
|
||||
pub mod header;
|
||||
pub mod index;
|
||||
pub mod namespace;
|
||||
pub mod permutation;
|
||||
pub mod query;
|
||||
pub mod session;
|
||||
|
||||
746
python/src/namespace.rs
Normal file
746
python/src/namespace.rs
Normal file
@@ -0,0 +1,746 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Namespace utilities for Python bindings
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use lance_namespace::LanceNamespace as LanceNamespaceTrait;
|
||||
use lance_namespace::models::*;
|
||||
use pyo3::prelude::*;
|
||||
use pyo3::types::PyDict;
|
||||
|
||||
/// Wrapper that allows any Python object implementing LanceNamespace protocol
|
||||
/// to be used as a Rust LanceNamespace.
|
||||
///
|
||||
/// This is similar to PyLanceNamespace in lance's Python bindings - it wraps a Python
|
||||
/// object and calls back into Python when namespace methods are invoked.
|
||||
pub struct PyLanceNamespace {
|
||||
py_namespace: Arc<Py<PyAny>>,
|
||||
namespace_id: String,
|
||||
}
|
||||
|
||||
impl PyLanceNamespace {
|
||||
/// Create a new PyLanceNamespace wrapper around a Python namespace object.
|
||||
pub fn new(_py: Python<'_>, py_namespace: &Bound<'_, PyAny>) -> PyResult<Self> {
|
||||
let namespace_id = py_namespace
|
||||
.call_method0("namespace_id")?
|
||||
.extract::<String>()?;
|
||||
|
||||
Ok(Self {
|
||||
py_namespace: Arc::new(py_namespace.clone().unbind()),
|
||||
namespace_id,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create an Arc<dyn LanceNamespace> from a Python namespace object.
|
||||
pub fn create_arc(
|
||||
py: Python<'_>,
|
||||
py_namespace: &Bound<'_, PyAny>,
|
||||
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
|
||||
let wrapper = Self::new(py, py_namespace)?;
|
||||
Ok(Arc::new(wrapper))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PyLanceNamespace {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "PyLanceNamespace {{ id: {} }}", self.namespace_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create the DictWithModelDump class in Python.
|
||||
/// This class acts like a dict but also has model_dump() method.
|
||||
/// This allows it to work with both:
|
||||
/// - depythonize (which expects a dict/Mapping)
|
||||
/// - Python code that calls .model_dump() (like DirectoryNamespace wrapper)
|
||||
fn get_dict_with_model_dump_class(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
|
||||
// Use a module-level cache via __builtins__
|
||||
let builtins = py.import("builtins")?;
|
||||
if builtins.hasattr("_DictWithModelDump")? {
|
||||
return builtins.getattr("_DictWithModelDump");
|
||||
}
|
||||
|
||||
// Create the class using exec
|
||||
let locals = PyDict::new(py);
|
||||
py.run(
|
||||
c"class DictWithModelDump(dict):
|
||||
def model_dump(self):
|
||||
return dict(self)",
|
||||
None,
|
||||
Some(&locals),
|
||||
)?;
|
||||
let class = locals.get_item("DictWithModelDump")?.ok_or_else(|| {
|
||||
pyo3::exceptions::PyRuntimeError::new_err("Failed to create DictWithModelDump class")
|
||||
})?;
|
||||
|
||||
// Cache it
|
||||
builtins.setattr("_DictWithModelDump", &class)?;
|
||||
Ok(class)
|
||||
}
|
||||
|
||||
/// Helper to call a Python namespace method with JSON serialization.
|
||||
/// For methods that take a request and return a response.
|
||||
/// Uses DictWithModelDump to pass a dict that also has model_dump() method,
|
||||
/// making it compatible with both depythonize and Python wrappers.
|
||||
async fn call_py_method<Req, Resp>(
|
||||
py_namespace: Arc<Py<PyAny>>,
|
||||
method_name: &'static str,
|
||||
request: Req,
|
||||
) -> lance_core::Result<Resp>
|
||||
where
|
||||
Req: serde::Serialize + Send + 'static,
|
||||
Resp: serde::de::DeserializeOwned + Send + 'static,
|
||||
{
|
||||
let request_json = serde_json::to_string(&request).map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Failed to serialize request for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let response_json = tokio::task::spawn_blocking(move || {
|
||||
Python::attach(|py| {
|
||||
let json_module = py.import("json")?;
|
||||
let request_dict = json_module.call_method1("loads", (&request_json,))?;
|
||||
|
||||
// Wrap dict in DictWithModelDump so it works with both depythonize and .model_dump()
|
||||
let dict_class = get_dict_with_model_dump_class(py)?;
|
||||
let request_arg = dict_class.call1((request_dict,))?;
|
||||
|
||||
// Call the Python method
|
||||
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
|
||||
|
||||
// Convert response to dict, then to JSON
|
||||
// Pydantic models have model_dump() method
|
||||
let result_dict = if result.bind(py).hasattr("model_dump")? {
|
||||
result.call_method0(py, "model_dump")?
|
||||
} else {
|
||||
result
|
||||
};
|
||||
let response_json: String = json_module
|
||||
.call_method1("dumps", (result_dict,))?
|
||||
.extract()?;
|
||||
Ok::<_, PyErr>(response_json)
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Task join error for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?
|
||||
.map_err(|e: PyErr| {
|
||||
lance_core::Error::io(
|
||||
format!("Python error in {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?;
|
||||
|
||||
serde_json::from_str(&response_json).map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Failed to deserialize response from {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper for methods that return () on success
|
||||
async fn call_py_method_unit<Req>(
|
||||
py_namespace: Arc<Py<PyAny>>,
|
||||
method_name: &'static str,
|
||||
request: Req,
|
||||
) -> lance_core::Result<()>
|
||||
where
|
||||
Req: serde::Serialize + Send + 'static,
|
||||
{
|
||||
let request_json = serde_json::to_string(&request).map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Failed to serialize request for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?;
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Python::attach(|py| {
|
||||
let json_module = py.import("json")?;
|
||||
let request_dict = json_module.call_method1("loads", (&request_json,))?;
|
||||
|
||||
// Wrap dict in DictWithModelDump
|
||||
let dict_class = get_dict_with_model_dump_class(py)?;
|
||||
let request_arg = dict_class.call1((request_dict,))?;
|
||||
|
||||
// Call the Python method
|
||||
py_namespace.call_method1(py, method_name, (request_arg,))?;
|
||||
Ok::<_, PyErr>(())
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Task join error for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?
|
||||
.map_err(|e: PyErr| {
|
||||
lance_core::Error::io(
|
||||
format!("Python error in {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper for methods that return a primitive type
|
||||
async fn call_py_method_primitive<Req, Resp>(
|
||||
py_namespace: Arc<Py<PyAny>>,
|
||||
method_name: &'static str,
|
||||
request: Req,
|
||||
) -> lance_core::Result<Resp>
|
||||
where
|
||||
Req: serde::Serialize + Send + 'static,
|
||||
Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static,
|
||||
{
|
||||
let request_json = serde_json::to_string(&request).map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Failed to serialize request for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?;
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Python::attach(|py| {
|
||||
let json_module = py.import("json")?;
|
||||
let request_dict = json_module.call_method1("loads", (&request_json,))?;
|
||||
|
||||
// Wrap dict in DictWithModelDump
|
||||
let dict_class = get_dict_with_model_dump_class(py)?;
|
||||
let request_arg = dict_class.call1((request_dict,))?;
|
||||
|
||||
// Call the Python method
|
||||
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
|
||||
let value: Resp = result.extract(py)?;
|
||||
Ok::<_, PyErr>(value)
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Task join error for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?
|
||||
.map_err(|e: PyErr| {
|
||||
lance_core::Error::io(
|
||||
format!("Python error in {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper for methods that return Bytes
|
||||
async fn call_py_method_bytes<Req>(
|
||||
py_namespace: Arc<Py<PyAny>>,
|
||||
method_name: &'static str,
|
||||
request: Req,
|
||||
) -> lance_core::Result<Bytes>
|
||||
where
|
||||
Req: serde::Serialize + Send + 'static,
|
||||
{
|
||||
let request_json = serde_json::to_string(&request).map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Failed to serialize request for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?;
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Python::attach(|py| {
|
||||
let json_module = py.import("json")?;
|
||||
let request_dict = json_module.call_method1("loads", (&request_json,))?;
|
||||
|
||||
// Wrap dict in DictWithModelDump
|
||||
let dict_class = get_dict_with_model_dump_class(py)?;
|
||||
let request_arg = dict_class.call1((request_dict,))?;
|
||||
|
||||
// Call the Python method
|
||||
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
|
||||
let bytes_data: Vec<u8> = result.extract(py)?;
|
||||
Ok::<_, PyErr>(Bytes::from(bytes_data))
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Task join error for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?
|
||||
.map_err(|e: PyErr| {
|
||||
lance_core::Error::io(
|
||||
format!("Python error in {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper for methods that take request + data and return a response
|
||||
async fn call_py_method_with_data<Req, Resp>(
|
||||
py_namespace: Arc<Py<PyAny>>,
|
||||
method_name: &'static str,
|
||||
request: Req,
|
||||
data: Bytes,
|
||||
) -> lance_core::Result<Resp>
|
||||
where
|
||||
Req: serde::Serialize + Send + 'static,
|
||||
Resp: serde::de::DeserializeOwned + Send + 'static,
|
||||
{
|
||||
let request_json = serde_json::to_string(&request).map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Failed to serialize request for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let response_json = tokio::task::spawn_blocking(move || {
|
||||
Python::attach(|py| {
|
||||
let json_module = py.import("json")?;
|
||||
let request_dict = json_module.call_method1("loads", (&request_json,))?;
|
||||
|
||||
// Wrap dict in DictWithModelDump
|
||||
let dict_class = get_dict_with_model_dump_class(py)?;
|
||||
let request_arg = dict_class.call1((request_dict,))?;
|
||||
|
||||
// Pass request and bytes to Python method
|
||||
let py_bytes = pyo3::types::PyBytes::new(py, &data);
|
||||
let result = py_namespace.call_method1(py, method_name, (request_arg, py_bytes))?;
|
||||
|
||||
// Convert response dict to JSON
|
||||
let response_json: String = json_module.call_method1("dumps", (result,))?.extract()?;
|
||||
Ok::<_, PyErr>(response_json)
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Task join error for {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?
|
||||
.map_err(|e: PyErr| {
|
||||
lance_core::Error::io(
|
||||
format!("Python error in {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})?;
|
||||
|
||||
serde_json::from_str(&response_json).map_err(|e| {
|
||||
lance_core::Error::io(
|
||||
format!("Failed to deserialize response from {}: {}", method_name, e),
|
||||
Default::default(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LanceNamespaceTrait for PyLanceNamespace {
|
||||
fn namespace_id(&self) -> String {
|
||||
self.namespace_id.clone()
|
||||
}
|
||||
|
||||
async fn list_namespaces(
|
||||
&self,
|
||||
request: ListNamespacesRequest,
|
||||
) -> lance_core::Result<ListNamespacesResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "list_namespaces", request).await
|
||||
}
|
||||
|
||||
async fn describe_namespace(
|
||||
&self,
|
||||
request: DescribeNamespaceRequest,
|
||||
) -> lance_core::Result<DescribeNamespaceResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "describe_namespace", request).await
|
||||
}
|
||||
|
||||
async fn create_namespace(
|
||||
&self,
|
||||
request: CreateNamespaceRequest,
|
||||
) -> lance_core::Result<CreateNamespaceResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "create_namespace", request).await
|
||||
}
|
||||
|
||||
async fn drop_namespace(
|
||||
&self,
|
||||
request: DropNamespaceRequest,
|
||||
) -> lance_core::Result<DropNamespaceResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "drop_namespace", request).await
|
||||
}
|
||||
|
||||
async fn namespace_exists(&self, request: NamespaceExistsRequest) -> lance_core::Result<()> {
|
||||
call_py_method_unit(self.py_namespace.clone(), "namespace_exists", request).await
|
||||
}
|
||||
|
||||
async fn list_tables(
|
||||
&self,
|
||||
request: ListTablesRequest,
|
||||
) -> lance_core::Result<ListTablesResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "list_tables", request).await
|
||||
}
|
||||
|
||||
async fn describe_table(
|
||||
&self,
|
||||
request: DescribeTableRequest,
|
||||
) -> lance_core::Result<DescribeTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "describe_table", request).await
|
||||
}
|
||||
|
||||
async fn register_table(
|
||||
&self,
|
||||
request: RegisterTableRequest,
|
||||
) -> lance_core::Result<RegisterTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "register_table", request).await
|
||||
}
|
||||
|
||||
async fn table_exists(&self, request: TableExistsRequest) -> lance_core::Result<()> {
|
||||
call_py_method_unit(self.py_namespace.clone(), "table_exists", request).await
|
||||
}
|
||||
|
||||
async fn drop_table(&self, request: DropTableRequest) -> lance_core::Result<DropTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "drop_table", request).await
|
||||
}
|
||||
|
||||
async fn deregister_table(
|
||||
&self,
|
||||
request: DeregisterTableRequest,
|
||||
) -> lance_core::Result<DeregisterTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "deregister_table", request).await
|
||||
}
|
||||
|
||||
async fn count_table_rows(&self, request: CountTableRowsRequest) -> lance_core::Result<i64> {
|
||||
call_py_method_primitive(self.py_namespace.clone(), "count_table_rows", request).await
|
||||
}
|
||||
|
||||
async fn create_table(
|
||||
&self,
|
||||
request: CreateTableRequest,
|
||||
request_data: Bytes,
|
||||
) -> lance_core::Result<CreateTableResponse> {
|
||||
call_py_method_with_data(
|
||||
self.py_namespace.clone(),
|
||||
"create_table",
|
||||
request,
|
||||
request_data,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn declare_table(
|
||||
&self,
|
||||
request: DeclareTableRequest,
|
||||
) -> lance_core::Result<DeclareTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "declare_table", request).await
|
||||
}
|
||||
|
||||
async fn insert_into_table(
|
||||
&self,
|
||||
request: InsertIntoTableRequest,
|
||||
request_data: Bytes,
|
||||
) -> lance_core::Result<InsertIntoTableResponse> {
|
||||
call_py_method_with_data(
|
||||
self.py_namespace.clone(),
|
||||
"insert_into_table",
|
||||
request,
|
||||
request_data,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn merge_insert_into_table(
|
||||
&self,
|
||||
request: MergeInsertIntoTableRequest,
|
||||
request_data: Bytes,
|
||||
) -> lance_core::Result<MergeInsertIntoTableResponse> {
|
||||
call_py_method_with_data(
|
||||
self.py_namespace.clone(),
|
||||
"merge_insert_into_table",
|
||||
request,
|
||||
request_data,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn update_table(
|
||||
&self,
|
||||
request: UpdateTableRequest,
|
||||
) -> lance_core::Result<UpdateTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "update_table", request).await
|
||||
}
|
||||
|
||||
async fn delete_from_table(
|
||||
&self,
|
||||
request: DeleteFromTableRequest,
|
||||
) -> lance_core::Result<DeleteFromTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "delete_from_table", request).await
|
||||
}
|
||||
|
||||
async fn query_table(&self, request: QueryTableRequest) -> lance_core::Result<Bytes> {
|
||||
call_py_method_bytes(self.py_namespace.clone(), "query_table", request).await
|
||||
}
|
||||
|
||||
async fn create_table_index(
|
||||
&self,
|
||||
request: CreateTableIndexRequest,
|
||||
) -> lance_core::Result<CreateTableIndexResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "create_table_index", request).await
|
||||
}
|
||||
|
||||
async fn list_table_indices(
|
||||
&self,
|
||||
request: ListTableIndicesRequest,
|
||||
) -> lance_core::Result<ListTableIndicesResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "list_table_indices", request).await
|
||||
}
|
||||
|
||||
async fn describe_table_index_stats(
|
||||
&self,
|
||||
request: DescribeTableIndexStatsRequest,
|
||||
) -> lance_core::Result<DescribeTableIndexStatsResponse> {
|
||||
call_py_method(
|
||||
self.py_namespace.clone(),
|
||||
"describe_table_index_stats",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn describe_transaction(
|
||||
&self,
|
||||
request: DescribeTransactionRequest,
|
||||
) -> lance_core::Result<DescribeTransactionResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "describe_transaction", request).await
|
||||
}
|
||||
|
||||
async fn alter_transaction(
|
||||
&self,
|
||||
request: AlterTransactionRequest,
|
||||
) -> lance_core::Result<AlterTransactionResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "alter_transaction", request).await
|
||||
}
|
||||
|
||||
async fn create_table_scalar_index(
|
||||
&self,
|
||||
request: CreateTableIndexRequest,
|
||||
) -> lance_core::Result<CreateTableScalarIndexResponse> {
|
||||
call_py_method(
|
||||
self.py_namespace.clone(),
|
||||
"create_table_scalar_index",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn drop_table_index(
|
||||
&self,
|
||||
request: DropTableIndexRequest,
|
||||
) -> lance_core::Result<DropTableIndexResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "drop_table_index", request).await
|
||||
}
|
||||
|
||||
async fn list_all_tables(
|
||||
&self,
|
||||
request: ListTablesRequest,
|
||||
) -> lance_core::Result<ListTablesResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "list_all_tables", request).await
|
||||
}
|
||||
|
||||
async fn restore_table(
|
||||
&self,
|
||||
request: RestoreTableRequest,
|
||||
) -> lance_core::Result<RestoreTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "restore_table", request).await
|
||||
}
|
||||
|
||||
async fn rename_table(
|
||||
&self,
|
||||
request: RenameTableRequest,
|
||||
) -> lance_core::Result<RenameTableResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "rename_table", request).await
|
||||
}
|
||||
|
||||
async fn list_table_versions(
|
||||
&self,
|
||||
request: ListTableVersionsRequest,
|
||||
) -> lance_core::Result<ListTableVersionsResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "list_table_versions", request).await
|
||||
}
|
||||
|
||||
async fn create_table_version(
|
||||
&self,
|
||||
request: CreateTableVersionRequest,
|
||||
) -> lance_core::Result<CreateTableVersionResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "create_table_version", request).await
|
||||
}
|
||||
|
||||
async fn describe_table_version(
|
||||
&self,
|
||||
request: DescribeTableVersionRequest,
|
||||
) -> lance_core::Result<DescribeTableVersionResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "describe_table_version", request).await
|
||||
}
|
||||
|
||||
async fn batch_delete_table_versions(
|
||||
&self,
|
||||
request: BatchDeleteTableVersionsRequest,
|
||||
) -> lance_core::Result<BatchDeleteTableVersionsResponse> {
|
||||
call_py_method(
|
||||
self.py_namespace.clone(),
|
||||
"batch_delete_table_versions",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn update_table_schema_metadata(
|
||||
&self,
|
||||
request: UpdateTableSchemaMetadataRequest,
|
||||
) -> lance_core::Result<UpdateTableSchemaMetadataResponse> {
|
||||
call_py_method(
|
||||
self.py_namespace.clone(),
|
||||
"update_table_schema_metadata",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_table_stats(
|
||||
&self,
|
||||
request: GetTableStatsRequest,
|
||||
) -> lance_core::Result<GetTableStatsResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "get_table_stats", request).await
|
||||
}
|
||||
|
||||
async fn explain_table_query_plan(
|
||||
&self,
|
||||
request: ExplainTableQueryPlanRequest,
|
||||
) -> lance_core::Result<String> {
|
||||
call_py_method_primitive(
|
||||
self.py_namespace.clone(),
|
||||
"explain_table_query_plan",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn analyze_table_query_plan(
|
||||
&self,
|
||||
request: AnalyzeTableQueryPlanRequest,
|
||||
) -> lance_core::Result<String> {
|
||||
call_py_method_primitive(
|
||||
self.py_namespace.clone(),
|
||||
"analyze_table_query_plan",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn alter_table_add_columns(
|
||||
&self,
|
||||
request: AlterTableAddColumnsRequest,
|
||||
) -> lance_core::Result<AlterTableAddColumnsResponse> {
|
||||
call_py_method(
|
||||
self.py_namespace.clone(),
|
||||
"alter_table_add_columns",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn alter_table_alter_columns(
|
||||
&self,
|
||||
request: AlterTableAlterColumnsRequest,
|
||||
) -> lance_core::Result<AlterTableAlterColumnsResponse> {
|
||||
call_py_method(
|
||||
self.py_namespace.clone(),
|
||||
"alter_table_alter_columns",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn alter_table_drop_columns(
|
||||
&self,
|
||||
request: AlterTableDropColumnsRequest,
|
||||
) -> lance_core::Result<AlterTableDropColumnsResponse> {
|
||||
call_py_method(
|
||||
self.py_namespace.clone(),
|
||||
"alter_table_drop_columns",
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn list_table_tags(
|
||||
&self,
|
||||
request: ListTableTagsRequest,
|
||||
) -> lance_core::Result<ListTableTagsResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "list_table_tags", request).await
|
||||
}
|
||||
|
||||
async fn create_table_tag(
|
||||
&self,
|
||||
request: CreateTableTagRequest,
|
||||
) -> lance_core::Result<CreateTableTagResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "create_table_tag", request).await
|
||||
}
|
||||
|
||||
async fn delete_table_tag(
|
||||
&self,
|
||||
request: DeleteTableTagRequest,
|
||||
) -> lance_core::Result<DeleteTableTagResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "delete_table_tag", request).await
|
||||
}
|
||||
|
||||
async fn update_table_tag(
|
||||
&self,
|
||||
request: UpdateTableTagRequest,
|
||||
) -> lance_core::Result<UpdateTableTagResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "update_table_tag", request).await
|
||||
}
|
||||
|
||||
async fn get_table_tag_version(
|
||||
&self,
|
||||
request: GetTableTagVersionRequest,
|
||||
) -> lance_core::Result<GetTableTagVersionResponse> {
|
||||
call_py_method(self.py_namespace.clone(), "get_table_tag_version", request).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert Python dict to HashMap<String, String>
|
||||
#[allow(dead_code)]
|
||||
fn dict_to_hashmap(dict: &Bound<'_, PyDict>) -> PyResult<HashMap<String, String>> {
|
||||
let mut map = HashMap::new();
|
||||
for (key, value) in dict.iter() {
|
||||
let key_str: String = key.extract()?;
|
||||
let value_str: String = value.extract()?;
|
||||
map.insert(key_str, value_str);
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
/// Extract an Arc<dyn LanceNamespace> from a Python namespace object.
|
||||
///
|
||||
/// This function wraps any Python namespace object with PyLanceNamespace.
|
||||
/// The PyLanceNamespace wrapper uses DictWithModelDump to pass requests,
|
||||
/// which works with both:
|
||||
/// - Native namespaces (DirectoryNamespace, RestNamespace) that use depythonize (expects dict)
|
||||
/// - Custom Python implementations that call .model_dump() on the request
|
||||
pub fn extract_namespace_arc(
|
||||
py: Python<'_>,
|
||||
ns: Py<PyAny>,
|
||||
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
|
||||
let ns_ref = ns.bind(py);
|
||||
PyLanceNamespace::create_arc(py, ns_ref)
|
||||
}
|
||||
4
python/uv.lock
generated
4
python/uv.lock
generated
@@ -2006,7 +2006,7 @@ requires-dist = [
|
||||
{ name = "botocore", marker = "extra == 'embeddings'", specifier = ">=1.31.57" },
|
||||
{ name = "cohere", marker = "extra == 'embeddings'" },
|
||||
{ name = "colpali-engine", marker = "extra == 'embeddings'", specifier = ">=0.3.10" },
|
||||
{ name = "datafusion", marker = "extra == 'tests'" },
|
||||
{ name = "datafusion", marker = "extra == 'tests'", specifier = "<52" },
|
||||
{ name = "deprecation" },
|
||||
{ name = "duckdb", marker = "extra == 'tests'" },
|
||||
{ name = "google-generativeai", marker = "extra == 'embeddings'" },
|
||||
@@ -2035,7 +2035,7 @@ requires-dist = [
|
||||
{ name = "pyarrow-stubs", marker = "extra == 'tests'" },
|
||||
{ name = "pydantic", specifier = ">=1.10" },
|
||||
{ name = "pylance", marker = "extra == 'pylance'", specifier = ">=1.0.0b14" },
|
||||
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14" },
|
||||
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14,<3.0.0" },
|
||||
{ name = "pyright", marker = "extra == 'dev'" },
|
||||
{ name = "pytest", marker = "extra == 'tests'" },
|
||||
{ name = "pytest-asyncio", marker = "extra == 'tests'" },
|
||||
|
||||
@@ -136,6 +136,7 @@ impl OpenTableBuilder {
|
||||
lance_read_params: None,
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
managed_versioning: None,
|
||||
},
|
||||
embedding_registry,
|
||||
}
|
||||
@@ -235,6 +236,29 @@ impl OpenTableBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a namespace client for managed versioning support.
|
||||
///
|
||||
/// When a namespace client is provided and the table has `managed_versioning` enabled,
|
||||
/// the table will use the namespace's commit handler to notify the namespace of
|
||||
/// version changes. This enables features like event emission for table modifications.
|
||||
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
|
||||
self.request.namespace_client = Some(client);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether managed versioning is enabled for this table.
|
||||
///
|
||||
/// When set to `Some(true)`, the table will use namespace-managed commits.
|
||||
/// When set to `Some(false)`, the table will use local commits even if namespace_client is set.
|
||||
/// When set to `None` (default), the value will be fetched from the namespace if namespace_client is set.
|
||||
///
|
||||
/// This is typically set when the caller has already queried the namespace and knows the
|
||||
/// managed_versioning status, avoiding a redundant describe_table call.
|
||||
pub fn managed_versioning(mut self, enabled: bool) -> Self {
|
||||
self.request.managed_versioning = Some(enabled);
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table
|
||||
pub async fn execute(self) -> Result<Table> {
|
||||
let table = self.parent.open_table(self.request).await?;
|
||||
@@ -294,6 +318,12 @@ impl CloneTableBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a namespace client for managed versioning support.
|
||||
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
|
||||
self.request.namespace_client = Some(client);
|
||||
self
|
||||
}
|
||||
|
||||
/// Execute the clone operation
|
||||
pub async fn execute(self) -> Result<Table> {
|
||||
let parent = self.parent.clone();
|
||||
|
||||
@@ -66,6 +66,10 @@ pub struct OpenTableRequest {
|
||||
/// Optional namespace client for server-side query execution.
|
||||
/// When set, queries will be executed on the namespace server instead of locally.
|
||||
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
/// Whether managed versioning is enabled for this table.
|
||||
/// When Some(true), the table will use namespace-managed commits instead of local commits.
|
||||
/// When None and namespace_client is provided, the value will be fetched from the namespace.
|
||||
pub managed_versioning: Option<bool>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for OpenTableRequest {
|
||||
@@ -77,6 +81,7 @@ impl std::fmt::Debug for OpenTableRequest {
|
||||
.field("lance_read_params", &self.lance_read_params)
|
||||
.field("location", &self.location)
|
||||
.field("namespace_client", &self.namespace_client)
|
||||
.field("managed_versioning", &self.managed_versioning)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -161,6 +166,9 @@ pub struct CloneTableRequest {
|
||||
/// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true.
|
||||
/// Currently only shallow clone is supported.
|
||||
pub is_shallow: bool,
|
||||
/// Optional namespace client for managed versioning support.
|
||||
/// When set, enables the commit handler to track table versions through the namespace.
|
||||
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
}
|
||||
|
||||
impl CloneTableRequest {
|
||||
@@ -172,6 +180,7 @@ impl CloneTableRequest {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -669,6 +669,7 @@ impl ListingDatabase {
|
||||
lance_read_params: None,
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
managed_versioning: None,
|
||||
};
|
||||
let req = (callback)(req);
|
||||
let table = self.open_table(req).await?;
|
||||
@@ -869,6 +870,7 @@ impl Database for ListingDatabase {
|
||||
Some(write_params),
|
||||
self.read_consistency_interval,
|
||||
request.namespace_client,
|
||||
false, // server_side_query_enabled - listing database doesn't support server-side queries
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -946,7 +948,9 @@ impl Database for ListingDatabase {
|
||||
self.store_wrapper.clone(),
|
||||
None,
|
||||
self.read_consistency_interval,
|
||||
None,
|
||||
request.namespace_client,
|
||||
false, // server_side_query_enabled - listing database doesn't support server-side queries
|
||||
None, // managed_versioning - will be queried if namespace_client is provided
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1022,6 +1026,8 @@ impl Database for ListingDatabase {
|
||||
Some(read_params),
|
||||
self.read_consistency_interval,
|
||||
request.namespace_client,
|
||||
false, // server_side_query_enabled - listing database doesn't support server-side queries
|
||||
request.managed_versioning, // Pass through managed_versioning from request
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
@@ -1162,6 +1168,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1222,6 +1229,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1281,6 +1289,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -1317,6 +1326,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: false, // Request deep clone
|
||||
namespace_client: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -1357,6 +1367,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -1397,6 +1408,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -1416,6 +1428,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -1452,6 +1465,7 @@ mod tests {
|
||||
source_version: Some(1),
|
||||
source_tag: Some("v1.0".to_string()),
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -1525,6 +1539,7 @@ mod tests {
|
||||
source_version: Some(initial_version),
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1603,6 +1618,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: Some("v1.0".to_string()),
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1654,6 +1670,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1746,6 +1763,7 @@ mod tests {
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -7,18 +7,20 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
|
||||
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
|
||||
use lance_namespace::{
|
||||
LanceNamespace,
|
||||
models::{
|
||||
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
|
||||
DeclareTableRequest, DescribeNamespaceRequest, DescribeNamespaceResponse,
|
||||
DescribeTableRequest, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
|
||||
ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
|
||||
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
|
||||
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
|
||||
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
|
||||
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
|
||||
},
|
||||
};
|
||||
use lance_namespace_impls::ConnectBuilder;
|
||||
use log::warn;
|
||||
use lance_table::io::commit::CommitHandler;
|
||||
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
|
||||
|
||||
use crate::database::ReadConsistency;
|
||||
use crate::error::{Error, Result};
|
||||
@@ -206,83 +208,55 @@ impl Database for LanceNamespaceDatabase {
|
||||
let mut table_id = request.namespace.clone();
|
||||
table_id.push(request.name.clone());
|
||||
|
||||
// Try declare_table first, falling back to create_empty_table for backwards
|
||||
// compatibility with older namespace clients that don't support declare_table
|
||||
let declare_request = DeclareTableRequest {
|
||||
id: Some(table_id.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (location, initial_storage_options) =
|
||||
match self.namespace.declare_table(declare_request).await {
|
||||
Ok(response) => {
|
||||
let loc = response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from declare_table response"
|
||||
.to_string(),
|
||||
})?;
|
||||
// Use storage options from response, fall back to self.storage_options
|
||||
let opts = response
|
||||
.storage_options
|
||||
.or_else(|| Some(self.storage_options.clone()))
|
||||
.filter(|o| !o.is_empty());
|
||||
(loc, opts)
|
||||
}
|
||||
Err(e) => {
|
||||
// Check if the error is "not supported" and try create_empty_table as fallback
|
||||
let err_str = e.to_string().to_lowercase();
|
||||
if err_str.contains("not supported") || err_str.contains("not implemented") {
|
||||
warn!(
|
||||
"declare_table is not supported by the namespace client, \
|
||||
falling back to deprecated create_empty_table. \
|
||||
create_empty_table is deprecated and will be removed in Lance 3.0.0. \
|
||||
Please upgrade your namespace client to support declare_table."
|
||||
);
|
||||
#[allow(deprecated)]
|
||||
let create_empty_request = CreateEmptyTableRequest {
|
||||
id: Some(table_id.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let response = self
|
||||
.namespace
|
||||
.declare_table(declare_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to declare table: {}", e),
|
||||
})?;
|
||||
|
||||
#[allow(deprecated)]
|
||||
let create_response = self
|
||||
.namespace
|
||||
.create_empty_table(create_empty_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to create empty table: {}", e),
|
||||
})?;
|
||||
let location = response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from declare_table response".to_string(),
|
||||
})?;
|
||||
|
||||
let loc = create_response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from create_empty_table response"
|
||||
.to_string(),
|
||||
})?;
|
||||
// For deprecated path, use self.storage_options
|
||||
let opts = if self.storage_options.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.storage_options.clone())
|
||||
};
|
||||
(loc, opts)
|
||||
} else {
|
||||
return Err(Error::Runtime {
|
||||
message: format!("Failed to declare table: {}", e),
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
// Use storage options from response, fall back to self.storage_options
|
||||
let initial_storage_options = response
|
||||
.storage_options
|
||||
.or_else(|| Some(self.storage_options.clone()))
|
||||
.filter(|o| !o.is_empty());
|
||||
|
||||
let write_params = if let Some(storage_opts) = initial_storage_options {
|
||||
let mut params = request.write_options.lance_write_params.unwrap_or_default();
|
||||
let managed_versioning = response.managed_versioning;
|
||||
|
||||
// Build write params with storage options and commit handler
|
||||
let mut params = request.write_options.lance_write_params.unwrap_or_default();
|
||||
|
||||
// Set up storage options if provided
|
||||
if let Some(storage_opts) = initial_storage_options {
|
||||
let store_params = params
|
||||
.store_params
|
||||
.get_or_insert_with(ObjectStoreParams::default);
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_static_options(storage_opts),
|
||||
));
|
||||
Some(params)
|
||||
} else {
|
||||
request.write_options.lance_write_params
|
||||
};
|
||||
}
|
||||
|
||||
// Set up commit handler when managed_versioning is enabled
|
||||
if managed_versioning == Some(true) {
|
||||
let external_store =
|
||||
LanceNamespaceExternalManifestStore::new(self.namespace.clone(), table_id.clone());
|
||||
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
|
||||
external_manifest_store: Arc::new(external_store),
|
||||
});
|
||||
params.commit_handler = Some(commit_handler);
|
||||
}
|
||||
|
||||
let write_params = Some(params);
|
||||
|
||||
let native_table = NativeTable::create_from_namespace(
|
||||
self.namespace.clone(),
|
||||
|
||||
@@ -464,6 +464,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
|
||||
lance_read_params: None,
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
managed_versioning: None,
|
||||
};
|
||||
let req = (callback)(req);
|
||||
self.open_table(req).await
|
||||
|
||||
@@ -34,9 +34,13 @@ use lance_index::vector::sq::builder::SQBuildParams;
|
||||
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
|
||||
pub use query::AnyQuery;
|
||||
|
||||
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
|
||||
use lance_namespace::LanceNamespace;
|
||||
use lance_namespace::models::DescribeTableRequest;
|
||||
use lance_table::format::Manifest;
|
||||
use lance_table::io::commit::CommitHandler;
|
||||
use lance_table::io::commit::ManifestNamingScheme;
|
||||
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::format;
|
||||
@@ -1212,10 +1216,13 @@ pub struct NativeTable {
|
||||
// This comes from the connection options. We store here so we can pass down
|
||||
// to the dataset when we recreate it (for example, in checkout_latest).
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
// Optional namespace client for server-side query execution.
|
||||
// When set, queries will be executed on the namespace server instead of locally.
|
||||
// pub (crate) namespace_client so query.rs can access the fields
|
||||
// Optional namespace client for namespace operations (e.g., managed versioning).
|
||||
// pub(crate) so query.rs can access the field for server-side query execution.
|
||||
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
// Whether to enable server-side query execution via the namespace client.
|
||||
// When true and namespace_client is set, queries will be executed on the
|
||||
// namespace server instead of locally.
|
||||
pub(crate) server_side_query_enabled: bool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NativeTable {
|
||||
@@ -1227,6 +1234,7 @@ impl std::fmt::Debug for NativeTable {
|
||||
.field("uri", &self.uri)
|
||||
.field("read_consistency_interval", &self.read_consistency_interval)
|
||||
.field("namespace_client", &self.namespace_client)
|
||||
.field("server_side_query_enabled", &self.server_side_query_enabled)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -1263,7 +1271,7 @@ impl NativeTable {
|
||||
/// * A [NativeTable] object.
|
||||
pub async fn open(uri: &str) -> Result<Self> {
|
||||
let name = Self::get_table_name(uri)?;
|
||||
Self::open_with_params(uri, &name, vec![], None, None, None, None).await
|
||||
Self::open_with_params(uri, &name, vec![], None, None, None, None, false, None).await
|
||||
}
|
||||
|
||||
/// Opens an existing Table
|
||||
@@ -1273,7 +1281,10 @@ impl NativeTable {
|
||||
/// * `base_path` - The base path where the table is located
|
||||
/// * `name` The Table name
|
||||
/// * `params` The [ReadParams] to use when opening the table
|
||||
/// * `namespace_client` - Optional namespace client for server-side query execution
|
||||
/// * `namespace_client` - Optional namespace client for namespace operations
|
||||
/// * `server_side_query_enabled` - Whether to enable server-side query execution
|
||||
/// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client
|
||||
/// is provided, the value will be fetched via describe_table.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@@ -1287,6 +1298,8 @@ impl NativeTable {
|
||||
params: Option<ReadParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
server_side_query_enabled: bool,
|
||||
managed_versioning: Option<bool>,
|
||||
) -> Result<Self> {
|
||||
let params = params.unwrap_or_default();
|
||||
// patch the params if we have a write store wrapper
|
||||
@@ -1295,17 +1308,54 @@ impl NativeTable {
|
||||
None => params,
|
||||
};
|
||||
|
||||
let dataset = DatasetBuilder::from_uri(uri)
|
||||
.with_read_params(params)
|
||||
.load()
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
|
||||
name: name.to_string(),
|
||||
source: Box::new(e),
|
||||
},
|
||||
e => e.into(),
|
||||
})?;
|
||||
// Build table_id from namespace + name
|
||||
let mut table_id = namespace.clone();
|
||||
table_id.push(name.to_string());
|
||||
|
||||
// Determine if managed_versioning is enabled
|
||||
// Use the provided value if available, otherwise query the namespace
|
||||
let managed_versioning = match managed_versioning {
|
||||
Some(value) => value,
|
||||
None if namespace_client.is_some() => {
|
||||
let ns_client = namespace_client.as_ref().unwrap();
|
||||
let describe_request = DescribeTableRequest {
|
||||
id: Some(table_id.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let response = ns_client
|
||||
.describe_table(describe_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!(
|
||||
"Failed to describe table via namespace client: {}. \
|
||||
If you don't need managed versioning, don't pass namespace_client.",
|
||||
e
|
||||
),
|
||||
})?;
|
||||
response.managed_versioning == Some(true)
|
||||
}
|
||||
None => false,
|
||||
};
|
||||
|
||||
let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
|
||||
|
||||
// Set up commit handler when managed_versioning is enabled
|
||||
if managed_versioning && let Some(ref ns_client) = namespace_client {
|
||||
let external_store =
|
||||
LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
|
||||
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
|
||||
external_manifest_store: Arc::new(external_store),
|
||||
});
|
||||
builder = builder.with_commit_handler(commit_handler);
|
||||
}
|
||||
|
||||
let dataset = builder.load().await.map_err(|e| match e {
|
||||
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
|
||||
name: name.to_string(),
|
||||
source: Box::new(e),
|
||||
},
|
||||
e => e.into(),
|
||||
})?;
|
||||
|
||||
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
|
||||
let id = Self::build_id(&namespace, name);
|
||||
@@ -1318,6 +1368,7 @@ impl NativeTable {
|
||||
dataset,
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
server_side_query_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1421,6 +1472,7 @@ impl NativeTable {
|
||||
dataset,
|
||||
read_consistency_interval,
|
||||
namespace_client: stored_namespace_client,
|
||||
server_side_query_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1460,7 +1512,8 @@ impl NativeTable {
|
||||
/// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
|
||||
/// * `batches` RecordBatch to be saved in the database.
|
||||
/// * `params` - Write parameters.
|
||||
/// * `namespace_client` - Optional namespace client for server-side query execution
|
||||
/// * `namespace_client` - Optional namespace client for namespace operations
|
||||
/// * `server_side_query_enabled` - Whether to enable server-side query execution
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@@ -1475,6 +1528,7 @@ impl NativeTable {
|
||||
params: Option<WriteParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
server_side_query_enabled: bool,
|
||||
) -> Result<Self> {
|
||||
// Default params uses format v1.
|
||||
let params = params.unwrap_or(WriteParams {
|
||||
@@ -1507,6 +1561,7 @@ impl NativeTable {
|
||||
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
server_side_query_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1520,6 +1575,7 @@ impl NativeTable {
|
||||
params: Option<WriteParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
server_side_query_enabled: bool,
|
||||
) -> Result<Self> {
|
||||
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
|
||||
Self::create(
|
||||
@@ -1531,6 +1587,7 @@ impl NativeTable {
|
||||
params,
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
server_side_query_enabled,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -1634,6 +1691,7 @@ impl NativeTable {
|
||||
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
|
||||
read_consistency_interval,
|
||||
namespace_client: stored_namespace_client,
|
||||
server_side_query_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2625,7 +2683,7 @@ mod tests {
|
||||
vec![Ok(batch.clone())],
|
||||
batch.schema(),
|
||||
));
|
||||
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None)
|
||||
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -3,12 +3,13 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_cast::can_cast_types;
|
||||
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
|
||||
use datafusion::functions::core::{get_field, named_struct};
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
use datafusion_physical_expr::ScalarFunctionExpr;
|
||||
use datafusion_physical_expr::expressions::{Literal, cast};
|
||||
use datafusion_physical_expr::expressions::{CastExpr, Literal};
|
||||
use datafusion_physical_plan::expressions::Column;
|
||||
use datafusion_physical_plan::projection::ProjectionExec;
|
||||
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
|
||||
@@ -25,12 +26,9 @@ pub fn cast_to_table_schema(
|
||||
return Ok(input);
|
||||
}
|
||||
|
||||
let exprs = build_field_exprs(
|
||||
input_schema.fields(),
|
||||
table_schema.fields(),
|
||||
&|idx| Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>,
|
||||
&input_schema,
|
||||
)?;
|
||||
let exprs = build_field_exprs(input_schema.fields(), table_schema.fields(), &|idx| {
|
||||
Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>
|
||||
})?;
|
||||
|
||||
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
|
||||
.into_iter()
|
||||
@@ -51,7 +49,6 @@ fn build_field_exprs(
|
||||
input_fields: &Fields,
|
||||
table_fields: &Fields,
|
||||
get_input_expr: &dyn Fn(usize) -> Arc<dyn PhysicalExpr>,
|
||||
input_schema: &Schema,
|
||||
) -> Result<Vec<(Arc<dyn PhysicalExpr>, FieldRef)>> {
|
||||
let config = Arc::new(ConfigOptions::default());
|
||||
let mut result = Vec::new();
|
||||
@@ -72,24 +69,19 @@ fn build_field_exprs(
|
||||
(DataType::Struct(in_children), DataType::Struct(tbl_children))
|
||||
if in_children != tbl_children =>
|
||||
{
|
||||
let sub_exprs = build_field_exprs(
|
||||
in_children,
|
||||
tbl_children,
|
||||
&|child_idx| {
|
||||
let child_name = in_children[child_idx].name();
|
||||
Arc::new(ScalarFunctionExpr::new(
|
||||
&format!("get_field({child_name})"),
|
||||
get_field(),
|
||||
vec![
|
||||
input_expr.clone(),
|
||||
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
|
||||
],
|
||||
Arc::new(in_children[child_idx].as_ref().clone()),
|
||||
config.clone(),
|
||||
)) as Arc<dyn PhysicalExpr>
|
||||
},
|
||||
input_schema,
|
||||
)?;
|
||||
let sub_exprs = build_field_exprs(in_children, tbl_children, &|child_idx| {
|
||||
let child_name = in_children[child_idx].name();
|
||||
Arc::new(ScalarFunctionExpr::new(
|
||||
&format!("get_field({child_name})"),
|
||||
get_field(),
|
||||
vec![
|
||||
input_expr.clone(),
|
||||
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
|
||||
],
|
||||
Arc::new(in_children[child_idx].as_ref().clone()),
|
||||
config.clone(),
|
||||
)) as Arc<dyn PhysicalExpr>
|
||||
})?;
|
||||
|
||||
let output_struct_fields: Fields = sub_exprs
|
||||
.iter()
|
||||
@@ -125,17 +117,21 @@ fn build_field_exprs(
|
||||
// Types match: pass through.
|
||||
(inp, tbl) if inp == tbl => input_expr,
|
||||
// Types differ: cast.
|
||||
_ => cast(input_expr, input_schema, table_field.data_type().clone()).map_err(|e| {
|
||||
Error::InvalidInput {
|
||||
// safe: false (the default) means overflow/truncation errors surface at execution time.
|
||||
(_, _) if can_cast_types(input_field.data_type(), table_field.data_type()) => Arc::new(
|
||||
CastExpr::new(input_expr, table_field.data_type().clone(), None),
|
||||
)
|
||||
as Arc<dyn PhysicalExpr>,
|
||||
(inp, tbl) => {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"cannot cast field '{}' from {} to {}: {}",
|
||||
"cannot cast field '{}' from {} to {}",
|
||||
table_field.name(),
|
||||
input_field.data_type(),
|
||||
table_field.data_type(),
|
||||
e
|
||||
inp,
|
||||
tbl,
|
||||
),
|
||||
}
|
||||
})?,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let output_field = Arc::new(Field::new(
|
||||
@@ -153,10 +149,12 @@ fn build_field_exprs(
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::buffer::OffsetBuffer;
|
||||
use arrow_array::{
|
||||
Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
|
||||
Array, Float32Array, Float64Array, Int32Array, Int64Array, ListArray, RecordBatch,
|
||||
StringArray, StructArray, UInt32Array, UInt64Array,
|
||||
};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use arrow_schema::{DataType, Field, Fields, Schema};
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion_catalog::MemTable;
|
||||
use futures::TryStreamExt;
|
||||
@@ -495,4 +493,129 @@ mod tests {
|
||||
assert_eq!(b.value(0), "hello");
|
||||
assert_eq!(b.value(1), "world");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_narrowing_numeric_cast_success() {
|
||||
let input_batch = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
|
||||
vec![Arc::new(UInt64Array::from(vec![1u64, 2, 3]))],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
|
||||
|
||||
let plan = plan_from_batch(input_batch).await;
|
||||
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
|
||||
let result = collect(casted).await;
|
||||
|
||||
assert_eq!(result.schema().field(0).data_type(), &DataType::UInt32);
|
||||
let a: &UInt32Array = result.column(0).as_any().downcast_ref().unwrap();
|
||||
assert_eq!(a.values(), &[1u32, 2, 3]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_narrowing_numeric_cast_overflow_errors() {
|
||||
let overflow_val = u32::MAX as u64 + 1;
|
||||
let input_batch = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
|
||||
vec![Arc::new(UInt64Array::from(vec![overflow_val]))],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
|
||||
|
||||
let plan = plan_from_batch(input_batch).await;
|
||||
// Planning succeeds — the overflow is only detected at execution time.
|
||||
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
|
||||
|
||||
let ctx = SessionContext::new();
|
||||
let stream = casted.execute(0, ctx.task_ctx()).unwrap();
|
||||
let result: Result<Vec<RecordBatch>, _> = stream.try_collect().await;
|
||||
assert!(result.is_err(), "expected overflow error at execution time");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_struct_field_reorder() {
|
||||
// list<struct<a: Int32, b: Int32>> → list<struct<b: Int64, a: Int64>>
|
||||
// Tests both reordering (a,b → b,a) and element-type widening (Int32 → Int64).
|
||||
let inner_fields: Fields = vec![
|
||||
Field::new("a", DataType::Int32, true),
|
||||
Field::new("b", DataType::Int32, true),
|
||||
]
|
||||
.into();
|
||||
let struct_array = StructArray::from(vec![
|
||||
(
|
||||
Arc::new(inner_fields[0].as_ref().clone()),
|
||||
Arc::new(Int32Array::from(vec![1, 3])) as _,
|
||||
),
|
||||
(
|
||||
Arc::new(inner_fields[1].as_ref().clone()),
|
||||
Arc::new(Int32Array::from(vec![2, 4])) as _,
|
||||
),
|
||||
]);
|
||||
// Offsets: one list element containing two struct rows (0..2).
|
||||
let offsets = OffsetBuffer::from_lengths(vec![2]);
|
||||
let list_array = ListArray::try_new(
|
||||
Arc::new(Field::new("item", DataType::Struct(inner_fields), true)),
|
||||
offsets,
|
||||
Arc::new(struct_array),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let input_batch = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new(
|
||||
"s_list",
|
||||
list_array.data_type().clone(),
|
||||
false,
|
||||
)])),
|
||||
vec![Arc::new(list_array)],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let table_inner: Fields = vec![
|
||||
Field::new("b", DataType::Int64, true),
|
||||
Field::new("a", DataType::Int64, true),
|
||||
]
|
||||
.into();
|
||||
let table_schema = Schema::new(vec![Field::new(
|
||||
"s_list",
|
||||
DataType::List(Arc::new(Field::new(
|
||||
"item",
|
||||
DataType::Struct(table_inner),
|
||||
true,
|
||||
))),
|
||||
false,
|
||||
)]);
|
||||
|
||||
let plan = plan_from_batch(input_batch).await;
|
||||
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
|
||||
let result = collect(casted).await;
|
||||
|
||||
let list_col = result
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<ListArray>()
|
||||
.unwrap();
|
||||
let struct_col = list_col
|
||||
.values()
|
||||
.as_any()
|
||||
.downcast_ref::<StructArray>()
|
||||
.unwrap();
|
||||
assert_eq!(struct_col.num_columns(), 2);
|
||||
|
||||
let b: &Int64Array = struct_col
|
||||
.column_by_name("b")
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref()
|
||||
.unwrap();
|
||||
assert_eq!(b.values(), &[2, 4]);
|
||||
let a: &Int64Array = struct_col
|
||||
.column_by_name("a")
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref()
|
||||
.unwrap();
|
||||
assert_eq!(a.values(), &[1, 3]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use futures::FutureExt;
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -23,7 +24,7 @@ pub struct DeleteResult {
|
||||
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let delete_result = dataset.delete(predicate).await?;
|
||||
let delete_result = dataset.delete(predicate).boxed().await?;
|
||||
let num_deleted_rows = delete_result.num_deleted_rows;
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
|
||||
@@ -40,8 +40,10 @@ pub async fn execute_query(
|
||||
query: &AnyQuery,
|
||||
options: QueryExecutionOptions,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
// If namespace client is configured, use server-side query execution
|
||||
if let Some(ref namespace_client) = table.namespace_client {
|
||||
// If server-side query is enabled and namespace client is configured, use server-side query execution
|
||||
if table.server_side_query_enabled
|
||||
&& let Some(ref namespace_client) = table.namespace_client
|
||||
{
|
||||
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
|
||||
}
|
||||
execute_generic_query(table, query, options).await
|
||||
|
||||
Reference in New Issue
Block a user