Compare commits

..

1 Commits

Author SHA1 Message Date
Lance Release
c874d39f4b Bump version: 0.28.0-beta.6 → 0.28.0-beta.7 2026-04-17 08:12:19 +00:00
27 changed files with 437 additions and 481 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.28.0-beta.9"
current_version = "0.28.0-beta.7"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

6
Cargo.lock generated
View File

@@ -4576,7 +4576,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.28.0-beta.8"
version = "0.28.0-beta.6"
dependencies = [
"ahash",
"anyhow",
@@ -4658,7 +4658,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.28.0-beta.8"
version = "0.28.0-beta.6"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4680,7 +4680,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.31.0-beta.8"
version = "0.31.0-beta.6"
dependencies = [
"arrow",
"async-trait",

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.28.0-beta.9</version>
<version>0.28.0-beta.7</version>
</dependency>
```

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.9</version>
<version>0.28.0-beta.7</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.9</version>
<version>0.28.0-beta.7</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.28.0-beta.9"
version = "0.28.0-beta.7"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -1,8 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import { spawn } from "node:child_process";
import * as path from "node:path";
import { RecordBatch } from "apache-arrow";
import * as tmp from "tmp";
import { Connection, Index, Table, connect, makeArrowTable } from "../lancedb";
@@ -78,91 +76,4 @@ describe("rerankers", function () {
expect(result).toHaveLength(2);
});
it("does not keep process alive after rerank query", async function () {
const script = `
import * as lancedb from "./dist/index.js";
import * as os from "node:os";
import * as path from "node:path";
import * as fs from "node:fs/promises";
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "lancedb-rerank-exit-"));
const db = await lancedb.connect(dir);
const table = await db.createTable("test", [{ text: "hello", vector: [1, 2, 3] }], {
mode: "overwrite",
});
await table.createIndex("text", { config: lancedb.Index.fts() });
await table.waitForIndex(["text_idx"], 30);
const reranker = await lancedb.rerankers.RRFReranker.create();
await table
.query()
.nearestTo([1, 2, 3])
.fullTextSearch("hello")
.rerank(reranker)
.toArray();
table.close();
db.close();
`;
await new Promise<void>((resolve, reject) => {
const child = spawn(
process.execPath,
["--input-type=module", "-e", script],
{
cwd: path.resolve(__dirname, ".."),
stdio: ["ignore", "pipe", "pipe"],
},
);
let stdout = "";
let stderr = "";
child.stdout.on("data", (chunk) => {
stdout += chunk.toString();
});
child.stderr.on("data", (chunk) => {
stderr += chunk.toString();
});
const timeout = setTimeout(() => {
child.kill();
reject(
new Error(
`child process did not exit in time\nstdout:\n${stdout}\nstderr:\n${stderr}`,
),
);
}, 20_000);
child.on("error", (err) => {
clearTimeout(timeout);
reject(err);
});
child.on("exit", (code, signal) => {
clearTimeout(timeout);
if (signal !== null) {
reject(
new Error(
`child process exited with signal ${signal}\nstdout:\n${stdout}\nstderr:\n${stderr}`,
),
);
return;
}
if (code !== 0) {
reject(
new Error(
`child process exited with code ${code}\nstdout:\n${stdout}\nstderr:\n${stderr}`,
),
);
return;
}
resolve();
});
});
});
});

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.28.0-beta.8",
"version": "0.28.0-beta.6",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.28.0-beta.8",
"version": "0.28.0-beta.6",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.7",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -18,7 +18,6 @@ type RerankHybridFn = ThreadsafeFunction<
RerankHybridCallbackArgs,
Status,
false,
true,
>;
/// Reranker implementation that "wraps" a NodeJS Reranker implementation.
@@ -33,10 +32,7 @@ impl Reranker {
pub fn new(
rerank_hybrid: Function<RerankHybridCallbackArgs, Promise<Buffer>>,
) -> napi::Result<Self> {
let rerank_hybrid = rerank_hybrid
.build_threadsafe_function()
.weak::<true>()
.build()?;
let rerank_hybrid = rerank_hybrid.build_threadsafe_function().build()?;
Ok(Self { rerank_hybrid })
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.31.0-beta.9"
current_version = "0.31.0-beta.7"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.31.0-beta.9"
version = "0.31.0-beta.7"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -10,6 +10,7 @@ through a namespace abstraction.
from __future__ import annotations
import asyncio
import sys
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union
@@ -24,24 +25,7 @@ if TYPE_CHECKING:
from datetime import timedelta
import pyarrow as pa
from lance_namespace_urllib3_client.models.json_arrow_data_type import JsonArrowDataType
from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField
from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema
from lance_namespace_urllib3_client.models.query_table_request import QueryTableRequest
from lance_namespace_urllib3_client.models.query_table_request_columns import (
QueryTableRequestColumns,
)
from lance_namespace_urllib3_client.models.query_table_request_full_text_query import (
QueryTableRequestFullTextQuery,
)
from lance_namespace_urllib3_client.models.query_table_request_vector import (
QueryTableRequestVector,
)
from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery
from lance_namespace.errors import TableNotFoundError
from lancedb._lancedb import connect_namespace_client as _connect_namespace_client
from lancedb.background_loop import LOOP
from lancedb.db import AsyncConnection, DBConnection
from lancedb.db import DBConnection, LanceDBConnection
from lancedb.namespace_utils import (
_normalize_create_namespace_mode,
_normalize_drop_namespace_mode,
@@ -56,11 +40,14 @@ from lance_namespace import (
ListNamespacesResponse,
ListTablesResponse,
ListTablesRequest,
DescribeTableRequest,
DescribeNamespaceRequest,
DropTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
DeclareTableRequest,
CreateTableRequest,
)
from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name
@@ -69,6 +56,21 @@ from lancedb.pydantic import LanceModel
from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema
from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField
from lance_namespace_urllib3_client.models.json_arrow_data_type import JsonArrowDataType
from lance_namespace_urllib3_client.models.query_table_request import QueryTableRequest
from lance_namespace_urllib3_client.models.query_table_request_vector import (
QueryTableRequestVector,
)
from lance_namespace_urllib3_client.models.query_table_request_columns import (
QueryTableRequestColumns,
)
from lance_namespace_urllib3_client.models.query_table_request_full_text_query import (
QueryTableRequestFullTextQuery,
)
from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery
def _query_to_namespace_request(
table_id: List[str],
@@ -422,23 +424,6 @@ class LanceNamespaceDBConnection(DBConnection):
)
self._namespace_client_impl = namespace_client_impl
self._namespace_client_properties = namespace_client_properties
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=self.storage_options or None,
session=session,
namespace_client_pushdown_operations=(
list(self._namespace_client_pushdown_operations)
),
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
)
)
@override
def serialize(self) -> str:
@@ -512,10 +497,13 @@ class LanceNamespaceDBConnection(DBConnection):
if mode.lower() not in ["create", "overwrite"]:
raise ValueError("mode must be either 'create' or 'overwrite'")
validate_table_name(name)
async_table = LOOP.run(
self._inner.create_table(
name,
data,
table_id = namespace_path + [name]
if "CreateTable" in self._namespace_client_pushdown_operations:
return self._create_table_server_side(
name=name,
data=data,
schema=schema,
mode=mode,
exist_ok=exist_ok,
@@ -525,15 +513,130 @@ class LanceNamespaceDBConnection(DBConnection):
namespace_path=namespace_path,
storage_options=storage_options,
)
# Local create path: declare_table + local write
# Step 1: Get the table location and storage options from namespace
# In overwrite mode, if table exists, use describe_table to get
# existing location. Otherwise, call create_empty_table to reserve
# a new location
location = None
namespace_storage_options = None
if mode.lower() == "overwrite":
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._namespace_client.describe_table(
describe_request
)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
# Table doesn't exist, will create a new one below
pass
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
declare_request = DeclareTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._namespace_client.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
"Table location is missing from declare_table response"
)
location = declare_response.location
namespace_storage_options = declare_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if namespace_storage_options:
merged_storage_options.update(namespace_storage_options)
# Step 2: Create table using LanceTable.create with the location
# We need a temporary connection for the LanceTable.create method
temp_conn = LanceDBConnection(
location, # Use the actual location as the connection URI
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
return LanceTable(
self,
# Note: storage_options_provider is auto-created in Rust from namespace_client
tbl = LanceTable.create(
temp_conn,
name,
data,
schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace_path=namespace_path,
storage_options=merged_storage_options,
location=location,
namespace_client=self._namespace_client,
pushdown_operations=self._namespace_client_pushdown_operations,
_async=async_table,
)
return tbl
def _create_table_server_side(
self,
name: str,
data: Optional[DATA],
schema: Optional[Union[pa.Schema, LanceModel]],
mode: str,
exist_ok: bool,
on_bad_vectors: str,
fill_value: float,
embedding_functions: Optional[List[EmbeddingFunctionConfig]],
namespace_path: Optional[List[str]],
storage_options: Optional[Dict[str, str]],
) -> Table:
"""Create a table using server-side namespace.create_table()."""
if namespace_path is None:
namespace_path = []
table_id = namespace_path + [name]
arrow_ipc_bytes = _data_to_arrow_ipc(
data=data,
schema=schema,
embedding_functions=embedding_functions,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
merged = dict(self.storage_options or {})
if storage_options:
merged.update(storage_options)
request = CreateTableRequest(
id=table_id,
mode=_normalize_create_table_mode(mode),
properties=merged or None,
)
try:
self._namespace_client.create_table(request, arrow_ipc_bytes)
except Exception as e:
if exist_ok and "already exists" in str(e).lower():
return self.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
)
raise
return self.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
)
@override
@@ -547,28 +650,30 @@ class LanceNamespaceDBConnection(DBConnection):
) -> Table:
if namespace_path is None:
namespace_path = []
try:
async_table = LOOP.run(
self._inner.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
)
except RuntimeError as e:
if "Table not found" in str(e):
table_id = namespace_path + [name]
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
raise
table_id = namespace_path + [name]
request = DescribeTableRequest(id=table_id)
response = self._namespace_client.describe_table(request)
return LanceTable(
self,
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if response.storage_options:
merged_storage_options.update(response.storage_options)
# Pass managed_versioning to avoid redundant describe_table call in Rust.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
# Note: storage_options_provider is auto-created in Rust from namespace_client
return self._lance_table_from_uri(
name,
response.location,
namespace_path=namespace_path,
storage_options=merged_storage_options,
index_cache_size=index_cache_size,
namespace_client=self._namespace_client,
pushdown_operations=self._namespace_client_pushdown_operations,
_async=async_table,
managed_versioning=managed_versioning,
)
@override
@@ -792,34 +897,33 @@ class LanceNamespaceDBConnection(DBConnection):
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
) -> LanceTable:
# Open a table directly from the namespace-resolved physical location.
#
# Open the table through the Rust namespace-backed connection. The Rust
# layer keeps the logical namespace path and namespace client intact.
# Open a table directly from a URI using the location parameter
# Note: storage_options should already be merged by the caller
# Note: storage_options_provider is auto-created in Rust from namespace_client
if namespace_path is None:
namespace_path = []
async_table = LOOP.run(
self._inner.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
location=None,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
temp_conn = LanceDBConnection(
table_uri, # Use the table location as the connection URI
read_consistency_interval=self.read_consistency_interval,
storage_options=storage_options if storage_options is not None else {},
session=self.session,
)
return LanceTable(
self,
# Open the table using the temporary connection with the location parameter
# Pass namespace_client to enable managed versioning support and auto-create
# storage options provider
# Pass managed_versioning to avoid redundant describe_table call
# Pass pushdown_operations if configured on this connection
return LanceTable.open(
temp_conn,
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
location=table_uri,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
pushdown_operations=self._namespace_client_pushdown_operations,
_async=async_table,
)
@override
@@ -886,23 +990,6 @@ class AsyncLanceNamespaceDBConnection:
self._namespace_client_pushdown_operations = set(
namespace_client_pushdown_operations or []
)
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=self.storage_options or None,
session=session,
namespace_client_pushdown_operations=(
list(self._namespace_client_pushdown_operations)
),
namespace_client_impl=None,
namespace_client_properties=None,
)
)
async def table_names(
self,
@@ -954,16 +1041,148 @@ class AsyncLanceNamespaceDBConnection:
if mode.lower() not in ["create", "overwrite"]:
raise ValueError("mode must be either 'create' or 'overwrite'")
validate_table_name(name)
return await self._inner.create_table(
table_id = namespace_path + [name]
if "CreateTable" in self._namespace_client_pushdown_operations:
return await self._create_table_server_side(
name=name,
data=data,
schema=schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace_path=namespace_path,
storage_options=storage_options,
)
# Local create path: declare_table + local write
# Step 1: Get the table location and storage options from namespace
location = None
namespace_storage_options = None
if mode.lower() == "overwrite":
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._namespace_client.describe_table(
describe_request
)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
# Table doesn't exist, will create a new one below
pass
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
declare_request = DeclareTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._namespace_client.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
"Table location is missing from declare_table response"
)
location = declare_response.location
namespace_storage_options = declare_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if namespace_storage_options:
merged_storage_options.update(namespace_storage_options)
# Step 2: Create table using LanceTable.create with the location
# Run the sync operation in a thread
def _create_table():
temp_conn = LanceDBConnection(
location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
# storage_options_provider is auto-created in Rust from namespace_client
return LanceTable.create(
temp_conn,
name,
data,
schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace_path=namespace_path,
storage_options=merged_storage_options,
location=location,
namespace_client=self._namespace_client,
pushdown_operations=self._namespace_client_pushdown_operations,
)
lance_table = await asyncio.to_thread(_create_table)
# Get the underlying async table from LanceTable
return lance_table._table
async def _create_table_server_side(
self,
name: str,
data: Optional[DATA],
schema: Optional[Union[pa.Schema, LanceModel]],
mode: str,
exist_ok: bool,
on_bad_vectors: str,
fill_value: float,
embedding_functions: Optional[List[EmbeddingFunctionConfig]],
namespace_path: Optional[List[str]],
storage_options: Optional[Dict[str, str]],
) -> AsyncTable:
"""Create a table using server-side namespace.create_table()."""
if namespace_path is None:
namespace_path = []
table_id = namespace_path + [name]
def _prepare_and_create():
arrow_ipc_bytes = _data_to_arrow_ipc(
data=data,
schema=schema,
embedding_functions=embedding_functions,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
merged = dict(self.storage_options or {})
if storage_options:
merged.update(storage_options)
request = CreateTableRequest(
id=table_id,
mode=_normalize_create_table_mode(mode),
properties=merged or None,
)
self._namespace_client.create_table(request, arrow_ipc_bytes)
try:
await asyncio.to_thread(_prepare_and_create)
except Exception as e:
if exist_ok and "already exists" in str(e).lower():
return await self.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
)
raise
return await self.open_table(
name,
data,
schema=schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
namespace_path=namespace_path,
embedding_functions=embedding_functions,
storage_options=storage_options,
)
@@ -978,18 +1197,45 @@ class AsyncLanceNamespaceDBConnection:
"""Open an existing table from the namespace."""
if namespace_path is None:
namespace_path = []
try:
return await self._inner.open_table(
table_id = namespace_path + [name]
request = DescribeTableRequest(id=table_id)
response = self._namespace_client.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if response.storage_options:
merged_storage_options.update(response.storage_options)
# Capture managed_versioning from describe response.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
# Open table in a thread
# Note: storage_options_provider is auto-created in Rust from namespace_client
def _open_table():
temp_conn = LanceDBConnection(
response.location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
return LanceTable.open(
temp_conn,
name,
namespace_path=namespace_path,
storage_options=storage_options,
storage_options=merged_storage_options,
index_cache_size=index_cache_size,
location=response.location,
namespace_client=self._namespace_client,
managed_versioning=managed_versioning,
pushdown_operations=self._namespace_client_pushdown_operations,
)
except RuntimeError as e:
if "Table not found" in str(e):
table_id = namespace_path + [name]
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
raise
lance_table = await asyncio.to_thread(_open_table)
return lance_table._table
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
"""Drop a table from the namespace."""

View File

@@ -1,17 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
use lancedb::{
connection::Connection as LanceConnection,
connection::NamespaceClientPushdownOperation,
database::namespace::LanceNamespaceDatabase,
database::{CreateTableMode, Database, ReadConsistency},
};
use pyo3::{
@@ -45,29 +39,6 @@ impl Connection {
}
}
fn parse_namespace_client_pushdown_operations(
operations: Option<Vec<String>>,
) -> PyResult<HashSet<NamespaceClientPushdownOperation>> {
let mut parsed = HashSet::new();
for operation in operations.unwrap_or_default() {
match operation.as_str() {
"QueryTable" => {
parsed.insert(NamespaceClientPushdownOperation::QueryTable);
}
"CreateTable" => {
parsed.insert(NamespaceClientPushdownOperation::CreateTable);
}
_ => {
return Err(PyValueError::new_err(format!(
"Invalid pushdown operation: {}",
operation
)));
}
}
}
Ok(parsed)
}
impl Connection {
fn parse_create_mode_str(mode: &str) -> PyResult<CreateTableMode> {
match mode {
@@ -567,52 +538,6 @@ pub fn connect(
})
}
#[pyfunction]
#[pyo3(signature = (
namespace_client,
read_consistency_interval=None,
storage_options=None,
session=None,
namespace_client_pushdown_operations=None,
namespace_client_impl=None,
namespace_client_properties=None,
))]
#[allow(clippy::too_many_arguments)]
pub fn connect_namespace_client(
py: Python<'_>,
namespace_client: Py<PyAny>,
read_consistency_interval: Option<f64>,
storage_options: Option<HashMap<String, String>>,
session: Option<crate::session::Session>,
namespace_client_pushdown_operations: Option<Vec<String>>,
namespace_client_impl: Option<String>,
namespace_client_properties: Option<HashMap<String, String>>,
) -> PyResult<Connection> {
let namespace_client = extract_namespace_arc(py, namespace_client)?;
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
let namespace_client_pushdown_operations =
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string());
let ns_properties = namespace_client_properties.unwrap_or_default();
let storage_options = storage_options.unwrap_or_default();
let session = session.map(|s| s.inner.clone());
let database = LanceNamespaceDatabase::from_namespace_client(
namespace_client,
ns_impl,
ns_properties,
storage_options,
read_consistency_interval,
session,
namespace_client_pushdown_operations,
);
Ok(Connection::new(LanceConnection::new(
Arc::new(database),
Arc::new(lancedb::embeddings::MemoryRegistry::new()),
)))
}
#[derive(FromPyObject)]
pub struct PyClientConfig {
user_agent: String,

View File

@@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use arrow::RecordBatchStream;
use connection::{Connection, connect, connect_namespace_client};
use connection::{Connection, connect};
use env_logger::Env;
use expr::{PyExpr, expr_col, expr_func, expr_lit};
use index::IndexConfig;
@@ -58,7 +58,6 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyPermutationReader>()?;
m.add_class::<PyExpr>()?;
m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add_function(wrap_pyfunction!(connect_namespace_client, m)?)?;
m.add_function(wrap_pyfunction!(permutation::async_permutation_builder, m)?)?;
m.add_function(wrap_pyfunction!(util::validate_table_name, m)?)?;
m.add_function(wrap_pyfunction!(query::fts_query_to_json, m)?)?;

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.28.0-beta.9"
version = "0.28.0-beta.7"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -915,7 +915,7 @@ use std::collections::HashSet;
/// These operations will be executed on the namespace server instead of locally
/// when enabled via [`ConnectNamespaceBuilder::pushdown_operations`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NamespaceClientPushdownOperation {
pub enum PushdownOperation {
/// Execute queries on the namespace server via `query_table()` instead of locally.
QueryTable,
/// Execute table creation on the namespace server via `create_table()`
@@ -931,7 +931,7 @@ pub struct ConnectNamespaceBuilder {
read_consistency_interval: Option<std::time::Duration>,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
}
impl ConnectNamespaceBuilder {
@@ -1029,11 +1029,11 @@ impl ConnectNamespaceBuilder {
/// and leveraging server-side compute resources.
///
/// Available operations:
/// - [`NamespaceClientPushdownOperation::QueryTable`]: Execute queries via `namespace.query_table()`
/// - [`NamespaceClientPushdownOperation::CreateTable`]: Execute table creation via `namespace.create_table()`
/// - [`PushdownOperation::QueryTable`]: Execute queries via `namespace.query_table()`
/// - [`PushdownOperation::CreateTable`]: Execute table creation via `namespace.create_table()`
///
/// By default, no operations are pushed down (all executed locally).
pub fn pushdown_operation(mut self, operation: NamespaceClientPushdownOperation) -> Self {
pub fn pushdown_operation(mut self, operation: PushdownOperation) -> Self {
self.pushdown_operations.insert(operation);
self
}
@@ -1043,7 +1043,7 @@ impl ConnectNamespaceBuilder {
/// See [`Self::pushdown_operation`] for details.
pub fn pushdown_operations(
mut self,
operations: impl IntoIterator<Item = NamespaceClientPushdownOperation>,
operations: impl IntoIterator<Item = PushdownOperation>,
) -> Self {
self.pushdown_operations.extend(operations);
self

View File

@@ -22,11 +22,10 @@ use lance_namespace_impls::ConnectBuilder;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::connection::NamespaceClientPushdownOperation;
use crate::connection::PushdownOperation;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
use crate::table::NativeTable;
use lance::dataset::WriteMode;
use super::{
BaseTable, CloneTableRequest, CreateTableMode, CreateTableRequest as DbCreateTableRequest,
@@ -45,7 +44,7 @@ pub struct LanceNamespaceDatabase {
// database URI
uri: String,
// Operations to push down to the namespace server
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
// Namespace implementation type (e.g., "dir", "rest")
ns_impl: String,
// Namespace properties used to construct the namespace client
@@ -53,34 +52,13 @@ pub struct LanceNamespaceDatabase {
}
impl LanceNamespaceDatabase {
pub fn from_namespace_client(
namespace_client: Arc<dyn LanceNamespace>,
namespace_client_impl: String,
namespace_client_properties: HashMap<String, String>,
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Self {
Self {
namespace: namespace_client,
storage_options,
read_consistency_interval,
session,
uri: format!("namespace://{}", namespace_client_impl),
pushdown_operations: namespace_client_pushdown_operations,
ns_impl: namespace_client_impl,
ns_properties: namespace_client_properties,
}
}
pub async fn connect(
ns_impl: &str,
ns_properties: HashMap<String, String>,
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
) -> Result<Self> {
let mut builder = ConnectBuilder::new(ns_impl);
for (key, value) in ns_properties.clone() {
@@ -185,23 +163,37 @@ impl Database for LanceNamespaceDatabase {
async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let mut table_id = request.namespace_path.clone();
table_id.push(request.name.clone());
let mut existing_table = None;
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
match request.mode {
CreateTableMode::Create => {}
CreateTableMode::Create => {
if describe_result.is_ok() {
return Err(Error::TableAlreadyExists {
name: request.name.clone(),
});
}
}
CreateTableMode::Overwrite => {
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
existing_table = self.namespace.describe_table(describe_request).await.ok();
if describe_result.is_ok() {
// Drop the existing table - must succeed
let drop_request = DropTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
self.namespace
.drop_table(drop_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to drop existing table for overwrite: {}", e),
})?;
}
}
CreateTableMode::ExistOk(_) => {
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
if describe_result.is_ok() {
let native_table = NativeTable::open_from_namespace(
self.namespace.clone(),
@@ -229,82 +221,21 @@ impl Database for LanceNamespaceDatabase {
};
let (location, initial_storage_options, managed_versioning) = {
if let Some(response) = existing_table {
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from describe_table response".to_string(),
})?;
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts, response.managed_versioning)
} else {
match self.namespace.declare_table(declare_request).await {
Ok(response) => {
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response"
.to_string(),
})?;
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o: &HashMap<String, String>| !o.is_empty());
(loc, opts, response.managed_versioning)
}
Err(e)
if matches!(request.mode, CreateTableMode::Create) && {
let err_str = e.to_string();
err_str.contains("already exists")
|| err_str.contains("TableAlreadyExists")
|| err_str.contains("table already exists")
} =>
{
let response = self
.namespace
.describe_table(DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
})
.await
.map_err(|describe_err| Error::Runtime {
message: format!(
"Failed to describe existing declared table after declare conflict: {}",
describe_err
),
})?;
if response.version.is_some() && response.schema.is_some() {
return Err(Error::TableAlreadyExists {
name: request.name.clone(),
});
}
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from describe_table response"
.to_string(),
})?;
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o: &HashMap<String, String>| !o.is_empty());
(loc, opts, response.managed_versioning)
}
Err(e) => {
return Err(Error::Runtime {
message: format!("Failed to declare table: {}", e),
});
}
}
}
let response = self.namespace.declare_table(declare_request).await?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts, response.managed_versioning)
};
// Build write params with storage options and commit handler
let mut params = request.write_options.lance_write_params.unwrap_or_default();
if matches!(request.mode, CreateTableMode::Overwrite) {
params.mode = WriteMode::Overwrite;
}
// Set up storage options if provided
if let Some(storage_opts) = initial_storage_options {
let store_params = params
@@ -761,58 +692,6 @@ mod tests {
assert_eq!(id_col.value(2), 30);
}
#[tokio::test]
async fn test_namespace_create_table_after_declare_conflict() {
let tmp_dir = tempdir().unwrap();
let root_path = tmp_dir.path().to_str().unwrap().to_string();
let mut properties = HashMap::new();
properties.insert("root".to_string(), root_path);
let conn = connect_namespace("dir", properties)
.execute()
.await
.expect("Failed to connect to namespace");
conn.create_namespace(CreateNamespaceRequest {
id: Some(vec!["test_ns".into()]),
..Default::default()
})
.await
.expect("Failed to create namespace");
let namespace_client = conn.namespace_client().await.unwrap();
namespace_client
.declare_table(DeclareTableRequest {
id: Some(vec!["test_ns".into(), "declared_test".into()]),
..Default::default()
})
.await
.expect("Failed to declare table");
let test_data = create_test_data();
let table = conn
.create_table("declared_test", test_data)
.namespace(vec!["test_ns".into()])
.execute()
.await
.expect("Failed to create table after declare conflict");
let results = table
.query()
.execute()
.await
.expect("Failed to query table")
.try_collect::<Vec<_>>()
.await
.expect("Failed to collect results");
assert_eq!(results.len(), 1);
assert_eq!(results[0].num_rows(), 5);
assert_eq!(table.namespace(), &["test_ns"]);
assert_eq!(table.id(), "test_ns$declared_test");
}
#[tokio::test]
async fn test_namespace_create_table_exist_ok_mode() {
// Setup: Create a temporary directory for the namespace

View File

@@ -47,7 +47,7 @@ use std::format;
use std::path::Path;
use std::sync::Arc;
use crate::connection::NamespaceClientPushdownOperation;
use crate::connection::PushdownOperation;
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
use crate::database::Database;
@@ -1272,7 +1272,7 @@ pub struct NativeTable {
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
// Operations to push down to the namespace server.
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pub(crate) pushdown_operations: HashSet<PushdownOperation>,
}
impl std::fmt::Debug for NativeTable {
@@ -1359,7 +1359,7 @@ impl NativeTable {
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
managed_versioning: Option<bool>,
) -> Result<Self> {
let params = params.unwrap_or_default();
@@ -1470,7 +1470,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
session: Option<Arc<lance::session::Session>>,
) -> Result<Self> {
let mut params = params.unwrap_or_default();
@@ -1518,7 +1518,7 @@ impl NativeTable {
let id = Self::build_id(&namespace, name);
let stored_namespace_client =
if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
Some(namespace_client)
} else {
None
@@ -1588,7 +1588,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
@@ -1635,7 +1635,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
) -> Result<Self> {
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
Self::create(
@@ -1685,7 +1685,7 @@ impl NativeTable {
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
pushdown_operations: HashSet<PushdownOperation>,
session: Option<Arc<lance::session::Session>>,
) -> Result<Self> {
// Build table_id from namespace + name for the storage options provider
@@ -1738,7 +1738,7 @@ impl NativeTable {
let id = Self::build_id(&namespace, name);
let stored_namespace_client =
if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
if pushdown_operations.contains(&PushdownOperation::QueryTable) {
Some(namespace_client)
} else {
None

View File

@@ -4,7 +4,7 @@
use std::sync::Arc;
use super::NativeTable;
use crate::connection::NamespaceClientPushdownOperation;
use crate::connection::PushdownOperation;
use crate::error::{Error, Result};
use crate::expr::expr_to_sql_string;
use crate::query::{
@@ -44,7 +44,7 @@ pub async fn execute_query(
// If QueryTable pushdown is enabled and namespace client is configured, use server-side query execution
if table
.pushdown_operations
.contains(&NamespaceClientPushdownOperation::QueryTable)
.contains(&PushdownOperation::QueryTable)
&& let Some(ref namespace_client) = table.namespace_client
{
return execute_namespace_query(table, namespace_client.clone(), query, options).await;