feat(inverted_index): introduce SstIndexCreator (#3107)

* feat(inverted_index): introduce SstIndexCreator

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: tiny polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: distinguish intermediate store and index store

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: move comment as doc comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: column id as index name

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Author: Zhenchi <zhongzc_arch@outlook.com>
Date: 2024-01-09 17:24:16 +08:00
Committed by: GitHub
Parent: 7d0d2163d2
Commit: db98484796
19 changed files with 911 additions and 47 deletions
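
At a glance, the new `SstIndexCreator` exposes an update/finish/abort lifecycle. A minimal usage sketch based on the constructor and methods introduced below (the `region_dir`, `sst_file_id`, `metadata`, store handles, and `batches` bindings here are assumed to be in scope, not part of this diff):

```rust
use std::num::NonZeroUsize;

let mut creator = SstIndexCreator::new(
    region_dir,                       // directory of the region
    sst_file_id,                      // FileId of the SST being written
    &metadata,                        // RegionMetadataRef with >= 1 tag column
    index_store,                      // object store for the puffin index file
    intermediate_store,               // object store for external-sort temp files
    Some(64 * 1024 * 1024),           // total memory budget for external sorting
    NonZeroUsize::new(1024).unwrap(), // row group size
);

for batch in &batches {
    creator.update(batch).await?; // cleans up intermediate files on failure
}

// `finish` returns (row_count, byte_count); call `abort` instead to discard garbage.
let (rows, bytes) = creator.finish().await?;
```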


@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod sort;
mod sort_create;
pub mod sort;
pub mod sort_create;
use async_trait::async_trait;


@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod external_provider;
mod external_sort;
pub mod external_provider;
pub mod external_sort;
mod intermediate_rw;
mod merge_stream;


@@ -15,7 +15,7 @@
use std::any::Any;
use std::io::Error as IoError;
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -167,6 +167,12 @@ pub enum Error {
total_row_count: usize,
expected_row_count: usize,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
location: Location,
},
}
impl ErrorExt for Error {
@@ -197,6 +203,8 @@ impl ErrorExt for Error {
| FstInsert { .. }
| InconsistentRowCount { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,
External { source, .. } => source.status_code(),
}
}
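
The new `External` variant lets code outside the `index` crate surface its own errors through inverted-index APIs, with the status code delegated to the wrapped source. A sketch of the wrapping pattern (the same one `TempFileProvider` uses later in this commit; `do_io` is a hypothetical fallible operation):

```rust
use common_error::ext::BoxedError;
use index::inverted_index::error as index_error;
use snafu::ResultExt;

// Box the concrete error to erase its type, then attach it to `Error::External`.
let value = do_io()
    .await
    .map_err(BoxedError::new)
    .context(index_error::ExternalSnafu)?;
```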


@@ -445,6 +445,21 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to push index value"))]
PushIndexValue {
source: index::inverted_index::error::Error,
location: Location,
},
#[snafu(display("Failed to write index completely"))]
IndexFinish {
source: index::inverted_index::error::Error,
location: Location,
},
#[snafu(display("Operate on aborted index"))]
OperateAbortedIndex { location: Location },
#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
source: puffin::error::Error,
@@ -463,6 +478,18 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to write puffin completely"))]
PuffinFinish {
source: puffin::error::Error,
location: Location,
},
#[snafu(display("Failed to add blob to puffin file"))]
PuffinAddBlob {
source: puffin::error::Error,
location: Location,
},
#[snafu(display("Failed to clean dir {dir}"))]
CleanDir {
dir: String,
@@ -522,6 +549,7 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
@@ -569,10 +597,14 @@ impl ErrorExt for Error {
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyIndex { source, .. }
| IndexFinish { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. }
| PuffinReadBlob { source, .. }
| PuffinFinish { source, .. }
| PuffinAddBlob { source, .. } => source.status_code(),
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,


@@ -150,7 +150,7 @@ lazy_static! {
// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!(
"index_apply_elapsed",
"greptime_index_apply_elapsed",
"index apply elapsed",
)
.unwrap();
@@ -160,6 +160,26 @@ lazy_static! {
"index apply memory usage",
)
.unwrap();
/// Timer of index creation.
pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_index_create_elapsed",
"index create elapsed",
&[STAGE_LABEL]
)
.unwrap();
/// Counter of rows indexed.
pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!(
"greptime_index_create_rows_total",
"index create rows total",
)
.unwrap();
/// Counter of created index bytes.
pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!(
"greptime_index_create_bytes_total",
"index create bytes total",
)
.unwrap();
/// Counter of r/w bytes on index related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_io_bytes_total",
@@ -170,6 +190,15 @@ lazy_static! {
/// Counter of read bytes on puffin files.
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "puffin"]);
/// Counter of write bytes on puffin files.
pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "puffin"]);
/// Counter of read bytes on intermediate files.
pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "intermediate"]);
/// Counter of write bytes on intermediate files.
pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "intermediate"]);
/// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush.
pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
@@ -190,5 +219,17 @@ lazy_static! {
/// Counter of flush operations on puffin files.
pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "puffin"]);
/// Counter of read operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "intermediate"]);
/// Counter of seek operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "intermediate"]);
/// Counter of write operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "intermediate"]);
/// Counter of flush operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "intermediate"]);
// ------- End of index metrics.
}
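
The new metrics follow the usual `lazy_static` + prometheus pattern: histograms observe elapsed seconds per stage, counters accumulate rows and bytes. A sketch of how the creation path records them (mirroring `Statistics::drop` further down in this commit; `rows` and `bytes` are assumed counts):

```rust
use std::time::Instant;

let start = Instant::now();
// ... run the update stage ...
INDEX_CREATE_ELAPSED
    .with_label_values(&["update"])
    .observe(start.elapsed().as_secs_f64());

INDEX_CREATE_ROWS_TOTAL.inc_by(rows as u64);
INDEX_CREATE_BYTES_TOTAL.inc_by(bytes as u64);
```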


@@ -48,7 +48,7 @@ pub trait RowCodec {
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
}
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
data_type: ConcreteDataType,
}


@@ -16,6 +16,14 @@
pub mod applier;
mod codec;
pub mod creator;
mod store;
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";
// TODO(zhongzc): how to determine this value?
/// The minimum memory usage threshold for a column to qualify for external sorting during index creation.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192;
/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
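
`MIN_MEMORY_USAGE_THRESHOLD` acts as a per-column floor: `SstIndexCreator::new` (below) splits the caller's total budget evenly across tag columns and never lets a column's share drop beneath it. A worked sketch of that computation, with `num_tag_columns` standing in for `metadata.primary_key.len()`:

```rust
// With a 64 KiB budget and 4 tag columns, each column gets 16384 bytes;
// with 16 tag columns the 4096-byte share is floored to 8192.
let memory_threshold = memory_usage_threshold
    .map(|total| (total / num_tag_columns).max(MIN_MEMORY_USAGE_THRESHOLD));
```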


@@ -32,14 +32,13 @@ use index::inverted_index::search::predicate::Predicate;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
type ColumnName = String;
/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan.
pub struct SstIndexApplierBuilder<'a> {
/// Directory of the region, required argument for constructing [`SstIndexApplier`].
@@ -52,7 +51,7 @@ pub struct SstIndexApplierBuilder<'a> {
metadata: &'a RegionMetadata,
/// Stores predicates during traversal on the Expr tree.
output: HashMap<ColumnName, Vec<Predicate>>,
output: HashMap<ColumnId, Vec<Predicate>>,
}
impl<'a> SstIndexApplierBuilder<'a> {
@@ -81,7 +80,11 @@ impl<'a> SstIndexApplierBuilder<'a> {
return Ok(None);
}
let predicates = self.output.into_iter().collect();
let predicates = self
.output
.into_iter()
.map(|(column_id, predicates)| (column_id.to_string(), predicates))
.collect();
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(SstIndexApplier::new(
self.region_dir,
@@ -122,18 +125,16 @@ impl<'a> SstIndexApplierBuilder<'a> {
}
/// Helper function to add a predicate to the output.
fn add_predicate(&mut self, column_name: &str, predicate: Predicate) {
match self.output.get_mut(column_name) {
Some(predicates) => predicates.push(predicate),
None => {
self.output.insert(column_name.to_string(), vec![predicate]);
}
}
fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
self.output.entry(column_id).or_default().push(predicate);
}
/// Helper function to get the column type of a tag column.
/// Helper function to get the column id and the column type of a tag column.
/// Returns `None` if the column is not a tag column.
fn tag_column_type(&self, column_name: &str) -> Result<Option<ConcreteDataType>> {
fn tag_column_id_and_type(
&self,
column_name: &str,
) -> Result<Option<(ColumnId, ConcreteDataType)>> {
let column = self
.metadata
.column_by_name(column_name)
@@ -142,7 +143,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
})?;
Ok((column.semantic_type == SemanticType::Tag)
.then(|| column.column_schema.data_type.clone()))
.then(|| (column.column_id, column.column_schema.data_type.clone())))
}
/// Helper function to get a non-null literal.
@@ -303,7 +304,7 @@ mod tests {
});
builder.traverse_and_collect(&expr);
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
@@ -311,7 +312,7 @@ mod tests {
pattern: "bar".to_string()
})
);
let predicates = builder.output.get("b").unwrap();
let predicates = builder.output.get(&2).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],


@@ -28,7 +28,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(column_name) = Self::column_name(&between.expr) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};
let Some(low) = Self::nonnull_lit(&between.low) else {
@@ -51,7 +51,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
},
});
self.add_predicate(column_name, predicate);
self.add_predicate(column_id, predicate);
Ok(())
}
}
@@ -80,7 +80,7 @@ mod tests {
builder.collect_between(&between).unwrap();
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],


@@ -114,7 +114,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(lit) = Self::nonnull_lit(literal) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};
@@ -122,7 +122,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
range: range_builder(Self::encode_lit(lit, data_type)?),
});
self.add_predicate(column_name, predicate);
self.add_predicate(column_id, predicate);
Ok(())
}
}
@@ -230,7 +230,7 @@ mod tests {
builder.collect_comparison_expr(left, op, right).unwrap();
}
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), cases.len());
for ((_, expected), actual) in cases.into_iter().zip(predicates) {
assert_eq!(


@@ -31,14 +31,14 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};
let predicate = Predicate::InList(InListPredicate {
list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]),
});
self.add_predicate(column_name, predicate);
self.add_predicate(column_id, predicate);
Ok(())
}
@@ -59,7 +59,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};
@@ -68,7 +68,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
let predicate = Predicate::InList(InListPredicate { list: inlist });
self.add_predicate(column_name, predicate);
self.add_predicate(column_id, predicate);
}
Ok(())
@@ -142,7 +142,7 @@ mod tests {
.collect_eq(&string_lit("bar"), &tag_column())
.unwrap();
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 2);
assert_eq!(
predicates[0],
@@ -227,7 +227,7 @@ mod tests {
builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],


@@ -29,7 +29,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(column_name) = Self::column_name(&inlist.expr) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};
@@ -46,7 +46,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
.insert(Self::encode_lit(lit, data_type.clone())?);
}
self.add_predicate(column_name, Predicate::InList(predicate));
self.add_predicate(column_id, Predicate::InList(predicate));
Ok(())
}
}
@@ -74,7 +74,7 @@ mod tests {
builder.collect_inlist(&in_list).unwrap();
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],


@@ -25,7 +25,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(column_name) = Self::column_name(column) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};
if !data_type.is_string() {
@@ -38,7 +38,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let predicate = Predicate::RegexMatch(RegexMatchPredicate {
pattern: pattern.clone(),
});
self.add_predicate(column_name, predicate);
self.add_predicate(column_id, predicate);
Ok(())
}
}
@@ -62,7 +62,7 @@ mod tests {
.collect_regex_match(&tag_column(), &string_lit("abc"))
.unwrap();
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],


@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::value::ValueRef;
use datatypes::value::{Value, ValueRef};
use memcomparable::Serializer;
use store_api::metadata::ColumnMetadata;
use crate::error::Result;
use crate::row_converter::SortField;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// Encodes index values according to their data types for sorting and storage use.
pub struct IndexValueCodec;
@@ -36,9 +37,65 @@ impl IndexValueCodec {
}
}
type ColumnId = String;
/// Decodes primary key values into their corresponding column ids, data types and values.
pub struct IndexValuesCodec {
/// The tag column ids.
column_ids: Vec<ColumnId>,
/// The data types of tag columns.
fields: Vec<SortField>,
/// The decoder for the primary key.
decoder: McmpRowCodec,
}
impl IndexValuesCodec {
/// Creates a new `IndexValuesCodec` from the `ColumnMetadata` of the tag columns.
pub fn from_tag_columns<'a>(tag_columns: impl Iterator<Item = &'a ColumnMetadata>) -> Self {
let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns
.map(|column| {
(
column.column_id.to_string(),
SortField::new(column.column_schema.data_type.clone()),
)
})
.unzip();
let decoder = McmpRowCodec::new(fields.clone());
Self {
column_ids,
fields,
decoder,
}
}
/// Decodes a primary key into its corresponding column ids, data types and values.
pub fn decode(
&self,
primary_key: &[u8],
) -> Result<impl Iterator<Item = (&ColumnId, &SortField, Option<Value>)>> {
let values = self.decoder.decode(primary_key)?;
let iter = values
.into_iter()
.zip(&self.column_ids)
.zip(&self.fields)
.map(|((value, column_id), encoder)| {
if value.is_null() {
(column_id, encoder, None)
} else {
(column_id, encoder, Some(value))
}
});
Ok(iter)
}
}
#[cfg(test)]
mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use super::*;
use crate::error::Error;
@@ -62,4 +119,42 @@ mod tests {
let res = IndexValueCodec::encode_value(value, &field, &mut buffer);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
}
#[test]
fn test_decode_primary_key_basic() {
let tag_columns = vec![
ColumnMetadata {
column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
semantic_type: api::v1::SemanticType::Tag,
column_id: 1,
},
ColumnMetadata {
column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
semantic_type: api::v1::SemanticType::Tag,
column_id: 2,
},
];
let primary_key = McmpRowCodec::new(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int64_datatype()),
])
.encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
.unwrap();
let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter());
let mut iter = codec.decode(&primary_key).unwrap();
let (column_id, field, value) = iter.next().unwrap();
assert_eq!(column_id, "1");
assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype()));
assert_eq!(value, None);
let (column_id, field, value) = iter.next().unwrap();
assert_eq!(column_id, "2");
assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype()));
assert_eq!(value, Some(Value::Int64(10)));
assert!(iter.next().is_none());
}
}


@@ -0,0 +1,263 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod statistics;
mod temp_provider;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use common_telemetry::warn;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use object_store::ObjectStore;
use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::error::{
IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PuffinFinishSnafu,
PushIndexValueSnafu, Result,
};
use crate::metrics::{
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::creator::statistics::Statistics;
use crate::sst::index::creator::temp_provider::TempFileProvider;
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::{
INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB,
};
use crate::sst::location::{self, IntermediateLocation};
type ByteCount = usize;
type RowCount = usize;
/// Creates an inverted index for an SST file.
pub struct SstIndexCreator {
/// Directory of the region.
region_dir: String,
/// ID of the SST file.
sst_file_id: FileId,
/// The store to write index files.
store: InstrumentedStore,
/// The index creator.
index_creator: Box<dyn InvertedIndexCreator>,
/// The provider of intermediate files.
temp_file_provider: Arc<TempFileProvider>,
/// Codec for decoding primary keys.
codec: IndexValuesCodec,
/// Reusable buffer for encoding index values.
value_buf: Vec<u8>,
/// Statistics of index creation.
stats: Statistics,
/// Whether the index creation is aborted.
aborted: bool,
}
impl SstIndexCreator {
/// Creates a new `SstIndexCreator`.
/// Callers should ensure that the number of tag columns is greater than 0.
pub fn new(
region_dir: String,
sst_file_id: FileId,
metadata: &RegionMetadataRef,
index_store: ObjectStore,
intermediate_store: ObjectStore, // prefer to use a local store
memory_usage_threshold: Option<usize>,
row_group_size: NonZeroUsize,
) -> Self {
// `memory_usage_threshold` is the total memory usage threshold of the index creation,
// so we need to divide it by the number of columns
let memory_threshold = memory_usage_threshold.map(|threshold| {
(threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD)
});
let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&region_dir, &sst_file_id),
InstrumentedStore::new(intermediate_store),
));
let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold);
let index_creator = Box::new(SortIndexCreator::new(sorter, row_group_size));
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
Self {
region_dir,
sst_file_id,
store: InstrumentedStore::new(index_store),
codec,
index_creator,
temp_file_provider,
value_buf: vec![],
stats: Statistics::default(),
aborted: false,
}
}
/// Updates the index with a batch of rows.
/// Garbage will be cleaned up if the update fails.
pub async fn update(&mut self, batch: &Batch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if batch.is_empty() {
return Ok(());
}
if let Err(update_err) = self.do_update(batch).await {
// clean up garbage if the update failed
if let Err(err) = self.do_cleanup().await {
warn!(
err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}",
self.region_dir, self.sst_file_id,
);
}
return Err(update_err);
}
Ok(())
}
/// Finishes index creation and cleans up garbage.
/// Returns the number of rows and bytes written.
pub async fn finish(&mut self) -> Result<(RowCount, ByteCount)> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if self.stats.row_count() == 0 {
// no IO is performed, no garbage to clean up, just return
return Ok((0, 0));
}
let finish_res = self.do_finish().await;
// clean up garbage whether finish succeeded or not
if let Err(err) = self.do_cleanup().await {
warn!(
err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}",
self.region_dir, self.sst_file_id,
);
}
finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
}
/// Aborts index creation and cleans up garbage.
pub async fn abort(&mut self) -> Result<()> {
if self.aborted {
return Ok(());
}
self.aborted = true;
self.do_cleanup().await
}
async fn do_update(&mut self, batch: &Batch) -> Result<()> {
let mut guard = self.stats.record_update();
let n = batch.num_rows();
guard.inc_row_count(n);
for (column_id, field, value) in self.codec.decode(batch.primary_key())? {
if let Some(value) = value.as_ref() {
self.value_buf.clear();
IndexValueCodec::encode_value(value.as_value_ref(), field, &mut self.value_buf)?;
}
// non-null value -> Some(encoded_bytes), null value -> None
let value = value.is_some().then_some(self.value_buf.as_slice());
self.index_creator
.push_with_name_n(column_id, value, n)
.await
.context(PushIndexValueSnafu)?;
}
Ok(())
}
/// Data flow of finishing index:
///
/// ```text
/// (In Memory Buffer)
/// ┌──────┐
/// ┌─────────────┐ │ PIPE │
/// │ │ write index data │ │
/// │ IndexWriter ├──────────────────►│ tx │
/// │ │ │ │
/// └─────────────┘ │ │
/// ┌─────────────────┤ rx │
/// ┌─────────────┐ │ read as blob └──────┘
/// │ │ │
/// │ PuffinWriter├─┤
/// │ │ │ copy to file ┌──────┐
/// └─────────────┘ └────────────────►│ File │
/// └──────┘
/// ```
async fn do_finish(&mut self) -> Result<()> {
let mut guard = self.stats.record_finish();
let file_path = location::index_file_path(&self.region_dir, self.sst_file_id);
let file_writer = self
.store
.writer(
&file_path,
&INDEX_PUFFIN_WRITE_BYTES_TOTAL,
&INDEX_PUFFIN_WRITE_OP_TOTAL,
&INDEX_PUFFIN_FLUSH_OP_TOTAL,
)
.await?;
let mut puffin_writer = PuffinFileWriter::new(file_writer);
let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
let blob = Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
data: rx.compat(),
properties: HashMap::default(),
};
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
let (index_finish, puffin_add_blob) = futures::join!(
self.index_creator.finish(&mut index_writer),
puffin_writer.add_blob(blob)
);
index_finish.context(IndexFinishSnafu)?;
puffin_add_blob.context(PuffinAddBlobSnafu)?;
let byte_count = puffin_writer.finish().await.context(PuffinFinishSnafu)?;
guard.inc_byte_count(byte_count);
Ok(())
}
async fn do_cleanup(&mut self) -> Result<()> {
let _guard = self.stats.record_cleanup();
self.temp_file_provider.cleanup().await
}
}
#[cfg(test)]
mod tests {
// TODO(zhongzc): This PR has grown quite large, and the SstIndexCreator deserves
// a significant number of unit tests. These unit tests are substantial enough to
// make up a large PR on their own. I will bring them in with the next PR.
}
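
The pipe in `do_finish` is what keeps the index blob from being buffered wholesale: `tokio::io::duplex` provides a bounded in-memory channel, the inverted-index writer feeds one end while the puffin writer drains the other, and `futures::join!` drives both sides concurrently so neither blocks forever on the fixed-size buffer. A self-contained sketch of the same pattern (assuming only the `tokio`, `tokio-util`, and `futures` crates):

```rust
use futures::{AsyncReadExt, AsyncWriteExt};
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Bounded in-memory pipe; the writer only makes progress while the
    // reader runs concurrently, hence the join below.
    let (tx, rx) = duplex(8192);
    let (mut tx, mut rx) = (tx.compat_write(), rx.compat());

    let produce = async move {
        tx.write_all(b"index bytes").await?;
        tx.close().await // signal EOF so the reader can finish
    };
    let consume = async move {
        let mut buf = Vec::new();
        rx.read_to_end(&mut buf).await?;
        Ok::<_, std::io::Error>(buf)
    };

    let (sent, received) = futures::join!(produce, consume);
    sent?;
    assert_eq!(received?, b"index bytes");
    Ok(())
}
```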


@@ -0,0 +1,160 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::{Duration, Instant};
use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_ELAPSED, INDEX_CREATE_ROWS_TOTAL};
/// Stage of the index creation process.
enum Stage {
Update,
Finish,
Cleanup,
}
/// Statistics for index creation. Flushes metrics when dropped.
#[derive(Default)]
pub(crate) struct Statistics {
/// Accumulated elapsed time for the index update stage.
update_elapsed: Duration,
/// Accumulated elapsed time for the index finish stage.
finish_elapsed: Duration,
/// Accumulated elapsed time for the cleanup stage.
cleanup_elapsed: Duration,
/// Number of rows in the index.
row_count: usize,
/// Number of bytes in the index.
byte_count: usize,
}
impl Statistics {
/// Starts timing the update stage, returning a `TimerGuard` to automatically record duration.
#[must_use]
pub fn record_update(&mut self) -> TimerGuard<'_> {
TimerGuard::new(self, Stage::Update)
}
/// Starts timing the finish stage, returning a `TimerGuard` to automatically record duration.
#[must_use]
pub fn record_finish(&mut self) -> TimerGuard<'_> {
TimerGuard::new(self, Stage::Finish)
}
/// Starts timing the cleanup stage, returning a `TimerGuard` to automatically record duration.
#[must_use]
pub fn record_cleanup(&mut self) -> TimerGuard<'_> {
TimerGuard::new(self, Stage::Cleanup)
}
/// Returns row count.
pub fn row_count(&self) -> usize {
self.row_count
}
/// Returns byte count.
pub fn byte_count(&self) -> usize {
self.byte_count
}
}
impl Drop for Statistics {
fn drop(&mut self) {
INDEX_CREATE_ELAPSED
.with_label_values(&["update"])
.observe(self.update_elapsed.as_secs_f64());
INDEX_CREATE_ELAPSED
.with_label_values(&["finish"])
.observe(self.finish_elapsed.as_secs_f64());
INDEX_CREATE_ELAPSED
.with_label_values(&["cleanup"])
.observe(self.cleanup_elapsed.as_secs_f64());
INDEX_CREATE_ELAPSED.with_label_values(&["total"]).observe(
(self.update_elapsed + self.finish_elapsed + self.cleanup_elapsed).as_secs_f64(),
);
INDEX_CREATE_ROWS_TOTAL.inc_by(self.row_count as _);
INDEX_CREATE_BYTES_TOTAL.inc_by(self.byte_count as _);
}
}
/// `TimerGuard` is a RAII struct that ensures elapsed time
/// is recorded when it goes out of scope.
pub(crate) struct TimerGuard<'a> {
stats: &'a mut Statistics,
stage: Stage,
timer: Instant,
}
impl<'a> TimerGuard<'a> {
/// Creates a new `TimerGuard`.
fn new(stats: &'a mut Statistics, stage: Stage) -> Self {
Self {
stats,
stage,
timer: Instant::now(),
}
}
/// Increases the row count of the index creation statistics.
pub fn inc_row_count(&mut self, n: usize) {
self.stats.row_count += n;
}
/// Increases the byte count of the index creation statistics.
pub fn inc_byte_count(&mut self, n: usize) {
self.stats.byte_count += n;
}
}
impl Drop for TimerGuard<'_> {
fn drop(&mut self) {
match self.stage {
Stage::Update => {
self.stats.update_elapsed += self.timer.elapsed();
}
Stage::Finish => {
self.stats.finish_elapsed += self.timer.elapsed();
}
Stage::Cleanup => {
self.stats.cleanup_elapsed += self.timer.elapsed();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_statistics_basic() {
let mut stats = Statistics::default();
{
let mut guard = stats.record_update();
guard.inc_byte_count(100);
guard.inc_row_count(10);
}
{
let _guard = stats.record_finish();
}
{
let _guard = stats.record_cleanup();
}
assert_eq!(stats.row_count(), 10);
assert_eq!(stats.byte_count(), 100);
assert!(stats.update_elapsed > Duration::default());
assert!(stats.finish_elapsed > Duration::default());
assert!(stats.cleanup_elapsed > Duration::default());
}
}


@@ -0,0 +1,172 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::warn;
use futures::{AsyncRead, AsyncWrite};
use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use index::inverted_index::error as index_error;
use index::inverted_index::error::Result as IndexResult;
use snafu::ResultExt;
use crate::error::Result;
use crate::metrics::{
INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
};
use crate::sst::index::store::InstrumentedStore;
use crate::sst::location::IntermediateLocation;
/// `TempFileProvider` implements `ExternalTempFileProvider`.
/// It uses `InstrumentedStore` to create and read intermediate files.
pub(crate) struct TempFileProvider {
/// Provides the location of intermediate files.
location: IntermediateLocation,
/// Provides access to files in the object store.
store: InstrumentedStore,
}
#[async_trait]
impl ExternalTempFileProvider for TempFileProvider {
async fn create(
&self,
column_id: &str,
file_id: &str,
) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
let path = self.location.file_path(column_id, file_id);
let writer = self
.store
.writer(
&path,
&INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
&INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
&INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
)
.await
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
Ok(Box::new(writer))
}
async fn read_all(
&self,
column_id: &str,
) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
let column_path = self.location.column_path(column_id);
let entries = self
.store
.list(&column_path)
.await
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
let mut readers = Vec::with_capacity(entries.len());
for entry in entries {
if entry.metadata().is_dir() {
warn!("Unexpected entry in index creation dir: {:?}", entry.path());
continue;
}
let reader = self
.store
.reader(
entry.path(),
&INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
&INDEX_INTERMEDIATE_READ_OP_TOTAL,
&INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
)
.await
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
readers.push(Box::new(reader) as _);
}
Ok(readers)
}
}
impl TempFileProvider {
/// Creates a new `TempFileProvider`.
pub fn new(location: IntermediateLocation, store: InstrumentedStore) -> Self {
Self { location, store }
}
/// Removes all intermediate files.
pub async fn cleanup(&self) -> Result<()> {
self.store.remove_all(self.location.root_path()).await
}
}
#[cfg(test)]
mod tests {
use futures::{AsyncReadExt, AsyncWriteExt};
use object_store::services::Memory;
use object_store::ObjectStore;
use super::*;
use crate::sst::file::FileId;
#[tokio::test]
async fn test_temp_file_provider_basic() {
let location = IntermediateLocation::new("region_dir", &FileId::random());
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let store = InstrumentedStore::new(object_store);
let provider = TempFileProvider::new(location.clone(), store);
let column_name = "tag0";
let file_id = "0000000010";
let mut writer = provider.create(column_name, file_id).await.unwrap();
writer.write_all(b"hello").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let file_id = "0000000100";
let mut writer = provider.create(column_name, file_id).await.unwrap();
writer.write_all(b"world").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let column_name = "tag1";
let file_id = "0000000010";
let mut writer = provider.create(column_name, file_id).await.unwrap();
writer.write_all(b"foo").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let readers = provider.read_all("tag0").await.unwrap();
assert_eq!(readers.len(), 2);
for mut reader in readers {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert!(matches!(buf.as_slice(), b"hello" | b"world"));
}
let readers = provider.read_all("tag1").await.unwrap();
assert_eq!(readers.len(), 1);
let mut reader = readers.into_iter().next().unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"foo");
provider.cleanup().await.unwrap();
assert!(provider
.store
.list(location.root_path())
.await
.unwrap()
.is_empty());
}
}


@@ -75,6 +75,20 @@ impl InstrumentedStore {
flush_count,
))
}
/// Proxies to [`ObjectStore::list`].
pub async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
let list = self.object_store.list(path).await.context(OpenDalSnafu)?;
Ok(list)
}
/// Proxies to [`ObjectStore::remove_all`].
pub async fn remove_all(&self, path: &str) -> Result<()> {
self.object_store
.remove_all(path)
.await
.context(OpenDalSnafu)
}
}
/// A wrapper around [`AsyncRead`] that adds instrumentation for monitoring


@@ -13,6 +13,7 @@
// limitations under the License.
use object_store::util;
use uuid::Uuid;
use crate::sst::file::FileId;
@@ -29,8 +30,48 @@ pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String {
util::join_path(&dir, &sst_file_id.as_puffin())
}
/// `IntermediateLocation` produces paths for intermediate files
/// during external sorting.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
root_path: String,
}
impl IntermediateLocation {
/// Creates a new `IntermediateLocation`. Sets the root directory to
/// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/`, incorporating
/// a UUID to differentiate active sorting files from orphaned data left by
/// unexpected process termination.
/// Create a new `IntermediateLocation`. Set the root directory to
/// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/`, incorporating
/// uuid to differentiate active sorting files from orphaned data due to unexpected
/// process termination.
pub fn new(region_dir: &str, sst_file_id: &FileId) -> Self {
let uuid = Uuid::new_v4();
let child = format!("index/__intermediate/{sst_file_id}/{uuid}/");
Self {
root_path: util::join_path(region_dir, &child),
}
}
/// Returns the root directory of the intermediate files.
pub fn root_path(&self) -> &str {
&self.root_path
}
/// Returns the path of the directory for intermediate files associated with a column:
/// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/`
pub fn column_path(&self, column_id: &str) -> String {
util::join_path(&self.root_path, &format!("{column_id}/"))
}
/// Returns the path of the intermediate file with the given id for a column:
/// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im`
pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String {
util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im"))
}
}
#[cfg(test)]
mod tests {
use regex::Regex;
use super::*;
#[test]
@@ -50,4 +91,33 @@ mod tests {
format!("region_dir/index/{file_id}.puffin")
);
}
#[test]
fn test_intermediate_location() {
let sst_file_id = FileId::random();
let location = IntermediateLocation::new("region_dir", &sst_file_id);
let re = Regex::new(&format!(
"region_dir/index/__intermediate/{sst_file_id}/{}/",
r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
))
.unwrap();
assert!(re.is_match(location.root_path()));
let uuid = location.root_path().split('/').nth(4).unwrap();
let column_id = "1";
assert_eq!(
location.column_path(column_id),
format!("region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/")
);
let im_file_id = "000000000010";
assert_eq!(
location.file_path(column_id, im_file_id),
format!(
"region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im"
)
);
}
}