Compare commits

...

9 Commits

Author SHA1 Message Date
Lance Release
ba6f949515 Bump version: 0.4.5 → 0.4.6 2024-01-26 22:40:36 +00:00
Lei Xu
3dd8522bc9 feat(rust): provide connect and connect_with_options in Rust SDK (#871)
* Bring the feature parity of Rust connect methods.
* A global connect method that can connect to local and remote / cloud
table, as the same as in js/python today.
2024-01-26 11:40:11 -08:00
Lei Xu
e01ef63488 chore(rust): simplified version of optimize (#869)
Consolidate various optimize() into one method, similar to postgres
VACCUM in the process of preparing Rust API for public use
2024-01-26 11:36:04 -08:00
Lei Xu
a6cf24b359 feat(napi): Issue queries as node SDK (#868)
* Query as a fluent API and `AsyncIterator<RecordBatch>`
* Much more docs
* Add tests for auto infer vector search columns with different
dimensions.
2024-01-25 22:14:14 -08:00
Lance Release
9a07c9aad8 Updating package-lock.json 2024-01-25 21:49:36 +00:00
Lance Release
d405798952 Updating package-lock.json 2024-01-25 20:54:55 +00:00
Lance Release
e8a8b92b2a Bump version: 0.4.4 → 0.4.5 2024-01-25 20:54:44 +00:00
Lei Xu
66362c6506 fix: release build for node sdk (#861) 2024-01-25 12:51:32 -08:00
Lance Release
5228ca4b6b Updating package-lock.json 2024-01-25 19:53:05 +00:00
25 changed files with 645 additions and 160 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion] [bumpversion]
current_version = 0.4.4 current_version = 0.4.6
commit = True commit = True
message = Bump version: {current_version} → {new_version} message = Bump version: {current_version} → {new_version}
tag = True tag = True

44
node/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{ {
"name": "vectordb", "name": "vectordb",
"version": "0.4.3", "version": "0.4.5",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "vectordb", "name": "vectordb",
"version": "0.4.3", "version": "0.4.5",
"cpu": [ "cpu": [
"x64", "x64",
"arm64" "arm64"
@@ -53,11 +53,11 @@
"uuid": "^9.0.0" "uuid": "^9.0.0"
}, },
"optionalDependencies": { "optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.3", "@lancedb/vectordb-darwin-arm64": "0.4.5",
"@lancedb/vectordb-darwin-x64": "0.4.3", "@lancedb/vectordb-darwin-x64": "0.4.5",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.3", "@lancedb/vectordb-linux-arm64-gnu": "0.4.5",
"@lancedb/vectordb-linux-x64-gnu": "0.4.3", "@lancedb/vectordb-linux-x64-gnu": "0.4.5",
"@lancedb/vectordb-win32-x64-msvc": "0.4.3" "@lancedb/vectordb-win32-x64-msvc": "0.4.5"
} }
}, },
"node_modules/@75lb/deep-merge": { "node_modules/@75lb/deep-merge": {
@@ -329,9 +329,9 @@
} }
}, },
"node_modules/@lancedb/vectordb-darwin-arm64": { "node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.4.3", "version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.3.tgz", "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.5.tgz",
"integrity": "sha512-47CvvSaV1EdUsFEpXUJApTk+hMzAhCxVizipCFUlXCgcmzpCDL86wNgJij/X9a+j6zADhIX//Lsu0qd/an/Bpw==", "integrity": "sha512-sR+Q9dRBzMm+NGqM7EiK07c7pQz/V4J//23p05CeO/YATjKYyU3jE/dmVenLjJGW2UUrRYiyUQ9X6Up+OOgdhA==",
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
@@ -341,9 +341,9 @@
] ]
}, },
"node_modules/@lancedb/vectordb-darwin-x64": { "node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.4.3", "version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.3.tgz", "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.5.tgz",
"integrity": "sha512-UlZZv8CmJIuRJNJG+Y1VmFsGyPR8W/72Q5EwgMMsSES6zpMQ9pNdBDWhL3UGX6nMRgnbprkwYiWJ3xHhJvtqtw==", "integrity": "sha512-/BIyUeVkLaUlOEQN4HUQ9J9ZdNWkDpZPUUS9kfz5iYIjotgwpSfznF8Q1GY5BVuXa2ke7GC3tnkwwd5ZMOuDsA==",
"cpu": [ "cpu": [
"x64" "x64"
], ],
@@ -353,9 +353,9 @@
] ]
}, },
"node_modules/@lancedb/vectordb-linux-arm64-gnu": { "node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.4.3", "version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.3.tgz", "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.5.tgz",
"integrity": "sha512-L6NVJr/lKEd8+904FzZNpT8BGQMs2cHNYbGJMIaVvGnMiIJgKAFKtOyGtdDjoe1xRZoEw21yjRGksGbnRO5wHQ==", "integrity": "sha512-bq8vX7znIf2Dap41YIbB5uA/YahwaLvFPNH0WmwqeBWxF64/AJ74DsZk51ftwczQMsyLK74M8f1PzniapMAR+Q==",
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
@@ -365,9 +365,9 @@
] ]
}, },
"node_modules/@lancedb/vectordb-linux-x64-gnu": { "node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.4.3", "version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.3.tgz", "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.5.tgz",
"integrity": "sha512-OBx3WF3pK0xNfFJeErmuD9R2QWLa3XdeZspyTsIrQmBDeKj3HKh8y7Scpx4NH5Y09+9JNqRRKRZN7OqWTYhITg==", "integrity": "sha512-5qCWFyxihyMDYIGRAdQ7zv3enBEDxPR08dCmXr2Bu9yYI3SUqfuSvFX1NwflVeB+RzRMMbeG4xiaEbo7H7/H3Q==",
"cpu": [ "cpu": [
"x64" "x64"
], ],
@@ -377,9 +377,9 @@
] ]
}, },
"node_modules/@lancedb/vectordb-win32-x64-msvc": { "node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.4.3", "version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.3.tgz", "resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.5.tgz",
"integrity": "sha512-n9IvR81NXZKnSN91mrgeXbEyCiGM+YLJpOgbdHoEtMP04VDnS+iSU4jGOtQBKErvWeCJQaGFQ9qzdcVchpRGyw==", "integrity": "sha512-z3dZ6TDzm2EU5gNuejshArs3o84v1rdXnds22TTuc9fVhwg5JG87FyHFZKU1MGuyLuZW22Me0YDuS9VR+eAp0Q==",
"cpu": [ "cpu": [
"x64" "x64"
], ],

View File

@@ -1,12 +1,12 @@
{ {
"name": "vectordb", "name": "vectordb",
"version": "0.4.4", "version": "0.4.6",
"description": " Serverless, low-latency vector database for AI applications", "description": " Serverless, low-latency vector database for AI applications",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
"scripts": { "scripts": {
"tsc": "tsc -b", "tsc": "tsc -b",
"build": "cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json && tsc -b", "build": "npm run tsc && cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json",
"build-release": "npm run build -- --release", "build-release": "npm run build -- --release",
"test": "npm run tsc && mocha -recursive dist/test", "test": "npm run tsc && mocha -recursive dist/test",
"integration-test": "npm run tsc && mocha -recursive dist/integration_test", "integration-test": "npm run tsc && mocha -recursive dist/integration_test",
@@ -81,10 +81,10 @@
} }
}, },
"optionalDependencies": { "optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.4", "@lancedb/vectordb-darwin-arm64": "0.4.6",
"@lancedb/vectordb-darwin-x64": "0.4.4", "@lancedb/vectordb-darwin-x64": "0.4.6",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.4", "@lancedb/vectordb-linux-arm64-gnu": "0.4.6",
"@lancedb/vectordb-linux-x64-gnu": "0.4.4", "@lancedb/vectordb-linux-x64-gnu": "0.4.6",
"@lancedb/vectordb-win32-x64-msvc": "0.4.4" "@lancedb/vectordb-win32-x64-msvc": "0.4.6"
} }
} }

View File

@@ -391,24 +391,6 @@ describe('LanceDB client', function () {
}) })
}).timeout(120000) }).timeout(120000)
it('fails to create a new table when the vector column is missing', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)
const data = [
{
id: 1,
price: 10
}
]
const create = con.createTable('missing_vector', data)
await expect(create).to.be.rejectedWith(
Error,
"column 'vector' is missing"
)
})
it('use overwrite flag to overwrite existing table', async function () { it('use overwrite flag to overwrite existing table', async function () {
const dir = await track().mkdir('lancejs') const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir) const con = await lancedb.connect(dir)

View File

@@ -10,14 +10,15 @@ crate-type = ["cdylib"]
[dependencies] [dependencies]
arrow-ipc.workspace = true arrow-ipc.workspace = true
futures.workspace = true
lance-linalg.workspace = true
lance.workspace = true
vectordb = { path = "../rust/vectordb" }
napi = { version = "2.14", default-features = false, features = [ napi = { version = "2.14", default-features = false, features = [
"napi7", "napi7",
"async" "async"
] } ] }
napi-derive = "2.14" napi-derive = "2.14"
vectordb = { path = "../rust/vectordb" }
lance.workspace = true
lance-linalg.workspace = true
[build-dependencies] [build-dependencies]
napi-build = "2.1" napi-build = "2.1"

View File

@@ -53,6 +53,16 @@ describe("Test creating index", () => {
const indexDir = path.join(tmpDir, "test.lance", "_indices"); const indexDir = path.join(tmpDir, "test.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1); expect(fs.readdirSync(indexDir)).toHaveLength(1);
// TODO: check index type. // TODO: check index type.
// Search without specifying the column
let query_vector = data.toArray()[5].vec.toJSON();
let rst = await tbl.query().nearestTo(query_vector).limit(2).toArrow();
expect(rst.numRows).toBe(2);
// Search with specifying the column
let rst2 = await tbl.search(query_vector, "vec").limit(2).toArrow();
expect(rst2.numRows).toBe(2);
expect(rst.toString()).toEqual(rst2.toString());
}); });
test("no vector column available", async () => { test("no vector column available", async () => {
@@ -71,6 +81,80 @@ describe("Test creating index", () => {
await tbl.createIndex("val").build(); await tbl.createIndex("val").build();
const indexDir = path.join(tmpDir, "no_vec.lance", "_indices"); const indexDir = path.join(tmpDir, "no_vec.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1); expect(fs.readdirSync(indexDir)).toHaveLength(1);
for await (const r of tbl.query().filter("id > 1").select(["id"])) {
expect(r.numRows).toBe(1);
}
});
test("two columns with different dimensions", async () => {
const db = await connect(tmpDir);
const schema = new Schema([
new Field("id", new Int32(), true),
new Field("vec", new FixedSizeList(32, new Field("item", new Float32()))),
new Field(
"vec2",
new FixedSizeList(64, new Field("item", new Float32()))
),
]);
const tbl = await db.createTable(
"two_vectors",
makeArrowTable(
Array(300)
.fill(1)
.map((_, i) => ({
id: i,
vec: Array(32)
.fill(1)
.map(() => Math.random()),
vec2: Array(64) // different dimension
.fill(1)
.map(() => Math.random()),
})),
{ schema }
)
);
// Only build index over v1
await expect(tbl.createIndex().build()).rejects.toThrow(
/.*More than one vector columns found.*/
);
tbl
.createIndex("vec")
.ivf_pq({ num_partitions: 2, num_sub_vectors: 2 })
.build();
const rst = await tbl
.query()
.nearestTo(
Array(32)
.fill(1)
.map(() => Math.random())
)
.limit(2)
.toArrow();
expect(rst.numRows).toBe(2);
// Search with specifying the column
await expect(
tbl
.search(
Array(64)
.fill(1)
.map(() => Math.random()),
"vec"
)
.limit(2)
.toArrow()
).rejects.toThrow(/.*does not match the dimension.*/);
const query64 = Array(64)
.fill(1)
.map(() => Math.random());
const rst64_1 = await tbl.query().nearestTo(query64).limit(2).toArrow();
const rst64_2 = await tbl.search(query64, "vec2").limit(2).toArrow();
expect(rst64_1.toString()).toEqual(rst64_2.toString());
expect(rst64_1.numRows).toBe(2);
}); });
test("create scalar index", async () => { test("create scalar index", async () => {

View File

@@ -91,7 +91,6 @@ impl IndexBuilder {
#[napi] #[napi]
pub async fn build(&self) -> napi::Result<()> { pub async fn build(&self) -> napi::Result<()> {
println!("nodejs::index.rs : build");
self.inner self.inner
.build() .build()
.await .await

47
nodejs/src/iterator.rs Normal file
View File

@@ -0,0 +1,47 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::StreamExt;
use lance::io::RecordBatchStream;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::ipc::batches_to_ipc_file;
/** Typescript-style Async Iterator over RecordBatches */
#[napi]
pub struct RecordBatchIterator {
inner: Box<dyn RecordBatchStream + Unpin>,
}
#[napi]
impl RecordBatchIterator {
pub(crate) fn new(inner: Box<dyn RecordBatchStream + Unpin>) -> Self {
Self { inner }
}
#[napi]
pub async unsafe fn next(&mut self) -> napi::Result<Option<Buffer>> {
if let Some(rst) = self.inner.next().await {
let batch = rst.map_err(|e| {
napi::Error::from_reason(format!("Failed to get next batch from stream: {}", e))
})?;
batches_to_ipc_file(&[batch])
.map_err(|e| napi::Error::from_reason(format!("Failed to write IPC file: {}", e)))
.map(|buf| Some(Buffer::from(buf)))
} else {
// We are done with the stream.
Ok(None)
}
}
}

View File

@@ -17,6 +17,7 @@ use napi_derive::*;
mod connection; mod connection;
mod index; mod index;
mod iterator;
mod query; mod query;
mod table; mod table;

View File

@@ -16,7 +16,7 @@ use napi::bindgen_prelude::*;
use napi_derive::napi; use napi_derive::napi;
use vectordb::query::Query as LanceDBQuery; use vectordb::query::Query as LanceDBQuery;
use crate::table::Table; use crate::{iterator::RecordBatchIterator, table::Table};
#[napi] #[napi]
pub struct Query { pub struct Query {
@@ -32,17 +32,50 @@ impl Query {
} }
#[napi] #[napi]
pub fn vector(&mut self, vector: Float32Array) { pub fn column(&mut self, column: String) {
let inn = self.inner.clone().nearest_to(&vector); self.inner = self.inner.clone().column(&column);
self.inner = inn;
} }
#[napi] #[napi]
pub fn to_arrow(&self) -> napi::Result<()> { pub fn filter(&mut self, filter: String) {
// let buf = self.inner.to_arrow().map_err(|e| { self.inner = self.inner.clone().filter(filter);
// napi::Error::from_reason(format!("Failed to convert query to arrow: {}", e)) }
// })?;
// Ok(buf) #[napi]
todo!() pub fn select(&mut self, columns: Vec<String>) {
self.inner = self.inner.clone().select(&columns);
}
#[napi]
pub fn limit(&mut self, limit: u32) {
self.inner = self.inner.clone().limit(limit as usize);
}
#[napi]
pub fn prefilter(&mut self, prefilter: bool) {
self.inner = self.inner.clone().prefilter(prefilter);
}
#[napi]
pub fn nearest_to(&mut self, vector: Float32Array) {
self.inner = self.inner.clone().nearest_to(&vector);
}
#[napi]
pub fn refine_factor(&mut self, refine_factor: u32) {
self.inner = self.inner.clone().refine_factor(refine_factor);
}
#[napi]
pub fn nprobes(&mut self, nprobe: u32) {
self.inner = self.inner.clone().nprobes(nprobe as usize);
}
#[napi]
pub async fn execute_stream(&self) -> napi::Result<RecordBatchIterator> {
let inner_stream = self.inner.execute_stream().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
})?;
Ok(RecordBatchIterator::new(Box::new(inner_stream)))
} }
} }

View File

@@ -54,9 +54,20 @@ export class IndexBuilder {
scalar(): void scalar(): void
build(): Promise<void> build(): Promise<void>
} }
/** Typescript-style Async Iterator over RecordBatches */
export class RecordBatchIterator {
next(): Promise<Buffer | null>
}
export class Query { export class Query {
vector(vector: Float32Array): void column(column: string): void
toArrow(): void filter(filter: string): void
select(columns: Array<string>): void
limit(limit: number): void
prefilter(prefilter: boolean): void
nearestTo(vector: Float32Array): void
refineFactor(refineFactor: number): void
nprobes(nprobe: number): void
executeStream(): Promise<RecordBatchIterator>
} }
export class Table { export class Table {
/** Return Schema as empty Arrow IPC file. */ /** Return Schema as empty Arrow IPC file. */

View File

@@ -295,12 +295,13 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`) throw new Error(`Failed to load native binding`)
} }
const { Connection, IndexType, MetricType, IndexBuilder, Query, Table, WriteMode, connect } = nativeBinding const { Connection, IndexType, MetricType, IndexBuilder, RecordBatchIterator, Query, Table, WriteMode, connect } = nativeBinding
module.exports.Connection = Connection module.exports.Connection = Connection
module.exports.IndexType = IndexType module.exports.IndexType = IndexType
module.exports.MetricType = MetricType module.exports.MetricType = MetricType
module.exports.IndexBuilder = IndexBuilder module.exports.IndexBuilder = IndexBuilder
module.exports.RecordBatchIterator = RecordBatchIterator
module.exports.Query = Query module.exports.Query = Query
module.exports.Table = Table module.exports.Table = Table
module.exports.WriteMode = WriteMode module.exports.WriteMode = WriteMode

View File

@@ -12,46 +12,73 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
import { RecordBatch } from "apache-arrow"; import { RecordBatch, tableFromIPC, Table as ArrowTable } from "apache-arrow";
import { Table } from "./table"; import {
RecordBatchIterator as NativeBatchIterator,
Query as NativeQuery,
Table as NativeTable,
} from "./native";
// TODO: re-eanble eslint once we have a real implementation
/* eslint-disable */
class RecordBatchIterator implements AsyncIterator<RecordBatch> { class RecordBatchIterator implements AsyncIterator<RecordBatch> {
next( private promised_inner?: Promise<NativeBatchIterator>;
...args: [] | [undefined] private inner?: NativeBatchIterator;
): Promise<IteratorResult<RecordBatch<any>, any>> {
throw new Error("Method not implemented."); constructor(
inner?: NativeBatchIterator,
promise?: Promise<NativeBatchIterator>
) {
// TODO: check promise reliably so we dont need to pass two arguments.
this.inner = inner;
this.promised_inner = promise;
} }
return?(value?: any): Promise<IteratorResult<RecordBatch<any>, any>> {
throw new Error("Method not implemented."); async next(): Promise<IteratorResult<RecordBatch<any>, any>> {
} if (this.inner === undefined) {
throw?(e?: any): Promise<IteratorResult<RecordBatch<any>, any>> { this.inner = await this.promised_inner;
throw new Error("Method not implemented."); }
if (this.inner === undefined) {
throw new Error("Invalid iterator state state");
}
const n = await this.inner.next();
if (n == null) {
return Promise.resolve({ done: true, value: null });
}
const tbl = tableFromIPC(n);
if (tbl.batches.length != 1) {
throw new Error("Expected only one batch");
}
return Promise.resolve({ done: false, value: tbl.batches[0] });
} }
} }
/* eslint-enable */ /* eslint-enable */
/** Query executor */ /** Query executor */
export class Query implements AsyncIterable<RecordBatch> { export class Query implements AsyncIterable<RecordBatch> {
private readonly tbl: Table; private readonly inner: NativeQuery;
private _filter?: string;
private _limit?: number;
// Vector search constructor(tbl: NativeTable) {
private _vector?: Float32Array; this.inner = tbl.query();
private _nprobes?: number; }
private _refine_factor?: number = 1;
constructor(tbl: Table) { /** Set the column to run query. */
this.tbl = tbl; column(column: string): Query {
this.inner.column(column);
return this;
} }
/** Set the filter predicate, only returns the results that satisfy the filter. /** Set the filter predicate, only returns the results that satisfy the filter.
* *
*/ */
filter(predicate: string): Query { filter(predicate: string): Query {
this._filter = predicate; this.inner.filter(predicate);
return this;
}
/**
* Select the columns to return. If not set, all columns are returned.
*/
select(columns: string[]): Query {
this.inner.select(columns);
return this; return this;
} }
@@ -59,35 +86,67 @@ export class Query implements AsyncIterable<RecordBatch> {
* Set the limit of rows to return. * Set the limit of rows to return.
*/ */
limit(limit: number): Query { limit(limit: number): Query {
this._limit = limit; this.inner.limit(limit);
return this;
}
prefilter(prefilter: boolean): Query {
this.inner.prefilter(prefilter);
return this; return this;
} }
/** /**
* Set the query vector. * Set the query vector.
*/ */
vector(vector: number[]): Query { nearestTo(vector: number[]): Query {
this._vector = Float32Array.from(vector); this.inner.nearestTo(Float32Array.from(vector));
return this; return this;
} }
/** /**
* Set the number of probes to use for the query. * Set the number of IVF partitions to use for the query.
*/ */
nprobes(nprobes: number): Query { nprobes(nprobes: number): Query {
this._nprobes = nprobes; this.inner.nprobes(nprobes);
return this; return this;
} }
/** /**
* Set the refine factor for the query. * Set the refine factor for the query.
*/ */
refine_factor(refine_factor: number): Query { refineFactor(refine_factor: number): Query {
this._refine_factor = refine_factor; this.inner.refineFactor(refine_factor);
return this; return this;
} }
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>, any, undefined> { /**
throw new RecordBatchIterator(); * Execute the query and return the results as an AsyncIterator.
*/
async executeStream(): Promise<RecordBatchIterator> {
const inner = await this.inner.executeStream();
return new RecordBatchIterator(inner);
}
/** Collect the results as an Arrow Table. */
async toArrow(): Promise<ArrowTable> {
const batches = [];
for await (const batch of this) {
batches.push(batch);
}
return new ArrowTable(batches);
}
/** Returns a JSON Array of All results.
*
*/
async toArray(): Promise<any[]> {
const tbl = await this.toArrow();
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return tbl.toArray();
}
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>> {
const promise = this.inner.executeStream();
return new RecordBatchIterator(undefined, promise);
} }
} }

View File

@@ -95,10 +95,58 @@ export class Table {
return builder; return builder;
} }
search(vector?: number[]): Query { /**
const q = new Query(this); * Create a generic {@link Query} Builder.
if (vector !== undefined) { *
q.vector(vector); * When appropriate, various indices and statistics based pruning will be used to
* accelerate the query.
*
* @example
*
* ### Run a SQL-style query
* ```typescript
* for await (const batch of table.query()
* .filter("id > 1").select(["id"]).limit(20)) {
* console.log(batch);
* }
* ```
*
* ### Run Top-10 vector similarity search
* ```typescript
* for await (const batch of table.query()
* .nearestTo([1, 2, 3])
* .refineFactor(5).nprobe(10)
* .limit(10)) {
* console.log(batch);
* }
*```
*
* ### Scan the full dataset
* ```typescript
* for await (const batch of table.query()) {
* console.log(batch);
* }
*
* ### Return the full dataset as Arrow Table
* ```typescript
* let arrowTbl = await table.query().nearestTo([1.0, 2.0, 0.5, 6.7]).toArrow();
* ```
*
* @returns {@link Query}
*/
query(): Query {
return new Query(this.inner);
}
/** Search the table with a given query vector.
*
* This is a convenience method for preparing an ANN {@link Query}.
*/
search(vector: number[], column?: string): Query {
const q = this.query();
q.nearestTo(vector);
if (column !== undefined) {
q.column(column);
} }
return q; return q;
} }

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "vectordb-node" name = "vectordb-node"
version = "0.4.4" version = "0.4.6"
description = "Serverless, low-latency vector database for AI applications" description = "Serverless, low-latency vector database for AI applications"
license = "Apache-2.0" license = "Apache-2.0"
edition = "2018" edition = "2018"

View File

@@ -1,4 +1,4 @@
// Copyright 2023 Lance Developers. // Copyright 2024 Lance Developers.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@@ -19,19 +19,8 @@ use arrow_array::RecordBatch;
use arrow_ipc::reader::FileReader; use arrow_ipc::reader::FileReader;
use arrow_ipc::writer::FileWriter; use arrow_ipc::writer::FileWriter;
use arrow_schema::SchemaRef; use arrow_schema::SchemaRef;
use vectordb::table::VECTOR_COLUMN_NAME;
use crate::error::{MissingColumnSnafu, Result}; use crate::error::Result;
use snafu::prelude::*;
fn validate_vector_column(record_batch: &RecordBatch) -> Result<()> {
record_batch
.column_by_name(VECTOR_COLUMN_NAME)
.map(|_| ())
.context(MissingColumnSnafu {
name: VECTOR_COLUMN_NAME,
})
}
pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> { pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> {
let mut batches: Vec<RecordBatch> = Vec::new(); let mut batches: Vec<RecordBatch> = Vec::new();
@@ -39,7 +28,6 @@ pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBa
let schema = file_reader.schema(); let schema = file_reader.schema();
for b in file_reader { for b in file_reader {
let record_batch = b?; let record_batch = b?;
validate_vector_column(&record_batch)?;
batches.push(record_batch); batches.push(record_batch);
} }
Ok((batches, schema)) Ok((batches, schema))

View File

@@ -19,6 +19,7 @@ use neon::{
}; };
use crate::{error::ResultExt, runtime, table::JsTable}; use crate::{error::ResultExt, runtime, table::JsTable};
use vectordb::Table;
pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> { pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?; let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
@@ -35,7 +36,9 @@ pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsP
let idx_result = table let idx_result = table
.as_native() .as_native()
.unwrap() .unwrap()
.create_scalar_index(&column, replace) .create_index(&[&column])
.replace(replace)
.build()
.await; .await;
deferred.settle_with(&channel, move |mut cx| { deferred.settle_with(&channel, move |mut cx| {

View File

@@ -16,6 +16,7 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions; use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams}; use lance::dataset::{WriteMode, WriteParams};
use lance::io::ObjectStoreParams; use lance::io::ObjectStoreParams;
use vectordb::table::OptimizeAction;
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer}; use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use neon::prelude::*; use neon::prelude::*;
@@ -245,27 +246,30 @@ impl JsTable {
.map(|val| val.value(&mut cx) as i64) .map(|val| val.value(&mut cx) as i64)
.unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks .unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks
let older_than = chrono::Duration::minutes(older_than); let older_than = chrono::Duration::minutes(older_than);
let delete_unverified: bool = cx let delete_unverified: Option<bool> = Some(
.argument_opt(1) cx.argument_opt(1)
.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok()) .and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
.map(|val| val.value(&mut cx)) .map(|val| val.value(&mut cx))
.unwrap_or_default(); .unwrap_or_default(),
);
rt.spawn(async move { rt.spawn(async move {
let stats = table let stats = table
.as_native() .optimize(OptimizeAction::Prune {
.unwrap() older_than,
.cleanup_old_versions(older_than, Some(delete_unverified)) delete_unverified,
})
.await; .await;
deferred.settle_with(&channel, move |mut cx| { deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?; let stats = stats.or_throw(&mut cx)?;
let prune_stats = stats.prune.as_ref().expect("Prune stats missing");
let output_metrics = JsObject::new(&mut cx); let output_metrics = JsObject::new(&mut cx);
let bytes_removed = cx.number(stats.bytes_removed as f64); let bytes_removed = cx.number(prune_stats.bytes_removed as f64);
output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?; output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;
let old_versions = cx.number(stats.old_versions as f64); let old_versions = cx.number(prune_stats.old_versions as f64);
output_metrics.set(&mut cx, "oldVersions", old_versions)?; output_metrics.set(&mut cx, "oldVersions", old_versions)?;
let output_table = cx.boxed(JsTable::from(table)); let output_table = cx.boxed(JsTable::from(table));
@@ -317,13 +321,15 @@ impl JsTable {
rt.spawn(async move { rt.spawn(async move {
let stats = table let stats = table
.as_native() .optimize(OptimizeAction::Compact {
.unwrap() options,
.compact_files(options, None) remap_options: None,
})
.await; .await;
deferred.settle_with(&channel, move |mut cx| { deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?; let stats = stats.or_throw(&mut cx)?;
let stats = stats.compaction.as_ref().expect("Compact stats missing");
let output_metrics = JsObject::new(&mut cx); let output_metrics = JsObject::new(&mut cx);
let fragments_removed = cx.number(stats.fragments_removed as f64); let fragments_removed = cx.number(stats.fragments_removed as f64);

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "vectordb" name = "vectordb"
version = "0.4.4" version = "0.4.6"
edition = "2021" edition = "2021"
description = "LanceDB: A serverless, low-latency vector database for AI applications" description = "LanceDB: A serverless, low-latency vector database for AI applications"
license = "Apache-2.0" license = "Apache-2.0"

View File

@@ -68,6 +68,87 @@ pub trait Connection: Send + Sync {
async fn drop_table(&self, name: &str) -> Result<()>; async fn drop_table(&self, name: &str) -> Result<()>;
} }
#[derive(Debug)]
pub struct ConnectOptions {
/// Database URI
///
/// # Accpeted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - Lance Cloud
pub uri: String,
/// Lance Cloud API key
pub api_key: Option<String>,
/// Lance Cloud region
pub region: Option<String>,
/// Lance Cloud host override
pub host_override: Option<String>,
/// The maximum number of indices to cache in memory. Defaults to 256.
pub index_cache_size: u32,
}
impl ConnectOptions {
/// Create a new [`ConnectOptions`] with the given database URI.
pub fn new(uri: &str) -> Self {
Self {
uri: uri.to_string(),
api_key: None,
region: None,
host_override: None,
index_cache_size: 256,
}
}
pub fn api_key(mut self, api_key: &str) -> Self {
self.api_key = Some(api_key.to_string());
self
}
pub fn region(mut self, region: &str) -> Self {
self.region = Some(region.to_string());
self
}
pub fn host_override(mut self, host_override: &str) -> Self {
self.host_override = Some(host_override.to_string());
self
}
pub fn index_cache_size(mut self, index_cache_size: u32) -> Self {
self.index_cache_size = index_cache_size;
self
}
}
/// Connect to a LanceDB database.
///
/// # Arguments
///
/// - `uri` - URI where the database is located, can be a local file or a supported remote cloud storage
///
/// ## Accepted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - Lance Cloud
///
pub async fn connect(uri: &str) -> Result<Arc<dyn Connection>> {
let options = ConnectOptions::new(uri);
connect_with_options(&options).await
}
/// Connect with [`ConnectOptions`].
///
/// # Arguments
/// - `options` - [`ConnectOptions`] to connect to the database.
pub async fn connect_with_options(options: &ConnectOptions) -> Result<Arc<dyn Connection>> {
let db = Database::connect(&options.uri).await?;
Ok(Arc::new(db))
}
pub struct Database { pub struct Database {
object_store: ObjectStore, object_store: ObjectStore,
query_string: Option<String>, query_string: Option<String>,

View File

@@ -14,6 +14,7 @@
use std::{cmp::max, sync::Arc}; use std::{cmp::max, sync::Arc};
use lance::index::scalar::ScalarIndexParams;
use lance_index::{DatasetIndexExt, IndexType}; use lance_index::{DatasetIndexExt, IndexType};
pub use lance_linalg::distance::MetricType; pub use lance_linalg::distance::MetricType;
@@ -232,10 +233,14 @@ impl IndexBuilder {
let mut dataset = tbl.clone_inner_dataset(); let mut dataset = tbl.clone_inner_dataset();
match params { match params {
IndexParams::Scalar { replace } => { IndexParams::Scalar { replace } => {
self.table dataset
.as_native() .create_index(
.unwrap() &[&column],
.create_scalar_index(column, replace) IndexType::Scalar,
None,
&ScalarIndexParams::default(),
replace,
)
.await? .await?
} }
IndexParams::IvfPq { IndexParams::IvfPq {

View File

@@ -16,10 +16,10 @@
use std::io::Cursor; use std::io::Cursor;
use arrow_array::RecordBatchReader; use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_ipc::reader::StreamReader; use arrow_ipc::{reader::StreamReader, writer::FileWriter};
use crate::Result; use crate::{Error, Result};
/// Convert a Arrow IPC file to a batch reader /// Convert a Arrow IPC file to a batch reader
pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> { pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> {
@@ -28,6 +28,22 @@ pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> {
Ok(reader) Ok(reader)
} }
/// Convert record batches to Arrow IPC file
pub fn batches_to_ipc_file(batches: &[RecordBatch]) -> Result<Vec<u8>> {
if batches.is_empty() {
return Err(Error::Store {
message: "No batches to write".to_string(),
});
}
let schema = batches[0].schema();
let mut writer = FileWriter::try_new(vec![], &schema)?;
for batch in batches {
writer.write(batch)?;
}
writer.finish()?;
Ok(writer.into_inner()?)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@@ -41,15 +41,32 @@
//! //!
//! ### Quick Start //! ### Quick Start
//! //!
//! <div class="warning">Rust API is not stable yet.</div> //! <div class="warning">Rust API is not stable yet, please expect breaking changes.</div>
//! //!
//! #### Connect to a database. //! #### Connect to a database.
//! //!
//! ```rust //! ```rust
//! use vectordb::connection::Database; //! use vectordb::connect;
//! # use arrow_schema::{Field, Schema}; //! # use arrow_schema::{Field, Schema};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = Database::connect("data/sample-lancedb").await.unwrap(); //! let db = connect("data/sample-lancedb").await.unwrap();
//! # });
//! ```
//!
//! LanceDB accepts the different form of database path:
//!
//! - `/path/to/database` - local database on file system.
//! - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
//! - `db://dbname` - Lance Cloud
//!
//! You can also use [`ConnectOptions`] to configure the connectoin to the database.
//!
//! ```rust
//! use vectordb::{connect_with_options, ConnectOptions};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let options = ConnectOptions::new("data/sample-lancedb")
//! .index_cache_size(1024);
//! let db = connect_with_options(&options).await.unwrap();
//! # }); //! # });
//! ``` //! ```
//! //!
@@ -57,6 +74,8 @@
//! It treats [`FixedSizeList<Float16/Float32>`](https://docs.rs/arrow/latest/arrow/array/struct.FixedSizeListArray.html) //! It treats [`FixedSizeList<Float16/Float32>`](https://docs.rs/arrow/latest/arrow/array/struct.FixedSizeListArray.html)
//! columns as vector columns. //! columns as vector columns.
//! //!
//! For more details, please refer to [LanceDB documentation](https://lancedb.github.io/lancedb/).
//!
//! #### Create a table //! #### Create a table
//! //!
//! To create a Table, you need to provide a [`arrow_schema::Schema`] and a [`arrow_array::RecordBatch`] stream. //! To create a Table, you need to provide a [`arrow_schema::Schema`] and a [`arrow_array::RecordBatch`] stream.
@@ -67,10 +86,11 @@
//! use arrow_array::{RecordBatch, RecordBatchIterator}; //! use arrow_array::{RecordBatch, RecordBatchIterator};
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type}; //! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
//! # use vectordb::connection::{Database, Connection}; //! # use vectordb::connection::{Database, Connection};
//! # use vectordb::connect;
//! //!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap(); //! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap(); //! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! let schema = Arc::new(Schema::new(vec![ //! let schema = Arc::new(Schema::new(vec![
//! Field::new("id", DataType::Int32, false), //! Field::new("id", DataType::Int32, false),
//! Field::new("vector", DataType::FixedSizeList( //! Field::new("vector", DataType::FixedSizeList(
@@ -94,13 +114,13 @@
//! //!
//! ```no_run //! ```no_run
//! # use std::sync::Arc; //! # use std::sync::Arc;
//! # use vectordb::connection::{Database, Connection}; //! # use vectordb::connect;
//! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch, //! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
//! # RecordBatchIterator, Int32Array}; //! # RecordBatchIterator, Int32Array};
//! # use arrow_schema::{Schema, Field, DataType}; //! # use arrow_schema::{Schema, Field, DataType};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap(); //! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap(); //! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let tbl = db.open_table("idx_test").await.unwrap(); //! # let tbl = db.open_table("idx_test").await.unwrap();
//! tbl.create_index(&["vector"]) //! tbl.create_index(&["vector"])
//! .ivf_pq() //! .ivf_pq()
@@ -166,4 +186,6 @@ pub use connection::{Connection, Database};
pub use error::{Error, Result}; pub use error::{Error, Result};
pub use table::{Table, TableRef}; pub use table::{Table, TableRef};
/// Connect to a database
pub use connection::{connect, connect_with_options, ConnectOptions};
pub use lance::dataset::WriteMode; pub use lance::dataset::WriteMode;

View File

@@ -22,6 +22,7 @@ use lance_linalg::distance::MetricType;
use crate::error::Result; use crate::error::Result;
use crate::utils::default_vector_column; use crate::utils::default_vector_column;
use crate::Error;
const DEFAULT_TOP_K: usize = 10; const DEFAULT_TOP_K: usize = 10;
@@ -93,6 +94,19 @@ impl Query {
let arrow_schema = Schema::from(self.dataset.schema()); let arrow_schema = Schema::from(self.dataset.schema());
default_vector_column(&arrow_schema, Some(query.len() as i32))? default_vector_column(&arrow_schema, Some(query.len() as i32))?
}; };
let field = self.dataset.schema().field(&column).ok_or(Error::Store {
message: format!("Column {} not found in dataset schema", column),
})?;
if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
{
return Err(Error::Store {
message: format!(
"Vector column '{}' does not match the dimension of the query vector: dim={}",
column,
query.len(),
),
});
}
scanner.nearest(&column, query, self.limit.unwrap_or(DEFAULT_TOP_K))?; scanner.nearest(&column, query, self.limit.unwrap_or(DEFAULT_TOP_K))?;
} else { } else {
// If there is no vector query, it's ok to not have a limit // If there is no vector query, it's ok to not have a limit

View File

@@ -27,9 +27,9 @@ use lance::dataset::optimize::{
}; };
pub use lance::dataset::ReadParams; pub use lance::dataset::ReadParams;
use lance::dataset::{Dataset, UpdateBuilder, WriteParams}; use lance::dataset::{Dataset, UpdateBuilder, WriteParams};
use lance::index::scalar::ScalarIndexParams;
use lance::io::WrappingObjectStore; use lance::io::WrappingObjectStore;
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt, IndexType}; use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
use log::info;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::index::vector::{VectorIndex, VectorIndexStatistics}; use crate::index::vector::{VectorIndex, VectorIndexStatistics};
@@ -38,7 +38,46 @@ use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam}; use crate::utils::{PatchReadParam, PatchWriteParam};
use crate::WriteMode; use crate::WriteMode;
pub const VECTOR_COLUMN_NAME: &str = "vector"; /// Optimize the dataset.
///
/// Similar to `VACUUM` in PostgreSQL, it offers different options to
/// optimize different parts of the table on disk.
///
/// By default, it optimizes everything, as [`OptimizeAction::All`].
pub enum OptimizeAction {
/// Run optimization on every, with default options.
All,
/// Compact files in the dataset
Compact {
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
},
/// Prune old version of datasets.
Prune {
/// The duration of time to keep versions of the dataset.
older_than: Duration,
/// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default.
/// If you are sure that there are no in-progress transactions, then you can set this to True to delete all files older than `older_than`.
delete_unverified: Option<bool>,
},
/// Optimize index.
Index(OptimizeOptions),
}
impl Default for OptimizeAction {
fn default() -> Self {
Self::All
}
}
/// Statistics about the optimization.
pub struct OptimizeStats {
/// Stats of the file compaction.
pub compaction: Option<CompactionMetrics>,
/// Stats of the version pruning
pub prune: Option<RemovalStats>,
}
/// A Table is a collection of strong typed Rows. /// A Table is a collection of strong typed Rows.
/// ///
@@ -194,6 +233,14 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # }); /// # });
/// ``` /// ```
fn query(&self) -> Query; fn query(&self) -> Query;
/// Optimize the on-disk data and indices for better performance.
///
/// <section class="warning">Experimental API</section>
///
/// Modeled after ``VACCUM`` in PostgreSQL.
/// Not all implementations support explicit optimization.
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
} }
/// Reference to a Table pointer. /// Reference to a Table pointer.
@@ -396,17 +443,8 @@ impl NativeTable {
self.dataset.lock().expect("lock poison").version().version self.dataset.lock().expect("lock poison").version().version
} }
/// Create a scalar index on the table async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
pub async fn create_scalar_index(&self, column: &str, replace: bool) -> Result<()> { info!("LanceDB: optimizing indices: {:?}", options);
let mut dataset = self.clone_inner_dataset();
let params = ScalarIndexParams::default();
dataset
.create_index(&[column], IndexType::Scalar, None, &params, replace)
.await?;
Ok(())
}
pub async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
let mut dataset = self.clone_inner_dataset(); let mut dataset = self.clone_inner_dataset();
dataset.optimize_indices(options).await?; dataset.optimize_indices(options).await?;
@@ -463,7 +501,7 @@ impl NativeTable {
/// ///
/// This calls into [lance::dataset::Dataset::cleanup_old_versions] and /// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
/// returns the result. /// returns the result.
pub async fn cleanup_old_versions( async fn cleanup_old_versions(
&self, &self,
older_than: Duration, older_than: Duration,
delete_unverified: Option<bool>, delete_unverified: Option<bool>,
@@ -480,7 +518,7 @@ impl NativeTable {
/// for faster reads. /// for faster reads.
/// ///
/// This calls into [lance::dataset::optimize::compact_files]. /// This calls into [lance::dataset::optimize::compact_files].
pub async fn compact_files( async fn compact_files(
&self, &self,
options: CompactionOptions, options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>, remap_options: Option<Arc<dyn IndexRemapperOptions>>,
@@ -614,6 +652,52 @@ impl Table for NativeTable {
self.reset_dataset(dataset); self.reset_dataset(dataset);
Ok(()) Ok(())
} }
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
let mut stats = OptimizeStats {
compaction: None,
prune: None,
};
match action {
OptimizeAction::All => {
stats.compaction = self
.optimize(OptimizeAction::Compact {
options: CompactionOptions::default(),
remap_options: None,
})
.await?
.compaction;
stats.prune = self
.optimize(OptimizeAction::Prune {
older_than: Duration::days(7),
delete_unverified: None,
})
.await?
.prune;
self.optimize(OptimizeAction::Index(OptimizeOptions::default()))
.await?;
}
OptimizeAction::Compact {
options,
remap_options,
} => {
stats.compaction = Some(self.compact_files(options, remap_options).await?);
}
OptimizeAction::Prune {
older_than,
delete_unverified,
} => {
stats.prune = Some(
self.cleanup_old_versions(older_than, delete_unverified)
.await?,
);
}
OptimizeAction::Index(options) => {
self.optimize_indices(&options).await?;
}
}
Ok(stats)
}
} }
#[cfg(test)] #[cfg(test)]