From 9206f60b2828d2b759dad2f39780bcdbf739b092 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 25 Apr 2024 19:59:24 +0800 Subject: [PATCH] feat: implement FlowTaskMetadataManager (#3766) * feat: implement `FlowMetadataManager` * chore: remove dead code * refactor: change `sink_tables` to `sink_table` * refactor: add `PartitionId` * feat: implement FlowTaskNameManager * refactor: update doc of keys * fix: return partition id in `tasks` * refactor: rename to `FlowTaskId` * chore: add comments * chore: add `task_id` in `TaskAlreadyExists` * chore: add comments * fix: fmt * refactor: simplify the docoder * chore: update comments * feat: implement `FlowTaskScoped` and `CatalogScoped` * refactor: refactor flow task keys * refactor: remove metadata mod * refactor: rename to `FlowTaskInfo` * chore: add comments * refactor: rename to `FlowTaskMetadataManager` * chore: remove dead code * Apply suggestions from code review * chore: change to `pub(crate)` * chore: apply suggestions from CR * fix: fix fmt * chore: fmt doc --- src/common/meta/src/error.rs | 27 +- src/common/meta/src/key.rs | 66 ++- src/common/meta/src/key/flow_task.rs | 407 ++++++++++++++++++ .../meta/src/key/flow_task/flow_task_info.rs | 221 ++++++++++ .../meta/src/key/flow_task/flow_task_name.rs | 201 +++++++++ .../meta/src/key/flow_task/flownode_task.rs | 259 +++++++++++ .../meta/src/key/flow_task/table_task.rs | 279 ++++++++++++ src/common/meta/src/key/scope.rs | 152 +++++++ src/common/meta/src/key/tombstone.rs | 6 - src/common/meta/src/lib.rs | 4 + 10 files changed, 1606 insertions(+), 16 deletions(-) create mode 100644 src/common/meta/src/key/flow_task.rs create mode 100644 src/common/meta/src/key/flow_task/flow_task_info.rs create mode 100644 src/common/meta/src/key/flow_task/flow_task_name.rs create mode 100644 src/common/meta/src/key/flow_task/flownode_task.rs create mode 100644 src/common/meta/src/key/flow_task/table_task.rs create mode 100644 src/common/meta/src/key/scope.rs diff --git 
a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index fc70bd7e07..323b2c0fee 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -23,6 +23,7 @@ use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; +use crate::key::FlowTaskId; use crate::peer::Peer; use crate::DatanodeId; @@ -241,6 +242,17 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Task already exists, task: {}, flow_task_id: {}", + task_name, + flow_task_id + ))] + TaskAlreadyExists { + task_name: String, + flow_task_id: FlowTaskId, + location: Location, + }, + #[snafu(display("Catalog already exists, catalog: {}", catalog))] CatalogAlreadyExists { catalog: String, location: Location }, @@ -421,6 +433,16 @@ pub enum Error { #[snafu(display("Invalid role: {}", role))] InvalidRole { role: i32, location: Location }, + #[snafu(display("Delimiter not found, key: {}", key))] + DelimiterNotFound { key: String, location: Location }, + + #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))] + MismatchPrefix { + prefix: String, + key: String, + location: Location, + }, + #[snafu(display("Failed to move values: {err_msg}"))] MoveValues { err_msg: String, location: Location }, @@ -494,7 +516,10 @@ impl ErrorExt for Error { | EmptyKey { .. } | InvalidEngineType { .. } | AlterLogicalTablesInvalidArguments { .. } - | CreateLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments, + | CreateLogicalTablesInvalidArguments { .. } + | TaskAlreadyExists { .. } + | MismatchPrefix { .. } + | DelimiterNotFound { .. } => StatusCode::InvalidArguments, TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 3911399b13..618b092c3f 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -36,16 +36,57 @@ //! 
- The value is a [TableNameValue] struct; it contains the table id. //! - Used in the table name to table id lookup. //! +//! 6. Flow task info key: `__flow_task/{catalog}/info/{flow_task_id}` +//! - Stores metadata of the flow task. +//! +//! 7. Flow task name key: `__flow_task/{catalog}/name/{task_name}` +//! - Mapping {catalog}/{task_name} to {flow_task_id} +//! +//! 8. Flownode task key: `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}` +//! - Mapping {flownode_id} to {flow_task_id} +//! +//! 9. Table task key: `__table_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}` +//! - Mapping source table's {table_id} to {flownode_id} +//! - Used in `Flownode` booting. +//! //! All keys have related managers. The managers take care of the serialization and deserialization //! of keys and values, and the interaction with the underlying KV store backend. //! -//! To simplify the managers used in struct fields and function parameters, we define a "unify" -//! table metadata manager: [TableMetadataManager]. It contains all the managers defined above. -//! It's recommended to just use this manager only. +//! To simplify the managers used in struct fields and function parameters, we define two "unified" +//! managers: the table metadata manager [TableMetadataManager] +//! and the flow task metadata manager [FlowTaskMetadataManager](crate::key::flow_task::FlowTaskMetadataManager). +//! Together they contain all the managers defined above. It's recommended to use only these managers. +//! +//! The whole picture of flow task keys will be like this: +//! +//! __flow_task/ +//! {catalog}/ +//! info/ +//! {task_id} +//! +//! name/ +//! {task_name} +//! +//! flownode/ +//! flownode_id/ +//! {flownode_id}/ +//! {task_id}/ +//! {partition_id} +//! +//! source_table/ +//! flow_task/ +//! {table_id}/ +//! {flownode_id}/ +//! {task_id}/ +//! {partition_id} pub mod catalog_name; pub mod datanode_table; +/// TODO(weny): remove it.
+#[allow(unused)] +pub mod flow_task; pub mod schema_name; +pub mod scope; pub mod table_info; pub mod table_name; // TODO(weny): removes it. @@ -56,10 +97,8 @@ pub mod table_region; pub mod table_route; #[cfg(any(test, feature = "testing"))] pub mod test_utils; -// TODO(weny): remove it. -#[allow(dead_code)] mod tombstone; -mod txn_helper; +pub(crate) mod txn_helper; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; @@ -84,6 +123,8 @@ use table_name::{TableNameKey, TableNameManager, TableNameValue}; use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; use self::datanode_table::RegionInfo; +use self::flow_task::flow_task_info::FlowTaskInfoValue; +use self::flow_task::flow_task_name::FlowTaskNameValue; use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; use self::tombstone::TombstoneManager; @@ -103,7 +144,6 @@ pub const MAINTENANCE_KEY: &str = "maintenance"; const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; const TABLE_REGION_KEY_PREFIX: &str = "__table_region"; - pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name"; pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name"; @@ -119,9 +159,14 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [ pub type RegionDistribution = BTreeMap>; +/// The id of flow task. +pub type FlowTaskId = u32; +/// The partition of flow task. +pub type FlowTaskPartitionId = u32; + lazy_static! { static ref DATANODE_TABLE_KEY_PATTERN: Regex = - Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9])/([0-9])$")).unwrap(); + Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap(); } lazy_static! { @@ -199,6 +244,7 @@ pub struct TableMetadataManager { kv_backend: KvBackendRef, } +#[macro_export] macro_rules! ensure_values { ($got:expr, $expected_value:expr, $name:expr) => { ensure!( @@ -1007,7 +1053,9 @@ macro_rules! 
impl_optional_meta_value { impl_table_meta_value! { TableNameValue, TableInfoValue, - DatanodeTableValue + DatanodeTableValue, + FlowTaskInfoValue, + FlowTaskNameValue } impl_optional_meta_value! { diff --git a/src/common/meta/src/key/flow_task.rs b/src/common/meta/src/key/flow_task.rs new file mode 100644 index 0000000000..f5fc9b4793 --- /dev/null +++ b/src/common/meta/src/key/flow_task.rs @@ -0,0 +1,407 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod flow_task_info; +pub(crate) mod flow_task_name; +pub(crate) mod flownode_task; +pub(crate) mod table_task; + +use std::ops::Deref; + +use common_telemetry::info; +use snafu::{ensure, OptionExt}; + +use self::flow_task_info::FlowTaskInfoValue; +use crate::ensure_values; +use crate::error::{self, Result}; +use crate::key::flow_task::flow_task_info::FlowTaskInfoManager; +use crate::key::flow_task::flow_task_name::FlowTaskNameManager; +use crate::key::flow_task::flownode_task::FlownodeTaskManager; +use crate::key::flow_task::table_task::TableTaskManager; +use crate::key::scope::MetaKey; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::key::FlowTaskId; +use crate::kv_backend::txn::Txn; +use crate::kv_backend::KvBackendRef; + +/// The key of `__flow_task/` scope. 
+#[derive(Debug, PartialEq)] +pub struct FlowTaskScoped { + inner: T, +} + +impl Deref for FlowTaskScoped { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl FlowTaskScoped { + const PREFIX: &'static str = "__flow_task/"; + + /// Returns a new [FlowTaskScoped] key. + pub fn new(inner: T) -> FlowTaskScoped { + Self { inner } + } +} + +impl> MetaKey> for FlowTaskScoped { + fn to_bytes(&self) -> Vec { + let prefix = FlowTaskScoped::::PREFIX.as_bytes(); + let inner = self.inner.to_bytes(); + let mut bytes = Vec::with_capacity(prefix.len() + inner.len()); + bytes.extend(prefix); + bytes.extend(inner); + bytes + } + + fn from_bytes(bytes: &[u8]) -> Result> { + let prefix = FlowTaskScoped::::PREFIX.as_bytes(); + ensure!( + bytes.starts_with(prefix), + error::MismatchPrefixSnafu { + prefix: String::from_utf8_lossy(prefix), + key: String::from_utf8_lossy(bytes), + } + ); + let inner = T::from_bytes(&bytes[prefix.len()..])?; + Ok(FlowTaskScoped { inner }) + } +} + +/// The manager of metadata, provides ability to: +/// - Create metadata of the task. +/// - Retrieve metadata of the task. +/// - Delete metadata of the task. +pub struct FlowTaskMetadataManager { + flow_task_manager: FlowTaskInfoManager, + flownode_task_manager: FlownodeTaskManager, + table_task_manager: TableTaskManager, + flow_task_name_manager: FlowTaskNameManager, + kv_backend: KvBackendRef, +} + +impl FlowTaskMetadataManager { + /// Returns a new [FlowTaskMetadataManager]. + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { + flow_task_manager: FlowTaskInfoManager::new(kv_backend.clone()), + flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()), + flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()), + table_task_manager: TableTaskManager::new(kv_backend.clone()), + kv_backend, + } + } + + /// Returns the [FlowTaskInfoManager].
+ pub fn flow_task_manager(&self) -> &FlowTaskInfoManager { + &self.flow_task_manager + } + + /// Returns the [FlownodeTaskManager]. + pub fn flownode_task_manager(&self) -> &FlownodeTaskManager { + &self.flownode_task_manager + } + + /// Returns the [TableTaskManager]. + pub fn table_task_manager(&self) -> &TableTaskManager { + &self.table_task_manager + } + + /// Creates metadata for task and returns an error if different metadata exists. + pub async fn create_flow_task_metadata( + &self, + flow_task_id: FlowTaskId, + flow_task_value: FlowTaskInfoValue, + ) -> Result<()> { + let (create_flow_task_name_txn, on_create_flow_task_name_failure) = + self.flow_task_name_manager.build_create_txn( + &flow_task_value.catalog_name, + &flow_task_value.task_name, + flow_task_id, + )?; + + let (create_flow_task_txn, on_create_flow_task_failure) = + self.flow_task_manager.build_create_txn( + &flow_task_value.catalog_name, + flow_task_id, + &flow_task_value, + )?; + + let create_flownode_task_txn = self.flownode_task_manager.build_create_txn( + &flow_task_value.catalog_name, + flow_task_id, + flow_task_value.flownode_ids().clone(), + ); + + let create_table_task_txn = self.table_task_manager.build_create_txn( + &flow_task_value.catalog_name, + flow_task_id, + flow_task_value.flownode_ids().clone(), + flow_task_value.source_table_ids(), + ); + + let txn = Txn::merge_all(vec![ + create_flow_task_name_txn, + create_flow_task_txn, + create_flownode_task_txn, + create_table_task_txn, + ]); + info!( + "Creating flow task {}.{}({}), with {} txn operations", + flow_task_value.catalog_name, + flow_task_value.task_name, + flow_task_id, + txn.max_operations() + ); + + let mut resp = self.kv_backend.txn(txn).await?; + if !resp.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut resp.responses); + let remote_flow_task_name = on_create_flow_task_name_failure(&mut set)? 
+ .with_context(||error::UnexpectedSnafu { + err_msg: format!( + "Reads the empty flow task name during the creating flow task, flow_task_id: {flow_task_id}" + ), + })?; + + if remote_flow_task_name.flow_task_id() != flow_task_id { + info!( + "Trying to create flow task {}.{}({}), but flow task({}) already exists", + flow_task_value.catalog_name, + flow_task_value.task_name, + flow_task_id, + remote_flow_task_name.flow_task_id() + ); + + return error::TaskAlreadyExistsSnafu { + task_name: format!( + "{}.{}", + flow_task_value.catalog_name, flow_task_value.task_name + ), + flow_task_id, + } + .fail(); + } + + let remote_flow_task = on_create_flow_task_failure(&mut set)?.with_context(|| { + error::UnexpectedSnafu { + err_msg: format!( + "Reads the empty flow task during the creating flow task, flow_task_id: {flow_task_id}" + ), + } + })?; + let op_name = "creating flow task"; + ensure_values!(*remote_flow_task, flow_task_value, op_name); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use futures::TryStreamExt; + + use super::*; + use crate::key::flow_task::table_task::TableTaskKey; + use crate::key::scope::CatalogScoped; + use crate::kv_backend::memory::MemoryKvBackend; + + #[derive(Debug)] + struct MockKey { + inner: Vec, + } + + impl MetaKey for MockKey { + fn to_bytes(&self) -> Vec { + self.inner.clone() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(MockKey { + inner: bytes.to_vec(), + }) + } + } + + #[test] + fn test_flow_scoped_to_bytes() { + let key = FlowTaskScoped::new(CatalogScoped::new( + "my_catalog".to_string(), + MockKey { + inner: b"hi".to_vec(), + }, + )); + assert_eq!(b"__flow_task/my_catalog/hi".to_vec(), key.to_bytes()); + } + + #[test] + fn test_flow_scoped_from_bytes() { + let bytes = b"__flow_task/my_catalog/hi"; + let key = FlowTaskScoped::>::from_bytes(bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.inner.inner, b"hi".to_vec()); + } + + 
#[test] + fn test_flow_scoped_from_bytes_mismatch() { + let bytes = b"__table/my_catalog/hi"; + let err = FlowTaskScoped::>::from_bytes(bytes).unwrap_err(); + assert_matches!(err, error::Error::MismatchPrefix { .. }); + } + + #[tokio::test] + async fn test_create_flow_metadata() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone()); + let task_id = 10; + let catalog_name = "greptime"; + let flow_task_value = FlowTaskInfoValue { + catalog_name: catalog_name.to_string(), + task_name: "task".to_string(), + source_tables: vec![1024, 1025, 1026], + sink_table: 2049, + flownode_ids: [(0, 1u64)].into(), + raw_sql: "raw".to_string(), + expire_when: "expr".to_string(), + comment: "hi".to_string(), + options: Default::default(), + }; + flow_metadata_manager + .create_flow_task_metadata(task_id, flow_task_value.clone()) + .await + .unwrap(); + // Creates again. + flow_metadata_manager + .create_flow_task_metadata(task_id, flow_task_value.clone()) + .await + .unwrap(); + let got = flow_metadata_manager + .flow_task_manager() + .get(catalog_name, task_id) + .await + .unwrap() + .unwrap(); + assert_eq!(got, flow_task_value); + let tasks = flow_metadata_manager + .flownode_task_manager() + .tasks(catalog_name, 1) + .try_collect::>() + .await + .unwrap(); + assert_eq!(tasks, vec![(task_id, 0)]); + for table_id in [1024, 1025, 1026] { + let nodes = flow_metadata_manager + .table_task_manager() + .nodes(catalog_name, table_id) + .try_collect::>() + .await + .unwrap(); + assert_eq!( + nodes, + vec![TableTaskKey::new( + catalog_name.to_string(), + table_id, + 1, + task_id, + 0 + )] + ); + } + } + + #[tokio::test] + async fn test_create_table_metadata_task_exists_err() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); + let task_id = 10; + let flow_task_value = FlowTaskInfoValue { + catalog_name: "greptime".to_string(), + task_name: 
"task".to_string(), + source_tables: vec![1024, 1025, 1026], + sink_table: 2049, + flownode_ids: [(0, 1u64)].into(), + raw_sql: "raw".to_string(), + expire_when: "expr".to_string(), + comment: "hi".to_string(), + options: Default::default(), + }; + flow_metadata_manager + .create_flow_task_metadata(task_id, flow_task_value.clone()) + .await + .unwrap(); + // Creates again. + let flow_task_value = FlowTaskInfoValue { + catalog_name: "greptime".to_string(), + task_name: "task".to_string(), + source_tables: vec![1024, 1025, 1026], + sink_table: 2049, + flownode_ids: [(0, 1u64)].into(), + raw_sql: "raw".to_string(), + expire_when: "expr".to_string(), + comment: "hi".to_string(), + options: Default::default(), + }; + let err = flow_metadata_manager + .create_flow_task_metadata(task_id + 1, flow_task_value) + .await + .unwrap_err(); + assert_matches!(err, error::Error::TaskAlreadyExists { .. }); + } + + #[tokio::test] + async fn test_create_table_metadata_unexpected_err() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); + let task_id = 10; + let flow_task_value = FlowTaskInfoValue { + catalog_name: "greptime".to_string(), + task_name: "task".to_string(), + source_tables: vec![1024, 1025, 1026], + sink_table: 2049, + flownode_ids: [(0, 1u64)].into(), + raw_sql: "raw".to_string(), + expire_when: "expr".to_string(), + comment: "hi".to_string(), + options: Default::default(), + }; + flow_metadata_manager + .create_flow_task_metadata(task_id, flow_task_value.clone()) + .await + .unwrap(); + // Creates again. 
+ let flow_task_value = FlowTaskInfoValue { + catalog_name: "greptime".to_string(), + task_name: "task".to_string(), + source_tables: vec![1024, 1025, 1026], + sink_table: 2048, + flownode_ids: [(0, 1u64)].into(), + raw_sql: "raw".to_string(), + expire_when: "expr".to_string(), + comment: "hi".to_string(), + options: Default::default(), + }; + let err = flow_metadata_manager + .create_flow_task_metadata(task_id, flow_task_value) + .await + .unwrap_err(); + assert!(err.to_string().contains("Reads the different value")); + } +} diff --git a/src/common/meta/src/key/flow_task/flow_task_info.rs b/src/common/meta/src/key/flow_task/flow_task_info.rs new file mode 100644 index 0000000000..f30d3217f8 --- /dev/null +++ b/src/common/meta/src/key/flow_task/flow_task_info.rs @@ -0,0 +1,221 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{BTreeMap, HashMap}; + +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use table::metadata::TableId; + +use crate::error::{self, Result}; +use crate::key::flow_task::FlowTaskScoped; +use crate::key::scope::{CatalogScoped, MetaKey}; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::key::{ + txn_helper, DeserializedValueWithBytes, FlowTaskId, FlowTaskPartitionId, TableMetaValue, +}; +use crate::kv_backend::txn::Txn; +use crate::kv_backend::KvBackendRef; +use crate::FlownodeId; + +const FLOW_TASK_INFO_KEY_PREFIX: &str = "info"; + +lazy_static! { + static ref FLOW_TASK_INFO_KEY_PATTERN: Regex = + Regex::new(&format!("^{FLOW_TASK_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + +/// The key that stores the metadata of the task. +/// +/// The layout: `__flow_task/{catalog}/info/{flow_task_id}`. +pub struct FlowTaskInfoKey(FlowTaskScoped>); + +impl MetaKey for FlowTaskInfoKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(FlowTaskInfoKey(FlowTaskScoped::< + CatalogScoped, + >::from_bytes(bytes)?)) + } +} + +impl FlowTaskInfoKey { + /// Returns the [FlowTaskInfoKey]. + pub fn new(catalog: String, flow_task_id: FlowTaskId) -> FlowTaskInfoKey { + let inner = FlowTaskInfoKeyInner::new(flow_task_id); + FlowTaskInfoKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) + } + + /// Returns the catalog. + pub fn catalog(&self) -> &str { + self.0.catalog() + } + + /// Returns the [FlowTaskId]. + pub fn flow_task_id(&self) -> FlowTaskId { + self.0.flow_task_id + } +} + +/// The key of flow task metadata. +#[derive(Debug, Clone, Copy, PartialEq)] +struct FlowTaskInfoKeyInner { + flow_task_id: FlowTaskId, +} + +impl FlowTaskInfoKeyInner { + /// Returns a [FlowTaskInfoKeyInner] with the specified `flow_task_id`.
+ pub fn new(flow_task_id: FlowTaskId) -> FlowTaskInfoKeyInner { + FlowTaskInfoKeyInner { flow_task_id } + } +} + +impl MetaKey for FlowTaskInfoKeyInner { + fn to_bytes(&self) -> Vec { + format!("{FLOW_TASK_INFO_KEY_PREFIX}/{}", self.flow_task_id).into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlowTaskInfoKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOW_TASK_INFO_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlowTaskInfoKeyInner '{key}'"), + })?; + // Safety: pass the regex check above + let flow_task_id = captures[1].parse::().unwrap(); + Ok(FlowTaskInfoKeyInner { flow_task_id }) + } +} + +// The metadata of the flow task. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FlowTaskInfoValue { + /// The source tables used by the task. + pub(crate) source_tables: Vec, + /// The sink table used by the task. + pub(crate) sink_table: TableId, + /// Which flow nodes this task is running on. + pub(crate) flownode_ids: BTreeMap, + /// The catalog name. + pub(crate) catalog_name: String, + /// The task name. + pub(crate) task_name: String, + /// The raw sql. + pub(crate) raw_sql: String, + /// The expr of expire. + pub(crate) expire_when: String, + /// The comment. + pub(crate) comment: String, + /// The options. + pub(crate) options: HashMap, +} + +impl FlowTaskInfoValue { + /// Returns the `flownode_id`. + pub fn flownode_ids(&self) -> &BTreeMap { + &self.flownode_ids + } + + /// Returns the `source_table`. + pub fn source_table_ids(&self) -> &[TableId] { + &self.source_tables + } +} + +/// The manager of [FlowTaskInfoKey]. +pub struct FlowTaskInfoManager { + kv_backend: KvBackendRef, +} + +impl FlowTaskInfoManager { + /// Returns a new [FlowTaskInfoManager]. 
+ pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Returns the [FlowTaskInfoValue] of specified `flow_task_id`. + pub async fn get( + &self, + catalog: &str, + flow_task_id: FlowTaskId, + ) -> Result> { + let key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes(); + self.kv_backend + .get(&key) + .await? + .map(|x| FlowTaskInfoValue::try_from_raw_value(&x.value)) + .transpose() + } + + /// Builds a create flow task transaction. + /// It is expected that the `__flow_task/{catalog}/info/{flow_task_id}` wasn't occupied. + /// Otherwise, the transaction will retrieve existing value. + pub(crate) fn build_create_txn( + &self, + catalog: &str, + flow_task_id: FlowTaskId, + flow_task_value: &FlowTaskInfoValue, + ) -> Result<( + Txn, + impl FnOnce( + &mut TxnOpGetResponseSet, + ) -> Result>>, + )> { + let key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes(); + let txn = + txn_helper::build_put_if_absent_txn(key.clone(), flow_task_value.try_as_raw_value()?); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_serialization() { + let flow_task = FlowTaskInfoKey::new("my_catalog".to_string(), 2); + assert_eq!( + b"__flow_task/my_catalog/info/2".to_vec(), + flow_task.to_bytes() + ); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow_task/my_catalog/info/2".to_vec(); + let key = FlowTaskInfoKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.flow_task_id(), 2); + } +} diff --git a/src/common/meta/src/key/flow_task/flow_task_name.rs b/src/common/meta/src/key/flow_task/flow_task_name.rs new file mode 100644 index 0000000000..9828283e64 --- /dev/null +++ b/src/common/meta/src/key/flow_task/flow_task_name.rs @@ -0,0 +1,201 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// 
you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use crate::error::{self, Result}; +use crate::key::flow_task::FlowTaskScoped; +use crate::key::scope::{CatalogScoped, MetaKey}; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::key::{ + txn_helper, DeserializedValueWithBytes, FlowTaskId, TableMetaValue, NAME_PATTERN, +}; +use crate::kv_backend::txn::Txn; +use crate::kv_backend::KvBackendRef; + +const FLOW_TASK_NAME_KEY_PREFIX: &str = "name"; + +lazy_static! { + static ref FLOW_TASK_NAME_KEY_PATTERN: Regex = + Regex::new(&format!("^{FLOW_TASK_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap(); +} + +/// The key of mapping {task_name} to [FlowTaskId]. +/// +/// The layout: `__flow_task/{catalog}/name/{task_name}`. +pub struct FlowTaskNameKey(FlowTaskScoped>); + +impl FlowTaskNameKey { + /// Returns the [FlowTaskNameKey] + pub fn new(catalog: String, task_name: String) -> FlowTaskNameKey { + let inner = FlowTaskNameKeyInner::new(task_name); + FlowTaskNameKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) + } + + /// Returns the catalog. 
+ pub fn catalog(&self) -> &str { + self.0.catalog() + } + + /// Return the `task_name` + pub fn task_name(&self) -> &str { + &self.0.task_name + } +} + +impl MetaKey for FlowTaskNameKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(FlowTaskNameKey(FlowTaskScoped::< + CatalogScoped, + >::from_bytes(bytes)?)) + } +} + +/// The key of mapping name to [FlowTaskId] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FlowTaskNameKeyInner { + pub task_name: String, +} + +impl MetaKey for FlowTaskNameKeyInner { + fn to_bytes(&self) -> Vec { + format!("{FLOW_TASK_NAME_KEY_PREFIX}/{}", self.task_name).into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlowTaskNameKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOW_TASK_NAME_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlowTaskNameKeyInner '{key}'"), + })?; + // Safety: pass the regex check above + let task = captures[1].to_string(); + Ok(FlowTaskNameKeyInner { task_name: task }) + } +} + +impl FlowTaskNameKeyInner { + /// Returns a [FlowTaskNameKeyInner]. + pub fn new(task: String) -> Self { + Self { task_name: task } + } +} + +/// The value of [FlowTaskNameKey]. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub struct FlowTaskNameValue { + flow_task_id: FlowTaskId, +} + +impl FlowTaskNameValue { + /// Returns a [FlowTaskNameValue] with specified [FlowTaskId]. + pub fn new(flow_task_id: FlowTaskId) -> Self { + Self { flow_task_id } + } + + /// Returns the [FlowTaskId] + pub fn flow_task_id(&self) -> FlowTaskId { + self.flow_task_id + } +} + +/// The manager of [FlowTaskNameKey]. 
+pub struct FlowTaskNameManager { + kv_backend: KvBackendRef, +} + +impl FlowTaskNameManager { + /// Returns a new [FlowTaskNameManager]. + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Returns the [FlowTaskNameValue] of specified `catalog.task`. + pub async fn get(&self, catalog: &str, task: &str) -> Result> { + let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()); + let raw_key = key.to_bytes(); + self.kv_backend + .get(&raw_key) + .await? + .map(|x| FlowTaskNameValue::try_from_raw_value(&x.value)) + .transpose() + } + + /// Builds a create flow task name transaction. + /// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied. + /// Otherwise, the transaction will retrieve existing value. + pub fn build_create_txn( + &self, + catalog: &str, + name: &str, + flow_task_id: FlowTaskId, + ) -> Result<( + Txn, + impl FnOnce( + &mut TxnOpGetResponseSet, + ) -> Result>>, + )> { + let key = FlowTaskNameKey::new(catalog.to_string(), name.to_string()); + let raw_key = key.to_bytes(); + let flow_task_name_value = FlowTaskNameValue::new(flow_task_id); + let txn = txn_helper::build_put_if_absent_txn( + raw_key.clone(), + flow_task_name_value.try_as_raw_value()?, + ); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_serialization() { + let table_task_key = FlowTaskNameKey::new("my_catalog".to_string(), "my_task".to_string()); + assert_eq!( + b"__flow_task/my_catalog/name/my_task".to_vec(), + table_task_key.to_bytes(), + ); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow_task/my_catalog/name/my_task".to_vec(); + let key = FlowTaskNameKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.task_name(), "my_task"); + } +} diff --git a/src/common/meta/src/key/flow_task/flownode_task.rs 
b/src/common/meta/src/key/flow_task/flownode_task.rs new file mode 100644 index 0000000000..bacff5326e --- /dev/null +++ b/src/common/meta/src/key/flow_task/flownode_task.rs @@ -0,0 +1,259 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::stream::BoxStream; +use futures::TryStreamExt; +use lazy_static::lazy_static; +use regex::Regex; +use snafu::OptionExt; + +use crate::error::{self, Result}; +use crate::key::flow_task::FlowTaskScoped; +use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; +use crate::key::{FlowTaskId, FlowTaskPartitionId}; +use crate::kv_backend::txn::{Txn, TxnOp}; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; +use crate::FlownodeId; + +lazy_static! { + static ref FLOWNODE_TASK_KEY_PATTERN: Regex = Regex::new(&format!( + "^{FLOWNODE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$" + )) + .unwrap(); +} + +const FLOWNODE_TASK_KEY_PREFIX: &str = "flownode"; + +/// The key of mapping [FlownodeId] to [FlowTaskId]. 
+/// +/// The layout: `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}`. +pub struct FlownodeTaskKey(FlowTaskScoped>); + +impl MetaKey for FlownodeTaskKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(FlownodeTaskKey(FlowTaskScoped::< + CatalogScoped, + >::from_bytes(bytes)?)) + } +} + +impl FlownodeTaskKey { + /// Returns a new [FlownodeTaskKey]. + pub fn new( + catalog: String, + flownode_id: FlownodeId, + flow_task_id: FlowTaskId, + partition_id: FlowTaskPartitionId, + ) -> FlownodeTaskKey { + let inner = FlownodeTaskKeyInner::new(flownode_id, flow_task_id, partition_id); + FlownodeTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) + } + + /// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`. + pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec { + let catalog_scoped_key = CatalogScoped::new( + catalog, + BytesAdapter::from(FlownodeTaskKeyInner::range_start_key(flownode_id).into_bytes()), + ); + + FlowTaskScoped::new(catalog_scoped_key).to_bytes() + } + + /// Returns the catalog. + pub fn catalog(&self) -> &str { + self.0.catalog() + } + + /// Returns the [FlowTaskId]. + pub fn flow_task_id(&self) -> FlowTaskId { + self.0.flow_task_id + } + + /// Returns the [FlownodeId]. + pub fn flownode_id(&self) -> FlownodeId { + self.0.flownode_id + } + + /// Returns the [FlowTaskPartitionId]. + pub fn partition_id(&self) -> FlowTaskPartitionId { + self.0.partition_id + } +} + +/// The catalog-unscoped part of [FlownodeTaskKey], encoded as `flownode/{flownode_id}/{flow_task_id}/{partition_id}`. +pub struct FlownodeTaskKeyInner { + flownode_id: FlownodeId, + flow_task_id: FlowTaskId, + partition_id: FlowTaskPartitionId, +} + +impl FlownodeTaskKeyInner { + /// Returns a [FlownodeTaskKeyInner] with the specified `flownode_id`, `flow_task_id` and `partition_id`. 
+ pub fn new( + flownode_id: FlownodeId, + flow_task_id: FlowTaskId, + partition_id: FlowTaskPartitionId, + ) -> Self { + Self { + flownode_id, + flow_task_id, + partition_id, + } + } + + fn prefix(flownode_id: FlownodeId) -> String { + format!("{}/{flownode_id}", FLOWNODE_TASK_KEY_PREFIX) + } + + /// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`. + fn range_start_key(flownode_id: FlownodeId) -> String { + format!("{}/", Self::prefix(flownode_id)) + } +} + +impl MetaKey for FlownodeTaskKeyInner { + fn to_bytes(&self) -> Vec { + format!( + "{FLOWNODE_TASK_KEY_PREFIX}/{}/{}/{}", + self.flownode_id, self.flow_task_id, self.partition_id, + ) + .into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlownodeTaskKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOWNODE_TASK_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlownodeTaskKeyInner '{key}'"), + })?; + // NOTE(review): the regex only guarantees non-empty ASCII digit runs; a run too large for the target integer type still fails to parse, so these `unwrap`s can panic. + let flownode_id = captures[1].parse::().unwrap(); + let flow_task_id = captures[2].parse::().unwrap(); + let partition_id = captures[3].parse::().unwrap(); + + Ok(FlownodeTaskKeyInner { + flownode_id, + flow_task_id, + partition_id, + }) + } +} + +/// The manager of [FlownodeTaskKey]. +pub struct FlownodeTaskManager { + kv_backend: KvBackendRef, +} + +/// Decodes `KeyValue` to [FlownodeTaskKey]. +pub fn flownode_task_key_decoder(kv: KeyValue) -> Result { + FlownodeTaskKey::from_bytes(&kv.key) +} + +impl FlownodeTaskManager { + /// Returns a new [FlownodeTaskManager]. 
+ pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Retrieves all [FlowTaskId] and [FlowTaskPartitionId]s of the specified `flownode_id`. 
+ pub fn tasks( + &self, + catalog: &str, + flownode_id: FlownodeId, + ) -> BoxStream<'static, Result<(FlowTaskId, FlowTaskPartitionId)>> { + let start_key = FlownodeTaskKey::range_start_key(catalog.to_string(), flownode_id); + let req = RangeRequest::new().with_prefix(start_key); + + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + Arc::new(flownode_task_key_decoder), + ); + + Box::pin(stream.map_ok(|key| (key.flow_task_id(), key.partition_id()))) + } + + /// Builds a create flownode task transaction. + /// + /// Puts `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}` keys with empty values. + pub(crate) fn build_create_txn>( + &self, + catalog: &str, + flow_task_id: FlowTaskId, + flownode_ids: I, + ) -> Txn { + let txns = flownode_ids + .into_iter() + .map(|(partition_id, flownode_id)| { + let key = FlownodeTaskKey::new( + catalog.to_string(), + flownode_id, + flow_task_id, + partition_id, + ) + .to_bytes(); + TxnOp::Put(key, vec![]) + }) + .collect::>(); + + Txn::new().and_then(txns) + } +} + +#[cfg(test)] +mod tests { + use crate::key::flow_task::flownode_task::FlownodeTaskKey; + use crate::key::scope::MetaKey; + + #[test] + fn test_key_serialization() { + let flownode_task = FlownodeTaskKey::new("my_catalog".to_string(), 1, 2, 0); + assert_eq!( + b"__flow_task/my_catalog/flownode/1/2/0".to_vec(), + flownode_task.to_bytes() + ); + let prefix = FlownodeTaskKey::range_start_key("my_catalog".to_string(), 1); + assert_eq!(b"__flow_task/my_catalog/flownode/1/".to_vec(), prefix); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow_task/my_catalog/flownode/1/2/0".to_vec(); + let key = FlownodeTaskKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.flownode_id(), 1); + assert_eq!(key.flow_task_id(), 2); + assert_eq!(key.partition_id(), 0); + } +} diff --git a/src/common/meta/src/key/flow_task/table_task.rs 
b/src/common/meta/src/key/flow_task/table_task.rs new file mode 
100644 index 0000000000..dd0d34adcf --- /dev/null +++ b/src/common/meta/src/key/flow_task/table_task.rs @@ -0,0 +1,279 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::stream::BoxStream; +use lazy_static::lazy_static; +use regex::Regex; +use snafu::OptionExt; +use table::metadata::TableId; + +use crate::error::{self, Result}; +use crate::key::flow_task::FlowTaskScoped; +use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; +use crate::key::{FlowTaskId, FlowTaskPartitionId}; +use crate::kv_backend::txn::{Txn, TxnOp}; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; +use crate::FlownodeId; + +const TABLE_TASK_KEY_PREFIX: &str = "source_table"; + +lazy_static! { + static ref TABLE_TASK_KEY_PATTERN: Regex = Regex::new(&format!( + "^{TABLE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$" + )) + .unwrap(); +} + +/// The catalog-unscoped part of [TableTaskKey], encoded as `source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct TableTaskKeyInner { + table_id: TableId, + flownode_id: FlownodeId, + flow_task_id: FlowTaskId, + partition_id: FlowTaskPartitionId, +} + +/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId]. +/// +/// The layout: `__flow_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`. 
+#[derive(Debug, PartialEq)] +pub struct TableTaskKey(FlowTaskScoped>); + +impl MetaKey for TableTaskKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(TableTaskKey(FlowTaskScoped::< + CatalogScoped, + >::from_bytes(bytes)?)) + } +} + +impl TableTaskKey { + /// Returns a new [TableTaskKey]. + pub fn new( + catalog: String, + table_id: TableId, + flownode_id: FlownodeId, + flow_task_id: FlowTaskId, + partition_id: FlowTaskPartitionId, + ) -> TableTaskKey { + let inner = TableTaskKeyInner::new(table_id, flownode_id, flow_task_id, partition_id); + TableTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) + } + + /// The prefix used to retrieve all [TableTaskKey]s with the specified `table_id`. + pub fn range_start_key(catalog: String, table_id: TableId) -> Vec { + let catalog_scoped_key = CatalogScoped::new( + catalog, + BytesAdapter::from(TableTaskKeyInner::range_start_key(table_id).into_bytes()), + ); + + FlowTaskScoped::new(catalog_scoped_key).to_bytes() + } + + /// Returns the catalog. + pub fn catalog(&self) -> &str { + self.0.catalog() + } + + /// Returns the source [TableId]. + pub fn source_table_id(&self) -> TableId { + self.0.table_id + } + + /// Returns the [FlowTaskId]. + pub fn flow_task_id(&self) -> FlowTaskId { + self.0.flow_task_id + } + + /// Returns the [FlownodeId]. + pub fn flownode_id(&self) -> FlownodeId { + self.0.flownode_id + } + + /// Returns the [FlowTaskPartitionId]. + pub fn partition_id(&self) -> FlowTaskPartitionId { + self.0.partition_id + } +} + +impl TableTaskKeyInner { + /// Returns a new [TableTaskKeyInner]. 
+ fn new( + table_id: TableId, + flownode_id: FlownodeId, + flow_task_id: FlowTaskId, + partition_id: FlowTaskPartitionId, + ) -> TableTaskKeyInner { + Self { + table_id, + flownode_id, + flow_task_id, + partition_id, + } + } + + fn prefix(table_id: TableId) -> String { + format!("{}/{table_id}", TABLE_TASK_KEY_PREFIX) + } + + /// The prefix used to retrieve all [TableTaskKey]s with the specified `table_id`. + fn range_start_key(table_id: TableId) -> String { + format!("{}/", Self::prefix(table_id)) + } +} + +impl MetaKey for TableTaskKeyInner { + fn to_bytes(&self) -> Vec { + format!( + "{TABLE_TASK_KEY_PREFIX}/{}/{}/{}/{}", + self.table_id, self.flownode_id, self.flow_task_id, self.partition_id + ) + .into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "TableTaskKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + TABLE_TASK_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableTaskKeyInner '{key}'"), + })?; + // NOTE(review): the regex only guarantees non-empty ASCII digit runs; a run too large for the target integer type still fails to parse, so these `unwrap`s can panic. + let table_id = captures[1].parse::().unwrap(); + let flownode_id = captures[2].parse::().unwrap(); + let flow_task_id = captures[3].parse::().unwrap(); + let partition_id = captures[4].parse::().unwrap(); + Ok(TableTaskKeyInner::new( + table_id, + flownode_id, + flow_task_id, + partition_id, + )) + } +} + +/// Decodes `KeyValue` to [TableTaskKey]. +pub fn table_task_decoder(kv: KeyValue) -> Result { + TableTaskKey::from_bytes(&kv.key) +} + +/// The manager of [TableTaskKey]. +pub struct TableTaskManager { + kv_backend: KvBackendRef, +} + +impl TableTaskManager { + /// Returns a new [TableTaskManager]. 
+ pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Retrieves all [TableTaskKey]s of the specified `table_id`. 
+ pub fn nodes( + &self, + catalog: &str, + table_id: TableId, + ) -> BoxStream<'static, Result> { + let start_key = TableTaskKey::range_start_key(catalog.to_string(), table_id); + let req = RangeRequest::new().with_prefix(start_key); + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + Arc::new(table_task_decoder), + ); + + Box::pin(stream) + } + + /// Builds a create table task transaction. + /// + /// Puts `__flow_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}` keys with empty values. + pub fn build_create_txn>( + &self, + catalog: &str, + flow_task_id: FlowTaskId, + flownode_ids: I, + source_table_ids: &[TableId], + ) -> Txn { + let txns = flownode_ids + .into_iter() + .flat_map(|(partition_id, flownode_id)| { + source_table_ids.iter().map(move |table_id| { + TxnOp::Put( + TableTaskKey::new( + catalog.to_string(), + *table_id, + flownode_id, + flow_task_id, + partition_id, + ) + .to_bytes(), + vec![], + ) + }) + }) + .collect::>(); + + Txn::new().and_then(txns) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_serialization() { + let table_task_key = TableTaskKey::new("my_catalog".to_string(), 1024, 1, 2, 0); + assert_eq!( + b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec(), + table_task_key.to_bytes(), + ); + let prefix = TableTaskKey::range_start_key("my_catalog".to_string(), 1024); + assert_eq!( + b"__flow_task/my_catalog/source_table/1024/".to_vec(), + prefix + ); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec(); + let key = TableTaskKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.source_table_id(), 1024); + assert_eq!(key.flownode_id(), 1); + assert_eq!(key.flow_task_id(), 2); + assert_eq!(key.partition_id(), 0); + } +} diff --git a/src/common/meta/src/key/scope.rs b/src/common/meta/src/key/scope.rs new file mode 100644 index 0000000000..7f185a81d3 --- /dev/null +++ 
b/src/common/meta/src/key/scope.rs @@ -0,0 +1,152 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; + +use snafu::OptionExt; + +use crate::error::{self, Result}; + +/// The delimiter of key. +pub(crate) const DELIMITER: u8 = b'/'; + +/// A metadata key that can be encoded to and decoded from raw bytes. +pub trait MetaKey { + fn to_bytes(&self) -> Vec; + + fn from_bytes(bytes: &[u8]) -> Result; +} + +/// The key of `{catalog}/` scope. +#[derive(Debug, PartialEq)] +pub struct CatalogScoped { + inner: T, + catalog: String, +} + +impl Deref for CatalogScoped { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl CatalogScoped { + /// Returns a new [CatalogScoped] key. + pub fn new(catalog: String, inner: T) -> CatalogScoped { + CatalogScoped { inner, catalog } + } + + /// Returns the `catalog`. 
+ pub fn catalog(&self) -> &str { + &self.catalog + } +} + +impl> MetaKey> for CatalogScoped { + fn to_bytes(&self) -> Vec { + let prefix = self.catalog.as_bytes(); + let inner = self.inner.to_bytes(); + let mut bytes = Vec::with_capacity(prefix.len() + inner.len() + 1); + bytes.extend(prefix); + bytes.push(DELIMITER); + bytes.extend(inner); + bytes + } + + fn from_bytes(bytes: &[u8]) -> Result> { + let pos = bytes + .iter() + .position(|c| *c == DELIMITER) + .with_context(|| error::DelimiterNotFoundSnafu { + key: String::from_utf8_lossy(bytes), + })?; + let catalog = String::from_utf8_lossy(&bytes[0..pos]).to_string(); + // Skips the `DELIMITER` byte; `pos < bytes.len()`, so slicing from `pos + 1` cannot panic. + let inner = T::from_bytes(&bytes[pos + 1..])?; + Ok(CatalogScoped { inner, catalog }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct BytesAdapter(Vec); + +impl From> for BytesAdapter { + fn from(value: Vec) -> Self { + Self(value) + } +} + +impl MetaKey for BytesAdapter { + fn to_bytes(&self) -> Vec { + self.0.clone() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(BytesAdapter(bytes.to_vec())) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use super::*; + use crate::error::Result; + + #[derive(Debug)] + struct MockKey { + inner: Vec, + } + + impl MetaKey for MockKey { + fn to_bytes(&self) -> Vec { + self.inner.clone() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(MockKey { + inner: bytes.to_vec(), + }) + } + } + + #[test] + fn test_catalog_scoped_from_bytes() { + let key = "test_catalog_name/key"; + let scoped_key = CatalogScoped::::from_bytes(key.as_bytes()).unwrap(); + assert_eq!(scoped_key.catalog, "test_catalog_name"); + assert_eq!(scoped_key.inner.inner, b"key".to_vec()); + assert_eq!(key.as_bytes(), &scoped_key.to_bytes()); + } + + #[test] + fn test_catalog_scoped_from_bytes_delimiter_not_found() { + let key = "test_catalog_name"; + let err = CatalogScoped::::from_bytes(key.as_bytes()).unwrap_err(); + assert_matches!(err, 
error::Error::DelimiterNotFound { .. }); + } + + #[test] + fn test_catalog_scoped_to_bytes() { + let scoped_key = CatalogScoped { + inner: MockKey { + inner: b"hi".to_vec(), + }, + catalog: "test_catalog".to_string(), + }; + assert_eq!(b"test_catalog/hi".to_vec(), scoped_key.to_bytes()); + } +} diff --git a/src/common/meta/src/key/tombstone.rs b/src/common/meta/src/key/tombstone.rs index 38648f2695..9aa2dd69ee 100644 --- a/src/common/meta/src/key/tombstone.rs +++ b/src/common/meta/src/key/tombstone.rs @@ -31,12 +31,6 @@ pub(crate) struct TombstoneManager { const TOMBSTONE_PREFIX: &str = "__tombstone/"; -pub(crate) struct TombstoneKeyValue { - pub(crate) origin_key: Vec, - pub(crate) tombstone_key: Vec, - pub(crate) value: Vec, -} - fn to_tombstone(key: &[u8]) -> Vec { [TOMBSTONE_PREFIX.as_bytes(), key].concat() } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 655e6d27c1..8aa8c8abec 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -43,7 +43,11 @@ pub mod test_util; pub mod util; pub mod wal_options_allocator; +/// The id of the cluster. pub type ClusterId = u64; +/// The id of the datanode. pub type DatanodeId = u64; +/// The id of the flownode. +pub type FlownodeId = u64; pub use instruction::RegionIdent;