Compare commits

..

10 Commits

Author SHA1 Message Date
Patrick Insinger
3e1e934a21 use copy_split in freeze 2021-10-05 13:14:19 -07:00
Patrick Insinger
2ccc8f3e36 OrderedVec copy_prefix 2021-10-05 12:57:58 -07:00
Patrick Insinger
77722b4c9d skip a block when larger lsn is found 2021-10-05 12:52:32 -07:00
Patrick Insinger
7ea2251f02 switch to HashMap for block indexing 2021-10-05 12:46:47 -07:00
Patrick Insinger
896a172d46 OrderedVec - copy_split for segsizes 2021-10-05 11:49:13 -07:00
Patrick Insinger
b7c3f02ed1 page versions 2021-10-05 11:20:40 -07:00
Patrick Insinger
98f58fa2ab use for seqsizes in inmemorylayer 2021-10-05 11:18:29 -07:00
Patrick Insinger
adf4ac0ef7 range fixes 2021-10-05 11:18:29 -07:00
Patrick Insinger
9dd1393f0e page version meta 2021-10-05 11:18:27 -07:00
Patrick Insinger
935984018d ordered-vec and seqsizes 2021-10-05 11:14:09 -07:00
38 changed files with 851 additions and 1113 deletions

View File

@@ -24,12 +24,6 @@ jobs:
# A job to build postgres
build-postgres:
executor: zenith-build-executor
parameters:
build_type:
type: enum
enum: ["debug", "release"]
environment:
BUILD_TYPE: << parameters.build_type >>
steps:
# Checkout the git repo (circleci doesn't have a flag to enable submodules here)
- checkout
@@ -45,7 +39,7 @@ jobs:
name: Restore postgres cache
keys:
# Restore ONLY if the rev key matches exactly
- v04-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
- v03-postgres-cache-{{ checksum "/tmp/cache-key-postgres" }}
# FIXME We could cache our own docker container, instead of installing packages every time.
- run:
@@ -65,12 +59,12 @@ jobs:
if [ ! -e tmp_install/bin/postgres ]; then
# "depth 1" saves some time by not cloning the whole repo
git submodule update --init --depth 1
make postgres -j8
make postgres
fi
- save_cache:
name: Save postgres cache
key: v04-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
key: v03-postgres-cache-{{ checksum "/tmp/cache-key-postgres" }}
paths:
- tmp_install
@@ -102,7 +96,7 @@ jobs:
name: Restore postgres cache
keys:
# Restore ONLY if the rev key matches exactly
- v04-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
- v03-postgres-cache-{{ checksum "/tmp/cache-key-postgres" }}
- restore_cache:
name: Restore rust cache
@@ -334,18 +328,14 @@ workflows:
build_and_test:
jobs:
- check-codestyle
- build-postgres:
name: build-postgres-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
- build-postgres
- build-zenith:
name: build-zenith-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
requires:
- build-postgres-<< matrix.build_type >>
- build-postgres
- run-pytest:
name: pg_regress-tests-<< matrix.build_type >>
matrix:

View File

@@ -11,8 +11,4 @@ test_output
.vscode
.zenith
integration_tests/.zenith
.mypy_cache
Dockerfile
.dockerignore
.mypy_cache

3
Cargo.lock generated
View File

@@ -2339,7 +2339,6 @@ dependencies = [
"byteorder",
"bytes",
"clap",
"const_format",
"crc32c",
"daemonize",
"fs2",
@@ -2359,7 +2358,6 @@ dependencies = [
"tokio-stream",
"walkdir",
"workspace_hack",
"zenith_metrics",
"zenith_utils",
]
@@ -2592,7 +2590,6 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"crossbeam-utils",
"hex",
"hex-literal",
"hyper",

View File

@@ -10,7 +10,6 @@ FROM zenithdb/build:buster AS pg-build
WORKDIR /zenith
COPY ./vendor/postgres vendor/postgres
COPY ./Makefile Makefile
ENV BUILD_TYPE release
RUN make -j $(getconf _NPROCESSORS_ONLN) -s postgres
RUN rm -rf postgres_install/build

View File

@@ -6,18 +6,6 @@ else
SECCOMP =
endif
#
# We differentiate between release / debug build types using the BUILD_TYPE
# environment variable.
#
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug
PG_CFLAGS = -O2 -g3 ${CFLAGS}
else
PG_CONFIGURE_OPTS = --enable-debug --enable-cassert --enable-depend
PG_CFLAGS = -O0 -g3 ${CFLAGS}
endif
#
# Top level Makefile to build Zenith and PostgreSQL
#
@@ -42,8 +30,10 @@ tmp_install/build/config.status:
+@echo "Configuring postgres build"
mkdir -p tmp_install/build
(cd tmp_install/build && \
../../vendor/postgres/configure CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) \
../../vendor/postgres/configure CFLAGS='-O0 -g3 $(CFLAGS)' \
--enable-cassert \
--enable-debug \
--enable-depend \
$(SECCOMP) \
--prefix=$(abspath tmp_install) > configure.log)

View File

@@ -419,6 +419,7 @@ fn create_timeline(
let timelinedir = conf.timeline_path(&timelineid, tenantid);
fs::create_dir(&timelinedir)?;
fs::create_dir(&timelinedir.join("wal"))?;
if let Some(ancestor) = ancestor {
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);

View File

@@ -18,13 +18,11 @@ use lazy_static::lazy_static;
use log::*;
use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize};
use zenith_utils::batch_fsync::batch_fsync;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashSet};
use std::convert::TryInto;
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::ops::Bound::Included;
@@ -32,6 +30,7 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use std::{fs, thread};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
@@ -58,7 +57,6 @@ mod image_layer;
mod inmemory_layer;
mod interval_tree;
mod layer_map;
mod page_versions;
mod storage_layer;
use delta_layer::DeltaLayer;
@@ -112,9 +110,6 @@ lazy_static! {
.expect("failed to define a metric");
}
/// The name of the metadata file pageserver creates per timeline.
pub const METADATA_FILE_NAME: &str = "metadata";
///
/// Repository consists of multiple timelines. Keep them in a hash table.
///
@@ -256,21 +251,25 @@ impl LayeredRepository {
)?;
// List the layers on disk, and load them into the layer map
let _loaded_layers = timeline.load_layer_map(disk_consistent_lsn)?;
if self.upload_relishes {
schedule_timeline_upload(());
// schedule_timeline_upload(
// self.tenantid,
// timelineid,
// loaded_layers,
// disk_consistent_lsn,
// );
}
timeline.load_layer_map(disk_consistent_lsn)?;
// needs to be after load_layer_map
timeline.init_current_logical_size()?;
let timeline = Arc::new(timeline);
// Load any new WAL after the last checkpoint into memory.
info!(
"Loading WAL for timeline {} starting at {}",
timelineid,
timeline.get_last_record_lsn()
);
if cfg!(debug_assertions) {
// check again after wal loading
Self::assert_size_calculation_matches_offloaded(Arc::clone(&timeline));
}
timelines.insert(timelineid, timeline.clone());
Ok(timeline)
}
@@ -364,8 +363,9 @@ impl LayeredRepository {
tenantid: ZTenantId,
data: &TimelineMetadata,
first_save: bool,
) -> Result<()> {
let path = metadata_path(conf, timelineid, tenantid);
) -> Result<PathBuf> {
let timeline_path = conf.timeline_path(&timelineid, &tenantid);
let path = timeline_path.join("metadata");
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = OpenOptions::new()
.write(true)
@@ -389,15 +389,11 @@ impl LayeredRepository {
// fsync the parent directory to ensure the directory entry is durable
if first_save {
let timeline_dir = File::open(
&path
.parent()
.expect("Metadata should always have a parent dir"),
)?;
let timeline_dir = File::open(&timeline_path)?;
timeline_dir.sync_all()?;
}
Ok(())
Ok(path)
}
fn load_metadata(
@@ -405,7 +401,7 @@ impl LayeredRepository {
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> Result<TimelineMetadata> {
let path = metadata_path(conf, timelineid, tenantid);
let path = conf.timeline_path(&timelineid, &tenantid).join("metadata");
let metadata_bytes = std::fs::read(&path)?;
ensure!(metadata_bytes.len() == METADATA_MAX_SAFE_SIZE);
@@ -485,7 +481,7 @@ impl LayeredRepository {
let timeline = self.get_timeline_locked(*timelineid, &mut *timelines)?;
if let Some(ancestor_timeline) = &timeline.ancestor_timeline {
// If target_timeline is specified, we only need to know branchpoints of its children
// If target_timeline is specified, we only need to know branchpoints of its childs
if let Some(timelineid) = target_timelineid {
if ancestor_timeline.timelineid == timelineid {
all_branchpoints
@@ -539,6 +535,24 @@ impl LayeredRepository {
totals.elapsed = now.elapsed();
Ok(totals)
}
fn assert_size_calculation_matches(incremental: usize, timeline: &LayeredTimeline) {
match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) {
Ok(non_incremental) => {
if incremental != non_incremental {
error!("timeline size calculation diverged, incremental doesn't match non incremental. incremental={} non_incremental={}", incremental, non_incremental);
}
}
Err(e) => error!("failed to calculate non incremental timeline size: {:#}", e),
}
}
fn assert_size_calculation_matches_offloaded(timeline: Arc<LayeredTimeline>) {
let incremental = timeline.get_current_logical_size();
thread::spawn(move || {
Self::assert_size_calculation_matches(incremental, &timeline);
});
}
}
/// Metadata stored on disk for each timeline
@@ -1039,22 +1053,21 @@ impl LayeredTimeline {
}
///
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
/// Scan the timeline directory to populate the layer map
///
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<Vec<PathBuf>> {
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
info!(
"loading layer map for timeline {} into memory",
self.timelineid
);
let mut layers = self.layers.lock().unwrap();
let (imgfilenames, deltafilenames) =
let (imgfilenames, mut deltafilenames) =
filename::list_files(self.conf, self.timelineid, self.tenantid)?;
let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid);
let mut local_layers = Vec::with_capacity(imgfilenames.len() + deltafilenames.len());
// First create ImageLayer structs for each image file.
for filename in &imgfilenames {
for filename in imgfilenames.iter() {
if filename.lsn > disk_consistent_lsn {
warn!(
"found future image layer {} on timeline {}",
@@ -1073,11 +1086,15 @@ impl LayeredTimeline {
layer.get_start_lsn(),
self.timelineid
);
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
}
for filename in &deltafilenames {
// Then for the Delta files. The delta files are created in order starting
// from the oldest file, because each DeltaLayer needs a reference to its
// predecessor.
deltafilenames.sort();
for filename in deltafilenames.iter() {
ensure!(filename.start_lsn < filename.end_lsn);
if filename.end_lsn > disk_consistent_lsn {
warn!(
@@ -1089,18 +1106,32 @@ impl LayeredTimeline {
continue;
}
let layer = DeltaLayer::new(self.conf, self.timelineid, self.tenantid, filename);
let predecessor = layers.get(&filename.seg, filename.start_lsn);
let predecessor_str: String = if let Some(prec) = &predecessor {
prec.filename().display().to_string()
} else {
"none".to_string()
};
let layer = DeltaLayer::new(
self.conf,
self.timelineid,
self.tenantid,
filename,
predecessor,
);
info!(
"found layer {} on timeline {}",
"found layer {} on timeline {}, predecessor: {}",
layer.filename().display(),
self.timelineid,
predecessor_str,
);
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
}
Ok(local_layers)
Ok(())
}
///
@@ -1270,7 +1301,7 @@ impl LayeredTimeline {
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
start_lsn = self.ancestor_lsn + 1;
start_lsn = self.ancestor_lsn;
trace!(
"creating layer for write for {} at branch point {}/{}",
seg,
@@ -1347,6 +1378,8 @@ impl LayeredTimeline {
last_record_lsn
);
let timeline_dir = File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
// Take the in-memory layer with the oldest WAL record. If it's older
// than the threshold, write it out to disk as a new image and delta file.
// Repeat until all remaining in-memory layers are within the threshold.
@@ -1359,7 +1392,7 @@ impl LayeredTimeline {
let mut disk_consistent_lsn = last_record_lsn;
let mut created_historics = false;
let mut layer_uploads = Vec::new();
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
@@ -1420,14 +1453,22 @@ impl LayeredTimeline {
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.clone());
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
// If we created a successor InMemoryLayer, its predecessor is
// currently the frozen layer. We need to update the predecessor
// to be the latest on-disk layer.
if let Some(last_historic) = new_historics.last() {
if let Some(new_open) = &maybe_new_open {
let maybe_old_predecessor =
new_open.update_predecessor(Arc::clone(last_historic));
let old_predecessor = maybe_old_predecessor
.expect("new_open should always be a successor to frozen");
assert!(layer_ptr_eq(frozen.as_ref(), old_predecessor.as_ref()));
}
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
// Add the historics to the LayerMap
for n in new_historics {
layers.insert_historic(n);
}
}
@@ -1443,9 +1484,7 @@ impl LayeredTimeline {
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
layer_uploads.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
batch_fsync(&layer_uploads)?;
layer_uploads.pop().unwrap();
timeline_dir.sync_all()?;
}
// Save the metadata, with updated 'disk_consistent_lsn', to a
@@ -1472,7 +1511,7 @@ impl LayeredTimeline {
ancestor_timeline: ancestor_timelineid,
ancestor_lsn: self.ancestor_lsn,
};
LayeredRepository::save_metadata(
let _metadata_path = LayeredRepository::save_metadata(
self.conf,
self.timelineid,
self.tenantid,
@@ -1481,10 +1520,12 @@ impl LayeredTimeline {
)?;
if self.upload_relishes {
schedule_timeline_upload(())
// schedule_timeline_upload(
// self.tenantid,
// self.timelineid,
// layer_uploads,
// schedule_timeline_upload(LocalTimeline {
// tenant_id: self.tenantid,
// timeline_id: self.timelineid,
// metadata_path,
// image_layers: image_layer_uploads,
// delta_layers: delta_layer_uploads,
// disk_consistent_lsn,
// });
}
@@ -1731,20 +1772,12 @@ impl LayeredTimeline {
loop {
match layer_ref.get_page_reconstruct_data(blknum, curr_lsn, &mut data)? {
PageReconstructResult::Complete => break,
PageReconstructResult::Continue(cont_lsn) => {
PageReconstructResult::Continue(cont_lsn, cont_layer) => {
// Fetch base image / more WAL from the returned predecessor layer
if let Some((cont_layer, cont_lsn)) = self.get_layer_for_read(seg, cont_lsn)? {
layer_arc = cont_layer;
layer_ref = &*layer_arc;
curr_lsn = cont_lsn;
continue;
} else {
bail!(
"could not find predecessor layer of segment {} at {}",
seg.rel,
cont_lsn
);
}
layer_arc = cont_layer;
layer_ref = &*layer_arc;
curr_lsn = cont_lsn;
continue;
}
PageReconstructResult::Missing(lsn) => {
// Oops, we could not reconstruct the page.
@@ -1917,13 +1950,15 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
Ok(())
}
fn metadata_path(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> PathBuf {
conf.timeline_path(&timelineid, &tenantid)
.join(METADATA_FILE_NAME)
/// Check for equality of Layer memory addresses
fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
let l1_ptr = l1 as *const dyn Layer;
let l2_ptr = l2 as *const dyn Layer;
// comparing *const dyn Layer will not only check for data address equality,
// but also for vtable address equality.
// to avoid this, we compare *const ().
// see here for more https://github.com/rust-lang/rust/issues/46139
std::ptr::eq(l1_ptr as *const (), l2_ptr as *const ())
}
/// Add a suffix to a layer file's name: .{num}.old

View File

@@ -45,10 +45,9 @@ use crate::layered_repository::storage_layer::{
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use anyhow::{bail, Result};
use log::*;
use serde::{Deserialize, Serialize};
use zenith_utils::vec_map::VecMap;
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
@@ -57,12 +56,13 @@ use std::fs::File;
use std::io::{BufWriter, Write};
use std::ops::Bound::Included;
use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard};
use bookfile::{Book, BookWriter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::ordered_vec::OrderedVec;
use super::blob::{read_blob, BlobRange};
@@ -131,6 +131,9 @@ pub struct DeltaLayer {
dropped: bool,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
inner: Mutex<DeltaLayerInner>,
}
@@ -141,10 +144,10 @@ pub struct DeltaLayerInner {
/// All versions of all pages in the file are are kept here.
/// Indexed by block number and LSN.
page_version_metas: VecMap<(u32, Lsn), BlobRange>,
page_version_metas: OrderedVec<(u32, Lsn), BlobRange>,
/// `relsizes` tracks the size of the relation at different points in time.
relsizes: VecMap<Lsn, u32>,
relsizes: OrderedVec<Lsn, u32>,
}
impl Layer for DeltaLayer {
@@ -169,7 +172,29 @@ impl Layer for DeltaLayer {
}
fn filename(&self) -> PathBuf {
PathBuf::from(self.layer_name().to_string())
PathBuf::from(
DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
}
.to_string(),
)
}
fn path(&self) -> Option<PathBuf> {
Some(Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
))
}
/// Look up given page in the cache.
@@ -193,12 +218,11 @@ impl Layer for DeltaLayer {
// Scan the metadata BTreeMap backwards, starting from the given entry.
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let iter = inner
let mut iter = inner
.page_version_metas
.slice_range((Included(&minkey), Included(&maxkey)))
.iter()
.rev();
for ((_blknum, _lsn), blob_range) in iter {
.range((Included(&minkey), Included(&maxkey)))
.iter();
while let Some(((_blknum, _entry_lsn), blob_range)) = iter.next_back() {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
if let Some(img) = pv.page_image {
@@ -224,9 +248,16 @@ impl Layer for DeltaLayer {
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
// caller know about the predecessor layer.
if need_image {
Ok(PageReconstructResult::Continue(self.start_lsn))
if let Some(cont_layer) = &self.predecessor {
Ok(PageReconstructResult::Continue(
self.start_lsn,
Arc::clone(cont_layer),
))
} else {
Ok(PageReconstructResult::Missing(self.start_lsn))
}
} else {
Ok(PageReconstructResult::Complete)
}
@@ -235,22 +266,21 @@ impl Layer for DeltaLayer {
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
);
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.load()?;
let slice = inner
.relsizes
.slice_range((Included(&Lsn(0)), Included(&lsn)));
let slice = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
let result;
if let Some((_entry_lsn, entry)) = slice.last() {
Ok(*entry)
result = *entry;
// Use the base image if needed
} else if let Some(predecessor) = &self.predecessor {
result = predecessor.get_seg_size(lsn)?;
} else {
Err(anyhow::anyhow!("could not find seg size in delta layer"))
result = 0;
}
Ok(result)
}
/// Does this segment exist at given LSN?
@@ -270,15 +300,17 @@ impl Layer for DeltaLayer {
///
fn unload(&self) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.page_version_metas = VecMap::default();
inner.relsizes = VecMap::default();
inner.page_version_metas = OrderedVec::default();
inner.relsizes = OrderedVec::default();
inner.loaded = false;
Ok(())
}
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
if let Some(path) = self.path() {
fs::remove_file(path)?;
}
Ok(())
}
@@ -295,13 +327,13 @@ impl Layer for DeltaLayer {
println!("--- relsizes ---");
let inner = self.load()?;
for (k, v) in inner.relsizes.as_slice() {
for (k, v) in inner.relsizes.iter() {
println!(" {}: {}", k, v);
}
println!("--- page versions ---");
let (_path, book) = self.open_book()?;
let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
for ((blk, lsn), blob_range) in inner.page_version_metas.iter() {
let mut desc = String::new();
let buf = read_blob(&chapter, blob_range)?;
@@ -358,13 +390,10 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
predecessor: Option<Arc<dyn Layer>>,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
relsizes: VecMap<Lsn, u32>,
relsizes: OrderedVec<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
}
let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -375,14 +404,17 @@ impl DeltaLayer {
dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: true,
page_version_metas: VecMap::default(),
page_version_metas: OrderedVec::default(), // TODO create with a size estimate
relsizes,
}),
predecessor,
};
let mut inner = delta_layer.inner.lock().unwrap();
// Write the in-memory btreemaps into a file
let path = delta_layer.path();
let path = delta_layer
.path()
.expect("DeltaLayer is supposed to have a layer path on disk");
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
@@ -396,23 +428,20 @@ impl DeltaLayer {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
inner
.page_version_metas
.append((blknum, lsn), blob_range)
.unwrap();
inner.page_version_metas.append((blknum, lsn), blob_range);
}
let book = page_version_writer.close()?;
// Write out page versions
let mut chapter = book.new_chapter(PAGE_VERSION_METAS_CHAPTER);
let buf = VecMap::ser(&inner.page_version_metas)?;
let buf = OrderedVec::ser(&inner.page_version_metas)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
// and relsizes to separate chapter
let mut chapter = book.new_chapter(REL_SIZES_CHAPTER);
let buf = VecMap::ser(&inner.relsizes)?;
let buf = OrderedVec::ser(&inner.relsizes)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
@@ -431,7 +460,8 @@ impl DeltaLayer {
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
book.close()?;
let writer = book.close()?;
writer.get_ref().sync_all()?;
trace!("saved {}", &path.display());
@@ -445,7 +475,12 @@ impl DeltaLayer {
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
&DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
);
let file = File::open(&path)?;
@@ -493,10 +528,10 @@ impl DeltaLayer {
}
let chapter = book.read_chapter(PAGE_VERSION_METAS_CHAPTER)?;
let page_version_metas = VecMap::des(&chapter)?;
let page_version_metas = OrderedVec::des(&chapter)?;
let chapter = book.read_chapter(REL_SIZES_CHAPTER)?;
let relsizes = VecMap::des(&chapter)?;
let relsizes = OrderedVec::des(&chapter)?;
debug!("loaded from {}", &path.display());
@@ -515,6 +550,7 @@ impl DeltaLayer {
timelineid: ZTimelineId,
tenantid: ZTenantId,
filename: &DeltaFileName,
predecessor: Option<Arc<dyn Layer>>,
) -> DeltaLayer {
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
@@ -526,9 +562,10 @@ impl DeltaLayer {
dropped: filename.dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
page_version_metas: OrderedVec::default(),
relsizes: OrderedVec::default(),
}),
predecessor,
}
}
@@ -549,28 +586,10 @@ impl DeltaLayer {
dropped: summary.dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
page_version_metas: OrderedVec::default(),
relsizes: OrderedVec::default(),
}),
predecessor: None,
})
}
fn layer_name(&self) -> DeltaFileName {
DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
}
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
)
}
}

View File

@@ -13,8 +13,6 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;
use super::METADATA_FILE_NAME;
// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {
@@ -37,7 +35,7 @@ impl DeltaFileName {
/// Parse a string as a delta file name. Returns None if the filename does not
/// match the expected pattern.
///
pub fn parse_str(fname: &str) -> Option<Self> {
pub fn from_str(fname: &str) -> Option<Self> {
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {
@@ -170,7 +168,7 @@ impl ImageFileName {
/// Parse a string as an image file name. Returns None if the filename does not
/// match the expected pattern.
///
pub fn parse_str(fname: &str) -> Option<Self> {
pub fn from_str(fname: &str) -> Option<Self> {
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {
@@ -288,11 +286,15 @@ pub fn list_files(
let fname = direntry?.file_name();
let fname = fname.to_str().unwrap();
if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
if let Some(deltafilename) = DeltaFileName::from_str(fname) {
deltafiles.push(deltafilename);
} else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
} else if let Some(imgfilename) = ImageFileName::from_str(fname) {
imgfiles.push(imgfilename);
} else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
} else if fname == "wal"
|| fname == "metadata"
|| fname == "ancestor"
|| fname.ends_with(".old")
{
// ignore these
} else {
warn!("unrecognized filename in timeline dir: {}", fname);

View File

@@ -114,7 +114,25 @@ pub struct ImageLayerInner {
impl Layer for ImageLayer {
fn filename(&self) -> PathBuf {
PathBuf::from(self.layer_name().to_string())
PathBuf::from(
ImageFileName {
seg: self.seg,
lsn: self.lsn,
}
.to_string(),
)
}
fn path(&self) -> Option<PathBuf> {
Some(Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
))
}
fn get_timeline_id(&self) -> ZTimelineId {
@@ -204,7 +222,9 @@ impl Layer for ImageLayer {
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
if let Some(path) = self.path() {
fs::remove_file(path)?;
}
Ok(())
}
@@ -280,7 +300,9 @@ impl ImageLayer {
let inner = layer.inner.lock().unwrap();
// Write the images into a file
let path = layer.path();
let path = layer
.path()
.expect("ImageLayer is supposed to have a layer path on disk");
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;
@@ -315,9 +337,10 @@ impl ImageLayer {
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
book.close()?;
let writer = book.close()?;
writer.get_ref().sync_all()?;
trace!("saved {}", path.display());
trace!("saved {}", &path.display());
drop(inner);
@@ -422,7 +445,15 @@ impl ImageLayer {
}
fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
let path = self.path();
let path = Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
);
let file = File::open(&path)?;
let book = Book::new(file)?;
@@ -469,21 +500,4 @@ impl ImageLayer {
}),
})
}
fn layer_name(&self) -> ImageFileName {
ImageFileName {
seg: self.seg,
lsn: self.lsn,
}
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
)
}
}

View File

@@ -12,19 +12,20 @@ use crate::layered_repository::{DeltaLayer, ImageLayer};
use crate::repository::WALRecord;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::cmp::min;
use std::collections::HashMap;
use std::ops::Bound::Included;
use std::ops::RangeBounds;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::vec_map::VecMap;
use zenith_utils::ordered_vec::OrderedVec;
use zenith_utils::accum::Accum;
use zenith_utils::lsn::Lsn;
use super::page_versions::PageVersions;
pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
@@ -46,9 +47,83 @@ pub struct InMemoryLayer {
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: RwLock<InMemoryLayerInner>,
}
/// Predecessor layer might be needed?
incremental: bool,
#[derive(Default)]
struct PageVersions(HashMap<u32, OrderedVec<Lsn, PageVersion>>);
impl PageVersions {
fn range<R: RangeBounds<Lsn>>(&self, blknum: u32, lsn_range: R) -> &[(Lsn, PageVersion)] {
match self.0.get(&blknum) {
Some(ov) => ov.range(lsn_range),
None => &[],
}
}
fn update(&mut self, blknum: u32, lsn: Lsn, page_version: PageVersion) {
let ordered_vec = self.0.entry(blknum).or_insert_with(OrderedVec::default);
ordered_vec.append_update(lsn, page_version);
}
fn ordered_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedBlockIter {
let mut block_numbers: Vec<u32> = self.0.keys().cloned().collect();
// TODO consider counting sort given the small size of the key space
block_numbers.sort_unstable();
let cur_idx = 0;
let ordered_vec_iter = block_numbers
.get(cur_idx)
.map(|blk_num| self.0.get(blk_num).unwrap().iter())
.unwrap_or_else(|| [].iter());
OrderedBlockIter {
page_versions: self,
cutoff_lsn,
block_numbers,
cur_idx,
ordered_vec_iter,
}
}
fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
struct OrderedBlockIter<'a> {
page_versions: &'a PageVersions,
cutoff_lsn: Option<Lsn>,
block_numbers: Vec<u32>,
cur_idx: usize,
ordered_vec_iter: std::slice::Iter<'a, (Lsn, PageVersion)>,
}
impl<'a> Iterator for OrderedBlockIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.ordered_vec_iter.next() {
if self
.cutoff_lsn
.as_ref()
.map(|cutoff| lsn < cutoff)
.unwrap_or(true)
{
let blk_num = self.block_numbers[self.cur_idx];
return Some((blk_num, *lsn, page_version));
}
}
let blk_num = self.block_numbers.get(self.cur_idx + 1)?;
self.cur_idx += 1;
self.ordered_vec_iter = self.page_versions.0.get(blk_num).unwrap().iter();
}
}
}
pub struct InMemoryLayerInner {
@@ -64,15 +139,14 @@ pub struct InMemoryLayerInner {
///
/// `segsizes` tracks the size of the segment at different points in time.
///
/// For a blocky rel, there is always one entry, at the layer's start_lsn,
/// so that determining the size never depends on the predecessor layer. For
/// a non-blocky rel, 'segsizes' is not used and is always empty.
///
segsizes: VecMap<Lsn, u32>,
segsizes: OrderedVec<Lsn, u32>,
/// Writes are only allowed when true.
/// Set to false when this layer is in the process of being replaced.
writeable: bool,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
}
impl InMemoryLayerInner {
@@ -86,13 +160,12 @@ impl InMemoryLayerInner {
fn get_seg_size(&self, lsn: Lsn) -> u32 {
// Scan the BTreeMap backwards, starting from the given entry.
let slice = self.segsizes.slice_range(..=lsn);
let slice = self.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
// We make sure there is always at least one entry
if let Some((_entry_lsn, entry)) = slice.last() {
*entry
} else {
panic!("could not find seg size in in-memory layer");
0
}
}
}
@@ -124,6 +197,10 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}
fn path(&self) -> Option<PathBuf> {
None
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -166,16 +243,14 @@ impl Layer for InMemoryLayer {
assert!(self.seg.blknum_in_seg(blknum));
let predecessor: Option<Arc<dyn Layer>>;
{
let inner = self.inner.read().unwrap();
// Scan the page versions backwards, starting from `lsn`.
let iter = inner
.page_versions
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (_entry_lsn, entry) in iter {
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
let mut iter = inner.page_versions.range(blknum, ..=lsn).iter();
while let Some((_entry_lsn, entry)) = iter.next_back() {
if let Some(img) = &entry.page_image {
reconstruct_data.page_img = Some(img.clone());
need_image = false;
@@ -192,14 +267,16 @@ impl Layer for InMemoryLayer {
bail!("no page image or WAL record for requested page");
}
}
predecessor = inner.predecessor.clone();
// release lock on 'inner'
}
// If an older page image is needed to reconstruct the page, let the
// caller know
// caller know about the predecessor layer.
if need_image {
if self.incremental {
Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
if let Some(cont_layer) = predecessor {
Ok(PageReconstructResult::Continue(self.start_lsn, cont_layer))
} else {
Ok(PageReconstructResult::Missing(self.start_lsn))
}
@@ -211,10 +288,6 @@ impl Layer for InMemoryLayer {
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
);
let inner = self.inner.read().unwrap();
Ok(inner.get_seg_size(lsn))
@@ -254,7 +327,8 @@ impl Layer for InMemoryLayer {
}
fn is_incremental(&self) -> bool {
self.incremental
let inner = self.inner.read().unwrap();
inner.predecessor.is_some()
}
/// debugging function to print out the contents of the layer
@@ -272,17 +346,17 @@ impl Layer for InMemoryLayer {
self.timelineid, self.seg, self.start_lsn, end_str
);
for (k, v) in inner.segsizes.as_slice() {
for (k, v) in inner.segsizes.iter() {
println!("segsizes {}: {}", k, v);
}
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
for (blknum, lsn, v) in inner.page_versions.ordered_iter(None) {
println!(
"blk {} at {}: {}/{}\n",
blknum,
lsn,
pv.page_image.is_some(),
pv.record.is_some()
v.page_image.is_some(),
v.record.is_some()
);
}
@@ -305,18 +379,6 @@ pub struct FreezeLayers {
pub open: Option<Arc<InMemoryLayer>>,
}
/// A result of an inmemory layer data being written to disk.
pub struct LayersOnDisk {
pub delta_layers: Vec<DeltaLayer>,
pub image_layers: Vec<ImageLayer>,
}
impl LayersOnDisk {
pub fn is_empty(&self) -> bool {
self.delta_layers.is_empty() && self.image_layers.is_empty()
}
}
impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());
@@ -345,12 +407,6 @@ impl InMemoryLayer {
start_lsn
);
// The segment is initially empty, so initialize 'segsizes' with 0.
let mut segsizes = VecMap::default();
if seg.rel.is_blocky() {
segsizes.append(start_lsn, 0).unwrap();
}
Ok(InMemoryLayer {
conf,
timelineid,
@@ -359,12 +415,12 @@ impl InMemoryLayer {
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: false,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: PageVersions::default(),
segsizes,
segsizes: OrderedVec::default(),
writeable: true,
predecessor: None,
}),
})
}
@@ -412,8 +468,9 @@ impl InMemoryLayer {
inner.check_writeable()?;
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
inner.page_versions.update(blknum, lsn, pv);
/*
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
@@ -421,6 +478,7 @@ impl InMemoryLayer {
self.seg.rel, blknum, lsn
);
}
*/
// Also update the relation size, if this extended the relation.
if self.seg.rel.is_blocky() {
@@ -457,20 +515,20 @@ impl InMemoryLayer {
gapblknum,
blknum
);
let old = inner
.page_versions
.append_or_update_last(gapblknum, lsn, zeropv);
inner.page_versions.update(gapblknum, lsn, zeropv);
// We already had an entry for this LSN. That's odd..
/*
if old.is_some() {
warn!(
"Page version of rel {} blk {} at {} already exists",
self.seg.rel, blknum, lsn
);
}
*/
}
inner.segsizes.append_or_update_last(lsn, newsize).unwrap();
inner.segsizes.append_update(lsn, newsize);
return Ok(newsize - oldsize);
}
}
@@ -479,10 +537,6 @@ impl InMemoryLayer {
/// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
assert!(
self.seg.rel.is_blocky(),
"put_truncation() called on a non-blocky rel"
);
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap();
@@ -492,12 +546,7 @@ impl InMemoryLayer {
let oldsize = inner.get_seg_size(lsn);
assert!(segsize < oldsize);
let old = inner.segsizes.append_or_update_last(lsn, segsize).unwrap();
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Inserting truncation, but had an entry for the LSN already");
}
inner.segsizes.append_update(lsn, segsize);
Ok(())
}
@@ -543,11 +592,11 @@ impl InMemoryLayer {
start_lsn,
);
// Copy the segment size at the start LSN from the predecessor layer.
let mut segsizes = VecMap::default();
// For convenience, copy the segment size from the predecessor layer
let mut segsizes = OrderedVec::default();
if seg.rel.is_blocky() {
let size = src.get_seg_size(start_lsn)?;
segsizes.append(start_lsn, size).unwrap();
segsizes.append(start_lsn, size);
}
Ok(InMemoryLayer {
@@ -558,12 +607,12 @@ impl InMemoryLayer {
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: true,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: PageVersions::default(),
segsizes,
writeable: true,
predecessor: Some(src),
}),
})
}
@@ -614,19 +663,30 @@ impl InMemoryLayer {
// Divide all the page versions into old and new
// at the 'cutoff_lsn' point.
let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
let (before_segsizes, after_segsizes) = inner.segsizes.copy_split(&Lsn(cutoff_lsn.0 + 1));
let cutoff_lsn_exclusive = Lsn(cutoff_lsn.0 + 1);
// The iterator is in Lsn order, so the first element will have the smallest Lsn
let mut after_oldest_lsn: Accum<Lsn> =
Accum(after_segsizes.iter().next().map(|(lsn, _size)| *lsn));
let (before_segsizes, mut after_segsizes) = inner.segsizes.split_at(&cutoff_lsn_exclusive);
if let Some((lsn, _size)) = after_segsizes.as_slice().first() {
after_oldest_lsn.accum(min, *lsn);
let mut before_page_versions = PageVersions::default();
let mut after_page_versions = PageVersions::default();
for (blknum, ordered_vec) in inner.page_versions.0.iter() {
let (before_ov, after_ov) = ordered_vec.copy_split(&Lsn(cutoff_lsn.0 + 1));
if let Some((lsn, _pv)) = after_ov.iter().next() {
after_oldest_lsn.accum(min, *lsn);
}
if !before_ov.is_empty() {
before_page_versions.0.insert(*blknum, before_ov);
}
if !after_ov.is_empty() {
after_page_versions.0.insert(*blknum, after_ov);
}
}
let (before_page_versions, after_page_versions) = inner
.page_versions
.split_at(cutoff_lsn_exclusive, &mut after_oldest_lsn);
let frozen = Arc::new(InMemoryLayer {
conf: self.conf,
tenantid: self.tenantid,
@@ -635,12 +695,12 @@ impl InMemoryLayer {
start_lsn: self.start_lsn,
end_lsn: Some(cutoff_lsn),
oldest_pending_lsn: self.start_lsn,
incremental: self.incremental,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
page_versions: before_page_versions,
segsizes: before_segsizes,
writeable: false,
predecessor: inner.predecessor.clone(),
}),
});
@@ -655,11 +715,8 @@ impl InMemoryLayer {
)?;
let new_inner = new_open.inner.get_mut().unwrap();
// Ensure page_versions doesn't contain anything
// so we can just replace it
assert!(new_inner.page_versions.is_empty());
new_inner.page_versions = after_page_versions;
new_inner.segsizes.extend(&mut after_segsizes).unwrap();
new_inner.segsizes.extend(after_segsizes);
Some(Arc::new(new_open))
} else {
@@ -677,7 +734,7 @@ impl InMemoryLayer {
/// WAL records between start and end LSN. (The delta layer is not needed
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<LayersOnDisk> {
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
self.filename().display(),
@@ -686,7 +743,7 @@ impl InMemoryLayer {
);
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// layer is not writeable anymore, no one should be trying to aquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called
@@ -697,6 +754,8 @@ impl InMemoryLayer {
let inner = self.inner.read().unwrap();
assert!(!inner.writeable);
let predecessor = inner.predecessor.clone();
if let Some(drop_lsn) = inner.drop_lsn {
let delta_layer = DeltaLayer::create(
self.conf,
@@ -706,7 +765,8 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn,
true,
inner.page_versions.ordered_page_version_iter(None),
predecessor,
inner.page_versions.ordered_iter(None),
inner.segsizes.clone(),
)?;
trace!(
@@ -715,20 +775,17 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn
);
return Ok(LayersOnDisk {
delta_layers: vec![delta_layer],
image_layers: Vec::new(),
});
return Ok(vec![Arc::new(delta_layer)]);
}
let end_lsn = self.end_lsn.unwrap();
let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
let before_segsizes = inner.segsizes.copy_prefix(&Lsn(end_lsn.0 + 1));
let mut before_page_versions = inner.page_versions.ordered_iter(Some(end_lsn));
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
let mut delta_layers = Vec::new();
if self.start_lsn != end_lsn {
let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1));
// Write the page versions before the cutoff to disk.
let delta_layer = DeltaLayer::create(
self.conf,
@@ -738,10 +795,11 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
false,
predecessor,
before_page_versions,
before_segsizes,
)?;
delta_layers.push(delta_layer);
frozen_layers.push(Arc::new(delta_layer));
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,
@@ -756,11 +814,14 @@ impl InMemoryLayer {
// Write a new base image layer at the cutoff point
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
frozen_layers.push(Arc::new(image_layer));
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
Ok(LayersOnDisk {
delta_layers,
image_layers: vec![image_layer],
})
Ok(frozen_layers)
}
pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) -> Option<Arc<dyn Layer>> {
let mut inner = self.inner.write().unwrap();
inner.predecessor.replace(predecessor)
}
}

View File

@@ -1,182 +0,0 @@
use std::{collections::HashMap, ops::RangeBounds, slice};
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
#[derive(Debug, Default)]
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
impl PageVersions {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn append_or_update_last(
&mut self,
blknum: u32,
lsn: Lsn,
page_version: PageVersion,
) -> Option<PageVersion> {
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, page_version).unwrap()
}
/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
blknum: u32,
range: R,
) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
.unwrap_or(EMPTY_SLICE)
}
/// Split the page version map into two.
///
/// Left contains everything up to and not including [`cutoff_lsn`].
/// Right contains [`cutoff_lsn`] and everything after.
pub fn split_at(&self, cutoff_lsn: Lsn, after_oldest_lsn: &mut Accum<Lsn>) -> (Self, Self) {
let mut before_blocks = HashMap::new();
let mut after_blocks = HashMap::new();
for (blknum, vec_map) in self.0.iter() {
let (before_versions, after_versions) = vec_map.split_at(&cutoff_lsn);
if !before_versions.is_empty() {
let old = before_blocks.insert(*blknum, before_versions);
assert!(old.is_none());
}
if !after_versions.is_empty() {
let (first_lsn, _first_pv) = &after_versions.as_slice()[0];
after_oldest_lsn.accum(std::cmp::min, *first_lsn);
let old = after_blocks.insert(*blknum, after_versions);
assert!(old.is_none());
}
}
(Self(before_blocks), Self(after_blocks))
}
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
ordered_blocks.sort_unstable();
let slice = ordered_blocks
.first()
.map(|&blknum| self.get_block_slice(blknum))
.unwrap_or(EMPTY_SLICE);
OrderedPageVersionIter {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
cutoff_lsn,
cur_slice_iter: slice.iter(),
}
}
}
pub struct OrderedPageVersionIter<'a> {
page_versions: &'a PageVersions,
ordered_blocks: Vec<u32>,
cur_block_idx: usize,
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
}
impl OrderedPageVersionIter<'_> {
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
lsn < cutoff_lsn
} else {
true
}
}
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, page_version));
}
}
let next_block_idx = self.cur_block_idx + 1;
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
self.cur_block_idx = next_block_idx;
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const EMPTY_PAGE_VERSION: PageVersion = PageVersion {
page_image: None,
record: None,
};
#[test]
fn test_ordered_iter() {
let mut page_versions = PageVersions::default();
const BLOCKS: u32 = 1000;
const LSNS: u64 = 50;
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let old = page_versions.append_or_update_last(blknum, Lsn(lsn), EMPTY_PAGE_VERSION);
assert!(old.is_none());
}
}
let mut iter = page_versions.ordered_page_version_iter(None);
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
const CUTOFF_LSN: Lsn = Lsn(30);
let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
for blknum in 0..BLOCKS {
for lsn in 0..CUTOFF_LSN.0 {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
}
}

View File

@@ -10,6 +10,7 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
@@ -86,9 +87,9 @@ pub struct PageReconstructData {
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain all the required data, the caller should collect
/// more data from the returned predecessor layer at the returned LSN.
Continue(Lsn, Arc<dyn Layer>),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
@@ -123,6 +124,10 @@ pub trait Layer: Send + Sync {
/// Is the segment represented by this layer dropped by PostgreSQL?
fn is_dropped(&self) -> bool;
/// Gets the physical location of the layer on disk.
/// Some layers, such as in-memory, might not have the location.
fn path(&self) -> Option<PathBuf>;
/// Filename used to store this layer on disk. (Even in-memory layers
/// implement this, to print a handy unique identifier for the layer for
/// log messages, even though they're never not on disk.)
@@ -140,8 +145,8 @@ pub trait Layer: Send + Sync {
///
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call. If this returns PageReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data'
/// on first call. If this returns PageReconstructResult::Continue, call
/// again on the returned predecessor layer with the same 'reconstruct_data'
/// to collect more data.
fn get_page_reconstruct_data(
&self,

View File

@@ -126,6 +126,10 @@ impl PageServerConf {
self.timeline_path(timelineid, tenantid).join("ancestor")
}
fn wal_dir_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf {
self.timeline_path(timelineid, tenantid).join("wal")
}
//
// Postgres distribution paths
//

View File

@@ -12,12 +12,14 @@ mod rust_s3;
/// local page server layer files with external storage.
mod synced_storage;
use std::{path::Path, thread};
use std::path::Path;
use std::thread;
use anyhow::Context;
use self::local_fs::LocalFs;
pub use self::synced_storage::schedule_timeline_upload;
use self::{local_fs::LocalFs, rust_s3::RustS3};
use crate::relish_storage::rust_s3::RustS3;
use crate::{PageServerConf, RelishStorageKind};
pub fn run_storage_sync_thread(

View File

@@ -5,10 +5,9 @@ use std::path::Path;
use anyhow::Context;
use s3::{bucket::Bucket, creds::Credentials, region::Region};
use crate::{
relish_storage::{strip_workspace_prefix, RelishStorage},
S3Config,
};
use crate::{relish_storage::strip_workspace_prefix, S3Config};
use super::RelishStorage;
const S3_FILE_SEPARATOR: char = '/';

View File

@@ -209,7 +209,7 @@ impl WALRecord {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::{LayeredRepository, METADATA_FILE_NAME};
use crate::layered_repository::LayeredRepository;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use hex_literal::hex;
@@ -728,7 +728,7 @@ mod tests {
repo.create_empty_timeline(TIMELINE_ID)?;
drop(repo);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join("metadata");
assert!(metadata_path.is_file());

View File

@@ -100,6 +100,10 @@ pub fn create_repository_for_tenant(
Ok(())
}
pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc<dyn Repository>) {
access_repository().insert(tenantid, repo);
}
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
access_repository()
.get(&tenantid)

View File

@@ -16,11 +16,14 @@ use log::*;
use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::cell::Cell;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs;
use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
@@ -122,7 +125,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
}
fn walreceiver_main(
_conf: &PageServerConf,
conf: &PageServerConf,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
@@ -192,6 +195,7 @@ fn walreceiver_main(
let data = xlog_data.data();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
let prev_last_rec_lsn = last_rec_lsn;
trace!("received XLogData between {} and {}", startlsn, endlsn);
@@ -232,6 +236,34 @@ fn walreceiver_main(
last_rec_lsn = lsn;
}
// Somewhat arbitrarily, if we have at least 10 complete wal segments (16 MB each),
// "checkpoint" the repository to flush all the changes from WAL we've processed
// so far to disk. After this, we don't need the original WAL anymore, and it
// can be removed. This is probably too aggressive for production, but it's useful
// to expose bugs now.
//
// TODO: We don't actually dare to remove the WAL. It's useful for debugging,
// and we might it for logical decoding other things in the future. Although
// we should also be able to fetch it back from the WAL safekeepers or S3 if
// needed.
if prev_last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
!= last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
{
info!("switched segment {} to {}", prev_last_rec_lsn, last_rec_lsn);
let (oldest_segno, newest_segno) = find_wal_file_range(
conf,
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
last_rec_lsn,
&tenantid,
)?;
if newest_segno - oldest_segno >= 10 {
// TODO: This is where we could remove WAL older than last_rec_lsn.
//remove_wal_files(timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn)?;
}
}
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {}", endlsn);
caught_up = true;
@@ -253,7 +285,7 @@ fn walreceiver_main(
);
if reply_requested {
Some(last_rec_lsn)
Some(timeline.get_last_record_lsn())
} else {
None
}
@@ -277,6 +309,47 @@ fn walreceiver_main(
Ok(())
}
fn find_wal_file_range(
conf: &PageServerConf,
timeline: &ZTimelineId,
wal_seg_size: usize,
written_upto: Lsn,
tenant: &ZTenantId,
) -> Result<(u64, u64)> {
let written_upto_segno = written_upto.segment_number(wal_seg_size);
let mut oldest_segno = written_upto_segno;
let mut newest_segno = written_upto_segno;
// Scan the wal directory, and count how many WAL filed we could remove
let wal_dir = conf.wal_dir_path(timeline, tenant);
for entry in fs::read_dir(wal_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
continue;
}
let filename = path.file_name().unwrap().to_str().unwrap();
if IsXLogFileName(filename) {
let (segno, _tli) = XLogFromFileName(filename, wal_seg_size);
if segno > written_upto_segno {
// that's strange.
warn!("there is a WAL file from future at {}", path.display());
continue;
}
oldest_segno = min(oldest_segno, segno);
newest_segno = max(newest_segno, segno);
}
}
// FIXME: would be good to assert that there are no gaps in the WAL files
Ok((oldest_segno, newest_segno))
}
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
///
/// See the [postgres docs] for more details.

View File

@@ -33,11 +33,11 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
FROM generate_series(1, 100000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_b = main_cur.fetchone()[0]
print('LSN after 200100 rows: ' + lsn_b)
print('LSN after 100100 rows: ' + lsn_b)
# Branch at the point where only 100 rows were inserted
zenith_cli.run(["branch", "test_branch_behind_hundred", "test_branch_behind@" + lsn_a])
@@ -46,15 +46,15 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
FROM generate_series(1, 100000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = main_cur.fetchone()[0]
print('LSN after 400100 rows: ' + lsn_c)
print('LSN after 200100 rows: ' + lsn_c)
# Branch at the point where only 200100 rows were inserted
# Branch at the point where only 200 rows were inserted
zenith_cli.run(["branch", "test_branch_behind_more", "test_branch_behind@" + lsn_b])
pg_hundred = postgres.create_start("test_branch_behind_hundred")
@@ -70,11 +70,11 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
more_pg_conn = pg_more.connect()
more_cur = more_pg_conn.cursor()
more_cur.execute('SELECT count(*) FROM foo')
assert more_cur.fetchone() == (200100, )
assert more_cur.fetchone() == (100100, )
# All the rows are visible on the main branch
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (400100, )
assert main_cur.fetchone() == (200100, )
# Check bad lsn's for branching

View File

@@ -140,30 +140,6 @@ class ZenithBenchmarker:
re.MULTILINE)
return int(round(float(matches.group(1))))
def get_peak_mem(self, pageserver) -> int:
"""
Fetch the "maxrss" metric from the pageserver
"""
# Fetch all the exposed prometheus metrics from page server
all_metrics = pageserver.http_client().get_metrics()
# See comment in get_io_writes()
matches = re.search(r'^pageserver_maxrss_kb (\S+)$', all_metrics,
re.MULTILINE)
return int(round(float(matches.group(1))))
def get_timeline_size(self, repo_dir: str, tenantid: str, timelineid: str):
"""
Calculate the on-disk size of a timeline
"""
path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid)
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
return totalbytes
@contextmanager
def record_pageserver_writes(self, pageserver, metric_name):
"""

View File

@@ -794,17 +794,12 @@ def read_pid(path: Path):
return int(path.read_text())
@dataclass
class WalAcceptorPort:
pg: int
http: int
@dataclass
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
wa_bin_path: Path
data_dir: Path
port: WalAcceptorPort
port: int
num: int # identifier for logging
pageserver_port: int
auth_token: Optional[str] = None
@@ -816,8 +811,7 @@ class WalAcceptor:
cmd = [str(self.wa_bin_path)]
cmd.extend(["-D", str(self.data_dir)])
cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"])
cmd.extend(["--listen-http", f"localhost:{self.port.http}"])
cmd.extend(["-l", f"localhost:{self.port}"])
cmd.append("--daemonize")
cmd.append("--no-sync")
# Tell page server it can receive WAL from this WAL safekeeper
@@ -874,14 +868,14 @@ class WalAcceptor:
# "replication=0" hacks psycopg not to send additional queries
# on startup, see https://github.com/psycopg/psycopg2/pull/482
connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
with closing(psycopg2.connect(connstr)) as conn:
# server doesn't support transactions
conn.autocommit = True
with conn.cursor() as cur:
request_json = json.dumps(request)
print(f"JSON_CTRL request on port {self.port.pg}: {request_json}")
print(f"JSON_CTRL request on port {self.port}: {request_json}")
cur.execute("JSON_CTRL " + request_json)
all = cur.fetchall()
print(f"JSON_CTRL response: {all[0][0]}")
@@ -904,10 +898,7 @@ class WalAcceptorFactory:
wa = WalAcceptor(
wa_bin_path=self.wa_bin_path,
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
port=WalAcceptorPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
),
port=self.port_distributor.get_port(),
num=wa_num,
pageserver_port=self.pageserver_port,
auth_token=auth_token,
@@ -931,7 +922,7 @@ class WalAcceptorFactory:
def get_connstrs(self) -> str:
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances])
return ','.join(["localhost:{}".format(wa.port) for wa in self.instances])
@zenfixture

View File

@@ -4,6 +4,19 @@ from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def get_timeline_size(repo_dir: str, tenantid: str, timelineid: str):
path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid)
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
if 'wal' in dirs:
dirs.remove('wal') # don't visit 'wal' subdirectory
return totalbytes
#
# Run bulk INSERT test.
#
@@ -12,7 +25,6 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
# 1. Time to INSERT 5 million rows
# 2. Disk writes
# 3. Disk space used
# 4. Peak memory usage
#
def test_bulk_insert(postgres: PostgresFactory, pageserver: ZenithPageserver, pg_bin, zenith_cli, zenbenchmark, repo_dir: str):
# Create a branch for us
@@ -43,9 +55,6 @@ def test_bulk_insert(postgres: PostgresFactory, pageserver: ZenithPageserver, pg
# time and I/O
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
# Record peak memory usage
zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(pageserver) / 1024, 'MB')
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
timeline_size = get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
zenbenchmark.record('size', timeline_size / (1024*1024), 'MB')

View File

@@ -4,6 +4,19 @@ from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def get_timeline_size(repo_dir: str, tenantid: str, timelineid: str):
path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid)
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
if 'wal' in dirs:
dirs.remove('wal') # don't visit 'wal' subdirectory
return totalbytes
#
# Run a very short pgbench test.
#
@@ -51,5 +64,5 @@ def test_pgbench(postgres: PostgresFactory, pageserver: ZenithPageserver, pg_bin
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
timeline_size = get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
zenbenchmark.record('size', timeline_size / (1024*1024), 'MB')

View File

@@ -1,74 +0,0 @@
# Demonstrate Write Amplification with naive oldest-first layer checkpointing
# algorithm.
#
# In each iteration of the test, we create a new table that's slightly under 10
# MB in size (10 MB is the current "segment size" used by the page server). Then
# we make a tiny update to all the tables already created. This creates a WAL
# pattern where you have a lot of updates on one segment (the newly created
# one), alternating with a small updates on all relations. This is the worst
# case scenario for the naive checkpointing policy where we write out the layers
# in LSN order, writing the oldest layer first. That creates a new 10 MB image
# layer to be created for each of those small updates. This is the Write
# Amplification problem at its finest.
import os
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def test_write_amplification(postgres: PostgresFactory, pageserver: ZenithPageserver, pg_bin, zenith_cli, zenbenchmark, repo_dir: str):
# Create a branch for us
zenith_cli.run(["branch", "test_write_amplification", "empty"])
pg = postgres.create_start('test_write_amplification')
print("postgres is running on 'test_write_amplification' branch")
# Open a connection directly to the page server that we'll use to force
# flushing the layers to disk
psconn = pageserver.connect();
pscur = psconn.cursor()
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Get the timeline ID of our branch. We need it for the 'do_gc' command
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'):
with zenbenchmark.record_duration('run'):
# NOTE: Because each iteration updates every table already created,
# the runtime and write amplification is O(n^2), where n is the
# number of iterations.
for i in range(25):
cur.execute(f'''
CREATE TABLE tbl{i} AS
SELECT g as i, 'long string to consume some space' || g as t
FROM generate_series(1, 100000) g
''')
cur.execute(f"create index on tbl{i} (i);")
for j in range(1, i):
cur.execute(f"delete from tbl{j} where i = {i}")
# Force checkpointing. As of this writing, we don't have
# a back-pressure mechanism, and the page server cannot
# keep up digesting and checkpointing the WAL at the
# rate that it is generated. If we don't force a
# checkpoint, the WAL will just accumulate in memory
# until you hit OOM error. So in effect, we use much
# more memory to hold the incoming WAL, and write them
# out in larger batches than we'd really want. Using
# more memory hides the write amplification problem this
# test tries to demonstrate.
#
# The write amplification problem is real, and using
# more memory isn't the right solution. We could
# demonstrate the effect also by generating the WAL
# slower, adding some delays in this loop. But forcing
# the the checkpointing and GC makes the test go faster,
# with the same total I/O effect.
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
zenbenchmark.record('size', timeline_size / (1024*1024), 'MB')

View File

@@ -28,11 +28,9 @@ humantime = "2.1.0"
walkdir = "2"
serde = { version = "1.0", features = ["derive"] }
hex = "0.4.3"
const_format = "0.2.21"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }
postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { path = "../workspace_hack" }
zenith_metrics = { path = "../zenith_metrics" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -3,23 +3,18 @@
//
use anyhow::Result;
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::env;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::logging;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
let arg_matches = App::new("Zenith wal_acceptor")
.about("Store WAL stream to local file system and push it to WAL receivers")
.arg(
@@ -30,18 +25,11 @@ fn main() -> Result<()> {
.help("Path to the WAL acceptor data directory"),
)
.arg(
Arg::with_name("listen-pg")
Arg::with_name("listen")
.short("l")
.long("listen-pg")
.alias("listen") // for compatibility
.long("listen")
.takes_value(true)
.help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")),
)
.arg(
Arg::with_name("listen-http")
.long("listen-http")
.takes_value(true)
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
.help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"),
)
.arg(
Arg::with_name("pageserver")
@@ -82,8 +70,7 @@ fn main() -> Result<()> {
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
listen_addr: "localhost:5454".to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
@@ -104,12 +91,8 @@ fn main() -> Result<()> {
conf.daemonize = true;
}
if let Some(addr) = arg_matches.value_of("listen-pg") {
conf.listen_pg_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("listen-http") {
conf.listen_http_addr = addr.to_owned();
if let Some(addr) = arg_matches.value_of("listen") {
conf.listen_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
@@ -131,11 +114,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let log_filename = conf.data_dir.join("wal_acceptor.log");
let (_scope_guard, log_file) = logging::init(log_filename, conf.daemonize)?;
let http_listener = TcpListener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
})?;
if conf.daemonize {
info!("daemonizing...");
@@ -158,16 +136,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let mut threads = Vec::new();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
// No authentication at all: read-only metrics only, early stage.
let router = endpoint::make_router();
endpoint::serve_thread_main(router, http_listener).unwrap();
})
.unwrap();
threads.push(http_endpoint_thread);
if conf.ttl.is_some() {
let s3_conf = conf.clone();
let s3_offload_thread = thread::Builder::new()

View File

@@ -11,23 +11,12 @@ pub mod send_wal;
pub mod timeline;
pub mod wal_service;
pub mod defaults {
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
}
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub listen_addr: String,
pub pageserver_addr: Option<String>,
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
pub pageserver_auth_token: Option<String>,

View File

@@ -42,7 +42,7 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
);
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr);
let me_conf: Config = me_connstr.parse().unwrap();
let (host, port) = connection_host_port(&me_conf);
let callme = format!(

View File

@@ -15,11 +15,8 @@ use std::cmp::min;
use std::io;
use std::io::Read;
use lazy_static::lazy_static;
use crate::replication::HotStandbyFeedback;
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::pq_proto::SystemId;
@@ -282,38 +279,6 @@ pub trait Storage {
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
}
lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.
static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_flush_lsn",
"Current flush_lsn, grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_flush_lsn int gauge vec");
static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_commit_lsn",
"Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_commit_lsn int gauge vec");
}
struct SafeKeeperMetrics {
flush_lsn: Gauge,
commit_lsn: Gauge,
}
impl SafeKeeperMetrics {
fn new(ztli: ZTimelineId) -> SafeKeeperMetrics {
let ztli_str = format!("{}", ztli);
SafeKeeperMetrics {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&ztli_str]),
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&ztli_str]),
}
}
}
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
pub struct SafeKeeper<ST: Storage> {
@@ -321,8 +286,6 @@ pub struct SafeKeeper<ST: Storage> {
/// Established by reading wal.
pub flush_lsn: Lsn,
pub tli: u32,
// Cached metrics so we don't have to recompute labels on each update.
metrics: Option<SafeKeeperMetrics>,
/// not-yet-flushed pairs of same named fields in s.*
pub commit_lsn: Lsn,
pub truncate_lsn: Lsn,
@@ -341,7 +304,6 @@ where
SafeKeeper {
flush_lsn,
tli,
metrics: None,
commit_lsn: state.commit_lsn,
truncate_lsn: state.truncate_lsn,
storage,
@@ -393,8 +355,6 @@ where
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage.persist(&self.s, true)?;
self.metrics = Some(SafeKeeperMetrics::new(self.s.server.ztli));
info!(
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term
@@ -518,11 +478,6 @@ where
}
if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn;
self.metrics
.as_ref()
.unwrap()
.flush_lsn
.set(u64::from(self.flush_lsn) as f64);
}
// Advance commit_lsn taking into account what we have locally. xxx this
@@ -540,11 +495,6 @@ where
sync_control_file |=
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
self.metrics
.as_ref()
.unwrap()
.commit_lsn
.set(u64::from(self.commit_lsn) as f64);
}
self.truncate_lsn = msg.h.truncate_lsn;

View File

@@ -13,9 +13,9 @@ use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: WalAcceptorConf) -> Result<()> {
info!("Starting wal acceptor on {}", conf.listen_pg_addr);
let listener = TcpListener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
info!("Starting wal acceptor on {}", conf.listen_addr);
let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_addr, e);
e
})?;

View File

@@ -5,8 +5,6 @@
use lazy_static::lazy_static;
use once_cell::race::OnceBox;
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};
@@ -23,7 +21,7 @@ pub use wrappers::{CountedReader, CountedWriter};
/// Metrics gathering is a relatively simple and standalone operation, so
/// it might be fine to do it this way to keep things simple.
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
update_rusage_metrics();
update_io_metrics();
prometheus::gather()
}
@@ -33,29 +31,16 @@ static COMMON_METRICS_PREFIX: OnceBox<&str> = OnceBox::new();
/// name like 'pageserver'. Should be executed exactly once in the beginning of
/// any executable which uses common metrics.
pub fn set_common_metrics_prefix(prefix: &'static str) {
// Not unwrap() because metrics may be initialized after multiple threads have been started.
COMMON_METRICS_PREFIX
.set(prefix.into())
.unwrap_or_else(|_| {
eprintln!(
"set_common_metrics_prefix() was called second time with '{}', exiting",
prefix
);
std::process::exit(1);
});
COMMON_METRICS_PREFIX.set(prefix.into()).unwrap();
}
/// Prepends a prefix to a common metric name so they are distinguished between
/// different services, see <https://github.com/zenithdb/zenith/pull/681>
/// different services, see https://github.com/zenithdb/zenith/pull/681
/// A call to set_common_metrics_prefix() is necessary prior to calling this.
pub fn new_common_metric_name(unprefixed_metric_name: &str) -> String {
// Not unwrap() because metrics may be initialized after multiple threads have been started.
format!(
"{}_{}",
COMMON_METRICS_PREFIX.get().unwrap_or_else(|| {
eprintln!("set_common_metrics_prefix() was not called, but metrics are used, exiting");
std::process::exit(1);
}),
COMMON_METRICS_PREFIX.get().unwrap(),
unprefixed_metric_name
)
}
@@ -67,11 +52,6 @@ lazy_static! {
&["io_operation"]
)
.expect("Failed to register disk i/o bytes int gauge vec");
static ref MAXRSS_KB: IntGauge = register_int_gauge!(
new_common_metric_name("maxrss_kb"),
"Memory usage (Maximum Resident Set Size)"
)
.expect("Failed to register maxrss_kb int gauge");
}
// Records I/O stats in a "cross-platform" way.
@@ -83,7 +63,7 @@ lazy_static! {
// We know the size of the block, so we can determine the I/O bytes out of it.
// The value might be not 100% exact, but should be fine for Prometheus metrics in this case.
#[allow(clippy::unnecessary_cast)]
fn update_rusage_metrics() {
fn update_io_metrics() {
let rusage_stats = get_rusage_stats();
const BYTES_IN_BLOCK: i64 = 512;
@@ -93,7 +73,6 @@ fn update_rusage_metrics() {
DISK_IO_BYTES
.with_label_values(&["write"])
.set(rusage_stats.ru_oublock * BYTES_IN_BLOCK);
MAXRSS_KB.set(rusage_stats.ru_maxrss);
}
fn get_rusage_stats() -> libc::rusage {

View File

@@ -18,7 +18,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
thiserror = "1.0"
tokio = "1.11"
crossbeam-utils = "0.8.5"
slog-async = "2.6.0"
slog-stdlog = "4.1.0"

View File

@@ -1,45 +0,0 @@
use crossbeam_utils::thread;
use std::{
fs::File,
io,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};
pub fn batch_fsync(paths: &[PathBuf]) -> std::io::Result<()> {
let next = AtomicUsize::new(0);
let num_threads = std::cmp::min(paths.len() / 2, 256);
if num_threads <= 1 {
for path in paths {
let file = File::open(&path)?;
file.sync_all()?;
}
return Ok(());
}
thread::scope(|s| -> io::Result<()> {
let mut handles = Vec::new();
for _ in 0..num_threads {
handles.push(s.spawn(|_| loop {
let idx = next.fetch_add(1, Ordering::Relaxed);
if idx >= paths.len() {
return io::Result::Ok(());
}
let file = File::open(&paths[idx])?;
file.sync_all()?;
}));
}
for handle in handles {
handle.join().unwrap()?;
}
Ok(())
})
.unwrap()
}

View File

@@ -8,8 +8,7 @@ pub mod lsn;
/// SeqWait allows waiting for a future sequence number to arrive
pub mod seqwait;
/// append only ordered map implemented with a Vec
pub mod vec_map;
pub mod ordered_vec;
// Async version of SeqWait. Currently unused.
// pub mod seqwait_async;
@@ -40,6 +39,3 @@ pub mod logging;
// Misc
pub mod accum;
/// Utility for quickly fsyncing many files at once
pub mod batch_fsync;

View File

@@ -0,0 +1,273 @@
use std::{
collections::BTreeMap,
ops::{Bound, RangeBounds},
};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrderedVec<K, V>(Vec<(K, V)>);
impl<K, V> Default for OrderedVec<K, V> {
fn default() -> Self {
Self(Default::default())
}
}
impl<K: Ord + Copy, V> OrderedVec<K, V> {
pub fn iter(&self) -> std::slice::Iter<'_, (K, V)> {
self.0.iter()
}
pub fn range<R: RangeBounds<K>>(&self, range: R) -> &[(K, V)] {
match (range.start_bound(), range.end_bound()) {
(Bound::Excluded(l), Bound::Excluded(u)) if l == u => panic!("Invalid excluded"),
// TODO check for l <= x with or patterns
_ => {}
}
let start_idx = match range.start_bound() {
Bound::Included(key) => match self.0.binary_search_by_key(key, extract_key) {
Ok(idx) => idx,
Err(idx) => idx,
},
Bound::Excluded(key) => match self.0.binary_search_by_key(key, extract_key) {
Ok(idx) => idx + 1,
Err(idx) => idx,
},
Bound::Unbounded => 0,
};
let end_idx = match range.end_bound() {
Bound::Included(key) => match self.0.binary_search_by_key(key, extract_key) {
Ok(idx) => idx + 1,
Err(idx) => idx,
},
Bound::Excluded(key) => match self.0.binary_search_by_key(key, extract_key) {
Ok(idx) => idx,
Err(idx) => idx,
},
Bound::Unbounded => self.0.len(),
};
&self.0[start_idx..end_idx]
}
pub fn append(&mut self, key: K, value: V) {
if let Some((last_key, _last_value)) = self.0.last() {
debug_assert!(last_key < &key);
}
self.0.push((key, value));
}
pub fn append_update(&mut self, key: K, value: V) {
if let Some((last_key, this_value)) = self.0.last_mut() {
use std::cmp::Ordering;
match (*last_key).cmp(&key) {
Ordering::Less => {}
Ordering::Equal => {
*this_value = value;
return;
}
Ordering::Greater => {
panic!();
}
}
}
self.0.push((key, value));
}
pub fn extend(&mut self, other: OrderedVec<K, V>) {
if let (Some((last, _)), Some((first, _))) = (self.0.last(), other.0.first()) {
assert!(last < first);
}
self.0.extend(other.0);
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn copy_prefix(&self, key: &K) -> Self
where
K: Clone,
V: Clone,
{
let idx = match self.0.binary_search_by_key(key, extract_key) {
Ok(idx) => idx,
Err(idx) => idx,
};
OrderedVec(Vec::from(&self.0[..idx]))
}
pub fn copy_split(&self, key: &K) -> (Self, Self)
where
K: Clone,
V: Clone,
{
let idx = match self.0.binary_search_by_key(key, extract_key) {
Ok(idx) => idx,
Err(idx) => idx,
};
(
OrderedVec(Vec::from(&self.0[..idx])),
OrderedVec(Vec::from(&self.0[idx..])),
)
}
}
impl<K: Ord, V> From<BTreeMap<K, V>> for OrderedVec<K, V> {
fn from(map: BTreeMap<K, V>) -> Self {
let vec: Vec<(K, V)> = map.into_iter().collect();
// TODO probably change this
for windows in vec.windows(2) {
let (k1, _data1) = &windows[0];
let (k2, _data2) = &windows[1];
debug_assert!(k1 < k2);
}
OrderedVec(vec)
}
}
fn extract_key<K: Copy, V>(pair: &(K, V)) -> K {
pair.0
}
#[cfg(test)]
mod tests {
use std::{
collections::BTreeMap,
ops::{Bound, RangeBounds},
};
use super::OrderedVec;
#[test]
#[should_panic]
fn invalid_range() {
let mut map = BTreeMap::new();
map.insert(0, ());
let vec: OrderedVec<i32, ()> = OrderedVec::from(map);
struct InvalidRange;
impl RangeBounds<i32> for InvalidRange {
fn start_bound(&self) -> Bound<&i32> {
Bound::Excluded(&0)
}
fn end_bound(&self) -> Bound<&i32> {
Bound::Excluded(&0)
}
}
vec.range(InvalidRange);
}
#[test]
fn range_tests() {
let mut map = BTreeMap::new();
map.insert(0, ());
map.insert(2, ());
map.insert(4, ());
let vec = OrderedVec::from(map);
assert_eq!(vec.range(0..0), &[]);
assert_eq!(vec.range(0..1), &[(0, ())]);
assert_eq!(vec.range(0..2), &[(0, ())]);
assert_eq!(vec.range(0..3), &[(0, ()), (2, ())]);
assert_eq!(vec.range(..0), &[]);
assert_eq!(vec.range(..1), &[(0, ())]);
assert_eq!(vec.range(..3), &[(0, ()), (2, ())]);
assert_eq!(vec.range(..3), &[(0, ()), (2, ())]);
assert_eq!(vec.range(0..=0), &[(0, ())]);
assert_eq!(vec.range(0..=1), &[(0, ())]);
assert_eq!(vec.range(0..=2), &[(0, ()), (2, ())]);
assert_eq!(vec.range(0..=3), &[(0, ()), (2, ())]);
assert_eq!(vec.range(..=0), &[(0, ())]);
assert_eq!(vec.range(..=1), &[(0, ())]);
assert_eq!(vec.range(..=2), &[(0, ()), (2, ())]);
assert_eq!(vec.range(..=3), &[(0, ()), (2, ())]);
}
struct BoundIter {
min: i32,
max: i32,
next: Option<Bound<i32>>,
}
impl BoundIter {
fn new(min: i32, max: i32) -> Self {
Self {
min,
max,
next: Some(Bound::Unbounded),
}
}
}
impl Iterator for BoundIter {
type Item = Bound<i32>;
fn next(&mut self) -> Option<Self::Item> {
let cur = self.next?;
self.next = match &cur {
Bound::Unbounded => Some(Bound::Included(self.min)),
Bound::Included(x) => {
if *x >= self.max {
Some(Bound::Excluded(self.min))
} else {
Some(Bound::Included(x + 1))
}
}
Bound::Excluded(x) => {
if *x >= self.max {
None
} else {
Some(Bound::Excluded(x + 1))
}
}
};
Some(cur)
}
}
#[test]
fn range_exhaustive() {
let map: BTreeMap<i32, ()> = (1..=7).step_by(2).map(|x| (x, ())).collect();
let vec = OrderedVec::from(map.clone());
const RANGE_MIN: i32 = 0;
const RANGE_MAX: i32 = 8;
for lower_bound in BoundIter::new(RANGE_MIN, RANGE_MAX) {
let ub_min = match lower_bound {
Bound::Unbounded => RANGE_MIN,
Bound::Included(x) => x,
Bound::Excluded(x) => x + 1,
};
for upper_bound in BoundIter::new(ub_min, RANGE_MAX) {
let map_range: Vec<(i32, ())> = map
.range((lower_bound, upper_bound))
.map(|(&x, _)| (x, ()))
.collect();
let vec_slice = vec.range((lower_bound, upper_bound));
assert_eq!(map_range, vec_slice);
}
}
}
}

View File

@@ -1,293 +0,0 @@
use std::{cmp::Ordering, ops::RangeBounds};
use serde::{Deserialize, Serialize};
/// Ordered map datastructure implemented in a Vec.
/// Append only - can only add keys that are larger than the
/// current max key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VecMap<K, V>(Vec<(K, V)>);
impl<K, V> Default for VecMap<K, V> {
fn default() -> Self {
VecMap(Default::default())
}
}
#[derive(Debug)]
pub struct InvalidKey;
impl<K: Ord, V> VecMap<K, V> {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn as_slice(&self) -> &[(K, V)] {
self.0.as_slice()
}
/// This function may panic if given a range where the lower bound is
/// greater than the upper bound.
pub fn slice_range<R: RangeBounds<K>>(&self, range: R) -> &[(K, V)] {
use std::ops::Bound::*;
let binary_search = |k: &K| self.0.binary_search_by_key(&k, extract_key);
let start_idx = match range.start_bound() {
Unbounded => 0,
Included(k) => binary_search(k).unwrap_or_else(std::convert::identity),
Excluded(k) => match binary_search(k) {
Ok(idx) => idx + 1,
Err(idx) => idx,
},
};
let end_idx = match range.end_bound() {
Unbounded => self.0.len(),
Included(k) => match binary_search(k) {
Ok(idx) => idx + 1,
Err(idx) => idx,
},
Excluded(k) => binary_search(k).unwrap_or_else(std::convert::identity),
};
&self.0[start_idx..end_idx]
}
/// Add a key value pair to the map.
/// If `key` is less than or equal to the current maximum key
/// the pair will not be added and InvalidKey error will be returned.
pub fn append(&mut self, key: K, value: V) -> Result<(), InvalidKey> {
if let Some((last_key, _last_value)) = self.0.last() {
if &key <= last_key {
return Err(InvalidKey);
}
}
self.0.push((key, value));
Ok(())
}
/// Update the maximum key value pair or add a new key value pair to the map.
/// If `key` is less than the current maximum key no updates or additions
/// will occur and InvalidKey error will be returned.
pub fn append_or_update_last(&mut self, key: K, mut value: V) -> Result<Option<V>, InvalidKey> {
if let Some((last_key, last_value)) = self.0.last_mut() {
match key.cmp(last_key) {
Ordering::Less => return Err(InvalidKey),
Ordering::Equal => {
std::mem::swap(last_value, &mut value);
return Ok(Some(value));
}
Ordering::Greater => {}
}
}
self.0.push((key, value));
Ok(None)
}
/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).
/// Right map contains `cutoff` and everything after (inclusive).
pub fn split_at(&self, cutoff: &K) -> (Self, Self)
where
K: Clone,
V: Clone,
{
let split_idx = self
.0
.binary_search_by_key(&cutoff, extract_key)
.unwrap_or_else(std::convert::identity);
(
VecMap(self.0[..split_idx].to_vec()),
VecMap(self.0[split_idx..].to_vec()),
)
}
/// Move items from `other` to the end of `self`, leaving `other` empty.
/// If any keys in `other` is less than or equal to any key in `self`,
/// `InvalidKey` error will be returned and no mutation will occur.
pub fn extend(&mut self, other: &mut Self) -> Result<(), InvalidKey> {
let self_last_opt = self.0.last().map(extract_key);
let other_first_opt = other.0.last().map(extract_key);
if let (Some(self_last), Some(other_first)) = (self_last_opt, other_first_opt) {
if self_last >= other_first {
return Err(InvalidKey);
}
}
self.0.append(&mut other.0);
Ok(())
}
}
fn extract_key<K, V>(entry: &(K, V)) -> &K {
&entry.0
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, ops::Bound};
use super::VecMap;
#[test]
fn unbounded_range() {
let mut vec = VecMap::default();
vec.append(0, ()).unwrap();
assert_eq!(vec.slice_range(0..0), &[]);
}
#[test]
#[should_panic]
fn invalid_ordering_range() {
let mut vec = VecMap::default();
vec.append(0, ()).unwrap();
#[allow(clippy::reversed_empty_ranges)]
vec.slice_range(1..0);
}
#[test]
fn range_tests() {
let mut vec = VecMap::default();
vec.append(0, ()).unwrap();
vec.append(2, ()).unwrap();
vec.append(4, ()).unwrap();
assert_eq!(vec.slice_range(0..0), &[]);
assert_eq!(vec.slice_range(0..1), &[(0, ())]);
assert_eq!(vec.slice_range(0..2), &[(0, ())]);
assert_eq!(vec.slice_range(0..3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..0), &[]);
assert_eq!(vec.slice_range(..1), &[(0, ())]);
assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(0..=0), &[(0, ())]);
assert_eq!(vec.slice_range(0..=1), &[(0, ())]);
assert_eq!(vec.slice_range(0..=2), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(0..=3), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..=0), &[(0, ())]);
assert_eq!(vec.slice_range(..=1), &[(0, ())]);
assert_eq!(vec.slice_range(..=2), &[(0, ()), (2, ())]);
assert_eq!(vec.slice_range(..=3), &[(0, ()), (2, ())]);
}
struct BoundIter {
min: i32,
max: i32,
next: Option<Bound<i32>>,
}
impl BoundIter {
fn new(min: i32, max: i32) -> Self {
Self {
min,
max,
next: Some(Bound::Unbounded),
}
}
}
impl Iterator for BoundIter {
type Item = Bound<i32>;
fn next(&mut self) -> Option<Self::Item> {
let cur = self.next?;
self.next = match &cur {
Bound::Unbounded => Some(Bound::Included(self.min)),
Bound::Included(x) => {
if *x >= self.max {
Some(Bound::Excluded(self.min))
} else {
Some(Bound::Included(x + 1))
}
}
Bound::Excluded(x) => {
if *x >= self.max {
None
} else {
Some(Bound::Excluded(x + 1))
}
}
};
Some(cur)
}
}
#[test]
fn range_exhaustive() {
let map: BTreeMap<i32, ()> = (1..=7).step_by(2).map(|x| (x, ())).collect();
let mut vec = VecMap::default();
for &key in map.keys() {
vec.append(key, ()).unwrap();
}
const RANGE_MIN: i32 = 0;
const RANGE_MAX: i32 = 8;
for lower_bound in BoundIter::new(RANGE_MIN, RANGE_MAX) {
let ub_min = match lower_bound {
Bound::Unbounded => RANGE_MIN,
Bound::Included(x) => x,
Bound::Excluded(x) => x + 1,
};
for upper_bound in BoundIter::new(ub_min, RANGE_MAX) {
let map_range: Vec<(i32, ())> = map
.range((lower_bound, upper_bound))
.map(|(&x, _)| (x, ()))
.collect();
let vec_slice = vec.slice_range((lower_bound, upper_bound));
assert_eq!(map_range, vec_slice);
}
}
}
#[test]
fn extend() {
let mut left = VecMap::default();
left.append(0, ()).unwrap();
assert_eq!(left.as_slice(), &[(0, ())]);
let mut empty = VecMap::default();
left.extend(&mut empty).unwrap();
assert_eq!(left.as_slice(), &[(0, ())]);
assert_eq!(empty.as_slice(), &[]);
let mut right = VecMap::default();
right.append(1, ()).unwrap();
left.extend(&mut right).unwrap();
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
assert_eq!(right.as_slice(), &[]);
let mut zero_map = VecMap::default();
zero_map.append(0, ()).unwrap();
left.extend(&mut zero_map).unwrap_err();
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
assert_eq!(zero_map.as_slice(), &[(0, ())]);
let mut one_map = VecMap::default();
one_map.append(1, ()).unwrap();
left.extend(&mut one_map).unwrap_err();
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
assert_eq!(one_map.as_slice(), &[(1, ())]);
}
}