Compare commits

...

8 Commits

Author SHA1 Message Date
Lance Release
ba6f949515 Bump version: 0.4.5 → 0.4.6 2024-01-26 22:40:36 +00:00
Lei Xu
3dd8522bc9 feat(rust): provide connect and connect_with_options in Rust SDK (#871)
* Bring feature parity to the Rust connect methods.
* A global connect method that can connect to local and remote / cloud
tables, the same as in js/python today.
2024-01-26 11:40:11 -08:00
Lei Xu
e01ef63488 chore(rust): simplified version of optimize (#869)
Consolidate the various optimize() methods into one, similar to Postgres
VACUUM, as part of preparing the Rust API for public use
2024-01-26 11:36:04 -08:00
Lei Xu
a6cf24b359 feat(napi): Issue queries as node SDK (#868)
* Query as a fluent API and `AsyncIterator<RecordBatch>`
* Many more docs
* Add tests for auto-inferring vector search columns with different
dimensions.
2024-01-25 22:14:14 -08:00
Lance Release
9a07c9aad8 Updating package-lock.json 2024-01-25 21:49:36 +00:00
Lance Release
d405798952 Updating package-lock.json 2024-01-25 20:54:55 +00:00
Lance Release
e8a8b92b2a Bump version: 0.4.4 → 0.4.5 2024-01-25 20:54:44 +00:00
Lei Xu
66362c6506 fix: release build for node sdk (#861) 2024-01-25 12:51:32 -08:00
25 changed files with 690 additions and 145 deletions
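The headline change in this range is the new top-level `connect` API in the Rust SDK (#871). A minimal sketch of how it is intended to be used, based on the `connect`/`connect_with_options` functions and the `ConnectOptions` builder added further down in this diff (the database path is illustrative):

```rust
use vectordb::{connect, connect_with_options, ConnectOptions, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Plain connect: local paths, s3:// and gs:// URIs, and db:// (Lance Cloud) are accepted.
    let _db = connect("data/sample-lancedb").await?;

    // Or tune the connection first, e.g. the index cache size, via ConnectOptions.
    let options = ConnectOptions::new("data/sample-lancedb").index_cache_size(1024);
    let _db = connect_with_options(&options).await?;
    Ok(())
}
```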

View File

@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.4.4
current_version = 0.4.6
commit = True
message = Bump version: {current_version} → {new_version}
tag = True

node/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.4.4",
"version": "0.4.5",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.4.4",
"version": "0.4.5",
"cpu": [
"x64",
"arm64"
@@ -53,11 +53,11 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.4",
"@lancedb/vectordb-darwin-x64": "0.4.4",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.4",
"@lancedb/vectordb-linux-x64-gnu": "0.4.4",
"@lancedb/vectordb-win32-x64-msvc": "0.4.4"
"@lancedb/vectordb-darwin-arm64": "0.4.5",
"@lancedb/vectordb-darwin-x64": "0.4.5",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.5",
"@lancedb/vectordb-linux-x64-gnu": "0.4.5",
"@lancedb/vectordb-win32-x64-msvc": "0.4.5"
}
},
"node_modules/@75lb/deep-merge": {
@@ -328,6 +328,66 @@
"@jridgewell/sourcemap-codec": "^1.4.10"
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.5.tgz",
"integrity": "sha512-sR+Q9dRBzMm+NGqM7EiK07c7pQz/V4J//23p05CeO/YATjKYyU3jE/dmVenLjJGW2UUrRYiyUQ9X6Up+OOgdhA==",
"cpu": [
"arm64"
],
"optional": true,
"os": [
"darwin"
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.5.tgz",
"integrity": "sha512-/BIyUeVkLaUlOEQN4HUQ9J9ZdNWkDpZPUUS9kfz5iYIjotgwpSfznF8Q1GY5BVuXa2ke7GC3tnkwwd5ZMOuDsA==",
"cpu": [
"x64"
],
"optional": true,
"os": [
"darwin"
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.5.tgz",
"integrity": "sha512-bq8vX7znIf2Dap41YIbB5uA/YahwaLvFPNH0WmwqeBWxF64/AJ74DsZk51ftwczQMsyLK74M8f1PzniapMAR+Q==",
"cpu": [
"arm64"
],
"optional": true,
"os": [
"linux"
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.5.tgz",
"integrity": "sha512-5qCWFyxihyMDYIGRAdQ7zv3enBEDxPR08dCmXr2Bu9yYI3SUqfuSvFX1NwflVeB+RzRMMbeG4xiaEbo7H7/H3Q==",
"cpu": [
"x64"
],
"optional": true,
"os": [
"linux"
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.4.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.5.tgz",
"integrity": "sha512-z3dZ6TDzm2EU5gNuejshArs3o84v1rdXnds22TTuc9fVhwg5JG87FyHFZKU1MGuyLuZW22Me0YDuS9VR+eAp0Q==",
"cpu": [
"x64"
],
"optional": true,
"os": [
"win32"
]
},
"node_modules/@neon-rs/cli": {
"version": "0.0.160",
"resolved": "https://registry.npmjs.org/@neon-rs/cli/-/cli-0.0.160.tgz",

View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.4.4",
"version": "0.4.6",
"description": " Serverless, low-latency vector database for AI applications",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"tsc": "tsc -b",
"build": "cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json && tsc -b",
"build": "npm run tsc && cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json",
"build-release": "npm run build -- --release",
"test": "npm run tsc && mocha -recursive dist/test",
"integration-test": "npm run tsc && mocha -recursive dist/integration_test",
@@ -81,10 +81,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.4",
"@lancedb/vectordb-darwin-x64": "0.4.4",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.4",
"@lancedb/vectordb-linux-x64-gnu": "0.4.4",
"@lancedb/vectordb-win32-x64-msvc": "0.4.4"
"@lancedb/vectordb-darwin-arm64": "0.4.6",
"@lancedb/vectordb-darwin-x64": "0.4.6",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.6",
"@lancedb/vectordb-linux-x64-gnu": "0.4.6",
"@lancedb/vectordb-win32-x64-msvc": "0.4.6"
}
}

View File

@@ -391,24 +391,6 @@ describe('LanceDB client', function () {
})
}).timeout(120000)
-it('fails to create a new table when the vector column is missing', async function () {
-const dir = await track().mkdir('lancejs')
-const con = await lancedb.connect(dir)
-const data = [
-{
-id: 1,
-price: 10
-}
-]
-const create = con.createTable('missing_vector', data)
-await expect(create).to.be.rejectedWith(
-Error,
-"column 'vector' is missing"
-)
-})
it('use overwrite flag to overwrite existing table', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)

View File

@@ -10,14 +10,15 @@ crate-type = ["cdylib"]
[dependencies]
arrow-ipc.workspace = true
futures.workspace = true
-lance-linalg.workspace = true
-lance.workspace = true
-vectordb = { path = "../rust/vectordb" }
napi = { version = "2.14", default-features = false, features = [
"napi7",
"async"
] }
napi-derive = "2.14"
vectordb = { path = "../rust/vectordb" }
lance.workspace = true
lance-linalg.workspace = true
[build-dependencies]
napi-build = "2.1"

View File

@@ -53,6 +53,16 @@ describe("Test creating index", () => {
const indexDir = path.join(tmpDir, "test.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
// TODO: check index type.
// Search without specifying the column
let query_vector = data.toArray()[5].vec.toJSON();
let rst = await tbl.query().nearestTo(query_vector).limit(2).toArrow();
expect(rst.numRows).toBe(2);
// Search with the column explicitly specified
let rst2 = await tbl.search(query_vector, "vec").limit(2).toArrow();
expect(rst2.numRows).toBe(2);
expect(rst.toString()).toEqual(rst2.toString());
});
test("no vector column available", async () => {
@@ -71,6 +81,80 @@ describe("Test creating index", () => {
await tbl.createIndex("val").build();
const indexDir = path.join(tmpDir, "no_vec.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
for await (const r of tbl.query().filter("id > 1").select(["id"])) {
expect(r.numRows).toBe(1);
}
});
test("two columns with different dimensions", async () => {
const db = await connect(tmpDir);
const schema = new Schema([
new Field("id", new Int32(), true),
new Field("vec", new FixedSizeList(32, new Field("item", new Float32()))),
new Field(
"vec2",
new FixedSizeList(64, new Field("item", new Float32()))
),
]);
const tbl = await db.createTable(
"two_vectors",
makeArrowTable(
Array(300)
.fill(1)
.map((_, i) => ({
id: i,
vec: Array(32)
.fill(1)
.map(() => Math.random()),
vec2: Array(64) // different dimension
.fill(1)
.map(() => Math.random()),
})),
{ schema }
)
);
// Only build an index over `vec`
await expect(tbl.createIndex().build()).rejects.toThrow(
/.*More than one vector columns found.*/
);
await tbl
.createIndex("vec")
.ivf_pq({ num_partitions: 2, num_sub_vectors: 2 })
.build();
const rst = await tbl
.query()
.nearestTo(
Array(32)
.fill(1)
.map(() => Math.random())
)
.limit(2)
.toArrow();
expect(rst.numRows).toBe(2);
// Search with the column explicitly specified
await expect(
tbl
.search(
Array(64)
.fill(1)
.map(() => Math.random()),
"vec"
)
.limit(2)
.toArrow()
).rejects.toThrow(/.*does not match the dimension.*/);
const query64 = Array(64)
.fill(1)
.map(() => Math.random());
const rst64_1 = await tbl.query().nearestTo(query64).limit(2).toArrow();
const rst64_2 = await tbl.search(query64, "vec2").limit(2).toArrow();
expect(rst64_1.toString()).toEqual(rst64_2.toString());
expect(rst64_1.numRows).toBe(2);
});
test("create scalar index", async () => {

View File

@@ -91,7 +91,6 @@ impl IndexBuilder {
#[napi]
pub async fn build(&self) -> napi::Result<()> {
println!("nodejs::index.rs : build");
self.inner
.build()
.await

nodejs/src/iterator.rs Normal file
View File

@@ -0,0 +1,47 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::StreamExt;
use lance::io::RecordBatchStream;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::ipc::batches_to_ipc_file;
/** Typescript-style Async Iterator over RecordBatches */
#[napi]
pub struct RecordBatchIterator {
inner: Box<dyn RecordBatchStream + Unpin>,
}
#[napi]
impl RecordBatchIterator {
pub(crate) fn new(inner: Box<dyn RecordBatchStream + Unpin>) -> Self {
Self { inner }
}
#[napi]
pub async unsafe fn next(&mut self) -> napi::Result<Option<Buffer>> {
if let Some(rst) = self.inner.next().await {
let batch = rst.map_err(|e| {
napi::Error::from_reason(format!("Failed to get next batch from stream: {}", e))
})?;
batches_to_ipc_file(&[batch])
.map_err(|e| napi::Error::from_reason(format!("Failed to write IPC file: {}", e)))
.map(|buf| Some(Buffer::from(buf)))
} else {
// We are done with the stream.
Ok(None)
}
}
}

View File

@@ -17,6 +17,7 @@ use napi_derive::*;
mod connection;
mod index;
mod iterator;
mod query;
mod table;

View File

@@ -16,7 +16,7 @@ use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::query::Query as LanceDBQuery;
-use crate::table::Table;
use crate::{iterator::RecordBatchIterator, table::Table};
#[napi]
pub struct Query {
@@ -32,17 +32,50 @@ impl Query {
}
#[napi]
-pub fn vector(&mut self, vector: Float32Array) {
-let inn = self.inner.clone().nearest_to(&vector);
-self.inner = inn;
pub fn column(&mut self, column: String) {
self.inner = self.inner.clone().column(&column);
}
#[napi]
-pub fn to_arrow(&self) -> napi::Result<()> {
-// let buf = self.inner.to_arrow().map_err(|e| {
-// napi::Error::from_reason(format!("Failed to convert query to arrow: {}", e))
-// })?;
-// Ok(buf)
-todo!()
pub fn filter(&mut self, filter: String) {
self.inner = self.inner.clone().filter(filter);
}
#[napi]
pub fn select(&mut self, columns: Vec<String>) {
self.inner = self.inner.clone().select(&columns);
}
#[napi]
pub fn limit(&mut self, limit: u32) {
self.inner = self.inner.clone().limit(limit as usize);
}
#[napi]
pub fn prefilter(&mut self, prefilter: bool) {
self.inner = self.inner.clone().prefilter(prefilter);
}
#[napi]
pub fn nearest_to(&mut self, vector: Float32Array) {
self.inner = self.inner.clone().nearest_to(&vector);
}
#[napi]
pub fn refine_factor(&mut self, refine_factor: u32) {
self.inner = self.inner.clone().refine_factor(refine_factor);
}
#[napi]
pub fn nprobes(&mut self, nprobe: u32) {
self.inner = self.inner.clone().nprobes(nprobe as usize);
}
#[napi]
pub async fn execute_stream(&self) -> napi::Result<RecordBatchIterator> {
let inner_stream = self.inner.execute_stream().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
})?;
Ok(RecordBatchIterator::new(Box::new(inner_stream)))
}
}
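These napi methods are thin wrappers over `vectordb::query::Query`. A hedged sketch of the equivalent fluent calls in pure Rust, assuming `tbl` is a `vectordb::TableRef` opened elsewhere (table setup elided):

```rust
use futures::StreamExt;
use vectordb::{Result, TableRef};

async fn run_query(tbl: TableRef) -> Result<()> {
    // Top-10 ANN search with IVF tuning, streamed back batch by batch.
    let mut stream = tbl
        .query()
        .nearest_to(&[1.0, 2.0, 3.0])
        .nprobes(10)
        .refine_factor(5)
        .limit(10)
        .execute_stream()
        .await?;
    while let Some(batch) = stream.next().await {
        match batch {
            Ok(b) => println!("{} rows", b.num_rows()),
            Err(e) => eprintln!("stream error: {}", e),
        }
    }
    Ok(())
}
```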

View File

@@ -54,9 +54,20 @@ export class IndexBuilder {
scalar(): void
build(): Promise<void>
}
/** Typescript-style Async Iterator over RecordBatches */
export class RecordBatchIterator {
next(): Promise<Buffer | null>
}
export class Query {
-vector(vector: Float32Array): void
-toArrow(): void
column(column: string): void
filter(filter: string): void
select(columns: Array<string>): void
limit(limit: number): void
prefilter(prefilter: boolean): void
nearestTo(vector: Float32Array): void
refineFactor(refineFactor: number): void
nprobes(nprobe: number): void
executeStream(): Promise<RecordBatchIterator>
}
export class Table {
/** Return Schema as empty Arrow IPC file. */

View File

@@ -295,12 +295,13 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
-const { Connection, IndexType, MetricType, IndexBuilder, Query, Table, WriteMode, connect } = nativeBinding
const { Connection, IndexType, MetricType, IndexBuilder, RecordBatchIterator, Query, Table, WriteMode, connect } = nativeBinding
module.exports.Connection = Connection
module.exports.IndexType = IndexType
module.exports.MetricType = MetricType
module.exports.IndexBuilder = IndexBuilder
module.exports.RecordBatchIterator = RecordBatchIterator
module.exports.Query = Query
module.exports.Table = Table
module.exports.WriteMode = WriteMode

View File

@@ -12,46 +12,73 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-import { RecordBatch } from "apache-arrow";
-import { Table } from "./table";
import { RecordBatch, tableFromIPC, Table as ArrowTable } from "apache-arrow";
import {
RecordBatchIterator as NativeBatchIterator,
Query as NativeQuery,
Table as NativeTable,
} from "./native";
// TODO: re-enable eslint once we have a real implementation
/* eslint-disable */
class RecordBatchIterator implements AsyncIterator<RecordBatch> {
-next(
-...args: [] | [undefined]
-): Promise<IteratorResult<RecordBatch<any>, any>> {
-throw new Error("Method not implemented.");
private promised_inner?: Promise<NativeBatchIterator>;
private inner?: NativeBatchIterator;
constructor(
inner?: NativeBatchIterator,
promise?: Promise<NativeBatchIterator>
) {
// TODO: check promise reliably so we don't need to pass two arguments.
this.inner = inner;
this.promised_inner = promise;
}
-return?(value?: any): Promise<IteratorResult<RecordBatch<any>, any>> {
-throw new Error("Method not implemented.");
-}
-throw?(e?: any): Promise<IteratorResult<RecordBatch<any>, any>> {
-throw new Error("Method not implemented.");
async next(): Promise<IteratorResult<RecordBatch<any>, any>> {
if (this.inner === undefined) {
this.inner = await this.promised_inner;
}
if (this.inner === undefined) {
throw new Error("Invalid iterator state state");
}
const n = await this.inner.next();
if (n == null) {
return Promise.resolve({ done: true, value: null });
}
const tbl = tableFromIPC(n);
if (tbl.batches.length != 1) {
throw new Error("Expected only one batch");
}
return Promise.resolve({ done: false, value: tbl.batches[0] });
}
}
/* eslint-enable */
/** Query executor */
export class Query implements AsyncIterable<RecordBatch> {
-private readonly tbl: Table;
-private _filter?: string;
-private _limit?: number;
private readonly inner: NativeQuery;
-// Vector search
-private _vector?: Float32Array;
-private _nprobes?: number;
-private _refine_factor?: number = 1;
constructor(tbl: NativeTable) {
this.inner = tbl.query();
}
-constructor(tbl: Table) {
-this.tbl = tbl;
/** Set the column to run the query on. */
column(column: string): Query {
this.inner.column(column);
return this;
}
/** Set the filter predicate; only results that satisfy the filter are returned. */
filter(predicate: string): Query {
-this._filter = predicate;
this.inner.filter(predicate);
return this;
}
/**
* Select the columns to return. If not set, all columns are returned.
*/
select(columns: string[]): Query {
this.inner.select(columns);
return this;
}
@@ -59,35 +86,67 @@ export class Query implements AsyncIterable<RecordBatch> {
* Set the limit of rows to return.
*/
limit(limit: number): Query {
-this._limit = limit;
this.inner.limit(limit);
return this;
}
prefilter(prefilter: boolean): Query {
this.inner.prefilter(prefilter);
return this;
}
/**
* Set the query vector.
*/
-vector(vector: number[]): Query {
-this._vector = Float32Array.from(vector);
nearestTo(vector: number[]): Query {
this.inner.nearestTo(Float32Array.from(vector));
return this;
}
/**
-* Set the number of probes to use for the query.
* Set the number of IVF partitions to probe for the query.
*/
nprobes(nprobes: number): Query {
-this._nprobes = nprobes;
this.inner.nprobes(nprobes);
return this;
}
/**
* Set the refine factor for the query.
*/
-refine_factor(refine_factor: number): Query {
-this._refine_factor = refine_factor;
refineFactor(refine_factor: number): Query {
this.inner.refineFactor(refine_factor);
return this;
}
-[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>, any, undefined> {
-throw new RecordBatchIterator();
/**
* Execute the query and return the results as an AsyncIterator.
*/
async executeStream(): Promise<RecordBatchIterator> {
const inner = await this.inner.executeStream();
return new RecordBatchIterator(inner);
}
/** Collect the results as an Arrow Table. */
async toArrow(): Promise<ArrowTable> {
const batches = [];
for await (const batch of this) {
batches.push(batch);
}
return new ArrowTable(batches);
}
/** Return all results as a JSON array. */
async toArray(): Promise<any[]> {
const tbl = await this.toArrow();
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return tbl.toArray();
}
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>> {
const promise = this.inner.executeStream();
return new RecordBatchIterator(undefined, promise);
}
}

View File

@@ -95,10 +95,58 @@ export class Table {
return builder;
}
-search(vector?: number[]): Query {
-const q = new Query(this);
-if (vector !== undefined) {
-q.vector(vector);
/**
* Create a generic {@link Query} Builder.
*
* When appropriate, various indices and statistics based pruning will be used to
* accelerate the query.
*
* @example
*
* ### Run a SQL-style query
* ```typescript
* for await (const batch of table.query()
* .filter("id > 1").select(["id"]).limit(20)) {
* console.log(batch);
* }
* ```
*
* ### Run Top-10 vector similarity search
* ```typescript
* for await (const batch of table.query()
* .nearestTo([1, 2, 3])
* .refineFactor(5).nprobes(10)
* .limit(10)) {
* console.log(batch);
* }
* ```
*
* ### Scan the full dataset
* ```typescript
* for await (const batch of table.query()) {
* console.log(batch);
* }
* ```
*
* ### Return the full dataset as Arrow Table
* ```typescript
* let arrowTbl = await table.query().nearestTo([1.0, 2.0, 0.5, 6.7]).toArrow();
* ```
*
* @returns {@link Query}
*/
query(): Query {
return new Query(this.inner);
}
/** Search the table with a given query vector.
*
* This is a convenience method for preparing an ANN {@link Query}.
*/
search(vector: number[], column?: string): Query {
const q = this.query();
q.nearestTo(vector);
if (column !== undefined) {
q.column(column);
}
return q;
}

View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb-node"
version = "0.4.4"
version = "0.4.6"
description = "Serverless, low-latency vector database for AI applications"
license = "Apache-2.0"
edition = "2018"

View File

@@ -1,4 +1,4 @@
-// Copyright 2023 Lance Developers.
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -19,19 +19,8 @@ use arrow_array::RecordBatch;
use arrow_ipc::reader::FileReader;
use arrow_ipc::writer::FileWriter;
use arrow_schema::SchemaRef;
-use vectordb::table::VECTOR_COLUMN_NAME;
-use crate::error::{MissingColumnSnafu, Result};
-use snafu::prelude::*;
-fn validate_vector_column(record_batch: &RecordBatch) -> Result<()> {
-record_batch
-.column_by_name(VECTOR_COLUMN_NAME)
-.map(|_| ())
-.context(MissingColumnSnafu {
-name: VECTOR_COLUMN_NAME,
-})
-}
use crate::error::Result;
pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> {
let mut batches: Vec<RecordBatch> = Vec::new();
@@ -39,7 +28,6 @@ pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBa
let schema = file_reader.schema();
for b in file_reader {
let record_batch = b?;
-validate_vector_column(&record_batch)?;
batches.push(record_batch);
}
Ok((batches, schema))

View File

@@ -19,6 +19,7 @@ use neon::{
};
use crate::{error::ResultExt, runtime, table::JsTable};
use vectordb::Table;
pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
@@ -35,7 +36,9 @@ pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsP
let idx_result = table
.as_native()
.unwrap()
-.create_scalar_index(&column, replace)
.create_index(&[&column])
.replace(replace)
.build()
.await;
deferred.settle_with(&channel, move |mut cx| {

View File

@@ -16,6 +16,7 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::ObjectStoreParams;
use vectordb::table::OptimizeAction;
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use neon::prelude::*;
@@ -245,27 +246,30 @@ impl JsTable {
.map(|val| val.value(&mut cx) as i64)
.unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks
let older_than = chrono::Duration::minutes(older_than);
-let delete_unverified: bool = cx
-.argument_opt(1)
-.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
-.map(|val| val.value(&mut cx))
-.unwrap_or_default();
let delete_unverified: Option<bool> = Some(
cx.argument_opt(1)
.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
.map(|val| val.value(&mut cx))
.unwrap_or_default(),
);
rt.spawn(async move {
let stats = table
.as_native()
.unwrap()
-.cleanup_old_versions(older_than, Some(delete_unverified))
.optimize(OptimizeAction::Prune {
older_than,
delete_unverified,
})
.await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
let prune_stats = stats.prune.as_ref().expect("Prune stats missing");
let output_metrics = JsObject::new(&mut cx);
-let bytes_removed = cx.number(stats.bytes_removed as f64);
let bytes_removed = cx.number(prune_stats.bytes_removed as f64);
output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;
-let old_versions = cx.number(stats.old_versions as f64);
let old_versions = cx.number(prune_stats.old_versions as f64);
output_metrics.set(&mut cx, "oldVersions", old_versions)?;
let output_table = cx.boxed(JsTable::from(table));
@@ -317,13 +321,15 @@ impl JsTable {
rt.spawn(async move {
let stats = table
.as_native()
.unwrap()
-.compact_files(options, None)
.optimize(OptimizeAction::Compact {
options,
remap_options: None,
})
.await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
let stats = stats.compaction.as_ref().expect("Compact stats missing");
let output_metrics = JsObject::new(&mut cx);
let fragments_removed = cx.number(stats.fragments_removed as f64);

View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb"
version = "0.4.4"
version = "0.4.6"
edition = "2021"
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license = "Apache-2.0"

View File

@@ -68,6 +68,87 @@ pub trait Connection: Send + Sync {
async fn drop_table(&self, name: &str) -> Result<()>;
}
#[derive(Debug)]
pub struct ConnectOptions {
/// Database URI
///
/// # Accepted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - Lance Cloud
pub uri: String,
/// Lance Cloud API key
pub api_key: Option<String>,
/// Lance Cloud region
pub region: Option<String>,
/// Lance Cloud host override
pub host_override: Option<String>,
/// The maximum number of indices to cache in memory. Defaults to 256.
pub index_cache_size: u32,
}
impl ConnectOptions {
/// Create a new [`ConnectOptions`] with the given database URI.
pub fn new(uri: &str) -> Self {
Self {
uri: uri.to_string(),
api_key: None,
region: None,
host_override: None,
index_cache_size: 256,
}
}
pub fn api_key(mut self, api_key: &str) -> Self {
self.api_key = Some(api_key.to_string());
self
}
pub fn region(mut self, region: &str) -> Self {
self.region = Some(region.to_string());
self
}
pub fn host_override(mut self, host_override: &str) -> Self {
self.host_override = Some(host_override.to_string());
self
}
pub fn index_cache_size(mut self, index_cache_size: u32) -> Self {
self.index_cache_size = index_cache_size;
self
}
}
/// Connect to a LanceDB database.
///
/// # Arguments
///
/// - `uri` - URI where the database is located; can be a local path or a supported cloud object store URI
///
/// ## Accepted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - Lance Cloud
///
pub async fn connect(uri: &str) -> Result<Arc<dyn Connection>> {
let options = ConnectOptions::new(uri);
connect_with_options(&options).await
}
/// Connect with [`ConnectOptions`].
///
/// # Arguments
/// - `options` - [`ConnectOptions`] to connect to the database.
pub async fn connect_with_options(options: &ConnectOptions) -> Result<Arc<dyn Connection>> {
let db = Database::connect(&options.uri).await?;
Ok(Arc::new(db))
}
pub struct Database {
object_store: ObjectStore,
query_string: Option<String>,

View File

@@ -14,6 +14,7 @@
use std::{cmp::max, sync::Arc};
use lance::index::scalar::ScalarIndexParams;
use lance_index::{DatasetIndexExt, IndexType};
pub use lance_linalg::distance::MetricType;
@@ -232,10 +233,14 @@ impl IndexBuilder {
let mut dataset = tbl.clone_inner_dataset();
match params {
IndexParams::Scalar { replace } => {
-self.table
-.as_native()
-.unwrap()
-.create_scalar_index(column, replace)
dataset
.create_index(
&[&column],
IndexType::Scalar,
None,
&ScalarIndexParams::default(),
replace,
)
.await?
}
IndexParams::IvfPq {
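For comparison, the builder entry point that routes here (mirroring the neon binding change above and the example in the crate docs below): a hedged sketch, assuming `tbl` is a `TableRef` and the column names exist in its schema.

```rust
use vectordb::{Result, TableRef};

async fn build_indices(tbl: TableRef) -> Result<()> {
    // Scalar index on a plain column, replacing any existing index.
    tbl.create_index(&["id"]).replace(true).build().await?;
    // IVF_PQ vector index on the vector column.
    tbl.create_index(&["vector"]).ivf_pq().build().await?;
    Ok(())
}
```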

View File

@@ -16,10 +16,10 @@
use std::io::Cursor;
-use arrow_array::RecordBatchReader;
-use arrow_ipc::reader::StreamReader;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_ipc::{reader::StreamReader, writer::FileWriter};
-use crate::Result;
use crate::{Error, Result};
/// Convert an Arrow IPC file to a batch reader
pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> {
@@ -28,6 +28,22 @@ pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> {
Ok(reader)
}
/// Convert record batches to an Arrow IPC file
pub fn batches_to_ipc_file(batches: &[RecordBatch]) -> Result<Vec<u8>> {
if batches.is_empty() {
return Err(Error::Store {
message: "No batches to write".to_string(),
});
}
let schema = batches[0].schema();
let mut writer = FileWriter::try_new(vec![], &schema)?;
for batch in batches {
writer.write(batch)?;
}
writer.finish()?;
Ok(writer.into_inner()?)
}
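A small in-memory round trip over the two helpers in this file (the schema and values are illustrative, not from the source):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use vectordb::ipc::{batches_to_ipc_file, ipc_file_to_batches};
use vectordb::Result;

fn roundtrip() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![column])?;
    // Serialize to an Arrow IPC file held in memory...
    let buf = batches_to_ipc_file(&[batch])?;
    // ...and stream the batches back out.
    for batch in ipc_file_to_batches(buf)? {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```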
#[cfg(test)]
mod tests {

View File

@@ -41,15 +41,32 @@
//!
//! ### Quick Start
//!
//! <div class="warning">Rust API is not stable yet.</div>
//! <div class="warning">Rust API is not stable yet, please expect breaking changes.</div>
//!
//! #### Connect to a database.
//!
//! ```rust
-//! use vectordb::connection::Database;
//! use vectordb::connect;
//! # use arrow_schema::{Field, Schema};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = Database::connect("data/sample-lancedb").await.unwrap();
//! let db = connect("data/sample-lancedb").await.unwrap();
//! # });
//! ```
//!
//! LanceDB accepts the following forms of database path:
//!
//! - `/path/to/database` - local database on file system.
//! - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
//! - `db://dbname` - Lance Cloud
//!
//! You can also use [`ConnectOptions`] to configure the connection to the database.
//!
//! ```rust
//! use vectordb::{connect_with_options, ConnectOptions};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let options = ConnectOptions::new("data/sample-lancedb")
//! .index_cache_size(1024);
//! let db = connect_with_options(&options).await.unwrap();
//! # });
//! ```
//!
@@ -57,6 +74,8 @@
//! It treats [`FixedSizeList<Float16/Float32>`](https://docs.rs/arrow/latest/arrow/array/struct.FixedSizeListArray.html)
//! columns as vector columns.
//!
//! For more details, please refer to [LanceDB documentation](https://lancedb.github.io/lancedb/).
//!
//! #### Create a table
//!
//! To create a Table, you need to provide a [`arrow_schema::Schema`] and a [`arrow_array::RecordBatch`] stream.
@@ -67,10 +86,11 @@
//! use arrow_array::{RecordBatch, RecordBatchIterator};
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
-//! # use vectordb::connection::{Database, Connection};
//! # use vectordb::connect;
//!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
-//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! let schema = Arc::new(Schema::new(vec![
//! Field::new("id", DataType::Int32, false),
//! Field::new("vector", DataType::FixedSizeList(
@@ -94,13 +114,13 @@
//!
//! ```no_run
//! # use std::sync::Arc;
-//! # use vectordb::connection::{Database, Connection};
//! # use vectordb::connect;
//! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
//! # RecordBatchIterator, Int32Array};
//! # use arrow_schema::{Schema, Field, DataType};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
-//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let tbl = db.open_table("idx_test").await.unwrap();
//! tbl.create_index(&["vector"])
//! .ivf_pq()
@@ -166,4 +186,6 @@ pub use connection::{Connection, Database};
pub use error::{Error, Result};
pub use table::{Table, TableRef};
/// Connect to a database
pub use connection::{connect, connect_with_options, ConnectOptions};
pub use lance::dataset::WriteMode;

View File

@@ -22,6 +22,7 @@ use lance_linalg::distance::MetricType;
use crate::error::Result;
use crate::utils::default_vector_column;
use crate::Error;
const DEFAULT_TOP_K: usize = 10;
@@ -93,6 +94,19 @@ impl Query {
let arrow_schema = Schema::from(self.dataset.schema());
default_vector_column(&arrow_schema, Some(query.len() as i32))?
};
let field = self.dataset.schema().field(&column).ok_or(Error::Store {
message: format!("Column {} not found in dataset schema", column),
})?;
if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
{
return Err(Error::Store {
message: format!(
"Vector column '{}' does not match the dimension of the query vector: dim={}",
column,
query.len(),
),
});
}
scanner.nearest(&column, query, self.limit.unwrap_or(DEFAULT_TOP_K))?;
} else {
// If there is no vector query, it's ok to not have a limit

View File

@@ -27,9 +27,9 @@ use lance::dataset::optimize::{
};
pub use lance::dataset::ReadParams;
use lance::dataset::{Dataset, UpdateBuilder, WriteParams};
-use lance::index::scalar::ScalarIndexParams;
use lance::io::WrappingObjectStore;
-use lance_index::{optimize::OptimizeOptions, DatasetIndexExt, IndexType};
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
use log::info;
use crate::error::{Error, Result};
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
@@ -38,7 +38,46 @@ use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam};
use crate::WriteMode;
pub const VECTOR_COLUMN_NAME: &str = "vector";
/// Optimize the dataset.
///
/// Similar to `VACUUM` in PostgreSQL, it offers different options to
/// optimize different parts of the table on disk.
///
/// By default, it optimizes everything, as [`OptimizeAction::All`].
pub enum OptimizeAction {
/// Run optimization on everything, with default options.
All,
/// Compact files in the dataset
Compact {
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
},
/// Prune old versions of the dataset.
Prune {
/// The duration of time to keep versions of the dataset.
older_than: Duration,
/// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default.
/// If you are sure that there are no in-progress transactions, then you can set this to `true` to delete all files older than `older_than`.
delete_unverified: Option<bool>,
},
/// Optimize index.
Index(OptimizeOptions),
}
impl Default for OptimizeAction {
fn default() -> Self {
Self::All
}
}
/// Statistics about the optimization.
pub struct OptimizeStats {
/// Stats of the file compaction.
pub compaction: Option<CompactionMetrics>,
/// Stats of the version pruning
pub prune: Option<RemovalStats>,
}
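A hedged sketch of driving this new entry point, assuming `tbl` is a `TableRef` and using `chrono::Duration` for the pruning cutoff (as `cleanup_old_versions` below does):

```rust
use chrono::Duration;
use vectordb::table::OptimizeAction;
use vectordb::{Result, TableRef};

async fn maintain(tbl: TableRef) -> Result<()> {
    // Prune versions older than 30 days; leave possibly-in-flight files alone.
    let stats = tbl
        .optimize(OptimizeAction::Prune {
            older_than: Duration::days(30),
            delete_unverified: None,
        })
        .await?;
    if let Some(prune) = stats.prune {
        println!(
            "removed {} bytes across {} old versions",
            prune.bytes_removed, prune.old_versions
        );
    }
    // Or simply do everything: compaction, pruning, and index optimization.
    tbl.optimize(OptimizeAction::All).await?;
    Ok(())
}
```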
/// A Table is a collection of strongly typed rows.
///
@@ -194,6 +233,14 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # });
/// ```
fn query(&self) -> Query;
/// Optimize the on-disk data and indices for better performance.
///
/// <section class="warning">Experimental API</section>
///
/// Modeled after ``VACUUM`` in PostgreSQL.
/// Not all implementations support explicit optimization.
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
}
/// Reference to a Table pointer.
@@ -396,17 +443,8 @@ impl NativeTable {
self.dataset.lock().expect("lock poison").version().version
}
-/// Create a scalar index on the table
-pub async fn create_scalar_index(&self, column: &str, replace: bool) -> Result<()> {
-let mut dataset = self.clone_inner_dataset();
-let params = ScalarIndexParams::default();
-dataset
-.create_index(&[column], IndexType::Scalar, None, &params, replace)
-.await?;
-Ok(())
-}
-pub async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options);
let mut dataset = self.clone_inner_dataset();
dataset.optimize_indices(options).await?;
@@ -463,7 +501,7 @@ impl NativeTable {
///
/// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
/// returns the result.
-pub async fn cleanup_old_versions(
async fn cleanup_old_versions(
&self,
older_than: Duration,
delete_unverified: Option<bool>,
@@ -480,7 +518,7 @@ impl NativeTable {
/// for faster reads.
///
/// This calls into [lance::dataset::optimize::compact_files].
-pub async fn compact_files(
async fn compact_files(
&self,
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
@@ -614,6 +652,52 @@ impl Table for NativeTable {
self.reset_dataset(dataset);
Ok(())
}
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
let mut stats = OptimizeStats {
compaction: None,
prune: None,
};
match action {
OptimizeAction::All => {
stats.compaction = self
.optimize(OptimizeAction::Compact {
options: CompactionOptions::default(),
remap_options: None,
})
.await?
.compaction;
stats.prune = self
.optimize(OptimizeAction::Prune {
older_than: Duration::days(7),
delete_unverified: None,
})
.await?
.prune;
self.optimize(OptimizeAction::Index(OptimizeOptions::default()))
.await?;
}
OptimizeAction::Compact {
options,
remap_options,
} => {
stats.compaction = Some(self.compact_files(options, remap_options).await?);
}
OptimizeAction::Prune {
older_than,
delete_unverified,
} => {
stats.prune = Some(
self.cleanup_old_versions(older_than, delete_unverified)
.await?,
);
}
OptimizeAction::Index(options) => {
self.optimize_indices(&options).await?;
}
}
Ok(stats)
}
}
#[cfg(test)]