feat: update physical table's schema on creating logical table (#3570)

* feat: update physical table's schema on creating logical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove debug code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak ut const

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* invalidate physical table cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2024-03-25 11:19:30 +08:00
Committed by: GitHub
Parent: 2ad0b24efa
Commit: 992c7ec71b
9 changed files with 179 additions and 22 deletions


@@ -12,30 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use api::v1::{CreateTableExpr, SemanticType};
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use futures_util::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use crate::cache_invalidator::Context;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::error::{
DecodeJsonSnafu, MetadataCorruptionSnafu, Result, TableAlreadyExistsSnafu,
TableInfoNotFoundSnafu,
};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
@@ -169,11 +179,12 @@ impl CreateLogicalTablesProcedure {
self.create_regions(region_routes).await
}
/// Creates table metadata
/// Creates table metadata for logical tables and updates the corresponding
/// physical table's metadata.
///
/// Abort(not-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(&self) -> Result<Status> {
pub async fn on_create_metadata(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
let physical_table_id = self.creator.data.physical_table_id();
let remaining_tasks = self.creator.data.remaining_tasks();
@@ -208,6 +219,42 @@ impl CreateLogicalTablesProcedure {
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
if !self.creator.data.physical_columns.is_empty() {
// fetch old physical table's info
let physical_table_info = self
.context
.table_metadata_manager
.get_full_table_info(self.creator.data.physical_table_id)
.await?
.0
.context(TableInfoNotFoundSnafu {
table_name: format!("table id - {}", self.creator.data.physical_table_id),
})?;
// generate new table info
let new_table_info = self
.creator
.data
.build_new_physical_table_info(&physical_table_info);
// update physical table's metadata
self.context
.table_metadata_manager
.update_table_info(physical_table_info, new_table_info)
.await?;
// invalidate table cache
self.context
.cache_invalidator
.invalidate(
&Context::default(),
vec![CacheIdent::TableId(self.creator.data.physical_table_id)],
)
.await?;
} else {
warn!("No physical columns found, leaving the physical table's schema unchanged");
}
info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");
Ok(Status::done_with_output(table_ids))
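The ordering in this hunk matters: the cache entry is invalidated only after the new table info is persisted, so a concurrent reader cannot re-populate the cache with the stale schema. A minimal sketch of that read-modify-write-invalidate sequence, with hypothetical closures standing in for `table_metadata_manager` and `cache_invalidator`:

```rust
/// Minimal, hypothetical sketch of the sequence above -- not the crate's
/// real API, which goes through `table_metadata_manager` and
/// `cache_invalidator`.
fn update_physical_schema(
    fetch: impl FnOnce() -> Option<Vec<String>>,
    store: impl FnOnce(Vec<String>),
    invalidate_cache: impl FnOnce(),
    new_columns: Vec<String>,
) -> Result<(), &'static str> {
    // 1. Fetch the old physical table's info; abort (not retried) if missing.
    //    The real code keeps the deserialized value together with its raw
    //    bytes so the subsequent update can be checked against what was read.
    let _old_info = fetch().ok_or("table info not found")?;
    // 2. Persist the new table info built from the datanode-reported columns.
    store(new_columns);
    // 3. Invalidate the cached entry only after the write succeeds, so a
    //    concurrent reader either sees the old schema before the write or
    //    misses the cache and reloads the new one after it.
    invalidate_cache();
    Ok(())
}
```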
@@ -275,11 +322,39 @@ impl CreateLogicalTablesProcedure {
});
}
join_all(create_region_tasks)
// collect responses from datanodes
let raw_schemas = join_all(create_region_tasks)
.await
.into_iter()
.map(|response| {
response.map(|mut response| response.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))
})
.collect::<Result<Vec<_>>>()?;
if raw_schemas.is_empty() {
self.creator.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(false));
}
// verify all datanodes return the same raw schemas
// Safety: previous check ensures this vector is not empty.
let first = raw_schemas.first().unwrap();
ensure!(
raw_schemas.iter().all(|x| x == first),
MetadataCorruptionSnafu {
err_msg: "Raw schemas from datanodes are not the same"
}
);
// decode the raw schema and store it
if let Some(raw_schema) = first {
let physical_columns =
ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?;
self.creator.data.physical_columns = physical_columns;
} else {
warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}
self.creator.data.state = CreateTablesState::CreateMetadata;
// Ensures the procedures after the crash start from the `DatanodeCreateRegions` stage.
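The all-equal guard above is self-contained enough to isolate; a standalone sketch of the same pattern, with a plain string error instead of the crate's `MetadataCorruptionSnafu`:

```rust
/// Returns the first element if all elements agree, mirroring the check that
/// every datanode reported the same raw schema. Hypothetical sketch; the real
/// code raises `MetadataCorruptionSnafu` instead of a string error.
fn ensure_all_equal<T: PartialEq>(items: &[T]) -> Result<&T, &'static str> {
    let first = items.first().ok_or("no responses to compare")?;
    if items.iter().all(|x| x == first) {
        Ok(first)
    } else {
        Err("raw schemas from datanodes are not the same")
    }
}

#[test]
fn detects_disagreement() {
    assert!(ensure_all_equal(&[Some(1), Some(1)]).is_ok());
    assert!(ensure_all_equal(&[Some(1), None]).is_err());
}
```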
@@ -357,6 +432,7 @@ impl TablesCreator {
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
physical_columns: vec![],
},
}
}
@@ -370,6 +446,7 @@ pub struct CreateTablesData {
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
physical_columns: Vec<ColumnMetadata>,
}
impl CreateTablesData {
@@ -420,6 +497,47 @@ impl CreateTablesData {
})
.collect::<Vec<_>>()
}
/// Generates the new physical table info.
///
/// This method consumes the physical columns.
fn build_new_physical_table_info(
&mut self,
old_table_info: &DeserializedValueWithBytes<TableInfoValue>,
) -> RawTableInfo {
let mut raw_table_info = old_table_info.deref().table_info.clone();
let existing_primary_key = raw_table_info
.meta
.schema
.column_schemas
.iter()
.map(|col| col.name.clone())
.collect::<HashSet<_>>();
let primary_key_indices = &mut raw_table_info.meta.primary_key_indices;
let value_indices = &mut raw_table_info.meta.value_indices;
value_indices.clear();
let time_index = &mut raw_table_info.meta.schema.timestamp_index;
let columns = &mut raw_table_info.meta.schema.column_schemas;
columns.clear();
for (idx, col) in self.physical_columns.drain(..).enumerate() {
match col.semantic_type {
SemanticType::Tag => {
// push new primary key columns to the end.
if !existing_primary_key.contains(&col.column_schema.name) {
primary_key_indices.push(idx);
}
}
SemanticType::Field => value_indices.push(idx),
SemanticType::Timestamp => *time_index = Some(idx),
}
columns.push(col.column_schema);
}
raw_table_info
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
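To trace the index bookkeeping in `build_new_physical_table_info`, here is a stripped-down version of the loop (hypothetical `Semantic` enum; unlike the real code it omits the dedup against the old schema's primary key columns):

```rust
#[derive(Clone, Copy)]
enum Semantic { Tag, Field, Timestamp }

/// Partitions column positions the way the loop above does: tags feed the
/// primary key indices, fields feed the value indices, and the timestamp
/// column sets the time index. Simplified sketch only.
fn rebuild_indices(cols: &[(&str, Semantic)]) -> (Vec<usize>, Vec<usize>, Option<usize>) {
    let (mut pk, mut values, mut time_index) = (Vec::new(), Vec::new(), None);
    for (idx, (_name, semantic)) in cols.iter().enumerate() {
        match semantic {
            Semantic::Tag => pk.push(idx),
            Semantic::Field => values.push(idx),
            Semantic::Timestamp => time_index = Some(idx),
        }
    }
    (pk, values, time_index)
}

fn main() {
    let cols = [
        ("ts", Semantic::Timestamp),
        ("val", Semantic::Field),
        ("host", Semantic::Tag),
        ("job", Semantic::Tag),
    ];
    // primary key: [2, 3], values: [1], time index: Some(0)
    println!("{:?}", rebuild_indices(&cols));
}
```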


@@ -270,8 +270,10 @@ impl RegionServerHandler for RegionServer {
// merge results by summing up affected rows and merging extensions.
let mut affected_rows = 0;
let mut extension = HashMap::new();
for result in results {
affected_rows += result.affected_rows;
extension.extend(result.extension);
}
Ok(RegionResponse {
@@ -282,7 +284,7 @@ impl RegionServerHandler for RegionServer {
}),
}),
affected_rows: affected_rows as _,
extension: Default::default(),
extension,
})
}
}
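Note that `HashMap::extend` is last-writer-wins on key collisions, which is benign here because the create-regions procedure separately verifies that all datanodes reported the same schema. A small demonstration with made-up keys and values:

```rust
use std::collections::HashMap;

fn main() {
    // Two per-region results, each carrying an extension map, merged the way
    // the handler above does. Keys and values are made up for illustration.
    let results: Vec<(u64, HashMap<String, Vec<u8>>)> = vec![
        (1, HashMap::from([("schema".to_string(), vec![0xAA])])),
        (2, HashMap::from([("schema".to_string(), vec![0xAA])])),
    ];

    let mut affected_rows = 0;
    let mut extension = HashMap::new();
    for (rows, ext) in results {
        affected_rows += rows;
        extension.extend(ext); // later entries overwrite earlier ones per key
    }
    assert_eq!(affected_rows, 3);
    assert_eq!(extension["schema"], vec![0xAA]);
}
```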


@@ -252,7 +252,7 @@ async fn test_on_datanode_create_logical_regions() {
let region_routes = test_data::new_region_routes();
let datanode_manager = new_datanode_manager(&region_server, &region_routes).await;
let physical_table_route = TableRouteValue::physical(region_routes);
let physical_table_id = 111;
let physical_table_id = 1;
let task1 = create_table_task(Some("my_table1"));
let task2 = create_table_task(Some("my_table2"));


@@ -148,7 +148,6 @@ impl RegionEngine for MetricEngine {
RegionRequest::Catchup(_) => Ok(0),
};
// TODO: pass extension
result
.map_err(BoxedError::new)
.map(|rows| RegionHandleResult {


@@ -19,19 +19,28 @@ CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) en
Affected Rows: 0
SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name;
SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name;
Error: 3000(PlanQuery), Failed to plan SQL: No field named metric. Valid fields are information_schema.tables.table_catalog, information_schema.tables.table_schema, information_schema.tables.table_name, information_schema.tables.table_type, information_schema.tables.table_id, information_schema.tables.engine.
+---------------+--------------+------------+------------+--------+
| table_catalog | table_schema | table_name | table_type | engine |
+---------------+--------------+------------+------------+--------+
| greptime | public | phy | BASE TABLE | metric |
| greptime | public | t1 | BASE TABLE | metric |
| greptime | public | t2 | BASE TABLE | metric |
+---------------+--------------+------------+------------+--------+
-- We currently don't maintain physical table's schema.
DESC TABLE phy;
+--------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| val | Float64 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
+------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | | NO | | FIELD |
| val | Float64 | | YES | | FIELD |
| __table_id | UInt32 | PRI | NO | | TAG |
| __tsid | UInt64 | PRI | NO | | TAG |
| host | String | PRI | YES | | TAG |
| job | String | PRI | YES | | TAG |
+------------+----------------------+-----+------+---------+---------------+
DESC TABLE t1;


@@ -6,9 +6,8 @@ CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) e
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name;
SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name;
-- We currently don't maintain physical table's schema.
DESC TABLE phy;
DESC TABLE t1;


@@ -49,6 +49,30 @@ DROP TABLE t2;
Affected Rows: 0
DESC TABLE phy;
+------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | | NO | | FIELD |
| val | Float64 | | YES | | FIELD |
| __table_id | UInt32 | PRI | NO | | TAG |
| __tsid | UInt64 | PRI | NO | | TAG |
| host | String | PRI | YES | | TAG |
| job | String | PRI | YES | | TAG |
+------------+----------------------+-----+------+---------+---------------+
SELECT ts, val, __tsid, host, job FROM phy;
+-------------------------+-----+----------------------+-------+------+
| ts | val | __tsid | host | job |
+-------------------------+-----+----------------------+-------+------+
| 1970-01-01T00:00:00.001 | 1.0 | 1128149335081630826 | host2 | |
| 1970-01-01T00:00:00 | 0.0 | 18067404594631612786 | host1 | |
| 1970-01-01T00:00:00.001 | 1.0 | 2176048834144407834 | | job2 |
| 1970-01-01T00:00:00 | 0.0 | 15980333303142110493 | | job1 |
+-------------------------+-----+----------------------+-------+------+
DROP TABLE phy;
Affected Rows: 0


@@ -18,4 +18,8 @@ DROP TABLE t1;
DROP TABLE t2;
DESC TABLE phy;
SELECT ts, val, __tsid, host, job FROM phy;
DROP TABLE phy;


@@ -199,6 +199,8 @@ impl Env {
};
let log_file_name = self.data_home.join(log_file_name).display().to_string();
println!("{subcommand} log file at {log_file_name}");
let log_file = OpenOptions::new()
.create(true)
.write(true)