Skip to main content

datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, RwLock};
23use std::task::{Context, Poll};
24use std::time::Duration;
25
26use api::region::RegionResponse;
27use api::v1::meta::TopicStat;
28use api::v1::region::sync_request::ManifestInfo;
29use api::v1::region::{
30    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
31};
32use api::v1::{ResponseHeader, Status};
33use arrow_flight::{FlightData, Ticket};
34use async_trait::async_trait;
35use bytes::Bytes;
36use common_error::ext::{BoxedError, ErrorExt};
37use common_error::status_code::StatusCode;
38use common_meta::datanode::TopicStatsReporter;
39use common_query::OutputData;
40use common_query::request::QueryRequest;
41use common_recordbatch::adapter::RecordBatchMetrics;
42use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
43use common_runtime::Runtime;
44use common_telemetry::tracing::{self, info_span};
45use common_telemetry::tracing_context::{FutureExt, TracingContext};
46use common_telemetry::{debug, error, info, warn};
47use dashmap::DashMap;
48use datafusion::datasource::TableProvider;
49use datafusion_common::tree_node::TreeNode;
50use either::Either;
51use futures_util::Stream;
52use futures_util::future::try_join_all;
53use metric_engine::engine::MetricEngine;
54use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
55use prost::Message;
56use query::QueryEngineRef;
57pub use query::dummy_catalog::{
58    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
59};
60use query::options::should_collect_region_watermark_from_extensions;
61use serde_json;
62use servers::error::{
63    self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
64};
65use servers::grpc::FlightCompression;
66use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
67use servers::grpc::region_server::RegionServerHandler;
68use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
69use snafu::{OptionExt, ResultExt, ensure};
70use store_api::metric_engine_consts::{
71    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
72};
73use store_api::region_engine::{
74    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
75    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
76    SyncRegionFromRequest,
77};
78use store_api::region_request::{
79    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
80    RegionOpenRequest, RegionRequest,
81};
82use store_api::storage::RegionId;
83use tokio::sync::{Semaphore, SemaphorePermit};
84use tokio::time::timeout;
85use tonic::{Request, Response, Result as TonicResult};
86
87use crate::error::{
88    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
89    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
90    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
91    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
92    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
93    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
94};
95use crate::event_listener::RegionServerEventListenerRef;
96use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
97
98#[derive(Clone)]
99pub struct RegionServer {
100    inner: Arc<RegionServerInner>,
101    flight_compression: FlightCompression,
102    suspend: Arc<AtomicBool>,
103}
104
105pub struct RegionStat {
106    pub region_id: RegionId,
107    pub engine: String,
108    pub role: RegionRole,
109}
110
111impl RegionServer {
112    pub fn new(
113        query_engine: QueryEngineRef,
114        runtime: Runtime,
115        event_listener: RegionServerEventListenerRef,
116        flight_compression: FlightCompression,
117    ) -> Self {
118        Self::with_table_provider(
119            query_engine,
120            runtime,
121            event_listener,
122            Arc::new(DummyTableProviderFactory),
123            0,
124            Duration::from_millis(0),
125            flight_compression,
126        )
127    }
128
129    pub fn with_table_provider(
130        query_engine: QueryEngineRef,
131        runtime: Runtime,
132        event_listener: RegionServerEventListenerRef,
133        table_provider_factory: TableProviderFactoryRef,
134        max_concurrent_queries: usize,
135        concurrent_query_limiter_timeout: Duration,
136        flight_compression: FlightCompression,
137    ) -> Self {
138        Self {
139            inner: Arc::new(RegionServerInner::new(
140                query_engine,
141                runtime,
142                event_listener,
143                table_provider_factory,
144                RegionServerParallelism::from_opts(
145                    max_concurrent_queries,
146                    concurrent_query_limiter_timeout,
147                ),
148            )),
149            flight_compression,
150            suspend: Arc::new(AtomicBool::new(false)),
151        }
152    }
153
154    /// Registers an engine.
155    pub fn register_engine(&mut self, engine: RegionEngineRef) {
156        self.inner.register_engine(engine);
157    }
158
159    /// Sets the topic stats.
160    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
161        self.inner.set_topic_stats_reporter(topic_stats_reporter);
162    }
163
164    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
165    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
166        match self.inner.get_engine(region_id, &RegionChange::None) {
167            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
168            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
169            Err(error::Error::RegionNotFound { .. }) => Ok(None),
170            Err(err) => Err(err),
171        }
172    }
173
174    /// Gets the MitoEngine if it's registered.
175    pub fn mito_engine(&self) -> Option<MitoEngine> {
176        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
177            Some(mito)
178        } else {
179            self.inner
180                .engines
181                .read()
182                .unwrap()
183                .get(MITO_ENGINE_NAME)
184                .cloned()
185                .and_then(|e| {
186                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
187                    if mito.is_none() {
188                        warn!("Mito engine not found in region server engines");
189                    }
190                    mito
191                })
192        }
193    }
194
195    #[tracing::instrument(skip_all)]
196    pub async fn handle_batch_open_requests(
197        &self,
198        parallelism: usize,
199        requests: Vec<(RegionId, RegionOpenRequest)>,
200        ignore_nonexistent_region: bool,
201    ) -> Result<Vec<RegionId>> {
202        self.inner
203            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
204            .await
205    }
206
207    #[tracing::instrument(skip_all)]
208    pub async fn handle_batch_catchup_requests(
209        &self,
210        parallelism: usize,
211        requests: Vec<(RegionId, RegionCatchupRequest)>,
212    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
213        self.inner
214            .handle_batch_catchup_requests(parallelism, requests)
215            .await
216    }
217
218    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
219    pub async fn handle_request(
220        &self,
221        region_id: RegionId,
222        request: RegionRequest,
223    ) -> Result<RegionResponse> {
224        self.inner.handle_request(region_id, request).await
225    }
226
227    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
228    async fn table_provider(
229        &self,
230        region_id: RegionId,
231        ctx: Option<QueryContextRef>,
232    ) -> Result<Arc<dyn TableProvider>> {
233        let status = self
234            .inner
235            .region_map
236            .get(&region_id)
237            .context(RegionNotFoundSnafu { region_id })?
238            .clone();
239        ensure!(
240            matches!(status, RegionEngineWithStatus::Ready(_)),
241            RegionNotReadySnafu { region_id }
242        );
243
244        self.inner
245            .table_provider_factory
246            .create(region_id, status.into_engine(), ctx)
247            .await
248            .context(ExecuteLogicalPlanSnafu)
249    }
250
251    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
252    pub async fn handle_remote_read(
253        &self,
254        request: api::v1::region::QueryRequest,
255        query_ctx: QueryContextRef,
256    ) -> Result<SendableRecordBatchStream> {
257        let _permit = if let Some(p) = &self.inner.parallelism {
258            Some(p.acquire().await?)
259        } else {
260            None
261        };
262
263        let region_id = RegionId::from_u64(request.region_id);
264        let catalog_list = Arc::new(NameAwareCatalogList::new(
265            self.clone(),
266            region_id,
267            query_ctx.clone(),
268        ));
269
270        if query_ctx.explain_verbose() {
271            common_telemetry::info!("Handle remote read for region: {}", region_id);
272        }
273
274        let decoder = self
275            .inner
276            .query_engine
277            .engine_context(query_ctx.clone())
278            .new_plan_decoder()
279            .context(NewPlanDecoderSnafu)?;
280
281        let plan = decoder
282            .decode(Bytes::from(request.plan), catalog_list, false)
283            .await
284            .context(DecodeLogicalPlanSnafu)?;
285
286        let stream = self
287            .inner
288            .handle_read(
289                QueryRequest {
290                    header: request.header,
291                    region_id,
292                    plan,
293                },
294                query_ctx.clone(),
295            )
296            .await?;
297
298        Ok(wrap_flow_region_watermark_stream(
299            stream, region_id, &query_ctx,
300        ))
301    }
302
303    #[tracing::instrument(skip_all)]
304    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
305        let _permit = if let Some(p) = &self.inner.parallelism {
306            Some(p.acquire().await?)
307        } else {
308            None
309        };
310
311        let ctx = request.header.as_ref().map(|h| h.into());
312        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
313
314        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
315            .context(DataFusionSnafu)?;
316        let mut injector = injector_builder
317            .build(self, request.region_id, query_ctx.clone())
318            .await?;
319
320        let plan = request
321            .plan
322            .rewrite(&mut injector)
323            .context(DataFusionSnafu)?
324            .data;
325
326        let region_id = request.region_id;
327        let stream = self
328            .inner
329            .handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
330            .await?;
331
332        Ok(wrap_flow_region_watermark_stream(
333            stream, region_id, &query_ctx,
334        ))
335    }
336
337    /// Returns all opened and reportable regions.
338    ///
339    /// Notes: except all metrics regions.
340    pub fn reportable_regions(&self) -> Vec<RegionStat> {
341        self.inner
342            .region_map
343            .iter()
344            .filter_map(|e| {
345                let region_id = *e.key();
346                // Filters out any regions whose role equals None.
347                e.role(region_id).map(|role| RegionStat {
348                    region_id,
349                    engine: e.value().name().to_string(),
350                    role,
351                })
352            })
353            .collect()
354    }
355
356    /// Returns the reportable topics.
357    pub fn topic_stats(&self) -> Vec<TopicStat> {
358        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
359        let Some(reporter) = reporter.as_mut() else {
360            return vec![];
361        };
362        reporter
363            .reportable_topics()
364            .into_iter()
365            .map(|stat| TopicStat {
366                topic_name: stat.topic,
367                record_size: stat.record_size,
368                record_num: stat.record_num,
369                latest_entry_id: stat.latest_entry_id,
370            })
371            .collect()
372    }
373
374    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
375        self.inner.region_map.get(&region_id).and_then(|engine| {
376            engine.role(region_id).map(|role| match role {
377                RegionRole::Follower => false,
378                RegionRole::Leader => true,
379                RegionRole::StagingLeader => true,
380                RegionRole::DowngradingLeader => true,
381            })
382        })
383    }
384
385    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
386        let engine = self
387            .inner
388            .region_map
389            .get(&region_id)
390            .with_context(|| RegionNotFoundSnafu { region_id })?;
391        engine
392            .set_region_role(region_id, role)
393            .with_context(|_| HandleRegionRequestSnafu { region_id })
394    }
395
396    /// Set region role state gracefully.
397    ///
398    /// For [SettableRegionRoleState::Follower]:
399    /// After the call returns, the engine ensures that
400    /// no **further** write or flush operations will succeed in this region.
401    ///
402    /// For [SettableRegionRoleState::DowngradingLeader]:
403    /// After the call returns, the engine ensures that
404    /// no **further** write operations will succeed in this region.
405    pub async fn set_region_role_state_gracefully(
406        &self,
407        region_id: RegionId,
408        state: SettableRegionRoleState,
409    ) -> Result<SetRegionRoleStateResponse> {
410        match self.inner.region_map.get(&region_id) {
411            Some(engine) => Ok(engine
412                .set_region_role_state_gracefully(region_id, state)
413                .await
414                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
415            None => Ok(SetRegionRoleStateResponse::NotFound),
416        }
417    }
418
419    pub fn runtime(&self) -> Runtime {
420        self.inner.runtime.clone()
421    }
422
423    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
424        match self.inner.region_map.get(&region_id) {
425            Some(e) => e.region_statistic(region_id),
426            None => None,
427        }
428    }
429
430    /// Stop the region server.
431    pub async fn stop(&self) -> Result<()> {
432        self.inner.stop().await
433    }
434
435    #[cfg(test)]
436    /// Registers a region for test purpose.
437    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
438        {
439            let mut engines = self.inner.engines.write().unwrap();
440            if !engines.contains_key(engine.name()) {
441                debug!("Registering test engine: {}", engine.name());
442                engines.insert(engine.name().to_string(), engine.clone());
443            }
444        }
445
446        self.inner
447            .region_map
448            .insert(region_id, RegionEngineWithStatus::Ready(engine));
449    }
450
451    async fn handle_batch_ddl_requests(
452        &self,
453        request: region_request::Body,
454    ) -> Result<RegionResponse> {
455        // Safety: we have already checked the request type in `RegionServer::handle()`.
456        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
457            .context(BuildRegionRequestsSnafu)?
458            .unwrap();
459        let tracing_context = TracingContext::from_current_span();
460
461        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
462        self.inner
463            .handle_batch_request(batch_request)
464            .trace(span)
465            .await
466    }
467
468    async fn handle_requests_in_parallel(
469        &self,
470        request: region_request::Body,
471    ) -> Result<RegionResponse> {
472        let requests =
473            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
474
475        // Try to optimize batch Put requests for metric engine
476        // Returns either Some(response) or None(requests_back)
477        match self.try_handle_metric_batch_puts(requests).await? {
478            Either::Left(response) => Ok(response),
479            Either::Right(requests) => {
480                // Fallback: original parallel processing
481                let tracing_context = TracingContext::from_current_span();
482                let join_tasks =
483                    requests
484                        .into_iter()
485                        .map(|(region_id, req): (RegionId, RegionRequest)| {
486                            let self_to_move = self;
487                            let span = tracing_context.attach(info_span!(
488                                "RegionServer::handle_region_request",
489                                region_id = region_id.to_string()
490                            ));
491                            async move {
492                                self_to_move
493                                    .handle_request(region_id, req)
494                                    .trace(span)
495                                    .await
496                            }
497                        });
498
499                let results = try_join_all(join_tasks).await?;
500                let mut affected_rows = 0;
501                let mut extensions = HashMap::new();
502                for result in results {
503                    affected_rows += result.affected_rows;
504                    extensions.extend(result.extensions);
505                }
506
507                Ok(RegionResponse {
508                    affected_rows,
509                    extensions,
510                    metadata: Vec::new(),
511                })
512            }
513        }
514    }
515
516    async fn handle_requests_in_serial(
517        &self,
518        request: region_request::Body,
519    ) -> Result<RegionResponse> {
520        let requests =
521            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
522        let tracing_context = TracingContext::from_current_span();
523
524        let mut affected_rows = 0;
525        let mut extensions = HashMap::new();
526        for (region_id, req) in requests {
527            let span = tracing_context.attach(info_span!(
528                "RegionServer::handle_region_request",
529                region_id = region_id.to_string()
530            ));
531            let result = self.handle_request(region_id, req).trace(span).await?;
532
533            affected_rows += result.affected_rows;
534            extensions.extend(result.extensions);
535        }
536
537        Ok(RegionResponse {
538            affected_rows,
539            extensions,
540            metadata: Vec::new(),
541        })
542    }
543
544    /// Attempts to optimize batch Put requests for metric engine.
545    ///
546    /// Returns Either::Left(response) if optimization succeeded,
547    /// or Either::Right(original_requests) to fall back to parallel processing.
548    ///
549    /// This avoids cloning requests when optimization cannot be applied.
550    async fn try_handle_metric_batch_puts(
551        &self,
552        requests: Vec<(RegionId, RegionRequest)>,
553    ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
554        if requests.is_empty() {
555            return Ok(Either::Right(requests));
556        }
557
558        // Quick check: verify first request is Put and is metric engine
559        if !matches!(requests[0].1, RegionRequest::Put(_)) {
560            return Ok(Either::Right(requests));
561        }
562        let first_region_id = requests[0].0;
563        let request_type = requests[0].1.request_type();
564
565        // SAFETY: If the first request belongs to metric engine, then ALL requests
566        // in this batch are guaranteed to belong to metric engine. This invariant
567        // is maintained by the request batching logic upstream.
568        let engine = match self
569            .inner
570            .get_engine(first_region_id, &RegionChange::None)?
571        {
572            CurrentEngine::Engine(e) => e,
573            _ => return Ok(Either::Right(requests)),
574        };
575
576        if engine.name() != METRIC_ENGINE_NAME {
577            return Ok(Either::Right(requests));
578        }
579
580        // Check if ALL requests are Put (now we know it's worth checking)
581        let mut all_puts = true;
582        for (_, req) in &requests {
583            if !matches!(req, RegionRequest::Put(_)) {
584                all_puts = false;
585                break;
586            }
587        }
588
589        if !all_puts {
590            return Ok(Either::Right(requests));
591        }
592
593        // Now extract Put requests by consuming ownership (zero clone!)
594        let put_requests = requests.into_iter().map(|(region_id, req)| {
595            if let RegionRequest::Put(put) = req {
596                (region_id, put)
597            } else {
598                unreachable!("Already checked all are Put")
599            }
600        });
601
602        // Downcast to MetricEngine and call batch API
603        let metric_engine =
604            engine
605                .as_any()
606                .downcast_ref::<MetricEngine>()
607                .context(UnexpectedSnafu {
608                    violated: "Failed to downcast to MetricEngine",
609                })?;
610
611        let tracing_context = TracingContext::from_current_span();
612        let batch_size = put_requests.len();
613        let span = tracing_context.attach(info_span!(
614            "RegionServer::handle_metric_batch_puts",
615            batch_size = batch_size,
616        ));
617        let result = metric_engine
618            .put_regions_batch(put_requests)
619            .trace(span)
620            .await
621            .map_err(BoxedError::new)
622            .context(HandleRegionRequestSnafu {
623                region_id: first_region_id,
624            });
625
626        match result {
627            Ok(total_affected) => {
628                crate::metrics::REGION_CHANGED_ROW_COUNT
629                    .with_label_values(&[request_type])
630                    .inc_by(total_affected as u64);
631                Ok(Either::Left(RegionResponse::new(total_affected)))
632            }
633            Err(err) => {
634                crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
635                    .with_label_values(&[request_type])
636                    .inc_by(batch_size as u64);
637                Err(err)
638            }
639        }
640    }
641
642    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
643        let region_id = RegionId::from_u64(request.region_id);
644        let manifest_info = request
645            .manifest_info
646            .context(error::MissingRequiredFieldSnafu {
647                name: "manifest_info",
648            })?;
649
650        let manifest_info = match manifest_info {
651            ManifestInfo::MitoManifestInfo(info) => {
652                RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
653            }
654            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
655                info.data_manifest_version,
656                0,
657                info.metadata_manifest_version,
658                0,
659            ),
660        };
661
662        let tracing_context = TracingContext::from_current_span();
663        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
664
665        self.sync_region(
666            region_id,
667            SyncRegionFromRequest::from_manifest(manifest_info),
668        )
669        .trace(span)
670        .await
671        .map(|_| RegionResponse::new(AffectedRows::default()))
672    }
673
674    /// Handles the ListMetadata request and retrieves metadata for specified regions.
675    ///
676    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
677    /// non-existing regions as `null`.
678    #[tracing::instrument(skip_all)]
679    async fn handle_list_metadata_request(
680        &self,
681        request: &ListMetadataRequest,
682    ) -> Result<RegionResponse> {
683        let mut region_metadatas = Vec::new();
684        // Collect metadata for each region
685        for region_id in &request.region_ids {
686            let region_id = RegionId::from_u64(*region_id);
687            // Get the engine.
688            let Some(engine) = self.find_engine(region_id)? else {
689                region_metadatas.push(None);
690                continue;
691            };
692
693            match engine.get_metadata(region_id).await {
694                Ok(metadata) => region_metadatas.push(Some(metadata)),
695                Err(err) => {
696                    if err.status_code() == StatusCode::RegionNotFound {
697                        region_metadatas.push(None);
698                    } else {
699                        Err(err).with_context(|_| GetRegionMetadataSnafu {
700                            engine: engine.name(),
701                            region_id,
702                        })?;
703                    }
704                }
705            }
706        }
707
708        // Serialize metadata to JSON
709        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
710
711        let response = RegionResponse::from_metadata(json_result);
712
713        Ok(response)
714    }
715
716    /// Sync region manifest and registers new opened logical regions.
717    pub async fn sync_region(
718        &self,
719        region_id: RegionId,
720        request: SyncRegionFromRequest,
721    ) -> Result<()> {
722        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
723            CurrentEngine::Engine(engine) => engine,
724            _ => {
725                return UnexpectedSnafu {
726                    violated: "unexpected EarlyReturn engine status for a ready region",
727                }
728                .fail();
729            }
730        };
731
732        self.inner
733            .handle_sync_region(&engine, region_id, request)
734            .await
735    }
736
737    /// Remaps manifests from old regions to new regions.
738    pub async fn remap_manifests(
739        &self,
740        request: RemapManifestsRequest,
741    ) -> Result<RemapManifestsResponse> {
742        let region_id = request.region_id;
743        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
744            CurrentEngine::Engine(engine) => engine,
745            _ => {
746                return UnexpectedSnafu {
747                    violated: "unexpected EarlyReturn engine status for a ready region",
748                }
749                .fail();
750            }
751        };
752
753        engine
754            .remap_manifests(request)
755            .await
756            .with_context(|_| HandleRegionRequestSnafu { region_id })
757    }
758
759    fn is_suspended(&self) -> bool {
760        self.suspend.load(Ordering::Relaxed)
761    }
762
763    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
764        self.suspend.clone()
765    }
766}
767
768fn wrap_flow_region_watermark_stream(
769    stream: SendableRecordBatchStream,
770    region_id: RegionId,
771    query_ctx: &QueryContextRef,
772) -> SendableRecordBatchStream {
773    let Some(seq) = should_collect_region_watermark_from_extensions(&query_ctx.extensions())
774        .then(|| query_ctx.get_snapshot(region_id.as_u64()))
775        .flatten()
776    else {
777        return stream;
778    };
779
780    Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
781}
782
783/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
784struct RegionWatermarkStream {
785    stream: SendableRecordBatchStream,
786    region_id: u64,
787    snapshot_seq: u64,
788    finished: bool,
789}
790
791impl RegionWatermarkStream {
792    fn new(stream: SendableRecordBatchStream, region_id: RegionId, snapshot_seq: u64) -> Self {
793        Self {
794            stream,
795            region_id: region_id.as_u64(),
796            snapshot_seq,
797            finished: false,
798        }
799    }
800
801    fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
802        if metrics
803            .region_watermarks
804            .iter()
805            .any(|entry| entry.region_id == self.region_id)
806        {
807            return metrics;
808        }
809
810        metrics
811            .region_watermarks
812            .push(common_recordbatch::adapter::RegionWatermarkEntry {
813                region_id: self.region_id,
814                watermark: Some(self.snapshot_seq),
815            });
816        metrics
817    }
818}
819
820impl RecordBatchStream for RegionWatermarkStream {
821    fn name(&self) -> &str {
822        self.stream.name()
823    }
824
825    fn schema(&self) -> datatypes::schema::SchemaRef {
826        self.stream.schema()
827    }
828
829    fn output_ordering(&self) -> Option<&[OrderOption]> {
830        self.stream.output_ordering()
831    }
832
833    fn metrics(&self) -> Option<RecordBatchMetrics> {
834        let base = self.stream.metrics();
835        if !self.finished {
836            return base;
837        }
838
839        Some(self.merged_metrics(base.unwrap_or_default()))
840    }
841}
842
843impl Stream for RegionWatermarkStream {
844    type Item = common_recordbatch::error::Result<RecordBatch>;
845
846    fn size_hint(&self) -> (usize, Option<usize>) {
847        self.stream.size_hint()
848    }
849
850    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
851        match Pin::new(&mut self.stream).poll_next(cx) {
852            Poll::Ready(None) => {
853                self.finished = true;
854                Poll::Ready(None)
855            }
856            other => other,
857        }
858    }
859}
860
861#[async_trait]
862impl RegionServerHandler for RegionServer {
863    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
864        let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
865            .with_label_values(&[request.as_ref()]);
866        let response = match &request {
867            region_request::Body::Creates(_)
868            | region_request::Body::Drops(_)
869            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
870            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
871                self.handle_requests_in_parallel(request).await
872            }
873            region_request::Body::Sync(sync_request) => {
874                self.handle_sync_region_request(sync_request).await
875            }
876            region_request::Body::ListMetadata(list_metadata_request) => {
877                self.handle_list_metadata_request(list_metadata_request)
878                    .await
879            }
880            _ => self.handle_requests_in_serial(request).await,
881        }
882        .map_err(BoxedError::new)
883        .inspect_err(|_| {
884            failed_requests_cnt.inc();
885        })
886        .context(ExecuteGrpcRequestSnafu)?;
887
888        Ok(RegionResponseV1 {
889            header: Some(ResponseHeader {
890                status: Some(Status {
891                    status_code: StatusCode::Success as _,
892                    ..Default::default()
893                }),
894            }),
895            affected_rows: response.affected_rows as _,
896            extensions: response.extensions,
897            metadata: response.metadata,
898        })
899    }
900}
901
902#[async_trait]
903impl FlightCraft for RegionServer {
904    async fn do_get(
905        &self,
906        request: Request<Ticket>,
907    ) -> TonicResult<Response<TonicStream<FlightData>>> {
908        ensure!(!self.is_suspended(), SuspendedSnafu);
909
910        let ticket = request.into_inner().ticket;
911        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
912            .context(servers_error::InvalidFlightTicketSnafu)?;
913        let tracing_context = request
914            .header
915            .as_ref()
916            .map(|h| TracingContext::from_w3c(&h.tracing_context))
917            .unwrap_or_default();
918        let query_ctx = request
919            .header
920            .as_ref()
921            .map(|h| Arc::new(QueryContext::from(h)))
922            .unwrap_or(QueryContext::arc());
923
924        let result = self
925            .handle_remote_read(request, query_ctx.clone())
926            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
927            .await?;
928
929        let stream = Box::pin(FlightRecordBatchStream::new(
930            result,
931            tracing_context,
932            self.flight_compression,
933            query_ctx,
934        ));
935        Ok(Response::new(stream))
936    }
937}
938
/// The engine that owns a region, tagged with the region's lifecycle status.
///
/// Stored as the value type of `RegionServerInner::region_map`.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
948
949impl RegionEngineWithStatus {
950    /// Returns [RegionEngineRef].
951    pub fn into_engine(self) -> RegionEngineRef {
952        match self {
953            RegionEngineWithStatus::Registering(engine) => engine,
954            RegionEngineWithStatus::Deregistering(engine) => engine,
955            RegionEngineWithStatus::Ready(engine) => engine,
956        }
957    }
958}
959
960impl Deref for RegionEngineWithStatus {
961    type Target = RegionEngineRef;
962
963    fn deref(&self) -> &Self::Target {
964        match self {
965            RegionEngineWithStatus::Registering(engine) => engine,
966            RegionEngineWithStatus::Deregistering(engine) => engine,
967            RegionEngineWithStatus::Ready(engine) => engine,
968        }
969    }
970}
971
/// Shared state behind the region server.
struct RegionServerInner {
    /// Registered region engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Each known region's owning engine plus its lifecycle status.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Executes query plans for reads.
    query_engine: QueryEngineRef,
    // NOTE(review): not used in the visible part of the file; presumably the
    // runtime that region work is spawned on — confirm.
    runtime: Runtime,
    /// Notified when regions are registered or deregistered.
    event_listener: RegionServerEventListenerRef,
    // NOTE(review): not used in the visible part of the file; presumably used
    // to build table providers for reads — confirm.
    table_provider_factory: TableProviderFactoryRef,
    /// The number of queries allowed to be executed at the same time.
    /// Acts as the last line of defense on the datanode against query overload.
    parallelism: Option<RegionServerParallelism>,
    /// The topic stats reporter, if one has been set.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    /// server with a concrete engine; acceptable for now to fetch Mito-specific
    /// info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
}
989
/// Limits how many queries may run on the region server at the same time.
struct RegionServerParallelism {
    /// One permit per query allowed to run concurrently.
    semaphore: Semaphore,
    /// How long `acquire` waits for a permit before giving up.
    timeout: Duration,
}
994
995impl RegionServerParallelism {
996    pub fn from_opts(
997        max_concurrent_queries: usize,
998        concurrent_query_limiter_timeout: Duration,
999    ) -> Option<Self> {
1000        if max_concurrent_queries == 0 {
1001            return None;
1002        }
1003        Some(RegionServerParallelism {
1004            semaphore: Semaphore::new(max_concurrent_queries),
1005            timeout: concurrent_query_limiter_timeout,
1006        })
1007    }
1008
1009    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
1010        timeout(self.timeout, self.semaphore.acquire())
1011            .await
1012            .context(ConcurrentQueryLimiterTimeoutSnafu)?
1013            .context(ConcurrentQueryLimiterClosedSnafu)
1014    }
1015}
1016
/// Result of resolving the engine for a region request.
///
/// It is either the engine to run the request on, or an affected-row count
/// to return right away without touching any engine.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// Short-circuit result; callers return it as the response's affected rows.
    EarlyReturn(AffectedRows),
}
1021
1022impl Debug for CurrentEngine {
1023    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1024        match self {
1025            CurrentEngine::Engine(engine) => f
1026                .debug_struct("CurrentEngine")
1027                .field("engine", &engine.name())
1028                .finish(),
1029            CurrentEngine::EarlyReturn(rows) => f
1030                .debug_struct("CurrentEngine")
1031                .field("return", rows)
1032                .finish(),
1033        }
1034    }
1035}
1036
1037impl RegionServerInner {
1038    pub fn new(
1039        query_engine: QueryEngineRef,
1040        runtime: Runtime,
1041        event_listener: RegionServerEventListenerRef,
1042        table_provider_factory: TableProviderFactoryRef,
1043        parallelism: Option<RegionServerParallelism>,
1044    ) -> Self {
1045        Self {
1046            engines: RwLock::new(HashMap::new()),
1047            region_map: DashMap::new(),
1048            query_engine,
1049            runtime,
1050            event_listener,
1051            table_provider_factory,
1052            parallelism,
1053            topic_stats_reporter: RwLock::new(None),
1054            mito_engine: RwLock::new(None),
1055        }
1056    }
1057
1058    pub fn register_engine(&self, engine: RegionEngineRef) {
1059        let engine_name = engine.name();
1060        if engine_name == MITO_ENGINE_NAME
1061            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
1062        {
1063            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
1064        }
1065
1066        info!("Region Engine {engine_name} is registered");
1067        self.engines
1068            .write()
1069            .unwrap()
1070            .insert(engine_name.to_string(), engine);
1071    }
1072
1073    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
1074        info!("Set topic stats reporter");
1075        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
1076    }
1077
1078    fn get_engine(
1079        &self,
1080        region_id: RegionId,
1081        region_change: &RegionChange,
1082    ) -> Result<CurrentEngine> {
1083        let current_region_status = self.region_map.get(&region_id);
1084
1085        let engine = match region_change {
1086            RegionChange::Register(attribute) => match current_region_status {
1087                Some(status) => match status.clone() {
1088                    RegionEngineWithStatus::Registering(engine) => engine,
1089                    RegionEngineWithStatus::Deregistering(_) => {
1090                        return error::RegionBusySnafu { region_id }.fail();
1091                    }
1092                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1093                },
1094                _ => self
1095                    .engines
1096                    .read()
1097                    .unwrap()
1098                    .get(attribute.engine())
1099                    .with_context(|| RegionEngineNotFoundSnafu {
1100                        name: attribute.engine(),
1101                    })?
1102                    .clone(),
1103            },
1104            RegionChange::Deregisters => match current_region_status {
1105                Some(status) => match status.clone() {
1106                    RegionEngineWithStatus::Registering(_) => {
1107                        return error::RegionBusySnafu { region_id }.fail();
1108                    }
1109                    RegionEngineWithStatus::Deregistering(_) => {
1110                        return Ok(CurrentEngine::EarlyReturn(0));
1111                    }
1112                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1113                },
1114                None => return Ok(CurrentEngine::EarlyReturn(0)),
1115            },
1116            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1117                match current_region_status {
1118                    Some(status) => match status.clone() {
1119                        RegionEngineWithStatus::Registering(_) => {
1120                            return error::RegionNotReadySnafu { region_id }.fail();
1121                        }
1122                        RegionEngineWithStatus::Deregistering(_) => {
1123                            return error::RegionNotFoundSnafu { region_id }.fail();
1124                        }
1125                        RegionEngineWithStatus::Ready(engine) => engine,
1126                    },
1127                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
1128                }
1129            }
1130        };
1131
1132        Ok(CurrentEngine::Engine(engine))
1133    }
1134
    /// Opens a batch of regions that all belong to `engine`.
    ///
    /// Steps:
    /// 1. Every region is marked `Registering` before the engine runs.
    /// 2. After the engine runs, each region is either marked ready (on
    ///    success) or has its status removed (on failure).
    ///
    /// When `ignore_nonexistent_region` is set, a per-region `RegionNotFound`
    /// failure is only logged instead of being counted as an error.
    ///
    /// Returns the ids of the regions that were opened and marked ready.
    ///
    /// # Errors
    /// - Fails if any request's region attribute cannot be parsed; this
    ///   happens before any status change.
    /// - Returns an `Unexpected` error built from the first recorded failure
    ///   if any region failed to open or to be marked ready, or if the whole
    ///   batch failed. Regions that did open successfully stay ready in
    ///   `region_map` in that case.
    async fn handle_batch_open_requests_inner(
        &self,
        engine: RegionEngineRef,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
        ignore_nonexistent_region: bool,
    ) -> Result<Vec<RegionId>> {
        // Every open request is a registration; keyed by region id for the
        // per-region lookups below (a duplicate id keeps the last entry).
        let region_changes = requests
            .iter()
            .map(|(region_id, open)| {
                let attribute = parse_region_attribute(&open.engine, &open.options)?;
                Ok((*region_id, RegionChange::Register(attribute)))
            })
            .collect::<Result<HashMap<_, _>>>()?;

        for (&region_id, region_change) in &region_changes {
            self.set_region_status_not_ready(region_id, &engine, region_change)
        }

        let mut open_regions = Vec::with_capacity(requests.len());
        let mut errors = vec![];
        match engine
            .handle_batch_open_requests(parallelism, requests)
            .await
            .with_context(|_| HandleBatchOpenRequestSnafu)
        {
            Ok(results) => {
                for (region_id, result) in results {
                    // NOTE(review): indexing panics if the engine returns a region id
                    // that was not in `requests`; this relies on the engine only
                    // reporting ids it was asked to open.
                    let region_change = &region_changes[&region_id];
                    match result {
                        Ok(_) => {
                            if let Err(e) = self
                                .set_region_status_ready(region_id, engine.clone(), *region_change)
                                .await
                            {
                                error!(e; "Failed to set region to ready: {}", region_id);
                                errors.push(BoxedError::new(e));
                            } else {
                                open_regions.push(region_id)
                            }
                        }
                        Err(e) => {
                            // Roll back the `Registering` mark for this region.
                            self.unset_region_status(region_id, &engine, *region_change);
                            if e.status_code() == StatusCode::RegionNotFound
                                && ignore_nonexistent_region
                            {
                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
                            } else {
                                error!(e; "Failed to open region: {}", region_id);
                                errors.push(e);
                            }
                        }
                    }
                }
            }
            Err(e) => {
                // The whole batch failed: roll back every `Registering` mark.
                for (&region_id, region_change) in &region_changes {
                    self.unset_region_status(region_id, &engine, *region_change);
                }
                error!(e; "Failed to open batch regions");
                errors.push(BoxedError::new(e));
            }
        }

        if !errors.is_empty() {
            return error::UnexpectedSnafu {
                // Returns the first error.
                violated: format!("Failed to open batch regions: {:?}", errors[0]),
            }
            .fail();
        }

        Ok(open_regions)
    }
1209
1210    pub async fn handle_batch_open_requests(
1211        &self,
1212        parallelism: usize,
1213        requests: Vec<(RegionId, RegionOpenRequest)>,
1214        ignore_nonexistent_region: bool,
1215    ) -> Result<Vec<RegionId>> {
1216        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1217            HashMap::with_capacity(requests.len());
1218        for (region_id, request) in requests {
1219            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1220                requests.push((region_id, request));
1221            } else {
1222                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1223            }
1224        }
1225
1226        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1227        for (engine, requests) in engine_grouped_requests {
1228            let engine = self
1229                .engines
1230                .read()
1231                .unwrap()
1232                .get(&engine)
1233                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1234                .clone();
1235            results.push(
1236                self.handle_batch_open_requests_inner(
1237                    engine,
1238                    parallelism,
1239                    requests,
1240                    ignore_nonexistent_region,
1241                )
1242                .await,
1243            )
1244        }
1245
1246        Ok(results
1247            .into_iter()
1248            .collect::<Result<Vec<_>>>()?
1249            .into_iter()
1250            .flatten()
1251            .collect::<Vec<_>>())
1252    }
1253
    /// Catches up a batch of regions that all belong to `engine`.
    ///
    /// Returns one entry per region reported by the engine:
    /// - `Ok(())` when catch-up succeeded and the region was finalized via
    ///   `set_region_status_ready` (which, for metric-engine regions,
    ///   re-registers their logical regions).
    /// - The error otherwise.
    ///
    /// # Errors
    /// If the engine fails the whole batch, an `Unexpected` error wrapping that
    /// failure is returned instead of per-region results.
    pub async fn handle_batch_catchup_requests_inner(
        &self,
        engine: RegionEngineRef,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
        // For `Catchup`, `set_region_status_not_ready` leaves `region_map`
        // untouched; the call keeps this path symmetric with open/create.
        for (region_id, _) in &requests {
            self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
        }
        // Kept so every region can be rolled back if the whole batch fails,
        // since `requests` is moved into the engine call.
        let region_ids = requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let mut responses = Vec::with_capacity(requests.len());
        match engine
            .handle_batch_catchup_requests(parallelism, requests)
            .await
        {
            Ok(results) => {
                for (region_id, result) in results {
                    match result {
                        Ok(_) => {
                            if let Err(e) = self
                                .set_region_status_ready(
                                    region_id,
                                    engine.clone(),
                                    RegionChange::Catchup,
                                )
                                .await
                            {
                                error!(e; "Failed to set region to ready: {}", region_id);
                                responses.push((region_id, Err(BoxedError::new(e))));
                            } else {
                                responses.push((region_id, Ok(())));
                            }
                        }
                        Err(e) => {
                            // A no-op for `Catchup` (see `unset_region_status`).
                            self.unset_region_status(region_id, &engine, RegionChange::Catchup);
                            error!(e; "Failed to catchup region: {}", region_id);
                            responses.push((region_id, Err(e)));
                        }
                    }
                }
            }
            Err(e) => {
                for region_id in region_ids {
                    self.unset_region_status(region_id, &engine, RegionChange::Catchup);
                }
                error!(e; "Failed to catchup batch regions");
                return error::UnexpectedSnafu {
                    violated: format!("Failed to catchup batch regions: {:?}", e),
                }
                .fail();
            }
        }

        Ok(responses)
    }
1312
1313    pub async fn handle_batch_catchup_requests(
1314        &self,
1315        parallelism: usize,
1316        requests: Vec<(RegionId, RegionCatchupRequest)>,
1317    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1318        let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1319
1320        let mut responses = Vec::with_capacity(requests.len());
1321        for (region_id, request) in requests {
1322            if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1323                match engine {
1324                    CurrentEngine::Engine(engine) => {
1325                        engine_grouped_requests
1326                            .entry(engine.name().to_string())
1327                            .or_default()
1328                            .push((region_id, request));
1329                    }
1330                    CurrentEngine::EarlyReturn(_) => {
1331                        return error::UnexpectedSnafu {
1332                            violated: format!("Unexpected engine type for region {}", region_id),
1333                        }
1334                        .fail();
1335                    }
1336                }
1337            } else {
1338                responses.push((
1339                    region_id,
1340                    Err(BoxedError::new(
1341                        error::RegionNotFoundSnafu { region_id }.build(),
1342                    )),
1343                ));
1344            }
1345        }
1346
1347        for (engine, requests) in engine_grouped_requests {
1348            let engine = self
1349                .engines
1350                .read()
1351                .unwrap()
1352                .get(&engine)
1353                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1354                .clone();
1355            responses.extend(
1356                self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1357                    .await?,
1358            );
1359        }
1360
1361        Ok(responses)
1362    }
1363
1364    // Handle requests in batch.
1365    //
1366    // limitation: all create requests must be in the same engine.
1367    pub async fn handle_batch_request(
1368        &self,
1369        batch_request: BatchRegionDdlRequest,
1370    ) -> Result<RegionResponse> {
1371        let region_changes = match &batch_request {
1372            BatchRegionDdlRequest::Create(requests) => requests
1373                .iter()
1374                .map(|(region_id, create)| {
1375                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
1376                    Ok((*region_id, RegionChange::Register(attribute)))
1377                })
1378                .collect::<Result<Vec<_>>>()?,
1379            BatchRegionDdlRequest::Drop(requests) => requests
1380                .iter()
1381                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1382                .collect::<Vec<_>>(),
1383            BatchRegionDdlRequest::Alter(requests) => requests
1384                .iter()
1385                .map(|(region_id, _)| (*region_id, RegionChange::None))
1386                .collect::<Vec<_>>(),
1387        };
1388
1389        // The ddl procedure will ensure all requests are in the same engine.
1390        // Therefore, we can get the engine from the first request.
1391        let (first_region_id, first_region_change) = region_changes.first().unwrap();
1392        let engine = match self.get_engine(*first_region_id, first_region_change)? {
1393            CurrentEngine::Engine(engine) => engine,
1394            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1395        };
1396
1397        for (region_id, region_change) in region_changes.iter() {
1398            self.set_region_status_not_ready(*region_id, &engine, region_change);
1399        }
1400
1401        let ddl_type = batch_request.request_type();
1402        let result = engine
1403            .handle_batch_ddl_requests(batch_request)
1404            .await
1405            .context(HandleBatchDdlRequestSnafu { ddl_type });
1406
1407        match result {
1408            Ok(result) => {
1409                for (region_id, region_change) in &region_changes {
1410                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1411                        .await?;
1412                }
1413
1414                Ok(RegionResponse {
1415                    affected_rows: result.affected_rows,
1416                    extensions: result.extensions,
1417                    metadata: Vec::new(),
1418                })
1419            }
1420            Err(err) => {
1421                for (region_id, region_change) in region_changes {
1422                    self.unset_region_status(region_id, &engine, region_change);
1423                }
1424
1425                Err(err)
1426            }
1427        }
1428    }
1429
1430    pub async fn handle_request(
1431        &self,
1432        region_id: RegionId,
1433        request: RegionRequest,
1434    ) -> Result<RegionResponse> {
1435        let request_type = request.request_type();
1436        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1437            .with_label_values(&[request_type])
1438            .start_timer();
1439
1440        let region_change = match &request {
1441            RegionRequest::Create(create) => {
1442                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1443                RegionChange::Register(attribute)
1444            }
1445            RegionRequest::Open(open) => {
1446                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1447                RegionChange::Register(attribute)
1448            }
1449            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1450            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1451                RegionChange::Ingest
1452            }
1453            RegionRequest::Alter(_)
1454            | RegionRequest::Flush(_)
1455            | RegionRequest::Compact(_)
1456            | RegionRequest::Truncate(_)
1457            | RegionRequest::BuildIndex(_)
1458            | RegionRequest::EnterStaging(_)
1459            | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1460            RegionRequest::Catchup(_) => RegionChange::Catchup,
1461        };
1462
1463        let engine = match self.get_engine(region_id, &region_change)? {
1464            CurrentEngine::Engine(engine) => engine,
1465            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1466        };
1467
1468        // Sets corresponding region status to registering/deregistering before the operation.
1469        self.set_region_status_not_ready(region_id, &engine, &region_change);
1470
1471        match engine
1472            .handle_request(region_id, request)
1473            .await
1474            .with_context(|_| HandleRegionRequestSnafu { region_id })
1475        {
1476            Ok(result) => {
1477                // Update metrics
1478                if matches!(region_change, RegionChange::Ingest) {
1479                    crate::metrics::REGION_CHANGED_ROW_COUNT
1480                        .with_label_values(&[request_type])
1481                        .inc_by(result.affected_rows as u64);
1482                }
1483                // Sets corresponding region status to ready.
1484                self.set_region_status_ready(region_id, engine.clone(), region_change)
1485                    .await?;
1486
1487                Ok(RegionResponse {
1488                    affected_rows: result.affected_rows,
1489                    extensions: result.extensions,
1490                    metadata: Vec::new(),
1491                })
1492            }
1493            Err(err) => {
1494                if matches!(region_change, RegionChange::Ingest) {
1495                    crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1496                        .with_label_values(&[request_type])
1497                        .inc();
1498                }
1499                // Removes the region status if the operation fails.
1500                self.unset_region_status(region_id, &engine, region_change);
1501                Err(err)
1502            }
1503        }
1504    }
1505
1506    /// Handles the sync region request.
1507    pub async fn handle_sync_region(
1508        &self,
1509        engine: &RegionEngineRef,
1510        region_id: RegionId,
1511        request: SyncRegionFromRequest,
1512    ) -> Result<()> {
1513        let Some(new_opened_regions) = engine
1514            .sync_region(region_id, request)
1515            .await
1516            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1517            .new_opened_logical_region_ids()
1518        else {
1519            return Ok(());
1520        };
1521
1522        for region in &new_opened_regions {
1523            self.region_map
1524                .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1525        }
1526        if !new_opened_regions.is_empty() {
1527            info!(
1528                region_id = %region_id,
1529                logical_region_count = new_opened_regions.len(),
1530                logical_regions = ?new_opened_regions,
1531                "Logical regions are registered"
1532            );
1533        }
1534
1535        Ok(())
1536    }
1537
1538    fn set_region_status_not_ready(
1539        &self,
1540        region_id: RegionId,
1541        engine: &RegionEngineRef,
1542        region_change: &RegionChange,
1543    ) {
1544        match region_change {
1545            RegionChange::Register(_) => {
1546                self.region_map.insert(
1547                    region_id,
1548                    RegionEngineWithStatus::Registering(engine.clone()),
1549                );
1550            }
1551            RegionChange::Deregisters => {
1552                self.region_map.insert(
1553                    region_id,
1554                    RegionEngineWithStatus::Deregistering(engine.clone()),
1555                );
1556            }
1557            _ => {}
1558        }
1559    }
1560
1561    fn unset_region_status(
1562        &self,
1563        region_id: RegionId,
1564        engine: &RegionEngineRef,
1565        region_change: RegionChange,
1566    ) {
1567        match region_change {
1568            RegionChange::None | RegionChange::Ingest => {}
1569            RegionChange::Register(_) => {
1570                self.region_map.remove(&region_id);
1571            }
1572            RegionChange::Deregisters => {
1573                self.region_map
1574                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1575            }
1576            RegionChange::Catchup => {}
1577        }
1578    }
1579
1580    async fn set_region_status_ready(
1581        &self,
1582        region_id: RegionId,
1583        engine: RegionEngineRef,
1584        region_change: RegionChange,
1585    ) -> Result<()> {
1586        let engine_type = engine.name();
1587        match region_change {
1588            RegionChange::None | RegionChange::Ingest => {}
1589            RegionChange::Register(attribute) => {
1590                info!(
1591                    "Region {region_id} is registered to engine {}",
1592                    attribute.engine()
1593                );
1594                self.region_map
1595                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1596
1597                match attribute {
1598                    RegionAttribute::Metric { physical } => {
1599                        if physical {
1600                            // Registers the logical regions belong to the physical region (`region_id`).
1601                            self.register_logical_regions(&engine, region_id).await?;
1602                            // We only send the `on_region_registered` event of the physical region.
1603                            self.event_listener.on_region_registered(region_id);
1604                        }
1605                    }
1606                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1607                    RegionAttribute::File => {
1608                        // do nothing
1609                    }
1610                }
1611            }
1612            RegionChange::Deregisters => {
1613                info!("Region {region_id} is deregistered from engine {engine_type}");
1614                self.region_map
1615                    .remove(&region_id)
1616                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1617                self.event_listener.on_region_deregistered(region_id);
1618            }
1619            RegionChange::Catchup => {
1620                if is_metric_engine(engine.name()) {
1621                    // Registers the logical regions belong to the physical region (`region_id`).
1622                    self.register_logical_regions(&engine, region_id).await?;
1623                }
1624            }
1625        }
1626        Ok(())
1627    }
1628
1629    async fn register_logical_regions(
1630        &self,
1631        engine: &RegionEngineRef,
1632        physical_region_id: RegionId,
1633    ) -> Result<()> {
1634        let metric_engine =
1635            engine
1636                .as_any()
1637                .downcast_ref::<MetricEngine>()
1638                .context(UnexpectedSnafu {
1639                    violated: format!(
1640                        "expecting engine type '{}', actual '{}'",
1641                        METRIC_ENGINE_NAME,
1642                        engine.name(),
1643                    ),
1644                })?;
1645
1646        let logical_regions = metric_engine
1647            .logical_regions(physical_region_id)
1648            .await
1649            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1650
1651        for region in &logical_regions {
1652            self.region_map
1653                .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1654        }
1655        if !logical_regions.is_empty() {
1656            info!(
1657                physical_region_id = %physical_region_id,
1658                logical_region_count = logical_regions.len(),
1659                logical_regions = ?logical_regions,
1660                "Logical regions are registered"
1661            );
1662        }
1663        Ok(())
1664    }
1665
1666    pub async fn handle_read(
1667        &self,
1668        request: QueryRequest,
1669        query_ctx: QueryContextRef,
1670    ) -> Result<SendableRecordBatchStream> {
1671        // TODO(ruihang): add metrics and set trace id
1672
1673        let result = self
1674            .query_engine
1675            .execute(request.plan, query_ctx)
1676            .await
1677            .context(ExecuteLogicalPlanSnafu)?;
1678
1679        match result.data {
1680            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1681                UnsupportedOutputSnafu { expected: "stream" }.fail()
1682            }
1683            OutputData::Stream(stream) => Ok(stream),
1684        }
1685    }
1686
1687    async fn stop(&self) -> Result<()> {
1688        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1689        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1690        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1691        //
1692        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1693        // it here, collect the values first then use later separately.
1694
1695        let regions = self
1696            .region_map
1697            .iter()
1698            .map(|x| (*x.key(), x.value().clone()))
1699            .collect::<Vec<_>>();
1700        let num_regions = regions.len();
1701
1702        for (region_id, engine) in regions {
1703            let closed = engine
1704                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1705                .await;
1706            match closed {
1707                Ok(_) => debug!("Region {region_id} is closed"),
1708                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1709            }
1710        }
1711        self.region_map.clear();
1712        info!("closed {num_regions} regions");
1713
1714        drop(self.mito_engine.write().unwrap().take());
1715        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1716        for (engine_name, engine) in engines {
1717            engine
1718                .stop()
1719                .await
1720                .context(StopRegionEngineSnafu { name: &engine_name })?;
1721            info!("Region engine {engine_name} is stopped");
1722        }
1723
1724        Ok(())
1725    }
1726}
1727
/// How a region request affects the region's registration on this region server.
///
/// NOTE(review): the mapping from request kinds to variants is decided outside this chunk.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// No registration change; the region is expected to already be registered.
    None,
    /// The region is being registered with the engine described by the attribute; on success
    /// it is inserted into `region_map` as `Ready`.
    Register(RegionAttribute),
    /// The region is being deregistered; on success it is removed from `region_map`, set to the
    /// follower role and reported to the event listener.
    Deregisters,
    /// The region is catching up; for metric-engine regions the logical regions of the physical
    /// region are registered again.
    Catchup,
    /// Presumably an ingest request; its handling is not visible in this chunk (TODO confirm).
    Ingest,
}
1736
1737fn is_metric_engine(engine: &str) -> bool {
1738    engine == METRIC_ENGINE_NAME
1739}
1740
1741fn parse_region_attribute(
1742    engine: &str,
1743    options: &HashMap<String, String>,
1744) -> Result<RegionAttribute> {
1745    match engine {
1746        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1747        METRIC_ENGINE_NAME => {
1748            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1749
1750            Ok(RegionAttribute::Metric { physical })
1751        }
1752        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1753        _ => error::UnexpectedSnafu {
1754            violated: format!("Unknown engine: {}", engine),
1755        }
1756        .fail(),
1757    }
1758}
1759
1760#[derive(Debug, Clone, Copy)]
1761enum RegionAttribute {
1762    Mito,
1763    Metric { physical: bool },
1764    File,
1765}
1766
1767impl RegionAttribute {
1768    fn engine(&self) -> &'static str {
1769        match self {
1770            RegionAttribute::Mito => MITO_ENGINE_NAME,
1771            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1772            RegionAttribute::File => FILE_ENGINE_NAME,
1773        }
1774    }
1775}
1776
1777#[cfg(test)]
1778mod tests {
1779
1780    use std::assert_matches;
1781    use std::collections::HashMap;
1782    use std::sync::Arc;
1783
1784    use api::v1::SemanticType;
1785    use common_error::ext::ErrorExt;
1786    use common_recordbatch::RecordBatches;
1787    use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
1788    use datatypes::prelude::{ConcreteDataType, VectorRef};
1789    use datatypes::schema::{ColumnSchema, Schema};
1790    use datatypes::vectors::Int32Vector;
1791    use futures_util::StreamExt;
1792    use mito2::test_util::CreateRequestBuilder;
1793    use query::options::FLOW_RETURN_REGION_SEQ;
1794    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1795    use store_api::region_engine::RegionEngine;
1796    use store_api::region_request::{
1797        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1798    };
1799    use store_api::storage::RegionId;
1800
1801    use super::*;
1802    use crate::error::Result;
1803    use crate::tests::{MockRegionEngine, mock_region_server};
1804
1805    fn single_value_stream() -> SendableRecordBatchStream {
1806        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
1807            "v",
1808            ConcreteDataType::int32_datatype(),
1809            false,
1810        )]));
1811        let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
1812        let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
1813        RecordBatches::try_new(schema, vec![batch])
1814            .unwrap()
1815            .as_stream()
1816    }
1817
1818    #[tokio::test]
1819    async fn test_region_watermark_stream_only_sets_terminal_metrics() {
1820        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
1821            "v",
1822            ConcreteDataType::int32_datatype(),
1823            false,
1824        )]));
1825        let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
1826        let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
1827        let stream = RecordBatches::try_new(schema, vec![batch])
1828            .unwrap()
1829            .as_stream();
1830
1831        let region_id = RegionId::new(42, 7);
1832        let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
1833        let mut pinned = Box::pin(wrapped);
1834
1835        assert!(pinned.as_ref().get_ref().metrics().is_none());
1836        while pinned.next().await.is_some() {}
1837
1838        let metrics = pinned.as_ref().get_ref().metrics().unwrap();
1839        assert_eq!(
1840            metrics.region_watermarks,
1841            vec![RegionWatermarkEntry {
1842                region_id: region_id.as_u64(),
1843                watermark: Some(99),
1844            }]
1845        );
1846    }
1847
1848    #[test]
1849    fn test_region_watermark_stream_preserves_unproved_watermark() {
1850        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
1851            "v",
1852            ConcreteDataType::int32_datatype(),
1853            false,
1854        )]));
1855        let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
1856        let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
1857        let stream = RecordBatches::try_new(schema, vec![batch])
1858            .unwrap()
1859            .as_stream();
1860
1861        let region_id = RegionId::new(42, 7);
1862        let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
1863        let metrics = RecordBatchMetrics {
1864            region_watermarks: vec![RegionWatermarkEntry {
1865                region_id: region_id.as_u64(),
1866                watermark: None,
1867            }],
1868            ..Default::default()
1869        };
1870
1871        let merged = wrapped.merged_metrics(metrics);
1872        assert_eq!(
1873            merged.region_watermarks,
1874            vec![RegionWatermarkEntry {
1875                region_id: region_id.as_u64(),
1876                watermark: None,
1877            }]
1878        );
1879    }
1880
1881    #[tokio::test]
1882    async fn test_wrap_flow_region_watermark_stream_adds_terminal_metrics() {
1883        let region_id = RegionId::new(42, 7);
1884        let query_ctx = Arc::new(
1885            QueryContextBuilder::default()
1886                .extensions(HashMap::from([(
1887                    FLOW_RETURN_REGION_SEQ.to_string(),
1888                    "true".to_string(),
1889                )]))
1890                .build(),
1891        );
1892        query_ctx.set_snapshot(region_id.as_u64(), 99);
1893
1894        let wrapped =
1895            wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
1896        let mut pinned = Box::pin(wrapped);
1897        while pinned.next().await.is_some() {}
1898
1899        let metrics = pinned.as_ref().get_ref().metrics().unwrap();
1900        assert_eq!(
1901            metrics.region_watermarks,
1902            vec![RegionWatermarkEntry {
1903                region_id: region_id.as_u64(),
1904                watermark: Some(99),
1905            }]
1906        );
1907    }
1908
1909    #[tokio::test]
1910    async fn test_wrap_flow_region_watermark_stream_skips_without_extension() {
1911        let region_id = RegionId::new(42, 7);
1912        let query_ctx = Arc::new(QueryContextBuilder::default().build());
1913        query_ctx.set_snapshot(region_id.as_u64(), 99);
1914
1915        let wrapped =
1916            wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
1917        let mut pinned = Box::pin(wrapped);
1918        while pinned.next().await.is_some() {}
1919
1920        assert!(pinned.as_ref().get_ref().metrics().is_none());
1921    }
1922
1923    #[tokio::test]
1924    async fn test_wrap_flow_region_watermark_stream_skips_without_snapshot() {
1925        let region_id = RegionId::new(42, 7);
1926        let query_ctx = Arc::new(
1927            QueryContextBuilder::default()
1928                .extensions(HashMap::from([(
1929                    FLOW_RETURN_REGION_SEQ.to_string(),
1930                    "true".to_string(),
1931                )]))
1932                .build(),
1933        );
1934
1935        let wrapped =
1936            wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
1937        let mut pinned = Box::pin(wrapped);
1938        while pinned.next().await.is_some() {}
1939
1940        assert!(pinned.as_ref().get_ref().metrics().is_none());
1941    }
1942
    /// Create and open requests on a region that is currently `Registering` succeed, and the
    /// region ends up `Ready`.
    #[tokio::test]
    async fn test_region_registering() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
        let engine_name = engine.name();
        mock_region_server.register_engine(engine.clone());
        let region_id = RegionId::new(1, 1);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        // Tries to create/open a registering region.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Registering(engine.clone()),
        );
        let response = mock_region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
        // The successful create promotes the region to `Ready`.
        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));

        // Reset the region to `Registering` and repeat the check with an open request.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Registering(engine.clone()),
        );
        let response = mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Open(RegionOpenRequest {
                    engine: engine_name.to_string(),
                    table_dir: String::new(),
                    path_type: PathType::Bare,
                    options: Default::default(),
                    skip_wal_replay: false,
                    checkpoint: None,
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
    }
1999
    /// Drop and close requests on a region that is already `Deregistering` succeed, and the
    /// region remains `Deregistering` in the region map.
    #[tokio::test]
    async fn test_region_deregistering() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1, 1);

        // Tries to drop/close a deregistering region.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Deregistering(engine.clone()),
        );

        let response = mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Drop(RegionDropRequest {
                    fast_path: false,
                    force: false,
                    partial_drop: false,
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);

        // The drop request leaves the region in the `Deregistering` state.
        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));

        // Reset the region to `Deregistering` and repeat the check with a close request.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Deregistering(engine.clone()),
        );

        let response = mock_region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);

        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
    }
2057
2058    #[tokio::test]
2059    async fn test_region_not_ready() {
2060        common_telemetry::init_default_ut_logging();
2061
2062        let mut mock_region_server = mock_region_server();
2063        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
2064
2065        mock_region_server.register_engine(engine.clone());
2066
2067        let region_id = RegionId::new(1, 1);
2068
2069        // Tries to drop/close a registering region.
2070        mock_region_server.inner.region_map.insert(
2071            region_id,
2072            RegionEngineWithStatus::Registering(engine.clone()),
2073        );
2074
2075        let err = mock_region_server
2076            .handle_request(
2077                region_id,
2078                RegionRequest::Truncate(RegionTruncateRequest::All),
2079            )
2080            .await
2081            .unwrap_err();
2082
2083        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2084    }
2085
    /// With an engine that fails every request, a failed create leaves no entry in
    /// `region_map`, while a failed drop keeps the existing entry.
    #[tokio::test]
    async fn test_region_request_failed() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        // Every request handled by this mock engine fails.
        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
            MITO_ENGINE_NAME,
            Box::new(|_region_id, _request| {
                error::UnexpectedSnafu {
                    violated: "test".to_string(),
                }
                .fail()
            }),
        );

        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1, 1);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        mock_region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap_err();

        // The failed create must not leave the region behind.
        let status = mock_region_server.inner.region_map.get(&region_id);
        assert!(status.is_none());

        mock_region_server
            .inner
            .region_map
            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));

        mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Drop(RegionDropRequest {
                    fast_path: false,
                    force: false,
                    partial_drop: false,
                }),
            )
            .await
            .unwrap_err();

        // The failed drop must keep the region registered.
        let status = mock_region_server.inner.region_map.get(&region_id);
        assert!(status.is_some());
    }
2134
2135    #[tokio::test]
2136    async fn test_batch_open_region_ignore_nonexistent_regions() {
2137        common_telemetry::init_default_ut_logging();
2138        let mut mock_region_server = mock_region_server();
2139        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
2140            MITO_ENGINE_NAME,
2141            Box::new(|region_id, _request| {
2142                if region_id == RegionId::new(1, 1) {
2143                    error::RegionNotFoundSnafu { region_id }.fail()
2144                } else {
2145                    Ok(0)
2146                }
2147            }),
2148        );
2149        mock_region_server.register_engine(engine.clone());
2150
2151        let region_ids = mock_region_server
2152            .handle_batch_open_requests(
2153                8,
2154                vec![
2155                    (
2156                        RegionId::new(1, 1),
2157                        RegionOpenRequest {
2158                            engine: MITO_ENGINE_NAME.to_string(),
2159                            table_dir: String::new(),
2160                            path_type: PathType::Bare,
2161                            options: Default::default(),
2162                            skip_wal_replay: false,
2163                            checkpoint: None,
2164                        },
2165                    ),
2166                    (
2167                        RegionId::new(1, 2),
2168                        RegionOpenRequest {
2169                            engine: MITO_ENGINE_NAME.to_string(),
2170                            table_dir: String::new(),
2171                            path_type: PathType::Bare,
2172                            options: Default::default(),
2173                            skip_wal_replay: false,
2174                            checkpoint: None,
2175                        },
2176                    ),
2177                ],
2178                true,
2179            )
2180            .await
2181            .unwrap();
2182        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
2183
2184        let err = mock_region_server
2185            .handle_batch_open_requests(
2186                8,
2187                vec![
2188                    (
2189                        RegionId::new(1, 1),
2190                        RegionOpenRequest {
2191                            engine: MITO_ENGINE_NAME.to_string(),
2192                            table_dir: String::new(),
2193                            path_type: PathType::Bare,
2194                            options: Default::default(),
2195                            skip_wal_replay: false,
2196                            checkpoint: None,
2197                        },
2198                    ),
2199                    (
2200                        RegionId::new(1, 2),
2201                        RegionOpenRequest {
2202                            engine: MITO_ENGINE_NAME.to_string(),
2203                            table_dir: String::new(),
2204                            path_type: PathType::Bare,
2205                            options: Default::default(),
2206                            skip_wal_replay: false,
2207                            checkpoint: None,
2208                        },
2209                    ),
2210                ],
2211                false,
2212            )
2213            .await
2214            .unwrap_err();
2215        assert_eq!(err.status_code(), StatusCode::Unexpected);
2216    }
2217
    /// One case of `test_current_engine`: the region's status before the lookup, the
    /// requested region change, and a check applied to the `get_engine` result.
    struct CurrentEngineTest {
        region_id: RegionId,
        /// Status seeded into `region_map`; `None` removes the region before the lookup.
        current_region_status: Option<RegionEngineWithStatus>,
        region_change: RegionChange,
        /// Assertion run on the result of `get_engine(region_id, &region_change)`.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
2224
    /// Table-driven check of `get_engine`: each combination of the region's current status and
    /// the requested `RegionChange` must yield the expected engine, early return, or status code.
    #[tokio::test]
    async fn test_current_engine() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1024, 1);
        let tests = vec![
            // RegionChange::None
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
                }),
            },
            // A deregistering region is reported as not found to ordinary requests.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            // RegionChange::Register
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // Registering a region that is being deregistered is rejected as busy.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // RegionChange::Deregisters
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            // Deregistering a region that is still registering is rejected as busy.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
        ];

        for test in tests {
            let CurrentEngineTest {
                region_id,
                current_region_status,
                region_change,
                assert,
            } = test;

            // Sets up
            // Each case seeds (or clears) the region's status, so cases do not leak into
            // each other.
            if let Some(status) = current_region_status {
                mock_region_server
                    .inner
                    .region_map
                    .insert(region_id, status);
            } else {
                mock_region_server.inner.region_map.remove(&region_id);
            }

            let result = mock_region_server
                .inner
                .get_engine(region_id, &region_change);

            assert(result);
        }
    }
2373
2374    #[tokio::test]
2375    async fn test_region_server_parallelism() {
2376        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2377        let first_query = p.acquire().await;
2378        assert!(first_query.is_ok());
2379        let second_query = p.acquire().await;
2380        assert!(second_query.is_ok());
2381        let third_query = p.acquire().await;
2382        assert!(third_query.is_err());
2383        let err = third_query.unwrap_err();
2384        assert_eq!(
2385            err.output_msg(),
2386            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2387        );
2388        drop(first_query);
2389        let forth_query = p.acquire().await;
2390        assert!(forth_query.is_ok());
2391    }
2392
2393    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2394        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2395        metadata_builder.push_column_metadata(ColumnMetadata {
2396            column_schema: datatypes::schema::ColumnSchema::new(
2397                "timestamp",
2398                ConcreteDataType::timestamp_nanosecond_datatype(),
2399                false,
2400            ),
2401            semantic_type: SemanticType::Timestamp,
2402            column_id: 0,
2403        });
2404        metadata_builder.push_column_metadata(ColumnMetadata {
2405            column_schema: datatypes::schema::ColumnSchema::new(
2406                "file",
2407                ConcreteDataType::string_datatype(),
2408                true,
2409            ),
2410            semantic_type: SemanticType::Tag,
2411            column_id: 1,
2412        });
2413        metadata_builder.push_column_metadata(ColumnMetadata {
2414            column_schema: datatypes::schema::ColumnSchema::new(
2415                "message",
2416                ConcreteDataType::string_datatype(),
2417                true,
2418            ),
2419            semantic_type: SemanticType::Field,
2420            column_id: 2,
2421        });
2422        metadata_builder.primary_key(vec![1]);
2423        metadata_builder.build().unwrap()
2424    }
2425
    /// When every requested region exists, `handle_list_metadata_request` returns the
    /// JSON-encoded metadata of each region, in request order.
    #[tokio::test]
    async fn test_handle_list_metadata_request() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let region_id_1 = RegionId::new(1, 0);
        let region_id_2 = RegionId::new(2, 0);

        let metadata_1 = mock_region_metadata(region_id_1);
        let metadata_2 = mock_region_metadata(region_id_2);
        // Expected decoded response, in the same order as the requested region ids.
        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];

        let metadata_1 = Arc::new(metadata_1);
        let metadata_2 = Arc::new(metadata_2);
        // The mock engine serves metadata for the two known regions and reports any other
        // region as not found.
        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
            MITO_ENGINE_NAME,
            Box::new(move |region_id| {
                if region_id == region_id_1 {
                    Ok(metadata_1.clone())
                } else if region_id == region_id_2 {
                    Ok(metadata_2.clone())
                } else {
                    error::RegionNotFoundSnafu { region_id }.fail()
                }
            }),
        );

        mock_region_server.register_engine(engine.clone());
        mock_region_server
            .inner
            .region_map
            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
        mock_region_server
            .inner
            .region_map
            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));

        // All regions exist.
        let list_metadata_request = ListMetadataRequest {
            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
        };
        let response = mock_region_server
            .handle_list_metadata_request(&list_metadata_request)
            .await
            .unwrap();
        let decoded_metadata: Vec<Option<RegionMetadata>> =
            serde_json::from_slice(&response.metadata).unwrap();
        assert_eq!(metadatas, decoded_metadata);
    }
2475
2476    #[tokio::test]
2477    async fn test_handle_list_metadata_not_found() {
2478        common_telemetry::init_default_ut_logging();
2479
2480        let mut mock_region_server = mock_region_server();
2481        let region_id_1 = RegionId::new(1, 0);
2482        let region_id_2 = RegionId::new(2, 0);
2483
2484        let metadata_1 = mock_region_metadata(region_id_1);
2485        let metadatas = vec![Some(metadata_1.clone()), None];
2486
2487        let metadata_1 = Arc::new(metadata_1);
2488        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2489            MITO_ENGINE_NAME,
2490            Box::new(move |region_id| {
2491                if region_id == region_id_1 {
2492                    Ok(metadata_1.clone())
2493                } else {
2494                    error::RegionNotFoundSnafu { region_id }.fail()
2495                }
2496            }),
2497        );
2498
2499        mock_region_server.register_engine(engine.clone());
2500        mock_region_server
2501            .inner
2502            .region_map
2503            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2504
2505        // Not in region map.
2506        let list_metadata_request = ListMetadataRequest {
2507            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2508        };
2509        let response = mock_region_server
2510            .handle_list_metadata_request(&list_metadata_request)
2511            .await
2512            .unwrap();
2513        let decoded_metadata: Vec<Option<RegionMetadata>> =
2514            serde_json::from_slice(&response.metadata).unwrap();
2515        assert_eq!(metadatas, decoded_metadata);
2516
2517        // Not in region engine.
2518        mock_region_server
2519            .inner
2520            .region_map
2521            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2522        let response = mock_region_server
2523            .handle_list_metadata_request(&list_metadata_request)
2524            .await
2525            .unwrap();
2526        let decoded_metadata: Vec<Option<RegionMetadata>> =
2527            serde_json::from_slice(&response.metadata).unwrap();
2528        assert_eq!(metadatas, decoded_metadata);
2529    }
2530
2531    #[tokio::test]
2532    async fn test_handle_list_metadata_failed() {
2533        common_telemetry::init_default_ut_logging();
2534
2535        let mut mock_region_server = mock_region_server();
2536        let region_id_1 = RegionId::new(1, 0);
2537
2538        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2539            MITO_ENGINE_NAME,
2540            Box::new(move |region_id| {
2541                error::UnexpectedSnafu {
2542                    violated: format!("Failed to get region {region_id}"),
2543                }
2544                .fail()
2545            }),
2546        );
2547
2548        mock_region_server.register_engine(engine.clone());
2549        mock_region_server
2550            .inner
2551            .region_map
2552            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2553
2554        // Failed to get.
2555        let list_metadata_request = ListMetadataRequest {
2556            region_ids: vec![region_id_1.as_u64()],
2557        };
2558        mock_region_server
2559            .handle_list_metadata_request(&list_metadata_request)
2560            .await
2561            .unwrap_err();
2562    }
2563}