diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 35a77a7331..c87d1ee052 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -10,11 +10,12 @@ use crate::repository::{Key, Value}; use crate::tenant::block_io::BlockReader; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState}; +use crate::tenant::Timeline; use crate::walrecord; use anyhow::{ensure, Result}; use pageserver_api::models::InMemoryLayerInfo; use std::collections::HashMap; -use std::sync::OnceLock; +use std::sync::{Arc, OnceLock}; use tracing::*; use utils::{ bin_ser::BeSer, @@ -28,7 +29,7 @@ use std::fmt::Write as _; use std::ops::Range; use tokio::sync::RwLock; -use super::{DeltaLayer, DeltaLayerWriter, Layer}; +use super::{DeltaLayer, DeltaLayerWriter, Layer, ResidentLayer}; pub struct InMemoryLayer { conf: &'static PageServerConf, @@ -313,7 +314,7 @@ impl InMemoryLayer { /// Write this frozen in-memory layer to disk. /// /// Returns a new delta layer with all the same data as this in-memory layer - pub(crate) async fn write_to_disk(&self) -> Result { + pub(crate) async fn write_to_disk(&self, timeline: &Arc) -> Result { // Grab the lock in read-mode. We hold it over the I/O, but because this // layer is not writeable anymore, no one should be trying to acquire the // write lock on it, so we shouldn't block anyone. There's one exception @@ -352,7 +353,7 @@ impl InMemoryLayer { } } - let delta_layer = delta_layer_writer.finish(Key::MAX)?; + let delta_layer = delta_layer_writer.finish(Key::MAX, timeline)?; Ok(delta_layer) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 3be999211c..baba433f16 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2393,10 +2393,9 @@ impl Timeline { // We will remove frozen layer and add delta layer in one atomic operation later. let layer = self.create_delta_layer(&frozen_layer).await?; ( - HashMap::from([( - layer.filename(), - LayerFileMetadata::new(layer.layer_desc().file_size), - )]), + // FIXME: even though we have a single image and single delta layer assumption + // we push them to vec + vec![layer.clone()], Some(layer), ) }; @@ -2421,7 +2420,7 @@ impl Timeline { self.metrics.persistent_bytes_written.inc_by(sz); } - guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer); + guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer); // release lock on 'layers' } @@ -2458,7 +2457,7 @@ impl Timeline { fn update_metadata_file( &self, disk_consistent_lsn: Lsn, - layer_paths_to_upload: HashMap, + layer_paths_to_upload: impl IntoIterator, ) -> anyhow::Result<()> { // We can only save a valid 'prev_record_lsn' value on disk if we // flushed *all* in-memory changes to disk. We only track @@ -2506,8 +2505,9 @@ impl Timeline { .context("save_metadata")?; if let Some(remote_client) = &self.remote_client { - for (path, layer_metadata) in layer_paths_to_upload { - remote_client.schedule_layer_file_upload(&path, &layer_metadata)?; + for layer in layer_paths_to_upload { + let m = LayerFileMetadata::new(layer.layer_desc().file_size); + remote_client.schedule_layer_file_upload(layer, &m)?; } remote_client.schedule_index_upload_for_metadata_update(&metadata)?; } @@ -2520,10 +2520,9 @@ impl Timeline { async fn create_delta_layer( self: &Arc, frozen_layer: &Arc, - ) -> anyhow::Result { + ) -> anyhow::Result { let span = tracing::info_span!("blocking"); - let new_delta: DeltaLayer = tokio::task::spawn_blocking({ - let _g = span.entered(); + let new_delta: ResidentLayer = tokio::task::spawn_blocking({ let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); move || { @@ -2532,7 +2531,9 @@ impl Timeline { // as long as the write path is still sync and the read impl // is still not fully async. Otherwise executor threads would // be blocked. - let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?; + let _g = span.entered(); + let new_delta = + Handle::current().block_on(frozen_layer.write_to_disk(&self_clone))?; let new_delta_path = new_delta.path(); // Sync it to disk. @@ -2550,6 +2551,8 @@ impl Timeline { // 3. rename to the final name // 4. fsync the parent directory. // Note that (1),(2),(3) today happen inside write_to_disk(). + // + // FIXME: writer has already fsynced contents, we only need to fsync the rename par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?; par_fsync::par_fsync(&[self_clone .conf @@ -2560,7 +2563,8 @@ impl Timeline { } }) .await - .context("spawn_blocking")??; + .context("spawn_blocking") + .and_then(|x| x)?; Ok(new_delta) } @@ -3775,7 +3779,7 @@ impl Timeline { if !layers_to_remove.is_empty() { // Persist the new GC cutoff value in the metadata file, before // we actually remove anything. - self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?; + self.update_metadata_file(self.disk_consistent_lsn.load(), None)?; // Actually delete the layers from disk and remove them from the map. // (couldn't do this in the loop above, because you cannot modify a collection