feat(inverted_index.format): add writer (#2900)

* feat(inverted_index.format): add writer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: remove clippy allow

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Update src/index/src/inverted_index/error.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Zhenchi
2023-12-11 17:55:25 +08:00
committed by GitHub
parent cf8b6c77dc
commit 1e22f1cb4f
7 changed files with 497 additions and 4 deletions

View File

@@ -40,6 +40,27 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to write"))]
Write {
#[snafu(source)]
error: IoError,
location: Location,
},
#[snafu(display("Failed to flush"))]
Flush {
#[snafu(source)]
error: IoError,
location: Location,
},
#[snafu(display("Failed to close"))]
Close {
#[snafu(source)]
error: IoError,
location: Location,
},
#[snafu(display(
"Unexpected inverted index blob size, min: {min_blob_size}, actual: {actual_blob_size}"
))]
@@ -115,6 +136,20 @@ pub enum Error {
#[snafu(display("index not found, name: {name}"))]
IndexNotFound { name: String, location: Location },
#[snafu(display("Failed to insert value to FST"))]
FstInsert {
#[snafu(source)]
error: fst::Error,
location: Location,
},
#[snafu(display("Failed to compile FST"))]
FstCompile {
#[snafu(source)]
error: fst::Error,
location: Location,
},
}
impl ErrorExt for Error {
@@ -123,19 +158,24 @@ impl ErrorExt for Error {
match self {
Seek { .. }
| Read { .. }
| Write { .. }
| Flush { .. }
| Close { .. }
| UnexpectedFooterPayloadSize { .. }
| UnexpectedZeroSegmentRowCount { .. }
| UnexpectedOffsetSize { .. }
| UnexpectedBlobSize { .. }
| DecodeProto { .. }
| DecodeFst { .. }
| KeysApplierUnexpectedPredicates { .. } => StatusCode::Unexpected,
| KeysApplierUnexpectedPredicates { .. }
| FstCompile { .. } => StatusCode::Unexpected,
ParseRegex { .. }
| ParseDFA { .. }
| KeysApplierWithoutInList { .. }
| IntersectionApplierWithInList { .. }
| EmptyPredicates { .. }
| FstInsert { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -27,10 +27,10 @@
//!
//! An inverted index comprises a collection of bitmaps, a null bitmap, and a finite state transducer (FST) indicating tag values' positions:
//!
//! `bitmap₀ bitmap₁ bitmap₂ ... bitmapₙ null_bitmap fst`
//! `null_bitmap bitmap₀ bitmap₁ bitmap₂ ... bitmapₙ fst`
//!
//! - `bitmapᵢ`: Bitset indicating the presence of tag values within a row group.
//! - `null_bitmap`: Bitset tracking the presence of null values within the tag column.
//! - `bitmapᵢ`: Bitset indicating the presence of tag values within a row group.
//! - `fst`: Finite State Transducer providing an ordered map of bytes, representing the tag values.
//!
//! ## Footer Details
@@ -51,6 +51,7 @@
//! [RFC]: https://github.com/GreptimeTeam/greptimedb/blob/develop/docs/rfcs/2023-11-03-inverted-index.md
pub mod reader;
pub mod writer;
const FOOTER_PAYLOAD_SIZE_SIZE: u64 = 4;
const MIN_BLOB_SIZE: u64 = FOOTER_PAYLOAD_SIZE_SIZE;

View File

@@ -20,6 +20,7 @@ use common_base::BitVec;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
use crate::inverted_index::error::Result;
pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
use crate::inverted_index::FstMap;
/// InvertedIndexReader defines an asynchronous reader of inverted index data

View File

@@ -34,7 +34,6 @@ pub struct InvertedIndexBlobReader<R> {
}
impl<R> InvertedIndexBlobReader<R> {
#[allow(dead_code)]
pub fn new(source: R) -> Self {
Self { source }
}

View File

@@ -0,0 +1,41 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod blob;
mod single;
use async_trait::async_trait;
use common_base::BitVec;
use futures::Stream;
use crate::inverted_index::error::Result;
pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter;
use crate::inverted_index::Bytes;
/// Trait for writing inverted index data to underlying storage.
#[async_trait]
pub trait InvertedIndexWriter {
/// Adds entries to an index.
///
/// * `name` is the index identifier.
/// * `null_bitmap` marks positions of null entries.
/// * `values` is a stream of values and their locations, yielded lexicographically.
/// Errors occur if the values are out of order.
async fn add_index<S>(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()>
where
S: Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin;
/// Finalizes the index writing process, ensuring all data is written.
async fn finish(&mut self) -> Result<()>;
}

View File

@@ -0,0 +1,198 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_base::BitVec;
use futures::{AsyncWrite, AsyncWriteExt, Stream};
use greptime_proto::v1::index::InvertedIndexMetas;
use prost::Message;
use snafu::ResultExt;
use super::single::SingleIndexWriter;
use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::Bytes;
/// `InvertedIndexBlobWriter`, implemented [`InvertedIndexWriter`], manages
/// writing of an inverted index to a blob storage.
pub struct InvertedIndexBlobWriter<W> {
/// The underlying blob storage
blob_writer: W,
/// Tracks the total number of bytes written to the storage so far
written_size: u64,
/// Metadata about each index that has been written
metas: InvertedIndexMetas,
}
#[async_trait]
impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWriter<W> {
async fn add_index<S>(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()>
where
S: Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin,
{
let single_writer = SingleIndexWriter::new(
name.clone(),
self.written_size,
null_bitmap,
values,
&mut self.blob_writer,
);
let metadata = single_writer.write().await?;
self.written_size += metadata.inverted_index_size;
self.metas.metas.insert(name, metadata);
Ok(())
}
async fn finish(&mut self) -> Result<()> {
let metas_bytes = self.metas.encode_to_vec();
self.blob_writer
.write_all(&metas_bytes)
.await
.context(WriteSnafu)?;
let footer_size = metas_bytes.len() as u32;
self.blob_writer
.write_all(&footer_size.to_le_bytes())
.await
.context(WriteSnafu)?;
self.blob_writer.flush().await.context(FlushSnafu)?;
self.blob_writer.close().await.context(CloseSnafu)?;
Ok(())
}
}
impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
pub fn new(
blob_writer: W,
total_row_count: u64,
segment_row_count: u64,
) -> InvertedIndexBlobWriter<W> {
InvertedIndexBlobWriter {
blob_writer,
written_size: 0,
metas: InvertedIndexMetas {
total_row_count,
segment_row_count,
..Default::default()
},
}
}
}
#[cfg(test)]
mod tests {
use futures::io::Cursor;
use super::*;
use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
fn unpack(fst_value: u64) -> [u32; 2] {
bytemuck::cast::<u64, [u32; 2]>(fst_value)
}
#[tokio::test]
async fn test_inverted_index_blob_writer_write_empty() {
let mut blob = Vec::new();
let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1);
writer.finish().await.unwrap();
let cursor = Cursor::new(blob);
let mut reader = InvertedIndexBlobReader::new(cursor);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 0);
}
#[tokio::test]
async fn test_inverted_index_blob_writer_write_basic() {
let mut blob = Vec::new();
let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1);
writer
.add_index(
"tag0".to_string(),
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
futures::stream::iter(vec![
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
]),
)
.await
.unwrap();
writer
.add_index(
"tag1".to_string(),
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
futures::stream::iter(vec![
Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))),
Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))),
]),
)
.await
.unwrap();
writer.finish().await.unwrap();
let cursor = Cursor::new(blob);
let mut reader = InvertedIndexBlobReader::new(cursor);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2);
// tag0
let tag0 = metadata.metas.get("tag0").unwrap();
let stats0 = tag0.stats.as_ref().unwrap();
assert_eq!(stats0.distinct_count, 3);
assert_eq!(stats0.null_count, 1);
assert_eq!(stats0.min_value, Bytes::from("a"));
assert_eq!(stats0.max_value, Bytes::from("c"));
let fst0 = reader.fst(tag0).await.unwrap();
assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = reader.bitmap(tag0, offset, size).await.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = reader.bitmap(tag0, offset, size).await.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = reader.bitmap(tag0, offset, size).await.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
// tag1
let tag1 = metadata.metas.get("tag1").unwrap();
let stats1 = tag1.stats.as_ref().unwrap();
assert_eq!(stats1.distinct_count, 3);
assert_eq!(stats1.null_count, 1);
assert_eq!(stats1.min_value, Bytes::from("x"));
assert_eq!(stats1.max_value, Bytes::from("z"));
let fst1 = reader.fst(tag1).await.unwrap();
assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = reader.bitmap(tag1, offset, size).await.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = reader.bitmap(tag1, offset, size).await.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = reader.bitmap(tag1, offset, size).await.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
}
}

View File

@@ -0,0 +1,213 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::BitVec;
use fst::MapBuilder;
use futures::{AsyncWrite, AsyncWriteExt, Stream, StreamExt};
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats};
use snafu::ResultExt;
use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu};
use crate::inverted_index::Bytes;
/// `SingleIndexWriter` writes values to the blob storage for an individual inverted index
pub struct SingleIndexWriter<W, S> {
/// The underlying blob storage
blob_writer: W,
/// The null bitmap to be written
null_bitmap: BitVec,
/// The stream of values to be written, yielded lexicographically
values: S,
/// Builder for constructing the FST
fst: MapBuilder<Vec<u8>>,
/// Metadata about the index
meta: InvertedIndexMeta,
}
impl<W, S> SingleIndexWriter<W, S>
where
W: AsyncWrite + Send + Unpin,
S: Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin,
{
/// Constructs a new `SingleIndexWriter`
pub fn new(
name: String,
base_offset: u64,
null_bitmap: BitVec,
values: S,
blob_writer: W,
) -> SingleIndexWriter<W, S> {
SingleIndexWriter {
blob_writer,
null_bitmap,
values,
fst: MapBuilder::memory(),
meta: InvertedIndexMeta {
name,
base_offset,
stats: Some(InvertedIndexStats::default()),
..Default::default()
},
}
}
/// Writes the null bitmap, values with their bitmaps, and constructs the FST map.
pub async fn write(mut self) -> Result<InvertedIndexMeta> {
self.write_null_bitmap().await?;
while let Some(result) = self.values.next().await {
let (bytes, bitmap) = result?;
self.append_value(bytes, bitmap).await?;
}
self.finish_fst_construction().await
}
/// Writes the null bitmap to the blob and updates the metadata accordingly
async fn write_null_bitmap(&mut self) -> Result<()> {
let null_bitmap_bytes = self.null_bitmap.as_raw_slice();
self.blob_writer
.write_all(null_bitmap_bytes)
.await
.context(WriteSnafu)?;
self.meta.relative_null_bitmap_offset = self.meta.inverted_index_size as _;
self.meta.null_bitmap_size = null_bitmap_bytes.len() as _;
self.meta.inverted_index_size += self.meta.null_bitmap_size as u64;
// update stats
if let Some(stats) = self.meta.stats.as_mut() {
let null_count = self.null_bitmap.count_ones();
stats.null_count = null_count as u64;
}
Ok(())
}
/// Appends a value and its bitmap to the blob, updates the FST, and the metadata
async fn append_value(&mut self, value: Bytes, bitmap: BitVec) -> Result<()> {
let bitmap_bytes = bitmap.into_vec();
self.blob_writer
.write_all(&bitmap_bytes)
.await
.context(WriteSnafu)?;
let offset = self.meta.inverted_index_size as u32;
let size = bitmap_bytes.len() as u32;
self.meta.inverted_index_size += size as u64;
let packed = bytemuck::cast::<[u32; 2], u64>([offset, size]);
self.fst.insert(&value, packed).context(FstInsertSnafu)?;
// update stats
if let Some(stats) = self.meta.stats.as_mut() {
stats.distinct_count += 1;
// update min/max, assume values are appended in lexicographic order
if stats.distinct_count == 1 {
stats.min_value = value.clone();
}
stats.max_value = value;
}
Ok(())
}
/// Writes the compiled FST to the blob and finalizes the metadata
async fn finish_fst_construction(mut self) -> Result<InvertedIndexMeta> {
let fst_bytes = self.fst.into_inner().context(FstCompileSnafu)?;
self.blob_writer
.write_all(&fst_bytes)
.await
.context(WriteSnafu)?;
self.meta.relative_fst_offset = self.meta.inverted_index_size as _;
self.meta.fst_size = fst_bytes.len() as _;
self.meta.inverted_index_size += self.meta.fst_size as u64;
Ok(self.meta)
}
}
#[cfg(test)]
mod tests {
use futures::stream;
use super::*;
use crate::inverted_index::error::Error;
use crate::inverted_index::Bytes;
#[tokio::test]
async fn test_single_index_writer_write_empty() {
let mut blob = Vec::new();
let writer = SingleIndexWriter::new(
"test".to_string(),
0,
BitVec::new(),
stream::empty(),
&mut blob,
);
let meta = writer.write().await.unwrap();
assert_eq!(meta.name, "test");
assert_eq!(meta.base_offset, 0);
assert_eq!(meta.stats, Some(InvertedIndexStats::default()));
}
#[tokio::test]
async fn test_single_index_writer_write_basic() {
let mut blob = Vec::new();
let writer = SingleIndexWriter::new(
"test".to_string(),
0,
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
stream::iter(vec![
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))),
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
]),
&mut blob,
);
let meta = writer.write().await.unwrap();
assert_eq!(meta.name, "test");
assert_eq!(meta.base_offset, 0);
let stats = meta.stats.as_ref().unwrap();
assert_eq!(stats.distinct_count, 3);
assert_eq!(stats.null_count, 1);
assert_eq!(stats.min_value, Bytes::from("a"));
assert_eq!(stats.max_value, Bytes::from("c"));
}
#[tokio::test]
async fn test_single_index_writer_write_out_of_order() {
let mut blob = Vec::new();
let writer = SingleIndexWriter::new(
"test".to_string(),
0,
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
stream::iter(vec![
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))),
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
]),
&mut blob,
);
let res = writer.write().await;
assert!(matches!(res, Err(Error::FstInsert { .. })));
}
}