diff --git a/pageserver/src/pg_import.rs b/pageserver/src/pg_import.rs index c9a2a8e0e7..710eb93954 100644 --- a/pageserver/src/pg_import.rs +++ b/pageserver/src/pg_import.rs @@ -5,10 +5,10 @@ use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use itertools::Itertools; -use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag}; +use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag}; use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, ControlFileData, BLCKSZ}; use tokio::io::AsyncRead; -use tracing::{debug}; +use tracing::debug; use utils::{id::{NodeId, TenantId, TimelineId}, shard::{ShardCount, ShardNumber, TenantShardId}}; use walkdir::WalkDir; @@ -148,23 +148,21 @@ impl PgImportEnv { })?; layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?; - // Import data (00:spcnode:dbnode:reloid:fork:blk) + // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last + // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff) for file in &db.files { - self.import_rel_file(layer_writer, &file.path, &file.rel_tag, file.segno).await?; + self.import_rel_file(layer_writer, &file).await?; }; - // Import relsize (00:spcnode:dbnode:reloid:fork:ff) - Ok(()) } async fn import_rel_file( &mut self, layer_writer: &mut ImageLayerWriter, - path: &Utf8PathBuf, - rel_tag: &RelTag, - segno: u32, + segment: &PgDataDirDbFile, ) -> anyhow::Result<()> { + let (path, rel_tag, segno) = (&segment.path, segment.rel_tag, segment.segno); debug!("Importing relation file (path={path}, rel_tag={rel_tag}, segno={segno})"); let mut reader = tokio::fs::File::open(&path).await?; @@ -200,6 +198,14 @@ impl PgImportEnv { blknum += 1; } + // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff) + if let Some(nblocks) = segment.nblocks { + let size_key = rel_size_to_key(rel_tag); + debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}"); + let buf = nblocks.to_le_bytes(); + layer_writer.put_image(size_key, Bytes::from(buf.to_vec()), &self.ctx).await?; + } + Ok(()) } @@ -263,6 +269,9 @@ struct PgDataDirDbFile { pub path: Utf8PathBuf, pub rel_tag: RelTag, pub segno: u32, + + // Cummulative size of the given fork, set only for the last segment of that fork + pub nblocks: Option, } impl PgDataDir { @@ -306,34 +315,64 @@ impl PgDataDir { impl PgDataDirDb { fn new(db_path: Utf8PathBuf, spcnode: u32, dboid: u32, datadir_path: &Utf8PathBuf) -> Self { - PgDataDirDb { - files: WalkDir::new(&db_path) - .min_depth(1) - .max_depth(2) - .into_iter() - .filter_map(|entry| { - entry.ok().and_then(|path| { - let relfile = path.file_name().to_string_lossy(); - // returns (relnode, forknum, segno) - parse_relfilename(&relfile).ok() - }) + let mut files: Vec = WalkDir::new(&db_path) + .min_depth(1) + .max_depth(2) + .into_iter() + .filter_map(|entry| { + entry.ok().and_then(|path| { + let relfile = path.file_name().to_string_lossy(); + // returns (relnode, forknum, segno) + parse_relfilename(&relfile).ok() }) - .sorted() - .map(|(relnode, forknum, segno)| { - let rel_tag = RelTag { - spcnode, - dbnode: dboid, - relnode, - forknum, - }; + }) + .sorted() + .map(|(relnode, forknum, segno)| { + let rel_tag = RelTag { + spcnode, + dbnode: dboid, + relnode, + forknum, + }; - PgDataDirDbFile { - path: datadir_path.join(rel_tag.to_segfile_name(segno)), - rel_tag, - segno, - } - }) - .collect(), + let path = datadir_path.join(rel_tag.to_segfile_name(segno)); + let len = metadata(&path).unwrap().len() as usize; + assert!(len % BLCKSZ as usize == 0); + let nblocks = len / BLCKSZ as usize; + + PgDataDirDbFile { + path, + rel_tag, + segno, + nblocks: Some(nblocks), // first non-cummulative sizes + } + }) + .collect(); + + // Set cummulative sizes. Do all of that math here, so that later we could easier + // parallelize over segments and know with which segments we need to write relsize + // entry. + let mut cumulative_nblocks: usize= 0; + let mut prev_rel_tag: Option = None; + for i in 0..files.len() { + if prev_rel_tag == Some(files[i].rel_tag) { + cumulative_nblocks += files[i].nblocks.unwrap(); + } else { + cumulative_nblocks = files[i].nblocks.unwrap(); + } + + files[i].nblocks = if i == files.len() - 1 || files[i+1].rel_tag != files[i].rel_tag { + Some(cumulative_nblocks) + } else { + None + }; + + prev_rel_tag = Some(files[i].rel_tag); + } + + + PgDataDirDb { + files, path: db_path, spcnode, dboid,