From ff2d89dc86c988ed052ae45219faa16b0ae55f49 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 1 Jul 2026 01:56:52 +0800 Subject: [PATCH] feat(query): pass runtime builder to provider Signed-off-by: discord9 --- src/query/src/query_engine/runtime.rs | 39 ++++++++++++++++++--------- src/query/src/query_engine/state.rs | 23 +++++++--------- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/src/query/src/query_engine/runtime.rs b/src/query/src/query_engine/runtime.rs index a8dece0be1..6f1ff5ccc2 100644 --- a/src/query/src/query_engine/runtime.rs +++ b/src/query/src/query_engine/runtime.rs @@ -24,6 +24,7 @@ use crate::query_engine::state::MetricsMemoryPool; pub type QueryRuntimeProviderRef = Arc; /// Context for building query runtime components. +#[derive(Clone, Copy)] #[non_exhaustive] pub struct QueryRuntimeContext<'a> { /// Query options used by the query engine. @@ -32,6 +33,16 @@ pub struct QueryRuntimeContext<'a> { pub resolved_memory_pool_size: usize, } +impl<'a> QueryRuntimeContext<'a> { + /// Creates a new query runtime context. + pub fn new(query_options: &'a QueryOptions, resolved_memory_pool_size: usize) -> Self { + Self { + query_options, + resolved_memory_pool_size, + } + } +} + /// Provides DataFusion session and runtime setup for the query engine. pub trait QueryRuntimeProvider: Send + Sync + 'static { /// Configures the DataFusion session config before building the session state. @@ -39,26 +50,30 @@ pub trait QueryRuntimeProvider: Send + Sync + 'static { } /// Builds the DataFusion runtime environment. - fn build_runtime_env(&self, ctx: QueryRuntimeContext<'_>) -> Arc; + fn build_runtime_env( + &self, + _ctx: QueryRuntimeContext<'_>, + builder: RuntimeEnvBuilder, + ) -> Arc { + Arc::new(builder.build().expect("Failed to build RuntimeEnv")) + } } /// Default query runtime provider. #[derive(Debug, Default)] pub struct DefaultQueryRuntimeProvider; -impl QueryRuntimeProvider for DefaultQueryRuntimeProvider { - fn build_runtime_env(&self, ctx: QueryRuntimeContext<'_>) -> Arc { +impl DefaultQueryRuntimeProvider { + /// Creates a default DataFusion runtime environment builder. + pub fn runtime_env_builder(ctx: QueryRuntimeContext<'_>) -> RuntimeEnvBuilder { if ctx.resolved_memory_pool_size > 0 { - Arc::new( - RuntimeEnvBuilder::new() - .with_memory_pool(Arc::new(MetricsMemoryPool::new( - ctx.resolved_memory_pool_size, - ))) - .build() - .expect("Failed to build RuntimeEnv"), - ) + RuntimeEnvBuilder::new().with_memory_pool(Arc::new(MetricsMemoryPool::new( + ctx.resolved_memory_pool_size, + ))) } else { - Arc::new(RuntimeEnv::default()) + RuntimeEnvBuilder::new() } } } + +impl QueryRuntimeProvider for DefaultQueryRuntimeProvider {} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index e4612baf6c..2e9391ddd3 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -144,17 +144,10 @@ impl QueryEngineState { .execution .skip_physical_aggregate_schema_check = true; - runtime_provider.configure_session_config( - QueryRuntimeContext { - query_options: &options, - resolved_memory_pool_size: memory_pool_size, - }, - &mut session_config, - ); - let runtime_env = runtime_provider.build_runtime_env(QueryRuntimeContext { - query_options: &options, - resolved_memory_pool_size: memory_pool_size, - }); + let runtime_context = QueryRuntimeContext::new(&options, memory_pool_size); + runtime_provider.configure_session_config(runtime_context, &mut session_config); + let runtime_builder = DefaultQueryRuntimeProvider::runtime_env_builder(runtime_context); + let runtime_env = runtime_provider.build_runtime_env(runtime_context, runtime_builder); // Apply extension rules let mut extension_rules = Vec::new(); @@ -670,11 +663,15 @@ mod tests { *config = config.clone().with_target_partitions(7); } - fn build_runtime_env(&self, ctx: QueryRuntimeContext<'_>) -> Arc { + fn build_runtime_env( + &self, + ctx: QueryRuntimeContext<'_>, + builder: RuntimeEnvBuilder, + ) -> Arc { assert_eq!(ctx.resolved_memory_pool_size, 1024); self.build_called.store(true, Ordering::SeqCst); Arc::new( - RuntimeEnvBuilder::new() + builder .with_memory_pool(Arc::new(GreedyMemoryPool::new(2048))) .build() .unwrap(),