feat: add new table API to wait for async indexing (#2338)

* Add new wait_for_index() table operation that polls until indices are
created/fully indexed
* Add an optional wait timeout parameter to all create_index operations
* Python and NodeJS interfaces

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added optional waiting for index creation completion with configurable
timeout.
- Introduced methods to poll and wait for indices to be fully built
across sync and async tables.
  - Extended index creation APIs to accept a wait timeout parameter.
- **Bug Fixes**
- Added a new timeout error variant for improved error reporting on
index operations.
- **Tests**
- Added tests covering successful index readiness waiting, timeout
scenarios, and missing index cases.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Ryan Green
2025-04-21 08:41:21 -02:30
committed by GitHub
parent 4f07fea6df
commit 3ae90dde80
16 changed files with 582 additions and 33 deletions

8
Cargo.lock generated
View File

@@ -4115,7 +4115,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb" name = "lancedb"
version = "0.19.0-beta.7" version = "0.19.0-beta.8"
dependencies = [ dependencies = [
"arrow", "arrow",
"arrow-array", "arrow-array",
@@ -4202,7 +4202,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-node" name = "lancedb-node"
version = "0.19.0-beta.7" version = "0.19.0-beta.8"
dependencies = [ dependencies = [
"arrow-array", "arrow-array",
"arrow-ipc", "arrow-ipc",
@@ -4227,7 +4227,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-nodejs" name = "lancedb-nodejs"
version = "0.19.0-beta.7" version = "0.19.0-beta.8"
dependencies = [ dependencies = [
"arrow-array", "arrow-array",
"arrow-ipc", "arrow-ipc",
@@ -4245,7 +4245,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-python" name = "lancedb-python"
version = "0.22.0-beta.7" version = "0.22.0-beta.8"
dependencies = [ dependencies = [
"arrow", "arrow",
"env_logger", "env_logger",

View File

@@ -753,3 +753,26 @@ Retrieve the version of the table
#### Returns #### Returns
`Promise`&lt;`number`&gt; `Promise`&lt;`number`&gt;
***
### waitForIndex()
```ts
abstract waitForIndex(indexNames, timeoutSeconds): Promise<void>
```
Waits for asynchronous indexing to complete on the table.
#### Parameters
* **indexNames**: `string`[]
The names of the indices to wait for
* **timeoutSeconds**: `number`
The number of seconds to wait before timing out
This will raise an error if the indices are not created and fully indexed within the timeout.
#### Returns
`Promise`&lt;`void`&gt;

View File

@@ -39,3 +39,11 @@ and the same name, then an error will be returned. This is true even if
that index is out of date. that index is out of date.
The default is true The default is true
***
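### waitTimeoutSeconds?
```ts
optional waitTimeoutSeconds: number;
```
The number of seconds to wait for asynchronous indexing to complete. If not set, index creation does not wait.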

View File

@@ -507,6 +507,15 @@ describe("When creating an index", () => {
expect(indices2.length).toBe(0); expect(indices2.length).toBe(0);
}); });
it("should wait for index readiness", async () => {
  // Create an index, look up the name it was given via listIndices(), and
  // then wait (up to 5 seconds) for that index to be ready. waitForIndex()
  // resolves with no value on success.
  await tbl.createIndex("vec");
  const indices = await tbl.listIndices();
  expect(indices.length).toBeGreaterThan(0);
  const idxName = indices[0].name;
  await expect(tbl.waitForIndex([idxName], 5)).resolves.toBeUndefined();
});
it("should search with distance range", async () => { it("should search with distance range", async () => {
await tbl.createIndex("vec"); await tbl.createIndex("vec");
@@ -824,6 +833,7 @@ describe("When creating an index", () => {
// Only build index over v1 // Only build index over v1
await tbl.createIndex("vec", { await tbl.createIndex("vec", {
config: Index.ivfPq({ numPartitions: 2, numSubVectors: 2 }), config: Index.ivfPq({ numPartitions: 2, numSubVectors: 2 }),
waitTimeoutSeconds: 30,
}); });
const rst = await tbl const rst = await tbl

View File

@@ -681,4 +681,6 @@ export interface IndexOptions {
* The default is true * The default is true
*/ */
replace?: boolean; replace?: boolean;
waitTimeoutSeconds?: number;
} }

View File

@@ -246,6 +246,19 @@ export abstract class Table {
*/ */
abstract prewarmIndex(name: string): Promise<void>; abstract prewarmIndex(name: string): Promise<void>;
/**
 * Waits for asynchronous indexing to complete on the table.
 *
 * @param indexNames The names of the indices to wait for
 * @param timeoutSeconds The number of seconds to wait before timing out
 * @returns A promise that resolves with no value once the wait succeeds
 *
 * This will raise an error if the indices are not created and fully indexed
 * within the timeout.
 */
abstract waitForIndex(
  indexNames: string[],
  timeoutSeconds: number,
): Promise<void>;
/** /**
* Create a {@link Query} Builder. * Create a {@link Query} Builder.
* *
@@ -569,7 +582,12 @@ export class LocalTable extends Table {
// Bit of a hack to get around the fact that TS has no package-scope. // Bit of a hack to get around the fact that TS has no package-scope.
// biome-ignore lint/suspicious/noExplicitAny: skip // biome-ignore lint/suspicious/noExplicitAny: skip
const nativeIndex = (options?.config as any)?.inner; const nativeIndex = (options?.config as any)?.inner;
await this.inner.createIndex(nativeIndex, column, options?.replace); await this.inner.createIndex(
nativeIndex,
column,
options?.replace,
options?.waitTimeoutSeconds,
);
} }
async dropIndex(name: string): Promise<void> { async dropIndex(name: string): Promise<void> {
@@ -580,6 +598,13 @@ export class LocalTable extends Table {
await this.inner.prewarmIndex(name); await this.inner.prewarmIndex(name);
} }
/**
 * Waits for the given indices by delegating to the native table handle.
 *
 * Both arguments are forwarded unchanged to `this.inner.waitForIndex`.
 */
async waitForIndex(
  indexNames: string[],
  timeoutSeconds: number,
): Promise<void> {
  await this.inner.waitForIndex(indexNames, timeoutSeconds);
}
query(): Query { query(): Query {
return new Query(this.inner); return new Query(this.inner);
} }

View File

@@ -111,6 +111,7 @@ impl Table {
index: Option<&Index>, index: Option<&Index>,
column: String, column: String,
replace: Option<bool>, replace: Option<bool>,
wait_timeout_s: Option<i64>,
) -> napi::Result<()> { ) -> napi::Result<()> {
let lancedb_index = if let Some(index) = index { let lancedb_index = if let Some(index) = index {
index.consume()? index.consume()?
@@ -121,6 +122,10 @@ impl Table {
if let Some(replace) = replace { if let Some(replace) = replace {
builder = builder.replace(replace); builder = builder.replace(replace);
} }
if let Some(timeout) = wait_timeout_s {
builder =
builder.wait_timeout(std::time::Duration::from_secs(timeout.try_into().unwrap()));
}
builder.execute().await.default_error() builder.execute().await.default_error()
} }
@@ -140,6 +145,18 @@ impl Table {
.default_error() .default_error()
} }
#[napi(catch_unwind)]
pub async fn wait_for_index(&self, index_names: Vec<String>, timeout_s: i64) -> Result<()> {
let timeout = std::time::Duration::from_secs(timeout_s.try_into().unwrap());
let index_names: Vec<&str> = index_names.iter().map(|s| s.as_str()).collect();
let slice: &[&str] = &index_names;
self.inner_ref()?
.wait_for_index(slice, timeout)
.await
.default_error()
}
#[napi(catch_unwind)] #[napi(catch_unwind)]
pub async fn update( pub async fn update(
&self, &self,

View File

@@ -104,6 +104,7 @@ class RemoteTable(Table):
index_type: Literal["BTREE", "BITMAP", "LABEL_LIST", "scalar"] = "scalar", index_type: Literal["BTREE", "BITMAP", "LABEL_LIST", "scalar"] = "scalar",
*, *,
replace: bool = False, replace: bool = False,
wait_timeout: timedelta = None,
): ):
"""Creates a scalar index """Creates a scalar index
Parameters Parameters
@@ -126,13 +127,18 @@ class RemoteTable(Table):
else: else:
raise ValueError(f"Unknown index type: {index_type}") raise ValueError(f"Unknown index type: {index_type}")
LOOP.run(self._table.create_index(column, config=config, replace=replace)) LOOP.run(
self._table.create_index(
column, config=config, replace=replace, wait_timeout=wait_timeout
)
)
def create_fts_index( def create_fts_index(
self, self,
column: str, column: str,
*, *,
replace: bool = False, replace: bool = False,
wait_timeout: timedelta = None,
with_position: bool = True, with_position: bool = True,
# tokenizer configs: # tokenizer configs:
base_tokenizer: str = "simple", base_tokenizer: str = "simple",
@@ -153,7 +159,11 @@ class RemoteTable(Table):
remove_stop_words=remove_stop_words, remove_stop_words=remove_stop_words,
ascii_folding=ascii_folding, ascii_folding=ascii_folding,
) )
LOOP.run(self._table.create_index(column, config=config, replace=replace)) LOOP.run(
self._table.create_index(
column, config=config, replace=replace, wait_timeout=wait_timeout
)
)
def create_index( def create_index(
self, self,
@@ -165,6 +175,7 @@ class RemoteTable(Table):
replace: Optional[bool] = None, replace: Optional[bool] = None,
accelerator: Optional[str] = None, accelerator: Optional[str] = None,
index_type="vector", index_type="vector",
wait_timeout: Optional[timedelta] = None,
): ):
"""Create an index on the table. """Create an index on the table.
Currently, the only parameters that matter are Currently, the only parameters that matter are
@@ -236,7 +247,11 @@ class RemoteTable(Table):
" 'IVF_FLAT', 'IVF_PQ', 'IVF_HNSW_PQ', 'IVF_HNSW_SQ'" " 'IVF_FLAT', 'IVF_PQ', 'IVF_HNSW_PQ', 'IVF_HNSW_SQ'"
) )
LOOP.run(self._table.create_index(vector_column_name, config=config)) LOOP.run(
self._table.create_index(
vector_column_name, config=config, wait_timeout=wait_timeout
)
)
def add( def add(
self, self,
@@ -554,6 +569,11 @@ class RemoteTable(Table):
def drop_index(self, index_name: str): def drop_index(self, index_name: str):
return LOOP.run(self._table.drop_index(index_name)) return LOOP.run(self._table.drop_index(index_name))
def wait_for_index(
    self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
):
    """Wait for indexing to complete for the given index names.

    Runs the underlying table's ``wait_for_index`` coroutine on ``LOOP`` and
    returns its result.

    Parameters
    ----------
    index_names: Iterable[str]
        The names of the indices to wait for.
    timeout: timedelta
        How long to wait before giving up. The default is 5 minutes.
    """
    return LOOP.run(self._table.wait_for_index(index_names, timeout))
def uses_v2_manifest_paths(self) -> bool: def uses_v2_manifest_paths(self) -> bool:
raise NotImplementedError( raise NotImplementedError(
"uses_v2_manifest_paths() is not supported on the LanceDB Cloud" "uses_v2_manifest_paths() is not supported on the LanceDB Cloud"

View File

@@ -631,6 +631,7 @@ class Table(ABC):
index_cache_size: Optional[int] = None, index_cache_size: Optional[int] = None,
*, *,
index_type: VectorIndexType = "IVF_PQ", index_type: VectorIndexType = "IVF_PQ",
wait_timeout: Optional[timedelta] = None,
num_bits: int = 8, num_bits: int = 8,
max_iterations: int = 50, max_iterations: int = 50,
sample_rate: int = 256, sample_rate: int = 256,
@@ -666,6 +667,8 @@ class Table(ABC):
num_bits: int num_bits: int
The number of bits to encode sub-vectors. Only used with the IVF_PQ index. The number of bits to encode sub-vectors. Only used with the IVF_PQ index.
Only 4 and 8 are supported. Only 4 and 8 are supported.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
""" """
raise NotImplementedError raise NotImplementedError
@@ -689,6 +692,23 @@ class Table(ABC):
""" """
raise NotImplementedError raise NotImplementedError
def wait_for_index(
    self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
    """
    Wait for indexing to complete for the given index names.

    This will poll the table until all the indices are fully indexed,
    or raise a timeout exception if the timeout is reached.

    Parameters
    ----------
    index_names: Iterable[str]
        The names of the indices to poll
    timeout: timedelta
        Timeout to wait for asynchronous indexing. The default is 5 minutes.
    """
    raise NotImplementedError
@abstractmethod @abstractmethod
def create_scalar_index( def create_scalar_index(
self, self,
@@ -696,6 +716,7 @@ class Table(ABC):
*, *,
replace: bool = True, replace: bool = True,
index_type: ScalarIndexType = "BTREE", index_type: ScalarIndexType = "BTREE",
wait_timeout: Optional[timedelta] = None,
): ):
"""Create a scalar index on a column. """Create a scalar index on a column.
@@ -708,7 +729,8 @@ class Table(ABC):
Replace the existing index if it exists. Replace the existing index if it exists.
index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"], default "BTREE" index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"], default "BTREE"
The type of index to create. The type of index to create.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
Examples Examples
-------- --------
@@ -767,6 +789,7 @@ class Table(ABC):
stem: bool = False, stem: bool = False,
remove_stop_words: bool = False, remove_stop_words: bool = False,
ascii_folding: bool = False, ascii_folding: bool = False,
wait_timeout: Optional[timedelta] = None,
): ):
"""Create a full-text search index on the table. """Create a full-text search index on the table.
@@ -822,6 +845,8 @@ class Table(ABC):
ascii_folding : bool, default False ascii_folding : bool, default False
Whether to fold ASCII characters. This converts accented characters to Whether to fold ASCII characters. This converts accented characters to
their ASCII equivalent. For example, "café" would be converted to "cafe". their ASCII equivalent. For example, "café" would be converted to "cafe".
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
""" """
raise NotImplementedError raise NotImplementedError
@@ -1771,6 +1796,11 @@ class LanceTable(Table):
""" """
return LOOP.run(self._table.prewarm_index(name)) return LOOP.run(self._table.prewarm_index(name))
def wait_for_index(
    self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
    """Wait for indexing to complete for the given index names.

    Runs the underlying table's ``wait_for_index`` coroutine on ``LOOP``.

    Parameters
    ----------
    index_names: Iterable[str]
        The names of the indices to wait for.
    timeout: timedelta
        How long to wait before giving up. The default is 5 minutes.
    """
    return LOOP.run(self._table.wait_for_index(index_names, timeout))
def create_scalar_index( def create_scalar_index(
self, self,
column: str, column: str,
@@ -2964,6 +2994,7 @@ class AsyncTable:
config: Optional[ config: Optional[
Union[IvfFlat, IvfPq, HnswPq, HnswSq, BTree, Bitmap, LabelList, FTS] Union[IvfFlat, IvfPq, HnswPq, HnswSq, BTree, Bitmap, LabelList, FTS]
] = None, ] = None,
wait_timeout: Optional[timedelta] = None,
): ):
"""Create an index to speed up queries """Create an index to speed up queries
@@ -2988,6 +3019,8 @@ class AsyncTable:
For advanced configuration you can specify the type of index you would For advanced configuration you can specify the type of index you would
like to create. You can also specify index-specific parameters when like to create. You can also specify index-specific parameters when
creating an index object. creating an index object.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
""" """
if config is not None: if config is not None:
if not isinstance( if not isinstance(
@@ -2998,7 +3031,9 @@ class AsyncTable:
" Bitmap, LabelList, or FTS" " Bitmap, LabelList, or FTS"
) )
try: try:
await self._inner.create_index(column, index=config, replace=replace) await self._inner.create_index(
column, index=config, replace=replace, wait_timeout=wait_timeout
)
except ValueError as e: except ValueError as e:
if "not support the requested language" in str(e): if "not support the requested language" in str(e):
supported_langs = ", ".join(lang_mapping.values()) supported_langs = ", ".join(lang_mapping.values())
@@ -3043,6 +3078,23 @@ class AsyncTable:
""" """
await self._inner.prewarm_index(name) await self._inner.prewarm_index(name)
async def wait_for_index(
    self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
    """
    Wait for indexing to complete for the given index names.

    This will poll the table until all the indices are fully indexed,
    or raise a timeout exception if the timeout is reached.

    Parameters
    ----------
    index_names: Iterable[str]
        The names of the indices to poll
    timeout: timedelta
        Timeout to wait for asynchronous indexing. The default is 5 minutes.
    """
    await self._inner.wait_for_index(index_names, timeout)
async def add( async def add(
self, self,
data: DATA, data: DATA,

View File

@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors # SPDX-FileCopyrightText: Copyright The LanceDB Authors
import re
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import contextlib import contextlib
from datetime import timedelta from datetime import timedelta
@@ -235,6 +235,10 @@ def test_table_add_in_threadpool():
def test_table_create_indices(): def test_table_create_indices():
def handler(request): def handler(request):
index_stats = dict(
index_type="IVF_PQ", num_indexed_rows=1000, num_unindexed_rows=0
)
if request.path == "/v1/table/test/create_index/": if request.path == "/v1/table/test/create_index/":
request.send_response(200) request.send_response(200)
request.end_headers() request.end_headers()
@@ -258,6 +262,47 @@ def test_table_create_indices():
) )
) )
request.wfile.write(payload.encode()) request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/list/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
dict(
indexes=[
{
"index_name": "id_idx",
"columns": ["id"],
},
{
"index_name": "text_idx",
"columns": ["text"],
},
{
"index_name": "vector_idx",
"columns": ["vector"],
},
]
)
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/id_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/text_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/vector_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif "/drop/" in request.path: elif "/drop/" in request.path:
request.send_response(200) request.send_response(200)
request.end_headers() request.end_headers()
@@ -269,14 +314,81 @@ def test_table_create_indices():
# Parameters are well-tested through local and async tests. # Parameters are well-tested through local and async tests.
# This is a smoke-test. # This is a smoke-test.
table = db.create_table("test", [{"id": 1}]) table = db.create_table("test", [{"id": 1}])
table.create_scalar_index("id") table.create_scalar_index("id", wait_timeout=timedelta(seconds=2))
table.create_fts_index("text") table.create_fts_index("text", wait_timeout=timedelta(seconds=2))
table.create_scalar_index("vector") table.create_index(
vector_column_name="vector", wait_timeout=timedelta(seconds=10)
)
table.wait_for_index(["id_idx"], timedelta(seconds=2))
table.wait_for_index(["text_idx", "vector_idx"], timedelta(seconds=2))
table.drop_index("vector_idx") table.drop_index("vector_idx")
table.drop_index("id_idx") table.drop_index("id_idx")
table.drop_index("text_idx") table.drop_index("text_idx")
def test_table_wait_for_index_timeout():
    """wait_for_index() raises a timeout error while rows remain unindexed."""

    def handler(request):
        # The stats always report one unindexed row, so the index never
        # becomes fully indexed and the wait below has to time out.
        index_stats = dict(
            index_type="BTREE", num_indexed_rows=1000, num_unindexed_rows=1
        )

        if request.path == "/v1/table/test/create/?mode=create":
            request.send_response(200)
            request.send_header("Content-Type", "application/json")
            request.end_headers()
            request.wfile.write(b"{}")
        elif request.path == "/v1/table/test/describe/":
            request.send_response(200)
            request.send_header("Content-Type", "application/json")
            request.end_headers()
            payload = json.dumps(
                dict(
                    version=1,
                    schema=dict(
                        fields=[
                            dict(name="id", type={"type": "int64"}, nullable=False),
                        ]
                    ),
                )
            )
            request.wfile.write(payload.encode())
        elif request.path == "/v1/table/test/index/list/":
            request.send_response(200)
            request.send_header("Content-Type", "application/json")
            request.end_headers()
            payload = json.dumps(
                dict(
                    indexes=[
                        {
                            "index_name": "id_idx",
                            "columns": ["id"],
                        },
                    ]
                )
            )
            request.wfile.write(payload.encode())
        elif request.path == "/v1/table/test/index/id_idx/stats/":
            request.send_response(200)
            request.send_header("Content-Type", "application/json")
            request.end_headers()
            payload = json.dumps(index_stats)
            request.wfile.write(payload.encode())
        else:
            request.send_response(404)
            request.end_headers()

    with mock_lancedb_connection(handler) as db:
        table = db.create_table("test", [{"id": 1}])
        with pytest.raises(
            RuntimeError,
            match=re.escape(
                'Timeout error: timed out waiting for indices: ["id_idx"] after 1s'
            ),
        ):
            table.wait_for_index(["id_idx"], timedelta(seconds=1))
@contextlib.contextmanager @contextlib.contextmanager
def query_test_table(query_handler, *, server_version=Version("0.1.0")): def query_test_table(query_handler, *, server_version=Version("0.1.0")):
def handler(request): def handler(request):

View File

@@ -177,15 +177,19 @@ impl Table {
}) })
} }
#[pyo3(signature = (column, index=None, replace=None))] #[pyo3(signature = (column, index=None, replace=None, wait_timeout=None))]
pub fn create_index<'a>( pub fn create_index<'a>(
self_: PyRef<'a, Self>, self_: PyRef<'a, Self>,
column: String, column: String,
index: Option<Bound<'_, PyAny>>, index: Option<Bound<'_, PyAny>>,
replace: Option<bool>, replace: Option<bool>,
wait_timeout: Option<Bound<'_, PyAny>>,
) -> PyResult<Bound<'a, PyAny>> { ) -> PyResult<Bound<'a, PyAny>> {
let index = extract_index_params(&index)?; let index = extract_index_params(&index)?;
let mut op = self_.inner_ref()?.create_index(&[column], index); let timeout = wait_timeout.map(|t| t.extract::<std::time::Duration>().unwrap());
let mut op = self_
.inner_ref()?
.create_index_with_timeout(&[column], index, timeout);
if let Some(replace) = replace { if let Some(replace) = replace {
op = op.replace(replace); op = op.replace(replace);
} }
@@ -204,6 +208,26 @@ impl Table {
}) })
} }
/// Returns a Python awaitable that completes once the underlying table's
/// `wait_for_index` call finishes for `index_names`.
///
/// `timeout` must be a Python object that can be extracted into a
/// `std::time::Duration`; a failed extraction is returned as a Python error
/// before any waiting starts.
pub fn wait_for_index<'a>(
    self_: PyRef<'a, Self>,
    index_names: Vec<String>,
    timeout: Bound<'_, PyAny>,
) -> PyResult<Bound<'a, PyAny>> {
    // Clone the table handle so the future does not borrow from `self_`.
    let table = self_.inner_ref()?.clone();
    let wait_for: std::time::Duration = timeout.extract()?;
    future_into_py(self_.py(), async move {
        // The table API takes borrowed names; the owned strings live in the future.
        let names: Vec<&str> = index_names.iter().map(|name| name.as_str()).collect();
        table.wait_for_index(&names, wait_for).await.infer_error()?;
        Ok(())
    })
}
pub fn prewarm_index(self_: PyRef<'_, Self>, index_name: String) -> PyResult<Bound<'_, PyAny>> { pub fn prewarm_index(self_: PyRef<'_, Self>, index_name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone(); let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move { future_into_py(self_.py(), async move {

View File

@@ -35,6 +35,8 @@ pub enum Error {
Schema { message: String }, Schema { message: String },
#[snafu(display("Runtime error: {message}"))] #[snafu(display("Runtime error: {message}"))]
Runtime { message: String }, Runtime { message: String },
#[snafu(display("Timeout error: {message}"))]
Timeout { message: String },
// 3rd party / external errors // 3rd party / external errors
#[snafu(display("object_store error: {source}"))] #[snafu(display("object_store error: {source}"))]

View File

@@ -1,11 +1,11 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors // SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use scalar::FtsIndexBuilder; use scalar::FtsIndexBuilder;
use serde::Deserialize; use serde::Deserialize;
use serde_with::skip_serializing_none; use serde_with::skip_serializing_none;
use std::sync::Arc;
use std::time::Duration;
use vector::IvfFlatIndexBuilder; use vector::IvfFlatIndexBuilder;
use crate::{table::BaseTable, DistanceType, Error, Result}; use crate::{table::BaseTable, DistanceType, Error, Result};
@@ -17,6 +17,7 @@ use self::{
pub mod scalar; pub mod scalar;
pub mod vector; pub mod vector;
pub mod waiter;
/// Supported index types. /// Supported index types.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -69,6 +70,7 @@ pub struct IndexBuilder {
pub(crate) index: Index, pub(crate) index: Index,
pub(crate) columns: Vec<String>, pub(crate) columns: Vec<String>,
pub(crate) replace: bool, pub(crate) replace: bool,
pub(crate) wait_timeout: Option<Duration>,
} }
impl IndexBuilder { impl IndexBuilder {
@@ -78,6 +80,7 @@ impl IndexBuilder {
index, index,
columns, columns,
replace: true, replace: true,
wait_timeout: None,
} }
} }
@@ -91,6 +94,15 @@ impl IndexBuilder {
self self
} }
/// Duration of time to wait for asynchronous indexing to complete. If not set,
/// `create_index()` will not wait.
///
/// This is not supported for `NativeTable` since indexing is synchronous.
pub fn wait_timeout(mut self, d: Duration) -> Self {
self.wait_timeout = Some(d);
self
}
pub async fn execute(self) -> Result<()> { pub async fn execute(self) -> Result<()> {
self.parent.clone().create_index(self).await self.parent.clone().create_index(self).await
} }

View File

@@ -0,0 +1,90 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::error::Result;
use crate::table::BaseTable;
use crate::Error;
use log::debug;
use std::time::{Duration, Instant};
use tokio::time::sleep;
const DEFAULT_SLEEP_MS: u64 = 1000;
const MAX_WAIT: Duration = Duration::from_secs(2 * 60 * 60);
/// Poll the table using list_indices() and index_stats() until all of the indices have 0 un-indexed rows.
/// Will return Error::Timeout if the columns are not fully indexed within the timeout.
pub async fn wait_for_index(
table: &dyn BaseTable,
index_names: &[&str],
timeout: Duration,
) -> Result<()> {
if timeout > MAX_WAIT {
return Err(Error::InvalidInput {
message: format!("timeout must be less than {:?}", MAX_WAIT).to_string(),
});
}
let start = Instant::now();
let mut remaining = index_names.to_vec();
// poll via list_indices() and index_stats() until all indices are created and fully indexed
while start.elapsed() < timeout {
let mut completed = vec![];
let indices = table.list_indices().await?;
for &idx in &remaining {
if !indices.iter().any(|i| i.name == *idx) {
debug!("still waiting for new index '{}'", idx);
continue;
}
let stats = table.index_stats(idx.as_ref()).await?;
match stats {
None => {
debug!("still waiting for new index '{}'", idx);
continue;
}
Some(s) => {
if s.num_unindexed_rows == 0 {
// note: this may never stabilize under constant writes.
// we should later replace this with a status/job model
completed.push(idx);
debug!(
"fully indexed '{}'. indexed rows: {}",
idx, s.num_indexed_rows
);
} else {
debug!(
"still waiting for index '{}'. unindexed rows: {}",
idx, s.num_unindexed_rows
);
}
}
}
}
remaining.retain(|idx| !completed.contains(idx));
if remaining.is_empty() {
return Ok(());
}
sleep(Duration::from_millis(DEFAULT_SLEEP_MS)).await;
}
// debug log index diagnostics
for &r in &remaining {
let stats = table.index_stats(r.as_ref()).await?;
match stats {
Some(s) => debug!(
"index '{}' not fully indexed after {:?}. stats: {:?}",
r, timeout, s
),
None => debug!("index '{}' not found after {:?}", r, timeout),
}
}
Err(Error::Timeout {
message: format!(
"timed out waiting for indices: {:?} after {:?}",
remaining, timeout
)
.to_string(),
})
}

View File

@@ -1,10 +1,6 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors // SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use crate::index::Index; use crate::index::Index;
use crate::index::IndexStatistics; use crate::index::IndexStatistics;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest}; use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
@@ -26,8 +22,17 @@ use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version}; use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec}; use lance_datafusion::exec::{execute_plan, OneShotExec};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::ARROW_STREAM_CONTENT_TYPE;
use crate::index::waiter::wait_for_index;
use crate::{ use crate::{
connection::NoData, connection::NoData,
error::Result, error::Result,
@@ -39,11 +44,6 @@ use crate::{
}, },
}; };
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::ARROW_STREAM_CONTENT_TYPE;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
#[derive(Debug)] #[derive(Debug)]
@@ -800,9 +800,20 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
if let Some(wait_timeout) = index.wait_timeout {
let name = format!("{}_idx", column);
self.wait_for_index(&[&name], wait_timeout).await?;
}
Ok(()) Ok(())
} }
/// Poll until the named indices are fully indexed. Will return Error::Timeout if the
/// indices are not fully indexed within the timeout.
///
/// This delegates to the free function `wait_for_index` imported from
/// `crate::index::waiter`, passing this table as the table to poll.
async fn wait_for_index(&self, index_names: &[&str], timeout: Duration) -> Result<()> {
    wait_for_index(self, index_names, timeout).await
}
async fn merge_insert( async fn merge_insert(
&self, &self,
params: MergeInsertBuilder, params: MergeInsertBuilder,
@@ -984,6 +995,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = response.text().await.err_to_http(request_id.clone())?; let body = response.text().await.err_to_http(request_id.clone())?;
println!("body: {:?}", body);
let stats = serde_json::from_str(&body).map_err(|e| Error::Http { let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse index statistics: {}", e).into(), source: format!("Failed to parse index statistics: {}", e).into(),
request_id, request_id,
@@ -2416,4 +2428,88 @@ mod tests {
}); });
table.drop_index("my_index").await.unwrap(); table.drop_index("my_index").await.unwrap();
} }
// Both mocked indices report zero unindexed rows, so the wait succeeds.
#[tokio::test]
async fn test_wait_for_index() {
    let table = _make_table_with_indices(0);
    table
        .wait_for_index(&["vector_idx", "my_idx"], Duration::from_secs(1))
        .await
        .unwrap();
}
// Both mocked indices always report 100 unindexed rows, so the wait must end
// with a timeout error that names both indices.
#[tokio::test]
async fn test_wait_for_index_timeout() {
    let table = _make_table_with_indices(100);
    let e = table
        .wait_for_index(&["vector_idx", "my_idx"], Duration::from_secs(1))
        .await
        .unwrap_err();
    assert_eq!(
        e.to_string(),
        "Timeout error: timed out waiting for indices: [\"vector_idx\", \"my_idx\"] after 1s"
    );
}
// The requested index is not in the mocked index list, so the wait must end
// with a timeout error even though the listed indices are fully indexed.
#[tokio::test]
async fn test_wait_for_index_timeout_never_created() {
    let table = _make_table_with_indices(0);
    let e = table
        .wait_for_index(&["doesnt_exist_idx"], Duration::from_secs(1))
        .await
        .unwrap_err();
    assert_eq!(
        e.to_string(),
        "Timeout error: timed out waiting for indices: [\"doesnt_exist_idx\"] after 1s"
    );
}
/// Builds a mock table named `my_table` whose handler lists two indices
/// (`vector_idx` and `my_idx`) and reports `unindexed_rows` un-indexed rows in
/// the stats of each. Any other path is answered with a 404 and a `null` body.
fn _make_table_with_indices(unindexed_rows: usize) -> Table {
    Table::new_with_handler("my_table", move |request| {
        assert_eq!(request.method(), "POST");

        let path = request.url().path();
        let payload = if path == "/v1/table/my_table/index/list/" {
            serde_json::json!({
                "indexes": [
                    {
                        "index_name": "vector_idx",
                        "index_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
                        "columns": ["vector"],
                        "index_status": "done",
                    },
                    {
                        "index_name": "my_idx",
                        "index_uuid": "34255f64-5717-4562-b3fc-2c963f66afa6",
                        "columns": ["my_column"],
                        "index_status": "done",
                    },
                ]
            })
        } else if path == "/v1/table/my_table/index/vector_idx/stats/" {
            serde_json::json!({
                "num_indexed_rows": 100000,
                "num_unindexed_rows": unindexed_rows,
                "index_type": "IVF_PQ",
                "distance_type": "l2"
            })
        } else if path == "/v1/table/my_table/index/my_idx/stats/" {
            serde_json::json!({
                "num_indexed_rows": 100000,
                "num_unindexed_rows": unindexed_rows,
                "index_type": "LABEL_LIST"
            })
        } else {
            // Unknown paths serialize to the JSON literal `null`.
            serde_json::json!(None::<String>)
        };

        let body = serde_json::to_string(&payload).unwrap();
        // A `null` body marks an unknown path, which is answered with 404.
        let status = match body.as_str() {
            "null" => 404,
            _ => 200,
        };
        http::Response::builder().status(status).body(body).unwrap()
    })
}
} }

View File

@@ -3,10 +3,6 @@
//! LanceDB Table APIs //! LanceDB Table APIs
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use arrow::array::{AsArray, FixedSizeListBuilder, Float32Builder}; use arrow::array::{AsArray, FixedSizeListBuilder, Float32Builder};
use arrow::datatypes::{Float32Type, UInt8Type}; use arrow::datatypes::{Float32Type, UInt8Type};
use arrow_array::{RecordBatchIterator, RecordBatchReader}; use arrow_array::{RecordBatchIterator, RecordBatchReader};
@@ -45,6 +41,10 @@ use lance_table::format::Manifest;
use lance_table::io::commit::ManifestNamingScheme; use lance_table::io::commit::ManifestNamingScheme;
use log::info; use log::info;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::format;
use std::path::Path;
use std::sync::Arc;
use crate::arrow::IntoArrow; use crate::arrow::IntoArrow;
use crate::connection::NoData; use crate::connection::NoData;
@@ -78,6 +78,7 @@ pub mod datafusion;
pub(crate) mod dataset; pub(crate) mod dataset;
pub mod merge; pub mod merge;
use crate::index::waiter::wait_for_index;
pub use chrono::Duration; pub use chrono::Duration;
pub use lance::dataset::optimize::CompactionOptions; pub use lance::dataset::optimize::CompactionOptions;
pub use lance::dataset::scanner::DatasetRecordBatchStream; pub use lance::dataset::scanner::DatasetRecordBatchStream;
@@ -491,6 +492,13 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn table_definition(&self) -> Result<TableDefinition>; async fn table_definition(&self) -> Result<TableDefinition>;
/// Get the table URI /// Get the table URI
fn dataset_uri(&self) -> &str; fn dataset_uri(&self) -> &str;
/// Poll until every index named in `index_names` is fully indexed.
///
/// Returns `Error::Timeout` if the indices are not fully indexed within
/// `timeout`.
async fn wait_for_index(
&self,
index_names: &[&str],
timeout: std::time::Duration,
) -> Result<()>;
} }
/// A Table is a collection of strong typed Rows. /// A Table is a collection of strong typed Rows.
@@ -769,6 +777,28 @@ impl Table {
) )
} }
/// Variant of [Table::create_index] that accepts an optional wait timeout.
///
/// When `wait_timeout` is `Some`, it is passed to the builder through
/// `wait_timeout`; when it is `None`, the builder is returned as constructed.
/// For remote tables, the timeout allows polling until asynchronous indexing
/// is complete.
pub fn create_index_with_timeout(
    &self,
    columns: &[impl AsRef<str>],
    index: Index,
    wait_timeout: Option<std::time::Duration>,
) -> IndexBuilder {
    let parent = self.inner.clone();
    let column_names: Vec<_> = columns
        .iter()
        .map(|column| column.as_ref().to_string())
        .collect();
    let builder = IndexBuilder::new(parent, column_names, index);
    match wait_timeout {
        Some(timeout) => builder.wait_timeout(timeout),
        None => builder,
    }
}
/// Create a builder for a merge insert operation /// Create a builder for a merge insert operation
/// ///
/// This operation can add rows, update rows, and remove rows all in a single /// This operation can add rows, update rows, and remove rows all in a single
@@ -1104,6 +1134,16 @@ impl Table {
self.inner.prewarm_index(name).await self.inner.prewarm_index(name).await
} }
/// Poll until every index named in `index_names` is fully indexed.
///
/// Delegates to the underlying table implementation. Returns
/// `Error::Timeout` if the indices are not fully indexed within `timeout`.
pub async fn wait_for_index(
    &self,
    index_names: &[&str],
    timeout: std::time::Duration,
) -> Result<()> {
    let pending = self.inner.wait_for_index(index_names, timeout);
    pending.await
}
// Take many execution plans and map them into a single plan that adds // Take many execution plans and map them into a single plan that adds
// a query_index column and unions them. // a query_index column and unions them.
pub(crate) fn multi_vector_plan( pub(crate) fn multi_vector_plan(
@@ -2430,6 +2470,16 @@ impl BaseTable for NativeTable {
loss, loss,
})) }))
} }
/// Poll until every index named in `index_names` is fully indexed.
///
/// Returns `Error::Timeout` if the indices are not fully indexed within
/// `timeout`.
async fn wait_for_index(
    &self,
    index_names: &[&str],
    timeout: std::time::Duration,
) -> Result<()> {
    // The bare name resolves to the free function imported from
    // `crate::index::waiter`, not to this trait method.
    let pending = wait_for_index(self, index_names, timeout);
    pending.await
}
} }
#[cfg(test)] #[cfg(test)]
@@ -3213,7 +3263,10 @@ mod tests {
.execute() .execute()
.await .await
.unwrap(); .unwrap();
table
.wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
.await
.unwrap();
let index_configs = table.list_indices().await.unwrap(); let index_configs = table.list_indices().await.unwrap();
assert_eq!(index_configs.len(), 1); assert_eq!(index_configs.len(), 1);
let index = index_configs.into_iter().next().unwrap(); let index = index_configs.into_iter().next().unwrap();
@@ -3281,7 +3334,10 @@ mod tests {
.execute() .execute()
.await .await
.unwrap(); .unwrap();
table
.wait_for_index(&["i_idx"], Duration::from_millis(10))
.await
.unwrap();
let index_configs = table.list_indices().await.unwrap(); let index_configs = table.list_indices().await.unwrap();
assert_eq!(index_configs.len(), 1); assert_eq!(index_configs.len(), 1);
let index = index_configs.into_iter().next().unwrap(); let index = index_configs.into_iter().next().unwrap();