mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 18:40:38 +00:00
review: shutdown in background task and let caller decide padding behavior
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
@@ -312,18 +312,15 @@ impl BlobWriter {
|
||||
///
|
||||
/// This function flushes the internal buffer before giving access
|
||||
/// to the underlying `VirtualFile`.
|
||||
pub async fn into_inner(self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
|
||||
let (_, file) = self.writer.shutdown(ctx).await?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
/// Access the underlying `VirtualFile`.
|
||||
///
|
||||
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
|
||||
/// the internal buffer before giving access.
|
||||
pub fn into_inner_no_flush(self) -> Arc<VirtualFile> {
|
||||
self.writer.shutdown_no_flush()
|
||||
/// The caller can use the `handle_tail` function to change the tail of the buffer before flushing it to disk.
|
||||
/// The buffer will not be flushed to disk if handle_tail returns `None`.
|
||||
pub async fn into_inner(
|
||||
self,
|
||||
handle_tail: impl FnMut(IoBufferMut) -> Option<IoBufferMut>,
|
||||
) -> Result<VirtualFile, Error> {
|
||||
let (_, file) = self.writer.shutdown(handle_tail).await?;
|
||||
Ok(file)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -374,7 +371,7 @@ pub(crate) mod tests {
|
||||
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
|
||||
let offs = res?;
|
||||
println!("Writing final blob at offs={offs}");
|
||||
wtr.into_inner(ctx).await?;
|
||||
wtr.into_inner(|_| None).await?;
|
||||
}
|
||||
Ok((temp_dir, pathbuf, offsets))
|
||||
}
|
||||
|
||||
@@ -246,7 +246,7 @@ async fn download_object<'a>(
|
||||
};
|
||||
buffered.write_buffered_borrowed(&chunk, ctx).await?;
|
||||
}
|
||||
let inner = buffered.shutdown(ctx).await?;
|
||||
let inner = buffered.shutdown(|_| None).await?;
|
||||
Ok(inner)
|
||||
}
|
||||
.await?;
|
||||
|
||||
@@ -408,6 +408,8 @@ struct DeltaLayerWriterInner {
|
||||
|
||||
// Number of key-lsns in the layer.
|
||||
num_keys: usize,
|
||||
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
impl DeltaLayerWriterInner {
|
||||
@@ -450,6 +452,7 @@ impl DeltaLayerWriterInner {
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
num_keys: 0,
|
||||
_gate_guard: gate.enter()?,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -545,7 +548,19 @@ impl DeltaLayerWriterInner {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
let file = self.blob_writer.into_inner(ctx).await?;
|
||||
let file = self
|
||||
.blob_writer
|
||||
.into_inner(|mut buf| {
|
||||
let len = buf.pending();
|
||||
let cap = buf.cap();
|
||||
|
||||
// pad zeros to the next io alignment requirement.
|
||||
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
|
||||
buf.extend_with(0, count);
|
||||
|
||||
Some(buf)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
@@ -737,12 +752,37 @@ impl DeltaLayerWriter {
|
||||
|
||||
impl Drop for DeltaLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
// We want to remove the virtual file here, so it's fine to not
|
||||
// having completely flushed unwritten data.
|
||||
let vfile = inner.blob_writer.into_inner_no_flush();
|
||||
std::fs::remove_file(vfile.path()).expect("failed to remove the virtual file");
|
||||
}
|
||||
let Some(inner) = self.inner.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let DeltaLayerWriterInner {
|
||||
blob_writer,
|
||||
_gate_guard,
|
||||
..
|
||||
} = inner;
|
||||
|
||||
let vfile = match blob_writer
|
||||
.into_inner(|_| None)
|
||||
.await
|
||||
.maybe_fatal_err("failed to access inner virtual file")
|
||||
{
|
||||
Ok(vfile) => vfile,
|
||||
Err(e) => {
|
||||
error!(err=%e, "failed to remove image layer writer file");
|
||||
drop(_gate_guard);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = std::fs::remove_file(vfile.path())
|
||||
.maybe_fatal_err("failed to remove the virtual file")
|
||||
{
|
||||
error!(err=%e, path=%vfile.path(), "failed to remove image layer writer file");
|
||||
}
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1625,7 +1665,7 @@ pub(crate) mod test {
|
||||
|
||||
use itertools::MinMaxResult;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::RngCore;
|
||||
use rand::{Rng, RngCore};
|
||||
|
||||
use super::*;
|
||||
use crate::tenant::harness::TIMELINE_ID;
|
||||
|
||||
@@ -55,7 +55,6 @@ use pageserver_api::key::{Key, KEY_SIZE};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::fs::File;
|
||||
@@ -741,6 +740,8 @@ struct ImageLayerWriterInner {
|
||||
blob_writer: BlobWriter,
|
||||
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
|
||||
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
last_written_key: Key,
|
||||
}
|
||||
@@ -805,6 +806,8 @@ impl ImageLayerWriterInner {
|
||||
num_keys: 0,
|
||||
#[cfg(feature = "testing")]
|
||||
last_written_key: Key::MIN,
|
||||
|
||||
_gate_guard: gate.enter()?,
|
||||
};
|
||||
|
||||
Ok(writer)
|
||||
@@ -890,7 +893,19 @@ impl ImageLayerWriterInner {
|
||||
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
|
||||
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
|
||||
|
||||
let file = self.blob_writer.into_inner(ctx).await?;
|
||||
let file = self
|
||||
.blob_writer
|
||||
.into_inner(|mut buf| {
|
||||
let len = buf.pending();
|
||||
let cap = buf.cap();
|
||||
|
||||
// pad zeros to the next io alignment requirement.
|
||||
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
|
||||
buf.extend_with(0, count);
|
||||
|
||||
Some(buf)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
@@ -1064,10 +1079,37 @@ impl ImageLayerWriter {
|
||||
|
||||
impl Drop for ImageLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
let vfile = inner.blob_writer.into_inner_no_flush();
|
||||
std::fs::remove_file(vfile.path()).expect("failed to remove the virtual file");
|
||||
}
|
||||
let Some(inner) = self.inner.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let ImageLayerWriterInner {
|
||||
blob_writer,
|
||||
_gate_guard,
|
||||
..
|
||||
} = inner;
|
||||
|
||||
let vfile = match blob_writer
|
||||
.into_inner(|_| None)
|
||||
.await
|
||||
.maybe_fatal_err("failed to access inner virtual file")
|
||||
{
|
||||
Ok(vfile) => vfile,
|
||||
Err(e) => {
|
||||
error!(err=%e, "failed to remove image layer writer file");
|
||||
drop(_gate_guard);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = std::fs::remove_file(vfile.path())
|
||||
.maybe_fatal_err("failed to remove the virtual file")
|
||||
{
|
||||
error!(err=%e, path=%vfile.path(), "failed to remove image layer writer file");
|
||||
}
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -189,10 +189,6 @@ impl VirtualFile {
|
||||
self.inner.metadata().await
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
self.inner.remove();
|
||||
}
|
||||
|
||||
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
self.inner.seek(pos).await
|
||||
}
|
||||
@@ -758,12 +754,6 @@ impl VirtualFileInner {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
let path = self.path.clone();
|
||||
drop(self);
|
||||
std::fs::remove_file(path).expect("failed to remove the virtual file");
|
||||
}
|
||||
|
||||
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
match pos {
|
||||
SeekFrom::Start(offset) => {
|
||||
|
||||
@@ -109,41 +109,28 @@ where
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn shutdown(mut self, ctx: &RequestContext) -> std::io::Result<(u64, W)> {
|
||||
let buf = self.tail_mut();
|
||||
let len = buf.pending();
|
||||
let cap = buf.cap();
|
||||
if len < cap {
|
||||
// pad zeros to the next io alignment requirement.
|
||||
let count = len.next_multiple_of(B::ALIGN).min(cap) - len;
|
||||
buf.extend_with(0, count);
|
||||
}
|
||||
if let Some(control) = self.flush(ctx).await? {
|
||||
control.release().await;
|
||||
}
|
||||
|
||||
pub async fn shutdown(
|
||||
self,
|
||||
mut handle_tail: impl FnMut(B) -> Option<B>,
|
||||
) -> std::io::Result<(u64, W)> {
|
||||
let Self {
|
||||
tail: buf,
|
||||
tail,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
submit_offset: bytes_amount,
|
||||
submit_offset,
|
||||
} = self;
|
||||
flush_handle.shutdown().await?;
|
||||
assert!(buf.is_some());
|
||||
let writer = Arc::into_inner(writer).expect("writer is the only strong reference");
|
||||
Ok((bytes_amount, writer))
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub fn shutdown_no_flush(self) -> Arc<W> {
|
||||
let Self {
|
||||
tail: _,
|
||||
writer,
|
||||
flush_handle,
|
||||
submit_offset: _,
|
||||
} = self;
|
||||
flush_handle.shutdown_no_flush();
|
||||
writer
|
||||
let ctx = flush_handle.shutdown().await?;
|
||||
let buf = tail.expect("must not use after an error");
|
||||
let writer = Arc::into_inner(writer).expect("writer is the only strong reference");
|
||||
let mut bytes_amount = submit_offset;
|
||||
if let Some(buf) = handle_tail(buf) {
|
||||
bytes_amount += buf.pending() as u64;
|
||||
let _ = writer
|
||||
.write_all_at(buf.flush(), submit_offset, &ctx)
|
||||
.await?;
|
||||
}
|
||||
Ok((bytes_amount, writer))
|
||||
}
|
||||
|
||||
/// Gets a immutable reference to the tail in-memory buffer.
|
||||
@@ -353,11 +340,11 @@ mod tests {
|
||||
writer.write_buffered_borrowed(b"j", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"klmno", ctx).await?;
|
||||
|
||||
let (_, recorder) = writer.shutdown(ctx).await?;
|
||||
let (_, recorder) = writer.shutdown(|buf| Some(buf)).await?;
|
||||
assert_eq!(
|
||||
recorder.get_writes(),
|
||||
{
|
||||
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o\0"];
|
||||
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
|
||||
expect
|
||||
}
|
||||
.iter()
|
||||
|
||||
@@ -22,7 +22,7 @@ pub struct FlushHandleInner<Buf, W> {
|
||||
/// and receives recyled buffer.
|
||||
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
|
||||
/// Join handle for the background flush task.
|
||||
join_handle: tokio::task::JoinHandle<std::io::Result<()>>,
|
||||
join_handle: tokio::task::JoinHandle<std::io::Result<RequestContext>>,
|
||||
|
||||
_phantom: PhantomData<W>,
|
||||
}
|
||||
@@ -187,7 +187,7 @@ where
|
||||
}
|
||||
|
||||
/// Cleans up the channel, join the flush task.
|
||||
pub async fn shutdown(&mut self) -> std::io::Result<()> {
|
||||
pub async fn shutdown(&mut self) -> std::io::Result<RequestContext> {
|
||||
let handle = self
|
||||
.inner
|
||||
.take()
|
||||
@@ -196,14 +196,6 @@ where
|
||||
handle.join_handle.await.unwrap()
|
||||
}
|
||||
|
||||
pub fn shutdown_no_flush(mut self) {
|
||||
let handle = self
|
||||
.inner
|
||||
.take()
|
||||
.expect("must not use after we returned an error");
|
||||
handle.join_handle.abort();
|
||||
}
|
||||
|
||||
/// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
|
||||
/// This only happens if the handle is used after an error.
|
||||
fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
|
||||
@@ -247,7 +239,7 @@ where
|
||||
|
||||
/// Runs the background flush task.
|
||||
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
|
||||
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<()> {
|
||||
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<RequestContext> {
|
||||
// Sends the extra buffer back to the handle.
|
||||
self.channel.send(slice).await.map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
|
||||
@@ -283,8 +275,7 @@ where
|
||||
continue;
|
||||
}
|
||||
}
|
||||
drop(self);
|
||||
Ok(())
|
||||
Ok(self.ctx)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user