Compare commits

..

12 Commits

Author SHA1 Message Date
Will Jones
66804e99fc fix(python): use correct exception types in namespace tests (#3206)
## Summary
- Namespace tests expected `RuntimeError` for table-not-found and
namespace-not-empty cases, but `lance_namespace` raises
`TableNotFoundError` and `NamespaceNotEmptyError` which inherit from
`Exception`, not `RuntimeError`.
- Updated `pytest.raises` to use the correct exception types.

## Test plan
- [x] CI passes on `test_namespace.py`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 12:55:54 -07:00
lennylxx
9f85d4c639 fix(embeddings): add missing urllib.request import in url_retrieve (#3190)
url_retrieve() calls urllib.request.urlopen() but only urllib.error was
imported, causing AttributeError for any HTTP URL input. This affects
open-clip, siglip, and jinaai embedding functions when processing image
URLs.

The bug has existed since the embeddings API refactor (#580) but was
masked because most users pass local file paths or bytes rather than
HTTP URLs.
2026-03-30 12:03:44 -07:00
Vedant Madane
1ba19d728e feat(node): support Float16, Float64, and Uint8 vector queries (#3193)
Fixes #2716

## Summary

Add support for querying with Float16Array, Float64Array, and Uint8Array
vectors in the Node.js SDK, eliminating precision loss from the previous
\Float32Array.from()\ conversion.

## Implementation

Follows @wjones127's [5-step
plan](https://github.com/lancedb/lancedb/issues/2716#issuecomment-3447750543):

### Rust (\
odejs/src/query.rs\)

1. \ytes_to_arrow_array(data: Uint8Array, dtype: String)\ helper that:
   - Creates an Arrow \Buffer\ from the raw bytes
   - Wraps it in a typed \ScalarBuffer<T>\ based on the dtype enum
   - Constructs a \PrimitiveArray\ and returns \Arc<dyn Array>\
2. \
earest_to_raw(data, dtype)\ and \dd_query_vector_raw(data, dtype)\ NAPI
methods that pass the type-erased array to the core \
earest_to\/\dd_query_vector\ which already accept \impl
IntoQueryVector\ for \Arc<dyn Array>\

### TypeScript (\
odejs/lancedb/query.ts\, \rrow.ts\)

3. Extended \IntoVector\ type to include \Uint8Array\ (and
\Float16Array\ via runtime check for Node 22+)
4. \xtractVectorBuffer()\ helper detects non-Float32 typed arrays and
extracts their underlying byte buffer + dtype string
5. \
earestTo()\ and \ddQueryVector()\ route through the raw NAPI path when
the input is Float16/Float64/Uint8

### Backward compatibility

Existing \Float32Array\ and \
umber[]\ inputs are unchanged -- they still use the original \
earest_to(Float32Array)\ NAPI method. The new raw path is only used when
a non-Float32 typed array is detected.

## Usage

\\\	ypescript
// Float16Array (Node 22+) -- no precision loss
const f16vec = new Float16Array([0.1, 0.2, 0.3]);
const results = await
table.query().nearestTo(f16vec).limit(10).toArray();

// Float64Array -- no precision loss
const f64vec = new Float64Array([0.1, 0.2, 0.3]);
const results = await
table.query().nearestTo(f64vec).limit(10).toArray();

// Uint8Array (binary embeddings)
const u8vec = new Uint8Array([1, 0, 1, 1, 0]);
const results = await
table.query().nearestTo(u8vec).limit(10).toArray();

// Existing usage unchanged
const results = await table.query().nearestTo([0.1, 0.2,
0.3]).limit(10).toArray();
\\\

## Note on dependencies

The Rust side uses \rrow_array\, \rrow_buffer\, and \half\ crates.
These should already be in the dependency tree via \lancedb\ core, but
\Cargo.toml\ may need explicit entries for \half\ and the arrow
sub-crates in the nodejs workspace.

---------

Signed-off-by: Vedant Madane <6527493+VedantMadane@users.noreply.github.com>
Co-authored-by: Will Jones <willjones127@gmail.com>
2026-03-30 11:15:35 -07:00
lif
4c44587af0 fix: table.add(mode='overwrite') infers vector column types (#3184)
Fixes #3183

## Summary

When `table.add(mode='overwrite')` is called, PyArrow infers input data
types (e.g. `list<double>`) which differ from the original table schema
(e.g. `fixed_size_list<float32>`). Previously, overwrite mode bypassed
`cast_to_table_schema()` entirely, so the inferred types replaced the
original schema, breaking vector search.

This fix builds a merged target schema for overwrite: columns present in
the existing table schema keep their original types, while columns
unique to the input pass through as-is. This way
`cast_to_table_schema()` is applied unconditionally, preserving vector
column types without blocking schema evolution.

## Changes

- `rust/lancedb/src/table/add_data.rs`: For overwrite mode, construct a
target schema by matching input columns against the existing table
schema, then cast. Non-overwrite (append) path is unchanged.
- Added `test_add_overwrite_preserves_vector_type` test that creates a
table with `fixed_size_list<float32>`, overwrites with `list<double>`
input, and asserts the original type is preserved.

## Test Plan

- `cargo test --features remote -p lancedb -- test_add_overwrite` — all
4 overwrite tests pass
- Full suite: 454 passed, 2 failed (pre-existing `remote::retry` flakes
unrelated to this change)

---------

Signed-off-by: majiayu000 <1835304752@qq.com>
2026-03-30 10:57:33 -07:00
lennylxx
1d1cafb59c fix(python): don't assign dict.update() return value in _sanitize_data (#3198)
dict.update() mutates in place and returns None. Assigning its result
caused with_metadata(None) to strip all schema metadata when embedding
metadata was merged during create_table with embedding_functions.
2026-03-30 10:15:45 -07:00
aikido-autofix[bot]
4714598155 ci: mitigate template injection attack in build_linux_wheel (#3195)
This patch mitigates template injection vulnerabilities in GitHub
Workflows by replacing direct references with an environment variable.

Aikido used AI to generate this PR.

High confidence: Aikido has a robust set of benchmarks for similar
fixes, and they are proven to be effective.

Co-authored-by: aikido-autofix[bot] <119856028+aikido-autofix[bot]@users.noreply.github.com>
2026-03-30 09:29:24 -07:00
lennylxx
74f457a0f2 fix(rust): handle Mutex lock poisoning gracefully across codebase (#3196)
Replace ~30 production `lock().unwrap()` calls that would cascade-panic
on a poisoned Mutex. Functions returning `Result` now propagate the
poison as an error via `?` (leveraging the existing `From<PoisonError>`
impl). Functions without a `Result` return recover via
`unwrap_or_else(|e| e.into_inner())`, which is safe because the guarded
data (counters, caches, RNG state) remains logically valid after a
panic.
2026-03-30 09:25:18 -07:00
Dan Tasse
cca6a7c989 fix: raise instead of return ValueError (#3189)
These couple of cases used to return ValueError; should raise it
instead.
2026-03-25 18:49:29 -07:00
Lance Release
ad96489114 Bump version: 0.27.2-beta.0 → 0.27.2-beta.1 2026-03-25 16:22:09 +00:00
Lance Release
76429730c0 Bump version: 0.30.2-beta.0 → 0.30.2-beta.1 2026-03-25 16:21:26 +00:00
Weston Pace
874b74dd3c feat: update lance dependency to v4.0.0-rc.3 (#3187)
## Summary
- Update all lance workspace dependencies from v3.0.1 (crates.io) to
v4.0.0-rc.3 (git tag)
- Pin AWS SDK transitive dependencies to versions compatible with Rust
1.91.0 MSRV

## Test plan
- [x] `cargo check --features remote --tests --examples` passes
- [x] `cargo clippy --features remote --tests --examples` passes
- [x] Python bindings compile (`cargo check -p lancedb-python`)
- [ ] CI passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 09:20:29 -07:00
Lance Release
61de47f3a5 Bump version: 0.27.1 → 0.27.2-beta.0 2026-03-25 03:23:28 +00:00
41 changed files with 1589 additions and 1299 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion] [tool.bumpversion]
current_version = "0.27.1" current_version = "0.27.2-beta.1"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -23,8 +23,10 @@ runs:
steps: steps:
- name: CONFIRM ARM BUILD - name: CONFIRM ARM BUILD
shell: bash shell: bash
env:
ARM_BUILD: ${{ inputs.arm-build }}
run: | run: |
echo "ARM BUILD: ${{ inputs.arm-build }}" echo "ARM BUILD: $ARM_BUILD"
- name: Build x86_64 Manylinux wheel - name: Build x86_64 Manylinux wheel
if: ${{ inputs.arm-build == 'false' }} if: ${{ inputs.arm-build == 'false' }}
uses: PyO3/maturin-action@v1 uses: PyO3/maturin-action@v1

2119
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0" rust-version = "1.91.0"
[workspace.dependencies] [workspace.dependencies]
lance = { version = "=3.0.1", default-features = false } lance = { "version" = "=4.0.0-rc.3", default-features = false, "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { version = "=3.0.1" } lance-core = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { version = "=3.0.1" } lance-datagen = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { version = "=3.0.1" } lance-file = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { version = "=3.0.1", default-features = false } lance-io = { "version" = "=4.0.0-rc.3", default-features = false, "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { version = "=3.0.1" } lance-index = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { version = "=3.0.1" } lance-linalg = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { version = "=3.0.1" } lance-namespace = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { version = "=3.0.1", default-features = false } lance-namespace-impls = { "version" = "=4.0.0-rc.3", default-features = false, "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { version = "=3.0.1" } lance-table = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { version = "=3.0.1" } lance-testing = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { version = "=3.0.1" } lance-datafusion = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { version = "=3.0.1" } lance-encoding = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { version = "=3.0.1" } lance-arrow = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8" ahash = "0.8"
# Note that this one does not include pyarrow # Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false } arrow = { version = "57.2", optional = false }

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency> <dependency>
<groupId>com.lancedb</groupId> <groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId> <artifactId>lancedb-core</artifactId>
<version>0.27.1</version> <version>0.27.2-beta.1</version>
</dependency> </dependency>
``` ```

View File

@@ -52,7 +52,7 @@ new EmbeddingFunction<T, M>(): EmbeddingFunction<T, M>
### computeQueryEmbeddings() ### computeQueryEmbeddings()
```ts ```ts
computeQueryEmbeddings(data): Promise<number[] | Float32Array | Float64Array> computeQueryEmbeddings(data): Promise<number[] | Uint8Array | Float32Array | Float64Array>
``` ```
Compute the embeddings for a single query Compute the embeddings for a single query
@@ -63,7 +63,7 @@ Compute the embeddings for a single query
#### Returns #### Returns
`Promise`&lt;`number`[] \| `Float32Array` \| `Float64Array`&gt; `Promise`&lt;`number`[] \| `Uint8Array` \| `Float32Array` \| `Float64Array`&gt;
*** ***

View File

@@ -37,7 +37,7 @@ new TextEmbeddingFunction<M>(): TextEmbeddingFunction<M>
### computeQueryEmbeddings() ### computeQueryEmbeddings()
```ts ```ts
computeQueryEmbeddings(data): Promise<number[] | Float32Array | Float64Array> computeQueryEmbeddings(data): Promise<number[] | Uint8Array | Float32Array | Float64Array>
``` ```
Compute the embeddings for a single query Compute the embeddings for a single query
@@ -48,7 +48,7 @@ Compute the embeddings for a single query
#### Returns #### Returns
`Promise`&lt;`number`[] \| `Float32Array` \| `Float64Array`&gt; `Promise`&lt;`number`[] \| `Uint8Array` \| `Float32Array` \| `Float64Array`&gt;
#### Overrides #### Overrides

View File

@@ -7,5 +7,10 @@
# Type Alias: IntoVector # Type Alias: IntoVector
```ts ```ts
type IntoVector: Float32Array | Float64Array | number[] | Promise<Float32Array | Float64Array | number[]>; type IntoVector:
| Float32Array
| Float64Array
| Uint8Array
| number[]
| Promise<Float32Array | Float64Array | Uint8Array | number[]>;
``` ```

View File

@@ -8,7 +8,7 @@
<parent> <parent>
<groupId>com.lancedb</groupId> <groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId> <artifactId>lancedb-parent</artifactId>
<version>0.27.1-final.0</version> <version>0.27.2-beta.1</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId> <groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId> <artifactId>lancedb-parent</artifactId>
<version>0.27.1-final.0</version> <version>0.27.2-beta.1</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description> <description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package] [package]
name = "lancedb-nodejs" name = "lancedb-nodejs"
edition.workspace = true edition.workspace = true
version = "0.27.1" version = "0.27.2-beta.1"
license.workspace = true license.workspace = true
description.workspace = true description.workspace = true
repository.workspace = true repository.workspace = true
@@ -15,6 +15,8 @@ crate-type = ["cdylib"]
async-trait.workspace = true async-trait.workspace = true
arrow-ipc.workspace = true arrow-ipc.workspace = true
arrow-array.workspace = true arrow-array.workspace = true
arrow-buffer = "57.2"
half.workspace = true
arrow-schema.workspace = true arrow-schema.workspace = true
env_logger.workspace = true env_logger.workspace = true
futures.workspace = true futures.workspace = true

View File

@@ -0,0 +1,110 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import * as tmp from "tmp";
import { type Table, connect } from "../lancedb";
import {
Field,
FixedSizeList,
Float32,
Int64,
Schema,
makeArrowTable,
} from "../lancedb/arrow";
describe("Vector query with different typed arrays", () => {
let tmpDir: tmp.DirResult;
afterEach(() => {
tmpDir?.removeCallback();
});
async function createFloat32Table(): Promise<Table> {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
const db = await connect(tmpDir.name);
const schema = new Schema([
new Field("id", new Int64(), true),
new Field(
"vec",
new FixedSizeList(2, new Field("item", new Float32())),
true,
),
]);
const data = makeArrowTable(
[
{ id: 1n, vec: [1.0, 0.0] },
{ id: 2n, vec: [0.0, 1.0] },
{ id: 3n, vec: [1.0, 1.0] },
],
{ schema },
);
return db.createTable("test_f32", data);
}
it("should search with Float32Array (baseline)", async () => {
const table = await createFloat32Table();
const results = await table
.query()
.nearestTo(new Float32Array([1.0, 0.0]))
.limit(1)
.toArray();
expect(results.length).toBe(1);
expect(Number(results[0].id)).toBe(1);
});
it("should search with number[] (backward compat)", async () => {
const table = await createFloat32Table();
const results = await table
.query()
.nearestTo([1.0, 0.0])
.limit(1)
.toArray();
expect(results.length).toBe(1);
expect(Number(results[0].id)).toBe(1);
});
it("should search with Float64Array via raw path", async () => {
const table = await createFloat32Table();
const results = await table
.query()
.nearestTo(new Float64Array([1.0, 0.0]))
.limit(1)
.toArray();
expect(results.length).toBe(1);
expect(Number(results[0].id)).toBe(1);
});
it("should add multiple query vectors with Float64Array", async () => {
const table = await createFloat32Table();
const results = await table
.query()
.nearestTo(new Float64Array([1.0, 0.0]))
.addQueryVector(new Float64Array([0.0, 1.0]))
.limit(2)
.toArray();
expect(results.length).toBeGreaterThanOrEqual(2);
});
// Float16Array is only available in Node 22+; not in TypeScript's standard lib yet
const float16ArrayCtor = (globalThis as unknown as Record<string, unknown>)
.Float16Array as (new (values: number[]) => unknown) | undefined;
const hasFloat16 = float16ArrayCtor !== undefined;
const f16it = hasFloat16 ? it : it.skip;
f16it("should search with Float16Array via raw path", async () => {
const table = await createFloat32Table();
const results = await table
.query()
.nearestTo(new float16ArrayCtor!([1.0, 0.0]) as Float32Array)
.limit(1)
.toArray();
expect(results.length).toBe(1);
expect(Number(results[0].id)).toBe(1);
});
});

View File

@@ -117,8 +117,9 @@ export type TableLike =
export type IntoVector = export type IntoVector =
| Float32Array | Float32Array
| Float64Array | Float64Array
| Uint8Array
| number[] | number[]
| Promise<Float32Array | Float64Array | number[]>; | Promise<Float32Array | Float64Array | Uint8Array | number[]>;
export type MultiVector = IntoVector[]; export type MultiVector = IntoVector[];
@@ -126,14 +127,48 @@ export function isMultiVector(value: unknown): value is MultiVector {
return Array.isArray(value) && isIntoVector(value[0]); return Array.isArray(value) && isIntoVector(value[0]);
} }
// Float16Array is not in TypeScript's standard lib yet; access dynamically
type Float16ArrayCtor = new (
...args: unknown[]
) => { buffer: ArrayBuffer; byteOffset: number; byteLength: number };
const float16ArrayCtor = (globalThis as unknown as Record<string, unknown>)
.Float16Array as Float16ArrayCtor | undefined;
export function isIntoVector(value: unknown): value is IntoVector { export function isIntoVector(value: unknown): value is IntoVector {
return ( return (
value instanceof Float32Array || value instanceof Float32Array ||
value instanceof Float64Array || value instanceof Float64Array ||
value instanceof Uint8Array ||
(float16ArrayCtor !== undefined && value instanceof float16ArrayCtor) ||
(Array.isArray(value) && !Array.isArray(value[0])) (Array.isArray(value) && !Array.isArray(value[0]))
); );
} }
/**
* Extract the underlying byte buffer and data type from a typed array
* for passing to the Rust NAPI layer without precision loss.
*/
export function extractVectorBuffer(
vector: Float32Array | Float64Array | Uint8Array,
): { data: Uint8Array; dtype: string } | null {
if (float16ArrayCtor !== undefined && vector instanceof float16ArrayCtor) {
return {
data: new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength),
dtype: "float16",
};
}
if (vector instanceof Float64Array) {
return {
data: new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength),
dtype: "float64",
};
}
if (vector instanceof Uint8Array && !(vector instanceof Float32Array)) {
return { data: vector, dtype: "uint8" };
}
return null;
}
export function isArrowTable(value: object): value is TableLike { export function isArrowTable(value: object): value is TableLike {
if (value instanceof ArrowTable) return true; if (value instanceof ArrowTable) return true;
return "schema" in value && "batches" in value; return "schema" in value && "batches" in value;

View File

@@ -5,6 +5,7 @@ import {
Table as ArrowTable, Table as ArrowTable,
type IntoVector, type IntoVector,
RecordBatch, RecordBatch,
extractVectorBuffer,
fromBufferToRecordBatch, fromBufferToRecordBatch,
fromRecordBatchToBuffer, fromRecordBatchToBuffer,
tableFromIPC, tableFromIPC,
@@ -661,10 +662,8 @@ export class VectorQuery extends StandardQueryBase<NativeVectorQuery> {
const res = (async () => { const res = (async () => {
try { try {
const v = await vector; const v = await vector;
const arr = Float32Array.from(v);
//
// biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping // biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping
const value: any = this.addQueryVector(arr); const value: any = this.addQueryVector(v);
const inner = value.inner as const inner = value.inner as
| NativeVectorQuery | NativeVectorQuery
| Promise<NativeVectorQuery>; | Promise<NativeVectorQuery>;
@@ -676,7 +675,12 @@ export class VectorQuery extends StandardQueryBase<NativeVectorQuery> {
return new VectorQuery(res); return new VectorQuery(res);
} else { } else {
super.doCall((inner) => { super.doCall((inner) => {
inner.addQueryVector(Float32Array.from(vector)); const raw = Array.isArray(vector) ? null : extractVectorBuffer(vector);
if (raw) {
inner.addQueryVectorRaw(raw.data, raw.dtype);
} else {
inner.addQueryVector(Float32Array.from(vector as number[]));
}
}); });
return this; return this;
} }
@@ -765,14 +769,23 @@ export class Query extends StandardQueryBase<NativeQuery> {
* a default `limit` of 10 will be used. @see {@link Query#limit} * a default `limit` of 10 will be used. @see {@link Query#limit}
*/ */
nearestTo(vector: IntoVector): VectorQuery { nearestTo(vector: IntoVector): VectorQuery {
const callNearestTo = (
inner: NativeQuery,
resolved: Float32Array | Float64Array | Uint8Array | number[],
): NativeVectorQuery => {
const raw = Array.isArray(resolved)
? null
: extractVectorBuffer(resolved);
if (raw) {
return inner.nearestToRaw(raw.data, raw.dtype);
}
return inner.nearestTo(Float32Array.from(resolved as number[]));
};
if (this.inner instanceof Promise) { if (this.inner instanceof Promise) {
const nativeQuery = this.inner.then(async (inner) => { const nativeQuery = this.inner.then(async (inner) => {
if (vector instanceof Promise) { const resolved = vector instanceof Promise ? await vector : vector;
const arr = await vector.then((v) => Float32Array.from(v)); return callNearestTo(inner, resolved);
return inner.nearestTo(arr);
} else {
return inner.nearestTo(Float32Array.from(vector));
}
}); });
return new VectorQuery(nativeQuery); return new VectorQuery(nativeQuery);
} }
@@ -780,10 +793,8 @@ export class Query extends StandardQueryBase<NativeQuery> {
const res = (async () => { const res = (async () => {
try { try {
const v = await vector; const v = await vector;
const arr = Float32Array.from(v);
//
// biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping // biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping
const value: any = this.nearestTo(arr); const value: any = this.nearestTo(v);
const inner = value.inner as const inner = value.inner as
| NativeVectorQuery | NativeVectorQuery
| Promise<NativeVectorQuery>; | Promise<NativeVectorQuery>;
@@ -794,7 +805,7 @@ export class Query extends StandardQueryBase<NativeQuery> {
})(); })();
return new VectorQuery(res); return new VectorQuery(res);
} else { } else {
const vectorQuery = this.inner.nearestTo(Float32Array.from(vector)); const vectorQuery = callNearestTo(this.inner, vector);
return new VectorQuery(vectorQuery); return new VectorQuery(vectorQuery);
} }
} }

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-darwin-arm64", "name": "@lancedb/lancedb-darwin-arm64",
"version": "0.27.1", "version": "0.27.2-beta.1",
"os": ["darwin"], "os": ["darwin"],
"cpu": ["arm64"], "cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node", "main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-arm64-gnu", "name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.27.1", "version": "0.27.2-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["arm64"], "cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node", "main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-arm64-musl", "name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.27.1", "version": "0.27.2-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["arm64"], "cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node", "main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-x64-gnu", "name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.27.1", "version": "0.27.2-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["x64"], "cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node", "main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-x64-musl", "name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.27.1", "version": "0.27.2-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["x64"], "cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node", "main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-win32-arm64-msvc", "name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.27.1", "version": "0.27.2-beta.1",
"os": [ "os": [
"win32" "win32"
], ],

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-win32-x64-msvc", "name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.27.1", "version": "0.27.2-beta.1",
"os": ["win32"], "os": ["win32"],
"cpu": ["x64"], "cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node", "main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{ {
"name": "@lancedb/lancedb", "name": "@lancedb/lancedb",
"version": "0.27.1", "version": "0.27.2-beta.1",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "@lancedb/lancedb", "name": "@lancedb/lancedb",
"version": "0.27.1", "version": "0.27.2-beta.1",
"cpu": [ "cpu": [
"x64", "x64",
"arm64" "arm64"

View File

@@ -11,7 +11,7 @@
"ann" "ann"
], ],
"private": false, "private": false,
"version": "0.27.1", "version": "0.27.2-beta.1",
"main": "dist/index.js", "main": "dist/index.js",
"exports": { "exports": {
".": "./dist/index.js", ".": "./dist/index.js",

View File

@@ -3,6 +3,12 @@
use std::sync::Arc; use std::sync::Arc;
use arrow_array::{
Array, Float16Array as ArrowFloat16Array, Float32Array as ArrowFloat32Array,
Float64Array as ArrowFloat64Array, UInt8Array as ArrowUInt8Array,
};
use arrow_buffer::ScalarBuffer;
use half::f16;
use lancedb::index::scalar::{ use lancedb::index::scalar::{
BooleanQuery, BoostQuery, FtsQuery, FullTextSearchQuery, MatchQuery, MultiMatchQuery, Occur, BooleanQuery, BoostQuery, FtsQuery, FullTextSearchQuery, MatchQuery, MultiMatchQuery, Occur,
Operator, PhraseQuery, Operator, PhraseQuery,
@@ -24,6 +30,33 @@ use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker; use crate::rerankers::Reranker;
use crate::util::{parse_distance_type, schema_to_buffer}; use crate::util::{parse_distance_type, schema_to_buffer};
fn bytes_to_arrow_array(data: Uint8Array, dtype: String) -> napi::Result<Arc<dyn Array>> {
let buf = arrow_buffer::Buffer::from(data.to_vec());
let num_bytes = buf.len();
match dtype.as_str() {
"float16" => {
let scalar_buf = ScalarBuffer::<f16>::new(buf, 0, num_bytes / 2);
Ok(Arc::new(ArrowFloat16Array::new(scalar_buf, None)))
}
"float32" => {
let scalar_buf = ScalarBuffer::<f32>::new(buf, 0, num_bytes / 4);
Ok(Arc::new(ArrowFloat32Array::new(scalar_buf, None)))
}
"float64" => {
let scalar_buf = ScalarBuffer::<f64>::new(buf, 0, num_bytes / 8);
Ok(Arc::new(ArrowFloat64Array::new(scalar_buf, None)))
}
"uint8" => {
let scalar_buf = ScalarBuffer::<u8>::new(buf, 0, num_bytes);
Ok(Arc::new(ArrowUInt8Array::new(scalar_buf, None)))
}
_ => Err(napi::Error::from_reason(format!(
"Unsupported vector dtype: {}. Expected one of: float16, float32, float64, uint8",
dtype
))),
}
}
#[napi] #[napi]
pub struct Query { pub struct Query {
inner: LanceDbQuery, inner: LanceDbQuery,
@@ -78,6 +111,13 @@ impl Query {
Ok(VectorQuery { inner }) Ok(VectorQuery { inner })
} }
#[napi]
pub fn nearest_to_raw(&mut self, data: Uint8Array, dtype: String) -> Result<VectorQuery> {
let array = bytes_to_arrow_array(data, dtype)?;
let inner = self.inner.clone().nearest_to(array).default_error()?;
Ok(VectorQuery { inner })
}
#[napi] #[napi]
pub fn fast_search(&mut self) { pub fn fast_search(&mut self) {
self.inner = self.inner.clone().fast_search(); self.inner = self.inner.clone().fast_search();
@@ -163,6 +203,13 @@ impl VectorQuery {
Ok(()) Ok(())
} }
#[napi]
pub fn add_query_vector_raw(&mut self, data: Uint8Array, dtype: String) -> Result<()> {
let array = bytes_to_arrow_array(data, dtype)?;
self.inner = self.inner.clone().add_query_vector(array).default_error()?;
Ok(())
}
#[napi] #[napi]
pub fn distance_type(&mut self, distance_type: String) -> napi::Result<()> { pub fn distance_type(&mut self, distance_type: String) -> napi::Result<()> {
let distance_type = parse_distance_type(distance_type)?; let distance_type = parse_distance_type(distance_type)?;

View File

@@ -1,5 +1,5 @@
[tool.bumpversion] [tool.bumpversion]
current_version = "0.30.2-beta.0" current_version = "0.30.2-beta.1"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "lancedb-python" name = "lancedb-python"
version = "0.30.2-beta.0" version = "0.30.2-beta.1"
edition.workspace = true edition.workspace = true
description = "Python bindings for LanceDB" description = "Python bindings for LanceDB"
license.workspace = true license.workspace = true

View File

@@ -10,6 +10,7 @@ import sys
import threading import threading
import time import time
import urllib.error import urllib.error
import urllib.request
import weakref import weakref
import logging import logging
from functools import wraps from functools import wraps

View File

@@ -70,7 +70,7 @@ def ensure_vector_query(
) -> Union[List[float], List[List[float]], pa.Array, List[pa.Array]]: ) -> Union[List[float], List[List[float]], pa.Array, List[pa.Array]]:
if isinstance(val, list): if isinstance(val, list):
if len(val) == 0: if len(val) == 0:
return ValueError("Vector query must be a non-empty list") raise ValueError("Vector query must be a non-empty list")
sample = val[0] sample = val[0]
else: else:
if isinstance(val, float): if isinstance(val, float):
@@ -83,7 +83,7 @@ def ensure_vector_query(
return val return val
if isinstance(sample, list): if isinstance(sample, list):
if len(sample) == 0: if len(sample) == 0:
return ValueError("Vector query must be a non-empty list") raise ValueError("Vector query must be a non-empty list")
if isinstance(sample[0], float): if isinstance(sample[0], float):
# val is list of list of floats # val is list of list of floats
return val return val

View File

@@ -278,7 +278,7 @@ def _sanitize_data(
if metadata: if metadata:
new_metadata = target_schema.metadata or {} new_metadata = target_schema.metadata or {}
new_metadata = new_metadata.update(metadata) new_metadata.update(metadata)
target_schema = target_schema.with_metadata(new_metadata) target_schema = target_schema.with_metadata(new_metadata)
_validate_schema(target_schema) _validate_schema(target_schema)
@@ -3857,7 +3857,13 @@ class AsyncTable:
# _santitize_data is an old code path, but we will use it until the # _santitize_data is an old code path, but we will use it until the
# new code path is ready. # new code path is ready.
if on_bad_vectors != "error" or ( if mode == "overwrite":
# For overwrite, apply the same preprocessing as create_table
# so vector columns are inferred as FixedSizeList.
data, _ = sanitize_create_table(
data, None, on_bad_vectors=on_bad_vectors, fill_value=fill_value
)
elif on_bad_vectors != "error" or (
schema.metadata is not None and b"embedding_functions" in schema.metadata schema.metadata is not None and b"embedding_functions" in schema.metadata
): ):
data = _sanitize_data( data = _sanitize_data(

View File

@@ -546,3 +546,23 @@ def test_openai_no_retry_on_401(mock_sleep):
assert mock_func.call_count == 1 assert mock_func.call_count == 1
# Verify that sleep was never called (no retries) # Verify that sleep was never called (no retries)
assert mock_sleep.call_count == 0 assert mock_sleep.call_count == 0
def test_url_retrieve_downloads_image():
"""
Embedding functions like open-clip, siglip, and jinaai use url_retrieve()
to download images from HTTP URLs. For example, open_clip._to_pil() calls:
PIL_Image.open(io.BytesIO(url_retrieve(image)))
Verify that url_retrieve() can download an image and open it as PIL Image,
matching the real usage pattern in embedding functions.
"""
import io
from PIL import Image
from lancedb.embeddings.utils import url_retrieve
image_url = "http://farm1.staticflickr.com/53/167798175_7c7845bbbd_z.jpg"
image_bytes = url_retrieve(image_url)
img = Image.open(io.BytesIO(image_bytes))
assert img.size[0] > 0 and img.size[1] > 0

View File

@@ -8,6 +8,7 @@ import shutil
import pytest import pytest
import pyarrow as pa import pyarrow as pa
import lancedb import lancedb
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
class TestNamespaceConnection: class TestNamespaceConnection:
@@ -130,7 +131,7 @@ class TestNamespaceConnection:
assert len(list(db.table_names(namespace=["test_ns"]))) == 0 assert len(list(db.table_names(namespace=["test_ns"]))) == 0
# Should not be able to open dropped table # Should not be able to open dropped table
with pytest.raises(RuntimeError): with pytest.raises(TableNotFoundError):
db.open_table("table1", namespace=["test_ns"]) db.open_table("table1", namespace=["test_ns"])
def test_create_table_with_schema(self): def test_create_table_with_schema(self):
@@ -340,7 +341,7 @@ class TestNamespaceConnection:
db.create_table("test_table", schema=schema, namespace=["test_namespace"]) db.create_table("test_table", schema=schema, namespace=["test_namespace"])
# Try to drop namespace with tables - should fail # Try to drop namespace with tables - should fail
with pytest.raises(RuntimeError, match="is not empty"): with pytest.raises(NamespaceNotEmptyError):
db.drop_namespace(["test_namespace"]) db.drop_namespace(["test_namespace"])
# Drop table first # Drop table first

View File

@@ -30,6 +30,7 @@ from lancedb.query import (
PhraseQuery, PhraseQuery,
Query, Query,
FullTextSearchQuery, FullTextSearchQuery,
ensure_vector_query,
) )
from lancedb.rerankers.cross_encoder import CrossEncoderReranker from lancedb.rerankers.cross_encoder import CrossEncoderReranker
from lancedb.table import AsyncTable, LanceTable from lancedb.table import AsyncTable, LanceTable
@@ -1501,6 +1502,18 @@ def test_search_empty_table(mem_db):
assert results == [] assert results == []
def test_ensure_vector_query_empty_list():
"""Regression: ensure_vector_query used to return instead of raise ValueError."""
with pytest.raises(ValueError, match="non-empty"):
ensure_vector_query([])
def test_ensure_vector_query_nested_empty_list():
"""Regression: ensure_vector_query used to return instead of raise ValueError."""
with pytest.raises(ValueError, match="non-empty"):
ensure_vector_query([[]])
def test_fast_search(tmp_path): def test_fast_search(tmp_path):
db = lancedb.connect(tmp_path) db = lancedb.connect(tmp_path)

View File

@@ -527,6 +527,36 @@ async def test_add_async(mem_db_async: AsyncConnection):
assert await table.count_rows() == 3 assert await table.count_rows() == 3
def test_add_overwrite_infers_vector_schema(mem_db: DBConnection):
"""Overwrite should infer vector columns the same way create_table does.
Regression test for https://github.com/lancedb/lancedb/issues/3183
"""
table = mem_db.create_table(
"test_overwrite_vec",
data=[
{"vector": [1.0, 2.0, 3.0, 4.0], "item": "foo"},
{"vector": [5.0, 6.0, 7.0, 8.0], "item": "bar"},
],
)
# create_table infers vector as fixed_size_list<float32, 4>
original_type = table.schema.field("vector").type
assert pa.types.is_fixed_size_list(original_type)
# overwrite with plain Python lists (PyArrow infers list<double>)
table.add(
[
{"vector": [10.0, 20.0, 30.0, 40.0], "item": "baz"},
],
mode="overwrite",
)
# overwrite should infer vector column the same way as create_table
new_type = table.schema.field("vector").type
assert pa.types.is_fixed_size_list(new_type), (
f"Expected fixed_size_list after overwrite, got {new_type}"
)
def test_add_progress_callback(mem_db: DBConnection): def test_add_progress_callback(mem_db: DBConnection):
table = mem_db.create_table( table = mem_db.create_table(
"test", "test",
@@ -2143,3 +2173,33 @@ def test_table_uri(tmp_path):
db = lancedb.connect(tmp_path) db = lancedb.connect(tmp_path)
table = db.create_table("my_table", data=[{"x": 0}]) table = db.create_table("my_table", data=[{"x": 0}])
assert table.uri == str(tmp_path / "my_table.lance") assert table.uri == str(tmp_path / "my_table.lance")
def test_sanitize_data_metadata_not_stripped():
"""Regression test: dict.update() returns None, so assigning its result
would silently replace metadata with None, causing with_metadata(None)
to strip all schema metadata from the target schema."""
from lancedb.table import _sanitize_data
schema = pa.schema(
[pa.field("x", pa.int64())],
metadata={b"existing_key": b"existing_value"},
)
batch = pa.record_batch([pa.array([1, 2, 3])], schema=schema)
# Use a different field type so the reader and target schemas differ,
# forcing _cast_to_target_schema to rebuild the schema with the
# target's metadata (instead of taking the fast-path).
target_schema = pa.schema(
[pa.field("x", pa.int32())],
metadata={b"existing_key": b"existing_value"},
)
reader = pa.RecordBatchReader.from_batches(schema, [batch])
metadata = {b"new_key": b"new_value"}
result = _sanitize_data(reader, target_schema=target_schema, metadata=metadata)
result_schema = result.schema
assert result_schema.metadata is not None
assert result_schema.metadata[b"existing_key"] == b"existing_value"
assert result_schema.metadata[b"new_key"] == b"new_value"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "lancedb" name = "lancedb"
version = "0.27.1" version = "0.27.2-beta.1"
edition.workspace = true edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications" description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true license.workspace = true

View File

@@ -240,7 +240,7 @@ impl Shuffler {
.await?; .await?;
// Need to read the entire file in a single batch for in-memory shuffling // Need to read the entire file in a single batch for in-memory shuffling
let batch = reader.read_record_batch(0, reader.num_rows()).await?; let batch = reader.read_record_batch(0, reader.num_rows()).await?;
let mut rng = rng.lock().unwrap(); let mut rng = rng.lock().unwrap_or_else(|e| e.into_inner());
Self::shuffle_batch(&batch, &mut rng, clump_size) Self::shuffle_batch(&batch, &mut rng, clump_size)
} }
}) })

View File

@@ -66,13 +66,13 @@ impl IoTrackingStore {
} }
fn record_read(&self, num_bytes: u64) { fn record_read(&self, num_bytes: u64) {
let mut stats = self.stats.lock().unwrap(); let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
stats.read_iops += 1; stats.read_iops += 1;
stats.read_bytes += num_bytes; stats.read_bytes += num_bytes;
} }
fn record_write(&self, num_bytes: u64) { fn record_write(&self, num_bytes: u64) {
let mut stats = self.stats.lock().unwrap(); let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
stats.write_iops += 1; stats.write_iops += 1;
stats.write_bytes += num_bytes; stats.write_bytes += num_bytes;
} }
@@ -229,10 +229,63 @@ impl MultipartUpload for IoTrackingMultipartUpload {
fn put_part(&mut self, payload: PutPayload) -> UploadPart { fn put_part(&mut self, payload: PutPayload) -> UploadPart {
{ {
let mut stats = self.stats.lock().unwrap(); let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
stats.write_iops += 1; stats.write_iops += 1;
stats.write_bytes += payload.content_length() as u64; stats.write_bytes += payload.content_length() as u64;
} }
self.target.put_part(payload) self.target.put_part(payload)
} }
} }
#[cfg(test)]
mod tests {
use super::*;
/// Helper: poison a Mutex<IoStats> by panicking while holding the lock.
fn poison_stats(stats: &Arc<Mutex<IoStats>>) {
let stats_clone = stats.clone();
let handle = std::thread::spawn(move || {
let _guard = stats_clone.lock().unwrap();
panic!("intentional panic to poison stats mutex");
});
let _ = handle.join();
assert!(stats.lock().is_err(), "mutex should be poisoned");
}
#[test]
fn test_record_read_recovers_from_poisoned_lock() {
let stats = Arc::new(Mutex::new(IoStats::default()));
let store = IoTrackingStore {
target: Arc::new(object_store::memory::InMemory::new()),
stats: stats.clone(),
};
poison_stats(&stats);
// record_read should not panic
store.record_read(1024);
// Verify the stats were updated despite poisoning
let s = stats.lock().unwrap_or_else(|e| e.into_inner());
assert_eq!(s.read_iops, 1);
assert_eq!(s.read_bytes, 1024);
}
#[test]
fn test_record_write_recovers_from_poisoned_lock() {
let stats = Arc::new(Mutex::new(IoStats::default()));
let store = IoTrackingStore {
target: Arc::new(object_store::memory::InMemory::new()),
stats: stats.clone(),
};
poison_stats(&stats);
// record_write should not panic
store.record_write(2048);
let s = stats.lock().unwrap_or_else(|e| e.into_inner());
assert_eq!(s.write_iops, 1);
assert_eq!(s.write_bytes, 2048);
}
}

View File

@@ -130,7 +130,10 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
// TODO: this will be used when we wire this up to Table::add(). // TODO: this will be used when we wire this up to Table::add().
#[allow(dead_code)] #[allow(dead_code)]
pub fn add_result(&self) -> Option<AddResult> { pub fn add_result(&self) -> Option<AddResult> {
self.add_result.lock().unwrap().clone() self.add_result
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
} }
/// Stream the input into an HTTP body as an Arrow IPC stream, capturing any /// Stream the input into an HTTP body as an Arrow IPC stream, capturing any

View File

@@ -204,7 +204,9 @@ impl ExecutionPlan for InsertExec {
let to_commit = { let to_commit = {
// Don't hold the lock over an await point. // Don't hold the lock over an await point.
let mut txns = partial_transactions.lock().unwrap(); let mut txns = partial_transactions
.lock()
.unwrap_or_else(|e| e.into_inner());
txns.push(transaction); txns.push(transaction);
if txns.len() == total_partitions { if txns.len() == total_partitions {
Some(std::mem::take(&mut *txns)) Some(std::mem::take(&mut *txns))

View File

@@ -82,7 +82,7 @@ impl DatasetConsistencyWrapper {
/// pinned dataset regardless of consistency mode. /// pinned dataset regardless of consistency mode.
pub async fn get(&self) -> Result<Arc<Dataset>> { pub async fn get(&self) -> Result<Arc<Dataset>> {
{ {
let state = self.state.lock().unwrap(); let state = self.state.lock()?;
if state.pinned_version.is_some() { if state.pinned_version.is_some() {
return Ok(state.dataset.clone()); return Ok(state.dataset.clone());
} }
@@ -101,7 +101,7 @@ impl DatasetConsistencyWrapper {
} }
ConsistencyMode::Strong => refresh_latest(self.state.clone()).await, ConsistencyMode::Strong => refresh_latest(self.state.clone()).await,
ConsistencyMode::Lazy => { ConsistencyMode::Lazy => {
let state = self.state.lock().unwrap(); let state = self.state.lock()?;
Ok(state.dataset.clone()) Ok(state.dataset.clone())
} }
} }
@@ -116,7 +116,7 @@ impl DatasetConsistencyWrapper {
/// concurrent [`as_time_travel`](Self::as_time_travel) call), the update /// concurrent [`as_time_travel`](Self::as_time_travel) call), the update
/// is silently ignored — the write already committed to storage. /// is silently ignored — the write already committed to storage.
pub fn update(&self, dataset: Dataset) { pub fn update(&self, dataset: Dataset) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
if state.pinned_version.is_some() { if state.pinned_version.is_some() {
// A concurrent as_time_travel() beat us here. The write succeeded // A concurrent as_time_travel() beat us here. The write succeeded
// in storage, but since we're now pinned we don't advance the // in storage, but since we're now pinned we don't advance the
@@ -139,7 +139,7 @@ impl DatasetConsistencyWrapper {
/// Check that the dataset is in a mutable mode (Latest). /// Check that the dataset is in a mutable mode (Latest).
pub fn ensure_mutable(&self) -> Result<()> { pub fn ensure_mutable(&self) -> Result<()> {
let state = self.state.lock().unwrap(); let state = self.state.lock()?;
if state.pinned_version.is_some() { if state.pinned_version.is_some() {
Err(crate::Error::InvalidInput { Err(crate::Error::InvalidInput {
message: "table cannot be modified when a specific version is checked out" message: "table cannot be modified when a specific version is checked out"
@@ -152,13 +152,16 @@ impl DatasetConsistencyWrapper {
/// Returns the version, if in time travel mode, or None otherwise. /// Returns the version, if in time travel mode, or None otherwise.
pub fn time_travel_version(&self) -> Option<u64> { pub fn time_travel_version(&self) -> Option<u64> {
self.state.lock().unwrap().pinned_version self.state
.lock()
.unwrap_or_else(|e| e.into_inner())
.pinned_version
} }
/// Convert into a wrapper in latest version mode. /// Convert into a wrapper in latest version mode.
pub async fn as_latest(&self) -> Result<()> { pub async fn as_latest(&self) -> Result<()> {
let dataset = { let dataset = {
let state = self.state.lock().unwrap(); let state = self.state.lock()?;
if state.pinned_version.is_none() { if state.pinned_version.is_none() {
return Ok(()); return Ok(());
} }
@@ -168,7 +171,7 @@ impl DatasetConsistencyWrapper {
let latest_version = dataset.latest_version_id().await?; let latest_version = dataset.latest_version_id().await?;
let new_dataset = dataset.checkout_version(latest_version).await?; let new_dataset = dataset.checkout_version(latest_version).await?;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock()?;
if state.pinned_version.is_some() { if state.pinned_version.is_some() {
state.dataset = Arc::new(new_dataset); state.dataset = Arc::new(new_dataset);
state.pinned_version = None; state.pinned_version = None;
@@ -184,7 +187,7 @@ impl DatasetConsistencyWrapper {
let target_ref = target_version.into(); let target_ref = target_version.into();
let (should_checkout, dataset) = { let (should_checkout, dataset) = {
let state = self.state.lock().unwrap(); let state = self.state.lock()?;
let should = match state.pinned_version { let should = match state.pinned_version {
None => true, None => true,
Some(version) => match &target_ref { Some(version) => match &target_ref {
@@ -204,7 +207,7 @@ impl DatasetConsistencyWrapper {
let new_dataset = dataset.checkout_version(target_ref).await?; let new_dataset = dataset.checkout_version(target_ref).await?;
let version_value = new_dataset.version().version; let version_value = new_dataset.version().version;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock()?;
state.dataset = Arc::new(new_dataset); state.dataset = Arc::new(new_dataset);
state.pinned_version = Some(version_value); state.pinned_version = Some(version_value);
Ok(()) Ok(())
@@ -212,7 +215,7 @@ impl DatasetConsistencyWrapper {
pub async fn reload(&self) -> Result<()> { pub async fn reload(&self) -> Result<()> {
let (dataset, pinned_version) = { let (dataset, pinned_version) = {
let state = self.state.lock().unwrap(); let state = self.state.lock()?;
(state.dataset.clone(), state.pinned_version) (state.dataset.clone(), state.pinned_version)
}; };
@@ -230,7 +233,7 @@ impl DatasetConsistencyWrapper {
let new_dataset = dataset.checkout_version(version).await?; let new_dataset = dataset.checkout_version(version).await?;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock()?;
if state.pinned_version == Some(version) { if state.pinned_version == Some(version) {
state.dataset = Arc::new(new_dataset); state.dataset = Arc::new(new_dataset);
} }
@@ -242,14 +245,14 @@ impl DatasetConsistencyWrapper {
} }
async fn refresh_latest(state: Arc<Mutex<DatasetState>>) -> Result<Arc<Dataset>> { async fn refresh_latest(state: Arc<Mutex<DatasetState>>) -> Result<Arc<Dataset>> {
let dataset = { state.lock().unwrap().dataset.clone() }; let dataset = { state.lock()?.dataset.clone() };
let mut ds = (*dataset).clone(); let mut ds = (*dataset).clone();
ds.checkout_latest().await?; ds.checkout_latest().await?;
let new_arc = Arc::new(ds); let new_arc = Arc::new(ds);
{ {
let mut state = state.lock().unwrap(); let mut state = state.lock()?;
if state.pinned_version.is_none() if state.pinned_version.is_none()
&& new_arc.manifest().version >= state.dataset.manifest().version && new_arc.manifest().version >= state.dataset.manifest().version
{ {
@@ -612,4 +615,108 @@ mod tests {
let s = io_stats.incremental_stats(); let s = io_stats.incremental_stats();
assert_eq!(s.read_iops, 0, "step 5, elapsed={:?}", start.elapsed()); assert_eq!(s.read_iops, 0, "step 5, elapsed={:?}", start.elapsed());
} }
/// Helper: poison the mutex inside a DatasetConsistencyWrapper.
fn poison_state(wrapper: &DatasetConsistencyWrapper) {
let state = wrapper.state.clone();
let handle = std::thread::spawn(move || {
let _guard = state.lock().unwrap();
panic!("intentional panic to poison mutex");
});
let _ = handle.join(); // join collects the panic
assert!(wrapper.state.lock().is_err(), "mutex should be poisoned");
}
#[tokio::test]
async fn test_get_returns_error_on_poisoned_lock() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
poison_state(&wrapper);
// get() should return Err, not panic
let result = wrapper.get().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_ensure_mutable_returns_error_on_poisoned_lock() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
poison_state(&wrapper);
let result = wrapper.ensure_mutable();
assert!(result.is_err());
}
#[tokio::test]
async fn test_update_recovers_from_poisoned_lock() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let ds_v2 = append_to_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
poison_state(&wrapper);
// update() returns (), should not panic
wrapper.update(ds_v2);
}
#[tokio::test]
async fn test_time_travel_version_recovers_from_poisoned_lock() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
poison_state(&wrapper);
// Should not panic, returns whatever was in the mutex
let _version = wrapper.time_travel_version();
}
#[tokio::test]
async fn test_as_latest_returns_error_on_poisoned_lock() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
poison_state(&wrapper);
let result = wrapper.as_latest().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_as_time_travel_returns_error_on_poisoned_lock() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
poison_state(&wrapper);
let result = wrapper.as_time_travel(1u64).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_reload_returns_error_on_poisoned_lock() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = create_test_dataset(uri).await;
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
poison_state(&wrapper);
let result = wrapper.reload().await;
assert!(result.is_err());
}
} }

View File

@@ -130,8 +130,11 @@ impl WriteProgressTracker {
pub fn record_batch(&self, rows: usize, bytes: usize) { pub fn record_batch(&self, rows: usize, bytes: usize) {
// Lock order: callback first, then rows_and_bytes. This is the only // Lock order: callback first, then rows_and_bytes. This is the only
// order used anywhere, so deadlocks cannot occur. // order used anywhere, so deadlocks cannot occur.
let mut cb = self.callback.lock().unwrap(); let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
let mut guard = self.rows_and_bytes.lock().unwrap(); let mut guard = self
.rows_and_bytes
.lock()
.unwrap_or_else(|e| e.into_inner());
guard.0 += rows; guard.0 += rows;
guard.1 += bytes; guard.1 += bytes;
let progress = self.snapshot(guard.0, guard.1, false); let progress = self.snapshot(guard.0, guard.1, false);
@@ -151,8 +154,11 @@ impl WriteProgressTracker {
/// `total_rows` is always `Some` on the final callback: it uses the known /// `total_rows` is always `Some` on the final callback: it uses the known
/// total if available, or falls back to the number of rows actually written. /// total if available, or falls back to the number of rows actually written.
pub fn finish(&self) { pub fn finish(&self) {
let mut cb = self.callback.lock().unwrap(); let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
let guard = self.rows_and_bytes.lock().unwrap(); let guard = self
.rows_and_bytes
.lock()
.unwrap_or_else(|e| e.into_inner());
let mut snap = self.snapshot(guard.0, guard.1, true); let mut snap = self.snapshot(guard.0, guard.1, true);
snap.total_rows = Some(self.total_rows.unwrap_or(guard.0)); snap.total_rows = Some(self.total_rows.unwrap_or(guard.0));
drop(guard); drop(guard);
@@ -376,4 +382,50 @@ mod tests {
} }
} }
} }
#[test]
fn test_record_batch_recovers_from_poisoned_callback_lock() {
use super::{ProgressCallback, WriteProgressTracker};
use std::sync::Mutex;
let callback: ProgressCallback = Arc::new(Mutex::new(|_: &super::WriteProgress| {}));
// Poison the callback mutex
let cb_clone = callback.clone();
let handle = std::thread::spawn(move || {
let _guard = cb_clone.lock().unwrap();
panic!("intentional panic to poison callback mutex");
});
let _ = handle.join();
assert!(
callback.lock().is_err(),
"callback mutex should be poisoned"
);
let tracker = WriteProgressTracker::new(callback, Some(100));
// record_batch should not panic
tracker.record_batch(10, 1024);
}
#[test]
fn test_finish_recovers_from_poisoned_callback_lock() {
use super::{ProgressCallback, WriteProgressTracker};
use std::sync::Mutex;
let callback: ProgressCallback = Arc::new(Mutex::new(|_: &super::WriteProgress| {}));
// Poison the callback mutex
let cb_clone = callback.clone();
let handle = std::thread::spawn(move || {
let _guard = cb_clone.lock().unwrap();
panic!("intentional panic to poison callback mutex");
});
let _ = handle.join();
let tracker = WriteProgressTracker::new(callback, Some(100));
// finish should not panic
tracker.finish();
}
} }

View File

@@ -122,7 +122,7 @@ where
/// This is a cheap synchronous check useful as a fast path before /// This is a cheap synchronous check useful as a fast path before
/// constructing a fetch closure for [`get()`](Self::get). /// constructing a fetch closure for [`get()`](Self::get).
pub fn try_get(&self) -> Option<V> { pub fn try_get(&self) -> Option<V> {
let cache = self.inner.lock().unwrap(); let cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
cache.state.fresh_value(self.ttl, self.refresh_window) cache.state.fresh_value(self.ttl, self.refresh_window)
} }
@@ -138,7 +138,7 @@ where
{ {
// Fast path: check if cache is fresh // Fast path: check if cache is fresh
{ {
let cache = self.inner.lock().unwrap(); let cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
if let Some(value) = cache.state.fresh_value(self.ttl, self.refresh_window) { if let Some(value) = cache.state.fresh_value(self.ttl, self.refresh_window) {
return Ok(value); return Ok(value);
} }
@@ -147,7 +147,7 @@ where
// Slow path // Slow path
let mut fetch = Some(fetch); let mut fetch = Some(fetch);
let action = { let action = {
let mut cache = self.inner.lock().unwrap(); let mut cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
self.determine_action(&mut cache, &mut fetch) self.determine_action(&mut cache, &mut fetch)
}; };
@@ -161,7 +161,7 @@ where
/// ///
/// This avoids a blocking fetch on the first [`get()`](Self::get) call. /// This avoids a blocking fetch on the first [`get()`](Self::get) call.
pub fn seed(&self, value: V) { pub fn seed(&self, value: V) {
let mut cache = self.inner.lock().unwrap(); let mut cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
cache.state = State::Current(value, clock::now()); cache.state = State::Current(value, clock::now());
} }
@@ -170,7 +170,7 @@ where
/// Any in-flight background fetch from before this call will not update the /// Any in-flight background fetch from before this call will not update the
/// cache (the generation counter prevents stale writes). /// cache (the generation counter prevents stale writes).
pub fn invalidate(&self) { pub fn invalidate(&self) {
let mut cache = self.inner.lock().unwrap(); let mut cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
cache.state = State::Empty; cache.state = State::Empty;
cache.generation += 1; cache.generation += 1;
} }
@@ -267,7 +267,7 @@ where
let fut_for_spawn = shared.clone(); let fut_for_spawn = shared.clone();
tokio::spawn(async move { tokio::spawn(async move {
let result = fut_for_spawn.await; let result = fut_for_spawn.await;
let mut cache = inner.lock().unwrap(); let mut cache = inner.lock().unwrap_or_else(|e| e.into_inner());
// Only update if no invalidation has happened since we started // Only update if no invalidation has happened since we started
if cache.generation != generation { if cache.generation != generation {
return; return;
@@ -590,4 +590,67 @@ mod tests {
let v = cache.get(ok_fetcher(count.clone(), "fresh")).await.unwrap(); let v = cache.get(ok_fetcher(count.clone(), "fresh")).await.unwrap();
assert_eq!(v, "fresh"); assert_eq!(v, "fresh");
} }
/// Helper: poison the inner mutex of a BackgroundCache.
fn poison_cache(cache: &BackgroundCache<String, TestError>) {
let inner = cache.inner.clone();
let handle = std::thread::spawn(move || {
let _guard = inner.lock().unwrap();
panic!("intentional panic to poison mutex");
});
let _ = handle.join();
assert!(cache.inner.lock().is_err(), "mutex should be poisoned");
}
#[tokio::test]
async fn test_try_get_recovers_from_poisoned_lock() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
// Seed a value first
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap(); // peek
poison_cache(&cache);
// try_get() should not panic — it recovers via unwrap_or_else
let result = cache.try_get();
// The value may or may not be fresh depending on timing, but it must not panic
let _ = result;
}
#[tokio::test]
async fn test_get_recovers_from_poisoned_lock() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
poison_cache(&cache);
// get() should not panic — it recovers and can still fetch
let result = cache.get(ok_fetcher(count.clone(), "recovered")).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), "recovered");
}
#[tokio::test]
async fn test_seed_recovers_from_poisoned_lock() {
let cache = new_cache();
poison_cache(&cache);
// seed() should not panic
cache.seed("seeded".to_string());
}
#[tokio::test]
async fn test_invalidate_recovers_from_poisoned_lock() {
let cache = new_cache();
let count = Arc::new(AtomicUsize::new(0));
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
poison_cache(&cache);
// invalidate() should not panic
cache.invalidate();
}
} }