Fix issues with garbage collector

Konstantin Knizhnik
2021-11-03 12:15:24 +03:00
parent a3e94e888a
commit 9947de4a2a
8 changed files with 171 additions and 156 deletions

Cargo.lock generated
View File

@@ -2571,9 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17eda78e0aabbe323628af9b09766827e5b321b5e742cb38a6dd8f09c60a0c1"
version = "0.1.8"
dependencies = [
"anyhow",
"crc32c",

View File

@@ -15,3 +15,7 @@ members = [
# This is useful for profiling and, to some extent, debugging.
# Besides, debug info should not affect performance.
debug = true
panic = 'abort'
[profile.dev]
panic = 'abort'

View File

@@ -38,7 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.1.6"
yakv = "0.1.8"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -59,6 +59,13 @@ const METADATA_MAX_SAFE_SIZE: usize = 512;
const METADATA_CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
const METADATA_MAX_DATA_SIZE: usize = METADATA_MAX_SAFE_SIZE - METADATA_CHECKSUM_SIZE;
const ZERO_TAG: RelTag = RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: 0,
};
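
The new ZERO_TAG constant replaces the all-zero RelTag literals that were previously spelled out at every scan entry point; a zeroed tag is the smallest possible relation tag and therefore a natural starting point for full key-space scans. A minimal, self-contained sketch of that idea, not part of the commit, with simplified stand-in field types and derived ordering (the real code serializes keys rather than comparing structs directly):

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct RelTag {
    spcnode: u32,
    dbnode: u32,
    relnode: u32,
    forknum: u8,
}

const ZERO_TAG: RelTag = RelTag { spcnode: 0, dbnode: 0, relnode: 0, forknum: 0 };

fn main() {
    let some_tag = RelTag { spcnode: 1663, dbnode: 1, relnode: 1259, forknum: 0 };
    // Every real tag sorts at or after ZERO_TAG, so a scan starting from
    // ZERO_TAG covers the whole relation key space.
    assert!(ZERO_TAG <= some_tag);
}
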
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
@@ -279,6 +286,12 @@ impl Repository for BufferedRepository {
/// Private functions
impl BufferedRepository {
fn get_buffered_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<BufferedTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
Ok(self.get_timeline_locked(timelineid, &mut timelines)?)
}
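
The new get_buffered_timeline helper acquires the timelines mutex only for the duration of the lookup, so callers such as the GC pass no longer have to hold the map lock themselves. A reduced sketch of that locking pattern, not part of the commit (types collapsed to a plain HashMap, names illustrative; the real helper also loads timelines on demand and returns a Result):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Timeline; // stand-in for BufferedTimeline

struct Repo {
    timelines: Mutex<HashMap<u64, Arc<Timeline>>>,
}

impl Repo {
    // The lock is taken and released inside the call, so the caller never
    // holds the map mutex while working with the returned Arc.
    fn get_timeline(&self, id: u64) -> Option<Arc<Timeline>> {
        self.timelines.lock().unwrap().get(&id).cloned()
    }
}

fn main() {
    let repo = Repo { timelines: Mutex::new(HashMap::new()) };
    repo.timelines.lock().unwrap().insert(1, Arc::new(Timeline));
    assert!(repo.get_timeline(1).is_some());
}
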
// Implementation of the public `get_timeline` function. This differs from the public
// interface in that the caller must already hold the mutex on the 'timelines' hashmap.
fn get_timeline_locked(
@@ -374,12 +387,19 @@ impl BufferedRepository {
fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
std::thread::sleep(conf.checkpoint_period);
trace!("checkpointer thread for tenant {} waking up", self.tenantid);
info!("checkpointer thread for tenant {} waking up", self.tenantid);
// checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
// bytes of WAL since last checkpoint.
{
let timelines = self.timelines.lock().unwrap();
let timelines: Vec<(ZTimelineId, Arc<BufferedTimeline>)> = self
.timelines
.lock()
.unwrap()
.iter()
.map(|pair| (pair.0.clone(), pair.1.clone()))
.collect();
//let timelines = self.timelines.lock().unwrap();
for (timelineid, timeline) in timelines.iter() {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid)
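
The checkpointer now copies the (timeline id, Arc) pairs out of the map under a short-lived lock and iterates over that snapshot, instead of holding the timelines mutex for the whole checkpoint pass. A condensed sketch of the same pattern, not part of the commit (hypothetical types, not the actual pageserver API):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Timeline;
impl Timeline {
    fn checkpoint(&self) { /* potentially slow work */ }
}

fn checkpoint_all(timelines: &Mutex<HashMap<u64, Arc<Timeline>>>) {
    // Clone the map contents while holding the lock...
    let snapshot: Vec<(u64, Arc<Timeline>)> = timelines
        .lock()
        .unwrap()
        .iter()
        .map(|(id, tl)| (*id, Arc::clone(tl)))
        .collect();
    // ...then do the slow per-timeline work without the lock, so new
    // timelines can still be created concurrently.
    for (_id, tl) in snapshot {
        tl.checkpoint();
    }
}

fn main() {
    let map = Mutex::new(HashMap::from([(1u64, Arc::new(Timeline))]));
    checkpoint_all(&map);
}
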
@@ -421,7 +441,8 @@ impl BufferedRepository {
while !tenant_mgr::shutdown_requested() {
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
self.gc_iteration(None, conf.gc_horizon, false).unwrap();
let result = self.gc_iteration(None, conf.gc_horizon, false)?;
info!("GC result: {:?}", result);
}
// TODO Write it in more adequate way using
@@ -536,7 +557,7 @@ impl BufferedRepository {
// grab mutex to prevent new timelines from being created here.
// TODO: We will hold it for a long time
let mut timelines = self.timelines.lock().unwrap();
//let mut timelines = self.timelines.lock().unwrap();
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
@@ -560,7 +581,7 @@ impl BufferedRepository {
//Now collect info about branchpoints
let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new();
for timelineid in &timelineids {
let timeline = self.get_timeline_locked(*timelineid, &mut *timelines)?;
let timeline = self.get_buffered_timeline(*timelineid)?;
if let Some(ancestor_timeline) = &timeline.ancestor_timeline {
// If target_timeline is specified, we only need to know the branchpoints of its children
@@ -582,7 +603,7 @@ impl BufferedRepository {
for timelineid in timelineids {
// We have already loaded all timelines above
// so this operation is just a quick map lookup.
let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?;
let timeline = self.get_buffered_timeline(timelineid)?;
// If target_timeline is specified, only GC it
if let Some(target_timelineid) = target_timelineid {
@@ -1163,12 +1184,7 @@ impl BufferedTimeline {
fn checkpoint_internal(&self, checkpoint_distance: u64, _forced: bool) -> Result<()> {
// The `from` boundary is constant, while the `till` boundary changes at each iteration.
let from = StoreKey::Data(DataKey {
rel: RelishTag::Relation(RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: 0,
}),
rel: RelishTag::Relation(ZERO_TAG),
blknum: 0,
lsn: Lsn(0),
})
@@ -1194,6 +1210,7 @@ impl BufferedTimeline {
if let Some(entry) = iter.next_back() {
let pair = entry?;
let key = pair.0;
debug_assert!(key < till);
if let StoreKey::Data(dk) = StoreKey::des(&key)? {
let ver = PageVersion::des(&pair.1)?;
if let PageVersion::Delta(rec) = ver {
@@ -1277,7 +1294,7 @@ impl BufferedTimeline {
&metadata,
false,
)?;
trace!("Checkpoint {} records", n_checkpointed_records);
info!("Checkpoint {} records", n_checkpointed_records);
if self.upload_relishes {
schedule_timeline_upload(())
@@ -1322,108 +1339,160 @@ impl BufferedTimeline {
info!("GC starting");
let mut from_rel = RelishTag::Relation(RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: 0,
});
let mut from_rel = RelishTag::Relation(ZERO_TAG);
let mut from = StoreKey::Metadata(MetadataKey {
rel: from_rel,
lsn: Lsn(0),
});
// We cannot remove a deteriorated version immediately: we first need to check that a successor exists
let mut last_key: Option<yakv::storage::Key> = None;
// Keep track of dropped relishes
let mut dropped: HashSet<RelishTag> = HashSet::new();
'meta: loop {
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from.ser()?..);
// We cannot remove a deteriorated version immediately: we first need to check that a successor exists
let mut last_key: Option<yakv::storage::Key> = None;
while let Some(entry) = iter.next() {
let raw_key = entry?.0;
let pair = entry?;
let raw_key = pair.0;
let key = StoreKey::des(&raw_key)?;
if let StoreKey::Metadata(dk) = key {
// processing metadata
let same_rel = from_rel == dk.rel;
if !same_rel {
// we jumped to the next relation
result.meta_total += 1;
from_rel = dk.rel;
from = StoreKey::Metadata(MetadataKey {
rel: from_rel,
lsn: Lsn(0),
});
}
if dk.lsn < cutoff {
// we have something deteriorated
// There is a previous version
if let Some(prev_key) = last_key {
// We are still on the same relish...
if from_rel == dk.rel {
// If we are still on the same relish...
if same_rel {
// then drop the previous version as it is not needed any more
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(&prev_key)?;
last_key = None;
result.meta_removed += 1;
// We should reset iterator and start from the current point
continue 'meta;
}
}
from_rel = dk.rel;
from = key;
// Remember key as candidate for deletion
last_key = Some(raw_key);
let meta = MetadataValue::des(&pair.1)?;
if meta.size.is_none() {
// the object was dropped, so we can immediately remove the deteriorated version
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(&raw_key)?;
dropped.insert(dk.rel);
result.meta_dropped += 1;
// We should reset iterator and start from the current point
continue 'meta;
} else {
// Remember the key as a candidate for deletion and continue iteration
last_key = Some(raw_key);
}
} else {
from_rel = dk.rel;
// We reached a version which must be preserved for PITR, so jump to the next object
from = StoreKey::Metadata(MetadataKey {
rel: from_rel,
lsn: Lsn::MAX,
});
last_key = None;
continue 'meta;
}
} else {
// End of metadata
break 'meta;
}
}
break;
}
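
The metadata pass above relies on a restartable scan: iteration happens under a read lock, and as soon as a key has to be deleted the read lock is dropped, a write lock is taken for the removal, and the outer labeled loop restarts the iterator from the current position. A condensed sketch of that pattern, not part of the commit, using a BTreeMap under an RwLock in place of the yakv store (the real code additionally tracks a last_key deletion candidate per relish and handles dropped relations):

use std::collections::BTreeMap;
use std::sync::RwLock;

// Remove every version older than `cutoff`, never mutating while the read
// lock is held: find a victim under the read lock, release it, delete under
// the write lock, then restart the scan from the current position
// (the `continue 'meta` in the real code).
fn gc_scan(store: &RwLock<BTreeMap<u64, String>>, cutoff: u64) {
    let mut from = 0u64;
    loop {
        // Phase 1: find the next removable key under the read lock.
        let victim = {
            let guard = store.read().unwrap();
            guard.range(from..).find(|&(&k, _)| k < cutoff).map(|(&k, _)| k)
        };
        // Phase 2: the read lock is released; mutate under the write lock.
        match victim {
            Some(key) => {
                store.write().unwrap().remove(&key);
                from = key; // restart the scan from here
            }
            None => break,
        }
    }
}

fn main() {
    let store = RwLock::new(BTreeMap::from([
        (1u64, "old".to_string()),
        (5, "old".to_string()),
        (9, "recent".to_string()),
    ]));
    gc_scan(&store, 6);
    assert_eq!(store.read().unwrap().len(), 1);
}
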
// Array to accumulate keys we can remove
from_rel = RelishTag::Relation(ZERO_TAG);
from = StoreKey::Data(DataKey {
rel: from_rel,
blknum: 0,
lsn: Lsn(0),
});
// Array to accumulate keys we can remove.
// Place it outside the main loop to reduce the number of dynamic memory allocations
let mut deteriorated: Vec<yakv::storage::Key> = Vec::new();
// currently processed block number
let mut from_blknum = 0;
'data: loop {
'pages: loop {
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from.ser()?..);
deteriorated.clear();
while let Some(entry) = iter.next() {
let pair = entry?;
let raw_key = pair.0;
let key = StoreKey::des(&raw_key)?;
if let StoreKey::Data(dk) = key {
if dk.lsn < cutoff {
// we have something deteriorated
let ver = PageVersion::des(&pair.1)?;
// We are still on the same page...
if from_rel == dk.rel && from_blknum == dk.blknum {
if let PageVersion::Image(_) = ver {
// We have full page image: remove all preceding deteriorated records
drop(store);
let mut store = self.store.write().unwrap();
for key in deteriorated.iter() {
store.data.remove(key)?;
}
deteriorated.clear();
// We should reset iterator and start from the current point
continue 'data;
}
// No full page image, so we can't remove deteriorated stuff
deteriorated.clear();
}
// Remember key as candidate for deletion
deteriorated.push(raw_key);
let same_page = from_rel == dk.rel && from_blknum == dk.blknum;
if !same_page {
result.pages_total += 1;
from_rel = dk.rel;
from_blknum = dk.blknum;
from = key;
from = StoreKey::Data(DataKey {
rel: from_rel,
blknum: from_blknum,
lsn: Lsn(0),
});
}
if dk.lsn < cutoff {
// we have something deteriorated
// If we are still on the same page...
if same_page {
if !deteriorated.is_empty() {
// ... and have something to remove
// ... and have a page image
let ver = PageVersion::des(&pair.1)?;
if let PageVersion::Image(_) = ver {
// ... then remove all previously accumulated deltas and images, as they are not needed any more
drop(store);
let mut store = self.store.write().unwrap();
for key in deteriorated.iter() {
store.data.remove(key)?;
}
result.pages_removed += deteriorated.len() as u64;
// We should reset iterator and start from the current point
continue 'pages;
}
}
} else {
// we moved on to a different page
deteriorated.clear();
}
if dropped.contains(&dk.rel) {
// This relation was dropped beyond the PITR interval:
// we can remove all its pages
assert!(deteriorated.is_empty()); // we should not append anything to `deteriorated` for a dropped relation
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(&raw_key)?;
result.pages_dropped += 1;
// We should reset iterator and start from the current point
continue 'pages;
}
// Remember key as candidate for deletion and continue iteration
deteriorated.push(raw_key);
} else {
// Jump to next page
from_rel = dk.rel;
// We reached a version which must be preserved for PITR, so jump to the next object
from = StoreKey::Data(DataKey {
rel: from_rel,
blknum: from_blknum + 1,
lsn: Lsn(0),
});
deteriorated.clear();
}
} else {
break 'data;
break 'pages;
}
}
break;
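
The page pass above implements a retention rule: of all versions of a page below the PITR cutoff, only the newest full image (and everything after it) must be kept, because older deltas and images can no longer be reached by any permitted read. A small, self-contained sketch of that decision in isolation, not part of the commit (enum and function names are illustrative; the real code interleaves this with the restartable scan and lock upgrades shown earlier):

#[derive(Debug, PartialEq)]
enum PageVersion {
    Image,
    Delta,
}

// Given the versions of one page ordered by LSN, return the indexes that GC
// may remove: everything older than the latest full image below `cutoff`.
fn removable(versions: &[(u64, PageVersion)], cutoff: u64) -> Vec<usize> {
    let mut removable = Vec::new();
    let mut candidates = Vec::new(); // below-cutoff versions seen so far
    for (i, (lsn, ver)) in versions.iter().enumerate() {
        if *lsn >= cutoff {
            break; // everything from here on is needed for PITR
        }
        if *ver == PageVersion::Image {
            // A newer full image below the cutoff supersedes all earlier versions.
            removable.append(&mut candidates);
        }
        candidates.push(i);
    }
    removable
}

fn main() {
    let versions = vec![
        (10, PageVersion::Image),
        (20, PageVersion::Delta),
        (30, PageVersion::Image),
        (40, PageVersion::Delta),
        (90, PageVersion::Delta), // above the cutoff, must stay
    ];
    // With cutoff = 50, the versions at LSN 10 and 20 are superseded by the image at 30.
    assert_eq!(removable(&versions, 50), vec![0, 1]);
}
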

View File

@@ -35,7 +35,7 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 1 * 1024 * 1024;
pub const DEFAULT_GC_HORIZON: u64 = 1600_000_000u64;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
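
For orientation, gc_horizon is the amount of recent WAL history (in bytes) preserved for PITR. The cutoff LSN used throughout the GC code is assumed here to be derived by subtracting the horizon from the last record LSN; that derivation is not part of this diff, so the sketch below is illustrative only:

// Illustrative only: how a gc_horizon (bytes of recent WAL) typically maps to a cutoff LSN.
const DEFAULT_GC_HORIZON: u64 = 1600_000_000u64; // ~1.6 GB of recent WAL kept

fn gc_cutoff(last_record_lsn: u64, gc_horizon: u64) -> u64 {
    // Versions at or above the cutoff stay; everything older is a GC candidate.
    last_record_lsn.saturating_sub(gc_horizon)
}

fn main() {
    let last = 2_000_000_000u64;
    assert_eq!(gc_cutoff(last, DEFAULT_GC_HORIZON), 400_000_000);
}
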

View File

@@ -693,67 +693,21 @@ impl postgres_backend::Handler for PageServerHandler {
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layer_relfiles_total"),
RowDescriptor::int8_col(b"layer_relfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"layer_relfiles_needed_by_branches"),
RowDescriptor::int8_col(b"layer_relfiles_not_updated"),
RowDescriptor::int8_col(b"layer_relfiles_needed_as_tombstone"),
RowDescriptor::int8_col(b"layer_relfiles_removed"),
RowDescriptor::int8_col(b"layer_relfiles_dropped"),
RowDescriptor::int8_col(b"layer_nonrelfiles_total"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_branches"),
RowDescriptor::int8_col(b"layer_nonrelfiles_not_updated"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_as_tombstone"),
RowDescriptor::int8_col(b"layer_nonrelfiles_removed"),
RowDescriptor::int8_col(b"layer_nonrelfiles_dropped"),
RowDescriptor::int8_col(b"meta_total"),
RowDescriptor::int8_col(b"meta_removed"),
RowDescriptor::int8_col(b"meta_dropped"),
RowDescriptor::int8_col(b"pages_total"),
RowDescriptor::int8_col(b"pages_removed"),
RowDescriptor::int8_col(b"pages_dropped"),
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(result.ondisk_relfiles_total.to_string().as_bytes()),
Some(
result
.ondisk_relfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
result
.ondisk_relfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(result.ondisk_relfiles_not_updated.to_string().as_bytes()),
Some(
result
.ondisk_relfiles_needed_as_tombstone
.to_string()
.as_bytes(),
),
Some(result.ondisk_relfiles_removed.to_string().as_bytes()),
Some(result.ondisk_relfiles_dropped.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_total.to_string().as_bytes()),
Some(
result
.ondisk_nonrelfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
result
.ondisk_nonrelfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(result.ondisk_nonrelfiles_not_updated.to_string().as_bytes()),
Some(
result
.ondisk_nonrelfiles_needed_as_tombstone
.to_string()
.as_bytes(),
),
Some(result.ondisk_nonrelfiles_removed.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_dropped.to_string().as_bytes()),
Some(result.meta_total.to_string().as_bytes()),
Some(result.meta_removed.to_string().as_bytes()),
Some(result.meta_dropped.to_string().as_bytes()),
Some(result.pages_total.to_string().as_bytes()),
Some(result.pages_removed.to_string().as_bytes()),
Some(result.pages_dropped.to_string().as_bytes()),
Some(result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;

View File

@@ -44,45 +44,27 @@ pub trait Repository: Send + Sync {
///
/// Result of performing GC
///
#[derive(Default)]
#[derive(Default, Debug)]
pub struct GcResult {
pub ondisk_relfiles_total: u64,
pub ondisk_relfiles_needed_by_cutoff: u64,
pub ondisk_relfiles_needed_by_branches: u64,
pub ondisk_relfiles_not_updated: u64,
pub ondisk_relfiles_needed_as_tombstone: u64,
pub ondisk_relfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
pub ondisk_relfiles_dropped: u64, // # of layer files removed because the relation was dropped
pub meta_removed: u64, // removed metadata versions beyond the PITR interval that are superseded by a newer version
pub meta_dropped: u64, // removed metadata versions beyond the PITR interval that belong to dropped relations
pub meta_total: u64, // total number of metadata object version histories
pub ondisk_nonrelfiles_total: u64,
pub ondisk_nonrelfiles_needed_by_cutoff: u64,
pub ondisk_nonrelfiles_needed_by_branches: u64,
pub ondisk_nonrelfiles_not_updated: u64,
pub ondisk_nonrelfiles_needed_as_tombstone: u64,
pub ondisk_nonrelfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
pub ondisk_nonrelfiles_dropped: u64, // # of layer files removed because the relation was dropped
pub pages_removed: u64, // removed page versions beyond the PITR interval for which a newer page image exists
pub pages_dropped: u64, // removed page versions beyond the PITR interval that belong to dropped relations
pub pages_total: u64, // total number of page version histories
pub elapsed: Duration,
}
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.ondisk_relfiles_total += other.ondisk_relfiles_total;
self.ondisk_relfiles_needed_by_cutoff += other.ondisk_relfiles_needed_by_cutoff;
self.ondisk_relfiles_needed_by_branches += other.ondisk_relfiles_needed_by_branches;
self.ondisk_relfiles_not_updated += other.ondisk_relfiles_not_updated;
self.ondisk_relfiles_needed_as_tombstone += other.ondisk_relfiles_needed_as_tombstone;
self.ondisk_relfiles_removed += other.ondisk_relfiles_removed;
self.ondisk_relfiles_dropped += other.ondisk_relfiles_dropped;
self.ondisk_nonrelfiles_total += other.ondisk_nonrelfiles_total;
self.ondisk_nonrelfiles_needed_by_cutoff += other.ondisk_nonrelfiles_needed_by_cutoff;
self.ondisk_nonrelfiles_needed_by_branches += other.ondisk_nonrelfiles_needed_by_branches;
self.ondisk_nonrelfiles_not_updated += other.ondisk_nonrelfiles_not_updated;
self.ondisk_nonrelfiles_needed_as_tombstone += other.ondisk_nonrelfiles_needed_as_tombstone;
self.ondisk_nonrelfiles_removed += other.ondisk_nonrelfiles_removed;
self.ondisk_nonrelfiles_dropped += other.ondisk_nonrelfiles_dropped;
self.meta_total += other.meta_total;
self.meta_removed += other.meta_removed;
self.meta_dropped += other.meta_dropped;
self.pages_total += other.pages_total;
self.pages_removed += other.pages_removed;
self.pages_dropped += other.pages_dropped;
self.elapsed += other.elapsed;
}
}
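
The Default and Debug derives together with AddAssign let gc_iteration start from GcResult::default(), fold per-timeline counters together with +=, and log the total with "GC result: {:?}" as the new gc_loop code does. A minimal usage sketch, not part of the commit, with only a couple of stand-in fields and made-up values:

// Minimal stand-in for the GcResult counters shown above.
#[derive(Default, Debug)]
struct GcResult {
    meta_removed: u64,
    pages_removed: u64,
}

impl std::ops::AddAssign for GcResult {
    fn add_assign(&mut self, other: Self) {
        self.meta_removed += other.meta_removed;
        self.pages_removed += other.pages_removed;
    }
}

fn main() {
    // Accumulate results across timelines the way gc_iteration does.
    let mut total = GcResult::default();
    for per_timeline in [
        GcResult { meta_removed: 2, pages_removed: 10 },
        GcResult { meta_removed: 1, pages_removed: 4 },
    ] {
        total += per_timeline;
    }
    println!("GC result: {:?}", total); // the Debug derive enables this log line
    assert_eq!(total.pages_removed, 14);
}
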

View File

@@ -3,7 +3,9 @@ use lz4_flex;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use tracing::*;
use yakv::storage::{Key, Storage, StorageIterator, Value};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
@@ -70,6 +72,7 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> {
let mut next_segno = 0u16;
while let Some(elem) = self.iter.next_back() {
if let Ok((key, value)) = elem {
assert!(value.len() != 0);
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
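
For context, ToastIterator reads the total segment count from the two big-endian bytes at key[len-4..len-2], as shown above; by symmetry the last two bytes are assumed here to hold this segment's number (that read is not visible in this hunk). An illustrative decode of that trailing header, not part of the commit:

use std::convert::TryInto;

// Decode the trailing segment header of a TOAST key:
// bytes [len-4, len-2) = total number of segments (big-endian u16),
// bytes [len-2, len)   = this segment's number (assumed, by symmetry).
fn decode_segment_header(key: &[u8]) -> (u16, u16) {
    let len = key.len();
    let n_segments = u16::from_be_bytes(key[len - 4..len - 2].try_into().unwrap());
    let segno = u16::from_be_bytes(key[len - 2..len].try_into().unwrap());
    (n_segments, segno)
}

fn main() {
    // A hypothetical key: some prefix followed by n_segments = 3, segno = 2.
    let mut key = b"page-key".to_vec();
    key.extend_from_slice(&3u16.to_be_bytes());
    key.extend_from_slice(&2u16.to_be_bytes());
    assert_eq!(decode_segment_header(&key), (3, 2));
}
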
@@ -90,7 +93,12 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> {
}
next_segno = segno;
if next_segno == 0 {
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
let toast = toast.unwrap();
if toast.len() == 0 {
warn!("n_segments={}", n_segments);
}
assert!(toast.len() != 0);
let res = lz4_flex::decompress_size_prepended(&toast);
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {