mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
1 Commits
feature/df
...
v0.13.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
557e850d87 |
@@ -12,12 +12,14 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::io::BufRead;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use api::v1::RowInsertRequests;
|
use api::v1::RowInsertRequests;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use axum::body::Bytes;
|
||||||
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
|
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
|
||||||
use axum::http::header::CONTENT_TYPE;
|
use axum::http::header::CONTENT_TYPE;
|
||||||
use axum::http::{HeaderMap, StatusCode};
|
use axum::http::{HeaderMap, StatusCode};
|
||||||
@@ -389,8 +391,8 @@ pub struct PipelineDryrunParams {
|
|||||||
/// Check if the payload is valid json
|
/// Check if the payload is valid json
|
||||||
/// Check if the payload contains pipeline or pipeline_name and data
|
/// Check if the payload contains pipeline or pipeline_name and data
|
||||||
/// Return Some if valid, None if invalid
|
/// Return Some if valid, None if invalid
|
||||||
fn check_pipeline_dryrun_params_valid(payload: &str) -> Option<PipelineDryrunParams> {
|
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
|
||||||
match serde_json::from_str::<PipelineDryrunParams>(payload) {
|
match serde_json::from_slice::<PipelineDryrunParams>(payload) {
|
||||||
// payload with pipeline or pipeline_name and data is array
|
// payload with pipeline or pipeline_name and data is array
|
||||||
Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
|
Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
|
||||||
// because of the pipeline_name or pipeline is required
|
// because of the pipeline_name or pipeline is required
|
||||||
@@ -432,7 +434,7 @@ pub async fn pipeline_dryrun(
|
|||||||
Query(query_params): Query<LogIngesterQueryParams>,
|
Query(query_params): Query<LogIngesterQueryParams>,
|
||||||
Extension(mut query_ctx): Extension<QueryContext>,
|
Extension(mut query_ctx): Extension<QueryContext>,
|
||||||
TypedHeader(content_type): TypedHeader<ContentType>,
|
TypedHeader(content_type): TypedHeader<ContentType>,
|
||||||
payload: String,
|
payload: Bytes,
|
||||||
) -> Result<Response> {
|
) -> Result<Response> {
|
||||||
let handler = log_state.log_handler;
|
let handler = log_state.log_handler;
|
||||||
|
|
||||||
@@ -514,7 +516,7 @@ pub async fn log_ingester(
|
|||||||
Extension(mut query_ctx): Extension<QueryContext>,
|
Extension(mut query_ctx): Extension<QueryContext>,
|
||||||
TypedHeader(content_type): TypedHeader<ContentType>,
|
TypedHeader(content_type): TypedHeader<ContentType>,
|
||||||
headers: HeaderMap,
|
headers: HeaderMap,
|
||||||
payload: String,
|
payload: Bytes,
|
||||||
) -> Result<HttpResponse> {
|
) -> Result<HttpResponse> {
|
||||||
// validate source and payload
|
// validate source and payload
|
||||||
let source = query_params.source.as_deref();
|
let source = query_params.source.as_deref();
|
||||||
@@ -565,40 +567,45 @@ pub async fn log_ingester(
|
|||||||
|
|
||||||
fn extract_pipeline_value_by_content_type(
|
fn extract_pipeline_value_by_content_type(
|
||||||
content_type: ContentType,
|
content_type: ContentType,
|
||||||
payload: String,
|
payload: Bytes,
|
||||||
ignore_errors: bool,
|
ignore_errors: bool,
|
||||||
) -> Result<Vec<Value>> {
|
) -> Result<Vec<Value>> {
|
||||||
Ok(match content_type {
|
Ok(match content_type {
|
||||||
ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory(
|
ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory(
|
||||||
Deserializer::from_str(&payload).into_iter(),
|
Deserializer::from_slice(&payload).into_iter(),
|
||||||
ignore_errors,
|
ignore_errors,
|
||||||
)?,
|
)?,
|
||||||
ct if ct == *NDJSON_CONTENT_TYPE => {
|
ct if ct == *NDJSON_CONTENT_TYPE => {
|
||||||
let mut result = Vec::with_capacity(1000);
|
let mut result = Vec::with_capacity(1000);
|
||||||
for (index, line) in payload.lines().enumerate() {
|
for (index, line) in payload.lines().enumerate() {
|
||||||
match serde_json::from_str(line) {
|
let line = match line {
|
||||||
Ok(v) => {
|
Ok(line) if !line.is_empty() => line,
|
||||||
result.push(v);
|
Ok(_) => continue, // Skip empty lines
|
||||||
}
|
Err(_) if ignore_errors => continue,
|
||||||
Err(_) => {
|
Err(e) => {
|
||||||
if !ignore_errors {
|
warn!(e; "invalid string at index: {}", index);
|
||||||
warn!(
|
return InvalidParameterSnafu {
|
||||||
"invalid json item in array, index: {:?}, value: {:?}",
|
reason: format!("invalid line at index: {}", index),
|
||||||
index, line
|
|
||||||
);
|
|
||||||
return InvalidParameterSnafu {
|
|
||||||
reason: format!("invalid item:{} in array", line),
|
|
||||||
}
|
|
||||||
.fail();
|
|
||||||
}
|
}
|
||||||
|
.fail();
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Ok(v) = serde_json::from_str(&line) {
|
||||||
|
result.push(v);
|
||||||
|
} else if !ignore_errors {
|
||||||
|
warn!("invalid JSON at index: {}, content: {:?}", index, line);
|
||||||
|
return InvalidParameterSnafu {
|
||||||
|
reason: format!("invalid JSON at index: {}", index),
|
||||||
|
}
|
||||||
|
.fail();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
|
ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
|
||||||
.lines()
|
.lines()
|
||||||
.filter(|line| !line.is_empty())
|
.filter_map(|line| line.ok().filter(|line| !line.is_empty()))
|
||||||
.map(|line| json!({"message": line}))
|
.map(|line| json!({"message": line}))
|
||||||
.collect(),
|
.collect(),
|
||||||
_ => UnsupportedContentTypeSnafu { content_type }.fail()?,
|
_ => UnsupportedContentTypeSnafu { content_type }.fail()?,
|
||||||
@@ -677,7 +684,8 @@ pub(crate) async fn ingest_logs_inner(
|
|||||||
pub trait LogValidator: Send + Sync {
|
pub trait LogValidator: Send + Sync {
|
||||||
/// validate payload by source before processing
|
/// validate payload by source before processing
|
||||||
/// Return a `Some` result to indicate validation failure.
|
/// Return a `Some` result to indicate validation failure.
|
||||||
async fn validate(&self, source: Option<&str>, payload: &str) -> Option<Result<HttpResponse>>;
|
async fn validate(&self, source: Option<&str>, payload: &Bytes)
|
||||||
|
-> Option<Result<HttpResponse>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
|
pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
|
||||||
@@ -731,17 +739,17 @@ mod tests {
|
|||||||
{"a": 1}
|
{"a": 1}
|
||||||
{"b": 2"}
|
{"b": 2"}
|
||||||
{"c": 1}
|
{"c": 1}
|
||||||
"#;
|
"#
|
||||||
|
.as_bytes();
|
||||||
|
let payload = Bytes::from_static(payload);
|
||||||
|
|
||||||
let fail_rest =
|
let fail_rest =
|
||||||
extract_pipeline_value_by_content_type(ContentType::json(), payload.to_string(), true);
|
extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
|
||||||
assert!(fail_rest.is_ok());
|
assert!(fail_rest.is_ok());
|
||||||
assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1})]);
|
assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1})]);
|
||||||
|
|
||||||
let fail_only_wrong = extract_pipeline_value_by_content_type(
|
let fail_only_wrong =
|
||||||
NDJSON_CONTENT_TYPE.clone(),
|
extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
|
||||||
payload.to_string(),
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
assert!(fail_only_wrong.is_ok());
|
assert!(fail_only_wrong.is_ok());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
fail_only_wrong.unwrap(),
|
fail_only_wrong.unwrap(),
|
||||||
|
|||||||
Reference in New Issue
Block a user