feat: support writing data via influxdb line protocol in frontend (#280)

* feat: support influxdb line protocol write
fys
2022-09-29 17:08:08 +08:00
committed by GitHub
parent ed89cc3e21
commit fe8327fc78
24 changed files with 1131 additions and 15 deletions
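For readers unfamiliar with the feature: the commit exposes an HTTP endpoint that accepts the InfluxDB line protocol. Below is a minimal client sketch, assuming the frontend's HTTP server listens on 127.0.0.1:4000 and using the reqwest crate (host, port, and crate choice are illustrative assumptions; the /v1/influxdb/write route and the optional precision query parameter are what this commit adds):

// Hypothetical client for the new endpoint; not part of this commit.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = "monitor,host=host1 cpu=66.6,memory=1024 1663840496100023100";
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/influxdb/write?precision=n")
        .body(body)
        .send()
        .await?;
    // A successful write responds with 204 No Content and an empty body.
    assert_eq!(204, resp.status().as_u16());
    Ok(())
}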

Cargo.lock (generated)

@@ -918,6 +918,8 @@ dependencies = [
"api",
"arrow2",
"async-trait",
"common-base",
"common-error",
"datafusion",
"snafu",
]
@@ -2257,6 +2259,17 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "influxdb_line_protocol"
version = "0.1.0"
source = "git+https://github.com/evenyag/influxdb_iox?branch=feat/line-protocol#10ef0d0b02705ac7518717390939fa3a9bcfcacc"
dependencies = [
"bytes",
"nom",
"smallvec",
"snafu",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -4693,8 +4706,10 @@ dependencies = [
"axum-test-helper",
"bytes",
"catalog",
"client",
"common-base",
"common-error",
"common-grpc",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -4704,6 +4719,7 @@ dependencies = [
"futures",
"hex",
"hyper",
"influxdb_line_protocol",
"metrics 0.20.1",
"mysql_async",
"num_cpus",
@@ -4715,6 +4731,7 @@ dependencies = [
"serde",
"serde_json",
"snafu",
"table",
"test-util",
"tokio",
"tokio-postgres",


@@ -2,6 +2,7 @@ use datatypes::prelude::ConcreteDataType;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::v1::column::Values;
use crate::v1::ColumnDataType;
#[derive(Debug, PartialEq, Eq)]
@@ -71,10 +72,140 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
}
}
impl Values {
pub fn with_capacity(datatype: ColumnDataType, capacity: usize) -> Self {
match datatype {
ColumnDataType::Boolean => Values {
bool_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Int8 => Values {
i8_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Int16 => Values {
i16_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Int32 => Values {
i32_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Int64 => Values {
i64_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Uint8 => Values {
u8_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Uint16 => Values {
u16_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Uint32 => Values {
u32_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Uint64 => Values {
u64_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Float32 => Values {
f32_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Float64 => Values {
f64_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Binary => Values {
binary_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::String => Values {
string_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Date => Values {
date_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Datetime => Values {
datetime_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Timestamp => Values {
ts_millis_values: Vec::with_capacity(capacity),
..Default::default()
},
}
}
}
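Note that only the vector matching the requested datatype gets the up-front allocation; ..Default::default() leaves every other variant's Vec empty, so each column costs exactly one allocation.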
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_values_with_capacity() {
let values = Values::with_capacity(ColumnDataType::Int8, 2);
let values = values.i8_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Int32, 2);
let values = values.i32_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Int64, 2);
let values = values.i64_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Uint8, 2);
let values = values.u8_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Uint32, 2);
let values = values.u32_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Uint64, 2);
let values = values.u64_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Float32, 2);
let values = values.f32_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Float64, 2);
let values = values.f64_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Binary, 2);
let values = values.binary_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Boolean, 2);
let values = values.bool_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::String, 2);
let values = values.string_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Date, 2);
let values = values.date_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Datetime, 2);
let values = values.datetime_values;
assert_eq!(2, values.capacity());
let values = Values::with_capacity(ColumnDataType::Timestamp, 2);
let values = values.ts_millis_values;
assert_eq!(2, values.capacity());
}
#[test]
fn test_concrete_datatype_from_column_datatype() {
assert_eq!(


@@ -1,5 +1,6 @@
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::influxdb::InfluxdbOptions;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
@@ -47,6 +48,8 @@ struct StartCommand {
opentsdb_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
#[clap(short, long)]
influxdb_enable: Option<bool>,
}
impl StartCommand {
@@ -91,6 +94,9 @@ impl TryFrom<StartCommand> for FrontendOptions {
..Default::default()
});
}
if let Some(enable) = cmd.influxdb_enable {
opts.influxdb_options = Some(InfluxdbOptions { enable });
}
Ok(opts)
}
}
@@ -107,6 +113,7 @@ mod tests {
mysql_addr: Some("127.0.0.1:5678".to_string()),
postgres_addr: Some("127.0.0.1:5432".to_string()),
opentsdb_addr: Some("127.0.0.1:4321".to_string()),
influxdb_enable: Some(false),
config_file: None,
};
@@ -136,5 +143,7 @@ mod tests {
opts.opentsdb_options.as_ref().unwrap().runtime_size,
default_opts.opentsdb_options.as_ref().unwrap().runtime_size
);
assert!(!opts.influxdb_options.unwrap().enable);
}
}
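With clap's derive conventions, the new field should surface as an --influxdb-enable flag next to the existing address options (the kebab-case flag name is inferred from clap's defaults, not spelled out in this diff).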


@@ -6,6 +6,8 @@ edition = "2021"
[dependencies]
api = { path = "../../api" }
async-trait = "0.1"
common-base = { path = "../base" }
common-error = { path = "../error" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
snafu = { version = "0.7", features = ["backtraces"] }


@@ -1,6 +1,11 @@
use std::any::Any;
use api::DecodeError;
use common_error::prelude::{ErrorExt, StatusCode};
use datafusion::error::DataFusionError;
-use snafu::{Backtrace, Snafu};
+use snafu::{Backtrace, ErrorCompat, Snafu};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -31,4 +36,42 @@ pub enum Error {
source: DecodeError,
backtrace: Backtrace,
},
#[snafu(display(
"Write type mismatch, column name: {}, expected: {}, actual: {}",
column_name,
expected,
actual
))]
TypeMismatch {
column_name: String,
expected: String,
actual: String,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::EmptyPhysicalPlan { .. }
| Error::EmptyPhysicalExpr { .. }
| Error::MissingField { .. }
| Error::TypeMismatch { .. } => StatusCode::InvalidArguments,
Error::UnsupportedDfPlan { .. } | Error::UnsupportedDfExpr { .. } => {
StatusCode::Unsupported
}
Error::NewProjection { .. } | Error::DecodePhysicalPlanNode { .. } => {
StatusCode::Internal
}
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}


@@ -1,5 +1,6 @@
pub mod error;
pub mod physical;
pub mod writer;
pub use error::Error;
pub use physical::{


@@ -0,0 +1,367 @@
use std::collections::HashMap;
use api::v1::{
codec::InsertBatch,
column::{SemanticType, Values},
Column, ColumnDataType,
};
use common_base::BitVec;
use snafu::ensure;
use crate::error::{Result, TypeMismatchSnafu};
type ColumnName = String;
#[derive(Default)]
pub struct LinesWriter {
column_name_index: HashMap<ColumnName, usize>,
null_masks: Vec<BitVec>,
batch: InsertBatch,
lines: usize,
}
impl LinesWriter {
pub fn with_lines(lines: usize) -> Self {
Self {
lines,
..Default::default()
}
}
pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) -> Result<()> {
let (idx, column) = self.mut_column(
column_name,
ColumnDataType::Timestamp,
SemanticType::Timestamp,
);
ensure!(
column.datatype == Some(ColumnDataType::Timestamp.into()),
TypeMismatchSnafu {
column_name,
expected: "timestamp",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.ts_millis_values.push(to_ms_ts(value.1, value.0));
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> {
let (idx, column) = self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag);
ensure!(
column.datatype == Some(ColumnDataType::String.into()),
TypeMismatchSnafu {
column_name,
expected: "string",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.string_values.push(value.to_string());
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_u64(&mut self, column_name: &str, value: u64) -> Result<()> {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Uint64, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Uint64.into()),
TypeMismatchSnafu {
column_name,
expected: "u64",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.u64_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_i64(&mut self, column_name: &str, value: i64) -> Result<()> {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Int64, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Int64.into()),
TypeMismatchSnafu {
column_name,
expected: "i64",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.i64_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_f64(&mut self, column_name: &str, value: f64) -> Result<()> {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Float64, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Float64.into()),
TypeMismatchSnafu {
column_name,
expected: "f64",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.f64_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_string(&mut self, column_name: &str, value: &str) -> Result<()> {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::String, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::String.into()),
TypeMismatchSnafu {
column_name,
expected: "string",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.string_values.push(value.to_string());
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_bool(&mut self, column_name: &str, value: bool) -> Result<()> {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Boolean, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Boolean.into()),
TypeMismatchSnafu {
column_name,
expected: "boolean",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.bool_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn commit(&mut self) {
let batch = &mut self.batch;
batch.row_count += 1;
for i in 0..batch.columns.len() {
let null_mask = &mut self.null_masks[i];
if batch.row_count as usize > null_mask.len() {
null_mask.push(true);
}
}
}
pub fn finish(mut self) -> InsertBatch {
let null_masks = self.null_masks;
for (i, null_mask) in null_masks.into_iter().enumerate() {
let columns = &mut self.batch.columns;
columns[i].null_mask = null_mask.into_vec();
}
self.batch
}
fn mut_column(
&mut self,
column_name: &str,
datatype: ColumnDataType,
semantic_type: SemanticType,
) -> (usize, &mut Column) {
let column_names = &mut self.column_name_index;
let column_idx = match column_names.get(column_name) {
Some(i) => *i,
None => {
let new_idx = column_names.len();
let batch = &mut self.batch;
let to_insert = self.lines;
let mut null_mask = BitVec::with_capacity(to_insert);
null_mask.extend(BitVec::repeat(true, batch.row_count as usize));
self.null_masks.push(null_mask);
batch.columns.push(Column {
column_name: column_name.to_string(),
semantic_type: semantic_type.into(),
values: Some(Values::with_capacity(datatype, to_insert)),
datatype: Some(datatype.into()),
null_mask: Vec::default(),
});
column_names.insert(column_name.to_string(), new_idx);
new_idx
}
};
(column_idx, &mut self.batch.columns[column_idx])
}
}
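The null-mask bookkeeping is the subtle part of this writer: a bit is pushed onto a column's mask only when that column receives a value, a column first seen mid-stream starts with one true (null) bit per already-committed row, and commit() pads any mask shorter than the row count. A minimal sketch of the resulting layout (illustrative only, not part of the commit):

let mut writer = LinesWriter::with_lines(2);
writer.write_tag("host", "host1").unwrap(); // host mask: [false]
writer.commit();                            // row 1 done; "cpu" not seen yet
writer.write_f64("cpu", 0.5).unwrap();      // cpu starts as [true] (null in row 1),
                                            // then false is pushed for row 2
writer.commit();                            // pads host's mask with true
let batch = writer.finish();
// host mask: [false, true]  (missing in row 2)
// cpu  mask: [true, false]  (missing in row 1)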
fn to_ms_ts(p: Precision, ts: i64) -> i64 {
match p {
Precision::NANOSECOND => ts / 1_000_000,
Precision::MICROSECOND => ts / 1000,
Precision::MILLISECOND => ts,
Precision::SECOND => ts * 1000,
Precision::MINUTE => ts * 1000 * 60,
Precision::HOUR => ts * 1000 * 60 * 60,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Precision {
NANOSECOND,
MICROSECOND,
MILLISECOND,
SECOND,
MINUTE,
HOUR,
}
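The conversion truncates toward milliseconds, which is why the InfluxDB tests later in this diff expect ts_millis_values like 1663840496100 for nanosecond inputs. A quick check, including the SECOND case that test_to_ms below happens to skip (a sketch, not part of the commit):

assert_eq!(1663840496100, to_ms_ts(Precision::NANOSECOND, 1663840496100023100));
assert_eq!(100_110_000_000, to_ms_ts(Precision::SECOND, 100_110_000));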
#[cfg(test)]
mod tests {
use api::v1::{column::SemanticType, ColumnDataType};
use common_base::BitVec;
use super::LinesWriter;
use crate::writer::{to_ms_ts, Precision};
#[test]
fn test_lines_writer() {
let mut writer = LinesWriter::with_lines(3);
writer.write_tag("host", "host1").unwrap();
writer.write_f64("cpu", 0.5).unwrap();
writer.write_f64("memory", 0.4).unwrap();
writer.write_string("name", "name1").unwrap();
writer
.write_ts("ts", (101011000, Precision::MILLISECOND))
.unwrap();
writer.commit();
writer.write_tag("host", "host2").unwrap();
writer
.write_ts("ts", (102011001, Precision::MILLISECOND))
.unwrap();
writer.write_bool("enable_reboot", true).unwrap();
writer.write_u64("year_of_service", 2).unwrap();
writer.write_i64("temperature", 4).unwrap();
writer.commit();
writer.write_tag("host", "host3").unwrap();
writer.write_f64("cpu", 0.4).unwrap();
writer.write_u64("cpu_core_num", 16).unwrap();
writer
.write_ts("ts", (103011002, Precision::MILLISECOND))
.unwrap();
writer.commit();
let insert_batch = writer.finish();
assert_eq!(3, insert_batch.row_count);
let columns = insert_batch.columns;
assert_eq!(9, columns.len());
let column = &columns[0];
assert_eq!("host", column.column_name);
assert_eq!(Some(ColumnDataType::String as i32), column.datatype);
assert_eq!(SemanticType::Tag as i32, column.semantic_type);
assert_eq!(
vec!["host1", "host2", "host3"],
column.values.as_ref().unwrap().string_values
);
verify_null_mask(&column.null_mask, vec![false, false, false]);
let column = &columns[1];
assert_eq!("cpu", column.column_name);
assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![0.5, 0.4], column.values.as_ref().unwrap().f64_values);
verify_null_mask(&column.null_mask, vec![false, true, false]);
let column = &columns[2];
assert_eq!("memory", column.column_name);
assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![0.4], column.values.as_ref().unwrap().f64_values);
verify_null_mask(&column.null_mask, vec![false, true, true]);
let column = &columns[3];
assert_eq!("name", column.column_name);
assert_eq!(Some(ColumnDataType::String as i32), column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec!["name1"], column.values.as_ref().unwrap().string_values);
verify_null_mask(&column.null_mask, vec![false, true, true]);
let column = &columns[4];
assert_eq!("ts", column.column_name);
assert_eq!(Some(ColumnDataType::Timestamp as i32), column.datatype);
assert_eq!(SemanticType::Timestamp as i32, column.semantic_type);
assert_eq!(
vec![101011000, 102011001, 103011002],
column.values.as_ref().unwrap().ts_millis_values
);
verify_null_mask(&column.null_mask, vec![false, false, false]);
let column = &columns[5];
assert_eq!("enable_reboot", column.column_name);
assert_eq!(Some(ColumnDataType::Boolean as i32), column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![true], column.values.as_ref().unwrap().bool_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[6];
assert_eq!("year_of_service", column.column_name);
assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![2], column.values.as_ref().unwrap().u64_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[7];
assert_eq!("temperature", column.column_name);
assert_eq!(Some(ColumnDataType::Int64 as i32), column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![4], column.values.as_ref().unwrap().i64_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[8];
assert_eq!("cpu_core_num", column.column_name);
assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![16], column.values.as_ref().unwrap().u64_values);
verify_null_mask(&column.null_mask, vec![true, true, false]);
}
fn verify_null_mask(data: &[u8], expected: Vec<bool>) {
let bitvec = BitVec::from_slice(data);
for (idx, b) in expected.iter().enumerate() {
assert_eq!(b, bitvec.get(idx).unwrap())
}
}
#[test]
fn test_to_ms() {
assert_eq!(100, to_ms_ts(Precision::NANOSECOND, 100110000));
assert_eq!(100110, to_ms_ts(Precision::MICROSECOND, 100110000));
assert_eq!(100110000, to_ms_ts(Precision::MILLISECOND, 100110000));
assert_eq!(
100110000 * 1000 * 60,
to_ms_ts(Precision::MINUTE, 100110000)
);
assert_eq!(
100110000 * 1000 * 60 * 60,
to_ms_ts(Precision::HOUR, 100110000)
);
}
}


@@ -67,12 +67,10 @@ pub fn insertion_expr_to_request(
}
fn insert_batches(bytes_vec: Vec<Vec<u8>>) -> Result<Vec<InsertBatch>> {
-let mut insert_batches = Vec::with_capacity(bytes_vec.len());
-for bytes in bytes_vec {
-insert_batches.push(bytes.deref().try_into().context(DecodeInsertSnafu)?);
-}
-Ok(insert_batches)
+bytes_vec
+.iter()
+.map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu))
+.collect()
}
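The refactored body leans on Result's FromIterator impl, so collect() short-circuits on the first decode error instead of building a partial vector.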
fn add_values_to_builder(


@@ -129,7 +129,6 @@ macro_rules! convert_arrow_array_to_grpc_vals {
_ => unimplemented!(),
}
};
}
pub fn values(arrays: &[Arc<dyn Array>]) -> Result<Values> {


@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::influxdb::InfluxdbOptions;
use crate::instance::Instance;
use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
@@ -17,6 +18,7 @@ pub struct FrontendOptions {
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
}
impl Default for FrontendOptions {
@@ -27,6 +29,7 @@ impl Default for FrontendOptions {
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
}
}
}


@@ -0,0 +1,23 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InfluxdbOptions {
pub enable: bool,
}
impl Default for InfluxdbOptions {
fn default() -> Self {
Self { enable: true }
}
}
#[cfg(test)]
mod tests {
use super::InfluxdbOptions;
#[test]
fn test_influxdb_options() {
let default = InfluxdbOptions::default();
assert!(default.enable);
}
}
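Since InfluxdbOptions derives Serialize and Deserialize, it can be driven from a TOML config file as well as the CLI flag. A sketch of the round trip, assuming the toml crate (the config loader itself is not shown in this diff):

let opts: InfluxdbOptions = toml::from_str("enable = false").unwrap();
assert!(!opts.enable);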


@@ -1,3 +1,4 @@
mod influxdb;
mod opentsdb;
use std::collections::HashMap;


@@ -0,0 +1,45 @@
use api::v1::{insert_expr::Expr, InsertExpr};
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::influxdb::InfluxdbRequest;
use servers::{
error::ExecuteQuerySnafu, influxdb::InsertBatches, query_handler::InfluxdbLineProtocolHandler,
};
use snafu::ResultExt;
use crate::error::RequestDatanodeSnafu;
use crate::error::Result;
use crate::instance::Instance;
#[async_trait]
impl InfluxdbLineProtocolHandler for Instance {
async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> {
// TODO(fys): use batch insert
self.do_insert(request.try_into()?)
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu {
query: &request.lines,
})?;
Ok(())
}
}
impl Instance {
async fn do_insert(&self, insert_batches: InsertBatches) -> Result<()> {
for (table_name, batch) in insert_batches.data {
let expr = Expr::Values(api::v1::insert_expr::Values {
values: vec![batch.into()],
});
let _object_result = self
.db
.insert(InsertExpr {
table_name,
expr: Some(expr),
})
.await
.context(RequestDatanodeSnafu)?;
}
Ok(())
}
}
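Note that do_insert issues one gRPC insert per table; the TODO above marks this loop as a candidate for a single batched request.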


@@ -2,6 +2,7 @@
pub mod error;
pub mod frontend;
pub mod influxdb;
pub mod instance;
pub mod mysql;
pub mod opentsdb;


@@ -13,6 +13,7 @@ use tokio::try_join;
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::InstanceRef;
pub(crate) struct Services;
@@ -91,6 +92,12 @@ impl Services {
if opentsdb_server_and_addr.is_some() {
http_server.set_opentsdb_handler(instance.clone());
}
if matches!(
opts.influxdb_options,
Some(InfluxdbOptions { enable: true })
) {
http_server.set_influxdb_handler(instance.clone());
}
Some((Box::new(http_server) as _, http_addr))
} else {


@@ -9,7 +9,10 @@ async-trait = "0.1"
axum = "0.6.0-rc.2"
axum-macros = "0.3.0-rc.1"
bytes = "1.2"
client = { path = "../client" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
@@ -19,6 +22,7 @@ datatypes = { path = "../datatypes" }
futures = "0.3"
hex = { version = "0.4" }
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
metrics = "0.20"
num_cpus = "1.13"
opensrv-mysql = "0.1"
@@ -27,6 +31,7 @@ query = { path = "../query" }
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
tokio = { version = "1.20", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"


@@ -2,8 +2,10 @@ use std::any::Any;
use std::net::SocketAddr;
use axum::http::StatusCode as HttpStatusCode;
-use axum::response::{IntoResponse, Response};
-use axum::Json;
+use axum::{
+response::{IntoResponse, Response},
+Json,
+};
use common_error::prelude::*;
use serde_json::json;
@@ -76,6 +78,21 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to parse InfluxDB line protocol, source: {}", source))]
InfluxdbLineProtocol {
#[snafu(backtrace)]
source: influxdb_line_protocol::Error,
},
#[snafu(display("Failed to write InfluxDB line protocol, source: {}", source))]
InfluxdbLinesWrite {
#[snafu(backtrace)]
source: common_grpc::error::Error,
},
#[snafu(display("Failed to convert time precision, name: {}", name))]
TimePrecision { name: String, backtrace: Backtrace },
#[snafu(display("Connection reset by peer"))]
ConnResetByPeer { backtrace: Backtrace },
@@ -128,10 +145,13 @@ impl ErrorExt for Error {
NotSupported { .. }
| InvalidQuery { .. }
+| InfluxdbLineProtocol { .. }
| ConnResetByPeer { .. }
| InvalidOpentsdbLine { .. }
-| InvalidOpentsdbJsonRequest { .. } => StatusCode::InvalidArguments,
+| InvalidOpentsdbJsonRequest { .. }
+| TimePrecision { .. } => StatusCode::InvalidArguments,
+InfluxdbLinesWrite { source, .. } => source.status_code(),
Hyper { .. } => StatusCode::Unknown,
}
}
@@ -160,9 +180,12 @@ impl From<std::io::Error> for Error {
impl IntoResponse for Error {
fn into_response(self) -> Response {
let (status, error_message) = match self {
-Error::InvalidOpentsdbLine { .. }
+Error::InfluxdbLineProtocol { .. }
+| Error::InfluxdbLinesWrite { .. }
+| Error::InvalidOpentsdbLine { .. }
| Error::InvalidOpentsdbJsonRequest { .. }
-| Error::InvalidQuery { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()),
+| Error::InvalidQuery { .. }
+| Error::TimePrecision { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()),
_ => (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
};
let body = Json(json!({


@@ -1,4 +1,5 @@
pub mod handler;
pub mod influxdb;
pub mod opentsdb;
use std::net::SocketAddr;
@@ -19,15 +20,17 @@ use snafu::ResultExt;
use tower::{timeout::TimeoutLayer, ServiceBuilder};
use tower_http::trace::TraceLayer;
use self::influxdb::influxdb_write;
use crate::error::{Result, StartHttpSnafu};
-use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::query_handler::SqlQueryHandlerRef;
+use crate::query_handler::{InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef};
use crate::server::Server;
const HTTP_API_VERSION: &str = "v1";
pub struct HttpServer {
sql_handler: SqlQueryHandlerRef,
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
}
@@ -121,6 +124,7 @@ impl HttpServer {
Self {
sql_handler,
opentsdb_handler: None,
influxdb_handler: None,
}
}
@@ -132,6 +136,14 @@ impl HttpServer {
self.opentsdb_handler.get_or_insert(handler);
}
pub fn set_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) {
debug_assert!(
self.influxdb_handler.is_none(),
"Influxdb line protocol handler can be set only once!"
);
self.influxdb_handler.get_or_insert(handler);
}
pub fn make_app(&self) -> Router {
// TODO(LFC): Use released Axum.
// Axum version 0.6 introduces state within router, making router methods far more elegant
@@ -148,12 +160,20 @@ impl HttpServer {
let mut router = Router::new().nest(&format!("/{}", HTTP_API_VERSION), sql_router);
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
-let opentsdb_router = Router::with_state(opentsdb_handler.clone())
+let opentsdb_router = Router::with_state(opentsdb_handler)
.route("/api/put", routing::post(opentsdb::put));
router = router.nest(&format!("/{}/opentsdb", HTTP_API_VERSION), opentsdb_router);
}
// TODO(fys): Create the influxdb database once we can create a greptime schema.
if let Some(influxdb_handler) = self.influxdb_handler.clone() {
let influxdb_router =
Router::with_state(influxdb_handler).route("/write", routing::post(influxdb_write));
router = router.nest(&format!("/{}/influxdb", HTTP_API_VERSION), influxdb_router);
}
router
.route("/metrics", routing::get(handler::metrics))
// middlewares
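Because the influxdb router is nested under /{HTTP_API_VERSION}/influxdb, the effective write path is /v1/influxdb/write, which is the URL the integration test at the end of this diff posts to.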


@@ -0,0 +1,59 @@
use std::collections::HashMap;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use common_grpc::writer::Precision;
use crate::error::Result;
use crate::error::TimePrecisionSnafu;
use crate::http::HttpResponse;
use crate::influxdb::InfluxdbRequest;
use crate::query_handler::InfluxdbLineProtocolHandlerRef;
#[axum_macros::debug_handler]
pub async fn influxdb_write(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(params): Query<HashMap<String, String>>,
lines: String,
) -> Result<(StatusCode, HttpResponse)> {
let precision = params
.get("precision")
.map(|val| parse_time_precision(val))
.transpose()?;
let request = InfluxdbRequest { precision, lines };
handler.exec(&request).await?;
Ok((StatusCode::NO_CONTENT, HttpResponse::Text("".to_string())))
}
fn parse_time_precision(value: &str) -> Result<Precision> {
match value {
"n" => Ok(Precision::NANOSECOND),
"u" => Ok(Precision::MICROSECOND),
"ms" => Ok(Precision::MILLISECOND),
"s" => Ok(Precision::SECOND),
"m" => Ok(Precision::MINUTE),
"h" => Ok(Precision::HOUR),
unknown => TimePrecisionSnafu {
name: unknown.to_string(),
}
.fail(),
}
}
#[cfg(test)]
mod tests {
use common_grpc::writer::Precision;
use crate::http::influxdb::parse_time_precision;
#[test]
fn test_parse_time_precision() {
assert_eq!(Precision::NANOSECOND, parse_time_precision("n").unwrap());
assert_eq!(Precision::MICROSECOND, parse_time_precision("u").unwrap());
assert_eq!(Precision::MILLISECOND, parse_time_precision("ms").unwrap());
assert_eq!(Precision::SECOND, parse_time_precision("s").unwrap());
assert_eq!(Precision::MINUTE, parse_time_precision("m").unwrap());
assert_eq!(Precision::HOUR, parse_time_precision("h").unwrap());
assert!(parse_time_precision("unknown").is_err());
}
}

src/servers/src/influxdb.rs (new file)

@@ -0,0 +1,269 @@
use std::collections::HashMap;
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu};
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND;
pub struct InfluxdbRequest {
pub precision: Option<Precision>,
pub lines: String,
}
type TableName = String;
pub struct InsertBatches {
pub data: Vec<(TableName, api::v1::codec::InsertBatch)>,
}
impl TryFrom<&InfluxdbRequest> for InsertBatches {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> std::result::Result<Self, Self::Error> {
let mut writers: HashMap<TableName, LinesWriter> = HashMap::new();
let lines = parse_lines(&value.lines)
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
.context(InfluxdbLineProtocolSnafu)?;
let line_len = lines.len();
for line in lines {
let table_name = line.series.measurement;
let writer = writers
.entry(table_name.to_string())
.or_insert_with(|| LinesWriter::with_lines(line_len));
let tags = line.series.tag_set;
if let Some(tags) = tags {
for (k, v) in tags {
writer
.write_tag(k.as_str(), v.as_str())
.context(InfluxdbLinesWriteSnafu)?;
}
}
let fields = line.field_set;
for (k, v) in fields {
let column_name = k.as_str();
match v {
FieldValue::I64(value) => {
writer
.write_i64(column_name, value)
.context(InfluxdbLinesWriteSnafu)?;
}
FieldValue::U64(value) => {
writer
.write_u64(column_name, value)
.context(InfluxdbLinesWriteSnafu)?;
}
FieldValue::F64(value) => {
writer
.write_f64(column_name, value)
.context(InfluxdbLinesWriteSnafu)?;
}
FieldValue::String(value) => {
writer
.write_string(column_name, value.as_str())
.context(InfluxdbLinesWriteSnafu)?;
}
FieldValue::Boolean(value) => {
writer
.write_bool(column_name, value)
.context(InfluxdbLinesWriteSnafu)?;
}
}
}
if let Some(timestamp) = line.timestamp {
let precision = if let Some(val) = &value.precision {
*val
} else {
DEFAULT_TIME_PRECISION
};
writer
.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision))
.context(InfluxdbLinesWriteSnafu)?;
}
writer.commit();
}
Ok(InsertBatches {
data: writers
.into_iter()
.map(|(table_name, writer)| (table_name, writer.finish()))
.collect(),
})
}
}
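Each distinct measurement gets its own LinesWriter keyed by table name, so a single request fans out into one InsertBatch per table, and rows that omit a column are padded with null bits by commit(), as sketched in the writer module above.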
#[cfg(test)]
mod tests {
use api::v1::{
codec::InsertBatch,
column::{SemanticType, Values},
Column, ColumnDataType,
};
use common_base::BitVec;
use super::InsertBatches;
use crate::influxdb::InfluxdbRequest;
#[test]
fn test_convert_influxdb_lines() {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
let insert_batches: InsertBatches = influxdb_req.try_into().unwrap();
let insert_batches = insert_batches.data;
assert_eq!(2, insert_batches.len());
for (table_name, insert_batch) in &insert_batches {
if table_name == "monitor1" {
assert_monitor_1(insert_batch);
} else if table_name == "monitor2" {
assert_monitor_2(insert_batch);
} else {
panic!()
}
}
}
fn assert_monitor_1(insert_batch: &InsertBatch) {
let columns = &insert_batch.columns;
assert_eq!(4, columns.len());
verify_column(
&columns[0],
"host",
ColumnDataType::String,
SemanticType::Tag,
Vec::new(),
Values {
string_values: vec!["host1".to_string(), "host2".to_string()],
..Default::default()
},
);
verify_column(
&columns[1],
"cpu",
ColumnDataType::Float64,
SemanticType::Field,
vec![false, true],
Values {
f64_values: vec![66.6],
..Default::default()
},
);
verify_column(
&columns[2],
"memory",
ColumnDataType::Float64,
SemanticType::Field,
Vec::new(),
Values {
f64_values: vec![1024.0, 1027.0],
..Default::default()
},
);
verify_column(
&columns[3],
"ts",
ColumnDataType::Timestamp,
SemanticType::Timestamp,
Vec::new(),
Values {
ts_millis_values: vec![1663840496100, 1663840496400],
..Default::default()
},
);
}
fn assert_monitor_2(insert_batch: &InsertBatch) {
let columns = &insert_batch.columns;
assert_eq!(4, columns.len());
verify_column(
&columns[0],
"host",
ColumnDataType::String,
SemanticType::Tag,
Vec::new(),
Values {
string_values: vec!["host3".to_string(), "host4".to_string()],
..Default::default()
},
);
verify_column(
&columns[1],
"cpu",
ColumnDataType::Float64,
SemanticType::Field,
Vec::new(),
Values {
f64_values: vec![66.5, 66.3],
..Default::default()
},
);
verify_column(
&columns[2],
"ts",
ColumnDataType::Timestamp,
SemanticType::Timestamp,
Vec::new(),
Values {
ts_millis_values: vec![1663840496100, 1663840496400],
..Default::default()
},
);
verify_column(
&columns[3],
"memory",
ColumnDataType::Float64,
SemanticType::Field,
vec![true, false],
Values {
f64_values: vec![1029.0],
..Default::default()
},
);
}
fn verify_column(
column: &Column,
name: &str,
datatype: ColumnDataType,
semantic_type: SemanticType,
null_mask: Vec<bool>,
vals: Values,
) {
assert_eq!(name, column.column_name);
assert_eq!(Some(datatype as i32), column.datatype);
assert_eq!(semantic_type as i32, column.semantic_type);
verify_null_mask(&column.null_mask, null_mask);
assert_eq!(Some(vals), column.values);
}
fn verify_null_mask(data: &[u8], expected: Vec<bool>) {
let bitvec = BitVec::from_slice(data);
for (idx, b) in expected.iter().enumerate() {
assert_eq!(b, bitvec.get(idx).unwrap())
}
}
}


@@ -3,6 +3,7 @@
pub mod error;
pub mod grpc;
pub mod http;
pub mod influxdb;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;


@@ -5,6 +5,7 @@ use async_trait::async_trait;
use common_query::Output;
use crate::error::Result;
use crate::influxdb::InfluxdbRequest;
use crate::opentsdb::codec::DataPoint;
/// All query handler traits for various request protocols, like SQL or GRPC.
@@ -21,6 +22,7 @@ pub type SqlQueryHandlerRef = Arc<dyn SqlQueryHandler + Send + Sync>;
pub type GrpcQueryHandlerRef = Arc<dyn GrpcQueryHandler + Send + Sync>;
pub type GrpcAdminHandlerRef = Arc<dyn GrpcAdminHandler + Send + Sync>;
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
#[async_trait]
pub trait SqlQueryHandler {
@@ -39,6 +41,13 @@ pub trait GrpcAdminHandler {
async fn exec_admin_request(&self, expr: AdminExpr) -> Result<AdminResult>;
}
#[async_trait]
pub trait InfluxdbLineProtocolHandler {
/// A successful request will not return a response body.
/// Errors are reported back to the client in the error response.
async fn exec(&self, request: &InfluxdbRequest) -> Result<()>;
}
#[async_trait]
pub trait OpentsdbProtocolHandler {
/// A successful request will not return a response.


@@ -0,0 +1,82 @@
use std::sync::Arc;
use async_trait::async_trait;
use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use servers::error::Result;
use servers::http::HttpServer;
use servers::influxdb::{InfluxdbRequest, InsertBatches};
use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler};
use tokio::sync::mpsc;
struct DummyInstance {
tx: mpsc::Sender<String>,
}
#[async_trait]
impl InfluxdbLineProtocolHandler for DummyInstance {
async fn exec(&self, request: &InfluxdbRequest) -> Result<()> {
let batches: InsertBatches = request.try_into()?;
for (table_name, _) in batches.data {
let _ = self.tx.send(table_name).await;
}
Ok(())
}
}
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _query: &str) -> Result<Output> {
unimplemented!()
}
async fn insert_script(&self, _name: &str, _script: &str) -> Result<()> {
unimplemented!()
}
async fn execute_script(&self, _name: &str) -> Result<Output> {
unimplemented!()
}
}
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
server.set_influxdb_handler(instance);
server.make_app()
}
#[tokio::test]
async fn test_influxdb_write() {
let (tx, mut rx) = mpsc::channel(100);
let app = make_test_app(tx);
let client = TestClient::new(app);
// valid request
let result = client
.post("/v1/influxdb/write")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// bad request
let result = client
.post("/v1/influxdb/write")
.body("monitor, host=host1 cpu=1.2 1664370459457010101")
.send()
.await;
assert_eq!(result.status(), 400);
assert!(!result.text().await.is_empty());
let mut metrics = vec![];
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(metrics, vec!["monitor".to_string()]);
}


@@ -1,2 +1,3 @@
mod http_handler_test;
mod influxdb_test;
mod opentsdb_test;