mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-22 06:40:37 +00:00
Compare commits
1 Commits
split-prox
...
batch-fsyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51b1f2fab7 |
@@ -17,6 +17,7 @@ use bytes::Bytes;
|
|||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use postgres_ffi::pg_constants::BLCKSZ;
|
use postgres_ffi::pg_constants::BLCKSZ;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
|
use zenith_utils::batch_fsync::BatchFsync;
|
||||||
|
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -1145,6 +1146,8 @@ impl LayeredTimeline {
|
|||||||
// a lot of memory and/or aren't receiving much updates anymore.
|
// a lot of memory and/or aren't receiving much updates anymore.
|
||||||
let mut disk_consistent_lsn = last_record_lsn;
|
let mut disk_consistent_lsn = last_record_lsn;
|
||||||
|
|
||||||
|
let mut batch_fsync = BatchFsync::default();
|
||||||
|
|
||||||
let mut layer_uploads = Vec::new();
|
let mut layer_uploads = Vec::new();
|
||||||
while let Some((oldest_layer_id, oldest_layer, oldest_generation)) =
|
while let Some((oldest_layer_id, oldest_layer, oldest_generation)) =
|
||||||
layers.peek_oldest_open()
|
layers.peek_oldest_open()
|
||||||
@@ -1176,7 +1179,7 @@ impl LayeredTimeline {
|
|||||||
drop(layers);
|
drop(layers);
|
||||||
drop(write_guard);
|
drop(write_guard);
|
||||||
|
|
||||||
let mut this_layer_uploads = self.evict_layer(oldest_layer_id)?;
|
let mut this_layer_uploads = self.evict_layer(oldest_layer_id, &mut batch_fsync)?;
|
||||||
layer_uploads.append(&mut this_layer_uploads);
|
layer_uploads.append(&mut this_layer_uploads);
|
||||||
|
|
||||||
write_guard = self.write_lock.lock().unwrap();
|
write_guard = self.write_lock.lock().unwrap();
|
||||||
@@ -1198,9 +1201,11 @@ impl LayeredTimeline {
|
|||||||
// new layer files are durable
|
// new layer files are durable
|
||||||
let timeline_dir =
|
let timeline_dir =
|
||||||
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
|
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
|
||||||
timeline_dir.sync_all()?;
|
batch_fsync.add(timeline_dir)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
batch_fsync.done()?;
|
||||||
|
|
||||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||||
// After crash, we will restart WAL streaming and processing from that point.
|
// After crash, we will restart WAL streaming and processing from that point.
|
||||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||||
@@ -1246,7 +1251,7 @@ impl LayeredTimeline {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn evict_layer(&self, layer_id: LayerId) -> Result<Vec<PathBuf>> {
|
fn evict_layer(&self, layer_id: LayerId, batch_fsync: &mut BatchFsync) -> Result<Vec<PathBuf>> {
|
||||||
// Mark the layer as no longer accepting writes and record the end_lsn.
|
// Mark the layer as no longer accepting writes and record the end_lsn.
|
||||||
// This happens in-place, no new layers are created now.
|
// This happens in-place, no new layers are created now.
|
||||||
// We call `get_last_record_lsn` again, which may be different from the
|
// We call `get_last_record_lsn` again, which may be different from the
|
||||||
@@ -1271,7 +1276,7 @@ impl LayeredTimeline {
|
|||||||
drop(layers);
|
drop(layers);
|
||||||
drop(write_guard);
|
drop(write_guard);
|
||||||
|
|
||||||
let new_historics = oldest_layer.write_to_disk(self)?;
|
let new_historics = oldest_layer.write_to_disk(self, batch_fsync)?;
|
||||||
|
|
||||||
write_guard = self.write_lock.lock().unwrap();
|
write_guard = self.write_lock.lock().unwrap();
|
||||||
layers = self.layers.lock().unwrap();
|
layers = self.layers.lock().unwrap();
|
||||||
@@ -1882,9 +1887,13 @@ pub fn evict_layer_if_needed(conf: &PageServerConf) -> Result<()> {
|
|||||||
|
|
||||||
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
|
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
|
||||||
|
|
||||||
|
let mut batch_fsync = BatchFsync::default();
|
||||||
|
|
||||||
timeline
|
timeline
|
||||||
.upgrade_to_layered_timeline()
|
.upgrade_to_layered_timeline()
|
||||||
.evict_layer(layer_id)?;
|
.evict_layer(layer_id, &mut batch_fsync)?;
|
||||||
|
|
||||||
|
batch_fsync.done()?;
|
||||||
|
|
||||||
global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
|
global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ use crate::{ZTenantId, ZTimelineId};
|
|||||||
use anyhow::{bail, ensure, Result};
|
use anyhow::{bail, ensure, Result};
|
||||||
use log::*;
|
use log::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use zenith_utils::batch_fsync::BatchFsync;
|
||||||
use zenith_utils::vec_map::VecMap;
|
use zenith_utils::vec_map::VecMap;
|
||||||
// avoid binding to Write (conflicts with std::io::Write)
|
// avoid binding to Write (conflicts with std::io::Write)
|
||||||
// while being able to use std::fmt::Write's methods
|
// while being able to use std::fmt::Write's methods
|
||||||
@@ -364,6 +365,7 @@ impl DeltaLayer {
|
|||||||
dropped: bool,
|
dropped: bool,
|
||||||
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
|
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
|
||||||
relsizes: VecMap<Lsn, u32>,
|
relsizes: VecMap<Lsn, u32>,
|
||||||
|
batch_fsync: &mut BatchFsync,
|
||||||
) -> Result<DeltaLayer> {
|
) -> Result<DeltaLayer> {
|
||||||
if seg.rel.is_blocky() {
|
if seg.rel.is_blocky() {
|
||||||
assert!(!relsizes.is_empty());
|
assert!(!relsizes.is_empty());
|
||||||
@@ -436,7 +438,8 @@ impl DeltaLayer {
|
|||||||
|
|
||||||
// This flushes the underlying 'buf_writer'.
|
// This flushes the underlying 'buf_writer'.
|
||||||
let writer = book.close()?;
|
let writer = book.close()?;
|
||||||
writer.get_ref().sync_all()?;
|
let file = writer.into_inner()?;
|
||||||
|
batch_fsync.add(file)?;
|
||||||
|
|
||||||
trace!("saved {}", &path.display());
|
trace!("saved {}", &path.display());
|
||||||
|
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ use std::fs::File;
|
|||||||
use std::io::{BufWriter, Write};
|
use std::io::{BufWriter, Write};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::{Mutex, MutexGuard};
|
use std::sync::{Mutex, MutexGuard};
|
||||||
|
use zenith_utils::batch_fsync::BatchFsync;
|
||||||
|
|
||||||
use bookfile::{Book, BookWriter};
|
use bookfile::{Book, BookWriter};
|
||||||
|
|
||||||
@@ -261,6 +262,7 @@ impl ImageLayer {
|
|||||||
seg: SegmentTag,
|
seg: SegmentTag,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
base_images: Vec<Bytes>,
|
base_images: Vec<Bytes>,
|
||||||
|
batch_fsync: &mut BatchFsync,
|
||||||
) -> Result<ImageLayer> {
|
) -> Result<ImageLayer> {
|
||||||
let image_type = if seg.rel.is_blocky() {
|
let image_type = if seg.rel.is_blocky() {
|
||||||
let num_blocks: u32 = base_images.len().try_into()?;
|
let num_blocks: u32 = base_images.len().try_into()?;
|
||||||
@@ -320,7 +322,8 @@ impl ImageLayer {
|
|||||||
|
|
||||||
// This flushes the underlying 'buf_writer'.
|
// This flushes the underlying 'buf_writer'.
|
||||||
let writer = book.close()?;
|
let writer = book.close()?;
|
||||||
writer.get_ref().sync_all()?;
|
let file = writer.into_inner()?;
|
||||||
|
batch_fsync.add(file)?;
|
||||||
|
|
||||||
trace!("saved {}", path.display());
|
trace!("saved {}", path.display());
|
||||||
|
|
||||||
@@ -336,6 +339,7 @@ impl ImageLayer {
|
|||||||
timeline: &LayeredTimeline,
|
timeline: &LayeredTimeline,
|
||||||
src: &dyn Layer,
|
src: &dyn Layer,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
|
batch_fsync: &mut BatchFsync,
|
||||||
) -> Result<ImageLayer> {
|
) -> Result<ImageLayer> {
|
||||||
let seg = src.get_seg_tag();
|
let seg = src.get_seg_tag();
|
||||||
let timelineid = timeline.timelineid;
|
let timelineid = timeline.timelineid;
|
||||||
@@ -364,7 +368,15 @@ impl ImageLayer {
|
|||||||
base_images.push(img);
|
base_images.push(img);
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::create(conf, timelineid, timeline.tenantid, seg, lsn, base_images)
|
Self::create(
|
||||||
|
conf,
|
||||||
|
timelineid,
|
||||||
|
timeline.tenantid,
|
||||||
|
seg,
|
||||||
|
lsn,
|
||||||
|
base_images,
|
||||||
|
batch_fsync,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ use log::*;
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
use zenith_utils::batch_fsync::BatchFsync;
|
||||||
use zenith_utils::vec_map::VecMap;
|
use zenith_utils::vec_map::VecMap;
|
||||||
|
|
||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
@@ -601,7 +602,11 @@ impl InMemoryLayer {
|
|||||||
/// WAL records between start and end LSN. (The delta layer is not needed
|
/// WAL records between start and end LSN. (The delta layer is not needed
|
||||||
/// when a new relish is created with a single LSN, so that the start and
|
/// when a new relish is created with a single LSN, so that the start and
|
||||||
/// end LSN are the same.)
|
/// end LSN are the same.)
|
||||||
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<LayersOnDisk> {
|
pub fn write_to_disk(
|
||||||
|
&self,
|
||||||
|
timeline: &LayeredTimeline,
|
||||||
|
batch_fsync: &mut BatchFsync,
|
||||||
|
) -> Result<LayersOnDisk> {
|
||||||
trace!(
|
trace!(
|
||||||
"write_to_disk {} get_end_lsn is {}",
|
"write_to_disk {} get_end_lsn is {}",
|
||||||
self.filename().display(),
|
self.filename().display(),
|
||||||
@@ -631,6 +636,7 @@ impl InMemoryLayer {
|
|||||||
true,
|
true,
|
||||||
inner.page_versions.ordered_page_version_iter(None),
|
inner.page_versions.ordered_page_version_iter(None),
|
||||||
inner.segsizes.clone(),
|
inner.segsizes.clone(),
|
||||||
|
batch_fsync,
|
||||||
)?;
|
)?;
|
||||||
trace!(
|
trace!(
|
||||||
"freeze: created delta layer for dropped segment {} {}-{}",
|
"freeze: created delta layer for dropped segment {} {}-{}",
|
||||||
@@ -668,6 +674,7 @@ impl InMemoryLayer {
|
|||||||
false,
|
false,
|
||||||
page_versions,
|
page_versions,
|
||||||
segsizes,
|
segsizes,
|
||||||
|
batch_fsync,
|
||||||
)?;
|
)?;
|
||||||
delta_layers.push(delta_layer);
|
delta_layers.push(delta_layer);
|
||||||
trace!(
|
trace!(
|
||||||
@@ -684,7 +691,7 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
// Write a new base image layer at the cutoff point
|
// Write a new base image layer at the cutoff point
|
||||||
let image_layer =
|
let image_layer =
|
||||||
ImageLayer::create_from_src(self.conf, timeline, self, end_lsn_inclusive)?;
|
ImageLayer::create_from_src(self.conf, timeline, self, end_lsn_inclusive, batch_fsync)?;
|
||||||
trace!(
|
trace!(
|
||||||
"freeze: created image layer {} at {}",
|
"freeze: created image layer {} at {}",
|
||||||
self.seg,
|
self.seg,
|
||||||
|
|||||||
43
zenith_utils/src/batch_fsync.rs
Normal file
43
zenith_utils/src/batch_fsync.rs
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
use std::{fs::File, io};
|
||||||
|
|
||||||
|
const MAX_PENDING_FILES: usize = 100;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct BatchFsync {
|
||||||
|
pending: Vec<File>,
|
||||||
|
done: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BatchFsync {
|
||||||
|
pub fn add(&mut self, file: File) -> io::Result<()> {
|
||||||
|
if self.pending.len() == MAX_PENDING_FILES {
|
||||||
|
self.sync_batch()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.pending.push(file);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Must be called before drop.
|
||||||
|
pub fn done(mut self) -> io::Result<()> {
|
||||||
|
self.done = true;
|
||||||
|
self.sync_batch()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync_batch(&mut self) -> io::Result<()> {
|
||||||
|
// TODO parallelize
|
||||||
|
for pending_file in self.pending.drain(..) {
|
||||||
|
pending_file.sync_all()?;
|
||||||
|
}
|
||||||
|
self.pending.clear();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for BatchFsync {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
assert!(self.done);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -43,3 +43,6 @@ pub mod accum;
|
|||||||
|
|
||||||
// Utility for binding TcpListeners with proper socket options.
|
// Utility for binding TcpListeners with proper socket options.
|
||||||
pub mod tcp_listener;
|
pub mod tcp_listener;
|
||||||
|
|
||||||
|
// Call fsync for many files at once.
|
||||||
|
pub mod batch_fsync;
|
||||||
|
|||||||
Reference in New Issue
Block a user