mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-16 13:00:40 +00:00
fix: avoid stale route update during repartition allocation (#8115)
Signed-off-by: WenyXu <wenymedia@gmail.com>
@@ -148,9 +148,6 @@ impl ExecutePlan {
        let region_number_and_partition_exprs =
            AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
        let table_info_value = ctx.get_table_info_value().await?;
        let table_route_value = ctx.get_table_route_value().await?;
        // Safety: it is physical table route value.
        let region_routes = table_route_value.region_routes().unwrap();
        let new_allocated_region_routes = ctx
            .region_routes_allocator
            .allocate(
@@ -205,6 +202,11 @@ impl ExecutePlan {
        // Updates the table routes.
        let table_lock = TableLock::Write(table_id).into();
        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
        // MUST refresh the table route value after acquiring the lock to avoid a lost update.
        // Otherwise, the newly allocated regions might be overridden by concurrent repartition procedures.
        let table_route_value = ctx.get_table_route_value().await?;
        // Safety: it is physical table route value.
        let region_routes = table_route_value.region_routes().unwrap();
        let new_region_routes =
            AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
        ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
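The shape of the fix is the classic lock-then-reread discipline: any snapshot taken before the table lock is held may already be stale, so the route value is fetched again once the lock guard exists, and the update is derived from that fresh read. Below is a minimal standalone sketch of the idea; the Mutex and the Vec stand in for GreptimeDB's table lock and route storage, and update_routes_stale/update_routes_fresh are hypothetical names, not functions from this codebase.

use std::sync::Mutex;

// Stand-in for the stored table route: a list of region numbers.
static ROUTES: Mutex<Vec<u32>> = Mutex::new(Vec::new());

// Buggy shape: snapshot first, lock later. A writer that commits between
// the two steps is silently overwritten by the stale snapshot (lost update).
fn update_routes_stale(new_region: u32) {
    let mut snapshot = ROUTES.lock().unwrap().clone(); // read before locking
    let mut guard = ROUTES.lock().unwrap();            // lock acquired too late
    snapshot.push(new_region);
    *guard = snapshot; // may erase a route added concurrently
}

// Fixed shape (what this commit does): take the lock, THEN re-read and
// derive the new value from the fresh state.
fn update_routes_fresh(new_region: u32) {
    let mut guard = ROUTES.lock().unwrap(); // lock first
    let mut routes = guard.clone();         // re-read under the lock
    routes.push(new_region);
    *guard = routes; // no concurrent write can slip in between
}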
@@ -397,14 +399,26 @@ impl AllocateRegion {

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManagerRef;
    use common_meta::key::datanode_table::DatanodeTableKey;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::MockDatanodeManager;
    use common_procedure::{ContextProvider, ProcedureId, ProcedureState};
    use common_procedure_test::MockContextProvider;
    use store_api::storage::RegionId;
    use tokio::sync::watch;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::test_util::{
        TestingEnv, current_parent_region_routes, new_parent_context, range_expr,
        test_region_wal_options,
    };

    fn create_region_descriptor(
        table_id: TableId,
@@ -461,6 +475,88 @@ mod tests {
            .collect()
    }

    struct ConcurrentTableRouteUpdateProvider {
        inner: MockContextProvider,
        table_metadata_manager: TableMetadataManagerRef,
        table_id: TableId,
        concurrent_region_route: RegionRoute,
        region_wal_options: HashMap<RegionNumber, String>,
    }

    #[async_trait::async_trait]
    impl ContextProvider for ConcurrentTableRouteUpdateProvider {
        async fn procedure_state(
            &self,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<ProcedureState>> {
            self.inner.procedure_state(procedure_id).await
        }

        async fn procedure_state_receiver(
            &self,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
            self.inner.procedure_state_receiver(procedure_id).await
        }

        async fn try_put_poison(
            &self,
            key: &common_procedure::PoisonKey,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<()> {
            self.inner.try_put_poison(key, procedure_id).await
        }

        async fn acquire_lock(
            &self,
            key: &common_procedure::StringKey,
        ) -> common_procedure::local::DynamicKeyLockGuard {
            let current_table_route_value = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(self.table_id)
                .await
                .unwrap()
                .unwrap();
            let mut region_routes = current_table_route_value.region_routes().unwrap().clone();

            if !region_routes
                .iter()
                .any(|route| route.region.id == self.concurrent_region_route.region.id)
            {
                region_routes.push(self.concurrent_region_route.clone());
                let datanode_id = current_table_route_value.region_routes().unwrap()[0]
                    .leader_peer
                    .as_ref()
                    .unwrap()
                    .id;
                let datanode_table_value = self
                    .table_metadata_manager
                    .datanode_table_manager()
                    .get(&DatanodeTableKey::new(datanode_id, self.table_id))
                    .await
                    .unwrap()
                    .unwrap();
                let region_options = &datanode_table_value.region_info.region_options;

                self.table_metadata_manager
                    .update_table_route(
                        self.table_id,
                        datanode_table_value.region_info.clone(),
                        &current_table_route_value,
                        region_routes,
                        region_options,
                        &self.region_wal_options,
                    )
                    .await
                    .unwrap();
            }

            self.inner.acquire_lock(key).await
        }
    }

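The provider above is the heart of the regression test: it delegates everything to MockContextProvider, except that acquire_lock first commits an extra region route to the table route storage (guarded by the id check so a retry does not insert it twice) and only then hands the lock to the caller. This reproduces a concurrent repartition procedure slipping a route update in between the procedure's earlier read and its lock acquisition.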
    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        let table_id = 1024;
@@ -617,6 +713,62 @@
        assert!(!result[1].1.is_empty());
    }

    #[tokio::test]
    async fn test_execute_plan_uses_latest_table_route_after_lock() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let original_region_routes = create_current_region_routes(table_id, &[1]);
        env.create_physical_table_metadata_for_repartition(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1]),
        )
        .await;

        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let mut ctx = new_parent_context(&env, node_manager, table_id);
        ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![],
            target_regions: vec![create_region_descriptor(table_id, 3, "x", 0, 100)],
            allocated_region_ids: vec![RegionId::new(table_id, 3)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![],
            original_target_routes: vec![],
        }];
        let concurrent_region_route = create_current_region_routes(table_id, &[2])
            .into_iter()
            .next()
            .unwrap();
        let procedure_ctx = ProcedureContext {
            procedure_id: ProcedureId::random(),
            provider: Arc::new(ConcurrentTableRouteUpdateProvider {
                inner: MockContextProvider::default(),
                table_metadata_manager: env.table_metadata_manager.clone(),
                table_id,
                concurrent_region_route,
                region_wal_options: test_region_wal_options(&[1, 2]),
            }),
        };
        let mut state = ExecutePlan;

        state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let region_ids = current_parent_region_routes(&ctx)
            .await
            .into_iter()
            .map(|route| route.region.id)
            .collect::<Vec<_>>();
        assert_eq!(
            region_ids,
            vec![
                RegionId::new(table_id, 1),
                RegionId::new(table_id, 2),
                RegionId::new(table_id, 3),
            ]
        );
    }

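The expected set of region ids is what distinguishes the fixed behavior: region 1 is the original route, region 2 is the one injected by the provider during acquire_lock, and region 3 is the newly allocated one. Had ExecutePlan written back its pre-lock snapshot, region 2 would have been lost.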
    #[test]
    fn test_allocate_region_state_backward_compatibility() {
        // Arrange