feat: frontend instance (#238)

* feat: frontend instance

* no need to carry column length in `Column` proto

* add more tests

* rebase develop

* create a new variant with already provisioned RecordBatches in Output

* resolve code review comments

* new frontend instance does not connect to the datanode gRPC service

* add more tests

* add more tests

* rebase develop

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-09-13 17:10:22 +08:00
committed by GitHub
parent bdd5bdd917
commit ec99eb0cd0
71 changed files with 2324 additions and 362 deletions

38
src/frontend/Cargo.toml Normal file
View File

@@ -0,0 +1,38 @@
# Manifest of the `frontend` crate.
[package]
name = "frontend"
version = "0.1.0"
edition = "2021"
# The `arrow2` package is pulled in under the local crate name `arrow`.
[dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
# Regular dependencies; entries with a `path` are sibling crates of this workspace.
[dependencies]
api = { path = "../api" }
async-stream = "0.3"
async-trait = "0.1"
catalog = { path = "../catalog" }
client = { path = "../client" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
query = { path = "../query" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
serde = "1.0"
servers = { path = "../servers" }
sql = { path = "../sql" }
# Only needed by the tests, which spin up an in-process datanode behind a mock gRPC channel.
# NOTE(review): the `tempdir` crate is deprecated in favour of `tempfile` — consider migrating.
[dev-dependencies]
datanode = { path = "../datanode" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
futures = "0.3"
tonic = "0.8"
tempdir = "0.3"
tower = "0.4"

81
src/frontend/src/error.rs Normal file
View File

@@ -0,0 +1,81 @@
use std::any::Any;
use common_error::prelude::*;
/// Errors produced by the frontend.
///
/// Variants that wrap another crate's error mark `source` with
/// `#[snafu(backtrace)]`; the variants without an underlying error carry
/// their own `backtrace` field instead.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
/// Connecting to the Datanode at `addr` failed.
#[snafu(display("Failed to connect Datanode at {}, source: {}", addr, source))]
ConnectDatanode {
addr: String,
#[snafu(backtrace)]
source: client::Error,
},
/// A runtime could not be built (raised while creating the MySQL IO runtime).
#[snafu(display("Runtime resource error, source: {}", source))]
RuntimeResource {
#[snafu(backtrace)]
source: common_runtime::error::Error,
},
/// One of the protocol servers failed to start.
#[snafu(display("Failed to start server, source: {}", source))]
StartServer {
#[snafu(backtrace)]
source: servers::error::Error,
},
/// `addr` is not a valid socket address.
#[snafu(display("Failed to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
source: std::net::AddrParseError,
},
/// The SQL layer rejected a statement or one of its parts.
#[snafu(display("Failed to parse SQL, source: {}", source))]
ParseSql {
#[snafu(backtrace)]
source: sql::error::Error,
},
/// A column data type could not be converted into its gRPC representation.
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
source: api::error::Error,
},
/// The SQL parsed, but violates a rule the frontend enforces
/// (e.g. the number of time index columns).
#[snafu(display("Invalid SQL, error: {}", err_msg))]
InvalidSql {
err_msg: String,
backtrace: Backtrace,
},
/// The frontend was used in a state it does not support
/// (e.g. started without an instance).
#[snafu(display("Illegal Frontend state: {}", err_msg))]
IllegalFrontendState {
err_msg: String,
backtrace: Backtrace,
},
}
/// Convenience alias for a `std::result::Result` whose error type is the frontend [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ConnectDatanode { .. } | Error::ParseAddr { .. } | Error::InvalidSql { .. } => {
StatusCode::InvalidArguments
}
Error::RuntimeResource { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::ParseSql { source } => source.status_code(),
Error::ColumnDataType { .. } => StatusCode::Internal,
Error::IllegalFrontendState { .. } => StatusCode::Unexpected,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,62 @@
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::instance::Instance;
use crate::server::Services;
/// Configuration of a frontend.
///
/// Each `*_addr` is optional: when it is `None`, the corresponding server is
/// not started.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FrontendOptions {
// Listen address of the HTTP server.
pub http_addr: Option<String>,
// Listen address of the gRPC server.
pub grpc_addr: Option<String>,
// Listen address of the MySQL server.
pub mysql_addr: Option<String>,
// Number of worker threads of the runtime that serves MySQL connections.
pub mysql_runtime_size: u32,
}
impl Default for FrontendOptions {
fn default() -> Self {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
grpc_addr: Some("0.0.0.0:4001".to_string()),
mysql_addr: Some("0.0.0.0:4002".to_string()),
mysql_runtime_size: 2,
}
}
}
impl FrontendOptions {
    /// Returns the gRPC endpoint of the Datanode the frontend talks to.
    ///
    /// The endpoint is currently hard-coded and independent of `self`.
    // TODO(LFC) Get Datanode address from Meta.
    pub(crate) fn datanode_grpc_addr(&self) -> String {
        String::from("http://127.0.0.1:3001")
    }
}
/// A frontend: its options plus the instance that serves requests.
pub struct Frontend {
// Options the frontend was created with.
opts: FrontendOptions,
// `Some` until `start` takes the instance out; `None` afterwards.
instance: Option<Instance>,
}
impl Frontend {
pub fn new(opts: FrontendOptions) -> Self {
let instance = Instance::new();
Self {
opts,
instance: Some(instance),
}
}
pub async fn start(&mut self) -> Result<()> {
let mut instance = self
.instance
.take()
.context(error::IllegalFrontendStateSnafu {
err_msg: "Frontend instance not initialized",
})?;
instance.start(&self.opts).await?;
let instance = Arc::new(instance);
Services::start(&self.opts, instance).await
}
}

View File

@@ -0,0 +1,550 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
insert_expr, AdminExpr, AdminResult, ColumnDataType, ColumnDef as GrpcColumnDef, CreateExpr,
InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use client::admin::{admin_result_to_output, Admin};
use client::{Client, Database, Select};
use common_error::prelude::BoxedError;
use datatypes::schema::ColumnSchema;
use query::Output;
use servers::error as server_error;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler};
use snafu::prelude::*;
use sql::ast::{ColumnDef, TableConstraint};
use sql::statements::create_table::{CreateTable, TIME_INDEX};
use sql::statements::statement::Statement;
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sql::{dialect::GenericDialect, parser::ParserContext};
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
/// Shared, reference-counted handle to an [`Instance`].
pub(crate) type InstanceRef = Arc<Instance>;
/// The frontend instance: implements the SQL and gRPC handler traits by
/// forwarding requests to a Datanode through the gRPC client.
pub struct Instance {
// Client used for object requests (select, insert).
db: Database,
// Client used for admin requests (e.g. create table).
admin: Admin,
}
impl Instance {
pub(crate) fn new() -> Self {
let client = Client::default();
let db = Database::new("greptime", client.clone());
let admin = Admin::new("greptime", client);
Self { db, admin }
}
pub(crate) async fn start(&mut self, opts: &FrontendOptions) -> Result<()> {
let addr = opts.datanode_grpc_addr();
self.db
.start(addr.clone())
.await
.context(error::ConnectDatanodeSnafu { addr: addr.clone() })?;
self.admin
.start(addr.clone())
.await
.context(error::ConnectDatanodeSnafu { addr })?;
Ok(())
}
}
#[cfg(test)]
impl Instance {
    /// Test-only constructor: builds the instance on top of an already
    /// prepared `client` instead of a default one.
    pub fn with_client(client: Client) -> Self {
        let db = Database::new("greptime", client.clone());
        let admin = Admin::new("greptime", client);
        Self { db, admin }
    }
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
let mut stmt = ParserContext::create_with_dialect(query, &GenericDialect {})
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
if stmt.len() != 1 {
// TODO(LFC): Support executing multiple SQLs,
// which seems to be a major change to our whole server framework?
return server_error::NotSupportedSnafu {
feat: "Only one SQL is allowed to be executed at one time.",
}
.fail();
}
let stmt = stmt.remove(0);
match stmt {
Statement::Query(_) => self
.db
.select(Select::Sql(query.to_string()))
.await
.and_then(|object_result| object_result.try_into()),
Statement::Insert(insert) => {
let table_name = insert.table_name();
let expr = InsertExpr {
table_name,
expr: Some(insert_expr::Expr::Sql(query.to_string())),
};
self.db
.insert(expr)
.await
.and_then(|object_result| object_result.try_into())
}
Statement::Create(create) => {
let expr = create_to_expr(create)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.admin
.create(expr)
.await
.and_then(admin_result_to_output)
}
// TODO(LFC): Support other SQL execution,
// update, delete, alter, explain, etc.
_ => return server_error::NotSupportedSnafu { feat: query }.fail(),
}
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
async fn insert_script(&self, _name: &str, _script: &str) -> server_error::Result<()> {
server_error::NotSupportedSnafu {
feat: "Script execution in Frontend",
}
.fail()
}
async fn execute_script(&self, _script: &str) -> server_error::Result<Output> {
server_error::NotSupportedSnafu {
feat: "Script execution in Frontend",
}
.fail()
}
}
/// Translates a parsed `CREATE TABLE` statement into the gRPC [`CreateExpr`].
///
/// The table engine is passed along as the `engine` table option; all fields
/// not set here keep their default value.
///
/// # Errors
///
/// Fails when the table name, a column definition or the time index
/// constraint cannot be converted.
fn create_to_expr(create: CreateTable) -> Result<CreateExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;

    let column_defs = columns_to_expr(&create.columns)?;
    let time_index = find_time_index(&create.constraints)?;
    let primary_keys = find_primary_keys(&create.constraints)?;

    // TODO(LFC): Fill in other table options.
    let mut table_options = HashMap::new();
    table_options.insert("engine".to_string(), create.engine);

    Ok(CreateExpr {
        catalog_name,
        schema_name,
        table_name,
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        ..Default::default()
    })
}
/// Collects the column names of every primary-key constraint, in
/// declaration order.
///
/// Yields an empty list when no primary key is declared; this function never
/// returns an error.
fn find_primary_keys(constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut primary_keys = Vec::new();
    for constraint in constraints {
        if let TableConstraint::Unique {
            columns,
            is_primary: true,
            ..
        } = constraint
        {
            primary_keys.extend(columns.iter().map(|ident| ident.value.clone()));
        }
    }
    Ok(primary_keys)
}
fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
let time_index = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: Some(name),
columns,
is_primary: false,
} => {
if name.value == TIME_INDEX {
Some(columns.iter().map(|ident| &ident.value))
} else {
None
}
}
_ => None,
})
.flatten()
.collect::<Vec<&String>>();
ensure!(
time_index.len() == 1,
error::InvalidSqlSnafu {
err_msg: "must have one and only one TimeIndex columns",
}
);
Ok(time_index.first().unwrap().to_string())
}
fn columns_to_expr(column_defs: &[ColumnDef]) -> Result<Vec<GrpcColumnDef>> {
let column_schemas = column_defs
.iter()
.map(|c| column_def_to_schema(c).context(error::ParseSqlSnafu))
.collect::<Result<Vec<ColumnSchema>>>()?;
let column_datatypes = column_schemas
.iter()
.map(|c| {
ColumnDataTypeWrapper::try_from(c.data_type.clone())
.map(|w| w.datatype())
.context(error::ColumnDataTypeSnafu)
})
.collect::<Result<Vec<ColumnDataType>>>()?;
Ok(column_schemas
.iter()
.zip(column_datatypes.into_iter())
.map(|(schema, datatype)| GrpcColumnDef {
name: schema.name.clone(),
data_type: datatype as i32,
is_nullable: schema.is_nullable,
})
.collect::<Vec<GrpcColumnDef>>())
}
#[async_trait]
impl GrpcQueryHandler for Instance {
    /// Forwards the object request to the Datanode unchanged.
    ///
    /// # Errors
    ///
    /// A failure is reported as `ExecuteQuery`, with the debug representation
    /// of the request as the query text.
    async fn do_query(&self, query: ObjectExpr) -> server_error::Result<GrpcObjectResult> {
        // A copy goes to the client; the original is kept for the error message.
        let response = self.db.object(query.clone()).await;
        response
            .map_err(BoxedError::new)
            .with_context(|_| server_error::ExecuteQuerySnafu {
                query: format!("{:?}", query),
            })
    }
}
#[async_trait]
impl GrpcAdminHandler for Instance {
    /// Forwards the admin request to the Datanode unchanged.
    ///
    /// # Errors
    ///
    /// A failure is reported as `ExecuteQuery`, with the debug representation
    /// of the request as the query text.
    async fn exec_admin_request(&self, expr: AdminExpr) -> server_error::Result<AdminResult> {
        // A copy goes to the client; the original is kept for the error message.
        let response = self.admin.do_request(expr.clone()).await;
        response
            .map_err(BoxedError::new)
            .with_context(|_| server_error::ExecuteQuerySnafu {
                query: format!("{:?}", expr),
            })
    }
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use api::v1::codec::{InsertBatch, SelectResult};
use api::v1::greptime_client::GreptimeClient;
use api::v1::{
admin_expr, admin_result, column, object_expr, object_result, select_expr, Column,
ExprHeader, MutateResult, SelectExpr,
};
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::instance::Instance as DatanodeInstance;
use servers::grpc::GrpcServer;
use tempdir::TempDir;
use tonic::transport::{Endpoint, Server};
use tower::service_fn;
use super::*;
// End-to-end check of the SQL handler: create a table, insert three rows and
// select them back, all through a frontend wired to an in-process datanode.
#[tokio::test]
async fn test_execute_sql() {
common_telemetry::init_default_ut_logging();
let datanode_instance = create_datanode_instance().await;
let frontend_instance = create_frontend_instance(datanode_instance).await;
// Step 1: CREATE TABLE is expected to report one affected row.
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
TIME INDEX (ts),
PRIMARY KEY(ts, host)
) engine=mito with(regions=1);"#;
// `Instance` implements `do_query` for two traits, hence the fully qualified call.
let output = SqlQueryHandler::do_query(&*frontend_instance, sql)
.await
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
_ => unreachable!(),
}
// Step 2: insert three rows; the second one has NULL `cpu` and `memory`.
let sql = r#"insert into demo(host, cpu, memory, ts) values
('frontend.host1', 1.1, 100, 1000),
('frontend.host2', null, null, 2000),
('frontend.host3', 3.3, 300, 3000)
"#;
let output = SqlQueryHandler::do_query(&*frontend_instance, sql)
.await
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 3),
_ => unreachable!(),
}
// Step 3: select everything back and compare the pretty-printed batches line by line.
let sql = "select * from demo";
let output = SqlQueryHandler::do_query(&*frontend_instance, sql)
.await
.unwrap();
match output {
Output::RecordBatches(recordbatches) => {
let recordbatches = recordbatches
.to_vec()
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<DfRecordBatch>>();
let pretty_print = arrow_print::write(&recordbatches);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
// Columns come back in the order of the CREATE TABLE statement; NULLs print as blanks.
let expected = vec![
"+----------------+---------------------+-----+--------+",
"| host | ts | cpu | memory |",
"+----------------+---------------------+-----+--------+",
"| frontend.host1 | 1970-01-01 00:00:01 | 1.1 | 100 |",
"| frontend.host2 | 1970-01-01 00:00:02 | | |",
"| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 |",
"+----------------+---------------------+-----+--------+",
];
assert_eq!(pretty_print, expected);
}
_ => unreachable!(),
};
}
// End-to-end check of the gRPC handlers: create a table through the admin
// handler, then insert four rows and select them back through the query
// handler, all through a frontend wired to an in-process datanode.
#[tokio::test]
async fn test_execute_grpc() {
common_telemetry::init_default_ut_logging();
let datanode_instance = create_datanode_instance().await;
let frontend_instance = create_frontend_instance(datanode_instance).await;
// testing data:
// The same columns are used as the insert payload and as the expected select result.
let expected_host_col = Column {
column_name: "host".to_string(),
values: Some(column::Values {
string_values: vec!["fe.host.a", "fe.host.b", "fe.host.c", "fe.host.d"]
.into_iter()
.map(|s| s.to_string())
.collect(),
..Default::default()
}),
datatype: 12, // string
..Default::default()
};
// Only three values for four rows: one row is NULL.
// NOTE(review): `null_mask` is presumably a bitmap in which bit i marks row i
// as NULL (2 = 0b0010 -> row 1) — confirm against the `Column` proto definition.
let expected_cpu_col = Column {
column_name: "cpu".to_string(),
values: Some(column::Values {
f64_values: vec![1.0, 3.0, 4.0],
..Default::default()
}),
null_mask: vec![2],
datatype: 10, // float64
..Default::default()
};
// Again three values for four rows; under the same assumption 4 = 0b0100 marks row 2.
let expected_mem_col = Column {
column_name: "memory".to_string(),
values: Some(column::Values {
f64_values: vec![100.0, 200.0, 400.0],
..Default::default()
}),
null_mask: vec![4],
datatype: 10, // float64
..Default::default()
};
let expected_ts_col = Column {
column_name: "ts".to_string(),
values: Some(column::Values {
ts_millis_values: vec![1000, 2000, 3000, 4000],
..Default::default()
}),
datatype: 15, // timestamp
..Default::default()
};
// create
let create_expr = create_expr();
let admin_expr = AdminExpr {
header: Some(ExprHeader::default()),
expr: Some(admin_expr::Expr::Create(create_expr)),
};
let result = GrpcAdminHandler::exec_admin_request(&*frontend_instance, admin_expr)
.await
.unwrap();
assert_matches!(
result.result,
Some(admin_result::Result::Mutate(MutateResult {
success: 1,
failure: 0
}))
);
// insert
// One batch holding all four columns, four rows in total.
let values = vec![InsertBatch {
columns: vec![
expected_host_col.clone(),
expected_cpu_col.clone(),
expected_mem_col.clone(),
expected_ts_col.clone(),
],
row_count: 4,
}
.into()];
let insert_expr = InsertExpr {
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })),
};
let object_expr = ObjectExpr {
header: Some(ExprHeader::default()),
expr: Some(object_expr::Expr::Insert(insert_expr)),
};
// `Instance` implements `do_query` for two traits, hence the fully qualified call.
let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr)
.await
.unwrap();
assert_matches!(
result.result,
Some(object_result::Result::Mutate(MutateResult {
success: 4,
failure: 0
}))
);
// select
let object_expr = ObjectExpr {
header: Some(ExprHeader::default()),
expr: Some(object_expr::Expr::Select(SelectExpr {
expr: Some(select_expr::Expr::Sql("select * from demo".to_string())),
})),
};
let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr)
.await
.unwrap();
match result.result {
Some(object_result::Result::Select(select_result)) => {
// The select result arrives as raw bytes and is decoded into a `SelectResult` first.
let select_result: SelectResult = (*select_result.raw_data).try_into().unwrap();
assert_eq!(4, select_result.row_count);
let actual_columns = select_result.columns;
assert_eq!(4, actual_columns.len());
// Respect the order in create table schema
let expected_columns = vec![
expected_host_col,
expected_cpu_col,
expected_mem_col,
expected_ts_col,
];
expected_columns
.iter()
.zip(actual_columns.iter())
.for_each(|(x, y)| assert_eq!(x, y));
}
_ => unreachable!(),
}
}
async fn create_datanode_instance() -> Arc<DatanodeInstance> {
let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap();
let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
..Default::default()
};
let instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap());
instance.start().await.unwrap();
instance
}
// Builds a frontend instance that talks to `datanode_instance` over an
// in-memory duplex stream instead of a real network connection: the datanode
// gRPC service is served on one end of the stream, and the frontend's client
// channel is connected to the other end.
async fn create_frontend_instance(datanode_instance: Arc<DatanodeInstance>) -> Arc<Instance> {
// `client` and `server` are the two ends of the in-memory stream (buffer size 1024).
let (client, server) = tokio::io::duplex(1024);
// create a mock datanode grpc service, see example here:
// https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
let datanode_service =
GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service();
// Serve the datanode service in the background; the only "incoming connection"
// it ever sees is the server end of the duplex stream.
tokio::spawn(async move {
Server::builder()
.add_service(datanode_service)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
// "http://[::]:50051" is just a placeholder, does not actually connect to it,
// see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_| {
// Hand out the client end exactly once; the requested URI is ignored.
let client = client.take();
async move {
if let Some(client) = client {
Ok(client)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
}
}
}))
.await
.unwrap();
// Wrap the channel in the generated gRPC client and build the frontend instance on top of it.
let client = Client::with_client(GreptimeClient::new(channel));
Arc::new(Instance::with_client(client))
}
fn create_expr() -> CreateExpr {
let column_defs = vec![
GrpcColumnDef {
name: "host".to_string(),
data_type: 12, // string
is_nullable: false,
},
GrpcColumnDef {
name: "cpu".to_string(),
data_type: 10, // float64
is_nullable: true,
},
GrpcColumnDef {
name: "memory".to_string(),
data_type: 10, // float64
is_nullable: true,
},
GrpcColumnDef {
name: "ts".to_string(),
data_type: 15, // timestamp
is_nullable: true,
},
];
CreateExpr {
table_name: "demo".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
..Default::default()
}
}
}

6
src/frontend/src/lib.rs Normal file
View File

@@ -0,0 +1,6 @@
#![feature(assert_matches)]
pub mod error;
pub mod frontend;
pub mod instance;
mod server;

View File

@@ -0,0 +1,80 @@
use std::net::SocketAddr;
use std::sync::Arc;
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::mysql::server::MysqlServer;
use servers::server::Server;
use snafu::ResultExt;
use tokio::try_join;
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::instance::InstanceRef;
/// Namespace for starting the protocol servers (HTTP, gRPC, MySQL) of a frontend.
pub(crate) struct Services;
impl Services {
pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> {
let http_server_and_addr = if let Some(http_addr) = &opts.http_addr {
let http_addr = parse_addr(http_addr)?;
let http_server = HttpServer::new(instance.clone());
Some((Box::new(http_server) as _, http_addr))
} else {
None
};
let grpc_server_and_addr = if let Some(grpc_addr) = &opts.grpc_addr {
let grpc_addr = parse_addr(grpc_addr)?;
let grpc_server = GrpcServer::new(instance.clone(), instance.clone());
Some((Box::new(grpc_server) as _, grpc_addr))
} else {
None
};
let mysql_server_and_addr = if let Some(mysql_addr) = &opts.mysql_addr {
let mysql_addr = parse_addr(mysql_addr)?;
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
let mysql_server = MysqlServer::create_server(instance.clone(), mysql_io_runtime);
Some((mysql_server, mysql_addr))
} else {
None
};
try_join!(
start_server(http_server_and_addr),
start_server(grpc_server_and_addr),
start_server(mysql_server_and_addr)
)
.context(error::StartServerSnafu)?;
Ok(())
}
}
fn parse_addr(addr: &str) -> Result<SocketAddr> {
addr.parse().context(error::ParseAddrSnafu { addr })
}
/// Starts `server` on `addr` when a pair is given.
///
/// Returns the address reported by the server, or `Ok(None)` when there is
/// nothing to start.
async fn start_server(
    server_and_addr: Option<(Box<dyn Server>, SocketAddr)>,
) -> servers::error::Result<Option<SocketAddr>> {
    match server_and_addr {
        Some((mut server, addr)) => server.start(addr).await.map(Some),
        None => Ok(None),
    }
}