From 356e89a800763bb1f03cb2f9098e72a97480191c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 12 Mar 2024 05:17:05 -0700 Subject: [PATCH] feat: add create_index to the async python API (#1052) This also refactors the rust lancedb index builder API (and, correspondingly, the nodejs API) --- Cargo.toml | 3 +- node/src/test/test.ts | 4 +- nodejs/Cargo.toml | 4 +- nodejs/__test__/table.test.ts | 100 ++++----- nodejs/lancedb/index.ts | 12 +- nodejs/lancedb/indexer.ts | 105 --------- nodejs/lancedb/indices.ts | 195 +++++++++++++++++ nodejs/lancedb/native.d.ts | 21 +- nodejs/lancedb/native.js | 6 +- nodejs/lancedb/table.ts | 31 +-- nodejs/src/error.rs | 12 ++ nodejs/src/index.rs | 155 +++++--------- nodejs/src/iterator.rs | 6 +- nodejs/src/lib.rs | 1 + nodejs/src/query.rs | 2 +- nodejs/src/table.rs | 29 ++- python/python/lancedb/__init__.py | 21 +- python/python/lancedb/_lancedb.pyi | 19 +- python/python/lancedb/index.py | 157 ++++++++++++++ python/python/lancedb/table.py | 125 +++-------- python/python/tests/test_index.py | 61 ++++++ python/src/index.rs | 87 ++++++++ python/src/lib.rs | 6 + python/src/table.rs | 24 ++- python/src/util.rs | 35 +++ rust/ffi/node/src/index/scalar.rs | 5 +- rust/ffi/node/src/index/vector.rs | 48 ++--- rust/lancedb/Cargo.toml | 1 + rust/lancedb/examples/simple.rs | 12 +- rust/lancedb/src/arrow.rs | 90 +++++++- rust/lancedb/src/index.rs | 175 ++------------- rust/lancedb/src/index/scalar.rs | 30 +++ rust/lancedb/src/index/vector.rs | 151 +++++++++++++ rust/lancedb/src/lib.rs | 13 +- rust/lancedb/src/query.rs | 12 +- rust/lancedb/src/remote/table.rs | 8 +- rust/lancedb/src/table.rs | 329 +++++++++++++++++------------ rust/lancedb/src/table/merge.rs | 2 +- 38 files changed, 1330 insertions(+), 767 deletions(-) delete mode 100644 nodejs/lancedb/indexer.ts create mode 100644 nodejs/lancedb/indices.ts create mode 100644 nodejs/src/error.rs create mode 100644 python/python/lancedb/index.py create mode 100644 python/python/tests/test_index.py 
create mode 100644 python/src/index.rs create mode 100644 python/src/util.rs create mode 100644 rust/lancedb/src/index/scalar.rs diff --git a/Cargo.toml b/Cargo.toml index 0640c024..bfce7b5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,13 +28,14 @@ arrow-schema = "50.0" arrow-arith = "50.0" arrow-cast = "50.0" async-trait = "0" -chrono = "0.4.23" +chrono = "0.4.35" half = { "version" = "=2.3.1", default-features = false, features = [ "num-traits", ] } futures = "0" log = "0.4" object_store = "0.9.0" +pin-project = "1.0.7" snafu = "0.7.4" url = "2" num-traits = "0.2" diff --git a/node/src/test/test.ts b/node/src/test/test.ts index 7056b992..433d7fc8 100644 --- a/node/src/test/test.ts +++ b/node/src/test/test.ts @@ -750,11 +750,11 @@ describe('LanceDB client', function () { num_sub_vectors: 2 }) await expect(createIndex).to.be.rejectedWith( - /VectorIndex requires the column data type to be fixed size list of float32s/ + "index cannot be created on the column `name` which has data type Utf8" ) }) - it('it should fail when the column is not a vector', async function () { + it('it should fail when num_partitions is invalid', async function () { const uri = await createTestDB(32, 300) const con = await lancedb.connect(uri) const table = await con.openTable('vectors') diff --git a/nodejs/Cargo.toml b/nodejs/Cargo.toml index 0d734209..dd430853 100644 --- a/nodejs/Cargo.toml +++ b/nodejs/Cargo.toml @@ -14,12 +14,10 @@ crate-type = ["cdylib"] [dependencies] arrow-ipc.workspace = true futures.workspace = true -lance-linalg.workspace = true -lance.workspace = true lancedb = { path = "../rust/lancedb" } napi = { version = "2.15", default-features = false, features = [ "napi7", - "async" + "async", ] } napi-derive = "2" diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 1c4c8879..93e8081f 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -27,6 +27,7 @@ import { Float64, } from "apache-arrow"; import { 
makeArrowTable } from "../dist/arrow"; +import { Index } from "../dist/indices"; describe("Given a table", () => { let tmpDir: tmp.DirResult; @@ -67,19 +68,17 @@ describe("Given a table", () => { }); }); -describe("Test creating index", () => { +describe("When creating an index", () => { let tmpDir: tmp.DirResult; const schema = new Schema([ new Field("id", new Int32(), true), new Field("vec", new FixedSizeList(32, new Field("item", new Float32()))), ]); + let tbl: Table; + let queryVec: number[]; - beforeEach(() => { + beforeEach(async () => { tmpDir = tmp.dirSync({ unsafeCleanup: true }); - }); - afterEach(() => tmpDir.removeCallback()); - - test("create vector index with no column", async () => { const db = await connect(tmpDir.name); const data = makeArrowTable( Array(300) @@ -94,8 +93,13 @@ describe("Test creating index", () => { schema, }, ); - const tbl = await db.createTable("test", data); - await tbl.createIndex().build(); + queryVec = data.toArray()[5].vec.toJSON(); + tbl = await db.createTable("test", data); + }); + afterEach(() => tmpDir.removeCallback()); + + it("should create a vector index on vector columns", async () => { + await tbl.createIndex("vec"); // check index directory const indexDir = path.join(tmpDir.name, "test.lance", "_indices"); @@ -103,38 +107,47 @@ describe("Test creating index", () => { // TODO: check index type. 
// Search without specifying the column - const queryVector = data.toArray()[5].vec.toJSON(); - const rst = await tbl.query().nearestTo(queryVector).limit(2).toArrow(); + const rst = await tbl.query().nearestTo(queryVec).limit(2).toArrow(); expect(rst.numRows).toBe(2); // Search with specifying the column - const rst2 = await tbl.search(queryVector, "vec").limit(2).toArrow(); + const rst2 = await tbl.search(queryVec, "vec").limit(2).toArrow(); expect(rst2.numRows).toBe(2); expect(rst.toString()).toEqual(rst2.toString()); }); - test("no vector column available", async () => { - const db = await connect(tmpDir.name); - const tbl = await db.createTable( - "no_vec", - makeArrowTable([ - { id: 1, val: 2 }, - { id: 2, val: 3 }, - ]), - ); - await expect(tbl.createIndex().build()).rejects.toThrow( - "No vector column found", - ); + it("should allow parameters to be specified", async () => { + await tbl.createIndex("vec", { + config: Index.ivfPq({ + numPartitions: 10, + }), + }); - await tbl.createIndex("val").build(); - const indexDir = path.join(tmpDir.name, "no_vec.lance", "_indices"); + // TODO: Verify parameters when we can load index config as part of list indices + }); + + it("should allow me to replace (or not) an existing index", async () => { + await tbl.createIndex("id"); + // Default is replace=true + await tbl.createIndex("id"); + await expect(tbl.createIndex("id", { replace: false })).rejects.toThrow( + "already exists", + ); + await tbl.createIndex("id", { replace: true }); + }); + + test("should create a scalar index on scalar columns", async () => { + await tbl.createIndex("id"); + const indexDir = path.join(tmpDir.name, "test.lance", "_indices"); expect(fs.readdirSync(indexDir)).toHaveLength(1); for await (const r of tbl.query().filter("id > 1").select(["id"])) { - expect(r.numRows).toBe(1); + expect(r.numRows).toBe(298); } }); + // TODO: Move this test to the query API test (making sure we can reject queries + // when the dimension is incorrect) 
test("two columns with different dimensions", async () => { const db = await connect(tmpDir.name); const schema = new Schema([ @@ -164,14 +177,9 @@ describe("Test creating index", () => { ); // Only build index over v1 - await expect(tbl.createIndex().build()).rejects.toThrow( - /.*More than one vector columns found.*/, - ); - tbl - .createIndex("vec") - // eslint-disable-next-line @typescript-eslint/naming-convention - .ivf_pq({ num_partitions: 2, num_sub_vectors: 2 }) - .build(); + await tbl.createIndex("vec", { + config: Index.ivfPq({ numPartitions: 2, numSubVectors: 2 }), + }); const rst = await tbl .query() @@ -205,30 +213,6 @@ describe("Test creating index", () => { expect(rst64Query.toString()).toEqual(rst64Search.toString()); expect(rst64Query.numRows).toBe(2); }); - - test("create scalar index", async () => { - const db = await connect(tmpDir.name); - const data = makeArrowTable( - Array(300) - .fill(1) - .map((_, i) => ({ - id: i, - vec: Array(32) - .fill(1) - .map(() => Math.random()), - })), - { - schema, - }, - ); - const tbl = await db.createTable("test", data); - await tbl.createIndex("id").build(); - - // check index directory - const indexDir = path.join(tmpDir.name, "test.lance", "_indices"); - expect(fs.readdirSync(indexDir)).toHaveLength(1); - // TODO: check index type. 
- }); }); describe("Read consistency interval", () => { diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index 3ad58c13..9ec464df 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -18,15 +18,9 @@ import { ConnectionOptions, } from "./native.js"; -export { - ConnectionOptions, - WriteOptions, - Query, - MetricType, -} from "./native.js"; -export { Connection } from "./connection"; -export { Table } from "./table"; -export { IvfPQOptions, IndexBuilder } from "./indexer"; +export { ConnectionOptions, WriteOptions, Query } from "./native.js"; +export { Connection, CreateTableOptions } from "./connection"; +export { Table, AddDataOptions } from "./table"; /** * Connect to a LanceDB instance at the given URI. diff --git a/nodejs/lancedb/indexer.ts b/nodejs/lancedb/indexer.ts deleted file mode 100644 index 4c193940..00000000 --- a/nodejs/lancedb/indexer.ts +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2024 Lance Developers. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// TODO: Re-enable this as part of https://github.com/lancedb/lancedb/pull/1052 -/* eslint-disable @typescript-eslint/naming-convention */ - -import { - MetricType, - IndexBuilder as NativeBuilder, - Table as NativeTable, -} from "./native"; - -/** Options to create `IVF_PQ` index */ -export interface IvfPQOptions { - /** Number of IVF partitions. */ - num_partitions?: number; - - /** Number of sub-vectors in PQ coding. 
*/ - num_sub_vectors?: number; - - /** Number of bits used for each PQ code. - */ - num_bits?: number; - - /** Metric type to calculate the distance between vectors. - * - * Supported metrics: `L2`, `Cosine` and `Dot`. - */ - metric_type?: MetricType; - - /** Number of iterations to train K-means. - * - * Default is 50. The more iterations it usually yield better results, - * but it takes longer to train. - */ - max_iterations?: number; - - sample_rate?: number; -} - -/** - * Building an index on LanceDB {@link Table} - * - * @see {@link Table.createIndex} for detailed usage. - */ -export class IndexBuilder { - private inner: NativeBuilder; - - constructor(tbl: NativeTable) { - this.inner = tbl.createIndex(); - } - - /** Instruct the builder to build an `IVF_PQ` index */ - ivf_pq(options?: IvfPQOptions): IndexBuilder { - this.inner.ivfPq( - options?.metric_type, - options?.num_partitions, - options?.num_sub_vectors, - options?.num_bits, - options?.max_iterations, - options?.sample_rate, - ); - return this; - } - - /** Instruct the builder to build a Scalar index. */ - scalar(): IndexBuilder { - this.scalar(); - return this; - } - - /** Set the column(s) to create index on top of. */ - column(col: string): IndexBuilder { - this.inner.column(col); - return this; - } - - /** Set to true to replace existing index. */ - replace(val: boolean): IndexBuilder { - this.inner.replace(val); - return this; - } - - /** Specify the name of the index. Optional */ - name(n: string): IndexBuilder { - this.inner.name(n); - return this; - } - - /** Building the index. */ - async build() { - await this.inner.build(); - } -} diff --git a/nodejs/lancedb/indices.ts b/nodejs/lancedb/indices.ts new file mode 100644 index 00000000..c14d335e --- /dev/null +++ b/nodejs/lancedb/indices.ts @@ -0,0 +1,195 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { Index as LanceDbIndex } from "./native"; + +/** + * Options to create an `IVF_PQ` index + */ +export interface IvfPqOptions { + /** The number of IVF partitions to create. + * + * This value should generally scale with the number of rows in the dataset. + * By default the number of partitions is the square root of the number of + * rows. + * + * If this value is too large then the first part of the search (picking the + * right partition) will be slow. If this value is too small then the second + * part of the search (searching within a partition) will be slow. + */ + numPartitions?: number; + + /** Number of sub-vectors of PQ. + * + * This value controls how much the vector is compressed during the quantization step. + * The more sub vectors there are the less the vector is compressed. The default is + * the dimension of the vector divided by 16. If the dimension is not evenly divisible + * by 16 we use the dimension divided by 8. + * + * The above two cases are highly preferred. Having 8 or 16 values per subvector allows + * us to use efficient SIMD instructions. + * + * If the dimension is not divisible by 8 then we use 1 subvector. This is not ideal and + * will likely result in poor performance. + */ + numSubVectors?: number; + + /** [DistanceType] to use to build the index. + * + * Default value is [DistanceType::L2].
+ * + * This is used when training the index to calculate the IVF partitions + * (vectors are grouped in partitions with similar vectors according to this + * distance type) and to calculate a subvector's code during quantization. + * + * The distance type used to train an index MUST match the distance type used + * to search the index. Failure to do so will yield inaccurate results. + * + * The following distance types are available: + * + * "l2" - Euclidean distance. This is a very common distance metric that + * accounts for both magnitude and direction when determining the distance + * between vectors. L2 distance has a range of [0, ∞). + * + * "cosine" - Cosine distance. Cosine distance is a distance metric + * calculated from the cosine similarity between two vectors. Cosine + * similarity is a measure of similarity between two non-zero vectors of an + * inner product space. It is defined to equal the cosine of the angle + * between them. Unlike L2, the cosine distance is not affected by the + * magnitude of the vectors. Cosine distance has a range of [0, 2]. + * + * Note: the cosine distance is undefined when one (or both) of the vectors + * are all zeros (there is no direction). These vectors are invalid and may + * never be returned from a vector search. + * + * "dot" - Dot product. Dot distance is the dot product of two vectors. Dot + * distance has a range of (-∞, ∞). If the vectors are normalized (i.e. their + * L2 norm is 1), then dot distance is equivalent to the cosine distance. + */ + distanceType?: "l2" | "cosine" | "dot"; + + /** Max iteration to train IVF kmeans. + * + * When training an IVF PQ index we use kmeans to calculate the partitions. This parameter + * controls how many iterations of kmeans to run. + * + * Increasing this might improve the quality of the index but in most cases these extra + * iterations have diminishing returns. + * + * The default value is 50. 
+ */ + maxIterations?: number; + + /** The number of vectors, per partition, to sample when training IVF kmeans. + * + * When an IVF PQ index is trained, we need to calculate partitions. These are groups + * of vectors that are similar to each other. To do this we use an algorithm called kmeans. + * + * Running kmeans on a large dataset can be slow. To speed this up we run kmeans on a + * random sample of the data. This parameter controls the size of the sample. The total + * number of vectors used to train the index is `sample_rate * num_partitions`. + * + * Increasing this value might improve the quality of the index but in most cases the + * default should be sufficient. + * + * The default value is 256. + */ + sampleRate?: number; +} + +export class Index { + private readonly inner: LanceDbIndex; + private constructor(inner: LanceDbIndex) { + this.inner = inner; + } + + /** + * Create an IvfPq index + * + * This index stores a compressed (quantized) copy of every vector. These vectors + * are grouped into partitions of similar vectors. Each partition keeps track of + * a centroid which is the average value of all vectors in the group. + * + * During a query the centroids are compared with the query vector to find the closest + * partitions. The compressed vectors in these partitions are then searched to find + * the closest vectors. + * + * The compression scheme is called product quantization. Each vector is divided into + * subvectors and then each subvector is quantized into a small number of bits. The + * parameters `num_bits` and `num_subvectors` control this process, providing a tradeoff + * between index size (and thus search speed) and index accuracy. + * + * The partitioning process is called IVF and the `num_partitions` parameter controls how + * many groups to create. + * + * Note that training an IVF PQ index on a large dataset is a slow operation and + * currently is also a memory intensive operation.
+ */ + static ivfPq(options?: Partial) { + return new Index( + LanceDbIndex.ivfPq( + options?.distanceType, + options?.numPartitions, + options?.numSubVectors, + options?.maxIterations, + options?.sampleRate, + ), + ); + } + + /** Create a btree index + * + * A btree index is an index on a scalar column. The index stores a copy of the column + * in sorted order. A header entry is created for each block of rows (currently the + * block size is fixed at 4096). These header entries are stored in a separate + * cacheable structure (a btree). To search for data the header is used to determine + * which blocks need to be read from disk. + * + * For example, a btree index in a table with 1Bi rows requires sizeof(Scalar) * 256Ki + * bytes of memory and will generally need to read sizeof(Scalar) * 4096 bytes to find + * the correct row ids. + * + * This index is good for scalar columns with mostly distinct values and does best when + * the query is highly selective. + * + * The btree index does not currently have any parameters though parameters such as the + * block size may be added in the future. + */ + static btree() { + return new Index(LanceDbIndex.btree()); + } +} + +export interface IndexOptions { + /** Advanced index configuration + * + * This option allows you to specify a specific index to create and also + * allows you to pass in configuration for training the index. + * + * See the static methods on Index for details on the various index types. + * + * If this is not supplied then column data type(s) and column statistics + * will be used to determine the most useful kind of index to create. + */ + config?: Index; + /** Whether to replace the existing index + * + * If this is false, and another index already exists on the same columns + * and the same name, then an error will be returned. This is true even if + * that index is out of date.
+ * + * The default is true + */ + replace?: boolean; +} diff --git a/nodejs/lancedb/native.d.ts b/nodejs/lancedb/native.d.ts index ebcc7329..9e1f83d3 100644 --- a/nodejs/lancedb/native.d.ts +++ b/nodejs/lancedb/native.d.ts @@ -3,15 +3,6 @@ /* auto-generated by NAPI-RS */ -export const enum IndexType { - Scalar = 0, - IvfPq = 1 -} -export const enum MetricType { - L2 = 0, - Cosine = 1, - Dot = 2 -} /** * A definition of a column alteration. The alteration changes the column at * `path` to have the new name `name`, to be nullable if `nullable` is true, @@ -93,13 +84,9 @@ export class Connection { /** Drop table with the name. Or raise an error if the table does not exist. */ dropTable(name: string): Promise } -export class IndexBuilder { - replace(v: boolean): void - column(c: string): void - name(name: string): void - ivfPq(metricType?: MetricType | undefined | null, numPartitions?: number | undefined | null, numSubVectors?: number | undefined | null, numBits?: number | undefined | null, maxIterations?: number | undefined | null, sampleRate?: number | undefined | null): void - scalar(): void - build(): Promise +export class Index { + static ivfPq(distanceType?: string | undefined | null, numPartitions?: number | undefined | null, numSubVectors?: number | undefined | null, maxIterations?: number | undefined | null, sampleRate?: number | undefined | null): Index + static btree(): Index } /** Typescript-style Async Iterator over RecordBatches */ export class RecordBatchIterator { @@ -125,7 +112,7 @@ export class Table { add(buf: Buffer, mode: string): Promise countRows(filter?: string | undefined | null): Promise delete(predicate: string): Promise - createIndex(): IndexBuilder + createIndex(index: Index | undefined | null, column: string, replace?: boolean | undefined | null): Promise query(): Query addColumns(transforms: Array): Promise alterColumns(alterations: Array): Promise diff --git a/nodejs/lancedb/native.js b/nodejs/lancedb/native.js index a4dedff7..55eb6df5 
100644 --- a/nodejs/lancedb/native.js +++ b/nodejs/lancedb/native.js @@ -295,12 +295,10 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { Connection, IndexType, MetricType, IndexBuilder, RecordBatchIterator, Query, Table, WriteMode, connect } = nativeBinding +const { Connection, Index, RecordBatchIterator, Query, Table, WriteMode, connect } = nativeBinding module.exports.Connection = Connection -module.exports.IndexType = IndexType -module.exports.MetricType = MetricType -module.exports.IndexBuilder = IndexBuilder +module.exports.Index = Index module.exports.RecordBatchIterator = RecordBatchIterator module.exports.Query = Query module.exports.Table = Table diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 5ab7a19a..8bb35f94 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -19,7 +19,7 @@ import { Table as _NativeTable, } from "./native"; import { Query } from "./query"; -import { IndexBuilder } from "./indexer"; +import { IndexOptions } from "./indices"; import { Data, fromDataToBuffer } from "./arrow"; /** @@ -103,24 +103,28 @@ export class Table { await this.inner.delete(predicate); } - /** Create an index over the columns. + /** Create an index to speed up queries. * - * @param {string} column The column to create the index on. If not specified, - * it will create an index on vector field. + * Indices can be created on vector columns or scalar columns. + * Indices on vector columns will speed up vector searches. + * Indices on scalar columns will speed up filtering (in both + * vector and non-vector searches) * * @example * - * By default, it creates vector idnex on one vector column. + * If the column has a vector (fixed size list) data type then + * an IvfPq vector index will be created. 
* * ```typescript * const table = await conn.openTable("my_table"); - * await table.createIndex().build(); + * await table.createIndex(["vector"]); * ``` * - * You can specify `IVF_PQ` parameters via `ivf_pq({})` call. + * For advanced control over vector index creation you can specify + * the index type and options. * ```typescript * const table = await conn.openTable("my_table"); - * await table.createIndex("my_vec_col") + * await table.createIndex(["vector"], I) * .ivf_pq({ num_partitions: 128, num_sub_vectors: 16 }) * .build(); * ``` @@ -131,12 +135,11 @@ export class Table { * await table.createIndex("my_float_col").build(); * ``` */ - createIndex(column?: string): IndexBuilder { - let builder = new IndexBuilder(this.inner); - if (column !== undefined) { - builder = builder.column(column); - } - return builder; + async createIndex(column: string, options?: Partial) { + // Bit of a hack to get around the fact that TS has no package-scope. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const nativeIndex = (options?.config as any)?.inner; + await this.inner.createIndex(nativeIndex, column, options?.replace); } /** diff --git a/nodejs/src/error.rs b/nodejs/src/error.rs new file mode 100644 index 00000000..ddbb4471 --- /dev/null +++ b/nodejs/src/error.rs @@ -0,0 +1,12 @@ +pub type Result = napi::Result; + +pub trait NapiErrorExt { + /// Convert to a napi error using from_reason(err.to_string()) + fn default_error(self) -> Result; +} + +impl NapiErrorExt for std::result::Result { + fn default_error(self) -> Result { + self.map_err(|err| napi::Error::from_reason(err.to_string())) + } +} diff --git a/nodejs/src/index.rs b/nodejs/src/index.rs index 7c9864f0..bba79b14 100644 --- a/nodejs/src/index.rs +++ b/nodejs/src/index.rs @@ -14,126 +14,73 @@ use std::sync::Mutex; -use lance_linalg::distance::MetricType as LanceMetricType; -use lancedb::index::IndexBuilder as LanceDbIndexBuilder; -use lancedb::Table as LanceDbTable; +use 
lancedb::index::scalar::BTreeIndexBuilder; +use lancedb::index::vector::IvfPqIndexBuilder; +use lancedb::index::Index as LanceDbIndex; +use lancedb::DistanceType; use napi_derive::napi; #[napi] -pub enum IndexType { - Scalar, - IvfPq, +pub struct Index { + inner: Mutex>, } -#[napi] -pub enum MetricType { - L2, - Cosine, - Dot, -} - -impl From for LanceMetricType { - fn from(metric: MetricType) -> Self { - match metric { - MetricType::L2 => Self::L2, - MetricType::Cosine => Self::Cosine, - MetricType::Dot => Self::Dot, - } +impl Index { + pub fn consume(&self) -> napi::Result { + self.inner + .lock() + .unwrap() + .take() + .ok_or(napi::Error::from_reason( + "attempt to use an index more than once", + )) } } #[napi] -pub struct IndexBuilder { - inner: Mutex>, -} - -impl IndexBuilder { - fn modify( - &self, - mod_fn: impl Fn(LanceDbIndexBuilder) -> LanceDbIndexBuilder, - ) -> napi::Result<()> { - let mut inner = self.inner.lock().unwrap(); - let inner_builder = inner.take().ok_or_else(|| { - napi::Error::from_reason("IndexBuilder has already been consumed".to_string()) - })?; - let inner_builder = mod_fn(inner_builder); - inner.replace(inner_builder); - Ok(()) - } -} - -#[napi] -impl IndexBuilder { - pub fn new(tbl: &LanceDbTable) -> Self { - let inner = tbl.create_index(&[]); - Self { - inner: Mutex::new(Some(inner)), - } - } - - #[napi] - pub fn replace(&self, v: bool) -> napi::Result<()> { - self.modify(|b| b.replace(v)) - } - - #[napi] - pub fn column(&self, c: String) -> napi::Result<()> { - self.modify(|b| b.columns(&[c.as_str()])) - } - - #[napi] - pub fn name(&self, name: String) -> napi::Result<()> { - self.modify(|b| b.name(name.as_str())) - } - - #[napi] +impl Index { + #[napi(factory)] pub fn ivf_pq( - &self, - metric_type: Option, + distance_type: Option, num_partitions: Option, num_sub_vectors: Option, - num_bits: Option, max_iterations: Option, sample_rate: Option, - ) -> napi::Result<()> { - self.modify(|b| { - let mut b = b.ivf_pq(); - if let 
Some(metric_type) = metric_type { - b = b.metric_type(metric_type.into()); - } - if let Some(num_partitions) = num_partitions { - b = b.num_partitions(num_partitions); - } - if let Some(num_sub_vectors) = num_sub_vectors { - b = b.num_sub_vectors(num_sub_vectors); - } - if let Some(num_bits) = num_bits { - b = b.num_bits(num_bits); - } - if let Some(max_iterations) = max_iterations { - b = b.max_iterations(max_iterations); - } - if let Some(sample_rate) = sample_rate { - b = b.sample_rate(sample_rate); - } - b + ) -> napi::Result { + let mut ivf_pq_builder = IvfPqIndexBuilder::default(); + if let Some(distance_type) = distance_type { + let distance_type = match distance_type.as_str() { + "l2" => Ok(DistanceType::L2), + "cosine" => Ok(DistanceType::Cosine), + "dot" => Ok(DistanceType::Dot), + _ => Err(napi::Error::from_reason(format!( + "Invalid distance type '{}'. Must be one of l2, cosine, or dot", + distance_type + ))), + }?; + ivf_pq_builder = ivf_pq_builder.distance_type(distance_type); + } + if let Some(num_partitions) = num_partitions { + ivf_pq_builder = ivf_pq_builder.num_partitions(num_partitions); + } + if let Some(num_sub_vectors) = num_sub_vectors { + ivf_pq_builder = ivf_pq_builder.num_sub_vectors(num_sub_vectors); + } + if let Some(max_iterations) = max_iterations { + ivf_pq_builder = ivf_pq_builder.max_iterations(max_iterations); + } + if let Some(sample_rate) = sample_rate { + ivf_pq_builder = ivf_pq_builder.sample_rate(sample_rate); + } + Ok(Self { + inner: Mutex::new(Some(LanceDbIndex::IvfPq(ivf_pq_builder))), }) } - #[napi] - pub fn scalar(&self) -> napi::Result<()> { - self.modify(|b| b.scalar()) - } - - #[napi] - pub async fn build(&self) -> napi::Result<()> { - let inner = self.inner.lock().unwrap().take().ok_or_else(|| { - napi::Error::from_reason("IndexBuilder has already been consumed".to_string()) - })?; - inner - .build() - .await - .map_err(|e| napi::Error::from_reason(format!("Failed to build index: {}", e)))?; - Ok(()) + 
#[napi(factory)] + pub fn btree() -> Self { + Self { + inner: Mutex::new(Some(LanceDbIndex::BTree(BTreeIndexBuilder::default()))), + } } } diff --git a/nodejs/src/iterator.rs b/nodejs/src/iterator.rs index 55ee0dca..0fbe6b2e 100644 --- a/nodejs/src/iterator.rs +++ b/nodejs/src/iterator.rs @@ -13,7 +13,7 @@ // limitations under the License. use futures::StreamExt; -use lance::io::RecordBatchStream; +use lancedb::arrow::SendableRecordBatchStream; use lancedb::ipc::batches_to_ipc_file; use napi::bindgen_prelude::*; use napi_derive::napi; @@ -21,12 +21,12 @@ use napi_derive::napi; /** Typescript-style Async Iterator over RecordBatches */ #[napi] pub struct RecordBatchIterator { - inner: Box, + inner: SendableRecordBatchStream, } #[napi] impl RecordBatchIterator { - pub(crate) fn new(inner: Box) -> Self { + pub(crate) fn new(inner: SendableRecordBatchStream) -> Self { Self { inner } } diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index 13e7453f..08a0045c 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -16,6 +16,7 @@ use connection::Connection; use napi_derive::*; mod connection; +mod error; mod index; mod iterator; mod query; diff --git a/nodejs/src/query.rs b/nodejs/src/query.rs index 6710fd03..07017d18 100644 --- a/nodejs/src/query.rs +++ b/nodejs/src/query.rs @@ -74,6 +74,6 @@ impl Query { let inner_stream = self.inner.execute_stream().await.map_err(|e| { napi::Error::from_reason(format!("Failed to execute query stream: {}", e)) })?; - Ok(RecordBatchIterator::new(Box::new(inner_stream))) + Ok(RecordBatchIterator::new(inner_stream)) } } diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 66489b28..80afd63a 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -13,13 +13,16 @@ // limitations under the License. 
use arrow_ipc::writer::FileWriter; -use lance::dataset::ColumnAlteration as LanceColumnAlteration; use lancedb::ipc::ipc_file_to_batches; -use lancedb::table::{AddDataMode, Table as LanceDbTable}; +use lancedb::table::{ + AddDataMode, ColumnAlteration as LanceColumnAlteration, NewColumnTransform, + Table as LanceDbTable, +}; use napi::bindgen_prelude::*; use napi_derive::napi; -use crate::index::IndexBuilder; +use crate::error::NapiErrorExt; +use crate::index::Index; use crate::query::Query; #[napi] @@ -129,8 +132,22 @@ impl Table { } #[napi] - pub fn create_index(&self) -> napi::Result { - Ok(IndexBuilder::new(self.inner_ref()?)) + pub async fn create_index( + &self, + index: Option<&Index>, + column: String, + replace: Option, + ) -> napi::Result<()> { + let lancedb_index = if let Some(index) = index { + index.consume()? + } else { + lancedb::index::Index::Auto + }; + let mut builder = self.inner_ref()?.create_index(&[column], lancedb_index); + if let Some(replace) = replace { + builder = builder.replace(replace); + } + builder.execute().await.default_error() } #[napi] @@ -144,7 +161,7 @@ impl Table { .into_iter() .map(|sql| (sql.name, sql.value_sql)) .collect::>(); - let transforms = lance::dataset::NewColumnTransform::SqlExpressions(transforms); + let transforms = NewColumnTransform::SqlExpressions(transforms); self.inner_ref()? 
.add_columns(transforms, None) .await diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index 5f9def39..025adc1a 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -23,8 +23,9 @@ from ._lancedb import connect as lancedb_connect from .common import URI, sanitize_uri from .db import AsyncConnection, DBConnection, LanceDBConnection from .remote.db import RemoteDBConnection -from .schema import vector # noqa: F401 -from .utils import sentry_log # noqa: F401 +from .schema import vector +from .table import AsyncTable +from .utils import sentry_log def connect( @@ -188,3 +189,19 @@ async def connect_async( read_consistency_interval_secs, ) ) + + +__all__ = [ + "connect", + "connect_async", + "AsyncConnection", + "AsyncTable", + "URI", + "sanitize_uri", + "sentry_log", + "vector", + "DBConnection", + "LanceDBConnection", + "RemoteDBConnection", + "__version__", +] diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 2c9733b8..f5e95d30 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -2,6 +2,18 @@ from typing import Optional import pyarrow as pa +class Index: + @staticmethod + def ivf_pq( + distance_type: Optional[str], + num_partitions: Optional[int], + num_sub_vectors: Optional[int], + max_iterations: Optional[int], + sample_rate: Optional[int], + ) -> Index: ... + @staticmethod + def btree() -> Index: ... + class Connection(object): async def table_names( self, start_after: Optional[str], limit: Optional[int] @@ -13,10 +25,15 @@ class Connection(object): self, name: str, mode: str, schema: pa.Schema ) -> Table: ... -class Table(object): +class Table: def name(self) -> str: ... def __repr__(self) -> str: ... async def schema(self) -> pa.Schema: ... + async def add(self, data: pa.RecordBatchReader, mode: str) -> None: ... + async def count_rows(self, filter: Optional[str]) -> int: ... 
+ async def create_index( + self, column: str, config: Optional[Index], replace: Optional[bool] + ): ... async def connect( uri: str, diff --git a/python/python/lancedb/index.py b/python/python/lancedb/index.py new file mode 100644 index 00000000..d290bd86 --- /dev/null +++ b/python/python/lancedb/index.py @@ -0,0 +1,157 @@ +from typing import Optional + +from ._lancedb import ( + Index as LanceDbIndex, +) + + +class BTree(object): + """Describes a btree index configuration + + A btree index is an index on scalar columns. The index stores a copy of the + column in sorted order. A header entry is created for each block of rows + (currently the block size is fixed at 4096). These header entries are stored + in a separate cacheable structure (a btree). To search for data the header is + used to determine which blocks need to be read from disk. + + For example, a btree index in a table with 1Bi rows requires + sizeof(Scalar) * 256Ki bytes of memory and will generally need to read + sizeof(Scalar) * 4096 bytes to find the correct row ids. + + This index is good for scalar columns with mostly distinct values and does best + when the query is highly selective. + + The btree index does not currently have any parameters though parameters such as + the block size may be added in the future. + """ + + def __init__(self): + self._inner = LanceDbIndex.btree() + + +class IvfPq(object): + """Describes an IVF PQ Index + + This index stores a compressed (quantized) copy of every vector. These vectors + are grouped into partitions of similar vectors. Each partition keeps track of + a centroid which is the average value of all vectors in the group. + + During a query the centroids are compared with the query vector to find the + closest partitions. The compressed vectors in these partitions are then + searched to find the closest vectors. + + The compression scheme is called product quantization. 
Each vector is divided + into subvectors and then each subvector is quantized into a small number of + bits. The parameters `num_bits` and `num_sub_vectors` control this process, + providing a tradeoff between index size (and thus search speed) and index + accuracy. + + The partitioning process is called IVF and the `num_partitions` parameter + controls how many groups to create. + + Note that training an IVF PQ index on a large dataset is a slow operation and + currently is also a memory intensive operation. + """ + + def __init__( + self, + *, + distance_type: Optional[str] = None, + num_partitions: Optional[int] = None, + num_sub_vectors: Optional[int] = None, + max_iterations: Optional[int] = None, + sample_rate: Optional[int] = None, + ): + """ + Create an IVF PQ index config + + Parameters + ---------- + distance_type: str, default "l2" + The distance metric used to train the index + + This is used when training the index to calculate the IVF partitions + (vectors are grouped in partitions with similar vectors according to this + distance type) and to calculate a subvector's code during quantization. + + The distance type used to train an index MUST match the distance type used + to search the index. Failure to do so will yield inaccurate results. + + The following distance types are available: + + "l2" - Euclidean distance. This is a very common distance metric that + accounts for both magnitude and direction when determining the distance + between vectors. L2 distance has a range of [0, ∞). + + "cosine" - Cosine distance. Cosine distance is a distance metric + calculated from the cosine similarity between two vectors. Cosine + similarity is a measure of similarity between two non-zero vectors of an + inner product space. It is defined to equal the cosine of the angle + between them. Unlike L2, the cosine distance is not affected by the + magnitude of the vectors. Cosine distance has a range of [0, 2]. 
+ + Note: the cosine distance is undefined when one (or both) of the vectors + are all zeros (there is no direction). These vectors are invalid and may + never be returned from a vector search. + + "dot" - Dot product. Dot distance is the dot product of two vectors. Dot + distance has a range of (-∞, ∞). If the vectors are normalized (i.e. their + L2 norm is 1), then dot distance is equivalent to the cosine distance. + num_partitions: int, default sqrt(num_rows) + The number of IVF partitions to create. + + This value should generally scale with the number of rows in the dataset. + By default the number of partitions is the square root of the number of + rows. + + If this value is too large then the first part of the search (picking the + right partition) will be slow. If this value is too small then the second + part of the search (searching within a partition) will be slow. + num_sub_vectors: int, default is vector dimension / 16 + Number of sub-vectors of PQ. + + This value controls how much the vector is compressed during the + quantization step. The more sub vectors there are the less the vector is + compressed. The default is the dimension of the vector divided by 16. If + the dimension is not evenly divisible by 16 we use the dimension divided by + 8. + + The above two cases are highly preferred. Having 8 or 16 values per + subvector allows us to use efficient SIMD instructions. + + If the dimension is not divisible by 8 then we use 1 subvector. This is not + ideal and will likely result in poor performance. + max_iterations: int, default 50 + Max iterations to train kmeans. + + When training an IVF PQ index we use kmeans to calculate the partitions. + This parameter controls how many iterations of kmeans to run. + + Increasing this might improve the quality of the index but in most cases + these extra iterations have diminishing returns. + + The default value is 50. 
+ sample_rate: int, default 256 + The rate used to calculate the number of training vectors for kmeans. + + When an IVF PQ index is trained, we need to calculate partitions. These + are groups of vectors that are similar to each other. To do this we use an + algorithm called kmeans. + + Running kmeans on a large dataset can be slow. To speed this up we run + kmeans on a random sample of the data. This parameter controls the size of + the sample. The total number of vectors used to train the index is + `sample_rate * num_partitions`. + + Increasing this value might improve the quality of the index but in most + cases the default should be sufficient. + + The default value is 256. + """ + self._inner = LanceDbIndex.ivf_pq( + distance_type=distance_type, + num_partitions=num_partitions, + num_sub_vectors=num_sub_vectors, + max_iterations=max_iterations, + sample_rate=sample_rate, + ) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 7a910431..c0dfea74 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -60,6 +60,7 @@ if TYPE_CHECKING: from ._lancedb import Table as LanceDBTable from .db import LanceDBConnection + from .index import BTree, IvfPq pd = safe_import_pandas() @@ -1917,112 +1918,48 @@ class AsyncTable: raise NotImplementedError async def create_index( - self, - metric="L2", - num_partitions=256, - num_sub_vectors=96, - vector_column_name: str = VECTOR_COLUMN_NAME, - replace: bool = True, - accelerator: Optional[str] = None, - index_cache_size: Optional[int] = None, - ): - """Create an index on the table. - - Parameters - ---------- - metric: str, default "L2" - The distance metric to use when creating the index. - Valid values are "L2", "cosine", or "dot". - L2 is euclidean distance. - num_partitions: int, default 256 - The number of IVF partitions to use when creating the index. - Default is 256. - num_sub_vectors: int, default 96 - The number of PQ sub-vectors to use when creating the index. 
- Default is 96. - vector_column_name: str, default "vector" - The vector column name to create the index. - replace: bool, default True - - If True, replace the existing index if it exists. - - - If False, raise an error if duplicate index exists. - accelerator: str, default None - If set, use the given accelerator to create the index. - Only support "cuda" for now. - index_cache_size : int, optional - The size of the index cache in number of entries. Default value is 256. - """ - raise NotImplementedError - - async def create_scalar_index( self, column: str, *, - replace: bool = True, + replace: Optional[bool] = None, + config: Optional[Union[IvfPq, BTree]] = None, ): - """Create a scalar index on a column. + """Create an index to speed up queries - Scalar indices, like vector indices, can be used to speed up scans. A scalar - index can speed up scans that contain filter expressions on the indexed column. - For example, the following scan will be faster if the column ``my_col`` has - a scalar index: - - .. code-block:: python - - import lancedb - - db = lancedb.connect("/data/lance") - img_table = db.open_table("images") - my_df = img_table.search().where("my_col = 7", prefilter=True).to_pandas() - - Scalar indices can also speed up scans containing a vector search and a - prefilter: - - .. code-block::python - - import lancedb - - db = lancedb.connect("/data/lance") - img_table = db.open_table("images") - img_table.search([1, 2, 3, 4], vector_column_name="vector") - .where("my_col != 7", prefilter=True) - .to_pandas() - - Scalar indices can only speed up scans for basic filters using - equality, comparison, range (e.g. ``my_col BETWEEN 0 AND 100``), and set - membership (e.g. `my_col IN (0, 1, 2)`) - - Scalar indices can be used if the filter contains multiple indexed columns and - the filter criteria are AND'd or OR'd together - (e.g. 
``my_col < 0 AND other_col> 100``) - - Scalar indices may be used if the filter contains non-indexed columns but, - depending on the structure of the filter, they may not be usable. For example, - if the column ``not_indexed`` does not have a scalar index then the filter - ``my_col = 0 OR not_indexed = 1`` will not be able to use any scalar index on - ``my_col``. - - **Experimental API** + Indices can be created on vector columns or scalar columns. + Indices on vector columns will speed up vector searches. + Indices on scalar columns will speed up filtering (in both + vector and non-vector searches) Parameters ---------- - column : str - The column to be indexed. Must be a boolean, integer, float, - or string column. - replace : bool, default True - Replace the existing index if it exists. + index: Index + The index to create. - Examples - -------- + LanceDb supports multiple types of indices. See the static methods on + the Index class for more details. + column: str, default None + The column to index. - .. code-block:: python + When building a scalar index this must be set. - import lance + When building a vector index, this is optional. The default will look + for any columns of type fixed-size-list with floating point values. If + there is only one column of this type then it will be used. Otherwise + an error will be returned. + replace: bool, default True + Whether to replace the existing index - dataset = lance.dataset("./images.lance") - dataset.create_scalar_index("category") + If this is false, and another index already exists on the same columns + and the same name, then an error will be returned. This is true even if + that index is out of date. 
+ + The default is True """ - raise NotImplementedError + index = None + if config is not None: + index = config._inner + await self._inner.create_index(column, index=index, replace=replace) async def add( self, @@ -2066,6 +2003,8 @@ class AsyncTable: on_bad_vectors=on_bad_vectors, fill_value=fill_value, ) + if isinstance(data, pa.Table): + data = pa.RecordBatchReader.from_batches(data.schema, data.to_batches()) await self._inner.add(data, mode) register_event("add") diff --git a/python/python/tests/test_index.py b/python/python/tests/test_index.py new file mode 100644 index 00000000..baac0ae3 --- /dev/null +++ b/python/python/tests/test_index.py @@ -0,0 +1,61 @@ +from datetime import timedelta + +import pyarrow as pa +import pytest +import pytest_asyncio +from lancedb import AsyncConnection, AsyncTable, connect_async +from lancedb.index import BTree, IvfPq + + +@pytest_asyncio.fixture +async def db_async(tmp_path) -> AsyncConnection: + return await connect_async(tmp_path, read_consistency_interval=timedelta(seconds=0)) + + +def sample_fixed_size_list_array(nrows, dim): + vector_data = pa.array([float(i) for i in range(dim * nrows)], pa.float32()) + return pa.FixedSizeListArray.from_arrays(vector_data, dim) + + +DIM = 8 +NROWS = 256 + + +@pytest_asyncio.fixture +async def some_table(db_async): + data = pa.Table.from_pydict( + { + "id": list(range(256)), + "vector": sample_fixed_size_list_array(NROWS, DIM), + } + ) + return await db_async.create_table( + "some_table", + data, + ) + + +@pytest.mark.asyncio +async def test_create_scalar_index(some_table: AsyncTable): + # Can create + await some_table.create_index("id") + # Can recreate if replace=True + await some_table.create_index("id", replace=True) + # Can't recreate if replace=False + with pytest.raises(RuntimeError, match="already exists"): + await some_table.create_index("id", replace=False) + # can also specify index type + await some_table.create_index("id", config=BTree()) + + +@pytest.mark.asyncio +async 
def test_create_vector_index(some_table: AsyncTable): + # Can create + await some_table.create_index("vector") + # Can recreate if replace=True + await some_table.create_index("vector", replace=True) + # Can't recreate if replace=False + with pytest.raises(RuntimeError, match="already exists"): + await some_table.create_index("vector", replace=False) + # Can also specify index type + await some_table.create_index("vector", config=IvfPq(num_partitions=100)) diff --git a/python/src/index.rs b/python/src/index.rs new file mode 100644 index 00000000..6b75a595 --- /dev/null +++ b/python/src/index.rs @@ -0,0 +1,87 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Mutex; + +use lancedb::{ + index::{scalar::BTreeIndexBuilder, vector::IvfPqIndexBuilder, Index as LanceDbIndex}, + DistanceType, +}; +use pyo3::{ + exceptions::{PyRuntimeError, PyValueError}, + pyclass, pymethods, PyResult, +}; + +#[pyclass] +pub struct Index { + inner: Mutex>, +} + +impl Index { + pub fn consume(&self) -> PyResult { + self.inner + .lock() + .unwrap() + .take() + .ok_or_else(|| PyRuntimeError::new_err("cannot use an Index more than once")) + } +} + +#[pymethods] +impl Index { + #[staticmethod] + pub fn ivf_pq( + distance_type: Option, + num_partitions: Option, + num_sub_vectors: Option, + max_iterations: Option, + sample_rate: Option, + ) -> PyResult { + let mut ivf_pq_builder = IvfPqIndexBuilder::default(); + if let Some(distance_type) = distance_type { + let distance_type = match distance_type.as_str() { + "l2" => Ok(DistanceType::L2), + "cosine" => Ok(DistanceType::Cosine), + "dot" => Ok(DistanceType::Dot), + _ => Err(PyValueError::new_err(format!( + "Invalid distance type '{}'. 
Must be one of l2, cosine, or dot", + distance_type + ))), + }?; + ivf_pq_builder = ivf_pq_builder.distance_type(distance_type); + } + if let Some(num_partitions) = num_partitions { + ivf_pq_builder = ivf_pq_builder.num_partitions(num_partitions); + } + if let Some(num_sub_vectors) = num_sub_vectors { + ivf_pq_builder = ivf_pq_builder.num_sub_vectors(num_sub_vectors); + } + if let Some(max_iterations) = max_iterations { + ivf_pq_builder = ivf_pq_builder.max_iterations(max_iterations); + } + if let Some(sample_rate) = sample_rate { + ivf_pq_builder = ivf_pq_builder.sample_rate(sample_rate); + } + Ok(Self { + inner: Mutex::new(Some(LanceDbIndex::IvfPq(ivf_pq_builder))), + }) + } + + #[staticmethod] + pub fn btree() -> PyResult { + Ok(Self { + inner: Mutex::new(Some(LanceDbIndex::BTree(BTreeIndexBuilder::default()))), + }) + } +} diff --git a/python/src/lib.rs b/python/src/lib.rs index fa2f5fc4..b900446c 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -14,11 +14,15 @@ use connection::{connect, Connection}; use env_logger::Env; +use index::Index; use pyo3::{pymodule, types::PyModule, wrap_pyfunction, PyResult, Python}; +use table::Table; pub mod connection; pub mod error; +pub mod index; pub mod table; +pub mod util; #[pymodule] pub fn _lancedb(_py: Python, m: &PyModule) -> PyResult<()> { @@ -27,6 +31,8 @@ pub fn _lancedb(_py: Python, m: &PyModule) -> PyResult<()> { .write_style("LANCEDB_LOG_STYLE"); env_logger::init_from_env(env); m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_function(wrap_pyfunction!(connect, m)?)?; m.add("__version__", env!("CARGO_PKG_VERSION"))?; Ok(()) diff --git a/python/src/table.rs b/python/src/table.rs index 7fcf1a5f..93f4fda2 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -9,7 +9,7 @@ use pyo3::{ }; use pyo3_asyncio::tokio::future_into_py; -use crate::error::PythonErrorExt; +use crate::{error::PythonErrorExt, index::Index}; #[pyclass] pub struct Table { @@ -81,6 +81,28 @@ impl Table { }) } + pub fn 
create_index<'a>( + self_: PyRef<'a, Self>, + column: String, + index: Option<&Index>, + replace: Option, + ) -> PyResult<&'a PyAny> { + let index = if let Some(index) = index { + index.consume()? + } else { + lancedb::index::Index::Auto + }; + let mut op = self_.inner_ref()?.create_index(&[column], index); + if let Some(replace) = replace { + op = op.replace(replace); + } + + future_into_py(self_.py(), async move { + op.execute().await.infer_error()?; + Ok(()) + }) + } + pub fn __repr__(&self) -> String { match &self.inner { None => format!("ClosedTable({})", self.name), diff --git a/python/src/util.rs b/python/src/util.rs new file mode 100644 index 00000000..df9ab5d0 --- /dev/null +++ b/python/src/util.rs @@ -0,0 +1,35 @@ +use std::sync::Mutex; + +use pyo3::{exceptions::PyRuntimeError, PyResult}; + +/// A wrapper around a rust builder +/// +/// Rust builders are often implemented so that the builder methods +/// consume the builder and return a new one. This is not compatible +/// with the pyo3, which, being garbage collected, cannot easily obtain +/// ownership of an object. +/// +/// This wrapper converts the compile-time safety of rust into runtime +/// errors if any attempt to use the builder happens after it is consumed. 
+pub struct BuilderWrapper { + name: String, + inner: Mutex>, +} + +impl BuilderWrapper { + pub fn new(name: impl AsRef, inner: T) -> Self { + Self { + name: name.as_ref().to_string(), + inner: Mutex::new(Some(inner)), + } + } + + pub fn consume(&self, mod_fn: impl FnOnce(T) -> O) -> PyResult { + let mut inner = self.inner.lock().unwrap(); + let inner_builder = inner.take().ok_or_else(|| { + PyRuntimeError::new_err(format!("{} has already been consumed", self.name)) + })?; + let result = mod_fn(inner_builder); + Ok(result) + } +} diff --git a/rust/ffi/node/src/index/scalar.rs b/rust/ffi/node/src/index/scalar.rs index 3babdda7..10937467 100644 --- a/rust/ffi/node/src/index/scalar.rs +++ b/rust/ffi/node/src/index/scalar.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use lancedb::index::{scalar::BTreeIndexBuilder, Index}; use neon::{ context::{Context, FunctionContext}, result::JsResult, @@ -33,9 +34,9 @@ pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult rt.spawn(async move { let idx_result = table - .create_index(&[&column]) + .create_index(&[column], Index::BTree(BTreeIndexBuilder::default())) .replace(replace) - .build() + .execute() .await; deferred.settle_with(&channel, move |mut cx| { diff --git a/rust/ffi/node/src/index/vector.rs b/rust/ffi/node/src/index/vector.rs index 8c6698bf..3190ee2d 100644 --- a/rust/ffi/node/src/index/vector.rs +++ b/rust/ffi/node/src/index/vector.rs @@ -13,12 +13,12 @@ // limitations under the License. 
use lance_linalg::distance::MetricType; -use lancedb::index::IndexBuilder; +use lancedb::index::vector::IvfPqIndexBuilder; +use lancedb::index::Index; use neon::context::FunctionContext; use neon::prelude::*; use std::convert::TryFrom; -use crate::error::Error::InvalidIndexType; use crate::error::ResultExt; use crate::neon_ext::js_object_ext::JsObjectExt; use crate::runtime; @@ -39,13 +39,20 @@ pub fn table_create_vector_index(mut cx: FunctionContext) -> JsResult .map(|s| s.value(&mut cx)) .unwrap_or("vector".to_string()); // Backward compatibility + let replace = index_params + .get_opt::(&mut cx, "replace")? + .map(|r| r.value(&mut cx)); + let tbl = table.clone(); - let index_builder = tbl.create_index(&[&column_name]); - let index_builder = - get_index_params_builder(&mut cx, index_params, index_builder).or_throw(&mut cx)?; + let ivf_pq_builder = get_index_params_builder(&mut cx, index_params).or_throw(&mut cx)?; + + let mut index_builder = tbl.create_index(&[column_name], Index::IvfPq(ivf_pq_builder)); + if let Some(replace) = replace { + index_builder = index_builder.replace(replace); + } rt.spawn(async move { - let idx_result = index_builder.build().await; + let idx_result = index_builder.execute().await; deferred.settle_with(&channel, move |mut cx| { idx_result.or_throw(&mut cx)?; Ok(cx.boxed(JsTable::from(table))) @@ -57,26 +64,17 @@ pub fn table_create_vector_index(mut cx: FunctionContext) -> JsResult fn get_index_params_builder( cx: &mut FunctionContext, obj: Handle, - builder: IndexBuilder, -) -> crate::error::Result { - let mut builder = match obj.get::(cx, "type")?.value(cx).as_str() { - "ivf_pq" => builder.ivf_pq(), - _ => { - return Err(InvalidIndexType { - index_type: "".into(), - }) - } - }; - - if let Some(index_name) = obj.get_opt::(cx, "index_name")? 
{ - builder = builder.name(index_name.value(cx).as_str()); +) -> crate::error::Result { + if obj.get_opt::(cx, "index_name")?.is_some() { + return Err(crate::error::Error::LanceDB { + message: "Setting the index_name is no longer supported".to_string(), + }); } - + let mut builder = IvfPqIndexBuilder::default(); if let Some(metric_type) = obj.get_opt::(cx, "metric_type")? { let metric_type = MetricType::try_from(metric_type.value(cx).as_str())?; - builder = builder.metric_type(metric_type); + builder = builder.distance_type(metric_type); } - if let Some(np) = obj.get_opt_u32(cx, "num_partitions")? { builder = builder.num_partitions(np); } @@ -86,11 +84,5 @@ fn get_index_params_builder( if let Some(max_iters) = obj.get_opt_u32(cx, "max_iters")? { builder = builder.max_iterations(max_iters); } - if let Some(num_bits) = obj.get_opt_u32(cx, "num_bits")? { - builder = builder.num_bits(num_bits); - } - if let Some(replace) = obj.get_opt::(cx, "replace")? { - builder = builder.replace(replace.value(cx)); - } Ok(builder) } diff --git a/rust/lancedb/Cargo.toml b/rust/lancedb/Cargo.toml index 4ae78026..1e8e29d6 100644 --- a/rust/lancedb/Cargo.toml +++ b/rust/lancedb/Cargo.toml @@ -26,6 +26,7 @@ lance = { workspace = true } lance-index = { workspace = true } lance-linalg = { workspace = true } lance-testing = { workspace = true } +pin-project = { workspace = true } tokio = { version = "1.23", features = ["rt-multi-thread"] } log.workspace = true async-trait = "0" diff --git a/rust/lancedb/examples/simple.rs b/rust/lancedb/examples/simple.rs index f5b4af8c..79dd7012 100644 --- a/rust/lancedb/examples/simple.rs +++ b/rust/lancedb/examples/simple.rs @@ -20,6 +20,7 @@ use arrow_schema::{DataType, Field, Schema}; use futures::TryStreamExt; use lancedb::connection::Connection; +use lancedb::index::Index; use lancedb::{connect, Result, Table as LanceDbTable}; #[tokio::main] @@ -142,23 +143,18 @@ async fn create_empty_table(db: &Connection) -> Result { async fn create_index(table: 
&LanceDbTable) -> Result<()> { // --8<-- [start:create_index] - table - .create_index(&["vector"]) - .ivf_pq() - .num_partitions(8) - .build() - .await + table.create_index(&["vector"], Index::Auto).execute().await // --8<-- [end:create_index] } async fn search(table: &LanceDbTable) -> Result> { // --8<-- [start:search] - Ok(table + table .search(&[1.0; 128]) .limit(2) .execute_stream() .await? .try_collect::>() - .await?) + .await // --8<-- [end:search] } diff --git a/rust/lancedb/src/arrow.rs b/rust/lancedb/src/arrow.rs index 9e4ae419..15d78ac2 100644 --- a/rust/lancedb/src/arrow.rs +++ b/rust/lancedb/src/arrow.rs @@ -12,4 +12,92 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use lance::arrow::*; \ No newline at end of file +use std::{pin::Pin, sync::Arc}; + +pub use arrow_array; +pub use arrow_schema; +use futures::{Stream, StreamExt}; + +use crate::error::Result; + +/// An iterator of batches that also has a schema +pub trait RecordBatchReader: Iterator> { + /// Returns the schema of this `RecordBatchReader`. + /// + /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this + /// reader should have the same schema as returned from this method. + fn schema(&self) -> Arc; +} + +/// A simple RecordBatchReader formed from the two parts (iterator + schema) +pub struct SimpleRecordBatchReader>> { + pub schema: Arc, + pub batches: I, +} + +impl>> Iterator for SimpleRecordBatchReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.batches.next() + } +} + +impl>> RecordBatchReader + for SimpleRecordBatchReader +{ + fn schema(&self) -> Arc { + self.schema.clone() + } +} + +/// A stream of batches that also has a schema +pub trait RecordBatchStream: Stream> { + /// Returns the schema of this `RecordBatchStream`. 
+ /// + /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this + /// stream should have the same schema as returned from this method. + fn schema(&self) -> Arc; +} + +/// A boxed RecordBatchStream that is also Send +pub type SendableRecordBatchStream = Pin>; + +impl From for SendableRecordBatchStream { + fn from(stream: I) -> Self { + let schema = stream.schema(); + let mapped_stream = Box::pin(stream.map(|r| r.map_err(Into::into))); + Box::pin(SimpleRecordBatchStream { + schema, + stream: mapped_stream, + }) + } +} + +/// A simple RecordBatchStream formed from the two parts (stream + schema) +#[pin_project::pin_project] +pub struct SimpleRecordBatchStream>> { + pub schema: Arc, + #[pin] + pub stream: S, +} + +impl>> Stream for SimpleRecordBatchStream { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.stream.poll_next(cx) + } +} + +impl>> RecordBatchStream + for SimpleRecordBatchStream +{ + fn schema(&self) -> Arc { + self.schema.clone() + } +} diff --git a/rust/lancedb/src/index.rs b/rust/lancedb/src/index.rs index b55ff661..02eeac9a 100644 --- a/rust/lancedb/src/index.rs +++ b/rust/lancedb/src/index.rs @@ -12,181 +12,52 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{cmp::max, sync::Arc}; - -use lance_index::IndexType; -pub use lance_linalg::distance::MetricType; - -pub mod vector; +use std::sync::Arc; use crate::{table::TableInternal, Result}; -/// Index Parameters. 
-pub enum IndexParams { - Scalar { - replace: bool, - }, - IvfPq { - replace: bool, - metric_type: MetricType, - num_partitions: u64, - num_sub_vectors: u32, - num_bits: u32, - sample_rate: u32, - max_iterations: u32, - }, +use self::{scalar::BTreeIndexBuilder, vector::IvfPqIndexBuilder}; + +pub mod scalar; +pub mod vector; + +pub enum Index { + Auto, + BTree(BTreeIndexBuilder), + IvfPq(IvfPqIndexBuilder), } -/// Builder for Index Parameters. - +/// Builder for the create_index operation +/// +/// The methods on this builder are used to specify options common to all indices. pub struct IndexBuilder { parent: Arc, + pub(crate) index: Index, pub(crate) columns: Vec, - // General parameters - /// Index name. - pub(crate) name: Option, - /// Replace the existing index. pub(crate) replace: bool, - - pub(crate) index_type: IndexType, - - // Scalar index parameters - // Nothing to set here. - - // IVF_PQ parameters - pub(crate) metric_type: MetricType, - pub(crate) num_partitions: Option, - // PQ related - pub(crate) num_sub_vectors: Option, - pub(crate) num_bits: u32, - - /// The rate to find samples to train kmeans. - pub(crate) sample_rate: u32, - /// Max iteration to train kmeans. - pub(crate) max_iterations: u32, } impl IndexBuilder { - pub(crate) fn new(parent: Arc, columns: &[&str]) -> Self { + pub(crate) fn new(parent: Arc, columns: Vec, index: Index) -> Self { Self { parent, - columns: columns.iter().map(|c| c.to_string()).collect(), - name: None, + index, + columns, replace: true, - index_type: IndexType::Scalar, - metric_type: MetricType::L2, - num_partitions: None, - num_sub_vectors: None, - num_bits: 8, - sample_rate: 256, - max_iterations: 50, } } - /// Build a Scalar Index. + /// Whether to replace the existing index, the default is `true`. /// - /// Accepted parameters: - /// - `replace`: Replace the existing index. - /// - `name`: Index name. 
Default: `None` - pub fn scalar(mut self) -> Self { - self.index_type = IndexType::Scalar; - self - } - - /// Build an IVF PQ index. - /// - /// Accepted parameters: - /// - `replace`: Replace the existing index. - /// - `name`: Index name. Default: `None` - /// - `metric_type`: [MetricType] to use to build Vector Index. - /// - `num_partitions`: Number of IVF partitions. - /// - `num_sub_vectors`: Number of sub-vectors of PQ. - /// - `num_bits`: Number of bits used for PQ centroids. - /// - `sample_rate`: The rate to find samples to train kmeans. - /// - `max_iterations`: Max iteration to train kmeans. - pub fn ivf_pq(mut self) -> Self { - self.index_type = IndexType::Vector; - self - } - - /// The columns to build index on. - pub fn columns(mut self, cols: &[&str]) -> Self { - self.columns = cols.iter().map(|s| s.to_string()).collect(); - self - } - - /// Whether to replace the existing index, default is `true`. + /// If this is false, and another index already exists on the same columns + /// and the same name, then an error will be returned. This is true even if + /// that index is out of date. pub fn replace(mut self, v: bool) -> Self { self.replace = v; self } - /// Set the index name. - pub fn name(mut self, name: &str) -> Self { - self.name = Some(name.to_string()); - self - } - - /// [MetricType] to use to build Vector Index. - /// - /// Default value is [MetricType::L2]. - pub fn metric_type(mut self, metric_type: MetricType) -> Self { - self.metric_type = metric_type; - self - } - - /// Number of IVF partitions. - pub fn num_partitions(mut self, num_partitions: u32) -> Self { - self.num_partitions = Some(num_partitions); - self - } - - /// Number of sub-vectors of PQ. - pub fn num_sub_vectors(mut self, num_sub_vectors: u32) -> Self { - self.num_sub_vectors = Some(num_sub_vectors); - self - } - - /// Number of bits used for PQ centroids. 
- pub fn num_bits(mut self, num_bits: u32) -> Self { - self.num_bits = num_bits; - self - } - - /// The rate to find samples to train kmeans. - pub fn sample_rate(mut self, sample_rate: u32) -> Self { - self.sample_rate = sample_rate; - self - } - - /// Max iteration to train kmeans. - pub fn max_iterations(mut self, max_iterations: u32) -> Self { - self.max_iterations = max_iterations; - self - } - - /// Build the parameters. - pub async fn build(self) -> Result<()> { - self.parent.clone().do_create_index(self).await - } -} - -pub(crate) fn suggested_num_partitions(rows: usize) -> u32 { - let num_partitions = (rows as f64).sqrt() as u32; - max(1, num_partitions) -} - -pub(crate) fn suggested_num_sub_vectors(dim: u32) -> u32 { - if dim % 16 == 0 { - // Should be more aggressive than this default. - dim / 16 - } else if dim % 8 == 0 { - dim / 8 - } else { - log::warn!( - "The dimension of the vector is not divisible by 8 or 16, \ - which may cause performance degradation in PQ" - ); - 1 + pub async fn execute(self) -> Result<()> { + self.parent.clone().create_index(self).await } } diff --git a/rust/lancedb/src/index/scalar.rs b/rust/lancedb/src/index/scalar.rs new file mode 100644 index 00000000..7d447cc7 --- /dev/null +++ b/rust/lancedb/src/index/scalar.rs @@ -0,0 +1,30 @@ +//! Scalar indices are exact indices that are used to quickly satisfy a variety of filters +//! against a column of scalar values. +//! +//! Scalar indices are currently supported on numeric, string, boolean, and temporal columns. +//! +//! A scalar index will help with queries with filters like `x > 10`, `x < 10`, `x = 10`, +//! etc. Scalar indices can also speed up prefiltering for vector searches. A single +//! vector search with prefiltering can use both a scalar index and a vector index. + +/// Builder for a btree index +/// +/// A btree index is an index on scalar columns. The index stores a copy of the column +/// in sorted order. 
A header entry is created for each block of rows (currently the +/// block size is fixed at 4096). These header entries are stored in a separate +/// cacheable structure (a btree). To search for data the header is used to determine +/// which blocks need to be read from disk. +/// +/// For example, a btree index in a table with 1Bi rows requires sizeof(Scalar) * 256Ki +/// bytes of memory and will generally need to read sizeof(Scalar) * 4096 bytes to find +/// the correct row ids. +/// +/// This index is good for scalar columns with mostly distinct values and does best when +/// the query is highly selective. +/// +/// The btree index does not currently have any parameters though parameters such as the +/// block size may be added in the future. +#[derive(Default, Debug, Clone)] +pub struct BTreeIndexBuilder {} + +impl BTreeIndexBuilder {} diff --git a/rust/lancedb/src/index/vector.rs b/rust/lancedb/src/index/vector.rs index cf8771cb..11f25fee 100644 --- a/rust/lancedb/src/index/vector.rs +++ b/rust/lancedb/src/index/vector.rs @@ -12,10 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Vector indices are approximate indices that are used to find rows similar to +//! a query vector. Vector indices speed up vector searches. +//! +//! Vector indices are only supported on fixed-size-list (tensor) columns of floating point +//! values +use std::cmp::max; + use serde::Deserialize; use lance::table::format::{Index, Manifest}; +use crate::DistanceType; + pub struct VectorIndex { pub columns: Vec, pub index_name: String, @@ -42,3 +51,145 @@ pub struct VectorIndexStatistics { pub num_indexed_rows: usize, pub num_unindexed_rows: usize, } + +/// Builder for an IVF PQ index. +/// +/// This index stores a compressed (quantized) copy of every vector. These vectors +/// are grouped into partitions of similar vectors. 
Each partition keeps track of +/// a centroid which is the average value of all vectors in the group. +/// +/// During a query the centroids are compared with the query vector to find the closest +/// partitions. The compressed vectors in these partitions are then searched to find +/// the closest vectors. +/// +/// The compression scheme is called product quantization. Each vector is divided into +/// subvectors and then each subvector is quantized into a small number of bits. The +/// parameters `num_bits` and `num_sub_vectors` control this process, providing a tradeoff +/// between index size (and thus search speed) and index accuracy. +/// +/// The partitioning process is called IVF and the `num_partitions` parameter controls how +/// many groups to create. +/// +/// Note that training an IVF PQ index on a large dataset is a slow operation and +/// currently is also a memory intensive operation. +#[derive(Debug, Clone)] +pub struct IvfPqIndexBuilder { + pub(crate) distance_type: DistanceType, + pub(crate) num_partitions: Option, + pub(crate) num_sub_vectors: Option, + pub(crate) sample_rate: u32, + pub(crate) max_iterations: u32, +} + +impl Default for IvfPqIndexBuilder { + fn default() -> Self { + Self { + distance_type: DistanceType::L2, + num_partitions: None, + num_sub_vectors: None, + sample_rate: 256, + max_iterations: 50, + } + } +} + +impl IvfPqIndexBuilder { + /// [DistanceType] to use to build the index. + /// + /// Default value is [DistanceType::L2]. + /// + /// This is used when training the index to calculate the IVF partitions (vectors are + /// grouped in partitions with similar vectors according to this distance type) and to + /// calculate a subvector's code during quantization. + /// + /// The metric type used to train an index MUST match the metric type used to search the + /// index. Failure to do so will yield inaccurate results.
+ pub fn distance_type(mut self, distance_type: DistanceType) -> Self { + self.distance_type = distance_type; + self + } + + /// The number of IVF partitions to create. + /// + /// This value should generally scale with the number of rows in the dataset. By default + /// the number of partitions is the square root of the number of rows. + /// + /// If this value is too large then the first part of the search (picking the right partition) + /// will be slow. If this value is too small then the second part of the search (searching + /// within a partition) will be slow. + pub fn num_partitions(mut self, num_partitions: u32) -> Self { + self.num_partitions = Some(num_partitions); + self + } + + /// Number of sub-vectors of PQ. + /// + /// This value controls how much the vector is compressed during the quantization step. + /// The more sub vectors there are the less the vector is compressed. The default is + /// the dimension of the vector divided by 16. If the dimension is not evenly divisible + /// by 16 we use the dimension divided by 8. + /// + /// The above two cases are highly preferred. Having 8 or 16 values per subvector allows + /// us to use efficient SIMD instructions. + /// + /// If the dimension is not divisible by 8 then we use 1 subvector. This is not ideal and + /// will likely result in poor performance. + pub fn num_sub_vectors(mut self, num_sub_vectors: u32) -> Self { + self.num_sub_vectors = Some(num_sub_vectors); + self + } + + /// The rate used to calculate the number of training vectors for kmeans. + /// + /// When an IVF PQ index is trained, we need to calculate partitions. These are groups + /// of vectors that are similar to each other. To do this we use an algorithm called kmeans. + /// + /// Running kmeans on a large dataset can be slow. To speed this up we run kmeans on a + /// random sample of the data. This parameter controls the size of the sample. The total + /// number of vectors used to train the index is `sample_rate * num_partitions`.
+ /// + /// Increasing this value might improve the quality of the index but in most cases the + /// default should be sufficient. + /// + /// The default value is 256. + pub fn sample_rate(mut self, sample_rate: u32) -> Self { + self.sample_rate = sample_rate; + self + } + + /// Max iterations to train kmeans. + /// + /// When training an IVF PQ index we use kmeans to calculate the partitions. This parameter + /// controls how many iterations of kmeans to run. + /// + /// Increasing this might improve the quality of the index but in most cases the parameter + /// is unused because kmeans will converge with fewer iterations. The parameter is only + /// used in cases where kmeans does not appear to converge. In those cases it is unlikely + /// that setting this larger will lead to the index converging anyways. + /// + /// The default value is 50. + pub fn max_iterations(mut self, max_iterations: u32) -> Self { + self.max_iterations = max_iterations; + self + } +} + +pub(crate) fn suggested_num_partitions(rows: usize) -> u32 { + let num_partitions = (rows as f64).sqrt() as u32; + max(1, num_partitions) +} + +pub(crate) fn suggested_num_sub_vectors(dim: u32) -> u32 { + if dim % 16 == 0 { + // Should be more aggressive than this default. + dim / 16 + } else if dim % 8 == 0 { + dim / 8 + } else { + log::warn!( + "The dimension of the vector is not divisible by 8 or 16, \ + which may cause performance degradation in PQ" + ); + 1 + } +} diff --git a/rust/lancedb/src/lib.rs b/rust/lancedb/src/lib.rs index 817f0329..a467a636 100644 --- a/rust/lancedb/src/lib.rs +++ b/rust/lancedb/src/lib.rs @@ -130,16 +130,15 @@ //! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch, //! # RecordBatchIterator, Int32Array}; //! # use arrow_schema::{Schema, Field, DataType}; +//! use lancedb::index::Index; //! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # let tmpdir = tempfile::tempdir().unwrap(); //! 
# let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap(); //! # let tbl = db.open_table("idx_test").execute().await.unwrap(); -//! tbl.create_index(&["vector"]) -//! .ivf_pq() -//! .num_partitions(256) -//! .build() -//! .await -//! .unwrap(); +//! tbl.create_index(&["vector"], Index::Auto) +//! .execute() +//! .await +//! .unwrap(); //! # }); //! ``` //! @@ -181,6 +180,7 @@ //! # }); //! ``` +pub mod arrow; pub mod connection; pub mod data; pub mod error; @@ -194,6 +194,7 @@ pub mod table; pub mod utils; pub use error::{Error, Result}; +pub use lance_linalg::distance::DistanceType; pub use table::Table; /// Connect to a database diff --git a/rust/lancedb/src/query.rs b/rust/lancedb/src/query.rs index 085678f1..137015a7 100644 --- a/rust/lancedb/src/query.rs +++ b/rust/lancedb/src/query.rs @@ -15,9 +15,9 @@ use std::sync::Arc; use arrow_array::Float32Array; -use lance::dataset::scanner::DatasetRecordBatchStream; use lance_linalg::distance::MetricType; +use crate::arrow::SendableRecordBatchStream; use crate::error::Result; use crate::table::TableInternal; @@ -81,13 +81,15 @@ impl Query { } } - /// Convert the query plan to a [`DatasetRecordBatchStream`] + /// Convert the query plan to a [`SendableRecordBatchStream`] /// /// # Returns /// - /// * A [DatasetRecordBatchStream] with the query's results. - pub async fn execute_stream(&self) -> Result { - self.parent.clone().do_query(self).await + /// * A [SendableRecordBatchStream] with the query's results. 
+ pub async fn execute_stream(&self) -> Result { + Ok(SendableRecordBatchStream::from( + self.parent.clone().query(self).await?, + )) } /// Set the column to query diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index f258d9a8..649e46b4 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -51,19 +51,19 @@ impl TableInternal for RemoteTable { async fn count_rows(&self, _filter: Option) -> Result { todo!() } - async fn do_add(&self, _add: AddDataBuilder) -> Result<()> { + async fn add(&self, _add: AddDataBuilder) -> Result<()> { todo!() } - async fn do_query(&self, _query: &Query) -> Result { + async fn query(&self, _query: &Query) -> Result { todo!() } async fn delete(&self, _predicate: &str) -> Result<()> { todo!() } - async fn do_create_index(&self, _index: IndexBuilder) -> Result<()> { + async fn create_index(&self, _index: IndexBuilder) -> Result<()> { todo!() } - async fn do_merge_insert( + async fn merge_insert( &self, _params: MergeInsertBuilder, _new_data: Box, diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 710cbbd1..80a212e6 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -18,7 +18,7 @@ use std::path::Path; use std::sync::Arc; use arrow_array::{RecordBatchIterator, RecordBatchReader}; -use arrow_schema::{Schema, SchemaRef}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use chrono::Duration; use lance::dataset::builder::DatasetBuilder; @@ -27,13 +27,11 @@ use lance::dataset::optimize::{ compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions, }; use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner}; +pub use lance::dataset::ColumnAlteration; +pub use lance::dataset::NewColumnTransform; pub use lance::dataset::ReadParams; -use lance::dataset::{ - ColumnAlteration, Dataset, NewColumnTransform, UpdateBuilder, WhenMatched, WriteMode, - WriteParams, -}; +use 
lance::dataset::{Dataset, UpdateBuilder, WhenMatched, WriteMode, WriteParams}; use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource}; -use lance::index::scalar::ScalarIndexParams; use lance::io::WrappingObjectStore; use lance_index::IndexType; use lance_index::{optimize::OptimizeOptions, DatasetIndexExt}; @@ -41,9 +39,10 @@ use log::info; use snafu::whatever; use crate::error::{Error, Result}; -use crate::index::vector::{VectorIndex, VectorIndexStatistics}; +use crate::index::vector::{IvfPqIndexBuilder, VectorIndex, VectorIndexStatistics}; use crate::index::{ - suggested_num_partitions, suggested_num_sub_vectors, IndexBuilder, IndexParams, + vector::{suggested_num_partitions, suggested_num_sub_vectors}, + Index, IndexBuilder, }; use crate::query::{Query, Select, DEFAULT_TOP_K}; use crate::utils::{default_vector_column, PatchReadParam, PatchWriteParam}; @@ -146,7 +145,7 @@ impl AddDataBuilder { } pub async fn execute(self) -> Result<()> { - self.parent.clone().do_add(self).await + self.parent.clone().add(self).await } } @@ -161,11 +160,11 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn async fn schema(&self) -> Result; /// Count the number of rows in this table. async fn count_rows(&self, filter: Option) -> Result; - async fn do_add(&self, add: AddDataBuilder) -> Result<()>; - async fn do_query(&self, query: &Query) -> Result; + async fn add(&self, add: AddDataBuilder) -> Result<()>; + async fn query(&self, query: &Query) -> Result; async fn delete(&self, predicate: &str) -> Result<()>; - async fn do_create_index(&self, index: IndexBuilder) -> Result<()>; - async fn do_merge_insert( + async fn create_index(&self, index: IndexBuilder) -> Result<()>; + async fn merge_insert( &self, params: MergeInsertBuilder, new_data: Box, @@ -294,7 +293,33 @@ impl Table { self.inner.delete(predicate).await } - /// Create an index on the column name. + /// Create an index on the provided column(s). 
+ /// + /// Indices are used to speed up searches and are often needed when the size of the table + /// becomes large (the exact size depends on many factors but somewhere between 100K rows + /// and 1M rows is a good rule of thumb) + /// + /// There are a variety of indices available. They are described more in + /// [`crate::index::Index`]. The simplest thing to do is to use `index::Index::Auto` which + /// will attempt to create the most useful index based on the column type and column + /// statistics. + /// + /// Once an index is created it will remain until the data is overwritten (e.g. an + /// add operation with mode overwrite) or the indexed column is dropped. + /// + /// Indices are not automatically updated with new data. If you add new data to the + /// table then the index will not include the new rows. However, a table search will + /// still consider the unindexed rows. Searches will issue both an indexed search (on + /// the data covered by the index) and a flat search (on the unindexed data) and the + /// results will be combined. + /// + /// If there is enough unindexed data then the flat search will become slow and the index + /// should be optimized. Optimizing an index will add any unindexed data to the existing + /// index without rerunning the full index creation process. For more details see + /// [Table::optimize]. + /// + /// Note: Multi-column (composite) indices are not currently supported. However, they will + /// be supported in the future and the API is designed to be compatible with them. 
/// /// # Examples /// @@ -304,22 +329,28 @@ impl Table { /// # RecordBatchIterator, Int32Array}; /// # use arrow_schema::{Schema, Field, DataType}; /// # tokio::runtime::Runtime::new().unwrap().block_on(async { + /// use lancedb::index::Index; /// let tmpdir = tempfile::tempdir().unwrap(); /// let db = lancedb::connect(tmpdir.path().to_str().unwrap()) /// .execute() /// .await /// .unwrap(); /// # let tbl = db.open_table("idx_test").execute().await.unwrap(); - /// tbl.create_index(&["vector"]) - /// .ivf_pq() - /// .num_partitions(256) - /// .build() - /// .await - /// .unwrap(); + /// tbl.create_index(&["vector"], Index::Auto) + /// .execute() + /// .await + /// .unwrap(); /// # }); /// ``` - pub fn create_index(&self, column: &[&str]) -> IndexBuilder { - IndexBuilder::new(self.inner.clone(), column) + pub fn create_index(&self, columns: &[impl AsRef], index: Index) -> IndexBuilder { + IndexBuilder::new( + self.inner.clone(), + columns + .iter() + .map(|val| val.as_ref().to_string()) + .collect::>(), + index, + ) } /// Create a builder for a merge insert operation @@ -671,6 +702,28 @@ impl NativeTable { Ok(name.to_string()) } + fn supported_btree_data_type(dtype: &DataType) -> bool { + dtype.is_integer() + || dtype.is_floating() + || matches!( + dtype, + DataType::Boolean + | DataType::Utf8 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Date32 + | DataType::Date64 + | DataType::Timestamp(_, _) + ) + } + + fn supported_vector_data_type(dtype: &DataType) -> bool { + match dtype { + DataType::FixedSizeList(inner, _) => DataType::is_floating(inner.data_type()), + _ => false, + } + } + /// Creates a new Table /// /// # Arguments @@ -881,6 +934,102 @@ impl NativeTable { Ok(Some(index_stats)) } + + async fn create_ivf_pq_index( + &self, + index: IvfPqIndexBuilder, + field: &Field, + replace: bool, + ) -> Result<()> { + if !Self::supported_vector_data_type(field.data_type()) { + return Err(Error::InvalidInput { + message: format!( + "An IVF PQ index cannot 
be created on the column `{}` which has data type {}", + field.name(), + field.data_type() + ), + }); + } + + let num_partitions = if let Some(n) = index.num_partitions { + n + } else { + suggested_num_partitions(self.count_rows(None).await?) + }; + let num_sub_vectors: u32 = if let Some(n) = index.num_sub_vectors { + n + } else { + match field.data_type() { + arrow_schema::DataType::FixedSizeList(_, n) => { + Ok::(suggested_num_sub_vectors(*n as u32)) + } + _ => Err(Error::Schema { + message: format!("Column '{}' is not a FixedSizeList", field.name()), + }), + }? + }; + let mut dataset = self.dataset.get_mut().await?; + let lance_idx_params = lance::index::vector::VectorIndexParams::ivf_pq( + num_partitions as usize, + /*num_bits=*/ 8, + num_sub_vectors as usize, + false, + index.distance_type, + index.max_iterations as usize, + ); + dataset + .create_index( + &[field.name()], + IndexType::Vector, + None, + &lance_idx_params, + replace, + ) + .await?; + Ok(()) + } + + async fn create_auto_index(&self, field: &Field, opts: IndexBuilder) -> Result<()> { + if Self::supported_vector_data_type(field.data_type()) { + self.create_ivf_pq_index(IvfPqIndexBuilder::default(), field, opts.replace) + .await + } else if Self::supported_btree_data_type(field.data_type()) { + self.create_btree_index(field, opts).await + } else { + Err(Error::InvalidInput { + message: format!( + "there are no indices supported for the field `{}` with the data type {}", + field.name(), + field.data_type() + ), + }) + } + } + + async fn create_btree_index(&self, field: &Field, opts: IndexBuilder) -> Result<()> { + if !Self::supported_btree_data_type(field.data_type()) { + return Err(Error::Schema { + message: format!( + "A BTree index cannot be created on the field `{}` which has data type {}", + field.name(), + field.data_type() + ), + }); + } + + let mut dataset = self.dataset.get_mut().await?; + let lance_idx_params = lance::index::scalar::ScalarIndexParams {}; + dataset + .create_index( + 
&[field.name()], + IndexType::Scalar, + None, + &lance_idx_params, + opts.replace, + ) + .await?; + Ok(()) + } } #[async_trait::async_trait] @@ -913,7 +1062,7 @@ impl TableInternal for NativeTable { } } - async fn do_add(&self, add: AddDataBuilder) -> Result<()> { + async fn add(&self, add: AddDataBuilder) -> Result<()> { let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams { mode: match add.mode { AddDataMode::Append => WriteMode::Append, @@ -933,7 +1082,24 @@ impl TableInternal for NativeTable { Ok(()) } - async fn do_query(&self, query: &Query) -> Result { + async fn create_index(&self, opts: IndexBuilder) -> Result<()> { + if opts.columns.len() != 1 { + return Err(Error::Schema { + message: "Multi-column (composite) indices are not yet supported".to_string(), + }); + } + let schema = self.schema().await?; + + let field = schema.field_with_name(&opts.columns[0])?; + + match opts.index { + Index::Auto => self.create_auto_index(field, opts).await, + Index::BTree(_) => self.create_btree_index(field, opts).await, + Index::IvfPq(ivf_pq) => self.create_ivf_pq_index(ivf_pq, field, opts.replace).await, + } + } + + async fn query(&self, query: &Query) -> Result { let ds_ref = self.dataset.get().await?; let mut scanner: Scanner = ds_ref.scan(); @@ -992,7 +1158,7 @@ impl TableInternal for NativeTable { Ok(scanner.try_into_stream().await?) } - async fn do_merge_insert( + async fn merge_insert( &self, params: MergeInsertBuilder, new_data: Box, @@ -1028,112 +1194,6 @@ impl TableInternal for NativeTable { Ok(()) } - async fn do_create_index(&self, index: IndexBuilder) -> Result<()> { - let schema = self.schema().await?; - - // TODO: simplify this after GH lance#1864. - let mut index_type = &index.index_type; - let columns = if index.columns.is_empty() { - // By default we create vector index. - index_type = &IndexType::Vector; - vec![default_vector_column(&schema, None)?] 
- } else { - index.columns.clone() - }; - - if columns.len() != 1 { - return Err(Error::Schema { - message: "Only one column is supported for index".to_string(), - }); - } - let column = &columns[0]; - - let field = schema.field_with_name(column)?; - - let params = match index_type { - IndexType::Scalar => IndexParams::Scalar { - replace: index.replace, - }, - IndexType::Vector => { - let num_partitions = if let Some(n) = index.num_partitions { - n - } else { - suggested_num_partitions(self.count_rows(None).await?) - }; - let num_sub_vectors: u32 = if let Some(n) = index.num_sub_vectors { - n - } else { - match field.data_type() { - arrow_schema::DataType::FixedSizeList(_, n) => { - Ok::(suggested_num_sub_vectors(*n as u32)) - } - _ => Err(Error::Schema { - message: format!( - "Column '{}' is not a FixedSizeList", - &index.columns[0] - ), - }), - }? - }; - IndexParams::IvfPq { - replace: index.replace, - metric_type: index.metric_type, - num_partitions: num_partitions as u64, - num_sub_vectors, - num_bits: index.num_bits, - sample_rate: index.sample_rate, - max_iterations: index.max_iterations, - } - } - }; - - let tbl = self - .as_native() - .expect("Only native table is supported here"); - let mut dataset = tbl.dataset.get_mut().await?; - match params { - IndexParams::Scalar { replace } => { - dataset - .create_index( - &[&column], - IndexType::Scalar, - None, - &ScalarIndexParams::default(), - replace, - ) - .await? - } - IndexParams::IvfPq { - replace, - metric_type, - num_partitions, - num_sub_vectors, - num_bits, - max_iterations, - .. 
- } => { - let lance_idx_params = lance::index::vector::VectorIndexParams::ivf_pq( - num_partitions as usize, - num_bits as u8, - num_sub_vectors as usize, - false, - metric_type, - max_iterations as usize, - ); - dataset - .create_index( - &[column], - IndexType::Vector, - None, - &lance_idx_params, - replace, - ) - .await?; - } - } - Ok(()) - } - /// Delete rows from the table async fn delete(&self, predicate: &str) -> Result<()> { self.dataset.get_mut().await?.delete(predicate).await?; @@ -1858,11 +1918,8 @@ mod tests { ); table - .create_index(&["embeddings"]) - .ivf_pq() - .name("my_index") - .num_partitions(256) - .build() + .create_index(&["embeddings"], Index::Auto) + .execute() .await .unwrap(); diff --git a/rust/lancedb/src/table/merge.rs b/rust/lancedb/src/table/merge.rs index e7d141e1..1633160d 100644 --- a/rust/lancedb/src/table/merge.rs +++ b/rust/lancedb/src/table/merge.rs @@ -98,6 +98,6 @@ impl MergeInsertBuilder { /// /// Nothing is returned but the [`super::Table`] is updated pub async fn execute(self, new_data: Box) -> Result<()> { - self.table.clone().do_merge_insert(self, new_data).await + self.table.clone().merge_insert(self, new_data).await } }