add create index to nodejs client (#89)

This commit is contained in:
gsilvestrin
2023-05-24 16:45:58 -06:00
committed by GitHub
parent 06cb7b6458
commit f923cfe47f
11 changed files with 505 additions and 8 deletions

2
Cargo.lock generated
View File

@@ -3359,8 +3359,10 @@ name = "vectordb"
version = "0.0.1"
dependencies = [
"arrow-array",
"arrow-data",
"arrow-schema",
"lance",
"rand",
"tempfile",
"tokio",
]

View File

@@ -21,7 +21,7 @@ import {
import { fromRecordsToBuffer } from './arrow'
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { databaseNew, databaseTableNames, databaseOpenTable, tableCreate, tableSearch, tableAdd } = require('../native.js')
const { databaseNew, databaseTableNames, databaseOpenTable, tableCreate, tableSearch, tableAdd, tableCreateVectorIndex } = require('../native.js')
/**
* Connect to a LanceDB instance at the given URI
@@ -118,8 +118,62 @@ export class Table {
async overwrite (data: Array<Record<string, unknown>>): Promise<number> {
return tableAdd.call(this._tbl, await fromRecordsToBuffer(data), WriteMode.Overwrite.toString())
}
async create_index (indexParams: VectorIndexParams): Promise<any> {
return tableCreateVectorIndex.call(this._tbl, indexParams)
}
}
interface IvfPQIndexConfig {
/**
* The column to be indexed
*/
column?: string
/**
* A unique name for the index
*/
index_name?: string
/**
* Metric type, L2 or Cosine
*/
metric_type?: MetricType
/**
* The number of partitions this index
*/
num_partitions?: number
/**
* The max number of iterations for kmeans training.
*/
max_iters?: number
/**
* Train as optimized product quantization.
*/
use_opq?: boolean
/**
* Number of subvectors to build PQ code
*/
num_sub_vectors?: number
/**
* The number of bits to present one PQ centroid.
*/
num_bits?: number
/**
* Max number of iterations to train OPQ, if `use_opq` is true.
*/
max_opq_iters?: number
type: 'ivf_pq'
}
export type VectorIndexParams = IvfPQIndexConfig
/**
* A builder for nearest neighbor queries for LanceDB.
*/

View File

@@ -68,7 +68,7 @@ describe('LanceDB client', function () {
const uri = await createTestDB()
const con = await lancedb.connect(uri)
const table = await con.openTable('vectors')
const results = await table.search([0.1, 0.3]).filter('id == 2').execute()
const results = await table.search([0.1, 0.1]).filter('id == 2').execute()
assert.equal(results.length, 1)
assert.equal(results[0].id, 2)
})
@@ -131,6 +131,15 @@ describe('LanceDB client', function () {
assert.equal(resultsAdd.length, 2)
})
})
describe('when creating a vector index', function () {
it('overwrite all records in a table', async function () {
const uri = await createTestDB(32, 300)
const con = await lancedb.connect(uri)
const table = await con.openTable('vectors')
await table.create_index({ type: 'ivf_pq', column: 'vector', num_partitions: 2, max_iters: 2 })
}).timeout(10_000) // Timeout is high partially because GH macos runner is pretty slow
})
})
describe('Query object', function () {
@@ -147,14 +156,18 @@ describe('Query object', function () {
})
})
async function createTestDB (): Promise<string> {
async function createTestDB (numDimensions: number = 2, numRows: number = 2): Promise<string> {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)
const data = [
{ id: 1, vector: [0.1, 0.2], name: 'foo', price: 10, is_active: true },
{ id: 2, vector: [1.1, 1.2], name: 'bar', price: 50, is_active: false }
]
const data = []
for (let i = 0; i < numRows; i++) {
const vector = []
for (let j = 0; j < numDimensions; j++) {
vector.push(i + (j * 0.1))
}
data.push({ id: i + 1, name: `name_${i}`, price: i + 10, is_active: (i % 2 === 0), vector })
}
await con.createTable('vectors', data)
return dir

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod vector;

View File

@@ -0,0 +1,128 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::convert::TryFrom;
use lance::index::vector::ivf::IvfBuildParams;
use lance::index::vector::pq::PQBuildParams;
use lance::index::vector::MetricType;
use neon::context::FunctionContext;
use neon::prelude::*;
use vectordb::index::vector::{IvfPQIndexBuilder, VectorIndexBuilder};
use crate::{runtime, JsTable};
pub(crate) fn table_create_vector_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let index_params = cx.argument::<JsObject>(0)?;
let index_params_builder = get_index_params_builder(&mut cx, index_params).unwrap();
let rt = runtime(&mut cx)?;
let channel = cx.channel();
let (deferred, promise) = cx.promise();
let table = js_table.table.clone();
rt.block_on(async move {
let add_result = table
.lock()
.unwrap()
.create_idx(&index_params_builder)
.await;
deferred.settle_with(&channel, move |mut cx| {
add_result
.map(|_| cx.undefined())
.or_else(|err| cx.throw_error(err.to_string()))
});
});
Ok(promise)
}
fn get_index_params_builder(
cx: &mut FunctionContext,
obj: Handle<JsObject>,
) -> Result<impl VectorIndexBuilder, String> {
let idx_type = obj
.get::<JsString, _, _>(cx, "type")
.map_err(|t| t.to_string())?
.value(cx);
match idx_type.as_str() {
"ivf_pq" => {
let mut index_builder: IvfPQIndexBuilder = IvfPQIndexBuilder::new();
let mut pq_params = PQBuildParams::default();
obj.get_opt::<JsString, _, _>(cx, "column")
.map_err(|t| t.to_string())?
.map(|s| index_builder.column(s.value(cx)));
obj.get_opt::<JsString, _, _>(cx, "index_name")
.map_err(|t| t.to_string())?
.map(|s| index_builder.index_name(s.value(cx)));
obj.get_opt::<JsString, _, _>(cx, "metric_type")
.map_err(|t| t.to_string())?
.map(|s| MetricType::try_from(s.value(cx).as_str()))
.map(|mt| {
let metric_type = mt.unwrap();
index_builder.metric_type(metric_type);
pq_params.metric_type = metric_type;
});
let num_partitions = obj
.get_opt::<JsNumber, _, _>(cx, "num_partitions")
.map_err(|t| t.to_string())?
.map(|s| s.value(cx) as usize);
let max_iters = obj
.get_opt::<JsNumber, _, _>(cx, "max_iters")
.map_err(|t| t.to_string())?
.map(|s| s.value(cx) as usize);
num_partitions.map(|np| {
let max_iters = max_iters.unwrap_or(50);
let ivf_params = IvfBuildParams {
num_partitions: np,
max_iters,
};
index_builder.ivf_params(ivf_params)
});
obj.get_opt::<JsBoolean, _, _>(cx, "use_opq")
.map_err(|t| t.to_string())?
.map(|s| pq_params.use_opq = s.value(cx));
obj.get_opt::<JsNumber, _, _>(cx, "num_sub_vectors")
.map_err(|t| t.to_string())?
.map(|s| pq_params.num_sub_vectors = s.value(cx) as usize);
obj.get_opt::<JsNumber, _, _>(cx, "num_bits")
.map_err(|t| t.to_string())?
.map(|s| pq_params.num_bits = s.value(cx) as usize);
obj.get_opt::<JsNumber, _, _>(cx, "max_iters")
.map_err(|t| t.to_string())?
.map(|s| pq_params.max_iters = s.value(cx) as usize);
obj.get_opt::<JsNumber, _, _>(cx, "max_opq_iters")
.map_err(|t| t.to_string())?
.map(|s| pq_params.max_opq_iters = s.value(cx) as usize);
Ok(index_builder)
}
t => Err(format!("{} is not a valid index type", t).to_string()),
}
}

View File

@@ -36,6 +36,7 @@ use crate::arrow::arrow_buffer_to_record_batch;
mod arrow;
mod convert;
mod index;
struct JsDatabase {
database: Arc<Database>,
@@ -236,5 +237,9 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("tableSearch", table_search)?;
cx.export_function("tableCreate", table_create)?;
cx.export_function("tableAdd", table_add)?;
cx.export_function(
"tableCreateVectorIndex",
index::vector::table_create_vector_index,
)?;
Ok(())
}

View File

@@ -10,9 +10,11 @@ repository = "https://github.com/lancedb/lancedb"
[dependencies]
arrow-array = "37.0"
arrow-data = "37.0"
arrow-schema = "37.0"
lance = "0.4.3"
tokio = { version = "1.23", features = ["rt-multi-thread"] }
[dev-dependencies]
tempfile = "3.5.0"
rand = { version = "0.8.3", features = ["small_rng"] }

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod vector;

View File

@@ -0,0 +1,163 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lance::index::vector::ivf::IvfBuildParams;
use lance::index::vector::pq::PQBuildParams;
use lance::index::vector::{MetricType, VectorIndexParams};
pub trait VectorIndexBuilder {
fn get_column(&self) -> Option<String>;
fn get_index_name(&self) -> Option<String>;
fn build(&self) -> VectorIndexParams;
}
pub struct IvfPQIndexBuilder {
column: Option<String>,
index_name: Option<String>,
metric_type: Option<MetricType>,
ivf_params: Option<IvfBuildParams>,
pq_params: Option<PQBuildParams>,
}
impl IvfPQIndexBuilder {
pub fn new() -> IvfPQIndexBuilder {
IvfPQIndexBuilder {
column: None,
index_name: None,
metric_type: None,
ivf_params: None,
pq_params: None,
}
}
}
impl IvfPQIndexBuilder {
pub fn column(&mut self, column: String) -> &mut IvfPQIndexBuilder {
self.column = Some(column);
self
}
pub fn index_name(&mut self, index_name: String) -> &mut IvfPQIndexBuilder {
self.index_name = Some(index_name);
self
}
pub fn metric_type(&mut self, metric_type: MetricType) -> &mut IvfPQIndexBuilder {
self.metric_type = Some(metric_type);
self
}
pub fn ivf_params(&mut self, ivf_params: IvfBuildParams) -> &mut IvfPQIndexBuilder {
self.ivf_params = Some(ivf_params);
self
}
pub fn pq_params(&mut self, pq_params: PQBuildParams) -> &mut IvfPQIndexBuilder {
self.pq_params = Some(pq_params);
self
}
}
impl VectorIndexBuilder for IvfPQIndexBuilder {
fn get_column(&self) -> Option<String> {
self.column.clone()
}
fn get_index_name(&self) -> Option<String> {
self.index_name.clone()
}
fn build(&self) -> VectorIndexParams {
let ivf_params = self.ivf_params.clone().unwrap_or(IvfBuildParams::default());
let pq_params = self.pq_params.clone().unwrap_or(PQBuildParams::default());
VectorIndexParams::with_ivf_pq_params(pq_params.metric_type, ivf_params, pq_params)
}
}
#[cfg(test)]
mod tests {
use lance::index::vector::ivf::IvfBuildParams;
use lance::index::vector::pq::PQBuildParams;
use lance::index::vector::{MetricType, StageParams};
use crate::index::vector::{IvfPQIndexBuilder, VectorIndexBuilder};
#[test]
fn test_builder_no_params() {
let index_builder = IvfPQIndexBuilder::new();
assert!(index_builder.get_column().is_none());
assert!(index_builder.get_index_name().is_none());
let index_params = index_builder.build();
assert_eq!(index_params.stages.len(), 2);
if let StageParams::Ivf(ivf_params) = index_params.stages.get(0).unwrap() {
let default = IvfBuildParams::default();
assert_eq!(ivf_params.num_partitions, default.num_partitions);
assert_eq!(ivf_params.max_iters, default.max_iters);
} else {
panic!("Expected first stage to be ivf")
}
if let StageParams::PQ(pq_params) = index_params.stages.get(1).unwrap() {
assert_eq!(pq_params.use_opq, false);
} else {
panic!("Expected second stage to be pq")
}
}
#[test]
fn test_builder_all_params() {
let mut index_builder = IvfPQIndexBuilder::new();
index_builder
.column("c".to_owned())
.metric_type(MetricType::Cosine)
.index_name("index".to_owned());
assert_eq!(index_builder.column.clone().unwrap(), "c");
assert_eq!(index_builder.metric_type.unwrap(), MetricType::Cosine);
assert_eq!(index_builder.index_name.clone().unwrap(), "index");
let ivf_params = IvfBuildParams::new(500);
let mut pq_params = PQBuildParams::default();
pq_params.use_opq = true;
pq_params.max_iters = 1;
pq_params.num_bits = 8;
pq_params.num_sub_vectors = 50;
pq_params.metric_type = MetricType::Cosine;
pq_params.max_opq_iters = 2;
index_builder.ivf_params(ivf_params);
index_builder.pq_params(pq_params);
let index_params = index_builder.build();
assert_eq!(index_params.stages.len(), 2);
if let StageParams::Ivf(ivf_params) = index_params.stages.get(0).unwrap() {
assert_eq!(ivf_params.num_partitions, 500);
} else {
assert!(false, "Expected first stage to be ivf")
}
if let StageParams::PQ(pq_params) = index_params.stages.get(1).unwrap() {
assert_eq!(pq_params.use_opq, true);
assert_eq!(pq_params.max_iters, 1);
assert_eq!(pq_params.num_bits, 8);
assert_eq!(pq_params.num_sub_vectors, 50);
assert_eq!(pq_params.metric_type, MetricType::Cosine);
assert_eq!(pq_params.max_opq_iters, 2);
} else {
assert!(false, "Expected second stage to be pq")
}
}
}

View File

@@ -14,5 +14,6 @@
pub mod database;
pub mod error;
pub mod index;
pub mod query;
pub mod table;

View File

@@ -17,8 +17,10 @@ use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatchReader};
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::index::IndexType;
use crate::error::{Error, Result};
use crate::index::vector::VectorIndexBuilder;
use crate::query::Query;
pub const VECTOR_COLUMN_NAME: &str = "vector";
@@ -87,6 +89,25 @@ impl Table {
})
}
pub async fn create_idx(&mut self, index_builder: &impl VectorIndexBuilder) -> Result<()> {
use lance::index::DatasetIndexExt;
let dataset = self
.dataset
.create_index(
&[index_builder
.get_column()
.unwrap_or(VECTOR_COLUMN_NAME.to_string())
.as_str()],
IndexType::Vector,
index_builder.get_index_name(),
&index_builder.build(),
)
.await?;
self.dataset = Arc::new(dataset);
Ok(())
}
/// Insert records into this Table
///
/// # Arguments
@@ -130,13 +151,21 @@ impl Table {
#[cfg(test)]
mod tests {
use arrow_array::{Float32Array, Int32Array, RecordBatch, RecordBatchReader};
use arrow_array::{
Array, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, RecordBatchReader,
};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType, Field, Schema};
use lance::arrow::RecordBatchBuffer;
use lance::dataset::{Dataset, WriteMode};
use lance::index::vector::ivf::IvfBuildParams;
use lance::index::vector::pq::PQBuildParams;
use rand::Rng;
use std::sync::Arc;
use tempfile::tempdir;
use crate::error::Result;
use crate::index::vector::IvfPQIndexBuilder;
use crate::table::Table;
#[tokio::test]
@@ -251,4 +280,74 @@ mod tests {
)
.unwrap()])
}
#[tokio::test]
async fn test_create_index() {
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use rand;
use std::iter::repeat_with;
use arrow_array::Float32Array;
let tmp_dir = tempdir().unwrap();
let path_buf = tmp_dir.into_path();
let dimension = 16;
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"embeddings",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimension,
),
false,
)]));
let mut rng = rand::thread_rng();
let float_arr = Float32Array::from(
repeat_with(|| rng.gen::<f32>())
.take(512 * dimension as usize)
.collect::<Vec<f32>>(),
);
let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
let batches = RecordBatchBuffer::new(vec![RecordBatch::try_new(
schema.clone(),
vec![vectors.clone()],
)
.unwrap()]);
let reader: Box<dyn RecordBatchReader + Send> = Box::new(batches);
let mut table = Table::create(Arc::new(path_buf), "test".to_string(), reader)
.await
.unwrap();
let mut i = IvfPQIndexBuilder::new();
let index_builder = i
.column("embeddings".to_string())
.index_name("my_index".to_string())
.ivf_params(IvfBuildParams::new(256))
.pq_params(PQBuildParams::default());
table.create_idx(index_builder).await.unwrap();
assert_eq!(table.dataset.load_indices().await.unwrap().len(), 1);
assert_eq!(table.count_rows().await.unwrap(), 512);
assert_eq!(table.name, "test");
}
fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
let list_type = DataType::FixedSizeList(
Arc::new(Field::new("item", values.data_type().clone(), true)),
list_size,
);
let data = ArrayDataBuilder::new(list_type)
.len(values.len() / list_size as usize)
.add_child_data(values.into_data())
.build()
.unwrap();
Ok(FixedSizeListArray::from(data))
}
}