From ea0a347edcfe01f15a420b51ad673a161e209ad0 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 23 Jan 2025 19:48:37 +0800 Subject: [PATCH] fix(log-query): panic on prometheus (#5429) * fix(log-query): panic on prometheus Signed-off-by: Ruihang Xia * fix test environment setup Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/servers/src/http/logs.rs | 2 +- tests-integration/Cargo.toml | 1 + tests-integration/src/test_util.rs | 3 ++ tests-integration/tests/http.rs | 65 ++++++++++++++++++++++++++++++ 5 files changed, 71 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index d6099912f3..ddfd5ab23e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12373,6 +12373,7 @@ dependencies = [ "futures-util", "hex", "itertools 0.10.5", + "log-query", "loki-api", "meta-client", "meta-srv", diff --git a/src/servers/src/http/logs.rs b/src/servers/src/http/logs.rs index 0375865b31..ffdfdb034b 100644 --- a/src/servers/src/http/logs.rs +++ b/src/servers/src/http/logs.rs @@ -38,7 +38,7 @@ pub async fn logs( query_ctx.set_channel(Channel::Http); let query_ctx = Arc::new(query_ctx); - let _timer = crate::metrics::METRIC_HTTP_LOGS_INGESTION_ELAPSED + let _timer = crate::metrics::METRIC_HTTP_LOGS_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 3fd12f1494..aee072fa84 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -45,6 +45,7 @@ flow.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true futures-util.workspace = true +log-query = { workspace = true } loki-api = "0.1" meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 57ec7f6f86..8b0a5edc4b 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -394,6 +394,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()), None, ) + .with_logs_handler(instance.instance.clone()) .with_metrics_handler(MetricsHandler) .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) .build(); @@ -429,6 +430,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( Some(instance.instance.clone()), ) .with_log_ingest_handler(instance.instance.clone(), None, None) + .with_logs_handler(instance.instance.clone()) .with_otlp_handler(instance.instance.clone()) .with_greptime_config_options(instance.opts.to_toml().unwrap()); @@ -467,6 +469,7 @@ pub async fn setup_test_prom_app_with_frontend( ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()), Some(frontend_ref.clone()), ) + .with_logs_handler(instance.instance.clone()) .with_prom_handler(frontend_ref.clone(), true, is_strict_mode) .with_prometheus_handler(frontend_ref) .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f81cd7a4ad..c64536f87b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -22,6 +22,7 @@ use axum::http::{HeaderName, HeaderValue, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; +use log_query::{ColumnFilters, Context, Limit, LogQuery, TimeFilter}; use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter}; use loki_api::prost_types::Timestamp; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; @@ -40,6 +41,7 @@ use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Respon use servers::http::test_helpers::{TestClient, TestResponse}; use servers::http::GreptimeQueryOutput; use servers::prom_store; +use table::table_name::TableName; use tests_integration::test_util::{ setup_test_http_app, setup_test_http_app_with_frontend, setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend, @@ -97,6 +99,7 @@ macro_rules! http_tests { test_otlp_traces, test_otlp_logs, test_loki_logs, + test_log_query, ); )* }; @@ -1882,6 +1885,68 @@ pub async fn test_loki_logs(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_log_query(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_log_query").await; + + let client = TestClient::new(app); + + // prepare data with SQL API + let res = client + .get("/v1/sql?sql=create table logs (`ts` timestamp time index, message string);") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + let res = client + .post("/v1/sql?sql=insert into logs values ('2024-11-07 10:53:50', 'hello');") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + + // test log query + let log_query = LogQuery { + table: TableName { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: "logs".to_string(), + }, + time_filter: TimeFilter { + start: Some("2024-11-07".to_string()), + end: None, + span: None, + }, + limit: Limit { + skip: None, + fetch: Some(1), + }, + columns: vec![ + ColumnFilters { + column_name: "ts".to_string(), + filters: vec![], + }, + ColumnFilters { + column_name: "message".to_string(), + filters: vec![], + }, + ], + context: Context::None, + }; + let res = client + .post("/v1/logs") + .header("Content-Type", "application/json") + .body(serde_json::to_string(&log_query).unwrap()) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + let resp = res.text().await; + let v = get_rows_from_output(&resp); + assert_eq!(v, "[[1730976830000,\"hello\"]]"); + + guard.remove_all().await; +} + async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str())