mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-26 16:30:41 +00:00
fix: move timeout to avoid retries (#2347)
I added a timeout to query execution options in https://github.com/lancedb/lancedb/pull/2288. However, this was send to the request timeout, but the retry implementation is unaware of this timeout. So once the query timed out, a retry would be triggered. Instead, this PR changes it so the timeout happens outside the retry loop. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Bug Fixes** - Improved query timeout handling to provide clearer error messages and more reliable cancellation if a query takes too long to complete. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
@@ -339,8 +339,6 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
let mut request = self.client.post(&format!("/v1/table/{}/query/", self.name));
|
||||
|
||||
if let Some(timeout) = options.timeout {
|
||||
// Client side timeout
|
||||
request = request.timeout(timeout);
|
||||
// Also send to server, so it can abort the query if it takes too long.
|
||||
// (If it doesn't fit into u64, it's not worth sending anyways.)
|
||||
if let Ok(timeout_ms) = u64::try_from(timeout.as_millis()) {
|
||||
@@ -358,8 +356,26 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
let (request_id, response) = self.client.send(req, true).await?;
|
||||
self.read_arrow_stream(&request_id, response).await
|
||||
});
|
||||
let streams = futures::future::try_join_all(futures).await?;
|
||||
Ok(streams)
|
||||
let streams = futures::future::try_join_all(futures);
|
||||
|
||||
if let Some(timeout) = options.timeout {
|
||||
let timeout_future = tokio::time::sleep(timeout);
|
||||
tokio::pin!(timeout_future);
|
||||
tokio::pin!(streams);
|
||||
tokio::select! {
|
||||
_ = &mut timeout_future => {
|
||||
Err(Error::Other {
|
||||
message: format!("Query timeout after {} ms", timeout.as_millis()),
|
||||
source: None,
|
||||
})
|
||||
}
|
||||
result = &mut streams => {
|
||||
Ok(result?)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(streams.await?)
|
||||
}
|
||||
}
|
||||
|
||||
async fn prepare_query_bodies(&self, query: &AnyQuery) -> Result<Vec<serde_json::Value>> {
|
||||
|
||||
Reference in New Issue
Block a user