feat: CREATE OR REPLACE FLOW (#5001)

* feat: Replace flow

* refactor: better show create flow & tests: better checks

* tests: sqlness result update

* tests: unit test for update

* refactor: cmp with raw bytes

* refactor: rename

* refactor: per review
This commit is contained in:
discord9
2024-11-19 16:44:57 +08:00
committed by GitHub
parent dbb3f2d98d
commit 4d8fe29ea8
12 changed files with 932 additions and 78 deletions

View File

@@ -28,6 +28,7 @@ use common_procedure::{
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -43,7 +44,7 @@ use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{FlowId, FlowPartitionId};
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::{CreateFlowTask, QueryContext};
@@ -75,6 +76,7 @@ impl CreateFlowProcedure {
source_table_ids: vec![],
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
},
}
}
@@ -90,6 +92,7 @@ impl CreateFlowProcedure {
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
let create_if_not_exists = self.data.task.create_if_not_exists;
let or_replace = self.data.task.or_replace;
let flow_name_value = self
.context
@@ -98,16 +101,56 @@ impl CreateFlowProcedure {
.get(catalog_name, flow_name)
.await?;
if create_if_not_exists && or_replace {
// This is forbidden because the combined semantics of `IF NOT EXISTS` with `OR REPLACE` are unclear.
return error::UnsupportedSnafu {
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(),
}
.fail();
}
if let Some(value) = flow_name_value {
ensure!(
create_if_not_exists,
create_if_not_exists || or_replace,
error::FlowAlreadyExistsSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
let flow_id = value.flow_id();
return Ok(Status::done_with_output(flow_id));
if create_if_not_exists {
info!("Flow already exists, flow_id: {}", flow_id);
return Ok(Status::done_with_output(flow_id));
}
let flow_id = value.flow_id();
let peers = self
.context
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.map_ok(|(_, value)| value.peer)
.try_collect::<Vec<_>>()
.await?;
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
info!("Replacing flow, flow_id: {}", flow_id);
let flow_info_value = self
.context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await?;
ensure!(
flow_info_value.is_some(),
error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
self.data.prev_flow_info_value = flow_info_value;
}
// Ensures sink table doesn't exist.
@@ -128,7 +171,9 @@ impl CreateFlowProcedure {
}
self.collect_source_tables().await?;
self.allocate_flow_id().await?;
if self.data.flow_id.is_none() {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;
Ok(Status::executing(true))
@@ -153,7 +198,10 @@ impl CreateFlowProcedure {
.map_err(add_peer_context_if_needed(peer.clone()))
});
}
info!(
"Creating flow({:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.peers
);
join_all(create_flow)
.await
.into_iter()
@@ -170,18 +218,29 @@ impl CreateFlowProcedure {
async fn on_create_metadata(&mut self) -> Result<Status> {
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
let (flow_info, flow_routes) = (&self.data).into();
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
&& self.data.task.or_replace
{
self.context
.flow_metadata_manager
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
} else {
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
}
self.data.state = CreateFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}
async fn on_broadcast(&mut self) -> Result<Status> {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let ctx = Context {
@@ -192,10 +251,13 @@ impl CreateFlowProcedure {
.cache_invalidator
.invalidate(
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
})],
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.await?;
@@ -270,6 +332,9 @@ pub struct CreateFlowData {
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
pub(crate) query_context: QueryContext,
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
}
impl From<&CreateFlowData> for CreateRequest {
@@ -284,9 +349,9 @@ impl From<&CreateFlowData> for CreateRequest {
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
// Always be true to ensure idempotent in case of retry
create_if_not_exists: true,
or_replace: true,
or_replace: value.task.or_replace,
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),

View File

@@ -655,10 +655,17 @@ async fn handle_create_flow_task(
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
if !create_flow_task.or_replace {
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
} else {
info!(
"Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
}
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),

View File

@@ -38,7 +38,7 @@ use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{FlowId, MetadataKey};
use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
@@ -230,6 +230,102 @@ impl FlowMetadataManager {
Ok(())
}
/// Updates the metadata for a flow; returns an error if the old metadata
/// does not exist or differs from `current_flow_info`.
///
/// The update runs as a single transaction that:
/// - verifies the flow name still maps to `flow_id`,
/// - compare-and-swaps `__flow/info/{flow_id}` against the raw bytes of
///   `current_flow_info`,
/// - rewrites the route / flownode-flow / table-flow keys derived from
///   `new_flow_info`.
pub async fn update_flow_metadata(
    &self,
    flow_id: FlowId,
    current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
    new_flow_info: &FlowInfoValue,
    flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
) -> Result<()> {
    // Check-only guard: the flow name must still point at this flow id.
    let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
        self.flow_name_manager.build_update_txn(
            &new_flow_info.catalog_name,
            &new_flow_info.flow_name,
            flow_id,
        )?;

    // Compare-and-swap of the flow info value against its previous raw bytes.
    let (create_flow_txn, on_create_flow_failure) =
        self.flow_info_manager
            .build_update_txn(flow_id, current_flow_info, new_flow_info)?;

    let create_flow_routes_txn = self
        .flow_route_manager
        .build_create_txn(flow_id, flow_routes.clone())?;

    let create_flownode_flow_txn = self
        .flownode_flow_manager
        .build_create_txn(flow_id, new_flow_info.flownode_ids().clone());

    let create_table_flow_txn = self.table_flow_manager.build_create_txn(
        flow_id,
        flow_routes
            .into_iter()
            .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
            .collect(),
        new_flow_info.source_table_ids(),
    )?;

    let txn = Txn::merge_all(vec![
        create_flow_flow_name_txn,
        create_flow_txn,
        create_flow_routes_txn,
        create_flownode_flow_txn,
        create_table_flow_txn,
    ]);
    // This is the update path, so log "Updating" (not "Creating").
    info!(
        "Updating flow {}.{}({}), with {} txn operations",
        new_flow_info.catalog_name,
        new_flow_info.flow_name,
        flow_id,
        txn.max_operations()
    );

    let mut resp = self.kv_backend.txn(txn).await?;
    if !resp.succeeded {
        let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
        let remote_flow_flow_name =
            on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
                error::UnexpectedSnafu {
                    err_msg: format!(
                        "Reads the empty flow name during the updating flow, flow_id: {flow_id}"
                    ),
                }
            })?;

        // The name now belongs to a different flow id: report the conflict.
        if remote_flow_flow_name.flow_id() != flow_id {
            info!(
                "Trying to update flow {}.{}({}), but flow({}) already exists with a different flow id",
                new_flow_info.catalog_name,
                new_flow_info.flow_name,
                flow_id,
                remote_flow_flow_name.flow_id()
            );

            return error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
                    remote_flow_flow_name.flow_id(),
                    flow_id,
                    new_flow_info.catalog_name,
                    new_flow_info.flow_name,
                ),
            }.fail();
        }

        // Same id, but the stored value differs from `current_flow_info`:
        // the compare-and-swap on the info key failed.
        let remote_flow =
            on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the empty flow during the updating flow, flow_id: {flow_id}"
                ),
            })?;
        let op_name = "updating flow";
        ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
    }

    Ok(())
}
fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
let source_table_ids = flow_value.source_table_ids();
let mut keys =
@@ -560,4 +656,222 @@ mod tests {
// Ensures all keys are deleted
assert!(mem_kv.is_empty())
}
// Happy path for `update_flow_metadata`: create a flow, update it with a
// changed `raw_sql`, then verify the stored info is replaced while the
// derived route / flownode-flow / table-flow keys stay consistent.
#[tokio::test]
async fn test_update_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
// Initial flow: partitions 0/1 on flownodes 1/2, three source tables.
let flow_value = test_flow_info_value(
"flow",
[(0, 1u64), (1, 2u64)].into(),
vec![1024, 1025, 1026],
);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
// Seed the backend with the initial metadata.
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
// The new value differs from the old one only in `raw_sql`.
let new_flow_value = {
let mut tmp = flow_value.clone();
tmp.raw_sql = "new".to_string();
tmp
};
// Update flow instead
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&new_flow_value,
flow_routes.clone(),
)
.await
.unwrap();
// The stored flow info must now be the updated value (asserted below).
let got = flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
// Routes are rewritten by the update and must match the originals.
assert_eq!(
routes,
vec![
(
FlowRouteKey::new(flow_id, 1),
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
FlowRouteKey::new(flow_id, 2),
FlowRouteValue {
peer: Peer::empty(2),
},
),
]
);
assert_eq!(got, new_flow_value);
// Flownode 1 still owns partition 0 of this flow.
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(flows, vec![(flow_id, 0)]);
// Every source table keeps its table-flow mapping for both partitions.
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
nodes,
vec![
(
TableFlowKey::new(table_id, 1, flow_id, 1),
TableFlowValue {
peer: Peer::empty(1)
}
),
(
TableFlowKey::new(table_id, 2, flow_id, 2),
TableFlowValue {
peer: Peer::empty(2)
}
)
]
);
}
}
// `update_flow_metadata` must reject an update whose flow id differs from
// the id the flow name currently maps to.
#[tokio::test]
async fn test_update_flow_metadata_flow_replace_diff_id_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
// update again with same flow id
// (an identical value is allowed: replace may write the same bytes back)
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes.clone(),
)
.await
.unwrap();
// update again with wrong flow id, expected error
let err = flow_metadata_manager
.update_flow_metadata(
flow_id + 1,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes,
)
.await
.unwrap_err();
// The name -> id guard fires and surfaces as an `Unexpected` error.
assert_matches!(err, error::Error::Unexpected { .. });
assert!(err
.to_string()
.contains("Reads different flow id when updating flow"));
}
// `update_flow_metadata` must fail when the supplied "current" value does
// not match what is actually stored (compare-and-swap on the raw bytes).
#[tokio::test]
async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let catalog_name = "greptime";
let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
// Creates again.
// Shadow `flow_value` with a value that differs from the stored one
// (different sink table, sql, etc.) and pass it as the "current" value.
let another_sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "another_sink_table".to_string(),
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_after: Some(300),
comment: "hi".to_string(),
options: Default::default(),
};
let err = flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes.clone(),
)
.await
.unwrap_err();
// The CAS fails; `ensure_values!` reports the value mismatch.
assert!(
err.to_string().contains("Reads the different value"),
"error: {:?}",
err
);
}
}

View File

@@ -26,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::FlownodeId;
@@ -196,6 +196,19 @@ impl FlowInfoManager {
.transpose()
}
/// Returns the [FlowInfoValue] together with its original raw bytes for the
/// specified `flow_id`, or `None` if the key does not exist.
pub async fn get_raw(
    &self,
    flow_id: FlowId,
) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
    let key = FlowInfoKey::new(flow_id).to_bytes();
    let maybe_kv = self.kv_backend.get(&key).await?;
    match maybe_kv {
        Some(kv) => Ok(Some(DeserializedValueWithBytes::from_inner_slice(&kv.value)?)),
        None => Ok(None),
    }
}
/// Builds a create flow transaction.
/// It is expected that the `__flow/info/{flow_id}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
@@ -215,6 +228,36 @@ impl FlowInfoManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
/// Builds an update-flow transaction.
/// It is expected that `__flow/info/{flow_id}` IS ALREADY occupied and equal to `current_flow_value`;
/// the new value may be identical, so a replace is allowed even when nothing changed.
/// Otherwise, the transaction will retrieve the existing value and fail.
pub(crate) fn build_update_txn(
&self,
flow_id: FlowId,
current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
new_flow_value: &FlowInfoValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
)> {
let key = FlowInfoKey::new(flow_id).to_bytes();
let raw_value = new_flow_value.try_as_raw_value()?;
// Compare against the original raw bytes (not a re-serialized value), so
// encoding differences cannot cause a spurious mismatch.
let prev_value = current_flow_value.get_raw_bytes();
let txn = Txn::new()
.when(vec![
// The key must exist ...
Compare::new(key.clone(), CompareOp::NotEqual, None),
// ... and must still hold exactly the previous bytes (CAS).
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
])
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
.or_else(vec![TxnOp::Get(key.clone())]);
Ok((
txn,
// Decoder for the value fetched by the `or_else` branch on failure.
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
}
#[cfg(test)]

View File

@@ -26,7 +26,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
@@ -237,6 +237,37 @@ impl FlowNameManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
/// Builds an update-flow-name transaction. It changes neither the name nor the id;
/// it only verifies that `__flow/name/{catalog}/{flow_name}` is already occupied
/// and still maps to the same `flow_id`.
/// Otherwise, the transaction will retrieve the existing value (and the caller fails).
pub fn build_update_txn(
&self,
catalog_name: &str,
flow_name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
)> {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let raw_value = flow_flow_name_value.try_as_raw_value()?;
// Check-only transaction: it succeeds iff the stored value equals the
// expected name -> flow_id mapping; note there is no `and_then` write.
let txn = Txn::new()
.when(vec![Compare::new(
raw_key.clone(),
CompareOp::Equal,
Some(raw_value),
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
Ok((
txn,
// Decoder for the value fetched by the `or_else` branch on failure.
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
}
#[cfg(test)]

View File

@@ -50,7 +50,10 @@ use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu,
UnexpectedSnafu,
};
use crate::expr::{Batch, GlobalId};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
@@ -673,6 +676,21 @@ impl FlowWorkerManager {
}
}
/// The arguments to create a flow in [`FlowWorkerManager`].
#[derive(Debug, Clone)]
pub struct CreateFlowArgs {
/// Id of the flow to create.
pub flow_id: FlowId,
/// Name of the sink table the flow writes to.
pub sink_table_name: TableName,
/// Ids of the source tables the flow reads from.
pub source_table_ids: Vec<TableId>,
/// If true and the flow already exists, creation becomes a no-op.
pub create_if_not_exists: bool,
/// If true and the flow already exists, the existing flow is replaced.
pub or_replace: bool,
/// Optional expiry value; `None` disables it.
/// NOTE(review): units are not visible here — presumably seconds; confirm.
pub expire_after: Option<i64>,
/// Optional human-readable comment attached to the flow.
pub comment: Option<String>,
/// Raw SQL text defining the flow.
pub sql: String,
/// Extra flow options as key-value pairs.
pub flow_options: HashMap<String, String>,
/// Query context used when planning the flow's SQL.
pub query_ctx: Option<QueryContext>,
}
/// Create&Remove flow
impl FlowWorkerManager {
/// remove a flow by it's id
@@ -694,18 +712,48 @@ impl FlowWorkerManager {
/// 1. parse query into typed plan(and optional parse expire_after expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(
&self,
flow_id: FlowId,
sink_table_name: TableName,
source_table_ids: &[TableId],
create_if_not_exists: bool,
expire_after: Option<i64>,
comment: Option<String>,
sql: String,
flow_options: HashMap<String, String>,
query_ctx: Option<QueryContext>,
) -> Result<Option<FlowId>, Error> {
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment,
sql,
flow_options,
query_ctx,
} = args;
let already_exist = {
let mut flag = false;
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
flag = true;
break;
}
}
flag
};
match (create_if_not_exists, or_replace, already_exist) {
// do replace
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
self.remove_flow(flow_id).await?;
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// do nothing if exists
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// create if not exists
(_, _, false) => (),
}
if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
@@ -717,7 +765,7 @@ impl FlowWorkerManager {
let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in source_table_ids {
for source in &source_table_ids {
node_ctx
.assign_global_id_to_table(&self.table_info_source, None, Some(*source))
.await?;
@@ -726,7 +774,7 @@ impl FlowWorkerManager {
.assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
.await?;
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());
node_ctx.query_context = query_ctx.map(Arc::new);
// construct a active dataflow state with it

View File

@@ -28,7 +28,7 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::adapter::FlowWorkerManager;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -57,7 +57,7 @@ impl Flownode for FlowWorkerManager {
comment,
sql,
flow_options,
or_replace: _,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
@@ -66,20 +66,19 @@ impl Flownode for FlowWorkerManager {
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let ret = self
.create_flow(
task_id.id as u64,
sink_table_name,
&source_table_ids,
create_if_not_exists,
expire_after,
Some(comment),
sql,
flow_options,
query_ctx,
)
.await
.map_err(to_meta_err)?;
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql,
flow_options,
query_ctx,
};
let ret = self.create_flow(args).await.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret

View File

@@ -48,7 +48,7 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::FlowWorkerManagerRef;
use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu,
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
@@ -355,23 +355,26 @@ impl FlownodeBuilder {
info.sink_table_name().schema_name.clone(),
info.sink_table_name().table_name.clone(),
];
manager
.create_flow(
flow_id as _,
sink_table_name,
info.source_table_ids(),
true,
info.expire_after(),
Some(info.comment().clone()),
info.raw_sql().clone(),
info.options().clone(),
Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
)
.await?;
let args = CreateFlowArgs {
flow_id: flow_id as _,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// Because recovery only happens on restart, `create_if_not_exists` and `or_replace` could be arbitrary (the flow doesn't exist yet).
// For consistency, and to make sure the flow is actually recovered, we set both to true.
// (This is fine: the check forbidding both flags together is enforced on metasrv, which this path has already passed.)
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
query_ctx: Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
};
manager.create_flow(args).await?;
}
Ok(cnt)

View File

@@ -865,7 +865,9 @@ pub fn show_create_flow(
value: flow_val.sink_table_name().table_name.clone(),
quote_style: None,
}]),
or_replace: true,
// notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do
// so we set `or_replace` to false.
or_replace: false,
if_not_exists: true,
expire_after: flow_val.expire_after(),
comment,

View File

@@ -259,7 +259,7 @@ SHOW CREATE FLOW filter_numbers_basic;
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic |
| filter_numbers_basic | CREATE FLOW IF NOT EXISTS filter_numbers_basic |
| | SINK TO out_num_cnt_basic |
| | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+

View File

@@ -9,7 +9,9 @@ Affected Rows: 0
create table out_num_cnt_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
PRIMARY KEY(number),
);
Affected Rows: 0
@@ -32,7 +34,7 @@ SHOW CREATE FLOW filter_numbers_show;
+---------------------+------------------------------------------------------------+
| Flow | Create Flow |
+---------------------+------------------------------------------------------------+
| filter_numbers_show | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show |
| filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show |
| | SINK TO out_num_cnt_show |
| | AS SELECT number FROM numbers_input_show WHERE number > 10 |
+---------------------+------------------------------------------------------------+
@@ -67,6 +69,251 @@ SHOW FLOWS LIKE 'filter_numbers_show';
++
++
-- also test `CREATE OR REPLACE` and `IF NOT EXISTS`
-- (flow exists, replace, if not exists)=(false, false, false)
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 10;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+-------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 |
+---------------------+---------------+-------------------------------------------------------------+
-- this one should error out
-- (flow exists, replace, if not exists)=(true, false, false)
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 15;
Error: 8000(FlowAlreadyExists), Flow already exists: greptime.filter_numbers_show
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+-------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 |
+---------------------+---------------+-------------------------------------------------------------+
-- makesure it's not replaced in flownode
INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT number, ts FROM out_num_cnt_show;
+--------+-------------------------+
| number | ts |
+--------+-------------------------+
| 15 | 1970-01-01T00:00:00.001 |
| 16 | 1970-01-01T00:00:00.002 |
+--------+-------------------------+
-- after this one, the flow SHOULD NOT be replaced
-- (flow exists, replace, if not exists)=(true, false, true)
CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 5;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+-------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 |
+---------------------+---------------+-------------------------------------------------------------+
-- makesure it's not replaced in flownode
INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4);
Affected Rows: 4
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT number, ts FROM out_num_cnt_show;
+--------+-------------------------+
| number | ts |
+--------+-------------------------+
| 11 | 1970-01-01T00:00:00.004 |
| 15 | 1970-01-01T00:00:00.001 |
| 16 | 1970-01-01T00:00:00.002 |
+--------+-------------------------+
-- after this, the flow SHOULD be replaced
-- (flow exists, replace, if not exists)=(true, true, false)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 3;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 3 |
+---------------------+---------------+------------------------------------------------------------+
-- makesure it's replaced in flownode
INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4);
Affected Rows: 4
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT number, ts FROM out_num_cnt_show;
+--------+-------------------------+
| number | ts |
+--------+-------------------------+
| 4 | 1970-01-01T00:00:00.002 |
| 10 | 1970-01-01T00:00:00.003 |
| 11 | 1970-01-01T00:00:00.004 |
| 15 | 1970-01-01T00:00:00.001 |
| 16 | 1970-01-01T00:00:00.002 |
+--------+-------------------------+
-- after this, the flow SHOULD error out since having both `replace` and `if not exists`
-- (flow exists, replace, if not exists)=(true, true, true)
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 0;
Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE`
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 3 |
+---------------------+---------------+------------------------------------------------------------+
DROP FLOW filter_numbers_show;
Affected Rows: 0
-- (flow exists, replace, if not exists)=(false, true, true)
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -1;
Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE`
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
++
++
DROP FLOW filter_numbers_show;
Error: 8001(FlowNotFound), Flow not found: greptime.filter_numbers_show
-- following always create since didn't exist
-- (flow exists, replace, if not exists)=(false, true, false)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -2;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+-------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -2 |
+---------------------+---------------+-------------------------------------------------------------+
DROP FLOW filter_numbers_show;
Affected Rows: 0
-- (flow exists, replace, if not exists)=(false, false, true)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -3;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+-------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 |
+---------------------+---------------+-------------------------------------------------------------+
-- make sure it is the same after recovery
-- SQLNESS ARG restart=true
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+-------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 |
+---------------------+---------------+-------------------------------------------------------------+
SELECT * FROM out_num_cnt_show;
+--------+-------------------------+
| number | ts |
+--------+-------------------------+
| 4 | 1970-01-01T00:00:00.002 |
| 10 | 1970-01-01T00:00:00.003 |
| 11 | 1970-01-01T00:00:00.004 |
| 15 | 1970-01-01T00:00:00.001 |
| 16 | 1970-01-01T00:00:00.002 |
+--------+-------------------------+
INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3);
Affected Rows: 4
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT * FROM out_num_cnt_show;
+--------+-------------------------+
| number | ts |
+--------+-------------------------+
| -2 | 1970-01-01T00:00:00.002 |
| -1 | 1970-01-01T00:00:00.003 |
| 4 | 1970-01-01T00:00:00.002 |
| 10 | 1970-01-01T00:00:00.003 |
| 11 | 1970-01-01T00:00:00.004 |
| 15 | 1970-01-01T00:00:00.001 |
| 16 | 1970-01-01T00:00:00.002 |
+--------+-------------------------+
DROP FLOW filter_numbers_show;
Affected Rows: 0
drop table out_num_cnt_show;
Affected Rows: 0

View File

@@ -6,7 +6,9 @@ CREATE TABLE numbers_input_show (
);
create table out_num_cnt_show (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
PRIMARY KEY(number),
);
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
@@ -27,6 +29,99 @@ SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS W
SHOW FLOWS LIKE 'filter_numbers_show';
-- also test `CREATE OR REPLACE` and `IF NOT EXISTS`
-- (flow exists, replace, if not exists)=(false, false, false)
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 10;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
-- this one should error out
-- (flow exists, replace, if not exists)=(true, false, false)
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 15;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
-- make sure it's not replaced in flownode
INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
SELECT number, ts FROM out_num_cnt_show;
-- after this one, the flow SHOULD NOT be replaced
-- (flow exists, replace, if not exists)=(true, false, true)
CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 5;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
-- make sure it's not replaced in flownode
INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
SELECT number, ts FROM out_num_cnt_show;
-- after this, the flow SHOULD be replaced
-- (flow exists, replace, if not exists)=(true, true, false)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 3;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
-- make sure it's replaced in flownode
INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
SELECT number, ts FROM out_num_cnt_show;
-- after this, the flow SHOULD error out since it has both `replace` and `if not exists`
-- (flow exists, replace, if not exists)=(true, true, true)
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 0;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
DROP FLOW filter_numbers_show;
-- (flow exists, replace, if not exists)=(false, true, true)
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -1;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
DROP FLOW filter_numbers_show;
-- following always create since didn't exist
-- (flow exists, replace, if not exists)=(false, true, false)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -2;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
DROP FLOW filter_numbers_show;
-- (flow exists, replace, if not exists)=(false, false, true)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -3;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
-- make sure it is the same after recovery
-- SQLNESS ARG restart=true
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
SELECT * FROM out_num_cnt_show;
INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
SELECT * FROM out_num_cnt_show;
DROP FLOW filter_numbers_show;
drop table out_num_cnt_show;
drop table numbers_input_show;