From 081c6d9e74a540541e32b323c72eee0213efdac9 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 18 Oct 2024 15:21:35 +0800 Subject: [PATCH] fix: flush metric metadata region (#4852) * fix: flush metric metadata region * chore: apply suggestions from CR --- src/metric-engine/src/engine.rs | 6 ++- src/metric-engine/src/engine/catchup.rs | 3 +- src/metric-engine/src/engine/flush.rs | 52 +++++++++++++++++++++++++ src/store-api/src/region_request.rs | 2 +- 4 files changed, 59 insertions(+), 4 deletions(-) create mode 100644 src/metric-engine/src/engine/flush.rs diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index ab789db661..a136ed3c76 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -17,6 +17,7 @@ mod catchup; mod close; mod create; mod drop; +mod flush; mod open; mod options; mod put; @@ -145,7 +146,7 @@ impl RegionEngine for MetricEngine { .alter_region(region_id, alter, &mut extension_return_value) .await } - RegionRequest::Flush(_) | RegionRequest::Compact(_) => { + RegionRequest::Compact(_) => { if self.inner.is_physical_region(region_id) { self.inner .mito @@ -157,10 +158,11 @@ impl RegionEngine for MetricEngine { UnsupportedRegionRequestSnafu { request }.fail() } } + RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await, RegionRequest::Delete(_) | RegionRequest::Truncate(_) => { UnsupportedRegionRequestSnafu { request }.fail() } - RegionRequest::Catchup(ref req) => self.inner.catchup_region(region_id, *req).await, + RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await, }; result.map_err(BoxedError::new).map(|rows| RegionResponse { diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs index 576ca54cfa..4b1268c049 100644 --- a/src/metric-engine/src/engine/catchup.rs +++ b/src/metric-engine/src/engine/catchup.rs @@ -47,9 +47,10 @@ impl MetricEngineInner { .await .context(MitoCatchupOperationSnafu)?; + let data_region_id = utils::to_data_region_id(region_id); self.mito .handle_request( - region_id, + data_region_id, RegionRequest::Catchup(RegionCatchupRequest { set_writable: req.set_writable, entry_id: req.entry_id, diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs new file mode 100644 index 0000000000..5afadd54b6 --- /dev/null +++ b/src/metric-engine/src/engine/flush.rs @@ -0,0 +1,52 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ResultExt; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{AffectedRows, RegionFlushRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::engine::MetricEngineInner; +use crate::error::{MitoFlushOperationSnafu, Result, UnsupportedRegionRequestSnafu}; +use crate::utils; + +impl MetricEngineInner { + pub async fn flush_region( + &self, + region_id: RegionId, + req: RegionFlushRequest, + ) -> Result { + if !self.is_physical_region(region_id) { + return UnsupportedRegionRequestSnafu { + request: RegionRequest::Flush(req), + } + .fail(); + } + + let metadata_region_id = utils::to_metadata_region_id(region_id); + // Flushes the metadata region as well + self.mito + .handle_request(metadata_region_id, RegionRequest::Flush(req.clone())) + .await + .context(MitoFlushOperationSnafu) + .map(|response| response.affected_rows)?; + + let data_region_id = utils::to_data_region_id(region_id); + self.mito + .handle_request(data_region_id, RegionRequest::Flush(req.clone())) + .await + .context(MitoFlushOperationSnafu) + .map(|response| response.affected_rows) + } +} diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index cad251988f..2453b83340 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -639,7 +639,7 @@ impl From for ChangeColumnType { } } -#[derive(Debug, Default)] +#[derive(Debug, Clone, Default)] pub struct RegionFlushRequest { pub row_group_size: Option, }