From fe8327fc7840adca1a16ea34f47138defa1e9d41 Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Thu, 29 Sep 2022 17:08:08 +0800 Subject: [PATCH] feat: support write data via influxdb line protocol in frontend (#280) * feat: support influxdb line protocol write --- Cargo.lock | 17 ++ src/api/src/helper.rs | 131 +++++++++ src/cmd/src/frontend.rs | 9 + src/common/grpc/Cargo.toml | 2 + src/common/grpc/src/error.rs | 45 ++- src/common/grpc/src/lib.rs | 1 + src/common/grpc/src/writer.rs | 367 ++++++++++++++++++++++++ src/datanode/src/server/grpc/insert.rs | 10 +- src/datanode/src/server/grpc/select.rs | 1 - src/frontend/src/frontend.rs | 3 + src/frontend/src/influxdb.rs | 23 ++ src/frontend/src/instance.rs | 1 + src/frontend/src/instance/influxdb.rs | 45 +++ src/frontend/src/lib.rs | 1 + src/frontend/src/server.rs | 7 + src/servers/Cargo.toml | 5 + src/servers/src/error.rs | 33 ++- src/servers/src/http.rs | 24 +- src/servers/src/http/influxdb.rs | 59 ++++ src/servers/src/influxdb.rs | 269 +++++++++++++++++ src/servers/src/lib.rs | 1 + src/servers/src/query_handler.rs | 9 + src/servers/tests/http/influxdb_test.rs | 82 ++++++ src/servers/tests/http/mod.rs | 1 + 24 files changed, 1131 insertions(+), 15 deletions(-) create mode 100644 src/common/grpc/src/writer.rs create mode 100644 src/frontend/src/influxdb.rs create mode 100644 src/frontend/src/instance/influxdb.rs create mode 100644 src/servers/src/http/influxdb.rs create mode 100644 src/servers/src/influxdb.rs create mode 100644 src/servers/tests/http/influxdb_test.rs diff --git a/Cargo.lock b/Cargo.lock index b4ef88c40e..15786de88d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -918,6 +918,8 @@ dependencies = [ "api", "arrow2", "async-trait", + "common-base", + "common-error", "datafusion", "snafu", ] @@ -2257,6 +2259,17 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "influxdb_line_protocol" +version = "0.1.0" +source = "git+https://github.com/evenyag/influxdb_iox?branch=feat/line-protocol#10ef0d0b02705ac7518717390939fa3a9bcfcacc" +dependencies = [ + "bytes", + "nom", + "smallvec", + "snafu", +] + [[package]] name = "instant" version = "0.1.12" @@ -4693,8 +4706,10 @@ dependencies = [ "axum-test-helper", "bytes", "catalog", + "client", "common-base", "common-error", + "common-grpc", "common-query", "common-recordbatch", "common-runtime", @@ -4704,6 +4719,7 @@ dependencies = [ "futures", "hex", "hyper", + "influxdb_line_protocol", "metrics 0.20.1", "mysql_async", "num_cpus", @@ -4715,6 +4731,7 @@ dependencies = [ "serde", "serde_json", "snafu", + "table", "test-util", "tokio", "tokio-postgres", diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 57a726c620..13c8afc346 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -2,6 +2,7 @@ use datatypes::prelude::ConcreteDataType; use snafu::prelude::*; use crate::error::{self, Result}; +use crate::v1::column::Values; use crate::v1::ColumnDataType; #[derive(Debug, PartialEq, Eq)] @@ -71,10 +72,140 @@ impl TryFrom for ColumnDataTypeWrapper { } } +impl Values { + pub fn with_capacity(datatype: ColumnDataType, capacity: usize) -> Self { + match datatype { + ColumnDataType::Boolean => Values { + bool_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int8 => Values { + i8_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int16 => Values { + i16_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int32 => Values { + i32_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int64 => Values { + i64_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint8 => Values { + u8_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint16 => Values { + u16_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint32 => Values { + u32_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint64 => Values { + u64_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Float32 => Values { + f32_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Float64 => Values { + f64_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Binary => Values { + binary_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::String => Values { + string_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Date => Values { + date_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Datetime => Values { + datetime_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Timestamp => Values { + ts_millis_values: Vec::with_capacity(capacity), + ..Default::default() + }, + } + } +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn test_values_with_capacity() { + let values = Values::with_capacity(ColumnDataType::Int8, 2); + let values = values.i8_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Int32, 2); + let values = values.i32_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Int64, 2); + let values = values.i64_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Uint8, 2); + let values = values.u8_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Uint32, 2); + let values = values.u32_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Uint64, 2); + let values = values.u64_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Float32, 2); + let values = values.f32_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Float64, 2); + let values = values.f64_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Binary, 2); + let values = values.binary_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Boolean, 2); + let values = values.bool_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::String, 2); + let values = values.string_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Date, 2); + let values = values.date_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Datetime, 2); + let values = values.datetime_values; + assert_eq!(2, values.capacity()); + + let values = Values::with_capacity(ColumnDataType::Timestamp, 2); + let values = values.ts_millis_values; + assert_eq!(2, values.capacity()); + } + #[test] fn test_concrete_datatype_from_column_datatype() { assert_eq!( diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 4da2dbf925..33050b45c1 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -1,5 +1,6 @@ use clap::Parser; use frontend::frontend::{Frontend, FrontendOptions}; +use frontend::influxdb::InfluxdbOptions; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; @@ -47,6 +48,8 @@ struct StartCommand { opentsdb_addr: Option, #[clap(short, long)] config_file: Option, + #[clap(short, long)] + influxdb_enable: Option, } impl StartCommand { @@ -91,6 +94,9 @@ impl TryFrom for FrontendOptions { ..Default::default() }); } + if let Some(enable) = cmd.influxdb_enable { + opts.influxdb_options = Some(InfluxdbOptions { enable }); + } Ok(opts) } } @@ -107,6 +113,7 @@ mod tests { mysql_addr: Some("127.0.0.1:5678".to_string()), postgres_addr: Some("127.0.0.1:5432".to_string()), opentsdb_addr: Some("127.0.0.1:4321".to_string()), + influxdb_enable: Some(false), config_file: None, }; @@ -136,5 +143,7 @@ mod tests { opts.opentsdb_options.as_ref().unwrap().runtime_size, default_opts.opentsdb_options.as_ref().unwrap().runtime_size ); + + assert!(!opts.influxdb_options.unwrap().enable); } } diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index c9d49817f7..72f23fe0f2 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] api = { path = "../../api" } async-trait = "0.1" +common-base = { path = "../base" } +common-error = { path = "../error" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs index 992167e27e..9c31c3dad2 100644 --- a/src/common/grpc/src/error.rs +++ b/src/common/grpc/src/error.rs @@ -1,6 +1,11 @@ +use std::any::Any; + use api::DecodeError; +use common_error::prelude::{ErrorExt, StatusCode}; use datafusion::error::DataFusionError; -use snafu::{Backtrace, Snafu}; +use snafu::{Backtrace, ErrorCompat, Snafu}; + +pub type Result = std::result::Result; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -31,4 +36,42 @@ pub enum Error { source: DecodeError, backtrace: Backtrace, }, + + #[snafu(display( + "Write type mismatch, column name: {}, expected: {}, actual: {}", + column_name, + expected, + actual + ))] + TypeMismatch { + column_name: String, + expected: String, + actual: String, + backtrace: Backtrace, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::EmptyPhysicalPlan { .. } + | Error::EmptyPhysicalExpr { .. } + | Error::MissingField { .. } + | Error::TypeMismatch { .. } => StatusCode::InvalidArguments, + Error::UnsupportedDfPlan { .. } | Error::UnsupportedDfExpr { .. } => { + StatusCode::Unsupported + } + Error::NewProjection { .. } | Error::DecodePhysicalPlanNode { .. } => { + StatusCode::Internal + } + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index 202f244b9d..4ba2ebd375 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -1,5 +1,6 @@ pub mod error; pub mod physical; +pub mod writer; pub use error::Error; pub use physical::{ diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs new file mode 100644 index 0000000000..2404b74cc1 --- /dev/null +++ b/src/common/grpc/src/writer.rs @@ -0,0 +1,367 @@ +use std::collections::HashMap; + +use api::v1::{ + codec::InsertBatch, + column::{SemanticType, Values}, + Column, ColumnDataType, +}; +use common_base::BitVec; +use snafu::ensure; + +use crate::error::{Result, TypeMismatchSnafu}; + +type ColumnName = String; + +#[derive(Default)] +pub struct LinesWriter { + column_name_index: HashMap, + null_masks: Vec, + batch: InsertBatch, + lines: usize, +} + +impl LinesWriter { + pub fn with_lines(lines: usize) -> Self { + Self { + lines, + ..Default::default() + } + } + + pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) -> Result<()> { + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Timestamp, + SemanticType::Timestamp, + ); + ensure!( + column.datatype == Some(ColumnDataType::Timestamp.into()), + TypeMismatchSnafu { + column_name, + expected: "timestamp", + actual: format!("{:?}", column.datatype) + } + ); + // It is safe to use unwrap here, because values has been initialized in mut_column() + let values = column.values.as_mut().unwrap(); + values.ts_millis_values.push(to_ms_ts(value.1, value.0)); + self.null_masks[idx].push(false); + Ok(()) + } + + pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> { + let (idx, column) = self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag); + ensure!( + column.datatype == Some(ColumnDataType::String.into()), + TypeMismatchSnafu { + column_name, + expected: "string", + actual: format!("{:?}", column.datatype) + } + ); + // It is safe to use unwrap here, because values has been initialized in mut_column() + let values = column.values.as_mut().unwrap(); + values.string_values.push(value.to_string()); + self.null_masks[idx].push(false); + Ok(()) + } + + pub fn write_u64(&mut self, column_name: &str, value: u64) -> Result<()> { + let (idx, column) = + self.mut_column(column_name, ColumnDataType::Uint64, SemanticType::Field); + ensure!( + column.datatype == Some(ColumnDataType::Uint64.into()), + TypeMismatchSnafu { + column_name, + expected: "u64", + actual: format!("{:?}", column.datatype) + } + ); + // It is safe to use unwrap here, because values has been initialized in mut_column() + let values = column.values.as_mut().unwrap(); + values.u64_values.push(value); + self.null_masks[idx].push(false); + Ok(()) + } + + pub fn write_i64(&mut self, column_name: &str, value: i64) -> Result<()> { + let (idx, column) = + self.mut_column(column_name, ColumnDataType::Int64, SemanticType::Field); + ensure!( + column.datatype == Some(ColumnDataType::Int64.into()), + TypeMismatchSnafu { + column_name, + expected: "i64", + actual: format!("{:?}", column.datatype) + } + ); + // It is safe to use unwrap here, because values has been initialized in mut_column() + let values = column.values.as_mut().unwrap(); + values.i64_values.push(value); + self.null_masks[idx].push(false); + Ok(()) + } + + pub fn write_f64(&mut self, column_name: &str, value: f64) -> Result<()> { + let (idx, column) = + self.mut_column(column_name, ColumnDataType::Float64, SemanticType::Field); + ensure!( + column.datatype == Some(ColumnDataType::Float64.into()), + TypeMismatchSnafu { + column_name, + expected: "f64", + actual: format!("{:?}", column.datatype) + } + ); + // It is safe to use unwrap here, because values has been initialized in mut_column() + let values = column.values.as_mut().unwrap(); + values.f64_values.push(value); + self.null_masks[idx].push(false); + Ok(()) + } + + pub fn write_string(&mut self, column_name: &str, value: &str) -> Result<()> { + let (idx, column) = + self.mut_column(column_name, ColumnDataType::String, SemanticType::Field); + ensure!( + column.datatype == Some(ColumnDataType::String.into()), + TypeMismatchSnafu { + column_name, + expected: "string", + actual: format!("{:?}", column.datatype) + } + ); + // It is safe to use unwrap here, because values has been initialized in mut_column() + let values = column.values.as_mut().unwrap(); + values.string_values.push(value.to_string()); + self.null_masks[idx].push(false); + Ok(()) + } + + pub fn write_bool(&mut self, column_name: &str, value: bool) -> Result<()> { + let (idx, column) = + self.mut_column(column_name, ColumnDataType::Boolean, SemanticType::Field); + ensure!( + column.datatype == Some(ColumnDataType::Boolean.into()), + TypeMismatchSnafu { + column_name, + expected: "boolean", + actual: format!("{:?}", column.datatype) + } + ); + // It is safe to use unwrap here, because values has been initialized in mut_column() + let values = column.values.as_mut().unwrap(); + values.bool_values.push(value); + self.null_masks[idx].push(false); + Ok(()) + } + + pub fn commit(&mut self) { + let batch = &mut self.batch; + batch.row_count += 1; + + for i in 0..batch.columns.len() { + let null_mask = &mut self.null_masks[i]; + if batch.row_count as usize > null_mask.len() { + null_mask.push(true); + } + } + } + + pub fn finish(mut self) -> InsertBatch { + let null_masks = self.null_masks; + for (i, null_mask) in null_masks.into_iter().enumerate() { + let columns = &mut self.batch.columns; + columns[i].null_mask = null_mask.into_vec(); + } + self.batch + } + + fn mut_column( + &mut self, + column_name: &str, + datatype: ColumnDataType, + semantic_type: SemanticType, + ) -> (usize, &mut Column) { + let column_names = &mut self.column_name_index; + let column_idx = match column_names.get(column_name) { + Some(i) => *i, + None => { + let new_idx = column_names.len(); + let batch = &mut self.batch; + let to_insert = self.lines; + let mut null_mask = BitVec::with_capacity(to_insert); + null_mask.extend(BitVec::repeat(true, batch.row_count as usize)); + self.null_masks.push(null_mask); + batch.columns.push(Column { + column_name: column_name.to_string(), + semantic_type: semantic_type.into(), + values: Some(Values::with_capacity(datatype, to_insert)), + datatype: Some(datatype.into()), + null_mask: Vec::default(), + }); + column_names.insert(column_name.to_string(), new_idx); + new_idx + } + }; + (column_idx, &mut self.batch.columns[column_idx]) + } +} + +fn to_ms_ts(p: Precision, ts: i64) -> i64 { + match p { + Precision::NANOSECOND => ts / 1_000_000, + Precision::MICROSECOND => ts / 1000, + Precision::MILLISECOND => ts, + Precision::SECOND => ts * 1000, + Precision::MINUTE => ts * 1000 * 60, + Precision::HOUR => ts * 1000 * 60 * 60, + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Precision { + NANOSECOND, + MICROSECOND, + MILLISECOND, + SECOND, + MINUTE, + HOUR, +} + +#[cfg(test)] +mod tests { + use api::v1::{column::SemanticType, ColumnDataType}; + use common_base::BitVec; + + use super::LinesWriter; + use crate::writer::{to_ms_ts, Precision}; + + #[test] + fn test_lines_writer() { + let mut writer = LinesWriter::with_lines(3); + + writer.write_tag("host", "host1").unwrap(); + writer.write_f64("cpu", 0.5).unwrap(); + writer.write_f64("memory", 0.4).unwrap(); + writer.write_string("name", "name1").unwrap(); + writer + .write_ts("ts", (101011000, Precision::MILLISECOND)) + .unwrap(); + writer.commit(); + + writer.write_tag("host", "host2").unwrap(); + writer + .write_ts("ts", (102011001, Precision::MILLISECOND)) + .unwrap(); + writer.write_bool("enable_reboot", true).unwrap(); + writer.write_u64("year_of_service", 2).unwrap(); + writer.write_i64("temperature", 4).unwrap(); + writer.commit(); + + writer.write_tag("host", "host3").unwrap(); + writer.write_f64("cpu", 0.4).unwrap(); + writer.write_u64("cpu_core_num", 16).unwrap(); + writer + .write_ts("ts", (103011002, Precision::MILLISECOND)) + .unwrap(); + writer.commit(); + + let insert_batch = writer.finish(); + assert_eq!(3, insert_batch.row_count); + + let columns = insert_batch.columns; + assert_eq!(9, columns.len()); + + let column = &columns[0]; + assert_eq!("host", columns[0].column_name); + assert_eq!(Some(ColumnDataType::String as i32), column.datatype); + assert_eq!(SemanticType::Tag as i32, column.semantic_type); + assert_eq!( + vec!["host1", "host2", "host3"], + column.values.as_ref().unwrap().string_values + ); + verify_null_mask(&column.null_mask, vec![false, false, false]); + + let column = &columns[1]; + assert_eq!("cpu", column.column_name); + assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype); + assert_eq!(SemanticType::Field as i32, column.semantic_type); + assert_eq!(vec![0.5, 0.4], column.values.as_ref().unwrap().f64_values); + verify_null_mask(&column.null_mask, vec![false, true, false]); + + let column = &columns[2]; + assert_eq!("memory", column.column_name); + assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype); + assert_eq!(SemanticType::Field as i32, column.semantic_type); + assert_eq!(vec![0.4], column.values.as_ref().unwrap().f64_values); + verify_null_mask(&column.null_mask, vec![false, true, true]); + + let column = &columns[3]; + assert_eq!("name", column.column_name); + assert_eq!(Some(ColumnDataType::String as i32), column.datatype); + assert_eq!(SemanticType::Field as i32, column.semantic_type); + assert_eq!(vec!["name1"], column.values.as_ref().unwrap().string_values); + verify_null_mask(&column.null_mask, vec![false, true, true]); + + let column = &columns[4]; + assert_eq!("ts", column.column_name); + assert_eq!(Some(ColumnDataType::Timestamp as i32), column.datatype); + assert_eq!(SemanticType::Timestamp as i32, column.semantic_type); + assert_eq!( + vec![101011000, 102011001, 103011002], + column.values.as_ref().unwrap().ts_millis_values + ); + verify_null_mask(&column.null_mask, vec![false, false, false]); + + let column = &columns[5]; + assert_eq!("enable_reboot", column.column_name); + assert_eq!(Some(ColumnDataType::Boolean as i32), column.datatype); + assert_eq!(SemanticType::Field as i32, column.semantic_type); + assert_eq!(vec![true], column.values.as_ref().unwrap().bool_values); + verify_null_mask(&column.null_mask, vec![true, false, true]); + + let column = &columns[6]; + assert_eq!("year_of_service", column.column_name); + assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype); + assert_eq!(SemanticType::Field as i32, column.semantic_type); + assert_eq!(vec![2], column.values.as_ref().unwrap().u64_values); + verify_null_mask(&column.null_mask, vec![true, false, true]); + + let column = &columns[7]; + assert_eq!("temperature", column.column_name); + assert_eq!(Some(ColumnDataType::Int64 as i32), column.datatype); + assert_eq!(SemanticType::Field as i32, column.semantic_type); + assert_eq!(vec![4], column.values.as_ref().unwrap().i64_values); + verify_null_mask(&column.null_mask, vec![true, false, true]); + + let column = &columns[8]; + assert_eq!("cpu_core_num", column.column_name); + assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype); + assert_eq!(SemanticType::Field as i32, column.semantic_type); + assert_eq!(vec![16], column.values.as_ref().unwrap().u64_values); + verify_null_mask(&column.null_mask, vec![true, true, false]); + } + + fn verify_null_mask(data: &[u8], expected: Vec) { + let bitvec = BitVec::from_slice(data); + for (idx, b) in expected.iter().enumerate() { + assert_eq!(b, bitvec.get(idx).unwrap()) + } + } + + #[test] + fn test_to_ms() { + assert_eq!(100, to_ms_ts(Precision::NANOSECOND, 100110000)); + assert_eq!(100110, to_ms_ts(Precision::MICROSECOND, 100110000)); + assert_eq!(100110000, to_ms_ts(Precision::MILLISECOND, 100110000)); + assert_eq!( + 100110000 * 1000 * 60, + to_ms_ts(Precision::MINUTE, 100110000) + ); + assert_eq!( + 100110000 * 1000 * 60 * 60, + to_ms_ts(Precision::HOUR, 100110000) + ); + } +} diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 60bb309c38..e528ba3f9f 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -67,12 +67,10 @@ pub fn insertion_expr_to_request( } fn insert_batches(bytes_vec: Vec>) -> Result> { - let mut insert_batches = Vec::with_capacity(bytes_vec.len()); - - for bytes in bytes_vec { - insert_batches.push(bytes.deref().try_into().context(DecodeInsertSnafu)?); - } - Ok(insert_batches) + bytes_vec + .iter() + .map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu)) + .collect() } fn add_values_to_builder( diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index 936b2eede4..dab83caff0 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -129,7 +129,6 @@ macro_rules! convert_arrow_array_to_grpc_vals { _ => unimplemented!(), } }; - } pub fn values(arrays: &[Arc]) -> Result { diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 9ca7e60081..29bf944ad4 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use snafu::prelude::*; use crate::error::{self, Result}; +use crate::influxdb::InfluxdbOptions; use crate::instance::Instance; use crate::mysql::MysqlOptions; use crate::opentsdb::OpentsdbOptions; @@ -17,6 +18,7 @@ pub struct FrontendOptions { pub mysql_options: Option, pub postgres_options: Option, pub opentsdb_options: Option, + pub influxdb_options: Option, } impl Default for FrontendOptions { @@ -27,6 +29,7 @@ impl Default for FrontendOptions { mysql_options: Some(MysqlOptions::default()), postgres_options: Some(PostgresOptions::default()), opentsdb_options: Some(OpentsdbOptions::default()), + influxdb_options: Some(InfluxdbOptions::default()), } } } diff --git a/src/frontend/src/influxdb.rs b/src/frontend/src/influxdb.rs new file mode 100644 index 0000000000..da96bc9989 --- /dev/null +++ b/src/frontend/src/influxdb.rs @@ -0,0 +1,23 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct InfluxdbOptions { + pub enable: bool, +} + +impl Default for InfluxdbOptions { + fn default() -> Self { + Self { enable: true } + } +} + +#[cfg(test)] +mod tests { + use super::InfluxdbOptions; + + #[test] + fn test_influxdb_options() { + let default = InfluxdbOptions::default(); + assert!(default.enable); + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 38ada0f2c0..47f1253fd0 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -1,3 +1,4 @@ +mod influxdb; mod opentsdb; use std::collections::HashMap; diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs new file mode 100644 index 0000000000..a0832c3727 --- /dev/null +++ b/src/frontend/src/instance/influxdb.rs @@ -0,0 +1,45 @@ +use api::v1::{insert_expr::Expr, InsertExpr}; +use async_trait::async_trait; +use common_error::prelude::BoxedError; +use servers::influxdb::InfluxdbRequest; +use servers::{ + error::ExecuteQuerySnafu, influxdb::InsertBatches, query_handler::InfluxdbLineProtocolHandler, +}; +use snafu::ResultExt; + +use crate::error::RequestDatanodeSnafu; +use crate::error::Result; +use crate::instance::Instance; + +#[async_trait] +impl InfluxdbLineProtocolHandler for Instance { + async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { + // TODO(fys): use batch insert + self.do_insert(request.try_into()?) + .await + .map_err(BoxedError::new) + .context(ExecuteQuerySnafu { + query: &request.lines, + })?; + Ok(()) + } +} + +impl Instance { + async fn do_insert(&self, insert_batches: InsertBatches) -> Result<()> { + for (table_name, batch) in insert_batches.data { + let expr = Expr::Values(api::v1::insert_expr::Values { + values: vec![batch.into()], + }); + let _object_result = self + .db + .insert(InsertExpr { + table_name, + expr: Some(expr), + }) + .await + .context(RequestDatanodeSnafu)?; + } + Ok(()) + } +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 932e213a87..1c26a86ee1 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub mod frontend; +pub mod influxdb; pub mod instance; pub mod mysql; pub mod opentsdb; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 44674f791f..99686041d2 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -13,6 +13,7 @@ use tokio::try_join; use crate::error::{self, Result}; use crate::frontend::FrontendOptions; +use crate::influxdb::InfluxdbOptions; use crate::instance::InstanceRef; pub(crate) struct Services; @@ -91,6 +92,12 @@ impl Services { if opentsdb_server_and_addr.is_some() { http_server.set_opentsdb_handler(instance.clone()); } + if matches!( + opts.influxdb_options, + Some(InfluxdbOptions { enable: true }) + ) { + http_server.set_influxdb_handler(instance.clone()); + } Some((Box::new(http_server) as _, http_addr)) } else { diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 95852cd68c..6767d0fd17 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -9,7 +9,10 @@ async-trait = "0.1" axum = "0.6.0-rc.2" axum-macros = "0.3.0-rc.1" bytes = "1.2" +client = { path = "../client" } +common-base = { path = "../common/base" } common-error = { path = "../common/error" } +common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } @@ -19,6 +22,7 @@ datatypes = { path = "../datatypes" } futures = "0.3" hex = { version = "0.4" } hyper = { version = "0.14", features = ["full"] } +influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" } metrics = "0.20" num_cpus = "1.13" opensrv-mysql = "0.1" @@ -27,6 +31,7 @@ query = { path = "../query" } serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } +table = { path = "../table" } tokio = { version = "1.20", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 946b5e08f7..91312bf9e2 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -2,8 +2,10 @@ use std::any::Any; use std::net::SocketAddr; use axum::http::StatusCode as HttpStatusCode; -use axum::response::{IntoResponse, Response}; -use axum::Json; +use axum::{ + response::{IntoResponse, Response}, + Json, +}; use common_error::prelude::*; use serde_json::json; @@ -76,6 +78,21 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Failed to parse InfluxDB line protocol, source: {}", source))] + InfluxdbLineProtocol { + #[snafu(backtrace)] + source: influxdb_line_protocol::Error, + }, + + #[snafu(display("Failed to write InfluxDB line protocol, source: {}", source))] + InfluxdbLinesWrite { + #[snafu(backtrace)] + source: common_grpc::error::Error, + }, + + #[snafu(display("Failed to convert time precision, name: {}", name))] + TimePrecision { name: String, backtrace: Backtrace }, + #[snafu(display("Connection reset by peer"))] ConnResetByPeer { backtrace: Backtrace }, @@ -128,10 +145,13 @@ impl ErrorExt for Error { NotSupported { .. } | InvalidQuery { .. } + | InfluxdbLineProtocol { .. } | ConnResetByPeer { .. } | InvalidOpentsdbLine { .. } - | InvalidOpentsdbJsonRequest { .. } => StatusCode::InvalidArguments, + | InvalidOpentsdbJsonRequest { .. } + | TimePrecision { .. } => StatusCode::InvalidArguments, + InfluxdbLinesWrite { source, .. } => source.status_code(), Hyper { .. } => StatusCode::Unknown, } } @@ -160,9 +180,12 @@ impl From for Error { impl IntoResponse for Error { fn into_response(self) -> Response { let (status, error_message) = match self { - Error::InvalidOpentsdbLine { .. } + Error::InfluxdbLineProtocol { .. } + | Error::InfluxdbLinesWrite { .. } + | Error::InvalidOpentsdbLine { .. } | Error::InvalidOpentsdbJsonRequest { .. } - | Error::InvalidQuery { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()), + | Error::InvalidQuery { .. } + | Error::TimePrecision { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()), _ => (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()), }; let body = Json(json!({ diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index a9922f49d1..9f31d19f6b 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -1,4 +1,5 @@ pub mod handler; +pub mod influxdb; pub mod opentsdb; use std::net::SocketAddr; @@ -19,15 +20,17 @@ use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; +use self::influxdb::influxdb_write; use crate::error::{Result, StartHttpSnafu}; -use crate::query_handler::OpentsdbProtocolHandlerRef; use crate::query_handler::SqlQueryHandlerRef; +use crate::query_handler::{InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef}; use crate::server::Server; const HTTP_API_VERSION: &str = "v1"; pub struct HttpServer { sql_handler: SqlQueryHandlerRef, + influxdb_handler: Option, opentsdb_handler: Option, } @@ -121,6 +124,7 @@ impl HttpServer { Self { sql_handler, opentsdb_handler: None, + influxdb_handler: None, } } @@ -132,6 +136,14 @@ impl HttpServer { self.opentsdb_handler.get_or_insert(handler); } + pub fn set_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) { + debug_assert!( + self.influxdb_handler.is_none(), + "Influxdb line protocol handler can be set only once!" + ); + self.influxdb_handler.get_or_insert(handler); + } + pub fn make_app(&self) -> Router { // TODO(LFC): Use released Axum. // Axum version 0.6 introduces state within router, making router methods far more elegant @@ -148,12 +160,20 @@ impl HttpServer { let mut router = Router::new().nest(&format!("/{}", HTTP_API_VERSION), sql_router); if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { - let opentsdb_router = Router::with_state(opentsdb_handler.clone()) + let opentsdb_router = Router::with_state(opentsdb_handler) .route("/api/put", routing::post(opentsdb::put)); router = router.nest(&format!("/{}/opentsdb", HTTP_API_VERSION), opentsdb_router); } + // TODO(fys): Creating influxdb's database when we can create greptime schema. + if let Some(influxdb_handler) = self.influxdb_handler.clone() { + let influxdb_router = + Router::with_state(influxdb_handler).route("/write", routing::post(influxdb_write)); + + router = router.nest(&format!("/{}/influxdb", HTTP_API_VERSION), influxdb_router); + } + router .route("/metrics", routing::get(handler::metrics)) // middlewares diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs new file mode 100644 index 0000000000..b9c68fbd8f --- /dev/null +++ b/src/servers/src/http/influxdb.rs @@ -0,0 +1,59 @@ +use std::collections::HashMap; + +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use common_grpc::writer::Precision; + +use crate::error::Result; +use crate::error::TimePrecisionSnafu; +use crate::http::HttpResponse; +use crate::influxdb::InfluxdbRequest; +use crate::query_handler::InfluxdbLineProtocolHandlerRef; + +#[axum_macros::debug_handler] +pub async fn influxdb_write( + State(handler): State, + Query(params): Query>, + lines: String, +) -> Result<(StatusCode, HttpResponse)> { + let precision = params + .get("precision") + .map(|val| parse_time_precision(val)) + .transpose()?; + let request = InfluxdbRequest { precision, lines }; + handler.exec(&request).await?; + Ok((StatusCode::NO_CONTENT, HttpResponse::Text("".to_string()))) +} + +fn parse_time_precision(value: &str) -> Result { + match value { + "n" => Ok(Precision::NANOSECOND), + "u" => Ok(Precision::MICROSECOND), + "ms" => Ok(Precision::MILLISECOND), + "s" => Ok(Precision::SECOND), + "m" => Ok(Precision::MINUTE), + "h" => Ok(Precision::HOUR), + unknown => TimePrecisionSnafu { + name: unknown.to_string(), + } + .fail(), + } +} + +#[cfg(test)] +mod tests { + use common_grpc::writer::Precision; + + use crate::http::influxdb::parse_time_precision; + + #[test] + fn test_parse_time_precision() { + assert_eq!(Precision::NANOSECOND, parse_time_precision("n").unwrap()); + assert_eq!(Precision::MICROSECOND, parse_time_precision("u").unwrap()); + assert_eq!(Precision::MILLISECOND, parse_time_precision("ms").unwrap()); + assert_eq!(Precision::SECOND, parse_time_precision("s").unwrap()); + assert_eq!(Precision::MINUTE, parse_time_precision("m").unwrap()); + assert_eq!(Precision::HOUR, parse_time_precision("h").unwrap()); + assert!(parse_time_precision("unknown").is_err()); + } +} diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs new file mode 100644 index 0000000000..e25e0b214e --- /dev/null +++ b/src/servers/src/influxdb.rs @@ -0,0 +1,269 @@ +use std::collections::HashMap; + +use common_grpc::writer::{LinesWriter, Precision}; +use influxdb_line_protocol::{parse_lines, FieldValue}; +use snafu::ResultExt; + +use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu}; + +pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; +pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND; + +pub struct InfluxdbRequest { + pub precision: Option, + pub lines: String, +} + +type TableName = String; + +pub struct InsertBatches { + pub data: Vec<(TableName, api::v1::codec::InsertBatch)>, +} + +impl TryFrom<&InfluxdbRequest> for InsertBatches { + type Error = Error; + + fn try_from(value: &InfluxdbRequest) -> std::result::Result { + let mut writers: HashMap = HashMap::new(); + let lines = parse_lines(&value.lines) + .collect::>>() + .context(InfluxdbLineProtocolSnafu)?; + let line_len = lines.len(); + + for line in lines { + let table_name = line.series.measurement; + let writer = writers + .entry(table_name.to_string()) + .or_insert_with(|| LinesWriter::with_lines(line_len)); + + let tags = line.series.tag_set; + if let Some(tags) = tags { + for (k, v) in tags { + writer + .write_tag(k.as_str(), v.as_str()) + .context(InfluxdbLinesWriteSnafu)?; + } + } + + let fields = line.field_set; + for (k, v) in fields { + let column_name = k.as_str(); + match v { + FieldValue::I64(value) => { + writer + .write_i64(column_name, value) + .context(InfluxdbLinesWriteSnafu)?; + } + FieldValue::U64(value) => { + writer + .write_u64(column_name, value) + .context(InfluxdbLinesWriteSnafu)?; + } + FieldValue::F64(value) => { + writer + .write_f64(column_name, value) + .context(InfluxdbLinesWriteSnafu)?; + } + FieldValue::String(value) => { + writer + .write_string(column_name, value.as_str()) + .context(InfluxdbLinesWriteSnafu)?; + } + FieldValue::Boolean(value) => { + writer + .write_bool(column_name, value) + .context(InfluxdbLinesWriteSnafu)?; + } + } + } + + if let Some(timestamp) = line.timestamp { + let precision = if let Some(val) = &value.precision { + *val + } else { + DEFAULT_TIME_PRECISION + }; + writer + .write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision)) + .context(InfluxdbLinesWriteSnafu)?; + } + + writer.commit(); + } + + Ok(InsertBatches { + data: writers + .into_iter() + .map(|(table_name, writer)| (table_name, writer.finish())) + .collect(), + }) + } +} + +#[cfg(test)] +mod tests { + use api::v1::{ + codec::InsertBatch, + column::{SemanticType, Values}, + Column, ColumnDataType, + }; + use common_base::BitVec; + + use super::InsertBatches; + use crate::influxdb::InfluxdbRequest; + + #[test] + fn test_convert_influxdb_lines() { + let lines = r" +monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 +monitor1,host=host2 memory=1027 1663840496400340001 +monitor2,host=host3 cpu=66.5 1663840496100023102 +monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; + + let influxdb_req = &InfluxdbRequest { + precision: None, + lines: lines.to_string(), + }; + + let insert_batches: InsertBatches = influxdb_req.try_into().unwrap(); + let insert_batches = insert_batches.data; + + assert_eq!(2, insert_batches.len()); + + for (table_name, insert_batch) in &insert_batches { + if table_name == "monitor1" { + assert_monitor_1(insert_batch); + } else if table_name == "monitor2" { + assert_monitor_2(insert_batch); + } else { + panic!() + } + } + } + + fn assert_monitor_1(insert_batch: &InsertBatch) { + let columns = &insert_batch.columns; + assert_eq!(4, columns.len()); + verify_column( + &columns[0], + "host", + ColumnDataType::String, + SemanticType::Tag, + Vec::new(), + Values { + string_values: vec!["host1".to_string(), "host2".to_string()], + ..Default::default() + }, + ); + + verify_column( + &columns[1], + "cpu", + ColumnDataType::Float64, + SemanticType::Field, + vec![false, true], + Values { + f64_values: vec![66.6], + ..Default::default() + }, + ); + + verify_column( + &columns[2], + "memory", + ColumnDataType::Float64, + SemanticType::Field, + Vec::new(), + Values { + f64_values: vec![1024.0, 1027.0], + ..Default::default() + }, + ); + + verify_column( + &columns[3], + "ts", + ColumnDataType::Timestamp, + SemanticType::Timestamp, + Vec::new(), + Values { + ts_millis_values: vec![1663840496100, 1663840496400], + ..Default::default() + }, + ); + } + + fn assert_monitor_2(insert_batch: &InsertBatch) { + let columns = &insert_batch.columns; + assert_eq!(4, columns.len()); + verify_column( + &columns[0], + "host", + ColumnDataType::String, + SemanticType::Tag, + Vec::new(), + Values { + string_values: vec!["host3".to_string(), "host4".to_string()], + ..Default::default() + }, + ); + + verify_column( + &columns[1], + "cpu", + ColumnDataType::Float64, + SemanticType::Field, + Vec::new(), + Values { + f64_values: vec![66.5, 66.3], + ..Default::default() + }, + ); + + verify_column( + &columns[2], + "ts", + ColumnDataType::Timestamp, + SemanticType::Timestamp, + Vec::new(), + Values { + ts_millis_values: vec![1663840496100, 1663840496400], + ..Default::default() + }, + ); + + verify_column( + &columns[3], + "memory", + ColumnDataType::Float64, + SemanticType::Field, + vec![true, false], + Values { + f64_values: vec![1029.0], + ..Default::default() + }, + ); + } + + fn verify_column( + column: &Column, + name: &str, + datatype: ColumnDataType, + semantic_type: SemanticType, + null_mask: Vec, + vals: Values, + ) { + assert_eq!(name, column.column_name); + assert_eq!(Some(datatype as i32), column.datatype); + assert_eq!(semantic_type as i32, column.semantic_type); + verify_null_mask(&column.null_mask, null_mask); + assert_eq!(Some(vals), column.values); + } + + fn verify_null_mask(data: &[u8], expected: Vec) { + let bitvec = BitVec::from_slice(data); + for (idx, b) in expected.iter().enumerate() { + assert_eq!(b, bitvec.get(idx).unwrap()) + } + } +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 5b75f6e4b7..af3b257882 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -3,6 +3,7 @@ pub mod error; pub mod grpc; pub mod http; +pub mod influxdb; pub mod mysql; pub mod opentsdb; pub mod postgres; diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 74400a46b3..690bbf1050 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use common_query::Output; use crate::error::Result; +use crate::influxdb::InfluxdbRequest; use crate::opentsdb::codec::DataPoint; /// All query handler traits for various request protocols, like SQL or GRPC. @@ -21,6 +22,7 @@ pub type SqlQueryHandlerRef = Arc; pub type GrpcQueryHandlerRef = Arc; pub type GrpcAdminHandlerRef = Arc; pub type OpentsdbProtocolHandlerRef = Arc; +pub type InfluxdbLineProtocolHandlerRef = Arc; #[async_trait] pub trait SqlQueryHandler { @@ -39,6 +41,13 @@ pub trait GrpcAdminHandler { async fn exec_admin_request(&self, expr: AdminExpr) -> Result; } +#[async_trait] +pub trait InfluxdbLineProtocolHandler { + /// A successful request will not return a response. + /// Only on error will the socket return a line of data. + async fn exec(&self, request: &InfluxdbRequest) -> Result<()>; +} + #[async_trait] pub trait OpentsdbProtocolHandler { /// A successful request will not return a response. diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs new file mode 100644 index 0000000000..662dbafa90 --- /dev/null +++ b/src/servers/tests/http/influxdb_test.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use axum::Router; +use axum_test_helper::TestClient; +use common_query::Output; +use servers::error::Result; +use servers::http::HttpServer; +use servers::influxdb::{InfluxdbRequest, InsertBatches}; +use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler}; +use tokio::sync::mpsc; + +struct DummyInstance { + tx: mpsc::Sender, +} + +#[async_trait] +impl InfluxdbLineProtocolHandler for DummyInstance { + async fn exec(&self, request: &InfluxdbRequest) -> Result<()> { + let batches: InsertBatches = request.try_into()?; + + for (table_name, _) in batches.data { + let _ = self.tx.send(table_name).await; + } + + Ok(()) + } +} + +#[async_trait] +impl SqlQueryHandler for DummyInstance { + async fn do_query(&self, _query: &str) -> Result { + unimplemented!() + } + + async fn insert_script(&self, _name: &str, _script: &str) -> Result<()> { + unimplemented!() + } + + async fn execute_script(&self, _name: &str) -> Result { + unimplemented!() + } +} + +fn make_test_app(tx: mpsc::Sender) -> Router { + let instance = Arc::new(DummyInstance { tx }); + let mut server = HttpServer::new(instance.clone()); + server.set_influxdb_handler(instance); + server.make_app() +} + +#[tokio::test] +async fn test_influxdb_write() { + let (tx, mut rx) = mpsc::channel(100); + + let app = make_test_app(tx); + let client = TestClient::new(app); + + // right request + let result = client + .post("/v1/influxdb/write") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // bad request + let result = client + .post("/v1/influxdb/write") + .body("monitor, host=host1 cpu=1.2 1664370459457010101") + .send() + .await; + assert_eq!(result.status(), 400); + assert!(!result.text().await.is_empty()); + + let mut metrics = vec![]; + while let Ok(s) = rx.try_recv() { + metrics.push(s); + } + assert_eq!(metrics, vec!["monitor".to_string()]); +} diff --git a/src/servers/tests/http/mod.rs b/src/servers/tests/http/mod.rs index 5e8292dceb..eecf87523c 100644 --- a/src/servers/tests/http/mod.rs +++ b/src/servers/tests/http/mod.rs @@ -1,2 +1,3 @@ mod http_handler_test; +mod influxdb_test; mod opentsdb_test;