Compare commits

...

4 Commits

Author SHA1 Message Date
Patrick Insinger
c491256f12 pageserver - use batch_fsync for historics 2021-10-09 11:25:42 -07:00
Kirill Bulatov
bf58f7f649 Expose certain layered repository structs to reuse in relish storage (#688) 2021-10-09 19:23:57 +03:00
Patrick Insinger
3f0ebc6a40 pageserver - move early File::open call 2021-10-09 08:45:52 -07:00
Patrick Insinger
0baf4bc796 fix cargo doc complaints 2021-10-09 08:45:46 -07:00
15 changed files with 202 additions and 140 deletions

Cargo.lock (generated)
View File

@@ -2592,6 +2592,7 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"crossbeam-utils",
"hex",
"hex-literal",
"hyper",

View File

@@ -18,6 +18,7 @@ use lazy_static::lazy_static;
use log::*;
use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize};
use zenith_utils::batch_fsync::batch_fsync;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
@@ -111,6 +112,9 @@ lazy_static! {
.expect("failed to define a metric");
}
/// The name of the metadata file pageserver creates per timeline.
pub const METADATA_FILE_NAME: &str = "metadata";
///
/// Repository consists of multiple timelines. Keep them in a hash table.
///
@@ -252,7 +256,16 @@ impl LayeredRepository {
)?;
// List the layers on disk, and load them into the layer map
timeline.load_layer_map(disk_consistent_lsn)?;
let _loaded_layers = timeline.load_layer_map(disk_consistent_lsn)?;
if self.upload_relishes {
schedule_timeline_upload(());
// schedule_timeline_upload(
// self.tenantid,
// timelineid,
// loaded_layers,
// disk_consistent_lsn,
// );
}
// needs to be after load_layer_map
timeline.init_current_logical_size()?;
@@ -351,9 +364,8 @@ impl LayeredRepository {
tenantid: ZTenantId,
data: &TimelineMetadata,
first_save: bool,
) -> Result<PathBuf> {
let timeline_path = conf.timeline_path(&timelineid, &tenantid);
let path = timeline_path.join("metadata");
) -> Result<()> {
let path = metadata_path(conf, timelineid, tenantid);
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = OpenOptions::new()
.write(true)
@@ -377,11 +389,15 @@ impl LayeredRepository {
// fsync the parent directory to ensure the directory entry is durable
if first_save {
let timeline_dir = File::open(&timeline_path)?;
let timeline_dir = File::open(
&path
.parent()
.expect("Metadata should always have a parent dir"),
)?;
timeline_dir.sync_all()?;
}
Ok(path)
Ok(())
}
fn load_metadata(
@@ -389,7 +405,7 @@ impl LayeredRepository {
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> Result<TimelineMetadata> {
let path = conf.timeline_path(&timelineid, &tenantid).join("metadata");
let path = metadata_path(conf, timelineid, tenantid);
let metadata_bytes = std::fs::read(&path)?;
ensure!(metadata_bytes.len() == METADATA_MAX_SAFE_SIZE);
@@ -469,7 +485,7 @@ impl LayeredRepository {
let timeline = self.get_timeline_locked(*timelineid, &mut *timelines)?;
if let Some(ancestor_timeline) = &timeline.ancestor_timeline {
// If target_timeline is specified, we only need to know branchpoints of its childs
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timelineid) = target_timelineid {
if ancestor_timeline.timelineid == timelineid {
all_branchpoints
@@ -1023,9 +1039,10 @@ impl LayeredTimeline {
}
///
/// Scan the timeline directory to populate the layer map
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<Vec<PathBuf>> {
info!(
"loading layer map for timeline {} into memory",
self.timelineid
@@ -1035,9 +1052,9 @@ impl LayeredTimeline {
filename::list_files(self.conf, self.timelineid, self.tenantid)?;
let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid);
let mut local_layers = Vec::with_capacity(imgfilenames.len() + deltafilenames.len());
// First create ImageLayer structs for each image file.
for filename in imgfilenames.iter() {
for filename in &imgfilenames {
if filename.lsn > disk_consistent_lsn {
warn!(
"found future image layer {} on timeline {}",
@@ -1056,11 +1073,11 @@ impl LayeredTimeline {
layer.get_start_lsn(),
self.timelineid
);
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
}
// Then for the Delta files.
for filename in deltafilenames.iter() {
for filename in &deltafilenames {
ensure!(filename.start_lsn < filename.end_lsn);
if filename.end_lsn > disk_consistent_lsn {
warn!(
@@ -1079,10 +1096,11 @@ impl LayeredTimeline {
layer.filename().display(),
self.timelineid,
);
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
}
Ok(())
Ok(local_layers)
}
///
@@ -1329,8 +1347,6 @@ impl LayeredTimeline {
last_record_lsn
);
let timeline_dir = File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
// Take the in-memory layer with the oldest WAL record. If it's older
// than the threshold, write it out to disk as a new image and delta file.
// Repeat until all remaining in-memory layers are within the threshold.
@@ -1343,7 +1359,7 @@ impl LayeredTimeline {
let mut disk_consistent_lsn = last_record_lsn;
let mut created_historics = false;
let mut layer_uploads = Vec::new();
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
@@ -1405,8 +1421,13 @@ impl LayeredTimeline {
layers.remove_historic(frozen.clone());
// Add the historics to the LayerMap
for n in new_historics {
layers.insert_historic(n);
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
}
@@ -1422,7 +1443,9 @@ impl LayeredTimeline {
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
timeline_dir.sync_all()?;
layer_uploads.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
batch_fsync(&layer_uploads)?;
layer_uploads.pop().unwrap();
}
// Save the metadata, with updated 'disk_consistent_lsn', to a
@@ -1449,7 +1472,7 @@ impl LayeredTimeline {
ancestor_timeline: ancestor_timelineid,
ancestor_lsn: self.ancestor_lsn,
};
let _metadata_path = LayeredRepository::save_metadata(
LayeredRepository::save_metadata(
self.conf,
self.timelineid,
self.tenantid,
@@ -1458,12 +1481,10 @@ impl LayeredTimeline {
)?;
if self.upload_relishes {
schedule_timeline_upload(())
// schedule_timeline_upload(LocalTimeline {
// tenant_id: self.tenantid,
// timeline_id: self.timelineid,
// metadata_path,
// image_layers: image_layer_uploads,
// delta_layers: delta_layer_uploads,
// schedule_timeline_upload(
// self.tenantid,
// self.timelineid,
// layer_uploads,
// disk_consistent_lsn,
// });
}
@@ -1896,6 +1917,15 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
Ok(())
}
fn metadata_path(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> PathBuf {
conf.timeline_path(&timelineid, &tenantid)
.join(METADATA_FILE_NAME)
}
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
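The save_metadata hunk above follows the standard crash-consistency recipe: write and fsync the file itself, and on the first save also fsync the parent directory so the new directory entry survives a crash. A minimal standalone sketch of that pattern (the helper name and error handling are illustrative, not from this diff):

use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::Path;

// Hypothetical helper mirroring the shape of save_metadata's durability logic.
fn durable_save(path: &Path, bytes: &[u8], first_save: bool) -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .write(true)
        // create_new(true) fails if the file already exists, so file presence
        // stays consistent with whether this is the first save.
        .create_new(first_save)
        .open(path)?;
    file.write_all(bytes)?;
    // Step 1: flush the file contents to disk.
    file.sync_all()?;
    if first_save {
        // Step 2: fsync the parent directory so the new directory entry
        // itself is durable.
        let dir = File::open(path.parent().expect("metadata file has a parent dir"))?;
        dir.sync_all()?;
    }
    Ok(())
}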

View File

@@ -169,29 +169,7 @@ impl Layer for DeltaLayer {
}
fn filename(&self) -> PathBuf {
PathBuf::from(
DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
}
.to_string(),
)
}
fn path(&self) -> Option<PathBuf> {
Some(Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
))
PathBuf::from(self.layer_name().to_string())
}
/// Look up given page in the cache.
@@ -300,9 +278,7 @@ impl Layer for DeltaLayer {
fn delete(&self) -> Result<()> {
// delete underlying file
if let Some(path) = self.path() {
fs::remove_file(path)?;
}
fs::remove_file(self.path())?;
Ok(())
}
@@ -406,9 +382,7 @@ impl DeltaLayer {
let mut inner = delta_layer.inner.lock().unwrap();
// Write the in-memory btreemaps into a file
let path = delta_layer
.path()
.expect("DeltaLayer is supposed to have a layer path on disk");
let path = delta_layer.path();
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
@@ -457,8 +431,7 @@ impl DeltaLayer {
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
book.close()?;
trace!("saved {}", &path.display());
@@ -472,12 +445,7 @@ impl DeltaLayer {
&self.path_or_conf,
self.timelineid,
self.tenantid,
&DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
&self.layer_name(),
);
let file = File::open(&path)?;
@@ -586,4 +554,23 @@ impl DeltaLayer {
}),
})
}
fn layer_name(&self) -> DeltaFileName {
DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
}
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
)
}
}

View File

@@ -13,6 +13,8 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;
use super::METADATA_FILE_NAME;
// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {
@@ -35,7 +37,7 @@ impl DeltaFileName {
/// Parse a string as a delta file name. Returns None if the filename does not
/// match the expected pattern.
///
pub fn from_str(fname: &str) -> Option<Self> {
pub fn parse_str(fname: &str) -> Option<Self> {
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {
@@ -168,7 +170,7 @@ impl ImageFileName {
/// Parse a string as an image file name. Returns None if the filename does not
/// match the expected pattern.
///
pub fn from_str(fname: &str) -> Option<Self> {
pub fn parse_str(fname: &str) -> Option<Self> {
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {
@@ -286,11 +288,11 @@ pub fn list_files(
let fname = direntry?.file_name();
let fname = fname.to_str().unwrap();
if let Some(deltafilename) = DeltaFileName::from_str(fname) {
if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
deltafiles.push(deltafilename);
} else if let Some(imgfilename) = ImageFileName::from_str(fname) {
} else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
imgfiles.push(imgfilename);
} else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
} else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
// ignore these
} else {
warn!("unrecognized filename in timeline dir: {}", fname);
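Renaming the inherent from_str constructors to parse_str sidesteps a clash with the standard FromStr trait; clippy flags inherent from_str methods via its should_implement_trait lint, which is a plausible motivation here, though the diff itself does not say. The alternative would be to implement the trait, sketched below on a simplified stand-in type:

use std::str::FromStr;

// Simplified stand-in; the real DeltaFileName/ImageFileName carry segment
// and LSN fields parsed out of the name.
#[derive(Debug)]
struct LayerFileName(String);

impl FromStr for LayerFileName {
    type Err = &'static str;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.is_empty() {
            Err("empty layer file name")
        } else {
            Ok(LayerFileName(s.to_owned()))
        }
    }
}

// With the trait implemented, parsing goes through the standard machinery:
// let name: LayerFileName = "rel_1663_0_1259_0".parse().unwrap();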

View File

@@ -114,25 +114,7 @@ pub struct ImageLayerInner {
impl Layer for ImageLayer {
fn filename(&self) -> PathBuf {
PathBuf::from(
ImageFileName {
seg: self.seg,
lsn: self.lsn,
}
.to_string(),
)
}
fn path(&self) -> Option<PathBuf> {
Some(Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
))
PathBuf::from(self.layer_name().to_string())
}
fn get_timeline_id(&self) -> ZTimelineId {
@@ -222,9 +204,7 @@ impl Layer for ImageLayer {
fn delete(&self) -> Result<()> {
// delete underlying file
if let Some(path) = self.path() {
fs::remove_file(path)?;
}
fs::remove_file(self.path())?;
Ok(())
}
@@ -300,9 +280,7 @@ impl ImageLayer {
let inner = layer.inner.lock().unwrap();
// Write the images into a file
let path = layer
.path()
.expect("ImageLayer is supposed to have a layer path on disk");
let path = layer.path();
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;
@@ -337,10 +315,9 @@ impl ImageLayer {
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
book.close()?;
trace!("saved {}", &path.display());
trace!("saved {}", path.display());
drop(inner);
@@ -445,15 +422,7 @@ impl ImageLayer {
}
fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
let path = Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
);
let path = self.path();
let file = File::open(&path)?;
let book = Book::new(file)?;
@@ -500,4 +469,21 @@ impl ImageLayer {
}),
})
}
fn layer_name(&self) -> ImageFileName {
ImageFileName {
seg: self.seg,
lsn: self.lsn,
}
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
)
}
}

View File

@@ -124,10 +124,6 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}
fn path(&self) -> Option<PathBuf> {
None
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -309,6 +305,18 @@ pub struct FreezeLayers {
pub open: Option<Arc<InMemoryLayer>>,
}
/// A result of an inmemory layer data being written to disk.
pub struct LayersOnDisk {
pub delta_layers: Vec<DeltaLayer>,
pub image_layers: Vec<ImageLayer>,
}
impl LayersOnDisk {
pub fn is_empty(&self) -> bool {
self.delta_layers.is_empty() && self.image_layers.is_empty()
}
}
impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());
@@ -669,7 +677,7 @@ impl InMemoryLayer {
/// WAL records between start and end LSN. (The delta layer is not needed
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<LayersOnDisk> {
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
self.filename().display(),
@@ -678,7 +686,7 @@ impl InMemoryLayer {
);
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to aquire the
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called
@@ -707,15 +715,17 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn
);
return Ok(vec![Arc::new(delta_layer)]);
return Ok(LayersOnDisk {
delta_layers: vec![delta_layer],
image_layers: Vec::new(),
});
}
let end_lsn = self.end_lsn.unwrap();
let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
let mut delta_layers = Vec::new();
if self.start_lsn != end_lsn {
let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1));
@@ -731,7 +741,7 @@ impl InMemoryLayer {
before_page_versions,
before_segsizes,
)?;
frozen_layers.push(Arc::new(delta_layer));
delta_layers.push(delta_layer);
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,
@@ -746,9 +756,11 @@ impl InMemoryLayer {
// Write a new base image layer at the cutoff point
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
frozen_layers.push(Arc::new(image_layer));
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
Ok(frozen_layers)
Ok(LayersOnDisk {
delta_layers,
image_layers: vec![image_layer],
})
}
}
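Returning the concrete LayersOnDisk struct instead of Vec<Arc<dyn Layer>> lets the checkpointer record each layer's path while the type is still concrete, erasing it only at insertion into the layer map, as the repository hunk above does. A hedged sketch of that consuming pattern, with simplified types:

use std::path::PathBuf;
use std::sync::Arc;

trait Layer {}

struct DeltaLayer { path: PathBuf }
struct ImageLayer { path: PathBuf }
impl Layer for DeltaLayer {}
impl Layer for ImageLayer {}

struct LayersOnDisk {
    delta_layers: Vec<DeltaLayer>,
    image_layers: Vec<ImageLayer>,
}

fn register(
    new_historics: LayersOnDisk,
    layer_uploads: &mut Vec<PathBuf>,
    layer_map: &mut Vec<Arc<dyn Layer>>,
) {
    // Record paths first, then type-erase at insertion.
    for delta in new_historics.delta_layers {
        layer_uploads.push(delta.path.clone());
        layer_map.push(Arc::new(delta));
    }
    for image in new_historics.image_layers {
        layer_uploads.push(image.path.clone());
        layer_map.push(Arc::new(image));
    }
}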

View File

@@ -123,10 +123,6 @@ pub trait Layer: Send + Sync {
/// Is the segment represented by this layer dropped by PostgreSQL?
fn is_dropped(&self) -> bool;
/// Gets the physical location of the layer on disk.
/// Some layers, such as in-memory, might not have the location.
fn path(&self) -> Option<PathBuf>;
/// Filename used to store this layer on disk. (Even in-memory layers
/// implement this, to print a handy unique identifier for the layer for
/// log messages, even though they're never stored on disk.)
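With path() removed from the trait, "where does this live on disk?" is no longer a question every layer has to answer with an Option; only the concrete on-disk layer types expose an infallible path(), as the DeltaLayer and ImageLayer hunks above show. A rough sketch of the resulting shape, with names simplified:

use std::path::PathBuf;

// The trait keeps only what every layer, in-memory or on-disk, can answer.
trait Layer {
    fn filename(&self) -> PathBuf;
}

struct OnDiskLayer {
    dir: PathBuf,
    name: String,
}

impl Layer for OnDiskLayer {
    fn filename(&self) -> PathBuf {
        PathBuf::from(&self.name)
    }
}

impl OnDiskLayer {
    // Inherent method, no Option: an on-disk layer always has a path.
    pub fn path(&self) -> PathBuf {
        self.dir.join(&self.name)
    }
}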

View File

@@ -12,14 +12,12 @@ mod rust_s3;
/// local page server layer files with external storage.
mod synced_storage;
use std::path::Path;
use std::thread;
use std::{path::Path, thread};
use anyhow::Context;
use self::local_fs::LocalFs;
pub use self::synced_storage::schedule_timeline_upload;
use crate::relish_storage::rust_s3::RustS3;
use self::{local_fs::LocalFs, rust_s3::RustS3};
use crate::{PageServerConf, RelishStorageKind};
pub fn run_storage_sync_thread(

View File

@@ -5,9 +5,10 @@ use std::path::Path;
use anyhow::Context;
use s3::{bucket::Bucket, creds::Credentials, region::Region};
use crate::{relish_storage::strip_workspace_prefix, S3Config};
use super::RelishStorage;
use crate::{
relish_storage::{strip_workspace_prefix, RelishStorage},
S3Config,
};
const S3_FILE_SEPARATOR: char = '/';

View File

@@ -209,7 +209,7 @@ impl WALRecord {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::LayeredRepository;
use crate::layered_repository::{LayeredRepository, METADATA_FILE_NAME};
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use hex_literal::hex;
@@ -728,7 +728,7 @@ mod tests {
repo.create_empty_timeline(TIMELINE_ID)?;
drop(repo);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join("metadata");
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
assert!(metadata_path.is_file());

View File

@@ -46,7 +46,7 @@ pub fn set_common_metrics_prefix(prefix: &'static str) {
}
/// Prepends a prefix to a common metric name so they are distinguished between
/// different services, see https://github.com/zenithdb/zenith/pull/681
/// different services, see <https://github.com/zenithdb/zenith/pull/681>
/// A call to set_common_metrics_prefix() is necessary prior to calling this.
pub fn new_common_metric_name(unprefixed_metric_name: &str) -> String {
// Not unwrap() because metrics may be initialized after multiple threads have been started.
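The angle brackets turn the bare URL into a rustdoc autolink and silence the bare-URL warning that cargo doc would otherwise emit, e.g.:

/// Bare URL: rustdoc warns and renders it as plain text.
/// See https://github.com/zenithdb/zenith/pull/681
///
/// Angle-bracketed: rendered as a clickable link, no warning.
/// See <https://github.com/zenithdb/zenith/pull/681>
pub fn example() {}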

View File

@@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
thiserror = "1.0"
tokio = "1.11"
crossbeam-utils = "0.8.5"
slog-async = "2.6.0"
slog-stdlog = "4.1.0"

View File

@@ -0,0 +1,45 @@
use crossbeam_utils::thread;
use std::{
fs::File,
io,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};
pub fn batch_fsync(paths: &[PathBuf]) -> std::io::Result<()> {
let next = AtomicUsize::new(0);
let num_threads = std::cmp::min(paths.len() / 2, 256);
if num_threads <= 1 {
for path in paths {
let file = File::open(&path)?;
file.sync_all()?;
}
return Ok(());
}
thread::scope(|s| -> io::Result<()> {
let mut handles = Vec::new();
for _ in 0..num_threads {
handles.push(s.spawn(|_| loop {
let idx = next.fetch_add(1, Ordering::Relaxed);
if idx >= paths.len() {
return io::Result::Ok(());
}
let file = File::open(&paths[idx])?;
file.sync_all()?;
}));
}
for handle in handles {
handle.join().unwrap()?;
}
Ok(())
})
.unwrap()
}
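batch_fsync hands out indices through a shared AtomicUsize, so up to 256 scoped worker threads pull paths until the list is exhausted; with three or fewer paths it falls back to the sequential loop. A hedged usage sketch matching the checkpointer's pattern of syncing new layer files together with their directory (paths are illustrative):

use std::path::PathBuf;
use zenith_utils::batch_fsync::batch_fsync;

fn main() -> std::io::Result<()> {
    // Hypothetical layer files produced by a checkpoint.
    let mut to_sync = vec![
        PathBuf::from("timelines/1234/layer_a"),
        PathBuf::from("timelines/1234/layer_b"),
    ];
    // Include the containing directory so the new directory entries
    // become durable in the same batch.
    to_sync.push(PathBuf::from("timelines/1234"));
    batch_fsync(&to_sync)?;
    Ok(())
}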

View File

@@ -40,3 +40,6 @@ pub mod logging;
// Misc
pub mod accum;
/// Utility for quickly fsyncing many files at once
pub mod batch_fsync;

View File

@@ -55,7 +55,7 @@ impl<K: Ord, V> VecMap<K, V> {
}
/// Add a key value pair to the map.
/// If [`key`] is less than or equal to the current maximum key
/// If `key` is less than or equal to the current maximum key
/// the pair will not be added and InvalidKey error will be returned.
pub fn append(&mut self, key: K, value: V) -> Result<(), InvalidKey> {
if let Some((last_key, _last_value)) = self.0.last() {
@@ -69,7 +69,7 @@ impl<K: Ord, V> VecMap<K, V> {
}
/// Update the maximum key value pair or add a new key value pair to the map.
/// If [`key`] is less than the current maximum key no updates or additions
/// If `key` is less than the current maximum key no updates or additions
/// will occur and InvalidKey error will be returned.
pub fn append_or_update_last(&mut self, key: K, mut value: V) -> Result<Option<V>, InvalidKey> {
if let Some((last_key, last_value)) = self.0.last_mut() {
@@ -89,8 +89,8 @@ impl<K: Ord, V> VecMap<K, V> {
/// Split the map into two.
///
/// The left map contains everything before [`cutoff`] (exclusive).
/// Right map contains [`cutoff`] and everything after (inclusive).
/// The left map contains everything before `cutoff` (exclusive).
/// Right map contains `cutoff` and everything after (inclusive).
pub fn split_at(&self, cutoff: &K) -> (Self, Self)
where
K: Clone,
@@ -107,9 +107,9 @@ impl<K: Ord, V> VecMap<K, V> {
)
}
/// Move items from [`other`] to the end of [`self`], leaving [`other`] empty.
/// If any keys in [`other`] is less than or equal to any key in [`self`],
/// [`InvalidKey`] error will be returned and no mutation will occur.
/// Move items from `other` to the end of `self`, leaving `other` empty.
/// If any keys in `other` is less than or equal to any key in `self`,
/// `InvalidKey` error will be returned and no mutation will occur.
pub fn extend(&mut self, other: &mut Self) -> Result<(), InvalidKey> {
let self_last_opt = self.0.last().map(extract_key);
let other_first_opt = other.0.last().map(extract_key);
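For reference, the ordering contract those doc comments describe, as a hedged usage sketch (the module path and Default construction are assumptions, not shown in this diff):

use zenith_utils::vec_map::VecMap; // assumed module path

fn main() {
    let mut map: VecMap<u32, &str> = VecMap::default(); // Default assumed

    map.append(1, "one").unwrap();
    map.append(3, "three").unwrap();

    // append rejects any key <= the current maximum key.
    assert!(map.append(2, "two").is_err());

    // append_or_update_last may overwrite the value at the current
    // maximum key, but still rejects keys strictly below it.
    map.append_or_update_last(3, "THREE").unwrap();
    assert!(map.append_or_update_last(2, "two").is_err());
}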