extract summary serialization logic into Summary::ser_into_page

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-12-10 22:29:47 +00:00
parent c6795532a0
commit e05b5d6ae4
2 changed files with 32 additions and 22 deletions

View File

@@ -44,8 +44,8 @@ use crate::tenant::vectored_blob_io::{
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{IoBuffer, IoBufferMut};
use crate::TEMP_FILE_SUFFIX;
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
@@ -70,6 +70,7 @@ use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
use tracing::*;
use utils::bin_ser::SerializeError;
use utils::{
bin_ser::BeSer,
@@ -114,6 +115,15 @@ impl From<&DeltaLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(&self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -556,11 +566,9 @@ impl DeltaLayerWriterInner {
index_root_blk,
};
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
let metadata = file
@@ -773,11 +781,8 @@ impl DeltaLayer {
let new_summary = rewrite(actual_summary);
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}

View File

@@ -41,8 +41,8 @@ use crate::tenant::vectored_blob_io::{
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{IoBuffer, IoBufferMut};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
@@ -66,6 +66,7 @@ use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
use utils::bin_ser::SerializeError;
use utils::{
bin_ser::BeSer,
@@ -112,6 +113,15 @@ impl From<&ImageLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(&self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -366,11 +376,8 @@ impl ImageLayer {
let new_summary = rewrite(actual_summary);
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}
@@ -907,11 +914,9 @@ impl ImageLayerWriterInner {
index_root_blk,
};
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
let metadata = file