Cache relation size

This commit is contained in:
Konstantin Knizhnik
2021-05-29 13:10:54 +03:00
committed by Stas Kelvich
parent 5c70b52f4a
commit 09b2c66cf6

View File

@@ -24,8 +24,7 @@ use log::*;
use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::thread;
@@ -160,6 +159,11 @@ impl Repository for ObjectRepository {
}
}
struct RelMetadata {
size: Option<u32>,
last_updated: Lsn,
}
///
/// A handle to a specific timeline in the repository. This is the API
/// that's exposed to the rest of the system.
@@ -193,6 +197,8 @@ pub struct ObjectTimeline {
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
rel_meta: Mutex<BTreeMap<RelTag, RelMetadata>>,
}
impl ObjectTimeline {
@@ -218,6 +224,7 @@ impl ObjectTimeline {
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
ancestor_timeline: metadata.ancestor_timeline,
ancestor_lsn: metadata.ancestor_lsn,
rel_meta: Mutex::new(BTreeMap::new()),
};
Ok(timeline)
}
@@ -248,6 +255,17 @@ impl Timeline for ObjectTimeline {
/// Does relation exist at given LSN?
fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
let lsn = self.wait_lsn(req_lsn)?;
{
let rel_meta = self.rel_meta.lock().unwrap();
if let Some(entry) = rel_meta.range(..=rel).next_back() {
if *entry.0 == rel {
let meta = entry.1;
if meta.last_updated <= lsn {
return Ok(meta.size.is_some());
}
}
}
}
let key = relation_size_key(self.timelineid, rel);
let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?;
if let Some((_key, _val)) = iter.next().transpose()? {
@@ -333,6 +351,14 @@ impl Timeline for ObjectTimeline {
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
let mut rel_meta = self.rel_meta.lock().unwrap();
rel_meta.insert(
tag.rel,
RelMetadata {
size: Some(new_nblocks),
last_updated: lsn,
},
);
}
Ok(())
@@ -373,6 +399,14 @@ impl Timeline for ObjectTimeline {
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
let mut rel_meta = self.rel_meta.lock().unwrap();
rel_meta.insert(
tag.rel,
RelMetadata {
size: Some(new_nblocks),
last_updated: lsn,
},
);
}
Ok(())
@@ -390,6 +424,14 @@ impl Timeline for ObjectTimeline {
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
let mut rel_meta = self.rel_meta.lock().unwrap();
rel_meta.insert(
rel,
RelMetadata {
size: Some(nblocks),
last_updated: lsn,
},
);
Ok(())
}
@@ -537,6 +579,17 @@ impl ObjectTimeline {
/// The caller must ensure that WAL has been received up to 'lsn'.
///
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<Option<u32>> {
{
let rel_meta = self.rel_meta.lock().unwrap();
if let Some(entry) = rel_meta.range(..=rel).next_back() {
if *entry.0 == rel {
let meta = entry.1;
if meta.last_updated <= lsn {
return Ok(meta.size);
}
}
}
}
let key = relation_size_key(self.timelineid, rel);
let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?;
@@ -550,6 +603,14 @@ impl ObjectTimeline {
version_lsn,
lsn
);
let mut rel_meta = self.rel_meta.lock().unwrap();
rel_meta.insert(
rel,
RelMetadata {
size: Some(nblocks),
last_updated: version_lsn,
},
);
Ok(Some(nblocks))
}
RelationSizeEntry::Unlink => {
@@ -558,6 +619,14 @@ impl ObjectTimeline {
rel,
version_lsn
);
let mut rel_meta = self.rel_meta.lock().unwrap();
rel_meta.insert(
rel,
RelMetadata {
size: None,
last_updated: version_lsn,
},
);
Ok(None)
}
}