diff --git a/Cargo.lock b/Cargo.lock
index 98ac14afd2..ea055ec03d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11762,6 +11762,7 @@ dependencies = [
  "hex",
  "hyper-util",
  "itertools 0.10.5",
+ "log-query",
  "loki-api",
  "meta-client",
  "meta-srv",
diff --git a/src/servers/src/http/logs.rs b/src/servers/src/http/logs.rs
index 0375865b31..ffdfdb034b 100644
--- a/src/servers/src/http/logs.rs
+++ b/src/servers/src/http/logs.rs
@@ -38,7 +38,7 @@ pub async fn logs(
     query_ctx.set_channel(Channel::Http);
     let query_ctx = Arc::new(query_ctx);
 
-    let _timer = crate::metrics::METRIC_HTTP_LOGS_INGESTION_ELAPSED
+    let _timer = crate::metrics::METRIC_HTTP_LOGS_ELAPSED
         .with_label_values(&[db.as_str()])
         .start_timer();
 
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 9f70eafc46..e6b6a8e166 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -46,6 +46,7 @@ frontend = { workspace = true, features = ["testing"] }
 futures.workspace = true
 futures-util.workspace = true
 hyper-util = { workspace = true, features = ["tokio"] }
+log-query = { workspace = true }
 loki-api.workspace = true
 meta-client.workspace = true
 meta-srv = { workspace = true, features = ["mock"] }
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 7235833a3b..a326681e0f 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -393,6 +393,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
     };
     let http_server = HttpServerBuilder::new(http_opts)
         .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
+        .with_logs_handler(instance.instance.clone())
         .with_metrics_handler(MetricsHandler)
         .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
         .build();
@@ -425,6 +426,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
     http_server = http_server
         .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
         .with_log_ingest_handler(instance.instance.clone(), None, None)
+        .with_logs_handler(instance.instance.clone())
         .with_otlp_handler(instance.instance.clone())
         .with_greptime_config_options(instance.opts.to_toml().unwrap());
 
@@ -474,6 +476,7 @@ pub async fn setup_test_prom_app_with_frontend(
     let is_strict_mode = true;
     let http_server = HttpServerBuilder::new(http_opts)
         .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
+        .with_logs_handler(instance.instance.clone())
         .with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
         .with_prometheus_handler(frontend_ref)
         .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 76c17ac5cd..714f19a972 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -22,6 +22,7 @@ use axum::http::{HeaderName, HeaderValue, StatusCode};
 use common_error::status_code::StatusCode as ErrorCode;
 use flate2::write::GzEncoder;
 use flate2::Compression;
+use log_query::{Context, Limit, LogQuery, TimeFilter};
 use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter};
 use loki_api::prost_types::Timestamp;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
@@ -39,6 +40,7 @@ use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Respon
 use servers::http::test_helpers::{TestClient, TestResponse};
 use servers::http::GreptimeQueryOutput;
 use servers::prom_store;
+use table::table_name::TableName;
 use tests_integration::test_util::{
     setup_test_http_app, setup_test_http_app_with_frontend,
     setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -99,6 +101,7 @@ macro_rules! http_tests {
                 test_loki_json_logs,
                 test_elasticsearch_logs,
                 test_elasticsearch_logs_with_index,
+                test_log_query,
             );
         )*
     };
@@ -2140,6 +2143,61 @@ pub async fn test_elasticsearch_logs_with_index(store_type: StorageType) {
     guard.remove_all().await;
 }
 
+pub async fn test_log_query(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_log_query").await;
+
+    let client = TestClient::new(app).await;
+
+    // prepare data with SQL API
+    let res = client
+        .get("/v1/sql?sql=create table logs (`ts` timestamp time index, message string);")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
+    let res = client
+        .post("/v1/sql?sql=insert into logs values ('2024-11-07 10:53:50', 'hello');")
+        .header("Content-Type", "application/x-www-form-urlencoded")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
+
+    // test log query
+    let log_query = LogQuery {
+        table: TableName {
+            catalog_name: "greptime".to_string(),
+            schema_name: "public".to_string(),
+            table_name: "logs".to_string(),
+        },
+        time_filter: TimeFilter {
+            start: Some("2024-11-07".to_string()),
+            end: None,
+            span: None,
+        },
+        limit: Limit {
+            skip: None,
+            fetch: Some(1),
+        },
+        columns: vec!["ts".to_string(), "message".to_string()],
+        filters: vec![],
+        context: Context::None,
+        exprs: vec![],
+    };
+    let res = client
+        .post("/v1/logs")
+        .header("Content-Type", "application/json")
+        .body(serde_json::to_string(&log_query).unwrap())
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
+    let resp = res.text().await;
+    let v = get_rows_from_output(&resp);
+    assert_eq!(v, "[[1730976830000,\"hello\"]]");
+
+    guard.remove_all().await;
+}
+
 async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
     let res = client
         .get(format!("/v1/sql?sql={sql}").as_str())