From e9d783cccf6b41596b4537ac102b9e757a462da8 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 10 Apr 2026 03:18:56 +0800 Subject: [PATCH] feat: execution timeout for prepared statement (#7932) * feat: execution timeout for prepared statement * fix: lint fix --- src/frontend/src/instance.rs | 58 +++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ce589bb677..99444bb2a2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -483,6 +483,27 @@ fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option, + query_ctx: &QueryContextRef, +) -> Option { + match stmt { + Some(s) => derive_timeout(s, query_ctx), + None => { + let query_timeout = query_ctx.query_timeout()?; + if query_timeout.is_zero() { + return None; + } + match query_ctx.channel() { + Channel::Postgres => Some(query_timeout), + _ => None, + } + } + } +} + fn attach_timeout(output: Output, mut timeout: Duration) -> Result { if timeout.is_zero() { return StatementTimeoutSnafu.fail(); @@ -588,6 +609,33 @@ impl Instance { } } + async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result { + self.query_engine + .execute(plan, query_ctx) + .await + .context(ExecLogicalPlanSnafu) + } + + async fn exec_plan_with_timeout( + &self, + stmt: Option, + plan: LogicalPlan, + query_ctx: QueryContextRef, + ) -> Result { + let timeout = derive_timeout_for_plan(stmt.as_ref(), &query_ctx); + match timeout { + Some(timeout) => { + let start = tokio::time::Instant::now(); + let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx)) + .await + .map_err(|_| StatementTimeoutSnafu.build())??; + let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default(); + attach_timeout(output, remaining_timeout) + } + None => self.exec_plan(plan, query_ctx).await, + } + } + async fn do_exec_plan_inner( &self, stmt: Option, @@ -624,7 +672,7 @@ impl Instance { slow_query_timer, ); - let query_fut = self.query_engine.execute(plan.clone(), query_ctx); + let query_fut = self.exec_plan_with_timeout(Some(stmt), plan, query_ctx); CancellableFuture::new(query_fut, ticket.cancellation_handle.clone()) .await @@ -640,14 +688,8 @@ impl Instance { }; Output { data, meta } }) - .context(ExecLogicalPlanSnafu) } else { - // plan should be prepared before exec - // we'll do check there - self.query_engine - .execute(plan.clone(), query_ctx) - .await - .context(ExecLogicalPlanSnafu) + self.exec_plan_with_timeout(stmt, plan, query_ctx).await } }