From 2bf9a350eff1f472e8e832b7069a1eb0b67fbb83 Mon Sep 17 00:00:00 2001
From: Stas Kelvich
Date: Sat, 14 Sep 2024 14:07:35 +0100
Subject: [PATCH] Process chunks in parallel

---
 pageserver/src/pg_import.rs | 113 +++++++++++++++++++++++++++---------
 1 file changed, 84 insertions(+), 29 deletions(-)

diff --git a/pageserver/src/pg_import.rs b/pageserver/src/pg_import.rs
index ea5a4cde30..2fd7e44c63 100644
--- a/pageserver/src/pg_import.rs
+++ b/pageserver/src/pg_import.rs
@@ -7,7 +7,7 @@ use camino::{Utf8Path, Utf8PathBuf};
 use itertools::Itertools;
 use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag};
 use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, ControlFileData, BLCKSZ};
-use tokio::io::AsyncRead;
+use tokio::{io::AsyncRead, task::{self, JoinHandle}};
 use tracing::debug;
 use utils::{id::{NodeId, TenantId, TimelineId}, shard::{ShardCount, ShardNumber, TenantShardId}};
 use walkdir::WalkDir;
@@ -36,7 +36,6 @@ use std::collections::HashSet;
 use std::ops::Range;
 
 pub struct PgImportEnv {
-    ctx: RequestContext,
     conf: &'static PageServerConf,
     tli: TimelineId,
     tsi: TenantShardId,
@@ -51,7 +50,6 @@ impl PgImportEnv {
 
     pub async fn init(dstdir: &Utf8Path, tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<PgImportEnv> {
-        let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
         let config = toml_edit::Document::new();
         let conf = PageServerConf::parse_and_validate(
             NodeId(42),
             &config,
@@ -67,7 +65,6 @@ impl PgImportEnv {
         };
 
         Ok(PgImportEnv {
-            ctx,
             conf,
             tli: timeline_id,
             tsi,
@@ -127,24 +124,47 @@ impl PgImportEnv {
 
         let checkpoint_buf = control_file.checkPointCopy.encode()?;
         self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CHECKPOINT_KEY, checkpoint_buf)));
-
-        // Execution time
+        // Assign parts of the key space to later parallel jobs
         let mut last_end_key = Key::MIN;
         let mut current_chunk = Vec::new();
         let mut current_chunk_size: usize = 0;
+        let mut parallel_jobs = Vec::new();
         for task in std::mem::take(&mut self.tasks).into_iter() {
             if current_chunk_size + task.total_size() > 1024*1024*1024 {
                 let key_range = last_end_key..task.key_range().start;
-                self.flush_chunk(key_range.clone(), std::mem::take(&mut current_chunk)).await?;
+                parallel_jobs.push(ChunkProcessingJob::new(
+                    key_range.clone(),
+                    std::mem::take(&mut current_chunk),
+                    self
+                ));
                 last_end_key = key_range.end;
                 current_chunk_size = 0;
             }
             current_chunk_size += task.total_size();
             current_chunk.push(task);
         }
-        self.flush_chunk(last_end_key..Key::NON_L0_MAX, current_chunk).await?;
+        parallel_jobs.push(ChunkProcessingJob::new(
+            last_end_key..Key::NON_L0_MAX,
+            current_chunk,
+            self
+        ));
 
-        // should we anything about the wal?
+        // Start all jobs simultaneously
+        // TODO: semaphore?
+        let mut handles = vec![];
+        for job in parallel_jobs {
+            let handle: JoinHandle<anyhow::Result<PersistentLayerDesc>> = task::spawn(async move {
+                let layerdesc = job.run().await?;
+                Ok(layerdesc)
+            });
+            handles.push(handle);
+        }
+
+        // Wait for all jobs to complete
+        for handle in handles {
+            let layerdesc = handle.await??;
+            self.layers.push(layerdesc);
+        }
 
         // Create index_part.json file
         self.create_index_part(&control_file).await?;
@@ -284,26 +304,6 @@ impl PgImportEnv {
 
         Ok(())
     }
-
-
-    async fn flush_chunk(&mut self, key_range: Range<Key>, chunk: Vec<AnyImportTask>) -> anyhow::Result<()> {
-        let mut layer = ImageLayerWriter::new(
-            &self.conf,
-            self.tli,
-            self.tsi,
-            &key_range,
-            self.pgdata_lsn,
-            &self.ctx,
-        ).await?;
-
-        for task in chunk {
-            task.doit(&mut layer, &self.ctx).await?;
-        }
-
-        let layerdesc = layer.finish_raw(&self.ctx).await?;
-        self.layers.push(layerdesc);
-        Ok(())
-    }
 }
 
 //
@@ -593,3 +593,58 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
         Self::SlruBlocks(t)
     }
 }
+
+struct ChunkProcessingJob {
+    range: Range<Key>,
+    tasks: Vec<AnyImportTask>,
+
+    dstdir: Utf8PathBuf,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    pgdata_lsn: Lsn,
+}
+
+impl ChunkProcessingJob {
+    fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &PgImportEnv) -> Self {
+        assert!(env.pgdata_lsn.is_valid());
+        Self {
+            range,
+            tasks,
+            dstdir: env.conf.workdir.clone(),
+            tenant_id: env.tsi.tenant_id,
+            timeline_id: env.tli,
+            pgdata_lsn: env.pgdata_lsn,
+        }
+    }
+
+    async fn run(self) -> anyhow::Result<PersistentLayerDesc> {
+        let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
+        let config = toml_edit::Document::new();
+        let conf: &'static PageServerConf = Box::leak(Box::new(PageServerConf::parse_and_validate(
+            NodeId(42),
+            &config,
+            &self.dstdir
+        )?));
+        let tsi = TenantShardId {
+            tenant_id: self.tenant_id,
+            shard_number: ShardNumber(0),
+            shard_count: ShardCount(0),
+        };
+
+        let mut layer = ImageLayerWriter::new(
+            &conf,
+            self.timeline_id,
+            tsi,
+            &self.range,
+            self.pgdata_lsn,
+            &ctx,
+        ).await?;
+
+        for task in self.tasks {
+            task.doit(&mut layer, &ctx).await?;
+        }
+
+        let layerdesc = layer.finish_raw(&ctx).await?;
+        Ok(layerdesc)
+    }
+}
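
A minimal sketch of the "TODO: semaphore?" above: cap how many chunk jobs run
at once with tokio::sync::Semaphore instead of spawning all of them
unthrottled. Job here is a hypothetical stand-in for ChunkProcessingJob, and
the permit count of 4 is an arbitrary placeholder; the real job would build an
image layer rather than double a number.

use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

struct Job(u64);

impl Job {
    async fn run(self) -> anyhow::Result<u64> {
        // Stand-in for the ImageLayerWriter work in ChunkProcessingJob::run.
        Ok(self.0 * 2)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let jobs: Vec<Job> = (0..16u64).map(Job).collect();

    // At most 4 jobs run concurrently; the rest wait for a permit.
    let semaphore = Arc::new(Semaphore::new(4));

    let mut handles: Vec<JoinHandle<anyhow::Result<u64>>> = vec![];
    for job in jobs {
        // Acquire the permit before spawning, so the spawn loop itself throttles.
        let permit = Arc::clone(&semaphore).acquire_owned().await?;
        handles.push(tokio::task::spawn(async move {
            let result = job.run().await?;
            drop(permit); // release the slot as soon as this job finishes
            Ok(result)
        }));
    }

    // Wait for all jobs, surfacing both panics (JoinError) and job errors.
    for handle in handles {
        println!("job finished: {}", handle.await??);
    }
    Ok(())
}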
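
One consequence of ChunkProcessingJob::run as written: every job re-parses the
config and leaks a fresh PageServerConf through Box::leak, once per chunk.
Since PgImportEnv already holds conf: &'static PageServerConf, and a &'static
reference is Copy and Send whenever the pointee is Sync, a possible refinement
is to leak the config once and hand the same reference to every job. A sketch
with a hypothetical stand-in Config type:

use tokio::task::JoinHandle;

// Stand-in for PageServerConf; assumed Sync, so &'static Config is Send.
struct Config {
    workdir: String,
}

fn leak_once(config: Config) -> &'static Config {
    // One deliberate leak for the whole import, not one per chunk job.
    Box::leak(Box::new(config))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let conf: &'static Config = leak_once(Config {
        workdir: "/tmp/pgdata".into(),
    });

    let mut handles: Vec<JoinHandle<()>> = vec![];
    for chunk in 0..4 {
        // &'static Config is Copy, so every task captures the same pointer.
        handles.push(tokio::task::spawn(async move {
            println!("chunk {chunk} uses workdir {}", conf.workdir);
        }));
    }
    for handle in handles {
        handle.await?;
    }
    Ok(())
}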