diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 61a4eae128..6c63e9115f 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -260,6 +260,14 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to handle batch ddl request, ddl_type: {}", ddl_type))] + HandleBatchDdlRequest { + #[snafu(implicit)] + location: Location, + source: BoxedError, + ddl_type: String, + }, + #[snafu(display("RegionId {} not found", region_id))] RegionNotFound { region_id: RegionId, @@ -438,7 +446,8 @@ impl ErrorExt for Error { UnsupportedOutput { .. } => StatusCode::Unsupported, HandleRegionRequest { source, .. } | GetRegionMetadata { source, .. } - | HandleBatchOpenRequest { source, .. } => source.status_code(), + | HandleBatchOpenRequest { source, .. } + | HandleBatchDdlRequest { source, .. } => source.status_code(), StopRegionEngine { source, .. } => source.status_code(), FindLogicalRegions { source, .. } => source.status_code(), diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index f1eb63c51b..4c708002dd 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -59,7 +59,7 @@ use store_api::region_engine::{ SettableRegionRoleState, }; use store_api::region_request::{ - AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, + AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest, }; use store_api::storage::RegionId; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -69,9 +69,10 @@ use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu, ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, - ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, - HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, - RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, + ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu, + HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu, + RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result, + StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; @@ -344,62 +345,47 @@ impl RegionServer { .region_map .insert(region_id, RegionEngineWithStatus::Ready(engine)); } -} - -#[async_trait] -impl RegionServerHandler for RegionServer { - async fn handle(&self, request: region_request::Body) -> ServerResult { - let is_parallel = matches!( - request, - region_request::Body::Inserts(_) | region_request::Body::Deletes(_) - ); - let requests = RegionRequest::try_from_request_body(request) - .context(BuildRegionRequestsSnafu) - .map_err(BoxedError::new) - .context(ExecuteGrpcRequestSnafu)?; + async fn handle_batch_ddl_requests( + &self, + request: region_request::Body, + ) -> Result { + // Safety: we have already checked the request type in `RegionServer::handle()`. + let batch_request = BatchRegionDdlRequest::try_from_request_body(request) + .context(BuildRegionRequestsSnafu)? + .unwrap(); let tracing_context = TracingContext::from_current_span(); - let results = if is_parallel { - let join_tasks = requests.into_iter().map(|(region_id, req)| { - let self_to_move = self.clone(); - let span = tracing_context.attach(info_span!( - "RegionServer::handle_region_request", - region_id = region_id.to_string() - )); - async move { - self_to_move - .handle_request(region_id, req) - .trace(span) - .await - } - }); + let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests")); + self.inner + .handle_batch_request(batch_request) + .trace(span) + .await + } - try_join_all(join_tasks) - .await - .map_err(BoxedError::new) - .context(ExecuteGrpcRequestSnafu)? - } else { - let mut results = Vec::with_capacity(requests.len()); - // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately - // modify this part to avoid inefficient serial loop calls. - for (region_id, req) in requests { - let span = tracing_context.attach(info_span!( - "RegionServer::handle_region_request", - region_id = region_id.to_string() - )); - let result = self + async fn handle_requests_in_parallel( + &self, + request: region_request::Body, + ) -> Result { + let requests = + RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?; + let tracing_context = TracingContext::from_current_span(); + + let join_tasks = requests.into_iter().map(|(region_id, req)| { + let self_to_move = self; + let span = tracing_context.attach(info_span!( + "RegionServer::handle_region_request", + region_id = region_id.to_string() + )); + async move { + self_to_move .handle_request(region_id, req) .trace(span) .await - .map_err(BoxedError::new) - .context(ExecuteGrpcRequestSnafu)?; - results.push(result); } - results - }; + }); - // merge results by sum up affected rows and merge extensions. + let results = try_join_all(join_tasks).await?; let mut affected_rows = 0; let mut extensions = HashMap::new(); for result in results { @@ -407,6 +393,57 @@ impl RegionServerHandler for RegionServer { extensions.extend(result.extensions); } + Ok(RegionResponse { + affected_rows, + extensions, + }) + } + + async fn handle_requests_in_serial( + &self, + request: region_request::Body, + ) -> Result { + let requests = + RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?; + let tracing_context = TracingContext::from_current_span(); + + let mut affected_rows = 0; + let mut extensions = HashMap::new(); + // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately + // modify this part to avoid inefficient serial loop calls. + for (region_id, req) in requests { + let span = tracing_context.attach(info_span!( + "RegionServer::handle_region_request", + region_id = region_id.to_string() + )); + let result = self.handle_request(region_id, req).trace(span).await?; + + affected_rows += result.affected_rows; + extensions.extend(result.extensions); + } + + Ok(RegionResponse { + affected_rows, + extensions, + }) + } +} + +#[async_trait] +impl RegionServerHandler for RegionServer { + async fn handle(&self, request: region_request::Body) -> ServerResult { + let response = match &request { + region_request::Body::Creates(_) + | region_request::Body::Drops(_) + | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await, + region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => { + self.handle_requests_in_parallel(request).await + } + _ => self.handle_requests_in_serial(request).await, + } + .map_err(BoxedError::new) + .context(ExecuteGrpcRequestSnafu)?; + Ok(RegionResponseV1 { header: Some(ResponseHeader { status: Some(Status { @@ -414,8 +451,8 @@ impl RegionServerHandler for RegionServer { ..Default::default() }), }), - affected_rows: affected_rows as _, - extensions, + affected_rows: response.affected_rows as _, + extensions: response.extensions, }) } } @@ -727,6 +764,71 @@ impl RegionServerInner { .collect::>()) } + // Handle requests in batch. + // + // limitation: all create requests must be in the same engine. + pub async fn handle_batch_request( + &self, + batch_request: BatchRegionDdlRequest, + ) -> Result { + let region_changes = match &batch_request { + BatchRegionDdlRequest::Create(requests) => requests + .iter() + .map(|(region_id, create)| { + let attribute = parse_region_attribute(&create.engine, &create.options)?; + Ok((*region_id, RegionChange::Register(attribute))) + }) + .collect::>>()?, + BatchRegionDdlRequest::Drop(requests) => requests + .iter() + .map(|(region_id, _)| (*region_id, RegionChange::Deregisters)) + .collect::>(), + BatchRegionDdlRequest::Alter(requests) => requests + .iter() + .map(|(region_id, _)| (*region_id, RegionChange::None)) + .collect::>(), + }; + + // The ddl procedure will ensure all requests are in the same engine. + // Therefore, we can get the engine from the first request. + let (first_region_id, first_region_change) = region_changes.first().unwrap(); + let engine = match self.get_engine(*first_region_id, first_region_change)? { + CurrentEngine::Engine(engine) => engine, + CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)), + }; + + for (region_id, region_change) in region_changes.iter() { + self.set_region_status_not_ready(*region_id, &engine, region_change); + } + + let ddl_type = batch_request.request_type(); + let result = engine + .handle_batch_ddl_requests(batch_request) + .await + .context(HandleBatchDdlRequestSnafu { ddl_type }); + + match result { + Ok(result) => { + for (region_id, region_change) in region_changes { + self.set_region_status_ready(region_id, engine.clone(), region_change) + .await?; + } + + Ok(RegionResponse { + affected_rows: result.affected_rows, + extensions: result.extensions, + }) + } + Err(err) => { + for (region_id, region_change) in region_changes { + self.unset_region_status(region_id, &engine, region_change); + } + + Err(err) + } + } + } + pub async fn handle_request( &self, region_id: RegionId, diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index c9b0ac53db..46411d064b 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -15,6 +15,7 @@ //! Region Engine's definition use std::any::Any; +use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::sync::{Arc, Mutex}; @@ -33,7 +34,7 @@ use tokio::sync::Semaphore; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::{RegionOpenRequest, RegionRequest}; +use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; /// The settable region role state. @@ -407,6 +408,27 @@ pub trait RegionEngine: Send + Sync { Ok(join_all(tasks).await) } + async fn handle_batch_ddl_requests( + &self, + request: BatchRegionDdlRequest, + ) -> Result { + let requests = request.into_region_requests(); + + let mut affected_rows = 0; + let mut extensions = HashMap::new(); + + for (region_id, request) in requests { + let result = self.handle_request(region_id, request).await?; + affected_rows += result.affected_rows; + extensions.extend(result.extensions); + } + + Ok(RegionResponse { + affected_rows, + extensions, + }) + } + /// Handles non-query request to the region. Returns the count of affected rows. async fn handle_request( &self, diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index b391441bd5..58afdaf128 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -47,6 +47,67 @@ use crate::mito_engine_options::{ use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; +#[derive(Debug, IntoStaticStr)] +pub enum BatchRegionDdlRequest { + Create(Vec<(RegionId, RegionCreateRequest)>), + Drop(Vec<(RegionId, RegionDropRequest)>), + Alter(Vec<(RegionId, RegionAlterRequest)>), +} + +impl BatchRegionDdlRequest { + /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`]. + pub fn try_from_request_body(body: region_request::Body) -> Result> { + match body { + region_request::Body::Creates(creates) => { + let requests = creates + .requests + .into_iter() + .map(parse_region_create) + .collect::>>()?; + Ok(Some(Self::Create(requests))) + } + region_request::Body::Drops(drops) => { + let requests = drops + .requests + .into_iter() + .map(parse_region_drop) + .collect::>>()?; + Ok(Some(Self::Drop(requests))) + } + region_request::Body::Alters(alters) => { + let requests = alters + .requests + .into_iter() + .map(parse_region_alter) + .collect::>>()?; + Ok(Some(Self::Alter(requests))) + } + _ => Ok(None), + } + } + + pub fn request_type(&self) -> &'static str { + self.into() + } + + pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> { + match self { + Self::Create(requests) => requests + .into_iter() + .map(|(region_id, request)| (region_id, RegionRequest::Create(request))) + .collect(), + Self::Drop(requests) => requests + .into_iter() + .map(|(region_id, request)| (region_id, RegionRequest::Drop(request))) + .collect(), + Self::Alter(requests) => requests + .into_iter() + .map(|(region_id, request)| (region_id, RegionRequest::Alter(request))) + .collect(), + } + } +} + #[derive(Debug, IntoStaticStr)] pub enum RegionRequest { Put(RegionPutRequest), @@ -123,7 +184,7 @@ fn make_region_deletes(deletes: DeleteRequests) -> Result Result> { +fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> { let column_metadatas = create .column_defs .into_iter() @@ -131,16 +192,21 @@ fn make_region_create(create: CreateRequest) -> Result>>()?; let region_id = create.region_id.into(); let region_dir = region_dir(&create.path, region_id); - Ok(vec![( + Ok(( region_id, - RegionRequest::Create(RegionCreateRequest { + RegionCreateRequest { engine: create.engine, column_metadatas, primary_key: create.primary_key, options: create.options, region_dir, - }), - )]) + }, + )) +} + +fn make_region_create(create: CreateRequest) -> Result> { + let (region_id, request) = parse_region_create(create)?; + Ok(vec![(region_id, RegionRequest::Create(request))]) } fn make_region_creates(creates: CreateRequests) -> Result> { @@ -151,9 +217,14 @@ fn make_region_creates(creates: CreateRequests) -> Result Result> { +fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> { let region_id = drop.region_id.into(); - Ok(vec![(region_id, RegionRequest::Drop(RegionDropRequest {}))]) + Ok((region_id, RegionDropRequest {})) +} + +fn make_region_drop(drop: DropRequest) -> Result> { + let (region_id, request) = parse_region_drop(drop)?; + Ok(vec![(region_id, RegionRequest::Drop(request))]) } fn make_region_drops(drops: DropRequests) -> Result> { @@ -186,12 +257,15 @@ fn make_region_close(close: CloseRequest) -> Result Result> { +fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> { let region_id = alter.region_id.into(); - Ok(vec![( - region_id, - RegionRequest::Alter(RegionAlterRequest::try_from(alter)?), - )]) + let request = RegionAlterRequest::try_from(alter)?; + Ok((region_id, request)) +} + +fn make_region_alter(alter: AlterRequest) -> Result> { + let (region_id, request) = parse_region_alter(alter)?; + Ok(vec![(region_id, RegionRequest::Alter(request))]) } fn make_region_alters(alters: AlterRequests) -> Result> {