feat: row write protocol (#2189)

* feat: datanode's row inserter

* refactor: ExprFactory

* feat: row inserter in standalone mode

* chore: minor refactor

* feat: row protocol for influxdb line protocol

* chore: minor refactor

* improve: avoid using too many strings

* no longer async

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: do not check empty data

* chore: by review comment

* chore: by comment

* chore: by review comment

* chore: by review comment

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
JeremyHi
2023-08-19 21:08:44 +08:00
committed by GitHub
parent 272f649b22
commit 033b650d0d
23 changed files with 988 additions and 145 deletions

View File

@@ -23,6 +23,7 @@ use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::VectorRef;
use datatypes::schema::SchemaRef;
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use table::metadata::TableId;
use table::requests::InsertRequest;
@@ -47,12 +48,11 @@ pub fn build_create_expr_from_insertion(
columns: &[Column],
engine: &str,
) -> Result<CreateTableExpr> {
let table_name = TableReference::full(catalog_name, schema_name, table_name);
let column_exprs = ColumnExpr::from_columns(columns);
util::build_create_table_expr(
catalog_name,
schema_name,
table_id,
table_name,
&table_name,
column_exprs,
engine,
"Created on insertion",

View File

@@ -16,7 +16,7 @@ mod alter;
pub mod delete;
pub mod error;
pub mod insert;
mod util;
pub mod util;
pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
pub use insert::{build_create_expr_from_insertion, find_new_columns};

View File

@@ -19,39 +19,45 @@ use api::v1::{
};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
use table::engine::TableReference;
use table::metadata::TableId;
use crate::error::{
DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, MissingTimestampColumnSnafu, Result,
};
pub struct ColumnExpr {
pub column_name: String,
pub struct ColumnExpr<'a> {
pub column_name: &'a str,
pub datatype: i32,
pub semantic_type: i32,
}
impl ColumnExpr {
impl<'a> ColumnExpr<'a> {
#[inline]
pub fn from_columns(columns: &[Column]) -> Vec<Self> {
pub fn from_columns(columns: &'a [Column]) -> Vec<Self> {
columns.iter().map(Self::from).collect()
}
#[inline]
pub fn from_column_schemas(schemas: &'a [ColumnSchema]) -> Vec<Self> {
schemas.iter().map(Self::from).collect()
}
}
impl From<&Column> for ColumnExpr {
fn from(column: &Column) -> Self {
impl<'a> From<&'a Column> for ColumnExpr<'a> {
fn from(column: &'a Column) -> Self {
Self {
column_name: column.column_name.clone(),
column_name: &column.column_name,
datatype: column.datatype,
semantic_type: column.semantic_type,
}
}
}
impl From<&ColumnSchema> for ColumnExpr {
fn from(schema: &ColumnSchema) -> Self {
impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> {
fn from(schema: &'a ColumnSchema) -> Self {
Self {
column_name: schema.column_name.clone(),
column_name: &schema.column_name,
datatype: schema.datatype,
semantic_type: schema.semantic_type,
}
@@ -59,10 +65,8 @@ impl From<&ColumnSchema> for ColumnExpr {
}
pub fn build_create_table_expr(
catalog_name: &str,
schema_name: &str,
table_id: Option<TableId>,
table_name: &str,
table_name: &TableReference<'_>,
column_exprs: Vec<ColumnExpr>,
engine: &str,
desc: &str,
@@ -77,8 +81,8 @@ pub fn build_create_table_expr(
let mut distinct_names = HashSet::with_capacity(column_exprs.len());
for ColumnExpr { column_name, .. } in &column_exprs {
ensure!(
distinct_names.insert(column_name),
DuplicatedColumnNameSnafu { name: column_name }
distinct_names.insert(*column_name),
DuplicatedColumnNameSnafu { name: *column_name }
);
}
@@ -94,16 +98,16 @@ pub fn build_create_table_expr(
{
let mut is_nullable = true;
match semantic_type {
v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.clone()),
v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.to_string()),
v if v == SemanticType::Timestamp as i32 => {
ensure!(
time_index.is_none(),
DuplicatedTimestampColumnSnafu {
exists: time_index.unwrap(),
duplicated: &column_name,
duplicated: column_name,
}
);
time_index = Some(column_name.clone());
time_index = Some(column_name.to_string());
// Timestamp column must not be null.
is_nullable = false;
}
@@ -111,7 +115,7 @@ pub fn build_create_table_expr(
}
let column_def = ColumnDef {
name: column_name,
name: column_name.to_string(),
datatype,
is_nullable,
default_constraint: vec![],
@@ -120,13 +124,13 @@ pub fn build_create_table_expr(
}
let time_index = time_index.context(MissingTimestampColumnSnafu {
msg: format!("table is {}", table_name),
msg: format!("table is {}", table_name.table),
})?;
let expr = CreateTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
catalog_name: table_name.catalog.to_string(),
schema_name: table_name.schema.to_string(),
table_name: table_name.table.to_string(),
desc: desc.to_string(),
column_defs,
time_index,
@@ -148,11 +152,11 @@ pub fn extract_new_columns(
) -> Result<Option<AddColumns>> {
let columns_to_add = column_exprs
.into_iter()
.filter(|expr| schema.column_schema_by_name(&expr.column_name).is_none())
.filter(|expr| schema.column_schema_by_name(expr.column_name).is_none())
.map(|expr| {
let is_key = expr.semantic_type == SemanticType::Tag as i32;
let column_def = Some(ColumnDef {
name: expr.column_name,
name: expr.column_name.to_string(),
datatype: expr.datatype,
is_nullable: true,
default_constraint: vec![],
@@ -170,7 +174,7 @@ pub fn extract_new_columns(
} else {
let mut distinct_names = HashSet::with_capacity(columns_to_add.len());
for add_column in &columns_to_add {
let name = &add_column.column_def.as_ref().unwrap().name;
let name = add_column.column_def.as_ref().unwrap().name.as_str();
ensure!(
distinct_names.insert(name),
DuplicatedColumnNameSnafu { name }
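
For reference, a minimal sketch (not part of this diff) of the call shape after this refactor, assuming the Result alias exported by the now-public error module: column names are borrowed through ColumnExpr<'_>, and the table identity travels as a single TableReference instead of three separate string arguments.

use api::v1::{Column, CreateTableExpr};
use common_grpc_expr::error::Result;
use common_grpc_expr::util::{build_create_table_expr, ColumnExpr};
use table::engine::TableReference;

// Hedged sketch: build a CreateTableExpr from the columns of an insert,
// letting the table id be assigned later (hence `None`), as the callers above do.
fn create_expr_for_insert(
    catalog: &str,
    schema: &str,
    table: &str,
    columns: &[Column],
    engine: &str,
) -> Result<CreateTableExpr> {
    let table_ref = TableReference::full(catalog, schema, table);
    // ColumnExpr borrows the column names instead of cloning them.
    let column_exprs = ColumnExpr::from_columns(columns);
    build_create_table_expr(None, &table_ref, column_exprs, engine, "Created on insertion")
}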

View File

@@ -478,6 +478,31 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Invalid insert row len, table: {}, expected: {}, actual: {}",
table_name,
expected,
actual
))]
InvalidInsertRowLen {
table_name: String,
expected: usize,
actual: usize,
location: Location,
},
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
location: Location,
source: api::error::Error,
},
#[snafu(display("Failed to create vector, source: {}", source))]
CreateVector {
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Unexpected, violated: {}", violated))]
Unexpected {
violated: String,
@@ -557,6 +582,7 @@ impl ErrorExt for Error {
TableEngineNotFound { source, .. } | EngineProcedureNotFound { source, .. } => {
source.status_code()
}
CreateVector { source, .. } => source.status_code(),
TableNotFound { .. } => StatusCode::TableNotFound,
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
@@ -583,7 +609,9 @@ impl ErrorExt for Error {
| MissingMetasrvOpts { .. }
| ColumnNoneDefaultValue { .. }
| MissingWalDirConfig { .. }
| PrepareImmutableTable { .. } => StatusCode::InvalidArguments,
| PrepareImmutableTable { .. }
| InvalidInsertRowLen { .. }
| ColumnDataType { .. } => StatusCode::InvalidArguments,
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => {
StatusCode::Unexpected

View File

@@ -67,6 +67,7 @@ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
use crate::heartbeat::HeartbeatTask;
use crate::row_inserter::RowInserter;
use crate::sql::{SqlHandler, SqlRequest};
use crate::store;
@@ -81,6 +82,7 @@ pub struct Instance {
pub(crate) sql_handler: SqlHandler,
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
row_inserter: RowInserter,
procedure_manager: ProcedureManagerRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
}
@@ -279,10 +281,14 @@ impl Instance {
plugins,
);
let query_engine = factory.query_engine();
let procedure_manager =
create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store)
.await?;
let sql_handler = SqlHandler::new(
engine_manager.clone(),
catalog_manager.clone(),
procedure_manager.clone(),
);
// Register all procedures.
// Register procedures of the mito engine.
mito_engine.register_procedure_loaders(&*procedure_manager);
@@ -295,23 +301,22 @@ impl Instance {
mito_engine.clone(),
&*procedure_manager,
);
let row_inserter = RowInserter::new(catalog_manager.clone());
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(opts.storage.data_home.clone()),
&opts.mode,
opts.enable_telemetry,
)
.await;
let instance = Arc::new(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(
engine_manager.clone(),
catalog_manager.clone(),
procedure_manager.clone(),
),
sql_handler,
catalog_manager: catalog_manager.clone(),
table_id_provider,
row_inserter,
procedure_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(opts.storage.data_home.clone()),
&opts.mode,
opts.enable_telemetry,
)
.await,
greptimedb_telemetry_task,
});
let heartbeat_task = Instance::build_heartbeat_task(

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequests, InsertRequests};
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequests, InsertRequests, RowInsertRequests};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_grpc_expr::insert::to_table_insert_request;
@@ -129,7 +129,7 @@ impl Instance {
pub async fn handle_inserts(
&self,
requests: InsertRequests,
ctx: &QueryContextRef,
ctx: QueryContextRef,
) -> Result<Output> {
let results = future::try_join_all(requests.inserts.into_iter().map(|insert| {
let catalog_manager = self.catalog_manager.clone();
@@ -164,6 +164,14 @@ impl Instance {
Ok(Output::AffectedRows(affected_rows))
}
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.row_inserter.handle_inserts(requests, ctx).await
}
async fn handle_deletes(
&self,
request: DeleteRequests,
@@ -221,7 +229,7 @@ impl GrpcQueryHandler for Instance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_inserts(requests, &ctx).await,
Request::Inserts(requests) => self.handle_inserts(requests, ctx).await,
Request::Deletes(request) => self.handle_deletes(request, ctx).await,
Request::Query(query_request) => {
let query = query_request
@@ -232,8 +240,9 @@ impl GrpcQueryHandler for Instance {
self.handle_query(query, ctx).await
}
Request::Ddl(request) => self.handle_ddl(request, ctx).await,
Request::RowInserts(_) | Request::RowDeletes(_) => UnsupportedGrpcRequestSnafu {
kind: "row insert/delete",
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx).await,
Request::RowDeletes(_) => UnsupportedGrpcRequestSnafu {
kind: "row deletes",
}
.fail(),
}

View File

@@ -24,6 +24,7 @@ pub mod metrics;
#[cfg(any(test, feature = "testing"))]
mod mock;
pub mod region_server;
mod row_inserter;
pub mod server;
pub mod sql;
mod store;

View File

@@ -0,0 +1,143 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{RowInsertRequest, RowInsertRequests};
use catalog::CatalogManagerRef;
use common_query::Output;
use datatypes::data_type::{ConcreteDataType, DataType};
use futures_util::future;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::InsertRequest;
use crate::error::{
CatalogSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, InsertSnafu, InvalidInsertRowLenSnafu,
JoinTaskSnafu, Result, TableNotFoundSnafu,
};
pub struct RowInserter {
catalog_manager: CatalogManagerRef,
}
impl RowInserter {
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
Self { catalog_manager }
}
pub async fn handle_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let insert_tasks = requests.inserts.into_iter().map(|insert| {
let catalog_manager = self.catalog_manager.clone();
let catalog_name = ctx.current_catalog().to_owned();
let schema_name = ctx.current_schema().to_owned();
let table_name = insert.table_name.clone();
let insert_task = async move {
let Some(request) =
convert_to_table_insert_request(&catalog_name, &schema_name, insert)?
else {
// empty data
return Ok(0usize);
};
let table = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
})?;
table.insert(request).await.with_context(|_| InsertSnafu {
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
})
};
common_runtime::spawn_write(insert_task)
});
let results = future::try_join_all(insert_tasks)
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<usize>>()?;
Ok(Output::AffectedRows(affected_rows))
}
}
fn convert_to_table_insert_request(
catalog_name: &str,
schema_name: &str,
request: RowInsertRequest,
) -> Result<Option<InsertRequest>> {
let table_name = request.table_name;
let region_number = request.region_number;
let Some(rows) = request.rows else {
return Ok(None);
};
let schema = rows.schema;
let rows = rows.rows;
let num_columns = schema.len();
let num_rows = rows.len();
if num_rows == 0 || num_columns == 0 {
return Ok(None);
}
let mut columns_values = Vec::with_capacity(num_columns);
for column_schema in schema {
let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(column_schema.datatype)
.context(ColumnDataTypeSnafu)?
.into();
let mutable_vector = datatype.create_mutable_vector(num_rows);
columns_values.push((column_schema.column_name, mutable_vector));
}
for row in rows {
ensure!(
row.values.len() == num_columns,
InvalidInsertRowLenSnafu {
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
expected: num_columns,
actual: row.values.len(),
}
);
for ((_, mutable_vector), value) in columns_values.iter_mut().zip(row.values.iter()) {
mutable_vector
.try_push_value_ref(helper::pb_value_to_value_ref(value))
.context(CreateVectorSnafu)?;
}
}
let columns_values = columns_values
.into_iter()
.map(|(k, mut v)| (k, v.to_vector()))
.collect();
let insert_request = InsertRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name,
columns_values,
region_number,
};
Ok(Some(insert_request))
}
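
To illustrate the payload this inserter consumes, here is a hedged sketch (not from this commit) of a minimal row-protocol request built from the api::v1 types used above: one table with a tag, a field, and the mandatory timestamp column. Each Row carries its values positionally, in the order declared once in the shared schema; convert_to_table_insert_request above turns exactly this shape back into per-column vectors before calling table.insert.

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
    Value,
};

// Hedged example payload: the schema is declared once per request,
// and each row lists its values in the same column order.
fn example_row_inserts() -> RowInsertRequests {
    let schema = vec![
        ColumnSchema {
            column_name: "host".to_string(),
            datatype: ColumnDataType::String as i32,
            semantic_type: SemanticType::Tag as i32,
        },
        ColumnSchema {
            column_name: "cpu".to_string(),
            datatype: ColumnDataType::Float64 as i32,
            semantic_type: SemanticType::Field as i32,
        },
        ColumnSchema {
            column_name: "ts".to_string(),
            datatype: ColumnDataType::TimestampMillisecond as i32,
            semantic_type: SemanticType::Timestamp as i32,
        },
    ];
    let row = Row {
        values: vec![
            Value { value_data: Some(ValueData::StringValue("host1".to_string())) },
            Value { value_data: Some(ValueData::F64Value(66.6)) },
            Value { value_data: Some(ValueData::TsMillisecondValue(1_663_840_496_100)) },
        ],
    };
    RowInsertRequests {
        inserts: vec![RowInsertRequest {
            table_name: "monitor1".to_string(),
            rows: Some(Rows { schema, rows: vec![row] }),
            ..Default::default()
        }],
    }
}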

View File

@@ -597,6 +597,9 @@ pub enum Error {
source: auth::error::Error,
location: Location,
},
#[snafu(display("Empty data: {}", msg))]
EmptyData { msg: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -618,7 +621,8 @@ impl ErrorExt for Error {
| Error::PrepareImmutableTable { .. }
| Error::BuildCsvConfig { .. }
| Error::ProjectSchema { .. }
| Error::UnsupportedFormat { .. } => StatusCode::InvalidArguments,
| Error::UnsupportedFormat { .. }
| Error::EmptyData { .. } => StatusCode::InvalidArguments,
Error::NotSupported { .. } => StatusCode::Unsupported,

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
@@ -22,6 +21,7 @@ use api::v1::{
DropColumns, RenameTable,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use datanode::instance::sql::table_idents_to_full_name;
use datatypes::schema::ColumnSchema;
use file_table_engine::table::immutable::ImmutableFileTableOptions;
@@ -33,49 +33,51 @@ use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX};
use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def};
use sql::util::to_lowercase_options_map;
use table::engine::TableReference;
use table::requests::{TableOptions, IMMUTABLE_TABLE_META_KEY};
use crate::error::{
self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu,
ConvertColumnDefaultConstraintSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu,
InvalidSqlSnafu, ParseSqlSnafu, Result,
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InvalidSqlSnafu, NotSupportedSnafu,
ParseSqlSnafu, PrepareImmutableTableSnafu, Result, UnrecognizedTableOptionSnafu,
};
pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
#[derive(Debug, Copy, Clone)]
pub struct CreateExprFactory;
#[async_trait::async_trait]
pub trait CreateExprFactory {
async fn create_expr_by_columns(
impl CreateExprFactory {
pub fn create_table_expr_by_columns(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
columns: &[Column],
engine: &str,
) -> crate::error::Result<CreateTableExpr>;
}
#[derive(Debug)]
pub struct DefaultCreateExprFactory;
#[async_trait::async_trait]
impl CreateExprFactory for DefaultCreateExprFactory {
async fn create_expr_by_columns(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
table_name: &TableReference<'_>,
columns: &[Column],
engine: &str,
) -> Result<CreateTableExpr> {
let table_id = None;
let create_expr = common_grpc_expr::build_create_expr_from_insertion(
catalog_name,
schema_name,
table_id,
let column_exprs = ColumnExpr::from_columns(columns);
let create_expr = common_grpc_expr::util::build_create_table_expr(
None,
table_name,
columns,
column_exprs,
engine,
"Created on insertion",
)
.context(BuildCreateExprOnInsertionSnafu)?;
Ok(create_expr)
}
pub fn create_table_expr_by_column_schemas(
&self,
table_name: &TableReference<'_>,
column_schemas: &[api::v1::ColumnSchema],
engine: &str,
) -> Result<CreateTableExpr> {
let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
let create_expr = common_grpc_expr::util::build_create_table_expr(
None,
table_name,
column_exprs,
engine,
"Created on insertion",
)
.context(BuildCreateExprOnInsertionSnafu)?;
@@ -90,18 +92,18 @@ pub(crate) async fn create_external_expr(
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
.context(ExternalSnafu)?;
let mut options = create.options;
let (files, schema) = prepare_immutable_file_table_files_and_schema(&options, &create.columns)
.await
.context(error::PrepareImmutableTableSnafu)?;
.context(PrepareImmutableTableSnafu)?;
let meta = ImmutableFileTableOptions { files };
let _ = options.insert(
IMMUTABLE_TABLE_META_KEY.to_string(),
serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?,
serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
);
let expr = CreateTableExpr {
@@ -126,12 +128,12 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
.context(ExternalSnafu)?;
let time_index = find_time_index(&create.constraints)?;
let table_options = HashMap::from(
&TableOptions::try_from(&to_lowercase_options_map(&create.options))
.context(error::UnrecognizedTableOptionSnafu)?,
.context(UnrecognizedTableOptionSnafu)?,
);
let expr = CreateTableExpr {
catalog_name,
@@ -201,7 +203,7 @@ fn find_primary_keys(
Ok(primary_keys)
}
pub fn find_time_index(constraints: &[TableConstraint]) -> crate::error::Result<String> {
pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
let time_index = constraints
.iter()
.filter_map(|constraint| match constraint {
@@ -229,10 +231,7 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> crate::error::Result<
Ok(time_index.first().unwrap().to_string())
}
fn columns_to_expr(
column_defs: &[ColumnDef],
time_index: &str,
) -> crate::error::Result<Vec<api::v1::ColumnDef>> {
fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result<Vec<api::v1::ColumnDef>> {
let column_schemas = column_defs
.iter()
.map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu))
@@ -286,7 +285,7 @@ pub(crate) fn to_alter_expr(
let kind = match alter_table.alter_operation() {
AlterTableOperation::AddConstraint(_) => {
return error::NotSupportedSnafu {
return NotSupportedSnafu {
feat: "ADD CONSTRAINT",
}
.fail();

View File

@@ -29,7 +29,9 @@ use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::meta::Role;
use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests};
use api::v1::{
AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests, RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::remote::CachedMetaKvBackend;
@@ -76,6 +78,7 @@ use sql::parser::ParserContext;
use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use table::engine::TableReference;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
@@ -83,12 +86,13 @@ use crate::error::{
InvalidInsertRequestSnafu, MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu,
PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::expr_factory::CreateExprFactory;
use crate::frontend::FrontendOptions;
use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use crate::heartbeat::HeartbeatTask;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metrics;
use crate::row_inserter::RowInserter;
use crate::script::ScriptExecutor;
use crate::server::{start_server, ServerHandlers, Services};
use crate::statement::StatementExecutor;
@@ -120,16 +124,13 @@ pub struct Instance {
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
create_expr_factory: CreateExprFactoryRef,
create_expr_factory: CreateExprFactory,
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
plugins: Arc<Plugins>,
servers: Arc<ServerHandlers>,
heartbeat_task: Option<HeartbeatTask>,
row_inserter: Arc<RowInserter>,
}
impl Instance {
@@ -209,16 +210,26 @@ impl Instance {
common_telemetry::init_node_id(opts.node_id.clone());
let create_expr_factory = CreateExprFactory;
let row_inserter = Arc::new(RowInserter::new(
MITO_ENGINE.to_string(),
catalog_manager.clone(),
create_expr_factory,
dist_instance.clone(),
));
Ok(Instance {
catalog_manager,
script_executor,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
create_expr_factory,
statement_executor,
query_engine,
grpc_query_handler: dist_instance,
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
heartbeat_task,
row_inserter,
})
}
@@ -271,16 +282,27 @@ impl Instance {
dn_instance.clone(),
));
let create_expr_factory = CreateExprFactory;
let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone());
let row_inserter = Arc::new(RowInserter::new(
MITO_ENGINE.to_string(),
catalog_manager.clone(),
create_expr_factory,
grpc_query_handler.clone(),
));
Ok(Instance {
catalog_manager: catalog_manager.clone(),
script_executor,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
create_expr_factory,
statement_executor,
query_engine,
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
grpc_query_handler,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
row_inserter,
})
}
@@ -295,6 +317,15 @@ impl Instance {
&self.catalog_manager
}
// Handle batch inserts with row-format
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.row_inserter.handle_inserts(requests, ctx).await
}
/// Handle batch inserts
pub async fn handle_inserts(
&self,
@@ -379,10 +410,10 @@ impl Instance {
let schema_name = ctx.current_schema();
// Create table automatically, build schema from data.
let create_expr = self
.create_expr_factory
.create_expr_by_columns(catalog_name, schema_name, table_name, columns, engine)
.await?;
let table_name = TableReference::full(catalog_name, schema_name, table_name);
let create_expr =
self.create_expr_factory
.create_table_expr_by_columns(&table_name, columns, engine)?;
info!(
"Try to create table: {} automatically with request: {:?}",

View File

@@ -666,7 +666,7 @@ impl GrpcQueryHandler for DistInstance {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::RowInserts(_) | Request::RowDeletes(_) => NotSupportedSnafu {
feat: "row insert/delete",
feat: "row inserts/deletes",
}
.fail(),
Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await,

View File

@@ -44,9 +44,10 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(_) | Request::RowDeletes(_) => {
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Request::RowDeletes(_) => {
return NotSupportedSnafu {
feat: "row insert/delete",
feat: "row deletes",
}
.fail();
}

View File

@@ -27,7 +27,7 @@ use crate::instance::Instance;
impl InfluxdbLineProtocolHandler for Instance {
async fn exec(
&self,
request: &InfluxdbRequest,
request: InfluxdbRequest,
ctx: QueryContextRef,
) -> servers::error::Result<()> {
self.plugins

View File

@@ -22,6 +22,7 @@ pub mod frontend;
pub mod heartbeat;
pub mod instance;
pub(crate) mod metrics;
mod row_inserter;
mod script;
mod server;
pub mod service_config;

View File

@@ -0,0 +1,172 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{AlterExpr, ColumnSchema, DdlRequest, Row, RowInsertRequest, RowInsertRequests};
use catalog::CatalogManagerRef;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
use common_telemetry::info;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::TableReference;
use table::TableRef;
use crate::error::{CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, Result};
use crate::expr_factory::CreateExprFactory;
pub struct RowInserter {
engine_name: String,
catalog_manager: CatalogManagerRef,
create_expr_factory: CreateExprFactory,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
}
impl RowInserter {
pub fn new(
engine_name: String,
catalog_manager: CatalogManagerRef,
create_expr_factory: CreateExprFactory,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
) -> Self {
Self {
engine_name,
catalog_manager,
create_expr_factory,
grpc_query_handler,
}
}
pub async fn handle_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.create_or_alter_tables_on_demand(&requests, ctx.clone())
.await?;
let query = Request::RowInserts(requests);
self.grpc_query_handler.do_query(query, ctx).await
}
// check if tables already exist:
// - if table does not exist, create table by inferred CreateExpr
// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
ctx: QueryContextRef,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
// TODO(jeremy): create and alter in batch?
for req in &requests.inserts {
let table_name = &req.table_name;
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?;
match table {
Some(table) => {
self.alter_table_on_demand(catalog_name, schema_name, table, req, ctx.clone())
.await?
}
None => {
let table_name = TableReference::full(catalog_name, schema_name, table_name);
self.create_table(&table_name, req, ctx.clone()).await?
}
}
}
Ok(())
}
async fn create_table(
&self,
table_name: &TableReference<'_>,
req: &RowInsertRequest,
ctx: QueryContextRef,
) -> Result<()> {
let (column_schemas, _) = extract_schema_and_rows(req)?;
let create_table_expr = self
.create_expr_factory
.create_table_expr_by_column_schemas(table_name, column_schemas, &self.engine_name)?;
let req = Request::Ddl(DdlRequest {
expr: Some(Expr::CreateTable(create_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx).await?;
Ok(())
}
async fn alter_table_on_demand(
&self,
catalog_name: &str,
schema_name: &str,
table: TableRef,
req: &RowInsertRequest,
ctx: QueryContextRef,
) -> Result<()> {
let (column_schemas, _) = extract_schema_and_rows(req)?;
let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
let add_columns = extract_new_columns(&table.schema(), column_exprs)
.context(FindNewColumnsOnInsertionSnafu)?;
let Some(add_columns) = add_columns else {
return Ok(());
};
let table_name = table.table_info().name.clone();
info!(
"Adding new columns: {:?} to table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
);
let alter_table_expr = AlterExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name,
kind: Some(Kind::AddColumns(add_columns)),
..Default::default()
};
let req = Request::Ddl(DdlRequest {
expr: Some(Expr::Alter(alter_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx).await?;
Ok(())
}
}
fn extract_schema_and_rows(req: &RowInsertRequest) -> Result<(&[ColumnSchema], &[Row])> {
let rows = req.rows.as_ref().with_context(|| EmptyDataSnafu {
msg: format!("insert to table: {:?}", &req.table_name),
})?;
let schema = &rows.schema;
let rows = &rows.rows;
ensure!(
!rows.is_empty(),
EmptyDataSnafu {
msg: format!("insert to table: {:?}", &req.table_name),
}
);
Ok((schema, rows))
}
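
The schema diffing in alter_table_on_demand is driven by common_grpc_expr::util::extract_new_columns, which returns Ok(None) when every column in the rows already exists. A condensed, hedged sketch of that step (the SchemaRef is assumed to come from table.schema() as above):

use api::v1::{AddColumns, ColumnSchema};
use common_grpc_expr::error::Result;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use datatypes::schema::SchemaRef;

// Hedged sketch: report only the columns the table does not have yet;
// an unchanged schema yields Ok(None), so no AlterExpr is issued.
fn new_columns_for_rows(
    table_schema: &SchemaRef,
    column_schemas: &[ColumnSchema],
) -> Result<Option<AddColumns>> {
    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
    extract_new_columns(table_schema, column_exprs)
}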

View File

@@ -391,7 +391,7 @@ struct Get {
}
impl TryFrom<RangeRequest> for Get {
type Error = error::Error;
type Error = Error;
fn try_from(req: RangeRequest) -> Result<Self> {
let RangeRequest {
@@ -428,7 +428,7 @@ struct Put {
}
impl TryFrom<PutRequest> for Put {
type Error = error::Error;
type Error = Error;
fn try_from(req: PutRequest) -> Result<Self> {
let PutRequest {
@@ -456,7 +456,7 @@ struct BatchGet {
}
impl TryFrom<BatchGetRequest> for BatchGet {
type Error = error::Error;
type Error = Error;
fn try_from(req: BatchGetRequest) -> Result<Self> {
let BatchGetRequest { keys } = req;
@@ -476,7 +476,7 @@ struct BatchPut {
}
impl TryFrom<BatchPutRequest> for BatchPut {
type Error = error::Error;
type Error = Error;
fn try_from(req: BatchPutRequest) -> Result<Self> {
let BatchPutRequest { kvs, prev_kv } = req;
@@ -499,7 +499,7 @@ struct BatchDelete {
}
impl TryFrom<BatchDeleteRequest> for BatchDelete {
type Error = error::Error;
type Error = Error;
fn try_from(req: BatchDeleteRequest) -> Result<Self> {
let BatchDeleteRequest { keys, prev_kv } = req;
@@ -524,7 +524,7 @@ struct CompareAndPut {
}
impl TryFrom<CompareAndPutRequest> for CompareAndPut {
type Error = error::Error;
type Error = Error;
fn try_from(req: CompareAndPutRequest) -> Result<Self> {
let CompareAndPutRequest { key, expect, value } = req;
@@ -544,7 +544,7 @@ struct Delete {
}
impl TryFrom<DeleteRangeRequest> for Delete {
type Error = error::Error;
type Error = Error;
fn try_from(req: DeleteRangeRequest) -> Result<Self> {
let DeleteRangeRequest {
@@ -577,7 +577,7 @@ struct MoveValue {
}
impl TryFrom<MoveValueRequest> for MoveValue {
type Error = error::Error;
type Error = Error;
fn try_from(req: MoveValueRequest) -> Result<Self> {
let MoveValueRequest { from_key, to_key } = req;

View File

@@ -328,6 +328,21 @@ pub enum Error {
actual: opensrv_mysql::ColumnType,
location: Location,
},
#[snafu(display(
"Column: {}, {} incompatible, expected: {}, actual: {}",
column_name,
datatype,
expected,
actual
))]
IncompatibleSchema {
column_name: String,
datatype: String,
expected: i32,
actual: i32,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -371,7 +386,8 @@ impl ErrorExt for Error {
| InvalidPrepareStatement { .. }
| DataFrame { .. }
| PreparedStmtTypeMismatch { .. }
| TimePrecision { .. } => StatusCode::InvalidArguments,
| TimePrecision { .. }
| IncompatibleSchema { .. } => StatusCode::InvalidArguments,
InfluxdbLinesWrite { source, .. }
| PromSeriesWrite { source, .. }

View File

@@ -97,7 +97,7 @@ pub async fn influxdb_write(
let request = InfluxdbRequest { precision, lines };
handler.exec(&request, ctx).await?;
handler.exec(request, ctx).await?;
Ok((StatusCode::NO_CONTENT, ()))
}

View File

@@ -14,14 +14,22 @@
use std::collections::HashMap;
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, InsertRequest as GrpcInsertRequest, InsertRequests, Row,
RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
};
use common_grpc::writer;
use common_grpc::writer::{LinesWriter, Precision};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::{OptionExt, ResultExt};
use influxdb_line_protocol::{parse_lines, FieldSet, FieldValue, TagSet};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu};
use crate::error::{
Error, IncompatibleSchemaSnafu, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu,
TimePrecisionSnafu,
};
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
@@ -34,10 +42,10 @@ pub struct InfluxdbRequest {
type TableName = String;
impl TryFrom<&InfluxdbRequest> for InsertRequests {
impl TryFrom<InfluxdbRequest> for InsertRequests {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
let mut writers: HashMap<TableName, LinesWriter> = HashMap::new();
let lines = parse_lines(&value.lines)
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
@@ -92,24 +100,14 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests {
}
if let Some(timestamp) = line.timestamp {
let precision = unwarp_or_default_precision(value.precision);
let precision = unwrap_or_default_precision(value.precision);
writer
.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision))
.context(InfluxdbLinesWriteSnafu)?;
} else {
let precision = unwarp_or_default_precision(value.precision);
let precision = unwrap_or_default_precision(value.precision);
let timestamp = Timestamp::current_millis();
let unit = match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
};
let unit = get_time_unit(precision)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
@@ -141,7 +139,228 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests {
}
}
fn unwarp_or_default_precision(precision: Option<Precision>) -> Precision {
impl TryFrom<InfluxdbRequest> for RowInsertRequests {
type Error = Error;
fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
let lines = parse_lines(&value.lines)
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
.context(InfluxdbLineProtocolSnafu)?;
struct TableData<'a> {
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
column_indexes: HashMap<&'a str, usize>,
}
let mut table_data_map = HashMap::new();
for line in &lines {
let table_name = line.series.measurement.as_str();
let tags = &line.series.tag_set;
let fields = &line.field_set;
let ts = line.timestamp;
// tags.len + fields.len + timestamp(+1)
let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
let TableData {
schema,
rows,
column_indexes,
} = table_data_map
.entry(table_name)
.or_insert_with(|| TableData {
schema: Vec::with_capacity(num_columns),
rows: Vec::new(),
column_indexes: HashMap::with_capacity(num_columns),
});
let mut one_row = vec![Value { value_data: None }; schema.len()];
// tags
parse_tags(tags, column_indexes, schema, &mut one_row)?;
// fields
parse_fields(fields, column_indexes, schema, &mut one_row)?;
// timestamp
parse_ts(ts, value.precision, column_indexes, schema, &mut one_row)?;
rows.push(Row { values: one_row });
}
let inserts = table_data_map
.into_iter()
.map(
|(
table_name,
TableData {
schema, mut rows, ..
},
)| {
let num_columns = schema.len();
for row in rows.iter_mut() {
if num_columns > row.values.len() {
row.values.resize(num_columns, Value { value_data: None });
}
}
RowInsertRequest {
table_name: table_name.to_string(),
rows: Some(Rows { schema, rows }),
..Default::default()
}
},
)
.collect::<Vec<_>>();
Ok(RowInsertRequests { inserts })
}
}
fn parse_tags<'a>(
tags: &'a Option<TagSet>,
column_indexes: &mut HashMap<&'a str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
let Some(tags) = tags else {
return Ok(());
};
for (k, v) in tags {
let index = column_indexes.entry(k.as_str()).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: k.to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
});
one_row.push(to_value(ValueData::StringValue(v.to_string())));
} else {
check_schema(ColumnDataType::String, SemanticType::Tag, &schema[*index])?;
one_row[*index].value_data = Some(ValueData::StringValue(v.to_string()));
}
}
Ok(())
}
fn parse_fields<'a>(
fields: &'a FieldSet,
column_indexes: &mut HashMap<&'a str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
for (k, v) in fields {
let index = column_indexes.entry(k.as_str()).or_insert(schema.len());
let (datatype, value) = match v {
FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)),
FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)),
FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)),
FieldValue::String(v) => (
ColumnDataType::String,
ValueData::StringValue(v.to_string()),
),
FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
};
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: k.to_string(),
datatype: datatype as i32,
semantic_type: SemanticType::Field as i32,
});
one_row.push(to_value(value));
} else {
check_schema(datatype, SemanticType::Field, &schema[*index])?;
one_row[*index].value_data = Some(value);
}
}
Ok(())
}
fn parse_ts(
ts: Option<i64>,
precision: Option<Precision>,
column_indexes: &mut HashMap<&str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
let precision = unwrap_or_default_precision(precision);
let ts = match ts {
Some(timestamp) => writer::to_ms_ts(precision, timestamp),
None => {
let timestamp = Timestamp::current_millis();
let unit = get_time_unit(precision)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
name: precision.to_string(),
})?;
writer::to_ms_ts(precision, timestamp.into())
}
};
let column_name = INFLUXDB_TIMESTAMP_COLUMN_NAME;
let index = column_indexes.entry(column_name).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: column_name.to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
});
one_row.push(to_value(ValueData::TsMillisecondValue(ts)))
} else {
check_schema(
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
&schema[*index],
)?;
one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts));
}
Ok(())
}
#[inline]
fn check_schema(
datatype: ColumnDataType,
semantic_type: SemanticType,
schema: &ColumnSchema,
) -> Result<(), Error> {
ensure!(
schema.datatype == datatype as i32,
IncompatibleSchemaSnafu {
column_name: &schema.column_name,
datatype: "datatype",
expected: schema.datatype,
actual: datatype as i32,
}
);
ensure!(
schema.semantic_type == semantic_type as i32,
IncompatibleSchemaSnafu {
column_name: &schema.column_name,
datatype: "semantic_type",
expected: schema.semantic_type,
actual: semantic_type as i32,
}
);
Ok(())
}
// TODO(jeremy): impl From<ValueData> for Value
#[inline]
fn to_value(value: ValueData) -> Value {
Value {
value_data: Some(value),
}
}
#[inline]
fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
if let Some(val) = precision {
val
} else {
@@ -149,6 +368,21 @@ fn unwarp_or_default_precision(precision: Option<Precision>) -> Precision {
}
}
#[inline]
fn get_time_unit(precision: Precision) -> Result<TimeUnit, Error> {
Ok(match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
})
}
#[cfg(test)]
mod tests {
use api::v1::column::Values;
@@ -166,7 +400,7 @@ monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
let influxdb_req = InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
@@ -306,4 +540,199 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
assert_eq!(b, bitvec.get(idx).unwrap())
}
}
#[test]
fn test_convert_influxdb_lines_to_rows() {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
let requests: RowInsertRequests = influxdb_req.try_into().unwrap();
assert_eq!(2, requests.inserts.len());
for request in requests.inserts {
match &request.table_name[..] {
"monitor1" => assert_monitor1_rows(&request.rows),
"monitor2" => assert_monitor2_rows(&request.rows),
_ => panic!(),
}
}
}
fn assert_monitor1_rows(rows: &Option<Rows>) {
let rows = rows.as_ref().unwrap();
let schema = &rows.schema;
let rows = &rows.rows;
assert_eq!(4, schema.len());
assert_eq!(2, rows.len());
for (i, column_schema) in schema.iter().enumerate() {
match &column_schema.column_name[..] {
"host" => {
assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref().unwrap();
match j {
0 => assert_eq!("host1", extract_string_value(v)),
1 => assert_eq!("host2", extract_string_value(v)),
_ => panic!(),
}
}
}
"cpu" => {
assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(66.6f64, extract_f64_value(v.as_ref().unwrap())),
1 => assert_eq!(None, v),
_ => panic!(),
}
}
}
"memory" => {
assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(1024f64, extract_f64_value(v.as_ref().unwrap())),
1 => assert_eq!(1027f64, extract_f64_value(v.as_ref().unwrap())),
_ => panic!(),
}
}
}
"ts" => {
assert_eq!(
ColumnDataType::TimestampMillisecond as i32,
column_schema.datatype
);
assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(
1663840496100023100 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
),
1 => assert_eq!(
1663840496400340001 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
),
_ => panic!(),
}
}
}
_ => panic!(),
}
}
}
fn assert_monitor2_rows(rows: &Option<Rows>) {
let rows = rows.as_ref().unwrap();
let schema = &rows.schema;
let rows = &rows.rows;
assert_eq!(4, schema.len());
assert_eq!(2, rows.len());
for (i, column_schema) in schema.iter().enumerate() {
match &column_schema.column_name[..] {
"host" => {
assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref().unwrap();
match j {
0 => assert_eq!("host3", extract_string_value(v)),
1 => assert_eq!("host4", extract_string_value(v)),
_ => panic!(),
}
}
}
"cpu" => {
assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(66.5f64, extract_f64_value(v.as_ref().unwrap())),
1 => assert_eq!(66.3f64, extract_f64_value(v.as_ref().unwrap())),
_ => panic!(),
}
}
}
"memory" => {
assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(None, v),
1 => assert_eq!(1029f64, extract_f64_value(v.as_ref().unwrap())),
_ => panic!(),
}
}
}
"ts" => {
assert_eq!(
ColumnDataType::TimestampMillisecond as i32,
column_schema.datatype
);
assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
for (j, row) in rows.iter().enumerate() {
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(
1663840496100023102 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
),
1 => assert_eq!(
1663840496400340003 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
),
_ => panic!(),
}
}
}
_ => panic!(),
}
}
}
fn extract_string_value(value: &ValueData) -> &str {
match value {
ValueData::StringValue(v) => v,
_ => panic!(),
}
}
fn extract_f64_value(value: &ValueData) -> f64 {
match value {
ValueData::F64Value(v) => *v,
_ => panic!(),
}
}
fn extract_ts_millis_value(value: &ValueData) -> i64 {
match value {
ValueData::TsMillisecondValue(v) => *v,
_ => panic!(),
}
}
}

View File

@@ -62,7 +62,7 @@ pub trait ScriptHandler {
pub trait InfluxdbLineProtocolHandler {
/// A successful request will not return a response.
/// Only on error will the socket return a line of data.
async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()>;
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()>;
}
#[async_trait]

View File

@@ -53,7 +53,7 @@ impl GrpcQueryHandler for DummyInstance {
#[async_trait]
impl InfluxdbLineProtocolHandler for DummyInstance {
async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
let requests: InsertRequests = request.try_into()?;
for expr in requests.inserts {
let _ = self

View File

@@ -64,7 +64,7 @@ monitor1,host=host2 memory=1027";
precision: None,
lines: lines.to_string(),
};
assert!(instance.exec(&request, QueryContext::arc()).await.is_ok());
assert!(instance.exec(request, QueryContext::arc()).await.is_ok());
let mut output = instance
.do_query(
@@ -93,7 +93,7 @@ monitor1,host=host2 memory=1027 1663840496400340001";
precision: None,
lines: lines.to_string(),
};
instance.exec(&request, QueryContext::arc()).await.unwrap();
instance.exec(request, QueryContext::arc()).await.unwrap();
let mut output = instance
.do_query(