[1/4] initial logical size calculation: if it fails, retry on next call

Before this patch, if the task failed, we would not reset
self.initial_size_computation_started.
So, once it had failed, we would return the approximate value forever.

In practice, it probably never failed because the local filesystem
is quite reliable.

But with on-demand download, the logical size calculation may need
to download layers, which is more likely to fail at times.
There will be internal retries with a timeout, but eventually,
the downloads will give up.
We want to retry in those cases.

While we're at it, also change the handling of the timeline state
watch so that we treat it as an error. Most likely, we'll not be
called again, but if we are, retrying is the right thing.
This commit is contained in:
Christian Schwarz
2022-12-15 12:15:23 +01:00
committed by Christian Schwarz
parent c785a516aa
commit ee2b5dc9ac

View File

@@ -15,7 +15,7 @@ use std::collections::HashMap;
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant, SystemTime};
@@ -176,7 +176,7 @@ pub struct Timeline {
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
initial_size_computation_started: AtomicBool,
initial_size_computation_state: Mutex<InitialLogicalSizeComputationState>,
/// Information about the last processed message by the WAL receiver,
/// or None if WAL receiver has not received anything for this timeline
@@ -189,6 +189,14 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
}
/// State of the initial logical size computation of a timeline.
///
/// Kept behind a `Mutex` in `Timeline::initial_size_computation_state` and consulted by
/// `try_spawn_size_init_task` to decide whether a computation task has to be spawned.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum InitialLogicalSizeComputationState {
/// No computation task has been spawned yet; the next call spawns one.
NotStarted,
/// A computation task has been spawned and has not recorded its result yet.
Running,
/// The last computation task returned an error; the next call spawns a new task.
FailedWillRetryNextTime,
/// The computation task returned `Ok`; no further tasks are spawned.
Success,
}
/// Internal structure to hold all data needed for logical size calculation.
/// Calculation consists of two parts:
/// 1. Initial size calculation. That might take a long time, because it requires
@@ -804,7 +812,9 @@ impl Timeline {
// initial logical size is 0.
LogicalSize::empty_initial()
},
initial_size_computation_started: AtomicBool::new(false),
initial_size_computation_state: Mutex::new(
InitialLogicalSizeComputationState::NotStarted,
),
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
@@ -1221,59 +1231,85 @@ impl Timeline {
}
/// Spawns the background task that computes the initial logical size at `init_lsn`,
/// unless a computation is already `Running` or has already finished with `Success`.
///
/// The state is switched to `Running` before the task is spawned. When the task
/// finishes, it records `Success` or `FailedWillRetryNextTime`, so a failed
/// computation is retried on the next call.
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
// NOTE(review): the lines from here down to `use InitialLogicalSizeComputationState::*;`
// appear to be the pre-patch body (based on the `initial_size_computation_started`
// flag) that the diff view shows alongside the new body — confirm against the diff markers.
// Atomically check if the timeline size calculation had already started.
// If the flag was not already set, this sets it.
if !self
.initial_size_computation_started
.swap(true, AtomicOrdering::SeqCst)
{
// We need to start the computation task.
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"initial size calculation",
false,
async move {
let mut timeline_state_updates = self_clone.subscribe_for_state_updates();
let self_calculation = Arc::clone(&self_clone);
tokio::select! {
calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => {
let calculated_size = calculation_result
.context("Failed to spawn calculation result task")?
.context("Failed to calculate logical size")?;
match self_clone.current_logical_size.initial_logical_size.set(calculated_size) {
Ok(()) => info!("Successfully calculated initial logical size"),
Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"),
}
Ok(())
},
new_event = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return Some(new_state),
}
}
Err(_sender_dropped_error) => return None,
}
}
} => {
match new_event {
Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"),
None => info!("Timeline dropped state updates sender, stopping init size calculation"),
}
Ok(())
},
use InitialLogicalSizeComputationState::*;
let mut guard = self.initial_size_computation_state.lock().unwrap();
match *guard {
Running | Success => return,
NotStarted | FailedWillRetryNextTime => *guard = Running,
}
// Release the lock before spawning; the task takes it again when it records its result.
drop(guard);
// We need to start the computation task.
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"initial size calculation",
false,
async move {
let res = self_clone
.initial_logical_size_calculation_task(init_lsn)
.await;
// task_mgr will log the result
let new_state = match res {
Ok(_) => Success,
Err(_) => FailedWillRetryNextTime,
};
let mut state = self_clone.initial_size_computation_state.lock().unwrap();
if *state != Running {
// Should be unreachable, but no reason to crash the pageserver. Don't touch anything.
error!("expecting initial size computation task to be in state {Running:?}, got {state:?}")
} else {
*state = new_state;
}
res
},
);
}
/// Calculates the logical size at `init_lsn` via `spawn_blocking` and stores the result
/// in `current_logical_size.initial_logical_size`, while watching the timeline state.
///
/// If the size has already been set, an error is logged and `Ok(())` is still returned.
///
/// # Errors
///
/// Returns an error if the `spawn_blocking` task or the calculation itself fails, if the
/// timeline state changes to `Broken`, `Stopping` or `Suspended` before the calculation
/// finishes, or if the state watch sender is dropped.
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
async fn initial_logical_size_calculation_task(
self: &Arc<Self>,
init_lsn: Lsn,
) -> anyhow::Result<()> {
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
// Whichever finishes first wins: the size calculation or a disqualifying state change.
tokio::select! {
calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => {
let calculated_size = calculation_result
.context("Failed to spawn calculation result task")?
.context("Failed to calculate logical size")?;
match self.current_logical_size.initial_logical_size.set(calculated_size) {
Ok(()) => (),
Err(existing_size) => {
// This shouldn't happen because we use self.initial_size_computation_state to ensure exclusivity here.
// But if it happens, just complain & report success so there are no further retries.
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
}
// NOTE(review): the next two lines look like removed pre-patch lines shown by the diff view — confirm.
}.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)),
);
}
Ok(())
},
new_event = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return Some(new_state),
}
}
Err(_sender_dropped_error) => return None,
}
}
} => {
// Both outcomes are reported as errors so that the caller can retry later.
match new_event {
Some(new_state) => anyhow::bail!("aborted because timeline became inactive (new state: {new_state:?})"),
None => anyhow::bail!("aborted because state watch was dropped"), // can't happen, the sender is not dropped as long as the Timeline exists
}
},
}
}