diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a231137b2d..bea4737fe7 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -563,6 +563,7 @@ impl PromHandler for Instance { let stmt = QueryLanguageParser::parse_promql(query).with_context(|_| ParsePromQLSnafu { query: query.clone(), })?; + self.statement_executor .execute_stmt(stmt, query_ctx) .await diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index f54076d25c..daae98feaf 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -42,7 +42,7 @@ use schemars::JsonSchema; use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; use session::context::{QueryContext, QueryContextRef}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, Location, OptionExt, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; use tower::ServiceBuilder; @@ -97,6 +97,7 @@ impl PromServer { .route("/query", routing::post(instant_query).get(instant_query)) .route("/query_range", routing::post(range_query).get(range_query)) .route("/labels", routing::post(labels_query).get(labels_query)) + .route("/series", routing::post(series_query).get(series_query)) .route( "/label/:label_name/values", routing::get(label_values_query), @@ -191,6 +192,7 @@ pub struct PromData { pub enum PromResponse { PromData(PromData), Labels(Vec), + Series(Vec>), LabelValues(Vec), } @@ -593,6 +595,30 @@ pub async fn labels_query( PromJsonResponse::success(PromResponse::Labels(sorted_labels)) } +async fn retrieve_series_from_query_result( + result: Result, + series: &mut Vec>, + table_name: &str, +) -> Result<()> { + match result? { + Output::RecordBatches(batches) => { + record_batches_to_series(batches, series, table_name)?; + Ok(()) + } + Output::Stream(stream) => { + let batches = RecordBatches::try_collect(stream) + .await + .context(CollectRecordbatchSnafu)?; + record_batches_to_series(batches, series, table_name)?; + Ok(()) + } + Output::AffectedRows(_) => Err(Error::UnexpectedResult { + reason: "expected data result, but got affected rows".to_string(), + location: Location::default(), + }), + } +} + /// Retrieve labels name from query result async fn retrieve_labels_name_from_query_result( result: Result, @@ -617,6 +643,28 @@ async fn retrieve_labels_name_from_query_result( } } +fn record_batches_to_series( + batches: RecordBatches, + series: &mut Vec>, + table_name: &str, +) -> Result<()> { + for batch in batches.iter() { + for row in batch.rows() { + let mut element: HashMap = row + .iter() + .enumerate() + .map(|(idx, column)| { + let column_name = batch.schema.column_name_by_index(idx); + (column_name.to_string(), column.to_string()) + }) + .collect(); + element.insert("__name__".to_string(), table_name.to_string()); + series.push(element); + } + } + Ok(()) +} + /// Retrieve labels name from record batches fn record_batches_to_labels_name( batches: RecordBatches, @@ -803,14 +851,12 @@ async fn retrieve_label_values_from_record_batch( ConcreteDataType::String(_) => {} _ => return Ok(()), } - for batch in batches.iter() { let label_column = batch .column(label_col_idx) .as_any() .downcast_ref::() .unwrap(); - for row_index in 0..batch.num_rows() { if let Some(label_value) = label_column.get_data(row_index) { labels_values.insert(label_value.to_string()); @@ -820,3 +866,57 @@ async fn retrieve_label_values_from_record_batch( Ok(()) } + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct SeriesQuery { + start: Option, + end: Option, + #[serde(flatten)] + matches: Matches, + db: Option, +} + +#[axum_macros::debug_handler] +pub async fn series_query( + State(handler): State, + Query(params): Query, + Form(form_params): Form, +) -> Json { + let mut queries: Vec = params.matches.0; + if queries.is_empty() { + queries = form_params.matches.0; + } + if queries.is_empty() { + return PromJsonResponse::error("Unsupported", "match[] parameter is required"); + } + let start = params + .start + .or(form_params.start) + .unwrap_or_else(yesterday_rfc3339); + let end = params + .end + .or(form_params.end) + .unwrap_or_else(current_time_rfc3339); + + let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); + let query_ctx = Arc::new(QueryContext::with(catalog, schema)); + + let mut series = Vec::new(); + for query in queries { + let table_name = query.clone(); + let prom_query = PromQuery { + query, + start: start.clone(), + end: end.clone(), + // TODO: find a better value for step + step: DEFAULT_LOOKBACK_STRING.to_string(), + }; + let result = handler.do_query(&prom_query, query_ctx.clone()).await; + if let Err(err) = retrieve_series_from_query_result(result, &mut series, &table_name).await + { + return PromJsonResponse::error(err.status_code().to_string(), err.to_string()); + } + } + PromJsonResponse::success(PromResponse::Series(series)) +} diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index f4a7bc99a6..70c32b6d3e 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::env; use std::net::SocketAddr; use std::sync::Arc; @@ -35,7 +36,11 @@ use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::Instance; use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; +use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, RawSchema}; +use datatypes::vectors::{ + Float64VectorBuilder, MutableVector, StringVectorBuilder, TimestampMillisecondVectorBuilder, +}; use frontend::instance::Instance as FeInstance; use frontend::service_config::{MysqlOptions, PostgresOptions}; use object_store::services::{Azblob, Oss, S3}; @@ -54,7 +59,7 @@ use servers::server::Server; use servers::Mode; use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; -use table::requests::{CreateTableRequest, TableOptions}; +use table::requests::{CreateTableRequest, InsertRequest, TableOptions}; #[derive(Debug, Eq, PartialEq)] pub enum StorageType { @@ -271,6 +276,7 @@ pub async fn create_test_table( catalog_manager: &CatalogManagerRef, sql_handler: &SqlHandler, ts_type: ConcreteDataType, + table_name: &str, ) -> Result<()> { let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), @@ -278,8 +284,6 @@ pub async fn create_test_table( ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("ts", ts_type, true).with_time_index(true), ]; - - let table_name = "demo"; let table_engine: TableEngineRef = sql_handler .table_engine_manager() .engine(MITO_ENGINE) @@ -327,6 +331,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router instance.catalog_manager(), instance.sql_handler(), ConcreteDataType::timestamp_millisecond_datatype(), + "demo", ) .await .unwrap(); @@ -363,6 +368,7 @@ pub async fn setup_test_http_app_with_frontend( frontend.catalog_manager(), instance.sql_handler(), ConcreteDataType::timestamp_millisecond_datatype(), + "demo", ) .await .unwrap(); @@ -382,25 +388,83 @@ pub async fn setup_test_http_app_with_frontend( (app, guard) } +fn mock_insert_request(host: &str, cpu: f64, memory: f64, ts: i64) -> InsertRequest { + let mut columns_values = HashMap::with_capacity(4); + let mut builder = StringVectorBuilder::with_capacity(1); + builder.push(Some(host)); + columns_values.insert("host".to_string(), builder.to_vector()); + + let mut builder = Float64VectorBuilder::with_capacity(1); + builder.push(Some(cpu)); + columns_values.insert("cpu".to_string(), builder.to_vector()); + + let mut builder = Float64VectorBuilder::with_capacity(1); + builder.push(Some(memory)); + columns_values.insert("memory".to_string(), builder.to_vector()); + + let mut builder = TimestampMillisecondVectorBuilder::with_capacity(1); + builder.push(Some(ts.into())); + columns_values.insert("ts".to_string(), builder.to_vector()); + + InsertRequest { + catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(), + schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(), + table_name: "demo".to_string(), + columns_values, + region_number: 0, + } +} + pub async fn setup_test_prom_app_with_frontend( store_type: StorageType, name: &str, ) -> (Router, TestGuard) { + std::env::set_var("TZ", "UTC"); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); let frontend = FeInstance::try_new_standalone(instance.clone()) .await .unwrap(); instance.start().await.unwrap(); + create_test_table( frontend.catalog_manager(), instance.sql_handler(), ConcreteDataType::timestamp_millisecond_datatype(), + "demo", ) .await .unwrap(); - let prom_server = PromServer::create_server(Arc::new(frontend) as _); - let app = prom_server.make_app(); + let demo = frontend + .catalog_manager() + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "demo") + .await + .unwrap() + .unwrap(); + + let _ = demo + .insert(mock_insert_request("host1", 1.1, 2.2, 0)) + .await + .unwrap(); + let _ = demo + .insert(mock_insert_request("host2", 2.1, 4.3, 600000)) + .await + .unwrap(); + + let http_opts = HttpOptions { + addr: format!("127.0.0.1:{}", ports::get_port()), + ..Default::default() + }; + let frontend_ref = Arc::new(frontend); + let http_server = HttpServerBuilder::new(http_opts) + .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) + .with_script_handler(frontend_ref.clone()) + .with_prom_handler(frontend_ref.clone()) + .build(); + let prom_server = PromServer::create_server(frontend_ref); + let app = http_server.build(http_server.make_app()); + let app = app.merge(prom_server.make_app()); (app, guard) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 2337ee3d59..ca98b12a7f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -18,7 +18,7 @@ use common_error::status_code::StatusCode as ErrorCode; use serde_json::json; use servers::http::handler::HealthResponse; use servers::http::{JsonOutput, JsonResponse}; -use servers::prom::PromJsonResponse; +use servers::prom::{PromJsonResponse, PromResponse}; use tests_integration::test_util::{ setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend, StorageType, @@ -315,7 +315,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // labels - let res = client.get("/api/v1/labels?match[]=up").send().await; + let res = client.get("/api/v1/labels?match[]=demo").send().await; assert_eq!(res.status(), StatusCode::OK); let res = client .post("/api/v1/labels?match[]=up") @@ -323,6 +323,19 @@ pub async fn test_prom_http_api(store_type: StorageType) { .send() .await; assert_eq!(res.status(), StatusCode::OK); + let res = client + .get("/api/v1/labels?match[]=demo&start=0") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["__name__", "cpu", "host", "memory", "ts"])) + .unwrap() + ); + // labels query with multiple match[] params let res = client .get("/api/v1/labels?match[]=up&match[]=down") @@ -336,6 +349,29 @@ pub async fn test_prom_http_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); + // series + let res = client + .get("/api/v1/series?match[]=demo&start=0&end=0") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!( + [{"__name__" : "demo","ts":"1970-01-01 00:00:00+0000","cpu":"1.1","host":"host1","memory":"2.2"}] + )) + .unwrap() + ); + + let res = client + .post("/api/v1/series?match[]=up&match[]=down") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + // label values // should return error if there is no match[] let res = client.get("/api/v1/label/instance/values").send().await; @@ -347,14 +383,16 @@ pub async fn test_prom_http_api(store_type: StorageType) { // single match[] let res = client - .get("/api/v1/label/instance/values?match[]=up") + .get("/api/v1/label/host/values?match[]=demo&start=0&end=600") .send() .await; assert_eq!(res.status(), StatusCode::OK); - let prom_resp = res.json::().await; - assert_eq!(prom_resp.status, "success"); - assert!(prom_resp.error.is_none()); - assert!(prom_resp.error_type.is_none()); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["host1", "host2"])).unwrap() + ); // multiple match[] let res = client