From 445288d7c1f3540a15acc47efb8b16a057d10e0f Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Wed, 3 Nov 2021 23:07:12 -0700 Subject: [PATCH] Change chunked_buffer API --- pageserver/src/layered_repository/blob.rs | 10 +- .../src/layered_repository/delta_layer.rs | 6 +- .../src/layered_repository/inmemory_layer.rs | 8 +- .../src/layered_repository/page_versions.rs | 58 ++-- zenith_utils/src/chunked_buffer.rs | 256 +++++++++--------- 5 files changed, 155 insertions(+), 183 deletions(-) diff --git a/pageserver/src/layered_repository/blob.rs b/pageserver/src/layered_repository/blob.rs index 5773db878c..b7c7c3f460 100644 --- a/pageserver/src/layered_repository/blob.rs +++ b/pageserver/src/layered_repository/blob.rs @@ -1,4 +1,4 @@ -use std::{fs::File, io::Read, io::Write}; +use std::{fs::File, io::Write}; use anyhow::Result; use bookfile::{BookWriter, BoundedReader, ChapterId, ChapterWriter}; @@ -28,14 +28,14 @@ impl BlobWriter { Self { writer, offset: 0 } } - pub fn write_blob_from_reader(&mut self, r: &mut impl Read) -> Result { - let len = std::io::copy(r, &mut self.writer)?; + pub fn write_blob(&mut self, blob: &[u8]) -> Result { + self.writer.write_all(blob)?; let range = BlobRange { offset: self.offset, - size: len as usize, + size: blob.len(), }; - self.offset += len as u64; + self.offset += blob.len() as u64; Ok(range) } diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index ed2166230d..b41b0341cb 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -402,9 +402,9 @@ impl DeltaLayer { let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER); let page_versions_iter = page_versions.ordered_page_version_iter(cutoff); - for (blknum, lsn, pos) in page_versions_iter { - let blob_range = - page_version_writer.write_blob_from_reader(&mut page_versions.reader(pos)?)?; + for (blknum, lsn, token) in page_versions_iter { + let blob_range = page_version_writer + .write_blob(page_versions.get_page_version_bytes(token).as_slice())?; inner .page_version_metas diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 5b60b4b106..3fab51f76d 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -184,8 +184,8 @@ impl Layer for InMemoryLayer { .get_block_lsn_range(blknum, ..=lsn) .iter() .rev(); - for (entry_lsn, pos) in iter { - match inner.page_versions.get_page_version(*pos)? { + for (entry_lsn, token) in iter { + match inner.page_versions.get_page_version(token)? { PageVersion::Page(img) => { reconstruct_data.page_img = Some(img); need_image = false; @@ -285,8 +285,8 @@ impl Layer for InMemoryLayer { println!("segsizes {}: {}", k, v); } - for (blknum, lsn, pos) in inner.page_versions.ordered_page_version_iter(None) { - let pv_description = match inner.page_versions.get_page_version(pos)? { + for (blknum, lsn, token) in inner.page_versions.ordered_page_version_iter(None) { + let pv_description = match inner.page_versions.get_page_version(token)? { PageVersion::Page(_img) => "page", PageVersion::Wal(_rec) => "wal", }; diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs index ea60989d63..1cbd25b851 100644 --- a/pageserver/src/layered_repository/page_versions.rs +++ b/pageserver/src/layered_repository/page_versions.rs @@ -2,20 +2,18 @@ use std::{collections::HashMap, ops::RangeBounds, slice}; use anyhow::Result; -use std::io::{Read, Write}; - -use zenith_utils::chunked_buffer::{ChunkedBuffer, ChunkedBufferReader}; +use zenith_utils::chunked_buffer::{ChunkToken, ChunkedBuffer}; use zenith_utils::{lsn::Lsn, vec_map::VecMap}; use super::storage_layer::PageVersion; use zenith_utils::bin_ser::LeSer; -const EMPTY_SLICE: &[(Lsn, u64)] = &[]; +const EMPTY_SLICE: &[(Lsn, ChunkToken)] = &[]; #[derive(Debug, Default)] pub struct PageVersions { - map: HashMap>, + map: HashMap>, /// The PageVersion structs are stored in a serialized format in this buffer. /// Each serialized PageVersion is preceded by a 'u32' length field. @@ -29,26 +27,15 @@ impl PageVersions { blknum: u32, lsn: Lsn, page_version: PageVersion, - ) -> Option { - // remember starting position - let pos = self.buffer.len(); - - // make room for the 'length' field by writing zeros as a placeholder. - self.buffer.write_all(&[0u8; 4]).unwrap(); - - page_version.ser_into(&mut self.buffer).unwrap(); - - // write the 'length' field. - let len = self.buffer.len() - pos - 4; - let lenbuf = u32::to_ne_bytes(len as u32); - self.buffer.write_all_at(&lenbuf, pos).unwrap(); + ) -> Option { + let token = self.buffer.write(page_version.ser().unwrap().as_slice()); let map = self.map.entry(blknum).or_insert_with(VecMap::default); - map.append_or_update_last(lsn, pos as u64).unwrap() + map.append_or_update_last(lsn, token).unwrap() } /// Get all [`PageVersion`]s in a block - fn get_block_slice(&self, blknum: u32) -> &[(Lsn, u64)] { + fn get_block_slice(&self, blknum: u32) -> &[(Lsn, ChunkToken)] { self.map .get(&blknum) .map(VecMap::as_slice) @@ -56,7 +43,11 @@ impl PageVersions { } /// Get a range of [`PageVersions`] in a block - pub fn get_block_lsn_range>(&self, blknum: u32, range: R) -> &[(Lsn, u64)] { + pub fn get_block_lsn_range>( + &self, + blknum: u32, + range: R, + ) -> &[(Lsn, ChunkToken)] { self.map .get(&blknum) .map(|vec_map| vec_map.slice_range(range)) @@ -83,20 +74,13 @@ impl PageVersions { } } - /// Returns a 'Read' that reads the page version at given offset. - pub fn reader(&self, pos: u64) -> Result, std::io::Error> { - // read length - let mut lenbuf = [0u8; 4]; - let mut reader = self.buffer.reader(pos); - reader.read_exact(&mut lenbuf)?; - let len = u32::from_ne_bytes(lenbuf); - - Ok(reader.take(len as u64)) + pub fn get_page_version_bytes(&self, token: &ChunkToken) -> Vec { + self.buffer.read(token) } - pub fn get_page_version(&self, pos: u64) -> Result { - let mut reader = self.reader(pos)?; - Ok(PageVersion::des_from(&mut reader)?) + pub fn get_page_version(&self, token: &ChunkToken) -> Result { + let buf = self.get_page_version_bytes(token); + Ok(PageVersion::des(buf.as_slice())?) // TODO unwrap } } @@ -108,7 +92,7 @@ pub struct OrderedPageVersionIter<'a> { cutoff_lsn: Option, - cur_slice_iter: slice::Iter<'a, (Lsn, u64)>, + cur_slice_iter: slice::Iter<'a, (Lsn, ChunkToken)>, } impl OrderedPageVersionIter<'_> { @@ -122,14 +106,14 @@ impl OrderedPageVersionIter<'_> { } impl<'a> Iterator for OrderedPageVersionIter<'a> { - type Item = (u32, Lsn, u64); + type Item = (u32, Lsn, &'a ChunkToken); fn next(&mut self) -> Option { loop { - if let Some((lsn, pos)) = self.cur_slice_iter.next() { + if let Some((lsn, token)) = self.cur_slice_iter.next() { if self.is_lsn_before_cutoff(lsn) { let blknum = self.ordered_blocks[self.cur_block_idx]; - return Some((blknum, *lsn, *pos)); + return Some((blknum, *lsn, token)); } } diff --git a/zenith_utils/src/chunked_buffer.rs b/zenith_utils/src/chunked_buffer.rs index 678b0926d3..585573753e 100644 --- a/zenith_utils/src/chunked_buffer.rs +++ b/zenith_utils/src/chunked_buffer.rs @@ -1,5 +1,4 @@ -use std::cmp::min; -use std::io::{Read, Write}; +use std::convert::TryInto; const CHUNK_SIZE: usize = 1024; @@ -20,166 +19,155 @@ pub struct ChunkedBuffer { // heuristic doesn't save us.) #[allow(clippy::vec_box)] chunks: Vec>, - len: usize, + + length: usize, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct ChunkToken { + start: usize, + length: usize, } impl ChunkedBuffer { - /// Return current length of the buffer, in bytes - pub fn len(&self) -> usize { - self.len - } + pub fn read(&self, token: &ChunkToken) -> Vec { + let mut buf = Vec::with_capacity(token.length); - pub fn is_empty(&self) -> bool { - self.len == 0 - } - - /// Create a "cursor" for reading from the buffer. - pub fn reader(&self, pos: u64) -> ChunkedBufferReader { - ChunkedBufferReader { - parent: self, - pos: pos as usize, - } - } - - /// Write 'buf' to the given byte position in the buffer. - /// - /// The buffer is expanded if needed. If 'pos' is past the end of the buffer, - /// the "gap" is filled with zeros. - /// - /// This can stop short, depending on where the chunk boundaries are. But - /// always writes at least one byte. - pub fn write_at(&mut self, buf: &[u8], pos: usize) -> Result { - let chunk_idx = pos / CHUNK_SIZE; - let chunk_off = pos % CHUNK_SIZE; - - while chunk_idx >= self.chunks.len() { - self.chunks.push(Box::new([0; CHUNK_SIZE])); + let chunk_idx = token.start / CHUNK_SIZE; + let mut chunk_iter = self.chunks[chunk_idx..].iter(); + let mut bytes_remaining = token.length; + if bytes_remaining > 0 { + let start_offset = token.start % CHUNK_SIZE; + let chunk_bytes = &chunk_iter.next().unwrap()[start_offset..]; + let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)]; + buf.extend_from_slice(bytes); + bytes_remaining -= bytes.len(); } - let n = min(CHUNK_SIZE - chunk_off, buf.len()); - - let chunk = &mut self.chunks[chunk_idx]; - - chunk[chunk_off..(chunk_off + n)].copy_from_slice(&buf[0..n]); - - if pos + n > self.len { - self.len = pos + n; + while bytes_remaining > 0 { + let chunk_bytes = chunk_iter.next().unwrap(); + let bytes = &chunk_bytes[..chunk_bytes.len().min(bytes_remaining)]; + buf.extend_from_slice(bytes); + bytes_remaining -= bytes.len(); } - Ok(n) + + debug_assert_eq!(buf.len(), token.length); + buf } - /// Like 'write_at', but doesn't stop until the whole buffer has been written. - pub fn write_all_at(&mut self, buf: &[u8], pos: usize) -> Result<(), std::io::Error> { - let mut nwritten = 0; + pub fn write(&mut self, mut buf: &[u8]) -> ChunkToken { + let token = ChunkToken { + start: self.length, + length: buf.len(), + }; - while nwritten < buf.len() { - nwritten += self.write_at(&buf[nwritten..], pos + nwritten)?; + while !buf.is_empty() { + let chunk_idx = self.length / CHUNK_SIZE; + let chunk = match self.chunks.get_mut(chunk_idx) { + Some(chunk) => chunk, + None => { + debug_assert_eq!(self.length % CHUNK_SIZE, 0); + debug_assert_eq!(chunk_idx, self.chunks.len()); + self.chunks.push(heap_alloc_chunk()); + self.chunks.last_mut().unwrap() + } + }; + + let offset = self.length % CHUNK_SIZE; + let length = buf.len().min(CHUNK_SIZE - offset); + + chunk[offset..][..length].copy_from_slice(&buf[..length]); + buf = &buf[length..]; + self.length += length; } - Ok(()) + + token } } -impl Write for ChunkedBuffer { - /// Append to the end of the buffer. - /// - /// Note how this interacts with 'write_at'. If you use write_at to expand the - /// buffer, the "write position" for this function changes too. - fn write(&mut self, buf: &[u8]) -> Result { - self.write_at(buf, self.len) - } - - fn flush(&mut self) -> Result<(), std::io::Error> { - Ok(()) - } +fn heap_alloc_chunk() -> Box<[u8; CHUNK_SIZE]> { + vec![0u8; CHUNK_SIZE].into_boxed_slice().try_into().unwrap() } -pub struct ChunkedBufferReader<'a> { - parent: &'a ChunkedBuffer, - pos: usize, -} +#[cfg(test)] +mod tests { + use super::{ChunkToken, ChunkedBuffer, CHUNK_SIZE}; -impl<'a> Read for ChunkedBufferReader<'a> { - fn read(&mut self, dst: &mut [u8]) -> Result { - if self.pos >= self.parent.len { - return Ok(0); + fn gen_bytes(len: usize) -> Vec { + let mut buf = vec![0u8; len]; + for idx in 0..len { + buf[idx] = idx as u8; } - let chunk_idx = self.pos / CHUNK_SIZE; - let chunk_off = self.pos % CHUNK_SIZE; - - let len = min( - min(self.parent.len - self.pos, CHUNK_SIZE - chunk_off), - dst.len(), - ); - - let chunk = &self.parent.chunks[chunk_idx]; - - dst[0..len].copy_from_slice(&chunk[chunk_off..(chunk_off + len)]); - - self.pos += len; - - Ok(len) - } -} - -#[test] -fn test_chunked_buffer() -> Result<(), std::io::Error> { - let mut chunked_buffer = ChunkedBuffer::default(); - - let mut testdata = [0u8; 10000]; - for (i, b) in testdata.iter_mut().enumerate() { - *b = (i % 256) as u8; + buf } - chunked_buffer.write_all(&testdata)?; + #[test] + fn one_item() { + fn test(data: &[u8]) { + let mut chunked_buffer = ChunkedBuffer::default(); + let token = chunked_buffer.write(data); + assert_eq!( + ChunkToken { + start: 0, + length: data.len(), + }, + token + ); - // Test reader - let mut resultdata = [0u8; 10000]; - let mut reader = chunked_buffer.reader(0); - reader.read_exact(&mut resultdata)?; - assert_eq!(resultdata, testdata); + let buf = chunked_buffer.read(&token); + assert_eq!(data, buf.as_slice()); + } - // test zero-length read - let mut resultdata = [0u8; 10000]; - let mut reader = chunked_buffer.reader(0); - reader.read_exact(&mut resultdata[0..0])?; + test(b""); + test(b"a"); + test(b"abc"); - // test reads around chunk boundaries - let mut resultdata = [0u8; 10000]; - let mut reader = chunked_buffer.reader(0); - reader.read_exact(&mut resultdata[0..1])?; - reader.read_exact(&mut resultdata[1..CHUNK_SIZE])?; - assert_eq!(resultdata[0..CHUNK_SIZE], testdata[0..CHUNK_SIZE]); + test(&gen_bytes(CHUNK_SIZE - 1)); + test(&gen_bytes(CHUNK_SIZE)); + test(&gen_bytes(CHUNK_SIZE + 1)); - let mut resultdata = [0u8; 10000]; - let mut reader = chunked_buffer.reader(10); - reader.read_exact(&mut resultdata[10..CHUNK_SIZE + 10])?; - assert_eq!( - resultdata[10..CHUNK_SIZE + 10], - testdata[10..CHUNK_SIZE + 10] - ); + test(&gen_bytes(2 * CHUNK_SIZE - 1)); + test(&gen_bytes(2 * CHUNK_SIZE)); + test(&gen_bytes(2 * CHUNK_SIZE + 1)); + } - let mut resultdata = [0u8; 10000]; - let mut reader = chunked_buffer.reader((CHUNK_SIZE - 1) as u64); - reader.read_exact(&mut resultdata[(CHUNK_SIZE - 1)..CHUNK_SIZE])?; - assert_eq!(resultdata[CHUNK_SIZE - 1], testdata[CHUNK_SIZE - 1]); - reader.read_exact(&mut resultdata[(CHUNK_SIZE)..(CHUNK_SIZE + 1)])?; - assert_eq!(resultdata[CHUNK_SIZE], testdata[CHUNK_SIZE]); + #[test] + fn many_items() { + let mut chunked_buffer = ChunkedBuffer::default(); + let mut start = 0; + let mut expected = Vec::new(); - // Read at the end - let mut resultdata = [0u8; 10000]; - let mut reader = chunked_buffer.reader(9900); - reader.read_exact(&mut resultdata[9900..10000])?; - assert_eq!(resultdata[9900..10000], testdata[9900..10000]); + let mut test = |data: &[u8]| { + let token = chunked_buffer.write(data); + assert_eq!( + ChunkToken { + start, + length: data.len(), + }, + token + ); - // Read past the end - let mut resultdata = [0u8; 10001]; - let mut reader = chunked_buffer.reader(9900); - assert!(reader.read_exact(&mut resultdata[9900..10001]).is_err()); + expected.push((token, data.to_vec())); - let mut resultdata = [0u8; 10000]; - let mut reader = chunked_buffer.reader(20000); - assert_eq!(reader.read(&mut resultdata)?, 0); + for (token, expected_data) in &expected { + let buf = chunked_buffer.read(&token); + assert_eq!(expected_data, buf.as_slice()); + } - Ok(()) + start += data.len(); + }; + + test(b"abc"); + test(b""); + test(b"a"); + + test(&gen_bytes(CHUNK_SIZE - 1)); + test(&gen_bytes(CHUNK_SIZE)); + test(&gen_bytes(CHUNK_SIZE + 1)); + + test(&gen_bytes(2 * CHUNK_SIZE - 1)); + test(&gen_bytes(2 * CHUNK_SIZE)); + test(&gen_bytes(2 * CHUNK_SIZE + 1)); + } }