From e6d0e1754b2290f6b9b347bfff92a14043120c82 Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 27 Jan 2026 20:19:26 +0800 Subject: [PATCH] refactor: remove the `RawTableMeta` and `RawTableInfo` to make codes more concise Signed-off-by: luofucong --- Cargo.lock | 8 +- src/catalog/src/error.rs | 8 - src/catalog/src/kvbackend/manager.rs | 33 +- src/catalog/src/table_source.rs | 2 +- src/cli/src/bench.rs | 12 +- src/cli/src/metadata/control/test_utils.rs | 4 +- src/cli/src/metadata/repair/alter_table.rs | 10 +- src/cli/src/metadata/repair/create_table.rs | 15 +- src/cli/src/metadata/utils.rs | 4 +- src/common/grpc-expr/src/alter.rs | 6 +- src/common/meta/src/cache/table/table_info.rs | 10 +- .../alter_logical_tables/update_metadata.rs | 14 +- .../src/ddl/alter_logical_tables/validator.rs | 2 +- src/common/meta/src/ddl/alter_table.rs | 6 +- .../meta/src/ddl/alter_table/executor.rs | 11 +- .../src/ddl/alter_table/region_request.rs | 6 +- src/common/meta/src/ddl/comment_on.rs | 22 +- .../meta/src/ddl/create_logical_tables.rs | 4 +- .../src/ddl/create_logical_tables/metadata.rs | 23 +- .../create_logical_tables/region_request.rs | 6 +- src/common/meta/src/ddl/create_table.rs | 4 +- .../meta/src/ddl/create_table/executor.rs | 8 +- .../meta/src/ddl/create_table/template.rs | 34 +- src/common/meta/src/ddl/create_view.rs | 4 +- .../meta/src/ddl/drop_table/executor.rs | 4 +- src/common/meta/src/ddl/drop_view.rs | 4 +- src/common/meta/src/ddl/test_util.rs | 10 +- .../meta/src/ddl/test_util/create_table.rs | 25 +- .../src/ddl/tests/alter_logical_tables.rs | 4 +- src/common/meta/src/ddl/tests/alter_table.rs | 2 +- src/common/meta/src/ddl/tests/create_view.rs | 6 +- src/common/meta/src/ddl/truncate_table.rs | 4 +- .../meta/src/ddl/utils/raw_table_info.rs | 47 ++- src/common/meta/src/ddl/utils/table_info.rs | 4 +- src/common/meta/src/error.rs | 8 - src/common/meta/src/key.rs | 41 +-- .../meta/src/key/schema_metadata_manager.rs | 6 +- src/common/meta/src/key/table_info.rs | 22 +- .../reconcile_logical_tables.rs | 4 +- .../reconcile_regions.rs | 6 +- .../update_table_infos.rs | 6 +- .../src/reconciliation/reconcile_table.rs | 8 +- .../resolve_column_metadata.rs | 2 +- src/common/meta/src/reconciliation/utils.rs | 45 ++- src/common/meta/src/rpc/ddl.rs | 78 ++-- src/datatypes/src/schema.rs | 33 +- src/datatypes/src/schema/raw.rs | 129 ------- src/flow/src/adapter.rs | 6 +- src/flow/src/adapter/table_source.rs | 6 +- src/flow/src/adapter/util.rs | 8 +- src/flow/src/batching_mode/engine.rs | 6 +- .../src/handler/region_lease_handler.rs | 4 +- .../src/procedure/region_migration.rs | 8 +- .../downgrade_leader_region.rs | 2 +- .../src/procedure/region_migration/manager.rs | 16 +- .../region_migration/migration_start.rs | 8 +- .../region_migration/open_candidate_region.rs | 2 +- .../procedure/region_migration/test_util.rs | 6 +- .../downgrade_leader_region.rs | 4 +- .../rollback_downgraded_region.rs | 2 +- .../upgrade_candidate_region.rs | 14 +- .../upgrade_candidate_region.rs | 2 +- .../procedure/repartition/allocate_region.rs | 4 +- src/meta-srv/src/procedure/test_util.rs | 2 +- src/meta-srv/src/procedure/utils.rs | 58 ++- src/meta-srv/src/region/lease_keeper.rs | 10 +- src/operator/src/error.rs | 8 - src/operator/src/expr_helper.rs | 3 +- src/operator/src/statement/ddl.rs | 40 +- src/operator/src/statement/show.rs | 3 +- src/operator/src/tests/partition_manager.rs | 2 +- src/query/src/sql.rs | 8 +- src/table/src/metadata.rs | 342 ++++++++---------- .../standalone/common/view/create.result | 3 +- tests/cases/standalone/common/view/create.sql | 1 + 75 files changed, 562 insertions(+), 770 deletions(-) delete mode 100644 src/datatypes/src/schema/raw.rs diff --git a/Cargo.lock b/Cargo.lock index 94ef17bca2..f13c177350 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2240,7 +2240,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -6347,7 +6347,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.57.0", + "windows-core 0.61.2", ] [[package]] @@ -7344,7 +7344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -15284,7 +15284,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index e7c6efc018..58c5ef96b9 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -218,13 +218,6 @@ pub enum Error { source: common_query::error::Error, }, - #[snafu(display("Invalid table info in catalog"))] - InvalidTableInfoInCatalog { - #[snafu(implicit)] - location: Location, - source: datatypes::error::Error, - }, - #[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))] QueryAccessDenied { catalog: String, schema: String }, @@ -368,7 +361,6 @@ impl ErrorExt for Error { Error::CreateTable { source, .. } => source.status_code(), Error::DecodePlan { source, .. } => source.status_code(), - Error::InvalidTableInfoInCatalog { source, .. } => source.status_code(), Error::Internal { source, .. } => source.status_code(), diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 7852142c6a..dc86c703d8 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -50,8 +50,8 @@ use tokio_stream::wrappers::ReceiverStream; use crate::CatalogManager; use crate::error::{ - CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, - ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu, + CacheNotFoundSnafu, GetTableCacheSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, + Result, TableMetadataManagerSnafu, }; use crate::information_schema::{ InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef, @@ -146,7 +146,7 @@ impl KvBackendCatalogManager { .table_info .meta .schema - .column_schemas + .column_schemas() .get(physical_index) .and_then(|physical_column| { // Find the corresponding index in the logical table schema @@ -417,7 +417,7 @@ impl CatalogManager for KvBackendCatalogManager { .into_values() .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema) .map(build_table) - .collect::>>()?; + .collect::>(); Ok(tables) } @@ -507,16 +507,12 @@ impl CatalogManager for KvBackendCatalogManager { }; for table in table_info_values.into_values().map(build_table) { - let table = if let Ok(table) = table { - Self::override_logical_table_partition_key_indices( - &table_route_cache, - metadata_manager.table_info_manager(), - table, - ) - .await - } else { - table - }; + let table = Self::override_logical_table_partition_key_indices( + &table_route_cache, + metadata_manager.table_info_manager(), + table, + ) + .await; if tx.send(table).await.is_err() { return; } @@ -530,12 +526,9 @@ impl CatalogManager for KvBackendCatalogManager { } } -fn build_table(table_info_value: TableInfoValue) -> Result { - let table_info = table_info_value - .table_info - .try_into() - .context(InvalidTableInfoInCatalogSnafu)?; - Ok(DistTable::table(Arc::new(table_info))) +fn build_table(table_info_value: TableInfoValue) -> TableRef { + let table_info = table_info_value.table_info; + DistTable::table(Arc::new(table_info)) } // TODO: This struct can hold a static map of all system tables when diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index fe69a1e72d..132e02fe14 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -343,7 +343,7 @@ mod tests { // Create view metadata table_metadata_manager .create_view_metadata( - view_info.clone().into(), + view_info.clone(), logical_plan, HashSet::new(), vec!["a".to_string(), "b".to_string()], diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index 82c460c32b..ae5765e3e0 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -32,10 +32,10 @@ use common_meta::rpc::router::{Region, RegionRoute}; use common_telemetry::info; use common_wal::options::WalOptions; use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, RawSchema}; +use datatypes::schema::{ColumnSchema, Schema}; use rand::Rng; use store_api::storage::RegionNumber; -use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; +use table::metadata::{TableId, TableIdent, TableInfo, TableMeta, TableType}; use table::table_name::TableName; use self::metadata::TableMetadataBencher; @@ -132,7 +132,7 @@ impl Tool for BenchTableMetadata { } } -fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { +fn create_table_info(table_id: TableId, table_name: TableName) -> TableInfo { let columns = 100; let mut column_schemas = Vec::with_capacity(columns); column_schemas.push( @@ -153,8 +153,8 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { )); } - let meta = RawTableMeta { - schema: RawSchema::new(column_schemas), + let meta = TableMeta { + schema: Arc::new(Schema::new(column_schemas)), engine: "mito".to_string(), created_on: chrono::DateTime::default(), updated_on: chrono::DateTime::default(), @@ -166,7 +166,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { column_ids: vec![], }; - RawTableInfo { + TableInfo { ident: TableIdent { table_id, version: 1, diff --git a/src/cli/src/metadata/control/test_utils.rs b/src/cli/src/metadata/control/test_utils.rs index 8f7d236ec7..54ac61ebfe 100644 --- a/src/cli/src/metadata/control/test_utils.rs +++ b/src/cli/src/metadata/control/test_utils.rs @@ -19,7 +19,7 @@ use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::rpc::store::PutRequest; use store_api::storage::{RegionId, TableId}; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; /// Puts a key-value pair into the kv backend. pub async fn put_key(kv_backend: &KvBackendRef, key: &str, value: &str) { @@ -35,7 +35,7 @@ pub async fn put_key(kv_backend: &KvBackendRef, key: &str, value: &str) { pub async fn prepare_physical_table_metadata( table_name: &str, table_id: TableId, -) -> (RawTableInfo, PhysicalTableRouteValue) { +) -> (TableInfo, PhysicalTableRouteValue) { let mut create_physical_table_task = test_create_physical_table_task(table_name); let table_route = PhysicalTableRouteValue::new(vec![RegionRoute { region: Region { diff --git a/src/cli/src/metadata/repair/alter_table.rs b/src/cli/src/metadata/repair/alter_table.rs index efe95c74b8..49af3cddbc 100644 --- a/src/cli/src/metadata/repair/alter_table.rs +++ b/src/cli/src/metadata/repair/alter_table.rs @@ -21,14 +21,12 @@ use common_meta::rpc::router::{RegionRoute, find_leader_regions}; use operator::expr_helper::column_schemas_to_defs; use snafu::ResultExt; use store_api::storage::{RegionId, TableId}; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use crate::error::{CovertColumnSchemasToDefsSnafu, Result}; /// Generates alter table expression for all columns. -pub fn generate_alter_table_expr_for_all_columns( - table_info: &RawTableInfo, -) -> Result { +pub fn generate_alter_table_expr_for_all_columns(table_info: &TableInfo) -> Result { let schema = &table_info.meta.schema; let mut alter_table_expr = AlterTableExpr { @@ -42,10 +40,10 @@ pub fn generate_alter_table_expr_for_all_columns( .meta .primary_key_indices .iter() - .map(|i| schema.column_schemas[*i].name.clone()) + .map(|i| schema.column_schemas()[*i].name.clone()) .collect::>(); - let add_columns = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys) + let add_columns = column_schemas_to_defs(schema.column_schemas().to_vec(), &primary_keys) .context(CovertColumnSchemasToDefsSnafu)?; alter_table_expr.kind = Some(Kind::AddColumns(AddColumns { diff --git a/src/cli/src/metadata/repair/create_table.rs b/src/cli/src/metadata/repair/create_table.rs index b540e2175e..afa85e4409 100644 --- a/src/cli/src/metadata/repair/create_table.rs +++ b/src/cli/src/metadata/repair/create_table.rs @@ -23,23 +23,24 @@ use common_meta::rpc::router::{RegionRoute, find_leader_regions}; use operator::expr_helper::column_schemas_to_defs; use snafu::ResultExt; use store_api::storage::{RegionId, TableId}; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use crate::error::{CovertColumnSchemasToDefsSnafu, Result}; -/// Generates a `CreateTableExpr` from a `RawTableInfo`. -pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result { +/// Generates a `CreateTableExpr` from a `TableInfo`. +pub fn generate_create_table_expr(table_info: &TableInfo) -> Result { let schema = &table_info.meta.schema; + let column_schemas = schema.column_schemas(); let primary_keys = table_info .meta .primary_key_indices .iter() - .map(|i| schema.column_schemas[*i].name.clone()) + .map(|i| column_schemas[*i].name.clone()) .collect::>(); - let timestamp_index = schema.timestamp_index.as_ref().unwrap(); - let time_index = schema.column_schemas[*timestamp_index].name.clone(); - let column_defs = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys) + let timestamp_index = schema.timestamp_index().unwrap(); + let time_index = column_schemas[timestamp_index].name.clone(); + let column_defs = column_schemas_to_defs(column_schemas.to_vec(), &primary_keys) .context(CovertColumnSchemasToDefsSnafu)?; let table_options = HashMap::from(&table_info.meta.options); diff --git a/src/cli/src/metadata/utils.rs b/src/cli/src/metadata/utils.rs index 47c6e6cb1e..35f2813a24 100644 --- a/src/cli/src/metadata/utils.rs +++ b/src/cli/src/metadata/utils.rs @@ -24,7 +24,7 @@ use common_meta::kv_backend::KvBackendRef; use futures::Stream; use snafu::{OptionExt, ResultExt}; use store_api::storage::TableId; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use crate::error::{Result, TableMetadataSnafu, UnexpectedSnafu}; @@ -58,7 +58,7 @@ pub struct TableMetadataIterator { /// The full table metadata. pub struct FullTableMetadata { pub table_id: TableId, - pub table_info: RawTableInfo, + pub table_info: TableInfo, pub table_route: TableRouteValue, } diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 706f86178e..53aedd99f7 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -24,7 +24,7 @@ use api::v1::{ SkippingIndexType as PbSkippingIndexType, column_def, }; use common_query::AddColumnLocation; -use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions}; +use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SkippingIndexOptions}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::region_request::{SetRegionOption, UnsetRegionOption}; use table::metadata::{TableId, TableMeta}; @@ -268,7 +268,7 @@ pub fn alter_expr_to_request( Ok(request) } -pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result { +pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result { let column_schemas = expr .column_defs .iter() @@ -300,7 +300,7 @@ pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> }) .collect::>(); - Ok(RawSchema::new(column_schemas)) + Ok(Schema::new(column_schemas)) } fn parse_location(location: Option) -> Result> { diff --git a/src/common/meta/src/cache/table/table_info.rs b/src/common/meta/src/cache/table/table_info.rs index 3b967188be..b853d908e8 100644 --- a/src/common/meta/src/cache/table/table_info.rs +++ b/src/common/meta/src/cache/table/table_info.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use futures::future::BoxFuture; use moka::future::Cache; -use snafu::{OptionExt, ResultExt}; +use snafu::OptionExt; use store_api::storage::TableId; use table::metadata::TableInfo; @@ -48,15 +48,13 @@ fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer Result, RawTableInfo)>> { + ) -> Result, TableInfo)>> { let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len()); for (task, table) in self .data @@ -74,10 +74,9 @@ impl AlterLogicalTablesProcedure { &self, task: &AlterTableTask, table: &DeserializedValueWithBytes, - ) -> Result<(DeserializedValueWithBytes, RawTableInfo)> { + ) -> Result<(DeserializedValueWithBytes, TableInfo)> { // Builds new_meta - let table_info = TableInfo::try_from(table.table_info.clone()) - .context(error::ConvertRawTableInfoSnafu)?; + let table_info = table.table_info.clone(); let table_ref = task.table_ref(); let request = alter_expr_to_request( table.table_info.ident.table_id, @@ -98,9 +97,8 @@ impl AlterLogicalTablesProcedure { new_table.meta = new_meta; new_table.ident.version = version; - let mut raw_table_info = RawTableInfo::from(new_table); - raw_table_info.sort_columns(); + new_table.sort_columns(); - Ok((table.clone(), raw_table_info)) + Ok((table.clone(), new_table)) } } diff --git a/src/common/meta/src/ddl/alter_logical_tables/validator.rs b/src/common/meta/src/ddl/alter_logical_tables/validator.rs index d9d8581125..7e6c130351 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/validator.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/validator.rs @@ -239,7 +239,7 @@ fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) -> .table_info .meta .schema - .column_schemas + .column_schemas() .iter() .map(|c| &c.name) .collect::>(); diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 3e913cae29..14bff50afb 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -34,7 +34,7 @@ use snafu::{ResultExt, ensure}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId, TableInfo}; +use table::metadata::{TableId, TableInfo}; use table::table_reference::TableReference; use crate::ddl::DdlContext; @@ -288,7 +288,7 @@ impl AlterTableProcedure { &self.context.table_metadata_manager, table_info_value, self.data.region_distribution.as_ref(), - new_info.into(), + new_info, &self.data.column_metadatas, ) .await?; @@ -426,7 +426,7 @@ impl AlterTableData { self.table_id } - fn table_info(&self) -> Option<&RawTableInfo> { + fn table_info(&self) -> Option<&TableInfo> { self.table_info_value .as_ref() .map(|value| &value.table_info) diff --git a/src/common/meta/src/ddl/alter_table/executor.rs b/src/common/meta/src/ddl/alter_table/executor.rs index 5e44023f35..82a0239f50 100644 --- a/src/common/meta/src/ddl/alter_table/executor.rs +++ b/src/common/meta/src/ddl/alter_table/executor.rs @@ -26,7 +26,7 @@ use futures::future; use snafu::{ResultExt, ensure}; use store_api::metadata::ColumnMetadata; use store_api::storage::{RegionId, TableId}; -use table::metadata::{RawTableInfo, TableInfo}; +use table::metadata::TableInfo; use table::requests::AlterKind; use table::table_name::TableName; @@ -107,7 +107,7 @@ impl AlterTableExecutor { /// Building the new table info here allows us to catch any issues with the /// alteration before committing metadata changes. pub(crate) fn validate_alter_table_expr( - table_info: &RawTableInfo, + table_info: &TableInfo, alter_table_expr: AlterTableExpr, ) -> Result { build_new_table_info(table_info, alter_table_expr) @@ -119,7 +119,7 @@ impl AlterTableExecutor { table_metadata_manager: &TableMetadataManagerRef, current_table_info_value: &DeserializedValueWithBytes, region_distribution: Option<&RegionDistribution>, - mut raw_table_info: RawTableInfo, + mut raw_table_info: TableInfo, column_metadatas: &[ColumnMetadata], ) -> Result<()> { let table_ref = self.table.table_ref(); @@ -261,11 +261,10 @@ pub(crate) fn make_alter_region_request( /// `next_column_id` by the number of columns being added, which may result in gaps /// in the column id sequence. fn build_new_table_info( - table_info: &RawTableInfo, + table_info: &TableInfo, alter_table_expr: AlterTableExpr, ) -> Result { - let table_info = - TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?; + let table_info = table_info.clone(); let schema_name = &table_info.schema_name; let catalog_name = &table_info.catalog_name; let table_name = &table_info.name; diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index 3a6f541cb1..1b3dafdd3a 100644 --- a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -19,7 +19,7 @@ use api::v1::region::{ AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, alter_request, }; use snafu::OptionExt; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use crate::ddl::alter_table::AlterTableProcedure; use crate::error::{self, InvalidProtoMsgSnafu, Result}; @@ -43,7 +43,7 @@ impl AlterTableProcedure { /// It always adds column if not exists and drops column if exists. /// It skips the column if it already exists in the table. fn create_proto_alter_kind( - table_info: &RawTableInfo, + table_info: &TableInfo, alter_kind: &Kind, ) -> Result> { match alter_kind { @@ -52,7 +52,7 @@ fn create_proto_alter_kind( let existing_columns: HashSet<_> = table_info .meta .schema - .column_schemas + .column_schemas() .iter() .map(|col| &col.name) .collect(); diff --git a/src/common/meta/src/ddl/comment_on.rs b/src/common/meta/src/ddl/comment_on.rs index 37b614ba5e..65c2cbc0ec 100644 --- a/src/common/meta/src/ddl/comment_on.rs +++ b/src/common/meta/src/ddl/comment_on.rs @@ -12,18 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use async_trait::async_trait; use chrono::Utc; use common_catalog::format_full_table_name; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::tracing::info; -use datatypes::schema::COMMENT_KEY as COLUMN_COMMENT_KEY; +use datatypes::schema::{COMMENT_KEY as COLUMN_COMMENT_KEY, Schema}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::TableId; use strum::AsRefStr; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY; use table::table_name::TableName; @@ -146,7 +148,7 @@ impl CommentOnProcedure { .table_info .meta .schema - .column_schemas + .column_schemas() .iter() .any(|col| &col.name == column_name); @@ -175,7 +177,7 @@ impl CommentOnProcedure { .table_info .meta .schema - .column_schemas + .column_schemas() .iter() .find(|col| &col.name == column_name) .unwrap(); // Safe: validated above @@ -276,16 +278,18 @@ impl CommentOnProcedure { let mut new_table_info = table_info_value.table_info.clone(); let column_name = self.data.column_name.as_ref().unwrap(); - let column_schema = new_table_info - .meta - .schema - .column_schemas + let mut column_schemas = new_table_info.meta.schema.column_schemas().to_vec(); + let column_schema = column_schemas .iter_mut() .find(|col| &col.name == column_name) .unwrap(); // Safe: validated in prepare update_column_comment_metadata(column_schema, self.data.comment.clone()); + new_table_info.meta.schema = Arc::new(Schema::new_with_version( + column_schemas, + new_table_info.meta.schema.version(), + )); self.update_table_info(table_info_value, new_table_info) .await?; @@ -328,7 +332,7 @@ impl CommentOnProcedure { async fn update_table_info( &self, current_table_info: &DeserializedValueWithBytes, - new_table_info: RawTableInfo, + new_table_info: TableInfo, ) -> Result<()> { let table_id = current_table_info.table_info.ident.table_id; let new_table_info_value = current_table_info.update(new_table_info); diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 0fcb5c8d62..78f6e225b7 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -32,7 +32,7 @@ use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::storage::RegionNumber; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use crate::ddl::DdlContext; use crate::ddl::utils::{ @@ -279,7 +279,7 @@ impl CreateTablesData { /// Returns the remaining tasks. /// The length of tasks must be greater than 0. - fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> { + fn remaining_tasks(&self) -> Vec<(TableInfo, TableRouteValue)> { self.tasks .iter() .zip(self.table_ids_already_exists.iter()) diff --git a/src/common/meta/src/ddl/create_logical_tables/metadata.rs b/src/common/meta/src/ddl/create_logical_tables/metadata.rs index 339c3b463b..2242466894 100644 --- a/src/common/meta/src/ddl/create_logical_tables/metadata.rs +++ b/src/common/meta/src/ddl/create_logical_tables/metadata.rs @@ -13,9 +13,10 @@ // limitations under the License. use std::collections::HashSet; +use std::sync::Arc; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, RawSchema}; +use datatypes::schema::{ColumnSchema, Schema}; use snafu::OptionExt; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; @@ -51,7 +52,7 @@ impl CreateLogicalTablesProcedure { .partition_key_indices .iter() .map(|&idx| { - physical_table_info.table_info.meta.schema.column_schemas[idx] + physical_table_info.table_info.meta.schema.column_schemas()[idx] .name .clone() }) @@ -72,14 +73,9 @@ impl CreateLogicalTablesProcedure { for task in &mut self.data.tasks { // Get existing column names in the logical table - let existing_column_names: HashSet<_> = task - .table_info - .meta - .schema - .column_schemas - .iter() - .map(|c| &c.name) - .collect(); + let column_schemas = task.table_info.meta.schema.column_schemas(); + let existing_column_names: HashSet<_> = + column_schemas.iter().map(|c| &c.name).collect(); let mut new_columns = Vec::new(); let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone(); @@ -87,8 +83,7 @@ impl CreateLogicalTablesProcedure { // Add missing partition columns for partition_column in partition_columns { if !existing_column_names.contains(partition_column) { - let new_column_index = - task.table_info.meta.schema.column_schemas.len() + new_columns.len(); + let new_column_index = column_schemas.len() + new_columns.len(); // Create new column schema for the partition column let column_schema = ColumnSchema::new( @@ -105,11 +100,11 @@ impl CreateLogicalTablesProcedure { // If we added new columns, update the table info if !new_columns.is_empty() { - let mut updated_columns = task.table_info.meta.schema.column_schemas.clone(); + let mut updated_columns = column_schemas.to_vec(); updated_columns.extend(new_columns); // Create new schema with updated columns - let new_schema = RawSchema::new(updated_columns); + let new_schema = Arc::new(Schema::new(updated_columns)); // Update the table info task.table_info.meta.schema = new_schema; diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index 4f1b0da7d8..4416e6c0a1 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -19,7 +19,7 @@ use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader, region use common_telemetry::debug; use common_telemetry::tracing_context::TracingContext; use store_api::storage::{RegionId, TableId}; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::template::{ @@ -95,11 +95,11 @@ pub fn create_region_request_builder( Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } -/// Builds a [CreateRequestBuilder] from a [RawTableInfo]. +/// Builds a [CreateRequestBuilder] from a [TableInfo]. /// /// Note: This function is primarily intended for creating logical tables or allocating placeholder regions. pub fn create_region_request_builder_from_raw_table_info( - raw_table_info: &RawTableInfo, + raw_table_info: &TableInfo, physical_table_id: TableId, ) -> Result { let template = build_template_from_raw_table_info(raw_table_info)?; diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index bfa57693d0..a5b642f1a2 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -30,7 +30,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::storage::RegionNumber; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use table::table_name::TableName; use table::table_reference::TableReference; pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info}; @@ -103,7 +103,7 @@ impl CreateTableProcedure { }) } - fn table_info(&self) -> &RawTableInfo { + fn table_info(&self) -> &TableInfo { &self.data.task.table_info } diff --git a/src/common/meta/src/ddl/create_table/executor.rs b/src/common/meta/src/ddl/create_table/executor.rs index 2d58085f41..841623840b 100644 --- a/src/common/meta/src/ddl/create_table/executor.rs +++ b/src/common/meta/src/ddl/create_table/executor.rs @@ -23,7 +23,7 @@ use snafu::ensure; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use table::table_name::TableName; use crate::ddl::utils::raw_table_info::update_table_info_column_ids; @@ -175,19 +175,19 @@ impl CreateTableExecutor { &self, table_metadata_manager: &TableMetadataManagerRef, region_failure_detector_controller: &RegionFailureDetectorControllerRef, - mut raw_table_info: RawTableInfo, + mut table_info: TableInfo, column_metadatas: &[ColumnMetadata], table_route: PhysicalTableRouteValue, region_wal_options: HashMap, ) -> Result<()> { if !column_metadatas.is_empty() { - update_table_info_column_ids(&mut raw_table_info, column_metadatas); + update_table_info_column_ids(&mut table_info, column_metadatas); } let detecting_regions = convert_region_routes_to_detecting_regions(&table_route.region_routes); let table_route = TableRouteValue::Physical(table_route); table_metadata_manager - .create_table_metadata(raw_table_info, table_route, region_wal_options) + .create_table_metadata(table_info, table_route, region_wal_options) .await?; region_failure_detector_controller .register_failure_detectors(detecting_regions) diff --git a/src/common/meta/src/ddl/create_table/template.rs b/src/common/meta/src/ddl/create_table/template.rs index 6cb5e66c31..40775f5d32 100644 --- a/src/common/meta/src/ddl/create_table/template.rs +++ b/src/common/meta/src/ddl/create_table/template.rs @@ -22,25 +22,25 @@ use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use crate::error::{self, Result}; use crate::reconciliation::utils::build_column_metadata_from_table_info; use crate::wal_provider::prepare_wal_options; -/// Constructs a [CreateRequest] based on the provided [RawTableInfo]. +/// Constructs a [CreateRequest] based on the provided [TableInfo]. /// /// Note: This function is primarily intended for creating logical tables. /// /// Logical table templates keep the original column order and primary key indices from -/// `RawTableInfo` (including internal columns when present), because these are used to +/// `TableInfo` (including internal columns when present), because these are used to /// reconstruct the logical schema on the engine side. -pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result { - let primary_key_indices = &raw_table_info.meta.primary_key_indices; - let column_defs = raw_table_info +pub fn build_template_from_raw_table_info(table_info: &TableInfo) -> Result { + let primary_key_indices = &table_info.meta.primary_key_indices; + let column_defs = table_info .meta .schema - .column_schemas + .column_schemas() .iter() .enumerate() .map(|(i, c)| { @@ -56,12 +56,12 @@ pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Resu }) .collect::>>()?; - let options = HashMap::from(&raw_table_info.meta.options); + let options = HashMap::from(&table_info.meta.options); let template = CreateRequest { region_id: 0, - engine: raw_table_info.meta.engine.clone(), + engine: table_info.meta.engine.clone(), column_defs, - primary_key: raw_table_info + primary_key: table_info .meta .primary_key_indices .iter() @@ -75,21 +75,21 @@ pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Resu Ok(template) } -/// Constructs a [CreateRequest] based on the provided [RawTableInfo] for physical table. +/// Constructs a [CreateRequest] based on the provided [TableInfo] for physical table. /// /// Note: This function is primarily intended for creating physical table. /// /// Physical table templates mark primary /// keys by tag semantic type to match the physical storage layout. pub fn build_template_from_raw_table_info_for_physical_table( - raw_table_info: &RawTableInfo, + table_info: &TableInfo, ) -> Result { - let name_to_ids = raw_table_info + let name_to_ids = table_info .name_to_ids() .context(error::MissingColumnIdsSnafu)?; let column_metadatas = build_column_metadata_from_table_info( - &raw_table_info.meta.schema.column_schemas, - &raw_table_info.meta.primary_key_indices, + table_info.meta.schema.column_schemas(), + &table_info.meta.primary_key_indices, &name_to_ids, )?; let primary_key_ids = column_metadatas @@ -115,10 +115,10 @@ pub fn build_template_from_raw_table_info_for_physical_table( }) .collect::>>()?; - let options = HashMap::from(&raw_table_info.meta.options); + let options = HashMap::from(&table_info.meta.options); let template = CreateRequest { region_id: 0, - engine: raw_table_info.meta.engine.clone(), + engine: table_info.meta.engine.clone(), column_defs, primary_key: primary_key_ids, path: String::new(), diff --git a/src/common/meta/src/ddl/create_view.rs b/src/common/meta/src/ddl/create_view.rs index 8392362042..af71bf96e0 100644 --- a/src/common/meta/src/ddl/create_view.rs +++ b/src/common/meta/src/ddl/create_view.rs @@ -19,7 +19,7 @@ use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId, TableType}; +use table::metadata::{TableId, TableInfo, TableType}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; @@ -58,7 +58,7 @@ impl CreateViewProcedure { Ok(CreateViewProcedure { context, data }) } - fn view_info(&self) -> &RawTableInfo { + fn view_info(&self) -> &TableInfo { &self.data.task.view_info } diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 666e2eef6e..c342487365 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -328,7 +328,7 @@ mod tests { use api::v1::{ColumnDataType, SemanticType}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use table::metadata::RawTableInfo; + use table::metadata::TableInfo; use table::table_name::TableName; use super::*; @@ -339,7 +339,7 @@ mod tests { use crate::key::table_route::TableRouteValue; use crate::test_util::{MockDatanodeManager, new_ddl_context}; - fn test_create_raw_table_info(name: &str) -> RawTableInfo { + fn test_create_raw_table_info(name: &str) -> TableInfo { let create_table = TestCreateTableExprBuilder::default() .column_defs([ TestColumnDefBuilder::default() diff --git a/src/common/meta/src/ddl/drop_view.rs b/src/common/meta/src/ddl/drop_view.rs index 300e2cbac8..283a611f6d 100644 --- a/src/common/meta/src/ddl/drop_view.rs +++ b/src/common/meta/src/ddl/drop_view.rs @@ -21,7 +21,7 @@ use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId, TableType}; +use table::metadata::{TableId, TableInfo, TableType}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; @@ -118,7 +118,7 @@ impl DropViewProcedure { Ok(()) } - fn ensure_is_view(&self, table_info: &RawTableInfo) -> Result<()> { + fn ensure_is_view(&self, table_info: &TableInfo) -> Result<()> { ensure!( table_info.table_type == TableType::View, error::InvalidViewInfoSnafu { diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 5eef47defb..1dd1f783dc 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -33,7 +33,7 @@ use store_api::metric_engine_consts::{ METRIC_ENGINE_NAME, }; use store_api::storage::consts::ReservedColumnId; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::test_util::columns::TestColumnDefBuilder; @@ -46,7 +46,7 @@ use crate::rpc::ddl::CreateTableTask; pub async fn create_physical_table_metadata( ddl_context: &DdlContext, - table_info: RawTableInfo, + table_info: TableInfo, table_route: TableRouteValue, ) { ddl_context @@ -240,12 +240,12 @@ pub fn test_column_metadatas(tag_fields: &[&str]) -> Vec { } /// Asserts the column names. -pub fn assert_column_name(table_info: &RawTableInfo, expected_column_names: &[&str]) { +pub fn assert_column_name(table_info: &TableInfo, expected_column_names: &[&str]) { assert_eq!( table_info .meta .schema - .column_schemas + .column_schemas() .iter() .map(|c| c.name.clone()) .collect::>(), @@ -266,7 +266,7 @@ pub fn assert_column_name_and_id(column_metadatas: &[ColumnMetadata], expected: } /// Gets the raw table info. -pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> RawTableInfo { +pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> TableInfo { ddl_context .table_metadata_manager .table_info_manager() diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index e407261e25..138c9814cc 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use api::v1::column_def::try_as_column_schema; use api::v1::meta::Partition; @@ -21,10 +22,10 @@ use chrono::DateTime; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE, MITO2_ENGINE, }; -use datatypes::schema::RawSchema; +use datatypes::schema::Schema; use derive_builder::Builder; use store_api::storage::TableId; -use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; +use table::metadata::{TableIdent, TableInfo, TableMeta, TableType}; use table::requests::TableOptions; use crate::ddl::test_util::columns::TestColumnDefBuilder; @@ -87,9 +88,9 @@ impl From for CreateTableExpr { } } -/// Builds [RawTableInfo] from [CreateTableExpr]. -pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo { - RawTableInfo { +/// Builds [TableInfo] from [CreateTableExpr]. +pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> TableInfo { + TableInfo { ident: TableIdent { table_id: expr .table_id @@ -102,19 +103,13 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo { desc: Some(expr.desc.clone()), catalog_name: expr.catalog_name.clone(), schema_name: expr.schema_name.clone(), - meta: RawTableMeta { - schema: RawSchema { - column_schemas: expr - .column_defs + meta: TableMeta { + schema: Arc::new(Schema::new( + expr.column_defs .iter() .map(|column| try_as_column_schema(column).unwrap()) .collect(), - timestamp_index: expr - .column_defs - .iter() - .position(|column| column.semantic_type() == SemanticType::Timestamp), - version: 0, - }, + )), primary_key_indices: expr .primary_keys .iter() diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 139f90eed2..127c3e95f9 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -546,7 +546,7 @@ async fn test_on_part_duplicate_alter_request() { .table_info .meta .schema - .column_schemas + .column_schemas() .iter() .map(|x| x.name.clone()) .collect::>(); @@ -565,7 +565,7 @@ async fn test_on_part_duplicate_alter_request() { .table_info .meta .schema - .column_schemas + .column_schemas() .iter() .map(|x| x.name.clone()) .collect::>(); diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index a9ba4a0aa8..bf6a83a581 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -506,7 +506,7 @@ async fn test_on_update_metadata_add_columns() { .table_info; assert_eq!( - table_info.meta.schema.column_schemas.len() as u32, + table_info.meta.schema.column_schemas().len() as u32, table_info.meta.next_column_id ); assert_column_name(&table_info, &["ts", "host", "cpu", "my_tag3"]); diff --git a/src/common/meta/src/ddl/tests/create_view.rs b/src/common/meta/src/ddl/tests/create_view.rs index 0ab242cce0..e4fefa8944 100644 --- a/src/common/meta/src/ddl/tests/create_view.rs +++ b/src/common/meta/src/ddl/tests/create_view.rs @@ -22,7 +22,7 @@ use common_error::status_code::StatusCode; use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; use common_procedure_test::MockContextProvider; use table::metadata; -use table::metadata::{RawTableInfo, RawTableMeta, TableType}; +use table::metadata::{TableInfo, TableMeta, TableType}; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::create_view::CreateViewProcedure; @@ -74,7 +74,7 @@ pub(crate) fn test_create_view_task(name: &str) -> CreateViewTask { definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(), }; - let view_info = RawTableInfo { + let view_info = TableInfo { ident: metadata::TableIdent { table_id: 0, version: 0, @@ -83,7 +83,7 @@ pub(crate) fn test_create_view_task(name: &str) -> CreateViewTask { desc: None, catalog_name: expr.catalog_name.clone(), schema_name: expr.schema_name.clone(), - meta: RawTableMeta::default(), + meta: TableMeta::empty(), table_type: TableType::View, }; diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index bf46c8c9a1..40e8980a1c 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, ensure}; use store_api::storage::RegionId; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use table::table_name::TableName; use table::table_reference::TableReference; @@ -226,7 +226,7 @@ impl TruncateTableData { self.task.table_name() } - fn table_info(&self) -> &RawTableInfo { + fn table_info(&self) -> &TableInfo { &self.table_info_value.table_info } diff --git a/src/common/meta/src/ddl/utils/raw_table_info.rs b/src/common/meta/src/ddl/utils/raw_table_info.rs index c952e23469..2126b7be17 100644 --- a/src/common/meta/src/ddl/utils/raw_table_info.rs +++ b/src/common/meta/src/ddl/utils/raw_table_info.rs @@ -13,38 +13,38 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use api::v1::SemanticType; use common_telemetry::debug; use common_telemetry::tracing::warn; +use datatypes::schema::Schema; use store_api::metadata::ColumnMetadata; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; /// Generate the new physical table info. pub(crate) fn build_new_physical_table_info( - mut raw_table_info: RawTableInfo, + mut table_info: TableInfo, physical_columns: &[ColumnMetadata], -) -> RawTableInfo { +) -> TableInfo { debug!( "building new physical table info for table: {}, table_id: {}", - raw_table_info.name, raw_table_info.ident.table_id + table_info.name, table_info.ident.table_id ); - let existing_columns = raw_table_info + let existing_columns = table_info .meta .schema - .column_schemas + .column_schemas() .iter() .map(|col| col.name.clone()) .collect::>(); - let primary_key_indices = &mut raw_table_info.meta.primary_key_indices; - let value_indices = &mut raw_table_info.meta.value_indices; + let primary_key_indices = &mut table_info.meta.primary_key_indices; + let value_indices = &mut table_info.meta.value_indices; value_indices.clear(); - let time_index = &mut raw_table_info.meta.schema.timestamp_index; - let columns = &mut raw_table_info.meta.schema.column_schemas; - columns.clear(); - let column_ids = &mut raw_table_info.meta.column_ids; + let column_ids = &mut table_info.meta.column_ids; column_ids.clear(); + let mut columns = Vec::with_capacity(physical_columns.len()); for (idx, col) in physical_columns.iter().enumerate() { match col.semantic_type { SemanticType::Tag => { @@ -56,7 +56,6 @@ pub(crate) fn build_new_physical_table_info( SemanticType::Field => value_indices.push(idx), SemanticType::Timestamp => { value_indices.push(idx); - *time_index = Some(idx); } } @@ -64,11 +63,11 @@ pub(crate) fn build_new_physical_table_info( column_ids.push(col.column_id); } - if let Some(time_index) = *time_index { - raw_table_info.meta.schema.column_schemas[time_index].set_time_index(); - } - - raw_table_info + table_info.meta.schema = Arc::new(Schema::new_with_version( + columns, + table_info.meta.schema.version(), + )); + table_info } /// Updates the column IDs in the table info based on the provided column metadata. @@ -77,13 +76,13 @@ pub(crate) fn build_new_physical_table_info( /// before updating the column ids. If the column metadata doesn't match the table schema, /// the table info remains unchanged. pub(crate) fn update_table_info_column_ids( - raw_table_info: &mut RawTableInfo, + table_info: &mut TableInfo, column_metadatas: &[ColumnMetadata], ) { - let mut table_column_names = raw_table_info + let mut table_column_names = table_info .meta .schema - .column_schemas + .column_schemas() .iter() .map(|c| c.name.as_str()) .collect::>(); @@ -98,7 +97,7 @@ pub(crate) fn update_table_info_column_ids( if table_column_names != column_names { warn!( "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}", - raw_table_info.name, raw_table_info.ident.table_id, table_column_names, column_names, + table_info.name, table_info.ident.table_id, table_column_names, column_names, ); return; } @@ -108,7 +107,7 @@ pub(crate) fn update_table_info_column_ids( .map(|c| (c.column_schema.name.clone(), c.column_id)) .collect::>(); - let schema = &raw_table_info.meta.schema.column_schemas; + let schema = table_info.meta.schema.column_schemas(); let mut column_ids = Vec::with_capacity(schema.len()); for column_schema in schema { if let Some(id) = name_to_id.get(&column_schema.name) { @@ -116,5 +115,5 @@ pub(crate) fn update_table_info_column_ids( } } - raw_table_info.meta.column_ids = column_ids; + table_info.meta.column_ids = column_ids; } diff --git a/src/common/meta/src/ddl/utils/table_info.rs b/src/common/meta/src/ddl/utils/table_info.rs index 7f42a383fe..61de4d443d 100644 --- a/src/common/meta/src/ddl/utils/table_info.rs +++ b/src/common/meta/src/ddl/utils/table_info.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use snafu::OptionExt; use store_api::storage::TableId; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use table::table_reference::TableReference; use crate::error::{Result, TableInfoNotFoundSnafu}; @@ -75,7 +75,7 @@ pub(crate) async fn all_logical_table_routes_have_same_physical_id( /// Returns an error if any table info value fails to update. pub(crate) async fn batch_update_table_info_values( table_metadata_manager: &TableMetadataManager, - table_info_values: Vec<(DeserializedValueWithBytes, RawTableInfo)>, + table_info_values: Vec<(DeserializedValueWithBytes, TableInfo)>, ) -> Result<()> { let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size(); if table_info_values.len() > chunk_size { diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index f048e85f2b..f95eae7e56 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -201,13 +201,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to convert RawTableInfo into TableInfo"))] - ConvertRawTableInfo { - #[snafu(implicit)] - location: Location, - source: datatypes::Error, - }, - #[snafu(display("Primary key '{key}' not found when creating region request"))] PrimaryKeyNotFound { key: String, @@ -1113,7 +1106,6 @@ impl ErrorExt for Error { | BuildTableMeta { .. } | TableRouteNotFound { .. } | TableRepartNotFound { .. } - | ConvertRawTableInfo { .. } | RegionOperatingRace { .. } | EncodeWalOptions { .. } | BuildKafkaClient { .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 4e089cd0d4..3bb9e1a559 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -138,7 +138,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::RegionNumber; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use table::table_name::TableName; use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; @@ -674,7 +674,7 @@ impl TableMetadataManager { /// pub async fn create_view_metadata( &self, - view_info: RawTableInfo, + view_info: TableInfo, raw_logical_plan: Vec, table_names: HashSet, columns: Vec, @@ -747,7 +747,7 @@ impl TableMetadataManager { /// The caller MUST ensure it has the exclusive access to `TableNameKey`. pub async fn create_table_metadata( &self, - table_info: RawTableInfo, + table_info: TableInfo, table_route_value: TableRouteValue, region_wal_options: HashMap, ) -> Result<()> { @@ -834,7 +834,7 @@ impl TableMetadataManager { /// Creates metadata for multiple logical tables and return an error if different metadata exists. pub async fn create_logical_tables_metadata( &self, - tables_data: Vec<(RawTableInfo, TableRouteValue)>, + tables_data: Vec<(TableInfo, TableRouteValue)>, ) -> Result<()> { let len = tables_data.len(); let mut txns = Vec::with_capacity(3 * len); @@ -1118,7 +1118,7 @@ impl TableMetadataManager { &self, current_table_info_value: &DeserializedValueWithBytes, region_distribution: Option, - new_table_info: RawTableInfo, + new_table_info: TableInfo, ) -> Result<()> { let table_id = current_table_info_value.table_info.ident.table_id; let new_table_info_value = current_table_info_value.update(new_table_info); @@ -1213,7 +1213,7 @@ impl TableMetadataManager { pub async fn batch_update_table_info_values( &self, - table_info_value_pairs: Vec<(DeserializedValueWithBytes, RawTableInfo)>, + table_info_value_pairs: Vec<(DeserializedValueWithBytes, TableInfo)>, ) -> Result<()> { let len = table_info_value_pairs.len(); let mut txns = Vec::with_capacity(len); @@ -1477,7 +1477,7 @@ mod tests { use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::TryStreamExt; use store_api::storage::{RegionId, RegionNumber}; - use table::metadata::{RawTableInfo, TableInfo}; + use table::metadata::TableInfo; use table::table_name::TableName; use super::datanode_table::DatanodeTableKey; @@ -1571,7 +1571,7 @@ mod tests { async fn create_physical_table_metadata( table_metadata_manager: &TableMetadataManager, - table_info: RawTableInfo, + table_info: TableInfo, region_routes: Vec, region_wal_options: HashMap, ) -> Result<()> { @@ -1609,7 +1609,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); let region_route = new_test_region_route(); let region_routes = &vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let wal_provider = WalProvider::RaftEngine; let regions: Vec<_> = (0..16).collect(); let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap(); @@ -1635,7 +1635,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = &vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let region_wal_options = create_mock_region_wal_options() .into_iter() .map(|(k, v)| (k, serde_json::to_string(&v).unwrap())) @@ -1717,7 +1717,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let table_id = table_info.ident.table_id; let table_route_value = TableRouteValue::physical(region_routes.clone()); @@ -1779,11 +1779,10 @@ mod tests { let region_id = RegionId::new(table_id, regin_number); let region_route = new_region_route(region_id.as_u64(), 2); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = test_utils::new_test_table_info_with_name( + let table_info = test_utils::new_test_table_info_with_name( table_id, &format!("my_table_{}", table_id), - ) - .into(); + ); let table_route_value = TableRouteValue::physical(region_routes.clone()); tables_data.push((table_info, table_route_value)); @@ -1802,7 +1801,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = &vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let table_id = table_info.ident.table_id; let datanode_id = 2; let region_wal_options = create_mock_region_wal_options(); @@ -1908,7 +1907,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let table_id = table_info.ident.table_id; // creates metadata. create_physical_table_metadata( @@ -1984,7 +1983,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let table_id = table_info.ident.table_id; // creates metadata. create_physical_table_metadata( @@ -2069,7 +2068,7 @@ mod tests { leader_down_since: None, }, ]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let table_id = table_info.ident.table_id; let current_table_route_value = DeserializedValueWithBytes::from_inner( TableRouteValue::physical(region_routes.clone()), @@ -2151,7 +2150,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let table_id = table_info.ident.table_id; let engine = table_info.meta.engine.as_str(); let region_storage_path = @@ -2277,7 +2276,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = new_test_table_info().into(); + let table_info = new_test_table_info(); let table_id = table_info.ident.table_id; let engine = table_info.meta.engine.as_str(); let region_storage_path = @@ -2617,7 +2616,7 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); - let view_info: RawTableInfo = new_test_table_info().into(); + let view_info = new_test_table_info(); let view_id = view_info.ident.table_id; diff --git a/src/common/meta/src/key/schema_metadata_manager.rs b/src/common/meta/src/key/schema_metadata_manager.rs index 2b94d4c56f..d49b83841b 100644 --- a/src/common/meta/src/key/schema_metadata_manager.rs +++ b/src/common/meta/src/key/schema_metadata_manager.rs @@ -65,14 +65,14 @@ impl SchemaMetadataManager { schema_value: Option, kv_backend: crate::kv_backend::KvBackendRef, ) { - use table::metadata::{RawTableInfo, TableType}; - let value = crate::key::table_info::TableInfoValue::new(RawTableInfo { + use table::metadata::{TableInfo, TableType}; + let value = crate::key::table_info::TableInfoValue::new(TableInfo { ident: Default::default(), name: table_name.to_string(), desc: None, catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), - meta: Default::default(), + meta: table::metadata::TableMeta::empty(), table_type: TableType::Base, }); let table_info_manager = crate::key::table_info::TableInfoManager::new(kv_backend.clone()); diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 29a40d8222..8532e1bd4d 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; use snafu::OptionExt; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use table::table_name::TableName; use table::table_reference::TableReference; @@ -82,19 +82,19 @@ impl MetadataKey<'_, TableInfoKey> for TableInfoKey { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TableInfoValue { - pub table_info: RawTableInfo, + pub table_info: TableInfo, version: u64, } impl TableInfoValue { - pub fn new(table_info: RawTableInfo) -> Self { + pub fn new(table_info: TableInfo) -> Self { Self { table_info, version: 0, } } - pub fn update(&self, new_table_info: RawTableInfo) -> Self { + pub fn update(&self, new_table_info: TableInfo) -> Self { Self { table_info: new_table_info, version: self.version + 1, @@ -103,7 +103,7 @@ impl TableInfoValue { pub(crate) fn with_update(&self, update: F) -> Self where - F: FnOnce(&mut RawTableInfo), + F: FnOnce(&mut TableInfo), { let mut new_table_info = self.table_info.clone(); update(&mut new_table_info); @@ -280,8 +280,8 @@ impl TableInfoManager { mod tests { use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, RawSchema, Schema}; - use table::metadata::{RawTableMeta, TableIdent, TableType}; + use datatypes::schema::{ColumnSchema, Schema}; + use table::metadata::{TableIdent, TableMeta, TableType}; use super::*; @@ -322,15 +322,15 @@ mod tests { assert_eq!(value, deserialized); } - fn new_table_info(table_id: TableId) -> RawTableInfo { + fn new_table_info(table_id: TableId) -> TableInfo { let schema = Schema::new(vec![ColumnSchema::new( "name", ConcreteDataType::string_datatype(), true, )]); - let meta = RawTableMeta { - schema: RawSchema::from(&schema), + let meta = TableMeta { + schema: Arc::new(schema), engine: "mito".to_string(), created_on: chrono::DateTime::default(), updated_on: chrono::DateTime::default(), @@ -342,7 +342,7 @@ mod tests { column_ids: vec![], }; - RawTableInfo { + TableInfo { ident: TableIdent { table_id, version: 1, diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables.rs index a067767c72..19d9045d96 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables.rs @@ -31,7 +31,7 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::storage::TableId; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use table::table_name::TableName; use crate::cache_invalidator::CacheInvalidatorRef; @@ -107,7 +107,7 @@ pub(crate) struct PersistentContext { pub(crate) update_table_infos: Vec<(TableId, Vec)>, // The table infos to be created. // The value will be set in `ResolveTableMetadatas` state. - pub(crate) create_tables: Vec<(TableId, RawTableInfo)>, + pub(crate) create_tables: Vec<(TableId, TableInfo)>, // Whether the procedure is a subprocedure. pub(crate) is_subprocedure: bool, } diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs index 17abfe70d1..dd9fd00fff 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs @@ -22,7 +22,7 @@ use common_telemetry::tracing_context::TracingContext; use futures::future; use serde::{Deserialize, Serialize}; use store_api::storage::{RegionId, RegionNumber, TableId}; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use crate::ddl::utils::{add_peer_context_if_needed, region_storage_path}; use crate::ddl::{CreateRequestBuilder, build_template_from_raw_table_info}; @@ -147,10 +147,10 @@ impl ReconcileRegions { /// /// Note: This function is primarily intended for creating logical tables or allocating placeholder regions. fn create_region_request_from_raw_table_info( - raw_table_info: &RawTableInfo, + table_info: &TableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info)?; + let template = build_template_from_raw_table_info(table_info)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs index e82d210573..d0de7a06fb 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs @@ -20,7 +20,7 @@ use common_telemetry::info; use serde::{Deserialize, Serialize}; use store_api::metadata::ColumnMetadata; use store_api::storage::TableId; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use table::table_name::TableName; use table::table_reference::TableReference; @@ -138,8 +138,8 @@ impl UpdateTableInfos { fn build_new_table_info( table_id: TableId, column_metadatas: &[ColumnMetadata], - table_info: &RawTableInfo, - ) -> Result { + table_info: &TableInfo, + ) -> Result { let table_ref = table_info.table_ref(); let table_meta = build_table_meta_from_column_metadatas( table_id, diff --git a/src/common/meta/src/reconciliation/reconcile_table.rs b/src/common/meta/src/reconciliation/reconcile_table.rs index 15b3154f84..004b93d38c 100644 --- a/src/common/meta/src/reconciliation/reconcile_table.rs +++ b/src/common/meta/src/reconciliation/reconcile_table.rs @@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::storage::TableId; -use table::metadata::RawTableMeta; +use table::metadata::TableMeta; use table::table_name::TableName; use tonic::async_trait; @@ -78,11 +78,11 @@ impl ReconcileTableContext { self.persistent_ctx.table_id } - /// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. + /// Builds a [`TableMeta`] from the provided [`ColumnMetadata`]s. pub(crate) fn build_table_meta( &self, column_metadatas: &[ColumnMetadata], - ) -> Result { + ) -> Result { // Safety: The table info value is set in `ReconciliationStart` state. let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap(); let table_id = self.table_id(); @@ -145,7 +145,7 @@ impl PersistentContext { #[derive(Default)] pub(crate) struct VolatileContext { - pub(crate) table_meta: Option, + pub(crate) table_meta: Option, pub(crate) metrics: ReconcileTableMetrics, } diff --git a/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs index ee4324102b..d45cfe464e 100644 --- a/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs +++ b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs @@ -118,7 +118,7 @@ impl State for ResolveColumnMetadata { .name_to_ids() .context(MissingColumnIdsSnafu)?; let column_metadata = build_column_metadata_from_table_info( - &table_info_value.table_info.meta.schema.column_schemas, + table_info_value.table_info.meta.schema.column_schemas(), &table_info_value.table_info.meta.primary_key_indices, &name_to_ids, )?; diff --git a/src/common/meta/src/reconciliation/utils.rs b/src/common/meta/src/reconciliation/utils.rs index 76b32bc2a6..4debc6de4d 100644 --- a/src/common/meta/src/reconciliation/utils.rs +++ b/src/common/meta/src/reconciliation/utils.rs @@ -15,18 +15,19 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display}; use std::ops::AddAssign; +use std::sync::Arc; use std::time::Instant; use api::v1::SemanticType; use common_procedure::{Context as ProcedureContext, ProcedureId, watcher}; use common_telemetry::{error, warn}; -use datatypes::schema::ColumnSchema; +use datatypes::schema::{ColumnSchema, Schema}; use futures::future::{join_all, try_join_all}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::consts::ReservedColumnId; use store_api::storage::{RegionId, TableId}; -use table::metadata::{RawTableInfo, RawTableMeta}; +use table::metadata::{TableInfo, TableMeta}; use table::table_name::TableName; use table::table_reference::TableReference; @@ -272,7 +273,7 @@ pub(crate) fn check_column_metadata_invariants( Ok(()) } -/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. +/// Builds a [`TableMeta`] from the provided [`ColumnMetadata`]s. /// /// Returns an error if: /// - Any column is missing in the `name_to_ids`(if `name_to_ids` is provided). @@ -284,23 +285,24 @@ pub(crate) fn check_column_metadata_invariants( pub(crate) fn build_table_meta_from_column_metadatas( table_id: TableId, table_ref: TableReference, - table_meta: &RawTableMeta, + table_meta: &TableMeta, name_to_ids: Option>, column_metadata: &[ColumnMetadata], -) -> Result { +) -> Result { let column_in_column_metadata = column_metadata .iter() .map(|c| (c.column_schema.name.as_str(), c)) .collect::>(); + let column_schemas = table_meta.schema.column_schemas(); let primary_key_names = table_meta .primary_key_indices .iter() - .map(|i| table_meta.schema.column_schemas[*i].name.as_str()) + .map(|i| column_schemas[*i].name.as_str()) .collect::>(); let partition_key_names = table_meta .partition_key_indices .iter() - .map(|i| table_meta.schema.column_schemas[*i].name.as_str()) + .map(|i| column_schemas[*i].name.as_str()) .collect::>(); ensure!( column_metadata @@ -351,14 +353,12 @@ pub(crate) fn build_table_meta_from_column_metadatas( let primary_key_indices = &mut new_raw_table_meta.primary_key_indices; let partition_key_indices = &mut new_raw_table_meta.partition_key_indices; let value_indices = &mut new_raw_table_meta.value_indices; - let time_index = &mut new_raw_table_meta.schema.timestamp_index; - let columns = &mut new_raw_table_meta.schema.column_schemas; + let mut columns = Vec::with_capacity(column_metadata.len()); let column_ids = &mut new_raw_table_meta.column_ids; let next_column_id = &mut new_raw_table_meta.next_column_id; column_ids.clear(); value_indices.clear(); - columns.clear(); primary_key_indices.clear(); partition_key_indices.clear(); @@ -375,7 +375,6 @@ pub(crate) fn build_table_meta_from_column_metadatas( } SemanticType::Timestamp => { value_indices.push(idx); - *time_index = Some(idx); } } @@ -391,10 +390,10 @@ pub(crate) fn build_table_meta_from_column_metadatas( .unwrap_or(*next_column_id) .max(*next_column_id); - if let Some(time_index) = *time_index { - new_raw_table_meta.schema.column_schemas[time_index].set_time_index(); - } - + new_raw_table_meta.schema = Arc::new(Schema::new_with_version( + columns, + table_meta.schema.version(), + )); Ok(new_raw_table_meta) } @@ -403,10 +402,10 @@ pub(crate) fn build_table_meta_from_column_metadatas( /// The logical table only support to add columns, so we can check the length of column metadatas /// to determine whether the logical table info needs to be updated. pub(crate) fn need_update_logical_table_info( - table_info: &RawTableInfo, + table_info: &TableInfo, column_metadatas: &[ColumnMetadata], ) -> bool { - table_info.meta.schema.column_schemas.len() != column_metadatas.len() + table_info.meta.schema.column_schemas().len() != column_metadatas.len() } /// The result of waiting for inflight subprocedures. @@ -959,7 +958,7 @@ mod tests { use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; - use table::metadata::{RawTableMeta, TableMetaBuilder}; + use table::metadata::TableMetaBuilder; use table::table_reference::TableReference; use super::*; @@ -1010,17 +1009,15 @@ mod tests { ] } - fn new_test_raw_table_info() -> RawTableMeta { + fn new_test_raw_table_info() -> TableMeta { let mut table_meta_builder = TableMetaBuilder::empty(); - let table_meta = table_meta_builder + table_meta_builder .schema(Arc::new(new_test_schema())) .primary_key_indices(vec![0]) .partition_key_indices(vec![2]) .next_column_id(4) .build() - .unwrap(); - - table_meta.into() + .unwrap() } #[test] @@ -1081,7 +1078,7 @@ mod tests { assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]); assert_eq!(new_table_meta.partition_key_indices, vec![2]); assert_eq!(new_table_meta.value_indices, vec![1, 2]); - assert_eq!(new_table_meta.schema.timestamp_index, Some(1)); + assert_eq!(new_table_meta.schema.timestamp_index(), Some(1)); assert_eq!( new_table_meta.column_ids, vec![0, 1, 2, ReservedColumnId::table_id()] diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index ddb40ee5ec..e5293a78a3 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -48,7 +48,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{DefaultOnNull, serde_as}; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{TableId, TableInfo}; use table::requests::validate_database_option; use table::table_name::TableName; use table::table_reference::TableReference; @@ -103,13 +103,13 @@ impl DdlTask { pub fn new_create_table( expr: CreateTableExpr, partitions: Vec, - table_info: RawTableInfo, + table_info: TableInfo, ) -> Self { DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info)) } /// Creates a [`DdlTask`] to create several logical tables. - pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self { + pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self { DdlTask::CreateLogicalTables( table_data .into_iter() @@ -197,7 +197,7 @@ impl DdlTask { } /// Creates a [`DdlTask`] to create a view. - pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self { + pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self { DdlTask::CreateView(CreateViewTask { create_view, view_info, @@ -415,7 +415,7 @@ impl From for PbDdlTaskResponse { #[derive(Debug, PartialEq, Clone)] pub struct CreateViewTask { pub create_view: CreateViewExpr, - pub view_info: RawTableInfo, + pub view_info: TableInfo, } impl CreateViewTask { @@ -648,7 +648,7 @@ impl From for PbDropTableTask { pub struct CreateTableTask { pub create_table: CreateTableExpr, pub partitions: Vec, - pub table_info: RawTableInfo, + pub table_info: TableInfo, } impl TryFrom for CreateTableTask { @@ -683,7 +683,7 @@ impl CreateTableTask { pub fn new( expr: CreateTableExpr, partitions: Vec, - table_info: RawTableInfo, + table_info: TableInfo, ) -> CreateTableTask { CreateTableTask { create_table: expr, @@ -717,7 +717,7 @@ impl CreateTableTask { self.table_info.ident.table_id = table_id; } - /// Sort the columns in [CreateTableExpr] and [RawTableInfo]. + /// Sort the columns in [CreateTableExpr] and [TableInfo]. /// /// This function won't do any check or verification. Caller should /// ensure this task is valid. @@ -1597,10 +1597,10 @@ mod tests { use std::sync::Arc; use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType}; - use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder}; + use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::storage::ConcreteDataType; - use table::metadata::{RawTableInfo, RawTableMeta, TableType}; + use table::metadata::{TableInfo, TableMeta, TableType}; use table::test_util::table_info::test_table_info; use super::{AlterTableTask, CreateTableTask, *}; @@ -1609,11 +1609,7 @@ mod tests { fn test_basic_ser_de_create_table_task() { let schema = SchemaBuilder::default().build().unwrap(); let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema)); - let task = CreateTableTask::new( - CreateTableExpr::default(), - Vec::new(), - RawTableInfo::from(table_info), - ); + let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info); let output = serde_json::to_vec(&task).unwrap(); @@ -1636,32 +1632,28 @@ mod tests { #[test] fn test_sort_columns() { // construct RawSchema - let raw_schema = RawSchema { - column_schemas: vec![ - ColumnSchema::new( - "column3".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - ColumnSchema::new( - "column1".to_string(), - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new( - "column2".to_string(), - ConcreteDataType::float64_datatype(), - true, - ), - ], - timestamp_index: Some(1), - version: 0, - }; + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new( + "column3".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "column1".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "column2".to_string(), + ConcreteDataType::float64_datatype(), + true, + ), + ])); // construct RawTableMeta - let raw_table_meta = RawTableMeta { - schema: raw_schema, + let meta = TableMeta { + schema, primary_key_indices: vec![0], value_indices: vec![2], engine: METRIC_ENGINE_NAME.to_string(), @@ -1673,10 +1665,10 @@ mod tests { column_ids: Default::default(), }; - // construct RawTableInfo - let raw_table_info = RawTableInfo { + // construct TableInfo + let raw_table_info = TableInfo { ident: Default::default(), - meta: raw_table_meta, + meta, name: Default::default(), desc: Default::default(), catalog_name: Default::default(), @@ -1729,7 +1721,7 @@ mod tests { // Assert that the table_info is updated correctly assert_eq!( - create_table_task.table_info.meta.schema.timestamp_index, + create_table_task.table_info.meta.schema.timestamp_index(), Some(0) ); assert_eq!( diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 46305d0717..9070e2babe 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -14,7 +14,6 @@ mod column_schema; pub mod constraint; -mod raw; use std::collections::HashMap; use std::fmt; @@ -22,6 +21,7 @@ use std::sync::Arc; use arrow::datatypes::{Field, Schema as ArrowSchema}; use datafusion_common::DFSchemaRef; +use serde::{Deserialize, Deserializer, Serialize}; use snafu::{ResultExt, ensure}; use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result}; @@ -40,7 +40,6 @@ pub use crate::schema::column_schema::{ VectorIndexOptions, }; pub use crate::schema::constraint::ColumnDefaultConstraint; -pub use crate::schema::raw::RawSchema; /// Key used to store version number of the schema in metadata. pub const VERSION_KEY: &str = "greptime:version"; @@ -48,15 +47,18 @@ pub const VERSION_KEY: &str = "greptime:version"; pub const TYPE_KEY: &str = "greptime:type"; /// A common schema, should be immutable. -#[derive(Clone, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq, Serialize)] pub struct Schema { column_schemas: Vec, + #[serde(skip_serializing)] name_to_index: HashMap, + #[serde(skip_serializing)] arrow_schema: Arc, /// Index of the timestamp key column. /// /// Timestamp key column is the column holds the timestamp and forms part of /// the primary key. None means there is no timestamp key column. + #[serde(skip_serializing)] timestamp_index: Option, /// Version of the schema. /// @@ -64,6 +66,26 @@ pub struct Schema { version: u32, } +impl<'de> Deserialize<'de> for Schema { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + use serde::de::Error; + + #[derive(Deserialize)] + struct RawSchema { + column_schemas: Vec, + version: u32, + } + let raw = RawSchema::deserialize(deserializer)?; + + SchemaBuilder::try_from(raw.column_schemas) + .and_then(|x| x.version(raw.version).build()) + .map_err(D::Error::custom) + } +} + impl fmt::Debug for Schema { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Schema") @@ -85,8 +107,13 @@ impl Schema { /// Panics when ColumnSchema's `default_constraint` can't be serialized into json. pub fn new(column_schemas: Vec) -> Schema { // Builder won't fail in this case + Self::new_with_version(column_schemas, Self::INITIAL_VERSION) + } + + pub fn new_with_version(column_schemas: Vec, version: u32) -> Schema { SchemaBuilder::try_from(column_schemas) .unwrap() + .version(version) .build() .unwrap() } diff --git a/src/datatypes/src/schema/raw.rs b/src/datatypes/src/schema/raw.rs deleted file mode 100644 index eaca9617a0..0000000000 --- a/src/datatypes/src/schema/raw.rs +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use serde::{Deserialize, Serialize}; - -use crate::error::{Error, Result}; -use crate::schema::{ColumnSchema, Schema, SchemaBuilder}; - -/// Struct used to serialize and deserialize [`Schema`](crate::schema::Schema). -/// -/// This struct only contains necessary data to recover the Schema. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] -pub struct RawSchema { - /// Schema of columns. - pub column_schemas: Vec, - /// Index of the timestamp column. - pub timestamp_index: Option, - /// Schema version. - pub version: u32, -} - -impl RawSchema { - /// Creates a new [RawSchema] from specific `column_schemas`. - /// - /// Sets [RawSchema::timestamp_index] to the first index of the timestamp - /// column. It doesn't check whether time index column is duplicate. - pub fn new(column_schemas: Vec) -> RawSchema { - let timestamp_index = column_schemas - .iter() - .position(|column_schema| column_schema.is_time_index()); - - RawSchema { - column_schemas, - timestamp_index, - version: 0, - } - } -} - -impl TryFrom for Schema { - type Error = Error; - - fn try_from(raw: RawSchema) -> Result { - // While building Schema, we don't trust the fields, such as timestamp_index, - // in RawSchema. We use SchemaBuilder to perform the validation. - SchemaBuilder::try_from(raw.column_schemas)? - .version(raw.version) - .build() - } -} - -impl From<&Schema> for RawSchema { - fn from(schema: &Schema) -> RawSchema { - RawSchema { - column_schemas: schema.column_schemas.clone(), - timestamp_index: schema.timestamp_index, - version: schema.version, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::data_type::ConcreteDataType; - - #[test] - fn test_raw_convert() { - let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ]; - let schema = SchemaBuilder::try_from(column_schemas) - .unwrap() - .version(123) - .build() - .unwrap(); - - let raw = RawSchema::from(&schema); - let schema_new = Schema::try_from(raw).unwrap(); - - assert_eq!(schema, schema_new); - } - - #[test] - fn test_new_raw_schema_with_time_index() { - let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ]; - let schema = RawSchema::new(column_schemas); - assert_eq!(1, schema.timestamp_index.unwrap()); - } - - #[test] - fn test_new_raw_schema_without_time_index() { - let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ]; - let schema = RawSchema::new(column_schemas); - assert!(schema.timestamp_index.is_none()); - } -} diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index c6a1a2dcd8..2091c53d1d 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -485,13 +485,13 @@ impl StreamingEngine { .await? .unwrap(); let meta = table_info.table_info.meta; + let schema = meta.schema.column_schemas().to_vec(); let primary_keys = meta .primary_key_indices .into_iter() - .map(|i| meta.schema.column_schemas[i].name.clone()) + .map(|i| schema[i].name.clone()) .collect_vec(); - let schema = meta.schema.column_schemas; - let time_index = meta.schema.timestamp_index; + let time_index = meta.schema.timestamp_index(); Ok(Some((primary_keys, time_index, schema))) } else { Ok(None) diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index b2bdded896..ca0858df27 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -124,14 +124,14 @@ impl ManagedTableSource { .context(UnexpectedSnafu { reason: format!("Table id = {:?}, couldn't found table info", table_id), })?; - let raw_schema = &info.table_info.meta.schema; - let Some(ts_index) = raw_schema.timestamp_index else { + let schema = &info.table_info.meta.schema; + let Some(ts_index) = schema.timestamp_index() else { UnexpectedSnafu { reason: format!("Table id = {:?}, couldn't found timestamp index", table_id), } .fail()? }; - let col_schema = raw_schema.column_schemas[ts_index].clone(); + let col_schema = schema.column_schemas()[ts_index].clone(); Ok((ts_index, col_schema)) } diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index 661b4406a1..80b3f5ae19 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -145,8 +145,8 @@ pub fn table_info_value_to_relation_desc( ) -> Result { let raw_schema = table_info_value.table_info.meta.schema; let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema - .column_schemas - .clone() + .column_schemas() + .to_vec() .into_iter() .map(|col| { ( @@ -162,7 +162,7 @@ pub fn table_info_value_to_relation_desc( let key = table_info_value.table_info.meta.primary_key_indices; let keys = vec![crate::repr::Key::from(key)]; - let time_index = raw_schema.timestamp_index; + let time_index = raw_schema.timestamp_index(); let relation_desc = RelationDesc { typ: RelationType { column_types, @@ -174,7 +174,7 @@ pub fn table_info_value_to_relation_desc( names: col_names, }; let default_values = raw_schema - .column_schemas + .column_schemas() .iter() .map(|c| { c.default_constraint().cloned().or_else(|| { diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 3ff9d0e11d..5c677e6e38 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -127,7 +127,7 @@ impl BatchingEngine { table_name.table_name, ]; let schema = &table_infos.get(&id).unwrap().table_info.meta.schema; - let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()] + let time_index_unit = schema.column_schemas()[schema.timestamp_index().unwrap()] .data_type .as_timestamp() .unwrap() @@ -576,7 +576,7 @@ impl BatchingEngine { .table_info .meta .schema - .column_schemas + .column_schemas() .iter() .filter(|col| col.data_type == ConcreteDataType::float64_datatype()) .collect::>(); @@ -604,7 +604,7 @@ impl BatchingEngine { .table_info .meta .schema - .column_schemas + .column_schemas() .iter() .enumerate() { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 34011d9fee..ea648bfe7f 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -192,7 +192,7 @@ mod test { let another_region_id = RegionId::new(table_id, region_number + 1); let peer = Peer::empty(datanode_id); let follower_peer = Peer::empty(datanode_id + 1); - let table_info = new_test_table_info(table_id).into(); + let table_info = new_test_table_info(table_id); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), @@ -328,7 +328,7 @@ mod test { let no_exist_region_id = RegionId::new(table_id, region_number + 2); let peer = Peer::empty(datanode_id); let follower_peer = Peer::empty(datanode_id + 1); - let table_info = new_test_table_info(table_id).into(); + let table_info = new_test_table_info(table_id); let region_routes = vec![ RegionRoute { diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 6f28c0a927..3c244dd0d5 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -1172,7 +1172,7 @@ mod tests { let from_peer = persistent_context.from_peer.clone(); let to_peer = persistent_context.to_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), @@ -1211,7 +1211,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let from_peer = persistent_context.from_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), @@ -1299,7 +1299,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let from_peer = persistent_context.from_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), @@ -1419,7 +1419,7 @@ mod tests { let from_peer_id = persistent_context.from_peer.id; let from_peer = persistent_context.from_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 2c60664663..e4ecc4ea8b 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -401,7 +401,7 @@ mod tests { async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { let region_id = ctx.persistent_ctx.region_ids[0]; - let table_info = new_test_table_info(region_id.table_id()).into(); + let table_info = new_test_table_info(region_id.table_id()); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 25a3d986ec..70cba21b5f 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -704,7 +704,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(3)), @@ -732,7 +732,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(3)), @@ -764,7 +764,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(3)), @@ -798,7 +798,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(2)), @@ -870,7 +870,7 @@ mod test { timeout: Duration::from_millis(1000), trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(2)), @@ -903,7 +903,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(3)), @@ -936,7 +936,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(3)), @@ -980,7 +980,7 @@ mod test { task.trigger_reason, ), ); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(1)), diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 589f86b3a4..17b577501e 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -223,7 +223,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_route = RegionRoute { region: Region::new_test(RegionId::new(1024, 3)), leader_peer: Some(from_peer.clone()), @@ -250,7 +250,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(to_peer), @@ -277,7 +277,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(from_peer_id)), @@ -302,7 +302,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes: Vec = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(1024)), diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index b110a6007b..74c932466a 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -425,7 +425,7 @@ mod tests { let mut env = TestingEnv::new(); // Prepares table - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(from_peer_id)), diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index c039fc441d..4e5624401e 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -37,7 +37,7 @@ use common_procedure_test::MockContextProvider; use common_telemetry::debug; use futures::future::BoxFuture; use store_api::storage::RegionId; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::error::{self, Error, Result}; @@ -169,7 +169,7 @@ impl TestingEnv { // Creates a table metadata with the physical table route. pub async fn create_physical_table_metadata( &self, - table_info: RawTableInfo, + table_info: TableInfo, region_routes: Vec, ) { self.table_metadata_manager @@ -289,7 +289,7 @@ impl ProcedureMigrationTestSuite { /// Initializes table metadata. pub(crate) async fn init_table_metadata( &self, - table_info: RawTableInfo, + table_info: TableInfo, region_routes: Vec, ) { self.env diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index f2415d9d90..9e8545bb43 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -142,7 +142,7 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let table_id = ctx.persistent_ctx.region_ids[0].table_id(); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(1024)), @@ -185,7 +185,7 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let table_id = ctx.persistent_ctx.region_ids[0].table_id(); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(from_peer.clone()), diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index eafcaf4677..b3ee848fbe 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -120,7 +120,7 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let table_id = ctx.persistent_ctx.region_ids[0].table_id(); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![ RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index bdad23c0b1..153e9913ed 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -262,7 +262,7 @@ mod tests { let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(4)), @@ -295,7 +295,7 @@ mod tests { let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(3)), @@ -330,7 +330,7 @@ mod tests { let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(1)), @@ -369,7 +369,7 @@ mod tests { let leader_peer = persistent_context.from_peer.clone(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(leader_peer), @@ -396,7 +396,7 @@ mod tests { let candidate_peer = persistent_context.to_peer.clone(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(candidate_peer), @@ -424,7 +424,7 @@ mod tests { let candidate_peer = persistent_context.to_peer.clone(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024).into(); + let table_info = new_test_table_info(1024); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(candidate_peer), @@ -454,7 +454,7 @@ mod tests { let opening_keeper = MemoryRegionKeeper::default(); let table_id = 1024; - let table_info = new_test_table_info(table_id).into(); + let table_info = new_test_table_info(table_id); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 0621f30361..da9ea1dd36 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -381,7 +381,7 @@ mod tests { async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { let region_id = ctx.persistent_ctx.region_ids[0]; - let table_info = new_test_table_info(region_id.table_id()).into(); + let table_info = new_test_table_info(region_id.table_id()); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index 6f35c3feec..b1bf93d986 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -28,7 +28,7 @@ use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{RegionNumber, TableId}; -use table::metadata::RawTableInfo; +use table::metadata::TableInfo; use table::table_reference::TableReference; use tokio::time::Instant; @@ -258,7 +258,7 @@ impl AllocateRegion { async fn allocate_regions( node_manager: &NodeManagerRef, - raw_table_info: &RawTableInfo, + raw_table_info: &TableInfo, region_routes: &[RegionRoute], wal_options: &HashMap, ) -> Result<()> { diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 76f3f4249a..1071adfa75 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -300,7 +300,7 @@ pub async fn new_wal_prune_metadata( let region_ids = (0..n_region) .map(|i| RegionId::new(table_id, i)) .collect::>(); - let table_info = new_test_table_info(table_id).into(); + let table_info = new_test_table_info(table_id); let region_routes = region_ids .iter() .map(|region_id| RegionRoute { diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index e12268f1c3..0d6e154a31 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -308,8 +308,8 @@ pub mod test_data { use common_meta::sequence::SequenceBuilder; use common_meta::wal_provider::WalProvider; use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, RawSchema}; - use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; + use datatypes::schema::{ColumnSchema, Schema}; + use table::metadata::{TableIdent, TableInfo, TableMeta, TableType}; use table::requests::TableOptions; use crate::cache_invalidator::MetasrvCacheInvalidator; @@ -330,8 +330,8 @@ pub mod test_data { ] } - pub fn new_table_info() -> RawTableInfo { - RawTableInfo { + pub fn new_table_info() -> TableInfo { + TableInfo { ident: TableIdent { table_id: 42, version: 1, @@ -340,33 +340,29 @@ pub mod test_data { desc: Some("blabla".to_string()), catalog_name: "my_catalog".to_string(), schema_name: "my_schema".to_string(), - meta: RawTableMeta { - schema: RawSchema { - column_schemas: vec![ - ColumnSchema::new( - "ts".to_string(), - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ColumnSchema::new( - "my_tag1".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - ColumnSchema::new( - "my_tag2".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - ColumnSchema::new( - "my_field_column".to_string(), - ConcreteDataType::int32_datatype(), - true, - ), - ], - timestamp_index: Some(0), - version: 0, - }, + meta: TableMeta { + schema: Arc::new(Schema::new(vec![ + ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ColumnSchema::new( + "my_tag1".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "my_tag2".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "my_field_column".to_string(), + ConcreteDataType::int32_datatype(), + true, + ), + ])), primary_key_indices: vec![1, 2], value_indices: vec![2], engine: MITO2_ENGINE.to_string(), diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index c4b105345c..6d282fb49f 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -265,7 +265,7 @@ mod tests { use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder}; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; - use table::metadata::RawTableInfo; + use table::metadata::TableInfo; use super::{RegionLeaseKeeper, renew_region_lease_via_region_route}; use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse}; @@ -343,7 +343,7 @@ mod tests { #[tokio::test] async fn test_collect_metadata() { let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id).into(); + let table_info: TableInfo = new_test_table_info(table_id); let region_id = RegionId::new(table_id, 1); let leader_peer_id = 1024; @@ -394,7 +394,7 @@ mod tests { #[tokio::test] async fn test_renew_region_leases_basic() { let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id).into(); + let table_info: TableInfo = new_test_table_info(table_id); let region_id = RegionId::new(table_id, 1); let leader_peer_id = 1024; @@ -501,7 +501,7 @@ mod tests { #[tokio::test] async fn test_renew_unexpected_logic_table() { let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id).into(); + let table_info: TableInfo = new_test_table_info(table_id); let region_id = RegionId::new(table_id, 1); let keeper = new_test_keeper(); @@ -537,7 +537,7 @@ mod tests { #[tokio::test] async fn test_renew_region_leases_with_downgrade_leader() { let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id).into(); + let table_info: TableInfo = new_test_table_info(table_id); let region_id = RegionId::new(table_id, 1); let leader_peer_id = 1024; diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 139847f01b..b4acc0f363 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -306,13 +306,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to create table info"))] - CreateTableInfo { - #[snafu(implicit)] - location: Location, - source: datatypes::error::Error, - }, - #[snafu(display("Failed to build CreateExpr on insertion"))] BuildCreateExprOnInsertion { #[snafu(implicit)] @@ -973,7 +966,6 @@ impl ErrorExt for Error { } Error::Table { source, .. } | Error::Insert { source, .. } => source.status_code(), Error::ConvertColumnDefaultConstraint { source, .. } - | Error::CreateTableInfo { source, .. } | Error::IntoVectors { source, .. } => source.status_code(), Error::RequestInserts { source, .. } | Error::FindViewInfo { source, .. } => { source.status_code() diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index d8a8d54d43..17e6522894 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -155,7 +155,8 @@ pub(crate) async fn create_external_expr( let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options) .await .context(InferFileTableSchemaSnafu)? - .column_schemas; + .column_schemas() + .to_vec(); let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() { // expanded form diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index d250d66d69..3384aaa097 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -54,7 +54,7 @@ use common_time::{Timestamp, Timezone}; use datafusion_common::tree_node::TreeNodeVisitor; use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, RawSchema, Schema}; +use datatypes::schema::{ColumnSchema, Schema}; use datatypes::value::Value; use datatypes::vectors::{StringVector, VectorRef}; use humantime::parse_duration; @@ -83,7 +83,7 @@ use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::TableRef; use table::dist_table::DistTable; -use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; +use table::metadata::{self, TableId, TableInfo, TableMeta, TableType}; use table::requests::{ AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions, }; @@ -92,7 +92,7 @@ use table::table_reference::TableReference; use crate::error::{ self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu, - ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, + ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, @@ -385,8 +385,7 @@ impl StatementExecutor { table_info.ident.table_id = table_id; - let table_info: Arc = - Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?); + let table_info = Arc::new(table_info); create_table.table_id = Some(api::v1::TableId { id: table_id }); let table = DistTable::table(table_info); @@ -921,7 +920,7 @@ impl StatementExecutor { let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name); - let mut view_info = RawTableInfo { + let mut view_info = TableInfo { ident: metadata::TableIdent { // The view id of distributed table is assigned by Meta, set "0" here as a placeholder. table_id: 0, @@ -932,7 +931,7 @@ impl StatementExecutor { catalog_name: expr.catalog_name.clone(), schema_name: expr.schema_name.clone(), // The meta doesn't make sense for views, so using a default one. - meta: RawTableMeta::default(), + meta: TableMeta::empty(), table_type: TableType::View, }; @@ -961,7 +960,7 @@ impl StatementExecutor { view_info.ident.table_id = view_id; - let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?); + let view_info = Arc::new(view_info); let table = DistTable::table(view_info); @@ -1737,7 +1736,7 @@ impl StatementExecutor { &self, create_table: CreateTableExpr, partitions: Vec, - table_info: RawTableInfo, + table_info: TableInfo, query_context: QueryContextRef, ) -> Result { let partitions = partitions @@ -1758,7 +1757,7 @@ impl StatementExecutor { async fn create_logical_tables_procedure( &self, - tables_data: Vec<(CreateTableExpr, RawTableInfo)>, + tables_data: Vec<(CreateTableExpr, TableInfo)>, query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( @@ -2057,7 +2056,7 @@ pub fn verify_alter( pub fn create_table_info( create_table: &CreateTableExpr, partition_columns: Vec, -) -> Result { +) -> Result { let mut column_schemas = Vec::with_capacity(create_table.column_defs.len()); let mut column_name_to_index_map = HashMap::new(); @@ -2072,15 +2071,8 @@ pub fn create_table_info( let _ = column_name_to_index_map.insert(column.name.clone(), idx); } - let timestamp_index = column_name_to_index_map - .get(&create_table.time_index) - .cloned(); - - let raw_schema = RawSchema { - column_schemas: column_schemas.clone(), - timestamp_index, - version: 0, - }; + let next_column_id = column_schemas.len() as u32; + let schema = Arc::new(Schema::new(column_schemas)); let primary_key_indices = create_table .primary_keys @@ -2106,12 +2098,12 @@ pub fn create_table_info( let table_options = TableOptions::try_from_iter(&create_table.table_options) .context(UnrecognizedTableOptionSnafu)?; - let meta = RawTableMeta { - schema: raw_schema, + let meta = TableMeta { + schema, primary_key_indices, value_indices: vec![], engine: create_table.engine.clone(), - next_column_id: column_schemas.len() as u32, + next_column_id, options: table_options, created_on: Utc::now(), updated_on: Utc::now(), @@ -2125,7 +2117,7 @@ pub fn create_table_info( Some(create_table.desc.clone()) }; - let table_info = RawTableInfo { + let table_info = TableInfo { ident: metadata::TableIdent { // The table id of distributed table is assigned by Meta, set "0" here as a placeholder. table_id: 0, diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs index 7d4c14020f..33cdb0f367 100644 --- a/src/operator/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -133,8 +133,7 @@ impl StatementExecutor { .await .context(TableMetadataManagerSnafu)? { - let mut latest_info = TableInfo::try_from(latest.into_inner().table_info) - .context(error::CreateTableInfoSnafu)?; + let mut latest_info = latest.into_inner().table_info; if !partition_column_names.is_empty() { latest_info.meta.partition_key_indices = partition_column_names diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index fef513be40..c3d7b70adb 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -124,7 +124,7 @@ pub(crate) async fn create_partition_rule_manager( let region_wal_options = new_test_region_wal_options(regions.clone()); table_metadata_manager .create_table_metadata( - new_test_table_info(1, "table_1", regions.clone().into_iter()).into(), + new_test_table_info(1, "table_1", regions.clone().into_iter()), TableRouteValue::physical(vec![ RegionRoute { region: Region { diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index fb927d2901..dcff099e82 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -44,7 +44,7 @@ use datafusion::common::ScalarValue; use datafusion::prelude::SessionContext; use datafusion_expr::{Expr, SortExpr, case, col, lit}; use datatypes::prelude::*; -use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema}; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; use datatypes::vectors::StringVector; use itertools::Itertools; use object_store::ObjectStore; @@ -1203,14 +1203,12 @@ pub async fn infer_file_table_schema( object_store: &ObjectStore, files: &[String], options: &HashMap, -) -> Result { +) -> Result { let format = parse_file_table_format(options)?; let merged = infer_schemas(object_store, files, format.as_ref()) .await .context(error::InferSchemaSnafu)?; - Ok(RawSchema::from( - &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?, - )) + Schema::try_from(merged).context(error::ConvertSchemaSnafu) } // Converts the file column schemas to table column schemas. diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 5e40a0e845..bb90b82e57 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -22,8 +22,7 @@ use common_query::AddColumnLocation; use datafusion_expr::TableProviderFilterPushDown; pub use datatypes::error::{Error as ConvertError, Result as ConvertResult}; use datatypes::schema::{ - ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef, - SkippingIndexOptions, + ColumnSchema, FulltextOptions, Schema, SchemaBuilder, SchemaRef, SkippingIndexOptions, }; use derive_builder::Builder; use serde::{Deserialize, Deserializer, Serialize}; @@ -124,7 +123,7 @@ pub struct TableIdent { /// The table metadata. /// /// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works. -#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)] +#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder, Serialize)] #[builder(pattern = "mutable", custom_constructor)] pub struct TableMeta { pub schema: SchemaRef, @@ -149,6 +148,72 @@ pub struct TableMeta { pub column_ids: Vec, } +impl TableMeta { + pub fn empty() -> Self { + Self { + schema: Arc::new(Schema::new(vec![])), + primary_key_indices: vec![], + value_indices: vec![], + engine: "".to_string(), + next_column_id: 0, + options: TableOptions::default(), + created_on: Utc::now(), + updated_on: Utc::now(), + partition_key_indices: vec![], + column_ids: vec![], + } + } +} + +impl<'de> Deserialize<'de> for TableMeta { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct RawTableMeta { + schema: SchemaRef, + primary_key_indices: Vec, + value_indices: Vec, + engine: String, + next_column_id: ColumnId, + options: TableOptions, + created_on: DateTime, + updated_on: Option>, + #[serde(default)] + partition_key_indices: Vec, + #[serde(default)] + column_ids: Vec, + } + + let RawTableMeta { + schema, + primary_key_indices, + value_indices, + engine, + next_column_id, + options, + created_on, + updated_on, + partition_key_indices, + column_ids, + } = RawTableMeta::deserialize(deserializer)?; + + Ok(Self { + schema, + primary_key_indices, + value_indices, + engine, + next_column_id, + options, + created_on, + updated_on: updated_on.unwrap_or(created_on), + partition_key_indices, + column_ids, + }) + } +} + impl TableMetaBuilder { /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder]. #[cfg(any(test, feature = "testing"))] @@ -1062,7 +1127,7 @@ impl TableMeta { } } -#[derive(Clone, Debug, PartialEq, Eq, Builder)] +#[derive(Clone, Debug, PartialEq, Eq, Builder, Serialize, Deserialize)] #[builder(pattern = "owned")] pub struct TableInfo { /// Id and version of the table. @@ -1154,126 +1219,13 @@ impl From for TableIdent { } } -/// Struct used to serialize and deserialize [`TableMeta`]. -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)] -pub struct RawTableMeta { - pub schema: RawSchema, - /// The indices of columns in primary key. Note that the index of timestamp column - /// is not included. Order matters to this array. - pub primary_key_indices: Vec, - /// The indices of columns in value. The index of timestamp column is included. - /// Order doesn't matter to this array. - pub value_indices: Vec, - /// Engine type of this table. Usually in small case. - pub engine: String, - /// Next column id of a new column. - /// It's used to ensure all columns with the same name across all regions have the same column id. - pub next_column_id: ColumnId, - pub options: TableOptions, - pub created_on: DateTime, - pub updated_on: DateTime, - /// Order doesn't matter to this array. - #[serde(default)] - pub partition_key_indices: Vec, - /// Map of column name to column id. - /// Note: This field may be empty for older versions that did not include this field. - #[serde(default)] - pub column_ids: Vec, -} - -impl<'de> Deserialize<'de> for RawTableMeta { - fn deserialize( - deserializer: D, - ) -> std::result::Result>::Error> - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - struct Helper { - schema: RawSchema, - primary_key_indices: Vec, - value_indices: Vec, - engine: String, - next_column_id: u32, - options: TableOptions, - created_on: DateTime, - updated_on: Option>, - #[serde(default)] - partition_key_indices: Vec, - #[serde(default)] - column_ids: Vec, - } - - let h = Helper::deserialize(deserializer)?; - Ok(RawTableMeta { - schema: h.schema, - primary_key_indices: h.primary_key_indices, - value_indices: h.value_indices, - engine: h.engine, - next_column_id: h.next_column_id, - options: h.options, - created_on: h.created_on, - updated_on: h.updated_on.unwrap_or(h.created_on), - partition_key_indices: h.partition_key_indices, - column_ids: h.column_ids, - }) - } -} - -impl From for RawTableMeta { - fn from(meta: TableMeta) -> RawTableMeta { - RawTableMeta { - schema: RawSchema::from(&*meta.schema), - primary_key_indices: meta.primary_key_indices, - value_indices: meta.value_indices, - engine: meta.engine, - next_column_id: meta.next_column_id, - options: meta.options, - created_on: meta.created_on, - updated_on: meta.updated_on, - partition_key_indices: meta.partition_key_indices, - column_ids: meta.column_ids, - } - } -} - -impl TryFrom for TableMeta { - type Error = ConvertError; - - fn try_from(raw: RawTableMeta) -> ConvertResult { - Ok(TableMeta { - schema: Arc::new(Schema::try_from(raw.schema)?), - primary_key_indices: raw.primary_key_indices, - value_indices: raw.value_indices, - engine: raw.engine, - next_column_id: raw.next_column_id, - options: raw.options, - created_on: raw.created_on, - updated_on: raw.updated_on, - partition_key_indices: raw.partition_key_indices, - column_ids: raw.column_ids, - }) - } -} - -/// Struct used to serialize and deserialize [`TableInfo`]. -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -pub struct RawTableInfo { - pub ident: TableIdent, - pub name: String, - pub desc: Option, - pub catalog_name: String, - pub schema_name: String, - pub meta: RawTableMeta, - pub table_type: TableType, -} - -impl RawTableInfo { +impl TableInfo { /// Returns the map of column name to column id. /// /// Note: This method may return an empty map for older versions that did not include this field. pub fn name_to_ids(&self) -> Option> { - if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() { + let column_schemas = self.meta.schema.column_schemas(); + if self.meta.column_ids.len() != column_schemas.len() { None } else { Some( @@ -1281,15 +1233,15 @@ impl RawTableInfo { .column_ids .iter() .enumerate() - .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id)) + .map(|(index, id)| (column_schemas[index].name.clone(), *id)) .collect(), ) } } - /// Sort the columns in [RawTableInfo], logical tables require it. + /// Sort the columns in [TableInfo], logical tables require it. pub fn sort_columns(&mut self) { - let column_schemas = &self.meta.schema.column_schemas; + let column_schemas = self.meta.schema.column_schemas(); let primary_keys = self .meta .primary_key_indices @@ -1298,23 +1250,16 @@ impl RawTableInfo { .collect::>(); let name_to_ids = self.name_to_ids().unwrap_or_default(); - self.meta - .schema - .column_schemas - .sort_unstable_by(|a, b| a.name.cmp(&b.name)); + let mut column_schemas = column_schemas.to_vec(); + column_schemas.sort_unstable_by(|a, b| a.name.cmp(&b.name)); // Compute new indices of sorted columns let mut primary_key_indices = Vec::with_capacity(primary_keys.len()); - let mut timestamp_index = None; - let mut value_indices = - Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len()); - let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len()); - for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() { + let mut value_indices = Vec::with_capacity(column_schemas.len() - primary_keys.len()); + let mut column_ids = Vec::with_capacity(column_schemas.len()); + for (index, column_schema) in column_schemas.iter().enumerate() { if primary_keys.contains(&column_schema.name) { primary_key_indices.push(index); - } else if column_schema.is_time_index() { - value_indices.push(index); - timestamp_index = Some(index); } else { value_indices.push(index); } @@ -1324,7 +1269,10 @@ impl RawTableInfo { } // Overwrite table meta - self.meta.schema.timestamp_index = timestamp_index; + self.meta.schema = Arc::new(Schema::new_with_version( + column_schemas, + self.meta.schema.version(), + )); self.meta.primary_key_indices = primary_key_indices; self.meta.value_indices = value_indices; self.meta.column_ids = column_ids; @@ -1347,36 +1295,6 @@ impl RawTableInfo { } } -impl From for RawTableInfo { - fn from(info: TableInfo) -> RawTableInfo { - RawTableInfo { - ident: info.ident, - name: info.name, - desc: info.desc, - catalog_name: info.catalog_name, - schema_name: info.schema_name, - meta: RawTableMeta::from(info.meta), - table_type: info.table_type, - } - } -} - -impl TryFrom for TableInfo { - type Error = ConvertError; - - fn try_from(raw: RawTableInfo) -> ConvertResult { - Ok(TableInfo { - ident: raw.ident, - name: raw.name, - desc: raw.desc, - catalog_name: raw.catalog_name, - schema_name: raw.schema_name, - meta: TableMeta::try_from(raw.meta)?, - table_type: raw.table_type, - }) - } -} - /// Set column fulltext options if it passed the validation. /// /// Options allowed to modify: @@ -1491,30 +1409,6 @@ mod tests { .unwrap() } - #[test] - fn test_raw_convert() { - let schema = Arc::new(new_test_schema()); - let meta = TableMetaBuilder::empty() - .schema(schema) - .primary_key_indices(vec![0]) - .engine("engine") - .next_column_id(3) - .build() - .unwrap(); - let info = TableInfoBuilder::default() - .table_id(10) - .table_version(5) - .name("mytable") - .meta(meta) - .build() - .unwrap(); - - let raw = RawTableInfo::from(info.clone()); - let info_new = TableInfo::try_from(raw).unwrap(); - - assert_eq!(info, info_new); - } - fn add_columns_to_meta(meta: &TableMeta) -> TableMeta { let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); @@ -2121,4 +2015,72 @@ mod tests { let fulltext_options = column_schema.fulltext_options().unwrap().unwrap(); assert!(!fulltext_options.enable); } + + #[test] + fn test_table_info_serde_compatibility() { + // "serialized" is generated by the following codes before this refactor (PR 7626): + // + // ```Rust + // serde_json::to_string(&RawTableInfo::from(TableInfo { + // ident: TableIdent { + // table_id: 1024, + // version: 1, + // }, + // name: "foo".to_string(), + // desc: Some("my table".to_string()), + // catalog_name: "greptime".to_string(), + // schema_name: "public".to_string(), + // meta: TableMeta { + // schema: Arc::new(new_test_schema()), + // primary_key_indices: vec![0], + // value_indices: vec![1, 2], + // engine: "mito".to_string(), + // next_column_id: 3, + // options: TableOptions { + // ttl: Some(common_time::TimeToLive::Duration( + // std::time::Duration::from_secs(3600), + // )), + // ..Default::default() + // }, + // created_on: DateTime::::MIN_UTC, + // updated_on: DateTime::::MAX_UTC, + // partition_key_indices: vec![2], + // column_ids: vec![0, 1, 2], + // }, + // table_type: TableType::Base, + // })) + // ``` + let serialized = r#"{"ident":{"table_id":1024,"version":1},"name":"foo","desc":"my table","catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"col1","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"col2","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":1,"version":123},"primary_key_indices":[0],"value_indices":[1,2],"engine":"mito","next_column_id":3,"options":{"write_buffer_size":null,"ttl":"1h","skip_wal":false,"extra_options":{}},"created_on":"-262143-01-01T00:00:00Z","updated_on":"+262142-12-31T23:59:59.999999999Z","partition_key_indices":[2],"column_ids":[0,1,2]},"table_type":"Base"}"#; + + let actual: TableInfo = serde_json::from_str(serialized).unwrap(); + let expected = TableInfo { + ident: TableIdent { + table_id: 1024, + version: 1, + }, + name: "foo".to_string(), + desc: Some("my table".to_string()), + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + meta: TableMeta { + schema: Arc::new(new_test_schema()), + primary_key_indices: vec![0], + value_indices: vec![1, 2], + engine: "mito".to_string(), + next_column_id: 3, + options: TableOptions { + ttl: Some(common_time::TimeToLive::Duration( + std::time::Duration::from_secs(3600), + )), + ..Default::default() + }, + created_on: DateTime::::MIN_UTC, + updated_on: DateTime::::MAX_UTC, + partition_key_indices: vec![2], + column_ids: vec![0, 1, 2], + }, + table_type: TableType::Base, + }; + assert_eq!(actual, expected); + } } diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index c5d2d2750c..1c6e0ee50b 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -128,12 +128,13 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; +++++++++++++++++++++++++ -- SQLNESS REPLACE (\s\d+\s) ID +-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) DATETIME SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW'; +---------------+--------------+------------+------------+----------+-------------+-----------------+--------------+------------------+----------------+--------+---------+------------+------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------------+-----------+ | table_catalog | table_schema | table_name | table_type | table_id | data_length | max_data_length | index_length | max_index_length | avg_row_length | engine | version | row_format | table_rows | data_free | auto_increment | create_time | update_time | check_time | table_collation | checksum | create_options | table_comment | temporary | +---------------+--------------+------------+------------+----------+-------------+-----------------+--------------+------------------+----------------+--------+---------+------------+------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------------+-----------+ -| greptime | public | test_view | VIEW |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | 1970-01-01T00:00:00 | 1970-01-01T00:00:00 | | utf8_bin |ID | | | N | +| greptime | public | test_view | VIEW |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID |DATETIME |DATETIME | | utf8_bin |ID | | | N | +---------------+--------------+------------+------------+----------+-------------+-----------------+--------------+------------------+----------------+--------+---------+------------+------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------------+-----------+ SHOW COLUMNS FROM test_view; diff --git a/tests/cases/standalone/common/view/create.sql b/tests/cases/standalone/common/view/create.sql index d5e390de07..b82704d3a9 100644 --- a/tests/cases/standalone/common/view/create.sql +++ b/tests/cases/standalone/common/view/create.sql @@ -47,6 +47,7 @@ ORDER BY 1,2; SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; -- SQLNESS REPLACE (\s\d+\s) ID +-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) DATETIME SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW'; SHOW COLUMNS FROM test_view;