From eadabeddb892b330a9aa65034adb05141ec64035 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 28 May 2025 16:19:41 +0100 Subject: [PATCH] pageserver: use the same job size throughout the import lifetime (#12026) ## Problem Import planning takes a job size limit as its input. Previously, the job size came from a pageserver config field. This field may change while imports are in progress. If this happens, plans will no longer be identical and the import would fail permanently. ## Summary of Changes Bake the job size into the import progress reported to the storage controller. For new imports, use the value from the pageserver config, and, for existing imports, use the value present in the shard progress. This value is identical for all shards, but we want it to be versioned since future versions of the planner might split the jobs up differently. Hence, it ends up in `ShardImportProgress`. Closes https://github.com/neondatabase/neon/issues/11983 --- libs/pageserver_api/src/models.rs | 3 +++ .../src/tenant/timeline/import_pgdata/flow.rs | 24 ++++++++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 9f3736d57a..e7d612bb7a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -354,6 +354,9 @@ pub struct ShardImportProgressV1 { pub completed: usize, /// Hash of the plan pub import_plan_hash: u64, + /// Soft limit for the job size + /// This needs to remain constant throughout the import + pub job_soft_size_limit: usize, } impl ShardImportStatus { diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index 2ba4ca69ac..0d87a2f135 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -30,6 +30,7 @@ use std::collections::HashSet; use std::hash::{Hash, Hasher}; +use std::num::NonZeroUsize; use 
std::ops::Range; use std::sync::Arc; @@ -100,8 +101,24 @@ async fn run_v1( tasks: Vec::default(), }; - let import_config = &timeline.conf.timeline_import_config; - let plan = planner.plan(import_config).await?; + // Use the job size limit encoded in the progress if we are resuming an import. + // This ensures that imports have stable plans even if the pageserver config changes. + let import_config = { + match &import_progress { + Some(progress) => { + let base = &timeline.conf.timeline_import_config; + TimelineImportConfig { + import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit) + .unwrap(), + import_job_concurrency: base.import_job_concurrency, + import_job_checkpoint_threshold: base.import_job_checkpoint_threshold, + } + } + None => timeline.conf.timeline_import_config.clone(), + } + }; + + let plan = planner.plan(&import_config).await?; // Hash the plan and compare with the hash of the plan we got back from the storage controller. // If the two match, it means that the planning stage had the same output. @@ -126,7 +143,7 @@ async fn run_v1( pausable_failpoint!("import-timeline-pre-execute-pausable"); let start_from_job_idx = import_progress.map(|progress| progress.completed); - plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx) + plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx) .await } @@ -453,6 +470,7 @@ impl Plan { jobs: jobs_in_plan, completed: last_completed_job_idx, import_plan_hash, + job_soft_size_limit: import_config.import_job_soft_size_limit.into(), }; timeline.remote_client.schedule_index_upload_for_file_changes()?;