Merge WITH CONFLICTS 2025-03-11 main commit '158db414bf881fb358494e3215d192c8fa420a53' into yuchen/dire

ct-io-delta-image-layer-write

Conflicts:
	pageserver/src/virtual_file.rs
	pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
This commit is contained in:
Christian Schwarz
2025-04-09 19:39:56 +02:00
26 changed files with 428 additions and 225 deletions

View File

@@ -3223,6 +3223,7 @@ async fn post_top_tenants(
match order_by {
TenantSorting::ResidentSize => sizes.resident_size,
TenantSorting::MaxLogicalSize => sizes.max_logical_size,
TenantSorting::MaxLogicalSizePerShard => sizes.max_logical_size_per_shard,
}
}

View File

@@ -3842,6 +3842,7 @@ impl Tenant {
resident_size: 0,
physical_size: 0,
max_logical_size: 0,
max_logical_size_per_shard: 0,
};
for timeline in self.timelines.lock().unwrap().values() {
@@ -3858,6 +3859,10 @@ impl Tenant {
);
}
result.max_logical_size_per_shard = result
.max_logical_size
.div_ceil(self.tenant_shard_id.shard_count.count() as u64);
result
}
}

View File

@@ -175,6 +175,7 @@ impl BlobWriter {
start_offset: u64,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> anyhow::Result<Self> {
Ok(Self {
io_buf: Some(BytesMut::new()),
@@ -184,6 +185,7 @@ impl BlobWriter {
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate.enter()?,
ctx,
flush_task_span,
),
offset: start_offset,
})
@@ -331,6 +333,7 @@ pub(crate) mod tests {
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use rand::{Rng, SeedableRng};
use tracing::info_span;
use super::*;
use crate::context::DownloadBehavior;
@@ -354,7 +357,7 @@ pub(crate) mod tests {
let mut offsets = Vec::new();
{
let file = Arc::new(VirtualFile::create_v2(pathbuf.as_path(), ctx).await?);
let mut wtr = BlobWriter::new(file, 0, &gate, ctx).unwrap();
let mut wtr = BlobWriter::new(file, 0, &gate, ctx, info_span!("test")).unwrap();
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr

View File

@@ -9,7 +9,7 @@ use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;
use tracing::{error, info_span};
use utils::id::TimelineId;
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
@@ -77,6 +77,7 @@ impl EphemeralFile {
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
})

View File

@@ -18,7 +18,7 @@ use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use tracing::{info_span, warn};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};
@@ -230,6 +230,7 @@ async fn download_object(
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
ctx,
info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.

View File

@@ -433,7 +433,13 @@ impl DeltaLayerWriterInner {
let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?);
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, ctx)?;
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();

View File

@@ -796,7 +796,13 @@ impl ImageLayerWriterInner {
};
// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, ctx)?;
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();

View File

@@ -393,6 +393,9 @@ impl GcCompactionQueue {
if job.dry_run {
flags |= CompactFlags::DryRun;
}
if options.flags.contains(CompactFlags::NoYield) {
flags |= CompactFlags::NoYield;
}
let options = CompactOptions {
flags,
sub_compaction: false,
@@ -1092,7 +1095,7 @@ impl Timeline {
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
tracing::info!(
"latest_gc_cutoff: {}, pitr cutoff {}",
"starting shard ancestor compaction, latest_gc_cutoff: {}, pitr cutoff {}",
*latest_gc_cutoff,
self.gc_info.read().unwrap().cutoffs.time
);
@@ -1121,6 +1124,7 @@ impl Timeline {
// Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
// wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
// should be !is_key_disposable()
// TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop.
let range = layer_desc.get_key_range();
let mut key = range.start;
while key < range.end {
@@ -2619,6 +2623,7 @@ impl Timeline {
) -> Result<CompactionOutcome, CompactionError> {
let sub_compaction = options.sub_compaction;
let job = GcCompactJob::from_compact_options(options.clone());
let no_yield = options.flags.contains(CompactFlags::NoYield);
if sub_compaction {
info!(
"running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
@@ -2633,14 +2638,15 @@ impl Timeline {
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx).await?;
self.compact_with_gc_inner(cancel, job, ctx, no_yield)
.await?;
}
if jobs_len == 0 {
info!("no jobs to run, skipping gc bottom-most compaction");
}
return Ok(CompactionOutcome::Done);
}
self.compact_with_gc_inner(cancel, job, ctx).await
self.compact_with_gc_inner(cancel, job, ctx, no_yield).await
}
async fn compact_with_gc_inner(
@@ -2648,6 +2654,7 @@ impl Timeline {
cancel: &CancellationToken,
job: GcCompactJob,
ctx: &RequestContext,
no_yield: bool,
) -> Result<CompactionOutcome, CompactionError> {
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
@@ -2917,14 +2924,18 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
if !no_yield {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt gc-compaction when downloading layers: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
}
let resident_layer = layer
.download_and_keep_resident(ctx)
@@ -3058,16 +3069,21 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
keys_processed += 1;
if keys_processed % 1000 == 0 {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
if !no_yield {
keys_processed += 1;
if keys_processed % 1000 == 0 {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt gc-compaction in the main loop: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
}
}
if self.shard_identity.is_key_disposable(&key) {

View File

@@ -1289,10 +1289,8 @@ impl OwnedAsyncWriter for VirtualFile {
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> std::io::Result<FullSlice<Buf>> {
let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
res?;
Ok(buf)
) -> (FullSlice<Buf>, std::io::Result<()>) {
VirtualFile::write_all_at(self, buf, offset, ctx).await
}
}

View File

@@ -33,7 +33,7 @@ pub trait OwnedAsyncWriter {
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
@@ -69,6 +69,7 @@ where
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> Self {
Self {
writer: writer.clone(),
@@ -78,6 +79,7 @@ where
buf_new(),
gate_guard,
ctx.attached_child(),
flush_task_span,
),
submit_offset: start_offset,
}
@@ -121,9 +123,8 @@ where
let mut bytes_amount = submit_offset;
if let Some(buf) = handle_tail(buf) {
bytes_amount += buf.pending() as u64;
let _ = writer
.write_all_at(buf.flush(), submit_offset, &ctx)
.await?;
let (_, res) = writer.write_all_at(buf.flush(), submit_offset, &ctx).await;
let _: () = res?;
}
Ok((bytes_amount, writer))
}
@@ -299,12 +300,12 @@ mod tests {
buf: FullSlice<Buf>,
offset: u64,
_: &RequestContext,
) -> std::io::Result<FullSlice<Buf>> {
) -> (FullSlice<Buf>, std::io::Result<()>) {
self.writes
.lock()
.unwrap()
.push((Vec::from(&buf[..]), offset));
Ok(buf)
(buf, Ok(()))
}
}
@@ -324,6 +325,7 @@ mod tests {
|| IoBufferMut::with_capacity(2),
gate.enter()?,
ctx,
tracing::Span::none(),
);
writer.write_buffered_borrowed(b"abc", ctx).await?;

View File

@@ -1,9 +1,14 @@
use std::ops::ControlFlow;
use std::{marker::PhantomData, sync::Arc};
use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
use utils::sync::duplex;
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
use crate::context::RequestContext;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
@@ -120,6 +125,7 @@ where
buf: B,
gate_guard: utils::sync::gate::GateGuard,
ctx: RequestContext,
span: tracing::Span,
) -> Self
where
B: Buffer<IoBuf = Buf> + Send + 'static,
@@ -127,11 +133,14 @@ where
// It is fine to buffer up to only 1 message. We only 1 message in-flight at a time.
let (front, back) = duplex::mpsc::channel(1);
let join_handle = tokio::spawn(async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.await
});
let join_handle = tokio::spawn(
async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.await
}
.instrument(span),
);
FlushHandle {
inner: Some(FlushHandleInner {
@@ -240,6 +249,7 @@ where
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<RequestContext> {
// Sends the extra buffer back to the handle.
// TODO: can this ever await and or fail? I think not.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
@@ -255,10 +265,47 @@ where
}
// Write slice to disk at `offset`.
let slice = self
.writer
.write_all_at(request.slice, request.offset, &self.ctx)
.await?;
//
// Error handling happens according to the current policy of crashing
// on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
// (The upper layers of the Pageserver write path are not equipped to retry write errors
// becasuse they often deallocate the buffers that were already written).
//
// TODO: cancellation sensitiity.
// Without it, if we hit a bug where retrying is never successful,
// then we can't shut down the timeline/tenant/pageserver cleanly because
// layers of the Pageserver write path are holding the gate open for EphemeralFile.
//
// TODO: use utils::backoff::retry once async closures are actually usable
//
let mut slice_storage = Some(request.slice);
for attempt in 1.. {
let result = async {
if attempt > 1 {
info!("retrying flush");
}
let slice = slice_storage.take().expect(
"likely previous invocation of this future didn't get polled to completion",
);
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
slice_storage = Some(slice);
let res = res.maybe_fatal_err("owned_buffers_io flush");
let Err(err) = res else {
return ControlFlow::Break(());
};
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
ControlFlow::Continue(())
}
.instrument(info_span!("flush_attempt", %attempt))
.await;
match result {
ControlFlow::Break(()) => break,
ControlFlow::Continue(()) => continue,
}
}
let slice = slice_storage.expect("loop must have run at least once");
#[cfg(test)]
{