refactor: rename flow task to flow (#3833)

* refactor: rename to `MIN_USER_FLOW_ID`

* refactor: rename to `FLOW_ID_SEQ`

* refactor: rename to `flow_id_sequence`

* refactor: rename to `FlowMetadataManager`

* refactor: rename flow_task.rs to flow.rs

* refactor: rename to FlowInfoManager

* refactor: rename to FlowName

* refactor: rename to FlownodeFlow

* refactor: rename to TableFlow

* refactor: remove TASK

* refactor: rename to __flow

* refactor: rename to flow_id

* refactor: rename to flow_name

* refactor: update comments

* refactor: rename to flow_metadata_manager

* refactor: rename to flow_metadata_allocator

* refactor: rename to FlowMetadataAllocator

* refactor: rename task suffix

* refactor: rename FlowTask to FlowInfo

* refactor: rename FlowTaskScoped to FlowScoped

* refactor: rename FlowTaskId to FlowId

* chore: bump proto to b5412f7

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-04-29 22:02:52 +08:00
committed by GitHub
parent b493ea1b38
commit 701aba9cdb
32 changed files with 1113 additions and 1151 deletions

2
Cargo.lock generated
View File

@@ -3883,7 +3883,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=aba235025ac5643c12bfdcefd656af11ad58ea8e#aba235025ac5643c12bfdcefd656af11ad58ea8e"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b5412f72257c18410fdccbb893fa5d245b846141#b5412f72257c18410fdccbb893fa5d245b846141"
dependencies = [
"prost 0.12.4",
"serde",

View File

@@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aba235025ac5643c12bfdcefd656af11ad58ea8e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b5412f72257c18410fdccbb893fa5d245b846141" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -478,8 +478,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlowTask(_)) => "ddl.create_flow",
Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
None => "ddl.empty",
}
}

View File

@@ -18,14 +18,14 @@ use std::{fs, path};
use async_trait::async_trait;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID};
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef};
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
@@ -47,7 +47,7 @@ use frontend::server::Services;
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
@@ -419,9 +419,9 @@ impl StartCommand {
.step(10)
.build(),
);
let flow_task_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_TASK_ID as u64)
let flow_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_ID as u64)
.step(10)
.build(),
);
@@ -431,14 +431,14 @@ impl StartCommand {
));
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_task_meta_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence),
);
let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
));
let ddl_task_executor = Self::create_ddl_task_executor(
procedure_manager.clone(),
@@ -446,8 +446,8 @@ impl StartCommand {
multi_cache_invalidator,
table_metadata_manager,
table_meta_allocator,
flow_task_metadata_manager,
flow_task_meta_allocator,
flow_metadata_manager,
flow_meta_allocator,
)
.await?;
@@ -480,8 +480,8 @@ impl StartCommand {
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_task_metadata_manager: FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
flow_metadata_manager: FlowMetadataManagerRef,
flow_metadata_allocator: FlowMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
@@ -491,8 +491,8 @@ impl StartCommand {
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
},
procedure_manager,
true,

View File

@@ -19,9 +19,9 @@ pub const DEFAULT_CATALOG_NAME: &str = "greptime";
pub const DEFAULT_SCHEMA_NAME: &str = "public";
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private";
/// Reserves [0,MIN_USER_FLOW_TASK_ID) for internal usage.
/// Reserves [0,MIN_USER_FLOW_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_FLOW_TASK_ID: u32 = 1024;
pub const MIN_USER_FLOW_ID: u32 = 1024;
/// Reserves [0,MIN_USER_TABLE_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_TABLE_ID: u32 = 1024;

View File

@@ -19,10 +19,10 @@ use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef;
use crate::error::Result;
use crate::key::flow_task::FlowTaskMetadataManagerRef;
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
@@ -39,9 +39,9 @@ pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_table;
pub mod flow_meta;
mod physical_table_metadata;
pub mod table_meta;
pub mod task_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
@@ -110,8 +110,8 @@ pub struct DdlContext {
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.
pub table_metadata_allocator: TableMetadataAllocatorRef,
/// Flow task metadata manager.
pub flow_task_metadata_manager: FlowTaskMetadataManagerRef,
/// Allocator for flow task metadata.
pub flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
/// Flow metadata manager.
pub flow_metadata_manager: FlowMetadataManagerRef,
/// Allocator for flow metadata.
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
}

View File

@@ -36,17 +36,17 @@ use super::utils::add_peer_context_if_needed;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::flow_task::flow_task_info::FlowTaskInfoValue;
use crate::key::FlowTaskId;
use crate::lock_key::{CatalogLock, FlowTaskNameLock, TableNameLock};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::FlowId;
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateFlowTask;
use crate::{metrics, ClusterId};
/// The procedure of flow task creation.
/// The procedure of flow creation.
pub struct CreateFlowProcedure {
pub context: DdlContext,
pub data: CreateFlowTaskData,
pub data: CreateFlowData,
}
impl CreateFlowProcedure {
@@ -56,13 +56,13 @@ impl CreateFlowProcedure {
pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self {
Self {
context,
data: CreateFlowTaskData {
data: CreateFlowData {
cluster_id,
task,
flow_task_id: None,
flow_id: None,
peers: vec![],
source_table_ids: vec![],
state: CreateFlowTaskState::CreateMetadata,
state: CreateFlowState::CreateMetadata,
},
}
}
@@ -76,21 +76,21 @@ impl CreateFlowProcedure {
async fn on_prepare(&mut self) -> Result<Status> {
self.check_creation().await?;
self.collect_source_tables().await?;
self.allocate_flow_task_id().await?;
self.data.state = CreateFlowTaskState::CreateFlows;
self.allocate_flow_id().await?;
self.data.state = CreateFlowState::CreateFlows;
Ok(Status::executing(true))
}
async fn on_flownode_create_flows(&mut self) -> Result<Status> {
// Safety: must be allocated.
let mut create_flow_task = Vec::with_capacity(self.data.peers.len());
let mut create_flow = Vec::with_capacity(self.data.peers.len());
for peer in &self.data.peers {
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
body: Some(PbFlowRequest::Create((&self.data).into())),
};
create_flow_task.push(async move {
create_flow.push(async move {
requester
.handle(request)
.await
@@ -98,29 +98,29 @@ impl CreateFlowProcedure {
});
}
join_all(create_flow_task)
join_all(create_flow)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
self.data.state = CreateFlowTaskState::CreateMetadata;
self.data.state = CreateFlowState::CreateMetadata;
Ok(Status::executing(true))
}
/// Creates flow task metadata.
/// Creates flow metadata.
///
/// Abort(not-retry):
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self) -> Result<Status> {
// Safety: The flow task id must be allocated.
let flow_task_id = self.data.flow_task_id.unwrap();
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
self.context
.flow_task_metadata_manager
.create_flow_task_metadata(flow_task_id, (&self.data).into())
.flow_metadata_manager
.create_flow_metadata(flow_id, (&self.data).into())
.await?;
info!("Created flow task metadata for flow {flow_task_id}");
Ok(Status::done_with_output(flow_task_id))
info!("Created flow metadata for flow {flow_id}");
Ok(Status::done_with_output(flow_id))
}
}
@@ -133,14 +133,14 @@ impl Procedure for CreateFlowProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateFlowTaskState::Prepare => self.on_prepare().await,
CreateFlowTaskState::CreateFlows => self.on_flownode_create_flows().await,
CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await,
CreateFlowState::Prepare => self.on_prepare().await,
CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
CreateFlowState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
}
@@ -151,7 +151,7 @@ impl Procedure for CreateFlowProcedure {
fn lock_key(&self) -> LockKey {
let catalog_name = &self.data.task.catalog_name;
let task_name = &self.data.task.task_name;
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
LockKey::new(vec![
@@ -162,14 +162,14 @@ impl Procedure for CreateFlowProcedure {
&sink_table_name.catalog_name,
)
.into(),
FlowTaskNameLock::new(catalog_name, task_name).into(),
FlowNameLock::new(catalog_name, flow_name).into(),
])
}
}
/// The state of [CreateFlowTaskProcedure].
/// The state of [CreateFlowProcedure].
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowTaskState {
pub enum CreateFlowState {
/// Prepares to create the flow.
Prepare,
/// Creates flows on the flownode.
@@ -180,22 +180,22 @@ pub enum CreateFlowTaskState {
/// The serializable data.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowTaskData {
pub struct CreateFlowData {
pub(crate) cluster_id: ClusterId,
pub(crate) state: CreateFlowTaskState,
pub(crate) state: CreateFlowState,
pub(crate) task: CreateFlowTask,
pub(crate) flow_task_id: Option<FlowTaskId>,
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
}
impl From<&CreateFlowTaskData> for CreateRequest {
fn from(value: &CreateFlowTaskData) -> Self {
let flow_task_id = value.flow_task_id.unwrap();
impl From<&CreateFlowData> for CreateRequest {
fn from(value: &CreateFlowData) -> Self {
let flow_id = value.flow_id.unwrap();
let source_table_ids = &value.source_table_ids;
CreateRequest {
task_id: Some(api::v1::flow::TaskId { id: flow_task_id }),
flow_id: Some(api::v1::flow::TaskId { id: flow_id }),
source_table_ids: source_table_ids
.iter()
.map(|table_id| api::v1::TableId { id: *table_id })
@@ -206,21 +206,21 @@ impl From<&CreateFlowTaskData> for CreateRequest {
expire_when: value.task.expire_when.clone(),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
task_options: value.task.options.clone(),
flow_options: value.task.flow_options.clone(),
}
}
}
impl From<&CreateFlowTaskData> for FlowTaskInfoValue {
fn from(value: &CreateFlowTaskData) -> Self {
impl From<&CreateFlowData> for FlowInfoValue {
fn from(value: &CreateFlowData) -> Self {
let CreateFlowTask {
catalog_name,
task_name,
flow_name,
sink_table_name,
expire_when,
comment,
sql,
options,
flow_options: options,
..
} = value.task.clone();
@@ -231,12 +231,12 @@ impl From<&CreateFlowTaskData> for FlowTaskInfoValue {
.map(|(idx, peer)| (idx as u32, peer.id))
.collect::<BTreeMap<_, _>>();
FlowTaskInfoValue {
FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
task_name,
flow_name,
raw_sql: sql,
expire_when,
comment,

View File

@@ -24,20 +24,20 @@ impl CreateFlowProcedure {
/// - The sink table doesn't exist.
pub(crate) async fn check_creation(&self) -> Result<()> {
let catalog_name = &self.data.task.catalog_name;
let task_name = &self.data.task.task_name;
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
// Ensures the task name doesn't exist.
let exists = self
.context
.flow_task_metadata_manager
.flow_task_name_manager()
.exists(catalog_name, task_name)
.flow_metadata_manager
.flow_name_manager()
.exists(catalog_name, flow_name)
.await?;
ensure!(
!exists,
error::TaskAlreadyExistsSnafu {
task_name: format!("{}.{}", catalog_name, task_name),
error::FlowAlreadyExistsSnafu {
flow_name: format!("{}.{}", catalog_name, flow_name),
}
);

View File

@@ -19,16 +19,16 @@ use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
impl CreateFlowProcedure {
/// Allocates the [FlowTaskId].
pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> {
// TODO(weny, ruihang): We don't support the partitions. It's always be 1, now.
/// Allocates the [FlowId].
pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> {
//TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now.
let partitions = 1;
let (flow_task_id, peers) = self
let (flow_id, peers) = self
.context
.flow_task_metadata_allocator
.flow_metadata_allocator
.create(partitions)
.await?;
self.data.flow_task_id = Some(flow_task_id);
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
Ok(())

View File

@@ -17,43 +17,43 @@ use std::sync::Arc;
use tonic::async_trait;
use crate::error::Result;
use crate::key::FlowTaskId;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::sequence::SequenceRef;
/// The reference of [FlowTaskMetadataAllocator].
pub type FlowTaskMetadataAllocatorRef = Arc<FlowTaskMetadataAllocator>;
/// The reference of [FlowMetadataAllocator].
pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
/// [FlowTaskMetadataAllocator] provides the ability of:
/// - [FlowTaskId] Allocation.
/// [FlowMetadataAllocator] provides the ability of:
/// - [FlowId] Allocation.
/// - [FlownodeId] Selection.
#[derive(Clone)]
pub struct FlowTaskMetadataAllocator {
flow_task_id_sequence: SequenceRef,
pub struct FlowMetadataAllocator {
flow_id_sequence: SequenceRef,
partition_peer_allocator: PartitionPeerAllocatorRef,
}
impl FlowTaskMetadataAllocator {
/// Returns the [FlowTaskMetadataAllocator] with [NoopPartitionPeerAllocator].
pub fn with_noop_peer_allocator(flow_task_id_sequence: SequenceRef) -> Self {
impl FlowMetadataAllocator {
/// Returns the [FlowMetadataAllocator] with [NoopPartitionPeerAllocator].
pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
Self {
flow_task_id_sequence,
flow_id_sequence,
partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator),
}
}
/// Allocates a the [FlowTaskId].
pub(crate) async fn allocate_flow_task_id(&self) -> Result<FlowTaskId> {
let flow_task_id = self.flow_task_id_sequence.next().await? as FlowTaskId;
Ok(flow_task_id)
/// Allocates a the [FlowId].
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
let flow_id = self.flow_id_sequence.next().await? as FlowId;
Ok(flow_id)
}
/// Allocates the [FlowTaskId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec<Peer>)> {
let flow_task_id = self.allocate_flow_task_id().await?;
/// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;
Ok((flow_task_id, peers))
Ok((flow_id, peers))
}
}

View File

@@ -616,7 +616,7 @@ async fn handle_create_flow_task(
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.task_name,
create_flow_task.catalog_name, create_flow_task.flow_name,
);
Ok(SubmitDdlTaskResponse {
@@ -756,11 +756,11 @@ mod tests {
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::flow_meta::FlowMetadataAllocator;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::task_meta::FlowTaskMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::DdlContext;
use crate::key::flow_task::FlowTaskMetadataManager;
use crate::key::flow::FlowMetadataManager;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
@@ -792,11 +792,10 @@ mod tests {
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
));
let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let flow_task_metadata_allocator =
Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator(
Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
));
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));
@@ -807,8 +806,8 @@ mod tests {
cache_invalidator: Arc::new(DummyCacheInvalidator),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
},
procedure_manager.clone(),

View File

@@ -241,9 +241,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Task already exists: {}", task_name))]
TaskAlreadyExists {
task_name: String,
#[snafu(display("Flow already exists: {}", flow_name))]
FlowAlreadyExists {
flow_name: String,
location: Location,
},
@@ -511,7 +511,7 @@ impl ErrorExt for Error {
| InvalidEngineType { .. }
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
| TaskAlreadyExists { .. }
| FlowAlreadyExists { .. }
| MismatchPrefix { .. }
| DelimiterNotFound { .. } => StatusCode::InvalidArguments,

View File

@@ -36,16 +36,16 @@
//! - The value is a [TableNameValue] struct; it contains the table id.
//! - Used in the table name to table id lookup.
//!
//! 6. Flow task info key: `__flow_task/{catalog}/info/{flow_task_id}`
//! - Stores metadata of the flow task.
//! 6. Flow info key: `__flow/{catalog}/info/{flow_id}`
//! - Stores metadata of the flow.
//!
//! 7. Flow task name key: `__flow_task/{catalog}/name/{task_name}`
//! - Mapping {catalog}/{task_name} to {flow_task_id}
//! 7. Flow name key: `__flow/{catalog}/name/{flow_name}`
//! - Mapping {catalog}/{flow_name} to {flow_id}
//!
//! 8. Flownode task key: `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}`
//! - Mapping {flownode_id} to {flow_task_id}
//! 8. Flownode flow key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping {flownode_id} to {flow_id}
//!
//! 9. Table task key: `__table_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`
//! 9. Table flow key: `__table_flow/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//!
@@ -54,37 +54,35 @@
//!
//! To simplify the managers used in struct fields and function parameters, we define "unify"
//! table metadata manager: [TableMetadataManager]
//! and flow task metadata manager: [FlowTaskMetadataManager](crate::key::flow_task::FlowTaskMetadataManager).
//! and flow metadata manager: [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! It contains all the managers defined above. It's recommended to just use this manager only.
//!
//! The whole picture of flow task keys will be like this:
//! The whole picture of flow keys will be like this:
//!
//! __flow_task/
//! __flow/
//! {catalog}/
//! info/
//! {tsak_id}
//!
//! name/
//! {task_name}
//! {flow_name}
//!
//! flownode/
//! flownode_id/
//! {flownode_id}/
//! {task_id}/
//! {partition_id}
//! {flownode_id}/
//! {flow_id}/
//! {partition_id}
//!
//! source_table/
//! flow_task/
//! {table_id}/
//! {flownode_id}/
//! {task_id}/
//! {partition_id}
//! {table_id}/
//! {flownode_id}/
//! {flow_id}/
//! {partition_id}
pub mod catalog_name;
pub mod datanode_table;
/// TODO(weny):removes id.
#[allow(unused)]
pub mod flow_task;
pub mod flow;
pub mod schema_name;
pub mod scope;
pub mod table_info;
@@ -123,8 +121,8 @@ use table_name::{TableNameKey, TableNameManager, TableNameValue};
use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
use self::datanode_table::RegionInfo;
use self::flow_task::flow_task_info::FlowTaskInfoValue;
use self::flow_task::flow_task_name::FlowTaskNameValue;
use self::flow::flow_info::FlowInfoValue;
use self::flow::flow_name::FlowNameValue;
use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use self::tombstone::TombstoneManager;
@@ -159,10 +157,10 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
/// The id of flow task.
pub type FlowTaskId = u32;
/// The partition of flow task.
pub type FlowTaskPartitionId = u32;
/// The id of flow.
pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
lazy_static! {
static ref DATANODE_TABLE_KEY_PATTERN: Regex =
@@ -1054,8 +1052,8 @@ impl_table_meta_value! {
TableNameValue,
TableInfoValue,
DatanodeTableValue,
FlowTaskInfoValue,
FlowTaskNameValue
FlowInfoValue,
FlowNameValue
}
impl_optional_meta_value! {

View File

@@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod flow_task_info;
pub(crate) mod flow_task_name;
pub(crate) mod flownode_task;
pub(crate) mod table_task;
pub(crate) mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
use std::ops::Deref;
use std::sync::Arc;
@@ -23,26 +23,26 @@ use std::sync::Arc;
use common_telemetry::info;
use snafu::{ensure, OptionExt};
use self::flow_task_info::FlowTaskInfoValue;
use self::flow_info::FlowInfoValue;
use crate::ensure_values;
use crate::error::{self, Result};
use crate::key::flow_task::flow_task_info::FlowTaskInfoManager;
use crate::key::flow_task::flow_task_name::FlowTaskNameManager;
use crate::key::flow_task::flownode_task::FlownodeTaskManager;
use crate::key::flow_task::table_task::TableTaskManager;
use crate::key::flow::flow_info::FlowInfoManager;
use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
use crate::key::flow::table_flow::TableFlowManager;
use crate::key::scope::MetaKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::FlowTaskId;
use crate::key::FlowId;
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
/// The key of `__flow_task/` scope.
/// The key of `__flow/` scope.
#[derive(Debug, PartialEq)]
pub struct FlowTaskScoped<T> {
pub struct FlowScoped<T> {
inner: T,
}
impl<T> Deref for FlowTaskScoped<T> {
impl<T> Deref for FlowScoped<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
@@ -50,18 +50,18 @@ impl<T> Deref for FlowTaskScoped<T> {
}
}
impl<T> FlowTaskScoped<T> {
const PREFIX: &'static str = "__flow_task/";
impl<T> FlowScoped<T> {
const PREFIX: &'static str = "__flow/";
/// Returns a new [FlowTaskScoped] key.
pub fn new(inner: T) -> FlowTaskScoped<T> {
/// Returns a new [FlowScoped] key.
pub fn new(inner: T) -> FlowScoped<T> {
Self { inner }
}
}
impl<T: MetaKey<T>> MetaKey<FlowTaskScoped<T>> for FlowTaskScoped<T> {
impl<T: MetaKey<T>> MetaKey<FlowScoped<T>> for FlowScoped<T> {
fn to_bytes(&self) -> Vec<u8> {
let prefix = FlowTaskScoped::<T>::PREFIX.as_bytes();
let prefix = FlowScoped::<T>::PREFIX.as_bytes();
let inner = self.inner.to_bytes();
let mut bytes = Vec::with_capacity(prefix.len() + inner.len());
bytes.extend(prefix);
@@ -69,8 +69,8 @@ impl<T: MetaKey<T>> MetaKey<FlowTaskScoped<T>> for FlowTaskScoped<T> {
bytes
}
fn from_bytes(bytes: &[u8]) -> Result<FlowTaskScoped<T>> {
let prefix = FlowTaskScoped::<T>::PREFIX.as_bytes();
fn from_bytes(bytes: &[u8]) -> Result<FlowScoped<T>> {
let prefix = FlowScoped::<T>::PREFIX.as_bytes();
ensure!(
bytes.starts_with(prefix),
error::MismatchPrefixSnafu {
@@ -79,140 +79,134 @@ impl<T: MetaKey<T>> MetaKey<FlowTaskScoped<T>> for FlowTaskScoped<T> {
}
);
let inner = T::from_bytes(&bytes[prefix.len()..])?;
Ok(FlowTaskScoped { inner })
Ok(FlowScoped { inner })
}
}
pub type FlowTaskMetadataManagerRef = Arc<FlowTaskMetadataManager>;
pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;
/// The manager of metadata, provides ability to:
/// - Create metadata of the task.
/// - Retrieve metadata of the task.
/// - Delete metadata of the task.
pub struct FlowTaskMetadataManager {
flow_task_info_manager: FlowTaskInfoManager,
flownode_task_manager: FlownodeTaskManager,
table_task_manager: TableTaskManager,
flow_task_name_manager: FlowTaskNameManager,
/// - Create metadata of the flow.
/// - Retrieve metadata of the flow.
/// - Delete metadata of the flow.
pub struct FlowMetadataManager {
flow_info_manager: FlowInfoManager,
flownode_flow_manager: FlownodeFlowManager,
table_flow_manager: TableFlowManager,
flow_name_manager: FlowNameManager,
kv_backend: KvBackendRef,
}
impl FlowTaskMetadataManager {
/// Returns a new [FlowTaskMetadataManager].
impl FlowMetadataManager {
/// Returns a new [FlowMetadataManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
flow_task_info_manager: FlowTaskInfoManager::new(kv_backend.clone()),
flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()),
flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()),
table_task_manager: TableTaskManager::new(kv_backend.clone()),
flow_info_manager: FlowInfoManager::new(kv_backend.clone()),
flow_name_manager: FlowNameManager::new(kv_backend.clone()),
flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
table_flow_manager: TableFlowManager::new(kv_backend.clone()),
kv_backend,
}
}
/// Returns the [FlowTaskNameManager].
pub fn flow_task_name_manager(&self) -> &FlowTaskNameManager {
&self.flow_task_name_manager
/// Returns the [FlowNameManager].
pub fn flow_name_manager(&self) -> &FlowNameManager {
&self.flow_name_manager
}
/// Returns the [FlowTaskInfoManager].
pub fn flow_task_info_manager(&self) -> &FlowTaskInfoManager {
&self.flow_task_info_manager
/// Returns the [FlowManager].
pub fn flow_info_manager(&self) -> &FlowInfoManager {
&self.flow_info_manager
}
/// Returns the [FlownodeTaskManager].
pub fn flownode_task_manager(&self) -> &FlownodeTaskManager {
&self.flownode_task_manager
/// Returns the [FlownodeFlowManager].
pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager {
&self.flownode_flow_manager
}
/// Returns the [TableTaskManager].
pub fn table_task_manager(&self) -> &TableTaskManager {
&self.table_task_manager
/// Returns the [TableFlowManager].
pub fn table_flow_manager(&self) -> &TableFlowManager {
&self.table_flow_manager
}
/// Creates metadata for task and returns an error if different metadata exists.
pub async fn create_flow_task_metadata(
/// Creates metadata for flow and returns an error if different metadata exists.
pub async fn create_flow_metadata(
&self,
flow_task_id: FlowTaskId,
flow_task_value: FlowTaskInfoValue,
flow_id: FlowId,
flow_value: FlowInfoValue,
) -> Result<()> {
let (create_flow_task_name_txn, on_create_flow_task_name_failure) =
self.flow_task_name_manager.build_create_txn(
&flow_task_value.catalog_name,
&flow_task_value.task_name,
flow_task_id,
)?;
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
.flow_name_manager
.build_create_txn(&flow_value.catalog_name, &flow_value.flow_name, flow_id)?;
let (create_flow_task_txn, on_create_flow_task_failure) =
self.flow_task_info_manager.build_create_txn(
&flow_task_value.catalog_name,
flow_task_id,
&flow_task_value,
)?;
let (create_flow_txn, on_create_flow_failure) = self.flow_info_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
&flow_value,
)?;
let create_flownode_task_txn = self.flownode_task_manager.build_create_txn(
&flow_task_value.catalog_name,
flow_task_id,
flow_task_value.flownode_ids().clone(),
let create_flownode_flow_txn = self.flownode_flow_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
flow_value.flownode_ids().clone(),
);
let create_table_task_txn = self.table_task_manager.build_create_txn(
&flow_task_value.catalog_name,
flow_task_id,
flow_task_value.flownode_ids().clone(),
flow_task_value.source_table_ids(),
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
flow_value.flownode_ids().clone(),
flow_value.source_table_ids(),
);
let txn = Txn::merge_all(vec![
create_flow_task_name_txn,
create_flow_task_txn,
create_flownode_task_txn,
create_table_task_txn,
create_flow_flow_name_txn,
create_flow_txn,
create_flownode_flow_txn,
create_table_flow_txn,
]);
info!(
"Creating flow task {}.{}({}), with {} txn operations",
flow_task_value.catalog_name,
flow_task_value.task_name,
flow_task_id,
"Creating flow {}.{}({}), with {} txn operations",
flow_value.catalog_name,
flow_value.flow_name,
flow_id,
txn.max_operations()
);
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_flow_task_name = on_create_flow_task_name_failure(&mut set)?
.with_context(||error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow task name during the creating flow task, flow_task_id: {flow_task_id}"
let remote_flow_flow_name =
on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow name during the creating flow, flow_id: {flow_id}"
),
}
})?;
if remote_flow_task_name.flow_task_id() != flow_task_id {
if remote_flow_flow_name.flow_id() != flow_id {
info!(
"Trying to create flow task {}.{}({}), but flow task({}) already exists",
flow_task_value.catalog_name,
flow_task_value.task_name,
flow_task_id,
remote_flow_task_name.flow_task_id()
"Trying to create flow {}.{}({}), but flow({}) already exists",
flow_value.catalog_name,
flow_value.flow_name,
flow_id,
remote_flow_flow_name.flow_id()
);
return error::TaskAlreadyExistsSnafu {
task_name: format!(
"{}.{}",
flow_task_value.catalog_name, flow_task_value.task_name
),
return error::FlowAlreadyExistsSnafu {
flow_name: format!("{}.{}", flow_value.catalog_name, flow_value.flow_name),
}
.fail();
}
let remote_flow_task = on_create_flow_task_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
let remote_flow =
on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow task during the creating flow task, flow_task_id: {flow_task_id}"
),
}
})?;
let op_name = "creating flow task";
ensure_values!(*remote_flow_task, flow_task_value, op_name);
"Reads the empty flow during the creating flow, flow_id: {flow_id}"
),
})?;
let op_name = "creating flow";
ensure_values!(*remote_flow, flow_value, op_name);
}
Ok(())
@@ -227,7 +221,7 @@ mod tests {
use futures::TryStreamExt;
use super::*;
use crate::key::flow_task::table_task::TableTaskKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::scope::CatalogScoped;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::table_name::TableName;
@@ -251,19 +245,19 @@ mod tests {
#[test]
fn test_flow_scoped_to_bytes() {
let key = FlowTaskScoped::new(CatalogScoped::new(
let key = FlowScoped::new(CatalogScoped::new(
"my_catalog".to_string(),
MockKey {
inner: b"hi".to_vec(),
},
));
assert_eq!(b"__flow_task/my_catalog/hi".to_vec(), key.to_bytes());
assert_eq!(b"__flow/my_catalog/hi".to_vec(), key.to_bytes());
}
#[test]
fn test_flow_scoped_from_bytes() {
let bytes = b"__flow_task/my_catalog/hi";
let key = FlowTaskScoped::<CatalogScoped<MockKey>>::from_bytes(bytes).unwrap();
let bytes = b"__flow/my_catalog/hi";
let key = FlowScoped::<CatalogScoped<MockKey>>::from_bytes(bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.inner.inner, b"hi".to_vec());
}
@@ -271,24 +265,24 @@ mod tests {
#[test]
fn test_flow_scoped_from_bytes_mismatch() {
let bytes = b"__table/my_catalog/hi";
let err = FlowTaskScoped::<CatalogScoped<MockKey>>::from_bytes(bytes).unwrap_err();
let err = FlowScoped::<CatalogScoped<MockKey>>::from_bytes(bytes).unwrap_err();
assert_matches!(err, error::Error::MismatchPrefix { .. });
}
#[tokio::test]
async fn test_create_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone());
let task_id = 10;
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "sink_table".to_string(),
};
let flow_task_value = FlowTaskInfoValue {
let flow_value = FlowInfoValue {
catalog_name: catalog_name.to_string(),
task_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name,
flownode_ids: [(0, 1u64)].into(),
@@ -298,42 +292,42 @@ mod tests {
options: Default::default(),
};
flow_metadata_manager
.create_flow_task_metadata(task_id, flow_task_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
// Creates again.
flow_metadata_manager
.create_flow_task_metadata(task_id, flow_task_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
let got = flow_metadata_manager
.flow_task_info_manager()
.get(catalog_name, task_id)
.flow_info_manager()
.get(catalog_name, flow_id)
.await
.unwrap()
.unwrap();
assert_eq!(got, flow_task_value);
let tasks = flow_metadata_manager
.flownode_task_manager()
.tasks(catalog_name, 1)
assert_eq!(got, flow_value);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(catalog_name, 1)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(tasks, vec![(task_id, 0)]);
assert_eq!(flows, vec![(flow_id, 0)]);
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_task_manager()
.table_flow_manager()
.nodes(catalog_name, table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
nodes,
vec![TableTaskKey::new(
vec![TableFlowKey::new(
catalog_name.to_string(),
table_id,
1,
task_id,
flow_id,
0
)]
);
@@ -341,19 +335,19 @@ mod tests {
}
#[tokio::test]
async fn test_create_table_metadata_task_exists_err() {
async fn test_create_table_metadata_flow_exists_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv);
let task_id = 10;
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "sink_table".to_string(),
};
let flow_task_value = FlowTaskInfoValue {
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
task_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: sink_table_name.clone(),
flownode_ids: [(0, 1u64)].into(),
@@ -363,13 +357,13 @@ mod tests {
options: Default::default(),
};
flow_metadata_manager
.create_flow_task_metadata(task_id, flow_task_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
// Creates again.
let flow_task_value = FlowTaskInfoValue {
let flow_value = FlowInfoValue {
catalog_name: catalog_name.to_string(),
task_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name,
flownode_ids: [(0, 1u64)].into(),
@@ -379,26 +373,26 @@ mod tests {
options: Default::default(),
};
let err = flow_metadata_manager
.create_flow_task_metadata(task_id + 1, flow_task_value)
.create_flow_metadata(flow_id + 1, flow_value)
.await
.unwrap_err();
assert_matches!(err, error::Error::TaskAlreadyExists { .. });
assert_matches!(err, error::Error::FlowAlreadyExists { .. });
}
#[tokio::test]
async fn test_create_table_metadata_unexpected_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv);
let task_id = 10;
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "sink_table".to_string(),
};
let flow_task_value = FlowTaskInfoValue {
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
task_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: sink_table_name.clone(),
flownode_ids: [(0, 1u64)].into(),
@@ -408,7 +402,7 @@ mod tests {
options: Default::default(),
};
flow_metadata_manager
.create_flow_task_metadata(task_id, flow_task_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
// Creates again.
@@ -417,9 +411,9 @@ mod tests {
schema_name: "my_schema".to_string(),
table_name: "another_sink_table".to_string(),
};
let flow_task_value = FlowTaskInfoValue {
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
task_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
@@ -429,7 +423,7 @@ mod tests {
options: Default::default(),
};
let err = flow_metadata_manager
.create_flow_task_metadata(task_id, flow_task_value)
.create_flow_metadata(flow_id, flow_value)
.await
.unwrap_err();
assert!(err.to_string().contains("Reads the different value"));

View File

@@ -0,0 +1,212 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, TableMetaValue};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::table_name::TableName;
use crate::FlownodeId;
const FLOW_INFO_KEY_PREFIX: &str = "info";
lazy_static! {
static ref FLOW_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}
/// The key stores the metadata of the flow.
///
/// The layout: `__flow/{catalog}/info/{flow_id}`.
pub struct FlowInfoKey(FlowScoped<CatalogScoped<FlowInfoKeyInner>>);
impl MetaKey<FlowInfoKey> for FlowInfoKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowInfoKey> {
Ok(FlowInfoKey(
FlowScoped::<CatalogScoped<FlowInfoKeyInner>>::from_bytes(bytes)?,
))
}
}
impl FlowInfoKey {
/// Returns the [FlowInfoKey].
pub fn new(catalog: String, flow_id: FlowId) -> FlowInfoKey {
let inner = FlowInfoKeyInner::new(flow_id);
FlowInfoKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
}
/// Returns the [FlowId].
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}
}
/// The key of flow metadata.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowInfoKeyInner {
flow_id: FlowId,
}
impl FlowInfoKeyInner {
/// Returns a [FlowInfoKey] with the specified `flow_id`.
pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
FlowInfoKeyInner { flow_id }
}
}
impl MetaKey<FlowInfoKeyInner> for FlowInfoKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowInfoKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOW_INFO_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flow_id = captures[1].parse::<FlowId>().unwrap();
Ok(FlowInfoKeyInner { flow_id })
}
}
// The metadata of the flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
/// The source tables used by the flow.
pub(crate) source_table_ids: Vec<TableId>,
/// The sink table used by the flow.
pub(crate) sink_table_name: TableName,
/// Which flow nodes this flow is running on.
pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
/// The catalog name.
pub(crate) catalog_name: String,
/// The flow name.
pub(crate) flow_name: String,
/// The raw sql.
pub(crate) raw_sql: String,
/// The expr of expire.
pub(crate) expire_when: String,
/// The comment.
pub(crate) comment: String,
/// The options.
pub(crate) options: HashMap<String, String>,
}
impl FlowInfoValue {
/// Returns the `flownode_id`.
pub fn flownode_ids(&self) -> &BTreeMap<FlowPartitionId, FlownodeId> {
&self.flownode_ids
}
/// Returns the `source_table`.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
}
}
/// The manager of [FlowInfoKey].
pub struct FlowInfoManager {
kv_backend: KvBackendRef,
}
impl FlowInfoManager {
/// Returns a new [FlowInfoManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Returns the [FlowInfoValue] of specified `flow_id`.
pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes();
self.kv_backend
.get(&key)
.await?
.map(|x| FlowInfoValue::try_from_raw_value(&x.value))
.transpose()
}
/// Builds a create flow transaction.
/// It is expected that the `__flow/{catalog}/info/{flow_id}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
pub(crate) fn build_create_txn(
&self,
catalog: &str,
flow_id: FlowId,
flow_value: &FlowInfoValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>,
)> {
let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes();
let txn = txn_helper::build_put_if_absent_txn(key.clone(), flow_value.try_as_raw_value()?);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_key_serialization() {
let flow_info = FlowInfoKey::new("my_catalog".to_string(), 2);
assert_eq!(b"__flow/my_catalog/info/2".to_vec(), flow_info.to_bytes());
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow/my_catalog/info/2".to_vec();
let key = FlowInfoKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_id(), 2);
}
}

View File

@@ -0,0 +1,203 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, TableMetaValue, NAME_PATTERN};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
const FLOW_NAME_KEY_PREFIX: &str = "name";
lazy_static! {
static ref FLOW_NAME_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap();
}
/// The key of mapping {flow_name} to [FlowId].
///
/// The layout: `__flow/{catalog}/name/{flow_name}`.
pub struct FlowNameKey(FlowScoped<CatalogScoped<FlowNameKeyInner>>);
impl FlowNameKey {
/// Returns the [FlowNameKey]
pub fn new(catalog: String, flow_name: String) -> FlowNameKey {
let inner = FlowNameKeyInner::new(flow_name);
FlowNameKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
}
/// Return the `flow_name`
pub fn flow_name(&self) -> &str {
&self.0.flow_name
}
}
impl MetaKey<FlowNameKey> for FlowNameKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowNameKey> {
Ok(FlowNameKey(
FlowScoped::<CatalogScoped<FlowNameKeyInner>>::from_bytes(bytes)?,
))
}
}
/// The key of mapping name to [FlowId]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowNameKeyInner {
pub flow_name: String,
}
impl MetaKey<FlowNameKeyInner> for FlowNameKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!("{FLOW_NAME_KEY_PREFIX}/{}", self.flow_name).into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowNameKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlowNameKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOW_NAME_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flow_name = captures[1].to_string();
Ok(FlowNameKeyInner { flow_name })
}
}
impl FlowNameKeyInner {
/// Returns a [FlowNameKeyInner].
pub fn new(flow_name: String) -> Self {
Self { flow_name }
}
}
/// The value of [FlowNameKey].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlowNameValue {
flow_id: FlowId,
}
impl FlowNameValue {
/// Returns a [FlowNameValue] with specified [FlowId].
pub fn new(flow_id: FlowId) -> Self {
Self { flow_id }
}
/// Returns the [FlowId]
pub fn flow_id(&self) -> FlowId {
self.flow_id
}
}
/// The manager of [FlowNameKey].
pub struct FlowNameManager {
kv_backend: KvBackendRef,
}
impl FlowNameManager {
/// Returns a new [FlowNameManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Returns the [FlowNameValue] of specified `catalog.flow`.
pub async fn get(&self, catalog: &str, flow: &str) -> Result<Option<FlowNameValue>> {
let key = FlowNameKey::new(catalog.to_string(), flow.to_string());
let raw_key = key.to_bytes();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| FlowNameValue::try_from_raw_value(&x.value))
.transpose()
}
/// Returns true if the `flow` exists.
pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
let key = FlowNameKey::new(catalog.to_string(), flow.to_string());
let raw_key = key.to_bytes();
self.kv_backend.exists(&raw_key).await
}
/// Builds a create flow name transaction.
/// It's expected that the `__flow/{catalog}/name/{flow_name}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
pub fn build_create_txn(
&self,
catalog: &str,
name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowNameValue>>>,
)> {
let key = FlowNameKey::new(catalog.to_string(), name.to_string());
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
flow_flow_name_value.try_as_raw_value()?,
);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_key_serialization() {
let key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string());
assert_eq!(b"__flow/my_catalog/name/my_task".to_vec(), key.to_bytes(),);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow/my_catalog/name/my_task".to_vec();
let key = FlowNameKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_name(), "my_task");
}
}

View File

@@ -0,0 +1,251 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::FlownodeId;
lazy_static! {
static ref FLOWNODE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
"^{FLOWNODE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
))
.unwrap();
}
const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode";
/// The key of mapping [FlownodeId] to [FlowId].
///
/// The layout `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`
pub struct FlownodeFlowKey(FlowScoped<CatalogScoped<FlownodeFlowKeyInner>>);
impl MetaKey<FlownodeFlowKey> for FlownodeFlowKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlownodeFlowKey> {
Ok(FlownodeFlowKey(FlowScoped::<
CatalogScoped<FlownodeFlowKeyInner>,
>::from_bytes(bytes)?))
}
}
impl FlownodeFlowKey {
/// Returns a new [FlownodeFlowKey].
pub fn new(
catalog: String,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> FlownodeFlowKey {
let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
FlownodeFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
}
/// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`.
pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec<u8> {
let catalog_scoped_key = CatalogScoped::new(
catalog,
BytesAdapter::from(FlownodeFlowKeyInner::range_start_key(flownode_id).into_bytes()),
);
FlowScoped::new(catalog_scoped_key).to_bytes()
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
}
/// Returns the [FlowId].
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}
/// Returns the [FlownodeId].
pub fn flownode_id(&self) -> FlownodeId {
self.0.flownode_id
}
/// Returns the [PartitionId].
pub fn partition_id(&self) -> FlowPartitionId {
self.0.partition_id
}
}
/// The key of mapping [FlownodeId] to [FlowId].
pub struct FlownodeFlowKeyInner {
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
}
impl FlownodeFlowKeyInner {
/// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`.
pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
Self {
flownode_id,
flow_id,
partition_id,
}
}
fn prefix(flownode_id: FlownodeId) -> String {
format!("{}/{flownode_id}", FLOWNODE_FLOW_KEY_PREFIX)
}
/// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`.
fn range_start_key(flownode_id: FlownodeId) -> String {
format!("{}/", Self::prefix(flownode_id))
}
}
impl MetaKey<FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!(
"{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}",
self.flownode_id, self.flow_id, self.partition_id,
)
.into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlownodeFlowKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlownodeFlowKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOWNODE_FLOW_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlownodeFlowKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
let flow_id = captures[2].parse::<FlowId>().unwrap();
let partition_id = captures[3].parse::<FlowPartitionId>().unwrap();
Ok(FlownodeFlowKeyInner {
flownode_id,
flow_id,
partition_id,
})
}
}
/// The manager of [FlownodeFlowKey].
pub struct FlownodeFlowManager {
kv_backend: KvBackendRef,
}
/// Decodes `KeyValue` to [FlownodeFlowKey].
pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result<FlownodeFlowKey> {
FlownodeFlowKey::from_bytes(&kv.key)
}
impl FlownodeFlowManager {
/// Returns a new [FlownodeFlowManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
pub fn flows(
&self,
catalog: &str,
flownode_id: FlownodeId,
) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
let start_key = FlownodeFlowKey::range_start_key(catalog.to_string(), flownode_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(flownode_flow_key_decoder),
);
Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
}
/// Builds a create flownode flow transaction.
///
/// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_id: FlowId,
flownode_ids: I,
) -> Txn {
let txns = flownode_ids
.into_iter()
.map(|(partition_id, flownode_id)| {
let key =
FlownodeFlowKey::new(catalog.to_string(), flownode_id, flow_id, partition_id)
.to_bytes();
TxnOp::Put(key, vec![])
})
.collect::<Vec<_>>();
Txn::new().and_then(txns)
}
}
#[cfg(test)]
mod tests {
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::scope::MetaKey;
#[test]
fn test_key_serialization() {
let flownode_flow = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0);
assert_eq!(
b"__flow/my_catalog/flownode/1/2/0".to_vec(),
flownode_flow.to_bytes()
);
let prefix = FlownodeFlowKey::range_start_key("my_catalog".to_string(), 1);
assert_eq!(b"__flow/my_catalog/flownode/1/".to_vec(), prefix);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow/my_catalog/flownode/1/2/0".to_vec();
let key = FlownodeFlowKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flownode_id(), 1);
assert_eq!(key.flow_id(), 2);
assert_eq!(key.partition_id(), 0);
}
}

View File

@@ -21,9 +21,9 @@ use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::flow::FlowScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowTaskId, FlowTaskPartitionId};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
@@ -31,63 +31,63 @@ use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::FlownodeId;
const TABLE_TASK_KEY_PREFIX: &str = "source_table";
const TABLE_FLOW_KEY_PREFIX: &str = "source_table";
lazy_static! {
static ref TABLE_TASK_KEY_PATTERN: Regex = Regex::new(&format!(
"^{TABLE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
static ref TABLE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
"^{TABLE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
))
.unwrap();
}
/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId].
/// The key of mapping [TableId] to [FlownodeId] and [FlowId].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TableTaskKeyInner {
struct TableFlowKeyInner {
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
flow_id: FlowId,
partition_id: FlowPartitionId,
}
/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId].
/// The key of mapping [TableId] to [FlownodeId] and [FlowId].
///
/// The layout: `__flow_task/{catalog}/table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`.
/// The layout: `__flow/{catalog}/table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`.
#[derive(Debug, PartialEq)]
pub struct TableTaskKey(FlowTaskScoped<CatalogScoped<TableTaskKeyInner>>);
pub struct TableFlowKey(FlowScoped<CatalogScoped<TableFlowKeyInner>>);
impl MetaKey<TableTaskKey> for TableTaskKey {
impl MetaKey<TableFlowKey> for TableFlowKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<TableTaskKey> {
Ok(TableTaskKey(FlowTaskScoped::<
CatalogScoped<TableTaskKeyInner>,
>::from_bytes(bytes)?))
fn from_bytes(bytes: &[u8]) -> Result<TableFlowKey> {
Ok(TableFlowKey(
FlowScoped::<CatalogScoped<TableFlowKeyInner>>::from_bytes(bytes)?,
))
}
}
impl TableTaskKey {
/// Returns a new [TableTaskKey].
impl TableFlowKey {
/// Returns a new [TableFlowKey].
pub fn new(
catalog: String,
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
) -> TableTaskKey {
let inner = TableTaskKeyInner::new(table_id, flownode_id, flow_task_id, partition_id);
TableTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner)))
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> TableFlowKey {
let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
TableFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
}
/// The prefix used to retrieve all [TableTaskKey]s with the specified `table_id`.
/// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`.
pub fn range_start_key(catalog: String, table_id: TableId) -> Vec<u8> {
let catalog_scoped_key = CatalogScoped::new(
catalog,
BytesAdapter::from(TableTaskKeyInner::range_start_key(table_id).into_bytes()),
BytesAdapter::from(TableFlowKeyInner::range_start_key(table_id).into_bytes()),
);
FlowTaskScoped::new(catalog_scoped_key).to_bytes()
FlowScoped::new(catalog_scoped_key).to_bytes()
}
/// Returns the catalog.
@@ -100,9 +100,9 @@ impl TableTaskKey {
self.0.table_id
}
/// Returns the [FlowTaskId].
pub fn flow_task_id(&self) -> FlowTaskId {
self.0.flow_task_id
/// Returns the [FlowId].
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}
/// Returns the [FlownodeId].
@@ -111,117 +111,117 @@ impl TableTaskKey {
}
/// Returns the [PartitionId].
pub fn partition_id(&self) -> FlowTaskPartitionId {
pub fn partition_id(&self) -> FlowPartitionId {
self.0.partition_id
}
}
impl TableTaskKeyInner {
/// Returns a new [TableTaskKey].
impl TableFlowKeyInner {
/// Returns a new [TableFlowKey].
fn new(
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
) -> TableTaskKeyInner {
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> TableFlowKeyInner {
Self {
table_id,
flownode_id,
flow_task_id,
flow_id,
partition_id,
}
}
fn prefix(table_id: TableId) -> String {
format!("{}/{table_id}", TABLE_TASK_KEY_PREFIX)
format!("{}/{table_id}", TABLE_FLOW_KEY_PREFIX)
}
/// The prefix used to retrieve all [TableTaskKey]s with the specified `table_id`.
/// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`.
fn range_start_key(table_id: TableId) -> String {
format!("{}/", Self::prefix(table_id))
}
}
impl MetaKey<TableTaskKeyInner> for TableTaskKeyInner {
impl MetaKey<TableFlowKeyInner> for TableFlowKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!(
"{TABLE_TASK_KEY_PREFIX}/{}/{}/{}/{}",
self.table_id, self.flownode_id, self.flow_task_id, self.partition_id
"{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}",
self.table_id, self.flownode_id, self.flow_id, self.partition_id
)
.into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<TableTaskKeyInner> {
fn from_bytes(bytes: &[u8]) -> Result<TableFlowKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"TableTaskKeyInner '{}' is not a valid UTF8 string: {e}",
"TableFlowKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
TABLE_TASK_KEY_PATTERN
TABLE_FLOW_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid TableTaskKeyInner '{key}'"),
err_msg: format!("Invalid TableFlowKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let table_id = captures[1].parse::<TableId>().unwrap();
let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
let flow_task_id = captures[3].parse::<FlowTaskId>().unwrap();
let partition_id = captures[4].parse::<FlowTaskPartitionId>().unwrap();
Ok(TableTaskKeyInner::new(
let flow_id = captures[3].parse::<FlowId>().unwrap();
let partition_id = captures[4].parse::<FlowPartitionId>().unwrap();
Ok(TableFlowKeyInner::new(
table_id,
flownode_id,
flow_task_id,
flow_id,
partition_id,
))
}
}
/// Decodes `KeyValue` to [TableTaskKey].
pub fn table_task_decoder(kv: KeyValue) -> Result<TableTaskKey> {
TableTaskKey::from_bytes(&kv.key)
/// Decodes `KeyValue` to [TableFlowKey].
pub fn table_flow_decoder(kv: KeyValue) -> Result<TableFlowKey> {
TableFlowKey::from_bytes(&kv.key)
}
/// The manager of [TableTaskKey].
pub struct TableTaskManager {
/// The manager of [TableFlowKey].
pub struct TableFlowManager {
kv_backend: KvBackendRef,
}
impl TableTaskManager {
/// Returns a new [TableTaskManager].
impl TableFlowManager {
/// Returns a new [TableFlowManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Retrieves all [TableTaskKey]s of the specified `table_id`.
/// Retrieves all [TableFlowKey]s of the specified `table_id`.
pub fn nodes(
&self,
catalog: &str,
table_id: TableId,
) -> BoxStream<'static, Result<TableTaskKey>> {
let start_key = TableTaskKey::range_start_key(catalog.to_string(), table_id);
) -> BoxStream<'static, Result<TableFlowKey>> {
let start_key = TableFlowKey::range_start_key(catalog.to_string(), table_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(table_task_decoder),
Arc::new(table_flow_decoder),
);
Box::pin(stream)
}
/// Builds a create table task transaction.
/// Builds a create table flow transaction.
///
/// Puts `__table_task/{table_id}/{node_id}/{partition_id}` keys.
pub fn build_create_txn<I: IntoIterator<Item = (FlowTaskPartitionId, FlownodeId)>>(
/// Puts `__table_flow/{table_id}/{node_id}/{partition_id}` keys.
pub fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_task_id: FlowTaskId,
flow_id: FlowId,
flownode_ids: I,
source_table_ids: &[TableId],
) -> Txn {
@@ -230,11 +230,11 @@ impl TableTaskManager {
.flat_map(|(partition_id, flownode_id)| {
source_table_ids.iter().map(move |table_id| {
TxnOp::Put(
TableTaskKey::new(
TableFlowKey::new(
catalog.to_string(),
*table_id,
flownode_id,
flow_task_id,
flow_id,
partition_id,
)
.to_bytes(),
@@ -254,26 +254,23 @@ mod tests {
#[test]
fn test_key_serialization() {
let table_task_key = TableTaskKey::new("my_catalog".to_string(), 1024, 1, 2, 0);
let table_flow_key = TableFlowKey::new("my_catalog".to_string(), 1024, 1, 2, 0);
assert_eq!(
b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec(),
table_task_key.to_bytes(),
);
let prefix = TableTaskKey::range_start_key("my_catalog".to_string(), 1024);
assert_eq!(
b"__flow_task/my_catalog/source_table/1024/".to_vec(),
prefix
b"__flow/my_catalog/source_table/1024/1/2/0".to_vec(),
table_flow_key.to_bytes(),
);
let prefix = TableFlowKey::range_start_key("my_catalog".to_string(), 1024);
assert_eq!(b"__flow/my_catalog/source_table/1024/".to_vec(), prefix);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec();
let key = TableTaskKey::from_bytes(&bytes).unwrap();
let bytes = b"__flow/my_catalog/source_table/1024/1/2/0".to_vec();
let key = TableFlowKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.source_table_id(), 1024);
assert_eq!(key.flownode_id(), 1);
assert_eq!(key.flow_task_id(), 2);
assert_eq!(key.flow_id(), 2);
assert_eq!(key.partition_id(), 0);
}
}

View File

@@ -1,222 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowTaskId, FlowTaskPartitionId, TableMetaValue,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::table_name::TableName;
use crate::FlownodeId;
const FLOW_TASK_INFO_KEY_PREFIX: &str = "info";
lazy_static! {
static ref FLOW_TASK_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_TASK_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}
/// The key stores the metadata of the task.
///
/// The layout: `__flow_task/{catalog}/info/{flow_task_id}`.
pub struct FlowTaskInfoKey(FlowTaskScoped<CatalogScoped<FlowTaskInfoKeyInner>>);
impl MetaKey<FlowTaskInfoKey> for FlowTaskInfoKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowTaskInfoKey> {
Ok(FlowTaskInfoKey(FlowTaskScoped::<
CatalogScoped<FlowTaskInfoKeyInner>,
>::from_bytes(bytes)?))
}
}
impl FlowTaskInfoKey {
/// Returns the [FlowTaskInfoKey].
pub fn new(catalog: String, flow_task_id: FlowTaskId) -> FlowTaskInfoKey {
let inner = FlowTaskInfoKeyInner::new(flow_task_id);
FlowTaskInfoKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner)))
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
}
/// Returns the [FlowTaskId].
pub fn flow_task_id(&self) -> FlowTaskId {
self.0.flow_task_id
}
}
/// The key of flow task metadata.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowTaskInfoKeyInner {
flow_task_id: FlowTaskId,
}
impl FlowTaskInfoKeyInner {
/// Returns a [FlowTaskInfoKey] with the specified `flow_task_id`.
pub fn new(flow_task_id: FlowTaskId) -> FlowTaskInfoKeyInner {
FlowTaskInfoKeyInner { flow_task_id }
}
}
impl MetaKey<FlowTaskInfoKeyInner> for FlowTaskInfoKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!("{FLOW_TASK_INFO_KEY_PREFIX}/{}", self.flow_task_id).into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowTaskInfoKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlowTaskInfoKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOW_TASK_INFO_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlowTaskInfoKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flow_task_id = captures[1].parse::<FlowTaskId>().unwrap();
Ok(FlowTaskInfoKeyInner { flow_task_id })
}
}
// The metadata of the flow task.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowTaskInfoValue {
/// The source tables used by the task.
pub(crate) source_table_ids: Vec<TableId>,
/// The sink table used by the task.
pub(crate) sink_table_name: TableName,
/// Which flow nodes this task is running on.
pub(crate) flownode_ids: BTreeMap<FlowTaskPartitionId, FlownodeId>,
/// The catalog name.
pub(crate) catalog_name: String,
/// The task name.
pub(crate) task_name: String,
/// The raw sql.
pub(crate) raw_sql: String,
/// The expr of expire.
pub(crate) expire_when: String,
/// The comment.
pub(crate) comment: String,
/// The options.
pub(crate) options: HashMap<String, String>,
}
impl FlowTaskInfoValue {
/// Returns the `flownode_id`.
pub fn flownode_ids(&self) -> &BTreeMap<FlowTaskPartitionId, FlownodeId> {
&self.flownode_ids
}
/// Returns the `source_table`.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
}
}
/// The manager of [FlowTaskInfoKey].
pub struct FlowTaskInfoManager {
kv_backend: KvBackendRef,
}
impl FlowTaskInfoManager {
/// Returns a new [FlowTaskInfoManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Returns the [FlowTaskInfoValue] of specified `flow_task_id`.
pub async fn get(
&self,
catalog: &str,
flow_task_id: FlowTaskId,
) -> Result<Option<FlowTaskInfoValue>> {
let key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes();
self.kv_backend
.get(&key)
.await?
.map(|x| FlowTaskInfoValue::try_from_raw_value(&x.value))
.transpose()
}
/// Builds a create flow task transaction.
/// It is expected that the `__flow_task/{catalog}/info/{flow_task_id}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
pub(crate) fn build_create_txn(
&self,
catalog: &str,
flow_task_id: FlowTaskId,
flow_task_value: &FlowTaskInfoValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowTaskInfoValue>>>,
)> {
let key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes();
let txn =
txn_helper::build_put_if_absent_txn(key.clone(), flow_task_value.try_as_raw_value()?);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_key_serialization() {
let flow_task = FlowTaskInfoKey::new("my_catalog".to_string(), 2);
assert_eq!(
b"__flow_task/my_catalog/info/2".to_vec(),
flow_task.to_bytes()
);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow_task/my_catalog/info/2".to_vec();
let key = FlowTaskInfoKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_task_id(), 2);
}
}

View File

@@ -1,208 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowTaskId, TableMetaValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
const FLOW_TASK_NAME_KEY_PREFIX: &str = "name";
lazy_static! {
static ref FLOW_TASK_NAME_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_TASK_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap();
}
/// The key of mapping {task_name} to [FlowTaskId].
///
/// The layout: `__flow_task/{catalog}/name/{task_name}`.
pub struct FlowTaskNameKey(FlowTaskScoped<CatalogScoped<FlowTaskNameKeyInner>>);
impl FlowTaskNameKey {
/// Returns the [FlowTaskNameKey]
pub fn new(catalog: String, task_name: String) -> FlowTaskNameKey {
let inner = FlowTaskNameKeyInner::new(task_name);
FlowTaskNameKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner)))
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
}
/// Return the `task_name`
pub fn task_name(&self) -> &str {
&self.0.task_name
}
}
impl MetaKey<FlowTaskNameKey> for FlowTaskNameKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowTaskNameKey> {
Ok(FlowTaskNameKey(FlowTaskScoped::<
CatalogScoped<FlowTaskNameKeyInner>,
>::from_bytes(bytes)?))
}
}
/// The key of mapping name to [FlowTaskId]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowTaskNameKeyInner {
pub task_name: String,
}
impl MetaKey<FlowTaskNameKeyInner> for FlowTaskNameKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!("{FLOW_TASK_NAME_KEY_PREFIX}/{}", self.task_name).into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowTaskNameKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlowTaskNameKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOW_TASK_NAME_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlowTaskNameKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let task = captures[1].to_string();
Ok(FlowTaskNameKeyInner { task_name: task })
}
}
impl FlowTaskNameKeyInner {
/// Returns a [FlowTaskNameKeyInner].
pub fn new(task: String) -> Self {
Self { task_name: task }
}
}
/// The value of [FlowTaskNameKey].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlowTaskNameValue {
flow_task_id: FlowTaskId,
}
impl FlowTaskNameValue {
/// Returns a [FlowTaskNameValue] with specified [FlowTaskId].
pub fn new(flow_task_id: FlowTaskId) -> Self {
Self { flow_task_id }
}
/// Returns the [FlowTaskId]
pub fn flow_task_id(&self) -> FlowTaskId {
self.flow_task_id
}
}
/// The manager of [FlowTaskNameKey].
pub struct FlowTaskNameManager {
kv_backend: KvBackendRef,
}
impl FlowTaskNameManager {
/// Returns a new [FlowTaskNameManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Returns the [FlowTaskNameValue] of specified `catalog.task`.
pub async fn get(&self, catalog: &str, task: &str) -> Result<Option<FlowTaskNameValue>> {
let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string());
let raw_key = key.to_bytes();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| FlowTaskNameValue::try_from_raw_value(&x.value))
.transpose()
}
/// Returns true if the `task` exists.
pub async fn exists(&self, catalog: &str, task: &str) -> Result<bool> {
let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string());
let raw_key = key.to_bytes();
self.kv_backend.exists(&raw_key).await
}
/// Builds a create flow task name transaction.
/// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
pub fn build_create_txn(
&self,
catalog: &str,
name: &str,
flow_task_id: FlowTaskId,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowTaskNameValue>>>,
)> {
let key = FlowTaskNameKey::new(catalog.to_string(), name.to_string());
let raw_key = key.to_bytes();
let flow_task_name_value = FlowTaskNameValue::new(flow_task_id);
let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
flow_task_name_value.try_as_raw_value()?,
);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_key_serialization() {
let table_task_key = FlowTaskNameKey::new("my_catalog".to_string(), "my_task".to_string());
assert_eq!(
b"__flow_task/my_catalog/name/my_task".to_vec(),
table_task_key.to_bytes(),
);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow_task/my_catalog/name/my_task".to_vec();
let key = FlowTaskNameKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.task_name(), "my_task");
}
}

View File

@@ -1,259 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowTaskId, FlowTaskPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::FlownodeId;
lazy_static! {
static ref FLOWNODE_TASK_KEY_PATTERN: Regex = Regex::new(&format!(
"^{FLOWNODE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
))
.unwrap();
}
const FLOWNODE_TASK_KEY_PREFIX: &str = "flownode";
/// The key of mapping [FlownodeId] to [FlowTaskId].
///
/// The layout `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}`
pub struct FlownodeTaskKey(FlowTaskScoped<CatalogScoped<FlownodeTaskKeyInner>>);
impl MetaKey<FlownodeTaskKey> for FlownodeTaskKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlownodeTaskKey> {
Ok(FlownodeTaskKey(FlowTaskScoped::<
CatalogScoped<FlownodeTaskKeyInner>,
>::from_bytes(bytes)?))
}
}
impl FlownodeTaskKey {
/// Returns a new [FlownodeTaskKey].
pub fn new(
catalog: String,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
) -> FlownodeTaskKey {
let inner = FlownodeTaskKeyInner::new(flownode_id, flow_task_id, partition_id);
FlownodeTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner)))
}
/// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`.
pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec<u8> {
let catalog_scoped_key = CatalogScoped::new(
catalog,
BytesAdapter::from(FlownodeTaskKeyInner::range_start_key(flownode_id).into_bytes()),
);
FlowTaskScoped::new(catalog_scoped_key).to_bytes()
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
}
/// Returns the [FlowTaskId].
pub fn flow_task_id(&self) -> FlowTaskId {
self.0.flow_task_id
}
/// Returns the [FlownodeId].
pub fn flownode_id(&self) -> FlownodeId {
self.0.flownode_id
}
/// Returns the [PartitionId].
pub fn partition_id(&self) -> FlowTaskPartitionId {
self.0.partition_id
}
}
/// The key of mapping [FlownodeId] to [FlowTaskId].
pub struct FlownodeTaskKeyInner {
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
}
impl FlownodeTaskKeyInner {
/// Returns a [FlownodeTaskKey] with the specified `flownode_id`, `flow_task_id` and `partition_id`.
pub fn new(
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
) -> Self {
Self {
flownode_id,
flow_task_id,
partition_id,
}
}
fn prefix(flownode_id: FlownodeId) -> String {
format!("{}/{flownode_id}", FLOWNODE_TASK_KEY_PREFIX)
}
/// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`.
fn range_start_key(flownode_id: FlownodeId) -> String {
format!("{}/", Self::prefix(flownode_id))
}
}
impl MetaKey<FlownodeTaskKeyInner> for FlownodeTaskKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!(
"{FLOWNODE_TASK_KEY_PREFIX}/{}/{}/{}",
self.flownode_id, self.flow_task_id, self.partition_id,
)
.into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlownodeTaskKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlownodeTaskKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOWNODE_TASK_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlownodeTaskKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
let flow_task_id = captures[2].parse::<FlowTaskId>().unwrap();
let partition_id = captures[3].parse::<FlowTaskPartitionId>().unwrap();
Ok(FlownodeTaskKeyInner {
flownode_id,
flow_task_id,
partition_id,
})
}
}
/// The manager of [FlownodeTaskKey].
pub struct FlownodeTaskManager {
kv_backend: KvBackendRef,
}
/// Decodes `KeyValue` to [FlownodeTaskKey].
pub fn flownode_task_key_decoder(kv: KeyValue) -> Result<FlownodeTaskKey> {
FlownodeTaskKey::from_bytes(&kv.key)
}
impl FlownodeTaskManager {
/// Returns a new [FlownodeTaskManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Retrieves all [FlowTaskId] and [PartitionId]s of the specified `flownode_id`.
pub fn tasks(
&self,
catalog: &str,
flownode_id: FlownodeId,
) -> BoxStream<'static, Result<(FlowTaskId, FlowTaskPartitionId)>> {
let start_key = FlownodeTaskKey::range_start_key(catalog.to_string(), flownode_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(flownode_task_key_decoder),
);
Box::pin(stream.map_ok(|key| (key.flow_task_id(), key.partition_id())))
}
/// Builds a create flownode task transaction.
///
/// Puts `__flownode_task/{flownode_id}/{flow_task_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowTaskPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_task_id: FlowTaskId,
flownode_ids: I,
) -> Txn {
let txns = flownode_ids
.into_iter()
.map(|(partition_id, flownode_id)| {
let key = FlownodeTaskKey::new(
catalog.to_string(),
flownode_id,
flow_task_id,
partition_id,
)
.to_bytes();
TxnOp::Put(key, vec![])
})
.collect::<Vec<_>>();
Txn::new().and_then(txns)
}
}
#[cfg(test)]
mod tests {
use crate::key::flow_task::flownode_task::FlownodeTaskKey;
use crate::key::scope::MetaKey;
#[test]
fn test_key_serialization() {
let flownode_task = FlownodeTaskKey::new("my_catalog".to_string(), 1, 2, 0);
assert_eq!(
b"__flow_task/my_catalog/flownode/1/2/0".to_vec(),
flownode_task.to_bytes()
);
let prefix = FlownodeTaskKey::range_start_key("my_catalog".to_string(), 1);
assert_eq!(b"__flow_task/my_catalog/flownode/1/".to_vec(), prefix);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow_task/my_catalog/flownode/1/2/0".to_vec();
let key = FlownodeTaskKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flownode_id(), 1);
assert_eq!(key.flow_task_id(), 2);
assert_eq!(key.partition_id(), 0);
}
}

View File

@@ -22,7 +22,7 @@ const CATALOG_LOCK_PREFIX: &str = "__catalog_lock";
const SCHEMA_LOCK_PREFIX: &str = "__schema_lock";
const TABLE_LOCK_PREFIX: &str = "__table_lock";
const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
const FLOW_TASK_NAME_LOCK_PREFIX: &str = "__flow_task_name_lock";
const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
const REGION_LOCK_PREFIX: &str = "__region_lock";
/// [CatalogLock] acquires the lock on the tenant level.
@@ -111,28 +111,28 @@ impl From<TableNameLock> for StringKey {
}
}
/// [FlowTaskNameLock] prevents any procedures trying to create a flow task named it.
pub enum FlowTaskNameLock {
/// [FlowNameLock] prevents any procedures trying to create a flow named it.
pub enum FlowNameLock {
Write(String),
}
impl FlowTaskNameLock {
impl FlowNameLock {
pub fn new(catalog: &str, table: &str) -> Self {
Self::Write(format!("{catalog}.{table}"))
}
}
impl Display for FlowTaskNameLock {
impl Display for FlowNameLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let FlowTaskNameLock::Write(name) = self;
write!(f, "{}/{}", FLOW_TASK_NAME_LOCK_PREFIX, name)
let FlowNameLock::Write(name) = self;
write!(f, "{}/{}", FLOW_NAME_LOCK_PREFIX, name)
}
}
impl From<FlowTaskNameLock> for StringKey {
fn from(value: FlowTaskNameLock) -> Self {
impl From<FlowNameLock> for StringKey {
fn from(value: FlowNameLock) -> Self {
match value {
FlowTaskNameLock::Write(_) => StringKey::Exclusive(value.to_string()),
FlowNameLock::Write(_) => StringKey::Exclusive(value.to_string()),
}
}
}

View File

@@ -39,9 +39,9 @@ lazy_static! {
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_CREATE_FLOW_TASK: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_create_flow_task",
"meta procedure create flow task",
pub static ref METRIC_META_PROCEDURE_CREATE_FLOW: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_create_flow",
"meta procedure create flow",
&["step"]
)
.unwrap();

View File

@@ -26,8 +26,8 @@ use api::v1::meta::{
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowTaskExpr, CreateTableExpr, DropDatabaseExpr,
DropFlowTaskExpr, DropTableExpr, TruncateTableExpr,
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr,
DropTableExpr, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
@@ -729,11 +729,11 @@ impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
}
}
/// Create flow task
/// Create flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
pub catalog_name: String,
pub task_name: String,
pub flow_name: String,
pub source_table_names: Vec<TableName>,
pub sink_table_name: TableName,
pub or_replace: bool,
@@ -741,16 +741,16 @@ pub struct CreateFlowTask {
pub expire_when: String,
pub comment: String,
pub sql: String,
pub options: HashMap<String, String>,
pub flow_options: HashMap<String, String>,
}
impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
type Error = error::Error;
fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
let CreateFlowTaskExpr {
let CreateFlowExpr {
catalog_name,
task_name,
flow_name,
source_table_names,
sink_table_name,
or_replace,
@@ -758,14 +758,14 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
expire_when,
comment,
sql,
task_options,
} = pb.create_flow_task.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create_flow_task",
flow_options,
} = pb.create_flow.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create_flow",
})?;
Ok(CreateFlowTask {
catalog_name,
task_name,
flow_name,
source_table_names: source_table_names.into_iter().map(Into::into).collect(),
sink_table_name: sink_table_name
.context(error::InvalidProtoMsgSnafu {
@@ -777,7 +777,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
expire_when,
comment,
sql,
options: task_options,
flow_options,
})
}
}
@@ -786,7 +786,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
fn from(
CreateFlowTask {
catalog_name,
task_name,
flow_name,
source_table_names,
sink_table_name,
or_replace,
@@ -794,13 +794,13 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
expire_when,
comment,
sql,
options,
flow_options,
}: CreateFlowTask,
) -> Self {
PbCreateFlowTask {
create_flow_task: Some(CreateFlowTaskExpr {
create_flow: Some(CreateFlowExpr {
catalog_name,
task_name,
flow_name,
source_table_names: source_table_names.into_iter().map(Into::into).collect(),
sink_table_name: Some(sink_table_name.into()),
or_replace,
@@ -808,31 +808,31 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
expire_when,
comment,
sql,
task_options: options,
flow_options,
}),
}
}
}
/// Drop flow task
/// Drop flow
pub struct DropFlowTask {
pub catalog_name: String,
pub task_name: String,
pub flow_name: String,
}
impl TryFrom<PbDropFlowTask> for DropFlowTask {
type Error = error::Error;
fn try_from(pb: PbDropFlowTask) -> Result<Self> {
let DropFlowTaskExpr {
let DropFlowExpr {
catalog_name,
task_name,
} = pb.drop_flow_task.context(error::InvalidProtoMsgSnafu {
flow_name,
} = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
err_msg: "expected sink_table_name",
})?;
Ok(DropFlowTask {
catalog_name,
task_name,
flow_name,
})
}
}
@@ -841,13 +841,13 @@ impl From<DropFlowTask> for PbDropFlowTask {
fn from(
DropFlowTask {
catalog_name,
task_name,
flow_name,
}: DropFlowTask,
) -> Self {
PbDropFlowTask {
drop_flow_task: Some(DropFlowTaskExpr {
drop_flow: Some(DropFlowExpr {
catalog_name,
task_name,
flow_name,
}),
}
}

View File

@@ -20,11 +20,11 @@ pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::ddl::flow_meta::FlowMetadataAllocator;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::task_meta::FlowTaskMetadataAllocator;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::flow_task::FlowTaskMetadataManager;
use crate::key::flow::FlowMetadataManager;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
@@ -109,21 +109,20 @@ pub fn new_ddl_context_with_kv_backend(
),
Arc::new(WalOptionsAllocator::default()),
));
let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let flow_task_metadata_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new(
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator =
Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
SequenceBuilder::new("flow-test", kv_backend)
.initial(1024)
.build(),
)),
);
)));
DdlContext {
node_manager,
cache_invalidator: Arc::new(DummyCacheInvalidator),
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
table_metadata_allocator,
table_metadata_manager,
flow_task_metadata_allocator,
flow_task_metadata_manager,
flow_metadata_allocator,
flow_metadata_manager,
}
}

View File

@@ -137,10 +137,10 @@ impl GrpcQueryHandler for Instance {
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor.truncate_table(table_name).await?
}
DdlExpr::CreateFlowTask(_) => {
DdlExpr::CreateFlow(_) => {
unimplemented!()
}
DdlExpr::DropFlowTask(_) => {
DdlExpr::DropFlow(_) => {
unimplemented!()
}
}
@@ -181,12 +181,12 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
Expr::TruncateTable(expr) => {
check_and_fill!(expr);
}
Expr::CreateFlowTask(expr) => {
Expr::CreateFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
Expr::DropFlowTask(expr) => {
Expr::DropFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}

View File

@@ -59,7 +59,7 @@ use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};
pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_TASK_ID_SEQ: &str = "flow_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]

View File

@@ -18,14 +18,14 @@ use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID};
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::task_meta::FlowTaskMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow_task::FlowTaskMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -38,7 +38,7 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
use super::FLOW_TASK_ID_SEQ;
use super::FLOW_ID_SEQ;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
@@ -205,7 +205,7 @@ impl MetasrvBuilder {
let table_metadata_manager = Arc::new(TableMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
@@ -239,14 +239,13 @@ impl MetasrvBuilder {
))
});
// TODO(weny): use the real allocator.
let flow_task_metadata_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new(
SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_TASK_ID as u64)
let flow_metadata_allocator =
Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_ID as u64)
.step(10)
.build(),
)),
);
)));
let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
let node_manager = node_manager.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
@@ -273,8 +272,8 @@ impl MetasrvBuilder {
memory_region_keeper: memory_region_keeper.clone(),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
flow_task_metadata_manager: flow_task_metadata_manager.clone(),
flow_task_metadata_allocator: flow_task_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
},
procedure_manager.clone(),
true,

View File

@@ -105,10 +105,10 @@ pub mod test_data {
use chrono::DateTime;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::task_meta::FlowTaskMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::key::flow_task::FlowTaskMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::node_manager::NodeManagerRef;
@@ -201,11 +201,10 @@ pub mod test_data {
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
));
let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let flow_task_metadata_allocator =
Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator(
Arc::new(SequenceBuilder::new("test", kv_backend).build()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
Arc::new(SequenceBuilder::new("test", kv_backend).build()),
));
DdlContext {
node_manager,
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
@@ -216,8 +215,8 @@ pub mod test_data {
)),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
}
}

View File

@@ -541,7 +541,7 @@ pub fn to_create_flow_task_expr(
Ok(CreateFlowTask {
catalog_name: query_ctx.current_catalog().to_string(),
task_name: create_flow.flow_name.to_string(),
flow_name: create_flow.flow_name.to_string(),
source_table_names,
sink_table_name,
or_replace: create_flow.or_replace,
@@ -552,7 +552,7 @@ pub fn to_create_flow_task_expr(
.unwrap_or_default(),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
options: HashMap::new(),
flow_options: HashMap::new(),
})
}

View File

@@ -175,7 +175,7 @@ impl<'a> ParserContext<'a> {
fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
let if_not_exists = self.parse_if_not_exist()?;
let task_name = self.intern_parse_table_name()?;
let flow_name = self.intern_parse_table_name()?;
self.parser
.expect_token(&Token::make_keyword(SINK))
@@ -219,7 +219,7 @@ impl<'a> ParserContext<'a> {
let query = Box::new(self.parser.parse_query().context(error::SyntaxSnafu)?);
Ok(Statement::CreateFlow(CreateFlow {
flow_name: task_name,
flow_name,
sink_table_name: output_table_name,
or_replace,
if_not_exists,

View File

@@ -17,14 +17,14 @@ use std::sync::Arc;
use catalog::kvbackend::KvBackendCatalogManager;
use cmd::options::MixOptions;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID};
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::KvBackendConfig;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::task_meta::FlowTaskMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow_task::FlowTaskMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
@@ -38,7 +38,7 @@ use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::Mode;
use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
@@ -129,7 +129,7 @@ impl GreptimeDbStandaloneBuilder {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();
let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
let catalog_manager =
KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await;
@@ -142,9 +142,9 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let flow_task_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_TASK_ID as u64)
let flow_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_ID as u64)
.step(10)
.build(),
);
@@ -156,9 +156,9 @@ impl GreptimeDbStandaloneBuilder {
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_task_metadata_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence),
);
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
));
let ddl_task_executor = Arc::new(
DdlManager::try_new(
@@ -168,8 +168,8 @@ impl GreptimeDbStandaloneBuilder {
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
},
procedure_manager.clone(),
register_procedure_loaders,