feat: add query engine options (#5895)

* feat: add query engine options

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update example

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia, 2025-04-14 21:12:37 +08:00 (committed by GitHub)
parent c522893552, commit 747b71bf74
24 changed files with 195 additions and 14 deletions


@@ -96,6 +96,8 @@
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can run at the same time.<br/>If the number of running procedures exceeds this limit, new procedures will be rejected. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow workers in flownode.<br/>Not setting this value (or setting it to 0) will use half the number of CPU cores. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Defaults to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -270,6 +272,8 @@
| `meta_client.metadata_cache_max_capacity` | Integer | `100000` | The configuration about the cache of the metadata. |
| `meta_client.metadata_cache_ttl` | String | `10m` | TTL of the metadata cache. |
| `meta_client.metadata_cache_tti` | String | `5m` | -- |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Defaults to 0, which means the number of CPU cores. |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
| `datanode.client.connect_timeout` | String | `10s` | -- |
@@ -429,6 +433,8 @@
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries when reading the WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Defaults to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
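All three component tables (standalone, frontend, and datanode) gain the same two rows. As a usage sketch (the value `8` is illustrative, not a recommendation), capping the engine at eight parallel tasks in any of these config files would look like:

```toml
# Any positive value overrides the CPU-count default.
[query]
parallelism = 8
```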


@@ -243,6 +243,12 @@ overwrite_entry_start_id = false
# credential = "base64-credential"
# endpoint = "https://storage.googleapis.com"
## The query engine options.
[query]
## Parallelism of the query engine.
## Defaults to 0, which means the number of CPU cores.
parallelism = 0
## The data storage options.
[storage]
## The working home directory.


@@ -179,6 +179,12 @@ metadata_cache_ttl = "10m"
# TTI of the metadata cache.
metadata_cache_tti = "5m"
## The query engine options.
[query]
## Parallelism of the query engine.
## Defaults to 0, which means the number of CPU cores.
parallelism = 0
## Datanode options.
[datanode]
## Datanode client options.


@@ -334,6 +334,12 @@ max_running_procedures = 128
# credential = "base64-credential"
# endpoint = "https://storage.googleapis.com"
## The query engine options.
[query]
## Parallelism of the query engine.
## Defaults to 0, which means the number of CPU cores.
parallelism = 0
## The data storage options.
[storage]
## The working home directory.


@@ -26,6 +26,7 @@ use file_engine::config::EngineConfig as FileEngineConfig;
use meta_client::MetaClientOptions;
use metric_engine::config::EngineConfig as MetricEngineConfig;
use mito2::config::MitoConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
@@ -375,6 +376,7 @@ pub struct DatanodeOptions {
pub enable_telemetry: bool,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub query: QueryOptions,
/// Deprecated options, please use the new options instead.
#[deprecated(note = "Please use `grpc.addr` instead.")]
@@ -412,6 +414,7 @@ impl Default for DatanodeOptions {
enable_telemetry: true,
export_metrics: ExportMetricsOption::default(),
tracing: TracingOptions::default(),
query: QueryOptions::default(),
// Deprecated options
rpc_addr: None,


@@ -359,6 +359,7 @@ impl DatanodeBuilder {
None,
false,
self.plugins.clone(),
opts.query.clone(),
);
let query_engine = query_engine_factory.query_engine();


@@ -32,6 +32,7 @@ use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
@@ -109,6 +110,7 @@ pub struct FlownodeOptions {
pub logging: LoggingOptions,
pub tracing: TracingOptions,
pub heartbeat: HeartbeatOptions,
pub query: QueryOptions,
}
impl Default for FlownodeOptions {
@@ -122,6 +124,7 @@ impl Default for FlownodeOptions {
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
heartbeat: HeartbeatOptions::default(),
query: QueryOptions::default(),
}
}
}


@@ -332,6 +332,7 @@ impl FlownodeBuilder {
None,
false,
Default::default(),
self.opts.query.clone(),
);
let manager = Arc::new(
self.build_manager(query_engine_factory.query_engine())


@@ -23,6 +23,7 @@ use datatypes::timestamp::TimestampMillisecond;
use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef};
use itertools::Itertools;
use prost::Message;
use query::options::QueryOptions;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
@@ -146,7 +147,15 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let factory = query::QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
);
let engine = factory.query_engine();
register_function_to_query_engine(&engine);


@@ -171,6 +171,7 @@ mod test {
use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef};
use itertools::Itertools;
use prost::Message;
use query::options::QueryOptions;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
@@ -263,7 +264,15 @@ mod test {
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let factory = query::QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
);
let engine = factory.query_engine();
register_function_to_query_engine(&engine);


@@ -19,6 +19,7 @@ use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
@@ -58,6 +59,7 @@ pub struct FrontendOptions {
pub user_provider: Option<String>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub query: QueryOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
}
@@ -82,6 +84,7 @@ impl Default for FrontendOptions {
user_provider: None,
export_metrics: ExportMetricsOption::default(),
tracing: TracingOptions::default(),
query: QueryOptions::default(),
max_in_flight_write_bytes: None,
}
}


@@ -166,6 +166,7 @@ impl FrontendBuilder {
Some(Arc::new(flow_service)),
true,
plugins.clone(),
self.options.query.clone(),
)
.query_engine();


@@ -567,6 +567,7 @@ mod tests {
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
use crate::options::QueryOptions;
use crate::parser::QueryLanguageParser;
use crate::query_engine::{QueryEngineFactory, QueryEngineRef};
@@ -581,7 +582,16 @@ mod tests {
};
catalog_manager.register_table_sync(req).unwrap();
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine()
QueryEngineFactory::new(
catalog_manager,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
}
#[tokio::test]


@@ -29,6 +29,7 @@ pub mod executor;
pub mod log_query;
pub mod metrics;
mod optimizer;
pub mod options;
pub mod parser;
mod part_sort;
pub mod physical_wrapper;

src/query/src/options.rs (new file, 30 lines)

@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
/// Query engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct QueryOptions {
/// Parallelism of the query engine. Defaults to 0, which implies the number of logical CPUs.
pub parallelism: usize,
}
#[allow(clippy::derivable_impls)]
impl Default for QueryOptions {
fn default() -> Self {
Self { parallelism: 0 }
}
}
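Because the struct derives `Deserialize` with a container-level `#[serde(default)]`, a missing or partial `[query]` section deserializes cleanly to the defaults. A minimal round-trip sketch, assuming the `toml` crate as the deserializer (values are illustrative):

```rust
use query::options::QueryOptions;

fn main() {
    // An explicit value overrides the derived default.
    let opts: QueryOptions = toml::from_str("parallelism = 4").unwrap();
    assert_eq!(opts.parallelism, 4);

    // An empty document falls back to Default (0 = use the CPU count).
    let opts: QueryOptions = toml::from_str("").unwrap();
    assert_eq!(opts.parallelism, 0);
}
```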


@@ -38,6 +38,7 @@ use table::TableRef;
use crate::dataframe::DataFrame;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::options::QueryOptions;
use crate::planner::LogicalPlanner;
pub use crate::query_engine::context::QueryEngineContext;
pub use crate::query_engine::state::QueryEngineState;
@@ -106,6 +107,7 @@ impl QueryEngineFactory {
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
flow_service_handler: Option<FlowServiceHandlerRef>,
with_dist_planner: bool,
options: QueryOptions,
) -> Self {
Self::new_with_plugins(
catalog_manager,
@@ -115,9 +117,11 @@ impl QueryEngineFactory {
flow_service_handler,
with_dist_planner,
Default::default(),
options,
)
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_plugins(
catalog_manager: CatalogManagerRef,
region_query_handler: Option<RegionQueryHandlerRef>,
@@ -126,6 +130,7 @@ impl QueryEngineFactory {
flow_service_handler: Option<FlowServiceHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
options: QueryOptions,
) -> Self {
let state = Arc::new(QueryEngineState::new(
catalog_manager,
@@ -135,6 +140,7 @@ impl QueryEngineFactory {
flow_service_handler,
with_dist_planner,
plugins.clone(),
options,
));
let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
register_functions(&query_engine);
@@ -166,7 +172,15 @@ mod tests {
#[test]
fn test_query_engine_factory() {
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let factory = QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
);
let engine = factory.query_engine();
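For a call site that opts out of the CPU-count default, a hypothetical construction (mirroring the test above, with an illustrative parallelism of 4) would be:

```rust
use query::options::QueryOptions;
use query::QueryEngineFactory;

// Sketch: pin the engine's target partition count to 4 instead of the CPU count.
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
let factory = QueryEngineFactory::new(
    catalog_list,
    None,
    None,
    None,
    None,
    false,
    QueryOptions { parallelism: 4 },
);
let engine = factory.query_engine();
```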


@@ -75,6 +75,7 @@ impl QueryEngineContext {
use common_base::Plugins;
use session::context::QueryContext;
use crate::options::QueryOptions;
use crate::query_engine::QueryEngineState;
let state = Arc::new(QueryEngineState::new(
@@ -85,6 +86,7 @@ impl QueryEngineContext {
None,
false,
Plugins::default(),
QueryOptions::default(),
));
QueryEngineContext::new(state.session_state(), QueryContext::arc())


@@ -159,6 +159,7 @@ mod tests {
use super::*;
use crate::dummy_catalog::DummyCatalogList;
use crate::optimizer::test_util::mock_table_provider;
use crate::options::QueryOptions;
use crate::QueryEngineFactory;
fn mock_plan(schema: SchemaRef) -> LogicalPlan {
@@ -177,7 +178,15 @@ mod tests {
#[tokio::test]
async fn test_serializer_decode_plan() {
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let factory = QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
);
let engine = factory.query_engine();


@@ -54,6 +54,7 @@ use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
use crate::optimizer::ExtensionAnalyzerRule;
use crate::options::QueryOptions as QueryOptionsNew;
use crate::query_engine::options::QueryOptions;
use crate::query_engine::DefaultSerializer;
use crate::range_select::planner::RangeSelectPlanner;
@@ -81,6 +82,7 @@ impl fmt::Debug for QueryEngineState {
}
impl QueryEngineState {
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog_list: CatalogManagerRef,
region_query_handler: Option<RegionQueryHandlerRef>,
@@ -89,9 +91,13 @@ impl QueryEngineState {
flow_service_handler: Option<FlowServiceHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
options: QueryOptionsNew,
) -> Self {
let runtime_env = Arc::new(RuntimeEnv::default());
let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
if options.parallelism > 0 {
session_config = session_config.with_target_partitions(options.parallelism);
}
// todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
// and we can add that check back once we upgrade datafusion.
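The guard above is the core of the feature: a positive `parallelism` overrides DataFusion's `target_partitions`, while 0 leaves the `SessionConfig` untouched so DataFusion's own default (the number of logical CPUs) applies. The same pattern in isolation, assuming the `datafusion` crate:

```rust
use datafusion::prelude::SessionConfig;

// 0 means "keep DataFusion's default target_partitions (logical CPU count)";
// any positive value overrides it.
fn apply_parallelism(parallelism: usize) -> SessionConfig {
    let mut config = SessionConfig::new();
    if parallelism > 0 {
        config = config.with_target_partitions(parallelism);
    }
    config
}
```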


@@ -611,6 +611,7 @@ mod test {
use table::test_util::EmptyTable;
use super::*;
use crate::options::QueryOptions;
use crate::parser::QueryLanguageParser;
use crate::{QueryEngineFactory, QueryEngineRef};
@@ -663,7 +664,16 @@ mod test {
table,
})
.is_ok());
QueryEngineFactory::new(catalog_list, None, None, None, None, false).query_engine()
QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
}
async fn do_query(sql: &str) -> Result<LogicalPlan> {


@@ -18,6 +18,7 @@ use common_recordbatch::{util, RecordBatch};
use session::context::QueryContext;
use table::TableRef;
use crate::options::QueryOptions;
use crate::parser::QueryLanguageParser;
use crate::{QueryEngineFactory, QueryEngineRef};
@@ -46,5 +47,14 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine()
QueryEngineFactory::new(
catalog_manager,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
}


@@ -33,6 +33,7 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::test_util::MemTable;
use crate::error::{QueryExecutionSnafu, Result};
use crate::options::QueryOptions as QueryOptionsNew;
use crate::parser::QueryLanguageParser;
use crate::query_engine::options::QueryOptions;
use crate::query_engine::QueryEngineFactory;
@@ -43,7 +44,15 @@ async fn test_datafusion_query_engine() -> Result<()> {
let catalog_list = catalog::memory::new_memory_catalog_manager()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let factory = QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let factory = QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptionsNew::default(),
);
let engine = factory.query_engine();
let column_schemas = vec![ColumnSchema::new(
@@ -122,8 +131,16 @@ async fn test_query_validate() -> Result<()> {
disallow_cross_catalog_query: true,
});
let factory =
QueryEngineFactory::new_with_plugins(catalog_list, None, None, None, None, false, plugins);
let factory = QueryEngineFactory::new_with_plugins(
catalog_list,
None,
None,
None,
None,
false,
plugins,
QueryOptionsNew::default(),
);
let engine = factory.query_engine();
let stmt =


@@ -33,6 +33,7 @@ use table::predicate::build_time_range_predicate;
use table::test_util::MemTable;
use table::{Table, TableRef};
use crate::options::QueryOptions;
use crate::tests::exec_selection;
use crate::{QueryEngineFactory, QueryEngineRef};
@@ -102,8 +103,16 @@ fn create_test_engine() -> TimeRangeTester {
};
let _ = catalog_manager.register_table_sync(req).unwrap();
let engine =
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine();
let engine = QueryEngineFactory::new(
catalog_manager,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine();
TimeRangeTester { engine, filter }
}


@@ -21,6 +21,7 @@ use catalog::memory::MemoryCatalogManager;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use datafusion_expr::LogicalPlan;
use query::options::QueryOptions;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::{QueryEngineFactory, QueryEngineRef};
@@ -158,8 +159,16 @@ impl GrpcQueryHandler for DummyInstance {
fn create_testing_instance(table: TableRef) -> DummyInstance {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
let query_engine =
QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine();
let query_engine = QueryEngineFactory::new(
catalog_manager,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine();
DummyInstance::new(query_engine)
}