Compare commits

...

1 Commit

Author: Patrick Insinger
SHA1: 51b1f2fab7
Message: pageserver - call fsync in large batches
    Doesn't parallelize the calls yet, just accumulates a number of files before fsyncing.
Date: 2021-11-02 17:08:04 -07:00
6 changed files with 87 additions and 10 deletions
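
The pattern the commit introduces, sketched below outside the diff: instead of calling sync_all() on each layer file at the point where it is written, callers queue their open File handles on a BatchFsync, which syncs them in groups. Only BatchFsync, add(), and done() come from the changes below; the helper function and the idea of re-opening paths are hypothetical illustration.

use std::fs::File;
use std::io;
use std::path::PathBuf;

use zenith_utils::batch_fsync::BatchFsync;

// Hypothetical caller illustrating the new pattern: files are written first,
// then their handles are handed to a BatchFsync instead of being fsynced
// one at a time as they are produced.
fn sync_written_files(paths: &[PathBuf]) -> io::Result<()> {
    let mut batch_fsync = BatchFsync::default();
    for path in paths {
        // Re-open the already-written file just to obtain a handle for fsync.
        let file = File::open(path)?;
        // Queues the file; a full batch (MAX_PENDING_FILES) is synced eagerly.
        batch_fsync.add(file)?;
    }
    // Syncs whatever is still pending. Must be called before drop,
    // otherwise BatchFsync's Drop impl asserts.
    batch_fsync.done()
}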

View File

@@ -17,6 +17,7 @@ use bytes::Bytes;
use lazy_static::lazy_static;
use postgres_ffi::pg_constants::BLCKSZ;
use tracing::*;
use zenith_utils::batch_fsync::BatchFsync;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
@@ -1145,6 +1146,8 @@ impl LayeredTimeline {
// a lot of memory and/or aren't receiving many updates anymore.
let mut disk_consistent_lsn = last_record_lsn;
let mut batch_fsync = BatchFsync::default();
let mut layer_uploads = Vec::new();
while let Some((oldest_layer_id, oldest_layer, oldest_generation)) =
layers.peek_oldest_open()
@@ -1176,7 +1179,7 @@ impl LayeredTimeline {
drop(layers);
drop(write_guard);
let mut this_layer_uploads = self.evict_layer(oldest_layer_id)?;
let mut this_layer_uploads = self.evict_layer(oldest_layer_id, &mut batch_fsync)?;
layer_uploads.append(&mut this_layer_uploads);
write_guard = self.write_lock.lock().unwrap();
@@ -1198,9 +1201,11 @@ impl LayeredTimeline {
// new layer files are durable
let timeline_dir =
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
timeline_dir.sync_all()?;
batch_fsync.add(timeline_dir)?;
}
batch_fsync.done()?;
// If we were able to advance 'disk_consistent_lsn', save it in the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
@@ -1246,7 +1251,7 @@ impl LayeredTimeline {
Ok(())
}
fn evict_layer(&self, layer_id: LayerId) -> Result<Vec<PathBuf>> {
fn evict_layer(&self, layer_id: LayerId, batch_fsync: &mut BatchFsync) -> Result<Vec<PathBuf>> {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
@@ -1271,7 +1276,7 @@ impl LayeredTimeline {
drop(layers);
drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self)?;
let new_historics = oldest_layer.write_to_disk(self, batch_fsync)?;
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
@@ -1882,9 +1887,13 @@ pub fn evict_layer_if_needed(conf: &PageServerConf) -> Result<()> {
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
let mut batch_fsync = BatchFsync::default();
timeline
.upgrade_to_layered_timeline()
.evict_layer(layer_id)?;
.evict_layer(layer_id, &mut batch_fsync)?;
batch_fsync.done()?;
global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
}

View File

@@ -48,6 +48,7 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use log::*;
use serde::{Deserialize, Serialize};
use zenith_utils::batch_fsync::BatchFsync;
use zenith_utils::vec_map::VecMap;
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
@@ -364,6 +365,7 @@ impl DeltaLayer {
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
relsizes: VecMap<Lsn, u32>,
batch_fsync: &mut BatchFsync,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
@@ -436,7 +438,8 @@ impl DeltaLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
let file = writer.into_inner()?;
batch_fsync.add(file)?;
trace!("saved {}", &path.display());

View File

@@ -39,6 +39,7 @@ use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard};
use zenith_utils::batch_fsync::BatchFsync;
use bookfile::{Book, BookWriter};
@@ -261,6 +262,7 @@ impl ImageLayer {
seg: SegmentTag,
lsn: Lsn,
base_images: Vec<Bytes>,
batch_fsync: &mut BatchFsync,
) -> Result<ImageLayer> {
let image_type = if seg.rel.is_blocky() {
let num_blocks: u32 = base_images.len().try_into()?;
@@ -320,7 +322,8 @@ impl ImageLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
let file = writer.into_inner()?;
batch_fsync.add(file)?;
trace!("saved {}", path.display());
@@ -336,6 +339,7 @@ impl ImageLayer {
timeline: &LayeredTimeline,
src: &dyn Layer,
lsn: Lsn,
batch_fsync: &mut BatchFsync,
) -> Result<ImageLayer> {
let seg = src.get_seg_tag();
let timelineid = timeline.timelineid;
@@ -364,7 +368,15 @@ impl ImageLayer {
base_images.push(img);
}
Self::create(conf, timelineid, timeline.tenantid, seg, lsn, base_images)
Self::create(
conf,
timelineid,
timeline.tenantid,
seg,
lsn,
base_images,
batch_fsync,
)
}
///

View File

@@ -19,6 +19,7 @@ use log::*;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use zenith_utils::batch_fsync::BatchFsync;
use zenith_utils::vec_map::VecMap;
use zenith_utils::lsn::Lsn;
@@ -601,7 +602,11 @@ impl InMemoryLayer {
/// WAL records between start and end LSN. (The delta layer is not needed
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<LayersOnDisk> {
pub fn write_to_disk(
&self,
timeline: &LayeredTimeline,
batch_fsync: &mut BatchFsync,
) -> Result<LayersOnDisk> {
trace!(
"write_to_disk {} get_end_lsn is {}",
self.filename().display(),
@@ -631,6 +636,7 @@ impl InMemoryLayer {
true,
inner.page_versions.ordered_page_version_iter(None),
inner.segsizes.clone(),
batch_fsync,
)?;
trace!(
"freeze: created delta layer for dropped segment {} {}-{}",
@@ -668,6 +674,7 @@ impl InMemoryLayer {
false,
page_versions,
segsizes,
batch_fsync,
)?;
delta_layers.push(delta_layer);
trace!(
@@ -684,7 +691,7 @@ impl InMemoryLayer {
// Write a new base image layer at the cutoff point
let image_layer =
ImageLayer::create_from_src(self.conf, timeline, self, end_lsn_inclusive)?;
ImageLayer::create_from_src(self.conf, timeline, self, end_lsn_inclusive, batch_fsync)?;
trace!(
"freeze: created image layer {} at {}",
self.seg,

View File

@@ -0,0 +1,43 @@
use std::{fs::File, io};
const MAX_PENDING_FILES: usize = 100;
#[derive(Default)]
pub struct BatchFsync {
pending: Vec<File>,
done: bool,
}
impl BatchFsync {
pub fn add(&mut self, file: File) -> io::Result<()> {
if self.pending.len() == MAX_PENDING_FILES {
self.sync_batch()?;
}
self.pending.push(file);
Ok(())
}
/// Must be called before drop.
pub fn done(mut self) -> io::Result<()> {
self.done = true;
self.sync_batch()
}
fn sync_batch(&mut self) -> io::Result<()> {
// TODO parallelize
for pending_file in self.pending.drain(..) {
pending_file.sync_all()?;
}
self.pending.clear();
Ok(())
}
}
impl Drop for BatchFsync {
fn drop(&mut self) {
assert!(self.done);
}
}
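
The TODO in sync_batch leaves the actual parallelism for a later change. One plausible shape for that follow-up, shown purely as a sketch and not as what was eventually committed: fsync each pending file on its own thread and propagate the first error.

use std::fs::File;
use std::io;
use std::thread;

// Hypothetical parallel replacement for the sequential loop in sync_batch:
// each pending file is synced on its own thread, and the first I/O error,
// if any, is returned to the caller.
fn sync_batch_parallel(pending: &mut Vec<File>) -> io::Result<()> {
    let handles: Vec<_> = pending
        .drain(..)
        .map(|file| thread::spawn(move || file.sync_all()))
        .collect();
    for handle in handles {
        // join() only fails if the fsync thread panicked; treat that as fatal.
        handle.join().expect("fsync thread panicked")?;
    }
    Ok(())
}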

View File

@@ -43,3 +43,6 @@ pub mod accum;
// Utility for binding TcpListeners with proper socket options.
pub mod tcp_listener;
// Call fsync for many files at once.
pub mod batch_fsync;