From 011a578f1bc889b06430052daaa9481e450689c8 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 9 Dec 2024 16:34:53 +0000 Subject: [PATCH] use timeline gate to guard flush task Signed-off-by: Yuchen Liang --- pageserver/benches/bench_ingest.rs | 2 +- pageserver/src/tenant/blob_io.rs | 2 +- .../storage_layer/batch_split_writer.rs | 51 ++++++++++++++++--- .../src/tenant/storage_layer/delta_layer.rs | 11 ++-- .../src/tenant/storage_layer/image_layer.rs | 23 ++++++--- .../tenant/storage_layer/inmemory_layer.rs | 2 + pageserver/src/tenant/timeline.rs | 10 +++- pageserver/src/tenant/timeline/compaction.rs | 20 ++++++-- .../src/tenant/timeline/detach_ancestor.rs | 1 + .../src/tenant/timeline/import_pgdata/flow.rs | 1 + pageserver/src/virtual_file.rs | 7 --- .../owned_buffers_io/aligned_buffer/raw.rs | 4 ++ .../owned_buffers_io/io_buf_aligned.rs | 6 ++- .../virtual_file/owned_buffers_io/write.rs | 16 +++--- 14 files changed, 114 insertions(+), 42 deletions(-) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index b67a9cc479..c163603842 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -116,7 +116,7 @@ async fn ingest( max_concurrency: NonZeroUsize::new(1).unwrap(), }); let (_desc, path) = layer - .write_to_disk(&ctx, None, l0_flush_state.inner()) + .write_to_disk(&ctx, None, l0_flush_state.inner(), &gate) .await? .unwrap(); tokio::fs::remove_file(path).await?; diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index bbf0ed4f66..cee3c1b824 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -426,7 +426,7 @@ impl BlobWriter { /// /// Unlike [`into_inner`](Self::into_inner), this doesn't flush /// the internal buffer before giving access. - pub fn into_inner_no_flush(self) -> VirtualFile { + pub fn into_inner_no_flush(self) -> Arc { self.writer.shutdown_no_flush() } } diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 8a397ceb7a..d997abbce9 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -172,6 +172,7 @@ impl SplitImageLayerWriter { start_key: Key, lsn: Lsn, target_layer_size: u64, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { @@ -182,6 +183,7 @@ impl SplitImageLayerWriter { tenant_shard_id, &(start_key..Key::MAX), lsn, + &gate, ctx, ) .await?, @@ -198,6 +200,7 @@ impl SplitImageLayerWriter { &mut self, key: Key, img: Bytes, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result<()> { // The current estimation is an upper bound of the space that the key/image could take @@ -213,6 +216,7 @@ impl SplitImageLayerWriter { self.tenant_shard_id, &(key..Key::MAX), self.lsn, + &gate, ctx, ) .await?; @@ -301,6 +305,7 @@ impl SplitDeltaLayerWriter { key: Key, lsn: Lsn, val: Value, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result<()> { // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate @@ -318,6 +323,7 @@ impl SplitDeltaLayerWriter { self.tenant_shard_id, key, self.lsn_range.clone(), + &gate, ctx, ) .await?, @@ -336,6 +342,7 @@ impl SplitDeltaLayerWriter { self.tenant_shard_id, key, self.lsn_range.clone(), + &gate, ctx, ) .await?; @@ -448,6 +455,7 @@ mod tests { get_key(0), Lsn(0x18), 4 * 1024 * 1024, + &tline.gate, &ctx, ) .await @@ -464,7 +472,7 @@ mod tests { .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &ctx) + .put_image(get_key(0), get_img(0), &tline.gate, &ctx) .await .unwrap(); let layers = image_writer @@ -474,7 +482,13 @@ mod tests { assert_eq!(layers.len(), 1); delta_writer - .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx) + .put_value( + get_key(0), + Lsn(0x18), + Value::Image(get_img(0)), + &tline.gate, + &ctx, + ) .await .unwrap(); let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); @@ -525,6 +539,7 @@ mod tests { get_key(0), Lsn(0x18), 4 * 1024 * 1024, + &tline.gate, &ctx, ) .await @@ -542,11 +557,17 @@ mod tests { for i in 0..N { let i = i as u32; image_writer - .put_image(get_key(i), get_large_img(), &ctx) + .put_image(get_key(i), get_large_img(), &tline.gate, &ctx) .await .unwrap(); delta_writer - .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx) + .put_value( + get_key(i), + Lsn(0x20), + Value::Image(get_large_img()), + &tline.gate, + &ctx, + ) .await .unwrap(); } @@ -622,6 +643,7 @@ mod tests { get_key(0), Lsn(0x18), 4 * 1024, + &tline.gate, &ctx, ) .await @@ -638,11 +660,11 @@ mod tests { .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &ctx) + .put_image(get_key(0), get_img(0), &tline.gate, &ctx) .await .unwrap(); image_writer - .put_image(get_key(1), get_large_img(), &ctx) + .put_image(get_key(1), get_large_img(), &tline.gate, &ctx) .await .unwrap(); let layers = image_writer @@ -652,11 +674,23 @@ mod tests { assert_eq!(layers.len(), 2); delta_writer - .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx) + .put_value( + get_key(0), + Lsn(0x18), + Value::Image(get_img(0)), + &tline.gate, + &ctx, + ) .await .unwrap(); delta_writer - .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx) + .put_value( + get_key(1), + Lsn(0x1A), + Value::Image(get_large_img()), + &tline.gate, + &ctx, + ) .await .unwrap(); let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); @@ -720,6 +754,7 @@ mod tests { get_key(0), Lsn(i as u64 * 16 + 0x10), Value::Image(get_large_img()), + &tline.gate, &ctx, ) .await diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 86a9950f4a..9eb5124f67 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -408,6 +408,7 @@ impl DeltaLayerWriterInner { tenant_shard_id: TenantShardId, key_start: Key, lsn_range: Range, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result { // Create the file initially with a temporary filename. We don't know @@ -421,9 +422,6 @@ impl DeltaLayerWriterInner { let file = Arc::new(VirtualFile::create(&path, ctx).await?); - // FIXME(yuchen): propagate &gate from parent - let gate = utils::sync::gate::Gate::default(); - // Start at PAGE_SZ, make room for the header block let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, &gate, ctx)?; @@ -639,6 +637,7 @@ impl DeltaLayerWriter { tenant_shard_id: TenantShardId, key_start: Key, lsn_range: Range, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { @@ -649,6 +648,7 @@ impl DeltaLayerWriter { tenant_shard_id, key_start, lsn_range, + gate, ctx, ) .await?, @@ -728,7 +728,7 @@ impl Drop for DeltaLayerWriter { // We want to remove the virtual file here, so it's fine to not // having completely flushed unwritten data. let vfile = inner.blob_writer.into_inner_no_flush(); - vfile.remove(); + std::fs::remove_file(vfile.path()).expect("failed to remove the virtual file"); } } } @@ -1905,6 +1905,7 @@ pub(crate) mod test { harness.tenant_shard_id, entries_meta.key_range.start, entries_meta.lsn_range.clone(), + &timeline.gate, &ctx, ) .await?; @@ -2100,6 +2101,7 @@ pub(crate) mod test { tenant.tenant_shard_id, Key::MIN, Lsn(0x11)..truncate_at, + &branch.gate, ctx, ) .await @@ -2234,6 +2236,7 @@ pub(crate) mod test { tenant.tenant_shard_id, *key_start, (*lsn_min)..lsn_end, + &tline.gate, ctx, ) .await?; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index e6e10050f9..8ca2a5c84c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -742,6 +742,7 @@ impl ImageLayerWriterInner { tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result { // Create the file initially with a temporary filename. @@ -769,9 +770,6 @@ impl ImageLayerWriterInner { ) }; - // FIXME(yuchen): propagate &gate from parent - let gate = utils::sync::gate::Gate::default(); - // Start at `PAGE_SZ` to make room for the header block. let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, &gate, ctx)?; @@ -987,12 +985,21 @@ impl ImageLayerWriter { tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { inner: Some( - ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx) - .await?, + ImageLayerWriterInner::new( + conf, + timeline_id, + tenant_shard_id, + key_range, + lsn, + &gate, + ctx, + ) + .await?, ), }) } @@ -1044,7 +1051,8 @@ impl ImageLayerWriter { impl Drop for ImageLayerWriter { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - inner.blob_writer.into_inner_no_flush().remove(); + let vfile = inner.blob_writer.into_inner_no_flush(); + std::fs::remove_file(vfile.path()).expect("failed to remove the virtual file"); } } } @@ -1204,6 +1212,7 @@ mod test { harness.tenant_shard_id, &range, lsn, + &timeline.gate, &ctx, ) .await @@ -1269,6 +1278,7 @@ mod test { harness.tenant_shard_id, &range, lsn, + &timeline.gate, &ctx, ) .await @@ -1347,6 +1357,7 @@ mod test { tenant.tenant_shard_id, &key_range, lsn, + &tline.gate, ctx, ) .await?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 71e53da20f..d54d51ceda 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -717,6 +717,7 @@ impl InMemoryLayer { ctx: &RequestContext, key_range: Option>, l0_flush_global_state: &l0_flush::Inner, + gate: &utils::sync::gate::Gate, ) -> Result> { // Grab the lock in read-mode. We hold it over the I/O, but because this // layer is not writeable anymore, no one should be trying to acquire the @@ -757,6 +758,7 @@ impl InMemoryLayer { self.tenant_shard_id, Key::MIN, self.start_lsn..end_lsn, + &gate, ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fc741826ab..989e5ad7b5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3917,7 +3917,12 @@ impl Timeline { let ctx = ctx.attached_child(); let work = async move { let Some((desc, path)) = frozen_layer - .write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner()) + .write_to_disk( + &ctx, + key_range, + self_clone.l0_flush_global_state.inner(), + &self_clone.gate, + ) .await? else { return Ok(None); @@ -4391,6 +4396,7 @@ impl Timeline { self.tenant_shard_id, &img_range, lsn, + &self.gate, ctx, ) .await?; @@ -5601,6 +5607,7 @@ impl Timeline { self.tenant_shard_id, &(min_key..end_key), lsn, + &self.gate, ctx, ) .await?; @@ -5654,6 +5661,7 @@ impl Timeline { self.tenant_shard_id, deltas.key_range.start, deltas.lsn_range, + &self.gate, ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index ecd68ba55e..0b88333ade 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -142,6 +142,7 @@ impl KeyHistoryRetention { delta_writer: &mut SplitDeltaLayerWriter, mut image_writer: Option<&mut SplitImageLayerWriter>, stat: &mut CompactionStatistics, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result<()> { let mut first_batch = true; @@ -153,30 +154,30 @@ impl KeyHistoryRetention { }; stat.produce_image_key(img); if let Some(image_writer) = image_writer.as_mut() { - image_writer.put_image(key, img.clone(), ctx).await?; + image_writer.put_image(key, img.clone(), &gate, ctx).await?; } else { delta_writer - .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx) + .put_value(key, cutoff_lsn, Value::Image(img.clone()), &gate, ctx) .await?; } } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.put_value(key, lsn, val, ctx).await?; + delta_writer.put_value(key, lsn, val, &gate, ctx).await?; } } first_batch = false; } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.put_value(key, lsn, val, ctx).await?; + delta_writer.put_value(key, lsn, val, &gate, ctx).await?; } } } let KeyLogAtLsn(above_horizon_logs) = self.above_horizon; for (lsn, val) in above_horizon_logs { stat.produce_key(&val); - delta_writer.put_value(key, lsn, val, ctx).await?; + delta_writer.put_value(key, lsn, val, &gate, ctx).await?; } Ok(()) } @@ -557,6 +558,7 @@ impl Timeline { self.tenant_shard_id, &layer.layer_desc().key_range, layer.layer_desc().image_layer_lsn(), + &self.gate, ctx, ) .await @@ -1158,6 +1160,7 @@ impl Timeline { debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); lsn_range.clone() }, + &self.gate, ctx, ) .await @@ -1989,6 +1992,7 @@ impl Timeline { job_desc.compaction_key_range.start, lowest_retain_lsn, self.get_compaction_target_size(), + &self.gate, ctx, ) .await?, @@ -2064,6 +2068,7 @@ impl Timeline { self.tenant_shard_id, desc.key_range.start, desc.lsn_range.clone(), + &self.gate, ctx, ) .await?, @@ -2079,6 +2084,7 @@ impl Timeline { self.tenant_shard_id, job_desc.compaction_key_range.end, desc.lsn_range.clone(), + &self.gate, ctx, ) .await?, @@ -2119,6 +2125,7 @@ impl Timeline { &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, + &self.gate, ctx, ) .await?; @@ -2148,6 +2155,7 @@ impl Timeline { &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, + &self.gate, ctx, ) .await?; @@ -2467,6 +2475,7 @@ impl CompactionJobExecutor for TimelineAdaptor { self.timeline.tenant_shard_id, key_range.start, lsn_range.clone(), + &self.timeline.gate, ctx, ) .await?; @@ -2542,6 +2551,7 @@ impl TimelineAdaptor { self.timeline.tenant_shard_id, key_range, lsn, + &self.timeline.gate, ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index f8bc4352e2..4e9cc837d0 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -583,6 +583,7 @@ async fn copy_lsn_prefix( target_timeline.tenant_shard_id, layer.layer_desc().key_range.start, layer.layer_desc().lsn_range.start..end_lsn, + &target_timeline.gate, ctx, ) .await diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index 4388072606..6e224acf3e 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -752,6 +752,7 @@ impl ChunkProcessingJob { self.timeline.tenant_shard_id, &self.range, self.pgdata_lsn, + &self.timeline.gate, ctx, ) .await?; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 2805fb61f5..b6ccce8453 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -1331,13 +1331,6 @@ impl OwnedAsyncWriter for VirtualFile { offset: u64, ctx: &RequestContext, ) -> std::io::Result> { - println!( - "offset={offset}, buf={:?}, buflen={}", - buf.as_ptr(), - buf.len() - ); - assert_eq!(offset % 512, 0); - assert_eq!(buf.as_ptr().align_offset(512), 0); let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await; let x = res.map(|_| buf).unwrap(); Ok(x) diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs index 6c26dec0db..6a3485d947 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs @@ -61,6 +61,10 @@ impl RawAlignedBuffer> { align, } } + + pub const fn const_align() -> usize { + A + } } impl RawAlignedBuffer { diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs index 4ea6b17744..1aa2c3027d 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs @@ -1,9 +1,11 @@ use tokio_epoll_uring::{IoBuf, IoBufMut}; -use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf}; +use crate::virtual_file::{self, IoBuffer, IoBufferMut, PageWriteGuardBuf}; /// A marker trait for a mutable aligned buffer type. -pub trait IoBufAlignedMut: IoBufMut {} +pub trait IoBufAlignedMut: IoBufMut { + const ALIGN: usize = virtual_file::get_io_buffer_alignment(); +} /// A marker trait for an aligned buffer type. pub trait IoBufAligned: IoBuf {} diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 6ab0e14166..df9a8e1327 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -11,7 +11,7 @@ use crate::{ }; use super::{ - io_buf_aligned::IoBufAligned, + io_buf_aligned::{IoBufAligned, IoBufAlignedMut}, io_buf_ext::{FullSlice, IoBufExt}, }; @@ -61,7 +61,7 @@ pub struct BufferedWriter { impl BufferedWriter where - B: Buffer + Send + 'static, + B: IoBufAlignedMut + Buffer + Send + 'static, Buf: IoBufAligned + Send + Sync + CheapCloneForRead, W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, { @@ -111,8 +111,11 @@ where #[cfg_attr(target_os = "macos", allow(dead_code))] pub async fn shutdown(mut self, ctx: &RequestContext) -> std::io::Result<(u64, W)> { let buf = self.mutable_mut(); - if buf.pending() < buf.cap() { - let count = buf.pending().next_multiple_of(512) - buf.pending(); + let len = buf.pending(); + let cap = buf.cap(); + if len < cap { + // pad zeros to the next io alignment requirement. + let count = len.next_multiple_of(B::ALIGN).min(cap) - len; buf.extend_with(0, count); } if let Some(control) = self.flush(ctx).await? { @@ -132,7 +135,7 @@ where } #[cfg_attr(target_os = "macos", allow(dead_code))] - pub fn shutdown_no_flush(self) -> W { + pub fn shutdown_no_flush(self) -> Arc { let Self { mutable: _, writer, @@ -140,7 +143,6 @@ where submit_offset: _, } = self; flush_handle.shutdown_no_flush(); - let writer = Arc::into_inner(writer).expect("writer is the only strong reference"); writer } @@ -354,7 +356,7 @@ mod tests { assert_eq!( recorder.get_writes(), { - let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"]; + let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o\0"]; expect } .iter()