mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 06:20:38 +00:00
chore: bump version to 0.14.2 (#6032)
* chore: only retry when retry-able in flow (#5987) * chore: only retry when retry-able * chore: revert dbg change * refactor: per review * fix: check for available frontend first * docs: more explain&longer timeout&feat: more retry at every level&try send select 1 * fix: use `sql` method for "SELECT 1" * fix: also put recover flows in spawned task and a dead loop * test: update transient error in flow rebuild test * chore: sleep after sqlness sleep * chore: add a warning * chore: wait even more time after reboot * fix: sanitize_connection_string (#6012) * fix: disable recursion limit in prost (#6010) Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * ci: fix the bugs of release-dev-builder-images and add update-dev-builder-image-tag (#6009) * fix: the dev-builder release job is not triggered by merged event * ci: add update-dev-builder-image-tag * fix: always create mito engine (#6018) * fix: force streaming mode for instant source table (#6031) * fix: force streaming mode for instant source table * tests: sqlness test&refactor: get table * refactor: per review * chore: bump version to 0.14.2 --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: jeremyhi <jiachun_feng@proton.me> Co-authored-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: zyy17 <zyylsxm@gmail.com> Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
@@ -37,7 +37,7 @@ use common_meta::rpc::ddl::{
|
||||
};
|
||||
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
|
||||
use common_query::Output;
|
||||
use common_telemetry::{debug, info, tracing};
|
||||
use common_telemetry::{debug, info, tracing, warn};
|
||||
use common_time::Timezone;
|
||||
use datafusion_common::tree_node::TreeNodeVisitor;
|
||||
use datafusion_expr::LogicalPlan;
|
||||
@@ -369,7 +369,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let flow_type = self
|
||||
.determine_flow_type(&expr.sql, query_context.clone())
|
||||
.determine_flow_type(&expr, query_context.clone())
|
||||
.await?;
|
||||
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
|
||||
|
||||
@@ -398,9 +398,49 @@ impl StatementExecutor {
|
||||
/// Determine the flow type based on the SQL query
|
||||
///
|
||||
/// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
|
||||
async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result<FlowType> {
|
||||
async fn determine_flow_type(
|
||||
&self,
|
||||
expr: &CreateFlowExpr,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<FlowType> {
|
||||
// first check if source table's ttl is instant, if it is, force streaming mode
|
||||
for src_table_name in &expr.source_table_names {
|
||||
let table = self
|
||||
.catalog_manager()
|
||||
.table(
|
||||
&src_table_name.catalog_name,
|
||||
&src_table_name.schema_name,
|
||||
&src_table_name.table_name,
|
||||
Some(&query_ctx),
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(
|
||||
&src_table_name.catalog_name,
|
||||
&src_table_name.schema_name,
|
||||
&src_table_name.table_name,
|
||||
),
|
||||
})?;
|
||||
|
||||
// instant source table can only be handled by streaming mode
|
||||
if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
|
||||
warn!(
|
||||
"Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
|
||||
format_full_table_name(
|
||||
&src_table_name.catalog_name,
|
||||
&src_table_name.schema_name,
|
||||
&src_table_name.table_name
|
||||
),
|
||||
expr.flow_name
|
||||
);
|
||||
return Ok(FlowType::Streaming);
|
||||
}
|
||||
}
|
||||
|
||||
let engine = &self.query_engine;
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = engine
|
||||
|
||||
Reference in New Issue
Block a user