feat: opentsdb support (#274)

* feat: opentsdb support

* fix: tests

* fix: resolve CR comments

* fix: resolve CR comments

* fix: resolve CR comments

* fix: resolve CR comments

* refactor: remove feature flags for opentsdb and pg

* fix: resolve CR comments

* fix: resolve CR comments

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-09-26 15:47:43 +08:00
committed by GitHub
parent 0fa68ab7a5
commit ca732d45f9
43 changed files with 2300 additions and 376 deletions

74
Cargo.lock generated
View File

@@ -286,7 +286,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043"
dependencies = [
"async-trait",
"axum-core",
"axum-core 0.2.8",
"bitflags",
"bytes",
"futures-util",
@@ -294,7 +294,38 @@ dependencies = [
"http-body",
"hyper",
"itoa 1.0.3",
"matchit",
"matchit 0.5.0",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.6.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2628a243073c55aef15a1c1fe45c87f21b84f9e89ca9e7b262a180d3d03543d"
dependencies = [
"async-trait",
"axum-core 0.3.0-rc.2",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa 1.0.3",
"matchit 0.6.0",
"memchr",
"mime",
"percent-encoding",
@@ -327,10 +358,26 @@ dependencies = [
]
[[package]]
name = "axum-macros"
version = "0.2.3"
name = "axum-core"
version = "0.3.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6293dae2ec708e679da6736e857cf8532886ef258e92930f38279c12641628b8"
checksum = "473bd0762170028bb6b5068be9e97de2a9f0af3bf2084498d840498f47194d3d"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-macros"
version = "0.3.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "247a599903eb2e02abbaf2facc6396140df7af6dcc84e64ce3b71d117702fa22"
dependencies = [
"heck 0.4.0",
"proc-macro2",
@@ -344,7 +391,7 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5f0c689f3a3cb707ea097813153343b74dcf73b3e46dedb25be91a24050913"
dependencies = [
"axum",
"axum 0.5.16",
"bytes",
"http",
"http-body",
@@ -1372,7 +1419,7 @@ dependencies = [
"api",
"arrow2",
"async-trait",
"axum",
"axum 0.6.0-rc.2",
"axum-macros",
"axum-test-helper",
"catalog",
@@ -2606,6 +2653,12 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]]
name = "matchit"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dfc802da7b1cf80aefffa0c7b2f77247c8b32206cc83c270b61264f5b360a80"
[[package]]
name = "matrixmultiply"
version = "0.3.2"
@@ -4633,8 +4686,10 @@ version = "0.1.0"
dependencies = [
"api",
"async-trait",
"axum",
"axum 0.6.0-rc.2",
"axum-macros",
"axum-test-helper",
"bytes",
"catalog",
"common-base",
"common-error",
@@ -4662,6 +4717,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-stream",
"tokio-test",
"tonic",
"tower",
"tower-http",
@@ -5482,7 +5538,7 @@ checksum = "11cd56bdb54ef93935a6a79dbd1d91f1ebd4c64150fd61654031fd6b8b775c91"
dependencies = [
"async-stream",
"async-trait",
"axum",
"axum 0.5.16",
"base64",
"bytes",
"futures-core",

View File

@@ -21,7 +21,3 @@ toml = "0.5"
[dev-dependencies]
serde = "1.0"
tempdir = "0.3"
[features]
default = ["postgres"]
postgres = ["datanode/postgres"]

View File

@@ -39,7 +39,6 @@ struct StartCommand {
rpc_addr: Option<String>,
#[clap(long)]
mysql_addr: Option<String>,
#[cfg(feature = "postgres")]
#[clap(long)]
postgres_addr: Option<String>,
#[clap(short, long)]
@@ -81,7 +80,6 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(addr) = cmd.mysql_addr {
opts.mysql_addr = addr;
}
#[cfg(feature = "postgres")]
if let Some(addr) = cmd.postgres_addr {
opts.postgres_addr = addr;
}
@@ -102,7 +100,6 @@ mod tests {
http_addr: None,
rpc_addr: None,
mysql_addr: None,
#[cfg(feature = "postgres")]
postgres_addr: None,
config_file: Some(format!(
"{}/../../config/datanode.example.toml",
@@ -116,11 +113,8 @@ mod tests {
assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr);
assert_eq!(4, options.mysql_runtime_size);
#[cfg(feature = "postgres")]
{
assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr);
assert_eq!(4, options.postgres_runtime_size);
}
assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr);
assert_eq!(4, options.postgres_runtime_size);
match options.storage {
ObjectStoreConfig::File { data_dir } => {

View File

@@ -1,5 +1,8 @@
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use snafu::ResultExt;
use crate::error::{self, Result};
@@ -38,9 +41,10 @@ struct StartCommand {
grpc_addr: Option<String>,
#[clap(long)]
mysql_addr: Option<String>,
#[cfg(feature = "postgres")]
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
opentsdb_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
}
@@ -70,11 +74,22 @@ impl TryFrom<StartCommand> for FrontendOptions {
opts.grpc_addr = Some(addr);
}
if let Some(addr) = cmd.mysql_addr {
opts.mysql_addr = Some(addr);
opts.mysql_options = Some(MysqlOptions {
addr,
..Default::default()
});
}
#[cfg(feature = "postgres")]
if let Some(addr) = cmd.postgres_addr {
opts.postgres_addr = Some(addr);
opts.postgres_options = Some(PostgresOptions {
addr,
..Default::default()
});
}
if let Some(addr) = cmd.opentsdb_addr {
opts.opentsdb_options = Some(OpentsdbOptions {
addr,
..Default::default()
});
}
Ok(opts)
}
@@ -90,24 +105,36 @@ mod tests {
http_addr: Some("127.0.0.1:1234".to_string()),
grpc_addr: None,
mysql_addr: Some("127.0.0.1:5678".to_string()),
#[cfg(feature = "postgres")]
postgres_addr: Some("127.0.0.1:5432".to_string()),
opentsdb_addr: Some("127.0.0.1:4321".to_string()),
config_file: None,
};
let opts: FrontendOptions = command.try_into().unwrap();
assert_eq!(opts.http_addr, Some("127.0.0.1:1234".to_string()));
assert_eq!(opts.mysql_addr, Some("127.0.0.1:5678".to_string()));
#[cfg(feature = "postgres")]
assert_eq!(opts.postgres_addr, Some("127.0.0.1:5432".to_string()));
assert_eq!(opts.mysql_options.as_ref().unwrap().addr, "127.0.0.1:5678");
assert_eq!(
opts.postgres_options.as_ref().unwrap().addr,
"127.0.0.1:5432"
);
assert_eq!(
opts.opentsdb_options.as_ref().unwrap().addr,
"127.0.0.1:4321"
);
let default_opts = FrontendOptions::default();
assert_eq!(opts.grpc_addr, default_opts.grpc_addr);
assert_eq!(opts.mysql_runtime_size, default_opts.mysql_runtime_size);
#[cfg(feature = "postgres")]
assert_eq!(
opts.postgres_runtime_size,
default_opts.postgres_runtime_size
opts.mysql_options.as_ref().unwrap().runtime_size,
default_opts.mysql_options.as_ref().unwrap().runtime_size
);
assert_eq!(
opts.postgres_options.as_ref().unwrap().runtime_size,
default_opts.postgres_options.as_ref().unwrap().runtime_size
);
assert_eq!(
opts.opentsdb_options.as_ref().unwrap().runtime_size,
default_opts.opentsdb_options.as_ref().unwrap().runtime_size
);
}
}

View File

@@ -36,6 +36,7 @@ pub enum StatusCode {
TableAlreadyExists = 4000,
TableNotFound = 4001,
TableColumnNotFound = 4002,
TableColumnExists = 4003,
// ====== End of catalog related status code =======
// ====== Begin of storage related status code =====

View File

@@ -4,17 +4,16 @@ version = "0.1.0"
edition = "2021"
[features]
default = ["python", "postgres"]
default = ["python"]
python = [
"dep:script"
]
postgres = ["servers/postgres"]
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
axum = "0.5"
axum-macros = "0.2"
axum = "0.6.0-rc.2"
axum-macros = "0.3.0-rc.1"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }

View File

@@ -26,9 +26,7 @@ pub struct DatanodeOptions {
pub rpc_addr: String,
pub mysql_addr: String,
pub mysql_runtime_size: u32,
#[cfg(feature = "postgres")]
pub postgres_addr: String,
#[cfg(feature = "postgres")]
pub postgres_runtime_size: u32,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
@@ -41,9 +39,7 @@ impl Default for DatanodeOptions {
rpc_addr: "0.0.0.0:3001".to_string(),
mysql_addr: "0.0.0.0:3306".to_string(),
mysql_runtime_size: 2,
#[cfg(feature = "postgres")]
postgres_addr: "0.0.0.0:5432".to_string(),
#[cfg(feature = "postgres")]
postgres_runtime_size: 2,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),

View File

@@ -220,10 +220,9 @@ impl Instance {
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
}
#[cfg(test)]
impl Instance {
// This method is used in other crate's testing codes, so move it out of "cfg(test)".
// TODO(LFC): Delete it when callers no longer need it.
pub async fn new_mock() -> Result<Self> {
use table_engine::table::test_util::new_test_object_store;
use table_engine::table::test_util::MockEngine;

View File

@@ -21,7 +21,6 @@ pub struct Services {
http_server: HttpServer,
grpc_server: GrpcServer,
mysql_server: Box<dyn Server>,
#[cfg(feature = "postgres")]
postgres_server: Box<dyn Server>,
}
@@ -34,7 +33,6 @@ impl Services {
.build()
.context(error::RuntimeResourceSnafu)?,
);
#[cfg(feature = "postgres")]
let postgres_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.postgres_runtime_size as usize)
@@ -46,7 +44,6 @@ impl Services {
http_server: HttpServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance.clone(), instance.clone()),
mysql_server: MysqlServer::create_server(instance.clone(), mysql_io_runtime),
#[cfg(feature = "postgres")]
postgres_server: Box::new(PostgresServer::new(instance, postgres_io_runtime)),
})
}
@@ -65,7 +62,6 @@ impl Services {
addr: &opts.mysql_addr,
})?;
#[cfg(feature = "postgres")]
let postgres_addr: SocketAddr =
opts.postgres_addr.parse().context(error::ParseAddrSnafu {
addr: &opts.postgres_addr,
@@ -75,7 +71,6 @@ impl Services {
self.http_server.start(http_addr),
self.grpc_server.start(grpc_addr),
self.mysql_server.start(mysql_addr),
#[cfg(feature = "postgres")]
self.postgres_server.start(postgres_addr),
)
.context(error::StartServerSnafu)?;

View File

@@ -193,8 +193,6 @@ pub async fn test_create_table_illegal_timestamp_type() {
#[tokio::test]
async fn test_alter_table() {
common_telemetry::init_default_ut_logging();
// TODO(LFC) Use real Mito engine when we can alter its region schema,
// and delete the `new_mock` method.
let instance = Instance::new_mock().await.unwrap();
@@ -237,13 +235,13 @@ async fn test_alter_table() {
let pretty_print = arrow_print::write(&recordbatch);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
"+-------+---------------------+-----+--------+--------+",
"| host | ts | cpu | memory | my_tag |",
"+-------+---------------------+-----+--------+--------+",
"| host1 | 1970-01-01 00:00:01 | 1.1 | 100 | |",
"| host2 | 1970-01-01 00:00:02 | 2.2 | 200 | hello |",
"| host3 | 1970-01-01 00:00:03 | 3.3 | 300 | |",
"+-------+---------------------+-----+--------+--------+",
"+-------+-----+--------+---------------------+--------+",
"| host | cpu | memory | ts | my_tag |",
"+-------+-----+--------+---------------------+--------+",
"| host1 | 1.1 | 100 | 1970-01-01 00:00:01 | |",
"| host2 | 2.2 | 200 | 1970-01-01 00:00:02 | hello |",
"| host3 | 3.3 | 300 | 1970-01-01 00:00:03 | |",
"+-------+-----+--------+---------------------+--------+",
];
assert_eq!(pretty_print, expected);
}

View File

@@ -37,7 +37,3 @@ futures = "0.3"
tempdir = "0.3"
tonic = "0.8"
tower = "0.4"
[features]
default = ["postgres"]
postgres = ["servers/postgres"]

View File

@@ -12,6 +12,12 @@ pub enum Error {
source: client::Error,
},
#[snafu(display("Failed to request Datanode, source: {}", source))]
RequestDatanode {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Runtime resource error, source: {}", source))]
RuntimeResource {
#[snafu(backtrace)]
@@ -64,6 +70,18 @@ pub enum Error {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Incomplete GRPC result: {}", err_msg))]
IncompleteGrpcResult {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to execute OpenTSDB put, reason: {}", reason))]
ExecOpentsdbPut {
reason: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -78,8 +96,12 @@ impl ErrorExt for Error {
Error::StartServer { source, .. } => source.status_code(),
Error::ParseSql { source } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(),
Error::RequestDatanode { source } => source.status_code(),
Error::ColumnDataType { .. } => StatusCode::Internal,
Error::IllegalFrontendState { .. } => StatusCode::Unexpected,
Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
StatusCode::Unexpected
}
Error::ExecOpentsdbPut { .. } => StatusCode::Internal,
}
}

View File

@@ -5,18 +5,18 @@ use snafu::prelude::*;
use crate::error::{self, Result};
use crate::instance::Instance;
use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::server::Services;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FrontendOptions {
pub http_addr: Option<String>,
pub grpc_addr: Option<String>,
pub mysql_addr: Option<String>,
pub mysql_runtime_size: u32,
#[cfg(feature = "postgres")]
pub postgres_addr: Option<String>,
#[cfg(feature = "postgres")]
pub postgres_runtime_size: u32,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
}
impl Default for FrontendOptions {
@@ -24,12 +24,9 @@ impl Default for FrontendOptions {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
grpc_addr: Some("0.0.0.0:4001".to_string()),
mysql_addr: Some("0.0.0.0:4002".to_string()),
mysql_runtime_size: 2,
#[cfg(feature = "postgres")]
postgres_addr: Some("0.0.0.0:4003".to_string()),
#[cfg(feature = "postgres")]
postgres_runtime_size: 2,
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
}
}
}

View File

@@ -1,3 +1,5 @@
mod opentsdb;
use std::collections::HashMap;
use std::sync::Arc;
@@ -258,30 +260,21 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::codec::{InsertBatch, SelectResult};
use api::v1::greptime_client::GreptimeClient;
use api::v1::{
admin_expr, admin_result, column, object_expr, object_result, select_expr, Column,
ExprHeader, MutateResult, SelectExpr,
};
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::instance::Instance as DatanodeInstance;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::value::Value;
use servers::grpc::GrpcServer;
use tempdir::TempDir;
use tonic::transport::{Endpoint, Server};
use tower::service_fn;
use super::*;
use crate::tests;
#[tokio::test]
async fn test_execute_sql() {
common_telemetry::init_default_ut_logging();
let datanode_instance = create_datanode_instance().await;
let frontend_instance = create_frontend_instance(datanode_instance).await;
let instance = tests::create_frontend_instance().await;
let sql = r#"CREATE TABLE demo(
host STRING,
@@ -292,9 +285,7 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(ts, host)
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*frontend_instance, sql)
.await
.unwrap();
let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
_ => unreachable!(),
@@ -305,18 +296,14 @@ mod tests {
('frontend.host2', null, null, 2000),
('frontend.host3', 3.3, 300, 3000)
"#;
let output = SqlQueryHandler::do_query(&*frontend_instance, sql)
.await
.unwrap();
let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 3),
_ => unreachable!(),
}
let sql = "select * from demo";
let output = SqlQueryHandler::do_query(&*frontend_instance, sql)
.await
.unwrap();
let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap();
match output {
Output::RecordBatches(recordbatches) => {
let recordbatches = recordbatches
@@ -341,9 +328,7 @@ mod tests {
};
let sql = "select * from demo where ts>cast(1000000000 as timestamp)"; // use nanoseconds as where condition
let output = SqlQueryHandler::do_query(&*frontend_instance, sql)
.await
.unwrap();
let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap();
match output {
Output::RecordBatches(recordbatches) => {
let recordbatches = recordbatches
@@ -369,10 +354,7 @@ mod tests {
#[tokio::test]
async fn test_execute_grpc() {
common_telemetry::init_default_ut_logging();
let datanode_instance = create_datanode_instance().await;
let frontend_instance = create_frontend_instance(datanode_instance).await;
let instance = tests::create_frontend_instance().await;
// testing data:
let expected_host_col = Column {
@@ -432,7 +414,7 @@ mod tests {
header: Some(ExprHeader::default()),
expr: Some(admin_expr::Expr::Create(create_expr)),
};
let result = GrpcAdminHandler::exec_admin_request(&*frontend_instance, admin_expr)
let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr)
.await
.unwrap();
assert_matches!(
@@ -462,7 +444,7 @@ mod tests {
header: Some(ExprHeader::default()),
expr: Some(object_expr::Expr::Insert(insert_expr)),
};
let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr)
let result = GrpcQueryHandler::do_query(&*instance, object_expr)
.await
.unwrap();
assert_matches!(
@@ -480,7 +462,7 @@ mod tests {
expr: Some(select_expr::Expr::Sql("select * from demo".to_string())),
})),
};
let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr)
let result = GrpcQueryHandler::do_query(&*instance, object_expr)
.await
.unwrap();
match result.result {
@@ -508,63 +490,6 @@ mod tests {
}
}
async fn create_datanode_instance() -> Arc<DatanodeInstance> {
let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap();
let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
..Default::default()
};
let instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap());
instance.start().await.unwrap();
instance
}
async fn create_frontend_instance(datanode_instance: Arc<DatanodeInstance>) -> Arc<Instance> {
let (client, server) = tokio::io::duplex(1024);
// create a mock datanode grpc service, see example here:
// https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
let datanode_service =
GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service();
tokio::spawn(async move {
Server::builder()
.add_service(datanode_service)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
// "http://[::]:50051" is just a placeholder, does not actually connect to it,
// see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_| {
let client = client.take();
async move {
if let Some(client) = client {
Ok(client)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
}
}
}))
.await
.unwrap();
let client = Client::with_client(GreptimeClient::new(channel));
Arc::new(Instance::with_client(client))
}
fn create_expr() -> CreateExpr {
let column_defs = vec![
GrpcColumnDef {

View File

@@ -0,0 +1,280 @@
use std::collections::HashMap;
use api::v1::{alter_expr, AddColumn, AlterExpr, ColumnDataType, ColumnDef, CreateExpr};
use async_trait::async_trait;
use client::{Error as ClientError, ObjectResult};
use common_error::prelude::{BoxedError, StatusCode};
use common_telemetry::info;
use servers::error as server_error;
use servers::opentsdb::codec::{
DataPoint, OPENTSDB_TIMESTAMP_COLUMN_NAME, OPENTSDB_VALUE_COLUMN_NAME,
};
use servers::query_handler::OpentsdbProtocolHandler;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::instance::Instance;
#[async_trait]
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> {
// TODO(LFC): Insert metrics in batch, then make OpentsdbLineProtocolHandler::exec received multiple data points, when
// metric table and tags can be created upon insertion.
self.insert_opentsdb_metric(data_point)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::PutOpentsdbDataPointSnafu {
data_point: format!("{:?}", data_point),
})?;
Ok(())
}
}
impl Instance {
async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> {
let expr = data_point.as_grpc_insert();
let result = self.db.insert(expr.clone()).await;
let object_result = match result {
Ok(result) => result,
Err(ClientError::Datanode { code, .. }) => {
let retry = if code == StatusCode::TableNotFound as u32 {
self.create_opentsdb_metric(data_point).await?;
true
} else if code == StatusCode::TableColumnNotFound as u32 {
self.create_opentsdb_tags(data_point).await?;
true
} else {
false
};
if retry {
self.db
.insert(expr.clone())
.await
.context(error::RequestDatanodeSnafu)?
} else {
// `unwrap_err` is safe because we are matching "result" in "Err" arm
return Err(result.context(error::RequestDatanodeSnafu).unwrap_err());
}
}
Err(_) => {
return Err(result.context(error::RequestDatanodeSnafu).unwrap_err());
}
};
match object_result {
ObjectResult::Mutate(mutate) => {
if mutate.success != 1 || mutate.failure != 0 {
return error::ExecOpentsdbPutSnafu {
reason: format!("illegal result: {:?}", mutate),
}
.fail();
}
}
ObjectResult::Select(_) => unreachable!(),
}
Ok(())
}
async fn create_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> {
let mut column_defs = Vec::with_capacity(2 + data_point.tags().len());
let ts_column = ColumnDef {
name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Timestamp as i32,
is_nullable: false,
..Default::default()
};
column_defs.push(ts_column);
let value_column = ColumnDef {
name: OPENTSDB_VALUE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: false,
..Default::default()
};
column_defs.push(value_column);
for (tagk, _) in data_point.tags().iter() {
column_defs.push(ColumnDef {
name: tagk.to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
..Default::default()
})
}
let expr = CreateExpr {
catalog_name: None,
schema_name: None,
table_name: data_point.metric().to_string(),
desc: Some(format!(
"Table for OpenTSDB metric: {}",
&data_point.metric()
)),
column_defs,
time_index: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(),
primary_keys: vec![],
create_if_not_exists: true,
table_options: HashMap::new(),
};
let result = self
.admin
.create(expr)
.await
.context(error::RequestDatanodeSnafu)?;
let header = result.header.context(error::IncompleteGrpcResultSnafu {
err_msg: "'header' is missing",
})?;
if header.code == (StatusCode::Success as u32)
|| header.code == (StatusCode::TableAlreadyExists as u32)
{
info!(
"OpenTSDB metric table for \"{}\" is created!",
data_point.metric()
);
Ok(())
} else {
error::ExecOpentsdbPutSnafu {
reason: format!("error code: {}", header.code),
}
.fail()
}
}
async fn create_opentsdb_tags(&self, data_point: &DataPoint) -> Result<()> {
// TODO(LFC): support adding columns in one request
for (tagk, _) in data_point.tags().iter() {
let tag_column = ColumnDef {
name: tagk.to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
..Default::default()
};
let expr = AlterExpr {
catalog_name: None,
schema_name: None,
table_name: data_point.metric().to_string(),
kind: Some(alter_expr::Kind::AddColumn(AddColumn {
column_def: Some(tag_column),
})),
};
let result = self
.admin
.alter(expr)
.await
.context(error::RequestDatanodeSnafu)?;
let header = result.header.context(error::IncompleteGrpcResultSnafu {
err_msg: "'header' is missing",
})?;
if header.code != (StatusCode::Success as u32)
&& header.code != (StatusCode::TableColumnExists as u32)
{
return error::ExecOpentsdbPutSnafu {
reason: format!("error code: {}", header.code),
}
.fail();
}
info!(
"OpenTSDB tag \"{}\" for metric \"{}\" is added!",
tagk,
data_point.metric()
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_query::Output;
use datafusion::arrow_print;
use servers::query_handler::SqlQueryHandler;
use super::*;
use crate::tests;
#[tokio::test]
async fn test_exec() {
let instance = tests::create_frontend_instance().await;
instance
.exec(
&DataPoint::try_create(
"put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0",
)
.unwrap(),
)
.await
.unwrap();
instance
.exec(&DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap())
.await
.unwrap();
}
#[tokio::test]
async fn test_insert_opentsdb_metric() {
let instance = tests::create_frontend_instance().await;
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
1.0,
vec![
("tagk1".to_string(), "tagv1".to_string()),
("tagk2".to_string(), "tagv2".to_string()),
],
);
// should create new table "my_metric_1" directly
let result = instance.insert_opentsdb_metric(&data_point1).await;
assert!(result.is_ok());
let data_point2 = DataPoint::new(
"my_metric_1".to_string(),
2000,
2.0,
vec![
("tagk2".to_string(), "tagv2".to_string()),
("tagk3".to_string(), "tagv3".to_string()),
],
);
// should create new column "tagk3" directly
let result = instance.insert_opentsdb_metric(&data_point2).await;
assert!(result.is_ok());
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
let result = instance.insert_opentsdb_metric(&data_point3).await;
assert!(result.is_ok());
let output = instance
.do_query("select * from my_metric_1")
.await
.unwrap();
match output {
Output::RecordBatches(recordbatches) => {
let recordbatches = recordbatches
.take()
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<_>>();
let pretty_print = arrow_print::write(&recordbatches);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
"+---------------------+-------+-------+-------+-------+",
"| timestamp | value | tagk1 | tagk2 | tagk3 |",
"+---------------------+-------+-------+-------+-------+",
"| 1970-01-01 00:00:01 | 1 | tagv1 | tagv2 | |",
"| 1970-01-01 00:00:02 | 2 | | tagv2 | tagv3 |",
"| 1970-01-01 00:00:03 | 3 | | | |",
"+---------------------+-------+-------+-------+-------+",
];
assert_eq!(pretty_print, expected);
}
_ => unreachable!(),
};
}
}

View File

@@ -3,4 +3,9 @@
pub mod error;
pub mod frontend;
pub mod instance;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
mod server;
#[cfg(test)]
mod tests;

16
src/frontend/src/mysql.rs Normal file
View File

@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MysqlOptions {
pub addr: String,
pub runtime_size: usize,
}
impl Default for MysqlOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4002".to_string(),
runtime_size: 2,
}
}
}

View File

@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpentsdbOptions {
pub addr: String,
pub runtime_size: usize,
}
impl Default for OpentsdbOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4242".to_string(),
runtime_size: 2,
}
}
}

View File

@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PostgresOptions {
pub addr: String,
pub runtime_size: usize,
}
impl Default for PostgresOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4003".to_string(),
runtime_size: 2,
}
}
}

View File

@@ -5,7 +5,7 @@ use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::mysql::server::MysqlServer;
#[cfg(feature = "postgres")]
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::server::Server;
use snafu::ResultExt;
@@ -19,16 +19,6 @@ pub(crate) struct Services;
impl Services {
pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> {
let http_server_and_addr = if let Some(http_addr) = &opts.http_addr {
let http_addr = parse_addr(http_addr)?;
let http_server = HttpServer::new(instance.clone());
Some((Box::new(http_server) as _, http_addr))
} else {
None
};
let grpc_server_and_addr = if let Some(grpc_addr) = &opts.grpc_addr {
let grpc_addr = parse_addr(grpc_addr)?;
@@ -39,12 +29,12 @@ impl Services {
None
};
let mysql_server_and_addr = if let Some(mysql_addr) = &opts.mysql_addr {
let mysql_addr = parse_addr(mysql_addr)?;
let mysql_server_and_addr = if let Some(opts) = &opts.mysql_options {
let mysql_addr = parse_addr(&opts.addr)?;
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.worker_threads(opts.runtime_size)
.thread_name("mysql-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
@@ -57,13 +47,12 @@ impl Services {
None
};
#[cfg(feature = "postgres")]
let postgres_server_and_addr = if let Some(pg_addr) = &opts.postgres_addr {
let pg_addr = parse_addr(pg_addr)?;
let postgres_server_and_addr = if let Some(opts) = &opts.postgres_options {
let pg_addr = parse_addr(&opts.addr)?;
let pg_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.postgres_runtime_size as usize)
.worker_threads(opts.runtime_size)
.thread_name("pg-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
@@ -77,12 +66,43 @@ impl Services {
None
};
let opentsdb_server_and_addr = if let Some(opts) = &opts.opentsdb_options {
let addr = parse_addr(&opts.addr)?;
let io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.runtime_size)
.thread_name("opentsdb-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
let server = OpentsdbServer::create_server(instance.clone(), io_runtime);
Some((server, addr))
} else {
None
};
let http_server_and_addr = if let Some(http_addr) = &opts.http_addr {
let http_addr = parse_addr(http_addr)?;
let mut http_server = HttpServer::new(instance.clone());
if opentsdb_server_and_addr.is_some() {
http_server.set_opentsdb_handler(instance.clone());
}
Some((Box::new(http_server) as _, http_addr))
} else {
None
};
try_join!(
start_server(http_server_and_addr),
start_server(grpc_server_and_addr),
start_server(mysql_server_and_addr),
#[cfg(feature = "postgres")]
start_server(postgres_server_and_addr),
start_server(opentsdb_server_and_addr)
)
.context(error::StartServerSnafu)?;
Ok(())

61
src/frontend/src/tests.rs Normal file
View File

@@ -0,0 +1,61 @@
use std::sync::Arc;
use api::v1::greptime_client::GreptimeClient;
use client::Client;
use datanode::instance::Instance as DatanodeInstance;
use servers::grpc::GrpcServer;
use tonic::transport::{Endpoint, Server};
use tower::service_fn;
use crate::instance::Instance;
async fn create_datanode_instance() -> Arc<DatanodeInstance> {
// TODO(LFC) Use real Mito engine when we can alter its region schema,
// and delete the `new_mock` method.
let instance = Arc::new(DatanodeInstance::new_mock().await.unwrap());
instance.start().await.unwrap();
instance
}
pub(crate) async fn create_frontend_instance() -> Arc<Instance> {
let datanode_instance = create_datanode_instance().await;
let (client, server) = tokio::io::duplex(1024);
// create a mock datanode grpc service, see example here:
// https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
let datanode_service =
GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service();
tokio::spawn(async move {
Server::builder()
.add_service(datanode_service)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
// "http://[::]:50051" is just a placeholder, does not actually connect to it,
// see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_| {
let client = client.take();
async move {
if let Some(client) = client {
Ok(client)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
}
}
}))
.await
.unwrap();
let client = Client::with_client(GreptimeClient::new(channel));
Arc::new(Instance::with_client(client))
}

View File

@@ -6,8 +6,9 @@ edition = "2021"
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
axum = "0.5"
axum-macros = "0.2"
axum = "0.6.0-rc.2"
axum-macros = "0.3.0-rc.1"
bytes = "1.2"
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
@@ -16,12 +17,12 @@ common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
futures = "0.3"
hex = { version = "0.4", optional = true }
hex = { version = "0.4" }
hyper = { version = "0.14", features = ["full"] }
metrics = "0.20"
num_cpus = "1.13"
opensrv-mysql = "0.1"
pgwire = { version = "0.3", optional = true }
pgwire = { version = "0.3" }
query = { path = "../query" }
serde = "1.0"
serde_json = "1.0"
@@ -32,11 +33,8 @@ tonic = "0.8"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
[features]
default = ["postgres"]
postgres = ["hex", "pgwire"]
[dev-dependencies]
axum-test-helper = "0.1"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
mysql_async = "0.30"
@@ -44,3 +42,4 @@ rand = "0.8"
script = { path = "../script", features = ["python"] }
test-util = { path = "../../test-util" }
tokio-postgres = "0.7"
tokio-test = "0.4"

View File

@@ -1,7 +1,11 @@
use std::any::Any;
use std::net::SocketAddr;
use axum::http::StatusCode as HttpStatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::prelude::*;
use serde_json::json;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -71,6 +75,35 @@ pub enum Error {
reason: String,
backtrace: Backtrace,
},
#[snafu(display("Connection reset by peer"))]
ConnResetByPeer { backtrace: Backtrace },
#[snafu(display("Hyper error, source: {}", source))]
Hyper { source: hyper::Error },
#[snafu(display("Invalid OpenTSDB line, source: {}", source))]
InvalidOpentsdbLine {
source: std::string::FromUtf8Error,
backtrace: Backtrace,
},
#[snafu(display("Invalid OpenTSDB Json request, source: {}", source))]
InvalidOpentsdbJsonRequest {
source: serde_json::error::Error,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to put OpenTSDB data point: {:?}, source: {}",
data_point,
source
))]
PutOpentsdbDataPoint {
data_point: String,
#[snafu(backtrace)]
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -90,9 +123,16 @@ impl ErrorExt for Error {
InsertScript { source, .. }
| ExecuteScript { source, .. }
| ExecuteQuery { source, .. } => source.status_code(),
| ExecuteQuery { source, .. }
| PutOpentsdbDataPoint { source, .. } => source.status_code(),
NotSupported { .. } | InvalidQuery { .. } => StatusCode::InvalidArguments,
NotSupported { .. }
| InvalidQuery { .. }
| ConnResetByPeer { .. }
| InvalidOpentsdbLine { .. }
| InvalidOpentsdbJsonRequest { .. } => StatusCode::InvalidArguments,
Hyper { .. } => StatusCode::Unknown,
}
}
@@ -116,3 +156,18 @@ impl From<std::io::Error> for Error {
Error::InternalIo { source: e }
}
}
impl IntoResponse for Error {
fn into_response(self) -> Response {
let (status, error_message) = match self {
Error::InvalidOpentsdbLine { .. }
| Error::InvalidOpentsdbJsonRequest { .. }
| Error::InvalidQuery { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()),
_ => (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
};
let body = Json(json!({
"error": error_message,
}));
(status, body).into_response()
}
}

View File

@@ -1,4 +1,5 @@
pub mod handler;
pub mod opentsdb;
use std::net::SocketAddr;
use std::time::Duration;
@@ -8,7 +9,7 @@ use axum::{
error_handling::HandleErrorLayer,
response::IntoResponse,
response::{Json, Response},
routing, BoxError, Extension, Router,
routing, BoxError, Router,
};
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
@@ -19,13 +20,15 @@ use tower::{timeout::TimeoutLayer, ServiceBuilder};
use tower_http::trace::TraceLayer;
use crate::error::{Result, StartHttpSnafu};
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::query_handler::SqlQueryHandlerRef;
use crate::server::Server;
const HTTP_API_VERSION: &str = "v1";
pub struct HttpServer {
query_handler: SqlQueryHandlerRef,
sql_handler: SqlQueryHandlerRef,
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
}
#[derive(Serialize, Debug)]
@@ -114,27 +117,50 @@ async fn shutdown_signal() {
}
impl HttpServer {
pub fn new(query_handler: SqlQueryHandlerRef) -> Self {
Self { query_handler }
pub fn new(sql_handler: SqlQueryHandlerRef) -> Self {
Self {
sql_handler,
opentsdb_handler: None,
}
}
pub fn set_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) {
debug_assert!(
self.opentsdb_handler.is_none(),
"OpenTSDB handler can be set only once!"
);
self.opentsdb_handler.get_or_insert(handler);
}
pub fn make_app(&self) -> Router {
Router::new()
.nest(
&format!("/{}", HTTP_API_VERSION),
Router::new()
// handlers
.route("/sql", routing::get(handler::sql))
.route("/scripts", routing::post(handler::scripts))
.route("/run-script", routing::post(handler::run_script)),
)
// TODO(LFC): Use released Axum.
// Axum version 0.6 introduces state within router, making router methods far more elegant
// to write. Though version 0.6 is rc, I think it's worth to upgrade.
// Prior to version 0.6, we only have a single "Extension" to share all query
// handlers amongst router methods. That requires us to pack all query handlers in a shared
// state, and check-then-get the desired query handler in different router methods, which
// is a lot of tedious work.
let sql_router = Router::with_state(self.sql_handler.clone())
.route("/sql", routing::get(handler::sql))
.route("/scripts", routing::post(handler::scripts))
.route("/run-script", routing::post(handler::run_script));
let mut router = Router::new().nest(&format!("/{}", HTTP_API_VERSION), sql_router);
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
let opentsdb_router = Router::with_state(opentsdb_handler.clone())
.route("/api/put", routing::post(opentsdb::put));
router = router.nest(&format!("/{}/opentsdb", HTTP_API_VERSION), opentsdb_router);
}
router
.route("/metrics", routing::get(handler::metrics))
// middlewares
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(TraceLayer::new_for_http())
.layer(Extension(self.query_handler.clone()))
// TODO(LFC): make timeout configurable
.layer(TimeoutLayer::new(Duration::from_secs(30))),
)

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use axum::extract::{Extension, Json, Query};
use axum::extract::{Json, Query, State};
use common_telemetry::metric;
use serde::{Deserialize, Serialize};
@@ -10,11 +10,11 @@ use crate::query_handler::SqlQueryHandlerRef;
/// Handler to execute sql
#[axum_macros::debug_handler]
pub async fn sql(
Extension(query_handler): Extension<SqlQueryHandlerRef>,
State(sql_handler): State<SqlQueryHandlerRef>,
Query(params): Query<HashMap<String, String>>,
) -> HttpResponse {
if let Some(sql) = params.get("sql") {
HttpResponse::Json(JsonResponse::from_output(query_handler.do_query(sql).await).await)
HttpResponse::Json(JsonResponse::from_output(sql_handler.do_query(sql).await).await)
} else {
HttpResponse::Json(JsonResponse::with_error(Some(
"sql parameter is required.".to_string(),
@@ -24,10 +24,7 @@ pub async fn sql(
/// Handler to export metrics
#[axum_macros::debug_handler]
pub async fn metrics(
Extension(_query_handler): Extension<SqlQueryHandlerRef>,
Query(_params): Query<HashMap<String, String>>,
) -> HttpResponse {
pub async fn metrics(Query(_params): Query<HashMap<String, String>>) -> HttpResponse {
if let Some(handle) = metric::try_handle() {
HttpResponse::Text(handle.render())
} else {
@@ -44,7 +41,7 @@ pub struct ScriptExecution {
/// Handler to insert and compile script
#[axum_macros::debug_handler]
pub async fn scripts(
Extension(query_handler): Extension<SqlQueryHandlerRef>,
State(query_handler): State<SqlQueryHandlerRef>,
Json(payload): Json<ScriptExecution>,
) -> HttpResponse {
if payload.name.is_empty() || payload.script.is_empty() {
@@ -67,7 +64,7 @@ pub async fn scripts(
/// Handler to execute script
#[axum_macros::debug_handler]
pub async fn run_script(
Extension(query_handler): Extension<SqlQueryHandlerRef>,
State(query_handler): State<SqlQueryHandlerRef>,
Query(params): Query<HashMap<String, String>>,
) -> HttpResponse {
let name = params.get("name");

View File

@@ -0,0 +1,225 @@
use std::collections::HashMap;
use axum::extract::{Query, RawBody, State};
use axum::http::StatusCode as HttpStatusCode;
use axum::Json;
use hyper::Body;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{self, Error, Result};
use crate::opentsdb::codec::DataPoint;
use crate::query_handler::OpentsdbProtocolHandlerRef;
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
One(T),
Vec(Vec<T>),
}
impl<T> From<OneOrMany<T>> for Vec<T> {
fn from(from: OneOrMany<T>) -> Self {
match from {
OneOrMany::One(val) => vec![val],
OneOrMany::Vec(vec) => vec,
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct DataPointRequest {
metric: String,
timestamp: i64,
value: f64,
tags: HashMap<String, String>,
}
impl From<DataPointRequest> for DataPoint {
fn from(request: DataPointRequest) -> Self {
let ts_millis = DataPoint::timestamp_to_millis(request.timestamp);
let tags = request
.tags
.into_iter()
.map(|(k, v)| (k, v))
.collect::<Vec<(String, String)>>();
DataPoint::new(request.metric, ts_millis, request.value, tags)
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum OpentsdbPutResponse {
Empty,
Debug(OpentsdbDebuggingResponse),
}
// Please refer to the OpenTSDB documents of ["api/put"](http://opentsdb.net/docs/build/html/api_http/put.html)
// for more details.
#[axum_macros::debug_handler]
pub async fn put(
State(opentsdb_handler): State<OpentsdbProtocolHandlerRef>,
Query(params): Query<HashMap<String, String>>,
RawBody(body): RawBody,
) -> Result<(HttpStatusCode, Json<OpentsdbPutResponse>)> {
let summary = params.contains_key("summary");
let details = params.contains_key("details");
let data_points = parse_data_points(body).await?;
let response = if !summary && !details {
for data_point in data_points.into_iter() {
if let Err(e) = opentsdb_handler.exec(&data_point.into()).await {
// Not debugging purpose, failed fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
}
}
(HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
} else {
let mut response = OpentsdbDebuggingResponse {
success: 0,
failed: 0,
errors: if details {
Some(Vec::with_capacity(data_points.len()))
} else {
None
},
};
for data_point in data_points.into_iter() {
let result = opentsdb_handler.exec(&data_point.clone().into()).await;
match result {
Ok(()) => response.on_success(),
Err(e) => {
response.on_failed(data_point, e);
}
}
}
(
HttpStatusCode::OK,
Json(OpentsdbPutResponse::Debug(response)),
)
};
Ok(response)
}
async fn parse_data_points(body: Body) -> Result<Vec<DataPointRequest>> {
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;
let data_points = serde_json::from_slice::<OneOrMany<DataPointRequest>>(&body[..])
.context(error::InvalidOpentsdbJsonRequestSnafu)?;
Ok(data_points.into())
}
#[derive(Serialize, Deserialize, Debug)]
struct OpentsdbDetailError {
datapoint: DataPointRequest,
error: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct OpentsdbDebuggingResponse {
success: i32,
failed: i32,
#[serde(skip_serializing_if = "Option::is_none")]
errors: Option<Vec<OpentsdbDetailError>>,
}
impl OpentsdbDebuggingResponse {
fn on_success(&mut self) {
self.success += 1;
}
fn on_failed(&mut self, datapoint: DataPointRequest, error: Error) {
self.failed += 1;
if let Some(details) = self.errors.as_mut() {
let error = OpentsdbDetailError {
datapoint,
error: error.to_string(),
};
details.push(error);
};
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_into_opentsdb_data_point() {
let request = DataPointRequest {
metric: "hello".to_string(),
timestamp: 1234,
value: 1.0,
tags: HashMap::from([("foo".to_string(), "a".to_string())]),
};
let data_point: DataPoint = request.into();
assert_eq!(data_point.metric(), "hello");
assert_eq!(data_point.ts_millis(), 1234000);
assert_eq!(data_point.value(), 1.0);
assert_eq!(
data_point.tags(),
&vec![("foo".to_string(), "a".to_string())]
);
}
#[tokio::test]
async fn test_parse_data_points() {
let raw_data_point1 = r#"{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
}"#;
let data_point1 = serde_json::from_str::<DataPointRequest>(raw_data_point1).unwrap();
let raw_data_point2 = r#"{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 9,
"tags": {
"host": "web02",
"dc": "lga"
}
}"#;
let data_point2 = serde_json::from_str::<DataPointRequest>(raw_data_point2).unwrap();
let body = Body::from(raw_data_point1);
let data_points = parse_data_points(body).await.unwrap();
assert_eq!(data_points.len(), 1);
assert_eq!(data_points[0], data_point1);
let body = Body::from(format!("[{},{}]", raw_data_point1, raw_data_point2));
let data_points = parse_data_points(body).await.unwrap();
assert_eq!(data_points.len(), 2);
assert_eq!(data_points[0], data_point1);
assert_eq!(data_points[1], data_point2);
let body = Body::from("");
let result = parse_data_points(body).await;
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Invalid OpenTSDB Json request, source: EOF while parsing a value at line 1 column 0"
);
let body = Body::from("hello world");
let result = parse_data_points(body).await;
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Invalid OpenTSDB Json request, source: expected value at line 1 column 1"
);
}
}

View File

@@ -1,8 +1,11 @@
#![feature(assert_matches)]
pub mod error;
pub mod grpc;
pub mod http;
pub mod mysql;
#[cfg(feature = "postgres")]
pub mod opentsdb;
pub mod postgres;
pub mod query_handler;
pub mod server;
mod shutdown;

View File

@@ -5,69 +5,39 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::{error, info};
use futures::future::AbortHandle;
use futures::future::AbortRegistration;
use futures::future::Abortable;
use futures::StreamExt;
use opensrv_mysql::AsyncMysqlIntermediary;
use snafu::prelude::*;
use tokio;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::TcpListenerStream;
use crate::error::{self, Result};
use crate::error::Result;
use crate::mysql::handler::MysqlInstanceShim;
use crate::query_handler::SqlQueryHandlerRef;
use crate::server::Server;
use crate::server::{AbortableStream, BaseTcpServer, Server};
pub struct MysqlServer {
// `abort_handle` and `abort_registration` are used in pairs in shutting down MySQL server.
// They work like sender and receiver for aborting stream. When the server is shutting down,
// calling `abort_handle.abort()` will "notify" `abort_registration` to stop emitting new
// elements in the stream.
abort_handle: AbortHandle,
abort_registration: Option<AbortRegistration>,
// A handle holding the TCP accepting task.
join_handle: Option<JoinHandle<()>>,
base_server: BaseTcpServer,
query_handler: SqlQueryHandlerRef,
io_runtime: Arc<Runtime>,
}
impl MysqlServer {
/// Creates a new MySQL server with provided [MysqlInstance] and [Runtime].
pub fn create_server(
query_handler: SqlQueryHandlerRef,
io_runtime: Arc<Runtime>,
) -> Box<dyn Server> {
let (abort_handle, registration) = AbortHandle::new_pair();
Box::new(MysqlServer {
abort_handle,
abort_registration: Some(registration),
join_handle: None,
base_server: BaseTcpServer::create_server("MySQL", io_runtime),
query_handler,
io_runtime,
})
}
async fn bind(addr: SocketAddr) -> Result<(TcpListenerStream, SocketAddr)> {
let listener = tokio::net::TcpListener::bind(addr)
.await
.context(error::TokioIoSnafu {
err_msg: format!("Failed to bind addr {}", addr),
})?;
// get actually bond addr in case input addr use port 0
let addr = listener.local_addr()?;
info!("MySQL server is bound to {}", addr);
Ok((TcpListenerStream::new(listener), addr))
}
fn accept(&self, accepting_stream: Abortable<TcpListenerStream>) -> impl Future<Output = ()> {
let io_runtime = self.io_runtime.clone();
fn accept(
&self,
io_runtime: Arc<Runtime>,
stream: AbortableStream,
) -> impl Future<Output = ()> {
let query_handler = self.query_handler.clone();
accepting_stream.for_each(move |tcp_stream| {
stream.for_each(move |tcp_stream| {
let io_runtime = io_runtime.clone();
let query_handler = query_handler.clone();
async move {
@@ -99,40 +69,15 @@ impl MysqlServer {
#[async_trait]
impl Server for MysqlServer {
async fn shutdown(&mut self) -> Result<()> {
match self.join_handle.take() {
Some(join_handle) => {
self.abort_handle.abort();
if let Err(error) = join_handle.await {
// Couldn't use `error!(e; xxx)` as JoinError doesn't implement ErrorExt.
error!(
"Unexpected error during shutdown MySQL server, error: {}",
error
);
} else {
info!("MySQL server is shutdown.")
}
Ok(())
}
None => error::InternalSnafu {
err_msg: "MySQL server is not started.",
}
.fail()?,
}
self.base_server.shutdown().await
}
async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr> {
match self.abort_registration.take() {
Some(registration) => {
let (stream, listener) = Self::bind(listening).await?;
let stream = Abortable::new(stream, registration);
self.join_handle = Some(tokio::spawn(self.accept(stream)));
Ok(listener)
}
None => error::InternalSnafu {
err_msg: "MySQL server has been started.",
}
.fail()?,
}
let (stream, addr) = self.base_server.bind(listening).await?;
let io_runtime = self.base_server.io_runtime();
let join_handle = tokio::spawn(self.accept(io_runtime, stream));
self.base_server.start_with(join_handle)?;
Ok(addr)
}
}

102
src/servers/src/opentsdb.rs Normal file
View File

@@ -0,0 +1,102 @@
pub mod codec;
pub mod connection;
mod handler;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::error;
use futures::StreamExt;
use tokio::sync::broadcast;
use crate::error::Result;
use crate::opentsdb::connection::Connection;
use crate::opentsdb::handler::Handler;
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::server::{AbortableStream, BaseTcpServer, Server};
use crate::shutdown::Shutdown;
pub struct OpentsdbServer {
base_server: BaseTcpServer,
query_handler: OpentsdbProtocolHandlerRef,
/// Broadcasts a shutdown signal to all active connections.
///
/// When a connection task is spawned, it is passed a broadcast receiver handle. We can send
/// a `()` value via `notify_shutdown` or just drop `notify_shutdown`, then each active
/// connection receives it, reaches a safe terminal state, and completes the task.
notify_shutdown: Option<broadcast::Sender<()>>,
}
impl OpentsdbServer {
pub fn create_server(
query_handler: OpentsdbProtocolHandlerRef,
io_runtime: Arc<Runtime>,
) -> Box<dyn Server> {
// When the provided `shutdown` future completes, we must send a shutdown
// message to all active connections. We use a broadcast channel for this
// purpose. The call below ignores the receiver of the broadcast pair, and when
// a receiver is needed, the subscribe() method on the sender is used to create
// one.
let (notify_shutdown, _) = broadcast::channel(1);
Box::new(OpentsdbServer {
base_server: BaseTcpServer::create_server("OpenTSDB", io_runtime),
query_handler,
notify_shutdown: Some(notify_shutdown),
})
}
fn accept(
&self,
io_runtime: Arc<Runtime>,
stream: AbortableStream,
) -> impl Future<Output = ()> {
let query_handler = self.query_handler.clone();
let notify_shutdown = self
.notify_shutdown
.clone()
.expect("`notify_shutdown` must be present when accepting connection!");
stream.for_each(move |stream| {
let io_runtime = io_runtime.clone();
let query_handler = query_handler.clone();
let shutdown = Shutdown::new(notify_shutdown.subscribe());
async move {
match stream {
Ok(stream) => {
let connection = Connection::new(stream);
let mut handler = Handler::new(query_handler, connection, shutdown);
let _ = io_runtime.spawn(async move {
if let Err(e) = handler.run().await {
error!(e; "Unexpected error when handling OpenTSDB connection");
}
});
}
Err(error) => error!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt.
};
}
})
}
}
#[async_trait]
impl Server for OpentsdbServer {
async fn shutdown(&mut self) -> Result<()> {
self.base_server.shutdown().await?;
drop(self.notify_shutdown.take());
Ok(())
}
async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr> {
let (stream, addr) = self.base_server.bind(listening).await?;
let io_runtime = self.base_server.io_runtime();
let join_handle = tokio::spawn(self.accept(io_runtime, stream));
self.base_server.start_with(join_handle)?;
Ok(addr)
}
}

View File

@@ -0,0 +1,280 @@
use api::v1::codec::InsertBatch;
use api::v1::{column, insert_expr, Column, InsertExpr};
use crate::error::{self, Result};
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "value";
#[derive(Debug)]
pub struct DataPoint {
metric: String,
ts_millis: i64,
value: f64,
tags: Vec<(String, String)>,
}
impl DataPoint {
pub fn new(metric: String, ts_millis: i64, value: f64, tags: Vec<(String, String)>) -> Self {
Self {
metric,
ts_millis,
value,
tags,
}
}
pub fn try_create(line: &str) -> Result<Self> {
let tokens = line.split_whitespace().collect::<Vec<&str>>();
let cmd = if tokens.is_empty() { "" } else { tokens[0] };
// OpenTSDB command is case sensitive, verified in real OpenTSDB.
if cmd != "put" {
return error::InvalidQuerySnafu {
reason: format!("unknown command {}.", cmd),
}
.fail();
}
if tokens.len() < 4 {
return error::InvalidQuerySnafu {
reason: format!(
"put: illegal argument: not enough arguments (need least 4, got {})",
tokens.len()
),
}
.fail();
}
let metric = tokens[1];
let ts_millis = match tokens[2].parse::<i64>() {
Ok(t) => Self::timestamp_to_millis(t),
Err(_) => {
return error::InvalidQuerySnafu {
reason: format!("put: invalid timestamp: {}", tokens[2]),
}
.fail()
}
};
let value = match tokens[3].parse::<f64>() {
Ok(v) => v,
Err(_) => {
return error::InvalidQuerySnafu {
reason: format!("put: invalid value: {}", tokens[3]),
}
.fail()
}
};
let mut tags = Vec::with_capacity(tokens.len() - 4);
for token in tokens.iter().skip(4) {
let tag = token.split('=').collect::<Vec<&str>>();
if tag.len() != 2 || tag[0].is_empty() || tag[1].is_empty() {
return error::InvalidQuerySnafu {
reason: format!("put: invalid tag: {}", token),
}
.fail();
}
let tagk = tag[0].to_string();
let tagv = tag[1].to_string();
if tags.iter().any(|(t, _)| t == &tagk) {
return error::InvalidQuerySnafu {
reason: format!("put: illegal argument: duplicate tag: {}", tagk),
}
.fail();
}
tags.push((tagk, tagv));
}
Ok(DataPoint {
metric: metric.to_string(),
ts_millis,
value,
tags,
})
}
pub fn metric(&self) -> &str {
&self.metric
}
pub fn tags(&self) -> &Vec<(String, String)> {
&self.tags
}
pub fn ts_millis(&self) -> i64 {
self.ts_millis
}
pub fn value(&self) -> f64 {
self.value
}
pub fn as_grpc_insert(&self) -> InsertExpr {
let mut columns = Vec::with_capacity(2 + self.tags.len());
let ts_column = Column {
column_name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(),
values: Some(column::Values {
ts_millis_values: vec![self.ts_millis],
..Default::default()
}),
..Default::default()
};
columns.push(ts_column);
let value_column = Column {
column_name: OPENTSDB_VALUE_COLUMN_NAME.to_string(),
values: Some(column::Values {
f64_values: vec![self.value],
..Default::default()
}),
..Default::default()
};
columns.push(value_column);
for (tagk, tagv) in self.tags.iter() {
columns.push(Column {
column_name: tagk.to_string(),
values: Some(column::Values {
string_values: vec![tagv.to_string()],
..Default::default()
}),
..Default::default()
});
}
let batch = InsertBatch {
columns,
row_count: 1,
};
InsertExpr {
table_name: self.metric.clone(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![batch.into()],
})),
}
}
pub fn timestamp_to_millis(t: i64) -> i64 {
// 9999999999999 (13 digits) is of date "Sat Nov 20 2286 17:46:39 UTC",
// 999999999999 (12 digits) is "Sun Sep 09 2001 01:46:39 UTC",
// so timestamp digits less than 13 means we got seconds here.
// (We are not expecting to store data that is 21 years ago, are we?)
if t.abs().to_string().len() < 13 {
t * 1000
} else {
t
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_try_create() {
fn test_illegal_line(line: &str, expected_err: &str) {
let result = DataPoint::try_create(line);
match result.unwrap_err() {
error::Error::InvalidQuery { reason, .. } => {
assert_eq!(reason, expected_err)
}
_ => unreachable!(),
}
}
test_illegal_line("no_put", "unknown command no_put.");
test_illegal_line(
"put",
"put: illegal argument: not enough arguments (need least 4, got 1)",
);
test_illegal_line(
"put metric.foo notatime 42 host=web01",
"put: invalid timestamp: notatime",
);
test_illegal_line(
"put metric.foo 1000 notavalue host=web01",
"put: invalid value: notavalue",
);
test_illegal_line("put metric.foo 1000 42 host=", "put: invalid tag: host=");
test_illegal_line(
"put metric.foo 1000 42 host=web01 host=web02",
"put: illegal argument: duplicate tag: host",
);
let data_point = DataPoint::try_create(
"put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0",
)
.unwrap();
assert_eq!(data_point.metric, "sys.if.bytes.out");
assert_eq!(data_point.ts_millis, 1479496100000);
assert_eq!(data_point.value, 1.3e3);
assert_eq!(
data_point.tags,
vec![
("host".to_string(), "web01".to_string()),
("interface".to_string(), "eth0".to_string())
]
);
let data_point =
DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap();
assert_eq!(data_point.metric, "sys.procs.running");
assert_eq!(data_point.ts_millis, 1479496100000);
assert_eq!(data_point.value, 42f64);
assert_eq!(
data_point.tags,
vec![("host".to_string(), "web01".to_string())]
);
}
#[test]
fn test_as_grpc_insert() {
let data_point = DataPoint {
metric: "my_metric_1".to_string(),
ts_millis: 1000,
value: 1.0,
tags: vec![
("tagk1".to_string(), "tagv1".to_string()),
("tagk2".to_string(), "tagv2".to_string()),
],
};
let grpc_insert = data_point.as_grpc_insert();
assert_eq!(grpc_insert.table_name, "my_metric_1");
match grpc_insert.expr {
Some(insert_expr::Expr::Values(insert_expr::Values { values })) => {
assert_eq!(values.len(), 1);
let insert_batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(insert_batch.row_count, 1);
let columns = insert_batch.columns;
assert_eq!(columns.len(), 4);
assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000]
);
assert_eq!(columns[1].column_name, OPENTSDB_VALUE_COLUMN_NAME);
assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]);
assert_eq!(columns[2].column_name, "tagk1");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["tagv1"]
);
assert_eq!(columns[3].column_name, "tagk2");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["tagv2"]
);
}
_ => unreachable!(),
}
}
}

View File

@@ -0,0 +1,190 @@
//! Modified from Tokio's mini-redis example.
use bytes::{Buf, BytesMut};
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter};
use crate::error::{self, Result};
type Line = String;
#[derive(Debug)]
pub struct Connection<S: AsyncWrite + AsyncRead + Unpin> {
stream: BufWriter<S>,
buffer: BytesMut,
}
impl<S: AsyncWrite + AsyncRead + Unpin> Connection<S> {
pub fn new(stream: S) -> Connection<S> {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4 * 1024),
}
}
/// Read one line from the underlying stream.
///
/// The function waits until it has retrieved enough data to parse a line (terminated by \r\n).
/// Any data remaining in the read buffer after the line has been parsed is kept there for the
/// next call to `read_line`.
///
/// # Returns
///
/// On success, the received line is returned. If the stream is closed in a way that
/// doesn't break a line in half, it returns `None`. Otherwise, an error is returned.
pub async fn read_line(&mut self) -> Result<Option<Line>> {
loop {
// Attempt to parse a line from the buffered data. If enough data
// has been buffered, the line is returned.
if let Some(line) = self.parse_line()? {
return Ok(Some(line));
}
// There is not enough buffered data as a line. Attempt to read more from the socket.
// On success, the number of bytes is returned. `0` indicates "end of stream".
if self.stream.read_buf(&mut self.buffer).await? == 0 {
// The remote closed the connection. For this to be a clean shutdown, there should
// be no data in the read buffer. If there is, this means that the peer closed the
// socket while sending a line.
if self.buffer.is_empty() {
return Ok(None);
} else {
return error::ConnResetByPeerSnafu {}.fail();
}
}
}
}
/// Tries to parse a line from the buffer.
///
/// If the buffer contains enough data, the line is returned and the buffered data is removed.
/// If not enough data has been buffered yet, `Ok(None)` is returned.
/// If the buffered data does not represent a valid UTF8 line, `Err` is returned.
fn parse_line(&mut self) -> Result<Option<Line>> {
if self.buffer.is_empty() {
return Ok(None);
}
let buf = &self.buffer[..];
if let Some(pos) = buf.windows(2).position(|w| w == [b'\r', b'\n']) {
let line = buf[0..pos].to_vec();
self.buffer.advance(pos + 2);
Ok(Some(
String::from_utf8(line).context(error::InvalidOpentsdbLineSnafu)?,
))
} else {
// There is not enough data present in the read buffer to parse a single line. We must
// wait for more data to be received from the socket.
Ok(None)
}
}
pub async fn write_line(&mut self, line: String) -> Result<()> {
self.stream
.write_all(line.as_bytes())
.await
.context(error::InternalIoSnafu)?;
self.stream
.write(b"\r\n")
.await
.context(error::InternalIoSnafu)?;
self.stream.flush().await.context(error::InternalIoSnafu)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::io::Write;
use bytes::BufMut;
use tokio_test::io::Builder;
use super::*;
#[tokio::test]
async fn test_read_line() {
let mock = Builder::new()
.read(b"This is")
.read(b" a line.\r\n")
.build();
let mut conn = Connection::new(mock);
let line = conn.read_line().await.unwrap();
assert_eq!(line, Some("This is a line.".to_string()));
let line = conn.read_line().await.unwrap();
assert_eq!(line, None);
let buffer = &mut conn.buffer;
buffer
.writer()
.write_all(b"simulating buffer has remaining data")
.unwrap();
let result = conn.read_line().await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Connection reset by peer"));
}
#[test]
fn test_parse_line() {
let mock = Builder::new().build();
let mut conn = Connection::new(mock);
// initially, no data in the buffer, so return None
let line = conn.parse_line();
assert_matches!(line, Ok(None));
// still has no line, but we have data in the buffer
{
let buffer = &mut conn.buffer;
buffer.writer().write_all(b"This is a ").unwrap();
let line = conn.parse_line();
assert_matches!(line, Ok(None));
}
let buffer = &conn.buffer[..];
assert_eq!(String::from_utf8(buffer.to_vec()).unwrap(), "This is a ");
// finally gets a line, and the buffer has the remaining data
{
let buffer = &mut conn.buffer;
buffer
.writer()
.write_all(b"line.\r\n another line's remaining data")
.unwrap();
let line = conn.parse_line().unwrap();
assert_eq!(line, Some("This is a line.".to_string()));
}
let buffer = &conn.buffer[..];
assert_eq!(
String::from_utf8(buffer.to_vec()).unwrap(),
" another line's remaining data"
);
// expected failed on not valid utf-8 line
let buffer = &mut conn.buffer;
buffer.writer().write_all(b"Hello Wor\xffld.\r\n").unwrap();
let result = conn.parse_line();
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Invalid OpenTSDB line, source: invalid utf-8 sequence"));
}
#[tokio::test]
async fn test_write_err() {
let mock = Builder::new()
.write(b"An OpenTSDB error.")
.write(b"\r\n")
.build();
let mut conn = Connection::new(mock);
let result = conn.write_line("An OpenTSDB error.".to_string()).await;
assert!(result.is_ok());
}
}

View File

@@ -0,0 +1,184 @@
//! Modified from Tokio's mini-redis example.
use tokio::io::{AsyncRead, AsyncWrite};
use crate::error::Result;
use crate::opentsdb::codec::DataPoint;
use crate::opentsdb::connection::Connection;
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::shutdown::Shutdown;
/// Per-connection handler. Reads requests from `connection` and applies the OpenTSDB metric to
/// [OpentsdbLineProtocolHandler].
pub(crate) struct Handler<S: AsyncWrite + AsyncRead + Unpin> {
query_handler: OpentsdbProtocolHandlerRef,
/// The TCP connection decorated with OpenTSDB line protocol encoder / decoder implemented
/// using a buffered `TcpStream`.
///
/// When TCP listener receives an inbound connection, the `TcpStream` is passed to
/// `Connection::new`, which initializes the associated buffers. The byte level protocol
/// parsing details is encapsulated in `Connection`.
connection: Connection<S>,
/// Listen for shutdown notifications.
///
/// A wrapper around the `broadcast::Receiver` paired with the sender in TCP connections
/// listener. The connection handler processes requests from the connection until the peer
/// disconnects **or** a shutdown notification is received from `shutdown`. In the latter case,
/// any in-flight work being processed for the peer is continued until it reaches a safe state,
/// at which point the connection is terminated. (Graceful shutdown.)
shutdown: Shutdown,
}
impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
pub(crate) fn new(
query_handler: OpentsdbProtocolHandlerRef,
connection: Connection<S>,
shutdown: Shutdown,
) -> Self {
Self {
query_handler,
connection,
shutdown,
}
}
pub(crate) async fn run(&mut self) -> Result<()> {
while !self.shutdown.is_shutdown() {
// While reading a request, also listen for the shutdown signal.
let maybe_line = tokio::select! {
line = self.connection.read_line() => line?,
_ = self.shutdown.recv() => {
// If a shutdown signal is received, return from `run`.
// This will result in the task terminating.
return Ok(());
}
};
// If `None` is returned from `read_line()` then the peer closed the socket. There is
// no further work to do and the task can be terminated.
let line = match maybe_line {
Some(line) => line,
None => return Ok(()),
};
// Close connection upon receiving "quit" line. With actual OpenTSDB, telnet just won't
// quit, the connection to OpenTSDB server can be closed only via terminating telnet
// session manually, for example, close the terminal window. That is a little annoying,
// so I added "quit" command to the line protocol, to make telnet client able to quit
// gracefully.
if line.trim().eq_ignore_ascii_case("quit") {
return Ok(());
}
match DataPoint::try_create(&line) {
Ok(data_point) => {
let result = self.query_handler.exec(&data_point).await;
if let Err(e) = result {
self.connection.write_line(e.to_string()).await?;
}
}
Err(e) => {
self.connection.write_line(e.to_string()).await?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc};
use super::*;
use crate::error;
use crate::query_handler::OpentsdbProtocolHandler;
struct DummyQueryHandler {
tx: mpsc::Sender<String>,
}
#[async_trait]
impl OpentsdbProtocolHandler for DummyQueryHandler {
async fn exec(&self, data_point: &DataPoint) -> Result<()> {
let metric = data_point.metric();
if metric == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
self.tx.send(metric.to_string()).await.unwrap();
Ok(())
}
}
#[tokio::test]
async fn test_run() {
let (tx, mut rx) = mpsc::channel(100);
let query_handler = Arc::new(DummyQueryHandler { tx });
let (notify_shutdown, _) = broadcast::channel(1);
let addr = start_server(query_handler, notify_shutdown).await;
let stream = TcpStream::connect(addr).await.unwrap();
let mut client = Connection::new(stream);
client
.write_line("put my_metric_1 1000 1.0 host=web01".to_string())
.await
.unwrap();
assert_eq!(rx.recv().await.unwrap(), "my_metric_1");
client
.write_line("put my_metric_2 1000 1.0 host=web01".to_string())
.await
.unwrap();
assert_eq!(rx.recv().await.unwrap(), "my_metric_2");
client
.write_line("put should_failed 1000 1.0 host=web01".to_string())
.await
.unwrap();
let resp = client.read_line().await.unwrap();
assert_eq!(resp, Some("Internal error: expected".to_string()));
client.write_line("get".to_string()).await.unwrap();
let resp = client.read_line().await.unwrap();
assert_eq!(
resp,
Some("Invalid query: unknown command get.".to_string())
);
}
async fn start_server(
query_handler: OpentsdbProtocolHandlerRef,
notify_shutdown: broadcast::Sender<()>,
) -> SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let query_handler = query_handler.clone();
let connection = Connection::new(stream);
let shutdown = Shutdown::new(notify_shutdown.subscribe());
tokio::spawn(async move {
Handler::new(query_handler, connection, shutdown)
.run()
.await
});
}
});
addr
}
}

View File

@@ -4,68 +4,40 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::{error, info};
use futures::future::AbortHandle;
use futures::future::AbortRegistration;
use futures::future::Abortable;
use common_telemetry::logging::error;
use futures::StreamExt;
use pgwire::api::auth::noop::NoopStartupHandler;
use pgwire::tokio::process_socket;
use snafu::prelude::*;
use tokio;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::TcpListenerStream;
use crate::error::{self, Result};
use crate::error::Result;
use crate::postgres::handler::PostgresServerHandler;
use crate::query_handler::SqlQueryHandlerRef;
use crate::server::Server;
use crate::server::{AbortableStream, BaseTcpServer, Server};
pub struct PostgresServer {
// See MySQL module for usage of these types
abort_handle: AbortHandle,
abort_registration: Option<AbortRegistration>,
// A handle holding the TCP accepting task.
join_handle: Option<JoinHandle<()>>,
base_server: BaseTcpServer,
auth_handler: Arc<NoopStartupHandler>,
query_handler: Arc<PostgresServerHandler>,
io_runtime: Arc<Runtime>,
}
impl PostgresServer {
/// Creates a new Postgres server with provided query_handler and async runtime
pub fn new(query_handler: SqlQueryHandlerRef, io_runtime: Arc<Runtime>) -> PostgresServer {
let (abort_handle, registration) = AbortHandle::new_pair();
let postgres_handler = Arc::new(PostgresServerHandler::new(query_handler));
let startup_handler = Arc::new(NoopStartupHandler);
PostgresServer {
abort_handle,
abort_registration: Some(registration),
join_handle: None,
base_server: BaseTcpServer::create_server("Postgres", io_runtime),
auth_handler: startup_handler,
query_handler: postgres_handler,
io_runtime,
}
}
async fn bind(addr: SocketAddr) -> Result<(TcpListenerStream, SocketAddr)> {
let listener = tokio::net::TcpListener::bind(addr)
.await
.context(error::TokioIoSnafu {
err_msg: format!("Failed to bind addr {}", addr),
})?;
// get actually bond addr in case input addr use port 0
let addr = listener.local_addr()?;
info!("Postgres server is bound to {}", addr);
Ok((TcpListenerStream::new(listener), addr))
}
fn accept(&self, accepting_stream: Abortable<TcpListenerStream>) -> impl Future<Output = ()> {
let io_runtime = self.io_runtime.clone();
fn accept(
&self,
io_runtime: Arc<Runtime>,
accepting_stream: AbortableStream,
) -> impl Future<Output = ()> {
let auth_handler = self.auth_handler.clone();
let query_handler = self.query_handler.clone();
@@ -97,40 +69,15 @@ impl PostgresServer {
#[async_trait]
impl Server for PostgresServer {
async fn shutdown(&mut self) -> Result<()> {
match self.join_handle.take() {
Some(join_handle) => {
self.abort_handle.abort();
if let Err(error) = join_handle.await {
// Couldn't use `error!(e; xxx)` as JoinError doesn't implement ErrorExt.
error!(
"Unexpected error during shutdown Postgres server, error: {}",
error
);
} else {
info!("Postgres server is shutdown.")
}
Ok(())
}
None => error::InternalSnafu {
err_msg: "Postgres server is not started.",
}
.fail()?,
}
self.base_server.shutdown().await
}
async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr> {
match self.abort_registration.take() {
Some(registration) => {
let (stream, listener) = Self::bind(listening).await?;
let stream = Abortable::new(stream, registration);
self.join_handle = Some(tokio::spawn(self.accept(stream)));
Ok(listener)
}
None => error::InternalSnafu {
err_msg: "Postgres server has been started.",
}
.fail()?,
}
let (stream, addr) = self.base_server.bind(listening).await?;
let io_runtime = self.base_server.io_runtime();
let join_handle = tokio::spawn(self.accept(io_runtime, stream));
self.base_server.start_with(join_handle)?;
Ok(addr)
}
}

View File

@@ -5,6 +5,7 @@ use async_trait::async_trait;
use common_query::Output;
use crate::error::Result;
use crate::opentsdb::codec::DataPoint;
/// All query handler traits for various request protocols, like SQL or GRPC.
/// Instance that wishes to support certain request protocol, just implement the corresponding
@@ -19,6 +20,7 @@ use crate::error::Result;
pub type SqlQueryHandlerRef = Arc<dyn SqlQueryHandler + Send + Sync>;
pub type GrpcQueryHandlerRef = Arc<dyn GrpcQueryHandler + Send + Sync>;
pub type GrpcAdminHandlerRef = Arc<dyn GrpcAdminHandler + Send + Sync>;
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
#[async_trait]
pub trait SqlQueryHandler {
@@ -36,3 +38,10 @@ pub trait GrpcQueryHandler {
pub trait GrpcAdminHandler {
async fn exec_admin_request(&self, expr: AdminExpr) -> Result<AdminResult>;
}
#[async_trait]
pub trait OpentsdbProtocolHandler {
/// A successful request will not return a response.
/// Only on error will the socket return a line of data.
async fn exec(&self, data_point: &DataPoint) -> Result<()>;
}

View File

@@ -1,11 +1,115 @@
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::{error, info};
use futures::future::AbortRegistration;
use futures::future::{AbortHandle, Abortable};
use snafu::ResultExt;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::TcpListenerStream;
use crate::error::Result;
use crate::error::{self, Result};
pub(crate) type AbortableStream = Abortable<TcpListenerStream>;
#[async_trait]
pub trait Server: Send {
async fn shutdown(&mut self) -> Result<()>;
async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr>;
}
pub(crate) struct BaseTcpServer {
name: String,
// `abort_handle` and `abort_registration` are used in pairs in shutting down the server.
// They work like sender and receiver for aborting stream. When the server is shutting down,
// calling `abort_handle.abort()` will "notify" `abort_registration` to stop emitting new
// elements in the stream.
abort_handle: AbortHandle,
abort_registration: Option<AbortRegistration>,
// A handle holding the TCP accepting task.
join_handle: Option<JoinHandle<()>>,
io_runtime: Arc<Runtime>,
}
impl BaseTcpServer {
pub(crate) fn create_server(name: impl Into<String>, io_runtime: Arc<Runtime>) -> Self {
let (abort_handle, registration) = AbortHandle::new_pair();
Self {
name: name.into(),
abort_handle,
abort_registration: Some(registration),
join_handle: None,
io_runtime,
}
}
pub(crate) async fn shutdown(&mut self) -> Result<()> {
match self.join_handle.take() {
Some(join_handle) => {
self.abort_handle.abort();
if let Err(error) = join_handle.await {
// Couldn't use `error!(e; xxx)` because JoinError doesn't implement ErrorExt.
error!(
"Unexpected error during shutdown {} server, error: {}",
&self.name, error
);
} else {
info!("{} server is shutdown.", &self.name);
}
Ok(())
}
None => error::InternalSnafu {
err_msg: format!("{} server is not started.", &self.name),
}
.fail()?,
}
}
pub(crate) async fn bind(
&mut self,
addr: SocketAddr,
) -> Result<(Abortable<TcpListenerStream>, SocketAddr)> {
match self.abort_registration.take() {
Some(registration) => {
let listener =
tokio::net::TcpListener::bind(addr)
.await
.context(error::TokioIoSnafu {
err_msg: format!("Failed to bind addr {}", addr),
})?;
// get actually bond addr in case input addr use port 0
let addr = listener.local_addr()?;
info!("{} server started at {}", &self.name, addr);
let stream = TcpListenerStream::new(listener);
let stream = Abortable::new(stream, registration);
Ok((stream, addr))
}
None => error::InternalSnafu {
err_msg: format!("{} server has been started.", &self.name),
}
.fail()?,
}
}
pub(crate) fn start_with(&mut self, join_handle: JoinHandle<()>) -> Result<()> {
if self.join_handle.is_some() {
return error::InternalSnafu {
err_msg: format!("{} server has been started.", &self.name),
}
.fail();
}
let _ = self.join_handle.insert(join_handle);
Ok(())
}
pub(crate) fn io_runtime(&self) -> Arc<Runtime> {
self.io_runtime.clone()
}
}

View File

@@ -0,0 +1,51 @@
//! Copied from tokio's mini-redis example.
use tokio::sync::broadcast;
/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shutdown.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub(crate) struct Shutdown {
/// `true` if the shutdown signal has been received
shutdown: bool,
/// The receive half of the channel used to listen for shutdown.
notify: broadcast::Receiver<()>,
}
impl Shutdown {
/// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown {
shutdown: false,
notify,
}
}
/// Returns `true` if the shutdown signal has been received.
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown
}
/// Receive the shutdown notice, waiting if necessary.
pub(crate) async fn recv(&mut self) {
// If the shutdown signal has already been received, then return
// immediately.
if self.shutdown {
return;
}
// Cannot receive a "lag error" as only one value is ever sent.
let _ = self.notify.recv().await;
// Remember that the signal has been received.
self.shutdown = true;
}
}

View File

@@ -1,7 +1,6 @@
use std::collections::HashMap;
use axum::extract::{Json, Query};
use axum::Extension;
use axum::extract::{Json, Query, State};
use common_telemetry::metric;
use metrics::counter;
use servers::http::handler as http_handler;
@@ -14,9 +13,7 @@ use crate::create_testing_sql_query_handler;
#[tokio::test]
async fn test_sql_not_provided() {
let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table());
let extension = Extension(query_handler);
let json = http_handler::sql(extension, Query(HashMap::default())).await;
let json = http_handler::sql(State(query_handler), Query(HashMap::default())).await;
match json {
HttpResponse::Json(json) => {
assert!(!json.success());
@@ -36,9 +33,8 @@ async fn test_sql_output_rows() {
let query = create_query();
let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table());
let extension = Extension(query_handler);
let json = http_handler::sql(extension, query).await;
let json = http_handler::sql(State(query_handler), query).await;
match json {
HttpResponse::Json(json) => {
assert!(json.success(), "{:?}", json);
@@ -61,10 +57,7 @@ async fn test_metrics() {
counter!("test_metrics", 1);
let query = create_query();
let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table());
let extension = Extension(query_handler);
let text = http_handler::metrics(extension, query).await;
let text = http_handler::metrics(query).await;
match text {
HttpResponse::Text(s) => assert!(s.contains("test_metrics counter")),
_ => unreachable!(),
@@ -77,9 +70,8 @@ async fn test_scripts() {
let exec = create_script_payload();
let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table());
let extension = Extension(query_handler);
let json = http_handler::scripts(extension, exec).await;
let json = http_handler::scripts(State(query_handler), exec).await;
match json {
HttpResponse::Json(json) => {
assert!(json.success(), "{:?}", json);

View File

@@ -1 +1,2 @@
mod http_handler_test;
mod opentsdb_test;

View File

@@ -0,0 +1,195 @@
use std::sync::Arc;
use async_trait::async_trait;
use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use servers::error::{self, Result};
use servers::http::HttpServer;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::{OpentsdbProtocolHandler, SqlQueryHandler};
use tokio::sync::mpsc;
struct DummyInstance {
tx: mpsc::Sender<String>,
}
#[async_trait]
impl OpentsdbProtocolHandler for DummyInstance {
async fn exec(&self, data_point: &DataPoint) -> Result<()> {
if data_point.metric() == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
let _ = self.tx.send(data_point.metric().to_string()).await;
Ok(())
}
}
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _query: &str) -> Result<Output> {
unimplemented!()
}
async fn insert_script(&self, _name: &str, _script: &str) -> Result<()> {
unimplemented!()
}
async fn execute_script(&self, _name: &str) -> Result<Output> {
unimplemented!()
}
}
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
server.set_opentsdb_handler(instance);
server.make_app()
}
#[tokio::test]
async fn test_opentsdb_put() {
let (tx, mut rx) = mpsc::channel(100);
let app = make_test_app(tx);
let client = TestClient::new(app);
// single data point put
let result = client
.post("/v1/opentsdb/api/put")
.body(create_data_point("m1"))
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// multiple data point put
let result = client
.post("/v1/opentsdb/api/put")
.body(format!(
"[{},{}]",
create_data_point("m2"),
create_data_point("m3")
))
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// bad data point
let result = client
.post("/v1/opentsdb/api/put")
.body("hello, world")
.send()
.await;
assert_eq!(result.status(), 400);
assert_eq!(
result.text().await,
"{\"error\":\"Invalid OpenTSDB Json request, source: expected value at line 1 column 1\"}"
);
// internal server error
let result = client
.post("/v1/opentsdb/api/put")
.body(create_data_point("should_failed"))
.send()
.await;
assert_eq!(result.status(), 500);
assert_eq!(
result.text().await,
"{\"error\":\"Internal error: Internal error: expected\"}"
);
let mut metrics = vec![];
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(
metrics,
vec!["m1".to_string(), "m2".to_string(), "m3".to_string()]
);
}
#[tokio::test]
async fn test_opentsdb_debug_put() {
let (tx, mut rx) = mpsc::channel(100);
let app = make_test_app(tx);
let client = TestClient::new(app);
// single data point summary debug put
let result = client
.post("/v1/opentsdb/api/put?summary")
.body(create_data_point("m11"))
.send()
.await;
assert_eq!(result.status(), 200);
assert_eq!(result.text().await, "{\"success\":1,\"failed\":0}");
let result = client
.post("/v1/opentsdb/api/put?summary")
.body(create_data_point("should_failed"))
.send()
.await;
assert_eq!(result.status(), 200);
assert_eq!(result.text().await, "{\"success\":0,\"failed\":1}");
let result = client
.post("/v1/opentsdb/api/put?details")
.body(create_data_point("should_failed"))
.send()
.await;
assert_eq!(result.status(), 200);
assert_eq!(result.text().await, "{\"success\":0,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}");
// multiple data point summary debug put
let result = client
.post("/v1/opentsdb/api/put?summary")
.body(format!(
"[{},{}]",
create_data_point("should_failed"),
create_data_point("m22"),
))
.send()
.await;
assert_eq!(result.status(), 200);
assert_eq!(result.text().await, "{\"success\":1,\"failed\":1}");
let result = client
.post("/v1/opentsdb/api/put?details")
.body(format!(
"[{},{}]",
create_data_point("should_failed"),
create_data_point("m33")
))
.send()
.await;
assert_eq!(result.status(), 200);
assert_eq!(result.text().await, "{\"success\":1,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}");
let mut metrics = vec![];
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(
metrics,
vec!["m11".to_string(), "m22".to_string(), "m33".to_string()]
);
}
fn create_data_point(metric: &str) -> String {
format!(
r#"{{
"metric": "{}",
"timestamp": 1000,
"value": 1,
"tags": {{
"host": "web01"
}}
}}"#,
metric
)
}

View File

@@ -18,7 +18,7 @@ use script::{
engine::{CompileContext, EvalContext, Script, ScriptEngine},
python::{PyEngine, PyScript},
};
#[cfg(feature = "postgres")]
mod opentsdb;
mod postgres;
struct DummyInstance {

View File

@@ -0,0 +1,186 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_runtime::Builder as RuntimeBuilder;
use rand::rngs::StdRng;
use rand::Rng;
use servers::error::{self as server_error, Result};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::connection::Connection;
use servers::opentsdb::OpentsdbServer;
use servers::query_handler::OpentsdbProtocolHandler;
use servers::server::Server;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
struct DummyOpentsdbInstance {
tx: mpsc::Sender<i32>,
}
#[async_trait]
impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
async fn exec(&self, data_point: &DataPoint) -> Result<()> {
let metric = data_point.metric();
if metric == "should_failed" {
return server_error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
let i = metric.parse::<i32>().unwrap();
let _ = self.tx.send(i * i).await;
Ok(())
}
}
fn create_opentsdb_server(tx: mpsc::Sender<i32>) -> Result<Box<dyn Server>> {
let query_handler = Arc::new(DummyOpentsdbInstance { tx });
let io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("opentsdb-io-handlers")
.build()
.unwrap(),
);
Ok(OpentsdbServer::create_server(query_handler, io_runtime))
}
#[tokio::test]
async fn test_start_opentsdb_server() -> Result<()> {
let (tx, _) = mpsc::channel(100);
let mut server = create_opentsdb_server(tx)?;
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let result = server.start(listening).await;
assert!(result.is_ok());
let result = server.start(listening).await;
assert!(result
.unwrap_err()
.to_string()
.contains("OpenTSDB server has been started."));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_opentsdb_server() -> Result<()> {
let (tx, _) = mpsc::channel(100);
let mut server = create_opentsdb_server(tx)?;
let result = server.shutdown().await;
assert!(result
.unwrap_err()
.to_string()
.contains("OpenTSDB server is not started."));
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
let mut join_handles = vec![];
for _ in 0..2 {
join_handles.push(tokio::spawn(async move {
for i in 0..1000 {
let stream = TcpStream::connect(addr).await;
match stream {
Ok(stream) => {
let mut connection = Connection::new(stream);
let result = connection.write_line(format!("put {} 1 1", i)).await;
if let Err(e) = result {
return Err(e.to_string());
}
}
Err(e) => return Err(e.to_string()),
}
}
Ok(())
}))
}
tokio::time::sleep(Duration::from_millis(10)).await;
let result = server.shutdown().await;
assert!(result.is_ok());
for handle in join_handles.iter_mut() {
let result = handle.await.unwrap();
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.contains("Connection refused") || error.contains("Connection reset by peer"));
}
Ok(())
}
#[tokio::test]
async fn test_query() -> Result<()> {
let (tx, mut rx) = mpsc::channel(10);
let mut server = create_opentsdb_server(tx)?;
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
let stream = TcpStream::connect(addr).await.unwrap();
let mut connection = Connection::new(stream);
connection.write_line("put 100 1 1".to_string()).await?;
assert_eq!(rx.recv().await.unwrap(), 10000);
connection
.write_line("foo illegal put line".to_string())
.await
.unwrap();
let result = connection.read_line().await?;
assert_eq!(
result,
Some("Invalid query: unknown command foo.".to_string())
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_query_concurrently() -> Result<()> {
let threads = 4;
let expect_executed_queries_per_worker = 1000;
let (tx, mut rx) = mpsc::channel(threads * expect_executed_queries_per_worker);
let mut server = create_opentsdb_server(tx)?;
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
let mut join_handles = vec![];
for _ in 0..threads {
join_handles.push(tokio::spawn(async move {
let mut rand: StdRng = rand::SeedableRng::from_entropy();
let stream = TcpStream::connect(addr).await.unwrap();
let mut connection = Connection::new(stream);
for i in 0..expect_executed_queries_per_worker {
connection
.write_line(format!("put {} 1 1", i))
.await
.unwrap();
let should_recreate_conn = rand.gen_range(0..100) == 1;
if should_recreate_conn {
let stream = TcpStream::connect(addr).await.unwrap();
connection = Connection::new(stream);
}
}
expect_executed_queries_per_worker
}))
}
let mut total_pending_queries = threads * expect_executed_queries_per_worker;
for handle in join_handles.iter_mut() {
total_pending_queries -= handle.await.unwrap();
}
assert_eq!(0, total_pending_queries);
let mut expected_result: i32 = (threads
* (0..expect_executed_queries_per_worker)
.map(|i| i * i)
.sum::<usize>()) as i32;
while let Some(i) = rx.recv().await {
expected_result -= i;
if expected_result == 0 {
break;
}
}
Ok(())
}

View File

@@ -196,12 +196,13 @@ impl ErrorExt for Error {
| BuildTableInfo { .. }
| BuildRegionDescriptor { .. }
| TableExists { .. }
| ColumnExists { .. }
| ProjectedColumnNotFound { .. }
| MissingTimestampIndex { .. }
| UnsupportedDefaultConstraint { .. }
| TableNotFound { .. } => StatusCode::InvalidArguments,
ColumnExists { .. } => StatusCode::TableColumnExists,
TableInfoNotFound { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,

View File

@@ -8,7 +8,7 @@ use async_trait::async_trait;
use common_error::mock::MockError;
use common_telemetry::logging;
use datatypes::prelude::{Value, VectorBuilder, VectorRef};
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, Schema};
use storage::metadata::{RegionMetaImpl, RegionMetadata};
use storage::write_batch::{Mutation, WriteBatch};
use store_api::storage::{
@@ -75,14 +75,31 @@ impl Snapshot for MockSnapshot {
async fn scan(
&self,
_ctx: &ReadContext,
_request: ScanRequest,
request: ScanRequest,
) -> Result<ScanResponse<MockChunkReader>> {
let memtable = {
let memtable = self.region.memtable.read().unwrap();
memtable.clone()
};
let schema = self.schema();
let projection_schema = if let Some(projection) = request.projection {
let mut columns = Vec::with_capacity(projection.len());
for idx in projection {
columns.push(
schema
.column_schema_by_name(schema.column_name_by_index(idx))
.unwrap()
.clone(),
);
}
Arc::new(Schema::new(columns))
} else {
schema.clone()
};
let reader = MockChunkReader {
schema: self.schema().clone(),
schema: projection_schema,
memtable,
read: false,
};