From db98484796f1f900dc772c8eaec762b83c04c73b Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 9 Jan 2024 17:24:16 +0800 Subject: [PATCH] feat(inverted_index): introduce SstIndexCreator (#3107) * feat(inverted_index): introduce SstIndexCreator Signed-off-by: Zhenchi * chore: tiny polish Signed-off-by: Zhenchi * feat: distinguish intermediate store and index store Signed-off-by: Zhenchi * chore: move comment as doc comment Signed-off-by: Zhenchi * refactor: column id as index name Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/index/src/inverted_index/create.rs | 4 +- src/index/src/inverted_index/create/sort.rs | 4 +- src/index/src/inverted_index/error.rs | 10 +- src/mito2/src/error.rs | 40 ++- src/mito2/src/metrics.rs | 43 ++- src/mito2/src/row_converter.rs | 2 +- src/mito2/src/sst/index.rs | 8 + src/mito2/src/sst/index/applier/builder.rs | 33 +-- .../src/sst/index/applier/builder/between.rs | 6 +- .../sst/index/applier/builder/comparison.rs | 6 +- .../src/sst/index/applier/builder/eq_list.rs | 12 +- .../src/sst/index/applier/builder/in_list.rs | 6 +- .../sst/index/applier/builder/regex_match.rs | 6 +- src/mito2/src/sst/index/codec.rs | 99 ++++++- src/mito2/src/sst/index/creator.rs | 263 ++++++++++++++++++ src/mito2/src/sst/index/creator/statistics.rs | 160 +++++++++++ .../src/sst/index/creator/temp_provider.rs | 172 ++++++++++++ src/mito2/src/sst/index/store.rs | 14 + src/mito2/src/sst/location.rs | 70 +++++ 19 files changed, 911 insertions(+), 47 deletions(-) create mode 100644 src/mito2/src/sst/index/creator.rs create mode 100644 src/mito2/src/sst/index/creator/statistics.rs create mode 100644 src/mito2/src/sst/index/creator/temp_provider.rs diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index db6bf1ad25..e17f987b5c 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod sort; -mod sort_create; +pub mod sort; +pub mod sort_create; use async_trait::async_trait; diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs index 53a70fc7b5..3690178356 100644 --- a/src/index/src/inverted_index/create/sort.rs +++ b/src/index/src/inverted_index/create/sort.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod external_provider; -mod external_sort; +pub mod external_provider; +pub mod external_sort; mod intermediate_rw; mod merge_stream; diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 6e5f39006e..c3c40dddea 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::io::Error as IoError; -use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -167,6 +167,12 @@ pub enum Error { total_row_count: usize, expected_row_count: usize, }, + + #[snafu(display("External error"))] + External { + source: BoxedError, + location: Location, + }, } impl ErrorExt for Error { @@ -197,6 +203,8 @@ impl ErrorExt for Error { | FstInsert { .. } | InconsistentRowCount { .. } | IndexNotFound { .. } => StatusCode::InvalidArguments, + + External { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 86ac9abfe7..98b5883310 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -445,6 +445,21 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to push index value"))] + PushIndexValue { + source: index::inverted_index::error::Error, + location: Location, + }, + + #[snafu(display("Failed to write index completely"))] + IndexFinish { + source: index::inverted_index::error::Error, + location: Location, + }, + + #[snafu(display("Operate on aborted index"))] + OperateAbortedIndex { location: Location }, + #[snafu(display("Failed to read puffin metadata"))] PuffinReadMetadata { source: puffin::error::Error, @@ -463,6 +478,18 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to write puffin completely"))] + PuffinFinish { + source: puffin::error::Error, + location: Location, + }, + + #[snafu(display("Failed to add blob to puffin file"))] + PuffinAddBlob { + source: puffin::error::Error, + location: Location, + }, + #[snafu(display("Failed to clean dir {dir}"))] CleanDir { dir: String, @@ -522,6 +549,7 @@ impl ErrorExt for Error { | RegionCorrupted { .. } | CreateDefault { .. } | InvalidParquet { .. } + | OperateAbortedIndex { .. } | PuffinBlobTypeNotFound { .. } | UnexpectedReplay { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, @@ -569,10 +597,14 @@ impl ErrorExt for Error { EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, ConvertValue { source, .. } => source.status_code(), - BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(), - PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => { - source.status_code() - } + BuildIndexApplier { source, .. } + | PushIndexValue { source, .. } + | ApplyIndex { source, .. } + | IndexFinish { source, .. } => source.status_code(), + PuffinReadMetadata { source, .. } + | PuffinReadBlob { source, .. } + | PuffinFinish { source, .. } + | PuffinAddBlob { source, .. } => source.status_code(), CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, StaleLogEntry { .. } => StatusCode::Unexpected, diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index fa7b199e57..eb84d6d595 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -150,7 +150,7 @@ lazy_static! { // Index metrics. /// Timer of index application. pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!( - "index_apply_elapsed", + "greptime_index_apply_elapsed", "index apply elapsed", ) .unwrap(); @@ -160,6 +160,26 @@ lazy_static! { "index apply memory usage", ) .unwrap(); + /// Timer of index creation. + pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_index_create_elapsed", + "index create elapsed", + &[STAGE_LABEL] + ) + .unwrap(); + /// Counter of rows indexed. + pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!( + "greptime_index_create_rows_total", + "index create rows total", + ) + .unwrap(); + /// Counter of created index bytes. + pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!( + "greptime_index_create_bytes_total", + "index create bytes total", + ) + .unwrap(); + /// Counter of r/w bytes on index related IO operations. pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( "greptime_index_io_bytes_total", @@ -170,6 +190,15 @@ lazy_static! { /// Counter of read bytes on puffin files. pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL .with_label_values(&["read", "puffin"]); + /// Counter of write bytes on puffin files. + pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL + .with_label_values(&["write", "puffin"]); + /// Counter of read bytes on intermediate files. + pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL + .with_label_values(&["read", "intermediate"]); + /// Counter of write bytes on intermediate files. + pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL + .with_label_values(&["write", "intermediate"]); /// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush. pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!( @@ -190,5 +219,17 @@ lazy_static! { /// Counter of flush operations on puffin files. pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL .with_label_values(&["flush", "puffin"]); + /// Counter of read operations on intermediate files. + pub static ref INDEX_INTERMEDIATE_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["read", "intermediate"]); + /// Counter of seek operations on intermediate files. + pub static ref INDEX_INTERMEDIATE_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["seek", "intermediate"]); + /// Counter of write operations on intermediate files. + pub static ref INDEX_INTERMEDIATE_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["write", "intermediate"]); + /// Counter of flush operations on intermediate files. + pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["flush", "intermediate"]); // ------- End of index metrics. } diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 33ef054335..29a8c8a63b 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -48,7 +48,7 @@ pub trait RowCodec { fn decode(&self, bytes: &[u8]) -> Result>; } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SortField { data_type: ConcreteDataType, } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 1f89612dee..7e6cefa992 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -16,6 +16,14 @@ pub mod applier; mod codec; +pub mod creator; mod store; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; + +// TODO(zhongzc): how to determine this value? +/// The minimum memory usage threshold for a column to qualify for external sorting during index creation. +const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192; + +/// The buffer size for the pipe used to send index data to the puffin blob. +const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 240846a044..6779e817fd 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -32,14 +32,13 @@ use index::inverted_index::search::predicate::Predicate; use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; +use store_api::storage::ColumnId; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; use crate::sst::index::applier::SstIndexApplier; use crate::sst::index::codec::IndexValueCodec; -type ColumnName = String; - /// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. pub struct SstIndexApplierBuilder<'a> { /// Directory of the region, required argument for constructing [`SstIndexApplier`]. @@ -52,7 +51,7 @@ pub struct SstIndexApplierBuilder<'a> { metadata: &'a RegionMetadata, /// Stores predicates during traversal on the Expr tree. - output: HashMap>, + output: HashMap>, } impl<'a> SstIndexApplierBuilder<'a> { @@ -81,7 +80,11 @@ impl<'a> SstIndexApplierBuilder<'a> { return Ok(None); } - let predicates = self.output.into_iter().collect(); + let predicates = self + .output + .into_iter() + .map(|(column_id, predicates)| (column_id.to_string(), predicates)) + .collect(); let applier = PredicatesIndexApplier::try_from(predicates); Ok(Some(SstIndexApplier::new( self.region_dir, @@ -122,18 +125,16 @@ impl<'a> SstIndexApplierBuilder<'a> { } /// Helper function to add a predicate to the output. - fn add_predicate(&mut self, column_name: &str, predicate: Predicate) { - match self.output.get_mut(column_name) { - Some(predicates) => predicates.push(predicate), - None => { - self.output.insert(column_name.to_string(), vec![predicate]); - } - } + fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) { + self.output.entry(column_id).or_default().push(predicate); } - /// Helper function to get the column type of a tag column. + /// Helper function to get the column id and the column type of a tag column. /// Returns `None` if the column is not a tag column. - fn tag_column_type(&self, column_name: &str) -> Result> { + fn tag_column_id_and_type( + &self, + column_name: &str, + ) -> Result> { let column = self .metadata .column_by_name(column_name) @@ -142,7 +143,7 @@ impl<'a> SstIndexApplierBuilder<'a> { })?; Ok((column.semantic_type == SemanticType::Tag) - .then(|| column.column_schema.data_type.clone())) + .then(|| (column.column_id, column.column_schema.data_type.clone()))) } /// Helper function to get a non-null literal. @@ -303,7 +304,7 @@ mod tests { }); builder.traverse_and_collect(&expr); - let predicates = builder.output.get("a").unwrap(); + let predicates = builder.output.get(&1).unwrap(); assert_eq!(predicates.len(), 1); assert_eq!( predicates[0], @@ -311,7 +312,7 @@ mod tests { pattern: "bar".to_string() }) ); - let predicates = builder.output.get("b").unwrap(); + let predicates = builder.output.get(&2).unwrap(); assert_eq!(predicates.len(), 1); assert_eq!( predicates[0], diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs index 50ae7073b2..5fed21dcc2 100644 --- a/src/mito2/src/sst/index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -28,7 +28,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let Some(column_name) = Self::column_name(&between.expr) else { return Ok(()); }; - let Some(data_type) = self.tag_column_type(column_name)? else { + let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else { return Ok(()); }; let Some(low) = Self::nonnull_lit(&between.low) else { @@ -51,7 +51,7 @@ impl<'a> SstIndexApplierBuilder<'a> { }, }); - self.add_predicate(column_name, predicate); + self.add_predicate(column_id, predicate); Ok(()) } } @@ -80,7 +80,7 @@ mod tests { builder.collect_between(&between).unwrap(); - let predicates = builder.output.get("a").unwrap(); + let predicates = builder.output.get(&1).unwrap(); assert_eq!(predicates.len(), 1); assert_eq!( predicates[0], diff --git a/src/mito2/src/sst/index/applier/builder/comparison.rs b/src/mito2/src/sst/index/applier/builder/comparison.rs index e132c1c928..31381c7871 100644 --- a/src/mito2/src/sst/index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/applier/builder/comparison.rs @@ -114,7 +114,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let Some(lit) = Self::nonnull_lit(literal) else { return Ok(()); }; - let Some(data_type) = self.tag_column_type(column_name)? else { + let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else { return Ok(()); }; @@ -122,7 +122,7 @@ impl<'a> SstIndexApplierBuilder<'a> { range: range_builder(Self::encode_lit(lit, data_type)?), }); - self.add_predicate(column_name, predicate); + self.add_predicate(column_id, predicate); Ok(()) } } @@ -230,7 +230,7 @@ mod tests { builder.collect_comparison_expr(left, op, right).unwrap(); } - let predicates = builder.output.get("a").unwrap(); + let predicates = builder.output.get(&1).unwrap(); assert_eq!(predicates.len(), cases.len()); for ((_, expected), actual) in cases.into_iter().zip(predicates) { assert_eq!( diff --git a/src/mito2/src/sst/index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/applier/builder/eq_list.rs index 07e74e012d..6e28920e38 100644 --- a/src/mito2/src/sst/index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/applier/builder/eq_list.rs @@ -31,14 +31,14 @@ impl<'a> SstIndexApplierBuilder<'a> { let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else { return Ok(()); }; - let Some(data_type) = self.tag_column_type(column_name)? else { + let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else { return Ok(()); }; let predicate = Predicate::InList(InListPredicate { list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]), }); - self.add_predicate(column_name, predicate); + self.add_predicate(column_id, predicate); Ok(()) } @@ -59,7 +59,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else { return Ok(()); }; - let Some(data_type) = self.tag_column_type(column_name)? else { + let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else { return Ok(()); }; @@ -68,7 +68,7 @@ impl<'a> SstIndexApplierBuilder<'a> { if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? { let predicate = Predicate::InList(InListPredicate { list: inlist }); - self.add_predicate(column_name, predicate); + self.add_predicate(column_id, predicate); } Ok(()) @@ -142,7 +142,7 @@ mod tests { .collect_eq(&string_lit("bar"), &tag_column()) .unwrap(); - let predicates = builder.output.get("a").unwrap(); + let predicates = builder.output.get(&1).unwrap(); assert_eq!(predicates.len(), 2); assert_eq!( predicates[0], @@ -227,7 +227,7 @@ mod tests { builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap(); - let predicates = builder.output.get("a").unwrap(); + let predicates = builder.output.get(&1).unwrap(); assert_eq!(predicates.len(), 1); assert_eq!( predicates[0], diff --git a/src/mito2/src/sst/index/applier/builder/in_list.rs b/src/mito2/src/sst/index/applier/builder/in_list.rs index cfb2b8738f..0d3081f1c0 100644 --- a/src/mito2/src/sst/index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/applier/builder/in_list.rs @@ -29,7 +29,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let Some(column_name) = Self::column_name(&inlist.expr) else { return Ok(()); }; - let Some(data_type) = self.tag_column_type(column_name)? else { + let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else { return Ok(()); }; @@ -46,7 +46,7 @@ impl<'a> SstIndexApplierBuilder<'a> { .insert(Self::encode_lit(lit, data_type.clone())?); } - self.add_predicate(column_name, Predicate::InList(predicate)); + self.add_predicate(column_id, Predicate::InList(predicate)); Ok(()) } } @@ -74,7 +74,7 @@ mod tests { builder.collect_inlist(&in_list).unwrap(); - let predicates = builder.output.get("a").unwrap(); + let predicates = builder.output.get(&1).unwrap(); assert_eq!(predicates.len(), 1); assert_eq!( predicates[0], diff --git a/src/mito2/src/sst/index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/applier/builder/regex_match.rs index 1aa1cd9d95..48646334e7 100644 --- a/src/mito2/src/sst/index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/applier/builder/regex_match.rs @@ -25,7 +25,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let Some(column_name) = Self::column_name(column) else { return Ok(()); }; - let Some(data_type) = self.tag_column_type(column_name)? else { + let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else { return Ok(()); }; if !data_type.is_string() { @@ -38,7 +38,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let predicate = Predicate::RegexMatch(RegexMatchPredicate { pattern: pattern.clone(), }); - self.add_predicate(column_name, predicate); + self.add_predicate(column_id, predicate); Ok(()) } } @@ -62,7 +62,7 @@ mod tests { .collect_regex_match(&tag_column(), &string_lit("abc")) .unwrap(); - let predicates = builder.output.get("a").unwrap(); + let predicates = builder.output.get(&1).unwrap(); assert_eq!(predicates.len(), 1); assert_eq!( predicates[0], diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs index ada5ac07cb..5962c185f4 100644 --- a/src/mito2/src/sst/index/codec.rs +++ b/src/mito2/src/sst/index/codec.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use datatypes::value::ValueRef; +use datatypes::value::{Value, ValueRef}; use memcomparable::Serializer; +use store_api::metadata::ColumnMetadata; use crate::error::Result; -use crate::row_converter::SortField; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Encodes index values according to their data types for sorting and storage use. pub struct IndexValueCodec; @@ -36,9 +37,65 @@ impl IndexValueCodec { } } +type ColumnId = String; + +/// Decodes primary key values into their corresponding column ids, data types and values. +pub struct IndexValuesCodec { + /// The tag column ids. + column_ids: Vec, + /// The data types of tag columns. + fields: Vec, + /// The decoder for the primary key. + decoder: McmpRowCodec, +} + +impl IndexValuesCodec { + /// Creates a new `IndexValuesCodec` from a list of `ColumnMetadata` of tag columns. + pub fn from_tag_columns<'a>(tag_columns: impl Iterator) -> Self { + let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns + .map(|column| { + ( + column.column_id.to_string(), + SortField::new(column.column_schema.data_type.clone()), + ) + }) + .unzip(); + + let decoder = McmpRowCodec::new(fields.clone()); + Self { + column_ids, + fields, + decoder, + } + } + + /// Decodes a primary key into its corresponding column ids, data types and values. + pub fn decode( + &self, + primary_key: &[u8], + ) -> Result)>> { + let values = self.decoder.decode(primary_key)?; + + let iter = values + .into_iter() + .zip(&self.column_ids) + .zip(&self.fields) + .map(|((value, column_id), encoder)| { + if value.is_null() { + (column_id, encoder, None) + } else { + (column_id, encoder, Some(value)) + } + }); + + Ok(iter) + } +} + #[cfg(test)] mod tests { use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; use super::*; use crate::error::Error; @@ -62,4 +119,42 @@ mod tests { let res = IndexValueCodec::encode_value(value, &field, &mut buffer); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); } + + #[test] + fn test_decode_primary_key_basic() { + let tag_columns = vec![ + ColumnMetadata { + column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true), + semantic_type: api::v1::SemanticType::Tag, + column_id: 1, + }, + ColumnMetadata { + column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false), + semantic_type: api::v1::SemanticType::Tag, + column_id: 2, + }, + ]; + + let primary_key = McmpRowCodec::new(vec![ + SortField::new(ConcreteDataType::string_datatype()), + SortField::new(ConcreteDataType::int64_datatype()), + ]) + .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter()) + .unwrap(); + + let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter()); + let mut iter = codec.decode(&primary_key).unwrap(); + + let (column_id, field, value) = iter.next().unwrap(); + assert_eq!(column_id, "1"); + assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype())); + assert_eq!(value, None); + + let (column_id, field, value) = iter.next().unwrap(); + assert_eq!(column_id, "2"); + assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype())); + assert_eq!(value, Some(Value::Int64(10))); + + assert!(iter.next().is_none()); + } } diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs new file mode 100644 index 0000000000..b88b47c139 --- /dev/null +++ b/src/mito2/src/sst/index/creator.rs @@ -0,0 +1,263 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod statistics; +mod temp_provider; + +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::sync::Arc; + +use common_telemetry::warn; +use index::inverted_index::create::sort::external_sort::ExternalSorter; +use index::inverted_index::create::sort_create::SortIndexCreator; +use index::inverted_index::create::InvertedIndexCreator; +use index::inverted_index::format::writer::InvertedIndexBlobWriter; +use object_store::ObjectStore; +use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter}; +use snafu::{ensure, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use tokio::io::duplex; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +use crate::error::{ + IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PuffinFinishSnafu, + PushIndexValueSnafu, Result, +}; +use crate::metrics::{ + INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, +}; +use crate::read::Batch; +use crate::sst::file::FileId; +use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::creator::statistics::Statistics; +use crate::sst::index::creator::temp_provider::TempFileProvider; +use crate::sst::index::store::InstrumentedStore; +use crate::sst::index::{ + INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB, +}; +use crate::sst::location::{self, IntermediateLocation}; + +type ByteCount = usize; +type RowCount = usize; + +/// Creates SST index. +pub struct SstIndexCreator { + /// Directory of the region. + region_dir: String, + /// ID of the SST file. + sst_file_id: FileId, + + /// The store to write index files. + store: InstrumentedStore, + /// The index creator. + index_creator: Box, + /// The provider of intermediate files. + temp_file_provider: Arc, + + /// Codec for decoding primary keys. + codec: IndexValuesCodec, + /// Reusable buffer for encoding index values. + value_buf: Vec, + + /// Statistics of index creation. + stats: Statistics, + /// Whether the index creation is aborted. + aborted: bool, +} + +impl SstIndexCreator { + /// Creates a new `SstIndexCreator`. + /// Should ensure that the number of tag columns is greater than 0. + pub fn new( + region_dir: String, + sst_file_id: FileId, + metadata: &RegionMetadataRef, + index_store: ObjectStore, + intermediate_store: ObjectStore, // prefer to use local store + memory_usage_threshold: Option, + row_group_size: NonZeroUsize, + ) -> Self { + // `memory_usage_threshold` is the total memory usage threshold of the index creation, + // so we need to divide it by the number of columns + let memory_threshold = memory_usage_threshold.map(|threshold| { + (threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD) + }); + let temp_file_provider = Arc::new(TempFileProvider::new( + IntermediateLocation::new(®ion_dir, &sst_file_id), + InstrumentedStore::new(intermediate_store), + )); + let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold); + let index_creator = Box::new(SortIndexCreator::new(sorter, row_group_size)); + + let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); + Self { + region_dir, + sst_file_id, + store: InstrumentedStore::new(index_store), + codec, + index_creator, + temp_file_provider, + + value_buf: vec![], + + stats: Statistics::default(), + aborted: false, + } + } + + /// Updates index with a batch of rows. + /// Garbage will be cleaned up if failed to update. + pub async fn update(&mut self, batch: &Batch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if batch.is_empty() { + return Ok(()); + } + + if let Err(update_err) = self.do_update(batch).await { + // clean up garbage if failed to update + if let Err(err) = self.do_cleanup().await { + warn!( + err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}", + self.region_dir, self.sst_file_id, + ); + } + return Err(update_err); + } + + Ok(()) + } + + /// Finishes index creation and cleans up garbage. + /// Returns the number of rows and bytes written. + pub async fn finish(&mut self) -> Result<(RowCount, ByteCount)> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.stats.row_count() == 0 { + // no IO is performed, no garbage to clean up, just return + return Ok((0, 0)); + } + + let finish_res = self.do_finish().await; + // clean up garbage no matter finish successfully or not + if let Err(err) = self.do_cleanup().await { + warn!( + err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}", + self.region_dir, self.sst_file_id, + ); + } + + finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count())) + } + + /// Aborts index creation and clean up garbage. + pub async fn abort(&mut self) -> Result<()> { + if self.aborted { + return Ok(()); + } + self.aborted = true; + + self.do_cleanup().await + } + + async fn do_update(&mut self, batch: &Batch) -> Result<()> { + let mut guard = self.stats.record_update(); + + let n = batch.num_rows(); + guard.inc_row_count(n); + + for (column_id, field, value) in self.codec.decode(batch.primary_key())? { + if let Some(value) = value.as_ref() { + self.value_buf.clear(); + IndexValueCodec::encode_value(value.as_value_ref(), field, &mut self.value_buf)?; + } + + // non-null value -> Some(encoded_bytes), null value -> None + let value = value.is_some().then_some(self.value_buf.as_slice()); + self.index_creator + .push_with_name_n(column_id, value, n) + .await + .context(PushIndexValueSnafu)?; + } + + Ok(()) + } + + /// Data flow of finishing index: + /// + /// ```text + /// (In Memory Buffer) + /// ┌──────┐ + /// ┌─────────────┐ │ PIPE │ + /// │ │ write index data │ │ + /// │ IndexWriter ├──────────────────►│ tx │ + /// │ │ │ │ + /// └─────────────┘ │ │ + /// ┌─────────────────┤ rx │ + /// ┌─────────────┐ │ read as blob └──────┘ + /// │ │ │ + /// │ PuffinWriter├─┤ + /// │ │ │ copy to file ┌──────┐ + /// └─────────────┘ └────────────────►│ File │ + /// └──────┘ + /// ``` + async fn do_finish(&mut self) -> Result<()> { + let mut guard = self.stats.record_finish(); + + let file_path = location::index_file_path(&self.region_dir, self.sst_file_id); + let file_writer = self + .store + .writer( + &file_path, + &INDEX_PUFFIN_WRITE_BYTES_TOTAL, + &INDEX_PUFFIN_WRITE_OP_TOTAL, + &INDEX_PUFFIN_FLUSH_OP_TOTAL, + ) + .await?; + let mut puffin_writer = PuffinFileWriter::new(file_writer); + + let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); + let blob = Blob { + blob_type: INDEX_BLOB_TYPE.to_string(), + data: rx.compat(), + properties: HashMap::default(), + }; + let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write()); + + let (index_finish, puffin_add_blob) = futures::join!( + self.index_creator.finish(&mut index_writer), + puffin_writer.add_blob(blob) + ); + index_finish.context(IndexFinishSnafu)?; + puffin_add_blob.context(PuffinAddBlobSnafu)?; + + let byte_count = puffin_writer.finish().await.context(PuffinFinishSnafu)?; + guard.inc_byte_count(byte_count); + Ok(()) + } + + async fn do_cleanup(&mut self) -> Result<()> { + let _guard = self.stats.record_cleanup(); + + self.temp_file_provider.cleanup().await + } +} + +#[cfg(test)] +mod tests { + // TODO(zhongzc): This PR has grown quite large, and the SstIndexCreator deserves + // a significant number of unit tests. These unit tests are substantial enough to + // make up a large PR on their own. I will bring them in with the next PR. +} diff --git a/src/mito2/src/sst/index/creator/statistics.rs b/src/mito2/src/sst/index/creator/statistics.rs new file mode 100644 index 0000000000..e4f30db7a9 --- /dev/null +++ b/src/mito2/src/sst/index/creator/statistics.rs @@ -0,0 +1,160 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::{Duration, Instant}; + +use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_ELAPSED, INDEX_CREATE_ROWS_TOTAL}; + +/// Stage of the index creation process. +enum Stage { + Update, + Finish, + Cleanup, +} + +/// Statistics for index creation. Flush metrics when dropped. +#[derive(Default)] +pub(crate) struct Statistics { + /// Accumulated elapsed time for the index update stage. + update_elapsed: Duration, + /// Accumulated elapsed time for the index finish stage. + finish_elapsed: Duration, + /// Accumulated elapsed time for the cleanup stage. + cleanup_eplased: Duration, + /// Number of rows in the index. + row_count: usize, + /// Number of bytes in the index. + byte_count: usize, +} + +impl Statistics { + /// Starts timing the update stage, returning a `TimerGuard` to automatically record duration. + #[must_use] + pub fn record_update(&mut self) -> TimerGuard<'_> { + TimerGuard::new(self, Stage::Update) + } + + /// Starts timing the finish stage, returning a `TimerGuard` to automatically record duration. + #[must_use] + pub fn record_finish(&mut self) -> TimerGuard<'_> { + TimerGuard::new(self, Stage::Finish) + } + + /// Starts timing the cleanup stage, returning a `TimerGuard` to automatically record duration. + #[must_use] + pub fn record_cleanup(&mut self) -> TimerGuard<'_> { + TimerGuard::new(self, Stage::Cleanup) + } + + /// Returns row count. + pub fn row_count(&self) -> usize { + self.row_count + } + + /// Returns byte count. + pub fn byte_count(&self) -> usize { + self.byte_count + } +} + +impl Drop for Statistics { + fn drop(&mut self) { + INDEX_CREATE_ELAPSED + .with_label_values(&["update"]) + .observe(self.update_elapsed.as_secs_f64()); + INDEX_CREATE_ELAPSED + .with_label_values(&["finish"]) + .observe(self.finish_elapsed.as_secs_f64()); + INDEX_CREATE_ELAPSED + .with_label_values(&["cleanup"]) + .observe(self.cleanup_eplased.as_secs_f64()); + INDEX_CREATE_ELAPSED.with_label_values(&["total"]).observe( + (self.update_elapsed + self.finish_elapsed + self.cleanup_eplased).as_secs_f64(), + ); + + INDEX_CREATE_ROWS_TOTAL.inc_by(self.row_count as _); + INDEX_CREATE_BYTES_TOTAL.inc_by(self.byte_count as _); + } +} + +/// `TimerGuard` is a RAII struct that ensures elapsed time +/// is recorded when it goes out of scope. +pub(crate) struct TimerGuard<'a> { + stats: &'a mut Statistics, + stage: Stage, + timer: Instant, +} + +impl<'a> TimerGuard<'a> { + /// Creates a new `TimerGuard`, + fn new(stats: &'a mut Statistics, stage: Stage) -> Self { + Self { + stats, + stage, + timer: Instant::now(), + } + } + + /// Increases the row count of the index creation statistics. + pub fn inc_row_count(&mut self, n: usize) { + self.stats.row_count += n; + } + + /// Increases the byte count of the index creation statistics. + pub fn inc_byte_count(&mut self, n: usize) { + self.stats.byte_count += n; + } +} + +impl Drop for TimerGuard<'_> { + fn drop(&mut self) { + match self.stage { + Stage::Update => { + self.stats.update_elapsed += self.timer.elapsed(); + } + Stage::Finish => { + self.stats.finish_elapsed += self.timer.elapsed(); + } + Stage::Cleanup => { + self.stats.cleanup_eplased += self.timer.elapsed(); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_statistics_basic() { + let mut stats = Statistics::default(); + { + let mut guard = stats.record_update(); + guard.inc_byte_count(100); + guard.inc_row_count(10); + } + { + let _guard = stats.record_finish(); + } + { + let _guard = stats.record_cleanup(); + } + assert_eq!(stats.row_count(), 10); + assert_eq!(stats.byte_count(), 100); + assert!(stats.update_elapsed > Duration::default()); + assert!(stats.finish_elapsed > Duration::default()); + assert!(stats.cleanup_eplased > Duration::default()); + } +} diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs b/src/mito2/src/sst/index/creator/temp_provider.rs new file mode 100644 index 0000000000..d8dfff3d7d --- /dev/null +++ b/src/mito2/src/sst/index/creator/temp_provider.rs @@ -0,0 +1,172 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use common_error::ext::BoxedError; +use common_telemetry::warn; +use futures::{AsyncRead, AsyncWrite}; +use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider; +use index::inverted_index::error as index_error; +use index::inverted_index::error::Result as IndexResult; +use snafu::ResultExt; + +use crate::error::Result; +use crate::metrics::{ + INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL, + INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL, + INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL, +}; +use crate::sst::index::store::InstrumentedStore; +use crate::sst::location::IntermediateLocation; + +/// `TempFileProvider` implements `ExternalTempFileProvider`. +/// It uses `InstrumentedStore` to create and read intermediate files. +pub(crate) struct TempFileProvider { + /// Provides the location of intermediate files. + location: IntermediateLocation, + /// Provides access to files in the object store. + store: InstrumentedStore, +} + +#[async_trait] +impl ExternalTempFileProvider for TempFileProvider { + async fn create( + &self, + column_id: &str, + file_id: &str, + ) -> IndexResult> { + let path = self.location.file_path(column_id, file_id); + let writer = self + .store + .writer( + &path, + &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, + &INDEX_INTERMEDIATE_WRITE_OP_TOTAL, + &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + Ok(Box::new(writer)) + } + + async fn read_all( + &self, + column_id: &str, + ) -> IndexResult>> { + let column_path = self.location.column_path(column_id); + let entries = self + .store + .list(&column_path) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + let mut readers = Vec::with_capacity(entries.len()); + + for entry in entries { + if entry.metadata().is_dir() { + warn!("Unexpected entry in index creation dir: {:?}", entry.path()); + continue; + } + + let reader = self + .store + .reader( + entry.path(), + &INDEX_INTERMEDIATE_READ_BYTES_TOTAL, + &INDEX_INTERMEDIATE_READ_OP_TOTAL, + &INDEX_INTERMEDIATE_SEEK_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + readers.push(Box::new(reader) as _); + } + + Ok(readers) + } +} + +impl TempFileProvider { + /// Creates a new `TempFileProvider`. + pub fn new(location: IntermediateLocation, store: InstrumentedStore) -> Self { + Self { location, store } + } + + /// Removes all intermediate files. + pub async fn cleanup(&self) -> Result<()> { + self.store.remove_all(self.location.root_path()).await + } +} + +#[cfg(test)] +mod tests { + use futures::{AsyncReadExt, AsyncWriteExt}; + use object_store::services::Memory; + use object_store::ObjectStore; + + use super::*; + use crate::sst::file::FileId; + + #[tokio::test] + async fn test_temp_file_provider_basic() { + let location = IntermediateLocation::new("region_dir", &FileId::random()); + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let store = InstrumentedStore::new(object_store); + let provider = TempFileProvider::new(location.clone(), store); + + let column_name = "tag0"; + let file_id = "0000000010"; + let mut writer = provider.create(column_name, file_id).await.unwrap(); + writer.write_all(b"hello").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let file_id = "0000000100"; + let mut writer = provider.create(column_name, file_id).await.unwrap(); + writer.write_all(b"world").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let column_name = "tag1"; + let file_id = "0000000010"; + let mut writer = provider.create(column_name, file_id).await.unwrap(); + writer.write_all(b"foo").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let readers = provider.read_all("tag0").await.unwrap(); + assert_eq!(readers.len(), 2); + for mut reader in readers { + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert!(matches!(buf.as_slice(), b"hello" | b"world")); + } + let readers = provider.read_all("tag1").await.unwrap(); + assert_eq!(readers.len(), 1); + let mut reader = readers.into_iter().next().unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"foo"); + + provider.cleanup().await.unwrap(); + + assert!(provider + .store + .list(location.root_path()) + .await + .unwrap() + .is_empty()); + } +} diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index dbdbcbf780..a5e66c708e 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -75,6 +75,20 @@ impl InstrumentedStore { flush_count, )) } + + /// Proxies to [`ObjectStore::list`]. + pub async fn list(&self, path: &str) -> Result> { + let list = self.object_store.list(path).await.context(OpenDalSnafu)?; + Ok(list) + } + + /// Proxies to [`ObjectStore::remove_all`]. + pub async fn remove_all(&self, path: &str) -> Result<()> { + self.object_store + .remove_all(path) + .await + .context(OpenDalSnafu) + } } /// A wrapper around [`AsyncRead`] that adds instrumentation for monitoring diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs index 179e9159c9..d3b69d9c73 100644 --- a/src/mito2/src/sst/location.rs +++ b/src/mito2/src/sst/location.rs @@ -13,6 +13,7 @@ // limitations under the License. use object_store::util; +use uuid::Uuid; use crate::sst::file::FileId; @@ -29,8 +30,48 @@ pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String { util::join_path(&dir, &sst_file_id.as_puffin()) } +/// `IntermediateLocation` produces paths for intermediate files +/// during external sorting. +#[derive(Debug, Clone)] +pub struct IntermediateLocation { + root_path: String, +} + +impl IntermediateLocation { + /// Create a new `IntermediateLocation`. Set the root directory to + /// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/`, incorporating + /// uuid to differentiate active sorting files from orphaned data due to unexpected + /// process termination. + pub fn new(region_dir: &str, sst_file_id: &FileId) -> Self { + let uuid = Uuid::new_v4(); + let child = format!("index/__intermediate/{sst_file_id}/{uuid}/"); + Self { + root_path: util::join_path(region_dir, &child), + } + } + + /// Returns the root directory of the intermediate files + pub fn root_path(&self) -> &str { + &self.root_path + } + + /// Returns the path of the directory for intermediate files associated with a column: + /// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/` + pub fn column_path(&self, column_id: &str) -> String { + util::join_path(&self.root_path, &format!("{column_id}/")) + } + + /// Returns the path of the intermediate file with the given id for a column: + /// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im` + pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String { + util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im")) + } +} + #[cfg(test)] mod tests { + use regex::Regex; + use super::*; #[test] @@ -50,4 +91,33 @@ mod tests { format!("region_dir/index/{file_id}.puffin") ); } + + #[test] + fn test_intermediate_location() { + let sst_file_id = FileId::random(); + let location = IntermediateLocation::new("region_dir", &sst_file_id); + + let re = Regex::new(&format!( + "region_dir/index/__intermediate/{sst_file_id}/{}/", + r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}" + )) + .unwrap(); + assert!(re.is_match(location.root_path())); + + let uuid = location.root_path().split('/').nth(4).unwrap(); + + let column_id = "1"; + assert_eq!( + location.column_path(column_id), + format!("region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/") + ); + + let im_file_id = "000000000010"; + assert_eq!( + location.file_path(column_id, im_file_id), + format!( + "region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im" + ) + ); + } }