feat(inverted_index.create): add index creator (#2960)

* feat(inverted_index.create): add read/write for external intermediate files

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: MAGIC_CODEC_V1 -> CODEC_V1_MAGIC

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: fix typos intermedia -> intermediate

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(inverted_index.create): add external sorter

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: fix typos intermedia -> intermediate

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: drop the stream as early as possible to avoid recursive calls to poll

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: project merge sorted stream

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add total_row_count to SortOutput

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: remove change of format

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(inverted_index.create): add index creator

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add check for total_row_count

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: lazy set meta of writer

This reverts commit 63cb5bdb5c3a08406d978357d8167ca18ed1b83b.

* feat: lazyily provide inverted index writer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish readability

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add push_with_name_n

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2023-12-20 15:02:13 +08:00
committed by GitHub
parent d2f49cbc2e
commit 7d1724f832
6 changed files with 422 additions and 32 deletions

View File

@@ -13,3 +13,45 @@
// limitations under the License.
mod sort;
mod sort_create;
use async_trait::async_trait;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::BytesRef;
/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
pub trait InvertedIndexCreator {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built
/// - `value`: The data to be indexed, or `None` for a null entry
///
/// It should be equivalent to calling `push_with_name_n` with `n = 1`
async fn push_with_name(
&mut self,
index_name: &str,
value: Option<BytesRef<'_>>,
) -> Result<()> {
self.push_with_name_n(index_name, value, 1).await
}
/// Adds `n` identical values to the named index. `None` values represent absence of data (null)
///
/// - `index_name`: Identifier for the index being built
/// - `value`: The data to be indexed, or `None` for a null entry
///
/// It should be equivalent to calling `push_with_name` `n` times
async fn push_with_name_n(
&mut self,
index_name: &str,
value: Option<BytesRef<'_>>,
n: usize,
) -> Result<()>;
/// Finalizes the index creation process, ensuring all data is properly indexed and stored
/// in the provided writer
async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()>;
}

View File

@@ -29,6 +29,7 @@ use crate::inverted_index::create::sort::intermediate_rw::{
};
use crate::inverted_index::create::sort::merge_stream::MergeSortedStream;
use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter};
use crate::inverted_index::create::sort_create::SorterFactory;
use crate::inverted_index::error::Result;
use crate::inverted_index::{Bytes, BytesRef};
@@ -137,6 +138,21 @@ impl ExternalSorter {
}
}
/// Generates a factory function that creates new `ExternalSorter` instances
pub fn factory(
temp_file_provider: Arc<dyn ExternalTempFileProvider>,
memory_usage_threshold: Option<usize>,
) -> SorterFactory {
Box::new(move |index_name, segment_row_count| {
Box::new(Self::new(
index_name,
temp_file_provider.clone(),
segment_row_count,
memory_usage_threshold,
))
})
}
/// Pushes the non-null values to the values buffer and sets the bits within
/// the specified range in the given BitVec to true.
/// Returns the memory usage difference of the buffer after the operation.

View File

@@ -0,0 +1,304 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::num::NonZeroUsize;
use async_trait::async_trait;
use snafu::ensure;
use crate::inverted_index::create::sort::{SortOutput, Sorter};
use crate::inverted_index::create::InvertedIndexCreator;
use crate::inverted_index::error::{InconsistentRowCountSnafu, Result};
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::BytesRef;
type IndexName = String;
type SegmentRowCount = NonZeroUsize;
/// Factory type to produce `Sorter` instances associated with an index name and segment row count
pub type SorterFactory = Box<dyn Fn(IndexName, SegmentRowCount) -> Box<dyn Sorter> + Send>;
/// `SortIndexCreator` orchestrates indexing by sorting input data for each named index
/// and writing to an inverted index writer
pub struct SortIndexCreator {
/// Factory for producing `Sorter` instances
sorter_factory: SorterFactory,
/// Map of index names to sorters
sorters: HashMap<IndexName, Box<dyn Sorter>>,
/// Number of rows in each segment, used to produce sorters
segment_row_count: NonZeroUsize,
}
#[async_trait]
impl InvertedIndexCreator for SortIndexCreator {
/// Inserts `n` values or nulls into the sorter for the specified index.
///
/// If the index does not exist, a new index is created even if `n` is 0.
/// Caller may leverage this behavior to create indexes with no data.
async fn push_with_name_n(
&mut self,
index_name: &str,
value: Option<BytesRef<'_>>,
n: usize,
) -> Result<()> {
match self.sorters.get_mut(index_name) {
Some(sorter) => sorter.push_n(value, n).await,
None => {
let index_name = index_name.to_string();
let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count);
sorter.push_n(value, n).await?;
self.sorters.insert(index_name, sorter);
Ok(())
}
}
}
/// Finalizes the sorting for all indexes and writes them using the inverted index writer
async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()> {
let mut output_row_count = None;
for (index_name, mut sorter) in self.sorters.drain() {
let SortOutput {
segment_null_bitmap,
sorted_stream,
total_row_count,
} = sorter.output().await?;
let expected_row_count = *output_row_count.get_or_insert(total_row_count);
ensure!(
expected_row_count == total_row_count,
InconsistentRowCountSnafu {
index_name,
total_row_count,
expected_row_count,
}
);
writer
.add_index(index_name, segment_null_bitmap, sorted_stream)
.await?;
}
let total_row_count = output_row_count.unwrap_or_default() as _;
let segment_row_count = self.segment_row_count as _;
writer.finish(total_row_count, segment_row_count).await
}
}
impl SortIndexCreator {
/// Creates a new `SortIndexCreator` with the given sorter factory and index writer
pub fn new(sorter_factory: SorterFactory, segment_row_count: NonZeroUsize) -> Self {
Self {
sorter_factory,
sorters: HashMap::new(),
segment_row_count,
}
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use common_base::BitVec;
use futures::{stream, StreamExt};
use super::*;
use crate::inverted_index::create::sort::SortedStream;
use crate::inverted_index::error::Error;
use crate::inverted_index::format::writer::MockInvertedIndexWriter;
use crate::inverted_index::Bytes;
#[tokio::test]
async fn test_sort_index_creator_basic() {
let mut creator =
SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
let index_values = vec![
("a", vec![b"3", b"2", b"1"]),
("b", vec![b"6", b"5", b"4"]),
("c", vec![b"1", b"2", b"3"]),
];
for (index_name, values) in index_values {
for value in values {
creator
.push_with_name(index_name, Some(value))
.await
.unwrap();
}
}
let mut mock_writer = MockInvertedIndexWriter::new();
mock_writer
.expect_add_index()
.times(3)
.returning(|name, null_bitmap, stream| {
assert!(null_bitmap.is_empty());
match name.as_str() {
"a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
"b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
"c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
_ => panic!("unexpected index name: {}", name),
}
Ok(())
});
mock_writer
.expect_finish()
.times(1)
.returning(|total_row_count, segment_row_count| {
assert_eq!(total_row_count, 3);
assert_eq!(segment_row_count.get(), 1);
Ok(())
});
creator.finish(&mut mock_writer).await.unwrap();
}
#[tokio::test]
async fn test_sort_index_creator_inconsistent_row_count() {
let mut creator =
SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
let index_values = vec![
("a", vec![b"3", b"2", b"1"]),
("b", vec![b"6", b"5", b"4"]),
("c", vec![b"1", b"2"]),
];
for (index_name, values) in index_values {
for value in values {
creator
.push_with_name(index_name, Some(value))
.await
.unwrap();
}
}
let mut mock_writer = MockInvertedIndexWriter::new();
mock_writer
.expect_add_index()
.returning(|name, null_bitmap, stream| {
assert!(null_bitmap.is_empty());
match name.as_str() {
"a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
"b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
"c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2"]),
_ => panic!("unexpected index name: {}", name),
}
Ok(())
});
mock_writer.expect_finish().never();
let res = creator.finish(&mut mock_writer).await;
assert!(matches!(res, Err(Error::InconsistentRowCount { .. })));
}
#[tokio::test]
async fn test_sort_index_creator_create_indexes_without_data() {
let mut creator =
SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
creator.push_with_name_n("a", None, 0).await.unwrap();
creator.push_with_name_n("b", None, 0).await.unwrap();
creator.push_with_name_n("c", None, 0).await.unwrap();
let mut mock_writer = MockInvertedIndexWriter::new();
mock_writer
.expect_add_index()
.returning(|name, null_bitmap, stream| {
assert!(null_bitmap.is_empty());
assert!(matches!(name.as_str(), "a" | "b" | "c"));
assert!(stream_to_values(stream).is_empty());
Ok(())
});
mock_writer
.expect_finish()
.times(1)
.returning(|total_row_count, segment_row_count| {
assert_eq!(total_row_count, 0);
assert_eq!(segment_row_count.get(), 1);
Ok(())
});
creator.finish(&mut mock_writer).await.unwrap();
}
fn set_bit(bit_vec: &mut BitVec, index: usize) {
if index >= bit_vec.len() {
bit_vec.resize(index + 1, false);
}
bit_vec.set(index, true);
}
struct NaiveSorter {
total_row_count: usize,
segment_row_count: NonZeroUsize,
values: BTreeMap<Option<Bytes>, BitVec>,
}
impl NaiveSorter {
fn factory() -> SorterFactory {
Box::new(|_index_name, segment_row_count| {
Box::new(NaiveSorter {
total_row_count: 0,
segment_row_count,
values: BTreeMap::new(),
})
})
}
}
#[async_trait]
impl Sorter for NaiveSorter {
async fn push(&mut self, value: Option<BytesRef<'_>>) -> Result<()> {
let segment_index = self.total_row_count / self.segment_row_count;
self.total_row_count += 1;
let bitmap = self.values.entry(value.map(Into::into)).or_default();
set_bit(bitmap, segment_index);
Ok(())
}
async fn push_n(&mut self, value: Option<BytesRef<'_>>, n: usize) -> Result<()> {
for _ in 0..n {
self.push(value).await?;
}
Ok(())
}
async fn output(&mut self) -> Result<SortOutput> {
let segment_null_bitmap = self.values.remove(&None).unwrap_or_default();
Ok(SortOutput {
segment_null_bitmap,
sorted_stream: Box::new(stream::iter(
std::mem::take(&mut self.values)
.into_iter()
.map(|(v, b)| Ok((v.unwrap(), b))),
)),
total_row_count: self.total_row_count,
})
}
}
fn stream_to_values(stream: SortedStream) -> Vec<Bytes> {
futures::executor::block_on(async {
stream.map(|r| r.unwrap().0).collect::<Vec<Bytes>>().await
})
}
}

View File

@@ -160,6 +160,13 @@ pub enum Error {
#[snafu(display("Unknown intermediate codec magic: {magic:?}"))]
UnknownIntermediateCodecMagic { magic: [u8; 4], location: Location },
#[snafu(display("Inconsistent row count, index_name: {index_name}, total_row_count: {total_row_count}, expected: {expected_row_count}"))]
InconsistentRowCount {
index_name: String,
total_row_count: usize,
expected_row_count: usize,
},
}
impl ErrorExt for Error {
@@ -188,6 +195,7 @@ impl ErrorExt for Error {
| IntersectionApplierWithInList { .. }
| EmptyPredicates { .. }
| FstInsert { .. }
| InconsistentRowCount { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -15,6 +15,8 @@
mod blob;
mod single;
use std::num::NonZeroUsize;
use async_trait::async_trait;
use common_base::BitVec;
use futures::Stream;
@@ -23,19 +25,27 @@ use crate::inverted_index::error::Result;
pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter;
use crate::inverted_index::Bytes;
pub type ValueStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;
/// Trait for writing inverted index data to underlying storage.
#[mockall::automock]
#[async_trait]
pub trait InvertedIndexWriter {
pub trait InvertedIndexWriter: Send {
/// Adds entries to an index.
///
/// * `name` is the index identifier.
/// * `null_bitmap` marks positions of null entries.
/// * `values` is a stream of values and their locations, yielded lexicographically.
/// Errors occur if the values are out of order.
async fn add_index<S>(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()>
where
S: Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin;
async fn add_index(
&mut self,
name: String,
null_bitmap: BitVec,
values: ValueStream,
) -> Result<()>;
/// Finalizes the index writing process, ensuring all data is written.
async fn finish(&mut self) -> Result<()>;
/// `total_row_count` and `segment_row_count` is used to fill in the metadata.
async fn finish(&mut self, total_row_count: u64, segment_row_count: NonZeroUsize)
-> Result<()>;
}

View File

@@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroUsize;
use async_trait::async_trait;
use common_base::BitVec;
use futures::{AsyncWrite, AsyncWriteExt, Stream};
use futures::{AsyncWrite, AsyncWriteExt};
use greptime_proto::v1::index::InvertedIndexMetas;
use prost::Message;
use snafu::ResultExt;
use super::single::SingleIndexWriter;
use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::Bytes;
use crate::inverted_index::format::writer::single::SingleIndexWriter;
use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream};
/// `InvertedIndexBlobWriter`, implemented [`InvertedIndexWriter`], manages
/// writing of an inverted index to a blob storage.
@@ -39,10 +40,12 @@ pub struct InvertedIndexBlobWriter<W> {
#[async_trait]
impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWriter<W> {
async fn add_index<S>(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()>
where
S: Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin,
{
async fn add_index(
&mut self,
name: String,
null_bitmap: BitVec,
values: ValueStream,
) -> Result<()> {
let single_writer = SingleIndexWriter::new(
name.clone(),
self.written_size,
@@ -58,7 +61,14 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWrit
Ok(())
}
async fn finish(&mut self) -> Result<()> {
async fn finish(
&mut self,
total_row_count: u64,
segment_row_count: NonZeroUsize,
) -> Result<()> {
self.metas.segment_row_count = segment_row_count.get() as _;
self.metas.total_row_count = total_row_count;
let metas_bytes = self.metas.encode_to_vec();
self.blob_writer
.write_all(&metas_bytes)
@@ -78,19 +88,11 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWrit
}
impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
pub fn new(
blob_writer: W,
total_row_count: u64,
segment_row_count: u64,
) -> InvertedIndexBlobWriter<W> {
pub fn new(blob_writer: W) -> InvertedIndexBlobWriter<W> {
InvertedIndexBlobWriter {
blob_writer,
written_size: 0,
metas: InvertedIndexMetas {
total_row_count,
segment_row_count,
..Default::default()
},
metas: InvertedIndexMetas::default(),
}
}
}
@@ -98,9 +100,11 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
#[cfg(test)]
mod tests {
use futures::io::Cursor;
use futures::stream;
use super::*;
use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
use crate::inverted_index::Bytes;
fn unpack(fst_value: u64) -> [u32; 2] {
bytemuck::cast::<u64, [u32; 2]>(fst_value)
@@ -109,8 +113,11 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_writer_write_empty() {
let mut blob = Vec::new();
let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1);
writer.finish().await.unwrap();
let mut writer = InvertedIndexBlobWriter::new(&mut blob);
writer
.finish(8, NonZeroUsize::new(1).unwrap())
.await
.unwrap();
let cursor = Cursor::new(blob);
let mut reader = InvertedIndexBlobReader::new(cursor);
@@ -123,16 +130,16 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_writer_write_basic() {
let mut blob = Vec::new();
let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1);
let mut writer = InvertedIndexBlobWriter::new(&mut blob);
writer
.add_index(
"tag0".to_string(),
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
futures::stream::iter(vec![
Box::new(stream::iter(vec![
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
]),
])),
)
.await
.unwrap();
@@ -140,15 +147,18 @@ mod tests {
.add_index(
"tag1".to_string(),
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
futures::stream::iter(vec![
Box::new(stream::iter(vec![
Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))),
Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))),
]),
])),
)
.await
.unwrap();
writer.finish().await.unwrap();
writer
.finish(8, NonZeroUsize::new(1).unwrap())
.await
.unwrap();
let cursor = Cursor::new(blob);
let mut reader = InvertedIndexBlobReader::new(cursor);