mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 07:20:41 +00:00
fix: deregister failure detectors on rollback and improve timeout handling (#7212)
Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -685,6 +685,9 @@ impl RegionMigrationProcedure {
|
||||
.with_context(|_| error::RetryLaterWithSourceSnafu {
|
||||
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
|
||||
})?;
|
||||
self.context
|
||||
.deregister_failure_detectors_for_candidate_region()
|
||||
.await;
|
||||
}
|
||||
|
||||
self.context.register_failure_detectors().await;
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::ops::Div;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
@@ -139,12 +140,15 @@ impl OpenCandidateRegion {
|
||||
input: open_instruction.to_string(),
|
||||
})?;
|
||||
|
||||
let operation_timeout =
|
||||
ctx.next_operation_timeout()
|
||||
.context(error::ExceededDeadlineSnafu {
|
||||
operation: "Open candidate region",
|
||||
})?;
|
||||
let operation_timeout = operation_timeout.div(2).max(OPEN_CANDIDATE_REGION_TIMEOUT);
|
||||
let ch = Channel::Datanode(candidate.id);
|
||||
let now = Instant::now();
|
||||
let receiver = ctx
|
||||
.mailbox
|
||||
.send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
|
||||
.await?;
|
||||
let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
|
||||
|
||||
match receiver.await {
|
||||
Ok(msg) => {
|
||||
|
||||
@@ -401,6 +401,7 @@ impl RegionOpener {
|
||||
config: &MitoConfig,
|
||||
wal: &Wal<S>,
|
||||
) -> Result<Option<MitoRegionRef>> {
|
||||
let now = Instant::now();
|
||||
let region_options = self.options.as_ref().unwrap().clone();
|
||||
|
||||
let region_manifest_options = Self::manifest_options(
|
||||
@@ -492,8 +493,12 @@ impl RegionOpener {
|
||||
.unwrap_or_default()
|
||||
.max(flushed_entry_id);
|
||||
info!(
|
||||
"Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
|
||||
replay_from_entry_id, region_id, manifest.manifest_version, flushed_entry_id
|
||||
"Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
|
||||
replay_from_entry_id,
|
||||
region_id,
|
||||
manifest.manifest_version,
|
||||
flushed_entry_id,
|
||||
now.elapsed()
|
||||
);
|
||||
replay_memtable(
|
||||
&provider,
|
||||
@@ -515,8 +520,11 @@ impl RegionOpener {
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
"Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
|
||||
region_id, manifest.manifest_version, flushed_entry_id
|
||||
"Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
|
||||
region_id,
|
||||
manifest.manifest_version,
|
||||
flushed_entry_id,
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
0
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! Handling open request.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::info;
|
||||
use object_store::util::join_path;
|
||||
@@ -119,6 +120,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
}
|
||||
};
|
||||
|
||||
let now = Instant::now();
|
||||
let regions = self.regions.clone();
|
||||
let wal = self.wal.clone();
|
||||
let config = self.config.clone();
|
||||
@@ -129,7 +131,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
common_runtime::spawn_global(async move {
|
||||
match opener.open(&config, &wal).await {
|
||||
Ok(region) => {
|
||||
info!("Region {} is opened, worker: {}", region_id, worker_id);
|
||||
info!(
|
||||
"Region {} is opened, worker: {}, elapsed: {:?}",
|
||||
region_id,
|
||||
worker_id,
|
||||
now.elapsed()
|
||||
);
|
||||
region_count.inc();
|
||||
|
||||
// Insert the Region into the RegionMap.
|
||||
|
||||
Reference in New Issue
Block a user