mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 06:42:57 +00:00
feat(index): add RangeReader trait (#4718)
* feat(index): add `RangeReader` trait` Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix: return content_length as read bytes Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: remove buffer & use `BufMut` Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -9,10 +9,12 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
async-trait.workspace = true
|
||||
bitvec = "1.0"
|
||||
bytes.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
futures.workspace = true
|
||||
paste = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
snafu.workspace = true
|
||||
|
||||
@@ -1,242 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::io::{Read, Write};
|
||||
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_macro::stack_trace_debug;
|
||||
use paste::paste;
|
||||
use snafu::{ensure, Location, ResultExt, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display(
|
||||
"Destination buffer overflow, src_len: {}, dst_len: {}",
|
||||
src_len,
|
||||
dst_len
|
||||
))]
|
||||
Overflow {
|
||||
src_len: usize,
|
||||
dst_len: usize,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Buffer underflow"))]
|
||||
Underflow {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("IO operation reach EOF"))]
|
||||
Eof {
|
||||
#[snafu(source)]
|
||||
error: std::io::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_read_le {
|
||||
( $($num_ty: ty), *) => {
|
||||
$(
|
||||
paste!{
|
||||
// TODO(hl): default implementation requires allocating a
|
||||
// temp buffer. maybe use more efficient impls in concrete buffers.
|
||||
// see https://github.com/GrepTimeTeam/greptimedb/pull/97#discussion_r930798941
|
||||
fn [<read_ $num_ty _le>](&mut self) -> Result<$num_ty> {
|
||||
let mut buf = [0u8; std::mem::size_of::<$num_ty>()];
|
||||
self.read_to_slice(&mut buf)?;
|
||||
Ok($num_ty::from_le_bytes(buf))
|
||||
}
|
||||
|
||||
fn [<peek_ $num_ty _le>](&mut self) -> Result<$num_ty> {
|
||||
let mut buf = [0u8; std::mem::size_of::<$num_ty>()];
|
||||
self.peek_to_slice(&mut buf)?;
|
||||
Ok($num_ty::from_le_bytes(buf))
|
||||
}
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_write_le {
|
||||
( $($num_ty: ty), *) => {
|
||||
$(
|
||||
paste!{
|
||||
fn [<write_ $num_ty _le>](&mut self, n: $num_ty) -> Result<()> {
|
||||
self.write_from_slice(&n.to_le_bytes())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Buffer {
|
||||
/// Returns remaining data size for read.
|
||||
fn remaining_size(&self) -> usize;
|
||||
|
||||
/// Returns true if buffer has no data for read.
|
||||
fn is_empty(&self) -> bool {
|
||||
self.remaining_size() == 0
|
||||
}
|
||||
|
||||
/// Peeks data into dst. This method should not change internal cursor,
|
||||
/// invoke `advance_by` if needed.
|
||||
/// # Panics
|
||||
/// This method **may** panic if buffer does not have enough data to be copied to dst.
|
||||
fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()>;
|
||||
|
||||
/// Reads data into dst. This method will change internal cursor.
|
||||
/// # Panics
|
||||
/// This method **may** panic if buffer does not have enough data to be copied to dst.
|
||||
fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> {
|
||||
self.peek_to_slice(dst)?;
|
||||
self.advance_by(dst.len());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Advances internal cursor for next read.
|
||||
/// # Panics
|
||||
/// This method **may** panic if the offset after advancing exceeds the length of underlying buffer.
|
||||
fn advance_by(&mut self, by: usize);
|
||||
|
||||
impl_read_le![u8, i8, u16, i16, u32, i32, u64, i64, f32, f64];
|
||||
}
|
||||
|
||||
macro_rules! impl_buffer_for_bytes {
|
||||
( $($buf_ty:ty), *) => {
|
||||
$(
|
||||
impl Buffer for $buf_ty {
|
||||
fn remaining_size(&self) -> usize{
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> {
|
||||
let dst_len = dst.len();
|
||||
ensure!(self.remaining() >= dst.len(), OverflowSnafu {
|
||||
src_len: self.remaining_size(),
|
||||
dst_len,
|
||||
}
|
||||
);
|
||||
dst.copy_from_slice(&self[0..dst_len]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn advance_by(&mut self, by: usize) {
|
||||
self.advance(by);
|
||||
}
|
||||
}
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
impl_buffer_for_bytes![bytes::Bytes, bytes::BytesMut];
|
||||
|
||||
impl Buffer for &[u8] {
|
||||
fn remaining_size(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> {
|
||||
let dst_len = dst.len();
|
||||
ensure!(
|
||||
self.len() >= dst.len(),
|
||||
OverflowSnafu {
|
||||
src_len: self.remaining_size(),
|
||||
dst_len,
|
||||
}
|
||||
);
|
||||
dst.copy_from_slice(&self[0..dst_len]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> {
|
||||
ensure!(
|
||||
self.len() >= dst.len(),
|
||||
OverflowSnafu {
|
||||
src_len: self.remaining_size(),
|
||||
dst_len: dst.len(),
|
||||
}
|
||||
);
|
||||
self.read_exact(dst).context(EofSnafu)
|
||||
}
|
||||
|
||||
fn advance_by(&mut self, by: usize) {
|
||||
*self = &self[by..];
|
||||
}
|
||||
}
|
||||
|
||||
/// Mutable buffer.
|
||||
pub trait BufferMut {
|
||||
fn as_slice(&self) -> &[u8];
|
||||
|
||||
fn write_from_slice(&mut self, src: &[u8]) -> Result<()>;
|
||||
|
||||
impl_write_le![i8, u8, i16, u16, i32, u32, i64, u64, f32, f64];
|
||||
}
|
||||
|
||||
impl BufferMut for BytesMut {
|
||||
fn as_slice(&self) -> &[u8] {
|
||||
self
|
||||
}
|
||||
|
||||
fn write_from_slice(&mut self, src: &[u8]) -> Result<()> {
|
||||
self.put_slice(src);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl BufferMut for &mut [u8] {
|
||||
fn as_slice(&self) -> &[u8] {
|
||||
self
|
||||
}
|
||||
|
||||
fn write_from_slice(&mut self, src: &[u8]) -> Result<()> {
|
||||
// see std::io::Write::write_all
|
||||
// https://doc.rust-lang.org/src/std/io/impls.rs.html#363
|
||||
self.write_all(src).map_err(|_| {
|
||||
OverflowSnafu {
|
||||
src_len: src.len(),
|
||||
dst_len: self.as_slice().len(),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl BufferMut for Vec<u8> {
|
||||
fn as_slice(&self) -> &[u8] {
|
||||
self
|
||||
}
|
||||
|
||||
fn write_from_slice(&mut self, src: &[u8]) -> Result<()> {
|
||||
self.extend_from_slice(src);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -44,6 +44,12 @@ impl From<Vec<u8>> for Bytes {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Bytes> for Vec<u8> {
|
||||
fn from(bytes: Bytes) -> Vec<u8> {
|
||||
bytes.0.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Bytes {
|
||||
type Target = [u8];
|
||||
|
||||
|
||||
@@ -13,9 +13,9 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod bit_vec;
|
||||
pub mod buffer;
|
||||
pub mod bytes;
|
||||
pub mod plugins;
|
||||
pub mod range_read;
|
||||
#[allow(clippy::all)]
|
||||
pub mod readable_size;
|
||||
pub mod secrets;
|
||||
|
||||
80
src/common/base/src/range_read.rs
Normal file
80
src/common/base/src/range_read.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::io;
|
||||
use std::ops::Range;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::{BufMut, Bytes};
|
||||
use futures::{AsyncReadExt, AsyncSeekExt};
|
||||
|
||||
/// `Metadata` contains the metadata of a source.
|
||||
pub struct Metadata {
|
||||
/// The length of the source in bytes.
|
||||
pub content_length: u64,
|
||||
}
|
||||
|
||||
/// `RangeReader` reads a range of bytes from a source.
|
||||
#[async_trait]
|
||||
pub trait RangeReader: Send + Unpin {
|
||||
/// Returns the metadata of the source.
|
||||
async fn metadata(&mut self) -> io::Result<Metadata>;
|
||||
|
||||
/// Reads the bytes in the given range.
|
||||
async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes>;
|
||||
|
||||
/// Reads the bytes in the given range into the buffer.
|
||||
///
|
||||
/// Handles the buffer based on its capacity:
|
||||
/// - If the buffer is insufficient to hold the bytes, it will either:
|
||||
/// - Allocate additional space (e.g., for `Vec<u8>`)
|
||||
/// - Panic (e.g., for `&mut [u8]`)
|
||||
async fn read_into(
|
||||
&mut self,
|
||||
range: Range<u64>,
|
||||
buf: &mut (impl BufMut + Send),
|
||||
) -> io::Result<()> {
|
||||
let bytes = self.read(range).await?;
|
||||
buf.put_slice(&bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads the bytes in the given ranges.
|
||||
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
|
||||
let mut result = Vec::with_capacity(ranges.len());
|
||||
for range in ranges {
|
||||
result.push(self.read(range.clone()).await?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implement `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
|
||||
///
|
||||
/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
|
||||
/// Until the codebase is fully ported to `RangeReader`, remove this implementation.
|
||||
#[async_trait]
|
||||
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader for R {
|
||||
async fn metadata(&mut self) -> io::Result<Metadata> {
|
||||
let content_length = self.seek(io::SeekFrom::End(0)).await?;
|
||||
Ok(Metadata { content_length })
|
||||
}
|
||||
|
||||
async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
|
||||
let mut buf = vec![0; (range.end - range.start) as usize];
|
||||
self.seek(io::SeekFrom::Start(range.start)).await?;
|
||||
self.read_exact(&mut buf).await?;
|
||||
Ok(Bytes::from(buf))
|
||||
}
|
||||
}
|
||||
@@ -1,182 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#![feature(assert_matches)]
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use common_base::buffer::Error::Overflow;
|
||||
use common_base::buffer::{Buffer, BufferMut};
|
||||
use paste::paste;
|
||||
|
||||
#[test]
|
||||
pub fn test_buffer_read_write() {
|
||||
let mut buf = BytesMut::with_capacity(16);
|
||||
buf.write_u64_le(1234u64).unwrap();
|
||||
let result = buf.peek_u64_le().unwrap();
|
||||
assert_eq!(1234u64, result);
|
||||
buf.advance_by(8);
|
||||
|
||||
buf.write_from_slice("hello, world".as_bytes()).unwrap();
|
||||
let mut content = vec![0u8; 5];
|
||||
buf.peek_to_slice(&mut content).unwrap();
|
||||
let read = String::from_utf8_lossy(&content);
|
||||
assert_eq!("hello", read);
|
||||
buf.advance_by(5);
|
||||
// after read, buffer should still have 7 bytes to read.
|
||||
assert_eq!(7, buf.remaining());
|
||||
|
||||
let mut content = vec![0u8; 6];
|
||||
buf.read_to_slice(&mut content).unwrap();
|
||||
let read = String::from_utf8_lossy(&content);
|
||||
assert_eq!(", worl", read);
|
||||
// after read, buffer should still have 1 byte to read.
|
||||
assert_eq!(1, buf.remaining());
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_buffer_read() {
|
||||
let mut bytes = Bytes::from_static("hello".as_bytes());
|
||||
assert_eq!(5, bytes.remaining_size());
|
||||
assert_eq!(b'h', bytes.peek_u8_le().unwrap());
|
||||
bytes.advance_by(1);
|
||||
assert_eq!(4, bytes.remaining_size());
|
||||
}
|
||||
|
||||
macro_rules! test_primitive_read_write {
|
||||
( $($num_ty: ty), *) => {
|
||||
$(
|
||||
paste!{
|
||||
#[test]
|
||||
fn [<test_read_write_ $num_ty>]() {
|
||||
assert_eq!($num_ty::MAX,(&mut $num_ty::MAX.to_le_bytes() as &[u8]).[<read_ $num_ty _le>]().unwrap());
|
||||
assert_eq!($num_ty::MIN,(&mut $num_ty::MIN.to_le_bytes() as &[u8]).[<read_ $num_ty _le>]().unwrap());
|
||||
}
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
test_primitive_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64];
|
||||
|
||||
#[test]
|
||||
pub fn test_read_write_from_slice_buffer() {
|
||||
let mut buf = "hello".as_bytes();
|
||||
assert_eq!(104, buf.peek_u8_le().unwrap());
|
||||
buf.advance_by(1);
|
||||
assert_eq!(101, buf.peek_u8_le().unwrap());
|
||||
buf.advance_by(1);
|
||||
assert_eq!(108, buf.peek_u8_le().unwrap());
|
||||
buf.advance_by(1);
|
||||
assert_eq!(108, buf.peek_u8_le().unwrap());
|
||||
buf.advance_by(1);
|
||||
assert_eq!(111, buf.peek_u8_le().unwrap());
|
||||
buf.advance_by(1);
|
||||
assert_matches!(buf.peek_u8_le(), Err(Overflow { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_read_u8_from_slice_buffer() {
|
||||
let mut buf = "hello".as_bytes();
|
||||
assert_eq!(104, buf.read_u8_le().unwrap());
|
||||
assert_eq!(101, buf.read_u8_le().unwrap());
|
||||
assert_eq!(108, buf.read_u8_le().unwrap());
|
||||
assert_eq!(108, buf.read_u8_le().unwrap());
|
||||
assert_eq!(111, buf.read_u8_le().unwrap());
|
||||
assert_matches!(buf.read_u8_le(), Err(Overflow { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_read_write_numbers() {
|
||||
let mut buf: Vec<u8> = vec![];
|
||||
buf.write_u64_le(1234).unwrap();
|
||||
assert_eq!(1234, (&buf[..]).read_u64_le().unwrap());
|
||||
|
||||
buf.write_u32_le(4242).unwrap();
|
||||
let mut p = &buf[..];
|
||||
assert_eq!(1234, p.read_u64_le().unwrap());
|
||||
assert_eq!(4242, p.read_u32_le().unwrap());
|
||||
}
|
||||
|
||||
macro_rules! test_primitive_vec_read_write {
|
||||
( $($num_ty: ty), *) => {
|
||||
$(
|
||||
paste!{
|
||||
#[test]
|
||||
fn [<test_read_write_ $num_ty _from_vec_buffer>]() {
|
||||
let mut buf = vec![];
|
||||
let _ = buf.[<write_ $num_ty _le>]($num_ty::MAX).unwrap();
|
||||
assert_eq!($num_ty::MAX, buf.as_slice().[<read_ $num_ty _le>]().unwrap());
|
||||
}
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
test_primitive_vec_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64];
|
||||
|
||||
#[test]
|
||||
pub fn test_peek_write_from_vec_buffer() {
|
||||
let mut buf: Vec<u8> = vec![];
|
||||
buf.write_from_slice("hello".as_bytes()).unwrap();
|
||||
let mut slice = buf.as_slice();
|
||||
assert_eq!(104, slice.peek_u8_le().unwrap());
|
||||
slice.advance_by(1);
|
||||
assert_eq!(101, slice.peek_u8_le().unwrap());
|
||||
slice.advance_by(1);
|
||||
assert_eq!(108, slice.peek_u8_le().unwrap());
|
||||
slice.advance_by(1);
|
||||
assert_eq!(108, slice.peek_u8_le().unwrap());
|
||||
slice.advance_by(1);
|
||||
assert_eq!(111, slice.peek_u8_le().unwrap());
|
||||
slice.advance_by(1);
|
||||
assert_matches!(slice.read_u8_le(), Err(Overflow { .. }));
|
||||
}
|
||||
|
||||
macro_rules! test_primitive_bytes_read_write {
|
||||
( $($num_ty: ty), *) => {
|
||||
$(
|
||||
paste!{
|
||||
#[test]
|
||||
fn [<test_read_write_ $num_ty _from_bytes>]() {
|
||||
let mut bytes = bytes::Bytes::from($num_ty::MAX.to_le_bytes().to_vec());
|
||||
assert_eq!($num_ty::MAX, bytes.[<read_ $num_ty _le>]().unwrap());
|
||||
|
||||
let mut bytes = bytes::Bytes::from($num_ty::MIN.to_le_bytes().to_vec());
|
||||
assert_eq!($num_ty::MIN, bytes.[<read_ $num_ty _le>]().unwrap());
|
||||
}
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
test_primitive_bytes_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64];
|
||||
|
||||
#[test]
|
||||
pub fn test_write_overflow() {
|
||||
let mut buf = [0u8; 4];
|
||||
assert_matches!(
|
||||
(&mut buf[..]).write_from_slice("hell".as_bytes()),
|
||||
Ok { .. }
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
(&mut buf[..]).write_from_slice("hello".as_bytes()),
|
||||
Err(common_base::buffer::Error::Overflow { .. })
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -12,15 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::io::SeekFrom;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
|
||||
use common_base::range_read::RangeReader;
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::inverted_index::error::{ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu};
|
||||
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
|
||||
use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
use crate::inverted_index::format::MIN_BLOB_SIZE;
|
||||
@@ -49,28 +48,28 @@ impl<R> InvertedIndexBlobReader<R> {
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<R: AsyncRead + AsyncSeek + Unpin + Send> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
async fn read_all(&mut self, dest: &mut Vec<u8>) -> Result<usize> {
|
||||
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
|
||||
self.source
|
||||
.seek(SeekFrom::Start(0))
|
||||
.read_into(0..metadata.content_length, dest)
|
||||
.await
|
||||
.context(SeekSnafu)?;
|
||||
self.source.read_to_end(dest).await.context(ReadSnafu)
|
||||
.context(CommonIoSnafu)?;
|
||||
Ok(metadata.content_length as usize)
|
||||
}
|
||||
|
||||
async fn seek_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>> {
|
||||
self.source
|
||||
.seek(SeekFrom::Start(offset))
|
||||
let buf = self
|
||||
.source
|
||||
.read(offset..offset + size as u64)
|
||||
.await
|
||||
.context(SeekSnafu)?;
|
||||
let mut buf = vec![0u8; size as usize];
|
||||
self.source.read(&mut buf).await.context(ReadSnafu)?;
|
||||
Ok(buf)
|
||||
.context(CommonIoSnafu)?;
|
||||
Ok(buf.into())
|
||||
}
|
||||
|
||||
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
let end = SeekFrom::End(0);
|
||||
let blob_size = self.source.seek(end).await.context(SeekSnafu)?;
|
||||
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
|
||||
let blob_size = metadata.content_length;
|
||||
Self::validate_blob_size(blob_size)?;
|
||||
|
||||
let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size);
|
||||
|
||||
@@ -12,32 +12,30 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::io::SeekFrom;
|
||||
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
|
||||
use common_base::range_read::RangeReader;
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
use prost::Message;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::inverted_index::error::{
|
||||
DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
|
||||
CommonIoSnafu, DecodeProtoSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
|
||||
UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu,
|
||||
};
|
||||
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
|
||||
/// InvertedIndeFooterReader is for reading the footer section of the blob.
|
||||
pub struct InvertedIndeFooterReader<R> {
|
||||
source: R,
|
||||
pub struct InvertedIndeFooterReader<'a, R> {
|
||||
source: &'a mut R,
|
||||
blob_size: u64,
|
||||
}
|
||||
|
||||
impl<R> InvertedIndeFooterReader<R> {
|
||||
pub fn new(source: R, blob_size: u64) -> Self {
|
||||
impl<'a, R> InvertedIndeFooterReader<'a, R> {
|
||||
pub fn new(source: &'a mut R, blob_size: u64) -> Self {
|
||||
Self { source, blob_size }
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: AsyncRead + AsyncSeek + Unpin> InvertedIndeFooterReader<R> {
|
||||
impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> {
|
||||
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
|
||||
let payload_size = self.read_payload_size().await?;
|
||||
let metas = self.read_payload(payload_size).await?;
|
||||
@@ -45,26 +43,26 @@ impl<R: AsyncRead + AsyncSeek + Unpin> InvertedIndeFooterReader<R> {
|
||||
}
|
||||
|
||||
async fn read_payload_size(&mut self) -> Result<u64> {
|
||||
let size_offset = SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE);
|
||||
self.source.seek(size_offset).await.context(SeekSnafu)?;
|
||||
let size_buf = &mut [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
|
||||
self.source.read_exact(size_buf).await.context(ReadSnafu)?;
|
||||
let mut size_buf = [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
|
||||
let end = self.blob_size;
|
||||
let start = end - FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
self.source
|
||||
.read_into(start..end, &mut &mut size_buf[..])
|
||||
.await
|
||||
.context(CommonIoSnafu)?;
|
||||
|
||||
let payload_size = u32::from_le_bytes(*size_buf) as u64;
|
||||
let payload_size = u32::from_le_bytes(size_buf) as u64;
|
||||
self.validate_payload_size(payload_size)?;
|
||||
|
||||
Ok(payload_size)
|
||||
}
|
||||
|
||||
async fn read_payload(&mut self, payload_size: u64) -> Result<InvertedIndexMetas> {
|
||||
let payload_offset =
|
||||
SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size);
|
||||
self.source.seek(payload_offset).await.context(SeekSnafu)?;
|
||||
let end = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
let start = end - payload_size;
|
||||
let bytes = self.source.read(start..end).await.context(CommonIoSnafu)?;
|
||||
|
||||
let payload = &mut vec![0u8; payload_size as usize];
|
||||
self.source.read_exact(payload).await.context(ReadSnafu)?;
|
||||
|
||||
let metas = InvertedIndexMetas::decode(&payload[..]).context(DecodeProtoSnafu)?;
|
||||
let metas = InvertedIndexMetas::decode(&*bytes).context(DecodeProtoSnafu)?;
|
||||
self.validate_metas(&metas, payload_size)?;
|
||||
|
||||
Ok(metas)
|
||||
@@ -144,7 +142,8 @@ mod tests {
|
||||
|
||||
let payload_buf = create_test_payload(meta);
|
||||
let blob_size = payload_buf.len() as u64;
|
||||
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
|
||||
let mut cursor = Cursor::new(payload_buf);
|
||||
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
|
||||
|
||||
let payload_size = reader.read_payload_size().await.unwrap();
|
||||
let metas = reader.read_payload(payload_size).await.unwrap();
|
||||
@@ -164,7 +163,8 @@ mod tests {
|
||||
let mut payload_buf = create_test_payload(meta);
|
||||
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
|
||||
let blob_size = payload_buf.len() as u64;
|
||||
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
|
||||
let mut cursor = Cursor::new(payload_buf);
|
||||
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
|
||||
|
||||
let payload_size_result = reader.read_payload_size().await;
|
||||
assert!(payload_size_result.is_err());
|
||||
@@ -181,7 +181,8 @@ mod tests {
|
||||
|
||||
let payload_buf = create_test_payload(meta);
|
||||
let blob_size = payload_buf.len() as u64;
|
||||
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
|
||||
let mut cursor = Cursor::new(payload_buf);
|
||||
let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size);
|
||||
|
||||
let payload_size = reader.read_payload_size().await.unwrap();
|
||||
let payload_result = reader.read_payload(payload_size).await;
|
||||
|
||||
Reference in New Issue
Block a user