diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs
index fe4d1aa034..b21af27f10 100644
--- a/src/servers/src/http/handler.rs
+++ b/src/servers/src/http/handler.rs
@@ -251,6 +251,23 @@ pub struct PromqlQuery {
     pub step: String,
     pub lookback: Option<String>,
     pub db: Option<String>,
+    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
+    // `arrow`],
+    // the default value is `greptimedb_v1`
+    pub format: Option<String>,
+    // For arrow output
+    pub compression: Option<String>,
+    // Returns epoch timestamps with the specified precision.
+    // Both u and µ indicate microseconds.
+    // epoch = [ns,u,µ,ms,s],
+    //
+    // For influx output only
+    //
+    // TODO(jeremy): currently, only InfluxDB result format is supported,
+    // and all columns of the `Timestamp` type will be converted to their
+    // specified time precision. Maybe greptimedb format can support this
+    // param too.
+    pub epoch: Option<String>,
 }
 
 impl From<PromqlQuery> for PromQuery {
@@ -292,9 +309,30 @@ pub async fn promql(
         let resp = ErrorResponse::from_error_message(status, msg);
         HttpResponse::Error(resp)
     } else {
+        let format = params
+            .format
+            .as_ref()
+            .map(|s| s.to_lowercase())
+            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
+            .unwrap_or(ResponseFormat::GreptimedbV1);
+        let epoch = params
+            .epoch
+            .as_ref()
+            .map(|s| s.to_lowercase())
+            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
+        let compression = params.compression.clone();
+
         let prom_query = params.into();
         let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
-        GreptimedbV1Response::from_output(outputs).await
+
+        match format {
+            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
+            ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
+            ResponseFormat::Table => TableResponse::from_output(outputs).await,
+            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
+            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
+            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
+        }
     };
 
     resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 9e369e654d..2c663e99b9 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -483,7 +483,7 @@ pub async fn test_sql_api(store_type: StorageType) {
 }
 
 pub async fn test_prometheus_promql_api(store_type: StorageType) {
-    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await;
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "promql_api").await;
 
     let client = TestClient::new(app).await;
 
     let res = client
@@ -492,7 +492,18 @@
         .get("/v1/promql?query=1&start=0&end=100&step=5s")
         .send()
         .await;
     assert_eq!(res.status(), StatusCode::OK);
-    let _body = serde_json::from_str::<serde_json::Value>(&res.text().await).unwrap();
+    let json_text = res.text().await;
+    assert!(serde_json::from_str::<serde_json::Value>(&json_text).is_ok());
+
+    let res = client
+        .get("/v1/promql?query=1&start=0&end=100&step=5s&format=csv")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let csv_body = &res.text().await;
+    assert_eq!("0,1.0\n5000,1.0\n10000,1.0\n15000,1.0\n20000,1.0\n25000,1.0\n30000,1.0\n35000,1.0\n40000,1.0\n45000,1.0\n50000,1.0\n55000,1.0\n60000,1.0\n65000,1.0\n70000,1.0\n75000,1.0\n80000,1.0\n85000,1.0\n90000,1.0\n95000,1.0\n100000,1.0\n", csv_body);
+
     guard.remove_all().await;
 }