mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
ephemeral file: refactor write_blob impl to concentrate mutable state (#5004)
Before this patch, we had the `off` and `blknum` as function-wide mutable state. Now it's contained in the `Writer` struct. The use of `push_bytes` instead of index-based filling of the buffer also makes it easier to reason about what's going on. This is prep for https://github.com/neondatabase/neon/pull/4994
This commit is contained in:
committed by
GitHub
parent
786c7b3708
commit
957af049c2
@@ -102,7 +102,10 @@ impl EphemeralFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_buf_for_write(&self, blkno: u32) -> Result<page_cache::PageWriteGuard, io::Error> {
|
||||
fn get_buf_for_write(
|
||||
&self,
|
||||
blkno: u32,
|
||||
) -> Result<page_cache::PageWriteGuard<'static>, io::Error> {
|
||||
// Look up the right page
|
||||
let cache = page_cache::get();
|
||||
let mut write_guard = match cache
|
||||
@@ -137,50 +140,77 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
|
||||
|
||||
impl BlobWriter for EphemeralFile {
|
||||
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
|
||||
struct Writer<'a> {
|
||||
ephemeral_file: &'a mut EphemeralFile,
|
||||
/// The block to which the next [`push_bytes`] will write.
|
||||
blknum: u32,
|
||||
/// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
|
||||
off: usize,
|
||||
/// Used by [`push_bytes`] to memoize the page cache write guard across calls to it.
|
||||
memo_page_guard: MemoizedPageWriteGuard,
|
||||
}
|
||||
struct MemoizedPageWriteGuard {
|
||||
guard: page_cache::PageWriteGuard<'static>,
|
||||
/// The block number of the page in `guard`.
|
||||
blknum: u32,
|
||||
}
|
||||
impl<'a> Writer<'a> {
|
||||
fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
|
||||
let blknum = (ephemeral_file.size / PAGE_SZ as u64) as u32;
|
||||
Ok(Writer {
|
||||
blknum,
|
||||
off: (ephemeral_file.size % PAGE_SZ as u64) as usize,
|
||||
memo_page_guard: MemoizedPageWriteGuard {
|
||||
guard: ephemeral_file.get_buf_for_write(blknum)?,
|
||||
blknum,
|
||||
},
|
||||
ephemeral_file,
|
||||
})
|
||||
}
|
||||
#[inline(always)]
|
||||
fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
|
||||
// `src_remaining` is the remaining bytes to be written
|
||||
let mut src_remaining = src;
|
||||
while !src_remaining.is_empty() {
|
||||
let page = if self.memo_page_guard.blknum == self.blknum {
|
||||
&mut self.memo_page_guard.guard
|
||||
} else {
|
||||
self.memo_page_guard.guard =
|
||||
self.ephemeral_file.get_buf_for_write(self.blknum)?;
|
||||
self.memo_page_guard.blknum = self.blknum;
|
||||
&mut self.memo_page_guard.guard
|
||||
};
|
||||
let dst_remaining = &mut page[self.off..];
|
||||
let n = min(dst_remaining.len(), src_remaining.len());
|
||||
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
|
||||
self.off += n;
|
||||
src_remaining = &src_remaining[n..];
|
||||
if self.off == PAGE_SZ {
|
||||
// This block is done, move to next one.
|
||||
self.blknum += 1;
|
||||
self.off = 0;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let pos = self.size;
|
||||
|
||||
let mut blknum = (self.size / PAGE_SZ as u64) as u32;
|
||||
let mut off = (pos % PAGE_SZ as u64) as usize;
|
||||
|
||||
let mut buf = self.get_buf_for_write(blknum)?;
|
||||
let mut writer = Writer::new(self)?;
|
||||
|
||||
// Write the length field
|
||||
if srcbuf.len() < 0x80 {
|
||||
buf[off] = srcbuf.len() as u8;
|
||||
off += 1;
|
||||
// short one-byte length header
|
||||
let len_buf = [srcbuf.len() as u8];
|
||||
writer.push_bytes(&len_buf)?;
|
||||
} else {
|
||||
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
|
||||
len_buf[0] |= 0x80;
|
||||
let thislen = PAGE_SZ - off;
|
||||
if thislen < 4 {
|
||||
// it needs to be split across pages
|
||||
buf[off..(off + thislen)].copy_from_slice(&len_buf[..thislen]);
|
||||
blknum += 1;
|
||||
buf = self.get_buf_for_write(blknum)?;
|
||||
buf[0..4 - thislen].copy_from_slice(&len_buf[thislen..]);
|
||||
off = 4 - thislen;
|
||||
} else {
|
||||
buf[off..off + 4].copy_from_slice(&len_buf);
|
||||
off += 4;
|
||||
}
|
||||
writer.push_bytes(&len_buf)?;
|
||||
}
|
||||
|
||||
// Write the payload
|
||||
let mut buf_remain = srcbuf;
|
||||
while !buf_remain.is_empty() {
|
||||
let mut page_remain = PAGE_SZ - off;
|
||||
if page_remain == 0 {
|
||||
blknum += 1;
|
||||
buf = self.get_buf_for_write(blknum)?;
|
||||
off = 0;
|
||||
page_remain = PAGE_SZ;
|
||||
}
|
||||
let this_blk_len = min(page_remain, buf_remain.len());
|
||||
buf[off..(off + this_blk_len)].copy_from_slice(&buf_remain[..this_blk_len]);
|
||||
off += this_blk_len;
|
||||
buf_remain = &buf_remain[this_blk_len..];
|
||||
}
|
||||
drop(buf);
|
||||
writer.push_bytes(srcbuf)?;
|
||||
|
||||
if srcbuf.len() < 0x80 {
|
||||
self.size += 1;
|
||||
|
||||
Reference in New Issue
Block a user