From b69d103b909866905991ec8cd6f54a43893985f4 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Wed, 4 Jun 2025 11:44:23 +0100
Subject: [PATCH] pageserver: make import job max byte range size configurable
 (#12117)

## Problem

We want to repro an OOM situation, but large partial reads are required.

## Summary of Changes

Make the max partial read size configurable for import jobs.
---
 libs/pageserver_api/src/config.rs             |  3 ++
 .../src/tenant/timeline/import_pgdata/flow.rs | 28 +++++++++++++------
 test_runner/fixtures/neon_fixtures.py         |  1 +
 3 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index 237833b9de..46903965b1 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -330,6 +330,8 @@ pub struct TimelineImportConfig {
     pub import_job_concurrency: NonZeroUsize,
     pub import_job_soft_size_limit: NonZeroUsize,
     pub import_job_checkpoint_threshold: NonZeroUsize,
+    /// Max size of the remote storage partial read done by any job
+    pub import_job_max_byte_range_size: NonZeroUsize,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -735,6 +737,7 @@ impl Default for ConfigToml {
                 import_job_concurrency: NonZeroUsize::new(32).unwrap(),
                 import_job_soft_size_limit: NonZeroUsize::new(256 * 1024 * 1024).unwrap(),
                 import_job_checkpoint_threshold: NonZeroUsize::new(32).unwrap(),
+                import_job_max_byte_range_size: NonZeroUsize::new(4 * 1024 * 1024).unwrap(),
             },
             basebackup_cache_config: None,
             posthog_config: None,
diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
index 2ec9d86720..e003bb6810 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
@@ -100,6 +100,7 @@ async fn run_v1(
                     .unwrap(),
                 import_job_concurrency: base.import_job_concurrency,
                 import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
+                import_job_max_byte_range_size: base.import_job_max_byte_range_size,
             }
         }
         None => timeline.conf.timeline_import_config.clone(),
@@ -441,6 +442,7 @@ impl Plan {
         let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
 
         let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
+        let max_byte_range_size: usize = import_config.import_job_max_byte_range_size.into();
 
         // Run import jobs concurrently up to the limit specified by the pageserver configuration.
         // Note that we process completed futures in the oreder of insertion. This will be the
@@ -456,7 +458,7 @@ impl Plan {
 
                 work.push_back(tokio::task::spawn(async move {
                     let _permit = permit;
-                    let res = job.run(job_timeline, &ctx).await;
+                    let res = job.run(job_timeline, max_byte_range_size, &ctx).await;
                     (job_idx, res)
                 }));
             },
@@ -679,6 +681,7 @@ trait ImportTask {
     async fn doit(
         self,
         layer_writer: &mut ImageLayerWriter,
+        max_byte_range_size: usize,
         ctx: &RequestContext,
     ) -> anyhow::Result<usize>;
 }
@@ -715,6 +718,7 @@ impl ImportTask for ImportSingleKeyTask {
     async fn doit(
         self,
         layer_writer: &mut ImageLayerWriter,
+        _max_byte_range_size: usize,
         ctx: &RequestContext,
     ) -> anyhow::Result<usize> {
         layer_writer.put_image(self.key, self.buf, ctx).await?;
@@ -768,10 +772,9 @@ impl ImportTask for ImportRelBlocksTask {
     async fn doit(
         self,
         layer_writer: &mut ImageLayerWriter,
+        max_byte_range_size: usize,
         ctx: &RequestContext,
     ) -> anyhow::Result<usize> {
-        const MAX_BYTE_RANGE_SIZE: usize = 4 * 1024 * 1024;
-
         debug!("Importing relation file");
 
         let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
@@ -796,7 +799,7 @@ impl ImportTask for ImportRelBlocksTask {
                 assert_eq!(key.len(), 1);
                 assert!(!acc.is_empty());
                 assert!(acc_end > acc_start);
-                if acc_end == start && end - acc_start <= MAX_BYTE_RANGE_SIZE {
+                if acc_end == start && end - acc_start <= max_byte_range_size {
                     acc.push(key.pop().unwrap());
                     Ok((acc, acc_start, end))
                 } else {
@@ -860,6 +863,7 @@ impl ImportTask for ImportSlruBlocksTask {
     async fn doit(
         self,
         layer_writer: &mut ImageLayerWriter,
+        _max_byte_range_size: usize,
         ctx: &RequestContext,
     ) -> anyhow::Result<usize> {
         debug!("Importing SLRU segment file {}", self.path);
@@ -906,12 +910,13 @@ impl ImportTask for AnyImportTask {
     async fn doit(
         self,
         layer_writer: &mut ImageLayerWriter,
+        max_byte_range_size: usize,
         ctx: &RequestContext,
     ) -> anyhow::Result<usize> {
         match self {
-            Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
-            Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
-            Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
+            Self::SingleKey(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
+            Self::RelBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
+            Self::SlruBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
         }
     }
 }
@@ -952,7 +957,12 @@ impl ChunkProcessingJob {
         }
     }
 
-    async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
+    async fn run(
+        self,
+        timeline: Arc<Timeline>,
+        max_byte_range_size: usize,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
         let mut writer = ImageLayerWriter::new(
             timeline.conf,
             timeline.timeline_id,
@@ -967,7 +977,7 @@ impl ChunkProcessingJob {
 
         let mut nimages = 0;
         for task in self.tasks {
-            nimages += task.doit(&mut writer, ctx).await?;
+            nimages += task.doit(&mut writer, max_byte_range_size, ctx).await?;
         }
 
         let resident_layer = if nimages > 0 {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index ab4885ce6b..19d12da5e3 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -423,6 +423,7 @@ class PageserverImportConfig:
             "import_job_concurrency": self.import_job_concurrency,
             "import_job_soft_size_limit": self.import_job_soft_size_limit,
             "import_job_checkpoint_threshold": self.import_job_checkpoint_threshold,
+            "import_job_max_byte_range_size": 4 * 1024 * 1024,  # Pageserver default
         }
 
         return ("timeline_import_config", value)
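
Not part of the patch itself: below is a minimal sketch of how the new knob might be set in pageserver.toml, assuming the TOML table is named after the `ConfigToml` field (`timeline_import_config`, which is also the key returned by the test fixture above). The other keys simply restate the defaults from this patch for context.

    [timeline_import_config]
    import_job_concurrency = 32
    import_job_soft_size_limit = 268435456     # 256 MiB
    import_job_checkpoint_threshold = 32
    # New knob: max size in bytes of any single remote storage partial read
    # performed by an import job (default 4 MiB). Raise it to reproduce large
    # partial reads, or lower it to bound per-job read memory.
    import_job_max_byte_range_size = 4194304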