Compare commits

..

1 Commits

Author SHA1 Message Date
Lance Release
65e67693a7 Bump version: 0.26.2 → 0.27.0-beta.0 2026-02-17 00:28:07 +00:00
31 changed files with 1557 additions and 2444 deletions

123
Cargo.lock generated
View File

@@ -1389,9 +1389,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.11.1"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "bytes-utils"
@@ -1783,16 +1783,6 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-skiplist"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -3082,8 +3072,9 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f9e5c0b1c67a38cb92b41535d44623483beb9511592ae23a3bf42ddec758690"
dependencies = [
"arrow-array",
"rand 0.9.2",
@@ -4414,8 +4405,9 @@ dependencies = [
[[package]]
name = "lance"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b7f07b905df393a5554eba19055c620f9ea25a3e40a013bda4bd8dc4ca66f01"
dependencies = [
"arrow",
"arrow-arith",
@@ -4434,7 +4426,6 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"crossbeam-skiplist",
"dashmap",
"datafusion",
"datafusion-expr",
@@ -4474,7 +4465,6 @@ dependencies = [
"tantivy",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
"url",
"uuid",
@@ -4482,8 +4472,9 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "100e076cb81c8f0c24cd2881c706fc53e037c7d6e81eb320e929e265d157effb"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4502,8 +4493,9 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588318d3d1ba0f97162fab39a323a0a49866bb35b32af42572c6b6a12296fa27"
dependencies = [
"arrayref",
"paste",
@@ -4512,8 +4504,9 @@ dependencies = [
[[package]]
name = "lance-core"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fa01d1cf490ccfd3b8eaeee2781415d0419e6be8366040e57e43677abf2644e"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4550,8 +4543,9 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef89a39e3284eef76f79e63f23de8881a0583ad6feb20ed39f47eadd847a2b88"
dependencies = [
"arrow",
"arrow-array",
@@ -4574,7 +4568,6 @@ dependencies = [
"log",
"pin-project",
"prost",
"prost-build",
"snafu",
"tokio",
"tracing",
@@ -4582,8 +4575,9 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2a60eef5c47e65d91e2ffa8e7e1629c52e7190c8b88a371a1a60601dc49371"
dependencies = [
"arrow",
"arrow-array",
@@ -4601,8 +4595,9 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95ce4a6631308aa681b2671af8f2a845ff781f8d4e755a2a7ccd012379467094"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4639,8 +4634,9 @@ dependencies = [
[[package]]
name = "lance-file"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d4d82357cbfaa1a18494226c15b1cb3c8ed0b6c84b91146323c82047ede419"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4672,8 +4668,9 @@ dependencies = [
[[package]]
name = "lance-geo"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7183fc870da62826f0f97df8007b634da053eb310157856efe1dc74f446951c"
dependencies = [
"datafusion",
"geo-traits",
@@ -4687,8 +4684,9 @@ dependencies = [
[[package]]
name = "lance-index"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20e9c5aa7024a63af9ae89ee8c0f23c8421b7896742e5cd4a271a60f9956cb80"
dependencies = [
"arrow",
"arrow-arith",
@@ -4755,8 +4753,9 @@ dependencies = [
[[package]]
name = "lance-io"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7d2af0b17fb374a8181bcf1a10bce5703ae3ee4373c1587ce4bba23e15e45c8"
dependencies = [
"arrow",
"arrow-arith",
@@ -4789,7 +4788,6 @@ dependencies = [
"serde",
"shellexpand",
"snafu",
"tempfile",
"tokio",
"tracing",
"url",
@@ -4797,8 +4795,9 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5125aa62696e75a7475807564b4921f252d8815be606b84bc00e6def0f5c24bb"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4814,8 +4813,9 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70545c2676ce954dfd801da5c6a631a70bba967826cd3a8f31b47d1f04bbfed3"
dependencies = [
"arrow",
"async-trait",
@@ -4827,8 +4827,9 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92519f9f27d62655030aac62ea0db9614b65f086ebe651c1b0a96e351b668022"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4843,7 +4844,6 @@ dependencies = [
"lance-index",
"lance-io",
"lance-namespace",
"lance-table",
"log",
"object_store",
"rand 0.9.2",
@@ -4859,9 +4859,9 @@ dependencies = [
[[package]]
name = "lance-namespace-reqwest-client"
version = "0.5.2"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ad4c947349acd6e37e984eba0254588bd894e6128434338b9e6904e56fb4633"
checksum = "a2acdba67f84190067532fce07b51a435dd390d7cdc1129a05003e5cb3274cf0"
dependencies = [
"reqwest",
"serde",
@@ -4872,8 +4872,9 @@ dependencies = [
[[package]]
name = "lance-table"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06ad37bd90045de8ef533df170c6098e6ff6ecb427aade47d7db8e2c86f2678"
dependencies = [
"arrow",
"arrow-array",
@@ -4912,8 +4913,9 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "3.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-beta.4#649b4871dbcbc4f26586e33677b1f9535d9a7f63"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd7f13b0f2b6337af015dcb1519645388dca08c970037aa77aff517687c4019f"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4924,7 +4926,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.27.0-beta.0"
version = "0.26.2"
dependencies = [
"ahash",
"anyhow",
@@ -5004,7 +5006,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.27.0-beta.0"
version = "0.26.2"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -5024,7 +5026,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.30.0-beta.0"
version = "0.29.2"
dependencies = [
"arrow",
"async-trait",
@@ -5626,10 +5628,11 @@ dependencies = [
[[package]]
name = "num-bigint-dig"
version = "0.8.6"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7"
checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151"
dependencies = [
"byteorder",
"lazy_static",
"libm",
"num-integer",
@@ -7271,9 +7274,9 @@ dependencies = [
[[package]]
name = "roaring"
version = "0.11.3"
version = "0.10.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885"
checksum = "19e8d2cfa184d94d0726d650a9f4a1be7f9b76ac9fdb954219878dc00c1c1e7b"
dependencies = [
"bytemuck",
"byteorder",

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.88.0"
[workspace.dependencies]
lance = { "version" = "=3.0.0-beta.4", default-features = false, "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-beta.4", default-features = false, "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-beta.4", default-features = false, "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-beta.4", "tag" = "v3.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=2.0.1", default-features = false }
lance-core = "=2.0.1"
lance-datagen = "=2.0.1"
lance-file = "=2.0.1"
lance-io = { "version" = "=2.0.1", default-features = false }
lance-index = "=2.0.1"
lance-linalg = "=2.0.1"
lance-namespace = "=2.0.1"
lance-namespace-impls = { "version" = "=2.0.1", default-features = false }
lance-table = "=2.0.1"
lance-testing = "=2.0.1"
lance-datafusion = "=2.0.1"
lance-encoding = "=2.0.1"
lance-arrow = "=2.0.1"
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>3.0.0-beta.4</lance-core.version>
<lance-core.version>2.0.1</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.0",
"version": "0.26.2",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.0",
"version": "0.26.2",
"cpu": [
"x64",
"arm64"

View File

@@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import warnings
from typing import List, Union
import numpy as np
@@ -16,8 +15,6 @@ from .utils import weak_lru
@register("gte-text")
class GteEmbeddings(TextEmbeddingFunction):
"""
Deprecated: GTE embeddings should be used through sentence-transformers.
An embedding function that uses GTE-LARGE MLX format(for Apple silicon devices only)
as well as the standard cpu/gpu version from: https://huggingface.co/thenlper/gte-large.
@@ -64,13 +61,6 @@ class GteEmbeddings(TextEmbeddingFunction):
def __init__(self, **kwargs):
super().__init__(**kwargs)
warnings.warn(
"GTE embeddings as a standalone embedding function are deprecated. "
"Use the 'sentence-transformers' embedding function with a GTE model "
"instead.",
DeprecationWarning,
stacklevel=3,
)
self._ndims = None
if kwargs:
self.mlx = kwargs.get("mlx", False)

View File

@@ -110,9 +110,6 @@ class OpenAIEmbeddings(TextEmbeddingFunction):
valid_embeddings = {
idx: v.embedding for v, idx in zip(rs.data, valid_indices)
}
except openai.AuthenticationError:
logging.error("Authentication failed: Invalid API key provided")
raise
except openai.BadRequestError:
logging.exception("Bad request: %s", texts)
return [None] * len(texts)

View File

@@ -6,7 +6,6 @@ import io
import os
from typing import TYPE_CHECKING, List, Union
import urllib.parse as urlparse
import warnings
import numpy as np
import pyarrow as pa
@@ -25,7 +24,6 @@ if TYPE_CHECKING:
@register("siglip")
class SigLipEmbeddings(EmbeddingFunction):
# Deprecated: prefer CLIP embeddings via `open-clip`.
model_name: str = "google/siglip-base-patch16-224"
device: str = "cpu"
batch_size: int = 64
@@ -38,12 +36,6 @@ class SigLipEmbeddings(EmbeddingFunction):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
warnings.warn(
"SigLip embeddings are deprecated. Use CLIP embeddings via the "
"'open-clip' embedding function instead.",
DeprecationWarning,
stacklevel=3,
)
transformers = attempt_import_or_raise("transformers")
self._torch = attempt_import_or_raise("torch")

View File

@@ -269,11 +269,6 @@ def retry_with_exponential_backoff(
# and say that it is assumed that if this portion errors out, it's due
# to rate limit but the user should check the error message to be sure.
except Exception as e: # noqa: PERF203
# Don't retry on authentication errors (e.g., OpenAI 401)
# These are permanent failures that won't be fixed by retrying
if _is_non_retryable_error(e):
raise
num_retries += 1
if num_retries > max_retries:
@@ -294,29 +289,6 @@ def retry_with_exponential_backoff(
return wrapper
def _is_non_retryable_error(error: Exception) -> bool:
"""Check if an error should not be retried.
Args:
error: The exception to check
Returns:
True if the error should not be retried, False otherwise
"""
# Check for OpenAI authentication errors
error_type = type(error).__name__
if error_type == "AuthenticationError":
return True
# Check for other common non-retryable HTTP status codes
# 401 Unauthorized, 403 Forbidden
if hasattr(error, "status_code"):
if error.status_code in (401, 403):
return True
return False
def url_retrieve(url: str):
"""
Parameters

View File

@@ -44,7 +44,7 @@ from lance_namespace import (
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
DeclareTableRequest,
CreateEmptyTableRequest,
)
from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name
@@ -318,20 +318,20 @@ class LanceNamespaceDBConnection(DBConnection):
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
declare_request = DeclareTableRequest(
create_empty_request = CreateEmptyTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._ns.declare_table(declare_request)
create_empty_response = self._ns.create_empty_table(create_empty_request)
if not declare_response.location:
if not create_empty_response.location:
raise ValueError(
"Table location is missing from declare_table response"
"Table location is missing from create_empty_table response"
)
location = declare_response.location
namespace_storage_options = declare_response.storage_options
location = create_empty_response.location
namespace_storage_options = create_empty_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
@@ -759,20 +759,20 @@ class AsyncLanceNamespaceDBConnection:
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
declare_request = DeclareTableRequest(
create_empty_request = CreateEmptyTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._ns.declare_table(declare_request)
create_empty_response = self._ns.create_empty_table(create_empty_request)
if not declare_response.location:
if not create_empty_response.location:
raise ValueError(
"Table location is missing from declare_table response"
"Table location is missing from create_empty_table response"
)
location = declare_response.location
namespace_storage_options = declare_response.storage_options
location = create_empty_response.location
namespace_storage_options = create_empty_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)

View File

@@ -1782,26 +1782,6 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
vector_results = LanceHybridQueryBuilder._rank(vector_results, "_distance")
fts_results = LanceHybridQueryBuilder._rank(fts_results, "_score")
# If both result sets are empty (e.g. after hard filtering),
# return early to avoid errors in reranking or score restoration.
if vector_results.num_rows == 0 and fts_results.num_rows == 0:
# Build a minimal empty table with the _relevance_score column
combined_schema = pa.unify_schemas(
[vector_results.schema, fts_results.schema],
)
empty = pa.table(
{
col: pa.array([], type=combined_schema.field(col).type)
for col in combined_schema.names
}
)
empty = empty.append_column(
"_relevance_score", pa.array([], type=pa.float32())
)
if not with_row_ids and "_rowid" in empty.column_names:
empty = empty.drop(["_rowid"])
return empty
original_distances = None
original_scores = None
original_distance_row_ids = None

View File

@@ -515,34 +515,3 @@ def test_openai_propagates_api_key(monkeypatch):
query = "greetings"
actual = table.search(query).limit(1).to_pydantic(Words)[0]
assert len(actual.text) > 0
@patch("time.sleep")
def test_openai_no_retry_on_401(mock_sleep):
"""
Test that OpenAI embedding function does not retry on 401 authentication
errors.
"""
from lancedb.embeddings.utils import retry_with_exponential_backoff
# Create a mock that raises an AuthenticationError
class MockAuthenticationError(Exception):
"""Mock OpenAI AuthenticationError"""
pass
MockAuthenticationError.__name__ = "AuthenticationError"
mock_func = MagicMock(side_effect=MockAuthenticationError("Invalid API key"))
# Wrap the function with retry logic
wrapped_func = retry_with_exponential_backoff(mock_func, max_retries=3)
# Should raise without retrying
with pytest.raises(MockAuthenticationError):
wrapped_func()
# Verify that the function was only called once (no retries)
assert mock_func.call_count == 1
# Verify that sleep was never called (no retries)
assert mock_sleep.call_count == 0

View File

@@ -531,78 +531,6 @@ def test_empty_result_reranker():
)
def test_empty_hybrid_result_reranker():
"""Test that hybrid search with empty results after filtering doesn't crash.
Regression test for https://github.com/lancedb/lancedb/issues/2425
"""
from lancedb.query import LanceHybridQueryBuilder
# Simulate empty vector and FTS results with the expected schema
vector_schema = pa.schema(
[
("text", pa.string()),
("vector", pa.list_(pa.float32(), 4)),
("_rowid", pa.uint64()),
("_distance", pa.float32()),
]
)
fts_schema = pa.schema(
[
("text", pa.string()),
("vector", pa.list_(pa.float32(), 4)),
("_rowid", pa.uint64()),
("_score", pa.float32()),
]
)
empty_vector = pa.table(
{
"text": pa.array([], type=pa.string()),
"vector": pa.array([], type=pa.list_(pa.float32(), 4)),
"_rowid": pa.array([], type=pa.uint64()),
"_distance": pa.array([], type=pa.float32()),
},
schema=vector_schema,
)
empty_fts = pa.table(
{
"text": pa.array([], type=pa.string()),
"vector": pa.array([], type=pa.list_(pa.float32(), 4)),
"_rowid": pa.array([], type=pa.uint64()),
"_score": pa.array([], type=pa.float32()),
},
schema=fts_schema,
)
for reranker in [LinearCombinationReranker(), RRFReranker()]:
result = LanceHybridQueryBuilder._combine_hybrid_results(
fts_results=empty_fts,
vector_results=empty_vector,
norm="score",
fts_query="nonexistent query",
reranker=reranker,
limit=10,
with_row_ids=False,
)
assert len(result) == 0
assert "_relevance_score" in result.column_names
assert "_rowid" not in result.column_names
# Also test with with_row_ids=True
result = LanceHybridQueryBuilder._combine_hybrid_results(
fts_results=empty_fts,
vector_results=empty_vector,
norm="score",
fts_query="nonexistent query",
reranker=LinearCombinationReranker(),
limit=10,
with_row_ids=True,
)
assert len(result) == 0
assert "_relevance_score" in result.column_names
assert "_rowid" in result.column_names
@pytest.mark.parametrize("use_tantivy", [True, False])
def test_cross_encoder_reranker_return_all(tmp_path, use_tantivy):
pytest.importorskip("sentence_transformers")

View File

@@ -292,14 +292,18 @@ class TestModel(lancedb.pydantic.LanceModel):
lambda: pa.table({"a": [1], "b": [2]}),
lambda: pa.table({"a": [1], "b": [2]}).to_reader(),
lambda: iter(pa.table({"a": [1], "b": [2]}).to_batches()),
lambda: lance.write_dataset(
pa.table({"a": [1], "b": [2]}),
"memory://test",
lambda: (
lance.write_dataset(
pa.table({"a": [1], "b": [2]}),
"memory://test",
)
),
lambda: (
lance.write_dataset(
pa.table({"a": [1], "b": [2]}),
"memory://test",
).scanner()
),
lambda: lance.write_dataset(
pa.table({"a": [1], "b": [2]}),
"memory://test",
).scanner(),
lambda: pd.DataFrame({"a": [1], "b": [2]}),
lambda: pl.DataFrame({"a": [1], "b": [2]}),
lambda: pl.LazyFrame({"a": [1], "b": [2]}),

View File

@@ -23,25 +23,10 @@ use pyo3::{
};
use pyo3_async_runtimes::tokio::future_into_py;
fn table_from_py<'a>(table: Bound<'a, PyAny>) -> PyResult<Bound<'a, Table>> {
if table.hasattr("_inner")? {
Ok(table.getattr("_inner")?.downcast_into::<Table>()?)
} else if table.hasattr("_table")? {
Ok(table
.getattr("_table")?
.getattr("_inner")?
.downcast_into::<Table>()?)
} else {
Err(PyRuntimeError::new_err(
"Provided table does not appear to be a Table or RemoteTable instance",
))
}
}
/// Create a permutation builder for the given table
#[pyo3::pyfunction]
pub fn async_permutation_builder(table: Bound<'_, PyAny>) -> PyResult<PyAsyncPermutationBuilder> {
let table = table_from_py(table)?;
let table = table.getattr("_inner")?.downcast_into::<Table>()?;
let inner_table = table.borrow().inner_ref()?.clone();
let inner_builder = LancePermutationBuilder::new(inner_table);
@@ -265,8 +250,10 @@ impl PyPermutationReader {
permutation_table: Option<Bound<'py, PyAny>>,
split: u64,
) -> PyResult<Bound<'py, PyAny>> {
let base_table = table_from_py(base_table)?;
let permutation_table = permutation_table.map(table_from_py).transpose()?;
let base_table = base_table.getattr("_inner")?.downcast_into::<Table>()?;
let permutation_table = permutation_table
.map(|p| PyResult::Ok(p.getattr("_inner")?.downcast_into::<Table>()?))
.transpose()?;
let base_table = base_table.borrow().inner_ref()?.base_table().clone();
let permutation_table = permutation_table

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.91.0"
channel = "1.90.0"

View File

@@ -85,10 +85,8 @@ pub type TableBuilderCallback = Box<dyn FnOnce(OpenTableRequest) -> OpenTableReq
/// Describes what happens when creating a table and a table with
/// the same name already exists
#[derive(Default)]
pub enum CreateTableMode {
/// If the table already exists, an error is returned
#[default]
Create,
/// If the table already exists, it is opened. Any provided data is
/// ignored. The function will be passed an OpenTableBuilder to customize
@@ -106,6 +104,12 @@ impl CreateTableMode {
}
}
impl Default for CreateTableMode {
fn default() -> Self {
Self::Create
}
}
/// A request to create a table
pub struct CreateTableRequest {
/// The name of the new table

View File

@@ -57,7 +57,7 @@ pub struct PermutationConfig {
}
/// Strategy for shuffling the data.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub enum ShuffleStrategy {
/// The data is randomly shuffled
///
@@ -78,10 +78,15 @@ pub enum ShuffleStrategy {
/// The data is not shuffled
///
/// This is useful for debugging and testing.
#[default]
None,
}
impl Default for ShuffleStrategy {
fn default() -> Self {
Self::None
}
}
/// Builder for creating a permutation table.
///
/// A permutation table is a table that stores split assignments and a shuffled order of rows. This

View File

@@ -27,10 +27,9 @@ use crate::{
pub const SPLIT_ID_COLUMN: &str = "split_id";
/// Strategy for assigning rows to splits
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub enum SplitStrategy {
/// All rows will have split id 0
#[default]
NoSplit,
/// Rows will be randomly assigned to splits
///
@@ -74,6 +73,15 @@ pub enum SplitStrategy {
Calculated { calculation: String },
}
// The default is not to split the data
//
// All data will be assigned to a single split.
impl Default for SplitStrategy {
fn default() -> Self {
Self::NoSplit
}
}
impl SplitStrategy {
pub fn validate(&self, num_rows: u64) -> Result<()> {
match self {

View File

@@ -192,14 +192,13 @@ pub use error::{Error, Result};
use lance_linalg::distance::DistanceType as LanceDistanceType;
pub use table::Table;
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Default)]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "lowercase")]
pub enum DistanceType {
/// Euclidean distance. This is a very common distance metric that
/// accounts for both magnitude and direction when determining the distance
/// between vectors. l2 distance has a range of [0, ∞).
#[default]
L2,
/// Cosine distance. Cosine distance is a distance metric
/// calculated from the cosine similarity between two vectors. Cosine
@@ -221,6 +220,12 @@ pub enum DistanceType {
Hamming,
}
impl Default for DistanceType {
fn default() -> Self {
Self::L2
}
}
impl From<DistanceType> for LanceDistanceType {
fn from(value: DistanceType) -> Self {
match value {

View File

@@ -3,18 +3,11 @@
pub mod insert;
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::util::stream_as_body;
use super::ARROW_STREAM_CONTENT_TYPE;
use crate::data::scannable::Scannable;
use crate::index::waiter::wait_for_index;
use crate::index::Index;
use crate::index::IndexStatistics;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::remote::util::stream_as_ipc;
use crate::table::query::create_multi_vector_plan;
use crate::table::AddColumnsResult;
use crate::table::AddResult;
use crate::table::AlterColumnsResult;
@@ -24,18 +17,8 @@ use crate::table::MergeResult;
use crate::table::Tags;
use crate::table::UpdateResult;
use crate::table::{AddDataMode, AnyQuery, Filter, TableStatistics};
use crate::utils::background_cache::BackgroundCache;
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{
error::Result,
index::{IndexBuilder, IndexConfig},
query::QueryExecutionOptions,
table::{
merge::MergeInsertBuilder, AddDataBuilder, BaseTable, OptimizeAction, OptimizeStats,
TableDefinition, UpdateBuilder,
},
};
use crate::{DistanceType, Error};
use crate::{DistanceType, Error, Table};
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
@@ -43,7 +26,8 @@ use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
use futures::TryStreamExt;
use futures::future::Shared;
use futures::{FutureExt, TryStreamExt};
use http::header::CONTENT_TYPE;
use http::{HeaderName, StatusCode};
use lance::arrow::json::{JsonDataType, JsonSchema};
@@ -58,15 +42,83 @@ use std::collections::HashMap;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::util::stream_as_body;
use super::ARROW_STREAM_CONTENT_TYPE;
use crate::index::waiter::wait_for_index;
use crate::{
error::Result,
index::{IndexBuilder, IndexConfig},
query::QueryExecutionOptions,
table::{
merge::MergeInsertBuilder, AddDataBuilder, BaseTable, OptimizeAction, OptimizeStats,
TableDefinition, UpdateBuilder,
},
};
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
type SharedSchemaFuture =
Shared<futures::future::BoxFuture<'static, std::result::Result<SchemaRef, Arc<Error>>>>;
enum SchemaState {
Empty,
Current(SchemaRef, Instant),
Refreshing {
previous: Option<(SchemaRef, Instant)>,
future: SharedSchemaFuture,
},
}
struct SchemaCache {
state: SchemaState,
/// Incremented on invalidation. Background fetches check this to avoid
/// overwriting with stale data after a concurrent invalidation.
generation: u64,
}
enum SchemaAction {
Return(SchemaRef),
Wait(SharedSchemaFuture),
}
impl SchemaState {
/// Returns the schema if it's fresh (not in the refresh window).
fn fresh_schema(&self) -> Option<SchemaRef> {
match self {
Self::Current(schema, cached_at) => {
let elapsed = clock::now().duration_since(*cached_at);
if elapsed < SCHEMA_CACHE_TTL - SCHEMA_CACHE_REFRESH_WINDOW {
Some(schema.clone())
} else {
None
}
}
Self::Refreshing {
previous: Some((schema, cached_at)),
..
} => {
let elapsed = clock::now().duration_since(*cached_at);
if elapsed < SCHEMA_CACHE_TTL - SCHEMA_CACHE_REFRESH_WINDOW {
Some(schema.clone())
} else {
None
}
}
_ => None,
}
}
}
pub struct RemoteTags<'a, S: HttpSend = Sender> {
inner: &'a RemoteTable<S>,
}
@@ -211,7 +263,7 @@ pub struct RemoteTable<S: HttpSend = Sender> {
version: RwLock<Option<u64>>,
location: RwLock<Option<String>>,
schema_cache: BackgroundCache<SchemaRef, Error>,
schema_cache: Arc<Mutex<SchemaCache>>,
}
impl<S: HttpSend> std::fmt::Debug for RemoteTable<S> {
@@ -239,7 +291,10 @@ impl<S: HttpSend> RemoteTable<S> {
server_version,
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
schema_cache: Arc::new(Mutex::new(SchemaCache {
state: SchemaState::Empty,
generation: 0,
})),
}
}
@@ -789,7 +844,9 @@ impl<S: HttpSend> RemoteTable<S> {
}
fn invalidate_schema_cache(&self) {
self.schema_cache.invalidate();
let mut cache = self.schema_cache.lock().unwrap();
cache.state = SchemaState::Empty;
cache.generation += 1;
}
fn handle_error_invalidation(&self, error: &Error) {
@@ -804,6 +861,119 @@ impl<S: HttpSend> RemoteTable<S> {
}
}
}
fn determine_schema_action(
&self,
cache: &mut SchemaCache,
version: Option<u64>,
) -> SchemaAction {
match &cache.state {
SchemaState::Empty => {
let (shared, _) = self.start_schema_fetch(cache, version, None);
SchemaAction::Wait(shared)
}
SchemaState::Current(schema, cached_at) => {
let elapsed = clock::now().duration_since(*cached_at);
if elapsed < SCHEMA_CACHE_TTL - SCHEMA_CACHE_REFRESH_WINDOW {
SchemaAction::Return(schema.clone())
} else if elapsed < SCHEMA_CACHE_TTL {
// In refresh window: start background fetch, return current value
let schema = schema.clone();
let previous = Some((schema.clone(), *cached_at));
let _ = self.start_schema_fetch(cache, version, previous);
SchemaAction::Return(schema)
} else {
// Expired: must wait for fetch
let previous = Some((schema.clone(), *cached_at));
let (shared, _) = self.start_schema_fetch(cache, version, previous);
SchemaAction::Wait(shared)
}
}
SchemaState::Refreshing { previous, future } => {
// If the background fetch already completed (spawned task hasn't
// run yet to update state), transition the state and re-evaluate.
if let Some(result) = future.peek() {
match result {
Ok(schema) => {
cache.state = SchemaState::Current(schema.clone(), clock::now());
}
Err(_) => {
cache.state = match previous.clone() {
Some((s, t)) => SchemaState::Current(s, t),
None => SchemaState::Empty,
};
}
}
return self.determine_schema_action(cache, version);
}
if let Some((schema, cached_at)) = previous {
if clock::now().duration_since(*cached_at) < SCHEMA_CACHE_TTL {
SchemaAction::Return(schema.clone())
} else {
SchemaAction::Wait(future.clone())
}
} else {
SchemaAction::Wait(future.clone())
}
}
}
}
fn start_schema_fetch(
&self,
cache: &mut SchemaCache,
version: Option<u64>,
previous: Option<(SchemaRef, Instant)>,
) -> (SharedSchemaFuture, u64) {
let client = self.client.clone();
let identifier = self.identifier.clone();
let table_name = self.name.clone();
let generation = cache.generation;
let shared = async move {
fetch_schema(&client, &identifier, &table_name, version)
.await
.map_err(Arc::new)
}
.boxed()
.shared();
// Spawn task to eagerly drive the future and update state on completion
let schema_cache = self.schema_cache.clone();
let fut_for_spawn = shared.clone();
tokio::spawn(async move {
let result = fut_for_spawn.await;
let mut cache = schema_cache.lock().unwrap();
// Only update if no invalidation has happened since we started
if cache.generation != generation {
return;
}
match result {
Ok(schema) => {
cache.state = SchemaState::Current(schema, clock::now());
}
Err(_) => {
// Revert to previous cached value if available
let prev = match &cache.state {
SchemaState::Refreshing { previous, .. } => previous.clone(),
_ => None,
};
cache.state = match prev {
Some((s, t)) => SchemaState::Current(s, t),
None => SchemaState::Empty,
};
}
}
});
cache.state = SchemaState::Refreshing {
previous,
future: shared.clone(),
};
(shared, generation)
}
}
#[derive(Deserialize)]
@@ -884,8 +1054,8 @@ impl<S: HttpSend> std::fmt::Display for RemoteTable<S> {
#[cfg(all(test, feature = "remote"))]
mod test_utils {
use super::*;
use crate::remote::client::test_utils::client_with_handler;
use crate::remote::client::test_utils::{client_with_handler_and_config, MockSender};
use crate::remote::client::test_utils::MockSender;
use crate::remote::client::test_utils::{client_with_handler, client_with_handler_and_config};
use crate::remote::ClientConfig;
impl RemoteTable<MockSender> {
@@ -903,7 +1073,10 @@ mod test_utils {
server_version: version.map(ServerVersion).unwrap_or_default(),
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
schema_cache: Arc::new(Mutex::new(SchemaCache {
state: SchemaState::Empty,
generation: 0,
})),
}
}
@@ -921,7 +1094,10 @@ mod test_utils {
server_version: ServerVersion::default(),
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
schema_cache: Arc::new(Mutex::new(SchemaCache {
state: SchemaState::Empty,
generation: 0,
})),
}
}
}
@@ -1021,21 +1197,28 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn schema(&self) -> Result<SchemaRef> {
if let Some(schema) = self.schema_cache.try_get() {
return Ok(schema);
// Fast path: check if cache is fresh (not even in refresh window)
{
let cache = self.schema_cache.lock().unwrap();
if let Some(schema) = cache.state.fresh_schema() {
return Ok(schema);
}
}
// Slow path: may need to fetch or start background refresh
let version = self.current_version().await;
let client = self.client.clone();
let identifier = self.identifier.clone();
let table_name = self.name.clone();
let action = {
let mut cache = self.schema_cache.lock().unwrap();
self.determine_schema_action(&mut cache, version)
};
self.schema_cache
.get(move || async move {
fetch_schema(&client, &identifier, &table_name, version).await
})
.await
.map_err(unwrap_shared_error)
match action {
SchemaAction::Return(schema) => Ok(schema),
SchemaAction::Wait(fut) => match fut.await {
Ok(schema) => Ok(schema),
Err(arc_err) => Err(unwrap_shared_error(arc_err)),
},
}
}
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
@@ -1126,7 +1309,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
create_multi_vector_plan(stream_execs)
Table::multi_vector_plan(stream_execs)
}
}
@@ -1146,7 +1329,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
let plan = create_multi_vector_plan(stream_execs)?;
let plan = Table::multi_vector_plan(stream_execs)?;
Ok(DatasetRecordBatchStream::new(execute_plan(
plan,
@@ -1874,6 +2057,42 @@ impl TryFrom<MergeInsertBuilder> for MergeInsertRequest {
}
}
// Clock module for testing with mock time
#[cfg(test)]
mod clock {
use std::cell::Cell;
use std::time::{Duration, Instant};
thread_local! {
static MOCK_NOW: Cell<Option<Instant>> = const { Cell::new(None) };
}
pub fn now() -> Instant {
MOCK_NOW.with(|mock| mock.get().unwrap_or_else(Instant::now))
}
pub fn advance_by(duration: Duration) {
MOCK_NOW.with(|mock| {
let current = mock.get().unwrap_or_else(Instant::now);
mock.set(Some(current + duration));
});
}
#[allow(dead_code)]
pub fn clear_mock() {
MOCK_NOW.with(|mock| mock.set(None));
}
}
#[cfg(not(test))]
mod clock {
use std::time::Instant;
pub fn now() -> Instant {
Instant::now()
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -1897,7 +2116,6 @@ mod tests {
use crate::index::vector::{IvfFlatIndexBuilder, IvfHnswSqIndexBuilder};
use crate::remote::db::DEFAULT_SERVER_VERSION;
use crate::remote::JSON_CONTENT_TYPE;
use crate::utils::background_cache::clock;
use crate::{
index::{vector::IvfPqIndexBuilder, Index, IndexStatistics, IndexType},
query::{ExecutableQuery, QueryBase},

File diff suppressed because it is too large Load Diff

View File

@@ -200,7 +200,7 @@ impl ExecutionPlan for InsertExec {
let new_dataset = CommitBuilder::new(dataset.clone())
.execute(merged_txn)
.await?;
ds_wrapper.update(new_dataset);
ds_wrapper.set_latest(new_dataset).await;
}
}

View File

@@ -2,499 +2,301 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{
sync::{Arc, Mutex},
time::Duration,
ops::{Deref, DerefMut},
sync::Arc,
time::{self, Duration, Instant},
};
use lance::{dataset::refs, Dataset};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::{error::Result, utils::background_cache::BackgroundCache, Error};
use crate::error::Result;
/// A wrapper around a [Dataset] that provides lazy-loading and consistency checks.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
/// A wrapper around a [Dataset] that provides consistency checks.
///
/// This can be cloned cheaply. Callers get an [`Arc<Dataset>`] from [`get()`](Self::get)
/// and call [`update()`](Self::update) after writes to store the new version.
/// The dataset is lazily loaded, and starts off as None. On the first access,
/// the dataset is loaded.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper {
state: Arc<Mutex<DatasetState>>,
consistency: ConsistencyMode,
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
}
/// The current dataset and whether it is pinned to a specific version.
#[derive(Debug, Clone)]
struct DatasetState {
dataset: Arc<Dataset>,
/// `Some(version)` = pinned to a specific version (time travel),
/// `None` = tracking latest.
pinned_version: Option<u64>,
}
impl DatasetRef {
/// Reload the dataset to the appropriate version.
async fn reload(&mut self) -> Result<()> {
match self {
Self::Latest {
dataset,
last_consistency_check,
..
} => {
dataset.checkout_latest().await?;
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
dataset.checkout_version(*version).await?;
}
}
Ok(())
}
#[derive(Debug, Clone)]
enum ConsistencyMode {
/// Only update table state when explicitly asked.
Lazy,
/// Always check for a new version on every read.
Strong,
/// Periodically check for new version in the background. If the table is being
/// regularly accessed, refresh will happen in the background. If the table is idle for a while,
/// the next access will trigger a refresh before returning the dataset.
///
/// read_consistency_interval = TTL
/// refresh_window = min(3s, TTL/4)
///
/// | t < TTL - refresh_window | t < TTL | t >= TTL |
/// | Return value | Background refresh & return value | syncronous refresh |
Eventual(BackgroundCache<Arc<Dataset>, Error>),
fn is_latest(&self) -> bool {
matches!(self, Self::Latest { .. })
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
*self = Self::Latest {
dataset: dataset.clone(),
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
};
Ok(())
}
}
}
async fn as_time_travel(&mut self, target_version: impl Into<refs::Ref>) -> Result<()> {
let target_ref = target_version.into();
match self {
Self::Latest { dataset, .. } => {
let new_dataset = dataset.checkout_version(target_ref.clone()).await?;
let version_value = new_dataset.version().version;
*self = Self::TimeTravel {
dataset: new_dataset,
version: version_value,
};
}
Self::TimeTravel { dataset, version } => {
let should_checkout = match &target_ref {
refs::Ref::Version(_, Some(target_ver)) => version != target_ver,
refs::Ref::Version(_, None) => true, // No specific version, always checkout
refs::Ref::VersionNumber(target_ver) => version != target_ver,
refs::Ref::Tag(_) => true, // Always checkout for tags
};
if should_checkout {
let new_dataset = dataset.checkout_version(target_ref).await?;
let version_value = new_dataset.version().version;
*self = Self::TimeTravel {
dataset: new_dataset,
version: version_value,
};
}
}
}
Ok(())
}
fn is_up_to_date(&self) -> bool {
match self {
Self::Latest {
read_consistency_interval,
last_consistency_check,
..
} => match (read_consistency_interval, last_consistency_check) {
(None, _) => true,
(Some(_), None) => false,
(Some(interval), Some(last_check)) => last_check.elapsed() < *interval,
},
Self::TimeTravel { dataset, version } => dataset.version().version == *version,
}
}
fn time_travel_version(&self) -> Option<u64> {
match self {
Self::Latest { .. } => None,
Self::TimeTravel { version, .. } => Some(*version),
}
}
fn set_latest(&mut self, dataset: Dataset) {
match self {
Self::Latest {
dataset: ref mut ds,
..
} => {
if dataset.manifest().version > ds.manifest().version {
*ds = dataset;
}
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
}
impl DatasetConsistencyWrapper {
/// Create a new wrapper in the latest version mode.
pub fn new_latest(dataset: Dataset, read_consistency_interval: Option<Duration>) -> Self {
let dataset = Arc::new(dataset);
let consistency = match read_consistency_interval {
Some(d) if d == Duration::ZERO => ConsistencyMode::Strong,
Some(d) => {
let refresh_window = std::cmp::min(std::time::Duration::from_secs(3), d / 4);
let cache = BackgroundCache::new(d, refresh_window);
cache.seed(dataset.clone());
ConsistencyMode::Eventual(cache)
}
None => ConsistencyMode::Lazy,
};
Self {
state: Arc::new(Mutex::new(DatasetState {
dataset,
pinned_version: None,
})),
consistency,
}
Self(Arc::new(RwLock::new(DatasetRef::Latest {
dataset,
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
})))
}
/// Get the current dataset.
/// Get an immutable reference to the dataset.
pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetReadGuard {
guard: self.0.read().await,
})
}
/// Get a mutable reference to the dataset.
///
/// Behavior depends on the consistency mode:
/// - **Lazy** (`None`): returns the cached dataset immediately.
/// - **Strong** (`Some(ZERO)`): checks for a new version before returning.
/// - **Eventual** (`Some(d)` where `d > 0`): returns a cached value immediately
/// while refreshing in the background when the TTL expires.
///
/// If pinned to a specific version (time travel), always returns the
/// pinned dataset regardless of consistency mode.
pub async fn get(&self) -> Result<Arc<Dataset>> {
{
let state = self.state.lock().unwrap();
if state.pinned_version.is_some() {
return Ok(state.dataset.clone());
}
}
match &self.consistency {
ConsistencyMode::Eventual(bg_cache) => {
if let Some(dataset) = bg_cache.try_get() {
return Ok(dataset);
}
let state = self.state.clone();
bg_cache
.get(move || refresh_latest(state))
.await
.map_err(unwrap_shared_error)
}
ConsistencyMode::Strong => refresh_latest(self.state.clone()).await,
ConsistencyMode::Lazy => {
let state = self.state.lock().unwrap();
Ok(state.dataset.clone())
}
}
/// If the dataset is in time travel mode this will fail
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
self.ensure_mutable().await?;
self.ensure_up_to_date().await?;
Ok(DatasetWriteGuard {
guard: self.0.write().await,
})
}
/// Store a new dataset version after a write operation.
///
/// Only stores the dataset if its version is newer than the current one.
/// If the wrapper has since transitioned to time-travel mode (e.g. via a
/// concurrent [`as_time_travel`](Self::as_time_travel) call), the update
/// is silently ignored — the write already committed to storage.
pub fn update(&self, dataset: Dataset) {
let mut state = self.state.lock().unwrap();
if state.pinned_version.is_some() {
// A concurrent as_time_travel() beat us here. The write succeeded
// in storage, but since we're now pinned we don't advance the
// cached pointer.
return;
}
if dataset.manifest().version > state.dataset.manifest().version {
state.dataset = Arc::new(dataset);
}
drop(state);
if let ConsistencyMode::Eventual(bg_cache) = &self.consistency {
bg_cache.invalidate();
}
/// Get a mutable reference to the dataset without requiring the
/// dataset to be in a Latest mode.
pub async fn get_mut_unchecked(&self) -> Result<DatasetWriteGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetWriteGuard {
guard: self.0.write().await,
})
}
/// Checkout a branch and track its HEAD for new versions.
pub async fn as_branch(&self, _branch: impl Into<String>) -> Result<()> {
todo!("Branch support not yet implemented")
}
/// Check that the dataset is in a mutable mode (Latest).
pub fn ensure_mutable(&self) -> Result<()> {
let state = self.state.lock().unwrap();
if state.pinned_version.is_some() {
Err(crate::Error::InvalidInput {
message: "table cannot be modified when a specific version is checked out"
.to_string(),
})
} else {
Ok(())
}
}
/// Returns the version, if in time travel mode, or None otherwise.
pub fn time_travel_version(&self) -> Option<u64> {
self.state.lock().unwrap().pinned_version
}
/// Convert into a wrapper in latest version mode.
pub async fn as_latest(&self) -> Result<()> {
let dataset = {
let state = self.state.lock().unwrap();
if state.pinned_version.is_none() {
return Ok(());
}
state.dataset.clone()
};
let latest_version = dataset.latest_version_id().await?;
let new_dataset = dataset.checkout_version(latest_version).await?;
let mut state = self.state.lock().unwrap();
if state.pinned_version.is_some() {
state.dataset = Arc::new(new_dataset);
state.pinned_version = None;
}
drop(state);
if let ConsistencyMode::Eventual(bg_cache) = &self.consistency {
bg_cache.invalidate();
}
Ok(())
}
pub async fn as_time_travel(&self, target_version: impl Into<refs::Ref>) -> Result<()> {
let target_ref = target_version.into();
let (should_checkout, dataset) = {
let state = self.state.lock().unwrap();
let should = match state.pinned_version {
None => true,
Some(version) => match &target_ref {
refs::Ref::Version(_, Some(target_ver)) => version != *target_ver,
refs::Ref::Version(_, None) => true,
refs::Ref::VersionNumber(target_ver) => version != *target_ver,
refs::Ref::Tag(_) => true,
},
};
(should, state.dataset.clone())
};
if !should_checkout {
/// Convert into a wrapper in latest version mode
pub async fn as_latest(&self, read_consistency_interval: Option<Duration>) -> Result<()> {
if self.0.read().await.is_latest() {
return Ok(());
}
let new_dataset = dataset.checkout_version(target_ref).await?;
let version_value = new_dataset.version().version;
let mut write_guard = self.0.write().await;
if write_guard.is_latest() {
return Ok(());
}
let mut state = self.state.lock().unwrap();
state.dataset = Arc::new(new_dataset);
state.pinned_version = Some(version_value);
Ok(())
write_guard.as_latest(read_consistency_interval).await
}
pub async fn as_time_travel(&self, target_version: impl Into<refs::Ref>) -> Result<()> {
self.0.write().await.as_time_travel(target_version).await
}
/// Provide a known latest version of the dataset.
///
/// This is usually done after some write operation, which inherently will
/// have the latest version.
pub async fn set_latest(&self, dataset: Dataset) {
self.0.write().await.set_latest(dataset);
}
pub async fn reload(&self) -> Result<()> {
let (dataset, pinned_version) = {
let state = self.state.lock().unwrap();
(state.dataset.clone(), state.pinned_version)
};
self.0.write().await.reload().await
}
match pinned_version {
None => {
refresh_latest(self.state.clone()).await?;
if let ConsistencyMode::Eventual(bg_cache) = &self.consistency {
bg_cache.invalidate();
}
}
Some(version) => {
if dataset.version().version == version {
return Ok(());
}
/// Returns the version, if in time travel mode, or None otherwise
pub async fn time_travel_version(&self) -> Option<u64> {
self.0.read().await.time_travel_version()
}
let new_dataset = dataset.checkout_version(version).await?;
pub async fn ensure_mutable(&self) -> Result<()> {
let dataset_ref = self.0.read().await;
match &*dataset_ref {
DatasetRef::Latest { .. } => Ok(()),
DatasetRef::TimeTravel { .. } => Err(crate::Error::InvalidInput {
message: "table cannot be modified when a specific version is checked out"
.to_string(),
}),
}
}
let mut state = self.state.lock().unwrap();
if state.pinned_version == Some(version) {
state.dataset = Arc::new(new_dataset);
}
async fn is_up_to_date(&self) -> bool {
self.0.read().await.is_up_to_date()
}
/// Ensures that the dataset is loaded and up-to-date with consistency and
/// version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
if !self.is_up_to_date().await {
// Re-check under write lock — another task may have reloaded
// while we waited for the lock.
let mut write_guard = self.0.write().await;
if !write_guard.is_up_to_date() {
write_guard.reload().await?;
}
}
Ok(())
}
}
async fn refresh_latest(state: Arc<Mutex<DatasetState>>) -> Result<Arc<Dataset>> {
let dataset = { state.lock().unwrap().dataset.clone() };
let mut ds = (*dataset).clone();
ds.checkout_latest().await?;
let new_arc = Arc::new(ds);
{
let mut state = state.lock().unwrap();
if state.pinned_version.is_none()
&& new_arc.manifest().version >= state.dataset.manifest().version
{
state.dataset = new_arc.clone();
}
}
Ok(new_arc)
pub struct DatasetReadGuard<'a> {
guard: RwLockReadGuard<'a, DatasetRef>,
}
fn unwrap_shared_error(arc: Arc<Error>) -> Error {
match Arc::try_unwrap(arc) {
Ok(err) => err,
Err(arc) => Error::Runtime {
message: arc.to_string(),
},
impl Deref for DatasetReadGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
pub struct DatasetWriteGuard<'a> {
guard: RwLockWriteGuard<'a, DatasetRef>,
}
impl Deref for DatasetWriteGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
impl DerefMut for DatasetWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut *self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use lance::{
dataset::{WriteMode, WriteParams},
io::ObjectStoreParams,
};
use lance::{dataset::WriteParams, io::ObjectStoreParams};
use super::*;
use crate::{connect, io::object_store::io_tracking::IoStatsHolder, table::WriteOptions};
async fn create_test_dataset(uri: &str) -> Dataset {
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
Dataset::write(
RecordBatchIterator::new(vec![Ok(batch)], schema),
uri,
Some(WriteParams::default()),
)
.await
.unwrap()
}
async fn append_to_dataset(uri: &str) -> Dataset {
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![4, 5, 6]))],
)
.unwrap();
Dataset::write(
RecordBatchIterator::new(vec![Ok(batch)], schema),
uri,
Some(WriteParams {
mode: WriteMode::Append,
..Default::default()
}),
)
.await
.unwrap()
}
#[tokio::test]
async fn test_get_returns_dataset() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let version = ds.version().version;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
let ds1 = wrapper.get().await.unwrap();
let ds2 = wrapper.get().await.unwrap();
assert_eq!(ds1.version().version, version);
assert_eq!(ds2.version().version, version);
// Arc<Dataset> is independent — not borrowing from wrapper
drop(wrapper);
assert_eq!(ds1.version().version, version);
}
#[tokio::test]
async fn test_update_stores_newer_version() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds_v1 = create_test_dataset(uri).await;
assert_eq!(ds_v1.version().version, 1);
let wrapper = DatasetConsistencyWrapper::new_latest(ds_v1, None);
let ds_v2 = append_to_dataset(uri).await;
assert_eq!(ds_v2.version().version, 2);
wrapper.update(ds_v2);
let ds = wrapper.get().await.unwrap();
assert_eq!(ds.version().version, 2);
}
#[tokio::test]
async fn test_update_ignores_older_version() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds_v1 = create_test_dataset(uri).await;
let ds_v2 = append_to_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds_v2, None);
wrapper.update(ds_v1);
let ds = wrapper.get().await.unwrap();
assert_eq!(ds.version().version, 2);
}
#[tokio::test]
async fn test_ensure_mutable_allows_latest() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
assert!(wrapper.ensure_mutable().is_ok());
}
#[tokio::test]
async fn test_ensure_mutable_rejects_time_travel() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
wrapper.as_time_travel(1u64).await.unwrap();
assert!(wrapper.ensure_mutable().is_err());
}
#[tokio::test]
async fn test_time_travel_version() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
assert_eq!(wrapper.time_travel_version(), None);
wrapper.as_time_travel(1u64).await.unwrap();
assert_eq!(wrapper.time_travel_version(), Some(1));
}
#[tokio::test]
async fn test_as_latest_from_time_travel() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
wrapper.as_time_travel(1u64).await.unwrap();
assert!(wrapper.ensure_mutable().is_err());
wrapper.as_latest().await.unwrap();
assert!(wrapper.ensure_mutable().is_ok());
assert_eq!(wrapper.time_travel_version(), None);
}
#[tokio::test]
async fn test_lazy_consistency_never_refreshes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
let v1 = wrapper.get().await.unwrap().version().version;
// External write
append_to_dataset(uri).await;
// Lazy consistency should not pick up external write
let v_after = wrapper.get().await.unwrap().version().version;
assert_eq!(v1, v_after);
}
#[tokio::test]
async fn test_strong_consistency_always_refreshes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, Some(Duration::ZERO));
let v1 = wrapper.get().await.unwrap().version().version;
// External write
append_to_dataset(uri).await;
// Strong consistency should pick up external write
let v_after = wrapper.get().await.unwrap().version().version;
assert_eq!(v_after, v1 + 1);
}
#[tokio::test]
async fn test_eventual_consistency_background_refresh() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, Some(Duration::from_millis(200)));
// Populate the cache
let v1 = wrapper.get().await.unwrap().version().version;
assert_eq!(v1, 1);
// External write
append_to_dataset(uri).await;
// Should return cached value immediately (within TTL)
let v_cached = wrapper.get().await.unwrap().version().version;
assert_eq!(v_cached, 1);
// Wait for TTL to expire, then get() should trigger a refresh
tokio::time::sleep(Duration::from_millis(300)).await;
let v_after = wrapper.get().await.unwrap().version().version;
assert_eq!(v_after, 2);
}
#[tokio::test]
async fn test_eventual_consistency_update_invalidates_cache() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds_v1 = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds_v1, Some(Duration::from_secs(60)));
// Simulate a write that produces v2
let ds_v2 = append_to_dataset(uri).await;
wrapper.update(ds_v2);
// get() should return v2 immediately (update invalidated the bg_cache,
// and the mutex state was updated)
let v = wrapper.get().await.unwrap().version().version;
assert_eq!(v, 2);
}
#[tokio::test]
async fn test_iops_open_strong_consistency() {
let db = connect("memory://")
@@ -510,7 +312,7 @@ mod tests {
.create_empty_table("test", schema)
.write_options(WriteOptions {
lance_write_params: Some(WriteParams {
store_params: Some(lance::io::ObjectStoreParams {
store_params: Some(ObjectStoreParams {
object_store_wrapper: Some(Arc::new(io_stats.clone())),
..Default::default()
}),
@@ -530,31 +332,6 @@ mod tests {
assert_eq!(stats.read_iops, 1);
}
/// Regression test: a write that races with as_time_travel() must not panic.
///
/// Sequence: ensure_mutable() passes → as_time_travel() completes → write
/// calls update(). Previously the assert!() in update() would fire.
#[tokio::test]
async fn test_update_after_concurrent_time_travel_does_not_panic() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds_v1 = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds_v1, None);
// Simulate: as_time_travel() completes just before the write's update().
wrapper.as_time_travel(1u64).await.unwrap();
assert_eq!(wrapper.time_travel_version(), Some(1));
// The write already committed to storage; now it calls update().
// This must not panic, and the wrapper must stay pinned.
let ds_v2 = append_to_dataset(uri).await;
wrapper.update(ds_v2);
let ds = wrapper.get().await.unwrap();
assert_eq!(ds.version().version, 1);
}
/// Regression test: before the fix, the reload fast-path (no version change)
/// did not reset `last_consistency_check`, causing a list call on every
/// subsequent query once the interval expired.

View File

@@ -18,12 +18,17 @@ pub struct DeleteResult {
///
/// This logic was moved from NativeTable::delete to keep table.rs clean.
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
// We access the dataset from the table. Since this is in the same module hierarchy (super),
// and 'dataset' is pub(crate), we can access it.
let mut dataset = table.dataset.get_mut().await?;
// Perform the actual delete on the Lance dataset
dataset.delete(predicate).await?;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(DeleteResult { version })
// Return the result with the new version
Ok(DeleteResult {
version: dataset.version().version,
})
}
#[cfg(test)]

View File

@@ -1,45 +1,13 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use arrow_array::RecordBatchReader;
use futures::future::Either;
use futures::{FutureExt, TryFutureExt};
use lance::dataset::{
MergeInsertBuilder as LanceMergeInsertBuilder, WhenMatched, WhenNotMatchedBySource,
};
use serde::{Deserialize, Serialize};
use crate::error::{Error, Result};
use crate::Result;
use super::{BaseTable, NativeTable};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct MergeResult {
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
#[serde(default)]
pub version: u64,
/// Number of inserted rows (for user statistics)
#[serde(default)]
pub num_inserted_rows: u64,
/// Number of updated rows (for user statistics)
#[serde(default)]
pub num_updated_rows: u64,
/// Number of deleted rows (for user statistics)
/// Note: This is different from internal references to 'deleted_rows', since we technically "delete" updated rows during processing.
/// However those rows are not shared with the user.
#[serde(default)]
pub num_deleted_rows: u64,
/// Number of attempts performed during the merge operation.
/// This includes the initial attempt plus any retries due to transaction conflicts.
/// A value of 1 means the operation succeeded on the first try.
#[serde(default)]
pub num_attempts: u32,
}
use super::{BaseTable, MergeResult};
/// A builder used to create and run a merge insert operation
///
@@ -156,172 +124,3 @@ impl MergeInsertBuilder {
self.table.clone().merge_insert(self, new_data).await
}
}
/// Internal implementation of the merge insert logic
///
/// This logic was moved from NativeTable::merge_insert to keep table.rs clean.
pub(crate) async fn execute_merge_insert(
table: &NativeTable,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
params.when_matched_update_all,
params.when_matched_update_all_filt,
) {
(false, _) => builder.when_matched(WhenMatched::DoNothing),
(true, None) => builder.when_matched(WhenMatched::UpdateAll),
(true, Some(filt)) => builder.when_matched(WhenMatched::update_if(&dataset, &filt)?),
};
if params.when_not_matched_insert_all {
builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
} else {
builder.when_not_matched(lance::dataset::WhenNotMatched::DoNothing);
}
if params.when_not_matched_by_source_delete {
let behavior = if let Some(filter) = params.when_not_matched_by_source_delete_filt {
WhenNotMatchedBySource::delete_if(dataset.as_ref(), &filter)?
} else {
WhenNotMatchedBySource::Delete
};
builder.when_not_matched_by_source(behavior);
} else {
builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
}
builder.use_index(params.use_index);
let future = if let Some(timeout) = params.timeout {
let future = builder
.retry_timeout(timeout)
.try_build()?
.execute_reader(new_data);
Either::Left(tokio::time::timeout(timeout, future).map(|res| match res {
Ok(Ok((new_dataset, stats))) => Ok((new_dataset, stats)),
Ok(Err(e)) => Err(e.into()),
Err(_) => Err(Error::Runtime {
message: "merge insert timed out".to_string(),
}),
}))
} else {
let job = builder.try_build()?;
Either::Right(job.execute_reader(new_data).map_err(|e| e.into()))
};
let (new_dataset, stats) = future.await?;
let version = new_dataset.manifest().version;
table.dataset.update(new_dataset.as_ref().clone());
Ok(MergeResult {
version,
num_updated_rows: stats.num_updated_rows,
num_inserted_rows: stats.num_inserted_rows,
num_deleted_rows: stats.num_deleted_rows,
num_attempts: stats.num_attempts,
})
}
#[cfg(test)]
mod tests {
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
use crate::connect;
fn merge_insert_test_batches(offset: i32, age: i32) -> Box<dyn RecordBatchReader + Send> {
let schema = Arc::new(Schema::new(vec![
Field::new("i", DataType::Int32, false),
Field::new("age", DataType::Int32, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(offset..(offset + 10))),
Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(age, 10))),
],
)
.unwrap();
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
}
#[tokio::test]
async fn test_merge_insert() {
let conn = connect("memory://").execute().await.unwrap();
// Create a dataset with i=0..10
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("my_table", batches)
.execute()
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
// Create new data with i=5..15
let new_batches = merge_insert_test_batches(5, 1);
// Perform a "insert if not exists"
let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_not_matched_insert_all();
let result = merge_insert_builder.execute(new_batches).await.unwrap();
// Only 5 rows should actually be inserted
assert_eq!(table.count_rows(None).await.unwrap(), 15);
assert_eq!(result.num_inserted_rows, 5);
assert_eq!(result.num_updated_rows, 0);
assert_eq!(result.num_deleted_rows, 0);
assert_eq!(result.num_attempts, 1);
// Create new data with i=15..25 (no id matches)
let new_batches = merge_insert_test_batches(15, 2);
// Perform a "bulk update" (should not affect anything)
let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_matched_update_all(None);
merge_insert_builder.execute(new_batches).await.unwrap();
// No new rows should have been inserted
assert_eq!(table.count_rows(None).await.unwrap(), 15);
assert_eq!(
table.count_rows(Some("age = 2".to_string())).await.unwrap(),
0
);
// Conditional update that only replaces the age=0 data
let new_batches = merge_insert_test_batches(5, 3);
let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_matched_update_all(Some("target.age = 0".to_string()));
merge_insert_builder.execute(new_batches).await.unwrap();
assert_eq!(
table.count_rows(Some("age = 3".to_string())).await.unwrap(),
5
);
}
#[tokio::test]
async fn test_merge_insert_use_index() {
let conn = connect("memory://").execute().await.unwrap();
// Create a dataset with i=0..10
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("my_table", batches)
.execute()
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
// Test use_index=true (default behavior)
let new_batches = merge_insert_test_batches(5, 1);
let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_not_matched_insert_all();
merge_insert_builder.use_index(true);
merge_insert_builder.execute(new_batches).await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 15);
// Test use_index=false (force table scan)
let new_batches = merge_insert_test_batches(15, 2);
let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_not_matched_insert_all();
merge_insert_builder.use_index(false);
merge_insert_builder.execute(new_batches).await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
}

View File

@@ -26,10 +26,8 @@ use crate::error::Result;
/// optimize different parts of the table on disk.
///
/// By default, it optimizes everything, as [`OptimizeAction::All`].
#[derive(Default)]
pub enum OptimizeAction {
/// Run all optimizations with default values
#[default]
All,
/// Compacts files in the dataset
///
@@ -86,6 +84,12 @@ pub enum OptimizeAction {
Index(OptimizeOptions),
}
impl Default for OptimizeAction {
fn default() -> Self {
Self::All
}
}
/// Statistics about the optimization.
#[derive(Debug, Default)]
pub struct OptimizeStats {
@@ -101,10 +105,12 @@ pub struct OptimizeStats {
/// This logic was moved from NativeTable to keep table.rs clean.
pub(crate) async fn optimize_indices(table: &NativeTable, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options);
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
dataset.optimize_indices(options).await?;
table.dataset.update(dataset);
table
.dataset
.get_mut()
.await?
.optimize_indices(options)
.await?;
Ok(())
}
@@ -125,9 +131,10 @@ pub(crate) async fn cleanup_old_versions(
delete_unverified: Option<bool>,
error_if_tagged_old_versions: Option<bool>,
) -> Result<RemovalStats> {
table.dataset.ensure_mutable()?;
let dataset = table.dataset.get().await?;
Ok(dataset
Ok(table
.dataset
.get_mut()
.await?
.cleanup_old_versions(older_than, delete_unverified, error_if_tagged_old_versions)
.await?)
}
@@ -143,10 +150,8 @@ pub(crate) async fn compact_files_impl(
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let metrics = compact_files(&mut dataset, options, remap_options).await?;
table.dataset.update(dataset);
let mut dataset_mut = table.dataset.get_mut().await?;
let metrics = compact_files(&mut dataset_mut, options, remap_options).await?;
Ok(metrics)
}

View File

@@ -1,739 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use super::NativeTable;
use crate::error::{Error, Result};
use crate::query::{
QueryExecutionOptions, QueryFilter, QueryRequest, Select, VectorQueryRequest, DEFAULT_TOP_K,
};
use crate::utils::{default_vector_column, TimeoutStream};
use arrow::array::{AsArray, FixedSizeListBuilder, Float32Builder};
use arrow::datatypes::{Float32Type, UInt8Type};
use arrow_array::Array;
use arrow_schema::{DataType, Schema};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::ExecutionPlan;
use futures::future::try_join_all;
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::scanner::Scanner;
use lance_datafusion::exec::{analyze_plan as lance_analyze_plan, execute_plan};
use lance_namespace::models::{
QueryTableRequest as NsQueryTableRequest, QueryTableRequestColumns,
QueryTableRequestFullTextQuery, QueryTableRequestVector, StringFtsQuery,
};
use lance_namespace::LanceNamespace;
#[derive(Debug, Clone)]
pub enum AnyQuery {
Query(QueryRequest),
VectorQuery(VectorQueryRequest),
}
//Decide between namespace or local
pub async fn execute_query(
table: &NativeTable,
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// If namespace client is configured, use server-side query execution
if let Some(ref namespace_client) = table.namespace_client {
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
}
execute_generic_query(table, query, options).await
}
pub async fn analyze_query_plan(
table: &NativeTable,
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<String> {
let plan = create_plan(table, query, options).await?;
Ok(lance_analyze_plan(plan, Default::default()).await?)
}
/// Local Execution Path (DataFusion)
async fn execute_generic_query(
table: &NativeTable,
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
let plan = create_plan(table, query, options.clone()).await?;
let inner = execute_plan(plan, Default::default())?;
let inner = if let Some(timeout) = options.timeout {
TimeoutStream::new_boxed(inner, timeout)
} else {
inner
};
Ok(DatasetRecordBatchStream::new(inner))
}
pub async fn create_plan(
table: &NativeTable,
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let query = match query {
AnyQuery::VectorQuery(query) => query.clone(),
AnyQuery::Query(query) => VectorQueryRequest::from_plain_query(query.clone()),
};
let ds_ref = table.dataset.get().await?;
let schema = ds_ref.schema();
let mut column = query.column.clone();
let mut query_vector = query.query_vector.first().cloned();
if query.query_vector.len() > 1 {
if column.is_none() {
// Infer a vector column with the same dimension of the query vector.
let arrow_schema = Schema::from(ds_ref.schema());
column = Some(default_vector_column(
&arrow_schema,
Some(query.query_vector[0].len() as i32),
)?);
}
let vector_field = schema.field(column.as_ref().unwrap()).unwrap();
if let DataType::List(_) = vector_field.data_type() {
// Multivector handling: concatenate into FixedSizeList<FixedSizeList<_>>
let vectors = query
.query_vector
.iter()
.map(|arr| arr.as_ref())
.collect::<Vec<_>>();
let dim = vectors[0].len();
let mut fsl_builder = FixedSizeListBuilder::with_capacity(
Float32Builder::with_capacity(dim),
dim as i32,
vectors.len(),
);
for vec in vectors {
fsl_builder
.values()
.append_slice(vec.as_primitive::<Float32Type>().values());
fsl_builder.append(true);
}
query_vector = Some(Arc::new(fsl_builder.finish()));
} else {
// Multiple query vectors: create a plan for each and union them
let query_vecs = query.query_vector.clone();
let plan_futures = query_vecs
.into_iter()
.map(|query_vector| {
let mut sub_query = query.clone();
sub_query.query_vector = vec![query_vector];
let options_ref = options.clone();
async move {
create_plan(table, &AnyQuery::VectorQuery(sub_query), options_ref).await
}
})
.collect::<Vec<_>>();
let plans = try_join_all(plan_futures).await?;
return create_multi_vector_plan(plans);
}
}
let mut scanner: Scanner = ds_ref.scan();
if let Some(query_vector) = query_vector {
let column = if let Some(col) = column {
col
} else {
let arrow_schema = Schema::from(ds_ref.schema());
default_vector_column(&arrow_schema, Some(query_vector.len() as i32))?
};
let (_, element_type) = lance::index::vector::utils::get_vector_type(schema, &column)?;
let is_binary = matches!(element_type, DataType::UInt8);
let top_k = query.base.limit.unwrap_or(DEFAULT_TOP_K) + query.base.offset.unwrap_or(0);
if is_binary {
let query_vector = arrow::compute::cast(&query_vector, &DataType::UInt8)?;
let query_vector = query_vector.as_primitive::<UInt8Type>();
scanner.nearest(&column, query_vector, top_k)?;
} else {
scanner.nearest(&column, query_vector.as_ref(), top_k)?;
}
scanner.minimum_nprobes(query.minimum_nprobes);
if let Some(maximum_nprobes) = query.maximum_nprobes {
scanner.maximum_nprobes(maximum_nprobes);
}
}
scanner.limit(
query.base.limit.map(|limit| limit as i64),
query.base.offset.map(|offset| offset as i64),
)?;
if let Some(ef) = query.ef {
scanner.ef(ef);
}
scanner.distance_range(query.lower_bound, query.upper_bound);
scanner.use_index(query.use_index);
scanner.prefilter(query.base.prefilter);
match query.base.select {
Select::Columns(ref columns) => {
scanner.project(columns.as_slice())?;
}
Select::Dynamic(ref select_with_transform) => {
scanner.project_with_transform(select_with_transform.as_slice())?;
}
Select::All => {}
}
if query.base.with_row_id {
scanner.with_row_id();
}
scanner.batch_size(options.max_batch_length as usize);
if query.base.fast_search {
scanner.fast_search();
}
if let Some(filter) = &query.base.filter {
match filter {
QueryFilter::Sql(sql) => {
scanner.filter(sql)?;
}
QueryFilter::Substrait(substrait) => {
scanner.filter_substrait(substrait)?;
}
QueryFilter::Datafusion(expr) => {
scanner.filter_expr(expr.clone());
}
}
}
if let Some(fts) = &query.base.full_text_search {
scanner.full_text_search(fts.clone())?;
}
if let Some(refine_factor) = query.refine_factor {
scanner.refine(refine_factor);
}
if let Some(distance_type) = query.distance_type {
scanner.distance_metric(distance_type.into());
}
if query.base.disable_scoring_autoprojection {
scanner.disable_scoring_autoprojection();
}
Ok(scanner.create_plan().await?)
}
//Helper functions below
// Take many execution plans and map them into a single plan that adds
// a query_index column and unions them.
pub(crate) fn create_multi_vector_plan(
plans: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if plans.is_empty() {
return Err(Error::InvalidInput {
message: "No plans provided".to_string(),
});
}
// Projection to keeping all existing columns
let first_plan = plans[0].clone();
let project_all_columns = first_plan
.schema()
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
let expr = datafusion_physical_plan::expressions::Column::new(field.name().as_str(), i);
let expr = Arc::new(expr) as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
(expr, field.name().clone())
})
.collect::<Vec<_>>();
let projected_plans = plans
.into_iter()
.enumerate()
.map(|(plan_i, plan)| {
let query_index = datafusion_common::ScalarValue::Int32(Some(plan_i as i32));
let query_index_expr = datafusion_physical_plan::expressions::Literal::new(query_index);
let query_index_expr =
Arc::new(query_index_expr) as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
let mut projections = vec![(query_index_expr, "query_index".to_string())];
projections.extend_from_slice(&project_all_columns);
let projection = ProjectionExec::try_new(projections, plan).unwrap();
Arc::new(projection) as Arc<dyn datafusion_physical_plan::ExecutionPlan>
})
.collect::<Vec<_>>();
let unioned = UnionExec::try_new(projected_plans).map_err(|err| Error::Runtime {
message: err.to_string(),
})?;
// We require 1 partition in the final output
let repartitioned = RepartitionExec::try_new(
unioned,
datafusion_physical_plan::Partitioning::RoundRobinBatch(1),
)
.unwrap();
Ok(Arc::new(repartitioned))
}
/// Execute a query on the namespace server instead of locally.
async fn execute_namespace_query(
table: &NativeTable,
namespace_client: Arc<dyn LanceNamespace>,
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// Build table_id from namespace + table name
let mut table_id = table.namespace.clone();
table_id.push(table.name.clone());
// Convert AnyQuery to namespace QueryTableRequest
let mut ns_request = convert_to_namespace_query(query)?;
// Set the table ID on the request
ns_request.id = Some(table_id);
// Call the namespace query_table API
let response_bytes = namespace_client
.query_table(ns_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to execute server-side query: {}", e),
})?;
// Parse the Arrow IPC response into a RecordBatchStream
parse_arrow_ipc_response(response_bytes).await
}
/// Convert an AnyQuery to the namespace QueryTableRequest format.
fn convert_to_namespace_query(query: &AnyQuery) -> Result<NsQueryTableRequest> {
match query {
AnyQuery::VectorQuery(vq) => {
// Extract the query vector(s)
let vector = extract_query_vector(&vq.query_vector)?;
// Convert filter to SQL string
let filter = match &vq.base.filter {
Some(f) => Some(filter_to_sql(f)?),
None => None,
};
// Convert select to columns list
let columns = match &vq.base.select {
Select::All => None,
Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
column_names: Some(cols.clone()),
column_aliases: None,
})),
Select::Dynamic(_) => {
return Err(Error::NotSupported {
message:
"Dynamic column selection is not supported for server-side queries"
.to_string(),
});
}
};
// Check for unsupported features
if vq.base.reranker.is_some() {
return Err(Error::NotSupported {
message: "Reranker is not supported for server-side queries".to_string(),
});
}
// Convert FTS query if present
let full_text_query = vq.base.full_text_search.as_ref().map(|fts| {
let columns = fts.columns();
let columns_vec = if columns.is_empty() {
None
} else {
Some(columns.into_iter().collect())
};
Box::new(QueryTableRequestFullTextQuery {
string_query: Some(Box::new(StringFtsQuery {
query: fts.query.to_string(),
columns: columns_vec,
})),
structured_query: None,
})
});
Ok(NsQueryTableRequest {
id: None, // Will be set in namespace_query
k: vq.base.limit.unwrap_or(10) as i32,
vector: Box::new(vector),
vector_column: vq.column.clone(),
filter,
columns,
offset: vq.base.offset.map(|o| o as i32),
distance_type: vq.distance_type.map(|dt| dt.to_string()),
nprobes: Some(vq.minimum_nprobes as i32),
ef: vq.ef.map(|e| e as i32),
refine_factor: vq.refine_factor.map(|r| r as i32),
lower_bound: vq.lower_bound,
upper_bound: vq.upper_bound,
prefilter: Some(vq.base.prefilter),
fast_search: Some(vq.base.fast_search),
with_row_id: Some(vq.base.with_row_id),
bypass_vector_index: Some(!vq.use_index),
full_text_query,
..Default::default()
})
}
AnyQuery::Query(q) => {
// For non-vector queries, pass an empty vector (similar to remote table implementation)
if q.reranker.is_some() {
return Err(Error::NotSupported {
message: "Reranker is not supported for server-side query execution"
.to_string(),
});
}
let filter = q.filter.as_ref().map(filter_to_sql).transpose()?;
let columns = match &q.select {
Select::All => None,
Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
column_names: Some(cols.clone()),
column_aliases: None,
})),
Select::Dynamic(_) => {
return Err(Error::NotSupported {
message: "Dynamic columns are not supported for server-side query"
.to_string(),
});
}
};
// Handle full text search if present
let full_text_query = q.full_text_search.as_ref().map(|fts| {
let columns_vec = if fts.columns().is_empty() {
None
} else {
Some(fts.columns().iter().cloned().collect())
};
Box::new(QueryTableRequestFullTextQuery {
string_query: Some(Box::new(StringFtsQuery {
query: fts.query.to_string(),
columns: columns_vec,
})),
structured_query: None,
})
});
// Empty vector for non-vector queries
let vector = Box::new(QueryTableRequestVector {
single_vector: Some(vec![]),
multi_vector: None,
});
Ok(NsQueryTableRequest {
id: None, // Will be set by caller
vector,
k: q.limit.unwrap_or(10) as i32,
filter,
columns,
prefilter: Some(q.prefilter),
offset: q.offset.map(|o| o as i32),
vector_column: None, // No vector column for plain queries
with_row_id: Some(q.with_row_id),
bypass_vector_index: Some(true), // No vector index for plain queries
full_text_query,
..Default::default()
})
}
}
}
fn filter_to_sql(filter: &QueryFilter) -> Result<String> {
match filter {
QueryFilter::Sql(sql) => Ok(sql.clone()),
QueryFilter::Substrait(_) => Err(Error::NotSupported {
message: "Substrait filters are not supported for server-side queries".to_string(),
}),
QueryFilter::Datafusion(_) => Err(Error::NotSupported {
message: "Datafusion expression filters are not supported for server-side queries. Use SQL filter instead.".to_string(),
}),
}
}
/// Extract query vector(s) from Arrow arrays into the namespace format.
fn extract_query_vector(
query_vectors: &[Arc<dyn arrow_array::Array>],
) -> Result<QueryTableRequestVector> {
if query_vectors.is_empty() {
return Err(Error::InvalidInput {
message: "Query vector is required for vector search".to_string(),
});
}
// Handle single vector case
if query_vectors.len() == 1 {
let arr = &query_vectors[0];
let single_vector = array_to_f32_vec(arr)?;
Ok(QueryTableRequestVector {
single_vector: Some(single_vector),
multi_vector: None,
})
} else {
// Handle multi-vector case
let multi_vector: Result<Vec<Vec<f32>>> =
query_vectors.iter().map(array_to_f32_vec).collect();
Ok(QueryTableRequestVector {
single_vector: None,
multi_vector: Some(multi_vector?),
})
}
}
/// Convert an Arrow array to a Vec<f32>.
fn array_to_f32_vec(arr: &Arc<dyn arrow_array::Array>) -> Result<Vec<f32>> {
// Handle FixedSizeList (common for vectors)
if let Some(fsl) = arr
.as_any()
.downcast_ref::<arrow_array::FixedSizeListArray>()
{
let values = fsl.values();
if let Some(f32_arr) = values.as_any().downcast_ref::<arrow_array::Float32Array>() {
return Ok(f32_arr.values().to_vec());
}
}
// Handle direct Float32Array
if let Some(f32_arr) = arr.as_any().downcast_ref::<arrow_array::Float32Array>() {
return Ok(f32_arr.values().to_vec());
}
Err(Error::InvalidInput {
message: "Query vector must be Float32 type".to_string(),
})
}
/// Parse Arrow IPC response from the namespace server.
async fn parse_arrow_ipc_response(bytes: bytes::Bytes) -> Result<DatasetRecordBatchStream> {
use arrow_ipc::reader::StreamReader;
use std::io::Cursor;
let cursor = Cursor::new(bytes);
let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime {
message: format!("Failed to parse Arrow IPC response: {}", e),
})?;
// Collect all record batches
let schema = reader.schema();
let batches: Vec<_> = reader
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| Error::Runtime {
message: format!("Failed to read Arrow IPC batches: {}", e),
})?;
// Create a stream from the batches
let stream = futures::stream::iter(batches.into_iter().map(Ok));
let record_batch_stream =
Box::pin(datafusion_physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream));
Ok(DatasetRecordBatchStream::new(record_batch_stream))
}
#[cfg(test)]
#[allow(deprecated)]
mod tests {
use arrow_array::Float32Array;
use futures::TryStreamExt;
use std::sync::Arc;
use super::*;
use crate::query::QueryExecutionOptions;
#[test]
fn test_convert_to_namespace_query_vector() {
let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));
let vq = VectorQueryRequest {
base: QueryRequest {
limit: Some(10),
offset: Some(5),
filter: Some(QueryFilter::Sql("id > 0".to_string())),
select: Select::Columns(vec!["id".to_string()]),
..Default::default()
},
column: Some("vector".to_string()),
// We cast here to satisfy the struct definition
query_vector: vec![query_vector as Arc<dyn Array>],
minimum_nprobes: 20,
distance_type: Some(crate::DistanceType::L2),
..Default::default()
};
let any_query = AnyQuery::VectorQuery(vq);
let ns_request = convert_to_namespace_query(&any_query).unwrap();
assert_eq!(ns_request.k, 10);
assert_eq!(ns_request.offset, Some(5));
assert_eq!(ns_request.filter, Some("id > 0".to_string()));
assert_eq!(
ns_request
.columns
.as_ref()
.and_then(|c| c.column_names.as_ref()),
Some(&vec!["id".to_string()])
);
assert_eq!(ns_request.vector_column, Some("vector".to_string()));
assert_eq!(ns_request.distance_type, Some("l2".to_string()));
// Verify the vector data was extracted correctly
assert!(ns_request.vector.single_vector.is_some());
assert_eq!(
ns_request.vector.single_vector.as_ref().unwrap(),
&vec![1.0, 2.0, 3.0, 4.0]
);
}
#[test]
fn test_convert_to_namespace_query_plain_query() {
let q = QueryRequest {
limit: Some(20),
offset: Some(5),
filter: Some(QueryFilter::Sql("id > 5".to_string())),
select: Select::Columns(vec!["id".to_string()]),
with_row_id: true,
..Default::default()
};
let any_query = AnyQuery::Query(q);
let ns_request = convert_to_namespace_query(&any_query).unwrap();
assert_eq!(ns_request.k, 20);
assert_eq!(ns_request.offset, Some(5));
assert_eq!(ns_request.filter, Some("id > 5".to_string()));
assert_eq!(
ns_request
.columns
.as_ref()
.and_then(|c| c.column_names.as_ref()),
Some(&vec!["id".to_string()])
);
assert_eq!(ns_request.with_row_id, Some(true));
assert_eq!(ns_request.bypass_vector_index, Some(true));
assert!(ns_request.vector_column.is_none());
assert!(ns_request.vector.single_vector.as_ref().unwrap().is_empty());
}
#[tokio::test]
async fn test_execute_query_local_routing() {
use crate::connect;
use crate::table::query::execute_query;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
)
.unwrap();
let table = conn
.create_table("test_routing", vec![batch])
.execute()
.await
.unwrap();
let native_table = table.as_native().unwrap();
// Setup a request
let req = QueryRequest {
filter: Some(QueryFilter::Sql("id > 3".to_string())),
..Default::default()
};
let query = AnyQuery::Query(req);
// Action: Call execute_query directly
// This validates that execute_query correctly routes to the local DataFusion engine
// when table.namespace_client is None.
let stream = execute_query(native_table, &query, QueryExecutionOptions::default())
.await
.unwrap();
// Verify results
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
let count: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(count, 2); // 4 and 5
}
#[tokio::test]
async fn test_create_plan_multivector_structure() {
use arrow_array::{Float32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use crate::table::query::create_plan;
use crate::connect;
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"vector",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 2),
false,
),
]));
let batch = RecordBatch::new_empty(schema.clone());
let table = conn
.create_table("test_plan", vec![batch])
.execute()
.await
.unwrap();
let native_table = table.as_native().unwrap();
// This triggers the "create_multi_vector_plan" logic branch
let q1 = Arc::new(Float32Array::from(vec![1.0, 2.0]));
let q2 = Arc::new(Float32Array::from(vec![3.0, 4.0]));
let req = VectorQueryRequest {
column: Some("vector".to_string()),
query_vector: vec![q1, q2],
..Default::default()
};
let query = AnyQuery::VectorQuery(req);
// Create the Plan
let plan = create_plan(native_table, &query, QueryExecutionOptions::default())
.await
.unwrap();
// formatting it allows us to see the hierarchy
let display = DisplayableExecutionPlan::new(plan.as_ref())
.indent(true)
.to_string();
// We expect a RepartitionExec wrapping a UnionExec
assert!(
display.contains("RepartitionExec"),
"Plan should include Repartitioning"
);
assert!(
display.contains("UnionExec"),
"Plan should include a Union of multiple searches"
);
// We expect the projection to add the 'query_index' column (logic inside multi_vector_plan)
assert!(
display.contains("query_index"),
"Plan should add query_index column"
);
}
}

View File

@@ -52,12 +52,11 @@ pub(crate) async fn execute_add_columns(
transforms: NewColumnTransform,
read_columns: Option<Vec<String>>,
) -> Result<AddColumnsResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let mut dataset = table.dataset.get_mut().await?;
dataset.add_columns(transforms, read_columns, None).await?;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(AddColumnsResult { version })
Ok(AddColumnsResult {
version: dataset.version().version,
})
}
/// Internal implementation of the alter columns logic.
@@ -67,12 +66,11 @@ pub(crate) async fn execute_alter_columns(
table: &NativeTable,
alterations: &[ColumnAlteration],
) -> Result<AlterColumnsResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let mut dataset = table.dataset.get_mut().await?;
dataset.alter_columns(alterations).await?;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(AlterColumnsResult { version })
Ok(AlterColumnsResult {
version: dataset.version().version,
})
}
/// Internal implementation of the drop columns logic.
@@ -82,12 +80,11 @@ pub(crate) async fn execute_drop_columns(
table: &NativeTable,
columns: &[&str],
) -> Result<DropColumnsResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let mut dataset = table.dataset.get_mut().await?;
dataset.drop_columns(columns).await?;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(DropColumnsResult { version })
Ok(DropColumnsResult {
version: dataset.version().version,
})
}
#[cfg(test)]

View File

@@ -78,13 +78,11 @@ pub(crate) async fn execute_update(
table: &NativeTable,
update: UpdateBuilder,
) -> Result<UpdateResult> {
table.dataset.ensure_mutable()?;
// 1. Snapshot the current dataset
let dataset = table.dataset.get().await?;
let dataset = table.dataset.get().await?.clone();
// 2. Initialize the Lance Core builder
let mut builder = LanceUpdateBuilder::new(dataset);
let mut builder = LanceUpdateBuilder::new(Arc::new(dataset));
// 3. Apply the filter (WHERE clause)
if let Some(predicate) = update.filter {
@@ -101,7 +99,10 @@ pub(crate) async fn execute_update(
let res = operation.execute().await?;
// 6. Update the table's view of the latest version
table.dataset.update(res.new_dataset.as_ref().clone());
table
.dataset
.set_latest(res.new_dataset.as_ref().clone())
.await;
Ok(UpdateResult {
rows_updated: res.rows_updated,

View File

@@ -1,8 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
pub(crate) mod background_cache;
use std::sync::Arc;
use arrow_array::RecordBatch;

View File

@@ -1,593 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! A cache that refreshes values in the background before they expire.
//!
//! See [`BackgroundCache`] for details.
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::future::{BoxFuture, Shared};
use futures::FutureExt;
type SharedFut<V, E> = Shared<BoxFuture<'static, Result<V, Arc<E>>>>;
enum State<V, E> {
Empty,
Current(V, clock::Instant),
Refreshing {
previous: Option<(V, clock::Instant)>,
future: SharedFut<V, E>,
},
}
impl<V: Clone, E> State<V, E> {
fn fresh_value(&self, ttl: Duration, refresh_window: Duration) -> Option<V> {
let fresh_threshold = ttl - refresh_window;
match self {
Self::Current(value, cached_at) => {
if clock::now().duration_since(*cached_at) < fresh_threshold {
Some(value.clone())
} else {
None
}
}
Self::Refreshing {
previous: Some((value, cached_at)),
..
} => {
if clock::now().duration_since(*cached_at) < fresh_threshold {
Some(value.clone())
} else {
None
}
}
_ => None,
}
}
}
struct CacheInner<V, E> {
state: State<V, E>,
/// Incremented on invalidation. Background fetches check this to avoid
/// overwriting with stale data after a concurrent invalidation.
generation: u64,
}
enum Action<V, E> {
Return(V),
Wait(SharedFut<V, E>),
}
/// A cache that refreshes values in the background before they expire.
///
/// The cache has three states:
/// - **Empty**: No cached value. The next [`get()`](Self::get) blocks until a fetch completes.
/// - **Current**: A valid cached value with a timestamp. Returns immediately if fresh.
/// - **Refreshing**: A fetch is in progress. Returns the previous value if still valid,
/// otherwise blocks until the fetch completes.
///
/// When the cached value enters the refresh window (close to TTL expiry),
/// [`get()`](Self::get) starts a background fetch and returns the current value
/// immediately. Multiple concurrent callers share a single in-flight fetch.
pub struct BackgroundCache<V, E> {
inner: Arc<Mutex<CacheInner<V, E>>>,
ttl: Duration,
refresh_window: Duration,
}
impl<V, E> std::fmt::Debug for BackgroundCache<V, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BackgroundCache")
.field("ttl", &self.ttl)
.field("refresh_window", &self.refresh_window)
.finish_non_exhaustive()
}
}
impl<V, E> Clone for BackgroundCache<V, E> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
ttl: self.ttl,
refresh_window: self.refresh_window,
}
}
}
impl<V, E> BackgroundCache<V, E>
where
V: Clone + Send + Sync + 'static,
E: Send + Sync + 'static,
{
pub fn new(ttl: Duration, refresh_window: Duration) -> Self {
assert!(
refresh_window < ttl,
"refresh_window ({refresh_window:?}) must be less than ttl ({ttl:?})"
);
Self {
inner: Arc::new(Mutex::new(CacheInner {
state: State::Empty,
generation: 0,
})),
ttl,
refresh_window,
}
}
/// Returns the cached value if it's fresh (not in the refresh window).
///
/// This is a cheap synchronous check useful as a fast path before
/// constructing a fetch closure for [`get()`](Self::get).
pub fn try_get(&self) -> Option<V> {
let cache = self.inner.lock().unwrap();
cache.state.fresh_value(self.ttl, self.refresh_window)
}
/// Get the cached value, fetching if needed.
///
/// The closure is called to create the fetch future only when a new fetch
/// is needed. If the cache already has an in-flight fetch, the closure is
/// not called and the caller joins the existing fetch.
pub async fn get<F, Fut>(&self, fetch: F) -> Result<V, Arc<E>>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<V, E>> + Send + 'static,
{
// Fast path: check if cache is fresh
{
let cache = self.inner.lock().unwrap();
if let Some(value) = cache.state.fresh_value(self.ttl, self.refresh_window) {
return Ok(value);
}
}
// Slow path
let mut fetch = Some(fetch);
let action = {
let mut cache = self.inner.lock().unwrap();
self.determine_action(&mut cache, &mut fetch)
};
match action {
Action::Return(value) => Ok(value),
Action::Wait(fut) => fut.await,
}
}
/// Pre-populate the cache with an initial value.
///
/// This avoids a blocking fetch on the first [`get()`](Self::get) call.
pub fn seed(&self, value: V) {
let mut cache = self.inner.lock().unwrap();
cache.state = State::Current(value, clock::now());
}
/// Invalidate the cache. The next [`get()`](Self::get) will start a fresh fetch.
///
/// Any in-flight background fetch from before this call will not update the
/// cache (the generation counter prevents stale writes).
pub fn invalidate(&self) {
let mut cache = self.inner.lock().unwrap();
cache.state = State::Empty;
cache.generation += 1;
}
fn determine_action<F, Fut>(
&self,
cache: &mut CacheInner<V, E>,
fetch: &mut Option<F>,
) -> Action<V, E>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<V, E>> + Send + 'static,
{
match &cache.state {
State::Empty => {
let f = fetch
.take()
.expect("fetch closure required for empty cache");
let shared = self.start_fetch(cache, f, None);
Action::Wait(shared)
}
State::Current(value, cached_at) => {
let elapsed = clock::now().duration_since(*cached_at);
if elapsed < self.ttl - self.refresh_window {
Action::Return(value.clone())
} else if elapsed < self.ttl {
// In refresh window: start background fetch, return current value
let value = value.clone();
let previous = Some((value.clone(), *cached_at));
if let Some(f) = fetch.take() {
// The spawned task inside start_fetch drives the future;
// we don't need to await the returned handle here.
drop(self.start_fetch(cache, f, previous));
}
Action::Return(value)
} else {
// Expired: must wait for fetch
let previous = Some((value.clone(), *cached_at));
let f = fetch
.take()
.expect("fetch closure required for expired cache");
let shared = self.start_fetch(cache, f, previous);
Action::Wait(shared)
}
}
State::Refreshing { previous, future } => {
// If the background fetch already completed (spawned task hasn't
// run yet to update state), transition the state and re-evaluate.
if let Some(result) = future.peek() {
match result {
Ok(value) => {
cache.state = State::Current(value.clone(), clock::now());
}
Err(_) => {
cache.state = match previous.clone() {
Some((v, t)) => State::Current(v, t),
None => State::Empty,
};
}
}
return self.determine_action(cache, fetch);
}
if let Some((value, cached_at)) = previous {
if clock::now().duration_since(*cached_at) < self.ttl {
Action::Return(value.clone())
} else {
Action::Wait(future.clone())
}
} else {
Action::Wait(future.clone())
}
}
}
}
fn start_fetch<F, Fut>(
&self,
cache: &mut CacheInner<V, E>,
fetch: F,
previous: Option<(V, clock::Instant)>,
) -> SharedFut<V, E>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<V, E>> + Send + 'static,
{
let generation = cache.generation;
let shared = async move { (fetch)().await.map_err(Arc::new) }
.boxed()
.shared();
// Spawn task to eagerly drive the future and update state on completion
let inner = self.inner.clone();
let fut_for_spawn = shared.clone();
tokio::spawn(async move {
let result = fut_for_spawn.await;
let mut cache = inner.lock().unwrap();
// Only update if no invalidation has happened since we started
if cache.generation != generation {
return;
}
match result {
Ok(value) => {
cache.state = State::Current(value, clock::now());
}
Err(_) => {
let prev = match &cache.state {
State::Refreshing { previous, .. } => previous.clone(),
_ => None,
};
cache.state = match prev {
Some((v, t)) => State::Current(v, t),
None => State::Empty,
};
}
}
});
cache.state = State::Refreshing {
previous,
future: shared.clone(),
};
shared
}
}
#[cfg(test)]
pub mod clock {
use std::cell::Cell;
use std::time::Duration;
// Re-export Instant so callers use the same type
pub use std::time::Instant;
thread_local! {
static MOCK_NOW: Cell<Option<Instant>> = const { Cell::new(None) };
}
pub fn now() -> Instant {
MOCK_NOW.with(|mock| mock.get().unwrap_or_else(Instant::now))
}
pub fn advance_by(duration: Duration) {
MOCK_NOW.with(|mock| {
let current = mock.get().unwrap_or_else(Instant::now);
mock.set(Some(current + duration));
});
}
#[allow(dead_code)]
pub fn clear_mock() {
MOCK_NOW.with(|mock| mock.set(None));
}
}
#[cfg(not(test))]
mod clock {
// Re-export Instant so callers use the same type
pub use std::time::Instant;
pub fn now() -> Instant {
Instant::now()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
struct TestError(String);
impl std::fmt::Display for TestError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
const TEST_TTL: Duration = Duration::from_secs(30);
const TEST_REFRESH_WINDOW: Duration = Duration::from_secs(5);
fn new_cache() -> BackgroundCache<String, TestError> {
BackgroundCache::new(TEST_TTL, TEST_REFRESH_WINDOW)
}
fn ok_fetcher(
counter: Arc<AtomicUsize>,
value: &str,
) -> impl FnOnce() -> BoxFuture<'static, Result<String, TestError>> + Send + 'static {
let value = value.to_string();
move || {
counter.fetch_add(1, Ordering::SeqCst);
async move { Ok(value) }.boxed()
}
}
fn err_fetcher(
counter: Arc<AtomicUsize>,
msg: &str,
) -> impl FnOnce() -> BoxFuture<'static, Result<String, TestError>> + Send + 'static {
let msg = msg.to_string();
move || {
counter.fetch_add(1, Ordering::SeqCst);
async move { Err(TestError(msg)) }.boxed()
}
}
#[tokio::test]
async fn test_basic_caching() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
let v1 = cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
assert_eq!(v1, "hello");
assert_eq!(count.load(Ordering::SeqCst), 1);
// Second call triggers peek transition to Current, returns cached
let v2 = cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
assert_eq!(v2, "hello");
assert_eq!(count.load(Ordering::SeqCst), 1);
// Third call still cached
let v3 = cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
assert_eq!(v3, "hello");
assert_eq!(count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_try_get_returns_none_when_empty() {
let cache: BackgroundCache<String, TestError> = new_cache();
assert!(cache.try_get().is_none());
}
#[tokio::test]
async fn test_try_get_returns_value_when_fresh() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
// Peek transition
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
assert_eq!(cache.try_get().unwrap(), "hello");
}
#[tokio::test]
async fn test_try_get_returns_none_in_refresh_window() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap(); // peek
clock::advance_by(Duration::from_secs(26));
assert!(cache.try_get().is_none());
}
#[tokio::test]
async fn test_ttl_expiration() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap(); // peek
assert_eq!(count.load(Ordering::SeqCst), 1);
clock::advance_by(Duration::from_secs(31));
let v = cache.get(ok_fetcher(count.clone(), "v2")).await.unwrap();
assert_eq!(v, "v2");
assert_eq!(count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_invalidate_forces_refetch() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap(); // peek
assert_eq!(count.load(Ordering::SeqCst), 1);
cache.invalidate();
let v = cache.get(ok_fetcher(count.clone(), "v2")).await.unwrap();
assert_eq!(v, "v2");
assert_eq!(count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_concurrent_get_single_fetch() {
let cache = Arc::new(new_cache());
let count = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..10 {
let cache = cache.clone();
let count = count.clone();
handles.push(tokio::spawn(async move {
cache.get(ok_fetcher(count, "hello")).await.unwrap()
}));
}
let results: Vec<String> = futures::future::try_join_all(handles).await.unwrap();
for r in &results {
assert_eq!(r, "hello");
}
assert_eq!(count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_background_refresh_in_window() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
// Populate and transition to Current
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap(); // peek
assert_eq!(count.load(Ordering::SeqCst), 1);
// Move into refresh window
clock::advance_by(Duration::from_secs(26));
// Returns cached value and starts background fetch
let v = cache.get(ok_fetcher(count.clone(), "v2")).await.unwrap();
assert_eq!(v, "v1"); // Still old value
assert_eq!(count.load(Ordering::SeqCst), 1); // bg task hasn't run yet
// Advance past TTL to force waiting on the shared future
clock::advance_by(Duration::from_secs(30));
let v = cache.get(ok_fetcher(count.clone(), "v3")).await.unwrap();
assert_eq!(count.load(Ordering::SeqCst), 2);
assert_eq!(v, "v2"); // Got the bg refresh result
}
#[tokio::test]
async fn test_no_duplicate_background_refreshes() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
// Populate and transition to Current
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap(); // peek
assert_eq!(count.load(Ordering::SeqCst), 1);
// Move into refresh window
clock::advance_by(Duration::from_secs(26));
// Multiple calls should all return cached, only one bg fetch
for _ in 0..5 {
let v = cache.get(ok_fetcher(count.clone(), "v2")).await.unwrap();
assert_eq!(v, "v1");
}
// Drive the shared future to completion
clock::advance_by(Duration::from_secs(30));
cache.get(ok_fetcher(count.clone(), "v3")).await.unwrap();
// Only 1 additional fetch (the background refresh)
assert_eq!(count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_background_refresh_error_preserves_cache() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
// Populate and transition to Current
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap(); // peek
assert_eq!(count.load(Ordering::SeqCst), 1);
// Move into refresh window
clock::advance_by(Duration::from_secs(26));
// Start bg refresh that will fail, returns cached value
let v = cache.get(err_fetcher(count.clone(), "fail")).await.unwrap();
assert_eq!(v, "v1");
// Still in refresh window, previous is valid
let v = cache.get(err_fetcher(count.clone(), "fail")).await.unwrap();
assert_eq!(v, "v1");
// Advance past TTL to drive the failed future
clock::advance_by(Duration::from_secs(30));
// The peek error path restores previous, but it's expired,
// so a new fetch is needed. This one also fails.
let result = cache.get(err_fetcher(count.clone(), "fail again")).await;
assert!(result.is_err());
assert_eq!(count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_invalidation_during_fetch_prevents_stale_update() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
// Populate and transition to Current
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "v1")).await.unwrap(); // peek
// Move into refresh window to start background fetch
clock::advance_by(Duration::from_secs(26));
cache.get(ok_fetcher(count.clone(), "stale")).await.unwrap();
// Invalidate before bg task completes
cache.invalidate();
// Advance past TTL
clock::advance_by(Duration::from_secs(30));
// Should get fresh data, not the stale background result
let v = cache.get(ok_fetcher(count.clone(), "fresh")).await.unwrap();
assert_eq!(v, "fresh");
}
}