Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-26 08:00:01 +00:00

Compare commits (7 commits): feat/inges... and zhongzc/re...
| Author | SHA1 | Date |
|---|---|---|
| | 72a6b9ff66 | |
| | 1286d4ca74 | |
| | 3c1d7fcb89 | |
| | 5be4987fd7 | |
| | db11022cff | |
| | 15935ee89a | |
| | d0877997a2 | |
Cargo.lock (generated, 11 changed lines)
@@ -3934,6 +3934,7 @@ dependencies = [
 "mito2",
 "num_cpus",
 "object-store",
+"partition",
 "prometheus",
 "prost 0.13.5",
 "query",
@@ -5324,7 +5325,6 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=14b9dc40bdc8288742b0cefc7bb024303b7429ef#14b9dc40bdc8288742b0cefc7bb024303b7429ef"
 dependencies = [
 "prost 0.13.5",
 "prost-types 0.13.5",
@@ -6922,7 +6922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
 dependencies = [
 "cfg-if",
-"windows-targets 0.52.6",
+"windows-targets 0.48.5",
 ]

 [[package]]
@@ -7428,6 +7428,7 @@ dependencies = [
 "local-ip-address",
 "once_cell",
 "parking_lot 0.12.4",
+"partition",
 "prometheus",
 "prost 0.13.5",
 "rand 0.9.1",
@@ -9859,7 +9860,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
 dependencies = [
 "heck 0.5.0",
-"itertools 0.14.0",
+"itertools 0.10.5",
 "log",
 "multimap",
 "once_cell",
@@ -9905,7 +9906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
 dependencies = [
 "anyhow",
-"itertools 0.14.0",
+"itertools 0.10.5",
 "proc-macro2",
 "quote",
 "syn 2.0.106",
@@ -14510,7 +14511,7 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
 dependencies = [
-"windows-sys 0.59.0",
+"windows-sys 0.48.0",
 ]

 [[package]]
@@ -147,7 +147,8 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
+# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
+greptime-proto = { path = "../greptime-proto" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
@@ -87,6 +87,7 @@ tokio-postgres-rustls = { version = "0.12", optional = true }
 tonic.workspace = true
 tracing.workspace = true
 typetag.workspace = true
+uuid.workspace = true

 [dev-dependencies]
 chrono.workspace = true
@@ -53,6 +53,7 @@ metric-engine.workspace = true
 mito2.workspace = true
 num_cpus.workspace = true
 object-store.workspace = true
+partition.workspace = true
 prometheus.workspace = true
 prost.workspace = true
 query.workspace = true
@@ -19,6 +19,7 @@ use common_error::define_into_tonic_status;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
+use mito2::remap_manifest::Error as RemapManifestError;
 use snafu::{Location, Snafu};
 use store_api::storage::RegionId;
 use table::error::Error as TableError;
@@ -396,6 +397,14 @@ pub enum Error {
         location: Location,
     },

+    #[snafu(display("Failed to remap manifests: {}", source))]
+    RemapManifest {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        source: RemapManifestError,
+    },
+
     #[snafu(display("Not yet implemented: {what}"))]
     NotYetImplemented { what: String },
 }
@@ -469,6 +478,7 @@ impl ErrorExt for Error {

             ObjectStore { source, .. } => source.status_code(),
             BuildCacheStore { .. } => StatusCode::StorageUnavailable,
+            RemapManifest { .. } => StatusCode::Unexpected,
         }
     }

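The new `RemapManifest` variant follows the crate's usual snafu conventions: the mito2 error is carried as the `source` and the location is filled implicitly. Below is a minimal, self-contained sketch of that pattern with stand-in types (the real variant wraps `mito2::remap_manifest::Error`); it is illustrative only, not the datanode's actual error module.

```rust
use snafu::{Location, ResultExt, Snafu};

// Stand-in for mito2::remap_manifest::Error; any std::error::Error works as a source.
#[derive(Debug)]
struct RemapManifestError;

impl std::fmt::Display for RemapManifestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("remap manifest failed")
    }
}

impl std::error::Error for RemapManifestError {}

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to remap manifests: {}", source))]
    RemapManifest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        source: RemapManifestError,
    },
}

// Hypothetical fallible operation standing in for the mito2 remap entry point.
fn remap() -> Result<(), RemapManifestError> {
    Err(RemapManifestError)
}

fn call_site() -> Result<(), Error> {
    // `.context(RemapManifestSnafu)` converts the inner error into the variant
    // and fills the implicit `location` field.
    remap().context(RemapManifestSnafu)?;
    Ok(())
}

fn main() {
    println!("{}", call_site().unwrap_err());
}
```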
@@ -24,7 +24,9 @@ use api::region::RegionResponse;
 use api::v1::meta::TopicStat;
 use api::v1::region::sync_request::ManifestInfo;
 use api::v1::region::{
-    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
+    ApplyStagedManifestRequest, ListMetadataRequest, PauseRequest, PublishRegionRuleRequest,
+    RegionResponse as RegionResponseV1, RemapManifestRequest, ResumeRequest,
+    StageRegionRuleRequest, SyncRequest, region_request,
 };
 use api::v1::{ResponseHeader, Status};
 use arrow_flight::{FlightData, Ticket};
@@ -84,6 +86,8 @@ use crate::error::{
 use crate::event_listener::RegionServerEventListenerRef;
 use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
+
+const REMAP_STATS_EXTENSION_KEY: &str = "repartition.manifest.stats";

 #[derive(Clone)]
 pub struct RegionServer {
     inner: Arc<RegionServerInner>,
@@ -370,6 +374,24 @@ impl RegionServer {
         }
     }

+    /// Temporarily pauses compaction and snapshot related activities for the region.
+    ///
+    /// Currently a stub; real implementation will coordinate with region worker.
+    pub async fn pause_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
+        info!("pause_compaction_and_snapshot stub invoked for region {region_id}");
+        let _ = region_id;
+        Ok(())
+    }
+
+    /// Resumes compaction and snapshot related activities for the region.
+    ///
+    /// Currently a stub; real implementation will coordinate with region worker.
+    pub async fn resume_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> {
+        info!("resume_compaction_and_snapshot stub invoked for region {region_id}");
+        let _ = region_id;
+        Ok(())
+    }
+
     /// Stop the region server.
     pub async fn stop(&self) -> Result<()> {
         self.inner.stop().await
@@ -538,6 +560,124 @@ impl RegionServer {
         Ok(response)
     }
+
+    async fn handle_pause_region_request(&self, request: &PauseRequest) -> Result<RegionResponse> {
+        let region_id = RegionId::from_u64(request.region_id);
+        let tracing_context = TracingContext::from_current_span();
+        let span = tracing_context.attach(info_span!(
+            "RegionServer::handle_pause_region_request",
+            region_id = region_id.to_string()
+        ));
+
+        self.pause_compaction_and_snapshot(region_id)
+            .trace(span)
+            .await
+            .map(|_| RegionResponse::new(AffectedRows::default()))
+    }
+
+    async fn handle_resume_region_request(
+        &self,
+        request: &ResumeRequest,
+    ) -> Result<RegionResponse> {
+        let region_id = RegionId::from_u64(request.region_id);
+        let tracing_context = TracingContext::from_current_span();
+        let span = tracing_context.attach(info_span!(
+            "RegionServer::handle_resume_region_request",
+            region_id = region_id.to_string()
+        ));
+
+        self.resume_compaction_and_snapshot(region_id)
+            .trace(span)
+            .await
+            .map(|_| RegionResponse::new(AffectedRows::default()))
+    }
+
+    async fn handle_stage_region_rule_request(
+        &self,
+        request: &StageRegionRuleRequest,
+    ) -> Result<RegionResponse> {
+        let region_id = RegionId::from_u64(request.region_id);
+        info!(
+            "Stage region rule for region {region_id} with version {}",
+            request.rule_version
+        );
+        match self
+            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
+            .await?
+        {
+            SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
+                Ok(RegionResponse::new(AffectedRows::default()))
+            }
+            SetRegionRoleStateResponse::InvalidTransition(err) => {
+                Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
+            }
+        }
+    }
+
+    async fn handle_publish_region_rule_request(
+        &self,
+        request: &PublishRegionRuleRequest,
+    ) -> Result<RegionResponse> {
+        let region_id = RegionId::from_u64(request.region_id);
+        info!(
+            "Publish region rule for region {region_id} with version {}",
+            request.rule_version
+        );
+        match self
+            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
+            .await?
+        {
+            SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => {
+                Ok(RegionResponse::new(AffectedRows::default()))
+            }
+            SetRegionRoleStateResponse::InvalidTransition(err) => {
+                Err(err).with_context(|_| HandleRegionRequestSnafu { region_id })
+            }
+        }
+    }
+
+    async fn handle_remap_manifest_request(
+        &self,
+        request: &RemapManifestRequest,
+    ) -> Result<RegionResponse> {
+        info!(
+            "received remap manifest request for table {} group {}",
+            request.table_id, request.group_id
+        );
+
+        let stats_json = serde_json::to_vec(&serde_json::json!({
+            "files_per_region": HashMap::<u64, usize>::new(),
+            "total_file_refs": 0u64,
+            "empty_regions": Vec::<u64>::new(),
+            "group_id": &request.group_id,
+        }))
+        .context(SerializeJsonSnafu)?;
+
+        let mut extensions = HashMap::new();
+        extensions.insert(REMAP_STATS_EXTENSION_KEY.to_string(), stats_json);
+
+        Ok(RegionResponse {
+            affected_rows: 0,
+            extensions,
+            metadata: Vec::new(),
+        })
+    }
+
+    async fn handle_apply_staged_manifest_request(
+        &self,
+        request: &ApplyStagedManifestRequest,
+    ) -> Result<RegionResponse> {
+        info!(
+            "received manifest apply request for table {} group {} publish={} regions {:?}",
+            request.table_id, request.group_id, request.publish, request.region_ids
+        );
+
+        Ok(RegionResponse {
+            affected_rows: 0,
+            extensions: HashMap::new(),
+            metadata: Vec::new(),
+        })
+    }
+
     /// Sync region manifest and registers new opened logical regions.
     pub async fn sync_region(
         &self,
@@ -569,6 +709,26 @@ impl RegionServerHandler for RegionServer {
             region_request::Body::Sync(sync_request) => {
                 self.handle_sync_region_request(sync_request).await
             }
+            region_request::Body::Pause(pause_request) => {
+                self.handle_pause_region_request(pause_request).await
+            }
+            region_request::Body::Resume(resume_request) => {
+                self.handle_resume_region_request(resume_request).await
+            }
+            region_request::Body::StageRegionRule(stage_request) => {
+                self.handle_stage_region_rule_request(stage_request).await
+            }
+            region_request::Body::PublishRegionRule(publish_request) => {
+                self.handle_publish_region_rule_request(publish_request)
+                    .await
+            }
+            region_request::Body::RemapManifest(remap_request) => {
+                self.handle_remap_manifest_request(remap_request).await
+            }
+            region_request::Body::ApplyStagedManifest(apply_request) => {
+                self.handle_apply_staged_manifest_request(apply_request)
+                    .await
+            }
             region_request::Body::ListMetadata(list_metadata_request) => {
                 self.handle_list_metadata_request(list_metadata_request)
                     .await
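`handle_remap_manifest_request` reports its statistics back to the caller through the response `extensions` map under the `repartition.manifest.stats` key, encoded as JSON bytes. A minimal sketch of how a consumer could decode that payload follows; the struct mirrors the JSON object built above, treating `group_id` as a string is an assumption made for illustration, and the `HashMap<String, Vec<u8>>` shape matches how `extensions` is populated in the handler.

```rust
use std::collections::HashMap;

use serde::Deserialize;

/// Mirrors the JSON object serialized by handle_remap_manifest_request.
#[derive(Debug, Deserialize)]
struct RemapStats {
    files_per_region: HashMap<u64, usize>,
    total_file_refs: u64,
    empty_regions: Vec<u64>,
    // Assumed to be a string-typed group identifier.
    group_id: String,
}

/// Reads the stats extension out of a response's extensions map, if present.
fn decode_remap_stats(extensions: &HashMap<String, Vec<u8>>) -> Option<RemapStats> {
    let raw = extensions.get("repartition.manifest.stats")?;
    serde_json::from_slice(raw).ok()
}

fn main() {
    let mut extensions = HashMap::new();
    extensions.insert(
        "repartition.manifest.stats".to_string(),
        br#"{"files_per_region":{},"total_file_refs":0,"empty_regions":[],"group_id":"g-1"}"#.to_vec(),
    );
    let stats = decode_remap_stats(&extensions).expect("stats should decode");
    assert_eq!(stats.total_file_refs, 0);
}
```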
@@ -45,6 +45,7 @@ common-time.workspace = true
 common-version.workspace = true
 common-wal.workspace = true
 common-workload.workspace = true
+partition.workspace = true
 dashmap.workspace = true
 datatypes.workspace = true
 deadpool = { workspace = true, optional = true }
@@ -17,12 +17,14 @@ use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
 use common_meta::DatanodeId;
+use common_procedure::ProcedureId;
 use common_runtime::JoinError;
 use snafu::{Location, Snafu};
 use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tokio::sync::mpsc::error::SendError;
 use tonic::codegen::http;
+use uuid::Uuid;

 use crate::metasrv::SelectTarget;
 use crate::pubsub::Message;
@@ -774,6 +776,129 @@ pub enum Error {
         location: Location,
     },

+    #[snafu(display("Failed to create repartition subtasks"))]
+    RepartitionCreateSubtasks {
+        source: partition::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to serialize partition expression"))]
+    RepartitionSerializePartitionExpr {
+        source: partition::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Source partition expression '{}' does not match any existing region",
+        expr
+    ))]
+    RepartitionSourceExprMismatch {
+        expr: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Repartition group {} is missing a source region id", group_id))]
+    RepartitionMissingSourceRegionId {
+        group_id: Uuid,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Repartition group {} cannot find route for source region {}",
+        group_id,
+        region_id
+    ))]
+    RepartitionSourceRegionRouteMissing {
+        group_id: Uuid,
+        region_id: RegionId,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Repartition group {} has no source regions after planning", group_id))]
+    RepartitionNoSourceRegions {
+        group_id: Uuid,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Repartition group {} target {} is missing an allocated region id",
+        group_id,
+        target_index
+    ))]
+    RepartitionMissingTargetRegionId {
+        group_id: Uuid,
+        target_index: usize,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Route for target region {} not found", region_id))]
+    RepartitionTargetRegionRouteMissing {
+        region_id: RegionId,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Repartition group {} is missing prepare context", group_id))]
+    RepartitionMissingPrepareContext {
+        group_id: Uuid,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Repartition group {} has no registered subprocedure", group_id))]
+    RepartitionSubprocedureUnknown {
+        group_id: Uuid,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Failed to fetch state for repartition group {} subprocedure {}",
+        group_id,
+        procedure_id
+    ))]
+    RepartitionSubprocedureStateFetch {
+        group_id: Uuid,
+        procedure_id: ProcedureId,
+        #[snafu(source)]
+        source: common_procedure::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Repartition group {} subprocedure {} state missing",
+        group_id,
+        procedure_id
+    ))]
+    RepartitionSubprocedureStateMissing {
+        group_id: Uuid,
+        procedure_id: ProcedureId,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Repartition group {} subprocedure {} failed: {}",
+        group_id,
+        procedure_id,
+        reason
+    ))]
+    RepartitionSubprocedureFailed {
+        group_id: Uuid,
+        procedure_id: ProcedureId,
+        reason: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Unsupported operation {}", operation))]
     Unsupported {
         operation: String,
@@ -997,6 +1122,11 @@ impl Error {
         matches!(self, Error::RetryLater { .. })
             || matches!(self, Error::RetryLaterWithSource { .. })
     }
+
+    /// Returns `true` if the error requires cleaning poison records.
+    pub fn need_clean_poisons(&self) -> bool {
+        false
+    }
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -1012,6 +1142,8 @@ impl ErrorExt for Error {
             | Error::TcpBind { .. }
             | Error::SerializeToJson { .. }
             | Error::DeserializeFromJson { .. }
+            | Error::RepartitionCreateSubtasks { .. }
+            | Error::RepartitionSerializePartitionExpr { .. }
             | Error::NoLeader { .. }
             | Error::LeaderLeaseExpired { .. }
             | Error::LeaderLeaseChanged { .. }
@@ -1032,7 +1164,8 @@
             | Error::FlowStateHandler { .. }
             | Error::BuildWalOptionsAllocator { .. }
             | Error::BuildPartitionClient { .. }
-            | Error::BuildKafkaClient { .. } => StatusCode::Internal,
+            | Error::BuildKafkaClient { .. }
+            | Error::RepartitionSubprocedureStateFetch { .. } => StatusCode::Internal,

             Error::DeleteRecords { .. }
             | Error::GetOffset { .. }
@@ -1066,7 +1199,14 @@
             | Error::TooManyPartitions { .. }
             | Error::TomlFormat { .. }
             | Error::HandlerNotFound { .. }
-            | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
+            | Error::LeaderPeerChanged { .. }
+            | Error::RepartitionSourceExprMismatch { .. }
+            | Error::RepartitionMissingSourceRegionId { .. }
+            | Error::RepartitionSourceRegionRouteMissing { .. }
+            | Error::RepartitionNoSourceRegions { .. }
+            | Error::RepartitionMissingTargetRegionId { .. }
+            | Error::RepartitionTargetRegionRouteMissing { .. }
+            | Error::RepartitionMissingPrepareContext { .. } => StatusCode::InvalidArguments,
             Error::LeaseKeyFromUtf8 { .. }
             | Error::LeaseValueFromUtf8 { .. }
             | Error::InvalidRegionKeyFromUtf8 { .. }
@@ -1080,7 +1220,10 @@
             | Error::RegionRouteNotFound { .. }
             | Error::MigrationAbort { .. }
             | Error::MigrationRunning { .. }
-            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
+            | Error::RegionMigrated { .. }
+            | Error::RepartitionSubprocedureUnknown { .. }
+            | Error::RepartitionSubprocedureStateMissing { .. }
+            | Error::RepartitionSubprocedureFailed { .. } => StatusCode::Unexpected,
             Error::TableNotFound { .. } => StatusCode::TableNotFound,
             Error::SaveClusterInfo { source, .. }
             | Error::InvalidClusterInfoFormat { source, .. }
@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
 use snafu::ResultExt;

 pub mod region_migration;
+pub mod repartition;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
 #[cfg(test)]
src/meta-srv/src/procedure/repartition.rs (new file, 506 lines added)
@@ -0,0 +1,506 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod context;
mod group;
mod plan;

use std::collections::HashMap;

use common_meta::ddl::DdlContext;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock};
use common_procedure::error::{Error as ProcedureError, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, LockKey, Procedure, ProcedureId, ProcedureWithId, Status,
};
use common_telemetry::error;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use strum::AsRefStr;
use uuid::Uuid;

use self::context::{GroupManifestSummary, RepartitionContext};
use self::group::RepartitionGroupProcedure;
use self::plan::{PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand};
use crate::error::{self, Result};

/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
    /// Partition expressions representing the source regions.
    pub from_exprs: Vec<PartitionExpr>,
    /// Partition expressions representing the target regions.
    pub into_exprs: Vec<PartitionExpr>,
}

/// Procedure that orchestrates the repartition flow.
pub struct RepartitionProcedure {
    context: DdlContext,
    group_context: RepartitionContext,
    data: RepartitionData,
}

impl RepartitionProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

    /// Constructs a new procedure instance from a task payload.
    pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
        let group_context = RepartitionContext::new(&context);
        Ok(Self {
            context,
            group_context,
            data: RepartitionData::new(task),
        })
    }

    /// Builds the repartition plan if we have not done so yet.
    async fn on_prepare(&mut self) -> Result<Status> {
        if self.data.plan.is_none() {
            self.build_plan().await?;
        }

        self.data.state = RepartitionState::AllocateResources;
        Ok(Status::executing(true))
    }

    /// Allocates target regions and decides whether the procedure can proceed.
    async fn on_allocate_resources(&mut self) -> Result<Status> {
        if !self.data.resource_allocated {
            let allocated = self.allocate_resources().await?;
            if !allocated {
                if let Some(plan) = &self.data.plan {
                    let failed_groups = plan.entries.iter().map(|entry| entry.group_id);
                    self.data.failed_groups.extend(failed_groups);
                }
                self.data.state = RepartitionState::Finalize;
                return Ok(Status::executing(true));
            }
            self.data.resource_allocated = true;
        }

        self.data.state = RepartitionState::DispatchSubprocedures;
        Ok(Status::executing(true))
    }

    /// Spawns group subprocedures for every pending plan entry.
    async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
        let plan = match self.data.plan.as_ref() {
            Some(plan) => plan,
            None => {
                self.data.state = RepartitionState::Finalize;
                return Ok(Status::executing(true));
            }
        };

        let entries_to_schedule: Vec<PlanEntry> = plan
            .entries
            .iter()
            .filter(|entry| {
                !self.data.succeeded_groups.contains(&entry.group_id)
                    && !self.data.failed_groups.contains(&entry.group_id)
            })
            .cloned()
            .collect();

        if entries_to_schedule.is_empty() {
            self.data.state = RepartitionState::Finalize;
            return Ok(Status::executing(true));
        }

        let groups_to_schedule: Vec<PlanGroupId> = entries_to_schedule
            .iter()
            .map(|entry| entry.group_id)
            .collect();

        let subprocedures = self.spawn_group_procedures(
            plan.table_id,
            plan.route_snapshot.clone(),
            entries_to_schedule,
        );
        self.data.pending_groups = groups_to_schedule;
        self.data.state = RepartitionState::CollectSubprocedures;

        Ok(Status::suspended(subprocedures, true))
    }

    /// Records the list of subprocedures that finished and move to finalisation.
    async fn on_collect_subprocedures(&mut self, ctx: &ProcedureContext) -> Result<Status> {
        let pending = std::mem::take(&mut self.data.pending_groups);
        let mut first_error: Option<error::Error> = None;
        let mut succeeded = Vec::new();

        for group_id in pending {
            let procedure_id = match self.data.group_subprocedures.remove(&group_id) {
                Some(id) => id,
                None => {
                    let err = error::RepartitionSubprocedureUnknownSnafu { group_id }.build();
                    self.data.failed_groups.push(group_id);
                    if first_error.is_none() {
                        first_error = Some(err);
                    }
                    continue;
                }
            };

            let state_opt = ctx.provider.procedure_state(procedure_id).await.context(
                error::RepartitionSubprocedureStateFetchSnafu {
                    group_id,
                    procedure_id,
                },
            )?;

            let state = match state_opt {
                Some(state) => state,
                None => {
                    let err = error::RepartitionSubprocedureStateMissingSnafu {
                        group_id,
                        procedure_id,
                    }
                    .build();
                    self.data.failed_groups.push(group_id);
                    if first_error.is_none() {
                        first_error = Some(err);
                    }
                    continue;
                }
            };

            if state.is_done() {
                succeeded.push(group_id);
                continue;
            }

            let reason = state
                .error()
                .map(|err| err.to_string())
                .unwrap_or_else(|| format!("subprocedure state {}", state.as_str_name()));
            let err = error::RepartitionSubprocedureFailedSnafu {
                group_id,
                procedure_id,
                reason,
            }
            .build();
            self.data.failed_groups.push(group_id);
            if first_error.is_none() {
                first_error = Some(err);
            }
        }

        self.data.succeeded_groups.extend(succeeded);
        self.data.state = RepartitionState::Finalize;

        if let Some(err) = first_error {
            return Err(err);
        }

        Ok(Status::executing(true))
    }

    /// Builds the summary that will be returned to the caller.
    async fn on_finalize(&mut self) -> Result<Status> {
        self.deallocate_resources().await?;

        self.data.summary = Some(RepartitionSummary {
            succeeded_groups: self.data.succeeded_groups.clone(),
            failed_groups: self.data.failed_groups.clone(),
            manifest_summaries: self.group_context.manifest_summaries(),
        });
        self.group_context.clear_group_records();
        self.data.state = RepartitionState::Finished;
        Ok(Status::done())
    }

    /// Constructs the repartition plan from the task specification.
    async fn build_plan(&mut self) -> Result<()> {
        let table_id = self.data.task.table_id;
        let from_exprs = &self.data.task.from_exprs;
        let into_exprs = &self.data.task.into_exprs;

        let (physical_table_id, physical_route) = self
            .context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        let src_descriptors = Self::source_region_descriptors(from_exprs, &physical_route)?;
        let subtasks = subtask::create_subtasks(from_exprs, into_exprs)
            .context(error::RepartitionCreateSubtasksSnafu)?;
        let entries = Self::build_plan_entries(subtasks, &src_descriptors, into_exprs);

        let demand = ResourceDemand::from_plan_entries(&entries);
        let plan = RepartitionPlan::new(physical_table_id, entries, demand, physical_route.clone());
        self.data.plan = Some(plan);

        Ok(())
    }

    fn source_region_descriptors(
        from_exprs: &[PartitionExpr],
        physical_route: &PhysicalTableRouteValue,
    ) -> Result<Vec<RegionDescriptor>> {
        let existing_regions = physical_route
            .region_routes
            .iter()
            .map(|route| (route.region.id, route.region.partition_expr()))
            .collect::<Vec<_>>();

        let descriptors = from_exprs
            .iter()
            .map(|expr| {
                let expr_json = expr
                    .as_json_str()
                    .context(error::RepartitionSerializePartitionExprSnafu)?;

                let matched_region_id = existing_regions
                    .iter()
                    .find_map(|(region_id, existing_expr)| {
                        (existing_expr == &expr_json).then_some(*region_id)
                    })
                    .with_context(|| error::RepartitionSourceExprMismatchSnafu {
                        expr: expr_json,
                    })?;

                Ok(RegionDescriptor {
                    region_id: Some(matched_region_id),
                    partition_expr: expr.clone(),
                })
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(descriptors)
    }

    fn build_plan_entries(
        subtasks: Vec<RepartitionSubtask>,
        source_index: &[RegionDescriptor],
        target_exprs: &[PartitionExpr],
    ) -> Vec<PlanEntry> {
        let plan_entries = subtasks
            .into_iter()
            .map(|subtask| {
                let group_id = Uuid::new_v4();
                let sources = subtask
                    .from_expr_indices
                    .iter()
                    .map(|&idx| source_index[idx].clone())
                    .collect::<Vec<_>>();

                let targets = subtask
                    .to_expr_indices
                    .iter()
                    .map(|&idx| RegionDescriptor {
                        region_id: None, // will be assigned later
                        partition_expr: target_exprs[idx].clone(),
                    })
                    .collect::<Vec<_>>();

                PlanEntry::new(group_id, subtask, sources, targets)
            })
            .collect::<Vec<_>>();

        plan_entries
    }

    /// Allocates resources required by the plan. Returning `false`
    /// indicates that the procedure should abort.
    async fn allocate_resources(&mut self) -> Result<bool> {
        todo!("allocate resources");
    }

    async fn deallocate_resources(&mut self) -> Result<()> {
        if !self.data.resource_allocated {
            return Ok(());
        }
        self.data.resource_allocated = false;

        todo!("deallocate resources");
    }

    /// Builds the child procedure list for the provided plan groups.
    fn spawn_group_procedures(
        &mut self,
        table_id: TableId,
        route_snapshot: PhysicalTableRouteValue,
        entries: Vec<PlanEntry>,
    ) -> Vec<ProcedureWithId> {
        let mut id_map = HashMap::new();

        let procedures = entries
            .into_iter()
            .map(|entry| {
                let group_id = entry.group_id;
                let group_procedure = RepartitionGroupProcedure::new(
                    entry,
                    table_id,
                    route_snapshot.clone(),
                    self.data.task.catalog_name.clone(),
                    self.data.task.schema_name.clone(),
                    self.group_context.clone(),
                );
                let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
                id_map.insert(group_id, procedure.id);
                procedure
            })
            .collect::<Vec<_>>();

        self.data.group_subprocedures = id_map;
        procedures
    }

    /// Composes the set of locks required to safely mutate table metadata.
    fn table_lock_key(&self) -> Vec<common_procedure::StringKey> {
        let mut lock_key = Vec::with_capacity(3);
        let catalog = self.data.task.catalog_name.as_str();
        let schema = self.data.task.schema_name.as_str();
        lock_key.push(CatalogLock::Read(catalog).into());
        lock_key.push(SchemaLock::read(catalog, schema).into());
        lock_key.push(TableLock::Write(self.data.task.table_id).into());

        lock_key
    }

    async fn trigger_group_rollbacks(&mut self) {
        if self.data.rollback_triggered {
            return;
        }

        match self.group_context.rollback_registered_groups().await {
            Ok(_) => {
                self.data.rollback_triggered = true;
            }
            Err(err) => {
                error!(err; "repartition: rollback of successful groups failed");
                self.data.rollback_triggered = true;
            }
        }
    }
}

#[async_trait::async_trait]
impl Procedure for RepartitionProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = self.data.state;
        let status = match state {
            RepartitionState::Prepare => self.on_prepare().await,
            RepartitionState::AllocateResources => self.on_allocate_resources().await,
            RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
            RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
            RepartitionState::Finalize => self.on_finalize().await,
            RepartitionState::Finished => Ok(Status::done()),
        };

        match status {
            Ok(status) => Ok(status),
            Err(err) => {
                self.trigger_group_rollbacks().await;
                if let Err(dealloc_err) = self.deallocate_resources().await {
                    error!(dealloc_err; "repartition: deallocating resources after failure failed");
                }
                Err(map_repartition_error(err))
            }
        }
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.table_lock_key())
    }
}

/// Serialized data of the repartition procedure.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RepartitionData {
    state: RepartitionState,
    task: RepartitionTask,
    #[serde(default)]
    plan: Option<RepartitionPlan>,
    #[serde(default)]
    resource_allocated: bool,
    #[serde(default)]
    pending_groups: Vec<PlanGroupId>,
    #[serde(default)]
    succeeded_groups: Vec<PlanGroupId>,
    #[serde(default)]
    failed_groups: Vec<PlanGroupId>,
    #[serde(default)]
    summary: Option<RepartitionSummary>,
    #[serde(default)]
    rollback_triggered: bool,
    #[serde(default)]
    group_subprocedures: HashMap<PlanGroupId, ProcedureId>,
}

impl RepartitionData {
    /// Initialise the procedure data for a fresh run.
    fn new(task: RepartitionTask) -> Self {
        Self {
            state: RepartitionState::Prepare,
            task,
            plan: None,
            resource_allocated: false,
            pending_groups: Vec::new(),
            succeeded_groups: Vec::new(),
            failed_groups: Vec::new(),
            summary: None,
            rollback_triggered: false,
            group_subprocedures: HashMap::new(),
        }
    }
}

pub(super) fn map_repartition_error(err: error::Error) -> ProcedureError {
    match (err.is_retryable(), err.need_clean_poisons()) {
        (true, true) => ProcedureError::retry_later_and_clean_poisons(err),
        (true, false) => ProcedureError::retry_later(err),
        (false, true) => ProcedureError::external_and_clean_poisons(err),
        (false, false) => ProcedureError::external(err),
    }
}

/// High level states of the repartition procedure.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
enum RepartitionState {
    Prepare,
    AllocateResources,
    DispatchSubprocedures,
    CollectSubprocedures,
    Finalize,
    Finished,
}

/// Information returned to the caller after the procedure finishes.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RepartitionSummary {
    succeeded_groups: Vec<PlanGroupId>,
    failed_groups: Vec<PlanGroupId>,
    #[serde(default)]
    manifest_summaries: Vec<GroupManifestSummary>,
}
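`RepartitionProcedure::execute` above is a thin dispatcher over `RepartitionState`: each call runs one step, updates `self.data.state`, and returns a status telling the framework to persist and re-enter until `Finished`. The following is a stripped-down, framework-free sketch of that dispatch shape; the types here are illustrative stand-ins, not the `common_procedure` API.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RepartitionState {
    Prepare,
    AllocateResources,
    DispatchSubprocedures,
    CollectSubprocedures,
    Finalize,
    Finished,
}

/// Stand-in for the procedure: holds only the state that drives dispatch.
struct Driver {
    state: RepartitionState,
}

impl Driver {
    /// One call advances the machine by a single step, mirroring how the
    /// procedure framework keeps re-entering `execute` until the procedure
    /// reports that it is done. Returns `true` once the final state is reached.
    fn step(&mut self) -> bool {
        use RepartitionState::*;
        self.state = match self.state {
            Prepare => AllocateResources,
            AllocateResources => DispatchSubprocedures,
            DispatchSubprocedures => CollectSubprocedures,
            CollectSubprocedures => Finalize,
            Finalize => Finished,
            Finished => Finished,
        };
        self.state == Finished
    }
}

fn main() {
    let mut driver = Driver { state: RepartitionState::Prepare };
    let mut calls = 0;
    loop {
        calls += 1;
        if driver.step() {
            break;
        }
    }
    // Prepare -> AllocateResources -> Dispatch -> Collect -> Finalize -> Finished: five steps.
    assert_eq!(calls, 5);
}
```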
351
src/meta-srv/src/procedure/repartition/context.rs
Normal file
351
src/meta-srv/src/procedure/repartition/context.rs
Normal file
@@ -0,0 +1,351 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use api::region::RegionResponse;
|
||||||
|
use api::v1::region::{
|
||||||
|
ApplyStagedManifestRequest, PauseRequest, PublishRegionRuleRequest, RegionRequest,
|
||||||
|
RegionRequestHeader, RemapManifestRequest, ResumeRequest, StageRegionRuleRequest,
|
||||||
|
region_request,
|
||||||
|
};
|
||||||
|
use common_error::ext::BoxedError;
|
||||||
|
use common_meta::ddl::DdlContext;
|
||||||
|
use common_meta::key::TableMetadataManagerRef;
|
||||||
|
use common_meta::node_manager::NodeManagerRef;
|
||||||
|
use common_meta::peer::Peer;
|
||||||
|
use common_telemetry::{error, info};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::Value;
|
||||||
|
use snafu::ResultExt;
|
||||||
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
|
use crate::error::{self, Result};
|
||||||
|
|
||||||
|
pub const REMAP_MANIFEST_STATS_EXTENSION: &str = "repartition.manifest.stats";
|
||||||
|
|
||||||
|
use super::group::{GroupRollbackRecord, RepartitionGroupProcedure};
|
||||||
|
use crate::procedure::repartition::plan::PlanGroupId;
|
||||||
|
|
||||||
|
/// Track the overall manifest stage for a repartition group.
|
||||||
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
|
||||||
|
pub enum ManifestStatus {
|
||||||
|
#[default]
|
||||||
|
NotStarted,
|
||||||
|
Staged,
|
||||||
|
Published,
|
||||||
|
Discarded,
|
||||||
|
Skipped,
|
||||||
|
Failed,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Per-group status record that is collected by the top-level procedure.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||||
|
pub struct GroupManifestSummary {
|
||||||
|
pub group_id: PlanGroupId,
|
||||||
|
pub status: ManifestStatus,
|
||||||
|
pub staged_region_count: u64,
|
||||||
|
pub stats: Option<Value>,
|
||||||
|
pub error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Shared context that allows group procedures to interact with metadata and
|
||||||
|
/// datanodes. It also aggregates per-group manifest summaries.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct RepartitionContext {
|
||||||
|
pub table_metadata_manager: TableMetadataManagerRef,
|
||||||
|
pub node_manager: NodeManagerRef,
|
||||||
|
manifest_records: Arc<Mutex<HashMap<PlanGroupId, GroupManifestSummary>>>,
|
||||||
|
rollback_records: Arc<Mutex<HashMap<PlanGroupId, GroupRollbackRecord>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RepartitionContext {
|
||||||
|
pub fn new(context: &DdlContext) -> Self {
|
||||||
|
Self {
|
||||||
|
table_metadata_manager: context.table_metadata_manager.clone(),
|
||||||
|
node_manager: context.node_manager.clone(),
|
||||||
|
manifest_records: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
rollback_records: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a pause request to the region leader so that local IO is quiesced.
|
||||||
|
pub async fn pause_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
|
||||||
|
info!(
|
||||||
|
"requesting pause to datanode {} for region {}",
|
||||||
|
peer.id, region_id
|
||||||
|
);
|
||||||
|
let datanode = self.node_manager.datanode(peer).await;
|
||||||
|
let request = RegionRequest {
|
||||||
|
header: Some(RegionRequestHeader::default()),
|
||||||
|
body: Some(region_request::Body::Pause(PauseRequest {
|
||||||
|
region_id: region_id.as_u64(),
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
datanode
|
||||||
|
.handle(request)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(error::RetryLaterWithSourceSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"failed to pause region {} on datanode {}",
|
||||||
|
region_id, peer.id
|
||||||
|
),
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resume a previously paused region.
|
||||||
|
pub async fn resume_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
|
||||||
|
info!(
|
||||||
|
"requesting resume to datanode {} for region {}",
|
||||||
|
peer.id, region_id
|
||||||
|
);
|
||||||
|
let datanode = self.node_manager.datanode(peer).await;
|
||||||
|
let request = RegionRequest {
|
||||||
|
header: Some(RegionRequestHeader::default()),
|
||||||
|
body: Some(region_request::Body::Resume(ResumeRequest {
|
||||||
|
region_id: region_id.as_u64(),
|
||||||
|
rule_version: String::new(),
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
datanode
|
||||||
|
.handle(request)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(error::RetryLaterWithSourceSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"failed to resume region {} on datanode {}",
|
||||||
|
region_id, peer.id
|
||||||
|
),
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stage the provided rule version on the datanode.
|
||||||
|
pub async fn stage_region_rule_on_datanode(
|
||||||
|
&self,
|
||||||
|
peer: &Peer,
|
||||||
|
region_id: RegionId,
|
||||||
|
rule_version: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
info!(
|
||||||
|
"requesting region rule staging to datanode {} for region {}",
|
||||||
|
peer.id, region_id
|
||||||
|
);
|
||||||
|
let datanode = self.node_manager.datanode(peer).await;
|
||||||
|
let request = RegionRequest {
|
||||||
|
header: Some(RegionRequestHeader::default()),
|
||||||
|
body: Some(region_request::Body::StageRegionRule(
|
||||||
|
StageRegionRuleRequest {
|
||||||
|
region_id: region_id.as_u64(),
|
||||||
|
rule_version: rule_version.to_string(),
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
};
|
||||||
|
datanode
|
||||||
|
.handle(request)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(error::RetryLaterWithSourceSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"failed to stage region rule for region {} on datanode {}",
|
||||||
|
region_id, peer.id
|
||||||
|
),
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publish the staged rule version to make it active.
|
||||||
|
pub async fn publish_region_rule_on_datanode(
|
||||||
|
&self,
|
||||||
|
peer: &Peer,
|
||||||
|
region_id: RegionId,
|
||||||
|
rule_version: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
info!(
|
||||||
|
"requesting region rule publish to datanode {} for region {}",
|
||||||
|
peer.id, region_id
|
||||||
|
);
|
||||||
|
let datanode = self.node_manager.datanode(peer).await;
|
||||||
|
let request = RegionRequest {
|
||||||
|
header: Some(RegionRequestHeader::default()),
|
||||||
|
body: Some(region_request::Body::PublishRegionRule(
|
||||||
|
PublishRegionRuleRequest {
|
||||||
|
region_id: region_id.as_u64(),
|
||||||
|
rule_version: rule_version.to_string(),
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
};
|
||||||
|
datanode
|
||||||
|
.handle(request)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(error::RetryLaterWithSourceSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"failed to publish region rule for region {} on datanode {}",
|
||||||
|
region_id, peer.id
|
||||||
|
),
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drop the staged rule version during rollback.
|
||||||
|
    pub async fn clear_region_rule_stage_on_datanode(
        &self,
        peer: &Peer,
        region_id: RegionId,
    ) -> Result<()> {
        info!(
            "requesting region rule stage clear to datanode {} for region {}",
            peer.id, region_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::StageRegionRule(
                StageRegionRuleRequest {
                    region_id: region_id.as_u64(),
                    rule_version: String::new(),
                },
            )),
        };
        datanode
            .handle(request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to clear staged region rule for region {} on datanode {}",
                    region_id, peer.id
                ),
            })?;
        Ok(())
    }

    /// Instruct the datanode to remap manifests for this group.
    pub async fn remap_manifests_on_datanode(
        &self,
        peer: &Peer,
        manifest_request: RemapManifestRequest,
    ) -> Result<RegionResponse> {
        let table_id = manifest_request.table_id;
        let group_id = manifest_request.group_id.clone();
        info!(
            "requesting manifest remap to datanode {} for table {} in group {}",
            peer.id, table_id, group_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let region_request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::RemapManifest(manifest_request)),
        };
        let response = datanode
            .handle(region_request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to remap manifests for group {} on datanode {}",
                    group_id, peer.id
                ),
            })?;
        Ok(response)
    }

    /// Publish or discard staged manifests.
    pub async fn apply_staged_manifests_on_datanode(
        &self,
        peer: &Peer,
        manifest_request: ApplyStagedManifestRequest,
    ) -> Result<RegionResponse> {
        let publish = manifest_request.publish;
        let table_id = manifest_request.table_id;
        let group_id = manifest_request.group_id.clone();
        info!(
            "requesting manifest {} on datanode {} for table {} in group {}",
            if publish { "publish" } else { "discard" },
            peer.id,
            table_id,
            group_id
        );
        let datanode = self.node_manager.datanode(peer).await;
        let region_request = RegionRequest {
            header: Some(RegionRequestHeader::default()),
            body: Some(region_request::Body::ApplyStagedManifest(manifest_request)),
        };
        let response = datanode
            .handle(region_request)
            .await
            .map_err(BoxedError::new)
            .context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "failed to {} staged manifests for group {} on datanode {}",
                    if publish { "publish" } else { "discard" },
                    group_id,
                    peer.id
                ),
            })?;
        Ok(response)
    }

    /// Store the latest manifest summary for a group.
    pub fn record_manifest_summary(&self, summary: GroupManifestSummary) {
        let mut records = self.manifest_records.lock().unwrap();
        records.insert(summary.group_id, summary);
    }

    pub fn register_group_success(&self, record: GroupRollbackRecord) {
        let mut records = self.rollback_records.lock().unwrap();
        let group_id = record.group_id;
        records.insert(group_id, record);
    }

    pub async fn rollback_registered_groups(&self) -> Result<()> {
        let records: Vec<GroupRollbackRecord> = {
            let mut map = self.rollback_records.lock().unwrap();
            map.drain().map(|(_, record)| record).collect()
        };

        let mut first_err: Option<error::Error> = None;
        for record in records {
            let group_id = record.group_id;
            if let Err(err) =
                RepartitionGroupProcedure::execute_rollback(self.clone(), record).await
            {
                error!(err; "repartition: rollback of group {:?} failed", group_id);
                if first_err.is_none() {
                    first_err = Some(err);
                }
            }
        }

        if let Some(err) = first_err {
            return Err(err);
        }

        Ok(())
    }

    pub fn clear_group_records(&self) {
        self.rollback_records.lock().unwrap().clear();
    }

    /// Collect all manifest summaries recorded so far.
    pub fn manifest_summaries(&self) -> Vec<GroupManifestSummary> {
        let records = self.manifest_records.lock().unwrap();
        records.values().cloned().collect()
    }
}
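Taken together, these helpers give the repartition procedure a simple publish/rollback protocol per group: remap the staged manifests on the datanode, try to publish them, record the group as successful, and undo every recorded group if a later step fails. A rough usage sketch follows; the `Context` receiver name, `run_group`, and the error handling are illustrative placeholders (the owning type and the request construction are not shown in this excerpt), so treat it as a sketch rather than the procedure's actual control flow.

    // Hypothetical driver for one repartition group, assuming `ctx` is the type
    // that owns the helpers above and the requests were built elsewhere.
    async fn run_group(
        ctx: &Context,
        peer: &Peer,
        remap: RemapManifestRequest,
        publish: ApplyStagedManifestRequest,
        rollback_record: GroupRollbackRecord,
    ) -> Result<()> {
        // Stage the remapped manifests on the datanode first.
        ctx.remap_manifests_on_datanode(peer, remap).await?;

        match ctx.apply_staged_manifests_on_datanode(peer, publish).await {
            Ok(_response) => {
                // Remember the success so a later failure in another group can
                // still undo this one.
                ctx.register_group_success(rollback_record);
                Ok(())
            }
            Err(err) => {
                // Undo every group registered so far; keep the original error
                // even if the rollback itself also fails.
                if let Err(rollback_err) = ctx.rollback_registered_groups().await {
                    error!(rollback_err; "repartition: rollback after publish failure also failed");
                }
                ctx.clear_group_records();
                Err(err)
            }
        }
    }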
1406  src/meta-srv/src/procedure/repartition/group.rs  Normal file
File diff suppressed because it is too large
95  src/meta-srv/src/procedure/repartition/plan.rs  Normal file
@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::key::table_route::PhysicalTableRouteValue;
use partition::expr::PartitionExpr;
use partition::subtask::RepartitionSubtask;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;

/// Identifier of a plan group.
pub type PlanGroupId = Uuid;

/// Logical description of the repartition plan.
///
/// The plan is persisted by the procedure framework so it must remain
/// serializable/deserializable across versions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlan {
    pub table_id: TableId,
    pub entries: Vec<PlanEntry>,
    pub resource_demand: ResourceDemand,
    pub route_snapshot: PhysicalTableRouteValue,
}

impl RepartitionPlan {
    pub fn new(
        table_id: TableId,
        entries: Vec<PlanEntry>,
        resource_demand: ResourceDemand,
        route_snapshot: PhysicalTableRouteValue,
    ) -> Self {
        Self {
            table_id,
            entries,
            resource_demand,
            route_snapshot,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PlanEntry {
    pub group_id: PlanGroupId,
    pub subtask: RepartitionSubtask,
    pub sources: Vec<RegionDescriptor>,
    pub targets: Vec<RegionDescriptor>,
}

impl PlanEntry {
    /// Construct a plan entry consisting of the connected component returned by
    /// the planner.
    pub fn new(
        group_id: PlanGroupId,
        subtask: RepartitionSubtask,
        sources: Vec<RegionDescriptor>,
        targets: Vec<RegionDescriptor>,
    ) -> Self {
        Self {
            group_id,
            subtask,
            sources,
            targets,
        }
    }
}

/// Metadata describing a region involved in the plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
    pub region_id: Option<RegionId>,
    pub partition_expr: PartitionExpr,
}

/// Auxiliary information about resources required to execute the plan.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {}

impl ResourceDemand {
    pub fn from_plan_entries(_entries: &[PlanEntry]) -> Self {
        // placeholder
        Self {}
    }
}
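For orientation, here is a minimal sketch of how these types are meant to compose into a single-group plan. It assumes a planner already produced the RepartitionSubtask and the source/target partition expressions, and that the table route snapshot was fetched separately; the `build_single_group_plan` helper, its parameters, and the choice to leave target region ids unset are illustrative assumptions, not part of this file. It also assumes the uuid crate's v4 feature for `Uuid::new_v4`.

    use common_meta::key::table_route::PhysicalTableRouteValue;
    use partition::expr::PartitionExpr;
    use partition::subtask::RepartitionSubtask;
    use store_api::storage::{RegionId, TableId};

    // Hypothetical assembly of a plan with one connected component.
    fn build_single_group_plan(
        table_id: TableId,
        subtask: RepartitionSubtask,
        source_exprs: Vec<(RegionId, PartitionExpr)>,
        target_exprs: Vec<PartitionExpr>,
        route_snapshot: PhysicalTableRouteValue,
    ) -> RepartitionPlan {
        // Source regions already exist, so their ids are known.
        let sources: Vec<RegionDescriptor> = source_exprs
            .into_iter()
            .map(|(region_id, partition_expr)| RegionDescriptor {
                region_id: Some(region_id),
                partition_expr,
            })
            .collect();
        // Target regions are assumed not to be allocated yet, hence `None`.
        let targets: Vec<RegionDescriptor> = target_exprs
            .into_iter()
            .map(|partition_expr| RegionDescriptor {
                region_id: None,
                partition_expr,
            })
            .collect();

        let entry = PlanEntry::new(PlanGroupId::new_v4(), subtask, sources, targets);
        let entries = vec![entry];
        let resource_demand = ResourceDemand::from_plan_entries(&entries);
        RepartitionPlan::new(table_id, entries, resource_demand, route_snapshot)
    }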
@@ -14,12 +14,14 @@

use std::collections::VecDeque;

use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::expr::PartitionExpr;
use crate::overlap::associate_from_to;

/// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RepartitionSubtask {
    pub from_expr_indices: Vec<usize>,
    pub to_expr_indices: Vec<usize>,
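The added Serialize/Deserialize derives are what allow RepartitionSubtask to be embedded in PlanEntry and persisted with the rest of the repartition plan. A small compile-time check of that property is sketched below; the test module name and placement are illustrative, and the struct may carry more fields than the two visible in this hunk, so the check deliberately avoids constructing a value.

    #[cfg(test)]
    mod serde_bounds {
        use serde::de::DeserializeOwned;
        use serde::Serialize;

        use partition::subtask::RepartitionSubtask;

        // Compile-time assertion only: the subtask can round-trip through any
        // serde format the procedure framework chooses for persistence.
        fn assert_serde<T: Serialize + DeserializeOwned>() {}

        #[test]
        fn repartition_subtask_is_serializable() {
            assert_serde::<RepartitionSubtask>();
        }
    }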
@@ -179,6 +179,30 @@ impl RegionRequest {
                reason: "ListMetadata request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::Pause(_) => UnexpectedSnafu {
                reason: "Pause request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::Resume(_) => UnexpectedSnafu {
                reason: "Resume request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::StageRegionRule(_) => UnexpectedSnafu {
                reason: "StageRegionRule request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::PublishRegionRule(_) => UnexpectedSnafu {
                reason: "PublishRegionRule request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::RemapManifest(_) => UnexpectedSnafu {
                reason: "RemapManifest request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::ApplyStagedManifest(_) => UnexpectedSnafu {
                reason: "ApplyStagedManifest request should be handled separately by RegionServer",
            }
            .fail(),
        }
    }
}
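Like ListMetadata, the new Pause, Resume, StageRegionRule, PublishRegionRule, RemapManifest, and ApplyStagedManifest bodies are rejected by this generic conversion because the RegionServer is expected to intercept them before they reach it. A rough sketch of that dispatch shape follows; the `RepartitionHandlers` trait, its method names, the request type `PublishRegionRuleRequest`, and the `Result` alias are illustrative assumptions about wiring that is not part of this diff.

    // Hypothetical front-end routing: repartition-related bodies go to dedicated
    // handlers, everything else keeps flowing through the conversion above.
    #[async_trait::async_trait]
    trait RepartitionHandlers {
        async fn stage_region_rule(&self, req: StageRegionRuleRequest) -> Result<RegionResponse>;
        async fn publish_region_rule(&self, req: PublishRegionRuleRequest) -> Result<RegionResponse>;
        async fn remap_manifest(&self, req: RemapManifestRequest) -> Result<RegionResponse>;
        async fn apply_staged_manifest(&self, req: ApplyStagedManifestRequest) -> Result<RegionResponse>;
    }

    async fn route_repartition_body<H: RepartitionHandlers>(
        handlers: &H,
        body: region_request::Body,
    ) -> Option<Result<RegionResponse>> {
        // `None` means the body is not repartition-related and should be handled
        // by the existing request-conversion path.
        match body {
            region_request::Body::StageRegionRule(req) => Some(handlers.stage_region_rule(req).await),
            region_request::Body::PublishRegionRule(req) => Some(handlers.publish_region_rule(req).await),
            region_request::Body::RemapManifest(req) => Some(handlers.remap_manifest(req).await),
            region_request::Body::ApplyStagedManifest(req) => Some(handlers.apply_staged_manifest(req).await),
            _ => None,
        }
    }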