mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-03 02:30:37 +00:00
Compare commits
4 Commits
mx_offset_
...
layer-chun
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fa2d53366f | ||
|
|
b26b471250 | ||
|
|
445288d7c1 | ||
|
|
556422f9bb |
@@ -1601,7 +1601,7 @@ impl LayeredTimeline {
|
|||||||
rel,
|
rel,
|
||||||
request_lsn
|
request_lsn
|
||||||
);
|
);
|
||||||
Ok(img.clone())
|
Ok(Bytes::from(img.image().to_vec()))
|
||||||
} else {
|
} else {
|
||||||
// FIXME: this ought to be an error?
|
// FIXME: this ought to be an error?
|
||||||
warn!("Page {} blk {} at {} not found", rel, blknum, request_lsn);
|
warn!("Page {} blk {} at {} not found", rel, blknum, request_lsn);
|
||||||
@@ -1612,7 +1612,7 @@ impl LayeredTimeline {
|
|||||||
//
|
//
|
||||||
// If we don't have a base image, then the oldest WAL record better initialize
|
// If we don't have a base image, then the oldest WAL record better initialize
|
||||||
// the page
|
// the page
|
||||||
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init {
|
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init() {
|
||||||
// FIXME: this ought to be an error?
|
// FIXME: this ought to be an error?
|
||||||
warn!(
|
warn!(
|
||||||
"Base image for page {}/{} at {} not found, but got {} WAL records",
|
"Base image for page {}/{} at {} not found, but got {} WAL records",
|
||||||
@@ -1632,7 +1632,7 @@ impl LayeredTimeline {
|
|||||||
rel,
|
rel,
|
||||||
blknum,
|
blknum,
|
||||||
request_lsn,
|
request_lsn,
|
||||||
data.page_img.clone(),
|
data.page_img.map(|page| Bytes::from(page.image().to_vec())), // FIXME
|
||||||
data.records,
|
data.records,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
|||||||
@@ -10,10 +10,10 @@ pub struct BlobRange {
|
|||||||
size: usize,
|
size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {
|
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Box<[u8]>> {
|
||||||
let mut buf = vec![0u8; range.size];
|
let mut buf = vec![0u8; range.size];
|
||||||
reader.read_exact_at(&mut buf, range.offset)?;
|
reader.read_exact_at(&mut buf, range.offset)?;
|
||||||
Ok(buf)
|
Ok(buf.into_boxed_slice())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BlobWriter<W> {
|
pub struct BlobWriter<W> {
|
||||||
|
|||||||
@@ -39,6 +39,7 @@
|
|||||||
//!
|
//!
|
||||||
use crate::layered_repository::blob::BlobWriter;
|
use crate::layered_repository::blob::BlobWriter;
|
||||||
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
|
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
|
||||||
|
use crate::layered_repository::page_versions::PageVersions;
|
||||||
use crate::layered_repository::storage_layer::{
|
use crate::layered_repository::storage_layer::{
|
||||||
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
|
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
|
||||||
};
|
};
|
||||||
@@ -46,6 +47,7 @@ use crate::waldecoder;
|
|||||||
use crate::PageServerConf;
|
use crate::PageServerConf;
|
||||||
use crate::{ZTenantId, ZTimelineId};
|
use crate::{ZTenantId, ZTimelineId};
|
||||||
use anyhow::{bail, ensure, Result};
|
use anyhow::{bail, ensure, Result};
|
||||||
|
use bytes::Bytes;
|
||||||
use log::*;
|
use log::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use zenith_utils::vec_map::VecMap;
|
use zenith_utils::vec_map::VecMap;
|
||||||
@@ -203,7 +205,8 @@ impl Layer for DeltaLayer {
|
|||||||
.iter()
|
.iter()
|
||||||
.rev();
|
.rev();
|
||||||
for ((_blknum, pv_lsn), blob_range) in iter {
|
for ((_blknum, pv_lsn), blob_range) in iter {
|
||||||
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
|
let pv_bytes = read_blob(&page_version_reader, blob_range)?;
|
||||||
|
let pv = PageVersion::from_bytes(pv_bytes);
|
||||||
|
|
||||||
match pv {
|
match pv {
|
||||||
PageVersion::Page(img) => {
|
PageVersion::Page(img) => {
|
||||||
@@ -213,7 +216,7 @@ impl Layer for DeltaLayer {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
PageVersion::Wal(rec) => {
|
PageVersion::Wal(rec) => {
|
||||||
let will_init = rec.will_init;
|
let will_init = rec.will_init();
|
||||||
reconstruct_data.records.push((*pv_lsn, rec));
|
reconstruct_data.records.push((*pv_lsn, rec));
|
||||||
if will_init {
|
if will_init {
|
||||||
// This WAL record initializes the page, so no need to go further back
|
// This WAL record initializes the page, so no need to go further back
|
||||||
@@ -309,19 +312,20 @@ impl Layer for DeltaLayer {
|
|||||||
let mut desc = String::new();
|
let mut desc = String::new();
|
||||||
|
|
||||||
let buf = read_blob(&chapter, blob_range)?;
|
let buf = read_blob(&chapter, blob_range)?;
|
||||||
let pv = PageVersion::des(&buf)?;
|
let pv = PageVersion::from_bytes(buf);
|
||||||
|
|
||||||
match pv {
|
match pv {
|
||||||
PageVersion::Page(img) => {
|
PageVersion::Page(page) => {
|
||||||
write!(&mut desc, " img {} bytes", img.len())?;
|
write!(&mut desc, " img {} bytes", page.image().len())?;
|
||||||
}
|
}
|
||||||
PageVersion::Wal(rec) => {
|
PageVersion::Wal(rec) => {
|
||||||
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
|
let wal_desc =
|
||||||
|
waldecoder::describe_wal_record(&Bytes::from(rec.rec().to_vec()));
|
||||||
write!(
|
write!(
|
||||||
&mut desc,
|
&mut desc,
|
||||||
" rec {} bytes will_init: {} {}",
|
" rec {} bytes will_init: {} {}",
|
||||||
rec.rec.len(),
|
rec.rec().len(),
|
||||||
rec.will_init,
|
rec.will_init(),
|
||||||
wal_desc
|
wal_desc
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@@ -350,14 +354,14 @@ impl DeltaLayer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new delta file, using the given page versions and relsizes.
|
/// Create a new delta file, using the given page versions and relsizes.
|
||||||
/// The page versions are passed by an iterator; the iterator must return
|
/// The page versions are passed in a PageVersions struct. If 'cutoff' is
|
||||||
/// page versions in blknum+lsn order.
|
/// given, only page versions with LSN < cutoff are included.
|
||||||
///
|
///
|
||||||
/// This is used to write the in-memory layer to disk. The in-memory layer uses the same
|
/// This is used to write the in-memory layer to disk. The page_versions and
|
||||||
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
/// relsizes are thus passed in the same format as they are in the in-memory
|
||||||
/// expedient.
|
/// layer, as that's expedient.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn create<'a>(
|
pub fn create(
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
timelineid: ZTimelineId,
|
timelineid: ZTimelineId,
|
||||||
tenantid: ZTenantId,
|
tenantid: ZTenantId,
|
||||||
@@ -365,7 +369,8 @@ impl DeltaLayer {
|
|||||||
start_lsn: Lsn,
|
start_lsn: Lsn,
|
||||||
end_lsn: Lsn,
|
end_lsn: Lsn,
|
||||||
dropped: bool,
|
dropped: bool,
|
||||||
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
|
page_versions: &PageVersions,
|
||||||
|
cutoff: Option<Lsn>,
|
||||||
relsizes: VecMap<Lsn, u32>,
|
relsizes: VecMap<Lsn, u32>,
|
||||||
) -> Result<DeltaLayer> {
|
) -> Result<DeltaLayer> {
|
||||||
if seg.rel.is_blocky() {
|
if seg.rel.is_blocky() {
|
||||||
@@ -399,9 +404,10 @@ impl DeltaLayer {
|
|||||||
|
|
||||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||||
|
|
||||||
for (blknum, lsn, page_version) in page_versions {
|
let page_versions_iter = page_versions.ordered_page_version_iter(cutoff);
|
||||||
let buf = PageVersion::ser(page_version)?;
|
for (blknum, lsn, token) in page_versions_iter {
|
||||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
let blob_range = page_version_writer
|
||||||
|
.write_blob(page_versions.get_page_version_bytes(token).as_slice())?;
|
||||||
|
|
||||||
inner
|
inner
|
||||||
.page_version_metas
|
.page_version_metas
|
||||||
|
|||||||
@@ -23,7 +23,7 @@
|
|||||||
//!
|
//!
|
||||||
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
|
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
|
||||||
use crate::layered_repository::storage_layer::{
|
use crate::layered_repository::storage_layer::{
|
||||||
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
|
Layer, Page, PageReconstructData, PageReconstructResult, SegmentTag,
|
||||||
};
|
};
|
||||||
use crate::layered_repository::LayeredTimeline;
|
use crate::layered_repository::LayeredTimeline;
|
||||||
use crate::layered_repository::RELISH_SEG_SIZE;
|
use crate::layered_repository::RELISH_SEG_SIZE;
|
||||||
@@ -177,7 +177,7 @@ impl Layer for ImageLayer {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
reconstruct_data.page_img = Some(Bytes::from(buf));
|
reconstruct_data.page_img = Some(Page::from_bytes(&buf));
|
||||||
Ok(PageReconstructResult::Complete)
|
Ok(PageReconstructResult::Complete)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ use crate::layered_repository::storage_layer::{
|
|||||||
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||||
};
|
};
|
||||||
use crate::layered_repository::LayeredTimeline;
|
use crate::layered_repository::LayeredTimeline;
|
||||||
use crate::layered_repository::ZERO_PAGE;
|
|
||||||
use crate::layered_repository::{DeltaLayer, ImageLayer};
|
use crate::layered_repository::{DeltaLayer, ImageLayer};
|
||||||
use crate::repository::WALRecord;
|
use crate::repository::WALRecord;
|
||||||
use crate::PageServerConf;
|
use crate::PageServerConf;
|
||||||
@@ -24,6 +23,7 @@ use zenith_utils::vec_map::VecMap;
|
|||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
|
|
||||||
use super::page_versions::PageVersions;
|
use super::page_versions::PageVersions;
|
||||||
|
use super::storage_layer::Page;
|
||||||
|
|
||||||
pub struct InMemoryLayer {
|
pub struct InMemoryLayer {
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
@@ -184,16 +184,17 @@ impl Layer for InMemoryLayer {
|
|||||||
.get_block_lsn_range(blknum, ..=lsn)
|
.get_block_lsn_range(blknum, ..=lsn)
|
||||||
.iter()
|
.iter()
|
||||||
.rev();
|
.rev();
|
||||||
for (entry_lsn, pv) in iter {
|
for (entry_lsn, token) in iter {
|
||||||
match pv {
|
match inner.page_versions.get_page_version(token)? {
|
||||||
PageVersion::Page(img) => {
|
PageVersion::Page(page) => {
|
||||||
reconstruct_data.page_img = Some(img.clone());
|
reconstruct_data.page_img = Some(page);
|
||||||
need_image = false;
|
need_image = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
PageVersion::Wal(rec) => {
|
PageVersion::Wal(rec) => {
|
||||||
reconstruct_data.records.push((*entry_lsn, rec.clone()));
|
let will_init = rec.will_init();
|
||||||
if rec.will_init {
|
reconstruct_data.records.push((*entry_lsn, rec));
|
||||||
|
if will_init {
|
||||||
// This WAL record initializes the page, so no need to go further back
|
// This WAL record initializes the page, so no need to go further back
|
||||||
need_image = false;
|
need_image = false;
|
||||||
break;
|
break;
|
||||||
@@ -285,8 +286,8 @@ impl Layer for InMemoryLayer {
|
|||||||
println!("segsizes {}: {}", k, v);
|
println!("segsizes {}: {}", k, v);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
|
for (blknum, lsn, token) in inner.page_versions.ordered_page_version_iter(None) {
|
||||||
let pv_description = match pv {
|
let pv_description = match inner.page_versions.get_page_version(token)? {
|
||||||
PageVersion::Page(_img) => "page",
|
PageVersion::Page(_img) => "page",
|
||||||
PageVersion::Wal(_rec) => "wal",
|
PageVersion::Wal(_rec) => "wal",
|
||||||
};
|
};
|
||||||
@@ -361,7 +362,7 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
/// Remember new page version, as a full page image
|
/// Remember new page version, as a full page image
|
||||||
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
|
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
|
||||||
self.put_page_version(blknum, lsn, PageVersion::Page(img))
|
self.put_page_version(blknum, lsn, PageVersion::Page(Page::from_bytes(&img)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||||
@@ -382,8 +383,8 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
let mut mem_usage = 0;
|
let mut mem_usage = 0;
|
||||||
mem_usage += match &pv {
|
mem_usage += match &pv {
|
||||||
PageVersion::Page(img) => img.len(),
|
PageVersion::Page(page) => page.image().len(),
|
||||||
PageVersion::Wal(rec) => rec.rec.len(),
|
PageVersion::Wal(rec) => rec.rec().len(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
|
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
|
||||||
@@ -425,7 +426,7 @@ impl InMemoryLayer {
|
|||||||
// subsequent call to initialize the gap page.
|
// subsequent call to initialize the gap page.
|
||||||
let gapstart = self.seg.segno * RELISH_SEG_SIZE + oldsize;
|
let gapstart = self.seg.segno * RELISH_SEG_SIZE + oldsize;
|
||||||
for gapblknum in gapstart..blknum {
|
for gapblknum in gapstart..blknum {
|
||||||
let zeropv = PageVersion::Page(ZERO_PAGE.clone());
|
let zeropv = PageVersion::Page(Page::zero_page());
|
||||||
trace!(
|
trace!(
|
||||||
"filling gap blk {} with zeros for write of {}",
|
"filling gap blk {} with zeros for write of {}",
|
||||||
gapblknum,
|
gapblknum,
|
||||||
@@ -610,7 +611,8 @@ impl InMemoryLayer {
|
|||||||
self.start_lsn,
|
self.start_lsn,
|
||||||
end_lsn_exclusive,
|
end_lsn_exclusive,
|
||||||
true,
|
true,
|
||||||
inner.page_versions.ordered_page_version_iter(None),
|
&inner.page_versions,
|
||||||
|
None,
|
||||||
inner.segsizes.clone(),
|
inner.segsizes.clone(),
|
||||||
)?;
|
)?;
|
||||||
trace!(
|
trace!(
|
||||||
@@ -627,13 +629,9 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
// Since `end_lsn` is inclusive, subtract 1.
|
// Since `end_lsn` is inclusive, subtract 1.
|
||||||
// We want to make an ImageLayer for the last included LSN,
|
// We want to make an ImageLayer for the last included LSN,
|
||||||
// so the DeltaLayer should exlcude that LSN.
|
// so the DeltaLayer should exclude that LSN.
|
||||||
let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1);
|
let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1);
|
||||||
|
|
||||||
let mut page_versions = inner
|
|
||||||
.page_versions
|
|
||||||
.ordered_page_version_iter(Some(end_lsn_inclusive));
|
|
||||||
|
|
||||||
let mut delta_layers = Vec::new();
|
let mut delta_layers = Vec::new();
|
||||||
|
|
||||||
if self.start_lsn != end_lsn_inclusive {
|
if self.start_lsn != end_lsn_inclusive {
|
||||||
@@ -647,7 +645,8 @@ impl InMemoryLayer {
|
|||||||
self.start_lsn,
|
self.start_lsn,
|
||||||
end_lsn_inclusive,
|
end_lsn_inclusive,
|
||||||
false,
|
false,
|
||||||
page_versions,
|
&inner.page_versions,
|
||||||
|
Some(end_lsn_inclusive),
|
||||||
segsizes,
|
segsizes,
|
||||||
)?;
|
)?;
|
||||||
delta_layers.push(delta_layer);
|
delta_layers.push(delta_layer);
|
||||||
@@ -658,7 +657,11 @@ impl InMemoryLayer {
|
|||||||
end_lsn_inclusive
|
end_lsn_inclusive
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
assert!(page_versions.next().is_none());
|
assert!(inner
|
||||||
|
.page_versions
|
||||||
|
.ordered_page_version_iter(None)
|
||||||
|
.next()
|
||||||
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(inner);
|
drop(inner);
|
||||||
|
|||||||
@@ -1,13 +1,23 @@
|
|||||||
use std::{collections::HashMap, ops::RangeBounds, slice};
|
use std::{collections::HashMap, ops::RangeBounds, slice};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
use zenith_utils::chunked_buffer::{ChunkToken, ChunkedBuffer};
|
||||||
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
|
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
|
||||||
|
|
||||||
use super::storage_layer::PageVersion;
|
use super::storage_layer::PageVersion;
|
||||||
|
|
||||||
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
|
const EMPTY_SLICE: &[(Lsn, ChunkToken)] = &[];
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
|
pub struct PageVersions {
|
||||||
|
map: HashMap<u32, VecMap<Lsn, ChunkToken>>,
|
||||||
|
|
||||||
|
/// The PageVersion structs are stored in a serialized format in this buffer.
|
||||||
|
/// Each serialized PageVersion is preceded by a 'u32' length field.
|
||||||
|
/// The 'map' stores offsets into this buffer.
|
||||||
|
buffer: ChunkedBuffer,
|
||||||
|
}
|
||||||
|
|
||||||
impl PageVersions {
|
impl PageVersions {
|
||||||
pub fn append_or_update_last(
|
pub fn append_or_update_last(
|
||||||
@@ -15,14 +25,16 @@ impl PageVersions {
|
|||||||
blknum: u32,
|
blknum: u32,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
page_version: PageVersion,
|
page_version: PageVersion,
|
||||||
) -> Option<PageVersion> {
|
) -> Option<ChunkToken> {
|
||||||
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
|
let token = self.buffer.write(page_version.bytes());
|
||||||
map.append_or_update_last(lsn, page_version).unwrap()
|
|
||||||
|
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
|
||||||
|
map.append_or_update_last(lsn, token).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get all [`PageVersion`]s in a block
|
/// Get all [`PageVersion`]s in a block
|
||||||
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
|
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, ChunkToken)] {
|
||||||
self.0
|
self.map
|
||||||
.get(&blknum)
|
.get(&blknum)
|
||||||
.map(VecMap::as_slice)
|
.map(VecMap::as_slice)
|
||||||
.unwrap_or(EMPTY_SLICE)
|
.unwrap_or(EMPTY_SLICE)
|
||||||
@@ -33,8 +45,8 @@ impl PageVersions {
|
|||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
range: R,
|
range: R,
|
||||||
) -> &[(Lsn, PageVersion)] {
|
) -> &[(Lsn, ChunkToken)] {
|
||||||
self.0
|
self.map
|
||||||
.get(&blknum)
|
.get(&blknum)
|
||||||
.map(|vec_map| vec_map.slice_range(range))
|
.map(|vec_map| vec_map.slice_range(range))
|
||||||
.unwrap_or(EMPTY_SLICE)
|
.unwrap_or(EMPTY_SLICE)
|
||||||
@@ -43,7 +55,7 @@ impl PageVersions {
|
|||||||
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
||||||
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
|
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
|
||||||
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
||||||
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
let mut ordered_blocks: Vec<u32> = self.map.keys().cloned().collect();
|
||||||
ordered_blocks.sort_unstable();
|
ordered_blocks.sort_unstable();
|
||||||
|
|
||||||
let slice = ordered_blocks
|
let slice = ordered_blocks
|
||||||
@@ -59,6 +71,15 @@ impl PageVersions {
|
|||||||
cur_slice_iter: slice.iter(),
|
cur_slice_iter: slice.iter(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_page_version_bytes(&self, token: &ChunkToken) -> Vec<u8> {
|
||||||
|
self.buffer.read(token)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_page_version(&self, token: &ChunkToken) -> Result<PageVersion> {
|
||||||
|
let buf = self.get_page_version_bytes(token).into_boxed_slice();
|
||||||
|
Ok(PageVersion::from_bytes(buf))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct OrderedPageVersionIter<'a> {
|
pub struct OrderedPageVersionIter<'a> {
|
||||||
@@ -69,7 +90,7 @@ pub struct OrderedPageVersionIter<'a> {
|
|||||||
|
|
||||||
cutoff_lsn: Option<Lsn>,
|
cutoff_lsn: Option<Lsn>,
|
||||||
|
|
||||||
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
|
cur_slice_iter: slice::Iter<'a, (Lsn, ChunkToken)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OrderedPageVersionIter<'_> {
|
impl OrderedPageVersionIter<'_> {
|
||||||
@@ -83,14 +104,14 @@ impl OrderedPageVersionIter<'_> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
||||||
type Item = (u32, Lsn, &'a PageVersion);
|
type Item = (u32, Lsn, &'a ChunkToken);
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
loop {
|
loop {
|
||||||
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
|
if let Some((lsn, token)) = self.cur_slice_iter.next() {
|
||||||
if self.is_lsn_before_cutoff(lsn) {
|
if self.is_lsn_before_cutoff(lsn) {
|
||||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
let blknum = self.ordered_blocks[self.cur_block_idx];
|
||||||
return Some((blknum, *lsn, page_version));
|
return Some((blknum, *lsn, token));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -104,7 +125,7 @@ impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use bytes::Bytes;
|
use crate::layered_repository::storage_layer::Page;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
@@ -114,8 +135,7 @@ mod tests {
|
|||||||
const BLOCKS: u32 = 1000;
|
const BLOCKS: u32 = 1000;
|
||||||
const LSNS: u64 = 50;
|
const LSNS: u64 = 50;
|
||||||
|
|
||||||
let empty_page = Bytes::from_static(&[0u8; 8192]);
|
let empty_page_version = PageVersion::Page(Page::zero_page());
|
||||||
let empty_page_version = PageVersion::Page(empty_page);
|
|
||||||
|
|
||||||
for blknum in 0..BLOCKS {
|
for blknum in 0..BLOCKS {
|
||||||
for lsn in 0..LSNS {
|
for lsn in 0..LSNS {
|
||||||
|
|||||||
@@ -3,10 +3,9 @@
|
|||||||
//!
|
//!
|
||||||
|
|
||||||
use crate::relish::RelishTag;
|
use crate::relish::RelishTag;
|
||||||
use crate::repository::WALRecord;
|
use crate::repository::{WALRecord, WAL_BIT};
|
||||||
use crate::{ZTenantId, ZTimelineId};
|
use crate::{ZTenantId, ZTimelineId};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use bytes::Bytes;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@@ -51,12 +50,52 @@ impl SegmentTag {
|
|||||||
///
|
///
|
||||||
/// A page version can be stored as a full page image, or as WAL record that needs
|
/// A page version can be stored as a full page image, or as WAL record that needs
|
||||||
/// to be applied over the previous page version to reconstruct this version.
|
/// to be applied over the previous page version to reconstruct this version.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum PageVersion {
|
pub enum PageVersion {
|
||||||
Page(Bytes),
|
Page(Page),
|
||||||
Wal(WALRecord),
|
Wal(WALRecord),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PageVersion {
|
||||||
|
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
|
||||||
|
if bytes[0] & WAL_BIT != 0 {
|
||||||
|
Self::Wal(WALRecord::from_bytes(bytes))
|
||||||
|
} else {
|
||||||
|
Self::Page(Page(bytes))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bytes(&self) -> &[u8] {
|
||||||
|
match self {
|
||||||
|
Self::Page(page) => page.bytes(),
|
||||||
|
Self::Wal(wal) => wal.bytes(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Page(Box<[u8]>);
|
||||||
|
|
||||||
|
impl Page {
|
||||||
|
pub fn zero_page() -> Self {
|
||||||
|
// TODO optimize this
|
||||||
|
Self(vec![0u8; 1 + 8192].into_boxed_slice())
|
||||||
|
}
|
||||||
|
pub fn from_bytes(bytes: &[u8]) -> Self {
|
||||||
|
let mut buf = vec![0u8; 1 + bytes.len()];
|
||||||
|
buf[1..].copy_from_slice(bytes);
|
||||||
|
Self(buf.into_boxed_slice())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn image(&self) -> &[u8] {
|
||||||
|
&self.0[1..]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bytes(&self) -> &[u8] {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
/// Data needed to reconstruct a page version
|
/// Data needed to reconstruct a page version
|
||||||
///
|
///
|
||||||
@@ -66,7 +105,7 @@ pub enum PageVersion {
|
|||||||
///
|
///
|
||||||
pub struct PageReconstructData {
|
pub struct PageReconstructData {
|
||||||
pub records: Vec<(Lsn, WALRecord)>,
|
pub records: Vec<(Lsn, WALRecord)>,
|
||||||
pub page_img: Option<Bytes>,
|
pub page_img: Option<Page>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return value from Layer::get_page_reconstruct_data
|
/// Return value from Layer::get_page_reconstruct_data
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ use crate::relish::*;
|
|||||||
use crate::CheckpointConfig;
|
use crate::CheckpointConfig;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::ops::{AddAssign, Deref};
|
use std::ops::{AddAssign, Deref};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -192,14 +192,45 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
|
|||||||
fn advance_last_record_lsn(&self, lsn: Lsn);
|
fn advance_last_record_lsn(&self, lsn: Lsn);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct WALRecord {
|
pub struct WALRecord(Box<[u8]>);
|
||||||
pub will_init: bool,
|
|
||||||
pub rec: Bytes,
|
pub const WAL_BIT: u8 = 0b1;
|
||||||
|
const WILL_INIT_BIT: u8 = 0b10;
|
||||||
|
|
||||||
|
impl WALRecord {
|
||||||
|
pub fn new(will_init: bool, main_data_offset: u32, rec: &[u8]) -> Self {
|
||||||
|
// TODO avoid init
|
||||||
|
let mut buf = vec![0u8; 1 + 4 + rec.len()];
|
||||||
|
buf[0] = WAL_BIT | if will_init { WILL_INIT_BIT } else { 0 };
|
||||||
|
let mdo = u32::to_le_bytes(main_data_offset);
|
||||||
|
buf[1..5].copy_from_slice(&mdo);
|
||||||
|
buf[5..].copy_from_slice(rec);
|
||||||
|
Self(buf.into_boxed_slice())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
|
||||||
|
Self(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn will_init(&self) -> bool {
|
||||||
|
self.0[0] & WILL_INIT_BIT != 0
|
||||||
|
}
|
||||||
|
|
||||||
// Remember the offset of main_data in rec,
|
// Remember the offset of main_data in rec,
|
||||||
// so that we don't have to parse the record again.
|
// so that we don't have to parse the record again.
|
||||||
// If record has no main_data, this offset equals rec.len().
|
// If record has no main_data, this offset equals rec.len().
|
||||||
pub main_data_offset: u32,
|
pub fn main_data_offset(&self) -> u32 {
|
||||||
|
u32::from_le_bytes(self.0[1..5].try_into().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rec(&self) -> &[u8] {
|
||||||
|
&self.0[5..]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bytes(&self) -> &[u8] {
|
||||||
|
&self.0[..]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -427,11 +427,11 @@ pub fn save_decoded_record(
|
|||||||
forknum: blk.forknum as u8,
|
forknum: blk.forknum as u8,
|
||||||
});
|
});
|
||||||
|
|
||||||
let rec = WALRecord {
|
let rec = WALRecord::new(
|
||||||
will_init: blk.will_init || blk.apply_image,
|
blk.will_init || blk.apply_image,
|
||||||
rec: recdata.clone(),
|
decoded.main_data_offset as u32,
|
||||||
main_data_offset: decoded.main_data_offset as u32,
|
&recdata[..],
|
||||||
};
|
);
|
||||||
|
|
||||||
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
|
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
|
||||||
}
|
}
|
||||||
@@ -770,11 +770,7 @@ fn save_xact_record(
|
|||||||
|
|
||||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||||
let rec = WALRecord {
|
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
|
||||||
will_init: false,
|
|
||||||
rec: decoded.record.clone(),
|
|
||||||
main_data_offset: decoded.main_data_offset as u32,
|
|
||||||
};
|
|
||||||
timeline.put_wal_record(
|
timeline.put_wal_record(
|
||||||
lsn,
|
lsn,
|
||||||
RelishTag::Slru {
|
RelishTag::Slru {
|
||||||
@@ -886,11 +882,7 @@ fn save_multixact_create_record(
|
|||||||
xlrec: &XlMultiXactCreate,
|
xlrec: &XlMultiXactCreate,
|
||||||
decoded: &DecodedWALRecord,
|
decoded: &DecodedWALRecord,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let rec = WALRecord {
|
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
|
||||||
will_init: false,
|
|
||||||
rec: decoded.record.clone(),
|
|
||||||
main_data_offset: decoded.main_data_offset as u32,
|
|
||||||
};
|
|
||||||
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
||||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||||
|
|||||||
@@ -317,7 +317,7 @@ impl PostgresRedoManager {
|
|||||||
}
|
}
|
||||||
// Apply all collected WAL records
|
// Apply all collected WAL records
|
||||||
for (_lsn, record) in records {
|
for (_lsn, record) in records {
|
||||||
let mut buf = record.rec.clone();
|
let mut buf = Bytes::from(record.rec().to_vec());
|
||||||
|
|
||||||
WAL_REDO_RECORD_COUNTER.inc();
|
WAL_REDO_RECORD_COUNTER.inc();
|
||||||
|
|
||||||
@@ -328,7 +328,7 @@ impl PostgresRedoManager {
|
|||||||
//move to main data
|
//move to main data
|
||||||
// TODO probably, we should store some records in our special format
|
// TODO probably, we should store some records in our special format
|
||||||
// to avoid this weird parsing on replay
|
// to avoid this weird parsing on replay
|
||||||
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
|
let skip = (record.main_data_offset() - pg_constants::SIZEOF_XLOGRECORD) as usize;
|
||||||
if buf.remaining() > skip {
|
if buf.remaining() > skip {
|
||||||
buf.advance(skip);
|
buf.advance(skip);
|
||||||
}
|
}
|
||||||
@@ -574,7 +574,7 @@ impl PostgresRedoProcess {
|
|||||||
build_push_page_msg(tag, &img, &mut writebuf);
|
build_push_page_msg(tag, &img, &mut writebuf);
|
||||||
}
|
}
|
||||||
for (lsn, rec) in records.iter() {
|
for (lsn, rec) in records.iter() {
|
||||||
build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
|
build_apply_record_msg(*lsn, rec.rec(), &mut writebuf);
|
||||||
}
|
}
|
||||||
build_get_page_msg(tag, &mut writebuf);
|
build_get_page_msg(tag, &mut writebuf);
|
||||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||||
|
|||||||
173
zenith_utils/src/chunked_buffer.rs
Normal file
173
zenith_utils/src/chunked_buffer.rs
Normal file
@@ -0,0 +1,173 @@
|
|||||||
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
const CHUNK_SIZE: usize = 1024;
|
||||||
|
|
||||||
|
///
|
||||||
|
/// ChunkedBuffer is an expandable byte buffer. You can append to the end of it, and you
|
||||||
|
/// can read at any byte position. The advantage over a plain Vec<u8> is that the
|
||||||
|
/// buffer consists of smaller chunks, so that a big buffer doesn't require one huge
|
||||||
|
/// allocation, and expanding doesn't require copying all the data.
|
||||||
|
///
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct ChunkedBuffer {
|
||||||
|
// Clippy considers it unnecessary to have Box here, since the vector is stored
|
||||||
|
// on the heap anyway. But we have our reasons for that: we want each chunk to
|
||||||
|
// be allocated separately, so that we don't require one huge allocation, and so
|
||||||
|
// that expanding the array doesn't require copying all of the data. (Clippy
|
||||||
|
// knows about that and doesn't complain if the array is large enough, 4096
|
||||||
|
// bytes is the default threshold. Our chunk size is smaller than that so that
|
||||||
|
// heuristic doesn't save us.)
|
||||||
|
#[allow(clippy::vec_box)]
|
||||||
|
chunks: Vec<Box<[u8; CHUNK_SIZE]>>,
|
||||||
|
|
||||||
|
length: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
pub struct ChunkToken {
|
||||||
|
start: usize,
|
||||||
|
length: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChunkedBuffer {
|
||||||
|
pub fn read(&self, token: &ChunkToken) -> Vec<u8> {
|
||||||
|
let mut buf = Vec::with_capacity(token.length);
|
||||||
|
|
||||||
|
let chunk_idx = token.start / CHUNK_SIZE;
|
||||||
|
let mut chunk_iter = self.chunks[chunk_idx..].iter();
|
||||||
|
let mut bytes_remaining = token.length;
|
||||||
|
if bytes_remaining > 0 {
|
||||||
|
let start_offset = token.start % CHUNK_SIZE;
|
||||||
|
let chunk_bytes = &chunk_iter.next().unwrap()[start_offset..];
|
||||||
|
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
|
||||||
|
buf.extend_from_slice(bytes);
|
||||||
|
bytes_remaining -= bytes.len();
|
||||||
|
}
|
||||||
|
|
||||||
|
while bytes_remaining > 0 {
|
||||||
|
let chunk_bytes = chunk_iter.next().unwrap();
|
||||||
|
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
|
||||||
|
buf.extend_from_slice(bytes);
|
||||||
|
bytes_remaining -= bytes.len();
|
||||||
|
}
|
||||||
|
|
||||||
|
debug_assert_eq!(buf.len(), token.length);
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write(&mut self, mut buf: &[u8]) -> ChunkToken {
|
||||||
|
let token = ChunkToken {
|
||||||
|
start: self.length,
|
||||||
|
length: buf.len(),
|
||||||
|
};
|
||||||
|
|
||||||
|
while !buf.is_empty() {
|
||||||
|
let chunk_idx = self.length / CHUNK_SIZE;
|
||||||
|
let chunk = match self.chunks.get_mut(chunk_idx) {
|
||||||
|
Some(chunk) => chunk,
|
||||||
|
None => {
|
||||||
|
debug_assert_eq!(self.length % CHUNK_SIZE, 0);
|
||||||
|
debug_assert_eq!(chunk_idx, self.chunks.len());
|
||||||
|
self.chunks.push(heap_alloc_chunk());
|
||||||
|
self.chunks.last_mut().unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let offset = self.length % CHUNK_SIZE;
|
||||||
|
let length = buf.len().min(CHUNK_SIZE - offset);
|
||||||
|
|
||||||
|
chunk[offset..][..length].copy_from_slice(&buf[..length]);
|
||||||
|
buf = &buf[length..];
|
||||||
|
self.length += length;
|
||||||
|
}
|
||||||
|
|
||||||
|
token
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn heap_alloc_chunk() -> Box<[u8; CHUNK_SIZE]> {
|
||||||
|
vec![0u8; CHUNK_SIZE].into_boxed_slice().try_into().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::{ChunkToken, ChunkedBuffer, CHUNK_SIZE};
|
||||||
|
|
||||||
|
fn gen_bytes(len: usize) -> Vec<u8> {
|
||||||
|
let mut buf = vec![0u8; len];
|
||||||
|
for (idx, val) in buf.iter_mut().enumerate() {
|
||||||
|
*val = idx as u8;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn one_item() {
|
||||||
|
fn test(data: &[u8]) {
|
||||||
|
let mut chunked_buffer = ChunkedBuffer::default();
|
||||||
|
let token = chunked_buffer.write(data);
|
||||||
|
assert_eq!(
|
||||||
|
ChunkToken {
|
||||||
|
start: 0,
|
||||||
|
length: data.len(),
|
||||||
|
},
|
||||||
|
token
|
||||||
|
);
|
||||||
|
|
||||||
|
let buf = chunked_buffer.read(&token);
|
||||||
|
assert_eq!(data, buf.as_slice());
|
||||||
|
}
|
||||||
|
|
||||||
|
test(b"");
|
||||||
|
test(b"a");
|
||||||
|
test(b"abc");
|
||||||
|
|
||||||
|
test(&gen_bytes(CHUNK_SIZE - 1));
|
||||||
|
test(&gen_bytes(CHUNK_SIZE));
|
||||||
|
test(&gen_bytes(CHUNK_SIZE + 1));
|
||||||
|
|
||||||
|
test(&gen_bytes(2 * CHUNK_SIZE - 1));
|
||||||
|
test(&gen_bytes(2 * CHUNK_SIZE));
|
||||||
|
test(&gen_bytes(2 * CHUNK_SIZE + 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn many_items() {
|
||||||
|
let mut chunked_buffer = ChunkedBuffer::default();
|
||||||
|
let mut start = 0;
|
||||||
|
let mut expected = Vec::new();
|
||||||
|
|
||||||
|
let mut test = |data: &[u8]| {
|
||||||
|
let token = chunked_buffer.write(data);
|
||||||
|
assert_eq!(
|
||||||
|
ChunkToken {
|
||||||
|
start,
|
||||||
|
length: data.len(),
|
||||||
|
},
|
||||||
|
token
|
||||||
|
);
|
||||||
|
|
||||||
|
expected.push((token, data.to_vec()));
|
||||||
|
|
||||||
|
for (token, expected_data) in &expected {
|
||||||
|
let buf = chunked_buffer.read(token);
|
||||||
|
assert_eq!(expected_data, buf.as_slice());
|
||||||
|
}
|
||||||
|
|
||||||
|
start += data.len();
|
||||||
|
};
|
||||||
|
|
||||||
|
test(b"abc");
|
||||||
|
test(b"");
|
||||||
|
test(b"a");
|
||||||
|
|
||||||
|
test(&gen_bytes(CHUNK_SIZE - 1));
|
||||||
|
test(&gen_bytes(CHUNK_SIZE));
|
||||||
|
test(&gen_bytes(CHUNK_SIZE + 1));
|
||||||
|
|
||||||
|
test(&gen_bytes(2 * CHUNK_SIZE - 1));
|
||||||
|
test(&gen_bytes(2 * CHUNK_SIZE));
|
||||||
|
test(&gen_bytes(2 * CHUNK_SIZE + 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -11,6 +11,9 @@ pub mod seqwait;
|
|||||||
/// append only ordered map implemented with a Vec
|
/// append only ordered map implemented with a Vec
|
||||||
pub mod vec_map;
|
pub mod vec_map;
|
||||||
|
|
||||||
|
/// expandable byte buffer consisting of fixed-size chunks
|
||||||
|
pub mod chunked_buffer;
|
||||||
|
|
||||||
// Async version of SeqWait. Currently unused.
|
// Async version of SeqWait. Currently unused.
|
||||||
// pub mod seqwait_async;
|
// pub mod seqwait_async;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user