mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-31 02:40:39 +00:00
Compare commits
10 Commits
dependabot
...
xuanwo/rem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d5f65af61 | ||
|
|
a0001043b6 | ||
|
|
1bb7acb74f | ||
|
|
4ce175276c | ||
|
|
4bccb43e56 | ||
|
|
d5dc4c0f06 | ||
|
|
55ae6197c1 | ||
|
|
15bd821825 | ||
|
|
cf162c8a10 | ||
|
|
2eba7ebd02 |
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.29.1-beta.0"
|
||||
current_version = "0.30.0-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
49
Cargo.lock
generated
49
Cargo.lock
generated
@@ -1308,6 +1308,17 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "brotli"
|
||||
version = "3.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
|
||||
dependencies = [
|
||||
"alloc-no-stdlib",
|
||||
"alloc-stdlib",
|
||||
"brotli-decompressor 2.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "brotli"
|
||||
version = "8.0.2"
|
||||
@@ -1316,7 +1327,17 @@ checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560"
|
||||
dependencies = [
|
||||
"alloc-no-stdlib",
|
||||
"alloc-stdlib",
|
||||
"brotli-decompressor",
|
||||
"brotli-decompressor 5.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "brotli-decompressor"
|
||||
version = "2.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
|
||||
dependencies = [
|
||||
"alloc-no-stdlib",
|
||||
"alloc-stdlib",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4668,7 +4689,7 @@ dependencies = [
|
||||
"rand 0.9.4",
|
||||
"rand_distr 0.5.1",
|
||||
"rand_xoshiro",
|
||||
"random_word",
|
||||
"random_word 0.5.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4993,7 +5014,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
@@ -5055,7 +5076,7 @@ dependencies = [
|
||||
"polars",
|
||||
"polars-arrow",
|
||||
"rand 0.9.4",
|
||||
"random_word",
|
||||
"random_word 0.4.3",
|
||||
"regex",
|
||||
"reqwest 0.12.28",
|
||||
"rstest",
|
||||
@@ -5075,7 +5096,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.0"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -5098,7 +5119,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.32.1-beta.0"
|
||||
version = "0.33.0-beta.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -7364,6 +7385,20 @@ dependencies = [
|
||||
"rand_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "random_word"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07eed67a16dde2cc3c7f65c072acd8d5b2e53d4aab95067c320db851c7651f29"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"brotli 3.5.0",
|
||||
"once_cell",
|
||||
"paste",
|
||||
"rand 0.8.6",
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "random_word"
|
||||
version = "0.5.2"
|
||||
@@ -7371,7 +7406,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e47a395bdb55442b883c89062d6bcff25dc90fa5f8369af81e0ac6d49d78cf81"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"brotli",
|
||||
"brotli 8.0.2",
|
||||
"paste",
|
||||
"rand 0.9.4",
|
||||
"unicase",
|
||||
|
||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
||||
<dependency>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-core</artifactId>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<version>0.30.0-beta.0</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<version>0.30.0-beta.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<version>0.30.0-beta.0</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>LanceDB Java SDK Parent POM</description>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.0"
|
||||
publish = false
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
|
||||
@@ -28,6 +28,7 @@ import {
|
||||
List,
|
||||
Schema,
|
||||
SchemaLike,
|
||||
Struct,
|
||||
Type,
|
||||
Uint8,
|
||||
Utf8,
|
||||
@@ -780,6 +781,113 @@ describe("When creating an index", () => {
|
||||
expect(indices2.length).toBe(0);
|
||||
});
|
||||
|
||||
it("should create and search a nested vector index", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const nestedSchema = new Schema([
|
||||
new Field("id", new Int32(), true),
|
||||
new Field(
|
||||
"image",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
]);
|
||||
const nestedTable = await db.createTable(
|
||||
"nested_vector",
|
||||
makeArrowTable(
|
||||
Array.from({ length: 300 }, (_, id) => ({
|
||||
id,
|
||||
image: { embedding: [id, id + 1] },
|
||||
})),
|
||||
{ schema: nestedSchema },
|
||||
),
|
||||
);
|
||||
|
||||
await nestedTable.createIndex("image.embedding", {
|
||||
name: "image_embedding_idx",
|
||||
});
|
||||
const indices = await nestedTable.listIndices();
|
||||
expect(indices).toContainEqual({
|
||||
name: "image_embedding_idx",
|
||||
indexType: "IvfPq",
|
||||
columns: ["image.embedding"],
|
||||
});
|
||||
|
||||
const explicit = await nestedTable
|
||||
.query()
|
||||
.nearestTo([0.0, 1.0])
|
||||
.column("image.embedding")
|
||||
.limit(1)
|
||||
.toArray();
|
||||
const inferred = await nestedTable
|
||||
.query()
|
||||
.nearestTo([0.0, 1.0])
|
||||
.limit(1)
|
||||
.toArray();
|
||||
expect(inferred[0].id).toEqual(explicit[0].id);
|
||||
});
|
||||
|
||||
it("should report multiple nested vector candidates", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const nestedSchema = new Schema([
|
||||
new Field(
|
||||
"image",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
new Field(
|
||||
"text",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
]);
|
||||
const nestedTable = await db.createTable(
|
||||
"multiple_nested_vectors",
|
||||
makeArrowTable(
|
||||
[
|
||||
{
|
||||
image: { embedding: [0.0, 1.0] },
|
||||
text: { embedding: [2.0, 3.0] },
|
||||
},
|
||||
],
|
||||
{ schema: nestedSchema },
|
||||
),
|
||||
);
|
||||
|
||||
await expect(
|
||||
nestedTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
|
||||
).rejects.toThrow(/image\.embedding.*text\.embedding/);
|
||||
});
|
||||
|
||||
it("should report when no default vector column exists", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const noVectorTable = await db.createTable(
|
||||
"no_vector",
|
||||
makeArrowTable([{ id: 0, label: "cat" }]),
|
||||
);
|
||||
|
||||
await expect(
|
||||
noVectorTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
|
||||
).rejects.toThrow(/No vector column/);
|
||||
});
|
||||
|
||||
it("should wait for index readiness", async () => {
|
||||
// Create an index and then wait for it to be ready
|
||||
await tbl.createIndex("vec");
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.0",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.32.1-beta.0"
|
||||
current_version = "0.33.0-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.32.1-beta.0"
|
||||
version = "0.33.0-beta.0"
|
||||
publish = false
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
|
||||
@@ -304,6 +304,15 @@ def deserialize_conn(
|
||||
manifest_enabled=parsed.get("manifest_enabled", False),
|
||||
namespace_client_properties=parsed.get("namespace_client_properties"),
|
||||
)
|
||||
elif connection_type == "remote":
|
||||
return RemoteDBConnection(
|
||||
parsed["db_url"],
|
||||
parsed["api_key"],
|
||||
parsed.get("region", "us-east-1"),
|
||||
host_override=parsed.get("host_override"),
|
||||
client_config=parsed.get("client_config"),
|
||||
storage_options=storage_options,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown connection_type: {connection_type}")
|
||||
|
||||
|
||||
@@ -8,7 +8,17 @@ from abc import abstractmethod
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
Generator,
|
||||
Iterable,
|
||||
List,
|
||||
Literal,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
if sys.version_info >= (3, 12):
|
||||
from typing import override
|
||||
@@ -313,7 +323,7 @@ class DBConnection(EnforceOverrides):
|
||||
>>> data = [{"vector": [1.1, 1.2], "lat": 45.5, "long": -122.7},
|
||||
... {"vector": [0.2, 1.8], "lat": 40.1, "long": -74.1}]
|
||||
>>> db.create_table("my_table", data)
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> db["my_table"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -334,7 +344,7 @@ class DBConnection(EnforceOverrides):
|
||||
... "long": [-122.7, -74.1]
|
||||
... })
|
||||
>>> db.create_table("table2", data)
|
||||
LanceTable(name='table2', version=1, ...)
|
||||
LanceTable(name='table2', ...)
|
||||
>>> db["table2"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -357,7 +367,7 @@ class DBConnection(EnforceOverrides):
|
||||
... pa.field("long", pa.float32())
|
||||
... ])
|
||||
>>> db.create_table("table3", data, schema = custom_schema)
|
||||
LanceTable(name='table3', version=1, ...)
|
||||
LanceTable(name='table3', ...)
|
||||
>>> db["table3"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -391,7 +401,7 @@ class DBConnection(EnforceOverrides):
|
||||
... pa.field("price", pa.float32()),
|
||||
... ])
|
||||
>>> db.create_table("table4", make_batches(), schema=schema)
|
||||
LanceTable(name='table4', version=1, ...)
|
||||
LanceTable(name='table4', ...)
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
@@ -568,15 +578,15 @@ class LanceDBConnection(DBConnection):
|
||||
>>> db = lancedb.connect("./.lancedb")
|
||||
>>> db.create_table("my_table", data=[{"vector": [1.1, 1.2], "b": 2},
|
||||
... {"vector": [0.5, 1.3], "b": 4}])
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> db.create_table("another_table", data=[{"vector": [0.4, 0.4], "b": 6}])
|
||||
LanceTable(name='another_table', version=1, ...)
|
||||
LanceTable(name='another_table', ...)
|
||||
>>> sorted(db.table_names())
|
||||
['another_table', 'my_table']
|
||||
>>> len(db)
|
||||
2
|
||||
>>> db["my_table"]
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> "my_table" in db
|
||||
True
|
||||
>>> db.drop_table("my_table")
|
||||
@@ -847,11 +857,20 @@ class LanceDBConnection(DBConnection):
|
||||
)
|
||||
)
|
||||
|
||||
def _all_table_names(self) -> Generator[str, None, None]:
|
||||
page_token = None
|
||||
while True:
|
||||
response = self.list_tables(page_token=page_token)
|
||||
yield from response.tables
|
||||
page_token = response.page_token
|
||||
if not page_token:
|
||||
return
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.table_names())
|
||||
return sum(1 for _ in self._all_table_names())
|
||||
|
||||
def __contains__(self, name: str) -> bool:
|
||||
return name in self.table_names()
|
||||
return name in self._all_table_names()
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
|
||||
@@ -3,12 +3,13 @@
|
||||
|
||||
import copy
|
||||
import json
|
||||
import os
|
||||
|
||||
from deprecation import deprecated
|
||||
import pyarrow as pa
|
||||
|
||||
from ._lancedb import async_permutation_builder, PermutationReader
|
||||
from .table import LanceTable
|
||||
from .table import LanceTable, Table
|
||||
from .background_loop import LOOP
|
||||
from .util import batch_to_tensor, batch_to_tensor_rows
|
||||
from typing import Any, Callable, Iterator, Literal, Optional, TYPE_CHECKING, Union
|
||||
@@ -354,6 +355,49 @@ class Transforms:
|
||||
DEFAULT_BATCH_SIZE = 100
|
||||
|
||||
|
||||
def _table_to_pickle_state(table: Table) -> dict[str, Any]:
|
||||
from .remote.table import RemoteTable
|
||||
|
||||
if isinstance(table, RemoteTable):
|
||||
return {
|
||||
"kind": "remote",
|
||||
"table": table,
|
||||
}
|
||||
|
||||
if not isinstance(table, LanceTable):
|
||||
raise ValueError(f"Cannot pickle table of type {type(table)!r}")
|
||||
|
||||
base_uri = table._conn.uri
|
||||
if base_uri.startswith("memory://"):
|
||||
return {
|
||||
"kind": "memory",
|
||||
"name": table.name,
|
||||
"data": table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
"kind": "local",
|
||||
"name": table.name,
|
||||
"uri": base_uri,
|
||||
"namespace": table._namespace_path,
|
||||
"storage_options": table._conn.storage_options,
|
||||
}
|
||||
|
||||
|
||||
def _table_from_pickle_state(state: dict[str, Any]) -> Table:
|
||||
from . import connect
|
||||
|
||||
kind = state["kind"]
|
||||
if kind == "remote":
|
||||
return state["table"]
|
||||
if kind == "memory":
|
||||
return connect("memory://").create_table(state["name"], state["data"])
|
||||
if kind == "local":
|
||||
db = connect(state["uri"], storage_options=state["storage_options"])
|
||||
return db.open_table(state["name"], namespace_path=state["namespace"] or None)
|
||||
raise ValueError(f"Unknown table pickle state kind: {kind}")
|
||||
|
||||
|
||||
class Permutation:
|
||||
"""
|
||||
A Permutation is a view of a dataset that can be used as input to model training
|
||||
@@ -369,15 +413,15 @@ class Permutation:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_table: LanceTable,
|
||||
permutation_table: Optional[LanceTable],
|
||||
base_table: Table,
|
||||
permutation_table: Optional[Table],
|
||||
split: int,
|
||||
selection: dict[str, str],
|
||||
batch_size: int,
|
||||
transform_fn: Callable[pa.RecordBatch, Any],
|
||||
offset: Optional[int] = None,
|
||||
limit: Optional[int] = None,
|
||||
connection_factory: Optional[Callable[[str], LanceTable]] = None,
|
||||
connection_factory: Optional[Callable[[str], Table]] = None,
|
||||
_reader: Optional[PermutationReader] = None,
|
||||
):
|
||||
"""
|
||||
@@ -397,6 +441,7 @@ class Permutation:
|
||||
if _reader is None:
|
||||
_reader = LOOP.run(self._build_reader())
|
||||
self.reader: PermutationReader = _reader
|
||||
self._pid = os.getpid()
|
||||
|
||||
async def _build_reader(self) -> PermutationReader:
|
||||
reader = await PermutationReader.from_tables(
|
||||
@@ -428,29 +473,25 @@ class Permutation:
|
||||
return new
|
||||
|
||||
def with_connection_factory(
|
||||
self, connection_factory: Callable[[str], LanceTable]
|
||||
self, connection_factory: Callable[[str], Table]
|
||||
) -> "Permutation":
|
||||
"""
|
||||
Creates a new permutation that will use ``connection_factory`` to reopen
|
||||
the base table when this permutation is unpickled in a worker process.
|
||||
|
||||
The factory is a callable that takes a single argument — the base table
|
||||
name — and returns a [LanceTable]. It must be picklable; the worker
|
||||
The factory is a callable that takes a single argument, the base table
|
||||
name, and returns a LanceDB table. It must be picklable; the worker
|
||||
will pickle it via standard ``pickle`` and call it to recover the base
|
||||
table. Picklable callables in practice means top-level (module-level)
|
||||
functions, ``functools.partial`` of such functions, or instances of
|
||||
picklable classes implementing ``__call__``. Lambdas and closures over
|
||||
local variables don't pickle with the default protocol.
|
||||
|
||||
Setting a factory is necessary when the URI alone is not enough to
|
||||
re-open the connection — most importantly for LanceDB Cloud (``db://``)
|
||||
connections, where ``api_key`` and ``region`` aren't recoverable from
|
||||
the connection object after construction.
|
||||
|
||||
For local file or cloud-storage paths the factory is optional: if not
|
||||
set, ``__getstate__`` falls back to capturing
|
||||
``(uri, storage_options, namespace_path)`` and re-opening via
|
||||
``lancedb.connect(uri, storage_options=...)``.
|
||||
A factory is optional for normal local and remote LanceDB connections:
|
||||
if not set, ``__getstate__`` captures the table's own picklable reopen
|
||||
state. Use a factory when that default state is not enough, for example
|
||||
when credentials should be loaded from the worker environment instead
|
||||
of being embedded in the pickle.
|
||||
|
||||
Examples
|
||||
--------
|
||||
@@ -508,7 +549,7 @@ class Permutation:
|
||||
return new
|
||||
|
||||
@classmethod
|
||||
def identity(cls, table: LanceTable) -> "Permutation":
|
||||
def identity(cls, table: Table) -> "Permutation":
|
||||
"""
|
||||
Creates an identity permutation for the given table.
|
||||
"""
|
||||
@@ -517,8 +558,8 @@ class Permutation:
|
||||
@classmethod
|
||||
def from_tables(
|
||||
cls,
|
||||
base_table: LanceTable,
|
||||
permutation_table: Optional[LanceTable] = None,
|
||||
base_table: Table,
|
||||
permutation_table: Optional[Table] = None,
|
||||
split: Optional[Union[str, int]] = None,
|
||||
) -> "Permutation":
|
||||
"""
|
||||
@@ -594,19 +635,24 @@ class Permutation:
|
||||
|
||||
The base table is captured either via a user-supplied
|
||||
``connection_factory`` (see [with_connection_factory]) or, as a
|
||||
fallback, by introspecting ``(uri, storage_options, namespace_path)``
|
||||
on the connection. The permutation table — always an in-memory
|
||||
LanceDB table — is captured as a pyarrow Table (which pickles via
|
||||
Arrow IPC natively). The reader is dropped from the wire format;
|
||||
``__setstate__`` rebuilds it from the restored tables.
|
||||
fallback, by the table's own picklable reopen state. An in-memory
|
||||
permutation table is captured as a pyarrow Table (which pickles via
|
||||
Arrow IPC natively); otherwise, the permutation table uses its own
|
||||
reopen state too. The reader is dropped from the wire format and
|
||||
rebuilt lazily on first use.
|
||||
"""
|
||||
permutation_data: Optional[pa.Table] = None
|
||||
permutation_table_state: Optional[dict[str, Any]] = None
|
||||
if self.permutation_table is not None:
|
||||
permutation_data = self.permutation_table.to_arrow()
|
||||
try:
|
||||
permutation_data = self.permutation_table.to_arrow()
|
||||
except NotImplementedError:
|
||||
permutation_table_state = _table_to_pickle_state(self.permutation_table)
|
||||
|
||||
common = {
|
||||
"base_table_name": self.base_table.name,
|
||||
"permutation_data": permutation_data,
|
||||
"permutation_table_state": permutation_table_state,
|
||||
"split": self.split,
|
||||
"selection": self.selection,
|
||||
"batch_size": self.batch_size,
|
||||
@@ -622,39 +668,9 @@ class Permutation:
|
||||
# namespace from the existing connection.
|
||||
return common
|
||||
|
||||
# URI-introspection fallback: only viable for native (OSS) connections
|
||||
# where (uri, storage_options) is enough to reopen. Remote / cloud
|
||||
# connections don't expose recoverable api_key / region — those users
|
||||
# must call with_connection_factory().
|
||||
try:
|
||||
base_uri = self.base_table._conn.uri
|
||||
storage_options = self.base_table._conn.storage_options
|
||||
except AttributeError as e:
|
||||
raise ValueError(
|
||||
"Cannot pickle this Permutation: the base table's connection "
|
||||
"does not expose a uri/storage_options, which usually means it "
|
||||
"is a remote (LanceDB Cloud) connection. Call "
|
||||
"Permutation.with_connection_factory(...) first to provide a "
|
||||
"picklable callable that re-opens the base table from a worker "
|
||||
"process."
|
||||
) from e
|
||||
|
||||
if base_uri.startswith("memory://"):
|
||||
# In-memory base tables don't exist in any worker process by
|
||||
# default, so dump the entire base table into the pickle. This
|
||||
# can be expensive for large datasets — users with large
|
||||
# in-memory base tables should either persist them or set a
|
||||
# connection_factory.
|
||||
return {
|
||||
**common,
|
||||
"base_table_data": self.base_table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
**common,
|
||||
"base_table_uri": base_uri,
|
||||
"base_table_namespace": self.base_table._namespace_path,
|
||||
"base_table_storage_options": storage_options,
|
||||
"base_table_state": _table_to_pickle_state(self.base_table),
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict[str, Any]) -> None:
|
||||
@@ -663,6 +679,8 @@ class Permutation:
|
||||
connection_factory = state["connection_factory"]
|
||||
if connection_factory is not None:
|
||||
base_table = connection_factory(state["base_table_name"])
|
||||
elif "base_table_state" in state:
|
||||
base_table = _table_from_pickle_state(state["base_table_state"])
|
||||
elif "base_table_data" in state:
|
||||
# In-memory base table inlined into the pickle; rebuild the same
|
||||
# way we rebuild the in-memory permutation table.
|
||||
@@ -680,8 +698,12 @@ class Permutation:
|
||||
namespace_path=state["base_table_namespace"] or None,
|
||||
)
|
||||
|
||||
permutation_table: Optional[LanceTable] = None
|
||||
if state["permutation_data"] is not None:
|
||||
permutation_table: Optional[Table] = None
|
||||
if state.get("permutation_table_state") is not None:
|
||||
permutation_table = _table_from_pickle_state(
|
||||
state["permutation_table_state"]
|
||||
)
|
||||
elif state["permutation_data"] is not None:
|
||||
mem_db = connect("memory://")
|
||||
permutation_table = mem_db.create_table(
|
||||
"permutation", state["permutation_data"]
|
||||
@@ -696,10 +718,26 @@ class Permutation:
|
||||
self.offset = state["offset"]
|
||||
self.limit = state["limit"]
|
||||
self.connection_factory = connection_factory
|
||||
self.reader = None
|
||||
self._pid = None
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pid = os.getpid()
|
||||
if self.reader is not None and getattr(self, "_pid", None) == pid:
|
||||
return
|
||||
if hasattr(self.base_table, "_ensure_open"):
|
||||
self.base_table._ensure_open()
|
||||
if self.permutation_table is not None and hasattr(
|
||||
self.permutation_table, "_ensure_open"
|
||||
):
|
||||
self.permutation_table._ensure_open()
|
||||
self.reader = LOOP.run(self._build_reader())
|
||||
self._pid = pid
|
||||
|
||||
@property
|
||||
def schema(self) -> pa.Schema:
|
||||
self._ensure_open()
|
||||
|
||||
async def do_output_schema():
|
||||
return await self.reader.output_schema(self.selection)
|
||||
|
||||
@@ -717,6 +755,7 @@ class Permutation:
|
||||
"""
|
||||
The number of rows in the permutation
|
||||
"""
|
||||
self._ensure_open()
|
||||
return self.reader.count_rows()
|
||||
|
||||
@property
|
||||
@@ -875,6 +914,7 @@ class Permutation:
|
||||
If skip_last_batch is True, the last batch will be skipped if it is not a
|
||||
multiple of batch_size.
|
||||
"""
|
||||
self._ensure_open()
|
||||
|
||||
async def get_iter():
|
||||
return await self.reader.read(self.selection, batch_size=batch_size)
|
||||
@@ -976,6 +1016,7 @@ class Permutation:
|
||||
so `with_format` and `with_transform` affect this method in the same way
|
||||
they affect iteration.
|
||||
"""
|
||||
self._ensure_open()
|
||||
|
||||
async def do_take_offsets():
|
||||
return await self.reader.take_offsets(offsets, selection=self.selection)
|
||||
@@ -1011,9 +1052,11 @@ class Permutation:
|
||||
"""
|
||||
Skip the first `skip` rows of the permutation
|
||||
"""
|
||||
self._ensure_open()
|
||||
new = copy.copy(self)
|
||||
new.offset = skip
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
new._pid = os.getpid()
|
||||
return new
|
||||
|
||||
@deprecated(details="Use with_take instead")
|
||||
@@ -1032,9 +1075,11 @@ class Permutation:
|
||||
"""
|
||||
Limit the permutation to `limit` rows (following any `skip`)
|
||||
"""
|
||||
self._ensure_open()
|
||||
new = copy.copy(self)
|
||||
new.limit = limit
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
new._pid = os.getpid()
|
||||
return new
|
||||
|
||||
@deprecated(details="Use with_repeat instead")
|
||||
|
||||
@@ -3,12 +3,14 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from abc import ABC, abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
from datetime import timedelta
|
||||
from enum import Enum
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Literal,
|
||||
@@ -17,41 +19,40 @@ from typing import (
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
Any,
|
||||
)
|
||||
|
||||
import asyncio
|
||||
import deprecation
|
||||
import numpy as np
|
||||
import pyarrow as pa
|
||||
import pyarrow.compute as pc
|
||||
import pydantic
|
||||
from typing_extensions import Annotated
|
||||
|
||||
from lancedb.pydantic import PYDANTIC_VERSION
|
||||
from lancedb._lancedb import fts_query_to_json
|
||||
from lancedb.background_loop import LOOP
|
||||
from lancedb.pydantic import PYDANTIC_VERSION
|
||||
|
||||
from . import __version__
|
||||
from .arrow import AsyncRecordBatchReader
|
||||
from .dependencies import pandas as pd
|
||||
from .expr import Expr
|
||||
from .rerankers.base import Reranker
|
||||
from .rerankers.rrf import RRFReranker
|
||||
from .rerankers.util import check_reranker_result
|
||||
from .util import flatten_columns
|
||||
from .expr import Expr
|
||||
from lancedb._lancedb import fts_query_to_json
|
||||
from typing_extensions import Annotated
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import sys
|
||||
|
||||
import PIL
|
||||
import polars as pl
|
||||
|
||||
from ._lancedb import Query as LanceQuery
|
||||
from ._lancedb import FTSQuery as LanceFTSQuery
|
||||
from ._lancedb import HybridQuery as LanceHybridQuery
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from ._lancedb import TakeQuery as LanceTakeQuery
|
||||
from ._lancedb import PyQueryRequest
|
||||
from ._lancedb import Query as LanceQuery
|
||||
from ._lancedb import TakeQuery as LanceTakeQuery
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from .common import VEC
|
||||
from .pydantic import LanceModel
|
||||
from .table import Table
|
||||
@@ -3348,16 +3349,18 @@ class BaseQueryBuilder(object):
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
"""
|
||||
async_iter = LOOP.run(self._inner.execute(max_batch_length, timeout))
|
||||
async_reader = LOOP.run(
|
||||
self._inner.to_batches(max_batch_length=max_batch_length, timeout=timeout)
|
||||
)
|
||||
|
||||
def iter_sync():
|
||||
try:
|
||||
while True:
|
||||
yield LOOP.run(async_iter.__anext__())
|
||||
yield LOOP.run(async_reader.__anext__())
|
||||
except StopAsyncIteration:
|
||||
return
|
||||
|
||||
return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync())
|
||||
return pa.RecordBatchReader.from_batches(async_reader.schema, iter_sync())
|
||||
|
||||
def to_arrow(self, timeout: Optional[timedelta] = None) -> pa.Table:
|
||||
"""
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
|
||||
from datetime import timedelta
|
||||
import json
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import sys
|
||||
@@ -17,7 +18,7 @@ else:
|
||||
|
||||
# Remove this import to fix circular dependency
|
||||
# from lancedb import connect_async
|
||||
from lancedb.remote import ClientConfig
|
||||
from lancedb.remote import ClientConfig, RetryConfig, TimeoutConfig, TlsConfig
|
||||
import pyarrow as pa
|
||||
|
||||
from ..common import DATA
|
||||
@@ -36,6 +37,64 @@ from ..table import Table
|
||||
from ..util import validate_table_name
|
||||
|
||||
|
||||
def _duration_seconds(value: Optional[timedelta]) -> Optional[float]:
|
||||
return value.total_seconds() if value is not None else None
|
||||
|
||||
|
||||
def _timeout_config_to_dict(
|
||||
config: Optional[TimeoutConfig],
|
||||
) -> Optional[dict[str, Any]]:
|
||||
if config is None:
|
||||
return None
|
||||
return {
|
||||
"timeout": _duration_seconds(config.timeout),
|
||||
"connect_timeout": _duration_seconds(config.connect_timeout),
|
||||
"read_timeout": _duration_seconds(config.read_timeout),
|
||||
"pool_idle_timeout": _duration_seconds(config.pool_idle_timeout),
|
||||
}
|
||||
|
||||
|
||||
def _retry_config_to_dict(config: RetryConfig) -> dict[str, Any]:
|
||||
return {
|
||||
"retries": config.retries,
|
||||
"connect_retries": config.connect_retries,
|
||||
"read_retries": config.read_retries,
|
||||
"backoff_factor": config.backoff_factor,
|
||||
"backoff_jitter": config.backoff_jitter,
|
||||
"statuses": config.statuses,
|
||||
}
|
||||
|
||||
|
||||
def _tls_config_to_dict(config: Optional[TlsConfig]) -> Optional[dict[str, Any]]:
|
||||
if config is None:
|
||||
return None
|
||||
return {
|
||||
"cert_file": config.cert_file,
|
||||
"key_file": config.key_file,
|
||||
"ssl_ca_cert": config.ssl_ca_cert,
|
||||
"assert_hostname": config.assert_hostname,
|
||||
}
|
||||
|
||||
|
||||
def _client_config_to_dict(config: ClientConfig) -> dict[str, Any]:
|
||||
if config.header_provider is not None:
|
||||
raise ValueError(
|
||||
"Cannot serialize a remote connection with a header_provider. "
|
||||
"Use static api_key/extra_headers or provide a worker-side "
|
||||
"connection factory instead."
|
||||
)
|
||||
return {
|
||||
"user_agent": config.user_agent,
|
||||
"retry_config": _retry_config_to_dict(config.retry_config),
|
||||
"timeout_config": _timeout_config_to_dict(config.timeout_config),
|
||||
"extra_headers": config.extra_headers,
|
||||
"id_delimiter": config.id_delimiter,
|
||||
"tls_config": _tls_config_to_dict(config.tls_config),
|
||||
"header_provider": None,
|
||||
"user_id": config.user_id,
|
||||
}
|
||||
|
||||
|
||||
class RemoteDBConnection(DBConnection):
|
||||
"""A connection to a remote LanceDB database."""
|
||||
|
||||
@@ -88,6 +147,11 @@ class RemoteDBConnection(DBConnection):
|
||||
parsed = urlparse(db_url)
|
||||
if parsed.scheme != "db":
|
||||
raise ValueError(f"Invalid scheme: {parsed.scheme}, only accepts db://")
|
||||
self.db_url = db_url
|
||||
self.api_key = api_key
|
||||
self.region = region
|
||||
self.host_override = host_override
|
||||
self.storage_options = storage_options
|
||||
self.db_name = parsed.netloc
|
||||
|
||||
self.client_config = client_config
|
||||
@@ -109,6 +173,20 @@ class RemoteDBConnection(DBConnection):
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteConnect(name={self.db_name})"
|
||||
|
||||
@override
|
||||
def serialize(self) -> str:
|
||||
return json.dumps(
|
||||
{
|
||||
"connection_type": "remote",
|
||||
"db_url": self.db_url,
|
||||
"api_key": self.api_key,
|
||||
"region": self.region,
|
||||
"host_override": self.host_override,
|
||||
"client_config": _client_config_to_dict(self.client_config),
|
||||
"storage_options": self.storage_options,
|
||||
}
|
||||
)
|
||||
|
||||
@override
|
||||
def list_namespaces(
|
||||
self,
|
||||
@@ -329,7 +407,12 @@ class RemoteDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path))
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=namespace_path,
|
||||
)
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
@@ -378,7 +461,12 @@ class RemoteDBConnection(DBConnection):
|
||||
is_shallow=is_shallow,
|
||||
)
|
||||
)
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=target_namespace_path,
|
||||
)
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
@@ -523,7 +611,12 @@ class RemoteDBConnection(DBConnection):
|
||||
fill_value=fill_value,
|
||||
)
|
||||
)
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=namespace_path,
|
||||
)
|
||||
|
||||
@override
|
||||
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
from functools import cached_property
|
||||
import os
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, Literal
|
||||
import warnings
|
||||
|
||||
@@ -49,14 +50,78 @@ class RemoteTable(Table):
|
||||
self,
|
||||
table: AsyncTable,
|
||||
db_name: str,
|
||||
*,
|
||||
connection_state: Optional[Union[str, Callable[[], str]]] = None,
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
):
|
||||
self._table = table
|
||||
self._table_handle = table
|
||||
self._name = table.name
|
||||
self.db_name = db_name
|
||||
self._connection_state = connection_state
|
||||
self._namespace_path = list(namespace_path or [])
|
||||
self._checkout_version: Optional[int] = None
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _serialized_connection_state(self) -> str:
|
||||
if self._connection_state is None:
|
||||
raise RuntimeError(
|
||||
"Cannot reopen this remote table because it does not carry "
|
||||
"serialized connection state"
|
||||
)
|
||||
if callable(self._connection_state):
|
||||
self._connection_state = self._connection_state()
|
||||
return self._connection_state
|
||||
|
||||
@property
|
||||
def _table(self) -> AsyncTable:
|
||||
self._ensure_open()
|
||||
assert self._table_handle is not None
|
||||
return self._table_handle
|
||||
|
||||
@_table.setter
|
||||
def _table(self, table: AsyncTable) -> None:
|
||||
self._table_handle = table
|
||||
self._name = table.name
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pid = os.getpid()
|
||||
if self._table_handle is not None and self._pid == pid:
|
||||
return
|
||||
|
||||
from lancedb import deserialize_conn
|
||||
|
||||
db = deserialize_conn(self._serialized_connection_state(), for_worker=True)
|
||||
table = db.open_table(self._name, namespace_path=self._namespace_path)
|
||||
if self._checkout_version is not None:
|
||||
table.checkout(self._checkout_version)
|
||||
|
||||
self._table_handle = table._table
|
||||
self.db_name = table.db_name
|
||||
self._pid = pid
|
||||
|
||||
def __getstate__(self) -> dict:
|
||||
return {
|
||||
"connection_state": self._serialized_connection_state(),
|
||||
"db_name": self.db_name,
|
||||
"name": self.name,
|
||||
"namespace_path": self._namespace_path,
|
||||
"checkout_version": self._checkout_version,
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict) -> None:
|
||||
self._table_handle = None
|
||||
self._name = state["name"]
|
||||
self.db_name = state["db_name"]
|
||||
self._connection_state = state["connection_state"]
|
||||
self._namespace_path = state["namespace_path"]
|
||||
self._checkout_version = state["checkout_version"]
|
||||
self._pid = None
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table"""
|
||||
return self._table.name
|
||||
return self._name
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteTable({self.db_name}.{self.name})"
|
||||
@@ -106,13 +171,19 @@ class RemoteTable(Table):
|
||||
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
|
||||
|
||||
def checkout(self, version: Union[int, str]):
|
||||
return LOOP.run(self._table.checkout(version))
|
||||
result = LOOP.run(self._table.checkout(version))
|
||||
self._checkout_version = self.version
|
||||
return result
|
||||
|
||||
def checkout_latest(self):
|
||||
return LOOP.run(self._table.checkout_latest())
|
||||
result = LOOP.run(self._table.checkout_latest())
|
||||
self._checkout_version = None
|
||||
return result
|
||||
|
||||
def restore(self, version: Optional[Union[int, str]] = None):
|
||||
return LOOP.run(self._table.restore(version))
|
||||
result = LOOP.run(self._table.restore(version))
|
||||
self._checkout_version = None
|
||||
return result
|
||||
|
||||
def list_indices(self) -> Iterable[IndexConfig]:
|
||||
"""List all the indices on the table"""
|
||||
|
||||
@@ -2178,7 +2178,7 @@ class LanceTable(Table):
|
||||
return LOOP.run(self._table.count_rows(filter))
|
||||
|
||||
def __repr__(self) -> str:
|
||||
val = f"{self.__class__.__name__}(name={self.name!r}, version={self.version}"
|
||||
val = f"{self.__class__.__name__}(name={self.name!r}"
|
||||
if self._conn.read_consistency_interval is not None:
|
||||
val += ", read_consistency_interval={!r}".format(
|
||||
self._conn.read_consistency_interval
|
||||
|
||||
@@ -10,7 +10,7 @@ import pathlib
|
||||
import warnings
|
||||
from datetime import date, datetime
|
||||
from functools import singledispatch
|
||||
from typing import Tuple, Union, Optional, Any
|
||||
from typing import Tuple, Union, Optional, Any, List
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import numpy as np
|
||||
@@ -189,7 +189,33 @@ def flatten_columns(tbl: pa.Table, flatten: Optional[Union[int, bool]] = None):
|
||||
return tbl
|
||||
|
||||
|
||||
def inf_vector_column_query(schema: pa.Schema) -> str:
|
||||
def _format_field_path(path: List[str]) -> str:
|
||||
def format_segment(segment: str) -> str:
|
||||
if all(char.isalnum() or char == "_" for char in segment):
|
||||
return segment
|
||||
return f"`{segment.replace('`', '``')}`"
|
||||
|
||||
return ".".join(format_segment(segment) for segment in path)
|
||||
|
||||
|
||||
def _iter_vector_columns(
|
||||
field: pa.Field, path: List[str], dim: Optional[int] = None
|
||||
) -> List[str]:
|
||||
field_path = [*path, field.name]
|
||||
if is_vector_column(field.type):
|
||||
vector_dim = infer_vector_column_dim(field.type)
|
||||
if dim is None or vector_dim == dim:
|
||||
return [_format_field_path(field_path)]
|
||||
return []
|
||||
if pa.types.is_struct(field.type):
|
||||
columns = []
|
||||
for idx in range(field.type.num_fields):
|
||||
columns.extend(_iter_vector_columns(field.type.field(idx), field_path, dim))
|
||||
return columns
|
||||
return []
|
||||
|
||||
|
||||
def inf_vector_column_query(schema: pa.Schema, dim: Optional[int] = None) -> str:
|
||||
"""
|
||||
Get the vector column name
|
||||
|
||||
@@ -202,26 +228,21 @@ def inf_vector_column_query(schema: pa.Schema) -> str:
|
||||
-------
|
||||
str: the vector column name.
|
||||
"""
|
||||
vector_col_name = ""
|
||||
vector_col_count = 0
|
||||
for field_name in schema.names:
|
||||
field = schema.field(field_name)
|
||||
if is_vector_column(field.type):
|
||||
vector_col_count += 1
|
||||
if vector_col_count > 1:
|
||||
raise ValueError(
|
||||
"Schema has more than one vector column. "
|
||||
"Please specify the vector column name "
|
||||
"for vector search"
|
||||
)
|
||||
elif vector_col_count == 1:
|
||||
vector_col_name = field_name
|
||||
if vector_col_count == 0:
|
||||
vector_col_names = []
|
||||
for field in schema:
|
||||
vector_col_names.extend(_iter_vector_columns(field, [], dim))
|
||||
if len(vector_col_names) > 1:
|
||||
raise ValueError(
|
||||
"Schema has more than one vector column. "
|
||||
"Please specify the vector column name "
|
||||
f"for vector search. Candidates: {vector_col_names}"
|
||||
)
|
||||
if len(vector_col_names) == 0:
|
||||
raise ValueError(
|
||||
"There is no vector column in the data. "
|
||||
"Please specify the vector column name for vector search"
|
||||
)
|
||||
return vector_col_name
|
||||
return vector_col_names[0]
|
||||
|
||||
|
||||
def is_vector_column(data_type: pa.DataType) -> bool:
|
||||
@@ -247,6 +268,29 @@ def is_vector_column(data_type: pa.DataType) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def infer_vector_column_dim(data_type: pa.DataType) -> Optional[int]:
|
||||
if pa.types.is_fixed_size_list(data_type):
|
||||
return data_type.list_size
|
||||
if pa.types.is_list(data_type):
|
||||
return infer_vector_column_dim(data_type.value_type)
|
||||
return None
|
||||
|
||||
|
||||
def _query_vector_dim(query: Optional[Any]) -> Optional[int]:
|
||||
if query is None:
|
||||
return None
|
||||
if isinstance(query, np.ndarray):
|
||||
if query.ndim == 0:
|
||||
return None
|
||||
return query.shape[-1]
|
||||
if isinstance(query, list) and query:
|
||||
first = query[0]
|
||||
if isinstance(first, (list, tuple, np.ndarray)):
|
||||
return len(first)
|
||||
return len(query)
|
||||
return None
|
||||
|
||||
|
||||
def infer_vector_column_name(
|
||||
schema: pa.Schema,
|
||||
query_type: str,
|
||||
@@ -262,7 +306,9 @@ def infer_vector_column_name(
|
||||
|
||||
if query is not None or query_type == "hybrid":
|
||||
try:
|
||||
vector_column_name = inf_vector_column_query(schema)
|
||||
vector_column_name = inf_vector_column_query(
|
||||
schema, dim=_query_vector_dim(query)
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import re
|
||||
import sys
|
||||
from datetime import timedelta
|
||||
import os
|
||||
from types import SimpleNamespace
|
||||
|
||||
import lancedb
|
||||
import numpy as np
|
||||
@@ -188,6 +189,43 @@ def test_table_names(tmp_db: lancedb.DBConnection):
|
||||
assert len(result) == 3
|
||||
|
||||
|
||||
def test_db_contains_and_len_include_all_table_name_pages(tmp_db: lancedb.DBConnection):
|
||||
for idx in range(20):
|
||||
tmp_db.create_table(f"table_{idx}", data=[{"id": idx}])
|
||||
|
||||
assert len(tmp_db) == 20
|
||||
for idx in range(20):
|
||||
assert f"table_{idx}" in tmp_db
|
||||
assert "does_not_exist" not in tmp_db
|
||||
|
||||
|
||||
def test_db_contains_stops_after_matching_table_page(
|
||||
tmp_db: lancedb.DBConnection, monkeypatch
|
||||
):
|
||||
calls = []
|
||||
pages = {
|
||||
None: SimpleNamespace(tables=["table_0", "table_1"], page_token="next"),
|
||||
"next": SimpleNamespace(tables=["table_2"], page_token=None),
|
||||
}
|
||||
|
||||
def list_tables(*, page_token=None, **_kwargs):
|
||||
calls.append(page_token)
|
||||
return pages[page_token]
|
||||
|
||||
monkeypatch.setattr(tmp_db, "list_tables", list_tables)
|
||||
|
||||
assert "table_1" in tmp_db
|
||||
assert calls == [None]
|
||||
|
||||
calls.clear()
|
||||
assert "table_2" in tmp_db
|
||||
assert calls == [None, "next"]
|
||||
|
||||
calls.clear()
|
||||
assert len(tmp_db) == 3
|
||||
assert calls == [None, "next"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_table_names_async(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
@@ -563,7 +563,7 @@ def test_create_index_multiple_columns(tmp_path, table):
|
||||
|
||||
|
||||
def test_nested_schema(tmp_path, table):
|
||||
table.create_fts_index("nested.text")
|
||||
table.create_fts_index("nested.text", with_position=True)
|
||||
indices = table.list_indices()
|
||||
assert len(indices) == 1
|
||||
assert indices[0].index_type == "FTS"
|
||||
@@ -577,6 +577,98 @@ def test_nested_schema(tmp_path, table):
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
results = table.search(MatchQuery("puppy", "nested.text")).limit(5).to_list()
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
phrase_results = (
|
||||
table.search(PhraseQuery("puppy runs", "nested.text")).limit(5).to_list()
|
||||
)
|
||||
assert len(phrase_results) > 0
|
||||
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
|
||||
|
||||
hybrid_results = (
|
||||
table.search(query_type="hybrid", fts_columns="nested.text")
|
||||
.vector([0 for _ in range(128)])
|
||||
.text("puppy")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(hybrid_results) > 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_schema_async(async_table):
|
||||
await async_table.create_index("nested.text", config=FTS(with_position=True))
|
||||
indices = await async_table.list_indices()
|
||||
assert len(indices) == 1
|
||||
assert indices[0].index_type == "FTS"
|
||||
assert indices[0].columns == ["nested.text"]
|
||||
|
||||
results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text("puppy", columns="nested.text")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text(MatchQuery("puppy", "nested.text"))
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
phrase_results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text(PhraseQuery("puppy runs", "nested.text"))
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(phrase_results) > 0
|
||||
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
|
||||
|
||||
hybrid_results = await (
|
||||
async_table.query()
|
||||
.nearest_to([0 for _ in range(128)])
|
||||
.nearest_to_text("puppy", columns="nested.text")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(hybrid_results) > 0
|
||||
|
||||
|
||||
def test_nested_schema_rejects_invalid_fts_fields(tmp_path):
|
||||
db = ldb.connect(tmp_path)
|
||||
data = pa.table(
|
||||
{
|
||||
"payload": pa.array(
|
||||
[
|
||||
{"text": "puppy runs", "count": 1},
|
||||
{"text": "car drives", "count": 2},
|
||||
]
|
||||
),
|
||||
"vector": pa.array(
|
||||
[[0.1, 0.1], [0.2, 0.2]],
|
||||
type=pa.list_(pa.float32(), list_size=2),
|
||||
),
|
||||
}
|
||||
)
|
||||
table = db.create_table("test", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="FTS index cannot be created.*payload"):
|
||||
table.create_fts_index("payload")
|
||||
|
||||
with pytest.raises(ValueError, match="FTS index cannot be created.*count"):
|
||||
table.create_fts_index("payload.count")
|
||||
|
||||
with pytest.raises(ValueError, match="Field path `payload.missing` not found"):
|
||||
table.create_fts_index("payload.missing")
|
||||
|
||||
|
||||
def test_search_index_with_filter(table):
|
||||
table.create_fts_index("text")
|
||||
|
||||
@@ -105,6 +105,46 @@ async def test_create_scalar_index(some_table: AsyncTable):
|
||||
assert len(indices) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
|
||||
metadata_type = pa.struct(
|
||||
[
|
||||
pa.field("user_id", pa.int32()),
|
||||
pa.field("user.id", pa.int32()),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_arrays(
|
||||
[
|
||||
pa.array([1, 2, 3], type=pa.int32()),
|
||||
pa.array(
|
||||
[
|
||||
{"user_id": 10, "user.id": 100},
|
||||
{"user_id": 20, "user.id": 200},
|
||||
{"user_id": 30, "user.id": 300},
|
||||
],
|
||||
type=metadata_type,
|
||||
),
|
||||
],
|
||||
names=["user_id", "metadata"],
|
||||
)
|
||||
table = await db_async.create_table("nested_scalar_index", data)
|
||||
|
||||
await table.create_index("user_id", config=BTree(), name="top_user_id_idx")
|
||||
await table.create_index(
|
||||
"metadata.user_id", config=BTree(), name="nested_user_id_idx"
|
||||
)
|
||||
await table.create_index(
|
||||
"metadata.`user.id`", config=BTree(), name="escaped_user_id_idx"
|
||||
)
|
||||
|
||||
columns_by_name = {
|
||||
index.name: index.columns for index in await table.list_indices()
|
||||
}
|
||||
assert columns_by_name["top_user_id_idx"] == ["user_id"]
|
||||
assert columns_by_name["nested_user_id_idx"] == ["metadata.user_id"]
|
||||
assert columns_by_name["escaped_user_id_idx"] == ["metadata.`user.id`"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
|
||||
await some_table.create_index("fsb", config=BTree())
|
||||
|
||||
@@ -1512,6 +1512,37 @@ def test_take_queries(tmp_path):
|
||||
]
|
||||
|
||||
|
||||
def test_take_queries_to_batches(tmp_path):
|
||||
# Regression test for the sync take-query path: `to_batches` previously
|
||||
# raised ``AttributeError: 'AsyncTakeQuery' object has no attribute
|
||||
# 'execute'`` because the inherited ``BaseQueryBuilder.to_batches`` called
|
||||
# ``execute`` on the async wrapper instead of the native query.
|
||||
db = lancedb.connect(tmp_path)
|
||||
data = pa.table({"idx": list(range(100)), "label": [str(i) for i in range(100)]})
|
||||
table = db.create_table("test", data)
|
||||
|
||||
# Take by offset → to_batches
|
||||
rs = list(table.take_offsets([5, 2, 17]).to_batches())
|
||||
assert all(isinstance(b, pa.RecordBatch) for b in rs)
|
||||
assert sum(b.num_rows for b in rs) == 3
|
||||
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
|
||||
|
||||
# Take by row id → to_batches
|
||||
rs = list(table.take_row_ids([5, 2, 17]).to_batches())
|
||||
assert all(isinstance(b, pa.RecordBatch) for b in rs)
|
||||
assert sum(b.num_rows for b in rs) == 3
|
||||
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
|
||||
|
||||
# Take with select projection → to_batches preserves the projection
|
||||
rs = list(table.take_row_ids([5, 2, 17]).select(["label"]).to_batches())
|
||||
assert all(b.schema.names == ["label"] for b in rs)
|
||||
assert sorted(v for b in rs for v in b.column("label").to_pylist()) == [
|
||||
"17",
|
||||
"2",
|
||||
"5",
|
||||
]
|
||||
|
||||
|
||||
def test_getitems(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
data = pa.table(
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
import re
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import contextlib
|
||||
from datetime import timedelta
|
||||
import http.server
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import pickle
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
@@ -171,6 +172,155 @@ def test_table_len_sync():
|
||||
assert len(table) == 1
|
||||
|
||||
|
||||
def test_remote_connection_serializes():
|
||||
def handler(request):
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"tables": []}')
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
serialized = json.loads(db.serialize())
|
||||
assert isinstance(serialized["client_config"], dict)
|
||||
restored = lancedb.deserialize_conn(db.serialize())
|
||||
assert restored.table_names() == []
|
||||
|
||||
|
||||
def test_remote_table_is_picklable():
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(
|
||||
{
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "id", "type": {"type": "int64"}, "nullable": False}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
request.wfile.write(payload.encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"3")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.open_table("test")
|
||||
restored = pickle.loads(pickle.dumps(table))
|
||||
assert restored.count_rows() == 3
|
||||
|
||||
|
||||
def test_remote_table_open_does_not_require_picklable_client_config():
|
||||
from lancedb.remote import HeaderProvider
|
||||
|
||||
class LocalHeaderProvider(HeaderProvider):
|
||||
def get_headers(self):
|
||||
return {"X-Test-Header": "present"}
|
||||
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
assert request.headers.get("X-Test-Header") == "present"
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"3")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with http.server.HTTPServer(
|
||||
("localhost", 0), make_mock_http_handler(handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
"header_provider": LocalHeaderProvider(),
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
assert table.count_rows() == 3
|
||||
with pytest.raises(ValueError, match="header_provider"):
|
||||
pickle.dumps(table)
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
|
||||
def test_remote_permutation_is_picklable():
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
rows = list(range(10))
|
||||
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(
|
||||
{
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "a", "type": {"type": "int64"}, "nullable": False}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
request.wfile.write(payload.encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(str(len(rows)).encode())
|
||||
elif request.path == "/v1/table/test/query/":
|
||||
content_len = int(request.headers.get("Content-Length"))
|
||||
body = json.loads(request.rfile.read(content_len))
|
||||
if "filter" in body:
|
||||
match = re.search(r"_rowoffset in \((.*?)\)", body["filter"])
|
||||
offsets = [int(offset.strip()) for offset in match.group(1).split(",")]
|
||||
else:
|
||||
offsets = rows
|
||||
table = pa.table({"a": [rows[offset] for offset in offsets]})
|
||||
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
|
||||
request.end_headers()
|
||||
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
|
||||
writer.write_table(table)
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
permutation = Permutation.identity(db.open_table("test"))
|
||||
restored = pickle.loads(pickle.dumps(permutation))
|
||||
assert restored.__getitems__([0, 2, 4]) == [{"a": 0}, {"a": 2}, {"a": 4}]
|
||||
|
||||
|
||||
def test_create_table_exist_ok():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=exist_ok":
|
||||
@@ -362,6 +512,22 @@ def test_table_create_indices():
|
||||
schema=dict(
|
||||
fields=[
|
||||
dict(name="id", type={"type": "int64"}, nullable=False),
|
||||
dict(name="text", type={"type": "string"}, nullable=False),
|
||||
dict(
|
||||
name="vector",
|
||||
type={
|
||||
"type": "fixed_size_list",
|
||||
"fields": [
|
||||
dict(
|
||||
name="item",
|
||||
type={"type": "float"},
|
||||
nullable=True,
|
||||
)
|
||||
],
|
||||
"length": 2,
|
||||
},
|
||||
nullable=False,
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
@@ -1289,6 +1455,10 @@ def _remote_fork_child(port: int, queue) -> None:
|
||||
queue.put(db.table_names())
|
||||
|
||||
|
||||
def _remote_table_fork_child(table, queue) -> None:
|
||||
queue.put(table.count_rows())
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
@@ -1351,3 +1521,65 @@ def test_remote_connection_after_fork():
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_inherited_remote_table_reopens_after_fork():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"7")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
server = http.server.HTTPServer(("localhost", 0), make_mock_http_handler(handler))
|
||||
port = server.server_address[1]
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
assert table.count_rows() == 7
|
||||
|
||||
ctx = mp.get_context("fork")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(target=_remote_table_fork_child, args=(table, queue))
|
||||
proc.start()
|
||||
proc.join(timeout=15)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail("Remote table hung after fork")
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no result"
|
||||
assert queue.get() == 7
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
@@ -33,7 +33,7 @@ def test_basic(mem_db: DBConnection):
|
||||
table = mem_db.create_table("test", data=data)
|
||||
|
||||
assert table.name == "test"
|
||||
assert "LanceTable(name='test', version=1, _conn=LanceDBConnection(" in repr(table)
|
||||
assert "LanceTable(name='test', _conn=LanceDBConnection(" in repr(table)
|
||||
expected_schema = pa.schema(
|
||||
{
|
||||
"vector": pa.list_(pa.float32(), 2),
|
||||
@@ -1934,6 +1934,10 @@ def test_create_index_nested_field_paths(mem_db: DBConnection):
|
||||
assert len(vector_results) == 1
|
||||
assert vector_results[0]["metadata"]["user_id"] == 0
|
||||
|
||||
default_vector_results = table.search([0.0, 1.0]).limit(1).to_list()
|
||||
assert len(default_vector_results) == 1
|
||||
assert default_vector_results[0]["metadata"]["user_id"] == 0
|
||||
|
||||
filtered_results = table.search().where("metadata.user_id = 42").limit(1).to_list()
|
||||
assert len(filtered_results) == 1
|
||||
assert filtered_results[0]["metadata"]["user_id"] == 42
|
||||
@@ -2013,6 +2017,74 @@ def test_search_with_schema_inf_multiple_vector(mem_db: DBConnection):
|
||||
table.search(q).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_search_infers_single_nested_vector(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int32()),
|
||||
pa.field(
|
||||
"image",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[
|
||||
{"id": 0, "image": {"embedding": [0.0, 1.0]}},
|
||||
{"id": 1, "image": {"embedding": [10.0, 11.0]}},
|
||||
],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_default_search", data=data)
|
||||
|
||||
result = table.search([0.0, 1.0]).limit(1).to_list()
|
||||
assert result[0]["id"] == 0
|
||||
|
||||
|
||||
def test_search_nested_vector_multiple_candidates(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field(
|
||||
"image",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
pa.field(
|
||||
"text",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[
|
||||
{
|
||||
"image": {"embedding": [0.0, 1.0]},
|
||||
"text": {"embedding": [2.0, 3.0]},
|
||||
}
|
||||
],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_multiple_candidates", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="image.embedding.*text.embedding"):
|
||||
table.search([0.0, 1.0]).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_search_nested_vector_no_candidates(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int32()),
|
||||
pa.field("metadata", pa.struct([pa.field("label", pa.string())])),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[{"id": 0, "metadata": {"label": "cat"}}],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_no_candidates", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="no vector column"):
|
||||
table.search([0.0, 1.0]).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_compact_cleanup(tmp_db: DBConnection):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import http.server
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import pickle
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
@@ -15,6 +20,107 @@ from lancedb.util import tbl_to_tensor
|
||||
torch = pytest.importorskip("torch")
|
||||
|
||||
|
||||
REMOTE_ROWS = list(range(100))
|
||||
|
||||
|
||||
def _make_mock_http_handler(handler):
|
||||
class MockLanceDBHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
handler(self)
|
||||
|
||||
def do_POST(self):
|
||||
handler(self)
|
||||
|
||||
return MockLanceDBHandler
|
||||
|
||||
|
||||
def _remote_schema_payload():
|
||||
return {
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "a", "type": {"type": "int64"}, "nullable": False},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _offsets_from_filter(filter_sql: str | None) -> list[int]:
|
||||
if filter_sql is None:
|
||||
return REMOTE_ROWS
|
||||
match = re.search(r"_rowoffset in \((.*?)\)", filter_sql)
|
||||
if match is None:
|
||||
return REMOTE_ROWS
|
||||
raw_offsets = match.group(1).strip()
|
||||
if raw_offsets == "":
|
||||
return []
|
||||
return [int(offset.strip()) for offset in raw_offsets.split(",")]
|
||||
|
||||
|
||||
def _remote_dataset_handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(json.dumps(_remote_schema_payload()).encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(str(len(REMOTE_ROWS)).encode())
|
||||
elif request.path == "/v1/table/test/query/":
|
||||
content_len = int(request.headers.get("Content-Length"))
|
||||
body = json.loads(request.rfile.read(content_len))
|
||||
offsets = _offsets_from_filter(body.get("filter"))
|
||||
requested_columns = body.get("columns") or ["a"]
|
||||
if isinstance(requested_columns, dict):
|
||||
requested_columns = list(requested_columns)
|
||||
|
||||
data = {}
|
||||
for column in requested_columns:
|
||||
if column == "a":
|
||||
data[column] = [REMOTE_ROWS[offset] for offset in offsets]
|
||||
elif column == "_rowoffset":
|
||||
data[column] = offsets
|
||||
elif column == "_rowid":
|
||||
data[column] = offsets
|
||||
|
||||
table = pa.table(data)
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
|
||||
request.end_headers()
|
||||
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
|
||||
writer.write_table(table)
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _remote_dataset_table():
|
||||
with http.server.ThreadingHTTPServer(
|
||||
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
yield db.open_table("test")
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
|
||||
def _open_native_table(uri: str, table_name: str):
|
||||
"""Top-level connection factory used by the explicit-factory pickle test.
|
||||
|
||||
@@ -107,6 +213,39 @@ def test_permutation_dataloader_multiprocessing(tmp_db):
|
||||
assert seen == 1000
|
||||
|
||||
|
||||
def test_remote_table_dataloader_multiprocessing():
|
||||
with _remote_dataset_table() as table:
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
table,
|
||||
collate_fn=tbl_to_tensor,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="spawn",
|
||||
)
|
||||
seen = 0
|
||||
for batch in dataloader:
|
||||
assert batch.size(0) == 1
|
||||
assert batch.size(1) == 10
|
||||
seen += batch.size(1)
|
||||
assert seen == len(REMOTE_ROWS)
|
||||
|
||||
|
||||
def test_remote_permutation_dataloader_multiprocessing():
|
||||
with _remote_dataset_table() as table:
|
||||
permutation = Permutation.identity(table)
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="spawn",
|
||||
)
|
||||
seen = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
seen += batch["a"].size(0)
|
||||
assert seen == len(REMOTE_ROWS)
|
||||
|
||||
|
||||
def test_permutation_pickle_with_connection_factory(tmp_path):
|
||||
"""When the user provides a connection_factory, pickling should round-trip
|
||||
through that factory rather than introspecting the connection URI. Useful
|
||||
@@ -171,6 +310,35 @@ def _multiworker_dataloader_target(db_uri: str, result_queue):
|
||||
result_queue.put(count)
|
||||
|
||||
|
||||
def _remote_multiworker_dataloader_target(port: int, result_queue):
|
||||
import lancedb
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
permutation = Permutation.identity(table)
|
||||
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="fork",
|
||||
)
|
||||
count = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
count += 1
|
||||
result_queue.put(count)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
@@ -208,3 +376,46 @@ def test_permutation_dataloader_fork_workers(tmp_path):
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no batches"
|
||||
assert queue.get() == 100
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_remote_permutation_dataloader_fork_workers():
|
||||
with http.server.ThreadingHTTPServer(
|
||||
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
ctx = mp.get_context("spawn")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(
|
||||
target=_remote_multiworker_dataloader_target,
|
||||
args=(port, queue),
|
||||
)
|
||||
proc.start()
|
||||
proc.join(timeout=30)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail(
|
||||
"Remote permutation hung when iterated in a fork-based "
|
||||
"DataLoader worker"
|
||||
)
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no batches"
|
||||
assert queue.get() == 10
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.0"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
|
||||
@@ -23,17 +23,12 @@ impl VectorIndex {
|
||||
.fields
|
||||
.iter()
|
||||
.map(|field_id| {
|
||||
manifest
|
||||
.schema
|
||||
.field_by_id(*field_id)
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"field {field_id} of index {} must exist in schema",
|
||||
index.name
|
||||
)
|
||||
})
|
||||
.name
|
||||
.clone()
|
||||
manifest.schema.field_path(*field_id).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"field {field_id} of index {} must exist in schema",
|
||||
index.name
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
Self {
|
||||
|
||||
@@ -27,7 +27,9 @@ use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AnyQuery, Filter, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
|
||||
use crate::utils::{
|
||||
resolve_arrow_field_path, supported_btree_data_type, supported_vector_data_type,
|
||||
};
|
||||
use crate::{DistanceType, Error};
|
||||
use crate::{
|
||||
error::Result,
|
||||
@@ -1526,8 +1528,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
});
|
||||
}
|
||||
};
|
||||
let schema = self.schema().await?;
|
||||
let (canonical_column, field) = resolve_arrow_field_path(&schema, &column)?;
|
||||
let mut body = serde_json::json!({
|
||||
"column": column
|
||||
"column": canonical_column
|
||||
});
|
||||
|
||||
// Add name parameter if provided (for backwards compatibility, only include if Some)
|
||||
@@ -1562,12 +1566,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Index::LabelList(p) => ("LABEL_LIST", Some(to_json(p)?)),
|
||||
Index::FTS(p) => ("FTS", Some(to_json(p)?)),
|
||||
Index::Auto => {
|
||||
let schema = self.schema().await?;
|
||||
let field = schema
|
||||
.field_with_name(&column)
|
||||
.map_err(|_| Error::InvalidInput {
|
||||
message: format!("Column {} not found in schema", column),
|
||||
})?;
|
||||
if supported_vector_data_type(field.data_type()) {
|
||||
body[METRIC_TYPE_KEY] =
|
||||
serde_json::Value::String(DistanceType::L2.to_string().to_lowercase());
|
||||
@@ -1864,16 +1862,26 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
let schema = self.schema().await?;
|
||||
|
||||
// Make request to get stats for each index, so we get the index type.
|
||||
// This is a bit inefficient, but it's the only way to get the index type.
|
||||
let mut futures = Vec::with_capacity(body.indexes.len());
|
||||
for index in body.indexes {
|
||||
let columns = index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|column| {
|
||||
resolve_arrow_field_path(&schema, column)
|
||||
.map(|(canonical_column, _)| canonical_column)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let future = async move {
|
||||
match self.index_stats(&index.index_name).await {
|
||||
Ok(Some(stats)) => Ok(Some(IndexConfig {
|
||||
name: index.index_name,
|
||||
index_type: stats.index_type,
|
||||
columns: index.columns,
|
||||
columns,
|
||||
})),
|
||||
Ok(None) => Ok(None), // The index must have been deleted since we listed it.
|
||||
Err(e) => Err(e),
|
||||
@@ -2315,6 +2323,38 @@ mod tests {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn nested_index_schema() -> Schema {
|
||||
let vector_type =
|
||||
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 8);
|
||||
Schema::new(vec![
|
||||
Field::new(
|
||||
"metadata",
|
||||
DataType::Struct(vec![Field::new("user_id", DataType::Int32, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(vec![Field::new("embedding", vector_type, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"payload",
|
||||
DataType::Struct(vec![Field::new("text", DataType::Utf8, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"meta-data",
|
||||
DataType::Struct(vec![Field::new("user-id", DataType::Int32, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"literal",
|
||||
DataType::Struct(vec![Field::new("a.b", DataType::Int32, false)].into()),
|
||||
false,
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[case("", 0)]
|
||||
#[case("{}", 0)]
|
||||
@@ -3081,6 +3121,59 @@ mod tests {
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_vector_nested_field_path() {
|
||||
let expected_data = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
|
||||
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
|
||||
)
|
||||
.unwrap();
|
||||
let expected_data_ref = expected_data.clone();
|
||||
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
let mut expected_body = serde_json::json!({
|
||||
"vector_column": "image.embedding",
|
||||
"prefilter": true,
|
||||
"k": 10,
|
||||
"nprobes": 20,
|
||||
"minimum_nprobes": 20,
|
||||
"maximum_nprobes": 20,
|
||||
"lower_bound": Option::<f32>::None,
|
||||
"upper_bound": Option::<f32>::None,
|
||||
"ef": Option::<usize>::None,
|
||||
"refine_factor": Option::<u32>::None,
|
||||
"version": null,
|
||||
});
|
||||
expected_body["vector"] = vec![0.1f32, 0.2, 0.3].into();
|
||||
assert_eq!(body, expected_body);
|
||||
|
||||
let response_body = write_ipc_file(&expected_data_ref);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, ARROW_FILE_CONTENT_TYPE)
|
||||
.body(response_body)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let _ = table
|
||||
.query()
|
||||
.nearest_to(vec![0.1, 0.2, 0.3])
|
||||
.unwrap()
|
||||
.column("image.embedding")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_fts() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
@@ -3162,7 +3255,7 @@ mod tests {
|
||||
"query": {
|
||||
"match": {
|
||||
"terms": "hello world",
|
||||
"column": "a",
|
||||
"column": "payload.text",
|
||||
"boost": 1.0,
|
||||
"fuzziness": 0,
|
||||
"max_expansions": 50,
|
||||
@@ -3196,7 +3289,7 @@ mod tests {
|
||||
.query()
|
||||
.full_text_search(FullTextSearchQuery::new_query(
|
||||
MatchQuery::new("hello world".to_owned())
|
||||
.with_column(Some("a".to_owned()))
|
||||
.with_column(Some("payload.text".to_owned()))
|
||||
.into(),
|
||||
))
|
||||
.with_row_id()
|
||||
@@ -3467,32 +3560,152 @@ mod tests {
|
||||
for (index_type, expected_body, index) in cases {
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/create_index/");
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
let mut expected_body = expected_body.clone();
|
||||
expected_body["column"] = "a".into();
|
||||
expected_body[INDEX_TYPE_KEY] = index_type.into();
|
||||
match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => {
|
||||
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap()
|
||||
}
|
||||
"/v1/table/my_table/create_index/" => {
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
let mut expected_body = expected_body.clone();
|
||||
expected_body["column"] = "a".into();
|
||||
expected_body[INDEX_TYPE_KEY] = index_type.into();
|
||||
|
||||
assert_eq!(body, expected_body);
|
||||
assert_eq!(body, expected_body);
|
||||
|
||||
http::Response::builder().status(200).body("{}").unwrap()
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body("{}".to_string())
|
||||
.unwrap()
|
||||
}
|
||||
path => panic!("Unexpected path: {}", path),
|
||||
}
|
||||
});
|
||||
|
||||
table.create_index(&["a"], index).execute().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_index_nested_field_paths() {
|
||||
let schema = nested_index_schema();
|
||||
let expected_requests = Arc::new(vec![
|
||||
json!({
|
||||
"column": "metadata.user_id",
|
||||
"index_type": "BTREE",
|
||||
}),
|
||||
json!({
|
||||
"column": "image.embedding",
|
||||
"index_type": "IVF_PQ",
|
||||
"metric_type": "l2",
|
||||
}),
|
||||
{
|
||||
let mut body = serde_json::to_value(InvertedIndexParams::default()).unwrap();
|
||||
body["column"] = "payload.text".into();
|
||||
body["index_type"] = "FTS".into();
|
||||
body
|
||||
},
|
||||
json!({
|
||||
"column": "`meta-data`.`user-id`",
|
||||
"index_type": "BTREE",
|
||||
}),
|
||||
json!({
|
||||
"column": "literal.`a.b`",
|
||||
"index_type": "BTREE",
|
||||
}),
|
||||
]);
|
||||
let request_idx = Arc::new(AtomicUsize::new(0));
|
||||
let table = Table::new_with_handler("my_table", {
|
||||
let schema = schema.clone();
|
||||
let expected_requests = expected_requests.clone();
|
||||
let request_idx = request_idx.clone();
|
||||
move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap(),
|
||||
"/v1/table/my_table/create_index/" => {
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
let idx = request_idx.fetch_add(1, Ordering::SeqCst);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body, expected_requests[idx]);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body("{}".to_string())
|
||||
.unwrap()
|
||||
}
|
||||
path => panic!("Unexpected path: {}", path),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
table
|
||||
.create_index(&["Metadata.USER_ID"], Index::BTree(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["Image.Embedding"], Index::Auto)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["Payload.Text"], Index::FTS(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["`META-DATA`.`USER-ID`"], Index::BTree(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["literal.`A.B`"], Index::BTree(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(request_idx.load(Ordering::SeqCst), expected_requests.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_indices() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
let schema = Schema::new(vec![
|
||||
Field::new(
|
||||
"vector",
|
||||
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 8),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"metadata",
|
||||
DataType::Struct(vec![Field::new("my.column", DataType::Utf8, true)].into()),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
|
||||
let response_body = match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => {
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap();
|
||||
}
|
||||
"/v1/table/my_table/index/list/" => {
|
||||
serde_json::json!({
|
||||
"indexes": [
|
||||
@@ -3505,7 +3718,7 @@ mod tests {
|
||||
{
|
||||
"index_name": "my_idx",
|
||||
"index_uuid": "34255f64-5717-4562-b3fc-2c963f66afa6",
|
||||
"columns": ["my_column"],
|
||||
"columns": ["metadata.`my.column`"],
|
||||
"index_status": "done",
|
||||
},
|
||||
]
|
||||
@@ -3544,7 +3757,7 @@ mod tests {
|
||||
IndexConfig {
|
||||
name: "my_idx".into(),
|
||||
index_type: IndexType::LabelList,
|
||||
columns: vec!["my_column".into()],
|
||||
columns: vec!["metadata.`my.column`".into()],
|
||||
},
|
||||
];
|
||||
assert_eq!(indices, expected);
|
||||
@@ -4012,6 +4225,20 @@ mod tests {
|
||||
assert_eq!(request.method(), "POST");
|
||||
|
||||
let response_body = match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => {
|
||||
let schema = Schema::new(vec![
|
||||
Field::new(
|
||||
"vector",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
8,
|
||||
),
|
||||
false,
|
||||
),
|
||||
Field::new("my_column", DataType::Utf8, false),
|
||||
]);
|
||||
serde_json::from_str::<serde_json::Value>(&describe_response(&schema)).unwrap()
|
||||
}
|
||||
"/v1/table/my_table/index/list/" => {
|
||||
serde_json::json!({
|
||||
"indexes": [
|
||||
@@ -4173,13 +4400,23 @@ mod tests {
|
||||
assert_eq!(value["index_type"], "IVF_PQ");
|
||||
}
|
||||
|
||||
http::Response::builder().status(200).body("").unwrap()
|
||||
}
|
||||
"/v1/table/dev$users/describe/" => {
|
||||
// Needed for schema check in Auto index type
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1, "schema": {"fields": [{"name": "embedding", "type": {"type": "list", "item": {"type": "float32"}}, "nullable": false}]}}"#)
|
||||
.body("".to_string())
|
||||
.unwrap()
|
||||
}
|
||||
"/v1/table/dev$users/describe/" => {
|
||||
let schema = Schema::new(vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
8,
|
||||
),
|
||||
false,
|
||||
)]);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap()
|
||||
}
|
||||
_ => {
|
||||
|
||||
@@ -2688,16 +2688,13 @@ impl BaseTable for NativeTable {
|
||||
message: "Multi-column (composite) indices are not yet supported".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let dataset = self.dataset.get().await?;
|
||||
self.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*self.dataset.get().await?).clone();
|
||||
let (column, field) = Self::resolve_index_field(dataset.schema(), &opts.columns[0])?;
|
||||
drop(dataset);
|
||||
|
||||
let lance_idx_params = self.make_index_params(&field, opts.index.clone()).await?;
|
||||
let index_type = self.get_index_type_for_field(&field, &opts.index);
|
||||
let columns = [column.as_str()];
|
||||
self.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*self.dataset.get().await?).clone();
|
||||
let mut builder = dataset
|
||||
.create_index_builder(&columns, index_type, lance_idx_params.as_ref())
|
||||
.train(opts.train)
|
||||
@@ -2815,63 +2812,88 @@ impl BaseTable for NativeTable {
|
||||
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
||||
let dataset = self.dataset.get().await?;
|
||||
let indices = dataset.load_indices().await?;
|
||||
let results = futures::stream::iter(indices.as_slice()).then(|idx| async {
|
||||
|
||||
// skip Lance internal indexes
|
||||
if idx.name == FRAG_REUSE_INDEX_NAME {
|
||||
return None;
|
||||
}
|
||||
|
||||
let stats = match dataset.index_statistics(idx.name.as_str()).await {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e);
|
||||
let results = futures::stream::iter(indices.as_slice())
|
||||
.then(|idx| async {
|
||||
// skip Lance internal indexes
|
||||
if idx.name == FRAG_REUSE_INDEX_NAME {
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let stats: serde_json::Value = match serde_json::from_str(&stats) {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
|
||||
log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid);
|
||||
return None;
|
||||
};
|
||||
|
||||
let index_type: crate::index::IndexType = match index_type.parse() {
|
||||
Ok(index_type) => index_type,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let mut columns = Vec::with_capacity(idx.fields.len());
|
||||
for field_id in &idx.fields {
|
||||
let column = match dataset.schema().field_path(*field_id) {
|
||||
Ok(column) => column,
|
||||
let stats = match dataset.index_statistics(idx.name.as_str()).await {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"The index {} ({}) referenced a field with id {} which does not exist in the schema: {}",
|
||||
"Failed to get statistics for index {} ({}): {}",
|
||||
idx.name,
|
||||
idx.uuid,
|
||||
field_id,
|
||||
e
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
columns.push(column);
|
||||
}
|
||||
|
||||
let name = idx.name.clone();
|
||||
Some(IndexConfig { index_type, columns, name })
|
||||
}).collect::<Vec<_>>().await;
|
||||
let stats: serde_json::Value = match serde_json::from_str(&stats) {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"Failed to deserialize index statistics for index {} ({}): {}",
|
||||
idx.name,
|
||||
idx.uuid,
|
||||
e
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
|
||||
log::warn!(
|
||||
"Index statistics was missing 'index_type' field for index {} ({})",
|
||||
idx.name,
|
||||
idx.uuid
|
||||
);
|
||||
return None;
|
||||
};
|
||||
|
||||
let index_type: crate::index::IndexType = match index_type.parse() {
|
||||
Ok(index_type) => index_type,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"Failed to parse index type for index {} ({}): {}",
|
||||
idx.name,
|
||||
idx.uuid,
|
||||
e
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let mut columns = Vec::with_capacity(idx.fields.len());
|
||||
for field_id in &idx.fields {
|
||||
let field_path = match dataset.schema().field_path(*field_id) {
|
||||
Ok(field_path) => field_path,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"Failed to resolve field path for index {} ({}) field id {}: {}",
|
||||
idx.name,
|
||||
idx.uuid,
|
||||
field_id,
|
||||
e
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
columns.push(field_path);
|
||||
}
|
||||
|
||||
let name = idx.name.clone();
|
||||
Some(IndexConfig {
|
||||
index_type,
|
||||
columns,
|
||||
name,
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
Ok(results.into_iter().flatten().collect())
|
||||
}
|
||||
@@ -3074,6 +3096,7 @@ pub struct FragmentSummaryStats {
|
||||
#[cfg(test)]
|
||||
#[allow(deprecated)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
@@ -3854,6 +3877,25 @@ mod tests {
|
||||
1
|
||||
);
|
||||
|
||||
let default_vector_results = table
|
||||
.query()
|
||||
.nearest_to(&[0.0; 8])
|
||||
.unwrap()
|
||||
.limit(1)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
default_vector_results
|
||||
.iter()
|
||||
.map(|batch| batch.num_rows())
|
||||
.sum::<usize>(),
|
||||
1
|
||||
);
|
||||
|
||||
let fts_results = table
|
||||
.query()
|
||||
.full_text_search(FullTextSearchQuery::new("document".to_string()))
|
||||
|
||||
@@ -6,7 +6,7 @@ pub(crate) mod background_cache;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::{DataType, Schema, SchemaRef};
|
||||
use arrow_schema::{DataType, Field, Schema, SchemaRef};
|
||||
use datafusion_common::{DataFusionError, Result as DataFusionResult};
|
||||
use datafusion_execution::RecordBatchStream;
|
||||
use futures::{FutureExt, Stream};
|
||||
@@ -152,14 +152,10 @@ pub fn validate_namespace(namespace: &[String]) -> Result<()> {
|
||||
/// Find one default column to create index or perform vector query.
|
||||
pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result<String> {
|
||||
// Try to find a vector column.
|
||||
let candidates = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter_map(|field| match infer_vector_dim(field.data_type()) {
|
||||
Ok(d) if dim.is_none() || dim == Some(d as i32) => Some(field.name()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let mut candidates = Vec::new();
|
||||
for field in schema.fields() {
|
||||
collect_vector_columns(field, &mut Vec::new(), dim, &mut candidates);
|
||||
}
|
||||
if candidates.is_empty() {
|
||||
Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
@@ -180,6 +176,57 @@ pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_vector_columns(
|
||||
field: &Field,
|
||||
path: &mut Vec<String>,
|
||||
dim: Option<i32>,
|
||||
candidates: &mut Vec<String>,
|
||||
) {
|
||||
path.push(field.name().clone());
|
||||
match infer_vector_dim(field.data_type()) {
|
||||
Ok(d) if dim.is_none() || dim == Some(d as i32) => {
|
||||
let path_segments = path.iter().map(String::as_str).collect::<Vec<_>>();
|
||||
candidates.push(lance_core::datatypes::format_field_path(&path_segments));
|
||||
}
|
||||
_ => {
|
||||
if let DataType::Struct(fields) = field.data_type() {
|
||||
for child in fields {
|
||||
collect_vector_columns(child, path, dim, candidates);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
path.pop();
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_arrow_field_path(schema: &Schema, column: &str) -> Result<(String, Field)> {
|
||||
lance_core::datatypes::parse_field_path(column).map_err(|e| Error::InvalidInput {
|
||||
message: format!("Invalid field path `{}`: {}", column, e),
|
||||
})?;
|
||||
|
||||
let lance_schema =
|
||||
lance_core::datatypes::Schema::try_from(schema).map_err(|e| Error::Schema {
|
||||
message: format!("Invalid schema: {}", e),
|
||||
})?;
|
||||
let field_path = lance_schema
|
||||
.resolve_case_insensitive(column)
|
||||
.ok_or_else(|| Error::Schema {
|
||||
message: format!(
|
||||
"Field path `{}` not found in schema. Available field paths: {}",
|
||||
column,
|
||||
lance_schema.field_paths().join(", ")
|
||||
),
|
||||
})?;
|
||||
let field = field_path.last().expect("field path should be non-empty");
|
||||
let path_segments = field_path
|
||||
.iter()
|
||||
.map(|field| field.name.as_str())
|
||||
.collect::<Vec<_>>();
|
||||
let canonical_path = lance_core::datatypes::format_field_path(&path_segments);
|
||||
|
||||
Ok((canonical_path, Field::from(*field)))
|
||||
}
|
||||
|
||||
pub fn supported_btree_data_type(dtype: &DataType) -> bool {
|
||||
dtype.is_integer()
|
||||
|| dtype.is_floating()
|
||||
@@ -450,6 +497,49 @@ mod tests {
|
||||
"vec"
|
||||
);
|
||||
|
||||
let schema_with_nested_vec_col = Schema::new(vec![
|
||||
Field::new("id", DataType::Int16, true),
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
assert_eq!(
|
||||
default_vector_column(&schema_with_nested_vec_col, None).unwrap(),
|
||||
"image.embedding"
|
||||
);
|
||||
|
||||
let schema_with_escaped_nested_vec_col = Schema::new(vec![Field::new(
|
||||
"image-meta",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding.v1",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
)]);
|
||||
assert_eq!(
|
||||
default_vector_column(&schema_with_escaped_nested_vec_col, None).unwrap(),
|
||||
"`image-meta`.`embedding.v1`"
|
||||
);
|
||||
|
||||
let multi_vec_col = Schema::new(vec![
|
||||
Field::new("id", DataType::Int16, true),
|
||||
Field::new(
|
||||
@@ -469,6 +559,48 @@ mod tests {
|
||||
.to_string()
|
||||
.contains("More than one")
|
||||
);
|
||||
|
||||
let multi_nested_vec_col = Schema::new(vec![
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"text",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
50,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
assert_eq!(
|
||||
default_vector_column(&multi_nested_vec_col, Some(50)).unwrap(),
|
||||
"text.embedding"
|
||||
);
|
||||
let err = default_vector_column(&multi_nested_vec_col, None)
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
assert!(err.contains("image.embedding"));
|
||||
assert!(err.contains("text.embedding"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user