feat: row protocol support for opentsdb (#2623)

* feat: opentsdb row protocol

* fix: added commnets for num of rows and failure if output is not of affecetd rows

* fix: added extra 1 to number of columns

* fix: avoided cloning datapoints, took ownership instead

* fix: avoided cloning datapoints, took ownership instead

* fix: changed vecotr slice to vector

* fix: remove clone

* fix: combined datapoints and requests with zip instead of enumerating

---------

Co-authored-by: Ubuntu <ubuntu@ip-172-31-43-183.us-east-2.compute.internal>
This commit is contained in:
Baasit
2023-10-20 07:25:59 +01:00
committed by GitHub
parent e1dcf83326
commit 346b57cf10
9 changed files with 101 additions and 57 deletions

View File

@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::InsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use snafu::prelude::*;
@@ -27,23 +27,27 @@ use crate::instance::Instance;
#[async_trait]
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
async fn exec(
&self,
data_points: Vec<DataPoint>,
ctx: QueryContextRef,
) -> server_error::Result<usize> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Opentsdb)
.context(AuthSnafu)?;
let requests = InsertRequests {
inserts: vec![data_point.as_grpc_insert()],
};
let _ = self
.handle_inserts(requests, ctx)
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
let output = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{data_point:?}"),
})?;
Ok(())
.context(servers::error::ExecuteGrpcQuerySnafu)?;
Ok(match output {
common_query::Output::AffectedRows(rows) => rows,
_ => unreachable!(),
})
}
}

View File

@@ -84,17 +84,19 @@ pub async fn put(
let summary = params.contains_key("summary");
let details = params.contains_key("details");
let data_points = parse_data_points(body).await?;
let data_point_requests = parse_data_points(body).await?;
let data_points = data_point_requests
.iter()
.map(|point| point.clone().into())
.collect::<Vec<_>>();
let response = if !summary && !details {
for data_point in data_points.into_iter() {
if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await {
// Not debugging purpose, failed fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
// Not debugging purpose, failed fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
}
(HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
} else {
@@ -108,15 +110,11 @@ pub async fn put(
},
};
for data_point in data_points.into_iter() {
let result = opentsdb_handler
.exec(&data_point.clone().into(), ctx.clone())
.await;
for (data_point, request) in data_points.into_iter().zip(data_point_requests) {
let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
match result {
Ok(()) => response.on_success(),
Err(e) => {
response.on_failed(data_point, e);
}
Ok(affected_rows) => response.on_success(affected_rows),
Err(e) => response.on_failed(request, e),
}
}
(
@@ -151,8 +149,8 @@ pub struct OpentsdbDebuggingResponse {
}
impl OpentsdbDebuggingResponse {
fn on_success(&mut self) {
self.success += 1;
fn on_success(&mut self, affected_rows: usize) {
self.success += affected_rows as i32;
}
fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {

View File

@@ -20,16 +20,20 @@ use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::error;
use futures::StreamExt;
use tokio::sync::broadcast;
use self::codec::DataPoint;
use crate::error::Result;
use crate::opentsdb::connection::Connection;
use crate::opentsdb::handler::Handler;
use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::row_writer::{self, MultiTableData};
use crate::server::{AbortableStream, BaseTcpServer, Server};
use crate::shutdown::Shutdown;
@@ -126,3 +130,38 @@ impl Server for OpentsdbServer {
OPENTSDB_SERVER
}
}
pub fn data_point_to_grpc_row_insert_requests(
data_points: Vec<DataPoint>,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_data = MultiTableData::new();
for mut data_point in data_points {
let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut());
let table_name = data_point.metric();
let value = data_point.value();
let timestamp = data_point.ts_millis();
// length of tags + 2 extra columns for greptime_timestamp and the value
let num_columns = tags.len() + 2;
let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1);
let mut one_row = table_data.alloc_one_row();
// tags
row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?;
// value
row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?;
// timestamp
row_writer::write_ts_millis(
table_data,
TIMESTAMP_COLUMN_NAME,
Some(timestamp),
&mut one_row,
)?;
table_data.add_row(one_row);
}
Ok(multi_table_data.into_row_insert_requests())
}

View File

@@ -19,7 +19,7 @@ use crate::error::{self, Result};
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value";
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataPoint {
metric: String,
ts_millis: i64,
@@ -115,6 +115,10 @@ impl DataPoint {
&self.tags
}
pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.tags
}
pub fn ts_millis(&self) -> i64 {
self.ts_millis
}

View File

@@ -94,7 +94,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
match DataPoint::try_create(&line) {
Ok(data_point) => {
let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED);
let result = self.query_handler.exec(&data_point, ctx.clone()).await;
let result = self.query_handler.exec(vec![data_point], ctx.clone()).await;
if let Err(e) = result {
self.connection.write_line(e.output_msg()).await?;
}
@@ -128,8 +128,8 @@ mod tests {
#[async_trait]
impl OpentsdbProtocolHandler for DummyQueryHandler {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
@@ -137,7 +137,7 @@ mod tests {
.fail();
}
self.tx.send(metric.to_string()).await.unwrap();
Ok(())
Ok(data_points.len())
}
}

View File

@@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler {
pub trait OpentsdbProtocolHandler {
/// A successful request will not return a response.
/// Only on error will the socket return a line of data.
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
}
pub struct PromStoreResponse {

View File

@@ -51,7 +51,8 @@ impl GrpcQueryHandler for DummyInstance {
#[async_trait]
impl OpentsdbProtocolHandler for DummyInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let data_point = data_points.first().unwrap();
if data_point.metric() == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
@@ -59,7 +60,7 @@ impl OpentsdbProtocolHandler for DummyInstance {
.fail();
}
let _ = self.tx.send(data_point.metric().to_string()).await;
Ok(())
Ok(data_points.len())
}
}
@@ -172,10 +173,7 @@ async fn test_opentsdb_put() {
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(
metrics,
vec!["m1".to_string(), "m2".to_string(), "m3".to_string()]
);
assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]);
}
#[tokio::test]

View File

@@ -37,8 +37,8 @@ struct DummyOpentsdbInstance {
#[async_trait]
impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return server_error::InternalSnafu {
err_msg: "expected",
@@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
}
let i = metric.parse::<i32>().unwrap();
let _ = self.tx.send(i * i).await;
Ok(())
Ok(data_points.len())
}
}

View File

@@ -46,6 +46,8 @@ mod tests {
async fn test_exec(instance: &Arc<Instance>) {
let ctx = QueryContext::arc();
// should create new table "my_metric_1" directly
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
@@ -55,9 +57,8 @@ mod tests {
("tagk2".to_string(), "tagv2".to_string()),
],
);
// should create new table "my_metric_1" directly
instance.exec(&data_point1, ctx.clone()).await.unwrap();
// should create new column "tagk3" directly
let data_point2 = DataPoint::new(
"my_metric_1".to_string(),
2000,
@@ -67,12 +68,12 @@ mod tests {
("tagk3".to_string(), "tagv3".to_string()),
],
);
// should create new column "tagk3" directly
instance.exec(&data_point2, ctx.clone()).await.unwrap();
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
instance.exec(&data_point3, ctx.clone()).await.unwrap();
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
let data_points = vec![data_point1, data_point2, data_point3];
instance.exec(data_points, ctx.clone()).await.unwrap();
let output = instance
.do_query(
@@ -87,13 +88,13 @@ mod tests {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print().unwrap();
let expected = vec![
"+---------------------+----------------+-------+-------+-------+",
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
"+---------------------+----------------+-------+-------+-------+",
"| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |",
"| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |",
"| 1970-01-01T00:00:03 | 3.0 | | | |",
"+---------------------+----------------+-------+-------+-------+",
"+-------+-------+----------------+---------------------+-------+",
"| tagk1 | tagk2 | greptime_value | greptime_timestamp | tagk3 |",
"+-------+-------+----------------+---------------------+-------+",
"| tagv1 | tagv2 | 1.0 | 1970-01-01T00:00:01 | |",
"| | tagv2 | 2.0 | 1970-01-01T00:00:02 | tagv3 |",
"| | | 3.0 | 1970-01-01T00:00:03 | |",
"+-------+-------+----------------+---------------------+-------+",
]
.into_iter()
.join("\n");