feat: support elasticsearch _bulk API to ingest logs (#5261)
* feat: support elasticsearch '_bulk' API to ingest logs
* refactor: code review
* refactor: add metrics

Signed-off-by: zyy17 <zyylsxm@gmail.com>
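For orientation, here is a minimal client-side sketch of what this feature enables (not part of the commit): it posts an Elasticsearch-style NDJSON `_bulk` payload to the new endpoint and prints the response. The endpoint path and `table` query parameter follow the integration test added below; the host/port (localhost:4000), the target table name, and the use of the reqwest and tokio crates are assumptions made for illustration only.

// Hedged example: POST an NDJSON bulk payload to the new `/v1/elasticsearch/_bulk` endpoint.
// Assumes a GreptimeDB HTTP server on localhost:4000 and a table name of your choosing.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Each document is a pair of lines: a `create`/`index` command line, then the document itself.
    let body = r#"
{"create":{"_index":"test","_id":"1"}}
{"foo":"foo_value1","bar":"value1"}
{"create":{"_index":"test","_id":"2"}}
{"foo":"foo_value2","bar":"value2"}
"#;

    let resp = Client::new()
        .post("http://localhost:4000/v1/elasticsearch/_bulk?table=test_logs")
        .header("content-type", "application/json")
        .body(body)
        .send()
        .await?;

    // On success the server replies with an Elasticsearch-style bulk response:
    // {"took": ..., "errors": false, "items": [{"create": {"status": 201}}, ...]}
    println!("{}", resp.text().await?);
    Ok(())
}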
Cargo.lock (generated): 4 changed lines
@@ -3997,9 +3997,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"

 [[package]]
 name = "foldhash"
-version = "0.1.3"
+version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
+checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f"

 [[package]]
 name = "form_urlencoded"
src/servers/src/elasticsearch.rs (new file): 396 lines
@@ -0,0 +1,396 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use axum::extract::{Query, State};
use axum::headers::ContentType;
use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use axum::response::IntoResponse;
use axum::{Extension, TypedHeader};
use common_error::ext::ErrorExt;
use common_telemetry::{debug, error};
use once_cell::sync::Lazy;
use serde_json::{json, Deserializer, Value};
use session::context::{Channel, QueryContext};
use snafu::{ensure, ResultExt};

use crate::error::{
    status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu,
    Result as ServersResult,
};
use crate::http::event::{
    ingest_logs_inner, LogIngesterQueryParams, LogState, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use crate::metrics::{
    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
};

// The headers for every response of the Elasticsearch API.
static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
    HeaderMap::from_iter([
        (
            axum::http::header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        ),
        (
            HeaderName::from_static("x-elastic-product"),
            HeaderValue::from_static("Elasticsearch"),
        ),
    ])
});

// The fake version of Elasticsearch, used for the `_version` API.
const ELASTICSEARCH_VERSION: &str = "8.16.0";

// Return a fake response for the Elasticsearch ping request.
#[axum_macros::debug_handler]
pub async fn handle_get_version() -> impl IntoResponse {
    let body = serde_json::json!({
        "version": {
            "number": ELASTICSEARCH_VERSION
        }
    });
    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}

// Return a fake response for the Elasticsearch license request.
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-license.html.
#[axum_macros::debug_handler]
pub async fn handle_get_license() -> impl IntoResponse {
    let body = serde_json::json!({
        "license": {
            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
            "type": "oss",
            "status": "active",
            "expiry_date_in_millis": 4891198687000_i64,
        }
    });
    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}

// Process `_bulk` API requests. Only creating logs is supported.
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api(
    State(log_state): State<LogState>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(_content_type): TypedHeader<ContentType>,
    payload: String,
) -> impl IntoResponse {
    let start = Instant::now();
    debug!(
        "Received bulk request, params: {:?}, payload: {:?}",
        params, payload
    );

    // The `schema` is already set in the query_ctx during the auth process.
    query_ctx.set_channel(Channel::Elasticsearch);

    let db = params.db.unwrap_or_else(|| "public".to_string());

    // Record the ingestion time histogram.
    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
        .with_label_values(&[&db])
        .start_timer();

    let table = if let Some(table) = params.table {
        table
    } else {
        return (
            StatusCode::BAD_REQUEST,
            elasticsearch_headers(),
            axum::Json(write_bulk_response(
                start.elapsed().as_millis() as i64,
                0,
                StatusCode::BAD_REQUEST.as_u16() as u32,
                "require parameter 'table'",
            )),
        );
    };

    // If pipeline_name is not provided, use the internal pipeline.
    let pipeline = if let Some(pipeline) = params.pipeline_name {
        pipeline
    } else {
        GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
    };

    // Read the ndjson payload and convert it to a vector of Value.
    let log_values = match convert_es_input_to_log_values(&payload, &params.msg_field) {
        Ok(log_values) => log_values,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                elasticsearch_headers(),
                axum::Json(write_bulk_response(
                    start.elapsed().as_millis() as i64,
                    0,
                    StatusCode::BAD_REQUEST.as_u16() as u32,
                    e.to_string().as_str(),
                )),
            );
        }
    };
    let log_num = log_values.len();

    if let Err(e) = ingest_logs_inner(
        log_state.log_handler,
        pipeline,
        None,
        table,
        log_values,
        Arc::new(query_ctx),
    )
    .await
    {
        error!(e; "Failed to ingest logs");
        return (
            status_code_to_http_status(&e.status_code()),
            elasticsearch_headers(),
            axum::Json(write_bulk_response(
                start.elapsed().as_millis() as i64,
                0,
                e.status_code() as u32,
                e.to_string().as_str(),
            )),
        );
    }

    // Record the number of documents ingested.
    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
        .with_label_values(&[&db])
        .inc_by(log_num as u64);

    (
        StatusCode::OK,
        elasticsearch_headers(),
        axum::Json(write_bulk_response(
            start.elapsed().as_millis() as i64,
            log_num,
            StatusCode::CREATED.as_u16() as u32,
            "",
        )),
    )
}

// When a `_bulk` request is written to GreptimeDB successfully, it will generate the following response:
// {
//     "took": 1000,
//     "errors": false,
//     "items": [
//         { "create": { "status": 201 } },
//         { "create": { "status": 201 } },
//         ...
//     ]
// }
// If the status code is not 201, it will generate the following response:
// {
//     "took": 1000,
//     "errors": true,
//     "items": [
//         { "create": { "status": 400, "error": { "type": "illegal_argument_exception", "reason": "<error_reason>" } } }
//     ]
// }
fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
    if error_reason.is_empty() {
        let items: Vec<Value> = (0..n)
            .map(|_| {
                json!({
                    "create": {
                        "status": status_code
                    }
                })
            })
            .collect();
        json!({
            "took": took_ms,
            "errors": false,
            "items": items,
        })
    } else {
        json!({
            "took": took_ms,
            "errors": true,
            "items": [
                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
            ]
        })
    }
}

/// Returns the headers for every response of the Elasticsearch API.
pub fn elasticsearch_headers() -> HeaderMap {
    ELASTICSEARCH_HEADERS.clone()
}

// The input is an Elasticsearch bulk request in NDJSON format.
// For example, the input looks like this:
// { "index" : { "_index" : "test", "_id" : "1" } }
// { "field1" : "value1" }
// { "index" : { "_index" : "test", "_id" : "2" } }
// { "field2" : "value2" }
fn convert_es_input_to_log_values(
    input: &str,
    msg_field: &Option<String>,
) -> ServersResult<Vec<Value>> {
    // Read the ndjson payload and convert it to `Vec<Value>`. Return an error if the input is not valid JSON.
    let values: Vec<Value> = Deserializer::from_str(input)
        .into_iter::<Value>()
        .collect::<Result<_, _>>()
        .context(ParseJsonSnafu)?;

    // Check if the input is empty.
    ensure!(
        !values.is_empty(),
        InvalidElasticsearchInputSnafu {
            reason: "empty bulk request".to_string(),
        }
    );

    let mut log_values: Vec<Value> = Vec::with_capacity(values.len() / 2);

    // For the Elasticsearch `_bulk` API, each chunk contains two objects:
    //   1. The first object is the command; it should be `create` or `index`. `create` is used for insert, `index` is used for upsert.
    //   2. The second object is the document data.
    let mut is_document = false;
    for v in values {
        if !is_document {
            // Read the first object to get the command; it should be `create` or `index`.
            ensure!(
                v.get("create").is_some() || v.get("index").is_some(),
                InvalidElasticsearchInputSnafu {
                    reason: format!(
                        "invalid bulk request, expected 'create' or 'index' but got {:?}",
                        v
                    ),
                }
            );
            is_document = true;
            continue;
        }

        // The second object is the document data.
        if is_document {
            // If msg_field is provided, fetch the value of that field from the document data.
            let log_value = if let Some(msg_field) = msg_field {
                get_log_value_from_msg_field(v, msg_field)
            } else {
                v
            };

            log_values.push(log_value);

            // Reset the flag for the next chunk.
            is_document = false;
        }
    }

    debug!("Received log data: {:?}", log_values);

    Ok(log_values)
}

fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
    if let Some(message) = v.get_mut(msg_field) {
        let message = message.take();
        match message {
            Value::String(s) => match serde_json::from_str::<Value>(&s) {
                Ok(s) => s,
                // If the message is not valid JSON, just use the original message as the log value.
                Err(_) => Value::String(s),
            },
            // If the message is not a string, just use the original message as the log value.
            _ => message,
        }
    } else {
        // If msg_field is not found, just use the whole original document as the log value.
        v
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_convert_es_input_to_log_values() {
        let test_cases = vec![
            // Normal case.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                Ok(vec![
                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
                ]),
            ),
            // Specify the `data` field as the message field; its value is a JSON string.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                Some("data".to_string()),
                Ok(vec![
                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
                ]),
            ),
            // Simulate the log data from Logstash.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                Some("message".to_string()),
                Ok(vec![
                    json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""),
                    json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""),
                ]),
            ),
            // With an invalid bulk request.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "it's a invalid bulk request".to_string(),
                }),
            ),
        ];

        for (input, msg_field, expected) in test_cases {
            let log_values = convert_es_input_to_log_values(input, &msg_field);
            if expected.is_ok() {
                assert_eq!(log_values.unwrap(), expected.unwrap());
            } else {
                assert!(log_values.is_err());
            }
        }
    }
}
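As an aside, the command/document pairing that `convert_es_input_to_log_values` parses can be exercised standalone. The following sketch is illustrative only, not code from this commit: it uses serde_json's streaming deserializer on the same NDJSON shape and keeps only the document lines, omitting the `msg_field` handling and the server's error types.

// Standalone sketch of the NDJSON bulk parsing idea, using only serde_json.
use serde_json::{Deserializer, Value};

fn main() {
    let input = r#"
        {"create":{"_index":"test","_id":"1"}}
        {"foo1":"foo1_value"}
        {"index":{"_index":"test","_id":"2"}}
        {"foo2":"foo2_value"}
    "#;

    // Stream-deserialize the NDJSON payload into a list of JSON values.
    let values: Vec<Value> = Deserializer::from_str(input)
        .into_iter::<Value>()
        .collect::<Result<_, _>>()
        .expect("invalid JSON");

    // Keep only the documents: every `create`/`index` command line is followed by one document line.
    let docs: Vec<&Value> = values
        .chunks(2)
        .filter(|pair| pair[0].get("create").is_some() || pair[0].get("index").is_some())
        .filter_map(|pair| pair.get(1))
        .collect();

    println!("{docs:?}");
}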
@@ -586,6 +586,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Invalid elasticsearch input, reason: {}", reason))]
+    InvalidElasticsearchInput {
+        reason: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;

@@ -655,7 +662,8 @@ impl ErrorExt for Error {
             | UnsupportedJsonDataTypeForTag { .. }
             | InvalidTableName { .. }
             | PrepareStatementNotFound { .. }
-            | FailedToParseQuery { .. } => StatusCode::InvalidArguments,
+            | FailedToParseQuery { .. }
+            | InvalidElasticsearchInput { .. } => StatusCode::InvalidArguments,

             Catalog { source, .. } => source.status_code(),
             RowWriter { source, .. } => source.status_code(),
@@ -22,6 +22,7 @@ use async_trait::async_trait;
 use auth::UserProviderRef;
+use axum::error_handling::HandleErrorLayer;
 use axum::extract::DefaultBodyLimit;
 use axum::http::StatusCode as HttpStatusCode;
 use axum::response::{IntoResponse, Json, Response};
 use axum::{middleware, routing, BoxError, Router};
 use common_base::readable_size::ReadableSize;

@@ -48,6 +49,7 @@ use tower_http::trace::TraceLayer;
 use self::authorize::AuthState;
 use self::result::table_result::TableResponse;
 use crate::configurator::ConfiguratorRef;
+use crate::elasticsearch;
 use crate::error::{AddressBindSnafu, AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
 use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
 use crate::http::prometheus::{

@@ -597,7 +599,19 @@ impl HttpServerBuilder {

         let router = router.nest(
             &format!("/{HTTP_API_VERSION}/loki"),
-            HttpServer::route_loki(log_state),
+            HttpServer::route_loki(log_state.clone()),
         );

+        let router = router.nest(
+            &format!("/{HTTP_API_VERSION}/elasticsearch"),
+            HttpServer::route_elasticsearch(log_state.clone()),
+        );
+
+        let router = router.nest(
+            &format!("/{HTTP_API_VERSION}/elasticsearch/"),
+            Router::new()
+                .route("/", routing::get(elasticsearch::handle_get_version))
+                .with_state(log_state),
+        );
+
         Self { router, ..self }

@@ -743,6 +757,82 @@ impl HttpServer {
             .with_state(log_state)
     }

+    fn route_elasticsearch<S>(log_state: LogState) -> Router<S> {
+        Router::new()
+            // Return a fake response for the HEAD '/' request.
+            .route(
+                "/",
+                routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
+            )
+            // Return a fake response for the Elasticsearch version request.
+            .route("/", routing::get(elasticsearch::handle_get_version))
+            // Return a fake response for the Elasticsearch license request.
+            .route("/_license", routing::get(elasticsearch::handle_get_license))
+            .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
+            // Return a fake response for the Elasticsearch ilm request.
+            .route(
+                "/_ilm/policy/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for the Elasticsearch index template request.
+            .route(
+                "/_index_template/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for the Elasticsearch ingest pipeline request.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/put-pipeline-api.html.
+            .route(
+                "/_ingest/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for the Elasticsearch nodes discovery request.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/cluster.html.
+            .route(
+                "/_nodes/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return fake responses for Logstash API requests.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/logstash-apis.html
+            .route(
+                "/logstash/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            .route(
+                "/_logstash/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            .layer(
+                ServiceBuilder::new()
+                    .layer(HandleErrorLayer::new(handle_error))
+                    .layer(RequestDecompressionLayer::new()),
+            )
+            .with_state(log_state)
+    }
+
     fn route_log<S>(log_state: LogState) -> Router<S> {
         Router::new()
             .route("/logs", routing::post(event::log_ingester))
@@ -53,8 +53,8 @@ use crate::metrics::{
 };
 use crate::query_handler::PipelineHandlerRef;

+pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
 const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
-const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";

 lazy_static! {
     pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();

@@ -64,15 +64,24 @@ lazy_static! {
     ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
 }

+/// LogIngesterQueryParams is used for query params of log ingester API.
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct LogIngesterQueryParams {
-    pub table: Option<String>,
+    /// The database where log data will be written to.
     pub db: Option<String>,
+    /// The table where log data will be written to.
+    pub table: Option<String>,
+    /// The pipeline that will be used for log ingestion.
     pub pipeline_name: Option<String>,
-    pub ignore_errors: Option<bool>,
-
+    /// The version of the pipeline to be used for log ingestion.
     pub version: Option<String>,
+    /// Whether to ignore errors during log ingestion.
+    pub ignore_errors: Option<bool>,
+    /// The source of the log data.
     pub source: Option<String>,
+    /// The JSON field name of the log message. If not provided, it will take the whole log as the message.
+    /// The field must be at the top level of the JSON structure.
+    pub msg_field: Option<String>,
 }

 pub struct PipelineContent(String);

@@ -530,7 +539,7 @@ fn extract_pipeline_value_by_content_type(
     })
 }

-async fn ingest_logs_inner(
+pub(crate) async fn ingest_logs_inner(
     state: PipelineHandlerRef,
     pipeline_name: String,
     version: PipelineVersion,
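To make the query-parameter mapping above concrete, here is a small sketch (not part of this commit) of how a query string such as `?db=public&table=logs&msg_field=message` deserializes into these parameters. It uses serde_urlencoded directly on a reduced, hypothetical copy of the struct; the server gets the same effect through axum's `Query` extractor.

use serde::Deserialize;

// Reduced, hypothetical copy of LogIngesterQueryParams, for illustration only.
#[derive(Debug, Default, Deserialize)]
struct BulkQueryParams {
    db: Option<String>,
    table: Option<String>,
    pipeline_name: Option<String>,
    msg_field: Option<String>,
}

fn main() {
    // Fields that are absent from the query string simply stay `None`.
    let params: BulkQueryParams =
        serde_urlencoded::from_str("db=public&table=logs&msg_field=message").unwrap();
    println!("{params:?}");
    assert_eq!(params.table.as_deref(), Some("logs"));
    assert_eq!(params.pipeline_name, None);
}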
@@ -23,6 +23,7 @@ use datatypes::schema::Schema;

 pub mod addrs;
 pub mod configurator;
+pub(crate) mod elasticsearch;
 pub mod error;
 pub mod export_metrics;
 pub mod grpc;
@@ -182,6 +182,20 @@ lazy_static! {
             &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
         )
         .unwrap();
+    pub static ref METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED: HistogramVec =
+        register_histogram_vec!(
+            "greptime_servers_elasticsearch_logs_ingestion_elapsed",
+            "servers elasticsearch logs ingestion elapsed",
+            &[METRIC_DB_LABEL]
+        )
+        .unwrap();
+    pub static ref METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT: IntCounterVec = register_int_counter_vec!(
+        "greptime_servers_elasticsearch_logs_docs_count",
+        "servers elasticsearch logs docs count",
+        &[METRIC_DB_LABEL]
+    )
+    .unwrap();
+
     pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
         register_histogram_vec!(
            "greptime_servers_http_logs_transform_elapsed",
@@ -398,6 +398,7 @@ pub enum Channel {
     Influx = 7,
     Opentsdb = 8,
     Loki = 9,
+    Elasticsearch = 10,
 }

 impl From<u32> for Channel {

@@ -412,7 +413,7 @@ impl From<u32> for Channel {
             7 => Self::Influx,
             8 => Self::Opentsdb,
             9 => Self::Loki,
-
+            10 => Self::Elasticsearch,
             _ => Self::Unknown,
         }
     }

@@ -440,6 +441,7 @@ impl Display for Channel {
             Channel::Influx => write!(f, "influx"),
             Channel::Opentsdb => write!(f, "opentsdb"),
             Channel::Loki => write!(f, "loki"),
+            Channel::Elasticsearch => write!(f, "elasticsearch"),
             Channel::Unknown => write!(f, "unknown"),
         }
     }
@@ -97,6 +97,7 @@ macro_rules! http_tests {
                 test_otlp_logs,
                 test_loki_pb_logs,
                 test_loki_json_logs,
+                test_elasticsearch_logs,
             );
         )*
     };

@@ -1969,6 +1970,47 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
     guard.remove_all().await;
 }

+pub async fn test_elasticsearch_logs(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_elasticsearch_logs").await;
+
+    let client = TestClient::new(app);
+
+    let body = r#"
+        {"create":{"_index":"test","_id":"1"}}
+        {"foo":"foo_value1", "bar":"value1"}
+        {"create":{"_index":"test","_id":"2"}}
+        {"foo":"foo_value2","bar":"value2"}
+    "#;
+
+    let res = send_req(
+        &client,
+        vec![(
+            HeaderName::from_static("content-type"),
+            HeaderValue::from_static("application/json"),
+        )],
+        "/v1/elasticsearch/_bulk?table=elasticsearch_logs_test",
+        body.as_bytes().to_vec(),
+        false,
+    )
+    .await;
+
+    assert_eq!(StatusCode::OK, res.status());
+
+    let expected = "[[\"foo_value2\",\"value2\"],[\"foo_value1\",\"value1\"]]";
+
+    validate_data(
+        "test_elasticsearch_logs",
+        &client,
+        "select foo, bar from elasticsearch_logs_test;",
+        expected,
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
     let res = client
         .get(format!("/v1/sql?sql={sql}").as_str())

@@ -1978,7 +2020,10 @@ async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected
     let resp = res.text().await;
     let v = get_rows_from_output(&resp);

-    assert_eq!(v, expected, "validate {test_name} fail");
+    assert_eq!(
+        v, expected,
+        "validate {test_name} fail, expected: {expected}, actual: {v}"
+    );
 }

 async fn send_req(