feat(flow): check sink table mismatch on flow creation (#5112)

* tests: more mismatch errors

* feat: check sink table schema if exists & prompt nice err msg

* chore: rm unused variant

* chore: fmt

* chore: cargo clippy

* feat: check schema on create

* feat: better err msg when mismatch

* tests: fix a schema mismatch

* todo: create sink table

* feat: create sink table

* fix: find time index

* tests: auto created sink table

* fix: remove empty keys

* refactor: per review

* chore: fmt

* test: sqlness

* chore: after rebase
discord9
2024-12-25 21:42:37 +08:00
committed by GitHub
parent 4051be4214
commit abf34b845c
23 changed files with 1382 additions and 183 deletions

Cargo.lock generated
View File

@@ -2016,6 +2016,7 @@ dependencies = [
name = "common-error"
version = "0.12.0"
dependencies = [
"http 0.2.12",
"snafu 0.8.5",
"strum 0.25.0",
"tonic 0.11.0",
@@ -4061,6 +4062,7 @@ dependencies = [
"get-size-derive2",
"get-size2",
"greptime-proto",
"http 0.2.12",
"hydroflow",
"itertools 0.10.5",
"lazy_static",

View File

@@ -126,6 +126,7 @@ futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
hex = "0.4"
http = "0.2"
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -8,6 +8,7 @@ license.workspace = true
workspace = true
[dependencies]
http.workspace = true
snafu.workspace = true
strum.workspace = true
tonic.workspace = true

View File

@@ -18,9 +18,30 @@ pub mod ext;
pub mod mock;
pub mod status_code;
use http::{HeaderMap, HeaderValue};
pub use snafu;
// HACK - these headers are here to be shared with gRPC services. For common HTTP headers,
// please define them in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
/// Create an HTTP header map from an error code and message,
/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as the keys.
pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
let mut header = HeaderMap::new();
let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| {
HeaderValue::from_bytes(
&msg.as_bytes()
.iter()
.flat_map(|b| std::ascii::escape_default(*b))
.collect::<Vec<u8>>(),
)
.expect("Already escaped string should be valid ascii")
});
header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into());
header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg);
header
}
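For context, a minimal usage sketch of the new helper (not part of the diff); it relies only on the `http` types and the two header constants defined above, and the message string is a hypothetical example:

#[test]
fn from_err_code_msg_to_header_sketch() {
    // the code becomes a numeric header value, the message an ASCII header value
    // (escaped by the helper if it is not valid ASCII)
    let headers = from_err_code_msg_to_header(3001, "sink table schema mismatch");
    assert_eq!(
        headers
            .get(GREPTIME_DB_HEADER_ERROR_CODE)
            .and_then(|v| v.to_str().ok()),
        Some("3001")
    );
    assert_eq!(
        headers
            .get(GREPTIME_DB_HEADER_ERROR_MSG)
            .and_then(|v| v.to_str().ok()),
        Some("sink table schema mismatch")
    );
}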

View File

@@ -45,6 +45,7 @@ get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
# otherwise it is the same with upstream repo
http.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true

View File

@@ -30,7 +30,7 @@ use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::Itertools;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
@@ -46,17 +46,19 @@ use tokio::sync::{broadcast, watch, Mutex, RwLock};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::util::{
relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc,
};
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu,
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu,
UnexpectedSnafu,
};
use crate::expr::{Batch, GlobalId};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
mod flownode_impl;
mod parse_expr;
@@ -245,8 +247,12 @@ impl FlowWorkerManager {
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
let (is_ts_placeholder, proto_schema) =
self.try_fetch_or_create_table(&table_name).await?;
let (is_ts_placeholder, proto_schema) = self
.try_fetch_existing_table(&table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Table not found: {}", table_name.join(".")),
})?;
let schema_len = proto_schema.len();
let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
@@ -396,14 +402,12 @@ impl FlowWorkerManager {
Ok(output)
}
/// Fetch table info or create table from flow's schema if not exist
async fn try_fetch_or_create_table(
/// Fetch the table schema and primary key from the table info source; return `None` if the table does not exist
async fn fetch_table_pk_schema(
&self,
table_name: &TableName,
) -> Result<(bool, Vec<api::v1::ColumnSchema>), Error> {
// TODO(discord9): instead of auto build table from request schema, actually build table
// before `create flow` to be able to assign pk and ts etc.
let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self
) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
if let Some(table_id) = self
.table_info_source
.get_table_id_from_name(table_name)
.await?
@@ -420,97 +424,64 @@ impl FlowWorkerManager {
.map(|i| meta.schema.column_schemas[i].name.clone())
.collect_vec();
let schema = meta.schema.column_schemas;
// check if the last column is the auto created timestamp column, hence the table is auto created from
// flow's plan type
let is_auto_create = {
let correct_name = schema
.last()
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
.unwrap_or(false);
let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1);
correct_name && correct_time_index
};
(primary_keys, schema, is_auto_create)
let time_index = meta.schema.timestamp_index;
Ok(Some((primary_keys, time_index, schema)))
} else {
// TODO(discord9): consider removing the buggy auto create by schema
Ok(None)
}
}
let node_ctx = self.node_context.read().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(table_name)
.map(|x| x.1)
.unwrap();
let schema = node_ctx
.schema
.get(&gid)
.with_context(|| TableNotFoundSnafu {
name: format!("Table name = {:?}", table_name),
})?
.clone();
// TODO(discord9): use default key from schema
let primary_keys = schema
.typ()
.keys
.first()
.map(|v| {
v.column_indices
.iter()
.map(|i| {
schema
.get_name(*i)
.clone()
.unwrap_or_else(|| format!("col_{i}"))
})
.collect_vec()
})
.unwrap_or_default();
let update_at = ColumnSchema::new(
UPDATE_AT_TS_COL,
/// Return (primary keys, schema, and whether the table has a placeholder timestamp column);
/// the schema of the table comes from the flow's output plan,
///
/// adjusted to add an `update_at` column and a ts placeholder if needed
async fn adjust_auto_created_table_schema(
&self,
schema: &RelationDesc,
) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
// TODO(discord9): consider removing the buggy auto create by schema
// TODO(discord9): use default key from schema
let primary_keys = schema
.typ()
.keys
.first()
.map(|v| {
v.column_indices
.iter()
.map(|i| {
schema
.get_name(*i)
.clone()
.unwrap_or_else(|| format!("col_{i}"))
})
.collect_vec()
})
.unwrap_or_default();
let update_at = ColumnSchema::new(
UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);
let original_schema = relation_desc_to_column_schemas_with_fallback(schema);
let mut with_auto_added_col = original_schema.clone();
with_auto_added_col.push(update_at);
// if no time index, add one as placeholder
let no_time_index = schema.typ().time_index.is_none();
if no_time_index {
let ts_col = ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);
)
.with_time_index(true);
with_auto_added_col.push(ts_col);
}
let original_schema = schema
.typ()
.column_types
.clone()
.into_iter()
.enumerate()
.map(|(idx, typ)| {
let name = schema
.names
.get(idx)
.cloned()
.flatten()
.unwrap_or(format!("col_{}", idx));
let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
if schema.typ().time_index == Some(idx) {
ret.with_time_index(true)
} else {
ret
}
})
.collect_vec();
let mut with_auto_added_col = original_schema.clone();
with_auto_added_col.push(update_at);
// if no time index, add one as placeholder
let no_time_index = schema.typ().time_index.is_none();
if no_time_index {
let ts_col = ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);
with_auto_added_col.push(ts_col);
}
(primary_keys, with_auto_added_col, no_time_index)
};
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
Ok((is_ts_placeholder, proto_schema))
Ok((primary_keys, with_auto_added_col, no_time_index))
}
}
@@ -813,7 +784,85 @@ impl FlowWorkerManager {
let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
// check schema against actual table schema if exists
// if not exist create sink table immediately
if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);
// for column schemas, only `data_type` needs to be checked for equality,
// since one can omit the flow's column names when writing the flow query;
// print a user-friendly error message about the mismatch and how to correct it
for (idx, zipped) in auto_schema
.iter()
.zip_longest(real_schema.iter())
.enumerate()
{
match zipped {
EitherOrBoth::Both(auto, real) => {
if auto.data_type != real.data_type {
InvalidQuerySnafu {
reason: format!(
"Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
idx,
real.name,
auto.name,
real.data_type,
auto.data_type
),
}
.fail()?;
}
}
EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
// if the table was auto created, the last one or two columns should be timestamps (update_at and the ts placeholder)
continue;
}
_ => InvalidQuerySnafu {
reason: format!(
"schema length mismatched, expected {} found {}",
real_schema.len(),
auto_schema.len()
),
}
.fail()?,
}
}
let table_id = self
.table_info_source
.get_table_id_from_name(&sink_table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table id for table name {:?}", sink_table_name),
})?;
let table_info_value = self
.table_info_source
.get_table_info_value(&table_id)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table info value for table id {:?}", table_id),
})?;
let real_schema = table_info_value_to_relation_desc(table_info_value)?;
node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?;
} else {
// assign inferred schema to sink table
// create sink table
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
let did_create = self
.create_table_from_relation(
&format!("flow-id={flow_id}"),
&sink_table_name,
&flow_plan.schema,
)
.await?;
if !did_create {
UnexpectedSnafu {
reason: format!("Failed to create table {:?}", sink_table_name),
}
.fail()?;
}
}
let _ = comment;
let _ = flow_options;

View File

@@ -331,12 +331,14 @@ impl FlownodeContext {
} else {
let global_id = self.new_global_id();
// table id is Some, meaning the db must have created the table
if let Some(table_id) = table_id {
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
table_name = table_name.or(Some(known_table_name));
self.schema.insert(global_id, schema);
} // if we don't have a table id, it means the database hasn't assigned one yet, or we don't need it
// still update the mapping with the new global id
self.table_repr.insert(table_name, table_id, global_id);
Ok(global_id)
}
@@ -358,6 +360,7 @@ impl FlownodeContext {
})?;
self.schema.insert(gid, schema);
Ok(())
}

View File

@@ -20,11 +20,12 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use crate::adapter::util::table_info_value_to_relation_desc;
use crate::adapter::TableName;
use crate::error::{
Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::repr::{self, ColumnType, RelationDesc, RelationType};
use crate::repr::RelationDesc;
/// mapping of table name <-> table id should be queried from the table info manager
pub struct TableSource {
@@ -121,38 +122,7 @@ impl TableSource {
table_name.table_name,
];
let raw_schema = table_info_value.table_info.meta.schema;
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
.clone()
.into_iter()
.map(|col| {
(
ColumnType {
nullable: col.is_nullable(),
scalar_type: col.data_type,
},
Some(col.name),
)
})
.unzip();
let key = table_info_value.table_info.meta.primary_key_indices;
let keys = vec![repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
Ok((
table_name,
RelationDesc {
typ: RelationType {
column_types,
keys,
time_index,
// by default table schema's column are all non-auto
auto_columns: vec![],
},
names: col_names,
},
))
let desc = table_info_value_to_relation_desc(table_info_value)?;
Ok((table_name, desc))
}
}

View File

@@ -12,16 +12,153 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::column_def::options_from_column_schema;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType};
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use snafu::ResultExt;
use operator::expr_factory::CreateExprFactory;
use session::context::QueryContextBuilder;
use snafu::{OptionExt, ResultExt};
use table::table_reference::TableReference;
use crate::error::{Error, ExternalSnafu};
use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::FlowWorkerManager;
impl FlowWorkerManager {
/// Create a table from the given schema (adjusted to add auto columns if needed); return true if the table was created
pub(crate) async fn create_table_from_relation(
&self,
flow_name: &str,
table_name: &TableName,
relation_desc: &RelationDesc,
) -> Result<bool, Error> {
if self.fetch_table_pk_schema(table_name).await?.is_some() {
return Ok(false);
}
let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?;
// create sink table using pks, column types and is_ts_auto
let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?;
// create sink table
let create_expr = CreateExprFactory {}
.create_table_expr_by_column_schemas(
&TableReference {
catalog: &table_name[0],
schema: &table_name[1],
table: &table_name[2],
},
&proto_schema,
"mito",
Some(&format!("Sink table for flow {}", flow_name)),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
self.submit_create_sink_table_ddl(create_expr).await?;
Ok(true)
}
/// Try to fetch the table with an adjusted schema (auto columns added if needed)
pub(crate) async fn try_fetch_existing_table(
&self,
table_name: &TableName,
) -> Result<Option<(bool, Vec<api::v1::ColumnSchema>)>, Error> {
if let Some((primary_keys, time_index, schema)) =
self.fetch_table_pk_schema(table_name).await?
{
// check if the last column is the auto created timestamp column; if so, the table was
// auto created from the flow's plan type
let is_auto_create = {
let correct_name = schema
.last()
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
.unwrap_or(false);
let correct_time_index = time_index == Some(schema.len() - 1);
correct_name && correct_time_index
};
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
Ok(Some((is_auto_create, proto_schema)))
} else {
Ok(None)
}
}
/// submit a create table ddl
pub(crate) async fn submit_create_sink_table_ddl(
&self,
mut create_table: CreateTableExpr,
) -> Result<(), Error> {
let stmt_exec = {
self.frontend_invoker
.read()
.await
.as_ref()
.map(|f| f.statement_executor())
}
.context(UnexpectedSnafu {
reason: "Failed to get statement executor",
})?;
let ctx = Arc::new(
QueryContextBuilder::default()
.current_catalog(create_table.catalog_name.clone())
.current_schema(create_table.schema_name.clone())
.build(),
);
stmt_exec
.create_table_inner(&mut create_table, None, ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(())
}
}
pub fn table_info_value_to_relation_desc(
table_info_value: TableInfoValue,
) -> Result<RelationDesc, Error> {
let raw_schema = table_info_value.table_info.meta.schema;
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
.clone()
.into_iter()
.map(|col| {
(
ColumnType {
nullable: col.is_nullable(),
scalar_type: col.data_type,
},
Some(col.name),
)
})
.unzip();
let key = table_info_value.table_info.meta.primary_key_indices;
let keys = vec![crate::repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
Ok(RelationDesc {
typ: RelationType {
column_types,
keys,
time_index,
// by default, the table schema's columns are all non-auto
auto_columns: vec![],
},
names: col_names,
})
}
pub fn from_proto_to_data_type(
column_schema: &api::v1::ColumnSchema,
@@ -75,3 +212,29 @@ pub fn column_schemas_to_proto(
.collect();
Ok(ret)
}
/// Convert a `RelationDesc` to a list of `ColumnSchema`s;
/// if a column name is not present, use `col_{idx}` as the column name
pub fn relation_desc_to_column_schemas_with_fallback(schema: &RelationDesc) -> Vec<ColumnSchema> {
schema
.typ()
.column_types
.clone()
.into_iter()
.enumerate()
.map(|(idx, typ)| {
let name = schema
.names
.get(idx)
.cloned()
.flatten()
.unwrap_or(format!("col_{}", idx));
let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
if schema.typ().time_index == Some(idx) {
ret.with_time_index(true)
} else {
ret
}
})
.collect_vec()
}
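A small sketch (not in the diff) of the fallback behavior, assuming a `RelationDesc` built with the same struct literals as `table_info_value_to_relation_desc` above; the unnamed first column falls back to `col_0` and the declared time index is carried over via `with_time_index`:

#[cfg(test)]
mod fallback_name_sketch {
    use super::*;

    #[test]
    fn unnamed_columns_get_fallback_names() {
        let desc = RelationDesc {
            typ: RelationType {
                column_types: vec![
                    ColumnType::new(ConcreteDataType::uint32_datatype(), true),
                    ColumnType::new(ConcreteDataType::timestamp_millisecond_datatype(), false),
                ],
                keys: vec![],
                time_index: Some(1),
                auto_columns: vec![],
            },
            names: vec![None, Some("ts".to_string())],
        };
        let cols = relation_desc_to_column_schemas_with_fallback(&desc);
        // the first column has no name in the relation, so it falls back to `col_0`
        assert_eq!(cols[0].name, "col_0");
        // the second column keeps its name (and the time index flag)
        assert_eq!(cols[1].name, "ts");
    }
}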

View File

@@ -16,12 +16,13 @@
use std::any::Any;
use common_error::define_into_tonic_status;
use common_error::ext::BoxedError;
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use snafu::{Location, Snafu};
use tonic::metadata::MetadataMap;
use crate::adapter::FlowId;
use crate::expr::EvalError;
@@ -186,6 +187,20 @@ pub enum Error {
},
}
/// The outer message is the full error stack, and the inner message in the header is the last error message, which can be shown directly to the user
pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
let msg = err.to_string();
let last_err_msg = common_error::ext::StackError::last(&err).to_string();
let code = err.status_code() as u32;
let header = from_err_code_msg_to_header(code, &last_err_msg);
tonic::Status::with_metadata(
tonic::Code::InvalidArgument,
msg,
MetadataMap::from_headers(header),
)
}
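A rough sketch (not part of the diff) of what callers get back from the helper above, assuming `UnexpectedSnafu` can be built without a source error, as its other uses in this crate suggest:

#[cfg(test)]
mod status_sketch {
    use super::*;

    #[test]
    fn maps_flow_error_to_invalid_argument() {
        // hypothetical error, built the same way other call sites in this crate do
        let err = UnexpectedSnafu {
            reason: "sink table vanished",
        }
        .build();
        let status = to_status_with_last_err(err);
        // the gRPC code is fixed to InvalidArgument; the full stack goes into the
        // status message, the last user-facing message into the metadata headers
        assert_eq!(status.code(), tonic::Code::InvalidArgument);
    }
}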
/// Result type for flow module
pub type Result<T> = std::result::Result<T, Error>;
@@ -200,9 +215,8 @@ impl ErrorExt for Error {
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::InvalidQuery { .. } | Self::Plan { .. } | Self::Datatypes { .. } => {
StatusCode::PlanQuery
}
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery,
Self::Unexpected { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported

View File

@@ -212,6 +212,8 @@ impl RelationType {
for key in &mut self.keys {
key.remove_col(time_index.unwrap_or(usize::MAX));
}
// remove empty keys
self.keys.retain(|key| !key.is_empty());
self
}

View File

@@ -50,8 +50,8 @@ use tonic::{Request, Response, Status};
use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu,
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu,
ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
@@ -87,10 +87,7 @@ impl flow_server::Flow for FlowService {
.handle(request)
.await
.map(Response::new)
.map_err(|e| {
let msg = format!("failed to handle request: {:?}", e);
Status::internal(msg)
})
.map_err(to_status_with_last_err)
}
async fn handle_mirror_request(
@@ -126,10 +123,7 @@ impl flow_server::Flow for FlowService {
.handle_inserts(request)
.await
.map(Response::new)
.map_err(|e| {
let msg = format!("failed to handle request: {:?}", e);
Status::internal(msg)
})
.map_err(to_status_with_last_err)
}
}
@@ -544,4 +538,8 @@ impl FrontendInvoker {
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)
}
pub fn statement_executor(&self) -> Arc<StatementExecutor> {
self.statement_executor.clone()
}
}

View File

@@ -216,6 +216,7 @@ impl KeyValPlan {
/// Find the column that should be the time index in the group exprs (which are all the columns that should be keys).
/// TODO(discord9): better ways to assign the time index
/// for now, it will find the first column that is a timestamp or has a tumble window floor function
fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> {
group_exprs.iter().position(|expr| {
matches!(
@@ -224,7 +225,7 @@ fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> {
func: UnaryFunc::TumbleWindowFloor { .. },
expr: _
}
)
) || expr.typ.scalar_type.is_timestamp()
})
}
@@ -1482,7 +1483,7 @@ mod test {
ColumnType::new(CDT::float64_datatype(), true),
ColumnType::new(CDT::timestamp_millisecond_datatype(), true),
])
.with_key(vec![1])
.with_time_index(Some(1))
.into_named(vec![
Some(
"MAX(numbers_with_ts.number) - MIN(numbers_with_ts.number) / Float64(30)"
@@ -1571,7 +1572,7 @@ mod test {
ColumnType::new(ConcreteDataType::uint32_datatype(), true), // max
ColumnType::new(ConcreteDataType::uint32_datatype(), true), // min
])
.with_key(vec![0])
.with_time_index(Some(0))
.into_unnamed(),
),
),

View File

@@ -68,6 +68,7 @@ impl CreateExprFactory {
table_name: &TableReference<'_>,
column_schemas: &[api::v1::ColumnSchema],
engine: &str,
desc: Option<&str>,
) -> Result<CreateTableExpr> {
let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
let create_expr = common_grpc_expr::util::build_create_table_expr(
@@ -75,7 +76,7 @@ impl CreateExprFactory {
table_name,
column_exprs,
engine,
"Created on insertion",
desc.unwrap_or("Created on insertion"),
)
.context(BuildCreateExprOnInsertionSnafu)?;

View File

@@ -870,5 +870,5 @@ fn build_create_table_expr(
request_schema: &[ColumnSchema],
engine: &str,
) -> Result<CreateTableExpr> {
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine)
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None)
}

View File

@@ -56,7 +56,7 @@ futures-util.workspace = true
hashbrown = "0.14"
headers = "0.3"
hostname = "0.3"
http = "0.2"
http.workspace = true
http-body = "0.4"
humantime.workspace = true
humantime-serde.workspace = true

View File

@@ -12,17 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_error::from_err_code_msg_to_header;
use common_error::status_code::StatusCode;
use common_telemetry::{debug, error};
use serde::{Deserialize, Serialize};
use crate::error::status_code_to_http_status;
use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME;
#[derive(Serialize, Deserialize, Debug)]
pub struct ErrorResponse {
@@ -74,13 +72,16 @@ impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let code = self.code;
let execution_time = self.execution_time_ms;
let mut resp = Json(self).into_response();
resp.headers_mut()
.insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code));
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
let new_header = from_err_code_msg_to_header(
code,
&format!(
"error: {}, execution_time_ms: {}",
self.error, execution_time
),
);
let mut resp = Json(self).into_response();
resp.headers_mut().extend(new_header);
let status = StatusCode::from_u32(code).unwrap_or(StatusCode::Unknown);
let status_code = status_code_to_http_status(&status);

View File

@@ -0,0 +1,161 @@
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number)
FROM
numbers_input_basic
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "SUM(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
-- SQLNESS ARG restart=true
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "SUM(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
SHOW CREATE FLOW test_numbers_basic;
+--------------------+-------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+--------------------+-------------------------------------------------------------------------------------------------------+
| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic |
| | SINK TO out_num_cnt_basic |
| | AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') |
+--------------------+-------------------------------------------------------------------------------------------------------+
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number) as sumup, ts as event_time
FROM
numbers_input_basic
GROUP BY
ts;
Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sumup" BIGINT NULL, |
| | "event_time" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("event_time") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SHOW CREATE FLOW test_numbers_basic;
+--------------------+---------------------------------------------------------------------------------------+
| Flow | Create Flow |
+--------------------+---------------------------------------------------------------------------------------+
| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic |
| | SINK TO out_num_cnt_basic |
| | AS SELECT sum(number) AS sumup, ts AS event_time FROM numbers_input_basic GROUP BY ts |
+--------------------+---------------------------------------------------------------------------------------+
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sumup" BIGINT NULL, |
| | "event_time" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("event_time") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0

View File

@@ -0,0 +1,58 @@
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number)
FROM
numbers_input_basic
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
SHOW CREATE TABLE out_num_cnt_basic;
SHOW CREATE FLOW test_numbers_basic;
DROP FLOW test_numbers_basic;
DROP TABLE numbers_input_basic;
DROP TABLE out_num_cnt_basic;
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number) as sumup, ts as event_time
FROM
numbers_input_basic
GROUP BY
ts;
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
SHOW CREATE FLOW test_numbers_basic;
SHOW CREATE TABLE out_num_cnt_basic;
DROP FLOW test_numbers_basic;
DROP TABLE numbers_input_basic;
DROP TABLE out_num_cnt_basic;

View File

@@ -17,6 +17,24 @@ GROUP BY
Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "SUM(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
@@ -28,6 +46,24 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
| FLOW_FLUSHED |
+----------------------------------------+
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "SUM(numbers_input_basic.number)" BIGINT NULL, |
| | "window_start" TIMESTAMP(3) NOT NULL, |
| | "window_end" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("window_start"), |
| | PRIMARY KEY ("window_end") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
INSERT INTO
numbers_input_basic
@@ -130,6 +166,22 @@ FROM
Affected Rows: 0
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
DROP FLOW test_wildcard_basic;
Affected Rows: 0
@@ -142,6 +194,23 @@ FROM
Affected Rows: 0
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
-- SQLNESS ARG restart=true
INSERT INTO
input_basic
VALUES
@@ -159,6 +228,22 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
| FLOW_FLUSHED |
+-----------------------------------------+
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
SELECT wildcard FROM out_basic;
+----------+
@@ -197,6 +282,23 @@ FROM
Affected Rows: 0
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
@@ -320,7 +422,9 @@ CREATE TABLE numbers_input_basic (
Affected Rows: 0
create table out_num_cnt_basic (
number INT,
a INTERVAL,
b INTERVAL,
c INTERVAL,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX
);
@@ -348,6 +452,23 @@ SHOW CREATE FLOW filter_numbers_basic;
| | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+-----------------------------------------------------------+
| Table | Create Table |
+-------------------+-----------------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "a" INTERVAL NULL, |
| | "b" INTERVAL NULL, |
| | "c" INTERVAL NULL, |
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+-----------------------------------------------------------+
drop flow filter_numbers_basic;
Affected Rows: 0
@@ -390,6 +511,22 @@ GROUP BY
Affected Rows: 0
SHOW CREATE TABLE approx_rate;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( |
| | "rate" DOUBLE NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
INSERT INTO
bytes_log
VALUES
@@ -542,6 +679,23 @@ FROM
Affected Rows: 0
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "ngx_access_log.country" STRING NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("ngx_access_log.country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------+
INSERT INTO
ngx_access_log
VALUES
@@ -675,6 +829,23 @@ GROUP BY
Affected Rows: 0
SHOW CREATE TABLE ngx_country;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "ngx_access_log.country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window"), |
| | PRIMARY KEY ("ngx_access_log.country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
INSERT INTO
ngx_access_log
VALUES
@@ -693,21 +864,20 @@ ADMIN FLUSH_FLOW('calc_ngx_country');
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "ngx_access_log.country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("ngx_access_log.country", "time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------------------+
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "ngx_access_log.country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window"), |
| | PRIMARY KEY ("ngx_access_log.country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
SELECT
"ngx_access_log.country",
@@ -824,6 +994,23 @@ HAVING
Affected Rows: 0
SHOW CREATE TABLE temp_alerts;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| temp_alerts | CREATE TABLE IF NOT EXISTS "temp_alerts" ( |
| | "sensor_id" INT NULL, |
| | "loc" STRING NULL, |
| | "max_temp" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
INSERT INTO
temp_sensor_data
VALUES
@@ -963,6 +1150,25 @@ GROUP BY
Affected Rows: 0
SHOW CREATE TABLE ngx_distribution;
+------------------+-------------------------------------------------+
| Table | Create Table |
+------------------+-------------------------------------------------+
| ngx_distribution | CREATE TABLE IF NOT EXISTS "ngx_distribution" ( |
| | "stat" INT NULL, |
| | "bucket_size" INT NULL, |
| | "total_logs" BIGINT NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window"), |
| | PRIMARY KEY ("stat", "bucket_size") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+------------------+-------------------------------------------------+
INSERT INTO
ngx_access_log
VALUES
@@ -1070,6 +1276,23 @@ FROM
Affected Rows: 0
SHOW CREATE TABLE requests_without_ip;
+---------------------+----------------------------------------------------+
| Table | Create Table |
+---------------------+----------------------------------------------------+
| requests_without_ip | CREATE TABLE IF NOT EXISTS "requests_without_ip" ( |
| | "service_name" STRING NULL, |
| | "val" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("service_name") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+---------------------+----------------------------------------------------+
INSERT INTO
requests
VALUES
@@ -1269,6 +1492,25 @@ GROUP BY
Affected Rows: 0
SHOW CREATE TABLE android_log_abnormal;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( |
| | "crash" BIGINT NULL, |
| | "fatal" BIGINT NULL, |
| | "backtrace" BIGINT NULL, |
| | "anr" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------------------+-----------------------------------------------------+
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
@@ -1361,6 +1603,25 @@ GROUP BY
Affected Rows: 0
SHOW CREATE TABLE android_log_abnormal;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( |
| | "crash" BIGINT NULL, |
| | "fatal" BIGINT NULL, |
| | "backtrace" BIGINT NULL, |
| | "anr" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------------------+-----------------------------------------------------+
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
@@ -1419,3 +1680,210 @@ DROP TABLE android_log;
Affected Rows: 0
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num
FROM
numbers_input_basic;
Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "avg_after_filter_num" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
-- SQLNESS ARG restart=true
INSERT INTO
numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT avg_after_filter_num FROM out_num_cnt_basic;
+----------------------+
| avg_after_filter_num |
+----------------------+
| 1 |
+----------------------+
INSERT INTO
numbers_input_basic
VALUES
(10, "2021-07-01 00:00:00.200"),
(23, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0
CREATE TABLE `live_connection_log`
(
`device_model` STRING NULL,
`connect_protocol` INT NULL,
`connect_mode` INT NULL,
`connect_retry_times` DOUBLE NULL,
`connect_result` INT NULL,
`first_frame_time` DOUBLE NULL,
`record_time` TIMESTAMP TIME INDEX,
`iot_online` INT NULL,
PRIMARY KEY (`device_model`,`connect_protocol`),
);
Affected Rows: 0
CREATE TABLE `live_connection_statistics_detail`
(
`device_model` STRING NULL,
`connect_protocol` INT NULL,
`connect_mode` INT NULL,
`avg_connect_retry_times` DOUBLE NULL,
`total_connect_result_ok` INT64 NULL,
`total_connect_result_fail` INT64 NULL,
`total_connect` INT64 NULL,
`conection_rate` DOUBLE NULL,
`avg_first_frame_time` DOUBLE NULL,
`max_first_frame_time` DOUBLE NULL,
`ok_conection_rate` DOUBLE NULL,
`record_time_window` TIMESTAMP TIME INDEX,
`update_at` TIMESTAMP,
PRIMARY KEY (`device_model`,`connect_protocol`),
);
Affected Rows: 0
CREATE FLOW live_connection_aggregation_detail
SINK TO live_connection_statistics_detail
AS
SELECT
device_model,
connect_protocol,
connect_mode,
avg(connect_retry_times) as avg_connect_retry_times,
sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok,
sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail,
count(connect_result) as total_connect,
sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate,
avg(first_frame_time) as avg_first_frame_time,
max(first_frame_time) as max_first_frame_time,
sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate,
date_bin(INTERVAL '1 minutes', record_time) as record_time_window,
FROM live_connection_log
WHERE iot_online = 1
GROUP BY
device_model,
connect_protocol,
connect_mode,
record_time_window;
Affected Rows: 0
INSERT INTO
live_connection_log
VALUES
("STM51", 1, 1, 0.5, 1, 0.1, 0, 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('live_connection_aggregation_detail');
+--------------------------------------------------------+
| ADMIN FLUSH_FLOW('live_connection_aggregation_detail') |
+--------------------------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------------------------+
SELECT device_model,
connect_protocol,
connect_mode,
avg_connect_retry_times,
total_connect_result_ok,
total_connect_result_fail,
total_connect,
conection_rate,
avg_first_frame_time,
max_first_frame_time,
ok_conection_rate,
record_time_window FROM live_connection_statistics_detail;
+--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+
| device_model | connect_protocol | connect_mode | avg_connect_retry_times | total_connect_result_ok | total_connect_result_fail | total_connect | conection_rate | avg_first_frame_time | max_first_frame_time | ok_conection_rate | record_time_window |
+--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+
| STM51 | 1 | 1 | 0.5 | 1 | 0 | 1 | 1.0 | 0.1 | 0.1 | 1.0 | 1970-01-01T00:00:00 |
+--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+
DROP FLOW live_connection_aggregation_detail;
Affected Rows: 0
DROP TABLE live_connection_log;
Affected Rows: 0
DROP TABLE live_connection_statistics_detail;
Affected Rows: 0

View File

@@ -13,11 +13,15 @@ FROM
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
SHOW CREATE TABLE out_num_cnt_basic;
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
INSERT INTO
numbers_input_basic
@@ -75,6 +79,8 @@ SELECT
FROM
input_basic;
SHOW CREATE TABLE out_basic;
DROP FLOW test_wildcard_basic;
CREATE FLOW test_wildcard_basic sink TO out_basic AS
@@ -83,6 +89,9 @@ SELECT
FROM
input_basic;
SHOW CREATE TABLE out_basic;
-- SQLNESS ARG restart=true
INSERT INTO
input_basic
VALUES
@@ -92,6 +101,8 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SHOW CREATE TABLE out_basic;
SELECT wildcard FROM out_basic;
DROP FLOW test_wildcard_basic;
@@ -112,6 +123,8 @@ SELECT
FROM
distinct_basic;
SHOW CREATE TABLE out_distinct_basic;
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
@@ -168,7 +181,9 @@ CREATE TABLE numbers_input_basic (
);
create table out_num_cnt_basic (
number INT,
a INTERVAL,
b INTERVAL,
c INTERVAL,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX
);
@@ -184,6 +199,8 @@ where
SHOW CREATE FLOW filter_numbers_basic;
SHOW CREATE TABLE out_num_cnt_basic;
drop flow filter_numbers_basic;
drop table out_num_cnt_basic;
@@ -214,6 +231,8 @@ from
GROUP BY
time_window;
SHOW CREATE TABLE approx_rate;
INSERT INTO
bytes_log
VALUES
@@ -294,6 +313,8 @@ SELECT
FROM
ngx_access_log;
SHOW CREATE TABLE ngx_country;
INSERT INTO
ngx_access_log
VALUES
@@ -359,6 +380,8 @@ GROUP BY
country,
time_window;
SHOW CREATE TABLE ngx_country;
INSERT INTO
ngx_access_log
VALUES
@@ -437,6 +460,8 @@ GROUP BY
HAVING
max_temp > 100;
SHOW CREATE TABLE temp_alerts;
INSERT INTO
temp_sensor_data
VALUES
@@ -516,6 +541,8 @@ GROUP BY
time_window,
bucket_size;
SHOW CREATE TABLE ngx_distribution;
INSERT INTO
ngx_access_log
VALUES
@@ -580,6 +607,8 @@ SELECT
FROM
requests;
SHOW CREATE TABLE requests_without_ip;
INSERT INTO
requests
VALUES
@@ -680,6 +709,8 @@ FROM android_log
GROUP BY
time_window;
SHOW CREATE TABLE android_log_abnormal;
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
@@ -732,6 +763,8 @@ FROM android_log
GROUP BY
time_window;
SHOW CREATE TABLE android_log_abnormal;
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
@@ -755,3 +788,128 @@ DROP FLOW calc_android_log_abnormal;
DROP TABLE android_log_abnormal;
DROP TABLE android_log;
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num
FROM
numbers_input_basic;
SHOW CREATE TABLE out_num_cnt_basic;
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
INSERT INTO
numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
SELECT avg_after_filter_num FROM out_num_cnt_basic;
INSERT INTO
numbers_input_basic
VALUES
(10, "2021-07-01 00:00:00.200"),
(23, "2021-07-01 00:00:00.600");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
DROP FLOW test_numbers_basic;
DROP TABLE numbers_input_basic;
DROP TABLE out_num_cnt_basic;
CREATE TABLE `live_connection_log`
(
`device_model` STRING NULL,
`connect_protocol` INT NULL,
`connect_mode` INT NULL,
`connect_retry_times` DOUBLE NULL,
`connect_result` INT NULL,
`first_frame_time` DOUBLE NULL,
`record_time` TIMESTAMP TIME INDEX,
`iot_online` INT NULL,
PRIMARY KEY (`device_model`,`connect_protocol`),
);
CREATE TABLE `live_connection_statistics_detail`
(
`device_model` STRING NULL,
`connect_protocol` INT NULL,
`connect_mode` INT NULL,
`avg_connect_retry_times` DOUBLE NULL,
`total_connect_result_ok` INT64 NULL,
`total_connect_result_fail` INT64 NULL,
`total_connect` INT64 NULL,
`conection_rate` DOUBLE NULL,
`avg_first_frame_time` DOUBLE NULL,
`max_first_frame_time` DOUBLE NULL,
`ok_conection_rate` DOUBLE NULL,
`record_time_window` TIMESTAMP TIME INDEX,
`update_at` TIMESTAMP,
PRIMARY KEY (`device_model`,`connect_protocol`),
);
CREATE FLOW live_connection_aggregation_detail
SINK TO live_connection_statistics_detail
AS
SELECT
device_model,
connect_protocol,
connect_mode,
avg(connect_retry_times) as avg_connect_retry_times,
sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok,
sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail,
count(connect_result) as total_connect,
sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate,
avg(first_frame_time) as avg_first_frame_time,
max(first_frame_time) as max_first_frame_time,
sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate,
date_bin(INTERVAL '1 minutes', record_time) as record_time_window,
FROM live_connection_log
WHERE iot_online = 1
GROUP BY
device_model,
connect_protocol,
connect_mode,
record_time_window;
INSERT INTO
live_connection_log
VALUES
("STM51", 1, 1, 0.5, 1, 0.1, 0, 1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('live_connection_aggregation_detail');
SELECT device_model,
connect_protocol,
connect_mode,
avg_connect_retry_times,
total_connect_result_ok,
total_connect_result_fail,
total_connect,
conection_rate,
avg_first_frame_time,
max_first_frame_time,
ok_conection_rate,
record_time_window FROM live_connection_statistics_detail;
DROP FLOW live_connection_aggregation_detail;
DROP TABLE live_connection_log;
DROP TABLE live_connection_statistics_detail;

View File

@@ -322,3 +322,87 @@ drop table numbers_input_show;
Affected Rows: 0
CREATE TABLE numbers_input_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
create table out_num_cnt_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
PRIMARY KEY(number),
);
Affected Rows: 0
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10;
Affected Rows: 0
INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT number FROM out_num_cnt_show;
+--------+
| number |
+--------+
| 15 |
| 16 |
+--------+
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
Error: 1003(Internal), Internal error: 1003
-- sink table stays the same since the flow errors out due to column mismatch
SELECT number FROM out_num_cnt_show;
+--------+
| number |
+--------+
| 15 |
| 16 |
+--------+
DROP FLOW filter_numbers_show;
Affected Rows: 0
drop table out_num_cnt_show;
Affected Rows: 0
drop table numbers_input_show;
Affected Rows: 0

View File

@@ -125,3 +125,45 @@ DROP FLOW filter_numbers_show;
drop table out_num_cnt_show;
drop table numbers_input_show;
CREATE TABLE numbers_input_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
create table out_num_cnt_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
PRIMARY KEY(number),
);
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10;
INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
SELECT number FROM out_num_cnt_show;
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
-- sink table stays the same since the flow errors out due to column mismatch
SELECT number FROM out_num_cnt_show;
DROP FLOW filter_numbers_show;
drop table out_num_cnt_show;
drop table numbers_input_show;