feat: unify servers and mysql server in datanode (#172)

* address PR comments

address PR comments

use 3306 for mysql server's default port

upgrade metric to version 0.20

move crate "servers" out of "common"

make mysql io threads count configurable in config file

add snafu backtrace for errors with source

use common-server error for mysql server

add test for grpc server

refactor testing codes

fix rustfmt check

start mysql server in datanode

move grpc server codes from datanode to common-servers

feat: unify servers

* rebase develop and resolve conflicts

* remove an unnecessary todo

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-08-17 14:29:12 +08:00
committed by GitHub
parent 6d23118aa0
commit ccda17248e
53 changed files with 1036 additions and 768 deletions

View File

@@ -19,15 +19,17 @@ catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
datatypes = { path = "../datatypes"}
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }
metrics = "0.18"
metrics = "0.20"
object-store = { path = "../object-store" }
query = { path = "../query" }
serde = "1.0"
serde_json = "1.0"
servers = { path = "../servers" }
snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }
storage = { path = "../storage" }
@@ -42,6 +44,7 @@ tower-http = { version ="0.3", features = ["full"]}
[dev-dependencies]
axum-test-helper = "0.1"
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
tempdir = "0.3"

View File

@@ -24,6 +24,8 @@ impl Default for ObjectStoreConfig {
pub struct DatanodeOptions {
pub http_addr: String,
pub rpc_addr: String,
pub mysql_addr: String,
pub mysql_runtime_size: u32,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
}
@@ -33,6 +35,8 @@ impl Default for DatanodeOptions {
Self {
http_addr: "0.0.0.0:3000".to_string(),
rpc_addr: "0.0.0.0:3001".to_string(),
mysql_addr: "0.0.0.0:3306".to_string(),
mysql_runtime_size: 2,
wal_dir: "/tmp/wal".to_string(),
storage: ObjectStoreConfig::default(),
}
@@ -49,15 +53,15 @@ pub struct Datanode {
impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::new(&opts).await?);
let services = Services::try_new(instance.clone(), &opts)?;
Ok(Self {
opts,
services: Services::new(instance.clone()),
services,
instance,
})
}
pub async fn start(&self) -> Result<()> {
pub async fn start(&mut self) -> Result<()> {
self.instance.start().await?;
self.services.start(&self.opts).await
}

View File

@@ -80,10 +80,11 @@ pub enum Error {
#[snafu(display("Fail to convert bytes to insert batch, {}", source))]
DecodeInsert { source: DecodeError },
// The error source of http error is clear even without backtrace now so
// a backtrace is not carried in this varaint.
#[snafu(display("Fail to start HTTP server, source: {}", source))]
StartHttp { source: hyper::Error },
#[snafu(display("Failed to start server, source: {}", source))]
StartServer {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Fail to parse address {}, source: {}", addr, source))]
ParseAddr {
@@ -122,6 +123,12 @@ pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String },
#[snafu(display("Runtime resource error, source: {}", source))]
RuntimeResource {
#[snafu(backtrace)]
source: common_runtime::error::Error,
},
#[snafu(display("Invalid CREATE TABLE sql statement, cause: {}", msg))]
InvalidCreateTableSql { msg: String, backtrace: Backtrace },
@@ -179,7 +186,7 @@ impl ErrorExt for Error {
| Error::KeyColumnNotFound { .. }
| Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. }
Error::StartServer { .. }
| Error::ParseAddr { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. }
@@ -190,6 +197,7 @@ impl ErrorExt for Error {
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
}
}

View File

@@ -1,16 +1,18 @@
use std::{fs, path, sync::Arc};
use api::v1::InsertExpr;
use api::v1::{object_expr, select_expr, InsertExpr, ObjectExpr, ObjectResult, SelectExpr};
use async_trait::async_trait;
use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
use common_telemetry::logging::info;
use common_telemetry::timer;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use object_store::{backend::fs::Backend, util, ObjectStore};
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use snafu::{OptionExt, ResultExt};
use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler};
use snafu::prelude::*;
use sql::statements::statement::Statement;
use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
#[cfg(test)]
use table::engine::TableEngineRef;
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
@@ -18,7 +20,10 @@ use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu,
};
use crate::metric;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::insert::insertion_expr_to_request;
use crate::server::grpc::select::to_object_result;
use crate::sql::{SqlHandler, SqlRequest};
type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
@@ -146,61 +151,36 @@ impl Instance {
Ok(())
}
#[cfg(test)]
pub fn table_engine(&self) -> TableEngineRef {
self.sql_handler.table_engine()
async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult {
match self.execute_grpc_insert(insert_expr).await {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Err(err) => {
// TODO(fys): failure count
build_err_result(&err)
}
_ => unreachable!(),
}
}
#[cfg(test)]
pub async fn create_test_table(&self) -> Result<()> {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use table::engine::EngineContext;
use table::requests::CreateTableRequest;
async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult {
match select_expr.expr {
Some(select_expr::Expr::Sql(sql)) => {
let result = self.execute_sql(&sql).await;
to_object_result(result).await
}
None => ObjectResult::default(),
}
}
use crate::error::CreateTableSnafu;
pub fn sql_handler(&self) -> &SqlHandler<DefaultEngine> {
&self.sql_handler
}
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];
let table_name = "demo";
let table = self
.table_engine()
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
table_name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: Arc::new(
Schema::with_timestamp_index(column_schemas, 3)
.expect("ts is expected to be timestamp column"),
),
create_if_not_exists: true,
primary_key_indices: Vec::default(),
},
)
.await
.context(CreateTableSnafu { table_name })?;
let schema_provider = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
.unwrap();
Ok(())
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
}
@@ -243,84 +223,40 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result<LocalFile
Ok(log_store)
}
#[cfg(test)]
mod tests {
use arrow::array::UInt64Array;
use common_recordbatch::util;
use super::*;
use crate::test_util;
#[tokio::test]
async fn test_execute_insert() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
instance.create_test_table().await.unwrap();
let output = instance
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(&self, query: &str) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_sql(query)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test]
async fn test_execute_query() {
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
.execute_sql("select sum(number) from numbers limit 20")
.await
.unwrap();
match output {
Output::RecordBatch(recordbatch) => {
let numbers = util::collect(recordbatch).await.unwrap();
let columns = numbers[0].df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(columns[0].len(), 1);
assert_eq!(
*columns[0].as_any().downcast_ref::<UInt64Array>().unwrap(),
UInt64Array::from_slice(&[4950])
);
}
_ => unreachable!(),
}
}
#[tokio::test]
pub async fn test_execute_create() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
instance.create_test_table().await.unwrap();
let output = instance
.execute_sql(
r#"create table test_table(
host string,
ts bigint,
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(ts, host)
) engine=mito with(regions=1);"#,
)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
// TODO(LFC): use snafu's `context` to include source error and backtrace.
// Ideally we should define a snafu in servers::error to wrap the error thrown
// by `execute_sql`. However, we cannot do that because that would introduce a circular
// dependency.
.map_err(|e| {
servers::error::ExecuteQuerySnafu {
query,
err_msg: format!("{}", e),
}
.fail::<servers::error::Error>()
.unwrap_err()
})
}
}
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> servers::error::Result<ObjectResult> {
let object_resp = match query.expr {
Some(object_expr::Expr::Insert(insert_expr)) => self.handle_insert(insert_expr).await,
Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
}
.fail();
}
};
Ok(object_resp)
}
}

View File

@@ -6,8 +6,3 @@ pub mod instance;
mod metric;
pub mod server;
mod sql;
#[cfg(test)]
pub mod test_util;
#[cfg(test)]
mod tests;

View File

@@ -1,33 +1,66 @@
pub mod grpc;
pub mod http;
use grpc::GrpcServer;
use http::HttpServer;
use std::net::SocketAddr;
use std::sync::Arc;
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::mysql::server::MysqlServer;
use servers::server::Server;
use snafu::ResultExt;
use tokio::try_join;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::error::{self, Result};
use crate::instance::InstanceRef;
/// All rpc services.
pub struct Services {
http_server: HttpServer,
grpc_server: GrpcServer,
mysql_server: Box<dyn Server>,
}
impl Services {
pub fn new(instance: InstanceRef) -> Self {
Self {
pub fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result<Self> {
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
Ok(Self {
http_server: HttpServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance),
}
grpc_server: GrpcServer::new(instance.clone()),
mysql_server: MysqlServer::create_server(instance, mysql_io_runtime),
})
}
pub async fn start(&self, opts: &DatanodeOptions) -> Result<()> {
// TODO(LFC): make servers started on demand (not starting mysql if no needed, for example)
pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
let http_addr = &opts.http_addr;
let http_addr: SocketAddr = http_addr
.parse()
.context(error::ParseAddrSnafu { addr: http_addr })?;
let grpc_addr = &opts.rpc_addr;
let grpc_addr: SocketAddr = grpc_addr
.parse()
.context(error::ParseAddrSnafu { addr: grpc_addr })?;
let mysql_addr = &opts.mysql_addr;
let mysql_addr: SocketAddr = mysql_addr
.parse()
.context(error::ParseAddrSnafu { addr: mysql_addr })?;
try_join!(
self.http_server.start(&opts.http_addr),
self.grpc_server.start(&opts.rpc_addr)
)?;
self.http_server.start(http_addr),
self.grpc_server.start(grpc_addr),
self.mysql_server.start(mysql_addr),
)
.context(error::StartServerSnafu)?;
Ok(())
}
}

View File

@@ -1,43 +1,3 @@
mod handler;
pub(crate) mod handler;
pub mod insert;
mod select;
mod server;
use common_telemetry::logging::info;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use crate::{
error::{Result, StartGrpcSnafu, TcpBindSnafu},
instance::InstanceRef,
server::grpc::{handler::BatchHandler, server::Server},
};
pub struct GrpcServer {
handler: BatchHandler,
}
impl GrpcServer {
pub fn new(instance: InstanceRef) -> Self {
Self {
handler: BatchHandler::new(instance),
}
}
pub async fn start(&self, addr: &str) -> Result<()> {
let listener = TcpListener::bind(addr)
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
info!("The gRPC server is running at {}", addr);
let svc = Server::new(self.handler.clone()).into_service();
let _ = tonic::transport::Server::builder()
.add_service(svc)
.serve_with_incoming(TcpListenerStream::new(listener))
.await
.context(StartGrpcSnafu)?;
Ok(())
}
}
pub mod select;

View File

@@ -1,83 +1,10 @@
use api::v1::{
codec::SelectResult, object_expr, object_result, select_expr, BatchRequest, BatchResponse,
DatabaseResponse, InsertExpr, MutateResult, ObjectResult, ResultHeader, SelectExpr,
codec::SelectResult, object_result, MutateResult, ObjectResult, ResultHeader,
SelectResult as SelectResultRaw,
};
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use query::Output;
use crate::server::grpc::{select::to_object_result, server::PROTOCOL_VERSION};
use crate::{error::Result, error::UnsupportedExprSnafu, instance::InstanceRef};
#[derive(Clone)]
pub struct BatchHandler {
instance: InstanceRef,
}
impl BatchHandler {
pub fn new(instance: InstanceRef) -> Self {
Self { instance }
}
pub async fn batch(&self, batch_req: BatchRequest) -> Result<BatchResponse> {
let mut batch_resp = BatchResponse::default();
let mut db_resp = DatabaseResponse::default();
let databases = batch_req.databases;
for req in databases {
let exprs = req.exprs;
for obj_expr in exprs {
let object_resp = match obj_expr.expr {
Some(object_expr::Expr::Insert(insert_expr)) => {
self.handle_insert(insert_expr).await
}
Some(object_expr::Expr::Select(select_expr)) => {
self.handle_select(select_expr).await
}
other => {
return UnsupportedExprSnafu {
name: format!("{:?}", other),
}
.fail();
}
};
db_resp.results.push(object_resp);
}
}
batch_resp.databases.push(db_resp);
Ok(batch_resp)
}
pub async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult {
match self.instance.execute_grpc_insert(insert_expr).await {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Err(err) => {
// TODO(fys): failure count
build_err_result(&err)
}
_ => unreachable!(),
}
}
pub async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult {
let expr = match select_expr.expr {
Some(expr) => expr,
None => return ObjectResult::default(),
};
match expr {
select_expr::Expr::Sql(sql) => {
let result = self.instance.execute_sql(&sql).await;
to_object_result(result).await
}
}
}
}
pub const PROTOCOL_VERSION: u32 = 1;
pub type Success = u32;
pub type Failure = u32;
@@ -165,9 +92,8 @@ mod tests {
use api::v1::{object_result, MutateResult};
use common_error::status_code::StatusCode;
use super::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::handler::UnsupportedExprSnafu;
use crate::server::grpc::server::PROTOCOL_VERSION;
use super::*;
use crate::error::UnsupportedExprSnafu;
#[test]
fn test_object_result_builder() {

View File

@@ -13,7 +13,7 @@ use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result};
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
pub(crate) async fn to_object_result(result: Result<Output>) -> ObjectResult {
pub async fn to_object_result(result: Result<Output>) -> ObjectResult {
match result {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)

View File

@@ -1,30 +0,0 @@
use api::v1::*;
use tonic::{Request, Response, Status};
use super::handler::BatchHandler;
pub const PROTOCOL_VERSION: u32 = 1;
#[derive(Clone)]
pub struct Server {
handler: BatchHandler,
}
impl Server {
pub fn new(handler: BatchHandler) -> Self {
Self { handler }
}
pub fn into_service(self) -> greptime_server::GreptimeServer<Self> {
greptime_server::GreptimeServer::new(self)
}
}
#[tonic::async_trait]
impl greptime_server::Greptime for Server {
async fn batch(&self, req: Request<BatchRequest>) -> Result<Response<BatchResponse>, Status> {
let req = req.into_inner();
let res = self.handler.batch(req).await?;
Ok(Response::new(res))
}
}

View File

@@ -1,139 +0,0 @@
mod handler;
use std::net::SocketAddr;
use std::time::Duration;
use axum::{
error_handling::HandleErrorLayer,
response::IntoResponse,
response::{Json, Response},
routing::get,
BoxError, Extension, Router,
};
use common_recordbatch::{util, RecordBatch};
use common_telemetry::logging::info;
use query::Output;
use serde::Serialize;
use snafu::ResultExt;
use tower::{timeout::TimeoutLayer, ServiceBuilder};
use tower_http::trace::TraceLayer;
use crate::error::{ParseAddrSnafu, Result, StartHttpSnafu};
use crate::server::InstanceRef;
/// Http server
pub struct HttpServer {
instance: InstanceRef,
}
#[derive(Serialize, Debug)]
pub enum JsonOutput {
AffectedRows(usize),
Rows(Vec<RecordBatch>),
}
/// Http response
#[derive(Serialize, Debug)]
pub enum HttpResponse {
Json(JsonResponse),
Text(String),
}
/// Json response
#[derive(Serialize, Debug)]
pub struct JsonResponse {
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
output: Option<JsonOutput>,
}
impl IntoResponse for HttpResponse {
fn into_response(self) -> Response {
match self {
HttpResponse::Json(json) => Json(json).into_response(),
HttpResponse::Text(text) => text.into_response(),
}
}
}
impl JsonResponse {
fn with_error(error: Option<String>) -> Self {
JsonResponse {
success: false,
error,
output: None,
}
}
fn with_output(output: Option<JsonOutput>) -> Self {
JsonResponse {
success: true,
error: None,
output,
}
}
/// Create a json response from query result
async fn from_output(output: Result<Output>) -> Self {
match output {
Ok(Output::AffectedRows(rows)) => {
Self::with_output(Some(JsonOutput::AffectedRows(rows)))
}
Ok(Output::RecordBatch(stream)) => match util::collect(stream).await {
Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))),
Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))),
},
Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))),
}
}
}
async fn shutdown_signal() {
// Wait for the CTRL+C signal
// It has an issue on chrome: https://github.com/sigp/lighthouse/issues/478
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
}
impl HttpServer {
pub fn new(instance: InstanceRef) -> Self {
Self { instance }
}
pub fn make_app(&self) -> Router {
Router::new()
// handlers
.route("/sql", get(handler::sql))
.route("/metrics", get(handler::metrics))
// middlewares
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(TraceLayer::new_for_http())
.layer(Extension(self.instance.clone()))
// TODO configure timeout
.layer(TimeoutLayer::new(Duration::from_secs(30))),
)
}
pub async fn start(&self, addr: &str) -> Result<()> {
let app = self.make_app();
let socket_addr: SocketAddr = addr.parse().context(ParseAddrSnafu { addr })?;
info!("Datanode HTTP server is listening on {}", socket_addr);
let server = axum::Server::bind(&socket_addr).serve(app.into_make_service());
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await.context(StartHttpSnafu)
}
}
/// handle error middleware
async fn handle_error(err: BoxError) -> Json<JsonResponse> {
Json(JsonResponse {
success: false,
error: Some(format!("Unhandled internal error: {}", err)),
output: None,
})
}

View File

@@ -1,123 +0,0 @@
// http handlers
use std::collections::HashMap;
use axum::extract::{Extension, Query};
use common_telemetry::{metric, timer};
use crate::instance::InstanceRef;
use crate::metric::METRIC_HANDLE_SQL_ELAPSED;
use crate::server::http::{HttpResponse, JsonResponse};
/// Handler to execute sql
#[axum_macros::debug_handler]
pub async fn sql(
Extension(instance): Extension<InstanceRef>,
Query(params): Query<HashMap<String, String>>,
) -> HttpResponse {
let _timer = timer!(METRIC_HANDLE_SQL_ELAPSED);
if let Some(sql) = params.get("sql") {
HttpResponse::Json(JsonResponse::from_output(instance.execute_sql(sql).await).await)
} else {
HttpResponse::Json(JsonResponse::with_error(Some(
"sql parameter is required.".to_string(),
)))
}
}
/// Handler to export metrics
#[axum_macros::debug_handler]
pub async fn metrics(
Extension(_instance): Extension<InstanceRef>,
Query(_params): Query<HashMap<String, String>>,
) -> HttpResponse {
if let Some(handle) = metric::try_handle() {
HttpResponse::Text(handle.render())
} else {
HttpResponse::Text("Prometheus handle not initialized.".to_string())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use metrics::counter;
use super::*;
use crate::instance::Instance;
use crate::server::http::JsonOutput;
use crate::test_util::{self, TestGuard};
fn create_params() -> Query<HashMap<String, String>> {
let mut map = HashMap::new();
map.insert(
"sql".to_string(),
"select sum(number) from numbers limit 20".to_string(),
);
Query(map)
}
async fn create_extension() -> (Extension<InstanceRef>, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
(Extension(instance), guard)
}
#[tokio::test]
async fn test_sql_not_provided() {
let (extension, _guard) = create_extension().await;
let json = sql(extension, Query(HashMap::default())).await;
match json {
HttpResponse::Json(json) => {
assert!(!json.success);
assert_eq!(Some("sql parameter is required.".to_string()), json.error);
assert!(json.output.is_none());
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_sql_output_rows() {
common_telemetry::init_default_ut_logging();
let query = create_params();
let (extension, _guard) = create_extension().await;
let json = sql(extension, query).await;
match json {
HttpResponse::Json(json) => {
assert!(json.success, "{:?}", json);
assert!(json.error.is_none());
assert!(json.output.is_some());
match json.output.unwrap() {
JsonOutput::Rows(rows) => {
assert_eq!(1, rows.len());
}
_ => unreachable!(),
}
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_metrics() {
metric::init_default_metrics_recorder();
counter!("test_metrics", 1);
let query = create_params();
let (extension, _guard) = create_extension().await;
let text = metrics(extension, query).await;
match text {
HttpResponse::Text(s) => assert!(s.contains("test_metrics counter")),
_ => unreachable!(),
}
}
}

View File

@@ -50,7 +50,6 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
.context(TableNotFoundSnafu { table_name })
}
#[cfg(test)]
pub fn table_engine(&self) -> Arc<Engine> {
self.table_engine.clone()
}

View File

@@ -1,32 +0,0 @@
use tempdir::TempDir;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
/// Only for test.
///
/// TODO: Add a test feature
pub struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
}
pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap();
let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
..Default::default()
};
(
opts,
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
},
)
}

View File

@@ -1 +0,0 @@
mod http_test;

View File

@@ -0,0 +1,113 @@
mod test_util;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::{codec::InsertBatch, column, select_expr, Column, SelectExpr};
use client::{Client, Database, ObjectResult};
use datanode::instance::Instance;
use servers::grpc::GrpcServer;
use servers::server::Server;
#[tokio::test]
async fn test_insert_and_select() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
tokio::spawn(async move {
let mut grpc_server = GrpcServer::new(instance);
let addr = "127.0.0.1:3001".parse::<SocketAddr>().unwrap();
grpc_server.start(addr).await.unwrap()
});
// wait for GRPC server to start
tokio::time::sleep(Duration::from_secs(1)).await;
let grpc_client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let db = Database::new("greptime", grpc_client);
// testing data:
let expected_host_col = Column {
column_name: "host".to_string(),
values: Some(column::Values {
string_values: vec!["host1", "host2", "host3", "host4"]
.into_iter()
.map(|s| s.to_string())
.collect(),
..Default::default()
}),
..Default::default()
};
let expected_cpu_col = Column {
column_name: "cpu".to_string(),
values: Some(column::Values {
f64_values: vec![0.31, 0.41, 0.2],
..Default::default()
}),
null_mask: vec![2],
..Default::default()
};
let expected_mem_col = Column {
column_name: "memory".to_string(),
values: Some(column::Values {
f64_values: vec![0.1, 0.2, 0.3],
..Default::default()
}),
null_mask: vec![4],
..Default::default()
};
let expected_ts_col = Column {
column_name: "ts".to_string(),
values: Some(column::Values {
i64_values: vec![100, 101, 102, 103],
..Default::default()
}),
..Default::default()
};
// insert
let values = vec![InsertBatch {
columns: vec![
expected_host_col.clone(),
expected_cpu_col.clone(),
expected_mem_col.clone(),
expected_ts_col.clone(),
],
row_count: 4,
}
.into()];
let result = db.insert("demo", values).await;
assert!(result.is_ok());
// select
let select_expr = SelectExpr {
expr: Some(select_expr::Expr::Sql("select * from demo".to_string())),
};
let result = db.select(select_expr).await.unwrap();
assert!(matches!(result, ObjectResult::Select(_)));
match result {
ObjectResult::Select(select_result) => {
assert_eq!(4, select_result.row_count);
let actual_columns = select_result.columns;
assert_eq!(4, actual_columns.len());
let expected_columns = vec![
expected_ts_col,
expected_host_col,
expected_cpu_col,
expected_mem_col,
];
expected_columns
.iter()
.zip(actual_columns.iter())
.for_each(|(x, y)| assert_eq!(x, y));
}
_ => unreachable!(),
}
}

View File

@@ -1,14 +1,13 @@
//! http server test
mod test_util;
use std::sync::Arc;
use axum::http::StatusCode;
use axum::Router;
use axum_test_helper::TestClient;
use crate::instance::Instance;
use crate::server::http::HttpServer;
use crate::test_util::{self, TestGuard};
use datanode::instance::Instance;
use servers::http::HttpServer;
use test_util::TestGuard;
async fn make_test_app() -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts();

View File

@@ -0,0 +1,80 @@
mod test_util;
use arrow::array::UInt64Array;
use common_recordbatch::util;
use datanode::instance::Instance;
use query::Output;
#[tokio::test]
async fn test_execute_insert() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
let output = instance
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test]
async fn test_execute_query() {
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
.execute_sql("select sum(number) from numbers limit 20")
.await
.unwrap();
match output {
Output::RecordBatch(recordbatch) => {
let numbers = util::collect(recordbatch).await.unwrap();
let columns = numbers[0].df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(columns[0].len(), 1);
assert_eq!(
*columns[0].as_any().downcast_ref::<UInt64Array>().unwrap(),
UInt64Array::from_slice(&[4950])
);
}
_ => unreachable!(),
}
}
#[tokio::test]
pub async fn test_execute_create() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
let output = instance
.execute_sql(
r#"create table test_table(
host string,
ts bigint,
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(ts, host)
) engine=mito with(regions=1);"#,
)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
}

View File

@@ -0,0 +1,85 @@
use std::sync::Arc;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::Instance;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use snafu::ResultExt;
use table::engine::EngineContext;
use table::engine::TableEngineRef;
use table::requests::CreateTableRequest;
use tempdir::TempDir;
/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
/// Only for test.
pub struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
}
pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap();
let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
..Default::default()
};
(
opts,
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
},
)
}
// It's actually not dead codes, at least been used in instance_test.rs and grpc_test.rs
// However, clippy keeps warning us, so I temporary add an "allow" to bypass it.
// TODO(LFC): further investigate why clippy falsely warning "dead_code"
#[allow(dead_code)]
pub async fn create_test_table(instance: &Instance) -> Result<()> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];
let table_name = "demo";
let table_engine: TableEngineRef = instance.sql_handler().table_engine();
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
table_name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: Arc::new(
Schema::with_timestamp_index(column_schemas, 3)
.expect("ts is expected to be timestamp column"),
),
create_if_not_exists: true,
primary_key_indices: Vec::default(),
},
)
.await
.context(CreateTableSnafu { table_name })?;
let schema_provider = instance
.catalog_manager()
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
.unwrap();
Ok(())
}