From 2e7e5f4f3add585bfb80da84320ae62f94b6b78e Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Fri, 13 Sep 2024 05:41:35 +0300
Subject: [PATCH] Refactor how the image layer partitioning is done

It got pretty ugly after the last commit. Refactor it so that we first
collect all the key ranges that need to be written out into a list of
tasks, then partition the tasks into image layers, and then write them
out. This will be much easier to parallelize, but that's not included
in this commit yet.
---
 libs/pageserver_api/src/keyspace.rs |   4 +-
 pageserver/src/pg_import.rs         | 428 ++++++++++++++++------------
 2 files changed, 248 insertions(+), 184 deletions(-)

diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs
index 401887d362..c55b9e9484 100644
--- a/libs/pageserver_api/src/keyspace.rs
+++ b/libs/pageserver_api/src/keyspace.rs
@@ -48,7 +48,7 @@ pub struct ShardedRange<'a> {
 
 // Calculate the size of a range within the blocks of the same relation, or spanning only the
 // top page in the previous relation's space.
-fn contiguous_range_len(range: &Range<Key>) -> u32 {
+pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
     debug_assert!(is_contiguous_range(range));
     if range.start.field6 == 0xffffffff {
         range.end.field6 + 1
@@ -67,7 +67,7 @@
 /// This matters, because:
 /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
 /// - Within such ranges, we may calculate distances using simple subtraction of field6.
-fn is_contiguous_range(range: &Range<Key>) -> bool {
+pub fn is_contiguous_range(range: &Range<Key>) -> bool {
     range.start.field1 == range.end.field1
         && range.start.field2 == range.end.field2
         && range.start.field3 == range.end.field3
diff --git a/pageserver/src/pg_import.rs b/pageserver/src/pg_import.rs
index afb918c15a..ea5a4cde30 100644
--- a/pageserver/src/pg_import.rs
+++ b/pageserver/src/pg_import.rs
@@ -1,4 +1,4 @@
-use std::{fs::metadata, time::Instant};
+use std::fs::metadata;
 
 use anyhow::{bail, ensure, Context};
 use bytes::Bytes;
@@ -26,12 +26,14 @@ use crate::tenant::remote_timeline_client;
 use crate::tenant::remote_timeline_client::LayerFileMetadata;
 use pageserver_api::shard::ShardIndex;
 use pageserver_api::key::Key;
+use pageserver_api::keyspace::{is_contiguous_range, contiguous_range_len};
+use pageserver_api::keyspace::singleton_range;
 use pageserver_api::reltag::SlruKind;
 use pageserver_api::key::{slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, TWOPHASEDIR_KEY, CONTROLFILE_KEY, CHECKPOINT_KEY};
-use pageserver_api::key::rel_key_range;
 use utils::bin_ser::BeSer;
 use std::collections::HashSet;
+use std::ops::Range;
 
 pub struct PgImportEnv {
     ctx: RequestContext,
@@ -41,6 +43,8 @@ pub struct PgImportEnv {
 
     pgdata_lsn: Lsn,
 
+    tasks: Vec<AnyImportTask>,
+
     layers: Vec,
 }
@@ -69,6 +73,7 @@ impl PgImportEnv {
             tsi,
             pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported
 
+            tasks: Vec::new(),
             layers: Vec::new(),
         })
     }
@@ -90,48 +95,54 @@ impl PgImportEnv {
 
         // Import dbdir (00:00:00 keyspace)
-        // This is just constructed here, but will be written to the image layer in the first call to import_db()
-        let mut dbdir_buf = Some(Bytes::from(DbDirectory::ser(&DbDirectory {
+        let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
             dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(),
-        })?));
-        // Import databases (00:spcnode:dbnode keyspace for each db)
-        let mut start_key = Key::MIN;
-        for db in datadir.dbs {
-            start_key = self.import_db(start_key, &db, dbdir_buf.take()).await?;
-        }
+        })?);
+        self.tasks.push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
 
-        let mut tail_layer = ImageLayerWriter::new(
-            &self.conf,
-            self.tli,
-            self.tsi,
-            &(start_key..Key::NON_L0_MAX),
-            pgdata_lsn,
-            &self.ctx,
-        ).await?;
+        // Import databases (00:spcnode:dbnode keyspace for each db)
+        for db in datadir.dbs {
+            self.import_db(&db).await?;
+        }
 
         // Import SLRUs
 
         // pg_xact (01:00 keyspace)
-        self.import_slru(&mut tail_layer, SlruKind::Clog, &pgdata_path.join("pg_xact")).await?;
+        self.import_slru(SlruKind::Clog, &pgdata_path.join("pg_xact")).await?;
         // pg_multixact/members (01:01 keyspace)
-        self.import_slru(&mut tail_layer, SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?;
+        self.import_slru(SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?;
         // pg_multixact/offsets (01:02 keyspace)
-        self.import_slru(&mut tail_layer, SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?;
+        self.import_slru(SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?;
 
         // Import pg_twophase.
         // TODO: as empty
         let twophasedir_buf = TwoPhaseDirectory::ser(
             &TwoPhaseDirectory { xids: HashSet::new() }
         )?;
-        tail_layer.put_image(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf), &self.ctx).await?;
+        self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf))));
 
         // Controlfile, checkpoint
-        tail_layer.put_image(CONTROLFILE_KEY, Bytes::from(controlfile_buf), &self.ctx).await?;
+        self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CONTROLFILE_KEY, Bytes::from(controlfile_buf))));
 
         let checkpoint_buf = control_file.checkPointCopy.encode()?;
-        tail_layer.put_image(CHECKPOINT_KEY, checkpoint_buf, &self.ctx).await?;
+        self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CHECKPOINT_KEY, checkpoint_buf)));
 
-        let layerdesc = tail_layer.finish_raw(&self.ctx).await?;
-        self.layers.push(layerdesc);
+
+        // Execution time
+        let mut last_end_key = Key::MIN;
+        let mut current_chunk = Vec::new();
+        let mut current_chunk_size: usize = 0;
+        for task in std::mem::take(&mut self.tasks).into_iter() {
+            if current_chunk_size + task.total_size() > 1024*1024*1024 {
+                let key_range = last_end_key..task.key_range().start;
+                self.flush_chunk(key_range.clone(), std::mem::take(&mut current_chunk)).await?;
+                last_end_key = key_range.end;
+                current_chunk_size = 0;
+            }
+            current_chunk_size += task.total_size();
+            current_chunk.push(task);
+        }
+        self.flush_chunk(last_end_key..Key::NON_L0_MAX, current_chunk).await?;
 
         // should we anything about the wal?
@@ -143,146 +154,45 @@ impl PgImportEnv {
 
     async fn import_db(
         &mut self,
-        start_key: Key,
         db: &PgDataDirDb,
-        mut dbdir_buf: Option<Bytes>,
-    ) -> anyhow::Result<Key> {
+    ) -> anyhow::Result<()> {
         debug!(
             "Importing database (path={}, tablespace={}, dboid={})",
             db.path, db.spcnode, db.dboid
         );
 
+        // Import relmap (00:spcnode:dbnode:00:*:00)
+        let relmap_key = relmap_file_key(db.spcnode, db.dboid);
+        debug!("Constructing relmap entry, key {relmap_key}");
+        let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?;
+        let relmap_buf = read_all_bytes(&mut relmap_file).await?;
+        self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(relmap_key, relmap_buf)));
+
+        // Import reldir (00:spcnode:dbnode:00:*:01)
+        let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
+        debug!("Constructing reldirs entry, key {reldir_key}");
+        let reldir_buf = RelDirectory::ser(&RelDirectory {
+            rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(),
+        })?;
+        self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(reldir_key, Bytes::from(reldir_buf))));
+
         // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
         // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
-
-        let mut chunks: Vec<Vec<&PgDataDirDbFile>> = Vec::new();
-        let mut accumulated_bytes: usize = 0;
-        let mut current_chunk: Vec<&PgDataDirDbFile> = Vec::new();
-
         for file in &db.files {
-            let real_size = if let Some(nblocks) = file.nblocks {
-                nblocks * 8192
-            } else {
-                1024*1024*1024
-            };
-            if accumulated_bytes + real_size >= 512*1024*1024 {
-                chunks.push(current_chunk);
-                current_chunk = Vec::new();
-                accumulated_bytes = 0;
+            let len = metadata(&file.path)?.len() as usize;
+            ensure!(len % 8192 == 0);
+            let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
+            let start_key = rel_block_to_key(file.rel_tag, start_blk);
+            let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
+            self.tasks.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(start_key..end_key, &file.path)));
+
+            // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
+            if let Some(nblocks) = file.nblocks {
+                let size_key = rel_size_to_key(file.rel_tag);
+                //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
+                let buf = nblocks.to_le_bytes();
+                self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(size_key, Bytes::from(buf.to_vec()))));
             }
-            current_chunk.push(file);
-            accumulated_bytes += real_size;
-        };
-        if !current_chunk.is_empty() {
-            chunks.push(current_chunk);
-        }
-
-        let mut last_key_end: Key = start_key;
-        let mut first = true;
-        for chunk in chunks {
-            let key_start = last_key_end;
-            let last_file = chunk.last().unwrap();
-            let key_end = if last_file.nblocks.is_some() {
-                rel_key_range(last_file.rel_tag).end
-            } else {
-                let end_blknum = (last_file.segno + 1) * (1024 * 1024 * 1024 / 8192);
-                rel_block_to_key(last_file.rel_tag, end_blknum)
-            };
-            let mut layer_writer = ImageLayerWriter::new(
-                &self.conf,
-                self.tli,
-                self.tsi,
-                &(key_start..key_end),
-                self.pgdata_lsn,
-                &self.ctx,
-            ).await?;
-
-            if first {
-                if let Some(dbdir_buf) = dbdir_buf.take() {
-                    layer_writer.put_image(DBDIR_KEY, dbdir_buf, &self.ctx).await?;
-                }
-
-                // Import relmap (00:spcnode:dbnode:00:*:00)
-                let relmap_key = relmap_file_key(db.spcnode, db.dboid);
-                debug!("Constructing relmap entry, key {relmap_key}");
-                let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?;
-                let relmap_buf = read_all_bytes(&mut relmap_file).await?;
-                layer_writer.put_image(relmap_key, relmap_buf, &self.ctx).await?;
-
-                // Import reldir (00:spcnode:dbnode:00:*:01)
-                let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
-                debug!("Constructing reldirs entry, key {reldir_key}");
-                let reldir_buf = RelDirectory::ser(&RelDirectory {
-                    rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(),
-                })?;
-                layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?;
-
-                first = false;
-            }
-
-            for file in chunk {
-                self.import_rel_file(&mut layer_writer, &file).await?;
-            }
-            last_key_end = key_end;
-
-            let layerdesc = layer_writer.finish_raw(&self.ctx).await?;
-            self.layers.push(layerdesc);
-        }
-
-        Ok(last_key_end)
-    }
-
-    async fn import_rel_file(
-        &mut self,
-        layer_writer: &mut ImageLayerWriter,
-        segment: &PgDataDirDbFile,
-    ) -> anyhow::Result<()> {
-        let (path, rel_tag, segno) = (&segment.path, segment.rel_tag, segment.segno);
-
-        debug!("Importing relation file (path={path}, rel_tag={rel_tag}, segno={segno})");
-        let start = Instant::now();
-
-        let mut reader = tokio::fs::File::open(&path).await?;
-        let len = metadata(&path)?.len() as usize;
-
-        let mut buf: [u8; 8192] = [0u8; 8192];
-
-        ensure!(len % BLCKSZ as usize == 0);
-        let nblocks = len / BLCKSZ as usize;
-
-        let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
-
-        loop {
-            let r = reader.read_exact(&mut buf).await;
-            match r {
-                Ok(_) => {
-                    let key = rel_block_to_key(rel_tag.clone(), blknum);
-                    layer_writer.put_image(key, Bytes::copy_from_slice(&buf), &self.ctx).await?;
-                }
-
-                Err(err) => match err.kind() {
-                    std::io::ErrorKind::UnexpectedEof => {
-                        // reached EOF. That's expected.
-                        let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
-                        ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
-                        break;
-                    }
-                    _ => {
-                        bail!("error reading file {}: {:#}", path, err);
-                    }
-                },
-            };
-            blknum += 1;
-        }
-
-        debug!("Importing relation file (path={path}, rel_tag={rel_tag}, segno={segno}): done in {:.6} s", start.elapsed().as_secs_f64());
-
-        // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
-        if let Some(nblocks) = segment.nblocks {
-            let size_key = rel_size_to_key(rel_tag);
-            debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
-            let buf = nblocks.to_le_bytes();
-            layer_writer.put_image(size_key, Bytes::from(buf.to_vec()), &self.ctx).await?;
         }
 
         Ok(())
@@ -290,7 +200,6 @@ impl PgImportEnv {
 
     async fn import_slru(
         &mut self,
-        layer_writer: &mut ImageLayerWriter,
         kind: SlruKind,
         path: &Utf8PathBuf,
     ) -> anyhow::Result<()> {
@@ -312,39 +221,22 @@ impl PgImportEnv {
             segments: segnos,
         };
         let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
-        layer_writer.put_image(slrudir_key, slrudir_buf.into(), &self.ctx).await?;
+        self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(slrudir_key, Bytes::from(slrudir_buf))));
 
         for (segpath, segno) in segments {
             // SlruSegBlocks for each segment
             let p = path.join(Utf8PathBuf::from(segpath));
-            let mut reader = tokio::fs::File::open(&p).await
-                .context(format!("opening {}", &p))?;
-
-            let mut rpageno = 0;
-            loop {
-                let mut buf: Vec<u8> = Vec::new();
-                buf.resize(8192, 0);
-                let r = reader.read_exact(&mut buf).await;
-                match r {
-                    Ok(_) => {},
-                    Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
-                        // reached EOF. That's expected
-                        break;
-                    }
-                    Err(err) => {
-                        bail!("error reading file {}: {:#}", &p, err);
-                    }
-                };
-                let slruseg_key = slru_block_to_key(kind, segno, rpageno);
-                layer_writer.put_image(slruseg_key, Bytes::from(buf), &self.ctx).await?;
-                rpageno += 1;
-            }
-            let npages: u32 = rpageno;
+            let file_size = std::fs::metadata(&p)?.len();
+            ensure!(file_size % 8192 == 0);
+            let nblocks = u32::try_from(file_size / 8192)?;
+            let start_key = slru_block_to_key(kind, segno, 0);
+            let end_key = slru_block_to_key(kind, segno, nblocks);
+            self.tasks.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(start_key..end_key, &p)));
 
             // Followed by SlruSegSize
             let segsize_key = slru_segment_size_to_key(kind, segno);
-            let segsize_buf = npages.to_le_bytes();
-            layer_writer.put_image(segsize_key, Bytes::copy_from_slice(&segsize_buf), &self.ctx).await?;
+            let segsize_buf = nblocks.to_le_bytes();
+            self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(segsize_key, Bytes::copy_from_slice(&segsize_buf))));
         }
         Ok(())
     }
@@ -392,6 +284,26 @@ impl PgImportEnv {
 
         Ok(())
     }
+
+
+    async fn flush_chunk(&mut self, key_range: Range<Key>, chunk: Vec<AnyImportTask>) -> anyhow::Result<()> {
+        let mut layer = ImageLayerWriter::new(
+            &self.conf,
+            self.tli,
+            self.tsi,
+            &key_range,
+            self.pgdata_lsn,
+            &self.ctx,
+        ).await?;
+
+        for task in chunk {
+            task.doit(&mut layer, &self.ctx).await?;
+        }
+
+        let layerdesc = layer.finish_raw(&self.ctx).await?;
+        self.layers.push(layerdesc);
+        Ok(())
+    }
 }
 
 //
@@ -529,3 +441,155 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> anyhow::Result<Bytes>
     reader.read_to_end(&mut buf).await?;
     Ok(Bytes::from(buf))
 }
+
+trait ImportTask {
+    fn key_range(&self) -> Range<Key>;
+
+    fn total_size(&self) -> usize {
+        if is_contiguous_range(&self.key_range()) {
+            contiguous_range_len(&self.key_range()) as usize * 8192
+        } else {
+            u32::MAX as usize
+        }
+    }
+
+    async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()>;
+}
+
+struct ImportSingleKeyTask {
+    key: Key,
+    buf: Bytes,
+}
+
+impl ImportSingleKeyTask {
+    fn new(key: Key, buf: Bytes) -> Self {
+        ImportSingleKeyTask { key, buf }
+    }
+}
+
+impl ImportTask for ImportSingleKeyTask {
+    fn key_range(&self) -> Range<Key> {
+        singleton_range(self.key)
+    }
+
+    async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
+        layer_writer.put_image(self.key, self.buf, ctx).await?;
+        Ok(())
+    }
+}
+
+struct ImportRelBlocksTask {
+    key_range: Range<Key>,
+    path: Utf8PathBuf,
+}
+
+impl ImportRelBlocksTask {
+    fn new(key_range: Range<Key>, path: &Utf8Path) -> Self {
+        ImportRelBlocksTask {
+            key_range,
+            path: path.into()
+        }
+    }
+}
+
+impl ImportTask for ImportRelBlocksTask {
+    fn key_range(&self) -> Range<Key> {
+        self.key_range.clone()
+    }
+
+    async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
+        debug!("Importing relation file {}", self.path);
+        let mut reader = tokio::fs::File::open(&self.path).await?;
+        let mut buf: [u8; 8192] = [0u8; 8192];
+
+        let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
+        let (_rel_tag, end_blk) = self.key_range.end.to_rel_block()?;
+        let mut blknum = start_blk;
+        while blknum < end_blk {
+            reader.read_exact(&mut buf).await?;
+            let key = rel_block_to_key(rel_tag.clone(), blknum);
+            layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?;
+            blknum += 1;
+        }
+        Ok(())
+    }
+}
+
+struct ImportSlruBlocksTask {
+    key_range: Range<Key>,
+    path: Utf8PathBuf,
+}
+
+impl ImportSlruBlocksTask {
+    fn new(key_range: Range<Key>, path: &Utf8Path) -> Self {
+        ImportSlruBlocksTask {
+            key_range,
+            path: path.into()
+        }
+    }
+}
+
+impl ImportTask for ImportSlruBlocksTask {
+    fn key_range(&self) -> Range<Key> {
+        self.key_range.clone()
+    }
+
+    async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
+        debug!("Importing SLRU segment file {}", self.path);
+        let mut reader = tokio::fs::File::open(&self.path).await
+            .context(format!("opening {}", &self.path))?;
+        let mut buf: [u8; 8192] = [0u8; 8192];
+
+        let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
+        let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
+        let mut blknum = start_blk;
+        while blknum < end_blk {
+            reader.read_exact(&mut buf).await?;
+            let key = slru_block_to_key(kind, segno, blknum);
+            layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?;
+            blknum += 1;
+        }
+        Ok(())
+    }
+}
+
+enum AnyImportTask {
+    SingleKey(ImportSingleKeyTask),
+    RelBlocks(ImportRelBlocksTask),
+    SlruBlocks(ImportSlruBlocksTask),
+}
+
+impl ImportTask for AnyImportTask {
+    fn key_range(&self) -> Range<Key> {
+        match self {
+            Self::SingleKey(t) => t.key_range(),
+            Self::RelBlocks(t) => t.key_range(),
+            Self::SlruBlocks(t) => t.key_range()
+        }
+    }
+    async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
+        match self {
+            Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
+            Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
+            Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
+        }
+    }
+}
+
+impl From<ImportSingleKeyTask> for AnyImportTask {
+    fn from(t: ImportSingleKeyTask) -> Self {
+        Self::SingleKey(t)
+    }
+}
+
+impl From<ImportRelBlocksTask> for AnyImportTask {
+    fn from(t: ImportRelBlocksTask) -> Self {
+        Self::RelBlocks(t)
+    }
+}
+
+impl From<ImportSlruBlocksTask> for AnyImportTask {
+    fn from(t: ImportSlruBlocksTask) -> Self {
+        Self::SlruBlocks(t)
+    }
+}
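
The partitioning pass introduced in import_timeline is easiest to see outside the diff: each task reports the key range it will write and an estimated size, tasks are accumulated into a chunk until the next one would push the chunk past the 1 GiB target, the chunk is closed at the key where the next task begins (so consecutive image layers tile the keyspace without gaps), and the final chunk extends to the end of the keyspace. Below is a standalone sketch of that greedy algorithm; it is not part of the patch, and the Task struct, partition() helper, and u32 stand-in keys are simplifications of the real AnyImportTask, Key ranges, and flush_chunk()/ImageLayerWriter machinery.

// Sketch only: simplified stand-in types, not the pageserver's real API.
use std::ops::Range;

const TARGET_LAYER_SIZE: usize = 1024 * 1024 * 1024; // 1 GiB, as in the patch

struct Task {
    key_range: Range<u32>, // stand-in for Range<Key>
    size: usize,           // stand-in for ImportTask::total_size()
}

fn partition(tasks: Vec<Task>) -> Vec<(Range<u32>, Vec<Task>)> {
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0usize;
    let mut last_end = 0u32; // stand-in for Key::MIN

    for task in tasks {
        // Close the current chunk before it would exceed the target size.
        // The chunk's key range ends where the next task begins, so the
        // resulting image layers cover the keyspace contiguously.
        if current_size + task.size > TARGET_LAYER_SIZE && !current.is_empty() {
            let range = last_end..task.key_range.start;
            last_end = range.end;
            chunks.push((range, std::mem::take(&mut current)));
            current_size = 0;
        }
        current_size += task.size;
        current.push(task);
    }
    // The final chunk runs to the end of the keyspace (Key::NON_L0_MAX in the patch).
    chunks.push((last_end..u32::MAX, current));
    chunks
}

fn main() {
    let tasks = vec![
        Task { key_range: 0..10, size: 300 << 20 },
        Task { key_range: 10..20, size: 900 << 20 },
        Task { key_range: 20..30, size: 100 << 20 },
    ];
    for (range, chunk) in partition(tasks) {
        println!("layer {:?}: {} tasks", range, chunk.len());
    }
}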