feat: provide timeout parameter for merge_insert (#2378)

Provides the ability to set a timeout for merge insert. The default
underlying timeout is however long the first attempt takes, or if there
are multiple attempts, 30 seconds. This has two use cases:

1. Make the timeout shorter, when you want to fail if it takes too long.
2. Allow taking more time to do retries.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added support for specifying a timeout when performing merge insert
operations in Python, Node.js, and Rust APIs.
- Introduced a new option to control the maximum allowed execution time
for merge inserts, including retry timeout handling.

- **Documentation**
- Updated and added documentation to describe the new timeout option and
its usage in APIs.

- **Tests**
- Added and updated tests to verify correct timeout behavior during
merge insert operations.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Will Jones
2025-05-08 13:07:05 -07:00
committed by GitHub
parent 75c257ebb6
commit 272e4103b2
16 changed files with 179 additions and 33 deletions

26
Cargo.lock generated
View File

@@ -2738,7 +2738,7 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"rand 0.8.5",
]
@@ -3728,7 +3728,7 @@ dependencies = [
[[package]]
name = "lance"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow",
"arrow-arith",
@@ -3791,7 +3791,7 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -3809,7 +3809,7 @@ dependencies = [
[[package]]
name = "lance-core"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -3846,7 +3846,7 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow",
"arrow-array",
@@ -3876,7 +3876,7 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow",
"arrow-array",
@@ -3892,7 +3892,7 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrayref",
"arrow",
@@ -3932,7 +3932,7 @@ dependencies = [
[[package]]
name = "lance-file"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -3967,7 +3967,7 @@ dependencies = [
[[package]]
name = "lance-index"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow",
"arrow-array",
@@ -4021,7 +4021,7 @@ dependencies = [
[[package]]
name = "lance-io"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow",
"arrow-arith",
@@ -4060,7 +4060,7 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow-array",
"arrow-ord",
@@ -4084,7 +4084,7 @@ dependencies = [
[[package]]
name = "lance-table"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow",
"arrow-array",
@@ -4124,7 +4124,7 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "0.27.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185"
source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4"
dependencies = [
"arrow-array",
"arrow-schema",

View File

@@ -21,14 +21,14 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=0.27.0", "features" = ["dynamodb"], tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance-io = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance-index = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance-linalg = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance-table = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance-testing = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance-datafusion = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance-encoding = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
lance = { "version" = "=0.27.0", "features" = ["dynamodb"], tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
lance-io = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
lance-index = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
lance-linalg = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
lance-table = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
lance-testing = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
lance-datafusion = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
lance-encoding = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" }
# Note that this one does not include pyarrow
arrow = { version = "54.1", optional = false }
arrow-array = "54.1"

View File

@@ -33,7 +33,7 @@ Construct a MergeInsertBuilder. __Internal use only.__
### execute()
```ts
execute(data): Promise<MergeResult>
execute(data, execOptions?): Promise<MergeResult>
```
Executes the merge insert operation
@@ -42,6 +42,8 @@ Executes the merge insert operation
* **data**: [`Data`](../type-aliases/Data.md)
* **execOptions?**: `Partial`&lt;[`WriteExecutionOptions`](../interfaces/WriteExecutionOptions.md)&gt;
#### Returns
`Promise`&lt;[`MergeResult`](../interfaces/MergeResult.md)&gt;

View File

@@ -72,6 +72,7 @@
- [UpdateOptions](interfaces/UpdateOptions.md)
- [UpdateResult](interfaces/UpdateResult.md)
- [Version](interfaces/Version.md)
- [WriteExecutionOptions](interfaces/WriteExecutionOptions.md)
## Type Aliases

View File

@@ -0,0 +1,26 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / WriteExecutionOptions
# Interface: WriteExecutionOptions
## Properties
### timeoutMs?
```ts
optional timeoutMs: number;
```
Maximum time to run the operation before cancelling it.
By default, there is a 30-second timeout that is only enforced after the
first attempt. This is to prevent spending too long retrying to resolve
conflicts. For example, if a write attempt takes 20 seconds and fails,
the second attempt will be cancelled after 10 seconds, hitting the
30-second timeout. However, a write that takes one hour and succeeds on the
first attempt will not be cancelled.
When this is set, the timeout is enforced on all attempts, including the first.

View File

@@ -349,7 +349,7 @@ describe("merge insert", () => {
.mergeInsert("a")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute(newData);
.execute(newData, { timeoutMs: 10_000 });
expect(mergeInsertRes).toHaveProperty("version");
expect(mergeInsertRes.version).toBe(2);
expect(mergeInsertRes.numInsertedRows).toBe(1);
@@ -463,6 +463,20 @@ describe("merge insert", () => {
res = res.sort((a, b) => a.a - b.a);
expect(res).toEqual(expected);
});
test("timeout", async () => {
const newData = [
{ a: 2, b: "x" },
{ a: 4, b: "z" },
];
await expect(
table
.mergeInsert("a")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute(newData, { timeoutMs: 0 }),
).rejects.toThrow("merge insert timed out");
});
});
describe("When creating an index", () => {

View File

@@ -86,7 +86,7 @@ export {
ColumnAlteration,
} from "./table";
export { MergeInsertBuilder } from "./merge";
export { MergeInsertBuilder, WriteExecutionOptions } from "./merge";
export * as embedding from "./embedding";
export * as rerankers from "./rerankers";

View File

@@ -75,7 +75,10 @@ export class MergeInsertBuilder {
*
* @returns {Promise<MergeResult>} the merge result
*/
async execute(data: Data): Promise<MergeResult> {
async execute(
data: Data,
execOptions?: Partial<WriteExecutionOptions>,
): Promise<MergeResult> {
let schema: Schema;
if (this.#schema instanceof Promise) {
schema = await this.#schema;
@@ -83,7 +86,28 @@ export class MergeInsertBuilder {
} else {
schema = this.#schema;
}
if (execOptions?.timeoutMs !== undefined) {
this.#native.setTimeout(execOptions.timeoutMs);
}
const buffer = await fromDataToBuffer(data, undefined, schema);
return await this.#native.execute(buffer);
}
}
export interface WriteExecutionOptions {
/**
* Maximum time to run the operation before cancelling it.
*
* By default, there is a 30-second timeout that is only enforced after the
* first attempt. This is to prevent spending too long retrying to resolve
* conflicts. For example, if a write attempt takes 20 seconds and fails,
* the second attempt will be cancelled after 10 seconds, hitting the
* 30-second timeout. However, a write that takes one hour and succeeds on the
* first attempt will not be cancelled.
*
* When this is set, the timeout is enforced on all attempts, including the first.
*/
timeoutMs?: number;
}

View File

@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::time::Duration;
use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
use napi::bindgen_prelude::*;
use napi_derive::napi;
@@ -36,6 +38,11 @@ impl NativeMergeInsertBuilder {
this
}
#[napi]
pub fn set_timeout(&mut self, timeout: u32) {
self.inner.timeout(Duration::from_millis(timeout as u64));
}
#[napi(catch_unwind)]
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
let data = ipc_file_to_batches(buf.to_vec())

View File

@@ -4,6 +4,7 @@
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING, List, Optional
if TYPE_CHECKING:
@@ -31,6 +32,7 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_insert_all = False
self._when_not_matched_by_source_delete = False
self._when_not_matched_by_source_condition = None
self._timeout = None
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -81,6 +83,7 @@ class LanceMergeInsertBuilder(object):
new_data: DATA,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
timeout: Optional[timedelta] = None,
) -> MergeInsertResult:
"""
Executes the merge insert operation
@@ -98,10 +101,24 @@ class LanceMergeInsertBuilder(object):
One of "error", "drop", "fill".
fill_value: float, default 0.
The value to use when filling vectors. Only used if on_bad_vectors="fill".
timeout: Optional[timedelta], default None
Maximum time to run the operation before cancelling it.
By default, there is a 30-second timeout that is only enforced after the
first attempt. This is to prevent spending too long retrying to resolve
conflicts. For example, if a write attempt takes 20 seconds and fails,
the second attempt will be cancelled after 10 seconds, hitting the
30-second timeout. However, a write that takes one hour and succeeds on the
first attempt will not be cancelled.
When this is set, the timeout is enforced on all attempts, including
the first.
Returns
-------
MergeInsertResult
version: the new version number of the table after doing merge insert.
"""
if timeout is not None:
self._timeout = timeout
return self._table._do_merge(self, new_data, on_bad_vectors, fill_value)

View File

@@ -3716,6 +3716,7 @@ class AsyncTable:
when_not_matched_insert_all=merge._when_not_matched_insert_all,
when_not_matched_by_source_delete=merge._when_not_matched_by_source_delete,
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
),
)

View File

@@ -937,7 +937,7 @@ def test_merge_insert(mem_db: DBConnection):
table.merge_insert("a")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(new_data)
.execute(new_data, timeout=timedelta(seconds=10))
)
assert merge_insert_res.version == 2
assert merge_insert_res.num_inserted_rows == 1
@@ -1013,6 +1013,12 @@ def test_merge_insert(mem_db: DBConnection):
expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
assert table.to_arrow().sort_by("a") == expected
# timeout
with pytest.raises(Exception, match="merge insert timed out"):
table.merge_insert("a").when_matched_update_all().execute(
new_data, timeout=timedelta(0)
)
# We vary the data format because there are slight differences in how
# subschemas are handled in different formats

View File

@@ -649,6 +649,9 @@ impl Table {
builder
.when_not_matched_by_source_delete(parameters.when_not_matched_by_source_condition);
}
if let Some(timeout) = parameters.timeout {
builder.timeout(timeout);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -807,6 +810,7 @@ pub struct MergeInsertParams {
when_not_matched_insert_all: bool,
when_not_matched_by_source_delete: bool,
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
}
#[pyclass]

View File

@@ -1068,13 +1068,22 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
) -> Result<MergeResult> {
self.check_mutable().await?;
let timeout = params.timeout;
let query = MergeInsertRequest::try_from(params)?;
let request = self
let mut request = self
.client
.post(&format!("/v1/table/{}/merge_insert/", self.name))
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
if let Some(timeout) = timeout {
// (If it doesn't fit into u64, it's not worth sending anyways.)
if let Ok(timeout_ms) = u64::try_from(timeout.as_millis()) {
request = request.header(REQUEST_TIMEOUT_HEADER, timeout_ms);
}
}
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
let response = self.check_table_response(&request_id, response).await?;

View File

@@ -14,7 +14,7 @@ use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::ExecutionPlan;
use futures::{StreamExt, TryStreamExt};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
@@ -80,7 +80,7 @@ pub mod merge;
use crate::index::waiter::wait_for_index;
pub use chrono::Duration;
use futures::future::join_all;
use futures::future::{join_all, Either};
pub use lance::dataset::optimize::CompactionOptions;
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
pub use lance::dataset::scanner::DatasetRecordBatchStream;
@@ -2014,7 +2014,7 @@ impl NativeTable {
/// more information.
pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
let dataset = self.dataset.get().await?;
Ok(dataset.manifest_naming_scheme == ManifestNamingScheme::V2)
Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2)
}
/// Migrate the table to use the new manifest path scheme.
@@ -2475,8 +2475,26 @@ impl BaseTable for NativeTable {
} else {
builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
}
let job = builder.try_build()?;
let (new_dataset, stats) = job.execute_reader(new_data).await?;
let future = if let Some(timeout) = params.timeout {
// The default retry timeout is 30s, so we pass the full timeout down
// as well in case it is longer than that.
let future = builder
.retry_timeout(timeout)
.try_build()?
.execute_reader(new_data);
Either::Left(tokio::time::timeout(timeout, future).map(|res| match res {
Ok(Ok((new_dataset, stats))) => Ok((new_dataset, stats)),
Ok(Err(e)) => Err(e.into()),
Err(_) => Err(Error::Runtime {
message: "merge insert timed out".to_string(),
}),
}))
} else {
let job = builder.try_build()?;
Either::Right(job.execute_reader(new_data).map_err(|e| e.into()))
};
let (new_dataset, stats) = future.await?;
let version = new_dataset.manifest().version;
self.dataset.set_latest(new_dataset.as_ref().clone()).await;
Ok(MergeResult {

View File

@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use arrow_array::RecordBatchReader;
@@ -21,6 +21,7 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_insert_all: bool,
pub(crate) when_not_matched_by_source_delete: bool,
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
}
impl MergeInsertBuilder {
@@ -33,6 +34,7 @@ impl MergeInsertBuilder {
when_not_matched_insert_all: false,
when_not_matched_by_source_delete: false,
when_not_matched_by_source_delete_filt: None,
timeout: None,
}
}
@@ -84,6 +86,21 @@ impl MergeInsertBuilder {
self
}
/// Maximum time to run the operation before cancelling it.
///
/// By default, there is a 30-second timeout that is only enforced after the
/// first attempt. This is to prevent spending too long retrying to resolve
/// conflicts. For example, if a write attempt takes 20 seconds and fails,
/// the second attempt will be cancelled after 10 seconds, hitting the
/// 30-second timeout. However, a write that takes one hour and succeeds on the
/// first attempt will not be cancelled.
///
/// When this is set, the timeout is enforced on all attempts, including the first.
pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
self.timeout = Some(timeout);
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows