diff --git a/Cargo.lock b/Cargo.lock index a8de2387ae..e031e23632 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5921,6 +5921,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "object-pool" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee9a3e7196d09ec86002b939f1576e8e446d58def8fd48fe578e2c72d5328d68" +dependencies = [ + "parking_lot 0.11.2", +] + [[package]] name = "object-store" version = "0.6.0" @@ -9054,6 +9063,7 @@ dependencies = [ "common-test-util", "common-time", "common-version", + "criterion", "datafusion", "datafusion-common", "datafusion-expr", @@ -9073,6 +9083,7 @@ dependencies = [ "mime_guess", "mysql_async", "notify", + "object-pool", "once_cell", "openmetrics-parser", "opensrv-mysql", diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 5e21188294..22402bebff 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use api::prom_store::remote::read_request::ResponseType; use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; +use api::v1::RowInsertRequests; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_catalog::format_full_table_name; @@ -174,7 +175,7 @@ impl PromStoreProtocolHandler for Instance { .get::>(); interceptor_ref.pre_write(&request, ctx.clone())?; - let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?; + let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?; if with_metric_engine { let physical_table = ctx .extension(PHYSICAL_TABLE_PARAM) @@ -197,6 +198,38 @@ impl PromStoreProtocolHandler for Instance { Ok(()) } + async fn write_fast( + &self, + request: RowInsertRequests, + ctx: QueryContextRef, + with_metric_engine: bool, + ) -> ServerResult<()> { + self.plugins + .get::() + .as_ref() + 
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite) + .context(AuthSnafu)?; + + if with_metric_engine { + let physical_table = ctx + .extension(PHYSICAL_TABLE_PARAM) + .unwrap_or(GREPTIME_PHYSICAL_TABLE) + .to_string(); + let _ = self + .handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } else { + let _ = self + .handle_row_inserts(request, ctx.clone()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } + Ok(()) + } + async fn read( &self, request: ReadRequest, @@ -276,7 +309,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler { ctx: QueryContextRef, _: bool, ) -> ServerResult<()> { - let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?; + let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?; self.inserter .handle_metric_row_inserts( requests, @@ -290,6 +323,15 @@ impl PromStoreProtocolHandler for ExportMetricHandler { Ok(()) } + async fn write_fast( + &self, + _request: RowInsertRequests, + _ctx: QueryContextRef, + _with_metric_engine: bool, + ) -> ServerResult<()> { + unimplemented!() + } + async fn read( &self, _request: ReadRequest, diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 69f3188154..a9363a0efe 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -60,6 +60,7 @@ itertools.workspace = true lazy_static.workspace = true mime_guess = "2.0" notify = "6.1" +object-pool = "0.5" once_cell.workspace = true openmetrics-parser = "0.4" opensrv-mysql = "0.7.0" @@ -114,6 +115,7 @@ catalog = { workspace = true, features = ["testing"] } client.workspace = true common-base.workspace = true common-test-util.workspace = true +criterion = "0.4" mysql_async = { version = "0.33", default-features = false, features = [ "default-rustls", ] } @@ -129,3 +131,7 @@ tokio-test = "0.4" [build-dependencies] common-version.workspace = true + +[[bench]] 
+name = "bench_prom" +harness = false diff --git a/src/servers/benches/bench_prom.rs b/src/servers/benches/bench_prom.rs new file mode 100644 index 0000000000..df052844f1 --- /dev/null +++ b/src/servers/benches/bench_prom.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use criterion::criterion_main; + +mod prom_decode; + +criterion_main! { + prom_decode::benches +} diff --git a/src/servers/benches/prom_decode.rs b/src/servers/benches/prom_decode.rs new file mode 100644 index 0000000000..4b40683cf0 --- /dev/null +++ b/src/servers/benches/prom_decode.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Duration; + +use api::prom_store::remote::WriteRequest; +use bytes::Bytes; +use criterion::{criterion_group, criterion_main, Criterion}; +use prost::Message; +use servers::prom_store::to_grpc_row_insert_requests; +use servers::proto::PromWriteRequest; + +fn bench_decode_prom_request(c: &mut Criterion) { + let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + d.push("benches"); + d.push("write_request.pb.data"); + + let data = Bytes::from(std::fs::read(d).unwrap()); + + let mut request = WriteRequest::default(); + let mut prom_request = PromWriteRequest::default(); + c.benchmark_group("decode") + .measurement_time(Duration::from_secs(3)) + .bench_function("write_request", |b| { + b.iter(|| { + request.clear(); + let data = data.clone(); + request.merge(data).unwrap(); + to_grpc_row_insert_requests(&request).unwrap(); + }); + }) + .bench_function("prom_write_request", |b| { + b.iter(|| { + let data = data.clone(); + prom_request.merge(data).unwrap(); + prom_request.as_row_insert_requests(); + }); + }); +} + +criterion_group!(benches, bench_decode_prom_request); +criterion_main!(benches); diff --git a/src/servers/benches/write_request.pb.data b/src/servers/benches/write_request.pb.data new file mode 100644 index 0000000000..19219dc4ee Binary files /dev/null and b/src/servers/benches/write_request.pb.data differ diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 9188755028..5e30b47fc0 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -15,14 +15,18 @@ use std::sync::Arc; use api::prom_store::remote::{ReadRequest, WriteRequest}; +use api::v1::RowInsertRequests; use axum::extract::{Query, RawBody, State}; use axum::http::{header, StatusCode}; use axum::response::IntoResponse; use axum::Extension; +use bytes::Bytes; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_telemetry::tracing; use 
hyper::Body; +use lazy_static::lazy_static; +use object_pool::Pool; use prost::Message; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -31,9 +35,14 @@ use snafu::prelude::*; use crate::error::{self, Result, UnexpectedPhysicalTableSnafu}; use crate::prom_store::snappy_decompress; +use crate::proto::PromWriteRequest; use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse}; pub const PHYSICAL_TABLE_PARAM: &str = "physical_table"; +lazy_static! { + static ref PROM_WRITE_REQUEST_POOL: Pool = + Pool::new(256, PromWriteRequest::default); +} #[derive(Debug, Serialize, Deserialize, JsonSchema)] pub struct DatabaseQuery { @@ -91,14 +100,15 @@ pub async fn remote_write( .with_label_values(&[db.as_str()]) .start_timer(); - let request = decode_remote_write_request(body).await?; + let request = decode_remote_write_request_to_row_inserts(body).await?; + if let Some(physical_table) = params.physical_table { let mut new_query_ctx = query_ctx.as_ref().clone(); new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table); query_ctx = Arc::new(new_query_ctx); } - handler.write(request, query_ctx, true).await?; + handler.write_fast(request, query_ctx, true).await?; Ok((StatusCode::NO_CONTENT, ())) } @@ -136,6 +146,23 @@ pub async fn remote_read( handler.read(request, query_ctx).await } +async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result { + let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); + let body = hyper::body::to_bytes(body) + .await + .context(error::HyperSnafu)?; + + let buf = Bytes::from(snappy_decompress(&body[..])?); + + let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default); + request + .merge(buf) + .context(error::DecodePromRemoteRequestSnafu)?; + let (requests, samples) = request.as_row_insert_requests(); + crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES.observe(samples as f64); + Ok(requests) +} + async fn decode_remote_write_request(body: Body) 
-> Result { let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); let body = hyper::body::to_bytes(body) diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 4d04661c5e..efa6084eae 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -36,9 +36,13 @@ pub mod mysql; pub mod opentsdb; pub mod otlp; pub mod postgres; +mod prom_row_builder; pub mod prom_store; pub mod prometheus_handler; +pub mod proto; pub mod query_handler; +#[allow(clippy::all)] +mod repeated_field; mod row_writer; pub mod server; mod shutdown; diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs new file mode 100644 index 0000000000..20a049f472 --- /dev/null +++ b/src/servers/src/prom_row_builder.rs @@ -0,0 +1,272 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::string::ToString; + +use api::prom_store::remote::Sample; +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, + Value, +}; +use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; + +use crate::proto::PromLabel; +use crate::repeated_field::Clear; + +/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests]. 
+#[derive(Default)] +pub(crate) struct TablesBuilder { + tables: HashMap, +} + +impl Clear for TablesBuilder { + fn clear(&mut self) { + self.tables.clear(); + } +} + +impl TablesBuilder { + /// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist. + pub(crate) fn get_or_create_table_builder( + &mut self, + table_name: String, + label_num: usize, + row_num: usize, + ) -> &mut TableBuilder { + self.tables + .entry(table_name) + .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num)) + } + + /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states. + pub(crate) fn as_insert_requests(&mut self) -> (RowInsertRequests, usize) { + let mut total_rows = 0; + let inserts = self + .tables + .drain() + .map(|(name, mut table)| { + total_rows += table.num_rows(); + table.as_row_insert_request(name) + }) + .collect(); + (RowInsertRequests { inserts }, total_rows) + } +} + +/// Builder for one table. +pub(crate) struct TableBuilder { + /// Column schemas. + schema: Vec, + /// Rows written. + rows: Vec, + /// Indices of columns inside `schema`. 
+ col_indexes: HashMap, +} + +impl Default for TableBuilder { + fn default() -> Self { + Self::with_capacity(2, 0) + } +} + +impl TableBuilder { + pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self { + let mut col_indexes = HashMap::with_capacity(cols); + col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0); + col_indexes.insert(GREPTIME_VALUE.to_string(), 1); + + let mut schema = Vec::with_capacity(cols); + schema.push(ColumnSchema { + column_name: GREPTIME_TIMESTAMP.to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + datatype_extension: None, + }); + + schema.push(ColumnSchema { + column_name: GREPTIME_VALUE.to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + }); + + Self { + schema, + rows: Vec::with_capacity(rows), + col_indexes, + } + } + + /// Total number of rows inside table builder. + fn num_rows(&self) -> usize { + self.rows.len() + } + + /// Adds a set of labels and samples to table builder. + pub(crate) fn add_labels_and_samples(&mut self, labels: &[PromLabel], samples: &[Sample]) { + let mut row = vec![Value { value_data: None }; self.col_indexes.len()]; + + for PromLabel { name, value } in labels { + // safety: we expect all labels are UTF-8 encoded strings. 
+ let tag_name = unsafe { String::from_utf8_unchecked(name.to_vec()) }; + let tag_value = unsafe { String::from_utf8_unchecked(value.to_vec()) }; + let tag_value = Some(ValueData::StringValue(tag_value)); + let tag_num = self.col_indexes.len(); + + match self.col_indexes.entry(tag_name) { + Entry::Occupied(e) => { + row[*e.get()].value_data = tag_value; + } + Entry::Vacant(e) => { + let column_name = e.key().clone(); + e.insert(tag_num); + self.schema.push(ColumnSchema { + column_name, + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + datatype_extension: None, + }); + row.push(Value { + value_data: tag_value, + }); + } + } + } + + if samples.len() == 1 { + let sample = &samples[0]; + row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp)); + row[1].value_data = Some(ValueData::F64Value(sample.value)); + self.rows.push(Row { values: row }); + return; + } + for sample in samples { + row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp)); + row[1].value_data = Some(ValueData::F64Value(sample.value)); + self.rows.push(Row { + values: row.clone(), + }); + } + } + + /// Converts [TableBuilder] to [RowInsertRequest] and clears buffered data. 
+ pub(crate) fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest { + let mut rows = std::mem::take(&mut self.rows); + let schema = std::mem::take(&mut self.schema); + let col_num = schema.len(); + for row in &mut rows { + if row.values.len() < col_num { + row.values.resize(col_num, Value { value_data: None }); + } + } + + RowInsertRequest { + table_name, + rows: Some(Rows { schema, rows }), + } + } +} + +#[cfg(test)] +mod tests { + use api::prom_store::remote::Sample; + use api::v1::value::ValueData; + use api::v1::Value; + use bytes::Bytes; + + use crate::prom_row_builder::TableBuilder; + use crate::proto::PromLabel; + #[test] + fn test_table_builder() { + let mut builder = TableBuilder::default(); + builder.add_labels_and_samples( + &[ + PromLabel { + name: Bytes::from("tag0"), + value: Bytes::from("v0"), + }, + PromLabel { + name: Bytes::from("tag1"), + value: Bytes::from("v1"), + }, + ], + &[Sample { + value: 0.0, + timestamp: 0, + }], + ); + + builder.add_labels_and_samples( + &[ + PromLabel { + name: Bytes::from("tag0"), + value: Bytes::from("v0"), + }, + PromLabel { + name: Bytes::from("tag2"), + value: Bytes::from("v2"), + }, + ], + &[Sample { + value: 0.1, + timestamp: 1, + }], + ); + + let request = builder.as_row_insert_request("test".to_string()); + let rows = request.rows.unwrap().rows; + assert_eq!(2, rows.len()); + + assert_eq!( + vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)) + }, + Value { + value_data: Some(ValueData::F64Value(0.0)) + }, + Value { + value_data: Some(ValueData::StringValue("v0".to_string())) + }, + Value { + value_data: Some(ValueData::StringValue("v1".to_string())) + }, + Value { value_data: None }, + ], + rows[0].values + ); + + assert_eq!( + vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(1)) + }, + Value { + value_data: Some(ValueData::F64Value(0.1)) + }, + Value { + value_data: Some(ValueData::StringValue("v0".to_string())) + }, + Value { value_data: 
None }, + Value { + value_data: Some(ValueData::StringValue("v2".to_string())) + }, + ], + rows[1].values + ); + } +} diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index f86d30781c..7553d97912 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -39,6 +39,8 @@ use crate::row_writer::{self, MultiTableData}; pub const METRIC_NAME_LABEL: &str = "__name__"; +pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__"; + /// Metrics for push gateway protocol pub struct Metrics { pub exposition: MetricsExposition, @@ -300,12 +302,12 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result<(RowInsertRequests, usize)> { +pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> { let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer(); let mut multi_table_data = MultiTableData::new(); - for series in request.timeseries { + for series in &request.timeseries { let table_name = &series .labels .iter() @@ -329,11 +331,11 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe ); // labels - let kvs = series.labels.into_iter().filter_map(|label| { + let kvs = series.labels.iter().filter_map(|label| { if label.name == METRIC_NAME_LABEL { None } else { - Some((label.name, label.value)) + Some((label.name.clone(), label.value.clone())) } }); @@ -649,7 +651,7 @@ mod tests { ..Default::default() }; - let mut exprs = to_grpc_row_insert_requests(write_request) + let mut exprs = to_grpc_row_insert_requests(&write_request) .unwrap() .0 .inserts; diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs new file mode 100644 index 0000000000..1a96cd9ed8 --- /dev/null +++ b/src/servers/src/proto.rs @@ -0,0 +1,304 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; + +use api::prom_store::remote::Sample; +use api::v1::RowInsertRequests; +use bytes::{Buf, Bytes}; +use prost::encoding::message::merge; +use prost::encoding::{decode_key, decode_varint, DecodeContext, WireType}; +use prost::DecodeError; + +use crate::prom_row_builder::TablesBuilder; +use crate::prom_store::METRIC_NAME_LABEL_BYTES; +use crate::repeated_field::{Clear, RepeatedField}; + +impl Clear for Sample { + fn clear(&mut self) {} +} + +#[derive(Default, Clone)] +pub struct PromLabel { + pub name: Bytes, + pub value: Bytes, +} + +impl Clear for PromLabel { + fn clear(&mut self) { + self.name.clear(); + self.value.clear(); + } +} + +impl PromLabel { + pub fn merge_field( + &mut self, + tag: u32, + wire_type: WireType, + buf: &mut B, + ctx: DecodeContext, + ) -> Result<(), DecodeError> + where + B: Buf, + { + const STRUCT_NAME: &str = "PromLabel"; + match tag { + 1u32 => { + // decode label name + let value = &mut self.name; + prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| { + error.push(STRUCT_NAME, "name"); + error + }) + } + 2u32 => { + // decode label value + let value = &mut self.value; + prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| { + error.push(STRUCT_NAME, "value"); + error + }) + } + _ => prost::encoding::skip_field(wire_type, tag, buf, ctx), + } + } +} + +#[derive(Default)] +pub struct PromTimeSeries { + pub table_name: String, + pub labels: RepeatedField, + pub samples: RepeatedField, +} + +impl Clear for PromTimeSeries { + fn clear(&mut self) { + 
self.table_name.clear(); + self.labels.clear(); + self.samples.clear(); + } +} + +impl PromTimeSeries { + pub fn merge_field( + &mut self, + tag: u32, + wire_type: WireType, + buf: &mut B, + ctx: DecodeContext, + ) -> Result<(), DecodeError> + where + B: Buf, + { + const STRUCT_NAME: &str = "PromTimeSeries"; + match tag { + 1u32 => { + // decode labels + let label = self.labels.push_default(); + + let len = decode_varint(buf).map_err(|mut error| { + error.push(STRUCT_NAME, "labels"); + error + })?; + let remaining = buf.remaining(); + if len > remaining as u64 { + return Err(DecodeError::new("buffer underflow")); + } + + let limit = remaining - len as usize; + while buf.remaining() > limit { + let (tag, wire_type) = decode_key(buf)?; + label.merge_field(tag, wire_type, buf, ctx.clone())?; + } + if buf.remaining() != limit { + return Err(DecodeError::new("delimited length exceeded")); + } + if label.name.deref() == METRIC_NAME_LABEL_BYTES { + // safety: we expect all labels are UTF-8 encoded strings. 
+ let table_name = unsafe { String::from_utf8_unchecked(label.value.to_vec()) }; + self.table_name = table_name; + self.labels.truncate(self.labels.len() - 1); // remove last label + } + Ok(()) + } + 2u32 => { + let sample = self.samples.push_default(); + merge(WireType::LengthDelimited, sample, buf, ctx).map_err(|mut error| { + error.push(STRUCT_NAME, "samples"); + error + })?; + Ok(()) + } + // skip exemplars + 3u32 => prost::encoding::skip_field(wire_type, tag, buf, ctx), + _ => prost::encoding::skip_field(wire_type, tag, buf, ctx), + } + } + + fn add_to_table_data(&mut self, table_builders: &mut TablesBuilder) { + let label_num = self.labels.len(); + let row_num = self.samples.len(); + let table_data = table_builders.get_or_create_table_builder( + std::mem::take(&mut self.table_name), + label_num, + row_num, + ); + table_data.add_labels_and_samples(self.labels.as_slice(), self.samples.as_slice()); + self.labels.clear(); + self.samples.clear(); + } +} + +#[derive(Default)] +pub struct PromWriteRequest { + table_data: TablesBuilder, + series: PromTimeSeries, +} + +impl Clear for PromWriteRequest { + fn clear(&mut self) { + self.table_data.clear(); + } +} + +impl PromWriteRequest { + pub fn as_row_insert_requests(&mut self) -> (RowInsertRequests, usize) { + self.table_data.as_insert_requests() + } + + pub fn merge(&mut self, mut buf: B) -> Result<(), DecodeError> + where + B: Buf, + Self: Sized, + { + const STRUCT_NAME: &str = "PromWriteRequest"; + let ctx = DecodeContext::default(); + while buf.has_remaining() { + let (tag, wire_type) = decode_key(&mut buf)?; + assert_eq!(WireType::LengthDelimited, wire_type); + match tag { + 1u32 => { + // decode TimeSeries + let len = decode_varint(&mut buf).map_err(|mut e| { + e.push(STRUCT_NAME, "timeseries"); + e + })?; + let remaining = buf.remaining(); + if len > remaining as u64 { + return Err(DecodeError::new("buffer underflow")); + } + + let limit = remaining - len as usize; + while buf.remaining() > limit { + let (tag, 
wire_type) = decode_key(&mut buf)?; + self.series + .merge_field(tag, wire_type, &mut buf, ctx.clone())?; + } + if buf.remaining() != limit { + return Err(DecodeError::new("delimited length exceeded")); + } + self.series.add_to_table_data(&mut self.table_data); + } + 3u32 => { + // we can ignore metadata for now. + prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?; + } + _ => prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?, + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + + use api::prom_store::remote::WriteRequest; + use api::v1::RowInsertRequests; + use bytes::Bytes; + use prost::Message; + + use crate::prom_store::to_grpc_row_insert_requests; + use crate::proto::PromWriteRequest; + use crate::repeated_field::Clear; + + fn check_deserialized( + prom_write_request: &mut PromWriteRequest, + data: &Bytes, + expected_samples: usize, + expected_rows: &RowInsertRequests, + ) { + prom_write_request.clear(); + prom_write_request.merge(data.clone()).unwrap(); + let (prom_rows, samples) = prom_write_request.as_row_insert_requests(); + + assert_eq!(expected_samples, samples); + assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len()); + + let schemas = expected_rows + .inserts + .iter() + .map(|r| { + ( + r.table_name.clone(), + r.rows + .as_ref() + .unwrap() + .schema + .iter() + .map(|c| (c.column_name.clone(), c.datatype, c.semantic_type)) + .collect::>(), + ) + }) + .collect::>(); + + for r in &prom_rows.inserts { + let expected = schemas.get(&r.table_name).unwrap(); + assert_eq!( + expected, + &r.rows + .as_ref() + .unwrap() + .schema + .iter() + .map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) }) + .collect() + ); + } + } + + // Ensures `WriteRequest` and `PromWriteRequest` produce the same gRPC request. 
+ #[test] + fn test_decode_write_request() { + let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + d.push("benches"); + d.push("write_request.pb.data"); + let data = Bytes::from(std::fs::read(d).unwrap()); + + let (expected_rows, expected_samples) = + to_grpc_row_insert_requests(&WriteRequest::decode(data.clone()).unwrap()).unwrap(); + + let mut prom_write_request = PromWriteRequest::default(); + for _ in 0..3 { + check_deserialized( + &mut prom_write_request, + &data, + expected_samples, + &expected_rows, + ); + } + } +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index d36b7418b0..347ec52456 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -29,6 +29,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::prom_store::remote::{ReadRequest, WriteRequest}; +use api::v1::RowInsertRequests; use async_trait::async_trait; use common_query::Output; use opentelemetry_proto::tonic::collector::metrics::v1::{ @@ -95,6 +96,15 @@ pub trait PromStoreProtocolHandler { ctx: QueryContextRef, with_metric_engine: bool, ) -> Result<()>; + + /// Handling prometheus remote write requests that are already in [RowInsertRequests] form + async fn write_fast( + &self, + request: RowInsertRequests, + ctx: QueryContextRef, + with_metric_engine: bool, + ) -> Result<()>; + /// Handling prometheus remote read requests async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result; /// Handling push gateway requests diff --git a/src/servers/src/repeated_field.rs b/src/servers/src/repeated_field.rs new file mode 100644 index 0000000000..0e3baf16a5 --- /dev/null +++ b/src/servers/src/repeated_field.rs @@ -0,0 +1,540 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2019 Stepan Koltsov +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +// OR OTHER DEALINGS IN THE SOFTWARE. + +//! The [Clear] trait and [RepeatedField] are taken from [rust-protobuf](https://github.com/stepancheg/rust-protobuf/tree/master/protobuf-examples/vs-prost) +//! to leverage the pooling mechanism to avoid frequent heap allocation/deallocation when decoding deeply nested structs. 
+use std::borrow::Borrow; +use std::cmp::Ordering; +use std::default::Default; +use std::hash::{Hash, Hasher}; +use std::iter::{FromIterator, IntoIterator}; +use std::ops::{Deref, DerefMut, Index, IndexMut}; +use std::{fmt, slice, vec}; + +use bytes::Bytes; + +/// Anything that can be cleared. +pub trait Clear { + /// Clear this object, making it equivalent to a newly created one. + fn clear(&mut self); +} + +impl Clear for Option { + fn clear(&mut self) { + self.take(); + } +} + +impl Clear for String { + fn clear(&mut self) { + String::clear(self); + } +} + +impl Clear for Vec { + fn clear(&mut self) { + Vec::clear(self); + } +} + +impl Clear for Bytes { + fn clear(&mut self) { + Bytes::clear(self); + } +} + +/// Wrapper around vector to avoid deallocations on clear. +pub struct RepeatedField { + vec: Vec, + len: usize, +} + +impl RepeatedField { + /// Return number of elements in this container. + #[inline] + pub fn len(&self) -> usize { + self.len + } + + /// Clear the container by resetting its length to zero; the backing vector is left untouched. + #[inline] + pub fn clear(&mut self) { + self.len = 0; + } +} + +impl Default for RepeatedField { + #[inline] + fn default() -> RepeatedField { + RepeatedField { + vec: Vec::new(), + len: 0, + } + } +} + +impl RepeatedField { + /// Create new empty container. + #[inline] + pub fn new() -> RepeatedField { + Default::default() + } + + /// Create a container with data from given vec. + #[inline] + pub fn from_vec(vec: Vec) -> RepeatedField { + let len = vec.len(); + RepeatedField { vec, len } + } + + /// Convert data into vec. + #[inline] + pub fn into_vec(self) -> Vec { + let mut vec = self.vec; + vec.truncate(self.len); + vec + } + + /// Return current capacity. + #[inline] + pub fn capacity(&self) -> usize { + self.vec.capacity() + } + + /// View data as slice. + #[inline] + pub fn as_slice<'a>(&'a self) -> &'a [T] { + &self.vec[..self.len] + } + + /// View data as mutable slice. 
+ #[inline] + pub fn as_mut_slice<'a>(&'a mut self) -> &'a mut [T] { + &mut self.vec[..self.len] + } + + /// Get subslice of this container. + #[inline] + pub fn slice(&self, start: usize, end: usize) -> &[T] { + &self.as_ref()[start..end] + } + + /// Get mutable subslice of this container. + #[inline] + pub fn slice_mut(&mut self, start: usize, end: usize) -> &mut [T] { + &mut self.as_mut_slice()[start..end] + } + + /// Get slice from given index. + #[inline] + pub fn slice_from(&self, start: usize) -> &[T] { + &self.as_ref()[start..] + } + + /// Get mutable slice from given index. + #[inline] + pub fn slice_from_mut(&mut self, start: usize) -> &mut [T] { + &mut self.as_mut_slice()[start..] + } + + /// Get slice to given index. + #[inline] + pub fn slice_to(&self, end: usize) -> &[T] { + &self.as_ref()[..end] + } + + /// Get mutable slice to given index. + #[inline] + pub fn slice_to_mut(&mut self, end: usize) -> &mut [T] { + &mut self.as_mut_slice()[..end] + } + + /// View this container as two slices split at given index. + #[inline] + pub fn split_at<'a>(&'a self, mid: usize) -> (&'a [T], &'a [T]) { + self.as_ref().split_at(mid) + } + + /// View this container as two mutable slices split at given index. + #[inline] + pub fn split_at_mut<'a>(&'a mut self, mid: usize) -> (&'a mut [T], &'a mut [T]) { + self.as_mut_slice().split_at_mut(mid) + } + + /// View all but first elements of this container. + #[inline] + pub fn tail(&self) -> &[T] { + &self.as_ref()[1..] + } + + /// Last element of this container. + #[inline] + pub fn last(&self) -> Option<&T> { + self.as_ref().last() + } + + /// Mutable last element of this container. + #[inline] + pub fn last_mut<'a>(&'a mut self) -> Option<&'a mut T> { + self.as_mut_slice().last_mut() + } + + /// View all but last elements of this container. + #[inline] + pub fn init<'a>(&'a self) -> &'a [T] { + let s = self.as_ref(); + &s[0..s.len() - 1] + } + + /// Push an element to the end. 
+ #[inline] + pub fn push(&mut self, value: T) { + if self.len == self.vec.len() { + self.vec.push(value); + } else { + self.vec[self.len] = value; + } + self.len += 1; + } + + /// Pop last element. + #[inline] + pub fn pop(&mut self) -> Option { + if self.len == 0 { + None + } else { + self.vec.truncate(self.len); + self.len -= 1; + self.vec.pop() + } + } + + /// Insert an element at specified position. + #[inline] + pub fn insert(&mut self, index: usize, value: T) { + assert!(index <= self.len); + self.vec.insert(index, value); + self.len += 1; + } + + /// Remove an element from specified position. + #[inline] + pub fn remove(&mut self, index: usize) -> T { + assert!(index < self.len); + self.len -= 1; + self.vec.remove(index) + } + + /// Retains only the elements specified by the predicate. + /// + /// In other words, remove all elements `e` such that `f(&e)` returns `false`. + /// This method operates in place, visiting each element exactly once in the + /// original order, and preserves the order of the retained elements. + /// + /// # Examples + /// + /// ``` + /// # use protobuf::RepeatedField; + /// + /// let mut vec = RepeatedField::from(vec![1, 2, 3, 4]); + /// vec.retain(|&x| x % 2 == 0); + /// assert_eq!(vec, RepeatedField::from(vec![2, 4])); + /// ``` + pub fn retain(&mut self, f: F) + where + F: FnMut(&T) -> bool, + { + // suboptimal + self.vec.truncate(self.len); + self.vec.retain(f); + self.len = self.vec.len(); + } + + /// Truncate at specified length. + #[inline] + pub fn truncate(&mut self, len: usize) { + if self.len > len { + self.len = len; + } + } + + /// Reverse in place. + #[inline] + pub fn reverse(&mut self) { + self.as_mut_slice().reverse() + } + + /// Into owned iterator. + #[inline] + pub fn into_iter(mut self) -> vec::IntoIter { + self.vec.truncate(self.len); + self.vec.into_iter() + } + + /// Immutable data iterator. 
+ #[inline] + pub fn iter<'a>(&'a self) -> slice::Iter<'a, T> { + self.as_ref().iter() + } + + /// Mutable data iterator. + #[inline] + pub fn iter_mut<'a>(&'a mut self) -> slice::IterMut<'a, T> { + self.as_mut_slice().iter_mut() + } + + /// Sort elements with given comparator. + #[inline] + pub fn sort_by(&mut self, compare: F) + where + F: Fn(&T, &T) -> Ordering, + { + self.as_mut_slice().sort_by(compare) + } + + /// Get data as raw pointer. + #[inline] + pub fn as_ptr(&self) -> *const T { + self.vec.as_ptr() + } + + /// Get data a mutable raw pointer. + #[inline] + pub fn as_mut_ptr(&mut self) -> *mut T { + self.vec.as_mut_ptr() + } +} + +impl RepeatedField { + /// Push default value. + /// This operation could be faster than `rf.push(Default::default())`, + /// because it may reuse previously allocated and cleared element. + pub fn push_default<'a>(&'a mut self) -> &'a mut T { + if self.len == self.vec.len() { + self.vec.push(Default::default()); + } else { + self.vec[self.len].clear(); + } + self.len += 1; + self.last_mut().unwrap() + } +} + +impl From> for RepeatedField { + #[inline] + fn from(values: Vec) -> RepeatedField { + RepeatedField::from_vec(values) + } +} + +impl<'a, T: Clone> From<&'a [T]> for RepeatedField { + #[inline] + fn from(values: &'a [T]) -> RepeatedField { + RepeatedField::from_slice(values) + } +} + +impl Into> for RepeatedField { + #[inline] + fn into(self) -> Vec { + self.into_vec() + } +} + +impl RepeatedField { + /// Copy slice data to `RepeatedField` + #[inline] + pub fn from_slice(values: &[T]) -> RepeatedField { + RepeatedField::from_vec(values.to_vec()) + } + + /// Copy slice data to `RepeatedField` + #[inline] + pub fn from_ref>(values: X) -> RepeatedField { + RepeatedField::from_slice(values.as_ref()) + } + + /// Copy this data into new vec. 
+ #[inline] + pub fn to_vec(&self) -> Vec { + self.as_ref().to_vec() + } +} + +impl Clone for RepeatedField { + #[inline] + fn clone(&self) -> RepeatedField { + RepeatedField { + vec: self.to_vec(), + len: self.len(), + } + } +} + +impl FromIterator for RepeatedField { + #[inline] + fn from_iter>(iter: I) -> RepeatedField { + RepeatedField::from_vec(FromIterator::from_iter(iter)) + } +} + +impl<'a, T> IntoIterator for &'a RepeatedField { + type Item = &'a T; + type IntoIter = slice::Iter<'a, T>; + + fn into_iter(self) -> slice::Iter<'a, T> { + self.iter() + } +} + +impl<'a, T> IntoIterator for &'a mut RepeatedField { + type Item = &'a mut T; + type IntoIter = slice::IterMut<'a, T>; + + fn into_iter(self) -> slice::IterMut<'a, T> { + self.iter_mut() + } +} + +impl<'a, T> IntoIterator for RepeatedField { + type Item = T; + type IntoIter = vec::IntoIter; + + fn into_iter(self) -> vec::IntoIter { + self.into_iter() + } +} + +impl PartialEq for RepeatedField { + #[inline] + fn eq(&self, other: &RepeatedField) -> bool { + self.as_ref() == other.as_ref() + } +} + +impl Eq for RepeatedField {} + +impl PartialEq<[T]> for RepeatedField { + fn eq(&self, other: &[T]) -> bool { + self.as_slice() == other + } +} + +impl PartialEq> for [T] { + fn eq(&self, other: &RepeatedField) -> bool { + self == other.as_slice() + } +} + +impl RepeatedField { + /// True iff this container contains given element. 
+ #[inline] + pub fn contains(&self, value: &T) -> bool { + self.as_ref().contains(value) + } +} + +impl Hash for RepeatedField { + fn hash(&self, state: &mut H) { + self.as_ref().hash(state); + } +} + +impl AsRef<[T]> for RepeatedField { + #[inline] + fn as_ref<'a>(&'a self) -> &'a [T] { + &self.vec[..self.len] + } +} + +impl Borrow<[T]> for RepeatedField { + #[inline] + fn borrow(&self) -> &[T] { + &self.vec[..self.len] + } +} + +impl Deref for RepeatedField { + type Target = [T]; + #[inline] + fn deref(&self) -> &[T] { + &self.vec[..self.len] + } +} + +impl DerefMut for RepeatedField { + #[inline] + fn deref_mut(&mut self) -> &mut [T] { + &mut self.vec[..self.len] + } +} + +impl Index for RepeatedField { + type Output = T; + + #[inline] + fn index<'a>(&'a self, index: usize) -> &'a T { + &self.as_ref()[index] + } +} + +impl IndexMut for RepeatedField { + #[inline] + fn index_mut<'a>(&'a mut self, index: usize) -> &'a mut T { + &mut self.as_mut_slice()[index] + } +} + +impl Extend for RepeatedField { + fn extend>(&mut self, iter: I) { + self.vec.truncate(self.len); + self.vec.extend(iter); + self.len = self.vec.len(); + } +} + +impl<'a, T: Copy + 'a> Extend<&'a T> for RepeatedField { + fn extend>(&mut self, iter: I) { + self.vec.truncate(self.len); + self.vec.extend(iter); + self.len = self.vec.len(); + } +} + +impl fmt::Debug for RepeatedField { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.as_ref().fmt(f) + } +} diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index 44564c367b..f77c7d7cb2 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -18,6 +18,7 @@ use api::prom_store::remote::{ LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest, }; use api::v1::greptime_request::Request; +use api::v1::RowInsertRequests; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -64,6 +65,16 
@@ impl PromStoreProtocolHandler for DummyInstance { Ok(()) } + + async fn write_fast( + &self, + _request: RowInsertRequests, + _ctx: QueryContextRef, + _with_metric_engine: bool, + ) -> Result<()> { + Ok(()) + } + async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result { let _ = self .tx @@ -141,6 +152,7 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { #[tokio::test] async fn test_prometheus_remote_write_read() { + common_telemetry::init_default_ut_logging(); let (tx, mut rx) = mpsc::channel(100); let app = make_test_app(tx); @@ -219,28 +231,17 @@ async fn test_prometheus_remote_write_read() { requests.push(s); } - assert_eq!(4, requests.len()); + assert_eq!(2, requests.len()); - assert_eq!("public", requests[0].0); - assert_eq!("prometheus", requests[1].0); - assert_eq!("prometheus", requests[2].0); - assert_eq!("public", requests[3].0); - - assert_eq!( - write_request, - WriteRequest::decode(&(requests[0].1)[..]).unwrap() - ); - assert_eq!( - write_request, - WriteRequest::decode(&(requests[1].1)[..]).unwrap() - ); + assert_eq!("prometheus", requests[0].0); + assert_eq!("public", requests[1].0); assert_eq!( read_request, - ReadRequest::decode(&(requests[2].1)[..]).unwrap() + ReadRequest::decode(&(requests[0].1)[..]).unwrap() ); assert_eq!( read_request, - ReadRequest::decode(&(requests[3].1)[..]).unwrap() + ReadRequest::decode(&(requests[1].1)[..]).unwrap() ); }