Compare commits

...

1 Commits

Author SHA1 Message Date
shuiyisong
557e850d87 chore: use Bytes instead of string in bulk ingestion (#5717)
chore: use bytes instead of string in bulk log ingestion
2025-03-18 04:59:02 +08:00

View File

@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::io::BufRead;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use api::v1::RowInsertRequests; use api::v1::RowInsertRequests;
use async_trait::async_trait; use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State}; use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE; use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode}; use axum::http::{HeaderMap, StatusCode};
@@ -389,8 +391,8 @@ pub struct PipelineDryrunParams {
/// Check if the payload is valid json /// Check if the payload is valid json
/// Check if the payload contains pipeline or pipeline_name and data /// Check if the payload contains pipeline or pipeline_name and data
/// Return Some if valid, None if invalid /// Return Some if valid, None if invalid
fn check_pipeline_dryrun_params_valid(payload: &str) -> Option<PipelineDryrunParams> { fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
match serde_json::from_str::<PipelineDryrunParams>(payload) { match serde_json::from_slice::<PipelineDryrunParams>(payload) {
// payload with pipeline or pipeline_name and data is array // payload with pipeline or pipeline_name and data is array
Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params), Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
// because of the pipeline_name or pipeline is required // because of the pipeline_name or pipeline is required
@@ -432,7 +434,7 @@ pub async fn pipeline_dryrun(
Query(query_params): Query<LogIngesterQueryParams>, Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>, Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>, TypedHeader(content_type): TypedHeader<ContentType>,
payload: String, payload: Bytes,
) -> Result<Response> { ) -> Result<Response> {
let handler = log_state.log_handler; let handler = log_state.log_handler;
@@ -514,7 +516,7 @@ pub async fn log_ingester(
Extension(mut query_ctx): Extension<QueryContext>, Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>, TypedHeader(content_type): TypedHeader<ContentType>,
headers: HeaderMap, headers: HeaderMap,
payload: String, payload: Bytes,
) -> Result<HttpResponse> { ) -> Result<HttpResponse> {
// validate source and payload // validate source and payload
let source = query_params.source.as_deref(); let source = query_params.source.as_deref();
@@ -565,40 +567,45 @@ pub async fn log_ingester(
fn extract_pipeline_value_by_content_type( fn extract_pipeline_value_by_content_type(
content_type: ContentType, content_type: ContentType,
payload: String, payload: Bytes,
ignore_errors: bool, ignore_errors: bool,
) -> Result<Vec<Value>> { ) -> Result<Vec<Value>> {
Ok(match content_type { Ok(match content_type {
ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory( ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory(
Deserializer::from_str(&payload).into_iter(), Deserializer::from_slice(&payload).into_iter(),
ignore_errors, ignore_errors,
)?, )?,
ct if ct == *NDJSON_CONTENT_TYPE => { ct if ct == *NDJSON_CONTENT_TYPE => {
let mut result = Vec::with_capacity(1000); let mut result = Vec::with_capacity(1000);
for (index, line) in payload.lines().enumerate() { for (index, line) in payload.lines().enumerate() {
match serde_json::from_str(line) { let line = match line {
Ok(v) => { Ok(line) if !line.is_empty() => line,
result.push(v); Ok(_) => continue, // Skip empty lines
} Err(_) if ignore_errors => continue,
Err(_) => { Err(e) => {
if !ignore_errors { warn!(e; "invalid string at index: {}", index);
warn!( return InvalidParameterSnafu {
"invalid json item in array, index: {:?}, value: {:?}", reason: format!("invalid line at index: {}", index),
index, line
);
return InvalidParameterSnafu {
reason: format!("invalid item:{} in array", line),
}
.fail();
} }
.fail();
} }
};
if let Ok(v) = serde_json::from_str(&line) {
result.push(v);
} else if !ignore_errors {
warn!("invalid JSON at index: {}, content: {:?}", index, line);
return InvalidParameterSnafu {
reason: format!("invalid JSON at index: {}", index),
}
.fail();
} }
} }
result result
} }
ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
.lines() .lines()
.filter(|line| !line.is_empty()) .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
.map(|line| json!({"message": line})) .map(|line| json!({"message": line}))
.collect(), .collect(),
_ => UnsupportedContentTypeSnafu { content_type }.fail()?, _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
@@ -677,7 +684,8 @@ pub(crate) async fn ingest_logs_inner(
pub trait LogValidator: Send + Sync { pub trait LogValidator: Send + Sync {
/// validate payload by source before processing /// validate payload by source before processing
/// Return a `Some` result to indicate validation failure. /// Return a `Some` result to indicate validation failure.
async fn validate(&self, source: Option<&str>, payload: &str) -> Option<Result<HttpResponse>>; async fn validate(&self, source: Option<&str>, payload: &Bytes)
-> Option<Result<HttpResponse>>;
} }
pub type LogValidatorRef = Arc<dyn LogValidator + 'static>; pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
@@ -731,17 +739,17 @@ mod tests {
{"a": 1} {"a": 1}
{"b": 2"} {"b": 2"}
{"c": 1} {"c": 1}
"#; "#
.as_bytes();
let payload = Bytes::from_static(payload);
let fail_rest = let fail_rest =
extract_pipeline_value_by_content_type(ContentType::json(), payload.to_string(), true); extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
assert!(fail_rest.is_ok()); assert!(fail_rest.is_ok());
assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1})]); assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1})]);
let fail_only_wrong = extract_pipeline_value_by_content_type( let fail_only_wrong =
NDJSON_CONTENT_TYPE.clone(), extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
payload.to_string(),
true,
);
assert!(fail_only_wrong.is_ok()); assert!(fail_only_wrong.is_ok());
assert_eq!( assert_eq!(
fail_only_wrong.unwrap(), fail_only_wrong.unwrap(),