use timeline gate to guard flush task

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-12-09 16:34:53 +00:00
parent d079bf1d48
commit 011a578f1b
14 changed files with 114 additions and 42 deletions

View File

@@ -116,7 +116,7 @@ async fn ingest(
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate)
.await?
.unwrap();
tokio::fs::remove_file(path).await?;

View File

@@ -426,7 +426,7 @@ impl BlobWriter {
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
pub fn into_inner_no_flush(self) -> Arc<VirtualFile> {
self.writer.shutdown_no_flush()
}
}

View File

@@ -172,6 +172,7 @@ impl SplitImageLayerWriter {
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -182,6 +183,7 @@ impl SplitImageLayerWriter {
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
&gate,
ctx,
)
.await?,
@@ -198,6 +200,7 @@ impl SplitImageLayerWriter {
&mut self,
key: Key,
img: Bytes,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
@@ -213,6 +216,7 @@ impl SplitImageLayerWriter {
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
&gate,
ctx,
)
.await?;
@@ -301,6 +305,7 @@ impl SplitDeltaLayerWriter {
key: Key,
lsn: Lsn,
val: Value,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
@@ -318,6 +323,7 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
&gate,
ctx,
)
.await?,
@@ -336,6 +342,7 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
&gate,
ctx,
)
.await?;
@@ -448,6 +455,7 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
&ctx,
)
.await
@@ -464,7 +472,7 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &ctx)
.put_image(get_key(0), get_img(0), &tline.gate, &ctx)
.await
.unwrap();
let layers = image_writer
@@ -474,7 +482,13 @@ mod tests {
assert_eq!(layers.len(), 1);
delta_writer
.put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline.gate,
&ctx,
)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
@@ -525,6 +539,7 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
&ctx,
)
.await
@@ -542,11 +557,17 @@ mod tests {
for i in 0..N {
let i = i as u32;
image_writer
.put_image(get_key(i), get_large_img(), &ctx)
.put_image(get_key(i), get_large_img(), &tline.gate, &ctx)
.await
.unwrap();
delta_writer
.put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
.put_value(
get_key(i),
Lsn(0x20),
Value::Image(get_large_img()),
&tline.gate,
&ctx,
)
.await
.unwrap();
}
@@ -622,6 +643,7 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024,
&tline.gate,
&ctx,
)
.await
@@ -638,11 +660,11 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &ctx)
.put_image(get_key(0), get_img(0), &tline.gate, &ctx)
.await
.unwrap();
image_writer
.put_image(get_key(1), get_large_img(), &ctx)
.put_image(get_key(1), get_large_img(), &tline.gate, &ctx)
.await
.unwrap();
let layers = image_writer
@@ -652,11 +674,23 @@ mod tests {
assert_eq!(layers.len(), 2);
delta_writer
.put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline.gate,
&ctx,
)
.await
.unwrap();
delta_writer
.put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
.put_value(
get_key(1),
Lsn(0x1A),
Value::Image(get_large_img()),
&tline.gate,
&ctx,
)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
@@ -720,6 +754,7 @@ mod tests {
get_key(0),
Lsn(i as u64 * 16 + 0x10),
Value::Image(get_large_img()),
&tline.gate,
&ctx,
)
.await

View File

@@ -408,6 +408,7 @@ impl DeltaLayerWriterInner {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
@@ -421,9 +422,6 @@ impl DeltaLayerWriterInner {
let file = Arc::new(VirtualFile::create(&path, ctx).await?);
// FIXME(yuchen): propagate &gate from parent
let gate = utils::sync::gate::Gate::default();
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, &gate, ctx)?;
@@ -639,6 +637,7 @@ impl DeltaLayerWriter {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -649,6 +648,7 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
gate,
ctx,
)
.await?,
@@ -728,7 +728,7 @@ impl Drop for DeltaLayerWriter {
// We want to remove the virtual file here, so it's fine that
// unwritten data has not been completely flushed.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
std::fs::remove_file(vfile.path()).expect("failed to remove the virtual file");
}
}
}
@@ -1905,6 +1905,7 @@ pub(crate) mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&timeline.gate,
&ctx,
)
.await?;
@@ -2100,6 +2101,7 @@ pub(crate) mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
&branch.gate,
ctx,
)
.await
@@ -2234,6 +2236,7 @@ pub(crate) mod test {
tenant.tenant_shard_id,
*key_start,
(*lsn_min)..lsn_end,
&tline.gate,
ctx,
)
.await?;

View File

@@ -742,6 +742,7 @@ impl ImageLayerWriterInner {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
@@ -769,9 +770,6 @@ impl ImageLayerWriterInner {
)
};
// FIXME(yuchen): propagate &gate from parent
let gate = utils::sync::gate::Gate::default();
// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, &gate, ctx)?;
@@ -987,12 +985,21 @@ impl ImageLayerWriter {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
.await?,
ImageLayerWriterInner::new(
conf,
timeline_id,
tenant_shard_id,
key_range,
lsn,
&gate,
ctx,
)
.await?,
),
})
}
@@ -1044,7 +1051,8 @@ impl ImageLayerWriter {
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner_no_flush().remove();
let vfile = inner.blob_writer.into_inner_no_flush();
std::fs::remove_file(vfile.path()).expect("failed to remove the virtual file");
}
}
}
@@ -1204,6 +1212,7 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
&ctx,
)
.await
@@ -1269,6 +1278,7 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
&ctx,
)
.await
@@ -1347,6 +1357,7 @@ mod test {
tenant.tenant_shard_id,
&key_range,
lsn,
&tline.gate,
ctx,
)
.await?;

View File

@@ -717,6 +717,7 @@ impl InMemoryLayer {
ctx: &RequestContext,
key_range: Option<Range<Key>>,
l0_flush_global_state: &l0_flush::Inner,
gate: &utils::sync::gate::Gate,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
@@ -757,6 +758,7 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
&gate,
ctx,
)
.await?;

View File

@@ -3917,7 +3917,12 @@ impl Timeline {
let ctx = ctx.attached_child();
let work = async move {
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.write_to_disk(
&ctx,
key_range,
self_clone.l0_flush_global_state.inner(),
&self_clone.gate,
)
.await?
else {
return Ok(None);
@@ -4391,6 +4396,7 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
&self.gate,
ctx,
)
.await?;
@@ -5601,6 +5607,7 @@ impl Timeline {
self.tenant_shard_id,
&(min_key..end_key),
lsn,
&self.gate,
ctx,
)
.await?;
@@ -5654,6 +5661,7 @@ impl Timeline {
self.tenant_shard_id,
deltas.key_range.start,
deltas.lsn_range,
&self.gate,
ctx,
)
.await?;

View File

@@ -142,6 +142,7 @@ impl KeyHistoryRetention {
delta_writer: &mut SplitDeltaLayerWriter,
mut image_writer: Option<&mut SplitImageLayerWriter>,
stat: &mut CompactionStatistics,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
@@ -153,30 +154,30 @@ impl KeyHistoryRetention {
};
stat.produce_image_key(img);
if let Some(image_writer) = image_writer.as_mut() {
image_writer.put_image(key, img.clone(), ctx).await?;
image_writer.put_image(key, img.clone(), &gate, ctx).await?;
} else {
delta_writer
.put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
.put_value(key, cutoff_lsn, Value::Image(img.clone()), &gate, ctx)
.await?;
}
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.put_value(key, lsn, val, ctx).await?;
delta_writer.put_value(key, lsn, val, &gate, ctx).await?;
}
}
first_batch = false;
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.put_value(key, lsn, val, ctx).await?;
delta_writer.put_value(key, lsn, val, &gate, ctx).await?;
}
}
}
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
for (lsn, val) in above_horizon_logs {
stat.produce_key(&val);
delta_writer.put_value(key, lsn, val, ctx).await?;
delta_writer.put_value(key, lsn, val, &gate, ctx).await?;
}
Ok(())
}
@@ -557,6 +558,7 @@ impl Timeline {
self.tenant_shard_id,
&layer.layer_desc().key_range,
layer.layer_desc().image_layer_lsn(),
&self.gate,
ctx,
)
.await
@@ -1158,6 +1160,7 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
&self.gate,
ctx,
)
.await
@@ -1989,6 +1992,7 @@ impl Timeline {
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
ctx,
)
.await?,
@@ -2064,6 +2068,7 @@ impl Timeline {
self.tenant_shard_id,
desc.key_range.start,
desc.lsn_range.clone(),
&self.gate,
ctx,
)
.await?,
@@ -2079,6 +2084,7 @@ impl Timeline {
self.tenant_shard_id,
job_desc.compaction_key_range.end,
desc.lsn_range.clone(),
&self.gate,
ctx,
)
.await?,
@@ -2119,6 +2125,7 @@ impl Timeline {
&mut delta_layer_writer,
image_layer_writer.as_mut(),
&mut stat,
&self.gate,
ctx,
)
.await?;
@@ -2148,6 +2155,7 @@ impl Timeline {
&mut delta_layer_writer,
image_layer_writer.as_mut(),
&mut stat,
&self.gate,
ctx,
)
.await?;
@@ -2467,6 +2475,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
&self.timeline.gate,
ctx,
)
.await?;
@@ -2542,6 +2551,7 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
&self.timeline.gate,
ctx,
)
.await?;

View File

@@ -583,6 +583,7 @@ async fn copy_lsn_prefix(
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
&target_timeline.gate,
ctx,
)
.await

View File

@@ -752,6 +752,7 @@ impl ChunkProcessingJob {
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&self.timeline.gate,
ctx,
)
.await?;

View File

@@ -1331,13 +1331,6 @@ impl OwnedAsyncWriter for VirtualFile {
offset: u64,
ctx: &RequestContext,
) -> std::io::Result<FullSlice<Buf>> {
println!(
"offset={offset}, buf={:?}, buflen={}",
buf.as_ptr(),
buf.len()
);
assert_eq!(offset % 512, 0);
assert_eq!(buf.as_ptr().align_offset(512), 0);
let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
let x = res.map(|_| buf).unwrap();
Ok(x)

View File

@@ -61,6 +61,10 @@ impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
align,
}
}
/// Returns `A`, the const generic parameter of this buffer's `ConstAlign<A>`
/// alignment type.
///
/// Declared as a `const fn`, so the value can be used in constant contexts.
pub const fn const_align() -> usize {
A
}
}
impl<A: Alignment> RawAlignedBuffer<A> {

View File

@@ -1,9 +1,11 @@
use tokio_epoll_uring::{IoBuf, IoBufMut};
use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, PageWriteGuardBuf};
/// A marker trait for a mutable aligned buffer type.
pub trait IoBufAlignedMut: IoBufMut {}
pub trait IoBufAlignedMut: IoBufMut {
const ALIGN: usize = virtual_file::get_io_buffer_alignment();
}
/// A marker trait for an aligned buffer type.
pub trait IoBufAligned: IoBuf {}

View File

@@ -11,7 +11,7 @@ use crate::{
};
use super::{
io_buf_aligned::IoBufAligned,
io_buf_aligned::{IoBufAligned, IoBufAlignedMut},
io_buf_ext::{FullSlice, IoBufExt},
};
@@ -61,7 +61,7 @@ pub struct BufferedWriter<B: Buffer, W> {
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
@@ -111,8 +111,11 @@ where
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn shutdown(mut self, ctx: &RequestContext) -> std::io::Result<(u64, W)> {
let buf = self.mutable_mut();
if buf.pending() < buf.cap() {
let count = buf.pending().next_multiple_of(512) - buf.pending();
let len = buf.pending();
let cap = buf.cap();
if len < cap {
// pad zeros to the next io alignment requirement.
let count = len.next_multiple_of(B::ALIGN).min(cap) - len;
buf.extend_with(0, count);
}
if let Some(control) = self.flush(ctx).await? {
@@ -132,7 +135,7 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub fn shutdown_no_flush(self) -> W {
pub fn shutdown_no_flush(self) -> Arc<W> {
let Self {
mutable: _,
writer,
@@ -140,7 +143,6 @@ where
submit_offset: _,
} = self;
flush_handle.shutdown_no_flush();
let writer = Arc::into_inner(writer).expect("writer is the only strong reference");
writer
}
@@ -354,7 +356,7 @@ mod tests {
assert_eq!(
recorder.get_writes(),
{
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o\0"];
expect
}
.iter()