feat: introduce remap_manifests for RegionEngine (#7265)

* refactor: consolidate RegionManifestOptions creation logic

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce `remap_manifests` for `RegionEngine`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Weny Xu
2025-11-25 20:09:20 +08:00
committed by GitHub
parent 7e4f0af065
commit 6b6d1ce7c4
17 changed files with 647 additions and 75 deletions

View File

@@ -34,7 +34,8 @@ use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -291,6 +292,13 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}
// Manifest remapping is not implemented by this mock engine; calling it panics
// via `unimplemented!()`.
async fn remap_manifests(
&self,
_request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError> {
unimplemented!()
}
// Exposes the engine as `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -27,8 +27,9 @@ use snafu::{OptionExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SinglePartitionScanner, SyncManifestResponse,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SetRegionRoleStateSuccess, SettableRegionRoleState, SinglePartitionScanner,
SyncManifestResponse,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
@@ -150,6 +151,18 @@ impl RegionEngine for FileRegionEngine {
Ok(SyncManifestResponse::NotSupported)
}
/// The file engine cannot remap manifests.
///
/// Always fails with an `Unsupported` error naming the `remap_manifests` operation.
async fn remap_manifests(
    &self,
    _request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError> {
    let unsupported = UnsupportedSnafu {
        operation: "remap_manifests",
    }
    .build();
    Err(BoxedError::new(unsupported))
}
// Returns whatever `self.inner.state` reports for `region_id`.
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}

View File

@@ -43,8 +43,8 @@ use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
@@ -53,7 +53,10 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
use crate::error::{
self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu,
UnsupportedRemapManifestsRequestSnafu,
};
use crate::metadata_region::MetadataRegion;
use crate::repeated_task::FlushMetadataRegionTask;
use crate::row_modifier::RowModifier;
@@ -350,6 +353,20 @@ impl RegionEngine for MetricEngine {
.map_err(BoxedError::new)
}
/// Remaps manifests for a physical region by forwarding the request to the
/// underlying mito engine.
///
/// # Errors
///
/// Fails with `UnsupportedRemapManifestsRequest` when `request.region_id` is
/// not a physical region; otherwise returns whatever the mito engine returns.
async fn remap_manifests(
    &self,
    request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError> {
    let region_id = request.region_id;
    // Only physical regions are forwarded; anything else is rejected up front.
    if !self.inner.is_physical_region(region_id) {
        let unsupported = UnsupportedRemapManifestsRequestSnafu { region_id }.build();
        return Err(BoxedError::new(unsupported));
    }
    self.inner.mito.remap_manifests(request).await
}
async fn set_region_role_state_gracefully(
&self,
region_id: RegionId,

View File

@@ -242,6 +242,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unsupported remap manifests request for region {}", region_id))]
UnsupportedRemapManifestsRequest {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported alter kind: {}", kind))]
UnsupportedAlterKind {
kind: String,
@@ -324,7 +331,8 @@ impl ErrorExt for Error {
| AddingFieldColumn { .. }
| ParseRegionOptions { .. }
| UnexpectedRequest { .. }
| UnsupportedAlterKind { .. } => StatusCode::InvalidArguments,
| UnsupportedAlterKind { .. }
| UnsupportedRemapManifestsRequest { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported

View File

@@ -41,11 +41,9 @@ use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::metrics;
use crate::read::{FlatSource, Source};
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
@@ -162,19 +160,9 @@ pub async fn open_compaction_region(
};
let manifest_manager = {
let region_manifest_options = RegionManifestOptions {
manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
&req.table_dir,
req.region_id,
req.path_type,
)),
object_store: object_store.clone(),
compress_type: manifest_compress_type(mito_config.compress_manifest),
checkpoint_distance: mito_config.manifest_checkpoint_distance,
remove_file_options: RemoveFileOptions {
enable_gc: mito_config.gc.enable,
},
};
let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
let region_manifest_options =
RegionManifestOptions::new(mito_config, &region_dir, object_store);
RegionManifestManager::open(region_manifest_options, &Default::default())
.await?

View File

@@ -71,6 +71,9 @@ mod sync_test;
#[cfg(test)]
mod truncate_test;
#[cfg(test)]
mod remap_manifests_test;
mod puffin_index;
use std::any::Any;
@@ -101,7 +104,8 @@ use store_api::metric_engine_consts::{
};
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
@@ -116,7 +120,7 @@ use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -1027,6 +1031,28 @@ impl EngineInner {
receiver.await.context(RecvSnafu)?
}
/// Submits a remap-manifests request to the worker owning `request.region_id`
/// and waits for the result.
///
/// Every manifest returned by the worker is serialized to a JSON string
/// before being placed in the response.
///
/// # Errors
///
/// Fails if the request cannot be converted into a worker request, if it
/// cannot be submitted, if the result channel is closed, if the worker
/// reports an error, or if a manifest cannot be serialized.
async fn remap_manifests(
    &self,
    request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse> {
    let region_id = request.region_id;
    let (worker_request, receiver) =
        WorkerRequest::try_from_remap_manifests_request(request)?;
    self.workers
        .submit_to_worker(region_id, worker_request)
        .await?;

    // First `?` covers the closed channel, second one the worker-side error.
    let manifests = receiver.await.context(RecvSnafu)??;

    let mut new_manifests = HashMap::with_capacity(manifests.len());
    for (new_region_id, manifest) in manifests {
        let manifest_json = serde_json::to_string(&manifest).context(SerializeManifestSnafu {
            region_id: new_region_id,
        })?;
        new_manifests.insert(new_region_id, manifest_json);
    }
    Ok(RemapManifestsResponse { new_manifests })
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| {
if region.is_follower() {
@@ -1203,6 +1229,16 @@ impl RegionEngine for MitoEngine {
Ok(SyncManifestResponse::Mito { synced })
}
/// Remaps manifests through the engine internals, boxing any error.
async fn remap_manifests(
    &self,
    request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError> {
    let response = self.inner.remap_manifests(request).await;
    response.map_err(BoxedError::new)
}
// Delegates the role lookup to the engine internals.
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}

View File

@@ -0,0 +1,239 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use api::v1::Rows;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::region_engine::{RegionEngine, RemapManifestsRequest, SettableRegionRoleState};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::manifest::action::RegionManifest;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
/// Runs the invalid-partition-expression scenario with the flat format
/// disabled and then enabled.
#[tokio::test]
async fn test_remap_manifests_invalid_partition_expr() {
    common_telemetry::init_default_ut_logging();

    for flat_format in [false, true] {
        test_remap_manifests_invalid_partition_expr_with_format(flat_format).await;
    }
}
/// A partition expression that is not valid JSON must be rejected with
/// `Error::InvalidPartitionExpr`.
async fn test_remap_manifests_invalid_partition_expr_with_format(flat_format: bool) {
    let mut env = TestEnv::with_prefix("invalid-partition-expr").await;
    let config = MitoConfig {
        default_experimental_flat_format: flat_format,
        ..Default::default()
    };
    let engine = env.create_engine(config).await;

    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().build();
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    let remap_request = RemapManifestsRequest {
        region_id,
        input_regions: vec![region_id],
        region_mapping: std::iter::once((region_id, vec![region_id])).collect(),
        new_partition_exprs: std::iter::once((region_id, "invalid expr".to_string())).collect(),
    };
    let err = engine.remap_manifests(remap_request).await.unwrap_err();

    let inner = err.into_inner();
    assert_matches!(
        inner.as_any().downcast_ref::<Error>().unwrap(),
        Error::InvalidPartitionExpr { .. }
    );
}
/// Runs the invalid-region-state scenario with the flat format disabled and
/// then enabled.
#[tokio::test]
async fn test_remap_manifests_invalid_region_state() {
    common_telemetry::init_default_ut_logging();

    for flat_format in [false, true] {
        test_remap_manifests_invalid_region_state_with_format(flat_format).await;
    }
}
/// Builds the partition expression `col_name >= start AND col_name < end`
/// over `Int64` bounds.
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
    let lower_bound = col(col_name).gt_eq(Value::Int64(start));
    let upper_bound = col(col_name).lt(Value::Int64(end));
    lower_bound.and(upper_bound)
}
/// Remapping a region that was never switched to the staging state must fail
/// with `Error::RegionState`.
async fn test_remap_manifests_invalid_region_state_with_format(flat_format: bool) {
    let mut env = TestEnv::with_prefix("invalid-region-state").await;
    let config = MitoConfig {
        default_experimental_flat_format: flat_format,
        ..Default::default()
    };
    let engine = env.create_engine(config).await;

    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().build();
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    // The region has only been created here; its role state is left untouched.
    let partition_expr_json = range_expr("x", 0, 100).as_json_str().unwrap();
    let remap_request = RemapManifestsRequest {
        region_id,
        input_regions: vec![region_id],
        region_mapping: std::iter::once((region_id, vec![region_id])).collect(),
        new_partition_exprs: std::iter::once((region_id, partition_expr_json)).collect(),
    };
    let err = engine.remap_manifests(remap_request).await.unwrap_err();

    let inner = err.into_inner();
    assert_matches!(
        inner.as_any().downcast_ref::<Error>().unwrap(),
        Error::RegionState { .. }
    );
}
/// Runs the invalid-input-regions scenario with the flat format disabled and
/// then enabled.
#[tokio::test]
async fn test_remap_manifests_invalid_input_regions() {
    common_telemetry::init_default_ut_logging();

    for flat_format in [false, true] {
        test_remap_manifests_invalid_input_regions_with_format(flat_format).await;
    }
}
/// Input regions that belong to a different table than the staging region
/// must be rejected with `Error::InvalidRequest`.
async fn test_remap_manifests_invalid_input_regions_with_format(flat_format: bool) {
    let mut env = TestEnv::with_prefix("invalid-input-regions").await;
    let config = MitoConfig {
        default_experimental_flat_format: flat_format,
        ..Default::default()
    };
    let engine = env.create_engine(config).await;

    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().build();
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();
    engine
        .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
        .await
        .unwrap();

    // The second input region uses table id 2, while the staging region uses table id 1.
    let foreign_region_id = RegionId::new(2, 1);
    let partition_expr_json = range_expr("x", 0, 100).as_json_str().unwrap();
    let remap_request = RemapManifestsRequest {
        region_id,
        input_regions: vec![region_id, foreign_region_id],
        region_mapping: std::iter::once((region_id, vec![region_id])).collect(),
        new_partition_exprs: std::iter::once((region_id, partition_expr_json)).collect(),
    };
    let err = engine.remap_manifests(remap_request).await.unwrap_err();

    let inner = err.into_inner();
    assert_matches!(
        inner.as_any().downcast_ref::<Error>().unwrap(),
        Error::InvalidRequest { .. }
    );
}
/// Runs the successful remap scenario with the flat format disabled and then
/// enabled.
#[tokio::test]
async fn test_remap_manifests_success() {
    common_telemetry::init_default_ut_logging();

    for flat_format in [false, true] {
        test_remap_manifests_success_with_format(flat_format).await;
    }
}
/// Splits one staging region into two new regions and checks the remapped
/// manifests.
///
/// The source region is flushed three times, so it holds three files. After
/// remapping to two new regions, each returned manifest must deserialize as a
/// `RegionManifest` and list all three files.
async fn test_remap_manifests_success_with_format(flat_format: bool) {
    // Use a prefix that names this scenario, like the sibling tests do.
    let mut env = TestEnv::with_prefix("remap-manifests-success").await;
    let engine = env
        .create_engine(MitoConfig {
            default_experimental_flat_format: flat_format,
            ..Default::default()
        })
        .await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new()
        .partition_expr_json(Some(range_expr("tag_0", 0, 100).as_json_str().unwrap()))
        .build();
    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    let new_region_id_1 = RegionId::new(1, 2);
    let new_region_id_2 = RegionId::new(1, 3);

    // Generate some data: three batches, each followed by a flush.
    for i in 0..3 {
        let rows_data = Rows {
            schema: column_schemas.clone(),
            rows: build_rows(i * 10, (i + 1) * 10),
        };
        put_rows(&engine, region_id, rows_data).await;
        engine
            .handle_request(
                region_id,
                RegionRequest::Flush(RegionFlushRequest {
                    row_group_size: None,
                }),
            )
            .await
            .unwrap();
    }

    // Remapping is only accepted for a region in the staging state.
    engine
        .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
        .await
        .unwrap();

    let result = engine
        .remap_manifests(RemapManifestsRequest {
            region_id,
            input_regions: vec![region_id],
            region_mapping: [(region_id, vec![new_region_id_1, new_region_id_2])]
                .into_iter()
                .collect(),
            new_partition_exprs: [
                (
                    new_region_id_1,
                    range_expr("tag_0", 0, 50).as_json_str().unwrap(),
                ),
                (
                    new_region_id_2,
                    range_expr("tag_0", 50, 100).as_json_str().unwrap(),
                ),
            ]
            .into_iter()
            .collect(),
        })
        .await
        .unwrap();

    assert_eq!(result.new_manifests.len(), 2);
    let new_manifest_1 =
        serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_1]).unwrap();
    let new_manifest_2 =
        serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_2]).unwrap();
    // Both new regions list every file of the source region.
    assert_eq!(new_manifest_1.files.len(), 3);
    assert_eq!(new_manifest_2.files.len(), 3);
}

View File

@@ -104,6 +104,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to serialize manifest, region_id: {}", region_id))]
SerializeManifest {
region_id: RegionId,
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid scan index, start: {}, end: {}", start, end))]
InvalidScanIndex {
start: ManifestVersion,
@@ -232,6 +241,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Manifest missing for region {}", region_id))]
MissingManifest {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("File consistency check failed for file {}: {}", file_id, reason))]
InconsistentFile {
file_id: FileId,
@@ -254,6 +270,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to fetch manifests"))]
FetchManifests {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Partition expression missing for region {}", region_id))]
MissingPartitionExpr {
region_id: RegionId,
@@ -1172,7 +1195,8 @@ impl ErrorExt for Error {
| FilesLost { .. }
| InstallManifestTo { .. }
| Unexpected { .. }
| SerializeColumnMetadata { .. } => StatusCode::Unexpected,
| SerializeColumnMetadata { .. }
| SerializeManifest { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
@@ -1190,6 +1214,7 @@ impl ErrorExt for Error {
| DurationOutOfRange { .. }
| MissingOldManifest { .. }
| MissingNewManifest { .. }
| MissingManifest { .. }
| NoOldManifests { .. }
| MissingPartitionExpr { .. }
| SerializePartitionExpr { .. } => StatusCode::InvalidArguments,
@@ -1211,6 +1236,8 @@ impl ErrorExt for Error {
| Metadata { .. }
| MitoManifestInfo { .. } => StatusCode::Internal,
FetchManifests { source, .. } => source.status_code(),
OpenRegion { source, .. } => source.status_code(),
WriteParquet { .. } => StatusCode::StorageUnavailable,

View File

@@ -24,6 +24,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::FileId;
use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
use crate::config::MitoConfig;
use crate::error::{
self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
};
@@ -33,7 +34,8 @@ use crate::manifest::action::{
};
use crate::manifest::checkpointer::Checkpointer;
use crate::manifest::storage::{
ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file,
ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
manifest_dir,
};
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
@@ -52,6 +54,23 @@ pub struct RegionManifestOptions {
pub remove_file_options: RemoveFileOptions,
}
impl RegionManifestOptions {
    /// Builds the manifest options for the region stored under `region_dir`.
    ///
    /// The manifest directory is derived from `region_dir`. Compression,
    /// checkpoint distance and the GC flag are read from `config`. The object
    /// store handle is cloned.
    pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
        // We don't allow users to set the compression algorithm as we use it as a file suffix.
        // Currently, the manifest storage doesn't have good support for changing compression algorithms.
        let compress_type = manifest_compress_type(config.compress_manifest);
        let remove_file_options = RemoveFileOptions {
            enable_gc: config.gc.enable,
        };

        Self {
            manifest_dir: manifest_dir(region_dir),
            object_store: object_store.clone(),
            compress_type,
            checkpoint_distance: config.manifest_checkpoint_distance,
            remove_file_options,
        }
    }
}
/// Options for updating `removed_files` field in [RegionManifest].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]

View File

@@ -24,6 +24,7 @@ use crc32fast::Hasher;
use futures::TryStreamExt;
use futures::future::try_join_all;
use lazy_static::lazy_static;
use object_store::util::join_dir;
use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -49,6 +50,11 @@ const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
const FETCH_MANIFEST_PARALLELISM: usize = 16;
/// Returns the directory of the manifest files for a region.
///
/// The result is `region_dir` joined with the `manifest` component via `join_dir`.
pub fn manifest_dir(region_dir: &str) -> String {
join_dir(region_dir, "manifest")
}
/// Returns the [CompressionType] according to whether to compress manifest files.
pub const fn manifest_compress_type(compress: bool) -> CompressionType {
if compress {

View File

@@ -1035,6 +1035,24 @@ impl RegionMap {
Ok(region)
}
/// Gets staging region by region id.
///
/// Returns error if the region does not exist or is not in staging state.
pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
let region = self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
ensure!(
region.is_staging(),
RegionStateSnafu {
region_id,
state: region.state(),
expect: RegionRoleState::Leader(RegionLeaderState::Staging),
}
);
Ok(region)
}
/// Gets flushable region by region id.
///
/// Returns error if the region does not exist or is not operable.

View File

@@ -28,7 +28,7 @@ use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use object_store::util::normalize_dir;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
@@ -49,8 +49,7 @@ use crate::error::{
Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
@@ -272,7 +271,7 @@ impl RegionOpener {
};
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options =
Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
RegionManifestOptions::new(config, &region_dir, &object_store);
// For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
let manifest_manager = RegionManifestManager::new(
@@ -406,13 +405,9 @@ impl RegionOpener {
) -> Result<Option<MitoRegionRef>> {
let now = Instant::now();
let mut region_options = self.options.as_ref().unwrap().clone();
let region_manifest_options = Self::manifest_options(
config,
&region_options,
&self.region_dir(),
&self.object_store_manager,
)?;
let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
let region_manifest_options =
RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, &self.stats).await?
else {
@@ -576,27 +571,6 @@ impl RegionOpener {
Ok(Some(region))
}
/// Returns a new manifest options.
fn manifest_options(
config: &MitoConfig,
options: &RegionOptions,
region_dir: &str,
object_store_manager: &ObjectStoreManagerRef,
) -> Result<RegionManifestOptions> {
let object_store = get_object_store(&options.storage, object_store_manager)?;
Ok(RegionManifestOptions {
manifest_dir: new_manifest_dir(region_dir),
object_store,
// We don't allow users to set the compression algorithm as we use it as a file suffix.
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
compress_type: manifest_compress_type(config.compress_manifest),
checkpoint_distance: config.manifest_checkpoint_distance,
remove_file_options: RemoveFileOptions {
enable_gc: config.gc.enable,
},
})
}
}
/// Creates a version builder from a region manifest.
@@ -648,6 +622,7 @@ pub fn get_object_store(
}
/// A loader for loading metadata from a region dir.
#[derive(Debug, Clone)]
pub struct RegionMetadataLoader {
config: Arc<MitoConfig>,
object_store_manager: ObjectStoreManagerRef,
@@ -668,7 +643,9 @@ impl RegionMetadataLoader {
region_dir: &str,
region_options: &RegionOptions,
) -> Result<Option<RegionMetadataRef>> {
let manifest = self.load_manifest(region_dir, region_options).await?;
let manifest = self
.load_manifest(region_dir, &region_options.storage)
.await?;
Ok(manifest.map(|m| m.metadata.clone()))
}
@@ -676,14 +653,11 @@ impl RegionMetadataLoader {
pub async fn load_manifest(
&self,
region_dir: &str,
region_options: &RegionOptions,
storage: &Option<String>,
) -> Result<Option<Arc<RegionManifest>>> {
let region_manifest_options = RegionOpener::manifest_options(
&self.config,
region_options,
region_dir,
&self.object_store_manager,
)?;
let object_store = get_object_store(storage, &self.object_store_manager)?;
let region_manifest_options =
RegionManifestOptions::new(&self.config, region_dir, &object_store);
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, &Default::default()).await?
else {
@@ -848,11 +822,6 @@ where
Ok(last_entry_id)
}
/// Returns the directory to the manifest files.
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
join_dir(region_dir, "manifest")
}
/// A task to load and fill the region file cache.
pub(crate) struct RegionLoadCacheTask {
region: MitoRegionRef,

View File

@@ -26,6 +26,7 @@ use api::v1::column_def::options_from_column_schema;
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
use common_telemetry::info;
use datatypes::prelude::DataType;
use partition::expr::PartitionExpr;
use prometheus::HistogramTimer;
use prost::Message;
use smallvec::SmallVec;
@@ -44,9 +45,10 @@ use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
Result, UnexpectedSnafu,
};
use crate::manifest::action::{RegionEdit, TruncateKind};
use crate::manifest::action::{RegionEdit, RegionManifest, TruncateKind};
use crate::memtable::MemtableId;
use crate::memtable::bulk::part::BulkPart;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
@@ -600,6 +602,9 @@ pub(crate) enum WorkerRequest {
request: RegionBulkInsertsRequest,
sender: OptionOutputTx,
},
/// Remap manifests request.
RemapManifests(RemapManifestsRequest),
}
impl WorkerRequest {
@@ -761,6 +766,48 @@ impl WorkerRequest {
receiver,
)
}
/// Builds a [`WorkerRequest::RemapManifests`] from a
/// [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
///
/// Every new partition expression is parsed from its JSON string. The
/// returned receiver is paired with the sender stored in the worker request
/// and yields the remapped manifests.
///
/// # Errors
///
/// Returns an error if a partition expression fails to parse, or if parsing
/// yields no expression for a region.
#[allow(clippy::type_complexity)]
pub(crate) fn try_from_remap_manifests_request(
    store_api::region_engine::RemapManifestsRequest {
        region_id,
        input_regions,
        region_mapping,
        new_partition_exprs,
    }: store_api::region_engine::RemapManifestsRequest,
) -> Result<(
    WorkerRequest,
    Receiver<Result<HashMap<RegionId, RegionManifest>>>,
)> {
    let (sender, receiver) = oneshot::channel();

    let mut parsed_exprs = HashMap::with_capacity(new_partition_exprs.len());
    for (new_region_id, expr_json) in new_partition_exprs {
        let partition_expr = PartitionExpr::from_json_str(&expr_json)
            .context(InvalidPartitionExprSnafu { expr: expr_json })?
            .context(MissingPartitionExprSnafu {
                region_id: new_region_id,
            })?;
        parsed_exprs.insert(new_region_id, partition_expr);
    }

    let worker_request = WorkerRequest::RemapManifests(RemapManifestsRequest {
        region_id,
        input_regions,
        region_mapping,
        new_partition_exprs: parsed_exprs,
        sender,
    });
    Ok((worker_request, receiver))
}
}
/// DDL request to a region.
@@ -993,6 +1040,20 @@ pub(crate) struct RegionSyncRequest {
pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
/// Worker-side request to remap manifests.
///
/// Unlike the public request it is built from, the partition expressions are
/// already parsed and a `sender` carries the result back to the caller.
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
/// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
pub(crate) region_id: RegionId,
/// Regions to remap manifests from.
pub(crate) input_regions: Vec<RegionId>,
/// For each old region, which new regions should receive its files.
pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
/// New partition expressions for the new regions.
pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
/// Result sender: the new manifests keyed by region id, or the error.
pub(crate) sender: Sender<Result<HashMap<RegionId, RegionManifest>>>,
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;

View File

@@ -25,6 +25,7 @@ mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_rebuild_index;
mod handle_remap;
mod handle_truncate;
mod handle_write;
@@ -1002,6 +1003,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
}
}
WorkerRequest::RemapManifests(req) => {
self.handle_remap_manifests_request(req);
}
}
}

View File

@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Instant;
use common_error::ext::BoxedError;
use common_telemetry::info;
use futures::future::try_join_all;
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt};
use store_api::region_request::PathType;
use store_api::storage::RegionId;
use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::manifest::action::RegionManifest;
use crate::region::MitoRegionRef;
use crate::region::opener::RegionMetadataLoader;
use crate::remap_manifest::RemapManifest;
use crate::request::RemapManifestsRequest;
use crate::sst::location::region_dir_from_table_dir;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) fn handle_remap_manifests_request(&mut self, request: RemapManifestsRequest) {
let region_id = request.region_id;
let sender = request.sender;
let region = match self.regions.staging_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
};
let same_table = request
.input_regions
.iter()
.map(|r| r.table_id())
.all(|t| t == region_id.table_id());
if !same_table {
let _ = sender.send(
InvalidRequestSnafu {
region_id,
reason: "Input regions must be from the same table",
}
.fail(),
);
return;
}
let region_metadata_loader =
RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
common_runtime::spawn_global(async move {
let result = Self::fetch_and_remap_manifests(
region,
region_metadata_loader,
request.input_regions,
request.new_partition_exprs,
request.region_mapping,
)
.await;
let _ = sender.send(result);
});
}
async fn fetch_and_remap_manifests(
region: MitoRegionRef,
region_metadata_loader: RegionMetadataLoader,
input_regions: Vec<RegionId>,
new_partition_exprs: HashMap<RegionId, PartitionExpr>,
region_mapping: HashMap<RegionId, Vec<RegionId>>,
) -> Result<HashMap<RegionId, RegionManifest>> {
let mut tasks = Vec::with_capacity(input_regions.len());
let region_options = region.version().options.clone();
let table_dir = region.table_dir();
let now = Instant::now();
for input_region in &input_regions {
let region_dir = region_dir_from_table_dir(table_dir, *input_region, PathType::Bare);
let storage = region_options.storage.clone();
let moved_region_metadata_loader = region_metadata_loader.clone();
tasks.push(async move {
moved_region_metadata_loader
.load_manifest(&region_dir, &storage)
.await
});
}
let results = try_join_all(tasks)
.await
.map_err(BoxedError::new)
.context(FetchManifestsSnafu)?;
let manifests = results
.into_iter()
.zip(input_regions)
.map(|(manifest_res, region_id)| {
let manifest = manifest_res.context(MissingManifestSnafu { region_id })?;
Ok((region_id, (*manifest).clone()))
})
.collect::<Result<HashMap<_, _>>>()?;
let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping);
let remap_result = mapper.remap_manifests()?;
info!(
"Remap manifests cost: {:?}, region: {}",
now.elapsed(),
region.region_id
);
Ok(remap_result.new_manifests)
}
}

View File

@@ -29,7 +29,8 @@ use store_api::metadata::{
};
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber};
@@ -117,6 +118,13 @@ impl RegionEngine for MetaRegionEngine {
unimplemented!()
}
// Manifest remapping is not implemented by this engine; calling it panics
// via `unimplemented!()`.
async fn remap_manifests(
&self,
_request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError> {
unimplemented!()
}
// This engine reports no role for any region: the result is always `None`.
fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
None
}

View File

@@ -693,6 +693,26 @@ impl SyncManifestResponse {
}
}
/// Request to remap manifests from old regions to new regions.
#[derive(Debug, Clone)]
pub struct RemapManifestsRequest {
/// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
pub region_id: RegionId,
/// Regions to remap manifests from.
pub input_regions: Vec<RegionId>,
/// For each old region, which new regions should receive its files.
pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
/// New partition expressions for the new regions, keyed by new region id.
///
/// Each value is a serialized partition expression (a JSON string in the
/// mito engine; TODO confirm for other engines).
pub new_partition_exprs: HashMap<RegionId, String>,
}
/// Response to remap manifests from old regions to new regions.
#[derive(Debug, Clone)]
pub struct RemapManifestsResponse {
/// The new manifests for the new regions, keyed by new region id.
///
/// Each value is a serialized manifest (a JSON string in the mito engine;
/// TODO confirm for other engines).
pub new_manifests: HashMap<RegionId, String>,
}
#[async_trait]
pub trait RegionEngine: Send + Sync {
/// Name of this engine
@@ -817,6 +837,12 @@ pub trait RegionEngine: Send + Sync {
manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse, BoxedError>;
/// Remaps manifests from old regions to new regions.
async fn remap_manifests(
&self,
request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError>;
/// Sets region role state gracefully.
///
/// After the call returns, the engine ensures no more write operations will succeed in the region.