mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
remove some levels of indentation
This commit is contained in:
@@ -1820,89 +1820,91 @@ impl Timeline {
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
let retrying = {
|
||||
let self_clone = self.clone();
|
||||
let try_once = |attempt: usize| {
|
||||
let background_ctx = &background_ctx;
|
||||
let self_ref = &self;
|
||||
let skip_concurrency_limiter = &skip_concurrency_limiter;
|
||||
async move {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
attempt += 1;
|
||||
let try_once_res = async {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
|
||||
BackgroundLoopKind::InitialLogicalSizeCalculation,
|
||||
&background_ctx,
|
||||
&cancel,
|
||||
);
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
|
||||
BackgroundLoopKind::InitialLogicalSizeCalculation,
|
||||
background_ctx,
|
||||
&cancel,
|
||||
);
|
||||
|
||||
use crate::metrics::initial_logical_size::StartCircumstances;
|
||||
let (_maybe_permit, circumstances) = tokio::select! {
|
||||
res = wait_for_permit => {
|
||||
match res {
|
||||
Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
|
||||
Err(RateLimitError::Cancelled) => {
|
||||
return Err(BackgroundCalculationError::Cancelled);
|
||||
}
|
||||
}
|
||||
}
|
||||
() = skip_concurrency_limiter.cancelled() => {
|
||||
// Some action that is part of a end user interaction requested logical size
|
||||
// => break out of the rate limit
|
||||
// TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
|
||||
// but then again what happens if they cancel; also, we should just be using
|
||||
// one runtime across the entire process, so, let's leave this for now.
|
||||
(None, StartCircumstances::SkippedConcurrencyLimiter)
|
||||
}
|
||||
};
|
||||
|
||||
let metrics_guard = if attempt == 1 {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
|
||||
} else {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
|
||||
};
|
||||
|
||||
match self_clone
|
||||
.logical_size_calculation_task(
|
||||
initial_part_end,
|
||||
LogicalSizeCalculationCause::Initial,
|
||||
&background_ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
|
||||
Err(CalculateLogicalSizeError::Cancelled) => {
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
}
|
||||
Err(CalculateLogicalSizeError::Other(err)) => {
|
||||
if let Some(PageReconstructError::AncestorStopping(_)) =
|
||||
err.root_cause().downcast_ref()
|
||||
{
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
} else {
|
||||
Err(BackgroundCalculationError::Other(err))
|
||||
}
|
||||
use crate::metrics::initial_logical_size::StartCircumstances;
|
||||
let (_maybe_permit, circumstances) = tokio::select! {
|
||||
res = wait_for_permit => {
|
||||
match res {
|
||||
Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
|
||||
Err(RateLimitError::Cancelled) => {
|
||||
return Err(BackgroundCalculationError::Cancelled);
|
||||
}
|
||||
}
|
||||
}
|
||||
.await;
|
||||
() = skip_concurrency_limiter.cancelled() => {
|
||||
// Some action that is part of a end user interaction requested logical size
|
||||
// => break out of the rate limit
|
||||
// TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
|
||||
// but then again what happens if they cancel; also, we should just be using
|
||||
// one runtime across the entire process, so, let's leave this for now.
|
||||
(None, StartCircumstances::SkippedConcurrencyLimiter)
|
||||
}
|
||||
};
|
||||
|
||||
match try_once_res {
|
||||
Ok(res) => return ControlFlow::Continue(res),
|
||||
Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
|
||||
Err(BackgroundCalculationError::Other(e)) => {
|
||||
warn!(attempt, "initial size calculation failed: {e:?}");
|
||||
// exponential back-off doesn't make sense at these long intervals;
|
||||
// use fixed retry interval with generous jitter instead
|
||||
let sleep_duration = Duration::from_secs(
|
||||
u64::try_from(
|
||||
// 1hour base
|
||||
(60_i64 * 60_i64)
|
||||
let metrics_guard = if attempt == 1 {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
|
||||
} else {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
|
||||
};
|
||||
|
||||
match self_ref
|
||||
.logical_size_calculation_task(
|
||||
initial_part_end,
|
||||
LogicalSizeCalculationCause::Initial,
|
||||
background_ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
|
||||
Err(CalculateLogicalSizeError::Cancelled) => {
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
}
|
||||
Err(CalculateLogicalSizeError::Other(err)) => {
|
||||
if let Some(PageReconstructError::AncestorStopping(_)) =
|
||||
err.root_cause().downcast_ref()
|
||||
{
|
||||
Err(BackgroundCalculationError::Cancelled)
|
||||
} else {
|
||||
Err(BackgroundCalculationError::Other(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let retrying = async {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
attempt += 1;
|
||||
|
||||
match try_once(attempt).await {
|
||||
Ok(res) => return ControlFlow::Continue(res),
|
||||
Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
|
||||
Err(BackgroundCalculationError::Other(e)) => {
|
||||
warn!(attempt, "initial size calculation failed: {e:?}");
|
||||
// exponential back-off doesn't make sense at these long intervals;
|
||||
// use fixed retry interval with generous jitter instead
|
||||
let sleep_duration = Duration::from_secs(
|
||||
u64::try_from(
|
||||
// 1hour base
|
||||
(60_i64 * 60_i64)
|
||||
// 10min jitter
|
||||
+ rand::thread_rng().gen_range(-10 * 60..10 * 60),
|
||||
)
|
||||
.expect("10min < 1hour"),
|
||||
);
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
}
|
||||
)
|
||||
.expect("10min < 1hour"),
|
||||
);
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user