Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-10 15:22:56 +00:00
feat(repartition): implement region deallocation for repartition procedure (#7522)
* feat: implement deallocate regions for repartition procedure
* feat(metric-engine): add force flag to drop physical regions with associated logical regions
* feat: update table metadata after deallocating regions
* chore: apply suggestions
* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>
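At a high level, the change threads a new `force` flag through `RegionDropRequest` and `DropTableExecutor::on_drop_regions` so the repartition procedure can deallocate metric-engine physical regions that still host logical regions. A minimal sketch of the request the deallocation step ends up sending, assuming only the `store_api` types shown in the diff below (the helper function name is ours, not part of the change):

use store_api::region_request::{RegionDropRequest, RegionRequest};

/// Builds the drop request used when deallocating a region during repartition.
/// `force: true` lets the metric engine drop a physical region together with the
/// logical regions still mapped onto it; `fast_path` is left false, matching the
/// deallocation code below.
fn forced_drop_request() -> RegionRequest {
    RegionRequest::Drop(RegionDropRequest {
        fast_path: false,
        force: true,
    })
}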
Cargo.lock (generated, 2 lines changed)

@@ -5467,7 +5467,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa0f5716556b3276317701fffa702002e7fce275#fa0f5716556b3276317701fffa702002e7fce275"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0e316b86d765e4718d6f0ca77b1ad179f222b822#0e316b86d765e4718d6f0ca77b1ad179f222b822"
dependencies = [
 "prost 0.13.5",
 "prost-types 0.13.5",

@@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa0f5716556b3276317701fffa702002e7fce275" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0e316b86d765e4718d6f0ca77b1ad179f222b822" }
hex = "0.4"
http = "1"
humantime = "2.1"

@@ -120,7 +120,13 @@ impl State for DropDatabaseExecutor {
            .await?;
        executor.invalidate_table_cache(ddl_ctx).await?;
        executor
-            .on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
+            .on_drop_regions(
+                &ddl_ctx.node_manager,
+                &ddl_ctx.leader_region_registry,
+                &self.physical_region_routes,
+                true,
+                false,
+            )
            .await?;
        info!("Table: {}({}) is dropped", self.table_name, self.table_id);

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-pub(crate) mod executor;
+pub mod executor;
mod metadata;

use std::collections::HashMap;

@@ -156,7 +156,13 @@ impl DropTableProcedure {

    pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
        self.executor
-            .on_drop_regions(&self.context, &self.data.physical_region_routes, false)
+            .on_drop_regions(
+                &self.context.node_manager,
+                &self.context.leader_region_registry,
+                &self.data.physical_region_routes,
+                false,
+                false,
+            )
            .await?;
        self.data.state = DropTableState::DeleteTombstone;
        Ok(Status::executing(true))

@@ -36,6 +36,8 @@ use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
+use crate::node_manager::NodeManagerRef;
+use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::router::{
    RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders,
    operating_leader_regions,

@@ -212,16 +214,18 @@ impl DropTableExecutor {
    /// Drops region on datanode.
    pub async fn on_drop_regions(
        &self,
-        ctx: &DdlContext,
+        node_manager: &NodeManagerRef,
+        leader_region_registry: &LeaderRegionRegistryRef,
        region_routes: &[RegionRoute],
        fast_path: bool,
+        force: bool,
    ) -> Result<()> {
        // Drops leader regions on datanodes.
        let leaders = find_leaders(region_routes);
        let mut drop_region_tasks = Vec::with_capacity(leaders.len());
        let table_id = self.table_id;
        for datanode in leaders {
-            let requester = ctx.node_manager.datanode(&datanode).await;
+            let requester = node_manager.datanode(&datanode).await;
            let regions = find_leader_regions(region_routes, &datanode);
            let region_ids = regions
                .iter()

@@ -238,6 +242,7 @@ impl DropTableExecutor {
                body: Some(region_request::Body::Drop(PbDropRegionRequest {
                    region_id: region_id.as_u64(),
                    fast_path,
+                    force,
                })),
            };
            let datanode = datanode.clone();

@@ -262,7 +267,7 @@ impl DropTableExecutor {
        let followers = find_followers(region_routes);
        let mut close_region_tasks = Vec::with_capacity(followers.len());
        for datanode in followers {
-            let requester = ctx.node_manager.datanode(&datanode).await;
+            let requester = node_manager.datanode(&datanode).await;
            let regions = find_follower_regions(region_routes, &datanode);
            let region_ids = regions
                .iter()

@@ -307,8 +312,7 @@ impl DropTableExecutor {

        // Deletes the leader region from registry.
        let region_ids = operating_leader_regions(region_routes);
-        ctx.leader_region_registry
-            .batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
+        leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));

        Ok(())
    }

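The executor now receives the node manager and leader region registry directly instead of a full DdlContext, which is what lets the repartition procedure further down reuse it outside the DDL path. A sketch of such a standalone call, assuming only the types and the three-argument DropTableExecutor::new shown elsewhere in this diff (the wrapper function itself is hypothetical):

use common_meta::ddl::drop_table::executor::DropTableExecutor;
use common_meta::error::Result;
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::rpc::router::RegionRoute;
use store_api::storage::TableId;
use table::table_name::TableName;

/// Drops the given regions without going through a DDL procedure context.
async fn drop_regions_standalone(
    node_manager: &NodeManagerRef,
    leader_region_registry: &LeaderRegionRegistryRef,
    table: TableName,
    table_id: TableId,
    region_routes: &[RegionRoute],
) -> Result<()> {
    // Third flag passed to `new` mirrors the repartition deallocation step below.
    let executor = DropTableExecutor::new(table, table_id, false);
    // fast_path = false, force = true: the same flags the deallocation step uses.
    executor
        .on_drop_regions(node_manager, leader_region_registry, region_routes, false, true)
        .await
}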
@@ -1641,7 +1641,10 @@ mod tests {
        let response = mock_region_server
            .handle_request(
                region_id,
-                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
+                RegionRequest::Drop(RegionDropRequest {
+                    fast_path: false,
+                    force: false,
+                }),
            )
            .await
            .unwrap();

@@ -1739,7 +1742,10 @@ mod tests {
        mock_region_server
            .handle_request(
                region_id,
-                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
+                RegionRequest::Drop(RegionDropRequest {
+                    fast_path: false,
+                    force: false,
+                }),
            )
            .await
            .unwrap_err();

@@ -110,11 +110,11 @@ pub enum Error {
    },

    #[snafu(display(
-        "Another procedure is opening the region: {} on peer: {}",
+        "Another procedure is operating the region: {} on peer: {}",
        region_id,
        peer_id
    ))]
-    RegionOpeningRace {
+    RegionOperatingRace {
        #[snafu(implicit)]
        location: Location,
        peer_id: DatanodeId,

@@ -1059,6 +1059,15 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },
+
+    #[snafu(display("Failed to deallocate regions for table: {}", table_id))]
+    DeallocateRegions {
+        #[snafu(implicit)]
+        location: Location,
+        table_id: TableId,
+        #[snafu(source)]
+        source: common_meta::error::Error,
+    },
}

impl Error {

@@ -1154,7 +1163,7 @@ impl ErrorExt for Error {
            | Error::InvalidUtf8Value { .. }
            | Error::UnexpectedInstructionReply { .. }
            | Error::Unexpected { .. }
-            | Error::RegionOpeningRace { .. }
+            | Error::RegionOperatingRace { .. }
            | Error::RegionRouteNotFound { .. }
            | Error::MigrationAbort { .. }
            | Error::MigrationRunning { .. }

@@ -1206,6 +1215,7 @@ impl ErrorExt for Error {
            Error::Other { source, .. } => source.status_code(),
            Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
            Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
+            Error::DeallocateRegions { source, .. } => source.status_code(),
            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,

            #[cfg(feature = "pg_kvbackend")]

@@ -129,7 +129,7 @@ impl OpenCandidateRegion {
        let guard = ctx
            .opening_region_keeper
            .register(candidate.id, *region_id)
-            .context(error::RegionOpeningRaceSnafu {
+            .context(error::RegionOperatingRaceSnafu {
                peer_id: candidate.id,
                region_id: *region_id,
            })?;

@@ -302,7 +302,7 @@ mod tests {
            .await
            .unwrap_err();

-        assert_matches!(err, Error::RegionOpeningRace { .. });
+        assert_matches!(err, Error::RegionOperatingRace { .. });
        assert!(!err.is_retryable());
    }

@@ -20,18 +20,29 @@ pub mod group;
pub mod plan;
pub mod repartition_end;
pub mod repartition_start;
+pub mod utils;

use std::any::Any;
use std::fmt::Debug;

+use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
-use common_meta::key::TableMetadataManagerRef;
+use common_meta::instruction::CacheIdent;
+use common_meta::key::datanode_table::RegionInfo;
+use common_meta::key::table_route::TableRouteValue;
+use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;

-use crate::error::Result;
+use crate::error::{self, Result};
use crate::procedure::repartition::plan::RepartitionPlanEntry;
+use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::service::mailbox::MailboxRef;

#[cfg(test)]

@@ -46,14 +57,115 @@ pub struct PersistentContext {
    pub plans: Vec<RepartitionPlanEntry>,
}

pub struct VolatileContext {
    pub allocating_regions: Vec<OperatingRegionGuard>,
}

pub struct Context {
    pub persistent_ctx: PersistentContext,
    pub volatile_ctx: VolatileContext,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub memory_region_keeper: MemoryRegionKeeperRef,
    pub node_manager: NodeManagerRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub mailbox: MailboxRef,
    pub server_addr: String,
    pub cache_invalidator: CacheInvalidatorRef,
}

impl Context {
    /// Retrieves the table route value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Abort:
    /// - Table route not found.
    pub async fn get_table_route_value(
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.table_id;
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table route for table: {}", table_id),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        Ok(table_route_value)
    }

    /// Updates the table route.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    ///
    /// Abort:
    /// - Table route not found.
    /// - Failed to update the table route.
    pub async fn update_table_route(
        &self,
        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
        new_region_routes: Vec<RegionRoute>,
    ) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        if new_region_routes.is_empty() {
            return error::UnexpectedSnafu {
                violated: format!("new_region_routes is empty for table: {}", table_id),
            }
            .fail();
        }
        let datanode_id = new_region_routes
            .first()
            .unwrap()
            .leader_peer
            .as_ref()
            .context(error::NoLeaderSnafu)?
            .id;
        let datanode_table_value =
            get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;

        let RegionInfo {
            region_options,
            region_wal_options,
            ..
        } = &datanode_table_value.region_info;
        self.table_metadata_manager
            .update_table_route(
                table_id,
                datanode_table_value.region_info.clone(),
                current_table_route_value,
                new_region_routes,
                region_options,
                region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Broadcasts the invalidate table cache message.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let subject = format!(
            "Invalidate table cache for repartition table, table: {}",
            table_id,
        );
        let ctx = common_meta::cache_invalidator::Context {
            subject: Some(subject),
        };
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }
}

#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_state")]
pub(crate) trait State: Sync + Send + Debug {

@@ -13,11 +13,23 @@
// limitations under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};

use common_meta::ddl::drop_table::executor::DropTableExecutor;
use common_meta::lock_key::TableLock;
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use table::table_name::TableName;
use table::table_reference::TableReference;

-use crate::error::Result;
+use crate::error::{self, Result};
use crate::procedure::repartition::group::region_routes;
use crate::procedure::repartition::repartition_end::RepartitionEnd;
use crate::procedure::repartition::{Context, State};

@@ -30,7 +42,7 @@ impl State for DeallocateRegion {
    async fn next(
        &mut self,
        ctx: &mut Context,
-        _procedure_ctx: &ProcedureContext,
+        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let region_to_deallocate = ctx
            .persistent_ctx

@@ -42,11 +54,185 @@ impl State for DeallocateRegion {
            return Ok((Box::new(RepartitionEnd), Status::done()));
        }

-        // TODO(weny): deallocate regions.
-        todo!()
+        let table_id = ctx.persistent_ctx.table_id;
+        let pending_deallocate_region_ids = ctx
+            .persistent_ctx
+            .plans
+            .iter()
+            .flat_map(|p| p.pending_deallocate_region_ids.iter())
+            .cloned()
+            .collect::<HashSet<_>>();
+        info!(
+            "Deallocating regions: {:?} for table: {} during repartition procedure",
+            pending_deallocate_region_ids, table_id
+        );
+
+        let table_lock = TableLock::Write(table_id).into();
+        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
+        let table_route_value = ctx.get_table_route_value().await?;
+        let deallocating_regions = {
+            let region_routes = region_routes(table_id, &table_route_value)?;
+            Self::filter_deallocatable_region_routes(
+                table_id,
+                region_routes,
+                &pending_deallocate_region_ids,
+            )
+        };
+
+        let table_ref = TableReference::full(
+            &ctx.persistent_ctx.catalog_name,
+            &ctx.persistent_ctx.schema_name,
+            &ctx.persistent_ctx.table_name,
+        );
+        // Deallocates the regions on datanodes.
+        Self::deallocate_regions(
+            &ctx.node_manager,
+            &ctx.leader_region_registry,
+            table_ref.into(),
+            table_id,
+            &deallocating_regions,
+        )
+        .await?;
+
+        // Safety: the table route must be physical, so we can safely unwrap the region routes.
+        let region_routes = table_route_value.region_routes().unwrap();
+        let new_region_routes =
+            Self::generate_region_routes(region_routes, &pending_deallocate_region_ids);
+        ctx.update_table_route(&table_route_value, new_region_routes)
+            .await?;
+        ctx.invalidate_table_cache().await?;
+
+        Ok((Box::new(RepartitionEnd), Status::executing(false)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
+
+impl DeallocateRegion {
+    #[allow(dead_code)]
+    async fn deallocate_regions(
+        node_manager: &NodeManagerRef,
+        leader_region_registry: &LeaderRegionRegistryRef,
+        table: TableName,
+        table_id: TableId,
+        region_routes: &[RegionRoute],
+    ) -> Result<()> {
+        let executor = DropTableExecutor::new(table, table_id, false);
+        // Note: Consider adding an option to forcefully drop the physical region,
+        // which would involve dropping all logical regions associated with that physical region.
+        executor
+            .on_drop_regions(
+                node_manager,
+                leader_region_registry,
+                region_routes,
+                false,
+                true,
+            )
+            .await
+            .context(error::DeallocateRegionsSnafu { table_id })?;
+
+        Ok(())
+    }
+
+    #[allow(dead_code)]
+    fn filter_deallocatable_region_routes(
+        table_id: TableId,
+        region_routes: &[RegionRoute],
+        pending_deallocate_region_ids: &HashSet<RegionId>,
+    ) -> Vec<RegionRoute> {
+        let region_routes_map = region_routes
+            .iter()
+            .map(|r| (r.region.id, r.clone()))
+            .collect::<HashMap<_, _>>();
+        pending_deallocate_region_ids
+            .iter()
+            .filter_map(|region_id| match region_routes_map.get(region_id) {
+                Some(region_route) => Some(region_route.clone()),
+                None => {
+                    warn!(
+                        "Region {} not found during deallocate regions for table {:?}",
+                        region_id, table_id
+                    );
+                    None
+                }
+            })
+            .collect::<Vec<_>>()
+    }
+
+    #[allow(dead_code)]
+    fn generate_region_routes(
+        region_routes: &[RegionRoute],
+        pending_deallocate_region_ids: &HashSet<RegionId>,
+    ) -> Vec<RegionRoute> {
+        // Safety: the table route must be physical, so we can safely unwrap the region routes.
+        region_routes
+            .iter()
+            .filter(|r| !pending_deallocate_region_ids.contains(&r.region.id))
+            .cloned()
+            .collect()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use common_meta::peer::Peer;
+    use common_meta::rpc::router::{Region, RegionRoute};
+    use store_api::storage::{RegionId, TableId};
+
+    use crate::procedure::repartition::deallocate_region::DeallocateRegion;
+
+    fn test_region_routes(table_id: TableId) -> Vec<RegionRoute> {
+        vec![
+            RegionRoute {
+                region: Region {
+                    id: RegionId::new(table_id, 1),
+                    ..Default::default()
+                },
+                leader_peer: Some(Peer::empty(1)),
+                ..Default::default()
+            },
+            RegionRoute {
+                region: Region {
+                    id: RegionId::new(table_id, 2),
+                    ..Default::default()
+                },
+                leader_peer: Some(Peer::empty(2)),
+                ..Default::default()
+            },
+        ]
+    }
+
+    #[test]
+    fn test_filter_deallocatable_region_routes() {
+        let table_id = 1024;
+        let region_routes = test_region_routes(table_id);
+        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
+        let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
+            table_id,
+            &region_routes,
+            &pending_deallocate_region_ids,
+        );
+        assert_eq!(deallocatable_region_routes.len(), 1);
+        assert_eq!(
+            deallocatable_region_routes[0].region.id,
+            RegionId::new(table_id, 1)
+        );
+    }
+
+    #[test]
+    fn test_generate_region_routes() {
+        let table_id = 1024;
+        let region_routes = test_region_routes(table_id);
+        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
+        let new_region_routes = DeallocateRegion::generate_region_routes(
+            &region_routes,
+            &pending_deallocate_region_ids,
+        );
+        assert_eq!(new_region_routes.len(), 1);
+        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 2));
+    }
+}

@@ -28,7 +28,7 @@ use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::instruction::CacheIdent;
-use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
+use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};

@@ -48,6 +48,7 @@ use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::plan::RegionDescriptor;
+use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::procedure::repartition::{self};
use crate::service::mailbox::MailboxRef;

@@ -261,24 +262,7 @@ impl Context {
        table_id: TableId,
        datanode_id: u64,
    ) -> Result<DatanodeTableValue> {
-        let datanode_table_value = self
-            .table_metadata_manager
-            .datanode_table_manager()
-            .get(&DatanodeTableKey {
-                datanode_id,
-                table_id,
-            })
-            .await
-            .context(error::TableMetadataManagerSnafu)
-            .map_err(BoxedError::new)
-            .with_context(|_| error::RetryLaterWithSourceSnafu {
-                reason: format!("Failed to get DatanodeTable: {table_id}"),
-            })?
-            .context(error::DatanodeTableNotFoundSnafu {
-                table_id,
-                datanode_id,
-            })?;
-        Ok(datanode_table_value)
+        get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await
    }

    /// Broadcasts the invalidate table cache message.

@@ -19,7 +19,7 @@ use common_procedure::{Context as ProcedureContext, Status};
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
-use snafu::{OptionExt, ResultExt};
+use snafu::{OptionExt, ResultExt, ensure};
use uuid::Uuid;

use crate::error::{self, Result};

@@ -51,12 +51,22 @@ impl State for RepartitionStart {
        ctx: &mut Context,
        _: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
-        let (_, table_route) = ctx
+        let (physical_table_id, table_route) = ctx
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(ctx.persistent_ctx.table_id)
            .await
            .context(error::TableMetadataManagerSnafu)?;
+        let table_id = ctx.persistent_ctx.table_id;
+        ensure!(
+            physical_table_id == table_id,
+            error::UnexpectedSnafu {
+                violated: format!(
+                    "Repartition only works on the physical table, but got logical table: {}, physical table id: {}",
+                    table_id, physical_table_id
+                ),
+            }
+        );

        let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;

src/meta-srv/src/procedure/repartition/utils.rs (new file, 49 lines)

@@ -0,0 +1,49 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;

use crate::error::{self, Result};

/// Returns the `datanode_table_value`
///
/// Retry:
/// - Failed to retrieve the metadata of datanode table.
pub async fn get_datanode_table_value(
    table_metadata_manager: &TableMetadataManagerRef,
    table_id: TableId,
    datanode_id: u64,
) -> Result<DatanodeTableValue> {
    let datanode_table_value = table_metadata_manager
        .datanode_table_manager()
        .get(&DatanodeTableKey {
            datanode_id,
            table_id,
        })
        .await
        .context(error::TableMetadataManagerSnafu)
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!("Failed to get DatanodeTable: {table_id}"),
        })?
        .context(error::DatanodeTableNotFoundSnafu {
            table_id,
            datanode_id,
        })?;
    Ok(datanode_table_value)
}

@@ -564,6 +564,7 @@ struct MetricEngineInner {

#[cfg(test)]
mod test {
+    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_telemetry::info;

@@ -573,12 +574,13 @@ mod test {
    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
    use store_api::mito_engine_options::WAL_OPTIONS_KEY;
    use store_api::region_request::{
-        PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
+        PathType, RegionCloseRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest,
+        RegionRequest,
    };

    use super::*;
    use crate::maybe_skip_kafka_log_store_integration_test;
-    use crate::test_util::TestEnv;
+    use crate::test_util::{TestEnv, create_logical_region_request};

    #[tokio::test]
    async fn close_open_regions() {

@@ -882,4 +884,68 @@ mod test {
            }
        }
    }
+
+    #[tokio::test]
+    async fn test_drop_region() {
+        let env = TestEnv::new().await;
+        let engine = env.metric();
+        let physical_region_id1 = RegionId::new(1024, 0);
+        let logical_region_id1 = RegionId::new(1025, 0);
+        env.create_physical_region(physical_region_id1, "/test_dir1", vec![])
+            .await;
+        let region_create_request1 =
+            create_logical_region_request(&["job"], physical_region_id1, "logical1");
+        engine
+            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![(
+                logical_region_id1,
+                region_create_request1,
+            )]))
+            .await
+            .unwrap();
+        let err = engine
+            .handle_request(
+                physical_region_id1,
+                RegionRequest::Drop(RegionDropRequest {
+                    fast_path: false,
+                    force: false,
+                }),
+            )
+            .await
+            .unwrap_err();
+        assert_matches!(
+            err.as_any().downcast_ref::<Error>().unwrap(),
+            &Error::PhysicalRegionBusy { .. }
+        );
+
+        engine
+            .handle_request(
+                physical_region_id1,
+                RegionRequest::Drop(RegionDropRequest {
+                    fast_path: false,
+                    force: true,
+                }),
+            )
+            .await
+            .unwrap();
+        assert!(
+            engine
+                .inner
+                .state
+                .read()
+                .unwrap()
+                .physical_region_states()
+                .get(&physical_region_id1)
+                .is_none()
+        );
+        assert!(
+            engine
+                .inner
+                .state
+                .read()
+                .unwrap()
+                .logical_regions()
+                .get(&logical_region_id1)
+                .is_none()
+        );
+    }
}

@@ -14,6 +14,7 @@

//! Drop a metric region

+use common_telemetry::info;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest};

@@ -34,6 +35,7 @@ impl MetricEngineInner {
    ) -> Result<AffectedRows> {
        let data_region_id = utils::to_data_region_id(region_id);
        let fast_path = req.fast_path;
+        let force = req.force;

        // enclose the guard in a block to prevent the guard from polluting the async context
        let (is_physical_region, is_physical_region_busy) = {

@@ -53,14 +55,16 @@ impl MetricEngineInner {

        if is_physical_region {
            // check if there is no logical region relates to this physical region
-            if is_physical_region_busy && !fast_path {
+            if is_physical_region_busy && !force {
                // reject if there is any present logical region
                return Err(PhysicalRegionBusySnafu {
                    region_id: data_region_id,
                }
                .build());
            }
+
+            if is_physical_region_busy && force {
+                info!("Dropping physical region {} with force", data_region_id);
+            }
            return self.drop_physical_region(data_region_id).await;
        }

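In short, the physical-region branch above now keys the busy check off `force` rather than `fast_path`. A condensed restatement of that acceptance rule as a sketch (the helper function is ours, not part of the engine):

/// A busy physical region (one that still hosts logical regions) is only
/// dropped when `force` is set; otherwise the request is rejected with
/// PhysicalRegionBusy. `fast_path` no longer influences this decision.
fn may_drop_physical_region(is_physical_region_busy: bool, force: bool) -> bool {
    !is_physical_region_busy || force
}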
@@ -102,14 +106,20 @@ impl MetricEngineInner {
        self.mito
            .handle_request(
                data_region_id,
-                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
+                RegionRequest::Drop(RegionDropRequest {
+                    fast_path: false,
+                    force: false,
+                }),
            )
            .await
            .with_context(|_| CloseMitoRegionSnafu { region_id })?;
        self.mito
            .handle_request(
                metadata_region_id,
-                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
+                RegionRequest::Drop(RegionDropRequest {
+                    fast_path: false,
+                    force: false,
+                }),
            )
            .await
            .with_context(|_| CloseMitoRegionSnafu { region_id })?;

@@ -71,7 +71,10 @@ async fn test_engine_drop_region_with_format(flat_format: bool) {
    engine
        .handle_request(
            region_id,
-            RegionRequest::Drop(RegionDropRequest { fast_path: false }),
+            RegionRequest::Drop(RegionDropRequest {
+                fast_path: false,
+                force: false,
+            }),
        )
        .await
        .unwrap_err();

@@ -105,7 +108,10 @@ async fn test_engine_drop_region_with_format(flat_format: bool) {
    engine
        .handle_request(
            region_id,
-            RegionRequest::Drop(RegionDropRequest { fast_path: false }),
+            RegionRequest::Drop(RegionDropRequest {
+                fast_path: false,
+                force: false,
+            }),
        )
        .await
        .unwrap();

@@ -249,7 +255,10 @@ async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool)
    engine
        .handle_request(
            custom_region_id,
-            RegionRequest::Drop(RegionDropRequest { fast_path: false }),
+            RegionRequest::Drop(RegionDropRequest {
+                fast_path: false,
+                force: false,
+            }),
        )
        .await
        .unwrap();

@@ -271,6 +271,7 @@ fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)>
        region_id,
        RegionDropRequest {
            fast_path: drop.fast_path,
+            force: drop.force,
        },
    ))
}

@@ -534,7 +535,13 @@ impl RegionCreateRequest {

#[derive(Debug, Clone)]
pub struct RegionDropRequest {
+    /// Enables fast-path drop optimizations for logical regions.
+    /// Only applicable to the Metric Engine; ignored by others.
    pub fast_path: bool,
+
+    /// Forces the drop of a physical region and all its associated logical regions.
+    /// Only relevant for physical regions managed by the Metric Engine.
+    pub force: bool,
}

/// Open region request.