diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index b64c42a808..5b0c13dd89 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -182,6 +182,7 @@ pub struct ConfigToml { pub tracing: Option, pub enable_tls_page_service_api: bool, pub dev_mode: bool, + pub timeline_import_config: TimelineImportConfig, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -300,6 +301,12 @@ impl From for tracing_utils::Protocol { } } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct TimelineImportConfig { + pub import_job_concurrency: NonZeroUsize, + pub import_job_soft_size_limit: NonZeroUsize, +} + pub mod statvfs { pub mod mock { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -659,6 +666,10 @@ impl Default for ConfigToml { tracing: None, enable_tls_page_service_api: false, dev_mode: false, + timeline_import_config: TimelineImportConfig { + import_job_concurrency: NonZeroUsize::new(128).unwrap(), + import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(), + }, } } } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index ded2805602..7e773f56b3 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -230,6 +230,8 @@ pub struct PageServerConf { /// such as authentication requirements for HTTP and PostgreSQL APIs. /// This is insecure and should only be used in development environments. pub dev_mode: bool, + + pub timeline_import_config: pageserver_api::config::TimelineImportConfig, } /// Token for authentication to safekeepers @@ -404,6 +406,7 @@ impl PageServerConf { tracing, enable_tls_page_service_api, dev_mode, + timeline_import_config, } = config_toml; let mut conf = PageServerConf { @@ -457,6 +460,7 @@ impl PageServerConf { tracing, enable_tls_page_service_api, dev_mode, + timeline_import_config, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index 6ab6b90cb6..c4a8df39a3 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -149,14 +149,7 @@ pub async fn doit( } .await?; - flow::run( - timeline.clone(), - base_lsn, - control_file, - storage.clone(), - ctx, - ) - .await?; + flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?; // // Communicate that shard is done. diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index c6d2944769..34c073365d 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -34,7 +34,9 @@ use std::sync::Arc; use anyhow::{bail, ensure}; use bytes::Bytes; +use futures::stream::FuturesOrdered; use itertools::Itertools; +use pageserver_api::config::TimelineImportConfig; use pageserver_api::key::{ CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key, @@ -46,8 +48,9 @@ use pageserver_api::shard::ShardIdentity; use postgres_ffi::relfile_utils::parse_relfilename; use postgres_ffi::{BLCKSZ, pg_constants}; use remote_storage::RemotePath; -use tokio::task::JoinSet; -use tracing::{Instrument, debug, info_span, instrument}; +use tokio::sync::Semaphore; +use tokio_stream::StreamExt; +use tracing::{debug, instrument}; use utils::bin_ser::BeSer; use utils::lsn::Lsn; @@ -63,37 +66,39 @@ use crate::tenant::storage_layer::{ImageLayerWriter, Layer}; pub async fn run( timeline: Arc, - pgdata_lsn: Lsn, control_file: ControlFile, storage: RemoteStorageWrapper, ctx: &RequestContext, ) -> anyhow::Result<()> { - Flow { - timeline, - pgdata_lsn, + let planner = Planner { control_file, - tasks: Vec::new(), - storage, - } - .run(ctx) - .await + storage: storage.clone(), + shard: timeline.shard_identity, + tasks: Vec::default(), + }; + + let import_config = &timeline.conf.timeline_import_config; + let plan = planner.plan(import_config).await?; + plan.execute(timeline, import_config, ctx).await } -struct Flow { - timeline: Arc, - pgdata_lsn: Lsn, +struct Planner { control_file: ControlFile, - tasks: Vec, storage: RemoteStorageWrapper, + shard: ShardIdentity, + tasks: Vec, } -impl Flow { - /// Perform the ingestion into [`Self::timeline`]. - /// Assumes the timeline is empty (= no layers). - pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> { - let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align(); +struct Plan { + jobs: Vec, +} - self.pgdata_lsn = pgdata_lsn; +impl Planner { + /// Creates an import plan + /// + /// This function is and must remain pure: given the same input, it will generate the same import plan. + async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result { + let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align(); let datadir = PgDataDir::new(&self.storage).await?; @@ -115,7 +120,7 @@ impl Flow { } // Import SLRUs - if self.timeline.tenant_shard_id.is_shard_zero() { + if self.shard.is_shard_zero() { // pg_xact (01:00 keyspace) self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact")) .await?; @@ -166,14 +171,16 @@ impl Flow { let mut last_end_key = Key::MIN; let mut current_chunk = Vec::new(); let mut current_chunk_size: usize = 0; - let mut parallel_jobs = Vec::new(); + let mut jobs = Vec::new(); for task in std::mem::take(&mut self.tasks).into_iter() { - if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 { + if current_chunk_size + task.total_size() + > import_config.import_job_soft_size_limit.into() + { let key_range = last_end_key..task.key_range().start; - parallel_jobs.push(ChunkProcessingJob::new( + jobs.push(ChunkProcessingJob::new( key_range.clone(), std::mem::take(&mut current_chunk), - &self, + pgdata_lsn, )); last_end_key = key_range.end; current_chunk_size = 0; @@ -181,45 +188,13 @@ impl Flow { current_chunk_size += task.total_size(); current_chunk.push(task); } - parallel_jobs.push(ChunkProcessingJob::new( + jobs.push(ChunkProcessingJob::new( last_end_key..Key::MAX, current_chunk, - &self, + pgdata_lsn, )); - // Start all jobs simultaneosly - let mut work = JoinSet::new(); - // TODO: semaphore? - for job in parallel_jobs { - let ctx: RequestContext = - ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error); - work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job"))); - } - let mut results = Vec::new(); - while let Some(result) = work.join_next().await { - match result { - Ok(res) => { - results.push(res); - } - Err(_joinset_err) => { - results.push(Err(anyhow::anyhow!( - "parallel job panicked or cancelled, check pageserver logs" - ))); - } - } - } - - if results.iter().all(|r| r.is_ok()) { - Ok(()) - } else { - let mut msg = String::new(); - for result in results { - if let Err(err) = result { - msg.push_str(&format!("{err:?}\n\n")); - } - } - bail!("Some parallel jobs failed:\n\n{msg}"); - } + Ok(Plan { jobs }) } #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))] @@ -266,7 +241,7 @@ impl Flow { let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32); self.tasks .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new( - *self.timeline.get_shard_identity(), + self.shard, start_key..end_key, &file.path, self.storage.clone(), @@ -289,7 +264,7 @@ impl Flow { } async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> { - assert!(self.timeline.tenant_shard_id.is_shard_zero()); + assert!(self.shard.is_shard_zero()); let segments = self.storage.listfilesindir(path).await?; let segments: Vec<(String, u32, usize)> = segments @@ -344,6 +319,68 @@ impl Flow { } } +impl Plan { + async fn execute( + self, + timeline: Arc, + import_config: &TimelineImportConfig, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let mut work = FuturesOrdered::new(); + let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into())); + + let jobs_in_plan = self.jobs.len(); + + let mut jobs = self.jobs.into_iter().enumerate().peekable(); + let mut results = Vec::new(); + + // Run import jobs concurrently up to the limit specified by the pageserver configuration. + // Note that we process completed futures in the oreder of insertion. This will be the + // building block for resuming imports across pageserver restarts or tenant migrations. + while results.len() < jobs_in_plan { + tokio::select! { + permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => { + let permit = permit.expect("never closed"); + let (job_idx, job) = jobs.next().expect("we peeked"); + let job_timeline = timeline.clone(); + let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error); + + work.push_back(tokio::task::spawn(async move { + let _permit = permit; + let res = job.run(job_timeline, &ctx).await; + (job_idx, res) + })); + }, + maybe_complete_job_idx = work.next() => { + match maybe_complete_job_idx { + Some(Ok((_job_idx, res))) => { + results.push(res); + }, + Some(Err(_)) => { + results.push(Err(anyhow::anyhow!( + "parallel job panicked or cancelled, check pageserver logs" + ))); + } + None => {} + } + } + } + } + + if results.iter().all(|r| r.is_ok()) { + Ok(()) + } else { + let mut msg = String::new(); + for result in results { + if let Err(err) = result { + msg.push_str(&format!("{err:?}\n\n")); + } + } + bail!("Some parallel jobs failed:\n\n{msg}"); + } + } +} + // // dbdir iteration tools // @@ -713,7 +750,6 @@ impl From for AnyImportTask { } struct ChunkProcessingJob { - timeline: Arc, range: Range, tasks: Vec, @@ -721,25 +757,24 @@ struct ChunkProcessingJob { } impl ChunkProcessingJob { - fn new(range: Range, tasks: Vec, env: &Flow) -> Self { - assert!(env.pgdata_lsn.is_valid()); + fn new(range: Range, tasks: Vec, pgdata_lsn: Lsn) -> Self { + assert!(pgdata_lsn.is_valid()); Self { - timeline: env.timeline.clone(), range, tasks, - pgdata_lsn: env.pgdata_lsn, + pgdata_lsn, } } - async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> { + async fn run(self, timeline: Arc, ctx: &RequestContext) -> anyhow::Result<()> { let mut writer = ImageLayerWriter::new( - self.timeline.conf, - self.timeline.timeline_id, - self.timeline.tenant_shard_id, + timeline.conf, + timeline.timeline_id, + timeline.tenant_shard_id, &self.range, self.pgdata_lsn, - &self.timeline.gate, - self.timeline.cancel.clone(), + &timeline.gate, + timeline.cancel.clone(), ctx, ) .await?; @@ -751,24 +786,20 @@ impl ChunkProcessingJob { let resident_layer = if nimages > 0 { let (desc, path) = writer.finish(ctx).await?; - Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)? + Layer::finish_creating(timeline.conf, &timeline, desc, &path)? } else { // dropping the writer cleans up return Ok(()); }; // this is sharing the same code as create_image_layers - let mut guard = self.timeline.layers.write().await; + let mut guard = timeline.layers.write().await; guard .open_mut()? - .track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics); + .track_new_image_layers(&[resident_layer.clone()], &timeline.metrics); crate::tenant::timeline::drop_wlock(guard); - // Schedule the layer for upload but don't add barriers such as - // wait for completion or index upload, so we don't inhibit upload parallelism. - // TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?) - // TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level. - self.timeline + timeline .remote_client .schedule_layer_file_upload(resident_layer)?;