mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 20:40:39 +00:00
refactor: changes CreateTableRequest::schema to RawSchema (#1018)
* refactor: changes CreateTableRequest::schema to RawSchema * refactor(grpc-expr): create_table_schema returns RawSchema
This commit is contained in:
@@ -19,7 +19,6 @@ use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::prelude::{Snafu, StatusCode};
|
||||
use datafusion::error::DataFusionError;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::RawSchema;
|
||||
use snafu::{Backtrace, ErrorCompat};
|
||||
|
||||
use crate::DeregisterTableRequest;
|
||||
@@ -162,19 +161,6 @@ pub enum Error {
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Invalid table schema in catalog entry, table:{}, schema: {:?}, source: {}",
|
||||
table_info,
|
||||
schema,
|
||||
source
|
||||
))]
|
||||
InvalidTableSchema {
|
||||
table_info: String,
|
||||
schema: RawSchema,
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failure during SchemaProvider operation, source: {}", source))]
|
||||
SchemaProviderOperation {
|
||||
#[snafu(backtrace)]
|
||||
@@ -254,8 +240,7 @@ impl ErrorExt for Error {
|
||||
Error::MetaSrv { source, .. } => source.status_code(),
|
||||
Error::SystemCatalogTableScan { source } => source.status_code(),
|
||||
Error::SystemCatalogTableScanExec { source } => source.status_code(),
|
||||
Error::InvalidTableSchema { source, .. }
|
||||
| Error::InvalidTableInfoInCatalog { source } => source.status_code(),
|
||||
Error::InvalidTableInfoInCatalog { source } => source.status_code(),
|
||||
Error::SchemaProviderOperation { source } | Error::Internal { source } => {
|
||||
source.status_code()
|
||||
}
|
||||
|
||||
@@ -32,8 +32,8 @@ use table::TableRef;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu,
|
||||
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
|
||||
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, Result,
|
||||
SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
|
||||
};
|
||||
use crate::helper::{
|
||||
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
|
||||
@@ -346,21 +346,13 @@ impl RemoteCatalogManager {
|
||||
);
|
||||
|
||||
let meta = &table_info.meta;
|
||||
let schema = meta
|
||||
.schema
|
||||
.clone()
|
||||
.try_into()
|
||||
.context(InvalidTableSchemaSnafu {
|
||||
table_info: format!("{catalog_name}.{schema_name}.{table_name}"),
|
||||
schema: meta.schema.clone(),
|
||||
})?;
|
||||
let req = CreateTableRequest {
|
||||
id: table_id,
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
desc: None,
|
||||
schema: Arc::new(schema),
|
||||
schema: meta.schema.clone(),
|
||||
region_numbers: region_numbers.clone(),
|
||||
primary_key_indices: meta.primary_key_indices.clone(),
|
||||
create_if_not_exists: true,
|
||||
|
||||
@@ -26,7 +26,7 @@ use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::debug;
|
||||
use common_time::util;
|
||||
use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema, SchemaRef};
|
||||
use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
@@ -88,7 +88,7 @@ impl SystemCatalogTable {
|
||||
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
|
||||
table_id: SYSTEM_CATALOG_TABLE_ID,
|
||||
};
|
||||
let schema = Arc::new(build_system_catalog_schema());
|
||||
let schema = build_system_catalog_schema();
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
if let Some(table) = engine
|
||||
@@ -105,7 +105,7 @@ impl SystemCatalogTable {
|
||||
schema_name: INFORMATION_SCHEMA_NAME.to_string(),
|
||||
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
|
||||
desc: Some("System catalog table".to_string()),
|
||||
schema: schema.clone(),
|
||||
schema,
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![ENTRY_TYPE_INDEX, KEY_INDEX],
|
||||
create_if_not_exists: true,
|
||||
@@ -143,7 +143,7 @@ impl SystemCatalogTable {
|
||||
/// - value: JSON-encoded value of entry's metadata.
|
||||
/// - gmt_created: create time of this metadata.
|
||||
/// - gmt_modified: last updated time of this metadata.
|
||||
fn build_system_catalog_schema() -> Schema {
|
||||
fn build_system_catalog_schema() -> RawSchema {
|
||||
let cols = vec![
|
||||
ColumnSchema::new(
|
||||
"entry_type".to_string(),
|
||||
@@ -178,8 +178,7 @@ fn build_system_catalog_schema() -> Schema {
|
||||
),
|
||||
];
|
||||
|
||||
// The schema of this table must be valid.
|
||||
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
|
||||
RawSchema::new(cols)
|
||||
}
|
||||
|
||||
/// Formats key string for table entry in system catalog
|
||||
|
||||
@@ -28,7 +28,7 @@ mod tests {
|
||||
};
|
||||
use catalog::{CatalogList, CatalogManager, RegisterTableRequest};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datatypes::schema::Schema;
|
||||
use datatypes::schema::RawSchema;
|
||||
use futures_util::StreamExt;
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::requests::CreateTableRequest;
|
||||
@@ -116,7 +116,7 @@ mod tests {
|
||||
let schema_name = "nonexistent_schema".to_string();
|
||||
let table_name = "fail_table".to_string();
|
||||
// this schema has no effect
|
||||
let table_schema = Arc::new(Schema::new(vec![]));
|
||||
let table_schema = RawSchema::new(vec![]);
|
||||
let table = table_engine
|
||||
.create_table(
|
||||
&EngineContext {},
|
||||
@@ -126,7 +126,7 @@ mod tests {
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
desc: None,
|
||||
schema: table_schema.clone(),
|
||||
schema: table_schema,
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![],
|
||||
create_if_not_exists: false,
|
||||
@@ -176,7 +176,7 @@ mod tests {
|
||||
let table_name = "test_table".to_string();
|
||||
let table_id = 1;
|
||||
// this schema has no effect
|
||||
let table_schema = Arc::new(Schema::new(vec![]));
|
||||
let table_schema = RawSchema::new(vec![]);
|
||||
let table = table_engine
|
||||
.create_table(
|
||||
&EngineContext {},
|
||||
@@ -186,7 +186,7 @@ mod tests {
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
desc: None,
|
||||
schema: table_schema.clone(),
|
||||
schema: table_schema,
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![],
|
||||
create_if_not_exists: false,
|
||||
@@ -246,7 +246,7 @@ mod tests {
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: "".to_string(),
|
||||
desc: None,
|
||||
schema: Arc::new(Schema::new(vec![])),
|
||||
schema: RawSchema::new(vec![]),
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![],
|
||||
create_if_not_exists: false,
|
||||
|
||||
@@ -12,19 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::alter_expr::Kind;
|
||||
use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
|
||||
|
||||
use crate::error::{
|
||||
ColumnNotFoundSnafu, CreateSchemaSnafu, InvalidColumnDefSnafu, MissingFieldSnafu,
|
||||
MissingTimestampColumnSnafu, Result,
|
||||
ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu,
|
||||
Result,
|
||||
};
|
||||
|
||||
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
|
||||
@@ -92,7 +90,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
|
||||
pub fn create_table_schema(expr: &CreateTableExpr) -> Result<RawSchema> {
|
||||
let column_schemas = expr
|
||||
.column_defs
|
||||
.iter()
|
||||
@@ -121,12 +119,7 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.context(CreateSchemaSnafu)?
|
||||
.build()
|
||||
.context(CreateSchemaSnafu)?,
|
||||
))
|
||||
Ok(RawSchema::new(column_schemas))
|
||||
}
|
||||
|
||||
pub fn create_expr_to_request(
|
||||
@@ -138,8 +131,11 @@ pub fn create_expr_to_request(
|
||||
.primary_keys
|
||||
.iter()
|
||||
.map(|key| {
|
||||
// We do a linear search here.
|
||||
schema
|
||||
.column_index_by_name(key)
|
||||
.column_schemas
|
||||
.iter()
|
||||
.position(|column_schema| column_schema.name == *key)
|
||||
.context(ColumnNotFoundSnafu {
|
||||
column_name: key,
|
||||
table_name: &expr.table_name,
|
||||
|
||||
@@ -40,12 +40,6 @@ pub enum Error {
|
||||
source: api::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create schema when creating table, source: {}", source))]
|
||||
CreateSchema {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}",
|
||||
exists,
|
||||
@@ -102,9 +96,9 @@ impl ErrorExt for Error {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
Error::ColumnDataType { .. } => StatusCode::Internal,
|
||||
Error::CreateSchema { .. }
|
||||
| Error::DuplicatedTimestampColumn { .. }
|
||||
| Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments,
|
||||
Error::DuplicatedTimestampColumn { .. } | Error::MissingTimestampColumn { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments,
|
||||
Error::CreateVector { .. } => StatusCode::InvalidArguments,
|
||||
Error::MissingField { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
@@ -522,7 +522,7 @@ mod test {
|
||||
use catalog::{CatalogList, CatalogProvider, RegisterTableRequest};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datafusion::common::{DFSchema, ToDFSchema};
|
||||
use datatypes::schema::Schema;
|
||||
use datatypes::schema::RawSchema;
|
||||
use table::requests::CreateTableRequest;
|
||||
use table::test_util::{EmptyTable, MockTableEngine};
|
||||
|
||||
@@ -558,7 +558,7 @@ mod test {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
desc: None,
|
||||
schema: Arc::new(Schema::new(supported_types())),
|
||||
schema: RawSchema::new(supported_types()),
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![],
|
||||
create_if_not_exists: true,
|
||||
|
||||
@@ -177,12 +177,6 @@ pub enum Error {
|
||||
#[snafu(display("Not support SQL, error: {}", msg))]
|
||||
NotSupportSql { msg: String },
|
||||
|
||||
#[snafu(display("Failed to create schema when creating table, source: {}", source))]
|
||||
CreateSchema {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert datafusion schema, source: {}", source))]
|
||||
ConvertSchema {
|
||||
#[snafu(backtrace)]
|
||||
@@ -370,9 +364,9 @@ impl ErrorExt for Error {
|
||||
| Error::CreateExprToRequest { source }
|
||||
| Error::InsertData { source } => source.status_code(),
|
||||
|
||||
Error::CreateSchema { source, .. }
|
||||
| Error::ConvertSchema { source, .. }
|
||||
| Error::VectorComputation { source } => source.status_code(),
|
||||
Error::ConvertSchema { source, .. } | Error::VectorComputation { source } => {
|
||||
source.status_code()
|
||||
}
|
||||
|
||||
Error::ColumnValuesNumberMismatch { .. }
|
||||
| Error::InvalidSql { .. }
|
||||
|
||||
@@ -86,13 +86,11 @@ impl Instance {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::{column_def, ColumnDataType, ColumnDef, TableId};
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_grpc_expr::create_table_schema;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef};
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
|
||||
use datatypes::value::Value;
|
||||
|
||||
use super::*;
|
||||
@@ -224,7 +222,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn expected_table_schema() -> SchemaRef {
|
||||
fn expected_table_schema() -> RawSchema {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(
|
||||
@@ -236,11 +234,7 @@ mod tests {
|
||||
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
|
||||
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
|
||||
];
|
||||
Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
|
||||
RawSchema::new(column_schemas)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,13 +13,12 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::{RegisterSchemaRequest, RegisterTableRequest};
|
||||
use common_query::Output;
|
||||
use common_telemetry::tracing::info;
|
||||
use common_telemetry::tracing::log::error;
|
||||
use datatypes::schema::SchemaBuilder;
|
||||
use datatypes::schema::RawSchema;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::ast::{ColumnOption, TableConstraint};
|
||||
@@ -31,8 +30,8 @@ use table::metadata::TableId;
|
||||
use table::requests::*;
|
||||
|
||||
use crate::error::{
|
||||
self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateSchemaSnafu,
|
||||
CreateTableSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu,
|
||||
self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu,
|
||||
IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu,
|
||||
RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
|
||||
};
|
||||
use crate::sql::SqlHandler;
|
||||
@@ -239,13 +238,7 @@ impl SqlHandler {
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::try_from(columns_schemas)
|
||||
.context(CreateSchemaSnafu)?
|
||||
.build()
|
||||
.context(CreateSchemaSnafu)?,
|
||||
);
|
||||
|
||||
let schema = RawSchema::new(columns_schemas);
|
||||
let request = CreateTableRequest {
|
||||
id: table_id,
|
||||
catalog_name: table_ref.catalog.to_string(),
|
||||
@@ -267,6 +260,7 @@ mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::Schema;
|
||||
use sql::dialect::GenericDialect;
|
||||
use sql::parser::ParserContext;
|
||||
use sql::statements::statement::Statement;
|
||||
@@ -320,8 +314,8 @@ mod tests {
|
||||
assert_eq!(42, c.id);
|
||||
assert!(!c.create_if_not_exists);
|
||||
assert_eq!(vec![0], c.primary_key_indices);
|
||||
assert_eq!(1, c.schema.timestamp_index().unwrap());
|
||||
assert_eq!(4, c.schema.column_schemas().len());
|
||||
assert_eq!(1, c.schema.timestamp_index.unwrap());
|
||||
assert_eq!(4, c.schema.column_schemas.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -371,7 +365,7 @@ mod tests {
|
||||
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
|
||||
.unwrap();
|
||||
assert!(c.primary_key_indices.is_empty());
|
||||
assert_eq!(c.schema.timestamp_index(), Some(1));
|
||||
assert_eq!(c.schema.timestamp_index, Some(1));
|
||||
}
|
||||
|
||||
/// Constraints specified, not column cannot be found.
|
||||
@@ -438,40 +432,25 @@ mod tests {
|
||||
assert_eq!("s".to_string(), request.schema_name);
|
||||
assert_eq!("demo".to_string(), request.table_name);
|
||||
assert!(!request.create_if_not_exists);
|
||||
assert_eq!(4, request.schema.column_schemas().len());
|
||||
assert_eq!(4, request.schema.column_schemas.len());
|
||||
|
||||
assert_eq!(vec![0], request.primary_key_indices);
|
||||
let schema = Schema::try_from(request.schema).unwrap();
|
||||
assert_eq!(
|
||||
ConcreteDataType::string_datatype(),
|
||||
request
|
||||
.schema
|
||||
.column_schema_by_name("host")
|
||||
.unwrap()
|
||||
.data_type
|
||||
schema.column_schema_by_name("host").unwrap().data_type
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
request
|
||||
.schema
|
||||
.column_schema_by_name("ts")
|
||||
.unwrap()
|
||||
.data_type
|
||||
schema.column_schema_by_name("ts").unwrap().data_type
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::float64_datatype(),
|
||||
request
|
||||
.schema
|
||||
.column_schema_by_name("cpu")
|
||||
.unwrap()
|
||||
.data_type
|
||||
schema.column_schema_by_name("cpu").unwrap().data_type
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::float64_datatype(),
|
||||
request
|
||||
.schema
|
||||
.column_schema_by_name("memory")
|
||||
.unwrap()
|
||||
.data_type
|
||||
schema.column_schema_by_name("memory").unwrap().data_type
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER
|
||||
use common_query::Output;
|
||||
use common_recordbatch::util;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use mito::config::EngineConfig;
|
||||
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
|
||||
use query::QueryEngineFactory;
|
||||
@@ -104,12 +104,7 @@ pub(crate) async fn create_test_table(
|
||||
schema_name: "public".to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
desc: Some(" a test table".to_string()),
|
||||
schema: Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.build()
|
||||
.expect("ts is expected to be timestamp column"),
|
||||
),
|
||||
schema: RawSchema::new(column_schemas),
|
||||
create_if_not_exists: true,
|
||||
primary_key_indices: vec![0], // "host" is in primary keys
|
||||
table_options: HashMap::new(),
|
||||
|
||||
@@ -22,15 +22,38 @@ use crate::schema::{ColumnSchema, Schema, SchemaBuilder};
|
||||
/// This struct only contains necessary data to recover the Schema.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RawSchema {
|
||||
/// Schema of columns.
|
||||
pub column_schemas: Vec<ColumnSchema>,
|
||||
/// Index of the timestamp column.
|
||||
pub timestamp_index: Option<usize>,
|
||||
/// Schema version.
|
||||
pub version: u32,
|
||||
}
|
||||
|
||||
impl RawSchema {
|
||||
/// Creates a new [RawSchema] from specific `column_schemas`.
|
||||
///
|
||||
/// Sets [RawSchema::timestamp_index] to the first index of the timestamp
|
||||
/// column. It doesn't check whether time index column is duplicate.
|
||||
pub fn new(column_schemas: Vec<ColumnSchema>) -> RawSchema {
|
||||
let timestamp_index = column_schemas
|
||||
.iter()
|
||||
.position(|column_schema| column_schema.is_time_index());
|
||||
|
||||
RawSchema {
|
||||
column_schemas,
|
||||
timestamp_index,
|
||||
version: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<RawSchema> for Schema {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(raw: RawSchema) -> Result<Schema> {
|
||||
// While building Schema, we don't trust the fields, such as timestamp_index,
|
||||
// in RawSchema. We use SchemaBuilder to perform the validation.
|
||||
SchemaBuilder::try_from(raw.column_schemas)?
|
||||
.version(raw.version)
|
||||
.build()
|
||||
@@ -74,4 +97,33 @@ mod tests {
|
||||
|
||||
assert_eq!(schema, schema_new);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_raw_schema_with_time_index() {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
];
|
||||
let schema = RawSchema::new(column_schemas);
|
||||
assert_eq!(1, schema.timestamp_index.unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_raw_schema_without_time_index() {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
];
|
||||
let schema = RawSchema::new(column_schemas);
|
||||
assert!(schema.timestamp_index.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::tracing::log::info;
|
||||
use common_telemetry::{debug, logging};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{
|
||||
@@ -42,8 +42,8 @@ use tokio::sync::Mutex;
|
||||
use crate::config::EngineConfig;
|
||||
use crate::error::{
|
||||
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
|
||||
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu,
|
||||
RegionNotFoundSnafu, Result, TableExistsSnafu,
|
||||
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu,
|
||||
MissingTimestampIndexSnafu, RegionNotFoundSnafu, Result, TableExistsSnafu,
|
||||
};
|
||||
use crate::manifest::TableManifest;
|
||||
use crate::table::MitoTable;
|
||||
@@ -274,7 +274,7 @@ fn build_column_family(
|
||||
fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> {
|
||||
let ts_index = request
|
||||
.schema
|
||||
.timestamp_index()
|
||||
.timestamp_index
|
||||
.context(MissingTimestampIndexSnafu {
|
||||
table_name: &request.table_name,
|
||||
})?;
|
||||
@@ -320,18 +320,19 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
}
|
||||
}
|
||||
|
||||
let table_schema = &request.schema;
|
||||
let table_schema =
|
||||
Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?);
|
||||
let primary_key_indices = &request.primary_key_indices;
|
||||
let (next_column_id, default_cf) = build_column_family(
|
||||
INIT_COLUMN_ID,
|
||||
table_name,
|
||||
table_schema,
|
||||
&table_schema,
|
||||
primary_key_indices,
|
||||
)?;
|
||||
let (next_column_id, row_key) = build_row_key_desc(
|
||||
next_column_id,
|
||||
table_name,
|
||||
table_schema,
|
||||
&table_schema,
|
||||
primary_key_indices,
|
||||
)?;
|
||||
|
||||
@@ -378,7 +379,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
}
|
||||
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(request.schema)
|
||||
.schema(table_schema)
|
||||
.engine(MITO_ENGINE)
|
||||
.next_column_id(next_column_id)
|
||||
.primary_key_indices(request.primary_key_indices.clone())
|
||||
@@ -599,7 +600,7 @@ mod tests {
|
||||
use common_query::physical_plan::SessionContext;
|
||||
use common_recordbatch::util;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
|
||||
@@ -635,12 +636,7 @@ mod tests {
|
||||
.with_time_index(true),
|
||||
];
|
||||
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.build()
|
||||
.expect("ts must be timestamp column"),
|
||||
);
|
||||
let schema = RawSchema::new(column_schemas);
|
||||
|
||||
let (dir, object_store) =
|
||||
test_util::new_test_object_store("test_insert_with_column_default_constraint").await;
|
||||
@@ -665,7 +661,7 @@ mod tests {
|
||||
schema_name: "public".to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
desc: Some("a test table".to_string()),
|
||||
schema: schema.clone(),
|
||||
schema,
|
||||
create_if_not_exists: true,
|
||||
primary_key_indices: Vec::default(),
|
||||
table_options: HashMap::new(),
|
||||
@@ -770,12 +766,7 @@ mod tests {
|
||||
.with_time_index(true),
|
||||
];
|
||||
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.build()
|
||||
.expect("ts must be timestamp column"),
|
||||
);
|
||||
let schema = RawSchema::new(column_schemas);
|
||||
|
||||
let mut request = CreateTableRequest {
|
||||
id: 1,
|
||||
@@ -944,7 +935,7 @@ mod tests {
|
||||
catalog_name: "greptime".to_string(),
|
||||
schema_name: "public".to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
schema: RawSchema::from(&*table_info.meta.schema),
|
||||
create_if_not_exists: true,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
@@ -961,7 +952,7 @@ mod tests {
|
||||
catalog_name: "greptime".to_string(),
|
||||
schema_name: "public".to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
schema: RawSchema::from(&*table_info.meta.schema),
|
||||
create_if_not_exists: false,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
@@ -1170,7 +1161,7 @@ mod tests {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: another_name.to_string(),
|
||||
desc: Some("another test table".to_string()),
|
||||
schema: Arc::new(schema_for_test()),
|
||||
schema: RawSchema::from(&schema_for_test()),
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![0],
|
||||
create_if_not_exists: true,
|
||||
@@ -1253,7 +1244,7 @@ mod tests {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
schema: RawSchema::from(&*table_info.meta.schema),
|
||||
create_if_not_exists: true,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
@@ -1286,7 +1277,7 @@ mod tests {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
schema: RawSchema::from(&*table_info.meta.schema),
|
||||
create_if_not_exists: false,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
|
||||
@@ -184,6 +184,9 @@ pub enum Error {
|
||||
region_name: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid schema, source: {}", source))]
|
||||
InvalidRawSchema { source: datatypes::error::Error },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -207,7 +210,8 @@ impl ErrorExt for Error {
|
||||
| ProjectedColumnNotFound { .. }
|
||||
| InvalidPrimaryKey { .. }
|
||||
| MissingTimestampIndex { .. }
|
||||
| TableNotFound { .. } => StatusCode::InvalidArguments,
|
||||
| TableNotFound { .. }
|
||||
| InvalidRawSchema { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected,
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use log_store::NoopLogStore;
|
||||
use object_store::services::fs::Builder;
|
||||
@@ -109,7 +109,7 @@ fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
|
||||
schema_name: "public".to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
desc: Some("a test table".to_string()),
|
||||
schema,
|
||||
schema: RawSchema::from(&*schema),
|
||||
region_numbers: vec![0],
|
||||
create_if_not_exists: true,
|
||||
primary_key_indices: vec![0],
|
||||
|
||||
@@ -24,7 +24,7 @@ use common_recordbatch::util as record_util;
|
||||
use common_telemetry::logging;
|
||||
use common_time::util;
|
||||
use datatypes::prelude::{ConcreteDataType, ScalarVector};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use datatypes::vectors::{StringVector, TimestampMillisecondVector, Vector, VectorRef};
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::QueryEngineRef;
|
||||
@@ -50,7 +50,7 @@ impl ScriptsTable {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_engine: QueryEngineRef,
|
||||
) -> Result<Self> {
|
||||
let schema = Arc::new(build_scripts_schema());
|
||||
let schema = build_scripts_schema();
|
||||
// TODO(dennis): we put scripts table into default catalog and schema.
|
||||
// maybe put into system catalog?
|
||||
let request = CreateTableRequest {
|
||||
@@ -202,7 +202,7 @@ impl ScriptsTable {
|
||||
}
|
||||
|
||||
/// Build scripts table
|
||||
fn build_scripts_schema() -> Schema {
|
||||
fn build_scripts_schema() -> RawSchema {
|
||||
let cols = vec![
|
||||
ColumnSchema::new(
|
||||
"schema".to_string(),
|
||||
@@ -242,6 +242,5 @@ fn build_scripts_schema() -> Schema {
|
||||
),
|
||||
];
|
||||
|
||||
// Schema is always valid here
|
||||
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
|
||||
RawSchema::new(cols)
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::{ColumnSchema, SchemaRef};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
use crate::metadata::TableId;
|
||||
@@ -45,7 +45,7 @@ pub struct CreateTableRequest {
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub desc: Option<String>,
|
||||
pub schema: SchemaRef,
|
||||
pub schema: RawSchema,
|
||||
pub region_numbers: Vec<u32>,
|
||||
pub primary_key_indices: Vec<usize>,
|
||||
pub create_if_not_exists: bool,
|
||||
|
||||
@@ -29,8 +29,9 @@ pub struct EmptyTable {
|
||||
|
||||
impl EmptyTable {
|
||||
pub fn new(req: CreateTableRequest) -> Self {
|
||||
let schema = Arc::new(req.schema.try_into().unwrap());
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(req.schema)
|
||||
.schema(schema)
|
||||
.primary_key_indices(req.primary_key_indices)
|
||||
.next_column_id(0)
|
||||
.options(req.table_options)
|
||||
|
||||
@@ -30,7 +30,7 @@ use datanode::error::{CreateTableSnafu, Result};
|
||||
use datanode::instance::{Instance, InstanceRef};
|
||||
use datanode::sql::SqlHandler;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use frontend::instance::Instance as FeInstance;
|
||||
use object_store::backend::s3;
|
||||
use object_store::services::oss;
|
||||
@@ -236,12 +236,7 @@ pub async fn create_test_table(
|
||||
schema_name: "public".to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
desc: Some(" a test table".to_string()),
|
||||
schema: Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.build()
|
||||
.expect("ts is expected to be timestamp column"),
|
||||
),
|
||||
schema: RawSchema::new(column_schemas),
|
||||
create_if_not_exists: true,
|
||||
primary_key_indices: vec![0], // "host" is in primary keys
|
||||
table_options: HashMap::new(),
|
||||
|
||||
Reference in New Issue
Block a user