Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Author: Zhenchi
Date: 2025-10-28 04:29:57 +00:00
parent 1286d4ca74
commit 72a6b9ff66
11 changed files with 2094 additions and 362 deletions

Cargo.lock

@@ -3934,6 +3934,7 @@ dependencies = [
"mito2",
"num_cpus",
"object-store",
"partition",
"prometheus",
"prost 0.13.5",
"query",
@@ -5324,7 +5325,6 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=14b9dc40bdc8288742b0cefc7bb024303b7429ef#14b9dc40bdc8288742b0cefc7bb024303b7429ef"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -6922,7 +6922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -9860,7 +9860,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck 0.5.0",
"itertools 0.14.0",
"itertools 0.10.5",
"log",
"multimap",
"once_cell",
@@ -9906,7 +9906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.14.0",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 2.0.106",
@@ -14511,7 +14511,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]


@@ -147,7 +147,8 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
greptime-proto = { path = "../greptime-proto" }
hex = "0.4"
http = "1"
humantime = "2.1"


@@ -53,6 +53,7 @@ metric-engine.workspace = true
mito2.workspace = true
num_cpus.workspace = true
object-store.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true


@@ -19,6 +19,7 @@ use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use mito2::remap_manifest::Error as RemapManifestError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
@@ -396,6 +397,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to remap manifests: {}", source))]
RemapManifest {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: RemapManifestError,
},
#[snafu(display("Not yet implemented: {what}"))]
NotYetImplemented { what: String },
}
@@ -469,6 +478,7 @@ impl ErrorExt for Error {
ObjectStore { source, .. } => source.status_code(),
BuildCacheStore { .. } => StatusCode::StorageUnavailable,
RemapManifest { .. } => StatusCode::Unexpected,
}
}


@@ -24,7 +24,9 @@ use api::region::RegionResponse;
use api::v1::meta::TopicStat;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
ApplyStagedManifestRequest, ListMetadataRequest, PauseRequest, PublishRegionRuleRequest,
RegionResponse as RegionResponseV1, RemapManifestRequest, ResumeRequest,
StageRegionRuleRequest, SyncRequest, region_request,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
@@ -84,6 +86,8 @@ use crate::error::{
use crate::event_listener::RegionServerEventListenerRef;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
const REMAP_STATS_EXTENSION_KEY: &str = "repartition.manifest.stats";
#[derive(Clone)]
pub struct RegionServer {
inner: Arc<RegionServerInner>,
@@ -370,6 +374,24 @@ impl RegionServer {
}
}
/// Temporarily pauses compaction- and snapshot-related activities for the region.
///
/// Currently a stub; the real implementation will coordinate with the region worker.
pub async fn pause_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
info!("pause_compaction_and_snapshot stub invoked for region {region_id}");
let _ = region_id;
Ok(())
}
/// Resumes compaction- and snapshot-related activities for the region.
///
/// Currently a stub; the real implementation will coordinate with the region worker.
pub async fn resume_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
info!("resume_compaction_and_snapshot stub invoked for region {region_id}");
let _ = region_id;
Ok(())
}
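/// Illustrative sketch only, not part of this commit: a round trip through the
/// two stubs above, both of which currently just log and return `Ok(())`.
#[allow(dead_code)]
async fn pause_resume_example(&self) -> Result<()> {
    // RegionId::new(table_id, region_number); the values here are placeholders.
    let region_id = RegionId::new(1024, 1);
    self.pause_compaction_and_snapshot(region_id).await?;
    self.resume_compaction_and_snapshot(region_id).await?;
    Ok(())
}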
/// Stop the region server.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
@@ -538,6 +560,124 @@ impl RegionServer {
Ok(response)
}
async fn handle_pause_region_request(&self, request: &PauseRequest) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
let tracing_context = TracingContext::from_current_span();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_pause_region_request",
region_id = region_id.to_string()
));
self.pause_compaction_and_snapshot(region_id)
.trace(span)
.await
.map(|_| RegionResponse::new(AffectedRows::default()))
}
async fn handle_resume_region_request(
&self,
request: &ResumeRequest,
) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
let tracing_context = TracingContext::from_current_span();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_resume_region_request",
region_id = region_id.to_string()
));
self.resume_compaction_and_snapshot(region_id)
.trace(span)
.await
.map(|_| RegionResponse::new(AffectedRows::default()))
}
async fn handle_stage_region_rule_request(
&self,
request: &StageRegionRuleRequest,
) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
info!(
"Stage region rule for region {region_id} with version {}",
request.rule_version
);
match self
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await?
{
SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
Ok(RegionResponse::new(AffectedRows::default()))
}
SetRegionRoleStateResponse::InvalidTransition(err) => {
Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
}
}
}
async fn handle_publish_region_rule_request(
&self,
request: &PublishRegionRuleRequest,
) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
info!(
"Publish region rule for region {region_id} with version {}",
request.rule_version
);
match self
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await?
{
SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
Ok(RegionResponse::new(AffectedRows::default()))
}
SetRegionRoleStateResponse::InvalidTransition(err) => {
Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
}
}
}
async fn handle_remap_manifest_request(
&self,
request: &RemapManifestRequest,
) -> Result<RegionResponse> {
info!(
"received remap manifest request for table {} group {}",
request.table_id, request.group_id
);
let stats_json = serde_json::to_vec(&serde_json::json!({
"files_per_region": HashMap::<u64, usize>::new(),
"total_file_refs": 0u64,
"empty_regions": Vec::<u64>::new(),
"group_id": &request.group_id,
}))
.context(SerializeJsonSnafu)?;
let mut extensions = HashMap::new();
extensions.insert(REMAP_STATS_EXTENSION_KEY.to_string(), stats_json);
Ok(RegionResponse {
affected_rows: 0,
extensions,
metadata: Vec::new(),
})
}
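/// Hedged sketch, not part of this commit: how a consumer of the response could
/// read the placeholder stats back out of the extensions map written by
/// `handle_remap_manifest_request` above.
#[allow(dead_code)]
fn decode_remap_stats(extensions: &HashMap<String, Vec<u8>>) -> Option<serde_json::Value> {
    extensions
        .get(REMAP_STATS_EXTENSION_KEY)
        .and_then(|raw| serde_json::from_slice(raw).ok())
}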
async fn handle_apply_staged_manifest_request(
&self,
request: &ApplyStagedManifestRequest,
) -> Result<RegionResponse> {
info!(
"received manifest apply request for table {} group {} publish={} regions {:?}",
request.table_id, request.group_id, request.publish, request.region_ids
);
Ok(RegionResponse {
affected_rows: 0,
extensions: HashMap::new(),
metadata: Vec::new(),
})
}
/// Syncs the region manifest and registers newly opened logical regions.
pub async fn sync_region(
&self,
@@ -569,6 +709,26 @@ impl RegionServerHandler for RegionServer {
region_request::Body::Sync(sync_request) => {
self.handle_sync_region_request(sync_request).await
}
region_request::Body::Pause(pause_request) => {
self.handle_pause_region_request(pause_request).await
}
region_request::Body::Resume(resume_request) => {
self.handle_resume_region_request(resume_request).await
}
region_request::Body::StageRegionRule(stage_request) => {
self.handle_stage_region_rule_request(stage_request).await
}
region_request::Body::PublishRegionRule(publish_request) => {
self.handle_publish_region_rule_request(publish_request)
.await
}
region_request::Body::RemapManifest(remap_request) => {
self.handle_remap_manifest_request(remap_request).await
}
region_request::Body::ApplyStagedManifest(apply_request) => {
self.handle_apply_staged_manifest_request(apply_request)
.await
}
region_request::Body::ListMetadata(list_metadata_request) => {
self.handle_list_metadata_request(list_metadata_request)
.await


@@ -17,12 +17,14 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::DatanodeId;
use common_procedure::ProcedureId;
use common_runtime::JoinError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use uuid::Uuid;
use crate::metasrv::SelectTarget;
use crate::pubsub::Message;
@@ -774,6 +776,129 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create repartition subtasks"))]
RepartitionCreateSubtasks {
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize partition expression"))]
RepartitionSerializePartitionExpr {
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Source partition expression '{}' does not match any existing region",
expr
))]
RepartitionSourceExprMismatch {
expr: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} is missing a source region id", group_id))]
RepartitionMissingSourceRegionId {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} cannot find route for source region {}",
group_id,
region_id
))]
RepartitionSourceRegionRouteMissing {
group_id: Uuid,
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} has no source regions after planning", group_id))]
RepartitionNoSourceRegions {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} target {} is missing an allocated region id",
group_id,
target_index
))]
RepartitionMissingTargetRegionId {
group_id: Uuid,
target_index: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Route for target region {} not found", region_id))]
RepartitionTargetRegionRouteMissing {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} is missing prepare context", group_id))]
RepartitionMissingPrepareContext {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition group {} has no registered subprocedure", group_id))]
RepartitionSubprocedureUnknown {
group_id: Uuid,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to fetch state for repartition group {} subprocedure {}",
group_id,
procedure_id
))]
RepartitionSubprocedureStateFetch {
group_id: Uuid,
procedure_id: ProcedureId,
#[snafu(source)]
source: common_procedure::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} subprocedure {} state missing",
group_id,
procedure_id
))]
RepartitionSubprocedureStateMissing {
group_id: Uuid,
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} subprocedure {} failed: {}",
group_id,
procedure_id,
reason
))]
RepartitionSubprocedureFailed {
group_id: Uuid,
procedure_id: ProcedureId,
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported operation {}", operation))]
Unsupported {
operation: String,
@@ -997,6 +1122,11 @@ impl Error {
matches!(self, Error::RetryLater { .. })
|| matches!(self, Error::RetryLaterWithSource { .. })
}
/// Returns `true` if the error requires cleaning poison records.
pub fn need_clean_poisons(&self) -> bool {
false
}
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -1012,6 +1142,8 @@ impl ErrorExt for Error {
| Error::TcpBind { .. }
| Error::SerializeToJson { .. }
| Error::DeserializeFromJson { .. }
| Error::RepartitionCreateSubtasks { .. }
| Error::RepartitionSerializePartitionExpr { .. }
| Error::NoLeader { .. }
| Error::LeaderLeaseExpired { .. }
| Error::LeaderLeaseChanged { .. }
@@ -1032,7 +1164,8 @@ impl ErrorExt for Error {
| Error::FlowStateHandler { .. }
| Error::BuildWalOptionsAllocator { .. }
| Error::BuildPartitionClient { .. }
| Error::BuildKafkaClient { .. } => StatusCode::Internal,
| Error::BuildKafkaClient { .. }
| Error::RepartitionSubprocedureStateFetch { .. } => StatusCode::Internal,
Error::DeleteRecords { .. }
| Error::GetOffset { .. }
@@ -1066,7 +1199,14 @@ impl ErrorExt for Error {
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. }
| Error::HandlerNotFound { .. }
| Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
| Error::LeaderPeerChanged { .. }
| Error::RepartitionSourceExprMismatch { .. }
| Error::RepartitionMissingSourceRegionId { .. }
| Error::RepartitionSourceRegionRouteMissing { .. }
| Error::RepartitionNoSourceRegions { .. }
| Error::RepartitionMissingTargetRegionId { .. }
| Error::RepartitionTargetRegionRouteMissing { .. }
| Error::RepartitionMissingPrepareContext { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }
@@ -1080,7 +1220,10 @@ impl ErrorExt for Error {
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. }
| Error::MigrationRunning { .. }
| Error::RegionMigrated { .. } => StatusCode::Unexpected,
| Error::RegionMigrated { .. }
| Error::RepartitionSubprocedureUnknown { .. }
| Error::RepartitionSubprocedureStateMissing { .. }
| Error::RepartitionSubprocedureFailed { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SaveClusterInfo { source, .. }
| Error::InvalidClusterInfoFormat { source, .. }


@@ -16,28 +16,41 @@ mod context;
mod group;
mod plan;
use common_catalog::format_full_table_name;
use std::collections::HashMap;
use common_meta::ddl::DdlContext;
use common_meta::ddl::utils::map_to_procedure_error;
use common_meta::error::{self, Result as MetaResult};
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock};
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status};
use common_procedure::error::{Error as ProcedureError, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, ProcedureId, ProcedureWithId, Status,
};
use common_telemetry::error;
use partition::expr::PartitionExpr;
use partition::subtask;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
use serde_json;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use strum::AsRefStr;
use table::table_reference::TableReference;
use uuid::Uuid;
use self::context::RepartitionContext;
use self::context::{GroupManifestSummary, RepartitionContext};
use self::group::RepartitionGroupProcedure;
use self::plan::{
PartitionRuleDiff, PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand,
};
use self::plan::{PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand};
use crate::error::{self, Result};
/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
/// Partition expressions representing the source regions.
pub from_exprs: Vec<PartitionExpr>,
/// Partition expressions representing the target regions.
pub into_exprs: Vec<PartitionExpr>,
}
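// Hedged example, not part of this commit: a task that splits one source region
// into two target regions. `expr(...)` is a hypothetical helper standing in for
// however the DDL layer parses partition expressions into `PartitionExpr`.
//
//     RepartitionTask {
//         catalog_name: "greptime".to_string(),
//         schema_name: "public".to_string(),
//         table_name: "metrics".to_string(),
//         table_id: 1024,
//         from_exprs: vec![expr("host < 'h100'")],
//         into_exprs: vec![expr("host < 'h050'"), expr("host >= 'h050' AND host < 'h100'")],
//     }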
/// Procedure that orchestrates the repartition flow.
pub struct RepartitionProcedure {
@@ -49,7 +62,8 @@ pub struct RepartitionProcedure {
impl RepartitionProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
pub fn new(task: RepartitionTask, context: DdlContext) -> MetaResult<Self> {
/// Constructs a new procedure instance from a task payload.
pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
let group_context = RepartitionContext::new(&context);
Ok(Self {
context,
@@ -58,17 +72,8 @@ impl RepartitionProcedure {
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: RepartitionData = serde_json::from_str(json).context(FromJsonSnafu)?;
let group_context = RepartitionContext::new(&context);
Ok(Self {
context,
group_context,
data,
})
}
async fn on_prepare(&mut self) -> MetaResult<Status> {
/// Builds the repartition plan if we have not done so yet.
async fn on_prepare(&mut self) -> Result<Status> {
if self.data.plan.is_none() {
self.build_plan().await?;
}
@@ -77,15 +82,14 @@ impl RepartitionProcedure {
Ok(Status::executing(true))
}
async fn on_allocate_resources(&mut self) -> MetaResult<Status> {
/// Allocates target regions and decides whether the procedure can proceed.
async fn on_allocate_resources(&mut self) -> Result<Status> {
if !self.data.resource_allocated {
let demand = self.data.resource_demand.unwrap_or_default();
let allocated = self.allocate_resources(demand).await?;
let allocated = self.allocate_resources().await?;
if !allocated {
if let Some(plan) = &self.data.plan {
self.data
.failed_groups
.extend(plan.entries.iter().map(|entry| entry.group_id));
let failed_groups = plan.entries.iter().map(|entry| entry.group_id);
self.data.failed_groups.extend(failed_groups);
}
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
@@ -97,7 +101,8 @@ impl RepartitionProcedure {
Ok(Status::executing(true))
}
async fn on_dispatch_subprocedures(&mut self) -> MetaResult<Status> {
/// Spawns group subprocedures for every pending plan entry.
async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
let plan = match self.data.plan.as_ref() {
Some(plan) => plan,
None => {
@@ -106,243 +111,289 @@ impl RepartitionProcedure {
}
};
let groups_to_schedule: Vec<PlanGroupId> = plan
let entries_to_schedule: Vec<PlanEntry> = plan
.entries
.iter()
.filter(|entry| {
!self.data.succeeded_groups.contains(&entry.group_id)
&& !self.data.failed_groups.contains(&entry.group_id)
})
.map(|entry| entry.group_id)
.cloned()
.collect();
if groups_to_schedule.is_empty() {
if entries_to_schedule.is_empty() {
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
}
let subprocedures = self.spawn_group_procedures(plan, &groups_to_schedule);
let groups_to_schedule: Vec<PlanGroupId> = entries_to_schedule
.iter()
.map(|entry| entry.group_id)
.collect();
let subprocedures = self.spawn_group_procedures(
plan.table_id,
plan.route_snapshot.clone(),
entries_to_schedule,
);
self.data.pending_groups = groups_to_schedule;
self.data.state = RepartitionState::CollectSubprocedures;
Ok(Status::suspended(subprocedures, true))
}
async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> MetaResult<Status> {
self.data
.succeeded_groups
.append(&mut self.data.pending_groups);
/// Collects the results of the dispatched subprocedures and moves on to finalization.
async fn on_collect_subprocedures(&mut self, ctx: &ProcedureContext) -> Result<Status> {
let pending = std::mem::take(&mut self.data.pending_groups);
let mut first_error: Option<error::Error> = None;
let mut succeeded = Vec::new();
for group_id in pending {
let procedure_id = match self.data.group_subprocedures.remove(&group_id) {
Some(id) => id,
None => {
let err = error::RepartitionSubprocedureUnknownSnafu { group_id }.build();
self.data.failed_groups.push(group_id);
if first_error.is_none() {
first_error = Some(err);
}
continue;
}
};
let state_opt = ctx.provider.procedure_state(procedure_id).await.context(
error::RepartitionSubprocedureStateFetchSnafu {
group_id,
procedure_id,
},
)?;
let state = match state_opt {
Some(state) => state,
None => {
let err = error::RepartitionSubprocedureStateMissingSnafu {
group_id,
procedure_id,
}
.build();
self.data.failed_groups.push(group_id);
if first_error.is_none() {
first_error = Some(err);
}
continue;
}
};
if state.is_done() {
succeeded.push(group_id);
continue;
}
let reason = state
.error()
.map(|err| err.to_string())
.unwrap_or_else(|| format!("subprocedure state {}", state.as_str_name()));
let err = error::RepartitionSubprocedureFailedSnafu {
group_id,
procedure_id,
reason,
}
.build();
self.data.failed_groups.push(group_id);
if first_error.is_none() {
first_error = Some(err);
}
}
self.data.succeeded_groups.extend(succeeded);
self.data.state = RepartitionState::Finalize;
if let Some(err) = first_error {
return Err(err);
}
Ok(Status::executing(true))
}
async fn on_finalize(&mut self) -> MetaResult<Status> {
/// Builds the summary that will be returned to the caller.
async fn on_finalize(&mut self) -> Result<Status> {
self.deallocate_resources().await?;
self.data.summary = Some(RepartitionSummary {
succeeded_groups: self.data.succeeded_groups.clone(),
failed_groups: self.data.failed_groups.clone(),
manifest_summaries: self.group_context.manifest_summaries(),
});
self.group_context.clear_group_records();
self.data.state = RepartitionState::Finished;
Ok(Status::done())
}
async fn build_plan(&mut self) -> MetaResult<()> {
/// Constructs the repartition plan from the task specification.
async fn build_plan(&mut self) -> Result<()> {
let table_id = self.data.task.table_id;
let table_ref = self.data.task.table_ref();
let table_name_str =
format_full_table_name(table_ref.catalog, table_ref.schema, table_ref.table);
self.context
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await?
.ok_or_else(|| {
error::TableNotFoundSnafu {
table_name: table_name_str.clone(),
}
.build()
})?;
let from_exprs = &self.data.task.from_exprs;
let into_exprs = &self.data.task.into_exprs;
let (physical_table_id, physical_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await?;
.await
.context(error::TableMetadataManagerSnafu)?;
let from_exprs_json = self.data.task.from_exprs_json.clone();
let into_exprs_json = self.data.task.into_exprs_json.clone();
let src_descriptors = Self::source_region_descriptors(from_exprs, &physical_route)?;
let subtasks = subtask::create_subtasks(from_exprs, into_exprs)
.context(error::RepartitionCreateSubtasksSnafu)?;
let entries = Self::build_plan_entries(subtasks, &src_descriptors, into_exprs);
let from_exprs = Self::deserialize_partition_exprs(&from_exprs_json)?;
let into_exprs = Self::deserialize_partition_exprs(&into_exprs_json)?;
let demand = ResourceDemand::from_plan_entries(&entries);
let plan = RepartitionPlan::new(physical_table_id, entries, demand, physical_route.clone());
self.data.plan = Some(plan);
Ok(())
}
fn source_region_descriptors(
from_exprs: &[PartitionExpr],
physical_route: &PhysicalTableRouteValue,
) -> Result<Vec<RegionDescriptor>> {
let existing_regions = physical_route
.region_routes
.iter()
.map(|route| (route.region.id, route.region.partition_expr()))
.collect::<Vec<_>>();
let mut used_regions = vec![false; existing_regions.len()];
let mut source_descriptor_by_index = Vec::with_capacity(from_exprs_json.len());
for expr_json in &from_exprs_json {
let mut found = None;
for (idx, (region_id, expr)) in existing_regions.iter().enumerate() {
if !used_regions[idx] && expr == expr_json {
found = Some((*region_id, expr.clone()));
used_regions[idx] = true;
break;
}
}
let (region_id, partition_expr_json) = found.ok_or_else(|| {
error::UnsupportedSnafu {
operation: format!(
"repartition source expression '{}' does not match any existing region",
expr_json
),
}
.build()
})?;
source_descriptor_by_index.push(RegionDescriptor {
region_id: Some(region_id),
partition_expr_json,
});
}
let subtasks = subtask::create_subtasks(&from_exprs, &into_exprs).map_err(|err| {
error::UnsupportedSnafu {
operation: format!("create_subtasks failed: {err}"),
}
.build()
})?;
let mut plan = RepartitionPlan::empty(physical_table_id);
let mut diff = PartitionRuleDiff::default();
let mut demand = ResourceDemand::default();
for subtask in subtasks {
let group_id = Uuid::new_v4();
let sources = subtask
.from_expr_indices
.iter()
.map(|&idx| source_descriptor_by_index[idx].clone())
.collect::<Vec<_>>();
let targets = subtask
.to_expr_indices
.iter()
.enumerate()
.map(|(position, &idx)| {
let reused_region = if subtask.from_expr_indices.len() == 1 {
if position == 0 {
sources.get(0).and_then(|descriptor| descriptor.region_id)
} else {
None
}
} else if subtask.to_expr_indices.len() == 1 {
sources.first().and_then(|descriptor| descriptor.region_id)
} else {
sources
.get(position)
.and_then(|descriptor| descriptor.region_id)
};
RegionDescriptor {
region_id: reused_region,
partition_expr_json: into_exprs_json[idx].clone(),
}
})
.collect::<Vec<_>>();
let entry = PlanEntry::new(group_id, subtask, sources, targets);
demand.add_entry(&entry);
diff.entries.push(group_id);
plan.entries.push(entry);
}
plan.resource_demand = demand;
plan.route_snapshot = physical_route.clone();
plan.plan_hash = format!(
"{}:{}->{}",
physical_table_id,
Self::expr_signature(&from_exprs_json),
Self::expr_signature(&into_exprs_json)
);
self.data.plan = Some(plan);
self.data.rule_diff = Some(diff);
self.data.resource_demand = Some(demand);
Ok(())
}
async fn allocate_resources(&self, _demand: ResourceDemand) -> MetaResult<bool> {
Ok(true)
}
fn spawn_group_procedures(
&self,
plan: &RepartitionPlan,
group_ids: &[PlanGroupId],
) -> Vec<ProcedureWithId> {
group_ids
let descriptors = from_exprs
.iter()
.filter_map(|group_id| {
plan.entries
.map(|expr| {
let expr_json = expr
.as_json_str()
.context(error::RepartitionSerializePartitionExprSnafu)?;
let matched_region_id = existing_regions
.iter()
.find(|entry| entry.group_id == *group_id)
.cloned()
.find_map(|(region_id, existing_expr)| {
(existing_expr == &expr_json).then_some(*region_id)
})
.with_context(|| error::RepartitionSourceExprMismatchSnafu {
expr: expr_json,
})?;
Ok(RegionDescriptor {
region_id: Some(matched_region_id),
partition_expr: expr.clone(),
})
})
.map(|entry| {
let route_snapshot = plan.route_snapshot.region_routes.clone();
ProcedureWithId::with_random_id(Box::new(RepartitionGroupProcedure::new(
entry,
plan.table_id,
route_snapshot,
self.group_context.clone(),
)))
})
.collect()
.collect::<Result<Vec<_>>>()?;
Ok(descriptors)
}
fn build_plan_entries(
subtasks: Vec<RepartitionSubtask>,
source_index: &[RegionDescriptor],
target_exprs: &[PartitionExpr],
) -> Vec<PlanEntry> {
let plan_entries = subtasks
.into_iter()
.map(|subtask| {
let group_id = Uuid::new_v4();
let sources = subtask
.from_expr_indices
.iter()
.map(|&idx| source_index[idx].clone())
.collect::<Vec<_>>();
let targets = subtask
.to_expr_indices
.iter()
.map(|&idx| RegionDescriptor {
region_id: None, // will be assigned later
partition_expr: target_exprs[idx].clone(),
})
.collect::<Vec<_>>();
PlanEntry::new(group_id, subtask, sources, targets)
})
.collect::<Vec<_>>();
plan_entries
}
/// Allocates resources required by the plan. Returning `false`
/// indicates that the procedure should abort.
async fn allocate_resources(&mut self) -> Result<bool> {
todo!("allocate resources");
}
async fn deallocate_resources(&mut self) -> Result<()> {
if !self.data.resource_allocated {
return Ok(());
}
self.data.resource_allocated = false;
todo!("deallocate resources");
}
/// Builds the child procedure list for the provided plan groups.
fn spawn_group_procedures(
&mut self,
table_id: TableId,
route_snapshot: PhysicalTableRouteValue,
entries: Vec<PlanEntry>,
) -> Vec<ProcedureWithId> {
let mut id_map = HashMap::new();
let procedures = entries
.into_iter()
.map(|entry| {
let group_id = entry.group_id;
let group_procedure = RepartitionGroupProcedure::new(
entry,
table_id,
route_snapshot.clone(),
self.data.task.catalog_name.clone(),
self.data.task.schema_name.clone(),
self.group_context.clone(),
);
let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
id_map.insert(group_id, procedure.id);
procedure
})
.collect::<Vec<_>>();
self.data.group_subprocedures = id_map;
procedures
}
/// Composes the set of locks required to safely mutate table metadata.
fn table_lock_key(&self) -> Vec<common_procedure::StringKey> {
let mut lock_key = Vec::with_capacity(3);
let table_ref = self.data.task.table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
let catalog = self.data.task.catalog_name.as_str();
let schema = self.data.task.schema_name.as_str();
lock_key.push(CatalogLock::Read(catalog).into());
lock_key.push(SchemaLock::read(catalog, schema).into());
lock_key.push(TableLock::Write(self.data.task.table_id).into());
lock_key
}
fn deserialize_partition_exprs(exprs_json: &[String]) -> MetaResult<Vec<PartitionExpr>> {
exprs_json
.iter()
.map(|json| {
let expr = PartitionExpr::from_json_str(json).map_err(|err| {
error::UnsupportedSnafu {
operation: format!(
"deserialize partition expression '{json}' failed: {err}"
),
}
.build()
})?;
async fn trigger_group_rollbacks(&mut self) {
if self.data.rollback_triggered {
return;
}
expr.ok_or_else(|| {
error::UnsupportedSnafu {
operation: format!("empty partition expression '{json}'"),
}
.build()
})
})
.collect()
}
fn expr_signature(exprs: &[String]) -> String {
exprs.join("|")
match self.group_context.rollback_registered_groups().await {
Ok(_) => {
self.data.rollback_triggered = true;
}
Err(err) => {
error!(err; "repartition: rollback of successful groups failed");
self.data.rollback_triggered = true;
}
}
}
}
@@ -363,7 +414,16 @@ impl Procedure for RepartitionProcedure {
RepartitionState::Finished => Ok(Status::done()),
};
status.map_err(map_to_procedure_error)
match status {
Ok(status) => Ok(status),
Err(err) => {
self.trigger_group_rollbacks().await;
if let Err(dealloc_err) = self.deallocate_resources().await {
error!(dealloc_err; "repartition: deallocating resources after failure failed");
}
Err(map_repartition_error(err))
}
}
}
fn dump(&self) -> ProcedureResult<String> {
@@ -383,10 +443,6 @@ struct RepartitionData {
#[serde(default)]
plan: Option<RepartitionPlan>,
#[serde(default)]
rule_diff: Option<PartitionRuleDiff>,
#[serde(default)]
resource_demand: Option<ResourceDemand>,
#[serde(default)]
resource_allocated: bool,
#[serde(default)]
pending_groups: Vec<PlanGroupId>,
@@ -396,25 +452,39 @@ struct RepartitionData {
failed_groups: Vec<PlanGroupId>,
#[serde(default)]
summary: Option<RepartitionSummary>,
#[serde(default)]
rollback_triggered: bool,
#[serde(default)]
group_subprocedures: HashMap<PlanGroupId, ProcedureId>,
}
impl RepartitionData {
/// Initializes the procedure data for a fresh run.
fn new(task: RepartitionTask) -> Self {
Self {
state: RepartitionState::Prepare,
task,
plan: None,
rule_diff: None,
resource_demand: None,
resource_allocated: false,
pending_groups: Vec::new(),
succeeded_groups: Vec::new(),
failed_groups: Vec::new(),
summary: None,
rollback_triggered: false,
group_subprocedures: HashMap::new(),
}
}
}
pub(super) fn map_repartition_error(err: error::Error) -> ProcedureError {
match (err.is_retryable(), err.need_clean_poisons()) {
(true, true) => ProcedureError::retry_later_and_clean_poisons(err),
(true, false) => ProcedureError::retry_later(err),
(false, true) => ProcedureError::external_and_clean_poisons(err),
(false, false) => ProcedureError::external(err),
}
}
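// Behavior note (sketch): with `Error::need_clean_poisons()` currently hard-wired
// to `false` (see error.rs), only retryability decides the outcome today:
//   retryable     -> ProcedureError::retry_later(err)
//   non-retryable -> ProcedureError::external(err)
// The `*_and_clean_poisons` arms become reachable once some error variants
// start reporting poisons that need cleaning.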
/// High level states of the repartition procedure.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
enum RepartitionState {
@@ -426,31 +496,11 @@ enum RepartitionState {
Finished,
}
/// Information returned to the caller after the procedure finishes.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RepartitionSummary {
succeeded_groups: Vec<PlanGroupId>,
failed_groups: Vec<PlanGroupId>,
}
/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
/// Partition expressions of source regions (JSON encoded `PartitionExpr`).
pub from_exprs_json: Vec<String>,
/// Partition expressions of target regions (JSON encoded `PartitionExpr`).
pub into_exprs_json: Vec<String>,
}
impl RepartitionTask {
fn table_ref(&self) -> TableReference<'_> {
TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: &self.table_name,
}
}
#[serde(default)]
manifest_summaries: Vec<GroupManifestSummary>,
}


@@ -12,21 +12,340 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use api::region::RegionResponse;
use api::v1::region::{
ApplyStagedManifestRequest, PauseRequest, PublishRegionRuleRequest, RegionRequest,
RegionRequestHeader, RemapManifestRequest, ResumeRequest, StageRegionRuleRequest,
region_request,
};
use common_error::ext::BoxedError;
use common_meta::ddl::DdlContext;
use common_meta::key::TableMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error::{self, Result};
pub const REMAP_MANIFEST_STATS_EXTENSION: &str = "repartition.manifest.stats";
use super::group::{GroupRollbackRecord, RepartitionGroupProcedure};
use crate::procedure::repartition::plan::PlanGroupId;
/// Track the overall manifest stage for a repartition group.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum ManifestStatus {
#[default]
NotStarted,
Staged,
Published,
Discarded,
Skipped,
Failed,
}
/// Per-group status record that is collected by the top-level procedure.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GroupManifestSummary {
pub group_id: PlanGroupId,
pub status: ManifestStatus,
pub staged_region_count: u64,
pub stats: Option<Value>,
pub error: Option<String>,
}
/// Shared context that allows group procedures to interact with metadata and
/// datanodes. It also aggregates per-group manifest summaries.
#[derive(Clone)]
pub struct RepartitionContext {
pub table_metadata_manager: TableMetadataManagerRef,
pub _node_manager: NodeManagerRef,
pub node_manager: NodeManagerRef,
manifest_records: Arc<Mutex<HashMap<PlanGroupId, GroupManifestSummary>>>,
rollback_records: Arc<Mutex<HashMap<PlanGroupId, GroupRollbackRecord>>>,
}
impl RepartitionContext {
pub fn new(context: &DdlContext) -> Self {
Self {
table_metadata_manager: context.table_metadata_manager.clone(),
_node_manager: context.node_manager.clone(),
node_manager: context.node_manager.clone(),
manifest_records: Arc::new(Mutex::new(HashMap::new())),
rollback_records: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Send a pause request to the region leader so that local IO is quiesced.
pub async fn pause_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
info!(
"requesting pause to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::Pause(PauseRequest {
region_id: region_id.as_u64(),
})),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to pause region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Resume a previously paused region.
pub async fn resume_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
info!(
"requesting resume to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::Resume(ResumeRequest {
region_id: region_id.as_u64(),
rule_version: String::new(),
})),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to resume region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Stage the provided rule version on the datanode.
pub async fn stage_region_rule_on_datanode(
&self,
peer: &Peer,
region_id: RegionId,
rule_version: &str,
) -> Result<()> {
info!(
"requesting region rule staging to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::StageRegionRule(
StageRegionRuleRequest {
region_id: region_id.as_u64(),
rule_version: rule_version.to_string(),
},
)),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to stage region rule for region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Publish the staged rule version to make it active.
pub async fn publish_region_rule_on_datanode(
&self,
peer: &Peer,
region_id: RegionId,
rule_version: &str,
) -> Result<()> {
info!(
"requesting region rule publish to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::PublishRegionRule(
PublishRegionRuleRequest {
region_id: region_id.as_u64(),
rule_version: rule_version.to_string(),
},
)),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to publish region rule for region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Drop the staged rule version during rollback.
pub async fn clear_region_rule_stage_on_datanode(
&self,
peer: &Peer,
region_id: RegionId,
) -> Result<()> {
info!(
"requesting region rule stage clear to datanode {} for region {}",
peer.id, region_id
);
let datanode = self.node_manager.datanode(peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::StageRegionRule(
StageRegionRuleRequest {
region_id: region_id.as_u64(),
rule_version: String::new(),
},
)),
};
datanode
.handle(request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to clear staged region rule for region {} on datanode {}",
region_id, peer.id
),
})?;
Ok(())
}
/// Instruct the datanode to remap manifests for this group.
pub async fn remap_manifests_on_datanode(
&self,
peer: &Peer,
manifest_request: RemapManifestRequest,
) -> Result<RegionResponse> {
let table_id = manifest_request.table_id;
let group_id = manifest_request.group_id.clone();
info!(
"requesting manifest remap to datanode {} for table {} in group {}",
peer.id, table_id, group_id
);
let datanode = self.node_manager.datanode(peer).await;
let region_request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::RemapManifest(manifest_request)),
};
let response = datanode
.handle(region_request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to remap manifests for group {} on datanode {}",
group_id, peer.id
),
})?;
Ok(response)
}
/// Publish or discard staged manifests.
pub async fn apply_staged_manifests_on_datanode(
&self,
peer: &Peer,
manifest_request: ApplyStagedManifestRequest,
) -> Result<RegionResponse> {
let publish = manifest_request.publish;
let table_id = manifest_request.table_id;
let group_id = manifest_request.group_id.clone();
info!(
"requesting manifest {} on datanode {} for table {} in group {}",
if publish { "publish" } else { "discard" },
peer.id,
table_id,
group_id
);
let datanode = self.node_manager.datanode(peer).await;
let region_request = RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::ApplyStagedManifest(manifest_request)),
};
let response = datanode
.handle(region_request)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"failed to {} staged manifests for group {} on datanode {}",
if publish { "publish" } else { "discard" },
group_id,
peer.id
),
})?;
Ok(response)
}
/// Store the latest manifest summary for a group.
pub fn record_manifest_summary(&self, summary: GroupManifestSummary) {
let mut records = self.manifest_records.lock().unwrap();
records.insert(summary.group_id, summary);
}
pub fn register_group_success(&self, record: GroupRollbackRecord) {
let mut records = self.rollback_records.lock().unwrap();
let group_id = record.group_id;
records.insert(group_id, record);
}
pub async fn rollback_registered_groups(&self) -> Result<()> {
let records: Vec<GroupRollbackRecord> = {
let mut map = self.rollback_records.lock().unwrap();
map.drain().map(|(_, record)| record).collect()
};
let mut first_err: Option<error::Error> = None;
for record in records {
let group_id = record.group_id;
if let Err(err) =
RepartitionGroupProcedure::execute_rollback(self.clone(), record).await
{
error!(err; "repartition: rollback of group {:?} failed", group_id);
if first_err.is_none() {
first_err = Some(err);
}
}
}
if let Some(err) = first_err {
return Err(err);
}
Ok(())
}
pub fn clear_group_records(&self) {
self.rollback_records.lock().unwrap().clear();
}
/// Collect all manifest summaries recorded so far.
pub fn manifest_summaries(&self) -> Vec<GroupManifestSummary> {
let records = self.manifest_records.lock().unwrap();
records.values().cloned().collect()
}
}
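// Hedged sketch, not part of this commit: the per-region order in which a group
// subprocedure (its implementation's diff is suppressed below as too large) is
// expected to drive the helpers above. The request payloads and `rule_version`
// are placeholders.
#[allow(dead_code)]
async fn drive_single_region_example(
    ctx: &RepartitionContext,
    peer: &Peer,
    region_id: RegionId,
    rule_version: &str,
    remap: RemapManifestRequest,
    apply: ApplyStagedManifestRequest,
) -> Result<()> {
    ctx.pause_region_on_datanode(peer, region_id).await?;
    ctx.stage_region_rule_on_datanode(peer, region_id, rule_version).await?;
    // The response carries remap statistics under the extension key declared above.
    let _stats = ctx.remap_manifests_on_datanode(peer, remap).await?;
    // Setting `publish: true` in `apply` makes the staged manifests live; the rule
    // publish and the resume then re-enable normal traffic on the region.
    ctx.apply_staged_manifests_on_datanode(peer, apply).await?;
    ctx.publish_region_rule_on_datanode(peer, region_id, rule_version).await?;
    ctx.resume_region_on_datanode(peer, region_id).await?;
    Ok(())
}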

File diff suppressed because it is too large.


@@ -13,6 +13,7 @@
// limitations under the License.
use common_meta::key::table_route::PhysicalTableRouteValue;
use partition::expr::PartitionExpr;
use partition::subtask::RepartitionSubtask;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
@@ -28,20 +29,23 @@ pub type PlanGroupId = Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlan {
pub table_id: TableId,
pub plan_hash: String,
pub entries: Vec<PlanEntry>,
pub resource_demand: ResourceDemand,
pub route_snapshot: PhysicalTableRouteValue,
}
impl RepartitionPlan {
pub fn empty(table_id: TableId) -> Self {
pub fn new(
table_id: TableId,
entries: Vec<PlanEntry>,
resource_demand: ResourceDemand,
route_snapshot: PhysicalTableRouteValue,
) -> Self {
Self {
table_id,
plan_hash: String::new(),
entries: Vec::new(),
resource_demand: ResourceDemand::default(),
route_snapshot: PhysicalTableRouteValue::default(),
entries,
resource_demand,
route_snapshot,
}
}
}
@@ -55,6 +59,8 @@ pub struct PlanEntry {
}
impl PlanEntry {
/// Construct a plan entry consisting of the connected component returned by
/// the planner.
pub fn new(
group_id: PlanGroupId,
subtask: RepartitionSubtask,
@@ -68,33 +74,22 @@ impl PlanEntry {
targets,
}
}
pub fn implied_new_regions(&self) -> u32 {
self.targets
.iter()
.filter(|target| target.region_id.is_none())
.count() as u32
}
}
/// Metadata describing a region involved in the plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
pub region_id: Option<RegionId>,
pub partition_expr_json: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct PartitionRuleDiff {
pub entries: Vec<PlanGroupId>,
pub partition_expr: PartitionExpr,
}
/// Auxiliary information about resources required to execute the plan.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {
pub new_regions: u32,
}
pub struct ResourceDemand {}
impl ResourceDemand {
pub fn add_entry(&mut self, entry: &PlanEntry) {
self.new_regions = self.new_regions.saturating_add(entry.implied_new_regions());
pub fn from_plan_entries(_entries: &[PlanEntry]) -> Self {
// placeholder
Self {}
}
}
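// Note: `ResourceDemand` is now an empty placeholder. The removed
// `implied_new_regions` accounting (counting targets without a pre-assigned
// region id) is the kind of bookkeeping `from_plan_entries` will presumably
// reintroduce once resource allocation stops being a `todo!`.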


@@ -179,6 +179,30 @@ impl RegionRequest {
reason: "ListMetadata request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::Pause(_) => UnexpectedSnafu {
reason: "Pause request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::Resume(_) => UnexpectedSnafu {
reason: "Resume request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::StageRegionRule(_) => UnexpectedSnafu {
reason: "StageRegionRule request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::PublishRegionRule(_) => UnexpectedSnafu {
reason: "PublishRegionRule request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::RemapManifest(_) => UnexpectedSnafu {
reason: "RemapManifest request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::ApplyStagedManifest(_) => UnexpectedSnafu {
reason: "ApplyStagedManifest request should be handled separately by RegionServer",
}
.fail(),
}
}