diff --git a/config/config.md b/config/config.md index 310cd3c2d8..f03fa6377f 100644 --- a/config/config.md +++ b/config/config.md @@ -472,6 +472,8 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | +| `runtime.query_rt_size` | Integer | `7` | The number of threads to execute datanode query operations.
Defaults to max(num_cpus - 1, 1). | +| `runtime.ingest_rt_size` | Integer | `8` | The number of threads to execute datanode ingestion operations. | | `meta_client` | -- | -- | The metasrv client options. | | `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. | | `meta_client.timeout` | String | `3s` | Operation timeout. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index cad0086169..d4c70e0fb4 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -82,6 +82,11 @@ watch = false #+ global_rt_size = 8 ## The number of threads to execute the runtime for global write operations. #+ compact_rt_size = 4 +## The number of threads to execute datanode query operations. +## Defaults to max(num_cpus - 1, 1). +#+ query_rt_size = 7 +## The number of threads to execute datanode ingestion operations. +#+ ingest_rt_size = 8 ## The metasrv client options. [meta_client] diff --git a/src/cmd/src/datanode/builder.rs b/src/cmd/src/datanode/builder.rs index 28a6e82705..7e4b83f09c 100644 --- a/src/cmd/src/datanode/builder.rs +++ b/src/cmd/src/datanode/builder.rs @@ -62,6 +62,7 @@ impl InstanceBuilder { async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result> { common_runtime::init_global_runtimes(&opts.runtime); + common_runtime::init_datanode_runtimes(&opts.runtime); let dn_opts = &mut opts.component; let guard = common_telemetry::init_global_logging( diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index ecff735294..0f72c980b2 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -49,7 +49,10 @@ pub struct GreptimeOptions { pub component: T, } -impl Configurable for GreptimeOptions { +impl Configurable for GreptimeOptions +where + T: Configurable, +{ fn env_list_keys() -> Option<&'static [&'static str]> { T::env_list_keys() } diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 52e8cc278c..73fb6c3d0b 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -37,6 +37,24 @@ use servers::tls::{TlsMode, TlsOption}; use standalone::options::StandaloneOptions; use store_api::path_utils::WAL_DIR; +#[test] +fn test_load_datanode_runtime_options_from_runtime_section() { + let toml = r#" + [runtime] + global_rt_size = 8 + compact_rt_size = 4 + ingest_rt_size = 8 + query_rt_size = 7 + "#; + + let options: GreptimeOptions = toml::from_str(toml).unwrap(); + + assert_eq!(8, options.runtime.global_rt_size); + assert_eq!(4, options.runtime.compact_rt_size); + assert_eq!(8, options.runtime.ingest_rt_size); + assert_eq!(7, options.runtime.query_rt_size); +} + #[allow(deprecated)] #[test] fn test_load_datanode_example_config() { diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index 9480c94713..e3f9b93280 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -30,11 +30,16 @@ const HB_WORKERS: usize = 2; /// The options for the global runtimes. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(default)] pub struct RuntimeOptions { /// The number of threads for the global default runtime. pub global_rt_size: usize, /// The number of threads to execute the runtime for compact operations. pub compact_rt_size: usize, + /// The number of threads to execute datanode query operations. + pub query_rt_size: usize, + /// The number of threads to execute datanode ingestion operations. + pub ingest_rt_size: usize, } impl Default for RuntimeOptions { @@ -43,6 +48,8 @@ impl Default for RuntimeOptions { Self { global_rt_size: cpus, compact_rt_size: usize::max(cpus / 2, 1), + query_rt_size: usize::max(cpus.saturating_sub(1), 1), + ingest_rt_size: cpus, } } } @@ -63,6 +70,8 @@ struct GlobalRuntimes { global_runtime: Runtime, compact_runtime: Runtime, hb_runtime: Runtime, + query_runtime: Runtime, + ingest_runtime: Runtime, } macro_rules! define_spawn { @@ -96,15 +105,29 @@ impl GlobalRuntimes { define_spawn!(global); define_spawn!(compact); define_spawn!(hb); + define_spawn!(query); + define_spawn!(ingest); + + fn new( + global: Option, + compact: Option, + heartbeat: Option, + query: Option, + ingest: Option, + ) -> Self { + let global_runtime = + global.unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)); + let query_runtime = query.unwrap_or_else(|| global_runtime.clone()); + let ingest_runtime = ingest.unwrap_or_else(|| global_runtime.clone()); - fn new(global: Option, compact: Option, heartbeat: Option) -> Self { Self { - global_runtime: global - .unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)), + global_runtime, compact_runtime: compact .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)), hb_runtime: heartbeat .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)), + query_runtime, + ingest_runtime, } } } @@ -114,6 +137,8 @@ struct ConfigRuntimes { global_runtime: Option, compact_runtime: Option, hb_runtime: Option, + query_runtime: Option, + ingest_runtime: Option, already_init: bool, } @@ -122,9 +147,11 @@ static GLOBAL_RUNTIMES: Lazy = Lazy::new(|| { let global = c.global_runtime.take(); let compact = c.compact_runtime.take(); let heartbeat = c.hb_runtime.take(); + let query = c.query_runtime.take(); + let ingest = c.ingest_runtime.take(); c.already_init = true; - GlobalRuntimes::new(global, compact, heartbeat) + GlobalRuntimes::new(global, compact, heartbeat, query, ingest) }); static CONFIG_RUNTIMES: Lazy> = @@ -150,7 +177,30 @@ pub fn init_global_runtimes(options: &RuntimeOptions) { "compact-worker", options.compact_rt_size, )); - c.hb_runtime = Some(create_runtime("hreartbeat", "hb-worker", HB_WORKERS)); + c.hb_runtime = Some(create_runtime("heartbeat", "hb-worker", HB_WORKERS)); + }); +} + +/// Initialize the datanode-specific global runtimes. +/// +/// # Panics +/// Panics when the global runtimes are already initialized. +/// You should call this function before using any runtime functions. +pub fn init_datanode_runtimes(options: &RuntimeOptions) { + static START: Once = Once::new(); + START.call_once(move || { + let mut c = CONFIG_RUNTIMES.lock().unwrap(); + assert!(!c.already_init, "Global runtimes already initialized"); + c.query_runtime = Some(create_runtime( + "query", + "query-worker", + options.query_rt_size, + )); + c.ingest_runtime = Some(create_runtime( + "ingest", + "ingest-worker", + options.ingest_rt_size, + )); }); } @@ -191,6 +241,8 @@ macro_rules! define_global_runtime_spawn { define_global_runtime_spawn!(global); define_global_runtime_spawn!(compact); define_global_runtime_spawn!(hb); +define_global_runtime_spawn!(query); +define_global_runtime_spawn!(ingest); #[cfg(test)] mod tests { @@ -198,6 +250,41 @@ mod tests { use super::*; + #[test] + fn test_datanode_runtime_options_default() { + let options = RuntimeOptions::default(); + let cpus = num_cpus::get(); + + assert_eq!(cpus, options.global_rt_size); + assert_eq!(usize::max(cpus / 2, 1), options.compact_rt_size); + assert_eq!(usize::max(cpus.saturating_sub(1), 1), options.query_rt_size); + assert_eq!(cpus, options.ingest_rt_size); + } + + #[test] + fn test_datanode_runtimes_fallback_to_global_runtime() { + let runtimes = GlobalRuntimes::new( + Some(create_runtime("test-global", "test-global-worker", 1)), + None, + None, + None, + None, + ); + + assert_eq!("test-global", runtimes.global_runtime.name()); + assert_eq!("test-global", runtimes.query_runtime.name()); + assert_eq!("test-global", runtimes.ingest_runtime.name()); + } + + #[test] + fn test_datanode_runtime_spawn_block_on() { + let handle = spawn_query(async { 1 + 1 }); + assert_eq!(2, block_on_query(handle).unwrap()); + + let handle = spawn_ingest(async { 2 + 2 }); + assert_eq!(4, block_on_ingest(handle).unwrap()); + } + #[test] fn test_spawn_block_on() { let handle = spawn_global(async { 1 + 1 }); diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index d1effcfa4e..533fb57c6a 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -21,9 +21,11 @@ pub mod runtime_default; pub mod runtime_throttleable; pub use global::{ - block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime, - init_global_runtimes, spawn_blocking_compact, spawn_blocking_global, spawn_blocking_hb, - spawn_compact, spawn_global, spawn_hb, + block_on_compact, block_on_global, block_on_ingest, block_on_query, compact_runtime, + create_runtime, global_runtime, ingest_runtime, init_datanode_runtimes, init_global_runtimes, + query_runtime, spawn_blocking_compact, spawn_blocking_global, spawn_blocking_hb, + spawn_blocking_ingest, spawn_blocking_query, spawn_compact, spawn_global, spawn_hb, + spawn_ingest, spawn_query, }; pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction}; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 4a8aaf3d1d..90aedae645 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -450,7 +450,6 @@ impl DatanodeBuilder { .table_provider_factory .clone() .unwrap_or_else(|| Arc::new(DummyTableProviderFactory)); - let mut region_server = RegionServer::with_table_provider( query_engine, common_runtime::global_runtime(), diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 4d2e9119f5..9b47644b45 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -19,6 +19,7 @@ use common_error::define_into_tonic_status; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use common_runtime::JoinError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::error::Error as TableError; @@ -65,6 +66,15 @@ pub enum Error { source: query::error::Error, }, + #[snafu(display("Failed to join datanode runtime task, request_type: {}", request_type))] + RuntimeJoin { + request_type: &'static str, + #[snafu(source)] + error: JoinError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to create plan decoder"))] NewPlanDecoder { #[snafu(implicit)] @@ -448,9 +458,11 @@ impl ErrorExt for Error { AsyncTaskExecute { source, .. } => source.status_code(), - CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => { - StatusCode::Internal - } + CreateDir { .. } + | RemoveDir { .. } + | ShutdownInstance { .. } + | DataFusion { .. } + | RuntimeJoin { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 7e0db3cabc..c14ec886fa 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -21,6 +21,7 @@ mod greptimedb_telemetry; pub mod heartbeat; pub mod metrics; mod partition_expr_fetcher; +pub mod query_stream; pub mod region_server; pub mod service; pub mod store; diff --git a/src/datanode/src/query_stream.rs b/src/datanode/src/query_stream.rs new file mode 100644 index 0000000000..0f1916519b --- /dev/null +++ b/src/datanode/src/query_stream.rs @@ -0,0 +1,220 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; + +use common_recordbatch::adapter::RecordBatchMetrics; +use common_recordbatch::error::Result as RecordBatchResult; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream}; +use common_runtime::JoinHandle; +use datatypes::schema::SchemaRef; +use futures_util::Stream; +use tokio::sync::mpsc; + +pub type QueryRuntimeSender = mpsc::Sender>; +pub type QueryRuntimeMetrics = Arc>>; + +/// A record batch stream backed by batches produced on the datanode query runtime. +pub struct QueryRuntimeStream { + schema: SchemaRef, + receiver: mpsc::Receiver>, + output_ordering: Option>, + metrics: QueryRuntimeMetrics, + producer_handle: Option>, +} + +impl QueryRuntimeStream { + pub fn new( + schema: SchemaRef, + receiver: mpsc::Receiver>, + ) -> Self { + Self { + schema, + receiver, + output_ordering: None, + metrics: Default::default(), + producer_handle: None, + } + } + + pub fn with_output_ordering(mut self, output_ordering: Option>) -> Self { + self.output_ordering = output_ordering; + self + } + + pub fn with_metrics(self, metrics: Option) -> Self { + *self.metrics.write().unwrap() = metrics; + self + } + + pub fn with_metrics_store(mut self, metrics: QueryRuntimeMetrics) -> Self { + self.metrics = metrics; + self + } + + pub fn with_producer_handle(mut self, handle: JoinHandle<()>) -> Self { + self.producer_handle = Some(handle); + self + } + + pub fn metrics_store() -> QueryRuntimeMetrics { + Default::default() + } +} + +impl Stream for QueryRuntimeStream { + type Item = RecordBatchResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.receiver.poll_recv(cx) + } +} + +impl Drop for QueryRuntimeStream { + fn drop(&mut self) { + if let Some(handle) = self.producer_handle.take() { + handle.abort(); + } + } +} + +impl RecordBatchStream for QueryRuntimeStream { + fn name(&self) -> &str { + "QueryRuntimeStream" + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.output_ordering.as_deref() + } + + fn metrics(&self) -> Option { + self.metrics.read().unwrap().clone() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_recordbatch::error::CreateRecordBatchesSnafu; + use datatypes::prelude::{ConcreteDataType, VectorRef}; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::Int32Vector; + use futures_util::StreamExt; + + use super::*; + + fn test_batch() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let values: VectorRef = Arc::new(Int32Vector::from_slice([1])); + RecordBatch::new(schema, vec![values]).unwrap() + } + + #[tokio::test] + async fn test_query_runtime_stream_receives_batches() { + let batch = test_batch(); + let schema = batch.schema.clone(); + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(batch)).await.unwrap(); + drop(tx); + + let mut stream = QueryRuntimeStream::new(schema, rx); + let batch = stream.next().await.unwrap().unwrap(); + assert_eq!(1, batch.num_rows()); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_query_runtime_stream_forwards_errors() { + let schema = test_batch().schema.clone(); + let (tx, rx) = mpsc::channel(1); + tx.send(Err(CreateRecordBatchesSnafu { + reason: "test error", + } + .build())) + .await + .unwrap(); + drop(tx); + + let mut stream = QueryRuntimeStream::new(schema, rx); + assert!(stream.next().await.unwrap().is_err()); + } + + #[tokio::test] + async fn test_query_runtime_stream_reads_shared_metrics() { + let schema = test_batch().schema.clone(); + let (tx, rx) = mpsc::channel(1); + drop(tx); + let metrics = QueryRuntimeStream::metrics_store(); + let stream = QueryRuntimeStream::new(schema, rx).with_metrics_store(metrics.clone()); + + assert!(stream.metrics().is_none()); + *metrics.write().unwrap() = Some(RecordBatchMetrics { + elapsed_compute: 42, + ..Default::default() + }); + + assert_eq!(42, stream.metrics().unwrap().elapsed_compute); + } + + #[tokio::test] + async fn test_query_runtime_stream_drop_aborts_producer() { + struct AbortGuard(Option>); + + impl Drop for AbortGuard { + fn drop(&mut self) { + let _ = self.0.take().unwrap().send(()); + } + } + + let schema = test_batch().schema.clone(); + let (_tx, rx) = mpsc::channel(1); + let (abort_tx, abort_rx) = tokio::sync::oneshot::channel(); + let (started_tx, started_rx) = tokio::sync::oneshot::channel(); + let handle = tokio::spawn(async move { + let _guard = AbortGuard(Some(abort_tx)); + let _ = started_tx.send(()); + std::future::pending::<()>().await; + }); + started_rx.await.unwrap(); + + let stream = QueryRuntimeStream::new(schema, rx).with_producer_handle(handle); + drop(stream); + + tokio::time::timeout(std::time::Duration::from_secs(1), abort_rx) + .await + .unwrap() + .unwrap(); + } + + #[tokio::test] + async fn test_query_runtime_stream_close_stops_sender() { + let schema = test_batch().schema.clone(); + let (tx, rx) = mpsc::channel(1); + let stream = QueryRuntimeStream::new(schema, rx); + drop(stream); + + assert!(tx.send(Ok(test_batch())).await.is_err()); + } +} diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index eb7d2dfa6c..a5e724ab85 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -51,8 +51,8 @@ use datafusion::datasource::TableProvider; use datafusion_common::tree_node::TreeNode; use datatypes::schema::SchemaRef; use either::Either; -use futures_util::Stream; use futures_util::future::try_join_all; +use futures_util::{Stream, StreamExt}; use metric_engine::engine::MetricEngine; use mito2::engine::{MITO_ENGINE_NAME, MitoEngine}; use prost::Message; @@ -83,7 +83,7 @@ use store_api::region_request::{ RegionOpenRequest, RegionRequest, }; use store_api::storage::RegionId; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc}; use tokio::time::timeout; use tonic::{Request, Response, Result as TonicResult}; @@ -93,11 +93,13 @@ use crate::error::{ ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu, NotYetImplementedSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, - RegionNotReadySnafu, Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, - UnsupportedOutputSnafu, + RegionNotReadySnafu, Result, RuntimeJoinSnafu, SerializeJsonSnafu, StopRegionEngineSnafu, + UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; +use crate::query_stream::QueryRuntimeStream; use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder}; +const QUERY_RUNTIME_STREAM_BUFFER_SIZE: usize = 8; #[derive(Clone)] pub struct RegionServer { @@ -225,6 +227,16 @@ impl RegionServer { region_id: RegionId, request: RegionRequest, ) -> Result { + if RegionServerInner::is_ingest_request(&request) { + let inner = self.inner.clone(); + let request_type = request.request_type(); + return common_runtime::spawn_ingest(async move { + inner.handle_request(region_id, request).await + }) + .await + .context(RuntimeJoinSnafu { request_type })?; + } + self.inner.handle_request(region_id, request).await } @@ -257,6 +269,14 @@ impl RegionServer { &self, request: api::v1::region::QueryRequest, query_ctx: QueryContextRef, + ) -> Result { + self.handle_remote_read_inner(request, query_ctx).await + } + + async fn handle_remote_read_inner( + &self, + request: api::v1::region::QueryRequest, + query_ctx: QueryContextRef, ) -> Result { let permit = if let Some(p) = &self.inner.parallelism { Some(p.acquire().await?) @@ -304,6 +324,10 @@ impl RegionServer { #[tracing::instrument(skip_all)] pub async fn handle_read(&self, request: QueryRequest) -> Result { + self.handle_read_inner(request).await + } + + async fn handle_read_inner(&self, request: QueryRequest) -> Result { let permit = if let Some(p) = &self.inner.parallelism { Some(p.acquire().await?) } else { @@ -601,13 +625,13 @@ impl RegionServer { }); // Downcast to MetricEngine and call batch API - let metric_engine = - engine - .as_any() - .downcast_ref::() - .context(UnexpectedSnafu { - violated: "Failed to downcast to MetricEngine", - })?; + let metric_engine = engine + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + violated: "Failed to downcast to MetricEngine", + })? + .clone(); let tracing_context = TracingContext::from_current_span(); let batch_size = put_requests.len(); @@ -615,14 +639,18 @@ impl RegionServer { "RegionServer::handle_metric_batch_puts", batch_size = batch_size, )); - let result = metric_engine - .put_regions_batch(put_requests) - .trace(span) - .await - .map_err(BoxedError::new) - .context(HandleRegionRequestSnafu { - region_id: first_region_id, - }); + let result = common_runtime::spawn_ingest(async move { + metric_engine + .put_regions_batch(put_requests) + .trace(span) + .await + }) + .await + .context(RuntimeJoinSnafu { request_type })? + .map_err(BoxedError::new) + .context(HandleRegionRequestSnafu { + region_id: first_region_id, + }); match result { Ok(total_affected) => { @@ -1192,6 +1220,13 @@ impl RegionServerInner { *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter); } + fn is_ingest_request(request: &RegionRequest) -> bool { + matches!( + request, + RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) + ) + } + fn get_engine( &self, region_id: RegionId, @@ -1781,6 +1816,37 @@ impl RegionServerInner { } pub async fn handle_read( + self: &Arc, + request: QueryRequest, + query_ctx: QueryContextRef, + ) -> Result { + let mut stream = self.handle_read_inner(request, query_ctx).await?; + let schema = stream.schema(); + let output_ordering = stream.output_ordering().map(|ordering| ordering.to_vec()); + + let (sender, receiver) = mpsc::channel(QUERY_RUNTIME_STREAM_BUFFER_SIZE); + let metrics = QueryRuntimeStream::metrics_store(); + let producer_metrics = metrics.clone(); + + let producer_handle = common_runtime::spawn_query(async move { + while let Some(batch) = stream.next().await { + *producer_metrics.write().unwrap() = stream.metrics(); + if sender.send(batch).await.is_err() { + break; + } + } + *producer_metrics.write().unwrap() = stream.metrics(); + }); + + Ok(Box::pin( + QueryRuntimeStream::new(schema, receiver) + .with_output_ordering(output_ordering) + .with_metrics_store(metrics) + .with_producer_handle(producer_handle), + )) + } + + async fn handle_read_inner( &self, request: QueryRequest, query_ctx: QueryContextRef, @@ -1898,11 +1964,11 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - use api::v1::SemanticType; use api::v1::region::{ RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate, remote_dyn_filter_request, }; + use api::v1::{Rows, SemanticType}; use common_error::ext::ErrorExt; use common_recordbatch::RecordBatches; use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; @@ -1915,7 +1981,8 @@ mod tests { use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_engine::RegionEngine; use store_api::region_request::{ - PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest, + PathType, RegionCompactRequest, RegionDeleteRequest, RegionDropRequest, RegionOpenRequest, + RegionPutRequest, RegionTruncateRequest, }; use store_api::storage::RegionId; @@ -1923,6 +1990,32 @@ mod tests { use crate::error::Result; use crate::tests::{MockRegionEngine, mock_region_server}; + #[test] + fn test_is_ingest_request() { + let rows = || Rows { + schema: Vec::new(), + rows: Vec::new(), + }; + + assert!(RegionServerInner::is_ingest_request(&RegionRequest::Put( + RegionPutRequest { + rows: rows(), + hint: None, + partition_expr_version: None, + }, + ))); + assert!(RegionServerInner::is_ingest_request( + &RegionRequest::Delete(RegionDeleteRequest { + rows: rows(), + hint: None, + partition_expr_version: None, + },) + )); + assert!(!RegionServerInner::is_ingest_request( + &RegionRequest::Compact(RegionCompactRequest::default()), + )); + } + fn single_value_stream() -> SendableRecordBatchStream { let schema = Arc::new(Schema::new(vec![ColumnSchema::new( "v", diff --git a/src/mito2/src/read/pruner.rs b/src/mito2/src/read/pruner.rs index 320144484d..656d222a0b 100644 --- a/src/mito2/src/read/pruner.rs +++ b/src/mito2/src/read/pruner.rs @@ -214,7 +214,7 @@ impl Pruner { // Spawn worker tasks with their receivers for (worker_id, rx) in receivers.into_iter().enumerate() { let inner_clone = inner.clone(); - common_runtime::spawn_global(async move { + common_runtime::spawn_query(async move { Self::worker_loop(worker_id, rx, inner_clone).await; }); } diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index af212ab23e..d305caf23b 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -657,7 +657,7 @@ impl CacheBatchBuffer { let sender = cache_strategy.range_result_memory_limiter().map(|limiter| { let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0); let (tx, rx) = mpsc::unbounded_channel(); - common_runtime::spawn_global(run_cache_concat_task( + common_runtime::spawn_query(run_cache_concat_task( rx, limiter.clone(), skip_threshold_bytes, diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index baf6964c27..8a709af6de 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -1190,7 +1190,7 @@ impl ScanInput { region_id = %region_id, stream_kind = "flat" ); - common_runtime::spawn_global( + common_runtime::spawn_query( async move { loop { // We release the permit before sending result to avoid the task waiting on diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 003a754363..8f554b6aed 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -238,7 +238,7 @@ impl SeriesScan { }; let region_id = distributor.stream_ctx.input.mapper.metadata().region_id; let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id); - common_runtime::spawn_global( + common_runtime::spawn_query( async move { distributor.execute().await; } @@ -504,7 +504,7 @@ impl SeriesDistributor { let partition_pruner = partition_pruner.clone(); let file_scan_semaphore = self.range_semaphore.clone(); let merge_semaphore = self.range_semaphore.clone(); - tasks.push(common_runtime::spawn_global(async move { + tasks.push(common_runtime::spawn_query(async move { SeqScan::build_flat_partition_range_read( &stream_ctx, &part_range,