bench: flow update case

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-23 15:36:50 +08:00
parent 576c802394
commit 42445979b9

View File

@@ -16,6 +16,7 @@
mod test {
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::auth_header::AuthScheme;
use api::v1::{Basic, ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
@@ -30,11 +31,14 @@ mod test {
use common_query::OutputData;
use common_recordbatch::RecordBatch;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use datatypes::arrow::array::Array;
use datatypes::arrow_array::StringArray;
use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Int32Vector, StringVector, TimestampMillisecondVector};
use futures_util::StreamExt;
use itertools::Itertools;
use serde_json::Value;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcServerConfig};
@@ -337,6 +341,707 @@ mod test {
assert!(err_msg.contains(&previous_watermark.1.to_string()));
}
#[derive(Debug, Clone, Copy)]
struct ManualIncrementalBenchmarkCase {
label: &'static str,
initial_hosts: usize,
initial_rows_per_host: usize,
delta_existing_hosts: usize,
delta_rows_per_host: usize,
include_new_delta_host: bool,
iterations: usize,
}
#[derive(Debug)]
struct QueryExecutionSummary {
pretty: String,
elapsed: Duration,
row_count: usize,
region_watermarks: Vec<RegionWatermarkEntry>,
region_watermark_map: Option<std::collections::HashMap<u64, u64>>,
}
#[derive(Debug)]
struct ExplainQuerySummary {
pretty: String,
row_count: usize,
plan_texts: Vec<String>,
json_nodes: Vec<ExplainNodeSummary>,
}
#[derive(Debug, Clone)]
struct ExplainNodeSummary {
name: String,
param: String,
output_rows: usize,
elapsed_compute: usize,
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "benchmark-style integration test for Task 4b"]
async fn test_standalone_flight_manual_incremental_join_benchmark() {
common_telemetry::init_default_ut_logging();
let (db, server) = setup_grpc_server(
StorageType::File,
"test_standalone_flight_manual_incremental_join_benchmark",
)
.await;
let addr = server.bind_addr().unwrap().to_string();
let client = Client::with_urls(vec![addr]);
let client = Database::new_with_dbname("greptime-public", client);
let cases = [
ManualIncrementalBenchmarkCase {
label: "small_delta_large_sink",
initial_hosts: 128,
initial_rows_per_host: 6,
delta_existing_hosts: 4,
delta_rows_per_host: 2,
include_new_delta_host: true,
iterations: 3,
},
ManualIncrementalBenchmarkCase {
label: "medium_delta_large_sink",
initial_hosts: 128,
initial_rows_per_host: 6,
delta_existing_hosts: 32,
delta_rows_per_host: 2,
include_new_delta_host: true,
iterations: 3,
},
ManualIncrementalBenchmarkCase {
label: "small_delta_small_sink",
initial_hosts: 16,
initial_rows_per_host: 6,
delta_existing_hosts: 4,
delta_rows_per_host: 2,
include_new_delta_host: true,
iterations: 3,
},
ManualIncrementalBenchmarkCase {
label: "tiny_delta_huge_sink",
initial_hosts: 1024,
initial_rows_per_host: 8,
delta_existing_hosts: 2,
delta_rows_per_host: 1,
include_new_delta_host: true,
iterations: 2,
},
ManualIncrementalBenchmarkCase {
label: "tiny_delta_src_heavy_sink_small",
initial_hosts: 16,
initial_rows_per_host: 1024,
delta_existing_hosts: 2,
delta_rows_per_host: 1,
include_new_delta_host: true,
iterations: 2,
},
ManualIncrementalBenchmarkCase {
label: "tiny_delta_src_huge_sink_mid",
initial_hosts: 128,
initial_rows_per_host: 512,
delta_existing_hosts: 2,
delta_rows_per_host: 1,
include_new_delta_host: true,
iterations: 2,
},
ManualIncrementalBenchmarkCase {
label: "tiny_delta_src_extreme_sink_mid",
initial_hosts: 256,
initial_rows_per_host: 1024,
delta_existing_hosts: 2,
delta_rows_per_host: 1,
include_new_delta_host: true,
iterations: 2,
},
ManualIncrementalBenchmarkCase {
label: "small_delta_src_ultra_sink_mid",
initial_hosts: 256,
initial_rows_per_host: 4096,
delta_existing_hosts: 20,
delta_rows_per_host: 100,
include_new_delta_host: true,
iterations: 2,
},
];
for case in cases {
run_manual_incremental_join_benchmark_case(&db, &client, case).await;
}
}
async fn run_manual_incremental_join_benchmark_case(
db: &impl MockInstance,
client: &Database,
case: ManualIncrementalBenchmarkCase,
) {
let source_table = format!("flow_bench_src_{}", case.label);
let sink_table = format!("flow_bench_sink_{}", case.label);
create_flow_bench_source_table(client, &source_table).await;
create_flow_bench_sink_table(client, &sink_table).await;
let initial_rows = make_source_rows(
&source_table,
0,
case.initial_hosts,
case.initial_rows_per_host,
1,
);
insert_source_rows(client, &source_table, &initial_rows).await;
let source_table_id = db
.frontend()
.catalog_manager()
.table("greptime", "public", &source_table, None)
.await
.unwrap()
.unwrap()
.table_info()
.table_id();
let previous_watermarks = std::collections::HashMap::from([(
RegionId::new(source_table_id, 0).as_u64(),
initial_rows.len() as u64,
)]);
execute_affected_rows_sql(
client,
&format!(
"insert into {sink_table} select host, cast(sum(v) as bigint), max(ts) from {source_table} group by host"
),
)
.await;
let sink_table_id = db
.frontend()
.catalog_manager()
.table("greptime", "public", &sink_table, None)
.await
.unwrap()
.unwrap()
.table_info()
.table_id();
let delta_rows = make_delta_rows(
case,
(case.initial_hosts * case.initial_rows_per_host) as i64 + 1,
);
insert_source_rows(client, &source_table, &delta_rows).await;
let expected_summary =
execute_stream_query_with_hints(client, &full_aggregate_query(&source_table), &[])
.await;
let expected = expected_summary.pretty.clone();
let expected_row_count = expected_summary.row_count;
let changed_hosts = delta_rows
.iter()
.map(|(host, _, _)| host.clone())
.unique()
.collect_vec();
let expected_changed_summary = execute_stream_query_with_hints(
client,
&changed_rows_full_baseline_query(&source_table, &changed_hosts),
&[],
)
.await;
let expected_changed = expected_changed_summary.pretty.clone();
let expected_changed_row_count = expected_changed_summary.row_count;
let incremental_after_seqs = format_region_seq_map(&previous_watermarks);
let sink_table_id_str = sink_table_id.to_string();
let hint_pairs = [
("flow.incremental_mode", "memtable_only".to_string()),
(
"flow.incremental_after_seqs",
incremental_after_seqs.clone(),
),
("flow.return_region_seq", "true".to_string()),
("flow.sink_table_id", sink_table_id_str.clone()),
];
let hints = hint_pairs
.iter()
.map(|(k, v)| (*k, v.as_str()))
.collect::<Vec<_>>();
let mut full_latencies = Vec::with_capacity(case.iterations);
let mut delta_only_left_join_latencies = Vec::with_capacity(case.iterations);
if matches!(
case.label,
"tiny_delta_src_extreme_sink_mid" | "small_delta_src_ultra_sink_mid"
) {
let full_verbose = execute_explain_query_with_hints(
client,
&format!(
"EXPLAIN ANALYZE VERBOSE {}",
full_aggregate_query(&source_table)
),
&[],
)
.await;
let delta_only_verbose = execute_explain_query_with_hints(
client,
&format!(
"EXPLAIN ANALYZE VERBOSE {}",
manual_delta_only_left_join_update_query(&source_table, &sink_table)
),
&hints,
)
.await;
let full_json = execute_explain_query_with_hints(
client,
&format!(
"EXPLAIN ANALYZE FORMAT JSON {}",
full_aggregate_query(&source_table)
),
&[],
)
.await;
let delta_only_json = execute_explain_query_with_hints(
client,
&format!(
"EXPLAIN ANALYZE FORMAT JSON {}",
manual_delta_only_left_join_update_query(&source_table, &sink_table)
),
&hints,
)
.await;
assert!(
full_verbose.pretty.contains("AggregateExec")
|| full_verbose.pretty.contains("HashAggregateExec")
);
assert!(
delta_only_verbose.pretty.contains("HashJoinExec")
|| delta_only_verbose.pretty.contains("Left")
);
assert!(full_verbose.pretty.contains("SeqScan"));
assert!(delta_only_verbose.pretty.contains("SeqScan"));
assert!(full_json.pretty.contains("elapsed_compute"));
assert!(delta_only_json.pretty.contains("elapsed_compute"));
assert!(
full_json
.json_nodes
.iter()
.any(|node| node.name == "AggregateExec"),
"expected full explain json to include AggregateExec nodes"
);
assert!(
full_json
.json_nodes
.iter()
.any(|node| node.name == "SeqScan"),
"expected full explain json to include SeqScan nodes"
);
assert!(
delta_only_json
.json_nodes
.iter()
.any(|node| node.name == "HashJoinExec"),
"expected delta-only explain json to include HashJoinExec nodes"
);
assert!(
delta_only_json
.json_nodes
.iter()
.filter(|node| node.name == "SeqScan")
.count()
>= 2,
"expected delta-only explain json to include both source and sink SeqScan nodes"
);
common_telemetry::info!(
"FlowBench profile case={} full_verbose_rows={} delta_only_verbose_rows={} full_json_rows={} delta_only_json_rows={} full_json_plans={} delta_only_json_plans={}\nFULL_VERBOSE\n{}\nDELTA_ONLY_VERBOSE\n{}\nFULL_JSON\n{}\nDELTA_ONLY_JSON\n{}\nFULL_JSON_NODE_SUMMARY={:?}\nDELTA_ONLY_JSON_NODE_SUMMARY={:?}",
case.label,
full_verbose.row_count,
delta_only_verbose.row_count,
full_json.row_count,
delta_only_json.row_count,
full_json.plan_texts.len(),
delta_only_json.plan_texts.len(),
full_verbose.pretty,
delta_only_verbose.pretty,
full_json.pretty,
delta_only_json.pretty,
summarize_explain_nodes(&full_json.json_nodes),
summarize_explain_nodes(&delta_only_json.json_nodes),
);
}
for _ in 0..case.iterations {
let full_summary =
execute_stream_query_with_hints(client, &full_aggregate_query(&source_table), &[])
.await;
let changed_rows_full_summary = execute_stream_query_with_hints(
client,
&changed_rows_full_baseline_query(&source_table, &changed_hosts),
&[],
)
.await;
let delta_only_left_join_summary = execute_stream_query_with_hints(
client,
&manual_delta_only_left_join_update_query(&source_table, &sink_table),
&hints,
)
.await;
assert_eq!(full_summary.pretty, expected);
assert_eq!(changed_rows_full_summary.pretty, expected_changed);
assert_eq!(delta_only_left_join_summary.pretty, expected_changed);
assert_eq!(
changed_rows_full_summary.row_count,
expected_changed_row_count
);
assert_eq!(
delta_only_left_join_summary.row_count,
expected_changed_row_count
);
assert!(
delta_only_left_join_summary
.region_watermark_map
.as_ref()
.map(|watermarks| !watermarks.is_empty())
.unwrap_or(false)
|| !delta_only_left_join_summary.region_watermarks.is_empty(),
"expected delta-only left-join incremental query to return region watermark metrics"
);
full_latencies.push(full_summary.elapsed);
delta_only_left_join_latencies.push(delta_only_left_join_summary.elapsed);
}
common_telemetry::info!(
"FlowBench case={} full_latencies_ms={:?} delta_only_left_join_latencies_ms={:?} expected_rows={} expected_changed_rows={}",
case.label,
full_latencies.iter().map(Duration::as_millis).collect_vec(),
delta_only_left_join_latencies
.iter()
.map(Duration::as_millis)
.collect_vec(),
expected_row_count,
expected_changed_row_count,
);
client
.sql(&format!("admin flush_table('{source_table}')"))
.await
.unwrap();
let output = client
.sql_with_hint(
&manual_delta_only_left_join_update_query(&source_table, &sink_table),
&hints,
)
.await
.unwrap();
let OutputData::Stream(mut stream) = output.data else {
panic!("expected stale delta-only benchmark query to return stream output");
};
let err = loop {
match stream.next().await {
Some(Err(err)) => break err,
Some(Ok(_)) => continue,
None => panic!("expected stale delta-only incremental benchmark query to fail"),
}
};
assert_eq!(err.status_code(), StatusCode::EngineExecuteQuery);
let err_msg = format!("{err:?}");
assert!(err_msg.contains("STALE_CURSOR"));
}
async fn execute_stream_query_with_hints(
client: &Database,
sql: &str,
hints: &[(&str, &str)],
) -> QueryExecutionSummary {
let started = Instant::now();
let result = client.sql_with_terminal_metrics(sql, hints).await.unwrap();
let region_watermark_map = result.region_watermark_map();
let OutputData::Stream(mut stream) = result.output.data else {
panic!("expected stream output for benchmark query");
};
let schema = stream.schema();
let mut row_count = 0;
let mut rows = Vec::new();
while let Some(batch) = stream.next().await {
let batch = batch.unwrap();
row_count += batch.num_rows();
rows.push(batch);
}
let pretty = common_recordbatch::RecordBatches::try_new(schema, rows)
.unwrap()
.pretty_print()
.unwrap();
let elapsed = started.elapsed();
let region_watermarks = stream
.metrics()
.map(|metrics| metrics.region_watermarks)
.unwrap_or_default();
let region_watermark_map = region_watermark_map.or_else(|| {
let region_seq_pairs = region_watermarks
.iter()
.map(|entry| {
entry
.watermark
.map(|watermark| (entry.region_id, watermark))
})
.collect::<Option<Vec<_>>>()?;
Some(std::collections::HashMap::from_iter(region_seq_pairs))
});
QueryExecutionSummary {
pretty,
elapsed,
row_count,
region_watermarks,
region_watermark_map,
}
}
async fn execute_explain_query_with_hints(
client: &Database,
sql: &str,
hints: &[(&str, &str)],
) -> ExplainQuerySummary {
let output = client.sql_with_hint(sql, hints).await.unwrap();
let OutputData::Stream(mut stream) = output.data else {
panic!("expected stream output for explain query");
};
let schema = stream.schema();
let mut row_count = 0;
let mut rows = Vec::new();
while let Some(batch) = stream.next().await {
let batch = batch.unwrap();
row_count += batch.num_rows();
rows.push(batch);
}
let plan_texts = extract_explain_plan_texts(&rows);
let pretty = common_recordbatch::RecordBatches::try_new(schema, rows)
.unwrap()
.pretty_print()
.unwrap();
let json_nodes = plan_texts
.iter()
.filter_map(|plan| serde_json::from_str::<Value>(plan).ok())
.flat_map(|value| flatten_explain_json_nodes(&value))
.collect_vec();
ExplainQuerySummary {
pretty,
row_count,
plan_texts,
json_nodes,
}
}
fn extract_explain_plan_texts(rows: &[RecordBatch]) -> Vec<String> {
let mut plan_texts = Vec::new();
for batch in rows {
let Some(plan_column) = batch.columns().get(2) else {
continue;
};
let Some(plan_array) = plan_column.as_any().downcast_ref::<StringArray>() else {
continue;
};
for idx in 0..plan_array.len() {
if plan_array.is_valid(idx) {
plan_texts.push(plan_array.value(idx).to_string());
}
}
}
plan_texts
}
fn flatten_explain_json_nodes(value: &Value) -> Vec<ExplainNodeSummary> {
let mut nodes = Vec::new();
flatten_explain_json_nodes_into(value, &mut nodes);
nodes
}
fn flatten_explain_json_nodes_into(value: &Value, nodes: &mut Vec<ExplainNodeSummary>) {
let name = value
.get("name")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let param = value
.get("param")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let output_rows = value
.get("output_rows")
.and_then(Value::as_u64)
.unwrap_or_default() as usize;
let elapsed_compute = value
.get("elapsed_compute")
.and_then(Value::as_u64)
.unwrap_or_default() as usize;
if !name.is_empty() {
nodes.push(ExplainNodeSummary {
name,
param,
output_rows,
elapsed_compute,
});
}
if let Some(children) = value.get("children").and_then(Value::as_array) {
for child in children {
flatten_explain_json_nodes_into(child, nodes);
}
}
}
fn summarize_explain_nodes(nodes: &[ExplainNodeSummary]) -> Vec<String> {
nodes
.iter()
.map(|node| {
format!(
"{} rows={} elapsed_ns={} param={}",
node.name, node.output_rows, node.elapsed_compute, node.param
)
})
.collect()
}
async fn execute_affected_rows_sql(client: &Database, sql: &str) {
let output = client.sql(sql).await.unwrap();
let OutputData::AffectedRows(_) = output.data else {
panic!("expected affected rows output for sql: {sql}");
};
}
async fn create_flow_bench_source_table(client: &Database, table_name: &str) {
execute_affected_rows_sql(
client,
&format!(
"create table {table_name}(host string, v bigint, ts timestamp time index, primary key(host))"
),
)
.await;
}
async fn create_flow_bench_sink_table(client: &Database, table_name: &str) {
execute_affected_rows_sql(
client,
&format!(
"create table {table_name}(host string, total bigint, ts timestamp time index, primary key(host))"
),
)
.await;
}
async fn insert_source_rows(client: &Database, table_name: &str, rows: &[(String, i64, i64)]) {
for chunk in &rows.iter().chunks(128) {
let values = chunk
.map(|(host, value, ts)| format!("('{host}', {value}, {ts})"))
.join(",");
execute_affected_rows_sql(
client,
&format!("insert into {table_name}(host, v, ts) values {values}"),
)
.await;
}
}
fn make_source_rows(
_table_name: &str,
ts_start: i64,
host_count: usize,
rows_per_host: usize,
value_bias: i64,
) -> Vec<(String, i64, i64)> {
let mut rows = Vec::with_capacity(host_count * rows_per_host);
let mut ts = ts_start;
for host_idx in 0..host_count {
for row_idx in 0..rows_per_host {
ts += 1;
rows.push((
format!("host_{host_idx:03}"),
value_bias + host_idx as i64 + row_idx as i64,
ts,
));
}
}
rows
}
fn make_delta_rows(
case: ManualIncrementalBenchmarkCase,
ts_start: i64,
) -> Vec<(String, i64, i64)> {
let mut rows = Vec::with_capacity(
case.delta_existing_hosts * case.delta_rows_per_host
+ usize::from(case.include_new_delta_host) * case.delta_rows_per_host,
);
let mut ts = ts_start;
for host_idx in 0..case.delta_existing_hosts {
for row_idx in 0..case.delta_rows_per_host {
ts += 1;
rows.push((
format!("host_{host_idx:03}"),
1_000 + host_idx as i64 + row_idx as i64,
ts,
));
}
}
if case.include_new_delta_host {
for row_idx in 0..case.delta_rows_per_host {
ts += 1;
rows.push((
format!("host_new_{}", case.label),
2_000 + row_idx as i64,
ts,
));
}
}
rows
}
fn full_aggregate_query(source_table: &str) -> String {
format!(
"select host, cast(sum(v) as bigint) as total from {source_table} group by host order by host"
)
}
fn changed_rows_full_baseline_query(source_table: &str, changed_hosts: &[String]) -> String {
let host_list = changed_hosts
.iter()
.map(|host| format!("'{}'", host))
.join(",");
format!(
"select host, cast(sum(v) as bigint) as total from {source_table} where host in ({host_list}) group by host order by host"
)
}
fn manual_delta_only_left_join_update_query(source_table: &str, sink_table: &str) -> String {
format!(
"with delta as (select host, cast(sum(v) as bigint) as delta_total from {source_table} group by host) \
select d.host as host, cast(coalesce(s.total, 0) + d.delta_total as bigint) as total \
from delta d left join {sink_table} s on d.host = s.host \
order by host"
)
}
fn format_region_seq_map(region_seqs: &std::collections::HashMap<u64, u64>) -> String {
let mut entries = region_seqs
.iter()
.map(|(region_id, seq)| format!(r#""{region_id}":"{seq}""#))
.collect::<Vec<_>>();
entries.sort();
format!("{{{}}}", entries.join(","))
}
async fn test_put_record_batches(client: &Database, record_batches: Vec<RecordBatch>) {
let requests_count = record_batches.len();
let schema = record_batches[0].schema.arrow_schema().clone();