Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-06 21:32:58 +00:00
feat: Holds ColumnMetadata in StoreSchema (#333)
* chore: Update StoreSchema comment

* feat: Add metadata to ColumnSchema

* feat: Impl conversion between ColumnMetadata and ColumnSchema

  We can use this to store a ColumnMetadata as an arrow Schema, since a
  ColumnSchema can be further converted into an arrow schema. StoreSchema can
  then hold ColumnMetadata, which carries more information than a plain
  ColumnSchema, especially the column id.

* feat(storage): Merge schema::Error into metadata::Error

  This avoids a cyclic dependency between the two error types.

* feat(storage): Store ColumnMetadata in StoreSchema

* feat(storage): Use StoreSchemaRef to avoid cloning the whole StoreSchema struct

* test(storage): Fix test_store_schema

* feat(datatypes): Return error on duplicate meta key

* chore: Address CR comments
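In outline, the commit makes a ColumnMetadata survive a round trip through ColumnSchema (and thus through an arrow schema). A minimal sketch of that round trip, modeled on the test_column_metadata_conversion test added in this diff (imports and error handling elided):

    // Sketch: build a ColumnMetadata as in the new test below.
    let desc = ColumnDescriptorBuilder::new(123, "test", ConcreteDataType::int32_datatype())
        .build()
        .unwrap();
    let meta = ColumnMetadata {
        cf_id: consts::DEFAULT_CF_ID,
        desc,
    };

    // ColumnMetadata -> ColumnSchema: the column id (plus cf id and comment,
    // when non-default) is stashed in the ColumnSchema's metadata map under
    // "greptime:storage:*" keys.
    let column_schema = meta.to_column_schema().unwrap();

    // ColumnSchema -> ColumnMetadata: the metadata map is parsed back, so a
    // StoreSchema rebuilt from an arrow schema recovers the full
    // ColumnMetadata, column id included.
    let recovered = ColumnMetadata::from_column_schema(&column_schema).unwrap();
    assert_eq!(meta, recovered);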
@@ -75,6 +75,9 @@ pub enum Error {
         reason: String,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("Duplicated metadata for {}", key))]
+    DuplicateMeta { key: String, backtrace: Backtrace },
 }

 impl ErrorExt for Error {
@@ -1,6 +1,6 @@
 mod constraint;

-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::sync::Arc;

 pub use arrow::datatypes::Metadata;
@@ -25,12 +25,14 @@ const VERSION_KEY: &str = "greptime:version";
|
||||
/// Key used to store default constraint in arrow field's metadata.
|
||||
const ARROW_FIELD_DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
|
||||
|
||||
/// Schema of a column, used as an immutable struct.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct ColumnSchema {
|
||||
pub name: String,
|
||||
pub data_type: ConcreteDataType,
|
||||
is_nullable: bool,
|
||||
default_constraint: Option<ColumnDefaultConstraint>,
|
||||
metadata: Metadata,
|
||||
}
|
||||
|
||||
impl ColumnSchema {
|
||||
@@ -44,6 +46,7 @@ impl ColumnSchema {
             data_type,
             is_nullable,
             default_constraint: None,
+            metadata: Metadata::new(),
         }
     }
@@ -57,6 +60,11 @@ impl ColumnSchema {
         self.default_constraint.as_ref()
     }

+    #[inline]
+    pub fn metadata(&self) -> &Metadata {
+        &self.metadata
+    }
+
     pub fn with_default_constraint(
         mut self,
         default_constraint: Option<ColumnDefaultConstraint>,
@@ -69,6 +77,12 @@ impl ColumnSchema {
         Ok(self)
     }

+    /// Creates a new [`ColumnSchema`] with given metadata.
+    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
+        self.metadata = metadata;
+        self
+    }
+
     pub fn create_default_vector(&self, num_rows: usize) -> Result<Option<VectorRef>> {
         match &self.default_constraint {
             Some(c) => c
@@ -303,8 +317,9 @@ impl TryFrom<&Field> for ColumnSchema {

     fn try_from(field: &Field) -> Result<ColumnSchema> {
         let data_type = ConcreteDataType::try_from(&field.data_type)?;
-        let default_constraint = match field.metadata.get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY) {
-            Some(json) => Some(serde_json::from_str(json).context(DeserializeSnafu { json })?),
+        let mut metadata = field.metadata.clone();
+        let default_constraint = match metadata.remove(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY) {
+            Some(json) => Some(serde_json::from_str(&json).context(DeserializeSnafu { json })?),
             None => None,
         };

@@ -313,6 +328,7 @@ impl TryFrom<&Field> for ColumnSchema {
             data_type,
             is_nullable: field.is_nullable,
             default_constraint,
+            metadata,
         })
     }
 }
@@ -321,16 +337,21 @@ impl TryFrom<&ColumnSchema> for Field {
     type Error = Error;

     fn try_from(column_schema: &ColumnSchema) -> Result<Field> {
-        let metadata = if let Some(value) = &column_schema.default_constraint {
-            let mut m = BTreeMap::new();
-            m.insert(
+        let mut metadata = column_schema.metadata.clone();
+        if let Some(value) = &column_schema.default_constraint {
+            // Adds an additional metadata to store the default constraint.
+            let old = metadata.insert(
                 ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(),
                 serde_json::to_string(&value).context(SerializeSnafu)?,
             );
-            m
-        } else {
-            BTreeMap::default()
-        };
+
+            ensure!(
+                old.is_none(),
+                error::DuplicateMetaSnafu {
+                    key: ARROW_FIELD_DEFAULT_CONSTRAINT_KEY,
+                }
+            );
+        }

         Ok(Field::new(
             column_schema.name.clone(),
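One behavioral consequence of the hunk above: a caller that pre-populates the metadata map under the greptime:default_constraint key and also sets a default constraint now gets a DuplicateMeta error from Field::try_from, instead of having the entry silently overwritten; the test_column_schema_with_duplicate_metadata test added below exercises exactly this path.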
@@ -420,6 +441,11 @@ mod tests {
         let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
             .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))))
             .unwrap();
+        assert!(column_schema
+            .metadata()
+            .get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY)
+            .is_none());
+
         let field = Field::try_from(&column_schema).unwrap();
         assert_eq!("test", field.name);
         assert_eq!(ArrowDataType::Int32, field.data_type);
@@ -436,6 +462,45 @@ mod tests {
         assert_eq!(column_schema, new_column_schema);
     }

+    #[test]
+    fn test_column_schema_with_metadata() {
+        let mut metadata = Metadata::new();
+        metadata.insert("k1".to_string(), "v1".to_string());
+        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
+            .with_metadata(metadata)
+            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
+            .unwrap();
+        assert_eq!("v1", column_schema.metadata().get("k1").unwrap());
+        assert!(column_schema
+            .metadata()
+            .get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY)
+            .is_none());
+
+        let field = Field::try_from(&column_schema).unwrap();
+        assert_eq!("v1", field.metadata.get("k1").unwrap());
+        assert!(field
+            .metadata
+            .get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY)
+            .is_some());
+
+        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
+        assert_eq!(column_schema, new_column_schema);
+    }
+
+    #[test]
+    fn test_column_schema_with_duplicate_metadata() {
+        let mut metadata = Metadata::new();
+        metadata.insert(
+            ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(),
+            "v1".to_string(),
+        );
+        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
+            .with_metadata(metadata)
+            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
+            .unwrap();
+        Field::try_from(&column_schema).unwrap_err();
+    }
+
     #[test]
     fn test_column_schema_invalid_default_constraint() {
         ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
@@ -179,7 +179,7 @@ pub enum Error {
     #[snafu(display("Parquet file schema is invalid, source: {}", source))]
     InvalidParquetSchema {
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: MetadataError,
     },

     #[snafu(display("Region is under {} state, cannot proceed operation", state))]
@@ -223,7 +223,7 @@ pub enum Error {
     ConvertStoreSchema {
         file: String,
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: MetadataError,
     },

     #[snafu(display("Invalid raw region metadata, region: {}, source: {}", region, source))]
@@ -236,7 +236,7 @@ pub enum Error {
     #[snafu(display("Invalid projection, source: {}", source))]
     InvalidProjection {
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: MetadataError,
     },

     #[snafu(display("Failed to push data to batch builder, source: {}", source))]
@@ -295,6 +295,15 @@ pub enum Error {
         version: u32,
         backtrace: Backtrace,
     },
+
+    #[snafu(display(
+        "Failed to convert between ColumnSchema and ColumnMetadata, source: {}",
+        source
+    ))]
+    ConvertColumnSchema {
+        #[snafu(backtrace)]
+        source: MetadataError,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -345,9 +354,9 @@ impl ErrorExt for Error {
             | InvalidRegionState { .. }
             | ReadWal { .. } => StatusCode::StorageUnavailable,

-            InvalidAlterRequest { source, .. } | InvalidRegionDesc { source, .. } => {
-                source.status_code()
-            }
+            InvalidAlterRequest { source, .. }
+            | InvalidRegionDesc { source, .. }
+            | ConvertColumnSchema { source, .. } => source.status_code(),
             PushBatch { source, .. } => source.status_code(),
             AddDefault { source, .. } => source.status_code(),
         }
@@ -1,16 +1,19 @@
 use std::collections::{HashMap, HashSet};
+use std::num::ParseIntError;
+use std::str::FromStr;
 use std::sync::Arc;

 use common_error::prelude::*;
 use datatypes::data_type::ConcreteDataType;
+use datatypes::schema::{ColumnSchema, Metadata};
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt};
 use store_api::storage::{
     consts::{self, ReservedColumnId},
     AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder,
-    ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId,
-    RegionDescriptor, RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor,
-    RowKeyDescriptorBuilder, Schema, SchemaRef,
+    ColumnDescriptorBuilderError, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder,
+    ColumnFamilyId, ColumnId, RegionDescriptor, RegionDescriptorBuilder, RegionId, RegionMeta,
+    RowKeyDescriptor, RowKeyDescriptorBuilder, Schema, SchemaRef,
 };

 use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata};
@@ -18,6 +21,7 @@ use crate::schema::{RegionSchema, RegionSchemaRef};

 /// Error for handling metadata.
 #[derive(Debug, Snafu)]
+#[snafu(visibility(pub(crate)))]
 pub enum Error {
     #[snafu(display("Column name {} already exists", name))]
     ColNameExists { name: String, backtrace: Backtrace },
@@ -34,7 +38,7 @@ pub enum Error {
     #[snafu(display("Failed to build schema, source: {}", source))]
     InvalidSchema {
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: datatypes::error::Error,
     },

     #[snafu(display("Column name {} is reserved by the system", name))]
@@ -64,7 +68,63 @@ pub enum Error {

     #[snafu(display("Failed to drop column {} as it is an internal column", name))]
     DropInternalColumn { name: String },

+    // End of variants for validating `AlterRequest`.
+    #[snafu(display("Failed to convert to column schema, source: {}", source))]
+    ToColumnSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display(
+        "Failed to parse metadata to int, key_value: {}, source: {}",
+        key_value,
+        source
+    ))]
+    ParseMetaInt {
+        // Store key and value in one string to reduce the enum size.
+        key_value: String,
+        source: std::num::ParseIntError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Metadata of {} not found", key))]
+    MetaNotFound { key: String, backtrace: Backtrace },
+
+    #[snafu(display("Failed to build column descriptor, source: {}", source))]
+    BuildColumnDescriptor {
+        source: ColumnDescriptorBuilderError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to convert from arrow schema, source: {}", source))]
+    ConvertArrowSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Invalid internal column index in arrow schema"))]
+    InvalidIndex { backtrace: Backtrace },
+
+    #[snafu(display(
+        "Failed to convert arrow chunk to batch, name: {}, source: {}",
+        name,
+        source
+    ))]
+    ConvertChunk {
+        name: String,
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Failed to convert schema, source: {}", source))]
+    ConvertSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Invalid projection, {}", msg))]
+    InvalidProjection { msg: String, backtrace: Backtrace },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -285,8 +345,7 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {

     fn try_from(raw: RawRegionMetadata) -> Result<RegionMetadata> {
         let columns = Arc::new(ColumnsMetadata::from(raw.columns));
-        let schema =
-            Arc::new(RegionSchema::new(columns.clone(), raw.version).context(InvalidSchemaSnafu)?);
+        let schema = Arc::new(RegionSchema::new(columns.clone(), raw.version)?);

         Ok(RegionMetadata {
             id: raw.id,
@@ -299,6 +358,10 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
     }
 }

+const METADATA_CF_ID_KEY: &str = "greptime:storage:cf_id";
+const METADATA_COLUMN_ID_KEY: &str = "greptime:storage:column_id";
+const METADATA_COMMENT_KEY: &str = "greptime:storage:comment";
+
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct ColumnMetadata {
     pub cf_id: ColumnFamilyId,
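To make the encoding concrete: per to_metadata in the next hunk, the cf_id and comment entries are only written when they differ from their defaults, so a column with id 42 in the default column family and an empty comment carries a single entry. A hedged sketch of the expected map, with key names taken from the constants above:

    // Sketch: ColumnMetadata::to_metadata output for desc.id == 42,
    // cf_id == consts::DEFAULT_CF_ID and an empty comment. Entries for
    // cf_id and comment are skipped when they hold their default values.
    let mut expected = Metadata::new();
    expected.insert("greptime:storage:column_id".to_string(), "42".to_string());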
@@ -315,6 +378,67 @@ impl ColumnMetadata {
     pub fn name(&self) -> &str {
         &self.desc.name
     }
+
+    /// Convert `self` to [`ColumnSchema`] for building a [`StoreSchema`](crate::schema::StoreSchema). This
+    /// would store additional metadatas to the ColumnSchema.
+    pub fn to_column_schema(&self) -> Result<ColumnSchema> {
+        let desc = &self.desc;
+        ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable())
+            .with_metadata(self.to_metadata())
+            .with_default_constraint(desc.default_constraint().cloned())
+            .context(ToColumnSchemaSnafu)
+    }
+
+    /// Convert [`ColumnSchema`] in [`StoreSchema`](crate::schema::StoreSchema) to [`ColumnMetadata`].
+    pub fn from_column_schema(column_schema: &ColumnSchema) -> Result<ColumnMetadata> {
+        let metadata = column_schema.metadata();
+        let cf_id = try_parse_int(metadata, METADATA_CF_ID_KEY, Some(consts::DEFAULT_CF_ID))?;
+        let column_id = try_parse_int(metadata, METADATA_COLUMN_ID_KEY, None)?;
+        let comment = metadata
+            .get(METADATA_COMMENT_KEY)
+            .cloned()
+            .unwrap_or_default();
+
+        let desc = ColumnDescriptorBuilder::new(
+            column_id,
+            &column_schema.name,
+            column_schema.data_type.clone(),
+        )
+        .is_nullable(column_schema.is_nullable())
+        .default_constraint(column_schema.default_constraint().cloned())
+        .comment(comment)
+        .build()
+        .context(BuildColumnDescriptorSnafu)?;
+
+        Ok(ColumnMetadata { cf_id, desc })
+    }
+
+    fn to_metadata(&self) -> Metadata {
+        let mut metadata = Metadata::new();
+        if self.cf_id != consts::DEFAULT_CF_ID {
+            metadata.insert(METADATA_CF_ID_KEY.to_string(), self.cf_id.to_string());
+        }
+        metadata.insert(METADATA_COLUMN_ID_KEY.to_string(), self.desc.id.to_string());
+        if !self.desc.comment.is_empty() {
+            metadata.insert(METADATA_COMMENT_KEY.to_string(), self.desc.comment.clone());
+        }
+
+        metadata
+    }
 }
+
+fn try_parse_int<T>(metadata: &Metadata, key: &str, default_value: Option<T>) -> Result<T>
+where
+    T: FromStr<Err = ParseIntError>,
+{
+    if let Some(value) = metadata.get(key) {
+        return value.parse().with_context(|_| ParseMetaIntSnafu {
+            key_value: format!("{}={}", key, value),
+        });
+    }
+    // No such key in metadata.
+    default_value.context(MetaNotFoundSnafu { key })
+}

 #[derive(Clone, Debug, PartialEq)]
@@ -361,8 +485,9 @@ impl ColumnsMetadata {
         self.columns.iter().take(self.user_column_end)
     }

-    pub fn iter_all_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
-        self.columns.iter()
+    #[inline]
+    pub fn columns(&self) -> &[ColumnMetadata] {
+        &self.columns
     }

     #[inline]
@@ -710,8 +835,7 @@ impl RegionMetadataBuilder {

     fn build(self) -> Result<RegionMetadata> {
         let columns = Arc::new(self.columns_meta_builder.build()?);
-        let schema =
-            Arc::new(RegionSchema::new(columns.clone(), self.version).context(InvalidSchemaSnafu)?);
+        let schema = Arc::new(RegionSchema::new(columns.clone(), self.version)?);

         Ok(RegionMetadata {
             id: self.id,
@@ -765,10 +889,11 @@ fn is_internal_value_column(column_name: &str) -> bool {
     )
 }

 // TODO(yingwen): Add tests for using invalid row_key/cf to build metadata.
 #[cfg(test)]
 mod tests {
+    use datatypes::schema::ColumnDefaultConstraint;
     use datatypes::type_id::LogicalTypeId;
     use datatypes::value::Value;
     use store_api::storage::{
         AddColumn, AlterOperation, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder,
         RowKeyDescriptorBuilder,
@@ -1231,4 +1356,27 @@ mod tests {
         };
         metadata.validate_alter(&req).unwrap();
     }
+
+    #[test]
+    fn test_column_metadata_conversion() {
+        let desc = ColumnDescriptorBuilder::new(123, "test", ConcreteDataType::int32_datatype())
+            .is_nullable(false)
+            .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(321))))
+            .comment("hello")
+            .build()
+            .unwrap();
+
+        let meta = ColumnMetadata {
+            cf_id: consts::DEFAULT_CF_ID,
+            desc: desc.clone(),
+        };
+        let column_schema = meta.to_column_schema().unwrap();
+        let new_meta = ColumnMetadata::from_column_schema(&column_schema).unwrap();
+        assert_eq!(meta, new_meta);
+
+        let meta = ColumnMetadata { cf_id: 567, desc };
+        let column_schema = meta.to_column_schema().unwrap();
+        let new_meta = ColumnMetadata::from_column_schema(&column_schema).unwrap();
+        assert_eq!(meta, new_meta);
+    }
 }
@@ -3,71 +3,9 @@ mod projected;
 mod region;
 mod store;

-use common_error::prelude::*;
-
 pub use crate::schema::projected::{ProjectedSchema, ProjectedSchemaRef};
 pub use crate::schema::region::{RegionSchema, RegionSchemaRef};
-pub use crate::schema::store::StoreSchema;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Failed to build schema, source: {}", source))]
-    BuildSchema {
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Failed to convert from arrow schema, source: {}", source))]
-    ConvertArrowSchema {
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Invalid internal column index in arrow schema"))]
-    InvalidIndex { backtrace: Backtrace },
-
-    #[snafu(display("Missing metadata {} in arrow schema", key))]
-    MissingMeta { key: String, backtrace: Backtrace },
-
-    #[snafu(display("Missing column {} in arrow schema", column))]
-    MissingColumn {
-        column: String,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Failed to parse index in schema meta, value: {}, source: {}",
-        value,
-        source
-    ))]
-    ParseIndex {
-        value: String,
-        source: std::num::ParseIntError,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Failed to convert arrow chunk to batch, name: {}, source: {}",
-        name,
-        source
-    ))]
-    ConvertChunk {
-        name: String,
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Failed to convert schema, source: {}", source))]
-    ConvertSchema {
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Invalid projection, {}", msg))]
-    InvalidProjection { msg: String, backtrace: Backtrace },
-}
-
-pub type Result<T> = std::result::Result<T, Error>;
+pub use crate::schema::store::{StoreSchema, StoreSchemaRef};

 #[cfg(test)]
 mod tests {
@@ -4,13 +4,14 @@ use std::sync::Arc;

 use common_error::prelude::*;
 use datatypes::arrow::bitmap::MutableBitmap;
-use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
+use datatypes::schema::{SchemaBuilder, SchemaRef};
 use datatypes::vectors::{BooleanVector, VectorRef};
 use store_api::storage::{Chunk, ColumnId};

 use crate::error;
+use crate::metadata::{self, Result};
 use crate::read::{Batch, BatchOp};
-use crate::schema::{self, RegionSchema, RegionSchemaRef, Result, StoreSchema};
+use crate::schema::{RegionSchema, RegionSchemaRef, StoreSchema, StoreSchemaRef};

 /// Metadata about projection.
 #[derive(Debug, Default)]
@@ -91,7 +92,7 @@ pub struct ProjectedSchema {
     /// Projection info, `None` means don't need to do projection.
     projection: Option<Projection>,
     /// Schema used to read from data sources.
-    schema_to_read: StoreSchema,
+    schema_to_read: StoreSchemaRef,
     /// User schema after projection.
     projected_user_schema: SchemaRef,
 }
@@ -145,7 +146,7 @@ impl ProjectedSchema {
     }

     #[inline]
-    pub fn schema_to_read(&self) -> &StoreSchema {
+    pub fn schema_to_read(&self) -> &StoreSchemaRef {
         &self.schema_to_read
     }

@@ -216,21 +217,25 @@ impl ProjectedSchema {
     fn build_schema_to_read(
         region_schema: &RegionSchema,
         projection: &Projection,
-    ) -> Result<StoreSchema> {
-        let column_schemas: Vec<_> = projection
+    ) -> Result<StoreSchemaRef> {
+        // Reorder columns according to the projection.
+        let columns: Vec<_> = projection
             .columns_to_read
             .iter()
-            .map(|col_idx| ColumnSchema::from(&region_schema.column_metadata(*col_idx).desc))
+            .map(|col_idx| region_schema.column_metadata(*col_idx))
+            .cloned()
             .collect();
         // All row key columns are reserved in this schema, so we can use the row_key_end
         // and timestamp_key_index from region schema.
-        StoreSchema::new(
-            column_schemas,
+        let store_schema = StoreSchema::new(
+            columns,
             region_schema.version(),
             region_schema.timestamp_key_index(),
             region_schema.row_key_end(),
             projection.num_user_columns,
-        )
+        )?;
+
+        Ok(Arc::new(store_schema))
     }

     fn build_projected_user_schema(
@@ -252,17 +257,22 @@ impl ProjectedSchema {
         let column_schemas: Vec<_> = projection
             .projected_columns
             .iter()
-            .map(|col_idx| ColumnSchema::from(&region_schema.column_metadata(*col_idx).desc))
+            .map(|col_idx| {
+                region_schema
+                    .column_metadata(*col_idx)
+                    .desc
+                    .to_column_schema()
+            })
             .collect();

         let mut builder = SchemaBuilder::try_from(column_schemas)
-            .context(schema::ConvertSchemaSnafu)?
+            .context(metadata::ConvertSchemaSnafu)?
             .version(region_schema.version());
         if let Some(timestamp_index) = timestamp_index {
             builder = builder.timestamp_index(timestamp_index);
         }

-        let schema = builder.build().context(schema::BuildSchemaSnafu)?;
+        let schema = builder.build().context(metadata::InvalidSchemaSnafu)?;

         Ok(Arc::new(schema))
     }
@@ -272,7 +282,7 @@ impl ProjectedSchema {
         // should be always read, and the `StoreSchema` also requires the timestamp column.
         ensure!(
             !indices.is_empty(),
-            schema::InvalidProjectionSnafu {
+            metadata::InvalidProjectionSnafu {
                 msg: "at least one column should be read",
             }
         );
@@ -282,7 +292,7 @@ impl ProjectedSchema {
         for i in indices {
             ensure!(
                 *i < user_schema.num_columns(),
-                schema::InvalidProjectionSnafu {
+                metadata::InvalidProjectionSnafu {
                     msg: format!(
                         "index {} out of bound, only contains {} columns",
                         i,
@@ -363,7 +373,8 @@ mod tests {
     use store_api::storage::OpType;

     use super::*;
-    use crate::schema::{tests, Error};
+    use crate::metadata::Error;
+    use crate::schema::tests;
     use crate::test_util::{read_util, schema_util};

     #[test]
@@ -428,7 +439,8 @@ mod tests {

         // Test is_needed
         let needed: Vec<_> = region_schema
-            .all_columns()
+            .columns()
+            .iter()
             .enumerate()
             .filter_map(|(idx, column_meta)| {
                 if projected_schema.is_needed(column_meta.id()) {
@@ -491,7 +503,7 @@ mod tests {
             projected_schema.schema_to_read()
         );

-        for column in region_schema.all_columns() {
+        for column in region_schema.columns() {
             assert!(projected_schema.is_needed(column.id()));
         }

@@ -1,10 +1,10 @@
 use std::sync::Arc;

 use common_error::prelude::*;
-use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
+use datatypes::schema::{Schema, SchemaBuilder, SchemaRef};

-use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef};
-use crate::schema::{self, Result, StoreSchema};
+use crate::metadata::{self, ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef, Result};
+use crate::schema::{StoreSchema, StoreSchemaRef};

 /// Schema of region.
 ///
@@ -29,7 +29,7 @@ pub struct RegionSchema {
     /// is correct.
     user_schema: SchemaRef,
     /// store schema contains all columns of the region, including all internal columns.
-    store_schema: StoreSchema,
+    store_schema: StoreSchemaRef,
     /// Metadata of columns.
     columns: ColumnsMetadataRef,
 }
@@ -37,7 +37,7 @@ pub struct RegionSchema {
 impl RegionSchema {
     pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result<RegionSchema> {
         let user_schema = Arc::new(build_user_schema(&columns, version)?);
-        let store_schema = StoreSchema::from_columns_metadata(&columns, version)?;
+        let store_schema = Arc::new(StoreSchema::from_columns_metadata(&columns, version)?);

         debug_assert_eq!(user_schema.version(), store_schema.version());
         debug_assert_eq!(version, user_schema.version());
@@ -58,7 +58,7 @@ impl RegionSchema {

     /// Returns the schema actually stores, which would also contains all internal columns.
     #[inline]
-    pub fn store_schema(&self) -> &StoreSchema {
+    pub fn store_schema(&self) -> &StoreSchemaRef {
         &self.store_schema
     }

@@ -118,8 +118,8 @@ impl RegionSchema {
     }

     #[cfg(test)]
-    pub(crate) fn all_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
-        self.columns.iter_all_columns()
+    pub(crate) fn columns(&self) -> &[ColumnMetadata] {
+        self.columns.columns()
     }
 }

@@ -129,15 +129,15 @@ pub type RegionSchemaRef = Arc<RegionSchema>;
 fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema> {
     let column_schemas: Vec<_> = columns
         .iter_user_columns()
-        .map(|col| ColumnSchema::from(&col.desc))
+        .map(|col| col.desc.to_column_schema())
         .collect();

     SchemaBuilder::try_from(column_schemas)
-        .context(schema::ConvertSchemaSnafu)?
+        .context(metadata::ConvertSchemaSnafu)?
         .timestamp_index(columns.timestamp_key_index())
         .version(version)
         .build()
-        .context(schema::BuildSchemaSnafu)
+        .context(metadata::InvalidSchemaSnafu)
 }

 #[cfg(test)]
@@ -4,29 +4,30 @@ use common_error::prelude::*;
 use datatypes::arrow::array::Array;
 use datatypes::arrow::chunk::Chunk as ArrowChunk;
 use datatypes::arrow::datatypes::Schema as ArrowSchema;
-use datatypes::schema::{ColumnSchema, Metadata, Schema, SchemaBuilder, SchemaRef};
+use datatypes::schema::{Metadata, Schema, SchemaBuilder, SchemaRef};
 use datatypes::vectors::Helper;
 use store_api::storage::consts;

-use crate::metadata::ColumnsMetadata;
+use crate::metadata::{self, ColumnMetadata, ColumnsMetadata, Error, Result};
 use crate::read::Batch;
-use crate::schema::{self, Error, Result};

 const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end";
 const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end";

-/// Schema for storage engine.
+/// Schema that contains storage engine specific metadata, such as internal columns.
 ///
-/// Used internally, contains all row key columns, internal columns and parts of value columns.
-///
-/// Only contains a reference to schema and some indices, so it should be cheap to clone.
-#[derive(Debug, Clone, PartialEq)]
+/// Used internally, contains all row key columns, internal columns and a sub set of
+/// value columns in a region. The columns are organized in `key, value, internal` order.
+#[derive(Debug, PartialEq)]
 pub struct StoreSchema {
+    columns: Vec<ColumnMetadata>,
     schema: SchemaRef,
     row_key_end: usize,
     user_column_end: usize,
 }

+pub type StoreSchemaRef = Arc<StoreSchema>;
+
 impl StoreSchema {
     #[inline]
     pub fn version(&self) -> u32 {
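Since StoreSchema now owns a Vec<ColumnMetadata>, the commit drops Clone from its derive list and introduces the StoreSchemaRef alias for sharing. A minimal sketch of the intended pattern, assuming a store_schema built as in the following hunks:

    // Sharing goes through the Arc alias; cloning the Arc only bumps a
    // refcount, with no deep copy of the column metadata vector.
    let shared: StoreSchemaRef = Arc::new(store_schema);
    let another = shared.clone();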
@@ -56,7 +57,7 @@ impl StoreSchema {
             .iter()
             .enumerate()
             .map(|(i, column)| {
-                Helper::try_into_vector(column.clone()).context(schema::ConvertChunkSnafu {
+                Helper::try_into_vector(column.clone()).context(metadata::ConvertChunkSnafu {
                     name: self.column_name(i),
                 })
             })
@@ -87,13 +88,8 @@ impl StoreSchema {
         columns: &ColumnsMetadata,
         version: u32,
     ) -> Result<StoreSchema> {
-        let column_schemas: Vec<_> = columns
-            .iter_all_columns()
-            .map(|col| ColumnSchema::from(&col.desc))
-            .collect();
-
         StoreSchema::new(
-            column_schemas,
+            columns.columns().to_vec(),
             version,
             columns.timestamp_key_index(),
             columns.row_key_end(),
@@ -102,20 +98,25 @@ impl StoreSchema {
     }

     pub(crate) fn new(
-        column_schemas: Vec<ColumnSchema>,
+        columns: Vec<ColumnMetadata>,
         version: u32,
         timestamp_key_index: usize,
         row_key_end: usize,
         user_column_end: usize,
     ) -> Result<StoreSchema> {
+        let column_schemas = columns
+            .iter()
+            .map(|meta| meta.to_column_schema())
+            .collect::<Result<Vec<_>>>()?;
+
         let schema = SchemaBuilder::try_from(column_schemas)
-            .context(schema::ConvertSchemaSnafu)?
+            .context(metadata::ConvertSchemaSnafu)?
             .timestamp_index(timestamp_key_index)
             .version(version)
             .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string())
             .add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string())
             .build()
-            .context(schema::BuildSchemaSnafu)?;
+            .context(metadata::InvalidSchemaSnafu)?;

         assert_eq!(
             consts::SEQUENCE_COLUMN_NAME,
@@ -127,6 +128,7 @@ impl StoreSchema {
         );

         Ok(StoreSchema {
+            columns,
             schema: Arc::new(schema),
             row_key_end,
             user_column_end,
@@ -163,7 +165,7 @@ impl TryFrom<ArrowSchema> for StoreSchema {
     type Error = Error;

     fn try_from(arrow_schema: ArrowSchema) -> Result<StoreSchema> {
-        let schema = Schema::try_from(arrow_schema).context(schema::ConvertArrowSchemaSnafu)?;
+        let schema = Schema::try_from(arrow_schema).context(metadata::ConvertArrowSchemaSnafu)?;
         // Recover other metadata from schema.
         let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?;
         let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?;
@@ -171,14 +173,22 @@ impl TryFrom<ArrowSchema> for StoreSchema {
         // There should be sequence and op_type columns.
         ensure!(
             consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name,
-            schema::InvalidIndexSnafu
+            metadata::InvalidIndexSnafu
         );
         ensure!(
             consts::OP_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name,
-            schema::InvalidIndexSnafu
+            metadata::InvalidIndexSnafu
         );

+        // Recover ColumnMetadata from schema.
+        let columns = schema
+            .column_schemas()
+            .iter()
+            .map(ColumnMetadata::from_column_schema)
+            .collect::<Result<_>>()?;
+
         Ok(StoreSchema {
+            columns,
             schema: Arc::new(schema),
             row_key_end,
             user_column_end,
@@ -189,21 +199,20 @@ impl TryFrom<ArrowSchema> for StoreSchema {
 fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result<usize> {
     let value = metadata
         .get(key)
-        .context(schema::MissingMetaSnafu { key })?;
-    value.parse().context(schema::ParseIndexSnafu { value })
+        .context(metadata::MetaNotFoundSnafu { key })?;
+    value.parse().with_context(|_| metadata::ParseMetaIntSnafu {
+        key_value: format!("{}={}", key, value),
+    })
 }

 #[cfg(test)]
 mod tests {
     use datatypes::arrow::array::Array;
     use datatypes::arrow::chunk::Chunk as ArrowChunk;
     use datatypes::type_id::LogicalTypeId;
     use store_api::storage::consts;

     use super::*;
     use crate::read::Batch;
     use crate::schema::tests;
     use crate::test_util::schema_util;

     fn check_chunk_batch(chunk: &ArrowChunk<Arc<dyn Array>>, batch: &Batch) {
         assert_eq!(5, chunk.columns().len());
@@ -224,22 +233,24 @@ mod tests {
         let sst_arrow_schema = store_schema.arrow_schema();
         let converted_store_schema = StoreSchema::try_from((**sst_arrow_schema).clone()).unwrap();

-        assert_eq!(*store_schema, converted_store_schema);
+        assert_eq!(**store_schema, converted_store_schema);

-        let expect_schema = schema_util::new_schema_with_version(
-            &[
-                ("k0", LogicalTypeId::Int64, false),
-                ("timestamp", LogicalTypeId::Timestamp, false),
-                ("v0", LogicalTypeId::Int64, true),
-                (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false),
-                (consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false),
-            ],
-            Some(1),
-            123,
-        );
+        let column_schemas: Vec<_> = region_schema
+            .columns()
+            .iter()
+            .map(|meta| meta.to_column_schema().unwrap())
+            .collect();
+        let expect_schema = SchemaBuilder::try_from(column_schemas)
+            .unwrap()
+            .version(123)
+            .timestamp_index(1)
+            .build()
+            .unwrap();
+        // Only compare column schemas since SchemaRef in StoreSchema also contains other metadata that only used
+        // by StoreSchema.
         assert_eq!(
             expect_schema.column_schemas(),
-            store_schema.schema().column_schemas()
+            store_schema.schema().column_schemas(),
         );
         assert_eq!(3, store_schema.sequence_index());
         assert_eq!(4, store_schema.op_type_index());
@@ -39,6 +39,14 @@ impl ColumnDescriptor {
     pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
         self.default_constraint.as_ref()
    }
+
+    /// Convert [ColumnDescriptor] to [ColumnSchema]. Fields not in ColumnSchema **will not**
+    /// be stored as metadata.
+    pub fn to_column_schema(&self) -> ColumnSchema {
+        ColumnSchema::new(&self.name, self.data_type.clone(), self.is_nullable)
+            .with_default_constraint(self.default_constraint.clone())
+            .expect("ColumnDescriptor should validate default constraint")
+    }
 }

 impl ColumnDescriptorBuilder {
@@ -74,14 +82,6 @@ impl ColumnDescriptorBuilder {
     }
 }

-impl From<&ColumnDescriptor> for ColumnSchema {
-    fn from(desc: &ColumnDescriptor) -> ColumnSchema {
-        ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable)
-            .with_default_constraint(desc.default_constraint.clone())
-            .expect("ColumnDescriptor should validate default constraint")
-    }
-}
-
 /// A [RowKeyDescriptor] contains information about row key.
 #[derive(Debug, Clone, PartialEq, Builder)]
 #[builder(pattern = "owned")]
@@ -206,6 +206,22 @@ mod tests {
             .unwrap_err();
     }

+    #[test]
+    fn test_descriptor_to_column_schema() {
+        let constraint = ColumnDefaultConstraint::Value(Value::Int32(123));
+        let desc = new_column_desc_builder()
+            .default_constraint(Some(constraint.clone()))
+            .is_nullable(false)
+            .build()
+            .unwrap();
+        let column_schema = desc.to_column_schema();
+        let expected = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
+            .with_default_constraint(Some(constraint))
+            .unwrap();
+
+        assert_eq!(expected, column_schema);
+    }
+
     fn new_timestamp_desc() -> ColumnDescriptor {
         ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype())
             .build()