fix: upgrade mysql metadata value limit to mediumblob (#7985)

* fix: upgrade mysql metadata values to mediumblob

* fix: fail mysql metadata startup on upgrade check errors

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-04-22 11:05:01 +08:00
committed by GitHub
parent 80c395ee23
commit b0c093508a

View File

@@ -15,14 +15,18 @@
use std::marker::PhantomData;
use std::sync::Arc;
use common_telemetry::debug;
use snafu::ResultExt;
use common_telemetry::{debug, info, warn};
use lazy_static::lazy_static;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use sqlx::mysql::MySqlRow;
use sqlx::pool::Pool;
use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
use strum::AsRefStr;
use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result};
use crate::error::{
CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result, UnexpectedSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE,
@@ -37,6 +41,18 @@ use crate::rpc::store::{
const MYSQL_STORE_NAME: &str = "mysql_store";
/// Blob type of the metadata value column `v`, as recognized when parsing
/// `SHOW CREATE TABLE` output.
///
/// Only `Blob` triggers the automatic column upgrade; `MediumBlob` and `LongBlob`
/// are treated as already compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValueBlobType {
/// The column is declared as `BLOB` (the legacy definition).
Blob,
/// The column is declared as `MEDIUMBLOB` (the definition used for new tables).
MediumBlob,
/// The column is declared as `LONGBLOB`.
LongBlob,
}
lazy_static! {
// Matches the definition of the `v` column inside a `CREATE TABLE` statement and
// captures its blob type in group 1:
// - `(?i)` makes the whole pattern case-insensitive;
// - the column must directly follow `(` or `,` (plus optional whitespace), so a
//   column whose name merely ends in `v` is not matched;
// - the name may be wrapped in backticks or double quotes;
// - only `longblob`, `mediumblob` and `blob` are recognized as the type.
static ref VALUE_COLUMN_BLOB_TYPE_RE: Regex =
Regex::new(r#"(?i)(?:\(|,)\s*[`"]?v[`"]?\s+(longblob|mediumblob|blob)\b"#).unwrap();
}
type MySqlClient = Arc<Pool<MySql>>;
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
@@ -161,7 +177,11 @@ impl<'a> MySqlTemplateFactory<'a> {
table_name: table_name.to_string(),
create_table_statement: format!(
// Cannot be more than 3072 bytes in PRIMARY KEY
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v MEDIUMBLOB);",
),
show_create_table_statement: format!("SHOW CREATE TABLE `{table_name}`"),
alter_value_column_statement: format!(
"ALTER TABLE `{table_name}` MODIFY COLUMN v MEDIUMBLOB;"
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
@@ -186,6 +206,8 @@ impl<'a> MySqlTemplateFactory<'a> {
// Set of SQL statements and templates generated for a single metadata table.
pub struct MySqlTemplateSet {
// Name of the table every statement below targets.
table_name: String,
// `CREATE TABLE IF NOT EXISTS` statement for the table.
create_table_statement: String,
// `SHOW CREATE TABLE` statement, used to inspect the existing table definition.
show_create_table_statement: String,
// `ALTER TABLE ... MODIFY COLUMN v MEDIUMBLOB` statement, used to widen the value column.
alter_value_column_statement: String,
// Templates for range (read) queries.
range_template: RangeTemplate,
// Templates for delete queries.
delete_template: RangeTemplate,
}
@@ -534,6 +556,68 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
}
impl MySqlStore {
/// Reads the current table definition for best-effort schema upgrades.
///
/// Runs the template set's `SHOW CREATE TABLE` statement and returns the value of
/// the second result column (index 1) as a string, or `None` when the query yields
/// no row.
///
/// # Errors
///
/// Returns an error built with `MySqlExecutionSnafu` when the query fails or when
/// the second column is missing or cannot be decoded as a `String`.
async fn fetch_create_table_sql(
    pool: &Pool<MySql>,
    sql_template_set: &MySqlTemplateSet,
) -> Result<Option<String>> {
    let sql = &sql_template_set.show_create_table_statement;
    let row = sqlx::query(sql)
        .fetch_optional(pool)
        .await
        .with_context(|_| MySqlExecutionSnafu { sql: sql.clone() })?;

    // `Row::get` panics when the column is absent or has an incompatible type.
    // Use `try_get` so such a failure is reported as an error to the caller
    // instead of aborting the process during startup.
    row.map(|row| row.try_get::<String, _>(1))
        .transpose()
        .with_context(|_| MySqlExecutionSnafu { sql: sql.clone() })
}
/// Extracts the blob type of the `v` column from `SHOW CREATE TABLE` output.
///
/// Returns `None` when the statement has no `v` column declared with one of the
/// recognized blob types.
fn parse_value_column_blob_type(create_table_sql: &str) -> Option<ValueBlobType> {
    // The DDL returned by `SHOW CREATE TABLE` is MySQL-specific; a single regex is
    // enough for this one column and avoids pulling in a full SQL parser.
    let type_name = VALUE_COLUMN_BLOB_TYPE_RE
        .captures(create_table_sql)
        .and_then(|caps| caps.get(1))?
        .as_str();

    // The regex is case-insensitive, so compare without regard to ASCII case.
    if type_name.eq_ignore_ascii_case("blob") {
        Some(ValueBlobType::Blob)
    } else if type_name.eq_ignore_ascii_case("mediumblob") {
        Some(ValueBlobType::MediumBlob)
    } else if type_name.eq_ignore_ascii_case("longblob") {
        Some(ValueBlobType::LongBlob)
    } else {
        None
    }
}
/// Upgrades the metadata value column to `MEDIUMBLOB` when an old table still uses `BLOB`.
///
/// The current definition is read via `SHOW CREATE TABLE` and the type of the `v`
/// column decides what happens:
/// - `BLOB`: the `ALTER TABLE` statement from the template set is executed;
/// - `MEDIUMBLOB` / `LONGBLOB`: nothing is changed;
/// - anything that cannot be recognized: a warning is logged and the upgrade is skipped.
///
/// # Errors
///
/// - Propagates any error from reading the table definition.
/// - Returns an error built with `UnexpectedSnafu` when `SHOW CREATE TABLE` yields no row.
/// - Returns an error built with `MySqlExecutionSnafu` when the `ALTER TABLE` statement fails.
async fn maybe_upgrade_value_column_to_mediumblob(
    pool: &Pool<MySql>,
    sql_template_set: &MySqlTemplateSet,
) -> Result<()> {
    let table_name = &sql_template_set.table_name;
    let create_table_sql = Self::fetch_create_table_sql(pool, sql_template_set)
        .await?
        // Build the error message lazily: it is only needed when no row came back.
        .with_context(|| UnexpectedSnafu {
            err_msg: format!("Failed to fetch CREATE TABLE SQL for `{table_name}`"),
        })?;

    match Self::parse_value_column_blob_type(&create_table_sql) {
        Some(ValueBlobType::Blob) => {
            sqlx::query(&sql_template_set.alter_value_column_statement)
                .execute(pool)
                .await
                .with_context(|_| MySqlExecutionSnafu {
                    sql: sql_template_set.alter_value_column_statement.clone(),
                })?;
            info!("Upgraded MySQL metadata value column to MEDIUMBLOB for `{table_name}`");
        }
        Some(ValueBlobType::MediumBlob | ValueBlobType::LongBlob) => {
            debug!("MySQL metadata value column for `{table_name}` is already compatible");
        }
        None => {
            // An unrecognized definition is not treated as fatal; the table is left untouched.
            warn!(
                "Failed to determine MySQL metadata value column type from table definition for `{table_name}`, skip automatic MEDIUMBLOB upgrade"
            );
        }
    }

    Ok(())
}
/// Create [MySqlStore] impl of [KvBackendRef] from url.
pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
let pool = MySqlPool::connect(url)
@@ -558,6 +642,7 @@ impl MySqlStore {
.with_context(|_| MySqlExecutionSnafu {
sql: sql_template_set.create_table_statement.clone(),
})?;
Self::maybe_upgrade_value_column_to_mediumblob(&pool, &sql_template_set).await?;
Ok(Arc::new(MySqlStore {
max_txn_ops,
sql_template_set,
@@ -574,6 +659,7 @@ impl MySqlStore {
mod tests {
use common_telemetry::init_default_ut_logging;
use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode};
use uuid::Uuid;
use super::*;
use crate::kv_backend::test::{
@@ -585,15 +671,45 @@ mod tests {
text_txn_multi_compare_op, unprepare_kv,
};
use crate::maybe_skip_mysql_integration_test;
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::test_util::test_certs_dir;
async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
/// Builds a unique table name of the form `{prefix}_{uuid}` for a test.
///
/// The prefix is truncated so that the whole name is at most 63 bytes long. The
/// cut is moved back to the nearest character boundary, so a prefix containing
/// multi-byte characters is shortened instead of causing a slicing panic.
fn new_test_table_name(prefix: &str) -> String {
    let uuid = Uuid::new_v4().simple().to_string();
    // Reserve room for the `_` separator and the UUID.
    let max_prefix_len = 63usize.saturating_sub(uuid.len() + 1);

    let mut end = prefix.len().min(max_prefix_len);
    // Index 0 is always a character boundary, so this loop terminates.
    while !prefix.is_char_boundary(end) {
        end -= 1;
    }

    let prefix = &prefix[..end];
    format!("{prefix}_{uuid}")
}
/// Connects to the MySQL instance used by the integration tests.
///
/// The connection URL is read from the `GT_MYSQL_ENDPOINTS` environment variable.
/// Returns `None` when the variable is unset or empty, and panics when the
/// connection attempt fails.
async fn mysql_pool() -> Option<MySqlPool> {
    init_default_ut_logging();
    let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
    if endpoints.is_empty() {
        return None;
    }
    // Connect exactly once; a second pool bound to an unused local would only
    // open extra connections.
    Some(MySqlPool::connect(&endpoints).await.unwrap())
}
/// Returns the second column (index 1) of the `SHOW CREATE TABLE` result for
/// `table_name`. Panics when the query fails or returns no row.
async fn show_create_table(pool: &MySqlPool, table_name: &str) -> String {
    let statement = format!("SHOW CREATE TABLE `{table_name}`");
    let fetched = sqlx::query(&statement).fetch_one(pool).await;
    let definition: String = fetched.unwrap().get(1);
    definition
}
/// Creates `table_name` with the legacy schema whose value column is a plain `BLOB`.
/// Panics when the statement fails.
async fn create_legacy_blob_table(pool: &MySqlPool, table_name: &str) {
    let ddl = format!(
        "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);"
    );
    let outcome = sqlx::query(&ddl).execute(pool).await;
    outcome.unwrap();
}
/// Drops `table_name` if it exists. Panics when the statement fails.
async fn drop_table(pool: &MySqlPool, table_name: &str) {
    let statement = format!("DROP TABLE IF EXISTS `{table_name}`;");
    let outcome = sqlx::query(&statement).execute(pool).await;
    outcome.unwrap();
}
async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
let pool = mysql_pool().await?;
let sql_templates = MySqlTemplateFactory::new(table_name).build();
sqlx::query(&sql_templates.create_table_statement)
.execute(&pool)
@@ -610,6 +726,156 @@ mod tests {
})
}
/// Checks the blob-type parser against formatted, single-line, quoted,
/// mixed-case and non-matching table definitions.
#[test]
fn test_parse_value_column_blob_type() {
    let parse = MySqlStore::parse_value_column_blob_type;

    // Multi-line definition with an upper-case type.
    let multi_line_mediumblob = r#"CREATE TABLE `greptime_metakv` (
`k` varbinary(3072) NOT NULL,
`v` MEDIUMBLOB,
PRIMARY KEY (`k`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"#;
    assert_eq!(Some(ValueBlobType::MediumBlob), parse(multi_line_mediumblob));

    // Single-line definitions with lower-case types.
    let single_line_blob = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` blob, PRIMARY KEY (`k`))"#;
    assert_eq!(Some(ValueBlobType::Blob), parse(single_line_blob));

    let single_line_longblob = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` longblob, PRIMARY KEY (`k`))"#;
    assert_eq!(Some(ValueBlobType::LongBlob), parse(single_line_longblob));

    // Trailing column attributes do not affect the detected type.
    let blob_not_null = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` BLOB NOT NULL, PRIMARY KEY (`k`))";
    assert_eq!(Some(ValueBlobType::Blob), parse(blob_not_null));

    // Double-quoted column name and mixed-case type.
    let double_quoted_mixed_case = "CREATE TABLE `greptime_metakv` (\n `k` varbinary(3072) NOT NULL,\n \"v\" MediumBlob,\n PRIMARY KEY (`k`)\n)";
    assert_eq!(
        Some(ValueBlobType::MediumBlob),
        parse(double_quoted_mixed_case)
    );

    // A column whose name only ends in `v` must not shadow the real `v` column.
    let similar_name_first = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, vv blob, `v` longblob, PRIMARY KEY (`k`))";
    assert_eq!(Some(ValueBlobType::LongBlob), parse(similar_name_first));

    // A column whose name only starts with `v` is not the value column.
    let other_column_name = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, value blob, PRIMARY KEY (`k`))";
    assert_eq!(None, parse(other_column_name));

    // The `v` column exists but is not one of the recognized blob types.
    let non_blob_type = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` varchar(255), PRIMARY KEY (`k`))";
    assert_eq!(None, parse(non_blob_type));

    // No `v` column at all.
    let missing_column =
        "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, PRIMARY KEY (`k`))";
    assert_eq!(None, parse(missing_column));
}
/// A table created by the store itself must declare the value column as `MEDIUMBLOB`.
#[tokio::test]
async fn test_mysql_new_metadata_table_uses_mediumblob() {
    maybe_skip_mysql_integration_test!();

    let conn_pool = mysql_pool().await.unwrap();
    let table = new_test_table_name("test_mysql_mediumblob_schema");

    // Building the store creates the table as a side effect.
    MySqlStore::with_mysql_pool(conn_pool.clone(), &table, 128)
        .await
        .unwrap();

    let definition = show_create_table(&conn_pool, &table)
        .await
        .to_ascii_uppercase();
    assert!(definition.contains("MEDIUMBLOB"));

    drop_table(&conn_pool, &table).await;
}
/// A pre-existing table with a `BLOB` value column must be widened to `MEDIUMBLOB`
/// when the store is built on top of it.
#[tokio::test]
async fn test_mysql_legacy_blob_metadata_table_is_upgraded() {
    maybe_skip_mysql_integration_test!();

    let conn_pool = mysql_pool().await.unwrap();
    let table = new_test_table_name("test_mysql_legacy_blob_upgrade");

    // Start from the legacy schema, then let the store run its upgrade check.
    create_legacy_blob_table(&conn_pool, &table).await;
    MySqlStore::with_mysql_pool(conn_pool.clone(), &table, 128)
        .await
        .unwrap();

    let definition = show_create_table(&conn_pool, &table)
        .await
        .to_ascii_uppercase();
    assert!(definition.contains("MEDIUMBLOB"));

    drop_table(&conn_pool, &table).await;
}
/// A freshly created metadata table must round-trip a 70 KiB value.
#[tokio::test]
async fn test_mysql_metadata_table_stores_large_values() {
    maybe_skip_mysql_integration_test!();

    let conn_pool = mysql_pool().await.unwrap();
    let table = new_test_table_name("test_mysql_large_metadata_value");
    let store = MySqlStore::with_mysql_pool(conn_pool.clone(), &table, 128)
        .await
        .unwrap();

    let key = b"large-value".to_vec();
    let value = vec![b'x'; 70 * 1024];

    // Write the large value.
    let put_request = PutRequest::new()
        .with_key(key.clone())
        .with_value(value.clone());
    store.put(put_request).await.unwrap();

    // Read it back and compare it with what was written.
    let range_request = RangeRequest::new().with_key(key.clone());
    let response = store.range(range_request).await.unwrap();
    assert_eq!(1, response.kvs.len());
    assert_eq!(key, response.kvs[0].key);
    assert_eq!(value, response.kvs[0].value);

    drop_table(&conn_pool, &table).await;
}
/// A legacy `BLOB` table must round-trip a 70 KiB value once the store has been
/// built on top of it.
#[tokio::test]
async fn test_mysql_upgraded_metadata_table_stores_large_values() {
    maybe_skip_mysql_integration_test!();

    let conn_pool = mysql_pool().await.unwrap();
    let table = new_test_table_name("test_mysql_upgraded_large_metadata_value");

    // Start from the legacy schema before the store is built.
    create_legacy_blob_table(&conn_pool, &table).await;
    let store = MySqlStore::with_mysql_pool(conn_pool.clone(), &table, 128)
        .await
        .unwrap();

    let key = b"large-value".to_vec();
    let value = vec![b'y'; 70 * 1024];

    // Write the large value.
    let put_request = PutRequest::new()
        .with_key(key.clone())
        .with_value(value.clone());
    store.put(put_request).await.unwrap();

    // Read it back and compare it with what was written.
    let range_request = RangeRequest::new().with_key(key.clone());
    let response = store.range(range_request).await.unwrap();
    assert_eq!(1, response.kvs.len());
    assert_eq!(key, response.kvs[0].key);
    assert_eq!(value, response.kvs[0].value);

    drop_table(&conn_pool, &table).await;
}
#[tokio::test]
async fn test_mysql_put() {
maybe_skip_mysql_integration_test!();