Compare commits

...

5 Commits

Author SHA1 Message Date
Konstantin Knizhnik
c28b6573b4 Use 1Mb chunk instead of page for loading data from pageserver 2022-01-21 10:55:58 +03:00
Konstantin Knizhnik
cb70e63f34 Update test_snapfiles_gc test 2022-01-20 12:11:38 +03:00
Konstantin Knizhnik
e6c82c9609 Add max_image_layers and image_layer_generation_threshold parameters to config and rewrite criteria of image layer generation 2022-01-17 18:59:34 +03:00
Konstantin Knizhnik
79ade52535 Add comments and make changes suggested by reviewers
refer #1133
2022-01-16 23:04:39 +03:00
Konstantin Knizhnik
5c33095918 Do not write full page images at each checkpoint 2022-01-14 15:15:31 +03:00
14 changed files with 183 additions and 22 deletions

View File

@@ -294,6 +294,7 @@ impl PostgresNode {
conf.append("max_replication_slots", "10"); conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on"); conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB"); conf.append("shared_buffers", "1MB");
conf.append("zenith.file_cache_size", "4096");
conf.append("fsync", "off"); conf.append("fsync", "off");
conf.append("max_connections", "100"); conf.append("max_connections", "100");
conf.append("wal_level", "replica"); conf.append("wal_level", "replica");

View File

@@ -43,6 +43,9 @@ pub mod defaults {
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192; pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100; pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
pub const DEFAULT_MAX_DELTA_LAYERS: usize = 10;
pub const DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD: usize = 50;
/// ///
/// Default built-in configuration file. /// Default built-in configuration file.
/// ///
@@ -90,6 +93,21 @@ pub struct PageServerConf {
pub page_cache_size: usize, pub page_cache_size: usize,
pub max_file_descriptors: usize, pub max_file_descriptors: usize,
//
// Minimal total size of delta layeres which triggers generation of image layer by checkpointer.
// It is specified as percent of maximal sigment size (RELISH_SEG_SIZE).
// I.e. it means that checkpoint will create image layer in addition to delta layer only when total size
// of delta layers since last image layer exceeds specified percent of segment size.
//
pub image_layer_generation_threshold: usize,
//
// Maximal number of delta layers which can be stored before image layere should be generated.
// The garbage collector needs image layers in order to delete files.
// If this number is too large it can result in too many small files on disk.
//
pub max_delta_layers: usize,
// Repository directory, relative to current working directory. // Repository directory, relative to current working directory.
// Normally, the page server changes the current working directory // Normally, the page server changes the current working directory
// to the repository, and 'workdir' is always '.'. But we don't do // to the repository, and 'workdir' is always '.'. But we don't do
@@ -228,6 +246,9 @@ impl PageServerConf {
page_cache_size: DEFAULT_PAGE_CACHE_SIZE, page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS, max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
pg_distrib_dir: PathBuf::new(), pg_distrib_dir: PathBuf::new(),
auth_validation_public_key_path: None, auth_validation_public_key_path: None,
auth_type: AuthType::Trust, auth_type: AuthType::Trust,
@@ -250,6 +271,10 @@ impl PageServerConf {
"max_file_descriptors" => { "max_file_descriptors" => {
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
} }
"max_delta_layers" => conf.max_delta_layers = parse_toml_u64(key, item)? as usize,
"image_layer_generation_threshold" => {
conf.image_layer_generation_threshold = parse_toml_u64(key, item)? as usize
}
"pg_distrib_dir" => { "pg_distrib_dir" => {
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?) conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
} }
@@ -379,6 +404,8 @@ impl PageServerConf {
gc_period: Duration::from_secs(10), gc_period: Duration::from_secs(10),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE, page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS, max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
superuser: "zenith_admin".to_string(), superuser: "zenith_admin".to_string(),
@@ -450,6 +477,9 @@ gc_horizon = 222
page_cache_size = 444 page_cache_size = 444
max_file_descriptors = 333 max_file_descriptors = 333
max_delta_layers = 10
image_layer_generation_threshold = 50
# initial superuser role name to use when creating a new tenant # initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz' initial_superuser_name = 'zzzz'
@@ -480,6 +510,9 @@ initial_superuser_name = 'zzzz'
superuser: defaults::DEFAULT_SUPERUSER.to_string(), superuser: defaults::DEFAULT_SUPERUSER.to_string(),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE, page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS, max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold:
defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
workdir, workdir,
pg_distrib_dir, pg_distrib_dir,
auth_type: AuthType::Trust, auth_type: AuthType::Trust,
@@ -521,6 +554,8 @@ initial_superuser_name = 'zzzz'
superuser: "zzzz".to_string(), superuser: "zzzz".to_string(),
page_cache_size: 444, page_cache_size: 444,
max_file_descriptors: 333, max_file_descriptors: 333,
max_delta_layers: 10,
image_layer_generation_threshold: 50,
workdir, workdir,
pg_distrib_dir, pg_distrib_dir,
auth_type: AuthType::Trust, auth_type: AuthType::Trust,

View File

@@ -1598,7 +1598,7 @@ impl LayeredTimeline {
Ok(()) Ok(())
} }
fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result<Vec<PathBuf>> { fn evict_layer(&self, layer_id: LayerId, mut reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
// Mark the layer as no longer accepting writes and record the end_lsn. // Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now. // This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the // We call `get_last_record_lsn` again, which may be different from the
@@ -1611,8 +1611,27 @@ impl LayeredTimeline {
let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap(); let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
if let Some(oldest_layer) = global_layer_map.get(&layer_id) { if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
let last_lsn = self.get_last_record_lsn();
// Avoid creation of image layers if there are not so much deltas
if reconstruct_pages
&& oldest_layer.get_seg_tag().rel.is_blocky()
&& self.conf.image_layer_generation_threshold != 0
{
let (n_delta_layers, total_delta_size) =
layers.count_delta_layers(oldest_layer.get_seg_tag(), last_lsn)?;
let logical_segment_size =
oldest_layer.get_seg_size(last_lsn)? as u64 * BLCKSZ as u64;
let physical_deltas_size = total_delta_size + oldest_layer.get_physical_size()?;
if logical_segment_size * self.conf.image_layer_generation_threshold as u64
> physical_deltas_size * 100
&& n_delta_layers < self.conf.max_delta_layers
{
reconstruct_pages = false;
}
}
drop(global_layer_map); drop(global_layer_map);
oldest_layer.freeze(self.get_last_record_lsn()); oldest_layer.freeze(last_lsn);
// The layer is no longer open, update the layer map to reflect this. // The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below. // We will replace it with on-disk historics below.

View File

@@ -161,6 +161,14 @@ pub struct DeltaLayerInner {
} }
impl DeltaLayerInner { impl DeltaLayerInner {
fn get_physical_size(&self) -> Result<u64> {
Ok(if let Some(book) = &self.book {
book.chapter_reader(PAGE_VERSIONS_CHAPTER)?.len()
} else {
0
})
}
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> { fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
// Scan the VecMap backwards, starting from the given entry. // Scan the VecMap backwards, starting from the given entry.
let slice = self let slice = self
@@ -289,6 +297,12 @@ impl Layer for DeltaLayer {
} }
} }
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
// TODO: is it actually necessary to load layer to get it's size?
self.load()?.get_physical_size()
}
/// Get size of the relation at given LSN /// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> { fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn); assert!(lsn >= self.start_lsn);

View File

@@ -41,7 +41,7 @@ pub struct EphemeralFile {
_timelineid: ZTimelineId, _timelineid: ZTimelineId,
file: Arc<VirtualFile>, file: Arc<VirtualFile>,
pos: u64, pub pos: u64,
} }
impl EphemeralFile { impl EphemeralFile {

View File

@@ -201,6 +201,11 @@ impl Layer for ImageLayer {
} }
} }
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.get_seg_size(Lsn(0))? as u64 * BLOCK_SIZE as u64)
}
/// Does this segment exist at given LSN? /// Does this segment exist at given LSN?
fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> { fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
Ok(true) Ok(true)

View File

@@ -80,6 +80,10 @@ impl InMemoryLayerInner {
assert!(self.end_lsn.is_none()); assert!(self.end_lsn.is_none());
} }
fn get_physical_size(&self) -> u64 {
self.page_versions.size()
}
fn get_seg_size(&self, lsn: Lsn) -> SegmentBlk { fn get_seg_size(&self, lsn: Lsn) -> SegmentBlk {
// Scan the BTreeMap backwards, starting from the given entry. // Scan the BTreeMap backwards, starting from the given entry.
let slice = self.seg_sizes.slice_range(..=lsn); let slice = self.seg_sizes.slice_range(..=lsn);
@@ -221,7 +225,12 @@ impl Layer for InMemoryLayer {
} }
} }
/// Get size of the relation at given LSN // Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.inner.read().unwrap().get_physical_size() as u64)
}
/// Get logical size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> { fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn); assert!(lsn >= self.start_lsn);
ensure!( ensure!(
@@ -616,7 +625,7 @@ impl InMemoryLayer {
let image_lsn: Option<Lsn>; let image_lsn: Option<Lsn>;
let delta_end_lsn: Option<Lsn>; let delta_end_lsn: Option<Lsn>;
if self.is_dropped() || !reconstruct_pages { if self.is_dropped() || !reconstruct_pages {
// The segment was dropped. Create just a delta layer containing all the // Create just a delta layer containing all the
// changes up to and including the drop. // changes up to and including the drop.
delta_end_lsn = Some(end_lsn_exclusive); delta_end_lsn = Some(end_lsn_exclusive);
image_lsn = None; image_lsn = None;

View File

@@ -111,6 +111,14 @@ where
} }
} }
/// Iterate over all items with start bound <= 'key'
pub fn iter_older(&self, key: I::Key) -> IntervalIter<I> {
IntervalIter {
point_iter: self.points.range(..key),
elem_iter: None,
}
}
/// Iterate over all items /// Iterate over all items
pub fn iter(&self) -> IntervalIter<I> { pub fn iter(&self) -> IntervalIter<I> {
IntervalIter { IntervalIter {
@@ -230,6 +238,35 @@ where
} }
} }
impl<'a, I> DoubleEndedIterator for IntervalIter<'a, I>
where
I: IntervalItem + ?Sized,
{
fn next_back(&mut self) -> Option<Self::Item> {
// Iterate over all elements in all the points in 'point_iter'. To avoid
// returning the same element twice, we only return each element at its
// starting point.
loop {
// Return next remaining element from the current point
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
while let Some(elem) = elem_iter.next_back() {
if elem.start_key() == *point_key {
return Some(Arc::clone(elem));
}
}
}
// No more elements at this point. Move to next point.
if let Some((point_key, point)) = self.point_iter.next_back() {
self.elem_iter = Some((*point_key, point.elements.iter()));
continue;
} else {
// No more points, all done
return None;
}
}
}
}
impl<I: ?Sized> Default for IntervalTree<I> impl<I: ?Sized> Default for IntervalTree<I>
where where
I: IntervalItem, I: IntervalItem,

View File

@@ -199,6 +199,14 @@ impl LayerMap {
} }
} }
pub fn count_delta_layers(&self, seg: SegmentTag, lsn: Lsn) -> Result<(usize, u64)> {
if let Some(segentry) = self.segs.get(&seg) {
segentry.count_delta_layers(lsn)
} else {
Ok((0, 0))
}
}
/// Is there any layer for given segment that is alive at the lsn? /// Is there any layer for given segment that is alive at the lsn?
/// ///
/// This is a public wrapper for SegEntry fucntion, /// This is a public wrapper for SegEntry fucntion,
@@ -320,6 +328,22 @@ impl SegEntry {
.any(|layer| !layer.is_incremental()) .any(|layer| !layer.is_incremental())
} }
// Count number of delta layers preceeding specified `lsn`.
// Perform backward iteration from exclusive upper bound until image layer is reached.
pub fn count_delta_layers(&self, lsn: Lsn) -> Result<(usize, u64)> {
let mut count: usize = 0;
let mut total_size: u64 = 0;
let mut iter = self.historic.iter_older(lsn);
while let Some(layer) = iter.next_back() {
if !layer.is_incremental() {
break;
}
count += 1;
total_size += layer.get_physical_size()?;
}
Ok((count, total_size))
}
// Set new open layer for a SegEntry. // Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer, // It's ok to rewrite previous open layer,
// but only if it is not writeable anymore. // but only if it is not writeable anymore.

View File

@@ -39,6 +39,10 @@ impl PageVersions {
} }
} }
pub fn size(&self) -> u64 {
self.file.pos
}
pub fn append_or_update_last( pub fn append_or_update_last(
&mut self, &mut self,
blknum: u32, blknum: u32,

View File

@@ -154,12 +154,15 @@ pub trait Layer: Send + Sync {
reconstruct_data: &mut PageReconstructData, reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult>; ) -> Result<PageReconstructResult>;
/// Return size of the segment at given LSN. (Only for blocky relations.) /// Return logical size of the segment at given LSN. (Only for blocky relations.)
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk>; fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk>;
/// Does the segment exist at given LSN? Or was it dropped before it. /// Does the segment exist at given LSN? Or was it dropped before it.
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>; fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64>;
/// Does this layer only contain some data for the segment (incremental), /// Does this layer only contain some data for the segment (incremental),
/// or does it contain a version of every page? This is important to know /// or does it contain a version of every page? This is important to know
/// for garbage collecting old layers: an incremental layer depends on /// for garbage collecting old layers: an incremental layer depends on

View File

@@ -13,6 +13,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result}; use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use postgres_ffi::pg_constants::BLCKSZ;
use regex::Regex; use regex::Regex;
use std::net::TcpListener; use std::net::TcpListener;
use std::str; use std::str;
@@ -42,6 +43,8 @@ use crate::tenant_mgr;
use crate::walreceiver; use crate::walreceiver;
use crate::CheckpointConfig; use crate::CheckpointConfig;
const CHUNK_SIZE: u32 = 128; // 1Mb
// Wrapped in libpq CopyData // Wrapped in libpq CopyData
enum PagestreamFeMessage { enum PagestreamFeMessage {
Exists(PagestreamExistsRequest), Exists(PagestreamExistsRequest),
@@ -91,7 +94,8 @@ struct PagestreamNblocksResponse {
#[derive(Debug)] #[derive(Debug)]
struct PagestreamGetPageResponse { struct PagestreamGetPageResponse {
page: Bytes, n_blocks: u32,
data: Bytes,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -162,7 +166,8 @@ impl PagestreamBeMessage {
Self::GetPage(resp) => { Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */ bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]); bytes.put_u32(resp.n_blocks);
bytes.put(&resp.data[..]);
} }
Self::Error(resp) => { Self::Error(resp) => {
@@ -438,11 +443,18 @@ impl PageServerHandler {
.entered(); .entered();
let tag = RelishTag::Relation(req.rel); let tag = RelishTag::Relation(req.rel);
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?; let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
let rel_size = timeline.get_relish_size(tag, lsn)?.unwrap_or(0);
let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?; let blkno = req.blkno;
let n_blocks = u32::min(blkno + CHUNK_SIZE, rel_size) - blkno;
let mut data = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
for i in 0..n_blocks {
let page = timeline.get_page_at_lsn(tag, blkno + i, lsn)?;
data.extend_from_slice(&page);
}
assert!(data.len() == n_blocks as usize * BLCKSZ as usize);
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page, n_blocks,
data: data.freeze(),
})) }))
} }

View File

@@ -66,8 +66,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone() row = pscur.fetchone()
print_gc_result(row) print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2 assert row['layer_relfiles_total'] == layer_relfiles_remain + 1
assert row['layer_relfiles_removed'] == 2 assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0 assert row['layer_relfiles_dropped'] == 0
# Insert two more rows and run GC. # Insert two more rows and run GC.
@@ -81,7 +81,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
row = pscur.fetchone() row = pscur.fetchone()
print_gc_result(row) print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2 assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
assert row['layer_relfiles_removed'] == 2 assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0 assert row['layer_relfiles_dropped'] == 0
# Do it again. Should again create two new layer files and remove old ones. # Do it again. Should again create two new layer files and remove old ones.
@@ -92,8 +92,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone() row = pscur.fetchone()
print_gc_result(row) print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2 assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
assert row['layer_relfiles_removed'] == 2 assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0 assert row['layer_relfiles_dropped'] == 0
# Run GC again, with no changes in the database. Should not remove anything. # Run GC again, with no changes in the database. Should not remove anything.
@@ -101,7 +101,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone() row = pscur.fetchone()
print_gc_result(row) print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
assert row['layer_relfiles_removed'] == 0 assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0 assert row['layer_relfiles_dropped'] == 0
@@ -121,9 +121,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
# Each relation fork is counted separately, hence 3. # Each relation fork is counted separately, hence 3.
assert row['layer_relfiles_needed_as_tombstone'] == 3 assert row['layer_relfiles_needed_as_tombstone'] == 3
# The catalog updates also create new layer files of the catalogs, which assert row['layer_relfiles_removed'] == 0
# are counted as 'removed'
assert row['layer_relfiles_removed'] > 0
# TODO Change the test to check actual CG of dropped layers. # TODO Change the test to check actual CG of dropped layers.
# Each relation fork is counted separately, hence 3. # Each relation fork is counted separately, hence 3.