Compare commits

...

4 Commits

Author SHA1 Message Date
Patrick Insinger
fa2d53366f Fix chunked_buffer clippy 2021-11-04 15:14:35 -07:00
Patrick Insinger
b26b471250 Use binary format for PageVersion
Still lots of work to do here. Also a lot of copying going on that
should be avoided.
2021-11-04 15:14:34 -07:00
Patrick Insinger
445288d7c1 Change chunked_buffer API 2021-11-04 15:09:32 -07:00
Heikki Linnakangas
556422f9bb Use a chunked byte buffer to hold page versions in in-memory layer.
Change the way the page versions are stored in an in-memory layer.
Instead of storing the original PageVersion structs, serialize them
into an expandable buffer. That has a couple of advantages: first, the
serialized form is more compact, which greatly reduces the memory
usage. Secondly, we can track the memory used more accurately.

This moves the serialization work from the checkpointer to the WAL
receiver. It also means that if a page in an in-memory layer is
accessed, it needs to be deserialized on the GetPage call. That adds
a bit of latency, but it's not significant compared to the overhead of
replaying WAL records. The simpler memory accounting is worth it.

Per Konstantin's idea a long time ago.
2021-11-04 15:09:32 -07:00
12 changed files with 359 additions and 92 deletions
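
In outline, the data-structure change these commits make (a sketch assembled from the page_versions.rs hunks further down, not a separate file in the change):

    // Before: every page version was kept as a fully materialized struct.
    pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);

    // After: the per-block maps only hold small ChunkTokens; the serialized
    // PageVersion bytes live in a ChunkedBuffer and are deserialized on access
    // (i.e. on the GetPage path, as the commit message notes).
    pub struct PageVersions {
        map: HashMap<u32, VecMap<Lsn, ChunkToken>>,
        buffer: ChunkedBuffer,
    }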

View File

@@ -1601,7 +1601,7 @@ impl LayeredTimeline {
rel,
request_lsn
);
Ok(img.clone())
Ok(Bytes::from(img.image().to_vec()))
} else {
// FIXME: this ought to be an error?
warn!("Page {} blk {} at {} not found", rel, blknum, request_lsn);
@@ -1612,7 +1612,7 @@ impl LayeredTimeline {
//
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init {
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init() {
// FIXME: this ought to be an error?
warn!(
"Base image for page {}/{} at {} not found, but got {} WAL records",
@@ -1632,7 +1632,7 @@ impl LayeredTimeline {
rel,
blknum,
request_lsn,
data.page_img.clone(),
data.page_img.map(|page| Bytes::from(page.image().to_vec())), // FIXME
data.records,
)?;

View File

@@ -10,10 +10,10 @@ pub struct BlobRange {
size: usize,
}
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Box<[u8]>> {
let mut buf = vec![0u8; range.size];
reader.read_exact_at(&mut buf, range.offset)?;
Ok(buf)
Ok(buf.into_boxed_slice())
}
pub struct BlobWriter<W> {

View File

@@ -39,6 +39,7 @@
//!
use crate::layered_repository::blob::BlobWriter;
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::page_versions::PageVersions;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
@@ -46,6 +47,7 @@ use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use bytes::Bytes;
use log::*;
use serde::{Deserialize, Serialize};
use zenith_utils::vec_map::VecMap;
@@ -203,7 +205,8 @@ impl Layer for DeltaLayer {
.iter()
.rev();
for ((_blknum, pv_lsn), blob_range) in iter {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
let pv_bytes = read_blob(&page_version_reader, blob_range)?;
let pv = PageVersion::from_bytes(pv_bytes);
match pv {
PageVersion::Page(img) => {
@@ -213,7 +216,7 @@ impl Layer for DeltaLayer {
break;
}
PageVersion::Wal(rec) => {
let will_init = rec.will_init;
let will_init = rec.will_init();
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
@@ -309,19 +312,20 @@ impl Layer for DeltaLayer {
let mut desc = String::new();
let buf = read_blob(&chapter, blob_range)?;
let pv = PageVersion::des(&buf)?;
let pv = PageVersion::from_bytes(buf);
match pv {
PageVersion::Page(img) => {
write!(&mut desc, " img {} bytes", img.len())?;
PageVersion::Page(page) => {
write!(&mut desc, " img {} bytes", page.image().len())?;
}
PageVersion::Wal(rec) => {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
let wal_desc =
waldecoder::describe_wal_record(&Bytes::from(rec.rec().to_vec()));
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
rec.rec().len(),
rec.will_init(),
wal_desc
)?;
}
@@ -350,14 +354,14 @@ impl DeltaLayer {
}
/// Create a new delta file, using the given page versions and relsizes.
/// The page versions are passed by an iterator; the iterator must return
/// page versions in blknum+lsn order.
/// The page versions are passed in a PageVersions struct. If 'cutoff' is
/// given, only page versions with LSN < cutoff are included.
///
/// This is used to write the in-memory layer to disk. The in-memory layer uses the same
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
/// This is used to write the in-memory layer to disk. The page_versions and
/// relsizes are thus passed in the same format as they are in the in-memory
/// layer, as that's expedient.
#[allow(clippy::too_many_arguments)]
pub fn create<'a>(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
@@ -365,7 +369,8 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
page_versions: &PageVersions,
cutoff: Option<Lsn>,
relsizes: VecMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
@@ -399,9 +404,10 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (blknum, lsn, page_version) in page_versions {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
let page_versions_iter = page_versions.ordered_page_version_iter(cutoff);
for (blknum, lsn, token) in page_versions_iter {
let blob_range = page_version_writer
.write_blob(page_versions.get_page_version_bytes(token).as_slice())?;
inner
.page_version_metas

View File

@@ -23,7 +23,7 @@
//!
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
Layer, Page, PageReconstructData, PageReconstructResult, SegmentTag,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
@@ -177,7 +177,7 @@ impl Layer for ImageLayer {
}
};
reconstruct_data.page_img = Some(Bytes::from(buf));
reconstruct_data.page_img = Some(Page::from_bytes(&buf));
Ok(PageReconstructResult::Complete)
}

View File

@@ -8,7 +8,6 @@ use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::ZERO_PAGE;
use crate::layered_repository::{DeltaLayer, ImageLayer};
use crate::repository::WALRecord;
use crate::PageServerConf;
@@ -24,6 +23,7 @@ use zenith_utils::vec_map::VecMap;
use zenith_utils::lsn::Lsn;
use super::page_versions::PageVersions;
use super::storage_layer::Page;
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -184,16 +184,17 @@ impl Layer for InMemoryLayer {
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (entry_lsn, pv) in iter {
match pv {
PageVersion::Page(img) => {
reconstruct_data.page_img = Some(img.clone());
for (entry_lsn, token) in iter {
match inner.page_versions.get_page_version(token)? {
PageVersion::Page(page) => {
reconstruct_data.page_img = Some(page);
need_image = false;
break;
}
PageVersion::Wal(rec) => {
reconstruct_data.records.push((*entry_lsn, rec.clone()));
if rec.will_init {
let will_init = rec.will_init();
reconstruct_data.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
@@ -285,8 +286,8 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
let pv_description = match pv {
for (blknum, lsn, token) in inner.page_versions.ordered_page_version_iter(None) {
let pv_description = match inner.page_versions.get_page_version(token)? {
PageVersion::Page(_img) => "page",
PageVersion::Wal(_rec) => "wal",
};
@@ -361,7 +362,7 @@ impl InMemoryLayer {
/// Remember new page version, as a full page image
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
self.put_page_version(blknum, lsn, PageVersion::Page(img))
self.put_page_version(blknum, lsn, PageVersion::Page(Page::from_bytes(&img)))
}
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
@@ -382,8 +383,8 @@ impl InMemoryLayer {
let mut mem_usage = 0;
mem_usage += match &pv {
PageVersion::Page(img) => img.len(),
PageVersion::Wal(rec) => rec.rec.len(),
PageVersion::Page(page) => page.image().len(),
PageVersion::Wal(rec) => rec.rec().len(),
};
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
@@ -425,7 +426,7 @@ impl InMemoryLayer {
// subsequent call to initialize the gap page.
let gapstart = self.seg.segno * RELISH_SEG_SIZE + oldsize;
for gapblknum in gapstart..blknum {
let zeropv = PageVersion::Page(ZERO_PAGE.clone());
let zeropv = PageVersion::Page(Page::zero_page());
trace!(
"filling gap blk {} with zeros for write of {}",
gapblknum,
@@ -610,7 +611,8 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn_exclusive,
true,
inner.page_versions.ordered_page_version_iter(None),
&inner.page_versions,
None,
inner.segsizes.clone(),
)?;
trace!(
@@ -627,13 +629,9 @@ impl InMemoryLayer {
// Since `end_lsn` is inclusive, subtract 1.
// We want to make an ImageLayer for the last included LSN,
// so the DeltaLayer should exlcude that LSN.
// so the DeltaLayer should exclude that LSN.
let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1);
let mut page_versions = inner
.page_versions
.ordered_page_version_iter(Some(end_lsn_inclusive));
let mut delta_layers = Vec::new();
if self.start_lsn != end_lsn_inclusive {
@@ -647,7 +645,8 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn_inclusive,
false,
page_versions,
&inner.page_versions,
Some(end_lsn_inclusive),
segsizes,
)?;
delta_layers.push(delta_layer);
@@ -658,7 +657,11 @@ impl InMemoryLayer {
end_lsn_inclusive
);
} else {
assert!(page_versions.next().is_none());
assert!(inner
.page_versions
.ordered_page_version_iter(None)
.next()
.is_none());
}
drop(inner);

View File

@@ -1,13 +1,23 @@
use std::{collections::HashMap, ops::RangeBounds, slice};
use anyhow::Result;
use zenith_utils::chunked_buffer::{ChunkToken, ChunkedBuffer};
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
const EMPTY_SLICE: &[(Lsn, ChunkToken)] = &[];
#[derive(Debug, Default)]
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
pub struct PageVersions {
map: HashMap<u32, VecMap<Lsn, ChunkToken>>,
/// The PageVersion structs are stored in a serialized format in this buffer.
/// Each serialized PageVersion is preceded by a 'u32' length field.
/// The 'map' stores offsets into this buffer.
buffer: ChunkedBuffer,
}
impl PageVersions {
pub fn append_or_update_last(
@@ -15,14 +25,16 @@ impl PageVersions {
blknum: u32,
lsn: Lsn,
page_version: PageVersion,
) -> Option<PageVersion> {
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, page_version).unwrap()
) -> Option<ChunkToken> {
let token = self.buffer.write(page_version.bytes());
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, token).unwrap()
}
/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, ChunkToken)] {
self.map
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
@@ -33,8 +45,8 @@ impl PageVersions {
&self,
blknum: u32,
range: R,
) -> &[(Lsn, PageVersion)] {
self.0
) -> &[(Lsn, ChunkToken)] {
self.map
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
.unwrap_or(EMPTY_SLICE)
@@ -43,7 +55,7 @@ impl PageVersions {
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
let mut ordered_blocks: Vec<u32> = self.map.keys().cloned().collect();
ordered_blocks.sort_unstable();
let slice = ordered_blocks
@@ -59,6 +71,15 @@ impl PageVersions {
cur_slice_iter: slice.iter(),
}
}
pub fn get_page_version_bytes(&self, token: &ChunkToken) -> Vec<u8> {
self.buffer.read(token)
}
pub fn get_page_version(&self, token: &ChunkToken) -> Result<PageVersion> {
let buf = self.get_page_version_bytes(token).into_boxed_slice();
Ok(PageVersion::from_bytes(buf))
}
}
pub struct OrderedPageVersionIter<'a> {
@@ -69,7 +90,7 @@ pub struct OrderedPageVersionIter<'a> {
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
cur_slice_iter: slice::Iter<'a, (Lsn, ChunkToken)>,
}
impl OrderedPageVersionIter<'_> {
@@ -83,14 +104,14 @@ impl OrderedPageVersionIter<'_> {
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
type Item = (u32, Lsn, &'a ChunkToken);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
if let Some((lsn, token)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, page_version));
return Some((blknum, *lsn, token));
}
}
@@ -104,7 +125,7 @@ impl<'a> Iterator for OrderedPageVersionIter<'a> {
#[cfg(test)]
mod tests {
use bytes::Bytes;
use crate::layered_repository::storage_layer::Page;
use super::*;
@@ -114,8 +135,7 @@ mod tests {
const BLOCKS: u32 = 1000;
const LSNS: u64 = 50;
let empty_page = Bytes::from_static(&[0u8; 8192]);
let empty_page_version = PageVersion::Page(empty_page);
let empty_page_version = PageVersion::Page(Page::zero_page());
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {

View File

@@ -3,10 +3,9 @@
//!
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::repository::{WALRecord, WAL_BIT};
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -51,12 +50,52 @@ impl SegmentTag {
///
/// A page version can be stored as a full page image, or as WAL record that needs
/// to be applied over the previous page version to reconstruct this version.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub enum PageVersion {
Page(Bytes),
Page(Page),
Wal(WALRecord),
}
impl PageVersion {
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
if bytes[0] & WAL_BIT != 0 {
Self::Wal(WALRecord::from_bytes(bytes))
} else {
Self::Page(Page(bytes))
}
}
pub fn bytes(&self) -> &[u8] {
match self {
Self::Page(page) => page.bytes(),
Self::Wal(wal) => wal.bytes(),
}
}
}
#[derive(Debug, Clone)]
pub struct Page(Box<[u8]>);
impl Page {
pub fn zero_page() -> Self {
// TODO optimize this
Self(vec![0u8; 1 + 8192].into_boxed_slice())
}
pub fn from_bytes(bytes: &[u8]) -> Self {
let mut buf = vec![0u8; 1 + bytes.len()];
buf[1..].copy_from_slice(bytes);
Self(buf.into_boxed_slice())
}
pub fn image(&self) -> &[u8] {
&self.0[1..]
}
pub fn bytes(&self) -> &[u8] {
&self.0
}
}
///
/// Data needed to reconstruct a page version
///
@@ -66,7 +105,7 @@ pub enum PageVersion {
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
pub page_img: Option<Page>,
}
/// Return value from Layer::get_page_reconstruct_data
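
A minimal sketch of the resulting dispatch (illustrative only, using the Page and PageVersion APIs from the hunk above): the first byte of the serialized form is a flag byte, and from_bytes only needs to inspect it to pick the variant.

    // A full page image carries a zero flag byte (WAL_BIT clear), so
    // PageVersion::from_bytes decodes it back into the Page variant.
    let page = Page::from_bytes(&[0u8; 8192]);          // 1 flag byte + 8192 image bytes
    assert_eq!(page.bytes().len(), 1 + 8192);
    match PageVersion::from_bytes(page.bytes().to_vec().into_boxed_slice()) {
        PageVersion::Page(p) => assert_eq!(p.image().len(), 8192),
        PageVersion::Wal(_) => unreachable!("flag byte has WAL_BIT clear"),
    }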

View File

@@ -2,8 +2,8 @@ use crate::relish::*;
use crate::CheckpointConfig;
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::convert::TryInto;
use std::ops::{AddAssign, Deref};
use std::sync::Arc;
use std::time::Duration;
@@ -192,14 +192,45 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
fn advance_last_record_lsn(&self, lsn: Lsn);
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WALRecord {
pub will_init: bool,
pub rec: Bytes,
#[derive(Debug, Clone)]
pub struct WALRecord(Box<[u8]>);
pub const WAL_BIT: u8 = 0b1;
const WILL_INIT_BIT: u8 = 0b10;
impl WALRecord {
pub fn new(will_init: bool, main_data_offset: u32, rec: &[u8]) -> Self {
// TODO avoid init
let mut buf = vec![0u8; 1 + 4 + rec.len()];
buf[0] = WAL_BIT | if will_init { WILL_INIT_BIT } else { 0 };
let mdo = u32::to_le_bytes(main_data_offset);
buf[1..5].copy_from_slice(&mdo);
buf[5..].copy_from_slice(rec);
Self(buf.into_boxed_slice())
}
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
Self(bytes)
}
pub fn will_init(&self) -> bool {
self.0[0] & WILL_INIT_BIT != 0
}
// Remember the offset of main_data in rec,
// so that we don't have to parse the record again.
// If record has no main_data, this offset equals rec.len().
pub main_data_offset: u32,
pub fn main_data_offset(&self) -> u32 {
u32::from_le_bytes(self.0[1..5].try_into().unwrap())
}
pub fn rec(&self) -> &[u8] {
&self.0[5..]
}
pub fn bytes(&self) -> &[u8] {
&self.0[..]
}
}
#[cfg(test)]
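
For reference, a sketch of the byte layout WALRecord::new produces (field positions taken from the code above; the concrete values are made up for illustration):

    // WALRecord::new(true, 0x18, &[0xAA, 0xBB]) lays out:
    //   byte 0     : 0b11        (WAL_BIT | WILL_INIT_BIT)
    //   bytes 1..5 : 18 00 00 00 (main_data_offset, little-endian u32)
    //   bytes 5..  : AA BB       (the raw WAL record)
    let rec = WALRecord::new(true, 0x18, &[0xAAu8, 0xBB]);
    assert_eq!(rec.bytes(), &[0b11u8, 0x18, 0, 0, 0, 0xAA, 0xBB][..]);
    assert!(rec.will_init());
    assert_eq!(rec.main_data_offset(), 0x18);
    assert_eq!(rec.rec(), &[0xAAu8, 0xBB][..]);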

View File

@@ -427,11 +427,11 @@ pub fn save_decoded_record(
forknum: blk.forknum as u8,
});
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let rec = WALRecord::new(
blk.will_init || blk.apply_image,
decoded.main_data_offset as u32,
&recdata[..],
);
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
}
@@ -770,11 +770,7 @@ fn save_xact_record(
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
let rec = WALRecord {
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
timeline.put_wal_record(
lsn,
RelishTag::Slru {
@@ -886,11 +882,7 @@ fn save_multixact_create_record(
xlrec: &XlMultiXactCreate,
decoded: &DecodedWALRecord,
) -> Result<()> {
let rec = WALRecord {
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;

View File

@@ -317,7 +317,7 @@ impl PostgresRedoManager {
}
// Apply all collected WAL records
for (_lsn, record) in records {
let mut buf = record.rec.clone();
let mut buf = Bytes::from(record.rec().to_vec());
WAL_REDO_RECORD_COUNTER.inc();
@@ -328,7 +328,7 @@ impl PostgresRedoManager {
//move to main data
// TODO probably, we should store some records in our special format
// to avoid this weird parsing on replay
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
let skip = (record.main_data_offset() - pg_constants::SIZEOF_XLOGRECORD) as usize;
if buf.remaining() > skip {
buf.advance(skip);
}
@@ -574,7 +574,7 @@ impl PostgresRedoProcess {
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
build_apply_record_msg(*lsn, rec.rec(), &mut writebuf);
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

View File

@@ -0,0 +1,173 @@
use std::convert::TryInto;
const CHUNK_SIZE: usize = 1024;
///
/// ChunkedBuffer is an expandable byte buffer. You can append to the end of it, and you
/// can read at any byte position. The advantage over a plain Vec<u8> is that the
/// buffer consists of smaller chunks, so that a big buffer doesn't require one huge
/// allocation, and expanding doesn't require copying all the data.
///
#[derive(Debug, Default)]
pub struct ChunkedBuffer {
// Clippy considers it unnecessary to have Box here, since the vector is stored
// on the heap anyway. But we have our reasons for that: we want each chunk to
// be allocated separately, so that we don't require one huge allocation, and so
// that expanding the array doesn't require copying all of the data. (Clippy
// knows about that and doesn't complain if the array is large enough, 4096
// bytes is the default threshold. Our chunk size is smaller than that so that
// heuristic doesn't save us.)
#[allow(clippy::vec_box)]
chunks: Vec<Box<[u8; CHUNK_SIZE]>>,
length: usize,
}
#[derive(Debug, PartialEq, Eq)]
pub struct ChunkToken {
start: usize,
length: usize,
}
impl ChunkedBuffer {
pub fn read(&self, token: &ChunkToken) -> Vec<u8> {
let mut buf = Vec::with_capacity(token.length);
let chunk_idx = token.start / CHUNK_SIZE;
let mut chunk_iter = self.chunks[chunk_idx..].iter();
let mut bytes_remaining = token.length;
if bytes_remaining > 0 {
let start_offset = token.start % CHUNK_SIZE;
let chunk_bytes = &chunk_iter.next().unwrap()[start_offset..];
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
buf.extend_from_slice(bytes);
bytes_remaining -= bytes.len();
}
while bytes_remaining > 0 {
let chunk_bytes = chunk_iter.next().unwrap();
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
buf.extend_from_slice(bytes);
bytes_remaining -= bytes.len();
}
debug_assert_eq!(buf.len(), token.length);
buf
}
pub fn write(&mut self, mut buf: &[u8]) -> ChunkToken {
let token = ChunkToken {
start: self.length,
length: buf.len(),
};
while !buf.is_empty() {
let chunk_idx = self.length / CHUNK_SIZE;
let chunk = match self.chunks.get_mut(chunk_idx) {
Some(chunk) => chunk,
None => {
debug_assert_eq!(self.length % CHUNK_SIZE, 0);
debug_assert_eq!(chunk_idx, self.chunks.len());
self.chunks.push(heap_alloc_chunk());
self.chunks.last_mut().unwrap()
}
};
let offset = self.length % CHUNK_SIZE;
let length = buf.len().min(CHUNK_SIZE - offset);
chunk[offset..][..length].copy_from_slice(&buf[..length]);
buf = &buf[length..];
self.length += length;
}
token
}
}
fn heap_alloc_chunk() -> Box<[u8; CHUNK_SIZE]> {
vec![0u8; CHUNK_SIZE].into_boxed_slice().try_into().unwrap()
}
#[cfg(test)]
mod tests {
use super::{ChunkToken, ChunkedBuffer, CHUNK_SIZE};
fn gen_bytes(len: usize) -> Vec<u8> {
let mut buf = vec![0u8; len];
for (idx, val) in buf.iter_mut().enumerate() {
*val = idx as u8;
}
buf
}
#[test]
fn one_item() {
fn test(data: &[u8]) {
let mut chunked_buffer = ChunkedBuffer::default();
let token = chunked_buffer.write(data);
assert_eq!(
ChunkToken {
start: 0,
length: data.len(),
},
token
);
let buf = chunked_buffer.read(&token);
assert_eq!(data, buf.as_slice());
}
test(b"");
test(b"a");
test(b"abc");
test(&gen_bytes(CHUNK_SIZE - 1));
test(&gen_bytes(CHUNK_SIZE));
test(&gen_bytes(CHUNK_SIZE + 1));
test(&gen_bytes(2 * CHUNK_SIZE - 1));
test(&gen_bytes(2 * CHUNK_SIZE));
test(&gen_bytes(2 * CHUNK_SIZE + 1));
}
#[test]
fn many_items() {
let mut chunked_buffer = ChunkedBuffer::default();
let mut start = 0;
let mut expected = Vec::new();
let mut test = |data: &[u8]| {
let token = chunked_buffer.write(data);
assert_eq!(
ChunkToken {
start,
length: data.len(),
},
token
);
expected.push((token, data.to_vec()));
for (token, expected_data) in &expected {
let buf = chunked_buffer.read(token);
assert_eq!(expected_data, buf.as_slice());
}
start += data.len();
};
test(b"abc");
test(b"");
test(b"a");
test(&gen_bytes(CHUNK_SIZE - 1));
test(&gen_bytes(CHUNK_SIZE));
test(&gen_bytes(CHUNK_SIZE + 1));
test(&gen_bytes(2 * CHUNK_SIZE - 1));
test(&gen_bytes(2 * CHUNK_SIZE));
test(&gen_bytes(2 * CHUNK_SIZE + 1));
}
}
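
Putting the pieces together, an illustrative sketch of how PageVersions (earlier in this compare) uses the buffer: serialize on ingest, keep only the token, deserialize on GetPage.

    // Assumed imports, matching the paths used in the diffs above:
    // use zenith_utils::chunked_buffer::ChunkedBuffer;
    // use crate::layered_repository::storage_layer::{Page, PageVersion};
    let mut buffer = ChunkedBuffer::default();

    // On ingest: serialize the PageVersion and remember only the token (start + length).
    let pv = PageVersion::Page(Page::from_bytes(&[0u8; 8192]));
    let token = buffer.write(pv.bytes());

    // On GetPage: read the bytes back by token and deserialize on demand.
    match PageVersion::from_bytes(buffer.read(&token).into_boxed_slice()) {
        PageVersion::Page(page) => assert_eq!(page.image().len(), 8192),
        PageVersion::Wal(_) => unreachable!(),
    }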

View File

@@ -11,6 +11,9 @@ pub mod seqwait;
/// append only ordered map implemented with a Vec
pub mod vec_map;
/// expandable byte buffer consisting of fixed-size chunks
pub mod chunked_buffer;
// Async version of SeqWait. Currently unused.
// pub mod seqwait_async;