feat: record sst file time range in FileMeta (#860)

* feat: record sst file time range in FileMeta

* fix: clippy

* chore: add some log and doc
Lei, HUANG · 2023-01-11 21:16:07 +08:00 · committed by GitHub
parent b39dbcbda9 · commit 4015dd8075
5 changed files with 149 additions and 52 deletions
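At a high level: `write_sst` now reports the min/max timestamps it observed, and the flush path records them in `FileMeta`, so the manifest can describe which time span each SST covers. The sketch below is illustrative only (plain `i64` milliseconds stand in for `common_time::Timestamp`, and `may_contain` is a hypothetical helper, not part of this commit); it shows why the bounds are `Option`-al: files written before this change carry no recorded range and must be kept by any time filter.

// Illustrative sketch, not commit code: `Millis` stands in for
// common_time::Timestamp and `may_contain` is a hypothetical helper.
type Millis = i64;

pub struct FileMeta {
    pub file_name: String,
    pub start_timestamp: Option<Millis>,
    pub end_timestamp: Option<Millis>,
    pub level: u8,
}

/// Returns false only when the SST provably lies outside [query_start, query_end].
fn may_contain(meta: &FileMeta, query_start: Millis, query_end: Millis) -> bool {
    match (meta.start_timestamp, meta.end_timestamp) {
        (Some(start), Some(end)) => start <= query_end && end >= query_start,
        // No recorded range (e.g. files flushed before this commit): keep it.
        _ => true,
    }
}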

View File

@@ -43,9 +43,6 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Missing timestamp in write batch"))]
-    BatchMissingTimestamp { backtrace: Backtrace },
-
     #[snafu(display("Failed to write columns, source: {}", source))]
     FlushIo {
         source: object_store::Error,
@@ -184,13 +181,6 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("IO failed while reading Parquet file: {}, source: {}", file, source))]
-    ReadParquetIo {
-        file: String,
-        source: std::io::Error,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display("Region is under {} state, cannot proceed operation", state))]
     InvalidRegionState {
         state: &'static str,
@@ -222,12 +212,6 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Region version not found in manifest, the region: {}", region_name))]
-    VersionNotFound {
-        region_name: String,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display(
         "Sequence of region should increase monotonically (should be {} < {})",
         prev,
@@ -317,18 +301,6 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Timestamp column type illegal, data type: {:?}", data_type))]
-    IllegalTimestampColumnType { data_type: ConcreteDataType },
-
-    #[snafu(display(
-        "Failed to convert between ColumnSchema and ColumnMetadata, source: {}",
-        source
-    ))]
-    ConvertColumnSchema {
-        #[snafu(backtrace)]
-        source: MetadataError,
-    },
-
     #[snafu(display("Incompatible schema to read, reason: {}", reason))]
     CompatRead {
         reason: String,
@@ -437,6 +409,9 @@ pub enum Error {
     #[snafu(display("More columns than expected in the request"))]
     MoreColumnThanExpected { backtrace: Backtrace },
+
+    #[snafu(display("Failed to decode parquet file time range, msg: {}", msg))]
+    DecodeParquetTimeRange { msg: String, backtrace: Backtrace },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -448,12 +423,10 @@ impl ErrorExt for Error {
         match self {
             InvalidScanIndex { .. }
             | BatchMissingColumn { .. }
-            | BatchMissingTimestamp { .. }
             | InvalidProjection { .. }
             | BuildBatch { .. }
             | NotInSchemaToCompat { .. }
             | WriteToOldVersion { .. }
-            | IllegalTimestampColumnType { .. }
             | CreateRecordBatch { .. }
             | RequestTooLarge { .. }
             | TypeMismatch { .. }
@@ -469,7 +442,6 @@ impl ErrorExt for Error {
             | DecodeMetaActionList { .. }
             | Readline { .. }
             | WalDataCorrupted { .. }
-            | VersionNotFound { .. }
             | SequenceNotMonotonic { .. }
             | ConvertStoreSchema { .. }
             | InvalidRawRegion { .. }
@@ -496,19 +468,19 @@ impl ErrorExt for Error {
             | ManifestProtocolForbidRead { .. }
             | ManifestProtocolForbidWrite { .. }
             | ReadParquet { .. }
-            | ReadParquetIo { .. }
             | InvalidRegionState { .. }
             | ReadWal { .. } => StatusCode::StorageUnavailable,
 
             UnknownColumn { .. } => StatusCode::TableColumnNotFound,
 
-            InvalidAlterRequest { source, .. }
-            | InvalidRegionDesc { source, .. }
-            | ConvertColumnSchema { source, .. } => source.status_code(),
+            InvalidAlterRequest { source, .. } | InvalidRegionDesc { source, .. } => {
+                source.status_code()
+            }
 
             PushBatch { source, .. } => source.status_code(),
             CreateDefault { source, .. } => source.status_code(),
             ConvertChunk { source, .. } => source.status_code(),
             MarkWalObsolete { source, .. } => source.status_code(),
+            DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
         }
     }
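A side note on the snafu idioms used with the new variant, in a minimal self-contained sketch (assuming snafu 0.7; only `DecodeParquetTimeRange` itself comes from this commit, the surrounding functions are invented): the derive generates a `DecodeParquetTimeRangeSnafu` context selector whose `.fail()` returns an `Err` directly and whose `.build()` constructs an `Error` value, e.g. inside `map_err`, while `OptionExt::context` converts a `None`.

use snafu::{Backtrace, OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to decode parquet file time range, msg: {}", msg))]
    DecodeParquetTimeRange { msg: String, backtrace: Backtrace },
}

// `.fail()`: bail out with the new variant, as the datatype check below does.
fn reject(datatype: &str) -> Result<(), Error> {
    DecodeParquetTimeRangeSnafu {
        msg: format!("Unexpected timestamp column datatype: {datatype}"),
    }
    .fail()
}

// `.context(...)`: turn an Option's None into the same variant.
fn require_ts_column(ts_index: Option<usize>) -> Result<usize, Error> {
    ts_index.context(DecodeParquetTimeRangeSnafu {
        msg: "Cannot find ts column",
    })
}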

View File

@@ -27,7 +27,7 @@ use crate::manifest::action::*;
 use crate::manifest::region::RegionManifest;
 use crate::memtable::{IterContext, MemtableId, MemtableRef};
 use crate::region::{RegionWriterRef, SharedDataRef};
-use crate::sst::{AccessLayerRef, FileMeta, WriteOptions};
+use crate::sst::{AccessLayerRef, FileMeta, SstInfo, WriteOptions};
 use crate::wal::Wal;
 
 /// Default write buffer size (32M).
@@ -185,12 +185,18 @@ impl<S: LogStore> FlushJob<S> {
             // TODO(hl): Check if random file name already exists in meta.
             let iter = m.iter(&iter_ctx)?;
             futures.push(async move {
-                self.sst_layer
+                let SstInfo {
+                    start_timestamp,
+                    end_timestamp,
+                } = self
+                    .sst_layer
                     .write_sst(&file_name, iter, &WriteOptions::default())
                     .await?;
 
                 Ok(FileMeta {
                     file_name,
+                    start_timestamp,
+                    end_timestamp,
                     level: 0,
                 })
            });
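The per-memtable futures pushed above are awaited elsewhere in the flush job; that driver code is outside this hunk, so the following is only a sketch of the pattern (assuming futures_util, with deliberately generic names): all SST writes run concurrently, and the collected `FileMeta`s feed the region edit.

use futures_util::future::try_join_all;

// Sketch of the collection step: await all per-memtable SST writes
// concurrently and fail fast if any one of them fails.
async fn collect_file_metas<F, M, E>(jobs: Vec<F>) -> Result<Vec<M>, E>
where
    F: std::future::Future<Output = Result<M, E>>,
{
    try_join_all(jobs).await
}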

View File

@@ -42,6 +42,8 @@ pub fn build_region_edit(
             .iter()
             .map(|f| FileMeta {
                 file_name: f.to_string(),
+                start_timestamp: None,
+                end_timestamp: None,
                 level: 0,
             })
             .collect(),
@@ -49,6 +51,8 @@ pub fn build_region_edit(
             .iter()
             .map(|f| FileMeta {
                 file_name: f.to_string(),
+                start_timestamp: None,
+                end_timestamp: None,
                 level: 0,
             })
             .collect(),

View File

@@ -17,6 +17,7 @@ mod parquet;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_time::Timestamp;
 use object_store::{util, ObjectStore};
 use serde::{Deserialize, Serialize};
 use table::predicate::Predicate;
@@ -176,6 +177,8 @@ impl FileHandleInner {
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct FileMeta {
     pub file_name: String,
+    pub start_timestamp: Option<Timestamp>,
+    pub end_timestamp: Option<Timestamp>,
     /// SST level of the file.
     pub level: u8,
 }
@@ -195,6 +198,12 @@ pub struct ReadOptions {
     pub predicate: Predicate,
 }
 
+#[derive(Debug)]
+pub struct SstInfo {
+    pub start_timestamp: Option<Timestamp>,
+    pub end_timestamp: Option<Timestamp>,
+}
+
 /// SST access layer.
 #[async_trait]
 pub trait AccessLayer: Send + Sync + std::fmt::Debug {
@@ -204,7 +213,7 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
         file_name: &str,
         iter: BoxedBatchIterator,
         opts: &WriteOptions,
-    ) -> Result<()>;
+    ) -> Result<SstInfo>;
 
     /// Read SST file with given `file_name` and schema.
     async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
@@ -240,14 +249,12 @@ impl AccessLayer for FsAccessLayer {
         file_name: &str,
         iter: BoxedBatchIterator,
         opts: &WriteOptions,
-    ) -> Result<()> {
+    ) -> Result<SstInfo> {
         // Now we only supports parquet format. We may allow caller to specific SST format in
         // WriteOptions in the future.
         let file_path = self.sst_file_path(file_name);
         let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone());
-        writer.write_sst(opts).await?;
-
-        Ok(())
+        writer.write_sst(opts).await
     }
 
     async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader> {
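Since `FileMeta` derives `Serialize`/`Deserialize` and is persisted through manifest actions, the new fields also appear in serialized form. Below is a self-contained round-trip sketch (serde_json here, `i64` standing in for `Timestamp`; the `#[serde(default)]` attributes are my assumption about tolerating entries written before this commit, not something this diff shows).

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct FileMeta {
    file_name: String,
    // NOTE: `#[serde(default)]` is an assumption for this sketch; it lets
    // entries serialized before this commit (no time range) deserialize as
    // `None` instead of failing.
    #[serde(default)]
    start_timestamp: Option<i64>, // i64 stands in for common_time::Timestamp
    #[serde(default)]
    end_timestamp: Option<i64>,
    level: u8,
}

fn main() {
    let legacy = r#"{ "file_name": "a.parquet", "level": 0 }"#;
    let meta: FileMeta = serde_json::from_str(legacy).unwrap();
    assert_eq!(meta.start_timestamp, None);
    assert_eq!(meta.end_timestamp, None);
}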

View File

@@ -21,26 +21,32 @@ use std::sync::Arc;
 use async_compat::CompatExt;
 use async_stream::try_stream;
 use async_trait::async_trait;
+use common_telemetry::error;
+use common_time::timestamp::TimeUnit;
+use common_time::Timestamp;
 use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::prelude::ConcreteDataType;
 use futures_util::{Stream, StreamExt, TryStreamExt};
 use object_store::ObjectStore;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::basic::{Compression, Encoding};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
-use snafu::ResultExt;
+use parquet::format::FileMetaData;
+use snafu::{OptionExt, ResultExt};
 use table::predicate::Predicate;
 use tokio::io::BufReader;
 
 use crate::error::{
-    self, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu, Result, WriteObjectSnafu,
-    WriteParquetSnafu,
+    self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu,
+    Result, WriteObjectSnafu, WriteParquetSnafu,
 };
 use crate::memtable::BoxedBatchIterator;
 use crate::read::{Batch, BatchReader};
 use crate::schema::compat::ReadAdapter;
-use crate::schema::{ProjectedSchemaRef, StoreSchema};
+use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef};
 use crate::sst;
+use crate::sst::SstInfo;
 
 /// Parquet sst writer.
 pub struct ParquetWriter<'a> {
@@ -64,14 +70,14 @@ impl<'a> ParquetWriter<'a> {
         }
     }
 
-    pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<()> {
+    pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<SstInfo> {
         self.write_rows(None).await
     }
 
     /// Iterates memtable and writes rows to Parquet file.
     /// A chunk of records yielded from each iteration with a size given
     /// in config will be written to a single row group.
-    async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
+    async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<SstInfo> {
         let projected_schema = self.iter.schema();
         let store_schema = projected_schema.schema_to_read();
         let schema = store_schema.arrow_schema().clone();
@@ -109,14 +115,101 @@ impl<'a> ParquetWriter<'a> {
                 .write(&arrow_batch)
                 .context(WriteParquetSnafu)?;
         }
 
-        arrow_writer.close().context(WriteParquetSnafu)?;
+        let file_meta = arrow_writer.close().context(WriteParquetSnafu)?;
+
+        let (start_timestamp, end_timestamp) =
+            match decode_timestamp_range(&file_meta, store_schema) {
+                Ok(Some((start, end))) => (Some(start), Some(end)),
+                Ok(None) => (None, None),
+                Err(e) => {
+                    error!(e; "Failed to calculate time range of parquet file");
+                    (None, None)
+                }
+            };
+
         object.write(buf).await.context(WriteObjectSnafu {
             path: object.path(),
         })?;
-        Ok(())
+
+        Ok(SstInfo {
+            start_timestamp,
+            end_timestamp,
+        })
     }
 }
 
+fn decode_timestamp_range(
+    file_meta: &FileMetaData,
+    store_schema: &StoreSchemaRef,
+) -> Result<Option<(Timestamp, Timestamp)>> {
+    let schema = store_schema.schema();
+    let (Some(ts_col_idx), Some(ts_col)) = (schema.timestamp_index(), schema.timestamp_column()) else { return Ok(None); };
+    let ts_datatype = &ts_col.data_type;
+    decode_timestamp_range_inner(file_meta, ts_col_idx, ts_datatype)
+}
+
+fn decode_timestamp_range_inner(
+    file_meta: &FileMetaData,
+    ts_index: usize,
+    ts_datatype: &ConcreteDataType,
+) -> Result<Option<(Timestamp, Timestamp)>> {
+    let mut start = i64::MAX;
+    let mut end = i64::MIN;
+
+    let unit = match ts_datatype {
+        ConcreteDataType::Int64(_) => TimeUnit::Millisecond,
+        ConcreteDataType::Timestamp(type_) => type_.unit(),
+        _ => {
+            return DecodeParquetTimeRangeSnafu {
+                msg: format!("Unexpected timestamp column datatype: {ts_datatype:?}"),
+            }
+            .fail();
+        }
+    };
+
+    for rg in &file_meta.row_groups {
+        let Some(ref metadata) = rg
+            .columns
+            .get(ts_index)
+            .context(DecodeParquetTimeRangeSnafu {
+                msg: format!("Cannot find ts column by index: {ts_index}"),
+            })?
+            .meta_data else { return Ok(None) };
+        let Some(stats) = &metadata.statistics else { return Ok(None) };
+        let (Some(min_value), Some(max_value)) = (&stats.min_value, &stats.max_value) else { return Ok(None); };
+
+        // According to [parquet's spec](https://parquet.apache.org/docs/file-format/data-pages/encodings/),
+        // min/max values in stats use plain encoding with little endian.
+        // Also see https://github.com/apache/arrow-rs/blob/5fb337db04a1a19f7d40da46f19b7b5fd4051593/parquet/src/file/statistics.rs#L172
+        let min = i64::from_le_bytes(min_value[..8].try_into().map_err(|e| {
+            error!(
+                "Failed to decode min value from stats, bytes: {:?}, source: {:?}",
+                min_value, e
+            );
+            DecodeParquetTimeRangeSnafu {
+                msg: "decode min value",
+            }
+            .build()
+        })?);
+        let max = i64::from_le_bytes(max_value[..8].try_into().map_err(|e| {
+            error!(
+                "Failed to decode max value from stats, bytes: {:?}, source: {:?}",
+                max_value, e
+            );
+            DecodeParquetTimeRangeSnafu {
+                msg: "decode max value",
+            }
+            .build()
+        })?);
+        start = start.min(min);
+        end = end.max(max);
+    }
+
+    Ok(Some((
+        Timestamp::new(start, unit),
+        Timestamp::new(end, unit),
+    )))
+}
+
 pub struct ParquetReader<'a> {
     file_path: &'a str,
     object_store: ObjectStore,
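To make the stats decoding above concrete, here is a tiny standalone check of the byte-level step (invented values, std only): INT64 min/max statistics are plain-encoded in little-endian order, so the first eight bytes decode with `i64::from_le_bytes`.

// Standalone sketch with invented values; mirrors the `from_le_bytes`
// step above but is not code from this commit.
fn decode_i64_le(bytes: &[u8]) -> Option<i64> {
    Some(i64::from_le_bytes(bytes.get(..8)?.try_into().ok()?))
}

fn main() {
    let min_value = 1000i64.to_le_bytes(); // as a stats blob would store it
    let max_value = 2003i64.to_le_bytes();
    assert_eq!(decode_i64_le(&min_value), Some(1000));
    assert_eq!(decode_i64_le(&max_value), Some(2003));
}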
@@ -374,11 +467,20 @@ mod tests {
         let iter = memtable.iter(&IterContext::default()).unwrap();
         let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());
 
-        writer
+        let SstInfo {
+            start_timestamp,
+            end_timestamp,
+        } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
             .unwrap();
+
+        assert_eq!(Some(Timestamp::new_millisecond(0)), start_timestamp);
+        assert_eq!(
+            Some(Timestamp::new_millisecond((rows_total - 1) as i64)),
+            end_timestamp
+        );
 
         let operator = ObjectStore::new(
             object_store::backend::fs::Builder::default()
                 .root(dir.path().to_str().unwrap())
@@ -438,11 +540,17 @@ mod tests {
         let iter = memtable.iter(&IterContext::default()).unwrap();
         let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());
 
-        writer
+        let SstInfo {
+            start_timestamp,
+            end_timestamp,
+        } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
             .unwrap();
+
+        assert_eq!(Some(Timestamp::new_millisecond(1000)), start_timestamp);
+        assert_eq!(Some(Timestamp::new_millisecond(2003)), end_timestamp);
 
         let operator = ObjectStore::new(
             object_store::backend::fs::Builder::default()
                 .root(dir.path().to_str().unwrap())