feat(puffin): add partial reader (#2741)

* feat(puffin): add partial reader

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2023-11-15 14:28:20 +08:00
committed by GitHub
parent a24f8c96b3
commit 3329da5b72
8 changed files with 583 additions and 0 deletions

3
Cargo.lock generated
View File

@@ -6549,8 +6549,11 @@ name = "puffin"
version = "0.4.2"
dependencies = [
"derive_builder 0.12.0",
"futures",
"pin-project",
"serde",
"serde_json",
"tokio",
]
[[package]]

View File

@@ -98,6 +98,7 @@ opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.gi
] }
parquet = "47.0"
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
prost = "0.12"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }

View File

@@ -6,5 +6,10 @@ license.workspace = true
[dependencies]
derive_builder.workspace = true
futures.workspace = true
pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
[dev-dependencies]
tokio.workspace = true

View File

@@ -14,3 +14,4 @@
pub mod blob_metadata;
pub mod file_metadata;
pub mod partial_reader;

View File

@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod r#async;
mod position;
mod sync;
use pin_project::pin_project;
/// `PartialReader` to perform synchronous or asynchronous reads on a portion of a resource.
#[pin_project]
pub struct PartialReader<R> {
/// offset of the portion in the resource
offset: u64,
/// size of the portion in the resource
size: u64,
/// Resource for the portion.
/// The `offset` and `size` fields are used to determine the slice of `source` to read.
#[pin]
source: R,
/// The current position within the portion.
///
/// A `None` value indicates that no read operations have been performed yet on this portion.
/// Before a read operation can be performed, the resource must be positioned at the correct offset in the portion.
/// After the first read operation, this field will be set to `Some(_)`, representing the current read position in the portion.
position_in_portion: Option<u64>,
}
impl<R> PartialReader<R> {
/// Creates a new `PartialReader` for the given resource.
pub fn new(source: R, offset: u64, size: u64) -> Self {
Self {
offset,
size,
source,
position_in_portion: None,
}
}
/// Returns the current position in the portion.
pub fn position(&self) -> u64 {
self.position_in_portion.unwrap_or_default()
}
/// Returns the size of the portion in portion.
pub fn size(&self) -> u64 {
self.size
}
/// Returns whether the portion is empty.
pub fn is_empty(&self) -> bool {
self.size == 0
}
/// Returns whether the current position is at the end of the portion.
pub fn is_eof(&self) -> bool {
self.position() == self.size
}
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
#[test]
fn is_empty_returns_true_for_zero_length_blob() {
let data: Vec<u8> = (0..100).collect();
let reader = PartialReader::new(Cursor::new(data), 10, 0);
assert!(reader.is_empty());
assert!(reader.is_eof());
}
#[test]
fn is_empty_returns_false_for_non_zero_length_blob() {
let data: Vec<u8> = (0..100).collect();
let reader = PartialReader::new(Cursor::new(data), 10, 30);
assert!(!reader.is_empty());
}
}

View File

@@ -0,0 +1,196 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{ready, AsyncRead, AsyncSeek};
use crate::partial_reader::position::position_after_seek;
use crate::partial_reader::PartialReader;
impl<R: AsyncRead + AsyncSeek + Unpin> AsyncRead for PartialReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// past end of portion
if self.position() > self.size() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid read past the end of the portion",
)));
}
// end of portion
if self.is_eof() {
return Poll::Ready(Ok(0));
}
// first read, seek to the correct offset
if self.position_in_portion.is_none() {
// seek operation
let seek_from = io::SeekFrom::Start(self.offset);
ready!(self.as_mut().project().source.poll_seek(cx, seek_from))?;
self.position_in_portion = Some(0);
}
// prevent reading over the end
let max_len = (self.size() - self.position_in_portion.unwrap()) as usize;
let actual_len = max_len.min(buf.len());
// create a limited reader
let target_buf = &mut buf[..actual_len];
// read operation
let read_bytes = ready!(self.as_mut().project().source.poll_read(cx, target_buf))?;
self.position_in_portion = Some(self.position() + read_bytes as u64);
Poll::Ready(Ok(read_bytes))
}
}
impl<R: AsyncRead + AsyncSeek + Unpin> AsyncSeek for PartialReader<R> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
let new_position = position_after_seek(pos, self.position(), self.size())?;
let pos = io::SeekFrom::Start(self.offset + new_position);
ready!(self.as_mut().project().source.poll_seek(cx, pos))?;
self.position_in_portion = Some(new_position);
Poll::Ready(Ok(new_position))
}
}
#[cfg(test)]
mod tests {
use futures::io::Cursor;
use futures::{AsyncReadExt as _, AsyncSeekExt as _};
use super::*;
#[tokio::test]
async fn read_all_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100);
let mut buf = vec![0; 100];
assert_eq!(reader.read(&mut buf).await.unwrap(), 100);
assert_eq!(buf, data);
}
#[tokio::test]
async fn read_part_of_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 30];
assert_eq!(reader.read(&mut buf).await.unwrap(), 30);
assert_eq!(buf, (10..40).collect::<Vec<u8>>());
}
#[tokio::test]
async fn seek_and_read_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10);
let mut buf = vec![0; 10];
assert_eq!(reader.read(&mut buf).await.unwrap(), 10);
assert_eq!(buf, (20..30).collect::<Vec<u8>>());
}
#[tokio::test]
async fn read_past_end_of_portion_is_eof() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 50];
assert_eq!(reader.read(&mut buf).await.unwrap(), 30);
assert_eq!(reader.read(&mut buf).await.unwrap(), 0); // hit EOF
}
#[tokio::test]
async fn seek_past_end_of_portion_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
// seeking past the portion returns an error
assert!(reader.seek(io::SeekFrom::Start(31)).await.is_err());
}
#[tokio::test]
async fn seek_to_negative_position_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10);
// seeking back to the start of the portion
assert_eq!(reader.seek(io::SeekFrom::Current(-10)).await.unwrap(), 0);
// seeking to a negative position returns an error
assert!(reader.seek(io::SeekFrom::Current(-1)).await.is_err());
}
#[tokio::test]
async fn seek_from_end_of_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 10];
// seek to 10 bytes before the end of the portion
assert_eq!(reader.seek(io::SeekFrom::End(-10)).await.unwrap(), 20);
assert_eq!(reader.read(&mut buf).await.unwrap(), 10);
// the final 10 bytes of the portion
assert_eq!(buf, (30..40).collect::<Vec<u8>>());
assert!(reader.is_eof());
}
#[tokio::test]
async fn seek_from_end_to_negative_position_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30);
// seeking to a negative position returns an error
assert!(reader.seek(io::SeekFrom::End(-31)).await.is_err());
}
#[tokio::test]
async fn zero_length_portion_returns_zero_on_read() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 0);
let mut buf = vec![0; 10];
// reading a portion with zero length returns 0 bytes
assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
}
#[tokio::test]
async fn is_eof_returns_true_at_end_of_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
// we are not at the end of the portion
assert!(!reader.is_eof());
let mut buf = vec![0; 30];
assert_eq!(reader.read(&mut buf).await.unwrap(), 30);
// we are at the end of the portion
assert!(reader.is_eof());
}
#[tokio::test]
async fn position_resets_after_seek_to_start() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10);
assert_eq!(reader.position(), 10);
assert_eq!(reader.seek(io::SeekFrom::Start(0)).await.unwrap(), 0);
assert_eq!(reader.position(), 0);
}
}

View File

@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
/// Calculates the new position after seeking. It checks if the new position
/// is valid (within the portion bounds) before returning it.
pub fn position_after_seek(
seek_from: io::SeekFrom,
position_in_portion: u64,
size_of_portion: u64,
) -> io::Result<u64> {
let new_position = match seek_from {
io::SeekFrom::Start(offset) => offset,
io::SeekFrom::Current(offset) => {
let next = (position_in_portion as i64) + offset;
if next < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid seek to a negative or overflowing position",
));
}
next as u64
}
io::SeekFrom::End(offset) => {
let end = size_of_portion as i64;
(end + offset) as u64
}
};
if new_position > size_of_portion {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid seek to a position beyond the end of the portion",
));
}
Ok(new_position)
}
#[cfg(test)]
mod tests {
use std::io::ErrorKind;
use super::*;
#[test]
fn test_position_after_seek_from_start() {
let result = position_after_seek(io::SeekFrom::Start(10), 0, 20).unwrap();
assert_eq!(result, 10);
}
#[test]
fn test_position_after_seek_from_start_out_of_bounds() {
let result = position_after_seek(io::SeekFrom::Start(30), 0, 20);
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
}
#[test]
fn test_position_after_seek_from_current() {
let result = position_after_seek(io::SeekFrom::Current(10), 10, 30).unwrap();
assert_eq!(result, 20);
}
#[test]
fn test_position_after_seek_from_current_negative_position_within_bounds() {
let result = position_after_seek(io::SeekFrom::Current(-10), 15, 20).unwrap();
assert_eq!(result, 5);
}
#[test]
fn test_position_after_seek_from_current_negative_position() {
let result = position_after_seek(io::SeekFrom::Current(-10), 5, 20);
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
}
#[test]
fn test_position_after_seek_from_end() {
let result = position_after_seek(io::SeekFrom::End(-10), 0, 30).unwrap();
assert_eq!(result, 20);
}
#[test]
fn test_position_after_seek_from_end_out_of_bounds() {
let result = position_after_seek(io::SeekFrom::End(10), 0, 20);
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
}
}

View File

@@ -0,0 +1,180 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use crate::partial_reader::position::position_after_seek;
use crate::partial_reader::PartialReader;
impl<R: io::Read + io::Seek> io::Read for PartialReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// past end of portion
if self.position() > self.size() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid read past the end of the portion",
));
}
// end of portion
if self.is_eof() {
return Ok(0);
}
// haven't read from the portion yet, need to seek to the start of it.
if self.position_in_portion.is_none() {
self.source.seek(io::SeekFrom::Start(self.offset))?;
self.position_in_portion = Some(0);
}
// prevent reading over the end
let max_len = (self.size() - self.position_in_portion.unwrap()) as usize;
let actual_len = max_len.min(buf.len());
// create a limited reader
let target_buf = &mut buf[..actual_len];
// perform the actual read from the source and update the position.
let read_bytes = self.source.read(target_buf)?;
self.position_in_portion = Some(self.position_in_portion.unwrap() + read_bytes as u64);
Ok(read_bytes)
}
}
impl<R: io::Read + io::Seek> io::Seek for PartialReader<R> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
let new_position = position_after_seek(pos, self.position(), self.size())?;
let pos = io::SeekFrom::Start(self.offset + new_position);
self.source.seek(pos)?;
self.position_in_portion = Some(new_position);
Ok(new_position)
}
}
#[cfg(test)]
mod tests {
use std::io::{Cursor, Read, Seek, SeekFrom};
use super::*;
#[test]
fn read_all_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100);
let mut buf = vec![0; 100];
assert_eq!(reader.read(&mut buf).unwrap(), 100);
assert_eq!(buf, data);
}
#[test]
fn read_part_of_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 30];
assert_eq!(reader.read(&mut buf).unwrap(), 30);
assert_eq!(buf, (10..40).collect::<Vec<u8>>());
}
#[test]
fn seek_and_read_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10);
let mut buf = vec![0; 10];
assert_eq!(reader.read(&mut buf).unwrap(), 10);
assert_eq!(buf, (20..30).collect::<Vec<u8>>());
}
#[test]
fn read_past_end_of_portion_is_eof() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 50];
assert_eq!(reader.read(&mut buf).unwrap(), 30);
assert_eq!(reader.read(&mut buf).unwrap(), 0); // hit EOF
}
#[test]
fn seek_past_end_of_portion_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
// seeking past the portion returns an error
assert!(reader.seek(SeekFrom::Start(31)).is_err());
}
#[test]
fn seek_to_negative_position_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10);
// seeking back to the start of the portion
assert_eq!(reader.seek(SeekFrom::Current(-10)).unwrap(), 0);
// seeking to a negative position returns an error
assert!(reader.seek(SeekFrom::Current(-1)).is_err());
}
#[test]
fn seek_from_end_of_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 10];
// seek to 10 bytes before the end of the portion
assert_eq!(reader.seek(SeekFrom::End(-10)).unwrap(), 20);
assert_eq!(reader.read(&mut buf).unwrap(), 10);
// the final 10 bytes of the portion
assert_eq!(buf, (30..40).collect::<Vec<u8>>());
assert!(reader.is_eof());
}
#[test]
fn seek_from_end_to_negative_position_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30);
// seeking to a negative position returns an error
assert!(reader.seek(SeekFrom::End(-31)).is_err());
}
#[test]
fn zero_length_portion_returns_zero_on_read() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 0);
let mut buf = vec![0; 10];
// reading a portion with zero length returns 0 bytes
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}
#[test]
fn is_eof_returns_true_at_end_of_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
// we are not at the end of the portion
assert!(!reader.is_eof());
let mut buf = vec![0; 30];
assert_eq!(reader.read(&mut buf).unwrap(), 30);
// we are at the end of the portion
assert!(reader.is_eof());
}
#[test]
fn position_resets_after_seek_to_start() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10);
assert_eq!(reader.position(), 10);
assert_eq!(reader.seek(SeekFrom::Start(0)).unwrap(), 0);
assert_eq!(reader.position(), 0);
}
}