feat: introduce reconcile logical tables procedure (#6588)

* feat: introduce reconcile logical tables procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: lock logical tables

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-31 19:48:06 +08:00
committed by WenyXu
parent d3a1c80fbd
commit 6fdc0b99b3
12 changed files with 1021 additions and 5 deletions

View File

@@ -44,6 +44,7 @@ pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub(crate) use create_table_template::{build_template_from_raw_table_info, CreateRequestBuilder};
pub mod create_view;
pub mod drop_database;
pub mod drop_flow;

View File

@@ -14,16 +14,57 @@
use std::collections::HashMap;
use api::v1::column_def::try_as_column_def;
use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use snafu::OptionExt;
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::metadata::{RawTableInfo, TableId};
use crate::error::{self, Result};
use crate::wal_options_allocator::prepare_wal_options;
/// Builds a [CreateRequest] from a [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
pub(crate) fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let column_defs = raw_table_info
.meta
.schema
.column_schemas
.iter()
.enumerate()
.map(|(i, c)| {
let is_primary_key = primary_key_indices.contains(&i);
let column_def = try_as_column_def(c, is_primary_key)
.context(error::ConvertColumnDefSnafu { column: &c.name })?;
Ok(RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
})
})
.collect::<Result<Vec<_>>>()?;
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: METRIC_ENGINE_NAME.to_string(),
column_defs,
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
path: String::new(),
options,
};
Ok(template)
}
pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
let column_defs = create_table_expr
.column_defs

View File

@@ -20,4 +20,7 @@ pub(crate) mod reconcile_database;
pub(crate) mod reconcile_table;
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_logical_tables;
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod utils;

View File

@@ -0,0 +1,241 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod reconcile_regions;
pub(crate) mod reconciliation_end;
pub(crate) mod reconciliation_start;
pub(crate) mod resolve_table_metadatas;
pub(crate) mod update_table_infos;
use std::any::Any;
use std::fmt::Debug;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use table::table_name::TableName;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::Result;
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_logical_tables::reconciliation_start::ReconciliationStart;
use crate::reconciliation::utils::Context;
/// The context shared by all states of the reconcile logical tables procedure.
///
/// It carries the node manager, table metadata manager and cache invalidator together with
/// the [`PersistentContext`] of the procedure.
pub struct ReconcileLogicalTablesContext {
// Used by the states to talk to datanodes.
pub node_manager: NodeManagerRef,
// Used by the states to read and update table routes and table infos.
pub table_metadata_manager: TableMetadataManagerRef,
// Used to invalidate caches after table infos are updated.
pub cache_invalidator: CacheInvalidatorRef,
// The part of the context that is serialized by `Procedure::dump`.
pub persistent_ctx: PersistentContext,
}
impl ReconcileLogicalTablesContext {
/// Creates a new [`ReconcileLogicalTablesContext`] with the given [`Context`] and [`PersistentContext`].
pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
Self {
node_manager: ctx.node_manager,
table_metadata_manager: ctx.table_metadata_manager,
cache_invalidator: ctx.cache_invalidator,
persistent_ctx,
}
}
pub(crate) fn table_name(&self) -> &TableName {
&self.persistent_ctx.table_name
}
pub(crate) fn table_id(&self) -> TableId {
self.persistent_ctx.table_id
}
}
/// The serializable part of the procedure context.
///
/// It is dumped together with the current state by `Procedure::dump` and restored by
/// `ReconcileLogicalTablesProcedure::from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PersistentContext {
// The id of the physical table.
// `ReconciliationStart` rejects the procedure if this id does not resolve to itself
// as the physical table id.
pub(crate) table_id: TableId,
// The name of the physical table.
pub(crate) table_name: TableName,
// The logical tables that need to be reconciled.
// The logical tables belong to the physical table.
pub(crate) logical_tables: Vec<TableName>,
// The logical table ids.
// Initialized by `PersistentContext::new` and overwritten in the `ReconciliationStart`
// state with the ids resolved from `logical_tables`.
pub(crate) logical_table_ids: Vec<TableId>,
/// The table info value.
/// The value will be set in `ReconciliationStart` state.
// NOTE(review): none of the states visible here assigns this field; it is only
// initialized to `None` — confirm whether it is still needed.
pub(crate) table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
// The physical table route.
// The value will be set in `ReconciliationStart` state.
pub(crate) physical_table_route: Option<PhysicalTableRouteValue>,
// The table infos to be updated, as (logical table id, resolved column metadatas).
// The value will be set in `ResolveTableMetadatas` state and cleared by `UpdateTableInfos`.
pub(crate) update_table_infos: Vec<(TableId, Vec<ColumnMetadata>)>,
// The logical tables whose regions need to be created, as (logical table id, table info).
// The value will be set in `ResolveTableMetadatas` state and cleared by `ReconcileRegions`.
pub(crate) create_tables: Vec<(TableId, RawTableInfo)>,
// Whether the procedure is a subprocedure.
// When true, `lock_key` does not take the catalog and schema locks.
pub(crate) is_subprocedure: bool,
}
impl PersistentContext {
    /// Creates the initial persistent context.
    ///
    /// The `(id, name)` pairs in `logical_tables` are split into two parallel vectors that
    /// keep the input order. All fields resolved by later states start out empty.
    pub(crate) fn new(
        table_id: TableId,
        table_name: TableName,
        logical_tables: Vec<(TableId, TableName)>,
        is_subprocedure: bool,
    ) -> Self {
        let mut ids = Vec::with_capacity(logical_tables.len());
        let mut names = Vec::with_capacity(logical_tables.len());
        for (id, name) in logical_tables {
            ids.push(id);
            names.push(name);
        }

        Self {
            table_id,
            table_name,
            logical_tables: names,
            logical_table_ids: ids,
            table_info_value: None,
            physical_table_route: None,
            update_table_infos: Vec::new(),
            create_tables: Vec::new(),
            is_subprocedure,
        }
    }
}
/// The procedure that reconciles the logical tables of a physical table.
///
/// It is a state machine: every call to `execute` runs the current state and replaces it
/// with the state returned by `State::next`.
pub struct ReconcileLogicalTablesProcedure {
// The context handed to every state.
pub context: ReconcileLogicalTablesContext,
// The state that runs on the next `execute` call.
state: Box<dyn State>,
}
/// The borrowed form of the procedure data, serialized to JSON by `Procedure::dump`.
#[derive(Debug, Serialize)]
struct ProcedureData<'a> {
// The current state of the procedure.
state: &'a dyn State,
// The persistent context of the procedure.
persistent_ctx: &'a PersistentContext,
}
/// The owned form of the procedure data, deserialized from JSON by
/// `ReconcileLogicalTablesProcedure::from_json`.
#[derive(Debug, Deserialize)]
struct ProcedureDataOwned {
// The state to resume from.
state: Box<dyn State>,
// The restored persistent context.
persistent_ctx: PersistentContext,
}
impl ReconcileLogicalTablesProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileLogicalTables";
pub fn new(
ctx: Context,
table_id: TableId,
table_name: TableName,
logical_tables: Vec<(TableId, TableName)>,
is_subprocedure: bool,
) -> Self {
let persistent_ctx =
PersistentContext::new(table_id, table_name, logical_tables, is_subprocedure);
let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx);
let state = Box::new(ReconciliationStart);
Self { context, state }
}
pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
let ProcedureDataOwned {
state,
persistent_ctx,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx);
Ok(Self { context, state })
}
}
#[async_trait]
impl Procedure for ReconcileLogicalTablesProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the current state and installs the state it returns.
    ///
    /// Errors that report `is_retry_later()` are mapped to retryable procedure errors, all
    /// others to external errors. The current state is left untouched on error.
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let (next_state, status) = self
            .state
            .next(&mut self.context, _ctx)
            .await
            .map_err(|e| {
                if e.is_retry_later() {
                    ProcedureError::retry_later(e)
                } else {
                    ProcedureError::external(e)
                }
            })?;

        self.state = next_state;
        Ok(status)
    }

    /// Serializes the current state and the persistent context to JSON.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&ProcedureData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        })
        .context(ToJsonSnafu)
    }

    /// Builds the lock key: write locks on the logical tables (sorted), a read lock on the
    /// physical table, and, unless running as a subprocedure, read locks on the catalog
    /// and schema placed in front.
    fn lock_key(&self) -> LockKey {
        let persistent_ctx = &self.context.persistent_ctx;

        let mut table_locks = persistent_ctx
            .logical_table_ids
            .iter()
            .map(|logical_table_id| TableLock::Write(*logical_table_id).into())
            .collect::<Vec<_>>();
        table_locks.sort_unstable();
        table_locks.push(TableLock::Read(self.context.table_id()).into());

        if persistent_ctx.is_subprocedure {
            // The catalog and schema are already locked by the parent procedure.
            // Only lock the tables.
            return LockKey::new(table_locks);
        }

        let table_ref = self.context.table_name().table_ref();
        let mut all_locks = vec![
            CatalogLock::Read(table_ref.catalog).into(),
            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
        ];
        all_locks.extend(table_locks);
        LockKey::new(all_locks)
    }
}
/// A state of the reconcile logical tables procedure.
///
/// States are serialized through `typetag` under the `reconcile_logical_tables_state` tag.
#[async_trait::async_trait]
#[typetag::serde(tag = "reconcile_logical_tables_state")]
pub(crate) trait State: Sync + Send + Debug {
/// Returns the short name of the state: the last `::`-separated segment of its type name.
fn name(&self) -> &'static str {
let type_name = std::any::type_name::<Self>();
// short name
type_name.split("::").last().unwrap_or(type_name)
}
/// Runs this state and returns the state to run next together with the procedure status.
async fn next(
&mut self,
ctx: &mut ReconcileLogicalTablesContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
/// Returns the state as [`Any`].
fn as_any(&self) -> &dyn Any;
}

View File

@@ -0,0 +1,146 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::ddl::utils::{add_peer_context_if_needed, region_storage_path};
use crate::ddl::{build_template_from_raw_table_info, CreateRequestBuilder};
use crate::error::Result;
use crate::reconciliation::reconcile_logical_tables::update_table_infos::UpdateTableInfos;
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::rpc::router::{find_leaders, region_distribution};
/// The state that creates the regions of the logical tables listed in
/// `persistent_ctx.create_tables`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReconcileRegions;

#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileRegions {
    /// Sends one create request per datanode that leads at least one region of the physical
    /// table, waits for all of them, then clears `create_tables` and moves on to
    /// [`UpdateTableInfos`].
    ///
    /// When there is nothing to create, it moves on directly without persisting.
    ///
    /// # Errors
    ///
    /// Returns the first error among building the requests and the datanode responses.
    async fn next(
        &mut self,
        ctx: &mut ReconcileLogicalTablesContext,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        if ctx.persistent_ctx.create_tables.is_empty() {
            return Ok((Box::new(UpdateTableInfos), Status::executing(false)));
        }

        // Safety: previous steps ensure the physical table route is set.
        let region_routes = &ctx
            .persistent_ctx
            .physical_table_route
            .as_ref()
            .unwrap()
            .region_routes;
        let region_distribution = region_distribution(region_routes);
        let leaders = find_leaders(region_routes)
            .into_iter()
            .map(|p| (p.id, p))
            .collect::<HashMap<_, _>>();

        let mut create_table_tasks = Vec::with_capacity(leaders.len());
        for (datanode_id, region_role_set) in region_distribution {
            // Datanodes that only hold followers receive no request.
            if region_role_set.leader_regions.is_empty() {
                continue;
            }
            // Safety: It contains all leaders in the region routes.
            let peer = leaders.get(&datanode_id).unwrap().clone();
            let request = self.make_request(&region_role_set.leader_regions, ctx)?;
            let requester = ctx.node_manager.datanode(&peer).await;
            create_table_tasks.push(async move {
                requester
                    .handle(request)
                    .await
                    .map_err(add_peer_context_if_needed(peer))
            });
        }

        future::join_all(create_table_tasks)
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()?;

        let table_id = ctx.table_id();
        let table_name = ctx.table_name();
        // The arguments follow the order of the labels in the message: logical table ids,
        // physical table name, physical table id.
        info!(
            "Reconciled regions for logical tables: {:?}, physical table: {}, table_id: {}",
            ctx.persistent_ctx
                .create_tables
                .iter()
                .map(|(table_id, _)| table_id)
                .collect::<Vec<_>>(),
            table_name,
            table_id
        );

        // Persist the cleared list so a restarted procedure does not create the regions again.
        ctx.persistent_ctx.create_tables.clear();
        Ok((Box::new(UpdateTableInfos), Status::executing(true)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl ReconcileRegions {
    /// Builds the batched create request for one datanode.
    ///
    /// The request contains one create request for every combination of a table in
    /// `create_tables` and a region number in `region_numbers`, ordered table by table.
    ///
    /// # Errors
    ///
    /// Returns an error if a request template cannot be built from a table info.
    fn make_request(
        &self,
        region_numbers: &[u32],
        ctx: &ReconcileLogicalTablesContext,
    ) -> Result<RegionRequest> {
        let physical_table_id = ctx.table_id();
        let physical_table_name = ctx.table_name();
        let pending_tables = &ctx.persistent_ctx.create_tables;

        let mut requests = Vec::with_capacity(region_numbers.len() * pending_tables.len());
        for (logical_table_id, table_info) in pending_tables {
            let builder =
                create_region_request_from_raw_table_info(table_info, physical_table_id)?;
            let storage_path = region_storage_path(
                &physical_table_name.catalog_name,
                &physical_table_name.schema_name,
            );
            requests.extend(region_numbers.iter().map(|region_number| {
                let region_id = RegionId::new(*logical_table_id, *region_number);
                builder.build_one(region_id, storage_path.clone(), &HashMap::new())
            }));
        }

        let header = RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            ..Default::default()
        };
        let body = region_request::Body::Creates(CreateRequests { requests });

        Ok(RegionRequest {
            header: Some(header),
            body: Some(body),
        })
    }
}
/// Creates a [`CreateRequestBuilder`] for a logical table from its raw table info.
///
/// The builder wraps the template built by [`build_template_from_raw_table_info`] and is
/// given `physical_table_id` as its physical table id.
fn create_region_request_from_raw_table_info(
    raw_table_info: &RawTableInfo,
    physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
    build_template_from_raw_table_info(raw_table_info)
        .map(|template| CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
/// The final state of the procedure; it only reports completion.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReconciliationEnd;

#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconciliationEnd {
    /// Returns `Status::done()` and stays in [`ReconciliationEnd`].
    async fn next(
        &mut self,
        _ctx: &mut ReconcileLogicalTablesContext,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let terminal: Box<dyn State> = Box::new(ReconciliationEnd);
        Ok((terminal, Status::done()))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

View File

@@ -0,0 +1,171 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use store_api::storage::TableId;
use table::table_name::TableName;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::ddl::utils::table_id::get_all_table_ids_by_names;
use crate::ddl::utils::table_info::all_logical_table_routes_have_same_physical_id;
use crate::error::{self, Result};
use crate::reconciliation::reconcile_logical_tables::resolve_table_metadatas::ResolveTableMetadatas;
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::reconciliation::utils::check_column_metadatas_consistent;
/// The start state of the reconciliation procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReconciliationStart;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconciliationStart {
async fn next(
&mut self,
ctx: &mut ReconcileLogicalTablesContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let (physical_table_id, physical_table_route) = ctx
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await?;
ensure!(
physical_table_id == table_id,
error::UnexpectedSnafu {
err_msg: format!(
"Expected physical table: {}, but it's a logical table of table: {}",
table_name, physical_table_id
),
}
);
info!(
"Starting reconciliation for logical table: table_id: {}, table_name: {}",
table_id, table_name
);
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
let region_metadatas = region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?;
ensure!(
!region_metadatas.is_empty(),
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for table: {}, table_id: {}",
table_name, table_id
),
}
);
if region_metadatas.iter().any(|r| r.is_none()) {
return error::UnexpectedSnafu {
err_msg: format!(
"Some regions of the physical table are not open. Table: {}, table_id: {}",
table_name, table_id
),
}
.fail();
}
// Safety: checked above
let region_metadatas = region_metadatas
.into_iter()
.map(|r| r.unwrap())
.collect::<Vec<_>>();
let _region_metadata = check_column_metadatas_consistent(&region_metadatas).context(
error::UnexpectedSnafu {
err_msg: format!(
"Column metadatas are not consistent for table: {}, table_id: {}",
table_name, table_id
),
},
)?;
// TODO(weny): ensure all columns in region metadata can be found in table info.
// Validates the logical tables.
Self::validate_schema(&ctx.persistent_ctx.logical_tables)?;
let table_refs = ctx
.persistent_ctx
.logical_tables
.iter()
.map(|t| t.table_ref())
.collect::<Vec<_>>();
let table_ids = get_all_table_ids_by_names(
ctx.table_metadata_manager.table_name_manager(),
&table_refs,
)
.await?;
Self::validate_logical_table_routes(ctx, &table_ids).await?;
ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
ctx.persistent_ctx.logical_table_ids = table_ids;
Ok((Box::new(ResolveTableMetadatas), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl ReconciliationStart {
    /// Validates all the logical tables have the same catalog and schema.
    ///
    /// An empty slice and a single table are always valid.
    fn validate_schema(logical_tables: &[TableName]) -> Result<()> {
        // Equality is transitive, so comparing every table with the first one is equivalent
        // to comparing every adjacent pair.
        let is_same_schema = match logical_tables.split_first() {
            Some((first, rest)) => rest.iter().all(|table| {
                table.catalog_name == first.catalog_name && table.schema_name == first.schema_name
            }),
            None => true,
        };

        ensure!(
            is_same_schema,
            error::UnexpectedSnafu {
                err_msg: "The logical tables have different schemas",
            }
        );

        Ok(())
    }

    /// Validates that the routes of all `table_ids` point to the physical table of `ctx`.
    async fn validate_logical_table_routes(
        ctx: &mut ReconcileLogicalTablesContext,
        table_ids: &[TableId],
    ) -> Result<()> {
        let physical_table_id = ctx.table_id();
        let route_manager = ctx.table_metadata_manager.table_route_manager();
        let same_physical_id = all_logical_table_routes_have_same_physical_id(
            route_manager,
            table_ids,
            physical_table_id,
        )
        .await?;

        ensure!(
            same_physical_id,
            error::UnexpectedSnafu {
                err_msg: "All the logical tables should have the same physical table id",
            }
        );

        Ok(())
    }
}

View File

@@ -0,0 +1,130 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
use crate::error::{self, Result};
use crate::reconciliation::reconcile_logical_tables::reconcile_regions::ReconcileRegions;
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::reconciliation::utils::{
check_column_metadatas_consistent, need_update_logical_table_info,
};
/// The state that decides, for every logical table, whether its regions must be created or
/// its table info must be updated.
#[derive(Debug, Serialize, Deserialize)]
pub struct ResolveTableMetadatas;

#[async_trait::async_trait]
#[typetag::serde]
impl State for ResolveTableMetadatas {
    /// Fills `create_tables` and `update_table_infos` of the persistent context and moves on
    /// to [`ReconcileRegions`].
    ///
    /// A logical table is scheduled for creation when a region metadata is missing, and for
    /// a table info update when `need_update_logical_table_info` says so for the consistent
    /// column metadatas. Tables with inconsistent column metadatas are only logged.
    async fn next(
        &mut self,
        ctx: &mut ReconcileLogicalTablesContext,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let logical_table_refs = ctx
            .persistent_ctx
            .logical_tables
            .iter()
            .map(|logical_table| logical_table.table_ref())
            .collect::<Vec<_>>();
        let logical_table_ids = &ctx.persistent_ctx.logical_table_ids;

        let table_info_values = get_all_table_info_values_by_table_ids(
            ctx.table_metadata_manager.table_info_manager(),
            logical_table_ids,
            &logical_table_refs,
        )
        .await?;

        // Safety: The physical table route is set in `ReconciliationStart` state.
        let region_routes = &ctx
            .persistent_ctx
            .physical_table_route
            .as_ref()
            .unwrap()
            .region_routes;
        let lister = RegionMetadataLister::new(ctx.node_manager.clone());

        let mut tables_to_create = Vec::new();
        let mut table_infos_to_update = Vec::new();
        for (logical_table_id, table_info_value) in
            logical_table_ids.iter().zip(table_info_values.iter())
        {
            let listed = lister.list(*logical_table_id, region_routes).await?;
            ensure!(
                !listed.is_empty(),
                error::UnexpectedSnafu {
                    err_msg: format!(
                        "No region metadata found for table: {}, table_id: {}",
                        table_info_value.table_info.name, logical_table_id
                    ),
                }
            );

            // Collecting into `Option` yields `None` as soon as one region metadata is
            // missing; such a table has its regions created.
            let region_metadatas = match listed.into_iter().collect::<Option<Vec<_>>>() {
                Some(all) => all,
                None => {
                    tables_to_create
                        .push((*logical_table_id, table_info_value.table_info.clone()));
                    continue;
                }
            };

            match check_column_metadatas_consistent(&region_metadatas) {
                Some(column_metadatas) => {
                    if need_update_logical_table_info(
                        &table_info_value.table_info,
                        &column_metadatas,
                    ) {
                        table_infos_to_update.push((*logical_table_id, column_metadatas));
                    }
                }
                None => {
                    // If the logical regions have inconsistent column metadatas, it won't affect read and write.
                    // It's safe to continue if the column metadatas of the logical table are inconsistent.
                    warn!(
                        "Found inconsistent column metadatas for table: {}, table_id: {}. Remaining the inconsistent column metadatas",
                        table_info_value.table_info.name, logical_table_id
                    );
                }
            }
        }

        let table_id = ctx.table_id();
        let table_name = ctx.table_name();
        let updating_ids = table_infos_to_update
            .iter()
            .map(|(id, _)| *id)
            .collect::<Vec<_>>();
        let creating_ids = tables_to_create
            .iter()
            .map(|(id, _)| *id)
            .collect::<Vec<_>>();
        info!(
            "Resolving table metadatas for physical table: {}, table_id: {}, updating table infos: {:?}, creating tables: {:?}",
            table_name,
            table_id,
            updating_ids,
            creating_ids,
        );

        ctx.persistent_ctx.update_table_infos = table_infos_to_update;
        ctx.persistent_ctx.create_tables = tables_to_create;
        Ok((Box::new(ReconcileRegions), Status::executing(true)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

View File

@@ -0,0 +1,178 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use store_api::metadata::ColumnMetadata;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::cache_invalidator::Context as CacheContext;
use crate::ddl::utils::table_info::{
batch_update_table_info_values, get_all_table_info_values_by_table_ids,
};
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::reconciliation::reconcile_logical_tables::reconciliation_end::ReconciliationEnd;
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::reconciliation::utils::build_table_meta_from_column_metadatas;
/// The state that rewrites the table infos of the logical tables listed in
/// `persistent_ctx.update_table_infos`.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateTableInfos;

#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateTableInfos {
    /// Builds the new table infos from the resolved column metadatas, writes them in one
    /// batch, invalidates the caches of the physical table and the updated logical tables,
    /// then clears `update_table_infos` and moves on to [`ReconciliationEnd`].
    ///
    /// When there is nothing to update, it moves on directly without persisting.
    ///
    /// # Errors
    ///
    /// Returns an error if reading or writing the table infos, building a new table info, or
    /// invalidating the caches fails.
    async fn next(
        &mut self,
        ctx: &mut ReconcileLogicalTablesContext,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        if ctx.persistent_ctx.update_table_infos.is_empty() {
            return Ok((Box::new(ReconciliationEnd), Status::executing(false)));
        }

        // `logical_table_ids` and `logical_tables` are parallel vectors.
        let all_table_names = ctx
            .persistent_ctx
            .logical_table_ids
            .iter()
            .cloned()
            .zip(
                ctx.persistent_ctx
                    .logical_tables
                    .iter()
                    .map(|t| t.table_ref()),
            )
            .collect::<HashMap<_, _>>();
        let table_ids = ctx
            .persistent_ctx
            .update_table_infos
            .iter()
            .map(|(table_id, _)| *table_id)
            .collect::<Vec<_>>();
        // Safety: the ids in `update_table_infos` are taken from `logical_table_ids`
        // by the `ResolveTableMetadatas` state.
        let table_names = table_ids
            .iter()
            .map(|table_id| *all_table_names.get(table_id).unwrap())
            .collect::<Vec<_>>();
        let table_info_values = get_all_table_info_values_by_table_ids(
            ctx.table_metadata_manager.table_info_manager(),
            &table_ids,
            &table_names,
        )
        .await?;

        let mut table_info_values_to_update =
            Vec::with_capacity(ctx.persistent_ctx.update_table_infos.len());
        for ((table_id, column_metadatas), table_info_value) in ctx
            .persistent_ctx
            .update_table_infos
            .iter()
            .zip(table_info_values.into_iter())
        {
            let new_table_info = Self::build_new_table_info(
                *table_id,
                column_metadatas,
                &table_info_value.table_info,
            )?;
            table_info_values_to_update.push((table_info_value, new_table_info));
        }

        let table_id = ctx.table_id();
        let table_name = ctx.table_name();
        batch_update_table_info_values(&ctx.table_metadata_manager, table_info_values_to_update)
            .await?;

        // The arguments follow the order of the labels in the message: logical table ids,
        // physical table name, physical table id.
        info!(
            "Updated table infos for logical tables: {:?}, physical table: {}, table_id: {}",
            ctx.persistent_ctx
                .update_table_infos
                .iter()
                .map(|(table_id, _)| table_id)
                .collect::<Vec<_>>(),
            table_name,
            table_id,
        );

        let cache_ctx = CacheContext {
            subject: Some(format!(
                "Invalidate table by reconcile logical tables, physical_table_id: {}",
                table_id
            )),
        };
        let idents = Self::build_cache_ident_keys(table_id, table_name, &table_ids, &table_names);
        ctx.cache_invalidator
            .invalidate(&cache_ctx, &idents)
            .await?;

        // Persist the cleared list so a restarted procedure does not apply the updates (and
        // bump the table info versions) a second time.
        ctx.persistent_ctx.update_table_infos.clear();
        Ok((Box::new(ReconciliationEnd), Status::executing(true)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl UpdateTableInfos {
    /// Builds the table info that replaces `table_info`.
    ///
    /// The new table info is a copy of `table_info` whose meta is rebuilt from
    /// `column_metadatas`, whose version is incremented by one and whose columns are sorted.
    ///
    /// # Errors
    ///
    /// Returns an error if the table meta cannot be built from the column metadatas.
    fn build_new_table_info(
        table_id: TableId,
        column_metadatas: &[ColumnMetadata],
        table_info: &RawTableInfo,
    ) -> Result<RawTableInfo> {
        let rebuilt_meta = build_table_meta_from_column_metadatas(
            table_id,
            table_info.table_ref(),
            &table_info.meta,
            None,
            column_metadatas,
        )?;

        let mut updated = table_info.clone();
        updated.ident.version = table_info.ident.version + 1;
        updated.meta = rebuilt_meta;
        updated.sort_columns();

        Ok(updated)
    }

    /// Builds the cache identifiers to invalidate.
    ///
    /// The result lists the physical table id and name first, followed by all logical table
    /// ids and then all logical table names.
    fn build_cache_ident_keys(
        physical_table_id: TableId,
        physical_table_name: &TableName,
        table_ids: &[TableId],
        table_names: &[TableReference],
    ) -> Vec<CacheIdent> {
        let mut idents = Vec::with_capacity(table_ids.len() * 2 + 2);

        idents.push(CacheIdent::TableId(physical_table_id));
        idents.push(CacheIdent::TableName(physical_table_name.clone()));
        for logical_table_id in table_ids {
            idents.push(CacheIdent::TableId(*logical_table_id));
        }
        for logical_table_ref in table_names {
            idents.push(CacheIdent::TableName((*logical_table_ref).into()));
        }

        idents
    }
}

View File

@@ -21,7 +21,7 @@ use datatypes::schema::ColumnSchema;
use snafu::{ensure, OptionExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableMeta;
use table::metadata::{RawTableInfo, RawTableMeta};
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -433,6 +433,62 @@ pub(crate) async fn validate_table_id_and_name(
Ok(())
}
/// Checks whether the column metadata invariants hold for the logical table.
///
/// Invariants:
/// - Every primary key column of `table_info` must exist as a `Tag` column in
///   `column_metadatas`.
/// - The timestamp column must keep the same name. Only the name is compared; the column id
///   is not checked here.
///
/// Returns `true` when both invariants hold. Two missing timestamp columns compare as equal.
///
/// # Panics
///
/// Panics if a primary key index of `table_info` is out of range of its column schemas.
///
/// TODO(weny): add tests
pub(crate) fn check_column_metadatas_invariants_for_logical_table(
    column_metadatas: &[ColumnMetadata],
    table_info: &RawTableInfo,
) -> bool {
    let new_primary_keys = column_metadatas
        .iter()
        .filter(|c| c.semantic_type == SemanticType::Tag)
        .map(|c| c.column_schema.name.as_str())
        .collect::<HashSet<_>>();

    let column_schemas = &table_info.meta.schema.column_schemas;
    let primary_keys_kept = table_info
        .meta
        .primary_key_indices
        .iter()
        .all(|i| new_primary_keys.contains(column_schemas[*i].name.as_str()));
    if !primary_keys_kept {
        return false;
    }

    let old_timestamp_column_name = column_schemas
        .iter()
        .find(|c| c.is_time_index())
        .map(|c| c.name.as_str());
    let new_timestamp_column_name = column_metadatas
        .iter()
        .find(|c| c.semantic_type == SemanticType::Timestamp)
        .map(|c| c.column_schema.name.as_str());

    // The invariant holds when the timestamp column name is unchanged.
    old_timestamp_column_name == new_timestamp_column_name
}
/// Returns true if the logical table info needs to be updated.
///
/// The logical table only supports adding columns, so a difference between the number of
/// column schemas in the table info and the number of column metadatas is enough to tell
/// that the table info is out of date.
pub(crate) fn need_update_logical_table_info(
    table_info: &RawTableInfo,
    column_metadatas: &[ColumnMetadata],
) -> bool {
    let known_columns = table_info.meta.schema.column_schemas.len();
    let resolved_columns = column_metadatas.len();
    known_columns != resolved_columns
}
#[derive(Clone)]
pub struct Context {
pub node_manager: NodeManagerRef,

View File

@@ -165,7 +165,7 @@ const MAX_REQUEST_SIZE: usize = 1024 * 1024;
/// Returns true if the key is an internal key.
///
/// A key is internal when it starts with the election key or with the candidates root, so
/// keys nested under either prefix are matched as well.
fn is_internal_key(kv: &FileKeyValue) -> bool {
    kv.key.starts_with(ELECTION_KEY.as_bytes()) || kv.key.starts_with(CANDIDATES_ROOT.as_bytes())
}
impl MetadataSnapshotManager {

View File

@@ -57,6 +57,15 @@ impl TryFrom<ColumnDefaultConstraint> for Vec<u8> {
}
}
impl TryFrom<&ColumnDefaultConstraint> for Vec<u8> {
type Error = error::Error;
fn try_from(value: &ColumnDefaultConstraint) -> std::result::Result<Self, Self::Error> {
let s = serde_json::to_string(value).context(error::SerializeSnafu)?;
Ok(s.into_bytes())
}
}
impl Display for ColumnDefaultConstraint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {