mirror of https://github.com/lancedb/lancedb.git
synced 2025-12-24 13:59:58 +00:00

Compare commits: api-docs-f ... v0.4.6 (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | ba6f949515 | |
| | 3dd8522bc9 | |
| | e01ef63488 | |
| | a6cf24b359 | |
| | 9a07c9aad8 | |
| | d405798952 | |
| | e8a8b92b2a | |
| | 66362c6506 | |
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.4.4
+current_version = 0.4.6
 commit = True
 message = Bump version: {current_version} → {new_version}
 tag = True
node/package-lock.json (generated, 74 lines changed)
@@ -1,12 +1,12 @@
 {
   "name": "vectordb",
-  "version": "0.4.4",
+  "version": "0.4.5",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "vectordb",
-      "version": "0.4.4",
+      "version": "0.4.5",
       "cpu": [
         "x64",
         "arm64"
@@ -53,11 +53,11 @@
         "uuid": "^9.0.0"
       },
       "optionalDependencies": {
-        "@lancedb/vectordb-darwin-arm64": "0.4.4",
-        "@lancedb/vectordb-darwin-x64": "0.4.4",
-        "@lancedb/vectordb-linux-arm64-gnu": "0.4.4",
-        "@lancedb/vectordb-linux-x64-gnu": "0.4.4",
-        "@lancedb/vectordb-win32-x64-msvc": "0.4.4"
+        "@lancedb/vectordb-darwin-arm64": "0.4.5",
+        "@lancedb/vectordb-darwin-x64": "0.4.5",
+        "@lancedb/vectordb-linux-arm64-gnu": "0.4.5",
+        "@lancedb/vectordb-linux-x64-gnu": "0.4.5",
+        "@lancedb/vectordb-win32-x64-msvc": "0.4.5"
       }
     },
     "node_modules/@75lb/deep-merge": {
@@ -328,6 +328,66 @@
         "@jridgewell/sourcemap-codec": "^1.4.10"
       }
     },
+    "node_modules/@lancedb/vectordb-darwin-arm64": {
+      "version": "0.4.5",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.5.tgz",
+      "integrity": "sha512-sR+Q9dRBzMm+NGqM7EiK07c7pQz/V4J//23p05CeO/YATjKYyU3jE/dmVenLjJGW2UUrRYiyUQ9X6Up+OOgdhA==",
+      "cpu": [
+        "arm64"
+      ],
+      "optional": true,
+      "os": [
+        "darwin"
+      ]
+    },
+    "node_modules/@lancedb/vectordb-darwin-x64": {
+      "version": "0.4.5",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.5.tgz",
+      "integrity": "sha512-/BIyUeVkLaUlOEQN4HUQ9J9ZdNWkDpZPUUS9kfz5iYIjotgwpSfznF8Q1GY5BVuXa2ke7GC3tnkwwd5ZMOuDsA==",
+      "cpu": [
+        "x64"
+      ],
+      "optional": true,
+      "os": [
+        "darwin"
+      ]
+    },
+    "node_modules/@lancedb/vectordb-linux-arm64-gnu": {
+      "version": "0.4.5",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.5.tgz",
+      "integrity": "sha512-bq8vX7znIf2Dap41YIbB5uA/YahwaLvFPNH0WmwqeBWxF64/AJ74DsZk51ftwczQMsyLK74M8f1PzniapMAR+Q==",
+      "cpu": [
+        "arm64"
+      ],
+      "optional": true,
+      "os": [
+        "linux"
+      ]
+    },
+    "node_modules/@lancedb/vectordb-linux-x64-gnu": {
+      "version": "0.4.5",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.5.tgz",
+      "integrity": "sha512-5qCWFyxihyMDYIGRAdQ7zv3enBEDxPR08dCmXr2Bu9yYI3SUqfuSvFX1NwflVeB+RzRMMbeG4xiaEbo7H7/H3Q==",
+      "cpu": [
+        "x64"
+      ],
+      "optional": true,
+      "os": [
+        "linux"
+      ]
+    },
+    "node_modules/@lancedb/vectordb-win32-x64-msvc": {
+      "version": "0.4.5",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.5.tgz",
+      "integrity": "sha512-z3dZ6TDzm2EU5gNuejshArs3o84v1rdXnds22TTuc9fVhwg5JG87FyHFZKU1MGuyLuZW22Me0YDuS9VR+eAp0Q==",
+      "cpu": [
+        "x64"
+      ],
+      "optional": true,
+      "os": [
+        "win32"
+      ]
+    },
     "node_modules/@neon-rs/cli": {
       "version": "0.0.160",
       "resolved": "https://registry.npmjs.org/@neon-rs/cli/-/cli-0.0.160.tgz",
@@ -1,12 +1,12 @@
 {
   "name": "vectordb",
-  "version": "0.4.4",
+  "version": "0.4.6",
   "description": " Serverless, low-latency vector database for AI applications",
   "main": "dist/index.js",
   "types": "dist/index.d.ts",
   "scripts": {
     "tsc": "tsc -b",
-    "build": "cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json && tsc -b",
+    "build": "npm run tsc && cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json",
     "build-release": "npm run build -- --release",
     "test": "npm run tsc && mocha -recursive dist/test",
     "integration-test": "npm run tsc && mocha -recursive dist/integration_test",
@@ -81,10 +81,10 @@
     }
   },
   "optionalDependencies": {
-    "@lancedb/vectordb-darwin-arm64": "0.4.4",
-    "@lancedb/vectordb-darwin-x64": "0.4.4",
-    "@lancedb/vectordb-linux-arm64-gnu": "0.4.4",
-    "@lancedb/vectordb-linux-x64-gnu": "0.4.4",
-    "@lancedb/vectordb-win32-x64-msvc": "0.4.4"
+    "@lancedb/vectordb-darwin-arm64": "0.4.6",
+    "@lancedb/vectordb-darwin-x64": "0.4.6",
+    "@lancedb/vectordb-linux-arm64-gnu": "0.4.6",
+    "@lancedb/vectordb-linux-x64-gnu": "0.4.6",
+    "@lancedb/vectordb-win32-x64-msvc": "0.4.6"
   }
 }
@@ -391,24 +391,6 @@ describe('LanceDB client', function () {
     })
   }).timeout(120000)

-  it('fails to create a new table when the vector column is missing', async function () {
-    const dir = await track().mkdir('lancejs')
-    const con = await lancedb.connect(dir)
-
-    const data = [
-      {
-        id: 1,
-        price: 10
-      }
-    ]
-
-    const create = con.createTable('missing_vector', data)
-    await expect(create).to.be.rejectedWith(
-      Error,
-      "column 'vector' is missing"
-    )
-  })
-
   it('use overwrite flag to overwrite existing table', async function () {
     const dir = await track().mkdir('lancejs')
     const con = await lancedb.connect(dir)
@@ -10,14 +10,15 @@ crate-type = ["cdylib"]

 [dependencies]
 arrow-ipc.workspace = true
 futures.workspace = true
+lance-linalg.workspace = true
+lance.workspace = true
+vectordb = { path = "../rust/vectordb" }
 napi = { version = "2.14", default-features = false, features = [
   "napi7",
   "async"
 ] }
 napi-derive = "2.14"
-vectordb = { path = "../rust/vectordb" }
-lance.workspace = true
-lance-linalg.workspace = true

 [build-dependencies]
 napi-build = "2.1"
@@ -53,6 +53,16 @@ describe("Test creating index", () => {
     const indexDir = path.join(tmpDir, "test.lance", "_indices");
     expect(fs.readdirSync(indexDir)).toHaveLength(1);
     // TODO: check index type.
+
+    // Search without specifying the column
+    let query_vector = data.toArray()[5].vec.toJSON();
+    let rst = await tbl.query().nearestTo(query_vector).limit(2).toArrow();
+    expect(rst.numRows).toBe(2);
+
+    // Search with specifying the column
+    let rst2 = await tbl.search(query_vector, "vec").limit(2).toArrow();
+    expect(rst2.numRows).toBe(2);
+    expect(rst.toString()).toEqual(rst2.toString());
   });

   test("no vector column available", async () => {
@@ -71,6 +81,80 @@ describe("Test creating index", () => {
     await tbl.createIndex("val").build();
     const indexDir = path.join(tmpDir, "no_vec.lance", "_indices");
     expect(fs.readdirSync(indexDir)).toHaveLength(1);
+
+    for await (const r of tbl.query().filter("id > 1").select(["id"])) {
+      expect(r.numRows).toBe(1);
+    }
   });
+
+  test("two columns with different dimensions", async () => {
+    const db = await connect(tmpDir);
+    const schema = new Schema([
+      new Field("id", new Int32(), true),
+      new Field("vec", new FixedSizeList(32, new Field("item", new Float32()))),
+      new Field(
+        "vec2",
+        new FixedSizeList(64, new Field("item", new Float32()))
+      ),
+    ]);
+    const tbl = await db.createTable(
+      "two_vectors",
+      makeArrowTable(
+        Array(300)
+          .fill(1)
+          .map((_, i) => ({
+            id: i,
+            vec: Array(32)
+              .fill(1)
+              .map(() => Math.random()),
+            vec2: Array(64) // different dimension
+              .fill(1)
+              .map(() => Math.random()),
+          })),
+        { schema }
+      )
+    );
+
+    // Only build index over v1
+    await expect(tbl.createIndex().build()).rejects.toThrow(
+      /.*More than one vector columns found.*/
+    );
+    tbl
+      .createIndex("vec")
+      .ivf_pq({ num_partitions: 2, num_sub_vectors: 2 })
+      .build();
+
+    const rst = await tbl
+      .query()
+      .nearestTo(
+        Array(32)
+          .fill(1)
+          .map(() => Math.random())
+      )
+      .limit(2)
+      .toArrow();
+    expect(rst.numRows).toBe(2);
+
+    // Search with specifying the column
+    await expect(
+      tbl
+        .search(
+          Array(64)
+            .fill(1)
+            .map(() => Math.random()),
+          "vec"
+        )
+        .limit(2)
+        .toArrow()
+    ).rejects.toThrow(/.*does not match the dimension.*/);
+
+    const query64 = Array(64)
+      .fill(1)
+      .map(() => Math.random());
+    const rst64_1 = await tbl.query().nearestTo(query64).limit(2).toArrow();
+    const rst64_2 = await tbl.search(query64, "vec2").limit(2).toArrow();
+    expect(rst64_1.toString()).toEqual(rst64_2.toString());
+    expect(rst64_1.numRows).toBe(2);
+  });

   test("create scalar index", async () => {
@@ -91,7 +91,6 @@ impl IndexBuilder {

     #[napi]
     pub async fn build(&self) -> napi::Result<()> {
-        println!("nodejs::index.rs : build");
         self.inner
             .build()
             .await
nodejs/src/iterator.rs (new file, 47 lines)
@@ -0,0 +1,47 @@
+// Copyright 2024 Lance Developers.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use futures::StreamExt;
+use lance::io::RecordBatchStream;
+use napi::bindgen_prelude::*;
+use napi_derive::napi;
+use vectordb::ipc::batches_to_ipc_file;
+
+/** Typescript-style Async Iterator over RecordBatches */
+#[napi]
+pub struct RecordBatchIterator {
+    inner: Box<dyn RecordBatchStream + Unpin>,
+}
+
+#[napi]
+impl RecordBatchIterator {
+    pub(crate) fn new(inner: Box<dyn RecordBatchStream + Unpin>) -> Self {
+        Self { inner }
+    }
+
+    #[napi]
+    pub async unsafe fn next(&mut self) -> napi::Result<Option<Buffer>> {
+        if let Some(rst) = self.inner.next().await {
+            let batch = rst.map_err(|e| {
+                napi::Error::from_reason(format!("Failed to get next batch from stream: {}", e))
+            })?;
+            batches_to_ipc_file(&[batch])
+                .map_err(|e| napi::Error::from_reason(format!("Failed to write IPC file: {}", e)))
+                .map(|buf| Some(Buffer::from(buf)))
+        } else {
+            // We are done with the stream.
+            Ok(None)
+        }
+    }
+}
@@ -17,6 +17,7 @@ use napi_derive::*;

 mod connection;
 mod index;
+mod iterator;
 mod query;
 mod table;
@@ -16,7 +16,7 @@ use napi::bindgen_prelude::*;
 use napi_derive::napi;
 use vectordb::query::Query as LanceDBQuery;

-use crate::table::Table;
+use crate::{iterator::RecordBatchIterator, table::Table};

 #[napi]
 pub struct Query {
@@ -32,17 +32,50 @@ impl Query {
     }

     #[napi]
-    pub fn vector(&mut self, vector: Float32Array) {
-        let inn = self.inner.clone().nearest_to(&vector);
-        self.inner = inn;
+    pub fn column(&mut self, column: String) {
+        self.inner = self.inner.clone().column(&column);
     }

     #[napi]
-    pub fn to_arrow(&self) -> napi::Result<()> {
-        // let buf = self.inner.to_arrow().map_err(|e| {
-        //     napi::Error::from_reason(format!("Failed to convert query to arrow: {}", e))
-        // })?;
-        // Ok(buf)
-        todo!()
+    pub fn filter(&mut self, filter: String) {
+        self.inner = self.inner.clone().filter(filter);
     }

+    #[napi]
+    pub fn select(&mut self, columns: Vec<String>) {
+        self.inner = self.inner.clone().select(&columns);
+    }
+
+    #[napi]
+    pub fn limit(&mut self, limit: u32) {
+        self.inner = self.inner.clone().limit(limit as usize);
+    }
+
+    #[napi]
+    pub fn prefilter(&mut self, prefilter: bool) {
+        self.inner = self.inner.clone().prefilter(prefilter);
+    }
+
+    #[napi]
+    pub fn nearest_to(&mut self, vector: Float32Array) {
+        self.inner = self.inner.clone().nearest_to(&vector);
+    }
+
+    #[napi]
+    pub fn refine_factor(&mut self, refine_factor: u32) {
+        self.inner = self.inner.clone().refine_factor(refine_factor);
+    }
+
+    #[napi]
+    pub fn nprobes(&mut self, nprobe: u32) {
+        self.inner = self.inner.clone().nprobes(nprobe as usize);
+    }
+
+    #[napi]
+    pub async fn execute_stream(&self) -> napi::Result<RecordBatchIterator> {
+        let inner_stream = self.inner.execute_stream().await.map_err(|e| {
+            napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
+        })?;
+        Ok(RecordBatchIterator::new(Box::new(inner_stream)))
+    }
 }
nodejs/vectordb/native.d.ts (vendored, 15 lines changed)
@@ -54,9 +54,20 @@ export class IndexBuilder {
   scalar(): void
   build(): Promise<void>
 }
+/** Typescript-style Async Iterator over RecordBatches */
+export class RecordBatchIterator {
+  next(): Promise<Buffer | null>
+}
 export class Query {
-  vector(vector: Float32Array): void
-  toArrow(): void
+  column(column: string): void
+  filter(filter: string): void
+  select(columns: Array<string>): void
+  limit(limit: number): void
+  prefilter(prefilter: boolean): void
+  nearestTo(vector: Float32Array): void
+  refineFactor(refineFactor: number): void
+  nprobes(nprobe: number): void
+  executeStream(): Promise<RecordBatchIterator>
 }
 export class Table {
   /** Return Schema as empty Arrow IPC file. */
@@ -295,12 +295,13 @@ if (!nativeBinding) {
   throw new Error(`Failed to load native binding`)
 }

-const { Connection, IndexType, MetricType, IndexBuilder, Query, Table, WriteMode, connect } = nativeBinding
+const { Connection, IndexType, MetricType, IndexBuilder, RecordBatchIterator, Query, Table, WriteMode, connect } = nativeBinding

 module.exports.Connection = Connection
 module.exports.IndexType = IndexType
 module.exports.MetricType = MetricType
 module.exports.IndexBuilder = IndexBuilder
+module.exports.RecordBatchIterator = RecordBatchIterator
 module.exports.Query = Query
 module.exports.Table = Table
 module.exports.WriteMode = WriteMode
@@ -12,46 +12,73 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-import { RecordBatch } from "apache-arrow";
-import { Table } from "./table";
+import { RecordBatch, tableFromIPC, Table as ArrowTable } from "apache-arrow";
+import {
+  RecordBatchIterator as NativeBatchIterator,
+  Query as NativeQuery,
+  Table as NativeTable,
+} from "./native";

-// TODO: re-eanble eslint once we have a real implementation
-/* eslint-disable */
 class RecordBatchIterator implements AsyncIterator<RecordBatch> {
-  next(
-    ...args: [] | [undefined]
-  ): Promise<IteratorResult<RecordBatch<any>, any>> {
-    throw new Error("Method not implemented.");
-  }
-  return?(value?: any): Promise<IteratorResult<RecordBatch<any>, any>> {
-    throw new Error("Method not implemented.");
-  }
-  throw?(e?: any): Promise<IteratorResult<RecordBatch<any>, any>> {
-    throw new Error("Method not implemented.");
+  private promised_inner?: Promise<NativeBatchIterator>;
+  private inner?: NativeBatchIterator;
+
+  constructor(
+    inner?: NativeBatchIterator,
+    promise?: Promise<NativeBatchIterator>
+  ) {
+    // TODO: check promise reliably so we don't need to pass two arguments.
+    this.inner = inner;
+    this.promised_inner = promise;
+  }
+
+  async next(): Promise<IteratorResult<RecordBatch<any>, any>> {
+    if (this.inner === undefined) {
+      this.inner = await this.promised_inner;
+    }
+    if (this.inner === undefined) {
+      throw new Error("Invalid iterator state");
+    }
+    const n = await this.inner.next();
+    if (n == null) {
+      return Promise.resolve({ done: true, value: null });
+    }
+    const tbl = tableFromIPC(n);
+    if (tbl.batches.length != 1) {
+      throw new Error("Expected only one batch");
+    }
+    return Promise.resolve({ done: false, value: tbl.batches[0] });
   }
 }
-/* eslint-enable */

 /** Query executor */
 export class Query implements AsyncIterable<RecordBatch> {
-  private readonly tbl: Table;
-  private _filter?: string;
-  private _limit?: number;
+  private readonly inner: NativeQuery;

-  // Vector search
-  private _vector?: Float32Array;
-  private _nprobes?: number;
-  private _refine_factor?: number = 1;
-
-  constructor(tbl: Table) {
-    this.tbl = tbl;
+  constructor(tbl: NativeTable) {
+    this.inner = tbl.query();
+  }
+
+  /** Set the column to run query. */
+  column(column: string): Query {
+    this.inner.column(column);
+    return this;
   }

   /** Set the filter predicate, only returns the results that satisfy the filter.
    *
    */
   filter(predicate: string): Query {
-    this._filter = predicate;
+    this.inner.filter(predicate);
     return this;
   }

   /**
    * Select the columns to return. If not set, all columns are returned.
    */
   select(columns: string[]): Query {
+    this.inner.select(columns);
     return this;
   }
@@ -59,35 +86,67 @@ export class Query implements AsyncIterable<RecordBatch> {
   /**
    * Set the limit of rows to return.
    */
   limit(limit: number): Query {
-    this._limit = limit;
+    this.inner.limit(limit);
     return this;
   }

+  prefilter(prefilter: boolean): Query {
+    this.inner.prefilter(prefilter);
+    return this;
+  }
+
   /**
    * Set the query vector.
    */
-  vector(vector: number[]): Query {
-    this._vector = Float32Array.from(vector);
+  nearestTo(vector: number[]): Query {
+    this.inner.nearestTo(Float32Array.from(vector));
     return this;
   }

   /**
-   * Set the number of probes to use for the query.
+   * Set the number of IVF partitions to use for the query.
    */
   nprobes(nprobes: number): Query {
-    this._nprobes = nprobes;
+    this.inner.nprobes(nprobes);
     return this;
   }

   /**
    * Set the refine factor for the query.
    */
-  refine_factor(refine_factor: number): Query {
-    this._refine_factor = refine_factor;
+  refineFactor(refine_factor: number): Query {
+    this.inner.refineFactor(refine_factor);
     return this;
   }

-  [Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>, any, undefined> {
-    throw new RecordBatchIterator();
+  /**
+   * Execute the query and return the results as an AsyncIterator.
+   */
+  async executeStream(): Promise<RecordBatchIterator> {
+    const inner = await this.inner.executeStream();
+    return new RecordBatchIterator(inner);
+  }
+
+  /** Collect the results as an Arrow Table. */
+  async toArrow(): Promise<ArrowTable> {
+    const batches = [];
+    for await (const batch of this) {
+      batches.push(batch);
+    }
+    return new ArrowTable(batches);
+  }
+
+  /** Returns a JSON Array of all results.
+   *
+   */
+  async toArray(): Promise<any[]> {
+    const tbl = await this.toArrow();
+    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
+    return tbl.toArray();
+  }
+
+  [Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>> {
+    const promise = this.inner.executeStream();
+    return new RecordBatchIterator(undefined, promise);
   }
 }
@@ -95,10 +95,58 @@ export class Table {
     return builder;
   }

-  search(vector?: number[]): Query {
-    const q = new Query(this);
-    if (vector !== undefined) {
-      q.vector(vector);
+  /**
+   * Create a generic {@link Query} Builder.
+   *
+   * When appropriate, various indices and statistics based pruning will be used to
+   * accelerate the query.
+   *
+   * @example
+   *
+   * ### Run a SQL-style query
+   * ```typescript
+   * for await (const batch of table.query()
+   *   .filter("id > 1").select(["id"]).limit(20)) {
+   *   console.log(batch);
+   * }
+   * ```
+   *
+   * ### Run Top-10 vector similarity search
+   * ```typescript
+   * for await (const batch of table.query()
+   *   .nearestTo([1, 2, 3])
+   *   .refineFactor(5).nprobes(10)
+   *   .limit(10)) {
+   *   console.log(batch);
+   * }
+   * ```
+   *
+   * ### Scan the full dataset
+   * ```typescript
+   * for await (const batch of table.query()) {
+   *   console.log(batch);
+   * }
+   * ```
+   *
+   * ### Return the full dataset as Arrow Table
+   * ```typescript
+   * let arrowTbl = await table.query().nearestTo([1.0, 2.0, 0.5, 6.7]).toArrow();
+   * ```
+   *
+   * @returns {@link Query}
+   */
+  query(): Query {
+    return new Query(this.inner);
+  }
+
+  /** Search the table with a given query vector.
+   *
+   * This is a convenience method for preparing an ANN {@link Query}.
+   */
+  search(vector: number[], column?: string): Query {
+    const q = this.query();
+    q.nearestTo(vector);
+    if (column !== undefined) {
+      q.column(column);
     }
     return q;
   }
@@ -1,6 +1,6 @@
 [package]
 name = "vectordb-node"
-version = "0.4.4"
+version = "0.4.6"
 description = "Serverless, low-latency vector database for AI applications"
 license = "Apache-2.0"
 edition = "2018"
@@ -1,4 +1,4 @@
-// Copyright 2023 Lance Developers.
+// Copyright 2024 Lance Developers.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -19,19 +19,8 @@ use arrow_array::RecordBatch;
 use arrow_ipc::reader::FileReader;
 use arrow_ipc::writer::FileWriter;
 use arrow_schema::SchemaRef;
-use vectordb::table::VECTOR_COLUMN_NAME;

-use crate::error::{MissingColumnSnafu, Result};
-use snafu::prelude::*;
-
-fn validate_vector_column(record_batch: &RecordBatch) -> Result<()> {
-    record_batch
-        .column_by_name(VECTOR_COLUMN_NAME)
-        .map(|_| ())
-        .context(MissingColumnSnafu {
-            name: VECTOR_COLUMN_NAME,
-        })
-}
+use crate::error::Result;

 pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> {
     let mut batches: Vec<RecordBatch> = Vec::new();
@@ -39,7 +28,6 @@ pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBa
     let schema = file_reader.schema();
     for b in file_reader {
         let record_batch = b?;
-        validate_vector_column(&record_batch)?;
         batches.push(record_batch);
     }
     Ok((batches, schema))
@@ -19,6 +19,7 @@ use neon::{
 };

 use crate::{error::ResultExt, runtime, table::JsTable};
+use vectordb::Table;

 pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
     let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
@@ -35,7 +36,9 @@ pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsP
         let idx_result = table
             .as_native()
             .unwrap()
-            .create_scalar_index(&column, replace)
+            .create_index(&[&column])
+            .replace(replace)
+            .build()
             .await;

         deferred.settle_with(&channel, move |mut cx| {
@@ -16,6 +16,7 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
 use lance::dataset::optimize::CompactionOptions;
 use lance::dataset::{WriteMode, WriteParams};
 use lance::io::ObjectStoreParams;
+use vectordb::table::OptimizeAction;

 use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
 use neon::prelude::*;
@@ -245,27 +246,30 @@ impl JsTable {
             .map(|val| val.value(&mut cx) as i64)
             .unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks
         let older_than = chrono::Duration::minutes(older_than);
-        let delete_unverified: bool = cx
-            .argument_opt(1)
-            .and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
-            .map(|val| val.value(&mut cx))
-            .unwrap_or_default();
+        let delete_unverified: Option<bool> = Some(
+            cx.argument_opt(1)
+                .and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
+                .map(|val| val.value(&mut cx))
+                .unwrap_or_default(),
+        );

         rt.spawn(async move {
             let stats = table
                 .as_native()
                 .unwrap()
-                .cleanup_old_versions(older_than, Some(delete_unverified))
+                .optimize(OptimizeAction::Prune {
+                    older_than,
+                    delete_unverified,
+                })
                 .await;

             deferred.settle_with(&channel, move |mut cx| {
                 let stats = stats.or_throw(&mut cx)?;
+                let prune_stats = stats.prune.as_ref().expect("Prune stats missing");
                 let output_metrics = JsObject::new(&mut cx);
-                let bytes_removed = cx.number(stats.bytes_removed as f64);
+                let bytes_removed = cx.number(prune_stats.bytes_removed as f64);
                 output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;

-                let old_versions = cx.number(stats.old_versions as f64);
+                let old_versions = cx.number(prune_stats.old_versions as f64);
                 output_metrics.set(&mut cx, "oldVersions", old_versions)?;

                 let output_table = cx.boxed(JsTable::from(table));
@@ -317,13 +321,15 @@ impl JsTable {

         rt.spawn(async move {
             let stats = table
                 .as_native()
                 .unwrap()
-                .compact_files(options, None)
+                .optimize(OptimizeAction::Compact {
+                    options,
+                    remap_options: None,
+                })
                 .await;

             deferred.settle_with(&channel, move |mut cx| {
                 let stats = stats.or_throw(&mut cx)?;
+                let stats = stats.compaction.as_ref().expect("Compact stats missing");

                 let output_metrics = JsObject::new(&mut cx);
                 let fragments_removed = cx.number(stats.fragments_removed as f64);
@@ -1,6 +1,6 @@
 [package]
 name = "vectordb"
-version = "0.4.4"
+version = "0.4.6"
 edition = "2021"
 description = "LanceDB: A serverless, low-latency vector database for AI applications"
 license = "Apache-2.0"
@@ -68,6 +68,87 @@ pub trait Connection: Send + Sync {
     async fn drop_table(&self, name: &str) -> Result<()>;
 }

+#[derive(Debug)]
+pub struct ConnectOptions {
+    /// Database URI
+    ///
+    /// # Accepted URI formats
+    ///
+    /// - `/path/to/database` - local database on file system.
+    /// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
+    /// - `db://dbname` - Lance Cloud
+    pub uri: String,
+
+    /// Lance Cloud API key
+    pub api_key: Option<String>,
+    /// Lance Cloud region
+    pub region: Option<String>,
+    /// Lance Cloud host override
+    pub host_override: Option<String>,
+
+    /// The maximum number of indices to cache in memory. Defaults to 256.
+    pub index_cache_size: u32,
+}
+
+impl ConnectOptions {
+    /// Create a new [`ConnectOptions`] with the given database URI.
+    pub fn new(uri: &str) -> Self {
+        Self {
+            uri: uri.to_string(),
+            api_key: None,
+            region: None,
+            host_override: None,
+            index_cache_size: 256,
+        }
+    }
+
+    pub fn api_key(mut self, api_key: &str) -> Self {
+        self.api_key = Some(api_key.to_string());
+        self
+    }
+
+    pub fn region(mut self, region: &str) -> Self {
+        self.region = Some(region.to_string());
+        self
+    }
+
+    pub fn host_override(mut self, host_override: &str) -> Self {
+        self.host_override = Some(host_override.to_string());
+        self
+    }
+
+    pub fn index_cache_size(mut self, index_cache_size: u32) -> Self {
+        self.index_cache_size = index_cache_size;
+        self
+    }
+}
+
+/// Connect to a LanceDB database.
+///
+/// # Arguments
+///
+/// - `uri` - URI where the database is located, can be a local file or a supported remote cloud storage
+///
+/// ## Accepted URI formats
+///
+/// - `/path/to/database` - local database on file system.
+/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
+/// - `db://dbname` - Lance Cloud
+///
+pub async fn connect(uri: &str) -> Result<Arc<dyn Connection>> {
+    let options = ConnectOptions::new(uri);
+    connect_with_options(&options).await
+}
+
+/// Connect with [`ConnectOptions`].
+///
+/// # Arguments
+/// - `options` - [`ConnectOptions`] to connect to the database.
+pub async fn connect_with_options(options: &ConnectOptions) -> Result<Arc<dyn Connection>> {
+    let db = Database::connect(&options.uri).await?;
+    Ok(Arc::new(db))
+}
+
 pub struct Database {
     object_store: ObjectStore,
     query_string: Option<String>,
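Taken together, these hunks introduce a builder-style connection API. A minimal usage sketch, not part of the diff (assumptions: a tokio runtime, and the crate-root re-exports of `connect`, `connect_with_options`, and `ConnectOptions` that the lib.rs hunk further below confirms; note that `connect_with_options` currently only forwards the URI, so the cloud fields would be inert here):

```rust
use vectordb::{connect, connect_with_options, ConnectOptions};

#[tokio::main]
async fn main() -> vectordb::Result<()> {
    // Simple form: open (or create) a local database directory.
    let _db = connect("data/sample-lancedb").await?;

    // Builder form: same URI, but with a larger index cache.
    // (api_key/region/host_override exist too, but per the hunk above
    // they are not yet consumed by connect_with_options.)
    let options = ConnectOptions::new("data/sample-lancedb").index_cache_size(1024);
    let _db2 = connect_with_options(&options).await?;
    Ok(())
}
```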
@@ -14,6 +14,7 @@

 use std::{cmp::max, sync::Arc};

+use lance::index::scalar::ScalarIndexParams;
 use lance_index::{DatasetIndexExt, IndexType};
 pub use lance_linalg::distance::MetricType;

@@ -232,10 +233,14 @@ impl IndexBuilder {
         let mut dataset = tbl.clone_inner_dataset();
         match params {
             IndexParams::Scalar { replace } => {
-                self.table
-                    .as_native()
-                    .unwrap()
-                    .create_scalar_index(column, replace)
+                dataset
+                    .create_index(
+                        &[&column],
+                        IndexType::Scalar,
+                        None,
+                        &ScalarIndexParams::default(),
+                        replace,
+                    )
                     .await?
             }
             IndexParams::IvfPq {
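With `create_scalar_index` gone, callers go through the `IndexBuilder` path instead. A hedged sketch of the call pattern, mirroring the Neon binding hunk above (sketch only: it assumes `create_index(&[...])` on a `NativeTable` returns an `IndexBuilder` with `replace(...)` and `build()`, exactly as the binding uses it for its scalar-index route):

```rust
use vectordb::table::NativeTable;

// `table` is assumed to have been opened elsewhere (e.g. via open_table).
async fn index_id_column(table: &NativeTable) -> vectordb::Result<()> {
    table
        .create_index(&["id"]) // single non-vector column -> scalar index path
        .replace(true)         // overwrite an existing index on this column
        .build()
        .await
}
```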
@@ -16,10 +16,10 @@

 use std::io::Cursor;

-use arrow_array::RecordBatchReader;
-use arrow_ipc::reader::StreamReader;
+use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_ipc::{reader::StreamReader, writer::FileWriter};

-use crate::Result;
+use crate::{Error, Result};

 /// Convert an Arrow IPC file to a batch reader
 pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> {
@@ -28,6 +28,22 @@ pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> {
     Ok(reader)
 }

+/// Convert record batches to an Arrow IPC file
+pub fn batches_to_ipc_file(batches: &[RecordBatch]) -> Result<Vec<u8>> {
+    if batches.is_empty() {
+        return Err(Error::Store {
+            message: "No batches to write".to_string(),
+        });
+    }
+    let schema = batches[0].schema();
+    let mut writer = FileWriter::try_new(vec![], &schema)?;
+    for batch in batches {
+        writer.write(batch)?;
+    }
+    writer.finish()?;
+    Ok(writer.into_inner()?)
+}
+
 #[cfg(test)]
 mod tests {
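`batches_to_ipc_file` emits the Arrow IPC *file* format, which is the framing `tableFromIPC` consumes on the TypeScript side of the new `RecordBatchIterator`. A minimal round-trip sketch, not part of the diff (assuming `vectordb::ipc` is public, which the new `nodejs/src/iterator.rs` import confirms):

```rust
use std::{io::Cursor, sync::Arc};

use arrow_array::{Int32Array, RecordBatch};
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, Field, Schema};
use vectordb::ipc::batches_to_ipc_file;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    // Serialize one batch into an in-memory IPC file...
    let buf = batches_to_ipc_file(&[batch.clone()])?;

    // ...and decode it again with the Arrow file reader: one batch in, one batch out.
    let mut reader = FileReader::try_new(Cursor::new(buf), None)?;
    let decoded = reader.next().expect("expected exactly one batch")?;
    assert_eq!(decoded, batch);
    Ok(())
}
```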
@@ -41,15 +41,32 @@
 //!
 //! ### Quick Start
 //!
-//! <div class="warning">Rust API is not stable yet.</div>
+//! <div class="warning">Rust API is not stable yet, please expect breaking changes.</div>
 //!
 //! #### Connect to a database.
 //!
 //! ```rust
-//! use vectordb::connection::Database;
+//! use vectordb::connect;
 //! # use arrow_schema::{Field, Schema};
 //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
-//! let db = Database::connect("data/sample-lancedb").await.unwrap();
+//! let db = connect("data/sample-lancedb").await.unwrap();
 //! # });
 //! ```
 //!
+//! LanceDB accepts different forms of database path:
+//!
+//! - `/path/to/database` - local database on file system.
+//! - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
+//! - `db://dbname` - Lance Cloud
+//!
+//! You can also use [`ConnectOptions`] to configure the connection to the database.
+//!
+//! ```rust
+//! use vectordb::{connect_with_options, ConnectOptions};
+//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
+//! let options = ConnectOptions::new("data/sample-lancedb")
+//!     .index_cache_size(1024);
+//! let db = connect_with_options(&options).await.unwrap();
+//! # });
+//! ```
+//!
@@ -57,6 +74,8 @@
 //! It treats [`FixedSizeList<Float16/Float32>`](https://docs.rs/arrow/latest/arrow/array/struct.FixedSizeListArray.html)
 //! columns as vector columns.
 //!
+//! For more details, please refer to [LanceDB documentation](https://lancedb.github.io/lancedb/).
+//!
 //! #### Create a table
 //!
 //! To create a Table, you need to provide a [`arrow_schema::Schema`] and a [`arrow_array::RecordBatch`] stream.
@@ -67,10 +86,11 @@
 //! use arrow_array::{RecordBatch, RecordBatchIterator};
 //! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
-//! # use vectordb::connection::{Database, Connection};
+//! # use vectordb::connect;
 //!
 //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
 //! # let tmpdir = tempfile::tempdir().unwrap();
-//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
+//! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
 //! let schema = Arc::new(Schema::new(vec![
 //!     Field::new("id", DataType::Int32, false),
 //!     Field::new("vector", DataType::FixedSizeList(
@@ -94,13 +114,13 @@
 //!
 //! ```no_run
 //! # use std::sync::Arc;
-//! # use vectordb::connection::{Database, Connection};
+//! # use vectordb::connect;
 //! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
 //! #    RecordBatchIterator, Int32Array};
 //! # use arrow_schema::{Schema, Field, DataType};
 //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
 //! # let tmpdir = tempfile::tempdir().unwrap();
-//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
+//! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
 //! # let tbl = db.open_table("idx_test").await.unwrap();
 //! tbl.create_index(&["vector"])
 //!     .ivf_pq()
@@ -166,4 +186,6 @@ pub use connection::{Connection, Database};
 pub use error::{Error, Result};
 pub use table::{Table, TableRef};

+/// Connect to a database
+pub use connection::{connect, connect_with_options, ConnectOptions};
 pub use lance::dataset::WriteMode;
@@ -22,6 +22,7 @@ use lance_linalg::distance::MetricType;

 use crate::error::Result;
 use crate::utils::default_vector_column;
+use crate::Error;

 const DEFAULT_TOP_K: usize = 10;

@@ -93,6 +94,19 @@ impl Query {
             let arrow_schema = Schema::from(self.dataset.schema());
             default_vector_column(&arrow_schema, Some(query.len() as i32))?
         };
+        let field = self.dataset.schema().field(&column).ok_or(Error::Store {
+            message: format!("Column {} not found in dataset schema", column),
+        })?;
+        if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
+        {
+            return Err(Error::Store {
+                message: format!(
+                    "Vector column '{}' does not match the dimension of the query vector: dim={}",
+                    column,
+                    query.len(),
+                ),
+            });
+        }
         scanner.nearest(&column, query, self.limit.unwrap_or(DEFAULT_TOP_K))?;
     } else {
         // If there is no vector query, it's ok to not have a limit
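The validation added here is what the "does not match the dimension" jest assertion above exercises: a column can only serve a query of length `dim` if it is a `FixedSizeList` of a floating-point type with exactly `dim` items. A self-contained restatement of the same predicate (the function name here is illustrative, not part of the crate):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field};

/// True iff `dt` is a FixedSizeList of a floating-point item type
/// whose fixed size equals the query length `dim`.
fn is_valid_vector_column(dt: &DataType, dim: i32) -> bool {
    matches!(dt, DataType::FixedSizeList(f, d) if f.data_type().is_floating() && *d == dim)
}

fn main() {
    let item = Arc::new(Field::new("item", DataType::Float32, true));
    let vec32 = DataType::FixedSizeList(item, 32);
    assert!(is_valid_vector_column(&vec32, 32));
    assert!(!is_valid_vector_column(&vec32, 64)); // the dimension-mismatch case
}
```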
@@ -27,9 +27,9 @@ use lance::dataset::optimize::{
 };
 pub use lance::dataset::ReadParams;
 use lance::dataset::{Dataset, UpdateBuilder, WriteParams};
-use lance::index::scalar::ScalarIndexParams;
 use lance::io::WrappingObjectStore;
-use lance_index::{optimize::OptimizeOptions, DatasetIndexExt, IndexType};
+use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
 use log::info;

 use crate::error::{Error, Result};
 use crate::index::vector::{VectorIndex, VectorIndexStatistics};
@@ -38,7 +38,46 @@ use crate::query::Query;
 use crate::utils::{PatchReadParam, PatchWriteParam};
 use crate::WriteMode;

 pub const VECTOR_COLUMN_NAME: &str = "vector";
+
+/// Optimize the dataset.
+///
+/// Similar to `VACUUM` in PostgreSQL, it offers different options to
+/// optimize different parts of the table on disk.
+///
+/// By default, it optimizes everything, as [`OptimizeAction::All`].
+pub enum OptimizeAction {
+    /// Run optimization on everything, with default options.
+    All,
+    /// Compact files in the dataset
+    Compact {
+        options: CompactionOptions,
+        remap_options: Option<Arc<dyn IndexRemapperOptions>>,
+    },
+    /// Prune old versions of the dataset.
+    Prune {
+        /// The duration of time to keep versions of the dataset.
+        older_than: Duration,
+        /// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default.
+        /// If you are sure that there are no in-progress transactions, then you can set this to True to delete all files older than `older_than`.
+        delete_unverified: Option<bool>,
+    },
+    /// Optimize index.
+    Index(OptimizeOptions),
+}
+
+impl Default for OptimizeAction {
+    fn default() -> Self {
+        Self::All
+    }
+}
+
+/// Statistics about the optimization.
+pub struct OptimizeStats {
+    /// Stats of the file compaction.
+    pub compaction: Option<CompactionMetrics>,
+
+    /// Stats of the version pruning
+    pub prune: Option<RemovalStats>,
+}

 /// A Table is a collection of strongly typed Rows.
 ///
@@ -194,6 +233,14 @@ pub trait Table: std::fmt::Display + Send + Sync {
     /// # });
     /// ```
     fn query(&self) -> Query;
+
+    /// Optimize the on-disk data and indices for better performance.
+    ///
+    /// <section class="warning">Experimental API</section>
+    ///
+    /// Modeled after ``VACUUM`` in PostgreSQL.
+    /// Not all implementations support explicit optimization.
+    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
 }

 /// Reference to a Table pointer.
@@ -396,17 +443,8 @@ impl NativeTable {
         self.dataset.lock().expect("lock poison").version().version
     }

-    /// Create a scalar index on the table
-    pub async fn create_scalar_index(&self, column: &str, replace: bool) -> Result<()> {
-        let mut dataset = self.clone_inner_dataset();
-        let params = ScalarIndexParams::default();
-        dataset
-            .create_index(&[column], IndexType::Scalar, None, &params, replace)
-            .await?;
-        Ok(())
-    }
-
-    pub async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
+    async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
         info!("LanceDB: optimizing indices: {:?}", options);
         let mut dataset = self.clone_inner_dataset();
         dataset.optimize_indices(options).await?;
@@ -463,7 +501,7 @@ impl NativeTable {
     ///
     /// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
     /// returns the result.
-    pub async fn cleanup_old_versions(
+    async fn cleanup_old_versions(
         &self,
         older_than: Duration,
         delete_unverified: Option<bool>,
@@ -480,7 +518,7 @@ impl NativeTable {
     /// for faster reads.
     ///
     /// This calls into [lance::dataset::optimize::compact_files].
-    pub async fn compact_files(
+    async fn compact_files(
         &self,
         options: CompactionOptions,
         remap_options: Option<Arc<dyn IndexRemapperOptions>>,
@@ -614,6 +652,52 @@ impl Table for NativeTable {
         self.reset_dataset(dataset);
         Ok(())
     }
+
+    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
+        let mut stats = OptimizeStats {
+            compaction: None,
+            prune: None,
+        };
+        match action {
+            OptimizeAction::All => {
+                stats.compaction = self
+                    .optimize(OptimizeAction::Compact {
+                        options: CompactionOptions::default(),
+                        remap_options: None,
+                    })
+                    .await?
+                    .compaction;
+                stats.prune = self
+                    .optimize(OptimizeAction::Prune {
+                        older_than: Duration::days(7),
+                        delete_unverified: None,
+                    })
+                    .await?
+                    .prune;
+                self.optimize(OptimizeAction::Index(OptimizeOptions::default()))
+                    .await?;
+            }
+            OptimizeAction::Compact {
+                options,
+                remap_options,
+            } => {
+                stats.compaction = Some(self.compact_files(options, remap_options).await?);
+            }
+            OptimizeAction::Prune {
+                older_than,
+                delete_unverified,
+            } => {
+                stats.prune = Some(
+                    self.cleanup_old_versions(older_than, delete_unverified)
+                        .await?,
+                );
+            }
+            OptimizeAction::Index(options) => {
+                self.optimize_indices(&options).await?;
+            }
+        }
+        Ok(stats)
+    }
 }

 #[cfg(test)]