feat: support text/plain format for log ingestion (#4300)

* feat: support text/plain format of log input

* refactor: pipeline query and delete using dataframe api

* chore: minor refactor

* refactor: skip jsonify when processing plan/text

* refactor: support array(string) as pipeline engine input
shuiyisong
2024-07-12 17:17:15 +08:00
committed by GitHub
parent 9f2d53c3df
commit 67dfdd6c61
8 changed files with 291 additions and 65 deletions
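
The ingestion path this commit adds accepts newline-delimited plain text, one log line per entry. Below is a minimal client-side sketch of the call, modeled on the integration test at the end of this diff; the host and port are placeholders, and reqwest (with the blocking feature) is assumed purely for illustration:

    use reqwest::blocking::Client;

    fn main() -> Result<(), reqwest::Error> {
        let body = "2024-05-25 20:16:37.217 hello\n2024-05-25 20:16:37.218 hello world";
        let resp = Client::new()
            // placeholder host/port; db, table and pipeline_name mirror the test below
            .post("http://127.0.0.1:4000/v1/events/logs?db=public&table=logs1&pipeline_name=test")
            .header("Content-Type", "text/plain")
            .body(body)
            .send()?;
        println!("{}", resp.status());
        Ok(())
    }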

Cargo.lock

@@ -6963,9 +6963,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"

 [[package]]
 name = "opendal"
-version = "0.47.3"
+version = "0.47.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cac4826fe3d5482a49b92955b0f6b06ce45b46ec84484176588209bfbf996870"
+checksum = "ff159a2da374ef2d64848a6547943cf1af7d2ceada5ae77be175e1389aa07ae3"
 dependencies = [
  "anyhow",
  "async-trait",


@@ -93,6 +93,19 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
             Value::Map(map) => {
                 values.push(self.exec_map(map)?);
             }
+            Value::String(_) => {
+                let fields = self.fields();
+                if fields.len() != 1 {
+                    return Err(format!(
+                        "{} processor: expected fields length 1 when processing line input, but got {}",
+                        self.kind(),
+                        fields.len()
+                    ));
+                }
+                let field = fields.first().unwrap();
+                values.push(self.exec_field(&val, field).map(Value::Map)?);
+            }
             _ if self.ignore_processor_array_failure() => {
                 warn!("expected a map, but got {val}")
             }
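
The new `Value::String` arm only accepts line input when the processor declares exactly one field, since a bare line carries no keys to route by. A condensed standalone sketch of that guard follows; the enum and `exec` here are simplified stand-ins for the real trait methods:

    use std::collections::BTreeMap;

    #[derive(Debug)]
    enum Value {
        String(String),
        Map(BTreeMap<String, Value>),
    }

    fn exec(kind: &str, fields: &[String], val: Value) -> Result<Value, String> {
        match &val {
            Value::Map(_) => Ok(val), // real code: exec_map(map)
            Value::String(_) => {
                if fields.len() != 1 {
                    return Err(format!(
                        "{} processor: expected fields length 1 when processing line input, but got {}",
                        kind,
                        fields.len()
                    ));
                }
                Ok(val) // real code: exec_field(&val, field).map(Value::Map)
            }
        }
    }

    fn main() {
        let fields = vec!["line".to_string()];
        assert!(exec("dissect", &fields, Value::String("hello".into())).is_ok());
        assert!(exec("dissect", &[], Value::String("hello".into())).is_err());
    }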


@@ -81,6 +81,13 @@ pub enum Error {
         location: Location,
     },

+    #[snafu(display("Failed to create dataframe"))]
+    DataFrame {
+        source: query::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("General catalog error"))]
     Catalog {
         source: catalog::error::Error,
@@ -126,6 +133,7 @@ impl ErrorExt for Error {
             | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
             BuildDfLogicalPlan { .. } => StatusCode::Internal,
             ExecuteInternalStatement { source, .. } => source.status_code(),
+            DataFrame { source, .. } => source.status_code(),
             Catalog { source, .. } => source.status_code(),
             CreateTable { source, .. } => source.status_code(),
         }
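
The `DataFrame` variant follows the crate's usual snafu pattern: `.context(DataFrameSnafu)` wraps the underlying query error, and `#[snafu(implicit)]` captures the call site automatically. A minimal sketch of that mechanism, with `Inner` standing in for `query::error::Error`:

    use snafu::{Location, ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    #[snafu(display("inner failure"))]
    struct Inner;

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to create dataframe"))]
        DataFrame {
            source: Inner,
            #[snafu(implicit)]
            location: Location,
        },
    }

    fn read_table() -> Result<(), Inner> {
        Err(Inner)
    }

    fn main() {
        // `.context(...)` converts the source error into the new variant.
        let err = read_table().context(DataFrameSnafu).unwrap_err();
        println!("{err}"); // prints "Failed to create dataframe"
    }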


@@ -24,33 +24,32 @@ use common_query::OutputData;
 use common_recordbatch::util as record_util;
 use common_telemetry::{debug, info};
 use common_time::timestamp::{TimeUnit, Timestamp};
-use datafusion::datasource::DefaultTableSource;
 use datafusion::logical_expr::col;
 use datafusion_common::{TableReference, ToDFSchema};
-use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlanBuilder};
+use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan};
 use datatypes::prelude::ScalarVector;
 use datatypes::timestamp::TimestampNanosecond;
 use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
 use moka::sync::Cache;
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutorRef;
+use query::dataframe::DataFrame;
 use query::plan::LogicalPlan;
 use query::QueryEngineRef;
 use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
 use table::metadata::TableInfo;
-use table::table::adapter::DfTableProviderAdapter;
 use table::TableRef;

 use crate::error::{
     BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu,
-    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
-    PipelineNotFoundSnafu, Result,
+    DataFrameSnafu, ExecuteInternalStatementSnafu, InsertPipelineSnafu,
+    InvalidPipelineVersionSnafu, PipelineNotFoundSnafu, Result,
 };
 use crate::etl::transform::GreptimeTransformer;
 use crate::etl::{parse, Content, Pipeline};
 use crate::manager::{PipelineInfo, PipelineVersion};
-use crate::util::{build_plan_filter, generate_pipeline_cache_key};
+use crate::util::{generate_pipeline_cache_key, prepare_dataframe_conditions};

 pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
 pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
@@ -337,15 +336,24 @@ impl PipelineTable {
             return Ok(None);
         }

-        // 2. do delete
+        // 2. prepare dataframe
+        let dataframe = self
+            .query_engine
+            .read_table(self.table.clone())
+            .context(DataFrameSnafu)?;
+        let DataFrame::DataFusion(dataframe) = dataframe;
+        let dataframe = dataframe
+            .filter(prepare_dataframe_conditions(schema, name, version))
+            .context(BuildDfLogicalPlanSnafu)?;
+
+        // 3. prepare dml stmt
         let table_info = self.table.table_info();
         let table_name = TableReference::full(
             table_info.catalog_name.clone(),
             table_info.schema_name.clone(),
             table_info.name.clone(),
         );
-        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
-        let table_source = Arc::new(DefaultTableSource::new(table_provider));

         let df_schema = Arc::new(
             table_info
@@ -357,24 +365,17 @@ impl PipelineTable {
                 .context(BuildDfLogicalPlanSnafu)?,
         );

-        // create scan plan
-        let logical_plan = LogicalPlanBuilder::scan(table_name.clone(), table_source, None)
-            .context(BuildDfLogicalPlanSnafu)?
-            .filter(build_plan_filter(schema, name, version))
-            .context(BuildDfLogicalPlanSnafu)?
-            .build()
-            .context(BuildDfLogicalPlanSnafu)?;
-
         // create dml stmt
         let stmt = DmlStatement::new(
             table_name,
             df_schema,
             datafusion_expr::WriteOp::Delete,
-            Arc::new(logical_plan),
+            Arc::new(dataframe.into_parts().1),
         );

         let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt));

+        // 4. execute dml stmt
         let output = self
             .query_engine
             .execute(plan, Self::query_ctx(&table_info))
@@ -404,24 +405,19 @@ impl PipelineTable {
         name: &str,
         version: PipelineVersion,
    ) -> Result<Option<(String, TimestampNanosecond)>> {
-        let table_info = self.table.table_info();
-        let table_name = TableReference::full(
-            table_info.catalog_name.clone(),
-            table_info.schema_name.clone(),
-            table_info.name.clone(),
-        );
-        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
-        let table_source = Arc::new(DefaultTableSource::new(table_provider));
-        let plan = LogicalPlanBuilder::scan(table_name, table_source, None)
+        // 1. prepare dataframe
+        let dataframe = self
+            .query_engine
+            .read_table(self.table.clone())
+            .context(DataFrameSnafu)?;
+        let DataFrame::DataFusion(dataframe) = dataframe;
+        let dataframe = dataframe
+            .filter(prepare_dataframe_conditions(schema, name, version))
             .context(BuildDfLogicalPlanSnafu)?
-            .filter(build_plan_filter(schema, name, version))
-            .context(BuildDfLogicalPlanSnafu)?
-            .project(vec![
-                col(PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME),
-                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME),
+            .select_columns(&[
+                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
+                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
             ])
             .context(BuildDfLogicalPlanSnafu)?
             .sort(vec![
@@ -429,15 +425,18 @@ impl PipelineTable {
             ])
             .context(BuildDfLogicalPlanSnafu)?
             .limit(0, Some(1))
-            .context(BuildDfLogicalPlanSnafu)?
-            .build()
             .context(BuildDfLogicalPlanSnafu)?;

+        let plan = LogicalPlan::DfPlan(dataframe.into_parts().1);
+        let table_info = self.table.table_info();
         debug!("find_pipeline_by_name: plan: {:?}", plan);

+        // 2. execute plan
         let output = self
             .query_engine
-            .execute(LogicalPlan::DfPlan(plan), Self::query_ctx(&table_info))
+            .execute(plan, Self::query_ctx(&table_info))
             .await
             .context(ExecuteInternalStatementSnafu)?;

         let stream = match output.data {
@@ -446,6 +445,7 @@ impl PipelineTable {
             _ => unreachable!(),
         };

+        // 3. construct result
        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;
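
For reference, a minimal sketch of the same `filter` → `select_columns` → `sort` → `limit` → `into_parts` chain against stock DataFusion; GreptimeDB reaches the `DataFrame` through `query_engine.read_table` instead of a `SessionContext`, and the table and column names below are placeholders rather than the real pipelines schema:

    use std::sync::Arc;

    use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::prelude::*;

    #[tokio::main] // tokio is assumed only to drive the async API
    async fn main() -> datafusion::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("created_at", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![1, 2])),
            ],
        )?;
        let ctx = SessionContext::new();
        ctx.register_batch("pipelines", batch)?;

        // filter -> select_columns -> sort -> limit, then unwrap the plan,
        // mirroring the query path above.
        let df = ctx
            .table("pipelines")
            .await?
            .filter(col("name").eq(lit("a")))?
            .select_columns(&["name", "created_at"])?
            .sort(vec![col("created_at").sort(false, true)])?
            .limit(0, Some(1))?;
        let (_state, plan) = df.into_parts();
        println!("{plan}");
        Ok(())
    }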


@@ -13,7 +13,7 @@
 // limitations under the License.

 use common_time::Timestamp;
-use datafusion_expr::{and, col, lit, Expr};
+use datafusion_expr::{col, lit, Expr};
 use datatypes::timestamp::TimestampNanosecond;

 use crate::error::{InvalidPipelineVersionSnafu, Result};
@@ -34,19 +34,22 @@ pub fn to_pipeline_version(version_str: Option<String>) -> Result<PipelineVersion> {
     }
 }

-pub(crate) fn build_plan_filter(schema: &str, name: &str, version: PipelineVersion) -> Expr {
-    let schema_and_name_filter = and(
-        col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
+pub(crate) fn prepare_dataframe_conditions(
+    schema: &str,
+    name: &str,
+    version: PipelineVersion,
+) -> Expr {
+    let mut conditions = vec![
         col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)),
-    );
+        col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
+    ];

     if let Some(v) = version {
-        and(
-            schema_and_name_filter,
-            col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())),
-        )
-    } else {
-        schema_and_name_filter
+        conditions
+            .push(col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())));
     }
+
+    conditions.into_iter().reduce(Expr::and).unwrap()
 }

 pub(crate) fn generate_pipeline_cache_key(
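
The rewritten helper collects equality predicates in a vector and folds them with `Expr::and`, which extends more cleanly than nesting `and(...)` calls as conditions grow. A self-contained sketch with placeholder column names:

    use datafusion_expr::{col, lit, Expr};

    fn conditions(schema: &str, name: &str, version: Option<&str>) -> Expr {
        let mut conditions = vec![
            col("name").eq(lit(name)),
            col("schema").eq(lit(schema)),
        ];
        if let Some(v) = version {
            conditions.push(col("created_at").eq(lit(v)));
        }
        // `reduce(Expr::and)` joins every predicate with AND; the unwrap is
        // safe because the vec always holds at least two expressions.
        conditions.into_iter().reduce(Expr::and).unwrap()
    }

    fn main() {
        // Prints something like: name = Utf8("a") AND schema = Utf8("b") AND created_at = Utf8("c")
        println!("{}", conditions("b", "a", Some("c")));
    }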


@@ -14,7 +14,8 @@
 use common_telemetry::tracing::info;
 use greptime_proto::v1::value::ValueData::{
-    BoolValue, F64Value, StringValue, TimestampSecondValue, U32Value, U64Value, U8Value,
+    BoolValue, F64Value, StringValue, TimestampNanosecondValue, TimestampSecondValue, U32Value,
+    U64Value, U8Value,
 };
 use greptime_proto::v1::Value as GreptimeValue;
 use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
@@ -455,3 +456,55 @@ transform:
         info!("\n");
     }
 }
+
+#[test]
+fn test_simple_data() {
+    let input_value_str = r#"
+    {
+        "line": "2024-05-25 20:16:37.217 hello world"
+    }
+"#;
+    let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
+        .unwrap()
+        .try_into()
+        .unwrap();
+
+    let pipeline_yaml = r#"
+processors:
+  - dissect:
+      fields:
+        - line
+      patterns:
+        - "%{+ts} %{+ts} %{content}"
+  - date:
+      fields:
+        - ts
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+
+transform:
+  - fields:
+      - content
+    type: string
+  - field: ts
+    type: time
+    index: timestamp
+"#;
+    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
+    let output = pipeline.exec(input_value).unwrap();
+    let r = output
+        .rows
+        .into_iter()
+        .flat_map(|v| v.values)
+        .map(|v| v.value_data.unwrap())
+        .collect::<Vec<_>>();
+
+    let expected = vec![
+        StringValue("hello world".into()),
+        TimestampNanosecondValue(1716668197217000000),
+    ];
+    assert_eq!(expected, r);
+}
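
The expected nanosecond value can be checked by hand. A sketch assuming the `date` processor parses the naive timestamp as UTC (chrono is used here only for illustration, not by the pipeline itself):

    use chrono::NaiveDateTime;

    fn main() {
        let ts = NaiveDateTime::parse_from_str("2024-05-25 20:16:37.217", "%Y-%m-%d %H:%M:%S%.3f")
            .unwrap();
        // Prints 1716668197217000000, matching TimestampNanosecondValue above.
        println!("{}", ts.and_utc().timestamp_nanos_opt().unwrap());
    }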


@@ -25,7 +25,6 @@ use axum::http::{Request, StatusCode};
 use axum::response::{IntoResponse, Response};
 use axum::{async_trait, BoxError, Extension, TypedHeader};
 use common_telemetry::{error, warn};
-use mime_guess::mime;
 use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
 use pipeline::util::to_pipeline_version;
 use pipeline::{PipelineVersion, Value as PipelineValue};
@@ -250,15 +249,7 @@
     let ignore_errors = query_params.ignore_errors.unwrap_or(false);

-    let m: mime::Mime = content_type.clone().into();
-    let value = match m.subtype() {
-        mime::JSON => transform_ndjson_array_factory(
-            Deserializer::from_str(&payload).into_iter(),
-            ignore_errors,
-        )?,
-        // add more content type support
-        _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
-    };
+    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

     ingest_logs_inner(
         handler,
@@ -271,18 +262,43 @@
     .await
 }

+fn extract_pipeline_value_by_content_type(
+    content_type: ContentType,
+    payload: String,
+    ignore_errors: bool,
+) -> Result<PipelineValue> {
+    Ok(match content_type {
+        ct if ct == ContentType::json() => {
+            let json_value = transform_ndjson_array_factory(
+                Deserializer::from_str(&payload).into_iter(),
+                ignore_errors,
+            )?;
+            PipelineValue::try_from(json_value)
+                .map_err(|reason| CastTypeSnafu { msg: reason }.build())
+                .context(PipelineSnafu)?
+        }
+        ct if ct == ContentType::text() || ct == ContentType::text_utf8() => {
+            let arr = payload
+                .lines()
+                .filter(|line| !line.is_empty())
+                .map(|line| PipelineValue::String(line.to_string()))
+                .collect::<Vec<PipelineValue>>();
+            PipelineValue::Array(arr.into())
+        }
+        _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
+    })
+}
+
 async fn ingest_logs_inner(
     state: LogHandlerRef,
     pipeline_name: String,
     version: PipelineVersion,
     table_name: String,
-    payload: Value,
+    pipeline_data: PipelineValue,
     query_ctx: QueryContextRef,
 ) -> Result<HttpResponse> {
     let start = std::time::Instant::now();
-    let pipeline_data = PipelineValue::try_from(payload)
-        .map_err(|reason| CastTypeSnafu { msg: reason }.build())
-        .context(PipelineSnafu)?;

     let pipeline = state
         .get_pipeline(&pipeline_name, version, query_ctx.clone())
@@ -321,3 +337,35 @@ pub struct LogState {
     pub log_handler: LogHandlerRef,
     pub log_validator: Option<LogValidatorRef>,
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_transform_ndjson() {
+        let s = "{\"a\": 1}\n{\"b\": 2}";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
+
+        let s = "{\"a\": 1}";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1}]");
+
+        let s = "[{\"a\": 1}]";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1}]");
+
+        let s = "[{\"a\": 1}, {\"b\": 2}]";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
+    }
+}
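
The text/plain branch splits the body on newlines and drops empty lines, so leading or trailing newlines contribute no entries. A minimal sketch of that behavior:

    // Mirrors the payload.lines() logic in extract_pipeline_value_by_content_type,
    // minus the PipelineValue wrapping.
    fn split_plain_text(payload: &str) -> Vec<String> {
        payload
            .lines()
            .filter(|line| !line.is_empty())
            .map(|line| line.to_string())
            .collect()
    }

    fn main() {
        let body = "2024-05-25 20:16:37.217 hello\n2024-05-25 20:16:37.218 hello world\n";
        assert_eq!(split_plain_text(body).len(), 2);
    }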


@@ -78,6 +78,7 @@ macro_rules! http_tests {
                 test_vm_proto_remote_write,

                 test_pipeline_api,
+                test_plain_text_ingestion,
             );
         )*
     };
@@ -1127,3 +1128,103 @@ transform:
     guard.remove_all().await;
 }
+
+pub async fn test_plain_text_ingestion(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;
+
+    // handshake
+    let client = TestClient::new(app);
+    let body = r#"
+processors:
+  - dissect:
+      fields:
+        - line
+      patterns:
+        - "%{+ts} %{+ts} %{content}"
+  - date:
+      fields:
+        - ts
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+
+transform:
+  - fields:
+      - content
+    type: string
+  - field: ts
+    type: time
+    index: timestamp
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/test")
+        .header("Content-Type", "application/x-yaml")
+        .body(body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let content = res.text().await;
+
+    let content = serde_json::from_str(&content);
+    assert!(content.is_ok());
+    // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]}
+    let content: Value = content.unwrap();
+
+    let version_str = content
+        .get("pipelines")
+        .unwrap()
+        .as_array()
+        .unwrap()
+        .first()
+        .unwrap()
+        .get("version")
+        .unwrap()
+        .as_str()
+        .unwrap()
+        .to_string();
+    assert!(!version_str.is_empty());
+
+    // 2. write data
+    let data_body = r#"
+2024-05-25 20:16:37.217 hello
+2024-05-25 20:16:37.218 hello world
+"#;
+    let res = client
+        .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
+        .header("Content-Type", "text/plain")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 3. select data
+    let res = client.get("/v1/sql?sql=select * from logs1").send().await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let resp = res.text().await;
+
+    let resp: Value = serde_json::from_str(&resp).unwrap();
+    let v = resp
+        .get("output")
+        .unwrap()
+        .as_array()
+        .unwrap()
+        .first()
+        .unwrap()
+        .get("records")
+        .unwrap()
+        .get("rows")
+        .unwrap()
+        .to_string();
+    assert_eq!(
+        v,
+        r#"[["hello",1716668197217000000],["hello world",1716668197218000000]]"#,
+    );
+
+    guard.remove_all().await;
+}