feat: implement FlowTaskMetadataManager (#3766)

* feat: implement `FlowMetadataManager`

* chore: remove dead code

* refactor: change `sink_tables` to `sink_table`

* refactor: add `PartitionId`

* feat: implement FlowTaskNameManager

* refactor: update doc of keys

* fix: return partition id in `tasks`

* refactor: rename to `FlowTaskId`

* chore: add comments

* chore: add `task_id` in `TaskAlreadyExists`

* chore: add comments

* fix: fmt

* refactor: simplify the decoder

* chore: update comments

* feat: implement `FlowTaskScoped` and `CatalogScoped`

* refactor: refactor flow task keys

* refactor: remove metadata mod

* refactor: rename to `FlowTaskInfo`

* chore: add comments

* refactor: rename to `FlowTaskMetadataManager`

* chore: remove dead code

* Apply suggestions from code review

* chore: change to `pub(crate)`

* chore: apply suggestions from CR

* fix: fix fmt

* chore: fmt doc
This commit is contained in:
Weny Xu
2024-04-25 19:59:24 +08:00
committed by GitHub
parent 2d0f493040
commit 9206f60b28
10 changed files with 1606 additions and 16 deletions

View File

@@ -23,6 +23,7 @@ use snafu::{Location, Snafu};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::key::FlowTaskId;
use crate::peer::Peer;
use crate::DatanodeId;
@@ -241,6 +242,17 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Task already exists, task: {}, flow_task_id: {}",
task_name,
flow_task_id
))]
TaskAlreadyExists {
task_name: String,
flow_task_id: FlowTaskId,
location: Location,
},
#[snafu(display("Catalog already exists, catalog: {}", catalog))]
CatalogAlreadyExists { catalog: String, location: Location },
@@ -421,6 +433,16 @@ pub enum Error {
#[snafu(display("Invalid role: {}", role))]
InvalidRole { role: i32, location: Location },
#[snafu(display("Delimiter not found, key: {}", key))]
DelimiterNotFound { key: String, location: Location },
#[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
MismatchPrefix {
prefix: String,
key: String,
location: Location,
},
#[snafu(display("Failed to move values: {err_msg}"))]
MoveValues { err_msg: String, location: Location },
@@ -494,7 +516,10 @@ impl ErrorExt for Error {
| EmptyKey { .. }
| InvalidEngineType { .. }
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments,
| CreateLogicalTablesInvalidArguments { .. }
| TaskAlreadyExists { .. }
| MismatchPrefix { .. }
| DelimiterNotFound { .. } => StatusCode::InvalidArguments,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,

View File

@@ -36,16 +36,57 @@
//! - The value is a [TableNameValue] struct; it contains the table id.
//! - Used in the table name to table id lookup.
//!
//! 6. Flow task info key: `__flow_task/{catalog}/info/{flow_task_id}`
//! - Stores metadata of the flow task.
//!
//! 7. Flow task name key: `__flow_task/{catalog}/name/{task_name}`
//! - Mapping {catalog}/{task_name} to {flow_task_id}
//!
//! 8. Flownode task key: `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}`
//! - Mapping {flownode_id} to {flow_task_id}
//!
//! 9. Table task key: `__table_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//!
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
//!
//! To simplify the managers used in struct fields and function parameters, we define a "unify"
//! table metadata manager: [TableMetadataManager]. It contains all the managers defined above.
//! It's recommended to just use this manager only.
//! To simplify the managers used in struct fields and function parameters, we define "unify"
//! table metadata manager: [TableMetadataManager]
//! and flow task metadata manager: [FlowTaskMetadataManager](crate::key::flow_task::FlowTaskMetadataManager).
//! It contains all the managers defined above. It's recommended to just use this manager only.
//!
//! The whole picture of flow task keys will be like this:
//!
//! __flow_task/
//! {catalog}/
//! info/
//! {task_id}
//!
//! name/
//! {task_name}
//!
//! flownode/
//! {flownode_id}/
//! {task_id}/
//! {partition_id}
//!
//! source_table/
//! flow_task/
//! {table_id}/
//! {flownode_id}/
//! {task_id}/
//! {partition_id}
pub mod catalog_name;
pub mod datanode_table;
// TODO(weny): removes it.
#[allow(unused)]
pub mod flow_task;
pub mod schema_name;
pub mod scope;
pub mod table_info;
pub mod table_name;
// TODO(weny): removes it.
@@ -56,10 +97,8 @@ pub mod table_region;
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
// TODO(weny): remove it.
#[allow(dead_code)]
mod tombstone;
mod txn_helper;
pub(crate) mod txn_helper;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
@@ -84,6 +123,8 @@ use table_name::{TableNameKey, TableNameManager, TableNameValue};
use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
use self::datanode_table::RegionInfo;
use self::flow_task::flow_task_info::FlowTaskInfoValue;
use self::flow_task::flow_task_name::FlowTaskNameValue;
use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use self::tombstone::TombstoneManager;
@@ -103,7 +144,6 @@ pub const MAINTENANCE_KEY: &str = "maintenance";
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
@@ -119,9 +159,14 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
/// The id of flow task.
pub type FlowTaskId = u32;
/// The partition of flow task.
pub type FlowTaskPartitionId = u32;
lazy_static! {
static ref DATANODE_TABLE_KEY_PATTERN: Regex =
Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9])/([0-9])$")).unwrap();
Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}
lazy_static! {
@@ -199,6 +244,7 @@ pub struct TableMetadataManager {
kv_backend: KvBackendRef,
}
#[macro_export]
macro_rules! ensure_values {
($got:expr, $expected_value:expr, $name:expr) => {
ensure!(
@@ -1007,7 +1053,9 @@ macro_rules! impl_optional_meta_value {
impl_table_meta_value! {
TableNameValue,
TableInfoValue,
DatanodeTableValue
DatanodeTableValue,
FlowTaskInfoValue,
FlowTaskNameValue
}
impl_optional_meta_value! {

View File

@@ -0,0 +1,407 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod flow_task_info;
pub(crate) mod flow_task_name;
pub(crate) mod flownode_task;
pub(crate) mod table_task;
use std::ops::Deref;
use common_telemetry::info;
use snafu::{ensure, OptionExt};
use self::flow_task_info::FlowTaskInfoValue;
use crate::ensure_values;
use crate::error::{self, Result};
use crate::key::flow_task::flow_task_info::FlowTaskInfoManager;
use crate::key::flow_task::flow_task_name::FlowTaskNameManager;
use crate::key::flow_task::flownode_task::FlownodeTaskManager;
use crate::key::flow_task::table_task::TableTaskManager;
use crate::key::scope::MetaKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::FlowTaskId;
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
/// A key wrapper that places the wrapped key under the `__flow_task/` scope.
///
/// The wrapper dereferences to the wrapped key, so the wrapped key's fields and
/// methods can be reached directly through the scoped key.
#[derive(Debug, PartialEq)]
pub struct FlowTaskScoped<T> {
    inner: T,
}

impl<T> Deref for FlowTaskScoped<T> {
    type Target = T;

    fn deref(&self) -> &T {
        let Self { inner } = self;
        inner
    }
}

impl<T> FlowTaskScoped<T> {
    /// The textual prefix of the scope.
    const PREFIX: &'static str = "__flow_task/";

    /// Wraps `inner` into a new [FlowTaskScoped] key.
    pub fn new(inner: T) -> FlowTaskScoped<T> {
        FlowTaskScoped { inner }
    }
}
impl<T: MetaKey<T>> MetaKey<FlowTaskScoped<T>> for FlowTaskScoped<T> {
    /// Encodes the key as the `__flow_task/` prefix followed by the encoded inner key.
    fn to_bytes(&self) -> Vec<u8> {
        let scope = Self::PREFIX.as_bytes();
        let suffix = self.inner.to_bytes();
        [scope, suffix.as_slice()].concat()
    }

    /// Decodes a scoped key from `bytes`.
    ///
    /// Fails with `MismatchPrefix` when `bytes` does not start with the scope prefix;
    /// otherwise the remaining bytes are handed to the inner key's decoder.
    fn from_bytes(bytes: &[u8]) -> Result<FlowTaskScoped<T>> {
        let scope = Self::PREFIX.as_bytes();
        let in_scope = bytes.starts_with(scope);
        ensure!(
            in_scope,
            error::MismatchPrefixSnafu {
                prefix: String::from_utf8_lossy(scope),
                key: String::from_utf8_lossy(bytes),
            }
        );
        let (_, rest) = bytes.split_at(scope.len());
        T::from_bytes(rest).map(FlowTaskScoped::new)
    }
}
/// The manager of metadata, provides ability to:
/// - Create metadata of the task.
/// - Retrieve metadata of the task.
/// - Delete metadata of the task.
pub struct FlowTaskMetadataManager {
flow_task_manager: FlowTaskInfoManager,
flownode_task_manager: FlownodeTaskManager,
table_task_manager: TableTaskManager,
flow_task_name_manager: FlowTaskNameManager,
kv_backend: KvBackendRef,
}
impl FlowTaskMetadataManager {
/// Returns a new [FlowTaskMetadataManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
flow_task_manager: FlowTaskInfoManager::new(kv_backend.clone()),
flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()),
flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()),
table_task_manager: TableTaskManager::new(kv_backend.clone()),
kv_backend,
}
}
/// Returns the [FlowTaskManager].
pub fn flow_task_manager(&self) -> &FlowTaskInfoManager {
&self.flow_task_manager
}
/// Returns the [FlownodeTaskManager].
pub fn flownode_task_manager(&self) -> &FlownodeTaskManager {
&self.flownode_task_manager
}
/// Returns the [TableTaskManager].
pub fn table_task_manager(&self) -> &TableTaskManager {
&self.table_task_manager
}
/// Creates metadata for task and returns an error if different metadata exists.
pub async fn create_flow_task_metadata(
&self,
flow_task_id: FlowTaskId,
flow_task_value: FlowTaskInfoValue,
) -> Result<()> {
let (create_flow_task_name_txn, on_create_flow_task_name_failure) =
self.flow_task_name_manager.build_create_txn(
&flow_task_value.catalog_name,
&flow_task_value.task_name,
flow_task_id,
)?;
let (create_flow_task_txn, on_create_flow_task_failure) =
self.flow_task_manager.build_create_txn(
&flow_task_value.catalog_name,
flow_task_id,
&flow_task_value,
)?;
let create_flownode_task_txn = self.flownode_task_manager.build_create_txn(
&flow_task_value.catalog_name,
flow_task_id,
flow_task_value.flownode_ids().clone(),
);
let create_table_task_txn = self.table_task_manager.build_create_txn(
&flow_task_value.catalog_name,
flow_task_id,
flow_task_value.flownode_ids().clone(),
flow_task_value.source_table_ids(),
);
let txn = Txn::merge_all(vec![
create_flow_task_name_txn,
create_flow_task_txn,
create_flownode_task_txn,
create_table_task_txn,
]);
info!(
"Creating flow task {}.{}({}), with {} txn operations",
flow_task_value.catalog_name,
flow_task_value.task_name,
flow_task_id,
txn.max_operations()
);
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_flow_task_name = on_create_flow_task_name_failure(&mut set)?
.with_context(||error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow task name during the creating flow task, flow_task_id: {flow_task_id}"
),
})?;
if remote_flow_task_name.flow_task_id() != flow_task_id {
info!(
"Trying to create flow task {}.{}({}), but flow task({}) already exists",
flow_task_value.catalog_name,
flow_task_value.task_name,
flow_task_id,
remote_flow_task_name.flow_task_id()
);
return error::TaskAlreadyExistsSnafu {
task_name: format!(
"{}.{}",
flow_task_value.catalog_name, flow_task_value.task_name
),
flow_task_id,
}
.fail();
}
let remote_flow_task = on_create_flow_task_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow task during the creating flow task, flow_task_id: {flow_task_id}"
),
}
})?;
let op_name = "creating flow task";
ensure_values!(*remote_flow_task, flow_task_value, op_name);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use futures::TryStreamExt;

    use super::*;
    use crate::key::flow_task::table_task::TableTaskKey;
    use crate::key::scope::CatalogScoped;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// A key that stores its bytes verbatim, used to observe what the scopes add.
    #[derive(Debug)]
    struct MockKey {
        inner: Vec<u8>,
    }

    impl MetaKey<MockKey> for MockKey {
        fn to_bytes(&self) -> Vec<u8> {
            self.inner.to_vec()
        }

        fn from_bytes(bytes: &[u8]) -> Result<MockKey> {
            let inner = Vec::from(bytes);
            Ok(MockKey { inner })
        }
    }

    /// Builds the flow task value shared by the metadata tests.
    fn new_flow_task_value() -> FlowTaskInfoValue {
        FlowTaskInfoValue {
            catalog_name: "greptime".to_string(),
            task_name: "task".to_string(),
            source_tables: vec![1024, 1025, 1026],
            sink_table: 2049,
            flownode_ids: [(0, 1u64)].into(),
            raw_sql: "raw".to_string(),
            expire_when: "expr".to_string(),
            comment: "hi".to_string(),
            options: Default::default(),
        }
    }

    #[test]
    fn test_flow_scoped_to_bytes() {
        let mock = MockKey {
            inner: b"hi".to_vec(),
        };
        let key = FlowTaskScoped::new(CatalogScoped::new("my_catalog".to_string(), mock));
        assert_eq!(b"__flow_task/my_catalog/hi".to_vec(), key.to_bytes());
    }

    #[test]
    fn test_flow_scoped_from_bytes() {
        let key =
            FlowTaskScoped::<CatalogScoped<MockKey>>::from_bytes(b"__flow_task/my_catalog/hi")
                .unwrap();
        assert_eq!(key.catalog(), "my_catalog");
        assert_eq!(key.inner.inner, b"hi".to_vec());
    }

    #[test]
    fn test_flow_scoped_from_bytes_mismatch() {
        let result = FlowTaskScoped::<CatalogScoped<MockKey>>::from_bytes(b"__table/my_catalog/hi");
        let err = result.unwrap_err();
        assert_matches!(err, error::Error::MismatchPrefix { .. });
    }

    #[tokio::test]
    async fn test_create_flow_metadata() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone());
        let task_id = 10;
        let catalog_name = "greptime";
        let flow_task_value = new_flow_task_value();

        // Creating twice with the same id and value must succeed both times.
        for _ in 0..2 {
            flow_metadata_manager
                .create_flow_task_metadata(task_id, flow_task_value.clone())
                .await
                .unwrap();
        }

        let got = flow_metadata_manager
            .flow_task_manager()
            .get(catalog_name, task_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(got, flow_task_value);

        let tasks = flow_metadata_manager
            .flownode_task_manager()
            .tasks(catalog_name, 1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(tasks, vec![(task_id, 0)]);

        for table_id in [1024, 1025, 1026] {
            let nodes = flow_metadata_manager
                .table_task_manager()
                .nodes(catalog_name, table_id)
                .try_collect::<Vec<_>>()
                .await
                .unwrap();
            let expected = TableTaskKey::new(catalog_name.to_string(), table_id, 1, task_id, 0);
            assert_eq!(nodes, vec![expected]);
        }
    }

    #[tokio::test]
    async fn test_create_table_metadata_task_exists_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv);
        let task_id = 10;
        flow_metadata_manager
            .create_flow_task_metadata(task_id, new_flow_task_value())
            .await
            .unwrap();

        // Same catalog and task name, but a different flow task id.
        let err = flow_metadata_manager
            .create_flow_task_metadata(task_id + 1, new_flow_task_value())
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::TaskAlreadyExists { .. });
    }

    #[tokio::test]
    async fn test_create_table_metadata_unexpected_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv);
        let task_id = 10;
        flow_metadata_manager
            .create_flow_task_metadata(task_id, new_flow_task_value())
            .await
            .unwrap();

        // Same flow task id, but the value differs in `sink_table`.
        let conflicting = FlowTaskInfoValue {
            sink_table: 2048,
            ..new_flow_task_value()
        };
        let err = flow_metadata_manager
            .create_flow_task_metadata(task_id, conflicting)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Reads the different value"));
    }
}

View File

@@ -0,0 +1,221 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowTaskId, FlowTaskPartitionId, TableMetaValue,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::FlownodeId;
// The path segment that introduces the flow task id in a flow task info key.
const FLOW_TASK_INFO_KEY_PREFIX: &str = "info";
lazy_static! {
// Matches `info/{flow_task_id}`; the single capture group holds the decimal digits of the id.
static ref FLOW_TASK_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_TASK_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}
/// The key under which the metadata of a flow task is stored.
///
/// The layout: `__flow_task/{catalog}/info/{flow_task_id}`.
pub struct FlowTaskInfoKey(FlowTaskScoped<CatalogScoped<FlowTaskInfoKeyInner>>);

impl MetaKey<FlowTaskInfoKey> for FlowTaskInfoKey {
    fn to_bytes(&self) -> Vec<u8> {
        let Self(scoped) = self;
        scoped.to_bytes()
    }

    fn from_bytes(bytes: &[u8]) -> Result<FlowTaskInfoKey> {
        let scoped = FlowTaskScoped::<CatalogScoped<FlowTaskInfoKeyInner>>::from_bytes(bytes)?;
        Ok(FlowTaskInfoKey(scoped))
    }
}

impl FlowTaskInfoKey {
    /// Builds the [FlowTaskInfoKey] of `flow_task_id` inside `catalog`.
    pub fn new(catalog: String, flow_task_id: FlowTaskId) -> FlowTaskInfoKey {
        let catalog_scoped = CatalogScoped::new(catalog, FlowTaskInfoKeyInner::new(flow_task_id));
        FlowTaskInfoKey(FlowTaskScoped::new(catalog_scoped))
    }

    /// The catalog this key belongs to.
    pub fn catalog(&self) -> &str {
        let Self(scoped) = self;
        scoped.catalog()
    }

    /// The [FlowTaskId] this key points at.
    pub fn flow_task_id(&self) -> FlowTaskId {
        let Self(scoped) = self;
        scoped.flow_task_id
    }
}
/// The key of flow task metadata.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowTaskInfoKeyInner {
flow_task_id: FlowTaskId,
}
impl FlowTaskInfoKeyInner {
/// Returns a [FlowTaskInfoKey] with the specified `flow_task_id`.
pub fn new(flow_task_id: FlowTaskId) -> FlowTaskInfoKeyInner {
FlowTaskInfoKeyInner { flow_task_id }
}
}
impl MetaKey<FlowTaskInfoKeyInner> for FlowTaskInfoKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!("{FLOW_TASK_INFO_KEY_PREFIX}/{}", self.flow_task_id).into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowTaskInfoKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlowTaskInfoKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOW_TASK_INFO_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlowTaskInfoKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flow_task_id = captures[1].parse::<FlowTaskId>().unwrap();
Ok(FlowTaskInfoKeyInner { flow_task_id })
}
}
/// The metadata of the flow task.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`; presumably the fields are
/// part of the persisted format, so confirm before renaming or reordering them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowTaskInfoValue {
/// The source tables used by the task.
pub(crate) source_tables: Vec<TableId>,
/// The sink table used by the task.
pub(crate) sink_table: TableId,
/// Which flow nodes this task is running on, keyed by the partition id.
pub(crate) flownode_ids: BTreeMap<FlowTaskPartitionId, FlownodeId>,
/// The catalog name.
pub(crate) catalog_name: String,
/// The task name.
pub(crate) task_name: String,
/// The raw sql.
pub(crate) raw_sql: String,
/// The expr of expire.
pub(crate) expire_when: String,
/// The comment.
pub(crate) comment: String,
/// The options.
pub(crate) options: HashMap<String, String>,
}
impl FlowTaskInfoValue {
/// Returns the mapping from partition id to flownode id.
pub fn flownode_ids(&self) -> &BTreeMap<FlowTaskPartitionId, FlownodeId> {
&self.flownode_ids
}
/// Returns the ids of the source tables.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_tables
}
}
/// Reads and writes the values stored under [FlowTaskInfoKey]s.
pub struct FlowTaskInfoManager {
    kv_backend: KvBackendRef,
}

impl FlowTaskInfoManager {
    /// Creates a [FlowTaskInfoManager] on top of `kv_backend`.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        FlowTaskInfoManager { kv_backend }
    }

    /// Looks up the [FlowTaskInfoValue] of `flow_task_id` in `catalog`.
    ///
    /// Returns `Ok(None)` when the backend holds no value for the key.
    pub async fn get(
        &self,
        catalog: &str,
        flow_task_id: FlowTaskId,
    ) -> Result<Option<FlowTaskInfoValue>> {
        let raw_key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes();
        match self.kv_backend.get(&raw_key).await? {
            Some(kv) => FlowTaskInfoValue::try_from_raw_value(&kv.value).map(Some),
            None => Ok(None),
        }
    }

    /// Builds a create flow task transaction.
    /// It is expected that the `__flow_task/{catalog}/info/{flow_task_id}` wasn't occupied.
    /// Otherwise, the transaction will retrieve existing value.
    ///
    /// The returned closure decodes that existing value from the transaction response.
    pub(crate) fn build_create_txn(
        &self,
        catalog: &str,
        flow_task_id: FlowTaskId,
        flow_task_value: &FlowTaskInfoValue,
    ) -> Result<(
        Txn,
        impl FnOnce(
            &mut TxnOpGetResponseSet,
        ) -> Result<Option<DeserializedValueWithBytes<FlowTaskInfoValue>>>,
    )> {
        let raw_key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes();
        let raw_value = flow_task_value.try_as_raw_value()?;
        let txn = txn_helper::build_put_if_absent_txn(raw_key.clone(), raw_value);
        let decoder = TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key));

        Ok((txn, decoder))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The encoded form of the info key of flow task `2` in `my_catalog`.
    const RAW_KEY: &[u8] = b"__flow_task/my_catalog/info/2";

    #[test]
    fn test_key_serialization() {
        let encoded = FlowTaskInfoKey::new("my_catalog".to_string(), 2).to_bytes();
        assert_eq!(RAW_KEY.to_vec(), encoded);
    }

    #[test]
    fn test_key_deserialization() {
        let raw = RAW_KEY.to_vec();
        let key = FlowTaskInfoKey::from_bytes(&raw).unwrap();
        assert_eq!(key.catalog(), "my_catalog");
        assert_eq!(key.flow_task_id(), 2);
    }
}

View File

@@ -0,0 +1,201 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowTaskId, TableMetaValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
// The path segment that introduces the task name in a flow task name key.
const FLOW_TASK_NAME_KEY_PREFIX: &str = "name";
lazy_static! {
// Matches `name/{task_name}`; the single capture group holds the task name,
// which has to satisfy `NAME_PATTERN`.
static ref FLOW_TASK_NAME_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_TASK_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap();
}
/// The key that maps a `{task_name}` to its [FlowTaskId].
///
/// The layout: `__flow_task/{catalog}/name/{task_name}`.
pub struct FlowTaskNameKey(FlowTaskScoped<CatalogScoped<FlowTaskNameKeyInner>>);

impl FlowTaskNameKey {
    /// Builds the [FlowTaskNameKey] of `task_name` inside `catalog`.
    pub fn new(catalog: String, task_name: String) -> FlowTaskNameKey {
        let catalog_scoped = CatalogScoped::new(catalog, FlowTaskNameKeyInner::new(task_name));
        FlowTaskNameKey(FlowTaskScoped::new(catalog_scoped))
    }

    /// The catalog this key belongs to.
    pub fn catalog(&self) -> &str {
        let Self(scoped) = self;
        scoped.catalog()
    }

    /// The `task_name` stored in this key.
    pub fn task_name(&self) -> &str {
        let Self(scoped) = self;
        &scoped.task_name
    }
}

impl MetaKey<FlowTaskNameKey> for FlowTaskNameKey {
    fn to_bytes(&self) -> Vec<u8> {
        let Self(scoped) = self;
        scoped.to_bytes()
    }

    fn from_bytes(bytes: &[u8]) -> Result<FlowTaskNameKey> {
        let scoped = FlowTaskScoped::<CatalogScoped<FlowTaskNameKeyInner>>::from_bytes(bytes)?;
        Ok(FlowTaskNameKey(scoped))
    }
}
/// The key of mapping name to [FlowTaskId]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowTaskNameKeyInner {
pub task_name: String,
}
impl MetaKey<FlowTaskNameKeyInner> for FlowTaskNameKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!("{FLOW_TASK_NAME_KEY_PREFIX}/{}", self.task_name).into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowTaskNameKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidTableMetadataSnafu {
err_msg: format!(
"FlowTaskNameKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures =
FLOW_TASK_NAME_KEY_PATTERN
.captures(key)
.context(error::InvalidTableMetadataSnafu {
err_msg: format!("Invalid FlowTaskNameKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let task = captures[1].to_string();
Ok(FlowTaskNameKeyInner { task_name: task })
}
}
impl FlowTaskNameKeyInner {
/// Returns a [FlowTaskNameKeyInner].
pub fn new(task: String) -> Self {
Self { task_name: task }
}
}
/// The value stored under a [FlowTaskNameKey]: the id of the named flow task.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlowTaskNameValue {
    flow_task_id: FlowTaskId,
}

impl FlowTaskNameValue {
    /// Wraps `flow_task_id` into a [FlowTaskNameValue].
    pub fn new(flow_task_id: FlowTaskId) -> Self {
        FlowTaskNameValue { flow_task_id }
    }

    /// The [FlowTaskId] held by this value.
    pub fn flow_task_id(&self) -> FlowTaskId {
        let Self { flow_task_id } = *self;
        flow_task_id
    }
}
/// Reads and writes the values stored under [FlowTaskNameKey]s.
pub struct FlowTaskNameManager {
    kv_backend: KvBackendRef,
}

impl FlowTaskNameManager {
    /// Creates a [FlowTaskNameManager] on top of `kv_backend`.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        FlowTaskNameManager { kv_backend }
    }

    /// Looks up the [FlowTaskNameValue] of `catalog.task`.
    ///
    /// Returns `Ok(None)` when the backend holds no value for the key.
    pub async fn get(&self, catalog: &str, task: &str) -> Result<Option<FlowTaskNameValue>> {
        let raw_key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()).to_bytes();
        match self.kv_backend.get(&raw_key).await? {
            Some(kv) => FlowTaskNameValue::try_from_raw_value(&kv.value).map(Some),
            None => Ok(None),
        }
    }

    /// Builds a create flow task name transaction.
    /// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied.
    /// Otherwise, the transaction will retrieve existing value.
    ///
    /// The returned closure decodes that existing value from the transaction response.
    pub fn build_create_txn(
        &self,
        catalog: &str,
        name: &str,
        flow_task_id: FlowTaskId,
    ) -> Result<(
        Txn,
        impl FnOnce(
            &mut TxnOpGetResponseSet,
        ) -> Result<Option<DeserializedValueWithBytes<FlowTaskNameValue>>>,
    )> {
        let raw_key = FlowTaskNameKey::new(catalog.to_string(), name.to_string()).to_bytes();
        let raw_value = FlowTaskNameValue::new(flow_task_id).try_as_raw_value()?;
        let txn = txn_helper::build_put_if_absent_txn(raw_key.clone(), raw_value);
        let decoder = TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key));

        Ok((txn, decoder))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The encoded form of the name key of task `my_task` in `my_catalog`.
    const RAW_KEY: &[u8] = b"__flow_task/my_catalog/name/my_task";

    #[test]
    fn test_key_serialization() {
        let encoded =
            FlowTaskNameKey::new("my_catalog".to_string(), "my_task".to_string()).to_bytes();
        assert_eq!(RAW_KEY.to_vec(), encoded);
    }

    #[test]
    fn test_key_deserialization() {
        let raw = RAW_KEY.to_vec();
        let key = FlowTaskNameKey::from_bytes(&raw).unwrap();
        assert_eq!(key.catalog(), "my_catalog");
        assert_eq!(key.task_name(), "my_task");
    }
}

View File

@@ -0,0 +1,259 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowTaskId, FlowTaskPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::FlownodeId;
lazy_static! {
// Matches `flownode/{flownode_id}/{flow_task_id}/{partition_id}`; the three capture
// groups hold the decimal digits of the ids, in that order.
static ref FLOWNODE_TASK_KEY_PATTERN: Regex = Regex::new(&format!(
"^{FLOWNODE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
))
.unwrap();
}
// The path segment that introduces the flownode id in a flownode task key.
const FLOWNODE_TASK_KEY_PREFIX: &str = "flownode";
/// The key that maps a [FlownodeId] to a [FlowTaskId].
///
/// The layout `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}`
pub struct FlownodeTaskKey(FlowTaskScoped<CatalogScoped<FlownodeTaskKeyInner>>);

impl MetaKey<FlownodeTaskKey> for FlownodeTaskKey {
    fn to_bytes(&self) -> Vec<u8> {
        let Self(scoped) = self;
        scoped.to_bytes()
    }

    fn from_bytes(bytes: &[u8]) -> Result<FlownodeTaskKey> {
        let scoped = FlowTaskScoped::<CatalogScoped<FlownodeTaskKeyInner>>::from_bytes(bytes)?;
        Ok(FlownodeTaskKey(scoped))
    }
}

impl FlownodeTaskKey {
    /// Builds the [FlownodeTaskKey] of the given ids inside `catalog`.
    pub fn new(
        catalog: String,
        flownode_id: FlownodeId,
        flow_task_id: FlowTaskId,
        partition_id: FlowTaskPartitionId,
    ) -> FlownodeTaskKey {
        let catalog_scoped = CatalogScoped::new(
            catalog,
            FlownodeTaskKeyInner::new(flownode_id, flow_task_id, partition_id),
        );
        FlownodeTaskKey(FlowTaskScoped::new(catalog_scoped))
    }

    /// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`.
    pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec<u8> {
        let inner_prefix = FlownodeTaskKeyInner::range_start_key(flownode_id).into_bytes();
        let catalog_scoped_key = CatalogScoped::new(catalog, BytesAdapter::from(inner_prefix));

        FlowTaskScoped::new(catalog_scoped_key).to_bytes()
    }

    /// The catalog this key belongs to.
    pub fn catalog(&self) -> &str {
        let Self(scoped) = self;
        scoped.catalog()
    }

    /// The [FlowTaskId] stored in this key.
    pub fn flow_task_id(&self) -> FlowTaskId {
        let Self(scoped) = self;
        scoped.flow_task_id
    }

    /// The [FlownodeId] stored in this key.
    pub fn flownode_id(&self) -> FlownodeId {
        let Self(scoped) = self;
        scoped.flownode_id
    }

    /// The [FlowTaskPartitionId] stored in this key.
    pub fn partition_id(&self) -> FlowTaskPartitionId {
        let Self(scoped) = self;
        scoped.partition_id
    }
}
/// The catalog-relative part of a [FlownodeTaskKey], mapping a [FlownodeId] to a
/// [FlowTaskId] and one of its partitions.
///
/// Encoded as `flownode/{flownode_id}/{flow_task_id}/{partition_id}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlownodeTaskKeyInner {
    flownode_id: FlownodeId,
    flow_task_id: FlowTaskId,
    partition_id: FlowTaskPartitionId,
}
impl FlownodeTaskKeyInner {
/// Returns a [FlownodeTaskKey] with the specified `flownode_id`, `flow_task_id` and `partition_id`.
pub fn new(
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
) -> Self {
Self {
flownode_id,
flow_task_id,
partition_id,
}
}
fn prefix(flownode_id: FlownodeId) -> String {
format!("{}/{flownode_id}", FLOWNODE_TASK_KEY_PREFIX)
}
/// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`.
fn range_start_key(flownode_id: FlownodeId) -> String {
format!("{}/", Self::prefix(flownode_id))
}
}
impl MetaKey<FlownodeTaskKeyInner> for FlownodeTaskKeyInner {
    /// Encodes the key as `flownode/{flownode_id}/{flow_task_id}/{partition_id}`.
    fn to_bytes(&self) -> Vec<u8> {
        format!(
            "{FLOWNODE_TASK_KEY_PREFIX}/{}/{}/{}",
            self.flownode_id, self.flow_task_id, self.partition_id,
        )
        .into_bytes()
    }

    /// Decodes a key previously produced by `to_bytes`.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidTableMetadata` error when `bytes` is not valid UTF-8, does not
    /// match the expected layout, or contains an id that does not fit its integer type.
    fn from_bytes(bytes: &[u8]) -> Result<FlownodeTaskKeyInner> {
        let key = std::str::from_utf8(bytes).map_err(|e| {
            error::InvalidTableMetadataSnafu {
                err_msg: format!(
                    "FlownodeTaskKeyInner '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(bytes)
                ),
            }
            .build()
        })?;
        let captures =
            FLOWNODE_TASK_KEY_PATTERN
                .captures(key)
                .context(error::InvalidTableMetadataSnafu {
                    err_msg: format!("Invalid FlownodeTaskKeyInner '{key}'"),
                })?;

        // The pattern only guarantees that each group is a run of digits; it does not bound
        // the length, so parsing can still fail on overflow and must not be unwrapped.
        let invalid_id = |field: &str, cause: &dyn std::fmt::Display| {
            error::InvalidTableMetadataSnafu {
                err_msg: format!(
                    "Invalid FlownodeTaskKeyInner '{key}': cannot parse {field}: {cause}"
                ),
            }
            .build()
        };
        let flownode_id = captures[1]
            .parse::<FlownodeId>()
            .map_err(|e| invalid_id("flownode_id", &e))?;
        let flow_task_id = captures[2]
            .parse::<FlowTaskId>()
            .map_err(|e| invalid_id("flow_task_id", &e))?;
        let partition_id = captures[3]
            .parse::<FlowTaskPartitionId>()
            .map_err(|e| invalid_id("partition_id", &e))?;

        Ok(FlownodeTaskKeyInner {
            flownode_id,
            flow_task_id,
            partition_id,
        })
    }
}
/// The manager of [FlownodeTaskKey].
///
/// Reads keys from, and builds transactions for, the wrapped key-value backend.
pub struct FlownodeTaskManager {
// The backend handle; cloned for each range scan.
kv_backend: KvBackendRef,
}
/// Decodes the key of a `KeyValue` into a [FlownodeTaskKey]; the value is ignored.
pub fn flownode_task_key_decoder(kv: KeyValue) -> Result<FlownodeTaskKey> {
    let raw_key: &[u8] = &kv.key;
    FlownodeTaskKey::from_bytes(raw_key)
}
impl FlownodeTaskManager {
    /// Returns a new [FlownodeTaskManager] backed by `kv_backend`.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }

    /// Retrieves all ([FlowTaskId], [FlowTaskPartitionId]) pairs of the specified
    /// `flownode_id` within `catalog`.
    ///
    /// The pairs are produced by a paginated prefix scan over
    /// `__flow_task/{catalog}/flownode/{flownode_id}/`; a key that fails to decode is
    /// yielded as an `Err` item.
    pub fn tasks(
        &self,
        catalog: &str,
        flownode_id: FlownodeId,
    ) -> BoxStream<'static, Result<(FlowTaskId, FlowTaskPartitionId)>> {
        let prefix = FlownodeTaskKey::range_start_key(catalog.to_string(), flownode_id);
        let req = RangeRequest::new().with_prefix(prefix);
        let keys = PaginationStream::new(
            self.kv_backend.clone(),
            req,
            DEFAULT_PAGE_SIZE,
            Arc::new(flownode_task_key_decoder),
        );

        Box::pin(keys.map_ok(|key| (key.flow_task_id(), key.partition_id())))
    }

    /// Builds a create flownode task transaction.
    ///
    /// For every `(partition_id, flownode_id)` pair in `flownode_ids`, puts a
    /// `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}` key
    /// with an empty value.
    pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowTaskPartitionId, FlownodeId)>>(
        &self,
        catalog: &str,
        flow_task_id: FlowTaskId,
        flownode_ids: I,
    ) -> Txn {
        let puts = flownode_ids
            .into_iter()
            .map(|(partition_id, flownode_id)| {
                let key = FlownodeTaskKey::new(
                    catalog.to_string(),
                    flownode_id,
                    flow_task_id,
                    partition_id,
                );
                // Only the key carries information; the value is left empty.
                TxnOp::Put(key.to_bytes(), vec![])
            })
            .collect::<Vec<_>>();

        Txn::new().and_then(puts)
    }
}
#[cfg(test)]
mod tests {
    use crate::key::flow_task::flownode_task::FlownodeTaskKey;
    use crate::key::scope::MetaKey;

    /// The encoded key and the range prefix follow the documented layout.
    #[test]
    fn test_key_serialization() {
        let encoded = FlownodeTaskKey::new("my_catalog".to_string(), 1, 2, 0).to_bytes();
        assert_eq!(b"__flow_task/my_catalog/flownode/1/2/0".to_vec(), encoded);

        let scan_prefix = FlownodeTaskKey::range_start_key("my_catalog".to_string(), 1);
        assert_eq!(b"__flow_task/my_catalog/flownode/1/".to_vec(), scan_prefix);
    }

    /// Every component of an encoded key is recovered by `from_bytes`.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow_task/my_catalog/flownode/1/2/0".to_vec();
        let decoded = FlownodeTaskKey::from_bytes(&raw).unwrap();

        assert_eq!(decoded.catalog(), "my_catalog");
        assert_eq!(decoded.flownode_id(), 1);
        assert_eq!(decoded.flow_task_id(), 2);
        assert_eq!(decoded.partition_id(), 0);
    }
}

View File

@@ -0,0 +1,279 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowTaskId, FlowTaskPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::FlownodeId;
/// The leading path segment of the catalog-relative part of a table task key.
const TABLE_TASK_KEY_PREFIX: &str = "source_table";
// Matches the catalog-relative part of a table task key,
// `source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`, capturing the
// four ids (each a non-empty run of ASCII digits) in that order.
lazy_static! {
static ref TABLE_TASK_KEY_PATTERN: Regex = Regex::new(&format!(
"^{TABLE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
))
.unwrap();
}
/// The catalog-relative part of a [TableTaskKey], mapping a [TableId] to a [FlownodeId]
/// and [FlowTaskId].
///
/// Encoded as `source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TableTaskKeyInner {
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
}
/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId].
///
/// The layout: `__flow_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`.
#[derive(Debug, PartialEq)]
pub struct TableTaskKey(FlowTaskScoped<CatalogScoped<TableTaskKeyInner>>);
impl MetaKey<TableTaskKey> for TableTaskKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<TableTaskKey> {
Ok(TableTaskKey(FlowTaskScoped::<
CatalogScoped<TableTaskKeyInner>,
>::from_bytes(bytes)?))
}
}
impl TableTaskKey {
    /// Creates a [TableTaskKey] for the given catalog, source table, flownode, flow task
    /// and partition.
    pub fn new(
        catalog: String,
        table_id: TableId,
        flownode_id: FlownodeId,
        flow_task_id: FlowTaskId,
        partition_id: FlowTaskPartitionId,
    ) -> TableTaskKey {
        let catalog_scoped = CatalogScoped::new(
            catalog,
            TableTaskKeyInner::new(table_id, flownode_id, flow_task_id, partition_id),
        );
        TableTaskKey(FlowTaskScoped::new(catalog_scoped))
    }

    /// Returns the encoded prefix shared by every [TableTaskKey] of `table_id` within
    /// `catalog`; the prefix ends with a trailing `/`.
    pub fn range_start_key(catalog: String, table_id: TableId) -> Vec<u8> {
        let inner_prefix = TableTaskKeyInner::range_start_key(table_id).into_bytes();
        let scoped_prefix =
            FlowTaskScoped::new(CatalogScoped::new(catalog, BytesAdapter::from(inner_prefix)));
        scoped_prefix.to_bytes()
    }

    /// Returns the catalog this key belongs to.
    pub fn catalog(&self) -> &str {
        self.0.catalog()
    }

    /// Returns the source [TableId] stored in this key.
    pub fn source_table_id(&self) -> TableId {
        self.0.table_id
    }

    /// Returns the [FlowTaskId] stored in this key.
    pub fn flow_task_id(&self) -> FlowTaskId {
        self.0.flow_task_id
    }

    /// Returns the [FlownodeId] stored in this key.
    pub fn flownode_id(&self) -> FlownodeId {
        self.0.flownode_id
    }

    /// Returns the [FlowTaskPartitionId] stored in this key.
    pub fn partition_id(&self) -> FlowTaskPartitionId {
        self.0.partition_id
    }
}
impl TableTaskKeyInner {
/// Returns a new [TableTaskKey].
fn new(
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: FlowTaskPartitionId,
) -> TableTaskKeyInner {
Self {
table_id,
flownode_id,
flow_task_id,
partition_id,
}
}
fn prefix(table_id: TableId) -> String {
format!("{}/{table_id}", TABLE_TASK_KEY_PREFIX)
}
/// The prefix used to retrieve all [TableTaskKey]s with the specified `table_id`.
fn range_start_key(table_id: TableId) -> String {
format!("{}/", Self::prefix(table_id))
}
}
impl MetaKey<TableTaskKeyInner> for TableTaskKeyInner {
    /// Encodes the key as
    /// `source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`.
    fn to_bytes(&self) -> Vec<u8> {
        format!(
            "{TABLE_TASK_KEY_PREFIX}/{}/{}/{}/{}",
            self.table_id, self.flownode_id, self.flow_task_id, self.partition_id
        )
        .into_bytes()
    }

    /// Decodes a key previously produced by `to_bytes`.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidTableMetadata` error when `bytes` is not valid UTF-8, does not
    /// match the expected layout, or contains an id that does not fit its integer type.
    fn from_bytes(bytes: &[u8]) -> Result<TableTaskKeyInner> {
        let key = std::str::from_utf8(bytes).map_err(|e| {
            error::InvalidTableMetadataSnafu {
                err_msg: format!(
                    "TableTaskKeyInner '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(bytes)
                ),
            }
            .build()
        })?;
        let captures =
            TABLE_TASK_KEY_PATTERN
                .captures(key)
                .context(error::InvalidTableMetadataSnafu {
                    err_msg: format!("Invalid TableTaskKeyInner '{key}'"),
                })?;

        // The pattern only guarantees that each group is a run of digits; it does not bound
        // the length, so parsing can still fail on overflow and must not be unwrapped.
        let invalid_id = |field: &str, cause: &dyn std::fmt::Display| {
            error::InvalidTableMetadataSnafu {
                err_msg: format!(
                    "Invalid TableTaskKeyInner '{key}': cannot parse {field}: {cause}"
                ),
            }
            .build()
        };
        let table_id = captures[1]
            .parse::<TableId>()
            .map_err(|e| invalid_id("table_id", &e))?;
        let flownode_id = captures[2]
            .parse::<FlownodeId>()
            .map_err(|e| invalid_id("flownode_id", &e))?;
        let flow_task_id = captures[3]
            .parse::<FlowTaskId>()
            .map_err(|e| invalid_id("flow_task_id", &e))?;
        let partition_id = captures[4]
            .parse::<FlowTaskPartitionId>()
            .map_err(|e| invalid_id("partition_id", &e))?;

        Ok(TableTaskKeyInner::new(
            table_id,
            flownode_id,
            flow_task_id,
            partition_id,
        ))
    }
}
/// Decodes the key of a `KeyValue` into a [TableTaskKey]; the value is ignored.
pub fn table_task_decoder(kv: KeyValue) -> Result<TableTaskKey> {
    let raw_key: &[u8] = &kv.key;
    TableTaskKey::from_bytes(raw_key)
}
/// The manager of [TableTaskKey].
///
/// Reads keys from, and builds transactions for, the wrapped key-value backend.
pub struct TableTaskManager {
// The backend handle; cloned for each range scan.
kv_backend: KvBackendRef,
}
impl TableTaskManager {
    /// Returns a new [TableTaskManager] backed by `kv_backend`.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }

    /// Retrieves all [TableTaskKey]s of the specified `table_id` within `catalog`.
    ///
    /// The keys are produced by a paginated prefix scan over
    /// `__flow_task/{catalog}/source_table/{table_id}/`; a key that fails to decode is
    /// yielded as an `Err` item.
    pub fn nodes(
        &self,
        catalog: &str,
        table_id: TableId,
    ) -> BoxStream<'static, Result<TableTaskKey>> {
        let prefix = TableTaskKey::range_start_key(catalog.to_string(), table_id);
        let req = RangeRequest::new().with_prefix(prefix);
        let keys = PaginationStream::new(
            self.kv_backend.clone(),
            req,
            DEFAULT_PAGE_SIZE,
            Arc::new(table_task_decoder),
        );

        Box::pin(keys)
    }

    /// Builds a create table task transaction.
    ///
    /// For every combination of a `(partition_id, flownode_id)` pair in `flownode_ids` and
    /// a table in `source_table_ids`, puts a
    /// `__flow_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`
    /// key with an empty value.
    pub fn build_create_txn<I: IntoIterator<Item = (FlowTaskPartitionId, FlownodeId)>>(
        &self,
        catalog: &str,
        flow_task_id: FlowTaskId,
        flownode_ids: I,
        source_table_ids: &[TableId],
    ) -> Txn {
        let puts = flownode_ids
            .into_iter()
            .flat_map(|(partition_id, flownode_id)| {
                source_table_ids.iter().map(move |table_id| {
                    let key = TableTaskKey::new(
                        catalog.to_string(),
                        *table_id,
                        flownode_id,
                        flow_task_id,
                        partition_id,
                    );
                    // Only the key carries information; the value is left empty.
                    TxnOp::Put(key.to_bytes(), vec![])
                })
            })
            .collect::<Vec<_>>();

        Txn::new().and_then(puts)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The encoded key and the range prefix follow the documented layout.
    #[test]
    fn test_key_serialization() {
        let encoded = TableTaskKey::new("my_catalog".to_string(), 1024, 1, 2, 0).to_bytes();
        assert_eq!(
            b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec(),
            encoded,
        );

        let scan_prefix = TableTaskKey::range_start_key("my_catalog".to_string(), 1024);
        assert_eq!(
            b"__flow_task/my_catalog/source_table/1024/".to_vec(),
            scan_prefix
        );
    }

    /// Every component of an encoded key is recovered by `from_bytes`.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec();
        let decoded = TableTaskKey::from_bytes(&raw).unwrap();

        assert_eq!(decoded.catalog(), "my_catalog");
        assert_eq!(decoded.source_table_id(), 1024);
        assert_eq!(decoded.flownode_id(), 1);
        assert_eq!(decoded.flow_task_id(), 2);
        assert_eq!(decoded.partition_id(), 0);
    }
}

View File

@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use snafu::OptionExt;
use crate::error::{self, Result};
/// The delimiter of key: the byte that separates a scope (such as the catalog) from the
/// rest of the key.
pub(crate) const DELIMITER: u8 = b'/';
/// The key of metadata.
///
/// `T` is the type produced by decoding; the implementations in this file all use the
/// implementing type itself.
pub trait MetaKey<T> {
/// Encodes the key into its byte representation.
fn to_bytes(&self) -> Vec<u8>;
/// Decodes a key from `bytes`, returning an error when they are not a valid encoding.
fn from_bytes(bytes: &[u8]) -> Result<T>;
}
/// The key of `{catalog}/` scope.
///
/// Wraps an inner key and is encoded as `{catalog}/{inner}`.
#[derive(Debug, PartialEq)]
pub struct CatalogScoped<T> {
// The wrapped key; also reachable through `Deref`.
inner: T,
// The catalog name that prefixes the encoded key.
catalog: String,
}
impl<T> Deref for CatalogScoped<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> CatalogScoped<T> {
/// Returns a new [CatalogScoped] key.
pub fn new(catalog: String, inner: T) -> CatalogScoped<T> {
CatalogScoped { inner, catalog }
}
/// Returns the `catalog`.
pub fn catalog(&self) -> &str {
&self.catalog
}
}
impl<T: MetaKey<T>> MetaKey<CatalogScoped<T>> for CatalogScoped<T> {
    /// Encodes the key as `{catalog}/{inner}`.
    fn to_bytes(&self) -> Vec<u8> {
        let catalog = self.catalog.as_bytes();
        let inner = self.inner.to_bytes();

        let mut encoded = Vec::with_capacity(catalog.len() + inner.len() + 1);
        encoded.extend_from_slice(catalog);
        encoded.push(DELIMITER);
        encoded.extend_from_slice(&inner);
        encoded
    }

    /// Splits `bytes` at the first delimiter into the catalog and the inner key.
    ///
    /// Returns a `DelimiterNotFound` error when `bytes` contains no delimiter, and
    /// propagates any error from decoding the inner key. Invalid UTF-8 in the catalog
    /// part is converted lossily rather than rejected.
    fn from_bytes(bytes: &[u8]) -> Result<CatalogScoped<T>> {
        let delimiter_at = bytes
            .iter()
            .position(|byte| *byte == DELIMITER)
            .with_context(|| error::DelimiterNotFoundSnafu {
                key: String::from_utf8_lossy(bytes),
            })?;

        let (catalog_bytes, rest) = bytes.split_at(delimiter_at);
        let catalog = String::from_utf8_lossy(catalog_bytes).to_string();
        // `rest` starts with the delimiter itself, which is not part of the inner key.
        let inner = T::from_bytes(&rest[1..])?;

        Ok(CatalogScoped { inner, catalog })
    }
}
/// A [MetaKey] over raw bytes: encoding returns the bytes as they are and decoding
/// accepts any input.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);
impl From<Vec<u8>> for BytesAdapter {
fn from(value: Vec<u8>) -> Self {
Self(value)
}
}
impl MetaKey<BytesAdapter> for BytesAdapter {
fn to_bytes(&self) -> Vec<u8> {
self.0.clone()
}
fn from_bytes(bytes: &[u8]) -> Result<BytesAdapter> {
Ok(BytesAdapter(bytes.to_vec()))
}
}
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use super::*;
    use crate::error::Result;

    /// A key that stores the raw bytes it was decoded from.
    #[derive(Debug)]
    struct MockKey {
        inner: Vec<u8>,
    }

    impl MetaKey<MockKey> for MockKey {
        fn to_bytes(&self) -> Vec<u8> {
            self.inner.clone()
        }

        fn from_bytes(bytes: &[u8]) -> Result<MockKey> {
            let inner = bytes.to_vec();
            Ok(MockKey { inner })
        }
    }

    /// Decoding splits at the delimiter and re-encoding reproduces the input.
    #[test]
    fn test_catalog_scoped_from_bytes() {
        let raw = "test_catalog_name/key";
        let decoded = CatalogScoped::<MockKey>::from_bytes(raw.as_bytes()).unwrap();

        assert_eq!(decoded.catalog, "test_catalog_name");
        assert_eq!(decoded.inner.inner, b"key".to_vec());
        assert_eq!(raw.as_bytes(), &decoded.to_bytes());
    }

    /// Input without a delimiter is rejected.
    #[test]
    fn test_catalog_scoped_from_bytes_delimiter_not_found() {
        let raw = "test_catalog_name";
        let err = CatalogScoped::<MockKey>::from_bytes(raw.as_bytes()).unwrap_err();

        assert_matches!(err, error::Error::DelimiterNotFound { .. });
    }

    /// Encoding joins the catalog and the inner key with the delimiter.
    #[test]
    fn test_catalog_scoped_to_bytes() {
        let inner = MockKey {
            inner: b"hi".to_vec(),
        };
        let scoped_key = CatalogScoped {
            inner,
            catalog: "test_catalog".to_string(),
        };

        assert_eq!(b"test_catalog/hi".to_vec(), scoped_key.to_bytes());
    }
}

View File

@@ -31,12 +31,6 @@ pub(crate) struct TombstoneManager {
/// The prefix prepended to a key to form its tombstone key.
const TOMBSTONE_PREFIX: &str = "__tombstone/";
/// Groups an original key, a tombstone key and a value.
// NOTE(review): presumably `tombstone_key` is the tombstone form of `origin_key` and
// `value` is the value stored under it; confirm against the code that builds this struct.
pub(crate) struct TombstoneKeyValue {
pub(crate) origin_key: Vec<u8>,
pub(crate) tombstone_key: Vec<u8>,
pub(crate) value: Vec<u8>,
}
/// Returns the tombstone key for `key`: the tombstone prefix followed by `key`.
fn to_tombstone(key: &[u8]) -> Vec<u8> {
    let prefix = TOMBSTONE_PREFIX.as_bytes();
    let mut tombstone_key = Vec::with_capacity(prefix.len() + key.len());
    tombstone_key.extend_from_slice(prefix);
    tombstone_key.extend_from_slice(key);
    tombstone_key
}

View File

@@ -43,7 +43,11 @@ pub mod test_util;
pub mod util;
pub mod wal_options_allocator;
/// The id of the cluster.
pub type ClusterId = u64;
/// The id of the datanode.
pub type DatanodeId = u64;
/// The id of the flownode.
pub type FlownodeId = u64;
pub use instruction::RegionIdent;