From 6b6d1ce7c484aa8a1ed682e419f4f356893b82e8 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 25 Nov 2025 20:09:20 +0800 Subject: [PATCH] feat: introduce `remap_manifests` for `RegionEngine` (#7265) * refactor: consolidate RegionManifestOptions creation logic Signed-off-by: WenyXu * feat: introduce `remap_manifests` for `RegionEngine` Signed-off-by: WenyXu * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/datanode/src/tests.rs | 10 +- src/file-engine/src/engine.rs | 17 +- src/metric-engine/src/engine.rs | 23 +- src/metric-engine/src/error.rs | 10 +- src/mito2/src/compaction/compactor.rs | 20 +- src/mito2/src/engine.rs | 40 +++- src/mito2/src/engine/remap_manifests_test.rs | 239 +++++++++++++++++++ src/mito2/src/error.rs | 29 ++- src/mito2/src/manifest/manager.rs | 21 +- src/mito2/src/manifest/storage.rs | 6 + src/mito2/src/region.rs | 18 ++ src/mito2/src/region/opener.rs | 59 ++--- src/mito2/src/request.rs | 65 ++++- src/mito2/src/worker.rs | 4 + src/mito2/src/worker/handle_remap.rs | 125 ++++++++++ src/query/src/optimizer/test_util.rs | 10 +- src/store-api/src/region_engine.rs | 26 ++ 17 files changed, 647 insertions(+), 75 deletions(-) create mode 100644 src/mito2/src/engine/remap_manifests_test.rs create mode 100644 src/mito2/src/worker/handle_remap.rs diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 5f7db4d928..557a8c92ed 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -34,7 +34,8 @@ use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, + RemapManifestsRequest, 
RemapManifestsResponse, SetRegionRoleStateResponse, + SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -291,6 +292,13 @@ impl RegionEngine for MockRegionEngine { unimplemented!() } + async fn remap_manifests( + &self, + _request: RemapManifestsRequest, + ) -> Result { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index d3ec72c1e2..231583beb6 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -27,8 +27,9 @@ use snafu::{OptionExt, ensure}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, - SinglePartitionScanner, SyncManifestResponse, + RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, + SetRegionRoleStateSuccess, SettableRegionRoleState, SinglePartitionScanner, + SyncManifestResponse, }; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, @@ -150,6 +151,18 @@ impl RegionEngine for FileRegionEngine { Ok(SyncManifestResponse::NotSupported) } + async fn remap_manifests( + &self, + _request: RemapManifestsRequest, + ) -> Result { + Err(BoxedError::new( + UnsupportedSnafu { + operation: "remap_manifests", + } + .build(), + )) + } + fn role(&self, region_id: RegionId) -> Option { self.inner.state(region_id) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 2ca7aa971a..9e4632a7d2 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -43,8 +43,8 @@ use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use 
store_api::region_engine::{ BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, - RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, - SettableRegionRoleState, SyncManifestResponse, + RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, + SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{ BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, @@ -53,7 +53,10 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use crate::config::EngineConfig; use crate::data_region::DataRegion; -use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu}; +use crate::error::{ + self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu, + UnsupportedRemapManifestsRequestSnafu, +}; use crate::metadata_region::MetadataRegion; use crate::repeated_task::FlushMetadataRegionTask; use crate::row_modifier::RowModifier; @@ -350,6 +353,20 @@ impl RegionEngine for MetricEngine { .map_err(BoxedError::new) } + async fn remap_manifests( + &self, + request: RemapManifestsRequest, + ) -> Result { + let region_id = request.region_id; + if self.inner.is_physical_region(region_id) { + self.inner.mito.remap_manifests(request).await + } else { + Err(BoxedError::new( + UnsupportedRemapManifestsRequestSnafu { region_id }.build(), + )) + } + } + async fn set_region_role_state_gracefully( &self, region_id: RegionId, diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 0f12e16cfc..243fd566a3 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -242,6 +242,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Unsupported remap manifests request for region {}", region_id))] + UnsupportedRemapManifestsRequest { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + 
#[snafu(display("Unsupported alter kind: {}", kind))] UnsupportedAlterKind { kind: String, @@ -324,7 +331,8 @@ impl ErrorExt for Error { | AddingFieldColumn { .. } | ParseRegionOptions { .. } | UnexpectedRequest { .. } - | UnsupportedAlterKind { .. } => StatusCode::InvalidArguments, + | UnsupportedAlterKind { .. } + | UnsupportedRemapManifestsRequest { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { StatusCode::Unsupported diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 8db9edc2b8..3c84a9ec92 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -41,11 +41,9 @@ use crate::error::{ EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result, }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions}; -use crate::manifest::storage::manifest_compress_type; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::metrics; use crate::read::{FlatSource, Source}; -use crate::region::opener::new_manifest_dir; use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; @@ -162,19 +160,9 @@ pub async fn open_compaction_region( }; let manifest_manager = { - let region_manifest_options = RegionManifestOptions { - manifest_dir: new_manifest_dir(®ion_dir_from_table_dir( - &req.table_dir, - req.region_id, - req.path_type, - )), - object_store: object_store.clone(), - compress_type: manifest_compress_type(mito_config.compress_manifest), - checkpoint_distance: mito_config.manifest_checkpoint_distance, - remove_file_options: RemoveFileOptions { - enable_gc: mito_config.gc.enable, - }, - }; + let region_dir = region_dir_from_table_dir(&req.table_dir, 
req.region_id, req.path_type); + let region_manifest_options = + RegionManifestOptions::new(mito_config, ®ion_dir, object_store); RegionManifestManager::open(region_manifest_options, &Default::default()) .await? diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 456d33a7f5..76c9a37616 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -71,6 +71,9 @@ mod sync_test; #[cfg(test)] mod truncate_test; +#[cfg(test)] +mod remap_manifests_test; + mod puffin_index; use std::any::Any; @@ -101,7 +104,8 @@ use store_api::metric_engine_consts::{ }; use store_api::region_engine::{ BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, - RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, + RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, + SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{ AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest, @@ -116,7 +120,7 @@ use crate::config::MitoConfig; use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::error::{ InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, - SerdeJsonSnafu, SerializeColumnMetadataSnafu, + SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu, }; #[cfg(feature = "enterprise")] use crate::extension::BoxedExtensionRangeProviderFactory; @@ -1027,6 +1031,28 @@ impl EngineInner { receiver.await.context(RecvSnafu)? 
} + async fn remap_manifests( + &self, + request: RemapManifestsRequest, + ) -> Result { + let region_id = request.region_id; + let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?; + self.workers.submit_to_worker(region_id, request).await?; + let manifests = receiver.await.context(RecvSnafu)??; + + let new_manifests = manifests + .into_iter() + .map(|(region_id, manifest)| { + Ok(( + region_id, + serde_json::to_string(&manifest) + .context(SerializeManifestSnafu { region_id })?, + )) + }) + .collect::>>()?; + Ok(RemapManifestsResponse { new_manifests }) + } + fn role(&self, region_id: RegionId) -> Option { self.workers.get_region(region_id).map(|region| { if region.is_follower() { @@ -1203,6 +1229,16 @@ impl RegionEngine for MitoEngine { Ok(SyncManifestResponse::Mito { synced }) } + async fn remap_manifests( + &self, + request: RemapManifestsRequest, + ) -> Result { + self.inner + .remap_manifests(request) + .await + .map_err(BoxedError::new) + } + fn role(&self, region_id: RegionId) -> Option { self.inner.role(region_id) } diff --git a/src/mito2/src/engine/remap_manifests_test.rs b/src/mito2/src/engine/remap_manifests_test.rs new file mode 100644 index 0000000000..bd38e87e2a --- /dev/null +++ b/src/mito2/src/engine/remap_manifests_test.rs @@ -0,0 +1,239 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::assert_matches::assert_matches; + +use api::v1::Rows; +use datatypes::value::Value; +use partition::expr::{PartitionExpr, col}; +use store_api::region_engine::{RegionEngine, RemapManifestsRequest, SettableRegionRoleState}; +use store_api::region_request::{RegionFlushRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::error::Error; +use crate::manifest::action::RegionManifest; +use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema}; + +#[tokio::test] +async fn test_remap_manifests_invalid_partition_expr() { + common_telemetry::init_default_ut_logging(); + test_remap_manifests_invalid_partition_expr_with_format(false).await; + test_remap_manifests_invalid_partition_expr_with_format(true).await; +} + +async fn test_remap_manifests_invalid_partition_expr_with_format(flat_format: bool) { + let mut env = TestEnv::with_prefix("invalid-partition-expr").await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let err = engine + .remap_manifests(RemapManifestsRequest { + region_id, + input_regions: vec![region_id], + region_mapping: [(region_id, vec![region_id])].into_iter().collect(), + new_partition_exprs: [(region_id, "invalid expr".to_string())] + .into_iter() + .collect(), + }) + .await + .unwrap_err(); + assert_matches!( + err.into_inner().as_any().downcast_ref::().unwrap(), + Error::InvalidPartitionExpr { .. 
} + ) +} + +#[tokio::test] +async fn test_remap_manifests_invalid_region_state() { + common_telemetry::init_default_ut_logging(); + test_remap_manifests_invalid_region_state_with_format(false).await; + test_remap_manifests_invalid_region_state_with_format(true).await; +} + +fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr { + col(col_name) + .gt_eq(Value::Int64(start)) + .and(col(col_name).lt(Value::Int64(end))) +} + +async fn test_remap_manifests_invalid_region_state_with_format(flat_format: bool) { + let mut env = TestEnv::with_prefix("invalid-region-state").await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let err = engine + .remap_manifests(RemapManifestsRequest { + region_id, + input_regions: vec![region_id], + region_mapping: [(region_id, vec![region_id])].into_iter().collect(), + new_partition_exprs: [(region_id, range_expr("x", 0, 100).as_json_str().unwrap())] + .into_iter() + .collect(), + }) + .await + .unwrap_err(); + assert_matches!( + err.into_inner().as_any().downcast_ref::().unwrap(), + Error::RegionState { .. 
} + ) +} + +#[tokio::test] +async fn test_remap_manifests_invalid_input_regions() { + common_telemetry::init_default_ut_logging(); + test_remap_manifests_invalid_input_regions_with_format(false).await; + test_remap_manifests_invalid_input_regions_with_format(true).await; +} + +async fn test_remap_manifests_invalid_input_regions_with_format(flat_format: bool) { + let mut env = TestEnv::with_prefix("invalid-input-regions").await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + engine + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .await + .unwrap(); + let err = engine + .remap_manifests(RemapManifestsRequest { + region_id, + input_regions: vec![region_id, RegionId::new(2, 1)], + region_mapping: [(region_id, vec![region_id])].into_iter().collect(), + new_partition_exprs: [(region_id, range_expr("x", 0, 100).as_json_str().unwrap())] + .into_iter() + .collect(), + }) + .await + .unwrap_err(); + assert_matches!( + err.into_inner().as_any().downcast_ref::().unwrap(), + Error::InvalidRequest { .. 
} + ) +} + +#[tokio::test] +async fn test_remap_manifests_success() { + common_telemetry::init_default_ut_logging(); + test_remap_manifests_success_with_format(false).await; + test_remap_manifests_success_with_format(true).await; +} + +async fn test_remap_manifests_success_with_format(flat_format: bool) { + let mut env = TestEnv::with_prefix("engine-stop").await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .partition_expr_json(Some(range_expr("tag_0", 0, 100).as_json_str().unwrap())) + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let new_region_id_1 = RegionId::new(1, 2); + let new_region_id_2 = RegionId::new(1, 3); + + // Generate some data + for i in 0..3 { + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(i * 10, (i + 1) * 10), + }; + put_rows(&engine, region_id, rows_data).await; + engine + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + } + + engine + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .await + .unwrap(); + + let result = engine + .remap_manifests(RemapManifestsRequest { + region_id, + input_regions: vec![region_id], + region_mapping: [(region_id, vec![new_region_id_1, new_region_id_2])] + .into_iter() + .collect(), + new_partition_exprs: [ + ( + new_region_id_1, + range_expr("tag_0", 0, 50).as_json_str().unwrap(), + ), + ( + new_region_id_2, + range_expr("tag_0", 50, 100).as_json_str().unwrap(), + ), + ] + .into_iter() + .collect(), + }) + .await + .unwrap(); + assert_eq!(result.new_manifests.len(), 2); + let new_manifest_1 = + serde_json::from_str::(&result.new_manifests[&new_region_id_1]).unwrap(); + let 
new_manifest_2 = + serde_json::from_str::(&result.new_manifests[&new_region_id_2]).unwrap(); + assert_eq!(new_manifest_1.files.len(), 3); + assert_eq!(new_manifest_2.files.len(), 3); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 2a6fc855bc..b002568def 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -104,6 +104,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to serialize manifest, region_id: {}", region_id))] + SerializeManifest { + region_id: RegionId, + #[snafu(source)] + error: serde_json::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid scan index, start: {}, end: {}", start, end))] InvalidScanIndex { start: ManifestVersion, @@ -232,6 +241,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Manifest missing for region {}", region_id))] + MissingManifest { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("File consistency check failed for file {}: {}", file_id, reason))] InconsistentFile { file_id: FileId, @@ -254,6 +270,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to fetch manifests"))] + FetchManifests { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Partition expression missing for region {}", region_id))] MissingPartitionExpr { region_id: RegionId, @@ -1172,7 +1195,8 @@ impl ErrorExt for Error { | FilesLost { .. } | InstallManifestTo { .. } | Unexpected { .. } - | SerializeColumnMetadata { .. } => StatusCode::Unexpected, + | SerializeColumnMetadata { .. } + | SerializeManifest { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } @@ -1190,6 +1214,7 @@ impl ErrorExt for Error { | DurationOutOfRange { .. } | MissingOldManifest { .. } | MissingNewManifest { .. } + | MissingManifest { .. } | NoOldManifests { .. } | MissingPartitionExpr { .. } | SerializePartitionExpr { .. 
} => StatusCode::InvalidArguments, @@ -1211,6 +1236,8 @@ impl ErrorExt for Error { | Metadata { .. } | MitoManifestInfo { .. } => StatusCode::Internal, + FetchManifests { source, .. } => source.status_code(), + OpenRegion { source, .. } => source.status_code(), WriteParquet { .. } => StatusCode::StorageUnavailable, diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index ab6c9dd26d..13ea0a399e 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -24,6 +24,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::FileId; use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion}; +use crate::config::MitoConfig; use crate::error::{ self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result, }; @@ -33,7 +34,8 @@ use crate::manifest::action::{ }; use crate::manifest::checkpointer::Checkpointer; use crate::manifest::storage::{ - ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, + ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type, + manifest_dir, }; use crate::metrics::MANIFEST_OP_ELAPSED; use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState}; @@ -52,6 +54,23 @@ pub struct RegionManifestOptions { pub remove_file_options: RemoveFileOptions, } +impl RegionManifestOptions { + /// Creates a new [RegionManifestOptions] with the given region directory, object store, and configuration. + pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self { + RegionManifestOptions { + manifest_dir: manifest_dir(region_dir), + object_store: object_store.clone(), + // We don't allow users to set the compression algorithm as we use it as a file suffix. + // Currently, the manifest storage doesn't have good support for changing compression algorithms. 
+ compress_type: manifest_compress_type(config.compress_manifest), + checkpoint_distance: config.manifest_checkpoint_distance, + remove_file_options: RemoveFileOptions { + enable_gc: config.gc.enable, + }, + } + } +} + /// Options for updating `removed_files` field in [RegionManifest]. #[derive(Debug, Clone)] #[cfg_attr(any(test, feature = "test"), derive(Default))] diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 48a30af16e..78ae188224 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -24,6 +24,7 @@ use crc32fast::Hasher; use futures::TryStreamExt; use futures::future::try_join_all; use lazy_static::lazy_static; +use object_store::util::join_dir; use object_store::{Entry, ErrorKind, Lister, ObjectStore, util}; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -49,6 +50,11 @@ const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed; const FETCH_MANIFEST_PARALLELISM: usize = 16; +/// Returns the directory to the manifest files. +pub fn manifest_dir(region_dir: &str) -> String { + join_dir(region_dir, "manifest") +} + /// Returns the [CompressionType] according to whether to compress manifest files. pub const fn manifest_compress_type(compress: bool) -> CompressionType { if compress { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 3306754843..1244647bfe 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -1035,6 +1035,24 @@ impl RegionMap { Ok(region) } + /// Gets staging region by region id. + /// + /// Returns error if the region does not exist or is not in staging state. 
+ pub(crate) fn staging_region(&self, region_id: RegionId) -> Result { + let region = self + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + ensure!( + region.is_staging(), + RegionStateSnafu { + region_id, + state: region.state(), + expect: RegionRoleState::Leader(RegionLeaderState::Staging), + } + ); + Ok(region) + } + /// Gets flushable region by region id. /// /// Returns error if the region does not exist or is not operable. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 6616ba90b8..f844319b70 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -28,7 +28,7 @@ use log_store::kafka::log_store::KafkaLogStore; use log_store::noop::log_store::NoopLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; use object_store::manager::ObjectStoreManagerRef; -use object_store::util::{join_dir, normalize_dir}; +use object_store::util::normalize_dir; use snafu::{OptionExt, ResultExt, ensure}; use store_api::logstore::LogStore; use store_api::logstore::provider::Provider; @@ -49,8 +49,7 @@ use crate::error::{ Result, StaleLogEntrySnafu, }; use crate::manifest::action::RegionManifest; -use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions}; -use crate::manifest::storage::manifest_compress_type; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::memtable::MemtableBuilderProvider; use crate::memtable::bulk::part::BulkPart; use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef}; @@ -272,7 +271,7 @@ impl RegionOpener { }; // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = - Self::manifest_options(config, &options, ®ion_dir, &self.object_store_manager)?; + RegionManifestOptions::new(config, ®ion_dir, &object_store); // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id. 
let flushed_entry_id = provider.initial_flushed_entry_id::(wal.store()); let manifest_manager = RegionManifestManager::new( @@ -406,13 +405,9 @@ impl RegionOpener { ) -> Result> { let now = Instant::now(); let mut region_options = self.options.as_ref().unwrap().clone(); - - let region_manifest_options = Self::manifest_options( - config, - ®ion_options, - &self.region_dir(), - &self.object_store_manager, - )?; + let object_storage = get_object_store(®ion_options.storage, &self.object_store_manager)?; + let region_manifest_options = + RegionManifestOptions::new(config, &self.region_dir(), &object_storage); let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options, &self.stats).await? else { @@ -576,27 +571,6 @@ impl RegionOpener { Ok(Some(region)) } - - /// Returns a new manifest options. - fn manifest_options( - config: &MitoConfig, - options: &RegionOptions, - region_dir: &str, - object_store_manager: &ObjectStoreManagerRef, - ) -> Result { - let object_store = get_object_store(&options.storage, object_store_manager)?; - Ok(RegionManifestOptions { - manifest_dir: new_manifest_dir(region_dir), - object_store, - // We don't allow users to set the compression algorithm as we use it as a file suffix. - // Currently, the manifest storage doesn't have good support for changing compression algorithms. - compress_type: manifest_compress_type(config.compress_manifest), - checkpoint_distance: config.manifest_checkpoint_distance, - remove_file_options: RemoveFileOptions { - enable_gc: config.gc.enable, - }, - }) - } } /// Creates a version builder from a region manifest. @@ -648,6 +622,7 @@ pub fn get_object_store( } /// A loader for loading metadata from a region dir. 
+#[derive(Debug, Clone)] pub struct RegionMetadataLoader { config: Arc, object_store_manager: ObjectStoreManagerRef, @@ -668,7 +643,9 @@ impl RegionMetadataLoader { region_dir: &str, region_options: &RegionOptions, ) -> Result> { - let manifest = self.load_manifest(region_dir, region_options).await?; + let manifest = self + .load_manifest(region_dir, ®ion_options.storage) + .await?; Ok(manifest.map(|m| m.metadata.clone())) } @@ -676,14 +653,11 @@ impl RegionMetadataLoader { pub async fn load_manifest( &self, region_dir: &str, - region_options: &RegionOptions, + storage: &Option, ) -> Result>> { - let region_manifest_options = RegionOpener::manifest_options( - &self.config, - region_options, - region_dir, - &self.object_store_manager, - )?; + let object_store = get_object_store(storage, &self.object_store_manager)?; + let region_manifest_options = + RegionManifestOptions::new(&self.config, region_dir, &object_store); let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options, &Default::default()).await? else { @@ -848,11 +822,6 @@ where Ok(last_entry_id) } -/// Returns the directory to the manifest files. -pub(crate) fn new_manifest_dir(region_dir: &str) -> String { - join_dir(region_dir, "manifest") -} - /// A task to load and fill the region file cache. 
pub(crate) struct RegionLoadCacheTask { region: MitoRegionRef, diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 65a1fff9ef..4bac6b6266 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -26,6 +26,7 @@ use api::v1::column_def::options_from_column_schema; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint}; use common_telemetry::info; use datatypes::prelude::DataType; +use partition::expr::PartitionExpr; use prometheus::HistogramTimer; use prost::Message; use smallvec::SmallVec; @@ -44,9 +45,10 @@ use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::error::{ CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, - FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu, + FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu, + Result, UnexpectedSnafu, }; -use crate::manifest::action::{RegionEdit, TruncateKind}; +use crate::manifest::action::{RegionEdit, RegionManifest, TruncateKind}; use crate::memtable::MemtableId; use crate::memtable::bulk::part::BulkPart; use crate::metrics::COMPACTION_ELAPSED_TOTAL; @@ -600,6 +602,9 @@ pub(crate) enum WorkerRequest { request: RegionBulkInsertsRequest, sender: OptionOutputTx, }, + + /// Remap manifests request. + RemapManifests(RemapManifestsRequest), } impl WorkerRequest { @@ -761,6 +766,48 @@ impl WorkerRequest { receiver, ) } + + /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest). + /// + /// # Errors + /// + /// Returns an error if the partition expression is invalid or missing. + /// Returns an error if the new partition expressions are not found for some regions. 
+ #[allow(clippy::type_complexity)] + pub(crate) fn try_from_remap_manifests_request( + store_api::region_engine::RemapManifestsRequest { + region_id, + input_regions, + region_mapping, + new_partition_exprs, + }: store_api::region_engine::RemapManifestsRequest, + ) -> Result<( + WorkerRequest, + Receiver>>, + )> { + let (sender, receiver) = oneshot::channel(); + let new_partition_exprs = new_partition_exprs + .into_iter() + .map(|(k, v)| { + Ok(( + k, + PartitionExpr::from_json_str(&v) + .context(InvalidPartitionExprSnafu { expr: v })? + .context(MissingPartitionExprSnafu { region_id: k })?, + )) + }) + .collect::>>()?; + + let request = RemapManifestsRequest { + region_id, + input_regions, + region_mapping, + new_partition_exprs, + sender, + }; + + Ok((WorkerRequest::RemapManifests(request), receiver)) + } } /// DDL request to a region. @@ -993,6 +1040,20 @@ pub(crate) struct RegionSyncRequest { pub(crate) sender: Sender>, } +#[derive(Debug)] +pub(crate) struct RemapManifestsRequest { + /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation. + pub(crate) region_id: RegionId, + /// Regions to remap manifests from. + pub(crate) input_regions: Vec, + /// For each old region, which new regions should receive its files + pub(crate) region_mapping: HashMap>, + /// New partition expressions for the new regions. + pub(crate) new_partition_exprs: HashMap, + /// Result sender. 
+ pub(crate) sender: Sender>>, +} + #[cfg(test)] mod tests { use api::v1::value::ValueData; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 60ac23af33..cf97d2a63b 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -25,6 +25,7 @@ mod handle_flush; mod handle_manifest; mod handle_open; mod handle_rebuild_index; +mod handle_remap; mod handle_truncate; mod handle_write; @@ -1002,6 +1003,9 @@ impl RegionWorkerLoop { ); } } + WorkerRequest::RemapManifests(req) => { + self.handle_remap_manifests_request(req); + } } } diff --git a/src/mito2/src/worker/handle_remap.rs b/src/mito2/src/worker/handle_remap.rs new file mode 100644 index 0000000000..3039e04a1c --- /dev/null +++ b/src/mito2/src/worker/handle_remap.rs @@ -0,0 +1,125 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::time::Instant; + +use common_error::ext::BoxedError; +use common_telemetry::info; +use futures::future::try_join_all; +use partition::expr::PartitionExpr; +use snafu::{OptionExt, ResultExt}; +use store_api::region_request::PathType; +use store_api::storage::RegionId; + +use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result}; +use crate::manifest::action::RegionManifest; +use crate::region::MitoRegionRef; +use crate::region::opener::RegionMetadataLoader; +use crate::remap_manifest::RemapManifest; +use crate::request::RemapManifestsRequest; +use crate::sst::location::region_dir_from_table_dir; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) fn handle_remap_manifests_request(&mut self, request: RemapManifestsRequest) { + let region_id = request.region_id; + let sender = request.sender; + let region = match self.regions.staging_region(region_id) { + Ok(region) => region, + Err(e) => { + let _ = sender.send(Err(e)); + return; + } + }; + + let same_table = request + .input_regions + .iter() + .map(|r| r.table_id()) + .all(|t| t == region_id.table_id()); + + if !same_table { + let _ = sender.send( + InvalidRequestSnafu { + region_id, + reason: "Input regions must be from the same table", + } + .fail(), + ); + return; + } + + let region_metadata_loader = + RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone()); + common_runtime::spawn_global(async move { + let result = Self::fetch_and_remap_manifests( + region, + region_metadata_loader, + request.input_regions, + request.new_partition_exprs, + request.region_mapping, + ) + .await; + + let _ = sender.send(result); + }); + } + + async fn fetch_and_remap_manifests( + region: MitoRegionRef, + region_metadata_loader: RegionMetadataLoader, + input_regions: Vec, + new_partition_exprs: HashMap, + region_mapping: HashMap>, + ) -> Result> { + let mut tasks = Vec::with_capacity(input_regions.len()); 
+ let region_options = region.version().options.clone(); + let table_dir = region.table_dir(); + + let now = Instant::now(); + for input_region in &input_regions { + let region_dir = region_dir_from_table_dir(table_dir, *input_region, PathType::Bare); + let storage = region_options.storage.clone(); + let moved_region_metadata_loader = region_metadata_loader.clone(); + tasks.push(async move { + moved_region_metadata_loader + .load_manifest(&region_dir, &storage) + .await + }); + } + + let results = try_join_all(tasks) + .await + .map_err(BoxedError::new) + .context(FetchManifestsSnafu)?; + let manifests = results + .into_iter() + .zip(input_regions) + .map(|(manifest_res, region_id)| { + let manifest = manifest_res.context(MissingManifestSnafu { region_id })?; + Ok((region_id, (*manifest).clone())) + }) + .collect::>>()?; + let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping); + let remap_result = mapper.remap_manifests()?; + info!( + "Remap manifests cost: {:?}, region: {}", + now.elapsed(), + region.region_id + ); + + Ok(remap_result.new_manifests) + } +} diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index cc5712b8a5..5c24915bde 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -29,7 +29,8 @@ use store_api::metadata::{ }; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, + RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, + SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::RegionRequest; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber}; @@ -117,6 +118,13 @@ impl RegionEngine for MetaRegionEngine { unimplemented!() } + async fn remap_manifests( + &self, + _request: RemapManifestsRequest, + ) -> Result { + unimplemented!() + }
+ fn role(&self, _region_id: RegionId) -> Option { None } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 49ad140bcd..8ef5d0596c 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -693,6 +693,26 @@ impl SyncManifestResponse { } } +/// Request to remap manifests from old regions to new regions. +#[derive(Debug, Clone)] +pub struct RemapManifestsRequest { + /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation. + pub region_id: RegionId, + /// Regions to remap manifests from. + pub input_regions: Vec, + /// For each old region, which new regions should receive its files + pub region_mapping: HashMap>, + /// New partition expressions for the new regions. + pub new_partition_exprs: HashMap, +} + +/// Response to remap manifests from old regions to new regions. +#[derive(Debug, Clone)] +pub struct RemapManifestsResponse { + /// The new manifests for the new regions. + pub new_manifests: HashMap, +} + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine @@ -817,6 +837,12 @@ pub trait RegionEngine: Send + Sync { manifest_info: RegionManifestInfo, ) -> Result; + /// Remaps manifests from old regions to new regions. + async fn remap_manifests( + &self, + request: RemapManifestsRequest, + ) -> Result; + /// Sets region role state gracefully. /// /// After the call returns, the engine ensures no more write operations will succeed in the region.