From b0c093508a9d8d9f99fb6722eb701dc9f72af1a2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 22 Apr 2026 11:05:01 +0800 Subject: [PATCH] fix: upgrade mysql metadata value limit to mediumblob (#7985) * fix: upgrade mysql metadata values to mediumblob * fix: fail mysql metadata startup on upgrade check errors Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/kv_backend/rds/mysql.rs | 278 +++++++++++++++++++- 1 file changed, 272 insertions(+), 6 deletions(-) diff --git a/src/common/meta/src/kv_backend/rds/mysql.rs b/src/common/meta/src/kv_backend/rds/mysql.rs index ee776239bf..ea8bab4704 100644 --- a/src/common/meta/src/kv_backend/rds/mysql.rs +++ b/src/common/meta/src/kv_backend/rds/mysql.rs @@ -15,14 +15,18 @@ use std::marker::PhantomData; use std::sync::Arc; -use common_telemetry::debug; -use snafu::ResultExt; +use common_telemetry::{debug, info, warn}; +use lazy_static::lazy_static; +use regex::Regex; +use snafu::{OptionExt, ResultExt}; use sqlx::mysql::MySqlRow; use sqlx::pool::Pool; use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction}; use strum::AsRefStr; -use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result}; +use crate::error::{ + CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result, UnexpectedSnafu, +}; use crate::kv_backend::KvBackendRef; use crate::kv_backend::rds::{ Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE, @@ -37,6 +41,18 @@ use crate::rpc::store::{ const MYSQL_STORE_NAME: &str = "mysql_store"; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ValueBlobType { + Blob, + MediumBlob, + LongBlob, +} + +lazy_static! { + static ref VALUE_COLUMN_BLOB_TYPE_RE: Regex = + Regex::new(r#"(?i)(?:\(|,)\s*[`"]?v[`"]?\s+(longblob|mediumblob|blob)\b"#).unwrap(); +} + type MySqlClient = Arc>; pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>); @@ -161,7 +177,11 @@ impl<'a> MySqlTemplateFactory<'a> { table_name: table_name.to_string(), create_table_statement: format!( // Cannot be more than 3072 bytes in PRIMARY KEY - "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);", + "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v MEDIUMBLOB);", + ), + show_create_table_statement: format!("SHOW CREATE TABLE `{table_name}`"), + alter_value_column_statement: format!( + "ALTER TABLE `{table_name}` MODIFY COLUMN v MEDIUMBLOB;" ), range_template: RangeTemplate { point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"), @@ -186,6 +206,8 @@ impl<'a> MySqlTemplateFactory<'a> { pub struct MySqlTemplateSet { table_name: String, create_table_statement: String, + show_create_table_statement: String, + alter_value_column_statement: String, range_template: RangeTemplate, delete_template: RangeTemplate, } @@ -534,6 +556,68 @@ impl KvQueryExecutor for MySqlStore { } impl MySqlStore { + /// Reads the current table definition for best-effort schema upgrades. + async fn fetch_create_table_sql( + pool: &Pool, + sql_template_set: &MySqlTemplateSet, + ) -> Result> { + let row = sqlx::query(&sql_template_set.show_create_table_statement) + .fetch_optional(pool) + .await + .with_context(|_| MySqlExecutionSnafu { + sql: sql_template_set.show_create_table_statement.clone(), + })?; + Ok(row.map(|row| row.get(1))) + } + + /// Parses the blob type of the `v` column from `SHOW CREATE TABLE` output. + fn parse_value_column_blob_type(create_table_sql: &str) -> Option { + // `SHOW CREATE TABLE` returns MySQL-specific DDL. A minimal parser keeps the + // upgrade check small and avoids introducing a SQL parser just for one column. + let captures = VALUE_COLUMN_BLOB_TYPE_RE.captures(create_table_sql)?; + match captures.get(1)?.as_str().to_ascii_lowercase().as_str() { + "blob" => Some(ValueBlobType::Blob), + "mediumblob" => Some(ValueBlobType::MediumBlob), + "longblob" => Some(ValueBlobType::LongBlob), + _ => None, + } + } + + /// Upgrades the metadata value column to `MEDIUMBLOB` when an old table still uses `BLOB`. + async fn maybe_upgrade_value_column_to_mediumblob( + pool: &Pool, + sql_template_set: &MySqlTemplateSet, + ) -> Result<()> { + let table_name = &sql_template_set.table_name; + let create_table_sql = Self::fetch_create_table_sql(pool, sql_template_set) + .await? + .context(UnexpectedSnafu { + err_msg: format!("Failed to fetch CREATE TABLE SQL for `{table_name}`"), + })?; + + match Self::parse_value_column_blob_type(&create_table_sql) { + Some(ValueBlobType::Blob) => { + sqlx::query(&sql_template_set.alter_value_column_statement) + .execute(pool) + .await + .with_context(|_| MySqlExecutionSnafu { + sql: sql_template_set.alter_value_column_statement.clone(), + })?; + info!("Upgraded MySQL metadata value column to MEDIUMBLOB for `{table_name}`"); + } + Some(ValueBlobType::MediumBlob | ValueBlobType::LongBlob) => { + debug!("MySQL metadata value column for `{table_name}` is already compatible"); + } + None => { + warn!( + "Failed to determine MySQL metadata value column type from table definition for `{table_name}`, skip automatic MEDIUMBLOB upgrade" + ); + } + } + + Ok(()) + } + /// Create [MySqlStore] impl of [KvBackendRef] from url. pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result { let pool = MySqlPool::connect(url) @@ -558,6 +642,7 @@ impl MySqlStore { .with_context(|_| MySqlExecutionSnafu { sql: sql_template_set.create_table_statement.clone(), })?; + Self::maybe_upgrade_value_column_to_mediumblob(&pool, &sql_template_set).await?; Ok(Arc::new(MySqlStore { max_txn_ops, sql_template_set, @@ -574,6 +659,7 @@ impl MySqlStore { mod tests { use common_telemetry::init_default_ut_logging; use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode}; + use uuid::Uuid; use super::*; use crate::kv_backend::test::{ @@ -585,15 +671,45 @@ mod tests { text_txn_multi_compare_op, unprepare_kv, }; use crate::maybe_skip_mysql_integration_test; + use crate::rpc::store::{PutRequest, RangeRequest}; use crate::test_util::test_certs_dir; - async fn build_mysql_kv_backend(table_name: &str) -> Option { + fn new_test_table_name(prefix: &str) -> String { + let uuid = Uuid::new_v4().simple().to_string(); + let max_prefix_len = 63usize.saturating_sub(uuid.len() + 1); + let prefix = &prefix[..prefix.len().min(max_prefix_len)]; + format!("{prefix}_{uuid}") + } + + async fn mysql_pool() -> Option { init_default_ut_logging(); let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default(); if endpoints.is_empty() { return None; } - let pool = MySqlPool::connect(&endpoints).await.unwrap(); + Some(MySqlPool::connect(&endpoints).await.unwrap()) + } + + async fn show_create_table(pool: &MySqlPool, table_name: &str) -> String { + let sql = format!("SHOW CREATE TABLE `{table_name}`"); + let row = sqlx::query(&sql).fetch_one(pool).await.unwrap(); + row.get::(1) + } + + async fn create_legacy_blob_table(pool: &MySqlPool, table_name: &str) { + let sql = format!( + "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);" + ); + sqlx::query(&sql).execute(pool).await.unwrap(); + } + + async fn drop_table(pool: &MySqlPool, table_name: &str) { + let sql = format!("DROP TABLE IF EXISTS `{table_name}`;"); + sqlx::query(&sql).execute(pool).await.unwrap(); + } + + async fn build_mysql_kv_backend(table_name: &str) -> Option { + let pool = mysql_pool().await?; let sql_templates = MySqlTemplateFactory::new(table_name).build(); sqlx::query(&sql_templates.create_table_statement) .execute(&pool) @@ -610,6 +726,156 @@ mod tests { }) } + #[test] + fn test_parse_value_column_blob_type() { + let sql = r#"CREATE TABLE `greptime_metakv` ( + `k` varbinary(3072) NOT NULL, + `v` MEDIUMBLOB, + PRIMARY KEY (`k`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"#; + assert_eq!( + Some(ValueBlobType::MediumBlob), + MySqlStore::parse_value_column_blob_type(sql) + ); + + let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` blob, PRIMARY KEY (`k`))"#; + assert_eq!( + Some(ValueBlobType::Blob), + MySqlStore::parse_value_column_blob_type(sql) + ); + + let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` longblob, PRIMARY KEY (`k`))"#; + assert_eq!( + Some(ValueBlobType::LongBlob), + MySqlStore::parse_value_column_blob_type(sql) + ); + + let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` BLOB NOT NULL, PRIMARY KEY (`k`))"; + assert_eq!( + Some(ValueBlobType::Blob), + MySqlStore::parse_value_column_blob_type(sql) + ); + + let sql = "CREATE TABLE `greptime_metakv` (\n `k` varbinary(3072) NOT NULL,\n \"v\" MediumBlob,\n PRIMARY KEY (`k`)\n)"; + assert_eq!( + Some(ValueBlobType::MediumBlob), + MySqlStore::parse_value_column_blob_type(sql) + ); + + let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, vv blob, `v` longblob, PRIMARY KEY (`k`))"; + assert_eq!( + Some(ValueBlobType::LongBlob), + MySqlStore::parse_value_column_blob_type(sql) + ); + + let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, value blob, PRIMARY KEY (`k`))"; + assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql)); + + let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` varchar(255), PRIMARY KEY (`k`))"; + assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql)); + + let sql = + "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, PRIMARY KEY (`k`))"; + assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql)); + } + + #[tokio::test] + async fn test_mysql_new_metadata_table_uses_mediumblob() { + maybe_skip_mysql_integration_test!(); + let pool = mysql_pool().await.unwrap(); + let table_name = new_test_table_name("test_mysql_mediumblob_schema"); + + MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128) + .await + .unwrap(); + + let create_table = show_create_table(&pool, &table_name).await; + assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB")); + + drop_table(&pool, &table_name).await; + } + + #[tokio::test] + async fn test_mysql_legacy_blob_metadata_table_is_upgraded() { + maybe_skip_mysql_integration_test!(); + let pool = mysql_pool().await.unwrap(); + let table_name = new_test_table_name("test_mysql_legacy_blob_upgrade"); + + create_legacy_blob_table(&pool, &table_name).await; + MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128) + .await + .unwrap(); + + let create_table = show_create_table(&pool, &table_name).await; + assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB")); + + drop_table(&pool, &table_name).await; + } + + #[tokio::test] + async fn test_mysql_metadata_table_stores_large_values() { + maybe_skip_mysql_integration_test!(); + let pool = mysql_pool().await.unwrap(); + let table_name = new_test_table_name("test_mysql_large_metadata_value"); + let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128) + .await + .unwrap(); + let key = b"large-value".to_vec(); + let value = vec![b'x'; 70 * 1024]; + + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + let response = kv_backend + .range(RangeRequest::new().with_key(key.clone())) + .await + .unwrap(); + + assert_eq!(1, response.kvs.len()); + assert_eq!(key, response.kvs[0].key); + assert_eq!(value, response.kvs[0].value); + + drop_table(&pool, &table_name).await; + } + + #[tokio::test] + async fn test_mysql_upgraded_metadata_table_stores_large_values() { + maybe_skip_mysql_integration_test!(); + let pool = mysql_pool().await.unwrap(); + let table_name = new_test_table_name("test_mysql_upgraded_large_metadata_value"); + + create_legacy_blob_table(&pool, &table_name).await; + let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128) + .await + .unwrap(); + let key = b"large-value".to_vec(); + let value = vec![b'y'; 70 * 1024]; + + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + let response = kv_backend + .range(RangeRequest::new().with_key(key.clone())) + .await + .unwrap(); + + assert_eq!(1, response.kvs.len()); + assert_eq!(key, response.kvs[0].key); + assert_eq!(value, response.kvs[0].value); + + drop_table(&pool, &table_name).await; + } + #[tokio::test] async fn test_mysql_put() { maybe_skip_mysql_integration_test!();