Compare commits

...

5 Commits

Author SHA1 Message Date
Konstantin Knizhnik
c28b6573b4 Use 1Mb chunk instead of page for loading data from pageserver 2022-01-21 10:55:58 +03:00
Konstantin Knizhnik
cb70e63f34 Update test_snapfiles_gc test 2022-01-20 12:11:38 +03:00
Konstantin Knizhnik
e6c82c9609 Add max_image_layers and image_layer_generation_threshold parameters to config and rewrite criteria of image layer generation 2022-01-17 18:59:34 +03:00
Konstantin Knizhnik
79ade52535 Add comments and make changes suggested by reviewers
refer #1133
2022-01-16 23:04:39 +03:00
Konstantin Knizhnik
5c33095918 Do not write full page images at each checkpoint 2022-01-14 15:15:31 +03:00
14 changed files with 183 additions and 22 deletions

View File

@@ -294,6 +294,7 @@ impl PostgresNode {
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
conf.append("zenith.file_cache_size", "4096");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "replica");

View File

@@ -43,6 +43,9 @@ pub mod defaults {
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
pub const DEFAULT_MAX_DELTA_LAYERS: usize = 10;
pub const DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD: usize = 50;
///
/// Default built-in configuration file.
///
@@ -90,6 +93,21 @@ pub struct PageServerConf {
pub page_cache_size: usize,
pub max_file_descriptors: usize,
//
// Minimal total size of delta layeres which triggers generation of image layer by checkpointer.
// It is specified as percent of maximal sigment size (RELISH_SEG_SIZE).
// I.e. it means that checkpoint will create image layer in addition to delta layer only when total size
// of delta layers since last image layer exceeds specified percent of segment size.
//
pub image_layer_generation_threshold: usize,
//
// Maximal number of delta layers which can be stored before image layere should be generated.
// The garbage collector needs image layers in order to delete files.
// If this number is too large it can result in too many small files on disk.
//
pub max_delta_layers: usize,
// Repository directory, relative to current working directory.
// Normally, the page server changes the current working directory
// to the repository, and 'workdir' is always '.'. But we don't do
@@ -228,6 +246,9 @@ impl PageServerConf {
page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
pg_distrib_dir: PathBuf::new(),
auth_validation_public_key_path: None,
auth_type: AuthType::Trust,
@@ -250,6 +271,10 @@ impl PageServerConf {
"max_file_descriptors" => {
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
}
"max_delta_layers" => conf.max_delta_layers = parse_toml_u64(key, item)? as usize,
"image_layer_generation_threshold" => {
conf.image_layer_generation_threshold = parse_toml_u64(key, item)? as usize
}
"pg_distrib_dir" => {
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
}
@@ -379,6 +404,8 @@ impl PageServerConf {
gc_period: Duration::from_secs(10),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
superuser: "zenith_admin".to_string(),
@@ -450,6 +477,9 @@ gc_horizon = 222
page_cache_size = 444
max_file_descriptors = 333
max_delta_layers = 10
image_layer_generation_threshold = 50
# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'
@@ -480,6 +510,9 @@ initial_superuser_name = 'zzzz'
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold:
defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,
@@ -521,6 +554,8 @@ initial_superuser_name = 'zzzz'
superuser: "zzzz".to_string(),
page_cache_size: 444,
max_file_descriptors: 333,
max_delta_layers: 10,
image_layer_generation_threshold: 50,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,

View File

@@ -1598,7 +1598,7 @@ impl LayeredTimeline {
Ok(())
}
fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
fn evict_layer(&self, layer_id: LayerId, mut reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
@@ -1611,8 +1611,27 @@ impl LayeredTimeline {
let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
let last_lsn = self.get_last_record_lsn();
// Avoid creation of image layers if there are not so much deltas
if reconstruct_pages
&& oldest_layer.get_seg_tag().rel.is_blocky()
&& self.conf.image_layer_generation_threshold != 0
{
let (n_delta_layers, total_delta_size) =
layers.count_delta_layers(oldest_layer.get_seg_tag(), last_lsn)?;
let logical_segment_size =
oldest_layer.get_seg_size(last_lsn)? as u64 * BLCKSZ as u64;
let physical_deltas_size = total_delta_size + oldest_layer.get_physical_size()?;
if logical_segment_size * self.conf.image_layer_generation_threshold as u64
> physical_deltas_size * 100
&& n_delta_layers < self.conf.max_delta_layers
{
reconstruct_pages = false;
}
}
drop(global_layer_map);
oldest_layer.freeze(self.get_last_record_lsn());
oldest_layer.freeze(last_lsn);
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.

View File

@@ -161,6 +161,14 @@ pub struct DeltaLayerInner {
}
impl DeltaLayerInner {
fn get_physical_size(&self) -> Result<u64> {
Ok(if let Some(book) = &self.book {
book.chapter_reader(PAGE_VERSIONS_CHAPTER)?.len()
} else {
0
})
}
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
// Scan the VecMap backwards, starting from the given entry.
let slice = self
@@ -289,6 +297,12 @@ impl Layer for DeltaLayer {
}
}
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
// TODO: is it actually necessary to load layer to get it's size?
self.load()?.get_physical_size()
}
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);

View File

@@ -41,7 +41,7 @@ pub struct EphemeralFile {
_timelineid: ZTimelineId,
file: Arc<VirtualFile>,
pos: u64,
pub pos: u64,
}
impl EphemeralFile {

View File

@@ -201,6 +201,11 @@ impl Layer for ImageLayer {
}
}
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.get_seg_size(Lsn(0))? as u64 * BLOCK_SIZE as u64)
}
/// Does this segment exist at given LSN?
fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
Ok(true)

View File

@@ -80,6 +80,10 @@ impl InMemoryLayerInner {
assert!(self.end_lsn.is_none());
}
fn get_physical_size(&self) -> u64 {
self.page_versions.size()
}
fn get_seg_size(&self, lsn: Lsn) -> SegmentBlk {
// Scan the BTreeMap backwards, starting from the given entry.
let slice = self.seg_sizes.slice_range(..=lsn);
@@ -221,7 +225,12 @@ impl Layer for InMemoryLayer {
}
}
/// Get size of the relation at given LSN
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.inner.read().unwrap().get_physical_size() as u64)
}
/// Get logical size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);
ensure!(
@@ -616,7 +625,7 @@ impl InMemoryLayer {
let image_lsn: Option<Lsn>;
let delta_end_lsn: Option<Lsn>;
if self.is_dropped() || !reconstruct_pages {
// The segment was dropped. Create just a delta layer containing all the
// Create just a delta layer containing all the
// changes up to and including the drop.
delta_end_lsn = Some(end_lsn_exclusive);
image_lsn = None;

View File

@@ -111,6 +111,14 @@ where
}
}
/// Iterate over all items with start bound <= 'key'
pub fn iter_older(&self, key: I::Key) -> IntervalIter<I> {
IntervalIter {
point_iter: self.points.range(..key),
elem_iter: None,
}
}
/// Iterate over all items
pub fn iter(&self) -> IntervalIter<I> {
IntervalIter {
@@ -230,6 +238,35 @@ where
}
}
impl<'a, I> DoubleEndedIterator for IntervalIter<'a, I>
where
I: IntervalItem + ?Sized,
{
fn next_back(&mut self) -> Option<Self::Item> {
// Iterate over all elements in all the points in 'point_iter'. To avoid
// returning the same element twice, we only return each element at its
// starting point.
loop {
// Return next remaining element from the current point
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
while let Some(elem) = elem_iter.next_back() {
if elem.start_key() == *point_key {
return Some(Arc::clone(elem));
}
}
}
// No more elements at this point. Move to next point.
if let Some((point_key, point)) = self.point_iter.next_back() {
self.elem_iter = Some((*point_key, point.elements.iter()));
continue;
} else {
// No more points, all done
return None;
}
}
}
}
impl<I: ?Sized> Default for IntervalTree<I>
where
I: IntervalItem,

View File

@@ -199,6 +199,14 @@ impl LayerMap {
}
}
pub fn count_delta_layers(&self, seg: SegmentTag, lsn: Lsn) -> Result<(usize, u64)> {
if let Some(segentry) = self.segs.get(&seg) {
segentry.count_delta_layers(lsn)
} else {
Ok((0, 0))
}
}
/// Is there any layer for given segment that is alive at the lsn?
///
/// This is a public wrapper for SegEntry fucntion,
@@ -320,6 +328,22 @@ impl SegEntry {
.any(|layer| !layer.is_incremental())
}
// Count number of delta layers preceeding specified `lsn`.
// Perform backward iteration from exclusive upper bound until image layer is reached.
pub fn count_delta_layers(&self, lsn: Lsn) -> Result<(usize, u64)> {
let mut count: usize = 0;
let mut total_size: u64 = 0;
let mut iter = self.historic.iter_older(lsn);
while let Some(layer) = iter.next_back() {
if !layer.is_incremental() {
break;
}
count += 1;
total_size += layer.get_physical_size()?;
}
Ok((count, total_size))
}
// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.

View File

@@ -39,6 +39,10 @@ impl PageVersions {
}
}
pub fn size(&self) -> u64 {
self.file.pos
}
pub fn append_or_update_last(
&mut self,
blknum: u32,

View File

@@ -154,12 +154,15 @@ pub trait Layer: Send + Sync {
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult>;
/// Return size of the segment at given LSN. (Only for blocky relations.)
/// Return logical size of the segment at given LSN. (Only for blocky relations.)
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk>;
/// Does the segment exist at given LSN? Or was it dropped before it.
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64>;
/// Does this layer only contain some data for the segment (incremental),
/// or does it contain a version of every page? This is important to know
/// for garbage collecting old layers: an incremental layer depends on

View File

@@ -13,6 +13,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use postgres_ffi::pg_constants::BLCKSZ;
use regex::Regex;
use std::net::TcpListener;
use std::str;
@@ -42,6 +43,8 @@ use crate::tenant_mgr;
use crate::walreceiver;
use crate::CheckpointConfig;
const CHUNK_SIZE: u32 = 128; // 1Mb
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
@@ -91,7 +94,8 @@ struct PagestreamNblocksResponse {
#[derive(Debug)]
struct PagestreamGetPageResponse {
page: Bytes,
n_blocks: u32,
data: Bytes,
}
#[derive(Debug)]
@@ -162,7 +166,8 @@ impl PagestreamBeMessage {
Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]);
bytes.put_u32(resp.n_blocks);
bytes.put(&resp.data[..]);
}
Self::Error(resp) => {
@@ -438,11 +443,18 @@ impl PageServerHandler {
.entered();
let tag = RelishTag::Relation(req.rel);
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
let rel_size = timeline.get_relish_size(tag, lsn)?.unwrap_or(0);
let blkno = req.blkno;
let n_blocks = u32::min(blkno + CHUNK_SIZE, rel_size) - blkno;
let mut data = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
for i in 0..n_blocks {
let page = timeline.get_page_at_lsn(tag, blkno + i, lsn)?;
data.extend_from_slice(&page);
}
assert!(data.len() == n_blocks as usize * BLCKSZ as usize);
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
n_blocks,
data: data.freeze(),
}))
}

View File

@@ -66,8 +66,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
assert row['layer_relfiles_removed'] == 2
assert row['layer_relfiles_total'] == layer_relfiles_remain + 1
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0
# Insert two more rows and run GC.
@@ -81,7 +81,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
assert row['layer_relfiles_removed'] == 2
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0
# Do it again. Should again create two new layer files and remove old ones.
@@ -92,8 +92,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
assert row['layer_relfiles_removed'] == 2
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0
# Run GC again, with no changes in the database. Should not remove anything.
@@ -101,7 +101,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0
@@ -121,9 +121,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
# Each relation fork is counted separately, hence 3.
assert row['layer_relfiles_needed_as_tombstone'] == 3
# The catalog updates also create new layer files of the catalogs, which
# are counted as 'removed'
assert row['layer_relfiles_removed'] > 0
assert row['layer_relfiles_removed'] == 0
# TODO Change the test to check actual CG of dropped layers.
# Each relation fork is counted separately, hence 3.