feat: execution timeout for prepared statement (#7932)

* feat: execution timeout for prepared statement

* fix: lint fix
This commit is contained in:
Ning Sun
2026-04-10 03:18:56 +08:00
committed by GitHub
parent fb5333e116
commit e9d783cccf

View File

@@ -483,6 +483,27 @@ fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Durat
}
}
/// Derives timeout for plan execution. When statement is not available,
/// applies timeout for PostgreSQL only (can't determine readonly status without statement).
fn derive_timeout_for_plan(
stmt: Option<&Statement>,
query_ctx: &QueryContextRef,
) -> Option<Duration> {
match stmt {
Some(s) => derive_timeout(s, query_ctx),
None => {
let query_timeout = query_ctx.query_timeout()?;
if query_timeout.is_zero() {
return None;
}
match query_ctx.channel() {
Channel::Postgres => Some(query_timeout),
_ => None,
}
}
}
}
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
if timeout.is_zero() {
return StatementTimeoutSnafu.fail();
@@ -588,6 +609,33 @@ impl Instance {
}
}
async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
self.query_engine
.execute(plan, query_ctx)
.await
.context(ExecLogicalPlanSnafu)
}
async fn exec_plan_with_timeout(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
let timeout = derive_timeout_for_plan(stmt.as_ref(), &query_ctx);
match timeout {
Some(timeout) => {
let start = tokio::time::Instant::now();
let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
.await
.map_err(|_| StatementTimeoutSnafu.build())??;
let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
attach_timeout(output, remaining_timeout)
}
None => self.exec_plan(plan, query_ctx).await,
}
}
async fn do_exec_plan_inner(
&self,
stmt: Option<Statement>,
@@ -624,7 +672,7 @@ impl Instance {
slow_query_timer,
);
let query_fut = self.query_engine.execute(plan.clone(), query_ctx);
let query_fut = self.exec_plan_with_timeout(Some(stmt), plan, query_ctx);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
@@ -640,14 +688,8 @@ impl Instance {
};
Output { data, meta }
})
.context(ExecLogicalPlanSnafu)
} else {
// plan should be prepared before exec
// we'll do check there
self.query_engine
.execute(plan.clone(), query_ctx)
.await
.context(ExecLogicalPlanSnafu)
self.exec_plan_with_timeout(stmt, plan, query_ctx).await
}
}