Compare commits

..

1 Commits

Author SHA1 Message Date
Lance Release
e8feaede1e Bump version: 0.23.1-beta.2 → 0.23.1 2026-01-02 17:38:59 +00:00
33 changed files with 661 additions and 1410 deletions

844
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,39 +15,39 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=2.0.0-beta.6", default-features = false, "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=2.0.0-beta.6", default-features = false, "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=2.0.0-beta.6", default-features = false, "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=2.0.0-beta.6", "tag" = "v2.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=1.0.1", default-features = false }
lance-core = "=1.0.1"
lance-datagen = "=1.0.1"
lance-file = "=1.0.1"
lance-io = { "version" = "=1.0.1", default-features = false }
lance-index = "=1.0.1"
lance-linalg = "=1.0.1"
lance-namespace = "=1.0.1"
lance-namespace-impls = { "version" = "=1.0.1", default-features = false }
lance-table = "=1.0.1"
lance-testing = "=1.0.1"
lance-datafusion = "=1.0.1"
lance-encoding = "=1.0.1"
lance-arrow = "=1.0.1"
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.1", optional = false }
arrow-array = "57.1"
arrow-data = "57.1"
arrow-ipc = "57.1"
arrow-ord = "57.1"
arrow-schema = "57.1"
arrow-select = "57.1"
arrow-cast = "57.1"
arrow = { version = "56.2", optional = false }
arrow-array = "56.2"
arrow-data = "56.2"
arrow-ipc = "56.2"
arrow-ord = "56.2"
arrow-schema = "56.2"
arrow-select = "56.2"
arrow-cast = "56.2"
async-trait = "0"
datafusion = { version = "51.0", default-features = false }
datafusion-catalog = "51.0"
datafusion-common = { version = "51.0", default-features = false }
datafusion-execution = "51.0"
datafusion-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion = { version = "50.1", default-features = false }
datafusion-catalog = "50.1"
datafusion-common = { version = "50.1", default-features = false }
datafusion-execution = "50.1"
datafusion-expr = "50.1"
datafusion-physical-plan = "50.1"
env_logger = "0.11"
half = { "version" = "2.7.1", default-features = false, features = [
half = { "version" = "2.6.0", default-features = false, features = [
"num-traits",
] }
futures = "0"

View File

@@ -16,7 +16,7 @@ check_command_exists() {
}
if [[ ! -e ./lancedb ]]; then
if [[ x${SOPHON_READ_TOKEN} != "x" ]]; then
if [[ -v SOPHON_READ_TOKEN ]]; then
INPUT="lancedb-linux-x64"
gh release \
--repo lancedb/lancedb \

View File

@@ -11,7 +11,7 @@ watch:
theme:
name: "material"
logo: assets/logo.png
favicon: assets/favicon.ico
favicon: assets/logo.png
palette:
# Palette toggle for light mode
- scheme: lancedb
@@ -32,6 +32,8 @@ theme:
- content.tooltips
- toc.follow
- navigation.top
- navigation.tabs
- navigation.tabs.sticky
- navigation.footer
- navigation.tracking
- navigation.instant
@@ -113,13 +115,12 @@ markdown_extensions:
emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:material.extensions.emoji.to_svg
- markdown.extensions.toc:
toc_depth: 3
permalink: true
permalink_title: Anchor link to this section
baselevel: 1
permalink: ""
nav:
- Documentation:
- SDK Reference: index.md
- API reference:
- Overview: index.md
- Python: python/python.md
- Javascript/TypeScript: js/globals.md
- Java: java/java.md

Binary file not shown.

Before

Width:  |  Height:  |  Size: 15 KiB

View File

@@ -1,111 +0,0 @@
# VoyageAI Embeddings : Multimodal
VoyageAI embeddings can also be used to embed both text and image data, only some of the models support image data and you can check the list
under [https://docs.voyageai.com/docs/multimodal-embeddings](https://docs.voyageai.com/docs/multimodal-embeddings)
Supported multimodal models:
- `voyage-multimodal-3` - 1024 dimensions (text + images)
- `voyage-multimodal-3.5` - Flexible dimensions (256, 512, 1024 default, 2048). Supports text, images, and video.
### Video Support (voyage-multimodal-3.5)
The `voyage-multimodal-3.5` model supports video input through:
- Video URLs (`.mp4`, `.webm`, `.mov`, `.avi`, `.mkv`, `.m4v`, `.gif`)
- Video file paths
Constraints: Max 20MB video size.
Supported parameters (to be passed in `create` method) are:
| Parameter | Type | Default Value | Description |
|---|---|-------------------------|-------------------------------------------|
| `name` | `str` | `"voyage-multimodal-3"` | The model ID of the VoyageAI model to use |
| `output_dimension` | `int` | `None` | Output dimension for voyage-multimodal-3.5. Valid: 256, 512, 1024, 2048 |
Usage Example:
```python
import base64
import os
from io import BytesIO
import requests
import lancedb
from lancedb.pydantic import LanceModel, Vector
from lancedb.embeddings import get_registry
import pandas as pd
os.environ['VOYAGE_API_KEY'] = 'YOUR_VOYAGE_API_KEY'
db = lancedb.connect(".lancedb")
func = get_registry().get("voyageai").create(name="voyage-multimodal-3")
def image_to_base64(image_bytes: bytes):
buffered = BytesIO(image_bytes)
img_str = base64.b64encode(buffered.getvalue())
return img_str.decode("utf-8")
class Images(LanceModel):
label: str
image_uri: str = func.SourceField() # image uri as the source
image_bytes: str = func.SourceField() # image bytes base64 encoded as the source
vector: Vector(func.ndims()) = func.VectorField() # vector column
vec_from_bytes: Vector(func.ndims()) = func.VectorField() # Another vector column
if "images" in db.table_names():
db.drop_table("images")
table = db.create_table("images", schema=Images)
labels = ["cat", "cat", "dog", "dog", "horse", "horse"]
uris = [
"http://farm1.staticflickr.com/53/167798175_7c7845bbbd_z.jpg",
"http://farm1.staticflickr.com/134/332220238_da527d8140_z.jpg",
"http://farm9.staticflickr.com/8387/8602747737_2e5c2a45d4_z.jpg",
"http://farm5.staticflickr.com/4092/5017326486_1f46057f5f_z.jpg",
"http://farm9.staticflickr.com/8216/8434969557_d37882c42d_z.jpg",
"http://farm6.staticflickr.com/5142/5835678453_4f3a4edb45_z.jpg",
]
# get each uri as bytes
images_bytes = [image_to_base64(requests.get(uri).content) for uri in uris]
table.add(
pd.DataFrame({"label": labels, "image_uri": uris, "image_bytes": images_bytes})
)
```
Now we can search using text from both the default vector column and the custom vector column
```python
# text search
actual = table.search("man's best friend", "vec_from_bytes").limit(1).to_pydantic(Images)[0]
print(actual.label) # prints "dog"
frombytes = (
table.search("man's best friend", vector_column_name="vec_from_bytes")
.limit(1)
.to_pydantic(Images)[0]
)
print(frombytes.label)
```
Because we're using a multi-modal embedding function, we can also search using images
```python
# image search
query_image_uri = "http://farm1.staticflickr.com/200/467715466_ed4a31801f_z.jpg"
image_bytes = requests.get(query_image_uri).content
query_image = Image.open(BytesIO(image_bytes))
actual = table.search(query_image, "vec_from_bytes").limit(1).to_pydantic(Images)[0]
print(actual.label == "dog")
# image search using a custom vector column
other = (
table.search(query_image, vector_column_name="vec_from_bytes")
.limit(1)
.to_pydantic(Images)[0]
)
print(actual.label)
```

View File

@@ -1,12 +1,8 @@
# SDK Reference
# API Reference
This site contains the API reference for the client SDKs supported by [LanceDB](https://lancedb.com).
This page contains the API reference for the SDKs supported by the LanceDB team.
- [Python](python/python.md)
- [JavaScript/TypeScript](js/globals.md)
- [Java](java/java.md)
- [Rust](https://docs.rs/lancedb/latest/lancedb/index.html)
!!! info "LanceDB Documentation"
If you're looking for the full documentation of LanceDB, visit [docs.lancedb.com](https://docs.lancedb.com).
- [Rust](https://docs.rs/lancedb/latest/lancedb/index.html)

View File

@@ -85,26 +85,17 @@
/* Header gradient (only header area) */
.md-header {
background: linear-gradient(90deg, #e4d8f8 0%, #F0B7C1 45%, #E55A2B 100%);
background: linear-gradient(90deg, #3B2E58 0%, #F0B7C1 45%, #E55A2B 100%);
box-shadow: inset 0 1px 0 rgba(255,255,255,0.08), 0 1px 0 rgba(0,0,0,0.08);
}
/* Improve brand title contrast on the lavender side */
.md-header__title,
.md-header__topic,
.md-header__title .md-ellipsis,
.md-header__topic .md-ellipsis {
color: #2b1b3a;
text-shadow: 0 1px 0 rgba(255, 255, 255, 0.25);
}
/* Same colors as header for tabs (that hold the text) */
.md-tabs {
background: linear-gradient(90deg, #e4d8f8 0%, #F0B7C1 45%, #E55A2B 100%);
background: linear-gradient(90deg, #3B2E58 0%, #F0B7C1 45%, #E55A2B 100%);
}
/* Dark scheme variant */
[data-md-color-scheme="slate"] .md-header,
[data-md-color-scheme="slate"] .md-tabs {
background: linear-gradient(90deg, #e4d8f8 0%, #F0B7C1 45%, #E55A2B 100%);
background: linear-gradient(90deg, #3B2E58 0%, #F0B7C1 45%, #E55A2B 100%);
}

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.23.1",
"version": "0.23.1-beta.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.23.1",
"version": "0.23.1-beta.1",
"cpu": [
"x64",
"arm64"

View File

@@ -14,15 +14,15 @@ name = "_lancedb"
crate-type = ["cdylib"]
[dependencies]
arrow = { workspace = true, features = ["pyarrow"] }
arrow = { version = "56.2", features = ["pyarrow"] }
async-trait = "0.1"
lancedb = { path = "../rust/lancedb", default-features = false }
lance-core.workspace = true
lance-namespace.workspace = true
lance-io.workspace = true
env_logger.workspace = true
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.26", features = [
pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.25", features = [
"attributes",
"tokio-runtime",
] }
@@ -32,7 +32,7 @@ snafu.workspace = true
tokio = { version = "1.40", features = ["sync"] }
[build-dependencies]
pyo3-build-config = { version = "0.26", features = [
pyo3-build-config = { version = "0.25", features = [
"extension-module",
"abi3-py39",
] }

View File

@@ -13,7 +13,6 @@ __version__ = importlib.metadata.version("lancedb")
from ._lancedb import connect as lancedb_connect
from .common import URI, sanitize_uri
from urllib.parse import urlparse
from .db import AsyncConnection, DBConnection, LanceDBConnection
from .io import StorageOptionsProvider
from .remote import ClientConfig
@@ -29,39 +28,6 @@ from .namespace import (
)
def _check_s3_bucket_with_dots(
uri: str, storage_options: Optional[Dict[str, str]]
) -> None:
"""
Check if an S3 URI has a bucket name containing dots and warn if no region
is specified. S3 buckets with dots cannot use virtual-hosted-style URLs,
which breaks automatic region detection.
See: https://github.com/lancedb/lancedb/issues/1898
"""
if not isinstance(uri, str) or not uri.startswith("s3://"):
return
parsed = urlparse(uri)
bucket = parsed.netloc
if "." not in bucket:
return
# Check if region is provided in storage_options
region_keys = {"region", "aws_region"}
has_region = storage_options and any(k in storage_options for k in region_keys)
if not has_region:
raise ValueError(
f"S3 bucket name '{bucket}' contains dots, which prevents automatic "
f"region detection. Please specify the region explicitly via "
f"storage_options={{'region': '<your-region>'}} or "
f"storage_options={{'aws_region': '<your-region>'}}. "
f"See https://github.com/lancedb/lancedb/issues/1898 for details."
)
def connect(
uri: URI,
*,
@@ -155,11 +121,9 @@ def connect(
storage_options=storage_options,
**kwargs,
)
_check_s3_bucket_with_dots(str(uri), storage_options)
if kwargs:
raise ValueError(f"Unknown keyword arguments: {kwargs}")
return LanceDBConnection(
uri,
read_consistency_interval=read_consistency_interval,
@@ -247,8 +211,6 @@ async def connect_async(
if isinstance(client_config, dict):
client_config = ClientConfig(**client_config)
_check_s3_bucket_with_dots(str(uri), storage_options)
return AsyncConnection(
await lancedb_connect(
sanitize_uri(uri),

View File

@@ -2,7 +2,7 @@
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import base64
import os
from typing import ClassVar, TYPE_CHECKING, List, Union, Any, Generator, Optional
from typing import ClassVar, TYPE_CHECKING, List, Union, Any, Generator
from pathlib import Path
from urllib.parse import urlparse
@@ -45,29 +45,11 @@ def is_valid_url(text):
return False
VIDEO_EXTENSIONS = {".mp4", ".webm", ".mov", ".avi", ".mkv", ".m4v", ".gif"}
def is_video_url(url: str) -> bool:
"""Check if URL points to a video file based on extension."""
parsed = urlparse(url)
path = parsed.path.lower()
return any(path.endswith(ext) for ext in VIDEO_EXTENSIONS)
def is_video_path(path: Path) -> bool:
"""Check if file path is a video file based on extension."""
return path.suffix.lower() in VIDEO_EXTENSIONS
def transform_input(input_data: Union[str, bytes, Path]):
PIL = attempt_import_or_raise("PIL", "pillow")
if isinstance(input_data, str):
if is_valid_url(input_data):
if is_video_url(input_data):
content = {"type": "video_url", "video_url": input_data}
else:
content = {"type": "image_url", "image_url": input_data}
content = {"type": "image_url", "image_url": input_data}
else:
content = {"type": "text", "text": input_data}
elif isinstance(input_data, PIL.Image.Image):
@@ -88,24 +70,14 @@ def transform_input(input_data: Union[str, bytes, Path]):
"image_base64": "data:image/jpeg;base64," + img_str,
}
elif isinstance(input_data, Path):
if is_video_path(input_data):
# Read video file and encode as base64
with open(input_data, "rb") as f:
video_bytes = f.read()
video_str = base64.b64encode(video_bytes).decode("utf-8")
content = {
"type": "video_base64",
"video_base64": video_str,
}
else:
img = PIL.Image.open(input_data)
buffered = BytesIO()
img.save(buffered, format="JPEG")
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
content = {
"type": "image_base64",
"image_base64": "data:image/jpeg;base64," + img_str,
}
img = PIL.Image.open(input_data)
buffered = BytesIO()
img.save(buffered, format="JPEG")
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
content = {
"type": "image_base64",
"image_base64": "data:image/jpeg;base64," + img_str,
}
else:
raise ValueError("Each input should be either str, bytes, Path or Image.")
@@ -119,8 +91,6 @@ def sanitize_multimodal_input(inputs: Union[TEXT, IMAGES]) -> List[Any]:
PIL = attempt_import_or_raise("PIL", "pillow")
if isinstance(inputs, (str, bytes, Path, PIL.Image.Image)):
inputs = [inputs]
elif isinstance(inputs, list):
pass # Already a list, use as-is
elif isinstance(inputs, pa.Array):
inputs = inputs.to_pylist()
elif isinstance(inputs, pa.ChunkedArray):
@@ -173,16 +143,11 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
* voyage-3
* voyage-3-lite
* voyage-multimodal-3
* voyage-multimodal-3.5
* voyage-finance-2
* voyage-multilingual-2
* voyage-law-2
* voyage-code-2
output_dimension: int, optional
The output dimension for models that support flexible dimensions.
Currently only voyage-multimodal-3.5 supports this feature.
Valid options: 256, 512, 1024 (default), 2048.
Examples
--------
@@ -210,10 +175,7 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
"""
name: str
output_dimension: Optional[int] = None
client: ClassVar = None
_FLEXIBLE_DIM_MODELS: ClassVar[list] = ["voyage-multimodal-3.5"]
_VALID_DIMENSIONS: ClassVar[list] = [256, 512, 1024, 2048]
text_embedding_models: list = [
"voyage-3.5",
"voyage-3.5-lite",
@@ -224,7 +186,7 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
"voyage-law-2",
"voyage-code-2",
]
multimodal_embedding_models: list = ["voyage-multimodal-3", "voyage-multimodal-3.5"]
multimodal_embedding_models: list = ["voyage-multimodal-3"]
contextual_embedding_models: list = ["voyage-context-3"]
def _is_multimodal_model(self, model_name: str):
@@ -236,17 +198,6 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
return model_name in self.contextual_embedding_models or "context" in model_name
def ndims(self):
# Handle flexible dimension models
if self.name in self._FLEXIBLE_DIM_MODELS:
if self.output_dimension is not None:
if self.output_dimension not in self._VALID_DIMENSIONS:
raise ValueError(
f"Invalid output_dimension {self.output_dimension} "
f"for {self.name}. Valid options: {self._VALID_DIMENSIONS}"
)
return self.output_dimension
return 1024 # default dimension
if self.name == "voyage-3-lite":
return 512
elif self.name == "voyage-code-2":
@@ -260,17 +211,12 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
"voyage-finance-2",
"voyage-multilingual-2",
"voyage-law-2",
"voyage-multimodal-3",
]:
return 1024
else:
raise ValueError(f"Model {self.name} not supported")
def _get_multimodal_kwargs(self, **kwargs):
"""Get kwargs for multimodal embed call, including output_dimension if set."""
if self.name in self._FLEXIBLE_DIM_MODELS and self.output_dimension is not None:
kwargs["output_dimension"] = self.output_dimension
return kwargs
def compute_query_embeddings(
self, query: Union[str, "PIL.Image.Image"], *args, **kwargs
) -> List[np.ndarray]:
@@ -288,7 +234,6 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
"""
client = VoyageAIEmbeddingFunction._get_client()
if self._is_multimodal_model(self.name):
kwargs = self._get_multimodal_kwargs(**kwargs)
result = client.multimodal_embed(
inputs=[[query]], model=self.name, input_type="query", **kwargs
)
@@ -330,7 +275,6 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
)
if has_images:
# Use non-batched API for images
kwargs = self._get_multimodal_kwargs(**kwargs)
result = client.multimodal_embed(
inputs=sanitized, model=self.name, input_type="document", **kwargs
)
@@ -413,7 +357,6 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
callable: A function that takes a batch of texts and returns embeddings.
"""
if self._is_multimodal_model(self.name):
multimodal_kwargs = self._get_multimodal_kwargs(**kwargs)
def embed_batch(batch: List[str]) -> List[np.array]:
batch_inputs = sanitize_multimodal_input(batch)
@@ -421,7 +364,7 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction):
inputs=batch_inputs,
model=self.name,
input_type=input_type,
**multimodal_kwargs,
**kwargs,
)
return result.embeddings

View File

@@ -384,7 +384,6 @@ class RemoteDBConnection(DBConnection):
on_bad_vectors: str = "error",
fill_value: float = 0.0,
mode: Optional[str] = None,
exist_ok: bool = False,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: Optional[List[str]] = None,
@@ -413,12 +412,6 @@ class RemoteDBConnection(DBConnection):
- pyarrow.Schema
- [LanceModel][lancedb.pydantic.LanceModel]
mode: str, default "create"
The mode to use when creating the table.
Can be either "create", "overwrite", or "exist_ok".
exist_ok: bool, default False
If exist_ok is True, and mode is None or "create", mode will be changed
to "exist_ok".
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill".
@@ -490,11 +483,6 @@ class RemoteDBConnection(DBConnection):
LanceTable(table4)
"""
if exist_ok:
if mode == "create":
mode = "exist_ok"
elif not mode:
mode = "exist_ok"
if namespace is None:
namespace = []
validate_table_name(name)

View File

@@ -18,17 +18,7 @@ from lancedb._lancedb import (
UpdateResult,
)
from lancedb.embeddings.base import EmbeddingFunctionConfig
from lancedb.index import (
FTS,
BTree,
Bitmap,
HnswSq,
IvfFlat,
IvfPq,
IvfRq,
IvfSq,
LabelList,
)
from lancedb.index import FTS, BTree, Bitmap, HnswSq, IvfFlat, IvfPq, IvfSq, LabelList
from lancedb.remote.db import LOOP
import pyarrow as pa
@@ -275,12 +265,6 @@ class RemoteTable(Table):
num_sub_vectors=num_sub_vectors,
num_bits=num_bits,
)
elif index_type == "IVF_RQ":
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
)
elif index_type == "IVF_SQ":
config = IvfSq(distance_type=metric, num_partitions=num_partitions)
elif index_type == "IVF_HNSW_PQ":
@@ -295,8 +279,7 @@ class RemoteTable(Table):
else:
raise ValueError(
f"Unknown vector index type: {index_type}. Valid options are"
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ'"
" 'IVF_FLAT', 'IVF_SQ', 'IVF_PQ', 'IVF_HNSW_PQ', 'IVF_HNSW_SQ'"
)
LOOP.run(

View File

@@ -613,133 +613,6 @@ def test_voyageai_multimodal_embedding_text_function():
assert len(tbl.to_pandas()["vector"][0]) == voyageai.ndims()
@pytest.mark.slow
@pytest.mark.skipif(
os.environ.get("VOYAGE_API_KEY") is None, reason="VOYAGE_API_KEY not set"
)
def test_voyageai_multimodal_35_embedding_function():
"""Test voyage-multimodal-3.5 model with text input."""
voyageai = (
get_registry()
.get("voyageai")
.create(name="voyage-multimodal-3.5", max_retries=0)
)
class TextModel(LanceModel):
text: str = voyageai.SourceField()
vector: Vector(voyageai.ndims()) = voyageai.VectorField()
df = pd.DataFrame({"text": ["hello world", "goodbye world"]})
db = lancedb.connect("~/lancedb")
tbl = db.create_table("test_multimodal_35", schema=TextModel, mode="overwrite")
tbl.add(df)
assert len(tbl.to_pandas()["vector"][0]) == voyageai.ndims()
assert voyageai.ndims() == 1024
@pytest.mark.slow
@pytest.mark.skipif(
os.environ.get("VOYAGE_API_KEY") is None, reason="VOYAGE_API_KEY not set"
)
def test_voyageai_multimodal_35_flexible_dimensions():
"""Test voyage-multimodal-3.5 model with custom output dimension."""
voyageai = (
get_registry()
.get("voyageai")
.create(name="voyage-multimodal-3.5", output_dimension=512, max_retries=0)
)
class TextModel(LanceModel):
text: str = voyageai.SourceField()
vector: Vector(voyageai.ndims()) = voyageai.VectorField()
assert voyageai.ndims() == 512
df = pd.DataFrame({"text": ["hello world", "goodbye world"]})
db = lancedb.connect("~/lancedb")
tbl = db.create_table("test_multimodal_35_dim", schema=TextModel, mode="overwrite")
tbl.add(df)
assert len(tbl.to_pandas()["vector"][0]) == 512
@pytest.mark.slow
@pytest.mark.skipif(
os.environ.get("VOYAGE_API_KEY") is None, reason="VOYAGE_API_KEY not set"
)
def test_voyageai_multimodal_35_image_embedding():
"""Test voyage-multimodal-3.5 model with image input."""
voyageai = (
get_registry()
.get("voyageai")
.create(name="voyage-multimodal-3.5", max_retries=0)
)
class Images(LanceModel):
label: str
image_uri: str = voyageai.SourceField()
vector: Vector(voyageai.ndims()) = voyageai.VectorField()
db = lancedb.connect("~/lancedb")
table = db.create_table(
"test_multimodal_35_images", schema=Images, mode="overwrite"
)
labels = ["cat", "dog"]
uris = [
"http://farm1.staticflickr.com/53/167798175_7c7845bbbd_z.jpg",
"http://farm9.staticflickr.com/8387/8602747737_2e5c2a45d4_z.jpg",
]
table.add(pd.DataFrame({"label": labels, "image_uri": uris}))
assert len(table.to_pandas()["vector"][0]) == voyageai.ndims()
assert voyageai.ndims() == 1024
@pytest.mark.slow
@pytest.mark.skipif(
os.environ.get("VOYAGE_API_KEY") is None, reason="VOYAGE_API_KEY not set"
)
@pytest.mark.parametrize("dimension", [256, 512, 1024, 2048])
def test_voyageai_multimodal_35_all_dimensions(dimension):
"""Test voyage-multimodal-3.5 model with all valid output dimensions."""
voyageai = (
get_registry()
.get("voyageai")
.create(name="voyage-multimodal-3.5", output_dimension=dimension, max_retries=0)
)
assert voyageai.ndims() == dimension
class TextModel(LanceModel):
text: str = voyageai.SourceField()
vector: Vector(voyageai.ndims()) = voyageai.VectorField()
df = pd.DataFrame({"text": ["hello world"]})
db = lancedb.connect("~/lancedb")
tbl = db.create_table(
f"test_multimodal_35_dim_{dimension}", schema=TextModel, mode="overwrite"
)
tbl.add(df)
assert len(tbl.to_pandas()["vector"][0]) == dimension
@pytest.mark.slow
@pytest.mark.skipif(
os.environ.get("VOYAGE_API_KEY") is None, reason="VOYAGE_API_KEY not set"
)
def test_voyageai_multimodal_35_invalid_dimension():
"""Test voyage-multimodal-3.5 model raises error for invalid output dimension."""
with pytest.raises(ValueError, match="Invalid output_dimension"):
voyageai = (
get_registry()
.get("voyageai")
.create(name="voyage-multimodal-3.5", output_dimension=999, max_retries=0)
)
# ndims() is where the validation happens
voyageai.ndims()
@pytest.mark.slow
@pytest.mark.skipif(
importlib.util.find_spec("colpali_engine") is None,

View File

@@ -168,42 +168,6 @@ def test_table_len_sync():
assert len(table) == 1
def test_create_table_exist_ok():
def handler(request):
if request.path == "/v1/table/test/create/?mode=exist_ok":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"{}")
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}], exist_ok=True)
assert table is not None
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}], mode="create", exist_ok=True)
assert table is not None
def test_create_table_exist_ok_with_mode_overwrite():
def handler(request):
if request.path == "/v1/table/test/create/?mode=overwrite":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"{}")
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}], mode="overwrite", exist_ok=True)
assert table is not None
@pytest.mark.asyncio
async def test_http_error():
request_id_holder = {"request_id": None}

View File

@@ -1,68 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""
Tests for S3 bucket names containing dots.
Related issue: https://github.com/lancedb/lancedb/issues/1898
These tests validate the early error checking for S3 bucket names with dots.
No actual S3 connection is made - validation happens before connection.
"""
import pytest
import lancedb
# Test URIs
BUCKET_WITH_DOTS = "s3://my.bucket.name/path"
BUCKET_WITH_DOTS_AND_REGION = ("s3://my.bucket.name", {"region": "us-east-1"})
BUCKET_WITH_DOTS_AND_AWS_REGION = ("s3://my.bucket.name", {"aws_region": "us-east-1"})
BUCKET_WITHOUT_DOTS = "s3://my-bucket/path"
class TestS3BucketWithDotsSync:
"""Tests for connect()."""
def test_bucket_with_dots_requires_region(self):
with pytest.raises(ValueError, match="contains dots"):
lancedb.connect(BUCKET_WITH_DOTS)
def test_bucket_with_dots_and_region_passes(self):
uri, opts = BUCKET_WITH_DOTS_AND_REGION
db = lancedb.connect(uri, storage_options=opts)
assert db is not None
def test_bucket_with_dots_and_aws_region_passes(self):
uri, opts = BUCKET_WITH_DOTS_AND_AWS_REGION
db = lancedb.connect(uri, storage_options=opts)
assert db is not None
def test_bucket_without_dots_passes(self):
db = lancedb.connect(BUCKET_WITHOUT_DOTS)
assert db is not None
class TestS3BucketWithDotsAsync:
"""Tests for connect_async()."""
@pytest.mark.asyncio
async def test_bucket_with_dots_requires_region(self):
with pytest.raises(ValueError, match="contains dots"):
await lancedb.connect_async(BUCKET_WITH_DOTS)
@pytest.mark.asyncio
async def test_bucket_with_dots_and_region_passes(self):
uri, opts = BUCKET_WITH_DOTS_AND_REGION
db = await lancedb.connect_async(uri, storage_options=opts)
assert db is not None
@pytest.mark.asyncio
async def test_bucket_with_dots_and_aws_region_passes(self):
uri, opts = BUCKET_WITH_DOTS_AND_AWS_REGION
db = await lancedb.connect_async(uri, storage_options=opts)
assert db is not None
@pytest.mark.asyncio
async def test_bucket_without_dots_passes(self):
db = await lancedb.connect_async(BUCKET_WITHOUT_DOTS)
assert db is not None

View File

@@ -10,7 +10,8 @@ use arrow::{
use futures::stream::StreamExt;
use lancedb::arrow::SendableRecordBatchStream;
use pyo3::{
exceptions::PyStopAsyncIteration, pyclass, pymethods, Bound, Py, PyAny, PyRef, PyResult, Python,
exceptions::PyStopAsyncIteration, pyclass, pymethods, Bound, PyAny, PyObject, PyRef, PyResult,
Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
@@ -35,11 +36,8 @@ impl RecordBatchStream {
#[pymethods]
impl RecordBatchStream {
#[getter]
pub fn schema(&self, py: Python) -> PyResult<Py<PyAny>> {
(*self.schema)
.clone()
.into_pyarrow(py)
.map(|bound| bound.unbind())
pub fn schema(&self, py: Python) -> PyResult<PyObject> {
(*self.schema).clone().into_pyarrow(py)
}
pub fn __aiter__(self_: PyRef<'_, Self>) -> PyRef<'_, Self> {
@@ -55,12 +53,7 @@ impl RecordBatchStream {
.next()
.await
.ok_or_else(|| PyStopAsyncIteration::new_err(""))?;
Python::attach(|py| {
inner_next
.infer_error()?
.to_pyarrow(py)
.map(|bound| bound.unbind())
})
Python::with_gil(|py| inner_next.infer_error()?.to_pyarrow(py))
})
}
}

View File

@@ -12,7 +12,7 @@ use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pyfunction, pymethods,
types::{PyDict, PyDictMethods},
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
@@ -114,7 +114,7 @@ impl Connection {
data: Bound<'_, PyAny>,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
storage_options_provider: Option<Py<PyAny>>,
storage_options_provider: Option<PyObject>,
location: Option<String>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -152,7 +152,7 @@ impl Connection {
schema: Bound<'_, PyAny>,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
storage_options_provider: Option<Py<PyAny>>,
storage_options_provider: Option<PyObject>,
location: Option<String>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -187,7 +187,7 @@ impl Connection {
name: String,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
storage_options_provider: Option<Py<PyAny>>,
storage_options_provider: Option<PyObject>,
index_cache_size: Option<u32>,
location: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
@@ -304,7 +304,6 @@ impl Connection {
},
page_token,
limit: limit.map(|l| l as i32),
..Default::default()
};
let response = inner.list_namespaces(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
@@ -326,12 +325,12 @@ impl Connection {
let inner = self_.get_inner()?.clone();
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::CreateNamespaceRequest;
let mode_enum = mode.map(|m| match m.to_lowercase().as_str() {
"create" => "Create".to_string(),
"exist_ok" => "ExistOk".to_string(),
"overwrite" => "Overwrite".to_string(),
other => other.to_string(),
use lance_namespace::models::{create_namespace_request, CreateNamespaceRequest};
let mode_enum = mode.and_then(|m| match m.to_lowercase().as_str() {
"create" => Some(create_namespace_request::Mode::Create),
"exist_ok" => Some(create_namespace_request::Mode::ExistOk),
"overwrite" => Some(create_namespace_request::Mode::Overwrite),
_ => None,
});
let request = CreateNamespaceRequest {
id: if namespace.is_empty() {
@@ -341,7 +340,6 @@ impl Connection {
},
mode: mode_enum,
properties,
..Default::default()
};
let response = inner.create_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
@@ -362,16 +360,16 @@ impl Connection {
let inner = self_.get_inner()?.clone();
let py = self_.py();
future_into_py(py, async move {
use lance_namespace::models::DropNamespaceRequest;
let mode_enum = mode.map(|m| match m.to_uppercase().as_str() {
"SKIP" => "Skip".to_string(),
"FAIL" => "Fail".to_string(),
other => other.to_string(),
use lance_namespace::models::{drop_namespace_request, DropNamespaceRequest};
let mode_enum = mode.and_then(|m| match m.to_uppercase().as_str() {
"SKIP" => Some(drop_namespace_request::Mode::Skip),
"FAIL" => Some(drop_namespace_request::Mode::Fail),
_ => None,
});
let behavior_enum = behavior.map(|b| match b.to_uppercase().as_str() {
"RESTRICT" => "Restrict".to_string(),
"CASCADE" => "Cascade".to_string(),
other => other.to_string(),
let behavior_enum = behavior.and_then(|b| match b.to_uppercase().as_str() {
"RESTRICT" => Some(drop_namespace_request::Behavior::Restrict),
"CASCADE" => Some(drop_namespace_request::Behavior::Cascade),
_ => None,
});
let request = DropNamespaceRequest {
id: if namespace.is_empty() {
@@ -381,7 +379,6 @@ impl Connection {
},
mode: mode_enum,
behavior: behavior_enum,
..Default::default()
};
let response = inner.drop_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
@@ -408,7 +405,6 @@ impl Connection {
} else {
Some(namespace)
},
..Default::default()
};
let response = inner.describe_namespace(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
@@ -438,7 +434,6 @@ impl Connection {
},
page_token,
limit: limit.map(|l| l as i32),
..Default::default()
};
let response = inner.list_tables(request).await.infer_error()?;
Python::with_gil(|py| -> PyResult<Py<PyDict>> {

View File

@@ -1,4 +1,3 @@
#![allow(deprecated)]
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors

View File

@@ -281,7 +281,7 @@ impl PyPermutationReader {
let reader = slf.reader.clone();
future_into_py(slf.py(), async move {
let schema = reader.output_schema(selection).await.infer_error()?;
Python::with_gil(|py| schema.to_pyarrow(py).map(|bound| bound.unbind()))
Python::with_gil(|py| schema.to_pyarrow(py))
})
}

View File

@@ -216,7 +216,7 @@ impl<'py> IntoPyObject<'py> for PyQueryVectors {
let py_objs = self
.0
.into_iter()
.map(|v| v.to_data().into_pyarrow(py).map(|bound| bound.unbind()))
.map(|v| v.to_data().into_pyarrow(py))
.collect::<Result<Vec<_>, _>>()?;
PyList::new(py, py_objs)
}
@@ -453,7 +453,7 @@ impl Query {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let schema = inner.output_schema().await.infer_error()?;
Python::with_gil(|py| schema.to_pyarrow(py).map(|bound| bound.unbind()))
Python::with_gil(|py| schema.to_pyarrow(py))
})
}
@@ -532,7 +532,7 @@ impl TakeQuery {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let schema = inner.output_schema().await.infer_error()?;
Python::with_gil(|py| schema.to_pyarrow(py).map(|bound| bound.unbind()))
Python::with_gil(|py| schema.to_pyarrow(py))
})
}
@@ -627,7 +627,7 @@ impl FTSQuery {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let schema = inner.output_schema().await.infer_error()?;
Python::with_gil(|py| schema.to_pyarrow(py).map(|bound| bound.unbind()))
Python::with_gil(|py| schema.to_pyarrow(py))
})
}
@@ -806,7 +806,7 @@ impl VectorQuery {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let schema = inner.output_schema().await.infer_error()?;
Python::with_gil(|py| schema.to_pyarrow(py).map(|bound| bound.unbind()))
Python::with_gil(|py| schema.to_pyarrow(py))
})
}

View File

@@ -17,7 +17,7 @@ use pyo3::types::PyDict;
/// Internal wrapper around a Python object implementing StorageOptionsProvider
pub struct PyStorageOptionsProvider {
/// The Python object implementing fetch_storage_options()
inner: Py<PyAny>,
inner: PyObject,
}
impl Clone for PyStorageOptionsProvider {
@@ -29,7 +29,7 @@ impl Clone for PyStorageOptionsProvider {
}
impl PyStorageOptionsProvider {
pub fn new(obj: Py<PyAny>) -> PyResult<Self> {
pub fn new(obj: PyObject) -> PyResult<Self> {
Python::with_gil(|py| {
// Verify the object has a fetch_storage_options method
if !obj.bind(py).hasattr("fetch_storage_options")? {
@@ -143,7 +143,7 @@ impl std::fmt::Debug for PyStorageOptionsProviderWrapper {
/// This is the main entry point for converting Python StorageOptionsProvider objects
/// to Rust trait objects that can be used by the Lance ecosystem.
pub fn py_object_to_storage_options_provider(
py_obj: Py<PyAny>,
py_obj: PyObject,
) -> PyResult<Arc<dyn StorageOptionsProvider>> {
let py_provider = PyStorageOptionsProvider::new(py_obj)?;
Ok(Arc::new(PyStorageOptionsProviderWrapper::new(py_provider)))

View File

@@ -287,7 +287,7 @@ impl Table {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let schema = inner.schema().await.infer_error()?;
Python::with_gil(|py| schema.to_pyarrow(py).map(|bound| bound.unbind()))
Python::with_gil(|py| schema.to_pyarrow(py))
})
}

View File

@@ -1325,27 +1325,25 @@ mod tests {
#[tokio::test]
async fn test_table_names() {
let tc = new_test_connection().await.unwrap();
let db = tc.connection;
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
let tmp_dir = tempdir().unwrap();
let mut names = Vec::with_capacity(100);
for _ in 0..100 {
let name = uuid::Uuid::new_v4().to_string();
let mut name = uuid::Uuid::new_v4().to_string();
names.push(name.clone());
db.create_empty_table(name, schema.clone())
.execute()
.await
.unwrap();
name.push_str(".lance");
create_dir_all(tmp_dir.path().join(&name)).unwrap();
}
names.sort();
let tables = db.table_names().limit(100).execute().await.unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
let tables = db.table_names().execute().await.unwrap();
assert_eq!(tables, names);
let tables = db
.table_names()
.start_after(&names[30])
.limit(100)
.execute()
.await
.unwrap();

View File

@@ -9,7 +9,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use lance_namespace::{
models::{
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
@@ -137,7 +137,6 @@ impl Database for LanceNamespaceDatabase {
id: Some(request.namespace),
page_token: request.start_after,
limit: request.limit.map(|l| l as i32),
..Default::default()
};
let response = self.namespace.list_tables(ns_request).await?;
@@ -155,7 +154,6 @@ impl Database for LanceNamespaceDatabase {
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
version: None,
..Default::default()
};
let describe_result = self.namespace.describe_table(describe_request).await;
@@ -173,7 +171,6 @@ impl Database for LanceNamespaceDatabase {
// Drop the existing table - must succeed
let drop_request = DropTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
self.namespace
.drop_table(drop_request)
@@ -205,24 +202,29 @@ impl Database for LanceNamespaceDatabase {
let mut table_id = request.namespace.clone();
table_id.push(request.name.clone());
let declare_request = DeclareTableRequest {
let create_empty_request = CreateEmptyTableRequest {
id: Some(table_id.clone()),
location: None,
vend_credentials: None,
..Default::default()
properties: if self.storage_options.is_empty() {
None
} else {
Some(self.storage_options.clone())
},
};
let declare_response = self
let create_empty_response = self
.namespace
.declare_table(declare_request)
.create_empty_table(create_empty_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to declare table: {}", e),
message: format!("Failed to create empty table: {}", e),
})?;
let location = declare_response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
let location = create_empty_response
.location
.ok_or_else(|| Error::Runtime {
message: "Table location is missing from create_empty_table response".to_string(),
})?;
let native_table = NativeTable::create_from_namespace(
self.namespace.clone(),
@@ -279,10 +281,7 @@ impl Database for LanceNamespaceDatabase {
let mut table_id = namespace.to_vec();
table_id.push(name.to_string());
let drop_request = DropTableRequest {
id: Some(table_id),
..Default::default()
};
let drop_request = DropTableRequest { id: Some(table_id) };
self.namespace
.drop_table(drop_request)
.await
@@ -439,7 +438,6 @@ mod tests {
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -501,7 +499,6 @@ mod tests {
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -566,7 +563,6 @@ mod tests {
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -651,7 +647,6 @@ mod tests {
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -708,7 +703,6 @@ mod tests {
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -790,7 +784,6 @@ mod tests {
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -825,7 +818,6 @@ mod tests {
id: Some(vec!["test_ns".into()]),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");

View File

@@ -120,13 +120,8 @@ impl MemoryRegistry {
}
/// A record batch reader that has embeddings applied to it
///
/// This is a wrapper around another record batch reader that applies embedding functions
/// when reading from the record batch.
///
/// When multiple embedding functions are defined, they are computed in parallel using
/// scoped threads to improve performance. For a single embedding function, computation
/// is done inline without threading overhead.
/// This is a wrapper around another record batch reader that applies an embedding function
/// when reading from the record batch
pub struct WithEmbeddings<R: RecordBatchReader> {
inner: R,
embeddings: Vec<(EmbeddingDefinition, Arc<dyn EmbeddingFunction>)>,
@@ -240,48 +235,6 @@ impl<R: RecordBatchReader> WithEmbeddings<R> {
column_definitions,
})
}
fn compute_embeddings_parallel(&self, batch: &RecordBatch) -> Result<Vec<Arc<dyn Array>>> {
if self.embeddings.len() == 1 {
let (fld, func) = &self.embeddings[0];
let src_column =
batch
.column_by_name(&fld.source_column)
.ok_or_else(|| Error::InvalidInput {
message: format!("Source column '{}' not found", fld.source_column),
})?;
return Ok(vec![func.compute_source_embeddings(src_column.clone())?]);
}
// Parallel path: multiple embeddings
std::thread::scope(|s| {
let handles: Vec<_> = self
.embeddings
.iter()
.map(|(fld, func)| {
let src_column = batch.column_by_name(&fld.source_column).ok_or_else(|| {
Error::InvalidInput {
message: format!("Source column '{}' not found", fld.source_column),
}
})?;
let handle =
s.spawn(move || func.compute_source_embeddings(src_column.clone()));
Ok(handle)
})
.collect::<Result<_>>()?;
handles
.into_iter()
.map(|h| {
h.join().map_err(|e| Error::Runtime {
message: format!("Thread panicked during embedding computation: {:?}", e),
})?
})
.collect()
})
}
}
impl<R: RecordBatchReader> Iterator for MaybeEmbedded<R> {
@@ -309,19 +262,19 @@ impl<R: RecordBatchReader> Iterator for WithEmbeddings<R> {
fn next(&mut self) -> Option<Self::Item> {
let batch = self.inner.next()?;
match batch {
Ok(batch) => {
let embeddings = match self.compute_embeddings_parallel(&batch) {
Ok(emb) => emb,
Err(e) => {
return Some(Err(arrow_schema::ArrowError::ComputeError(format!(
"Error computing embedding: {}",
e
))))
}
};
let mut batch = batch;
for ((fld, _), embedding) in self.embeddings.iter().zip(embeddings.iter()) {
Ok(mut batch) => {
// todo: parallelize this
for (fld, func) in self.embeddings.iter() {
let src_column = batch.column_by_name(&fld.source_column).unwrap();
let embedding = match func.compute_source_embeddings(src_column.clone()) {
Ok(embedding) => embedding,
Err(e) => {
return Some(Err(arrow_schema::ArrowError::ComputeError(format!(
"Error computing embedding: {}",
e
))))
}
};
let dst_field_name = fld
.dest_column
.clone()
@@ -333,7 +286,7 @@ impl<R: RecordBatchReader> Iterator for WithEmbeddings<R> {
embedding.nulls().is_some(),
);
match batch.try_with_column(dst_field.clone(), embedding.clone()) {
match batch.try_with_column(dst_field.clone(), embedding) {
Ok(b) => batch = b,
Err(e) => return Some(Err(e)),
};

View File

@@ -1720,7 +1720,6 @@ mod tests {
id: Some(namespace.clone()),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -1747,7 +1746,6 @@ mod tests {
id: Some(namespace.clone()),
page_token: None,
limit: None,
..Default::default()
})
.await
.expect("Failed to list tables");
@@ -1760,7 +1758,6 @@ mod tests {
id: Some(namespace.clone()),
page_token: None,
limit: None,
..Default::default()
})
.await
.unwrap();
@@ -1802,7 +1799,6 @@ mod tests {
id: Some(namespace.clone()),
mode: None,
properties: None,
..Default::default()
})
.await
.expect("Failed to create namespace");
@@ -1829,7 +1825,6 @@ mod tests {
id: Some(namespace.clone()),
page_token: None,
limit: None,
..Default::default()
})
.await
.unwrap();

View File

@@ -1088,17 +1088,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
}
Index::IvfRq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_RQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
if let Some(num_bits) = index.num_bits {
body["num_bits"] = serde_json::Value::Number(num_bits.into());
}
}
Index::BTree(_) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("BTREE".to_string());
}

View File

@@ -42,8 +42,8 @@ use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_io::object_store::LanceNamespaceStorageOptionsProvider;
use lance_namespace::models::{
QueryTableRequest as NsQueryTableRequest, QueryTableRequestColumns,
QueryTableRequestFullTextQuery, QueryTableRequestVector, StringFtsQuery,
QueryTableRequest as NsQueryTableRequest, QueryTableRequestFullTextQuery,
QueryTableRequestVector, StringFtsQuery,
};
use lance_namespace::LanceNamespace;
use lance_table::format::Manifest;
@@ -1424,7 +1424,7 @@ impl Table {
})
.collect::<Vec<_>>();
let unioned = UnionExec::try_new(projected_plans).unwrap();
let unioned = Arc::new(UnionExec::new(projected_plans));
// We require 1 partition in the final output
let repartitioned = RepartitionExec::try_new(
unioned,
@@ -2332,18 +2332,6 @@ impl NativeTable {
}
}
/// Convert selected columns into the namespace request format.
fn namespace_columns(&self, columns: &[String]) -> Option<Box<QueryTableRequestColumns>> {
if columns.is_empty() {
None
} else {
Some(Box::new(QueryTableRequestColumns {
column_names: Some(columns.to_vec()),
column_aliases: None,
}))
}
}
/// Convert an AnyQuery to the namespace QueryTableRequest format.
fn convert_to_namespace_query(&self, query: &AnyQuery) -> Result<NsQueryTableRequest> {
match query {
@@ -2360,7 +2348,7 @@ impl NativeTable {
// Convert select to columns list
let columns = match &vq.base.select {
Select::All => None,
Select::Columns(cols) => self.namespace_columns(cols),
Select::Columns(cols) => Some(cols.clone()),
Select::Dynamic(_) => {
return Err(Error::NotSupported {
message:
@@ -2395,8 +2383,6 @@ impl NativeTable {
});
Ok(NsQueryTableRequest {
identity: None,
context: None,
id: None, // Will be set in namespace_query
k: vq.base.limit.unwrap_or(10) as i32,
vector: Box::new(vector),
@@ -2435,7 +2421,7 @@ impl NativeTable {
let columns = match &q.select {
Select::All => None,
Select::Columns(cols) => self.namespace_columns(cols),
Select::Columns(cols) => Some(cols.clone()),
Select::Dynamic(_) => {
return Err(Error::NotSupported {
message: "Dynamic columns are not supported for server-side query"
@@ -2467,8 +2453,6 @@ impl NativeTable {
});
Ok(NsQueryTableRequest {
identity: None,
context: None,
id: None, // Will be set by caller
vector,
k: q.limit.unwrap_or(10) as i32,
@@ -5162,13 +5146,7 @@ mod tests {
assert_eq!(ns_request.k, 10);
assert_eq!(ns_request.offset, Some(5));
assert_eq!(ns_request.filter, Some("id > 0".to_string()));
let column_names = ns_request
.columns
.as_ref()
.and_then(|c| c.column_names.as_ref())
.cloned()
.unwrap();
assert_eq!(column_names, vec!["id".to_string()]);
assert_eq!(ns_request.columns, Some(vec!["id".to_string()]));
assert_eq!(ns_request.vector_column, Some("vector".to_string()));
assert_eq!(ns_request.distance_type, Some("l2".to_string()));
assert!(ns_request.vector.single_vector.is_some());
@@ -5209,13 +5187,7 @@ mod tests {
assert_eq!(ns_request.k, 20);
assert_eq!(ns_request.offset, Some(5));
assert_eq!(ns_request.filter, Some("id > 5".to_string()));
let column_names = ns_request
.columns
.as_ref()
.and_then(|c| c.column_names.as_ref())
.cloned()
.unwrap();
assert_eq!(column_names, vec!["id".to_string()]);
assert_eq!(ns_request.columns, Some(vec!["id".to_string()]));
assert_eq!(ns_request.with_row_id, Some(true));
assert_eq!(ns_request.bypass_vector_index, Some(true));
assert!(ns_request.vector_column.is_none()); // No vector column for plain queries

View File

@@ -100,8 +100,7 @@ impl DatasetRef {
let should_checkout = match &target_ref {
refs::Ref::Version(_, Some(target_ver)) => version != target_ver,
refs::Ref::Version(_, None) => true, // No specific version, always checkout
refs::Ref::VersionNumber(target_ver) => version != target_ver,
refs::Ref::Tag(_) => true, // Always checkout for tags
refs::Ref::Tag(_) => true, // Always checkout for tags
};
if should_checkout {

View File

@@ -5,19 +5,16 @@
use regex::Regex;
use std::env;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, ChildStdout, Command};
use tokio::sync::mpsc;
use std::io::{BufRead, BufReader};
use std::process::{Child, ChildStdout, Command, Stdio};
use crate::{connect, Connection};
use anyhow::{anyhow, bail, Result};
use anyhow::{bail, Result};
use tempfile::{tempdir, TempDir};
pub struct TestConnection {
pub uri: String,
pub connection: Connection,
pub is_remote: bool,
_temp_dir: Option<TempDir>,
_process: Option<TestProcess>,
}
@@ -40,56 +37,6 @@ pub async fn new_test_connection() -> Result<TestConnection> {
}
}
async fn spawn_stdout_reader(
mut stdout: BufReader<ChildStdout>,
port_sender: mpsc::Sender<anyhow::Result<String>>,
) -> tokio::task::JoinHandle<()> {
let print_stdout = env::var("PRINT_LANCEDB_TEST_CONNECTION_SCRIPT_OUTPUT").is_ok();
tokio::spawn(async move {
let mut line = String::new();
let re = Regex::new(r"Query node now listening on 0.0.0.0:(.*)").unwrap();
loop {
line.clear();
let result = stdout.read_line(&mut line).await;
if let Err(err) = result {
port_sender
.send(Err(anyhow!(
"error while reading from process output: {}",
err
)))
.await
.unwrap();
return;
} else if result.unwrap() == 0 {
port_sender
.send(Err(anyhow!(
" hit EOF before reading port from process output."
)))
.await
.unwrap();
return;
}
if re.is_match(&line) {
let caps = re.captures(&line).unwrap();
port_sender.send(Ok(caps[1].to_string())).await.unwrap();
break;
}
}
loop {
line.clear();
match stdout.read_line(&mut line).await {
Err(_) => return,
Ok(0) => return,
Ok(_size) => {
if print_stdout {
print!("{}", line);
}
}
}
}
})
}
async fn new_remote_connection(script_path: &str) -> Result<TestConnection> {
let temp_dir = tempdir()?;
let data_path = temp_dir.path().to_str().unwrap().to_string();
@@ -110,25 +57,38 @@ async fn new_remote_connection(script_path: &str) -> Result<TestConnection> {
child: child_result.unwrap(),
};
let stdout = BufReader::new(process.child.stdout.take().unwrap());
let (port_sender, mut port_receiver) = mpsc::channel(5);
let _reader = spawn_stdout_reader(stdout, port_sender).await;
let port = match port_receiver.recv().await {
None => bail!("Unable to determine the port number used by the phalanx process we spawned, because the reader thread was closed too soon."),
Some(Err(err)) => bail!("Unable to determine the port number used by the phalanx process we spawned, because of an error, {}", err),
Some(Ok(port)) => port,
};
let port = read_process_port(stdout)?;
let uri = "db://test";
let host_override = format!("http://localhost:{}", port);
let connection = create_new_connection(uri, &host_override).await?;
Ok(TestConnection {
uri: uri.to_string(),
connection,
is_remote: true,
_temp_dir: Some(temp_dir),
_process: Some(process),
})
}
fn read_process_port(mut stdout: BufReader<ChildStdout>) -> Result<String> {
let mut line = String::new();
let re = Regex::new(r"Query node now listening on 0.0.0.0:(.*)").unwrap();
loop {
let result = stdout.read_line(&mut line);
if let Err(err) = result {
bail!(format!(
"read_process_port: error while reading from process output: {}",
err
));
} else if result.unwrap() == 0 {
bail!("read_process_port: hit EOF before reading port from process output.");
}
if re.is_match(&line) {
let caps = re.captures(&line).unwrap();
return Ok(caps[1].to_string());
}
}
}
#[cfg(feature = "remote")]
async fn create_new_connection(uri: &str, host_override: &str) -> crate::error::Result<Connection> {
connect(uri)
@@ -154,7 +114,6 @@ async fn new_local_connection() -> Result<TestConnection> {
Ok(TestConnection {
uri: uri.to_string(),
connection,
is_remote: false,
_temp_dir: Some(temp_dir),
_process: None,
})

View File

@@ -1,253 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{
borrow::Cow,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use arrow::buffer::NullBuffer;
use arrow_array::{
Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use lancedb::{
embeddings::{EmbeddingDefinition, EmbeddingFunction, MaybeEmbedded, WithEmbeddings},
Error, Result,
};
#[derive(Debug)]
struct SlowMockEmbed {
name: String,
dim: usize,
delay_ms: u64,
call_count: Arc<AtomicUsize>,
}
impl SlowMockEmbed {
pub fn new(name: String, dim: usize, delay_ms: u64) -> Self {
Self {
name,
dim,
delay_ms,
call_count: Arc::new(AtomicUsize::new(0)),
}
}
pub fn get_call_count(&self) -> usize {
self.call_count.load(Ordering::SeqCst)
}
}
impl EmbeddingFunction for SlowMockEmbed {
fn name(&self) -> &str {
&self.name
}
fn source_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::Utf8))
}
fn dest_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::new_fixed_size_list(
DataType::Float32,
self.dim as _,
true,
)))
}
fn compute_source_embeddings(&self, source: Arc<dyn Array>) -> Result<Arc<dyn Array>> {
// Simulate slow embedding computation
std::thread::sleep(Duration::from_millis(self.delay_ms));
self.call_count.fetch_add(1, Ordering::SeqCst);
let len = source.len();
let inner = Arc::new(Float32Array::from(vec![Some(1.0); len * self.dim]));
let field = Field::new("item", inner.data_type().clone(), false);
let arr = FixedSizeListArray::new(
Arc::new(field),
self.dim as _,
inner,
Some(NullBuffer::new_valid(len)),
);
Ok(Arc::new(arr))
}
fn compute_query_embeddings(&self, _input: Arc<dyn Array>) -> Result<Arc<dyn Array>> {
unimplemented!()
}
}
fn create_test_batch() -> Result<RecordBatch> {
let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
let text = StringArray::from(vec!["hello", "world"]);
RecordBatch::try_new(schema, vec![Arc::new(text)]).map_err(|e| Error::Runtime {
message: format!("Failed to create test batch: {}", e),
})
}
#[test]
fn test_single_embedding_fast_path() {
// Single embedding should execute without spawning threads
let batch = create_test_batch().unwrap();
let schema = batch.schema();
let embed = Arc::new(SlowMockEmbed::new("test".to_string(), 2, 10));
let embedding_def = EmbeddingDefinition::new("text", "test", Some("embedding"));
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
let embeddings = vec![(embedding_def, embed.clone() as Arc<dyn EmbeddingFunction>)];
let mut with_embeddings = WithEmbeddings::new(reader, embeddings);
let result = with_embeddings.next().unwrap().unwrap();
assert!(result.column_by_name("embedding").is_some());
assert_eq!(embed.get_call_count(), 1);
}
#[test]
fn test_multiple_embeddings_parallel() {
// Multiple embeddings should execute in parallel
let batch = create_test_batch().unwrap();
let schema = batch.schema();
let embed1 = Arc::new(SlowMockEmbed::new("embed1".to_string(), 2, 100));
let embed2 = Arc::new(SlowMockEmbed::new("embed2".to_string(), 3, 100));
let embed3 = Arc::new(SlowMockEmbed::new("embed3".to_string(), 4, 100));
let def1 = EmbeddingDefinition::new("text", "embed1", Some("emb1"));
let def2 = EmbeddingDefinition::new("text", "embed2", Some("emb2"));
let def3 = EmbeddingDefinition::new("text", "embed3", Some("emb3"));
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
let embeddings = vec![
(def1, embed1.clone() as Arc<dyn EmbeddingFunction>),
(def2, embed2.clone() as Arc<dyn EmbeddingFunction>),
(def3, embed3.clone() as Arc<dyn EmbeddingFunction>),
];
let mut with_embeddings = WithEmbeddings::new(reader, embeddings);
let result = with_embeddings.next().unwrap().unwrap();
// Verify all embedding columns are present
assert!(result.column_by_name("emb1").is_some());
assert!(result.column_by_name("emb2").is_some());
assert!(result.column_by_name("emb3").is_some());
// Verify all embeddings were computed
assert_eq!(embed1.get_call_count(), 1);
assert_eq!(embed2.get_call_count(), 1);
assert_eq!(embed3.get_call_count(), 1);
}
#[test]
fn test_embedding_column_order_preserved() {
// Verify that embedding columns are added in the same order as definitions
let batch = create_test_batch().unwrap();
let schema = batch.schema();
let embed1 = Arc::new(SlowMockEmbed::new("embed1".to_string(), 2, 10));
let embed2 = Arc::new(SlowMockEmbed::new("embed2".to_string(), 3, 10));
let embed3 = Arc::new(SlowMockEmbed::new("embed3".to_string(), 4, 10));
let def1 = EmbeddingDefinition::new("text", "embed1", Some("first"));
let def2 = EmbeddingDefinition::new("text", "embed2", Some("second"));
let def3 = EmbeddingDefinition::new("text", "embed3", Some("third"));
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
let embeddings = vec![
(def1, embed1 as Arc<dyn EmbeddingFunction>),
(def2, embed2 as Arc<dyn EmbeddingFunction>),
(def3, embed3 as Arc<dyn EmbeddingFunction>),
];
let mut with_embeddings = WithEmbeddings::new(reader, embeddings);
let result = with_embeddings.next().unwrap().unwrap();
let result_schema = result.schema();
// Original column is first
assert_eq!(result_schema.field(0).name(), "text");
// Embedding columns follow in order
assert_eq!(result_schema.field(1).name(), "first");
assert_eq!(result_schema.field(2).name(), "second");
assert_eq!(result_schema.field(3).name(), "third");
}
#[test]
fn test_embedding_error_propagation() {
// Test that errors from embedding computation are properly propagated
#[derive(Debug)]
struct FailingEmbed {
name: String,
}
impl EmbeddingFunction for FailingEmbed {
fn name(&self) -> &str {
&self.name
}
fn source_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::Utf8))
}
fn dest_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::new_fixed_size_list(
DataType::Float32,
2,
true,
)))
}
fn compute_source_embeddings(&self, _source: Arc<dyn Array>) -> Result<Arc<dyn Array>> {
Err(Error::Runtime {
message: "Intentional failure".to_string(),
})
}
fn compute_query_embeddings(&self, _input: Arc<dyn Array>) -> Result<Arc<dyn Array>> {
unimplemented!()
}
}
let batch = create_test_batch().unwrap();
let schema = batch.schema();
let embed = Arc::new(FailingEmbed {
name: "failing".to_string(),
});
let def = EmbeddingDefinition::new("text", "failing", Some("emb"));
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
let embeddings = vec![(def, embed as Arc<dyn EmbeddingFunction>)];
let mut with_embeddings = WithEmbeddings::new(reader, embeddings);
let result = with_embeddings.next().unwrap();
assert!(result.is_err());
let err_msg = format!("{}", result.err().unwrap());
assert!(err_msg.contains("Intentional failure"));
}
#[test]
fn test_maybe_embedded_with_no_embeddings() {
// Test that MaybeEmbedded::No variant works correctly
let batch = create_test_batch().unwrap();
let schema = batch.schema();
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone());
let table_def = lancedb::table::TableDefinition {
schema: schema.clone(),
column_definitions: vec![lancedb::table::ColumnDefinition {
kind: lancedb::table::ColumnKind::Physical,
}],
};
let mut maybe_embedded = MaybeEmbedded::try_new(reader, table_def, None).unwrap();
let result = maybe_embedded.next().unwrap().unwrap();
assert_eq!(result.num_columns(), 1);
assert_eq!(result.column(0).as_ref(), batch.column(0).as_ref());
}