Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 17:32:56 +00:00
pageserver: refactor DeltaLayerWriter to not need a Timeline
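In outline: `DeltaLayerWriter::finish` (and `InMemoryLayer::write_to_disk`) no longer take a `&Arc<Timeline>` and no longer return a `ResidentLayer`. They now return the `(PersistentLayerDesc, Utf8PathBuf)` pair for the file they wrote, and each call site turns that pair into a layer itself via `Layer::finish_creating`, which is the only step that still needs the `Timeline`. A condensed sketch of the resulting caller pattern, pieced together from the hunks below (the helper function and its argument list are illustrative, not part of the commit):

// Illustrative helper only -- not code from this commit. It condenses the
// two-step pattern the call sites below now follow.
async fn finish_delta_layer(
    conf: &'static PageServerConf,
    timeline: &Arc<Timeline>,
    writer: DeltaLayerWriter,
    key_end: Key,
    ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
    // Step 1: the writer seals the file on disk and returns its descriptor
    // plus the temporary path; no Timeline is needed for this.
    let (desc, path) = writer.finish(key_end, ctx).await?;
    // Step 2: the caller, which does hold a Timeline, turns that pair into a
    // ResidentLayer.
    let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
    Ok(layer)
}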
@@ -24,7 +24,7 @@ impl Default for L0FlushConfig {
 #[derive(Clone)]
 pub struct L0FlushGlobalState(Arc<Inner>);

-pub(crate) enum Inner {
+pub enum Inner {
     PageCached,
     Direct { semaphore: tokio::sync::Semaphore },
 }
@@ -40,7 +40,7 @@ impl L0FlushGlobalState {
         }
     }

-    pub(crate) fn inner(&self) -> &Arc<Inner> {
+    pub fn inner(&self) -> &Arc<Inner> {
         &self.0
     }
 }
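For context on why `Inner` and `inner()` become `pub`: the flush path (see the `InMemoryLayer::write_to_disk` and `Timeline` hunks further down) now resolves the global state at the call site and passes a plain `&l0_flush::Inner` into the writer, instead of the writer reaching through a `Timeline`. A minimal sketch of that gating pattern, with an illustrative function name that is not from this commit:

// Sketch only: the function name and trailing comment are illustrative; the
// match itself mirrors the write_to_disk hunks further down.
async fn flush_with_gate(l0_flush_global_state: &l0_flush::Inner) {
    use l0_flush::Inner;
    // Direct-IO flushes are throttled by a semaphore; page-cached flushes are not.
    let _concurrency_permit = match l0_flush_global_state {
        Inner::PageCached => None,
        Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
    };
    // ... write the delta layer to disk while holding the permit ...
}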
@@ -36,13 +36,13 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
 use crate::tenant::disk_btree::{
     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
 };
-use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
+use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::vectored_blob_io::{
     BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
     VectoredReadPlanner,
 };
-use crate::tenant::{PageReconstructError, Timeline};
+use crate::tenant::PageReconstructError;
 use crate::virtual_file::{self, VirtualFile};
 use crate::{walrecord, TEMP_FILE_SUFFIX};
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
@@ -73,8 +73,7 @@ use utils::{
 };

 use super::{
-    AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
-    ValuesReconstructState,
+    AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ValuesReconstructState,
 };

 ///
@@ -373,7 +372,6 @@ impl DeltaLayer {
 /// 3. Call `finish`.
 ///
 struct DeltaLayerWriterInner {
-    conf: &'static PageServerConf,
     pub path: Utf8PathBuf,
     timeline_id: TimelineId,
     tenant_shard_id: TenantShardId,
@@ -417,7 +415,6 @@ impl DeltaLayerWriterInner {
         let tree_builder = DiskBtreeBuilder::new(block_buf);

         Ok(Self {
-            conf,
             path,
             timeline_id,
             tenant_shard_id,
@@ -488,11 +485,10 @@ impl DeltaLayerWriterInner {
     async fn finish(
         self,
         key_end: Key,
-        timeline: &Arc<Timeline>,
         ctx: &RequestContext,
-    ) -> anyhow::Result<ResidentLayer> {
+    ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
         let temp_path = self.path.clone();
-        let result = self.finish0(key_end, timeline, ctx).await;
+        let result = self.finish0(key_end, ctx).await;
         if result.is_err() {
             tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
             if let Err(e) = std::fs::remove_file(&temp_path) {
@@ -505,9 +501,8 @@ impl DeltaLayerWriterInner {
     async fn finish0(
         self,
         key_end: Key,
-        timeline: &Arc<Timeline>,
         ctx: &RequestContext,
-    ) -> anyhow::Result<ResidentLayer> {
+    ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;

@@ -572,11 +567,9 @@ impl DeltaLayerWriterInner {
         // fsync the file
         file.sync_all().await?;

-        let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
-
-        trace!("created delta layer {}", layer.local_path());
+        trace!("created delta layer {}", self.path);

-        Ok(layer)
+        Ok((desc, self.path))
     }
 }

@@ -677,14 +670,9 @@ impl DeltaLayerWriter {
     pub(crate) async fn finish(
         mut self,
         key_end: Key,
-        timeline: &Arc<Timeline>,
         ctx: &RequestContext,
-    ) -> anyhow::Result<ResidentLayer> {
-        self.inner
-            .take()
-            .unwrap()
-            .finish(key_end, timeline, ctx)
-            .await
+    ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
+        self.inner.take().unwrap().finish(key_end, ctx).await
     }
 }

@@ -1669,8 +1657,9 @@ pub(crate) mod test {
     use super::*;
     use crate::repository::Value;
     use crate::tenant::harness::TIMELINE_ID;
+    use crate::tenant::storage_layer::{Layer, ResidentLayer};
     use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
-    use crate::tenant::Tenant;
+    use crate::tenant::{Tenant, Timeline};
     use crate::{
         context::DownloadBehavior,
         task_mgr::TaskKind,
@@ -1964,9 +1953,8 @@ pub(crate) mod test {
             res?;
         }

-        let resident = writer
-            .finish(entries_meta.key_range.end, &timeline, &ctx)
-            .await?;
+        let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
+        let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;

         let inner = resident.get_as_delta(&ctx).await?;

@@ -2155,7 +2143,8 @@ pub(crate) mod test {
             .await
             .unwrap();

-        let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
+        let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
+        let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();

         copied_layer.get_as_delta(ctx).await.unwrap();

@@ -2283,7 +2272,9 @@ pub(crate) mod test {
         for (key, lsn, value) in deltas {
             writer.put_value(key, lsn, value, ctx).await?;
         }
-        let delta_layer = writer.finish(key_end, tline, ctx).await?;
+
+        let (desc, path) = writer.finish(key_end, ctx).await?;
+        let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;

         Ok::<_, anyhow::Error>(delta_layer)
     }
@@ -12,9 +12,10 @@ use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
 use crate::tenant::ephemeral_file::EphemeralFile;
 use crate::tenant::storage_layer::ValueReconstructResult;
 use crate::tenant::timeline::GetVectoredError;
-use crate::tenant::{PageReconstructError, Timeline};
+use crate::tenant::PageReconstructError;
 use crate::{l0_flush, page_cache, walrecord};
 use anyhow::{anyhow, ensure, Result};
+use camino::Utf8PathBuf;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
@@ -34,7 +35,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
 use tokio::sync::{RwLock, RwLockWriteGuard};

 use super::{
-    DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
+    DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValueReconstructState,
     ValuesReconstructState,
 };

@@ -580,10 +581,10 @@ impl InMemoryLayer {
     /// Returns a new delta layer with all the same data as this in-memory layer
     pub async fn write_to_disk(
         &self,
-        timeline: &Arc<Timeline>,
         ctx: &RequestContext,
         key_range: Option<Range<Key>>,
-    ) -> Result<Option<ResidentLayer>> {
+        l0_flush_global_state: &l0_flush::Inner,
+    ) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
         // Grab the lock in read-mode. We hold it over the I/O, but because this
         // layer is not writeable anymore, no one should be trying to acquire the
         // write lock on it, so we shouldn't block anyone. There's one exception
@@ -595,9 +596,8 @@ impl InMemoryLayer {
         // rare though, so we just accept the potential latency hit for now.
         let inner = self.inner.read().await;

-        let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
         use l0_flush::Inner;
-        let _concurrency_permit = match &*l0_flush_global_state {
+        let _concurrency_permit = match l0_flush_global_state {
             Inner::PageCached => None,
             Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
         };
@@ -627,7 +627,7 @@ impl InMemoryLayer {
             )
             .await?;

-        match &*l0_flush_global_state {
+        match l0_flush_global_state {
             l0_flush::Inner::PageCached => {
                 let ctx = RequestContextBuilder::extend(ctx)
                     .page_content_kind(PageContentKind::InMemoryLayer)
@@ -692,7 +692,7 @@ impl InMemoryLayer {
         }

         // MAX is used here because we identify L0 layers by full key range
-        let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
+        let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;

         // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
         //
@@ -704,6 +704,6 @@ impl InMemoryLayer {
         // we dirtied when writing to the filesystem have been flushed and marked !dirty.
         drop(_concurrency_permit);

-        Ok(Some(delta_layer))
+        Ok(Some((desc, path)))
     }
 }
@@ -4181,12 +4181,14 @@ impl Timeline {
             let frozen_layer = Arc::clone(frozen_layer);
             let ctx = ctx.attached_child();
             let work = async move {
-                let Some(new_delta) = frozen_layer
-                    .write_to_disk(&self_clone, &ctx, key_range)
+                let Some((desc, path)) = frozen_layer
+                    .write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
                     .await?
                 else {
                     return Ok(None);
                 };
+                let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?;
+
                 // The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
                 // We just need to fsync the directory in which these inodes are linked,
                 // which we know to be the timeline directory.
@@ -5797,9 +5799,8 @@ impl Timeline {
         for (key, lsn, val) in deltas.data {
             delta_layer_writer.put_value(key, lsn, val, ctx).await?;
         }
-        let delta_layer = delta_layer_writer
-            .finish(deltas.key_range.end, self, ctx)
-            .await?;
+        let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
+        let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;

         {
             let mut guard = self.layers.write().await;
@@ -1006,14 +1006,16 @@ impl Timeline {
                 || contains_hole
             {
                 // ... if so, flush previous layer and prepare to write new one
-                new_layers.push(
-                    writer
-                        .take()
-                        .unwrap()
-                        .finish(prev_key.unwrap().next(), self, ctx)
-                        .await
-                        .map_err(CompactionError::Other)?,
-                );
+                let (desc, path) = writer
+                    .take()
+                    .unwrap()
+                    .finish(prev_key.unwrap().next(), ctx)
+                    .await
+                    .map_err(CompactionError::Other)?;
+                let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
+                    .map_err(CompactionError::Other)?;
+
+                new_layers.push(new_delta);
                 writer = None;

                 if contains_hole {
@@ -1076,12 +1078,13 @@ impl Timeline {
             prev_key = Some(key);
         }
         if let Some(writer) = writer {
-            new_layers.push(
-                writer
-                    .finish(prev_key.unwrap().next(), self, ctx)
-                    .await
-                    .map_err(CompactionError::Other)?,
-            );
+            let (desc, path) = writer
+                .finish(prev_key.unwrap().next(), ctx)
+                .await
+                .map_err(CompactionError::Other)?;
+            let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
+                .map_err(CompactionError::Other)?;
+            new_layers.push(new_delta);
         }

         // Sync layers
@@ -1853,9 +1856,11 @@ impl Timeline {
         for (key, lsn, val) in deltas {
             delta_layer_writer.put_value(key, lsn, val, ctx).await?;
         }
-        let delta_layer = delta_layer_writer
-            .finish(delta_key.key_range.end, tline, ctx)
+
+        let (desc, path) = delta_layer_writer
+            .finish(delta_key.key_range.end, ctx)
             .await?;
+        let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
         Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
     }

@@ -2262,9 +2267,9 @@ impl CompactionJobExecutor for TimelineAdaptor {
             ))
         });

-        let new_delta_layer = writer
-            .finish(prev.unwrap().0.next(), &self.timeline, ctx)
-            .await?;
+        let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
+        let new_delta_layer =
+            Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;

         self.new_deltas.push(new_delta_layer);
         Ok(())
@@ -488,10 +488,12 @@ async fn copy_lsn_prefix(
         // reuse the key instead of adding more holes between layers by using the real
         // highest key in the layer.
         let reused_highest_key = layer.layer_desc().key_range.end;
-        let copied = writer
-            .finish(reused_highest_key, target_timeline, ctx)
+        let (desc, path) = writer
+            .finish(reused_highest_key, ctx)
             .await
             .map_err(CopyDeltaPrefix)?;
+        let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
+            .map_err(CopyDeltaPrefix)?;

         tracing::debug!(%layer, %copied, "new layer produced");
