Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-07 12:40:38 +00:00)

Compare commits: split-prox...chunk_load (5 commits)
Commits:
c28b6573b4
cb70e63f34
e6c82c9609
79ade52535
5c33095918
@@ -294,6 +294,7 @@ impl PostgresNode {
         conf.append("max_replication_slots", "10");
         conf.append("hot_standby", "on");
         conf.append("shared_buffers", "1MB");
+        conf.append("zenith.file_cache_size", "4096");
         conf.append("fsync", "off");
         conf.append("max_connections", "100");
         conf.append("wal_level", "replica");
@@ -43,6 +43,9 @@ pub mod defaults {
     pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
     pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
 
+    pub const DEFAULT_MAX_DELTA_LAYERS: usize = 10;
+    pub const DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD: usize = 50;
+
     ///
     /// Default built-in configuration file.
     ///
@@ -90,6 +93,21 @@ pub struct PageServerConf {
     pub page_cache_size: usize,
     pub max_file_descriptors: usize,
 
+    //
+    // Minimal total size of delta layers which triggers generation of an image layer by the checkpointer.
+    // It is specified as a percent of the maximal segment size (RELISH_SEG_SIZE).
+    // I.e. it means that a checkpoint will create an image layer in addition to a delta layer only when the total size
+    // of delta layers since the last image layer exceeds the specified percent of the segment size.
+    //
+    pub image_layer_generation_threshold: usize,
+
+    //
+    // Maximal number of delta layers which can be stored before an image layer should be generated.
+    // The garbage collector needs image layers in order to delete files.
+    // If this number is too large it can result in too many small files on disk.
+    //
+    pub max_delta_layers: usize,
+
     // Repository directory, relative to current working directory.
     // Normally, the page server changes the current working directory
     // to the repository, and 'workdir' is always '.'. But we don't do
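
Note: taken together, the two new settings encode the decision rule sketched below. This is a hedged, standalone restatement with illustrative names, not code from this diff; the real check lives in LayeredTimeline::evict_layer further down.

    // Sketch: when should the checkpointer materialize an image layer?
    // This is the negation of the "skip image creation" condition in evict_layer.
    fn should_create_image_layer(
        logical_segment_size: u64,  // get_seg_size(lsn) * BLCKSZ, in bytes
        physical_deltas_size: u64,  // delta bytes accumulated since the last image layer
        n_delta_layers: usize,      // delta layer count since the last image layer
        threshold_percent: u64,     // image_layer_generation_threshold
        max_delta_layers: usize,    // max_delta_layers
    ) -> bool {
        // Create an image layer once the deltas exceed `threshold_percent` of the
        // logical segment size, or once too many delta layer files have piled up.
        physical_deltas_size * 100 >= logical_segment_size * threshold_percent
            || n_delta_layers >= max_delta_layers
    }

With the defaults (threshold 50, max 10), a 10 MiB segment gets a fresh image layer only after roughly 5 MiB of deltas have accumulated, or after 10 delta layers, whichever comes first.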
@@ -228,6 +246,9 @@ impl PageServerConf {
             page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
             max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
 
+            max_delta_layers: DEFAULT_MAX_DELTA_LAYERS,
+            image_layer_generation_threshold: DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
+
             pg_distrib_dir: PathBuf::new(),
             auth_validation_public_key_path: None,
             auth_type: AuthType::Trust,
@@ -250,6 +271,10 @@ impl PageServerConf {
             "max_file_descriptors" => {
                 conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
             }
+            "max_delta_layers" => conf.max_delta_layers = parse_toml_u64(key, item)? as usize,
+            "image_layer_generation_threshold" => {
+                conf.image_layer_generation_threshold = parse_toml_u64(key, item)? as usize
+            }
             "pg_distrib_dir" => {
                 conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
             }
@@ -379,6 +404,8 @@ impl PageServerConf {
             gc_period: Duration::from_secs(10),
             page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
             max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
+            max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
+            image_layer_generation_threshold: defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
             superuser: "zenith_admin".to_string(),
@@ -450,6 +477,9 @@ gc_horizon = 222
 page_cache_size = 444
 max_file_descriptors = 333
 
+max_delta_layers = 10
+image_layer_generation_threshold = 50
+
 # initial superuser role name to use when creating a new tenant
 initial_superuser_name = 'zzzz'
 
@@ -480,6 +510,9 @@ initial_superuser_name = 'zzzz'
                 superuser: defaults::DEFAULT_SUPERUSER.to_string(),
                 page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
                 max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
+                max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
+                image_layer_generation_threshold:
+                    defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
                 workdir,
                 pg_distrib_dir,
                 auth_type: AuthType::Trust,
@@ -521,6 +554,8 @@ initial_superuser_name = 'zzzz'
                 superuser: "zzzz".to_string(),
                 page_cache_size: 444,
                 max_file_descriptors: 333,
+                max_delta_layers: 10,
+                image_layer_generation_threshold: 50,
                 workdir,
                 pg_distrib_dir,
                 auth_type: AuthType::Trust,
@@ -1598,7 +1598,7 @@ impl LayeredTimeline {
         Ok(())
     }
 
-    fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
+    fn evict_layer(&self, layer_id: LayerId, mut reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
         // Mark the layer as no longer accepting writes and record the end_lsn.
         // This happens in-place, no new layers are created now.
         // We call `get_last_record_lsn` again, which may be different from the
@@ -1611,8 +1611,27 @@ impl LayeredTimeline {
 
         let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
         if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
+            let last_lsn = self.get_last_record_lsn();
+            // Avoid creation of image layers if there are not so many deltas
+            if reconstruct_pages
+                && oldest_layer.get_seg_tag().rel.is_blocky()
+                && self.conf.image_layer_generation_threshold != 0
+            {
+                let (n_delta_layers, total_delta_size) =
+                    layers.count_delta_layers(oldest_layer.get_seg_tag(), last_lsn)?;
+                let logical_segment_size =
+                    oldest_layer.get_seg_size(last_lsn)? as u64 * BLCKSZ as u64;
+                let physical_deltas_size = total_delta_size + oldest_layer.get_physical_size()?;
+                if logical_segment_size * self.conf.image_layer_generation_threshold as u64
+                    > physical_deltas_size * 100
+                    && n_delta_layers < self.conf.max_delta_layers
+                {
+                    reconstruct_pages = false;
+                }
+            }
+
             drop(global_layer_map);
-            oldest_layer.freeze(self.get_last_record_lsn());
+            oldest_layer.freeze(last_lsn);
 
             // The layer is no longer open, update the layer map to reflect this.
             // We will replace it with on-disk historics below.
@@ -161,6 +161,14 @@ pub struct DeltaLayerInner {
 }
 
 impl DeltaLayerInner {
+    fn get_physical_size(&self) -> Result<u64> {
+        Ok(if let Some(book) = &self.book {
+            book.chapter_reader(PAGE_VERSIONS_CHAPTER)?.len()
+        } else {
+            0
+        })
+    }
+
     fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
         // Scan the VecMap backwards, starting from the given entry.
         let slice = self
@@ -289,6 +297,12 @@ impl Layer for DeltaLayer {
         }
     }
 
+    // Get physical size of the layer
+    fn get_physical_size(&self) -> Result<u64> {
+        // TODO: is it actually necessary to load the layer to get its size?
+        self.load()?.get_physical_size()
+    }
+
     /// Get size of the relation at given LSN
     fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
         assert!(lsn >= self.start_lsn);
@@ -41,7 +41,7 @@ pub struct EphemeralFile {
     _timelineid: ZTimelineId,
     file: Arc<VirtualFile>,
 
-    pos: u64,
+    pub pos: u64,
 }
 
 impl EphemeralFile {
@@ -201,6 +201,11 @@ impl Layer for ImageLayer {
         }
     }
 
+    // Get physical size of the layer
+    fn get_physical_size(&self) -> Result<u64> {
+        Ok(self.get_seg_size(Lsn(0))? as u64 * BLOCK_SIZE as u64)
+    }
+
     /// Does this segment exist at given LSN?
     fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
         Ok(true)
@@ -80,6 +80,10 @@ impl InMemoryLayerInner {
         assert!(self.end_lsn.is_none());
     }
 
+    fn get_physical_size(&self) -> u64 {
+        self.page_versions.size()
+    }
+
     fn get_seg_size(&self, lsn: Lsn) -> SegmentBlk {
         // Scan the BTreeMap backwards, starting from the given entry.
         let slice = self.seg_sizes.slice_range(..=lsn);
@@ -221,7 +225,12 @@ impl Layer for InMemoryLayer {
         }
     }
 
-    /// Get size of the relation at given LSN
+    // Get physical size of the layer
+    fn get_physical_size(&self) -> Result<u64> {
+        Ok(self.inner.read().unwrap().get_physical_size() as u64)
+    }
+
+    /// Get logical size of the relation at given LSN
     fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
         assert!(lsn >= self.start_lsn);
         ensure!(
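
Note: the three get_physical_size() implementations above report "physical size" differently. A hedged summary sketch with illustrative types, not code from this diff; BLOCK_SIZE is assumed to be the 8192-byte PostgreSQL page size:

    // How each layer kind measures its physical footprint (illustrative).
    enum LayerKind {
        Delta { page_versions_chapter_len: u64 }, // bytes stored in the PAGE_VERSIONS chapter
        Image { seg_size_blocks: u64 },           // fully materialized pages
        InMemory { ephemeral_file_pos: u64 },     // bytes appended to the ephemeral file so far
    }

    fn physical_size(layer: &LayerKind) -> u64 {
        const BLOCK_SIZE: u64 = 8192; // assumption: PostgreSQL page size
        match layer {
            LayerKind::Delta { page_versions_chapter_len } => *page_versions_chapter_len,
            LayerKind::Image { seg_size_blocks } => seg_size_blocks * BLOCK_SIZE,
            LayerKind::InMemory { ephemeral_file_pos } => *ephemeral_file_pos,
        }
    }

This is the quantity the checkpointer weighs against the logical segment size in evict_layer above.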
@@ -616,7 +625,7 @@ impl InMemoryLayer {
         let image_lsn: Option<Lsn>;
         let delta_end_lsn: Option<Lsn>;
         if self.is_dropped() || !reconstruct_pages {
-            // The segment was dropped. Create just a delta layer containing all the
+            // Create just a delta layer containing all the
             // changes up to and including the drop.
             delta_end_lsn = Some(end_lsn_exclusive);
             image_lsn = None;
@@ -111,6 +111,14 @@ where
         }
     }
 
+    /// Iterate over all items with start bound < 'key'
+    pub fn iter_older(&self, key: I::Key) -> IntervalIter<I> {
+        IntervalIter {
+            point_iter: self.points.range(..key),
+            elem_iter: None,
+        }
+    }
+
     /// Iterate over all items
     pub fn iter(&self) -> IntervalIter<I> {
         IntervalIter {
@@ -230,6 +238,35 @@ where
     }
 }
 
+impl<'a, I> DoubleEndedIterator for IntervalIter<'a, I>
+where
+    I: IntervalItem + ?Sized,
+{
+    fn next_back(&mut self) -> Option<Self::Item> {
+        // Iterate over all elements in all the points in 'point_iter'. To avoid
+        // returning the same element twice, we only return each element at its
+        // starting point.
+        loop {
+            // Return next remaining element from the current point
+            if let Some((point_key, elem_iter)) = &mut self.elem_iter {
+                while let Some(elem) = elem_iter.next_back() {
+                    if elem.start_key() == *point_key {
+                        return Some(Arc::clone(elem));
+                    }
+                }
+            }
+            // No more elements at this point. Move to next point.
+            if let Some((point_key, point)) = self.point_iter.next_back() {
+                self.elem_iter = Some((*point_key, point.elements.iter()));
+                continue;
+            } else {
+                // No more points, all done
+                return None;
+            }
+        }
+    }
+}
+
 impl<I: ?Sized> Default for IntervalTree<I>
 where
     I: IntervalItem,
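
Note: an element can appear under several points in the tree, so next_back() yields it only at the point that equals its start key. A hedged, self-contained sketch of that dedup rule with simplified types, not the pageserver's IntervalTree:

    use std::collections::BTreeMap;
    use std::sync::Arc;

    #[derive(Debug)]
    struct Item {
        start: u32,
        end: u32,
    }

    fn main() {
        let a = Arc::new(Item { start: 0, end: 10 });
        let b = Arc::new(Item { start: 10, end: 20 });
        // Register each item at both of its endpoints, as an interval index does.
        let mut points: BTreeMap<u32, Vec<Arc<Item>>> = BTreeMap::new();
        for item in [&a, &b] {
            points.entry(item.start).or_default().push(Arc::clone(item));
            points.entry(item.end).or_default().push(Arc::clone(item));
        }
        // Backward scan: emit an item only at the point equal to its start key,
        // so each item appears exactly once even though it is stored twice.
        let newest_first: Vec<&Arc<Item>> = points
            .iter()
            .rev()
            .flat_map(|(key, elems)| elems.iter().filter(move |e| e.start == *key))
            .collect();
        assert_eq!(newest_first.len(), 2);
        assert_eq!(newest_first[0].start, 10); // b first (newest), then a
    }

SegEntry::count_delta_layers below relies on exactly this backward, deduplicated order to walk delta layers newest-to-oldest until it hits an image layer.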
@@ -199,6 +199,14 @@ impl LayerMap {
         }
     }
 
+    pub fn count_delta_layers(&self, seg: SegmentTag, lsn: Lsn) -> Result<(usize, u64)> {
+        if let Some(segentry) = self.segs.get(&seg) {
+            segentry.count_delta_layers(lsn)
+        } else {
+            Ok((0, 0))
+        }
+    }
+
     /// Is there any layer for given segment that is alive at the lsn?
     ///
     /// This is a public wrapper for SegEntry function,
@@ -320,6 +328,22 @@ impl SegEntry {
             .any(|layer| !layer.is_incremental())
     }
 
+    // Count the number of delta layers preceding the specified `lsn`.
+    // Perform backward iteration from the exclusive upper bound until an image layer is reached.
+    pub fn count_delta_layers(&self, lsn: Lsn) -> Result<(usize, u64)> {
+        let mut count: usize = 0;
+        let mut total_size: u64 = 0;
+        let mut iter = self.historic.iter_older(lsn);
+        while let Some(layer) = iter.next_back() {
+            if !layer.is_incremental() {
+                break;
+            }
+            count += 1;
+            total_size += layer.get_physical_size()?;
+        }
+        Ok((count, total_size))
+    }
+
     // Set new open layer for a SegEntry.
     // It's ok to rewrite previous open layer,
     // but only if it is not writeable anymore.
@@ -39,6 +39,10 @@ impl PageVersions {
         }
     }
 
+    pub fn size(&self) -> u64 {
+        self.file.pos
+    }
+
     pub fn append_or_update_last(
         &mut self,
         blknum: u32,
@@ -154,12 +154,15 @@ pub trait Layer: Send + Sync {
         reconstruct_data: &mut PageReconstructData,
     ) -> Result<PageReconstructResult>;
 
-    /// Return size of the segment at given LSN. (Only for blocky relations.)
+    /// Return logical size of the segment at given LSN. (Only for blocky relations.)
    fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk>;
 
     /// Does the segment exist at given LSN? Or was it dropped before it.
     fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
 
+    // Get physical size of the layer
+    fn get_physical_size(&self) -> Result<u64>;
+
     /// Does this layer only contain some data for the segment (incremental),
     /// or does it contain a version of every page? This is important to know
     /// for garbage collecting old layers: an incremental layer depends on
@@ -13,6 +13,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use lazy_static::lazy_static;
+use postgres_ffi::pg_constants::BLCKSZ;
 use regex::Regex;
 use std::net::TcpListener;
 use std::str;
@@ -42,6 +43,8 @@ use crate::tenant_mgr;
 use crate::walreceiver;
 use crate::CheckpointConfig;
 
+const CHUNK_SIZE: u32 = 128; // 1MB
+
 // Wrapped in libpq CopyData
 enum PagestreamFeMessage {
     Exists(PagestreamExistsRequest),
@@ -91,7 +94,8 @@ struct PagestreamNblocksResponse {
 
 #[derive(Debug)]
 struct PagestreamGetPageResponse {
-    page: Bytes,
+    n_blocks: u32,
+    data: Bytes,
 }
 
 #[derive(Debug)]
@@ -162,7 +166,8 @@ impl PagestreamBeMessage {
 
             Self::GetPage(resp) => {
                 bytes.put_u8(102); /* tag from pagestore_client.h */
-                bytes.put(&resp.page[..]);
+                bytes.put_u32(resp.n_blocks);
+                bytes.put(&resp.data[..]);
             }
 
             Self::Error(resp) => {
@@ -438,11 +443,18 @@ impl PageServerHandler {
             .entered();
         let tag = RelishTag::Relation(req.rel);
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
-        let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
+        let rel_size = timeline.get_relish_size(tag, lsn)?.unwrap_or(0);
+        let blkno = req.blkno;
+        let n_blocks = u32::min(blkno + CHUNK_SIZE, rel_size) - blkno;
+        let mut data = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
+        for i in 0..n_blocks {
+            let page = timeline.get_page_at_lsn(tag, blkno + i, lsn)?;
+            data.extend_from_slice(&page);
+        }
+        assert!(data.len() == n_blocks as usize * BLCKSZ as usize);
         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
-            page,
+            n_blocks,
+            data: data.freeze(),
         }))
     }
 
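
Note: with CHUNK_SIZE = 128 and 8192-byte pages, one GetPage response now carries up to 128 * 8192 bytes = 1 MiB of consecutive pages instead of a single page. A hedged decoder sketch of the new framing (tag byte 102, big-endian u32 block count, then n_blocks pages of BLCKSZ bytes each); this is a standalone illustration assuming a well-formed message, not the actual client code:

    use bytes::{Buf, Bytes};

    const BLCKSZ: usize = 8192; // assumption: PostgreSQL page size

    fn decode_getpage(mut msg: Bytes) -> Option<(u32, Vec<Bytes>)> {
        if msg.get_u8() != 102 {
            return None; // not a GetPage response
        }
        // The bytes crate's put_u32/get_u32 use big-endian, matching the writer above.
        let n_blocks = msg.get_u32();
        let mut pages = Vec::with_capacity(n_blocks as usize);
        // Pages are consecutive: block numbers blkno, blkno+1, ..., blkno+n_blocks-1.
        for _ in 0..n_blocks {
            pages.push(msg.split_to(BLCKSZ));
        }
        Some((n_blocks, pages))
    }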
@@ -66,8 +66,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
         pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
         row = pscur.fetchone()
         print_gc_result(row)
-        assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
-        assert row['layer_relfiles_removed'] == 2
+        assert row['layer_relfiles_total'] == layer_relfiles_remain + 1
+        assert row['layer_relfiles_removed'] == 0
         assert row['layer_relfiles_dropped'] == 0
 
         # Insert two more rows and run GC.
@@ -81,7 +81,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
         row = pscur.fetchone()
         print_gc_result(row)
         assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
-        assert row['layer_relfiles_removed'] == 2
+        assert row['layer_relfiles_removed'] == 0
         assert row['layer_relfiles_dropped'] == 0
 
         # Do it again. Should again create two new layer files and remove old ones.
@@ -92,8 +92,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
         pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
         row = pscur.fetchone()
         print_gc_result(row)
-        assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
-        assert row['layer_relfiles_removed'] == 2
+        assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
+        assert row['layer_relfiles_removed'] == 0
         assert row['layer_relfiles_dropped'] == 0
 
         # Run GC again, with no changes in the database. Should not remove anything.
@@ -101,7 +101,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
         pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
         row = pscur.fetchone()
         print_gc_result(row)
-        assert row['layer_relfiles_total'] == layer_relfiles_remain
+        assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
         assert row['layer_relfiles_removed'] == 0
         assert row['layer_relfiles_dropped'] == 0
 
@@ -121,9 +121,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
         # Each relation fork is counted separately, hence 3.
         assert row['layer_relfiles_needed_as_tombstone'] == 3
 
-        # The catalog updates also create new layer files of the catalogs, which
-        # are counted as 'removed'
-        assert row['layer_relfiles_removed'] > 0
+        assert row['layer_relfiles_removed'] == 0
 
         # TODO Change the test to check actual GC of dropped layers.
         # Each relation fork is counted separately, hence 3.
Submodule vendor/postgres updated: 3b166d06cf...6309cf1b52