From d1730a957729d24186c22adbf570697ec900f0f4 Mon Sep 17 00:00:00 2001 From: LFC Date: Fri, 6 Jan 2023 12:19:38 +0800 Subject: [PATCH] refactor: simplify how Frontend instance handles other protocols (#831) * refactor: make influxdb, opentsdb and prometheus read/write goes through GRPC interface, to unify and simplify the Frontend instance either in standalone or distributed mode --- src/common/substrait/src/types.rs | 14 ++- src/frontend/src/error.rs | 58 +-------- src/frontend/src/instance.rs | 27 +--- src/frontend/src/instance/influxdb.rs | 153 ++++++++++------------- src/frontend/src/instance/opentsdb.rs | 65 +++------- src/frontend/src/instance/prometheus.rs | 64 +++++----- src/servers/src/influxdb.rs | 159 +----------------------- src/servers/src/opentsdb/codec.rs | 65 ---------- src/servers/src/prometheus.rs | 149 +--------------------- 9 files changed, 133 insertions(+), 621 deletions(-) diff --git a/src/common/substrait/src/types.rs b/src/common/substrait/src/types.rs index 8834acac36..c390ad4eb9 100644 --- a/src/common/substrait/src/types.rs +++ b/src/common/substrait/src/types.rs @@ -151,10 +151,21 @@ pub(crate) fn scalar_value_as_literal_type(v: &ScalarValue) -> Result LiteralType::I64(*v), ScalarValue::LargeUtf8(Some(v)) => LiteralType::String(v.clone()), ScalarValue::LargeBinary(Some(v)) => LiteralType::Binary(v.clone()), + ScalarValue::TimestampSecond(Some(seconds), _) => { + LiteralType::Timestamp(*seconds * 1_000_000) + } + ScalarValue::TimestampMillisecond(Some(millis), _) => { + LiteralType::Timestamp(*millis * 1000) + } + ScalarValue::TimestampMicrosecond(Some(micros), _) => LiteralType::Timestamp(*micros), + ScalarValue::TimestampNanosecond(Some(nanos), _) => { + LiteralType::Timestamp(*nanos / 1000) + } + ScalarValue::Utf8(Some(s)) => LiteralType::String(s.clone()), // TODO(LFC): Implement other conversions: ScalarValue => LiteralType _ => { return error::UnsupportedExprSnafu { - name: format!("{v:?}"), + name: format!("ScalarValue: {v:?}"), } .fail() } @@ -191,6 +202,7 @@ pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result ScalarValue::Float64(Some(v)), LiteralType::String(v) => ScalarValue::LargeUtf8(Some(v)), LiteralType::Binary(v) => ScalarValue::LargeBinary(Some(v)), + LiteralType::Timestamp(v) => ScalarValue::TimestampMicrosecond(Some(v), None), // TODO(LFC): Implement other conversions: LiteralType => ScalarValue _ => { return error::UnsupportedSubstraitTypeSnafu { diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 6223f64a2e..9111f21660 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -225,31 +225,12 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to bump table id when creating table, source: {}", source))] - BumpTableId { - #[snafu(backtrace)] - source: table::error::Error, - }, - - #[snafu(display("Failed to create database: {}, source: {}", name, source))] - CreateDatabase { - name: String, - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Failed to insert values to table, source: {}", source))] Insert { #[snafu(backtrace)] source: client::Error, }, - #[snafu(display("Failed to create table on insertion, source: {}", source))] - CreateTableOnInsertion { - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Failed to build CreateExpr on insertion: {}", source))] BuildCreateExprOnInsertion { #[snafu(backtrace)] @@ -271,12 +252,6 @@ pub enum Error { source: common_grpc_expr::error::Error, }, - #[snafu(display("Failed to deserialize insert batching: {}", source))] - InsertBatchToRequest { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - #[snafu(display("Failed to find catalog by name: {}", catalog_name))] CatalogNotFound { catalog_name: String, @@ -295,15 +270,6 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display("Failed to get catalog manager"))] - CatalogManager { backtrace: Backtrace }, - - #[snafu(display("Failed to get full table name, source: {}", source))] - FullTableName { - #[snafu(backtrace)] - source: sql::error::Error, - }, - #[snafu(display("Failed to find region routes for table {}", table_name))] FindRegionRoutes { table_name: String, @@ -358,25 +324,12 @@ pub enum Error { #[snafu(display("Cannot find primary key column by name: {}", msg))] PrimaryKeyNotFound { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to execute sql: {}, source: {}", sql, source))] - ExecuteSql { - sql: String, - #[snafu(backtrace)] - source: query::error::Error, - }, - #[snafu(display("Failed to execute statement, source: {}", source))] ExecuteStatement { #[snafu(backtrace)] source: query::error::Error, }, - #[snafu(display("Failed to do vector computation, source: {}", source))] - VectorComputation { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Failed to build DataFusion logical plan, source: {}", source))] BuildDfLogicalPlan { source: datafusion_common::DataFusionError, @@ -466,7 +419,6 @@ impl ErrorExt for Error { | Error::InvalidInsertRequest { .. } | Error::FindPartitionColumn { .. } | Error::ColumnValuesNumberMismatch { .. } - | Error::CatalogManager { .. } | Error::RegionKeysSize { .. } | Error::InvalidFlightTicket { .. } => StatusCode::InvalidArguments, @@ -478,13 +430,10 @@ impl ErrorExt for Error { Error::ParseSql { source } => source.status_code(), - Error::FullTableName { source, .. } => source.status_code(), - Error::Table { source } => source.status_code(), Error::ConvertColumnDefaultConstraint { source, .. } | Error::ConvertScalarValue { source, .. } - | Error::VectorComputation { source } | Error::ConvertArrowSchema { source } => source.status_code(), Error::InvalidObjectResult { source, .. } | Error::RequestDatanode { source } => { @@ -522,19 +471,14 @@ impl ErrorExt for Error { Error::StartMetaClient { source } | Error::RequestMeta { source } => { source.status_code() } - Error::BumpTableId { source, .. } => source.status_code(), Error::SchemaNotFound { .. } => StatusCode::InvalidArguments, Error::CatalogNotFound { .. } => StatusCode::InvalidArguments, - Error::CreateDatabase { source, .. } - | Error::CreateTableOnInsertion { source, .. } - | Error::Insert { source, .. } => source.status_code(), + Error::Insert { source, .. } => source.status_code(), Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(), Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(), Error::ToTableInsertRequest { source, .. } => source.status_code(), Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments, - Error::ExecuteSql { source, .. } => source.status_code(), Error::ExecuteStatement { source, .. } => source.status_code(), - Error::InsertBatchToRequest { source, .. } => source.status_code(), Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments, Error::AlterExprToRequest { source, .. } => source.status_code(), Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ff04e61093..2fa7068100 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -30,7 +30,7 @@ use api::v1::{ }; use async_trait::async_trait; use catalog::remote::MetaKvBackend; -use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef}; +use catalog::CatalogManagerRef; use client::RpcOutput; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; @@ -42,13 +42,13 @@ use datanode::instance::InstanceRef as DnInstanceRef; use distributed::DistInstance; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; +use servers::error as server_error; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; use servers::query_handler::{ GrpcQueryHandler, GrpcQueryHandlerRef, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler, SqlQueryHandlerRef, }; -use servers::{error as server_error, Mode}; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::GenericDialect; @@ -93,10 +93,6 @@ pub struct Instance { grpc_query_handler: GrpcQueryHandlerRef, create_expr_factory: CreateExprFactoryRef, - // TODO(fys): it should be a trait that corresponds to two implementations: - // Standalone and Distributed, then the code behind it doesn't need to use so - // many match statements. - mode: Mode, /// plugins: this map holds extensions to customize query or auth /// behaviours. @@ -126,7 +122,6 @@ impl Instance { catalog_manager, script_handler: None, create_expr_factory: Arc::new(DefaultCreateExprFactory), - mode: Mode::Distributed, sql_handler: dist_instance.clone(), grpc_query_handler: dist_instance, plugins: Default::default(), @@ -168,7 +163,6 @@ impl Instance { catalog_manager: dn_instance.catalog_manager().clone(), script_handler: None, create_expr_factory: Arc::new(DefaultCreateExprFactory), - mode: Mode::Standalone, sql_handler: dn_instance.clone(), grpc_query_handler: dn_instance.clone(), plugins: Default::default(), @@ -181,7 +175,6 @@ impl Instance { catalog_manager: dist_instance.catalog_manager(), script_handler: None, create_expr_factory: Arc::new(DefaultCreateExprFactory), - mode: Mode::Distributed, sql_handler: dist_instance.clone(), grpc_query_handler: dist_instance, plugins: Default::default(), @@ -349,22 +342,6 @@ impl Instance { Ok(output.into()) } - fn get_catalog(&self, catalog_name: &str) -> Result { - self.catalog_manager - .catalog(catalog_name) - .context(error::CatalogSnafu)? - .context(error::CatalogNotFoundSnafu { catalog_name }) - } - - fn get_schema(provider: CatalogProviderRef, schema_name: &str) -> Result { - provider - .schema(schema_name) - .context(error::CatalogSnafu)? - .context(error::SchemaNotFoundSnafu { - schema_info: schema_name, - }) - } - fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result { ensure!( self.catalog_manager diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 0cd69b7432..1849ecca0f 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -12,115 +12,88 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - -use api::v1::{Column, InsertRequest as GrpcInsertRequest}; use async_trait::async_trait; -use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; -use common_grpc_expr::column_to_vector; +use servers::error as server_error; use servers::influxdb::InfluxdbRequest; use servers::query_handler::InfluxdbLineProtocolHandler; -use servers::{error as server_error, Mode}; -use snafu::{OptionExt, ResultExt}; -use table::requests::InsertRequest; +use snafu::ResultExt; -use crate::error; -use crate::error::{InsertBatchToRequestSnafu, Result}; use crate::instance::Instance; #[async_trait] impl InfluxdbLineProtocolHandler for Instance { async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { - match self.mode { - Mode::Standalone => { - self.handle_inserts(request.try_into()?) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { - query: &request.lines, - })?; - } - Mode::Distributed => { - self.dist_insert(request.try_into()?) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteInsertSnafu { - msg: "execute insert failed", - })?; - } - } - + let requests = request.try_into()?; + self.handle_inserts(requests) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{request:?}"), + })?; Ok(()) } } -impl Instance { - pub(crate) async fn dist_insert(&self, inserts: Vec) -> Result { - let mut joins = Vec::with_capacity(inserts.len()); - let catalog_name = DEFAULT_CATALOG_NAME; +#[cfg(test)] +mod test { + use std::sync::Arc; - for insert in inserts { - let self_clone = self.clone(); + use common_query::Output; + use common_recordbatch::RecordBatches; + use servers::query_handler::SqlQueryHandler; + use session::context::QueryContext; - let schema_name = insert.schema_name.to_string(); - let table_name = insert.table_name.to_string(); + use super::*; + use crate::tests; - let columns = &insert.columns; - let row_count = insert.row_count; + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_put_influxdb_lines() { + let standalone = + tests::create_standalone_instance("test_standalone_put_influxdb_lines").await; + let instance = &standalone.instance; - self.create_or_alter_table_on_demand(catalog_name, &schema_name, &table_name, columns) - .await?; - - let request = Self::columns_to_request( - catalog_name, - &schema_name, - &table_name, - columns, - row_count, - )?; - - // TODO(fys): need a separate runtime here - let self_clone = self_clone.clone(); - let join = tokio::spawn(async move { - let catalog = self_clone.get_catalog(catalog_name)?; - let schema = Self::get_schema(catalog, &schema_name)?; - let table = schema - .table(&table_name) - .context(error::CatalogSnafu)? - .context(error::TableNotFoundSnafu { table_name })?; - - table.insert(request).await.context(error::TableSnafu) - }); - joins.push(join); - } - - let mut affected = 0; - - for join in joins { - affected += join.await.context(error::JoinTaskSnafu)??; - } - - Ok(affected) + test_put_influxdb_lines(instance).await; } - fn columns_to_request( - catalog_name: &str, - schema_name: &str, - table_name: &str, - columns: &[Column], - row_count: u32, - ) -> Result { - let mut vectors = HashMap::with_capacity(columns.len()); - for col in columns { - let vector = column_to_vector(col, row_count).context(InsertBatchToRequestSnafu)?; - vectors.insert(col.column_name.clone(), vector); - } - Ok(InsertRequest { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - columns_values: vectors, - }) + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_put_influxdb_lines() { + let instance = + tests::create_distributed_instance("test_distributed_put_influxdb_lines").await; + let instance = &instance.frontend; + + test_put_influxdb_lines(instance).await; + } + + async fn test_put_influxdb_lines(instance: &Arc) { + let lines = r" +monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 +monitor1,host=host2 memory=1027 1663840496400340001"; + let request = InfluxdbRequest { + precision: None, + db: "public".to_string(), + lines: lines.to_string(), + }; + instance.exec(&request).await.unwrap(); + + let mut output = instance + .do_query( + "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts", + QueryContext::arc(), + ) + .await; + let output = output.remove(0).unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + recordbatches.pretty_print().unwrap(), + "\ ++-------------------------+-------+------+--------+ +| ts | host | cpu | memory | ++-------------------------+-------+------+--------+ +| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024 | +| 2022-09-22T09:54:56.400 | host2 | | 1027 | ++-------------------------+-------+------+--------+" + ); } } diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 2545b2c4e6..32841365d6 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -14,9 +14,9 @@ use async_trait::async_trait; use common_error::prelude::BoxedError; +use servers::error as server_error; use servers::opentsdb::codec::DataPoint; use servers::query_handler::OpentsdbProtocolHandler; -use servers::{error as server_error, Mode}; use snafu::prelude::*; use crate::instance::Instance; @@ -24,30 +24,8 @@ use crate::instance::Instance; #[async_trait] impl OpentsdbProtocolHandler for Instance { async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> { - // TODO(LFC): Insert metrics in batch, then make OpentsdbLineProtocolHandler::exec received multiple data points, when - // metric table and tags can be created upon insertion. - match self.mode { - Mode::Standalone => { - self.insert_opentsdb_metric(data_point).await?; - } - Mode::Distributed => { - self.dist_insert(vec![data_point.as_grpc_insert()]) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteInsertSnafu { - msg: "execute insert failed", - })?; - } - } - - Ok(()) - } -} - -impl Instance { - async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> server_error::Result<()> { - let insert_expr = data_point.as_grpc_insert(); - self.handle_insert(insert_expr) + let request = data_point.as_grpc_insert(); + self.handle_insert(request) .await .map_err(BoxedError::new) .with_context(|_| server_error::ExecuteQuerySnafu { @@ -71,29 +49,22 @@ mod tests { use crate::tests; #[tokio::test(flavor = "multi_thread")] - async fn test_exec() { - let standalone = tests::create_standalone_instance("test_exec").await; - let instance = standalone.instance; - instance - .exec( - &DataPoint::try_create( - "put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0", - ) - .unwrap(), - ) - .await - .unwrap(); - instance - .exec(&DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap()) - .await - .unwrap(); + async fn test_standalone_exec() { + let standalone = tests::create_standalone_instance("test_standalone_exec").await; + let instance = &standalone.instance; + + test_exec(instance).await; } #[tokio::test(flavor = "multi_thread")] - async fn test_insert_opentsdb_metric() { - let standalone = tests::create_standalone_instance("test_insert_opentsdb_metric").await; - let instance = standalone.instance; + async fn test_distributed_exec() { + let distributed = tests::create_distributed_instance("test_distributed_exec").await; + let instance = &distributed.frontend; + test_exec(instance).await; + } + + async fn test_exec(instance: &Arc) { let data_point1 = DataPoint::new( "my_metric_1".to_string(), 1000, @@ -104,7 +75,7 @@ mod tests { ], ); // should create new table "my_metric_1" directly - let result = instance.insert_opentsdb_metric(&data_point1).await; + let result = instance.exec(&data_point1).await; assert!(result.is_ok()); let data_point2 = DataPoint::new( @@ -117,12 +88,12 @@ mod tests { ], ); // should create new column "tagk3" directly - let result = instance.insert_opentsdb_metric(&data_point2).await; + let result = instance.exec(&data_point2).await; assert!(result.is_ok()); let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); // should handle null tags properly - let result = instance.insert_opentsdb_metric(&data_point3).await; + let result = instance.exec(&data_point3).await; assert!(result.is_ok()); let output = instance diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index dc4cc71e62..ab8ddb9058 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -23,8 +23,7 @@ use common_telemetry::logging; use prost::Message; use servers::error::{self, Result as ServerResult}; use servers::prometheus::{self, Metrics}; -use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; -use servers::Mode; +use servers::query_handler::{GrpcQueryHandler, PrometheusProtocolHandler, PrometheusResponse}; use snafu::{OptionExt, ResultExt}; use crate::instance::Instance; @@ -89,7 +88,6 @@ impl Instance { })), }; let object_result = self - .grpc_query_handler .do_query(query) .await? .try_into() @@ -106,24 +104,12 @@ impl Instance { impl PrometheusProtocolHandler for Instance { async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> { let requests = prometheus::to_grpc_insert_requests(database, request.clone())?; - match self.mode { - Mode::Standalone => { - self.handle_inserts(requests) - .await - .map_err(BoxedError::new) - .with_context(|_| error::ExecuteInsertSnafu { - msg: format!("{request:?}"), - })?; - } - Mode::Distributed => { - self.dist_insert(requests) - .await - .map_err(BoxedError::new) - .with_context(|_| error::ExecuteInsertSnafu { - msg: format!("{request:?}"), - })?; - } - } + self.handle_inserts(requests) + .await + .map_err(BoxedError::new) + .with_context(|_| error::ExecuteInsertSnafu { + msg: format!("{request:?}"), + })?; Ok(()) } @@ -167,6 +153,8 @@ impl PrometheusProtocolHandler for Instance { #[cfg(test)] mod tests { + use std::sync::Arc; + use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, LabelMatcher, Sample}; use servers::query_handler::SqlQueryHandler; @@ -176,11 +164,24 @@ mod tests { use crate::tests; #[tokio::test(flavor = "multi_thread")] - async fn test_prometheus_remote_write_and_read() { + async fn test_standalone_prometheus_remote_rw() { let standalone = - tests::create_standalone_instance("test_prometheus_remote_write_and_read").await; - let instance = standalone.instance; + tests::create_standalone_instance("test_standalone_prometheus_remote_rw").await; + let instance = &standalone.instance; + test_prometheus_remote_rw(instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_prometheus_remote_rw() { + let distributed = + tests::create_distributed_instance("test_distributed_prometheus_remote_rw").await; + let instance = &distributed.frontend; + + test_prometheus_remote_rw(instance).await; + } + + async fn test_prometheus_remote_rw(instance: &Arc) { let write_request = WriteRequest { timeseries: prometheus::mock_timeseries(), ..Default::default() @@ -188,12 +189,15 @@ mod tests { let db = "prometheus"; - assert!(instance - .do_query("CREATE DATABASE prometheus", QueryContext::arc()) - .await - .get(0) - .unwrap() - .is_ok()); + assert!(SqlQueryHandler::do_query( + instance.as_ref(), + "CREATE DATABASE prometheus", + QueryContext::arc() + ) + .await + .get(0) + .unwrap() + .is_ok()); instance.write(db, write_request).await.unwrap(); diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index d90a902894..3cb9de060e 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -18,14 +18,13 @@ use api::v1::InsertRequest as GrpcInsertRequest; use common_grpc::writer::{LinesWriter, Precision}; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; -use table::requests::InsertRequest; use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu}; -use crate::line_writer::LineWriter; pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond; +#[derive(Debug)] pub struct InfluxdbRequest { pub precision: Option, pub db: String, @@ -34,58 +33,6 @@ pub struct InfluxdbRequest { type TableName = String; -impl TryFrom<&InfluxdbRequest> for Vec { - type Error = Error; - - fn try_from(value: &InfluxdbRequest) -> Result { - let lines = parse_lines(&value.lines) - .collect::>>() - .context(InfluxdbLineProtocolSnafu)?; - let line_len = lines.len(); - let mut writers: HashMap = HashMap::new(); - let db = &value.db; - - for line in lines { - let table_name = line.series.measurement; - let writer = writers - .entry(table_name.to_string()) - .or_insert_with(|| LineWriter::with_lines(db, table_name, line_len)); - - let tags = line.series.tag_set; - if let Some(tags) = tags { - for (k, v) in tags { - writer.write_tag(k.as_str(), v.as_str()); - } - } - - let fields = line.field_set; - for (k, v) in fields { - let column_name = k.as_str(); - match v { - FieldValue::I64(value) => writer.write_i64(column_name, value), - FieldValue::U64(value) => writer.write_u64(column_name, value), - FieldValue::F64(value) => writer.write_f64(column_name, value), - FieldValue::String(value) => writer.write_string(column_name, value.as_str()), - FieldValue::Boolean(value) => writer.write_bool(column_name, value), - } - } - - if let Some(timestamp) = line.timestamp { - let precision = if let Some(val) = &value.precision { - *val - } else { - DEFAULT_TIME_PRECISION - }; - writer.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision)); - } - - writer.commit(); - } - Ok(writers.into_values().map(|x| x.finish()).collect()) - } -} - -// TODO(fys): will remove in the future. impl TryFrom<&InfluxdbRequest> for Vec { type Error = Error; @@ -177,16 +124,9 @@ impl TryFrom<&InfluxdbRequest> for Vec { #[cfg(test)] mod tests { - use std::sync::Arc; - use api::v1::column::{SemanticType, Values}; use api::v1::{Column, ColumnDataType}; use common_base::BitVec; - use common_time::timestamp::TimeUnit; - use common_time::Timestamp; - use datatypes::value::Value; - use datatypes::vectors::Vector; - use table::requests::InsertRequest; use super::*; use crate::influxdb::InfluxdbRequest; @@ -197,32 +137,6 @@ mod tests { monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 monitor1,host=host2 memory=1027 1663840496400340001 monitor2,host=host3 cpu=66.5 1663840496100023102 -monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; - - let influxdb_req = &InfluxdbRequest { - db: "influxdb".to_string(), - precision: None, - lines: lines.to_string(), - }; - let insert_reqs: Vec = influxdb_req.try_into().unwrap(); - - for insert_req in insert_reqs { - assert_eq!("influxdb", insert_req.schema_name); - match &insert_req.table_name[..] { - "monitor1" => assert_table_1(&insert_req), - "monitor2" => assert_table_2(&insert_req), - _ => panic!(), - } - } - } - - // TODO(fys): will remove in the future. - #[test] - fn test_convert_influxdb_lines_1() { - let lines = r" -monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 -monitor1,host=host2 memory=1027 1663840496400340001 -monitor2,host=host3 cpu=66.5 1663840496100023102 monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; let influxdb_req = &InfluxdbRequest { @@ -244,77 +158,6 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; } } - fn assert_table_1(insert_req: &InsertRequest) { - let table_name = &insert_req.table_name; - assert_eq!("monitor1", table_name); - - let columns = &insert_req.columns_values; - - let host = columns.get("host").unwrap(); - let expected: Vec = vec!["host1".into(), "host2".into()]; - assert_vector(&expected, host); - - let cpu = columns.get("cpu").unwrap(); - let expected: Vec = vec![66.6.into(), Value::Null]; - assert_vector(&expected, cpu); - - let memory = columns.get("memory").unwrap(); - let expected: Vec = vec![1024.0.into(), 1027.0.into()]; - assert_vector(&expected, memory); - - let ts = columns.get("ts").unwrap(); - let expected: Vec = vec![ - datatypes::prelude::Value::Timestamp(Timestamp::new( - 1663840496100, - TimeUnit::Millisecond, - )), - datatypes::prelude::Value::Timestamp(Timestamp::new( - 1663840496400, - TimeUnit::Millisecond, - )), - ]; - assert_vector(&expected, ts); - } - - fn assert_table_2(insert_req: &InsertRequest) { - let table_name = &insert_req.table_name; - assert_eq!("monitor2", table_name); - - let columns = &insert_req.columns_values; - - let host = columns.get("host").unwrap(); - let expected: Vec = vec!["host3".into(), "host4".into()]; - assert_vector(&expected, host); - - let cpu = columns.get("cpu").unwrap(); - let expected: Vec = vec![66.5.into(), 66.3.into()]; - assert_vector(&expected, cpu); - - let memory = columns.get("memory").unwrap(); - let expected: Vec = vec![Value::Null, 1029.0.into()]; - assert_vector(&expected, memory); - - let ts = columns.get("ts").unwrap(); - let expected: Vec = vec![ - datatypes::prelude::Value::Timestamp(Timestamp::new( - 1663840496100, - TimeUnit::Millisecond, - )), - datatypes::prelude::Value::Timestamp(Timestamp::new( - 1663840496400, - TimeUnit::Millisecond, - )), - ]; - assert_vector(&expected, ts); - } - - fn assert_vector(expected: &[Value], vector: &Arc) { - for (idx, expected) in expected.iter().enumerate() { - let val = vector.get(idx); - assert_eq!(*expected, val); - } - } - fn assert_monitor_1(columns: &[Column]) { assert_eq!(4, columns.len()); verify_column( diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index b7191a28c2..42fc491f93 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -15,11 +15,8 @@ use api::v1::column::SemanticType; use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; use common_catalog::consts::DEFAULT_SCHEMA_NAME; -use common_grpc::writer::Precision; -use table::requests::InsertRequest; use crate::error::{self, Result}; -use crate::line_writer::LineWriter; pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "greptime_value"; @@ -128,24 +125,6 @@ impl DataPoint { self.value } - pub fn as_insert_request(&self) -> InsertRequest { - let mut line_writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, self.metric.clone(), 1); - line_writer.write_ts( - OPENTSDB_TIMESTAMP_COLUMN_NAME, - (self.ts_millis(), Precision::Millisecond), - ); - - line_writer.write_f64(OPENTSDB_VALUE_COLUMN_NAME, self.value); - - for (tagk, tagv) in self.tags.iter() { - line_writer.write_tag(tagk, tagv); - } - line_writer.commit(); - line_writer.finish() - } - - // TODO(LFC): opentsdb and influxdb insertions should go through the Table trait directly. - // Currently: line protocol -> grpc request -> grpc interface -> table trait pub fn as_grpc_insert(&self) -> GrpcInsertRequest { let schema_name = DEFAULT_SCHEMA_NAME.to_string(); let mut columns = Vec::with_capacity(2 + self.tags.len()); @@ -211,13 +190,6 @@ impl DataPoint { #[cfg(test)] mod test { - use std::sync::Arc; - - use common_time::timestamp::TimeUnit; - use common_time::Timestamp; - use datatypes::value::Value; - use datatypes::vectors::Vector; - use super::*; #[test] @@ -277,43 +249,6 @@ mod test { ); } - #[test] - fn test_as_insert_request() { - let data_point = DataPoint { - metric: "my_metric_1".to_string(), - ts_millis: 1000, - value: 1.0, - tags: vec![ - ("tagk1".to_string(), "tagv1".to_string()), - ("tagk2".to_string(), "tagv2".to_string()), - ], - }; - let insert_request = data_point.as_insert_request(); - assert_eq!("my_metric_1", insert_request.table_name); - let columns = insert_request.columns_values; - assert_eq!(4, columns.len()); - let ts = columns.get(OPENTSDB_TIMESTAMP_COLUMN_NAME).unwrap(); - let expected = vec![datatypes::prelude::Value::Timestamp(Timestamp::new( - 1000, - TimeUnit::Millisecond, - ))]; - assert_vector(&expected, ts); - let val = columns.get(OPENTSDB_VALUE_COLUMN_NAME).unwrap(); - assert_vector(&[1.0.into()], val); - let tagk1 = columns.get("tagk1").unwrap(); - assert_vector(&["tagv1".into()], tagk1); - let tagk2 = columns.get("tagk2").unwrap(); - assert_vector(&["tagv2".into()], tagk2); - } - - fn assert_vector(expected: &[Value], vector: &Arc) { - for (idx, expected) in expected.iter().enumerate() { - let val = vector.get(idx); - assert_eq!(*expected, val); - } - } - - // TODO(fys): will remove in the future. #[test] fn test_as_grpc_insert() { let data_point = DataPoint { diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 0ee66c3bd8..3cd0b13c3d 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -21,17 +21,14 @@ use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; use api::v1::column::SemanticType; use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; -use common_grpc::writer::Precision::Millisecond; use common_recordbatch::{RecordBatch, RecordBatches}; use common_time::timestamp::TimeUnit; use datatypes::prelude::{ConcreteDataType, Value}; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{ensure, OptionExt, ResultExt}; use snap::raw::{Decoder, Encoder}; -use table::requests::InsertRequest; use crate::error::{self, Result}; -use crate::line_writer::LineWriter; const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; const VALUE_COLUMN_NAME: &str = "greptime_value"; @@ -287,58 +284,6 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result> { - let timeseries = std::mem::take(&mut request.timeseries); - - timeseries - .into_iter() - .map(|timeseries| timeseries_to_insert_request(db, timeseries)) - .collect() -} - -fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result { - // TODO(dennis): save exemplars into a column - let labels = std::mem::take(&mut timeseries.labels); - let samples = std::mem::take(&mut timeseries.samples); - - let mut table_name = None; - for label in &labels { - // The metric name is a special label - if label.name == METRIC_NAME_LABEL { - table_name = Some(&label.value); - } - } - let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu { - msg: "missing '__name__' label in timeseries", - })?; - - let row_count = samples.len(); - let mut line_writer = LineWriter::with_lines(db, table_name, row_count); - - for sample in samples { - let ts_millis = sample.timestamp; - let val = sample.value; - - line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, Millisecond)); - line_writer.write_f64(VALUE_COLUMN_NAME, val); - - labels - .iter() - .filter(|label| label.name != METRIC_NAME_LABEL) - .for_each(|label| { - line_writer.write_tag(&label.name, &label.value); - }); - - line_writer.commit(); - } - Ok(line_writer.finish()) -} - -// TODO(fys): it will remove in the future. pub fn to_grpc_insert_requests( database: &str, mut request: WriteRequest, @@ -351,7 +296,6 @@ pub fn to_grpc_insert_requests( .collect() } -// TODO(fys): it will remove in the future. fn to_grpc_insert_request(database: &str, mut timeseries: TimeSeries) -> Result { let schema_name = database.to_string(); @@ -506,11 +450,8 @@ mod tests { use std::sync::Arc; use api::prometheus::remote::LabelMatcher; - use common_time::timestamp::TimeUnit; - use common_time::Timestamp; use datatypes::schema::{ColumnSchema, Schema}; - use datatypes::value::Value; - use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector, Vector}; + use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; use super::*; @@ -570,94 +511,6 @@ mod tests { assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql); } - #[test] - fn test_write_request_to_insert_reqs() { - let write_request = WriteRequest { - timeseries: mock_timeseries(), - ..Default::default() - }; - - let reqs = write_request_to_insert_reqs("public", write_request).unwrap(); - - assert_eq!(3, reqs.len()); - - let req1 = reqs.get(0).unwrap(); - assert_eq!("public", req1.schema_name); - assert_eq!("metric1", req1.table_name); - - let columns = &req1.columns_values; - let job = columns.get("job").unwrap(); - let expected: Vec = vec!["spark".into(), "spark".into()]; - assert_vector(&expected, job); - - let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap(); - let expected: Vec = vec![ - datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)), - datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)), - ]; - assert_vector(&expected, ts); - - let val = columns.get(VALUE_COLUMN_NAME).unwrap(); - let expected: Vec = vec![1.0_f64.into(), 2.0_f64.into()]; - assert_vector(&expected, val); - - let req2 = reqs.get(1).unwrap(); - assert_eq!("public", req2.schema_name); - assert_eq!("metric2", req2.table_name); - - let columns = &req2.columns_values; - let instance = columns.get("instance").unwrap(); - let expected: Vec = vec!["test_host1".into(), "test_host1".into()]; - assert_vector(&expected, instance); - - let idc = columns.get("idc").unwrap(); - let expected: Vec = vec!["z001".into(), "z001".into()]; - assert_vector(&expected, idc); - - let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap(); - let expected: Vec = vec![ - datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)), - datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)), - ]; - assert_vector(&expected, ts); - - let val = columns.get(VALUE_COLUMN_NAME).unwrap(); - let expected: Vec = vec![3.0_f64.into(), 4.0_f64.into()]; - assert_vector(&expected, val); - - let req3 = reqs.get(2).unwrap(); - assert_eq!("public", req3.schema_name); - assert_eq!("metric3", req3.table_name); - - let columns = &req3.columns_values; - let idc = columns.get("idc").unwrap(); - let expected: Vec = vec!["z002".into(), "z002".into(), "z002".into()]; - assert_vector(&expected, idc); - - let app = columns.get("app").unwrap(); - let expected: Vec = vec!["biz".into(), "biz".into(), "biz".into()]; - assert_vector(&expected, app); - - let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap(); - let expected: Vec = vec![ - datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)), - datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)), - datatypes::prelude::Value::Timestamp(Timestamp::new(3000, TimeUnit::Millisecond)), - ]; - assert_vector(&expected, ts); - - let val = columns.get(VALUE_COLUMN_NAME).unwrap(); - let expected: Vec = vec![5.0_f64.into(), 6.0_f64.into(), 7.0_f64.into()]; - assert_vector(&expected, val); - } - - fn assert_vector(expected: &[Value], vector: &Arc) { - for (idx, expected) in expected.iter().enumerate() { - let val = vector.get(idx); - assert_eq!(*expected, val); - } - } - #[test] fn test_write_request_to_insert_exprs() { let write_request = WriteRequest {