diff --git a/.github/workflows/docs_test.yml b/.github/workflows/docs_test.yml index 40fe89b1..a6e22571 100644 --- a/.github/workflows/docs_test.yml +++ b/.github/workflows/docs_test.yml @@ -58,51 +58,3 @@ jobs: run: | cd docs/test/python for d in *; do cd "$d"; echo "$d".py; python "$d".py; cd ..; done - test-node: - name: Test doc nodejs code - runs-on: ubuntu-24.04 - timeout-minutes: 60 - strategy: - fail-fast: false - steps: - - name: Checkout - uses: actions/checkout@v4 - with: - fetch-depth: 0 - lfs: true - - name: Print CPU capabilities - run: cat /proc/cpuinfo - - name: Set up Node - uses: actions/setup-node@v4 - with: - node-version: 20 - - name: Install protobuf - run: | - sudo apt update - sudo apt install -y protobuf-compiler - - name: Install dependecies needed for ubuntu - run: | - sudo apt install -y libssl-dev - rustup update && rustup default - - name: Rust cache - uses: swatinem/rust-cache@v2 - - name: Install node dependencies - run: | - sudo swapoff -a - sudo fallocate -l 8G /swapfile - sudo chmod 600 /swapfile - sudo mkswap /swapfile - sudo swapon /swapfile - sudo swapon --show - cd node - npm ci - npm run build-release - cd ../docs - npm install - - name: Test - env: - LANCEDB_URI: ${{ secrets.LANCEDB_URI }} - LANCEDB_DEV_API_KEY: ${{ secrets.LANCEDB_DEV_API_KEY }} - run: | - cd docs - npm t diff --git a/docs/src/js/classes/Query.md b/docs/src/js/classes/Query.md index ae1ff574..7e467b85 100644 --- a/docs/src/js/classes/Query.md +++ b/docs/src/js/classes/Query.md @@ -14,7 +14,7 @@ A builder for LanceDB queries. ## Extends -- [`QueryBase`](QueryBase.md)<`NativeQuery`> +- `StandardQueryBase`<`NativeQuery`> ## Properties @@ -26,7 +26,7 @@ protected inner: Query | Promise; #### Inherited from -[`QueryBase`](QueryBase.md).[`inner`](QueryBase.md#inner) +`StandardQueryBase.inner` ## Methods @@ -73,7 +73,7 @@ AnalyzeExec verbose=true, metrics=[] #### Inherited from -[`QueryBase`](QueryBase.md).[`analyzePlan`](QueryBase.md#analyzeplan) +`StandardQueryBase.analyzePlan` *** @@ -107,7 +107,7 @@ single query) #### Inherited from -[`QueryBase`](QueryBase.md).[`execute`](QueryBase.md#execute) +`StandardQueryBase.execute` *** @@ -143,7 +143,7 @@ const plan = await table.query().nearestTo([0.5, 0.2]).explainPlan(); #### Inherited from -[`QueryBase`](QueryBase.md).[`explainPlan`](QueryBase.md#explainplan) +`StandardQueryBase.explainPlan` *** @@ -164,7 +164,7 @@ Use [Table#optimize](Table.md#optimize) to index all un-indexed data. #### Inherited from -[`QueryBase`](QueryBase.md).[`fastSearch`](QueryBase.md#fastsearch) +`StandardQueryBase.fastSearch` *** @@ -194,7 +194,7 @@ Use `where` instead #### Inherited from -[`QueryBase`](QueryBase.md).[`filter`](QueryBase.md#filter) +`StandardQueryBase.filter` *** @@ -216,7 +216,7 @@ fullTextSearch(query, options?): this #### Inherited from -[`QueryBase`](QueryBase.md).[`fullTextSearch`](QueryBase.md#fulltextsearch) +`StandardQueryBase.fullTextSearch` *** @@ -241,7 +241,7 @@ called then every valid row from the table will be returned. #### Inherited from -[`QueryBase`](QueryBase.md).[`limit`](QueryBase.md#limit) +`StandardQueryBase.limit` *** @@ -325,6 +325,10 @@ nearestToText(query, columns?): Query offset(offset): this ``` +Set the number of rows to skip before returning results. + +This is useful for pagination. 
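+
+For example, a second page of ten results might be fetched as shown below
+(an illustrative sketch; `table` is assumed to be an open [`Table`](Table.md)):
+
+```ts
+const page2 = await table.query().limit(10).offset(10).toArray();
+```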
+ #### Parameters * **offset**: `number` @@ -335,7 +339,7 @@ offset(offset): this #### Inherited from -[`QueryBase`](QueryBase.md).[`offset`](QueryBase.md#offset) +`StandardQueryBase.offset` *** @@ -388,7 +392,7 @@ object insertion order is easy to get wrong and `Map` is more foolproof. #### Inherited from -[`QueryBase`](QueryBase.md).[`select`](QueryBase.md#select) +`StandardQueryBase.select` *** @@ -410,7 +414,7 @@ Collect the results as an array of objects. #### Inherited from -[`QueryBase`](QueryBase.md).[`toArray`](QueryBase.md#toarray) +`StandardQueryBase.toArray` *** @@ -436,7 +440,7 @@ ArrowTable. #### Inherited from -[`QueryBase`](QueryBase.md).[`toArrow`](QueryBase.md#toarrow) +`StandardQueryBase.toArrow` *** @@ -471,7 +475,7 @@ on the filter column(s). #### Inherited from -[`QueryBase`](QueryBase.md).[`where`](QueryBase.md#where) +`StandardQueryBase.where` *** @@ -493,4 +497,4 @@ order to perform hybrid search. #### Inherited from -[`QueryBase`](QueryBase.md).[`withRowId`](QueryBase.md#withrowid) +`StandardQueryBase.withRowId` diff --git a/docs/src/js/classes/QueryBase.md b/docs/src/js/classes/QueryBase.md index b58173ed..b177b9e0 100644 --- a/docs/src/js/classes/QueryBase.md +++ b/docs/src/js/classes/QueryBase.md @@ -15,12 +15,11 @@ Common methods supported by all query types ## Extended by -- [`Query`](Query.md) -- [`VectorQuery`](VectorQuery.md) +- [`TakeQuery`](TakeQuery.md) ## Type Parameters -• **NativeQueryType** *extends* `NativeQuery` \| `NativeVectorQuery` +• **NativeQueryType** *extends* `NativeQuery` \| `NativeVectorQuery` \| `NativeTakeQuery` ## Implements @@ -141,104 +140,6 @@ const plan = await table.query().nearestTo([0.5, 0.2]).explainPlan(); *** -### fastSearch() - -```ts -fastSearch(): this -``` - -Skip searching un-indexed data. This can make search faster, but will miss -any data that is not yet indexed. - -Use [Table#optimize](Table.md#optimize) to index all un-indexed data. - -#### Returns - -`this` - -*** - -### ~~filter()~~ - -```ts -filter(predicate): this -``` - -A filter statement to be applied to this query. - -#### Parameters - -* **predicate**: `string` - -#### Returns - -`this` - -#### See - -where - -#### Deprecated - -Use `where` instead - -*** - -### fullTextSearch() - -```ts -fullTextSearch(query, options?): this -``` - -#### Parameters - -* **query**: `string` \| [`FullTextQuery`](../interfaces/FullTextQuery.md) - -* **options?**: `Partial`<[`FullTextSearchOptions`](../interfaces/FullTextSearchOptions.md)> - -#### Returns - -`this` - -*** - -### limit() - -```ts -limit(limit): this -``` - -Set the maximum number of results to return. - -By default, a plain search has no limit. If this method is not -called then every valid row from the table will be returned. - -#### Parameters - -* **limit**: `number` - -#### Returns - -`this` - -*** - -### offset() - -```ts -offset(offset): this -``` - -#### Parameters - -* **offset**: `number` - -#### Returns - -`this` - -*** - ### select() ```ts @@ -328,37 +229,6 @@ ArrowTable. *** -### where() - -```ts -where(predicate): this -``` - -A filter statement to be applied to this query. - -The filter should be supplied as an SQL query string. For example: - -#### Parameters - -* **predicate**: `string` - -#### Returns - -`this` - -#### Example - -```ts -x > 10 -y > 0 AND y < 100 -x > 5 OR y = 'test' - -Filtering performance can often be improved by creating a scalar index -on the filter column(s). 
-``` - -*** - ### withRowId() ```ts diff --git a/docs/src/js/classes/Session.md b/docs/src/js/classes/Session.md index 110fbf68..bd449c91 100644 --- a/docs/src/js/classes/Session.md +++ b/docs/src/js/classes/Session.md @@ -9,7 +9,8 @@ A session for managing caches and object stores across LanceDB operations. Sessions allow you to configure cache sizes for index and metadata caches, -which can significantly impact performance for large datasets. +which can significantly impact memory use and performance. They can +also be re-used across multiple connections to share the same cache state. ## Constructors @@ -24,8 +25,11 @@ Create a new session with custom cache sizes. # Parameters - `index_cache_size_bytes`: The size of the index cache in bytes. + Index data is stored in memory in this cache to speed up queries. Defaults to 6GB if not specified. - `metadata_cache_size_bytes`: The size of the metadata cache in bytes. + The metadata cache stores file metadata and schema information in memory. + This cache improves scan and write performance. Defaults to 1GB if not specified. #### Parameters diff --git a/docs/src/js/classes/Table.md b/docs/src/js/classes/Table.md index 23fd8b38..fd7902f0 100644 --- a/docs/src/js/classes/Table.md +++ b/docs/src/js/classes/Table.md @@ -674,6 +674,48 @@ console.log(tags); // { "v1": { version: 1, manifestSize: ... } } *** +### takeOffsets() + +```ts +abstract takeOffsets(offsets): TakeQuery +``` + +Create a query that returns a subset of the rows in the table. + +#### Parameters + +* **offsets**: `number`[] + The offsets of the rows to return. + +#### Returns + +[`TakeQuery`](TakeQuery.md) + +A builder that can be used to parameterize the query. + +*** + +### takeRowIds() + +```ts +abstract takeRowIds(rowIds): TakeQuery +``` + +Create a query that returns a subset of the rows in the table. + +#### Parameters + +* **rowIds**: `number`[] + The row ids of the rows to return. + +#### Returns + +[`TakeQuery`](TakeQuery.md) + +A builder that can be used to parameterize the query. + +*** + ### toArrow() ```ts diff --git a/docs/src/js/classes/TakeQuery.md b/docs/src/js/classes/TakeQuery.md new file mode 100644 index 00000000..cda76fd5 --- /dev/null +++ b/docs/src/js/classes/TakeQuery.md @@ -0,0 +1,265 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / TakeQuery + +# Class: TakeQuery + +A query that returns a subset of the rows in the table. + +## Extends + +- [`QueryBase`](QueryBase.md)<`NativeTakeQuery`> + +## Properties + +### inner + +```ts +protected inner: TakeQuery | Promise; +``` + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`inner`](QueryBase.md#inner) + +## Methods + +### analyzePlan() + +```ts +analyzePlan(): Promise +``` + +Executes the query and returns the physical query plan annotated with runtime metrics. + +This is useful for debugging and performance analysis, as it shows how the query was executed +and includes metrics such as elapsed time, rows processed, and I/O statistics. + +#### Returns + +`Promise`<`string`> + +A query execution plan with runtime metrics for each step. 
+ +#### Example + +```ts +import * as lancedb from "@lancedb/lancedb" + +const db = await lancedb.connect("./.lancedb"); +const table = await db.createTable("my_table", [ + { vector: [1.1, 0.9], id: "1" }, +]); + +const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan(); + +Example output (with runtime metrics inlined): +AnalyzeExec verbose=true, metrics=[] + ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs] + Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1] + CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs] + GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns] + FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs] + SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1] + KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1] + LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2] +``` + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`analyzePlan`](QueryBase.md#analyzeplan) + +*** + +### execute() + +```ts +protected execute(options?): RecordBatchIterator +``` + +Execute the query and return the results as an + +#### Parameters + +* **options?**: `Partial`<[`QueryExecutionOptions`](../interfaces/QueryExecutionOptions.md)> + +#### Returns + +[`RecordBatchIterator`](RecordBatchIterator.md) + +#### See + + - AsyncIterator +of + - RecordBatch. + +By default, LanceDb will use many threads to calculate results and, when +the result set is large, multiple batches will be processed at one time. +This readahead is limited however and backpressure will be applied if this +stream is consumed slowly (this constrains the maximum memory used by a +single query) + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`execute`](QueryBase.md#execute) + +*** + +### explainPlan() + +```ts +explainPlan(verbose): Promise +``` + +Generates an explanation of the query execution plan. + +#### Parameters + +* **verbose**: `boolean` = `false` + If true, provides a more detailed explanation. Defaults to false. + +#### Returns + +`Promise`<`string`> + +A Promise that resolves to a string containing the query execution plan explanation. + +#### Example + +```ts +import * as lancedb from "@lancedb/lancedb" +const db = await lancedb.connect("./.lancedb"); +const table = await db.createTable("my_table", [ + { vector: [1.1, 0.9], id: "1" }, +]); +const plan = await table.query().nearestTo([0.5, 0.2]).explainPlan(); +``` + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`explainPlan`](QueryBase.md#explainplan) + +*** + +### select() + +```ts +select(columns): this +``` + +Return only the specified columns. + +By default a query will return all columns from the table. However, this can have +a very significant impact on latency. LanceDb stores data in a columnar fashion. This +means we can finely tune our I/O to select exactly the columns we need. + +As a best practice you should always limit queries to the columns that you need. If you +pass in an array of column names then only those columns will be returned. 
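+
+For example (an illustrative sketch; `table` is assumed to be an open table
+with an `id` column):
+
+```ts
+const rows = await table.takeOffsets([0, 5]).select(["id"]).toArray();
+```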
+ +You can also use this method to create new "dynamic" columns based on your existing columns. +For example, you may not care about "a" or "b" but instead simply want "a + b". This is often +seen in the SELECT clause of an SQL query (e.g. `SELECT a+b FROM my_table`). + +To create dynamic columns you can pass in a Map. A column will be returned +for each entry in the map. The key provides the name of the column. The value is +an SQL string used to specify how the column is calculated. + +For example, an SQL query might state `SELECT a + b AS combined, c`. The equivalent +input to this method would be: + +#### Parameters + +* **columns**: `string` \| `string`[] \| `Record`<`string`, `string`> \| `Map`<`string`, `string`> + +#### Returns + +`this` + +#### Example + +```ts +new Map([["combined", "a + b"], ["c", "c"]]) + +Columns will always be returned in the order given, even if that order is different than +the order used when adding the data. + +Note that you can pass in a `Record` (e.g. an object literal). This method +uses `Object.entries` which should preserve the insertion order of the object. However, +object insertion order is easy to get wrong and `Map` is more foolproof. +``` + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`select`](QueryBase.md#select) + +*** + +### toArray() + +```ts +toArray(options?): Promise +``` + +Collect the results as an array of objects. + +#### Parameters + +* **options?**: `Partial`<[`QueryExecutionOptions`](../interfaces/QueryExecutionOptions.md)> + +#### Returns + +`Promise`<`any`[]> + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`toArray`](QueryBase.md#toarray) + +*** + +### toArrow() + +```ts +toArrow(options?): Promise> +``` + +Collect the results as an Arrow + +#### Parameters + +* **options?**: `Partial`<[`QueryExecutionOptions`](../interfaces/QueryExecutionOptions.md)> + +#### Returns + +`Promise`<`Table`<`any`>> + +#### See + +ArrowTable. + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`toArrow`](QueryBase.md#toarrow) + +*** + +### withRowId() + +```ts +withRowId(): this +``` + +Whether to return the row id in the results. + +This column can be used to match results between different queries. For +example, to match results from a full text search and a vector search in +order to perform hybrid search. + +#### Returns + +`this` + +#### Inherited from + +[`QueryBase`](QueryBase.md).[`withRowId`](QueryBase.md#withrowid) diff --git a/docs/src/js/classes/VectorQuery.md b/docs/src/js/classes/VectorQuery.md index 66339774..f935cd21 100644 --- a/docs/src/js/classes/VectorQuery.md +++ b/docs/src/js/classes/VectorQuery.md @@ -16,7 +16,7 @@ This builder can be reused to execute the query many times. 
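+
+For example, the same builder can be executed more than once (an illustrative
+sketch; `table` is assumed to be an open table with a vector column):
+
+```ts
+const query = table.query().nearestTo([0.5, 0.2]).limit(5);
+const first = await query.toArray();
+const second = await query.toArray(); // same builder, executed again
+```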
## Extends -- [`QueryBase`](QueryBase.md)<`NativeVectorQuery`> +- `StandardQueryBase`<`NativeVectorQuery`> ## Properties @@ -28,7 +28,7 @@ protected inner: VectorQuery | Promise; #### Inherited from -[`QueryBase`](QueryBase.md).[`inner`](QueryBase.md#inner) +`StandardQueryBase.inner` ## Methods @@ -91,7 +91,7 @@ AnalyzeExec verbose=true, metrics=[] #### Inherited from -[`QueryBase`](QueryBase.md).[`analyzePlan`](QueryBase.md#analyzeplan) +`StandardQueryBase.analyzePlan` *** @@ -248,7 +248,7 @@ single query) #### Inherited from -[`QueryBase`](QueryBase.md).[`execute`](QueryBase.md#execute) +`StandardQueryBase.execute` *** @@ -284,7 +284,7 @@ const plan = await table.query().nearestTo([0.5, 0.2]).explainPlan(); #### Inherited from -[`QueryBase`](QueryBase.md).[`explainPlan`](QueryBase.md#explainplan) +`StandardQueryBase.explainPlan` *** @@ -305,7 +305,7 @@ Use [Table#optimize](Table.md#optimize) to index all un-indexed data. #### Inherited from -[`QueryBase`](QueryBase.md).[`fastSearch`](QueryBase.md#fastsearch) +`StandardQueryBase.fastSearch` *** @@ -335,7 +335,7 @@ Use `where` instead #### Inherited from -[`QueryBase`](QueryBase.md).[`filter`](QueryBase.md#filter) +`StandardQueryBase.filter` *** @@ -357,7 +357,7 @@ fullTextSearch(query, options?): this #### Inherited from -[`QueryBase`](QueryBase.md).[`fullTextSearch`](QueryBase.md#fulltextsearch) +`StandardQueryBase.fullTextSearch` *** @@ -382,7 +382,7 @@ called then every valid row from the table will be returned. #### Inherited from -[`QueryBase`](QueryBase.md).[`limit`](QueryBase.md#limit) +`StandardQueryBase.limit` *** @@ -480,6 +480,10 @@ the minimum and maximum to the same value. offset(offset): this ``` +Set the number of rows to skip before returning results. + +This is useful for pagination. + #### Parameters * **offset**: `number` @@ -490,7 +494,7 @@ offset(offset): this #### Inherited from -[`QueryBase`](QueryBase.md).[`offset`](QueryBase.md#offset) +`StandardQueryBase.offset` *** @@ -637,7 +641,7 @@ object insertion order is easy to get wrong and `Map` is more foolproof. #### Inherited from -[`QueryBase`](QueryBase.md).[`select`](QueryBase.md#select) +`StandardQueryBase.select` *** @@ -659,7 +663,7 @@ Collect the results as an array of objects. #### Inherited from -[`QueryBase`](QueryBase.md).[`toArray`](QueryBase.md#toarray) +`StandardQueryBase.toArray` *** @@ -685,7 +689,7 @@ ArrowTable. #### Inherited from -[`QueryBase`](QueryBase.md).[`toArrow`](QueryBase.md#toarrow) +`StandardQueryBase.toArrow` *** @@ -720,7 +724,7 @@ on the filter column(s). #### Inherited from -[`QueryBase`](QueryBase.md).[`where`](QueryBase.md#where) +`StandardQueryBase.where` *** @@ -742,4 +746,4 @@ order to perform hybrid search. 
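+
+#### Example
+
+```ts
+// An illustrative sketch; the `_rowid` result field name is assumed here.
+// The collected ids can be matched against another query's results,
+// e.g. to combine full text and vector search for hybrid search.
+const hits = await table.query().nearestTo([0.5, 0.2]).withRowId().toArray();
+const ids = hits.map((row) => row._rowid);
+```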
#### Inherited from -[`QueryBase`](QueryBase.md).[`withRowId`](QueryBase.md#withrowid) +`StandardQueryBase.withRowId` diff --git a/docs/src/js/globals.md b/docs/src/js/globals.md index 11f3d899..857c0bc7 100644 --- a/docs/src/js/globals.md +++ b/docs/src/js/globals.md @@ -33,6 +33,7 @@ - [Table](classes/Table.md) - [TagContents](classes/TagContents.md) - [Tags](classes/Tags.md) +- [TakeQuery](classes/TakeQuery.md) - [VectorColumnOptions](classes/VectorColumnOptions.md) - [VectorQuery](classes/VectorQuery.md) diff --git a/docs/src/js/interfaces/TimeoutConfig.md b/docs/src/js/interfaces/TimeoutConfig.md index 73fde995..c821afc4 100644 --- a/docs/src/js/interfaces/TimeoutConfig.md +++ b/docs/src/js/interfaces/TimeoutConfig.md @@ -44,3 +44,17 @@ optional readTimeout: number; The timeout for reading data from the server in seconds. Default is 300 seconds (5 minutes). This can also be set via the environment variable `LANCE_CLIENT_READ_TIMEOUT`, as an integer number of seconds. + +*** + +### timeout? + +```ts +optional timeout: number; +``` + +The overall timeout for the entire request in seconds. This includes +connection, send, and read time. If the entire request doesn't complete +within this time, it will fail. Default is None (no overall timeout). +This can also be set via the environment variable `LANCE_CLIENT_TIMEOUT`, +as an integer number of seconds. diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 37f56e69..7717be4d 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -287,6 +287,12 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( expect(res2[1].id).toEqual(data2.id); }); + it("should support take queries", async () => { + await table.add([{ id: 1 }, { id: 2 }, { id: 3 }]); + const res = await table.takeOffsets([1, 2]).toArrow(); + expect(res.getChild("id")?.toJSON()).toEqual([2, 3]); + }); + it("should return the table as an instance of an arrow table", async () => { const arrowTbl = await table.toArrow(); expect(arrowTbl).toBeInstanceOf(ArrowTable); diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index ce0f4546..e27eb414 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -59,6 +59,7 @@ export { Query, QueryBase, VectorQuery, + TakeQuery, QueryExecutionOptions, FullTextSearchOptions, RecordBatchIterator, diff --git a/nodejs/lancedb/query.ts b/nodejs/lancedb/query.ts index 9ea82145..2fbe48b8 100644 --- a/nodejs/lancedb/query.ts +++ b/nodejs/lancedb/query.ts @@ -15,6 +15,7 @@ import { RecordBatchIterator as NativeBatchIterator, Query as NativeQuery, Table as NativeTable, + TakeQuery as NativeTakeQuery, VectorQuery as NativeVectorQuery, } from "./native"; import { Reranker } from "./rerankers"; @@ -50,7 +51,7 @@ export class RecordBatchIterator implements AsyncIterator { /* eslint-enable */ class RecordBatchIterable< - NativeQueryType extends NativeQuery | NativeVectorQuery, + NativeQueryType extends NativeQuery | NativeVectorQuery | NativeTakeQuery, > implements AsyncIterable { private inner: NativeQueryType; @@ -107,8 +108,9 @@ export interface FullTextSearchOptions { * * @hideconstructor */ -export class QueryBase - implements AsyncIterable +export class QueryBase< + NativeQueryType extends NativeQuery | NativeVectorQuery | NativeTakeQuery, +> implements AsyncIterable { /** * @hidden @@ -133,56 +135,6 @@ export class QueryBase fn(this.inner); } } - /** - * A filter statement to be applied to this query. - * - * The filter should be supplied as an SQL query string. 
For example: - * @example - * x > 10 - * y > 0 AND y < 100 - * x > 5 OR y = 'test' - * - * Filtering performance can often be improved by creating a scalar index - * on the filter column(s). - */ - where(predicate: string): this { - this.doCall((inner: NativeQueryType) => inner.onlyIf(predicate)); - return this; - } - /** - * A filter statement to be applied to this query. - * @see where - * @deprecated Use `where` instead - */ - filter(predicate: string): this { - return this.where(predicate); - } - - fullTextSearch( - query: string | FullTextQuery, - options?: Partial, - ): this { - let columns: string[] | null = null; - if (options) { - if (typeof options.columns === "string") { - columns = [options.columns]; - } else if (Array.isArray(options.columns)) { - columns = options.columns; - } - } - - this.doCall((inner: NativeQueryType) => { - if (typeof query === "string") { - inner.fullTextSearch({ - query: query, - columns: columns, - }); - } else { - inner.fullTextSearch({ query: query.inner }); - } - }); - return this; - } /** * Return only the specified columns. @@ -241,33 +193,6 @@ export class QueryBase return this; } - /** - * Set the maximum number of results to return. - * - * By default, a plain search has no limit. If this method is not - * called then every valid row from the table will be returned. - */ - limit(limit: number): this { - this.doCall((inner: NativeQueryType) => inner.limit(limit)); - return this; - } - - offset(offset: number): this { - this.doCall((inner: NativeQueryType) => inner.offset(offset)); - return this; - } - - /** - * Skip searching un-indexed data. This can make search faster, but will miss - * any data that is not yet indexed. - * - * Use {@link Table#optimize} to index all un-indexed data. - */ - fastSearch(): this { - this.doCall((inner: NativeQueryType) => inner.fastSearch()); - return this; - } - /** * Whether to return the row id in the results. * @@ -403,6 +328,100 @@ export class QueryBase } } +export class StandardQueryBase< + NativeQueryType extends NativeQuery | NativeVectorQuery, + > + extends QueryBase + implements ExecutableQuery +{ + constructor(inner: NativeQueryType | Promise) { + super(inner); + } + + /** + * A filter statement to be applied to this query. + * + * The filter should be supplied as an SQL query string. For example: + * @example + * x > 10 + * y > 0 AND y < 100 + * x > 5 OR y = 'test' + * + * Filtering performance can often be improved by creating a scalar index + * on the filter column(s). + */ + where(predicate: string): this { + this.doCall((inner: NativeQueryType) => inner.onlyIf(predicate)); + return this; + } + /** + * A filter statement to be applied to this query. + * @see where + * @deprecated Use `where` instead + */ + filter(predicate: string): this { + return this.where(predicate); + } + + fullTextSearch( + query: string | FullTextQuery, + options?: Partial, + ): this { + let columns: string[] | null = null; + if (options) { + if (typeof options.columns === "string") { + columns = [options.columns]; + } else if (Array.isArray(options.columns)) { + columns = options.columns; + } + } + + this.doCall((inner: NativeQueryType) => { + if (typeof query === "string") { + inner.fullTextSearch({ + query: query, + columns: columns, + }); + } else { + inner.fullTextSearch({ query: query.inner }); + } + }); + return this; + } + + /** + * Set the maximum number of results to return. + * + * By default, a plain search has no limit. If this method is not + * called then every valid row from the table will be returned. 
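+   * @example
+   * // An illustrative sketch: cap a plain scan at 10 rows.
+   * const rows = await table.query().limit(10).toArray();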
+ */ + limit(limit: number): this { + this.doCall((inner: NativeQueryType) => inner.limit(limit)); + return this; + } + + /** + * Set the number of rows to skip before returning results. + * + * This is useful for pagination. + */ + offset(offset: number): this { + this.doCall((inner: NativeQueryType) => inner.offset(offset)); + return this; + } + + /** + * Skip searching un-indexed data. This can make search faster, but will miss + * any data that is not yet indexed. + * + * Use {@link Table#optimize} to index all un-indexed data. + */ + fastSearch(): this { + this.doCall((inner: NativeQueryType) => inner.fastSearch()); + return this; + } +} + /** * An interface for a query that can be executed * @@ -419,7 +438,7 @@ export interface ExecutableQuery {} * * @hideconstructor */ -export class VectorQuery extends QueryBase { +export class VectorQuery extends StandardQueryBase { /** * @hidden */ @@ -679,13 +698,24 @@ export class VectorQuery extends QueryBase { } } +/** + * A query that returns a subset of the rows in the table. + * + * @hideconstructor + */ +export class TakeQuery extends QueryBase { + constructor(inner: NativeTakeQuery) { + super(inner); + } +} + /** A builder for LanceDB queries. * * @see {@link Table#query}, {@link Table#search} * * @hideconstructor */ -export class Query extends QueryBase { +export class Query extends StandardQueryBase { /** * @hidden */ diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 35dbd4c0..eee701f1 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -35,6 +35,7 @@ import { import { FullTextQuery, Query, + TakeQuery, VectorQuery, instanceOfFullTextQuery, } from "./query"; @@ -336,6 +337,20 @@ export abstract class Table { */ abstract query(): Query; + /** + * Create a query that returns a subset of the rows in the table. + * @param offsets The offsets of the rows to return. + * @returns A builder that can be used to parameterize the query. + */ + abstract takeOffsets(offsets: number[]): TakeQuery; + + /** + * Create a query that returns a subset of the rows in the table. + * @param rowIds The row ids of the rows to return. + * @returns A builder that can be used to parameterize the query. 
+ */ + abstract takeRowIds(rowIds: number[]): TakeQuery; + /** * Create a search query to find the nearest neighbors * of the given query @@ -665,6 +680,14 @@ export class LocalTable extends Table { await this.inner.waitForIndex(indexNames, timeoutSeconds); } + takeOffsets(offsets: number[]): TakeQuery { + return new TakeQuery(this.inner.takeOffsets(offsets)); + } + + takeRowIds(rowIds: number[]): TakeQuery { + return new TakeQuery(this.inner.takeRowIds(rowIds)); + } + query(): Query { return new Query(this.inner); } diff --git a/nodejs/src/query.rs b/nodejs/src/query.rs index aa28aa05..3a1d441d 100644 --- a/nodejs/src/query.rs +++ b/nodejs/src/query.rs @@ -12,6 +12,7 @@ use lancedb::query::Query as LanceDbQuery; use lancedb::query::QueryBase; use lancedb::query::QueryExecutionOptions; use lancedb::query::Select; +use lancedb::query::TakeQuery as LanceDbTakeQuery; use lancedb::query::VectorQuery as LanceDbVectorQuery; use napi::bindgen_prelude::*; use napi_derive::napi; @@ -319,6 +320,79 @@ impl VectorQuery { } } +#[napi] +pub struct TakeQuery { + inner: LanceDbTakeQuery, +} + +#[napi] +impl TakeQuery { + pub fn new(query: LanceDbTakeQuery) -> Self { + Self { inner: query } + } + + #[napi] + pub fn select(&mut self, columns: Vec<(String, String)>) { + self.inner = self.inner.clone().select(Select::dynamic(&columns)); + } + + #[napi] + pub fn select_columns(&mut self, columns: Vec) { + self.inner = self.inner.clone().select(Select::columns(&columns)); + } + + #[napi] + pub fn with_row_id(&mut self) { + self.inner = self.inner.clone().with_row_id(); + } + + #[napi(catch_unwind)] + pub async fn execute( + &self, + max_batch_length: Option, + timeout_ms: Option, + ) -> napi::Result { + let mut execution_opts = QueryExecutionOptions::default(); + if let Some(max_batch_length) = max_batch_length { + execution_opts.max_batch_length = max_batch_length; + } + if let Some(timeout_ms) = timeout_ms { + execution_opts.timeout = Some(std::time::Duration::from_millis(timeout_ms as u64)) + } + let inner_stream = self + .inner + .execute_with_options(execution_opts) + .await + .map_err(|e| { + napi::Error::from_reason(format!( + "Failed to execute query stream: {}", + convert_error(&e) + )) + })?; + Ok(RecordBatchIterator::new(inner_stream)) + } + + #[napi] + pub async fn explain_plan(&self, verbose: bool) -> napi::Result { + self.inner.explain_plan(verbose).await.map_err(|e| { + napi::Error::from_reason(format!( + "Failed to retrieve the query plan: {}", + convert_error(&e) + )) + }) + } + + #[napi(catch_unwind)] + pub async fn analyze_plan(&self) -> napi::Result { + self.inner.analyze_plan().await.map_err(|e| { + napi::Error::from_reason(format!( + "Failed to execute analyze plan: {}", + convert_error(&e) + )) + }) + } +} + #[napi] #[derive(Debug, Clone)] pub struct JsFullTextQuery { diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index afc8203b..5ef7919c 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -15,7 +15,7 @@ use napi_derive::napi; use crate::error::NapiErrorExt; use crate::index::Index; use crate::merge::NativeMergeInsertBuilder; -use crate::query::{Query, VectorQuery}; +use crate::query::{Query, TakeQuery, VectorQuery}; #[napi] pub struct Table { @@ -187,6 +187,44 @@ impl Table { Ok(Query::new(self.inner_ref()?.query())) } + #[napi(catch_unwind)] + pub fn take_offsets(&self, offsets: Vec) -> napi::Result { + Ok(TakeQuery::new( + self.inner_ref()?.take_offsets( + offsets + .into_iter() + .map(|o| { + u64::try_from(o).map_err(|e| { + napi::Error::from_reason(format!( + 
"Failed to convert offset to u64: {}", + e + )) + }) + }) + .collect::>>()?, + ), + )) + } + + #[napi(catch_unwind)] + pub fn take_row_ids(&self, row_ids: Vec) -> napi::Result { + Ok(TakeQuery::new( + self.inner_ref()?.take_row_ids( + row_ids + .into_iter() + .map(|o| { + u64::try_from(o).map_err(|e| { + napi::Error::from_reason(format!( + "Failed to convert row id to u64: {}", + e + )) + }) + }) + .collect::>>()?, + ), + )) + } + #[napi(catch_unwind)] pub fn vector_search(&self, vector: Float32Array) -> napi::Result { self.query()?.nearest_to(vector) diff --git a/nodejs/typedoc.json b/nodejs/typedoc.json index 72784657..e46085cd 100644 --- a/nodejs/typedoc.json +++ b/nodejs/typedoc.json @@ -2,6 +2,7 @@ "intentionallyNotExported": [ "lancedb/native.d.ts:Query", "lancedb/native.d.ts:VectorQuery", + "lancedb/native.d.ts:TakeQuery", "lancedb/native.d.ts:RecordBatchIterator", "lancedb/native.d.ts:NativeMergeInsertBuilder" ], diff --git a/python/python/lancedb/query.py b/python/python/lancedb/query.py index 4ac0fd6b..83484099 100644 --- a/python/python/lancedb/query.py +++ b/python/python/lancedb/query.py @@ -28,6 +28,7 @@ import pyarrow.fs as pa_fs import pydantic from lancedb.pydantic import PYDANTIC_VERSION +from lancedb.background_loop import LOOP from . import __version__ from .arrow import AsyncRecordBatchReader @@ -48,6 +49,7 @@ if TYPE_CHECKING: from ._lancedb import FTSQuery as LanceFTSQuery from ._lancedb import HybridQuery as LanceHybridQuery from ._lancedb import VectorQuery as LanceVectorQuery + from ._lancedb import TakeQuery as LanceTakeQuery from ._lancedb import PyQueryRequest from .common import VEC from .pydantic import LanceModel @@ -2139,7 +2141,11 @@ class LanceHybridQueryBuilder(LanceQueryBuilder): class AsyncQueryBase(object): - def __init__(self, inner: Union[LanceQuery, LanceVectorQuery]): + """ + Base class for all async queries (take, scan, vector, fts, hybrid) + """ + + def __init__(self, inner: Union[LanceQuery, LanceVectorQuery, LanceTakeQuery]): """ Construct an AsyncQueryBase @@ -2149,27 +2155,14 @@ class AsyncQueryBase(object): self._inner = inner def to_query_object(self) -> Query: + """ + Convert the query into a query object + + This is currently experimental but can be useful as the query object is pure + python and more easily serializable. + """ return Query.from_inner(self._inner.to_query_request()) - def where(self, predicate: str) -> Self: - """ - Only return rows matching the given predicate - - The predicate should be supplied as an SQL query string. - - Examples - -------- - - >>> predicate = "x > 10" - >>> predicate = "y > 0 AND y < 100" - >>> predicate = "x > 5 OR y = 'test'" - - Filtering performance can often be improved by creating a scalar index - on the filter column(s). - """ - self._inner.where(predicate) - return self - def select(self, columns: Union[List[str], dict[str, str]]) -> Self: """ Return only the specified columns. @@ -2208,42 +2201,6 @@ class AsyncQueryBase(object): raise TypeError("columns must be a list of column names or a dict") return self - def limit(self, limit: int) -> Self: - """ - Set the maximum number of results to return. - - By default, a plain search has no limit. If this method is not - called then every valid row from the table will be returned. - """ - self._inner.limit(limit) - return self - - def offset(self, offset: int) -> Self: - """ - Set the offset for the results. - - Parameters - ---------- - offset: int - The offset to start fetching results from. 
- """ - self._inner.offset(offset) - return self - - def fast_search(self) -> Self: - """ - Skip searching un-indexed data. - - This can make queries faster, but will miss any data that has not been - indexed. - - !!! tip - You can add new data into an existing index by calling - [AsyncTable.optimize][lancedb.table.AsyncTable.optimize]. - """ - self._inner.fast_search() - return self - def with_row_id(self) -> Self: """ Include the _rowid column in the results. @@ -2251,27 +2208,6 @@ class AsyncQueryBase(object): self._inner.with_row_id() return self - def postfilter(self) -> Self: - """ - If this is called then filtering will happen after the search instead of - before. - By default filtering will be performed before the search. This is how - filtering is typically understood to work. This prefilter step does add some - additional latency. Creating a scalar index on the filter column(s) can - often improve this latency. However, sometimes a filter is too complex or - scalar indices cannot be applied to the column. In these cases postfiltering - can be used instead of prefiltering to improve latency. - Post filtering applies the filter to the results of the search. This - means we only run the filter on a much smaller set of data. However, it can - cause the query to return fewer than `limit` results (or even no results) if - none of the nearest results match the filter. - Post filtering happens during the "refine stage" (described in more detail in - @see {@link VectorQuery#refineFactor}). This means that setting a higher refine - factor can often help restore some of the results lost by post filtering. - """ - self._inner.postfilter() - return self - async def to_batches( self, *, @@ -2295,7 +2231,9 @@ class AsyncQueryBase(object): complete within the specified time, an error will be raised. """ return AsyncRecordBatchReader( - await self._inner.execute(max_batch_length, timeout) + await self._inner.execute( + max_batch_length=max_batch_length, timeout=timeout + ) ) async def to_arrow(self, timeout: Optional[timedelta] = None) -> pa.Table: @@ -2454,7 +2392,98 @@ class AsyncQueryBase(object): return await self._inner.analyze_plan() -class AsyncQuery(AsyncQueryBase): +class AsyncStandardQuery(AsyncQueryBase): + """ + Base class for "standard" async queries (all but take currently) + """ + + def __init__(self, inner: Union[LanceQuery, LanceVectorQuery]): + """ + Construct an AsyncStandardQuery + + This method is not intended to be called directly. Instead, use the + [AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query. + """ + super().__init__(inner) + + def where(self, predicate: str) -> Self: + """ + Only return rows matching the given predicate + + The predicate should be supplied as an SQL query string. + + Examples + -------- + + >>> predicate = "x > 10" + >>> predicate = "y > 0 AND y < 100" + >>> predicate = "x > 5 OR y = 'test'" + + Filtering performance can often be improved by creating a scalar index + on the filter column(s). + """ + self._inner.where(predicate) + return self + + def limit(self, limit: int) -> Self: + """ + Set the maximum number of results to return. + + By default, a plain search has no limit. If this method is not + called then every valid row from the table will be returned. + """ + self._inner.limit(limit) + return self + + def offset(self, offset: int) -> Self: + """ + Set the offset for the results. + + Parameters + ---------- + offset: int + The offset to start fetching results from. 
+ """ + self._inner.offset(offset) + return self + + def fast_search(self) -> Self: + """ + Skip searching un-indexed data. + + This can make queries faster, but will miss any data that has not been + indexed. + + !!! tip + You can add new data into an existing index by calling + [AsyncTable.optimize][lancedb.table.AsyncTable.optimize]. + """ + self._inner.fast_search() + return self + + def postfilter(self) -> Self: + """ + If this is called then filtering will happen after the search instead of + before. + By default filtering will be performed before the search. This is how + filtering is typically understood to work. This prefilter step does add some + additional latency. Creating a scalar index on the filter column(s) can + often improve this latency. However, sometimes a filter is too complex or + scalar indices cannot be applied to the column. In these cases postfiltering + can be used instead of prefiltering to improve latency. + Post filtering applies the filter to the results of the search. This + means we only run the filter on a much smaller set of data. However, it can + cause the query to return fewer than `limit` results (or even no results) if + none of the nearest results match the filter. + Post filtering happens during the "refine stage" (described in more detail in + @see {@link VectorQuery#refineFactor}). This means that setting a higher refine + factor can often help restore some of the results lost by post filtering. + """ + self._inner.postfilter() + return self + + +class AsyncQuery(AsyncStandardQuery): def __init__(self, inner: LanceQuery): """ Construct an AsyncQuery @@ -2588,7 +2617,7 @@ class AsyncQuery(AsyncQueryBase): return AsyncFTSQuery(self._inner.nearest_to_text({"query": query})) -class AsyncFTSQuery(AsyncQueryBase): +class AsyncFTSQuery(AsyncStandardQuery): """A query for full text search for LanceDB.""" def __init__(self, inner: LanceFTSQuery): @@ -2867,7 +2896,7 @@ class AsyncVectorQueryBase: return self -class AsyncVectorQuery(AsyncQueryBase, AsyncVectorQueryBase): +class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase): def __init__(self, inner: LanceVectorQuery): """ Construct an AsyncVectorQuery @@ -2950,7 +2979,7 @@ class AsyncVectorQuery(AsyncQueryBase, AsyncVectorQueryBase): return AsyncRecordBatchReader(results, max_batch_length=max_batch_length) -class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase): +class AsyncHybridQuery(AsyncStandardQuery, AsyncVectorQueryBase): """ A query builder that performs hybrid vector and full text search. Results are combined and reranked based on the specified reranker. @@ -3102,3 +3131,252 @@ class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase): results.append(await self._inner.to_fts_query().analyze_plan()) return "\n".join(results) + + +class AsyncTakeQuery(AsyncQueryBase): + """ + Builder for parameterizing and executing take queries. + """ + + def __init__(self, inner: LanceTakeQuery): + super().__init__(inner) + + +class BaseQueryBuilder(object): + """ + Wraps AsyncQueryBase and provides a synchronous interface + """ + + def __init__(self, inner: AsyncQueryBase): + self._inner = inner + + def to_query_object(self) -> Query: + return self._inner.to_query_object() + + def select(self, columns: Union[List[str], dict[str, str]]) -> Self: + """ + Return only the specified columns. + + By default a query will return all columns from the table. However, this can + have a very significant impact on latency. LanceDb stores data in a columnar + fashion. 
This + means we can finely tune our I/O to select exactly the columns we need. + + As a best practice you should always limit queries to the columns that you need. + If you pass in a list of column names then only those columns will be + returned. + + You can also use this method to create new "dynamic" columns based on your + existing columns. For example, you may not care about "a" or "b" but instead + simply want "a + b". This is often seen in the SELECT clause of an SQL query + (e.g. `SELECT a+b FROM my_table`). + + To create dynamic columns you can pass in a dict[str, str]. A column will be + returned for each entry in the map. The key provides the name of the column. + The value is an SQL string used to specify how the column is calculated. + + For example, an SQL query might state `SELECT a + b AS combined, c`. The + equivalent input to this method would be `{"combined": "a + b", "c": "c"}`. + + Columns will always be returned in the order given, even if that order is + different than the order used when adding the data. + """ + self._inner.select(columns) + return self + + def with_row_id(self) -> Self: + """ + Include the _rowid column in the results. + """ + self._inner.with_row_id() + return self + + def to_batches( + self, + *, + max_batch_length: Optional[int] = None, + timeout: Optional[timedelta] = None, + ) -> pa.RecordBatchReader: + """ + Execute the query and return the results as an Apache Arrow RecordBatchReader. + + Parameters + ---------- + + max_batch_length: Optional[int] + The maximum number of selected records in a single RecordBatch object. + If not specified, a default batch length is used. + It is possible for batches to be smaller than the provided length if the + underlying data is stored in smaller chunks. + timeout: Optional[timedelta] + The maximum time to wait for the query to complete. + If not specified, no timeout is applied. If the query does not + complete within the specified time, an error will be raised. + """ + async_iter = LOOP.run(self._inner.execute(max_batch_length, timeout)) + + def iter_sync(): + try: + while True: + yield LOOP.run(async_iter.__anext__()) + except StopAsyncIteration: + return + + return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync()) + + def to_arrow(self, timeout: Optional[timedelta] = None) -> pa.Table: + """ + Execute the query and collect the results into an Apache Arrow Table. + + This method will collect all results into memory before returning. If + you expect a large number of results, you may want to use + [to_batches][lancedb.query.AsyncQueryBase.to_batches] + + Parameters + ---------- + timeout: Optional[timedelta] + The maximum time to wait for the query to complete. + If not specified, no timeout is applied. If the query does not + complete within the specified time, an error will be raised. + """ + return LOOP.run(self._inner.to_arrow(timeout)) + + def to_list(self, timeout: Optional[timedelta] = None) -> List[dict]: + """ + Execute the query and return the results as a list of dictionaries. + + Each list entry is a dictionary with the selected column names as keys, + or all table columns if `select` is not called. The vector and the "_distance" + fields are returned whether or not they're explicitly selected. + + Parameters + ---------- + timeout: Optional[timedelta] + The maximum time to wait for the query to complete. + If not specified, no timeout is applied. If the query does not + complete within the specified time, an error will be raised. 
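+
+        Examples
+        --------
+        >>> # An illustrative sketch: read two rows (taken by offset) as dicts
+        >>> # rows = table.take_offsets([0, 2]).to_list()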
+ """ + return LOOP.run(self._inner.to_list(timeout)) + + def to_pandas( + self, + flatten: Optional[Union[int, bool]] = None, + timeout: Optional[timedelta] = None, + ) -> "pd.DataFrame": + """ + Execute the query and collect the results into a pandas DataFrame. + + This method will collect all results into memory before returning. If you + expect a large number of results, you may want to use + [to_batches][lancedb.query.AsyncQueryBase.to_batches] and convert each batch to + pandas separately. + + Examples + -------- + + >>> import asyncio + >>> from lancedb import connect_async + >>> async def doctest_example(): + ... conn = await connect_async("./.lancedb") + ... table = await conn.create_table("my_table", data=[{"a": 1, "b": 2}]) + ... async for batch in await table.query().to_batches(): + ... batch_df = batch.to_pandas() + >>> asyncio.run(doctest_example()) + + Parameters + ---------- + flatten: Optional[Union[int, bool]] + If flatten is True, flatten all nested columns. + If flatten is an integer, flatten the nested columns up to the + specified depth. + If unspecified, do not flatten the nested columns. + timeout: Optional[timedelta] + The maximum time to wait for the query to complete. + If not specified, no timeout is applied. If the query does not + complete within the specified time, an error will be raised. + """ + return LOOP.run(self._inner.to_pandas(flatten, timeout)) + + def to_polars( + self, + timeout: Optional[timedelta] = None, + ) -> "pl.DataFrame": + """ + Execute the query and collect the results into a Polars DataFrame. + + This method will collect all results into memory before returning. If you + expect a large number of results, you may want to use + [to_batches][lancedb.query.AsyncQueryBase.to_batches] and convert each batch to + polars separately. + + Parameters + ---------- + timeout: Optional[timedelta] + The maximum time to wait for the query to complete. + If not specified, no timeout is applied. If the query does not + complete within the specified time, an error will be raised. + + Examples + -------- + + >>> import asyncio + >>> import polars as pl + >>> from lancedb import connect_async + >>> async def doctest_example(): + ... conn = await connect_async("./.lancedb") + ... table = await conn.create_table("my_table", data=[{"a": 1, "b": 2}]) + ... async for batch in await table.query().to_batches(): + ... batch_df = pl.from_arrow(batch) + >>> asyncio.run(doctest_example()) + """ + return LOOP.run(self._inner.to_polars(timeout)) + + def explain_plan(self, verbose: Optional[bool] = False): + """Return the execution plan for this query. + + Examples + -------- + >>> import asyncio + >>> from lancedb import connect_async + >>> async def doctest_example(): + ... conn = await connect_async("./.lancedb") + ... table = await conn.create_table("my_table", [{"vector": [99, 99]}]) + ... query = [100, 100] + ... plan = await table.query().nearest_to([1, 2]).explain_plan(True) + ... print(plan) + >>> asyncio.run(doctest_example()) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE + ProjectionExec: expr=[vector@0 as vector, _distance@2 as _distance] + GlobalLimitExec: skip=0, fetch=10 + FilterExec: _distance@2 IS NOT NULL + SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST, _rowid@1 ASC NULLS LAST], preserve_partitioning=[false] + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[vector], ... + + Parameters + ---------- + verbose : bool, default False + Use a verbose output format. 
+ + Returns + ------- + plan : str + """ # noqa: E501 + return LOOP.run(self._inner.explain_plan(verbose)) + + def analyze_plan(self): + """Execute the query and display with runtime metrics. + + Returns + ------- + plan : str + """ + return LOOP.run(self._inner.analyze_plan()) + + +class LanceTakeQueryBuilder(BaseQueryBuilder): + """ + Builder for parameterizing and executing take queries. + """ + + def __init__(self, inner: AsyncTakeQuery): + super().__init__(inner) diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index a07bc00d..676e56af 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -26,7 +26,7 @@ from lancedb.common import DATA, VEC, VECTOR_COLUMN_NAME from lancedb.merge import LanceMergeInsertBuilder from lancedb.embeddings import EmbeddingFunctionRegistry -from ..query import LanceVectorQueryBuilder, LanceQueryBuilder +from ..query import LanceVectorQueryBuilder, LanceQueryBuilder, LanceTakeQueryBuilder from ..table import AsyncTable, IndexStatistics, Query, Table, Tags @@ -617,6 +617,12 @@ class RemoteTable(Table): def stats(self): return LOOP.run(self._table.stats()) + def take_offsets(self, offsets: list[int]) -> LanceTakeQueryBuilder: + return LanceTakeQueryBuilder(self._table.take_offsets(offsets)) + + def take_row_ids(self, row_ids: list[int]) -> LanceTakeQueryBuilder: + return LanceTakeQueryBuilder(self._table.take_row_ids(row_ids)) + def uses_v2_manifest_paths(self) -> bool: raise NotImplementedError( "uses_v2_manifest_paths() is not supported on the LanceDB Cloud" diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 3088572a..61247e5c 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -51,6 +51,7 @@ from .query import ( AsyncFTSQuery, AsyncHybridQuery, AsyncQuery, + AsyncTakeQuery, AsyncVectorQuery, FullTextQuery, LanceEmptyQueryBuilder, @@ -58,6 +59,7 @@ from .query import ( LanceHybridQueryBuilder, LanceQueryBuilder, LanceVectorQueryBuilder, + LanceTakeQueryBuilder, Query, ) from .util import ( @@ -1103,6 +1105,66 @@ class Table(ABC): """ raise NotImplementedError + @abstractmethod + def take_offsets(self, offsets: list[int]) -> LanceTakeQueryBuilder: + """ + Take a list of offsets from the table. + + Offsets are 0-indexed and relative to the current version of the table. Offsets + are not stable. A row with an offset of N may have a different offset in a + different version of the table (e.g. if an earlier row is deleted). + + Offsets are mostly useful for sampling as the set of all valid offsets is easily + known in advance to be [0, len(table)). + + No guarantees are made regarding the order in which results are returned. If + you desire an output order that matches the order of the given offsets, you will + need to add the row offset column to the output and align it yourself. + + Parameters + ---------- + offsets: list[int] + The offsets to take. + + Returns + ------- + pa.RecordBatch + A record batch containing the rows at the given offsets. + """ + + @abstractmethod + def take_row_ids(self, row_ids: list[int]) -> LanceTakeQueryBuilder: + """ + Take a list of row ids from the table. + + Row ids are not stable and are relative to the current version of the table. + They can change due to compaction and updates. + + No guarantees are made regarding the order in which results are returned. 
If + you desire an output order that matches the order of the given ids, you will + need to add the row id column to the output and align it yourself. + + Unlike offsets, row ids are not 0-indexed and no assumptions should be made + about the possible range of row ids. In order to use this method you must + first obtain the row ids by scanning or searching the table. + + Even so, row ids are more stable than offsets and can be useful in some + situations. + + There is an ongoing effort to make row ids stable which is tracked at + https://github.com/lancedb/lancedb/issues/1120 + + Parameters + ---------- + row_ids: list[int] + The row ids to take. + + Returns + ------- + AsyncTakeQuery + A query object that can be executed to get the rows. + """ + @abstractmethod def _execute_query( self, @@ -1648,6 +1710,12 @@ class LanceTable(Table): """Get the current version of the table""" return LOOP.run(self._table.version()) + def take_offsets(self, offsets: list[int]) -> LanceTakeQueryBuilder: + return LanceTakeQueryBuilder(self._table.take_offsets(offsets)) + + def take_row_ids(self, row_ids: list[int]) -> LanceTakeQueryBuilder: + return LanceTakeQueryBuilder(self._table.take_row_ids(row_ids)) + @property def tags(self) -> Tags: """Tag management for the table. @@ -4030,6 +4098,58 @@ class AsyncTable: """ await self._inner.restore(version) + def take_offsets(self, offsets: list[int]) -> AsyncTakeQuery: + """ + Take a list of offsets from the table. + + Offsets are 0-indexed and relative to the current version of the table. Offsets + are not stable. A row with an offset of N may have a different offset in a + different version of the table (e.g. if an earlier row is deleted). + + Offsets are mostly useful for sampling as the set of all valid offsets is easily + known in advance to be [0, len(table)). + + Parameters + ---------- + offsets: list[int] + The offsets to take. + + Returns + ------- + pa.RecordBatch + A record batch containing the rows at the given offsets. + """ + return AsyncTakeQuery(self._inner.take_offsets(offsets)) + + def take_row_ids(self, row_ids: list[int]) -> AsyncTakeQuery: + """ + Take a list of row ids from the table. + + Row ids are not stable and are relative to the current version of the table. + They can change due to compaction and updates. + + Unlike offsets, row ids are not 0-indexed and no assumptions should be made + about the possible range of row ids. In order to use this method you must + first obtain the row ids by scanning or searching the table. + + Even so, row ids are more stable than offsets and can be useful in some + situations. + + There is an ongoing effort to make row ids stable which is tracked at + https://github.com/lancedb/lancedb/issues/1120 + + Parameters + ---------- + row_ids: list[int] + The row ids to take. + + Returns + ------- + AsyncTakeQuery + A query object that can be executed to get the rows. + """ + return AsyncTakeQuery(self._inner.take_row_ids(row_ids)) + @property def tags(self) -> AsyncTags: """Tag management for the dataset. 
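
A minimal end-to-end sketch of the new async take API introduced above
(illustrative; assumes a writable ./.lancedb directory, and the `take_demo`
table name is hypothetical):

```python
import asyncio

import lancedb


async def main():
    db = await lancedb.connect_async("./.lancedb")
    table = await db.create_table("take_demo", data=[{"id": i} for i in range(10)])
    # Take rows by offset; result order is not guaranteed.
    batch = await table.take_offsets([5, 2, 7]).to_arrow()
    print(sorted(batch["id"].to_pylist()))  # [2, 5, 7]


asyncio.run(main())
```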
diff --git a/python/python/tests/test_query.py b/python/python/tests/test_query.py
index 28f3d69e..756aa009 100644
--- a/python/python/tests/test_query.py
+++ b/python/python/tests/test_query.py
@@ -1327,6 +1327,34 @@ def test_query_timeout(tmp_path):
     )
 
 
+def test_take_queries(tmp_path):
+    db = lancedb.connect(tmp_path)
+    data = pa.table(
+        {
+            "idx": range(100),
+        }
+    )
+    table = db.create_table("test", data)
+
+    # Take by offset
+    assert list(
+        sorted(table.take_offsets([5, 2, 17]).to_pandas()["idx"].to_list())
+    ) == [
+        2,
+        5,
+        17,
+    ]
+
+    # Take by row id
+    assert list(
+        sorted(table.take_row_ids([5, 2, 17]).to_pandas()["idx"].to_list())
+    ) == [
+        2,
+        5,
+        17,
+    ]
+
+
 @pytest.mark.asyncio
 async def test_query_timeout_async(tmp_path):
     db = await lancedb.connect_async(tmp_path)
diff --git a/python/src/query.rs b/python/src/query.rs
index 8fd87e7a..aa285c01 100644
--- a/python/src/query.rs
+++ b/python/src/query.rs
@@ -13,10 +13,12 @@ use lancedb::index::scalar::{
     BooleanQuery, BoostQuery, FtsQuery, FullTextSearchQuery, MatchQuery, MultiMatchQuery, Occur,
     Operator, PhraseQuery,
 };
+use lancedb::query::QueryBase;
 use lancedb::query::QueryExecutionOptions;
 use lancedb::query::QueryFilter;
 use lancedb::query::{
-    ExecutableQuery, Query as LanceDbQuery, QueryBase, Select, VectorQuery as LanceDbVectorQuery,
+    ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
+    VectorQuery as LanceDbVectorQuery,
 };
 use lancedb::table::AnyQuery;
 use pyo3::prelude::{PyAnyMethods, PyDictMethods};
@@ -488,6 +490,76 @@ impl Query {
     }
 }
 
+#[pyclass]
+pub struct TakeQuery {
+    inner: LanceDbTakeQuery,
+}
+
+impl TakeQuery {
+    pub fn new(query: LanceDbTakeQuery) -> Self {
+        Self { inner: query }
+    }
+}
+
+#[pymethods]
+impl TakeQuery {
+    pub fn select(&mut self, columns: Vec<(String, String)>) {
+        self.inner = self.inner.clone().select(Select::dynamic(&columns));
+    }
+
+    pub fn select_columns(&mut self, columns: Vec<String>) {
+        self.inner = self.inner.clone().select(Select::columns(&columns));
+    }
+
+    pub fn with_row_id(&mut self) {
+        self.inner = self.inner.clone().with_row_id();
+    }
+
+    #[pyo3(signature = (max_batch_length=None, timeout=None))]
+    pub fn execute(
+        self_: PyRef<'_, Self>,
+        max_batch_length: Option<u32>,
+        timeout: Option<std::time::Duration>,
+    ) -> PyResult<Bound<'_, PyAny>> {
+        let inner = self_.inner.clone();
+        future_into_py(self_.py(), async move {
+            let mut opts = QueryExecutionOptions::default();
+            if let Some(max_batch_length) = max_batch_length {
+                opts.max_batch_length = max_batch_length;
+            }
+            if let Some(timeout) = timeout {
+                opts.timeout = Some(timeout);
+            }
+            let inner_stream = inner.execute_with_options(opts).await.infer_error()?;
+            Ok(RecordBatchStream::new(inner_stream))
+        })
+    }
+
+    pub fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<Bound<'_, PyAny>> {
+        let inner = self_.inner.clone();
+        future_into_py(self_.py(), async move {
+            inner
+                .explain_plan(verbose)
+                .await
+                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
+        })
+    }
+
+    pub fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
+        let inner = self_.inner.clone();
+        future_into_py(self_.py(), async move {
+            inner
+                .analyze_plan()
+                .await
+                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
+        })
+    }
+
+    pub fn to_query_request(&self) -> PyQueryRequest {
+        PyQueryRequest::from(AnyQuery::Query(self.inner.clone().into_request()))
+    }
+}
+
 #[pyclass]
 #[derive(Clone)]
 pub struct FTSQuery {
diff --git a/python/src/table.rs b/python/src/table.rs
index 7c097759..7641d6bd 100644
--- a/python/src/table.rs
+++ b/python/src/table.rs
@@ -5,7 +5,7 @@ use std::{collections::HashMap, sync::Arc};
 use crate::{
     error::PythonErrorExt,
     index::{extract_index_params, IndexConfig},
-    query::Query,
+    query::{Query, TakeQuery},
 };
 use arrow::{
     datatypes::{DataType, Schema},
@@ -568,6 +568,20 @@ impl Table {
         Ok(Tags::new(self.inner_ref()?.clone()))
     }
 
+    #[pyo3(signature = (offsets))]
+    pub fn take_offsets(self_: PyRef<'_, Self>, offsets: Vec<u64>) -> PyResult<TakeQuery> {
+        Ok(TakeQuery::new(
+            self_.inner_ref()?.clone().take_offsets(offsets),
+        ))
+    }
+
+    #[pyo3(signature = (row_ids))]
+    pub fn take_row_ids(self_: PyRef<'_, Self>, row_ids: Vec<u64>) -> PyResult<TakeQuery> {
+        Ok(TakeQuery::new(
+            self_.inner_ref()?.clone().take_row_ids(row_ids),
+        ))
+    }
+
     /// Optimize the on-disk data by compacting and pruning old data, for better performance.
     #[pyo3(signature = (cleanup_since_ms=None, delete_unverified=None, retrain=None))]
     pub fn optimize(
diff --git a/rust/lancedb/src/query.rs b/rust/lancedb/src/query.rs
index ba4424e3..ce8857c0 100644
--- a/rust/lancedb/src/query.rs
+++ b/rust/lancedb/src/query.rs
@@ -1206,6 +1206,144 @@ impl HasQuery for VectorQuery {
     }
 }
 
+/// A builder for LanceDB take queries.
+///
+/// See [`crate::Table::query`] for more details on queries.
+///
+/// A `TakeQuery` selects a subset of rows from a table using
+/// dataset offsets or row ids.
+///
+/// See [`ExecutableQuery`] for methods that can be used to execute
+/// the query and retrieve results.
+///
+/// This query object can be reused to issue the same query multiple
+/// times.
+#[derive(Debug, Clone)]
+pub struct TakeQuery {
+    parent: Arc<dyn BaseTable>,
+    request: QueryRequest,
+}
+
+impl TakeQuery {
+    /// Create a new `TakeQuery` that will return rows at the given offsets.
+    ///
+    /// See [`crate::Table::take_offsets`] for more details.
+    pub fn from_offsets(parent: Arc<dyn BaseTable>, offsets: Vec<u64>) -> Self {
+        let filter = format!(
+            "_rowoffset in ({})",
+            offsets
+                .iter()
+                .map(|o| o.to_string())
+                .collect::<Vec<_>>()
+                .join(",")
+        );
+        Self {
+            parent,
+            request: QueryRequest {
+                filter: Some(QueryFilter::Sql(filter)),
+                ..Default::default()
+            },
+        }
+    }
+
+    /// Create a new `TakeQuery` that will return rows with the given row ids.
+    ///
+    /// See [`crate::Table::take_row_ids`] for more details.
+    pub fn from_row_ids(parent: Arc<dyn BaseTable>, row_ids: Vec<u64>) -> Self {
+        let filter = format!(
+            "_rowid in ({})",
+            row_ids
+                .iter()
+                .map(|o| o.to_string())
+                .collect::<Vec<_>>()
+                .join(",")
+        );
+        Self {
+            parent,
+            request: QueryRequest {
+                filter: Some(QueryFilter::Sql(filter)),
+                ..Default::default()
+            },
+        }
+    }
+
+    /// Convert the `TakeQuery` into a `QueryRequest`.
+    pub fn into_request(self) -> QueryRequest {
+        self.request
+    }
+
+    /// Return the current `QueryRequest` for the `TakeQuery`.
+    pub fn current_request(&self) -> &QueryRequest {
+        &self.request
+    }
+
+    /// Return only the specified columns.
+    ///
+    /// By default a query will return all columns from the table. However, this can have
+    /// a very significant impact on latency. LanceDB stores data in a columnar fashion. This
+    /// means we can finely tune our I/O to select exactly the columns we need.
+    ///
+    /// As a best practice you should always limit queries to the columns that you need.
+    ///
+    /// You can also use this method to create new "dynamic" columns based on your existing columns.
+    /// For example, you may not care about "a" or "b" but instead simply want "a + b". This is often
+    /// seen in the SELECT clause of an SQL query (e.g. `SELECT a+b FROM my_table`).
+    ///
+    /// To create dynamic columns use [`Select::Dynamic`] (it might be easier to create this with the
+    /// helper method [`Select::dynamic`]). A column will be returned for each tuple provided. The
+    /// first value in that tuple provides the name of the column. The second value in the tuple is
+    /// an SQL string used to specify how the column is calculated.
+    ///
+    /// For example, an SQL query might state `SELECT a + b AS combined, c`. The equivalent
+    /// input to [`Select::dynamic`] would be `&[("combined", "a + b"), ("c", "c")]`.
+    ///
+    /// Columns will always be returned in the order given, even if that order is different than
+    /// the order used when adding the data.
+    pub fn select(mut self, selection: Select) -> Self {
+        self.request.select = selection;
+        self
+    }
+
+    /// Return the `_rowid` meta column from the Table.
+    pub fn with_row_id(mut self) -> Self {
+        self.request.with_row_id = true;
+        self
+    }
+}
+
+impl HasQuery for TakeQuery {
+    fn mut_query(&mut self) -> &mut QueryRequest {
+        &mut self.request
+    }
+}
+
+impl ExecutableQuery for TakeQuery {
+    async fn create_plan(&self, options: QueryExecutionOptions) -> Result<Arc<dyn ExecutionPlan>> {
+        let req = AnyQuery::Query(self.request.clone());
+        self.parent.clone().create_plan(&req, options).await
+    }
+
+    async fn execute_with_options(
+        &self,
+        options: QueryExecutionOptions,
+    ) -> Result<SendableRecordBatchStream> {
+        let query = AnyQuery::Query(self.request.clone());
+        Ok(SendableRecordBatchStream::from(
+            self.parent.clone().query(&query, options).await?,
+        ))
+    }
+
+    async fn explain_plan(&self, verbose: bool) -> Result<String> {
+        let query = AnyQuery::Query(self.request.clone());
+        self.parent.explain_plan(&query, verbose).await
+    }
+
+    async fn analyze_plan_with_options(&self, options: QueryExecutionOptions) -> Result<String> {
+        let query = AnyQuery::Query(self.request.clone());
+        self.parent.analyze_plan(&query, options).await
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::{collections::HashSet, sync::Arc};
@@ -1802,4 +1940,78 @@ mod tests {
         assert_eq!(0, batch.num_rows());
         assert_eq!(2, batch.num_columns());
     }
+
+    #[tokio::test]
+    async fn test_take_offsets() {
+        let tmp_dir = tempdir().unwrap();
+        let table = make_test_table(&tmp_dir).await;
+
+        let results = table
+            .take_offsets(vec![5, 1, 17])
+            .execute()
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        assert_eq!(results.len(), 1);
+        assert_eq!(results[0].num_rows(), 3);
+        assert_eq!(results[0].num_columns(), 2);
+
+        let mut ids = results[0]
+            .column_by_name("id")
+            .unwrap()
+            .as_primitive::<Int32Type>()
+            .values()
+            .to_vec();
+        ids.sort();
+
+        assert_eq!(ids, vec![1, 5, 17]);
+
+        // Select specific columns
+        let results = table
+            .take_offsets(vec![5, 1, 17])
+            .select(Select::Columns(vec!["vector".to_string()]))
+            .execute()
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        assert_eq!(results.len(), 1);
+        assert_eq!(results[0].num_rows(), 3);
+        assert_eq!(results[0].num_columns(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_take_row_ids() {
+        let tmp_dir = tempdir().unwrap();
+        let table = make_test_table(&tmp_dir).await;
+
+        let results = table
+            .take_row_ids(vec![5, 1, 17])
+            .execute()
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        assert_eq!(results.len(), 1);
+        assert_eq!(results[0].num_rows(), 3);
+        assert_eq!(results[0].num_columns(), 2);
+
+        let mut ids = results[0]
+            .column_by_name("id")
+            .unwrap()
+            .as_primitive::<Int32Type>()
+            .values()
+            .to_vec();
+
+        ids.sort();
+
+        assert_eq!(ids, vec![1, 5, 17]);
+    }
 }
diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs
index d21d9bda..cee85932 100644
--- a/rust/lancedb/src/table.rs
+++ b/rust/lancedb/src/table.rs
@@ -62,8 +62,8 @@ use crate::index::{
 };
 use crate::index::{IndexConfig, IndexStatisticsImpl};
 use crate::query::{
-    IntoQueryVector, Query, QueryExecutionOptions, QueryFilter, QueryRequest, Select, VectorQuery,
-    VectorQueryRequest, DEFAULT_TOP_K,
+    IntoQueryVector, Query, QueryExecutionOptions, QueryFilter, QueryRequest, Select, TakeQuery,
+    VectorQuery, VectorQueryRequest, DEFAULT_TOP_K,
 };
 use crate::utils::{
     default_vector_column, supported_bitmap_data_type, supported_btree_data_type,
@@ -1078,6 +1078,59 @@ impl Table {
         Query::new(self.inner.clone())
     }
 
+    /// Extract rows from the dataset using dataset offsets.
+    ///
+    /// Dataset offsets are 0-indexed and relative to the current version of the table.
+    /// They are not stable. A row with an offset of N may have a different offset in a
+    /// different version of the table (e.g. if an earlier row is deleted).
+    ///
+    /// Offsets are useful for sampling as the set of all valid offsets is easily
+    /// known in advance to be [0, len(table)).
+    ///
+    /// No guarantees are made regarding the order in which results are returned. If you
+    /// desire an output order that matches the order of the given offsets, you will need
+    /// to add the row offset column to the output and align it yourself.
+    ///
+    /// Parameters
+    /// ----------
+    /// offsets: Vec<u64>
+    ///     The offsets to take.
+    ///
+    /// Returns
+    /// -------
+    /// TakeQuery
+    ///     A query builder that will fetch the rows at the given offsets.
+    pub fn take_offsets(&self, offsets: Vec<u64>) -> TakeQuery {
+        TakeQuery::from_offsets(self.inner.clone(), offsets)
+    }
+
+    /// Extract rows from the dataset using row ids.
+    ///
+    /// Row ids are not stable and are relative to the current version of the table.
+    /// They can change due to compaction and updates.
+    ///
+    /// Even so, row ids are more stable than offsets and can be useful in some situations.
+    ///
+    /// There is an ongoing effort to make row ids stable, which is tracked at
+    /// <https://github.com/lancedb/lancedb/issues/1120>.
+    ///
+    /// No guarantees are made regarding the order in which results are returned. If you
+    /// desire an output order that matches the order of the given ids, you will need
+    /// to add the row id column to the output and align it yourself.
+    ///
+    /// Parameters
+    /// ----------
+    /// row_ids: Vec<u64>
+    ///     The row ids to take.
+    ///
+    /// Returns
+    /// -------
+    /// TakeQuery
+    ///     A query builder that will fetch the rows with the given row ids.
+    pub fn take_row_ids(&self, row_ids: Vec<u64>) -> TakeQuery {
+        TakeQuery::from_row_ids(self.inner.clone(), row_ids)
+    }
+
     /// Search the table with a given query vector.
     ///
     /// This is a convenience method for preparing a vector query and
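The new test is the clearest usage reference for this API. For reviewers, here is the same flow as a standalone Python sketch. The database path and table name are illustrative; the calls themselves (`take_offsets`, `take_row_ids`, and `to_pandas()` on the returned query) mirror exactly what the test exercises:

```python
import lancedb
import pyarrow as pa

# Illustrative path; assumes a fresh, empty directory.
db = lancedb.connect("/tmp/take_query_demo")
table = db.create_table("demo", pa.table({"idx": range(100)}))

# take_offsets selects rows by their 0-indexed position in the current
# table version. Output order is not guaranteed, so sort before comparing.
by_offset = table.take_offsets([5, 2, 17]).to_pandas()
assert sorted(by_offset["idx"].to_list()) == [2, 5, 17]

# take_row_ids selects rows by their (currently unstable) row ids. For a
# freshly created single-fragment table with no deletes or compaction,
# row ids happen to coincide with offsets, which is why the same rows
# come back here.
by_row_id = table.take_row_ids([5, 2, 17]).to_pandas()
assert sorted(by_row_id["idx"].to_list()) == [2, 5, 17]
```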
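The new doc comments are explicit that no output order is guaranteed and suggest requesting the row id column and aligning results yourself. Continuing the sketch above, here is one hedged way to do that alignment in pandas. It assumes the sync Python wrapper surfaces `with_row_id()` on the take query, mirroring the pyo3 method added in this patch, and that the meta column is returned as `_rowid`, as the Rust docs state:

```python
requested = [5, 2, 17]

# Assumption: with_row_id() is exposed by the Python wrapper, as it is on
# the pyo3 TakeQuery. The _rowid meta column lets us restore request order.
df = table.take_row_ids(requested).with_row_id().to_pandas()
aligned = df.set_index("_rowid").loc[requested].reset_index()

# On this fresh table, row ids equal the idx values we inserted.
assert aligned["idx"].to_list() == [5, 2, 17]
```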