feat: support standalone and distributed insert in frontend (#473)

* feat: support standalone and distributed insert in frontend

* cr
This commit is contained in:
fys
2022-11-13 11:57:23 +08:00
committed by GitHub
parent 2d869e1e43
commit 488eabce4a
6 changed files with 241 additions and 48 deletions

View File

@@ -20,6 +20,7 @@ use crate::partitioning::range::RangePartitionRule;
use crate::table::route::TableRoutes;
use crate::table::DistTable;
#[derive(Clone)]
pub struct FrontendCatalogManager {
backend: KvBackendRef,
table_routes: Arc<TableRoutes>,

View File

@@ -193,6 +193,33 @@ pub enum Error {
table_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to find catalog by name: {}", catalog_name))]
CatalogNotFound {
catalog_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to find schema, schema info: {}", schema_info))]
SchemaNotFound {
schema_info: String,
backtrace: Backtrace,
},
#[snafu(display("Table occurs error, source: {}", source))]
Table {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("Failed to get catalog manager"))]
CatalogManager { backtrace: Backtrace },
#[snafu(display("Failed to get full table name, source: {}", source))]
FullTableName {
#[snafu(backtrace)]
source: sql::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -208,6 +235,7 @@ impl ErrorExt for Error {
| Error::InvalidInsertRequest { .. }
| Error::FindPartitionColumn { .. }
| Error::ColumnValuesNumberMismatch { .. }
| Error::CatalogManager { .. }
| Error::RegionKeysSize { .. } => StatusCode::InvalidArguments,
Error::RuntimeResource { source, .. } => source.status_code(),
@@ -216,6 +244,10 @@ impl ErrorExt for Error {
Error::ParseSql { source } => source.status_code(),
Error::FullTableName { source, .. } => source.status_code(),
Error::Table { source } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::ConvertScalarValue { source, .. } => source.status_code(),
@@ -234,7 +266,9 @@ impl ErrorExt for Error {
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::JoinTask { .. } => StatusCode::Unexpected,
Error::JoinTask { .. }
| Error::SchemaNotFound { .. }
| Error::CatalogNotFound { .. } => StatusCode::Unexpected,
Error::Catalog { source, .. } => source.status_code(),
Error::ParseCatalogEntry { source, .. } => source.status_code(),

View File

@@ -13,6 +13,7 @@ use api::v1::{
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogList, CatalogProviderRef, SchemaProviderRef};
use client::admin::{admin_result_to_output, Admin};
use client::{Client, Database, Select};
use common_error::prelude::BoxedError;
@@ -29,6 +30,7 @@ use servers::query_handler::{
use snafu::prelude::*;
use sql::ast::{ColumnDef, TableConstraint};
use sql::statements::create::{CreateTable, TIME_INDEX};
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sql::{dialect::GenericDialect, parser::ParserContext};
@@ -37,6 +39,7 @@ use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
use crate::frontend::{FrontendOptions, Mode};
use crate::sql::insert_to_request;
use crate::table::route::TableRoutes;
#[async_trait]
@@ -56,13 +59,27 @@ pub trait FrontendInstance:
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
#[derive(Default)]
#[derive(Clone)]
pub struct Instance {
// TODO(hl): In standalone mode, there is only one client.
// But in distribute mode, frontend should fetch datanodes' addresses from metasrv.
client: Client,
/// catalog manager is None in standalone mode, datanode will keep their own
catalog_manager: Option<FrontendCatalogManager>,
// TODO(fys): it should be a trait that corresponds to two implementations:
// Standalone and Distributed, then the code behind it doesn't need to use so
// many match statements.
mode: Mode,
}
impl Default for Instance {
fn default() -> Self {
Self {
client: Client::default(),
catalog_manager: None,
mode: Mode::Standalone,
}
}
}
impl Instance {
@@ -115,6 +132,43 @@ impl Instance {
pub fn admin(&self) -> Admin {
Admin::new("greptime", self.client.clone())
}
fn get_catalog(&self, catalog_name: &str) -> Result<CatalogProviderRef> {
self.catalog_manager
.as_ref()
.context(error::CatalogManagerSnafu)?
.catalog(catalog_name)
.context(error::CatalogSnafu)?
.context(error::CatalogNotFoundSnafu { catalog_name })
}
fn get_schema(provider: CatalogProviderRef, schema_name: &str) -> Result<SchemaProviderRef> {
provider
.schema(schema_name)
.context(error::CatalogSnafu)?
.context(error::SchemaNotFoundSnafu {
schema_info: schema_name,
})
}
async fn sql_dist_insert(&self, insert: Box<Insert>) -> Result<usize> {
let (catalog, schema, table) = insert.full_table_name().context(error::ParseSqlSnafu)?;
let catalog_provider = self.get_catalog(&catalog)?;
let schema_provider = Self::get_schema(catalog_provider, &schema)?;
let insert_request = insert_to_request(&schema_provider, *insert)?;
let table = schema_provider
.table(&table)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu { table_name: &table })?;
table
.insert(insert_request)
.await
.context(error::TableSnafu)
}
}
#[async_trait]
@@ -131,6 +185,7 @@ impl Instance {
Self {
client,
catalog_manager: None,
mode: Mode::Standalone,
}
}
}
@@ -156,26 +211,44 @@ impl SqlQueryHandler for Instance {
.database()
.select(Select::Sql(query.to_string()))
.await
.and_then(|object_result| object_result.try_into()),
.and_then(|object_result| object_result.try_into())
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
Statement::Insert(insert) => {
// TODO(dennis): respect schema_name when inserting data
let (_catalog_name, _schema_name, table_name) = insert
.full_table_name()
.context(error::ParseSqlSnafu)
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "Failed to get table name",
})?;
match self.mode {
Mode::Standalone => {
// TODO(dennis): respect schema_name when inserting data
let (_catalog_name, _schema_name, table_name) = insert
.full_table_name()
.context(error::ParseSqlSnafu)
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "Failed to get table name",
})?;
let expr = InsertExpr {
table_name,
expr: Some(insert_expr::Expr::Sql(query.to_string())),
options: HashMap::default(),
};
self.database()
.insert(expr)
.await
.and_then(|object_result| object_result.try_into())
let expr = InsertExpr {
table_name,
expr: Some(insert_expr::Expr::Sql(query.to_string())),
options: HashMap::default(),
};
self.database()
.insert(expr)
.await
.and_then(|object_result| object_result.try_into())
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Mode::Distributed => {
let affected = self
.sql_dist_insert(insert)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
Ok(Output::AffectedRows(affected))
}
}
}
Statement::CreateTable(create) => {
let expr = create_to_expr(create)
@@ -185,13 +258,17 @@ impl SqlQueryHandler for Instance {
.create(expr)
.await
.and_then(admin_result_to_output)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Statement::ShowDatabases(_) | Statement::ShowTables(_) => self
.database()
.select(Select::Sql(query.to_string()))
.await
.and_then(|object_result| object_result.try_into()),
.and_then(|object_result| object_result.try_into())
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
Statement::CreateDatabase(c) => {
let expr = CreateDatabaseExpr {
@@ -201,6 +278,8 @@ impl SqlQueryHandler for Instance {
.create_database(expr)
.await
.and_then(admin_result_to_output)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Statement::Alter(alter_stmt) => self
.admin()
@@ -210,13 +289,13 @@ impl SqlQueryHandler for Instance {
.context(server_error::ExecuteAlterSnafu { query })?,
)
.await
.and_then(admin_result_to_output),
.and_then(admin_result_to_output)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
Statement::ShowCreateTable(_) => {
return server_error::NotSupportedSnafu { feat: query }.fail()
}
}
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
async fn insert_script(&self, _name: &str, _script: &str) -> server_error::Result<()> {

View File

@@ -2,22 +2,72 @@ use api::v1::InsertExpr;
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::influxdb::InfluxdbRequest;
use servers::{error::ExecuteQuerySnafu, query_handler::InfluxdbLineProtocolHandler};
use snafu::ResultExt;
use servers::{error as server_error, query_handler::InfluxdbLineProtocolHandler};
use snafu::{OptionExt, ResultExt};
use table::requests::InsertRequest;
use crate::error;
use crate::error::Result;
use crate::frontend::Mode;
use crate::instance::Instance;
#[async_trait]
impl InfluxdbLineProtocolHandler for Instance {
async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> {
let exprs: Vec<InsertExpr> = request.try_into()?;
self.database()
.batch_insert(exprs)
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu {
query: &request.lines,
})?;
match self.mode {
Mode::Standalone => {
let exprs: Vec<InsertExpr> = request.try_into()?;
self.database()
.batch_insert(exprs)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
query: &request.lines,
})?;
}
Mode::Distributed => {
self.dist_insert(request.try_into()?)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
}
}
Ok(())
}
}
impl Instance {
pub(crate) async fn dist_insert(&self, inserts: Vec<InsertRequest>) -> Result<usize> {
let mut joins = Vec::with_capacity(inserts.len());
for insert in inserts {
let self_clone = self.clone();
// TODO(fys): need a separate runtime here
let join = tokio::spawn(async move {
let catalog = self_clone.get_catalog(&insert.catalog_name)?;
let schema = Self::get_schema(catalog, &insert.schema_name)?;
let table = schema
.table(&insert.table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu {
table_name: &insert.table_name,
})?;
table.insert(insert).await.context(error::TableSnafu)
});
joins.push(join);
}
let mut affected = 0;
for join in joins {
affected += join.await.context(error::JoinTaskSnafu)??;
}
Ok(affected)
}
}

View File

@@ -7,6 +7,7 @@ use servers::query_handler::OpentsdbProtocolHandler;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::frontend::Mode;
use crate::instance::Instance;
#[async_trait]
@@ -14,12 +15,25 @@ impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> {
// TODO(LFC): Insert metrics in batch, then make OpentsdbLineProtocolHandler::exec received multiple data points, when
// metric table and tags can be created upon insertion.
self.insert_opentsdb_metric(data_point)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::PutOpentsdbDataPointSnafu {
data_point: format!("{:?}", data_point),
})?;
match self.mode {
Mode::Standalone => {
self.insert_opentsdb_metric(data_point)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::PutOpentsdbDataPointSnafu {
data_point: format!("{:?}", data_point),
})?;
}
Mode::Distributed => {
self.dist_insert(vec![data_point.as_insert_request()])
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
}
}
Ok(())
}
}

View File

@@ -12,6 +12,7 @@ use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use snafu::{OptionExt, ResultExt};
use crate::frontend::Mode;
use crate::instance::Instance;
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -90,15 +91,29 @@ async fn handle_remote_queries(
#[async_trait]
impl PrometheusProtocolHandler for Instance {
async fn write(&self, request: WriteRequest) -> ServerResult<()> {
let exprs = prometheus::write_request_to_insert_exprs(request)?;
match self.mode {
Mode::Standalone => {
let exprs = prometheus::write_request_to_insert_exprs(request)?;
self.database()
.batch_insert(exprs)
.await
.map_err(BoxedError::new)
.context(error::ExecuteInsertSnafu {
msg: "failed to write prometheus remote request",
})?;
self.database()
.batch_insert(exprs)
.await
.map_err(BoxedError::new)
.context(error::ExecuteInsertSnafu {
msg: "failed to write prometheus remote request",
})?;
}
Mode::Distributed => {
let inserts = prometheus::write_request_to_insert_reqs(request)?;
self.dist_insert(inserts)
.await
.map_err(BoxedError::new)
.context(error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
}
}
Ok(())
}