mirror of
https://github.com/lancedb/lancedb.git
synced 2026-03-30 20:40:42 +00:00
Compare commits
12 Commits
python-v0.
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66804e99fc | ||
|
|
9f85d4c639 | ||
|
|
1ba19d728e | ||
|
|
4c44587af0 | ||
|
|
1d1cafb59c | ||
|
|
4714598155 | ||
|
|
74f457a0f2 | ||
|
|
cca6a7c989 | ||
|
|
ad96489114 | ||
|
|
76429730c0 | ||
|
|
874b74dd3c | ||
|
|
61de47f3a5 |
@@ -1,5 +1,5 @@
|
|||||||
[tool.bumpversion]
|
[tool.bumpversion]
|
||||||
current_version = "0.27.1"
|
current_version = "0.27.2-beta.1"
|
||||||
parse = """(?x)
|
parse = """(?x)
|
||||||
(?P<major>0|[1-9]\\d*)\\.
|
(?P<major>0|[1-9]\\d*)\\.
|
||||||
(?P<minor>0|[1-9]\\d*)\\.
|
(?P<minor>0|[1-9]\\d*)\\.
|
||||||
|
|||||||
@@ -23,8 +23,10 @@ runs:
|
|||||||
steps:
|
steps:
|
||||||
- name: CONFIRM ARM BUILD
|
- name: CONFIRM ARM BUILD
|
||||||
shell: bash
|
shell: bash
|
||||||
|
env:
|
||||||
|
ARM_BUILD: ${{ inputs.arm-build }}
|
||||||
run: |
|
run: |
|
||||||
echo "ARM BUILD: ${{ inputs.arm-build }}"
|
echo "ARM BUILD: $ARM_BUILD"
|
||||||
- name: Build x86_64 Manylinux wheel
|
- name: Build x86_64 Manylinux wheel
|
||||||
if: ${{ inputs.arm-build == 'false' }}
|
if: ${{ inputs.arm-build == 'false' }}
|
||||||
uses: PyO3/maturin-action@v1
|
uses: PyO3/maturin-action@v1
|
||||||
|
|||||||
2119
Cargo.lock
generated
2119
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
28
Cargo.toml
28
Cargo.toml
@@ -15,20 +15,20 @@ categories = ["database-implementations"]
|
|||||||
rust-version = "1.91.0"
|
rust-version = "1.91.0"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
lance = { version = "=3.0.1", default-features = false }
|
lance = { "version" = "=4.0.0-rc.3", default-features = false, "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-core = { version = "=3.0.1" }
|
lance-core = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-datagen = { version = "=3.0.1" }
|
lance-datagen = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-file = { version = "=3.0.1" }
|
lance-file = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-io = { version = "=3.0.1", default-features = false }
|
lance-io = { "version" = "=4.0.0-rc.3", default-features = false, "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-index = { version = "=3.0.1" }
|
lance-index = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-linalg = { version = "=3.0.1" }
|
lance-linalg = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-namespace = { version = "=3.0.1" }
|
lance-namespace = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-namespace-impls = { version = "=3.0.1", default-features = false }
|
lance-namespace-impls = { "version" = "=4.0.0-rc.3", default-features = false, "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-table = { version = "=3.0.1" }
|
lance-table = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-testing = { version = "=3.0.1" }
|
lance-testing = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-datafusion = { version = "=3.0.1" }
|
lance-datafusion = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-encoding = { version = "=3.0.1" }
|
lance-encoding = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-arrow = { version = "=3.0.1" }
|
lance-arrow = { "version" = "=4.0.0-rc.3", "tag" = "v4.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
ahash = "0.8"
|
ahash = "0.8"
|
||||||
# Note that this one does not include pyarrow
|
# Note that this one does not include pyarrow
|
||||||
arrow = { version = "57.2", optional = false }
|
arrow = { version = "57.2", optional = false }
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.lancedb</groupId>
|
<groupId>com.lancedb</groupId>
|
||||||
<artifactId>lancedb-core</artifactId>
|
<artifactId>lancedb-core</artifactId>
|
||||||
<version>0.27.1</version>
|
<version>0.27.2-beta.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ new EmbeddingFunction<T, M>(): EmbeddingFunction<T, M>
|
|||||||
### computeQueryEmbeddings()
|
### computeQueryEmbeddings()
|
||||||
|
|
||||||
```ts
|
```ts
|
||||||
computeQueryEmbeddings(data): Promise<number[] | Float32Array | Float64Array>
|
computeQueryEmbeddings(data): Promise<number[] | Uint8Array | Float32Array | Float64Array>
|
||||||
```
|
```
|
||||||
|
|
||||||
Compute the embeddings for a single query
|
Compute the embeddings for a single query
|
||||||
@@ -63,7 +63,7 @@ Compute the embeddings for a single query
|
|||||||
|
|
||||||
#### Returns
|
#### Returns
|
||||||
|
|
||||||
`Promise`<`number`[] \| `Float32Array` \| `Float64Array`>
|
`Promise`<`number`[] \| `Uint8Array` \| `Float32Array` \| `Float64Array`>
|
||||||
|
|
||||||
***
|
***
|
||||||
|
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ new TextEmbeddingFunction<M>(): TextEmbeddingFunction<M>
|
|||||||
### computeQueryEmbeddings()
|
### computeQueryEmbeddings()
|
||||||
|
|
||||||
```ts
|
```ts
|
||||||
computeQueryEmbeddings(data): Promise<number[] | Float32Array | Float64Array>
|
computeQueryEmbeddings(data): Promise<number[] | Uint8Array | Float32Array | Float64Array>
|
||||||
```
|
```
|
||||||
|
|
||||||
Compute the embeddings for a single query
|
Compute the embeddings for a single query
|
||||||
@@ -48,7 +48,7 @@ Compute the embeddings for a single query
|
|||||||
|
|
||||||
#### Returns
|
#### Returns
|
||||||
|
|
||||||
`Promise`<`number`[] \| `Float32Array` \| `Float64Array`>
|
`Promise`<`number`[] \| `Uint8Array` \| `Float32Array` \| `Float64Array`>
|
||||||
|
|
||||||
#### Overrides
|
#### Overrides
|
||||||
|
|
||||||
|
|||||||
@@ -7,5 +7,10 @@
|
|||||||
# Type Alias: IntoVector
|
# Type Alias: IntoVector
|
||||||
|
|
||||||
```ts
|
```ts
|
||||||
type IntoVector: Float32Array | Float64Array | number[] | Promise<Float32Array | Float64Array | number[]>;
|
type IntoVector:
|
||||||
|
| Float32Array
|
||||||
|
| Float64Array
|
||||||
|
| Uint8Array
|
||||||
|
| number[]
|
||||||
|
| Promise<Float32Array | Float64Array | Uint8Array | number[]>;
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<groupId>com.lancedb</groupId>
|
<groupId>com.lancedb</groupId>
|
||||||
<artifactId>lancedb-parent</artifactId>
|
<artifactId>lancedb-parent</artifactId>
|
||||||
<version>0.27.1-final.0</version>
|
<version>0.27.2-beta.1</version>
|
||||||
<relativePath>../pom.xml</relativePath>
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.lancedb</groupId>
|
<groupId>com.lancedb</groupId>
|
||||||
<artifactId>lancedb-parent</artifactId>
|
<artifactId>lancedb-parent</artifactId>
|
||||||
<version>0.27.1-final.0</version>
|
<version>0.27.2-beta.1</version>
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
<name>${project.artifactId}</name>
|
<name>${project.artifactId}</name>
|
||||||
<description>LanceDB Java SDK Parent POM</description>
|
<description>LanceDB Java SDK Parent POM</description>
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lancedb-nodejs"
|
name = "lancedb-nodejs"
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
version = "0.27.1"
|
version = "0.27.2-beta.1"
|
||||||
license.workspace = true
|
license.workspace = true
|
||||||
description.workspace = true
|
description.workspace = true
|
||||||
repository.workspace = true
|
repository.workspace = true
|
||||||
@@ -15,6 +15,8 @@ crate-type = ["cdylib"]
|
|||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
arrow-ipc.workspace = true
|
arrow-ipc.workspace = true
|
||||||
arrow-array.workspace = true
|
arrow-array.workspace = true
|
||||||
|
arrow-buffer = "57.2"
|
||||||
|
half.workspace = true
|
||||||
arrow-schema.workspace = true
|
arrow-schema.workspace = true
|
||||||
env_logger.workspace = true
|
env_logger.workspace = true
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
|
|||||||
110
nodejs/__test__/vector_types.test.ts
Normal file
110
nodejs/__test__/vector_types.test.ts
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||||
|
|
||||||
|
import * as tmp from "tmp";
|
||||||
|
|
||||||
|
import { type Table, connect } from "../lancedb";
|
||||||
|
import {
|
||||||
|
Field,
|
||||||
|
FixedSizeList,
|
||||||
|
Float32,
|
||||||
|
Int64,
|
||||||
|
Schema,
|
||||||
|
makeArrowTable,
|
||||||
|
} from "../lancedb/arrow";
|
||||||
|
|
||||||
|
describe("Vector query with different typed arrays", () => {
|
||||||
|
let tmpDir: tmp.DirResult;
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
tmpDir?.removeCallback();
|
||||||
|
});
|
||||||
|
|
||||||
|
async function createFloat32Table(): Promise<Table> {
|
||||||
|
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||||
|
const db = await connect(tmpDir.name);
|
||||||
|
const schema = new Schema([
|
||||||
|
new Field("id", new Int64(), true),
|
||||||
|
new Field(
|
||||||
|
"vec",
|
||||||
|
new FixedSizeList(2, new Field("item", new Float32())),
|
||||||
|
true,
|
||||||
|
),
|
||||||
|
]);
|
||||||
|
const data = makeArrowTable(
|
||||||
|
[
|
||||||
|
{ id: 1n, vec: [1.0, 0.0] },
|
||||||
|
{ id: 2n, vec: [0.0, 1.0] },
|
||||||
|
{ id: 3n, vec: [1.0, 1.0] },
|
||||||
|
],
|
||||||
|
{ schema },
|
||||||
|
);
|
||||||
|
return db.createTable("test_f32", data);
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should search with Float32Array (baseline)", async () => {
|
||||||
|
const table = await createFloat32Table();
|
||||||
|
const results = await table
|
||||||
|
.query()
|
||||||
|
.nearestTo(new Float32Array([1.0, 0.0]))
|
||||||
|
.limit(1)
|
||||||
|
.toArray();
|
||||||
|
|
||||||
|
expect(results.length).toBe(1);
|
||||||
|
expect(Number(results[0].id)).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should search with number[] (backward compat)", async () => {
|
||||||
|
const table = await createFloat32Table();
|
||||||
|
const results = await table
|
||||||
|
.query()
|
||||||
|
.nearestTo([1.0, 0.0])
|
||||||
|
.limit(1)
|
||||||
|
.toArray();
|
||||||
|
|
||||||
|
expect(results.length).toBe(1);
|
||||||
|
expect(Number(results[0].id)).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should search with Float64Array via raw path", async () => {
|
||||||
|
const table = await createFloat32Table();
|
||||||
|
const results = await table
|
||||||
|
.query()
|
||||||
|
.nearestTo(new Float64Array([1.0, 0.0]))
|
||||||
|
.limit(1)
|
||||||
|
.toArray();
|
||||||
|
|
||||||
|
expect(results.length).toBe(1);
|
||||||
|
expect(Number(results[0].id)).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should add multiple query vectors with Float64Array", async () => {
|
||||||
|
const table = await createFloat32Table();
|
||||||
|
const results = await table
|
||||||
|
.query()
|
||||||
|
.nearestTo(new Float64Array([1.0, 0.0]))
|
||||||
|
.addQueryVector(new Float64Array([0.0, 1.0]))
|
||||||
|
.limit(2)
|
||||||
|
.toArray();
|
||||||
|
|
||||||
|
expect(results.length).toBeGreaterThanOrEqual(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Float16Array is only available in Node 22+; not in TypeScript's standard lib yet
|
||||||
|
const float16ArrayCtor = (globalThis as unknown as Record<string, unknown>)
|
||||||
|
.Float16Array as (new (values: number[]) => unknown) | undefined;
|
||||||
|
const hasFloat16 = float16ArrayCtor !== undefined;
|
||||||
|
const f16it = hasFloat16 ? it : it.skip;
|
||||||
|
|
||||||
|
f16it("should search with Float16Array via raw path", async () => {
|
||||||
|
const table = await createFloat32Table();
|
||||||
|
const results = await table
|
||||||
|
.query()
|
||||||
|
.nearestTo(new float16ArrayCtor!([1.0, 0.0]) as Float32Array)
|
||||||
|
.limit(1)
|
||||||
|
.toArray();
|
||||||
|
|
||||||
|
expect(results.length).toBe(1);
|
||||||
|
expect(Number(results[0].id)).toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -117,8 +117,9 @@ export type TableLike =
|
|||||||
export type IntoVector =
|
export type IntoVector =
|
||||||
| Float32Array
|
| Float32Array
|
||||||
| Float64Array
|
| Float64Array
|
||||||
|
| Uint8Array
|
||||||
| number[]
|
| number[]
|
||||||
| Promise<Float32Array | Float64Array | number[]>;
|
| Promise<Float32Array | Float64Array | Uint8Array | number[]>;
|
||||||
|
|
||||||
export type MultiVector = IntoVector[];
|
export type MultiVector = IntoVector[];
|
||||||
|
|
||||||
@@ -126,14 +127,48 @@ export function isMultiVector(value: unknown): value is MultiVector {
|
|||||||
return Array.isArray(value) && isIntoVector(value[0]);
|
return Array.isArray(value) && isIntoVector(value[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Float16Array is not in TypeScript's standard lib yet; access dynamically
|
||||||
|
type Float16ArrayCtor = new (
|
||||||
|
...args: unknown[]
|
||||||
|
) => { buffer: ArrayBuffer; byteOffset: number; byteLength: number };
|
||||||
|
const float16ArrayCtor = (globalThis as unknown as Record<string, unknown>)
|
||||||
|
.Float16Array as Float16ArrayCtor | undefined;
|
||||||
|
|
||||||
export function isIntoVector(value: unknown): value is IntoVector {
|
export function isIntoVector(value: unknown): value is IntoVector {
|
||||||
return (
|
return (
|
||||||
value instanceof Float32Array ||
|
value instanceof Float32Array ||
|
||||||
value instanceof Float64Array ||
|
value instanceof Float64Array ||
|
||||||
|
value instanceof Uint8Array ||
|
||||||
|
(float16ArrayCtor !== undefined && value instanceof float16ArrayCtor) ||
|
||||||
(Array.isArray(value) && !Array.isArray(value[0]))
|
(Array.isArray(value) && !Array.isArray(value[0]))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the underlying byte buffer and data type from a typed array
|
||||||
|
* for passing to the Rust NAPI layer without precision loss.
|
||||||
|
*/
|
||||||
|
export function extractVectorBuffer(
|
||||||
|
vector: Float32Array | Float64Array | Uint8Array,
|
||||||
|
): { data: Uint8Array; dtype: string } | null {
|
||||||
|
if (float16ArrayCtor !== undefined && vector instanceof float16ArrayCtor) {
|
||||||
|
return {
|
||||||
|
data: new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength),
|
||||||
|
dtype: "float16",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (vector instanceof Float64Array) {
|
||||||
|
return {
|
||||||
|
data: new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength),
|
||||||
|
dtype: "float64",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (vector instanceof Uint8Array && !(vector instanceof Float32Array)) {
|
||||||
|
return { data: vector, dtype: "uint8" };
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
export function isArrowTable(value: object): value is TableLike {
|
export function isArrowTable(value: object): value is TableLike {
|
||||||
if (value instanceof ArrowTable) return true;
|
if (value instanceof ArrowTable) return true;
|
||||||
return "schema" in value && "batches" in value;
|
return "schema" in value && "batches" in value;
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import {
|
|||||||
Table as ArrowTable,
|
Table as ArrowTable,
|
||||||
type IntoVector,
|
type IntoVector,
|
||||||
RecordBatch,
|
RecordBatch,
|
||||||
|
extractVectorBuffer,
|
||||||
fromBufferToRecordBatch,
|
fromBufferToRecordBatch,
|
||||||
fromRecordBatchToBuffer,
|
fromRecordBatchToBuffer,
|
||||||
tableFromIPC,
|
tableFromIPC,
|
||||||
@@ -661,10 +662,8 @@ export class VectorQuery extends StandardQueryBase<NativeVectorQuery> {
|
|||||||
const res = (async () => {
|
const res = (async () => {
|
||||||
try {
|
try {
|
||||||
const v = await vector;
|
const v = await vector;
|
||||||
const arr = Float32Array.from(v);
|
|
||||||
//
|
|
||||||
// biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping
|
// biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping
|
||||||
const value: any = this.addQueryVector(arr);
|
const value: any = this.addQueryVector(v);
|
||||||
const inner = value.inner as
|
const inner = value.inner as
|
||||||
| NativeVectorQuery
|
| NativeVectorQuery
|
||||||
| Promise<NativeVectorQuery>;
|
| Promise<NativeVectorQuery>;
|
||||||
@@ -676,7 +675,12 @@ export class VectorQuery extends StandardQueryBase<NativeVectorQuery> {
|
|||||||
return new VectorQuery(res);
|
return new VectorQuery(res);
|
||||||
} else {
|
} else {
|
||||||
super.doCall((inner) => {
|
super.doCall((inner) => {
|
||||||
inner.addQueryVector(Float32Array.from(vector));
|
const raw = Array.isArray(vector) ? null : extractVectorBuffer(vector);
|
||||||
|
if (raw) {
|
||||||
|
inner.addQueryVectorRaw(raw.data, raw.dtype);
|
||||||
|
} else {
|
||||||
|
inner.addQueryVector(Float32Array.from(vector as number[]));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@@ -765,14 +769,23 @@ export class Query extends StandardQueryBase<NativeQuery> {
|
|||||||
* a default `limit` of 10 will be used. @see {@link Query#limit}
|
* a default `limit` of 10 will be used. @see {@link Query#limit}
|
||||||
*/
|
*/
|
||||||
nearestTo(vector: IntoVector): VectorQuery {
|
nearestTo(vector: IntoVector): VectorQuery {
|
||||||
|
const callNearestTo = (
|
||||||
|
inner: NativeQuery,
|
||||||
|
resolved: Float32Array | Float64Array | Uint8Array | number[],
|
||||||
|
): NativeVectorQuery => {
|
||||||
|
const raw = Array.isArray(resolved)
|
||||||
|
? null
|
||||||
|
: extractVectorBuffer(resolved);
|
||||||
|
if (raw) {
|
||||||
|
return inner.nearestToRaw(raw.data, raw.dtype);
|
||||||
|
}
|
||||||
|
return inner.nearestTo(Float32Array.from(resolved as number[]));
|
||||||
|
};
|
||||||
|
|
||||||
if (this.inner instanceof Promise) {
|
if (this.inner instanceof Promise) {
|
||||||
const nativeQuery = this.inner.then(async (inner) => {
|
const nativeQuery = this.inner.then(async (inner) => {
|
||||||
if (vector instanceof Promise) {
|
const resolved = vector instanceof Promise ? await vector : vector;
|
||||||
const arr = await vector.then((v) => Float32Array.from(v));
|
return callNearestTo(inner, resolved);
|
||||||
return inner.nearestTo(arr);
|
|
||||||
} else {
|
|
||||||
return inner.nearestTo(Float32Array.from(vector));
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
return new VectorQuery(nativeQuery);
|
return new VectorQuery(nativeQuery);
|
||||||
}
|
}
|
||||||
@@ -780,10 +793,8 @@ export class Query extends StandardQueryBase<NativeQuery> {
|
|||||||
const res = (async () => {
|
const res = (async () => {
|
||||||
try {
|
try {
|
||||||
const v = await vector;
|
const v = await vector;
|
||||||
const arr = Float32Array.from(v);
|
|
||||||
//
|
|
||||||
// biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping
|
// biome-ignore lint/suspicious/noExplicitAny: we need to get the `inner`, but js has no package scoping
|
||||||
const value: any = this.nearestTo(arr);
|
const value: any = this.nearestTo(v);
|
||||||
const inner = value.inner as
|
const inner = value.inner as
|
||||||
| NativeVectorQuery
|
| NativeVectorQuery
|
||||||
| Promise<NativeVectorQuery>;
|
| Promise<NativeVectorQuery>;
|
||||||
@@ -794,7 +805,7 @@ export class Query extends StandardQueryBase<NativeQuery> {
|
|||||||
})();
|
})();
|
||||||
return new VectorQuery(res);
|
return new VectorQuery(res);
|
||||||
} else {
|
} else {
|
||||||
const vectorQuery = this.inner.nearestTo(Float32Array.from(vector));
|
const vectorQuery = callNearestTo(this.inner, vector);
|
||||||
return new VectorQuery(vectorQuery);
|
return new VectorQuery(vectorQuery);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-darwin-arm64",
|
"name": "@lancedb/lancedb-darwin-arm64",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"os": ["darwin"],
|
"os": ["darwin"],
|
||||||
"cpu": ["arm64"],
|
"cpu": ["arm64"],
|
||||||
"main": "lancedb.darwin-arm64.node",
|
"main": "lancedb.darwin-arm64.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["arm64"],
|
"cpu": ["arm64"],
|
||||||
"main": "lancedb.linux-arm64-gnu.node",
|
"main": "lancedb.linux-arm64-gnu.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["arm64"],
|
"cpu": ["arm64"],
|
||||||
"main": "lancedb.linux-arm64-musl.node",
|
"main": "lancedb.linux-arm64-musl.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["x64"],
|
"cpu": ["x64"],
|
||||||
"main": "lancedb.linux-x64-gnu.node",
|
"main": "lancedb.linux-x64-gnu.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["x64"],
|
"cpu": ["x64"],
|
||||||
"main": "lancedb.linux-x64-musl.node",
|
"main": "lancedb.linux-x64-musl.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"os": [
|
"os": [
|
||||||
"win32"
|
"win32"
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"os": ["win32"],
|
"os": ["win32"],
|
||||||
"cpu": ["x64"],
|
"cpu": ["x64"],
|
||||||
"main": "lancedb.win32-x64-msvc.node",
|
"main": "lancedb.win32-x64-msvc.node",
|
||||||
|
|||||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb",
|
"name": "@lancedb/lancedb",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"lockfileVersion": 3,
|
"lockfileVersion": 3,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"packages": {
|
"packages": {
|
||||||
"": {
|
"": {
|
||||||
"name": "@lancedb/lancedb",
|
"name": "@lancedb/lancedb",
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"cpu": [
|
"cpu": [
|
||||||
"x64",
|
"x64",
|
||||||
"arm64"
|
"arm64"
|
||||||
|
|||||||
@@ -11,7 +11,7 @@
|
|||||||
"ann"
|
"ann"
|
||||||
],
|
],
|
||||||
"private": false,
|
"private": false,
|
||||||
"version": "0.27.1",
|
"version": "0.27.2-beta.1",
|
||||||
"main": "dist/index.js",
|
"main": "dist/index.js",
|
||||||
"exports": {
|
"exports": {
|
||||||
".": "./dist/index.js",
|
".": "./dist/index.js",
|
||||||
|
|||||||
@@ -3,6 +3,12 @@
|
|||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arrow_array::{
|
||||||
|
Array, Float16Array as ArrowFloat16Array, Float32Array as ArrowFloat32Array,
|
||||||
|
Float64Array as ArrowFloat64Array, UInt8Array as ArrowUInt8Array,
|
||||||
|
};
|
||||||
|
use arrow_buffer::ScalarBuffer;
|
||||||
|
use half::f16;
|
||||||
use lancedb::index::scalar::{
|
use lancedb::index::scalar::{
|
||||||
BooleanQuery, BoostQuery, FtsQuery, FullTextSearchQuery, MatchQuery, MultiMatchQuery, Occur,
|
BooleanQuery, BoostQuery, FtsQuery, FullTextSearchQuery, MatchQuery, MultiMatchQuery, Occur,
|
||||||
Operator, PhraseQuery,
|
Operator, PhraseQuery,
|
||||||
@@ -24,6 +30,33 @@ use crate::rerankers::RerankHybridCallbackArgs;
|
|||||||
use crate::rerankers::Reranker;
|
use crate::rerankers::Reranker;
|
||||||
use crate::util::{parse_distance_type, schema_to_buffer};
|
use crate::util::{parse_distance_type, schema_to_buffer};
|
||||||
|
|
||||||
|
fn bytes_to_arrow_array(data: Uint8Array, dtype: String) -> napi::Result<Arc<dyn Array>> {
|
||||||
|
let buf = arrow_buffer::Buffer::from(data.to_vec());
|
||||||
|
let num_bytes = buf.len();
|
||||||
|
match dtype.as_str() {
|
||||||
|
"float16" => {
|
||||||
|
let scalar_buf = ScalarBuffer::<f16>::new(buf, 0, num_bytes / 2);
|
||||||
|
Ok(Arc::new(ArrowFloat16Array::new(scalar_buf, None)))
|
||||||
|
}
|
||||||
|
"float32" => {
|
||||||
|
let scalar_buf = ScalarBuffer::<f32>::new(buf, 0, num_bytes / 4);
|
||||||
|
Ok(Arc::new(ArrowFloat32Array::new(scalar_buf, None)))
|
||||||
|
}
|
||||||
|
"float64" => {
|
||||||
|
let scalar_buf = ScalarBuffer::<f64>::new(buf, 0, num_bytes / 8);
|
||||||
|
Ok(Arc::new(ArrowFloat64Array::new(scalar_buf, None)))
|
||||||
|
}
|
||||||
|
"uint8" => {
|
||||||
|
let scalar_buf = ScalarBuffer::<u8>::new(buf, 0, num_bytes);
|
||||||
|
Ok(Arc::new(ArrowUInt8Array::new(scalar_buf, None)))
|
||||||
|
}
|
||||||
|
_ => Err(napi::Error::from_reason(format!(
|
||||||
|
"Unsupported vector dtype: {}. Expected one of: float16, float32, float64, uint8",
|
||||||
|
dtype
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[napi]
|
#[napi]
|
||||||
pub struct Query {
|
pub struct Query {
|
||||||
inner: LanceDbQuery,
|
inner: LanceDbQuery,
|
||||||
@@ -78,6 +111,13 @@ impl Query {
|
|||||||
Ok(VectorQuery { inner })
|
Ok(VectorQuery { inner })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[napi]
|
||||||
|
pub fn nearest_to_raw(&mut self, data: Uint8Array, dtype: String) -> Result<VectorQuery> {
|
||||||
|
let array = bytes_to_arrow_array(data, dtype)?;
|
||||||
|
let inner = self.inner.clone().nearest_to(array).default_error()?;
|
||||||
|
Ok(VectorQuery { inner })
|
||||||
|
}
|
||||||
|
|
||||||
#[napi]
|
#[napi]
|
||||||
pub fn fast_search(&mut self) {
|
pub fn fast_search(&mut self) {
|
||||||
self.inner = self.inner.clone().fast_search();
|
self.inner = self.inner.clone().fast_search();
|
||||||
@@ -163,6 +203,13 @@ impl VectorQuery {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[napi]
|
||||||
|
pub fn add_query_vector_raw(&mut self, data: Uint8Array, dtype: String) -> Result<()> {
|
||||||
|
let array = bytes_to_arrow_array(data, dtype)?;
|
||||||
|
self.inner = self.inner.clone().add_query_vector(array).default_error()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[napi]
|
#[napi]
|
||||||
pub fn distance_type(&mut self, distance_type: String) -> napi::Result<()> {
|
pub fn distance_type(&mut self, distance_type: String) -> napi::Result<()> {
|
||||||
let distance_type = parse_distance_type(distance_type)?;
|
let distance_type = parse_distance_type(distance_type)?;
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
[tool.bumpversion]
|
[tool.bumpversion]
|
||||||
current_version = "0.30.2-beta.0"
|
current_version = "0.30.2-beta.1"
|
||||||
parse = """(?x)
|
parse = """(?x)
|
||||||
(?P<major>0|[1-9]\\d*)\\.
|
(?P<major>0|[1-9]\\d*)\\.
|
||||||
(?P<minor>0|[1-9]\\d*)\\.
|
(?P<minor>0|[1-9]\\d*)\\.
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lancedb-python"
|
name = "lancedb-python"
|
||||||
version = "0.30.2-beta.0"
|
version = "0.30.2-beta.1"
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
description = "Python bindings for LanceDB"
|
description = "Python bindings for LanceDB"
|
||||||
license.workspace = true
|
license.workspace = true
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import sys
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import urllib.error
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
import weakref
|
import weakref
|
||||||
import logging
|
import logging
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ def ensure_vector_query(
|
|||||||
) -> Union[List[float], List[List[float]], pa.Array, List[pa.Array]]:
|
) -> Union[List[float], List[List[float]], pa.Array, List[pa.Array]]:
|
||||||
if isinstance(val, list):
|
if isinstance(val, list):
|
||||||
if len(val) == 0:
|
if len(val) == 0:
|
||||||
return ValueError("Vector query must be a non-empty list")
|
raise ValueError("Vector query must be a non-empty list")
|
||||||
sample = val[0]
|
sample = val[0]
|
||||||
else:
|
else:
|
||||||
if isinstance(val, float):
|
if isinstance(val, float):
|
||||||
@@ -83,7 +83,7 @@ def ensure_vector_query(
|
|||||||
return val
|
return val
|
||||||
if isinstance(sample, list):
|
if isinstance(sample, list):
|
||||||
if len(sample) == 0:
|
if len(sample) == 0:
|
||||||
return ValueError("Vector query must be a non-empty list")
|
raise ValueError("Vector query must be a non-empty list")
|
||||||
if isinstance(sample[0], float):
|
if isinstance(sample[0], float):
|
||||||
# val is list of list of floats
|
# val is list of list of floats
|
||||||
return val
|
return val
|
||||||
|
|||||||
@@ -278,7 +278,7 @@ def _sanitize_data(
|
|||||||
|
|
||||||
if metadata:
|
if metadata:
|
||||||
new_metadata = target_schema.metadata or {}
|
new_metadata = target_schema.metadata or {}
|
||||||
new_metadata = new_metadata.update(metadata)
|
new_metadata.update(metadata)
|
||||||
target_schema = target_schema.with_metadata(new_metadata)
|
target_schema = target_schema.with_metadata(new_metadata)
|
||||||
|
|
||||||
_validate_schema(target_schema)
|
_validate_schema(target_schema)
|
||||||
@@ -3857,7 +3857,13 @@ class AsyncTable:
|
|||||||
|
|
||||||
# _santitize_data is an old code path, but we will use it until the
|
# _santitize_data is an old code path, but we will use it until the
|
||||||
# new code path is ready.
|
# new code path is ready.
|
||||||
if on_bad_vectors != "error" or (
|
if mode == "overwrite":
|
||||||
|
# For overwrite, apply the same preprocessing as create_table
|
||||||
|
# so vector columns are inferred as FixedSizeList.
|
||||||
|
data, _ = sanitize_create_table(
|
||||||
|
data, None, on_bad_vectors=on_bad_vectors, fill_value=fill_value
|
||||||
|
)
|
||||||
|
elif on_bad_vectors != "error" or (
|
||||||
schema.metadata is not None and b"embedding_functions" in schema.metadata
|
schema.metadata is not None and b"embedding_functions" in schema.metadata
|
||||||
):
|
):
|
||||||
data = _sanitize_data(
|
data = _sanitize_data(
|
||||||
|
|||||||
@@ -546,3 +546,23 @@ def test_openai_no_retry_on_401(mock_sleep):
|
|||||||
assert mock_func.call_count == 1
|
assert mock_func.call_count == 1
|
||||||
# Verify that sleep was never called (no retries)
|
# Verify that sleep was never called (no retries)
|
||||||
assert mock_sleep.call_count == 0
|
assert mock_sleep.call_count == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_url_retrieve_downloads_image():
|
||||||
|
"""
|
||||||
|
Embedding functions like open-clip, siglip, and jinaai use url_retrieve()
|
||||||
|
to download images from HTTP URLs. For example, open_clip._to_pil() calls:
|
||||||
|
|
||||||
|
PIL_Image.open(io.BytesIO(url_retrieve(image)))
|
||||||
|
|
||||||
|
Verify that url_retrieve() can download an image and open it as PIL Image,
|
||||||
|
matching the real usage pattern in embedding functions.
|
||||||
|
"""
|
||||||
|
import io
|
||||||
|
from PIL import Image
|
||||||
|
from lancedb.embeddings.utils import url_retrieve
|
||||||
|
|
||||||
|
image_url = "http://farm1.staticflickr.com/53/167798175_7c7845bbbd_z.jpg"
|
||||||
|
image_bytes = url_retrieve(image_url)
|
||||||
|
img = Image.open(io.BytesIO(image_bytes))
|
||||||
|
assert img.size[0] > 0 and img.size[1] > 0
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import shutil
|
|||||||
import pytest
|
import pytest
|
||||||
import pyarrow as pa
|
import pyarrow as pa
|
||||||
import lancedb
|
import lancedb
|
||||||
|
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
|
||||||
|
|
||||||
|
|
||||||
class TestNamespaceConnection:
|
class TestNamespaceConnection:
|
||||||
@@ -130,7 +131,7 @@ class TestNamespaceConnection:
|
|||||||
assert len(list(db.table_names(namespace=["test_ns"]))) == 0
|
assert len(list(db.table_names(namespace=["test_ns"]))) == 0
|
||||||
|
|
||||||
# Should not be able to open dropped table
|
# Should not be able to open dropped table
|
||||||
with pytest.raises(RuntimeError):
|
with pytest.raises(TableNotFoundError):
|
||||||
db.open_table("table1", namespace=["test_ns"])
|
db.open_table("table1", namespace=["test_ns"])
|
||||||
|
|
||||||
def test_create_table_with_schema(self):
|
def test_create_table_with_schema(self):
|
||||||
@@ -340,7 +341,7 @@ class TestNamespaceConnection:
|
|||||||
db.create_table("test_table", schema=schema, namespace=["test_namespace"])
|
db.create_table("test_table", schema=schema, namespace=["test_namespace"])
|
||||||
|
|
||||||
# Try to drop namespace with tables - should fail
|
# Try to drop namespace with tables - should fail
|
||||||
with pytest.raises(RuntimeError, match="is not empty"):
|
with pytest.raises(NamespaceNotEmptyError):
|
||||||
db.drop_namespace(["test_namespace"])
|
db.drop_namespace(["test_namespace"])
|
||||||
|
|
||||||
# Drop table first
|
# Drop table first
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ from lancedb.query import (
|
|||||||
PhraseQuery,
|
PhraseQuery,
|
||||||
Query,
|
Query,
|
||||||
FullTextSearchQuery,
|
FullTextSearchQuery,
|
||||||
|
ensure_vector_query,
|
||||||
)
|
)
|
||||||
from lancedb.rerankers.cross_encoder import CrossEncoderReranker
|
from lancedb.rerankers.cross_encoder import CrossEncoderReranker
|
||||||
from lancedb.table import AsyncTable, LanceTable
|
from lancedb.table import AsyncTable, LanceTable
|
||||||
@@ -1501,6 +1502,18 @@ def test_search_empty_table(mem_db):
|
|||||||
assert results == []
|
assert results == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_vector_query_empty_list():
|
||||||
|
"""Regression: ensure_vector_query used to return instead of raise ValueError."""
|
||||||
|
with pytest.raises(ValueError, match="non-empty"):
|
||||||
|
ensure_vector_query([])
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_vector_query_nested_empty_list():
|
||||||
|
"""Regression: ensure_vector_query used to return instead of raise ValueError."""
|
||||||
|
with pytest.raises(ValueError, match="non-empty"):
|
||||||
|
ensure_vector_query([[]])
|
||||||
|
|
||||||
|
|
||||||
def test_fast_search(tmp_path):
|
def test_fast_search(tmp_path):
|
||||||
db = lancedb.connect(tmp_path)
|
db = lancedb.connect(tmp_path)
|
||||||
|
|
||||||
|
|||||||
@@ -527,6 +527,36 @@ async def test_add_async(mem_db_async: AsyncConnection):
|
|||||||
assert await table.count_rows() == 3
|
assert await table.count_rows() == 3
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_overwrite_infers_vector_schema(mem_db: DBConnection):
|
||||||
|
"""Overwrite should infer vector columns the same way create_table does.
|
||||||
|
|
||||||
|
Regression test for https://github.com/lancedb/lancedb/issues/3183
|
||||||
|
"""
|
||||||
|
table = mem_db.create_table(
|
||||||
|
"test_overwrite_vec",
|
||||||
|
data=[
|
||||||
|
{"vector": [1.0, 2.0, 3.0, 4.0], "item": "foo"},
|
||||||
|
{"vector": [5.0, 6.0, 7.0, 8.0], "item": "bar"},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
# create_table infers vector as fixed_size_list<float32, 4>
|
||||||
|
original_type = table.schema.field("vector").type
|
||||||
|
assert pa.types.is_fixed_size_list(original_type)
|
||||||
|
|
||||||
|
# overwrite with plain Python lists (PyArrow infers list<double>)
|
||||||
|
table.add(
|
||||||
|
[
|
||||||
|
{"vector": [10.0, 20.0, 30.0, 40.0], "item": "baz"},
|
||||||
|
],
|
||||||
|
mode="overwrite",
|
||||||
|
)
|
||||||
|
# overwrite should infer vector column the same way as create_table
|
||||||
|
new_type = table.schema.field("vector").type
|
||||||
|
assert pa.types.is_fixed_size_list(new_type), (
|
||||||
|
f"Expected fixed_size_list after overwrite, got {new_type}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_add_progress_callback(mem_db: DBConnection):
|
def test_add_progress_callback(mem_db: DBConnection):
|
||||||
table = mem_db.create_table(
|
table = mem_db.create_table(
|
||||||
"test",
|
"test",
|
||||||
@@ -2143,3 +2173,33 @@ def test_table_uri(tmp_path):
|
|||||||
db = lancedb.connect(tmp_path)
|
db = lancedb.connect(tmp_path)
|
||||||
table = db.create_table("my_table", data=[{"x": 0}])
|
table = db.create_table("my_table", data=[{"x": 0}])
|
||||||
assert table.uri == str(tmp_path / "my_table.lance")
|
assert table.uri == str(tmp_path / "my_table.lance")
|
||||||
|
|
||||||
|
|
||||||
|
def test_sanitize_data_metadata_not_stripped():
|
||||||
|
"""Regression test: dict.update() returns None, so assigning its result
|
||||||
|
would silently replace metadata with None, causing with_metadata(None)
|
||||||
|
to strip all schema metadata from the target schema."""
|
||||||
|
from lancedb.table import _sanitize_data
|
||||||
|
|
||||||
|
schema = pa.schema(
|
||||||
|
[pa.field("x", pa.int64())],
|
||||||
|
metadata={b"existing_key": b"existing_value"},
|
||||||
|
)
|
||||||
|
batch = pa.record_batch([pa.array([1, 2, 3])], schema=schema)
|
||||||
|
|
||||||
|
# Use a different field type so the reader and target schemas differ,
|
||||||
|
# forcing _cast_to_target_schema to rebuild the schema with the
|
||||||
|
# target's metadata (instead of taking the fast-path).
|
||||||
|
target_schema = pa.schema(
|
||||||
|
[pa.field("x", pa.int32())],
|
||||||
|
metadata={b"existing_key": b"existing_value"},
|
||||||
|
)
|
||||||
|
|
||||||
|
reader = pa.RecordBatchReader.from_batches(schema, [batch])
|
||||||
|
metadata = {b"new_key": b"new_value"}
|
||||||
|
result = _sanitize_data(reader, target_schema=target_schema, metadata=metadata)
|
||||||
|
|
||||||
|
result_schema = result.schema
|
||||||
|
assert result_schema.metadata is not None
|
||||||
|
assert result_schema.metadata[b"existing_key"] == b"existing_value"
|
||||||
|
assert result_schema.metadata[b"new_key"] == b"new_value"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lancedb"
|
name = "lancedb"
|
||||||
version = "0.27.1"
|
version = "0.27.2-beta.1"
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||||
license.workspace = true
|
license.workspace = true
|
||||||
|
|||||||
@@ -240,7 +240,7 @@ impl Shuffler {
|
|||||||
.await?;
|
.await?;
|
||||||
// Need to read the entire file in a single batch for in-memory shuffling
|
// Need to read the entire file in a single batch for in-memory shuffling
|
||||||
let batch = reader.read_record_batch(0, reader.num_rows()).await?;
|
let batch = reader.read_record_batch(0, reader.num_rows()).await?;
|
||||||
let mut rng = rng.lock().unwrap();
|
let mut rng = rng.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
Self::shuffle_batch(&batch, &mut rng, clump_size)
|
Self::shuffle_batch(&batch, &mut rng, clump_size)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -66,13 +66,13 @@ impl IoTrackingStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn record_read(&self, num_bytes: u64) {
|
fn record_read(&self, num_bytes: u64) {
|
||||||
let mut stats = self.stats.lock().unwrap();
|
let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
stats.read_iops += 1;
|
stats.read_iops += 1;
|
||||||
stats.read_bytes += num_bytes;
|
stats.read_bytes += num_bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn record_write(&self, num_bytes: u64) {
|
fn record_write(&self, num_bytes: u64) {
|
||||||
let mut stats = self.stats.lock().unwrap();
|
let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
stats.write_iops += 1;
|
stats.write_iops += 1;
|
||||||
stats.write_bytes += num_bytes;
|
stats.write_bytes += num_bytes;
|
||||||
}
|
}
|
||||||
@@ -229,10 +229,63 @@ impl MultipartUpload for IoTrackingMultipartUpload {
|
|||||||
|
|
||||||
fn put_part(&mut self, payload: PutPayload) -> UploadPart {
|
fn put_part(&mut self, payload: PutPayload) -> UploadPart {
|
||||||
{
|
{
|
||||||
let mut stats = self.stats.lock().unwrap();
|
let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
stats.write_iops += 1;
|
stats.write_iops += 1;
|
||||||
stats.write_bytes += payload.content_length() as u64;
|
stats.write_bytes += payload.content_length() as u64;
|
||||||
}
|
}
|
||||||
self.target.put_part(payload)
|
self.target.put_part(payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// Helper: poison a Mutex<IoStats> by panicking while holding the lock.
|
||||||
|
fn poison_stats(stats: &Arc<Mutex<IoStats>>) {
|
||||||
|
let stats_clone = stats.clone();
|
||||||
|
let handle = std::thread::spawn(move || {
|
||||||
|
let _guard = stats_clone.lock().unwrap();
|
||||||
|
panic!("intentional panic to poison stats mutex");
|
||||||
|
});
|
||||||
|
let _ = handle.join();
|
||||||
|
assert!(stats.lock().is_err(), "mutex should be poisoned");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_record_read_recovers_from_poisoned_lock() {
|
||||||
|
let stats = Arc::new(Mutex::new(IoStats::default()));
|
||||||
|
let store = IoTrackingStore {
|
||||||
|
target: Arc::new(object_store::memory::InMemory::new()),
|
||||||
|
stats: stats.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
poison_stats(&stats);
|
||||||
|
|
||||||
|
// record_read should not panic
|
||||||
|
store.record_read(1024);
|
||||||
|
|
||||||
|
// Verify the stats were updated despite poisoning
|
||||||
|
let s = stats.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
assert_eq!(s.read_iops, 1);
|
||||||
|
assert_eq!(s.read_bytes, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_record_write_recovers_from_poisoned_lock() {
|
||||||
|
let stats = Arc::new(Mutex::new(IoStats::default()));
|
||||||
|
let store = IoTrackingStore {
|
||||||
|
target: Arc::new(object_store::memory::InMemory::new()),
|
||||||
|
stats: stats.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
poison_stats(&stats);
|
||||||
|
|
||||||
|
// record_write should not panic
|
||||||
|
store.record_write(2048);
|
||||||
|
|
||||||
|
let s = stats.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
assert_eq!(s.write_iops, 1);
|
||||||
|
assert_eq!(s.write_bytes, 2048);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -130,7 +130,10 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
|
|||||||
// TODO: this will be used when we wire this up to Table::add().
|
// TODO: this will be used when we wire this up to Table::add().
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn add_result(&self) -> Option<AddResult> {
|
pub fn add_result(&self) -> Option<AddResult> {
|
||||||
self.add_result.lock().unwrap().clone()
|
self.add_result
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
|
.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stream the input into an HTTP body as an Arrow IPC stream, capturing any
|
/// Stream the input into an HTTP body as an Arrow IPC stream, capturing any
|
||||||
|
|||||||
@@ -204,7 +204,9 @@ impl ExecutionPlan for InsertExec {
|
|||||||
|
|
||||||
let to_commit = {
|
let to_commit = {
|
||||||
// Don't hold the lock over an await point.
|
// Don't hold the lock over an await point.
|
||||||
let mut txns = partial_transactions.lock().unwrap();
|
let mut txns = partial_transactions
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
txns.push(transaction);
|
txns.push(transaction);
|
||||||
if txns.len() == total_partitions {
|
if txns.len() == total_partitions {
|
||||||
Some(std::mem::take(&mut *txns))
|
Some(std::mem::take(&mut *txns))
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
/// pinned dataset regardless of consistency mode.
|
/// pinned dataset regardless of consistency mode.
|
||||||
pub async fn get(&self) -> Result<Arc<Dataset>> {
|
pub async fn get(&self) -> Result<Arc<Dataset>> {
|
||||||
{
|
{
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock()?;
|
||||||
if state.pinned_version.is_some() {
|
if state.pinned_version.is_some() {
|
||||||
return Ok(state.dataset.clone());
|
return Ok(state.dataset.clone());
|
||||||
}
|
}
|
||||||
@@ -101,7 +101,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
}
|
}
|
||||||
ConsistencyMode::Strong => refresh_latest(self.state.clone()).await,
|
ConsistencyMode::Strong => refresh_latest(self.state.clone()).await,
|
||||||
ConsistencyMode::Lazy => {
|
ConsistencyMode::Lazy => {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock()?;
|
||||||
Ok(state.dataset.clone())
|
Ok(state.dataset.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -116,7 +116,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
/// concurrent [`as_time_travel`](Self::as_time_travel) call), the update
|
/// concurrent [`as_time_travel`](Self::as_time_travel) call), the update
|
||||||
/// is silently ignored — the write already committed to storage.
|
/// is silently ignored — the write already committed to storage.
|
||||||
pub fn update(&self, dataset: Dataset) {
|
pub fn update(&self, dataset: Dataset) {
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
if state.pinned_version.is_some() {
|
if state.pinned_version.is_some() {
|
||||||
// A concurrent as_time_travel() beat us here. The write succeeded
|
// A concurrent as_time_travel() beat us here. The write succeeded
|
||||||
// in storage, but since we're now pinned we don't advance the
|
// in storage, but since we're now pinned we don't advance the
|
||||||
@@ -139,7 +139,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
|
|
||||||
/// Check that the dataset is in a mutable mode (Latest).
|
/// Check that the dataset is in a mutable mode (Latest).
|
||||||
pub fn ensure_mutable(&self) -> Result<()> {
|
pub fn ensure_mutable(&self) -> Result<()> {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock()?;
|
||||||
if state.pinned_version.is_some() {
|
if state.pinned_version.is_some() {
|
||||||
Err(crate::Error::InvalidInput {
|
Err(crate::Error::InvalidInput {
|
||||||
message: "table cannot be modified when a specific version is checked out"
|
message: "table cannot be modified when a specific version is checked out"
|
||||||
@@ -152,13 +152,16 @@ impl DatasetConsistencyWrapper {
|
|||||||
|
|
||||||
/// Returns the version, if in time travel mode, or None otherwise.
|
/// Returns the version, if in time travel mode, or None otherwise.
|
||||||
pub fn time_travel_version(&self) -> Option<u64> {
|
pub fn time_travel_version(&self) -> Option<u64> {
|
||||||
self.state.lock().unwrap().pinned_version
|
self.state
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
|
.pinned_version
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Convert into a wrapper in latest version mode.
|
/// Convert into a wrapper in latest version mode.
|
||||||
pub async fn as_latest(&self) -> Result<()> {
|
pub async fn as_latest(&self) -> Result<()> {
|
||||||
let dataset = {
|
let dataset = {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock()?;
|
||||||
if state.pinned_version.is_none() {
|
if state.pinned_version.is_none() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@@ -168,7 +171,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
let latest_version = dataset.latest_version_id().await?;
|
let latest_version = dataset.latest_version_id().await?;
|
||||||
let new_dataset = dataset.checkout_version(latest_version).await?;
|
let new_dataset = dataset.checkout_version(latest_version).await?;
|
||||||
|
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock()?;
|
||||||
if state.pinned_version.is_some() {
|
if state.pinned_version.is_some() {
|
||||||
state.dataset = Arc::new(new_dataset);
|
state.dataset = Arc::new(new_dataset);
|
||||||
state.pinned_version = None;
|
state.pinned_version = None;
|
||||||
@@ -184,7 +187,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
let target_ref = target_version.into();
|
let target_ref = target_version.into();
|
||||||
|
|
||||||
let (should_checkout, dataset) = {
|
let (should_checkout, dataset) = {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock()?;
|
||||||
let should = match state.pinned_version {
|
let should = match state.pinned_version {
|
||||||
None => true,
|
None => true,
|
||||||
Some(version) => match &target_ref {
|
Some(version) => match &target_ref {
|
||||||
@@ -204,7 +207,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
let new_dataset = dataset.checkout_version(target_ref).await?;
|
let new_dataset = dataset.checkout_version(target_ref).await?;
|
||||||
let version_value = new_dataset.version().version;
|
let version_value = new_dataset.version().version;
|
||||||
|
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock()?;
|
||||||
state.dataset = Arc::new(new_dataset);
|
state.dataset = Arc::new(new_dataset);
|
||||||
state.pinned_version = Some(version_value);
|
state.pinned_version = Some(version_value);
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -212,7 +215,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
|
|
||||||
pub async fn reload(&self) -> Result<()> {
|
pub async fn reload(&self) -> Result<()> {
|
||||||
let (dataset, pinned_version) = {
|
let (dataset, pinned_version) = {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock()?;
|
||||||
(state.dataset.clone(), state.pinned_version)
|
(state.dataset.clone(), state.pinned_version)
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -230,7 +233,7 @@ impl DatasetConsistencyWrapper {
|
|||||||
|
|
||||||
let new_dataset = dataset.checkout_version(version).await?;
|
let new_dataset = dataset.checkout_version(version).await?;
|
||||||
|
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock()?;
|
||||||
if state.pinned_version == Some(version) {
|
if state.pinned_version == Some(version) {
|
||||||
state.dataset = Arc::new(new_dataset);
|
state.dataset = Arc::new(new_dataset);
|
||||||
}
|
}
|
||||||
@@ -242,14 +245,14 @@ impl DatasetConsistencyWrapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn refresh_latest(state: Arc<Mutex<DatasetState>>) -> Result<Arc<Dataset>> {
|
async fn refresh_latest(state: Arc<Mutex<DatasetState>>) -> Result<Arc<Dataset>> {
|
||||||
let dataset = { state.lock().unwrap().dataset.clone() };
|
let dataset = { state.lock()?.dataset.clone() };
|
||||||
|
|
||||||
let mut ds = (*dataset).clone();
|
let mut ds = (*dataset).clone();
|
||||||
ds.checkout_latest().await?;
|
ds.checkout_latest().await?;
|
||||||
let new_arc = Arc::new(ds);
|
let new_arc = Arc::new(ds);
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut state = state.lock().unwrap();
|
let mut state = state.lock()?;
|
||||||
if state.pinned_version.is_none()
|
if state.pinned_version.is_none()
|
||||||
&& new_arc.manifest().version >= state.dataset.manifest().version
|
&& new_arc.manifest().version >= state.dataset.manifest().version
|
||||||
{
|
{
|
||||||
@@ -612,4 +615,108 @@ mod tests {
|
|||||||
let s = io_stats.incremental_stats();
|
let s = io_stats.incremental_stats();
|
||||||
assert_eq!(s.read_iops, 0, "step 5, elapsed={:?}", start.elapsed());
|
assert_eq!(s.read_iops, 0, "step 5, elapsed={:?}", start.elapsed());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Helper: poison the mutex inside a DatasetConsistencyWrapper.
|
||||||
|
fn poison_state(wrapper: &DatasetConsistencyWrapper) {
|
||||||
|
let state = wrapper.state.clone();
|
||||||
|
let handle = std::thread::spawn(move || {
|
||||||
|
let _guard = state.lock().unwrap();
|
||||||
|
panic!("intentional panic to poison mutex");
|
||||||
|
});
|
||||||
|
let _ = handle.join(); // join collects the panic
|
||||||
|
assert!(wrapper.state.lock().is_err(), "mutex should be poisoned");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_get_returns_error_on_poisoned_lock() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let ds = create_test_dataset(uri).await;
|
||||||
|
|
||||||
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
poison_state(&wrapper);
|
||||||
|
|
||||||
|
// get() should return Err, not panic
|
||||||
|
let result = wrapper.get().await;
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_ensure_mutable_returns_error_on_poisoned_lock() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let ds = create_test_dataset(uri).await;
|
||||||
|
|
||||||
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
poison_state(&wrapper);
|
||||||
|
|
||||||
|
let result = wrapper.ensure_mutable();
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_update_recovers_from_poisoned_lock() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let ds = create_test_dataset(uri).await;
|
||||||
|
let ds_v2 = append_to_dataset(uri).await;
|
||||||
|
|
||||||
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
poison_state(&wrapper);
|
||||||
|
|
||||||
|
// update() returns (), should not panic
|
||||||
|
wrapper.update(ds_v2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_time_travel_version_recovers_from_poisoned_lock() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let ds = create_test_dataset(uri).await;
|
||||||
|
|
||||||
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
poison_state(&wrapper);
|
||||||
|
|
||||||
|
// Should not panic, returns whatever was in the mutex
|
||||||
|
let _version = wrapper.time_travel_version();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_as_latest_returns_error_on_poisoned_lock() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let ds = create_test_dataset(uri).await;
|
||||||
|
|
||||||
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
poison_state(&wrapper);
|
||||||
|
|
||||||
|
let result = wrapper.as_latest().await;
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_as_time_travel_returns_error_on_poisoned_lock() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let ds = create_test_dataset(uri).await;
|
||||||
|
|
||||||
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
poison_state(&wrapper);
|
||||||
|
|
||||||
|
let result = wrapper.as_time_travel(1u64).await;
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_reload_returns_error_on_poisoned_lock() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let ds = create_test_dataset(uri).await;
|
||||||
|
|
||||||
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
poison_state(&wrapper);
|
||||||
|
|
||||||
|
let result = wrapper.reload().await;
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -130,8 +130,11 @@ impl WriteProgressTracker {
|
|||||||
pub fn record_batch(&self, rows: usize, bytes: usize) {
|
pub fn record_batch(&self, rows: usize, bytes: usize) {
|
||||||
// Lock order: callback first, then rows_and_bytes. This is the only
|
// Lock order: callback first, then rows_and_bytes. This is the only
|
||||||
// order used anywhere, so deadlocks cannot occur.
|
// order used anywhere, so deadlocks cannot occur.
|
||||||
let mut cb = self.callback.lock().unwrap();
|
let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
let mut guard = self.rows_and_bytes.lock().unwrap();
|
let mut guard = self
|
||||||
|
.rows_and_bytes
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
guard.0 += rows;
|
guard.0 += rows;
|
||||||
guard.1 += bytes;
|
guard.1 += bytes;
|
||||||
let progress = self.snapshot(guard.0, guard.1, false);
|
let progress = self.snapshot(guard.0, guard.1, false);
|
||||||
@@ -151,8 +154,11 @@ impl WriteProgressTracker {
|
|||||||
/// `total_rows` is always `Some` on the final callback: it uses the known
|
/// `total_rows` is always `Some` on the final callback: it uses the known
|
||||||
/// total if available, or falls back to the number of rows actually written.
|
/// total if available, or falls back to the number of rows actually written.
|
||||||
pub fn finish(&self) {
|
pub fn finish(&self) {
|
||||||
let mut cb = self.callback.lock().unwrap();
|
let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
let guard = self.rows_and_bytes.lock().unwrap();
|
let guard = self
|
||||||
|
.rows_and_bytes
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
let mut snap = self.snapshot(guard.0, guard.1, true);
|
let mut snap = self.snapshot(guard.0, guard.1, true);
|
||||||
snap.total_rows = Some(self.total_rows.unwrap_or(guard.0));
|
snap.total_rows = Some(self.total_rows.unwrap_or(guard.0));
|
||||||
drop(guard);
|
drop(guard);
|
||||||
@@ -376,4 +382,50 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_record_batch_recovers_from_poisoned_callback_lock() {
|
||||||
|
use super::{ProgressCallback, WriteProgressTracker};
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
let callback: ProgressCallback = Arc::new(Mutex::new(|_: &super::WriteProgress| {}));
|
||||||
|
|
||||||
|
// Poison the callback mutex
|
||||||
|
let cb_clone = callback.clone();
|
||||||
|
let handle = std::thread::spawn(move || {
|
||||||
|
let _guard = cb_clone.lock().unwrap();
|
||||||
|
panic!("intentional panic to poison callback mutex");
|
||||||
|
});
|
||||||
|
let _ = handle.join();
|
||||||
|
assert!(
|
||||||
|
callback.lock().is_err(),
|
||||||
|
"callback mutex should be poisoned"
|
||||||
|
);
|
||||||
|
|
||||||
|
let tracker = WriteProgressTracker::new(callback, Some(100));
|
||||||
|
|
||||||
|
// record_batch should not panic
|
||||||
|
tracker.record_batch(10, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_finish_recovers_from_poisoned_callback_lock() {
|
||||||
|
use super::{ProgressCallback, WriteProgressTracker};
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
let callback: ProgressCallback = Arc::new(Mutex::new(|_: &super::WriteProgress| {}));
|
||||||
|
|
||||||
|
// Poison the callback mutex
|
||||||
|
let cb_clone = callback.clone();
|
||||||
|
let handle = std::thread::spawn(move || {
|
||||||
|
let _guard = cb_clone.lock().unwrap();
|
||||||
|
panic!("intentional panic to poison callback mutex");
|
||||||
|
});
|
||||||
|
let _ = handle.join();
|
||||||
|
|
||||||
|
let tracker = WriteProgressTracker::new(callback, Some(100));
|
||||||
|
|
||||||
|
// finish should not panic
|
||||||
|
tracker.finish();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ where
|
|||||||
/// This is a cheap synchronous check useful as a fast path before
|
/// This is a cheap synchronous check useful as a fast path before
|
||||||
/// constructing a fetch closure for [`get()`](Self::get).
|
/// constructing a fetch closure for [`get()`](Self::get).
|
||||||
pub fn try_get(&self) -> Option<V> {
|
pub fn try_get(&self) -> Option<V> {
|
||||||
let cache = self.inner.lock().unwrap();
|
let cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
cache.state.fresh_value(self.ttl, self.refresh_window)
|
cache.state.fresh_value(self.ttl, self.refresh_window)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,7 +138,7 @@ where
|
|||||||
{
|
{
|
||||||
// Fast path: check if cache is fresh
|
// Fast path: check if cache is fresh
|
||||||
{
|
{
|
||||||
let cache = self.inner.lock().unwrap();
|
let cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
if let Some(value) = cache.state.fresh_value(self.ttl, self.refresh_window) {
|
if let Some(value) = cache.state.fresh_value(self.ttl, self.refresh_window) {
|
||||||
return Ok(value);
|
return Ok(value);
|
||||||
}
|
}
|
||||||
@@ -147,7 +147,7 @@ where
|
|||||||
// Slow path
|
// Slow path
|
||||||
let mut fetch = Some(fetch);
|
let mut fetch = Some(fetch);
|
||||||
let action = {
|
let action = {
|
||||||
let mut cache = self.inner.lock().unwrap();
|
let mut cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
self.determine_action(&mut cache, &mut fetch)
|
self.determine_action(&mut cache, &mut fetch)
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -161,7 +161,7 @@ where
|
|||||||
///
|
///
|
||||||
/// This avoids a blocking fetch on the first [`get()`](Self::get) call.
|
/// This avoids a blocking fetch on the first [`get()`](Self::get) call.
|
||||||
pub fn seed(&self, value: V) {
|
pub fn seed(&self, value: V) {
|
||||||
let mut cache = self.inner.lock().unwrap();
|
let mut cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
cache.state = State::Current(value, clock::now());
|
cache.state = State::Current(value, clock::now());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,7 +170,7 @@ where
|
|||||||
/// Any in-flight background fetch from before this call will not update the
|
/// Any in-flight background fetch from before this call will not update the
|
||||||
/// cache (the generation counter prevents stale writes).
|
/// cache (the generation counter prevents stale writes).
|
||||||
pub fn invalidate(&self) {
|
pub fn invalidate(&self) {
|
||||||
let mut cache = self.inner.lock().unwrap();
|
let mut cache = self.inner.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
cache.state = State::Empty;
|
cache.state = State::Empty;
|
||||||
cache.generation += 1;
|
cache.generation += 1;
|
||||||
}
|
}
|
||||||
@@ -267,7 +267,7 @@ where
|
|||||||
let fut_for_spawn = shared.clone();
|
let fut_for_spawn = shared.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let result = fut_for_spawn.await;
|
let result = fut_for_spawn.await;
|
||||||
let mut cache = inner.lock().unwrap();
|
let mut cache = inner.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
// Only update if no invalidation has happened since we started
|
// Only update if no invalidation has happened since we started
|
||||||
if cache.generation != generation {
|
if cache.generation != generation {
|
||||||
return;
|
return;
|
||||||
@@ -590,4 +590,67 @@ mod tests {
|
|||||||
let v = cache.get(ok_fetcher(count.clone(), "fresh")).await.unwrap();
|
let v = cache.get(ok_fetcher(count.clone(), "fresh")).await.unwrap();
|
||||||
assert_eq!(v, "fresh");
|
assert_eq!(v, "fresh");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Helper: poison the inner mutex of a BackgroundCache.
|
||||||
|
fn poison_cache(cache: &BackgroundCache<String, TestError>) {
|
||||||
|
let inner = cache.inner.clone();
|
||||||
|
let handle = std::thread::spawn(move || {
|
||||||
|
let _guard = inner.lock().unwrap();
|
||||||
|
panic!("intentional panic to poison mutex");
|
||||||
|
});
|
||||||
|
let _ = handle.join();
|
||||||
|
assert!(cache.inner.lock().is_err(), "mutex should be poisoned");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_try_get_recovers_from_poisoned_lock() {
|
||||||
|
let cache = new_cache();
|
||||||
|
let count = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
|
// Seed a value first
|
||||||
|
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
|
||||||
|
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap(); // peek
|
||||||
|
|
||||||
|
poison_cache(&cache);
|
||||||
|
|
||||||
|
// try_get() should not panic — it recovers via unwrap_or_else
|
||||||
|
let result = cache.try_get();
|
||||||
|
// The value may or may not be fresh depending on timing, but it must not panic
|
||||||
|
let _ = result;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_get_recovers_from_poisoned_lock() {
|
||||||
|
let cache = new_cache();
|
||||||
|
let count = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
|
poison_cache(&cache);
|
||||||
|
|
||||||
|
// get() should not panic — it recovers and can still fetch
|
||||||
|
let result = cache.get(ok_fetcher(count.clone(), "recovered")).await;
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert_eq!(result.unwrap(), "recovered");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_seed_recovers_from_poisoned_lock() {
|
||||||
|
let cache = new_cache();
|
||||||
|
poison_cache(&cache);
|
||||||
|
|
||||||
|
// seed() should not panic
|
||||||
|
cache.seed("seeded".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_invalidate_recovers_from_poisoned_lock() {
|
||||||
|
let cache = new_cache();
|
||||||
|
let count = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
|
cache.get(ok_fetcher(count.clone(), "hello")).await.unwrap();
|
||||||
|
|
||||||
|
poison_cache(&cache);
|
||||||
|
|
||||||
|
// invalidate() should not panic
|
||||||
|
cache.invalidate();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user