refactor: pass pipeline name through http header and get db from query context (#6405)

Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
zyy17
2025-06-27 18:43:37 +08:00
committed by GitHub
parent 6684200fce
commit 753a7e1a24

View File

@@ -39,6 +39,7 @@ use crate::http::event::{
extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState,
PipelineIngestRequest,
};
use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
use crate::metrics::{
METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
};
@@ -132,7 +133,7 @@ async fn do_handle_bulk_api(
// The `schema` is already set in the query_ctx in auth process.
query_ctx.set_channel(Channel::Elasticsearch);
let db = params.db.unwrap_or_else(|| "public".to_string());
let db = query_ctx.current_schema();
// Record the ingestion time histogram.
let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
@@ -140,11 +141,12 @@ async fn do_handle_bulk_api(
.start_timer();
// If pipeline_name is not provided, use the internal pipeline.
let pipeline_name = if let Some(pipeline) = params.pipeline_name {
pipeline
} else {
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
};
let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
headers
.get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
.and_then(|v| v.to_str().ok())
.unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
});
// Read the ndjson payload and convert it to a vector of Value.
let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
@@ -164,7 +166,7 @@ async fn do_handle_bulk_api(
};
let log_num = requests.len();
let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) {
let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
Ok(pipeline) => pipeline,
Err(e) => {
// should be unreachable