feat: add create_index to the async python API (#1052)

This also refactors the Rust lancedb index builder API (and,
correspondingly, the Node.js API).
This commit is contained in:
Weston Pace
2024-03-12 05:17:05 -07:00
committed by GitHub
parent ae1cf4441d
commit 356e89a800
38 changed files with 1330 additions and 767 deletions

View File

@@ -28,13 +28,14 @@ arrow-schema = "50.0"
arrow-arith = "50.0"
arrow-cast = "50.0"
async-trait = "0"
chrono = "0.4.23"
chrono = "0.4.35"
half = { "version" = "=2.3.1", default-features = false, features = [
"num-traits",
] }
futures = "0"
log = "0.4"
object_store = "0.9.0"
pin-project = "1.0.7"
snafu = "0.7.4"
url = "2"
num-traits = "0.2"

View File

@@ -750,11 +750,11 @@ describe('LanceDB client', function () {
num_sub_vectors: 2
})
await expect(createIndex).to.be.rejectedWith(
/VectorIndex requires the column data type to be fixed size list of float32s/
"index cannot be created on the column `name` which has data type Utf8"
)
})
it('it should fail when the column is not a vector', async function () {
it('it should fail when num_partitions is invalid', async function () {
const uri = await createTestDB(32, 300)
const con = await lancedb.connect(uri)
const table = await con.openTable('vectors')

View File

@@ -14,12 +14,10 @@ crate-type = ["cdylib"]
[dependencies]
arrow-ipc.workspace = true
futures.workspace = true
lance-linalg.workspace = true
lance.workspace = true
lancedb = { path = "../rust/lancedb" }
napi = { version = "2.15", default-features = false, features = [
"napi7",
"async"
"async",
] }
napi-derive = "2"

View File

@@ -27,6 +27,7 @@ import {
Float64,
} from "apache-arrow";
import { makeArrowTable } from "../dist/arrow";
import { Index } from "../dist/indices";
describe("Given a table", () => {
let tmpDir: tmp.DirResult;
@@ -67,19 +68,17 @@ describe("Given a table", () => {
});
});
describe("Test creating index", () => {
describe("When creating an index", () => {
let tmpDir: tmp.DirResult;
const schema = new Schema([
new Field("id", new Int32(), true),
new Field("vec", new FixedSizeList(32, new Field("item", new Float32()))),
]);
let tbl: Table;
let queryVec: number[];
beforeEach(() => {
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
test("create vector index with no column", async () => {
const db = await connect(tmpDir.name);
const data = makeArrowTable(
Array(300)
@@ -94,8 +93,13 @@ describe("Test creating index", () => {
schema,
},
);
const tbl = await db.createTable("test", data);
await tbl.createIndex().build();
queryVec = data.toArray()[5].vec.toJSON();
tbl = await db.createTable("test", data);
});
afterEach(() => tmpDir.removeCallback());
it("should create a vector index on vector columns", async () => {
await tbl.createIndex("vec");
// check index directory
const indexDir = path.join(tmpDir.name, "test.lance", "_indices");
@@ -103,38 +107,47 @@ describe("Test creating index", () => {
// TODO: check index type.
// Search without specifying the column
const queryVector = data.toArray()[5].vec.toJSON();
const rst = await tbl.query().nearestTo(queryVector).limit(2).toArrow();
const rst = await tbl.query().nearestTo(queryVec).limit(2).toArrow();
expect(rst.numRows).toBe(2);
// Search with specifying the column
const rst2 = await tbl.search(queryVector, "vec").limit(2).toArrow();
const rst2 = await tbl.search(queryVec, "vec").limit(2).toArrow();
expect(rst2.numRows).toBe(2);
expect(rst.toString()).toEqual(rst2.toString());
});
test("no vector column available", async () => {
const db = await connect(tmpDir.name);
const tbl = await db.createTable(
"no_vec",
makeArrowTable([
{ id: 1, val: 2 },
{ id: 2, val: 3 },
]),
);
await expect(tbl.createIndex().build()).rejects.toThrow(
"No vector column found",
);
it("should allow parameters to be specified", async () => {
await tbl.createIndex("vec", {
config: Index.ivfPq({
numPartitions: 10,
}),
});
await tbl.createIndex("val").build();
const indexDir = path.join(tmpDir.name, "no_vec.lance", "_indices");
// TODO: Verify parameters when we can load index config as part of list indices
});
it("should allow me to replace (or not) an existing index", async () => {
await tbl.createIndex("id");
// Default is replace=true
await tbl.createIndex("id");
await expect(tbl.createIndex("id", { replace: false })).rejects.toThrow(
"already exists",
);
await tbl.createIndex("id", { replace: true });
});
test("should create a scalar index on scalar columns", async () => {
await tbl.createIndex("id");
const indexDir = path.join(tmpDir.name, "test.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
for await (const r of tbl.query().filter("id > 1").select(["id"])) {
expect(r.numRows).toBe(1);
expect(r.numRows).toBe(298);
}
});
// TODO: Move this test to the query API test (making sure we can reject queries
// when the dimension is incorrect)
test("two columns with different dimensions", async () => {
const db = await connect(tmpDir.name);
const schema = new Schema([
@@ -164,14 +177,9 @@ describe("Test creating index", () => {
);
// Only build index over v1
await expect(tbl.createIndex().build()).rejects.toThrow(
/.*More than one vector columns found.*/,
);
tbl
.createIndex("vec")
// eslint-disable-next-line @typescript-eslint/naming-convention
.ivf_pq({ num_partitions: 2, num_sub_vectors: 2 })
.build();
await tbl.createIndex("vec", {
config: Index.ivfPq({ numPartitions: 2, numSubVectors: 2 }),
});
const rst = await tbl
.query()
@@ -205,30 +213,6 @@ describe("Test creating index", () => {
expect(rst64Query.toString()).toEqual(rst64Search.toString());
expect(rst64Query.numRows).toBe(2);
});
test("create scalar index", async () => {
const db = await connect(tmpDir.name);
const data = makeArrowTable(
Array(300)
.fill(1)
.map((_, i) => ({
id: i,
vec: Array(32)
.fill(1)
.map(() => Math.random()),
})),
{
schema,
},
);
const tbl = await db.createTable("test", data);
await tbl.createIndex("id").build();
// check index directory
const indexDir = path.join(tmpDir.name, "test.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
// TODO: check index type.
});
});
describe("Read consistency interval", () => {

View File

@@ -18,15 +18,9 @@ import {
ConnectionOptions,
} from "./native.js";
export {
ConnectionOptions,
WriteOptions,
Query,
MetricType,
} from "./native.js";
export { Connection } from "./connection";
export { Table } from "./table";
export { IvfPQOptions, IndexBuilder } from "./indexer";
export { ConnectionOptions, WriteOptions, Query } from "./native.js";
export { Connection, CreateTableOptions } from "./connection";
export { Table, AddDataOptions } from "./table";
/**
* Connect to a LanceDB instance at the given URI.

View File

@@ -1,105 +0,0 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO: Re-enable this as part of https://github.com/lancedb/lancedb/pull/1052
/* eslint-disable @typescript-eslint/naming-convention */
import {
MetricType,
IndexBuilder as NativeBuilder,
Table as NativeTable,
} from "./native";
/** Options to create `IVF_PQ` index */
export interface IvfPQOptions {
/** Number of IVF partitions. */
num_partitions?: number;
/** Number of sub-vectors in PQ coding. */
num_sub_vectors?: number;
/** Number of bits used for each PQ code.
*/
num_bits?: number;
/** Metric type to calculate the distance between vectors.
*
* Supported metrics: `L2`, `Cosine` and `Dot`.
*/
metric_type?: MetricType;
/** Number of iterations to train K-means.
*
* Default is 50. The more iterations it usually yield better results,
* but it takes longer to train.
*/
max_iterations?: number;
sample_rate?: number;
}
/**
* Building an index on LanceDB {@link Table}
*
* @see {@link Table.createIndex} for detailed usage.
*/
export class IndexBuilder {
private inner: NativeBuilder;
constructor(tbl: NativeTable) {
this.inner = tbl.createIndex();
}
/** Instruct the builder to build an `IVF_PQ` index */
ivf_pq(options?: IvfPQOptions): IndexBuilder {
this.inner.ivfPq(
options?.metric_type,
options?.num_partitions,
options?.num_sub_vectors,
options?.num_bits,
options?.max_iterations,
options?.sample_rate,
);
return this;
}
/** Instruct the builder to build a Scalar index. */
scalar(): IndexBuilder {
this.scalar();
return this;
}
/** Set the column(s) to create index on top of. */
column(col: string): IndexBuilder {
this.inner.column(col);
return this;
}
/** Set to true to replace existing index. */
replace(val: boolean): IndexBuilder {
this.inner.replace(val);
return this;
}
/** Specify the name of the index. Optional */
name(n: string): IndexBuilder {
this.inner.name(n);
return this;
}
/** Building the index. */
async build() {
await this.inner.build();
}
}

195
nodejs/lancedb/indices.ts Normal file
View File

@@ -0,0 +1,195 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Index as LanceDbIndex } from "./native";
/**
* Options to create an `IVF_PQ` index
*
* Every option is optional; the documented default applies when one is omitted.
*/
export interface IvfPqOptions {
/** The number of IVF partitions to create.
*
* This value should generally scale with the number of rows in the dataset.
* By default the number of partitions is the square root of the number of
* rows.
*
* If this value is too large then the first part of the search (picking the
* right partition) will be slow. If this value is too small then the second
* part of the search (searching within a partition) will be slow.
*/
numPartitions?: number;
/** Number of sub-vectors of PQ.
*
* This value controls how much the vector is compressed during the quantization step.
* The more sub vectors there are the less the vector is compressed. The default is
* the dimension of the vector divided by 16. If the dimension is not evenly divisible
* by 16 we use the dimension divided by 8.
*
* The above two cases are highly preferred. Having 8 or 16 values per subvector allows
* us to use efficient SIMD instructions.
*
* If the dimension is not divisible by 8 then we use 1 subvector. This is not ideal and
* will likely result in poor performance.
*/
numSubVectors?: number;
/** The distance type to use to build the index.
*
* Default value is "l2".
*
* This is used when training the index to calculate the IVF partitions
* (vectors are grouped in partitions with similar vectors according to this
* distance type) and to calculate a subvector's code during quantization.
*
* The distance type used to train an index MUST match the distance type used
* to search the index. Failure to do so will yield inaccurate results.
*
* The following distance types are available:
*
* "l2" - Euclidean distance. This is a very common distance metric that
* accounts for both magnitude and direction when determining the distance
* between vectors. L2 distance has a range of [0, ∞).
*
* "cosine" - Cosine distance. Cosine distance is a distance metric
* calculated from the cosine similarity between two vectors. Cosine
* similarity is a measure of similarity between two non-zero vectors of an
* inner product space. It is defined to equal the cosine of the angle
* between them. Unlike L2, the cosine distance is not affected by the
* magnitude of the vectors. Cosine distance has a range of [0, 2].
*
* Note: the cosine distance is undefined when one (or both) of the vectors
* are all zeros (there is no direction). These vectors are invalid and may
* never be returned from a vector search.
*
* "dot" - Dot product. Dot distance is the dot product of two vectors. Dot
* distance has a range of (-∞, ∞). If the vectors are normalized (i.e. their
* L2 norm is 1), then dot distance is equivalent to the cosine distance.
*/
distanceType?: "l2" | "cosine" | "dot";
/** Maximum number of iterations used to train IVF kmeans.
*
* When training an IVF PQ index we use kmeans to calculate the partitions. This parameter
* controls how many iterations of kmeans to run.
*
* Increasing this might improve the quality of the index but in most cases these extra
* iterations have diminishing returns.
*
* The default value is 50.
*/
maxIterations?: number;
/** The number of vectors, per partition, to sample when training IVF kmeans.
*
* When an IVF PQ index is trained, we need to calculate partitions. These are groups
* of vectors that are similar to each other. To do this we use an algorithm called kmeans.
*
* Running kmeans on a large dataset can be slow. To speed this up we run kmeans on a
* random sample of the data. This parameter controls the size of the sample. The total
* number of vectors used to train the index is `sampleRate * numPartitions`.
*
* Increasing this value might improve the quality of the index but in most cases the
* default should be sufficient.
*
* The default value is 256.
*/
sampleRate?: number;
}
/**
 * An index configuration that can be passed to `Table.createIndex` through
 * the `config` field of {@link IndexOptions}.
 *
 * Instances cannot be constructed directly; use one of the static factory
 * methods ({@link Index.ivfPq} or {@link Index.btree}).
 */
export class Index {
  // Native index object wrapped by this instance.
  private readonly inner: LanceDbIndex;

  private constructor(inner: LanceDbIndex) {
    this.inner = inner;
  }

  /**
   * Create an IvfPq index
   *
   * An IVF PQ index keeps a compressed (quantized) copy of every vector and
   * groups those copies into partitions of similar vectors. Each partition
   * tracks a centroid, which is the average of all vectors in that partition.
   *
   * At query time the query vector is first compared against the centroids to
   * pick the closest partitions, and then the compressed vectors inside those
   * partitions are searched for the closest matches.
   *
   * The compression scheme is called product quantization: each vector is cut
   * into subvectors and each subvector is quantized into a small number of
   * bits. The `numSubVectors` option controls this process, trading index
   * size (and thus search speed) against index accuracy.
   *
   * The partitioning scheme is called IVF; the `numPartitions` option controls
   * how many groups are created.
   *
   * Note that training an IVF PQ index on a large dataset is a slow operation
   * and currently is also a memory intensive operation.
   *
   * @param options Optional training parameters; see {@link IvfPqOptions}.
   */
  static ivfPq(options?: Partial<IvfPqOptions>) {
    // Treat a missing options object as "no overrides"; each absent field is
    // forwarded to the native factory as `undefined`.
    const opts: Partial<IvfPqOptions> = options ?? {};
    const {
      distanceType,
      numPartitions,
      numSubVectors,
      maxIterations,
      sampleRate,
    } = opts;
    const native = LanceDbIndex.ivfPq(
      distanceType,
      numPartitions,
      numSubVectors,
      maxIterations,
      sampleRate,
    );
    return new Index(native);
  }

  /**
   * Create a btree index
   *
   * A btree index is an index on a scalar column. It stores a copy of the
   * column in sorted order. One header entry is created per block of rows
   * (currently the block size is fixed at 4096), and the header entries live
   * in a separate cacheable structure (a btree). A search consults the header
   * to decide which blocks have to be read from disk.
   *
   * For example, a btree index in a table with 1Bi rows requires
   * sizeof(Scalar) * 256Ki bytes of memory and will generally need to read
   * sizeof(Scalar) * 4096 bytes to find the correct row ids.
   *
   * This index suits scalar columns with mostly distinct values and does best
   * when the query is highly selective.
   *
   * The btree index does not currently have any parameters, though parameters
   * such as the block size may be added in the future.
   */
  static btree() {
    const native = LanceDbIndex.btree();
    return new Index(native);
  }
}
/** Options accepted when creating an index on a table. */
export interface IndexOptions {
/** Advanced index configuration
*
* This option allows you to specify a specific index to create and also
* allows you to pass in configuration for training the index.
*
* See the static methods on Index for details on the various index types.
*
* If this is not supplied then column data type(s) and column statistics
* will be used to determine the most useful kind of index to create.
*/
config?: Index;
/** Whether to replace the existing index
*
* If this is false, and another index already exists on the same columns
* and with the same name, then an error will be returned. This is true even
* if that index is out of date.
*
* The default is true.
*/
replace?: boolean;
}

View File

@@ -3,15 +3,6 @@
/* auto-generated by NAPI-RS */
export const enum IndexType {
Scalar = 0,
IvfPq = 1
}
export const enum MetricType {
L2 = 0,
Cosine = 1,
Dot = 2
}
/**
* A definition of a column alteration. The alteration changes the column at
* `path` to have the new name `name`, to be nullable if `nullable` is true,
@@ -93,13 +84,9 @@ export class Connection {
/** Drop table with the name. Or raise an error if the table does not exist. */
dropTable(name: string): Promise<void>
}
export class IndexBuilder {
replace(v: boolean): void
column(c: string): void
name(name: string): void
ivfPq(metricType?: MetricType | undefined | null, numPartitions?: number | undefined | null, numSubVectors?: number | undefined | null, numBits?: number | undefined | null, maxIterations?: number | undefined | null, sampleRate?: number | undefined | null): void
scalar(): void
build(): Promise<void>
export class Index {
static ivfPq(distanceType?: string | undefined | null, numPartitions?: number | undefined | null, numSubVectors?: number | undefined | null, maxIterations?: number | undefined | null, sampleRate?: number | undefined | null): Index
static btree(): Index
}
/** Typescript-style Async Iterator over RecordBatches */
export class RecordBatchIterator {
@@ -125,7 +112,7 @@ export class Table {
add(buf: Buffer, mode: string): Promise<void>
countRows(filter?: string | undefined | null): Promise<number>
delete(predicate: string): Promise<void>
createIndex(): IndexBuilder
createIndex(index: Index | undefined | null, column: string, replace?: boolean | undefined | null): Promise<void>
query(): Query
addColumns(transforms: Array<AddColumnsSql>): Promise<void>
alterColumns(alterations: Array<ColumnAlteration>): Promise<void>

View File

@@ -295,12 +295,10 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
const { Connection, IndexType, MetricType, IndexBuilder, RecordBatchIterator, Query, Table, WriteMode, connect } = nativeBinding
const { Connection, Index, RecordBatchIterator, Query, Table, WriteMode, connect } = nativeBinding
module.exports.Connection = Connection
module.exports.IndexType = IndexType
module.exports.MetricType = MetricType
module.exports.IndexBuilder = IndexBuilder
module.exports.Index = Index
module.exports.RecordBatchIterator = RecordBatchIterator
module.exports.Query = Query
module.exports.Table = Table

View File

@@ -19,7 +19,7 @@ import {
Table as _NativeTable,
} from "./native";
import { Query } from "./query";
import { IndexBuilder } from "./indexer";
import { IndexOptions } from "./indices";
import { Data, fromDataToBuffer } from "./arrow";
/**
@@ -103,24 +103,28 @@ export class Table {
await this.inner.delete(predicate);
}
/** Create an index over the columns.
/** Create an index to speed up queries.
*
* @param {string} column The column to create the index on. If not specified,
* it will create an index on vector field.
* Indices can be created on vector columns or scalar columns.
* Indices on vector columns will speed up vector searches.
* Indices on scalar columns will speed up filtering (in both
* vector and non-vector searches)
*
* @example
*
* By default, it creates vector idnex on one vector column.
* If the column has a vector (fixed size list) data type then
* an IvfPq vector index will be created.
*
* ```typescript
* const table = await conn.openTable("my_table");
* await table.createIndex().build();
* await table.createIndex(["vector"]);
* ```
*
* You can specify `IVF_PQ` parameters via `ivf_pq({})` call.
* For advanced control over vector index creation you can specify
* the index type and options.
* ```typescript
* const table = await conn.openTable("my_table");
* await table.createIndex("my_vec_col")
* await table.createIndex(["vector"], I)
* .ivf_pq({ num_partitions: 128, num_sub_vectors: 16 })
* .build();
* ```
@@ -131,12 +135,11 @@ export class Table {
* await table.createIndex("my_float_col").build();
* ```
*/
createIndex(column?: string): IndexBuilder {
let builder = new IndexBuilder(this.inner);
if (column !== undefined) {
builder = builder.column(column);
}
return builder;
async createIndex(
  column: string,
  options?: Partial<IndexOptions>,
): Promise<void> {
  // The native index object sits in a private field of the `Index` wrapper.
  // TypeScript has no package-level visibility, so it is read through an
  // `any` cast. It stays `undefined` when no config was supplied.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const config = options?.config as any;
  const replace = options?.replace;
  await this.inner.createIndex(config?.inner, column, replace);
}
/**

12
nodejs/src/error.rs Normal file
View File

@@ -0,0 +1,12 @@
/// Shorthand for [`napi::Result`], the result type whose error is a napi error.
pub type Result<T> = napi::Result<T>;
/// Extension trait for turning a fallible value into a napi [`Result`].
pub trait NapiErrorExt<T> {
/// Convert to a napi error using from_reason(err.to_string())
///
/// Consumes `self` and returns the converted result.
fn default_error(self) -> Result<T>;
}
impl<T> NapiErrorExt<T> for std::result::Result<T, lancedb::Error> {
fn default_error(self) -> Result<T> {
self.map_err(|err| napi::Error::from_reason(err.to_string()))
}
}

View File

@@ -14,126 +14,73 @@
use std::sync::Mutex;
use lance_linalg::distance::MetricType as LanceMetricType;
use lancedb::index::IndexBuilder as LanceDbIndexBuilder;
use lancedb::Table as LanceDbTable;
use lancedb::index::scalar::BTreeIndexBuilder;
use lancedb::index::vector::IvfPqIndexBuilder;
use lancedb::index::Index as LanceDbIndex;
use lancedb::DistanceType;
use napi_derive::napi;
#[napi]
pub enum IndexType {
Scalar,
IvfPq,
pub struct Index {
inner: Mutex<Option<LanceDbIndex>>,
}
#[napi]
pub enum MetricType {
L2,
Cosine,
Dot,
}
impl From<MetricType> for LanceMetricType {
fn from(metric: MetricType) -> Self {
match metric {
MetricType::L2 => Self::L2,
MetricType::Cosine => Self::Cosine,
MetricType::Dot => Self::Dot,
}
impl Index {
pub fn consume(&self) -> napi::Result<LanceDbIndex> {
self.inner
.lock()
.unwrap()
.take()
.ok_or(napi::Error::from_reason(
"attempt to use an index more than once",
))
}
}
#[napi]
pub struct IndexBuilder {
inner: Mutex<Option<LanceDbIndexBuilder>>,
}
impl IndexBuilder {
fn modify(
&self,
mod_fn: impl Fn(LanceDbIndexBuilder) -> LanceDbIndexBuilder,
) -> napi::Result<()> {
let mut inner = self.inner.lock().unwrap();
let inner_builder = inner.take().ok_or_else(|| {
napi::Error::from_reason("IndexBuilder has already been consumed".to_string())
})?;
let inner_builder = mod_fn(inner_builder);
inner.replace(inner_builder);
Ok(())
}
}
#[napi]
impl IndexBuilder {
pub fn new(tbl: &LanceDbTable) -> Self {
let inner = tbl.create_index(&[]);
Self {
inner: Mutex::new(Some(inner)),
}
}
#[napi]
pub fn replace(&self, v: bool) -> napi::Result<()> {
self.modify(|b| b.replace(v))
}
#[napi]
pub fn column(&self, c: String) -> napi::Result<()> {
self.modify(|b| b.columns(&[c.as_str()]))
}
#[napi]
pub fn name(&self, name: String) -> napi::Result<()> {
self.modify(|b| b.name(name.as_str()))
}
#[napi]
impl Index {
#[napi(factory)]
pub fn ivf_pq(
&self,
metric_type: Option<MetricType>,
distance_type: Option<String>,
num_partitions: Option<u32>,
num_sub_vectors: Option<u32>,
num_bits: Option<u32>,
max_iterations: Option<u32>,
sample_rate: Option<u32>,
) -> napi::Result<()> {
self.modify(|b| {
let mut b = b.ivf_pq();
if let Some(metric_type) = metric_type {
b = b.metric_type(metric_type.into());
}
if let Some(num_partitions) = num_partitions {
b = b.num_partitions(num_partitions);
}
if let Some(num_sub_vectors) = num_sub_vectors {
b = b.num_sub_vectors(num_sub_vectors);
}
if let Some(num_bits) = num_bits {
b = b.num_bits(num_bits);
}
if let Some(max_iterations) = max_iterations {
b = b.max_iterations(max_iterations);
}
if let Some(sample_rate) = sample_rate {
b = b.sample_rate(sample_rate);
}
b
) -> napi::Result<Self> {
let mut ivf_pq_builder = IvfPqIndexBuilder::default();
if let Some(distance_type) = distance_type {
let distance_type = match distance_type.as_str() {
"l2" => Ok(DistanceType::L2),
"cosine" => Ok(DistanceType::Cosine),
"dot" => Ok(DistanceType::Dot),
_ => Err(napi::Error::from_reason(format!(
"Invalid distance type '{}'. Must be one of l2, cosine, or dot",
distance_type
))),
}?;
ivf_pq_builder = ivf_pq_builder.distance_type(distance_type);
}
if let Some(num_partitions) = num_partitions {
ivf_pq_builder = ivf_pq_builder.num_partitions(num_partitions);
}
if let Some(num_sub_vectors) = num_sub_vectors {
ivf_pq_builder = ivf_pq_builder.num_sub_vectors(num_sub_vectors);
}
if let Some(max_iterations) = max_iterations {
ivf_pq_builder = ivf_pq_builder.max_iterations(max_iterations);
}
if let Some(sample_rate) = sample_rate {
ivf_pq_builder = ivf_pq_builder.sample_rate(sample_rate);
}
Ok(Self {
inner: Mutex::new(Some(LanceDbIndex::IvfPq(ivf_pq_builder))),
})
}
#[napi]
pub fn scalar(&self) -> napi::Result<()> {
self.modify(|b| b.scalar())
}
#[napi]
pub async fn build(&self) -> napi::Result<()> {
let inner = self.inner.lock().unwrap().take().ok_or_else(|| {
napi::Error::from_reason("IndexBuilder has already been consumed".to_string())
})?;
inner
.build()
.await
.map_err(|e| napi::Error::from_reason(format!("Failed to build index: {}", e)))?;
Ok(())
// Factory for an index handle that holds a BTree index built from
// `BTreeIndexBuilder::default()`.
#[napi(factory)]
pub fn btree() -> Self {
    let config = LanceDbIndex::BTree(BTreeIndexBuilder::default());
    Self {
        inner: Mutex::new(Some(config)),
    }
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use futures::StreamExt;
use lance::io::RecordBatchStream;
use lancedb::arrow::SendableRecordBatchStream;
use lancedb::ipc::batches_to_ipc_file;
use napi::bindgen_prelude::*;
use napi_derive::napi;
@@ -21,12 +21,12 @@ use napi_derive::napi;
/** Typescript-style Async Iterator over RecordBatches */
#[napi]
pub struct RecordBatchIterator {
inner: Box<dyn RecordBatchStream + Unpin>,
inner: SendableRecordBatchStream,
}
#[napi]
impl RecordBatchIterator {
pub(crate) fn new(inner: Box<dyn RecordBatchStream + Unpin>) -> Self {
pub(crate) fn new(inner: SendableRecordBatchStream) -> Self {
Self { inner }
}

View File

@@ -16,6 +16,7 @@ use connection::Connection;
use napi_derive::*;
mod connection;
mod error;
mod index;
mod iterator;
mod query;

View File

@@ -74,6 +74,6 @@ impl Query {
let inner_stream = self.inner.execute_stream().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
})?;
Ok(RecordBatchIterator::new(Box::new(inner_stream)))
Ok(RecordBatchIterator::new(inner_stream))
}
}

View File

@@ -13,13 +13,16 @@
// limitations under the License.
use arrow_ipc::writer::FileWriter;
use lance::dataset::ColumnAlteration as LanceColumnAlteration;
use lancedb::ipc::ipc_file_to_batches;
use lancedb::table::{AddDataMode, Table as LanceDbTable};
use lancedb::table::{
AddDataMode, ColumnAlteration as LanceColumnAlteration, NewColumnTransform,
Table as LanceDbTable,
};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use crate::index::IndexBuilder;
use crate::error::NapiErrorExt;
use crate::index::Index;
use crate::query::Query;
#[napi]
@@ -129,8 +132,22 @@ impl Table {
}
#[napi]
pub fn create_index(&self) -> napi::Result<IndexBuilder> {
Ok(IndexBuilder::new(self.inner_ref()?))
// Creates an index on `column`.
//
// When `index` is supplied its configuration is consumed (an index handle can
// only be used once); otherwise `lancedb::index::Index::Auto` is used. The
// `replace` flag is forwarded to the builder only when it was given. Any
// lancedb error is converted into a napi error via `default_error`.
pub async fn create_index(
    &self,
    index: Option<&Index>,
    column: String,
    replace: Option<bool>,
) -> napi::Result<()> {
    // Resolve the index configuration before touching the table, so a handle
    // that was already used is reported first.
    let lancedb_index = match index {
        Some(handle) => handle.consume()?,
        None => lancedb::index::Index::Auto,
    };
    let builder = self.inner_ref()?.create_index(&[column], lancedb_index);
    let builder = match replace {
        Some(flag) => builder.replace(flag),
        None => builder,
    };
    builder.execute().await.default_error()
}
#[napi]
@@ -144,7 +161,7 @@ impl Table {
.into_iter()
.map(|sql| (sql.name, sql.value_sql))
.collect::<Vec<_>>();
let transforms = lance::dataset::NewColumnTransform::SqlExpressions(transforms);
let transforms = NewColumnTransform::SqlExpressions(transforms);
self.inner_ref()?
.add_columns(transforms, None)
.await

View File

@@ -23,8 +23,9 @@ from ._lancedb import connect as lancedb_connect
from .common import URI, sanitize_uri
from .db import AsyncConnection, DBConnection, LanceDBConnection
from .remote.db import RemoteDBConnection
from .schema import vector # noqa: F401
from .utils import sentry_log # noqa: F401
from .schema import vector
from .table import AsyncTable
from .utils import sentry_log
def connect(
@@ -188,3 +189,19 @@ async def connect_async(
read_consistency_interval_secs,
)
)
__all__ = [
"connect",
"connect_async",
"AsyncConnection",
"AsyncTable",
"URI",
"sanitize_uri",
"sentry_log",
"vector",
"DBConnection",
"LanceDBConnection",
"RemoteDBConnection",
"__version__",
]

View File

@@ -2,6 +2,18 @@ from typing import Optional
import pyarrow as pa
class Index:
@staticmethod
def ivf_pq(
distance_type: Optional[str],
num_partitions: Optional[int],
num_sub_vectors: Optional[int],
max_iterations: Optional[int],
sample_rate: Optional[int],
) -> Index: ...
@staticmethod
def btree() -> Index: ...
class Connection(object):
async def table_names(
self, start_after: Optional[str], limit: Optional[int]
@@ -13,10 +25,15 @@ class Connection(object):
self, name: str, mode: str, schema: pa.Schema
) -> Table: ...
class Table(object):
class Table:
def name(self) -> str: ...
def __repr__(self) -> str: ...
async def schema(self) -> pa.Schema: ...
async def add(self, data: pa.RecordBatchReader, mode: str) -> None: ...
async def count_rows(self, filter: Optional[str]) -> int: ...
async def create_index(
self, column: str, config: Optional[Index], replace: Optional[bool]
): ...
async def connect(
uri: str,

View File

@@ -0,0 +1,157 @@
from typing import Optional
from ._lancedb import (
Index as LanceDbIndex,
)
class BTree:
    """Configuration for a btree index.

    A btree index is built on scalar columns. It keeps a copy of the column in
    sorted order and creates one header entry per block of rows (the block
    size is currently fixed at 4096). The header entries are kept in a
    separate, cacheable structure (a btree); a search uses the header to work
    out which blocks must be read from disk.

    For example, a btree index in a table with 1Bi rows requires
    sizeof(Scalar) * 256Ki bytes of memory and will generally need to read
    sizeof(Scalar) * 4096 bytes to find the correct row ids.

    This index suits scalar columns with mostly distinct values and performs
    best when the query is highly selective.

    There are currently no parameters for the btree index, though parameters
    such as the block size may be added in the future.
    """

    def __init__(self):
        # Index object obtained from the `_lancedb` module's `Index.btree()`.
        self._inner = LanceDbIndex.btree()
class IvfPq(object):
    """Describes an IVF PQ Index

    This index stores a compressed (quantized) copy of every vector.  These vectors
    are grouped into partitions of similar vectors.  Each partition keeps track of
    a centroid which is the average value of all vectors in the group.

    During a query the centroids are compared with the query vector to find the
    closest partitions.  The compressed vectors in these partitions are then
    searched to find the closest vectors.

    The compression scheme is called product quantization.  Each vector is divided
    into subvectors and then each subvector is quantized into a small number of
    bits.  The `num_sub_vectors` parameter controls this process, providing a
    tradeoff between index size (and thus search speed) and index accuracy.

    The partitioning process is called IVF and the `num_partitions` parameter
    controls how many groups to create.

    Note that training an IVF PQ index on a large dataset is a slow operation and
    currently is also a memory intensive operation.
    """

    def __init__(
        self,
        *,
        distance_type: Optional[str] = None,
        num_partitions: Optional[int] = None,
        num_sub_vectors: Optional[int] = None,
        max_iterations: Optional[int] = None,
        sample_rate: Optional[int] = None,
    ):
        """
        Create an IVF PQ index config

        Any parameter left as None falls back to the default chosen by the
        underlying index builder.

        Parameters
        ----------
        distance_type: str, default "l2"
            The distance metric used to train the index

            This is used when training the index to calculate the IVF partitions
            (vectors are grouped in partitions with similar vectors according to this
            distance type) and to calculate a subvector's code during quantization.

            The distance type used to train an index MUST match the distance type used
            to search the index.  Failure to do so will yield inaccurate results.

            The following distance types are available:

            "l2" - Euclidean distance. This is a very common distance metric that
            accounts for both magnitude and direction when determining the distance
            between vectors. L2 distance has a range of [0, ∞).

            "cosine" - Cosine distance.  Cosine distance is a distance metric
            calculated from the cosine similarity between two vectors. Cosine
            similarity is a measure of similarity between two non-zero vectors of an
            inner product space. It is defined to equal the cosine of the angle
            between them.  Unlike L2, the cosine distance is not affected by the
            magnitude of the vectors.  Cosine distance has a range of [0, 2].

            Note: the cosine distance is undefined when one (or both) of the vectors
            are all zeros (there is no direction).  These vectors are invalid and may
            never be returned from a vector search.

            "dot" - Dot product. Dot distance is the dot product of two vectors. Dot
            distance has a range of (-∞, ∞). If the vectors are normalized (i.e. their
            L2 norm is 1), then dot distance is equivalent to the cosine distance.
        num_partitions: int, default sqrt(num_rows)
            The number of IVF partitions to create.

            This value should generally scale with the number of rows in the dataset.
            By default the number of partitions is the square root of the number of
            rows.

            If this value is too large then the first part of the search (picking the
            right partition) will be slow.  If this value is too small then the second
            part of the search (searching within a partition) will be slow.
        num_sub_vectors: int, default is vector dimension / 16
            Number of sub-vectors of PQ.

            This value controls how much the vector is compressed during the
            quantization step.  The more sub vectors there are the less the vector is
            compressed.  The default is the dimension of the vector divided by 16.  If
            the dimension is not evenly divisible by 16 we use the dimension divided by
            8.

            The above two cases are highly preferred.  Having 8 or 16 values per
            subvector allows us to use efficient SIMD instructions.

            If the dimension is not divisible by 8 then we use 1 subvector.  This is not
            ideal and will likely result in poor performance.
        max_iterations: int, default 50
            Max iteration to train kmeans.

            When training an IVF PQ index we use kmeans to calculate the partitions.
            This parameter controls how many iterations of kmeans to run.

            Increasing this might improve the quality of the index but in most cases
            these extra iterations have diminishing returns.

            The default value is 50.
        sample_rate: int, default 256
            The rate used to calculate the number of training vectors for kmeans.

            When an IVF PQ index is trained, we need to calculate partitions.  These
            are groups of vectors that are similar to each other.  To do this we use an
            algorithm called kmeans.

            Running kmeans on a large dataset can be slow.  To speed this up we run
            kmeans on a random sample of the data.  This parameter controls the size of
            the sample.  The total number of vectors used to train the index is
            `sample_rate * num_partitions`.

            Increasing this value might improve the quality of the index but in most
            cases the default should be sufficient.

            The default value is 256.
        """
        # All arguments are forwarded unchanged to the native index configuration.
        self._inner = LanceDbIndex.ivf_pq(
            distance_type=distance_type,
            num_partitions=num_partitions,
            num_sub_vectors=num_sub_vectors,
            max_iterations=max_iterations,
            sample_rate=sample_rate,
        )

View File

@@ -60,6 +60,7 @@ if TYPE_CHECKING:
from ._lancedb import Table as LanceDBTable
from .db import LanceDBConnection
from .index import BTree, IvfPq
pd = safe_import_pandas()
@@ -1917,112 +1918,48 @@ class AsyncTable:
raise NotImplementedError
async def create_index(
self,
metric="L2",
num_partitions=256,
num_sub_vectors=96,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
index_cache_size: Optional[int] = None,
):
"""Create an index on the table.
Parameters
----------
metric: str, default "L2"
The distance metric to use when creating the index.
Valid values are "L2", "cosine", or "dot".
L2 is euclidean distance.
num_partitions: int, default 256
The number of IVF partitions to use when creating the index.
Default is 256.
num_sub_vectors: int, default 96
The number of PQ sub-vectors to use when creating the index.
Default is 96.
vector_column_name: str, default "vector"
The vector column name to create the index.
replace: bool, default True
- If True, replace the existing index if it exists.
- If False, raise an error if duplicate index exists.
accelerator: str, default None
If set, use the given accelerator to create the index.
Only support "cuda" for now.
index_cache_size : int, optional
The size of the index cache in number of entries. Default value is 256.
"""
raise NotImplementedError
async def create_scalar_index(
self,
column: str,
*,
replace: bool = True,
replace: Optional[bool] = None,
config: Optional[Union[IvfPq, BTree]] = None,
):
"""Create a scalar index on a column.
"""Create an index to speed up queries
Scalar indices, like vector indices, can be used to speed up scans. A scalar
index can speed up scans that contain filter expressions on the indexed column.
For example, the following scan will be faster if the column ``my_col`` has
a scalar index:
.. code-block:: python
import lancedb
db = lancedb.connect("/data/lance")
img_table = db.open_table("images")
my_df = img_table.search().where("my_col = 7", prefilter=True).to_pandas()
Scalar indices can also speed up scans containing a vector search and a
prefilter:
.. code-block::python
import lancedb
db = lancedb.connect("/data/lance")
img_table = db.open_table("images")
img_table.search([1, 2, 3, 4], vector_column_name="vector")
.where("my_col != 7", prefilter=True)
.to_pandas()
Scalar indices can only speed up scans for basic filters using
equality, comparison, range (e.g. ``my_col BETWEEN 0 AND 100``), and set
membership (e.g. `my_col IN (0, 1, 2)`)
Scalar indices can be used if the filter contains multiple indexed columns and
the filter criteria are AND'd or OR'd together
(e.g. ``my_col < 0 AND other_col> 100``)
Scalar indices may be used if the filter contains non-indexed columns but,
depending on the structure of the filter, they may not be usable. For example,
if the column ``not_indexed`` does not have a scalar index then the filter
``my_col = 0 OR not_indexed = 1`` will not be able to use any scalar index on
``my_col``.
**Experimental API**
Indices can be created on vector columns or scalar columns.
Indices on vector columns will speed up vector searches.
Indices on scalar columns will speed up filtering (in both
vector and non-vector searches)
Parameters
----------
column : str
The column to be indexed. Must be a boolean, integer, float,
or string column.
replace : bool, default True
Replace the existing index if it exists.
index: Index
The index to create.
Examples
--------
LanceDb supports multiple types of indices. See the static methods on
the Index class for more details.
column: str, default None
The column to index.
.. code-block:: python
When building a scalar index this must be set.
import lance
When building a vector index, this is optional. The default will look
for any columns of type fixed-size-list with floating point values. If
there is only one column of this type then it will be used. Otherwise
an error will be returned.
replace: bool, default True
Whether to replace the existing index
dataset = lance.dataset("./images.lance")
dataset.create_scalar_index("category")
If this is false, and another index already exists on the same columns
and the same name, then an error will be returned. This is true even if
that index is out of date.
The default is True
"""
raise NotImplementedError
index = None
if config is not None:
index = config._inner
await self._inner.create_index(column, index=index, replace=replace)
async def add(
self,
@@ -2066,6 +2003,8 @@ class AsyncTable:
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
if isinstance(data, pa.Table):
data = pa.RecordBatchReader.from_batches(data.schema, data.to_batches())
await self._inner.add(data, mode)
register_event("add")

View File

@@ -0,0 +1,61 @@
from datetime import timedelta
import pyarrow as pa
import pytest
import pytest_asyncio
from lancedb import AsyncConnection, AsyncTable, connect_async
from lancedb.index import BTree, IvfPq
@pytest_asyncio.fixture
async def db_async(tmp_path) -> AsyncConnection:
    """Open an async connection rooted at pytest's temporary directory.

    The connection is created with a read consistency interval of zero seconds.
    """
    zero_interval = timedelta(seconds=0)
    connection = await connect_async(tmp_path, read_consistency_interval=zero_interval)
    return connection
def sample_fixed_size_list_array(nrows, dim):
    """Build a float32 fixed-size-list array with ``nrows`` lists of length ``dim``.

    The flat values are simply 0.0, 1.0, 2.0, ... up to ``nrows * dim - 1``.
    """
    value_count = dim * nrows
    flat_values = pa.array(list(map(float, range(value_count))), pa.float32())
    return pa.FixedSizeListArray.from_arrays(flat_values, dim)
DIM = 8
NROWS = 256
@pytest_asyncio.fixture
async def some_table(db_async):
    """Create a table named ``some_table`` with an ``id`` and a ``vector`` column.

    Both columns contain ``NROWS`` rows; the vector column holds lists of
    ``DIM`` float32 values.
    """
    data = pa.Table.from_pydict(
        {
            # Derive the ids from NROWS (rather than a literal) so this column
            # always has the same length as the vector column.
            "id": list(range(NROWS)),
            "vector": sample_fixed_size_list_array(NROWS, DIM),
        }
    )
    return await db_async.create_table(
        "some_table",
        data,
    )
@pytest.mark.asyncio
async def test_create_scalar_index(some_table: AsyncTable):
    """Exercise index creation on the ``id`` column.

    The steps are order dependent: the ``replace=False`` check relies on the
    index created by the earlier calls.
    """
    # Can create
    await some_table.create_index("id")
    # Can recreate if replace=True
    await some_table.create_index("id", replace=True)
    # Can't recreate if replace=False
    with pytest.raises(RuntimeError, match="already exists"):
        await some_table.create_index("id", replace=False)
    # can also specify index type
    await some_table.create_index("id", config=BTree())
@pytest.mark.asyncio
async def test_create_vector_index(some_table: AsyncTable):
    """Exercise index creation on the ``vector`` column.

    The steps are order dependent: the ``replace=False`` check relies on the
    index created by the earlier calls.
    """
    # Can create
    await some_table.create_index("vector")
    # Can recreate if replace=True
    await some_table.create_index("vector", replace=True)
    # Can't recreate if replace=False
    with pytest.raises(RuntimeError, match="already exists"):
        await some_table.create_index("vector", replace=False)
    # Can also specify index type
    await some_table.create_index("vector", config=IvfPq(num_partitions=100))

87
python/src/index.rs Normal file
View File

@@ -0,0 +1,87 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Mutex;
use lancedb::{
index::{scalar::BTreeIndexBuilder, vector::IvfPqIndexBuilder, Index as LanceDbIndex},
DistanceType,
};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pymethods, PyResult,
};
/// Python-visible handle to a LanceDB index configuration.
///
/// The configuration is stored as `Mutex<Option<_>>` so that it can be moved
/// out through a shared reference; once it has been taken the slot is `None`.
#[pyclass]
pub struct Index {
    // `Some` until `consume` hands the configuration out, `None` afterwards.
    inner: Mutex<Option<LanceDbIndex>>,
}
impl Index {
    /// Moves the wrapped index configuration out of this handle.
    ///
    /// The first call succeeds and leaves the handle empty; any later call
    /// returns a `PyRuntimeError`.
    pub fn consume(&self) -> PyResult<LanceDbIndex> {
        let mut slot = self.inner.lock().unwrap();
        match slot.take() {
            Some(index) => Ok(index),
            None => Err(PyRuntimeError::new_err(
                "cannot use an Index more than once",
            )),
        }
    }
}
#[pymethods]
impl Index {
    /// Creates an IVF PQ index configuration.
    ///
    /// Every argument is optional; an argument left as `None` keeps the value
    /// supplied by `IvfPqIndexBuilder::default()`.
    ///
    /// `distance_type` must name one of `l2`, `cosine` or `dot`.  The name is
    /// compared ASCII-case-insensitively, so spellings such as `"L2"` are
    /// accepted as well.
    ///
    /// # Errors
    ///
    /// Returns a `PyValueError` if `distance_type` is not a recognized name.
    #[staticmethod]
    pub fn ivf_pq(
        distance_type: Option<String>,
        num_partitions: Option<u32>,
        num_sub_vectors: Option<u32>,
        max_iterations: Option<u32>,
        sample_rate: Option<u32>,
    ) -> PyResult<Self> {
        let mut ivf_pq_builder = IvfPqIndexBuilder::default();
        if let Some(distance_type) = distance_type {
            // Normalize the case before matching; the error message still
            // reports the value exactly as the caller supplied it.
            let distance_type = match distance_type.to_ascii_lowercase().as_str() {
                "l2" => DistanceType::L2,
                "cosine" => DistanceType::Cosine,
                "dot" => DistanceType::Dot,
                _ => {
                    return Err(PyValueError::new_err(format!(
                        "Invalid distance type '{}'.  Must be one of l2, cosine, or dot",
                        distance_type
                    )))
                }
            };
            ivf_pq_builder = ivf_pq_builder.distance_type(distance_type);
        }
        if let Some(num_partitions) = num_partitions {
            ivf_pq_builder = ivf_pq_builder.num_partitions(num_partitions);
        }
        if let Some(num_sub_vectors) = num_sub_vectors {
            ivf_pq_builder = ivf_pq_builder.num_sub_vectors(num_sub_vectors);
        }
        if let Some(max_iterations) = max_iterations {
            ivf_pq_builder = ivf_pq_builder.max_iterations(max_iterations);
        }
        if let Some(sample_rate) = sample_rate {
            ivf_pq_builder = ivf_pq_builder.sample_rate(sample_rate);
        }
        Ok(Self {
            inner: Mutex::new(Some(LanceDbIndex::IvfPq(ivf_pq_builder))),
        })
    }

    /// Creates a btree index configuration using the builder's defaults.
    #[staticmethod]
    pub fn btree() -> PyResult<Self> {
        Ok(Self {
            inner: Mutex::new(Some(LanceDbIndex::BTree(BTreeIndexBuilder::default()))),
        })
    }
}

View File

@@ -14,11 +14,15 @@
use connection::{connect, Connection};
use env_logger::Env;
use index::Index;
use pyo3::{pymodule, types::PyModule, wrap_pyfunction, PyResult, Python};
use table::Table;
pub mod connection;
pub mod error;
pub mod index;
pub mod table;
pub mod util;
#[pymodule]
pub fn _lancedb(_py: Python, m: &PyModule) -> PyResult<()> {
@@ -27,6 +31,8 @@ pub fn _lancedb(_py: Python, m: &PyModule) -> PyResult<()> {
.write_style("LANCEDB_LOG_STYLE");
env_logger::init_from_env(env);
m.add_class::<Connection>()?;
m.add_class::<Table>()?;
m.add_class::<Index>()?;
m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
Ok(())

View File

@@ -9,7 +9,7 @@ use pyo3::{
};
use pyo3_asyncio::tokio::future_into_py;
use crate::error::PythonErrorExt;
use crate::{error::PythonErrorExt, index::Index};
#[pyclass]
pub struct Table {
@@ -81,6 +81,28 @@ impl Table {
})
}
/// Starts an index build on `column` and returns a Python awaitable.
///
/// When `index` is `None` the index type is `lancedb::index::Index::Auto`;
/// otherwise the configuration is moved out of the supplied wrapper.
/// `replace` is only forwarded to the builder when it was provided.
pub fn create_index<'a>(
    self_: PyRef<'a, Self>,
    column: String,
    index: Option<&Index>,
    replace: Option<bool>,
) -> PyResult<&'a PyAny> {
    let index_config = match index {
        Some(wrapper) => wrapper.consume()?,
        None => lancedb::index::Index::Auto,
    };
    let builder = self_.inner_ref()?.create_index(&[column], index_config);
    let builder = match replace {
        Some(flag) => builder.replace(flag),
        None => builder,
    };
    future_into_py(self_.py(), async move {
        builder.execute().await.infer_error()?;
        Ok(())
    })
}
pub fn __repr__(&self) -> String {
match &self.inner {
None => format!("ClosedTable({})", self.name),

35
python/src/util.rs Normal file
View File

@@ -0,0 +1,35 @@
use std::sync::Mutex;
use pyo3::{exceptions::PyRuntimeError, PyResult};
/// A wrapper around a rust builder
///
/// Rust builders are often implemented so that the builder methods
/// consume the builder and return a new one.  This is not compatible
/// with pyo3, which, being garbage collected, cannot easily obtain
/// ownership of an object.
///
/// This wrapper converts the compile-time safety of rust into runtime
/// errors if any attempt to use the builder happens after it is consumed.
pub struct BuilderWrapper<T> {
    // Human readable name, used in the "already consumed" error message.
    name: String,
    // `Some` while the builder is still available, `None` once it has been taken.
    inner: Mutex<Option<T>>,
}
impl<T> BuilderWrapper<T> {
    /// Wraps `inner`, remembering `name` for use in error messages.
    pub fn new(name: impl AsRef<str>, inner: T) -> Self {
        let name = name.as_ref().to_string();
        let inner = Mutex::new(Some(inner));
        Self { name, inner }
    }

    /// Takes the wrapped builder and passes it to `mod_fn`, returning its result.
    ///
    /// Returns a `PyRuntimeError` if the builder has already been taken.
    pub fn consume<O>(&self, mod_fn: impl FnOnce(T) -> O) -> PyResult<O> {
        let mut slot = self.inner.lock().unwrap();
        match slot.take() {
            Some(builder) => Ok(mod_fn(builder)),
            None => Err(PyRuntimeError::new_err(format!(
                "{} has already been consumed",
                self.name
            ))),
        }
    }
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use lancedb::index::{scalar::BTreeIndexBuilder, Index};
use neon::{
context::{Context, FunctionContext},
result::JsResult,
@@ -33,9 +34,9 @@ pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise>
rt.spawn(async move {
let idx_result = table
.create_index(&[&column])
.create_index(&[column], Index::BTree(BTreeIndexBuilder::default()))
.replace(replace)
.build()
.execute()
.await;
deferred.settle_with(&channel, move |mut cx| {

View File

@@ -13,12 +13,12 @@
// limitations under the License.
use lance_linalg::distance::MetricType;
use lancedb::index::IndexBuilder;
use lancedb::index::vector::IvfPqIndexBuilder;
use lancedb::index::Index;
use neon::context::FunctionContext;
use neon::prelude::*;
use std::convert::TryFrom;
use crate::error::Error::InvalidIndexType;
use crate::error::ResultExt;
use crate::neon_ext::js_object_ext::JsObjectExt;
use crate::runtime;
@@ -39,13 +39,20 @@ pub fn table_create_vector_index(mut cx: FunctionContext) -> JsResult<JsPromise>
.map(|s| s.value(&mut cx))
.unwrap_or("vector".to_string()); // Backward compatibility
let replace = index_params
.get_opt::<JsBoolean, _, _>(&mut cx, "replace")?
.map(|r| r.value(&mut cx));
let tbl = table.clone();
let index_builder = tbl.create_index(&[&column_name]);
let index_builder =
get_index_params_builder(&mut cx, index_params, index_builder).or_throw(&mut cx)?;
let ivf_pq_builder = get_index_params_builder(&mut cx, index_params).or_throw(&mut cx)?;
let mut index_builder = tbl.create_index(&[column_name], Index::IvfPq(ivf_pq_builder));
if let Some(replace) = replace {
index_builder = index_builder.replace(replace);
}
rt.spawn(async move {
let idx_result = index_builder.build().await;
let idx_result = index_builder.execute().await;
deferred.settle_with(&channel, move |mut cx| {
idx_result.or_throw(&mut cx)?;
Ok(cx.boxed(JsTable::from(table)))
@@ -57,26 +64,17 @@ pub fn table_create_vector_index(mut cx: FunctionContext) -> JsResult<JsPromise>
fn get_index_params_builder(
cx: &mut FunctionContext,
obj: Handle<JsObject>,
builder: IndexBuilder,
) -> crate::error::Result<IndexBuilder> {
let mut builder = match obj.get::<JsString, _, _>(cx, "type")?.value(cx).as_str() {
"ivf_pq" => builder.ivf_pq(),
_ => {
return Err(InvalidIndexType {
index_type: "".into(),
})
}
};
if let Some(index_name) = obj.get_opt::<JsString, _, _>(cx, "index_name")? {
builder = builder.name(index_name.value(cx).as_str());
) -> crate::error::Result<IvfPqIndexBuilder> {
if obj.get_opt::<JsString, _, _>(cx, "index_name")?.is_some() {
return Err(crate::error::Error::LanceDB {
message: "Setting the index_name is no longer supported".to_string(),
});
}
let mut builder = IvfPqIndexBuilder::default();
if let Some(metric_type) = obj.get_opt::<JsString, _, _>(cx, "metric_type")? {
let metric_type = MetricType::try_from(metric_type.value(cx).as_str())?;
builder = builder.metric_type(metric_type);
builder = builder.distance_type(metric_type);
}
if let Some(np) = obj.get_opt_u32(cx, "num_partitions")? {
builder = builder.num_partitions(np);
}
@@ -86,11 +84,5 @@ fn get_index_params_builder(
if let Some(max_iters) = obj.get_opt_u32(cx, "max_iters")? {
builder = builder.max_iterations(max_iters);
}
if let Some(num_bits) = obj.get_opt_u32(cx, "num_bits")? {
builder = builder.num_bits(num_bits);
}
if let Some(replace) = obj.get_opt::<JsBoolean, _, _>(cx, "replace")? {
builder = builder.replace(replace.value(cx));
}
Ok(builder)
}

View File

@@ -26,6 +26,7 @@ lance = { workspace = true }
lance-index = { workspace = true }
lance-linalg = { workspace = true }
lance-testing = { workspace = true }
pin-project = { workspace = true }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
log.workspace = true
async-trait = "0"

View File

@@ -20,6 +20,7 @@ use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lancedb::connection::Connection;
use lancedb::index::Index;
use lancedb::{connect, Result, Table as LanceDbTable};
#[tokio::main]
@@ -142,23 +143,18 @@ async fn create_empty_table(db: &Connection) -> Result<LanceDbTable> {
async fn create_index(table: &LanceDbTable) -> Result<()> {
// --8<-- [start:create_index]
table
.create_index(&["vector"])
.ivf_pq()
.num_partitions(8)
.build()
.await
table.create_index(&["vector"], Index::Auto).execute().await
// --8<-- [end:create_index]
}
async fn search(table: &LanceDbTable) -> Result<Vec<RecordBatch>> {
// --8<-- [start:search]
Ok(table
table
.search(&[1.0; 128])
.limit(2)
.execute_stream()
.await?
.try_collect::<Vec<_>>()
.await?)
.await
// --8<-- [end:search]
}

View File

@@ -12,4 +12,92 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use lance::arrow::*;
use std::{pin::Pin, sync::Arc};
pub use arrow_array;
pub use arrow_schema;
use futures::{Stream, StreamExt};
use crate::error::Result;
/// An iterator of batches that also has a schema
pub trait RecordBatchReader: Iterator<Item = Result<arrow_array::RecordBatch>> {
    /// Returns the schema of this `RecordBatchReader`.
    ///
    /// Implementations of this trait should guarantee that every `RecordBatch`
    /// returned by this reader has the same schema as the one returned from
    /// this method.
    fn schema(&self) -> Arc<arrow_schema::Schema>;
}
/// A simple RecordBatchReader formed from the two parts (iterator + schema)
pub struct SimpleRecordBatchReader<I: Iterator<Item = Result<arrow_array::RecordBatch>>> {
    /// The schema reported by `RecordBatchReader::schema`.
    pub schema: Arc<arrow_schema::Schema>,
    /// The iterator that yields the batches.
    pub batches: I,
}
impl<I: Iterator<Item = Result<arrow_array::RecordBatch>>> Iterator for SimpleRecordBatchReader<I> {
    type Item = Result<arrow_array::RecordBatch>;

    /// Yields the next batch from the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.batches.next()
    }

    /// Forwards the wrapped iterator's bounds.
    ///
    /// `next` delegates one-to-one, so the inner hint is exact for this
    /// wrapper too; exposing it lets `collect`/`extend` pre-allocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.batches.size_hint()
    }
}
impl<I> RecordBatchReader for SimpleRecordBatchReader<I>
where
    I: Iterator<Item = Result<arrow_array::RecordBatch>>,
{
    /// Returns a new handle to the schema stored in this reader.
    fn schema(&self) -> Arc<arrow_schema::Schema> {
        Arc::clone(&self.schema)
    }
}
/// A stream of batches that also has a schema
pub trait RecordBatchStream: Stream<Item = Result<arrow_array::RecordBatch>> {
    /// Returns the schema of this `RecordBatchStream`.
    ///
    /// Implementations of this trait should guarantee that every `RecordBatch`
    /// returned by this stream has the same schema as the one returned from
    /// this method.
    fn schema(&self) -> Arc<arrow_schema::Schema>;
}
/// A boxed RecordBatchStream that is also Send
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

/// Converts a lance record batch stream into a [`SendableRecordBatchStream`].
///
/// The schema is captured up front and each item's error is converted with
/// `Into::into` into this crate's error type.
impl<I: lance::io::RecordBatchStream + 'static> From<I> for SendableRecordBatchStream {
    fn from(stream: I) -> Self {
        // Read the schema before `stream` is moved into the mapping adapter.
        let schema = stream.schema();
        let mapped_stream = Box::pin(stream.map(|r| r.map_err(Into::into)));
        Box::pin(SimpleRecordBatchStream {
            schema,
            stream: mapped_stream,
        })
    }
}
/// A simple RecordBatchStream formed from the two parts (stream + schema)
#[pin_project::pin_project]
pub struct SimpleRecordBatchStream<S: Stream<Item = Result<arrow_array::RecordBatch>>> {
    /// The schema reported by `RecordBatchStream::schema`.
    pub schema: Arc<arrow_schema::Schema>,
    /// The stream that yields the batches; pin-projected so it can be polled
    /// through a pinned reference to this struct.
    #[pin]
    pub stream: S,
}
impl<S> Stream for SimpleRecordBatchStream<S>
where
    S: Stream<Item = Result<arrow_array::RecordBatch>>,
{
    type Item = Result<arrow_array::RecordBatch>;

    /// Polls the wrapped stream for its next batch.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.project().stream.poll_next(cx)
    }
}
impl<S> RecordBatchStream for SimpleRecordBatchStream<S>
where
    S: Stream<Item = Result<arrow_array::RecordBatch>>,
{
    /// Returns a new handle to the schema stored in this stream.
    fn schema(&self) -> Arc<arrow_schema::Schema> {
        Arc::clone(&self.schema)
    }
}

View File

@@ -12,181 +12,52 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{cmp::max, sync::Arc};
use lance_index::IndexType;
pub use lance_linalg::distance::MetricType;
pub mod vector;
use std::sync::Arc;
use crate::{table::TableInternal, Result};
/// Index Parameters.
pub enum IndexParams {
Scalar {
replace: bool,
},
IvfPq {
replace: bool,
metric_type: MetricType,
num_partitions: u64,
num_sub_vectors: u32,
num_bits: u32,
sample_rate: u32,
max_iterations: u32,
},
use self::{scalar::BTreeIndexBuilder, vector::IvfPqIndexBuilder};
pub mod scalar;
pub mod vector;
pub enum Index {
Auto,
BTree(BTreeIndexBuilder),
IvfPq(IvfPqIndexBuilder),
}
/// Builder for Index Parameters.
/// Builder for the create_index operation
///
/// The methods on this builder are used to specify options common to all indices.
pub struct IndexBuilder {
parent: Arc<dyn TableInternal>,
pub(crate) index: Index,
pub(crate) columns: Vec<String>,
// General parameters
/// Index name.
pub(crate) name: Option<String>,
/// Replace the existing index.
pub(crate) replace: bool,
pub(crate) index_type: IndexType,
// Scalar index parameters
// Nothing to set here.
// IVF_PQ parameters
pub(crate) metric_type: MetricType,
pub(crate) num_partitions: Option<u32>,
// PQ related
pub(crate) num_sub_vectors: Option<u32>,
pub(crate) num_bits: u32,
/// The rate to find samples to train kmeans.
pub(crate) sample_rate: u32,
/// Max iteration to train kmeans.
pub(crate) max_iterations: u32,
}
impl IndexBuilder {
pub(crate) fn new(parent: Arc<dyn TableInternal>, columns: &[&str]) -> Self {
pub(crate) fn new(parent: Arc<dyn TableInternal>, columns: Vec<String>, index: Index) -> Self {
Self {
parent,
columns: columns.iter().map(|c| c.to_string()).collect(),
name: None,
index,
columns,
replace: true,
index_type: IndexType::Scalar,
metric_type: MetricType::L2,
num_partitions: None,
num_sub_vectors: None,
num_bits: 8,
sample_rate: 256,
max_iterations: 50,
}
}
/// Build a Scalar Index.
/// Whether to replace the existing index, the default is `true`.
///
/// Accepted parameters:
/// - `replace`: Replace the existing index.
/// - `name`: Index name. Default: `None`
pub fn scalar(mut self) -> Self {
self.index_type = IndexType::Scalar;
self
}
/// Build an IVF PQ index.
///
/// Accepted parameters:
/// - `replace`: Replace the existing index.
/// - `name`: Index name. Default: `None`
/// - `metric_type`: [MetricType] to use to build Vector Index.
/// - `num_partitions`: Number of IVF partitions.
/// - `num_sub_vectors`: Number of sub-vectors of PQ.
/// - `num_bits`: Number of bits used for PQ centroids.
/// - `sample_rate`: The rate to find samples to train kmeans.
/// - `max_iterations`: Max iteration to train kmeans.
pub fn ivf_pq(mut self) -> Self {
self.index_type = IndexType::Vector;
self
}
/// The columns to build index on.
pub fn columns(mut self, cols: &[&str]) -> Self {
self.columns = cols.iter().map(|s| s.to_string()).collect();
self
}
/// Whether to replace the existing index, default is `true`.
/// If this is false, and another index already exists on the same columns
/// and the same name, then an error will be returned. This is true even if
/// that index is out of date.
pub fn replace(mut self, v: bool) -> Self {
self.replace = v;
self
}
/// Set the index name.
pub fn name(mut self, name: &str) -> Self {
self.name = Some(name.to_string());
self
}
/// [MetricType] to use to build Vector Index.
///
/// Default value is [MetricType::L2].
pub fn metric_type(mut self, metric_type: MetricType) -> Self {
self.metric_type = metric_type;
self
}
/// Number of IVF partitions.
pub fn num_partitions(mut self, num_partitions: u32) -> Self {
self.num_partitions = Some(num_partitions);
self
}
/// Number of sub-vectors of PQ.
pub fn num_sub_vectors(mut self, num_sub_vectors: u32) -> Self {
self.num_sub_vectors = Some(num_sub_vectors);
self
}
/// Number of bits used for PQ centroids.
pub fn num_bits(mut self, num_bits: u32) -> Self {
self.num_bits = num_bits;
self
}
/// The rate to find samples to train kmeans.
pub fn sample_rate(mut self, sample_rate: u32) -> Self {
self.sample_rate = sample_rate;
self
}
/// Max iteration to train kmeans.
pub fn max_iterations(mut self, max_iterations: u32) -> Self {
self.max_iterations = max_iterations;
self
}
/// Build the parameters.
pub async fn build(self) -> Result<()> {
self.parent.clone().do_create_index(self).await
}
}
pub(crate) fn suggested_num_partitions(rows: usize) -> u32 {
let num_partitions = (rows as f64).sqrt() as u32;
max(1, num_partitions)
}
pub(crate) fn suggested_num_sub_vectors(dim: u32) -> u32 {
if dim % 16 == 0 {
// Should be more aggressive than this default.
dim / 16
} else if dim % 8 == 0 {
dim / 8
} else {
log::warn!(
"The dimension of the vector is not divisible by 8 or 16, \
which may cause performance degradation in PQ"
);
1
pub async fn execute(self) -> Result<()> {
self.parent.clone().create_index(self).await
}
}

View File

@@ -0,0 +1,30 @@
//! Scalar indices are exact indices that are used to quickly satisfy a variety of filters
//! against a column of scalar values.
//!
//! Scalar indices are currently supported on numeric, string, boolean, and temporal columns.
//!
//! A scalar index will help with queries with filters like `x > 10`, `x < 10`, `x = 10`,
//! etc. Scalar indices can also speed up prefiltering for vector searches. A single
//! vector search with prefiltering can use both a scalar index and a vector index.
/// Builder for a btree index
///
/// A btree index is an index on scalar columns.  The index stores a copy of the column
/// in sorted order.  A header entry is created for each block of rows (currently the
/// block size is fixed at 4096).  These header entries are stored in a separate
/// cacheable structure (a btree).  To search for data the header is used to determine
/// which blocks need to be read from disk.
///
/// For example, a btree index in a table with 1Bi rows requires sizeof(Scalar) * 256Ki
/// bytes of memory and will generally need to read sizeof(Scalar) * 4096 bytes to find
/// the correct row ids.
///
/// This index is good for scalar columns with mostly distinct values and does best when
/// the query is highly selective.
///
/// The btree index does not currently have any parameters, though parameters such as the
/// block size may be added in the future.
#[derive(Default, Debug, Clone)]
pub struct BTreeIndexBuilder {}

// Intentionally empty: there are no parameters to set yet.
impl BTreeIndexBuilder {}

View File

@@ -12,10 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Vector indices are approximate indices that are used to find rows similar to
//! a query vector. Vector indices speed up vector searches.
//!
//! Vector indices are only supported on fixed-size-list (tensor) columns of floating point
//! values
use std::cmp::max;
use serde::Deserialize;
use lance::table::format::{Index, Manifest};
use crate::DistanceType;
pub struct VectorIndex {
pub columns: Vec<String>,
pub index_name: String,
@@ -42,3 +51,145 @@ pub struct VectorIndexStatistics {
pub num_indexed_rows: usize,
pub num_unindexed_rows: usize,
}
/// Builder for an IVF PQ index.
///
/// This index stores a compressed (quantized) copy of every vector.  These vectors
/// are grouped into partitions of similar vectors.  Each partition keeps track of
/// a centroid which is the average value of all vectors in the group.
///
/// During a query the centroids are compared with the query vector to find the closest
/// partitions.  The compressed vectors in these partitions are then searched to find
/// the closest vectors.
///
/// The compression scheme is called product quantization.  Each vector is divided into
/// subvectors and then each subvector is quantized into a small number of bits.  The
/// `num_sub_vectors` parameter controls this process, providing a tradeoff between
/// index size (and thus search speed) and index accuracy.  (The number of bits is not
/// configurable through this builder.)
///
/// The partitioning process is called IVF and the `num_partitions` parameter controls how
/// many groups to create.
///
/// Note that training an IVF PQ index on a large dataset is a slow operation and
/// currently is also a memory intensive operation.
#[derive(Debug, Clone)]
pub struct IvfPqIndexBuilder {
    // Distance metric used to train the index.
    pub(crate) distance_type: DistanceType,
    // Number of IVF partitions; `None` means the caller did not choose a value.
    pub(crate) num_partitions: Option<u32>,
    // Number of PQ sub-vectors; `None` means the caller did not choose a value.
    pub(crate) num_sub_vectors: Option<u32>,
    // Rate used to calculate the number of kmeans training vectors.
    pub(crate) sample_rate: u32,
    // Maximum number of kmeans iterations.
    pub(crate) max_iterations: u32,
}
impl Default for IvfPqIndexBuilder {
    /// Returns a builder using L2 distance, a sample rate of 256, at most 50
    /// kmeans iterations, and no explicit partition or sub-vector counts.
    fn default() -> Self {
        let (sample_rate, max_iterations) = (256, 50);
        Self {
            max_iterations,
            sample_rate,
            num_sub_vectors: None,
            num_partitions: None,
            distance_type: DistanceType::L2,
        }
    }
}
impl IvfPqIndexBuilder {
    /// Sets the [DistanceType] used to build the index.
    ///
    /// Defaults to [DistanceType::L2].
    ///
    /// The distance type drives training: it decides how vectors are grouped into IVF
    /// partitions (similar vectors, according to this distance, share a partition) and how
    /// a subvector's code is computed during quantization.
    ///
    /// The metric type used to train an index MUST match the metric type used to search the
    /// index. Failure to do so will yield inaccurate results.
    pub fn distance_type(self, distance_type: DistanceType) -> Self {
        Self {
            distance_type,
            ..self
        }
    }

    /// Sets the number of IVF partitions to create.
    ///
    /// This value should generally scale with the number of rows in the dataset. When it is
    /// not set, the number of partitions defaults to the square root of the number of rows.
    ///
    /// Too many partitions slow down the first phase of a search (picking the right
    /// partition); too few slow down the second phase (searching within a partition).
    pub fn num_partitions(self, num_partitions: u32) -> Self {
        Self {
            num_partitions: Some(num_partitions),
            ..self
        }
    }

    /// Sets the number of PQ sub-vectors.
    ///
    /// This value controls how much the vector is compressed during the quantization step.
    /// The more sub-vectors there are, the less the vector is compressed. The default is
    /// the dimension of the vector divided by 16. If the dimension is not evenly divisible
    /// by 16 we use the dimension divided by 8.
    ///
    /// The above two cases are highly preferred. Having 8 or 16 values per subvector allows
    /// us to use efficient SIMD instructions.
    ///
    /// If the dimension is not divisible by 8 then we use 1 subvector. This is not ideal and
    /// will likely result in poor performance.
    pub fn num_sub_vectors(self, num_sub_vectors: u32) -> Self {
        Self {
            num_sub_vectors: Some(num_sub_vectors),
            ..self
        }
    }

    /// Sets the rate used to calculate the number of training vectors for kmeans.
    ///
    /// Training an IVF PQ index requires calculating partitions, i.e. groups of vectors
    /// that are similar to each other. This is done with an algorithm called kmeans.
    ///
    /// Running kmeans on a large dataset can be slow, so it is run on a random sample of
    /// the data instead. This parameter controls the size of that sample: the total number
    /// of vectors used to train the index is `sample_rate * num_partitions`.
    ///
    /// Increasing this value might improve the quality of the index but in most cases the
    /// default should be sufficient.
    ///
    /// The default value is 256.
    pub fn sample_rate(self, sample_rate: u32) -> Self {
        Self {
            sample_rate,
            ..self
        }
    }

    /// Sets the maximum number of kmeans training iterations.
    ///
    /// Kmeans is used to calculate the partitions of an IVF PQ index, and this parameter
    /// caps how many iterations of kmeans are run.
    ///
    /// Increasing this might improve the quality of the index but in most cases the parameter
    /// is unused because kmeans will converge with fewer iterations. The parameter is only
    /// used in cases where kmeans does not appear to converge. In those cases it is unlikely
    /// that setting this larger will lead to the index converging anyways.
    ///
    /// The default value is 50.
    pub fn max_iterations(self, max_iterations: u32) -> Self {
        Self {
            max_iterations,
            ..self
        }
    }
}
/// Suggests an IVF partition count for a table with `rows` rows.
///
/// The suggestion is the square root of the row count, truncated to an integer,
/// and never less than 1.
pub(crate) fn suggested_num_partitions(rows: usize) -> u32 {
    let root = (rows as f64).sqrt();
    // Truncate first, then clamp so that tiny tables still get one partition.
    (root as u32).max(1)
}
/// Suggests a PQ sub-vector count for vectors of dimension `dim`.
///
/// Returns `dim / 16` when `dim` is a multiple of 16, otherwise `dim / 8` when
/// it is a multiple of 8. For any other dimension a warning is logged and 1 is
/// returned.
pub(crate) fn suggested_num_sub_vectors(dim: u32) -> u32 {
    match (dim % 16, dim % 8) {
        // Should be more aggressive than this default.
        (0, _) => dim / 16,
        (_, 0) => dim / 8,
        _ => {
            log::warn!(
                "The dimension of the vector is not divisible by 8 or 16, \
                 which may cause performance degradation in PQ"
            );
            1
        }
    }
}

View File

@@ -130,16 +130,15 @@
//! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
//! # RecordBatchIterator, Int32Array};
//! # use arrow_schema::{Schema, Field, DataType};
//! use lancedb::index::Index;
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let tbl = db.open_table("idx_test").execute().await.unwrap();
//! tbl.create_index(&["vector"])
//! .ivf_pq()
//! .num_partitions(256)
//! .build()
//! .await
//! .unwrap();
//! tbl.create_index(&["vector"], Index::Auto)
//! .execute()
//! .await
//! .unwrap();
//! # });
//! ```
//!
@@ -181,6 +180,7 @@
//! # });
//! ```
pub mod arrow;
pub mod connection;
pub mod data;
pub mod error;
@@ -194,6 +194,7 @@ pub mod table;
pub mod utils;
pub use error::{Error, Result};
pub use lance_linalg::distance::DistanceType;
pub use table::Table;
/// Connect to a database

View File

@@ -15,9 +15,9 @@
use std::sync::Arc;
use arrow_array::Float32Array;
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance_linalg::distance::MetricType;
use crate::arrow::SendableRecordBatchStream;
use crate::error::Result;
use crate::table::TableInternal;
@@ -81,13 +81,15 @@ impl Query {
}
}
/// Convert the query plan to a [`DatasetRecordBatchStream`]
/// Convert the query plan to a [`SendableRecordBatchStream`]
///
/// # Returns
///
/// * A [DatasetRecordBatchStream] with the query's results.
pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> {
self.parent.clone().do_query(self).await
/// * A [SendableRecordBatchStream] with the query's results.
pub async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
Ok(SendableRecordBatchStream::from(
self.parent.clone().query(self).await?,
))
}
/// Set the column to query

View File

@@ -51,19 +51,19 @@ impl TableInternal for RemoteTable {
/// Row counting is not implemented for remote tables yet; calling this
/// panics through `todo!()`.
async fn count_rows(&self, _filter: Option<String>) -> Result<usize> {
todo!()
}
async fn do_add(&self, _add: AddDataBuilder) -> Result<()> {
async fn add(&self, _add: AddDataBuilder) -> Result<()> {
todo!()
}
async fn do_query(&self, _query: &Query) -> Result<DatasetRecordBatchStream> {
async fn query(&self, _query: &Query) -> Result<DatasetRecordBatchStream> {
todo!()
}
/// Deletion is not implemented for remote tables yet; calling this
/// panics through `todo!()`.
async fn delete(&self, _predicate: &str) -> Result<()> {
todo!()
}
async fn do_create_index(&self, _index: IndexBuilder) -> Result<()> {
async fn create_index(&self, _index: IndexBuilder) -> Result<()> {
todo!()
}
async fn do_merge_insert(
async fn merge_insert(
&self,
_params: MergeInsertBuilder,
_new_data: Box<dyn RecordBatchReader + Send>,

View File

@@ -18,7 +18,7 @@ use std::path::Path;
use std::sync::Arc;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use chrono::Duration;
use lance::dataset::builder::DatasetBuilder;
@@ -27,13 +27,11 @@ use lance::dataset::optimize::{
compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions,
};
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
use lance::dataset::{
ColumnAlteration, Dataset, NewColumnTransform, UpdateBuilder, WhenMatched, WriteMode,
WriteParams,
};
use lance::dataset::{Dataset, UpdateBuilder, WhenMatched, WriteMode, WriteParams};
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::index::scalar::ScalarIndexParams;
use lance::io::WrappingObjectStore;
use lance_index::IndexType;
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
@@ -41,9 +39,10 @@ use log::info;
use snafu::whatever;
use crate::error::{Error, Result};
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
use crate::index::vector::{IvfPqIndexBuilder, VectorIndex, VectorIndexStatistics};
use crate::index::{
suggested_num_partitions, suggested_num_sub_vectors, IndexBuilder, IndexParams,
vector::{suggested_num_partitions, suggested_num_sub_vectors},
Index, IndexBuilder,
};
use crate::query::{Query, Select, DEFAULT_TOP_K};
use crate::utils::{default_vector_column, PatchReadParam, PatchWriteParam};
@@ -146,7 +145,7 @@ impl AddDataBuilder {
}
pub async fn execute(self) -> Result<()> {
self.parent.clone().do_add(self).await
self.parent.clone().add(self).await
}
}
@@ -161,11 +160,11 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn
async fn schema(&self) -> Result<SchemaRef>;
/// Count the number of rows in this table.
async fn count_rows(&self, filter: Option<String>) -> Result<usize>;
async fn do_add(&self, add: AddDataBuilder) -> Result<()>;
async fn do_query(&self, query: &Query) -> Result<DatasetRecordBatchStream>;
async fn add(&self, add: AddDataBuilder) -> Result<()>;
async fn query(&self, query: &Query) -> Result<DatasetRecordBatchStream>;
async fn delete(&self, predicate: &str) -> Result<()>;
async fn do_create_index(&self, index: IndexBuilder) -> Result<()>;
async fn do_merge_insert(
async fn create_index(&self, index: IndexBuilder) -> Result<()>;
async fn merge_insert(
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
@@ -294,7 +293,33 @@ impl Table {
self.inner.delete(predicate).await
}
/// Create an index on the column name.
/// Create an index on the provided column(s).
///
/// Indices are used to speed up searches and are often needed when the size of the table
/// becomes large (the exact size depends on many factors but somewhere between 100K rows
/// and 1M rows is a good rule of thumb)
///
/// There are a variety of indices available. They are described more in
/// [`crate::index::Index`]. The simplest thing to do is to use `index::Index::Auto` which
/// will attempt to create the most useful index based on the column type and column
/// statistics.
///
/// Once an index is created it will remain until the data is overwritten (e.g. an
/// add operation with mode overwrite) or the indexed column is dropped.
///
/// Indices are not automatically updated with new data. If you add new data to the
/// table then the index will not include the new rows. However, a table search will
/// still consider the unindexed rows. Searches will issue both an indexed search (on
/// the data covered by the index) and a flat search (on the unindexed data) and the
/// results will be combined.
///
/// If there is enough unindexed data then the flat search will become slow and the index
/// should be optimized. Optimizing an index will add any unindexed data to the existing
/// index without rerunning the full index creation process. For more details see
/// [Table::optimize].
///
/// Note: Multi-column (composite) indices are not currently supported. However, they will
/// be supported in the future and the API is designed to be compatible with them.
///
/// # Examples
///
@@ -304,22 +329,28 @@ impl Table {
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// use lancedb::index::Index;
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute()
/// .await
/// .unwrap();
/// # let tbl = db.open_table("idx_test").execute().await.unwrap();
/// tbl.create_index(&["vector"])
/// .ivf_pq()
/// .num_partitions(256)
/// .build()
/// .await
/// .unwrap();
/// tbl.create_index(&["vector"], Index::Auto)
/// .execute()
/// .await
/// .unwrap();
/// # });
/// ```
pub fn create_index(&self, column: &[&str]) -> IndexBuilder {
IndexBuilder::new(self.inner.clone(), column)
pub fn create_index(&self, columns: &[impl AsRef<str>], index: Index) -> IndexBuilder {
    // Copy the column names into owned strings before handing them to the builder.
    let column_names: Vec<String> = columns
        .iter()
        .map(|name| name.as_ref().to_string())
        .collect();
    IndexBuilder::new(self.inner.clone(), column_names, index)
}
/// Create a builder for a merge insert operation
@@ -671,6 +702,28 @@ impl NativeTable {
Ok(name.to_string())
}
/// Returns `true` when a BTree index can be built on a column of type `dtype`.
///
/// Accepted types are integers, floating point numbers, booleans, UTF-8
/// strings, and the time, date and timestamp types.
fn supported_btree_data_type(dtype: &DataType) -> bool {
    match dtype {
        DataType::Boolean
        | DataType::Utf8
        | DataType::Time32(_)
        | DataType::Time64(_)
        | DataType::Date32
        | DataType::Date64
        | DataType::Timestamp(_, _) => true,
        // Everything else qualifies only if it is numeric.
        other => other.is_integer() || other.is_floating(),
    }
}
/// Returns `true` when `dtype` is a fixed size list whose items are floating
/// point numbers, i.e. a column a vector index can be built on.
fn supported_vector_data_type(dtype: &DataType) -> bool {
    matches!(
        dtype,
        DataType::FixedSizeList(inner, _) if inner.data_type().is_floating()
    )
}
/// Creates a new Table
///
/// # Arguments
@@ -881,6 +934,102 @@ impl NativeTable {
Ok(Some(index_stats))
}
async fn create_ivf_pq_index(
&self,
index: IvfPqIndexBuilder,
field: &Field,
replace: bool,
) -> Result<()> {
if !Self::supported_vector_data_type(field.data_type()) {
return Err(Error::InvalidInput {
message: format!(
"An IVF PQ index cannot be created on the column `{}` which has data type {}",
field.name(),
field.data_type()
),
});
}
let num_partitions = if let Some(n) = index.num_partitions {
n
} else {
suggested_num_partitions(self.count_rows(None).await?)
};
let num_sub_vectors: u32 = if let Some(n) = index.num_sub_vectors {
n
} else {
match field.data_type() {
arrow_schema::DataType::FixedSizeList(_, n) => {
Ok::<u32, Error>(suggested_num_sub_vectors(*n as u32))
}
_ => Err(Error::Schema {
message: format!("Column '{}' is not a FixedSizeList", field.name()),
}),
}?
};
let mut dataset = self.dataset.get_mut().await?;
let lance_idx_params = lance::index::vector::VectorIndexParams::ivf_pq(
num_partitions as usize,
/*num_bits=*/ 8,
num_sub_vectors as usize,
false,
index.distance_type,
index.max_iterations as usize,
);
dataset
.create_index(
&[field.name()],
IndexType::Vector,
None,
&lance_idx_params,
replace,
)
.await?;
Ok(())
}
async fn create_auto_index(&self, field: &Field, opts: IndexBuilder) -> Result<()> {
if Self::supported_vector_data_type(field.data_type()) {
self.create_ivf_pq_index(IvfPqIndexBuilder::default(), field, opts.replace)
.await
} else if Self::supported_btree_data_type(field.data_type()) {
self.create_btree_index(field, opts).await
} else {
Err(Error::InvalidInput {
message: format!(
"there are no indices supported for the field `{}` with the data type {}",
field.name(),
field.data_type()
),
})
}
}
async fn create_btree_index(&self, field: &Field, opts: IndexBuilder) -> Result<()> {
if !Self::supported_btree_data_type(field.data_type()) {
return Err(Error::Schema {
message: format!(
"A BTree index cannot be created on the field `{}` which has data type {}",
field.name(),
field.data_type()
),
});
}
let mut dataset = self.dataset.get_mut().await?;
let lance_idx_params = lance::index::scalar::ScalarIndexParams {};
dataset
.create_index(
&[field.name()],
IndexType::Scalar,
None,
&lance_idx_params,
opts.replace,
)
.await?;
Ok(())
}
}
#[async_trait::async_trait]
@@ -913,7 +1062,7 @@ impl TableInternal for NativeTable {
}
}
async fn do_add(&self, add: AddDataBuilder) -> Result<()> {
async fn add(&self, add: AddDataBuilder) -> Result<()> {
let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
mode: match add.mode {
AddDataMode::Append => WriteMode::Append,
@@ -933,7 +1082,24 @@ impl TableInternal for NativeTable {
Ok(())
}
async fn do_query(&self, query: &Query) -> Result<DatasetRecordBatchStream> {
/// Creates the index described by `opts` on this table.
///
/// Exactly one column must be named. The column is looked up in the table
/// schema and the request is dispatched on the index kind: `Index::Auto`
/// chooses a type from the column's data type, `Index::BTree` builds a scalar
/// index, and `Index::IvfPq` builds a vector index with the given settings.
///
/// # Errors
///
/// Returns [`Error::Schema`] when no column or more than one column is given.
/// Also propagates errors from reading the schema, from looking up the
/// column, and from the index creation itself.
async fn create_index(&self, opts: IndexBuilder) -> Result<()> {
    // Report an empty column list separately: it is not a composite index.
    match opts.columns.len() {
        1 => {}
        0 => {
            return Err(Error::Schema {
                message: "At least one column must be provided to create an index"
                    .to_string(),
            });
        }
        _ => {
            return Err(Error::Schema {
                message: "Multi-column (composite) indices are not yet supported".to_string(),
            });
        }
    }

    let schema = self.schema().await?;
    let field = schema.field_with_name(&opts.columns[0])?;

    match opts.index {
        Index::Auto => self.create_auto_index(field, opts).await,
        Index::BTree(_) => self.create_btree_index(field, opts).await,
        Index::IvfPq(ivf_pq) => self.create_ivf_pq_index(ivf_pq, field, opts.replace).await,
    }
}
async fn query(&self, query: &Query) -> Result<DatasetRecordBatchStream> {
let ds_ref = self.dataset.get().await?;
let mut scanner: Scanner = ds_ref.scan();
@@ -992,7 +1158,7 @@ impl TableInternal for NativeTable {
Ok(scanner.try_into_stream().await?)
}
async fn do_merge_insert(
async fn merge_insert(
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
@@ -1028,112 +1194,6 @@ impl TableInternal for NativeTable {
Ok(())
}
/// Creates a scalar or IVF PQ vector index from the settings in `index`.
///
/// If `index.columns` is empty, a vector index is created on the column
/// returned by `default_vector_column`. Exactly one column is supported.
/// For vector indices, an unset partition count is derived from the table's
/// row count and an unset sub-vector count from the fixed size list dimension.
///
/// Returns `Error::Schema` when more than one column is given or when the
/// sub-vector count must be derived from a column that is not a FixedSizeList.
async fn do_create_index(&self, index: IndexBuilder) -> Result<()> {
let schema = self.schema().await?;
// TODO: simplify this after GH lance#1864.
let mut index_type = &index.index_type;
let columns = if index.columns.is_empty() {
// By default we create vector index.
index_type = &IndexType::Vector;
vec![default_vector_column(&schema, None)?]
} else {
index.columns.clone()
};
if columns.len() != 1 {
return Err(Error::Schema {
message: "Only one column is supported for index".to_string(),
});
}
let column = &columns[0];
let field = schema.field_with_name(column)?;
// First resolve every defaulted setting into a concrete `IndexParams` value.
let params = match index_type {
IndexType::Scalar => IndexParams::Scalar {
replace: index.replace,
},
IndexType::Vector => {
let num_partitions = if let Some(n) = index.num_partitions {
n
} else {
suggested_num_partitions(self.count_rows(None).await?)
};
let num_sub_vectors: u32 = if let Some(n) = index.num_sub_vectors {
n
} else {
match field.data_type() {
arrow_schema::DataType::FixedSizeList(_, n) => {
Ok::<u32, Error>(suggested_num_sub_vectors(*n as u32))
}
// NOTE(review): this message indexes `index.columns[0]`, not the resolved
// `columns`; it looks like it would panic if the caller passed no columns -- confirm.
_ => Err(Error::Schema {
message: format!(
"Column '{}' is not a FixedSizeList",
&index.columns[0]
),
}),
}?
};
IndexParams::IvfPq {
replace: index.replace,
metric_type: index.metric_type,
num_partitions: num_partitions as u64,
num_sub_vectors,
num_bits: index.num_bits,
sample_rate: index.sample_rate,
max_iterations: index.max_iterations,
}
}
};
let tbl = self
.as_native()
.expect("Only native table is supported here");
let mut dataset = tbl.dataset.get_mut().await?;
// Then translate the resolved parameters into the matching lance call.
match params {
IndexParams::Scalar { replace } => {
dataset
.create_index(
&[&column],
IndexType::Scalar,
None,
&ScalarIndexParams::default(),
replace,
)
.await?
}
// `sample_rate` is dropped by the `..` below and is not passed on to lance.
IndexParams::IvfPq {
replace,
metric_type,
num_partitions,
num_sub_vectors,
num_bits,
max_iterations,
..
} => {
let lance_idx_params = lance::index::vector::VectorIndexParams::ivf_pq(
num_partitions as usize,
num_bits as u8,
num_sub_vectors as usize,
false,
metric_type,
max_iterations as usize,
);
dataset
.create_index(
&[column],
IndexType::Vector,
None,
&lance_idx_params,
replace,
)
.await?;
}
}
Ok(())
}
/// Delete rows from the table
async fn delete(&self, predicate: &str) -> Result<()> {
self.dataset.get_mut().await?.delete(predicate).await?;
@@ -1858,11 +1918,8 @@ mod tests {
);
table
.create_index(&["embeddings"])
.ivf_pq()
.name("my_index")
.num_partitions(256)
.build()
.create_index(&["embeddings"], Index::Auto)
.execute()
.await
.unwrap();

View File

@@ -98,6 +98,6 @@ impl MergeInsertBuilder {
///
/// Nothing is returned but the [`super::Table`] is updated
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<()> {
self.table.clone().do_merge_insert(self, new_data).await
self.table.clone().merge_insert(self, new_data).await
}
}