Compare commits

...

7 Commits

Author SHA1 Message Date
Zhenchi  72a6b9ff66  wip  2025-10-31 05:03:38 +00:00
  Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi  1286d4ca74  Merge remote-tracking branch 'origin/main' into zhongzc/repartition-procedure-scaffold  2025-10-26 10:56:09 +00:00
Zhenchi  3c1d7fcb89  feat: add group procedure  2025-10-23 10:19:06 +00:00
  Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi  5be4987fd7  Merge remote-tracking branch 'origin/main' into zhongzc/repartition-procedure-scaffold  2025-10-22 08:50:16 +00:00
Zhenchi  db11022cff  Merge remote-tracking branch 'origin/main' into zhongzc/repartition-procedure-scaffold  2025-10-13 06:45:56 +00:00
Zhenchi  15935ee89a  fix  2025-10-13 06:45:37 +00:00
  Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi  d0877997a2  feat: scaffold repartition procedure with plan/resource stubs  2025-10-13 02:41:02 +00:00
  Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
15 changed files with 2714 additions and 11 deletions

Cargo.lock generated
View File

@@ -3934,6 +3934,7 @@ dependencies = [
"mito2",
"num_cpus",
"object-store",
"partition",
"prometheus",
"prost 0.13.5",
"query",
@@ -5324,7 +5325,6 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=14b9dc40bdc8288742b0cefc7bb024303b7429ef#14b9dc40bdc8288742b0cefc7bb024303b7429ef"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -6922,7 +6922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -7428,6 +7428,7 @@ dependencies = [
"local-ip-address",
"once_cell",
"parking_lot 0.12.4",
"partition",
"prometheus",
"prost 0.13.5",
"rand 0.9.1",
@@ -9859,7 +9860,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck 0.5.0",
"itertools 0.14.0",
"itertools 0.10.5",
"log",
"multimap",
"once_cell",
@@ -9905,7 +9906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.14.0",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 2.0.106",
@@ -14510,7 +14511,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]

View File

@@ -147,7 +147,8 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
greptime-proto = { path = "../greptime-proto" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -87,6 +87,7 @@ tokio-postgres-rustls = { version = "0.12", optional = true }
tonic.workspace = true
tracing.workspace = true
typetag.workspace = true
uuid.workspace = true
[dev-dependencies]
chrono.workspace = true

View File

@@ -53,6 +53,7 @@ metric-engine.workspace = true
mito2.workspace = true
num_cpus.workspace = true
object-store.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true

View File

@@ -19,6 +19,7 @@ use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use mito2::remap_manifest::Error as RemapManifestError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
@@ -396,6 +397,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to remap manifests: {}", source))]
RemapManifest {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: RemapManifestError,
},
#[snafu(display("Not yet implemented: {what}"))]
NotYetImplemented { what: String },
}
@@ -469,6 +478,7 @@ impl ErrorExt for Error {
ObjectStore { source, .. } => source.status_code(),
BuildCacheStore { .. } => StatusCode::StorageUnavailable,
RemapManifest { .. } => StatusCode::Unexpected,
}
}
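
The new `RemapManifest` variant carries a `mito2::remap_manifest::Error` as its snafu source, so a call site can wrap the engine error with the generated context selector. A minimal sketch, assuming only the variant defined above; the `remap_region_manifests` helper and its `do_remap` callback are hypothetical, not part of this diff:

use snafu::ResultExt;

use crate::error::{RemapManifestSnafu, Result};

// Hypothetical call site: `do_remap` stands in for whatever mito2 API will
// eventually perform the manifest remap and fail with remap_manifest::Error.
fn remap_region_manifests(
    do_remap: impl FnOnce() -> std::result::Result<(), mito2::remap_manifest::Error>,
) -> Result<()> {
    do_remap().context(RemapManifestSnafu)
}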

View File

@@ -24,7 +24,9 @@ use api::region::RegionResponse;
use api::v1::meta::TopicStat;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
ApplyStagedManifestRequest, ListMetadataRequest, PauseRequest, PublishRegionRuleRequest,
RegionResponse as RegionResponseV1, RemapManifestRequest, ResumeRequest,
StageRegionRuleRequest, SyncRequest, region_request,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
@@ -84,6 +86,8 @@ use crate::error::{
use crate::event_listener::RegionServerEventListenerRef;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
const REMAP_STATS_EXTENSION_KEY: &str = "repartition.manifest.stats";
#[derive(Clone)]
pub struct RegionServer {
inner: Arc<RegionServerInner>,
@@ -370,6 +374,24 @@ impl RegionServer {
}
}
/// Temporarily pauses compaction- and snapshot-related activities for the region.
///
/// Currently a stub; the real implementation will coordinate with the region worker.
pub async fn pause_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
info!("pause_compaction_and_snapshot stub invoked for region {region_id}");
let _ = region_id;
Ok(())
}
/// Resumes compaction- and snapshot-related activities for the region.
///
/// Currently a stub; the real implementation will coordinate with the region worker.
pub async fn resume_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
info!("resume_compaction_and_snapshot stub invoked for region {region_id}");
let _ = region_id;
Ok(())
}
/// Stop the region server.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
@@ -538,6 +560,124 @@ impl RegionServer {
Ok(response)
}
async fn handle_pause_region_request(&self, request: &PauseRequest) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
let tracing_context = TracingContext::from_current_span();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_pause_region_request",
region_id = region_id.to_string()
));
self.pause_compaction_and_snapshot(region_id)
.trace(span)
.await
.map(|_| RegionResponse::new(AffectedRows::default()))
}
async fn handle_resume_region_request(
&self,
request: &ResumeRequest,
) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
let tracing_context = TracingContext::from_current_span();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_resume_region_request",
region_id = region_id.to_string()
));
self.resume_compaction_and_snapshot(region_id)
.trace(span)
.await
.map(|_| RegionResponse::new(AffectedRows::default()))
}
async fn handle_stage_region_rule_request(
&self,
request: &StageRegionRuleRequest,
) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
info!(
"Stage region rule for region {region_id} with version {}",
request.rule_version
);
match self
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await?
{
SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
Ok(RegionResponse::new(AffectedRows::default()))
}
SetRegionRoleStateResponse::InvalidTransition(err) => {
Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
}
}
}
async fn handle_publish_region_rule_request(
&self,
request: &PublishRegionRuleRequest,
) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
info!(
"Publish region rule for region {region_id} with version {}",
request.rule_version
);
match self
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await?
{
SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
Ok(RegionResponse::new(AffectedRows::default()))
}
SetRegionRoleStateResponse::InvalidTransition(err) => {
Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
}
}
}
async fn handle_remap_manifest_request(
&self,
request: &RemapManifestRequest,
) -> Result<RegionResponse> {
info!(
"received remap manifest request for table {} group {}",
request.table_id, request.group_id
);
let stats_json = serde_json::to_vec(&serde_json::json!({
"files_per_region": HashMap::<u64, usize>::new(),
"total_file_refs": 0u64,
"empty_regions": Vec::<u64>::new(),
"group_id": &request.group_id,
}))
.context(SerializeJsonSnafu)?;
let mut extensions = HashMap::new();
extensions.insert(REMAP_STATS_EXTENSION_KEY.to_string(), stats_json);
Ok(RegionResponse {
affected_rows: 0,
extensions,
metadata: Vec::new(),
})
}
async fn handle_apply_staged_manifest_request(
&self,
request: &ApplyStagedManifestRequest,
) -> Result<RegionResponse> {
info!(
"received manifest apply request for table {} group {} publish={} regions {:?}",
request.table_id, request.group_id, request.publish, request.region_ids
);
Ok(RegionResponse {
affected_rows: 0,
extensions: HashMap::new(),
metadata: Vec::new(),
})
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -569,6 +709,26 @@ impl RegionServerHandler for RegionServer {
region_request::Body::Sync(sync_request) => {
self.handle_sync_region_request(sync_request).await
}
region_request::Body::Pause(pause_request) => {
self.handle_pause_region_request(pause_request).await
}
region_request::Body::Resume(resume_request) => {
self.handle_resume_region_request(resume_request).await
}
region_request::Body::StageRegionRule(stage_request) => {
self.handle_stage_region_rule_request(stage_request).await
}
region_request::Body::PublishRegionRule(publish_request) => {
self.handle_publish_region_rule_request(publish_request)
.await
}
region_request::Body::RemapManifest(remap_request) => {
self.handle_remap_manifest_request(remap_request).await
}
region_request::Body::ApplyStagedManifest(apply_request) => {
self.handle_apply_staged_manifest_request(apply_request)
.await
}
region_request::Body::ListMetadata(list_metadata_request) => {
self.handle_list_metadata_request(list_metadata_request)
.await

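Since `handle_remap_manifest_request` above only reports its (currently stubbed) statistics through the response extensions under `repartition.manifest.stats`, the caller has to deserialize that payload from bytes. A consumer-side sketch based only on what the handler writes; the helper name is made up:

use api::region::RegionResponse;

const REMAP_STATS_EXTENSION_KEY: &str = "repartition.manifest.stats";

// Hypothetical helper: pull the remap statistics JSON back out of a datanode response.
fn remap_stats(response: &RegionResponse) -> Option<serde_json::Value> {
    response
        .extensions
        .get(REMAP_STATS_EXTENSION_KEY)
        .and_then(|bytes| serde_json::from_slice(bytes).ok())
}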
View File

@@ -45,6 +45,7 @@ common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
common-workload.workspace = true
partition.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool = { workspace = true, optional = true }

View File

@@ -17,12 +17,14 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::DatanodeId;
use common_procedure::ProcedureId;
use common_runtime::JoinError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use uuid::Uuid;
use crate::metasrv::SelectTarget;
use crate::pubsub::Message;
@@ -774,6 +776,129 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create repartition subtasks"))]
RepartitionCreateSubtasks {
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize partition expression"))]
RepartitionSerializePartitionExpr {
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Source partition expression '{}' does not match any existing region",
expr
))]
RepartitionSourceExprMismatch {
expr: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} is missing a source region id", group_id))]
RepartitionMissingSourceRegionId {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} cannot find route for source region {}",
group_id,
region_id
))]
RepartitionSourceRegionRouteMissing {
group_id: Uuid,
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} has no source regions after planning", group_id))]
RepartitionNoSourceRegions {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} target {} is missing an allocated region id",
group_id,
target_index
))]
RepartitionMissingTargetRegionId {
group_id: Uuid,
target_index: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Route for target region {} not found", region_id))]
RepartitionTargetRegionRouteMissing {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} is missing prepare context", group_id))]
RepartitionMissingPrepareContext {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} has no registered subprocedure", group_id))]
RepartitionSubprocedureUnknown {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to fetch state for repartition group {} subprocedure {}",
group_id,
procedure_id
))]
RepartitionSubprocedureStateFetch {
group_id: Uuid,
procedure_id: ProcedureId,
#[snafu(source)]
source: common_procedure::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} subprocedure {} state missing",
group_id,
procedure_id
))]
RepartitionSubprocedureStateMissing {
group_id: Uuid,
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} subprocedure {} failed: {}",
group_id,
procedure_id,
reason
))]
RepartitionSubprocedureFailed {
group_id: Uuid,
procedure_id: ProcedureId,
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported operation {}", operation))]
Unsupported {
operation: String,
@@ -997,6 +1122,11 @@ impl Error {
matches!(self, Error::RetryLater { .. })
|| matches!(self, Error::RetryLaterWithSource { .. })
}
/// Returns `true` if the error requires cleaning poison records.
pub fn need_clean_poisons(&self) -> bool {
false
}
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -1012,6 +1142,8 @@ impl ErrorExt for Error {
| Error::TcpBind { .. }
| Error::SerializeToJson { .. }
| Error::DeserializeFromJson { .. }
| Error::RepartitionCreateSubtasks { .. }
| Error::RepartitionSerializePartitionExpr { .. }
| Error::NoLeader { .. }
| Error::LeaderLeaseExpired { .. }
| Error::LeaderLeaseChanged { .. }
@@ -1032,7 +1164,8 @@ impl ErrorExt for Error {
| Error::FlowStateHandler { .. }
| Error::BuildWalOptionsAllocator { .. }
| Error::BuildPartitionClient { .. }
| Error::BuildKafkaClient { .. } => StatusCode::Internal,
| Error::BuildKafkaClient { .. }
| Error::RepartitionSubprocedureStateFetch { .. } => StatusCode::Internal,
Error::DeleteRecords { .. }
| Error::GetOffset { .. }
@@ -1066,7 +1199,14 @@ impl ErrorExt for Error {
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. }
| Error::HandlerNotFound { .. }
| Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
| Error::LeaderPeerChanged { .. }
| Error::RepartitionSourceExprMismatch { .. }
| Error::RepartitionMissingSourceRegionId { .. }
| Error::RepartitionSourceRegionRouteMissing { .. }
| Error::RepartitionNoSourceRegions { .. }
| Error::RepartitionMissingTargetRegionId { .. }
| Error::RepartitionTargetRegionRouteMissing { .. }
| Error::RepartitionMissingPrepareContext { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }
@@ -1080,7 +1220,10 @@ impl ErrorExt for Error {
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. }
| Error::MigrationRunning { .. }
| Error::RegionMigrated { .. } => StatusCode::Unexpected,
| Error::RegionMigrated { .. }
| Error::RepartitionSubprocedureUnknown { .. }
| Error::RepartitionSubprocedureStateMissing { .. }
| Error::RepartitionSubprocedureFailed { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SaveClusterInfo { source, .. }
| Error::InvalidClusterInfoFormat { source, .. }

View File

@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
pub mod region_migration;
pub mod repartition;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]

View File

@@ -0,0 +1,506 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod context;
mod group;
mod plan;
use std::collections::HashMap;
use common_meta::ddl::DdlContext;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock};
use common_procedure::error::{Error as ProcedureError, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, ProcedureId, ProcedureWithId, Status,
};
use common_telemetry::error;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use strum::AsRefStr;
use uuid::Uuid;
use self::context::{GroupManifestSummary, RepartitionContext};
use self::group::RepartitionGroupProcedure;
use self::plan::{PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand};
use crate::error::{self, Result};
/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
/// Partition expressions representing the source regions.
pub from_exprs: Vec<PartitionExpr>,
/// Partition expressions representing the target regions.
pub into_exprs: Vec<PartitionExpr>,
}
/// Procedure that orchestrates the repartition flow.
pub struct RepartitionProcedure {
context: DdlContext,
group_context: RepartitionContext,
data: RepartitionData,
}
impl RepartitionProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
/// Constructs a new procedure instance from a task payload.
pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
let group_context = RepartitionContext::new(&context);
Ok(Self {
context,
group_context,
data: RepartitionData::new(task),
})
}
/// Builds the repartition plan if we have not done so yet.
async fn on_prepare(&mut self) -> Result<Status> {
if self.data.plan.is_none() {
self.build_plan().await?;
}
self.data.state = RepartitionState::AllocateResources;
Ok(Status::executing(true))
}
/// Allocates target regions and decides whether the procedure can proceed.
async fn on_allocate_resources(&mut self) -> Result<Status> {
if !self.data.resource_allocated {
let allocated = self.allocate_resources().await?;
if !allocated {
if let Some(plan) = &self.data.plan {
let failed_groups = plan.entries.iter().map(|entry| entry.group_id);
self.data.failed_groups.extend(failed_groups);
}
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
}
self.data.resource_allocated = true;
}
self.data.state = RepartitionState::DispatchSubprocedures;
Ok(Status::executing(true))
}
/// Spawns group subprocedures for every pending plan entry.
async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
let plan = match self.data.plan.as_ref() {
Some(plan) => plan,
None => {
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
}
};
let entries_to_schedule: Vec<PlanEntry> = plan
.entries
.iter()
.filter(|entry| {
!self.data.succeeded_groups.contains(&entry.group_id)
&& !self.data.failed_groups.contains(&entry.group_id)
})
.cloned()
.collect();
if entries_to_schedule.is_empty() {
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
}
let groups_to_schedule: Vec<PlanGroupId> = entries_to_schedule
.iter()
.map(|entry| entry.group_id)
.collect();
let subprocedures = self.spawn_group_procedures(
plan.table_id,
plan.route_snapshot.clone(),
entries_to_schedule,
);
self.data.pending_groups = groups_to_schedule;
self.data.state = RepartitionState::CollectSubprocedures;
Ok(Status::suspended(subprocedures, true))
}
/// Records the subprocedures that finished and moves on to finalisation.
async fn on_collect_subprocedures(&mut self, ctx: &ProcedureContext) -> Result<Status> {
let pending = std::mem::take(&mut self.data.pending_groups);
let mut first_error: Option<error::Error> = None;
let mut succeeded = Vec::new();
for group_id in pending {
let procedure_id = match self.data.group_subprocedures.remove(&group_id) {
Some(id) => id,
None => {
let err = error::RepartitionSubprocedureUnknownSnafu { group_id }.build();
self.data.failed_groups.push(group_id);
if first_error.is_none() {
first_error = Some(err);
}
continue;
}
};
let state_opt = ctx.provider.procedure_state(procedure_id).await.context(
error::RepartitionSubprocedureStateFetchSnafu {
group_id,
procedure_id,
},
)?;
let state = match state_opt {
Some(state) => state,
None => {
let err = error::RepartitionSubprocedureStateMissingSnafu {
group_id,
procedure_id,
}
.build();
self.data.failed_groups.push(group_id);
if first_error.is_none() {
first_error = Some(err);
}
continue;
}
};
if state.is_done() {
succeeded.push(group_id);
continue;
}
let reason = state
.error()
.map(|err| err.to_string())
.unwrap_or_else(|| format!("subprocedure state {}", state.as_str_name()));
let err = error::RepartitionSubprocedureFailedSnafu {
group_id,
procedure_id,
reason,
}
.build();
self.data.failed_groups.push(group_id);
if first_error.is_none() {
first_error = Some(err);
}
}
self.data.succeeded_groups.extend(succeeded);
self.data.state = RepartitionState::Finalize;
if let Some(err) = first_error {
return Err(err);
}
Ok(Status::executing(true))
}
/// Builds the summary that will be returned to the caller.
async fn on_finalize(&mut self) -> Result<Status> {
self.deallocate_resources().await?;
self.data.summary = Some(RepartitionSummary {
succeeded_groups: self.data.succeeded_groups.clone(),
failed_groups: self.data.failed_groups.clone(),
manifest_summaries: self.group_context.manifest_summaries(),
});
self.group_context.clear_group_records();
self.data.state = RepartitionState::Finished;
Ok(Status::done())
}
/// Constructs the repartition plan from the task specification.
async fn build_plan(&mut self) -> Result<()> {
let table_id = self.data.task.table_id;
let from_exprs = &self.data.task.from_exprs;
let into_exprs = &self.data.task.into_exprs;
let (physical_table_id, physical_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.context(error::TableMetadataManagerSnafu)?;
let src_descriptors = Self::source_region_descriptors(from_exprs, &physical_route)?;
let subtasks = subtask::create_subtasks(from_exprs, into_exprs)
.context(error::RepartitionCreateSubtasksSnafu)?;
let entries = Self::build_plan_entries(subtasks, &src_descriptors, into_exprs);
let demand = ResourceDemand::from_plan_entries(&entries);
let plan = RepartitionPlan::new(physical_table_id, entries, demand, physical_route.clone());
self.data.plan = Some(plan);
Ok(())
}
fn source_region_descriptors(
from_exprs: &[PartitionExpr],
physical_route: &PhysicalTableRouteValue,
) -> Result<Vec<RegionDescriptor>> {
let existing_regions = physical_route
.region_routes
.iter()
.map(|route| (route.region.id, route.region.partition_expr()))
.collect::<Vec<_>>();
let descriptors = from_exprs
.iter()
.map(|expr| {
let expr_json = expr
.as_json_str()
.context(error::RepartitionSerializePartitionExprSnafu)?;
let matched_region_id = existing_regions
.iter()
.find_map(|(region_id, existing_expr)| {
(existing_expr == &expr_json).then_some(*region_id)
})
.with_context(|| error::RepartitionSourceExprMismatchSnafu {
expr: expr_json,
})?;
Ok(RegionDescriptor {
region_id: Some(matched_region_id),
partition_expr: expr.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
Ok(descriptors)
}
fn build_plan_entries(
subtasks: Vec<RepartitionSubtask>,
source_index: &[RegionDescriptor],
target_exprs: &[PartitionExpr],
) -> Vec<PlanEntry> {
let plan_entries = subtasks
.into_iter()
.map(|subtask| {
let group_id = Uuid::new_v4();
let sources = subtask
.from_expr_indices
.iter()
.map(|&idx| source_index[idx].clone())
.collect::<Vec<_>>();
let targets = subtask
.to_expr_indices
.iter()
.map(|&idx| RegionDescriptor {
region_id: None, // will be assigned later
partition_expr: target_exprs[idx].clone(),
})
.collect::<Vec<_>>();
PlanEntry::new(group_id, subtask, sources, targets)
})
.collect::<Vec<_>>();
plan_entries
}
/// Allocates resources required by the plan. Returning `false`
/// indicates that the procedure should abort.
async fn allocate_resources(&mut self) -> Result<bool> {
todo!("allocate resources");
}
async fn deallocate_resources(&mut self) -> Result<()> {
if !self.data.resource_allocated {
return Ok(());
}
self.data.resource_allocated = false;
todo!("deallocate resources");
}
/// Builds the child procedure list for the provided plan groups.
fn spawn_group_procedures(
&mut self,
table_id: TableId,
route_snapshot: PhysicalTableRouteValue,
entries: Vec<PlanEntry>,
) -> Vec<ProcedureWithId> {
let mut id_map = HashMap::new();
let procedures = entries
.into_iter()
.map(|entry| {
let group_id = entry.group_id;
let group_procedure = RepartitionGroupProcedure::new(
entry,
table_id,
route_snapshot.clone(),
self.data.task.catalog_name.clone(),
self.data.task.schema_name.clone(),
self.group_context.clone(),
);
let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
id_map.insert(group_id, procedure.id);
procedure
})
.collect::<Vec<_>>();
self.data.group_subprocedures = id_map;
procedures
}
/// Composes the set of locks required to safely mutate table metadata.
fn table_lock_key(&self) -> Vec<common_procedure::StringKey> {
let mut lock_key = Vec::with_capacity(3);
let catalog = self.data.task.catalog_name.as_str();
let schema = self.data.task.schema_name.as_str();
lock_key.push(CatalogLock::Read(catalog).into());
lock_key.push(SchemaLock::read(catalog, schema).into());
lock_key.push(TableLock::Write(self.data.task.table_id).into());
lock_key
}
async fn trigger_group_rollbacks(&mut self) {
if self.data.rollback_triggered {
return;
}
match self.group_context.rollback_registered_groups().await {
Ok(_) => {
self.data.rollback_triggered = true;
}
Err(err) => {
error!(err; "repartition: rollback of successful groups failed");
self.data.rollback_triggered = true;
}
}
}
}
#[async_trait::async_trait]
impl Procedure for RepartitionProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = self.data.state;
let status = match state {
RepartitionState::Prepare => self.on_prepare().await,
RepartitionState::AllocateResources => self.on_allocate_resources().await,
RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
RepartitionState::Finalize => self.on_finalize().await,
RepartitionState::Finished => Ok(Status::done()),
};
match status {
Ok(status) => Ok(status),
Err(err) => {
self.trigger_group_rollbacks().await;
if let Err(dealloc_err) = self.deallocate_resources().await {
error!(dealloc_err; "repartition: deallocating resources after failure failed");
}
Err(map_repartition_error(err))
}
}
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
LockKey::new(self.table_lock_key())
}
}
/// Serialized data of the repartition procedure.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RepartitionData {
state: RepartitionState,
task: RepartitionTask,
#[serde(default)]
plan: Option<RepartitionPlan>,
#[serde(default)]
resource_allocated: bool,
#[serde(default)]
pending_groups: Vec<PlanGroupId>,
#[serde(default)]
succeeded_groups: Vec<PlanGroupId>,
#[serde(default)]
failed_groups: Vec<PlanGroupId>,
#[serde(default)]
summary: Option<RepartitionSummary>,
#[serde(default)]
rollback_triggered: bool,
#[serde(default)]
group_subprocedures: HashMap<PlanGroupId, ProcedureId>,
}
impl RepartitionData {
/// Initialises the procedure data for a fresh run.
fn new(task: RepartitionTask) -> Self {
Self {
state: RepartitionState::Prepare,
task,
plan: None,
resource_allocated: false,
pending_groups: Vec::new(),
succeeded_groups: Vec::new(),
failed_groups: Vec::new(),
summary: None,
rollback_triggered: false,
group_subprocedures: HashMap::new(),
}
}
}
pub(super) fn map_repartition_error(err: error::Error) -> ProcedureError {
match (err.is_retryable(), err.need_clean_poisons()) {
(true, true) => ProcedureError::retry_later_and_clean_poisons(err),
(true, false) => ProcedureError::retry_later(err),
(false, true) => ProcedureError::external_and_clean_poisons(err),
(false, false) => ProcedureError::external(err),
}
}
/// High level states of the repartition procedure.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
enum RepartitionState {
Prepare,
AllocateResources,
DispatchSubprocedures,
CollectSubprocedures,
Finalize,
Finished,
}
/// Information returned to the caller after the procedure finishes.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RepartitionSummary {
succeeded_groups: Vec<PlanGroupId>,
failed_groups: Vec<PlanGroupId>,
#[serde(default)]
manifest_summaries: Vec<GroupManifestSummary>,
}
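
To make the entry point concrete, here is a hedged sketch of how a DDL handler might wrap a task into this procedure. The helper and the literal catalog/schema/table names are illustrative only, and submission to the procedure manager is intentionally left out:

use common_meta::ddl::DdlContext;
use common_procedure::ProcedureWithId;
use partition::expr::PartitionExpr;
use store_api::storage::TableId;

use crate::error::Result;
use crate::procedure::repartition::{RepartitionProcedure, RepartitionTask};

// Hypothetical helper: build a ready-to-submit repartition procedure for a table.
fn build_repartition_procedure(
    ddl_context: DdlContext,
    table_id: TableId,
    from_exprs: Vec<PartitionExpr>,
    into_exprs: Vec<PartitionExpr>,
) -> Result<ProcedureWithId> {
    let task = RepartitionTask {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
        table_name: "metrics".to_string(),
        table_id,
        from_exprs,
        into_exprs,
    };
    let procedure = RepartitionProcedure::new(task, ddl_context)?;
    // The caller would hand this to the procedure manager for execution.
    Ok(ProcedureWithId::with_random_id(Box::new(procedure)))
}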

View File

@@ -0,0 +1,351 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use api::region::RegionResponse;
use api::v1::region::{
ApplyStagedManifestRequest, PauseRequest, PublishRegionRuleRequest, RegionRequest,
RegionRequestHeader, RemapManifestRequest, ResumeRequest, StageRegionRuleRequest,
region_request,
};
use common_error::ext::BoxedError;
use common_meta::ddl::DdlContext;
use common_meta::key::TableMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error::{self, Result};
pub const REMAP_MANIFEST_STATS_EXTENSION: &str = "repartition.manifest.stats";
use super::group::{GroupRollbackRecord, RepartitionGroupProcedure};
use crate::procedure::repartition::plan::PlanGroupId;
/// Track the overall manifest stage for a repartition group.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum ManifestStatus {
#[default]
NotStarted,
Staged,
Published,
Discarded,
Skipped,
Failed,
}
/// Per-group status record that is collected by the top-level procedure.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GroupManifestSummary {
pub group_id: PlanGroupId,
pub status: ManifestStatus,
pub staged_region_count: u64,
pub stats: Option<Value>,
pub error: Option<String>,
}
/// Shared context that allows group procedures to interact with metadata and
/// datanodes. It also aggregates per-group manifest summaries.
#[derive(Clone)]
pub struct RepartitionContext {
pub table_metadata_manager: TableMetadataManagerRef,
pub node_manager: NodeManagerRef,
manifest_records: Arc<Mutex<HashMap<PlanGroupId, GroupManifestSummary>>>,
rollback_records: Arc<Mutex<HashMap<PlanGroupId, GroupRollbackRecord>>>,
}
impl RepartitionContext {
pub fn new(context: &DdlContext) -> Self {
Self {
table_metadata_manager: context.table_metadata_manager.clone(),
node_manager: context.node_manager.clone(),
manifest_records: Arc::new(Mutex::new(HashMap::new())),
rollback_records: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Send a pause request to the region leader so that local IO is quiesced.
pub async fn pause_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
info!(
"requesting pause to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::Pause(PauseRequest {
region_id: region_id.as_u64(),
})),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to pause region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Resume a previously paused region.
pub async fn resume_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
info!(
"requesting resume to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::Resume(ResumeRequest {
region_id: region_id.as_u64(),
rule_version: String::new(),
})),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to resume region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Stage the provided rule version on the datanode.
pub async fn stage_region_rule_on_datanode(
&self,
peer: &Peer,
region_id: RegionId,
rule_version: &str,
) -> Result<()> {
info!(
"requesting region rule staging to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::StageRegionRule(
StageRegionRuleRequest {
region_id: region_id.as_u64(),
rule_version: rule_version.to_string(),
},
)),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to stage region rule for region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Publish the staged rule version to make it active.
pub async fn publish_region_rule_on_datanode(
&self,
peer: &Peer,
region_id: RegionId,
rule_version: &str,
) -> Result<()> {
info!(
"requesting region rule publish to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::PublishRegionRule(
PublishRegionRuleRequest {
region_id: region_id.as_u64(),
rule_version: rule_version.to_string(),
},
)),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to publish region rule for region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Drop the staged rule version during rollback.
pub async fn clear_region_rule_stage_on_datanode(
&self,
peer: &Peer,
region_id: RegionId,
) -> Result<()> {
info!(
"requesting region rule stage clear to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::StageRegionRule(
StageRegionRuleRequest {
region_id: region_id.as_u64(),
rule_version: String::new(),
},
)),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to clear staged region rule for region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Instruct the datanode to remap manifests for this group.
pub async fn remap_manifests_on_datanode(
&self,
peer: &Peer,
manifest_request: RemapManifestRequest,
) -> Result<RegionResponse> {
let table_id = manifest_request.table_id;
let group_id = manifest_request.group_id.clone();
info!(
"requesting manifest remap to datanode {} for table {} in group {}",
peer.id, table_id, group_id
);
let datanode = self.node_manager.datanode(peer).await;
let region_request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::RemapManifest(manifest_request)),
};
let response = datanode
.handle(region_request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to remap manifests for group {} on datanode {}",
group_id, peer.id
),
})?;
Ok(response)
}
/// Publish or discard staged manifests.
pub async fn apply_staged_manifests_on_datanode(
&self,
peer: &Peer,
manifest_request: ApplyStagedManifestRequest,
) -> Result<RegionResponse> {
let publish = manifest_request.publish;
let table_id = manifest_request.table_id;
let group_id = manifest_request.group_id.clone();
info!(
"requesting manifest {} on datanode {} for table {} in group {}",
if publish { "publish" } else { "discard" },
peer.id,
table_id,
group_id
);
let datanode = self.node_manager.datanode(peer).await;
let region_request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::ApplyStagedManifest(manifest_request)),
};
let response = datanode
.handle(region_request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to {} staged manifests for group {} on datanode {}",
if publish { "publish" } else { "discard" },
group_id,
peer.id
),
})?;
Ok(response)
}
/// Store the latest manifest summary for a group.
pub fn record_manifest_summary(&self, summary: GroupManifestSummary) {
let mut records = self.manifest_records.lock().unwrap();
records.insert(summary.group_id, summary);
}
pub fn register_group_success(&self, record: GroupRollbackRecord) {
let mut records = self.rollback_records.lock().unwrap();
let group_id = record.group_id;
records.insert(group_id, record);
}
pub async fn rollback_registered_groups(&self) -> Result<()> {
let records: Vec<GroupRollbackRecord> = {
let mut map = self.rollback_records.lock().unwrap();
map.drain().map(|(_, record)| record).collect()
};
let mut first_err: Option<error::Error> = None;
for record in records {
let group_id = record.group_id;
if let Err(err) =
RepartitionGroupProcedure::execute_rollback(self.clone(), record).await
{
error!(err; "repartition: rollback of group {:?} failed", group_id);
if first_err.is_none() {
first_err = Some(err);
}
}
}
if let Some(err) = first_err {
return Err(err);
}
Ok(())
}
pub fn clear_group_records(&self) {
self.rollback_records.lock().unwrap().clear();
}
/// Collect all manifest summaries recorded so far.
pub fn manifest_summaries(&self) -> Vec<GroupManifestSummary> {
let records = self.manifest_records.lock().unwrap();
records.values().cloned().collect()
}
}
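
As a usage illustration only, based on the types defined above and written as it might appear in the sibling `group` module, a group procedure could record its manifest outcome after a successful staging step like this:

use serde_json::Value;

use super::context::{GroupManifestSummary, ManifestStatus, RepartitionContext};
use super::plan::PlanGroupId;

// Hypothetical helper: record that a group's staged manifests were produced successfully.
fn record_staged(
    ctx: &RepartitionContext,
    group_id: PlanGroupId,
    staged_region_count: u64,
    stats: Option<Value>,
) {
    ctx.record_manifest_summary(GroupManifestSummary {
        group_id,
        status: ManifestStatus::Staged,
        staged_region_count,
        stats,
        error: None,
    });
}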

File diff suppressed because it is too large

View File

@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::key::table_route::PhysicalTableRouteValue;
use partition::expr::PartitionExpr;
use partition::subtask::RepartitionSubtask;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
/// Identifier of a plan group.
pub type PlanGroupId = Uuid;
/// Logical description of the repartition plan.
///
/// The plan is persisted by the procedure framework so it must remain
/// serializable/deserializable across versions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlan {
pub table_id: TableId,
pub entries: Vec<PlanEntry>,
pub resource_demand: ResourceDemand,
pub route_snapshot: PhysicalTableRouteValue,
}
impl RepartitionPlan {
pub fn new(
table_id: TableId,
entries: Vec<PlanEntry>,
resource_demand: ResourceDemand,
route_snapshot: PhysicalTableRouteValue,
) -> Self {
Self {
table_id,
entries,
resource_demand,
route_snapshot,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PlanEntry {
pub group_id: PlanGroupId,
pub subtask: RepartitionSubtask,
pub sources: Vec<RegionDescriptor>,
pub targets: Vec<RegionDescriptor>,
}
impl PlanEntry {
/// Construct a plan entry consisting of the connected component returned by
/// the planner.
pub fn new(
group_id: PlanGroupId,
subtask: RepartitionSubtask,
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
) -> Self {
Self {
group_id,
subtask,
sources,
targets,
}
}
}
/// Metadata describing a region involved in the plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
pub region_id: Option<RegionId>,
pub partition_expr: PartitionExpr,
}
/// Auxiliary information about resources required to execute the plan.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {}
impl ResourceDemand {
pub fn from_plan_entries(_entries: &[PlanEntry]) -> Self {
// placeholder
Self {}
}
}
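
The doc comment on `RepartitionPlan` requires the plan to stay serializable across versions because `RepartitionProcedure::dump` persists it as JSON inside `RepartitionData`. An illustrative round-trip check (not part of this diff) that states the contract:

// Illustrative only: the plan must survive a serde_json round trip so the
// procedure framework can persist it and later reload it unchanged.
fn assert_plan_round_trips(plan: &RepartitionPlan) {
    let json = serde_json::to_string(plan).expect("plan serializes");
    let restored: RepartitionPlan = serde_json::from_str(&json).expect("plan deserializes");
    assert_eq!(&restored, plan);
}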

View File

@@ -14,12 +14,14 @@
use std::collections::VecDeque;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::expr::PartitionExpr;
use crate::overlap::associate_from_to;
/// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RepartitionSubtask {
pub from_expr_indices: Vec<usize>,
pub to_expr_indices: Vec<usize>,
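// Illustrative only (not part of this diff): resolve a subtask's index lists back
// into the caller's expression slices. This mirrors how the new metasrv planner
// turns each connected component into a plan entry with concrete source and
// target partition expressions.
fn split_inputs<'a>(
    from_exprs: &'a [PartitionExpr],
    to_exprs: &'a [PartitionExpr],
    subtask: &RepartitionSubtask,
) -> (Vec<&'a PartitionExpr>, Vec<&'a PartitionExpr>) {
    let sources = subtask.from_expr_indices.iter().map(|&i| &from_exprs[i]).collect();
    let targets = subtask.to_expr_indices.iter().map(|&i| &to_exprs[i]).collect();
    (sources, targets)
}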

View File

@@ -179,6 +179,30 @@ impl RegionRequest {
reason: "ListMetadata request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::Pause(_) => UnexpectedSnafu {
reason: "Pause request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::Resume(_) => UnexpectedSnafu {
reason: "Resume request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::StageRegionRule(_) => UnexpectedSnafu {
reason: "StageRegionRule request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::PublishRegionRule(_) => UnexpectedSnafu {
reason: "PublishRegionRule request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::RemapManifest(_) => UnexpectedSnafu {
reason: "RemapManifest request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::ApplyStagedManifest(_) => UnexpectedSnafu {
reason: "ApplyStagedManifest request should be handled separately by RegionServer",
}
.fail(),
}
}