Compare commits

...

1 Commit

Author SHA1 Message Date
Weny Xu
2ae20daa62 feat: add sync region instruction for repartition procedure (#7562)
* feat: add sync region instruction for repartition procedure

This commit introduces a new sync region instruction and integrates it
into the repartition procedure flow, specifically for metric engine tables.

Changes:
- Add SyncRegion instruction type and SyncRegionsReply in instruction.rs
- Implement SyncRegionHandler in datanode to handle sync region requests
- Add SyncRegion state in repartition procedure to sync newly allocated regions
- Integrate sync region step after enter_staging_region for metric engine tables
- Add sync_region flag and allocated_region_ids to PersistentContext
- Make SyncRegionFromRequest serializable for instruction transmission
- Add test utilities and mock support for sync region operations

The sync region step is conditionally executed based on the table engine type,
ensuring that newly allocated regions in metric engine tables are properly
synced from their source regions before proceeding with manifest remapping.

Signed-off-by: WenyXu <wenymedia@gmail.com>
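
A minimal sketch of the gating described above, matching the engine check that appears in the Dispatch and RepartitionStart hunks further down. The stand-in types and `main` are illustrative only; the `METRIC_ENGINE_NAME` comparison (assumed to resolve to `"metric"`) and the "has newly allocated regions" condition are the parts taken from this diff:

```rust
// Illustrative sketch only: stand-in types, not the real procedure API.
// The real code compares the table engine against
// store_api::metric_engine_consts::METRIC_ENGINE_NAME and only inserts the
// sync step when the repartition group actually allocated new regions.
const METRIC_ENGINE_NAME: &str = "metric"; // assumed value of the real constant

#[derive(Debug)]
enum NextStep {
    SyncNewRegions { region_ids: Vec<u64> },
    SkipSync,
}

fn plan_sync_step(table_engine: &str, allocated_region_ids: Vec<u64>) -> NextStep {
    let sync_region = table_engine == METRIC_ENGINE_NAME;
    if sync_region && !allocated_region_ids.is_empty() {
        NextStep::SyncNewRegions { region_ids: allocated_region_ids }
    } else {
        NextStep::SkipSync
    }
}

fn main() {
    // Metric engine table with two freshly allocated regions: sync them first.
    println!("{:?}", plan_sync_step("metric", vec![1, 2]));
    // Mito table: no sync step is inserted.
    println!("{:?}", plan_sync_step("mito", vec![3]));
}
```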

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(repartition): improve staging region handling and support metric engine repartition

- Reorder sync region flow: move SyncRegion from EnterStagingRegion to RepartitionStart to sync before applying staging (a compact flow sketch follows this message)
- Add ExitStaging metadata update state to properly clear staging leader info after repartition completes
- Update build_template_from_raw_table_info to optionally skip metric engine internal columns when creating region requests
- Fix region state transition: set_dropping now expects a specific state (Staging or Writable) for proper validation
- Adjust region drop and copy handlers to handle staging regions correctly
- Add comprehensive test cases for metric engine SPLIT/MERGE partition operations on physical tables with logical tables
- Improve logging for table route updates, region drops, and repartition operations

Signed-off-by: WenyXu <wenymedia@gmail.com>
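
A compact sketch of the reordered transitions, reconstructed from the state hunks later in this diff. The string labels are illustrative stand-ins, and any transition this commit does not touch is omitted rather than guessed:

```rust
// Sketch of the revised group-procedure state transitions visible in this diff.
// String labels are illustrative; transitions untouched by this commit are omitted.
fn next_state(current: &str, sync_region: bool, has_allocated_regions: bool) -> &'static str {
    match current {
        // New: metric engine groups sync their freshly allocated regions right after start.
        "RepartitionStart" if sync_region && has_allocated_regions => "SyncRegion",
        "RepartitionStart" => "UpdateMetadata::ApplyStaging",
        "SyncRegion" => "UpdateMetadata::ApplyStaging",
        // New: clear staging leader info before finishing instead of ending immediately.
        "ApplyStagingManifest" => "UpdateMetadata::ExitStaging",
        other => panic!("transition for {other} is not covered by this sketch"),
    }
}

fn main() {
    assert_eq!(next_state("RepartitionStart", true, true), "SyncRegion");
    assert_eq!(next_state("RepartitionStart", false, false), "UpdateMetadata::ApplyStaging");
    assert_eq!(next_state("ApplyStagingManifest", false, false), "UpdateMetadata::ExitStaging");
}
```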

* refactor: remove code duplication

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: update result

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add error strategy support for flush region and flush pending deallocate regions

- **Add `ErrorStrategy` enum** in `procedure/utils.rs`:
  - Supports `Ignore` and `Retry` strategies for error handling
  - Refactor `flush_region` to accept an `error_strategy` parameter
  - Extract `handle_flush_region_reply` helper function for better code organization

- **Add pending deallocate region support**:
  - Add `pending_deallocate_region_ids` field to `PersistentContext`
  - Implement `flush_pending_deallocate_regions` in `EnterStagingRegion` state
  - Flush pending deallocate regions before entering staging regions to ensure data consistency

- **Update error handling**:
  - `flush_leader_region`: Use `ErrorStrategy::Ignore` to skip unreachable datanodes
  - `sync_region`: Use `ErrorStrategy::Retry` for critical operations
  - `enter_staging_region`: Use `ErrorStrategy::Retry` when flushing pending deallocate regions

This change improves the robustness of the repartition procedure by:
1. Providing flexible error handling strategies for flush operations
2. Ensuring pending deallocate regions are properly flushed before repartitioning
3. Preventing data inconsistency during region migration

Signed-off-by: WenyXu <wenymedia@gmail.com>
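
A self-contained sketch of the `ErrorStrategy` idea described above, assuming a simplified stand-in for the flush call. The real helper in `procedure/utils.rs` is async and sends a flush instruction over the heartbeat mailbox; only the `Ignore`/`Retry` split mirrors this diff:

```rust
// Minimal sketch: the flush call is a stand-in closure so the error-handling
// shape of ErrorStrategy is easy to see; it is not the real metasrv helper.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ErrorStrategy {
    /// Log and continue, e.g. when the target datanode is unreachable.
    Ignore,
    /// Propagate the error so the procedure framework retries the step.
    Retry,
}

fn flush_region<F>(region_id: u64, strategy: ErrorStrategy, do_flush: F) -> Result<(), String>
where
    F: Fn(u64) -> Result<(), String>,
{
    match do_flush(region_id) {
        Ok(()) => Ok(()),
        Err(err) => match strategy {
            ErrorStrategy::Ignore => {
                eprintln!("flush of region {region_id} failed: {err}; skipping");
                Ok(())
            }
            ErrorStrategy::Retry => Err(err),
        },
    }
}

fn main() {
    // Unreachable datanode: Ignore lets the procedure proceed without the flush.
    assert!(flush_region(1, ErrorStrategy::Ignore, |_| Err("pusher not found".into())).is_ok());
    // Critical pre-sync flush: Retry surfaces the error to the procedure runner.
    assert!(flush_region(2, ErrorStrategy::Retry, |_| Err("timeout".into())).is_err());
}
```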

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: compile

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-15 04:52:57 +00:00
37 changed files with 1673 additions and 224 deletions

View File

@@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info)?;
let template = build_template_from_raw_table_info(raw_table_info, false)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -20,7 +20,9 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, is_metric_engine_internal_column,
};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
@@ -30,34 +32,45 @@ use crate::wal_provider::prepare_wal_options;
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
pub fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
skip_internal_columns: bool,
) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let column_defs = raw_table_info
let filtered = raw_table_info
.meta
.schema
.column_schemas
.iter()
.enumerate()
.filter(|(_, c)| !skip_internal_columns || !is_metric_engine_internal_column(&c.name))
.map(|(i, c)| {
let is_primary_key = primary_key_indices.contains(&i);
let column_def = try_as_column_def(c, is_primary_key)
.context(error::ConvertColumnDefSnafu { column: &c.name })?;
Ok(RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
})
Ok((
is_primary_key.then_some(i),
RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
},
))
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<(Option<usize>, RegionColumnDef)>>>()?;
let (new_primary_key_indices, column_defs): (Vec<_>, Vec<_>) = filtered.into_iter().unzip();
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: raw_table_info.meta.engine.clone(),
column_defs,
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
primary_key: new_primary_key_indices
.iter()
.flatten()
.map(|i| *i as u32)
.collect(),
path: String::new(),
options,
partition: None,

View File

@@ -17,6 +17,7 @@ use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
@@ -530,6 +531,25 @@ impl Display for EnterStagingRegion {
}
}
/// Instruction payload for syncing a region from a manifest or another region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncRegion {
/// Region id to sync.
pub region_id: RegionId,
/// Request to sync the region.
pub request: SyncRegionFromRequest,
}
impl Display for SyncRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SyncRegion(region_id={}, request={:?})",
self.region_id, self.request
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
pub region_id: RegionId,
@@ -602,8 +622,11 @@ pub enum Instruction {
Suspend,
/// Makes regions enter staging state.
EnterStagingRegions(Vec<EnterStagingRegion>),
/// Syncs regions.
SyncRegions(Vec<SyncRegion>),
/// Remaps manifests for a region.
RemapManifest(RemapManifest),
/// Applies staging manifests for a region.
ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
@@ -669,6 +692,13 @@ impl Instruction {
_ => None,
}
}
pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
match self {
Self::SyncRegions(sync_regions) => Some(sync_regions),
_ => None,
}
}
}
/// The reply of [UpgradeRegion].
@@ -784,6 +814,31 @@ impl EnterStagingRegionsReply {
}
}
/// Reply for a single region sync request.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionReply {
/// Region id of the synced region.
pub region_id: RegionId,
/// Returns true if the region is successfully synced and ready.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
/// Return error message if any during the operation.
pub error: Option<String>,
}
/// Reply for a batch of region sync requests.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionsReply {
pub replies: Vec<SyncRegionReply>,
}
impl SyncRegionsReply {
pub fn new(replies: Vec<SyncRegionReply>) -> Self {
Self { replies }
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
/// Returns false if the region does not exist.
@@ -847,6 +902,7 @@ pub enum InstructionReply {
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
EnterStagingRegions(EnterStagingRegionsReply),
SyncRegions(SyncRegionsReply),
RemapManifest(RemapManifestReply),
ApplyStagingManifests(ApplyStagingManifestsReply),
}
@@ -872,6 +928,9 @@ impl Display for InstructionReply {
reply.replies
)
}
Self::SyncRegions(reply) => {
write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
}
Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
Self::ApplyStagingManifests(reply) => write!(
f,
@@ -926,6 +985,13 @@ impl InstructionReply {
}
}
pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
match self {
Self::SyncRegions(reply) => reply.replies,
_ => panic!("Expected SyncRegion reply"),
}
}
pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
match self {
Self::RemapManifest(reply) => reply,

View File

@@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info)?;
let template = build_template_from_raw_table_info(raw_table_info, false)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -31,6 +31,7 @@ mod flush_region;
mod gc_worker;
mod open_region;
mod remap_manifest;
mod sync_region;
mod upgrade_region;
use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
@@ -42,6 +43,7 @@ use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
use crate::region_server::RegionServer;
@@ -132,6 +134,7 @@ impl RegionHeartbeatResponseHandler {
Instruction::EnterStagingRegions(_) => {
Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
}
Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
Instruction::ApplyStagingManifests(_) => {
Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
@@ -150,6 +153,7 @@ pub enum InstructionHandlers {
GetFileRefs(GetFileRefsHandler),
GcRegions(GcRegionsHandler),
EnterStagingRegions(EnterStagingRegionsHandler),
SyncRegions(SyncRegionHandler),
RemapManifest(RemapManifestHandler),
ApplyStagingManifests(ApplyStagingManifestsHandler),
}
@@ -175,6 +179,7 @@ impl_from_handler!(
GetFileRefsHandler => GetFileRefs,
GcRegionsHandler => GcRegions,
EnterStagingRegionsHandler => EnterStagingRegions,
SyncRegionHandler => SyncRegions,
RemapManifestHandler => RemapManifest,
ApplyStagingManifestsHandler => ApplyStagingManifests
);
@@ -222,6 +227,7 @@ dispatch_instr!(
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
EnterStagingRegions => EnterStagingRegions,
SyncRegions => SyncRegions,
RemapManifest => RemapManifest,
ApplyStagingManifests => ApplyStagingManifests,
);

View File

@@ -48,19 +48,32 @@ impl ApplyStagingManifestsHandler {
ctx: &HandlerContext,
request: ApplyStagingManifest,
) -> ApplyStagingManifestReply {
let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else {
warn!("Region: {} is not found", request.region_id);
let ApplyStagingManifest {
region_id,
ref partition_expr,
central_region_id,
ref manifest_path,
} = request;
common_telemetry::info!(
"Datanode received apply staging manifest request, region_id: {}, central_region_id: {}, partition_expr: {}, manifest_path: {}",
region_id,
central_region_id,
partition_expr,
manifest_path
);
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: false,
ready: false,
error: None,
};
};
if !leader {
warn!("Region: {} is not leader", request.region_id);
warn!("Region: {} is not leader", region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: true,
ready: false,
error: Some("Region is not leader".into()),
@@ -70,25 +83,25 @@ impl ApplyStagingManifestsHandler {
match ctx
.region_server
.handle_request(
request.region_id,
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: request.partition_expr,
central_region_id: request.central_region_id,
manifest_path: request.manifest_path,
partition_expr: partition_expr.clone(),
central_region_id,
manifest_path: manifest_path.clone(),
}),
)
.await
{
Ok(_) => ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: true,
ready: true,
error: None,
},
Err(err) => {
error!(err; "Failed to apply staging manifest");
error!(err; "Failed to apply staging manifest, region_id: {}", region_id);
ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: true,
ready: false,
error: Some(format!("{err:?}")),

View File

@@ -51,6 +51,11 @@ impl EnterStagingRegionsHandler {
partition_expr,
}: EnterStagingRegion,
) -> EnterStagingRegionReply {
common_telemetry::info!(
"Datanode received enter staging region: {}, partition_expr: {}",
region_id,
partition_expr
);
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return EnterStagingRegionReply {
@@ -85,7 +90,7 @@ impl EnterStagingRegionsHandler {
error: None,
},
Err(err) => {
error!(err; "Failed to enter staging region");
error!(err; "Failed to enter staging region, region_id: {}", region_id);
EnterStagingRegionReply {
region_id,
ready: false,

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
use common_telemetry::warn;
use common_telemetry::{error, info, warn};
use store_api::region_engine::RemapManifestsRequest;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
@@ -34,6 +34,12 @@ impl InstructionHandler for RemapManifestHandler {
region_mapping,
new_partition_exprs,
} = request;
info!(
"Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
region_id,
input_regions.len(),
new_partition_exprs.len()
);
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return Some(InstructionReply::RemapManifest(RemapManifestReply {
@@ -67,11 +73,18 @@ impl InstructionHandler for RemapManifestHandler {
manifest_paths: result.manifest_paths,
error: None,
}),
Err(e) => InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
}),
Err(e) => {
error!(
e;
"Remap manifests failed on datanode, region_id: {}",
region_id
);
InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
})
}
};
Some(reply)

View File

@@ -0,0 +1,192 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
use common_telemetry::{error, info, warn};
use futures::future::join_all;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
/// Handler for [SyncRegion] instruction.
/// It syncs the region from a manifest or another region.
#[derive(Debug, Clone, Copy, Default)]
pub struct SyncRegionHandler;
#[async_trait::async_trait]
impl InstructionHandler for SyncRegionHandler {
type Instruction = Vec<SyncRegion>;
/// Handles a batch of [SyncRegion] instructions.
async fn handle(
&self,
ctx: &HandlerContext,
regions: Self::Instruction,
) -> Option<InstructionReply> {
let futures = regions
.into_iter()
.map(|sync_region| Self::handle_sync_region(ctx, sync_region));
let results = join_all(futures).await;
Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
results,
)))
}
}
impl SyncRegionHandler {
/// Handles a single [SyncRegion] instruction.
async fn handle_sync_region(
ctx: &HandlerContext,
SyncRegion { region_id, request }: SyncRegion,
) -> SyncRegionReply {
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return SyncRegionReply {
region_id,
ready: false,
exists: false,
error: None,
};
};
if !writable {
warn!("Region: {} is not writable", region_id);
return SyncRegionReply {
region_id,
ready: false,
exists: true,
error: Some("Region is not writable".into()),
};
}
match ctx.region_server.sync_region(region_id, request).await {
Ok(_) => {
info!("Successfully synced region: {}", region_id);
SyncRegionReply {
region_id,
ready: true,
exists: true,
error: None,
}
}
Err(e) => {
error!(e; "Failed to sync region: {}", region_id);
SyncRegionReply {
region_id,
ready: false,
exists: true,
error: Some(format!("{:?}", e)),
}
}
}
}
}
#[cfg(test)]
mod tests {
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
use store_api::storage::RegionId;
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::tests::{MockRegionEngine, mock_region_server};
#[tokio::test]
async fn test_handle_sync_region_not_found() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let region_id = RegionId::new(1024, 1);
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(!reply[0].exists);
assert!(!reply[0].ready);
}
#[tokio::test]
async fn test_handle_sync_region_not_writable() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
r.mock_role = Some(Some(RegionRole::Follower));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(reply[0].exists);
assert!(!reply[0].ready);
assert!(reply[0].error.is_some());
}
#[tokio::test]
async fn test_handle_sync_region_success() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
r.mock_role = Some(Some(RegionRole::Leader));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(reply[0].exists);
assert!(reply[0].ready);
assert!(reply[0].error.is_none());
}
}

View File

@@ -115,12 +115,17 @@ pub type MockSetReadonlyGracefullyHandler =
pub type MockGetMetadataHandler =
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
pub type MockSyncRegionHandler = Box<
dyn Fn(RegionId, SyncRegionFromRequest) -> Result<SyncRegionFromResponse, Error> + Send + Sync,
>;
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
pub(crate) handle_sync_region_mock_fn: Option<MockSyncRegionHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
@@ -136,6 +141,7 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -156,6 +162,7 @@ impl MockRegionEngine {
handle_request_mock_fn: Some(mock_fn),
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -176,6 +183,7 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: Some(mock_fn),
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -197,6 +205,7 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
@@ -286,10 +295,14 @@ impl RegionEngine for MockRegionEngine {
async fn sync_region(
&self,
_region_id: RegionId,
_request: SyncRegionFromRequest,
region_id: RegionId,
request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
unimplemented!()
if let Some(mock_fn) = &self.handle_sync_region_mock_fn {
return mock_fn(region_id, request).map_err(BoxedError::new);
};
Ok(SyncRegionFromResponse::Mito { synced: true })
}
async fn remap_manifests(

View File

@@ -14,19 +14,15 @@
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::OptionExt;
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::error::{self, Result};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
use crate::procedure::utils;
/// Flushes the leader region before downgrading it.
///
@@ -61,15 +57,6 @@ impl State for PreFlushRegion {
}
impl PreFlushRegion {
/// Builds flush leader region instruction.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
Instruction::FlushRegions(FlushRegions::sync_batch(
pc.region_ids.clone(),
FlushErrorStrategy::TryAll,
))
}
/// Tries to flush a leader region.
///
/// Ignore:
@@ -89,109 +76,18 @@ impl PreFlushRegion {
.context(error::ExceededDeadlineSnafu {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_ids = &ctx.persistent_ctx.region_ids;
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
&flush_instruction,
utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
region_ids,
leader,
operation_timeout,
utils::ErrorStrategy::Ignore,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let now = Instant::now();
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}",
reply,
region_ids,
now.elapsed()
);
let reply_result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != region_ids.len() {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!(
"expect {} region flush result, but got {}",
region_ids.len(),
flush_reply.results.len()
),
}
.fail();
}
match flush_reply.overall_success {
true => (true, None),
false => (
false,
Some(
flush_reply
.results
.iter()
.filter_map(|(region_id, result)| match result {
Ok(_) => None,
Err(e) => Some(format!("{}: {}", region_id, e)),
})
.collect::<Vec<String>>()
.join("; "),
),
),
}
}
_ => {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
}
};
let (result, error) = reply_result;
if let Some(error) = error {
warn!(
"Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.",
region_ids, leader, &error
);
} else if result {
info!(
"The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}",
region_ids,
leader,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader regions",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_ids, leader
);
Ok(())
}
Err(err) => Err(err),
}
.await
}
}
@@ -202,11 +98,13 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
};
use crate::service::mailbox::Channel;
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))

View File

@@ -47,7 +47,7 @@ use common_procedure::{
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::error;
use common_telemetry::{error, info};
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -232,7 +232,10 @@ impl Context {
&new_region_routes,
table_id,
)?;
info!(
"Updating table route for table: {}, new region routes: {:?}",
table_id, new_region_routes
);
self.table_metadata_manager
.update_table_route(
table_id,
@@ -262,6 +265,13 @@ impl Context {
.await;
Ok(())
}
/// Returns the next operation timeout.
///
/// If the next operation timeout is not set, it will return `None`.
pub fn next_operation_timeout(&self) -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(10))
}
}
#[async_trait::async_trait]
@@ -335,6 +345,13 @@ impl Procedure for RepartitionProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let state_name = state.name();
// Log state transition
common_telemetry::info!(
"Repartition procedure executing state: {}, table_id: {}",
state_name,
self.context.persistent_ctx.table_id
);
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;

View File

@@ -65,6 +65,12 @@ impl State for AllocateRegion {
&mut next_region_number,
&self.plan_entries,
);
let plan_count = repartition_plan_entries.len();
let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries);
info!(
"Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
table_id, plan_count, to_allocate
);
// If no region to allocate, directly dispatch the plan.
if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
@@ -99,6 +105,20 @@ impl State for AllocateRegion {
.await
.context(error::AllocateWalOptionsSnafu { table_id })?;
let new_region_count = new_allocated_region_routes.len();
let new_regions_brief: Vec<_> = new_allocated_region_routes
.iter()
.map(|route| {
let region_id = route.region.id;
let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
format!("region_id: {}, peer: {}", region_id, peer)
})
.collect();
info!(
"Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
table_id, new_region_count, new_regions_brief
);
let _operating_guards = Self::register_operating_regions(
&ctx.memory_region_keeper,
&new_allocated_region_routes,
@@ -137,7 +157,6 @@ impl AllocateRegion {
Self { plan_entries }
}
#[allow(dead_code)]
fn register_operating_regions(
memory_region_keeper: &MemoryRegionKeeperRef,
region_routes: &[RegionRoute],
@@ -155,7 +174,6 @@ impl AllocateRegion {
Ok(operating_guards)
}
#[allow(dead_code)]
fn generate_region_routes(
region_routes: &[RegionRoute],
new_allocated_region_ids: &[RegionRoute],
@@ -177,7 +195,6 @@ impl AllocateRegion {
///
/// This method takes the allocation plan entries and converts them to repartition plan entries,
/// updating `next_region_number` for each newly allocated region.
#[allow(dead_code)]
fn convert_to_repartition_plans(
table_id: TableId,
next_region_number: &mut RegionNumber,
@@ -196,7 +213,6 @@ impl AllocateRegion {
}
/// Collects all regions that need to be allocated from the repartition plan entries.
#[allow(dead_code)]
fn collect_allocate_regions(
repartition_plan_entries: &[RepartitionPlanEntry],
) -> Vec<&RegionDescriptor> {
@@ -207,7 +223,6 @@ impl AllocateRegion {
}
/// Prepares region allocation data: region numbers and their partition expressions.
#[allow(dead_code)]
fn prepare_region_allocation_data(
allocate_regions: &[&RegionDescriptor],
) -> Result<Vec<(RegionNumber, String)>> {
@@ -225,7 +240,6 @@ impl AllocateRegion {
}
/// Calculates the total number of regions that need to be allocated.
#[allow(dead_code)]
fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
repartition_plan_entries
.iter()
@@ -234,12 +248,10 @@ impl AllocateRegion {
}
/// Gets the next region number from the physical table route.
#[allow(dead_code)]
fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
max_region_number + 1
}
#[allow(dead_code)]
async fn allocate_regions(
node_manager: &NodeManagerRef,
raw_table_info: &RawTableInfo,
@@ -252,12 +264,14 @@ impl AllocateRegion {
&raw_table_info.name,
);
let table_id = raw_table_info.ident.table_id;
let request = build_template_from_raw_table_info(raw_table_info)
let request = build_template_from_raw_table_info(raw_table_info, true)
.context(error::BuildCreateRequestSnafu { table_id })?;
let builder = CreateRequestBuilder::new(request, None);
let region_count = region_routes.len();
let wal_region_count = wal_options.len();
info!(
"Allocating regions for table: {}, region_routes: {:?}, wal_options: {:?}",
table_id, region_routes, wal_options
"Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
table_id, region_count, wal_region_count
);
let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
executor

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
use common_telemetry::error;
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -64,9 +64,10 @@ impl Collect {
impl State for Collect {
async fn next(
&mut self,
_ctx: &mut Context,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.persistent_ctx.table_id;
for procedure_meta in self.inflight_procedures.iter() {
let procedure_id = procedure_meta.procedure_id;
let group_id = procedure_meta.group_id;
@@ -93,7 +94,16 @@ impl State for Collect {
}
}
if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() {
let inflight = self.inflight_procedures.len();
let succeeded = self.succeeded_procedures.len();
let failed = self.failed_procedures.len();
let unknown = self.unknown_procedures.len();
info!(
"Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}",
table_id, inflight, succeeded, failed, unknown
);
if failed > 0 || unknown > 0 {
// TODO(weny): retry the failed or unknown procedures.
}

View File

@@ -62,9 +62,10 @@ impl State for DeallocateRegion {
.flat_map(|p| p.pending_deallocate_region_ids.iter())
.cloned()
.collect::<HashSet<_>>();
let dealloc_count = pending_deallocate_region_ids.len();
info!(
"Deallocating regions: {:?} for table: {} during repartition procedure",
pending_deallocate_region_ids, table_id
"Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
table_id, dealloc_count, pending_deallocate_region_ids
);
let table_lock = TableLock::Write(table_id).into();
@@ -111,7 +112,6 @@ impl State for DeallocateRegion {
}
impl DeallocateRegion {
#[allow(dead_code)]
async fn deallocate_regions(
node_manager: &NodeManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
@@ -136,7 +136,6 @@ impl DeallocateRegion {
Ok(())
}
#[allow(dead_code)]
fn filter_deallocatable_region_routes(
table_id: TableId,
region_routes: &[RegionRoute],
@@ -161,7 +160,6 @@ impl DeallocateRegion {
.collect::<Vec<_>>()
}
#[allow(dead_code)]
fn generate_region_routes(
region_routes: &[RegionRoute],
pending_deallocate_region_ids: &HashSet<RegionId>,

View File

@@ -16,7 +16,9 @@ use std::any::Any;
use std::collections::HashMap;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::storage::RegionId;
use crate::error::Result;
@@ -28,7 +30,6 @@ use crate::procedure::repartition::{self, Context, State};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dispatch;
#[allow(dead_code)]
fn build_region_mapping(
source_regions: &[RegionDescriptor],
target_regions: &[RegionDescriptor],
@@ -57,8 +58,12 @@ impl State for Dispatch {
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.persistent_ctx.table_id;
let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
let table_info_value = ctx.get_table_info_value().await?;
let table_engine = table_info_value.table_info.meta.engine;
let sync_region = table_engine == METRIC_ENGINE_NAME;
let plan_count = ctx.persistent_ctx.plans.len();
let mut procedures = Vec::with_capacity(plan_count);
let mut procedure_metas = Vec::with_capacity(plan_count);
for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
let region_mapping = build_region_mapping(
&plan.source_regions,
@@ -73,6 +78,9 @@ impl State for Dispatch {
plan.source_regions.clone(),
plan.target_regions.clone(),
region_mapping,
sync_region,
plan.allocated_region_ids.clone(),
plan.pending_deallocate_region_ids.clone(),
);
let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
@@ -85,6 +93,14 @@ impl State for Dispatch {
procedures.push(procedure);
}
let group_ids: Vec<_> = procedure_metas.iter().map(|m| m.group_id).collect();
info!(
"Dispatch repartition groups for table_id: {}, group_count: {}, group_ids: {:?}",
table_id,
group_ids.len(),
group_ids
);
Ok((
Box::new(Collect::new(procedure_metas)),
Status::suspended(procedures, true),

View File

@@ -17,6 +17,7 @@ pub(crate) mod enter_staging_region;
pub(crate) mod remap_manifest;
pub(crate) mod repartition_end;
pub(crate) mod repartition_start;
pub(crate) mod sync_region;
pub(crate) mod update_metadata;
pub(crate) mod utils;
@@ -40,7 +41,7 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::error;
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -55,7 +56,6 @@ use crate::service::mailbox::MailboxRef;
pub type GroupId = Uuid;
#[allow(dead_code)]
pub struct RepartitionGroupProcedure {
state: Box<dyn State>,
context: Context,
@@ -113,6 +113,14 @@ impl Procedure for RepartitionGroupProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let state_name = state.name();
// Log state transition
common_telemetry::info!(
"Repartition group procedure executing state: {}, group id: {}, table id: {}",
state_name,
self.context.persistent_ctx.group_id,
self.context.persistent_ctx.table_id
);
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
@@ -221,9 +229,16 @@ pub struct PersistentContext {
/// The staging manifest paths of the repartition group.
/// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
pub staging_manifest_paths: HashMap<RegionId, String>,
/// Whether sync region is needed for this group.
pub sync_region: bool,
/// The region ids of the newly allocated regions.
pub allocated_region_ids: Vec<RegionId>,
/// The region ids of the regions that are pending deallocation.
pub pending_deallocate_region_ids: Vec<RegionId>,
}
impl PersistentContext {
#[allow(clippy::too_many_arguments)]
pub fn new(
group_id: GroupId,
table_id: TableId,
@@ -232,6 +247,9 @@ impl PersistentContext {
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
region_mapping: HashMap<RegionId, Vec<RegionId>>,
sync_region: bool,
allocated_region_ids: Vec<RegionId>,
pending_deallocate_region_ids: Vec<RegionId>,
) -> Self {
Self {
group_id,
@@ -243,6 +261,9 @@ impl PersistentContext {
region_mapping,
group_prepare_result: None,
staging_manifest_paths: HashMap::new(),
sync_region,
allocated_region_ids,
pending_deallocate_region_ids,
}
}
@@ -334,6 +355,7 @@ impl Context {
new_region_routes: Vec<RegionRoute>,
) -> Result<()> {
let table_id = self.persistent_ctx.table_id;
let group_id = self.persistent_ctx.group_id;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
let central_region_datanode_table_value = self
@@ -345,6 +367,10 @@ impl Context {
..
} = &central_region_datanode_table_value.region_info;
info!(
"Updating table route for table: {}, group_id: {}, new region routes: {:?}",
table_id, group_id, new_region_routes
);
self.table_metadata_manager
.update_table_route(
table_id,

View File

@@ -31,7 +31,7 @@ use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
@@ -52,7 +52,10 @@ impl State for ApplyStagingManifest {
) -> Result<(Box<dyn State>, Status)> {
self.apply_staging_manifests(ctx).await?;
Ok((Box::new(RepartitionEnd), Status::executing(true)))
Ok((
Box::new(UpdateMetadata::ExitStaging),
Status::executing(true),
))
}
fn as_any(&self) -> &dyn Any {
@@ -125,7 +128,6 @@ impl ApplyStagingManifest {
})
}
#[allow(dead_code)]
async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -150,6 +152,7 @@ impl ApplyStagingManifest {
operation: "Apply staging manifests",
})?;
let instruction_region_count: usize = instructions.values().map(|v| v.len()).sum();
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
.iter()
.map(|(peer, apply_staging_manifests)| {
@@ -166,8 +169,11 @@ impl ApplyStagingManifest {
})
.unzip();
info!(
"Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}",
peers, table_id, group_id
"Sent apply staging manifests instructions, table_id: {}, group_id: {}, peers: {}, regions: {}",
table_id,
group_id,
peers.len(),
instruction_region_count
);
let format_err_msg = |idx: usize, error: &Error| {
@@ -292,11 +298,7 @@ impl ApplyStagingManifest {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received apply staging manifests reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let elapsed = now.elapsed();
let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) =
reply
else {
@@ -306,9 +308,23 @@ impl ApplyStagingManifest {
}
.fail();
};
let total = replies.len();
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
let region_ids = replies.iter().map(|r| r.region_id).collect::<Vec<_>>();
for reply in replies {
if reply.error.is_some() {
with_error += 1;
} else if reply.ready {
ready += 1;
} else {
not_ready += 1;
}
Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?;
}
info!(
"Received apply staging manifests reply, peer: {:?}, total_regions: {}, regions:{:?}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
peer, total, region_ids, ready, not_ready, with_error, elapsed
);
Ok(())
}

View File

@@ -23,7 +23,7 @@ use common_meta::instruction::{
use common_meta::peer::Peer;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::join_all;
use futures::future::{join_all, try_join_all};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
@@ -35,6 +35,7 @@ use crate::procedure::repartition::group::utils::{
};
use crate::procedure::repartition::group::{Context, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::utils::{self, ErrorStrategy};
use crate::service::mailbox::{Channel, MailboxRef};
#[derive(Debug, Serialize, Deserialize)]
@@ -48,6 +49,7 @@ impl State for EnterStagingRegion {
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
self.flush_pending_deallocate_regions(ctx).await?;
self.enter_staging_regions(ctx).await?;
Ok((Box::new(RemapManifest), Status::executing(true)))
@@ -94,7 +96,6 @@ impl EnterStagingRegion {
Ok(instructions)
}
#[allow(dead_code)]
async fn enter_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -102,6 +103,8 @@ impl EnterStagingRegion {
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let targets = &ctx.persistent_ctx.targets;
let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?;
let target_region_count = targets.len();
let peer_count = instructions.len();
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
@@ -123,8 +126,8 @@ impl EnterStagingRegion {
})
.unzip();
info!(
"Sent enter staging regions instructions to peers: {:?} for repartition table {}, group id {}",
peers, table_id, group_id
"Sent enter staging regions instructions, table_id: {}, group_id: {}, peers: {}, target_regions: {}",
table_id, group_id, peer_count, target_region_count
);
let format_err_msg = |idx: usize, error: &Error| {
@@ -242,11 +245,7 @@ impl EnterStagingRegion {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received enter staging regions reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let elapsed = now.elapsed();
let InstructionReply::EnterStagingRegions(EnterStagingRegionsReply { replies }) =
reply
else {
@@ -256,9 +255,22 @@ impl EnterStagingRegion {
}
.fail();
};
let total = replies.len();
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
for reply in replies {
if reply.error.is_some() {
with_error += 1;
} else if reply.ready {
ready += 1;
} else {
not_ready += 1;
}
Self::handle_enter_staging_region_reply(&reply, &now, peer)?;
}
info!(
"Received enter staging regions reply, peer: {:?}, total_regions: {}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
peer, total, ready, not_ready, with_error, elapsed
);
Ok(())
}
@@ -320,6 +332,61 @@ impl EnterStagingRegion {
Ok(())
}
async fn flush_pending_deallocate_regions(&self, ctx: &mut Context) -> Result<()> {
let pending_deallocate_region_ids = &ctx.persistent_ctx.pending_deallocate_region_ids;
if pending_deallocate_region_ids.is_empty() {
return Ok(());
}
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush pending deallocate regions",
})?;
let result = &ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let source_routes = result
.source_routes
.iter()
.filter(|route| pending_deallocate_region_ids.contains(&route.region.id))
.cloned()
.collect::<Vec<_>>();
let peer_region_ids_map = group_region_routes_by_peer(&source_routes);
info!(
"Flushing pending deallocate regions, table_id: {}, group_id: {}, peer_region_ids_map: {:?}",
table_id, group_id, peer_region_ids_map
);
let now = Instant::now();
let tasks = peer_region_ids_map
.iter()
.map(|(peer, region_ids)| {
utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
region_ids,
peer,
operation_timeout,
ErrorStrategy::Retry,
)
})
.collect::<Vec<_>>();
try_join_all(tasks).await?;
info!(
"Flushed pending deallocate regions: {:?}, table_id: {}, group_id: {}, elapsed: {:?}",
source_routes
.iter()
.map(|route| route.region.id)
.collect::<Vec<_>>(),
table_id,
group_id,
now.elapsed()
);
Ok(())
}
}
#[cfg(test)]

View File

@@ -65,6 +65,13 @@ impl State for RemapManifest {
.await?;
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let manifest_count = manifest_paths.len();
let input_region_count = ctx.persistent_ctx.sources.len();
let target_region_count = ctx.persistent_ctx.targets.len();
info!(
"Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
table_id, group_id, input_region_count, target_region_count, manifest_count
);
if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
warn!(
@@ -156,11 +163,7 @@ impl RemapManifest {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received remap manifest reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let elapsed = now.elapsed();
let InstructionReply::RemapManifest(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
@@ -168,6 +171,11 @@ impl RemapManifest {
}
.fail();
};
let manifest_count = reply.manifest_paths.len();
info!(
"Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
remap.region_id, manifest_count, elapsed
);
Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{
Context, GroupId, GroupPrepareResult, State, region_routes,
@@ -56,7 +57,6 @@ impl RepartitionStart {
/// Ensures that both source and target regions are present in the region routes.
///
/// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
#[allow(dead_code)]
fn ensure_route_present(
group_id: GroupId,
region_routes: &[RegionRoute],
@@ -172,6 +172,28 @@ impl State for RepartitionStart {
ctx.persistent_ctx.targets.len()
);
if ctx.persistent_ctx.sync_region {
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let allocated_region_ids: HashSet<_> = ctx
.persistent_ctx
.allocated_region_ids
.iter()
.copied()
.collect();
let region_routes: Vec<_> = prepare_result
.target_routes
.iter()
.filter(|route| allocated_region_ids.contains(&route.region.id))
.cloned()
.collect();
if !region_routes.is_empty() {
return Ok((
Box::new(SyncRegion { region_routes }),
Status::executing(true),
));
}
}
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),

View File

@@ -0,0 +1,445 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, State};
use crate::procedure::utils::ErrorStrategy;
use crate::service::mailbox::{Channel, MailboxRef};
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
/// The state of syncing regions for a repartition group.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRegion {
pub region_routes: Vec<RegionRoute>,
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for SyncRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Self::flush_central_region(ctx).await?;
self.sync_regions(ctx).await?;
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl SyncRegion {
async fn flush_central_region(ctx: &mut Context) -> Result<()> {
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush central region",
})?;
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
crate::procedure::utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
&[prepare_result.central_region],
&prepare_result.central_region_datanode,
operation_timeout,
ErrorStrategy::Retry,
)
.await
}
/// Builds instructions to sync regions on datanodes.
fn build_sync_region_instructions(
central_region: RegionId,
region_routes: &[RegionRoute],
) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
for (peer, region_ids) in target_region_routes_by_peer {
let sync_regions = region_ids
.into_iter()
.map(|region_id| {
let request = SyncRegionFromRequest::FromRegion {
source_region_id: central_region,
parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
};
common_meta::instruction::SyncRegion { region_id, request }
})
.collect();
instructions.insert((*peer).clone(), sync_regions);
}
instructions
}
/// Syncs regions on datanodes.
async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let instructions = Self::build_sync_region_instructions(
prepare_result.central_region,
&self.region_routes,
);
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Sync regions",
})?;
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
.iter()
.map(|(peer, sync_regions)| {
(
peer,
Self::sync_region(
&ctx.mailbox,
&ctx.server_addr,
peer,
sync_regions,
operation_timeout,
),
)
})
.unzip();
info!(
"Sent sync regions instructions to peers: {:?} for repartition table {}",
peers, table_id
);
let format_err_msg = |idx: usize, error: &Error| {
let peer = peers[idx];
format!(
"Failed to sync regions on datanode {:?}, error: {:?}",
peer, error
)
};
let results = join_all(tasks).await;
let result = handle_multiple_results(&results);
match result {
HandleMultipleResult::AllSuccessful => Ok(()),
HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
reason: format!(
"All retryable errors during syncing regions for repartition table {}: {:?}",
table_id,
retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(",")
),
}
.fail(),
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
violated: format!(
"All non retryable errors during syncing regions for repartition table {}: {:?}",
table_id,
non_retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(",")
),
}
.fail(),
HandleMultipleResult::PartialRetryable {
retryable_errors,
non_retryable_errors,
} => error::UnexpectedSnafu {
violated: format!(
"Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
table_id,
retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(","),
non_retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(","),
),
}
.fail(),
}
}
/// Syncs regions on a datanode.
async fn sync_region(
mailbox: &MailboxRef,
server_addr: &str,
peer: &Peer,
sync_regions: &[common_meta::instruction::SyncRegion],
timeout: Duration,
) -> Result<()> {
let ch = Channel::Datanode(peer.id);
let instruction = Instruction::SyncRegions(sync_regions.to_vec());
let message = MailboxMessage::json_message(
&format!(
"Sync regions: {:?}",
sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let now = std::time::Instant::now();
let receiver = mailbox.send(&ch, message, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}
.fail()?,
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received sync regions reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect sync regions reply",
}
.fail();
};
for reply in replies {
Self::handle_sync_region_reply(&reply, &now, peer)?;
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
Err(err) => Err(err),
}
}
fn handle_sync_region_reply(
SyncRegionReply {
region_id,
ready,
exists,
error,
}: &SyncRegionReply,
now: &Instant,
peer: &Peer,
) -> Result<()> {
ensure!(
exists,
error::UnexpectedSnafu {
violated: format!(
"Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
)
}
);
if let Some(error) = error {
return error::RetryLaterSnafu {
reason: format!(
"Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id,
peer,
error,
now.elapsed()
),
}
.fail();
}
ensure!(
ready,
error::RetryLaterSnafu {
reason: format!(
"Region {} failed to sync on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
),
}
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;
use crate::error::Error;
use crate::procedure::repartition::group::GroupPrepareResult;
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
use crate::service::mailbox::Channel;
#[test]
fn test_build_sync_region_instructions() {
let table_id = 1024;
let central_region = RegionId::new(table_id, 1);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let instructions =
SyncRegion::build_sync_region_instructions(central_region, &region_routes);
assert_eq!(instructions.len(), 1);
let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
assert_eq!(peer_instructions.len(), 1);
assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
let SyncRegionFromRequest::FromRegion {
source_region_id, ..
} = &peer_instructions[0].request
else {
panic!("expect from region request");
};
assert_eq!(*source_region_id, central_region);
}
fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
GroupPrepareResult {
source_routes: vec![],
target_routes: vec![],
central_region: RegionId::new(table_id, 1),
central_region_datanode: Peer::empty(1),
}
}
#[tokio::test]
async fn test_sync_regions_all_successful() {
let mut env = TestingEnv::new();
let table_id = 1024;
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
let (tx, rx) = tokio::sync::mpsc::channel(1);
env.mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
Ok(new_sync_region_reply(
id,
RegionId::new(1024, 3),
true,
true,
None,
))
});
let mut ctx = env.create_context(persistent_context);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let sync_region = SyncRegion { region_routes };
sync_region.sync_regions(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_sync_regions_retryable() {
let env = TestingEnv::new();
let table_id = 1024;
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
let mut ctx = env.create_context(persistent_context);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let sync_region = SyncRegion { region_routes };
let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
}
}
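For orientation, the reply shape that `handle_sync_region_reply` consumes can be sketched directly from the types used above; the region id and flags below are placeholder values for illustration, not taken from this change:

use common_meta::instruction::{InstructionReply, SyncRegionReply, SyncRegionsReply};
use store_api::storage::RegionId;

// Builds a reply equivalent to what a datanode might send back after syncing a
// single region successfully; all values here are illustrative placeholders.
fn example_sync_regions_reply() -> InstructionReply {
    InstructionReply::SyncRegions(SyncRegionsReply::new(vec![SyncRegionReply {
        region_id: RegionId::new(1024, 3),
        ready: true,
        exists: true,
        error: None,
    }]))
}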

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub(crate) mod apply_staging_region;
pub(crate) mod exit_staging_region;
pub(crate) mod rollback_staging_region;
use std::any::Any;
@@ -28,11 +29,14 @@ use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::enum_variant_names)]
pub enum UpdateMetadata {
/// Applies the new partition expressions for staging regions.
ApplyStaging,
/// Rolls back the new partition expressions for staging regions.
RollbackStaging,
/// Exits the staging regions.
ExitStaging,
}
#[async_trait::async_trait]
@@ -62,7 +66,18 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
"Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
err;
"Failed to broadcast the invalidate table cache message during the rollback staging regions"
);
};
Ok((Box::new(RepartitionEnd), Status::executing(false)))
}
UpdateMetadata::ExitStaging => {
self.exit_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
err;
"Failed to broadcast the invalidate table cache message during the exit staging regions"
);
};
Ok((Box::new(RepartitionEnd), Status::executing(false)))

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
-use common_telemetry::error;
+use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
@@ -77,7 +77,6 @@ impl UpdateMetadata {
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
-#[allow(dead_code)]
pub(crate) async fn apply_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -90,6 +89,13 @@ impl UpdateMetadata {
region_routes,
)?;
let source_count = ctx.persistent_ctx.sources.len();
let target_count = ctx.persistent_ctx.targets.len();
info!(
"Apply staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await

View File

@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
fn exit_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for target in targets {
let region_route = region_routes_map.get_mut(&target.region_id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region_id,
},
)?;
region_route.clear_leader_staging();
}
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
},
)?;
region_route.clear_leader_staging();
}
Ok(region_routes)
}
/// Exits the staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Target region not found.
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
pub(crate) async fn exit_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
let new_region_routes = Self::exit_staging_region_routes(
group_id,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
region_routes,
)?;
let source_count = ctx.persistent_ctx.sources.len();
let target_count = ctx.persistent_ctx.targets.len();
info!(
"Exit staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
-use common_telemetry::error;
+use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
@@ -29,7 +29,6 @@ impl UpdateMetadata {
/// Abort:
/// - Source region not found.
/// - Target region not found.
-#[allow(dead_code)]
fn rollback_staging_region_routes(
group_id: GroupId,
source_routes: &[RegionRoute],
@@ -74,7 +73,6 @@ impl UpdateMetadata {
/// - Target region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
-#[allow(dead_code)]
pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -89,6 +87,13 @@ impl UpdateMetadata {
region_routes,
)?;
let source_count = prepare_result.source_routes.len();
let target_count = prepare_result.target_routes.len();
info!(
"Rollback staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await

View File

@@ -16,6 +16,7 @@ use std::any::Any;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::debug;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
@@ -69,6 +70,17 @@ impl State for RepartitionStart {
);
let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
let plan_count = plans.len();
let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
let total_target_regions: usize =
plans.iter().map(|p| p.target_partition_exprs.len()).sum();
common_telemetry::info!(
"Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
table_id,
plan_count,
total_source_regions,
total_target_regions
);
if plans.is_empty() {
return Ok((Box::new(RepartitionEnd), Status::done()));
@@ -86,7 +98,6 @@ impl State for RepartitionStart {
}
impl RepartitionStart {
-#[allow(dead_code)]
fn build_plan(
physical_route: &PhysicalTableRouteValue,
from_exprs: &[PartitionExpr],
@@ -106,7 +117,6 @@ impl RepartitionStart {
))
}
-#[allow(dead_code)]
fn build_plan_entries(
subtasks: Vec<RepartitionSubtask>,
source_index: &[RegionDescriptor],
@@ -159,8 +169,9 @@ impl RepartitionStart {
.find_map(|(region_id, existing_expr)| {
(existing_expr == &expr_json).then_some(*region_id)
})
-.with_context(|| error::RepartitionSourceExprMismatchSnafu {
-expr: expr_json,
+.with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
+.inspect_err(|_| {
+debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
})?;
Ok(RegionDescriptor {

View File

@@ -96,5 +96,8 @@ pub fn new_persistent_context(
region_mapping: HashMap::new(),
group_prepare_result: None,
staging_manifest_paths: HashMap::new(),
sync_region: false,
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![],
}
}

View File

@@ -18,7 +18,8 @@ use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, DowngradeRegionsReply, EnterStagingRegionReply, EnterStagingRegionsReply,
-FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, UpgradeRegionsReply,
+FlushRegionReply, InstructionReply, SimpleReply, SyncRegionReply, SyncRegionsReply,
+UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
@@ -253,6 +254,34 @@ pub fn new_enter_staging_region_reply(
}
}
/// Generates a [InstructionReply::SyncRegions] reply.
pub fn new_sync_region_reply(
id: u64,
region_id: RegionId,
ready: bool,
exists: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::SyncRegions(SyncRegionsReply::new(vec![
SyncRegionReply {
region_id,
ready,
exists,
error,
},
])))
.unwrap(),
)),
}
}
/// Mock the test data for WAL pruning.
pub async fn new_wal_prune_metadata(
table_metadata_manager: TableMetadataManagerRef,

View File

@@ -12,6 +12,185 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_meta::peer::Peer;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
pub(crate) enum ErrorStrategy {
Ignore,
Retry,
}
fn handle_flush_region_reply(
reply: &InstructionReply,
region_ids: &[RegionId],
msg: &MailboxMessage,
) -> Result<(bool, Option<String>)> {
let result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != region_ids.len() {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!(
"expect {} region flush result, but got {}",
region_ids.len(),
flush_reply.results.len()
),
}
.fail();
}
match flush_reply.overall_success {
true => (true, None),
false => (
false,
Some(
flush_reply
.results
.iter()
.filter_map(|(region_id, result)| match result {
Ok(_) => None,
Err(e) => Some(format!("{}: {:?}", region_id, e)),
})
.collect::<Vec<String>>()
.join("; "),
),
),
}
}
_ => {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
}
};
Ok(result)
}
/// Flushes the regions on the datanode.
///
/// Retry Or Ignore:
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
/// - Failed to flush region on the Datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
pub(crate) async fn flush_region(
mailbox: &MailboxRef,
server_addr: &str,
region_ids: &[RegionId],
datanode: &Peer,
timeout: Duration,
error_strategy: ErrorStrategy,
) -> Result<()> {
let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
region_ids.to_vec(),
FlushErrorStrategy::TryAll,
));
let msg = MailboxMessage::json_message(
&format!("Flush regions: {:?}", region_ids),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", datanode.id, datanode.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(datanode.id);
let now = Instant::now();
let receiver = mailbox.send(&ch, msg, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => match error_strategy {
ErrorStrategy::Ignore => {
warn!(
"Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_ids, datanode
);
return Ok(());
}
ErrorStrategy::Retry => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
datanode,
now.elapsed()
),
}
.fail()?,
},
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
reply,
region_ids,
now.elapsed()
);
let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
if let Some(error) = error {
match error_strategy {
ErrorStrategy::Ignore => {
warn!(
"Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
region_ids, datanode, error
);
}
ErrorStrategy::Retry => {
return error::RetryLaterSnafu {
reason: format!(
"Failed to flush regions {:?}, the datanode({}) error is retried: {}",
region_ids,
datanode,
error,
),
}
.fail()?;
}
}
} else if result {
info!(
"The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
region_ids,
datanode,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush regions",
}
.fail(),
Err(err) => Err(err),
}
}
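// Usage sketch (illustrative only, not part of this change): how a caller might
// pick between the two error strategies. The peer, region id, and timeout below
// are hypothetical placeholders.
#[allow(dead_code)]
async fn flush_example(mailbox: &MailboxRef, server_addr: &str, peer: &Peer) -> Result<()> {
    let region_ids = [RegionId::new(1024, 1)];
    // Best effort: skip unreachable datanodes and ignore flush failures.
    flush_region(
        mailbox,
        server_addr,
        &region_ids,
        peer,
        Duration::from_secs(10),
        ErrorStrategy::Ignore,
    )
    .await?;
    // Critical path: surface failures as retryable errors so the caller retries later.
    flush_region(
        mailbox,
        server_addr,
        &region_ids,
        peer,
        Duration::from_secs(10),
        ErrorStrategy::Retry,
    )
    .await
}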
#[cfg(any(test, feature = "mock"))]
pub mod mock {
use std::io::Error;

View File

@@ -14,7 +14,7 @@
//! Drop a metric region
-use common_telemetry::info;
+use common_telemetry::{debug, info};
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest};
@@ -46,6 +46,15 @@ impl MetricEngineInner {
.physical_region_states()
.get(&data_region_id)
{
debug!(
"Physical region {} is busy, there are still some logical regions: {:?}",
data_region_id,
state
.logical_regions()
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
);
(true, !state.logical_regions().is_empty())
} else {
// the second argument is not used, just pass in a dummy value

View File

@@ -314,11 +314,8 @@ impl MitoRegion {
/// Sets the dropping state.
/// You should call this method in the worker loop.
-pub(crate) fn set_dropping(&self) -> Result<()> {
-self.compare_exchange_state(
-RegionLeaderState::Writable,
-RegionRoleState::Leader(RegionLeaderState::Dropping),
-)
+pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
+self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
}
/// Sets the truncating state.

View File

@@ -31,7 +31,7 @@ impl<S> RegionWorkerLoop<S> {
let region_id = request.region_id;
let source_region_id = request.source_region_id;
let sender = request.sender;
-let region = match self.regions.writable_region(region_id) {
+let region = match self.regions.writable_non_staging_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));

View File

@@ -42,12 +42,18 @@ where
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
-let region = self.regions.writable_non_staging_region(region_id)?;
+let region = self.regions.writable_region(region_id)?;
info!("Try to drop region: {}, worker: {}", region_id, self.id);
let is_staging = region.is_staging();
let expect_state = if is_staging {
RegionLeaderState::Staging
} else {
RegionLeaderState::Writable
};
// Marks the region as dropping.
-region.set_dropping()?;
+region.set_dropping(expect_state)?;
// Writes dropping marker
// We rarely drop a region so we still operate in the worker loop.
let region_dir = region.access_layer.build_region_dir(region_id);

View File

@@ -638,7 +638,7 @@ impl RegionStatistic {
}
/// Request to sync the region from a manifest or a region.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SyncRegionFromRequest {
/// Syncs the region using manifest information.
/// Used in leader-follower manifest sync scenarios.
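With these derives, the request can be embedded in a mailbox instruction and compared after decoding. A minimal round-trip sketch, assuming `parallelism` is a plain integer field and using placeholder values:

use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;

// Serialize the request to JSON, decode it back, and compare; the PartialEq
// derive is what makes the final equality check possible.
fn roundtrip_example() -> serde_json::Result<()> {
    let request = SyncRegionFromRequest::FromRegion {
        source_region_id: RegionId::new(1024, 1),
        parallelism: 4,
    };
    let json = serde_json::to_string(&request)?;
    let decoded: SyncRegionFromRequest = serde_json::from_str(&json)?;
    assert_eq!(request, decoded);
    Ok(())
}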

View File

@@ -99,3 +99,155 @@ DROP TABLE alter_repartition_table;
Affected Rows: 0
-- Metric engine repartition test
CREATE TABLE metric_physical_table (
ts TIMESTAMP TIME INDEX,
host STRING,
cpu DOUBLE,
PRIMARY KEY(host)
)
PARTITION ON COLUMNS (host) (
host < 'h1',
host >= 'h1' AND host < 'h2',
host >= 'h2'
)
ENGINE = metric
WITH (
physical_metric_table = "true"
);
Affected Rows: 0
CREATE TABLE logical_table_v1 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
Affected Rows: 0
CREATE TABLE logical_table_v2 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
Affected Rows: 0
-- Split physical table partition
ALTER TABLE metric_physical_table SPLIT PARTITION (
host < 'h1'
) INTO (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
Affected Rows: 0
SHOW CREATE TABLE metric_physical_table;
+-----------------------+------------------------------------------------------+
| Table | Create Table |
+-----------------------+------------------------------------------------------+
| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "host" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'h0', |
| | host >= 'h1' AND host < 'h2', |
| | host >= 'h2', |
| | host >= 'h0' AND host < 'h1' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = 'true' |
| | ) |
+-----------------------+------------------------------------------------------+
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
++
++
SELECT * FROM logical_table_v1;
++
++
SELECT * FROM logical_table_v2;
++
++
-- Merge physical table partition
ALTER TABLE metric_physical_table MERGE PARTITION (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
Affected Rows: 0
SHOW CREATE TABLE metric_physical_table;
+-----------------------+------------------------------------------------------+
| Table | Create Table |
+-----------------------+------------------------------------------------------+
| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "host" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'h0' OR host >= 'h0' AND host < 'h1', |
| | host >= 'h1' AND host < 'h2', |
| | host >= 'h2' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = 'true' |
| | ) |
+-----------------------+------------------------------------------------------+
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
++
++
SELECT * FROM logical_table_v1;
++
++
SELECT * FROM logical_table_v2;
++
++
DROP TABLE logical_table_v1;
Affected Rows: 0
DROP TABLE logical_table_v2;
Affected Rows: 0
DROP TABLE metric_physical_table;
Affected Rows: 0

View File

@@ -46,3 +46,78 @@ ALTER TABLE alter_repartition_table REPARTITION (
);
DROP TABLE alter_repartition_table;
-- Metric engine repartition test
CREATE TABLE metric_physical_table (
ts TIMESTAMP TIME INDEX,
host STRING,
cpu DOUBLE,
PRIMARY KEY(host)
)
PARTITION ON COLUMNS (host) (
host < 'h1',
host >= 'h1' AND host < 'h2',
host >= 'h2'
)
ENGINE = metric
WITH (
physical_metric_table = "true"
);
CREATE TABLE logical_table_v1 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
CREATE TABLE logical_table_v2 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
-- Split physical table partition
ALTER TABLE metric_physical_table SPLIT PARTITION (
host < 'h1'
) INTO (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
SHOW CREATE TABLE metric_physical_table;
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
SELECT * FROM logical_table_v1;
SELECT * FROM logical_table_v2;
-- Merge physical table partition
ALTER TABLE metric_physical_table MERGE PARTITION (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
SHOW CREATE TABLE metric_physical_table;
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
SELECT * FROM logical_table_v1;
SELECT * FROM logical_table_v2;
DROP TABLE logical_table_v1;
DROP TABLE logical_table_v2;
DROP TABLE metric_physical_table;