refactor: simplify how Frontend instance handles other protocols (#831)

* refactor: make InfluxDB, OpenTSDB and Prometheus reads/writes go through the gRPC interface, to unify and simplify the Frontend instance in both standalone and distributed mode
This commit is contained in:
LFC
2023-01-06 12:19:38 +08:00
committed by GitHub
parent ca7ed67dc5
commit d1730a9577
9 changed files with 133 additions and 621 deletions

View File

@@ -151,10 +151,21 @@ pub(crate) fn scalar_value_as_literal_type(v: &ScalarValue) -> Result<LiteralTyp
ScalarValue::Int64(Some(v)) => LiteralType::I64(*v),
ScalarValue::LargeUtf8(Some(v)) => LiteralType::String(v.clone()),
ScalarValue::LargeBinary(Some(v)) => LiteralType::Binary(v.clone()),
ScalarValue::TimestampSecond(Some(seconds), _) => {
LiteralType::Timestamp(*seconds * 1_000_000)
}
ScalarValue::TimestampMillisecond(Some(millis), _) => {
LiteralType::Timestamp(*millis * 1000)
}
ScalarValue::TimestampMicrosecond(Some(micros), _) => LiteralType::Timestamp(*micros),
ScalarValue::TimestampNanosecond(Some(nanos), _) => {
LiteralType::Timestamp(*nanos / 1000)
}
ScalarValue::Utf8(Some(s)) => LiteralType::String(s.clone()),
// TODO(LFC): Implement other conversions: ScalarValue => LiteralType
_ => {
return error::UnsupportedExprSnafu {
name: format!("{v:?}"),
name: format!("ScalarValue: {v:?}"),
}
.fail()
}
@@ -191,6 +202,7 @@ pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result<ScalarValue
LiteralType::Fp64(v) => ScalarValue::Float64(Some(v)),
LiteralType::String(v) => ScalarValue::LargeUtf8(Some(v)),
LiteralType::Binary(v) => ScalarValue::LargeBinary(Some(v)),
LiteralType::Timestamp(v) => ScalarValue::TimestampMicrosecond(Some(v), None),
// TODO(LFC): Implement other conversions: LiteralType => ScalarValue
_ => {
return error::UnsupportedSubstraitTypeSnafu {

View File

@@ -225,31 +225,12 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to bump table id when creating table, source: {}", source))]
BumpTableId {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("Failed to create database: {}, source: {}", name, source))]
CreateDatabase {
name: String,
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to insert values to table, source: {}", source))]
Insert {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to create table on insertion, source: {}", source))]
CreateTableOnInsertion {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to build CreateExpr on insertion: {}", source))]
BuildCreateExprOnInsertion {
#[snafu(backtrace)]
@@ -271,12 +252,6 @@ pub enum Error {
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to deserialize insert batching: {}", source))]
InsertBatchToRequest {
#[snafu(backtrace)]
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to find catalog by name: {}", catalog_name))]
CatalogNotFound {
catalog_name: String,
@@ -295,15 +270,6 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Failed to get catalog manager"))]
CatalogManager { backtrace: Backtrace },
#[snafu(display("Failed to get full table name, source: {}", source))]
FullTableName {
#[snafu(backtrace)]
source: sql::error::Error,
},
#[snafu(display("Failed to find region routes for table {}", table_name))]
FindRegionRoutes {
table_name: String,
@@ -358,25 +324,12 @@ pub enum Error {
#[snafu(display("Cannot find primary key column by name: {}", msg))]
PrimaryKeyNotFound { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to execute sql: {}, source: {}", sql, source))]
ExecuteSql {
sql: String,
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to execute statement, source: {}", source))]
ExecuteStatement {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to do vector computation, source: {}", source))]
VectorComputation {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to build DataFusion logical plan, source: {}", source))]
BuildDfLogicalPlan {
source: datafusion_common::DataFusionError,
@@ -466,7 +419,6 @@ impl ErrorExt for Error {
| Error::InvalidInsertRequest { .. }
| Error::FindPartitionColumn { .. }
| Error::ColumnValuesNumberMismatch { .. }
| Error::CatalogManager { .. }
| Error::RegionKeysSize { .. }
| Error::InvalidFlightTicket { .. } => StatusCode::InvalidArguments,
@@ -478,13 +430,10 @@ impl ErrorExt for Error {
Error::ParseSql { source } => source.status_code(),
Error::FullTableName { source, .. } => source.status_code(),
Error::Table { source } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::ConvertScalarValue { source, .. }
| Error::VectorComputation { source }
| Error::ConvertArrowSchema { source } => source.status_code(),
Error::InvalidObjectResult { source, .. } | Error::RequestDatanode { source } => {
@@ -522,19 +471,14 @@ impl ErrorExt for Error {
Error::StartMetaClient { source } | Error::RequestMeta { source } => {
source.status_code()
}
Error::BumpTableId { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::InvalidArguments,
Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,
Error::CreateDatabase { source, .. }
| Error::CreateTableOnInsertion { source, .. }
| Error::Insert { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(),
Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
Error::ToTableInsertRequest { source, .. } => source.status_code(),
Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
Error::ExecuteSql { source, .. } => source.status_code(),
Error::ExecuteStatement { source, .. } => source.status_code(),
Error::InsertBatchToRequest { source, .. } => source.status_code(),
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::AlterExprToRequest { source, .. } => source.status_code(),
Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable,

View File

@@ -30,7 +30,7 @@ use api::v1::{
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef};
use catalog::CatalogManagerRef;
use client::RpcOutput;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
@@ -42,13 +42,13 @@ use datanode::instance::InstanceRef as DnInstanceRef;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use servers::error as server_error;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::{
GrpcQueryHandler, GrpcQueryHandlerRef, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler,
SqlQueryHandlerRef,
};
use servers::{error as server_error, Mode};
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::dialect::GenericDialect;
@@ -93,10 +93,6 @@ pub struct Instance {
grpc_query_handler: GrpcQueryHandlerRef,
create_expr_factory: CreateExprFactoryRef,
// TODO(fys): it should be a trait that corresponds to two implementations:
// Standalone and Distributed, then the code behind it doesn't need to use so
// many match statements.
mode: Mode,
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
@@ -126,7 +122,6 @@ impl Instance {
catalog_manager,
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Distributed,
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
plugins: Default::default(),
@@ -168,7 +163,6 @@ impl Instance {
catalog_manager: dn_instance.catalog_manager().clone(),
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Standalone,
sql_handler: dn_instance.clone(),
grpc_query_handler: dn_instance.clone(),
plugins: Default::default(),
@@ -181,7 +175,6 @@ impl Instance {
catalog_manager: dist_instance.catalog_manager(),
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Distributed,
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
plugins: Default::default(),
@@ -349,22 +342,6 @@ impl Instance {
Ok(output.into())
}
fn get_catalog(&self, catalog_name: &str) -> Result<CatalogProviderRef> {
self.catalog_manager
.catalog(catalog_name)
.context(error::CatalogSnafu)?
.context(error::CatalogNotFoundSnafu { catalog_name })
}
fn get_schema(provider: CatalogProviderRef, schema_name: &str) -> Result<SchemaProviderRef> {
provider
.schema(schema_name)
.context(error::CatalogSnafu)?
.context(error::SchemaNotFoundSnafu {
schema_info: schema_name,
})
}
fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
ensure!(
self.catalog_manager

View File

@@ -12,115 +12,88 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::{Column, InsertRequest as GrpcInsertRequest};
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_grpc_expr::column_to_vector;
use servers::error as server_error;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::InfluxdbLineProtocolHandler;
use servers::{error as server_error, Mode};
use snafu::{OptionExt, ResultExt};
use table::requests::InsertRequest;
use snafu::ResultExt;
use crate::error;
use crate::error::{InsertBatchToRequestSnafu, Result};
use crate::instance::Instance;
#[async_trait]
impl InfluxdbLineProtocolHandler for Instance {
async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> {
match self.mode {
Mode::Standalone => {
self.handle_inserts(request.try_into()?)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
query: &request.lines,
})?;
}
Mode::Distributed => {
self.dist_insert(request.try_into()?)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
}
}
let requests = request.try_into()?;
self.handle_inserts(requests)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{request:?}"),
})?;
Ok(())
}
}
impl Instance {
pub(crate) async fn dist_insert(&self, inserts: Vec<GrpcInsertRequest>) -> Result<usize> {
let mut joins = Vec::with_capacity(inserts.len());
let catalog_name = DEFAULT_CATALOG_NAME;
#[cfg(test)]
mod test {
use std::sync::Arc;
for insert in inserts {
let self_clone = self.clone();
use common_query::Output;
use common_recordbatch::RecordBatches;
use servers::query_handler::SqlQueryHandler;
use session::context::QueryContext;
let schema_name = insert.schema_name.to_string();
let table_name = insert.table_name.to_string();
use super::*;
use crate::tests;
let columns = &insert.columns;
let row_count = insert.row_count;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_put_influxdb_lines() {
let standalone =
tests::create_standalone_instance("test_standalone_put_influxdb_lines").await;
let instance = &standalone.instance;
self.create_or_alter_table_on_demand(catalog_name, &schema_name, &table_name, columns)
.await?;
let request = Self::columns_to_request(
catalog_name,
&schema_name,
&table_name,
columns,
row_count,
)?;
// TODO(fys): need a separate runtime here
let self_clone = self_clone.clone();
let join = tokio::spawn(async move {
let catalog = self_clone.get_catalog(catalog_name)?;
let schema = Self::get_schema(catalog, &schema_name)?;
let table = schema
.table(&table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu { table_name })?;
table.insert(request).await.context(error::TableSnafu)
});
joins.push(join);
}
let mut affected = 0;
for join in joins {
affected += join.await.context(error::JoinTaskSnafu)??;
}
Ok(affected)
test_put_influxdb_lines(instance).await;
}
fn columns_to_request(
catalog_name: &str,
schema_name: &str,
table_name: &str,
columns: &[Column],
row_count: u32,
) -> Result<InsertRequest> {
let mut vectors = HashMap::with_capacity(columns.len());
for col in columns {
let vector = column_to_vector(col, row_count).context(InsertBatchToRequestSnafu)?;
vectors.insert(col.column_name.clone(), vector);
}
Ok(InsertRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
columns_values: vectors,
})
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_put_influxdb_lines() {
let instance =
tests::create_distributed_instance("test_distributed_put_influxdb_lines").await;
let instance = &instance.frontend;
test_put_influxdb_lines(instance).await;
}
// Shared test body: writes two InfluxDB line-protocol points for `monitor1`
// through `exec` on the given instance, then reads them back with SQL and
// compares the pretty-printed result against the expected table.
async fn test_put_influxdb_lines(instance: &Arc<Instance>) {
// Two points for the same measurement; the second one carries no `cpu` field.
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001";
// `precision` is left unset for this request.
let request = InfluxdbRequest {
precision: None,
db: "public".to_string(),
lines: lines.to_string(),
};
instance.exec(&request).await.unwrap();
// Read the rows back, ordered by timestamp so the comparison is stable.
let mut output = instance
.do_query(
"SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
QueryContext::arc(),
)
.await;
// Only the first statement result is inspected; it must be a record stream.
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
// The missing `cpu` value of the second point is rendered as an empty cell.
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+-------------------------+-------+------+--------+
| ts | host | cpu | memory |
+-------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024 |
| 2022-09-22T09:54:56.400 | host2 | | 1027 |
+-------------------------+-------+------+--------+"
);
}
}

View File

@@ -14,9 +14,9 @@
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::error as server_error;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::OpentsdbProtocolHandler;
use servers::{error as server_error, Mode};
use snafu::prelude::*;
use crate::instance::Instance;
@@ -24,30 +24,8 @@ use crate::instance::Instance;
#[async_trait]
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> {
// TODO(LFC): Insert metrics in batch, then make OpentsdbLineProtocolHandler::exec received multiple data points, when
// metric table and tags can be created upon insertion.
match self.mode {
Mode::Standalone => {
self.insert_opentsdb_metric(data_point).await?;
}
Mode::Distributed => {
self.dist_insert(vec![data_point.as_grpc_insert()])
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
}
}
Ok(())
}
}
impl Instance {
async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> server_error::Result<()> {
let insert_expr = data_point.as_grpc_insert();
self.handle_insert(insert_expr)
let request = data_point.as_grpc_insert();
self.handle_insert(request)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
@@ -71,29 +49,22 @@ mod tests {
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_exec() {
let standalone = tests::create_standalone_instance("test_exec").await;
let instance = standalone.instance;
instance
.exec(
&DataPoint::try_create(
"put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0",
)
.unwrap(),
)
.await
.unwrap();
instance
.exec(&DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap())
.await
.unwrap();
async fn test_standalone_exec() {
let standalone = tests::create_standalone_instance("test_standalone_exec").await;
let instance = &standalone.instance;
test_exec(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_opentsdb_metric() {
let standalone = tests::create_standalone_instance("test_insert_opentsdb_metric").await;
let instance = standalone.instance;
async fn test_distributed_exec() {
let distributed = tests::create_distributed_instance("test_distributed_exec").await;
let instance = &distributed.frontend;
test_exec(instance).await;
}
async fn test_exec(instance: &Arc<Instance>) {
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
@@ -104,7 +75,7 @@ mod tests {
],
);
// should create new table "my_metric_1" directly
let result = instance.insert_opentsdb_metric(&data_point1).await;
let result = instance.exec(&data_point1).await;
assert!(result.is_ok());
let data_point2 = DataPoint::new(
@@ -117,12 +88,12 @@ mod tests {
],
);
// should create new column "tagk3" directly
let result = instance.insert_opentsdb_metric(&data_point2).await;
let result = instance.exec(&data_point2).await;
assert!(result.is_ok());
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
let result = instance.insert_opentsdb_metric(&data_point3).await;
let result = instance.exec(&data_point3).await;
assert!(result.is_ok());
let output = instance

View File

@@ -23,8 +23,7 @@ use common_telemetry::logging;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::Mode;
use servers::query_handler::{GrpcQueryHandler, PrometheusProtocolHandler, PrometheusResponse};
use snafu::{OptionExt, ResultExt};
use crate::instance::Instance;
@@ -89,7 +88,6 @@ impl Instance {
})),
};
let object_result = self
.grpc_query_handler
.do_query(query)
.await?
.try_into()
@@ -106,24 +104,12 @@ impl Instance {
impl PrometheusProtocolHandler for Instance {
async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> {
let requests = prometheus::to_grpc_insert_requests(database, request.clone())?;
match self.mode {
Mode::Standalone => {
self.handle_inserts(requests)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
msg: format!("{request:?}"),
})?;
}
Mode::Distributed => {
self.dist_insert(requests)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
msg: format!("{request:?}"),
})?;
}
}
self.handle_inserts(requests)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
msg: format!("{request:?}"),
})?;
Ok(())
}
@@ -167,6 +153,8 @@ impl PrometheusProtocolHandler for Instance {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, LabelMatcher, Sample};
use servers::query_handler::SqlQueryHandler;
@@ -176,11 +164,24 @@ mod tests {
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_prometheus_remote_write_and_read() {
async fn test_standalone_prometheus_remote_rw() {
let standalone =
tests::create_standalone_instance("test_prometheus_remote_write_and_read").await;
let instance = standalone.instance;
tests::create_standalone_instance("test_standalone_prometheus_remote_rw").await;
let instance = &standalone.instance;
test_prometheus_remote_rw(instance).await;
}
/// Runs the shared Prometheus remote read/write scenario against the frontend
/// of a freshly created distributed test instance.
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_prometheus_remote_rw() {
    let cluster =
        tests::create_distributed_instance("test_distributed_prometheus_remote_rw").await;
    test_prometheus_remote_rw(&cluster.frontend).await;
}
async fn test_prometheus_remote_rw(instance: &Arc<Instance>) {
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),
..Default::default()
@@ -188,12 +189,15 @@ mod tests {
let db = "prometheus";
assert!(instance
.do_query("CREATE DATABASE prometheus", QueryContext::arc())
.await
.get(0)
.unwrap()
.is_ok());
assert!(SqlQueryHandler::do_query(
instance.as_ref(),
"CREATE DATABASE prometheus",
QueryContext::arc()
)
.await
.get(0)
.unwrap()
.is_ok());
instance.write(db, write_request).await.unwrap();

View File

@@ -18,14 +18,13 @@ use api::v1::InsertRequest as GrpcInsertRequest;
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
use table::requests::InsertRequest;
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu};
use crate::line_writer::LineWriter;
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
#[derive(Debug)]
pub struct InfluxdbRequest {
pub precision: Option<Precision>,
pub db: String,
@@ -34,58 +33,6 @@ pub struct InfluxdbRequest {
type TableName = String;
/// Converts an InfluxDB line-protocol request into one [`InsertRequest`] per
/// measurement (table) found in the payload.
impl TryFrom<&InfluxdbRequest> for Vec<InsertRequest> {
    type Error = Error;

    /// Parses every line of `value.lines`, groups the points by measurement and
    /// feeds tags, fields and the optional timestamp into a per-table
    /// [`LineWriter`].
    ///
    /// # Errors
    /// Returns the line-protocol parse error wrapped via
    /// `InfluxdbLineProtocolSnafu` if any line fails to parse.
    fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
        let parsed = parse_lines(&value.lines)
            .collect::<influxdb_line_protocol::Result<Vec<_>>>()
            .context(InfluxdbLineProtocolSnafu)?;
        let total_lines = parsed.len();
        let database = &value.db;
        // The request-level precision applies to every timestamp in the payload.
        let precision = value.precision.unwrap_or(DEFAULT_TIME_PRECISION);

        let mut writers_by_table: HashMap<TableName, LineWriter> = HashMap::new();
        for point in parsed {
            let measurement = point.series.measurement;
            // Each writer is created with the total line count of the request.
            let writer = writers_by_table
                .entry(measurement.to_string())
                .or_insert_with(|| LineWriter::with_lines(database, measurement, total_lines));

            for (tag_key, tag_value) in point.series.tag_set.into_iter().flatten() {
                writer.write_tag(tag_key.as_str(), tag_value.as_str());
            }

            for (field_key, field_value) in point.field_set {
                let column = field_key.as_str();
                match field_value {
                    FieldValue::I64(n) => writer.write_i64(column, n),
                    FieldValue::U64(n) => writer.write_u64(column, n),
                    FieldValue::F64(n) => writer.write_f64(column, n),
                    FieldValue::String(s) => writer.write_string(column, s.as_str()),
                    FieldValue::Boolean(b) => writer.write_bool(column, b),
                }
            }

            if let Some(ts) = point.timestamp {
                writer.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (ts, precision));
            }
            writer.commit();
        }

        Ok(writers_by_table
            .into_values()
            .map(|writer| writer.finish())
            .collect())
    }
}
// TODO(fys): this will be removed in the future.
impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
type Error = Error;
@@ -177,16 +124,9 @@ impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ColumnDataType};
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::value::Value;
use datatypes::vectors::Vector;
use table::requests::InsertRequest;
use super::*;
use crate::influxdb::InfluxdbRequest;
@@ -197,32 +137,6 @@ mod tests {
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
db: "influxdb".to_string(),
precision: None,
lines: lines.to_string(),
};
let insert_reqs: Vec<InsertRequest> = influxdb_req.try_into().unwrap();
for insert_req in insert_reqs {
assert_eq!("influxdb", insert_req.schema_name);
match &insert_req.table_name[..] {
"monitor1" => assert_table_1(&insert_req),
"monitor2" => assert_table_2(&insert_req),
_ => panic!(),
}
}
}
// TODO(fys): will remove in the future.
#[test]
fn test_convert_influxdb_lines_1() {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
@@ -244,77 +158,6 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
}
}
/// Verifies the insert request built for measurement `monitor1`: the table
/// name plus the `host`, `cpu`, `memory` and `ts` column contents.
fn assert_table_1(insert_req: &InsertRequest) {
    assert_eq!("monitor1", &insert_req.table_name);

    let columns = &insert_req.columns_values;

    let want_host: [Value; 2] = ["host1".into(), "host2".into()];
    assert_vector(&want_host, columns.get("host").unwrap());

    // The second point has no `cpu` field, so a null is expected there.
    let want_cpu: [Value; 2] = [66.6.into(), Value::Null];
    assert_vector(&want_cpu, columns.get("cpu").unwrap());

    let want_memory: [Value; 2] = [1024.0.into(), 1027.0.into()];
    assert_vector(&want_memory, columns.get("memory").unwrap());

    let want_ts: [Value; 2] = [
        datatypes::prelude::Value::Timestamp(Timestamp::new(
            1663840496100,
            TimeUnit::Millisecond,
        )),
        datatypes::prelude::Value::Timestamp(Timestamp::new(
            1663840496400,
            TimeUnit::Millisecond,
        )),
    ];
    assert_vector(&want_ts, columns.get("ts").unwrap());
}
/// Verifies the insert request built for measurement `monitor2`: the table
/// name plus the `host`, `cpu`, `memory` and `ts` column contents.
fn assert_table_2(insert_req: &InsertRequest) {
    assert_eq!("monitor2", &insert_req.table_name);

    let columns = &insert_req.columns_values;

    let want_host: [Value; 2] = ["host3".into(), "host4".into()];
    assert_vector(&want_host, columns.get("host").unwrap());

    let want_cpu: [Value; 2] = [66.5.into(), 66.3.into()];
    assert_vector(&want_cpu, columns.get("cpu").unwrap());

    // The first point has no `memory` field, so a null is expected there.
    let want_memory: [Value; 2] = [Value::Null, 1029.0.into()];
    assert_vector(&want_memory, columns.get("memory").unwrap());

    let want_ts: [Value; 2] = [
        datatypes::prelude::Value::Timestamp(Timestamp::new(
            1663840496100,
            TimeUnit::Millisecond,
        )),
        datatypes::prelude::Value::Timestamp(Timestamp::new(
            1663840496400,
            TimeUnit::Millisecond,
        )),
    ];
    assert_vector(&want_ts, columns.get("ts").unwrap());
}
/// Asserts that `vector` holds `expected[i]` at every index `i` of `expected`.
///
/// Only the first `expected.len()` positions are compared.
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
    expected.iter().enumerate().for_each(|(position, want)| {
        let actual = vector.get(position);
        assert_eq!(*want, actual);
    });
}
fn assert_monitor_1(columns: &[Column]) {
assert_eq!(4, columns.len());
verify_column(

View File

@@ -15,11 +15,8 @@
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision;
use table::requests::InsertRequest;
use crate::error::{self, Result};
use crate::line_writer::LineWriter;
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "greptime_value";
@@ -128,24 +125,6 @@ impl DataPoint {
self.value
}
/// Builds a single-row [`InsertRequest`] for this data point.
///
/// The writer is created for `DEFAULT_SCHEMA_NAME` with the metric name as
/// the second argument; the row carries the timestamp (millisecond
/// precision), the value, and one tag column per entry in `self.tags`.
pub fn as_insert_request(&self) -> InsertRequest {
    let mut writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, self.metric.clone(), 1);

    let timestamp = (self.ts_millis(), Precision::Millisecond);
    writer.write_ts(OPENTSDB_TIMESTAMP_COLUMN_NAME, timestamp);
    writer.write_f64(OPENTSDB_VALUE_COLUMN_NAME, self.value);

    self.tags.iter().for_each(|(key, value)| {
        writer.write_tag(key, value);
    });

    writer.commit();
    writer.finish()
}
// TODO(LFC): opentsdb and influxdb insertions should go through the Table trait directly.
// Currently: line protocol -> grpc request -> grpc interface -> table trait
pub fn as_grpc_insert(&self) -> GrpcInsertRequest {
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let mut columns = Vec::with_capacity(2 + self.tags.len());
@@ -211,13 +190,6 @@ impl DataPoint {
#[cfg(test)]
mod test {
use std::sync::Arc;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::value::Value;
use datatypes::vectors::Vector;
use super::*;
#[test]
@@ -277,43 +249,6 @@ mod test {
);
}
// Checks that `DataPoint::as_insert_request` yields a request whose table is
// named after the metric and which holds four columns: timestamp, value and
// one column per tag.
#[test]
fn test_as_insert_request() {
let data_point = DataPoint {
metric: "my_metric_1".to_string(),
ts_millis: 1000,
value: 1.0,
tags: vec![
("tagk1".to_string(), "tagv1".to_string()),
("tagk2".to_string(), "tagv2".to_string()),
],
};
let insert_request = data_point.as_insert_request();
assert_eq!("my_metric_1", insert_request.table_name);
let columns = insert_request.columns_values;
// timestamp + value + two tags
assert_eq!(4, columns.len());
let ts = columns.get(OPENTSDB_TIMESTAMP_COLUMN_NAME).unwrap();
let expected = vec![datatypes::prelude::Value::Timestamp(Timestamp::new(
1000,
TimeUnit::Millisecond,
))];
assert_vector(&expected, ts);
let val = columns.get(OPENTSDB_VALUE_COLUMN_NAME).unwrap();
assert_vector(&[1.0.into()], val);
let tagk1 = columns.get("tagk1").unwrap();
assert_vector(&["tagv1".into()], tagk1);
let tagk2 = columns.get("tagk2").unwrap();
assert_vector(&["tagv2".into()], tagk2);
}
/// Asserts that `vector` holds `expected[i]` at every index `i` of `expected`.
///
/// Only the first `expected.len()` positions are compared.
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
    expected.iter().enumerate().for_each(|(position, want)| {
        let actual = vector.get(position);
        assert_eq!(*want, actual);
    });
}
// TODO(fys): will remove in the future.
#[test]
fn test_as_grpc_insert() {
let data_point = DataPoint {

View File

@@ -21,17 +21,14 @@ use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use common_grpc::writer::Precision::Millisecond;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{ensure, OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
use table::requests::InsertRequest;
use crate::error::{self, Result};
use crate::line_writer::LineWriter;
const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
const VALUE_COLUMN_NAME: &str = "greptime_value";
@@ -287,58 +284,6 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
Ok(timeseries_map.into_values().collect())
}
/// Converts a Prometheus remote write request into one [`InsertRequest`] per
/// time series, targeting database `db`.
///
/// # Errors
/// Stops at, and returns, the first error produced while converting a series.
pub fn write_request_to_insert_reqs(
    db: &str,
    mut request: WriteRequest,
) -> Result<Vec<InsertRequest>> {
    // Move the series out of the request instead of cloning them.
    let all_series = std::mem::take(&mut request.timeseries);

    let mut insert_requests = Vec::with_capacity(all_series.len());
    for series in all_series {
        insert_requests.push(timeseries_to_insert_request(db, series)?);
    }
    Ok(insert_requests)
}
fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result<InsertRequest> {
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
let samples = std::mem::take(&mut timeseries.samples);
let mut table_name = None;
for label in &labels {
// The metric name is a special label
if label.name == METRIC_NAME_LABEL {
table_name = Some(&label.value);
}
}
let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?;
let row_count = samples.len();
let mut line_writer = LineWriter::with_lines(db, table_name, row_count);
for sample in samples {
let ts_millis = sample.timestamp;
let val = sample.value;
line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, Millisecond));
line_writer.write_f64(VALUE_COLUMN_NAME, val);
labels
.iter()
.filter(|label| label.name != METRIC_NAME_LABEL)
.for_each(|label| {
line_writer.write_tag(&label.name, &label.value);
});
line_writer.commit();
}
Ok(line_writer.finish())
}
// TODO(fys): this will be removed in the future.
pub fn to_grpc_insert_requests(
database: &str,
mut request: WriteRequest,
@@ -351,7 +296,6 @@ pub fn to_grpc_insert_requests(
.collect()
}
// TODO(fys): it will remove in the future.
fn to_grpc_insert_request(database: &str, mut timeseries: TimeSeries) -> Result<GrpcInsertRequest> {
let schema_name = database.to_string();
@@ -506,11 +450,8 @@ mod tests {
use std::sync::Arc;
use api::prometheus::remote::LabelMatcher;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::value::Value;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector, Vector};
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use super::*;
@@ -570,94 +511,6 @@ mod tests {
assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
}
// Converts the mocked time series into insert requests for schema "public"
// and checks, request by request, the table name, tag columns, timestamps
// (millisecond unit) and values.
#[test]
fn test_write_request_to_insert_reqs() {
let write_request = WriteRequest {
timeseries: mock_timeseries(),
..Default::default()
};
let reqs = write_request_to_insert_reqs("public", write_request).unwrap();
// One insert request is expected per mocked time series.
assert_eq!(3, reqs.len());
// First request: table "metric1" with a `job` tag and two samples.
let req1 = reqs.get(0).unwrap();
assert_eq!("public", req1.schema_name);
assert_eq!("metric1", req1.table_name);
let columns = &req1.columns_values;
let job = columns.get("job").unwrap();
let expected: Vec<Value> = vec!["spark".into(), "spark".into()];
assert_vector(&expected, job);
let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
let val = columns.get(VALUE_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![1.0_f64.into(), 2.0_f64.into()];
assert_vector(&expected, val);
// Second request: table "metric2" with `instance` and `idc` tags and two samples.
let req2 = reqs.get(1).unwrap();
assert_eq!("public", req2.schema_name);
assert_eq!("metric2", req2.table_name);
let columns = &req2.columns_values;
let instance = columns.get("instance").unwrap();
let expected: Vec<Value> = vec!["test_host1".into(), "test_host1".into()];
assert_vector(&expected, instance);
let idc = columns.get("idc").unwrap();
let expected: Vec<Value> = vec!["z001".into(), "z001".into()];
assert_vector(&expected, idc);
let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
let val = columns.get(VALUE_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![3.0_f64.into(), 4.0_f64.into()];
assert_vector(&expected, val);
// Third request: table "metric3" with `idc` and `app` tags and three samples.
let req3 = reqs.get(2).unwrap();
assert_eq!("public", req3.schema_name);
assert_eq!("metric3", req3.table_name);
let columns = &req3.columns_values;
let idc = columns.get("idc").unwrap();
let expected: Vec<Value> = vec!["z002".into(), "z002".into(), "z002".into()];
assert_vector(&expected, idc);
let app = columns.get("app").unwrap();
let expected: Vec<Value> = vec!["biz".into(), "biz".into(), "biz".into()];
assert_vector(&expected, app);
let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(3000, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
let val = columns.get(VALUE_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![5.0_f64.into(), 6.0_f64.into(), 7.0_f64.into()];
assert_vector(&expected, val);
}
/// Asserts that `vector` holds `expected[i]` at every index `i` of `expected`.
///
/// Only the first `expected.len()` positions are compared.
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
    expected.iter().enumerate().for_each(|(position, want)| {
        let actual = vector.get(position);
        assert_eq!(*want, actual);
    });
}
#[test]
fn test_write_request_to_insert_exprs() {
let write_request = WriteRequest {