Files
fys cce70b9427 fix(mito): failed to compact memtable with json2 (#8297)
* fix(json2): failed to compact memtable

* fix: cargo clippy

* refactor: align schema with json2 filed in flush

* chore: add unit test for json aligner

* chore: add json2 integration test

* fix: cr by codex

* fix: use parquet schema for encoded JSON2 memtable parts

* Use is_structured_json_field to determine whether the field is of JSON2 type.

* fix: cargo clippy

* fix: only align structured json fields

* chore: assert bulk JSON2 aligner input schemas in debug
2026-06-26 08:52:06 +00:00

357 lines
9.6 KiB
Rust

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use sqlx::{Connection, Executor, MySqlConnection, Row};
use tests_integration::test_util::{StorageType, setup_mysql_server};
type Json2Rows = Vec<(
String,
Option<String>,
Option<String>,
Option<String>,
Option<String>,
)>;
#[tokio::test(flavor = "multi_thread")]
async fn test_json2_single_mem_range_flush() {
common_telemetry::init_default_ut_logging();
let (mut guard, server) =
setup_mysql_server(StorageType::File, "test_json2_single_range_flush").await;
let addr = server.bind_addr().unwrap();
let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
create_json2_table(&mut conn, "json2_single_mem_range", 100)
.await
.unwrap();
// One INSERT statement produces one bulk part, so flush sees exactly one
// mem range. The rows still carry different JSON2 shapes, exercising
// alignment inside that range.
conn.execute(
r#"
INSERT INTO json2_single_mem_range VALUES
(1, 'host1', '{"payload":{"id":1},"metric":1}'),
(2, 'host2', '{"payload":{"name":"n2"},"flag":true}')
"#,
)
.await
.unwrap();
conn.execute("ADMIN FLUSH_TABLE('json2_single_mem_range')")
.await
.unwrap();
assert_eq!(
vec![
(
"host1".to_string(),
Some("1".to_string()),
None,
Some("1".to_string()),
None,
),
(
"host2".to_string(),
None,
Some("n2".to_string()),
None,
Some("true".to_string())
),
],
query_json2_rows(&mut conn, "json2_single_mem_range")
.await
.unwrap()
);
let _ = server.shutdown().await;
guard.remove_all().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_json2_multi_mem_range_flush() {
common_telemetry::init_default_ut_logging();
let (mut guard, server) =
setup_mysql_server(StorageType::File, "test_json2_multi_range_flush").await;
let addr = server.bind_addr().unwrap();
let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
create_json2_table(&mut conn, "json2_multiple_mem_ranges", 100)
.await
.unwrap();
// Separate INSERT statements produce separate bulk parts. The high merge
// threshold keeps them unmerged, so flush must align JSON2 schemas across
// multiple mem ranges.
conn.execute(
r#"INSERT INTO json2_multiple_mem_ranges VALUES
(1, 'host1', '{"payload":{"id":1},"metric":1}')"#,
)
.await
.unwrap();
conn.execute(
r#"INSERT INTO json2_multiple_mem_ranges VALUES
(2, 'host2', '{"payload":{"name":"n2"},"flag":true}')"#,
)
.await
.unwrap();
conn.execute("ADMIN FLUSH_TABLE('json2_multiple_mem_ranges')")
.await
.unwrap();
assert_eq!(
vec![
(
"host1".to_string(),
Some("1".to_string()),
None,
Some("1".to_string()),
None,
),
(
"host2".to_string(),
None,
Some("n2".to_string()),
None,
Some("true".to_string())
),
],
query_json2_rows(&mut conn, "json2_multiple_mem_ranges")
.await
.unwrap()
);
conn.execute(
r#"INSERT INTO json2_multiple_mem_ranges VALUES
(3, 'host3', '{"payload":{"id":3,"name":"n3"},"metric":3,"flag":false}')"#,
)
.await
.unwrap();
conn.execute(
r#"INSERT INTO json2_multiple_mem_ranges VALUES
(4, 'host4', '{"payload":{"extra":"e4"},"metric":4}')"#,
)
.await
.unwrap();
conn.execute("ADMIN FLUSH_TABLE('json2_multiple_mem_ranges')")
.await
.unwrap();
assert_eq!(2, count_parquet_files(guard.home_guard.temp_dir.path()));
conn.execute("ADMIN COMPACT_TABLE('json2_multiple_mem_ranges')")
.await
.unwrap();
assert_eq!(
vec![
(
"host1".to_string(),
Some("1".to_string()),
None,
Some("1".to_string()),
None,
),
(
"host2".to_string(),
None,
Some("n2".to_string()),
None,
Some("true".to_string())
),
(
"host3".to_string(),
Some("3".to_string()),
Some("n3".to_string()),
Some("3".to_string()),
Some("false".to_string())
),
("host4".to_string(), None, None, Some("4".to_string()), None,),
],
query_json2_rows(&mut conn, "json2_multiple_mem_ranges")
.await
.unwrap()
);
let _ = server.shutdown().await;
guard.remove_all().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_json2_multi_row_insert() {
common_telemetry::init_default_ut_logging();
const NUM_ROWS: usize = 1024;
const TABLE_NAME: &str = "json2_multi_row_insert";
let (mut guard, server) =
setup_mysql_server(StorageType::File, "test_json2_multi_row_insert").await;
let addr = server.bind_addr().unwrap();
let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
create_json2_compaction_table(&mut conn, TABLE_NAME)
.await
.unwrap();
for i in 0..NUM_ROWS {
let json = json2_payload(i);
let sql = format!(
r#"INSERT INTO {TABLE_NAME} VALUES
({}, 'host{}', '{}')"#,
i + 1,
i,
json
);
conn.execute(sql.as_str()).await.unwrap();
}
assert_eq!(
NUM_ROWS as i64,
count_table_rows(&mut conn, TABLE_NAME).await.unwrap()
);
let _ = server.shutdown().await;
guard.remove_all().await;
}
async fn create_json2_table(
conn: &mut MySqlConnection,
table_name: &str,
merge_threshold: usize,
) -> sqlx::Result<()> {
conn.execute(
format!(
r#"
CREATE TABLE {table_name} (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
j JSON2
) WITH (
'append_mode' = 'true',
'sst_format' = 'flat',
'memtable.type' = 'bulk',
'memtable.bulk.merge_threshold' = '{merge_threshold}',
'memtable.bulk.encode_row_threshold' = '1000000'
)
"#
)
.as_str(),
)
.await?;
Ok(())
}
async fn create_json2_compaction_table(
conn: &mut MySqlConnection,
table_name: &str,
) -> sqlx::Result<()> {
conn.execute(
format!(
r#"
CREATE TABLE {table_name} (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
j JSON2
) WITH (
'append_mode' = 'true',
'sst_format' = 'flat',
'memtable.type' = 'bulk',
'memtable.bulk.merge_threshold' = '8',
'memtable.bulk.encode_row_threshold' = '64',
'memtable.bulk.encode_bytes_threshold' = '100000000'
)
"#
)
.as_str(),
)
.await?;
Ok(())
}
async fn query_json2_rows(conn: &mut MySqlConnection, table_name: &str) -> sqlx::Result<Json2Rows> {
let rows = sqlx::query(
format!(
r#"
SELECT
host,
j.payload.id::STRING AS payload_id,
j.payload.name::STRING AS payload_name,
j.metric::STRING AS metric,
j.flag::STRING AS flag
FROM {table_name}
ORDER BY ts
"#
)
.as_str(),
)
.fetch_all(conn)
.await?;
Ok(rows
.into_iter()
.map(|row| {
(
row.get::<String, _>("host"),
row.get::<Option<String>, _>("payload_id"),
row.get::<Option<String>, _>("payload_name"),
row.get::<Option<String>, _>("metric"),
row.get::<Option<String>, _>("flag"),
)
})
.collect())
}
async fn count_table_rows(conn: &mut MySqlConnection, table_name: &str) -> sqlx::Result<i64> {
let row = sqlx::query(format!("SELECT COUNT(*) AS count FROM {table_name}").as_str())
.fetch_one(conn)
.await?;
Ok(row.get("count"))
}
fn json2_payload(i: usize) -> String {
match i % 4 {
0 => format!(r#"{{"payload":{{"id":{i}}},"metric":{i}}}"#),
1 => format!(r#"{{"payload":{{"name":"n{i}"}},"flag":true}}"#),
2 => format!(r#"{{"payload":{{"score":{}.5}},"tags":["a","b"]}}"#, i),
_ => format!(r#"{{"payload":{{"extra":"e{i}"}},"metric":{i},"flag":false}}"#),
}
}
fn count_parquet_files(path: &Path) -> usize {
let Ok(entries) = std::fs::read_dir(path) else {
return 0;
};
entries
.filter_map(Result::ok)
.map(|entry| entry.path())
.map(|path| {
if path.is_dir() {
count_parquet_files(&path)
} else if path.extension().is_some_and(|ext| ext == "parquet") {
1
} else {
0
}
})
.sum()
}