mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 16:10:02 +00:00
refactor: Serialize RawSchema/RawTableMeta/RawTableInfo (#382)
* refactor: Serialize Schema/TableMeta/TableInfo to raw structs * test: Add tests for raw struct conversion * style: Fix clippy * refactor: SchemaBuilder::timestamp_index takes Option<usize> So caller could chain the timestamp_index method call where there is no timestamp index. * style(datatypes): Chains SchemaBuilder method calls
This commit is contained in:
@@ -171,7 +171,7 @@ fn build_system_catalog_schema() -> Schema {
|
||||
// The schema of this table must be valid.
|
||||
SchemaBuilder::try_from(cols)
|
||||
.unwrap()
|
||||
.timestamp_index(2)
|
||||
.timestamp_index(Some(2))
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::{TableId, TableMeta, TableVersion};
|
||||
use table::metadata::{RawTableMeta, TableId, TableVersion};
|
||||
|
||||
use crate::consts::{CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_KEY_PREFIX};
|
||||
use crate::error::{
|
||||
@@ -101,7 +101,7 @@ impl TableKey {
|
||||
pub struct TableValue {
|
||||
pub id: TableId,
|
||||
pub node_id: String,
|
||||
pub meta: TableMeta,
|
||||
pub meta: RawTableMeta,
|
||||
}
|
||||
|
||||
impl TableValue {
|
||||
@@ -204,10 +204,8 @@ impl SchemaValue {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -254,14 +252,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_serialize_schema() {
|
||||
let schema_ref = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
let schema = Schema::new(vec![ColumnSchema::new(
|
||||
"name",
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
)]));
|
||||
)]);
|
||||
|
||||
let meta = TableMeta {
|
||||
schema: schema_ref,
|
||||
let meta = RawTableMeta {
|
||||
schema: RawSchema::from(&schema),
|
||||
engine: "mito".to_string(),
|
||||
created_on: chrono::DateTime::default(),
|
||||
primary_key_indices: vec![0, 1],
|
||||
|
||||
@@ -145,7 +145,7 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
|
||||
Ok(Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.context(error::CreateSchemaSnafu)?
|
||||
.timestamp_index(ts_index)
|
||||
.timestamp_index(Some(ts_index))
|
||||
.build()
|
||||
.context(error::CreateSchemaSnafu)?,
|
||||
))
|
||||
@@ -314,7 +314,7 @@ mod tests {
|
||||
Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(1)
|
||||
.timestamp_index(Some(1))
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
|
||||
@@ -153,7 +153,7 @@ pub fn build_create_table_request(
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(timestamp_index)
|
||||
.timestamp_index(Some(timestamp_index))
|
||||
.build()
|
||||
.context(error::CreateSchemaSnafu)?,
|
||||
);
|
||||
@@ -430,7 +430,7 @@ mod tests {
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::try_from(columns)
|
||||
.unwrap()
|
||||
.timestamp_index(1)
|
||||
.timestamp_index(Some(1))
|
||||
.build()
|
||||
.unwrap(),
|
||||
);
|
||||
@@ -537,7 +537,7 @@ mod tests {
|
||||
Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(3)
|
||||
.timestamp_index(Some(3))
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
|
||||
@@ -133,7 +133,7 @@ mod tests {
|
||||
Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(3)
|
||||
.timestamp_index(Some(3))
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
|
||||
@@ -143,7 +143,7 @@ impl SqlHandler {
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::try_from(columns_schemas)
|
||||
.context(CreateSchemaSnafu)?
|
||||
.timestamp_index(ts_index)
|
||||
.timestamp_index(Some(ts_index))
|
||||
.build()
|
||||
.context(CreateSchemaSnafu)?,
|
||||
);
|
||||
|
||||
@@ -65,7 +65,7 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) -
|
||||
schema: Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(3)
|
||||
.timestamp_index(Some(3))
|
||||
.build()
|
||||
.expect("ts is expected to be timestamp column"),
|
||||
),
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
mod constraint;
|
||||
mod raw;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
@@ -11,6 +12,7 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use crate::data_type::{ConcreteDataType, DataType};
|
||||
use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu};
|
||||
pub use crate::schema::constraint::ColumnDefaultConstraint;
|
||||
pub use crate::schema::raw::RawSchema;
|
||||
use crate::vectors::VectorRef;
|
||||
|
||||
/// Key used to store column name of the timestamp column in metadata.
|
||||
@@ -104,7 +106,7 @@ impl ColumnSchema {
|
||||
}
|
||||
|
||||
/// A common schema, should be immutable.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct Schema {
|
||||
column_schemas: Vec<ColumnSchema>,
|
||||
name_to_index: HashMap<String, usize>,
|
||||
@@ -125,19 +127,20 @@ impl Schema {
|
||||
pub const INITIAL_VERSION: u32 = 0;
|
||||
|
||||
/// Create a schema from a vector of [ColumnSchema].
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics when ColumnSchema's `default_constrait` can't be serialized into json.
|
||||
pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
|
||||
// Builder won't fail
|
||||
// Builder won't fail in this case
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Try to Create a schema from a vector of [ColumnSchema].
|
||||
pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
|
||||
// Builder won't fail
|
||||
Ok(SchemaBuilder::try_from(column_schemas)?.build().unwrap())
|
||||
SchemaBuilder::try_from(column_schemas)?.build()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -239,8 +242,8 @@ impl SchemaBuilder {
|
||||
/// Set timestamp index.
|
||||
///
|
||||
/// The validation of timestamp column is done in `build()`.
|
||||
pub fn timestamp_index(mut self, timestamp_index: usize) -> Self {
|
||||
self.timestamp_index = Some(timestamp_index);
|
||||
pub fn timestamp_index(mut self, timestamp_index: Option<usize>) -> Self {
|
||||
self.timestamp_index = timestamp_index;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -547,7 +550,10 @@ mod tests {
|
||||
assert_eq!(0, schema.num_columns());
|
||||
assert!(schema.is_empty());
|
||||
|
||||
assert!(SchemaBuilder::default().timestamp_index(0).build().is_err());
|
||||
assert!(SchemaBuilder::default()
|
||||
.timestamp_index(Some(0))
|
||||
.build()
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -600,7 +606,7 @@ mod tests {
|
||||
];
|
||||
let schema = SchemaBuilder::try_from(column_schemas.clone())
|
||||
.unwrap()
|
||||
.timestamp_index(1)
|
||||
.timestamp_index(Some(1))
|
||||
.version(123)
|
||||
.build()
|
||||
.unwrap();
|
||||
@@ -622,17 +628,17 @@ mod tests {
|
||||
];
|
||||
assert!(SchemaBuilder::try_from(column_schemas.clone())
|
||||
.unwrap()
|
||||
.timestamp_index(0)
|
||||
.timestamp_index(Some(0))
|
||||
.build()
|
||||
.is_err());
|
||||
assert!(SchemaBuilder::try_from(column_schemas.clone())
|
||||
.unwrap()
|
||||
.timestamp_index(1)
|
||||
.timestamp_index(Some(1))
|
||||
.build()
|
||||
.is_err());
|
||||
assert!(SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(1)
|
||||
.timestamp_index(Some(1))
|
||||
.build()
|
||||
.is_err());
|
||||
}
|
||||
|
||||
60
src/datatypes/src/schema/raw.rs
Normal file
60
src/datatypes/src/schema/raw.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::schema::{ColumnSchema, Schema, SchemaBuilder};
|
||||
|
||||
/// Struct used to serialize and deserialize [`Schema`](crate::schema::Schema).
|
||||
///
|
||||
/// This struct only contains necessary data to recover the Schema.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct RawSchema {
|
||||
pub column_schemas: Vec<ColumnSchema>,
|
||||
pub timestamp_index: Option<usize>,
|
||||
pub version: u32,
|
||||
}
|
||||
|
||||
impl TryFrom<RawSchema> for Schema {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(raw: RawSchema) -> Result<Schema> {
|
||||
SchemaBuilder::try_from(raw.column_schemas)?
|
||||
.timestamp_index(raw.timestamp_index)
|
||||
.version(raw.version)
|
||||
.build()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Schema> for RawSchema {
|
||||
fn from(schema: &Schema) -> RawSchema {
|
||||
RawSchema {
|
||||
column_schemas: schema.column_schemas.clone(),
|
||||
timestamp_index: schema.timestamp_index,
|
||||
version: schema.version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::data_type::ConcreteDataType;
|
||||
|
||||
#[test]
|
||||
fn test_raw_convert() {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
|
||||
];
|
||||
let schema = SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(Some(1))
|
||||
.version(123)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let raw = RawSchema::from(&schema);
|
||||
let schema_new = Schema::try_from(raw).unwrap();
|
||||
|
||||
assert_eq!(schema, schema_new);
|
||||
}
|
||||
}
|
||||
@@ -217,7 +217,7 @@ fn build_scripts_schema() -> Schema {
|
||||
// Schema is always valid here
|
||||
SchemaBuilder::try_from(cols)
|
||||
.unwrap()
|
||||
.timestamp_index(3)
|
||||
.timestamp_index(Some(3))
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
|
||||
|
||||
pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> Schema {
|
||||
let column_schemas = column_defs
|
||||
let column_schemas: Vec<_> = column_defs
|
||||
.iter()
|
||||
.map(|column_def| {
|
||||
let datatype = column_def.1.data_type();
|
||||
@@ -15,15 +15,11 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) ->
|
||||
})
|
||||
.collect();
|
||||
|
||||
if let Some(index) = timestamp_index {
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(index)
|
||||
.build()
|
||||
.unwrap()
|
||||
} else {
|
||||
Schema::new(column_schemas)
|
||||
}
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(timestamp_index)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> SchemaRef {
|
||||
|
||||
@@ -78,16 +78,14 @@ impl TryFrom<Schema> for schema::SchemaRef {
|
||||
.map(schema::ColumnSchema::try_from)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let schema: schema::SchemaRef = match schema.timestamp_index {
|
||||
Some(index) => Arc::new(
|
||||
schema::SchemaBuilder::try_from(column_schemas)
|
||||
.context(ConvertSchemaSnafu)?
|
||||
.timestamp_index(index.value as usize)
|
||||
.build()
|
||||
.context(ConvertSchemaSnafu)?,
|
||||
),
|
||||
None => Arc::new(schema::Schema::try_new(column_schemas).context(ConvertSchemaSnafu)?),
|
||||
};
|
||||
let timestamp_index = schema.timestamp_index.map(|index| index.value as usize);
|
||||
let schema = Arc::new(
|
||||
schema::SchemaBuilder::try_from(column_schemas)
|
||||
.context(ConvertSchemaSnafu)?
|
||||
.timestamp_index(timestamp_index)
|
||||
.build()
|
||||
.context(ConvertSchemaSnafu)?,
|
||||
);
|
||||
|
||||
Ok(schema)
|
||||
}
|
||||
|
||||
@@ -235,14 +235,12 @@ impl ProjectedSchema {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut builder = SchemaBuilder::try_from(column_schemas)
|
||||
let schema = SchemaBuilder::try_from(column_schemas)
|
||||
.context(metadata::ConvertSchemaSnafu)?
|
||||
.version(region_schema.version());
|
||||
if let Some(timestamp_index) = timestamp_index {
|
||||
builder = builder.timestamp_index(timestamp_index);
|
||||
}
|
||||
|
||||
let schema = builder.build().context(metadata::InvalidSchemaSnafu)?;
|
||||
.timestamp_index(timestamp_index)
|
||||
.version(region_schema.version())
|
||||
.build()
|
||||
.context(metadata::InvalidSchemaSnafu)?;
|
||||
|
||||
Ok(Arc::new(schema))
|
||||
}
|
||||
|
||||
@@ -134,7 +134,7 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema>
|
||||
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.context(metadata::ConvertSchemaSnafu)?
|
||||
.timestamp_index(columns.timestamp_key_index())
|
||||
.timestamp_index(Some(columns.timestamp_key_index()))
|
||||
.version(version)
|
||||
.build()
|
||||
.context(metadata::InvalidSchemaSnafu)
|
||||
|
||||
@@ -94,7 +94,7 @@ impl StoreSchema {
|
||||
|
||||
let schema = SchemaBuilder::try_from(column_schemas)
|
||||
.context(metadata::ConvertSchemaSnafu)?
|
||||
.timestamp_index(timestamp_key_index)
|
||||
.timestamp_index(Some(timestamp_key_index))
|
||||
.version(version)
|
||||
.add_metadata(ROW_KEY_END_KEY, row_key_end.to_string())
|
||||
.add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string())
|
||||
@@ -252,7 +252,7 @@ mod tests {
|
||||
let expect_schema = SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.version(123)
|
||||
.timestamp_index(1)
|
||||
.timestamp_index(Some(1))
|
||||
.build()
|
||||
.unwrap();
|
||||
// Only compare column schemas since SchemaRef in StoreSchema also contains other metadata that only used
|
||||
|
||||
@@ -23,13 +23,12 @@ pub fn new_schema_with_version(
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut builder = SchemaBuilder::try_from(column_schemas)
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(timestamp_index)
|
||||
.version(version)
|
||||
.build()
|
||||
.unwrap()
|
||||
.version(version);
|
||||
if let Some(index) = timestamp_index {
|
||||
builder = builder.timestamp_index(index);
|
||||
}
|
||||
builder.build().unwrap()
|
||||
}
|
||||
|
||||
pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> SchemaRef {
|
||||
|
||||
@@ -109,7 +109,7 @@ mod tests {
|
||||
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(1)
|
||||
.timestamp_index(Some(1))
|
||||
}
|
||||
|
||||
fn new_test_schema(v0_constraint: Option<Option<ColumnDefaultConstraint>>) -> SchemaRef {
|
||||
|
||||
@@ -464,7 +464,7 @@ mod tests {
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(2)
|
||||
.timestamp_index(Some(2))
|
||||
.build()
|
||||
.expect("ts must be timestamp column"),
|
||||
);
|
||||
|
||||
@@ -178,6 +178,15 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to convert metadata from deserialized data, source: {}",
|
||||
source
|
||||
))]
|
||||
ConvertRaw {
|
||||
#[snafu(backtrace)]
|
||||
source: table::metadata::ConvertError,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<Error> for table::error::Error {
|
||||
@@ -215,7 +224,7 @@ impl ErrorExt for Error {
|
||||
|
||||
ColumnsNotExist { .. } => StatusCode::TableColumnNotFound,
|
||||
|
||||
TableInfoNotFound { .. } => StatusCode::Unexpected,
|
||||
TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected,
|
||||
|
||||
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ mod tests {
|
||||
use storage::manifest::MetaActionIteratorImpl;
|
||||
use store_api::manifest::action::ProtocolAction;
|
||||
use store_api::manifest::{Manifest, MetaActionIterator};
|
||||
use table::metadata::TableInfo;
|
||||
use table::metadata::{RawTableInfo, TableInfo};
|
||||
|
||||
use super::*;
|
||||
use crate::manifest::action::{TableChange, TableMetaAction, TableRemove};
|
||||
@@ -32,7 +32,7 @@ mod tests {
|
||||
matches!(&action_list.actions[0], TableMetaAction::Protocol(p) if *p == *protocol)
|
||||
);
|
||||
assert!(
|
||||
matches!(&action_list.actions[1], TableMetaAction::Change(c) if c.table_info == *table_info)
|
||||
matches!(&action_list.actions[1], TableMetaAction::Change(c) if TableInfo::try_from(c.table_info.clone()).unwrap() == *table_info)
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
@@ -52,7 +52,7 @@ mod tests {
|
||||
let table_info = test_util::build_test_table_info();
|
||||
let action_list =
|
||||
TableMetaActionList::new(vec![TableMetaAction::Change(Box::new(TableChange {
|
||||
table_info: table_info.clone(),
|
||||
table_info: RawTableInfo::from(table_info.clone()),
|
||||
}))]);
|
||||
|
||||
assert_eq!(0, manifest.update(action_list).await.unwrap());
|
||||
|
||||
@@ -11,11 +11,11 @@ use storage::manifest::helper;
|
||||
use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader};
|
||||
use store_api::manifest::ManifestVersion;
|
||||
use store_api::manifest::MetaAction;
|
||||
use table::metadata::{TableIdent, TableInfo};
|
||||
use table::metadata::{RawTableInfo, TableIdent};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct TableChange {
|
||||
pub table_info: TableInfo,
|
||||
pub table_info: RawTableInfo,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
|
||||
@@ -137,7 +137,7 @@ mod tests {
|
||||
let mut protocol = ProtocolAction::new();
|
||||
protocol.min_reader_version = 1;
|
||||
|
||||
let table_info = test_util::build_test_table_info();
|
||||
let table_info = RawTableInfo::from(test_util::build_test_table_info());
|
||||
|
||||
let mut action_list = TableMetaActionList::new(vec![
|
||||
TableMetaAction::Protocol(protocol.clone()),
|
||||
|
||||
@@ -28,7 +28,7 @@ use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder};
|
||||
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};
|
||||
use table::table::scan::SimpleTableScan;
|
||||
use table::{
|
||||
metadata::{TableInfo, TableType},
|
||||
metadata::{RawTableInfo, TableInfo, TableType},
|
||||
table::Table,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
@@ -288,7 +288,7 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
self.manifest
|
||||
.update(TableMetaActionList::with_action(TableMetaAction::Change(
|
||||
Box::new(TableChange {
|
||||
table_info: new_info.clone(),
|
||||
table_info: RawTableInfo::from(new_info.clone()),
|
||||
}),
|
||||
)))
|
||||
.await
|
||||
@@ -339,11 +339,9 @@ fn build_table_schema_with_new_columns(
|
||||
.context(SchemaBuildSnafu {
|
||||
msg: "Failed to convert column schemas into table schema",
|
||||
})?
|
||||
.timestamp_index(table_schema.timestamp_index())
|
||||
.version(table_schema.version() + 1);
|
||||
|
||||
if let Some(index) = table_schema.timestamp_index() {
|
||||
builder = builder.timestamp_index(index);
|
||||
}
|
||||
for (k, v) in table_schema.arrow_schema().metadata.iter() {
|
||||
builder = builder.add_metadata(k, v);
|
||||
}
|
||||
@@ -455,7 +453,7 @@ impl<R: Region> MitoTable<R> {
|
||||
let _manifest_version = manifest
|
||||
.update(TableMetaActionList::with_action(TableMetaAction::Change(
|
||||
Box::new(TableChange {
|
||||
table_info: table_info.clone(),
|
||||
table_info: RawTableInfo::from(table_info.clone()),
|
||||
}),
|
||||
)))
|
||||
.await
|
||||
@@ -515,10 +513,12 @@ impl<R: Region> MitoTable<R> {
|
||||
for action in action_list.actions {
|
||||
match action {
|
||||
TableMetaAction::Change(c) => {
|
||||
table_info = Some(c.table_info);
|
||||
table_info = Some(
|
||||
TableInfo::try_from(c.table_info).context(error::ConvertRawSnafu)?,
|
||||
);
|
||||
}
|
||||
TableMetaAction::Protocol(_) => {}
|
||||
_ => unimplemented!(),
|
||||
TableMetaAction::Remove(_) => unimplemented!("Drop table is unimplemented"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ pub fn schema_for_test() -> Schema {
|
||||
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(3)
|
||||
.timestamp_index(Some(3))
|
||||
.build()
|
||||
.expect("ts must be timestamp column")
|
||||
}
|
||||
|
||||
@@ -2,7 +2,8 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use datatypes::schema::SchemaRef;
|
||||
pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
|
||||
use datatypes::schema::{RawSchema, Schema, SchemaRef};
|
||||
use derive_builder::Builder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::storage::ColumnId;
|
||||
@@ -38,13 +39,14 @@ pub enum TableType {
|
||||
Temporary,
|
||||
}
|
||||
|
||||
/// Identifier of the table.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
|
||||
pub struct TableIdent {
|
||||
pub table_id: TableId,
|
||||
pub version: TableVersion,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Builder, PartialEq)]
|
||||
#[derive(Clone, Debug, Builder, PartialEq)]
|
||||
#[builder(pattern = "mutable")]
|
||||
pub struct TableMeta {
|
||||
pub schema: SchemaRef,
|
||||
@@ -54,8 +56,10 @@ pub struct TableMeta {
|
||||
#[builder(default, setter(into))]
|
||||
pub engine: String,
|
||||
pub next_column_id: ColumnId,
|
||||
/// Options for table engine.
|
||||
#[builder(default)]
|
||||
pub engine_options: HashMap<String, String>,
|
||||
/// Table options.
|
||||
#[builder(default)]
|
||||
pub options: HashMap<String, String>,
|
||||
#[builder(default = "Utc::now()")]
|
||||
@@ -92,13 +96,14 @@ impl TableMeta {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Builder)]
|
||||
#[derive(Clone, Debug, PartialEq, Builder)]
|
||||
#[builder(pattern = "owned")]
|
||||
pub struct TableInfo {
|
||||
#[builder(default, setter(into))]
|
||||
pub ident: TableIdent,
|
||||
#[builder(setter(into))]
|
||||
pub name: String,
|
||||
/// Comment of the table.
|
||||
#[builder(default, setter(into))]
|
||||
pub desc: Option<String>,
|
||||
#[builder(default, setter(into))]
|
||||
@@ -121,15 +126,15 @@ impl TableInfoBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn table_id(mut self, id: impl Into<TableId>) -> Self {
|
||||
pub fn table_id(mut self, id: TableId) -> Self {
|
||||
let ident = self.ident.get_or_insert_with(TableIdent::default);
|
||||
ident.table_id = id.into();
|
||||
ident.table_id = id;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn table_version(mut self, version: impl Into<TableVersion>) -> Self {
|
||||
pub fn table_version(mut self, version: TableVersion) -> Self {
|
||||
let ident = self.ident.get_or_insert_with(TableIdent::default);
|
||||
ident.version = version.into();
|
||||
ident.version = version;
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -148,3 +153,136 @@ impl From<TableId> for TableIdent {
|
||||
Self::new(table_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Struct used to serialize and deserialize [`TableMeta`].
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
|
||||
pub struct RawTableMeta {
|
||||
pub schema: RawSchema,
|
||||
pub primary_key_indices: Vec<usize>,
|
||||
pub value_indices: Vec<usize>,
|
||||
pub engine: String,
|
||||
pub next_column_id: ColumnId,
|
||||
pub engine_options: HashMap<String, String>,
|
||||
pub options: HashMap<String, String>,
|
||||
pub created_on: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl From<TableMeta> for RawTableMeta {
|
||||
fn from(meta: TableMeta) -> RawTableMeta {
|
||||
RawTableMeta {
|
||||
schema: RawSchema::from(&*meta.schema),
|
||||
primary_key_indices: meta.primary_key_indices,
|
||||
value_indices: meta.value_indices,
|
||||
engine: meta.engine,
|
||||
next_column_id: meta.next_column_id,
|
||||
engine_options: meta.engine_options,
|
||||
options: meta.options,
|
||||
created_on: meta.created_on,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<RawTableMeta> for TableMeta {
|
||||
type Error = ConvertError;
|
||||
|
||||
fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
|
||||
Ok(TableMeta {
|
||||
schema: Arc::new(Schema::try_from(raw.schema)?),
|
||||
primary_key_indices: raw.primary_key_indices,
|
||||
value_indices: raw.value_indices,
|
||||
engine: raw.engine,
|
||||
next_column_id: raw.next_column_id,
|
||||
engine_options: raw.engine_options,
|
||||
options: raw.options,
|
||||
created_on: raw.created_on,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Struct used to serialize and deserialize [`TableInfo`].
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
|
||||
pub struct RawTableInfo {
|
||||
pub ident: TableIdent,
|
||||
pub name: String,
|
||||
pub desc: Option<String>,
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub meta: RawTableMeta,
|
||||
pub table_type: TableType,
|
||||
}
|
||||
|
||||
impl From<TableInfo> for RawTableInfo {
|
||||
fn from(info: TableInfo) -> RawTableInfo {
|
||||
RawTableInfo {
|
||||
ident: info.ident,
|
||||
name: info.name,
|
||||
desc: info.desc,
|
||||
catalog_name: info.catalog_name,
|
||||
schema_name: info.schema_name,
|
||||
meta: RawTableMeta::from(info.meta),
|
||||
table_type: info.table_type,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<RawTableInfo> for TableInfo {
|
||||
type Error = ConvertError;
|
||||
|
||||
fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
|
||||
Ok(TableInfo {
|
||||
ident: raw.ident,
|
||||
name: raw.name,
|
||||
desc: raw.desc,
|
||||
catalog_name: raw.catalog_name,
|
||||
schema_name: raw.schema_name,
|
||||
meta: TableMeta::try_from(raw.meta)?,
|
||||
table_type: raw.table_type,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
|
||||
|
||||
use super::*;
|
||||
|
||||
fn new_test_schema() -> Schema {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
|
||||
];
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.timestamp_index(Some(1))
|
||||
.version(123)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_raw_convert() {
|
||||
let schema = Arc::new(new_test_schema());
|
||||
let meta = TableMetaBuilder::default()
|
||||
.schema(schema)
|
||||
.primary_key_indices(vec![1])
|
||||
.value_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(2)
|
||||
.build()
|
||||
.unwrap();
|
||||
let info = TableInfoBuilder::default()
|
||||
.table_id(10)
|
||||
.table_version(5)
|
||||
.name("mytable")
|
||||
.meta(meta)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let raw = RawTableInfo::from(info.clone());
|
||||
let info_new = TableInfo::try_from(raw).unwrap();
|
||||
|
||||
assert_eq!(info, info_new);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user