feat: limit the size of query results returned to the Dashboard (#3901)

* feat: limit the size of query results returned to the Dashboard

* optimize code

* apply code review fixes

* fix integration test errors

* remove RequestSource::parse

* refactor: sql query params

* fix: unit test

---------

Co-authored-by: tison <wander4096@gmail.com>
Authored by maco
2024-05-14 09:57:30 +08:00
Committed by GitHub
commit 494ce65729 (parent e15294db41)
8 changed files with 132 additions and 26 deletions


@@ -190,6 +190,10 @@ impl From<SchemaRef> for OutputSchema {
 pub struct HttpRecordsOutput {
     schema: OutputSchema,
     rows: Vec<Vec<Value>>,
+    // total_rows is equal to rows.len() in most cases,
+    // the Dashboard query result may be truncated, so we need to return the total_rows.
+    #[serde(default)]
+    total_rows: usize,
     // plan level execution metrics
     #[serde(skip_serializing_if = "HashMap::is_empty")]
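Why the #[serde(default)] above matters: payloads produced before this change carry no "total_rows" field, and the default keeps deserialization backward compatible. A minimal standalone sketch of that behavior (assuming serde and serde_json as dependencies; RecordsSketch is a hypothetical stand-in, not the crate's actual type):

    use serde::Deserialize;

    // Stand-in for HttpRecordsOutput, reduced to the two fields that matter here.
    #[derive(Deserialize, Debug)]
    struct RecordsSketch {
        rows: Vec<Vec<u32>>,
        #[serde(default)] // a missing "total_rows" falls back to 0
        total_rows: usize,
    }

    fn main() {
        // A pre-change payload without "total_rows" still parses.
        let old_payload = r#"{"rows": [[1], [2]]}"#;
        let records: RecordsSketch = serde_json::from_str(old_payload).unwrap();
        assert_eq!(records.total_rows, 0);
    }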
@@ -224,6 +228,7 @@ impl HttpRecordsOutput {
             Ok(HttpRecordsOutput {
                 schema: OutputSchema::from(schema),
                 rows: vec![],
+                total_rows: 0,
                 metrics: Default::default(),
             })
         } else {
@@ -244,6 +249,7 @@ impl HttpRecordsOutput {
             Ok(HttpRecordsOutput {
                 schema: OutputSchema::from(schema),
+                total_rows: rows.len(),
                 rows,
                 metrics: Default::default(),
             })
@@ -357,6 +363,34 @@ impl HttpResponse {
             HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
         }
     }
+
+    pub fn with_limit(self, limit: usize) -> Self {
+        match self {
+            HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
+            HttpResponse::Table(resp) => resp.with_limit(limit).into(),
+            HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
+            _ => self,
+        }
+    }
 }
+
+pub fn process_with_limit(
+    mut outputs: Vec<GreptimeQueryOutput>,
+    limit: usize,
+) -> Vec<GreptimeQueryOutput> {
+    outputs
+        .drain(..)
+        .map(|data| match data {
+            GreptimeQueryOutput::Records(mut records) => {
+                if records.rows.len() > limit {
+                    records.rows.truncate(limit);
+                    records.total_rows = limit;
+                }
+                GreptimeQueryOutput::Records(records)
+            }
+            _ => data,
+        })
+        .collect()
+}

 impl IntoResponse for HttpResponse {
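The truncation semantics of process_with_limit, as illustrative payloads (made-up rows, not actual server output): given a five-row result and limit = 2, the rows are cut down and total_rows is set to the limit.

    // before: {"rows": [[0], [1], [2], [3], [4]], "total_rows": 5}
    // after with_limit(2): {"rows": [[0], [1]], "total_rows": 2}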


@@ -23,6 +23,7 @@ use mime_guess::mime;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};

+use super::process_with_limit;
 use crate::http::error_result::ErrorResponse;
 use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
 use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
@@ -65,6 +66,11 @@ impl CsvResponse {
     pub fn execution_time_ms(&self) -> u64 {
         self.execution_time_ms
     }
+
+    pub fn with_limit(mut self, limit: usize) -> Self {
+        self.output = process_with_limit(self.output, limit);
+        self
+    }
 }

 impl IntoResponse for CsvResponse {


@@ -23,6 +23,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::Value;

 use super::header::GREPTIME_DB_HEADER_METRICS;
+use super::process_with_limit;
 use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
 use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
@@ -62,6 +63,11 @@ impl GreptimedbV1Response {
     pub fn execution_time_ms(&self) -> u64 {
         self.execution_time_ms
     }
+
+    pub fn with_limit(mut self, limit: usize) -> Self {
+        self.output = process_with_limit(self.output, limit);
+        self
+    }
 }

 impl IntoResponse for GreptimedbV1Response {


@@ -62,6 +62,7 @@ pub struct SqlQuery {
     // specified time precision. Maybe greptimedb format can support this
     // param too.
     pub epoch: Option<String>,
+    pub limit: Option<usize>,
 }

 /// Handler to execute sql
@@ -98,7 +99,7 @@ pub async fn sql(
         if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
             Err((status, msg))
         } else {
-            Ok(sql_handler.do_query(sql, query_ctx).await)
+            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
         }
     } else {
         Err((
@@ -117,7 +118,7 @@ pub async fn sql(
         Ok(outputs) => outputs,
     };

-    let resp = match format {
+    let mut resp = match format {
         ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await,
         ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
         ResponseFormat::Table => TableResponse::from_output(outputs).await,
@@ -125,6 +126,9 @@ pub async fn sql(
         ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
     };

+    if let Some(limit) = query_params.limit {
+        resp = resp.with_limit(limit);
+    }
     resp.with_execution_time(start.elapsed().as_millis() as u64)
 }
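Taken together, the handler now honors an optional limit in the SQL query string. A usage sketch (the /v1/sql route, host, and port are assumed local defaults, not shown in this diff); note that per the with_limit match earlier, arrow and influxdb_v1 responses pass through unlimited:

    curl -G 'http://127.0.0.1:4000/v1/sql' \
        --data-urlencode 'sql=select * from numbers' \
        --data-urlencode 'format=greptimedb_v1' \
        --data-urlencode 'limit=1000'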


@@ -24,6 +24,7 @@ use mime_guess::mime;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};

+use super::process_with_limit;
 use crate::http::error_result::ErrorResponse;
 use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
 use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
@@ -66,6 +67,11 @@ impl TableResponse {
     pub fn execution_time_ms(&self) -> u64 {
         self.execution_time_ms
     }
+
+    pub fn with_limit(mut self, limit: usize) -> Self {
+        self.output = process_with_limit(self.output, limit);
+        self
+    }
 }

 impl Display for TableResponse {


@@ -23,6 +23,7 @@ use headers::HeaderValue;
 use http_body::combinators::UnsyncBoxBody;
 use hyper::Response;
 use mime_guess::mime;
+use servers::http::GreptimeQueryOutput::Records;
 use servers::http::{
     handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState,
     GreptimeQueryOutput, HttpResponse,
@@ -48,10 +49,8 @@ async fn test_sql_not_provided() {
     for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] {
         let query = http_handler::SqlQuery {
-            db: None,
-            sql: None,
             format: Some(format.to_string()),
-            epoch: None,
+            ..Default::default()
         };

         let HttpResponse::Error(resp) = http_handler::sql(
@@ -82,8 +81,9 @@ async fn test_sql_output_rows() {
         script_handler: None,
     };

+    let query_sql = "select sum(uint32s) from numbers limit 20";
     for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] {
-        let query = create_query(format);
+        let query = create_query(format, query_sql, None);
         let json = http_handler::sql(
             State(api_state.clone()),
             query,
@@ -112,7 +112,8 @@ async fn test_sql_output_rows() {
       [
         4950
       ]
-    ]
+    ],
+    "total_rows": 1
 }"#
         );
     }
@@ -176,6 +177,49 @@ async fn test_sql_output_rows() {
     }
 }

+#[tokio::test]
+async fn test_dashboard_sql_limit() {
+    let sql_handler = create_testing_sql_query_handler(MemTable::specified_numbers_table(2000));
+    let ctx = QueryContext::arc();
+    ctx.set_current_user(Some(auth::userinfo_by_name(None)));
+    let api_state = ApiState {
+        sql_handler,
+        script_handler: None,
+    };
+    for format in ["greptimedb_v1", "csv", "table"] {
+        let query = create_query(format, "select * from numbers", Some(1000));
+        let sql_response = http_handler::sql(
+            State(api_state.clone()),
+            query,
+            axum::Extension(ctx.clone()),
+            Form(http_handler::SqlQuery::default()),
+        )
+        .await;
+        match sql_response {
+            HttpResponse::GreptimedbV1(resp) => match resp.output().first().unwrap() {
+                Records(records) => {
+                    assert_eq!(records.num_rows(), 1000);
+                }
+                _ => unreachable!(),
+            },
+            HttpResponse::Csv(resp) => match resp.output().first().unwrap() {
+                Records(records) => {
+                    assert_eq!(records.num_rows(), 1000);
+                }
+                _ => unreachable!(),
+            },
+            HttpResponse::Table(resp) => match resp.output().first().unwrap() {
+                Records(records) => {
+                    assert_eq!(records.num_rows(), 1000);
+                }
+                _ => unreachable!(),
+            },
+            _ => unreachable!(),
+        }
+    }
+}
+
 #[tokio::test]
 async fn test_sql_form() {
     common_telemetry::init_default_ut_logging();
@@ -219,7 +263,8 @@ async fn test_sql_form() {
       [
         4950
       ]
-    ]
+    ],
+    "total_rows": 1
 }"#
         );
     }
@@ -393,7 +438,8 @@ def test(n) -> vector[i64]:
       [
         4
       ]
-    ]
+    ],
+    "total_rows": 5
 }"#
     );
 }
@@ -460,7 +506,8 @@ def test(n, **params) -> vector[i64]:
       [
         46
       ]
-    ]
+    ],
+    "total_rows": 5
 }"#
     );
 }
@@ -484,21 +531,20 @@ fn create_invalid_script_query() -> Query<script_handler::ScriptQuery> {
     })
 }

-fn create_query(format: &str) -> Query<http_handler::SqlQuery> {
+fn create_query(format: &str, sql: &str, limit: Option<usize>) -> Query<http_handler::SqlQuery> {
     Query(http_handler::SqlQuery {
-        sql: Some("select sum(uint32s) from numbers limit 20".to_string()),
-        db: None,
+        sql: Some(sql.to_string()),
         format: Some(format.to_string()),
-        epoch: None,
+        limit,
+        ..Default::default()
     })
 }

 fn create_form(format: &str) -> Form<http_handler::SqlQuery> {
     Form(http_handler::SqlQuery {
         sql: Some("select sum(uint32s) from numbers limit 20".to_string()),
-        db: None,
         format: Some(format.to_string()),
-        epoch: None,
+        ..Default::default()
     })
 }


@@ -101,6 +101,10 @@ impl MemTable {
     /// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and
     /// column type "uint32". Column data increased from 0 to 100.
     pub fn default_numbers_table() -> TableRef {
+        Self::specified_numbers_table(100)
+    }
+
+    pub fn specified_numbers_table(rows: u32) -> TableRef {
         let column_schemas = vec![ColumnSchema::new(
             "uint32s",
             ConcreteDataType::uint32_datatype(),
@@ -108,7 +112,7 @@ impl MemTable {
         )];
         let schema = Arc::new(Schema::new(column_schemas));
         let columns: Vec<VectorRef> = vec![Arc::new(UInt32Vector::from_slice(
-            (0..100).collect::<Vec<_>>(),
+            (0..rows).collect::<Vec<_>>(),
         ))];
         let recordbatch = RecordBatch::new(schema, columns).unwrap();
         MemTable::table("numbers", recordbatch)


@@ -147,7 +147,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         output[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}
+            "records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]],"total_rows":10}
         })).unwrap()
     );
@@ -189,7 +189,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         output[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]]}
+            "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]],"total_rows":1}
         })).unwrap()
     );
@@ -207,7 +207,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         output[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
+            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1}
         })).unwrap()
     );
@@ -224,7 +224,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         output[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
+            "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1}
         })).unwrap()
     );
@@ -241,13 +241,13 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         outputs[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
+            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1}
         })).unwrap()
     );
     assert_eq!(
         outputs[1],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}}
+            "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}, "total_rows":0}
         }))
         .unwrap()
     );
@@ -276,7 +276,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         outputs[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
+            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1}
         })).unwrap()
     );
@@ -302,7 +302,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         outputs[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
+            "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1}
         })).unwrap()
     );
@@ -673,7 +673,7 @@ def test(n) -> vector[f64]:
     assert_eq!(
         output[0],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}
+            "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]],"total_rows": 10}
         })).unwrap()
     );