feat: separate datanode query and ingestion runtimes (#8246)

* feat: add datanode runtime options

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: add datanode runtime handles

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: wire datanode runtimes into region server

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: route datanode ingestion to ingestion runtime

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: add datanode query runtime stream bridge

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: route datanode reads to query runtime

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: add datanode global runtimes

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: use common datanode runtimes

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: run mito scan tasks on query runtime

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: split datanode runtime options

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: share global fallback for datanode runtimes

Use the global runtime as the fallback for datanode query and ingestion
runtimes when datanode-specific pools are not initialized. This avoids
creating unused datanode worker pools in non-datanode services.

Files:
- `src/common/runtime/src/global.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: docs

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: forward query runtime stream metrics

Forward inner stream metrics through the datanode query runtime bridge so
`EXPLAIN ANALYZE` can report plan metrics after stream polling moves to the
query runtime.

Files:
- `src/datanode/src/query_stream.rs`
- `src/datanode/src/region_server.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: route metric batch puts to ingest runtime

Run the optimized metric batch put path on the datanode ingest runtime so
metric ingestion does not bypass runtime isolation.

Files:
- `src/datanode/src/region_server.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: abort query producer on stream drop

Abort the datanode query runtime producer when the returned read stream is
dropped so cancelled clients do not leave query work running in the
background.

Files:
- `src/datanode/src/query_stream.rs`
- `src/datanode/src/region_server.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: simplify query stream bridge setup

Create the inner read stream before spawning the datanode query runtime
producer so setup does not use an extra task and initialization channel.

Files:
- `src/datanode/src/region_server.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/runtime-priority:
 ### Update Datanode Runtime Options and Region Server Logic

 - **`global.rs`**: Adjusted `datanode_ingest_rt_size` to utilize all available CPUs for improved performance.
 - **`region_server.rs`**: Simplified the collection of `put_requests` and optimized the `put_regions_batch` call for better efficiency.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/runtime-priority:
 ### Remove Redundant Checks and Simplify Code

 - **`global.rs`**: Removed the assertion check for already initialized global runtimes to streamline the initialization process.
 - **`region_server.rs`**: Simplified the extraction of `Put` requests by removing unnecessary cloning and restructuring the iterator logic.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: remove redundant spawn_datanode_query in RegionServer::handle_read

The outer `spawn_datanode_query` wrapped `handle_read_inner` on the
same runtime, creating a nested spawn that consumed query runtime
threads unnecessarily under concurrent read load. The gRPC handler
already provides runtime isolation, so the inner call is sufficient.

- `src/datanode/src/region_server.rs` — inline `handle_read_inner`
  directly instead of spawning onto the datanode query runtime

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: resolve test mismatch and redundant spawn in handle_remote_read

- `src/common/runtime/src/global.rs` — update test assertion to match
  default `datanode_ingest_rt_size` of `cpus` instead of `1`
- `src/datanode/src/region_server.rs` — inline `handle_remote_read_inner`
  directly instead of spawning onto the datanode query runtime

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: rename datanode runtimes

Summary:
- Rename datanode runtime APIs from `datanode_query` and `datanode_ingest` to `query` and `ingest`.
- Rename runtime config keys from `datanode_query_rt_size` and `datanode_ingest_rt_size` to `query_rt_size` and `ingest_rt_size`.
- Update config docs, example config, and config-loading coverage.

Files:
- `src/common/runtime/src/global.rs`
- `src/common/runtime/src/lib.rs`
- `src/cmd/tests/load_config_test.rs`
- `src/datanode/src/region_server.rs`
- `src/mito2/src/read/pruner.rs`
- `src/mito2/src/read/range_cache.rs`
- `src/mito2/src/read/scan_region.rs`
- `src/mito2/src/read/series_scan.rs`
- `config/datanode.example.toml`
- `config/config.md`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: consolidate runtime options

Summary:
- Embed datanode runtime sizes in shared `RuntimeOptions` and remove the extra `GreptimeOptions` runtime type parameter.
- Use the unified `RuntimeOptions` for datanode global and datanode-specific runtime initialization.
- Update datanode runtime config coverage and ingest runtime default documentation.

Files:
- `src/common/runtime/src/global.rs`
- `src/common/runtime/src/lib.rs`
- `src/cmd/src/options.rs`
- `src/cmd/src/datanode.rs`
- `src/cmd/src/datanode/builder.rs`
- `src/cmd/tests/load_config_test.rs`
- `config/datanode.example.toml`
- `config/config.md`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: guard against double initialization of datanode runtimes

Add an assertion in `init_datanode_runtimes` to panic when global runtimes
are already initialized, preventing silent overwrites.

- `src/common/runtime/src/global.rs` — assert guard in `init_datanode_runtimes`
  and test `test_set_datanode_runtimes_panics_after_global_runtimes_initialized`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-06-09 16:21:50 +08:00
committed by GitHub
parent 1451a51ea0
commit e74a73638d
16 changed files with 482 additions and 39 deletions

View File

@@ -472,6 +472,8 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `runtime.query_rt_size` | Integer | `7` | The number of threads to execute datanode query operations.<br/>Defaults to max(num_cpus - 1, 1). |
| `runtime.ingest_rt_size` | Integer | `8` | The number of threads to execute datanode ingestion operations. |
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |

View File

@@ -82,6 +82,11 @@ watch = false
#+ global_rt_size = 8
## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4
## The number of threads to execute datanode query operations.
## Defaults to max(num_cpus - 1, 1).
#+ query_rt_size = 7
## The number of threads to execute datanode ingestion operations.
#+ ingest_rt_size = 8
## The metasrv client options.
[meta_client]

View File

@@ -62,6 +62,7 @@ impl InstanceBuilder {
async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result<Vec<WorkerGuard>> {
common_runtime::init_global_runtimes(&opts.runtime);
common_runtime::init_datanode_runtimes(&opts.runtime);
let dn_opts = &mut opts.component;
let guard = common_telemetry::init_global_logging(

View File

@@ -49,7 +49,10 @@ pub struct GreptimeOptions<T> {
pub component: T,
}
impl<T: Configurable> Configurable for GreptimeOptions<T> {
impl<T> Configurable for GreptimeOptions<T>
where
T: Configurable,
{
fn env_list_keys() -> Option<&'static [&'static str]> {
T::env_list_keys()
}

View File

@@ -37,6 +37,24 @@ use servers::tls::{TlsMode, TlsOption};
use standalone::options::StandaloneOptions;
use store_api::path_utils::WAL_DIR;
#[test]
fn test_load_datanode_runtime_options_from_runtime_section() {
let toml = r#"
[runtime]
global_rt_size = 8
compact_rt_size = 4
ingest_rt_size = 8
query_rt_size = 7
"#;
let options: GreptimeOptions<DatanodeOptions> = toml::from_str(toml).unwrap();
assert_eq!(8, options.runtime.global_rt_size);
assert_eq!(4, options.runtime.compact_rt_size);
assert_eq!(8, options.runtime.ingest_rt_size);
assert_eq!(7, options.runtime.query_rt_size);
}
#[allow(deprecated)]
#[test]
fn test_load_datanode_example_config() {

View File

@@ -30,11 +30,16 @@ const HB_WORKERS: usize = 2;
/// The options for the global runtimes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RuntimeOptions {
/// The number of threads for the global default runtime.
pub global_rt_size: usize,
/// The number of threads to execute the runtime for compact operations.
pub compact_rt_size: usize,
/// The number of threads to execute datanode query operations.
pub query_rt_size: usize,
/// The number of threads to execute datanode ingestion operations.
pub ingest_rt_size: usize,
}
impl Default for RuntimeOptions {
@@ -43,6 +48,8 @@ impl Default for RuntimeOptions {
Self {
global_rt_size: cpus,
compact_rt_size: usize::max(cpus / 2, 1),
query_rt_size: usize::max(cpus.saturating_sub(1), 1),
ingest_rt_size: cpus,
}
}
}
@@ -63,6 +70,8 @@ struct GlobalRuntimes {
global_runtime: Runtime,
compact_runtime: Runtime,
hb_runtime: Runtime,
query_runtime: Runtime,
ingest_runtime: Runtime,
}
macro_rules! define_spawn {
@@ -96,15 +105,29 @@ impl GlobalRuntimes {
define_spawn!(global);
define_spawn!(compact);
define_spawn!(hb);
define_spawn!(query);
define_spawn!(ingest);
fn new(
global: Option<Runtime>,
compact: Option<Runtime>,
heartbeat: Option<Runtime>,
query: Option<Runtime>,
ingest: Option<Runtime>,
) -> Self {
let global_runtime =
global.unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS));
let query_runtime = query.unwrap_or_else(|| global_runtime.clone());
let ingest_runtime = ingest.unwrap_or_else(|| global_runtime.clone());
fn new(global: Option<Runtime>, compact: Option<Runtime>, heartbeat: Option<Runtime>) -> Self {
Self {
global_runtime: global
.unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)),
global_runtime,
compact_runtime: compact
.unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)),
hb_runtime: heartbeat
.unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)),
query_runtime,
ingest_runtime,
}
}
}
@@ -114,6 +137,8 @@ struct ConfigRuntimes {
global_runtime: Option<Runtime>,
compact_runtime: Option<Runtime>,
hb_runtime: Option<Runtime>,
query_runtime: Option<Runtime>,
ingest_runtime: Option<Runtime>,
already_init: bool,
}
@@ -122,9 +147,11 @@ static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
let global = c.global_runtime.take();
let compact = c.compact_runtime.take();
let heartbeat = c.hb_runtime.take();
let query = c.query_runtime.take();
let ingest = c.ingest_runtime.take();
c.already_init = true;
GlobalRuntimes::new(global, compact, heartbeat)
GlobalRuntimes::new(global, compact, heartbeat, query, ingest)
});
static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
@@ -150,7 +177,30 @@ pub fn init_global_runtimes(options: &RuntimeOptions) {
"compact-worker",
options.compact_rt_size,
));
c.hb_runtime = Some(create_runtime("hreartbeat", "hb-worker", HB_WORKERS));
c.hb_runtime = Some(create_runtime("heartbeat", "hb-worker", HB_WORKERS));
});
}
/// Initialize the datanode-specific global runtimes.
///
/// # Panics
/// Panics when the global runtimes are already initialized.
/// You should call this function before using any runtime functions.
pub fn init_datanode_runtimes(options: &RuntimeOptions) {
static START: Once = Once::new();
START.call_once(move || {
let mut c = CONFIG_RUNTIMES.lock().unwrap();
assert!(!c.already_init, "Global runtimes already initialized");
c.query_runtime = Some(create_runtime(
"query",
"query-worker",
options.query_rt_size,
));
c.ingest_runtime = Some(create_runtime(
"ingest",
"ingest-worker",
options.ingest_rt_size,
));
});
}
@@ -191,6 +241,8 @@ macro_rules! define_global_runtime_spawn {
define_global_runtime_spawn!(global);
define_global_runtime_spawn!(compact);
define_global_runtime_spawn!(hb);
define_global_runtime_spawn!(query);
define_global_runtime_spawn!(ingest);
#[cfg(test)]
mod tests {
@@ -198,6 +250,41 @@ mod tests {
use super::*;
#[test]
fn test_datanode_runtime_options_default() {
let options = RuntimeOptions::default();
let cpus = num_cpus::get();
assert_eq!(cpus, options.global_rt_size);
assert_eq!(usize::max(cpus / 2, 1), options.compact_rt_size);
assert_eq!(usize::max(cpus.saturating_sub(1), 1), options.query_rt_size);
assert_eq!(cpus, options.ingest_rt_size);
}
#[test]
fn test_datanode_runtimes_fallback_to_global_runtime() {
let runtimes = GlobalRuntimes::new(
Some(create_runtime("test-global", "test-global-worker", 1)),
None,
None,
None,
None,
);
assert_eq!("test-global", runtimes.global_runtime.name());
assert_eq!("test-global", runtimes.query_runtime.name());
assert_eq!("test-global", runtimes.ingest_runtime.name());
}
#[test]
fn test_datanode_runtime_spawn_block_on() {
let handle = spawn_query(async { 1 + 1 });
assert_eq!(2, block_on_query(handle).unwrap());
let handle = spawn_ingest(async { 2 + 2 });
assert_eq!(4, block_on_ingest(handle).unwrap());
}
#[test]
fn test_spawn_block_on() {
let handle = spawn_global(async { 1 + 1 });

View File

@@ -21,9 +21,11 @@ pub mod runtime_default;
pub mod runtime_throttleable;
pub use global::{
block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime,
init_global_runtimes, spawn_blocking_compact, spawn_blocking_global, spawn_blocking_hb,
spawn_compact, spawn_global, spawn_hb,
block_on_compact, block_on_global, block_on_ingest, block_on_query, compact_runtime,
create_runtime, global_runtime, ingest_runtime, init_datanode_runtimes, init_global_runtimes,
query_runtime, spawn_blocking_compact, spawn_blocking_global, spawn_blocking_hb,
spawn_blocking_ingest, spawn_blocking_query, spawn_compact, spawn_global, spawn_hb,
spawn_ingest, spawn_query,
};
pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction};

View File

@@ -450,7 +450,6 @@ impl DatanodeBuilder {
.table_provider_factory
.clone()
.unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
let mut region_server = RegionServer::with_table_provider(
query_engine,
common_runtime::global_runtime(),

View File

@@ -19,6 +19,7 @@ use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::JoinError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
@@ -65,6 +66,15 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to join datanode runtime task, request_type: {}", request_type))]
RuntimeJoin {
request_type: &'static str,
#[snafu(source)]
error: JoinError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create plan decoder"))]
NewPlanDecoder {
#[snafu(implicit)]
@@ -448,9 +458,11 @@ impl ErrorExt for Error {
AsyncTaskExecute { source, .. } => source.status_code(),
CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => {
StatusCode::Internal
}
CreateDir { .. }
| RemoveDir { .. }
| ShutdownInstance { .. }
| DataFusion { .. }
| RuntimeJoin { .. } => StatusCode::Internal,
RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,

View File

@@ -21,6 +21,7 @@ mod greptimedb_telemetry;
pub mod heartbeat;
pub mod metrics;
mod partition_expr_fetcher;
pub mod query_stream;
pub mod region_server;
pub mod service;
pub mod store;

View File

@@ -0,0 +1,220 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
use common_runtime::JoinHandle;
use datatypes::schema::SchemaRef;
use futures_util::Stream;
use tokio::sync::mpsc;
pub type QueryRuntimeSender = mpsc::Sender<RecordBatchResult<RecordBatch>>;
pub type QueryRuntimeMetrics = Arc<RwLock<Option<RecordBatchMetrics>>>;
/// A record batch stream backed by batches produced on the datanode query runtime.
pub struct QueryRuntimeStream {
schema: SchemaRef,
receiver: mpsc::Receiver<RecordBatchResult<RecordBatch>>,
output_ordering: Option<Vec<OrderOption>>,
metrics: QueryRuntimeMetrics,
producer_handle: Option<JoinHandle<()>>,
}
impl QueryRuntimeStream {
pub fn new(
schema: SchemaRef,
receiver: mpsc::Receiver<RecordBatchResult<RecordBatch>>,
) -> Self {
Self {
schema,
receiver,
output_ordering: None,
metrics: Default::default(),
producer_handle: None,
}
}
pub fn with_output_ordering(mut self, output_ordering: Option<Vec<OrderOption>>) -> Self {
self.output_ordering = output_ordering;
self
}
pub fn with_metrics(self, metrics: Option<RecordBatchMetrics>) -> Self {
*self.metrics.write().unwrap() = metrics;
self
}
pub fn with_metrics_store(mut self, metrics: QueryRuntimeMetrics) -> Self {
self.metrics = metrics;
self
}
pub fn with_producer_handle(mut self, handle: JoinHandle<()>) -> Self {
self.producer_handle = Some(handle);
self
}
pub fn metrics_store() -> QueryRuntimeMetrics {
Default::default()
}
}
impl Stream for QueryRuntimeStream {
type Item = RecordBatchResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
impl Drop for QueryRuntimeStream {
fn drop(&mut self) {
if let Some(handle) = self.producer_handle.take() {
handle.abort();
}
}
}
impl RecordBatchStream for QueryRuntimeStream {
fn name(&self) -> &str {
"QueryRuntimeStream"
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.output_ordering.as_deref()
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.metrics.read().unwrap().clone()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_recordbatch::error::CreateRecordBatchesSnafu;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures_util::StreamExt;
use super::*;
fn test_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",
ConcreteDataType::int32_datatype(),
false,
)]));
let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
RecordBatch::new(schema, vec![values]).unwrap()
}
#[tokio::test]
async fn test_query_runtime_stream_receives_batches() {
let batch = test_batch();
let schema = batch.schema.clone();
let (tx, rx) = mpsc::channel(1);
tx.send(Ok(batch)).await.unwrap();
drop(tx);
let mut stream = QueryRuntimeStream::new(schema, rx);
let batch = stream.next().await.unwrap().unwrap();
assert_eq!(1, batch.num_rows());
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_query_runtime_stream_forwards_errors() {
let schema = test_batch().schema.clone();
let (tx, rx) = mpsc::channel(1);
tx.send(Err(CreateRecordBatchesSnafu {
reason: "test error",
}
.build()))
.await
.unwrap();
drop(tx);
let mut stream = QueryRuntimeStream::new(schema, rx);
assert!(stream.next().await.unwrap().is_err());
}
#[tokio::test]
async fn test_query_runtime_stream_reads_shared_metrics() {
let schema = test_batch().schema.clone();
let (tx, rx) = mpsc::channel(1);
drop(tx);
let metrics = QueryRuntimeStream::metrics_store();
let stream = QueryRuntimeStream::new(schema, rx).with_metrics_store(metrics.clone());
assert!(stream.metrics().is_none());
*metrics.write().unwrap() = Some(RecordBatchMetrics {
elapsed_compute: 42,
..Default::default()
});
assert_eq!(42, stream.metrics().unwrap().elapsed_compute);
}
#[tokio::test]
async fn test_query_runtime_stream_drop_aborts_producer() {
struct AbortGuard(Option<tokio::sync::oneshot::Sender<()>>);
impl Drop for AbortGuard {
fn drop(&mut self) {
let _ = self.0.take().unwrap().send(());
}
}
let schema = test_batch().schema.clone();
let (_tx, rx) = mpsc::channel(1);
let (abort_tx, abort_rx) = tokio::sync::oneshot::channel();
let (started_tx, started_rx) = tokio::sync::oneshot::channel();
let handle = tokio::spawn(async move {
let _guard = AbortGuard(Some(abort_tx));
let _ = started_tx.send(());
std::future::pending::<()>().await;
});
started_rx.await.unwrap();
let stream = QueryRuntimeStream::new(schema, rx).with_producer_handle(handle);
drop(stream);
tokio::time::timeout(std::time::Duration::from_secs(1), abort_rx)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn test_query_runtime_stream_close_stops_sender() {
let schema = test_batch().schema.clone();
let (tx, rx) = mpsc::channel(1);
let stream = QueryRuntimeStream::new(schema, rx);
drop(stream);
assert!(tx.send(Ok(test_batch())).await.is_err());
}
}

View File

@@ -51,8 +51,8 @@ use datafusion::datasource::TableProvider;
use datafusion_common::tree_node::TreeNode;
use datatypes::schema::SchemaRef;
use either::Either;
use futures_util::Stream;
use futures_util::future::try_join_all;
use futures_util::{Stream, StreamExt};
use metric_engine::engine::MetricEngine;
use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
use prost::Message;
@@ -83,7 +83,7 @@ use store_api::region_request::{
RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};
@@ -93,11 +93,13 @@ use crate::error::{
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, NotYetImplementedSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu,
UnsupportedOutputSnafu,
RegionNotReadySnafu, Result, RuntimeJoinSnafu, SerializeJsonSnafu, StopRegionEngineSnafu,
UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
use crate::query_stream::QueryRuntimeStream;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
const QUERY_RUNTIME_STREAM_BUFFER_SIZE: usize = 8;
#[derive(Clone)]
pub struct RegionServer {
@@ -225,6 +227,16 @@ impl RegionServer {
region_id: RegionId,
request: RegionRequest,
) -> Result<RegionResponse> {
if RegionServerInner::is_ingest_request(&request) {
let inner = self.inner.clone();
let request_type = request.request_type();
return common_runtime::spawn_ingest(async move {
inner.handle_request(region_id, request).await
})
.await
.context(RuntimeJoinSnafu { request_type })?;
}
self.inner.handle_request(region_id, request).await
}
@@ -257,6 +269,14 @@ impl RegionServer {
&self,
request: api::v1::region::QueryRequest,
query_ctx: QueryContextRef,
) -> Result<SendableRecordBatchStream> {
self.handle_remote_read_inner(request, query_ctx).await
}
async fn handle_remote_read_inner(
&self,
request: api::v1::region::QueryRequest,
query_ctx: QueryContextRef,
) -> Result<SendableRecordBatchStream> {
let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
@@ -304,6 +324,10 @@ impl RegionServer {
#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
self.handle_read_inner(request).await
}
async fn handle_read_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
@@ -601,13 +625,13 @@ impl RegionServer {
});
// Downcast to MetricEngine and call batch API
let metric_engine =
engine
.as_any()
.downcast_ref::<MetricEngine>()
.context(UnexpectedSnafu {
violated: "Failed to downcast to MetricEngine",
})?;
let metric_engine = engine
.as_any()
.downcast_ref::<MetricEngine>()
.context(UnexpectedSnafu {
violated: "Failed to downcast to MetricEngine",
})?
.clone();
let tracing_context = TracingContext::from_current_span();
let batch_size = put_requests.len();
@@ -615,14 +639,18 @@ impl RegionServer {
"RegionServer::handle_metric_batch_puts",
batch_size = batch_size,
));
let result = metric_engine
.put_regions_batch(put_requests)
.trace(span)
.await
.map_err(BoxedError::new)
.context(HandleRegionRequestSnafu {
region_id: first_region_id,
});
let result = common_runtime::spawn_ingest(async move {
metric_engine
.put_regions_batch(put_requests)
.trace(span)
.await
})
.await
.context(RuntimeJoinSnafu { request_type })?
.map_err(BoxedError::new)
.context(HandleRegionRequestSnafu {
region_id: first_region_id,
});
match result {
Ok(total_affected) => {
@@ -1192,6 +1220,13 @@ impl RegionServerInner {
*self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
}
fn is_ingest_request(request: &RegionRequest) -> bool {
matches!(
request,
RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_)
)
}
fn get_engine(
&self,
region_id: RegionId,
@@ -1781,6 +1816,37 @@ impl RegionServerInner {
}
pub async fn handle_read(
self: &Arc<Self>,
request: QueryRequest,
query_ctx: QueryContextRef,
) -> Result<SendableRecordBatchStream> {
let mut stream = self.handle_read_inner(request, query_ctx).await?;
let schema = stream.schema();
let output_ordering = stream.output_ordering().map(|ordering| ordering.to_vec());
let (sender, receiver) = mpsc::channel(QUERY_RUNTIME_STREAM_BUFFER_SIZE);
let metrics = QueryRuntimeStream::metrics_store();
let producer_metrics = metrics.clone();
let producer_handle = common_runtime::spawn_query(async move {
while let Some(batch) = stream.next().await {
*producer_metrics.write().unwrap() = stream.metrics();
if sender.send(batch).await.is_err() {
break;
}
}
*producer_metrics.write().unwrap() = stream.metrics();
});
Ok(Box::pin(
QueryRuntimeStream::new(schema, receiver)
.with_output_ordering(output_ordering)
.with_metrics_store(metrics)
.with_producer_handle(producer_handle),
))
}
async fn handle_read_inner(
&self,
request: QueryRequest,
query_ctx: QueryContextRef,
@@ -1898,11 +1964,11 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use api::v1::region::{
RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate,
remote_dyn_filter_request,
};
use api::v1::{Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_recordbatch::RecordBatches;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
@@ -1915,7 +1981,8 @@ mod tests {
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
PathType, RegionCompactRequest, RegionDeleteRequest, RegionDropRequest, RegionOpenRequest,
RegionPutRequest, RegionTruncateRequest,
};
use store_api::storage::RegionId;
@@ -1923,6 +1990,32 @@ mod tests {
use crate::error::Result;
use crate::tests::{MockRegionEngine, mock_region_server};
#[test]
fn test_is_ingest_request() {
let rows = || Rows {
schema: Vec::new(),
rows: Vec::new(),
};
assert!(RegionServerInner::is_ingest_request(&RegionRequest::Put(
RegionPutRequest {
rows: rows(),
hint: None,
partition_expr_version: None,
},
)));
assert!(RegionServerInner::is_ingest_request(
&RegionRequest::Delete(RegionDeleteRequest {
rows: rows(),
hint: None,
partition_expr_version: None,
},)
));
assert!(!RegionServerInner::is_ingest_request(
&RegionRequest::Compact(RegionCompactRequest::default()),
));
}
fn single_value_stream() -> SendableRecordBatchStream {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",

View File

@@ -214,7 +214,7 @@ impl Pruner {
// Spawn worker tasks with their receivers
for (worker_id, rx) in receivers.into_iter().enumerate() {
let inner_clone = inner.clone();
common_runtime::spawn_global(async move {
common_runtime::spawn_query(async move {
Self::worker_loop(worker_id, rx, inner_clone).await;
});
}

View File

@@ -657,7 +657,7 @@ impl CacheBatchBuffer {
let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0);
let (tx, rx) = mpsc::unbounded_channel();
common_runtime::spawn_global(run_cache_concat_task(
common_runtime::spawn_query(run_cache_concat_task(
rx,
limiter.clone(),
skip_threshold_bytes,

View File

@@ -1190,7 +1190,7 @@ impl ScanInput {
region_id = %region_id,
stream_kind = "flat"
);
common_runtime::spawn_global(
common_runtime::spawn_query(
async move {
loop {
// We release the permit before sending result to avoid the task waiting on

View File

@@ -238,7 +238,7 @@ impl SeriesScan {
};
let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
common_runtime::spawn_global(
common_runtime::spawn_query(
async move {
distributor.execute().await;
}
@@ -504,7 +504,7 @@ impl SeriesDistributor {
let partition_pruner = partition_pruner.clone();
let file_scan_semaphore = self.range_semaphore.clone();
let merge_semaphore = self.range_semaphore.clone();
tasks.push(common_runtime::spawn_global(async move {
tasks.push(common_runtime::spawn_query(async move {
SeqScan::build_flat_partition_range_read(
&stream_ctx,
&part_range,