refactor: remove the RawTableMeta and RawTableInfo to make codes more concise

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-01-27 20:19:26 +08:00
parent 4ac73a7e9e
commit e6d0e1754b
75 changed files with 562 additions and 770 deletions

8
Cargo.lock generated
View File

@@ -2240,7 +2240,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -6347,7 +6347,7 @@ dependencies = [
"js-sys",
"log",
"wasm-bindgen",
"windows-core 0.57.0",
"windows-core 0.61.2",
]
[[package]]
@@ -7344,7 +7344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@@ -15284,7 +15284,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]
[[package]]

View File

@@ -218,13 +218,6 @@ pub enum Error {
source: common_query::error::Error,
},
#[snafu(display("Invalid table info in catalog"))]
InvalidTableInfoInCatalog {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))]
QueryAccessDenied { catalog: String, schema: String },
@@ -368,7 +361,6 @@ impl ErrorExt for Error {
Error::CreateTable { source, .. } => source.status_code(),
Error::DecodePlan { source, .. } => source.status_code(),
Error::InvalidTableInfoInCatalog { source, .. } => source.status_code(),
Error::Internal { source, .. } => source.status_code(),

View File

@@ -50,8 +50,8 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::CatalogManager;
use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
CacheNotFoundSnafu, GetTableCacheSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
Result, TableMetadataManagerSnafu,
};
use crate::information_schema::{
InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
@@ -146,7 +146,7 @@ impl KvBackendCatalogManager {
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.get(physical_index)
.and_then(|physical_column| {
// Find the corresponding index in the logical table schema
@@ -417,7 +417,7 @@ impl CatalogManager for KvBackendCatalogManager {
.into_values()
.filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
.map(build_table)
.collect::<Result<Vec<_>>>()?;
.collect::<Vec<_>>();
Ok(tables)
}
@@ -507,16 +507,12 @@ impl CatalogManager for KvBackendCatalogManager {
};
for table in table_info_values.into_values().map(build_table) {
let table = if let Ok(table) = table {
Self::override_logical_table_partition_key_indices(
&table_route_cache,
metadata_manager.table_info_manager(),
table,
)
.await
} else {
table
};
let table = Self::override_logical_table_partition_key_indices(
&table_route_cache,
metadata_manager.table_info_manager(),
table,
)
.await;
if tx.send(table).await.is_err() {
return;
}
@@ -530,12 +526,9 @@ impl CatalogManager for KvBackendCatalogManager {
}
}
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
fn build_table(table_info_value: TableInfoValue) -> TableRef {
let table_info = table_info_value.table_info;
DistTable::table(Arc::new(table_info))
}
// TODO: This struct can hold a static map of all system tables when

View File

@@ -343,7 +343,7 @@ mod tests {
// Create view metadata
table_metadata_manager
.create_view_metadata(
view_info.clone().into(),
view_info.clone(),
logical_plan,
HashSet::new(),
vec!["a".to_string(), "b".to_string()],

View File

@@ -32,10 +32,10 @@ use common_meta::rpc::router::{Region, RegionRoute};
use common_telemetry::info;
use common_wal::options::WalOptions;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use datatypes::schema::{ColumnSchema, Schema};
use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use table::metadata::{TableId, TableIdent, TableInfo, TableMeta, TableType};
use table::table_name::TableName;
use self::metadata::TableMetadataBencher;
@@ -132,7 +132,7 @@ impl Tool for BenchTableMetadata {
}
}
fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
fn create_table_info(table_id: TableId, table_name: TableName) -> TableInfo {
let columns = 100;
let mut column_schemas = Vec::with_capacity(columns);
column_schemas.push(
@@ -153,8 +153,8 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
));
}
let meta = RawTableMeta {
schema: RawSchema::new(column_schemas),
let meta = TableMeta {
schema: Arc::new(Schema::new(column_schemas)),
engine: "mito".to_string(),
created_on: chrono::DateTime::default(),
updated_on: chrono::DateTime::default(),
@@ -166,7 +166,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
column_ids: vec![],
};
RawTableInfo {
TableInfo {
ident: TableIdent {
table_id,
version: 1,

View File

@@ -19,7 +19,7 @@ use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::rpc::store::PutRequest;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
/// Puts a key-value pair into the kv backend.
pub async fn put_key(kv_backend: &KvBackendRef, key: &str, value: &str) {
@@ -35,7 +35,7 @@ pub async fn put_key(kv_backend: &KvBackendRef, key: &str, value: &str) {
pub async fn prepare_physical_table_metadata(
table_name: &str,
table_id: TableId,
) -> (RawTableInfo, PhysicalTableRouteValue) {
) -> (TableInfo, PhysicalTableRouteValue) {
let mut create_physical_table_task = test_create_physical_table_task(table_name);
let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
region: Region {

View File

@@ -21,14 +21,12 @@ use common_meta::rpc::router::{RegionRoute, find_leader_regions};
use operator::expr_helper::column_schemas_to_defs;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use crate::error::{CovertColumnSchemasToDefsSnafu, Result};
/// Generates alter table expression for all columns.
pub fn generate_alter_table_expr_for_all_columns(
table_info: &RawTableInfo,
) -> Result<AlterTableExpr> {
pub fn generate_alter_table_expr_for_all_columns(table_info: &TableInfo) -> Result<AlterTableExpr> {
let schema = &table_info.meta.schema;
let mut alter_table_expr = AlterTableExpr {
@@ -42,10 +40,10 @@ pub fn generate_alter_table_expr_for_all_columns(
.meta
.primary_key_indices
.iter()
.map(|i| schema.column_schemas[*i].name.clone())
.map(|i| schema.column_schemas()[*i].name.clone())
.collect::<Vec<_>>();
let add_columns = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys)
let add_columns = column_schemas_to_defs(schema.column_schemas().to_vec(), &primary_keys)
.context(CovertColumnSchemasToDefsSnafu)?;
alter_table_expr.kind = Some(Kind::AddColumns(AddColumns {

View File

@@ -23,23 +23,24 @@ use common_meta::rpc::router::{RegionRoute, find_leader_regions};
use operator::expr_helper::column_schemas_to_defs;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use crate::error::{CovertColumnSchemasToDefsSnafu, Result};
/// Generates a `CreateTableExpr` from a `RawTableInfo`.
pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTableExpr> {
/// Generates a `CreateTableExpr` from a `TableInfo`.
pub fn generate_create_table_expr(table_info: &TableInfo) -> Result<CreateTableExpr> {
let schema = &table_info.meta.schema;
let column_schemas = schema.column_schemas();
let primary_keys = table_info
.meta
.primary_key_indices
.iter()
.map(|i| schema.column_schemas[*i].name.clone())
.map(|i| column_schemas[*i].name.clone())
.collect::<Vec<_>>();
let timestamp_index = schema.timestamp_index.as_ref().unwrap();
let time_index = schema.column_schemas[*timestamp_index].name.clone();
let column_defs = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys)
let timestamp_index = schema.timestamp_index().unwrap();
let time_index = column_schemas[timestamp_index].name.clone();
let column_defs = column_schemas_to_defs(column_schemas.to_vec(), &primary_keys)
.context(CovertColumnSchemasToDefsSnafu)?;
let table_options = HashMap::from(&table_info.meta.options);

View File

@@ -24,7 +24,7 @@ use common_meta::kv_backend::KvBackendRef;
use futures::Stream;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use crate::error::{Result, TableMetadataSnafu, UnexpectedSnafu};
@@ -58,7 +58,7 @@ pub struct TableMetadataIterator {
/// The full table metadata.
pub struct FullTableMetadata {
pub table_id: TableId,
pub table_info: RawTableInfo,
pub table_info: TableInfo,
pub table_route: TableRouteValue,
}

View File

@@ -24,7 +24,7 @@ use api::v1::{
SkippingIndexType as PbSkippingIndexType, column_def,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions};
use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SkippingIndexOptions};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
use table::metadata::{TableId, TableMeta};
@@ -268,7 +268,7 @@ pub fn alter_expr_to_request(
Ok(request)
}
pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<RawSchema> {
pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<Schema> {
let column_schemas = expr
.column_defs
.iter()
@@ -300,7 +300,7 @@ pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) ->
})
.collect::<Vec<_>>();
Ok(RawSchema::new(column_schemas))
Ok(Schema::new(column_schemas))
}
fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::{OptionExt, ResultExt};
use snafu::OptionExt;
use store_api::storage::TableId;
use table::metadata::TableInfo;
@@ -48,15 +48,13 @@ fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId,
Arc::new(move |table_id| {
let table_info_manager = table_info_manager.clone();
Box::pin(async move {
let raw_table_info = table_info_manager
let table_info = table_info_manager
.get(*table_id)
.await?
.context(error::ValueNotExistSnafu {})?
.into_inner()
.table_info;
Ok(Some(Arc::new(
TableInfo::try_from(raw_table_info).context(error::ConvertRawTableInfoSnafu)?,
)))
Ok(Some(Arc::new(table_info)))
})
})
}
@@ -109,7 +107,7 @@ mod tests {
.await
.unwrap();
let table_info = cache.get(1024).await.unwrap().unwrap();
assert_eq!(*table_info, TableInfo::try_from(task.table_info).unwrap());
assert_eq!(*table_info, task.table_info);
assert!(cache.contains_key(&1024));
cache

View File

@@ -14,7 +14,7 @@
use common_grpc_expr::alter_expr_to_request;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};
use table::metadata::TableInfo;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
@@ -56,7 +56,7 @@ impl AlterLogicalTablesProcedure {
pub(crate) fn build_update_metadata(
&self,
) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>> {
let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
for (task, table) in self
.data
@@ -74,10 +74,9 @@ impl AlterLogicalTablesProcedure {
&self,
task: &AlterTableTask,
table: &DeserializedValueWithBytes<TableInfoValue>,
) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
) -> Result<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)> {
// Builds new_meta
let table_info = TableInfo::try_from(table.table_info.clone())
.context(error::ConvertRawTableInfoSnafu)?;
let table_info = table.table_info.clone();
let table_ref = task.table_ref();
let request = alter_expr_to_request(
table.table_info.ident.table_id,
@@ -98,9 +97,8 @@ impl AlterLogicalTablesProcedure {
new_table.meta = new_meta;
new_table.ident.version = version;
let mut raw_table_info = RawTableInfo::from(new_table);
raw_table_info.sort_columns();
new_table.sort_columns();
Ok((table.clone(), raw_table_info))
Ok((table.clone(), new_table))
}
}

View File

@@ -239,7 +239,7 @@ fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) ->
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.map(|c| &c.name)
.collect::<HashSet<_>>();

View File

@@ -34,7 +34,7 @@ use snafu::{ResultExt, ensure};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::metadata::{TableId, TableInfo};
use table::table_reference::TableReference;
use crate::ddl::DdlContext;
@@ -288,7 +288,7 @@ impl AlterTableProcedure {
&self.context.table_metadata_manager,
table_info_value,
self.data.region_distribution.as_ref(),
new_info.into(),
new_info,
&self.data.column_metadatas,
)
.await?;
@@ -426,7 +426,7 @@ impl AlterTableData {
self.table_id
}
fn table_info(&self) -> Option<&RawTableInfo> {
fn table_info(&self) -> Option<&TableInfo> {
self.table_info_value
.as_ref()
.map(|value| &value.table_info)

View File

@@ -26,7 +26,7 @@ use futures::future;
use snafu::{ResultExt, ensure};
use store_api::metadata::ColumnMetadata;
use store_api::storage::{RegionId, TableId};
use table::metadata::{RawTableInfo, TableInfo};
use table::metadata::TableInfo;
use table::requests::AlterKind;
use table::table_name::TableName;
@@ -107,7 +107,7 @@ impl AlterTableExecutor {
/// Building the new table info here allows us to catch any issues with the
/// alteration before committing metadata changes.
pub(crate) fn validate_alter_table_expr(
table_info: &RawTableInfo,
table_info: &TableInfo,
alter_table_expr: AlterTableExpr,
) -> Result<TableInfo> {
build_new_table_info(table_info, alter_table_expr)
@@ -119,7 +119,7 @@ impl AlterTableExecutor {
table_metadata_manager: &TableMetadataManagerRef,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
region_distribution: Option<&RegionDistribution>,
mut raw_table_info: RawTableInfo,
mut raw_table_info: TableInfo,
column_metadatas: &[ColumnMetadata],
) -> Result<()> {
let table_ref = self.table.table_ref();
@@ -261,11 +261,10 @@ pub(crate) fn make_alter_region_request(
/// `next_column_id` by the number of columns being added, which may result in gaps
/// in the column id sequence.
fn build_new_table_info(
table_info: &RawTableInfo,
table_info: &TableInfo,
alter_table_expr: AlterTableExpr,
) -> Result<TableInfo> {
let table_info =
TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
let table_info = table_info.clone();
let schema_name = &table_info.schema_name;
let catalog_name = &table_info.catalog_name;
let table_name = &table_info.name;

View File

@@ -19,7 +19,7 @@ use api::v1::region::{
AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, alter_request,
};
use snafu::OptionExt;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{self, InvalidProtoMsgSnafu, Result};
@@ -43,7 +43,7 @@ impl AlterTableProcedure {
/// It always adds column if not exists and drops column if exists.
/// It skips the column if it already exists in the table.
fn create_proto_alter_kind(
table_info: &RawTableInfo,
table_info: &TableInfo,
alter_kind: &Kind,
) -> Result<Option<alter_request::Kind>> {
match alter_kind {
@@ -52,7 +52,7 @@ fn create_proto_alter_kind(
let existing_columns: HashSet<_> = table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();

View File

@@ -12,18 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use common_catalog::format_full_table_name;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
use datatypes::schema::COMMENT_KEY as COLUMN_COMMENT_KEY;
use datatypes::schema::{COMMENT_KEY as COLUMN_COMMENT_KEY, Schema};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::TableId;
use strum::AsRefStr;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY;
use table::table_name::TableName;
@@ -146,7 +148,7 @@ impl CommentOnProcedure {
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.any(|col| &col.name == column_name);
@@ -175,7 +177,7 @@ impl CommentOnProcedure {
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.find(|col| &col.name == column_name)
.unwrap(); // Safe: validated above
@@ -276,16 +278,18 @@ impl CommentOnProcedure {
let mut new_table_info = table_info_value.table_info.clone();
let column_name = self.data.column_name.as_ref().unwrap();
let column_schema = new_table_info
.meta
.schema
.column_schemas
let mut column_schemas = new_table_info.meta.schema.column_schemas().to_vec();
let column_schema = column_schemas
.iter_mut()
.find(|col| &col.name == column_name)
.unwrap(); // Safe: validated in prepare
update_column_comment_metadata(column_schema, self.data.comment.clone());
new_table_info.meta.schema = Arc::new(Schema::new_with_version(
column_schemas,
new_table_info.meta.schema.version(),
));
self.update_table_info(table_info_value, new_table_info)
.await?;
@@ -328,7 +332,7 @@ impl CommentOnProcedure {
async fn update_table_info(
&self,
current_table_info: &DeserializedValueWithBytes<TableInfoValue>,
new_table_info: RawTableInfo,
new_table_info: TableInfo,
) -> Result<()> {
let table_id = current_table_info.table_info.ident.table_id;
let new_table_info_value = current_table_info.update(new_table_info);

View File

@@ -32,7 +32,7 @@ use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use crate::ddl::DdlContext;
use crate::ddl::utils::{
@@ -279,7 +279,7 @@ impl CreateTablesData {
/// Returns the remaining tasks.
/// The length of tasks must be greater than 0.
fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
fn remaining_tasks(&self) -> Vec<(TableInfo, TableRouteValue)> {
self.tasks
.iter()
.zip(self.table_ids_already_exists.iter())

View File

@@ -13,9 +13,10 @@
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use datatypes::schema::{ColumnSchema, Schema};
use snafu::OptionExt;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
@@ -51,7 +52,7 @@ impl CreateLogicalTablesProcedure {
.partition_key_indices
.iter()
.map(|&idx| {
physical_table_info.table_info.meta.schema.column_schemas[idx]
physical_table_info.table_info.meta.schema.column_schemas()[idx]
.name
.clone()
})
@@ -72,14 +73,9 @@ impl CreateLogicalTablesProcedure {
for task in &mut self.data.tasks {
// Get existing column names in the logical table
let existing_column_names: HashSet<_> = task
.table_info
.meta
.schema
.column_schemas
.iter()
.map(|c| &c.name)
.collect();
let column_schemas = task.table_info.meta.schema.column_schemas();
let existing_column_names: HashSet<_> =
column_schemas.iter().map(|c| &c.name).collect();
let mut new_columns = Vec::new();
let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone();
@@ -87,8 +83,7 @@ impl CreateLogicalTablesProcedure {
// Add missing partition columns
for partition_column in partition_columns {
if !existing_column_names.contains(partition_column) {
let new_column_index =
task.table_info.meta.schema.column_schemas.len() + new_columns.len();
let new_column_index = column_schemas.len() + new_columns.len();
// Create new column schema for the partition column
let column_schema = ColumnSchema::new(
@@ -105,11 +100,11 @@ impl CreateLogicalTablesProcedure {
// If we added new columns, update the table info
if !new_columns.is_empty() {
let mut updated_columns = task.table_info.meta.schema.column_schemas.clone();
let mut updated_columns = column_schemas.to_vec();
updated_columns.extend(new_columns);
// Create new schema with updated columns
let new_schema = RawSchema::new(updated_columns);
let new_schema = Arc::new(Schema::new(updated_columns));
// Update the table info
task.table_info.meta.schema = new_schema;

View File

@@ -19,7 +19,7 @@ use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader, region
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::template::{
@@ -95,11 +95,11 @@ pub fn create_region_request_builder(
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}
/// Builds a [CreateRequestBuilder] from a [RawTableInfo].
/// Builds a [CreateRequestBuilder] from a [TableInfo].
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
raw_table_info: &TableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info)?;

View File

@@ -30,7 +30,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use table::table_name::TableName;
use table::table_reference::TableReference;
pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
@@ -103,7 +103,7 @@ impl CreateTableProcedure {
})
}
fn table_info(&self) -> &RawTableInfo {
fn table_info(&self) -> &TableInfo {
&self.data.task.table_info
}

View File

@@ -23,7 +23,7 @@ use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use table::table_name::TableName;
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
@@ -175,19 +175,19 @@ impl CreateTableExecutor {
&self,
table_metadata_manager: &TableMetadataManagerRef,
region_failure_detector_controller: &RegionFailureDetectorControllerRef,
mut raw_table_info: RawTableInfo,
mut table_info: TableInfo,
column_metadatas: &[ColumnMetadata],
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
if !column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, column_metadatas);
update_table_info_column_ids(&mut table_info, column_metadatas);
}
let detecting_regions =
convert_region_routes_to_detecting_regions(&table_route.region_routes);
let table_route = TableRouteValue::Physical(table_route);
table_metadata_manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.create_table_metadata(table_info, table_route, region_wal_options)
.await?;
region_failure_detector_controller
.register_failure_detectors(detecting_regions)

View File

@@ -22,25 +22,25 @@ use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use crate::error::{self, Result};
use crate::reconciliation::utils::build_column_metadata_from_table_info;
use crate::wal_provider::prepare_wal_options;
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
/// Constructs a [CreateRequest] based on the provided [TableInfo].
///
/// Note: This function is primarily intended for creating logical tables.
///
/// Logical table templates keep the original column order and primary key indices from
/// `RawTableInfo` (including internal columns when present), because these are used to
/// `TableInfo` (including internal columns when present), because these are used to
/// reconstruct the logical schema on the engine side.
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let column_defs = raw_table_info
pub fn build_template_from_raw_table_info(table_info: &TableInfo) -> Result<CreateRequest> {
let primary_key_indices = &table_info.meta.primary_key_indices;
let column_defs = table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.enumerate()
.map(|(i, c)| {
@@ -56,12 +56,12 @@ pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Resu
})
.collect::<Result<Vec<_>>>()?;
let options = HashMap::from(&raw_table_info.meta.options);
let options = HashMap::from(&table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: raw_table_info.meta.engine.clone(),
engine: table_info.meta.engine.clone(),
column_defs,
primary_key: raw_table_info
primary_key: table_info
.meta
.primary_key_indices
.iter()
@@ -75,21 +75,21 @@ pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Resu
Ok(template)
}
/// Constructs a [CreateRequest] based on the provided [RawTableInfo] for physical table.
/// Constructs a [CreateRequest] based on the provided [TableInfo] for physical table.
///
/// Note: This function is primarily intended for creating physical table.
///
/// Physical table templates mark primary
/// keys by tag semantic type to match the physical storage layout.
pub fn build_template_from_raw_table_info_for_physical_table(
raw_table_info: &RawTableInfo,
table_info: &TableInfo,
) -> Result<CreateRequest> {
let name_to_ids = raw_table_info
let name_to_ids = table_info
.name_to_ids()
.context(error::MissingColumnIdsSnafu)?;
let column_metadatas = build_column_metadata_from_table_info(
&raw_table_info.meta.schema.column_schemas,
&raw_table_info.meta.primary_key_indices,
table_info.meta.schema.column_schemas(),
&table_info.meta.primary_key_indices,
&name_to_ids,
)?;
let primary_key_ids = column_metadatas
@@ -115,10 +115,10 @@ pub fn build_template_from_raw_table_info_for_physical_table(
})
.collect::<Result<Vec<_>>>()?;
let options = HashMap::from(&raw_table_info.meta.options);
let options = HashMap::from(&table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: raw_table_info.meta.engine.clone(),
engine: table_info.meta.engine.clone(),
column_defs,
primary_key: primary_key_ids,
path: String::new(),

View File

@@ -19,7 +19,7 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableType};
use table::metadata::{TableId, TableInfo, TableType};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
@@ -58,7 +58,7 @@ impl CreateViewProcedure {
Ok(CreateViewProcedure { context, data })
}
fn view_info(&self) -> &RawTableInfo {
fn view_info(&self) -> &TableInfo {
&self.data.task.view_info
}

View File

@@ -328,7 +328,7 @@ mod tests {
use api::v1::{ColumnDataType, SemanticType};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use table::table_name::TableName;
use super::*;
@@ -339,7 +339,7 @@ mod tests {
use crate::key::table_route::TableRouteValue;
use crate::test_util::{MockDatanodeManager, new_ddl_context};
fn test_create_raw_table_info(name: &str) -> RawTableInfo {
fn test_create_raw_table_info(name: &str) -> TableInfo {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()

View File

@@ -21,7 +21,7 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableType};
use table::metadata::{TableId, TableInfo, TableType};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
@@ -118,7 +118,7 @@ impl DropViewProcedure {
Ok(())
}
fn ensure_is_view(&self, table_info: &RawTableInfo) -> Result<()> {
fn ensure_is_view(&self, table_info: &TableInfo) -> Result<()> {
ensure!(
table_info.table_type == TableType::View,
error::InvalidViewInfoSnafu {

View File

@@ -33,7 +33,7 @@ use store_api::metric_engine_consts::{
METRIC_ENGINE_NAME,
};
use store_api::storage::consts::ReservedColumnId;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
@@ -46,7 +46,7 @@ use crate::rpc::ddl::CreateTableTask;
pub async fn create_physical_table_metadata(
ddl_context: &DdlContext,
table_info: RawTableInfo,
table_info: TableInfo,
table_route: TableRouteValue,
) {
ddl_context
@@ -240,12 +240,12 @@ pub fn test_column_metadatas(tag_fields: &[&str]) -> Vec<ColumnMetadata> {
}
/// Asserts the column names.
pub fn assert_column_name(table_info: &RawTableInfo, expected_column_names: &[&str]) {
pub fn assert_column_name(table_info: &TableInfo, expected_column_names: &[&str]) {
assert_eq!(
table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.map(|c| c.name.clone())
.collect::<Vec<_>>(),
@@ -266,7 +266,7 @@ pub fn assert_column_name_and_id(column_metadatas: &[ColumnMetadata], expected:
}
/// Gets the raw table info.
pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> RawTableInfo {
pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> TableInfo {
ddl_context
.table_metadata_manager
.table_info_manager()

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::column_def::try_as_column_schema;
use api::v1::meta::Partition;
@@ -21,10 +22,10 @@ use chrono::DateTime;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE, MITO2_ENGINE,
};
use datatypes::schema::RawSchema;
use datatypes::schema::Schema;
use derive_builder::Builder;
use store_api::storage::TableId;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
use table::requests::TableOptions;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
@@ -87,9 +88,9 @@ impl From<TestCreateTableExpr> for CreateTableExpr {
}
}
/// Builds [RawTableInfo] from [CreateTableExpr].
pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
RawTableInfo {
/// Builds [TableInfo] from [CreateTableExpr].
pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> TableInfo {
TableInfo {
ident: TableIdent {
table_id: expr
.table_id
@@ -102,19 +103,13 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
desc: Some(expr.desc.clone()),
catalog_name: expr.catalog_name.clone(),
schema_name: expr.schema_name.clone(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: expr
.column_defs
meta: TableMeta {
schema: Arc::new(Schema::new(
expr.column_defs
.iter()
.map(|column| try_as_column_schema(column).unwrap())
.collect(),
timestamp_index: expr
.column_defs
.iter()
.position(|column| column.semantic_type() == SemanticType::Timestamp),
version: 0,
},
)),
primary_key_indices: expr
.primary_keys
.iter()

View File

@@ -546,7 +546,7 @@ async fn test_on_part_duplicate_alter_request() {
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.map(|x| x.name.clone())
.collect::<Vec<_>>();
@@ -565,7 +565,7 @@ async fn test_on_part_duplicate_alter_request() {
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.map(|x| x.name.clone())
.collect::<Vec<_>>();

View File

@@ -506,7 +506,7 @@ async fn test_on_update_metadata_add_columns() {
.table_info;
assert_eq!(
table_info.meta.schema.column_schemas.len() as u32,
table_info.meta.schema.column_schemas().len() as u32,
table_info.meta.next_column_id
);
assert_column_name(&table_info, &["ts", "host", "cpu", "my_tag3"]);

View File

@@ -22,7 +22,7 @@ use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use table::metadata;
use table::metadata::{RawTableInfo, RawTableMeta, TableType};
use table::metadata::{TableInfo, TableMeta, TableType};
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::create_view::CreateViewProcedure;
@@ -74,7 +74,7 @@ pub(crate) fn test_create_view_task(name: &str) -> CreateViewTask {
definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
};
let view_info = RawTableInfo {
let view_info = TableInfo {
ident: metadata::TableIdent {
table_id: 0,
version: 0,
@@ -83,7 +83,7 @@ pub(crate) fn test_create_view_task(name: &str) -> CreateViewTask {
desc: None,
catalog_name: expr.catalog_name.clone(),
schema_name: expr.schema_name.clone(),
meta: RawTableMeta::default(),
meta: TableMeta::empty(),
table_type: TableType::View,
};

View File

@@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -226,7 +226,7 @@ impl TruncateTableData {
self.task.table_name()
}
fn table_info(&self) -> &RawTableInfo {
fn table_info(&self) -> &TableInfo {
&self.table_info_value.table_info
}

View File

@@ -13,38 +13,38 @@
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::SemanticType;
use common_telemetry::debug;
use common_telemetry::tracing::warn;
use datatypes::schema::Schema;
use store_api::metadata::ColumnMetadata;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
/// Generate the new physical table info.
pub(crate) fn build_new_physical_table_info(
mut raw_table_info: RawTableInfo,
mut table_info: TableInfo,
physical_columns: &[ColumnMetadata],
) -> RawTableInfo {
) -> TableInfo {
debug!(
"building new physical table info for table: {}, table_id: {}",
raw_table_info.name, raw_table_info.ident.table_id
table_info.name, table_info.ident.table_id
);
let existing_columns = raw_table_info
let existing_columns = table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.map(|col| col.name.clone())
.collect::<HashSet<_>>();
let primary_key_indices = &mut raw_table_info.meta.primary_key_indices;
let value_indices = &mut raw_table_info.meta.value_indices;
let primary_key_indices = &mut table_info.meta.primary_key_indices;
let value_indices = &mut table_info.meta.value_indices;
value_indices.clear();
let time_index = &mut raw_table_info.meta.schema.timestamp_index;
let columns = &mut raw_table_info.meta.schema.column_schemas;
columns.clear();
let column_ids = &mut raw_table_info.meta.column_ids;
let column_ids = &mut table_info.meta.column_ids;
column_ids.clear();
let mut columns = Vec::with_capacity(physical_columns.len());
for (idx, col) in physical_columns.iter().enumerate() {
match col.semantic_type {
SemanticType::Tag => {
@@ -56,7 +56,6 @@ pub(crate) fn build_new_physical_table_info(
SemanticType::Field => value_indices.push(idx),
SemanticType::Timestamp => {
value_indices.push(idx);
*time_index = Some(idx);
}
}
@@ -64,11 +63,11 @@ pub(crate) fn build_new_physical_table_info(
column_ids.push(col.column_id);
}
if let Some(time_index) = *time_index {
raw_table_info.meta.schema.column_schemas[time_index].set_time_index();
}
raw_table_info
table_info.meta.schema = Arc::new(Schema::new_with_version(
columns,
table_info.meta.schema.version(),
));
table_info
}
/// Updates the column IDs in the table info based on the provided column metadata.
@@ -77,13 +76,13 @@ pub(crate) fn build_new_physical_table_info(
/// before updating the column ids. If the column metadata doesn't match the table schema,
/// the table info remains unchanged.
pub(crate) fn update_table_info_column_ids(
raw_table_info: &mut RawTableInfo,
table_info: &mut TableInfo,
column_metadatas: &[ColumnMetadata],
) {
let mut table_column_names = raw_table_info
let mut table_column_names = table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.map(|c| c.name.as_str())
.collect::<Vec<_>>();
@@ -98,7 +97,7 @@ pub(crate) fn update_table_info_column_ids(
if table_column_names != column_names {
warn!(
"Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}",
raw_table_info.name, raw_table_info.ident.table_id, table_column_names, column_names,
table_info.name, table_info.ident.table_id, table_column_names, column_names,
);
return;
}
@@ -108,7 +107,7 @@ pub(crate) fn update_table_info_column_ids(
.map(|c| (c.column_schema.name.clone(), c.column_id))
.collect::<HashMap<_, _>>();
let schema = &raw_table_info.meta.schema.column_schemas;
let schema = table_info.meta.schema.column_schemas();
let mut column_ids = Vec::with_capacity(schema.len());
for column_schema in schema {
if let Some(id) = name_to_id.get(&column_schema.name) {
@@ -116,5 +115,5 @@ pub(crate) fn update_table_info_column_ids(
}
}
raw_table_info.meta.column_ids = column_ids;
table_info.meta.column_ids = column_ids;
}

View File

@@ -15,7 +15,7 @@
use itertools::Itertools;
use snafu::OptionExt;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use table::table_reference::TableReference;
use crate::error::{Result, TableInfoNotFoundSnafu};
@@ -75,7 +75,7 @@ pub(crate) async fn all_logical_table_routes_have_same_physical_id(
/// Returns an error if any table info value fails to update.
pub(crate) async fn batch_update_table_info_values(
table_metadata_manager: &TableMetadataManager,
table_info_values: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
table_info_values: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
) -> Result<()> {
let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size();
if table_info_values.len() > chunk_size {

View File

@@ -201,13 +201,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
ConvertRawTableInfo {
#[snafu(implicit)]
location: Location,
source: datatypes::Error,
},
#[snafu(display("Primary key '{key}' not found when creating region request"))]
PrimaryKeyNotFound {
key: String,
@@ -1113,7 +1106,6 @@ impl ErrorExt for Error {
| BuildTableMeta { .. }
| TableRouteNotFound { .. }
| TableRepartNotFound { .. }
| ConvertRawTableInfo { .. }
| RegionOperatingRace { .. }
| EncodeWalOptions { .. }
| BuildKafkaClient { .. }

View File

@@ -138,7 +138,7 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use table::table_name::TableName;
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
@@ -674,7 +674,7 @@ impl TableMetadataManager {
///
pub async fn create_view_metadata(
&self,
view_info: RawTableInfo,
view_info: TableInfo,
raw_logical_plan: Vec<u8>,
table_names: HashSet<TableName>,
columns: Vec<String>,
@@ -747,7 +747,7 @@ impl TableMetadataManager {
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn create_table_metadata(
&self,
table_info: RawTableInfo,
table_info: TableInfo,
table_route_value: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
@@ -834,7 +834,7 @@ impl TableMetadataManager {
/// Creates metadata for multiple logical tables and return an error if different metadata exists.
pub async fn create_logical_tables_metadata(
&self,
tables_data: Vec<(RawTableInfo, TableRouteValue)>,
tables_data: Vec<(TableInfo, TableRouteValue)>,
) -> Result<()> {
let len = tables_data.len();
let mut txns = Vec::with_capacity(3 * len);
@@ -1118,7 +1118,7 @@ impl TableMetadataManager {
&self,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
region_distribution: Option<RegionDistribution>,
new_table_info: RawTableInfo,
new_table_info: TableInfo,
) -> Result<()> {
let table_id = current_table_info_value.table_info.ident.table_id;
let new_table_info_value = current_table_info_value.update(new_table_info);
@@ -1213,7 +1213,7 @@ impl TableMetadataManager {
pub async fn batch_update_table_info_values(
&self,
table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
) -> Result<()> {
let len = table_info_value_pairs.len();
let mut txns = Vec::with_capacity(len);
@@ -1477,7 +1477,7 @@ mod tests {
use common_wal::options::{KafkaWalOptions, WalOptions};
use futures::TryStreamExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableInfo};
use table::metadata::TableInfo;
use table::table_name::TableName;
use super::datanode_table::DatanodeTableKey;
@@ -1571,7 +1571,7 @@ mod tests {
async fn create_physical_table_metadata(
table_metadata_manager: &TableMetadataManager,
table_info: RawTableInfo,
table_info: TableInfo,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
@@ -1609,7 +1609,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let wal_provider = WalProvider::RaftEngine;
let regions: Vec<_> = (0..16).collect();
let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
@@ -1635,7 +1635,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let region_wal_options = create_mock_region_wal_options()
.into_iter()
.map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
@@ -1717,7 +1717,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
let table_route_value = TableRouteValue::physical(region_routes.clone());
@@ -1779,11 +1779,10 @@ mod tests {
let region_id = RegionId::new(table_id, regin_number);
let region_route = new_region_route(region_id.as_u64(), 2);
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
let table_info = test_utils::new_test_table_info_with_name(
table_id,
&format!("my_table_{}", table_id),
)
.into();
);
let table_route_value = TableRouteValue::physical(region_routes.clone());
tables_data.push((table_info, table_route_value));
@@ -1802,7 +1801,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let region_wal_options = create_mock_region_wal_options();
@@ -1908,7 +1907,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
@@ -1984,7 +1983,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
@@ -2069,7 +2068,7 @@ mod tests {
leader_down_since: None,
},
];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
@@ -2151,7 +2150,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.as_str();
let region_storage_path =
@@ -2277,7 +2276,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.as_str();
let region_storage_path =
@@ -2617,7 +2616,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let view_info: RawTableInfo = new_test_table_info().into();
let view_info = new_test_table_info();
let view_id = view_info.ident.table_id;

View File

@@ -65,14 +65,14 @@ impl SchemaMetadataManager {
schema_value: Option<crate::key::schema_name::SchemaNameValue>,
kv_backend: crate::kv_backend::KvBackendRef,
) {
use table::metadata::{RawTableInfo, TableType};
let value = crate::key::table_info::TableInfoValue::new(RawTableInfo {
use table::metadata::{TableInfo, TableType};
let value = crate::key::table_info::TableInfoValue::new(TableInfo {
ident: Default::default(),
name: table_name.to_string(),
desc: None,
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
meta: Default::default(),
meta: table::metadata::TableMeta::empty(),
table_type: TableType::Base,
});
let table_info_manager = crate::key::table_info::TableInfoManager::new(kv_backend.clone());

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -82,19 +82,19 @@ impl MetadataKey<'_, TableInfoKey> for TableInfoKey {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableInfoValue {
pub table_info: RawTableInfo,
pub table_info: TableInfo,
version: u64,
}
impl TableInfoValue {
pub fn new(table_info: RawTableInfo) -> Self {
pub fn new(table_info: TableInfo) -> Self {
Self {
table_info,
version: 0,
}
}
pub fn update(&self, new_table_info: RawTableInfo) -> Self {
pub fn update(&self, new_table_info: TableInfo) -> Self {
Self {
table_info: new_table_info,
version: self.version + 1,
@@ -103,7 +103,7 @@ impl TableInfoValue {
pub(crate) fn with_update<F>(&self, update: F) -> Self
where
F: FnOnce(&mut RawTableInfo),
F: FnOnce(&mut TableInfo),
{
let mut new_table_info = self.table_info.clone();
update(&mut new_table_info);
@@ -280,8 +280,8 @@ impl TableInfoManager {
mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use table::metadata::{RawTableMeta, TableIdent, TableType};
use datatypes::schema::{ColumnSchema, Schema};
use table::metadata::{TableIdent, TableMeta, TableType};
use super::*;
@@ -322,15 +322,15 @@ mod tests {
assert_eq!(value, deserialized);
}
fn new_table_info(table_id: TableId) -> RawTableInfo {
fn new_table_info(table_id: TableId) -> TableInfo {
let schema = Schema::new(vec![ColumnSchema::new(
"name",
ConcreteDataType::string_datatype(),
true,
)]);
let meta = RawTableMeta {
schema: RawSchema::from(&schema),
let meta = TableMeta {
schema: Arc::new(schema),
engine: "mito".to_string(),
created_on: chrono::DateTime::default(),
updated_on: chrono::DateTime::default(),
@@ -342,7 +342,7 @@ mod tests {
column_ids: vec![],
};
RawTableInfo {
TableInfo {
ident: TableIdent {
table_id,
version: 1,

View File

@@ -31,7 +31,7 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use table::table_name::TableName;
use crate::cache_invalidator::CacheInvalidatorRef;
@@ -107,7 +107,7 @@ pub(crate) struct PersistentContext {
pub(crate) update_table_infos: Vec<(TableId, Vec<ColumnMetadata>)>,
// The table infos to be created.
// The value will be set in `ResolveTableMetadatas` state.
pub(crate) create_tables: Vec<(TableId, RawTableInfo)>,
pub(crate) create_tables: Vec<(TableId, TableInfo)>,
// Whether the procedure is a subprocedure.
pub(crate) is_subprocedure: bool,
}

View File

@@ -22,7 +22,7 @@ use common_telemetry::tracing_context::TracingContext;
use futures::future;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber, TableId};
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use crate::ddl::utils::{add_peer_context_if_needed, region_storage_path};
use crate::ddl::{CreateRequestBuilder, build_template_from_raw_table_info};
@@ -147,10 +147,10 @@ impl ReconcileRegions {
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
table_info: &TableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info)?;
let template = build_template_from_raw_table_info(table_info)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -20,7 +20,7 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use store_api::metadata::ColumnMetadata;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -138,8 +138,8 @@ impl UpdateTableInfos {
fn build_new_table_info(
table_id: TableId,
column_metadatas: &[ColumnMetadata],
table_info: &RawTableInfo,
) -> Result<RawTableInfo> {
table_info: &TableInfo,
) -> Result<TableInfo> {
let table_ref = table_info.table_ref();
let table_meta = build_table_meta_from_column_metadatas(
table_id,

View File

@@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::TableId;
use table::metadata::RawTableMeta;
use table::metadata::TableMeta;
use table::table_name::TableName;
use tonic::async_trait;
@@ -78,11 +78,11 @@ impl ReconcileTableContext {
self.persistent_ctx.table_id
}
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
/// Builds a [`TableMeta`] from the provided [`ColumnMetadata`]s.
pub(crate) fn build_table_meta(
&self,
column_metadatas: &[ColumnMetadata],
) -> Result<RawTableMeta> {
) -> Result<TableMeta> {
// Safety: The table info value is set in `ReconciliationStart` state.
let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
let table_id = self.table_id();
@@ -145,7 +145,7 @@ impl PersistentContext {
#[derive(Default)]
pub(crate) struct VolatileContext {
pub(crate) table_meta: Option<RawTableMeta>,
pub(crate) table_meta: Option<TableMeta>,
pub(crate) metrics: ReconcileTableMetrics,
}

View File

@@ -118,7 +118,7 @@ impl State for ResolveColumnMetadata {
.name_to_ids()
.context(MissingColumnIdsSnafu)?;
let column_metadata = build_column_metadata_from_table_info(
&table_info_value.table_info.meta.schema.column_schemas,
table_info_value.table_info.meta.schema.column_schemas(),
&table_info_value.table_info.meta.primary_key_indices,
&name_to_ids,
)?;

View File

@@ -15,18 +15,19 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
use std::ops::AddAssign;
use std::sync::Arc;
use std::time::Instant;
use api::v1::SemanticType;
use common_procedure::{Context as ProcedureContext, ProcedureId, watcher};
use common_telemetry::{error, warn};
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, Schema};
use futures::future::{join_all, try_join_all};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionId, TableId};
use table::metadata::{RawTableInfo, RawTableMeta};
use table::metadata::{TableInfo, TableMeta};
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -272,7 +273,7 @@ pub(crate) fn check_column_metadata_invariants(
Ok(())
}
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
/// Builds a [`TableMeta`] from the provided [`ColumnMetadata`]s.
///
/// Returns an error if:
/// - Any column is missing in the `name_to_ids`(if `name_to_ids` is provided).
@@ -284,23 +285,24 @@ pub(crate) fn check_column_metadata_invariants(
pub(crate) fn build_table_meta_from_column_metadatas(
table_id: TableId,
table_ref: TableReference,
table_meta: &RawTableMeta,
table_meta: &TableMeta,
name_to_ids: Option<HashMap<String, u32>>,
column_metadata: &[ColumnMetadata],
) -> Result<RawTableMeta> {
) -> Result<TableMeta> {
let column_in_column_metadata = column_metadata
.iter()
.map(|c| (c.column_schema.name.as_str(), c))
.collect::<HashMap<_, _>>();
let column_schemas = table_meta.schema.column_schemas();
let primary_key_names = table_meta
.primary_key_indices
.iter()
.map(|i| table_meta.schema.column_schemas[*i].name.as_str())
.map(|i| column_schemas[*i].name.as_str())
.collect::<HashSet<_>>();
let partition_key_names = table_meta
.partition_key_indices
.iter()
.map(|i| table_meta.schema.column_schemas[*i].name.as_str())
.map(|i| column_schemas[*i].name.as_str())
.collect::<HashSet<_>>();
ensure!(
column_metadata
@@ -351,14 +353,12 @@ pub(crate) fn build_table_meta_from_column_metadatas(
let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
let value_indices = &mut new_raw_table_meta.value_indices;
let time_index = &mut new_raw_table_meta.schema.timestamp_index;
let columns = &mut new_raw_table_meta.schema.column_schemas;
let mut columns = Vec::with_capacity(column_metadata.len());
let column_ids = &mut new_raw_table_meta.column_ids;
let next_column_id = &mut new_raw_table_meta.next_column_id;
column_ids.clear();
value_indices.clear();
columns.clear();
primary_key_indices.clear();
partition_key_indices.clear();
@@ -375,7 +375,6 @@ pub(crate) fn build_table_meta_from_column_metadatas(
}
SemanticType::Timestamp => {
value_indices.push(idx);
*time_index = Some(idx);
}
}
@@ -391,10 +390,10 @@ pub(crate) fn build_table_meta_from_column_metadatas(
.unwrap_or(*next_column_id)
.max(*next_column_id);
if let Some(time_index) = *time_index {
new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
}
new_raw_table_meta.schema = Arc::new(Schema::new_with_version(
columns,
table_meta.schema.version(),
));
Ok(new_raw_table_meta)
}
@@ -403,10 +402,10 @@ pub(crate) fn build_table_meta_from_column_metadatas(
/// The logical table only support to add columns, so we can check the length of column metadatas
/// to determine whether the logical table info needs to be updated.
pub(crate) fn need_update_logical_table_info(
table_info: &RawTableInfo,
table_info: &TableInfo,
column_metadatas: &[ColumnMetadata],
) -> bool {
table_info.meta.schema.column_schemas.len() != column_metadatas.len()
table_info.meta.schema.column_schemas().len() != column_metadatas.len()
}
/// The result of waiting for inflight subprocedures.
@@ -959,7 +958,7 @@ mod tests {
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use table::metadata::{RawTableMeta, TableMetaBuilder};
use table::metadata::TableMetaBuilder;
use table::table_reference::TableReference;
use super::*;
@@ -1010,17 +1009,15 @@ mod tests {
]
}
fn new_test_raw_table_info() -> RawTableMeta {
fn new_test_raw_table_info() -> TableMeta {
let mut table_meta_builder = TableMetaBuilder::empty();
let table_meta = table_meta_builder
table_meta_builder
.schema(Arc::new(new_test_schema()))
.primary_key_indices(vec![0])
.partition_key_indices(vec![2])
.next_column_id(4)
.build()
.unwrap();
table_meta.into()
.unwrap()
}
#[test]
@@ -1081,7 +1078,7 @@ mod tests {
assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
assert_eq!(new_table_meta.partition_key_indices, vec![2]);
assert_eq!(new_table_meta.value_indices, vec![1, 2]);
assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
assert_eq!(new_table_meta.schema.timestamp_index(), Some(1));
assert_eq!(
new_table_meta.column_ids,
vec![0, 1, 2, ReservedColumnId::table_id()]

View File

@@ -48,7 +48,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{DefaultOnNull, serde_as};
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{TableId, TableInfo};
use table::requests::validate_database_option;
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -103,13 +103,13 @@ impl DdlTask {
pub fn new_create_table(
expr: CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
table_info: TableInfo,
) -> Self {
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
}
/// Creates a [`DdlTask`] to create several logical tables.
pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self {
DdlTask::CreateLogicalTables(
table_data
.into_iter()
@@ -197,7 +197,7 @@ impl DdlTask {
}
/// Creates a [`DdlTask`] to create a view.
pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self {
DdlTask::CreateView(CreateViewTask {
create_view,
view_info,
@@ -415,7 +415,7 @@ impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
pub create_view: CreateViewExpr,
pub view_info: RawTableInfo,
pub view_info: TableInfo,
}
impl CreateViewTask {
@@ -648,7 +648,7 @@ impl From<DropTableTask> for PbDropTableTask {
pub struct CreateTableTask {
pub create_table: CreateTableExpr,
pub partitions: Vec<Partition>,
pub table_info: RawTableInfo,
pub table_info: TableInfo,
}
impl TryFrom<PbCreateTableTask> for CreateTableTask {
@@ -683,7 +683,7 @@ impl CreateTableTask {
pub fn new(
expr: CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
table_info: TableInfo,
) -> CreateTableTask {
CreateTableTask {
create_table: expr,
@@ -717,7 +717,7 @@ impl CreateTableTask {
self.table_info.ident.table_id = table_id;
}
/// Sort the columns in [CreateTableExpr] and [RawTableInfo].
/// Sort the columns in [CreateTableExpr] and [TableInfo].
///
/// This function won't do any check or verification. Caller should
/// ensure this task is valid.
@@ -1597,10 +1597,10 @@ mod tests {
use std::sync::Arc;
use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::storage::ConcreteDataType;
use table::metadata::{RawTableInfo, RawTableMeta, TableType};
use table::metadata::{TableInfo, TableMeta, TableType};
use table::test_util::table_info::test_table_info;
use super::{AlterTableTask, CreateTableTask, *};
@@ -1609,11 +1609,7 @@ mod tests {
fn test_basic_ser_de_create_table_task() {
let schema = SchemaBuilder::default().build().unwrap();
let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
let task = CreateTableTask::new(
CreateTableExpr::default(),
Vec::new(),
RawTableInfo::from(table_info),
);
let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info);
let output = serde_json::to_vec(&task).unwrap();
@@ -1636,32 +1632,28 @@ mod tests {
#[test]
fn test_sort_columns() {
// construct RawSchema
let raw_schema = RawSchema {
column_schemas: vec![
ColumnSchema::new(
"column3".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"column1".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"column2".to_string(),
ConcreteDataType::float64_datatype(),
true,
),
],
timestamp_index: Some(1),
version: 0,
};
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(
"column3".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"column1".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"column2".to_string(),
ConcreteDataType::float64_datatype(),
true,
),
]));
// construct RawTableMeta
let raw_table_meta = RawTableMeta {
schema: raw_schema,
let meta = TableMeta {
schema,
primary_key_indices: vec![0],
value_indices: vec![2],
engine: METRIC_ENGINE_NAME.to_string(),
@@ -1673,10 +1665,10 @@ mod tests {
column_ids: Default::default(),
};
// construct RawTableInfo
let raw_table_info = RawTableInfo {
// construct TableInfo
let raw_table_info = TableInfo {
ident: Default::default(),
meta: raw_table_meta,
meta,
name: Default::default(),
desc: Default::default(),
catalog_name: Default::default(),
@@ -1729,7 +1721,7 @@ mod tests {
// Assert that the table_info is updated correctly
assert_eq!(
create_table_task.table_info.meta.schema.timestamp_index,
create_table_task.table_info.meta.schema.timestamp_index(),
Some(0)
);
assert_eq!(

View File

@@ -14,7 +14,6 @@
mod column_schema;
pub mod constraint;
mod raw;
use std::collections::HashMap;
use std::fmt;
@@ -22,6 +21,7 @@ use std::sync::Arc;
use arrow::datatypes::{Field, Schema as ArrowSchema};
use datafusion_common::DFSchemaRef;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ResultExt, ensure};
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
@@ -40,7 +40,6 @@ pub use crate::schema::column_schema::{
VectorIndexOptions,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;
/// Key used to store version number of the schema in metadata.
pub const VERSION_KEY: &str = "greptime:version";
@@ -48,15 +47,18 @@ pub const VERSION_KEY: &str = "greptime:version";
pub const TYPE_KEY: &str = "greptime:type";
/// A common schema, should be immutable.
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct Schema {
column_schemas: Vec<ColumnSchema>,
#[serde(skip_serializing)]
name_to_index: HashMap<String, usize>,
#[serde(skip_serializing)]
arrow_schema: Arc<ArrowSchema>,
/// Index of the timestamp key column.
///
/// Timestamp key column is the column holds the timestamp and forms part of
/// the primary key. None means there is no timestamp key column.
#[serde(skip_serializing)]
timestamp_index: Option<usize>,
/// Version of the schema.
///
@@ -64,6 +66,26 @@ pub struct Schema {
version: u32,
}
impl<'de> Deserialize<'de> for Schema {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;
#[derive(Deserialize)]
struct RawSchema {
column_schemas: Vec<ColumnSchema>,
version: u32,
}
let raw = RawSchema::deserialize(deserializer)?;
SchemaBuilder::try_from(raw.column_schemas)
.and_then(|x| x.version(raw.version).build())
.map_err(D::Error::custom)
}
}
impl fmt::Debug for Schema {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Schema")
@@ -85,8 +107,13 @@ impl Schema {
/// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
// Builder won't fail in this case
Self::new_with_version(column_schemas, Self::INITIAL_VERSION)
}
pub fn new_with_version(column_schemas: Vec<ColumnSchema>, version: u32) -> Schema {
SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(version)
.build()
.unwrap()
}

View File

@@ -1,129 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use crate::error::{Error, Result};
use crate::schema::{ColumnSchema, Schema, SchemaBuilder};
/// Struct used to serialize and deserialize [`Schema`](crate::schema::Schema).
///
/// This struct only contains necessary data to recover the Schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RawSchema {
/// Schema of columns.
pub column_schemas: Vec<ColumnSchema>,
/// Index of the timestamp column.
pub timestamp_index: Option<usize>,
/// Schema version.
pub version: u32,
}
impl RawSchema {
/// Creates a new [RawSchema] from specific `column_schemas`.
///
/// Sets [RawSchema::timestamp_index] to the first index of the timestamp
/// column. It doesn't check whether time index column is duplicate.
pub fn new(column_schemas: Vec<ColumnSchema>) -> RawSchema {
let timestamp_index = column_schemas
.iter()
.position(|column_schema| column_schema.is_time_index());
RawSchema {
column_schemas,
timestamp_index,
version: 0,
}
}
}
impl TryFrom<RawSchema> for Schema {
type Error = Error;
fn try_from(raw: RawSchema) -> Result<Schema> {
// While building Schema, we don't trust the fields, such as timestamp_index,
// in RawSchema. We use SchemaBuilder to perform the validation.
SchemaBuilder::try_from(raw.column_schemas)?
.version(raw.version)
.build()
}
}
impl From<&Schema> for RawSchema {
fn from(schema: &Schema) -> RawSchema {
RawSchema {
column_schemas: schema.column_schemas.clone(),
timestamp_index: schema.timestamp_index,
version: schema.version,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data_type::ConcreteDataType;
#[test]
fn test_raw_convert() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();
let raw = RawSchema::from(&schema);
let schema_new = Schema::try_from(raw).unwrap();
assert_eq!(schema, schema_new);
}
#[test]
fn test_new_raw_schema_with_time_index() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
];
let schema = RawSchema::new(column_schemas);
assert_eq!(1, schema.timestamp_index.unwrap());
}
#[test]
fn test_new_raw_schema_without_time_index() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
];
let schema = RawSchema::new(column_schemas);
assert!(schema.timestamp_index.is_none());
}
}

View File

@@ -485,13 +485,13 @@ impl StreamingEngine {
.await?
.unwrap();
let meta = table_info.table_info.meta;
let schema = meta.schema.column_schemas().to_vec();
let primary_keys = meta
.primary_key_indices
.into_iter()
.map(|i| meta.schema.column_schemas[i].name.clone())
.map(|i| schema[i].name.clone())
.collect_vec();
let schema = meta.schema.column_schemas;
let time_index = meta.schema.timestamp_index;
let time_index = meta.schema.timestamp_index();
Ok(Some((primary_keys, time_index, schema)))
} else {
Ok(None)

View File

@@ -124,14 +124,14 @@ impl ManagedTableSource {
.context(UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table info", table_id),
})?;
let raw_schema = &info.table_info.meta.schema;
let Some(ts_index) = raw_schema.timestamp_index else {
let schema = &info.table_info.meta.schema;
let Some(ts_index) = schema.timestamp_index() else {
UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found timestamp index", table_id),
}
.fail()?
};
let col_schema = raw_schema.column_schemas[ts_index].clone();
let col_schema = schema.column_schemas()[ts_index].clone();
Ok((ts_index, col_schema))
}

View File

@@ -145,8 +145,8 @@ pub fn table_info_value_to_relation_desc(
) -> Result<TableDesc, Error> {
let raw_schema = table_info_value.table_info.meta.schema;
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
.clone()
.column_schemas()
.to_vec()
.into_iter()
.map(|col| {
(
@@ -162,7 +162,7 @@ pub fn table_info_value_to_relation_desc(
let key = table_info_value.table_info.meta.primary_key_indices;
let keys = vec![crate::repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
let time_index = raw_schema.timestamp_index();
let relation_desc = RelationDesc {
typ: RelationType {
column_types,
@@ -174,7 +174,7 @@ pub fn table_info_value_to_relation_desc(
names: col_names,
};
let default_values = raw_schema
.column_schemas
.column_schemas()
.iter()
.map(|c| {
c.default_constraint().cloned().or_else(|| {

View File

@@ -127,7 +127,7 @@ impl BatchingEngine {
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
let time_index_unit = schema.column_schemas()[schema.timestamp_index().unwrap()]
.data_type
.as_timestamp()
.unwrap()
@@ -576,7 +576,7 @@ impl BatchingEngine {
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.filter(|col| col.data_type == ConcreteDataType::float64_datatype())
.collect::<Vec<_>>();
@@ -604,7 +604,7 @@ impl BatchingEngine {
.table_info
.meta
.schema
.column_schemas
.column_schemas()
.iter()
.enumerate()
{

View File

@@ -192,7 +192,7 @@ mod test {
let another_region_id = RegionId::new(table_id, region_number + 1);
let peer = Peer::empty(datanode_id);
let follower_peer = Peer::empty(datanode_id + 1);
let table_info = new_test_table_info(table_id).into();
let table_info = new_test_table_info(table_id);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
@@ -328,7 +328,7 @@ mod test {
let no_exist_region_id = RegionId::new(table_id, region_number + 2);
let peer = Peer::empty(datanode_id);
let follower_peer = Peer::empty(datanode_id + 1);
let table_info = new_test_table_info(table_id).into();
let table_info = new_test_table_info(table_id);
let region_routes = vec![
RegionRoute {

View File

@@ -1172,7 +1172,7 @@ mod tests {
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
@@ -1211,7 +1211,7 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
@@ -1299,7 +1299,7 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
@@ -1419,7 +1419,7 @@ mod tests {
let from_peer_id = persistent_context.from_peer.id;
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),

View File

@@ -401,7 +401,7 @@ mod tests {
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
let region_id = ctx.persistent_ctx.region_ids[0];
let table_info = new_test_table_info(region_id.table_id()).into();
let table_info = new_test_table_info(region_id.table_id());
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),

View File

@@ -704,7 +704,7 @@ mod test {
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(3)),
@@ -732,7 +732,7 @@ mod test {
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(3)),
@@ -764,7 +764,7 @@ mod test {
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(3)),
@@ -798,7 +798,7 @@ mod test {
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(2)),
@@ -870,7 +870,7 @@ mod test {
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(2)),
@@ -903,7 +903,7 @@ mod test {
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(3)),
@@ -936,7 +936,7 @@ mod test {
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(3)),
@@ -980,7 +980,7 @@ mod test {
task.trigger_reason,
),
);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(1)),

View File

@@ -223,7 +223,7 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_route = RegionRoute {
region: Region::new_test(RegionId::new(1024, 3)),
leader_peer: Some(from_peer.clone()),
@@ -250,7 +250,7 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(to_peer),
@@ -277,7 +277,7 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(from_peer_id)),
@@ -302,7 +302,7 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes: Vec<RegionRoute> = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(1024)),

View File

@@ -425,7 +425,7 @@ mod tests {
let mut env = TestingEnv::new();
// Prepares table
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(from_peer_id)),

View File

@@ -37,7 +37,7 @@ use common_procedure_test::MockContextProvider;
use common_telemetry::debug;
use futures::future::BoxFuture;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::error::{self, Error, Result};
@@ -169,7 +169,7 @@ impl TestingEnv {
// Creates a table metadata with the physical table route.
pub async fn create_physical_table_metadata(
&self,
table_info: RawTableInfo,
table_info: TableInfo,
region_routes: Vec<RegionRoute>,
) {
self.table_metadata_manager
@@ -289,7 +289,7 @@ impl ProcedureMigrationTestSuite {
/// Initializes table metadata.
pub(crate) async fn init_table_metadata(
&self,
table_info: RawTableInfo,
table_info: TableInfo,
region_routes: Vec<RegionRoute>,
) {
self.env

View File

@@ -142,7 +142,7 @@ mod tests {
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(1024)),
@@ -185,7 +185,7 @@ mod tests {
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(from_peer.clone()),

View File

@@ -120,7 +120,7 @@ mod tests {
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),

View File

@@ -262,7 +262,7 @@ mod tests {
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(4)),
@@ -295,7 +295,7 @@ mod tests {
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(3)),
@@ -330,7 +330,7 @@ mod tests {
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(1)),
@@ -369,7 +369,7 @@ mod tests {
let leader_peer = persistent_context.from_peer.clone();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(leader_peer),
@@ -396,7 +396,7 @@ mod tests {
let candidate_peer = persistent_context.to_peer.clone();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(candidate_peer),
@@ -424,7 +424,7 @@ mod tests {
let candidate_peer = persistent_context.to_peer.clone();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024).into();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(candidate_peer),
@@ -454,7 +454,7 @@ mod tests {
let opening_keeper = MemoryRegionKeeper::default();
let table_id = 1024;
let table_info = new_test_table_info(table_id).into();
let table_info = new_test_table_info(table_id);
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),

View File

@@ -381,7 +381,7 @@ mod tests {
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
let region_id = ctx.persistent_ctx.region_ids[0];
let table_info = new_test_table_info(region_id.table_id()).into();
let table_info = new_test_table_info(region_id.table_id());
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),

View File

@@ -28,7 +28,7 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionNumber, TableId};
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use table::table_reference::TableReference;
use tokio::time::Instant;
@@ -258,7 +258,7 @@ impl AllocateRegion {
async fn allocate_regions(
node_manager: &NodeManagerRef,
raw_table_info: &RawTableInfo,
raw_table_info: &TableInfo,
region_routes: &[RegionRoute],
wal_options: &HashMap<RegionNumber, String>,
) -> Result<()> {

View File

@@ -300,7 +300,7 @@ pub async fn new_wal_prune_metadata(
let region_ids = (0..n_region)
.map(|i| RegionId::new(table_id, i))
.collect::<Vec<_>>();
let table_info = new_test_table_info(table_id).into();
let table_info = new_test_table_info(table_id);
let region_routes = region_ids
.iter()
.map(|region_id| RegionRoute {

View File

@@ -308,8 +308,8 @@ pub mod test_data {
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_provider::WalProvider;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use datatypes::schema::{ColumnSchema, Schema};
use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
use table::requests::TableOptions;
use crate::cache_invalidator::MetasrvCacheInvalidator;
@@ -330,8 +330,8 @@ pub mod test_data {
]
}
pub fn new_table_info() -> RawTableInfo {
RawTableInfo {
pub fn new_table_info() -> TableInfo {
TableInfo {
ident: TableIdent {
table_id: 42,
version: 1,
@@ -340,33 +340,29 @@ pub mod test_data {
desc: Some("blabla".to_string()),
catalog_name: "my_catalog".to_string(),
schema_name: "my_schema".to_string(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: vec![
ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(
"my_tag1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_tag2".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_field_column".to_string(),
ConcreteDataType::int32_datatype(),
true,
),
],
timestamp_index: Some(0),
version: 0,
},
meta: TableMeta {
schema: Arc::new(Schema::new(vec![
ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(
"my_tag1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_tag2".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_field_column".to_string(),
ConcreteDataType::int32_datatype(),
true,
),
])),
primary_key_indices: vec![1, 2],
value_indices: vec![2],
engine: MITO2_ENGINE.to_string(),

View File

@@ -265,7 +265,7 @@ mod tests {
use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use table::metadata::TableInfo;
use super::{RegionLeaseKeeper, renew_region_lease_via_region_route};
use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};
@@ -343,7 +343,7 @@ mod tests {
#[tokio::test]
async fn test_collect_metadata() {
let table_id = 1024;
let table_info: RawTableInfo = new_test_table_info(table_id).into();
let table_info: TableInfo = new_test_table_info(table_id);
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;
@@ -394,7 +394,7 @@ mod tests {
#[tokio::test]
async fn test_renew_region_leases_basic() {
let table_id = 1024;
let table_info: RawTableInfo = new_test_table_info(table_id).into();
let table_info: TableInfo = new_test_table_info(table_id);
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;
@@ -501,7 +501,7 @@ mod tests {
#[tokio::test]
async fn test_renew_unexpected_logic_table() {
let table_id = 1024;
let table_info: RawTableInfo = new_test_table_info(table_id).into();
let table_info: TableInfo = new_test_table_info(table_id);
let region_id = RegionId::new(table_id, 1);
let keeper = new_test_keeper();
@@ -537,7 +537,7 @@ mod tests {
#[tokio::test]
async fn test_renew_region_leases_with_downgrade_leader() {
let table_id = 1024;
let table_info: RawTableInfo = new_test_table_info(table_id).into();
let table_info: TableInfo = new_test_table_info(table_id);
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;

View File

@@ -306,13 +306,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create table info"))]
CreateTableInfo {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Failed to build CreateExpr on insertion"))]
BuildCreateExprOnInsertion {
#[snafu(implicit)]
@@ -973,7 +966,6 @@ impl ErrorExt for Error {
}
Error::Table { source, .. } | Error::Insert { source, .. } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::CreateTableInfo { source, .. }
| Error::IntoVectors { source, .. } => source.status_code(),
Error::RequestInserts { source, .. } | Error::FindViewInfo { source, .. } => {
source.status_code()

View File

@@ -155,7 +155,8 @@ pub(crate) async fn create_external_expr(
let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
.await
.context(InferFileTableSchemaSnafu)?
.column_schemas;
.column_schemas()
.to_vec();
let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
// expanded form

View File

@@ -54,7 +54,7 @@ use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::value::Value;
use datatypes::vectors::{StringVector, VectorRef};
use humantime::parse_duration;
@@ -83,7 +83,7 @@ use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
use table::requests::{
AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
};
@@ -92,7 +92,7 @@ use table::table_reference::TableReference;
use crate::error::{
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
@@ -385,8 +385,7 @@ impl StatementExecutor {
table_info.ident.table_id = table_id;
let table_info: Arc<TableInfo> =
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
let table_info = Arc::new(table_info);
create_table.table_id = Some(api::v1::TableId { id: table_id });
let table = DistTable::table(table_info);
@@ -921,7 +920,7 @@ impl StatementExecutor {
let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
let mut view_info = RawTableInfo {
let mut view_info = TableInfo {
ident: metadata::TableIdent {
// The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
table_id: 0,
@@ -932,7 +931,7 @@ impl StatementExecutor {
catalog_name: expr.catalog_name.clone(),
schema_name: expr.schema_name.clone(),
// The meta doesn't make sense for views, so using a default one.
meta: RawTableMeta::default(),
meta: TableMeta::empty(),
table_type: TableType::View,
};
@@ -961,7 +960,7 @@ impl StatementExecutor {
view_info.ident.table_id = view_id;
let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
let view_info = Arc::new(view_info);
let table = DistTable::table(view_info);
@@ -1737,7 +1736,7 @@ impl StatementExecutor {
&self,
create_table: CreateTableExpr,
partitions: Vec<PartitionExpr>,
table_info: RawTableInfo,
table_info: TableInfo,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions
@@ -1758,7 +1757,7 @@ impl StatementExecutor {
async fn create_logical_tables_procedure(
&self,
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
tables_data: Vec<(CreateTableExpr, TableInfo)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest::new(
@@ -2057,7 +2056,7 @@ pub fn verify_alter(
pub fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
) -> Result<TableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
@@ -2072,15 +2071,8 @@ pub fn create_table_info(
let _ = column_name_to_index_map.insert(column.name.clone(), idx);
}
let timestamp_index = column_name_to_index_map
.get(&create_table.time_index)
.cloned();
let raw_schema = RawSchema {
column_schemas: column_schemas.clone(),
timestamp_index,
version: 0,
};
let next_column_id = column_schemas.len() as u32;
let schema = Arc::new(Schema::new(column_schemas));
let primary_key_indices = create_table
.primary_keys
@@ -2106,12 +2098,12 @@ pub fn create_table_info(
let table_options = TableOptions::try_from_iter(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?;
let meta = RawTableMeta {
schema: raw_schema,
let meta = TableMeta {
schema,
primary_key_indices,
value_indices: vec![],
engine: create_table.engine.clone(),
next_column_id: column_schemas.len() as u32,
next_column_id,
options: table_options,
created_on: Utc::now(),
updated_on: Utc::now(),
@@ -2125,7 +2117,7 @@ pub fn create_table_info(
Some(create_table.desc.clone())
};
let table_info = RawTableInfo {
let table_info = TableInfo {
ident: metadata::TableIdent {
// The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
table_id: 0,

View File

@@ -133,8 +133,7 @@ impl StatementExecutor {
.await
.context(TableMetadataManagerSnafu)?
{
let mut latest_info = TableInfo::try_from(latest.into_inner().table_info)
.context(error::CreateTableInfoSnafu)?;
let mut latest_info = latest.into_inner().table_info;
if !partition_column_names.is_empty() {
latest_info.meta.partition_key_indices = partition_column_names

View File

@@ -124,7 +124,7 @@ pub(crate) async fn create_partition_rule_manager(
let region_wal_options = new_test_region_wal_options(regions.clone());
table_metadata_manager
.create_table_metadata(
new_test_table_info(1, "table_1", regions.clone().into_iter()).into(),
new_test_table_info(1, "table_1", regions.clone().into_iter()),
TableRouteValue::physical(vec![
RegionRoute {
region: Region {

View File

@@ -44,7 +44,7 @@ use datafusion::common::ScalarValue;
use datafusion::prelude::SessionContext;
use datafusion_expr::{Expr, SortExpr, case, col, lit};
use datatypes::prelude::*;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
use datatypes::vectors::StringVector;
use itertools::Itertools;
use object_store::ObjectStore;
@@ -1203,14 +1203,12 @@ pub async fn infer_file_table_schema(
object_store: &ObjectStore,
files: &[String],
options: &HashMap<String, String>,
) -> Result<RawSchema> {
) -> Result<Schema> {
let format = parse_file_table_format(options)?;
let merged = infer_schemas(object_store, files, format.as_ref())
.await
.context(error::InferSchemaSnafu)?;
Ok(RawSchema::from(
&Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
))
Schema::try_from(merged).context(error::ConvertSchemaSnafu)
}
// Converts the file column schemas to table column schemas.

View File

@@ -22,8 +22,7 @@ use common_query::AddColumnLocation;
use datafusion_expr::TableProviderFilterPushDown;
pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
use datatypes::schema::{
ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
SkippingIndexOptions,
ColumnSchema, FulltextOptions, Schema, SchemaBuilder, SchemaRef, SkippingIndexOptions,
};
use derive_builder::Builder;
use serde::{Deserialize, Deserializer, Serialize};
@@ -124,7 +123,7 @@ pub struct TableIdent {
/// The table metadata.
///
/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder, Serialize)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
pub schema: SchemaRef,
@@ -149,6 +148,72 @@ pub struct TableMeta {
pub column_ids: Vec<ColumnId>,
}
impl TableMeta {
pub fn empty() -> Self {
Self {
schema: Arc::new(Schema::new(vec![])),
primary_key_indices: vec![],
value_indices: vec![],
engine: "".to_string(),
next_column_id: 0,
options: TableOptions::default(),
created_on: Utc::now(),
updated_on: Utc::now(),
partition_key_indices: vec![],
column_ids: vec![],
}
}
}
impl<'de> Deserialize<'de> for TableMeta {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct RawTableMeta {
schema: SchemaRef,
primary_key_indices: Vec<usize>,
value_indices: Vec<usize>,
engine: String,
next_column_id: ColumnId,
options: TableOptions,
created_on: DateTime<Utc>,
updated_on: Option<DateTime<Utc>>,
#[serde(default)]
partition_key_indices: Vec<usize>,
#[serde(default)]
column_ids: Vec<ColumnId>,
}
let RawTableMeta {
schema,
primary_key_indices,
value_indices,
engine,
next_column_id,
options,
created_on,
updated_on,
partition_key_indices,
column_ids,
} = RawTableMeta::deserialize(deserializer)?;
Ok(Self {
schema,
primary_key_indices,
value_indices,
engine,
next_column_id,
options,
created_on,
updated_on: updated_on.unwrap_or(created_on),
partition_key_indices,
column_ids,
})
}
}
impl TableMetaBuilder {
/// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
#[cfg(any(test, feature = "testing"))]
@@ -1062,7 +1127,7 @@ impl TableMeta {
}
}
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[derive(Clone, Debug, PartialEq, Eq, Builder, Serialize, Deserialize)]
#[builder(pattern = "owned")]
pub struct TableInfo {
/// Id and version of the table.
@@ -1154,126 +1219,13 @@ impl From<TableId> for TableIdent {
}
}
/// Struct used to serialize and deserialize [`TableMeta`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
pub struct RawTableMeta {
pub schema: RawSchema,
/// The indices of columns in primary key. Note that the index of timestamp column
/// is not included. Order matters to this array.
pub primary_key_indices: Vec<usize>,
/// The indices of columns in value. The index of timestamp column is included.
/// Order doesn't matter to this array.
pub value_indices: Vec<usize>,
/// Engine type of this table. Usually in small case.
pub engine: String,
/// Next column id of a new column.
/// It's used to ensure all columns with the same name across all regions have the same column id.
pub next_column_id: ColumnId,
pub options: TableOptions,
pub created_on: DateTime<Utc>,
pub updated_on: DateTime<Utc>,
/// Order doesn't matter to this array.
#[serde(default)]
pub partition_key_indices: Vec<usize>,
/// Map of column name to column id.
/// Note: This field may be empty for older versions that did not include this field.
#[serde(default)]
pub column_ids: Vec<ColumnId>,
}
impl<'de> Deserialize<'de> for RawTableMeta {
fn deserialize<D>(
deserializer: D,
) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct Helper {
schema: RawSchema,
primary_key_indices: Vec<usize>,
value_indices: Vec<usize>,
engine: String,
next_column_id: u32,
options: TableOptions,
created_on: DateTime<Utc>,
updated_on: Option<DateTime<Utc>>,
#[serde(default)]
partition_key_indices: Vec<usize>,
#[serde(default)]
column_ids: Vec<ColumnId>,
}
let h = Helper::deserialize(deserializer)?;
Ok(RawTableMeta {
schema: h.schema,
primary_key_indices: h.primary_key_indices,
value_indices: h.value_indices,
engine: h.engine,
next_column_id: h.next_column_id,
options: h.options,
created_on: h.created_on,
updated_on: h.updated_on.unwrap_or(h.created_on),
partition_key_indices: h.partition_key_indices,
column_ids: h.column_ids,
})
}
}
impl From<TableMeta> for RawTableMeta {
fn from(meta: TableMeta) -> RawTableMeta {
RawTableMeta {
schema: RawSchema::from(&*meta.schema),
primary_key_indices: meta.primary_key_indices,
value_indices: meta.value_indices,
engine: meta.engine,
next_column_id: meta.next_column_id,
options: meta.options,
created_on: meta.created_on,
updated_on: meta.updated_on,
partition_key_indices: meta.partition_key_indices,
column_ids: meta.column_ids,
}
}
}
impl TryFrom<RawTableMeta> for TableMeta {
type Error = ConvertError;
fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
Ok(TableMeta {
schema: Arc::new(Schema::try_from(raw.schema)?),
primary_key_indices: raw.primary_key_indices,
value_indices: raw.value_indices,
engine: raw.engine,
next_column_id: raw.next_column_id,
options: raw.options,
created_on: raw.created_on,
updated_on: raw.updated_on,
partition_key_indices: raw.partition_key_indices,
column_ids: raw.column_ids,
})
}
}
/// Struct used to serialize and deserialize [`TableInfo`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
pub ident: TableIdent,
pub name: String,
pub desc: Option<String>,
pub catalog_name: String,
pub schema_name: String,
pub meta: RawTableMeta,
pub table_type: TableType,
}
impl RawTableInfo {
impl TableInfo {
/// Returns the map of column name to column id.
///
/// Note: This method may return an empty map for older versions that did not include this field.
pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
let column_schemas = self.meta.schema.column_schemas();
if self.meta.column_ids.len() != column_schemas.len() {
None
} else {
Some(
@@ -1281,15 +1233,15 @@ impl RawTableInfo {
.column_ids
.iter()
.enumerate()
.map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
.map(|(index, id)| (column_schemas[index].name.clone(), *id))
.collect(),
)
}
}
/// Sort the columns in [RawTableInfo], logical tables require it.
/// Sort the columns in [TableInfo], logical tables require it.
pub fn sort_columns(&mut self) {
let column_schemas = &self.meta.schema.column_schemas;
let column_schemas = self.meta.schema.column_schemas();
let primary_keys = self
.meta
.primary_key_indices
@@ -1298,23 +1250,16 @@ impl RawTableInfo {
.collect::<HashSet<_>>();
let name_to_ids = self.name_to_ids().unwrap_or_default();
self.meta
.schema
.column_schemas
.sort_unstable_by(|a, b| a.name.cmp(&b.name));
let mut column_schemas = column_schemas.to_vec();
column_schemas.sort_unstable_by(|a, b| a.name.cmp(&b.name));
// Compute new indices of sorted columns
let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
let mut timestamp_index = None;
let mut value_indices =
Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
let mut value_indices = Vec::with_capacity(column_schemas.len() - primary_keys.len());
let mut column_ids = Vec::with_capacity(column_schemas.len());
for (index, column_schema) in column_schemas.iter().enumerate() {
if primary_keys.contains(&column_schema.name) {
primary_key_indices.push(index);
} else if column_schema.is_time_index() {
value_indices.push(index);
timestamp_index = Some(index);
} else {
value_indices.push(index);
}
@@ -1324,7 +1269,10 @@ impl RawTableInfo {
}
// Overwrite table meta
self.meta.schema.timestamp_index = timestamp_index;
self.meta.schema = Arc::new(Schema::new_with_version(
column_schemas,
self.meta.schema.version(),
));
self.meta.primary_key_indices = primary_key_indices;
self.meta.value_indices = value_indices;
self.meta.column_ids = column_ids;
@@ -1347,36 +1295,6 @@ impl RawTableInfo {
}
}
impl From<TableInfo> for RawTableInfo {
fn from(info: TableInfo) -> RawTableInfo {
RawTableInfo {
ident: info.ident,
name: info.name,
desc: info.desc,
catalog_name: info.catalog_name,
schema_name: info.schema_name,
meta: RawTableMeta::from(info.meta),
table_type: info.table_type,
}
}
}
impl TryFrom<RawTableInfo> for TableInfo {
type Error = ConvertError;
fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
Ok(TableInfo {
ident: raw.ident,
name: raw.name,
desc: raw.desc,
catalog_name: raw.catalog_name,
schema_name: raw.schema_name,
meta: TableMeta::try_from(raw.meta)?,
table_type: raw.table_type,
})
}
}
/// Set column fulltext options if it passed the validation.
///
/// Options allowed to modify:
@@ -1491,30 +1409,6 @@ mod tests {
.unwrap()
}
#[test]
fn test_raw_convert() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let info = TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap();
let raw = RawTableInfo::from(info.clone());
let info_new = TableInfo::try_from(raw).unwrap();
assert_eq!(info, info_new);
}
fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
@@ -2121,4 +2015,72 @@ mod tests {
let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
assert!(!fulltext_options.enable);
}
#[test]
fn test_table_info_serde_compatibility() {
// "serialized" is generated by the following codes before this refactor (PR 7626):
//
// ```Rust
// serde_json::to_string(&RawTableInfo::from(TableInfo {
// ident: TableIdent {
// table_id: 1024,
// version: 1,
// },
// name: "foo".to_string(),
// desc: Some("my table".to_string()),
// catalog_name: "greptime".to_string(),
// schema_name: "public".to_string(),
// meta: TableMeta {
// schema: Arc::new(new_test_schema()),
// primary_key_indices: vec![0],
// value_indices: vec![1, 2],
// engine: "mito".to_string(),
// next_column_id: 3,
// options: TableOptions {
// ttl: Some(common_time::TimeToLive::Duration(
// std::time::Duration::from_secs(3600),
// )),
// ..Default::default()
// },
// created_on: DateTime::<Utc>::MIN_UTC,
// updated_on: DateTime::<Utc>::MAX_UTC,
// partition_key_indices: vec![2],
// column_ids: vec![0, 1, 2],
// },
// table_type: TableType::Base,
// }))
// ```
let serialized = r#"{"ident":{"table_id":1024,"version":1},"name":"foo","desc":"my table","catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"col1","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"col2","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":1,"version":123},"primary_key_indices":[0],"value_indices":[1,2],"engine":"mito","next_column_id":3,"options":{"write_buffer_size":null,"ttl":"1h","skip_wal":false,"extra_options":{}},"created_on":"-262143-01-01T00:00:00Z","updated_on":"+262142-12-31T23:59:59.999999999Z","partition_key_indices":[2],"column_ids":[0,1,2]},"table_type":"Base"}"#;
let actual: TableInfo = serde_json::from_str(serialized).unwrap();
let expected = TableInfo {
ident: TableIdent {
table_id: 1024,
version: 1,
},
name: "foo".to_string(),
desc: Some("my table".to_string()),
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
meta: TableMeta {
schema: Arc::new(new_test_schema()),
primary_key_indices: vec![0],
value_indices: vec![1, 2],
engine: "mito".to_string(),
next_column_id: 3,
options: TableOptions {
ttl: Some(common_time::TimeToLive::Duration(
std::time::Duration::from_secs(3600),
)),
..Default::default()
},
created_on: DateTime::<Utc>::MIN_UTC,
updated_on: DateTime::<Utc>::MAX_UTC,
partition_key_indices: vec![2],
column_ids: vec![0, 1, 2],
},
table_type: TableType::Base,
};
assert_eq!(actual, expected);
}
}

View File

@@ -128,12 +128,13 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
+++++++++++++++++++++++++
-- SQLNESS REPLACE (\s\d+\s) ID
-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) DATETIME
SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW';
+---------------+--------------+------------+------------+----------+-------------+-----------------+--------------+------------------+----------------+--------+---------+------------+------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------------+-----------+
| table_catalog | table_schema | table_name | table_type | table_id | data_length | max_data_length | index_length | max_index_length | avg_row_length | engine | version | row_format | table_rows | data_free | auto_increment | create_time | update_time | check_time | table_collation | checksum | create_options | table_comment | temporary |
+---------------+--------------+------------+------------+----------+-------------+-----------------+--------------+------------------+----------------+--------+---------+------------+------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------------+-----------+
| greptime | public | test_view | VIEW |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | 1970-01-01T00:00:00 | 1970-01-01T00:00:00 | | utf8_bin |ID | | | N |
| greptime | public | test_view | VIEW |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID |DATETIME |DATETIME | | utf8_bin |ID | | | N |
+---------------+--------------+------------+------------+----------+-------------+-----------------+--------------+------------------+----------------+--------+---------+------------+------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------------+-----------+
SHOW COLUMNS FROM test_view;

View File

@@ -47,6 +47,7 @@ ORDER BY 1,2;
SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
-- SQLNESS REPLACE (\s\d+\s) ID
-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) DATETIME
SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW';
SHOW COLUMNS FROM test_view;