diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs index 050ae5889e..de8244ecec 100644 --- a/src/servers/src/elasticsearch.rs +++ b/src/servers/src/elasticsearch.rs @@ -39,6 +39,7 @@ use crate::http::event::{ extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState, PipelineIngestRequest, }; +use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME; use crate::metrics::{ METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED, }; @@ -132,7 +133,7 @@ async fn do_handle_bulk_api( // The `schema` is already set in the query_ctx in auth process. query_ctx.set_channel(Channel::Elasticsearch); - let db = params.db.unwrap_or_else(|| "public".to_string()); + let db = query_ctx.current_schema(); // Record the ingestion time histogram. let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED @@ -140,11 +141,12 @@ async fn do_handle_bulk_api( .start_timer(); // If pipeline_name is not provided, use the internal pipeline. - let pipeline_name = if let Some(pipeline) = params.pipeline_name { - pipeline - } else { - GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string() - }; + let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| { + headers + .get(GREPTIME_PIPELINE_NAME_HEADER_NAME) + .and_then(|v| v.to_str().ok()) + .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME) + }); // Read the ndjson payload and convert it to a vector of Value. let requests = match parse_bulk_request(&payload, &index, ¶ms.msg_field) { @@ -164,7 +166,7 @@ async fn do_handle_bulk_api( }; let log_num = requests.len(); - let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) { + let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) { Ok(pipeline) => pipeline, Err(e) => { // should be unreachable