Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00

Compare commits: 7 commits, feat/inges...zhongzc/re

| Author | SHA1 | Date |
|---|---|---|
| | 72a6b9ff66 | |
| | 1286d4ca74 | |
| | 3c1d7fcb89 | |
| | 5be4987fd7 | |
| | db11022cff | |
| | 15935ee89a | |
| | d0877997a2 | |

11  Cargo.lock  generated
@@ -3934,6 +3934,7 @@ dependencies = [
 "mito2",
 "num_cpus",
 "object-store",
 "partition",
 "prometheus",
 "prost 0.13.5",
 "query",
@@ -5324,7 +5325,6 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=14b9dc40bdc8288742b0cefc7bb024303b7429ef#14b9dc40bdc8288742b0cefc7bb024303b7429ef"
dependencies = [
 "prost 0.13.5",
 "prost-types 0.13.5",
@@ -6922,7 +6922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [
 "cfg-if",
 "windows-targets 0.52.6",
 "windows-targets 0.48.5",
]

[[package]]
@@ -7428,6 +7428,7 @@ dependencies = [
 "local-ip-address",
 "once_cell",
 "parking_lot 0.12.4",
 "partition",
 "prometheus",
 "prost 0.13.5",
 "rand 0.9.1",
@@ -9859,7 +9860,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
 "heck 0.5.0",
 "itertools 0.14.0",
 "itertools 0.10.5",
 "log",
 "multimap",
 "once_cell",
@@ -9905,7 +9906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
 "anyhow",
 "itertools 0.14.0",
 "itertools 0.10.5",
 "proc-macro2",
 "quote",
 "syn 2.0.106",
@@ -14510,7 +14511,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
 "windows-sys 0.59.0",
 "windows-sys 0.48.0",
]

[[package]]
@@ -147,7 +147,8 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
greptime-proto = { path = "../greptime-proto" }
hex = "0.4"
http = "1"
humantime = "2.1"

@@ -87,6 +87,7 @@ tokio-postgres-rustls = { version = "0.12", optional = true }
tonic.workspace = true
tracing.workspace = true
typetag.workspace = true
uuid.workspace = true

[dev-dependencies]
chrono.workspace = true

@@ -53,6 +53,7 @@ metric-engine.workspace = true
mito2.workspace = true
num_cpus.workspace = true
object-store.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
@@ -19,6 +19,7 @@ use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use mito2::remap_manifest::Error as RemapManifestError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
@@ -396,6 +397,14 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Failed to remap manifests: {}", source))]
    RemapManifest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        source: RemapManifestError,
    },

    #[snafu(display("Not yet implemented: {what}"))]
    NotYetImplemented { what: String },
}
@@ -469,6 +478,7 @@ impl ErrorExt for Error {

            ObjectStore { source, .. } => source.status_code(),
            BuildCacheStore { .. } => StatusCode::StorageUnavailable,
            RemapManifest { .. } => StatusCode::Unexpected,
        }
    }
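The new `RemapManifest` variant wires mito2's remap-manifest errors into the datanode error type through snafu. A minimal sketch of a hypothetical call site, not part of this diff; only the variant and its selector come from the change above:

```rust
use snafu::ResultExt;

use crate::error::{RemapManifestSnafu, Result};

/// Hypothetical call site: lift a mito2 remap-manifest failure into the
/// datanode `Error`. `remap_result` stands in for whatever the real mito2
/// call returns.
fn wrap_remap_result(
    remap_result: std::result::Result<(), mito2::remap_manifest::Error>,
) -> Result<()> {
    // `#[snafu(source)]` plus the implicit `location` make `.context(...)` sufficient.
    remap_result.context(RemapManifestSnafu)
}
```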
@@ -24,7 +24,9 @@ use api::region::RegionResponse;
use api::v1::meta::TopicStat;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
    ApplyStagedManifestRequest, ListMetadataRequest, PauseRequest, PublishRegionRuleRequest,
    RegionResponse as RegionResponseV1, RemapManifestRequest, ResumeRequest,
    StageRegionRuleRequest, SyncRequest, region_request,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
@@ -84,6 +86,8 @@ use crate::error::{
use crate::event_listener::RegionServerEventListenerRef;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};

const REMAP_STATS_EXTENSION_KEY: &str = "repartition.manifest.stats";

#[derive(Clone)]
pub struct RegionServer {
    inner: Arc<RegionServerInner>,
@@ -370,6 +374,24 @@ impl RegionServer {
        }
    }

    /// Temporarily pauses compaction and snapshot related activities for the region.
    ///
    /// Currently a stub; real implementation will coordinate with region worker.
    pub async fn pause_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
        info!("pause_compaction_and_snapshot stub invoked for region {region_id}");
        let _ = region_id;
        Ok(())
    }

    /// Resumes compaction and snapshot related activities for the region.
    ///
    /// Currently a stub; real implementation will coordinate with region worker.
    pub async fn resume_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
        info!("resume_compaction_and_snapshot stub invoked for region {region_id}");
        let _ = region_id;
        Ok(())
    }

    /// Stop the region server.
    pub async fn stop(&self) -> Result<()> {
        self.inner.stop().await
@@ -538,6 +560,124 @@ impl RegionServer {
        Ok(response)
    }

    async fn handle_pause_region_request(&self, request: &PauseRequest) -> Result<RegionResponse> {
        let region_id = RegionId::from_u64(request.region_id);
        let tracing_context = TracingContext::from_current_span();
        let span = tracing_context.attach(info_span!(
            "RegionServer::handle_pause_region_request",
            region_id = region_id.to_string()
        ));

        self.pause_compaction_and_snapshot(region_id)
            .trace(span)
            .await
            .map(|_| RegionResponse::new(AffectedRows::default()))
    }

    async fn handle_resume_region_request(
        &self,
        request: &ResumeRequest,
    ) -> Result<RegionResponse> {
        let region_id = RegionId::from_u64(request.region_id);
        let tracing_context = TracingContext::from_current_span();
        let span = tracing_context.attach(info_span!(
            "RegionServer::handle_resume_region_request",
            region_id = region_id.to_string()
        ));

        self.resume_compaction_and_snapshot(region_id)
            .trace(span)
            .await
            .map(|_| RegionResponse::new(AffectedRows::default()))
    }

    async fn handle_stage_region_rule_request(
        &self,
        request: &StageRegionRuleRequest,
    ) -> Result<RegionResponse> {
        let region_id = RegionId::from_u64(request.region_id);
        info!(
            "Stage region rule for region {region_id} with version {}",
            request.rule_version
        );
        match self
            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
            .await?
        {
            SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
                Ok(RegionResponse::new(AffectedRows::default()))
            }
            SetRegionRoleStateResponse::InvalidTransition(err) => {
                Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
            }
        }
    }

    async fn handle_publish_region_rule_request(
        &self,
        request: &PublishRegionRuleRequest,
    ) -> Result<RegionResponse> {
        let region_id = RegionId::from_u64(request.region_id);
        info!(
            "Publish region rule for region {region_id} with version {}",
            request.rule_version
        );
        match self
            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
            .await?
        {
            SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
                Ok(RegionResponse::new(AffectedRows::default()))
            }
            SetRegionRoleStateResponse::InvalidTransition(err) => {
                Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
            }
        }
    }

    async fn handle_remap_manifest_request(
        &self,
        request: &RemapManifestRequest,
    ) -> Result<RegionResponse> {
        info!(
            "received remap manifest request for table {} group {}",
            request.table_id, request.group_id
        );

        let stats_json = serde_json::to_vec(&serde_json::json!({
            "files_per_region": HashMap::<u64, usize>::new(),
            "total_file_refs": 0u64,
            "empty_regions": Vec::<u64>::new(),
            "group_id": &request.group_id,
        }))
        .context(SerializeJsonSnafu)?;

        let mut extensions = HashMap::new();
        extensions.insert(REMAP_STATS_EXTENSION_KEY.to_string(), stats_json);

        Ok(RegionResponse {
            affected_rows: 0,
            extensions,
            metadata: Vec::new(),
        })
    }

    async fn handle_apply_staged_manifest_request(
        &self,
        request: &ApplyStagedManifestRequest,
    ) -> Result<RegionResponse> {
        info!(
            "received manifest apply request for table {} group {} publish={} regions {:?}",
            request.table_id, request.group_id, request.publish, request.region_ids
        );

        Ok(RegionResponse {
            affected_rows: 0,
            extensions: HashMap::new(),
            metadata: Vec::new(),
        })
    }

    /// Syncs the region manifest and registers newly opened logical regions.
    pub async fn sync_region(
        &self,
@@ -569,6 +709,26 @@ impl RegionServerHandler for RegionServer {
            region_request::Body::Sync(sync_request) => {
                self.handle_sync_region_request(sync_request).await
            }
            region_request::Body::Pause(pause_request) => {
                self.handle_pause_region_request(pause_request).await
            }
            region_request::Body::Resume(resume_request) => {
                self.handle_resume_region_request(resume_request).await
            }
            region_request::Body::StageRegionRule(stage_request) => {
                self.handle_stage_region_rule_request(stage_request).await
            }
            region_request::Body::PublishRegionRule(publish_request) => {
                self.handle_publish_region_rule_request(publish_request)
                    .await
            }
            region_request::Body::RemapManifest(remap_request) => {
                self.handle_remap_manifest_request(remap_request).await
            }
            region_request::Body::ApplyStagedManifest(apply_request) => {
                self.handle_apply_staged_manifest_request(apply_request)
                    .await
            }
            region_request::Body::ListMetadata(list_metadata_request) => {
                self.handle_list_metadata_request(list_metadata_request)
                    .await
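The remap handler above returns its statistics to the caller through the response `extensions` map under the `repartition.manifest.stats` key, encoded as JSON. A minimal sketch of how a caller might decode that payload; the helper function is hypothetical and only the key and JSON shape come from the diff:

```rust
use api::region::RegionResponse;
use serde_json::Value;

/// Hypothetical helper: pull the remap statistics JSON out of a region response.
/// The key mirrors REMAP_STATS_EXTENSION_KEY / REMAP_MANIFEST_STATS_EXTENSION.
fn remap_stats(response: &RegionResponse) -> Option<Value> {
    response
        .extensions
        .get("repartition.manifest.stats")
        .and_then(|bytes| serde_json::from_slice(bytes).ok())
}
```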
@@ -45,6 +45,7 @@ common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
common-workload.workspace = true
partition.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool = { workspace = true, optional = true }
@@ -17,12 +17,14 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::DatanodeId;
use common_procedure::ProcedureId;
use common_runtime::JoinError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use uuid::Uuid;

use crate::metasrv::SelectTarget;
use crate::pubsub::Message;
@@ -774,6 +776,129 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Failed to create repartition subtasks"))]
    RepartitionCreateSubtasks {
        source: partition::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to serialize partition expression"))]
    RepartitionSerializePartitionExpr {
        source: partition::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Source partition expression '{}' does not match any existing region",
        expr
    ))]
    RepartitionSourceExprMismatch {
        expr: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Repartition group {} is missing a source region id", group_id))]
    RepartitionMissingSourceRegionId {
        group_id: Uuid,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Repartition group {} cannot find route for source region {}",
        group_id,
        region_id
    ))]
    RepartitionSourceRegionRouteMissing {
        group_id: Uuid,
        region_id: RegionId,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Repartition group {} has no source regions after planning", group_id))]
    RepartitionNoSourceRegions {
        group_id: Uuid,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Repartition group {} target {} is missing an allocated region id",
        group_id,
        target_index
    ))]
    RepartitionMissingTargetRegionId {
        group_id: Uuid,
        target_index: usize,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Route for target region {} not found", region_id))]
    RepartitionTargetRegionRouteMissing {
        region_id: RegionId,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Repartition group {} is missing prepare context", group_id))]
    RepartitionMissingPrepareContext {
        group_id: Uuid,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Repartition group {} has no registered subprocedure", group_id))]
    RepartitionSubprocedureUnknown {
        group_id: Uuid,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Failed to fetch state for repartition group {} subprocedure {}",
        group_id,
        procedure_id
    ))]
    RepartitionSubprocedureStateFetch {
        group_id: Uuid,
        procedure_id: ProcedureId,
        #[snafu(source)]
        source: common_procedure::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Repartition group {} subprocedure {} state missing",
        group_id,
        procedure_id
    ))]
    RepartitionSubprocedureStateMissing {
        group_id: Uuid,
        procedure_id: ProcedureId,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Repartition group {} subprocedure {} failed: {}",
        group_id,
        procedure_id,
        reason
    ))]
    RepartitionSubprocedureFailed {
        group_id: Uuid,
        procedure_id: ProcedureId,
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unsupported operation {}", operation))]
    Unsupported {
        operation: String,
@@ -997,6 +1122,11 @@ impl Error {
        matches!(self, Error::RetryLater { .. })
            || matches!(self, Error::RetryLaterWithSource { .. })
    }

    /// Returns `true` if the error requires cleaning poison records.
    pub fn need_clean_poisons(&self) -> bool {
        false
    }
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -1012,6 +1142,8 @@ impl ErrorExt for Error {
            | Error::TcpBind { .. }
            | Error::SerializeToJson { .. }
            | Error::DeserializeFromJson { .. }
            | Error::RepartitionCreateSubtasks { .. }
            | Error::RepartitionSerializePartitionExpr { .. }
            | Error::NoLeader { .. }
            | Error::LeaderLeaseExpired { .. }
            | Error::LeaderLeaseChanged { .. }
@@ -1032,7 +1164,8 @@ impl ErrorExt for Error {
            | Error::FlowStateHandler { .. }
            | Error::BuildWalOptionsAllocator { .. }
            | Error::BuildPartitionClient { .. }
            | Error::BuildKafkaClient { .. } => StatusCode::Internal,
            | Error::BuildKafkaClient { .. }
            | Error::RepartitionSubprocedureStateFetch { .. } => StatusCode::Internal,

            Error::DeleteRecords { .. }
            | Error::GetOffset { .. }
@@ -1066,7 +1199,14 @@ impl ErrorExt for Error {
            | Error::TooManyPartitions { .. }
            | Error::TomlFormat { .. }
            | Error::HandlerNotFound { .. }
            | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
            | Error::LeaderPeerChanged { .. }
            | Error::RepartitionSourceExprMismatch { .. }
            | Error::RepartitionMissingSourceRegionId { .. }
            | Error::RepartitionSourceRegionRouteMissing { .. }
            | Error::RepartitionNoSourceRegions { .. }
            | Error::RepartitionMissingTargetRegionId { .. }
            | Error::RepartitionTargetRegionRouteMissing { .. }
            | Error::RepartitionMissingPrepareContext { .. } => StatusCode::InvalidArguments,
            Error::LeaseKeyFromUtf8 { .. }
            | Error::LeaseValueFromUtf8 { .. }
            | Error::InvalidRegionKeyFromUtf8 { .. }
@@ -1080,7 +1220,10 @@ impl ErrorExt for Error {
            | Error::RegionRouteNotFound { .. }
            | Error::MigrationAbort { .. }
            | Error::MigrationRunning { .. }
            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
            | Error::RegionMigrated { .. }
            | Error::RepartitionSubprocedureUnknown { .. }
            | Error::RepartitionSubprocedureStateMissing { .. }
            | Error::RepartitionSubprocedureFailed { .. } => StatusCode::Unexpected,
            Error::TableNotFound { .. } => StatusCode::TableNotFound,
            Error::SaveClusterInfo { source, .. }
            | Error::InvalidClusterInfoFormat { source, .. }
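These Repartition* variants are raised from the planner and group procedures that follow, mostly through snafu context selectors. A minimal illustrative sketch; the function itself is hypothetical and not part of this diff:

```rust
use snafu::OptionExt;
use store_api::storage::RegionId;
use uuid::Uuid;

use crate::error::{self, Result};

/// Hypothetical helper: fail with `RepartitionMissingSourceRegionId` when a
/// plan entry has no region id attached to one of its sources.
fn require_source_region_id(group_id: Uuid, region_id: Option<RegionId>) -> Result<RegionId> {
    region_id.context(error::RepartitionMissingSourceRegionIdSnafu { group_id })
}
```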
@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;

pub mod region_migration;
pub mod repartition;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
506  src/meta-srv/src/procedure/repartition.rs  Normal file
@@ -0,0 +1,506 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod context;
mod group;
mod plan;

use std::collections::HashMap;

use common_meta::ddl::DdlContext;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock};
use common_procedure::error::{Error as ProcedureError, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, LockKey, Procedure, ProcedureId, ProcedureWithId, Status,
};
use common_telemetry::error;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use strum::AsRefStr;
use uuid::Uuid;

use self::context::{GroupManifestSummary, RepartitionContext};
use self::group::RepartitionGroupProcedure;
use self::plan::{PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand};
use crate::error::{self, Result};

/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
    /// Partition expressions representing the source regions.
    pub from_exprs: Vec<PartitionExpr>,
    /// Partition expressions representing the target regions.
    pub into_exprs: Vec<PartitionExpr>,
}

/// Procedure that orchestrates the repartition flow.
pub struct RepartitionProcedure {
    context: DdlContext,
    group_context: RepartitionContext,
    data: RepartitionData,
}

impl RepartitionProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

    /// Constructs a new procedure instance from a task payload.
    pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
        let group_context = RepartitionContext::new(&context);
        Ok(Self {
            context,
            group_context,
            data: RepartitionData::new(task),
        })
    }

    /// Builds the repartition plan if we have not done so yet.
    async fn on_prepare(&mut self) -> Result<Status> {
        if self.data.plan.is_none() {
            self.build_plan().await?;
        }

        self.data.state = RepartitionState::AllocateResources;
        Ok(Status::executing(true))
    }

    /// Allocates target regions and decides whether the procedure can proceed.
    async fn on_allocate_resources(&mut self) -> Result<Status> {
        if !self.data.resource_allocated {
            let allocated = self.allocate_resources().await?;
            if !allocated {
                if let Some(plan) = &self.data.plan {
                    let failed_groups = plan.entries.iter().map(|entry| entry.group_id);
                    self.data.failed_groups.extend(failed_groups);
                }
                self.data.state = RepartitionState::Finalize;
                return Ok(Status::executing(true));
            }
            self.data.resource_allocated = true;
        }

        self.data.state = RepartitionState::DispatchSubprocedures;
        Ok(Status::executing(true))
    }

    /// Spawns group subprocedures for every pending plan entry.
    async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
        let plan = match self.data.plan.as_ref() {
            Some(plan) => plan,
            None => {
                self.data.state = RepartitionState::Finalize;
                return Ok(Status::executing(true));
            }
        };

        let entries_to_schedule: Vec<PlanEntry> = plan
            .entries
            .iter()
            .filter(|entry| {
                !self.data.succeeded_groups.contains(&entry.group_id)
                    && !self.data.failed_groups.contains(&entry.group_id)
            })
            .cloned()
            .collect();

        if entries_to_schedule.is_empty() {
            self.data.state = RepartitionState::Finalize;
            return Ok(Status::executing(true));
        }

        let groups_to_schedule: Vec<PlanGroupId> = entries_to_schedule
            .iter()
            .map(|entry| entry.group_id)
            .collect();

        let subprocedures = self.spawn_group_procedures(
            plan.table_id,
            plan.route_snapshot.clone(),
            entries_to_schedule,
        );
        self.data.pending_groups = groups_to_schedule;
        self.data.state = RepartitionState::CollectSubprocedures;

        Ok(Status::suspended(subprocedures, true))
    }

    /// Records the list of subprocedures that finished and moves to finalisation.
    async fn on_collect_subprocedures(&mut self, ctx: &ProcedureContext) -> Result<Status> {
        let pending = std::mem::take(&mut self.data.pending_groups);
        let mut first_error: Option<error::Error> = None;
        let mut succeeded = Vec::new();

        for group_id in pending {
            let procedure_id = match self.data.group_subprocedures.remove(&group_id) {
                Some(id) => id,
                None => {
                    let err = error::RepartitionSubprocedureUnknownSnafu { group_id }.build();
                    self.data.failed_groups.push(group_id);
                    if first_error.is_none() {
                        first_error = Some(err);
                    }
                    continue;
                }
            };

            let state_opt = ctx.provider.procedure_state(procedure_id).await.context(
                error::RepartitionSubprocedureStateFetchSnafu {
                    group_id,
                    procedure_id,
                },
            )?;

            let state = match state_opt {
                Some(state) => state,
                None => {
                    let err = error::RepartitionSubprocedureStateMissingSnafu {
                        group_id,
                        procedure_id,
                    }
                    .build();
                    self.data.failed_groups.push(group_id);
                    if first_error.is_none() {
                        first_error = Some(err);
                    }
                    continue;
                }
            };

            if state.is_done() {
                succeeded.push(group_id);
                continue;
            }

            let reason = state
                .error()
                .map(|err| err.to_string())
                .unwrap_or_else(|| format!("subprocedure state {}", state.as_str_name()));
            let err = error::RepartitionSubprocedureFailedSnafu {
                group_id,
                procedure_id,
                reason,
            }
            .build();
            self.data.failed_groups.push(group_id);
            if first_error.is_none() {
                first_error = Some(err);
            }
        }

        self.data.succeeded_groups.extend(succeeded);
        self.data.state = RepartitionState::Finalize;

        if let Some(err) = first_error {
            return Err(err);
        }

        Ok(Status::executing(true))
    }

    /// Builds the summary that will be returned to the caller.
    async fn on_finalize(&mut self) -> Result<Status> {
        self.deallocate_resources().await?;

        self.data.summary = Some(RepartitionSummary {
            succeeded_groups: self.data.succeeded_groups.clone(),
            failed_groups: self.data.failed_groups.clone(),
            manifest_summaries: self.group_context.manifest_summaries(),
        });
        self.group_context.clear_group_records();
        self.data.state = RepartitionState::Finished;
        Ok(Status::done())
    }

    /// Constructs the repartition plan from the task specification.
    async fn build_plan(&mut self) -> Result<()> {
        let table_id = self.data.task.table_id;
        let from_exprs = &self.data.task.from_exprs;
        let into_exprs = &self.data.task.into_exprs;

        let (physical_table_id, physical_route) = self
            .context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        let src_descriptors = Self::source_region_descriptors(from_exprs, &physical_route)?;
        let subtasks = subtask::create_subtasks(from_exprs, into_exprs)
            .context(error::RepartitionCreateSubtasksSnafu)?;
        let entries = Self::build_plan_entries(subtasks, &src_descriptors, into_exprs);

        let demand = ResourceDemand::from_plan_entries(&entries);
        let plan = RepartitionPlan::new(physical_table_id, entries, demand, physical_route.clone());
        self.data.plan = Some(plan);

        Ok(())
    }

    fn source_region_descriptors(
        from_exprs: &[PartitionExpr],
        physical_route: &PhysicalTableRouteValue,
    ) -> Result<Vec<RegionDescriptor>> {
        let existing_regions = physical_route
            .region_routes
            .iter()
            .map(|route| (route.region.id, route.region.partition_expr()))
            .collect::<Vec<_>>();

        let descriptors = from_exprs
            .iter()
            .map(|expr| {
                let expr_json = expr
                    .as_json_str()
                    .context(error::RepartitionSerializePartitionExprSnafu)?;

                let matched_region_id = existing_regions
                    .iter()
                    .find_map(|(region_id, existing_expr)| {
                        (existing_expr == &expr_json).then_some(*region_id)
                    })
                    .with_context(|| error::RepartitionSourceExprMismatchSnafu {
                        expr: expr_json,
                    })?;

                Ok(RegionDescriptor {
                    region_id: Some(matched_region_id),
                    partition_expr: expr.clone(),
                })
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(descriptors)
    }

    fn build_plan_entries(
        subtasks: Vec<RepartitionSubtask>,
        source_index: &[RegionDescriptor],
        target_exprs: &[PartitionExpr],
    ) -> Vec<PlanEntry> {
        let plan_entries = subtasks
            .into_iter()
            .map(|subtask| {
                let group_id = Uuid::new_v4();
                let sources = subtask
                    .from_expr_indices
                    .iter()
                    .map(|&idx| source_index[idx].clone())
                    .collect::<Vec<_>>();

                let targets = subtask
                    .to_expr_indices
                    .iter()
                    .map(|&idx| RegionDescriptor {
                        region_id: None, // will be assigned later
                        partition_expr: target_exprs[idx].clone(),
                    })
                    .collect::<Vec<_>>();

                PlanEntry::new(group_id, subtask, sources, targets)
            })
            .collect::<Vec<_>>();

        plan_entries
    }

    /// Allocates resources required by the plan. Returning `false`
    /// indicates that the procedure should abort.
    async fn allocate_resources(&mut self) -> Result<bool> {
        todo!("allocate resources");
    }

    async fn deallocate_resources(&mut self) -> Result<()> {
        if !self.data.resource_allocated {
            return Ok(());
        }
        self.data.resource_allocated = false;

        todo!("deallocate resources");
    }

    /// Builds the child procedure list for the provided plan groups.
    fn spawn_group_procedures(
        &mut self,
        table_id: TableId,
        route_snapshot: PhysicalTableRouteValue,
        entries: Vec<PlanEntry>,
    ) -> Vec<ProcedureWithId> {
        let mut id_map = HashMap::new();

        let procedures = entries
            .into_iter()
            .map(|entry| {
                let group_id = entry.group_id;
                let group_procedure = RepartitionGroupProcedure::new(
                    entry,
                    table_id,
                    route_snapshot.clone(),
                    self.data.task.catalog_name.clone(),
                    self.data.task.schema_name.clone(),
                    self.group_context.clone(),
                );
                let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
                id_map.insert(group_id, procedure.id);
                procedure
            })
            .collect::<Vec<_>>();

        self.data.group_subprocedures = id_map;
        procedures
    }

    /// Composes the set of locks required to safely mutate table metadata.
    fn table_lock_key(&self) -> Vec<common_procedure::StringKey> {
        let mut lock_key = Vec::with_capacity(3);
        let catalog = self.data.task.catalog_name.as_str();
        let schema = self.data.task.schema_name.as_str();
        lock_key.push(CatalogLock::Read(catalog).into());
        lock_key.push(SchemaLock::read(catalog, schema).into());
        lock_key.push(TableLock::Write(self.data.task.table_id).into());

        lock_key
    }

    async fn trigger_group_rollbacks(&mut self) {
        if self.data.rollback_triggered {
            return;
        }

        match self.group_context.rollback_registered_groups().await {
            Ok(_) => {
                self.data.rollback_triggered = true;
            }
            Err(err) => {
                error!(err; "repartition: rollback of successful groups failed");
                self.data.rollback_triggered = true;
            }
        }
    }
}

#[async_trait::async_trait]
impl Procedure for RepartitionProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = self.data.state;
        let status = match state {
            RepartitionState::Prepare => self.on_prepare().await,
            RepartitionState::AllocateResources => self.on_allocate_resources().await,
            RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
            RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
            RepartitionState::Finalize => self.on_finalize().await,
            RepartitionState::Finished => Ok(Status::done()),
        };

        match status {
            Ok(status) => Ok(status),
            Err(err) => {
                self.trigger_group_rollbacks().await;
                if let Err(dealloc_err) = self.deallocate_resources().await {
                    error!(dealloc_err; "repartition: deallocating resources after failure failed");
                }
                Err(map_repartition_error(err))
            }
        }
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.table_lock_key())
    }
}

/// Serialized data of the repartition procedure.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RepartitionData {
    state: RepartitionState,
    task: RepartitionTask,
    #[serde(default)]
    plan: Option<RepartitionPlan>,
    #[serde(default)]
    resource_allocated: bool,
    #[serde(default)]
    pending_groups: Vec<PlanGroupId>,
    #[serde(default)]
    succeeded_groups: Vec<PlanGroupId>,
    #[serde(default)]
    failed_groups: Vec<PlanGroupId>,
    #[serde(default)]
    summary: Option<RepartitionSummary>,
    #[serde(default)]
    rollback_triggered: bool,
    #[serde(default)]
    group_subprocedures: HashMap<PlanGroupId, ProcedureId>,
}

impl RepartitionData {
    /// Initialise the procedure data for a fresh run.
    fn new(task: RepartitionTask) -> Self {
        Self {
            state: RepartitionState::Prepare,
            task,
            plan: None,
            resource_allocated: false,
            pending_groups: Vec::new(),
            succeeded_groups: Vec::new(),
            failed_groups: Vec::new(),
            summary: None,
            rollback_triggered: false,
            group_subprocedures: HashMap::new(),
        }
    }
}

pub(super) fn map_repartition_error(err: error::Error) -> ProcedureError {
    match (err.is_retryable(), err.need_clean_poisons()) {
        (true, true) => ProcedureError::retry_later_and_clean_poisons(err),
        (true, false) => ProcedureError::retry_later(err),
        (false, true) => ProcedureError::external_and_clean_poisons(err),
        (false, false) => ProcedureError::external(err),
    }
}

/// High level states of the repartition procedure.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
enum RepartitionState {
    Prepare,
    AllocateResources,
    DispatchSubprocedures,
    CollectSubprocedures,
    Finalize,
    Finished,
}

/// Information returned to the caller after the procedure finishes.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RepartitionSummary {
    succeeded_groups: Vec<PlanGroupId>,
    failed_groups: Vec<PlanGroupId>,
    #[serde(default)]
    manifest_summaries: Vec<GroupManifestSummary>,
}
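For orientation, a rough sketch of how an entry point might wrap a repartition task in this procedure; the glue function is hypothetical and submission to the metasrv procedure manager is deliberately elided, since that API is not shown in this diff:

```rust
use common_meta::ddl::DdlContext;
use common_procedure::ProcedureWithId;

use crate::error::Result;
use crate::procedure::repartition::{RepartitionProcedure, RepartitionTask};

/// Hypothetical glue: build a repartition procedure ready for submission.
fn prepare_repartition_procedure(
    task: RepartitionTask,
    ddl_context: DdlContext,
) -> Result<ProcedureWithId> {
    let procedure = RepartitionProcedure::new(task, ddl_context)?;
    // The caller would hand this to the procedure manager/executor.
    Ok(ProcedureWithId::with_random_id(Box::new(procedure)))
}
```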
351  src/meta-srv/src/procedure/repartition/context.rs  Normal file
@@ -0,0 +1,351 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use api::region::RegionResponse;
use api::v1::region::{
    ApplyStagedManifestRequest, PauseRequest, PublishRegionRuleRequest, RegionRequest,
    RegionRequestHeader, RemapManifestRequest, ResumeRequest, StageRegionRuleRequest,
    region_request,
};
use common_error::ext::BoxedError;
use common_meta::ddl::DdlContext;
use common_meta::key::TableMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::error::{self, Result};

pub const REMAP_MANIFEST_STATS_EXTENSION: &str = "repartition.manifest.stats";

use super::group::{GroupRollbackRecord, RepartitionGroupProcedure};
use crate::procedure::repartition::plan::PlanGroupId;

/// Track the overall manifest stage for a repartition group.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum ManifestStatus {
    #[default]
    NotStarted,
    Staged,
    Published,
    Discarded,
    Skipped,
    Failed,
}

/// Per-group status record that is collected by the top-level procedure.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GroupManifestSummary {
    pub group_id: PlanGroupId,
    pub status: ManifestStatus,
    pub staged_region_count: u64,
    pub stats: Option<Value>,
    pub error: Option<String>,
}

/// Shared context that allows group procedures to interact with metadata and
/// datanodes. It also aggregates per-group manifest summaries.
#[derive(Clone)]
pub struct RepartitionContext {
    pub table_metadata_manager: TableMetadataManagerRef,
    pub node_manager: NodeManagerRef,
    manifest_records: Arc<Mutex<HashMap<PlanGroupId, GroupManifestSummary>>>,
    rollback_records: Arc<Mutex<HashMap<PlanGroupId, GroupRollbackRecord>>>,
}

impl RepartitionContext {
    pub fn new(context: &DdlContext) -> Self {
        Self {
            table_metadata_manager: context.table_metadata_manager.clone(),
            node_manager: context.node_manager.clone(),
            manifest_records: Arc::new(Mutex::new(HashMap::new())),
            rollback_records: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Send a pause request to the region leader so that local IO is quiesced.
    pub async fn pause_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
        info!(
            "requesting pause to datanode {} for region {}",
            peer.id, region_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::Pause(PauseRequest {
                region_id: region_id.as_u64(),
            })),
        };
        datanode
            .handle(request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to pause region {} on datanode {}",
                    region_id, peer.id
                ),
            })?;
        Ok(())
    }

    /// Resume a previously paused region.
    pub async fn resume_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
        info!(
            "requesting resume to datanode {} for region {}",
            peer.id, region_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::Resume(ResumeRequest {
                region_id: region_id.as_u64(),
                rule_version: String::new(),
            })),
        };
        datanode
            .handle(request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to resume region {} on datanode {}",
                    region_id, peer.id
                ),
            })?;
        Ok(())
    }

    /// Stage the provided rule version on the datanode.
    pub async fn stage_region_rule_on_datanode(
        &self,
        peer: &Peer,
        region_id: RegionId,
        rule_version: &str,
    ) -> Result<()> {
        info!(
            "requesting region rule staging to datanode {} for region {}",
            peer.id, region_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::StageRegionRule(
                StageRegionRuleRequest {
                    region_id: region_id.as_u64(),
                    rule_version: rule_version.to_string(),
                },
            )),
        };
        datanode
            .handle(request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to stage region rule for region {} on datanode {}",
                    region_id, peer.id
                ),
            })?;
        Ok(())
    }

    /// Publish the staged rule version to make it active.
    pub async fn publish_region_rule_on_datanode(
        &self,
        peer: &Peer,
        region_id: RegionId,
        rule_version: &str,
    ) -> Result<()> {
        info!(
            "requesting region rule publish to datanode {} for region {}",
            peer.id, region_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::PublishRegionRule(
                PublishRegionRuleRequest {
                    region_id: region_id.as_u64(),
                    rule_version: rule_version.to_string(),
                },
            )),
        };
        datanode
            .handle(request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to publish region rule for region {} on datanode {}",
                    region_id, peer.id
                ),
            })?;
        Ok(())
    }

    /// Drop the staged rule version during rollback.
    pub async fn clear_region_rule_stage_on_datanode(
        &self,
        peer: &Peer,
        region_id: RegionId,
    ) -> Result<()> {
        info!(
            "requesting region rule stage clear to datanode {} for region {}",
            peer.id, region_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::StageRegionRule(
                StageRegionRuleRequest {
                    region_id: region_id.as_u64(),
                    rule_version: String::new(),
                },
            )),
        };
        datanode
            .handle(request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to clear staged region rule for region {} on datanode {}",
                    region_id, peer.id
                ),
            })?;
        Ok(())
    }

    /// Instruct the datanode to remap manifests for this group.
    pub async fn remap_manifests_on_datanode(
        &self,
        peer: &Peer,
        manifest_request: RemapManifestRequest,
    ) -> Result<RegionResponse> {
        let table_id = manifest_request.table_id;
        let group_id = manifest_request.group_id.clone();
        info!(
            "requesting manifest remap to datanode {} for table {} in group {}",
            peer.id, table_id, group_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let region_request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::RemapManifest(manifest_request)),
        };
        let response = datanode
            .handle(region_request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to remap manifests for group {} on datanode {}",
                    group_id, peer.id
                ),
            })?;
        Ok(response)
    }

    /// Publish or discard staged manifests.
    pub async fn apply_staged_manifests_on_datanode(
        &self,
        peer: &Peer,
        manifest_request: ApplyStagedManifestRequest,
    ) -> Result<RegionResponse> {
        let publish = manifest_request.publish;
        let table_id = manifest_request.table_id;
        let group_id = manifest_request.group_id.clone();
        info!(
            "requesting manifest {} on datanode {} for table {} in group {}",
            if publish { "publish" } else { "discard" },
            peer.id,
            table_id,
            group_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let region_request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::ApplyStagedManifest(manifest_request)),
        };
        let response = datanode
            .handle(region_request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to {} staged manifests for group {} on datanode {}",
                    if publish { "publish" } else { "discard" },
                    group_id,
                    peer.id
                ),
            })?;
        Ok(response)
    }

    /// Store the latest manifest summary for a group.
    pub fn record_manifest_summary(&self, summary: GroupManifestSummary) {
        let mut records = self.manifest_records.lock().unwrap();
        records.insert(summary.group_id, summary);
    }

    pub fn register_group_success(&self, record: GroupRollbackRecord) {
        let mut records = self.rollback_records.lock().unwrap();
        let group_id = record.group_id;
        records.insert(group_id, record);
    }

    pub async fn rollback_registered_groups(&self) -> Result<()> {
        let records: Vec<GroupRollbackRecord> = {
            let mut map = self.rollback_records.lock().unwrap();
            map.drain().map(|(_, record)| record).collect()
        };

        let mut first_err: Option<error::Error> = None;
        for record in records {
            let group_id = record.group_id;
            if let Err(err) =
                RepartitionGroupProcedure::execute_rollback(self.clone(), record).await
            {
                error!(err; "repartition: rollback of group {:?} failed", group_id);
                if first_err.is_none() {
                    first_err = Some(err);
                }
            }
        }

        if let Some(err) = first_err {
            return Err(err);
        }

        Ok(())
    }

    pub fn clear_group_records(&self) {
        self.rollback_records.lock().unwrap().clear();
    }

    /// Collect all manifest summaries recorded so far.
    pub fn manifest_summaries(&self) -> Vec<GroupManifestSummary> {
        let records = self.manifest_records.lock().unwrap();
        records.values().cloned().collect()
    }
}
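The per-group driver lives in group.rs, whose diff is suppressed below, so the following is only a hedged sketch of how a group procedure might sequence these helpers for one source region and its leader peer. The function, its placement inside the repartition module, and the pre-built requests passed in are assumptions, not part of this diff:

```rust
use api::v1::region::{ApplyStagedManifestRequest, RemapManifestRequest};
use common_meta::peer::Peer;
use store_api::storage::RegionId;

// Assumed to live inside the repartition module, next to group.rs.
use super::context::RepartitionContext;
use crate::error::Result;

/// Hypothetical happy path for one region; the real sequencing, retries and
/// rollback registration live in the suppressed group.rs.
async fn drive_one_region(
    ctx: &RepartitionContext,
    peer: &Peer,
    region_id: RegionId,
    rule_version: &str,
    remap: RemapManifestRequest,
    apply: ApplyStagedManifestRequest,
) -> Result<()> {
    // Quiesce the region, stage the new partition rule, then rewrite manifests.
    ctx.pause_region_on_datanode(peer, region_id).await?;
    ctx.stage_region_rule_on_datanode(peer, region_id, rule_version)
        .await?;
    let _remap_response = ctx.remap_manifests_on_datanode(peer, remap).await?;

    // Publish the staged manifests and rule, then let the region resume IO.
    ctx.apply_staged_manifests_on_datanode(peer, apply).await?;
    ctx.publish_region_rule_on_datanode(peer, region_id, rule_version)
        .await?;
    ctx.resume_region_on_datanode(peer, region_id).await?;
    Ok(())
}
```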
1406  src/meta-srv/src/procedure/repartition/group.rs  Normal file
(File diff suppressed because it is too large.)
95  src/meta-srv/src/procedure/repartition/plan.rs  Normal file
@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::key::table_route::PhysicalTableRouteValue;
use partition::expr::PartitionExpr;
use partition::subtask::RepartitionSubtask;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;

/// Identifier of a plan group.
pub type PlanGroupId = Uuid;

/// Logical description of the repartition plan.
///
/// The plan is persisted by the procedure framework so it must remain
/// serializable/deserializable across versions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlan {
    pub table_id: TableId,
    pub entries: Vec<PlanEntry>,
    pub resource_demand: ResourceDemand,
    pub route_snapshot: PhysicalTableRouteValue,
}

impl RepartitionPlan {
    pub fn new(
        table_id: TableId,
        entries: Vec<PlanEntry>,
        resource_demand: ResourceDemand,
        route_snapshot: PhysicalTableRouteValue,
    ) -> Self {
        Self {
            table_id,
            entries,
            resource_demand,
            route_snapshot,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PlanEntry {
    pub group_id: PlanGroupId,
    pub subtask: RepartitionSubtask,
    pub sources: Vec<RegionDescriptor>,
    pub targets: Vec<RegionDescriptor>,
}

impl PlanEntry {
    /// Construct a plan entry consisting of the connected component returned by
    /// the planner.
    pub fn new(
        group_id: PlanGroupId,
        subtask: RepartitionSubtask,
        sources: Vec<RegionDescriptor>,
        targets: Vec<RegionDescriptor>,
    ) -> Self {
        Self {
            group_id,
            subtask,
            sources,
            targets,
        }
    }
}

/// Metadata describing a region involved in the plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
    pub region_id: Option<RegionId>,
    pub partition_expr: PartitionExpr,
}

/// Auxiliary information about resources required to execute the plan.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {}

impl ResourceDemand {
    pub fn from_plan_entries(_entries: &[PlanEntry]) -> Self {
        // placeholder
        Self {}
    }
}
@@ -14,12 +14,14 @@

use std::collections::VecDeque;

use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::expr::PartitionExpr;
use crate::overlap::associate_from_to;

/// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RepartitionSubtask {
    pub from_expr_indices: Vec<usize>,
    pub to_expr_indices: Vec<usize>,
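The new `Serialize`/`Deserialize` derives let the subtask travel inside the persisted `RepartitionPlan`. A small hedged sketch of the round-trip, assuming the two index vectors shown in the hunk are the only fields of the struct; it is illustrative only and not part of the diff:

```rust
use partition::subtask::RepartitionSubtask;

/// Hypothetical check: a subtask survives the JSON round-trip used when the
/// procedure framework persists `RepartitionData` (and the plan inside it).
fn subtask_round_trip() -> serde_json::Result<()> {
    // Assumption: the struct has exactly these two public fields.
    let subtask = RepartitionSubtask {
        from_expr_indices: vec![0, 1],
        to_expr_indices: vec![0, 1, 2],
    };
    let json = serde_json::to_string(&subtask)?;
    let back: RepartitionSubtask = serde_json::from_str(&json)?;
    assert_eq!(subtask, back);
    Ok(())
}
```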
@@ -179,6 +179,30 @@ impl RegionRequest {
                reason: "ListMetadata request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::Pause(_) => UnexpectedSnafu {
                reason: "Pause request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::Resume(_) => UnexpectedSnafu {
                reason: "Resume request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::StageRegionRule(_) => UnexpectedSnafu {
                reason: "StageRegionRule request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::PublishRegionRule(_) => UnexpectedSnafu {
                reason: "PublishRegionRule request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::RemapManifest(_) => UnexpectedSnafu {
                reason: "RemapManifest request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::ApplyStagedManifest(_) => UnexpectedSnafu {
                reason: "ApplyStagedManifest request should be handled separately by RegionServer",
            }
            .fail(),
        }
    }