refactor(elasticsearch): use _index as greptimedb table in log ingestion and add /${index}/_bulk API (#5335)
* refactor(elasticsearch): use `_index` as greptimedb table in log ingestion and add `/${index}/_bulk` API
Signed-off-by: zyy17 <zyylsxm@gmail.com>
* refactor: code review
---------
Signed-off-by: zyy17 <zyylsxm@gmail.com>
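
Example (a sketch grounded in the integration tests below, not part of the commit): the `_index` of each `create`/`index` command picks the GreptimeDB table, and a document whose command has no `_index` falls back to the index in the URL path. Posting the following NDJSON body to `/${index}/_bulk` writes the first document to table `test_index1` and the second to `test_index2`:

    POST /v1/elasticsearch/test_index2/_bulk
    Content-Type: application/json

    {"create":{"_index":"test_index1","_id":"1"}}
    {"foo":"foo_value1","bar":"value1"}
    {"create":{"_id":"2"}}
    {"foo":"foo_value2","bar":"value2"}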
@@ -15,7 +15,7 @@
 use std::sync::Arc;
 use std::time::Instant;
 
-use axum::extract::{Query, State};
+use axum::extract::{Path, Query, State};
 use axum::headers::ContentType;
 use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
 use axum::response::IntoResponse;
@@ -32,7 +32,8 @@ use crate::error::{
     Result as ServersResult,
 };
 use crate::http::event::{
-    ingest_logs_inner, LogIngesterQueryParams, LogState, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
+    ingest_logs_inner, LogIngestRequest, LogIngesterQueryParams, LogState,
+    GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
 };
 use crate::metrics::{
     METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
@@ -81,15 +82,39 @@ pub async fn handle_get_license() -> impl IntoResponse {
     (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
 }
 
-// Process `_bulk` API requests. Only support to create logs.
-// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
+/// Process `_bulk` API requests. Only support to create logs.
+/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
 #[axum_macros::debug_handler]
 pub async fn handle_bulk_api(
     State(log_state): State<LogState>,
     Query(params): Query<LogIngesterQueryParams>,
-    Extension(mut query_ctx): Extension<QueryContext>,
+    Extension(query_ctx): Extension<QueryContext>,
     TypedHeader(_content_type): TypedHeader<ContentType>,
     payload: String,
 ) -> impl IntoResponse {
+    do_handle_bulk_api(log_state, None, params, query_ctx, payload).await
+}
+
+/// Process `/${index}/_bulk` API requests. Only support to create logs.
+/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
+#[axum_macros::debug_handler]
+pub async fn handle_bulk_api_with_index(
+    State(log_state): State<LogState>,
+    Path(index): Path<String>,
+    Query(params): Query<LogIngesterQueryParams>,
+    Extension(query_ctx): Extension<QueryContext>,
+    TypedHeader(_content_type): TypedHeader<ContentType>,
+    payload: String,
+) -> impl IntoResponse {
+    do_handle_bulk_api(log_state, Some(index), params, query_ctx, payload).await
+}
+
+async fn do_handle_bulk_api(
+    log_state: LogState,
+    index: Option<String>,
+    params: LogIngesterQueryParams,
+    mut query_ctx: QueryContext,
+    payload: String,
+) -> impl IntoResponse {
     let start = Instant::now();
     debug!(
@@ -107,21 +132,6 @@ pub async fn handle_bulk_api(
         .with_label_values(&[&db])
         .start_timer();
 
-    let table = if let Some(table) = params.table {
-        table
-    } else {
-        return (
-            StatusCode::BAD_REQUEST,
-            elasticsearch_headers(),
-            axum::Json(write_bulk_response(
-                start.elapsed().as_millis() as i64,
-                0,
-                StatusCode::BAD_REQUEST.as_u16() as u32,
-                "require parameter 'table'",
-            )),
-        );
-    };
-
     // If pipeline_name is not provided, use the internal pipeline.
     let pipeline = if let Some(pipeline) = params.pipeline_name {
         pipeline
@@ -130,8 +140,8 @@ pub async fn handle_bulk_api(
     };
 
     // Read the ndjson payload and convert it to a vector of Value.
-    let log_values = match convert_es_input_to_log_values(&payload, &params.msg_field) {
-        Ok(log_values) => log_values,
+    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
+        Ok(requests) => requests,
         Err(e) => {
             return (
                 StatusCode::BAD_REQUEST,
@@ -145,14 +155,13 @@ pub async fn handle_bulk_api(
             );
         }
     };
-    let log_num = log_values.len();
+    let log_num = requests.len();
 
     if let Err(e) = ingest_logs_inner(
         log_state.log_handler,
         pipeline,
         None,
-        table,
-        log_values,
+        requests,
         Arc::new(query_ctx),
     )
     .await
@@ -237,16 +246,18 @@ pub fn elasticsearch_headers() -> HeaderMap {
     ELASTICSEARCH_HEADERS.clone()
 }
 
+// Parse the Elasticsearch bulk request and convert it to multiple LogIngestRequests.
 // The input will be Elasticsearch bulk request in NDJSON format.
 // For example, the input will be like this:
 // { "index" : { "_index" : "test", "_id" : "1" } }
 // { "field1" : "value1" }
 // { "index" : { "_index" : "test", "_id" : "2" } }
 // { "field2" : "value2" }
-fn convert_es_input_to_log_values(
+fn parse_bulk_request(
     input: &str,
+    index_from_url: &Option<String>,
     msg_field: &Option<String>,
-) -> ServersResult<Vec<Value>> {
+) -> ServersResult<Vec<LogIngestRequest>> {
     // Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
     let values: Vec<Value> = Deserializer::from_str(input)
         .into_iter::<Value>()
@@ -261,49 +272,80 @@ fn convert_es_input_to_log_values(
         }
     );
 
-    let mut log_values: Vec<Value> = Vec::with_capacity(values.len() / 2);
+    let mut requests: Vec<LogIngestRequest> = Vec::with_capacity(values.len() / 2);
+    let mut values = values.into_iter();
 
+    // Read the ndjson payload and convert it to a (index, value) vector.
     // For Elasticsearch post `_bulk` API, each chunk contains two objects:
-    // 1. The first object is the command, it should be `create` or `index`. `create` is used for insert, `index` is used for upsert.
-    // 2. The second object is the document data.
-    let mut is_document = false;
-    for v in values {
-        if !is_document {
-            // Read the first object to get the command, it should be `create` or `index`.
-            ensure!(
-                v.get("create").is_some() || v.get("index").is_some(),
-                InvalidElasticsearchInputSnafu {
-                    reason: format!(
-                        "invalid bulk request, expected 'create' or 'index' but got {:?}",
-                        v
-                    ),
-                }
-            );
-            is_document = true;
-            continue;
-        }
+    // 1. The first object is the command, it should be `create` or `index`.
+    // 2. The second object is the document data.
+    while let Some(mut cmd) = values.next() {
+        // NOTE: Although the native Elasticsearch API supports upsert in `index` command, we don't support change any data in `index` command and it's same as `create` command.
+        let index = if let Some(cmd) = cmd.get_mut("create") {
+            get_index_from_cmd(cmd.take())?
+        } else if let Some(cmd) = cmd.get_mut("index") {
+            get_index_from_cmd(cmd.take())?
+        } else {
+            return InvalidElasticsearchInputSnafu {
+                reason: format!(
+                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
+                    cmd
+                ),
+            }
+            .fail();
+        };
 
-        // It means the second object is the document data.
-        if is_document {
+        // Read the second object to get the document data. Stop the loop if there is no document.
+        if let Some(document) = values.next() {
             // If the msg_field is provided, fetch the value of the field from the document data.
             let log_value = if let Some(msg_field) = msg_field {
-                get_log_value_from_msg_field(v, msg_field)
+                get_log_value_from_msg_field(document, msg_field)
             } else {
-                v
+                document
             };
 
-            log_values.push(log_value);
-
-            // Reset the flag for the next chunk.
-            is_document = false;
+            ensure!(
+                index.is_some() || index_from_url.is_some(),
+                InvalidElasticsearchInputSnafu {
+                    reason: "missing index in bulk request".to_string(),
+                }
+            );
+
+            requests.push(LogIngestRequest {
+                table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
+                values: vec![log_value],
+            });
         }
     }
 
-    debug!("Received log data: {:?}", log_values);
+    debug!(
+        "Received {} log ingest requests: {:?}",
+        requests.len(),
+        requests
+    );
 
-    Ok(log_values)
+    Ok(requests)
 }
 
+// Get the index from the command. We will take index as the table name in GreptimeDB.
+fn get_index_from_cmd(mut v: Value) -> ServersResult<Option<String>> {
+    if let Some(index) = v.get_mut("_index") {
+        if let Value::String(index) = index.take() {
+            Ok(Some(index))
+        } else {
+            // If the `_index` exists, it should be a string.
+            InvalidElasticsearchInputSnafu {
+                reason: "index is not a string in bulk request".to_string(),
+            }
+            .fail()
+        }
+    } else {
+        Ok(None)
+    }
+}
+
 // If the msg_field is provided, fetch the value of the field from the document data.
 // For example, if the `msg_field` is `message`, and the document data is `{"message":"hello"}`, the log value will be Value::String("hello").
 fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
     if let Some(message) = v.get_mut(msg_field) {
         let message = message.take();
@@ -327,7 +369,7 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_convert_es_input_to_log_values() {
+    fn test_parse_bulk_request() {
        let test_cases = vec![
            // Normal case.
            (
@@ -338,9 +380,76 @@ mod tests {
 {"foo2":"foo2_value","bar2":"bar2_value"}
                 "#,
                 None,
+                None,
                 Ok(vec![
-                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
-                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
+                    LogIngestRequest {
+                        table: "test".to_string(),
+                        values: vec![
+                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
+                        ],
+                    },
+                    LogIngestRequest {
+                        table: "test".to_string(),
+                        values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
+                    },
                 ]),
             ),
+            // Case with index.
+            (
+                r#"
+{"create":{"_index":"test","_id":"1"}}
+{"foo1":"foo1_value", "bar1":"bar1_value"}
+{"create":{"_index":"logs","_id":"2"}}
+{"foo2":"foo2_value","bar2":"bar2_value"}
+                "#,
+                Some("logs".to_string()),
+                None,
+                Ok(vec![
+                    LogIngestRequest {
+                        table: "test".to_string(),
+                        values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
+                    },
+                    LogIngestRequest {
+                        table: "logs".to_string(),
+                        values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
+                    },
+                ]),
+            ),
+            // Case with index.
+            (
+                r#"
+{"create":{"_index":"test","_id":"1"}}
+{"foo1":"foo1_value", "bar1":"bar1_value"}
+{"create":{"_index":"logs","_id":"2"}}
+{"foo2":"foo2_value","bar2":"bar2_value"}
+                "#,
+                Some("logs".to_string()),
+                None,
+                Ok(vec![
+                    LogIngestRequest {
+                        table: "test".to_string(),
+                        values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
+                    },
+                    LogIngestRequest {
+                        table: "logs".to_string(),
+                        values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
+                    },
+                ]),
+            ),
+            // Case with incomplete bulk request.
+            (
+                r#"
+{"create":{"_index":"test","_id":"1"}}
+{"foo1":"foo1_value", "bar1":"bar1_value"}
+{"create":{"_index":"logs","_id":"2"}}
+                "#,
+                Some("logs".to_string()),
+                None,
+                Ok(vec![
+                    LogIngestRequest {
+                        table: "test".to_string(),
+                        values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
+                    },
+                ]),
+            ),
             // Specify the `data` field as the message field and the value is a JSON string.
@@ -351,10 +460,17 @@ mod tests {
 {"create":{"_index":"test","_id":"2"}}
 {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                 "#,
+                None,
                 Some("data".to_string()),
                 Ok(vec![
-                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
-                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
+                    LogIngestRequest {
+                        table: "test".to_string(),
+                        values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"})],
+                    },
+                    LogIngestRequest {
+                        table: "test".to_string(),
+                        values: vec![json!({"foo2": "foo2_value", "bar2": "bar2_value"})],
+                    },
                 ]),
             ),
             // Simulate the log data from Logstash.
@@ -365,10 +481,21 @@ mod tests {
 {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
 {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                 "#,
+                None,
                 Some("message".to_string()),
                 Ok(vec![
-                    json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""),
-                    json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""),
+                    LogIngestRequest {
+                        table: "logs-generic-default".to_string(),
+                        values: vec![
+                            json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""),
+                        ],
+                    },
+                    LogIngestRequest {
+                        table: "logs-generic-default".to_string(),
+                        values: vec![
+                            json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""),
+                        ],
+                    },
                 ]),
             ),
             // With invalid bulk request.
@@ -378,18 +505,19 @@ mod tests {
 { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                 "#,
                 None,
+                None,
                 Err(InvalidElasticsearchInputSnafu {
                     reason: "it's a invalid bulk request".to_string(),
                 }),
             ),
         ];
 
-        for (input, msg_field, expected) in test_cases {
-            let log_values = convert_es_input_to_log_values(input, &msg_field);
+        for (input, index, msg_field, expected) in test_cases {
+            let requests = parse_bulk_request(input, &index, &msg_field);
             if expected.is_ok() {
-                assert_eq!(log_values.unwrap(), expected.unwrap());
+                assert_eq!(requests.unwrap(), expected.unwrap());
             } else {
-                assert!(log_values.is_err());
+                assert!(requests.is_err());
             }
         }
    }

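A standalone sketch (not part of the patch) of the pairing and table-resolution rule that `parse_bulk_request` implements: command and document lines alternate in the NDJSON stream, and `_index` inside the `create`/`index` command takes precedence over the index from the URL path. `split_bulk_pairs` is a hypothetical name for illustration, and the error paths of the real code are reduced to an early stop:

    // Hypothetical, self-contained illustration; the patch returns
    // InvalidElasticsearchInput errors where this sketch merely stops.
    use serde_json::{Deserializer, Value};

    fn split_bulk_pairs(input: &str, index_from_url: Option<&str>) -> Vec<(String, Value)> {
        // Stream the NDJSON payload; each command line is followed by a document line.
        let mut values = Deserializer::from_str(input)
            .into_iter::<Value>()
            .filter_map(|v| v.ok());
        let mut pairs = Vec::new();
        while let Some(cmd) = values.next() {
            // `_index` in the command wins; otherwise fall back to the URL index.
            let table = cmd
                .get("create")
                .or_else(|| cmd.get("index"))
                .and_then(|meta| meta.get("_index"))
                .and_then(|index| index.as_str())
                .or(index_from_url)
                .map(str::to_string);
            match (table, values.next()) {
                (Some(table), Some(doc)) => pairs.push((table, doc)),
                // No document after the command, or no table at all: stop.
                _ => break,
            }
        }
        pairs
    }

    fn main() {
        let body = r#"
    {"create":{"_index":"test_index1","_id":"1"}}
    {"foo":"foo_value1","bar":"value1"}
    {"create":{"_id":"2"}}
    {"foo":"foo_value2","bar":"value2"}
    "#;
        // As if posted to `/test_index2/_bulk`: the second document falls back to test_index2.
        for (table, doc) in split_bulk_pairs(body, Some("test_index2")) {
            println!("{table}: {doc}");
        }
    }
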
@@ -769,6 +769,10 @@ impl HttpServer {
             // Return fake response for Elasticsearch license request.
             .route("/_license", routing::get(elasticsearch::handle_get_license))
             .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
+            .route(
+                "/:index/_bulk",
+                routing::post(elasticsearch::handle_bulk_api_with_index),
+            )
             // Return fake response for Elasticsearch ilm request.
             .route(
                 "/_ilm/policy/*path",

@@ -84,6 +84,16 @@ pub struct LogIngesterQueryParams {
     pub msg_field: Option<String>,
 }
 
+/// LogIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple LogIngestRequests.
+/// Multiple LogIngestRequests will be ingested into the same database with the same pipeline.
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct LogIngestRequest {
+    /// The table where the log data will be written to.
+    pub table: String,
+    /// The log data to be ingested.
+    pub values: Vec<Value>,
+}
+
 pub struct PipelineContent(String);
 
 #[async_trait]
@@ -513,8 +523,10 @@ pub async fn log_ingester(
         handler,
         pipeline_name,
         version,
-        table_name,
-        value,
+        vec![LogIngestRequest {
+            table: table_name,
+            values: value,
+        }],
         query_ctx,
     )
     .await
@@ -543,74 +555,78 @@ pub(crate) async fn ingest_logs_inner(
     state: PipelineHandlerRef,
     pipeline_name: String,
     version: PipelineVersion,
-    table_name: String,
-    pipeline_data: Vec<Value>,
+    log_ingest_requests: Vec<LogIngestRequest>,
     query_ctx: QueryContextRef,
 ) -> Result<HttpResponse> {
     let db = query_ctx.get_db_string();
     let exec_timer = std::time::Instant::now();
 
-    let mut results = Vec::with_capacity(pipeline_data.len());
-    let transformed_data: Rows;
-    if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
-        let table = state
-            .get_table(&table_name, &query_ctx)
-            .await
-            .context(CatalogSnafu)?;
-        let rows = pipeline::identity_pipeline(pipeline_data, table)
-            .context(PipelineTransformSnafu)
-            .context(PipelineSnafu)?;
-
-        transformed_data = rows
-    } else {
-        let pipeline = state
-            .get_pipeline(&pipeline_name, version, query_ctx.clone())
-            .await?;
-
-        let transform_timer = std::time::Instant::now();
-        let mut intermediate_state = pipeline.init_intermediate_state();
-
-        for v in pipeline_data {
-            pipeline
-                .prepare(v, &mut intermediate_state)
-                .inspect_err(|_| {
-                    METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
-                        .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
-                        .observe(transform_timer.elapsed().as_secs_f64());
-                })
-                .context(PipelineTransformSnafu)
-                .context(PipelineSnafu)?;
-            let r = pipeline
-                .exec_mut(&mut intermediate_state)
-                .inspect_err(|_| {
-                    METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
-                        .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
-                        .observe(transform_timer.elapsed().as_secs_f64());
-                })
-                .context(PipelineTransformSnafu)
-                .context(PipelineSnafu)?;
-            results.push(r);
-            pipeline.reset_intermediate_state(&mut intermediate_state);
-        }
-
-        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
-            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
-            .observe(transform_timer.elapsed().as_secs_f64());
-
-        transformed_data = Rows {
-            rows: results,
-            schema: pipeline.schemas().clone(),
-        }
-    };
-
-    let insert_request = RowInsertRequest {
-        rows: Some(transformed_data),
-        table_name: table_name.clone(),
-    };
-    let insert_requests = RowInsertRequests {
-        inserts: vec![insert_request],
-    };
-    let output = state.insert(insert_requests, query_ctx).await;
+    let mut insert_requests = Vec::with_capacity(log_ingest_requests.len());
+
+    for request in log_ingest_requests {
+        let transformed_data: Rows = if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
+            let table = state
+                .get_table(&request.table, &query_ctx)
+                .await
+                .context(CatalogSnafu)?;
+            pipeline::identity_pipeline(request.values, table)
+                .context(PipelineTransformSnafu)
+                .context(PipelineSnafu)?
+        } else {
+            let pipeline = state
+                .get_pipeline(&pipeline_name, version, query_ctx.clone())
+                .await?;
+
+            let transform_timer = std::time::Instant::now();
+            let mut intermediate_state = pipeline.init_intermediate_state();
+            let mut results = Vec::with_capacity(request.values.len());
+            for v in request.values {
+                pipeline
+                    .prepare(v, &mut intermediate_state)
+                    .inspect_err(|_| {
+                        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
+                            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
+                            .observe(transform_timer.elapsed().as_secs_f64());
+                    })
+                    .context(PipelineTransformSnafu)
+                    .context(PipelineSnafu)?;
+                let r = pipeline
+                    .exec_mut(&mut intermediate_state)
+                    .inspect_err(|_| {
+                        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
+                            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
+                            .observe(transform_timer.elapsed().as_secs_f64());
+                    })
+                    .context(PipelineTransformSnafu)
+                    .context(PipelineSnafu)?;
+                results.push(r);
+                pipeline.reset_intermediate_state(&mut intermediate_state);
+            }
+
+            METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
+                .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
+                .observe(transform_timer.elapsed().as_secs_f64());
+
+            Rows {
+                rows: results,
+                schema: pipeline.schemas().clone(),
+            }
+        };
+
+        insert_requests.push(RowInsertRequest {
+            rows: Some(transformed_data),
+            table_name: request.table.clone(),
+        });
+    }
+
+    let output = state
+        .insert(
+            RowInsertRequests {
+                inserts: insert_requests,
+            },
+            query_ctx,
+        )
+        .await;
 
     if let Ok(Output {
         data: OutputData::AffectedRows(rows),

@@ -98,6 +98,7 @@ macro_rules! http_tests {
             test_loki_pb_logs,
             test_loki_json_logs,
            test_elasticsearch_logs,
+            test_elasticsearch_logs_with_index,
        );
    )*
    };
@@ -1990,7 +1991,7 @@ pub async fn test_elasticsearch_logs(store_type: StorageType) {
             HeaderName::from_static("content-type"),
             HeaderValue::from_static("application/json"),
         )],
-        "/v1/elasticsearch/_bulk?table=elasticsearch_logs_test",
+        "/v1/elasticsearch/_bulk",
         body.as_bytes().to_vec(),
         false,
     )
@@ -1998,12 +1999,64 @@ pub async fn test_elasticsearch_logs(store_type: StorageType) {
 
     assert_eq!(StatusCode::OK, res.status());
 
-    let expected = "[[\"foo_value2\",\"value2\"],[\"foo_value1\",\"value1\"]]";
+    let expected = "[[\"foo_value1\",\"value1\"],[\"foo_value2\",\"value2\"]]";
 
     validate_data(
         "test_elasticsearch_logs",
         &client,
-        "select foo, bar from elasticsearch_logs_test;",
+        "select foo, bar from test;",
         expected,
     )
     .await;
 
     guard.remove_all().await;
 }
+
+pub async fn test_elasticsearch_logs_with_index(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_elasticsearch_logs_with_index").await;
+
+    let client = TestClient::new(app);
+
+    // It will write to test_index1 and test_index2(specified in the path).
+    let body = r#"
+{"create":{"_index":"test_index1","_id":"1"}}
+{"foo":"foo_value1", "bar":"value1"}
+{"create":{"_id":"2"}}
+{"foo":"foo_value2","bar":"value2"}
+"#;
+
+    let res = send_req(
+        &client,
+        vec![(
+            HeaderName::from_static("content-type"),
+            HeaderValue::from_static("application/json"),
+        )],
+        "/v1/elasticsearch/test_index2/_bulk",
+        body.as_bytes().to_vec(),
+        false,
+    )
+    .await;
+
+    assert_eq!(StatusCode::OK, res.status());
+
+    // test content of test_index1
+    let expected = "[[\"foo_value1\",\"value1\"]]";
+    validate_data(
+        "test_elasticsearch_logs_with_index",
+        &client,
+        "select foo, bar from test_index1;",
+        expected,
+    )
+    .await;
+
+    // test content of test_index2
+    let expected = "[[\"foo_value2\",\"value2\"]]";
+    validate_data(
+        "test_elasticsearch_logs_with_index_2",
+        &client,
+        "select foo, bar from test_index2;",
+        expected,
+    )
+    .await;
+
+    guard.remove_all().await;
+}