refactor: remove catalog prefix (#3835)

* refactor: remove catalog prefix

* refactor: remove scope.rs

* fix: fix tests

* chore: update comments

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-04-30 11:27:33 +08:00
committed by GitHub
parent d11b1fa389
commit 20cbc039e6
8 changed files with 148 additions and 325 deletions

View File

@@ -36,16 +36,16 @@
//! - The value is a [TableNameValue] struct; it contains the table id.
//! - Used in the table name to table id lookup.
//!
//! 6. Flow info key: `__flow/{catalog}/info/{flow_id}`
//! 6. Flow info key: `__flow/info/{flow_id}`
//! - Stores metadata of the flow.
//!
//! 7. Flow name key: `__flow/{catalog}/name/{flow_name}`
//! 7. Flow name key: `__flow/name/{catalog}/{flow_name}`
//! - Mapping {catalog}/{flow_name} to {flow_id}
//!
//! 8. Flownode flow key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`
//! 8. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping {flownode_id} to {flow_id}
//!
//! 9. Table flow key: `__table_flow/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! 9. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//!
@@ -60,12 +60,12 @@
//! The whole picture of flow keys will be like this:
//!
//! __flow/
//! {catalog}/
//! info/
//! {tsak_id}
//! info/
//! {flow_id}
//!
//! name/
//! {flow_name}
//! {catalog_name}
//! {flow_name}
//!
//! flownode/
//! {flownode_id}/
@@ -84,7 +84,6 @@ pub mod datanode_table;
#[allow(unused)]
pub mod flow;
pub mod schema_name;
pub mod scope;
pub mod table_info;
pub mod table_name;
// TODO(weny): removes it.
@@ -194,6 +193,32 @@ pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}
/// The key of metadata.
pub trait MetaKey<T> {
fn to_bytes(&self) -> Vec<u8>;
fn from_bytes(bytes: &[u8]) -> Result<T>;
}
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);
impl From<Vec<u8>> for BytesAdapter {
fn from(value: Vec<u8>) -> Self {
Self(value)
}
}
impl MetaKey<BytesAdapter> for BytesAdapter {
fn to_bytes(&self) -> Vec<u8> {
self.0.clone()
}
fn from_bytes(bytes: &[u8]) -> Result<BytesAdapter> {
Ok(BytesAdapter(bytes.to_vec()))
}
}
pub(crate) trait TableMetaKeyGetTxnOp {
fn build_get_op(
&self,

View File

@@ -82,7 +82,7 @@ impl<'a> TryFrom<&'a str> for CatalogNameKey<'a> {
}
}
/// Decoder `KeyValue` to ({catalog},())
/// Decoder `KeyValue` to {catalog}
pub fn catalog_decoder(kv: KeyValue) -> Result<String> {
let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
let catalog_name = CatalogNameKey::try_from(str)?;

View File

@@ -30,9 +30,8 @@ use crate::key::flow::flow_info::FlowInfoManager;
use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
use crate::key::flow::table_flow::TableFlowManager;
use crate::key::scope::MetaKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::FlowId;
use crate::key::{FlowId, MetaKey};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
@@ -139,20 +138,15 @@ impl FlowMetadataManager {
.flow_name_manager
.build_create_txn(&flow_value.catalog_name, &flow_value.flow_name, flow_id)?;
let (create_flow_txn, on_create_flow_failure) = self.flow_info_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
&flow_value,
)?;
let (create_flow_txn, on_create_flow_failure) = self
.flow_info_manager
.build_create_txn(flow_id, &flow_value)?;
let create_flownode_flow_txn = self.flownode_flow_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
flow_value.flownode_ids().clone(),
);
let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, flow_value.flownode_ids().clone());
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
flow_value.flownode_ids().clone(),
flow_value.source_table_ids(),
@@ -222,7 +216,6 @@ mod tests {
use super::*;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::scope::CatalogScoped;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::table_name::TableName;
@@ -245,27 +238,23 @@ mod tests {
#[test]
fn test_flow_scoped_to_bytes() {
let key = FlowScoped::new(CatalogScoped::new(
"my_catalog".to_string(),
MockKey {
inner: b"hi".to_vec(),
},
));
assert_eq!(b"__flow/my_catalog/hi".to_vec(), key.to_bytes());
let key = FlowScoped::new(MockKey {
inner: b"hi".to_vec(),
});
assert_eq!(b"__flow/hi".to_vec(), key.to_bytes());
}
#[test]
fn test_flow_scoped_from_bytes() {
let bytes = b"__flow/my_catalog/hi";
let key = FlowScoped::<CatalogScoped<MockKey>>::from_bytes(bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
let bytes = b"__flow/hi";
let key = FlowScoped::<MockKey>::from_bytes(bytes).unwrap();
assert_eq!(key.inner.inner, b"hi".to_vec());
}
#[test]
fn test_flow_scoped_from_bytes_mismatch() {
let bytes = b"__table/my_catalog/hi";
let err = FlowScoped::<CatalogScoped<MockKey>>::from_bytes(bytes).unwrap_err();
let bytes = b"__table/hi";
let err = FlowScoped::<MockKey>::from_bytes(bytes).unwrap_err();
assert_matches!(err, error::Error::MismatchPrefix { .. });
}
@@ -302,14 +291,14 @@ mod tests {
.unwrap();
let got = flow_metadata_manager
.flow_info_manager()
.get(catalog_name, flow_id)
.get(flow_id)
.await
.unwrap()
.unwrap();
assert_eq!(got, flow_value);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(catalog_name, 1)
.flows(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
@@ -317,20 +306,11 @@ mod tests {
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.nodes(catalog_name, table_id)
.nodes(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
nodes,
vec![TableFlowKey::new(
catalog_name.to_string(),
table_id,
1,
flow_id,
0
)]
);
assert_eq!(nodes, vec![TableFlowKey::new(table_id, 1, flow_id, 0)]);
}
}

View File

@@ -22,9 +22,10 @@ use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, TableMetaValue};
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::table_name::TableName;
@@ -39,8 +40,8 @@ lazy_static! {
/// The key stores the metadata of the flow.
///
/// The layout: `__flow/{catalog}/info/{flow_id}`.
pub struct FlowInfoKey(FlowScoped<CatalogScoped<FlowInfoKeyInner>>);
/// The layout: `__flow/info/{flow_id}`.
pub struct FlowInfoKey(FlowScoped<FlowInfoKeyInner>);
impl MetaKey<FlowInfoKey> for FlowInfoKey {
fn to_bytes(&self) -> Vec<u8> {
@@ -48,22 +49,17 @@ impl MetaKey<FlowInfoKey> for FlowInfoKey {
}
fn from_bytes(bytes: &[u8]) -> Result<FlowInfoKey> {
Ok(FlowInfoKey(
FlowScoped::<CatalogScoped<FlowInfoKeyInner>>::from_bytes(bytes)?,
))
Ok(FlowInfoKey(FlowScoped::<FlowInfoKeyInner>::from_bytes(
bytes,
)?))
}
}
impl FlowInfoKey {
/// Returns the [FlowInfoKey].
pub fn new(catalog: String, flow_id: FlowId) -> FlowInfoKey {
pub fn new(flow_id: FlowId) -> FlowInfoKey {
let inner = FlowInfoKeyInner::new(flow_id);
FlowInfoKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
FlowInfoKey(FlowScoped::new(inner))
}
/// Returns the [FlowId].
@@ -159,8 +155,8 @@ impl FlowInfoManager {
}
/// Returns the [FlowInfoValue] of specified `flow_id`.
pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes();
pub async fn get(&self, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
let key = FlowInfoKey::new(flow_id).to_bytes();
self.kv_backend
.get(&key)
.await?
@@ -169,11 +165,10 @@ impl FlowInfoManager {
}
/// Builds a create flow transaction.
/// It is expected that the `__flow/{catalog}/info/{flow_id}` wasn't occupied.
/// It is expected that the `__flow/info/{flow_id}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
pub(crate) fn build_create_txn(
&self,
catalog: &str,
flow_id: FlowId,
flow_value: &FlowInfoValue,
) -> Result<(
@@ -182,7 +177,7 @@ impl FlowInfoManager {
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>,
)> {
let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes();
let key = FlowInfoKey::new(flow_id).to_bytes();
let txn = txn_helper::build_put_if_absent_txn(key.clone(), flow_value.try_as_raw_value()?);
Ok((
@@ -198,15 +193,14 @@ mod tests {
#[test]
fn test_key_serialization() {
let flow_info = FlowInfoKey::new("my_catalog".to_string(), 2);
assert_eq!(b"__flow/my_catalog/info/2".to_vec(), flow_info.to_bytes());
let flow_info = FlowInfoKey::new(2);
assert_eq!(b"__flow/info/2".to_vec(), flow_info.to_bytes());
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow/my_catalog/info/2".to_vec();
let bytes = b"__flow/info/2".to_vec();
let key = FlowInfoKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_id(), 2);
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::flow_server::Flow;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -19,34 +20,37 @@ use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, TableMetaValue, NAME_PATTERN};
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
const FLOW_NAME_KEY_PREFIX: &str = "name";
lazy_static! {
static ref FLOW_NAME_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap();
static ref FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
"^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
))
.unwrap();
}
/// The key of mapping {flow_name} to [FlowId].
///
/// The layout: `__flow/{catalog}/name/{flow_name}`.
pub struct FlowNameKey(FlowScoped<CatalogScoped<FlowNameKeyInner>>);
/// The layout: `__flow/name/{catalog_name}/{flow_name}`.
pub struct FlowNameKey(FlowScoped<FlowNameKeyInner>);
impl FlowNameKey {
/// Returns the [FlowNameKey]
pub fn new(catalog: String, flow_name: String) -> FlowNameKey {
let inner = FlowNameKeyInner::new(flow_name);
FlowNameKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
let inner = FlowNameKeyInner::new(catalog, flow_name);
FlowNameKey(FlowScoped::new(inner))
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
&self.0.catalog_name
}
/// Return the `flow_name`
@@ -61,21 +65,26 @@ impl MetaKey<FlowNameKey> for FlowNameKey {
}
fn from_bytes(bytes: &[u8]) -> Result<FlowNameKey> {
Ok(FlowNameKey(
FlowScoped::<CatalogScoped<FlowNameKeyInner>>::from_bytes(bytes)?,
))
Ok(FlowNameKey(FlowScoped::<FlowNameKeyInner>::from_bytes(
bytes,
)?))
}
}
/// The key of mapping name to [FlowId]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowNameKeyInner {
pub catalog_name: String,
pub flow_name: String,
}
impl MetaKey<FlowNameKeyInner> for FlowNameKeyInner {
fn to_bytes(&self) -> Vec<u8> {
format!("{FLOW_NAME_KEY_PREFIX}/{}", self.flow_name).into_bytes()
format!(
"{FLOW_NAME_KEY_PREFIX}/{}/{}",
self.catalog_name, self.flow_name
)
.into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<FlowNameKeyInner> {
@@ -95,15 +104,22 @@ impl MetaKey<FlowNameKeyInner> for FlowNameKeyInner {
err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flow_name = captures[1].to_string();
Ok(FlowNameKeyInner { flow_name })
let catalog_name = captures[1].to_string();
let flow_name = captures[2].to_string();
Ok(FlowNameKeyInner {
catalog_name,
flow_name,
})
}
}
impl FlowNameKeyInner {
/// Returns a [FlowNameKeyInner].
pub fn new(flow_name: String) -> Self {
Self { flow_name }
pub fn new(catalog_name: String, flow_name: String) -> Self {
Self {
catalog_name,
flow_name,
}
}
}
@@ -155,12 +171,12 @@ impl FlowNameManager {
}
/// Builds a create flow name transaction.
/// It's expected that the `__flow/{catalog}/name/{flow_name}` wasn't occupied.
/// It's expected that the `__flow/name/{catalog}/{flow_name}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
pub fn build_create_txn(
&self,
catalog: &str,
name: &str,
catalog_name: &str,
flow_name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
@@ -168,7 +184,7 @@ impl FlowNameManager {
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowNameValue>>>,
)> {
let key = FlowNameKey::new(catalog.to_string(), name.to_string());
let key = FlowNameKey::new(catalog_name.to_string(), flow_name.to_string());
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let txn = txn_helper::build_put_if_absent_txn(
@@ -190,12 +206,12 @@ mod tests {
#[test]
fn test_key_serialization() {
let key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string());
assert_eq!(b"__flow/my_catalog/name/my_task".to_vec(), key.to_bytes(),);
assert_eq!(b"__flow/name/my_catalog/my_task".to_vec(), key.to_bytes(),);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow/my_catalog/name/my_task".to_vec();
let bytes = b"__flow/name/my_catalog/my_task".to_vec();
let key = FlowNameKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_name(), "my_task");

View File

@@ -22,8 +22,7 @@ use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowId, FlowPartitionId};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
@@ -42,8 +41,8 @@ const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode";
/// The key of mapping [FlownodeId] to [FlowId].
///
/// The layout `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`
pub struct FlownodeFlowKey(FlowScoped<CatalogScoped<FlownodeFlowKeyInner>>);
/// The layout `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
pub struct FlownodeFlowKey(FlowScoped<FlownodeFlowKeyInner>);
impl MetaKey<FlownodeFlowKey> for FlownodeFlowKey {
fn to_bytes(&self) -> Vec<u8> {
@@ -51,37 +50,29 @@ impl MetaKey<FlownodeFlowKey> for FlownodeFlowKey {
}
fn from_bytes(bytes: &[u8]) -> Result<FlownodeFlowKey> {
Ok(FlownodeFlowKey(FlowScoped::<
CatalogScoped<FlownodeFlowKeyInner>,
>::from_bytes(bytes)?))
Ok(FlownodeFlowKey(
FlowScoped::<FlownodeFlowKeyInner>::from_bytes(bytes)?,
))
}
}
impl FlownodeFlowKey {
/// Returns a new [FlownodeFlowKey].
pub fn new(
catalog: String,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> FlownodeFlowKey {
let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
FlownodeFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
FlownodeFlowKey(FlowScoped::new(inner))
}
/// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`.
pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec<u8> {
let catalog_scoped_key = CatalogScoped::new(
catalog,
BytesAdapter::from(FlownodeFlowKeyInner::range_start_key(flownode_id).into_bytes()),
);
pub fn range_start_key(flownode_id: FlownodeId) -> Vec<u8> {
let inner =
BytesAdapter::from(FlownodeFlowKeyInner::range_start_key(flownode_id).into_bytes());
FlowScoped::new(catalog_scoped_key).to_bytes()
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
FlowScoped::new(inner).to_bytes()
}
/// Returns the [FlowId].
@@ -184,10 +175,9 @@ impl FlownodeFlowManager {
/// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
pub fn flows(
&self,
catalog: &str,
flownode_id: FlownodeId,
) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
let start_key = FlownodeFlowKey::range_start_key(catalog.to_string(), flownode_id);
let start_key = FlownodeFlowKey::range_start_key(flownode_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
@@ -205,16 +195,13 @@ impl FlownodeFlowManager {
/// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_id: FlowId,
flownode_ids: I,
) -> Txn {
let txns = flownode_ids
.into_iter()
.map(|(partition_id, flownode_id)| {
let key =
FlownodeFlowKey::new(catalog.to_string(), flownode_id, flow_id, partition_id)
.to_bytes();
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
TxnOp::Put(key, vec![])
})
.collect::<Vec<_>>();
@@ -226,24 +213,20 @@ impl FlownodeFlowManager {
#[cfg(test)]
mod tests {
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::scope::MetaKey;
use crate::key::MetaKey;
#[test]
fn test_key_serialization() {
let flownode_flow = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0);
assert_eq!(
b"__flow/my_catalog/flownode/1/2/0".to_vec(),
flownode_flow.to_bytes()
);
let prefix = FlownodeFlowKey::range_start_key("my_catalog".to_string(), 1);
assert_eq!(b"__flow/my_catalog/flownode/1/".to_vec(), prefix);
let flownode_flow = FlownodeFlowKey::new(1, 2, 0);
assert_eq!(b"__flow/flownode/1/2/0".to_vec(), flownode_flow.to_bytes());
let prefix = FlownodeFlowKey::range_start_key(1);
assert_eq!(b"__flow/flownode/1/".to_vec(), prefix);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow/my_catalog/flownode/1/2/0".to_vec();
let bytes = b"__flow/flownode/1/2/0".to_vec();
let key = FlownodeFlowKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flownode_id(), 1);
assert_eq!(key.flow_id(), 2);
assert_eq!(key.partition_id(), 0);

View File

@@ -22,8 +22,7 @@ use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowId, FlowPartitionId};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
@@ -51,9 +50,9 @@ struct TableFlowKeyInner {
/// The key of mapping [TableId] to [FlownodeId] and [FlowId].
///
/// The layout: `__flow/{catalog}/table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`.
/// The layout: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`.
#[derive(Debug, PartialEq)]
pub struct TableFlowKey(FlowScoped<CatalogScoped<TableFlowKeyInner>>);
pub struct TableFlowKey(FlowScoped<TableFlowKeyInner>);
impl MetaKey<TableFlowKey> for TableFlowKey {
fn to_bytes(&self) -> Vec<u8> {
@@ -61,38 +60,29 @@ impl MetaKey<TableFlowKey> for TableFlowKey {
}
fn from_bytes(bytes: &[u8]) -> Result<TableFlowKey> {
Ok(TableFlowKey(
FlowScoped::<CatalogScoped<TableFlowKeyInner>>::from_bytes(bytes)?,
))
Ok(TableFlowKey(FlowScoped::<TableFlowKeyInner>::from_bytes(
bytes,
)?))
}
}
impl TableFlowKey {
/// Returns a new [TableFlowKey].
pub fn new(
catalog: String,
table_id: TableId,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> TableFlowKey {
let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
TableFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
TableFlowKey(FlowScoped::new(inner))
}
/// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`.
pub fn range_start_key(catalog: String, table_id: TableId) -> Vec<u8> {
let catalog_scoped_key = CatalogScoped::new(
catalog,
BytesAdapter::from(TableFlowKeyInner::range_start_key(table_id).into_bytes()),
);
pub fn range_start_key(table_id: TableId) -> Vec<u8> {
let inner = BytesAdapter::from(TableFlowKeyInner::range_start_key(table_id).into_bytes());
FlowScoped::new(catalog_scoped_key).to_bytes()
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog()
FlowScoped::new(inner).to_bytes()
}
/// Returns the source [TableId].
@@ -198,12 +188,8 @@ impl TableFlowManager {
}
/// Retrieves all [TableFlowKey]s of the specified `table_id`.
pub fn nodes(
&self,
catalog: &str,
table_id: TableId,
) -> BoxStream<'static, Result<TableFlowKey>> {
let start_key = TableFlowKey::range_start_key(catalog.to_string(), table_id);
pub fn nodes(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
let start_key = TableFlowKey::range_start_key(table_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
@@ -217,10 +203,9 @@ impl TableFlowManager {
/// Builds a create table flow transaction.
///
/// Puts `__table_flow/{table_id}/{node_id}/{partition_id}` keys.
/// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
pub fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_id: FlowId,
flownode_ids: I,
source_table_ids: &[TableId],
@@ -230,14 +215,7 @@ impl TableFlowManager {
.flat_map(|(partition_id, flownode_id)| {
source_table_ids.iter().map(move |table_id| {
TxnOp::Put(
TableFlowKey::new(
catalog.to_string(),
*table_id,
flownode_id,
flow_id,
partition_id,
)
.to_bytes(),
TableFlowKey::new(*table_id, flownode_id, flow_id, partition_id).to_bytes(),
vec![],
)
})
@@ -254,20 +232,19 @@ mod tests {
#[test]
fn test_key_serialization() {
let table_flow_key = TableFlowKey::new("my_catalog".to_string(), 1024, 1, 2, 0);
let table_flow_key = TableFlowKey::new(1024, 1, 2, 0);
assert_eq!(
b"__flow/my_catalog/source_table/1024/1/2/0".to_vec(),
b"__flow/source_table/1024/1/2/0".to_vec(),
table_flow_key.to_bytes(),
);
let prefix = TableFlowKey::range_start_key("my_catalog".to_string(), 1024);
assert_eq!(b"__flow/my_catalog/source_table/1024/".to_vec(), prefix);
let prefix = TableFlowKey::range_start_key(1024);
assert_eq!(b"__flow/source_table/1024/".to_vec(), prefix);
}
#[test]
fn test_key_deserialization() {
let bytes = b"__flow/my_catalog/source_table/1024/1/2/0".to_vec();
let bytes = b"__flow/source_table/1024/1/2/0".to_vec();
let key = TableFlowKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.source_table_id(), 1024);
assert_eq!(key.flownode_id(), 1);
assert_eq!(key.flow_id(), 2);

View File

@@ -1,152 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use snafu::OptionExt;
use crate::error::{self, Result};
/// The delimiter of key.
pub(crate) const DELIMITER: u8 = b'/';
/// The key of metadata.
pub trait MetaKey<T> {
fn to_bytes(&self) -> Vec<u8>;
fn from_bytes(bytes: &[u8]) -> Result<T>;
}
/// The key of `{catalog}/` scope.
#[derive(Debug, PartialEq)]
pub struct CatalogScoped<T> {
inner: T,
catalog: String,
}
impl<T> Deref for CatalogScoped<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> CatalogScoped<T> {
/// Returns a new [CatalogScoped] key.
pub fn new(catalog: String, inner: T) -> CatalogScoped<T> {
CatalogScoped { inner, catalog }
}
/// Returns the `catalog`.
pub fn catalog(&self) -> &str {
&self.catalog
}
}
impl<T: MetaKey<T>> MetaKey<CatalogScoped<T>> for CatalogScoped<T> {
fn to_bytes(&self) -> Vec<u8> {
let prefix = self.catalog.as_bytes();
let inner = self.inner.to_bytes();
let mut bytes = Vec::with_capacity(prefix.len() + inner.len() + 1);
bytes.extend(prefix);
bytes.push(DELIMITER);
bytes.extend(inner);
bytes
}
fn from_bytes(bytes: &[u8]) -> Result<CatalogScoped<T>> {
let pos = bytes
.iter()
.position(|c| *c == DELIMITER)
.with_context(|| error::DelimiterNotFoundSnafu {
key: String::from_utf8_lossy(bytes),
})?;
let catalog = String::from_utf8_lossy(&bytes[0..pos]).to_string();
// Safety: We don't need the `DELIMITER` char.
let inner = T::from_bytes(&bytes[pos + 1..])?;
Ok(CatalogScoped { inner, catalog })
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);
impl From<Vec<u8>> for BytesAdapter {
fn from(value: Vec<u8>) -> Self {
Self(value)
}
}
impl MetaKey<BytesAdapter> for BytesAdapter {
fn to_bytes(&self) -> Vec<u8> {
self.0.clone()
}
fn from_bytes(bytes: &[u8]) -> Result<BytesAdapter> {
Ok(BytesAdapter(bytes.to_vec()))
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::error::Result;
#[derive(Debug)]
struct MockKey {
inner: Vec<u8>,
}
impl MetaKey<MockKey> for MockKey {
fn to_bytes(&self) -> Vec<u8> {
self.inner.clone()
}
fn from_bytes(bytes: &[u8]) -> Result<MockKey> {
Ok(MockKey {
inner: bytes.to_vec(),
})
}
}
#[test]
fn test_catalog_scoped_from_bytes() {
let key = "test_catalog_name/key";
let scoped_key = CatalogScoped::<MockKey>::from_bytes(key.as_bytes()).unwrap();
assert_eq!(scoped_key.catalog, "test_catalog_name");
assert_eq!(scoped_key.inner.inner, b"key".to_vec());
assert_eq!(key.as_bytes(), &scoped_key.to_bytes());
}
#[test]
fn test_catalog_scoped_from_bytes_delimiter_not_found() {
let key = "test_catalog_name";
let err = CatalogScoped::<MockKey>::from_bytes(key.as_bytes()).unwrap_err();
assert_matches!(err, error::Error::DelimiterNotFound { .. });
}
#[test]
fn test_catalog_scoped_to_bytes() {
let scoped_key = CatalogScoped {
inner: MockKey {
inner: b"hi".to_vec(),
},
catalog: "test_catalog".to_string(),
};
assert_eq!(b"test_catalog/hi".to_vec(), scoped_key.to_bytes());
}
}