diff --git a/Cargo.lock b/Cargo.lock index 1120adb4..bfe15464 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2738,7 +2738,7 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] name = "fsst" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "rand 0.8.5", ] @@ -3728,7 +3728,7 @@ dependencies = [ [[package]] name = "lance" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow", "arrow-arith", @@ -3791,7 +3791,7 @@ dependencies = [ [[package]] name = "lance-arrow" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow-array", "arrow-buffer", @@ -3809,7 +3809,7 @@ dependencies = [ [[package]] name = "lance-core" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow-array", "arrow-buffer", @@ -3846,7 +3846,7 @@ dependencies = [ [[package]] name = "lance-datafusion" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow", "arrow-array", @@ -3876,7 +3876,7 @@ dependencies = [ [[package]] name = "lance-datagen" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow", "arrow-array", @@ -3892,7 +3892,7 @@ dependencies = [ [[package]] name = "lance-encoding" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrayref", "arrow", @@ -3932,7 +3932,7 @@ dependencies = [ [[package]] name = "lance-file" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow-arith", "arrow-array", @@ -3967,7 +3967,7 @@ dependencies = [ [[package]] name = "lance-index" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow", "arrow-array", @@ -4021,7 +4021,7 @@ dependencies = [ [[package]] name = "lance-io" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow", "arrow-arith", @@ -4060,7 +4060,7 @@ dependencies = [ [[package]] name = "lance-linalg" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow-array", "arrow-ord", @@ -4084,7 +4084,7 @@ dependencies = [ [[package]] name = "lance-table" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow", "arrow-array", @@ -4124,7 +4124,7 @@ dependencies = [ [[package]] name = "lance-testing" version = "0.27.0" -source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.2#cf903b470be1aaff2998830bd0358226f27f4185" +source = "git+https://github.com/lancedb/lance.git?tag=v0.27.0-beta.5#80a3f8796aee814c60cbdc94179b4e6231fa54e4" dependencies = [ "arrow-array", "arrow-schema", diff --git a/Cargo.toml b/Cargo.toml index dcef837b..b230942b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,14 +21,14 @@ categories = ["database-implementations"] rust-version = "1.78.0" [workspace.dependencies] -lance = { "version" = "=0.27.0", "features" = ["dynamodb"], tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } -lance-io = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } -lance-index = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } -lance-linalg = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } -lance-table = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } -lance-testing = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } -lance-datafusion = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } -lance-encoding = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" } +lance = { "version" = "=0.27.0", "features" = ["dynamodb"], tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } +lance-io = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } +lance-index = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } +lance-linalg = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } +lance-table = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } +lance-testing = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } +lance-datafusion = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } +lance-encoding = { version = "=0.27.0", tag = "v0.27.0-beta.5", git="https://github.com/lancedb/lance.git" } # Note that this one does not include pyarrow arrow = { version = "54.1", optional = false } arrow-array = "54.1" diff --git a/docs/src/js/classes/MergeInsertBuilder.md b/docs/src/js/classes/MergeInsertBuilder.md index 5d407d95..09664e62 100644 --- a/docs/src/js/classes/MergeInsertBuilder.md +++ b/docs/src/js/classes/MergeInsertBuilder.md @@ -33,7 +33,7 @@ Construct a MergeInsertBuilder. __Internal use only.__ ### execute() ```ts -execute(data): Promise +execute(data, execOptions?): Promise ``` Executes the merge insert operation @@ -42,6 +42,8 @@ Executes the merge insert operation * **data**: [`Data`](../type-aliases/Data.md) +* **execOptions?**: `Partial`<[`WriteExecutionOptions`](../interfaces/WriteExecutionOptions.md)> + #### Returns `Promise`<[`MergeResult`](../interfaces/MergeResult.md)> diff --git a/docs/src/js/globals.md b/docs/src/js/globals.md index c9779d79..e555e2ab 100644 --- a/docs/src/js/globals.md +++ b/docs/src/js/globals.md @@ -72,6 +72,7 @@ - [UpdateOptions](interfaces/UpdateOptions.md) - [UpdateResult](interfaces/UpdateResult.md) - [Version](interfaces/Version.md) +- [WriteExecutionOptions](interfaces/WriteExecutionOptions.md) ## Type Aliases diff --git a/docs/src/js/interfaces/WriteExecutionOptions.md b/docs/src/js/interfaces/WriteExecutionOptions.md new file mode 100644 index 00000000..0ca796b6 --- /dev/null +++ b/docs/src/js/interfaces/WriteExecutionOptions.md @@ -0,0 +1,26 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / WriteExecutionOptions + +# Interface: WriteExecutionOptions + +## Properties + +### timeoutMs? + +```ts +optional timeoutMs: number; +``` + +Maximum time to run the operation before cancelling it. + +By default, there is a 30-second timeout that is only enforced after the +first attempt. This is to prevent spending too long retrying to resolve +conflicts. For example, if a write attempt takes 20 seconds and fails, +the second attempt will be cancelled after 10 seconds, hitting the +30-second timeout. However, a write that takes one hour and succeeds on the +first attempt will not be cancelled. + +When this is set, the timeout is enforced on all attempts, including the first. diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 4b23a82e..390b25fb 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -349,7 +349,7 @@ describe("merge insert", () => { .mergeInsert("a") .whenMatchedUpdateAll() .whenNotMatchedInsertAll() - .execute(newData); + .execute(newData, { timeoutMs: 10_000 }); expect(mergeInsertRes).toHaveProperty("version"); expect(mergeInsertRes.version).toBe(2); expect(mergeInsertRes.numInsertedRows).toBe(1); @@ -463,6 +463,20 @@ describe("merge insert", () => { res = res.sort((a, b) => a.a - b.a); expect(res).toEqual(expected); }); + + test("timeout", async () => { + const newData = [ + { a: 2, b: "x" }, + { a: 4, b: "z" }, + ]; + await expect( + table + .mergeInsert("a") + .whenMatchedUpdateAll() + .whenNotMatchedInsertAll() + .execute(newData, { timeoutMs: 0 }), + ).rejects.toThrow("merge insert timed out"); + }); }); describe("When creating an index", () => { diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index 6f548cb5..9565191f 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -86,7 +86,7 @@ export { ColumnAlteration, } from "./table"; -export { MergeInsertBuilder } from "./merge"; +export { MergeInsertBuilder, WriteExecutionOptions } from "./merge"; export * as embedding from "./embedding"; export * as rerankers from "./rerankers"; diff --git a/nodejs/lancedb/merge.ts b/nodejs/lancedb/merge.ts index 781ca177..d4823130 100644 --- a/nodejs/lancedb/merge.ts +++ b/nodejs/lancedb/merge.ts @@ -75,7 +75,10 @@ export class MergeInsertBuilder { * * @returns {Promise} the merge result */ - async execute(data: Data): Promise { + async execute( + data: Data, + execOptions?: Partial, + ): Promise { let schema: Schema; if (this.#schema instanceof Promise) { schema = await this.#schema; @@ -83,7 +86,28 @@ export class MergeInsertBuilder { } else { schema = this.#schema; } + + if (execOptions?.timeoutMs !== undefined) { + this.#native.setTimeout(execOptions.timeoutMs); + } + const buffer = await fromDataToBuffer(data, undefined, schema); return await this.#native.execute(buffer); } } + +export interface WriteExecutionOptions { + /** + * Maximum time to run the operation before cancelling it. + * + * By default, there is a 30-second timeout that is only enforced after the + * first attempt. This is to prevent spending too long retrying to resolve + * conflicts. For example, if a write attempt takes 20 seconds and fails, + * the second attempt will be cancelled after 10 seconds, hitting the + * 30-second timeout. However, a write that takes one hour and succeeds on the + * first attempt will not be cancelled. + * + * When this is set, the timeout is enforced on all attempts, including the first. + */ + timeoutMs?: number; +} diff --git a/nodejs/src/merge.rs b/nodejs/src/merge.rs index 38eb4883..5b2c93a4 100644 --- a/nodejs/src/merge.rs +++ b/nodejs/src/merge.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The LanceDB Authors +use std::time::Duration; + use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder}; use napi::bindgen_prelude::*; use napi_derive::napi; @@ -36,6 +38,11 @@ impl NativeMergeInsertBuilder { this } + #[napi] + pub fn set_timeout(&mut self, timeout: u32) { + self.inner.timeout(Duration::from_millis(timeout as u64)); + } + #[napi(catch_unwind)] pub async fn execute(&self, buf: Buffer) -> napi::Result { let data = ipc_file_to_batches(buf.to_vec()) diff --git a/python/python/lancedb/merge.py b/python/python/lancedb/merge.py index 419877bb..3cf56a9d 100644 --- a/python/python/lancedb/merge.py +++ b/python/python/lancedb/merge.py @@ -4,6 +4,7 @@ from __future__ import annotations +from datetime import timedelta from typing import TYPE_CHECKING, List, Optional if TYPE_CHECKING: @@ -31,6 +32,7 @@ class LanceMergeInsertBuilder(object): self._when_not_matched_insert_all = False self._when_not_matched_by_source_delete = False self._when_not_matched_by_source_condition = None + self._timeout = None def when_matched_update_all( self, *, where: Optional[str] = None @@ -81,6 +83,7 @@ class LanceMergeInsertBuilder(object): new_data: DATA, on_bad_vectors: str = "error", fill_value: float = 0.0, + timeout: Optional[timedelta] = None, ) -> MergeInsertResult: """ Executes the merge insert operation @@ -98,10 +101,24 @@ class LanceMergeInsertBuilder(object): One of "error", "drop", "fill". fill_value: float, default 0. The value to use when filling vectors. Only used if on_bad_vectors="fill". + timeout: Optional[timedelta], default None + Maximum time to run the operation before cancelling it. + + By default, there is a 30-second timeout that is only enforced after the + first attempt. This is to prevent spending too long retrying to resolve + conflicts. For example, if a write attempt takes 20 seconds and fails, + the second attempt will be cancelled after 10 seconds, hitting the + 30-second timeout. However, a write that takes one hour and succeeds on the + first attempt will not be cancelled. + + When this is set, the timeout is enforced on all attempts, including + the first. Returns ------- MergeInsertResult version: the new version number of the table after doing merge insert. """ + if timeout is not None: + self._timeout = timeout return self._table._do_merge(self, new_data, on_bad_vectors, fill_value) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 8fb84ff3..cb225f3b 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -3716,6 +3716,7 @@ class AsyncTable: when_not_matched_insert_all=merge._when_not_matched_insert_all, when_not_matched_by_source_delete=merge._when_not_matched_by_source_delete, when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition, + timeout=merge._timeout, ), ) diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index a8529cb3..60751e64 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -937,7 +937,7 @@ def test_merge_insert(mem_db: DBConnection): table.merge_insert("a") .when_matched_update_all() .when_not_matched_insert_all() - .execute(new_data) + .execute(new_data, timeout=timedelta(seconds=10)) ) assert merge_insert_res.version == 2 assert merge_insert_res.num_inserted_rows == 1 @@ -1013,6 +1013,12 @@ def test_merge_insert(mem_db: DBConnection): expected = pa.table({"a": [2, 4], "b": ["x", "z"]}) assert table.to_arrow().sort_by("a") == expected + # timeout + with pytest.raises(Exception, match="merge insert timed out"): + table.merge_insert("a").when_matched_update_all().execute( + new_data, timeout=timedelta(0) + ) + # We vary the data format because there are slight differences in how # subschemas are handled in different formats diff --git a/python/src/table.rs b/python/src/table.rs index e00ebd95..7c097759 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -649,6 +649,9 @@ impl Table { builder .when_not_matched_by_source_delete(parameters.when_not_matched_by_source_condition); } + if let Some(timeout) = parameters.timeout { + builder.timeout(timeout); + } future_into_py(self_.py(), async move { let res = builder.execute(Box::new(batches)).await.infer_error()?; @@ -807,6 +810,7 @@ pub struct MergeInsertParams { when_not_matched_insert_all: bool, when_not_matched_by_source_delete: bool, when_not_matched_by_source_condition: Option, + timeout: Option, } #[pyclass] diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index ba329aba..8987a75c 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -1068,13 +1068,22 @@ impl BaseTable for RemoteTable { ) -> Result { self.check_mutable().await?; + let timeout = params.timeout; + let query = MergeInsertRequest::try_from(params)?; - let request = self + let mut request = self .client .post(&format!("/v1/table/{}/merge_insert/", self.name)) .query(&query) .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE); + if let Some(timeout) = timeout { + // (If it doesn't fit into u64, it's not worth sending anyways.) + if let Ok(timeout_ms) = u64::try_from(timeout.as_millis()) { + request = request.header(REQUEST_TIMEOUT_HEADER, timeout_ms); + } + } + let (request_id, response) = self.send_streaming(request, new_data, true).await?; let response = self.check_table_response(&request_id, response).await?; diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index febe6762..f18fad6c 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -14,7 +14,7 @@ use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::ExecutionPlan; -use futures::{StreamExt, TryStreamExt}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use lance::dataset::builder::DatasetBuilder; use lance::dataset::cleanup::RemovalStats; use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions}; @@ -80,7 +80,7 @@ pub mod merge; use crate::index::waiter::wait_for_index; pub use chrono::Duration; -use futures::future::join_all; +use futures::future::{join_all, Either}; pub use lance::dataset::optimize::CompactionOptions; pub use lance::dataset::refs::{TagContents, Tags as LanceTags}; pub use lance::dataset::scanner::DatasetRecordBatchStream; @@ -2014,7 +2014,7 @@ impl NativeTable { /// more information. pub async fn uses_v2_manifest_paths(&self) -> Result { let dataset = self.dataset.get().await?; - Ok(dataset.manifest_naming_scheme == ManifestNamingScheme::V2) + Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2) } /// Migrate the table to use the new manifest path scheme. @@ -2475,8 +2475,26 @@ impl BaseTable for NativeTable { } else { builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep); } - let job = builder.try_build()?; - let (new_dataset, stats) = job.execute_reader(new_data).await?; + + let future = if let Some(timeout) = params.timeout { + // The default retry timeout is 30s, so we pass the full timeout down + // as well in case it is longer than that. + let future = builder + .retry_timeout(timeout) + .try_build()? + .execute_reader(new_data); + Either::Left(tokio::time::timeout(timeout, future).map(|res| match res { + Ok(Ok((new_dataset, stats))) => Ok((new_dataset, stats)), + Ok(Err(e)) => Err(e.into()), + Err(_) => Err(Error::Runtime { + message: "merge insert timed out".to_string(), + }), + })) + } else { + let job = builder.try_build()?; + Either::Right(job.execute_reader(new_data).map_err(|e| e.into())) + }; + let (new_dataset, stats) = future.await?; let version = new_dataset.manifest().version; self.dataset.set_latest(new_dataset.as_ref().clone()).await; Ok(MergeResult { diff --git a/rust/lancedb/src/table/merge.rs b/rust/lancedb/src/table/merge.rs index 130800fc..376bd3da 100644 --- a/rust/lancedb/src/table/merge.rs +++ b/rust/lancedb/src/table/merge.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The LanceDB Authors -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use arrow_array::RecordBatchReader; @@ -21,6 +21,7 @@ pub struct MergeInsertBuilder { pub(crate) when_not_matched_insert_all: bool, pub(crate) when_not_matched_by_source_delete: bool, pub(crate) when_not_matched_by_source_delete_filt: Option, + pub(crate) timeout: Option, } impl MergeInsertBuilder { @@ -33,6 +34,7 @@ impl MergeInsertBuilder { when_not_matched_insert_all: false, when_not_matched_by_source_delete: false, when_not_matched_by_source_delete_filt: None, + timeout: None, } } @@ -84,6 +86,21 @@ impl MergeInsertBuilder { self } + /// Maximum time to run the operation before cancelling it. + /// + /// By default, there is a 30-second timeout that is only enforced after the + /// first attempt. This is to prevent spending too long retrying to resolve + /// conflicts. For example, if a write attempt takes 20 seconds and fails, + /// the second attempt will be cancelled after 10 seconds, hitting the + /// 30-second timeout. However, a write that takes one hour and succeeds on the + /// first attempt will not be cancelled. + /// + /// When this is set, the timeout is enforced on all attempts, including the first. + pub fn timeout(&mut self, timeout: Duration) -> &mut Self { + self.timeout = Some(timeout); + self + } + /// Executes the merge insert operation /// /// Returns version and statistics about the merge operation including the number of rows