feat(query): pass runtime builder to provider

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-07-01 01:56:52 +08:00
parent 451682ad13
commit ff2d89dc86
2 changed files with 37 additions and 25 deletions

View File

@@ -24,6 +24,7 @@ use crate::query_engine::state::MetricsMemoryPool;
pub type QueryRuntimeProviderRef = Arc<dyn QueryRuntimeProvider>;
/// Context for building query runtime components.
#[derive(Clone, Copy)]
#[non_exhaustive]
pub struct QueryRuntimeContext<'a> {
/// Query options used by the query engine.
@@ -32,6 +33,16 @@ pub struct QueryRuntimeContext<'a> {
pub resolved_memory_pool_size: usize,
}
impl<'a> QueryRuntimeContext<'a> {
/// Creates a new query runtime context.
pub fn new(query_options: &'a QueryOptions, resolved_memory_pool_size: usize) -> Self {
Self {
query_options,
resolved_memory_pool_size,
}
}
}
/// Provides DataFusion session and runtime setup for the query engine.
pub trait QueryRuntimeProvider: Send + Sync + 'static {
/// Configures the DataFusion session config before building the session state.
@@ -39,26 +50,30 @@ pub trait QueryRuntimeProvider: Send + Sync + 'static {
}
/// Builds the DataFusion runtime environment.
fn build_runtime_env(&self, ctx: QueryRuntimeContext<'_>) -> Arc<RuntimeEnv>;
fn build_runtime_env(
&self,
_ctx: QueryRuntimeContext<'_>,
builder: RuntimeEnvBuilder,
) -> Arc<RuntimeEnv> {
Arc::new(builder.build().expect("Failed to build RuntimeEnv"))
}
}
/// Default query runtime provider.
#[derive(Debug, Default)]
pub struct DefaultQueryRuntimeProvider;
impl QueryRuntimeProvider for DefaultQueryRuntimeProvider {
fn build_runtime_env(&self, ctx: QueryRuntimeContext<'_>) -> Arc<RuntimeEnv> {
impl DefaultQueryRuntimeProvider {
/// Creates a default DataFusion runtime environment builder.
pub fn runtime_env_builder(ctx: QueryRuntimeContext<'_>) -> RuntimeEnvBuilder {
if ctx.resolved_memory_pool_size > 0 {
Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(MetricsMemoryPool::new(
ctx.resolved_memory_pool_size,
)))
.build()
.expect("Failed to build RuntimeEnv"),
)
RuntimeEnvBuilder::new().with_memory_pool(Arc::new(MetricsMemoryPool::new(
ctx.resolved_memory_pool_size,
)))
} else {
Arc::new(RuntimeEnv::default())
RuntimeEnvBuilder::new()
}
}
}
impl QueryRuntimeProvider for DefaultQueryRuntimeProvider {}

View File

@@ -144,17 +144,10 @@ impl QueryEngineState {
.execution
.skip_physical_aggregate_schema_check = true;
runtime_provider.configure_session_config(
QueryRuntimeContext {
query_options: &options,
resolved_memory_pool_size: memory_pool_size,
},
&mut session_config,
);
let runtime_env = runtime_provider.build_runtime_env(QueryRuntimeContext {
query_options: &options,
resolved_memory_pool_size: memory_pool_size,
});
let runtime_context = QueryRuntimeContext::new(&options, memory_pool_size);
runtime_provider.configure_session_config(runtime_context, &mut session_config);
let runtime_builder = DefaultQueryRuntimeProvider::runtime_env_builder(runtime_context);
let runtime_env = runtime_provider.build_runtime_env(runtime_context, runtime_builder);
// Apply extension rules
let mut extension_rules = Vec::new();
@@ -670,11 +663,15 @@ mod tests {
*config = config.clone().with_target_partitions(7);
}
fn build_runtime_env(&self, ctx: QueryRuntimeContext<'_>) -> Arc<RuntimeEnv> {
fn build_runtime_env(
&self,
ctx: QueryRuntimeContext<'_>,
builder: RuntimeEnvBuilder,
) -> Arc<RuntimeEnv> {
assert_eq!(ctx.resolved_memory_pool_size, 1024);
self.build_called.store(true, Ordering::SeqCst);
Arc::new(
RuntimeEnvBuilder::new()
builder
.with_memory_pool(Arc::new(GreedyMemoryPool::new(2048)))
.build()
.unwrap(),