diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index f6332d3bdb..dfb5738e71 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -125,7 +125,7 @@ impl OpenTelemetryProtocolHandler for Instance {
         pipeline_params: GreptimePipelineParams,
         table_name: String,
         ctx: QueryContextRef,
-    ) -> ServerResult<Output> {
+    ) -> ServerResult<Vec<Output>> {
         self.plugins
             .get::<LogValidatorRef>()
             .as_ref()
@@ -137,7 +137,7 @@ impl OpenTelemetryProtocolHandler for Instance {
             .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
         interceptor_ref.pre_execute(ctx.clone())?;

-        let (requests, rows) = otlp::logs::to_grpc_insert_requests(
+        let opt_req = otlp::logs::to_grpc_insert_requests(
             request,
             pipeline,
             pipeline_params,
@@ -148,7 +148,7 @@ impl OpenTelemetryProtocolHandler for Instance {
         .await?;

         let _guard = if let Some(limiter) = &self.limiter {
-            let result = limiter.limit_row_inserts(&requests);
+            let result = limiter.limit_ctx_req(&opt_req);
             if result.is_none() {
                 return InFlightWriteBytesExceededSnafu.fail();
             }
@@ -157,10 +157,24 @@ impl OpenTelemetryProtocolHandler for Instance {
             None
         };

-        self.handle_log_inserts(requests, ctx)
-            .await
-            .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
-            .map_err(BoxedError::new)
-            .context(error::ExecuteGrpcQuerySnafu)
+        let mut outputs = vec![];
+
+        for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
+            let cnt = requests
+                .inserts
+                .iter()
+                .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
+                .sum::<usize>();
+
+            let o = self
+                .handle_log_inserts(requests, temp_ctx)
+                .await
+                .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
+                .map_err(BoxedError::new)
+                .context(error::ExecuteGrpcQuerySnafu)?;
+            outputs.push(o);
+        }
+
+        Ok(outputs)
     }
 }
diff --git a/src/frontend/src/limiter.rs b/src/frontend/src/limiter.rs
index b1d3b81b36..3c09192cbe 100644
--- a/src/frontend/src/limiter.rs
+++ b/src/frontend/src/limiter.rs
@@ -18,8 +18,11 @@ use std::sync::Arc;
 use api::v1::column::Values;
 use api::v1::greptime_request::Request;
 use api::v1::value::ValueData;
-use api::v1::{Decimal128, InsertRequests, IntervalMonthDayNano, RowInsertRequests};
+use api::v1::{
+    Decimal128, InsertRequests, IntervalMonthDayNano, RowInsertRequest, RowInsertRequests,
+};
 use common_telemetry::{debug, warn};
+use pipeline::ContextReq;

 pub(crate) type LimiterRef = Arc<Limiter>;

@@ -75,7 +78,9 @@ impl Limiter {
     pub fn limit_request(&self, request: &Request) -> Option<InFlightWriteBytesCounter> {
         let size = match request {
             Request::Inserts(requests) => self.insert_requests_data_size(requests),
-            Request::RowInserts(requests) => self.rows_insert_requests_data_size(requests),
+            Request::RowInserts(requests) => {
+                self.rows_insert_requests_data_size(requests.inserts.iter())
+            }
             _ => 0,
         };
         self.limit_in_flight_write_bytes(size as u64)
@@ -85,7 +90,12 @@ impl Limiter {
         &self,
         requests: &RowInsertRequests,
     ) -> Option<InFlightWriteBytesCounter> {
-        let size = self.rows_insert_requests_data_size(requests);
+        let size = self.rows_insert_requests_data_size(requests.inserts.iter());
+        self.limit_in_flight_write_bytes(size as u64)
+    }
+
+    pub fn limit_ctx_req(&self, opt_req: &ContextReq) -> Option<InFlightWriteBytesCounter> {
+        let size = self.rows_insert_requests_data_size(opt_req.ref_all_req());
         self.limit_in_flight_write_bytes(size as u64)
     }

@@ -137,9 +147,12 @@ impl Limiter {
         size
     }

-    fn rows_insert_requests_data_size(&self, request: &RowInsertRequests) -> usize {
+    fn rows_insert_requests_data_size<'a>(
+        &self,
+        inserts: impl Iterator<Item = &'a RowInsertRequest>,
+    ) -> usize {
         let mut size: usize = 0;
-        for insert in &request.inserts {
+        for insert in inserts {
             if let Some(rows) = &insert.rows {
                 for row in &rows.rows {
                     for value in &row.values {
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index db5453310f..9f13a45317 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 #![allow(dead_code)]
+pub mod ctx_req;
 pub mod field;
 pub mod processor;
 pub mod transform;
@@ -153,21 +154,39 @@ impl DispatchedTo {
 /// The result of pipeline execution
 #[derive(Debug)]
 pub enum PipelineExecOutput {
-    Transformed((Row, Option<String>)),
-    // table_suffix, ts_key -> unit
-    AutoTransform(Option<String>, HashMap<String, TimeUnit>),
+    Transformed(TransformedOutput),
+    AutoTransform(AutoTransformOutput),
     DispatchedTo(DispatchedTo),
 }

+#[derive(Debug)]
+pub struct TransformedOutput {
+    pub opt: String,
+    pub row: Row,
+    pub table_suffix: Option<String>,
+}
+
+#[derive(Debug)]
+pub struct AutoTransformOutput {
+    pub table_suffix: Option<String>,
+    // ts_column_name -> unit
+    pub ts_unit_map: HashMap<String, TimeUnit>,
+}
+
 impl PipelineExecOutput {
+    // Note: This is a test-only function, do not use it in production.
     pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
-        if let Self::Transformed(o) = self {
-            Some(o)
+        if let Self::Transformed(TransformedOutput {
+            row, table_suffix, ..
+        }) = self
+        {
+            Some((row, table_suffix))
         } else {
             None
         }
     }

+    // Note: This is a test-only function, do not use it in production.
     pub fn into_dispatched(self) -> Option<DispatchedTo> {
         if let Self::DispatchedTo(d) = self {
             Some(d)
@@ -224,9 +243,13 @@ impl Pipeline {
         }

         if let Some(transformer) = self.transformer() {
-            let row = transformer.transform_mut(val)?;
+            let (opt, row) = transformer.transform_mut(val)?;
             let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
-            Ok(PipelineExecOutput::Transformed((row, table_suffix)))
+            Ok(PipelineExecOutput::Transformed(TransformedOutput {
+                opt,
+                row,
+                table_suffix,
+            }))
         } else {
             let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
             let mut ts_unit_map = HashMap::with_capacity(4);
@@ -238,7 +261,10 @@ impl Pipeline {
                     }
                 }
             }
-            Ok(PipelineExecOutput::AutoTransform(table_suffix, ts_unit_map))
+            Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
+                table_suffix,
+                ts_unit_map,
+            }))
         }
     }
diff --git a/src/pipeline/src/etl/ctx_req.rs b/src/pipeline/src/etl/ctx_req.rs
new file mode 100644
index 0000000000..6d846a1bcc
--- /dev/null
+++ b/src/pipeline/src/etl/ctx_req.rs
@@ -0,0 +1,153 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::hash_map::IntoIter;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use ahash::{HashMap, HashMapExt};
+use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
+use itertools::Itertools;
+use session::context::{QueryContext, QueryContextRef};
+
+use crate::PipelineMap;
+
+const DEFAULT_OPT: &str = "";
+
+pub const PIPELINE_HINT_KEYS: [&str; 6] = [
+    "greptime_auto_create_table",
+    "greptime_ttl",
+    "greptime_append_mode",
+    "greptime_merge_mode",
+    "greptime_physical_table",
+    "greptime_skip_wal",
+];
+
+const PIPELINE_HINT_PREFIX: &str = "greptime_";
+
+// Remove hint fields from the pipeline map and fold them into an option string,
+// e.g.: skip_wal=true,ttl=1d
+pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> String {
+    let mut btreemap = BTreeMap::new();
+    for k in PIPELINE_HINT_KEYS {
+        if let Some(v) = pipeline_map.remove(k) {
+            btreemap.insert(k, v.to_str_value());
+        }
+    }
+    btreemap
+        .into_iter()
+        .map(|(k, v)| format!("{}={}", k.replace(PIPELINE_HINT_PREFIX, ""), v))
+        .join(",")
+}
+
+// Split the option string back into a map.
+fn from_opt_to_map(opt: &str) -> HashMap<&str, &str> {
+    opt.split(',')
+        .filter_map(|s| {
+            s.split_once("=")
+                .filter(|(k, v)| !k.is_empty() && !v.is_empty())
+        })
+        .collect()
+}
+
+// ContextReq is a collection of row insert requests grouped by their options.
+// The default option is the empty string.
+// Because options are set on the query context, the requests have to be split
+// into sequential insert calls, e.g.:
+// {
+//     "skip_wal=true,ttl=1d": [RowInsertRequest],
+//     "ttl=1d": [RowInsertRequest],
+// }
+#[derive(Debug, Default)]
+pub struct ContextReq {
+    req: HashMap<String, Vec<RowInsertRequest>>,
+}
+
+impl ContextReq {
+    pub fn from_opt_map(opt_map: HashMap<String, Rows>, table_name: String) -> Self {
+        Self {
+            req: opt_map
+                .into_iter()
+                .map(|(opt, rows)| {
+                    (
+                        opt,
+                        vec![RowInsertRequest {
+                            table_name: table_name.clone(),
+                            rows: Some(rows),
+                        }],
+                    )
+                })
+                .collect::<HashMap<String, Vec<RowInsertRequest>>>(),
+        }
+    }
+
+    pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
+        let mut req_map = HashMap::new();
+        req_map.insert(DEFAULT_OPT.to_string(), reqs);
+        Self { req: req_map }
+    }
+
+    pub fn add_rows(&mut self, opt: String, req: RowInsertRequest) {
+        self.req.entry(opt).or_default().push(req);
+    }
+
+    pub fn merge(&mut self, other: Self) {
+        for (opt, req) in other.req {
+            self.req.entry(opt).or_default().extend(req);
+        }
+    }
+
+    pub fn as_req_iter(self, ctx: QueryContextRef) -> ContextReqIter {
+        let ctx = (*ctx).clone();
+
+        ContextReqIter {
+            opt_req: self.req.into_iter(),
+            ctx_template: ctx,
+        }
+    }
+
+    pub fn all_req(self) -> impl Iterator<Item = RowInsertRequest> {
+        self.req.into_iter().flat_map(|(_, req)| req)
+    }
+
+    pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
+        self.req.values().flatten()
+    }
+}
+
+// ContextReqIter iterates over the option/requests pairs in a ContextReq.
+// The context template is cloned from the original query context. For each
+// option set it clones the template, applies the options as extensions, and
+// yields the context together with the row insert requests for the actual insert.
+pub struct ContextReqIter {
+    opt_req: IntoIter<String, Vec<RowInsertRequest>>,
+    ctx_template: QueryContext,
+}
+
+impl Iterator for ContextReqIter {
+    type Item = (QueryContextRef, RowInsertRequests);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let (opt, req_vec) = self.opt_req.next()?;
+
+        let opt_map = from_opt_to_map(&opt);
+
+        let mut ctx = self.ctx_template.clone();
+        for (k, v) in opt_map {
+            ctx.set_extension(k, v);
+        }
+
+        Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
+    }
+}
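A minimal sketch (not part of the patch) of how a caller consumes the new `ContextReq`: requests are bucketed by their option string, and `as_req_iter` yields one `(QueryContextRef, RowInsertRequests)` pair per bucket. `QueryContext::arc()` is assumed here as a stand-in for a real request context.

use api::v1::RowInsertRequest;
use pipeline::ContextReq;
use session::context::QueryContext;

fn demo() {
    let mut req = ContextReq::default();
    // Two requests with the same option string share one bucket...
    req.add_rows("ttl=1d".to_string(), RowInsertRequest::default());
    req.add_rows("ttl=1d".to_string(), RowInsertRequest::default());
    // ...while a different option string opens a second bucket.
    req.add_rows("skip_wal=true".to_string(), RowInsertRequest::default());

    // One (context, requests) pair per distinct option set, so the caller
    // issues one insert call per bucket.
    let pairs: Vec<_> = req.as_req_iter(QueryContext::arc()).collect();
    assert_eq!(pairs.len(), 2);
    assert_eq!(pairs.iter().map(|(_, r)| r.inserts.len()).sum::<usize>(), 3);
}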
diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs
index 5178a60ec6..c1836b00e7 100644
--- a/src/pipeline/src/etl/transform/transformer/greptime.rs
+++ b/src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -40,7 +40,7 @@ use crate::etl::transform::index::Index;
 use crate::etl::transform::{Transform, Transforms};
 use crate::etl::value::{Timestamp, Value};
 use crate::etl::PipelineMap;
-use crate::PipelineContext;
+use crate::{from_pipeline_map_to_opt, PipelineContext};

 const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
 const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -185,13 +185,15 @@ impl GreptimeTransformer {
         }
     }

-    pub fn transform_mut(&self, val: &mut PipelineMap) -> Result<Row> {
+    pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(String, Row)> {
+        let opt = from_pipeline_map_to_opt(pipeline_map);
+
         let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
         let mut output_index = 0;
         for transform in self.transforms.iter() {
             for field in transform.fields.iter() {
                 let index = field.input_field();
-                match val.get(index) {
+                match pipeline_map.get(index) {
                     Some(v) => {
                         let value_data = coerce_value(v, transform)?;
                         // every transform fields has only one output field
@@ -217,7 +219,7 @@ impl GreptimeTransformer {
                 output_index += 1;
             }
         }
-        Ok(Row { values })
+        Ok((opt, Row { values }))
     }

     pub fn transforms(&self) -> &Transforms {
@@ -517,8 +519,7 @@ fn resolve_value(
 fn identity_pipeline_inner(
     pipeline_maps: Vec<PipelineMap>,
     pipeline_ctx: &PipelineContext<'_>,
-) -> Result<(SchemaInfo, Vec<Row>)> {
-    let mut rows = Vec::with_capacity(pipeline_maps.len());
+) -> Result<(SchemaInfo, HashMap<String, Vec<Row>>)> {
     let mut schema_info = SchemaInfo::default();
     let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
@@ -539,20 +540,30 @@ fn identity_pipeline_inner(
         options: None,
     });

-    for values in pipeline_maps {
-        let row = values_to_row(&mut schema_info, values, pipeline_ctx)?;
-        rows.push(row);
+    let mut opt_map = HashMap::new();
+    let len = pipeline_maps.len();
+
+    for mut pipeline_map in pipeline_maps {
+        let opt = from_pipeline_map_to_opt(&mut pipeline_map);
+        let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;
+
+        opt_map
+            .entry(opt)
+            .or_insert_with(|| Vec::with_capacity(len))
+            .push(row);
     }

     let column_count = schema_info.schema.len();
-    for row in rows.iter_mut() {
-        let diff = column_count - row.values.len();
-        for _ in 0..diff {
-            row.values.push(GreptimeValue { value_data: None });
+    for (_, rows) in opt_map.iter_mut() {
+        for row in rows.iter_mut() {
+            let diff = column_count - row.values.len();
+            for _ in 0..diff {
+                row.values.push(GreptimeValue { value_data: None });
+            }
         }
     }

-    Ok((schema_info, rows))
+    Ok((schema_info, opt_map))
 }

 /// Identity pipeline for Greptime
@@ -567,7 +578,7 @@ pub fn identity_pipeline(
     array: Vec<PipelineMap>,
     table: Option<Arc<table::Table>>,
     pipeline_ctx: &PipelineContext<'_>,
-) -> Result<Rows> {
+) -> Result<HashMap<String, Rows>> {
     let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
         array
             .into_iter()
@@ -577,7 +588,7 @@ pub fn identity_pipeline(
         array
     };

-    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, rows)| {
+    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
         if let Some(table) = table {
             let table_info = table.table_info();
             for tag_name in table_info.meta.row_key_column_names() {
@@ -586,10 +597,19 @@ pub fn identity_pipeline(
             }
         }
-        Rows {
-            schema: schema.schema,
-            rows,
-        }
+
+        opt_map
+            .into_iter()
+            .map(|(opt, rows)| {
+                (
+                    opt,
+                    Rows {
+                        schema: schema.schema.clone(),
+                        rows,
+                    },
+                )
+            })
+            .collect::<HashMap<String, Rows>>()
     })
 }

@@ -739,7 +759,9 @@ mod tests {
         ];
         let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
         assert!(rows.is_ok());
-        let rows = rows.unwrap();
+        let mut rows = rows.unwrap();
+        assert!(rows.len() == 1);
+        let rows = rows.remove("").unwrap();
         assert_eq!(rows.schema.len(), 8);
         assert_eq!(rows.rows.len(), 2);
         assert_eq!(8, rows.rows[0].values.len());
@@ -769,12 +791,16 @@ mod tests {
         let tag_column_names = ["name".to_string(), "address".to_string()];

         let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
-            .map(|(mut schema, rows)| {
+            .map(|(mut schema, mut rows)| {
                 for name in tag_column_names {
                     if let Some(index) = schema.index.get(&name) {
                         schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                     }
                 }
+
+                assert!(rows.len() == 1);
+                let rows = rows.remove("").unwrap();
+
                 Rows {
                     schema: schema.schema,
                     rows,
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index d656094c2e..f9d1746498 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -19,14 +19,16 @@ mod manager;
 mod metrics;
 mod tablesuffix;

+pub use etl::ctx_req::{from_pipeline_map_to_opt, ContextReq};
 pub use etl::processor::Processor;
 pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
 pub use etl::transform::transformer::identity_pipeline;
 pub use etl::transform::GreptimeTransformer;
 pub use etl::value::{Array, Map, Value};
 pub use etl::{
-    json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map, Content,
-    DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
+    json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map,
+    AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
+    TransformedOutput,
 };
 pub use manager::{
     pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,
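For the identity pipeline, the same hint extraction now runs per row before `values_to_row`, so rows carrying different `greptime_*` hint values land in different buckets. A small sketch of `from_pipeline_map_to_opt` (it assumes `PipelineMap` is the crate's string-keyed value map and `Value::String` its string variant):

use pipeline::{from_pipeline_map_to_opt, PipelineMap, Value};

fn demo() {
    let mut map = PipelineMap::new();
    map.insert("greptime_ttl".to_string(), Value::String("1d".to_string()));
    map.insert(
        "greptime_skip_wal".to_string(),
        Value::String("true".to_string()),
    );
    map.insert("log".to_string(), Value::String("hello".to_string()));

    // Hint fields are removed from the row and folded into a sorted,
    // comma-separated option string; ordinary fields are left in place.
    let opt = from_pipeline_map_to_opt(&mut map);
    assert_eq!(opt, "skip_wal=true,ttl=1d");
    assert!(map.contains_key("log"));
    assert!(!map.contains_key("greptime_ttl"));
}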
diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs
index 018fd33e60..6924ee90a8 100644
--- a/src/servers/src/grpc/greptime_handler.rs
+++ b/src/servers/src/grpc/greptime_handler.rs
@@ -38,6 +38,7 @@ use common_telemetry::{debug, error, tracing, warn};
 use common_time::timezone::parse_timezone;
 use futures_util::StreamExt;
 use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
+use session::hints::READ_PREFERENCE_HINT;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
 use tokio::sync::mpsc;
@@ -49,7 +50,6 @@ use crate::error::{
 };
 use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
 use crate::grpc::TonicResult;
-use crate::hint_headers::READ_PREFERENCE_HINT;
 use crate::metrics;
 use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
 use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
diff --git a/src/servers/src/hint_headers.rs b/src/servers/src/hint_headers.rs
index 30a9fe59f8..a143911d63 100644
--- a/src/servers/src/hint_headers.rs
+++ b/src/servers/src/hint_headers.rs
@@ -13,23 +13,9 @@
 // limitations under the License.

 use http::HeaderMap;
+use session::hints::{HINTS_KEY, HINTS_KEY_PREFIX, HINT_KEYS};
 use tonic::metadata::MetadataMap;

-// For the given format: `x-greptime-hints: auto_create_table=true, ttl=7d`
-pub const HINTS_KEY: &str = "x-greptime-hints";
-
-pub const READ_PREFERENCE_HINT: &str = "read_preference";
-
-const HINT_KEYS: [&str; 7] = [
-    "x-greptime-hint-auto_create_table",
-    "x-greptime-hint-ttl",
-    "x-greptime-hint-append_mode",
-    "x-greptime-hint-merge_mode",
-    "x-greptime-hint-physical_table",
-    "x-greptime-hint-skip_wal",
-    "x-greptime-hint-read_preference",
-];
-
 pub(crate) fn extract_hints<T: ToHeaderMap>(headers: &T) -> Vec<(String, String)> {
     let mut hints = Vec::new();
     if let Some(value_str) = headers.get(HINTS_KEY) {
@@ -44,7 +30,7 @@ pub(crate) fn extract_hints<T: ToHeaderMap>(headers: &T) -> Vec<(String, String)> {
     }
     for key in HINT_KEYS.iter() {
         if let Some(value) = headers.get(key) {
-            let new_key = key.replace("x-greptime-hint-", "");
+            let new_key = key.replace(HINTS_KEY_PREFIX, "");
             hints.push((new_key, value.trim().to_string()));
         }
     }
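Both hint transports now resolve to the same bare keys via the shared constants. A sketch of the normalization (the `parse_combined` helper is hypothetical and only mirrors the combined-header branch; `extract_hints` itself stays `pub(crate)`):

use session::hints::HINTS_KEY_PREFIX;

// Hypothetical mirror of the combined-header branch of extract_hints:
// `x-greptime-hints: auto_create_table=true, ttl=7d`
fn parse_combined(value: &str) -> Vec<(String, String)> {
    value
        .split(',')
        .filter_map(|kv| kv.split_once('='))
        .map(|(k, v)| (k.trim().to_string(), v.trim().to_string()))
        .collect()
}

fn demo() {
    let hints = parse_combined("auto_create_table=true, ttl=7d");
    assert_eq!(hints[1], ("ttl".to_string(), "7d".to_string()));
    // The per-key form strips the shared prefix instead.
    assert_eq!("x-greptime-hint-ttl".replace(HINTS_KEY_PREFIX, ""), "ttl");
}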
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 80922cf904..7db16bcda9 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -18,7 +18,6 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;

-use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use axum::body::Bytes;
 use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
@@ -34,7 +33,9 @@ use datatypes::value::column_data_to_json;
 use headers::ContentType;
 use lazy_static::lazy_static;
 use pipeline::util::to_pipeline_version;
-use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap};
+use pipeline::{
+    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap,
+};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Deserializer, Map, Value};
 use session::context::{Channel, QueryContext, QueryContextRef};
@@ -345,7 +346,7 @@ async fn dryrun_pipeline_inner(
     let name_key = "name";

     let results = results
-        .into_iter()
+        .all_req()
        .filter_map(|row| {
             if let Some(rows) = row.rows {
                 let table_name = row.table_name;
@@ -798,7 +799,7 @@ pub(crate) async fn ingest_logs_inner(
     let db = query_ctx.get_db_string();
     let exec_timer = std::time::Instant::now();

-    let mut insert_requests = Vec::with_capacity(log_ingest_requests.len());
+    let mut req = ContextReq::default();

     let pipeline_params = GreptimePipelineParams::from_params(
         headers
@@ -811,36 +812,42 @@ pub(crate) async fn ingest_logs_inner(
         let requests =
             run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

-        insert_requests.extend(requests);
+        req.merge(requests);
     }

-    let output = handler
-        .insert(
-            RowInsertRequests {
-                inserts: insert_requests,
-            },
-            query_ctx,
-        )
-        .await;
+    let mut outputs = Vec::new();
+    let mut total_rows: u64 = 0;
+    let mut fail = false;
+    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
+        let output = handler.insert(act_req, temp_ctx).await;

-    if let Ok(Output {
-        data: OutputData::AffectedRows(rows),
-        meta: _,
-    }) = &output
-    {
+        if let Ok(Output {
+            data: OutputData::AffectedRows(rows),
+            meta: _,
+        }) = &output
+        {
+            total_rows += *rows as u64;
+        } else {
+            fail = true;
+        }
+        outputs.push(output);
+    }
+
+    if total_rows > 0 {
         METRIC_HTTP_LOGS_INGESTION_COUNTER
             .with_label_values(&[db.as_str()])
-            .inc_by(*rows as u64);
+            .inc_by(total_rows);
         METRIC_HTTP_LOGS_INGESTION_ELAPSED
             .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
             .observe(exec_timer.elapsed().as_secs_f64());
-    } else {
+    }
+    if fail {
         METRIC_HTTP_LOGS_INGESTION_ELAPSED
             .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
             .observe(exec_timer.elapsed().as_secs_f64());
     }

-    let response = GreptimedbV1Response::from_output(vec![output])
+    let response = GreptimedbV1Response::from_output(outputs)
         .await
         .with_execution_time(exec_timer.elapsed().as_millis() as u64);
     Ok(response)
diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs
index 402313fb64..95cc2113d8 100644
--- a/src/servers/src/http/otlp.rs
+++ b/src/servers/src/http/otlp.rs
@@ -166,7 +166,7 @@ pub async fn logs(
         resp_body: ExportLogsServiceResponse {
             partial_success: None,
         },
-        write_cost: o.meta.cost,
+        write_cost: o.iter().map(|o| o.meta.cost).sum(),
     })
 }
diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs
index a609edb988..1c23a6722b 100644
--- a/src/servers/src/http/prom_store.rs
+++ b/src/servers/src/http/prom_store.rs
@@ -15,7 +15,6 @@
 use std::sync::Arc;

 use api::prom_store::remote::ReadRequest;
-use api::v1::RowInsertRequests;
 use axum::body::Bytes;
 use axum::extract::{Query, State};
 use axum::http::{header, HeaderValue, StatusCode};
@@ -29,7 +28,7 @@ use hyper::HeaderMap;
 use lazy_static::lazy_static;
 use object_pool::Pool;
 use pipeline::util::to_pipeline_version;
-use pipeline::PipelineDefinition;
+use pipeline::{ContextReq, PipelineDefinition};
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use session::context::{Channel, QueryContext};
@@ -133,18 +132,24 @@ pub async fn remote_write(
         processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
     }

-    let (request, samples) =
+    let req =
         decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;

-    let output = prom_store_handler
-        .write(request, query_ctx, prom_store_with_metric_engine)
-        .await?;
-    crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
-    Ok((
-        StatusCode::NO_CONTENT,
-        write_cost_header_map(output.meta.cost),
-    )
-        .into_response())
+    let mut cost = 0;
+    for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
+        let cnt: u64 = reqs
+            .inserts
+            .iter()
+            .filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
+            .sum();
+        let output = prom_store_handler
+            .write(reqs, temp_ctx, prom_store_with_metric_engine)
+            .await?;
+        crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(cnt);
+        cost += output.meta.cost;
+    }
+
+    Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response())
 }

 impl IntoResponse for PromStoreResponse {
@@ -202,7 +207,7 @@ async fn decode_remote_write_request(
     body: Bytes,
     prom_validation_mode: PromValidationMode,
     processor: &mut PromSeriesProcessor,
-) -> Result<(RowInsertRequests, usize)> {
+) -> Result<ContextReq> {
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();

     // due to vmagent's limitation, there is a chance that vmagent is
@@ -227,7 +232,8 @@ async fn decode_remote_write_request(
     if processor.use_pipeline {
         processor.exec_pipeline().await
     } else {
-        Ok(request.as_row_insert_requests())
+        let reqs = request.as_row_insert_requests();
+        Ok(ContextReq::default_opt_with_reqs(reqs))
     }
 }
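Both write paths above recompute the row count from the requests instead of threading a separate counter through `ContextReq`. The counting expression they share, extracted here for clarity (a sketch, not a helper the patch adds):

use api::v1::RowInsertRequests;

// Count rows across all RowInsertRequest entries, skipping any request
// whose `rows` field is unset.
fn count_rows(reqs: &RowInsertRequests) -> usize {
    reqs.inserts
        .iter()
        .filter_map(|r| r.rows.as_ref().map(|rows| rows.rows.len()))
        .sum()
}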
diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs
index 04f2d5665d..59d2230934 100644
--- a/src/servers/src/otlp/logs.rs
+++ b/src/servers/src/otlp/logs.rs
@@ -18,13 +18,15 @@ use api::v1::column_data_type_extension::TypeExt;
 use api::v1::value::ValueData;
 use api::v1::{
     ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
-    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
+    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
 };
 use jsonb::{Number as JsonbNumber, Value as JsonbValue};
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
 use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
-use pipeline::{GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo};
+use pipeline::{
+    ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
+};
 use serde_json::{Map, Value};
 use session::context::QueryContextRef;
 use snafu::{ensure, ResultExt};
@@ -55,21 +57,16 @@ pub async fn to_grpc_insert_requests(
     table_name: String,
     query_ctx: &QueryContextRef,
     pipeline_handler: PipelineHandlerRef,
-) -> Result<(RowInsertRequests, usize)> {
+) -> Result<ContextReq> {
     match pipeline {
         PipelineWay::OtlpLogDirect(select_info) => {
             let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
-            let len = rows.rows.len();
             let insert_request = RowInsertRequest {
                 rows: Some(rows),
                 table_name,
             };
-            Ok((
-                RowInsertRequests {
-                    inserts: vec![insert_request],
-                },
-                len,
-            ))
+
+            Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
         }
         PipelineWay::Pipeline(pipeline_def) => {
             let data = parse_export_logs_service_request(request);
@@ -77,7 +74,7 @@ pub async fn to_grpc_insert_requests(
             let pipeline_ctx =
                 PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
-            let inserts = run_pipeline(
+            run_pipeline(
                 &pipeline_handler,
                 &pipeline_ctx,
                 PipelineIngestRequest {
@@ -87,20 +84,7 @@ pub async fn to_grpc_insert_requests(
                 query_ctx,
                 true,
             )
-            .await?;
-            let len = inserts
-                .iter()
-                .map(|insert| {
-                    insert
-                        .rows
-                        .as_ref()
-                        .map(|rows| rows.rows.len())
-                        .unwrap_or(0)
-                })
-                .sum();
-
-            let insert_requests = RowInsertRequests { inserts };
-            Ok((insert_requests, len))
+            .await
         }
         _ => NotSupportedSnafu {
             feat: "Unsupported pipeline for logs",
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index c16b20b549..8e8cb7518b 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -20,8 +20,9 @@ use api::v1::{RowInsertRequest, Rows};
 use itertools::Itertools;
 use pipeline::error::AutoTransformOneTimestampSnafu;
 use pipeline::{
-    DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext, PipelineDefinition,
-    PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
+    AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext,
+    PipelineDefinition, PipelineExecOutput, PipelineMap, TransformedOutput,
+    GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
 };
 use session::context::{Channel, QueryContextRef};
 use snafu::{OptionExt, ResultExt};
@@ -66,7 +67,7 @@ pub(crate) async fn run_pipeline(
     pipeline_req: PipelineIngestRequest,
     query_ctx: &QueryContextRef,
     is_top_level: bool,
-) -> Result<Vec<RowInsertRequest>> {
+) -> Result<ContextReq> {
     if pipeline_ctx.pipeline_definition.is_identity() {
         run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
     } else {
@@ -79,7 +80,7 @@ async fn run_identity_pipeline(
     pipeline_ctx: &PipelineContext<'_>,
     pipeline_req: PipelineIngestRequest,
     query_ctx: &QueryContextRef,
-) -> Result<Vec<RowInsertRequest>> {
+) -> Result<ContextReq> {
     let PipelineIngestRequest {
         table: table_name,
         values: data_array,
@@ -93,12 +94,7 @@ async fn run_identity_pipeline(
             .context(CatalogSnafu)?
     };
     pipeline::identity_pipeline(data_array, table, pipeline_ctx)
-        .map(|rows| {
-            vec![RowInsertRequest {
-                rows: Some(rows),
-                table_name,
-            }]
-        })
+        .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
         .context(PipelineSnafu)
 }

@@ -108,7 +104,7 @@ async fn run_custom_pipeline(
     pipeline_req: PipelineIngestRequest,
     query_ctx: &QueryContextRef,
     is_top_level: bool,
-) -> Result<Vec<RowInsertRequest>> {
+) -> Result<ContextReq> {
     let db = query_ctx.get_db_string();
     let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;

@@ -135,17 +131,24 @@ async fn run_custom_pipeline(
             .context(PipelineSnafu)?;

         match r {
-            PipelineExecOutput::Transformed((row, table_suffix)) => {
+            PipelineExecOutput::Transformed(TransformedOutput {
+                opt,
+                row,
+                table_suffix,
+            }) => {
                 let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
-                push_to_map!(transformed_map, act_table_name, row, arr_len);
+                push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
             }
-            PipelineExecOutput::AutoTransform(table_suffix, ts_keys) => {
+            PipelineExecOutput::AutoTransform(AutoTransformOutput {
+                table_suffix,
+                ts_unit_map,
+            }) => {
                 let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                 push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
                 auto_map_ts_keys
                     .entry(act_table_name)
                     .or_insert_with(HashMap::new)
-                    .extend(ts_keys);
+                    .extend(ts_unit_map);
             }
             PipelineExecOutput::DispatchedTo(dispatched_to) => {
                 push_to_map!(dispatched, dispatched_to, pipeline_map, arr_len);
@@ -153,7 +156,7 @@ async fn run_custom_pipeline(
         }
     }

-    let mut results = Vec::new();
+    let mut results = ContextReq::default();

     if let Some(s) = pipeline.schemas() {
         // transformed
@@ -161,14 +164,17 @@ async fn run_custom_pipeline(
         // if current pipeline generates some transformed results, build it as
         // `RowInsertRequest` and append to results. If the pipeline doesn't
         // have dispatch, this will be only output of the pipeline.
-        for (table_name, rows) in transformed_map {
-            results.push(RowInsertRequest {
-                rows: Some(Rows {
-                    rows,
-                    schema: s.clone(),
-                }),
-                table_name,
-            });
+        for ((opt, table_name), rows) in transformed_map {
+            results.add_rows(
+                opt,
+                RowInsertRequest {
+                    rows: Some(Rows {
+                        rows,
+                        schema: s.clone(),
+                    }),
+                    table_name,
+                },
+            );
         }
     } else {
         // auto map
@@ -205,7 +211,7 @@ async fn run_custom_pipeline(
             )
             .await?;

-            results.extend(reqs);
+            results.merge(reqs);
         }
     }

@@ -240,7 +246,7 @@ async fn run_custom_pipeline(
             ))
             .await?;

-        results.extend(requests);
+        results.merge(requests);
     }

     if is_top_level {
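The key change in `run_custom_pipeline` is that `transformed_map` is now keyed by the `(opt, table_name)` pair, so one pipeline run can fan out into several `RowInsertRequest`s carrying different context options. A standalone sketch of that grouping (the `group` helper is hypothetical; a plain `std` HashMap stands in for the crate's map type):

use std::collections::HashMap;

use api::v1::Row;

// Bucket rows by (option string, physical table name); each bucket later
// becomes one RowInsertRequest under its option's query context.
fn group(items: Vec<(String, String, Row)>) -> HashMap<(String, String), Vec<Row>> {
    let mut map: HashMap<(String, String), Vec<Row>> = HashMap::new();
    for (opt, table, row) in items {
        map.entry((opt, table)).or_default().push(row);
    }
    map
}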
diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs
index 008440b224..27e4cdf661 100644
--- a/src/servers/src/prom_row_builder.rs
+++ b/src/servers/src/prom_row_builder.rs
@@ -18,10 +18,7 @@ use std::string::ToString;
 use ahash::HashMap;
 use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
-use api::v1::{
-    ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
-    Value,
-};
+use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use prost::DecodeError;

@@ -55,17 +52,11 @@ impl TablesBuilder {
     }

     /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
-    pub(crate) fn as_insert_requests(&mut self) -> (RowInsertRequests, usize) {
-        let mut total_rows = 0;
-        let inserts = self
-            .tables
+    pub(crate) fn as_insert_requests(&mut self) -> Vec<RowInsertRequest> {
+        self.tables
             .drain()
-            .map(|(name, mut table)| {
-                total_rows += table.num_rows();
-                table.as_row_insert_request(name)
-            })
-            .collect();
-        (RowInsertRequests { inserts }, total_rows)
+            .map(|(name, mut table)| table.as_row_insert_request(name))
+            .collect()
     }
 }

@@ -116,11 +107,6 @@ impl TableBuilder {
         }
     }

-    /// Total number of rows inside table builder.
-    fn num_rows(&self) -> usize {
-        self.rows.len()
-    }
-
     /// Adds a set of labels and samples to table builder.
     pub(crate) fn add_labels_and_samples(
         &mut self,
diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs
index 210f45e849..0a8ec5271b 100644
--- a/src/servers/src/proto.rs
+++ b/src/servers/src/proto.rs
@@ -18,11 +18,13 @@ use std::ops::Deref;
 use std::slice;

 use api::prom_store::remote::Sample;
-use api::v1::RowInsertRequests;
+use api::v1::RowInsertRequest;
 use bytes::{Buf, Bytes};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_telemetry::debug;
-use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value};
+use pipeline::{
+    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value,
+};
 use prost::encoding::message::merge;
 use prost::encoding::{decode_key, decode_varint, WireType};
 use prost::DecodeError;
@@ -267,7 +269,7 @@ impl Clear for PromWriteRequest {
 }

 impl PromWriteRequest {
-    pub fn as_row_insert_requests(&mut self) -> (RowInsertRequests, usize) {
+    pub fn as_row_insert_requests(&mut self) -> Vec<RowInsertRequest> {
         self.table_data.as_insert_requests()
     }

@@ -409,9 +411,7 @@ impl PromSeriesProcessor {
         Ok(())
     }

-    pub(crate) async fn exec_pipeline(
-        &mut self,
-    ) -> crate::error::Result<(RowInsertRequests, usize)> {
+    pub(crate) async fn exec_pipeline(&mut self) -> crate::error::Result<ContextReq> {
         // prepare params
         let handler = self.pipeline_handler.as_ref().context(InternalSnafu {
             err_msg: "pipeline handler is not set",
@@ -425,10 +425,9 @@ impl PromSeriesProcessor {
         })?;
         let pipeline_ctx =
             PipelineContext::new(pipeline_def, &pipeline_param, query_ctx.channel());
-        let mut size = 0;

         // run pipeline
-        let mut inserts = Vec::with_capacity(self.table_values.len());
+        let mut req = ContextReq::default();
         for (table_name, pipeline_maps) in self.table_values.iter_mut() {
             let pipeline_req = PipelineIngestRequest {
                 table: table_name.clone(),
@@ -436,16 +435,10 @@ impl PromSeriesProcessor {
             };
             let row_req =
                 run_pipeline(handler, &pipeline_ctx, pipeline_req, query_ctx, true).await?;
-            size += row_req
-                .iter()
-                .map(|rq| rq.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
-                .sum::<usize>();
-            inserts.extend(row_req);
+            req.merge(row_req);
         }

-        let row_insert_requests = RowInsertRequests { inserts };
-
-        Ok((row_insert_requests, size))
+        Ok(req)
     }
 }

@@ -489,7 +482,13 @@ mod tests {
         prom_write_request
             .merge(data.clone(), PromValidationMode::Strict, &mut p)
             .unwrap();
-        let (prom_rows, samples) = prom_write_request.as_row_insert_requests();
+
+        let req = prom_write_request.as_row_insert_requests();
+        let samples = req
+            .iter()
+            .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
+            .sum::<usize>();
+        let prom_rows = RowInsertRequests { inserts: req };

         assert_eq!(expected_samples, samples);
         assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len());
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 30fd360455..2e365c9b47 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -122,7 +122,7 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler {
         pipeline_params: GreptimePipelineParams,
         table_name: String,
         ctx: QueryContextRef,
-    ) -> Result<Output>;
+    ) -> Result<Vec<Output>>;
 }

 /// PipelineHandler is responsible for handling pipeline related requests.
diff --git a/src/session/src/hints.rs b/src/session/src/hints.rs
new file mode 100644
index 0000000000..4c0f359b84
--- /dev/null
+++ b/src/session/src/hints.rs
@@ -0,0 +1,29 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// For the given format: `x-greptime-hints: auto_create_table=true, ttl=7d`
+pub const HINTS_KEY: &str = "x-greptime-hints";
+pub const HINTS_KEY_PREFIX: &str = "x-greptime-hint-";
+
+pub const READ_PREFERENCE_HINT: &str = "read_preference";
+
+pub const HINT_KEYS: [&str; 7] = [
+    "x-greptime-hint-auto_create_table",
+    "x-greptime-hint-ttl",
+    "x-greptime-hint-append_mode",
+    "x-greptime-hint-merge_mode",
+    "x-greptime-hint-physical_table",
+    "x-greptime-hint-skip_wal",
+    "x-greptime-hint-read_preference",
+];
diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs
index fa78699774..ac0d628448 100644
--- a/src/session/src/lib.rs
+++ b/src/session/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 pub mod context;
+pub mod hints;
 pub mod session_config;
 pub mod table_name;
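The payload hints (`greptime_*` in ctx_req.rs) and the header hints (`x-greptime-hint-*` above) name the same six table options with different prefixes; only `read_preference` is header-only. A hypothetical helper mapping one form to the other (the integration test below drives the payload form):

use session::hints::HINTS_KEY_PREFIX; // "x-greptime-hint-"

// Hypothetical helper: convert a header hint key to its payload twin.
fn header_to_payload_key(header_key: &str) -> String {
    // "x-greptime-hint-ttl" -> "greptime_ttl"
    format!("greptime_{}", header_key.replace(HINTS_KEY_PREFIX, ""))
}

fn demo() {
    assert_eq!(header_to_payload_key("x-greptime-hint-ttl"), "greptime_ttl");
    assert_eq!(
        header_to_payload_key("x-greptime-hint-skip_wal"),
        "greptime_skip_wal"
    );
}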
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 8cb8575fa7..5159a6ca97 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -104,6 +104,7 @@ macro_rules! http_tests {
             test_identity_pipeline_with_custom_ts,
             test_pipeline_dispatcher,
             test_pipeline_suffix_template,
+            test_pipeline_context,

             test_otlp_metrics,
             test_otlp_traces_v0,
@@ -2008,6 +2009,125 @@ table_suffix: _${type}
     guard.remove_all().await;
 }

+pub async fn test_pipeline_context(storage_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(storage_type, "test_pipeline_context").await;
+
+    // handshake
+    let client = TestClient::new(app).await;
+
+    let root_pipeline = r#"
+processors:
+  - date:
+      field: time
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+      ignore_missing: true
+
+transform:
+  - fields:
+      - id1, id1_root
+      - id2, id2_root
+    type: int32
+  - fields:
+      - type
+      - log
+      - logger
+    type: string
+  - field: time
+    type: time
+    index: timestamp
+table_suffix: _${type}
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/root")
+        .header("Content-Type", "application/x-yaml")
+        .body(root_pipeline)
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 2. write data
+    let data_body = r#"
+[
+  {
+    "id1": "2436",
+    "id2": "2528",
+    "logger": "INTERACT.MANAGER",
+    "type": "http",
+    "time": "2024-05-25 20:16:37.217",
+    "log": "ClusterAdapter:enter sendTextDataToCluster\\n",
+    "greptime_ttl": "1d"
+  },
+  {
+    "id1": "2436",
+    "id2": "2528",
+    "logger": "INTERACT.MANAGER",
+    "type": "db",
+    "time": "2024-05-25 20:16:37.217",
+    "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+  }
+]
+"#;
+    let res = client
+        .post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 3. check table list
+    validate_data(
+        "test_pipeline_context_table_list",
+        &client,
+        "show tables",
+        "[[\"d_table_db\"],[\"d_table_http\"],[\"demo\"],[\"numbers\"]]",
+    )
+    .await;
+
+    // 4. check each table's data
+    // CREATE TABLE IF NOT EXISTS "d_table_db" (
+    //   ... ignore
+    // )
+    // ENGINE=mito
+    // WITH(
+    //   append_mode = 'true'
+    // )
+    let expected = "[[\"d_table_db\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_db\\\" (\\n  \\\"id1_root\\\" INT NULL,\\n  \\\"id2_root\\\" INT NULL,\\n  \\\"type\\\" STRING NULL,\\n  \\\"log\\\" STRING NULL,\\n  \\\"logger\\\" STRING NULL,\\n  \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n  TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n  append_mode = 'true'\\n)\"]]";
+
+    validate_data(
+        "test_pipeline_context_db",
+        &client,
+        "show create table d_table_db",
+        expected,
+    )
+    .await;
+
+    // CREATE TABLE IF NOT EXISTS "d_table_http" (
+    //   ... ignore
+    // )
+    // ENGINE=mito
+    // WITH(
+    //   append_mode = 'true',
+    //   ttl = '1day'
+    // )
+    let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n  \\\"id1_root\\\" INT NULL,\\n  \\\"id2_root\\\" INT NULL,\\n  \\\"type\\\" STRING NULL,\\n  \\\"log\\\" STRING NULL,\\n  \\\"logger\\\" STRING NULL,\\n  \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n  TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n  append_mode = 'true',\\n  ttl = '1day'\\n)\"]]";
+    validate_data(
+        "test_pipeline_context_http",
+        &client,
+        "show create table d_table_http",
+        expected,
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =