feat: cli tool to repair partition column mismatches

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-01-22 15:54:40 +08:00
parent 38e4a94956
commit 622e39267d
8 changed files with 329 additions and 16 deletions

1
Cargo.lock generated
View File

@@ -2016,6 +2016,7 @@ dependencies = [
"nu-ansi-term",
"object-store",
"operator",
"partition",
"paste",
"query",
"rand 0.9.1",

View File

@@ -52,6 +52,7 @@ nu-ansi-term = "0.46"
object-store.workspace = true
operator.workspace = true
paste.workspace = true
partition.workspace = true
query.workspace = true
rand.workspace = true
reqwest.workspace = true

View File

@@ -22,7 +22,7 @@ use common_error::ext::BoxedError;
use crate::Tool;
use crate::metadata::control::{DelCommand, GetCommand};
use crate::metadata::repair::RepairLogicalTablesCommand;
use crate::metadata::repair::RepairCommand;
use crate::metadata::snapshot::SnapshotCommand;
/// Command for managing metadata operations,
@@ -36,14 +36,15 @@ pub enum MetadataCommand {
Get(GetCommand),
#[clap(subcommand)]
Del(DelCommand),
RepairLogicalTables(RepairLogicalTablesCommand),
#[clap(subcommand)]
Repair(RepairCommand),
}
impl MetadataCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetadataCommand::Snapshot(cmd) => cmd.build().await,
MetadataCommand::RepairLogicalTables(cmd) => cmd.build().await,
MetadataCommand::Repair(cmd) => cmd.build().await,
MetadataCommand::Get(cmd) => cmd.build().await,
MetadataCommand::Del(cmd) => cmd.build().await,
}

View File

@@ -14,12 +14,13 @@
mod alter_table;
mod create_table;
mod partition_column;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use clap::{Parser, Subcommand};
use client::api::v1::CreateTableExpr;
use client::client_manager::NodeClients;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -34,6 +35,7 @@ use common_meta::peer::Peer;
use common_meta::rpc::router::{RegionRoute, find_leaders};
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use partition_column::RepairPartitionColumnCommand;
use snafu::{ResultExt, ensure};
use store_api::storage::TableId;
@@ -44,6 +46,21 @@ use crate::error::{
};
use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
// Repair subcommands. NOTE: `///` doc comments on `Subcommand` variants become
// clap help text, so plain `//` comments are used here to avoid changing the
// CLI output.
#[derive(Subcommand)]
pub enum RepairCommand {
    // Repair metadata of logical tables.
    LogicalTables(RepairLogicalTablesCommand),
    // Repair `partition_key_indices` that disagree with the partition exprs.
    PartitionColumn(RepairPartitionColumnCommand),
}
impl RepairCommand {
pub(super) async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
Self::LogicalTables(x) => x.build().await,
Self::PartitionColumn(x) => x.build().await.map(|x| Box::new(x) as _),
}
}
}
/// Repair metadata of logical tables.
#[derive(Debug, Default, Parser)]
pub struct RepairLogicalTablesCommand {

View File

@@ -0,0 +1,298 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::store::{PutRequest, RangeRequest};
use common_telemetry::{info, warn};
use futures::StreamExt;
use partition::expr::PartitionExpr;
use store_api::storage::TableId;
use table::metadata::TableType;
use crate::{StoreConfig, Tool};
// Command that repairs mismatches between a table's `partition_key_indices`
// (in the table info) and the columns referenced by its region partition
// exprs (in the table route).
// NOTE: the `///` comments on fields double as clap help text and are left
// byte-identical; new notes use plain `//` comments.
#[derive(Parser)]
pub struct RepairPartitionColumnCommand {
    // Connection options for the underlying metadata store.
    #[clap(flatten)]
    store_config: StoreConfig,
    /// Whether to actually do the update in the underlying metadata store, or not.
    #[clap(long)]
    dry_run: bool,
    /// The maximum count of update times.
    #[clap(long)]
    update_limit: Option<u32>,
}
impl RepairPartitionColumnCommand {
    /// Builds the repair tool backed by the configured metadata store.
    pub(super) async fn build(&self) -> Result<RepairPartitionColumnTool, BoxedError> {
        let backend = self.store_config.build().await?;
        let tool = RepairPartitionColumnTool {
            kv_backend: backend,
            dry_run: self.dry_run,
            update_limit: self.update_limit,
        };
        Ok(tool)
    }
}
/// Tool that scans table metadata and rewrites `partition_key_indices` in the
/// table info when it disagrees with the columns referenced by the table's
/// partition exprs.
pub(crate) struct RepairPartitionColumnTool {
    /// Backend the metadata is read from and written back to.
    kv_backend: KvBackendRef,
    /// When set, mismatches are only reported, nothing is written back.
    dry_run: bool,
    /// Maximum number of update attempts before stopping, if set.
    update_limit: Option<u32>,
}
impl RepairPartitionColumnTool {
    /// Checks every base table and repairs `partition_key_indices` in its
    /// table info when it disagrees with the columns referenced by the
    /// partition exprs stored in the table route.
    ///
    /// Returns early (with `Ok`) once `update_limit` update attempts have been
    /// made, when a limit is configured. Dry-run updates count towards the
    /// limit as well (the limit check runs before each update).
    async fn do_repair(
        &self,
        table_infos: HashMap<TableId, TableInfoValue>,
        table_routes: HashMap<TableId, TableRouteValue>,
    ) -> Result<(), BoxedError> {
        let mut update_count = 0;
        for (table_id, table_info_value) in &table_infos {
            let table_meta = &table_info_value.table_info.meta;

            // Resolve the partition columns currently recorded in the table info.
            let mut partition_columns = Vec::with_capacity(table_meta.partition_key_indices.len());
            for i in &table_meta.partition_key_indices {
                if let Some(x) = table_meta.schema.column_schemas.get(*i) {
                    partition_columns.push(&x.name);
                } else {
                    warn!("Partition column not found by index: {i}");
                }
            }

            // Only physical table routes carry region partition exprs.
            let Some(TableRouteValue::Physical(table_route)) = table_routes.get(table_id) else {
                continue;
            };

            // Gather the set of columns actually referenced by partition exprs.
            // Regions whose expr is absent or unparsable are skipped.
            let mut partition_expr_columns = HashSet::new();
            for region_route in &table_route.region_routes {
                let Ok(Some(partition_expr)) =
                    PartitionExpr::from_json_str(&region_route.region.partition_expr())
                else {
                    continue;
                };
                partition_expr.collect_column_names(&mut partition_expr_columns);
            }

            // Compare as sorted lists so ordering differences don't matter.
            let mut partition_expr_columns = partition_expr_columns.iter().collect::<Vec<_>>();
            partition_expr_columns.sort();
            partition_columns.sort();

            if partition_expr_columns != partition_columns {
                warn!(
                    "Columns in partition exprs: {:?} do not match partition columns: {:?} in table {}",
                    partition_expr_columns,
                    partition_columns,
                    table_info_value.table_name(),
                );

                if let Some(update_limit) = self.update_limit
                    && update_count >= update_limit
                {
                    warn!("Reached update limit: {update_limit}, return");
                    return Ok(());
                }
                self.update_partition_columns(partition_expr_columns, table_info_value)
                    .await?;
                update_count += 1;
            }
        }
        Ok(())
    }

    /// Rewrites `partition_key_indices` in the table info to point at the
    /// given partition expr columns, then persists the updated value (unless
    /// `dry_run` is set).
    ///
    /// NOTE(review): indices are emitted in the (sorted) order of
    /// `partition_expr_columns` handed in by `do_repair`, not in the table's
    /// original declaration order — presumably acceptable for repair; confirm.
    async fn update_partition_columns(
        &self,
        partition_expr_columns: Vec<&String>,
        table_info_value: &TableInfoValue,
    ) -> Result<(), BoxedError> {
        let column_schemas = &table_info_value.table_info.meta.schema.column_schemas;

        // Map each expr column back to its index in the schema. A column that
        // cannot be resolved is skipped, but loudly: the previous
        // flat_map/find_map version dropped it silently, which could persist a
        // shorter `partition_key_indices` while hiding deeper metadata
        // corruption.
        let mut partition_column_indices = Vec::with_capacity(partition_expr_columns.len());
        for column in &partition_expr_columns {
            match column_schemas.iter().position(|x| &x.name == *column) {
                Some(i) => partition_column_indices.push(i),
                None => warn!(
                    "Partition expr column '{column}' not found in the schema of table '{}', skipping it",
                    table_info_value.table_name(),
                ),
            }
        }

        info!(
            "Updating partition columns to {:?} (by column indices: {:?}) in table '{}'",
            partition_expr_columns,
            partition_column_indices,
            table_info_value.table_name(),
        );
        if self.dry_run {
            info!("Dry run enabled, do nothing");
            return Ok(());
        }

        // Bump the table info with the corrected indices and write it back.
        let mut new_table_info = table_info_value.table_info.clone();
        new_table_info.meta.partition_key_indices = partition_column_indices;
        let table_info = table_info_value.update(new_table_info);

        let request = PutRequest::new()
            .with_key(TableInfoKey::new(table_info.table_info.ident.table_id).to_bytes())
            .with_value(table_info.try_as_raw_value().map_err(BoxedError::new)?);
        let _ = self
            .kv_backend
            .put(request)
            .await
            .map_err(BoxedError::new)?;
        Ok(())
    }
}
#[async_trait]
impl Tool for RepairPartitionColumnTool {
    /// Scans the whole metadata key range, collecting base-table infos and
    /// physical table routes, then hands them to the repair routine.
    async fn do_work(&self) -> Result<(), BoxedError> {
        // Range [0x00, 0x00) — presumably the full keyspace; same pattern is
        // used by the snapshot manager.
        let stream = PaginationStream::new(
            self.kv_backend.clone(),
            RangeRequest::new().with_range(vec![0], vec![0]),
            DEFAULT_PAGE_SIZE,
            Ok,
        )
        .into_stream();
        let mut stream = Box::pin(stream);

        let mut table_infos = HashMap::new();
        let mut table_routes = HashMap::new();
        while let Some(item) = stream.next().await {
            // Stream errors are logged and the scan keeps going.
            let kv = match item {
                Ok(kv) => kv,
                Err(e) => {
                    warn!(e; "Failed to get next key");
                    continue;
                }
            };
            if let Ok(key) = TableInfoKey::from_bytes(kv.key()) {
                match TableInfoValue::try_from_raw_value(&kv.value) {
                    // Only base (physical) tables are candidates for repair.
                    Ok(value) => {
                        if value.table_info.table_type == TableType::Base {
                            table_infos.insert(value.table_info.ident.table_id, value);
                        }
                    }
                    Err(_) => warn!("Skip corrupted key: {key}"),
                }
            } else if let Ok(key) = TableRouteKey::from_bytes(kv.key()) {
                match TableRouteValue::try_from_raw_value(&kv.value) {
                    // Only physical routes carry partition exprs.
                    Ok(value) => {
                        if value.is_physical() {
                            table_routes.insert(key.table_id, value);
                        }
                    }
                    Err(_) => warn!("Skip corrupted key: {key}"),
                }
            }
        }
        self.do_repair(table_infos, table_routes).await
    }
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use common_meta::kv_backend::KvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use super::*;
#[tokio::test]
async fn test_repair_partition_column() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_info_key = TableInfoKey::new(1282).to_bytes();
let table_info_value = r#"{"table_info":{"ident":{"table_id":1282,"version":2},"name":"foo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"c0","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c1","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c2","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c3","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c4","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c5","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c6","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c7","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c8","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c9","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c10","data_type":{"Timestamp":{"Nanosecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":10,"version":2},"primary_key_indices":[4,7],"value_indices":[0,1,2,3,5,6,8,9,10],"engine":"mito","next_column_id":11,"region_numbers":[0,1,2],"options":{"write_buffer_size":null,"ttl":"14days","skip_wal":false,"extra_options":{"append_mode":"true"}},"created_on":"2025-09-25T01:39:28.702584510Z","partition_key_indices":[3]},"table_type":"Base"},"version":2}"#;
kv_backend
.put(
PutRequest::new()
.with_key(table_info_key.clone())
.with_value(table_info_value),
)
.await
.unwrap();
let table_route_key = TableRouteKey::new(1282).to_bytes();
let table_route_value = r#"{"type":"physical","region_routes":[{"region":{"id":5506148073472,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"Lt\",\"rhs\":{\"Value\":{\"Int32\":1}}}}"]},"attrs":{}},"leader_peer":{"id":12,"addr":"192.168.1.1:3001"},"follower_peers":[],"leader_down_since":null},{"region":{"id":5506148073473,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":1}}}},\"op\":\"And\",\"rhs\":{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"Lt\",\"rhs\":{\"Value\":{\"Int32\":2}}}}}}"]},"attrs":{}},"leader_peer":{"id":13,"addr":"192.168.1.2:3001"},"follower_peers":[],"leader_down_since":null},{"region":{"id":5506148073474,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":2}}}}"]},"attrs":{}},"leader_peer":{"id":10,"addr":"192.168.1.3:3001"},"follower_peers":[],"leader_down_since":null}],"version":0}"#;
kv_backend
.put(
PutRequest::new()
.with_key(table_route_key)
.with_value(table_route_value),
)
.await
.unwrap();
let tool = RepairPartitionColumnTool {
kv_backend: kv_backend.clone(),
dry_run: true,
update_limit: None,
};
tool.do_work().await.unwrap();
let actual = String::from_utf8(
kv_backend
.get(&table_info_key)
.await
.unwrap()
.unwrap()
.value,
)
.unwrap();
assert_eq!(actual, table_info_value);
let tool = RepairPartitionColumnTool {
kv_backend: kv_backend.clone(),
dry_run: false,
update_limit: Some(0),
};
tool.do_work().await.unwrap();
let actual = String::from_utf8(
kv_backend
.get(&table_info_key)
.await
.unwrap()
.unwrap()
.value,
)
.unwrap();
assert_eq!(actual, table_info_value);
let tool = RepairPartitionColumnTool {
kv_backend: kv_backend.clone(),
dry_run: false,
update_limit: Some(1),
};
tool.do_work().await.unwrap();
let actual = String::from_utf8(
kv_backend
.get(&table_info_key)
.await
.unwrap()
.unwrap()
.value,
)
.unwrap();
let expected = r#"{"table_info":{"ident":{"table_id":1282,"version":2},"name":"foo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"c0","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c1","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c2","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c3","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c4","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c5","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c6","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c7","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c8","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c9","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c10","data_type":{"Timestamp":{"Nanosecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":10,"version":2},"primary_key_indices":[4,7],"value_indices":[0,1,2,3,5,6,8,9,10],"engine":"mito","next_column_id":11,"options":{"write_buffer_size":null,"ttl":"14days","skip_wal":false,"extra_options":{"append_mode":"true"}},"created_on":"2025-09-25T01:39:28.702584510Z","updated_on":"2025-09-25T01:39:28.702584510Z","partition_key_indices":[4],"column_ids":[]},"table_type
":"Base"},"version":3}"#;
assert_eq!(actual, expected);
}
}

View File

@@ -21,6 +21,7 @@ pub mod store;
use std::fmt::{Display, Formatter};
use api::v1::meta::{KeyValue as PbKeyValue, ResponseHeader as PbResponseHeader};
use serde::{Deserialize, Serialize};
/// Thin newtype wrapper around the protobuf response header.
#[derive(Debug, Clone)]
pub struct ResponseHeader(PbResponseHeader);
@@ -48,7 +49,7 @@ impl ResponseHeader {
}
}
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct KeyValue {
pub key: Vec<u8>,
pub value: Vec<u8>,

View File

@@ -35,7 +35,7 @@ use crate::kv_backend::KvBackendRef;
use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use crate::rpc::KeyValue;
use crate::rpc::store::{BatchPutRequest, RangeRequest};
use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
use crate::snapshot::file::Document;
/// The format of the backup file.
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
@@ -164,7 +164,7 @@ pub struct MetadataSnapshotManager {
const MAX_REQUEST_SIZE: usize = 1024 * 1024;
/// Returns true if the key is an internal key.
fn is_internal_key(kv: &FileKeyValue) -> bool {
fn is_internal_key(kv: &KeyValue) -> bool {
    // A key is internal when it starts with either reserved prefix.
    [ELECTION_KEY, CANDIDATES_ROOT]
        .iter()
        .any(|prefix| kv.key.starts_with(prefix.as_bytes()))
}
@@ -197,7 +197,7 @@ impl MetadataSnapshotManager {
let mut total_request_size = 0;
let mut count = 0;
let now = Instant::now();
for FileKeyValue { key, value } in metadata_content.into_iter() {
for KeyValue { key, value } in metadata_content.into_iter() {
count += 1;
let key_size = key.len();
let value_size = value.len();
@@ -277,7 +277,7 @@ impl MetadataSnapshotManager {
let now = Instant::now();
let req = RangeRequest::new().with_range(vec![0], vec![0]);
let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
Ok(FileKeyValue {
Ok(KeyValue {
key: kv.key,
value: kv.value,
})

View File

@@ -20,6 +20,7 @@ use snafu::ResultExt;
use crate::error::{
DeserializeFlexbuffersSnafu, ReadFlexbuffersSnafu, Result, SerializeFlexbuffersSnafu,
};
use crate::rpc::KeyValue;
use crate::snapshot::FileFormat;
/// The layout of the backup file.
@@ -118,13 +119,6 @@ impl MetadataContent {
}
}
/// The key-value pair of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct KeyValue {
pub key: Vec<u8>,
pub value: Vec<u8>,
}
#[cfg(test)]
mod tests {
use super::*;