diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 9dacad49fd..b461d4ef39 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -218,8 +218,8 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to convert sst schema, file: {}, source: {}", file, source))] - ConvertSstSchema { + #[snafu(display("Failed to convert store schema, file: {}, source: {}", file, source))] + ConvertStoreSchema { file: String, #[snafu(backtrace)] source: crate::schema::Error, @@ -265,7 +265,7 @@ impl ErrorExt for Error { | WalDataCorrupted { .. } | VersionNotFound { .. } | SequenceNotMonotonic { .. } - | ConvertSstSchema { .. } + | ConvertStoreSchema { .. } | InvalidRawRegion { .. } => StatusCode::Unexpected, FlushIo { .. } diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index e7d720784f..6b7eee1b6d 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -93,8 +93,8 @@ pub struct RegionSchema { /// columns order in [ColumnsMetadata] to ensure the projection index of a field /// is correct. user_schema: SchemaRef, - /// SST schema contains all columns of the region, including all internal columns. - sst_schema: SstSchema, + /// store schema contains all columns of the region, including all internal columns. + store_schema: StoreSchema, /// Metadata of columns. columns: ColumnsMetadataRef, } @@ -102,14 +102,14 @@ pub struct RegionSchema { impl RegionSchema { pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result { let user_schema = Arc::new(build_user_schema(&columns, version)?); - let sst_schema = SstSchema::from_columns_metadata(&columns, version)?; + let store_schema = StoreSchema::from_columns_metadata(&columns, version)?; - debug_assert_eq!(user_schema.version(), sst_schema.version()); + debug_assert_eq!(user_schema.version(), store_schema.version()); debug_assert_eq!(version, user_schema.version()); Ok(RegionSchema { user_schema, - sst_schema, + store_schema, columns, }) } @@ -121,10 +121,10 @@ impl RegionSchema { &self.user_schema } - /// Returns the schema for sst, which contains all columns used by the region. + /// Returns the schema actually stores, which would also contains all internal columns. #[inline] - pub fn sst_schema(&self) -> &SstSchema { - &self.sst_schema + pub fn store_schema(&self) -> &StoreSchema { + &self.store_schema } #[inline] @@ -154,17 +154,17 @@ impl RegionSchema { #[inline] fn sequence_index(&self) -> usize { - self.sst_schema.sequence_index() + self.store_schema.sequence_index() } #[inline] fn op_type_index(&self) -> usize { - self.sst_schema.op_type_index() + self.store_schema.op_type_index() } #[inline] fn row_key_indices(&self) -> impl Iterator { - self.sst_schema.row_key_indices() + self.store_schema.row_key_indices() } #[inline] @@ -180,19 +180,19 @@ impl RegionSchema { pub type RegionSchemaRef = Arc; -// TODO(yingwen): Now this schema in not only used by SST, maybe rename it to InternalSchema -// or something else. -/// Schema of SST. +/// Schema for storage engine. +/// +/// Used internally, contains all row key columns, internal columns and parts of value columns. /// /// Only contains a reference to schema and some indices, so it should be cheap to clone. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct SstSchema { +pub struct StoreSchema { schema: SchemaRef, row_key_end: usize, user_column_end: usize, } -impl SstSchema { +impl StoreSchema { #[inline] pub fn version(&self) -> u32 { self.schema.version() @@ -230,13 +230,13 @@ impl SstSchema { Ok(Batch::new(columns)) } - fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result { + fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result { let column_schemas: Vec<_> = columns .iter_all_columns() .map(|col| ColumnSchema::from(&col.desc)) .collect(); - SstSchema::new( + StoreSchema::new( column_schemas, version, columns.timestamp_key_index(), @@ -251,7 +251,7 @@ impl SstSchema { timestamp_key_index: usize, row_key_end: usize, user_column_end: usize, - ) -> Result { + ) -> Result { let schema = SchemaBuilder::from(column_schemas) .timestamp_index(timestamp_key_index) .version(version) @@ -269,7 +269,7 @@ impl SstSchema { schema.column_schemas()[user_column_end + 1].name ); - Ok(SstSchema { + Ok(StoreSchema { schema: Arc::new(schema), row_key_end, user_column_end, @@ -302,10 +302,10 @@ impl SstSchema { } } -impl TryFrom for SstSchema { +impl TryFrom for StoreSchema { type Error = Error; - fn try_from(arrow_schema: ArrowSchema) -> Result { + fn try_from(arrow_schema: ArrowSchema) -> Result { let schema = Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?; // Recover other metadata from schema. let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?; @@ -321,7 +321,7 @@ impl TryFrom for SstSchema { InvalidIndexSnafu ); - Ok(SstSchema { + Ok(StoreSchema { schema: Arc::new(schema), row_key_end, user_column_end, @@ -408,7 +408,7 @@ pub struct ProjectedSchema { /// Projection info, `None` means don't need to do projection. projection: Option, /// Schema used to read from data sources. - schema_to_read: SstSchema, + schema_to_read: StoreSchema, /// User schema after projection. projected_user_schema: SchemaRef, } @@ -448,10 +448,10 @@ impl ProjectedSchema { /// Create a `ProjectedSchema` that read all columns. pub fn no_projection(region_schema: RegionSchemaRef) -> ProjectedSchema { - // We could just reuse the SstSchema and user schema. + // We could just reuse the StoreSchema and user schema. ProjectedSchema { projection: None, - schema_to_read: region_schema.sst_schema().clone(), + schema_to_read: region_schema.store_schema().clone(), projected_user_schema: region_schema.user_schema().clone(), } } @@ -462,7 +462,7 @@ impl ProjectedSchema { } #[inline] - pub fn schema_to_read(&self) -> &SstSchema { + pub fn schema_to_read(&self) -> &StoreSchema { &self.schema_to_read } @@ -533,7 +533,7 @@ impl ProjectedSchema { fn build_schema_to_read( region_schema: &RegionSchema, projection: &Projection, - ) -> Result { + ) -> Result { let column_schemas: Vec<_> = projection .columns_to_read .iter() @@ -541,7 +541,7 @@ impl ProjectedSchema { .collect(); // All row key columns are reserved in this schema, so we can use the row_key_end // and timestamp_key_index from region schema. - SstSchema::new( + StoreSchema::new( column_schemas, region_schema.version(), region_schema.timestamp_key_index(), @@ -584,7 +584,7 @@ impl ProjectedSchema { fn validate_projection(region_schema: &RegionSchema, indices: &[usize]) -> Result<()> { // The projection indices should not be empty, at least the timestamp column - // should be always read, and the `SstSchema` also requires the timestamp column. + // should be always read, and the `StoreSchema` also requires the timestamp column. ensure!( !indices.is_empty(), InvalidProjectionSnafu { @@ -616,7 +616,7 @@ fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result { value.parse().context(ParseIndexSnafu { value }) } -// Now user schema don't have extra metadata like sst schema. +// Now user schema don't have extra metadata like store schema. fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result { let column_schemas: Vec<_> = columns .iter_user_columns() @@ -705,14 +705,14 @@ mod tests { // Checks version. assert_eq!(123, region_schema.version()); - assert_eq!(123, region_schema.sst_schema().version()); + assert_eq!(123, region_schema.store_schema().version()); - // Checks SstSchema. - let sst_schema = region_schema.sst_schema(); - let sst_arrow_schema = sst_schema.arrow_schema(); - let converted_sst_schema = SstSchema::try_from((**sst_arrow_schema).clone()).unwrap(); + // Checks StoreSchema. + let store_schema = region_schema.store_schema(); + let sst_arrow_schema = store_schema.arrow_schema(); + let converted_store_schema = StoreSchema::try_from((**sst_arrow_schema).clone()).unwrap(); - assert_eq!(*sst_schema, converted_sst_schema); + assert_eq!(*store_schema, converted_store_schema); let expect_schema = schema_util::new_schema_with_version( &[ @@ -727,21 +727,21 @@ mod tests { ); assert_eq!( expect_schema.column_schemas(), - sst_schema.schema().column_schemas() + store_schema.schema().column_schemas() ); - assert_eq!(3, sst_schema.sequence_index()); - assert_eq!(4, sst_schema.op_type_index()); - let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect(); + assert_eq!(3, store_schema.sequence_index()); + assert_eq!(4, store_schema.op_type_index()); + let row_key_indices: Vec<_> = store_schema.row_key_indices().collect(); assert_eq!([0, 1], &row_key_indices[..]); // Test batch and chunk conversion. let batch = new_batch(); // Convert batch to chunk. - let chunk = sst_schema.batch_to_arrow_chunk(&batch); + let chunk = store_schema.batch_to_arrow_chunk(&batch); check_chunk_batch(&chunk, &batch); // Convert chunk to batch. - let converted_batch = sst_schema.arrow_chunk_to_batch(&chunk).unwrap(); + let converted_batch = store_schema.arrow_chunk_to_batch(&chunk).unwrap(); check_chunk_batch(&chunk, &converted_batch); } @@ -826,11 +826,11 @@ mod tests { let projected_schema = ProjectedSchema::new(region_schema, Some(vec![2, 1])).unwrap(); // The schema to read should be same as region schema with (k0, timestamp, v0). - // We can't use `new_schema_with_version()` because the SstSchema also store other + // We can't use `new_schema_with_version()` because the StoreSchema also store other // metadata that `new_schema_with_version()` can't store. let expect_schema = new_region_schema(123, 1); assert_eq!( - expect_schema.sst_schema(), + expect_schema.store_schema(), projected_schema.schema_to_read() ); @@ -879,7 +879,7 @@ mod tests { projected_schema.projected_user_schema() ); assert_eq!( - region_schema.sst_schema(), + region_schema.store_schema(), projected_schema.schema_to_read() ); diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 2afd7e0050..dff35e3137 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -23,7 +23,7 @@ use snafu::ResultExt; use crate::error::{self, Result}; use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BatchReader}; -use crate::schema::{ProjectedSchemaRef, SstSchema}; +use crate::schema::{ProjectedSchemaRef, StoreSchema}; use crate::sst; /// Parquet sst writer. @@ -55,8 +55,8 @@ impl<'a> ParquetWriter<'a> { /// in config will be written to a single row group. async fn write_rows(self, extra_meta: Option>) -> Result<()> { let projected_schema = self.iter.schema(); - let sst_schema = projected_schema.schema_to_read(); - let schema = sst_schema.arrow_schema(); + let store_schema = projected_schema.schema_to_read(); + let schema = store_schema.arrow_schema(); let object = self.object_store.object(self.file_path); // FIXME(hl): writer size is not used in fs backend so just leave it to 0, @@ -82,7 +82,7 @@ impl<'a> ParquetWriter<'a> { for batch in self.iter { let batch = batch?; - sink.send(sst_schema.batch_to_arrow_chunk(&batch)) + sink.send(store_schema.batch_to_arrow_chunk(&batch)) .await .context(error::WriteParquetSnafu)?; } @@ -193,10 +193,10 @@ impl<'a> ParquetReader<'a> { .context(error::ReadParquetSnafu { file: &file_path })?; let arrow_schema = infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?; - // Now the SstSchema is only used to validate metadata of the parquet file, but this schema + // Now the StoreSchema is only used to validate metadata of the parquet file, but this schema // would be useful once we support altering schema, as this is the actual schema of the SST. - let _sst_schema = SstSchema::try_from(arrow_schema) - .context(error::ConvertSstSchemaSnafu { file: &file_path })?; + let _store_schema = StoreSchema::try_from(arrow_schema) + .context(error::ConvertStoreSchemaSnafu { file: &file_path })?; let projected_fields = self.projected_fields().to_vec(); let chunk_stream = try_stream!({