mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
inmemorylayer: flush integration, partial?
This commit is contained in:
@@ -10,11 +10,12 @@ use crate::repository::{Key, Value};
|
||||
use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walrecord;
|
||||
use anyhow::{ensure, Result};
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
@@ -28,7 +29,7 @@ use std::fmt::Write as _;
|
||||
use std::ops::Range;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{DeltaLayer, DeltaLayerWriter, Layer};
|
||||
use super::{DeltaLayer, DeltaLayerWriter, Layer, ResidentLayer};
|
||||
|
||||
pub struct InMemoryLayer {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -313,7 +314,7 @@ impl InMemoryLayer {
|
||||
/// Write this frozen in-memory layer to disk.
|
||||
///
|
||||
/// Returns a new delta layer with all the same data as this in-memory layer
|
||||
pub(crate) async fn write_to_disk(&self) -> Result<DeltaLayer> {
|
||||
pub(crate) async fn write_to_disk(&self, timeline: &Arc<Timeline>) -> Result<ResidentLayer> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
// write lock on it, so we shouldn't block anyone. There's one exception
|
||||
@@ -352,7 +353,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
let delta_layer = delta_layer_writer.finish(Key::MAX)?;
|
||||
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline)?;
|
||||
Ok(delta_layer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2393,10 +2393,9 @@ impl Timeline {
|
||||
// We will remove frozen layer and add delta layer in one atomic operation later.
|
||||
let layer = self.create_delta_layer(&frozen_layer).await?;
|
||||
(
|
||||
HashMap::from([(
|
||||
layer.filename(),
|
||||
LayerFileMetadata::new(layer.layer_desc().file_size),
|
||||
)]),
|
||||
// FIXME: even though we have a single image and single delta layer assumption
|
||||
// we push them to vec
|
||||
vec![layer.clone()],
|
||||
Some(layer),
|
||||
)
|
||||
};
|
||||
@@ -2421,7 +2420,7 @@ impl Timeline {
|
||||
self.metrics.persistent_bytes_written.inc_by(sz);
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer);
|
||||
// release lock on 'layers'
|
||||
}
|
||||
|
||||
@@ -2458,7 +2457,7 @@ impl Timeline {
|
||||
fn update_metadata_file(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
|
||||
layer_paths_to_upload: impl IntoIterator<Item = ResidentLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
@@ -2506,8 +2505,9 @@ impl Timeline {
|
||||
.context("save_metadata")?;
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for (path, layer_metadata) in layer_paths_to_upload {
|
||||
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
|
||||
for layer in layer_paths_to_upload {
|
||||
let m = LayerFileMetadata::new(layer.layer_desc().file_size);
|
||||
remote_client.schedule_layer_file_upload(layer, &m)?;
|
||||
}
|
||||
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
|
||||
}
|
||||
@@ -2520,10 +2520,9 @@ impl Timeline {
|
||||
async fn create_delta_layer(
|
||||
self: &Arc<Self>,
|
||||
frozen_layer: &Arc<InMemoryLayer>,
|
||||
) -> anyhow::Result<DeltaLayer> {
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let span = tracing::info_span!("blocking");
|
||||
let new_delta: DeltaLayer = tokio::task::spawn_blocking({
|
||||
let _g = span.entered();
|
||||
let new_delta: ResidentLayer = tokio::task::spawn_blocking({
|
||||
let self_clone = Arc::clone(self);
|
||||
let frozen_layer = Arc::clone(frozen_layer);
|
||||
move || {
|
||||
@@ -2532,7 +2531,9 @@ impl Timeline {
|
||||
// as long as the write path is still sync and the read impl
|
||||
// is still not fully async. Otherwise executor threads would
|
||||
// be blocked.
|
||||
let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?;
|
||||
let _g = span.entered();
|
||||
let new_delta =
|
||||
Handle::current().block_on(frozen_layer.write_to_disk(&self_clone))?;
|
||||
let new_delta_path = new_delta.path();
|
||||
|
||||
// Sync it to disk.
|
||||
@@ -2550,6 +2551,8 @@ impl Timeline {
|
||||
// 3. rename to the final name
|
||||
// 4. fsync the parent directory.
|
||||
// Note that (1),(2),(3) today happen inside write_to_disk().
|
||||
//
|
||||
// FIXME: writer has already fsynced contents, we only need to fsync the rename
|
||||
par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
|
||||
par_fsync::par_fsync(&[self_clone
|
||||
.conf
|
||||
@@ -2560,7 +2563,8 @@ impl Timeline {
|
||||
}
|
||||
})
|
||||
.await
|
||||
.context("spawn_blocking")??;
|
||||
.context("spawn_blocking")
|
||||
.and_then(|x| x)?;
|
||||
|
||||
Ok(new_delta)
|
||||
}
|
||||
@@ -3775,7 +3779,7 @@ impl Timeline {
|
||||
if !layers_to_remove.is_empty() {
|
||||
// Persist the new GC cutoff value in the metadata file, before
|
||||
// we actually remove anything.
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), None)?;
|
||||
|
||||
// Actually delete the layers from disk and remove them from the map.
|
||||
// (couldn't do this in the loop above, because you cannot modify a collection
|
||||
|
||||
Reference in New Issue
Block a user