Compare commits

..

3 Commits

Author SHA1 Message Date
Konstantin Knizhnik
7b1fea280b Fix clippy errors 2021-10-07 18:05:15 +03:00
Konstantin Knizhnik
20e7c0d76a Fix clippy errors 2021-10-07 18:03:51 +03:00
Konstantin Knizhnik
77d352400e Do not materialize pages during checkpoint, do it in GC 2021-10-07 16:52:59 +03:00
38 changed files with 581 additions and 1320 deletions

View File

@@ -24,12 +24,6 @@ jobs:
# A job to build postgres
build-postgres:
executor: zenith-build-executor
parameters:
build_type:
type: enum
enum: ["debug", "release"]
environment:
BUILD_TYPE: << parameters.build_type >>
steps:
# Checkout the git repo (circleci doesn't have a flag to enable submodules here)
- checkout
@@ -45,7 +39,7 @@ jobs:
name: Restore postgres cache
keys:
# Restore ONLY if the rev key matches exactly
- v04-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
- v03-postgres-cache-{{ checksum "/tmp/cache-key-postgres" }}
# FIXME We could cache our own docker container, instead of installing packages every time.
- run:
@@ -65,12 +59,12 @@ jobs:
if [ ! -e tmp_install/bin/postgres ]; then
# "depth 1" saves some time by not cloning the whole repo
git submodule update --init --depth 1
make postgres -j8
make postgres
fi
- save_cache:
name: Save postgres cache
key: v04-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
key: v03-postgres-cache-{{ checksum "/tmp/cache-key-postgres" }}
paths:
- tmp_install
@@ -102,7 +96,7 @@ jobs:
name: Restore postgres cache
keys:
# Restore ONLY if the rev key matches exactly
- v04-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
- v03-postgres-cache-{{ checksum "/tmp/cache-key-postgres" }}
- restore_cache:
name: Restore rust cache
@@ -334,18 +328,14 @@ workflows:
build_and_test:
jobs:
- check-codestyle
- build-postgres:
name: build-postgres-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
- build-postgres
- build-zenith:
name: build-zenith-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
requires:
- build-postgres-<< matrix.build_type >>
- build-postgres
- run-pytest:
name: pg_regress-tests-<< matrix.build_type >>
matrix:

View File

@@ -11,8 +11,4 @@ test_output
.vscode
.zenith
integration_tests/.zenith
.mypy_cache
Dockerfile
.dockerignore
.mypy_cache

3
Cargo.lock generated
View File

@@ -2339,7 +2339,6 @@ dependencies = [
"byteorder",
"bytes",
"clap",
"const_format",
"crc32c",
"daemonize",
"fs2",
@@ -2359,7 +2358,6 @@ dependencies = [
"tokio-stream",
"walkdir",
"workspace_hack",
"zenith_metrics",
"zenith_utils",
]
@@ -2580,7 +2578,6 @@ version = "0.1.0"
dependencies = [
"lazy_static",
"libc",
"once_cell",
"prometheus",
]

View File

@@ -10,7 +10,6 @@ FROM zenithdb/build:buster AS pg-build
WORKDIR /zenith
COPY ./vendor/postgres vendor/postgres
COPY ./Makefile Makefile
ENV BUILD_TYPE release
RUN make -j $(getconf _NPROCESSORS_ONLN) -s postgres
RUN rm -rf postgres_install/build

View File

@@ -6,18 +6,6 @@ else
SECCOMP =
endif
#
# We differentiate between release / debug build types using the BUILD_TYPE
# environment variable.
#
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug
PG_CFLAGS = -O2 -g3 ${CFLAGS}
else
PG_CONFIGURE_OPTS = --enable-debug --enable-cassert --enable-depend
PG_CFLAGS = -O0 -g3 ${CFLAGS}
endif
#
# Top level Makefile to build Zenith and PostgreSQL
#
@@ -42,8 +30,10 @@ tmp_install/build/config.status:
+@echo "Configuring postgres build"
mkdir -p tmp_install/build
(cd tmp_install/build && \
../../vendor/postgres/configure CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) \
../../vendor/postgres/configure CFLAGS='-O0 -g3 $(CFLAGS)' \
--enable-cassert \
--enable-debug \
--enable-depend \
$(SECCOMP) \
--prefix=$(abspath tmp_install) > configure.log)

View File

@@ -25,7 +25,7 @@ Pageserver consists of:
On Ubuntu or Debian this set of packages should be sufficient to build the code:
```text
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev
libssl-dev clang
```
[Rust] 1.52 or later is also required.
@@ -108,13 +108,6 @@ postgres=# insert into t values(2,2);
INSERT 0 1
```
6. If you want to run tests afterwards (see below), you have to stop pageserver and all postgres instances you have just started:
```sh
> ./target/debug/zenith pg stop migration_check
> ./target/debug/zenith pg stop main
> ./target/debug/zenith stop
```
## Running tests
```sh

View File

@@ -7,9 +7,8 @@ The Page Server has a few different duties:
- Replay WAL that's applicable to the chunks that the Page Server maintains
- Backup to S3
S3 is the main fault-tolerant storage of all data, as there are no Page Server
replicas. We use a separate fault-tolerant WAL service to reduce latency. It
keeps track of WAL records which are not syncted to S3 yet.
The Page Server consists of multiple threads that operate on a shared
repository of page versions:

View File

@@ -240,7 +240,6 @@ impl CfgFileParams {
}
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("pageserver");
let arg_matches = App::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
.arg(

View File

@@ -419,6 +419,7 @@ fn create_timeline(
let timelinedir = conf.timeline_path(&timelineid, tenantid);
fs::create_dir(&timelinedir)?;
fs::create_dir(&timelinedir.join("wal"))?;
if let Some(ancestor) = ancestor {
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);

View File

@@ -23,7 +23,6 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashSet};
use std::convert::TryInto;
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::ops::Bound::Included;
@@ -31,8 +30,8 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use std::{fs, thread};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
@@ -57,7 +56,6 @@ mod image_layer;
mod inmemory_layer;
mod interval_tree;
mod layer_map;
mod page_versions;
mod storage_layer;
use delta_layer::DeltaLayer;
@@ -258,6 +256,19 @@ impl LayeredRepository {
timeline.init_current_logical_size()?;
let timeline = Arc::new(timeline);
// Load any new WAL after the last checkpoint into memory.
info!(
"Loading WAL for timeline {} starting at {}",
timelineid,
timeline.get_last_record_lsn()
);
if cfg!(debug_assertions) {
// check again after wal loading
Self::assert_size_calculation_matches_offloaded(Arc::clone(&timeline));
}
timelines.insert(timelineid, timeline.clone());
Ok(timeline)
}
@@ -523,6 +534,24 @@ impl LayeredRepository {
totals.elapsed = now.elapsed();
Ok(totals)
}
fn assert_size_calculation_matches(incremental: usize, timeline: &LayeredTimeline) {
match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) {
Ok(non_incremental) => {
if incremental != non_incremental {
error!("timeline size calculation diverged, incremental doesn't match non incremental. incremental={} non_incremental={}", incremental, non_incremental);
}
}
Err(e) => error!("failed to calculate non incremental timeline size: {:#}", e),
}
}
fn assert_size_calculation_matches_offloaded(timeline: Arc<LayeredTimeline>) {
let incremental = timeline.get_current_logical_size();
thread::spawn(move || {
Self::assert_size_calculation_matches(incremental, &timeline);
});
}
}
/// Metadata stored on disk for each timeline
@@ -1031,7 +1060,7 @@ impl LayeredTimeline {
self.timelineid
);
let mut layers = self.layers.lock().unwrap();
let (imgfilenames, deltafilenames) =
let (imgfilenames, mut deltafilenames) =
filename::list_files(self.conf, self.timelineid, self.tenantid)?;
let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid);
@@ -1059,7 +1088,11 @@ impl LayeredTimeline {
layers.insert_historic(Arc::new(layer));
}
// Then for the Delta files.
// Then for the Delta files. The delta files are created in order starting
// from the oldest file, because each DeltaLayer needs a reference to its
// predecessor.
deltafilenames.sort();
for filename in deltafilenames.iter() {
ensure!(filename.start_lsn < filename.end_lsn);
if filename.end_lsn > disk_consistent_lsn {
@@ -1072,12 +1105,27 @@ impl LayeredTimeline {
continue;
}
let layer = DeltaLayer::new(self.conf, self.timelineid, self.tenantid, filename);
let predecessor = layers.get(&filename.seg, filename.start_lsn);
let predecessor_str: String = if let Some(prec) = &predecessor {
prec.filename().display().to_string()
} else {
"none".to_string()
};
let layer = DeltaLayer::new(
self.conf,
self.timelineid,
self.tenantid,
filename,
predecessor,
);
info!(
"found layer {} on timeline {}",
"found layer {} on timeline {}, predecessor: {}",
layer.filename().display(),
self.timelineid,
predecessor_str,
);
layers.insert_historic(Arc::new(layer));
}
@@ -1252,7 +1300,7 @@ impl LayeredTimeline {
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
start_lsn = self.ancestor_lsn + 1;
start_lsn = self.ancestor_lsn;
trace!(
"creating layer for write for {} at branch point {}/{}",
seg,
@@ -1369,45 +1417,14 @@ impl LayeredTimeline {
break;
}
// Freeze the layer.
//
// This is a two-step process. First, we "freeze" the in-memory
// layer, to close it for new writes, and replace the original
// layer with the new frozen in-memory layer (and possibly a new
// open layer to hold changes newer than the cutoff.) Then we write
// the frozen layer to disk, and replace the in-memory frozen layer
// with the new on-disk layers.
let FreezeLayers {
frozen,
open: maybe_new_open,
} = oldest_layer.freeze(last_record_lsn)?;
// replace this layer with the new layers that 'freeze' returned
layers.pop_oldest_open();
if let Some(new_open) = maybe_new_open.clone() {
layers.insert_open(new_open);
}
// We temporarily insert InMemory layer into historic list here.
// TODO: check that all possible concurrent users of 'historic' treat it right
layers.insert_historic(frozen.clone());
let new_delta_layer = oldest_layer.write_to_disk(self)?;
created_historics = true;
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers);
let new_historics = frozen.write_to_disk(self)?;
layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
created_historics = true;
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.clone());
// Add the historics to the LayerMap
for n in new_historics {
layers.insert_historic(n);
}
// Add the historic to the LayerMap
layers.insert_historic(new_delta_layer);
}
// Call unload() on all frozen layers, to release memory.
@@ -1505,181 +1522,212 @@ impl LayeredTimeline {
self.timelineid, cutoff
);
info!("retain_lsns: {:?}", retain_lsns);
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
// Scan all on-disk layers in the timeline.
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk layer exists (only for non-dropped segments);
// 4. this layer doesn't serve as a tombstone for some older layer;
//
let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
if seg.rel.is_relation() {
result.ondisk_relfiles_total += 1;
} else {
result.ondisk_nonrelfiles_total += 1;
}
'outer: loop {
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
let mut layer_to_materialize: Option<Arc<dyn Layer>> = None;
// Scan all on-disk layers in the timeline.
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk layer exists (only for non-dropped segments);
// 4. this layer doesn't serve as a tombstone for some older layer;
//
'for_all_historic_layers: for l in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
// 1. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
cutoff
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_cutoff += 1;
result.ondisk_relfiles_total += 1;
} else {
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
result.ondisk_nonrelfiles_total += 1;
}
continue 'outer;
}
// 2. Is it needed by a child branch?
for retain_lsn in &retain_lsns {
// start_lsn is inclusive and end_lsn is exclusive
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
// 1. Create image layer for this relation?
if !l.is_dropped()
&& !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
{
if l.is_incremental() {
layer_to_materialize = Some(Arc::clone(&l));
break 'for_all_historic_layers;
} else {
info!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
cutoff
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_cutoff += 1;
} else {
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
}
continue 'for_all_historic_layers;
}
}
// 2. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!(
"keeping {} {}-{} because it's needed by branch point {}",
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
*retain_lsn
cutoff
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_branches += 1;
result.ondisk_relfiles_needed_by_cutoff += 1;
} else {
result.ondisk_nonrelfiles_needed_by_branches += 1;
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
}
continue 'outer;
continue 'for_all_historic_layers;
}
}
// 3. Is there a later on-disk layer for this relation?
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
{
info!(
"keeping {} {}-{} because it is the latest layer",
seg,
l.get_start_lsn(),
l.get_end_lsn()
);
if seg.rel.is_relation() {
result.ondisk_relfiles_not_updated += 1;
} else {
result.ondisk_nonrelfiles_not_updated += 1;
}
continue 'outer;
}
// 4. Does this layer serve as a tombstome for some older layer?
if l.is_dropped() {
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
// Check if this layer serves as a tombstone for this timeline
// We have to do this separately from timeline check below,
// because LayerMap of this timeline is already locked.
let mut is_tombstone = layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
if is_tombstone {
info!(
"earlier layer exists at {} in {}",
prior_lsn, self.timelineid
);
}
// Now check ancestor timelines, if any
else if let Some(ancestor) = &self.ancestor_timeline {
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
// 3. Is it needed by a child branch?
for retain_lsn in &retain_lsns {
// start_lsn is inclusive and end_lsn is exclusive
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
info!(
"check blocky relish size {} at {} in {} for layer {}-{}",
"keeping {} {}-{} because it's needed by branch point {}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
l.get_end_lsn(),
*retain_lsn
);
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
Some(size) => {
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
info!(
"blocky rel size is {} last_live_seg.segno {} seg.segno {}",
size, last_live_seg.segno, seg.segno
);
if last_live_seg.segno >= seg.segno {
is_tombstone = true;
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_branches += 1;
} else {
result.ondisk_nonrelfiles_needed_by_branches += 1;
}
continue 'for_all_historic_layers;
}
}
// 4. Does this layer serve as a tombstome for some older layer?
if l.is_dropped() {
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
// Check if this layer serves as a tombstone for this timeline
// We have to do this separately from timeline check below,
// because LayerMap of this timeline is already locked.
let mut is_tombstone =
layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
if is_tombstone {
info!(
"earlier layer exists at {} in {}",
prior_lsn, self.timelineid
);
}
// Now check ancestor timelines, if any
else if let Some(ancestor) = &self.ancestor_timeline {
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
info!(
"check blocky relish size {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
Some(size) => {
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
info!(
"blocky rel size is {} last_live_seg.segno {} seg.segno {}",
size, last_live_seg.segno, seg.segno
);
if last_live_seg.segno >= seg.segno {
is_tombstone = true;
}
}
_ => {
info!("blocky rel doesn't exist");
}
}
_ => {
info!("blocky rel doesn't exist");
}
} else {
info!(
"check non-blocky relish existence {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
is_tombstone =
ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
}
} else {
}
if is_tombstone {
info!(
"check non-blocky relish existence {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
is_tombstone = ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg,
l.get_start_lsn(),
l.get_end_lsn()
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_as_tombstone += 1;
} else {
result.ondisk_nonrelfiles_needed_as_tombstone += 1;
}
continue 'for_all_historic_layers;
}
}
if is_tombstone {
info!(
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg,
l.get_start_lsn(),
l.get_end_lsn()
);
// We didn't find any reason to keep this file, so remove it.
info!(
"garbage collecting {} {}-{} {}",
l.get_seg_tag(),
l.get_start_lsn(),
l.get_end_lsn(),
l.is_dropped()
);
layers_to_remove.push(Arc::clone(&l));
}
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_as_tombstone += 1;
} else {
result.ondisk_nonrelfiles_needed_as_tombstone += 1;
}
continue 'outer;
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
for doomed_layer in layers_to_remove {
doomed_layer.delete()?;
layers.remove_historic(doomed_layer.clone());
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
}
}
// We didn't find any reason to keep this file, so remove it.
info!(
"garbage collecting {} {}-{} {}",
l.get_seg_tag(),
l.get_start_lsn(),
l.get_end_lsn(),
l.is_dropped()
);
layers_to_remove.push(Arc::clone(&l));
}
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
for doomed_layer in layers_to_remove {
doomed_layer.delete()?;
layers.remove_historic(doomed_layer.clone());
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
if let Some(delta_layer) = layer_to_materialize {
drop(layers); // release lock, as far as new image layers are created only by GC thread,
let image_layer = ImageLayer::create_from_src(
self.conf,
&self,
&*delta_layer,
delta_layer.get_end_lsn(),
)?;
layers = self.layers.lock().unwrap();
info!(
"materialize layer {} {}-{}",
delta_layer.get_seg_tag(),
delta_layer.get_start_lsn(),
delta_layer.get_end_lsn()
);
layers.insert_historic(Arc::new(image_layer));
continue 'outer;
}
break 'outer;
}
result.elapsed = now.elapsed();
Ok(result)
}
@@ -1710,20 +1758,12 @@ impl LayeredTimeline {
loop {
match layer_ref.get_page_reconstruct_data(blknum, curr_lsn, &mut data)? {
PageReconstructResult::Complete => break,
PageReconstructResult::Continue(cont_lsn) => {
PageReconstructResult::Continue(cont_lsn, cont_layer) => {
// Fetch base image / more WAL from the returned predecessor layer
if let Some((cont_layer, cont_lsn)) = self.get_layer_for_read(seg, cont_lsn)? {
layer_arc = cont_layer;
layer_ref = &*layer_arc;
curr_lsn = cont_lsn;
continue;
} else {
bail!(
"could not find predecessor layer of segment {} at {}",
seg.rel,
cont_lsn
);
}
layer_arc = cont_layer;
layer_ref = &*layer_arc;
curr_lsn = cont_lsn;
continue;
}
PageReconstructResult::Missing(lsn) => {
// Oops, we could not reconstruct the page.

View File

@@ -42,13 +42,15 @@ use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
use crate::repository::WALRecord;
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use serde::{Deserialize, Serialize};
use zenith_utils::vec_map::VecMap;
use std::collections::BTreeMap;
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
@@ -57,7 +59,7 @@ use std::fs::File;
use std::io::{BufWriter, Write};
use std::ops::Bound::Included;
use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard};
use bookfile::{Book, BookWriter};
@@ -65,7 +67,6 @@ use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use super::blob::{read_blob, BlobRange};
use super::page_versions::OrderedBlockIter;
// Magic constant to identify a Zenith delta file
pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01;
@@ -108,6 +109,12 @@ impl From<&DeltaLayer> for Summary {
}
}
#[derive(Serialize, Deserialize)]
struct PageVersionMeta {
page_image_range: Option<BlobRange>,
record_range: Option<BlobRange>,
}
///
/// DeltaLayer is the in-memory data structure associated with an
/// on-disk delta file. We keep a DeltaLayer in memory for each
@@ -132,6 +139,9 @@ pub struct DeltaLayer {
dropped: bool,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
inner: Mutex<DeltaLayerInner>,
}
@@ -142,10 +152,10 @@ pub struct DeltaLayerInner {
/// All versions of all pages in the file are are kept here.
/// Indexed by block number and LSN.
page_version_metas: VecMap<(u32, Lsn), BlobRange>,
page_version_metas: BTreeMap<(u32, Lsn), PageVersionMeta>,
/// `relsizes` tracks the size of the relation at different points in time.
relsizes: VecMap<Lsn, u32>,
relsizes: BTreeMap<Lsn, u32>,
}
impl Layer for DeltaLayer {
@@ -216,20 +226,18 @@ impl Layer for DeltaLayer {
// Scan the metadata BTreeMap backwards, starting from the given entry.
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let iter = inner
let mut iter = inner
.page_version_metas
.slice_range((Included(&minkey), Included(&maxkey)))
.iter()
.rev();
for ((_blknum, _lsn), blob_range) in iter {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
if let Some(img) = pv.page_image {
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() {
if let Some(img_range) = &entry.page_image_range {
// Found a page image, return it
let img = Bytes::from(read_blob(&page_version_reader, img_range)?);
reconstruct_data.page_img = Some(img);
need_image = false;
break;
} else if let Some(rec) = pv.record {
} else if let Some(rec_range) = &entry.record_range {
let rec = WALRecord::des(&read_blob(&page_version_reader, rec_range)?)?;
let will_init = rec.will_init;
reconstruct_data.records.push(rec);
if will_init {
@@ -247,9 +255,16 @@ impl Layer for DeltaLayer {
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
// caller know about the predecessor layer.
if need_image {
Ok(PageReconstructResult::Continue(self.start_lsn))
if let Some(cont_layer) = &self.predecessor {
Ok(PageReconstructResult::Continue(
self.start_lsn,
Arc::clone(cont_layer),
))
} else {
Ok(PageReconstructResult::Missing(self.start_lsn))
}
} else {
Ok(PageReconstructResult::Complete)
}
@@ -258,22 +273,21 @@ impl Layer for DeltaLayer {
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
);
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.load()?;
let slice = inner
.relsizes
.slice_range((Included(&Lsn(0)), Included(&lsn)));
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
if let Some((_entry_lsn, entry)) = slice.last() {
Ok(*entry)
let result;
if let Some((_entry_lsn, entry)) = iter.next_back() {
result = *entry;
// Use the base image if needed
} else if let Some(predecessor) = &self.predecessor {
result = predecessor.get_seg_size(lsn)?;
} else {
Err(anyhow::anyhow!("could not find seg size in delta layer"))
result = 0;
}
Ok(result)
}
/// Does this segment exist at given LSN?
@@ -293,8 +307,8 @@ impl Layer for DeltaLayer {
///
fn unload(&self) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.page_version_metas = VecMap::default();
inner.relsizes = VecMap::default();
inner.page_version_metas = BTreeMap::new();
inner.relsizes = BTreeMap::new();
inner.loaded = false;
Ok(())
}
@@ -320,22 +334,22 @@ impl Layer for DeltaLayer {
println!("--- relsizes ---");
let inner = self.load()?;
for (k, v) in inner.relsizes.as_slice() {
for (k, v) in inner.relsizes.iter() {
println!(" {}: {}", k, v);
}
println!("--- page versions ---");
let (_path, book) = self.open_book()?;
let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
for (k, v) in inner.page_version_metas.iter() {
let mut desc = String::new();
let buf = read_blob(&chapter, blob_range)?;
let pv = PageVersion::des(&buf)?;
if let Some(img) = pv.page_image.as_ref() {
write!(&mut desc, " img {} bytes", img.len())?;
if let Some(page_image_range) = v.page_image_range.as_ref() {
let image = read_blob(&chapter, page_image_range)?;
write!(&mut desc, " img {} bytes", image.len())?;
}
if let Some(rec) = pv.record.as_ref() {
if let Some(record_range) = v.record_range.as_ref() {
let record_bytes = read_blob(&chapter, record_range)?;
let rec = WALRecord::des(&record_bytes)?;
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
write!(
&mut desc,
@@ -345,7 +359,7 @@ impl Layer for DeltaLayer {
wal_desc
)?;
}
println!(" blk {} at {}: {}", blk, lsn, desc);
println!(" blk {} at {}: {}", k.0, k.1, desc);
}
Ok(())
@@ -375,7 +389,7 @@ impl DeltaLayer {
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
#[allow(clippy::too_many_arguments)]
pub fn create(
pub fn create<'a>(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
@@ -383,13 +397,10 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: OrderedBlockIter,
relsizes: VecMap<Lsn, u32>,
predecessor: Option<Arc<dyn Layer>>,
page_versions: impl Iterator<Item = (&'a (u32, Lsn), &'a PageVersion)>,
relsizes: BTreeMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
}
let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -400,9 +411,10 @@ impl DeltaLayer {
dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: true,
page_version_metas: VecMap::default(),
page_version_metas: BTreeMap::new(),
relsizes,
}),
predecessor,
};
let mut inner = delta_layer.inner.lock().unwrap();
@@ -419,33 +431,44 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (blknum, history) in page_versions {
for (lsn, page_version) in history.as_slice() {
if lsn >= &end_lsn {
continue;
}
for (key, page_version) in page_versions {
let page_image_range = page_version
.page_image
.as_ref()
.map(|page_image| page_version_writer.write_blob(page_image))
.transpose()?;
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
let record_range = page_version
.record
.as_ref()
.map(|record| {
let buf = WALRecord::ser(record)?;
page_version_writer.write_blob(&buf)
})
.transpose()?;
inner
.page_version_metas
.append((blknum, *lsn), blob_range)
.unwrap();
}
let old = inner.page_version_metas.insert(
*key,
PageVersionMeta {
page_image_range,
record_range,
},
);
assert!(old.is_none());
}
let book = page_version_writer.close()?;
// Write out page versions
let mut chapter = book.new_chapter(PAGE_VERSION_METAS_CHAPTER);
let buf = VecMap::ser(&inner.page_version_metas)?;
let buf = BTreeMap::ser(&inner.page_version_metas)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
// and relsizes to separate chapter
let mut chapter = book.new_chapter(REL_SIZES_CHAPTER);
let buf = VecMap::ser(&inner.relsizes)?;
let buf = BTreeMap::ser(&inner.relsizes)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
@@ -532,10 +555,10 @@ impl DeltaLayer {
}
let chapter = book.read_chapter(PAGE_VERSION_METAS_CHAPTER)?;
let page_version_metas = VecMap::des(&chapter)?;
let page_version_metas = BTreeMap::des(&chapter)?;
let chapter = book.read_chapter(REL_SIZES_CHAPTER)?;
let relsizes = VecMap::des(&chapter)?;
let relsizes = BTreeMap::des(&chapter)?;
debug!("loaded from {}", &path.display());
@@ -554,6 +577,7 @@ impl DeltaLayer {
timelineid: ZTimelineId,
tenantid: ZTenantId,
filename: &DeltaFileName,
predecessor: Option<Arc<dyn Layer>>,
) -> DeltaLayer {
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
@@ -565,9 +589,10 @@ impl DeltaLayer {
dropped: filename.dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
page_version_metas: BTreeMap::new(),
relsizes: BTreeMap::new(),
}),
predecessor,
}
}
@@ -588,9 +613,10 @@ impl DeltaLayer {
dropped: summary.dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
page_version_metas: BTreeMap::new(),
relsizes: BTreeMap::new(),
}),
predecessor: None,
})
}
}

View File

@@ -290,7 +290,11 @@ pub fn list_files(
deltafiles.push(deltafilename);
} else if let Some(imgfilename) = ImageFileName::from_str(fname) {
imgfiles.push(imgfilename);
} else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
} else if fname == "wal"
|| fname == "metadata"
|| fname == "ancestor"
|| fname.ends_with(".old")
{
// ignore these
} else {
warn!("unrecognized filename in timeline dir: {}", fname);

View File

@@ -12,19 +12,16 @@ use crate::layered_repository::{DeltaLayer, ImageLayer};
use crate::repository::WALRecord;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::cmp::min;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::vec_map::VecMap;
use zenith_utils::accum::Accum;
use zenith_utils::lsn::Lsn;
use super::page_versions::PageVersions;
pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
@@ -37,42 +34,38 @@ pub struct InMemoryLayer {
///
start_lsn: Lsn,
/// Frozen in-memory layers have an inclusive end LSN.
end_lsn: Option<Lsn>,
/// LSN of the oldest page version stored in this layer
oldest_pending_lsn: Lsn,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: RwLock<InMemoryLayerInner>,
/// Predecessor layer might be needed?
incremental: bool,
}
pub struct InMemoryLayerInner {
/// If this relation was dropped, remember when that happened.
drop_lsn: Option<Lsn>,
/// LSN of last record
end_lsn: Option<Lsn>,
///
/// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN.
///
page_versions: PageVersions,
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
///
/// `segsizes` tracks the size of the segment at different points in time.
///
/// For a blocky rel, there is always one entry, at the layer's start_lsn,
/// so that determining the size never depends on the predecessor layer. For
/// a non-blocky rel, 'segsizes' is not used and is always empty.
///
segsizes: VecMap<Lsn, u32>,
segsizes: BTreeMap<Lsn, u32>,
/// Writes are only allowed when true.
/// Set to false when this layer is in the process of being replaced.
writeable: bool,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
}
impl InMemoryLayerInner {
@@ -86,13 +79,12 @@ impl InMemoryLayerInner {
fn get_seg_size(&self, lsn: Lsn) -> u32 {
// Scan the BTreeMap backwards, starting from the given entry.
let slice = self.segsizes.slice_range(..=lsn);
let mut iter = self.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
// We make sure there is always at least one entry
if let Some((_entry_lsn, entry)) = slice.last() {
if let Some((_entry_lsn, entry)) = iter.next_back() {
*entry
} else {
panic!("could not find seg size in in-memory layer");
0
}
}
}
@@ -124,6 +116,11 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}
fn get_end_lsn(&self) -> Lsn {
let inner = self.inner.read().unwrap();
inner.drop_lsn.unwrap_or(Lsn(u64::MAX))
}
fn path(&self) -> Option<PathBuf> {
None
}
@@ -140,20 +137,6 @@ impl Layer for InMemoryLayer {
self.start_lsn
}
fn get_end_lsn(&self) -> Lsn {
if let Some(end_lsn) = self.end_lsn {
return Lsn(end_lsn.0 + 1);
}
let inner = self.inner.read().unwrap();
if let Some(drop_lsn) = inner.drop_lsn {
drop_lsn
} else {
Lsn(u64::MAX)
}
}
fn is_dropped(&self) -> bool {
let inner = self.inner.read().unwrap();
inner.drop_lsn.is_some()
@@ -170,16 +153,18 @@ impl Layer for InMemoryLayer {
assert!(self.seg.blknum_in_seg(blknum));
let predecessor: Option<Arc<dyn Layer>>;
{
let inner = self.inner.read().unwrap();
// Scan the page versions backwards, starting from `lsn`.
let iter = inner
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let mut iter = inner
.page_versions
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (_entry_lsn, entry) in iter {
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() {
if let Some(img) = &entry.page_image {
reconstruct_data.page_img = Some(img.clone());
need_image = false;
@@ -196,14 +181,16 @@ impl Layer for InMemoryLayer {
bail!("no page image or WAL record for requested page");
}
}
predecessor = inner.predecessor.clone();
// release lock on 'inner'
}
// If an older page image is needed to reconstruct the page, let the
// caller know
// caller know about the predecessor layer.
if need_image {
if self.incremental {
Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
if let Some(cont_layer) = predecessor {
Ok(PageReconstructResult::Continue(self.start_lsn, cont_layer))
} else {
Ok(PageReconstructResult::Missing(self.start_lsn))
}
@@ -215,10 +202,6 @@ impl Layer for InMemoryLayer {
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
);
let inner = self.inner.read().unwrap();
Ok(inner.get_seg_size(lsn))
@@ -258,7 +241,7 @@ impl Layer for InMemoryLayer {
}
fn is_incremental(&self) -> bool {
self.incremental
true
}
/// debugging function to print out the contents of the layer
@@ -276,20 +259,18 @@ impl Layer for InMemoryLayer {
self.timelineid, self.seg, self.start_lsn, end_str
);
for (k, v) in inner.segsizes.as_slice() {
for (k, v) in inner.segsizes.iter() {
println!("segsizes {}: {}", k, v);
}
for (blknum, history) in inner.page_versions.ordered_block_iter() {
for (lsn, pv) in history.as_slice() {
println!(
"blk {} at {}: {}/{}\n",
blknum,
lsn,
pv.page_image.is_some(),
pv.record.is_some()
);
}
for (k, v) in inner.page_versions.iter() {
println!(
"blk {} at {}: {}/{}\n",
k.0,
k.1,
v.page_image.is_some(),
v.record.is_some()
);
}
Ok(())
@@ -303,19 +284,7 @@ pub struct NonWriteableError;
pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
/// Helper struct to cleanup `InMemoryLayer::freeze` return signature.
pub struct FreezeLayers {
/// Replacement layer for the layer which freeze was called on.
pub frozen: Arc<InMemoryLayer>,
/// New open layer containing leftover data.
pub open: Option<Arc<InMemoryLayer>>,
}
impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());
}
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
self.oldest_pending_lsn
@@ -339,26 +308,20 @@ impl InMemoryLayer {
start_lsn
);
// The segment is initially empty, so initialize 'segsizes' with 0.
let mut segsizes = VecMap::default();
if seg.rel.is_blocky() {
segsizes.append(start_lsn, 0).unwrap();
}
Ok(InMemoryLayer {
conf,
timelineid,
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: false,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: PageVersions::default(),
segsizes,
end_lsn: None,
page_versions: BTreeMap::new(),
segsizes: BTreeMap::new(),
writeable: true,
predecessor: None,
}),
})
}
@@ -392,7 +355,6 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
self.assert_not_frozen();
assert!(self.seg.blknum_in_seg(blknum));
trace!(
@@ -405,8 +367,9 @@ impl InMemoryLayer {
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
inner.end_lsn = Some(lsn);
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
let old = inner.page_versions.insert((blknum, lsn), pv);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -451,9 +414,7 @@ impl InMemoryLayer {
gapblknum,
blknum
);
let old = inner
.page_versions
.append_or_update_last(gapblknum, lsn, zeropv);
let old = inner.page_versions.insert((gapblknum, lsn), zeropv);
// We already had an entry for this LSN. That's odd..
if old.is_some() {
@@ -464,7 +425,7 @@ impl InMemoryLayer {
}
}
inner.segsizes.append_or_update_last(lsn, newsize).unwrap();
inner.segsizes.insert(lsn, newsize);
return Ok(newsize - oldsize);
}
}
@@ -473,20 +434,15 @@ impl InMemoryLayer {
/// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
assert!(
self.seg.rel.is_blocky(),
"put_truncation() called on a non-blocky rel"
);
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
inner.end_lsn = Some(lsn);
// check that this we truncate to a smaller size than segment was before the truncation
let oldsize = inner.get_seg_size(lsn);
assert!(segsize < oldsize);
let old = inner.segsizes.append_or_update_last(lsn, segsize).unwrap();
let old = inner.segsizes.insert(lsn, segsize);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -498,8 +454,6 @@ impl InMemoryLayer {
/// Remember that the segment was dropped at given LSN
pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
@@ -537,11 +491,11 @@ impl InMemoryLayer {
start_lsn,
);
// Copy the segment size at the start LSN from the predecessor layer.
let mut segsizes = VecMap::default();
// For convenience, copy the segment size from the predecessor layer
let mut segsizes = BTreeMap::new();
if seg.rel.is_blocky() {
let size = src.get_seg_size(start_lsn)?;
segsizes.append(start_lsn, size).unwrap();
segsizes.insert(start_lsn, size);
}
Ok(InMemoryLayer {
@@ -550,14 +504,14 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: true,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: PageVersions::default(),
end_lsn: None,
page_versions: BTreeMap::new(),
segsizes,
writeable: true,
predecessor: Some(src),
}),
})
}
@@ -567,118 +521,12 @@ impl InMemoryLayer {
inner.writeable
}
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
/// All data up to and including `cutoff_lsn`
/// is copied to `frozen`, while the remaining data is copied to `open`.
/// After completion, self is non-writeable, but not frozen.
pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
info!(
"freezing in memory layer {} on timeline {} at {} (oldest {})",
self.filename().display(),
self.timelineid,
cutoff_lsn,
self.oldest_pending_lsn
);
self.assert_not_frozen();
let self_ref = self.clone();
let mut inner = self_ref.inner.write().unwrap();
// Dropped layers don't need any special freeze actions,
// they are marked as non-writeable at drop and just
// written out to disk by checkpointer.
if inner.drop_lsn.is_some() {
assert!(!inner.writeable);
info!(
"freezing in memory layer for {} on timeline {} is dropped at {}",
self.seg,
self.timelineid,
inner.drop_lsn.unwrap()
);
// There should be no newer layer that refers this non-writeable layer,
// because layer that is created after dropped one represents a new rel.
return Ok(FreezeLayers {
frozen: self,
open: None,
});
}
assert!(inner.writeable);
inner.writeable = false;
// Divide all the page versions into old and new
// at the 'cutoff_lsn' point.
let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
let cutoff_lsn_exclusive = Lsn(cutoff_lsn.0 + 1);
let (before_segsizes, mut after_segsizes) = inner.segsizes.split_at(&cutoff_lsn_exclusive);
if let Some((lsn, _size)) = after_segsizes.as_slice().first() {
after_oldest_lsn.accum(min, *lsn);
}
let (before_page_versions, after_page_versions) = inner
.page_versions
.split_at(cutoff_lsn_exclusive, &mut after_oldest_lsn);
let frozen = Arc::new(InMemoryLayer {
conf: self.conf,
tenantid: self.tenantid,
timelineid: self.timelineid,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: Some(cutoff_lsn),
oldest_pending_lsn: self.start_lsn,
incremental: self.incremental,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
page_versions: before_page_versions,
segsizes: before_segsizes,
writeable: false,
}),
});
let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let mut new_open = Self::create_successor_layer(
self.conf,
frozen.clone(),
self.timelineid,
self.tenantid,
cutoff_lsn + 1,
after_oldest_lsn.0.unwrap(),
)?;
let new_inner = new_open.inner.get_mut().unwrap();
// Ensure page_versions doesn't contain anything
// so we can just replace it
assert!(new_inner.page_versions.is_empty());
new_inner.page_versions = after_page_versions;
new_inner.segsizes.extend(&mut after_segsizes).unwrap();
Some(Arc::new(new_open))
} else {
None
};
Ok(FreezeLayers { frozen, open })
}
/// Write the this frozen in-memory layer to disk.
///
/// Returns new layers that replace this one.
/// If not dropped, returns a new image layer containing the page versions
/// at the `end_lsn`. Can also return a DeltaLayer that includes all the
/// WAL records between start and end LSN. (The delta layer is not needed
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
self.filename().display(),
self.end_lsn.unwrap_or(Lsn(0)),
self.get_end_lsn()
);
/// Returns new DeltaLayer that includes all the
/// WAL records between start and end LSN.
///
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Arc<dyn Layer>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to aquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -689,7 +537,14 @@ impl InMemoryLayer {
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();
assert!(!inner.writeable);
trace!(
"write_to_disk {} end_lsn is {}",
self.filename().display(),
inner.end_lsn.unwrap_or(Lsn(0)),
);
let predecessor = inner.predecessor.clone();
if let Some(drop_lsn) = inner.drop_lsn {
let delta_layer = DeltaLayer::create(
@@ -700,7 +555,8 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn,
true,
inner.page_versions.ordered_block_iter(),
predecessor,
inner.page_versions.iter(),
inner.segsizes.clone(),
)?;
trace!(
@@ -709,16 +565,12 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn
);
return Ok(vec![Arc::new(delta_layer)]);
return Ok(Arc::new(delta_layer));
}
let end_lsn = self.end_lsn.unwrap();
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
let end_lsn = inner.end_lsn.unwrap();
if self.start_lsn != end_lsn {
let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1));
// Write the page versions before the cutoff to disk.
let delta_layer = DeltaLayer::create(
self.conf,
@@ -728,30 +580,14 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
false,
inner.page_versions.ordered_block_iter(),
before_segsizes,
predecessor,
inner.page_versions.iter(),
inner.segsizes.clone(),
)?;
frozen_layers.push(Arc::new(delta_layer));
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,
self.start_lsn,
end_lsn
);
Ok(Arc::new(delta_layer))
} else {
for (_blknum, history) in inner.page_versions.ordered_block_iter() {
let (lsn, _pv) = history.as_slice().first().unwrap();
assert!(lsn >= &end_lsn);
}
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
Ok(Arc::new(image_layer))
}
drop(inner);
// Write a new base image layer at the cutoff point
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
frozen_layers.push(Arc::new(image_layer));
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
Ok(frozen_layers)
}
}

View File

@@ -219,7 +219,7 @@ where
}
}
// No more elements at this point. Move to next point.
if let Some((point_key, point)) = self.point_iter.next() {
if let Some((point_key, point)) = self.point_iter.next_back() {
self.elem_iter = Some((*point_key, point.elements.iter()));
continue;
} else {

View File

@@ -1,132 +0,0 @@
use std::{collections::HashMap, ops::RangeBounds};
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
#[derive(Debug, Default)]
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
impl PageVersions {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn append_or_update_last(
&mut self,
blknum: u32,
lsn: Lsn,
page_version: PageVersion,
) -> Option<PageVersion> {
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, page_version).unwrap()
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
blknum: u32,
range: R,
) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
.unwrap_or(EMPTY_SLICE)
}
/// Split the page version map into two.
///
/// Left contains everything up to and not including [`cutoff_lsn`].
/// Right contains [`cutoff_lsn`] and everything after.
pub fn split_at(&self, cutoff_lsn: Lsn, after_oldest_lsn: &mut Accum<Lsn>) -> (Self, Self) {
let mut before_blocks = HashMap::new();
let mut after_blocks = HashMap::new();
for (blknum, vec_map) in self.0.iter() {
let (before_versions, after_versions) = vec_map.split_at(&cutoff_lsn);
if !before_versions.is_empty() {
let old = before_blocks.insert(*blknum, before_versions);
assert!(old.is_none());
}
if !after_versions.is_empty() {
let (first_lsn, _first_pv) = &after_versions.as_slice()[0];
after_oldest_lsn.accum(std::cmp::min, *first_lsn);
let old = after_blocks.insert(*blknum, after_versions);
assert!(old.is_none());
}
}
(Self(before_blocks), Self(after_blocks))
}
/// Iterate through block-history pairs in block order.
pub fn ordered_block_iter(&self) -> OrderedBlockIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
ordered_blocks.sort_unstable();
OrderedBlockIter {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
}
}
}
pub struct OrderedBlockIter<'a> {
page_versions: &'a PageVersions,
ordered_blocks: Vec<u32>,
cur_block_idx: usize,
}
impl<'a> Iterator for OrderedBlockIter<'a> {
type Item = (u32, &'a VecMap<Lsn, PageVersion>);
fn next(&mut self) -> Option<Self::Item> {
let blknum: u32 = *self.ordered_blocks.get(self.cur_block_idx)?;
self.cur_block_idx += 1;
Some((blknum, self.page_versions.0.get(&blknum).unwrap()))
}
}
#[cfg(test)]
mod tests {
use super::*;
const EMPTY_PAGE_VERSION: PageVersion = PageVersion {
page_image: None,
record: None,
};
#[test]
fn test_ordered_iter() {
let mut page_versions = PageVersions::default();
const BLOCKS: u32 = 1000;
const LSNS: u64 = 50;
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let old = page_versions.append_or_update_last(blknum, Lsn(lsn), EMPTY_PAGE_VERSION);
assert!(old.is_none());
}
}
let mut iter = page_versions.ordered_block_iter();
for blknum in 0..BLOCKS {
let (actual_blknum, vec_map) = iter.next().unwrap();
let slice = vec_map.as_slice();
assert_eq!(actual_blknum, blknum);
assert_eq!(slice.len(), LSNS as usize);
for lsn in 0..LSNS {
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
}
}

View File

@@ -10,6 +10,7 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
@@ -86,9 +87,9 @@ pub struct PageReconstructData {
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain all the required data, the caller should collect
/// more data from the returned predecessor layer at the returned LSN.
Continue(Lsn, Arc<dyn Layer>),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
@@ -144,8 +145,8 @@ pub trait Layer: Send + Sync {
///
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call. If this returns PageReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data'
/// on first call. If this returns PageReconstructResult::Continue, call
/// again on the returned predecessor layer with the same 'reconstruct_data'
/// to collect more data.
fn get_page_reconstruct_data(
&self,

View File

@@ -36,8 +36,8 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_GC_HORIZON: u64 = 64u64 * 1024 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
@@ -126,6 +126,10 @@ impl PageServerConf {
self.timeline_path(timelineid, tenantid).join("ancestor")
}
fn wal_dir_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf {
self.timeline_path(timelineid, tenantid).join("wal")
}
//
// Postgres distribution paths
//

View File

@@ -100,6 +100,10 @@ pub fn create_repository_for_tenant(
Ok(())
}
pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc<dyn Repository>) {
access_repository().insert(tenantid, repo);
}
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
access_repository()
.get(&tenantid)

View File

@@ -16,11 +16,14 @@ use log::*;
use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::cell::Cell;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs;
use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
@@ -122,7 +125,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
}
fn walreceiver_main(
_conf: &PageServerConf,
conf: &PageServerConf,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
@@ -192,6 +195,7 @@ fn walreceiver_main(
let data = xlog_data.data();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
let prev_last_rec_lsn = last_rec_lsn;
trace!("received XLogData between {} and {}", startlsn, endlsn);
@@ -232,6 +236,34 @@ fn walreceiver_main(
last_rec_lsn = lsn;
}
// Somewhat arbitrarily, if we have at least 10 complete wal segments (16 MB each),
// "checkpoint" the repository to flush all the changes from WAL we've processed
// so far to disk. After this, we don't need the original WAL anymore, and it
// can be removed. This is probably too aggressive for production, but it's useful
// to expose bugs now.
//
// TODO: We don't actually dare to remove the WAL. It's useful for debugging,
// and we might it for logical decoding other things in the future. Although
// we should also be able to fetch it back from the WAL safekeepers or S3 if
// needed.
if prev_last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
!= last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
{
info!("switched segment {} to {}", prev_last_rec_lsn, last_rec_lsn);
let (oldest_segno, newest_segno) = find_wal_file_range(
conf,
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
last_rec_lsn,
&tenantid,
)?;
if newest_segno - oldest_segno >= 10 {
// TODO: This is where we could remove WAL older than last_rec_lsn.
//remove_wal_files(timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn)?;
}
}
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {}", endlsn);
caught_up = true;
@@ -253,7 +285,7 @@ fn walreceiver_main(
);
if reply_requested {
Some(last_rec_lsn)
Some(timeline.get_last_record_lsn())
} else {
None
}
@@ -277,6 +309,47 @@ fn walreceiver_main(
Ok(())
}
fn find_wal_file_range(
conf: &PageServerConf,
timeline: &ZTimelineId,
wal_seg_size: usize,
written_upto: Lsn,
tenant: &ZTenantId,
) -> Result<(u64, u64)> {
let written_upto_segno = written_upto.segment_number(wal_seg_size);
let mut oldest_segno = written_upto_segno;
let mut newest_segno = written_upto_segno;
// Scan the wal directory, and count how many WAL filed we could remove
let wal_dir = conf.wal_dir_path(timeline, tenant);
for entry in fs::read_dir(wal_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
continue;
}
let filename = path.file_name().unwrap().to_str().unwrap();
if IsXLogFileName(filename) {
let (segno, _tli) = XLogFromFileName(filename, wal_seg_size);
if segno > written_upto_segno {
// that's strange.
warn!("there is a WAL file from future at {}", path.display());
continue;
}
oldest_segno = min(oldest_segno, segno);
newest_segno = max(newest_segno, segno);
}
}
// FIXME: would be good to assert that there are no gaps in the WAL files
Ok((oldest_segno, newest_segno))
}
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
///
/// See the [postgres docs] for more details.

View File

@@ -33,11 +33,11 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
FROM generate_series(1, 100000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_b = main_cur.fetchone()[0]
print('LSN after 200100 rows: ' + lsn_b)
print('LSN after 100100 rows: ' + lsn_b)
# Branch at the point where only 100 rows were inserted
zenith_cli.run(["branch", "test_branch_behind_hundred", "test_branch_behind@" + lsn_a])
@@ -46,15 +46,15 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
FROM generate_series(1, 100000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = main_cur.fetchone()[0]
print('LSN after 400100 rows: ' + lsn_c)
print('LSN after 200100 rows: ' + lsn_c)
# Branch at the point where only 200100 rows were inserted
# Branch at the point where only 200 rows were inserted
zenith_cli.run(["branch", "test_branch_behind_more", "test_branch_behind@" + lsn_b])
pg_hundred = postgres.create_start("test_branch_behind_hundred")
@@ -70,11 +70,11 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
more_pg_conn = pg_more.connect()
more_cur = more_pg_conn.cursor()
more_cur.execute('SELECT count(*) FROM foo')
assert more_cur.fetchone() == (200100, )
assert more_cur.fetchone() == (100100, )
# All the rows are visible on the main branch
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (400100, )
assert main_cur.fetchone() == (200100, )
# Check bad lsn's for branching

View File

@@ -136,34 +136,9 @@ class ZenithBenchmarker:
# The metric should be an integer, as it's a number of bytes. But in general
# all prometheus metrics are floats. So to be pedantic, read it as a float
# and round to integer.
matches = re.search(r'^pageserver_disk_io_bytes{io_operation="write"} (\S+)$', all_metrics,
re.MULTILINE)
matches = re.search(r'pageserver_disk_io_bytes{io_operation="write"} (\S+)', all_metrics)
return int(round(float(matches.group(1))))
def get_peak_mem(self, pageserver) -> int:
"""
Fetch the "maxrss" metric from the pageserver
"""
# Fetch all the exposed prometheus metrics from page server
all_metrics = pageserver.http_client().get_metrics()
# See comment in get_io_writes()
matches = re.search(r'^pageserver_maxrss_kb (\S+)$', all_metrics,
re.MULTILINE)
return int(round(float(matches.group(1))))
def get_timeline_size(self, repo_dir: str, tenantid: str, timelineid: str):
"""
Calculate the on-disk size of a timeline
"""
path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid)
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
return totalbytes
@contextmanager
def record_pageserver_writes(self, pageserver, metric_name):
"""

View File

@@ -71,9 +71,7 @@ def pytest_configure(config):
# This is bad; we don't want any of those processes polluting the
# result of the test.
# NOTE this shows as an internal pytest error, there might be a better way
raise Exception(
'Found interfering processes running. Stop all Zenith pageservers, nodes, WALs, as well as stand-alone Postgres.'
)
raise Exception('found interfering processes running')
def determine_scope(fixture_name: str, config: Any) -> str:
@@ -794,17 +792,12 @@ def read_pid(path: Path):
return int(path.read_text())
@dataclass
class WalAcceptorPort:
pg: int
http: int
@dataclass
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
wa_bin_path: Path
data_dir: Path
port: WalAcceptorPort
port: int
num: int # identifier for logging
pageserver_port: int
auth_token: Optional[str] = None
@@ -816,8 +809,7 @@ class WalAcceptor:
cmd = [str(self.wa_bin_path)]
cmd.extend(["-D", str(self.data_dir)])
cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"])
cmd.extend(["--listen-http", f"localhost:{self.port.http}"])
cmd.extend(["-l", f"localhost:{self.port}"])
cmd.append("--daemonize")
cmd.append("--no-sync")
# Tell page server it can receive WAL from this WAL safekeeper
@@ -874,14 +866,14 @@ class WalAcceptor:
# "replication=0" hacks psycopg not to send additional queries
# on startup, see https://github.com/psycopg/psycopg2/pull/482
connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
with closing(psycopg2.connect(connstr)) as conn:
# server doesn't support transactions
conn.autocommit = True
with conn.cursor() as cur:
request_json = json.dumps(request)
print(f"JSON_CTRL request on port {self.port.pg}: {request_json}")
print(f"JSON_CTRL request on port {self.port}: {request_json}")
cur.execute("JSON_CTRL " + request_json)
all = cur.fetchall()
print(f"JSON_CTRL response: {all[0][0]}")
@@ -904,10 +896,7 @@ class WalAcceptorFactory:
wa = WalAcceptor(
wa_bin_path=self.wa_bin_path,
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
port=WalAcceptorPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
),
port=self.port_distributor.get_port(),
num=wa_num,
pageserver_port=self.pageserver_port,
auth_token=auth_token,
@@ -931,7 +920,7 @@ class WalAcceptorFactory:
def get_connstrs(self) -> str:
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances])
return ','.join(["localhost:{}".format(wa.port) for wa in self.instances])
@zenfixture

View File

@@ -4,6 +4,19 @@ from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def get_timeline_size(repo_dir: str, tenantid: str, timelineid: str):
path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid)
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
if 'wal' in dirs:
dirs.remove('wal') # don't visit 'wal' subdirectory
return totalbytes
#
# Run bulk INSERT test.
#
@@ -12,7 +25,6 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
# 1. Time to INSERT 5 million rows
# 2. Disk writes
# 3. Disk space used
# 4. Peak memory usage
#
def test_bulk_insert(postgres: PostgresFactory, pageserver: ZenithPageserver, pg_bin, zenith_cli, zenbenchmark, repo_dir: str):
# Create a branch for us
@@ -43,9 +55,6 @@ def test_bulk_insert(postgres: PostgresFactory, pageserver: ZenithPageserver, pg
# time and I/O
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
# Record peak memory usage
zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(pageserver) / 1024, 'MB')
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
timeline_size = get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
zenbenchmark.record('size', timeline_size / (1024*1024), 'MB')

View File

@@ -4,6 +4,19 @@ from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def get_timeline_size(repo_dir: str, tenantid: str, timelineid: str):
path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid)
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
if 'wal' in dirs:
dirs.remove('wal') # don't visit 'wal' subdirectory
return totalbytes
#
# Run a very short pgbench test.
#
@@ -51,5 +64,5 @@ def test_pgbench(postgres: PostgresFactory, pageserver: ZenithPageserver, pg_bin
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
timeline_size = get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
zenbenchmark.record('size', timeline_size / (1024*1024), 'MB')

View File

@@ -1,74 +0,0 @@
# Demonstrate Write Amplification with naive oldest-first layer checkpointing
# algorithm.
#
# In each iteration of the test, we create a new table that's slightly under 10
# MB in size (10 MB is the current "segment size" used by the page server). Then
# we make a tiny update to all the tables already created. This creates a WAL
# pattern where you have a lot of updates on one segment (the newly created
# one), alternating with a small updates on all relations. This is the worst
# case scenario for the naive checkpointing policy where we write out the layers
# in LSN order, writing the oldest layer first. That creates a new 10 MB image
# layer to be created for each of those small updates. This is the Write
# Amplification problem at its finest.
import os
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def test_write_amplification(postgres: PostgresFactory, pageserver: ZenithPageserver, pg_bin, zenith_cli, zenbenchmark, repo_dir: str):
# Create a branch for us
zenith_cli.run(["branch", "test_write_amplification", "empty"])
pg = postgres.create_start('test_write_amplification')
print("postgres is running on 'test_write_amplification' branch")
# Open a connection directly to the page server that we'll use to force
# flushing the layers to disk
psconn = pageserver.connect();
pscur = psconn.cursor()
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Get the timeline ID of our branch. We need it for the 'do_gc' command
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'):
with zenbenchmark.record_duration('run'):
# NOTE: Because each iteration updates every table already created,
# the runtime and write amplification is O(n^2), where n is the
# number of iterations.
for i in range(25):
cur.execute(f'''
CREATE TABLE tbl{i} AS
SELECT g as i, 'long string to consume some space' || g as t
FROM generate_series(1, 100000) g
''')
cur.execute(f"create index on tbl{i} (i);")
for j in range(1, i):
cur.execute(f"delete from tbl{j} where i = {i}")
# Force checkpointing. As of this writing, we don't have
# a back-pressure mechanism, and the page server cannot
# keep up digesting and checkpointing the WAL at the
# rate that it is generated. If we don't force a
# checkpoint, the WAL will just accumulate in memory
# until you hit OOM error. So in effect, we use much
# more memory to hold the incoming WAL, and write them
# out in larger batches than we'd really want. Using
# more memory hides the write amplification problem this
# test tries to demonstrate.
#
# The write amplification problem is real, and using
# more memory isn't the right solution. We could
# demonstrate the effect also by generating the WAL
# slower, adding some delays in this loop. But forcing
# the the checkpointing and GC makes the test go faster,
# with the same total I/O effect.
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
zenbenchmark.record('size', timeline_size / (1024*1024), 'MB')

View File

@@ -28,11 +28,9 @@ humantime = "2.1.0"
walkdir = "2"
serde = { version = "1.0", features = ["derive"] }
hex = "0.4.3"
const_format = "0.2.21"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }
postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { path = "../workspace_hack" }
zenith_metrics = { path = "../zenith_metrics" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -76,43 +76,6 @@ safekeepers.
See README_PROTO.md for a more detailed desription of the consensus
protocol. spec/ contains TLA+ specification of it.
# Q&A
Q: Why have a separate service instead of connecting Page Server directly to a
primary PostgreSQL node?
A: Page Server is a single server which can be lost. As our primary
fault-tolerant storage is S3, we do not want to wait for it before
committing a transaction. The WAL service acts as a temporary fault-tolerant
storage for recent data before it gets to the Page Server and then finally
to S3. Whenever WALs and pages are committed to S3, WAL's storage can be
trimmed.
Q: What if the compute node evicts a page, needs it back, but the page is yet
to reach the Page Server?
A: If the compute node has evicted a page, all changes from that page are
already committed, i.e. they are saved on majority of WAL safekeepers. These
WAL records will eventually reach the Page Server. The Page Server notes
that the compute note requests pages with a very recent LSN and will not
respond to the compute node until it a corresponding WAL is received from WAL
safekeepers.
Q: How long may Page Server wait for?
A: Not too long, hopefully. If a page is evicted, it probably was not used for
a while, so the WAL service have had enough time to push changes to the Page
Server. There may be issues if there is no backpressure and compute node with
WAL service run ahead of Page Server, though.
There is no backpressure right now, so you may even see some spurious
timeouts in tests.
Q: How do WAL safekeepers communicate with each other?
A: They may only send each other messages via the compute node, they never
communicate directly with each other.
Q: Why have a consensus algorithm if there is only a single compute node?
A: Actually there may be moments with multiple PostgreSQL nodes running at the
same time. E.g. we are bringing one up and one down. We would like to avoid
simultaneous writes from different nodes, so there should be a consensus on
who is the primary node.
# Terminology

View File

@@ -3,23 +3,18 @@
//
use anyhow::Result;
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::env;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::logging;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
let arg_matches = App::new("Zenith wal_acceptor")
.about("Store WAL stream to local file system and push it to WAL receivers")
.arg(
@@ -30,18 +25,11 @@ fn main() -> Result<()> {
.help("Path to the WAL acceptor data directory"),
)
.arg(
Arg::with_name("listen-pg")
Arg::with_name("listen")
.short("l")
.long("listen-pg")
.alias("listen") // for compatibility
.long("listen")
.takes_value(true)
.help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")),
)
.arg(
Arg::with_name("listen-http")
.long("listen-http")
.takes_value(true)
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
.help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"),
)
.arg(
Arg::with_name("pageserver")
@@ -82,8 +70,7 @@ fn main() -> Result<()> {
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
listen_addr: "localhost:5454".to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
@@ -104,12 +91,8 @@ fn main() -> Result<()> {
conf.daemonize = true;
}
if let Some(addr) = arg_matches.value_of("listen-pg") {
conf.listen_pg_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("listen-http") {
conf.listen_http_addr = addr.to_owned();
if let Some(addr) = arg_matches.value_of("listen") {
conf.listen_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
@@ -131,11 +114,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let log_filename = conf.data_dir.join("wal_acceptor.log");
let (_scope_guard, log_file) = logging::init(log_filename, conf.daemonize)?;
let http_listener = TcpListener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
})?;
if conf.daemonize {
info!("daemonizing...");
@@ -158,16 +136,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let mut threads = Vec::new();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
// No authentication at all: read-only metrics only, early stage.
let router = endpoint::make_router();
endpoint::serve_thread_main(router, http_listener).unwrap();
})
.unwrap();
threads.push(http_endpoint_thread);
if conf.ttl.is_some() {
let s3_conf = conf.clone();
let s3_offload_thread = thread::Builder::new()

View File

@@ -11,23 +11,12 @@ pub mod send_wal;
pub mod timeline;
pub mod wal_service;
pub mod defaults {
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
}
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub listen_addr: String,
pub pageserver_addr: Option<String>,
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
pub pageserver_auth_token: Option<String>,

View File

@@ -42,7 +42,7 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
);
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr);
let me_conf: Config = me_connstr.parse().unwrap();
let (host, port) = connection_host_port(&me_conf);
let callme = format!(

View File

@@ -15,11 +15,8 @@ use std::cmp::min;
use std::io;
use std::io::Read;
use lazy_static::lazy_static;
use crate::replication::HotStandbyFeedback;
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::pq_proto::SystemId;
@@ -282,38 +279,6 @@ pub trait Storage {
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
}
lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.
static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_flush_lsn",
"Current flush_lsn, grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_flush_lsn int gauge vec");
static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_commit_lsn",
"Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_commit_lsn int gauge vec");
}
struct SafeKeeperMetrics {
flush_lsn: Gauge,
commit_lsn: Gauge,
}
impl SafeKeeperMetrics {
fn new(ztli: ZTimelineId) -> SafeKeeperMetrics {
let ztli_str = format!("{}", ztli);
SafeKeeperMetrics {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&ztli_str]),
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&ztli_str]),
}
}
}
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
pub struct SafeKeeper<ST: Storage> {
@@ -321,8 +286,6 @@ pub struct SafeKeeper<ST: Storage> {
/// Established by reading wal.
pub flush_lsn: Lsn,
pub tli: u32,
// Cached metrics so we don't have to recompute labels on each update.
metrics: Option<SafeKeeperMetrics>,
/// not-yet-flushed pairs of same named fields in s.*
pub commit_lsn: Lsn,
pub truncate_lsn: Lsn,
@@ -341,7 +304,6 @@ where
SafeKeeper {
flush_lsn,
tli,
metrics: None,
commit_lsn: state.commit_lsn,
truncate_lsn: state.truncate_lsn,
storage,
@@ -393,8 +355,6 @@ where
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage.persist(&self.s, true)?;
self.metrics = Some(SafeKeeperMetrics::new(self.s.server.ztli));
info!(
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term
@@ -518,11 +478,6 @@ where
}
if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn;
self.metrics
.as_ref()
.unwrap()
.flush_lsn
.set(u64::from(self.flush_lsn) as f64);
}
// Advance commit_lsn taking into account what we have locally. xxx this
@@ -540,11 +495,6 @@ where
sync_control_file |=
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
self.metrics
.as_ref()
.unwrap()
.commit_lsn
.set(u64::from(self.commit_lsn) as f64);
}
self.truncate_lsn = msg.h.truncate_lsn;

View File

@@ -13,9 +13,9 @@ use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: WalAcceptorConf) -> Result<()> {
info!("Starting wal acceptor on {}", conf.listen_pg_addr);
let listener = TcpListener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
info!("Starting wal acceptor on {}", conf.listen_addr);
let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_addr, e);
e
})?;

View File

@@ -7,4 +7,3 @@ edition = "2018"
prometheus = {version = "0.12", default_features=false} # removes protobuf dependency
libc = "0.2"
lazy_static = "1.4"
once_cell = "1.8.0"

View File

@@ -3,10 +3,7 @@
//! Otherwise, we might not see all metrics registered via
//! a default registry.
use lazy_static::lazy_static;
use once_cell::race::OnceBox;
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};
@@ -23,55 +20,17 @@ pub use wrappers::{CountedReader, CountedWriter};
/// Metrics gathering is a relatively simple and standalone operation, so
/// it might be fine to do it this way to keep things simple.
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
update_rusage_metrics();
update_io_metrics();
prometheus::gather()
}
static COMMON_METRICS_PREFIX: OnceBox<&str> = OnceBox::new();
/// Sets a prefix which will be used for all common metrics, typically a service
/// name like 'pageserver'. Should be executed exactly once in the beginning of
/// any executable which uses common metrics.
pub fn set_common_metrics_prefix(prefix: &'static str) {
// Not unwrap() because metrics may be initialized after multiple threads have been started.
COMMON_METRICS_PREFIX
.set(prefix.into())
.unwrap_or_else(|_| {
eprintln!(
"set_common_metrics_prefix() was called second time with '{}', exiting",
prefix
);
std::process::exit(1);
});
}
/// Prepends a prefix to a common metric name so they are distinguished between
/// different services, see https://github.com/zenithdb/zenith/pull/681
/// A call to set_common_metrics_prefix() is necessary prior to calling this.
pub fn new_common_metric_name(unprefixed_metric_name: &str) -> String {
// Not unwrap() because metrics may be initialized after multiple threads have been started.
format!(
"{}_{}",
COMMON_METRICS_PREFIX.get().unwrap_or_else(|| {
eprintln!("set_common_metrics_prefix() was not called, but metrics are used, exiting");
std::process::exit(1);
}),
unprefixed_metric_name
)
}
lazy_static! {
static ref DISK_IO_BYTES: IntGaugeVec = register_int_gauge_vec!(
new_common_metric_name("disk_io_bytes"),
"pageserver_disk_io_bytes",
"Bytes written and read from disk, grouped by the operation (read|write)",
&["io_operation"]
)
.expect("Failed to register disk i/o bytes int gauge vec");
static ref MAXRSS_KB: IntGauge = register_int_gauge!(
new_common_metric_name("maxrss_kb"),
"Memory usage (Maximum Resident Set Size)"
)
.expect("Failed to register maxrss_kb int gauge");
}
// Records I/O stats in a "cross-platform" way.
@@ -83,7 +42,7 @@ lazy_static! {
// We know the size of the block, so we can determine the I/O bytes out of it.
// The value might be not 100% exact, but should be fine for Prometheus metrics in this case.
#[allow(clippy::unnecessary_cast)]
fn update_rusage_metrics() {
fn update_io_metrics() {
let rusage_stats = get_rusage_stats();
const BYTES_IN_BLOCK: i64 = 512;
@@ -93,7 +52,6 @@ fn update_rusage_metrics() {
DISK_IO_BYTES
.with_label_values(&["write"])
.set(rusage_stats.ru_oublock * BYTES_IN_BLOCK);
MAXRSS_KB.set(rusage_stats.ru_maxrss);
}
fn get_rusage_stats() -> libc::rusage {

View File

@@ -9,14 +9,14 @@ use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use std::net::TcpListener;
use zenith_metrics::{new_common_metric_name, register_int_counter, IntCounter};
use zenith_metrics::{register_int_counter, IntCounter};
use zenith_metrics::{Encoder, TextEncoder};
use super::error::ApiError;
lazy_static! {
static ref SERVE_METRICS_COUNT: IntCounter = register_int_counter!(
new_common_metric_name("serve_metrics_count"),
"pageserver_serve_metrics_count",
"Number of metric requests made"
)
.expect("failed to define a metric");

View File

@@ -8,9 +8,6 @@ pub mod lsn;
/// SeqWait allows waiting for a future sequence number to arrive
pub mod seqwait;
/// append only ordered map implemented with a Vec
pub mod vec_map;
// Async version of SeqWait. Currently unused.
// pub mod seqwait_async;

View File

@@ -1,293 +0,0 @@
use std::{cmp::Ordering, ops::RangeBounds};
use serde::{Deserialize, Serialize};
/// Ordered map datastructure implemented in a Vec.
/// Append only - can only add keys that are larger than the
/// current max key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VecMap<K, V>(Vec<(K, V)>);
impl<K, V> Default for VecMap<K, V> {
fn default() -> Self {
VecMap(Default::default())
}
}
#[derive(Debug)]
pub struct InvalidKey;
impl<K: Ord, V> VecMap<K, V> {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn as_slice(&self) -> &[(K, V)] {
self.0.as_slice()
}
/// This function may panic if given a range where the lower bound is
/// greater than the upper bound.
pub fn slice_range<R: RangeBounds<K>>(&self, range: R) -> &[(K, V)] {
use std::ops::Bound::*;
let binary_search = |k: &K| self.0.binary_search_by_key(&k, extract_key);
let start_idx = match range.start_bound() {
Unbounded => 0,
Included(k) => binary_search(k).unwrap_or_else(std::convert::identity),
Excluded(k) => match binary_search(k) {
Ok(idx) => idx + 1,
Err(idx) => idx,
},
};
let end_idx = match range.end_bound() {
Unbounded => self.0.len(),
Included(k) => match binary_search(k) {
Ok(idx) => idx + 1,
Err(idx) => idx,
},
Excluded(k) => binary_search(k).unwrap_or_else(std::convert::identity),
};
&self.0[start_idx..end_idx]
}
/// Add a key value pair to the map.
/// If [`key`] is less than or equal to the current maximum key
/// the pair will not be added and InvalidKey error will be returned.
pub fn append(&mut self, key: K, value: V) -> Result<(), InvalidKey> {
if let Some((last_key, _last_value)) = self.0.last() {
if &key <= last_key {
return Err(InvalidKey);
}
}
self.0.push((key, value));
Ok(())
}
/// Update the maximum key value pair or add a new key value pair to the map.
/// If [`key`] is less than the current maximum key no updates or additions
/// will occur and InvalidKey error will be returned.
pub fn append_or_update_last(&mut self, key: K, mut value: V) -> Result<Option<V>, InvalidKey> {
if let Some((last_key, last_value)) = self.0.last_mut() {
match key.cmp(last_key) {
Ordering::Less => return Err(InvalidKey),
Ordering::Equal => {
std::mem::swap(last_value, &mut value);
return Ok(Some(value));
}
Ordering::Greater => {}
}
}
self.0.push((key, value));
Ok(None)
}
/// Split the map into two.
///
/// The left map contains everything before [`cutoff`] (exclusive).
/// Right map contains [`cutoff`] and everything after (inclusive).
pub fn split_at(&self, cutoff: &K) -> (Self, Self)
where
K: Clone,
V: Clone,
{
let split_idx = self
.0
.binary_search_by_key(&cutoff, extract_key)
.unwrap_or_else(std::convert::identity);
(
VecMap(self.0[..split_idx].to_vec()),
VecMap(self.0[split_idx..].to_vec()),
)
}
/// Move items from [`other`] to the end of [`self`], leaving [`other`] empty.
/// If any keys in [`other`] is less than or equal to any key in [`self`],
/// [`InvalidKey`] error will be returned and no mutation will occur.
pub fn extend(&mut self, other: &mut Self) -> Result<(), InvalidKey> {
let self_last_opt = self.0.last().map(extract_key);
let other_first_opt = other.0.last().map(extract_key);
if let (Some(self_last), Some(other_first)) = (self_last_opt, other_first_opt) {
if self_last >= other_first {
return Err(InvalidKey);
}
}
self.0.append(&mut other.0);
Ok(())
}
}
fn extract_key<K, V>(entry: &(K, V)) -> &K {
&entry.0
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, ops::Bound};
use super::VecMap;
#[test]
fn unbounded_range() {
let mut vec = VecMap::default();
vec.append(0, ()).unwrap();
assert_eq!(vec.slice_range(0..0), &[]);
}
#[test]
#[should_panic]
fn invalid_ordering_range() {
let mut vec = VecMap::default();
vec.append(0, ()).unwrap();
#[allow(clippy::reversed_empty_ranges)]
vec.slice_range(1..0);
}
#[test]
fn range_tests() {
let mut vec = VecMap::default();
vec.append(0, ()).unwrap();
vec.append(2, ()).unwrap();
vec.append(4, ()).unwrap();
assert_eq!(vec.slice_range(0..0), &[]);
assert_eq!(vec.slice_range(0..1), &[(0, ())]);
assert_eq!(vec.slice_range(0..2), &[(0, ())]);
assert_eq!(vec.slice_range(0..3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..0), &[]);
assert_eq!(vec.slice_range(..1), &[(0, ())]);
assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(0..=0), &[(0, ())]);
assert_eq!(vec.slice_range(0..=1), &[(0, ())]);
assert_eq!(vec.slice_range(0..=2), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(0..=3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..=0), &[(0, ())]);
assert_eq!(vec.slice_range(..=1), &[(0, ())]);
assert_eq!(vec.slice_range(..=2), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..=3), &[(0, ()), (2, ())]);
}
struct BoundIter {
min: i32,
max: i32,
next: Option<Bound<i32>>,
}
impl BoundIter {
fn new(min: i32, max: i32) -> Self {
Self {
min,
max,
next: Some(Bound::Unbounded),
}
}
}
impl Iterator for BoundIter {
type Item = Bound<i32>;
fn next(&mut self) -> Option<Self::Item> {
let cur = self.next?;
self.next = match &cur {
Bound::Unbounded => Some(Bound::Included(self.min)),
Bound::Included(x) => {
if *x >= self.max {
Some(Bound::Excluded(self.min))
} else {
Some(Bound::Included(x + 1))
}
}
Bound::Excluded(x) => {
if *x >= self.max {
None
} else {
Some(Bound::Excluded(x + 1))
}
}
};
Some(cur)
}
}
#[test]
fn range_exhaustive() {
let map: BTreeMap<i32, ()> = (1..=7).step_by(2).map(|x| (x, ())).collect();
let mut vec = VecMap::default();
for &key in map.keys() {
vec.append(key, ()).unwrap();
}
const RANGE_MIN: i32 = 0;
const RANGE_MAX: i32 = 8;
for lower_bound in BoundIter::new(RANGE_MIN, RANGE_MAX) {
let ub_min = match lower_bound {
Bound::Unbounded => RANGE_MIN,
Bound::Included(x) => x,
Bound::Excluded(x) => x + 1,
};
for upper_bound in BoundIter::new(ub_min, RANGE_MAX) {
let map_range: Vec<(i32, ())> = map
.range((lower_bound, upper_bound))
.map(|(&x, _)| (x, ()))
.collect();
let vec_slice = vec.slice_range((lower_bound, upper_bound));
assert_eq!(map_range, vec_slice);
}
}
}
#[test]
fn extend() {
let mut left = VecMap::default();
left.append(0, ()).unwrap();
assert_eq!(left.as_slice(), &[(0, ())]);
let mut empty = VecMap::default();
left.extend(&mut empty).unwrap();
assert_eq!(left.as_slice(), &[(0, ())]);
assert_eq!(empty.as_slice(), &[]);
let mut right = VecMap::default();
right.append(1, ()).unwrap();
left.extend(&mut right).unwrap();
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
assert_eq!(right.as_slice(), &[]);
let mut zero_map = VecMap::default();
zero_map.append(0, ()).unwrap();
left.extend(&mut zero_map).unwrap_err();
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
assert_eq!(zero_map.as_slice(), &[(0, ())]);
let mut one_map = VecMap::default();
one_map.append(1, ()).unwrap();
left.extend(&mut one_map).unwrap_err();
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
assert_eq!(one_map.as_slice(), &[(1, ())]);
}
}