Change chunked_buffer API

This commit is contained in:
Patrick Insinger
2021-11-03 23:07:12 -07:00
parent 556422f9bb
commit 445288d7c1
5 changed files with 155 additions and 183 deletions

View File

@@ -1,4 +1,4 @@
use std::{fs::File, io::Read, io::Write};
use std::{fs::File, io::Write};
use anyhow::Result;
use bookfile::{BookWriter, BoundedReader, ChapterId, ChapterWriter};
@@ -28,14 +28,14 @@ impl<W: Write> BlobWriter<W> {
Self { writer, offset: 0 }
}
pub fn write_blob_from_reader(&mut self, r: &mut impl Read) -> Result<BlobRange> {
let len = std::io::copy(r, &mut self.writer)?;
pub fn write_blob(&mut self, blob: &[u8]) -> Result<BlobRange> {
self.writer.write_all(blob)?;
let range = BlobRange {
offset: self.offset,
size: len as usize,
size: blob.len(),
};
self.offset += len as u64;
self.offset += blob.len() as u64;
Ok(range)
}

View File

@@ -402,9 +402,9 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
let page_versions_iter = page_versions.ordered_page_version_iter(cutoff);
for (blknum, lsn, pos) in page_versions_iter {
let blob_range =
page_version_writer.write_blob_from_reader(&mut page_versions.reader(pos)?)?;
for (blknum, lsn, token) in page_versions_iter {
let blob_range = page_version_writer
.write_blob(page_versions.get_page_version_bytes(token).as_slice())?;
inner
.page_version_metas

View File

@@ -184,8 +184,8 @@ impl Layer for InMemoryLayer {
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (entry_lsn, pos) in iter {
match inner.page_versions.get_page_version(*pos)? {
for (entry_lsn, token) in iter {
match inner.page_versions.get_page_version(token)? {
PageVersion::Page(img) => {
reconstruct_data.page_img = Some(img);
need_image = false;
@@ -285,8 +285,8 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}
for (blknum, lsn, pos) in inner.page_versions.ordered_page_version_iter(None) {
let pv_description = match inner.page_versions.get_page_version(pos)? {
for (blknum, lsn, token) in inner.page_versions.ordered_page_version_iter(None) {
let pv_description = match inner.page_versions.get_page_version(token)? {
PageVersion::Page(_img) => "page",
PageVersion::Wal(_rec) => "wal",
};

View File

@@ -2,20 +2,18 @@ use std::{collections::HashMap, ops::RangeBounds, slice};
use anyhow::Result;
use std::io::{Read, Write};
use zenith_utils::chunked_buffer::{ChunkedBuffer, ChunkedBufferReader};
use zenith_utils::chunked_buffer::{ChunkToken, ChunkedBuffer};
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
use zenith_utils::bin_ser::LeSer;
const EMPTY_SLICE: &[(Lsn, u64)] = &[];
const EMPTY_SLICE: &[(Lsn, ChunkToken)] = &[];
#[derive(Debug, Default)]
pub struct PageVersions {
map: HashMap<u32, VecMap<Lsn, u64>>,
map: HashMap<u32, VecMap<Lsn, ChunkToken>>,
/// The PageVersion structs are stored in a serialized format in this buffer.
/// Each serialized PageVersion is preceded by a 'u32' length field.
@@ -29,26 +27,15 @@ impl PageVersions {
blknum: u32,
lsn: Lsn,
page_version: PageVersion,
) -> Option<u64> {
// remember starting position
let pos = self.buffer.len();
// make room for the 'length' field by writing zeros as a placeholder.
self.buffer.write_all(&[0u8; 4]).unwrap();
page_version.ser_into(&mut self.buffer).unwrap();
// write the 'length' field.
let len = self.buffer.len() - pos - 4;
let lenbuf = u32::to_ne_bytes(len as u32);
self.buffer.write_all_at(&lenbuf, pos).unwrap();
) -> Option<ChunkToken> {
let token = self.buffer.write(page_version.ser().unwrap().as_slice());
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, pos as u64).unwrap()
map.append_or_update_last(lsn, token).unwrap()
}
/// Get all [`PageVersion`]s in a block
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, u64)] {
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, ChunkToken)] {
self.map
.get(&blknum)
.map(VecMap::as_slice)
@@ -56,7 +43,11 @@ impl PageVersions {
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(&self, blknum: u32, range: R) -> &[(Lsn, u64)] {
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
blknum: u32,
range: R,
) -> &[(Lsn, ChunkToken)] {
self.map
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
@@ -83,20 +74,13 @@ impl PageVersions {
}
}
/// Returns a 'Read' that reads the page version at given offset.
pub fn reader(&self, pos: u64) -> Result<std::io::Take<ChunkedBufferReader>, std::io::Error> {
// read length
let mut lenbuf = [0u8; 4];
let mut reader = self.buffer.reader(pos);
reader.read_exact(&mut lenbuf)?;
let len = u32::from_ne_bytes(lenbuf);
Ok(reader.take(len as u64))
pub fn get_page_version_bytes(&self, token: &ChunkToken) -> Vec<u8> {
self.buffer.read(token)
}
pub fn get_page_version(&self, pos: u64) -> Result<PageVersion> {
let mut reader = self.reader(pos)?;
Ok(PageVersion::des_from(&mut reader)?)
pub fn get_page_version(&self, token: &ChunkToken) -> Result<PageVersion> {
let buf = self.get_page_version_bytes(token);
Ok(PageVersion::des(buf.as_slice())?) // TODO unwrap
}
}
@@ -108,7 +92,7 @@ pub struct OrderedPageVersionIter<'a> {
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, u64)>,
cur_slice_iter: slice::Iter<'a, (Lsn, ChunkToken)>,
}
impl OrderedPageVersionIter<'_> {
@@ -122,14 +106,14 @@ impl OrderedPageVersionIter<'_> {
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, u64);
type Item = (u32, Lsn, &'a ChunkToken);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, pos)) = self.cur_slice_iter.next() {
if let Some((lsn, token)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, *pos));
return Some((blknum, *lsn, token));
}
}

View File

@@ -1,5 +1,4 @@
use std::cmp::min;
use std::io::{Read, Write};
use std::convert::TryInto;
const CHUNK_SIZE: usize = 1024;
@@ -20,166 +19,155 @@ pub struct ChunkedBuffer {
// heuristic doesn't save us.)
#[allow(clippy::vec_box)]
chunks: Vec<Box<[u8; CHUNK_SIZE]>>,
len: usize,
length: usize,
}
#[derive(Debug, PartialEq, Eq)]
pub struct ChunkToken {
start: usize,
length: usize,
}
impl ChunkedBuffer {
/// Return current length of the buffer, in bytes
pub fn len(&self) -> usize {
self.len
}
pub fn read(&self, token: &ChunkToken) -> Vec<u8> {
let mut buf = Vec::with_capacity(token.length);
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Create a "cursor" for reading from the buffer.
pub fn reader(&self, pos: u64) -> ChunkedBufferReader {
ChunkedBufferReader {
parent: self,
pos: pos as usize,
}
}
/// Write 'buf' to the given byte position in the buffer.
///
/// The buffer is expanded if needed. If 'pos' is past the end of the buffer,
/// the "gap" is filled with zeros.
///
/// This can stop short, depending on where the chunk boundaries are. But
/// always writes at least one byte.
pub fn write_at(&mut self, buf: &[u8], pos: usize) -> Result<usize, std::io::Error> {
let chunk_idx = pos / CHUNK_SIZE;
let chunk_off = pos % CHUNK_SIZE;
while chunk_idx >= self.chunks.len() {
self.chunks.push(Box::new([0; CHUNK_SIZE]));
let chunk_idx = token.start / CHUNK_SIZE;
let mut chunk_iter = self.chunks[chunk_idx..].iter();
let mut bytes_remaining = token.length;
if bytes_remaining > 0 {
let start_offset = token.start % CHUNK_SIZE;
let chunk_bytes = &chunk_iter.next().unwrap()[start_offset..];
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
buf.extend_from_slice(bytes);
bytes_remaining -= bytes.len();
}
let n = min(CHUNK_SIZE - chunk_off, buf.len());
let chunk = &mut self.chunks[chunk_idx];
chunk[chunk_off..(chunk_off + n)].copy_from_slice(&buf[0..n]);
if pos + n > self.len {
self.len = pos + n;
while bytes_remaining > 0 {
let chunk_bytes = chunk_iter.next().unwrap();
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
buf.extend_from_slice(bytes);
bytes_remaining -= bytes.len();
}
Ok(n)
debug_assert_eq!(buf.len(), token.length);
buf
}
/// Like 'write_at', but doesn't stop until the whole buffer has been written.
pub fn write_all_at(&mut self, buf: &[u8], pos: usize) -> Result<(), std::io::Error> {
let mut nwritten = 0;
pub fn write(&mut self, mut buf: &[u8]) -> ChunkToken {
let token = ChunkToken {
start: self.length,
length: buf.len(),
};
while nwritten < buf.len() {
nwritten += self.write_at(&buf[nwritten..], pos + nwritten)?;
while !buf.is_empty() {
let chunk_idx = self.length / CHUNK_SIZE;
let chunk = match self.chunks.get_mut(chunk_idx) {
Some(chunk) => chunk,
None => {
debug_assert_eq!(self.length % CHUNK_SIZE, 0);
debug_assert_eq!(chunk_idx, self.chunks.len());
self.chunks.push(heap_alloc_chunk());
self.chunks.last_mut().unwrap()
}
};
let offset = self.length % CHUNK_SIZE;
let length = buf.len().min(CHUNK_SIZE - offset);
chunk[offset..][..length].copy_from_slice(&buf[..length]);
buf = &buf[length..];
self.length += length;
}
Ok(())
token
}
}
impl Write for ChunkedBuffer {
/// Append to the end of the buffer.
///
/// Note how this interacts with 'write_at'. If you use write_at to expand the
/// buffer, the "write position" for this function changes too.
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
self.write_at(buf, self.len)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
Ok(())
}
fn heap_alloc_chunk() -> Box<[u8; CHUNK_SIZE]> {
vec![0u8; CHUNK_SIZE].into_boxed_slice().try_into().unwrap()
}
pub struct ChunkedBufferReader<'a> {
parent: &'a ChunkedBuffer,
pos: usize,
}
#[cfg(test)]
mod tests {
use super::{ChunkToken, ChunkedBuffer, CHUNK_SIZE};
impl<'a> Read for ChunkedBufferReader<'a> {
fn read(&mut self, dst: &mut [u8]) -> Result<usize, std::io::Error> {
if self.pos >= self.parent.len {
return Ok(0);
fn gen_bytes(len: usize) -> Vec<u8> {
let mut buf = vec![0u8; len];
for idx in 0..len {
buf[idx] = idx as u8;
}
let chunk_idx = self.pos / CHUNK_SIZE;
let chunk_off = self.pos % CHUNK_SIZE;
let len = min(
min(self.parent.len - self.pos, CHUNK_SIZE - chunk_off),
dst.len(),
);
let chunk = &self.parent.chunks[chunk_idx];
dst[0..len].copy_from_slice(&chunk[chunk_off..(chunk_off + len)]);
self.pos += len;
Ok(len)
}
}
#[test]
fn test_chunked_buffer() -> Result<(), std::io::Error> {
let mut chunked_buffer = ChunkedBuffer::default();
let mut testdata = [0u8; 10000];
for (i, b) in testdata.iter_mut().enumerate() {
*b = (i % 256) as u8;
buf
}
chunked_buffer.write_all(&testdata)?;
#[test]
fn one_item() {
fn test(data: &[u8]) {
let mut chunked_buffer = ChunkedBuffer::default();
let token = chunked_buffer.write(data);
assert_eq!(
ChunkToken {
start: 0,
length: data.len(),
},
token
);
// Test reader
let mut resultdata = [0u8; 10000];
let mut reader = chunked_buffer.reader(0);
reader.read_exact(&mut resultdata)?;
assert_eq!(resultdata, testdata);
let buf = chunked_buffer.read(&token);
assert_eq!(data, buf.as_slice());
}
// test zero-length read
let mut resultdata = [0u8; 10000];
let mut reader = chunked_buffer.reader(0);
reader.read_exact(&mut resultdata[0..0])?;
test(b"");
test(b"a");
test(b"abc");
// test reads around chunk boundaries
let mut resultdata = [0u8; 10000];
let mut reader = chunked_buffer.reader(0);
reader.read_exact(&mut resultdata[0..1])?;
reader.read_exact(&mut resultdata[1..CHUNK_SIZE])?;
assert_eq!(resultdata[0..CHUNK_SIZE], testdata[0..CHUNK_SIZE]);
test(&gen_bytes(CHUNK_SIZE - 1));
test(&gen_bytes(CHUNK_SIZE));
test(&gen_bytes(CHUNK_SIZE + 1));
let mut resultdata = [0u8; 10000];
let mut reader = chunked_buffer.reader(10);
reader.read_exact(&mut resultdata[10..CHUNK_SIZE + 10])?;
assert_eq!(
resultdata[10..CHUNK_SIZE + 10],
testdata[10..CHUNK_SIZE + 10]
);
test(&gen_bytes(2 * CHUNK_SIZE - 1));
test(&gen_bytes(2 * CHUNK_SIZE));
test(&gen_bytes(2 * CHUNK_SIZE + 1));
}
let mut resultdata = [0u8; 10000];
let mut reader = chunked_buffer.reader((CHUNK_SIZE - 1) as u64);
reader.read_exact(&mut resultdata[(CHUNK_SIZE - 1)..CHUNK_SIZE])?;
assert_eq!(resultdata[CHUNK_SIZE - 1], testdata[CHUNK_SIZE - 1]);
reader.read_exact(&mut resultdata[(CHUNK_SIZE)..(CHUNK_SIZE + 1)])?;
assert_eq!(resultdata[CHUNK_SIZE], testdata[CHUNK_SIZE]);
#[test]
fn many_items() {
let mut chunked_buffer = ChunkedBuffer::default();
let mut start = 0;
let mut expected = Vec::new();
// Read at the end
let mut resultdata = [0u8; 10000];
let mut reader = chunked_buffer.reader(9900);
reader.read_exact(&mut resultdata[9900..10000])?;
assert_eq!(resultdata[9900..10000], testdata[9900..10000]);
let mut test = |data: &[u8]| {
let token = chunked_buffer.write(data);
assert_eq!(
ChunkToken {
start,
length: data.len(),
},
token
);
// Read past the end
let mut resultdata = [0u8; 10001];
let mut reader = chunked_buffer.reader(9900);
assert!(reader.read_exact(&mut resultdata[9900..10001]).is_err());
expected.push((token, data.to_vec()));
let mut resultdata = [0u8; 10000];
let mut reader = chunked_buffer.reader(20000);
assert_eq!(reader.read(&mut resultdata)?, 0);
for (token, expected_data) in &expected {
let buf = chunked_buffer.read(&token);
assert_eq!(expected_data, buf.as_slice());
}
Ok(())
start += data.len();
};
test(b"abc");
test(b"");
test(b"a");
test(&gen_bytes(CHUNK_SIZE - 1));
test(&gen_bytes(CHUNK_SIZE));
test(&gen_bytes(CHUNK_SIZE + 1));
test(&gen_bytes(2 * CHUNK_SIZE - 1));
test(&gen_bytes(2 * CHUNK_SIZE));
test(&gen_bytes(2 * CHUNK_SIZE + 1));
}
}