mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 22:02:56 +00:00
feat(inverted_index): add index reader (#2803)
* feat(inverted_index): add reader Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix: toml format Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: add prefix relative_ to the offset parameter Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * docs: add doc comment Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: update proto Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix: outdated docs Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
25
Cargo.lock
generated
25
Cargo.lock
generated
@@ -3343,6 +3343,12 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fst"
|
||||
version = "0.4.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a"
|
||||
|
||||
[[package]]
|
||||
name = "funty"
|
||||
version = "2.0.0"
|
||||
@@ -3537,7 +3543,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a11efce55d8ce20257e08842e4f4c1c8fce2b3a8#a11efce55d8ce20257e08842e4f4c1c8fce2b3a8"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2b3ae45740a49ec6a0830d71fc09c3093aeb5fe7#2b3ae45740a49ec6a0830d71fc09c3093aeb5fe7"
|
||||
dependencies = [
|
||||
"prost 0.12.2",
|
||||
"serde",
|
||||
@@ -3902,6 +3908,23 @@ dependencies = [
|
||||
"quote",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.4.3"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"fst",
|
||||
"futures",
|
||||
"greptime-proto",
|
||||
"prost 0.12.2",
|
||||
"snafu",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
|
||||
@@ -51,6 +51,7 @@ members = [
|
||||
"src/sql",
|
||||
"src/store-api",
|
||||
"src/table",
|
||||
"src/index",
|
||||
"tests-integration",
|
||||
"tests/runner",
|
||||
]
|
||||
@@ -83,9 +84,10 @@ datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev =
|
||||
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
|
||||
derive_builder = "0.12"
|
||||
etcd-client = "0.12"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11efce55d8ce20257e08842e4f4c1c8fce2b3a8" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2b3ae45740a49ec6a0830d71fc09c3093aeb5fe7" }
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
lazy_static = "1.4"
|
||||
|
||||
20
src/index/Cargo.toml
Normal file
20
src/index/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "index"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-base.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
fst.workspace = true
|
||||
futures.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
prost.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-util.workspace = true
|
||||
tokio.workspace = true
|
||||
16
src/index/src/inverted_index.rs
Normal file
16
src/index/src/inverted_index.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod error;
|
||||
pub mod format;
|
||||
99
src/index/src/inverted_index/error.rs
Normal file
99
src/index/src/inverted_index/error.rs
Normal file
@@ -0,0 +1,99 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::io::Error as IoError;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("Failed to seek"))]
|
||||
Seek {
|
||||
#[snafu(source)]
|
||||
error: IoError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read"))]
|
||||
Read {
|
||||
#[snafu(source)]
|
||||
error: IoError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Unexpected inverted index blob size, min: {min_blob_size}, actual: {actual_blob_size}"
|
||||
))]
|
||||
UnexpectedBlobSize {
|
||||
min_blob_size: u64,
|
||||
actual_blob_size: u64,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Unexpected inverted index footer payload size, max: {max_payload_size}, actual: {actual_payload_size}"))]
|
||||
UnexpectedFooterPayloadSize {
|
||||
max_payload_size: u64,
|
||||
actual_payload_size: u64,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Unexpected inverted index offset size, offset: {offset}, size: {size}, blob_size: {blob_size}, payload_size: {payload_size}"))]
|
||||
UnexpectedOffsetSize {
|
||||
offset: u64,
|
||||
size: u64,
|
||||
blob_size: u64,
|
||||
payload_size: u64,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode fst"))]
|
||||
DecodeFst {
|
||||
#[snafu(source)]
|
||||
error: fst::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode protobuf"))]
|
||||
DecodeProto {
|
||||
#[snafu(source)]
|
||||
error: prost::DecodeError,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
use Error::*;
|
||||
match self {
|
||||
Seek { .. }
|
||||
| Read { .. }
|
||||
| UnexpectedFooterPayloadSize { .. }
|
||||
| UnexpectedOffsetSize { .. }
|
||||
| UnexpectedBlobSize { .. }
|
||||
| DecodeProto { .. }
|
||||
| DecodeFst { .. } => StatusCode::Unexpected,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
56
src/index/src/inverted_index/format.rs
Normal file
56
src/index/src/inverted_index/format.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! # SST Files with Inverted Index Format Specification
|
||||
//!
|
||||
//! ## File Structure
|
||||
//!
|
||||
//! Each SST file includes a series of inverted indices followed by a footer.
|
||||
//!
|
||||
//! `inverted_index₀ inverted_index₁ ... inverted_indexₙ footer`
|
||||
//!
|
||||
//! - Each `inverted_indexᵢ` represents an index entry corresponding to tag values and their locations within the file.
|
||||
//! - `footer`: Contains metadata about the inverted indices, encoded as a protobuf message.
|
||||
//!
|
||||
//! ## Inverted Index Internals
|
||||
//!
|
||||
//! An inverted index comprises a collection of bitmaps, a null bitmap, and a finite state transducer (FST) indicating tag values' positions:
|
||||
//!
|
||||
//! `bitmap₀ bitmap₁ bitmap₂ ... bitmapₙ null_bitmap fst`
|
||||
//!
|
||||
//! - `bitmapᵢ`: Bitset indicating the presence of tag values within a row group.
|
||||
//! - `null_bitmap`: Bitset tracking the presence of null values within the tag column.
|
||||
//! - `fst`: Finite State Transducer providing an ordered map of bytes, representing the tag values.
|
||||
//!
|
||||
//! ## Footer Details
|
||||
//!
|
||||
//! The footer encapsulates the metadata for inversion mappings:
|
||||
//!
|
||||
//! `footer_payload footer_payload_size`
|
||||
//!
|
||||
//! - `footer_payload`: Protobuf-encoded [`InvertedIndexMetas`] describing the metadata of each inverted index.
|
||||
//! - `footer_payload_size`: Size in bytes of the `footer_payload`, displayed as a `u32` integer.
|
||||
//! - The footer aids in the interpretation of the inverted indices, providing necessary offset and count information.
|
||||
//!
|
||||
//! ## Reference
|
||||
//!
|
||||
//! More detailed information regarding the encoding of the inverted indices can be found in the [RFC].
|
||||
//!
|
||||
//! [`InvertedIndexMetas`]: https://github.com/GreptimeTeam/greptime-proto/blob/2aaee38de81047537dfa42af9df63bcfb866e06c/proto/greptime/v1/index/inverted_index.proto#L32-L64
|
||||
//! [RFC]: https://github.com/GreptimeTeam/greptimedb/blob/develop/docs/rfcs/2023-11-03-inverted-index.md
|
||||
|
||||
pub mod reader;
|
||||
|
||||
const FOOTER_PAYLOAD_SIZE_SIZE: u64 = 4;
|
||||
const MIN_BLOB_SIZE: u64 = FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
43
src/index/src/inverted_index/format/reader.rs
Normal file
43
src/index/src/inverted_index/format/reader.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod blob;
|
||||
mod footer;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use fst::Map;
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
|
||||
use crate::inverted_index::error::Result;
|
||||
|
||||
pub type FstMap = Map<Vec<u8>>;
|
||||
|
||||
/// InvertedIndexReader defines an asynchronous reader of inverted index data
|
||||
#[async_trait]
|
||||
pub trait InvertedIndexReader {
|
||||
/// Retrieve metadata of all inverted indices stored within the blob.
|
||||
async fn metadata(&mut self) -> Result<InvertedIndexMetas>;
|
||||
|
||||
/// Retrieve the finite state transducer (FST) map for a given inverted index metadata entry.
|
||||
async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result<FstMap>;
|
||||
|
||||
/// Retrieve the bitmap for a given inverted index metadata entry at the specified offset and size.
|
||||
async fn bitmap(
|
||||
&mut self,
|
||||
meta: &InvertedIndexMeta,
|
||||
relative_offset: u32,
|
||||
size: u32,
|
||||
) -> Result<BitVec>;
|
||||
}
|
||||
235
src/index/src/inverted_index/format/reader/blob.rs
Normal file
235
src/index/src/inverted_index/format/reader/blob.rs
Normal file
@@ -0,0 +1,235 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::io::SeekFrom;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::inverted_index::error::{
|
||||
DecodeFstSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu,
|
||||
};
|
||||
use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader;
|
||||
use crate::inverted_index::format::reader::{FstMap, InvertedIndexReader};
|
||||
use crate::inverted_index::format::MIN_BLOB_SIZE;
|
||||
|
||||
/// Inverted index blob reader, implements [`InvertedIndexReader`]
|
||||
pub struct InvertedIndexBlobReader<R> {
|
||||
/// The blob
|
||||
source: R,
|
||||
}
|
||||
|
||||
impl<R> InvertedIndexBlobReader<R> {
|
||||
#[allow(dead_code)]
|
||||
pub fn new(source: R) -> Self {
|
||||
Self { source }
|
||||
}
|
||||
|
||||
fn validate_blob_size(blob_size: u64) -> Result<()> {
|
||||
ensure!(
|
||||
blob_size >= MIN_BLOB_SIZE,
|
||||
UnexpectedBlobSizeSnafu {
|
||||
min_blob_size: MIN_BLOB_SIZE,
|
||||
actual_blob_size: blob_size,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<R: AsyncRead + AsyncSeek + Unpin + Send> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
|
||||
let end = SeekFrom::End(0);
|
||||
let blob_size = self.source.seek(end).await.context(SeekSnafu)?;
|
||||
Self::validate_blob_size(blob_size)?;
|
||||
|
||||
let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size);
|
||||
footer_reader.metadata().await
|
||||
}
|
||||
|
||||
async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result<FstMap> {
|
||||
let offset = SeekFrom::Start(meta.base_offset + meta.relative_fst_offset as u64);
|
||||
self.source.seek(offset).await.context(SeekSnafu)?;
|
||||
let mut buf = vec![0u8; meta.fst_size as usize];
|
||||
self.source.read_exact(&mut buf).await.context(ReadSnafu)?;
|
||||
|
||||
FstMap::new(buf).context(DecodeFstSnafu)
|
||||
}
|
||||
|
||||
async fn bitmap(
|
||||
&mut self,
|
||||
meta: &InvertedIndexMeta,
|
||||
relative_offset: u32,
|
||||
size: u32,
|
||||
) -> Result<BitVec> {
|
||||
let offset = SeekFrom::Start(meta.base_offset + relative_offset as u64);
|
||||
self.source.seek(offset).await.context(SeekSnafu)?;
|
||||
let mut buf = vec![0u8; size as usize];
|
||||
self.source.read_exact(&mut buf).await.context(ReadSnafu)?;
|
||||
|
||||
Ok(BitVec::from_vec(buf))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_base::bit_vec::prelude::*;
|
||||
use fst::MapBuilder;
|
||||
use futures::io::Cursor;
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
use prost::Message;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn create_fake_fst() -> Vec<u8> {
|
||||
let mut fst_buf = Vec::new();
|
||||
let mut build = MapBuilder::new(&mut fst_buf).unwrap();
|
||||
build.insert("key1".as_bytes(), 1).unwrap();
|
||||
build.insert("key2".as_bytes(), 2).unwrap();
|
||||
build.finish().unwrap();
|
||||
fst_buf
|
||||
}
|
||||
|
||||
fn create_fake_bitmap() -> Vec<u8> {
|
||||
bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0, 1, 0].into_vec()
|
||||
}
|
||||
|
||||
fn create_inverted_index_blob() -> Vec<u8> {
|
||||
let bitmap_size = create_fake_bitmap().len();
|
||||
let fst_size = create_fake_fst().len();
|
||||
|
||||
// first index
|
||||
let mut inverted_index = Vec::new();
|
||||
inverted_index.extend_from_slice(&create_fake_bitmap()); // value bitmap
|
||||
inverted_index.extend_from_slice(&create_fake_bitmap()); // null bitmap
|
||||
inverted_index.extend_from_slice(&create_fake_fst()); // fst
|
||||
|
||||
let meta = InvertedIndexMeta {
|
||||
name: "tag0".to_string(),
|
||||
base_offset: 0,
|
||||
inverted_index_size: inverted_index.len() as _,
|
||||
relative_null_bitmap_offset: bitmap_size as _,
|
||||
null_bitmap_size: bitmap_size as _,
|
||||
relative_fst_offset: (bitmap_size * 2) as _,
|
||||
fst_size: fst_size as _,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// second index
|
||||
let meta1 = InvertedIndexMeta {
|
||||
name: "tag1".to_string(),
|
||||
base_offset: meta.inverted_index_size,
|
||||
inverted_index_size: inverted_index.len() as _,
|
||||
relative_null_bitmap_offset: bitmap_size as _,
|
||||
null_bitmap_size: bitmap_size as _,
|
||||
relative_fst_offset: (bitmap_size * 2) as _,
|
||||
fst_size: fst_size as _,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// metas
|
||||
let mut metas = InvertedIndexMetas::default();
|
||||
metas.metas.insert(meta.name.clone(), meta);
|
||||
metas.metas.insert(meta1.name.clone(), meta1);
|
||||
let mut meta_buf = Vec::new();
|
||||
metas.encode(&mut meta_buf).unwrap();
|
||||
|
||||
let mut blob = vec![];
|
||||
|
||||
// first index
|
||||
blob.extend_from_slice(&inverted_index);
|
||||
|
||||
// second index
|
||||
blob.extend_from_slice(&inverted_index);
|
||||
|
||||
// footer
|
||||
blob.extend_from_slice(&meta_buf);
|
||||
blob.extend_from_slice(&(meta_buf.len() as u32).to_le_bytes());
|
||||
|
||||
blob
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_inverted_index_blob_reader_metadata() {
|
||||
let blob = create_inverted_index_blob();
|
||||
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
assert_eq!(metas.metas.len(), 2);
|
||||
|
||||
let meta0 = metas.metas.get("tag0").unwrap();
|
||||
assert_eq!(meta0.name, "tag0");
|
||||
assert_eq!(meta0.base_offset, 0);
|
||||
assert_eq!(meta0.inverted_index_size, 54);
|
||||
assert_eq!(meta0.relative_null_bitmap_offset, 2);
|
||||
assert_eq!(meta0.null_bitmap_size, 2);
|
||||
assert_eq!(meta0.relative_fst_offset, 4);
|
||||
assert_eq!(meta0.fst_size, 50);
|
||||
|
||||
let meta1 = metas.metas.get("tag1").unwrap();
|
||||
assert_eq!(meta1.name, "tag1");
|
||||
assert_eq!(meta1.base_offset, 54);
|
||||
assert_eq!(meta1.inverted_index_size, 54);
|
||||
assert_eq!(meta1.relative_null_bitmap_offset, 2);
|
||||
assert_eq!(meta1.null_bitmap_size, 2);
|
||||
assert_eq!(meta1.relative_fst_offset, 4);
|
||||
assert_eq!(meta1.fst_size, 50);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_inverted_index_blob_reader_fst() {
|
||||
let blob = create_inverted_index_blob();
|
||||
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let meta = metas.metas.get("tag0").unwrap();
|
||||
|
||||
let fst_map = blob_reader.fst(meta).await.unwrap();
|
||||
assert_eq!(fst_map.len(), 2);
|
||||
assert_eq!(fst_map.get("key1".as_bytes()), Some(1));
|
||||
assert_eq!(fst_map.get("key2".as_bytes()), Some(2));
|
||||
|
||||
let meta = metas.metas.get("tag1").unwrap();
|
||||
let fst_map = blob_reader.fst(meta).await.unwrap();
|
||||
assert_eq!(fst_map.len(), 2);
|
||||
assert_eq!(fst_map.get("key1".as_bytes()), Some(1));
|
||||
assert_eq!(fst_map.get("key2".as_bytes()), Some(2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_inverted_index_blob_reader_bitmap() {
|
||||
let blob = create_inverted_index_blob();
|
||||
let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let meta = metas.metas.get("tag0").unwrap();
|
||||
|
||||
let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
let bitmap = blob_reader.bitmap(meta, 2, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let meta = metas.metas.get("tag1").unwrap();
|
||||
|
||||
let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
let bitmap = blob_reader.bitmap(meta, 2, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
}
|
||||
}
|
||||
186
src/index/src/inverted_index/format/reader/footer.rs
Normal file
186
src/index/src/inverted_index/format/reader/footer.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::io::SeekFrom;
|
||||
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
use prost::Message;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::inverted_index::error::{
|
||||
DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
|
||||
UnexpectedOffsetSizeSnafu,
|
||||
};
|
||||
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
|
||||
/// InvertedIndeFooterReader is for reading the footer section of the blob.
|
||||
pub struct InvertedIndeFooterReader<R> {
|
||||
source: R,
|
||||
blob_size: u64,
|
||||
}
|
||||
|
||||
impl<R> InvertedIndeFooterReader<R> {
|
||||
pub fn new(source: R, blob_size: u64) -> Self {
|
||||
Self { source, blob_size }
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: AsyncRead + AsyncSeek + Unpin> InvertedIndeFooterReader<R> {
|
||||
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
|
||||
let payload_size = self.read_payload_size().await?;
|
||||
let metas = self.read_payload(payload_size).await?;
|
||||
Ok(metas)
|
||||
}
|
||||
|
||||
async fn read_payload_size(&mut self) -> Result<u64> {
|
||||
let size_offset = SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE);
|
||||
self.source.seek(size_offset).await.context(SeekSnafu)?;
|
||||
let size_buf = &mut [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
|
||||
self.source.read_exact(size_buf).await.context(ReadSnafu)?;
|
||||
|
||||
let payload_size = u32::from_le_bytes(*size_buf) as u64;
|
||||
self.validate_payload_size(payload_size)?;
|
||||
|
||||
Ok(payload_size)
|
||||
}
|
||||
|
||||
async fn read_payload(&mut self, payload_size: u64) -> Result<InvertedIndexMetas> {
|
||||
let payload_offset =
|
||||
SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size);
|
||||
self.source.seek(payload_offset).await.context(SeekSnafu)?;
|
||||
|
||||
let payload = &mut vec![0u8; payload_size as usize];
|
||||
self.source.read_exact(payload).await.context(ReadSnafu)?;
|
||||
|
||||
let metas = InvertedIndexMetas::decode(&payload[..]).context(DecodeProtoSnafu)?;
|
||||
self.validate_metas(&metas, payload_size)?;
|
||||
|
||||
Ok(metas)
|
||||
}
|
||||
|
||||
fn validate_payload_size(&self, payload_size: u64) -> Result<()> {
|
||||
let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
ensure!(
|
||||
payload_size <= max_payload_size,
|
||||
UnexpectedFooterPayloadSizeSnafu {
|
||||
max_payload_size,
|
||||
actual_payload_size: payload_size,
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the read metadata is consistent with expected sizes and offsets.
|
||||
fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
|
||||
for meta in metas.metas.values() {
|
||||
let InvertedIndexMeta {
|
||||
base_offset,
|
||||
inverted_index_size,
|
||||
..
|
||||
} = meta;
|
||||
|
||||
let limit = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size;
|
||||
ensure!(
|
||||
*base_offset + *inverted_index_size <= limit,
|
||||
UnexpectedOffsetSizeSnafu {
|
||||
offset: *base_offset,
|
||||
size: *inverted_index_size,
|
||||
blob_size: self.blob_size,
|
||||
payload_size,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::io::Cursor;
|
||||
use prost::Message;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
|
||||
let mut metas = InvertedIndexMetas::default();
|
||||
metas.metas.insert("test".to_string(), meta);
|
||||
|
||||
let mut payload_buf = vec![];
|
||||
metas.encode(&mut payload_buf).unwrap();
|
||||
|
||||
let footer_payload_size = (payload_buf.len() as u32).to_le_bytes().to_vec();
|
||||
payload_buf.extend_from_slice(&footer_payload_size);
|
||||
payload_buf
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_payload() {
|
||||
let meta = InvertedIndexMeta {
|
||||
name: "test".to_string(),
|
||||
segment_row_count: 4096,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let payload_buf = create_test_payload(meta);
|
||||
let blob_size = payload_buf.len() as u64;
|
||||
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
|
||||
|
||||
let payload_size = reader.read_payload_size().await.unwrap();
|
||||
let metas = reader.read_payload(payload_size).await.unwrap();
|
||||
|
||||
assert_eq!(metas.metas.len(), 1);
|
||||
let index_meta = &metas.metas.get("test").unwrap();
|
||||
assert_eq!(index_meta.name, "test");
|
||||
assert_eq!(index_meta.segment_row_count, 4096);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_invalid_footer_payload_size() {
|
||||
let meta = InvertedIndexMeta {
|
||||
name: "test".to_string(),
|
||||
segment_row_count: 4096,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut payload_buf = create_test_payload(meta);
|
||||
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
|
||||
let blob_size = payload_buf.len() as u64;
|
||||
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
|
||||
|
||||
let payload_size_result = reader.read_payload_size().await;
|
||||
assert!(payload_size_result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_invalid_offset_size() {
|
||||
let meta = InvertedIndexMeta {
|
||||
name: "test".to_string(),
|
||||
base_offset: 0,
|
||||
inverted_index_size: 1, // Set size to 1 to make ecceed the blob size
|
||||
segment_row_count: 4096,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let payload_buf = create_test_payload(meta);
|
||||
let blob_size = payload_buf.len() as u64;
|
||||
let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
|
||||
|
||||
let payload_size = reader.read_payload_size().await.unwrap();
|
||||
let payload_result = reader.read_payload(payload_size).await;
|
||||
assert!(payload_result.is_err());
|
||||
}
|
||||
}
|
||||
15
src/index/src/lib.rs
Normal file
15
src/index/src/lib.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod inverted_index;
|
||||
@@ -40,6 +40,8 @@
|
||||
//!
|
||||
//! Footer payload bytes is either uncompressed or LZ4-compressed (as a single LZ4 compression frame with content size present),
|
||||
//! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object.
|
||||
//!
|
||||
//! [`FileMetadata`]: ../file_metadata/struct.FileMetadata.html
|
||||
|
||||
pub mod reader;
|
||||
pub mod writer;
|
||||
|
||||
Reference in New Issue
Block a user