Compare commits

..

3 Commits

Author SHA1 Message Date
Jack Ye
01c6b9dcb8 feat: add mem_wal flag to merge insert for MemWAL write path
Add support for enabling MemWAL (Memory Write-Ahead Log) mode on merge insert
operations. This allows streaming writes to route through memory nodes for
high-performance buffered writes.

Changes:
- Add `mem_wal` field to MergeInsertBuilder with validation
- Add `x-lancedb-mem-wal-enabled` header for remote requests
- Add Python `mem_wal()` method to LanceMergeInsertBuilder
- Add validation to ensure only upsert pattern is supported:
  - when_matched_update_all() without filter
  - when_not_matched_insert_all()
- Throw NotSupported error for native tables
- Add mem_wal_enabled to ClientConfig for Python/Node bindings

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-03-16 01:04:04 -07:00
Will Jones
33a13f0738 fixes for breaking changes 2026-03-04 15:27:32 -08:00
Will Jones
cabc75f167 feat: upgrade Lance to 3.0.0-rc.3 2026-03-04 14:57:17 -08:00
49 changed files with 435 additions and 1302 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.27.0-beta.5"
current_version = "0.27.0-beta.3"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

10
Cargo.lock generated
View File

@@ -4750,7 +4750,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.27.0-beta.4"
version = "0.27.0-beta.3"
dependencies = [
"ahash",
"anyhow",
@@ -4832,7 +4832,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.27.0-beta.4"
version = "0.27.0-beta.3"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4852,24 +4852,20 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.30.0-beta.4"
version = "0.30.0-beta.3"
dependencies = [
"arrow",
"async-trait",
"bytes",
"env_logger",
"futures",
"lance-core",
"lance-io",
"lance-namespace",
"lance-namespace-impls",
"lancedb",
"pin-project",
"pyo3",
"pyo3-async-runtimes",
"pyo3-build-config",
"serde",
"serde_json",
"snafu 0.8.9",
"tokio",
]

View File

@@ -52,21 +52,14 @@ plugins:
options:
docstring_style: numpy
heading_level: 3
show_source: true
show_symbol_type_in_heading: true
show_signature_annotations: true
show_root_heading: true
show_docstring_examples: true
show_docstring_attributes: false
show_docstring_other_parameters: true
show_symbol_type_heading: true
show_labels: false
show_if_no_docstring: true
show_source: false
members_order: source
docstring_section_style: list
signature_crossrefs: true
separate_signature: true
filters:
- "!^_"
import:
# for cross references
- https://arrow.apache.org/docs/objects.inv
@@ -120,7 +113,7 @@ markdown_extensions:
emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:material.extensions.emoji.to_svg
- markdown.extensions.toc:
toc_depth: 4
toc_depth: 3
permalink: true
permalink_title: Anchor link to this section

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.27.0-beta.5</version>
<version>0.27.0-beta.3</version>
</dependency>
```

View File

@@ -1,4 +1,4 @@
# LanceDB Java Enterprise Client
# LanceDB Java SDK
## Configuration and Initialization

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-beta.5</version>
<version>0.27.0-beta.3</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-beta.5</version>
<version>0.27.0-beta.3</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.27.0-beta.5"
version = "0.27.0-beta.3"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -63,7 +63,6 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
tableFromIPC,
DataType,
Dictionary,
Uint8: ArrowUint8,
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
} = <any>arrow;
type Schema = ApacheArrow["Schema"];
@@ -363,38 +362,6 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
).toEqual(new Float64().toString());
});
it("will infer FixedSizeList<Float32> from Float32Array values", async function () {
const table = makeArrowTable([
{ id: "a", vector: new Float32Array([0.1, 0.2, 0.3]) },
{ id: "b", vector: new Float32Array([0.4, 0.5, 0.6]) },
]);
expect(DataType.isFixedSizeList(table.getChild("vector")?.type)).toBe(
true,
);
const vectorType = table.getChild("vector")?.type;
expect(vectorType.listSize).toBe(3);
expect(vectorType.children[0].type.toString()).toEqual(
new Float32().toString(),
);
});
it("will infer FixedSizeList<Uint8> from Uint8Array values", async function () {
const table = makeArrowTable([
{ id: "a", vector: new Uint8Array([1, 2, 3]) },
{ id: "b", vector: new Uint8Array([4, 5, 6]) },
]);
expect(DataType.isFixedSizeList(table.getChild("vector")?.type)).toBe(
true,
);
const vectorType = table.getChild("vector")?.type;
expect(vectorType.listSize).toBe(3);
expect(vectorType.children[0].type.toString()).toEqual(
new ArrowUint8().toString(),
);
});
it("will use dictionary encoded strings if asked", async function () {
const table = makeArrowTable([{ str: "hello" }]);
expect(DataType.isUtf8(table.getChild("str")?.type)).toBe(true);

View File

@@ -2204,36 +2204,3 @@ describe("when creating an empty table", () => {
expect((actualSchema.fields[1].type as Float64).precision).toBe(2);
});
});
// Ensure we can create float32 arrays without using Arrow
// by utilizing native JS TypedArray support
//
// https://github.com/lancedb/lancedb/issues/3115
describe("when creating a table with Float32Array vectors", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => {
tmpDir.removeCallback();
});
it("should persist Float32Array as FixedSizeList<Float32> in the LanceDB schema", async () => {
const db = await connect(tmpDir.name);
const table = await db.createTable("test", [
{ id: "a", vector: new Float32Array([0.1, 0.2, 0.3]) },
{ id: "b", vector: new Float32Array([0.4, 0.5, 0.6]) },
]);
const schema = await table.schema();
const vectorField = schema.fields.find((f) => f.name === "vector");
expect(vectorField).toBeDefined();
expect(vectorField!.type).toBeInstanceOf(FixedSizeList);
const fsl = vectorField!.type as FixedSizeList;
expect(fsl.listSize).toBe(3);
expect(fsl.children[0].type.typeId).toBe(Type.Float);
// precision: HALF=0, SINGLE=1, DOUBLE=2
expect((fsl.children[0].type as Float32).precision).toBe(1);
});
});

View File

@@ -20,8 +20,6 @@ import {
Float32,
Float64,
Int,
Int8,
Int16,
Int32,
Int64,
LargeBinary,
@@ -37,8 +35,6 @@ import {
Timestamp,
Type,
Uint8,
Uint16,
Uint32,
Utf8,
Vector,
makeVector as arrowMakeVector,
@@ -533,8 +529,7 @@ function isObject(value: unknown): value is Record<string, unknown> {
!(value instanceof Date) &&
!(value instanceof Set) &&
!(value instanceof Map) &&
!(value instanceof Buffer) &&
!ArrayBuffer.isView(value)
!(value instanceof Buffer)
);
}
@@ -593,13 +588,6 @@ function inferType(
return new Bool();
} else if (value instanceof Buffer) {
return new Binary();
} else if (ArrayBuffer.isView(value) && !(value instanceof DataView)) {
const info = typedArrayToArrowType(value);
if (info !== undefined) {
const child = new Field("item", info.elementType, true);
return new FixedSizeList(info.length, child);
}
return undefined;
} else if (Array.isArray(value)) {
if (value.length === 0) {
return undefined; // Without any values we can't infer the type
@@ -758,32 +746,6 @@ function makeListVector(lists: unknown[][]): Vector<unknown> {
return listBuilder.finish().toVector();
}
/**
* Map a JS TypedArray instance to the corresponding Arrow element DataType
* and its length. Returns undefined if the value is not a recognized TypedArray.
*/
function typedArrayToArrowType(
value: ArrayBufferView,
): { elementType: DataType; length: number } | undefined {
if (value instanceof Float32Array)
return { elementType: new Float32(), length: value.length };
if (value instanceof Float64Array)
return { elementType: new Float64(), length: value.length };
if (value instanceof Uint8Array)
return { elementType: new Uint8(), length: value.length };
if (value instanceof Uint16Array)
return { elementType: new Uint16(), length: value.length };
if (value instanceof Uint32Array)
return { elementType: new Uint32(), length: value.length };
if (value instanceof Int8Array)
return { elementType: new Int8(), length: value.length };
if (value instanceof Int16Array)
return { elementType: new Int16(), length: value.length };
if (value instanceof Int32Array)
return { elementType: new Int32(), length: value.length };
return undefined;
}
/** Helper function to convert an Array of JS values to an Arrow Vector */
function makeVector(
values: unknown[],
@@ -852,16 +814,6 @@ function makeVector(
"makeVector cannot infer the type if all values are null or undefined",
);
}
if (ArrayBuffer.isView(sampleValue) && !(sampleValue instanceof DataView)) {
const info = typedArrayToArrowType(sampleValue);
if (info !== undefined) {
const fslType = new FixedSizeList(
info.length,
new Field("item", info.elementType, true),
);
return vectorFromArray(values, fslType);
}
}
if (Array.isArray(sampleValue)) {
// Default Arrow inference doesn't handle list types
return makeListVector(values as unknown[][]);

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.4",
"version": "0.27.0-beta.3",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.4",
"version": "0.27.0-beta.3",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.27.0-beta.5",
"version": "0.27.0-beta.3",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -145,6 +145,7 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: config.id_delimiter,
tls_config: config.tls_config.map(Into::into),
header_provider: None, // the header provider is set separately later
mem_wal_enabled: None, // mem_wal is set per-operation in merge_insert
}
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.0-beta.5"
current_version = "0.30.0-beta.3"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.30.0-beta.5"
version = "0.30.0-beta.3"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true
@@ -16,11 +16,9 @@ crate-type = ["cdylib"]
[dependencies]
arrow = { version = "57.2", features = ["pyarrow"] }
async-trait = "0.1"
bytes = "1"
lancedb = { path = "../rust/lancedb", default-features = false }
lance-core.workspace = true
lance-namespace.workspace = true
lance-namespace-impls.workspace = true
lance-io.workspace = true
env_logger.workspace = true
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
@@ -30,8 +28,6 @@ pyo3-async-runtimes = { version = "0.26", features = [
] }
pin-project = "1.1.5"
futures.workspace = true
serde = "1"
serde_json = "1"
snafu.workspace = true
tokio = { version = "1.40", features = ["sync"] }

View File

@@ -1,4 +1,4 @@
# LanceDB Python SDK
# LanceDB
A Python library for [LanceDB](https://github.com/lancedb/lancedb).

View File

@@ -45,7 +45,7 @@ repository = "https://github.com/lancedb/lancedb"
[project.optional-dependencies]
pylance = [
"pylance>=4.0.0b7",
"pylance>=1.0.0b14",
]
tests = [
"aiohttp",
@@ -59,9 +59,9 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=4.0.0b7",
"pylance>=1.0.0b14,<3.0.0",
"requests",
"datafusion>=52,<53",
"datafusion<52",
]
dev = [
"ruff",

View File

@@ -8,7 +8,7 @@ from abc import abstractmethod
from datetime import timedelta
from pathlib import Path
import sys
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
@@ -1541,8 +1541,6 @@ class AsyncConnection(object):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
) -> AsyncTable:
"""Open a Lance Table in the database.
@@ -1575,9 +1573,6 @@ class AsyncConnection(object):
The explicit location (URI) of the table. If provided, the table will be
opened from this location instead of deriving it from the database URI
and table name.
managed_versioning: bool, optional
Whether managed versioning is enabled for this table. If provided,
avoids a redundant describe_table call when namespace_client is set.
Returns
-------
@@ -1592,8 +1587,6 @@ class AsyncConnection(object):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
return AsyncTable(table)

View File

@@ -34,6 +34,7 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._mem_wal = False
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -96,6 +97,47 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def mem_wal(self, enabled: bool = True) -> LanceMergeInsertBuilder:
"""
Enable MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
When enabled, the merge insert will route data through a memory node service
that buffers writes before flushing to storage. This is only supported for
remote (LanceDB Cloud) tables.
**Important:** MemWAL only supports the upsert pattern. You must use:
- `when_matched_update_all()` (without a filter condition)
- `when_not_matched_insert_all()`
MemWAL does NOT support:
- `when_matched_update_all(where=...)` with a filter condition
- `when_not_matched_by_source_delete()`
Parameters
----------
enabled: bool
Whether to enable MemWAL mode. Defaults to `True`.
Raises
------
NotImplementedError
If used on a native (local) table, as MemWAL is only supported for
remote tables.
ValueError
If the merge insert pattern is not supported by MemWAL.
Examples
--------
>>> # Correct usage with MemWAL
>>> table.merge_insert(["id"]) \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .mem_wal() \\
... .execute(new_data)
"""
self._mem_wal = enabled
return self
def execute(
self,
new_data: DATA,

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
import asyncio
import sys
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Dict, Iterable, List, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
@@ -240,7 +240,7 @@ class LanceNamespaceDBConnection(DBConnection):
session : Optional[Session]
A session to use for this connection
"""
self._namespace_client = namespace
self._ns = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
@@ -269,7 +269,7 @@ class LanceNamespaceDBConnection(DBConnection):
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._namespace_client.list_tables(request)
response = self._ns.list_tables(request)
return response.tables if response.tables else []
@override
@@ -309,9 +309,7 @@ class LanceNamespaceDBConnection(DBConnection):
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._namespace_client.describe_table(
describe_request
)
describe_response = self._ns.describe_table(describe_request)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
@@ -325,7 +323,7 @@ class LanceNamespaceDBConnection(DBConnection):
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._namespace_client.declare_table(declare_request)
declare_response = self._ns.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
@@ -355,7 +353,7 @@ class LanceNamespaceDBConnection(DBConnection):
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and namespace_storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._namespace_client,
namespace=self._ns,
table_id=table_id,
)
@@ -373,7 +371,6 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
location=location,
namespace_client=self._namespace_client,
)
return tbl
@@ -392,7 +389,7 @@ class LanceNamespaceDBConnection(DBConnection):
namespace = []
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._namespace_client.describe_table(request)
response = self._ns.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
@@ -405,14 +402,10 @@ class LanceNamespaceDBConnection(DBConnection):
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._namespace_client,
namespace=self._ns,
table_id=table_id,
)
# Pass managed_versioning to avoid redundant describe_table call in Rust.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
return self._lance_table_from_uri(
name,
response.location,
@@ -420,8 +413,6 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
namespace_client=self._namespace_client,
managed_versioning=managed_versioning,
)
@override
@@ -431,7 +422,7 @@ class LanceNamespaceDBConnection(DBConnection):
namespace = []
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._namespace_client.drop_table(request)
self._ns.drop_table(request)
@override
def rename_table(
@@ -493,7 +484,7 @@ class LanceNamespaceDBConnection(DBConnection):
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._namespace_client.list_namespaces(request)
response = self._ns.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
@@ -529,7 +520,7 @@ class LanceNamespaceDBConnection(DBConnection):
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._namespace_client.create_namespace(request)
response = self._ns.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -564,7 +555,7 @@ class LanceNamespaceDBConnection(DBConnection):
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._namespace_client.drop_namespace(request)
response = self._ns.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
@@ -590,7 +581,7 @@ class LanceNamespaceDBConnection(DBConnection):
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._namespace_client.describe_namespace(request)
response = self._ns.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -624,7 +615,7 @@ class LanceNamespaceDBConnection(DBConnection):
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._namespace_client.list_tables(request)
response = self._ns.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
@@ -639,8 +630,6 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
) -> LanceTable:
# Open a table directly from a URI using the location parameter
# Note: storage_options should already be merged by the caller
@@ -654,8 +643,6 @@ class LanceNamespaceDBConnection(DBConnection):
)
# Open the table using the temporary connection with the location parameter
# Pass namespace_client to enable managed versioning support
# Pass managed_versioning to avoid redundant describe_table call
return LanceTable.open(
temp_conn,
name,
@@ -664,8 +651,6 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=table_uri,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
@@ -700,7 +685,7 @@ class AsyncLanceNamespaceDBConnection:
session : Optional[Session]
A session to use for this connection
"""
self._namespace_client = namespace
self._ns = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
@@ -728,7 +713,7 @@ class AsyncLanceNamespaceDBConnection:
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._namespace_client.list_tables(request)
response = self._ns.list_tables(request)
return response.tables if response.tables else []
async def create_table(
@@ -765,9 +750,7 @@ class AsyncLanceNamespaceDBConnection:
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._namespace_client.describe_table(
describe_request
)
describe_response = self._ns.describe_table(describe_request)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
@@ -781,7 +764,7 @@ class AsyncLanceNamespaceDBConnection:
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._namespace_client.declare_table(declare_request)
declare_response = self._ns.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
@@ -814,7 +797,7 @@ class AsyncLanceNamespaceDBConnection:
and namespace_storage_options is not None
):
provider = LanceNamespaceStorageOptionsProvider(
namespace=self._namespace_client,
namespace=self._ns,
table_id=table_id,
)
else:
@@ -834,7 +817,6 @@ class AsyncLanceNamespaceDBConnection:
storage_options=merged_storage_options,
storage_options_provider=provider,
location=location,
namespace_client=self._namespace_client,
)
lance_table = await asyncio.to_thread(_create_table)
@@ -855,7 +837,7 @@ class AsyncLanceNamespaceDBConnection:
namespace = []
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._namespace_client.describe_table(request)
response = self._ns.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
@@ -867,14 +849,10 @@ class AsyncLanceNamespaceDBConnection:
# Create a storage options provider if not provided by user
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._namespace_client,
namespace=self._ns,
table_id=table_id,
)
# Capture managed_versioning from describe response.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
# Open table in a thread
def _open_table():
temp_conn = LanceDBConnection(
@@ -892,8 +870,6 @@ class AsyncLanceNamespaceDBConnection:
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=response.location,
namespace_client=self._namespace_client,
managed_versioning=managed_versioning,
)
lance_table = await asyncio.to_thread(_open_table)
@@ -905,7 +881,7 @@ class AsyncLanceNamespaceDBConnection:
namespace = []
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._namespace_client.drop_table(request)
self._ns.drop_table(request)
async def rename_table(
self,
@@ -967,7 +943,7 @@ class AsyncLanceNamespaceDBConnection:
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._namespace_client.list_namespaces(request)
response = self._ns.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
@@ -1002,7 +978,7 @@ class AsyncLanceNamespaceDBConnection:
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._namespace_client.create_namespace(request)
response = self._ns.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -1036,7 +1012,7 @@ class AsyncLanceNamespaceDBConnection:
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._namespace_client.drop_namespace(request)
response = self._ns.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
@@ -1063,7 +1039,7 @@ class AsyncLanceNamespaceDBConnection:
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._namespace_client.describe_namespace(request)
response = self._ns.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -1096,7 +1072,7 @@ class AsyncLanceNamespaceDBConnection:
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._namespace_client.list_tables(request)
response = self._ns.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,

View File

@@ -1746,8 +1746,6 @@ class LanceTable(Table):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
_async: AsyncTable = None,
):
if namespace is None:
@@ -1755,7 +1753,6 @@ class LanceTable(Table):
self._conn = connection
self._namespace = namespace
self._location = location # Store location for use in _dataset_path
self._namespace_client = namespace_client
if _async is not None:
self._table = _async
else:
@@ -1767,8 +1764,6 @@ class LanceTable(Table):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
)
@@ -1811,8 +1806,6 @@ class LanceTable(Table):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
):
if namespace is None:
namespace = []
@@ -1824,8 +1817,6 @@ class LanceTable(Table):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
# check the dataset exists
@@ -1857,16 +1848,6 @@ class LanceTable(Table):
"Please install with `pip install pylance`."
)
if self._namespace_client is not None:
table_id = self._namespace + [self.name]
return lance.dataset(
version=self.version,
storage_options=self._conn.storage_options,
namespace=self._namespace_client,
table_id=table_id,
**kwargs,
)
return lance.dataset(
self._dataset_path,
version=self.version,
@@ -2732,7 +2713,6 @@ class LanceTable(Table):
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
):
"""
Create a new table.
@@ -2793,7 +2773,6 @@ class LanceTable(Table):
self._conn = db
self._namespace = namespace
self._location = location
self._namespace_client = namespace_client
if data_storage_version is not None:
warnings.warn(
@@ -4202,6 +4181,7 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
mem_wal=merge._mem_wal,
),
)

View File

@@ -326,24 +326,6 @@ def test_add_struct(mem_db: DBConnection):
table = mem_db.create_table("test2", schema=schema)
table.add(data)
struct_type = pa.struct(
[
("b", pa.int64()),
("a", pa.int64()),
]
)
expected = pa.table(
{
"s_list": [
[
pa.scalar({"b": 1, "a": 2}, type=struct_type),
pa.scalar({"b": 4, "a": None}, type=struct_type),
]
],
}
)
assert table.to_arrow() == expected
def test_add_subschema(mem_db: DBConnection):
schema = pa.schema(

View File

@@ -17,8 +17,7 @@ use pyo3::{
use pyo3_async_runtimes::tokio::future_into_py;
use crate::{
error::PythonErrorExt, namespace::extract_namespace_arc,
storage_options::py_object_to_storage_options_provider, table::Table,
error::PythonErrorExt, storage_options::py_object_to_storage_options_provider, table::Table,
};
#[pyclass]
@@ -183,8 +182,7 @@ impl Connection {
})
}
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None, namespace_client=None, managed_versioning=None))]
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None))]
pub fn open_table(
self_: PyRef<'_, Self>,
name: String,
@@ -193,13 +191,11 @@ impl Connection {
storage_options_provider: Option<Py<PyAny>>,
index_cache_size: Option<u32>,
location: Option<String>,
namespace_client: Option<Py<PyAny>>,
managed_versioning: Option<bool>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut builder = inner.open_table(name);
builder = builder.namespace(namespace.clone());
builder = builder.namespace(namespace);
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
@@ -213,20 +209,6 @@ impl Connection {
if let Some(location) = location {
builder = builder.location(location);
}
// Extract namespace client from Python object if provided
let ns_client = if let Some(ns_obj) = namespace_client {
let py = self_.py();
Some(extract_namespace_arc(py, ns_obj)?)
} else {
None
};
if let Some(ns_client) = ns_client {
builder = builder.namespace_client(ns_client);
}
// Pass managed_versioning if provided to avoid redundant describe_table call
if let Some(enabled) = managed_versioning {
builder = builder.managed_versioning(enabled);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
@@ -524,6 +506,7 @@ pub struct PyClientConfig {
id_delimiter: Option<String>,
tls_config: Option<PyClientTlsConfig>,
header_provider: Option<Py<PyAny>>,
mem_wal_enabled: Option<bool>,
}
#[derive(FromPyObject)]
@@ -608,6 +591,7 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: value.id_delimiter,
tls_config: value.tls_config.map(Into::into),
header_provider,
mem_wal_enabled: value.mem_wal_enabled,
}
}
}

View File

@@ -23,7 +23,6 @@ pub mod connection;
pub mod error;
pub mod header;
pub mod index;
pub mod namespace;
pub mod permutation;
pub mod query;
pub mod session;

View File

@@ -1,696 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Namespace utilities for Python bindings
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use lance_namespace::LanceNamespace as LanceNamespaceTrait;
use lance_namespace::models::*;
use pyo3::prelude::*;
use pyo3::types::PyDict;
/// Wrapper that allows any Python object implementing LanceNamespace protocol
/// to be used as a Rust LanceNamespace.
///
/// This is similar to PyLanceNamespace in lance's Python bindings - it wraps a Python
/// object and calls back into Python when namespace methods are invoked.
pub struct PyLanceNamespace {
py_namespace: Arc<Py<PyAny>>,
namespace_id: String,
}
impl PyLanceNamespace {
/// Create a new PyLanceNamespace wrapper around a Python namespace object.
pub fn new(_py: Python<'_>, py_namespace: &Bound<'_, PyAny>) -> PyResult<Self> {
let namespace_id = py_namespace
.call_method0("namespace_id")?
.extract::<String>()?;
Ok(Self {
py_namespace: Arc::new(py_namespace.clone().unbind()),
namespace_id,
})
}
/// Create an Arc<dyn LanceNamespace> from a Python namespace object.
pub fn create_arc(
py: Python<'_>,
py_namespace: &Bound<'_, PyAny>,
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
let wrapper = Self::new(py, py_namespace)?;
Ok(Arc::new(wrapper))
}
}
impl std::fmt::Debug for PyLanceNamespace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PyLanceNamespace {{ id: {} }}", self.namespace_id)
}
}
/// Get or create the DictWithModelDump class in Python.
/// This class acts like a dict but also has model_dump() method.
/// This allows it to work with both:
/// - depythonize (which expects a dict/Mapping)
/// - Python code that calls .model_dump() (like DirectoryNamespace wrapper)
fn get_dict_with_model_dump_class(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
// Use a module-level cache via __builtins__
let builtins = py.import("builtins")?;
if builtins.hasattr("_DictWithModelDump")? {
return builtins.getattr("_DictWithModelDump");
}
// Create the class using exec
let locals = PyDict::new(py);
py.run(
c"class DictWithModelDump(dict):
def model_dump(self):
return dict(self)",
None,
Some(&locals),
)?;
let class = locals.get_item("DictWithModelDump")?.ok_or_else(|| {
pyo3::exceptions::PyRuntimeError::new_err("Failed to create DictWithModelDump class")
})?;
// Cache it
builtins.setattr("_DictWithModelDump", &class)?;
Ok(class)
}
/// Helper to call a Python namespace method with JSON serialization.
/// For methods that take a request and return a response.
/// Uses DictWithModelDump to pass a dict that also has model_dump() method,
/// making it compatible with both depythonize and Python wrappers.
async fn call_py_method<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: serde::de::DeserializeOwned + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(format!(
"Failed to serialize request for {}: {}",
method_name, e
))
})?;
let response_json = tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump so it works with both depythonize and .model_dump()
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
// Convert response to dict, then to JSON
// Pydantic models have model_dump() method
let result_dict = if result.bind(py).hasattr("model_dump")? {
result.call_method0(py, "model_dump")?
} else {
result
};
let response_json: String = json_module
.call_method1("dumps", (result_dict,))?
.extract()?;
Ok::<_, PyErr>(response_json)
})
})
.await
.map_err(|e| lance_core::Error::io(format!("Task join error for {}: {}", method_name, e)))?
.map_err(|e: PyErr| lance_core::Error::io(format!("Python error in {}: {}", method_name, e)))?;
serde_json::from_str(&response_json).map_err(|e| {
lance_core::Error::io(format!(
"Failed to deserialize response from {}: {}",
method_name, e
))
})
}
/// Helper for methods that return () on success
async fn call_py_method_unit<Req>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<()>
where
Req: serde::Serialize + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(format!(
"Failed to serialize request for {}: {}",
method_name, e
))
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
py_namespace.call_method1(py, method_name, (request_arg,))?;
Ok::<_, PyErr>(())
})
})
.await
.map_err(|e| lance_core::Error::io(format!("Task join error for {}: {}", method_name, e)))?
.map_err(|e: PyErr| lance_core::Error::io(format!("Python error in {}: {}", method_name, e)))
}
/// Helper for methods that return a primitive type
async fn call_py_method_primitive<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(format!(
"Failed to serialize request for {}: {}",
method_name, e
))
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let value: Resp = result.extract(py)?;
Ok::<_, PyErr>(value)
})
})
.await
.map_err(|e| lance_core::Error::io(format!("Task join error for {}: {}", method_name, e)))?
.map_err(|e: PyErr| lance_core::Error::io(format!("Python error in {}: {}", method_name, e)))
}
/// Helper for methods that return Bytes
async fn call_py_method_bytes<Req>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Bytes>
where
Req: serde::Serialize + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(format!(
"Failed to serialize request for {}: {}",
method_name, e
))
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let bytes_data: Vec<u8> = result.extract(py)?;
Ok::<_, PyErr>(Bytes::from(bytes_data))
})
})
.await
.map_err(|e| lance_core::Error::io(format!("Task join error for {}: {}", method_name, e)))?
.map_err(|e: PyErr| lance_core::Error::io(format!("Python error in {}: {}", method_name, e)))
}
/// Helper for methods that take request + data and return a response
async fn call_py_method_with_data<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
data: Bytes,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: serde::de::DeserializeOwned + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(format!(
"Failed to serialize request for {}: {}",
method_name, e
))
})?;
let response_json = tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Pass request and bytes to Python method
let py_bytes = pyo3::types::PyBytes::new(py, &data);
let result = py_namespace.call_method1(py, method_name, (request_arg, py_bytes))?;
// Convert response dict to JSON
let response_json: String = json_module.call_method1("dumps", (result,))?.extract()?;
Ok::<_, PyErr>(response_json)
})
})
.await
.map_err(|e| lance_core::Error::io(format!("Task join error for {}: {}", method_name, e)))?
.map_err(|e: PyErr| lance_core::Error::io(format!("Python error in {}: {}", method_name, e)))?;
serde_json::from_str(&response_json).map_err(|e| {
lance_core::Error::io(format!(
"Failed to deserialize response from {}: {}",
method_name, e
))
})
}
#[async_trait]
impl LanceNamespaceTrait for PyLanceNamespace {
fn namespace_id(&self) -> String {
self.namespace_id.clone()
}
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> lance_core::Result<ListNamespacesResponse> {
call_py_method(self.py_namespace.clone(), "list_namespaces", request).await
}
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> lance_core::Result<DescribeNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "describe_namespace", request).await
}
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> lance_core::Result<CreateNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "create_namespace", request).await
}
async fn drop_namespace(
&self,
request: DropNamespaceRequest,
) -> lance_core::Result<DropNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "drop_namespace", request).await
}
async fn namespace_exists(&self, request: NamespaceExistsRequest) -> lance_core::Result<()> {
call_py_method_unit(self.py_namespace.clone(), "namespace_exists", request).await
}
async fn list_tables(
&self,
request: ListTablesRequest,
) -> lance_core::Result<ListTablesResponse> {
call_py_method(self.py_namespace.clone(), "list_tables", request).await
}
async fn describe_table(
&self,
request: DescribeTableRequest,
) -> lance_core::Result<DescribeTableResponse> {
call_py_method(self.py_namespace.clone(), "describe_table", request).await
}
async fn register_table(
&self,
request: RegisterTableRequest,
) -> lance_core::Result<RegisterTableResponse> {
call_py_method(self.py_namespace.clone(), "register_table", request).await
}
async fn table_exists(&self, request: TableExistsRequest) -> lance_core::Result<()> {
call_py_method_unit(self.py_namespace.clone(), "table_exists", request).await
}
async fn drop_table(&self, request: DropTableRequest) -> lance_core::Result<DropTableResponse> {
call_py_method(self.py_namespace.clone(), "drop_table", request).await
}
async fn deregister_table(
&self,
request: DeregisterTableRequest,
) -> lance_core::Result<DeregisterTableResponse> {
call_py_method(self.py_namespace.clone(), "deregister_table", request).await
}
async fn count_table_rows(&self, request: CountTableRowsRequest) -> lance_core::Result<i64> {
call_py_method_primitive(self.py_namespace.clone(), "count_table_rows", request).await
}
async fn create_table(
&self,
request: CreateTableRequest,
request_data: Bytes,
) -> lance_core::Result<CreateTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"create_table",
request,
request_data,
)
.await
}
async fn declare_table(
&self,
request: DeclareTableRequest,
) -> lance_core::Result<DeclareTableResponse> {
call_py_method(self.py_namespace.clone(), "declare_table", request).await
}
async fn insert_into_table(
&self,
request: InsertIntoTableRequest,
request_data: Bytes,
) -> lance_core::Result<InsertIntoTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"insert_into_table",
request,
request_data,
)
.await
}
async fn merge_insert_into_table(
&self,
request: MergeInsertIntoTableRequest,
request_data: Bytes,
) -> lance_core::Result<MergeInsertIntoTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"merge_insert_into_table",
request,
request_data,
)
.await
}
async fn update_table(
&self,
request: UpdateTableRequest,
) -> lance_core::Result<UpdateTableResponse> {
call_py_method(self.py_namespace.clone(), "update_table", request).await
}
async fn delete_from_table(
&self,
request: DeleteFromTableRequest,
) -> lance_core::Result<DeleteFromTableResponse> {
call_py_method(self.py_namespace.clone(), "delete_from_table", request).await
}
async fn query_table(&self, request: QueryTableRequest) -> lance_core::Result<Bytes> {
call_py_method_bytes(self.py_namespace.clone(), "query_table", request).await
}
async fn create_table_index(
&self,
request: CreateTableIndexRequest,
) -> lance_core::Result<CreateTableIndexResponse> {
call_py_method(self.py_namespace.clone(), "create_table_index", request).await
}
async fn list_table_indices(
&self,
request: ListTableIndicesRequest,
) -> lance_core::Result<ListTableIndicesResponse> {
call_py_method(self.py_namespace.clone(), "list_table_indices", request).await
}
async fn describe_table_index_stats(
&self,
request: DescribeTableIndexStatsRequest,
) -> lance_core::Result<DescribeTableIndexStatsResponse> {
call_py_method(
self.py_namespace.clone(),
"describe_table_index_stats",
request,
)
.await
}
async fn describe_transaction(
&self,
request: DescribeTransactionRequest,
) -> lance_core::Result<DescribeTransactionResponse> {
call_py_method(self.py_namespace.clone(), "describe_transaction", request).await
}
async fn alter_transaction(
&self,
request: AlterTransactionRequest,
) -> lance_core::Result<AlterTransactionResponse> {
call_py_method(self.py_namespace.clone(), "alter_transaction", request).await
}
async fn create_table_scalar_index(
&self,
request: CreateTableIndexRequest,
) -> lance_core::Result<CreateTableScalarIndexResponse> {
call_py_method(
self.py_namespace.clone(),
"create_table_scalar_index",
request,
)
.await
}
async fn drop_table_index(
&self,
request: DropTableIndexRequest,
) -> lance_core::Result<DropTableIndexResponse> {
call_py_method(self.py_namespace.clone(), "drop_table_index", request).await
}
async fn list_all_tables(
&self,
request: ListTablesRequest,
) -> lance_core::Result<ListTablesResponse> {
call_py_method(self.py_namespace.clone(), "list_all_tables", request).await
}
async fn restore_table(
&self,
request: RestoreTableRequest,
) -> lance_core::Result<RestoreTableResponse> {
call_py_method(self.py_namespace.clone(), "restore_table", request).await
}
async fn rename_table(
&self,
request: RenameTableRequest,
) -> lance_core::Result<RenameTableResponse> {
call_py_method(self.py_namespace.clone(), "rename_table", request).await
}
async fn list_table_versions(
&self,
request: ListTableVersionsRequest,
) -> lance_core::Result<ListTableVersionsResponse> {
call_py_method(self.py_namespace.clone(), "list_table_versions", request).await
}
async fn create_table_version(
&self,
request: CreateTableVersionRequest,
) -> lance_core::Result<CreateTableVersionResponse> {
call_py_method(self.py_namespace.clone(), "create_table_version", request).await
}
async fn describe_table_version(
&self,
request: DescribeTableVersionRequest,
) -> lance_core::Result<DescribeTableVersionResponse> {
call_py_method(self.py_namespace.clone(), "describe_table_version", request).await
}
async fn batch_delete_table_versions(
&self,
request: BatchDeleteTableVersionsRequest,
) -> lance_core::Result<BatchDeleteTableVersionsResponse> {
call_py_method(
self.py_namespace.clone(),
"batch_delete_table_versions",
request,
)
.await
}
async fn update_table_schema_metadata(
&self,
request: UpdateTableSchemaMetadataRequest,
) -> lance_core::Result<UpdateTableSchemaMetadataResponse> {
call_py_method(
self.py_namespace.clone(),
"update_table_schema_metadata",
request,
)
.await
}
async fn get_table_stats(
&self,
request: GetTableStatsRequest,
) -> lance_core::Result<GetTableStatsResponse> {
call_py_method(self.py_namespace.clone(), "get_table_stats", request).await
}
async fn explain_table_query_plan(
&self,
request: ExplainTableQueryPlanRequest,
) -> lance_core::Result<String> {
call_py_method_primitive(
self.py_namespace.clone(),
"explain_table_query_plan",
request,
)
.await
}
async fn analyze_table_query_plan(
&self,
request: AnalyzeTableQueryPlanRequest,
) -> lance_core::Result<String> {
call_py_method_primitive(
self.py_namespace.clone(),
"analyze_table_query_plan",
request,
)
.await
}
async fn alter_table_add_columns(
&self,
request: AlterTableAddColumnsRequest,
) -> lance_core::Result<AlterTableAddColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_add_columns",
request,
)
.await
}
async fn alter_table_alter_columns(
&self,
request: AlterTableAlterColumnsRequest,
) -> lance_core::Result<AlterTableAlterColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_alter_columns",
request,
)
.await
}
async fn alter_table_drop_columns(
&self,
request: AlterTableDropColumnsRequest,
) -> lance_core::Result<AlterTableDropColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_drop_columns",
request,
)
.await
}
async fn list_table_tags(
&self,
request: ListTableTagsRequest,
) -> lance_core::Result<ListTableTagsResponse> {
call_py_method(self.py_namespace.clone(), "list_table_tags", request).await
}
async fn create_table_tag(
&self,
request: CreateTableTagRequest,
) -> lance_core::Result<CreateTableTagResponse> {
call_py_method(self.py_namespace.clone(), "create_table_tag", request).await
}
async fn delete_table_tag(
&self,
request: DeleteTableTagRequest,
) -> lance_core::Result<DeleteTableTagResponse> {
call_py_method(self.py_namespace.clone(), "delete_table_tag", request).await
}
async fn update_table_tag(
&self,
request: UpdateTableTagRequest,
) -> lance_core::Result<UpdateTableTagResponse> {
call_py_method(self.py_namespace.clone(), "update_table_tag", request).await
}
async fn get_table_tag_version(
&self,
request: GetTableTagVersionRequest,
) -> lance_core::Result<GetTableTagVersionResponse> {
call_py_method(self.py_namespace.clone(), "get_table_tag_version", request).await
}
}
/// Convert Python dict to HashMap<String, String>
#[allow(dead_code)]
fn dict_to_hashmap(dict: &Bound<'_, PyDict>) -> PyResult<HashMap<String, String>> {
let mut map = HashMap::new();
for (key, value) in dict.iter() {
let key_str: String = key.extract()?;
let value_str: String = value.extract()?;
map.insert(key_str, value_str);
}
Ok(map)
}
/// Extract an Arc<dyn LanceNamespace> from a Python namespace object.
///
/// This function wraps any Python namespace object with PyLanceNamespace.
/// The PyLanceNamespace wrapper uses DictWithModelDump to pass requests,
/// which works with both:
/// - Native namespaces (DirectoryNamespace, RestNamespace) that use depythonize (expects dict)
/// - Custom Python implementations that call .model_dump() on the request
pub fn extract_namespace_arc(
py: Python<'_>,
ns: Py<PyAny>,
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
let ns_ref = ns.bind(py);
PyLanceNamespace::create_arc(py, ns_ref)
}

View File

@@ -710,6 +710,9 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(mem_wal) = parameters.mem_wal {
builder.mem_wal(mem_wal);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -870,6 +873,7 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
mem_wal: Option<bool>,
}
#[pyclass]

4
python/uv.lock generated
View File

@@ -2006,7 +2006,7 @@ requires-dist = [
{ name = "botocore", marker = "extra == 'embeddings'", specifier = ">=1.31.57" },
{ name = "cohere", marker = "extra == 'embeddings'" },
{ name = "colpali-engine", marker = "extra == 'embeddings'", specifier = ">=0.3.10" },
{ name = "datafusion", marker = "extra == 'tests'", specifier = "<52" },
{ name = "datafusion", marker = "extra == 'tests'" },
{ name = "deprecation" },
{ name = "duckdb", marker = "extra == 'tests'" },
{ name = "google-generativeai", marker = "extra == 'embeddings'" },
@@ -2035,7 +2035,7 @@ requires-dist = [
{ name = "pyarrow-stubs", marker = "extra == 'tests'" },
{ name = "pydantic", specifier = ">=1.10" },
{ name = "pylance", marker = "extra == 'pylance'", specifier = ">=1.0.0b14" },
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14,<3.0.0" },
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14" },
{ name = "pyright", marker = "extra == 'dev'" },
{ name = "pytest", marker = "extra == 'tests'" },
{ name = "pytest-asyncio", marker = "extra == 'tests'" },

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.27.0-beta.5"
version = "0.27.0-beta.3"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -1,4 +1,4 @@
# LanceDB Rust SDK
# LanceDB Rust
<a href="https://crates.io/crates/vectordb">![img](https://img.shields.io/crates/v/vectordb)</a>
<a href="https://docs.rs/vectordb/latest/vectordb/">![Docs.rs](https://img.shields.io/docsrs/vectordb)</a>

View File

@@ -136,7 +136,6 @@ impl OpenTableBuilder {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
},
embedding_registry,
}
@@ -236,29 +235,6 @@ impl OpenTableBuilder {
self
}
/// Set a namespace client for managed versioning support.
///
/// When a namespace client is provided and the table has `managed_versioning` enabled,
/// the table will use the namespace's commit handler to notify the namespace of
/// version changes. This enables features like event emission for table modifications.
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
self.request.namespace_client = Some(client);
self
}
/// Set whether managed versioning is enabled for this table.
///
/// When set to `Some(true)`, the table will use namespace-managed commits.
/// When set to `Some(false)`, the table will use local commits even if namespace_client is set.
/// When set to `None` (default), the value will be fetched from the namespace if namespace_client is set.
///
/// This is typically set when the caller has already queried the namespace and knows the
/// managed_versioning status, avoiding a redundant describe_table call.
pub fn managed_versioning(mut self, enabled: bool) -> Self {
self.request.managed_versioning = Some(enabled);
self
}
/// Open the table
pub async fn execute(self) -> Result<Table> {
let table = self.parent.open_table(self.request).await?;
@@ -318,12 +294,6 @@ impl CloneTableBuilder {
self
}
/// Set a namespace client for managed versioning support.
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
self.request.namespace_client = Some(client);
self
}
/// Execute the clone operation
pub async fn execute(self) -> Result<Table> {
let parent = self.parent.clone();
@@ -814,13 +784,19 @@ impl ConnectBuilder {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
// Propagate mem_wal_enabled from options to client_config
let mut client_config = self.request.client_config;
if options.mem_wal_enabled.is_some() {
client_config.mem_wal_enabled = options.mem_wal_enabled;
}
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
&self.request.uri,
&api_key,
&region,
options.host_override,
self.request.client_config,
client_config,
storage_options.into(),
)?);
Ok(Connection {

View File

@@ -66,10 +66,6 @@ pub struct OpenTableRequest {
/// Optional namespace client for server-side query execution.
/// When set, queries will be executed on the namespace server instead of locally.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
/// Whether managed versioning is enabled for this table.
/// When Some(true), the table will use namespace-managed commits instead of local commits.
/// When None and namespace_client is provided, the value will be fetched from the namespace.
pub managed_versioning: Option<bool>,
}
impl std::fmt::Debug for OpenTableRequest {
@@ -81,7 +77,6 @@ impl std::fmt::Debug for OpenTableRequest {
.field("lance_read_params", &self.lance_read_params)
.field("location", &self.location)
.field("namespace_client", &self.namespace_client)
.field("managed_versioning", &self.managed_versioning)
.finish()
}
}
@@ -166,9 +161,6 @@ pub struct CloneTableRequest {
/// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true.
/// Currently only shallow clone is supported.
pub is_shallow: bool,
/// Optional namespace client for managed versioning support.
/// When set, enables the commit handler to track table versions through the namespace.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
}
impl CloneTableRequest {
@@ -180,7 +172,6 @@ impl CloneTableRequest {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
}
}
}

View File

@@ -669,7 +669,6 @@ impl ListingDatabase {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
};
let req = (callback)(req);
let table = self.open_table(req).await?;
@@ -870,7 +869,6 @@ impl Database for ListingDatabase {
Some(write_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
)
.await
{
@@ -948,9 +946,7 @@ impl Database for ListingDatabase {
self.store_wrapper.clone(),
None,
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
None, // managed_versioning - will be queried if namespace_client is provided
None,
)
.await?;
@@ -1026,8 +1022,6 @@ impl Database for ListingDatabase {
Some(read_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
request.managed_versioning, // Pass through managed_versioning from request
)
.await?,
);
@@ -1168,7 +1162,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1229,7 +1222,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1289,7 +1281,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1326,7 +1317,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: false, // Request deep clone
namespace_client: None,
})
.await;
@@ -1367,7 +1357,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1408,7 +1397,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1428,7 +1416,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1465,7 +1452,6 @@ mod tests {
source_version: Some(1),
source_tag: Some("v1.0".to_string()),
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1539,7 +1525,6 @@ mod tests {
source_version: Some(initial_version),
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1618,7 +1603,6 @@ mod tests {
source_version: None,
source_tag: Some("v1.0".to_string()),
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1670,7 +1654,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1763,7 +1746,6 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();

View File

@@ -7,7 +7,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
LanceNamespace,
@@ -19,8 +18,6 @@ use lance_namespace::{
},
};
use lance_namespace_impls::ConnectBuilder;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
@@ -208,12 +205,14 @@ impl Database for LanceNamespaceDatabase {
let mut table_id = request.namespace.clone();
table_id.push(request.name.clone());
// Try declare_table first, falling back to create_empty_table for backwards
// compatibility with older namespace clients that don't support declare_table
let declare_request = DeclareTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let (location, initial_storage_options, managed_versioning) = {
let (location, initial_storage_options) = {
let response = self.namespace.declare_table(declare_request).await?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
@@ -223,33 +222,21 @@ impl Database for LanceNamespaceDatabase {
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts, response.managed_versioning)
(loc, opts)
};
// Build write params with storage options and commit handler
let mut params = request.write_options.lance_write_params.unwrap_or_default();
// Set up storage options if provided
if let Some(storage_opts) = initial_storage_options {
let write_params = if let Some(storage_opts) = initial_storage_options {
let mut params = request.write_options.lance_write_params.unwrap_or_default();
let store_params = params
.store_params
.get_or_insert_with(ObjectStoreParams::default);
store_params.storage_options_accessor = Some(Arc::new(
StorageOptionsAccessor::with_static_options(storage_opts),
));
}
// Set up commit handler when managed_versioning is enabled
if managed_versioning == Some(true) {
let external_store =
LanceNamespaceExternalManifestStore::new(self.namespace.clone(), table_id.clone());
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
external_manifest_store: Arc::new(external_store),
});
params.commit_handler = Some(commit_handler);
}
let write_params = Some(params);
Some(params)
} else {
request.write_options.lance_write_params
};
let native_table = NativeTable::create_from_namespace(
self.namespace.clone(),

View File

@@ -14,6 +14,7 @@ use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
/// Configuration for TLS/mTLS settings.
#[derive(Clone, Debug, Default)]
@@ -52,6 +53,10 @@ pub struct ClientConfig {
pub tls_config: Option<TlsConfig>,
/// Provider for custom headers to be added to each request
pub header_provider: Option<Arc<dyn HeaderProvider>>,
/// Enable MemWAL write path for streaming writes.
/// When true, write operations will use the MemWAL architecture
/// for high-performance streaming writes.
pub mem_wal_enabled: Option<bool>,
}
impl std::fmt::Debug for ClientConfig {
@@ -67,6 +72,7 @@ impl std::fmt::Debug for ClientConfig {
"header_provider",
&self.header_provider.as_ref().map(|_| "Some(...)"),
)
.field("mem_wal_enabled", &self.mem_wal_enabled)
.finish()
}
}
@@ -81,6 +87,7 @@ impl Default for ClientConfig {
id_delimiter: None,
tls_config: None,
header_provider: None,
mem_wal_enabled: None,
}
}
}
@@ -477,6 +484,11 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
}
// Add MemWAL header if enabled
if let Some(true) = config.mem_wal_enabled {
headers.insert(MEM_WAL_ENABLED_HEADER, HeaderValue::from_static("true"));
}
Ok(headers)
}

View File

@@ -78,6 +78,7 @@ pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
pub const OPT_REMOTE_MEM_WAL_ENABLED: &str = "remote_database_mem_wal_enabled";
// TODO: add support for configuring client config via key/value options
#[derive(Clone, Debug, Default)]
@@ -98,6 +99,12 @@ pub struct RemoteDatabaseOptions {
/// These options are only used for LanceDB Enterprise and only a subset of options
/// are supported.
pub storage_options: HashMap<String, String>,
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations (insert, merge_insert, etc.) will use
/// the MemWAL architecture which buffers writes in memory and Write-Ahead Log
/// before asynchronously merging to the base table.
pub mem_wal_enabled: Option<bool>,
}
impl RemoteDatabaseOptions {
@@ -109,6 +116,9 @@ impl RemoteDatabaseOptions {
let api_key = map.get(OPT_REMOTE_API_KEY).cloned();
let region = map.get(OPT_REMOTE_REGION).cloned();
let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned();
let mem_wal_enabled = map
.get(OPT_REMOTE_MEM_WAL_ENABLED)
.map(|v| v.to_lowercase() == "true");
let storage_options = map
.iter()
.filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX))
@@ -119,6 +129,7 @@ impl RemoteDatabaseOptions {
region,
host_override,
storage_options,
mem_wal_enabled,
})
}
}
@@ -137,6 +148,12 @@ impl DatabaseOptions for RemoteDatabaseOptions {
if let Some(host_override) = &self.host_override {
map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone());
}
if let Some(mem_wal_enabled) = &self.mem_wal_enabled {
map.insert(
OPT_REMOTE_MEM_WAL_ENABLED.to_string(),
mem_wal_enabled.to_string(),
);
}
}
}
@@ -181,6 +198,20 @@ impl RemoteDatabaseOptionsBuilder {
self.options.host_override = Some(host_override);
self
}
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations will use the MemWAL architecture
/// which buffers writes in memory and Write-Ahead Log before
/// asynchronously merging to the base table.
///
/// # Arguments
///
/// * `enabled` - Whether to enable MemWAL writes
pub fn mem_wal_enabled(mut self, enabled: bool) -> Self {
self.options.mem_wal_enabled = Some(enabled);
self
}
}
#[derive(Debug)]
@@ -464,7 +495,6 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
};
let req = (callback)(req);
self.open_table(req).await

View File

@@ -62,6 +62,7 @@ use std::time::Duration;
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
@@ -1359,6 +1360,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let timeout = params.timeout;
let mem_wal = params.mem_wal;
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
@@ -1374,6 +1376,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
if mem_wal {
request = request.header(MEM_WAL_ENABLED_HEADER, "true");
}
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
let response = self.check_table_response(&request_id, response).await?;

View File

@@ -34,13 +34,9 @@ use lance_index::vector::sq::builder::SQBuildParams;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
pub use query::AnyQuery;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_namespace::LanceNamespace;
use lance_namespace::models::DescribeTableRequest;
use lance_table::format::Manifest;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::ManifestNamingScheme;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::format;
@@ -1216,13 +1212,10 @@ pub struct NativeTable {
// This comes from the connection options. We store here so we can pass down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
// Optional namespace client for namespace operations (e.g., managed versioning).
// pub(crate) so query.rs can access the field for server-side query execution.
// Optional namespace client for server-side query execution.
// When set, queries will be executed on the namespace server instead of locally.
// pub (crate) namespace_client so query.rs can access the fields
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
// Whether to enable server-side query execution via the namespace client.
// When true and namespace_client is set, queries will be executed on the
// namespace server instead of locally.
pub(crate) server_side_query_enabled: bool,
}
impl std::fmt::Debug for NativeTable {
@@ -1234,7 +1227,6 @@ impl std::fmt::Debug for NativeTable {
.field("uri", &self.uri)
.field("read_consistency_interval", &self.read_consistency_interval)
.field("namespace_client", &self.namespace_client)
.field("server_side_query_enabled", &self.server_side_query_enabled)
.finish()
}
}
@@ -1271,7 +1263,7 @@ impl NativeTable {
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, vec![], None, None, None, None, false, None).await
Self::open_with_params(uri, &name, vec![], None, None, None, None).await
}
/// Opens an existing Table
@@ -1281,10 +1273,7 @@ impl NativeTable {
/// * `base_path` - The base path where the table is located
/// * `name` The Table name
/// * `params` The [ReadParams] to use when opening the table
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
/// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client
/// is provided, the value will be fetched via describe_table.
/// * `namespace_client` - Optional namespace client for server-side query execution
///
/// # Returns
///
@@ -1298,8 +1287,6 @@ impl NativeTable {
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
managed_versioning: Option<bool>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
@@ -1308,54 +1295,17 @@ impl NativeTable {
None => params,
};
// Build table_id from namespace + name
let mut table_id = namespace.clone();
table_id.push(name.to_string());
// Determine if managed_versioning is enabled
// Use the provided value if available, otherwise query the namespace
let managed_versioning = match managed_versioning {
Some(value) => value,
None if namespace_client.is_some() => {
let ns_client = namespace_client.as_ref().unwrap();
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let response = ns_client
.describe_table(describe_request)
.await
.map_err(|e| Error::Runtime {
message: format!(
"Failed to describe table via namespace client: {}. \
If you don't need managed versioning, don't pass namespace_client.",
e
),
})?;
response.managed_versioning == Some(true)
}
None => false,
};
let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
// Set up commit handler when managed_versioning is enabled
if managed_versioning && let Some(ref ns_client) = namespace_client {
let external_store =
LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
external_manifest_store: Arc::new(external_store),
});
builder = builder.with_commit_handler(commit_handler);
}
let dataset = builder.load().await.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
source: Box::new(e),
},
e => e.into(),
})?;
let dataset = DatasetBuilder::from_uri(uri)
.with_read_params(params)
.load()
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
source: Box::new(e),
},
e => e.into(),
})?;
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
let id = Self::build_id(&namespace, name);
@@ -1368,7 +1318,6 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
})
}
@@ -1472,7 +1421,6 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
})
}
@@ -1512,8 +1460,7 @@ impl NativeTable {
/// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
/// * `batches` RecordBatch to be saved in the database.
/// * `params` - Write parameters.
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
/// * `namespace_client` - Optional namespace client for server-side query execution
///
/// # Returns
///
@@ -1528,7 +1475,6 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
@@ -1561,7 +1507,6 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client,
server_side_query_enabled,
})
}
@@ -1575,7 +1520,6 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
) -> Result<Self> {
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
Self::create(
@@ -1587,7 +1531,6 @@ impl NativeTable {
params,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
)
.await
}
@@ -1691,7 +1634,6 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
})
}
@@ -2683,7 +2625,7 @@ mod tests {
vec![Ok(batch.clone())],
batch.schema(),
));
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None, false)
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None)
.await
.unwrap();

View File

@@ -3,13 +3,12 @@
use std::sync::Arc;
use arrow_cast::can_cast_types;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
use datafusion::functions::core::{get_field, named_struct};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_expr::expressions::{Literal, cast};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
@@ -26,9 +25,12 @@ pub fn cast_to_table_schema(
return Ok(input);
}
let exprs = build_field_exprs(input_schema.fields(), table_schema.fields(), &|idx| {
Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>
})?;
let exprs = build_field_exprs(
input_schema.fields(),
table_schema.fields(),
&|idx| Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>,
&input_schema,
)?;
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
.into_iter()
@@ -49,6 +51,7 @@ fn build_field_exprs(
input_fields: &Fields,
table_fields: &Fields,
get_input_expr: &dyn Fn(usize) -> Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> Result<Vec<(Arc<dyn PhysicalExpr>, FieldRef)>> {
let config = Arc::new(ConfigOptions::default());
let mut result = Vec::new();
@@ -69,19 +72,24 @@ fn build_field_exprs(
(DataType::Struct(in_children), DataType::Struct(tbl_children))
if in_children != tbl_children =>
{
let sub_exprs = build_field_exprs(in_children, tbl_children, &|child_idx| {
let child_name = in_children[child_idx].name();
Arc::new(ScalarFunctionExpr::new(
&format!("get_field({child_name})"),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
],
Arc::new(in_children[child_idx].as_ref().clone()),
config.clone(),
)) as Arc<dyn PhysicalExpr>
})?;
let sub_exprs = build_field_exprs(
in_children,
tbl_children,
&|child_idx| {
let child_name = in_children[child_idx].name();
Arc::new(ScalarFunctionExpr::new(
&format!("get_field({child_name})"),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
],
Arc::new(in_children[child_idx].as_ref().clone()),
config.clone(),
)) as Arc<dyn PhysicalExpr>
},
input_schema,
)?;
let output_struct_fields: Fields = sub_exprs
.iter()
@@ -117,21 +125,17 @@ fn build_field_exprs(
// Types match: pass through.
(inp, tbl) if inp == tbl => input_expr,
// Types differ: cast.
// safe: false (the default) means overflow/truncation errors surface at execution time.
(_, _) if can_cast_types(input_field.data_type(), table_field.data_type()) => Arc::new(
CastExpr::new(input_expr, table_field.data_type().clone(), None),
)
as Arc<dyn PhysicalExpr>,
(inp, tbl) => {
return Err(Error::InvalidInput {
_ => cast(input_expr, input_schema, table_field.data_type().clone()).map_err(|e| {
Error::InvalidInput {
message: format!(
"cannot cast field '{}' from {} to {}",
"cannot cast field '{}' from {} to {}: {}",
table_field.name(),
inp,
tbl,
input_field.data_type(),
table_field.data_type(),
e
),
});
}
}
})?,
};
let output_field = Arc::new(Field::new(
@@ -149,12 +153,10 @@ fn build_field_exprs(
mod tests {
use std::sync::Arc;
use arrow::buffer::OffsetBuffer;
use arrow_array::{
Array, Float32Array, Float64Array, Int32Array, Int64Array, ListArray, RecordBatch,
StringArray, StructArray, UInt32Array, UInt64Array,
Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
};
use arrow_schema::{DataType, Field, Fields, Schema};
use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use futures::TryStreamExt;
@@ -493,129 +495,4 @@ mod tests {
assert_eq!(b.value(0), "hello");
assert_eq!(b.value(1), "world");
}
#[tokio::test]
async fn test_narrowing_numeric_cast_success() {
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
vec![Arc::new(UInt64Array::from(vec![1u64, 2, 3]))],
)
.unwrap();
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
let plan = plan_from_batch(input_batch).await;
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let result = collect(casted).await;
assert_eq!(result.schema().field(0).data_type(), &DataType::UInt32);
let a: &UInt32Array = result.column(0).as_any().downcast_ref().unwrap();
assert_eq!(a.values(), &[1u32, 2, 3]);
}
#[tokio::test]
async fn test_narrowing_numeric_cast_overflow_errors() {
let overflow_val = u32::MAX as u64 + 1;
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
vec![Arc::new(UInt64Array::from(vec![overflow_val]))],
)
.unwrap();
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
let plan = plan_from_batch(input_batch).await;
// Planning succeeds — the overflow is only detected at execution time.
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let ctx = SessionContext::new();
let stream = casted.execute(0, ctx.task_ctx()).unwrap();
let result: Result<Vec<RecordBatch>, _> = stream.try_collect().await;
assert!(result.is_err(), "expected overflow error at execution time");
}
#[tokio::test]
async fn test_list_struct_field_reorder() {
// list<struct<a: Int32, b: Int32>> → list<struct<b: Int64, a: Int64>>
// Tests both reordering (a,b → b,a) and element-type widening (Int32 → Int64).
let inner_fields: Fields = vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]
.into();
let struct_array = StructArray::from(vec![
(
Arc::new(inner_fields[0].as_ref().clone()),
Arc::new(Int32Array::from(vec![1, 3])) as _,
),
(
Arc::new(inner_fields[1].as_ref().clone()),
Arc::new(Int32Array::from(vec![2, 4])) as _,
),
]);
// Offsets: one list element containing two struct rows (0..2).
let offsets = OffsetBuffer::from_lengths(vec![2]);
let list_array = ListArray::try_new(
Arc::new(Field::new("item", DataType::Struct(inner_fields), true)),
offsets,
Arc::new(struct_array),
None,
)
.unwrap();
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new(
"s_list",
list_array.data_type().clone(),
false,
)])),
vec![Arc::new(list_array)],
)
.unwrap();
let table_inner: Fields = vec![
Field::new("b", DataType::Int64, true),
Field::new("a", DataType::Int64, true),
]
.into();
let table_schema = Schema::new(vec![Field::new(
"s_list",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(table_inner),
true,
))),
false,
)]);
let plan = plan_from_batch(input_batch).await;
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let result = collect(casted).await;
let list_col = result
.column(0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let struct_col = list_col
.values()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
assert_eq!(struct_col.num_columns(), 2);
let b: &Int64Array = struct_col
.column_by_name("b")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(b.values(), &[2, 4]);
let a: &Int64Array = struct_col
.column_by_name("a")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(a.values(), &[1, 3]);
}
}

View File

@@ -1,4 +1,3 @@
use futures::FutureExt;
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use serde::{Deserialize, Serialize};
@@ -24,7 +23,7 @@ pub struct DeleteResult {
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let delete_result = dataset.delete(predicate).boxed().await?;
let delete_result = dataset.delete(predicate).await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);

View File

@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) mem_wal: bool,
}
impl MergeInsertBuilder {
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
mem_wal: false,
}
}
@@ -148,13 +150,65 @@ impl MergeInsertBuilder {
self
}
/// Enables MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
///
/// When enabled, the merge insert will route data through a memory node service
/// that buffers writes before flushing to storage. This is only supported for
/// remote (LanceDB Cloud) tables.
///
/// If not set, defaults to `false`.
pub fn mem_wal(&mut self, enabled: bool) -> &mut Self {
self.mem_wal = enabled;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
/// inserted, updated, and deleted.
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<MergeResult> {
// Validate MemWAL constraints before execution
if self.mem_wal {
self.validate_mem_wal_pattern()?;
}
self.table.clone().merge_insert(self, new_data).await
}
/// Validate that the merge insert pattern is supported by MemWAL.
///
/// MemWAL only supports the upsert pattern:
/// - when_matched_update_all (without filter)
/// - when_not_matched_insert_all
/// - NO when_not_matched_by_source_delete
fn validate_mem_wal_pattern(&self) -> Result<()> {
// Must have when_matched_update_all without filter
if !self.when_matched_update_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_matched_update_all() to be set".to_string(),
});
}
if self.when_matched_update_all_filt.is_some() {
return Err(Error::InvalidInput {
message: "MemWAL does not support conditional when_matched_update_all (no filter allowed)".to_string(),
});
}
// Must have when_not_matched_insert_all
if !self.when_not_matched_insert_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_not_matched_insert_all() to be set".to_string(),
});
}
// Must NOT have when_not_matched_by_source_delete
if self.when_not_matched_by_source_delete {
return Err(Error::InvalidInput {
message: "MemWAL does not support when_not_matched_by_source_delete()".to_string(),
});
}
Ok(())
}
}
/// Internal implementation of the merge insert logic
@@ -165,6 +219,14 @@ pub(crate) async fn execute_merge_insert(
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
if params.mem_wal {
return Err(Error::NotSupported {
message: "MemWAL is not supported for native (local) tables. \
MemWAL is only available for remote (LanceDB Cloud) tables."
.to_string(),
});
}
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -324,4 +386,139 @@ mod tests {
merge_insert_builder.execute(new_batches).await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
#[tokio::test]
async fn test_mem_wal_validation_valid_pattern() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test", batches)
.execute()
.await
.unwrap();
// Valid MemWAL pattern: when_matched_update_all + when_not_matched_insert_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.when_not_matched_insert_all();
builder.mem_wal(true);
// Should fail because native tables don't support MemWAL, but validation passes
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("MemWAL is not supported for native"),
"Expected native table error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_missing_when_matched() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test2", batches)
.execute()
.await
.unwrap();
// Missing when_matched_update_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_not_matched_insert_all();
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("requires when_matched_update_all"),
"Expected validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_missing_when_not_matched() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test3", batches)
.execute()
.await
.unwrap();
// Missing when_not_matched_insert_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("requires when_not_matched_insert_all"),
"Expected validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_with_filter() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test4", batches)
.execute()
.await
.unwrap();
// With conditional filter - not allowed
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(Some("target.age > 0".to_string()));
builder.when_not_matched_insert_all();
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("does not support conditional"),
"Expected filter validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_with_delete() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test5", batches)
.execute()
.await
.unwrap();
// With when_not_matched_by_source_delete - not allowed
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.when_not_matched_insert_all();
builder.when_not_matched_by_source_delete(None);
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("does not support when_not_matched_by_source_delete"),
"Expected delete validation error, got: {}",
err
);
}
}

View File

@@ -40,10 +40,8 @@ pub async fn execute_query(
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// If server-side query is enabled and namespace client is configured, use server-side query execution
if table.server_side_query_enabled
&& let Some(ref namespace_client) = table.namespace_client
{
// If namespace client is configured, use server-side query execution
if let Some(ref namespace_client) = table.namespace_client {
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
}
execute_generic_query(table, query, options).await