import relation sizes

Stas Kelvich
2024-09-12 18:19:25 +01:00
parent 80fed9cfb1
commit b81dbc887b


@@ -5,10 +5,10 @@ use bytes::Bytes;
 use camino::{Utf8Path, Utf8PathBuf};
 use itertools::Itertools;
-use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag};
+use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag};
 use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, ControlFileData, BLCKSZ};
 use tokio::io::AsyncRead;
-use tracing::{debug};
+use tracing::debug;
 use utils::{id::{NodeId, TenantId, TimelineId}, shard::{ShardCount, ShardNumber, TenantShardId}};
 use walkdir::WalkDir;
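For orientation, a minimal sketch of the key notation used in the comments throughout this diff. `fmt_key` is a hypothetical pretty-printer, not the pageserver API; the assumption here is that a fork's size lives at a reserved block-number slot (the "ff" suffix in the comments) next to that fork's data blocks.

// Illustration only: the 00:spcnode:dbnode:reloid:fork:blk notation.
fn fmt_key(spcnode: u32, dbnode: u32, reloid: u32, fork: u8, blk: u32) -> String {
    format!("00:{spcnode:X}:{dbnode:X}:{reloid:X}:{fork:X}:{blk:X}")
}

fn main() {
    // A data block vs. the relsize slot for the same fork:
    println!("{}", fmt_key(1663, 5, 16384, 0, 0));          // data block 0
    println!("{}", fmt_key(1663, 5, 16384, 0, 0xFFFFFFFF)); // relsize slot ("ff")
}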
@@ -148,23 +148,21 @@ impl PgImportEnv {
         })?;
         layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?;

-        // Import data (00:spcnode:dbnode:reloid:fork:blk)
+        // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
+        // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
         for file in &db.files {
-            self.import_rel_file(layer_writer, &file.path, &file.rel_tag, file.segno).await?;
+            self.import_rel_file(layer_writer, &file).await?;
         };

-        // Import relsize (00:spcnode:dbnode:reloid:fork:ff)
-
         Ok(())
     }

     async fn import_rel_file(
         &mut self,
         layer_writer: &mut ImageLayerWriter,
-        path: &Utf8PathBuf,
-        rel_tag: &RelTag,
-        segno: u32,
+        segment: &PgDataDirDbFile,
     ) -> anyhow::Result<()> {
+        let (path, rel_tag, segno) = (&segment.path, segment.rel_tag, segment.segno);
         debug!("Importing relation file (path={path}, rel_tag={rel_tag}, segno={segno})");
         let mut reader = tokio::fs::File::open(&path).await?;
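Each call above imports one PostgreSQL segment file, identified by its segment number. For context, a minimal sketch of how segment files relate to absolute block numbers, assuming the default PostgreSQL build where a segment holds 131072 blocks of BLCKSZ = 8192 bytes (1 GiB); `abs_blkno` is a hypothetical helper, not part of this diff.

// Illustration only: segment files vs. absolute block numbers.
const RELSEG_SIZE: u32 = 131_072; // blocks per segment (1 GiB / 8 KiB)

// Absolute block number of block `blk_in_seg` inside segment file
// number `segno` (e.g. the file "16384.2" has segno = 2).
fn abs_blkno(segno: u32, blk_in_seg: u32) -> u32 {
    segno * RELSEG_SIZE + blk_in_seg
}

fn main() {
    // Block 5 of segment 2 is relation block 262149.
    assert_eq!(abs_blkno(2, 5), 2 * 131_072 + 5);
}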
@@ -200,6 +198,14 @@ impl PgImportEnv {
             blknum += 1;
         }

+        // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
+        if let Some(nblocks) = segment.nblocks {
+            let size_key = rel_size_to_key(rel_tag);
+            debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
+            let buf = nblocks.to_le_bytes();
+            layer_writer.put_image(size_key, Bytes::from(buf.to_vec()), &self.ctx).await?;
+        }
+
         Ok(())
     }
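The value written above is just the block count in little-endian bytes; since `nblocks` is a `usize` here, `to_le_bytes()` produces a platform-width buffer (8 bytes on 64-bit targets). A minimal round-trip sketch of that encoding; `encode_relsize`/`decode_relsize` are hypothetical helpers, and how the rest of the pageserver decodes this key is outside this diff.

// Sketch of the relsize value encoding used in the hunk above.
fn encode_relsize(nblocks: usize) -> Vec<u8> {
    // Same as the diff: native-width little-endian block count.
    nblocks.to_le_bytes().to_vec()
}

fn decode_relsize(buf: &[u8]) -> usize {
    let mut raw = [0u8; std::mem::size_of::<usize>()];
    raw.copy_from_slice(buf); // panics if the width does not match
    usize::from_le_bytes(raw)
}

fn main() {
    let buf = encode_relsize(131_072);
    assert_eq!(decode_relsize(&buf), 131_072);
}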
@@ -263,6 +269,9 @@ struct PgDataDirDbFile {
     pub path: Utf8PathBuf,
     pub rel_tag: RelTag,
     pub segno: u32,
+    // Cumulative size of the given fork, set only for the last segment of that fork
+    pub nblocks: Option<usize>,
 }

 impl PgDataDir {
@@ -306,34 +315,64 @@ impl PgDataDir {
 impl PgDataDirDb {
     fn new(db_path: Utf8PathBuf, spcnode: u32, dboid: u32, datadir_path: &Utf8PathBuf) -> Self {
-        PgDataDirDb {
-            files: WalkDir::new(&db_path)
-                .min_depth(1)
-                .max_depth(2)
-                .into_iter()
-                .filter_map(|entry| {
-                    entry.ok().and_then(|path| {
-                        let relfile = path.file_name().to_string_lossy();
-                        // returns (relnode, forknum, segno)
-                        parse_relfilename(&relfile).ok()
-                    })
-                })
-                .sorted()
-                .map(|(relnode, forknum, segno)| {
-                    let rel_tag = RelTag {
-                        spcnode,
-                        dbnode: dboid,
-                        relnode,
-                        forknum,
-                    };
-                    PgDataDirDbFile {
-                        path: datadir_path.join(rel_tag.to_segfile_name(segno)),
-                        rel_tag,
-                        segno,
-                    }
-                })
-                .collect(),
+        let mut files: Vec<PgDataDirDbFile> = WalkDir::new(&db_path)
+            .min_depth(1)
+            .max_depth(2)
+            .into_iter()
+            .filter_map(|entry| {
+                entry.ok().and_then(|path| {
+                    let relfile = path.file_name().to_string_lossy();
+                    // returns (relnode, forknum, segno)
+                    parse_relfilename(&relfile).ok()
+                })
+            })
+            .sorted()
+            .map(|(relnode, forknum, segno)| {
+                let rel_tag = RelTag {
+                    spcnode,
+                    dbnode: dboid,
+                    relnode,
+                    forknum,
+                };
+                let path = datadir_path.join(rel_tag.to_segfile_name(segno));
+                let len = metadata(&path).unwrap().len() as usize;
+                assert!(len % BLCKSZ as usize == 0);
+                let nblocks = len / BLCKSZ as usize;
+                PgDataDirDbFile {
+                    path,
+                    rel_tag,
+                    segno,
+                    nblocks: Some(nblocks), // per-segment sizes; made cumulative below
+                }
+            })
+            .collect();
+
+        // Set cumulative sizes. Do all of that math here, so that later we can more
+        // easily parallelize over segments and still know which segments need a
+        // relsize entry written.
+        let mut cumulative_nblocks: usize = 0;
+        let mut prev_rel_tag: Option<RelTag> = None;
+        for i in 0..files.len() {
+            if prev_rel_tag == Some(files[i].rel_tag) {
+                cumulative_nblocks += files[i].nblocks.unwrap();
+            } else {
+                cumulative_nblocks = files[i].nblocks.unwrap();
+            }
+            files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
+                Some(cumulative_nblocks)
+            } else {
+                None
+            };
+            prev_rel_tag = Some(files[i].rel_tag);
+        }
+
+        PgDataDirDb {
+            files,
             path: db_path,
             spcnode,
             dboid,
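The cumulative-size pass above is the heart of the change. A standalone sketch of the same logic, under assumptions: `ForkId` and `Seg` are hypothetical stand-ins for `RelTag` and `PgDataDirDbFile`, and the input is sorted so that all segments of one fork are adjacent (which the `.sorted()` call in the real code guarantees).

// Sketch: rewrite per-segment sizes into a cumulative size on the
// last segment of each fork, and None on all earlier segments.
#[derive(Clone, Copy, PartialEq, Eq)]
struct ForkId(u32);

struct Seg {
    fork: ForkId,
    nblocks: Option<usize>, // in: per-segment size; out: cumulative size or None
}

fn set_cumulative_sizes(files: &mut [Seg]) {
    let mut cumulative: usize = 0;
    let mut prev_fork: Option<ForkId> = None;
    for i in 0..files.len() {
        // Extend the running total within a fork; restart at a fork boundary.
        if prev_fork == Some(files[i].fork) {
            cumulative += files[i].nblocks.unwrap();
        } else {
            cumulative = files[i].nblocks.unwrap();
        }
        // Only the last segment of each fork keeps a size entry.
        let is_last = i == files.len() - 1 || files[i + 1].fork != files[i].fork;
        files[i].nblocks = if is_last { Some(cumulative) } else { None };
        prev_fork = Some(files[i].fork);
    }
}

fn main() {
    // One fork split into a full 131072-block segment plus a 100-block tail,
    // followed by a small single-segment fork.
    let mut files = vec![
        Seg { fork: ForkId(1), nblocks: Some(131_072) },
        Seg { fork: ForkId(1), nblocks: Some(100) },
        Seg { fork: ForkId(2), nblocks: Some(7) },
    ];
    set_cumulative_sizes(&mut files);
    assert_eq!(files[0].nblocks, None);          // not the last segment
    assert_eq!(files[1].nblocks, Some(131_172)); // fork 1 total
    assert_eq!(files[2].nblocks, Some(7));       // fork 2 total
}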