fix(log-query): panic on prometheus (#5429)

* fix(log-query): panic on prometheus

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test environment setup

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-01-23 19:48:37 +08:00
committed by Yingwen
parent 4d70589488
commit ea0a347edc
5 changed files with 71 additions and 1 deletions

1
Cargo.lock generated
View File

@@ -12373,6 +12373,7 @@ dependencies = [
"futures-util",
"hex",
"itertools 0.10.5",
"log-query",
"loki-api",
"meta-client",
"meta-srv",

View File

@@ -38,7 +38,7 @@ pub async fn logs(
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_LOGS_INGESTION_ELAPSED
let _timer = crate::metrics::METRIC_HTTP_LOGS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

View File

@@ -45,6 +45,7 @@ flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
log-query = { workspace = true }
loki-api = "0.1"
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }

View File

@@ -394,6 +394,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()),
None,
)
.with_logs_handler(instance.instance.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
@@ -429,6 +430,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
Some(instance.instance.clone()),
)
.with_log_ingest_handler(instance.instance.clone(), None, None)
.with_logs_handler(instance.instance.clone())
.with_otlp_handler(instance.instance.clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());
@@ -467,6 +469,7 @@ pub async fn setup_test_prom_app_with_frontend(
ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()),
Some(frontend_ref.clone()),
)
.with_logs_handler(instance.instance.clone())
.with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())

View File

@@ -22,6 +22,7 @@ use axum::http::{HeaderName, HeaderValue, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use log_query::{ColumnFilters, Context, Limit, LogQuery, TimeFilter};
use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter};
use loki_api::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
@@ -40,6 +41,7 @@ use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Respon
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use table::table_name::TableName;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -97,6 +99,7 @@ macro_rules! http_tests {
test_otlp_traces,
test_otlp_logs,
test_loki_logs,
test_log_query,
);
)*
};
@@ -1882,6 +1885,68 @@ pub async fn test_loki_logs(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_log_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_log_query").await;
let client = TestClient::new(app);
// prepare data with SQL API
let res = client
.get("/v1/sql?sql=create table logs (`ts` timestamp time index, message string);")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let res = client
.post("/v1/sql?sql=insert into logs values ('2024-11-07 10:53:50', 'hello');")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
// test log query
let log_query = LogQuery {
table: TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "logs".to_string(),
},
time_filter: TimeFilter {
start: Some("2024-11-07".to_string()),
end: None,
span: None,
},
limit: Limit {
skip: None,
fetch: Some(1),
},
columns: vec![
ColumnFilters {
column_name: "ts".to_string(),
filters: vec![],
},
ColumnFilters {
column_name: "message".to_string(),
filters: vec![],
},
],
context: Context::None,
};
let res = client
.post("/v1/logs")
.header("Content-Type", "application/json")
.body(serde_json::to_string(&log_query).unwrap())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let resp = res.text().await;
let v = get_rows_from_output(&resp);
assert_eq!(v, "[[1730976830000,\"hello\"]]");
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())