mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 06:12:55 +00:00
Compare commits
1 Commits
flow/lb_fe
...
flow/admin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ca3d72e3f |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4876,7 +4876,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "greptime-proto"
|
name = "greptime-proto"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
|
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"prost 0.13.5",
|
"prost 0.13.5",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -132,7 +132,7 @@ etcd-client = "0.14"
|
|||||||
fst = "0.4.7"
|
fst = "0.4.7"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
|
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
http = "1"
|
http = "1"
|
||||||
humantime = "2.1"
|
humantime = "2.1"
|
||||||
|
|||||||
90
src/common/function/src/adjust_flow.rs
Normal file
90
src/common/function/src/adjust_flow.rs
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use common_macro::admin_fn;
|
||||||
|
use common_query::error::{
|
||||||
|
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||||
|
};
|
||||||
|
use common_query::prelude::Signature;
|
||||||
|
use datafusion::logical_expr::Volatility;
|
||||||
|
use datatypes::value::{Value, ValueRef};
|
||||||
|
use session::context::QueryContextRef;
|
||||||
|
use snafu::ensure;
|
||||||
|
use store_api::storage::ConcreteDataType;
|
||||||
|
|
||||||
|
use crate::handlers::FlowServiceHandlerRef;
|
||||||
|
use crate::helper::parse_catalog_flow;
|
||||||
|
|
||||||
|
fn adjust_signature() -> Signature {
|
||||||
|
Signature::exact(
|
||||||
|
vec![
|
||||||
|
ConcreteDataType::string_datatype(), // flow name
|
||||||
|
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
|
||||||
|
ConcreteDataType::uint64_datatype(), // max filter number per query
|
||||||
|
],
|
||||||
|
Volatility::Immutable,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[admin_fn(
|
||||||
|
name = AdjustFlowFunction,
|
||||||
|
display_name = adjust_flow,
|
||||||
|
sig_fn = adjust_signature,
|
||||||
|
ret = uint64
|
||||||
|
)]
|
||||||
|
pub(crate) async fn adjust_flow(
|
||||||
|
flow_service_handler: &FlowServiceHandlerRef,
|
||||||
|
query_ctx: &QueryContextRef,
|
||||||
|
params: &[ValueRef<'_>],
|
||||||
|
) -> Result<Value> {
|
||||||
|
ensure!(
|
||||||
|
params.len() == 3,
|
||||||
|
InvalidFuncArgsSnafu {
|
||||||
|
err_msg: format!(
|
||||||
|
"The length of the args is not correct, expect 3, have: {}",
|
||||||
|
params.len()
|
||||||
|
),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
|
||||||
|
(
|
||||||
|
ValueRef::String(flow_name),
|
||||||
|
ValueRef::UInt64(min_run_interval),
|
||||||
|
ValueRef::UInt64(max_filter_num),
|
||||||
|
) => (flow_name, min_run_interval, max_filter_num),
|
||||||
|
_ => {
|
||||||
|
return UnsupportedInputDataTypeSnafu {
|
||||||
|
function: "adjust_flow",
|
||||||
|
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||||
|
}
|
||||||
|
.fail();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||||
|
|
||||||
|
let res = flow_service_handler
|
||||||
|
.adjust(
|
||||||
|
&catalog_name,
|
||||||
|
&flow_name,
|
||||||
|
min_run_interval,
|
||||||
|
max_filter_num as usize,
|
||||||
|
query_ctx.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let affected_rows = res.affected_rows;
|
||||||
|
|
||||||
|
Ok(Value::from(affected_rows))
|
||||||
|
}
|
||||||
@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
|||||||
use migrate_region::MigrateRegionFunction;
|
use migrate_region::MigrateRegionFunction;
|
||||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||||
|
|
||||||
|
use crate::adjust_flow::AdjustFlowFunction;
|
||||||
use crate::flush_flow::FlushFlowFunction;
|
use crate::flush_flow::FlushFlowFunction;
|
||||||
use crate::function_registry::FunctionRegistry;
|
use crate::function_registry::FunctionRegistry;
|
||||||
|
|
||||||
@@ -43,5 +44,6 @@ impl AdminFunction {
|
|||||||
registry.register_async(Arc::new(FlushTableFunction));
|
registry.register_async(Arc::new(FlushTableFunction));
|
||||||
registry.register_async(Arc::new(CompactTableFunction));
|
registry.register_async(Arc::new(CompactTableFunction));
|
||||||
registry.register_async(Arc::new(FlushFlowFunction));
|
registry.register_async(Arc::new(FlushFlowFunction));
|
||||||
|
registry.register_async(Arc::new(AdjustFlowFunction));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,21 +12,19 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use common_error::ext::BoxedError;
|
|
||||||
use common_macro::admin_fn;
|
use common_macro::admin_fn;
|
||||||
use common_query::error::{
|
use common_query::error::{
|
||||||
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
|
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||||
UnsupportedInputDataTypeSnafu,
|
|
||||||
};
|
};
|
||||||
use common_query::prelude::Signature;
|
use common_query::prelude::Signature;
|
||||||
use datafusion::logical_expr::Volatility;
|
use datafusion::logical_expr::Volatility;
|
||||||
use datatypes::value::{Value, ValueRef};
|
use datatypes::value::{Value, ValueRef};
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::{ensure, ResultExt};
|
use snafu::ensure;
|
||||||
use sql::parser::ParserContext;
|
|
||||||
use store_api::storage::ConcreteDataType;
|
use store_api::storage::ConcreteDataType;
|
||||||
|
|
||||||
use crate::handlers::FlowServiceHandlerRef;
|
use crate::handlers::FlowServiceHandlerRef;
|
||||||
|
use crate::helper::parse_catalog_flow;
|
||||||
|
|
||||||
fn flush_signature() -> Signature {
|
fn flush_signature() -> Signature {
|
||||||
Signature::uniform(
|
Signature::uniform(
|
||||||
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
|
|||||||
query_ctx: &QueryContextRef,
|
query_ctx: &QueryContextRef,
|
||||||
params: &[ValueRef<'_>],
|
params: &[ValueRef<'_>],
|
||||||
) -> Result<Value> {
|
) -> Result<Value> {
|
||||||
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
|
|
||||||
|
|
||||||
let res = flow_service_handler
|
|
||||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
|
||||||
.await?;
|
|
||||||
let affected_rows = res.affected_rows;
|
|
||||||
|
|
||||||
Ok(Value::from(affected_rows))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_flush_flow(
|
|
||||||
params: &[ValueRef<'_>],
|
|
||||||
query_ctx: &QueryContextRef,
|
|
||||||
) -> Result<(String, String)> {
|
|
||||||
ensure!(
|
ensure!(
|
||||||
params.len() == 1,
|
params.len() == 1,
|
||||||
InvalidFuncArgsSnafu {
|
InvalidFuncArgsSnafu {
|
||||||
@@ -70,7 +54,6 @@ fn parse_flush_flow(
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let ValueRef::String(flow_name) = params[0] else {
|
let ValueRef::String(flow_name) = params[0] else {
|
||||||
return UnsupportedInputDataTypeSnafu {
|
return UnsupportedInputDataTypeSnafu {
|
||||||
function: "flush_flow",
|
function: "flush_flow",
|
||||||
@@ -78,27 +61,14 @@ fn parse_flush_flow(
|
|||||||
}
|
}
|
||||||
.fail();
|
.fail();
|
||||||
};
|
};
|
||||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||||
.map_err(BoxedError::new)
|
|
||||||
.context(ExecuteSnafu)?;
|
|
||||||
|
|
||||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
let res = flow_service_handler
|
||||||
[flow_name] => (
|
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||||
query_ctx.current_catalog().to_string(),
|
.await?;
|
||||||
flow_name.value.clone(),
|
let affected_rows = res.affected_rows;
|
||||||
),
|
|
||||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
Ok(Value::from(affected_rows))
|
||||||
_ => {
|
|
||||||
return InvalidFuncArgsSnafu {
|
|
||||||
err_msg: format!(
|
|
||||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
|
||||||
obj_name
|
|
||||||
),
|
|
||||||
}
|
|
||||||
.fail()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok((catalog_name, flow_name))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -154,10 +124,7 @@ mod test {
|
|||||||
("catalog.flow_name", ("catalog", "flow_name")),
|
("catalog.flow_name", ("catalog", "flow_name")),
|
||||||
];
|
];
|
||||||
for (input, expected) in testcases.iter() {
|
for (input, expected) in testcases.iter() {
|
||||||
let args = vec![*input];
|
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
|
||||||
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
|
|
||||||
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
|
|||||||
flow: &str,
|
flow: &str,
|
||||||
ctx: QueryContextRef,
|
ctx: QueryContextRef,
|
||||||
) -> Result<api::v1::flow::FlowResponse>;
|
) -> Result<api::v1::flow::FlowResponse>;
|
||||||
|
|
||||||
|
async fn adjust(
|
||||||
|
&self,
|
||||||
|
catalog: &str,
|
||||||
|
flow: &str,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||||
|
|||||||
@@ -12,12 +12,15 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
use common_error::ext::BoxedError;
|
||||||
|
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
|
||||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||||
use datatypes::prelude::ConcreteDataType;
|
use datatypes::prelude::ConcreteDataType;
|
||||||
use datatypes::types::cast::cast;
|
use datatypes::types::cast::cast;
|
||||||
use datatypes::value::ValueRef;
|
use datatypes::value::ValueRef;
|
||||||
|
use session::context::QueryContextRef;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
|
use sql::parser::ParserContext;
|
||||||
|
|
||||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||||
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
|||||||
})
|
})
|
||||||
.map(|v| v.as_u64())
|
.map(|v| v.as_u64())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn parse_catalog_flow(
|
||||||
|
flow_name: &str,
|
||||||
|
query_ctx: &QueryContextRef,
|
||||||
|
) -> Result<(String, String)> {
|
||||||
|
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(ExecuteSnafu)?;
|
||||||
|
|
||||||
|
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||||
|
[flow_name] => (
|
||||||
|
query_ctx.current_catalog().to_string(),
|
||||||
|
flow_name.value.clone(),
|
||||||
|
),
|
||||||
|
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||||
|
_ => {
|
||||||
|
return InvalidFuncArgsSnafu {
|
||||||
|
err_msg: format!(
|
||||||
|
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||||
|
obj_name
|
||||||
|
),
|
||||||
|
}
|
||||||
|
.fail()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok((catalog_name, flow_name))
|
||||||
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
#![feature(let_chains)]
|
#![feature(let_chains)]
|
||||||
#![feature(try_blocks)]
|
#![feature(try_blocks)]
|
||||||
|
|
||||||
|
mod adjust_flow;
|
||||||
mod admin;
|
mod admin;
|
||||||
mod flush_flow;
|
mod flush_flow;
|
||||||
mod macros;
|
mod macros;
|
||||||
|
|||||||
@@ -148,6 +148,17 @@ impl FunctionState {
|
|||||||
) -> Result<api::v1::flow::FlowResponse> {
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn adjust(
|
||||||
|
&self,
|
||||||
|
_catalog: &str,
|
||||||
|
_flow: &str,
|
||||||
|
_min_run_interval_secs: u64,
|
||||||
|
_max_filter_num_per_query: usize,
|
||||||
|
_ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ pub mod create_table;
|
|||||||
pub mod datanode_handler;
|
pub mod datanode_handler;
|
||||||
pub mod flownode_handler;
|
pub mod flownode_handler;
|
||||||
|
|
||||||
use std::assert_matches::assert_matches;
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use api::v1::meta::Partition;
|
use api::v1::meta::Partition;
|
||||||
@@ -76,6 +75,8 @@ pub async fn create_logical_table(
|
|||||||
physical_table_id: TableId,
|
physical_table_id: TableId,
|
||||||
table_name: &str,
|
table_name: &str,
|
||||||
) -> TableId {
|
) -> TableId {
|
||||||
|
use std::assert_matches::assert_matches;
|
||||||
|
|
||||||
let tasks = vec![test_create_logical_table_task(table_name)];
|
let tasks = vec![test_create_logical_table_task(table_name)];
|
||||||
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
|
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
|
||||||
let status = procedure.on_prepare().await.unwrap();
|
let status = procedure.on_prepare().await.unwrap();
|
||||||
|
|||||||
@@ -14,7 +14,7 @@
|
|||||||
|
|
||||||
pub mod flow_info;
|
pub mod flow_info;
|
||||||
pub(crate) mod flow_name;
|
pub(crate) mod flow_name;
|
||||||
pub mod flow_route;
|
pub(crate) mod flow_route;
|
||||||
pub mod flow_state;
|
pub mod flow_state;
|
||||||
mod flownode_addr_helper;
|
mod flownode_addr_helper;
|
||||||
pub(crate) mod flownode_flow;
|
pub(crate) mod flownode_flow;
|
||||||
|
|||||||
@@ -114,37 +114,37 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
|
|||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct FlowInfoValue {
|
pub struct FlowInfoValue {
|
||||||
/// The source tables used by the flow.
|
/// The source tables used by the flow.
|
||||||
pub source_table_ids: Vec<TableId>,
|
pub(crate) source_table_ids: Vec<TableId>,
|
||||||
/// The sink table used by the flow.
|
/// The sink table used by the flow.
|
||||||
pub sink_table_name: TableName,
|
pub(crate) sink_table_name: TableName,
|
||||||
/// Which flow nodes this flow is running on.
|
/// Which flow nodes this flow is running on.
|
||||||
pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
|
pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
|
||||||
/// The catalog name.
|
/// The catalog name.
|
||||||
pub catalog_name: String,
|
pub(crate) catalog_name: String,
|
||||||
/// The query context used when create flow.
|
/// The query context used when create flow.
|
||||||
/// Although flow doesn't belong to any schema, this query_context is needed to remember
|
/// Although flow doesn't belong to any schema, this query_context is needed to remember
|
||||||
/// the query context when `create_flow` is executed
|
/// the query context when `create_flow` is executed
|
||||||
/// for recovering flow using the same sql&query_context after db restart.
|
/// for recovering flow using the same sql&query_context after db restart.
|
||||||
/// if none, should use default query context
|
/// if none, should use default query context
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub query_context: Option<crate::rpc::ddl::QueryContext>,
|
pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
|
||||||
/// The flow name.
|
/// The flow name.
|
||||||
pub flow_name: String,
|
pub(crate) flow_name: String,
|
||||||
/// The raw sql.
|
/// The raw sql.
|
||||||
pub raw_sql: String,
|
pub(crate) raw_sql: String,
|
||||||
/// The expr of expire.
|
/// The expr of expire.
|
||||||
/// Duration in seconds as `i64`.
|
/// Duration in seconds as `i64`.
|
||||||
pub expire_after: Option<i64>,
|
pub(crate) expire_after: Option<i64>,
|
||||||
/// The comment.
|
/// The comment.
|
||||||
pub comment: String,
|
pub(crate) comment: String,
|
||||||
/// The options.
|
/// The options.
|
||||||
pub options: HashMap<String, String>,
|
pub(crate) options: HashMap<String, String>,
|
||||||
/// The created time
|
/// The created time
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub created_time: DateTime<Utc>,
|
pub(crate) created_time: DateTime<Utc>,
|
||||||
/// The updated time.
|
/// The updated time.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub updated_time: DateTime<Utc>,
|
pub(crate) updated_time: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowInfoValue {
|
impl FlowInfoValue {
|
||||||
|
|||||||
@@ -133,18 +133,6 @@ pub enum Error {
|
|||||||
source: datatypes::error::Error,
|
source: datatypes::error::Error,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display(
|
|
||||||
"Failed to downcast vector of type '{:?}' to type '{:?}'",
|
|
||||||
from_type,
|
|
||||||
to_type
|
|
||||||
))]
|
|
||||||
DowncastVector {
|
|
||||||
from_type: ConcreteDataType,
|
|
||||||
to_type: ConcreteDataType,
|
|
||||||
#[snafu(implicit)]
|
|
||||||
location: Location,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Error occurs when performing arrow computation"))]
|
#[snafu(display("Error occurs when performing arrow computation"))]
|
||||||
ArrowCompute {
|
ArrowCompute {
|
||||||
#[snafu(source)]
|
#[snafu(source)]
|
||||||
@@ -204,8 +192,6 @@ impl ErrorExt for Error {
|
|||||||
| Error::PhysicalExpr { .. }
|
| Error::PhysicalExpr { .. }
|
||||||
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
|
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
|
||||||
|
|
||||||
Error::DowncastVector { .. } => StatusCode::Unexpected,
|
|
||||||
|
|
||||||
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
|
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
|
||||||
|
|
||||||
Error::ArrowCompute { .. } => StatusCode::IllegalState,
|
Error::ArrowCompute { .. } => StatusCode::IllegalState,
|
||||||
|
|||||||
@@ -30,16 +30,13 @@ pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecord
|
|||||||
use datatypes::arrow::compute::SortOptions;
|
use datatypes::arrow::compute::SortOptions;
|
||||||
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
|
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
|
||||||
use datatypes::arrow::util::pretty;
|
use datatypes::arrow::util::pretty;
|
||||||
use datatypes::prelude::{ConcreteDataType, VectorRef};
|
use datatypes::prelude::VectorRef;
|
||||||
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
|
use datatypes::schema::{Schema, SchemaRef};
|
||||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
|
||||||
use datatypes::types::json_type_value_to_string;
|
|
||||||
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
|
|
||||||
use error::Result;
|
use error::Result;
|
||||||
use futures::task::{Context, Poll};
|
use futures::task::{Context, Poll};
|
||||||
use futures::{Stream, TryStreamExt};
|
use futures::{Stream, TryStreamExt};
|
||||||
pub use recordbatch::RecordBatch;
|
pub use recordbatch::RecordBatch;
|
||||||
use snafu::{ensure, OptionExt, ResultExt};
|
use snafu::{ensure, ResultExt};
|
||||||
|
|
||||||
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
|
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
|
||||||
fn name(&self) -> &str {
|
fn name(&self) -> &str {
|
||||||
@@ -61,146 +58,6 @@ pub struct OrderOption {
|
|||||||
pub options: SortOptions,
|
pub options: SortOptions,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A wrapper that maps a [RecordBatchStream] to a new [RecordBatchStream] by applying a function to each [RecordBatch].
|
|
||||||
///
|
|
||||||
/// The mapper function is applied to each [RecordBatch] in the stream.
|
|
||||||
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
|
|
||||||
/// The output ordering of the new [RecordBatchStream] is the same as the output ordering of the inner [RecordBatchStream].
|
|
||||||
/// The metrics of the new [RecordBatchStream] is the same as the metrics of the inner [RecordBatchStream] if it is not `None`.
|
|
||||||
pub struct SendableRecordBatchMapper {
|
|
||||||
inner: SendableRecordBatchStream,
|
|
||||||
/// The mapper function is applied to each [RecordBatch] in the stream.
|
|
||||||
/// The original schema and the mapped schema are passed to the mapper function.
|
|
||||||
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
|
|
||||||
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
|
|
||||||
schema: SchemaRef,
|
|
||||||
/// Whether the mapper function is applied to each [RecordBatch] in the stream.
|
|
||||||
apply_mapper: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Maps the json type to string in the batch.
|
|
||||||
///
|
|
||||||
/// The json type is mapped to string by converting the json value to string.
|
|
||||||
/// The batch is updated to have the same number of columns as the original batch,
|
|
||||||
/// but with the json type mapped to string.
|
|
||||||
pub fn map_json_type_to_string(
|
|
||||||
batch: RecordBatch,
|
|
||||||
original_schema: &SchemaRef,
|
|
||||||
mapped_schema: &SchemaRef,
|
|
||||||
) -> Result<RecordBatch> {
|
|
||||||
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
|
|
||||||
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
|
|
||||||
if let ConcreteDataType::Json(j) = schema.data_type {
|
|
||||||
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
|
|
||||||
let binary_vector = vector
|
|
||||||
.as_any()
|
|
||||||
.downcast_ref::<BinaryVector>()
|
|
||||||
.with_context(|| error::DowncastVectorSnafu {
|
|
||||||
from_type: schema.data_type.clone(),
|
|
||||||
to_type: ConcreteDataType::binary_datatype(),
|
|
||||||
})?;
|
|
||||||
for value in binary_vector.iter_data() {
|
|
||||||
let Some(value) = value else {
|
|
||||||
string_vector_builder.push(None);
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let string_value =
|
|
||||||
json_type_value_to_string(value, &j.format).with_context(|_| {
|
|
||||||
error::CastVectorSnafu {
|
|
||||||
from_type: schema.data_type.clone(),
|
|
||||||
to_type: ConcreteDataType::string_datatype(),
|
|
||||||
}
|
|
||||||
})?;
|
|
||||||
string_vector_builder.push(Some(string_value.as_str()));
|
|
||||||
}
|
|
||||||
|
|
||||||
let string_vector = string_vector_builder.finish();
|
|
||||||
vectors.push(Arc::new(string_vector) as VectorRef);
|
|
||||||
} else {
|
|
||||||
vectors.push(vector.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
RecordBatch::new(mapped_schema.clone(), vectors)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Maps the json type to string in the schema.
|
|
||||||
///
|
|
||||||
/// The json type is mapped to string by converting the json value to string.
|
|
||||||
/// The schema is updated to have the same number of columns as the original schema,
|
|
||||||
/// but with the json type mapped to string.
|
|
||||||
///
|
|
||||||
/// Returns the new schema and whether the schema needs to be mapped to string.
|
|
||||||
pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
|
|
||||||
let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
|
|
||||||
let mut apply_mapper = false;
|
|
||||||
for column in schema.column_schemas() {
|
|
||||||
if matches!(column.data_type, ConcreteDataType::Json(_)) {
|
|
||||||
new_columns.push(ColumnSchema::new(
|
|
||||||
column.name.to_string(),
|
|
||||||
ConcreteDataType::string_datatype(),
|
|
||||||
column.is_nullable(),
|
|
||||||
));
|
|
||||||
apply_mapper = true;
|
|
||||||
} else {
|
|
||||||
new_columns.push(column.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(Arc::new(Schema::new(new_columns)), apply_mapper)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendableRecordBatchMapper {
|
|
||||||
/// Creates a new [SendableRecordBatchMapper] with the given inner [RecordBatchStream], mapper function, and schema mapper function.
|
|
||||||
pub fn new(
|
|
||||||
inner: SendableRecordBatchStream,
|
|
||||||
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
|
|
||||||
schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
|
|
||||||
) -> Self {
|
|
||||||
let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
|
|
||||||
Self {
|
|
||||||
inner,
|
|
||||||
mapper,
|
|
||||||
schema: mapped_schema,
|
|
||||||
apply_mapper,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RecordBatchStream for SendableRecordBatchMapper {
|
|
||||||
fn name(&self) -> &str {
|
|
||||||
"SendableRecordBatchMapper"
|
|
||||||
}
|
|
||||||
|
|
||||||
fn schema(&self) -> SchemaRef {
|
|
||||||
self.schema.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
|
||||||
self.inner.output_ordering()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
|
||||||
self.inner.metrics()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Stream for SendableRecordBatchMapper {
|
|
||||||
type Item = Result<RecordBatch>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
if self.apply_mapper {
|
|
||||||
Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
|
|
||||||
opt.map(|result| {
|
|
||||||
result
|
|
||||||
.and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
Pin::new(&mut self.inner).poll_next(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
|
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
|
||||||
/// that will produce no results
|
/// that will produce no results
|
||||||
pub struct EmptyRecordBatchStream {
|
pub struct EmptyRecordBatchStream {
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ prost.workspace = true
|
|||||||
query.workspace = true
|
query.workspace = true
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
serde_json.workspace = true
|
||||||
servers.workspace = true
|
servers.workspace = true
|
||||||
session.workspace = true
|
session.workspace = true
|
||||||
smallvec.workspace = true
|
smallvec.workspace = true
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use api::v1::flow::{
|
use api::v1::flow::{
|
||||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||||
};
|
};
|
||||||
use api::v1::region::InsertRequests;
|
use api::v1::region::InsertRequests;
|
||||||
use catalog::CatalogManager;
|
use catalog::CatalogManager;
|
||||||
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
|
|||||||
use datatypes::value::Value;
|
use datatypes::value::Value;
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use session::context::QueryContextBuilder;
|
use session::context::QueryContextBuilder;
|
||||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||||
use store_api::storage::{RegionId, TableId};
|
use store_api::storage::{RegionId, TableId};
|
||||||
@@ -809,6 +810,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
struct Options {
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
}
|
||||||
|
let options: Options = serde_json::from_str(&options).with_context(|_| {
|
||||||
|
common_meta::error::DeserializeFromJsonSnafu { input: options }
|
||||||
|
})?;
|
||||||
|
self.batching_engine
|
||||||
|
.adjust_flow(
|
||||||
|
flow_id.unwrap().id as u64,
|
||||||
|
options.min_run_interval_secs,
|
||||||
|
options.max_filter_num_per_query,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(to_meta_err(snafu::location!()))?;
|
||||||
|
Ok(Default::default())
|
||||||
|
}
|
||||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -841,93 +861,6 @@ fn to_meta_err(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl common_meta::node_manager::Flownode for StreamingEngine {
|
|
||||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
|
||||||
let query_ctx = request
|
|
||||||
.header
|
|
||||||
.and_then(|h| h.query_context)
|
|
||||||
.map(|ctx| ctx.into());
|
|
||||||
match request.body {
|
|
||||||
Some(flow_request::Body::Create(CreateRequest {
|
|
||||||
flow_id: Some(task_id),
|
|
||||||
source_table_ids,
|
|
||||||
sink_table_name: Some(sink_table_name),
|
|
||||||
create_if_not_exists,
|
|
||||||
expire_after,
|
|
||||||
comment,
|
|
||||||
sql,
|
|
||||||
flow_options,
|
|
||||||
or_replace,
|
|
||||||
})) => {
|
|
||||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
|
||||||
let sink_table_name = [
|
|
||||||
sink_table_name.catalog_name,
|
|
||||||
sink_table_name.schema_name,
|
|
||||||
sink_table_name.table_name,
|
|
||||||
];
|
|
||||||
let expire_after = expire_after.map(|e| e.value);
|
|
||||||
let args = CreateFlowArgs {
|
|
||||||
flow_id: task_id.id as u64,
|
|
||||||
sink_table_name,
|
|
||||||
source_table_ids,
|
|
||||||
create_if_not_exists,
|
|
||||||
or_replace,
|
|
||||||
expire_after,
|
|
||||||
comment: Some(comment),
|
|
||||||
sql: sql.clone(),
|
|
||||||
flow_options,
|
|
||||||
query_ctx,
|
|
||||||
};
|
|
||||||
let ret = self
|
|
||||||
.create_flow(args)
|
|
||||||
.await
|
|
||||||
.map_err(BoxedError::new)
|
|
||||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
|
||||||
.map_err(to_meta_err(snafu::location!()))?;
|
|
||||||
METRIC_FLOW_TASK_COUNT.inc();
|
|
||||||
Ok(FlowResponse {
|
|
||||||
affected_flows: ret
|
|
||||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
|
||||||
.into_iter()
|
|
||||||
.collect_vec(),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Some(flow_request::Body::Drop(DropRequest {
|
|
||||||
flow_id: Some(flow_id),
|
|
||||||
})) => {
|
|
||||||
self.remove_flow(flow_id.id as u64)
|
|
||||||
.await
|
|
||||||
.map_err(to_meta_err(snafu::location!()))?;
|
|
||||||
METRIC_FLOW_TASK_COUNT.dec();
|
|
||||||
Ok(Default::default())
|
|
||||||
}
|
|
||||||
Some(flow_request::Body::Flush(FlushFlow {
|
|
||||||
flow_id: Some(flow_id),
|
|
||||||
})) => {
|
|
||||||
let row = self
|
|
||||||
.flush_flow_inner(flow_id.id as u64)
|
|
||||||
.await
|
|
||||||
.map_err(to_meta_err(snafu::location!()))?;
|
|
||||||
Ok(FlowResponse {
|
|
||||||
affected_flows: vec![flow_id],
|
|
||||||
affected_rows: row as u64,
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
|
||||||
self.handle_inserts_inner(request)
|
|
||||||
.await
|
|
||||||
.map(|_| Default::default())
|
|
||||||
.map_err(to_meta_err(snafu::location!()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FlowEngine for StreamingEngine {
|
impl FlowEngine for StreamingEngine {
|
||||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||||
self.create_flow_inner(args).await
|
self.create_flow_inner(args).await
|
||||||
|
|||||||
@@ -388,6 +388,20 @@ impl BatchingEngine {
|
|||||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||||
self.tasks.read().await.contains_key(&flow_id)
|
self.tasks.read().await.contains_key(&flow_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn adjust_flow(
|
||||||
|
&self,
|
||||||
|
flow_id: FlowId,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||||
|
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||||
|
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
|
||||||
|
task.adjust(min_run_interval_secs, max_filter_num_per_query);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowEngine for BatchingEngine {
|
impl FlowEngine for BatchingEngine {
|
||||||
|
|||||||
@@ -14,9 +14,8 @@
|
|||||||
|
|
||||||
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::sync::{Arc, Weak};
|
||||||
use std::sync::{Arc, Mutex, Weak};
|
use std::time::SystemTime;
|
||||||
use std::time::{Duration, Instant, SystemTime};
|
|
||||||
|
|
||||||
use api::v1::greptime_request::Request;
|
use api::v1::greptime_request::Request;
|
||||||
use api::v1::CreateTableExpr;
|
use api::v1::CreateTableExpr;
|
||||||
@@ -27,21 +26,20 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
|||||||
use common_meta::peer::Peer;
|
use common_meta::peer::Peer;
|
||||||
use common_meta::rpc::store::RangeRequest;
|
use common_meta::rpc::store::RangeRequest;
|
||||||
use common_query::Output;
|
use common_query::Output;
|
||||||
use common_telemetry::{debug, warn};
|
use common_telemetry::warn;
|
||||||
use itertools::Itertools;
|
|
||||||
use meta_client::client::MetaClient;
|
use meta_client::client::MetaClient;
|
||||||
|
use rand::rng;
|
||||||
|
use rand::seq::SliceRandom;
|
||||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||||
use snafu::{OptionExt, ResultExt};
|
use snafu::{OptionExt, ResultExt};
|
||||||
|
|
||||||
use crate::batching_mode::task::BatchingTask;
|
|
||||||
use crate::batching_mode::{
|
use crate::batching_mode::{
|
||||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||||
GRPC_MAX_RETRIES,
|
GRPC_MAX_RETRIES,
|
||||||
};
|
};
|
||||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
|
use crate::{Error, FlowAuthHeader};
|
||||||
use crate::{Error, FlowAuthHeader, FlowId};
|
|
||||||
|
|
||||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||||
///
|
///
|
||||||
@@ -76,105 +74,6 @@ impl<
|
|||||||
|
|
||||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||||
|
|
||||||
/// Statistics about running query on this frontend from flownode
|
|
||||||
#[derive(Debug, Default, Clone)]
|
|
||||||
struct FrontendStat {
|
|
||||||
/// The query for flow id has been running since this timestamp
|
|
||||||
since: HashMap<FlowId, Instant>,
|
|
||||||
/// The average query time for each flow id
|
|
||||||
/// This is used to calculate the average query time for each flow id
|
|
||||||
past_query_avg: HashMap<FlowId, (usize, Duration)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone)]
|
|
||||||
pub struct FrontendStats {
|
|
||||||
/// The statistics for each flow id
|
|
||||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FrontendStats {
|
|
||||||
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
|
|
||||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
|
||||||
let stat = stats.entry(frontend_addr.to_string()).or_default();
|
|
||||||
stat.since.insert(flow_id, Instant::now());
|
|
||||||
|
|
||||||
FrontendStatsGuard {
|
|
||||||
stats: self.stats.clone(),
|
|
||||||
frontend_addr: frontend_addr.to_string(),
|
|
||||||
cur: flow_id,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// return frontend addrs sorted by load, from lightest to heaviest
|
|
||||||
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
|
|
||||||
pub fn sort_by_load(&self) -> Vec<String> {
|
|
||||||
let stats = self.stats.lock().expect("Failed to lock frontend stats");
|
|
||||||
let fe_load_factor = stats
|
|
||||||
.iter()
|
|
||||||
.map(|(node_addr, stat)| {
|
|
||||||
// total expected avg running time for all currently running queries
|
|
||||||
let total_expect_avg_run_time = stat
|
|
||||||
.since
|
|
||||||
.keys()
|
|
||||||
.map(|f| {
|
|
||||||
let (count, total_duration) =
|
|
||||||
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
|
|
||||||
if *count == 0 {
|
|
||||||
0.0
|
|
||||||
} else {
|
|
||||||
total_duration.as_secs_f64() / *count as f64
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.sum::<f64>();
|
|
||||||
let total_cur_running_time = stat
|
|
||||||
.since
|
|
||||||
.values()
|
|
||||||
.map(|since| since.elapsed().as_secs_f64())
|
|
||||||
.sum::<f64>();
|
|
||||||
(
|
|
||||||
node_addr.to_string(),
|
|
||||||
total_expect_avg_run_time + total_cur_running_time,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.sorted_by(|(_, load_a), (_, load_b)| {
|
|
||||||
load_a
|
|
||||||
.partial_cmp(load_b)
|
|
||||||
.unwrap_or(std::cmp::Ordering::Equal)
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
debug!("Frontend load factor: {:?}", fe_load_factor);
|
|
||||||
for (node_addr, load) in &fe_load_factor {
|
|
||||||
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
|
|
||||||
.with_label_values(&[&node_addr.to_string()])
|
|
||||||
.observe(*load);
|
|
||||||
}
|
|
||||||
fe_load_factor
|
|
||||||
.into_iter()
|
|
||||||
.map(|(addr, _)| addr)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct FrontendStatsGuard {
|
|
||||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
|
||||||
frontend_addr: String,
|
|
||||||
cur: FlowId,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for FrontendStatsGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
|
||||||
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
|
|
||||||
if let Some(since) = stat.since.remove(&self.cur) {
|
|
||||||
let elapsed = since.elapsed();
|
|
||||||
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
|
|
||||||
*count += 1;
|
|
||||||
*total_duration += elapsed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A simple frontend client able to execute sql using grpc protocol
|
/// A simple frontend client able to execute sql using grpc protocol
|
||||||
///
|
///
|
||||||
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
||||||
@@ -184,7 +83,6 @@ pub enum FrontendClient {
|
|||||||
meta_client: Arc<MetaClient>,
|
meta_client: Arc<MetaClient>,
|
||||||
chnl_mgr: ChannelManager,
|
chnl_mgr: ChannelManager,
|
||||||
auth: Option<FlowAuthHeader>,
|
auth: Option<FlowAuthHeader>,
|
||||||
fe_stats: FrontendStats,
|
|
||||||
},
|
},
|
||||||
Standalone {
|
Standalone {
|
||||||
/// for the sake of simplicity still use grpc even in standalone mode
|
/// for the sake of simplicity still use grpc even in standalone mode
|
||||||
@@ -216,7 +114,6 @@ impl FrontendClient {
|
|||||||
ChannelManager::with_config(cfg)
|
ChannelManager::with_config(cfg)
|
||||||
},
|
},
|
||||||
auth,
|
auth,
|
||||||
fe_stats: Default::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -295,7 +192,6 @@ impl FrontendClient {
|
|||||||
meta_client: _,
|
meta_client: _,
|
||||||
chnl_mgr,
|
chnl_mgr,
|
||||||
auth,
|
auth,
|
||||||
fe_stats,
|
|
||||||
} = self
|
} = self
|
||||||
else {
|
else {
|
||||||
return UnexpectedSnafu {
|
return UnexpectedSnafu {
|
||||||
@@ -312,21 +208,8 @@ impl FrontendClient {
|
|||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_millis() as i64;
|
.as_millis() as i64;
|
||||||
let node_addrs_by_load = fe_stats.sort_by_load();
|
// shuffle the frontends to avoid always pick the same one
|
||||||
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
|
frontends.shuffle(&mut rng());
|
||||||
let addr2load = node_addrs_by_load
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(i, id)| (id.clone(), i + 1))
|
|
||||||
.collect::<HashMap<_, _>>();
|
|
||||||
// sort frontends by load, from lightest to heaviest
|
|
||||||
frontends.sort_by(|(_, a), (_, b)| {
|
|
||||||
// if not even in stats, treat as 0 load since never been queried
|
|
||||||
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
|
|
||||||
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
|
|
||||||
load_a.cmp(load_b)
|
|
||||||
});
|
|
||||||
debug!("Frontend nodes sorted by load: {:?}", frontends);
|
|
||||||
|
|
||||||
// found node with maximum last_activity_ts
|
// found node with maximum last_activity_ts
|
||||||
for (_, node_info) in frontends
|
for (_, node_info) in frontends
|
||||||
@@ -374,7 +257,6 @@ impl FrontendClient {
|
|||||||
create: CreateTableExpr,
|
create: CreateTableExpr,
|
||||||
catalog: &str,
|
catalog: &str,
|
||||||
schema: &str,
|
schema: &str,
|
||||||
task: Option<&BatchingTask>,
|
|
||||||
) -> Result<u32, Error> {
|
) -> Result<u32, Error> {
|
||||||
self.handle(
|
self.handle(
|
||||||
Request::Ddl(api::v1::DdlRequest {
|
Request::Ddl(api::v1::DdlRequest {
|
||||||
@@ -383,7 +265,6 @@ impl FrontendClient {
|
|||||||
catalog,
|
catalog,
|
||||||
schema,
|
schema,
|
||||||
&mut None,
|
&mut None,
|
||||||
task,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -395,19 +276,15 @@ impl FrontendClient {
|
|||||||
catalog: &str,
|
catalog: &str,
|
||||||
schema: &str,
|
schema: &str,
|
||||||
peer_desc: &mut Option<PeerDesc>,
|
peer_desc: &mut Option<PeerDesc>,
|
||||||
task: Option<&BatchingTask>,
|
|
||||||
) -> Result<u32, Error> {
|
) -> Result<u32, Error> {
|
||||||
match self {
|
match self {
|
||||||
FrontendClient::Distributed { fe_stats, .. } => {
|
FrontendClient::Distributed { .. } => {
|
||||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||||
|
|
||||||
*peer_desc = Some(PeerDesc::Dist {
|
*peer_desc = Some(PeerDesc::Dist {
|
||||||
peer: db.peer.clone(),
|
peer: db.peer.clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
|
|
||||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
|
||||||
|
|
||||||
db.database
|
db.database
|
||||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
|
|||||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||||
use crate::metrics::{
|
use crate::metrics::{
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||||
};
|
};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -52,6 +53,11 @@ pub struct TaskState {
|
|||||||
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
||||||
/// Task handle
|
/// Task handle
|
||||||
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
|
||||||
|
/// min run interval in seconds
|
||||||
|
pub(crate) min_run_interval: Option<u64>,
|
||||||
|
/// max filter number per query
|
||||||
|
pub(crate) max_filter_num: Option<usize>,
|
||||||
}
|
}
|
||||||
impl TaskState {
|
impl TaskState {
|
||||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||||
@@ -63,6 +69,8 @@ impl TaskState {
|
|||||||
exec_state: ExecState::Idle,
|
exec_state: ExecState::Idle,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
task_handle: None,
|
task_handle: None,
|
||||||
|
min_run_interval: None,
|
||||||
|
max_filter_num: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,20 +95,17 @@ impl TaskState {
|
|||||||
pub fn get_next_start_query_time(
|
pub fn get_next_start_query_time(
|
||||||
&self,
|
&self,
|
||||||
flow_id: FlowId,
|
flow_id: FlowId,
|
||||||
time_window_size: &Option<Duration>,
|
_time_window_size: &Option<Duration>,
|
||||||
max_timeout: Option<Duration>,
|
max_timeout: Option<Duration>,
|
||||||
) -> Instant {
|
) -> Instant {
|
||||||
let last_duration = max_timeout
|
let next_duration = max_timeout
|
||||||
.unwrap_or(self.last_query_duration)
|
.unwrap_or(self.last_query_duration)
|
||||||
.min(self.last_query_duration)
|
.min(self.last_query_duration)
|
||||||
.max(MIN_REFRESH_DURATION);
|
.max(
|
||||||
|
self.min_run_interval
|
||||||
let next_duration = time_window_size
|
.map(|s| Duration::from_secs(s))
|
||||||
.map(|t| {
|
.unwrap_or(MIN_REFRESH_DURATION),
|
||||||
let half = t / 2;
|
);
|
||||||
half.max(last_duration)
|
|
||||||
})
|
|
||||||
.unwrap_or(last_duration);
|
|
||||||
|
|
||||||
// if have dirty time window, execute immediately to clean dirty time window
|
// if have dirty time window, execute immediately to clean dirty time window
|
||||||
if self.dirty_time_windows.windows.is_empty() {
|
if self.dirty_time_windows.windows.is_empty() {
|
||||||
@@ -246,6 +251,10 @@ impl DirtyTimeWindows {
|
|||||||
.with_label_values(&[flow_id.to_string().as_str()])
|
.with_label_values(&[flow_id.to_string().as_str()])
|
||||||
.observe(first_nth.len() as f64);
|
.observe(first_nth.len() as f64);
|
||||||
|
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||||
|
.with_label_values(&[flow_id.to_string().as_str()])
|
||||||
|
.observe(self.windows.len() as f64);
|
||||||
|
|
||||||
let full_time_range = first_nth
|
let full_time_range = first_nth
|
||||||
.iter()
|
.iter()
|
||||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||||
|
|||||||
@@ -144,6 +144,12 @@ impl BatchingTask {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
|
||||||
|
let mut state = self.state.write().unwrap();
|
||||||
|
state.min_run_interval = Some(min_run_interval_secs);
|
||||||
|
state.max_filter_num = Some(max_filter_num_per_query);
|
||||||
|
}
|
||||||
|
|
||||||
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
||||||
///
|
///
|
||||||
/// useful for flush_flow to flush dirty time windows range
|
/// useful for flush_flow to flush dirty time windows range
|
||||||
@@ -280,7 +286,7 @@ impl BatchingTask {
|
|||||||
let catalog = &self.config.sink_table_name[0];
|
let catalog = &self.config.sink_table_name[0];
|
||||||
let schema = &self.config.sink_table_name[1];
|
let schema = &self.config.sink_table_name[1];
|
||||||
frontend_client
|
frontend_client
|
||||||
.create(expr.clone(), catalog, schema, Some(self))
|
.create(expr.clone(), catalog, schema)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -361,7 +367,7 @@ impl BatchingTask {
|
|||||||
};
|
};
|
||||||
|
|
||||||
frontend_client
|
frontend_client
|
||||||
.handle(req, catalog, schema, &mut peer_desc, Some(self))
|
.handle(req, catalog, schema, &mut peer_desc)
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -580,19 +586,20 @@ impl BatchingTask {
|
|||||||
),
|
),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let expr = self
|
let expr = {
|
||||||
.state
|
let mut state = self.state.write().unwrap();
|
||||||
.write()
|
let max_window_cnt = state
|
||||||
.unwrap()
|
.max_filter_num
|
||||||
.dirty_time_windows
|
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||||
.gen_filter_exprs(
|
state.dirty_time_windows.gen_filter_exprs(
|
||||||
&col_name,
|
&col_name,
|
||||||
Some(l),
|
Some(l),
|
||||||
window_size,
|
window_size,
|
||||||
DirtyTimeWindows::MAX_FILTER_NUM,
|
max_window_cnt,
|
||||||
self.config.flow_id,
|
self.config.flow_id,
|
||||||
Some(self),
|
Some(self),
|
||||||
)?;
|
)?
|
||||||
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Flow id={:?}, Generated filter expr: {:?}",
|
"Flow id={:?}, Generated filter expr: {:?}",
|
||||||
|
|||||||
@@ -42,6 +42,14 @@ lazy_static! {
|
|||||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
|
||||||
|
register_histogram_vec!(
|
||||||
|
"greptime_flow_batching_engine_stalled_query_window_cnt",
|
||||||
|
"flow batching engine stalled query time window count",
|
||||||
|
&["flow_id"],
|
||||||
|
vec![0.0, 5., 10., 20., 40.]
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
||||||
register_histogram_vec!(
|
register_histogram_vec!(
|
||||||
"greptime_flow_batching_engine_query_window_cnt",
|
"greptime_flow_batching_engine_query_window_cnt",
|
||||||
@@ -58,14 +66,6 @@ lazy_static! {
|
|||||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
|
||||||
register_histogram_vec!(
|
|
||||||
"greptime_flow_batching_engine_guess_fe_load",
|
|
||||||
"flow batching engine guessed frontend load",
|
|
||||||
&["fe_addr"],
|
|
||||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||||
|
|||||||
@@ -375,7 +375,7 @@ impl HeartbeatMailbox {
|
|||||||
|
|
||||||
/// Parses the [Instruction] from [MailboxMessage].
|
/// Parses the [Instruction] from [MailboxMessage].
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
|
pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
|
||||||
let Payload::Json(payload) =
|
let Payload::Json(payload) =
|
||||||
msg.payload
|
msg.payload
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
|||||||
@@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use api::v1::flow::FlowRequestHeader;
|
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use common_error::ext::BoxedError;
|
use common_error::ext::BoxedError;
|
||||||
use common_function::handlers::FlowServiceHandler;
|
use common_function::handlers::FlowServiceHandler;
|
||||||
@@ -22,6 +22,7 @@ use common_query::error::Result;
|
|||||||
use common_telemetry::tracing_context::TracingContext;
|
use common_telemetry::tracing_context::TracingContext;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use serde_json::json;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::{OptionExt, ResultExt};
|
use snafu::{OptionExt, ResultExt};
|
||||||
|
|
||||||
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
|
|||||||
) -> Result<api::v1::flow::FlowResponse> {
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
self.flush_inner(catalog, flow, ctx).await
|
self.flush_inner(catalog, flow, ctx).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn adjust(
|
||||||
|
&self,
|
||||||
|
catalog: &str,
|
||||||
|
flow: &str,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
|
self.adjust_inner(
|
||||||
|
catalog,
|
||||||
|
flow,
|
||||||
|
min_run_interval_secs,
|
||||||
|
max_filter_num_per_query,
|
||||||
|
ctx,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowServiceOperator {
|
impl FlowServiceOperator {
|
||||||
|
async fn adjust_inner(
|
||||||
|
&self,
|
||||||
|
catalog: &str,
|
||||||
|
flow: &str,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
|
let id = self
|
||||||
|
.flow_metadata_manager
|
||||||
|
.flow_name_manager()
|
||||||
|
.get(catalog, flow)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?
|
||||||
|
.context(common_meta::error::FlowNotFoundSnafu {
|
||||||
|
flow_name: format!("{}.{}", catalog, flow),
|
||||||
|
})
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?
|
||||||
|
.flow_id();
|
||||||
|
|
||||||
|
let all_flownode_peers = self
|
||||||
|
.flow_metadata_manager
|
||||||
|
.flow_route_manager()
|
||||||
|
.routes(id)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?;
|
||||||
|
|
||||||
|
// order of flownodes doesn't matter here
|
||||||
|
let all_flow_nodes = FuturesUnordered::from_iter(
|
||||||
|
all_flownode_peers
|
||||||
|
.iter()
|
||||||
|
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
|
||||||
|
)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// TODO(discord9): use proper type for flow options
|
||||||
|
let options = json!({
|
||||||
|
"min_run_interval_secs": min_run_interval_secs,
|
||||||
|
"max_filter_num_per_query": max_filter_num_per_query,
|
||||||
|
});
|
||||||
|
|
||||||
|
for node in all_flow_nodes {
|
||||||
|
let _res = {
|
||||||
|
use api::v1::flow::{flow_request, FlowRequest};
|
||||||
|
let flush_req = FlowRequest {
|
||||||
|
header: Some(FlowRequestHeader {
|
||||||
|
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||||
|
query_context: Some(
|
||||||
|
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
body: Some(flow_request::Body::Adjust(AdjustFlow {
|
||||||
|
flow_id: Some(api::v1::FlowId { id }),
|
||||||
|
options: options.to_string(),
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
node.handle(flush_req)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Ok(Default::default())
|
||||||
|
}
|
||||||
|
|
||||||
/// Flush the flownodes according to the flow id.
|
/// Flush the flownodes according to the flow id.
|
||||||
async fn flush_inner(
|
async fn flush_inner(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -25,10 +25,7 @@ use common_datasource::object_store::{build_backend, parse_url};
|
|||||||
use common_datasource::util::find_dir_and_filename;
|
use common_datasource::util::find_dir_and_filename;
|
||||||
use common_query::Output;
|
use common_query::Output;
|
||||||
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
|
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
|
||||||
use common_recordbatch::{
|
use common_recordbatch::SendableRecordBatchStream;
|
||||||
map_json_type_to_string, map_json_type_to_string_schema, RecordBatchStream,
|
|
||||||
SendableRecordBatchMapper, SendableRecordBatchStream,
|
|
||||||
};
|
|
||||||
use common_telemetry::{debug, tracing};
|
use common_telemetry::{debug, tracing};
|
||||||
use datafusion::datasource::DefaultTableSource;
|
use datafusion::datasource::DefaultTableSource;
|
||||||
use datafusion_common::TableReference as DfTableReference;
|
use datafusion_common::TableReference as DfTableReference;
|
||||||
@@ -60,11 +57,6 @@ impl StatementExecutor {
|
|||||||
) -> Result<usize> {
|
) -> Result<usize> {
|
||||||
let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;
|
let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;
|
||||||
|
|
||||||
let stream = Box::pin(SendableRecordBatchMapper::new(
|
|
||||||
stream,
|
|
||||||
map_json_type_to_string,
|
|
||||||
map_json_type_to_string_schema,
|
|
||||||
));
|
|
||||||
match format {
|
match format {
|
||||||
Format::Csv(_) => stream_to_csv(
|
Format::Csv(_) => stream_to_csv(
|
||||||
Box::pin(DfRecordBatchStreamAdapter::new(stream)),
|
Box::pin(DfRecordBatchStreamAdapter::new(stream)),
|
||||||
|
|||||||
@@ -187,42 +187,24 @@ impl StatementExecutor {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Check if is creating logical table
|
||||||
if create_table.engine == METRIC_ENGINE_NAME
|
if create_table.engine == METRIC_ENGINE_NAME
|
||||||
&& create_table
|
&& create_table
|
||||||
.table_options
|
.table_options
|
||||||
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
||||||
{
|
{
|
||||||
// Create logical tables
|
return self
|
||||||
ensure!(
|
.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
||||||
partitions.is_none(),
|
|
||||||
InvalidPartitionRuleSnafu {
|
|
||||||
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
|
|
||||||
}
|
|
||||||
);
|
|
||||||
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
|
||||||
.await?
|
.await?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.next()
|
.next()
|
||||||
.context(error::UnexpectedSnafu {
|
.context(error::UnexpectedSnafu {
|
||||||
violated: "expected to create logical tables",
|
violated: "expected to create a logical table",
|
||||||
})
|
});
|
||||||
} else {
|
|
||||||
// Create other normal table
|
|
||||||
self.create_non_logic_table(create_table, partitions, query_ctx)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
|
||||||
pub async fn create_non_logic_table(
|
|
||||||
&self,
|
|
||||||
create_table: &mut CreateTableExpr,
|
|
||||||
partitions: Option<Partitions>,
|
|
||||||
query_ctx: QueryContextRef,
|
|
||||||
) -> Result<TableRef> {
|
|
||||||
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
|
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
|
||||||
|
|
||||||
// Check if schema exists
|
|
||||||
let schema = self
|
let schema = self
|
||||||
.table_metadata_manager
|
.table_metadata_manager
|
||||||
.schema_manager()
|
.schema_manager()
|
||||||
@@ -232,6 +214,7 @@ impl StatementExecutor {
|
|||||||
))
|
))
|
||||||
.await
|
.await
|
||||||
.context(TableMetadataManagerSnafu)?;
|
.context(TableMetadataManagerSnafu)?;
|
||||||
|
|
||||||
ensure!(
|
ensure!(
|
||||||
schema.is_some(),
|
schema.is_some(),
|
||||||
SchemaNotFoundSnafu {
|
SchemaNotFoundSnafu {
|
||||||
|
|||||||
@@ -569,7 +569,6 @@ pub enum Error {
|
|||||||
#[snafu(implicit)]
|
#[snafu(implicit)]
|
||||||
location: Location,
|
location: Location,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Convert SQL value error"))]
|
#[snafu(display("Convert SQL value error"))]
|
||||||
ConvertSqlValue {
|
ConvertSqlValue {
|
||||||
source: datatypes::error::Error,
|
source: datatypes::error::Error,
|
||||||
|
|||||||
@@ -391,16 +391,13 @@ fn parse_string_options(parser: &mut Parser) -> std::result::Result<(String, Str
|
|||||||
parser.expect_token(&Token::Eq)?;
|
parser.expect_token(&Token::Eq)?;
|
||||||
let value = if parser.parse_keyword(Keyword::NULL) {
|
let value = if parser.parse_keyword(Keyword::NULL) {
|
||||||
"".to_string()
|
"".to_string()
|
||||||
|
} else if let Ok(v) = parser.parse_literal_string() {
|
||||||
|
v
|
||||||
} else {
|
} else {
|
||||||
let next_token = parser.peek_token();
|
return Err(ParserError::ParserError(format!(
|
||||||
if let Token::Number(number_as_string, _) = next_token.token {
|
"Unexpected option value for alter table statements, expect string literal or NULL, got: `{}`",
|
||||||
parser.advance_token();
|
parser.next_token()
|
||||||
number_as_string
|
)));
|
||||||
} else {
|
|
||||||
parser.parse_literal_string().map_err(|_|{
|
|
||||||
ParserError::ParserError(format!("Unexpected option value for alter table statements, expect string literal, numeric literal or NULL, got: `{}`", next_token))
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
Ok((name, value))
|
Ok((name, value))
|
||||||
}
|
}
|
||||||
@@ -1091,38 +1088,4 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.unwrap_err();
|
.unwrap_err();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_parse_alter_with_numeric_value() {
|
|
||||||
for sql in [
|
|
||||||
"ALTER TABLE test SET 'compaction.twcs.trigger_file_num'=8;",
|
|
||||||
"ALTER TABLE test SET 'compaction.twcs.trigger_file_num'='8';",
|
|
||||||
] {
|
|
||||||
let mut result = ParserContext::create_with_dialect(
|
|
||||||
sql,
|
|
||||||
&GreptimeDbDialect {},
|
|
||||||
ParseOptions::default(),
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(1, result.len());
|
|
||||||
|
|
||||||
let statement = result.remove(0);
|
|
||||||
assert_matches!(statement, Statement::AlterTable { .. });
|
|
||||||
match statement {
|
|
||||||
Statement::AlterTable(alter_table) => {
|
|
||||||
let alter_operation = alter_table.alter_operation();
|
|
||||||
assert_matches!(alter_operation, AlterTableOperation::SetTableOptions { .. });
|
|
||||||
match alter_operation {
|
|
||||||
AlterTableOperation::SetTableOptions { options } => {
|
|
||||||
assert_eq!(options.len(), 1);
|
|
||||||
assert_eq!(options[0].key, "compaction.twcs.trigger_file_num");
|
|
||||||
assert_eq!(options[0].value, "8");
|
|
||||||
}
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -211,12 +211,6 @@ impl ColumnExtensions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Partition on columns or values.
|
|
||||||
///
|
|
||||||
/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
|
|
||||||
/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
|
|
||||||
/// `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
|
|
||||||
/// Each expression stands for a partition.
|
|
||||||
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
|
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
|
||||||
pub struct Partitions {
|
pub struct Partitions {
|
||||||
pub column_list: Vec<Ident>,
|
pub column_list: Vec<Ident>,
|
||||||
|
|||||||
@@ -1,21 +1,15 @@
|
|||||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
insert into
|
|
||||||
demo(host, cpu, memory, jsons, ts)
|
|
||||||
values
|
|
||||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
|
||||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
|
||||||
|
|
||||||
Affected Rows: 2
|
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
demo(host, cpu, memory, ts)
|
demo(host, cpu, memory, ts)
|
||||||
values
|
values
|
||||||
|
('host1', 66.6, 1024, 1655276557000),
|
||||||
|
('host2', 88.8, 333.3, 1655276558000),
|
||||||
('host3', 99.9, 444.4, 1722077263000);
|
('host3', 99.9, 444.4, 1722077263000);
|
||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 3
|
||||||
|
|
||||||
Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
|
Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
|
||||||
|
|
||||||
@@ -38,44 +32,6 @@ select * from with_filename order by ts;
|
|||||||
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
|
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
|
||||||
+-------+------+--------+---------------------+
|
+-------+------+--------+---------------------+
|
||||||
|
|
||||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
|
||||||
|
|
||||||
Affected Rows: 3
|
|
||||||
|
|
||||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
|
||||||
|
|
||||||
+-------+------+--------+---------------------------------+---------------------+
|
|
||||||
| host | cpu | memory | json_to_string(with_json.jsons) | ts |
|
|
||||||
+-------+------+--------+---------------------------------+---------------------+
|
|
||||||
| host1 | 66.6 | 1024.0 | {"foo":"bar"} | 2022-06-15T07:02:37 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15T07:02:38 |
|
|
||||||
| host3 | 99.9 | 444.4 | | 2024-07-27T10:47:43 |
|
|
||||||
+-------+------+--------+---------------------------------+---------------------+
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL MYSQL
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
| host | cpu | memory | jsons | ts |
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL POSTGRES
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
| host | cpu | memory | jsons | ts |
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37.000000 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
|
|
||||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
@@ -154,10 +110,6 @@ drop table with_filename;
|
|||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
drop table with_json;
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
drop table with_path;
|
drop table with_path;
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|||||||
@@ -1,14 +1,10 @@
|
|||||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||||
|
|
||||||
insert into
|
|
||||||
demo(host, cpu, memory, jsons, ts)
|
|
||||||
values
|
|
||||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
|
||||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
demo(host, cpu, memory, ts)
|
demo(host, cpu, memory, ts)
|
||||||
values
|
values
|
||||||
|
('host1', 66.6, 1024, 1655276557000),
|
||||||
|
('host2', 88.8, 333.3, 1655276558000),
|
||||||
('host3', 99.9, 444.4, 1722077263000);
|
('host3', 99.9, 444.4, 1722077263000);
|
||||||
|
|
||||||
Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
|
Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
|
||||||
@@ -19,18 +15,6 @@ Copy with_filename FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format=
|
|||||||
|
|
||||||
select * from with_filename order by ts;
|
select * from with_filename order by ts;
|
||||||
|
|
||||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
|
||||||
|
|
||||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
|
||||||
|
|
||||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL MYSQL
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL POSTGRES
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||||
|
|
||||||
Copy with_path FROM '${SQLNESS_HOME}/demo/export/csv/' with (format='csv', start_time='2023-06-15 07:02:37');
|
Copy with_path FROM '${SQLNESS_HOME}/demo/export/csv/' with (format='csv', start_time='2023-06-15 07:02:37');
|
||||||
@@ -59,8 +43,6 @@ drop table demo;
|
|||||||
|
|
||||||
drop table with_filename;
|
drop table with_filename;
|
||||||
|
|
||||||
drop table with_json;
|
|
||||||
|
|
||||||
drop table with_path;
|
drop table with_path;
|
||||||
|
|
||||||
drop table with_pattern;
|
drop table with_pattern;
|
||||||
|
|||||||
@@ -1,21 +1,15 @@
|
|||||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
insert into
|
|
||||||
demo(host, cpu, memory, jsons, ts)
|
|
||||||
values
|
|
||||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
|
||||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
|
||||||
|
|
||||||
Affected Rows: 2
|
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
demo(host, cpu, memory, ts)
|
demo(host, cpu, memory, ts)
|
||||||
values
|
values
|
||||||
|
('host1', 66.6, 1024, 1655276557000),
|
||||||
|
('host2', 88.8, 333.3, 1655276558000),
|
||||||
('host3', 99.9, 444.4, 1722077263000);
|
('host3', 99.9, 444.4, 1722077263000);
|
||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 3
|
||||||
|
|
||||||
Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||||
|
|
||||||
@@ -38,44 +32,6 @@ select * from with_filename order by ts;
|
|||||||
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
|
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
|
||||||
+-------+------+--------+---------------------+
|
+-------+------+--------+---------------------+
|
||||||
|
|
||||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
|
||||||
|
|
||||||
Affected Rows: 3
|
|
||||||
|
|
||||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
|
||||||
|
|
||||||
+-------+------+--------+---------------------------------+---------------------+
|
|
||||||
| host | cpu | memory | json_to_string(with_json.jsons) | ts |
|
|
||||||
+-------+------+--------+---------------------------------+---------------------+
|
|
||||||
| host1 | 66.6 | 1024.0 | {"foo":"bar"} | 2022-06-15T07:02:37 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15T07:02:38 |
|
|
||||||
| host3 | 99.9 | 444.4 | | 2024-07-27T10:47:43 |
|
|
||||||
+-------+------+--------+---------------------------------+---------------------+
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL MYSQL
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
| host | cpu | memory | jsons | ts |
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL POSTGRES
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
| host | cpu | memory | jsons | ts |
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37.000000 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
|
|
||||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
@@ -154,10 +110,6 @@ drop table with_filename;
|
|||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
drop table with_json;
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
drop table with_path;
|
drop table with_path;
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|||||||
@@ -1,14 +1,10 @@
|
|||||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||||
|
|
||||||
insert into
|
|
||||||
demo(host, cpu, memory, jsons, ts)
|
|
||||||
values
|
|
||||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
|
||||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
demo(host, cpu, memory, ts)
|
demo(host, cpu, memory, ts)
|
||||||
values
|
values
|
||||||
|
('host1', 66.6, 1024, 1655276557000),
|
||||||
|
('host2', 88.8, 333.3, 1655276558000),
|
||||||
('host3', 99.9, 444.4, 1722077263000);
|
('host3', 99.9, 444.4, 1722077263000);
|
||||||
|
|
||||||
Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||||
@@ -19,18 +15,6 @@ Copy with_filename FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (forma
|
|||||||
|
|
||||||
select * from with_filename order by ts;
|
select * from with_filename order by ts;
|
||||||
|
|
||||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
|
||||||
|
|
||||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
|
||||||
|
|
||||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL MYSQL
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL POSTGRES
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||||
|
|
||||||
Copy with_path FROM '${SQLNESS_HOME}/demo/export/json/' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
|
Copy with_path FROM '${SQLNESS_HOME}/demo/export/json/' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
|
||||||
@@ -59,8 +43,6 @@ drop table demo;
|
|||||||
|
|
||||||
drop table with_filename;
|
drop table with_filename;
|
||||||
|
|
||||||
drop table with_json;
|
|
||||||
|
|
||||||
drop table with_path;
|
drop table with_path;
|
||||||
|
|
||||||
drop table with_pattern;
|
drop table with_pattern;
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
@@ -6,20 +6,14 @@ CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time in
|
|||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
insert into
|
|
||||||
demo(host, cpu, memory, jsons, ts)
|
|
||||||
values
|
|
||||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
|
||||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
|
||||||
|
|
||||||
Affected Rows: 2
|
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
demo(host, cpu, memory, ts)
|
demo(host, cpu, memory, ts)
|
||||||
values
|
values
|
||||||
|
('host1', 66.6, 1024, 1655276557000),
|
||||||
|
('host2', 88.8, 333.3, 1655276558000),
|
||||||
('host3', 111.1, 444.4, 1722077263000);
|
('host3', 111.1, 444.4, 1722077263000);
|
||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 3
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
demo_2(host, cpu, memory, ts)
|
demo_2(host, cpu, memory, ts)
|
||||||
@@ -76,44 +70,6 @@ select * from with_path order by ts;
|
|||||||
| host6 | 222.2 | 555.5 | 2024-07-27T10:47:44 |
|
| host6 | 222.2 | 555.5 | 2024-07-27T10:47:44 |
|
||||||
+-------+-------+--------+---------------------+
|
+-------+-------+--------+---------------------+
|
||||||
|
|
||||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
|
|
||||||
|
|
||||||
Affected Rows: 3
|
|
||||||
|
|
||||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
|
||||||
|
|
||||||
+-------+-------+--------+---------------------------------+---------------------+
|
|
||||||
| host | cpu | memory | json_to_string(with_json.jsons) | ts |
|
|
||||||
+-------+-------+--------+---------------------------------+---------------------+
|
|
||||||
| host1 | 66.6 | 1024.0 | {"foo":"bar"} | 2022-06-15T07:02:37 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15T07:02:38 |
|
|
||||||
| host3 | 111.1 | 444.4 | | 2024-07-27T10:47:43 |
|
|
||||||
+-------+-------+--------+---------------------------------+---------------------+
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL MYSQL
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
| host | cpu | memory | jsons | ts |
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
|
|
||||||
+-------+------+--------+------------------------+---------------------+
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL POSTGRES
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
| host | cpu | memory | jsons | ts |
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37.000000 |
|
|
||||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
|
|
||||||
+-------+------+--------+------------------------+----------------------------+
|
|
||||||
|
|
||||||
CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
|
CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
@@ -215,10 +171,6 @@ drop table with_filename;
|
|||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
drop table with_json;
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
drop table with_path;
|
drop table with_path;
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|||||||
@@ -1,16 +1,12 @@
|
|||||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||||
|
|
||||||
CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);
|
CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||||
|
|
||||||
insert into
|
|
||||||
demo(host, cpu, memory, jsons, ts)
|
|
||||||
values
|
|
||||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
|
||||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
demo(host, cpu, memory, ts)
|
demo(host, cpu, memory, ts)
|
||||||
values
|
values
|
||||||
|
('host1', 66.6, 1024, 1655276557000),
|
||||||
|
('host2', 88.8, 333.3, 1655276558000),
|
||||||
('host3', 111.1, 444.4, 1722077263000);
|
('host3', 111.1, 444.4, 1722077263000);
|
||||||
|
|
||||||
insert into
|
insert into
|
||||||
@@ -36,18 +32,6 @@ Copy with_path FROM '${SQLNESS_HOME}/demo/export/parquet_files/';
|
|||||||
|
|
||||||
select * from with_path order by ts;
|
select * from with_path order by ts;
|
||||||
|
|
||||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
|
||||||
|
|
||||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
|
|
||||||
|
|
||||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL MYSQL
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
-- SQLNESS PROTOCOL POSTGRES
|
|
||||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
|
||||||
|
|
||||||
CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
|
CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
|
||||||
|
|
||||||
Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/parquet_files/' WITH (PATTERN = 'demo.*', start_time='2022-06-15 07:02:39');
|
Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/parquet_files/' WITH (PATTERN = 'demo.*', start_time='2022-06-15 07:02:39');
|
||||||
@@ -86,8 +70,6 @@ drop table demo_2;
|
|||||||
|
|
||||||
drop table with_filename;
|
drop table with_filename;
|
||||||
|
|
||||||
drop table with_json;
|
|
||||||
|
|
||||||
drop table with_path;
|
drop table with_path;
|
||||||
|
|
||||||
drop table with_pattern;
|
drop table with_pattern;
|
||||||
|
|||||||
@@ -1,15 +1,11 @@
|
|||||||
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
|
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, ts TIMESTAMP TIME INDEX);
|
||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
|
||||||
|
|
||||||
Affected Rows: 2
|
Affected Rows: 2
|
||||||
|
|
||||||
insert into demo(host, cpu, memory, ts) values ('host3', 111.1, 444.4, 1722077263000);
|
|
||||||
|
|
||||||
Affected Rows: 1
|
|
||||||
|
|
||||||
COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 1
|
||||||
@@ -22,15 +18,15 @@ COPY demo TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', start_time=
|
|||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 1
|
||||||
|
|
||||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 1
|
||||||
|
|
||||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 1
|
||||||
|
|
||||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
||||||
|
|
||||||
Affected Rows: 1
|
Affected Rows: 1
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
|
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, ts TIMESTAMP TIME INDEX);
|
||||||
|
|
||||||
insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
|
||||||
|
|
||||||
insert into demo(host, cpu, memory, ts) values ('host3', 111.1, 444.4, 1722077263000);
|
|
||||||
|
|
||||||
COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
||||||
|
|
||||||
@@ -10,10 +8,10 @@ COPY demo TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', start_time='2
|
|||||||
|
|
||||||
COPY demo TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
COPY demo TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
||||||
|
|
||||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
||||||
|
|
||||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
||||||
|
|
||||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
||||||
|
|
||||||
drop table demo;
|
drop table demo;
|
||||||
|
|||||||
@@ -1,81 +0,0 @@
|
|||||||
create table metric_engine_partition (
|
|
||||||
ts timestamp time index,
|
|
||||||
host string primary key,
|
|
||||||
cpu double,
|
|
||||||
)
|
|
||||||
partition on columns (host) (
|
|
||||||
host <= 'host1',
|
|
||||||
host > 'host1' and host <= 'host2',
|
|
||||||
host > 'host2'
|
|
||||||
)
|
|
||||||
engine = metric
|
|
||||||
with (
|
|
||||||
physical_metric_table = "true",
|
|
||||||
);
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
select count(*) from metric_engine_partition;
|
|
||||||
|
|
||||||
+----------+
|
|
||||||
| count(*) |
|
|
||||||
+----------+
|
|
||||||
| 0 |
|
|
||||||
+----------+
|
|
||||||
|
|
||||||
create table logical_table_1 (
|
|
||||||
ts timestamp time index,
|
|
||||||
host string primary key,
|
|
||||||
cpu double,
|
|
||||||
)
|
|
||||||
partition on columns (host) ()
|
|
||||||
engine = metric
|
|
||||||
with (
|
|
||||||
on_physical_table = "metric_engine_partition",
|
|
||||||
);
|
|
||||||
|
|
||||||
Error: 1004(InvalidArguments), Invalid partition rule: logical table in metric engine should not have partition rule, it will be inherited from physical table
|
|
||||||
|
|
||||||
create table logical_table_2 (
|
|
||||||
ts timestamp time index,
|
|
||||||
host string primary key,
|
|
||||||
cpu double,
|
|
||||||
)
|
|
||||||
engine = metric
|
|
||||||
with (
|
|
||||||
on_physical_table = "metric_engine_partition",
|
|
||||||
);
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
show create table logical_table_2;
|
|
||||||
|
|
||||||
+-----------------+-------------------------------------------------+
|
|
||||||
| Table | Create Table |
|
|
||||||
+-----------------+-------------------------------------------------+
|
|
||||||
| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( |
|
|
||||||
| | "cpu" DOUBLE NULL, |
|
|
||||||
| | "host" STRING NULL, |
|
|
||||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
|
||||||
| | TIME INDEX ("ts"), |
|
|
||||||
| | PRIMARY KEY ("host") |
|
|
||||||
| | ) |
|
|
||||||
| | PARTITION ON COLUMNS ("host") ( |
|
|
||||||
| | host <= 'host1', |
|
|
||||||
| | host > 'host2', |
|
|
||||||
| | host > 'host1' AND host <= 'host2' |
|
|
||||||
| | ) |
|
|
||||||
| | ENGINE=metric |
|
|
||||||
| | WITH( |
|
|
||||||
| | on_physical_table = 'metric_engine_partition' |
|
|
||||||
| | ) |
|
|
||||||
+-----------------+-------------------------------------------------+
|
|
||||||
|
|
||||||
drop table logical_table_2;
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
drop table metric_engine_partition;
|
|
||||||
|
|
||||||
Affected Rows: 0
|
|
||||||
|
|
||||||
@@ -1,43 +0,0 @@
|
|||||||
create table metric_engine_partition (
|
|
||||||
ts timestamp time index,
|
|
||||||
host string primary key,
|
|
||||||
cpu double,
|
|
||||||
)
|
|
||||||
partition on columns (host) (
|
|
||||||
host <= 'host1',
|
|
||||||
host > 'host1' and host <= 'host2',
|
|
||||||
host > 'host2'
|
|
||||||
)
|
|
||||||
engine = metric
|
|
||||||
with (
|
|
||||||
physical_metric_table = "true",
|
|
||||||
);
|
|
||||||
|
|
||||||
select count(*) from metric_engine_partition;
|
|
||||||
|
|
||||||
create table logical_table_1 (
|
|
||||||
ts timestamp time index,
|
|
||||||
host string primary key,
|
|
||||||
cpu double,
|
|
||||||
)
|
|
||||||
partition on columns (host) ()
|
|
||||||
engine = metric
|
|
||||||
with (
|
|
||||||
on_physical_table = "metric_engine_partition",
|
|
||||||
);
|
|
||||||
|
|
||||||
create table logical_table_2 (
|
|
||||||
ts timestamp time index,
|
|
||||||
host string primary key,
|
|
||||||
cpu double,
|
|
||||||
)
|
|
||||||
engine = metric
|
|
||||||
with (
|
|
||||||
on_physical_table = "metric_engine_partition",
|
|
||||||
);
|
|
||||||
|
|
||||||
show create table logical_table_2;
|
|
||||||
|
|
||||||
drop table logical_table_2;
|
|
||||||
|
|
||||||
drop table metric_engine_partition;
|
|
||||||
Reference in New Issue
Block a user