From 44670076c1522223ae1b11cc1de7504ed90ea31d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 21 Apr 2025 14:27:04 -0700 Subject: [PATCH] fix: move timeout to avoid retries (#2347) I added a timeout to query execution options in https://github.com/lancedb/lancedb/pull/2288. However, this was send to the request timeout, but the retry implementation is unaware of this timeout. So once the query timed out, a retry would be triggered. Instead, this PR changes it so the timeout happens outside the retry loop. ## Summary by CodeRabbit - **Bug Fixes** - Improved query timeout handling to provide clearer error messages and more reliable cancellation if a query takes too long to complete. --- rust/lancedb/src/remote/table.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index a65fc1c17..4a01d2966 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -339,8 +339,6 @@ impl RemoteTable { let mut request = self.client.post(&format!("/v1/table/{}/query/", self.name)); if let Some(timeout) = options.timeout { - // Client side timeout - request = request.timeout(timeout); // Also send to server, so it can abort the query if it takes too long. // (If it doesn't fit into u64, it's not worth sending anyways.) if let Ok(timeout_ms) = u64::try_from(timeout.as_millis()) { @@ -358,8 +356,26 @@ impl RemoteTable { let (request_id, response) = self.client.send(req, true).await?; self.read_arrow_stream(&request_id, response).await }); - let streams = futures::future::try_join_all(futures).await?; - Ok(streams) + let streams = futures::future::try_join_all(futures); + + if let Some(timeout) = options.timeout { + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + tokio::pin!(streams); + tokio::select! { + _ = &mut timeout_future => { + Err(Error::Other { + message: format!("Query timeout after {} ms", timeout.as_millis()), + source: None, + }) + } + result = &mut streams => { + Ok(result?) + } + } + } else { + Ok(streams.await?) + } } async fn prepare_query_bodies(&self, query: &AnyQuery) -> Result> {