feat: pipeline with insert options (#6192)

* feat: pipeline recognize hints from exec

* chore: rename and add test

* chore: minor improve

* chore: rename and add comments

* fix: typos

* chore: remove unnecessary clone fn

* chore: group metrics

* chore: use struct in transform output enum

* chore: update hint prefix

Author: shuiyisong
Date: 2025-06-04 02:46:48 +08:00
Committed by: GitHub
Parent: 38cac301f2
Commit: 69975f1f71
19 changed files with 544 additions and 186 deletions


@@ -125,7 +125,7 @@ impl OpenTelemetryProtocolHandler for Instance {
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
) -> ServerResult<Vec<Output>> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
@@ -137,7 +137,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::logs::to_grpc_insert_requests(
let opt_req = otlp::logs::to_grpc_insert_requests(
request,
pipeline,
pipeline_params,
@@ -148,7 +148,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.await?;
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
let result = limiter.limit_ctx_req(&opt_req);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
@@ -157,10 +157,24 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
self.handle_log_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
let mut outputs = vec![];
for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
let cnt = requests
.inserts
.iter()
.filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
.sum::<usize>();
let o = self
.handle_log_inserts(requests, temp_ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
outputs.push(o);
}
Ok(outputs)
}
}


@@ -18,8 +18,11 @@ use std::sync::Arc;
use api::v1::column::Values;
use api::v1::greptime_request::Request;
use api::v1::value::ValueData;
use api::v1::{Decimal128, InsertRequests, IntervalMonthDayNano, RowInsertRequests};
use api::v1::{
Decimal128, InsertRequests, IntervalMonthDayNano, RowInsertRequest, RowInsertRequests,
};
use common_telemetry::{debug, warn};
use pipeline::ContextReq;
pub(crate) type LimiterRef = Arc<Limiter>;
@@ -75,7 +78,9 @@ impl Limiter {
pub fn limit_request(&self, request: &Request) -> Option<InFlightWriteBytesCounter> {
let size = match request {
Request::Inserts(requests) => self.insert_requests_data_size(requests),
Request::RowInserts(requests) => self.rows_insert_requests_data_size(requests),
Request::RowInserts(requests) => {
self.rows_insert_requests_data_size(requests.inserts.iter())
}
_ => 0,
};
self.limit_in_flight_write_bytes(size as u64)
@@ -85,7 +90,12 @@ impl Limiter {
&self,
requests: &RowInsertRequests,
) -> Option<InFlightWriteBytesCounter> {
let size = self.rows_insert_requests_data_size(requests);
let size = self.rows_insert_requests_data_size(requests.inserts.iter());
self.limit_in_flight_write_bytes(size as u64)
}
pub fn limit_ctx_req(&self, opt_req: &ContextReq) -> Option<InFlightWriteBytesCounter> {
let size = self.rows_insert_requests_data_size(opt_req.ref_all_req());
self.limit_in_flight_write_bytes(size as u64)
}
@@ -137,9 +147,12 @@ impl Limiter {
size
}
fn rows_insert_requests_data_size(&self, request: &RowInsertRequests) -> usize {
fn rows_insert_requests_data_size<'a>(
&self,
inserts: impl Iterator<Item = &'a RowInsertRequest>,
) -> usize {
let mut size: usize = 0;
for insert in &request.inserts {
for insert in inserts {
if let Some(rows) = &insert.rows {
for row in &rows.rows {
for value in &row.values {


@@ -13,6 +13,7 @@
// limitations under the License.
#![allow(dead_code)]
pub mod ctx_req;
pub mod field;
pub mod processor;
pub mod transform;
@@ -153,21 +154,39 @@ impl DispatchedTo {
/// The result of pipeline execution
#[derive(Debug)]
pub enum PipelineExecOutput {
Transformed((Row, Option<String>)),
// table_suffix, ts_key -> unit
AutoTransform(Option<String>, HashMap<String, TimeUnit>),
Transformed(TransformedOutput),
AutoTransform(AutoTransformOutput),
DispatchedTo(DispatchedTo),
}
#[derive(Debug)]
pub struct TransformedOutput {
pub opt: String,
pub row: Row,
pub table_suffix: Option<String>,
}
#[derive(Debug)]
pub struct AutoTransformOutput {
pub table_suffix: Option<String>,
// ts_column_name -> unit
pub ts_unit_map: HashMap<String, TimeUnit>,
}
impl PipelineExecOutput {
// Note: This is a test only function, do not use it in production.
pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
if let Self::Transformed(o) = self {
Some(o)
if let Self::Transformed(TransformedOutput {
row, table_suffix, ..
}) = self
{
Some((row, table_suffix))
} else {
None
}
}
// Note: This is a test only function, do not use it in production.
pub fn into_dispatched(self) -> Option<DispatchedTo> {
if let Self::DispatchedTo(d) = self {
Some(d)
@@ -224,9 +243,13 @@ impl Pipeline {
}
if let Some(transformer) = self.transformer() {
let row = transformer.transform_mut(val)?;
let (opt, row) = transformer.transform_mut(val)?;
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
Ok(PipelineExecOutput::Transformed((row, table_suffix)))
Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
table_suffix,
}))
} else {
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
let mut ts_unit_map = HashMap::with_capacity(4);
@@ -238,7 +261,10 @@ impl Pipeline {
}
}
}
Ok(PipelineExecOutput::AutoTransform(table_suffix, ts_unit_map))
Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
table_suffix,
ts_unit_map,
}))
}
}


@@ -0,0 +1,153 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::IntoIter;
use std::collections::BTreeMap;
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use itertools::Itertools;
use session::context::{QueryContext, QueryContextRef};
use crate::PipelineMap;
const DEFAULT_OPT: &str = "";
pub const PIPELINE_HINT_KEYS: [&str; 6] = [
"greptime_auto_create_table",
"greptime_ttl",
"greptime_append_mode",
"greptime_merge_mode",
"greptime_physical_table",
"greptime_skip_wal",
];
const PIPELINE_HINT_PREFIX: &str = "greptime_";
// Remove hints from the pipeline context and form an option string,
// e.g. skip_wal=true,ttl=1d
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> String {
let mut btreemap = BTreeMap::new();
for k in PIPELINE_HINT_KEYS {
if let Some(v) = pipeline_map.remove(k) {
btreemap.insert(k, v.to_str_value());
}
}
btreemap
.into_iter()
.map(|(k, v)| format!("{}={}", k.replace(PIPELINE_HINT_PREFIX, ""), v))
.join(",")
}
// Split the option string back into a map
fn from_opt_to_map(opt: &str) -> HashMap<&str, &str> {
opt.split(',')
.filter_map(|s| {
s.split_once("=")
.filter(|(k, v)| !k.is_empty() && !v.is_empty())
})
.collect()
}
// ContextReq is a collection of row insert requests grouped by their option string.
// The default option is the empty string.
// Because options are set on the query context, the requests have to be split into
// sequential insert calls, e.g.:
// {
// "skip_wal=true,ttl=1d": [RowInsertRequest],
// "ttl=1d": [RowInsertRequest],
// }
#[derive(Debug, Default)]
pub struct ContextReq {
req: HashMap<String, Vec<RowInsertRequest>>,
}
impl ContextReq {
pub fn from_opt_map(opt_map: HashMap<String, Rows>, table_name: String) -> Self {
Self {
req: opt_map
.into_iter()
.map(|(opt, rows)| {
(
opt,
vec![RowInsertRequest {
table_name: table_name.clone(),
rows: Some(rows),
}],
)
})
.collect::<HashMap<String, Vec<RowInsertRequest>>>(),
}
}
pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
let mut req_map = HashMap::new();
req_map.insert(DEFAULT_OPT.to_string(), reqs);
Self { req: req_map }
}
pub fn add_rows(&mut self, opt: String, req: RowInsertRequest) {
self.req.entry(opt).or_default().push(req);
}
pub fn merge(&mut self, other: Self) {
for (opt, req) in other.req {
self.req.entry(opt).or_default().extend(req);
}
}
pub fn as_req_iter(self, ctx: QueryContextRef) -> ContextReqIter {
let ctx = (*ctx).clone();
ContextReqIter {
opt_req: self.req.into_iter(),
ctx_template: ctx,
}
}
pub fn all_req(self) -> impl Iterator<Item = RowInsertRequest> {
self.req.into_iter().flat_map(|(_, req)| req)
}
pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
self.req.values().flatten()
}
}
// ContextReqIter is an iterator over a ContextReq.
// The context template is cloned from the original query context.
// For each option it clones the template, sets the option key/values on it as
// extensions, and yields the context together with the row insert requests for
// the actual insert.
pub struct ContextReqIter {
opt_req: IntoIter<String, Vec<RowInsertRequest>>,
ctx_template: QueryContext,
}
impl Iterator for ContextReqIter {
type Item = (QueryContextRef, RowInsertRequests);
fn next(&mut self) -> Option<Self::Item> {
let (opt, req_vec) = self.opt_req.next()?;
let opt_map = from_opt_to_map(&opt);
let mut ctx = self.ctx_template.clone();
for (k, v) in opt_map {
ctx.set_extension(k, v);
}
Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
}
}
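
To make the ContextReq flow above concrete, here is a minimal, self-contained Rust sketch of the same idea using only std types. The names here (PipelineRow, row_to_opt, the two-entry HINT_KEYS subset) are illustrative stand-ins, not the crate's API; the real code works on PipelineMap, RowInsertRequest, and QueryContext as shown in the diff.

use std::collections::{BTreeMap, HashMap};

// Hypothetical stand-in for one pipeline row: hint fields plus payload fields.
type PipelineRow = BTreeMap<String, String>;

const HINT_PREFIX: &str = "greptime_";
// Trimmed subset of the hint keys, for illustration only.
const HINT_KEYS: [&str; 2] = ["greptime_ttl", "greptime_skip_wal"];

// Pop the greptime_ hints out of a row and encode them as a stable option
// string such as "skip_wal=true,ttl=1d" (keys sorted, prefix stripped).
fn row_to_opt(row: &mut PipelineRow) -> String {
    let mut opts = BTreeMap::new();
    for k in HINT_KEYS {
        if let Some(v) = row.remove(k) {
            opts.insert(k.trim_start_matches(HINT_PREFIX).to_string(), v);
        }
    }
    opts.iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join(",")
}

fn main() {
    let rows: Vec<PipelineRow> = vec![
        BTreeMap::from([
            ("msg".to_string(), "a".to_string()),
            ("greptime_ttl".to_string(), "1d".to_string()),
        ]),
        BTreeMap::from([("msg".to_string(), "b".to_string())]),
    ];

    // Group rows by option string; each group later becomes one
    // RowInsertRequests executed under its own derived query context.
    let mut grouped: HashMap<String, Vec<PipelineRow>> = HashMap::new();
    for mut row in rows {
        let opt = row_to_opt(&mut row);
        grouped.entry(opt).or_default().push(row);
    }

    for (opt, group) in grouped {
        // Decode the option string back into key/value pairs, as the iterator
        // does before setting them as extensions on a cloned query context.
        let exts: HashMap<&str, &str> = opt
            .split(',')
            .filter_map(|s| s.split_once('=').filter(|(k, v)| !k.is_empty() && !v.is_empty()))
            .collect();
        println!("opt = {opt:?}, extensions = {exts:?}, rows in group = {}", group.len());
    }
}

Each group would then be executed as one RowInsertRequests under a query context carrying those extensions, which is what ContextReqIter::next does above.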


@@ -40,7 +40,7 @@ use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
use crate::PipelineContext;
use crate::{from_pipeline_map_to_opt, PipelineContext};
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -185,13 +185,15 @@ impl GreptimeTransformer {
}
}
pub fn transform_mut(&self, val: &mut PipelineMap) -> Result<Row> {
pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(String, Row)> {
let opt = from_pipeline_map_to_opt(pipeline_map);
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
let mut output_index = 0;
for transform in self.transforms.iter() {
for field in transform.fields.iter() {
let index = field.input_field();
match val.get(index) {
match pipeline_map.get(index) {
Some(v) => {
let value_data = coerce_value(v, transform)?;
// every transform field has only one output field
@@ -217,7 +219,7 @@ impl GreptimeTransformer {
output_index += 1;
}
}
Ok(Row { values })
Ok((opt, Row { values }))
}
pub fn transforms(&self) -> &Transforms {
@@ -517,8 +519,7 @@ fn resolve_value(
fn identity_pipeline_inner(
pipeline_maps: Vec<PipelineMap>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, Vec<Row>)> {
let mut rows = Vec::with_capacity(pipeline_maps.len());
) -> Result<(SchemaInfo, HashMap<String, Vec<Row>>)> {
let mut schema_info = SchemaInfo::default();
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
@@ -539,20 +540,30 @@ fn identity_pipeline_inner(
options: None,
});
for values in pipeline_maps {
let row = values_to_row(&mut schema_info, values, pipeline_ctx)?;
rows.push(row);
let mut opt_map = HashMap::new();
let len = pipeline_maps.len();
for mut pipeline_map in pipeline_maps {
let opt = from_pipeline_map_to_opt(&mut pipeline_map);
let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;
opt_map
.entry(opt)
.or_insert_with(|| Vec::with_capacity(len))
.push(row);
}
let column_count = schema_info.schema.len();
for row in rows.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
for (_, row) in opt_map.iter_mut() {
for row in row.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}
}
}
Ok((schema_info, rows))
Ok((schema_info, opt_map))
}
/// Identity pipeline for Greptime
@@ -567,7 +578,7 @@ pub fn identity_pipeline(
array: Vec<PipelineMap>,
table: Option<Arc<table::Table>>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<Rows> {
) -> Result<HashMap<String, Rows>> {
let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
array
.into_iter()
@@ -577,7 +588,7 @@ pub fn identity_pipeline(
array
};
identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, rows)| {
identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
if let Some(table) = table {
let table_info = table.table_info();
for tag_name in table_info.meta.row_key_column_names() {
@@ -586,10 +597,19 @@ pub fn identity_pipeline(
}
}
}
Rows {
schema: schema.schema,
rows,
}
opt_map
.into_iter()
.map(|(opt, rows)| {
(
opt,
Rows {
schema: schema.schema.clone(),
rows,
},
)
})
.collect::<HashMap<String, Rows>>()
})
}
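
The grouping-plus-padding step is the subtle part of this hunk: rows collected early in a group can be shorter than the final schema, because columns are discovered while later rows are processed. Below is a small, self-contained sketch of that padding pass, mirroring the loop over opt_map in identity_pipeline_inner; the Value struct and the literal data are hypothetical stand-ins for api::v1::Value and real pipeline output.

use std::collections::HashMap;

// Hypothetical stand-in for api::v1::Value: None means a null cell.
#[derive(Debug)]
struct Value(Option<String>);

fn main() {
    // Rows grouped by option string; the first row in "ttl=1d" is shorter
    // because its second column was only discovered on the second row.
    let mut opt_map: HashMap<String, Vec<Vec<Value>>> = HashMap::new();
    opt_map.insert(
        "ttl=1d".to_string(),
        vec![
            vec![Value(Some("a".to_string()))],
            vec![Value(Some("b".to_string())), Value(Some("c".to_string()))],
        ],
    );
    opt_map.insert("".to_string(), vec![vec![Value(Some("d".to_string()))]]);

    // Final schema width after all rows have been seen.
    let column_count = 2;

    // Pad every row in every group up to the schema width with null values.
    for rows in opt_map.values_mut() {
        for row in rows.iter_mut() {
            for _ in row.len()..column_count {
                row.push(Value(None));
            }
        }
    }

    for (opt, rows) in &opt_map {
        assert!(rows.iter().all(|r| r.len() == column_count));
        println!("opt = {opt:?}: {rows:?}");
    }
}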
@@ -739,7 +759,9 @@ mod tests {
];
let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
assert!(rows.is_ok());
let rows = rows.unwrap();
let mut rows = rows.unwrap();
assert!(rows.len() == 1);
let rows = rows.remove("").unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
@@ -769,12 +791,16 @@ mod tests {
let tag_column_names = ["name".to_string(), "address".to_string()];
let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
.map(|(mut schema, rows)| {
.map(|(mut schema, mut rows)| {
for name in tag_column_names {
if let Some(index) = schema.index.get(&name) {
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
}
}
assert!(rows.len() == 1);
let rows = rows.remove("").unwrap();
Rows {
schema: schema.schema,
rows,


@@ -19,14 +19,16 @@ mod manager;
mod metrics;
mod tablesuffix;
pub use etl::ctx_req::{from_pipeline_map_to_opt, ContextReq};
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::GreptimeTransformer;
pub use etl::value::{Array, Map, Value};
pub use etl::{
json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map, Content,
DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map,
AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
TransformedOutput,
};
pub use manager::{
pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,


@@ -38,6 +38,7 @@ use common_telemetry::{debug, error, tracing, warn};
use common_time::timezone::parse_timezone;
use futures_util::StreamExt;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::mpsc;
@@ -49,7 +50,6 @@ use crate::error::{
};
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
use crate::grpc::TonicResult;
use crate::hint_headers::READ_PREFERENCE_HINT;
use crate::metrics;
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;


@@ -13,23 +13,9 @@
// limitations under the License.
use http::HeaderMap;
use session::hints::{HINTS_KEY, HINTS_KEY_PREFIX, HINT_KEYS};
use tonic::metadata::MetadataMap;
// For the given format: `x-greptime-hints: auto_create_table=true, ttl=7d`
pub const HINTS_KEY: &str = "x-greptime-hints";
pub const READ_PREFERENCE_HINT: &str = "read_preference";
const HINT_KEYS: [&str; 7] = [
"x-greptime-hint-auto_create_table",
"x-greptime-hint-ttl",
"x-greptime-hint-append_mode",
"x-greptime-hint-merge_mode",
"x-greptime-hint-physical_table",
"x-greptime-hint-skip_wal",
"x-greptime-hint-read_preference",
];
pub(crate) fn extract_hints<T: ToHeaderMap>(headers: &T) -> Vec<(String, String)> {
let mut hints = Vec::new();
if let Some(value_str) = headers.get(HINTS_KEY) {
@@ -44,7 +30,7 @@ pub(crate) fn extract_hints<T: ToHeaderMap>(headers: &T) -> Vec<(String, String)
}
for key in HINT_KEYS.iter() {
if let Some(value) = headers.get(key) {
let new_key = key.replace("x-greptime-hint-", "");
let new_key = key.replace(HINTS_KEY_PREFIX, "");
hints.push((new_key, value.trim().to_string()));
}
}
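
For reference, here is the hint-extraction shape after the move of the constants to session::hints, sketched over a plain HashMap instead of the real http::HeaderMap / tonic MetadataMap. The combined x-greptime-hints parsing is a plausible simplification (only the per-key loop is visible in the hunk), and HINT_KEYS is trimmed to a subset for brevity.

use std::collections::HashMap;

// Stand-ins for the constants now living in session::hints; values copied from the diff.
const HINTS_KEY: &str = "x-greptime-hints";
const HINTS_KEY_PREFIX: &str = "x-greptime-hint-";
const HINT_KEYS: [&str; 2] = ["x-greptime-hint-ttl", "x-greptime-hint-append_mode"];

// Illustrative extraction: parse the combined header first, then the per-key headers.
fn extract_hints(headers: &HashMap<String, String>) -> Vec<(String, String)> {
    let mut hints = Vec::new();
    if let Some(value_str) = headers.get(HINTS_KEY) {
        // "auto_create_table=true, ttl=7d" -> [("auto_create_table", "true"), ("ttl", "7d")]
        for kv in value_str.split(',') {
            if let Some((k, v)) = kv.split_once('=') {
                hints.push((k.trim().to_string(), v.trim().to_string()));
            }
        }
    }
    for key in HINT_KEYS {
        if let Some(value) = headers.get(key) {
            // Strip the per-key prefix so "x-greptime-hint-ttl" becomes "ttl".
            hints.push((key.replace(HINTS_KEY_PREFIX, ""), value.trim().to_string()));
        }
    }
    hints
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert(HINTS_KEY.to_string(), "auto_create_table=true, ttl=7d".to_string());
    headers.insert("x-greptime-hint-append_mode".to_string(), "true".to_string());
    for (k, v) in extract_hints(&headers) {
        println!("{k} = {v}");
    }
}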


@@ -18,7 +18,6 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
@@ -34,7 +33,9 @@ use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap};
use pipeline::{
ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
@@ -345,7 +346,7 @@ async fn dryrun_pipeline_inner(
let name_key = "name";
let results = results
.into_iter()
.all_req()
.filter_map(|row| {
if let Some(rows) = row.rows {
let table_name = row.table_name;
@@ -798,7 +799,7 @@ pub(crate) async fn ingest_logs_inner(
let db = query_ctx.get_db_string();
let exec_timer = std::time::Instant::now();
let mut insert_requests = Vec::with_capacity(log_ingest_requests.len());
let mut req = ContextReq::default();
let pipeline_params = GreptimePipelineParams::from_params(
headers
@@ -811,36 +812,42 @@ pub(crate) async fn ingest_logs_inner(
let requests =
run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;
insert_requests.extend(requests);
req.merge(requests);
}
let output = handler
.insert(
RowInsertRequests {
inserts: insert_requests,
},
query_ctx,
)
.await;
let mut outputs = Vec::new();
let mut total_rows: u64 = 0;
let mut fail = false;
for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
let output = handler.insert(act_req, temp_ctx).await;
if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
total_rows += *rows as u64;
} else {
fail = true;
}
outputs.push(output);
}
if total_rows > 0 {
METRIC_HTTP_LOGS_INGESTION_COUNTER
.with_label_values(&[db.as_str()])
.inc_by(*rows as u64);
.inc_by(total_rows);
METRIC_HTTP_LOGS_INGESTION_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
}
if fail {
METRIC_HTTP_LOGS_INGESTION_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}
let response = GreptimedbV1Response::from_output(vec![output])
let response = GreptimedbV1Response::from_output(outputs)
.await
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
Ok(response)


@@ -166,7 +166,7 @@ pub async fn logs(
resp_body: ExportLogsServiceResponse {
partial_success: None,
},
write_cost: o.meta.cost,
write_cost: o.iter().map(|o| o.meta.cost).sum(),
})
}


@@ -15,7 +15,6 @@
use std::sync::Arc;
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use axum::body::Bytes;
use axum::extract::{Query, State};
use axum::http::{header, HeaderValue, StatusCode};
@@ -29,7 +28,7 @@ use hyper::HeaderMap;
use lazy_static::lazy_static;
use object_pool::Pool;
use pipeline::util::to_pipeline_version;
use pipeline::PipelineDefinition;
use pipeline::{ContextReq, PipelineDefinition};
use prost::Message;
use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext};
@@ -133,18 +132,24 @@ pub async fn remote_write(
processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
}
let (request, samples) =
let req =
decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;
let output = prom_store_handler
.write(request, query_ctx, prom_store_with_metric_engine)
.await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
)
.into_response())
let mut cost = 0;
for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
let cnt: u64 = reqs
.inserts
.iter()
.filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
.sum();
let output = prom_store_handler
.write(reqs, temp_ctx, prom_store_with_metric_engine)
.await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(cnt);
cost += output.meta.cost;
}
Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response())
}
impl IntoResponse for PromStoreResponse {
@@ -202,7 +207,7 @@ async fn decode_remote_write_request(
body: Bytes,
prom_validation_mode: PromValidationMode,
processor: &mut PromSeriesProcessor,
) -> Result<(RowInsertRequests, usize)> {
) -> Result<ContextReq> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
// due to vmagent's limitation, there is a chance that vmagent is
@@ -227,7 +232,8 @@ async fn decode_remote_write_request(
if processor.use_pipeline {
processor.exec_pipeline().await
} else {
Ok(request.as_row_insert_requests())
let reqs = request.as_row_insert_requests();
Ok(ContextReq::default_opt_with_reqs(reqs))
}
}


@@ -18,13 +18,15 @@ use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
};
use jsonb::{Number as JsonbNumber, Value as JsonbValue};
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
use pipeline::{GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo};
use pipeline::{
ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
};
use serde_json::{Map, Value};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
@@ -55,21 +57,16 @@ pub async fn to_grpc_insert_requests(
table_name: String,
query_ctx: &QueryContextRef,
pipeline_handler: PipelineHandlerRef,
) -> Result<(RowInsertRequests, usize)> {
) -> Result<ContextReq> {
match pipeline {
PipelineWay::OtlpLogDirect(select_info) => {
let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
let len = rows.rows.len();
let insert_request = RowInsertRequest {
rows: Some(rows),
table_name,
};
Ok((
RowInsertRequests {
inserts: vec![insert_request],
},
len,
))
Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
}
PipelineWay::Pipeline(pipeline_def) => {
let data = parse_export_logs_service_request(request);
@@ -77,7 +74,7 @@ pub async fn to_grpc_insert_requests(
let pipeline_ctx =
PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
let inserts = run_pipeline(
run_pipeline(
&pipeline_handler,
&pipeline_ctx,
PipelineIngestRequest {
@@ -87,20 +84,7 @@ pub async fn to_grpc_insert_requests(
query_ctx,
true,
)
.await?;
let len = inserts
.iter()
.map(|insert| {
insert
.rows
.as_ref()
.map(|rows| rows.rows.len())
.unwrap_or(0)
})
.sum();
let insert_requests = RowInsertRequests { inserts };
Ok((insert_requests, len))
.await
}
_ => NotSupportedSnafu {
feat: "Unsupported pipeline for logs",


@@ -20,8 +20,9 @@ use api::v1::{RowInsertRequest, Rows};
use itertools::Itertools;
use pipeline::error::AutoTransformOneTimestampSnafu;
use pipeline::{
DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext, PipelineDefinition,
PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext,
PipelineDefinition, PipelineExecOutput, PipelineMap, TransformedOutput,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::{Channel, QueryContextRef};
use snafu::{OptionExt, ResultExt};
@@ -66,7 +67,7 @@ pub(crate) async fn run_pipeline(
pipeline_req: PipelineIngestRequest,
query_ctx: &QueryContextRef,
is_top_level: bool,
) -> Result<Vec<RowInsertRequest>> {
) -> Result<ContextReq> {
if pipeline_ctx.pipeline_definition.is_identity() {
run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
} else {
@@ -79,7 +80,7 @@ async fn run_identity_pipeline(
pipeline_ctx: &PipelineContext<'_>,
pipeline_req: PipelineIngestRequest,
query_ctx: &QueryContextRef,
) -> Result<Vec<RowInsertRequest>> {
) -> Result<ContextReq> {
let PipelineIngestRequest {
table: table_name,
values: data_array,
@@ -93,12 +94,7 @@ async fn run_identity_pipeline(
.context(CatalogSnafu)?
};
pipeline::identity_pipeline(data_array, table, pipeline_ctx)
.map(|rows| {
vec![RowInsertRequest {
rows: Some(rows),
table_name,
}]
})
.map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
.context(PipelineSnafu)
}
@@ -108,7 +104,7 @@ async fn run_custom_pipeline(
pipeline_req: PipelineIngestRequest,
query_ctx: &QueryContextRef,
is_top_level: bool,
) -> Result<Vec<RowInsertRequest>> {
) -> Result<ContextReq> {
let db = query_ctx.get_db_string();
let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;
@@ -135,17 +131,24 @@ async fn run_custom_pipeline(
.context(PipelineSnafu)?;
match r {
PipelineExecOutput::Transformed((row, table_suffix)) => {
PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
table_suffix,
}) => {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
push_to_map!(transformed_map, act_table_name, row, arr_len);
push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
}
PipelineExecOutput::AutoTransform(table_suffix, ts_keys) => {
PipelineExecOutput::AutoTransform(AutoTransformOutput {
table_suffix,
ts_unit_map,
}) => {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
auto_map_ts_keys
.entry(act_table_name)
.or_insert_with(HashMap::new)
.extend(ts_keys);
.extend(ts_unit_map);
}
PipelineExecOutput::DispatchedTo(dispatched_to) => {
push_to_map!(dispatched, dispatched_to, pipeline_map, arr_len);
@@ -153,7 +156,7 @@ async fn run_custom_pipeline(
}
}
let mut results = Vec::new();
let mut results = ContextReq::default();
if let Some(s) = pipeline.schemas() {
// transformed
@@ -161,14 +164,17 @@ async fn run_custom_pipeline(
// if current pipeline generates some transformed results, build it as
// `RowInsertRequest` and append to results. If the pipeline doesn't
// have dispatch, this will be only output of the pipeline.
for (table_name, rows) in transformed_map {
results.push(RowInsertRequest {
rows: Some(Rows {
rows,
schema: s.clone(),
}),
table_name,
});
for ((opt, table_name), rows) in transformed_map {
results.add_rows(
opt,
RowInsertRequest {
rows: Some(Rows {
rows,
schema: s.clone(),
}),
table_name,
},
);
}
} else {
// auto map
@@ -205,7 +211,7 @@ async fn run_custom_pipeline(
)
.await?;
results.extend(reqs);
results.merge(reqs);
}
}
@@ -240,7 +246,7 @@ async fn run_custom_pipeline(
))
.await?;
results.extend(requests);
results.merge(requests);
}
if is_top_level {


@@ -18,10 +18,7 @@ use std::string::ToString;
use ahash::HashMap;
use api::prom_store::remote::Sample;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value,
};
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use prost::DecodeError;
@@ -55,17 +52,11 @@ impl TablesBuilder {
}
/// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
pub(crate) fn as_insert_requests(&mut self) -> (RowInsertRequests, usize) {
let mut total_rows = 0;
let inserts = self
.tables
pub(crate) fn as_insert_requests(&mut self) -> Vec<RowInsertRequest> {
self.tables
.drain()
.map(|(name, mut table)| {
total_rows += table.num_rows();
table.as_row_insert_request(name)
})
.collect();
(RowInsertRequests { inserts }, total_rows)
.map(|(name, mut table)| table.as_row_insert_request(name))
.collect()
}
}
@@ -116,11 +107,6 @@ impl TableBuilder {
}
}
/// Total number of rows inside table builder.
fn num_rows(&self) -> usize {
self.rows.len()
}
/// Adds a set of labels and samples to table builder.
pub(crate) fn add_labels_and_samples(
&mut self,


@@ -18,11 +18,13 @@ use std::ops::Deref;
use std::slice;
use api::prom_store::remote::Sample;
use api::v1::RowInsertRequests;
use api::v1::RowInsertRequest;
use bytes::{Buf, Bytes};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::debug;
use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value};
use pipeline::{
ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value,
};
use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, WireType};
use prost::DecodeError;
@@ -267,7 +269,7 @@ impl Clear for PromWriteRequest {
}
impl PromWriteRequest {
pub fn as_row_insert_requests(&mut self) -> (RowInsertRequests, usize) {
pub fn as_row_insert_requests(&mut self) -> Vec<RowInsertRequest> {
self.table_data.as_insert_requests()
}
@@ -409,9 +411,7 @@ impl PromSeriesProcessor {
Ok(())
}
pub(crate) async fn exec_pipeline(
&mut self,
) -> crate::error::Result<(RowInsertRequests, usize)> {
pub(crate) async fn exec_pipeline(&mut self) -> crate::error::Result<ContextReq> {
// prepare params
let handler = self.pipeline_handler.as_ref().context(InternalSnafu {
err_msg: "pipeline handler is not set",
@@ -425,10 +425,9 @@ impl PromSeriesProcessor {
})?;
let pipeline_ctx = PipelineContext::new(pipeline_def, &pipeline_param, query_ctx.channel());
let mut size = 0;
// run pipeline
let mut inserts = Vec::with_capacity(self.table_values.len());
let mut req = ContextReq::default();
for (table_name, pipeline_maps) in self.table_values.iter_mut() {
let pipeline_req = PipelineIngestRequest {
table: table_name.clone(),
@@ -436,16 +435,10 @@ impl PromSeriesProcessor {
};
let row_req =
run_pipeline(handler, &pipeline_ctx, pipeline_req, query_ctx, true).await?;
size += row_req
.iter()
.map(|rq| rq.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
.sum::<usize>();
inserts.extend(row_req);
req.merge(row_req);
}
let row_insert_requests = RowInsertRequests { inserts };
Ok((row_insert_requests, size))
Ok(req)
}
}
@@ -489,7 +482,13 @@ mod tests {
prom_write_request
.merge(data.clone(), PromValidationMode::Strict, &mut p)
.unwrap();
let (prom_rows, samples) = prom_write_request.as_row_insert_requests();
let req = prom_write_request.as_row_insert_requests();
let samples = req
.iter()
.filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
.sum::<usize>();
let prom_rows = RowInsertRequests { inserts: req };
assert_eq!(expected_samples, samples);
assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len());


@@ -122,7 +122,7 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler {
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> Result<Output>;
) -> Result<Vec<Output>>;
}
/// PipelineHandler is responsible for handling pipeline related requests.

src/session/src/hints.rs (new file, 29 lines)

@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// For the given format: `x-greptime-hints: auto_create_table=true, ttl=7d`
pub const HINTS_KEY: &str = "x-greptime-hints";
pub const HINTS_KEY_PREFIX: &str = "x-greptime-hint-";
pub const READ_PREFERENCE_HINT: &str = "read_preference";
pub const HINT_KEYS: [&str; 7] = [
"x-greptime-hint-auto_create_table",
"x-greptime-hint-ttl",
"x-greptime-hint-append_mode",
"x-greptime-hint-merge_mode",
"x-greptime-hint-physical_table",
"x-greptime-hint-skip_wal",
"x-greptime-hint-read_preference",
];


@@ -13,6 +13,7 @@
// limitations under the License.
pub mod context;
pub mod hints;
pub mod session_config;
pub mod table_name;


@@ -104,6 +104,7 @@ macro_rules! http_tests {
test_identity_pipeline_with_custom_ts,
test_pipeline_dispatcher,
test_pipeline_suffix_template,
test_pipeline_context,
test_otlp_metrics,
test_otlp_traces_v0,
@@ -2008,6 +2009,125 @@ table_suffix: _${type}
guard.remove_all().await;
}
pub async fn test_pipeline_context(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(storage_type, "test_pipeline_context").await;
// handshake
let client = TestClient::new(app).await;
let root_pipeline = r#"
processors:
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
ignore_missing: true
transform:
- fields:
- id1, id1_root
- id2, id2_root
type: int32
- fields:
- type
- log
- logger
type: string
- field: time
type: time
index: timestamp
table_suffix: _${type}
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/root")
.header("Content-Type", "application/x-yaml")
.body(root_pipeline)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let data_body = r#"
[
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "http",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n",
"greptime_ttl": "1d"
},
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "db",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
}
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 3. check table list
validate_data(
"test_pipeline_context_table_list",
&client,
"show tables",
"[[\"d_table_db\"],[\"d_table_http\"],[\"demo\"],[\"numbers\"]]",
)
.await;
// 4. check each table's data
// CREATE TABLE IF NOT EXISTS "d_table_db" (
// ... ignore
// )
// ENGINE=mito
// WITH(
// append_mode = 'true'
// )
let expected = "[[\"d_table_db\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_db\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"test_pipeline_context_db",
&client,
"show create table d_table_db",
expected,
)
.await;
// CREATE TABLE IF NOT EXISTS "d_table_http" (
// ... ignore
// )
// ENGINE=mito
// WITH(
// append_mode = 'true',
// ttl = '1day'
// )
let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n ttl = '1day'\\n)\"]]";
validate_data(
"test_pipeline_context_http",
&client,
"show create table d_table_http",
expected,
)
.await;
guard.remove_all().await;
}
pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =