From 3ae90dde80a3af0bc14a61d9d2529ef67f83eac0 Mon Sep 17 00:00:00 2001 From: Ryan Green Date: Mon, 21 Apr 2025 08:41:21 -0230 Subject: [PATCH] feat: add new table API to wait for async indexing (#2338) * Add new wait_for_index() table operation that polls until indices are created/fully indexed * Add an optional wait timeout parameter to all create_index operations * Python and NodeJS interfaces ## Summary by CodeRabbit ## Summary by CodeRabbit - **New Features** - Added optional waiting for index creation completion with configurable timeout. - Introduced methods to poll and wait for indices to be fully built across sync and async tables. - Extended index creation APIs to accept a wait timeout parameter. - **Bug Fixes** - Added a new timeout error variant for improved error reporting on index operations. - **Tests** - Added tests covering successful index readiness waiting, timeout scenarios, and missing index cases. --- Cargo.lock | 8 +- docs/src/js/classes/Table.md | 23 +++++ docs/src/js/interfaces/IndexOptions.md | 8 ++ nodejs/__test__/table.test.ts | 10 +++ nodejs/lancedb/indices.ts | 2 + nodejs/lancedb/table.ts | 27 +++++- nodejs/src/table.rs | 17 ++++ python/python/lancedb/remote/table.py | 26 +++++- python/python/lancedb/table.py | 56 +++++++++++- python/python/tests/test_remote_db.py | 120 ++++++++++++++++++++++++- python/src/table.rs | 28 +++++- rust/lancedb/src/error.rs | 2 + rust/lancedb/src/index.rs | 16 +++- rust/lancedb/src/index/waiter.rs | 90 +++++++++++++++++++ rust/lancedb/src/remote/table.rs | 114 +++++++++++++++++++++-- rust/lancedb/src/table.rs | 68 ++++++++++++-- 16 files changed, 582 insertions(+), 33 deletions(-) create mode 100644 rust/lancedb/src/index/waiter.rs diff --git a/Cargo.lock b/Cargo.lock index d9d321b1..e0f52f5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4115,7 +4115,7 @@ dependencies = [ [[package]] name = "lancedb" -version = "0.19.0-beta.7" +version = "0.19.0-beta.8" dependencies = [ "arrow", "arrow-array", @@ -4202,7 +4202,7 @@ dependencies = [ [[package]] name = "lancedb-node" -version = "0.19.0-beta.7" +version = "0.19.0-beta.8" dependencies = [ "arrow-array", "arrow-ipc", @@ -4227,7 +4227,7 @@ dependencies = [ [[package]] name = "lancedb-nodejs" -version = "0.19.0-beta.7" +version = "0.19.0-beta.8" dependencies = [ "arrow-array", "arrow-ipc", @@ -4245,7 +4245,7 @@ dependencies = [ [[package]] name = "lancedb-python" -version = "0.22.0-beta.7" +version = "0.22.0-beta.8" dependencies = [ "arrow", "env_logger", diff --git a/docs/src/js/classes/Table.md b/docs/src/js/classes/Table.md index e771f27e..1dc54b8e 100644 --- a/docs/src/js/classes/Table.md +++ b/docs/src/js/classes/Table.md @@ -753,3 +753,26 @@ Retrieve the version of the table #### Returns `Promise`<`number`> + +*** + +### waitForIndex() + +```ts +abstract waitForIndex(indexNames, timeoutSeconds): Promise +``` + +Waits for asynchronous indexing to complete on the table. + +#### Parameters + +* **indexNames**: `string`[] + The name of the indices to wait for + +* **timeoutSeconds**: `number` + The number of seconds to wait before timing out + This will raise an error if the indices are not created and fully indexed within the timeout. + +#### Returns + +`Promise`<`void`> diff --git a/docs/src/js/interfaces/IndexOptions.md b/docs/src/js/interfaces/IndexOptions.md index 623d8b17..f7834974 100644 --- a/docs/src/js/interfaces/IndexOptions.md +++ b/docs/src/js/interfaces/IndexOptions.md @@ -39,3 +39,11 @@ and the same name, then an error will be returned. This is true even if that index is out of date. The default is true + +*** + +### waitTimeoutSeconds? + +```ts +optional waitTimeoutSeconds: number; +``` diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index ac56e98b..0980df76 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -507,6 +507,15 @@ describe("When creating an index", () => { expect(indices2.length).toBe(0); }); + it("should wait for index readiness", async () => { + // Create an index and then wait for it to be ready + await tbl.createIndex("vec"); + const indices = await tbl.listIndices(); + expect(indices.length).toBeGreaterThan(0); + const idxName = indices[0].name; + await expect(tbl.waitForIndex([idxName], 5)).resolves.toBeUndefined(); + }); + it("should search with distance range", async () => { await tbl.createIndex("vec"); @@ -824,6 +833,7 @@ describe("When creating an index", () => { // Only build index over v1 await tbl.createIndex("vec", { config: Index.ivfPq({ numPartitions: 2, numSubVectors: 2 }), + waitTimeoutSeconds: 30, }); const rst = await tbl diff --git a/nodejs/lancedb/indices.ts b/nodejs/lancedb/indices.ts index ff37723d..2aa14ff5 100644 --- a/nodejs/lancedb/indices.ts +++ b/nodejs/lancedb/indices.ts @@ -681,4 +681,6 @@ export interface IndexOptions { * The default is true */ replace?: boolean; + + waitTimeoutSeconds?: number; } diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 5f8d4412..54481656 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -246,6 +246,19 @@ export abstract class Table { */ abstract prewarmIndex(name: string): Promise; + /** + * Waits for asynchronous indexing to complete on the table. + * + * @param indexNames The name of the indices to wait for + * @param timeoutSeconds The number of seconds to wait before timing out + * + * This will raise an error if the indices are not created and fully indexed within the timeout. + */ + abstract waitForIndex( + indexNames: string[], + timeoutSeconds: number, + ): Promise; + /** * Create a {@link Query} Builder. * @@ -569,7 +582,12 @@ export class LocalTable extends Table { // Bit of a hack to get around the fact that TS has no package-scope. // biome-ignore lint/suspicious/noExplicitAny: skip const nativeIndex = (options?.config as any)?.inner; - await this.inner.createIndex(nativeIndex, column, options?.replace); + await this.inner.createIndex( + nativeIndex, + column, + options?.replace, + options?.waitTimeoutSeconds, + ); } async dropIndex(name: string): Promise { @@ -580,6 +598,13 @@ export class LocalTable extends Table { await this.inner.prewarmIndex(name); } + async waitForIndex( + indexNames: string[], + timeoutSeconds: number, + ): Promise { + await this.inner.waitForIndex(indexNames, timeoutSeconds); + } + query(): Query { return new Query(this.inner); } diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 6b38c621..66f480c8 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -111,6 +111,7 @@ impl Table { index: Option<&Index>, column: String, replace: Option, + wait_timeout_s: Option, ) -> napi::Result<()> { let lancedb_index = if let Some(index) = index { index.consume()? @@ -121,6 +122,10 @@ impl Table { if let Some(replace) = replace { builder = builder.replace(replace); } + if let Some(timeout) = wait_timeout_s { + builder = + builder.wait_timeout(std::time::Duration::from_secs(timeout.try_into().unwrap())); + } builder.execute().await.default_error() } @@ -140,6 +145,18 @@ impl Table { .default_error() } + #[napi(catch_unwind)] + pub async fn wait_for_index(&self, index_names: Vec, timeout_s: i64) -> Result<()> { + let timeout = std::time::Duration::from_secs(timeout_s.try_into().unwrap()); + let index_names: Vec<&str> = index_names.iter().map(|s| s.as_str()).collect(); + let slice: &[&str] = &index_names; + + self.inner_ref()? + .wait_for_index(slice, timeout) + .await + .default_error() + } + #[napi(catch_unwind)] pub async fn update( &self, diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 59bcb5bb..81a1d754 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -104,6 +104,7 @@ class RemoteTable(Table): index_type: Literal["BTREE", "BITMAP", "LABEL_LIST", "scalar"] = "scalar", *, replace: bool = False, + wait_timeout: timedelta = None, ): """Creates a scalar index Parameters @@ -126,13 +127,18 @@ class RemoteTable(Table): else: raise ValueError(f"Unknown index type: {index_type}") - LOOP.run(self._table.create_index(column, config=config, replace=replace)) + LOOP.run( + self._table.create_index( + column, config=config, replace=replace, wait_timeout=wait_timeout + ) + ) def create_fts_index( self, column: str, *, replace: bool = False, + wait_timeout: timedelta = None, with_position: bool = True, # tokenizer configs: base_tokenizer: str = "simple", @@ -153,7 +159,11 @@ class RemoteTable(Table): remove_stop_words=remove_stop_words, ascii_folding=ascii_folding, ) - LOOP.run(self._table.create_index(column, config=config, replace=replace)) + LOOP.run( + self._table.create_index( + column, config=config, replace=replace, wait_timeout=wait_timeout + ) + ) def create_index( self, @@ -165,6 +175,7 @@ class RemoteTable(Table): replace: Optional[bool] = None, accelerator: Optional[str] = None, index_type="vector", + wait_timeout: Optional[timedelta] = None, ): """Create an index on the table. Currently, the only parameters that matter are @@ -236,7 +247,11 @@ class RemoteTable(Table): " 'IVF_FLAT', 'IVF_PQ', 'IVF_HNSW_PQ', 'IVF_HNSW_SQ'" ) - LOOP.run(self._table.create_index(vector_column_name, config=config)) + LOOP.run( + self._table.create_index( + vector_column_name, config=config, wait_timeout=wait_timeout + ) + ) def add( self, @@ -554,6 +569,11 @@ class RemoteTable(Table): def drop_index(self, index_name: str): return LOOP.run(self._table.drop_index(index_name)) + def wait_for_index( + self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300) + ): + return LOOP.run(self._table.wait_for_index(index_names, timeout)) + def uses_v2_manifest_paths(self) -> bool: raise NotImplementedError( "uses_v2_manifest_paths() is not supported on the LanceDB Cloud" diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index f3edf8e5..b8b09fd3 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -631,6 +631,7 @@ class Table(ABC): index_cache_size: Optional[int] = None, *, index_type: VectorIndexType = "IVF_PQ", + wait_timeout: Optional[timedelta] = None, num_bits: int = 8, max_iterations: int = 50, sample_rate: int = 256, @@ -666,6 +667,8 @@ class Table(ABC): num_bits: int The number of bits to encode sub-vectors. Only used with the IVF_PQ index. Only 4 and 8 are supported. + wait_timeout: timedelta, optional + The timeout to wait if indexing is asynchronous. """ raise NotImplementedError @@ -689,6 +692,23 @@ class Table(ABC): """ raise NotImplementedError + def wait_for_index( + self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300) + ) -> None: + """ + Wait for indexing to complete for the given index names. + This will poll the table until all the indices are fully indexed, + or raise a timeout exception if the timeout is reached. + + Parameters + ---------- + index_names: str + The name of the indices to poll + timeout: timedelta + Timeout to wait for asynchronous indexing. The default is 5 minutes. + """ + raise NotImplementedError + @abstractmethod def create_scalar_index( self, @@ -696,6 +716,7 @@ class Table(ABC): *, replace: bool = True, index_type: ScalarIndexType = "BTREE", + wait_timeout: Optional[timedelta] = None, ): """Create a scalar index on a column. @@ -708,7 +729,8 @@ class Table(ABC): Replace the existing index if it exists. index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"], default "BTREE" The type of index to create. - + wait_timeout: timedelta, optional + The timeout to wait if indexing is asynchronous. Examples -------- @@ -767,6 +789,7 @@ class Table(ABC): stem: bool = False, remove_stop_words: bool = False, ascii_folding: bool = False, + wait_timeout: Optional[timedelta] = None, ): """Create a full-text search index on the table. @@ -822,6 +845,8 @@ class Table(ABC): ascii_folding : bool, default False Whether to fold ASCII characters. This converts accented characters to their ASCII equivalent. For example, "café" would be converted to "cafe". + wait_timeout: timedelta, optional + The timeout to wait if indexing is asynchronous. """ raise NotImplementedError @@ -1771,6 +1796,11 @@ class LanceTable(Table): """ return LOOP.run(self._table.prewarm_index(name)) + def wait_for_index( + self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300) + ) -> None: + return LOOP.run(self._table.wait_for_index(index_names, timeout)) + def create_scalar_index( self, column: str, @@ -2964,6 +2994,7 @@ class AsyncTable: config: Optional[ Union[IvfFlat, IvfPq, HnswPq, HnswSq, BTree, Bitmap, LabelList, FTS] ] = None, + wait_timeout: Optional[timedelta] = None, ): """Create an index to speed up queries @@ -2988,6 +3019,8 @@ class AsyncTable: For advanced configuration you can specify the type of index you would like to create. You can also specify index-specific parameters when creating an index object. + wait_timeout: timedelta, optional + The timeout to wait if indexing is asynchronous. """ if config is not None: if not isinstance( @@ -2998,7 +3031,9 @@ class AsyncTable: " Bitmap, LabelList, or FTS" ) try: - await self._inner.create_index(column, index=config, replace=replace) + await self._inner.create_index( + column, index=config, replace=replace, wait_timeout=wait_timeout + ) except ValueError as e: if "not support the requested language" in str(e): supported_langs = ", ".join(lang_mapping.values()) @@ -3043,6 +3078,23 @@ class AsyncTable: """ await self._inner.prewarm_index(name) + async def wait_for_index( + self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300) + ) -> None: + """ + Wait for indexing to complete for the given index names. + This will poll the table until all the indices are fully indexed, + or raise a timeout exception if the timeout is reached. + + Parameters + ---------- + index_names: str + The name of the indices to poll + timeout: timedelta + Timeout to wait for asynchronous indexing. The default is 5 minutes. + """ + await self._inner.wait_for_index(index_names, timeout) + async def add( self, data: DATA, diff --git a/python/python/tests/test_remote_db.py b/python/python/tests/test_remote_db.py index c4975f97..7c6bee28 100644 --- a/python/python/tests/test_remote_db.py +++ b/python/python/tests/test_remote_db.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright The LanceDB Authors - +import re from concurrent.futures import ThreadPoolExecutor import contextlib from datetime import timedelta @@ -235,6 +235,10 @@ def test_table_add_in_threadpool(): def test_table_create_indices(): def handler(request): + index_stats = dict( + index_type="IVF_PQ", num_indexed_rows=1000, num_unindexed_rows=0 + ) + if request.path == "/v1/table/test/create_index/": request.send_response(200) request.end_headers() @@ -258,6 +262,47 @@ def test_table_create_indices(): ) ) request.wfile.write(payload.encode()) + elif request.path == "/v1/table/test/index/list/": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + payload = json.dumps( + dict( + indexes=[ + { + "index_name": "id_idx", + "columns": ["id"], + }, + { + "index_name": "text_idx", + "columns": ["text"], + }, + { + "index_name": "vector_idx", + "columns": ["vector"], + }, + ] + ) + ) + request.wfile.write(payload.encode()) + elif request.path == "/v1/table/test/index/id_idx/stats/": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + payload = json.dumps(index_stats) + request.wfile.write(payload.encode()) + elif request.path == "/v1/table/test/index/text_idx/stats/": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + payload = json.dumps(index_stats) + request.wfile.write(payload.encode()) + elif request.path == "/v1/table/test/index/vector_idx/stats/": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + payload = json.dumps(index_stats) + request.wfile.write(payload.encode()) elif "/drop/" in request.path: request.send_response(200) request.end_headers() @@ -269,14 +314,81 @@ def test_table_create_indices(): # Parameters are well-tested through local and async tests. # This is a smoke-test. table = db.create_table("test", [{"id": 1}]) - table.create_scalar_index("id") - table.create_fts_index("text") - table.create_scalar_index("vector") + table.create_scalar_index("id", wait_timeout=timedelta(seconds=2)) + table.create_fts_index("text", wait_timeout=timedelta(seconds=2)) + table.create_index( + vector_column_name="vector", wait_timeout=timedelta(seconds=10) + ) + table.wait_for_index(["id_idx"], timedelta(seconds=2)) + table.wait_for_index(["text_idx", "vector_idx"], timedelta(seconds=2)) table.drop_index("vector_idx") table.drop_index("id_idx") table.drop_index("text_idx") +def test_table_wait_for_index_timeout(): + def handler(request): + index_stats = dict( + index_type="BTREE", num_indexed_rows=1000, num_unindexed_rows=1 + ) + + if request.path == "/v1/table/test/create/?mode=create": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + request.wfile.write(b"{}") + elif request.path == "/v1/table/test/describe/": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + payload = json.dumps( + dict( + version=1, + schema=dict( + fields=[ + dict(name="id", type={"type": "int64"}, nullable=False), + ] + ), + ) + ) + request.wfile.write(payload.encode()) + elif request.path == "/v1/table/test/index/list/": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + payload = json.dumps( + dict( + indexes=[ + { + "index_name": "id_idx", + "columns": ["id"], + }, + ] + ) + ) + request.wfile.write(payload.encode()) + elif request.path == "/v1/table/test/index/id_idx/stats/": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + payload = json.dumps(index_stats) + print(f"{index_stats=}") + request.wfile.write(payload.encode()) + else: + request.send_response(404) + request.end_headers() + + with mock_lancedb_connection(handler) as db: + table = db.create_table("test", [{"id": 1}]) + with pytest.raises( + RuntimeError, + match=re.escape( + 'Timeout error: timed out waiting for indices: ["id_idx"] after 1s' + ), + ): + table.wait_for_index(["id_idx"], timedelta(seconds=1)) + + @contextlib.contextmanager def query_test_table(query_handler, *, server_version=Version("0.1.0")): def handler(request): diff --git a/python/src/table.rs b/python/src/table.rs index 0b0d4671..7575f0d4 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -177,15 +177,19 @@ impl Table { }) } - #[pyo3(signature = (column, index=None, replace=None))] + #[pyo3(signature = (column, index=None, replace=None, wait_timeout=None))] pub fn create_index<'a>( self_: PyRef<'a, Self>, column: String, index: Option>, replace: Option, + wait_timeout: Option>, ) -> PyResult> { let index = extract_index_params(&index)?; - let mut op = self_.inner_ref()?.create_index(&[column], index); + let timeout = wait_timeout.map(|t| t.extract::().unwrap()); + let mut op = self_ + .inner_ref()? + .create_index_with_timeout(&[column], index, timeout); if let Some(replace) = replace { op = op.replace(replace); } @@ -204,6 +208,26 @@ impl Table { }) } + pub fn wait_for_index<'a>( + self_: PyRef<'a, Self>, + index_names: Vec, + timeout: Bound<'_, PyAny>, + ) -> PyResult> { + let inner = self_.inner_ref()?.clone(); + let timeout = timeout.extract::()?; + future_into_py(self_.py(), async move { + let index_refs = index_names + .iter() + .map(String::as_str) + .collect::>(); + inner + .wait_for_index(&index_refs, timeout) + .await + .infer_error()?; + Ok(()) + }) + } + pub fn prewarm_index(self_: PyRef<'_, Self>, index_name: String) -> PyResult> { let inner = self_.inner_ref()?.clone(); future_into_py(self_.py(), async move { diff --git a/rust/lancedb/src/error.rs b/rust/lancedb/src/error.rs index 34d2fe76..d5c0b52d 100644 --- a/rust/lancedb/src/error.rs +++ b/rust/lancedb/src/error.rs @@ -35,6 +35,8 @@ pub enum Error { Schema { message: String }, #[snafu(display("Runtime error: {message}"))] Runtime { message: String }, + #[snafu(display("Timeout error: {message}"))] + Timeout { message: String }, // 3rd party / external errors #[snafu(display("object_store error: {source}"))] diff --git a/rust/lancedb/src/index.rs b/rust/lancedb/src/index.rs index 60bb31c9..f383a118 100644 --- a/rust/lancedb/src/index.rs +++ b/rust/lancedb/src/index.rs @@ -1,11 +1,11 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The LanceDB Authors -use std::sync::Arc; - use scalar::FtsIndexBuilder; use serde::Deserialize; use serde_with::skip_serializing_none; +use std::sync::Arc; +use std::time::Duration; use vector::IvfFlatIndexBuilder; use crate::{table::BaseTable, DistanceType, Error, Result}; @@ -17,6 +17,7 @@ use self::{ pub mod scalar; pub mod vector; +pub mod waiter; /// Supported index types. #[derive(Debug, Clone)] @@ -69,6 +70,7 @@ pub struct IndexBuilder { pub(crate) index: Index, pub(crate) columns: Vec, pub(crate) replace: bool, + pub(crate) wait_timeout: Option, } impl IndexBuilder { @@ -78,6 +80,7 @@ impl IndexBuilder { index, columns, replace: true, + wait_timeout: None, } } @@ -91,6 +94,15 @@ impl IndexBuilder { self } + /// Duration of time to wait for asynchronous indexing to complete. If not set, + /// `create_index()` will not wait. + /// + /// This is not supported for `NativeTable` since indexing is synchronous. + pub fn wait_timeout(mut self, d: Duration) -> Self { + self.wait_timeout = Some(d); + self + } + pub async fn execute(self) -> Result<()> { self.parent.clone().create_index(self).await } diff --git a/rust/lancedb/src/index/waiter.rs b/rust/lancedb/src/index/waiter.rs new file mode 100644 index 00000000..8fde37b7 --- /dev/null +++ b/rust/lancedb/src/index/waiter.rs @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +use crate::error::Result; +use crate::table::BaseTable; +use crate::Error; +use log::debug; +use std::time::{Duration, Instant}; +use tokio::time::sleep; + +const DEFAULT_SLEEP_MS: u64 = 1000; +const MAX_WAIT: Duration = Duration::from_secs(2 * 60 * 60); + +/// Poll the table using list_indices() and index_stats() until all of the indices have 0 un-indexed rows. +/// Will return Error::Timeout if the columns are not fully indexed within the timeout. +pub async fn wait_for_index( + table: &dyn BaseTable, + index_names: &[&str], + timeout: Duration, +) -> Result<()> { + if timeout > MAX_WAIT { + return Err(Error::InvalidInput { + message: format!("timeout must be less than {:?}", MAX_WAIT).to_string(), + }); + } + let start = Instant::now(); + let mut remaining = index_names.to_vec(); + + // poll via list_indices() and index_stats() until all indices are created and fully indexed + while start.elapsed() < timeout { + let mut completed = vec![]; + let indices = table.list_indices().await?; + + for &idx in &remaining { + if !indices.iter().any(|i| i.name == *idx) { + debug!("still waiting for new index '{}'", idx); + continue; + } + + let stats = table.index_stats(idx.as_ref()).await?; + match stats { + None => { + debug!("still waiting for new index '{}'", idx); + continue; + } + Some(s) => { + if s.num_unindexed_rows == 0 { + // note: this may never stabilize under constant writes. + // we should later replace this with a status/job model + completed.push(idx); + debug!( + "fully indexed '{}'. indexed rows: {}", + idx, s.num_indexed_rows + ); + } else { + debug!( + "still waiting for index '{}'. unindexed rows: {}", + idx, s.num_unindexed_rows + ); + } + } + } + } + remaining.retain(|idx| !completed.contains(idx)); + if remaining.is_empty() { + return Ok(()); + } + sleep(Duration::from_millis(DEFAULT_SLEEP_MS)).await; + } + + // debug log index diagnostics + for &r in &remaining { + let stats = table.index_stats(r.as_ref()).await?; + match stats { + Some(s) => debug!( + "index '{}' not fully indexed after {:?}. stats: {:?}", + r, timeout, s + ), + None => debug!("index '{}' not found after {:?}", r, timeout), + } + } + + Err(Error::Timeout { + message: format!( + "timed out waiting for indices: {:?} after {:?}", + remaining, timeout + ) + .to_string(), + }) +} diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 73f909e9..a65fc1c1 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -1,10 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The LanceDB Authors -use std::io::Cursor; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; - use crate::index::Index; use crate::index::IndexStatistics; use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest}; @@ -26,8 +22,17 @@ use lance::dataset::scanner::DatasetRecordBatchStream; use lance::dataset::{ColumnAlteration, NewColumnTransform, Version}; use lance_datafusion::exec::{execute_plan, OneShotExec}; use serde::{Deserialize, Serialize}; +use std::io::Cursor; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use tokio::sync::RwLock; +use super::client::RequestResultExt; +use super::client::{HttpSend, RestfulLanceDbClient, Sender}; +use super::db::ServerVersion; +use super::ARROW_STREAM_CONTENT_TYPE; +use crate::index::waiter::wait_for_index; use crate::{ connection::NoData, error::Result, @@ -39,11 +44,6 @@ use crate::{ }, }; -use super::client::RequestResultExt; -use super::client::{HttpSend, RestfulLanceDbClient, Sender}; -use super::db::ServerVersion; -use super::ARROW_STREAM_CONTENT_TYPE; - const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); #[derive(Debug)] @@ -800,9 +800,20 @@ impl BaseTable for RemoteTable { self.check_table_response(&request_id, response).await?; + if let Some(wait_timeout) = index.wait_timeout { + let name = format!("{}_idx", column); + self.wait_for_index(&[&name], wait_timeout).await?; + } + Ok(()) } + /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns + /// are not fully indexed within the timeout. + async fn wait_for_index(&self, index_names: &[&str], timeout: Duration) -> Result<()> { + wait_for_index(self, index_names, timeout).await + } + async fn merge_insert( &self, params: MergeInsertBuilder, @@ -984,6 +995,7 @@ impl BaseTable for RemoteTable { let body = response.text().await.err_to_http(request_id.clone())?; + println!("body: {:?}", body); let stats = serde_json::from_str(&body).map_err(|e| Error::Http { source: format!("Failed to parse index statistics: {}", e).into(), request_id, @@ -2416,4 +2428,88 @@ mod tests { }); table.drop_index("my_index").await.unwrap(); } + + #[tokio::test] + async fn test_wait_for_index() { + let table = _make_table_with_indices(0); + table + .wait_for_index(&["vector_idx", "my_idx"], Duration::from_secs(1)) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_wait_for_index_timeout() { + let table = _make_table_with_indices(100); + let e = table + .wait_for_index(&["vector_idx", "my_idx"], Duration::from_secs(1)) + .await + .unwrap_err(); + assert_eq!( + e.to_string(), + "Timeout error: timed out waiting for indices: [\"vector_idx\", \"my_idx\"] after 1s" + ); + } + + #[tokio::test] + async fn test_wait_for_index_timeout_never_created() { + let table = _make_table_with_indices(0); + let e = table + .wait_for_index(&["doesnt_exist_idx"], Duration::from_secs(1)) + .await + .unwrap_err(); + assert_eq!( + e.to_string(), + "Timeout error: timed out waiting for indices: [\"doesnt_exist_idx\"] after 1s" + ); + } + + fn _make_table_with_indices(unindexed_rows: usize) -> Table { + let table = Table::new_with_handler("my_table", move |request| { + assert_eq!(request.method(), "POST"); + + let response_body = match request.url().path() { + "/v1/table/my_table/index/list/" => { + serde_json::json!({ + "indexes": [ + { + "index_name": "vector_idx", + "index_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "columns": ["vector"], + "index_status": "done", + }, + { + "index_name": "my_idx", + "index_uuid": "34255f64-5717-4562-b3fc-2c963f66afa6", + "columns": ["my_column"], + "index_status": "done", + }, + ] + }) + } + "/v1/table/my_table/index/vector_idx/stats/" => { + serde_json::json!({ + "num_indexed_rows": 100000, + "num_unindexed_rows": unindexed_rows, + "index_type": "IVF_PQ", + "distance_type": "l2" + }) + } + "/v1/table/my_table/index/my_idx/stats/" => { + serde_json::json!({ + "num_indexed_rows": 100000, + "num_unindexed_rows": unindexed_rows, + "index_type": "LABEL_LIST" + }) + } + _path => { + serde_json::json!(None::) + } + }; + let body = serde_json::to_string(&response_body).unwrap(); + let status = if body == "null" { 404 } else { 200 }; + http::Response::builder().status(status).body(body).unwrap() + }); + table + } } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 36151dc7..29f0edff 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -3,10 +3,6 @@ //! LanceDB Table APIs -use std::collections::HashMap; -use std::path::Path; -use std::sync::Arc; - use arrow::array::{AsArray, FixedSizeListBuilder, Float32Builder}; use arrow::datatypes::{Float32Type, UInt8Type}; use arrow_array::{RecordBatchIterator, RecordBatchReader}; @@ -45,6 +41,10 @@ use lance_table::format::Manifest; use lance_table::io::commit::ManifestNamingScheme; use log::info; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::format; +use std::path::Path; +use std::sync::Arc; use crate::arrow::IntoArrow; use crate::connection::NoData; @@ -78,6 +78,7 @@ pub mod datafusion; pub(crate) mod dataset; pub mod merge; +use crate::index::waiter::wait_for_index; pub use chrono::Duration; pub use lance::dataset::optimize::CompactionOptions; pub use lance::dataset::scanner::DatasetRecordBatchStream; @@ -491,6 +492,13 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { async fn table_definition(&self) -> Result; /// Get the table URI fn dataset_uri(&self) -> &str; + /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns + /// are not fully indexed within the timeout. + async fn wait_for_index( + &self, + index_names: &[&str], + timeout: std::time::Duration, + ) -> Result<()>; } /// A Table is a collection of strong typed Rows. @@ -769,6 +777,28 @@ impl Table { ) } + /// See [Table::create_index] + /// For remote tables, this allows an optional wait_timeout to poll until asynchronous indexing is complete + pub fn create_index_with_timeout( + &self, + columns: &[impl AsRef], + index: Index, + wait_timeout: Option, + ) -> IndexBuilder { + let mut builder = IndexBuilder::new( + self.inner.clone(), + columns + .iter() + .map(|val| val.as_ref().to_string()) + .collect::>(), + index, + ); + if let Some(timeout) = wait_timeout { + builder = builder.wait_timeout(timeout); + } + builder + } + /// Create a builder for a merge insert operation /// /// This operation can add rows, update rows, and remove rows all in a single @@ -1104,6 +1134,16 @@ impl Table { self.inner.prewarm_index(name).await } + /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns + /// are not fully indexed within the timeout. + pub async fn wait_for_index( + &self, + index_names: &[&str], + timeout: std::time::Duration, + ) -> Result<()> { + self.inner.wait_for_index(index_names, timeout).await + } + // Take many execution plans and map them into a single plan that adds // a query_index column and unions them. pub(crate) fn multi_vector_plan( @@ -2430,6 +2470,16 @@ impl BaseTable for NativeTable { loss, })) } + + /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns + /// are not fully indexed within the timeout. + async fn wait_for_index( + &self, + index_names: &[&str], + timeout: std::time::Duration, + ) -> Result<()> { + wait_for_index(self, index_names, timeout).await + } } #[cfg(test)] @@ -3213,7 +3263,10 @@ mod tests { .execute() .await .unwrap(); - + table + .wait_for_index(&["embeddings_idx"], Duration::from_millis(10)) + .await + .unwrap(); let index_configs = table.list_indices().await.unwrap(); assert_eq!(index_configs.len(), 1); let index = index_configs.into_iter().next().unwrap(); @@ -3281,7 +3334,10 @@ mod tests { .execute() .await .unwrap(); - + table + .wait_for_index(&["i_idx"], Duration::from_millis(10)) + .await + .unwrap(); let index_configs = table.list_indices().await.unwrap(); assert_eq!(index_configs.len(), 1); let index = index_configs.into_iter().next().unwrap();