From 4015dd80752e1e6aaa3d7cacc3203cb67ed9be6d Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Wed, 11 Jan 2023 21:16:07 +0800
Subject: [PATCH] feat: record sst file time range in FileMeta (#860)

* feat: record sst file time range in FileMeta

* fix: clippy

* chore: add some log and doc
---
 src/storage/src/error.rs               |  42 ++------
 src/storage/src/flush.rs               |  10 +-
 src/storage/src/manifest/test_utils.rs |   4 +
 src/storage/src/sst.rs                 |  17 +++-
 src/storage/src/sst/parquet.rs         | 128 +++++++++++++++++++++++--
 5 files changed, 149 insertions(+), 52 deletions(-)

diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs
index 2219c0bf00..fbb121eff3 100644
--- a/src/storage/src/error.rs
+++ b/src/storage/src/error.rs
@@ -43,9 +43,6 @@ pub enum Error {
         backtrace: Backtrace,
     },

-    #[snafu(display("Missing timestamp in write batch"))]
-    BatchMissingTimestamp { backtrace: Backtrace },
-
     #[snafu(display("Failed to write columns, source: {}", source))]
     FlushIo {
         source: object_store::Error,
@@ -184,13 +181,6 @@ pub enum Error {
         backtrace: Backtrace,
     },

-    #[snafu(display("IO failed while reading Parquet file: {}, source: {}", file, source))]
-    ReadParquetIo {
-        file: String,
-        source: std::io::Error,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display("Region is under {} state, cannot proceed operation", state))]
     InvalidRegionState {
         state: &'static str,
@@ -222,12 +212,6 @@ pub enum Error {
         backtrace: Backtrace,
     },

-    #[snafu(display("Region version not found in manifest, the region: {}", region_name))]
-    VersionNotFound {
-        region_name: String,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display(
         "Sequence of region should increase monotonically (should be {} < {})",
         prev,
@@ -317,18 +301,6 @@ pub enum Error {
         backtrace: Backtrace,
     },

-    #[snafu(display("Timestamp column type illegal, data type: {:?}", data_type))]
-    IllegalTimestampColumnType { data_type: ConcreteDataType },
-
-    #[snafu(display(
-        "Failed to convert between ColumnSchema and ColumnMetadata, source: {}",
-        source
-    ))]
-    ConvertColumnSchema {
-        #[snafu(backtrace)]
-        source: MetadataError,
-    },
-
     #[snafu(display("Incompatible schema to read, reason: {}", reason))]
     CompatRead {
         reason: String,
@@ -437,6 +409,9 @@ pub enum Error {

     #[snafu(display("More columns than expected in the request"))]
     MoreColumnThanExpected { backtrace: Backtrace },
+
+    #[snafu(display("Failed to decode parquet file time range, msg: {}", msg))]
+    DecodeParquetTimeRange { msg: String, backtrace: Backtrace },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -448,12 +423,10 @@ impl ErrorExt for Error {
         match self {
             InvalidScanIndex { .. }
             | BatchMissingColumn { .. }
-            | BatchMissingTimestamp { .. }
             | InvalidProjection { .. }
             | BuildBatch { .. }
             | NotInSchemaToCompat { .. }
             | WriteToOldVersion { .. }
-            | IllegalTimestampColumnType { .. }
             | CreateRecordBatch { .. }
             | RequestTooLarge { .. }
             | TypeMismatch { .. }
@@ -469,7 +442,6 @@ impl ErrorExt for Error {
             | DecodeMetaActionList { .. }
             | Readline { .. }
             | WalDataCorrupted { .. }
-            | VersionNotFound { .. }
             | SequenceNotMonotonic { .. }
             | ConvertStoreSchema { .. }
             | InvalidRawRegion { .. }
@@ -496,19 +468,19 @@ impl ErrorExt for Error {
             | ManifestProtocolForbidRead { .. }
             | ManifestProtocolForbidWrite { .. }
             | ReadParquet { .. }
-            | ReadParquetIo { .. }
             | InvalidRegionState { .. }
             | ReadWal { .. } => StatusCode::StorageUnavailable,

             UnknownColumn { .. } => StatusCode::TableColumnNotFound,

-            InvalidAlterRequest { source, .. }
-            | InvalidRegionDesc { source, .. }
-            | ConvertColumnSchema { source, .. } => source.status_code(),
+            InvalidAlterRequest { source, .. } | InvalidRegionDesc { source, .. } => {
+                source.status_code()
+            }
             PushBatch { source, .. } => source.status_code(),
             CreateDefault { source, .. } => source.status_code(),
             ConvertChunk { source, .. } => source.status_code(),
             MarkWalObsolete { source, .. } => source.status_code(),
+            DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
         }
     }

diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs
index cea35e5525..f952591eb6 100644
--- a/src/storage/src/flush.rs
+++ b/src/storage/src/flush.rs
@@ -27,7 +27,7 @@ use crate::manifest::action::*;
 use crate::manifest::region::RegionManifest;
 use crate::memtable::{IterContext, MemtableId, MemtableRef};
 use crate::region::{RegionWriterRef, SharedDataRef};
-use crate::sst::{AccessLayerRef, FileMeta, WriteOptions};
+use crate::sst::{AccessLayerRef, FileMeta, SstInfo, WriteOptions};
 use crate::wal::Wal;

 /// Default write buffer size (32M).
@@ -185,12 +185,18 @@ impl FlushJob {
             // TODO(hl): Check if random file name already exists in meta.
             let iter = m.iter(&iter_ctx)?;
             futures.push(async move {
-                self.sst_layer
+                let SstInfo {
+                    start_timestamp,
+                    end_timestamp,
+                } = self
+                    .sst_layer
                     .write_sst(&file_name, iter, &WriteOptions::default())
                     .await?;

                 Ok(FileMeta {
                     file_name,
+                    start_timestamp,
+                    end_timestamp,
                     level: 0,
                 })
             });
diff --git a/src/storage/src/manifest/test_utils.rs b/src/storage/src/manifest/test_utils.rs
index 7e7d1e1809..e840051ec2 100644
--- a/src/storage/src/manifest/test_utils.rs
+++ b/src/storage/src/manifest/test_utils.rs
@@ -42,6 +42,8 @@ pub fn build_region_edit(
             .iter()
             .map(|f| FileMeta {
                 file_name: f.to_string(),
+                start_timestamp: None,
+                end_timestamp: None,
                 level: 0,
             })
             .collect(),
@@ -49,6 +51,8 @@ pub fn build_region_edit(
             .iter()
             .map(|f| FileMeta {
                 file_name: f.to_string(),
+                start_timestamp: None,
+                end_timestamp: None,
                 level: 0,
             })
             .collect(),
diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs
index 3f317b2eb1..c7fd8bceb2 100644
--- a/src/storage/src/sst.rs
+++ b/src/storage/src/sst.rs
@@ -17,6 +17,7 @@ mod parquet;
 use std::sync::Arc;

 use async_trait::async_trait;
+use common_time::Timestamp;
 use object_store::{util, ObjectStore};
 use serde::{Deserialize, Serialize};
 use table::predicate::Predicate;
@@ -176,6 +177,8 @@ impl FileHandleInner {
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct FileMeta {
     pub file_name: String,
+    pub start_timestamp: Option<Timestamp>,
+    pub end_timestamp: Option<Timestamp>,
     /// SST level of the file.
     pub level: u8,
 }
@@ -195,6 +198,12 @@ pub struct ReadOptions {
     pub predicate: Predicate,
 }

+#[derive(Debug)]
+pub struct SstInfo {
+    pub start_timestamp: Option<Timestamp>,
+    pub end_timestamp: Option<Timestamp>,
+}
+
 /// SST access layer.
 #[async_trait]
 pub trait AccessLayer: Send + Sync + std::fmt::Debug {
@@ -204,7 +213,7 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
         file_name: &str,
         iter: BoxedBatchIterator,
         opts: &WriteOptions,
-    ) -> Result<()>;
+    ) -> Result<SstInfo>;

     /// Read SST file with given `file_name` and schema.
     async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
@@ -240,14 +249,12 @@ impl AccessLayer for FsAccessLayer {
         file_name: &str,
         iter: BoxedBatchIterator,
         opts: &WriteOptions,
-    ) -> Result<()> {
+    ) -> Result<SstInfo> {
         // Now we only support parquet format. We may allow caller to specify SST format in
         // WriteOptions in the future.
         let file_path = self.sst_file_path(file_name);
         let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone());
-
-        writer.write_sst(opts).await?;
-        Ok(())
+        writer.write_sst(opts).await
     }

     async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader> {
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index 2cc7547b8e..bb8523baad 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -21,26 +21,32 @@ use std::sync::Arc;
 use async_compat::CompatExt;
 use async_stream::try_stream;
 use async_trait::async_trait;
+use common_telemetry::error;
+use common_time::timestamp::TimeUnit;
+use common_time::Timestamp;
 use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::prelude::ConcreteDataType;
 use futures_util::{Stream, StreamExt, TryStreamExt};
 use object_store::ObjectStore;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::basic::{Compression, Encoding};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
-use snafu::ResultExt;
+use parquet::format::FileMetaData;
+use snafu::{OptionExt, ResultExt};
 use table::predicate::Predicate;
 use tokio::io::BufReader;

 use crate::error::{
-    self, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu, Result, WriteObjectSnafu,
-    WriteParquetSnafu,
+    self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu,
+    Result, WriteObjectSnafu, WriteParquetSnafu,
 };
 use crate::memtable::BoxedBatchIterator;
 use crate::read::{Batch, BatchReader};
 use crate::schema::compat::ReadAdapter;
-use crate::schema::{ProjectedSchemaRef, StoreSchema};
+use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef};
 use crate::sst;
+use crate::sst::SstInfo;

 /// Parquet sst writer.
 pub struct ParquetWriter<'a> {
@@ -64,14 +70,14 @@ impl<'a> ParquetWriter<'a> {
         }
     }

-    pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<()> {
+    pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<SstInfo> {
         self.write_rows(None).await
     }

     /// Iterates memtable and writes rows to Parquet file.
     /// A chunk of records yielded from each iteration with a size given
     /// in config will be written to a single row group.
-    async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
+    async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<SstInfo> {
         let projected_schema = self.iter.schema();
         let store_schema = projected_schema.schema_to_read();
         let schema = store_schema.arrow_schema().clone();
@@ -109,14 +115,101 @@ impl<'a> ParquetWriter<'a> {
                 .write(&arrow_batch)
                 .context(WriteParquetSnafu)?;
         }
-        arrow_writer.close().context(WriteParquetSnafu)?;
+
+        let file_meta = arrow_writer.close().context(WriteParquetSnafu)?;
+
+        let (start_timestamp, end_timestamp) =
+            match decode_timestamp_range(&file_meta, store_schema) {
+                Ok(Some((start, end))) => (Some(start), Some(end)),
+                Ok(None) => (None, None),
+                Err(e) => {
+                    error!(e; "Failed to calculate time range of parquet file");
+                    (None, None)
+                }
+            };
+
         object.write(buf).await.context(WriteObjectSnafu {
             path: object.path(),
         })?;
-        Ok(())
+        Ok(SstInfo {
+            start_timestamp,
+            end_timestamp,
+        })
     }
 }

+fn decode_timestamp_range(
+    file_meta: &FileMetaData,
+    store_schema: &StoreSchemaRef,
+) -> Result<Option<(Timestamp, Timestamp)>> {
+    let schema = store_schema.schema();
+    let (Some(ts_col_idx), Some(ts_col)) = (schema.timestamp_index(), schema.timestamp_column()) else { return Ok(None); };
+    let ts_datatype = &ts_col.data_type;
+    decode_timestamp_range_inner(file_meta, ts_col_idx, ts_datatype)
+}
+
+fn decode_timestamp_range_inner(
+    file_meta: &FileMetaData,
+    ts_index: usize,
+    ts_datatype: &ConcreteDataType,
+) -> Result<Option<(Timestamp, Timestamp)>> {
+    let mut start = i64::MAX;
+    let mut end = i64::MIN;
+
+    let unit = match ts_datatype {
+        ConcreteDataType::Int64(_) => TimeUnit::Millisecond,
+        ConcreteDataType::Timestamp(type_) => type_.unit(),
+        _ => {
+            return DecodeParquetTimeRangeSnafu {
+                msg: format!("Unexpected timestamp column datatype: {ts_datatype:?}"),
+            }
+            .fail();
+        }
+    };
+
+    for rg in &file_meta.row_groups {
+        let Some(ref metadata) = rg
+            .columns
+            .get(ts_index)
+            .context(DecodeParquetTimeRangeSnafu {
+                msg: format!("Cannot find ts column by index: {ts_index}"),
+            })?
+            .meta_data else { return Ok(None) };
+        let Some(stats) = &metadata.statistics else { return Ok(None) };
+        let (Some(min_value), Some(max_value)) = (&stats.min_value, &stats.max_value) else { return Ok(None); };
+
+        // according to [parquet's spec](https://parquet.apache.org/docs/file-format/data-pages/encodings/), min/max value in stats uses plain encoding with little endian.
+        // also see https://github.com/apache/arrow-rs/blob/5fb337db04a1a19f7d40da46f19b7b5fd4051593/parquet/src/file/statistics.rs#L172
+        let min = i64::from_le_bytes(min_value[..8].try_into().map_err(|e| {
+            error!(
+                "Failed to decode min value from stats, bytes: {:?}, source: {:?}",
+                min_value, e
+            );
+            DecodeParquetTimeRangeSnafu {
+                msg: "decode min value",
+            }
+            .build()
+        })?);
+        let max = i64::from_le_bytes(max_value[..8].try_into().map_err(|e| {
+            error!(
+                "Failed to decode max value from stats, bytes: {:?}, source: {:?}",
+                max_value, e
+            );
+            DecodeParquetTimeRangeSnafu {
+                msg: "decode max value",
+            }
+            .build()
+        })?);
+        start = start.min(min);
+        end = end.max(max);
+    }
+
+    Ok(Some((
+        Timestamp::new(start, unit),
+        Timestamp::new(end, unit),
+    )))
+}
+
 pub struct ParquetReader<'a> {
     file_path: &'a str,
     object_store: ObjectStore,
@@ -374,11 +467,20 @@ mod tests {
         let iter = memtable.iter(&IterContext::default()).unwrap();
         let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());

-        writer
+        let SstInfo {
+            start_timestamp,
+            end_timestamp,
+        } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
             .unwrap();

+        assert_eq!(Some(Timestamp::new_millisecond(0)), start_timestamp);
+        assert_eq!(
+            Some(Timestamp::new_millisecond((rows_total - 1) as i64)),
+            end_timestamp
+        );
+
         let operator = ObjectStore::new(
             object_store::backend::fs::Builder::default()
                 .root(dir.path().to_str().unwrap())
@@ -438,11 +540,17 @@ mod tests {
         let iter = memtable.iter(&IterContext::default()).unwrap();
         let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());

-        writer
+        let SstInfo {
+            start_timestamp,
+            end_timestamp,
+        } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
             .unwrap();

+        assert_eq!(Some(Timestamp::new_millisecond(1000)), start_timestamp);
+        assert_eq!(Some(Timestamp::new_millisecond(2003)), end_timestamp);
+
         let operator = ObjectStore::new(
             object_store::backend::fs::Builder::default()
                 .root(dir.path().to_str().unwrap())