fix(log-query): panic on prometheus (#5429)

* fix(log-query): panic on prometheus

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test environment setup

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-01-23 19:48:37 +08:00
committed by GitHub
parent 45e68603a1
commit 9af4160068
5 changed files with 64 additions and 1 deletions

View File

@@ -46,6 +46,7 @@ frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
log-query = { workspace = true }
loki-api.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }

View File

@@ -393,6 +393,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
};
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
.with_logs_handler(instance.instance.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
@@ -425,6 +426,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
http_server = http_server
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
.with_log_ingest_handler(instance.instance.clone(), None, None)
.with_logs_handler(instance.instance.clone())
.with_otlp_handler(instance.instance.clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());
@@ -474,6 +476,7 @@ pub async fn setup_test_prom_app_with_frontend(
let is_strict_mode = true;
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
.with_logs_handler(instance.instance.clone())
.with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())

View File

@@ -22,6 +22,7 @@ use axum::http::{HeaderName, HeaderValue, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use log_query::{Context, Limit, LogQuery, TimeFilter};
use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter};
use loki_api::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
@@ -39,6 +40,7 @@ use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Respon
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use table::table_name::TableName;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -99,6 +101,7 @@ macro_rules! http_tests {
test_loki_json_logs,
test_elasticsearch_logs,
test_elasticsearch_logs_with_index,
test_log_query,
);
)*
};
@@ -2140,6 +2143,61 @@ pub async fn test_elasticsearch_logs_with_index(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_log_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_log_query").await;
let client = TestClient::new(app).await;
// prepare data with SQL API
let res = client
.get("/v1/sql?sql=create table logs (`ts` timestamp time index, message string);")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let res = client
.post("/v1/sql?sql=insert into logs values ('2024-11-07 10:53:50', 'hello');")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
// test log query
let log_query = LogQuery {
table: TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "logs".to_string(),
},
time_filter: TimeFilter {
start: Some("2024-11-07".to_string()),
end: None,
span: None,
},
limit: Limit {
skip: None,
fetch: Some(1),
},
columns: vec!["ts".to_string(), "message".to_string()],
filters: vec![],
context: Context::None,
exprs: vec![],
};
let res = client
.post("/v1/logs")
.header("Content-Type", "application/json")
.body(serde_json::to_string(&log_query).unwrap())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let resp = res.text().await;
let v = get_rows_from_output(&resp);
assert_eq!(v, "[[1730976830000,\"hello\"]]");
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())