mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
4 Commits
hackathon/
...
layer-chun
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fa2d53366f | ||
|
|
b26b471250 | ||
|
|
445288d7c1 | ||
|
|
556422f9bb |
@@ -1601,7 +1601,7 @@ impl LayeredTimeline {
|
||||
rel,
|
||||
request_lsn
|
||||
);
|
||||
Ok(img.clone())
|
||||
Ok(Bytes::from(img.image().to_vec()))
|
||||
} else {
|
||||
// FIXME: this ought to be an error?
|
||||
warn!("Page {} blk {} at {} not found", rel, blknum, request_lsn);
|
||||
@@ -1612,7 +1612,7 @@ impl LayeredTimeline {
|
||||
//
|
||||
// If we don't have a base image, then the oldest WAL record better initialize
|
||||
// the page
|
||||
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init {
|
||||
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init() {
|
||||
// FIXME: this ought to be an error?
|
||||
warn!(
|
||||
"Base image for page {}/{} at {} not found, but got {} WAL records",
|
||||
@@ -1632,7 +1632,7 @@ impl LayeredTimeline {
|
||||
rel,
|
||||
blknum,
|
||||
request_lsn,
|
||||
data.page_img.clone(),
|
||||
data.page_img.map(|page| Bytes::from(page.image().to_vec())), // FIXME
|
||||
data.records,
|
||||
)?;
|
||||
|
||||
|
||||
@@ -10,10 +10,10 @@ pub struct BlobRange {
|
||||
size: usize,
|
||||
}
|
||||
|
||||
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {
|
||||
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Box<[u8]>> {
|
||||
let mut buf = vec![0u8; range.size];
|
||||
reader.read_exact_at(&mut buf, range.offset)?;
|
||||
Ok(buf)
|
||||
Ok(buf.into_boxed_slice())
|
||||
}
|
||||
|
||||
pub struct BlobWriter<W> {
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
//!
|
||||
use crate::layered_repository::blob::BlobWriter;
|
||||
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
|
||||
use crate::layered_repository::page_versions::PageVersions;
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
|
||||
};
|
||||
@@ -46,6 +47,7 @@ use crate::waldecoder;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{bail, ensure, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use zenith_utils::vec_map::VecMap;
|
||||
@@ -203,7 +205,8 @@ impl Layer for DeltaLayer {
|
||||
.iter()
|
||||
.rev();
|
||||
for ((_blknum, pv_lsn), blob_range) in iter {
|
||||
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
|
||||
let pv_bytes = read_blob(&page_version_reader, blob_range)?;
|
||||
let pv = PageVersion::from_bytes(pv_bytes);
|
||||
|
||||
match pv {
|
||||
PageVersion::Page(img) => {
|
||||
@@ -213,7 +216,7 @@ impl Layer for DeltaLayer {
|
||||
break;
|
||||
}
|
||||
PageVersion::Wal(rec) => {
|
||||
let will_init = rec.will_init;
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_data.records.push((*pv_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
@@ -309,19 +312,20 @@ impl Layer for DeltaLayer {
|
||||
let mut desc = String::new();
|
||||
|
||||
let buf = read_blob(&chapter, blob_range)?;
|
||||
let pv = PageVersion::des(&buf)?;
|
||||
let pv = PageVersion::from_bytes(buf);
|
||||
|
||||
match pv {
|
||||
PageVersion::Page(img) => {
|
||||
write!(&mut desc, " img {} bytes", img.len())?;
|
||||
PageVersion::Page(page) => {
|
||||
write!(&mut desc, " img {} bytes", page.image().len())?;
|
||||
}
|
||||
PageVersion::Wal(rec) => {
|
||||
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
|
||||
let wal_desc =
|
||||
waldecoder::describe_wal_record(&Bytes::from(rec.rec().to_vec()));
|
||||
write!(
|
||||
&mut desc,
|
||||
" rec {} bytes will_init: {} {}",
|
||||
rec.rec.len(),
|
||||
rec.will_init,
|
||||
rec.rec().len(),
|
||||
rec.will_init(),
|
||||
wal_desc
|
||||
)?;
|
||||
}
|
||||
@@ -350,14 +354,14 @@ impl DeltaLayer {
|
||||
}
|
||||
|
||||
/// Create a new delta file, using the given page versions and relsizes.
|
||||
/// The page versions are passed by an iterator; the iterator must return
|
||||
/// page versions in blknum+lsn order.
|
||||
/// The page versions are passed in a PageVersions struct. If 'cutoff' is
|
||||
/// given, only page versions with LSN < cutoff are included.
|
||||
///
|
||||
/// This is used to write the in-memory layer to disk. The in-memory layer uses the same
|
||||
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
||||
/// expedient.
|
||||
/// This is used to write the in-memory layer to disk. The page_versions and
|
||||
/// relsizes are thus passed in the same format as they are in the in-memory
|
||||
/// layer, as that's expedient.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn create<'a>(
|
||||
pub fn create(
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
@@ -365,7 +369,8 @@ impl DeltaLayer {
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
dropped: bool,
|
||||
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
|
||||
page_versions: &PageVersions,
|
||||
cutoff: Option<Lsn>,
|
||||
relsizes: VecMap<Lsn, u32>,
|
||||
) -> Result<DeltaLayer> {
|
||||
if seg.rel.is_blocky() {
|
||||
@@ -399,9 +404,10 @@ impl DeltaLayer {
|
||||
|
||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||
|
||||
for (blknum, lsn, page_version) in page_versions {
|
||||
let buf = PageVersion::ser(page_version)?;
|
||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
||||
let page_versions_iter = page_versions.ordered_page_version_iter(cutoff);
|
||||
for (blknum, lsn, token) in page_versions_iter {
|
||||
let blob_range = page_version_writer
|
||||
.write_blob(page_versions.get_page_version_bytes(token).as_slice())?;
|
||||
|
||||
inner
|
||||
.page_version_metas
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
//!
|
||||
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
|
||||
Layer, Page, PageReconstructData, PageReconstructResult, SegmentTag,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::RELISH_SEG_SIZE;
|
||||
@@ -177,7 +177,7 @@ impl Layer for ImageLayer {
|
||||
}
|
||||
};
|
||||
|
||||
reconstruct_data.page_img = Some(Bytes::from(buf));
|
||||
reconstruct_data.page_img = Some(Page::from_bytes(&buf));
|
||||
Ok(PageReconstructResult::Complete)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::ZERO_PAGE;
|
||||
use crate::layered_repository::{DeltaLayer, ImageLayer};
|
||||
use crate::repository::WALRecord;
|
||||
use crate::PageServerConf;
|
||||
@@ -24,6 +23,7 @@ use zenith_utils::vec_map::VecMap;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use super::page_versions::PageVersions;
|
||||
use super::storage_layer::Page;
|
||||
|
||||
pub struct InMemoryLayer {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -184,16 +184,17 @@ impl Layer for InMemoryLayer {
|
||||
.get_block_lsn_range(blknum, ..=lsn)
|
||||
.iter()
|
||||
.rev();
|
||||
for (entry_lsn, pv) in iter {
|
||||
match pv {
|
||||
PageVersion::Page(img) => {
|
||||
reconstruct_data.page_img = Some(img.clone());
|
||||
for (entry_lsn, token) in iter {
|
||||
match inner.page_versions.get_page_version(token)? {
|
||||
PageVersion::Page(page) => {
|
||||
reconstruct_data.page_img = Some(page);
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
PageVersion::Wal(rec) => {
|
||||
reconstruct_data.records.push((*entry_lsn, rec.clone()));
|
||||
if rec.will_init {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_data.records.push((*entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
@@ -285,8 +286,8 @@ impl Layer for InMemoryLayer {
|
||||
println!("segsizes {}: {}", k, v);
|
||||
}
|
||||
|
||||
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
|
||||
let pv_description = match pv {
|
||||
for (blknum, lsn, token) in inner.page_versions.ordered_page_version_iter(None) {
|
||||
let pv_description = match inner.page_versions.get_page_version(token)? {
|
||||
PageVersion::Page(_img) => "page",
|
||||
PageVersion::Wal(_rec) => "wal",
|
||||
};
|
||||
@@ -361,7 +362,7 @@ impl InMemoryLayer {
|
||||
|
||||
/// Remember new page version, as a full page image
|
||||
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
|
||||
self.put_page_version(blknum, lsn, PageVersion::Page(img))
|
||||
self.put_page_version(blknum, lsn, PageVersion::Page(Page::from_bytes(&img)))
|
||||
}
|
||||
|
||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||
@@ -382,8 +383,8 @@ impl InMemoryLayer {
|
||||
|
||||
let mut mem_usage = 0;
|
||||
mem_usage += match &pv {
|
||||
PageVersion::Page(img) => img.len(),
|
||||
PageVersion::Wal(rec) => rec.rec.len(),
|
||||
PageVersion::Page(page) => page.image().len(),
|
||||
PageVersion::Wal(rec) => rec.rec().len(),
|
||||
};
|
||||
|
||||
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
|
||||
@@ -425,7 +426,7 @@ impl InMemoryLayer {
|
||||
// subsequent call to initialize the gap page.
|
||||
let gapstart = self.seg.segno * RELISH_SEG_SIZE + oldsize;
|
||||
for gapblknum in gapstart..blknum {
|
||||
let zeropv = PageVersion::Page(ZERO_PAGE.clone());
|
||||
let zeropv = PageVersion::Page(Page::zero_page());
|
||||
trace!(
|
||||
"filling gap blk {} with zeros for write of {}",
|
||||
gapblknum,
|
||||
@@ -610,7 +611,8 @@ impl InMemoryLayer {
|
||||
self.start_lsn,
|
||||
end_lsn_exclusive,
|
||||
true,
|
||||
inner.page_versions.ordered_page_version_iter(None),
|
||||
&inner.page_versions,
|
||||
None,
|
||||
inner.segsizes.clone(),
|
||||
)?;
|
||||
trace!(
|
||||
@@ -627,13 +629,9 @@ impl InMemoryLayer {
|
||||
|
||||
// Since `end_lsn` is inclusive, subtract 1.
|
||||
// We want to make an ImageLayer for the last included LSN,
|
||||
// so the DeltaLayer should exlcude that LSN.
|
||||
// so the DeltaLayer should exclude that LSN.
|
||||
let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1);
|
||||
|
||||
let mut page_versions = inner
|
||||
.page_versions
|
||||
.ordered_page_version_iter(Some(end_lsn_inclusive));
|
||||
|
||||
let mut delta_layers = Vec::new();
|
||||
|
||||
if self.start_lsn != end_lsn_inclusive {
|
||||
@@ -647,7 +645,8 @@ impl InMemoryLayer {
|
||||
self.start_lsn,
|
||||
end_lsn_inclusive,
|
||||
false,
|
||||
page_versions,
|
||||
&inner.page_versions,
|
||||
Some(end_lsn_inclusive),
|
||||
segsizes,
|
||||
)?;
|
||||
delta_layers.push(delta_layer);
|
||||
@@ -658,7 +657,11 @@ impl InMemoryLayer {
|
||||
end_lsn_inclusive
|
||||
);
|
||||
} else {
|
||||
assert!(page_versions.next().is_none());
|
||||
assert!(inner
|
||||
.page_versions
|
||||
.ordered_page_version_iter(None)
|
||||
.next()
|
||||
.is_none());
|
||||
}
|
||||
|
||||
drop(inner);
|
||||
|
||||
@@ -1,13 +1,23 @@
|
||||
use std::{collections::HashMap, ops::RangeBounds, slice};
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use zenith_utils::chunked_buffer::{ChunkToken, ChunkedBuffer};
|
||||
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
|
||||
|
||||
use super::storage_layer::PageVersion;
|
||||
|
||||
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
|
||||
const EMPTY_SLICE: &[(Lsn, ChunkToken)] = &[];
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
|
||||
pub struct PageVersions {
|
||||
map: HashMap<u32, VecMap<Lsn, ChunkToken>>,
|
||||
|
||||
/// The PageVersion structs are stored in a serialized format in this buffer.
|
||||
/// Each serialized PageVersion is preceded by a 'u32' length field.
|
||||
/// The 'map' stores offsets into this buffer.
|
||||
buffer: ChunkedBuffer,
|
||||
}
|
||||
|
||||
impl PageVersions {
|
||||
pub fn append_or_update_last(
|
||||
@@ -15,14 +25,16 @@ impl PageVersions {
|
||||
blknum: u32,
|
||||
lsn: Lsn,
|
||||
page_version: PageVersion,
|
||||
) -> Option<PageVersion> {
|
||||
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
|
||||
map.append_or_update_last(lsn, page_version).unwrap()
|
||||
) -> Option<ChunkToken> {
|
||||
let token = self.buffer.write(page_version.bytes());
|
||||
|
||||
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
|
||||
map.append_or_update_last(lsn, token).unwrap()
|
||||
}
|
||||
|
||||
/// Get all [`PageVersion`]s in a block
|
||||
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
|
||||
self.0
|
||||
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, ChunkToken)] {
|
||||
self.map
|
||||
.get(&blknum)
|
||||
.map(VecMap::as_slice)
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
@@ -33,8 +45,8 @@ impl PageVersions {
|
||||
&self,
|
||||
blknum: u32,
|
||||
range: R,
|
||||
) -> &[(Lsn, PageVersion)] {
|
||||
self.0
|
||||
) -> &[(Lsn, ChunkToken)] {
|
||||
self.map
|
||||
.get(&blknum)
|
||||
.map(|vec_map| vec_map.slice_range(range))
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
@@ -43,7 +55,7 @@ impl PageVersions {
|
||||
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
||||
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
|
||||
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
||||
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
||||
let mut ordered_blocks: Vec<u32> = self.map.keys().cloned().collect();
|
||||
ordered_blocks.sort_unstable();
|
||||
|
||||
let slice = ordered_blocks
|
||||
@@ -59,6 +71,15 @@ impl PageVersions {
|
||||
cur_slice_iter: slice.iter(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_page_version_bytes(&self, token: &ChunkToken) -> Vec<u8> {
|
||||
self.buffer.read(token)
|
||||
}
|
||||
|
||||
pub fn get_page_version(&self, token: &ChunkToken) -> Result<PageVersion> {
|
||||
let buf = self.get_page_version_bytes(token).into_boxed_slice();
|
||||
Ok(PageVersion::from_bytes(buf))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OrderedPageVersionIter<'a> {
|
||||
@@ -69,7 +90,7 @@ pub struct OrderedPageVersionIter<'a> {
|
||||
|
||||
cutoff_lsn: Option<Lsn>,
|
||||
|
||||
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
|
||||
cur_slice_iter: slice::Iter<'a, (Lsn, ChunkToken)>,
|
||||
}
|
||||
|
||||
impl OrderedPageVersionIter<'_> {
|
||||
@@ -83,14 +104,14 @@ impl OrderedPageVersionIter<'_> {
|
||||
}
|
||||
|
||||
impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
||||
type Item = (u32, Lsn, &'a PageVersion);
|
||||
type Item = (u32, Lsn, &'a ChunkToken);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
loop {
|
||||
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
|
||||
if let Some((lsn, token)) = self.cur_slice_iter.next() {
|
||||
if self.is_lsn_before_cutoff(lsn) {
|
||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
||||
return Some((blknum, *lsn, page_version));
|
||||
return Some((blknum, *lsn, token));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,7 +125,7 @@ impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::Bytes;
|
||||
use crate::layered_repository::storage_layer::Page;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -114,8 +135,7 @@ mod tests {
|
||||
const BLOCKS: u32 = 1000;
|
||||
const LSNS: u64 = 50;
|
||||
|
||||
let empty_page = Bytes::from_static(&[0u8; 8192]);
|
||||
let empty_page_version = PageVersion::Page(empty_page);
|
||||
let empty_page_version = PageVersion::Page(Page::zero_page());
|
||||
|
||||
for blknum in 0..BLOCKS {
|
||||
for lsn in 0..LSNS {
|
||||
|
||||
@@ -3,10 +3,9 @@
|
||||
//!
|
||||
|
||||
use crate::relish::RelishTag;
|
||||
use crate::repository::WALRecord;
|
||||
use crate::repository::{WALRecord, WAL_BIT};
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
@@ -51,12 +50,52 @@ impl SegmentTag {
|
||||
///
|
||||
/// A page version can be stored as a full page image, or as WAL record that needs
|
||||
/// to be applied over the previous page version to reconstruct this version.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum PageVersion {
|
||||
Page(Bytes),
|
||||
Page(Page),
|
||||
Wal(WALRecord),
|
||||
}
|
||||
|
||||
impl PageVersion {
|
||||
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
|
||||
if bytes[0] & WAL_BIT != 0 {
|
||||
Self::Wal(WALRecord::from_bytes(bytes))
|
||||
} else {
|
||||
Self::Page(Page(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bytes(&self) -> &[u8] {
|
||||
match self {
|
||||
Self::Page(page) => page.bytes(),
|
||||
Self::Wal(wal) => wal.bytes(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Page(Box<[u8]>);
|
||||
|
||||
impl Page {
|
||||
pub fn zero_page() -> Self {
|
||||
// TODO optimize this
|
||||
Self(vec![0u8; 1 + 8192].into_boxed_slice())
|
||||
}
|
||||
pub fn from_bytes(bytes: &[u8]) -> Self {
|
||||
let mut buf = vec![0u8; 1 + bytes.len()];
|
||||
buf[1..].copy_from_slice(bytes);
|
||||
Self(buf.into_boxed_slice())
|
||||
}
|
||||
|
||||
pub fn image(&self) -> &[u8] {
|
||||
&self.0[1..]
|
||||
}
|
||||
|
||||
pub fn bytes(&self) -> &[u8] {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Data needed to reconstruct a page version
|
||||
///
|
||||
@@ -66,7 +105,7 @@ pub enum PageVersion {
|
||||
///
|
||||
pub struct PageReconstructData {
|
||||
pub records: Vec<(Lsn, WALRecord)>,
|
||||
pub page_img: Option<Bytes>,
|
||||
pub page_img: Option<Page>,
|
||||
}
|
||||
|
||||
/// Return value from Layer::get_page_reconstruct_data
|
||||
|
||||
@@ -2,8 +2,8 @@ use crate::relish::*;
|
||||
use crate::CheckpointConfig;
|
||||
use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use std::convert::TryInto;
|
||||
use std::ops::{AddAssign, Deref};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -192,14 +192,45 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
|
||||
fn advance_last_record_lsn(&self, lsn: Lsn);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct WALRecord {
|
||||
pub will_init: bool,
|
||||
pub rec: Bytes,
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WALRecord(Box<[u8]>);
|
||||
|
||||
pub const WAL_BIT: u8 = 0b1;
|
||||
const WILL_INIT_BIT: u8 = 0b10;
|
||||
|
||||
impl WALRecord {
|
||||
pub fn new(will_init: bool, main_data_offset: u32, rec: &[u8]) -> Self {
|
||||
// TODO avoid init
|
||||
let mut buf = vec![0u8; 1 + 4 + rec.len()];
|
||||
buf[0] = WAL_BIT | if will_init { WILL_INIT_BIT } else { 0 };
|
||||
let mdo = u32::to_le_bytes(main_data_offset);
|
||||
buf[1..5].copy_from_slice(&mdo);
|
||||
buf[5..].copy_from_slice(rec);
|
||||
Self(buf.into_boxed_slice())
|
||||
}
|
||||
|
||||
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
|
||||
Self(bytes)
|
||||
}
|
||||
|
||||
pub fn will_init(&self) -> bool {
|
||||
self.0[0] & WILL_INIT_BIT != 0
|
||||
}
|
||||
|
||||
// Remember the offset of main_data in rec,
|
||||
// so that we don't have to parse the record again.
|
||||
// If record has no main_data, this offset equals rec.len().
|
||||
pub main_data_offset: u32,
|
||||
pub fn main_data_offset(&self) -> u32 {
|
||||
u32::from_le_bytes(self.0[1..5].try_into().unwrap())
|
||||
}
|
||||
|
||||
pub fn rec(&self) -> &[u8] {
|
||||
&self.0[5..]
|
||||
}
|
||||
|
||||
pub fn bytes(&self) -> &[u8] {
|
||||
&self.0[..]
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -427,11 +427,11 @@ pub fn save_decoded_record(
|
||||
forknum: blk.forknum as u8,
|
||||
});
|
||||
|
||||
let rec = WALRecord {
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
let rec = WALRecord::new(
|
||||
blk.will_init || blk.apply_image,
|
||||
decoded.main_data_offset as u32,
|
||||
&recdata[..],
|
||||
);
|
||||
|
||||
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
|
||||
}
|
||||
@@ -770,11 +770,7 @@ fn save_xact_record(
|
||||
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rec = WALRecord {
|
||||
will_init: false,
|
||||
rec: decoded.record.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
|
||||
timeline.put_wal_record(
|
||||
lsn,
|
||||
RelishTag::Slru {
|
||||
@@ -886,11 +882,7 @@ fn save_multixact_create_record(
|
||||
xlrec: &XlMultiXactCreate,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> Result<()> {
|
||||
let rec = WALRecord {
|
||||
will_init: false,
|
||||
rec: decoded.record.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
|
||||
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
|
||||
@@ -317,7 +317,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
// Apply all collected WAL records
|
||||
for (_lsn, record) in records {
|
||||
let mut buf = record.rec.clone();
|
||||
let mut buf = Bytes::from(record.rec().to_vec());
|
||||
|
||||
WAL_REDO_RECORD_COUNTER.inc();
|
||||
|
||||
@@ -328,7 +328,7 @@ impl PostgresRedoManager {
|
||||
//move to main data
|
||||
// TODO probably, we should store some records in our special format
|
||||
// to avoid this weird parsing on replay
|
||||
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
|
||||
let skip = (record.main_data_offset() - pg_constants::SIZEOF_XLOGRECORD) as usize;
|
||||
if buf.remaining() > skip {
|
||||
buf.advance(skip);
|
||||
}
|
||||
@@ -574,7 +574,7 @@ impl PostgresRedoProcess {
|
||||
build_push_page_msg(tag, &img, &mut writebuf);
|
||||
}
|
||||
for (lsn, rec) in records.iter() {
|
||||
build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
|
||||
build_apply_record_msg(*lsn, rec.rec(), &mut writebuf);
|
||||
}
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
173
zenith_utils/src/chunked_buffer.rs
Normal file
173
zenith_utils/src/chunked_buffer.rs
Normal file
@@ -0,0 +1,173 @@
|
||||
use std::convert::TryInto;
|
||||
|
||||
const CHUNK_SIZE: usize = 1024;
|
||||
|
||||
///
|
||||
/// ChunkedBuffer is an expandable byte buffer. You can append to the end of it, and you
|
||||
/// can read at any byte position. The advantage over a plain Vec<u8> is that the
|
||||
/// buffer consists of smaller chunks, so that a big buffer doesn't require one huge
|
||||
/// allocation, and expanding doesn't require copying all the data.
|
||||
///
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ChunkedBuffer {
|
||||
// Clippy considers it unnecessary to have Box here, since the vector is stored
|
||||
// on the heap anyway. But we have our reasons for that: we want each chunk to
|
||||
// be allocated separately, so that we don't require one huge allocation, and so
|
||||
// that expanding the array doesn't require copying all of the data. (Clippy
|
||||
// knows about that and doesn't complain if the array is large enough, 4096
|
||||
// bytes is the default threshold. Our chunk size is smaller than that so that
|
||||
// heuristic doesn't save us.)
|
||||
#[allow(clippy::vec_box)]
|
||||
chunks: Vec<Box<[u8; CHUNK_SIZE]>>,
|
||||
|
||||
length: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct ChunkToken {
|
||||
start: usize,
|
||||
length: usize,
|
||||
}
|
||||
|
||||
impl ChunkedBuffer {
|
||||
pub fn read(&self, token: &ChunkToken) -> Vec<u8> {
|
||||
let mut buf = Vec::with_capacity(token.length);
|
||||
|
||||
let chunk_idx = token.start / CHUNK_SIZE;
|
||||
let mut chunk_iter = self.chunks[chunk_idx..].iter();
|
||||
let mut bytes_remaining = token.length;
|
||||
if bytes_remaining > 0 {
|
||||
let start_offset = token.start % CHUNK_SIZE;
|
||||
let chunk_bytes = &chunk_iter.next().unwrap()[start_offset..];
|
||||
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
|
||||
buf.extend_from_slice(bytes);
|
||||
bytes_remaining -= bytes.len();
|
||||
}
|
||||
|
||||
while bytes_remaining > 0 {
|
||||
let chunk_bytes = chunk_iter.next().unwrap();
|
||||
let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)];
|
||||
buf.extend_from_slice(bytes);
|
||||
bytes_remaining -= bytes.len();
|
||||
}
|
||||
|
||||
debug_assert_eq!(buf.len(), token.length);
|
||||
buf
|
||||
}
|
||||
|
||||
pub fn write(&mut self, mut buf: &[u8]) -> ChunkToken {
|
||||
let token = ChunkToken {
|
||||
start: self.length,
|
||||
length: buf.len(),
|
||||
};
|
||||
|
||||
while !buf.is_empty() {
|
||||
let chunk_idx = self.length / CHUNK_SIZE;
|
||||
let chunk = match self.chunks.get_mut(chunk_idx) {
|
||||
Some(chunk) => chunk,
|
||||
None => {
|
||||
debug_assert_eq!(self.length % CHUNK_SIZE, 0);
|
||||
debug_assert_eq!(chunk_idx, self.chunks.len());
|
||||
self.chunks.push(heap_alloc_chunk());
|
||||
self.chunks.last_mut().unwrap()
|
||||
}
|
||||
};
|
||||
|
||||
let offset = self.length % CHUNK_SIZE;
|
||||
let length = buf.len().min(CHUNK_SIZE - offset);
|
||||
|
||||
chunk[offset..][..length].copy_from_slice(&buf[..length]);
|
||||
buf = &buf[length..];
|
||||
self.length += length;
|
||||
}
|
||||
|
||||
token
|
||||
}
|
||||
}
|
||||
|
||||
fn heap_alloc_chunk() -> Box<[u8; CHUNK_SIZE]> {
|
||||
vec![0u8; CHUNK_SIZE].into_boxed_slice().try_into().unwrap()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{ChunkToken, ChunkedBuffer, CHUNK_SIZE};
|
||||
|
||||
fn gen_bytes(len: usize) -> Vec<u8> {
|
||||
let mut buf = vec![0u8; len];
|
||||
for (idx, val) in buf.iter_mut().enumerate() {
|
||||
*val = idx as u8;
|
||||
}
|
||||
|
||||
buf
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn one_item() {
|
||||
fn test(data: &[u8]) {
|
||||
let mut chunked_buffer = ChunkedBuffer::default();
|
||||
let token = chunked_buffer.write(data);
|
||||
assert_eq!(
|
||||
ChunkToken {
|
||||
start: 0,
|
||||
length: data.len(),
|
||||
},
|
||||
token
|
||||
);
|
||||
|
||||
let buf = chunked_buffer.read(&token);
|
||||
assert_eq!(data, buf.as_slice());
|
||||
}
|
||||
|
||||
test(b"");
|
||||
test(b"a");
|
||||
test(b"abc");
|
||||
|
||||
test(&gen_bytes(CHUNK_SIZE - 1));
|
||||
test(&gen_bytes(CHUNK_SIZE));
|
||||
test(&gen_bytes(CHUNK_SIZE + 1));
|
||||
|
||||
test(&gen_bytes(2 * CHUNK_SIZE - 1));
|
||||
test(&gen_bytes(2 * CHUNK_SIZE));
|
||||
test(&gen_bytes(2 * CHUNK_SIZE + 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn many_items() {
|
||||
let mut chunked_buffer = ChunkedBuffer::default();
|
||||
let mut start = 0;
|
||||
let mut expected = Vec::new();
|
||||
|
||||
let mut test = |data: &[u8]| {
|
||||
let token = chunked_buffer.write(data);
|
||||
assert_eq!(
|
||||
ChunkToken {
|
||||
start,
|
||||
length: data.len(),
|
||||
},
|
||||
token
|
||||
);
|
||||
|
||||
expected.push((token, data.to_vec()));
|
||||
|
||||
for (token, expected_data) in &expected {
|
||||
let buf = chunked_buffer.read(token);
|
||||
assert_eq!(expected_data, buf.as_slice());
|
||||
}
|
||||
|
||||
start += data.len();
|
||||
};
|
||||
|
||||
test(b"abc");
|
||||
test(b"");
|
||||
test(b"a");
|
||||
|
||||
test(&gen_bytes(CHUNK_SIZE - 1));
|
||||
test(&gen_bytes(CHUNK_SIZE));
|
||||
test(&gen_bytes(CHUNK_SIZE + 1));
|
||||
|
||||
test(&gen_bytes(2 * CHUNK_SIZE - 1));
|
||||
test(&gen_bytes(2 * CHUNK_SIZE));
|
||||
test(&gen_bytes(2 * CHUNK_SIZE + 1));
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,9 @@ pub mod seqwait;
|
||||
/// append only ordered map implemented with a Vec
|
||||
pub mod vec_map;
|
||||
|
||||
/// expandable byte buffer consisting of fixed-size chunks
|
||||
pub mod chunked_buffer;
|
||||
|
||||
// Async version of SeqWait. Currently unused.
|
||||
// pub mod seqwait_async;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user