diff --git a/Cargo.lock b/Cargo.lock
index fdf1952380..a48b6afee2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6963,9 +6963,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
 
 [[package]]
 name = "opendal"
-version = "0.47.3"
+version = "0.47.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cac4826fe3d5482a49b92955b0f6b06ce45b46ec84484176588209bfbf996870"
+checksum = "ff159a2da374ef2d64848a6547943cf1af7d2ceada5ae77be175e1389aa07ae3"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs
index 390538098a..5d1396067d 100644
--- a/src/pipeline/src/etl/processor.rs
+++ b/src/pipeline/src/etl/processor.rs
@@ -93,6 +93,19 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
                 Value::Map(map) => {
                     values.push(self.exec_map(map)?);
                 }
+                Value::String(_) => {
+                    let fields = self.fields();
+                    if fields.len() != 1 {
+                        return Err(format!(
+                            "{} processor: expected fields length 1 when processing line input, but got {}",
+                            self.kind(),
+                            fields.len()
+                        ));
+                    }
+                    let field = fields.first().unwrap();
+
+                    values.push(self.exec_field(&val, field).map(Value::Map)?);
+                }
                 _ if self.ignore_processor_array_failure() => {
                     warn!("expected a map, but got {val}")
                 }
diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs
index 07332590f1..4467b42b51 100644
--- a/src/pipeline/src/manager/error.rs
+++ b/src/pipeline/src/manager/error.rs
@@ -81,6 +81,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to create dataframe"))]
+    DataFrame {
+        source: query::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("General catalog error"))]
     Catalog {
         source: catalog::error::Error,
@@ -126,6 +133,7 @@ impl ErrorExt for Error {
             | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
             BuildDfLogicalPlan { .. } => StatusCode::Internal,
             ExecuteInternalStatement { source, .. } => source.status_code(),
+            DataFrame { source, .. } => source.status_code(),
             Catalog { source, .. } => source.status_code(),
             CreateTable { source, .. } => source.status_code(),
         }
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs
index 58df2bcabb..3c69f59f2a 100644
--- a/src/pipeline/src/manager/table.rs
+++ b/src/pipeline/src/manager/table.rs
@@ -24,33 +24,32 @@ use common_query::OutputData;
 use common_recordbatch::util as record_util;
 use common_telemetry::{debug, info};
 use common_time::timestamp::{TimeUnit, Timestamp};
-use datafusion::datasource::DefaultTableSource;
 use datafusion::logical_expr::col;
 use datafusion_common::{TableReference, ToDFSchema};
-use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlanBuilder};
+use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan};
 use datatypes::prelude::ScalarVector;
 use datatypes::timestamp::TimestampNanosecond;
 use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
 use moka::sync::Cache;
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutorRef;
+use query::dataframe::DataFrame;
 use query::plan::LogicalPlan;
 use query::QueryEngineRef;
 use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
 use table::metadata::TableInfo;
-use table::table::adapter::DfTableProviderAdapter;
 use table::TableRef;
 
 use crate::error::{
     BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu,
-    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
-    PipelineNotFoundSnafu, Result,
+    DataFrameSnafu, ExecuteInternalStatementSnafu, InsertPipelineSnafu,
+    InvalidPipelineVersionSnafu, PipelineNotFoundSnafu, Result,
 };
 use crate::etl::transform::GreptimeTransformer;
 use crate::etl::{parse, Content, Pipeline};
 use crate::manager::{PipelineInfo, PipelineVersion};
-use crate::util::{build_plan_filter, generate_pipeline_cache_key};
+use crate::util::{generate_pipeline_cache_key, prepare_dataframe_conditions};
 
 pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
 pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
@@ -337,15 +336,24 @@ impl PipelineTable {
             return Ok(None);
         }
 
-        // 2. do delete
+        // 2. prepare dataframe
+        let dataframe = self
+            .query_engine
+            .read_table(self.table.clone())
+            .context(DataFrameSnafu)?;
+        let DataFrame::DataFusion(dataframe) = dataframe;
+
+        let dataframe = dataframe
+            .filter(prepare_dataframe_conditions(schema, name, version))
+            .context(BuildDfLogicalPlanSnafu)?;
+
+        // 3. prepare dml stmt
         let table_info = self.table.table_info();
         let table_name = TableReference::full(
             table_info.catalog_name.clone(),
             table_info.schema_name.clone(),
             table_info.name.clone(),
         );
-        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
-        let table_source = Arc::new(DefaultTableSource::new(table_provider));
 
         let df_schema = Arc::new(
             table_info
@@ -357,24 +365,17 @@ impl PipelineTable {
                 .context(BuildDfLogicalPlanSnafu)?,
         );
 
-        // create scan plan
-        let logical_plan = LogicalPlanBuilder::scan(table_name.clone(), table_source, None)
-            .context(BuildDfLogicalPlanSnafu)?
-            .filter(build_plan_filter(schema, name, version))
-            .context(BuildDfLogicalPlanSnafu)?
-            .build()
-            .context(BuildDfLogicalPlanSnafu)?;
-
         // create dml stmt
         let stmt = DmlStatement::new(
             table_name,
             df_schema,
             datafusion_expr::WriteOp::Delete,
-            Arc::new(logical_plan),
+            Arc::new(dataframe.into_parts().1),
         );
         let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt));
+        // 4. execute dml stmt
         let output = self
             .query_engine
             .execute(plan, Self::query_ctx(&table_info))
             .await
             .context(ExecuteInternalStatementSnafu)?;
@@ -404,24 +405,19 @@
         name: &str,
         version: PipelineVersion,
     ) -> Result<Option<(String, TimestampNanosecond)>> {
-        let table_info = self.table.table_info();
+        // 1. prepare dataframe
+        let dataframe = self
+            .query_engine
+            .read_table(self.table.clone())
+            .context(DataFrameSnafu)?;
+        let DataFrame::DataFusion(dataframe) = dataframe;
 
-        let table_name = TableReference::full(
-            table_info.catalog_name.clone(),
-            table_info.schema_name.clone(),
-            table_info.name.clone(),
-        );
-
-        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
-        let table_source = Arc::new(DefaultTableSource::new(table_provider));
-
-        let plan = LogicalPlanBuilder::scan(table_name, table_source, None)
+        let dataframe = dataframe
+            .filter(prepare_dataframe_conditions(schema, name, version))
             .context(BuildDfLogicalPlanSnafu)?
-            .filter(build_plan_filter(schema, name, version))
-            .context(BuildDfLogicalPlanSnafu)?
-            .project(vec![
-                col(PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME),
-                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME),
+            .select_columns(&[
+                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
+                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
             ])
             .context(BuildDfLogicalPlanSnafu)?
             .sort(vec![
@@ -429,15 +425,18 @@ impl PipelineTable {
             ])
             .context(BuildDfLogicalPlanSnafu)?
            .limit(0, Some(1))
-            .context(BuildDfLogicalPlanSnafu)?
-            .build()
             .context(BuildDfLogicalPlanSnafu)?;
+        let plan = LogicalPlan::DfPlan(dataframe.into_parts().1);
+
+        let table_info = self.table.table_info();
+
         debug!("find_pipeline_by_name: plan: {:?}", plan);
 
+        // 2. execute plan
         let output = self
             .query_engine
-            .execute(LogicalPlan::DfPlan(plan), Self::query_ctx(&table_info))
+            .execute(plan, Self::query_ctx(&table_info))
             .await
             .context(ExecuteInternalStatementSnafu)?;
 
         let stream = match output.data {
@@ -446,6 +445,7 @@ impl PipelineTable {
             _ => unreachable!(),
         };
 
+        // 3. construct result
         let records = record_util::collect(stream)
             .await
             .context(CollectRecordsSnafu)?;
diff --git a/src/pipeline/src/manager/util.rs b/src/pipeline/src/manager/util.rs
index 6133c64215..a7d968edcf 100644
--- a/src/pipeline/src/manager/util.rs
+++ b/src/pipeline/src/manager/util.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use common_time::Timestamp;
-use datafusion_expr::{and, col, lit, Expr};
+use datafusion_expr::{col, lit, Expr};
 use datatypes::timestamp::TimestampNanosecond;
 
 use crate::error::{InvalidPipelineVersionSnafu, Result};
@@ -34,19 +34,22 @@ pub fn to_pipeline_version(version_str: Option<String>) -> Result<PipelineVersion> {
     }
 }
 
-pub(crate) fn build_plan_filter(schema: &str, name: &str, version: PipelineVersion) -> Expr {
-    let schema_and_name_filter = and(
-        col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
+pub(crate) fn prepare_dataframe_conditions(
+    schema: &str,
+    name: &str,
+    version: PipelineVersion,
+) -> Expr {
+    let mut conditions = vec![
         col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)),
-    );
+        col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
+    ];
+
     if let Some(v) = version {
-        and(
-            schema_and_name_filter,
-            col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())),
-        )
-    } else {
-        schema_and_name_filter
+        conditions
+            .push(col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())));
     }
+
+    conditions.into_iter().reduce(Expr::and).unwrap()
 }
 
 pub(crate) fn generate_pipeline_cache_key(
diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs
index 08f2ad3811..af3b5a8c20 100644
--- a/src/pipeline/tests/pipeline.rs
+++ b/src/pipeline/tests/pipeline.rs
@@ -14,7 +14,8 @@
 
 use common_telemetry::tracing::info;
 use greptime_proto::v1::value::ValueData::{
-    BoolValue, F64Value, StringValue, TimestampSecondValue, U32Value, U64Value, U8Value,
+    BoolValue, F64Value, StringValue, TimestampNanosecondValue, TimestampSecondValue, U32Value,
+    U64Value, U8Value,
 };
 use greptime_proto::v1::Value as GreptimeValue;
 use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
@@ -455,3 +456,55 @@ transform:
         info!("\n");
     }
 }
+
+#[test]
+fn test_simple_data() {
+    let input_value_str = r#"
+{
+    "line": "2024-05-25 20:16:37.217 hello world"
+}
+"#;
+    let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
+        .unwrap()
+        .try_into()
+        .unwrap();
+
+    let pipeline_yaml = r#"
+processors:
+  - dissect:
+      fields:
+        - line
+      patterns:
+        - "%{+ts} %{+ts} %{content}"
+  - date:
+      fields:
+        - ts
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+
+transform:
+  - fields:
+      - content
+    type: string
+  - field: ts
+    type: time
+    index: timestamp
+"#;
+
+    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
+    let output = pipeline.exec(input_value).unwrap();
+    let r = output
+        .rows
+        .into_iter()
+        .flat_map(|v| v.values)
+        .map(|v| v.value_data.unwrap())
+        .collect::<Vec<_>>();
+
+    let expected = vec![
+        StringValue("hello world".into()),
+        TimestampNanosecondValue(1716668197217000000),
+    ];
+
+    assert_eq!(expected, r);
+}
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index ea436009b0..53d3b8d1f3 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -25,7 +25,6 @@ use axum::http::{Request, StatusCode};
 use axum::response::{IntoResponse, Response};
 use axum::{async_trait, BoxError, Extension, TypedHeader};
 use common_telemetry::{error, warn};
-use mime_guess::mime;
 use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
 use pipeline::util::to_pipeline_version;
 use pipeline::{PipelineVersion, Value as PipelineValue};
@@ -250,15 +249,7 @@
 
     let ignore_errors = query_params.ignore_errors.unwrap_or(false);
 
-    let m: mime::Mime = content_type.clone().into();
-    let value = match m.subtype() {
-        mime::JSON => transform_ndjson_array_factory(
-            Deserializer::from_str(&payload).into_iter(),
-            ignore_errors,
-        )?,
-        // add more content type support
-        _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
-    };
+    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
 
     ingest_logs_inner(
         handler,
@@ -271,18 +262,43 @@
     .await
 }
 
+fn extract_pipeline_value_by_content_type(
+    content_type: ContentType,
+    payload: String,
+    ignore_errors: bool,
+) -> Result<PipelineValue> {
+    Ok(match content_type {
+        ct if ct == ContentType::json() => {
+            let json_value = transform_ndjson_array_factory(
+                Deserializer::from_str(&payload).into_iter(),
+                ignore_errors,
+            )?;
+
+            PipelineValue::try_from(json_value)
+                .map_err(|reason| CastTypeSnafu { msg: reason }.build())
+                .context(PipelineSnafu)?
+        }
+        ct if ct == ContentType::text() || ct == ContentType::text_utf8() => {
+            let arr = payload
+                .lines()
+                .filter(|line| !line.is_empty())
+                .map(|line| PipelineValue::String(line.to_string()))
+                .collect::<Vec<_>>();
+            PipelineValue::Array(arr.into())
+        }
+        _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
+    })
+}
+
 async fn ingest_logs_inner(
     state: LogHandlerRef,
     pipeline_name: String,
     version: PipelineVersion,
     table_name: String,
-    payload: Value,
+    pipeline_data: PipelineValue,
     query_ctx: QueryContextRef,
 ) -> Result {
     let start = std::time::Instant::now();
-    let pipeline_data = PipelineValue::try_from(payload)
-        .map_err(|reason| CastTypeSnafu { msg: reason }.build())
-        .context(PipelineSnafu)?;
 
     let pipeline = state
         .get_pipeline(&pipeline_name, version, query_ctx.clone())
@@ -321,3 +337,35 @@ pub struct LogState {
     pub log_handler: LogHandlerRef,
     pub log_validator: Option,
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_transform_ndjson() {
+        let s = "{\"a\": 1}\n{\"b\": 2}";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
+
+        let s = "{\"a\": 1}";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1}]");
+
+        let s = "[{\"a\": 1}]";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1}]");
+
+        let s = "[{\"a\": 1}, {\"b\": 2}]";
+        let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
+            .unwrap()
+            .to_string();
+        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
+    }
+}
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 9a7d982790..06ee1a0221 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -78,6 +78,7 @@ macro_rules! http_tests {
                 test_vm_proto_remote_write,
 
                 test_pipeline_api,
+                test_plain_text_ingestion,
             );
         )*
     };
 }
@@ -1127,3 +1128,103 @@ transform:
 
     guard.remove_all().await;
 }
+
+pub async fn test_plain_text_ingestion(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;
+
+    // handshake
+    let client = TestClient::new(app);
+
+    let body = r#"
+processors:
+  - dissect:
+      fields:
+        - line
+      patterns:
+        - "%{+ts} %{+ts} %{content}"
+  - date:
+      fields:
+        - ts
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+
+transform:
+  - fields:
+      - content
+    type: string
+  - field: ts
+    type: time
+    index: timestamp
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/test")
+        .header("Content-Type", "application/x-yaml")
+        .body(body)
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let content = res.text().await;
+
+    let content = serde_json::from_str(&content);
+    assert!(content.is_ok());
+    // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]}
+    let content: Value = content.unwrap();
+
+    let version_str = content
+        .get("pipelines")
+        .unwrap()
+        .as_array()
+        .unwrap()
+        .first()
+        .unwrap()
+        .get("version")
+        .unwrap()
+        .as_str()
+        .unwrap()
+        .to_string();
+    assert!(!version_str.is_empty());
+
+    // 2. write data
+    let data_body = r#"
+2024-05-25 20:16:37.217 hello
+2024-05-25 20:16:37.218 hello world
+"#;
+    let res = client
+        .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
+        .header("Content-Type", "text/plain")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 3. select data
+    let res = client.get("/v1/sql?sql=select * from logs1").send().await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let resp = res.text().await;
+
+    let resp: Value = serde_json::from_str(&resp).unwrap();
+    let v = resp
+        .get("output")
+        .unwrap()
+        .as_array()
+        .unwrap()
+        .first()
+        .unwrap()
+        .get("records")
+        .unwrap()
+        .get("rows")
+        .unwrap()
+        .to_string();
+
+    assert_eq!(
+        v,
+        r#"[["hello",1716668197217000000],["hello world",1716668197218000000]]"#,
+    );
+
+    guard.remove_all().await;
+}