Compare commits

...

2 Commits

Author           SHA1        Message                   Date
Bojan Serafimov  c4046713dd  Inline to avoid locking   2023-11-30 12:52:13 -05:00
Bojan Serafimov  a71cd22e09  Speed up rel extend       2023-11-29 15:03:20 -05:00
2 changed files with 85 additions and 11 deletions

View File

@@ -205,6 +205,77 @@ impl Timeline {
         Ok(total_blocks)
     }
+    pub async fn get_rel_size_if_exists(
+        &self,
+        tag: RelTag,
+        lsn: Lsn,
+        latest: bool,
+        ctx: &RequestContext,
+    ) -> Result<Option<BlockNumber>, PageReconstructError> {
+        if tag.relnode == 0 {
+            return Err(PageReconstructError::Other(
+                RelationError::InvalidRelnode.into(),
+            ));
+        }
+        // NOTE: helper functions exist for this, but we inline it all here
+        // so that we do the work without acquiring the lock more than once
+        // or searching the hashmap more than once. This is a
+        // performance-critical path.
+        // TODO: we could get away with a read lock when latest is false. But latest
+        // is not known at compile time, so that would require some Rust type
+        // acrobatics. For now it's fine, because we only call this
+        // with latest = true.
+        let mut rel_size_cache = self.rel_size_cache.write().unwrap();
+        match rel_size_cache.entry(tag) {
+            hash_map::Entry::Occupied(entry) => {
+                return Ok(Some(entry.get().1));
+            }
+            hash_map::Entry::Vacant(entry) => {
+                let exists = {
+                    let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
+                    let buf = self.get(key, lsn, ctx).await?;
+                    match RelDirectory::des(&buf).context("deserialization failure") {
+                        Ok(dir) => dir.rels.get(&(tag.relnode, tag.forknum)).is_some(),
+                        Err(e) => return Err(PageReconstructError::from(e)),
+                    }
+                };
+                // Return early if not exists
+                if !exists {
+                    if tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM {
+                        // FIXME: Postgres sometimes calls smgrcreate() to create
+                        // FSM, and smgrnblocks() on it immediately afterwards,
+                        // without extending it. Tolerate that by claiming that
+                        // any non-existent FSM fork has size 0.
+                        return Ok(Some(0));
+                    } else {
+                        return Ok(None);
+                    }
+                }
+                // Get size from repo
+                let key = rel_size_to_key(tag);
+                let mut buf = self.get(key, lsn, ctx).await?;
+                let nblocks = buf.get_u32_le();
+                if latest {
+                    // Update the relation size cache only if the "latest" flag is set.
+                    // The flag is set by the compute node when it is working with the most
+                    // recent version of the relation; typically the master compute node always
+                    // sets latest=true. Note that even if the compute node mistakenly specifies
+                    // an old LSN but sets latest=true, it cannot corrupt the cache: with
+                    // latest=true the pageserver chooses max(request_lsn, last_written_lsn),
+                    // so the cached value is always associated with the most recent LSN.
+                    entry.insert((lsn, nblocks));
+                }
+                Ok(Some(nblocks))
+            }
+        }
+    }
     /// Get size of a relation file
     pub async fn get_rel_size(
         &self,

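The locking approach described in the NOTE inside get_rel_size_if_exists above is the standard hash_map::Entry pattern: take the write lock once, probe the map once, and fill in a value only when the entry is vacant. Below is a minimal, self-contained sketch of that pattern; the names and types are hypothetical and simplified (no async, no LSNs), not code from this repository.

use std::collections::{hash_map::Entry, HashMap};
use std::sync::RwLock;

// Look up `key`, computing and caching the value on a miss. One lock
// acquisition and one hashmap probe cover both the hit and the miss path.
fn lookup_or_compute(
    cache: &RwLock<HashMap<u32, u32>>,
    key: u32,
    compute: impl FnOnce() -> u32,
) -> u32 {
    let mut guard = cache.write().unwrap();
    match guard.entry(key) {
        Entry::Occupied(entry) => *entry.get(),
        Entry::Vacant(entry) => *entry.insert(compute()),
    }
}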
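Likewise, the cache-safety argument in the comment above entry.insert((lsn, nblocks)) reduces to LSN clamping: with latest=true the pageserver reads at max(request_lsn, last_written_lsn), so a stale request LSN cannot make the cached size older than the data it describes. A toy illustration with made-up u64 values standing in for the real Lsn type:

use std::cmp::max;

fn main() {
    // Hypothetical LSNs: the compute node sends a stale request_lsn together
    // with latest=true; the pageserver clamps it up before reading.
    let request_lsn: u64 = 0x16B_9188; // stale LSN sent "by mistake"
    let last_written_lsn: u64 = 0x16B_9FF0; // newest write the pageserver has seen
    let effective_lsn = max(request_lsn, last_written_lsn);
    assert_eq!(effective_lsn, last_written_lsn); // size is read, and cached, at the newest LSN
}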
View File

@@ -1440,19 +1440,20 @@ impl<'a> WalIngest<'a> {
         // record.
         // TODO: would be nice to be more explicit about it
         let last_lsn = modification.lsn;
-        let old_nblocks = if !self
+        let old_nblocks = match self
             .timeline
-            .get_rel_exists(rel, last_lsn, true, ctx)
+            .get_rel_size_if_exists(rel, last_lsn, true, ctx)
             .await?
         {
-            // create it with 0 size initially, the logic below will extend it
-            modification
-                .put_rel_creation(rel, 0, ctx)
-                .await
-                .context("Relation Error")?;
-            0
-        } else {
-            self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?
+            Some(nblocks) => nblocks,
+            None => {
+                // create it with 0 size initially, the logic below will extend it
+                modification
+                    .put_rel_creation(rel, 0, ctx)
+                    .await
+                    .context("Relation Error")?;
+                0
+            }
         };
         if new_nblocks > old_nblocks {
@@ -2150,9 +2151,11 @@ mod tests {
         // Decode and ingest wal. We process the wal in chunks because
         // that's what happens when we get bytes from safekeepers.
+        let mut n_records = 0;
         for chunk in bytes[xlogoff..].chunks(50) {
             decoder.feed_bytes(chunk);
             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
+                n_records += 1;
                 walingest
                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
                     .await
@@ -2161,6 +2164,6 @@ mod tests {
         }
         let duration = started_at.elapsed();
-        println!("done in {:?}", duration);
+        println!("ingested {} records in {:?}", n_records, duration);
     }
 }
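For context on the test change above: the loop feeds the WAL bytes to the decoder in 50-byte chunks and drains every complete record after each chunk, because that is how bytes arrive from safekeepers, and a record can straddle chunk boundaries. The same feed/poll shape, reduced to a toy newline-delimited decoder (all names here are hypothetical, not the real WalStreamDecoder API):

// Toy push-decoder: buffers incoming bytes and yields complete
// newline-terminated records, mirroring the feed_bytes/poll_decode shape.
struct ToyDecoder {
    buf: Vec<u8>,
}

impl ToyDecoder {
    fn new() -> Self {
        ToyDecoder { buf: Vec::new() }
    }

    fn feed_bytes(&mut self, chunk: &[u8]) {
        self.buf.extend_from_slice(chunk);
    }

    // Returns the next complete record, or None if more bytes are needed.
    fn poll_decode(&mut self) -> Option<Vec<u8>> {
        let pos = self.buf.iter().position(|&b| b == b'\n')?;
        let mut rec: Vec<u8> = self.buf.drain(..=pos).collect();
        rec.pop(); // drop the trailing newline
        Some(rec)
    }
}

fn main() {
    let bytes = b"first record\nsecond record\nthird record\n";
    let mut decoder = ToyDecoder::new();
    let mut n_records = 0;
    // Feed small chunks, as the test does; a record may span several chunks.
    for chunk in bytes.chunks(5) {
        decoder.feed_bytes(chunk);
        while let Some(rec) = decoder.poll_decode() {
            n_records += 1;
            println!("record {}: {}", n_records, String::from_utf8_lossy(&rec));
        }
    }
    assert_eq!(n_records, 3);
}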