diff --git a/Cargo.lock b/Cargo.lock
index 516d1553c5..f4f2b083dc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3997,9 +3997,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
 [[package]]
 name = "foldhash"
-version = "0.1.3"
+version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
+checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f"
 
 [[package]]
 name = "form_urlencoded"
diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs
new file mode 100644
index 0000000000..30fb7af0db
--- /dev/null
+++ b/src/servers/src/elasticsearch.rs
@@ -0,0 +1,396 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Instant;
+
+use axum::extract::{Query, State};
+use axum::headers::ContentType;
+use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
+use axum::response::IntoResponse;
+use axum::{Extension, TypedHeader};
+use common_error::ext::ErrorExt;
+use common_telemetry::{debug, error};
+use once_cell::sync::Lazy;
+use serde_json::{json, Deserializer, Value};
+use session::context::{Channel, QueryContext};
+use snafu::{ensure, ResultExt};
+
+use crate::error::{
+    status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu,
+    Result as ServersResult,
+};
+use crate::http::event::{
+    ingest_logs_inner, LogIngesterQueryParams, LogState, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
+};
+use crate::metrics::{
+    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
+};
+
+// The headers for every response of the Elasticsearch API.
+static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
+    HeaderMap::from_iter([
+        (
+            axum::http::header::CONTENT_TYPE,
+            HeaderValue::from_static("application/json"),
+        ),
+        (
+            HeaderName::from_static("x-elastic-product"),
+            HeaderValue::from_static("Elasticsearch"),
+        ),
+    ])
+});
+
+// The fake version of Elasticsearch, used for the version API.
+const ELASTICSEARCH_VERSION: &str = "8.16.0";
+
+// Return a fake response for the Elasticsearch ping request.
+#[axum_macros::debug_handler]
+pub async fn handle_get_version() -> impl IntoResponse {
+    let body = serde_json::json!({
+        "version": {
+            "number": ELASTICSEARCH_VERSION
+        }
+    });
+    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
+}
+
+// Return a fake response for the Elasticsearch license request.
+// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-license.html.
+#[axum_macros::debug_handler]
+pub async fn handle_get_license() -> impl IntoResponse {
+    let body = serde_json::json!({
+        "license": {
+            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
+            "type": "oss",
+            "status": "active",
+            "expiry_date_in_millis": 4891198687000_i64,
+        }
+    });
+    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
+}
+
+// Process `_bulk` API requests. Only creating logs is supported.
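+// The payload is NDJSON: each command object is followed by a document object,
+// e.g. (the same shape as the bulk examples further below):
+//
+//   {"create":{"_index":"test","_id":"1"}}
+//   {"field1":"value1"}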
+// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
+#[axum_macros::debug_handler]
+pub async fn handle_bulk_api(
+    State(log_state): State<LogState>,
+    Query(params): Query<LogIngesterQueryParams>,
+    Extension(mut query_ctx): Extension<QueryContext>,
+    TypedHeader(_content_type): TypedHeader<ContentType>,
+    payload: String,
+) -> impl IntoResponse {
+    let start = Instant::now();
+    debug!(
+        "Received bulk request, params: {:?}, payload: {:?}",
+        params, payload
+    );
+
+    // The `schema` is already set in the query_ctx during the auth process.
+    query_ctx.set_channel(Channel::Elasticsearch);
+
+    let db = params.db.unwrap_or_else(|| "public".to_string());
+
+    // Record the ingestion time histogram.
+    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
+        .with_label_values(&[&db])
+        .start_timer();
+
+    let table = if let Some(table) = params.table {
+        table
+    } else {
+        return (
+            StatusCode::BAD_REQUEST,
+            elasticsearch_headers(),
+            axum::Json(write_bulk_response(
+                start.elapsed().as_millis() as i64,
+                0,
+                StatusCode::BAD_REQUEST.as_u16() as u32,
+                "require parameter 'table'",
+            )),
+        );
+    };
+
+    // If pipeline_name is not provided, use the internal identity pipeline.
+    let pipeline = if let Some(pipeline) = params.pipeline_name {
+        pipeline
+    } else {
+        GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
+    };
+
+    // Read the NDJSON payload and convert it to a vector of `Value`.
+    let log_values = match convert_es_input_to_log_values(&payload, &params.msg_field) {
+        Ok(log_values) => log_values,
+        Err(e) => {
+            return (
+                StatusCode::BAD_REQUEST,
+                elasticsearch_headers(),
+                axum::Json(write_bulk_response(
+                    start.elapsed().as_millis() as i64,
+                    0,
+                    StatusCode::BAD_REQUEST.as_u16() as u32,
+                    e.to_string().as_str(),
+                )),
+            );
+        }
+    };
+    let log_num = log_values.len();
+
+    if let Err(e) = ingest_logs_inner(
+        log_state.log_handler,
+        pipeline,
+        None,
+        table,
+        log_values,
+        Arc::new(query_ctx),
+    )
+    .await
+    {
+        error!(e; "Failed to ingest logs");
+        return (
+            status_code_to_http_status(&e.status_code()),
+            elasticsearch_headers(),
+            axum::Json(write_bulk_response(
+                start.elapsed().as_millis() as i64,
+                0,
+                e.status_code() as u32,
+                e.to_string().as_str(),
+            )),
+        );
+    }
+
+    // Record the number of ingested documents.
+    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
+        .with_label_values(&[&db])
+        .inc_by(log_num as u64);
+
+    (
+        StatusCode::OK,
+        elasticsearch_headers(),
+        axum::Json(write_bulk_response(
+            start.elapsed().as_millis() as i64,
+            log_num,
+            StatusCode::CREATED.as_u16() as u32,
+            "",
+        )),
+    )
+}
+
+// Generates the following response when the `_bulk` request is written to GreptimeDB successfully:
+// {
+//     "took": 1000,
+//     "errors": false,
+//     "items": [
+//         { "create": { "status": 201 } },
+//         { "create": { "status": 201 } },
+//         ...
+//     ]
+// }
+// If the status code is not 201, it generates the following response:
+// {
+//     "took": 1000,
+//     "errors": true,
+//     "items": [
+//         { "create": { "status": 400, "error": { "type": "illegal_argument_exception", "reason": "<error_reason>" } } }
+//     ]
+// }
+fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
+    if error_reason.is_empty() {
+        let items: Vec<Value> = (0..n)
+            .map(|_| {
+                json!({
+                    "create": {
+                        "status": status_code
+                    }
+                })
+            })
+            .collect();
+        json!({
+            "took": took_ms,
+            "errors": false,
+            "items": items,
+        })
+    } else {
+        json!({
+            "took": took_ms,
+            "errors": true,
+            "items": [
+                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
+            ]
+        })
+    }
+}
+
+/// Returns the headers for every response of the Elasticsearch API.
+pub fn elasticsearch_headers() -> HeaderMap {
+    ELASTICSEARCH_HEADERS.clone()
+}
+
+// The input is an Elasticsearch bulk request in NDJSON format, for example:
+// { "index" : { "_index" : "test", "_id" : "1" } }
+// { "field1" : "value1" }
+// { "index" : { "_index" : "test", "_id" : "2" } }
+// { "field2" : "value2" }
+fn convert_es_input_to_log_values(
+    input: &str,
+    msg_field: &Option<String>,
+) -> ServersResult<Vec<Value>> {
+    // Read the NDJSON payload and convert it to a `Vec<Value>`. Return an error if the input is not valid JSON.
+    let values: Vec<Value> = Deserializer::from_str(input)
+        .into_iter::<Value>()
+        .collect::<Result<_, _>>()
+        .context(ParseJsonSnafu)?;
+
+    // Check that the input is not empty.
+    ensure!(
+        !values.is_empty(),
+        InvalidElasticsearchInputSnafu {
+            reason: "empty bulk request".to_string(),
+        }
+    );
+
+    let mut log_values: Vec<Value> = Vec::with_capacity(values.len() / 2);
+
+    // For the Elasticsearch `_bulk` API, each chunk contains two objects:
+    // 1. The first object is the command; it should be `create` or `index`.
+    //    `create` is used for insert, `index` is used for upsert.
+    // 2. The second object is the document data.
+    let mut is_document = false;
+    for v in values {
+        if !is_document {
+            // Read the first object to get the command; it should be `create` or `index`.
+            ensure!(
+                v.get("create").is_some() || v.get("index").is_some(),
+                InvalidElasticsearchInputSnafu {
+                    reason: format!(
+                        "invalid bulk request, expected 'create' or 'index' but got {:?}",
+                        v
+                    ),
+                }
+            );
+            is_document = true;
+            continue;
+        }
+
+        // The second object of the chunk is the document data.
+        if is_document {
+            // If msg_field is provided, fetch the value of that field from the document data.
+            let log_value = if let Some(msg_field) = msg_field {
+                get_log_value_from_msg_field(v, msg_field)
+            } else {
+                v
+            };
+
+            log_values.push(log_value);
+
+            // Reset the flag for the next chunk.
+            is_document = false;
+        }
+    }
+
+    debug!("Received log data: {:?}", log_values);
+
+    Ok(log_values)
+}
+
+fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
+    if let Some(message) = v.get_mut(msg_field) {
+        let message = message.take();
+        match message {
+            Value::String(s) => match serde_json::from_str::<Value>(&s) {
+                Ok(s) => s,
+                // If the message is not valid JSON, use the original string as the log value.
+                Err(_) => Value::String(s),
+            },
+            // If the message is not a string, use the original value as the log value.
+            _ => message,
+        }
+    } else {
+        // If msg_field is not found, use the whole document as the log value.
+        v
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_convert_es_input_to_log_values() {
+        let test_cases = vec![
+            // Normal case.
+            (
+                r#"
+                {"create":{"_index":"test","_id":"1"}}
+                {"foo1":"foo1_value", "bar1":"bar1_value"}
+                {"create":{"_index":"test","_id":"2"}}
+                {"foo2":"foo2_value","bar2":"bar2_value"}
+                "#,
+                None,
+                Ok(vec![
+                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
+                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
+                ]),
+            ),
+            // Specify the `data` field as the message field; its value is a JSON string.
+            (
+                r#"
+                {"create":{"_index":"test","_id":"1"}}
+                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
+                {"create":{"_index":"test","_id":"2"}}
+                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
+                "#,
+                Some("data".to_string()),
+                Ok(vec![
+                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
+                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
+                ]),
+            ),
+            // Simulate log data from Logstash.
+            (
+                r#"
+                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
+                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
+                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
+                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
+                "#,
+                Some("message".to_string()),
+                Ok(vec![
+                    json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""),
+                    json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""),
+                ]),
+            ),
+            // With an invalid bulk request.
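+            // (The command object must be `create` or `index`; anything else is
+            // rejected by `convert_es_input_to_log_values`.)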
+            (
+                r#"
+                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
+                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
+                "#,
+                None,
+                Err(InvalidElasticsearchInputSnafu {
+                    reason: "it's an invalid bulk request".to_string(),
+                }),
+            ),
+        ];
+
+        for (input, msg_field, expected) in test_cases {
+            let log_values = convert_es_input_to_log_values(input, &msg_field);
+            if expected.is_ok() {
+                assert_eq!(log_values.unwrap(), expected.unwrap());
+            } else {
+                assert!(log_values.is_err());
+            }
+        }
+    }
+}
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 8cab4516f8..b8882b0b72 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -586,6 +586,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Invalid elasticsearch input, reason: {}", reason))]
+    InvalidElasticsearchInput {
+        reason: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -655,7 +662,8 @@ impl ErrorExt for Error {
             | UnsupportedJsonDataTypeForTag { .. }
             | InvalidTableName { .. }
             | PrepareStatementNotFound { .. }
-            | FailedToParseQuery { .. } => StatusCode::InvalidArguments,
+            | FailedToParseQuery { .. }
+            | InvalidElasticsearchInput { .. } => StatusCode::InvalidArguments,
 
             Catalog { source, .. } => source.status_code(),
             RowWriter { source, .. } => source.status_code(),
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 68caa2a4da..1c32466566 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -22,6 +22,7 @@ use async_trait::async_trait;
 use auth::UserProviderRef;
 use axum::error_handling::HandleErrorLayer;
 use axum::extract::DefaultBodyLimit;
+use axum::http::StatusCode as HttpStatusCode;
 use axum::response::{IntoResponse, Json, Response};
 use axum::{middleware, routing, BoxError, Router};
 use common_base::readable_size::ReadableSize;
@@ -48,6 +49,7 @@ use tower_http::trace::TraceLayer;
 use self::authorize::AuthState;
 use self::result::table_result::TableResponse;
 use crate::configurator::ConfiguratorRef;
+use crate::elasticsearch;
 use crate::error::{AddressBindSnafu, AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
 use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
 use crate::http::prometheus::{
@@ -597,7 +599,19 @@ impl HttpServerBuilder {
 
         let router = router.nest(
             &format!("/{HTTP_API_VERSION}/loki"),
-            HttpServer::route_loki(log_state),
+            HttpServer::route_loki(log_state.clone()),
+        );
+
+        let router = router.nest(
+            &format!("/{HTTP_API_VERSION}/elasticsearch"),
+            HttpServer::route_elasticsearch(log_state.clone()),
+        );
+
+        let router = router.nest(
+            &format!("/{HTTP_API_VERSION}/elasticsearch/"),
+            Router::new()
+                .route("/", routing::get(elasticsearch::handle_get_version))
+                .with_state(log_state),
         );
 
         Self { router, ..self }
@@ -743,6 +757,82 @@ impl HttpServer {
             .with_state(log_state)
     }
 
+    fn route_elasticsearch(log_state: LogState) -> Router {
+        Router::new()
+            // Return a fake response for HEAD '/' requests.
+            .route(
+                "/",
+                routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
+            )
+            // Return a fake response for Elasticsearch version requests.
+            .route("/", routing::get(elasticsearch::handle_get_version))
+            // Return a fake response for Elasticsearch license requests.
+            .route("/_license", routing::get(elasticsearch::handle_get_license))
+            .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
+            // Return a fake response for Elasticsearch ILM requests.
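+            // (Agents such as Logstash may try to manage ILM policies on startup;
+            // an empty 200 JSON response is enough to keep them going.)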
+            .route(
+                "/_ilm/policy/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Elasticsearch index template requests.
+            .route(
+                "/_index_template/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Elasticsearch ingest pipeline requests.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/put-pipeline-api.html.
+            .route(
+                "/_ingest/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Elasticsearch nodes discovery requests.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/cluster.html.
+            .route(
+                "/_nodes/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Logstash API requests.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/logstash-apis.html
+            .route(
+                "/logstash/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            .route(
+                "/_logstash/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            .layer(
+                ServiceBuilder::new()
+                    .layer(HandleErrorLayer::new(handle_error))
+                    .layer(RequestDecompressionLayer::new()),
+            )
+            .with_state(log_state)
+    }
+
     fn route_log(log_state: LogState) -> Router {
         Router::new()
             .route("/logs", routing::post(event::log_ingester))
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 951c796ac3..54421c9886 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -53,8 +53,8 @@ use crate::metrics::{
 };
 use crate::query_handler::PipelineHandlerRef;
 
+pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
 const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
-const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
 
 lazy_static! {
     pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
@@ -64,15 +64,24 @@ lazy_static! {
         ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
 }
 
+/// LogIngesterQueryParams holds the query params of the log ingester API.
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct LogIngesterQueryParams {
-    pub table: Option<String>,
+    /// The database where log data will be written to.
     pub db: Option<String>,
+    /// The table where log data will be written to.
+    pub table: Option<String>,
+    /// The pipeline that will be used for log ingestion.
     pub pipeline_name: Option<String>,
-    pub ignore_errors: Option<bool>,
-
+    /// The version of the pipeline to be used for log ingestion.
     pub version: Option<String>,
+    /// Whether to ignore errors during log ingestion.
+    pub ignore_errors: Option<bool>,
+    /// The source of the log data.
     pub source: Option<String>,
+    /// The JSON field name of the log message. If not provided, the whole log is taken as the message.
+    /// The field must be at the top level of the JSON structure.
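+    /// For example, with `msg_field=message` the document `{"message":"hello"}` is
+    /// ingested as the string `"hello"`; if the field value is itself a JSON string,
+    /// it is parsed and ingested as JSON.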
+    pub msg_field: Option<String>,
 }
 
 pub struct PipelineContent(String);
@@ -530,7 +539,7 @@ fn extract_pipeline_value_by_content_type(
     })
 }
 
-async fn ingest_logs_inner(
+pub(crate) async fn ingest_logs_inner(
     state: PipelineHandlerRef,
     pipeline_name: String,
     version: PipelineVersion,
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index 92f2b8b9d0..417d264651 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -23,6 +23,7 @@ use datatypes::schema::Schema;
 
 pub mod addrs;
 pub mod configurator;
+pub(crate) mod elasticsearch;
 pub mod error;
 pub mod export_metrics;
 pub mod grpc;
diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs
index fe81fed6ce..2a3930e698 100644
--- a/src/servers/src/metrics.rs
+++ b/src/servers/src/metrics.rs
@@ -182,6 +182,20 @@ lazy_static! {
         &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
     )
     .unwrap();
+    pub static ref METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED: HistogramVec =
+        register_histogram_vec!(
+            "greptime_servers_elasticsearch_logs_ingestion_elapsed",
+            "servers elasticsearch logs ingestion elapsed",
+            &[METRIC_DB_LABEL]
+        )
+        .unwrap();
+    pub static ref METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT: IntCounterVec = register_int_counter_vec!(
+        "greptime_servers_elasticsearch_logs_docs_count",
+        "servers elasticsearch logs docs count",
+        &[METRIC_DB_LABEL]
+    )
+    .unwrap();
+
     pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec = register_histogram_vec!(
         "greptime_servers_http_logs_transform_elapsed",
diff --git a/src/session/src/context.rs b/src/session/src/context.rs
index 1c621b3ab7..3544228afe 100644
--- a/src/session/src/context.rs
+++ b/src/session/src/context.rs
@@ -398,6 +398,7 @@ pub enum Channel {
     Influx = 7,
     Opentsdb = 8,
     Loki = 9,
+    Elasticsearch = 10,
 }
 
 impl From<u8> for Channel {
@@ -412,7 +413,7 @@ impl From<u8> for Channel {
             7 => Self::Influx,
             8 => Self::Opentsdb,
             9 => Self::Loki,
-
+            10 => Self::Elasticsearch,
             _ => Self::Unknown,
         }
     }
@@ -440,6 +441,7 @@ impl Display for Channel {
             Channel::Influx => write!(f, "influx"),
             Channel::Opentsdb => write!(f, "opentsdb"),
             Channel::Loki => write!(f, "loki"),
+            Channel::Elasticsearch => write!(f, "elasticsearch"),
             Channel::Unknown => write!(f, "unknown"),
         }
     }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index ed5dcde94f..93042f9c0d 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -97,6 +97,7 @@ macro_rules! http_tests {
                 test_otlp_logs,
                 test_loki_pb_logs,
                 test_loki_json_logs,
+                test_elasticsearch_logs,
             );
         )*
     };
@@ -1969,6 +1970,47 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
     guard.remove_all().await;
 }
 
+pub async fn test_elasticsearch_logs(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_elasticsearch_logs").await;
+
+    let client = TestClient::new(app);
+
+    let body = r#"
+    {"create":{"_index":"test","_id":"1"}}
+    {"foo":"foo_value1", "bar":"value1"}
+    {"create":{"_index":"test","_id":"2"}}
+    {"foo":"foo_value2","bar":"value2"}
+    "#;
+
+    let res = send_req(
+        &client,
+        vec![(
+            HeaderName::from_static("content-type"),
+            HeaderValue::from_static("application/json"),
+        )],
+        "/v1/elasticsearch/_bulk?table=elasticsearch_logs_test",
+        body.as_bytes().to_vec(),
+        false,
+    )
+    .await;
+
+    assert_eq!(StatusCode::OK, res.status());
+
+    let expected = "[[\"foo_value2\",\"value2\"],[\"foo_value1\",\"value1\"]]";
+
+    validate_data(
+        "test_elasticsearch_logs",
+        &client,
+        "select foo, bar from elasticsearch_logs_test;",
+        expected,
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
     let res = client
         .get(format!("/v1/sql?sql={sql}").as_str())
@@ -1978,7 +2020,10 @@ async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected
     let resp = res.text().await;
     let v = get_rows_from_output(&resp);
 
-    assert_eq!(v, expected, "validate {test_name} fail");
+    assert_eq!(
+        v, expected,
+        "validate {test_name} fail, expected: {expected}, actual: {v}"
+    );
 }
 
 async fn send_req(
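
A quick manual check of the new endpoint (a sketch: assumes a local GreptimeDB
instance with the HTTP server on its default address 127.0.0.1:4000; `logs` is an
arbitrary table name):

    curl -s -X POST 'http://127.0.0.1:4000/v1/elasticsearch/_bulk?table=logs' \
        -H 'Content-Type: application/json' \
        --data-binary $'{"create":{"_index":"logs","_id":"1"}}\n{"message":"hello"}\n'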