refactor: Rename SstSchema to StoreSchema (#204)

This commit is contained in:
evenyag
2022-08-25 17:43:10 +08:00
committed by GitHub
parent 53637c90fd
commit 793caa8d44
3 changed files with 56 additions and 56 deletions

View File

@@ -218,8 +218,8 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to convert sst schema, file: {}, source: {}", file, source))]
ConvertSstSchema {
#[snafu(display("Failed to convert store schema, file: {}, source: {}", file, source))]
ConvertStoreSchema {
file: String,
#[snafu(backtrace)]
source: crate::schema::Error,
@@ -265,7 +265,7 @@ impl ErrorExt for Error {
| WalDataCorrupted { .. }
| VersionNotFound { .. }
| SequenceNotMonotonic { .. }
| ConvertSstSchema { .. }
| ConvertStoreSchema { .. }
| InvalidRawRegion { .. } => StatusCode::Unexpected,
FlushIo { .. }

View File

@@ -93,8 +93,8 @@ pub struct RegionSchema {
/// columns order in [ColumnsMetadata] to ensure the projection index of a field
/// is correct.
user_schema: SchemaRef,
/// SST schema contains all columns of the region, including all internal columns.
sst_schema: SstSchema,
/// store schema contains all columns of the region, including all internal columns.
store_schema: StoreSchema,
/// Metadata of columns.
columns: ColumnsMetadataRef,
}
@@ -102,14 +102,14 @@ pub struct RegionSchema {
impl RegionSchema {
pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result<RegionSchema> {
let user_schema = Arc::new(build_user_schema(&columns, version)?);
let sst_schema = SstSchema::from_columns_metadata(&columns, version)?;
let store_schema = StoreSchema::from_columns_metadata(&columns, version)?;
debug_assert_eq!(user_schema.version(), sst_schema.version());
debug_assert_eq!(user_schema.version(), store_schema.version());
debug_assert_eq!(version, user_schema.version());
Ok(RegionSchema {
user_schema,
sst_schema,
store_schema,
columns,
})
}
@@ -121,10 +121,10 @@ impl RegionSchema {
&self.user_schema
}
/// Returns the schema for sst, which contains all columns used by the region.
/// Returns the schema actually stores, which would also contains all internal columns.
#[inline]
pub fn sst_schema(&self) -> &SstSchema {
&self.sst_schema
pub fn store_schema(&self) -> &StoreSchema {
&self.store_schema
}
#[inline]
@@ -154,17 +154,17 @@ impl RegionSchema {
#[inline]
fn sequence_index(&self) -> usize {
self.sst_schema.sequence_index()
self.store_schema.sequence_index()
}
#[inline]
fn op_type_index(&self) -> usize {
self.sst_schema.op_type_index()
self.store_schema.op_type_index()
}
#[inline]
fn row_key_indices(&self) -> impl Iterator<Item = usize> {
self.sst_schema.row_key_indices()
self.store_schema.row_key_indices()
}
#[inline]
@@ -180,19 +180,19 @@ impl RegionSchema {
pub type RegionSchemaRef = Arc<RegionSchema>;
// TODO(yingwen): Now this schema in not only used by SST, maybe rename it to InternalSchema
// or something else.
/// Schema of SST.
/// Schema for storage engine.
///
/// Used internally, contains all row key columns, internal columns and parts of value columns.
///
/// Only contains a reference to schema and some indices, so it should be cheap to clone.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SstSchema {
pub struct StoreSchema {
schema: SchemaRef,
row_key_end: usize,
user_column_end: usize,
}
impl SstSchema {
impl StoreSchema {
#[inline]
pub fn version(&self) -> u32 {
self.schema.version()
@@ -230,13 +230,13 @@ impl SstSchema {
Ok(Batch::new(columns))
}
fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result<SstSchema> {
fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result<StoreSchema> {
let column_schemas: Vec<_> = columns
.iter_all_columns()
.map(|col| ColumnSchema::from(&col.desc))
.collect();
SstSchema::new(
StoreSchema::new(
column_schemas,
version,
columns.timestamp_key_index(),
@@ -251,7 +251,7 @@ impl SstSchema {
timestamp_key_index: usize,
row_key_end: usize,
user_column_end: usize,
) -> Result<SstSchema> {
) -> Result<StoreSchema> {
let schema = SchemaBuilder::from(column_schemas)
.timestamp_index(timestamp_key_index)
.version(version)
@@ -269,7 +269,7 @@ impl SstSchema {
schema.column_schemas()[user_column_end + 1].name
);
Ok(SstSchema {
Ok(StoreSchema {
schema: Arc::new(schema),
row_key_end,
user_column_end,
@@ -302,10 +302,10 @@ impl SstSchema {
}
}
impl TryFrom<ArrowSchema> for SstSchema {
impl TryFrom<ArrowSchema> for StoreSchema {
type Error = Error;
fn try_from(arrow_schema: ArrowSchema) -> Result<SstSchema> {
fn try_from(arrow_schema: ArrowSchema) -> Result<StoreSchema> {
let schema = Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?;
// Recover other metadata from schema.
let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?;
@@ -321,7 +321,7 @@ impl TryFrom<ArrowSchema> for SstSchema {
InvalidIndexSnafu
);
Ok(SstSchema {
Ok(StoreSchema {
schema: Arc::new(schema),
row_key_end,
user_column_end,
@@ -408,7 +408,7 @@ pub struct ProjectedSchema {
/// Projection info, `None` means don't need to do projection.
projection: Option<Projection>,
/// Schema used to read from data sources.
schema_to_read: SstSchema,
schema_to_read: StoreSchema,
/// User schema after projection.
projected_user_schema: SchemaRef,
}
@@ -448,10 +448,10 @@ impl ProjectedSchema {
/// Create a `ProjectedSchema` that read all columns.
pub fn no_projection(region_schema: RegionSchemaRef) -> ProjectedSchema {
// We could just reuse the SstSchema and user schema.
// We could just reuse the StoreSchema and user schema.
ProjectedSchema {
projection: None,
schema_to_read: region_schema.sst_schema().clone(),
schema_to_read: region_schema.store_schema().clone(),
projected_user_schema: region_schema.user_schema().clone(),
}
}
@@ -462,7 +462,7 @@ impl ProjectedSchema {
}
#[inline]
pub fn schema_to_read(&self) -> &SstSchema {
pub fn schema_to_read(&self) -> &StoreSchema {
&self.schema_to_read
}
@@ -533,7 +533,7 @@ impl ProjectedSchema {
fn build_schema_to_read(
region_schema: &RegionSchema,
projection: &Projection,
) -> Result<SstSchema> {
) -> Result<StoreSchema> {
let column_schemas: Vec<_> = projection
.columns_to_read
.iter()
@@ -541,7 +541,7 @@ impl ProjectedSchema {
.collect();
// All row key columns are reserved in this schema, so we can use the row_key_end
// and timestamp_key_index from region schema.
SstSchema::new(
StoreSchema::new(
column_schemas,
region_schema.version(),
region_schema.timestamp_key_index(),
@@ -584,7 +584,7 @@ impl ProjectedSchema {
fn validate_projection(region_schema: &RegionSchema, indices: &[usize]) -> Result<()> {
// The projection indices should not be empty, at least the timestamp column
// should be always read, and the `SstSchema` also requires the timestamp column.
// should be always read, and the `StoreSchema` also requires the timestamp column.
ensure!(
!indices.is_empty(),
InvalidProjectionSnafu {
@@ -616,7 +616,7 @@ fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result<usize> {
value.parse().context(ParseIndexSnafu { value })
}
// Now user schema don't have extra metadata like sst schema.
// Now user schema don't have extra metadata like store schema.
fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema> {
let column_schemas: Vec<_> = columns
.iter_user_columns()
@@ -705,14 +705,14 @@ mod tests {
// Checks version.
assert_eq!(123, region_schema.version());
assert_eq!(123, region_schema.sst_schema().version());
assert_eq!(123, region_schema.store_schema().version());
// Checks SstSchema.
let sst_schema = region_schema.sst_schema();
let sst_arrow_schema = sst_schema.arrow_schema();
let converted_sst_schema = SstSchema::try_from((**sst_arrow_schema).clone()).unwrap();
// Checks StoreSchema.
let store_schema = region_schema.store_schema();
let sst_arrow_schema = store_schema.arrow_schema();
let converted_store_schema = StoreSchema::try_from((**sst_arrow_schema).clone()).unwrap();
assert_eq!(*sst_schema, converted_sst_schema);
assert_eq!(*store_schema, converted_store_schema);
let expect_schema = schema_util::new_schema_with_version(
&[
@@ -727,21 +727,21 @@ mod tests {
);
assert_eq!(
expect_schema.column_schemas(),
sst_schema.schema().column_schemas()
store_schema.schema().column_schemas()
);
assert_eq!(3, sst_schema.sequence_index());
assert_eq!(4, sst_schema.op_type_index());
let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect();
assert_eq!(3, store_schema.sequence_index());
assert_eq!(4, store_schema.op_type_index());
let row_key_indices: Vec<_> = store_schema.row_key_indices().collect();
assert_eq!([0, 1], &row_key_indices[..]);
// Test batch and chunk conversion.
let batch = new_batch();
// Convert batch to chunk.
let chunk = sst_schema.batch_to_arrow_chunk(&batch);
let chunk = store_schema.batch_to_arrow_chunk(&batch);
check_chunk_batch(&chunk, &batch);
// Convert chunk to batch.
let converted_batch = sst_schema.arrow_chunk_to_batch(&chunk).unwrap();
let converted_batch = store_schema.arrow_chunk_to_batch(&chunk).unwrap();
check_chunk_batch(&chunk, &converted_batch);
}
@@ -826,11 +826,11 @@ mod tests {
let projected_schema = ProjectedSchema::new(region_schema, Some(vec![2, 1])).unwrap();
// The schema to read should be same as region schema with (k0, timestamp, v0).
// We can't use `new_schema_with_version()` because the SstSchema also store other
// We can't use `new_schema_with_version()` because the StoreSchema also store other
// metadata that `new_schema_with_version()` can't store.
let expect_schema = new_region_schema(123, 1);
assert_eq!(
expect_schema.sst_schema(),
expect_schema.store_schema(),
projected_schema.schema_to_read()
);
@@ -879,7 +879,7 @@ mod tests {
projected_schema.projected_user_schema()
);
assert_eq!(
region_schema.sst_schema(),
region_schema.store_schema(),
projected_schema.schema_to_read()
);

View File

@@ -23,7 +23,7 @@ use snafu::ResultExt;
use crate::error::{self, Result};
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BatchReader};
use crate::schema::{ProjectedSchemaRef, SstSchema};
use crate::schema::{ProjectedSchemaRef, StoreSchema};
use crate::sst;
/// Parquet sst writer.
@@ -55,8 +55,8 @@ impl<'a> ParquetWriter<'a> {
/// in config will be written to a single row group.
async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
let projected_schema = self.iter.schema();
let sst_schema = projected_schema.schema_to_read();
let schema = sst_schema.arrow_schema();
let store_schema = projected_schema.schema_to_read();
let schema = store_schema.arrow_schema();
let object = self.object_store.object(self.file_path);
// FIXME(hl): writer size is not used in fs backend so just leave it to 0,
@@ -82,7 +82,7 @@ impl<'a> ParquetWriter<'a> {
for batch in self.iter {
let batch = batch?;
sink.send(sst_schema.batch_to_arrow_chunk(&batch))
sink.send(store_schema.batch_to_arrow_chunk(&batch))
.await
.context(error::WriteParquetSnafu)?;
}
@@ -193,10 +193,10 @@ impl<'a> ParquetReader<'a> {
.context(error::ReadParquetSnafu { file: &file_path })?;
let arrow_schema =
infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?;
// Now the SstSchema is only used to validate metadata of the parquet file, but this schema
// Now the StoreSchema is only used to validate metadata of the parquet file, but this schema
// would be useful once we support altering schema, as this is the actual schema of the SST.
let _sst_schema = SstSchema::try_from(arrow_schema)
.context(error::ConvertSstSchemaSnafu { file: &file_path })?;
let _store_schema = StoreSchema::try_from(arrow_schema)
.context(error::ConvertStoreSchemaSnafu { file: &file_path })?;
let projected_fields = self.projected_fields().to_vec();
let chunk_stream = try_stream!({