mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
5 Commits
sk-patch-c
...
chunk_load
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c28b6573b4 | ||
|
|
cb70e63f34 | ||
|
|
e6c82c9609 | ||
|
|
79ade52535 | ||
|
|
5c33095918 |
@@ -294,6 +294,7 @@ impl PostgresNode {
|
||||
conf.append("max_replication_slots", "10");
|
||||
conf.append("hot_standby", "on");
|
||||
conf.append("shared_buffers", "1MB");
|
||||
conf.append("zenith.file_cache_size", "4096");
|
||||
conf.append("fsync", "off");
|
||||
conf.append("max_connections", "100");
|
||||
conf.append("wal_level", "replica");
|
||||
|
||||
@@ -43,6 +43,9 @@ pub mod defaults {
|
||||
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
|
||||
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
|
||||
|
||||
pub const DEFAULT_MAX_DELTA_LAYERS: usize = 10;
|
||||
pub const DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD: usize = 50;
|
||||
|
||||
///
|
||||
/// Default built-in configuration file.
|
||||
///
|
||||
@@ -90,6 +93,21 @@ pub struct PageServerConf {
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
|
||||
//
|
||||
// Minimal total size of delta layeres which triggers generation of image layer by checkpointer.
|
||||
// It is specified as percent of maximal sigment size (RELISH_SEG_SIZE).
|
||||
// I.e. it means that checkpoint will create image layer in addition to delta layer only when total size
|
||||
// of delta layers since last image layer exceeds specified percent of segment size.
|
||||
//
|
||||
pub image_layer_generation_threshold: usize,
|
||||
|
||||
//
|
||||
// Maximal number of delta layers which can be stored before image layere should be generated.
|
||||
// The garbage collector needs image layers in order to delete files.
|
||||
// If this number is too large it can result in too many small files on disk.
|
||||
//
|
||||
pub max_delta_layers: usize,
|
||||
|
||||
// Repository directory, relative to current working directory.
|
||||
// Normally, the page server changes the current working directory
|
||||
// to the repository, and 'workdir' is always '.'. But we don't do
|
||||
@@ -228,6 +246,9 @@ impl PageServerConf {
|
||||
page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
|
||||
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
|
||||
|
||||
max_delta_layers: DEFAULT_MAX_DELTA_LAYERS,
|
||||
image_layer_generation_threshold: DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
|
||||
|
||||
pg_distrib_dir: PathBuf::new(),
|
||||
auth_validation_public_key_path: None,
|
||||
auth_type: AuthType::Trust,
|
||||
@@ -250,6 +271,10 @@ impl PageServerConf {
|
||||
"max_file_descriptors" => {
|
||||
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
|
||||
}
|
||||
"max_delta_layers" => conf.max_delta_layers = parse_toml_u64(key, item)? as usize,
|
||||
"image_layer_generation_threshold" => {
|
||||
conf.image_layer_generation_threshold = parse_toml_u64(key, item)? as usize
|
||||
}
|
||||
"pg_distrib_dir" => {
|
||||
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
|
||||
}
|
||||
@@ -379,6 +404,8 @@ impl PageServerConf {
|
||||
gc_period: Duration::from_secs(10),
|
||||
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
|
||||
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
|
||||
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
|
||||
image_layer_generation_threshold: defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
superuser: "zenith_admin".to_string(),
|
||||
@@ -450,6 +477,9 @@ gc_horizon = 222
|
||||
page_cache_size = 444
|
||||
max_file_descriptors = 333
|
||||
|
||||
max_delta_layers = 10
|
||||
image_layer_generation_threshold = 50
|
||||
|
||||
# initial superuser role name to use when creating a new tenant
|
||||
initial_superuser_name = 'zzzz'
|
||||
|
||||
@@ -480,6 +510,9 @@ initial_superuser_name = 'zzzz'
|
||||
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
|
||||
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
|
||||
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
|
||||
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
|
||||
image_layer_generation_threshold:
|
||||
defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
|
||||
workdir,
|
||||
pg_distrib_dir,
|
||||
auth_type: AuthType::Trust,
|
||||
@@ -521,6 +554,8 @@ initial_superuser_name = 'zzzz'
|
||||
superuser: "zzzz".to_string(),
|
||||
page_cache_size: 444,
|
||||
max_file_descriptors: 333,
|
||||
max_delta_layers: 10,
|
||||
image_layer_generation_threshold: 50,
|
||||
workdir,
|
||||
pg_distrib_dir,
|
||||
auth_type: AuthType::Trust,
|
||||
|
||||
@@ -1598,7 +1598,7 @@ impl LayeredTimeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
|
||||
fn evict_layer(&self, layer_id: LayerId, mut reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
|
||||
// Mark the layer as no longer accepting writes and record the end_lsn.
|
||||
// This happens in-place, no new layers are created now.
|
||||
// We call `get_last_record_lsn` again, which may be different from the
|
||||
@@ -1611,8 +1611,27 @@ impl LayeredTimeline {
|
||||
|
||||
let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
|
||||
if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
// Avoid creation of image layers if there are not so much deltas
|
||||
if reconstruct_pages
|
||||
&& oldest_layer.get_seg_tag().rel.is_blocky()
|
||||
&& self.conf.image_layer_generation_threshold != 0
|
||||
{
|
||||
let (n_delta_layers, total_delta_size) =
|
||||
layers.count_delta_layers(oldest_layer.get_seg_tag(), last_lsn)?;
|
||||
let logical_segment_size =
|
||||
oldest_layer.get_seg_size(last_lsn)? as u64 * BLCKSZ as u64;
|
||||
let physical_deltas_size = total_delta_size + oldest_layer.get_physical_size()?;
|
||||
if logical_segment_size * self.conf.image_layer_generation_threshold as u64
|
||||
> physical_deltas_size * 100
|
||||
&& n_delta_layers < self.conf.max_delta_layers
|
||||
{
|
||||
reconstruct_pages = false;
|
||||
}
|
||||
}
|
||||
|
||||
drop(global_layer_map);
|
||||
oldest_layer.freeze(self.get_last_record_lsn());
|
||||
oldest_layer.freeze(last_lsn);
|
||||
|
||||
// The layer is no longer open, update the layer map to reflect this.
|
||||
// We will replace it with on-disk historics below.
|
||||
|
||||
@@ -161,6 +161,14 @@ pub struct DeltaLayerInner {
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
fn get_physical_size(&self) -> Result<u64> {
|
||||
Ok(if let Some(book) = &self.book {
|
||||
book.chapter_reader(PAGE_VERSIONS_CHAPTER)?.len()
|
||||
} else {
|
||||
0
|
||||
})
|
||||
}
|
||||
|
||||
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
|
||||
// Scan the VecMap backwards, starting from the given entry.
|
||||
let slice = self
|
||||
@@ -289,6 +297,12 @@ impl Layer for DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// Get physical size of the layer
|
||||
fn get_physical_size(&self) -> Result<u64> {
|
||||
// TODO: is it actually necessary to load layer to get it's size?
|
||||
self.load()?.get_physical_size()
|
||||
}
|
||||
|
||||
/// Get size of the relation at given LSN
|
||||
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
|
||||
assert!(lsn >= self.start_lsn);
|
||||
|
||||
@@ -41,7 +41,7 @@ pub struct EphemeralFile {
|
||||
_timelineid: ZTimelineId,
|
||||
file: Arc<VirtualFile>,
|
||||
|
||||
pos: u64,
|
||||
pub pos: u64,
|
||||
}
|
||||
|
||||
impl EphemeralFile {
|
||||
|
||||
@@ -201,6 +201,11 @@ impl Layer for ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// Get physical size of the layer
|
||||
fn get_physical_size(&self) -> Result<u64> {
|
||||
Ok(self.get_seg_size(Lsn(0))? as u64 * BLOCK_SIZE as u64)
|
||||
}
|
||||
|
||||
/// Does this segment exist at given LSN?
|
||||
fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
|
||||
Ok(true)
|
||||
|
||||
@@ -80,6 +80,10 @@ impl InMemoryLayerInner {
|
||||
assert!(self.end_lsn.is_none());
|
||||
}
|
||||
|
||||
fn get_physical_size(&self) -> u64 {
|
||||
self.page_versions.size()
|
||||
}
|
||||
|
||||
fn get_seg_size(&self, lsn: Lsn) -> SegmentBlk {
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let slice = self.seg_sizes.slice_range(..=lsn);
|
||||
@@ -221,7 +225,12 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get size of the relation at given LSN
|
||||
// Get physical size of the layer
|
||||
fn get_physical_size(&self) -> Result<u64> {
|
||||
Ok(self.inner.read().unwrap().get_physical_size() as u64)
|
||||
}
|
||||
|
||||
/// Get logical size of the relation at given LSN
|
||||
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
|
||||
assert!(lsn >= self.start_lsn);
|
||||
ensure!(
|
||||
@@ -616,7 +625,7 @@ impl InMemoryLayer {
|
||||
let image_lsn: Option<Lsn>;
|
||||
let delta_end_lsn: Option<Lsn>;
|
||||
if self.is_dropped() || !reconstruct_pages {
|
||||
// The segment was dropped. Create just a delta layer containing all the
|
||||
// Create just a delta layer containing all the
|
||||
// changes up to and including the drop.
|
||||
delta_end_lsn = Some(end_lsn_exclusive);
|
||||
image_lsn = None;
|
||||
|
||||
@@ -111,6 +111,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate over all items with start bound <= 'key'
|
||||
pub fn iter_older(&self, key: I::Key) -> IntervalIter<I> {
|
||||
IntervalIter {
|
||||
point_iter: self.points.range(..key),
|
||||
elem_iter: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate over all items
|
||||
pub fn iter(&self) -> IntervalIter<I> {
|
||||
IntervalIter {
|
||||
@@ -230,6 +238,35 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, I> DoubleEndedIterator for IntervalIter<'a, I>
|
||||
where
|
||||
I: IntervalItem + ?Sized,
|
||||
{
|
||||
fn next_back(&mut self) -> Option<Self::Item> {
|
||||
// Iterate over all elements in all the points in 'point_iter'. To avoid
|
||||
// returning the same element twice, we only return each element at its
|
||||
// starting point.
|
||||
loop {
|
||||
// Return next remaining element from the current point
|
||||
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
|
||||
while let Some(elem) = elem_iter.next_back() {
|
||||
if elem.start_key() == *point_key {
|
||||
return Some(Arc::clone(elem));
|
||||
}
|
||||
}
|
||||
}
|
||||
// No more elements at this point. Move to next point.
|
||||
if let Some((point_key, point)) = self.point_iter.next_back() {
|
||||
self.elem_iter = Some((*point_key, point.elements.iter()));
|
||||
continue;
|
||||
} else {
|
||||
// No more points, all done
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: ?Sized> Default for IntervalTree<I>
|
||||
where
|
||||
I: IntervalItem,
|
||||
|
||||
@@ -199,6 +199,14 @@ impl LayerMap {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn count_delta_layers(&self, seg: SegmentTag, lsn: Lsn) -> Result<(usize, u64)> {
|
||||
if let Some(segentry) = self.segs.get(&seg) {
|
||||
segentry.count_delta_layers(lsn)
|
||||
} else {
|
||||
Ok((0, 0))
|
||||
}
|
||||
}
|
||||
|
||||
/// Is there any layer for given segment that is alive at the lsn?
|
||||
///
|
||||
/// This is a public wrapper for SegEntry fucntion,
|
||||
@@ -320,6 +328,22 @@ impl SegEntry {
|
||||
.any(|layer| !layer.is_incremental())
|
||||
}
|
||||
|
||||
// Count number of delta layers preceeding specified `lsn`.
|
||||
// Perform backward iteration from exclusive upper bound until image layer is reached.
|
||||
pub fn count_delta_layers(&self, lsn: Lsn) -> Result<(usize, u64)> {
|
||||
let mut count: usize = 0;
|
||||
let mut total_size: u64 = 0;
|
||||
let mut iter = self.historic.iter_older(lsn);
|
||||
while let Some(layer) = iter.next_back() {
|
||||
if !layer.is_incremental() {
|
||||
break;
|
||||
}
|
||||
count += 1;
|
||||
total_size += layer.get_physical_size()?;
|
||||
}
|
||||
Ok((count, total_size))
|
||||
}
|
||||
|
||||
// Set new open layer for a SegEntry.
|
||||
// It's ok to rewrite previous open layer,
|
||||
// but only if it is not writeable anymore.
|
||||
|
||||
@@ -39,6 +39,10 @@ impl PageVersions {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.file.pos
|
||||
}
|
||||
|
||||
pub fn append_or_update_last(
|
||||
&mut self,
|
||||
blknum: u32,
|
||||
|
||||
@@ -154,12 +154,15 @@ pub trait Layer: Send + Sync {
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<PageReconstructResult>;
|
||||
|
||||
/// Return size of the segment at given LSN. (Only for blocky relations.)
|
||||
/// Return logical size of the segment at given LSN. (Only for blocky relations.)
|
||||
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk>;
|
||||
|
||||
/// Does the segment exist at given LSN? Or was it dropped before it.
|
||||
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
|
||||
|
||||
// Get physical size of the layer
|
||||
fn get_physical_size(&self) -> Result<u64>;
|
||||
|
||||
/// Does this layer only contain some data for the segment (incremental),
|
||||
/// or does it contain a version of every page? This is important to know
|
||||
/// for garbage collecting old layers: an incremental layer depends on
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use lazy_static::lazy_static;
|
||||
use postgres_ffi::pg_constants::BLCKSZ;
|
||||
use regex::Regex;
|
||||
use std::net::TcpListener;
|
||||
use std::str;
|
||||
@@ -42,6 +43,8 @@ use crate::tenant_mgr;
|
||||
use crate::walreceiver;
|
||||
use crate::CheckpointConfig;
|
||||
|
||||
const CHUNK_SIZE: u32 = 128; // 1Mb
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamFeMessage {
|
||||
Exists(PagestreamExistsRequest),
|
||||
@@ -91,7 +94,8 @@ struct PagestreamNblocksResponse {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamGetPageResponse {
|
||||
page: Bytes,
|
||||
n_blocks: u32,
|
||||
data: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -162,7 +166,8 @@ impl PagestreamBeMessage {
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(102); /* tag from pagestore_client.h */
|
||||
bytes.put(&resp.page[..]);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
bytes.put(&resp.data[..]);
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
@@ -438,11 +443,18 @@ impl PageServerHandler {
|
||||
.entered();
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
|
||||
|
||||
let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
|
||||
|
||||
let rel_size = timeline.get_relish_size(tag, lsn)?.unwrap_or(0);
|
||||
let blkno = req.blkno;
|
||||
let n_blocks = u32::min(blkno + CHUNK_SIZE, rel_size) - blkno;
|
||||
let mut data = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
|
||||
for i in 0..n_blocks {
|
||||
let page = timeline.get_page_at_lsn(tag, blkno + i, lsn)?;
|
||||
data.extend_from_slice(&page);
|
||||
}
|
||||
assert!(data.len() == n_blocks as usize * BLCKSZ as usize);
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
n_blocks,
|
||||
data: data.freeze(),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -66,8 +66,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
|
||||
assert row['layer_relfiles_removed'] == 2
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 1
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
# Insert two more rows and run GC.
|
||||
@@ -81,7 +81,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
|
||||
assert row['layer_relfiles_removed'] == 2
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
# Do it again. Should again create two new layer files and remove old ones.
|
||||
@@ -92,8 +92,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
|
||||
assert row['layer_relfiles_removed'] == 2
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
# Run GC again, with no changes in the database. Should not remove anything.
|
||||
@@ -101,7 +101,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
@@ -121,9 +121,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
# Each relation fork is counted separately, hence 3.
|
||||
assert row['layer_relfiles_needed_as_tombstone'] == 3
|
||||
|
||||
# The catalog updates also create new layer files of the catalogs, which
|
||||
# are counted as 'removed'
|
||||
assert row['layer_relfiles_removed'] > 0
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
|
||||
# TODO Change the test to check actual CG of dropped layers.
|
||||
# Each relation fork is counted separately, hence 3.
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 3b166d06cf...6309cf1b52
Reference in New Issue
Block a user