pageserver: checkpoint import progress in the storage controller (#11862)

## Problem

Timeline imports have no progress checkpointing. Whenever the tenant is
shut down, all progress is lost and the import restarts from the
beginning when the tenant is re-attached.

## Summary of changes

This PR adds progress checkpointing, so an import can resume from the
last checkpoint when the tenant is re-attached.


### Preliminaries

The **unit of work** is a `ChunkProcessingJob`. Each
`ChunkProcessingJob` imports a set of key ranges. Work is split into
jobs based on an estimate of how many pages each job will produce.
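
The planner code itself is not shown in this description; as a rough
illustration, splitting planned tasks into jobs against a soft size
limit might look like the sketch below. All names and the per-task size
estimate are hypothetical; the real knob is `import_job_soft_size_limit`
in `TimelineImportConfig`.

```rust
/// Hypothetical sketch of splitting planned tasks into jobs.
/// `estimated_bytes` stands in for the real per-task size estimate.
struct Task {
    estimated_bytes: u64,
}

struct Job {
    tasks: Vec<Task>,
}

fn split_into_jobs(tasks: Vec<Task>, soft_size_limit: u64) -> Vec<Job> {
    let mut jobs: Vec<Job> = Vec::new();
    let mut current: Vec<Task> = Vec::new();
    let mut current_size = 0u64;

    for task in tasks {
        // Close the current job once adding another task would exceed the
        // soft limit; a single oversized task still becomes its own job.
        if !current.is_empty() && current_size + task.estimated_bytes > soft_size_limit {
            jobs.push(Job { tasks: std::mem::take(&mut current) });
            current_size = 0;
        }
        current_size += task.estimated_bytes;
        current.push(task);
    }
    if !current.is_empty() {
        jobs.push(Job { tasks: current });
    }
    jobs
}
```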

The planning stage must be **pure**: given a fixed set of contents in
the import bucket, it always yields the same plan. This property is
verified when resuming from a checkpoint by checking that the hash of
the freshly computed plan matches the hash recorded in the checkpoint.
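
A minimal sketch of that check: the seed and the `twox_hash::XxHash64`
hasher match what this PR uses, but the `Plan` shown here is a
simplified stand-in for the real plan (which holds the
`ChunkProcessingJob`s plus the shard identity).

```rust
use std::hash::{Hash, Hasher};

/// Simplified stand-in for the real import plan.
#[derive(Hash)]
struct Plan {
    jobs: Vec<u64>,
}

/// Deterministic, non-cryptographic hash of the plan.
fn plan_hash(plan: &Plan) -> u64 {
    const SEED: u64 = 42;
    let mut hasher = twox_hash::XxHash64::with_seed(SEED);
    plan.hash(&mut hasher);
    hasher.finish()
}

/// On resume, the freshly computed plan must hash to the checkpointed value.
fn check_plan_matches(plan: &Plan, checkpointed_hash: u64) -> anyhow::Result<()> {
    if plan_hash(plan) != checkpointed_hash {
        anyhow::bail!("import plan does not match the checkpointed plan");
    }
    Ok(())
}
```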

The storage controller tracks each shard's import progress in its
database, in the form of the **latest job** that has completed.
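
The checkpoint is the `ShardImportProgress` payload added in this PR,
carried inside `ShardImportStatus::InProgress(Option<ShardImportProgress>)`.
It is reproduced here with the serde import made explicit:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ShardImportProgress {
    /// Total number of jobs in the import plan
    pub jobs: usize,
    /// Number of jobs completed; completions are recorded in plan order,
    /// so this is also the index of the latest checkpointed job
    pub completed: usize,
    /// Hash of the plan, used to verify the re-computed plan on resume
    pub import_plan_hash: u64,
}
```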

### Flow

This is the high-level flow for the happy path:
1. On its first run, the import task queries storcon for progress and
sees that none is recorded.
2. The preparatory stage of the import runs.
3. Import jobs run concurrently in a `FuturesOrdered`. Every time the
checkpoint threshold of completed jobs is reached, the storage
controller is notified.
4. The tenant is detached and re-attached.
5. The import task starts up again and gets the latest progress
checkpoint from the storage controller in the form of a job index.
6. The plan is computed again and its hash is checked against the hash
of the original plan.
7. Jobs are spawned from where the previous import task left off. Note
that progress is not reported after every job, so some jobs might run
twice (see the sketch after this list).
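
A simplified, sequential sketch of steps 3 and 7 above. The real
implementation runs jobs concurrently in a `FuturesOrdered` and
processes completions in plan order; `Job` and `report_progress` here
are hypothetical placeholders.

```rust
struct Job;

impl Job {
    async fn run(&self) -> anyhow::Result<()> {
        Ok(()) // placeholder for importing one chunk of key ranges
    }
}

/// Placeholder for the storage controller upcall that records the checkpoint.
async fn report_progress(total_jobs: usize, completed: usize) -> anyhow::Result<()> {
    println!("checkpoint: {completed}/{total_jobs} jobs completed");
    Ok(())
}

async fn run_jobs(
    jobs: Vec<Job>,
    start_after: Option<usize>, // job index from the last checkpoint, if any
    checkpoint_every: usize,    // `import_job_checkpoint_threshold`
) -> anyhow::Result<()> {
    let total = jobs.len();
    let mut last_completed = start_after.unwrap_or(0);

    // Jobs are 1-indexed; skip everything up to and including the checkpoint.
    // Jobs that completed after the last checkpoint but before the restart
    // are re-run here, which is why a job may execute twice.
    for (idx, job) in jobs.into_iter().enumerate().map(|(i, j)| (i + 1, j)) {
        if idx <= last_completed {
            continue;
        }
        job.run().await?;
        last_completed = idx;
        if last_completed % checkpoint_every == 0 {
            report_progress(total, last_completed).await?;
        }
    }
    Ok(())
}
```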

Closes https://github.com/neondatabase/neon/issues/11568
Closes https://github.com/neondatabase/neon/issues/11664
Authored by Vlad Lazar on 2025-05-15 14:18:22 +01:00, committed by GitHub.
Commit 2621ce2daf (parent a703cd342b), 9 changed files with 357 additions and 137 deletions.

Cargo.lock (generated)

@@ -4331,6 +4331,7 @@ dependencies = [
"toml_edit",
"tracing",
"tracing-utils",
"twox-hash",
"url",
"utils",
"uuid",


@@ -305,6 +305,7 @@ impl From<OtelExporterProtocol> for tracing_utils::Protocol {
pub struct TimelineImportConfig {
pub import_job_concurrency: NonZeroUsize,
pub import_job_soft_size_limit: NonZeroUsize,
pub import_job_checkpoint_threshold: NonZeroUsize,
}
pub mod statvfs {
@@ -661,6 +662,7 @@ impl Default for ConfigToml {
timeline_import_config: TimelineImportConfig {
import_job_concurrency: NonZeroUsize::new(128).unwrap(),
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
},
}
}


@@ -336,14 +336,25 @@ impl TimelineCreateRequest {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum ShardImportStatus {
InProgress,
InProgress(Option<ShardImportProgress>),
Done,
Error(String),
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ShardImportProgress {
/// Total number of jobs in the import plan
pub jobs: usize,
/// Number of jobs completed
pub completed: usize,
/// Hash of the plan
pub import_plan_hash: u64,
}
impl ShardImportStatus {
pub fn is_terminal(&self) -> bool {
match self {
ShardImportStatus::InProgress => false,
ShardImportStatus::InProgress(_) => false,
ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
}
}


@@ -96,6 +96,7 @@ strum.workspace = true
strum_macros.workspace = true
wal_decoder.workspace = true
smallvec.workspace = true
twox-hash.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
procfs.workspace = true


@@ -1,6 +1,7 @@
use std::sync::Arc;
use anyhow::{Context, bail};
use importbucket_client::{ControlFile, RemoteStorageWrapper};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio::task::JoinHandle;
@@ -57,115 +58,40 @@ pub async fn doit(
.map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
info!(?shard_status, "peeking shard status");
match shard_status {
None | Some(ShardImportStatus::InProgress) => {
// TODO: checkpoint the progress into the IndexPart instead of restarting
// from the beginning.
//
// Wipe the slate clean - the flow does not allow resuming.
// We can implement resuming in the future by checkpointing the progress into the IndexPart.
//
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it
);
let all_layers_keys = guard.all_persistent_layers();
let all_layers: Vec<_> = all_layers_keys
.iter()
.map(|key| guard.get_from_key(key))
.collect();
let open = guard.open_mut().context("open_mut")?;
timeline.remote_client.schedule_gc_update(&all_layers)?;
open.finish_gc_timeline(&all_layers);
}
//
// Wait for pgdata to finish uploading
//
info!("wait for pgdata to reach status 'done'");
match shard_status.unwrap_or(ShardImportStatus::InProgress(None)) {
ShardImportStatus::InProgress(maybe_progress) => {
let storage =
importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
let status_prefix = RemotePath::from_string("status").unwrap();
let pgdata_status_key = status_prefix.join("pgdata");
loop {
let res = async {
let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
.get_json(&pgdata_status_key)
.await
.context("get pgdata status")?;
info!(?pgdata_status, "peeking pgdata status");
if pgdata_status.map(|st| st.done).unwrap_or(false) {
Ok(())
} else {
Err(anyhow::anyhow!("pgdata not done yet"))
}
}
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefinitely waiting for pgdata to finish");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
cancel.cancelled(),
)
.await
.is_ok()
{
bail!("cancelled while waiting for pgdata");
}
}
}
}
//
// Do the import
//
info!("do the import");
let control_file = storage.get_control_file().await?;
let base_lsn = control_file.base_lsn();
let control_file_res = if maybe_progress.is_none() {
// Only prepare the import once when there's no progress.
prepare_import(timeline, storage.clone(), &cancel).await
} else {
storage.get_control_file().await
};
info!("update TimelineMetadata based on LSNs from control file");
{
let pg_version = control_file.pg_version();
let _ctx: &RequestContext = ctx;
async move {
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
let prev_record_lsn = base_lsn;
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
Some(prev_record_lsn),
None, // no ancestor
Lsn(0), // no ancestor lsn
base_lsn, // latest_gc_cutoff_lsn
base_lsn, // initdb_lsn
pg_version,
let control_file = match control_file_res {
Ok(cf) => cf,
Err(err) => {
return Err(
terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
);
let _start_lsn = disk_consistent_lsn + 1;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline.remote_client.wait_completion().await?;
anyhow::Ok(())
}
}
.await?;
};
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
let res = flow::run(
timeline.clone(),
control_file,
storage.clone(),
maybe_progress,
ctx,
)
.await;
if let Err(err) = res {
return Err(
terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
);
}
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
@@ -180,7 +106,6 @@ pub async fn doit(
timeline.tenant_shard_id,
timeline.timeline_id,
timeline.generation,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
@@ -188,16 +113,151 @@ pub async fn doit(
anyhow::anyhow!("Shut down while putting timeline import status")
})?;
}
Some(ShardImportStatus::Error(err)) => {
ShardImportStatus::Error(err) => {
info!(
"shard status indicates that the shard is done (error), skipping import {}",
err
);
}
Some(ShardImportStatus::Done) => {
ShardImportStatus::Done => {
info!("shard status indicates that the shard is done (success), skipping import");
}
}
Ok(())
}
async fn prepare_import(
timeline: &Arc<Timeline>,
storage: RemoteStorageWrapper,
cancel: &CancellationToken,
) -> anyhow::Result<ControlFile> {
// Wipe the slate clean before starting the import as a precaution.
// This method is only called when there's no recorded checkpoint for the import
// in the storage controller.
//
// Note that this is split-brain safe (two imports for same timeline shards running in
// different generations) because we go through the usual deletion path, including deletion queue.
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it
);
let all_layers_keys = guard.all_persistent_layers();
let all_layers: Vec<_> = all_layers_keys
.iter()
.map(|key| guard.get_from_key(key))
.collect();
let open = guard.open_mut().context("open_mut")?;
timeline.remote_client.schedule_gc_update(&all_layers)?;
open.finish_gc_timeline(&all_layers);
}
//
// Wait for pgdata to finish uploading
//
info!("wait for pgdata to reach status 'done'");
let status_prefix = RemotePath::from_string("status").unwrap();
let pgdata_status_key = status_prefix.join("pgdata");
loop {
let res = async {
let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
.get_json(&pgdata_status_key)
.await
.context("get pgdata status")?;
info!(?pgdata_status, "peeking pgdata status");
if pgdata_status.map(|st| st.done).unwrap_or(false) {
Ok(())
} else {
Err(anyhow::anyhow!("pgdata not done yet"))
}
}
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefinitely waiting for pgdata to finish");
if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
.await
.is_ok()
{
bail!("cancelled while waiting for pgdata");
}
}
}
}
let control_file = storage.get_control_file().await?;
let base_lsn = control_file.base_lsn();
info!("update TimelineMetadata based on LSNs from control file");
{
let pg_version = control_file.pg_version();
async move {
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
let prev_record_lsn = base_lsn;
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
Some(prev_record_lsn),
None, // no ancestor
Lsn(0), // no ancestor lsn
base_lsn, // latest_gc_cutoff_lsn
base_lsn, // initdb_lsn
pg_version,
);
let _start_lsn = disk_consistent_lsn + 1;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline.remote_client.wait_completion().await?;
anyhow::Ok(())
}
}
.await?;
Ok(control_file)
}
async fn terminate_flow_with_error(
timeline: &Arc<Timeline>,
error: anyhow::Error,
storcon_client: &StorageControllerUpcallClient,
cancel: &CancellationToken,
) -> anyhow::Error {
// The import task is aborted on tenant shutdown, so in principle, it should
// never be cancelled. To be on the safe side, check the cancellation tokens
// before marking the import as failed.
if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
let notify_res = storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
timeline.generation,
ShardImportStatus::Error(format!("{error:#}")),
)
.await;
if let Err(_notify_error) = notify_res {
// The [`StorageControllerUpcallClient::put_timeline_import_status`] retries
// forever internally, so errors returned by it can only be due to cancellation.
info!("failed to notify storcon about permanent import error");
}
// Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
error
} else {
anyhow::anyhow!("Import task cancelled")
}
}


@@ -29,10 +29,11 @@
//! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::ops::Range;
use std::sync::Arc;
use anyhow::{bail, ensure};
use anyhow::ensure;
use bytes::Bytes;
use futures::stream::FuturesOrdered;
use itertools::Itertools;
@@ -43,6 +44,7 @@ use pageserver_api::key::{
slru_segment_size_to_key,
};
use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::parse_relfilename;
@@ -59,16 +61,18 @@ use super::Timeline;
use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::context::{DownloadBehavior, RequestContext};
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::pgdatadir_mapping::{
DbDirectory, RelDirectory, SlruSegmentDirectory, TwoPhaseDirectory,
};
use crate::task_mgr::TaskKind;
use crate::tenant::storage_layer::{ImageLayerWriter, Layer};
use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
pub async fn run(
timeline: Arc<Timeline>,
control_file: ControlFile,
storage: RemoteStorageWrapper,
import_progress: Option<ShardImportProgress>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let planner = Planner {
@@ -81,9 +85,31 @@ pub async fn run(
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
// Hash the plan and compare with the hash of the plan we got back from the storage controller.
// If the two match, it means that the planning stage had the same output.
//
// This is not intended to be a cryptographically secure hash.
const SEED: u64 = 42;
let mut hasher = twox_hash::XxHash64::with_seed(SEED);
plan.hash(&mut hasher);
let plan_hash = hasher.finish();
if let Some(progress) = &import_progress {
if plan_hash != progress.import_plan_hash {
anyhow::bail!("Import plan does not match storcon metadata");
}
// Handle collisions on jobs of unequal length
if progress.jobs != plan.jobs.len() {
anyhow::bail!("Import plan job length does not match storcon metadata")
}
}
pausable_failpoint!("import-timeline-pre-execute-pausable");
plan.execute(timeline, import_config, ctx).await
let start_from_job_idx = import_progress.map(|progress| progress.completed);
plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
.await
}
struct Planner {
@@ -93,8 +119,11 @@ struct Planner {
tasks: Vec<AnyImportTask>,
}
#[derive(Hash)]
struct Plan {
jobs: Vec<ChunkProcessingJob>,
// Included here such that it ends up in the hash for the plan
shard: ShardIdentity,
}
impl Planner {
@@ -198,7 +227,10 @@ impl Planner {
pgdata_lsn,
));
Ok(Plan { jobs })
Ok(Plan {
jobs,
shard: self.shard,
})
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
@@ -327,25 +359,45 @@ impl Plan {
async fn execute(
self,
timeline: Arc<Timeline>,
start_after_job_idx: Option<usize>,
import_plan_hash: u64,
import_config: &TimelineImportConfig,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &timeline.cancel);
let mut work = FuturesOrdered::new();
let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
let jobs_in_plan = self.jobs.len();
let mut jobs = self.jobs.into_iter().enumerate().peekable();
let mut results = Vec::new();
let mut jobs = self
.jobs
.into_iter()
.enumerate()
.map(|(idx, job)| (idx + 1, job))
.filter(|(idx, _job)| {
// Filter out any jobs that have been done already
if let Some(start_after) = start_after_job_idx {
*idx > start_after
} else {
true
}
})
.peekable();
let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
// Run import jobs concurrently up to the limit specified by the pageserver configuration.
// Note that we process completed futures in the order of insertion. This will be the
// building block for resuming imports across pageserver restarts or tenant migrations.
while results.len() < jobs_in_plan {
while last_completed_job_idx < jobs_in_plan {
tokio::select! {
permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
let permit = permit.expect("never closed");
let (job_idx, job) = jobs.next().expect("we peeked");
let job_timeline = timeline.clone();
let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
@@ -357,13 +409,33 @@ impl Plan {
},
maybe_complete_job_idx = work.next() => {
match maybe_complete_job_idx {
Some(Ok((_job_idx, res))) => {
results.push(res);
Some(Ok((job_idx, res))) => {
assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
res?;
last_completed_job_idx = job_idx;
if last_completed_job_idx % checkpoint_every == 0 {
storcon_client.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
timeline.generation,
ShardImportStatus::InProgress(Some(ShardImportProgress {
jobs: jobs_in_plan,
completed: last_completed_job_idx,
import_plan_hash,
}))
)
.await
.map_err(|_err| {
anyhow::anyhow!("Shut down while putting timeline import status")
})?;
}
},
Some(Err(_)) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
anyhow::bail!(
"import job panicked or cancelled"
);
}
None => {}
}
@@ -371,17 +443,7 @@ impl Plan {
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
Ok(())
}
}
@@ -553,6 +615,15 @@ struct ImportSingleKeyTask {
buf: Bytes,
}
impl Hash for ImportSingleKeyTask {
fn hash<H: Hasher>(&self, state: &mut H) {
let ImportSingleKeyTask { key, buf } = self;
key.hash(state);
buf.hash(state);
}
}
impl ImportSingleKeyTask {
fn new(key: Key, buf: Bytes) -> Self {
ImportSingleKeyTask { key, buf }
@@ -581,6 +652,20 @@ struct ImportRelBlocksTask {
storage: RemoteStorageWrapper,
}
impl Hash for ImportRelBlocksTask {
fn hash<H: Hasher>(&self, state: &mut H) {
let ImportRelBlocksTask {
shard_identity: _,
key_range,
path,
storage: _,
} = self;
key_range.hash(state);
path.hash(state);
}
}
impl ImportRelBlocksTask {
fn new(
shard_identity: ShardIdentity,
@@ -665,6 +750,19 @@ struct ImportSlruBlocksTask {
storage: RemoteStorageWrapper,
}
impl Hash for ImportSlruBlocksTask {
fn hash<H: Hasher>(&self, state: &mut H) {
let ImportSlruBlocksTask {
key_range,
path,
storage: _,
} = self;
key_range.hash(state);
path.hash(state);
}
}
impl ImportSlruBlocksTask {
fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
ImportSlruBlocksTask {
@@ -707,6 +805,7 @@ impl ImportTask for ImportSlruBlocksTask {
}
}
#[derive(Hash)]
enum AnyImportTask {
SingleKey(ImportSingleKeyTask),
RelBlocks(ImportRelBlocksTask),
@@ -753,6 +852,7 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
}
}
#[derive(Hash)]
struct ChunkProcessingJob {
range: Range<Key>,
tasks: Vec<AnyImportTask>,
@@ -790,17 +890,51 @@ impl ChunkProcessingJob {
let resident_layer = if nimages > 0 {
let (desc, path) = writer.finish(ctx).await?;
{
let guard = timeline.layers.read().await;
let existing_layer = guard.try_get_from_key(&desc.key());
if let Some(layer) = existing_layer {
if layer.metadata().generation != timeline.generation {
return Err(anyhow::anyhow!(
"Import attempted to rewrite layer file in the same generation: {}",
layer.local_path()
));
}
}
}
Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
} else {
// dropping the writer cleans up
return Ok(());
};
// this is sharing the same code as create_image_layers
// The same import job might run multiple times since not each job is checkpointed.
// Hence, we must support the cases where the layer already exists. We cannot be
// certain that the existing layer is identical to the new one, so in that case
// we replace the old layer with the one we just generated.
let mut guard = timeline.layers.write().await;
guard
.open_mut()?
.track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
let existing_layer = guard
.try_get_from_key(&resident_layer.layer_desc().key())
.cloned();
match existing_layer {
Some(existing) => {
guard.open_mut()?.rewrite_layers(
&[(existing.clone(), resident_layer.clone())],
&[],
&timeline.metrics,
);
}
None => {
guard
.open_mut()?
.track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
}
}
crate::tenant::timeline::drop_wlock(guard);
timeline


@@ -4082,7 +4082,7 @@ impl Service {
/// imports are stored in the database).
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
shard_id=%import.timeline_id,
timeline_id=%import.timeline_id,
))]
async fn finalize_timeline_import(
self: &Arc<Self>,


@@ -5,7 +5,7 @@ use http_utils::error::ApiError;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use pageserver_api::models::ShardImportStatus;
use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
@@ -28,7 +28,12 @@ impl ShardImportStatuses {
ShardImportStatuses(
shards
.into_iter()
.map(|ts_id| (ts_id, ShardImportStatus::InProgress))
.map(|ts_id| {
(
ts_id,
ShardImportStatus::InProgress(None::<ShardImportProgress>),
)
})
.collect(),
)
}


@@ -1255,6 +1255,12 @@ class NeonEnv:
"no_sync": True,
# Look for gaps in WAL received from safekeepeers
"validate_wal_contiguity": True,
# TODO(vlad): make these configurable through the builder
"timeline_import_config": {
"import_job_concurrency": 4,
"import_job_soft_size_limit": 512 * 1024,
"import_job_checkpoint_threshold": 4,
},
}
# Batching (https://github.com/neondatabase/neon/issues/9377):