chore: support table suffix in hint (#6223)

* feat: pipeline recognize hints from exec

* chore: rename and add test

* chore: minor improve

* chore: rename and add comments

* fix: typos

* feat: add initial impl for vrl processor

* chore: update processors to allow vrl process

* feat: pipeline recognize hints from exec

* chore: rename and add test

* chore: minor improve

* chore: rename and add comments

* fix: typos

* chore: remove unnecessary clone fn

* chore: group metrics

* chore: use struct in transform output enum

* test: add test for vrl

* fix: leaked conflicts

* chore: merge branch code & add check in compile

* fix: check condition

* fix: check auto-transform timeindex

* chore: support table_suffix in hint

* chore: add test for table suffix in vrl hint

* refactor: change context_opt to a struct
shuiyisong
2025-06-06 13:39:10 -07:00
committed by GitHub
parent a2b3ad77df
commit 538b5abaae
11 changed files with 247 additions and 99 deletions

View File

@@ -20,10 +20,10 @@ use crate::error::{
Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
ValueRequiredForDispatcherRuleSnafu,
};
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
use crate::{PipelineMap, Value};
const FIELD: &str = "field";
const TABLE_SUFFIX: &str = "table_suffix";
const PIPELINE: &str = "pipeline";
const VALUE: &str = "value";
const RULES: &str = "rules";
@@ -80,7 +80,7 @@ impl TryFrom<&Yaml> for Dispatcher {
rules
.iter()
.map(|rule| {
let table_part = rule[TABLE_SUFFIX]
let table_part = rule[TABLE_SUFFIX_KEY]
.as_str()
.map(|s| s.to_string())
.context(TableSuffixRequiredForDispatcherRuleSnafu)?;

View File

@@ -411,13 +411,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"At least one timestamp-related processor is required to use auto transform"
))]
TransformNoTimestampProcessor {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Illegal to set multiple timestamp Index columns, please set only one: {columns}"
))]
@@ -433,7 +426,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Exactly one timestamp value is required to use auto transform"))]
#[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform"))]
AutoTransformOneTimestamp {
#[snafu(implicit)]
location: Location,
@@ -880,7 +873,6 @@ impl ErrorExt for Error {
| TransformTypeMustBeSet { .. }
| TransformColumnNameMustBeUnique { .. }
| TransformMultipleTimestampIndex { .. }
| TransformNoTimestampProcessor { .. }
| TransformTimestampIndexCount { .. }
| AutoTransformOneTimestamp { .. }
| CoerceUnsupportedNullType { .. }

View File

@@ -30,12 +30,13 @@ use yaml_rust::YamlLoader;
use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
YamlLoadSnafu, YamlParseSnafu,
};
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
use crate::etl::processor::ProcessorKind;
use crate::tablesuffix::TableSuffixTemplate;
use crate::GreptimeTransformer;
use crate::{ContextOpt, GreptimeTransformer};
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
@@ -80,16 +81,14 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
// check processors have at least one timestamp-related processor
let cnt = processors
.iter()
.filter(|p| {
matches!(
p,
ProcessorKind::Date(_)
| ProcessorKind::Timestamp(_)
| ProcessorKind::Epoch(_)
)
.filter_map(|p| match p {
ProcessorKind::Date(d) => Some(d.target_count()),
ProcessorKind::Timestamp(t) => Some(t.target_count()),
ProcessorKind::Epoch(e) => Some(e.target_count()),
_ => None,
})
.count();
ensure!(cnt > 0, TransformNoTimestampProcessorSnafu);
.sum::<usize>();
ensure!(cnt == 1, AutoTransformOneTimestampSnafu);
None
} else {
Some(GreptimeTransformer::new(transformers)?)
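The hunk above tightens the auto-transform precondition: instead of merely requiring at least one time-related processor, the parser now sums the timestamp target fields across the date, timestamp, and epoch processors and requires the total to be exactly one, matching the reworded AutoTransformOneTimestamp error. A minimal sketch of the counting logic follows, using stand-in types rather than the crate's actual ProcessorKind:

// Stand-in types for illustration only; the real ProcessorKind has more variants
// and each processor exposes the count via target_count() (added later in this PR).
enum ProcessorKind {
    Date { fields: Vec<String> },
    Epoch { fields: Vec<String> },
    Other,
}

fn timestamp_target_count(processors: &[ProcessorKind]) -> usize {
    processors
        .iter()
        .filter_map(|p| match p {
            // each time-related processor contributes one target per field
            ProcessorKind::Date { fields } | ProcessorKind::Epoch { fields } => Some(fields.len()),
            ProcessorKind::Other => None,
        })
        .sum()
}

fn main() {
    let ok = vec![
        ProcessorKind::Date { fields: vec!["time".to_string()] },
        ProcessorKind::Other,
    ];
    assert_eq!(timestamp_target_count(&ok), 1); // exactly one target: accepted

    let ambiguous = vec![
        ProcessorKind::Date { fields: vec!["time".to_string()] },
        ProcessorKind::Epoch { fields: vec!["ts".to_string()] },
    ];
    assert_eq!(timestamp_target_count(&ambiguous), 2); // more than one: rejected with AutoTransformOneTimestamp
}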
@@ -161,7 +160,7 @@ pub enum PipelineExecOutput {
#[derive(Debug)]
pub struct TransformedOutput {
pub opt: String,
pub opt: ContextOpt,
pub row: Row,
pub table_suffix: Option<String>,
pub pipeline_map: PipelineMap,
@@ -244,9 +243,11 @@ impl Pipeline {
return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
}
// do transform
if let Some(transformer) = self.transformer() {
let (opt, row) = transformer.transform_mut(&mut val)?;
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val));
let (mut opt, row) = transformer.transform_mut(&mut val)?;
let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
@@ -254,7 +255,12 @@ impl Pipeline {
pipeline_map: val,
}))
} else {
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val));
// check table suffix var
let table_suffix = val
.remove(TABLE_SUFFIX_KEY)
.map(|f| f.to_str_value())
.or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val)));
let mut ts_unit_map = HashMap::with_capacity(4);
// get all ts values
for (k, v) in val.iter() {

View File

@@ -13,69 +13,145 @@
// limitations under the License.
use std::collections::hash_map::IntoIter;
use std::collections::BTreeMap;
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use itertools::Itertools;
use session::context::{QueryContext, QueryContextRef};
use crate::tablesuffix::TableSuffixTemplate;
use crate::PipelineMap;
const DEFAULT_OPT: &str = "";
const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
const GREPTIME_TTL: &str = "greptime_ttl";
const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";
pub const PIPELINE_HINT_KEYS: [&str; 6] = [
"greptime_auto_create_table",
"greptime_ttl",
"greptime_append_mode",
"greptime_merge_mode",
"greptime_physical_table",
"greptime_skip_wal",
pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
pub(crate) const TTL_KEY: &str = "ttl";
pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";
pub const PIPELINE_HINT_KEYS: [&str; 7] = [
GREPTIME_AUTO_CREATE_TABLE,
GREPTIME_TTL,
GREPTIME_APPEND_MODE,
GREPTIME_MERGE_MODE,
GREPTIME_PHYSICAL_TABLE,
GREPTIME_SKIP_WAL,
GREPTIME_TABLE_SUFFIX,
];
const PIPELINE_HINT_PREFIX: &str = "greptime_";
// Remove hints from the pipeline context and form a option string
// e.g: skip_wal=true,ttl=1d
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> String {
let mut btreemap = BTreeMap::new();
for k in PIPELINE_HINT_KEYS {
if let Some(v) = pipeline_map.remove(k) {
btreemap.insert(k, v.to_str_value());
/// ContextOpt is a collection of options (including table options and pipeline options)
/// that should be extracted during the pipeline execution.
///
/// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`].
/// It is used as the key in [`ContextReq`] for grouping the row insert requests.
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContextOpt {
// table options, that need to be set in the query context before making row insert requests
auto_create_table: Option<String>,
ttl: Option<String>,
append_mode: Option<String>,
merge_mode: Option<String>,
physical_table: Option<String>,
skip_wal: Option<String>,
// pipeline options, not set in query context
// can be removed before the end of the pipeline execution
table_suffix: Option<String>,
}
impl ContextOpt {
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> Self {
let mut opt = Self::default();
for k in PIPELINE_HINT_KEYS {
if let Some(v) = pipeline_map.remove(k) {
match k {
GREPTIME_AUTO_CREATE_TABLE => {
opt.auto_create_table = Some(v.to_str_value());
}
GREPTIME_TTL => {
opt.ttl = Some(v.to_str_value());
}
GREPTIME_APPEND_MODE => {
opt.append_mode = Some(v.to_str_value());
}
GREPTIME_MERGE_MODE => {
opt.merge_mode = Some(v.to_str_value());
}
GREPTIME_PHYSICAL_TABLE => {
opt.physical_table = Some(v.to_str_value());
}
GREPTIME_SKIP_WAL => {
opt.skip_wal = Some(v.to_str_value());
}
GREPTIME_TABLE_SUFFIX => {
opt.table_suffix = Some(v.to_str_value());
}
_ => {}
}
}
}
opt
}
pub(crate) fn resolve_table_suffix(
&mut self,
table_suffix: Option<&TableSuffixTemplate>,
pipeline_map: &PipelineMap,
) -> Option<String> {
self.table_suffix
.take()
.or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
}
pub fn set_query_context(self, ctx: &mut QueryContext) {
if let Some(auto_create_table) = &self.auto_create_table {
ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
}
if let Some(ttl) = &self.ttl {
ctx.set_extension(TTL_KEY, ttl);
}
if let Some(append_mode) = &self.append_mode {
ctx.set_extension(APPEND_MODE_KEY, append_mode);
}
if let Some(merge_mode) = &self.merge_mode {
ctx.set_extension(MERGE_MODE_KEY, merge_mode);
}
if let Some(physical_table) = &self.physical_table {
ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
}
if let Some(skip_wal) = &self.skip_wal {
ctx.set_extension(SKIP_WAL_KEY, skip_wal);
}
}
btreemap
.into_iter()
.map(|(k, v)| format!("{}={}", k.replace(PIPELINE_HINT_PREFIX, ""), v))
.join(",")
}
// split the option string back to a map
fn from_opt_to_map(opt: &str) -> HashMap<&str, &str> {
opt.split(',')
.filter_map(|s| {
s.split_once("=")
.filter(|(k, v)| !k.is_empty() && !v.is_empty())
})
.collect()
}
// ContextReq is a collection of row insert requests with different options.
// The default option is empty string.
// Because options are set in query context, we have to split them into sequential calls
// e.g:
// {
// "skip_wal=true,ttl=1d": [RowInsertRequest],
// "ttl=1d": [RowInsertRequest],
// }
/// ContextReq is a collection of row insert requests with different options.
/// The default option set is empty (all fields are `None`).
/// Because options are set in the query context, we have to split them into sequential calls.
/// The key is a [`ContextOpt`] struct for strong typing.
/// e.g:
/// {
/// "skip_wal=true,ttl=1d": [RowInsertRequest],
/// "ttl=1d": [RowInsertRequest],
/// }
#[derive(Debug, Default)]
pub struct ContextReq {
req: HashMap<String, Vec<RowInsertRequest>>,
req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
}
impl ContextReq {
pub fn from_opt_map(opt_map: HashMap<String, Rows>, table_name: String) -> Self {
pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
Self {
req: opt_map
.into_iter()
@@ -88,17 +164,17 @@ impl ContextReq {
}],
)
})
.collect::<HashMap<String, Vec<RowInsertRequest>>>(),
.collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
}
}
pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
let mut req_map = HashMap::new();
req_map.insert(DEFAULT_OPT.to_string(), reqs);
req_map.insert(ContextOpt::default(), reqs);
Self { req: req_map }
}
pub fn add_rows(&mut self, opt: String, req: RowInsertRequest) {
pub fn add_rows(&mut self, opt: ContextOpt, req: RowInsertRequest) {
self.req.entry(opt).or_default().push(req);
}
@@ -131,7 +207,7 @@ impl ContextReq {
// It will clone the query context for each option and set the options to the context.
// Then it will return the context and the row insert requests for actual insert.
pub struct ContextReqIter {
opt_req: IntoIter<String, Vec<RowInsertRequest>>,
opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
ctx_template: QueryContext,
}
@@ -140,13 +216,8 @@ impl Iterator for ContextReqIter {
fn next(&mut self) -> Option<Self::Item> {
let (opt, req_vec) = self.opt_req.next()?;
let opt_map = from_opt_to_map(&opt);
let mut ctx = self.ctx_template.clone();
for (k, v) in opt_map {
ctx.set_extension(k, v);
}
opt.set_query_context(&mut ctx);
Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
}
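Taken together, the ctx_req changes parse hints once into a strongly typed ContextOpt and use it directly as the grouping key for row insert requests, rather than serializing hints into a "skip_wal=true,ttl=1d" string and re-parsing it per request. A simplified sketch of that grouping, with a hypothetical trimmed-down ContextOpt and plain strings in place of RowInsertRequest:

use std::collections::HashMap;

// Hypothetical, trimmed-down ContextOpt for illustration; the real struct also carries
// auto_create_table, append_mode, merge_mode, physical_table, and table_suffix.
#[derive(Debug, Default, PartialEq, Eq, Hash)]
struct ContextOpt {
    ttl: Option<String>,
    skip_wal: Option<String>,
}

fn main() {
    let mut req: HashMap<ContextOpt, Vec<&str>> = HashMap::new();

    // Rows carrying the same hints land in the same group and are inserted together
    // under one query context.
    req.entry(ContextOpt { ttl: Some("1d".into()), skip_wal: Some("true".into()) })
        .or_default()
        .push("row-1");
    req.entry(ContextOpt { ttl: Some("1d".into()), skip_wal: Some("true".into()) })
        .or_default()
        .push("row-2");

    // Rows without hints fall under the default (all-None) option set.
    req.entry(ContextOpt::default()).or_default().push("row-3");

    // Each key later becomes one insert call, with its options applied to a cloned
    // query context via set_query_context.
    assert_eq!(req.len(), 2);
}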

View File

@@ -163,6 +163,10 @@ pub struct DateProcessor {
}
impl DateProcessor {
pub(crate) fn target_count(&self) -> usize {
self.fields.len()
}
fn parse(&self, val: &str) -> Result<Timestamp> {
let mut tz = Tz::UTC;
if let Some(timezone) = &self.timezone {

View File

@@ -111,6 +111,10 @@ impl EpochProcessor {
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
}
}
pub(crate) fn target_count(&self) -> usize {
self.fields.len()
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {

View File

@@ -205,6 +205,10 @@ impl TimestampProcessor {
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
}
}
pub(crate) fn target_count(&self) -> usize {
self.fields.len()
}
}
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {

View File

@@ -88,9 +88,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
type Error = Error;
fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
let mut transforms = Vec::with_capacity(100);
let mut all_output_keys: Vec<String> = Vec::with_capacity(100);
let mut all_required_keys = Vec::with_capacity(100);
let mut transforms = Vec::with_capacity(32);
let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
let mut all_required_keys = Vec::with_capacity(32);
for doc in docs {
let transform_builder: Transform = doc
.as_hash()
@@ -123,15 +124,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
#[derive(Debug, Clone)]
pub struct Transform {
pub fields: Fields,
pub type_: Value,
pub default: Option<Value>,
pub index: Option<Index>,
pub tag: bool,
pub on_failure: Option<OnFailure>,
}

View File

@@ -35,12 +35,13 @@ use crate::error::{
TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
use crate::{from_pipeline_map_to_opt, PipelineContext};
use crate::PipelineContext;
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -185,8 +186,8 @@ impl GreptimeTransformer {
}
}
pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(String, Row)> {
let opt = from_pipeline_map_to_opt(pipeline_map);
pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> {
let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map);
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
let mut output_index = 0;
@@ -519,7 +520,7 @@ fn resolve_value(
fn identity_pipeline_inner(
pipeline_maps: Vec<PipelineMap>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<String, Vec<Row>>)> {
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
let mut schema_info = SchemaInfo::default();
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
@@ -544,7 +545,7 @@ fn identity_pipeline_inner(
let len = pipeline_maps.len();
for mut pipeline_map in pipeline_maps {
let opt = from_pipeline_map_to_opt(&mut pipeline_map);
let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;
opt_map
@@ -578,7 +579,7 @@ pub fn identity_pipeline(
array: Vec<PipelineMap>,
table: Option<Arc<table::Table>>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<String, Rows>> {
) -> Result<HashMap<ContextOpt, Rows>> {
let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
array
.into_iter()
@@ -609,7 +610,7 @@ pub fn identity_pipeline(
},
)
})
.collect::<HashMap<String, Rows>>()
.collect::<HashMap<ContextOpt, Rows>>()
})
}
@@ -761,7 +762,7 @@ mod tests {
assert!(rows.is_ok());
let mut rows = rows.unwrap();
assert!(rows.len() == 1);
let rows = rows.remove("").unwrap();
let rows = rows.remove(&ContextOpt::default()).unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
@@ -799,7 +800,7 @@ mod tests {
}
assert!(rows.len() == 1);
let rows = rows.remove("").unwrap();
let rows = rows.remove(&ContextOpt::default()).unwrap();
Rows {
schema: schema.schema,

View File

@@ -19,7 +19,7 @@ mod manager;
mod metrics;
mod tablesuffix;
pub use etl::ctx_req::{from_pipeline_map_to_opt, ContextReq};
pub use etl::ctx_req::{ContextOpt, ContextReq};
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
pub use etl::transform::transformer::identity_pipeline;

View File

@@ -106,6 +106,7 @@ macro_rules! http_tests {
test_pipeline_suffix_template,
test_pipeline_context,
test_pipeline_with_vrl,
test_pipeline_with_hint_vrl,
test_otlp_metrics,
test_otlp_traces_v0,
@@ -2067,7 +2068,8 @@ table_suffix: _${type}
"type": "http",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n",
"greptime_ttl": "1d"
"greptime_ttl": "1d",
"greptime_skip_wal": "true"
},
{
"id1": "2436",
@@ -2117,12 +2119,13 @@ table_suffix: _${type}
// CREATE TABLE IF NOT EXISTS "d_table_http" (
// ... ignore
// )
// ENGINE=mito
// ENGINE=mito
// WITH(
// append_mode = 'true',
// skip_wal = 'true',
// ttl = '1day'
// )
let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n ttl = '1day'\\n)\"]]";
let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n skip_wal = 'true',\\n ttl = '1day'\\n)\"]]";
validate_data(
"test_pipeline_context_http",
&client,
@@ -2202,6 +2205,73 @@ transform:
guard.remove_all().await;
}
pub async fn test_pipeline_with_hint_vrl(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(storage_type, "test_pipeline_with_hint_vrl").await;
// handshake
let client = TestClient::new(app).await;
let pipeline = r#"
processors:
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
ignore_missing: true
- vrl:
source: |
.greptime_table_suffix, err = "_" + .id
.
transform:
- fields:
- id
type: int32
- field: time
type: time
index: timestamp
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/root")
.header("Content-Type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let data_body = r#"
[
{
"id": "2436",
"time": "2024-05-25 20:16:37.217"
}
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
validate_data(
"test_pipeline_with_hint_vrl",
&client,
"show tables",
"[[\"d_table_2436\"],[\"demo\"],[\"numbers\"]]",
)
.await;
guard.remove_all().await;
}
pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =