Mirror of https://github.com/neondatabase/neon.git
Synced 2026-01-30 08:40:37 +00:00

Compare commits: batch-fsyn ... ordered-bl (1 commit, b3de51296a)

Cargo.lock (generated), 1 change:
@@ -2592,7 +2592,6 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"crossbeam-utils",
"hex",
"hex-literal",
"hyper",
@@ -18,7 +18,6 @@ use lazy_static::lazy_static;
use log::*;
use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize};
use zenith_utils::batch_fsync::batch_fsync;

use std::collections::hash_map::Entry;
use std::collections::HashMap;

@@ -112,9 +111,6 @@ lazy_static! {
.expect("failed to define a metric");
}

/// The name of the metadata file pageserver creates per timeline.
pub const METADATA_FILE_NAME: &str = "metadata";

///
/// Repository consists of multiple timelines. Keep them in a hash table.
///

@@ -256,16 +252,7 @@ impl LayeredRepository {
)?;

// List the layers on disk, and load them into the layer map
let _loaded_layers = timeline.load_layer_map(disk_consistent_lsn)?;
if self.upload_relishes {
schedule_timeline_upload(());
// schedule_timeline_upload(
// self.tenantid,
// timelineid,
// loaded_layers,
// disk_consistent_lsn,
// );
}
timeline.load_layer_map(disk_consistent_lsn)?;

// needs to be after load_layer_map
timeline.init_current_logical_size()?;
@@ -364,8 +351,9 @@ impl LayeredRepository {
tenantid: ZTenantId,
data: &TimelineMetadata,
first_save: bool,
) -> Result<()> {
let path = metadata_path(conf, timelineid, tenantid);
) -> Result<PathBuf> {
let timeline_path = conf.timeline_path(&timelineid, &tenantid);
let path = timeline_path.join("metadata");
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = OpenOptions::new()
.write(true)

@@ -389,15 +377,11 @@ impl LayeredRepository {

// fsync the parent directory to ensure the directory entry is durable
if first_save {
let timeline_dir = File::open(
&path
.parent()
.expect("Metadata should always have a parent dir"),
)?;
let timeline_dir = File::open(&timeline_path)?;
timeline_dir.sync_all()?;
}

Ok(())
Ok(path)
}

fn load_metadata(

@@ -405,7 +389,7 @@ impl LayeredRepository {
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> Result<TimelineMetadata> {
let path = metadata_path(conf, timelineid, tenantid);
let path = conf.timeline_path(&timelineid, &tenantid).join("metadata");
let metadata_bytes = std::fs::read(&path)?;
ensure!(metadata_bytes.len() == METADATA_MAX_SAFE_SIZE);
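The save_metadata hunks above lean on two durability details: the file is opened with OpenOptions so that its presence matches first_save, and on the first save the containing directory is fsynced so the new directory entry itself survives a crash. A minimal, self-contained sketch of that pattern (not the pageserver API; create_new(first_save) and the fixed file name here are assumptions for illustration):

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

// Hedged sketch: write a small metadata file and make both the data and the
// directory entry durable. `first_save` mirrors the flag in the diff above;
// create_new(first_save) is one way "presence is consistent with first_save"
// could be enforced, assumed here for illustration.
fn save_metadata_sketch(timeline_dir: &Path, bytes: &[u8], first_save: bool) -> io::Result<PathBuf> {
    let path = timeline_dir.join("metadata");
    let mut file = OpenOptions::new()
        .write(true)
        .create_new(first_save) // first save: fail if the file already exists
        .open(&path)?;          // later saves: fail if the file is missing
    file.write_all(bytes)?;
    file.sync_all()?; // flush file contents and metadata
    if first_save {
        // fsync the directory so the new entry is durable too
        File::open(timeline_dir)?.sync_all()?;
    }
    Ok(path)
}
```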
@@ -485,7 +469,7 @@ impl LayeredRepository {
let timeline = self.get_timeline_locked(*timelineid, &mut *timelines)?;

if let Some(ancestor_timeline) = &timeline.ancestor_timeline {
// If target_timeline is specified, we only need to know branchpoints of its children
// If target_timeline is specified, we only need to know branchpoints of its childs
if let Some(timelineid) = target_timelineid {
if ancestor_timeline.timelineid == timelineid {
all_branchpoints

@@ -1039,10 +1023,9 @@ impl LayeredTimeline {
}

///
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
/// Scan the timeline directory to populate the layer map
///
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<Vec<PathBuf>> {
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
info!(
"loading layer map for timeline {} into memory",
self.timelineid

@@ -1052,9 +1035,9 @@ impl LayeredTimeline {
filename::list_files(self.conf, self.timelineid, self.tenantid)?;

let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid);
let mut local_layers = Vec::with_capacity(imgfilenames.len() + deltafilenames.len());

// First create ImageLayer structs for each image file.
for filename in &imgfilenames {
for filename in imgfilenames.iter() {
if filename.lsn > disk_consistent_lsn {
warn!(
"found future image layer {} on timeline {}",

@@ -1073,11 +1056,11 @@ impl LayeredTimeline {
layer.get_start_lsn(),
self.timelineid
);
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
}

for filename in &deltafilenames {
// Then for the Delta files.
for filename in deltafilenames.iter() {
ensure!(filename.start_lsn < filename.end_lsn);
if filename.end_lsn > disk_consistent_lsn {
warn!(

@@ -1096,11 +1079,10 @@ impl LayeredTimeline {
layer.filename().display(),
self.timelineid,
);
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
}

Ok(local_layers)
Ok(())
}

///
@@ -1347,6 +1329,8 @@ impl LayeredTimeline {
last_record_lsn
);

let timeline_dir = File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;

// Take the in-memory layer with the oldest WAL record. If it's older
// than the threshold, write it out to disk as a new image and delta file.
// Repeat until all remaining in-memory layers are within the threshold.

@@ -1359,7 +1343,7 @@ impl LayeredTimeline {
let mut disk_consistent_lsn = last_record_lsn;

let mut created_historics = false;
let mut layer_uploads = Vec::new();

while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();

@@ -1421,13 +1405,8 @@ impl LayeredTimeline {
layers.remove_historic(frozen.clone());

// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
for n in new_historics {
layers.insert_historic(n);
}
}

@@ -1443,9 +1422,7 @@ impl LayeredTimeline {
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
layer_uploads.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
batch_fsync(&layer_uploads)?;
layer_uploads.pop().unwrap();
timeline_dir.sync_all()?;
}

// Save the metadata, with updated 'disk_consistent_lsn', to a
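In the checkpoint hunks above, the batch_fsync call over every new layer path gives way to a single sync_all on the already-opened timeline directory; judging by the later delta and image layer hunks, the per-file durability moves into the layer writers themselves (each file gets writer.get_ref().sync_all() as it is created). A minimal sketch of that ordering, with hypothetical helper names:

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::Path;

// Hedged sketch of the new ordering: each layer file is fsynced as it is
// written, and the directory is fsynced once at the end so the new
// directory entries are durable. `write_layer_file` is a made-up helper.
fn write_layer_file(dir: &Path, name: &str, bytes: &[u8]) -> io::Result<()> {
    let mut file = File::create(dir.join(name))?;
    file.write_all(bytes)?;
    file.sync_all() // file contents are durable before we return
}

fn checkpoint_sketch(timeline_dir: &Path, layers: &[(&str, Vec<u8>)]) -> io::Result<()> {
    let dir_handle = File::open(timeline_dir)?; // opened up front, as in the diff
    for (name, bytes) in layers {
        write_layer_file(timeline_dir, name, bytes)?;
    }
    // One directory fsync covers all of the new entries created above.
    dir_handle.sync_all()
}
```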
@@ -1472,7 +1449,7 @@ impl LayeredTimeline {
ancestor_timeline: ancestor_timelineid,
ancestor_lsn: self.ancestor_lsn,
};
LayeredRepository::save_metadata(
let _metadata_path = LayeredRepository::save_metadata(
self.conf,
self.timelineid,
self.tenantid,

@@ -1481,10 +1458,12 @@ impl LayeredTimeline {
)?;
if self.upload_relishes {
schedule_timeline_upload(())
// schedule_timeline_upload(
// self.tenantid,
// self.timelineid,
// layer_uploads,
// schedule_timeline_upload(LocalTimeline {
// tenant_id: self.tenantid,
// timeline_id: self.timelineid,
// metadata_path,
// image_layers: image_layer_uploads,
// delta_layers: delta_layer_uploads,
// disk_consistent_lsn,
// });
}

@@ -1917,15 +1896,6 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
Ok(())
}

fn metadata_path(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> PathBuf {
conf.timeline_path(&timelineid, &tenantid)
.join(METADATA_FILE_NAME)
}

/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
@@ -65,6 +65,7 @@ use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;

use super::blob::{read_blob, BlobRange};
use super::page_versions::OrderedBlockIter;

// Magic constant to identify a Zenith delta file
pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01;

@@ -169,7 +170,29 @@ impl Layer for DeltaLayer {
}

fn filename(&self) -> PathBuf {
PathBuf::from(self.layer_name().to_string())
PathBuf::from(
DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
}
.to_string(),
)
}

fn path(&self) -> Option<PathBuf> {
Some(Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
))
}

/// Look up given page in the cache.

@@ -278,7 +301,9 @@ impl Layer for DeltaLayer {

fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
if let Some(path) = self.path() {
fs::remove_file(path)?;
}
Ok(())
}
@@ -350,7 +375,7 @@ impl DeltaLayer {
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
#[allow(clippy::too_many_arguments)]
pub fn create<'a>(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,

@@ -358,7 +383,7 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
page_versions: OrderedBlockIter,
relsizes: VecMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {

@@ -382,7 +407,9 @@ impl DeltaLayer {
let mut inner = delta_layer.inner.lock().unwrap();

// Write the in-memory btreemaps into a file
let path = delta_layer.path();
let path = delta_layer
.path()
.expect("DeltaLayer is supposed to have a layer path on disk");

// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?

@@ -392,14 +419,20 @@ impl DeltaLayer {

let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);

for (blknum, lsn, page_version) in page_versions {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
for (blknum, history) in page_versions {
for (lsn, page_version) in history.as_slice() {
if lsn >= &end_lsn {
continue;
}

inner
.page_version_metas
.append((blknum, lsn), blob_range)
.unwrap();
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;

inner
.page_version_metas
.append((blknum, *lsn), blob_range)
.unwrap();
}
}

let book = page_version_writer.close()?;

@@ -431,7 +464,8 @@ impl DeltaLayer {
let book = chapter.close()?;

// This flushes the underlying 'buf_writer'.
book.close()?;
let writer = book.close()?;
writer.get_ref().sync_all()?;

trace!("saved {}", &path.display());
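The last hunk above closes the book (which flushes its internal buffered writer) and only then fsyncs the underlying file. The same two-step idea with std types only, as a hedged sketch (the bookfile Book/BlobWriter API is not reproduced here):

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;

// Hedged sketch: buffered bytes must be flushed to the OS before sync_all,
// otherwise the fsync can complete while data still sits in the user-space
// buffer. BufWriter::get_ref gives access to the inner File for sync_all.
fn write_durably(path: &Path, data: &[u8]) -> std::io::Result<()> {
    let mut writer = BufWriter::new(File::create(path)?);
    writer.write_all(data)?;
    writer.flush()?;              // push buffered bytes into the file
    writer.get_ref().sync_all()?; // then ask the OS to make them durable
    Ok(())
}
```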
@@ -445,7 +479,12 @@ impl DeltaLayer {
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
&DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
);

let file = File::open(&path)?;

@@ -554,23 +593,4 @@ impl DeltaLayer {
}),
})
}

fn layer_name(&self) -> DeltaFileName {
DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
}
}

/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
)
}
}
@@ -13,8 +13,6 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;

use super::METADATA_FILE_NAME;

// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {

@@ -37,7 +35,7 @@ impl DeltaFileName {
/// Parse a string as a delta file name. Returns None if the filename does not
/// match the expected pattern.
///
pub fn parse_str(fname: &str) -> Option<Self> {
pub fn from_str(fname: &str) -> Option<Self> {
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {

@@ -170,7 +168,7 @@ impl ImageFileName {
/// Parse a string as an image file name. Returns None if the filename does not
/// match the expected pattern.
///
pub fn parse_str(fname: &str) -> Option<Self> {
pub fn from_str(fname: &str) -> Option<Self> {
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {

@@ -288,11 +286,11 @@ pub fn list_files(
let fname = direntry?.file_name();
let fname = fname.to_str().unwrap();

if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
if let Some(deltafilename) = DeltaFileName::from_str(fname) {
deltafiles.push(deltafilename);
} else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
} else if let Some(imgfilename) = ImageFileName::from_str(fname) {
imgfiles.push(imgfilename);
} else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
} else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
// ignore these
} else {
warn!("unrecognized filename in timeline dir: {}", fname);
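Note that the from_str shown above is an inherent method returning Option, not an implementation of the standard FromStr trait (whose from_str returns a Result and enables str::parse). A self-contained sketch of the same Option-returning, strip_prefix-based style, using a purely illustrative file-name format:

```rust
// Hedged sketch: "img_<lsn>" is an invented format, not the real layer
// file-name grammar; it only mirrors the parsing style shown in the diff.
struct ExampleImageName {
    lsn_hex: String,
}

impl ExampleImageName {
    fn from_str(fname: &str) -> Option<Self> {
        let rest = fname.strip_prefix("img_")?;
        if rest.is_empty() {
            return None;
        }
        Some(ExampleImageName { lsn_hex: rest.to_owned() })
    }
}
```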
@@ -114,7 +114,25 @@ pub struct ImageLayerInner {

impl Layer for ImageLayer {
fn filename(&self) -> PathBuf {
PathBuf::from(self.layer_name().to_string())
PathBuf::from(
ImageFileName {
seg: self.seg,
lsn: self.lsn,
}
.to_string(),
)
}

fn path(&self) -> Option<PathBuf> {
Some(Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
))
}

fn get_timeline_id(&self) -> ZTimelineId {

@@ -204,7 +222,9 @@ impl Layer for ImageLayer {

fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
if let Some(path) = self.path() {
fs::remove_file(path)?;
}
Ok(())
}

@@ -280,7 +300,9 @@ impl ImageLayer {
let inner = layer.inner.lock().unwrap();

// Write the images into a file
let path = layer.path();
let path = layer
.path()
.expect("ImageLayer is supposed to have a layer path on disk");
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;

@@ -315,9 +337,10 @@ impl ImageLayer {
let book = chapter.close()?;

// This flushes the underlying 'buf_writer'.
book.close()?;
let writer = book.close()?;
writer.get_ref().sync_all()?;

trace!("saved {}", path.display());
trace!("saved {}", &path.display());

drop(inner);
@@ -422,7 +445,15 @@ impl ImageLayer {
}

fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
let path = self.path();
let path = Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
);

let file = File::open(&path)?;
let book = Book::new(file)?;

@@ -469,21 +500,4 @@ impl ImageLayer {
}),
})
}

fn layer_name(&self) -> ImageFileName {
ImageFileName {
seg: self.seg,
lsn: self.lsn,
}
}

/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
)
}
}
@@ -124,6 +124,10 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}

fn path(&self) -> Option<PathBuf> {
None
}

fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}

@@ -276,14 +280,16 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}

for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
println!(
"blk {} at {}: {}/{}\n",
blknum,
lsn,
pv.page_image.is_some(),
pv.record.is_some()
);
for (blknum, history) in inner.page_versions.ordered_block_iter() {
for (lsn, pv) in history.as_slice() {
println!(
"blk {} at {}: {}/{}\n",
blknum,
lsn,
pv.page_image.is_some(),
pv.record.is_some()
);
}
}

Ok(())

@@ -305,18 +311,6 @@ pub struct FreezeLayers {
pub open: Option<Arc<InMemoryLayer>>,
}

/// A result of an inmemory layer data being written to disk.
pub struct LayersOnDisk {
pub delta_layers: Vec<DeltaLayer>,
pub image_layers: Vec<ImageLayer>,
}

impl LayersOnDisk {
pub fn is_empty(&self) -> bool {
self.delta_layers.is_empty() && self.image_layers.is_empty()
}
}
impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());

@@ -677,7 +671,7 @@ impl InMemoryLayer {
/// WAL records between start and end LSN. (The delta layer is not needed
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<LayersOnDisk> {
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
self.filename().display(),

@@ -686,7 +680,7 @@ impl InMemoryLayer {
);

// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// layer is not writeable anymore, no one should be trying to aquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called

@@ -706,7 +700,7 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn,
true,
inner.page_versions.ordered_page_version_iter(None),
inner.page_versions.ordered_block_iter(),
inner.segsizes.clone(),
)?;
trace!(

@@ -715,17 +709,13 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn
);
return Ok(LayersOnDisk {
delta_layers: vec![delta_layer],
image_layers: Vec::new(),
});
return Ok(vec![Arc::new(delta_layer)]);
}

let end_lsn = self.end_lsn.unwrap();

let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();

let mut delta_layers = Vec::new();
if self.start_lsn != end_lsn {
let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1));

@@ -738,10 +728,10 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
false,
before_page_versions,
inner.page_versions.ordered_block_iter(),
before_segsizes,
)?;
delta_layers.push(delta_layer);
frozen_layers.push(Arc::new(delta_layer));
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,

@@ -749,18 +739,19 @@ impl InMemoryLayer {
end_lsn
);
} else {
assert!(before_page_versions.next().is_none());
for (_blknum, history) in inner.page_versions.ordered_block_iter() {
let (lsn, _pv) = history.as_slice().first().unwrap();
assert!(lsn >= &end_lsn);
}
}

drop(inner);

// Write a new base image layer at the cutoff point
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
frozen_layers.push(Arc::new(image_layer));
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);

Ok(LayersOnDisk {
delta_layers,
image_layers: vec![image_layer],
})
Ok(frozen_layers)
}
}
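The write_to_disk hunks above switch the return type from the removed LayersOnDisk struct to a single Vec<Arc<dyn Layer>>, so the checkpoint code can insert every frozen layer without caring whether it is a delta or an image layer. A self-contained sketch of that shape, with the trait reduced to one method for illustration:

```rust
use std::sync::Arc;

// Hedged sketch: two concrete layer kinds behind one trait object, collected
// into a single Vec<Arc<dyn Layer>> the way the diff's frozen_layers is built.
trait Layer {
    fn short_id(&self) -> String;
}

struct DeltaLayerStub;
struct ImageLayerStub;

impl Layer for DeltaLayerStub {
    fn short_id(&self) -> String {
        "delta".to_string()
    }
}

impl Layer for ImageLayerStub {
    fn short_id(&self) -> String {
        "image".to_string()
    }
}

fn freeze_sketch() -> Vec<Arc<dyn Layer>> {
    let mut frozen: Vec<Arc<dyn Layer>> = Vec::new();
    frozen.push(Arc::new(DeltaLayerStub)); // the delta part, if any
    frozen.push(Arc::new(ImageLayerStub)); // the base image at the cutoff
    frozen
}

fn insert_all(layer_map: &mut Vec<Arc<dyn Layer>>, frozen: Vec<Arc<dyn Layer>>) {
    for layer in frozen {
        // stands in for layers.insert_historic(n) in the checkpoint hunk
        layer_map.push(layer);
    }
}
```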
@@ -1,4 +1,4 @@
use std::{collections::HashMap, ops::RangeBounds, slice};
use std::{collections::HashMap, ops::RangeBounds};

use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};

@@ -24,14 +24,6 @@ impl PageVersions {
map.append_or_update_last(lsn, page_version).unwrap()
}

/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
}

/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,

@@ -72,65 +64,33 @@ impl PageVersions {
(Self(before_blocks), Self(after_blocks))
}

/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
/// Iterate through block-history pairs in block order.
pub fn ordered_block_iter(&self) -> OrderedBlockIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
ordered_blocks.sort_unstable();

let slice = ordered_blocks
.first()
.map(|&blknum| self.get_block_slice(blknum))
.unwrap_or(EMPTY_SLICE);

OrderedPageVersionIter {
OrderedBlockIter {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
cutoff_lsn,
cur_slice_iter: slice.iter(),
}
}
}

pub struct OrderedPageVersionIter<'a> {
pub struct OrderedBlockIter<'a> {
page_versions: &'a PageVersions,

ordered_blocks: Vec<u32>,
cur_block_idx: usize,

cutoff_lsn: Option<Lsn>,

cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
}

impl OrderedPageVersionIter<'_> {
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
lsn < cutoff_lsn
} else {
true
}
}
}

impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
impl<'a> Iterator for OrderedBlockIter<'a> {
type Item = (u32, &'a VecMap<Lsn, PageVersion>);

fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, page_version));
}
}

let next_block_idx = self.cur_block_idx + 1;
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
self.cur_block_idx = next_block_idx;
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
}
let blknum: u32 = *self.ordered_blocks.get(self.cur_block_idx)?;
self.cur_block_idx += 1;
Some((blknum, self.page_versions.0.get(&blknum).unwrap()))
}
}
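The replacement iterator above yields each block number together with its whole per-block VecMap, in ascending block order. A standalone sketch of the same idea using only std types (PageVersions itself wraps a HashMap keyed by block number):

```rust
use std::collections::HashMap;

// Hedged sketch of the ordered_block_iter idea: sort the keys once, then
// yield (key, &value) pairs in ascending key order.
fn ordered_entries<V>(map: &HashMap<u32, V>) -> impl Iterator<Item = (u32, &V)> + '_ {
    let mut keys: Vec<u32> = map.keys().copied().collect();
    keys.sort_unstable();
    keys.into_iter().map(move |k| (k, &map[&k]))
}

fn demo() {
    let mut map: HashMap<u32, Vec<&str>> = HashMap::new();
    map.insert(7, vec!["v1"]);
    map.insert(3, vec!["v1", "v2"]);
    for (blknum, history) in ordered_entries(&map) {
        // blocks arrive as 3, then 7; history is the whole per-block list
        println!("block {} has {} versions", blknum, history.len());
    }
}
```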
@@ -156,24 +116,14 @@ mod tests {
}
}

let mut iter = page_versions.ordered_page_version_iter(None);
let mut iter = page_versions.ordered_block_iter();
for blknum in 0..BLOCKS {
let (actual_blknum, vec_map) = iter.next().unwrap();
let slice = vec_map.as_slice();
assert_eq!(actual_blknum, blknum);
assert_eq!(slice.len(), LSNS as usize);
for lsn in 0..LSNS {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls

const CUTOFF_LSN: Lsn = Lsn(30);
let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
for blknum in 0..BLOCKS {
for lsn in 0..CUTOFF_LSN.0 {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
}
}
assert!(iter.next().is_none());
@@ -123,6 +123,10 @@ pub trait Layer: Send + Sync {
/// Is the segment represented by this layer dropped by PostgreSQL?
fn is_dropped(&self) -> bool;

/// Gets the physical location of the layer on disk.
/// Some layers, such as in-memory, might not have the location.
fn path(&self) -> Option<PathBuf>;

/// Filename used to store this layer on disk. (Even in-memory layers
/// implement this, to print a handy unique identifier for the layer for
/// log messages, even though they're never not on disk.)
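This trait hunk is what lets delete() in the delta and image layer hunks handle layers that have no file: callers match on the Option instead of assuming a path exists. A minimal sketch of a caller written against that signature, with the trait reduced to the one relevant method:

```rust
use std::fs;
use std::path::PathBuf;

// Hedged sketch: only the Option-returning path() matters here.
trait Layer {
    fn path(&self) -> Option<PathBuf>;
}

fn delete_layer(layer: &dyn Layer) -> std::io::Result<()> {
    // In-memory layers return None, so there is simply nothing to remove.
    if let Some(path) = layer.path() {
        fs::remove_file(path)?;
    }
    Ok(())
}
```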
@@ -12,12 +12,14 @@ mod rust_s3;
/// local page server layer files with external storage.
mod synced_storage;

use std::{path::Path, thread};
use std::path::Path;
use std::thread;

use anyhow::Context;

use self::local_fs::LocalFs;
pub use self::synced_storage::schedule_timeline_upload;
use self::{local_fs::LocalFs, rust_s3::RustS3};
use crate::relish_storage::rust_s3::RustS3;
use crate::{PageServerConf, RelishStorageKind};

pub fn run_storage_sync_thread(

@@ -5,10 +5,9 @@ use std::path::Path;
use anyhow::Context;
use s3::{bucket::Bucket, creds::Credentials, region::Region};

use crate::{
relish_storage::{strip_workspace_prefix, RelishStorage},
S3Config,
};
use crate::{relish_storage::strip_workspace_prefix, S3Config};

use super::RelishStorage;

const S3_FILE_SEPARATOR: char = '/';
@@ -209,7 +209,7 @@ impl WALRecord {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::{LayeredRepository, METADATA_FILE_NAME};
use crate::layered_repository::LayeredRepository;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use hex_literal::hex;

@@ -728,7 +728,7 @@ mod tests {
repo.create_empty_timeline(TIMELINE_ID)?;
drop(repo);

let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join("metadata");

assert!(metadata_path.is_file());
@@ -46,7 +46,7 @@ pub fn set_common_metrics_prefix(prefix: &'static str) {
}

/// Prepends a prefix to a common metric name so they are distinguished between
/// different services, see <https://github.com/zenithdb/zenith/pull/681>
/// different services, see https://github.com/zenithdb/zenith/pull/681
/// A call to set_common_metrics_prefix() is necessary prior to calling this.
pub fn new_common_metric_name(unprefixed_metric_name: &str) -> String {
// Not unwrap() because metrics may be initialized after multiple threads have been started.
@@ -18,7 +18,6 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
thiserror = "1.0"
|
||||
tokio = "1.11"
|
||||
crossbeam-utils = "0.8.5"
|
||||
|
||||
slog-async = "2.6.0"
|
||||
slog-stdlog = "4.1.0"
|
||||
|
||||
@@ -1,45 +0,0 @@
use crossbeam_utils::thread;

use std::{
fs::File,
io,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};

pub fn batch_fsync(paths: &[PathBuf]) -> std::io::Result<()> {
let next = AtomicUsize::new(0);

let num_threads = std::cmp::min(paths.len() / 2, 256);

if num_threads <= 1 {
for path in paths {
let file = File::open(&path)?;
file.sync_all()?;
}

return Ok(());
}

thread::scope(|s| -> io::Result<()> {
let mut handles = Vec::new();
for _ in 0..num_threads {
handles.push(s.spawn(|_| loop {
let idx = next.fetch_add(1, Ordering::Relaxed);
if idx >= paths.len() {
return io::Result::Ok(());
}

let file = File::open(&paths[idx])?;
file.sync_all()?;
}));
}

for handle in handles {
handle.join().unwrap()?;
}

Ok(())
})
.unwrap()
}

@@ -40,6 +40,3 @@ pub mod logging;

// Misc
pub mod accum;

/// Utility for quickly fsyncing many files at once
pub mod batch_fsync;

@@ -55,7 +55,7 @@ impl<K: Ord, V> VecMap<K, V> {
}

/// Add a key value pair to the map.
/// If `key` is less than or equal to the current maximum key
/// If [`key`] is less than or equal to the current maximum key
/// the pair will not be added and InvalidKey error will be returned.
pub fn append(&mut self, key: K, value: V) -> Result<(), InvalidKey> {
if let Some((last_key, _last_value)) = self.0.last() {

@@ -69,7 +69,7 @@ impl<K: Ord, V> VecMap<K, V> {
}

/// Update the maximum key value pair or add a new key value pair to the map.
/// If `key` is less than the current maximum key no updates or additions
/// If [`key`] is less than the current maximum key no updates or additions
/// will occur and InvalidKey error will be returned.
pub fn append_or_update_last(&mut self, key: K, mut value: V) -> Result<Option<V>, InvalidKey> {
if let Some((last_key, last_value)) = self.0.last_mut() {

@@ -89,8 +89,8 @@ impl<K: Ord, V> VecMap<K, V> {

/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).
/// Right map contains `cutoff` and everything after (inclusive).
/// The left map contains everything before [`cutoff`] (exclusive).
/// Right map contains [`cutoff`] and everything after (inclusive).
pub fn split_at(&self, cutoff: &K) -> (Self, Self)
where
K: Clone,

@@ -107,9 +107,9 @@ impl<K: Ord, V> VecMap<K, V> {
)
}

/// Move items from `other` to the end of `self`, leaving `other` empty.
/// If any keys in `other` is less than or equal to any key in `self`,
/// `InvalidKey` error will be returned and no mutation will occur.
/// Move items from [`other`] to the end of [`self`], leaving [`other`] empty.
/// If any keys in [`other`] is less than or equal to any key in [`self`],
/// [`InvalidKey`] error will be returned and no mutation will occur.
pub fn extend(&mut self, other: &mut Self) -> Result<(), InvalidKey> {
let self_last_opt = self.0.last().map(extract_key);
let other_first_opt = other.0.last().map(extract_key);
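The VecMap doc fixes above describe an append-only contract: keys must be strictly increasing, otherwise the pair is rejected with InvalidKey. A self-contained illustration of that contract over a plain Vec (VecMap itself lives in zenith_utils::vec_map and is not reproduced here):

```rust
// Hedged sketch of the documented append contract, using a plain Vec of
// pairs instead of the real VecMap type.
#[derive(Debug)]
struct InvalidKey;

fn append<K: Ord, V>(pairs: &mut Vec<(K, V)>, key: K, value: V) -> Result<(), InvalidKey> {
    if let Some((last_key, _)) = pairs.last() {
        if &key <= last_key {
            // same rule as VecMap::append: reject non-increasing keys
            return Err(InvalidKey);
        }
    }
    pairs.push((key, value));
    Ok(())
}

fn demo() {
    let mut sizes: Vec<(u64, u32)> = Vec::new();
    append(&mut sizes, 10, 1).unwrap();
    append(&mut sizes, 20, 2).unwrap();
    assert!(append(&mut sizes, 15, 3).is_err()); // out of order, rejected
}
```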