From 538b5abaae2c4e578bd313a9a6b5232e7a4227e0 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Fri, 6 Jun 2025 13:39:10 -0700 Subject: [PATCH] chore: support table suffix in hint (#6223) * feat: pipeline recognize hints from exec * chore: rename and add test * chore: minor improve * chore: rename and add comments * fix: typos * feat: add initial impl for vrl processor * chore: update processors to allow vrl process * feat: pipeline recognize hints from exec * chore: rename and add test * chore: minor improve * chore: rename and add comments * fix: typos * chore: remove unnecessory clone fn * chore: group metrics * chore: use struct in transform output enum * test: add test for vrl * fix: leaked conflicts * chore: merge branch code & add check in compile * fix: check condition * fix: check auto-transform timeindex * chore: support table_suffix in hint * chore: add test for table suffix in vrl hint * refactor: change context_opt to a struct --- src/pipeline/src/dispatcher.rs | 4 +- src/pipeline/src/error.rs | 10 +- src/pipeline/src/etl.rs | 38 ++-- src/pipeline/src/etl/ctx_req.rs | 173 ++++++++++++------ src/pipeline/src/etl/processor/date.rs | 4 + src/pipeline/src/etl/processor/epoch.rs | 4 + src/pipeline/src/etl/processor/timestamp.rs | 4 + src/pipeline/src/etl/transform.rs | 12 +- .../src/etl/transform/transformer/greptime.rs | 19 +- src/pipeline/src/lib.rs | 2 +- tests-integration/tests/http.rs | 76 +++++++- 11 files changed, 247 insertions(+), 99 deletions(-) diff --git a/src/pipeline/src/dispatcher.rs b/src/pipeline/src/dispatcher.rs index 37b3469a25..f34140b1bc 100644 --- a/src/pipeline/src/dispatcher.rs +++ b/src/pipeline/src/dispatcher.rs @@ -20,10 +20,10 @@ use crate::error::{ Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu, ValueRequiredForDispatcherRuleSnafu, }; +use crate::etl::ctx_req::TABLE_SUFFIX_KEY; use crate::{PipelineMap, Value}; const FIELD: &str = "field"; -const TABLE_SUFFIX: &str = "table_suffix"; const PIPELINE: &str = "pipeline"; const VALUE: &str = "value"; const RULES: &str = "rules"; @@ -80,7 +80,7 @@ impl TryFrom<&Yaml> for Dispatcher { rules .iter() .map(|rule| { - let table_part = rule[TABLE_SUFFIX] + let table_part = rule[TABLE_SUFFIX_KEY] .as_str() .map(|s| s.to_string()) .context(TableSuffixRequiredForDispatcherRuleSnafu)?; diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index e002030a89..f402903498 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -411,13 +411,6 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display( - "At least one timestamp-related processor is required to use auto transform" - ))] - TransformNoTimestampProcessor { - #[snafu(implicit)] - location: Location, - }, #[snafu(display( "Illegal to set multiple timestamp Index columns, please set only one: {columns}" ))] @@ -433,7 +426,7 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("Exactly one timestamp value is required to use auto transform"))] + #[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform"))] AutoTransformOneTimestamp { #[snafu(implicit)] location: Location, @@ -880,7 +873,6 @@ impl ErrorExt for Error { | TransformTypeMustBeSet { .. } | TransformColumnNameMustBeUnique { .. } | TransformMultipleTimestampIndex { .. } - | TransformNoTimestampProcessor { .. } | TransformTimestampIndexCount { .. } | AutoTransformOneTimestamp { .. 
}
            | CoerceUnsupportedNullType { .. }
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index eb48610123..49784a5fe5 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -30,12 +30,13 @@ use yaml_rust::YamlLoader;
 
 use crate::dispatcher::{Dispatcher, Rule};
 use crate::error::{
-    InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
-    TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
+    AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
+    YamlLoadSnafu, YamlParseSnafu,
 };
+use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
 use crate::etl::processor::ProcessorKind;
 use crate::tablesuffix::TableSuffixTemplate;
-use crate::GreptimeTransformer;
+use crate::{ContextOpt, GreptimeTransformer};
 
 const DESCRIPTION: &str = "description";
 const PROCESSORS: &str = "processors";
@@ -80,16 +81,14 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
             // check processors have at least one timestamp-related processor
             let cnt = processors
                 .iter()
-                .filter(|p| {
-                    matches!(
-                        p,
-                        ProcessorKind::Date(_)
-                            | ProcessorKind::Timestamp(_)
-                            | ProcessorKind::Epoch(_)
-                    )
+                .filter_map(|p| match p {
+                    ProcessorKind::Date(d) => Some(d.target_count()),
+                    ProcessorKind::Timestamp(t) => Some(t.target_count()),
+                    ProcessorKind::Epoch(e) => Some(e.target_count()),
+                    _ => None,
                 })
-                .count();
-            ensure!(cnt > 0, TransformNoTimestampProcessorSnafu);
+                .sum::<usize>();
+            ensure!(cnt == 1, AutoTransformOneTimestampSnafu);
             None
         } else {
             Some(GreptimeTransformer::new(transformers)?)
@@ -161,7 +160,7 @@ pub enum PipelineExecOutput {
 
 #[derive(Debug)]
 pub struct TransformedOutput {
-    pub opt: String,
+    pub opt: ContextOpt,
     pub row: Row,
     pub table_suffix: Option<String>,
     pub pipeline_map: PipelineMap,
@@ -244,9 +243,11 @@ impl Pipeline {
             return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
         }
 
+        // do transform
         if let Some(transformer) = self.transformer() {
-            let (opt, row) = transformer.transform_mut(&mut val)?;
-            let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val));
+            let (mut opt, row) = transformer.transform_mut(&mut val)?;
+            let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
+
             Ok(PipelineExecOutput::Transformed(TransformedOutput {
                 opt,
                 row,
@@ -254,7 +255,12 @@
                 table_suffix,
                 pipeline_map: val,
             }))
        } else {
-            let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val));
+            // check table suffix var
+            let table_suffix = val
+                .remove(TABLE_SUFFIX_KEY)
+                .map(|f| f.to_str_value())
+                .or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val)));
+
             let mut ts_unit_map = HashMap::with_capacity(4);
             // get all ts values
             for (k, v) in val.iter() {
diff --git a/src/pipeline/src/etl/ctx_req.rs b/src/pipeline/src/etl/ctx_req.rs
index 6d846a1bcc..c4b38c8964 100644
--- a/src/pipeline/src/etl/ctx_req.rs
+++ b/src/pipeline/src/etl/ctx_req.rs
@@ -13,69 +13,145 @@
 // limitations under the License.
 use std::collections::hash_map::IntoIter;
-use std::collections::BTreeMap;
 use std::sync::Arc;
 
 use ahash::{HashMap, HashMapExt};
 use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
-use itertools::Itertools;
 use session::context::{QueryContext, QueryContextRef};
 
+use crate::tablesuffix::TableSuffixTemplate;
 use crate::PipelineMap;
 
-const DEFAULT_OPT: &str = "";
+const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
+const GREPTIME_TTL: &str = "greptime_ttl";
+const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
+const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
+const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
+const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
+const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";
 
-pub const PIPELINE_HINT_KEYS: [&str; 6] = [
-    "greptime_auto_create_table",
-    "greptime_ttl",
-    "greptime_append_mode",
-    "greptime_merge_mode",
-    "greptime_physical_table",
-    "greptime_skip_wal",
+pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
+pub(crate) const TTL_KEY: &str = "ttl";
+pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
+pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
+pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
+pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
+pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";
+
+pub const PIPELINE_HINT_KEYS: [&str; 7] = [
+    GREPTIME_AUTO_CREATE_TABLE,
+    GREPTIME_TTL,
+    GREPTIME_APPEND_MODE,
+    GREPTIME_MERGE_MODE,
+    GREPTIME_PHYSICAL_TABLE,
+    GREPTIME_SKIP_WAL,
+    GREPTIME_TABLE_SUFFIX,
 ];
 
 const PIPELINE_HINT_PREFIX: &str = "greptime_";
 
-// Remove hints from the pipeline context and form a option string
-// e.g: skip_wal=true,ttl=1d
-pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> String {
-    let mut btreemap = BTreeMap::new();
-    for k in PIPELINE_HINT_KEYS {
-        if let Some(v) = pipeline_map.remove(k) {
-            btreemap.insert(k, v.to_str_value());
+/// ContextOpt is a collection of options (including table options and pipeline options)
+/// that should be extracted during the pipeline execution.
+///
+/// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`].
+/// It is used as the key in [`ContextReq`] for grouping the row insert requests.
+#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct ContextOpt {
+    // table options that need to be set in the query context before making row insert requests
+    auto_create_table: Option<String>,
+    ttl: Option<String>,
+    append_mode: Option<String>,
+    merge_mode: Option<String>,
+    physical_table: Option<String>,
+    skip_wal: Option<String>,
+
+    // pipeline options, not set in query context
+    // can be removed before the end of the pipeline execution
+    table_suffix: Option<String>,
+}
+
+impl ContextOpt {
+    pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> Self {
+        let mut opt = Self::default();
+        for k in PIPELINE_HINT_KEYS {
+            if let Some(v) = pipeline_map.remove(k) {
+                match k {
+                    GREPTIME_AUTO_CREATE_TABLE => {
+                        opt.auto_create_table = Some(v.to_str_value());
+                    }
+                    GREPTIME_TTL => {
+                        opt.ttl = Some(v.to_str_value());
+                    }
+                    GREPTIME_APPEND_MODE => {
+                        opt.append_mode = Some(v.to_str_value());
+                    }
+                    GREPTIME_MERGE_MODE => {
+                        opt.merge_mode = Some(v.to_str_value());
+                    }
+                    GREPTIME_PHYSICAL_TABLE => {
+                        opt.physical_table = Some(v.to_str_value());
+                    }
+                    GREPTIME_SKIP_WAL => {
+                        opt.skip_wal = Some(v.to_str_value());
+                    }
+                    GREPTIME_TABLE_SUFFIX => {
+                        opt.table_suffix = Some(v.to_str_value());
+                    }
+                    _ => {}
+                }
+            }
+        }
+        opt
+    }
+
+    pub(crate) fn resolve_table_suffix(
+        &mut self,
+        table_suffix: Option<&TableSuffixTemplate>,
+        pipeline_map: &PipelineMap,
+    ) -> Option<String> {
+        self.table_suffix
+            .take()
+            .or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
+    }
+
+    pub fn set_query_context(self, ctx: &mut QueryContext) {
+        if let Some(auto_create_table) = &self.auto_create_table {
+            ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
+        }
+        if let Some(ttl) = &self.ttl {
+            ctx.set_extension(TTL_KEY, ttl);
+        }
+        if let Some(append_mode) = &self.append_mode {
+            ctx.set_extension(APPEND_MODE_KEY, append_mode);
+        }
+        if let Some(merge_mode) = &self.merge_mode {
+            ctx.set_extension(MERGE_MODE_KEY, merge_mode);
+        }
+        if let Some(physical_table) = &self.physical_table {
+            ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
+        }
+        if let Some(skip_wal) = &self.skip_wal {
+            ctx.set_extension(SKIP_WAL_KEY, skip_wal);
         }
     }
-    btreemap
-        .into_iter()
-        .map(|(k, v)| format!("{}={}", k.replace(PIPELINE_HINT_PREFIX, ""), v))
-        .join(",")
 }
 
-// split the option string back to a map
-fn from_opt_to_map(opt: &str) -> HashMap<&str, &str> {
-    opt.split(',')
-        .filter_map(|s| {
-            s.split_once("=")
-                .filter(|(k, v)| !k.is_empty() && !v.is_empty())
-        })
-        .collect()
-}
-
-// ContextReq is a collection of row insert requests with different options.
-// The default option is empty string.
-// Because options are set in query context, we have to split them into sequential calls
-// e.g:
-// {
-//     "skip_wal=true,ttl=1d": [RowInsertRequest],
-//     "ttl=1d": [RowInsertRequest],
-// }
+/// ContextReq is a collection of row insert requests with different options.
+/// The default option is all empty.
+/// Because options are set in query context, we have to split them into sequential calls.
+/// The key is a [`ContextOpt`] struct for strong typing.
+/// e.g:
+/// {
+///     "skip_wal=true,ttl=1d": [RowInsertRequest],
+///     "ttl=1d": [RowInsertRequest],
+/// }
 #[derive(Debug, Default)]
 pub struct ContextReq {
-    req: HashMap<String, Vec<RowInsertRequest>>,
+    req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
 }
 
 impl ContextReq {
-    pub fn from_opt_map(opt_map: HashMap<String, Rows>, table_name: String) -> Self {
+    pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
         Self {
             req: opt_map
                 .into_iter()
@@ -88,17 +164,17 @@ impl ContextReq {
                         }],
                     )
                 })
-                .collect::<HashMap<String, Vec<RowInsertRequest>>>(),
+                .collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
         }
     }
 
     pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
         let mut req_map = HashMap::new();
-        req_map.insert(DEFAULT_OPT.to_string(), reqs);
+        req_map.insert(ContextOpt::default(), reqs);
         Self { req: req_map }
     }
 
-    pub fn add_rows(&mut self, opt: String, req: RowInsertRequest) {
+    pub fn add_rows(&mut self, opt: ContextOpt, req: RowInsertRequest) {
         self.req.entry(opt).or_default().push(req);
     }
 
@@ -131,7 +207,7 @@ impl ContextReq {
 // It will clone the query context for each option and set the options to the context.
 // Then it will return the context and the row insert requests for actual insert.
 pub struct ContextReqIter {
-    opt_req: IntoIter<String, Vec<RowInsertRequest>>,
+    opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
     ctx_template: QueryContext,
 }
 
@@ -140,13 +216,8 @@ impl Iterator for ContextReqIter {
     fn next(&mut self) -> Option<Self::Item> {
         let (opt, req_vec) = self.opt_req.next()?;
-
-        let opt_map = from_opt_to_map(&opt);
         let mut ctx = self.ctx_template.clone();
-        for (k, v) in opt_map {
-            ctx.set_extension(k, v);
-        }
+        opt.set_query_context(&mut ctx);
 
         Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
     }
diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs
index 4756dcba7f..691c4b3732 100644
--- a/src/pipeline/src/etl/processor/date.rs
+++ b/src/pipeline/src/etl/processor/date.rs
@@ -163,6 +163,10 @@ pub struct DateProcessor {
 }
 
 impl DateProcessor {
+    pub(crate) fn target_count(&self) -> usize {
+        self.fields.len()
+    }
+
     fn parse(&self, val: &str) -> Result<Timestamp> {
         let mut tz = Tz::UTC;
         if let Some(timezone) = &self.timezone {
diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs
index bbaa8df7a5..24f0790890 100644
--- a/src/pipeline/src/etl/processor/epoch.rs
+++ b/src/pipeline/src/etl/processor/epoch.rs
@@ -111,6 +111,10 @@ impl EpochProcessor {
             Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
         }
     }
+
+    pub(crate) fn target_count(&self) -> usize {
+        self.fields.len()
+    }
 }
 
 impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs
index d608625836..d36d659dbb 100644
--- a/src/pipeline/src/etl/processor/timestamp.rs
+++ b/src/pipeline/src/etl/processor/timestamp.rs
@@ -205,6 +205,10 @@ impl TimestampProcessor {
             Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
         }
     }
+
+    pub(crate) fn target_count(&self) -> usize {
+        self.fields.len()
+    }
 }
 
 fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs
index 2eba270b8d..2a2a908ef5 100644
--- a/src/pipeline/src/etl/transform.rs
+++ b/src/pipeline/src/etl/transform.rs
@@ -88,9 +88,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
     type Error = Error;
 
     fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
-        let mut transforms = Vec::with_capacity(100);
-        let mut all_output_keys: Vec<String> = Vec::with_capacity(100);
-        let mut all_required_keys = Vec::with_capacity(100);
+        let mut transforms = Vec::with_capacity(32);
+        let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
+        let mut all_required_keys = 
Vec::with_capacity(32); + for doc in docs { let transform_builder: Transform = doc .as_hash() @@ -123,15 +124,10 @@ impl TryFrom<&Vec> for Transforms { #[derive(Debug, Clone)] pub struct Transform { pub fields: Fields, - pub type_: Value, - pub default: Option, - pub index: Option, - pub tag: bool, - pub on_failure: Option, } diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index c1836b00e7..f35f005b6b 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -35,12 +35,13 @@ use crate::error::{ TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu, }; +use crate::etl::ctx_req::ContextOpt; use crate::etl::field::{Field, Fields}; use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transforms}; use crate::etl::value::{Timestamp, Value}; use crate::etl::PipelineMap; -use crate::{from_pipeline_map_to_opt, PipelineContext}; +use crate::PipelineContext; const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; @@ -185,8 +186,8 @@ impl GreptimeTransformer { } } - pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(String, Row)> { - let opt = from_pipeline_map_to_opt(pipeline_map); + pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> { + let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map); let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()]; let mut output_index = 0; @@ -519,7 +520,7 @@ fn resolve_value( fn identity_pipeline_inner( pipeline_maps: Vec, pipeline_ctx: &PipelineContext<'_>, -) -> Result<(SchemaInfo, HashMap>)> { +) -> Result<(SchemaInfo, HashMap>)> { let mut schema_info = SchemaInfo::default(); let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts(); @@ -544,7 +545,7 @@ fn identity_pipeline_inner( let len = pipeline_maps.len(); for mut pipeline_map in pipeline_maps { - let opt = from_pipeline_map_to_opt(&mut pipeline_map); + let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map); let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?; opt_map @@ -578,7 +579,7 @@ pub fn identity_pipeline( array: Vec, table: Option>, pipeline_ctx: &PipelineContext<'_>, -) -> Result> { +) -> Result> { let input = if pipeline_ctx.pipeline_param.flatten_json_object() { array .into_iter() @@ -609,7 +610,7 @@ pub fn identity_pipeline( }, ) }) - .collect::>() + .collect::>() }) } @@ -761,7 +762,7 @@ mod tests { assert!(rows.is_ok()); let mut rows = rows.unwrap(); assert!(rows.len() == 1); - let rows = rows.remove("").unwrap(); + let rows = rows.remove(&ContextOpt::default()).unwrap(); assert_eq!(rows.schema.len(), 8); assert_eq!(rows.rows.len(), 2); assert_eq!(8, rows.rows[0].values.len()); @@ -799,7 +800,7 @@ mod tests { } assert!(rows.len() == 1); - let rows = rows.remove("").unwrap(); + let rows = rows.remove(&ContextOpt::default()).unwrap(); Rows { schema: schema.schema, diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index f9d1746498..38248eb767 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -19,7 +19,7 @@ mod manager; mod metrics; mod tablesuffix; -pub use etl::ctx_req::{from_pipeline_map_to_opt, ContextReq}; +pub use etl::ctx_req::{ContextOpt, ContextReq}; pub use etl::processor::Processor; pub use 
etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo}; pub use etl::transform::transformer::identity_pipeline; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e199109d40..dad071d5d4 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -106,6 +106,7 @@ macro_rules! http_tests { test_pipeline_suffix_template, test_pipeline_context, test_pipeline_with_vrl, + test_pipeline_with_hint_vrl, test_otlp_metrics, test_otlp_traces_v0, @@ -2067,7 +2068,8 @@ table_suffix: _${type} "type": "http", "time": "2024-05-25 20:16:37.217", "log": "ClusterAdapter:enter sendTextDataToCluster\\n", - "greptime_ttl": "1d" + "greptime_ttl": "1d", + "greptime_skip_wal": "true" }, { "id1": "2436", @@ -2117,12 +2119,13 @@ table_suffix: _${type} // CREATE TABLE IF NOT EXISTS "d_table_http" ( // ... ignore // ) - // ENGINE=mito + // ENGINE=mito // WITH( // append_mode = 'true', + // skip_wal = 'true', // ttl = '1day' // ) - let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n ttl = '1day'\\n)\"]]"; + let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n skip_wal = 'true',\\n ttl = '1day'\\n)\"]]"; validate_data( "test_pipeline_context_http", &client, @@ -2202,6 +2205,73 @@ transform: guard.remove_all().await; } +pub async fn test_pipeline_with_hint_vrl(storage_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(storage_type, "test_pipeline_with_hint_vrl").await; + + // handshake + let client = TestClient::new(app).await; + + let pipeline = r#" +processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + ignore_missing: true + - vrl: + source: | + .greptime_table_suffix, err = "_" + .id + . + +transform: + - fields: + - id + type: int32 + - field: time + type: time + index: timestamp +"#; + + // 1. create pipeline + let res = client + .post("/v1/events/pipelines/root") + .header("Content-Type", "application/x-yaml") + .body(pipeline) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + + // 2. write data + let data_body = r#" +[ + { + "id": "2436", + "time": "2024-05-25 20:16:37.217" + } +] +"#; + let res = client + .post("/v1/events/logs?db=public&table=d_table&pipeline_name=root") + .header("Content-Type", "application/json") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + validate_data( + "test_pipeline_with_hint_vrl", + &client, + "show tables", + "[[\"d_table_2436\"],[\"demo\"],[\"numbers\"]]", + ) + .await; + + guard.remove_all().await; +} + pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) =