feat: show flow's mem usage in INFORMATION_SCHEMA.FLOWS (#4890)

* feat: add flow mem size to sys table

* chore: rm dup def

* chore: remove unused variant

* chore: minor refactor

* refactor: per review
discord9
2024-12-19 16:24:04 +08:00
committed by GitHub
parent 422d18da8b
commit 2d6f63a504
32 changed files with 942 additions and 33 deletions

Cargo.lock

@@ -730,6 +730,36 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "attribute-derive"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1800e974930e9079c965b9ffbcb6667a40401063a26396c7b4f15edc92da690"
dependencies = [
"attribute-derive-macro",
"derive-where",
"manyhow",
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "attribute-derive-macro"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d908eb786ef94296bff86f90130b3b748b49401dc81fd2bb8b3dccd44cfacbd"
dependencies = [
"collection_literals",
"interpolator",
"manyhow",
"proc-macro-utils",
"proc-macro2",
"quote",
"quote-use",
"syn 2.0.90",
]
[[package]]
name = "atty"
version = "0.2.14"
@@ -1845,6 +1875,12 @@ dependencies = [
"tracing-appender",
]
[[package]]
name = "collection_literals"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186dce98367766de751c42c4f03970fc60fc012296e706ccbb9d5df9b6c1e271"
[[package]]
name = "colorchoice"
version = "1.0.2"
@@ -3346,6 +3382,17 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "derive-where"
version = "1.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62d671cc41a825ebabc75757b62d3d168c577f9149b2d49ece1dad1f72119d25"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "derive_arbitrary"
version = "1.3.2"
@@ -4011,6 +4058,8 @@ dependencies = [
"enum-as-inner",
"enum_dispatch",
"futures",
"get-size-derive2",
"get-size2",
"greptime-proto",
"hydroflow",
"itertools 0.10.5",
@@ -4415,6 +4464,23 @@ dependencies = [
"libm",
]
[[package]]
name = "get-size-derive2"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd26d3a97ea14d289c8b54180243ecfe465f3fa9c279a6336d7a003698fc39d"
dependencies = [
"attribute-derive",
"quote",
"syn 2.0.90",
]
[[package]]
name = "get-size2"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159c430715e540d2198fa981d39cd45563ccc60900de187f5b152b33b1cb408e"
[[package]]
name = "gethostname"
version = "0.2.3"
@@ -5346,6 +5412,12 @@ version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a"
[[package]]
name = "interpolator"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71dd52191aae121e8611f1e8dc3e324dd0dd1dee1e6dd91d10ee07a3cfb4d9d8"
[[package]]
name = "inventory"
version = "0.3.15"
@@ -6244,6 +6316,29 @@ dependencies = [
"libc",
]
[[package]]
name = "manyhow"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b33efb3ca6d3b07393750d4030418d594ab1139cee518f0dc88db70fec873587"
dependencies = [
"manyhow-macros",
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "manyhow-macros"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46fce34d199b78b6e6073abf984c9cf5fd3e9330145a93ee0738a7443e371495"
dependencies = [
"proc-macro-utils",
"proc-macro2",
"quote",
]
[[package]]
name = "maplit"
version = "1.0.2"
@@ -8528,6 +8623,17 @@ dependencies = [
"version_check",
]
[[package]]
name = "proc-macro-utils"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeaf08a13de400bc215877b5bdc088f241b12eb42f0a548d3390dc1c56bb7071"
dependencies = [
"proc-macro2",
"quote",
"smallvec",
]
[[package]]
name = "proc-macro2"
version = "1.0.92"
@@ -9107,6 +9213,28 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "quote-use"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9619db1197b497a36178cfc736dc96b271fe918875fbf1344c436a7e93d0321e"
dependencies = [
"quote",
"quote-use-macros",
]
[[package]]
name = "quote-use-macros"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82ebfb7faafadc06a7ab141a6f67bcfb24cb8beb158c6fe933f2f035afa99f35"
dependencies = [
"proc-macro-utils",
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "radium"
version = "0.7.0"


@@ -64,6 +64,13 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to list flow stats"))]
ListFlowStats {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to list flows in catalog {catalog}"))]
ListFlows {
#[snafu(implicit)]
@@ -326,6 +333,7 @@ impl ErrorExt for Error {
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. }
| Error::ListFlows { source, .. }
| Error::ListFlowStats { source, .. }
| Error::ListProcedures { source, .. }
| Error::ListRegionStats { source, .. }
| Error::ConvertProtoData { source, .. } => source.status_code(),


@@ -17,6 +17,7 @@ use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use meta_client::MetaClientRef;
@@ -89,4 +90,12 @@ impl InformationExtension for DistributedInformationExtension {
.map_err(BoxedError::new)
.context(error::ListRegionStatsSnafu)
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
self.meta_client
.list_flow_stats()
.await
.map_err(BoxedError::new)
.context(crate::error::ListFlowStatsSnafu)
}
}


@@ -35,6 +35,7 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME
use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
@@ -192,6 +193,7 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
)) as _),
FLOWS => Some(Arc::new(InformationSchemaFlows::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.flow_metadata_manager.clone(),
)) as _),
PROCEDURE_INFO => Some(
@@ -338,6 +340,9 @@ pub trait InformationExtension {
/// Gets the region statistics.
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
/// Gets the flow statistics. Returns `None` if no flownode is available.
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
pub struct NoopInformationExtension;
@@ -357,4 +362,8 @@ impl InformationExtension for NoopInformationExtension {
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
Ok(vec![])
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(None)
}
}
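As a minimal sketch of how a consumer might use the new trait method — the `flow_state_size` helper below is hypothetical, not part of this change:

```rust
// Hypothetical helper: look up one flow's state size (in bytes) through the
// new `flow_stats` trait method. Yields `None` when no flownode reported
// stats or when the flow id is unknown.
async fn flow_state_size<E: InformationExtension>(
    ext: &E,
    flow_id: u32,
) -> Result<Option<u64>, E::Error> {
    Ok(ext
        .flow_stats()
        .await?
        .and_then(|stat| stat.state_size.get(&flow_id).map(|v| *v as u64)))
}
```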


@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::{Arc, Weak};
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -28,7 +29,9 @@ use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use datatypes::vectors::{
Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
@@ -38,6 +41,8 @@ use crate::error::{
};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42;
@@ -45,6 +50,7 @@ const INIT_CAPACITY: usize = 42;
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
@@ -55,20 +61,24 @@ pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
/// The `information_schema.flows` table provides information about flows in databases.
///
pub(super) struct InformationSchemaFlows {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
}
impl InformationSchemaFlows {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
flow_metadata_manager,
}
}
@@ -80,6 +90,7 @@ impl InformationSchemaFlows {
vec![
(FLOW_NAME, CDT::string_datatype(), false),
(FLOW_ID, CDT::uint32_datatype(), false),
(STATE_SIZE, CDT::uint64_datatype(), true),
(TABLE_CATALOG, CDT::string_datatype(), false),
(FLOW_DEFINITION, CDT::string_datatype(), false),
(COMMENT, CDT::string_datatype(), true),
@@ -99,6 +110,7 @@ impl InformationSchemaFlows {
InformationSchemaFlowsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
&self.flow_metadata_manager,
)
}
@@ -144,10 +156,12 @@ impl InformationTable for InformationSchemaFlows {
struct InformationSchemaFlowsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
flow_names: StringVectorBuilder,
flow_ids: UInt32VectorBuilder,
state_sizes: UInt64VectorBuilder,
table_catalogs: StringVectorBuilder,
raw_sqls: StringVectorBuilder,
comments: StringVectorBuilder,
@@ -162,15 +176,18 @@ impl InformationSchemaFlowsBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: &Arc<FlowMetadataManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
flow_metadata_manager: flow_metadata_manager.clone(),
flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -195,6 +212,11 @@ impl InformationSchemaFlowsBuilder {
.flow_names(&catalog_name)
.await;
let flow_stat = {
let information_extension = utils::information_extension(&self.catalog_manager)?;
information_extension.flow_stats().await?
};
while let Some((flow_name, flow_id)) = stream
.try_next()
.await
@@ -213,7 +235,7 @@ impl InformationSchemaFlowsBuilder {
catalog_name: catalog_name.to_string(),
flow_name: flow_name.to_string(),
})?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?;
}
self.finish()
@@ -224,6 +246,7 @@ impl InformationSchemaFlowsBuilder {
predicates: &Predicates,
flow_id: FlowId,
flow_info: FlowInfoValue,
flow_stat: &Option<FlowStat>,
) -> Result<()> {
let row = [
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
@@ -238,6 +261,11 @@ impl InformationSchemaFlowsBuilder {
}
self.flow_names.push(Some(flow_info.flow_name()));
self.flow_ids.push(Some(flow_id));
self.state_sizes.push(
flow_stat
.as_ref()
.and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
);
self.table_catalogs.push(Some(flow_info.catalog_name()));
self.raw_sqls.push(Some(flow_info.raw_sql()));
self.comments.push(Some(flow_info.comment()));
@@ -270,6 +298,7 @@ impl InformationSchemaFlowsBuilder {
let columns: Vec<VectorRef> = vec![
Arc::new(self.flow_names.finish()),
Arc::new(self.flow_ids.finish()),
Arc::new(self.state_sizes.finish()),
Arc::new(self.table_catalogs.finish()),
Arc::new(self.raw_sqls.finish()),
Arc::new(self.comments.finish()),


@@ -34,7 +34,7 @@ use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
use either::Either;
use meta_client::client::MetaClientBuilder;
use meta_client::client::{ClusterKvBackend, MetaClientBuilder};
use query::datafusion::DatafusionQueryEngine;
use query::parser::QueryLanguageParser;
use query::query_engine::{DefaultSerializer, QueryEngineState};


@@ -34,6 +34,7 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -70,7 +71,7 @@ use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, RwLock};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
@@ -507,7 +508,7 @@ impl StartCommand {
procedure_manager.clone(),
));
let catalog_manager = KvBackendCatalogManager::new(
information_extension,
information_extension.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
Some(procedure_manager.clone()),
@@ -532,6 +533,14 @@ impl StartCommand {
.context(OtherSnafu)?,
);
// set the reference used for querying the local flow state
{
let flow_worker_manager = flownode.flow_worker_manager();
information_extension
.set_flow_worker_manager(flow_worker_manager.clone())
.await;
}
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(),
@@ -669,6 +678,7 @@ pub struct StandaloneInformationExtension {
region_server: RegionServer,
procedure_manager: ProcedureManagerRef,
start_time_ms: u64,
flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>,
}
impl StandaloneInformationExtension {
@@ -677,8 +687,15 @@ impl StandaloneInformationExtension {
region_server,
procedure_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
flow_worker_manager: RwLock::new(None),
}
}
/// Set the flow worker manager for the standalone instance.
pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) {
let mut guard = self.flow_worker_manager.write().await;
*guard = Some(flow_worker_manager);
}
}
#[async_trait::async_trait]
@@ -750,6 +767,18 @@ impl InformationExtension for StandaloneInformationExtension {
.collect::<Vec<_>>();
Ok(stats)
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(Some(
self.flow_worker_manager
.read()
.await
.as_ref()
.unwrap()
.gen_state_report()
.await,
))
}
}
#[cfg(test)]


@@ -137,6 +137,7 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use self::tombstone::TombstoneManager;
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::key::flow::flow_state::FlowStateValue;
use crate::key::node_address::NodeAddressValue;
use crate::key::table_route::TableRouteKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
@@ -1262,7 +1263,8 @@ impl_metadata_value! {
FlowRouteValue,
TableFlowValue,
NodeAddressValue,
SchemaNameValue
SchemaNameValue,
FlowStateValue
}
impl_optional_metadata_value! {


@@ -15,6 +15,7 @@
pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flow_route;
pub mod flow_state;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
@@ -35,6 +36,7 @@ use crate::ensure_values;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoManager;
use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flow_state::FlowStateManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::txn_helper::TxnOpGetResponseSet;
@@ -102,6 +104,8 @@ pub struct FlowMetadataManager {
flownode_flow_manager: FlownodeFlowManager,
table_flow_manager: TableFlowManager,
flow_name_manager: FlowNameManager,
/// Only metasrv has access to its own memory backend, so in all other cases this is `None`.
flow_state_manager: Option<FlowStateManager>,
kv_backend: KvBackendRef,
}
@@ -114,6 +118,7 @@ impl FlowMetadataManager {
flow_name_manager: FlowNameManager::new(kv_backend.clone()),
flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
table_flow_manager: TableFlowManager::new(kv_backend.clone()),
flow_state_manager: None,
kv_backend,
}
}
@@ -123,6 +128,10 @@ impl FlowMetadataManager {
&self.flow_name_manager
}
pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
self.flow_state_manager.as_ref()
}
/// Returns the [`FlowInfoManager`].
pub fn flow_info_manager(&self) -> &FlowInfoManager {
&self.flow_info_manager


@@ -0,0 +1,162 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::{FlowId, MetadataKey, MetadataValue};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
/// The entire `FlowId` to flow size map is stored directly in the value part of the key.
const FLOW_STATE_KEY: &str = "state";
/// The key of flow state.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowStateKeyInner;
impl FlowStateKeyInner {
pub fn new() -> Self {
Self
}
}
impl<'a> MetadataKey<'a, FlowStateKeyInner> for FlowStateKeyInner {
fn to_bytes(&self) -> Vec<u8> {
FLOW_STATE_KEY.as_bytes().to_vec()
}
fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidMetadataSnafu {
err_msg: format!(
"FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
if key != FLOW_STATE_KEY {
return Err(error::InvalidMetadataSnafu {
err_msg: format!("Invalid FlowStateKeyInner '{key}'"),
}
.build());
}
Ok(FlowStateKeyInner::new())
}
}
/// The key that stores the state sizes of flows.
///
/// The layout: `__flow/state`.
pub struct FlowStateKey(FlowScoped<FlowStateKeyInner>);
impl FlowStateKey {
/// Returns the [FlowStateKey].
pub fn new() -> FlowStateKey {
let inner = FlowStateKeyInner::new();
FlowStateKey(FlowScoped::new(inner))
}
}
impl Default for FlowStateKey {
fn default() -> Self {
Self::new()
}
}
impl<'a> MetadataKey<'a, FlowStateKey> for FlowStateKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKey> {
Ok(FlowStateKey(FlowScoped::<FlowStateKeyInner>::from_bytes(
bytes,
)?))
}
}
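A quick sketch of the expected key layout, grounded in the `__flow/state` layout documented above (assuming `FlowScoped` contributes the `__flow/` prefix):

```rust
#[test]
fn flow_state_key_layout() {
    // The state key is a singleton: the scoped prefix plus "state".
    let key = FlowStateKey::new();
    assert_eq!(key.to_bytes(), b"__flow/state");
    // Round-tripping through `from_bytes` recovers the key.
    assert!(FlowStateKey::from_bytes(b"__flow/state").is_ok());
}
```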
/// The value holding per-flow state sizes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowStateValue {
/// For each flow id, the in-memory size of its state in bytes.
pub state_size: BTreeMap<FlowId, usize>,
}
impl FlowStateValue {
pub fn new(state_size: BTreeMap<FlowId, usize>) -> Self {
Self { state_size }
}
}
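A minimal sketch of the stored value encoding, assuming `impl_metadata_value!` uses the usual serde_json encoding (`FlowId` is `u32`, per the `From` conversions below):

```rust
use std::collections::BTreeMap;

fn flow_state_value_encoding_sketch() {
    let value = FlowStateValue::new(BTreeMap::from([(1, 1024)]));
    // serde_json renders integer map keys as strings.
    let json = serde_json::to_string(&value).unwrap();
    assert_eq!(json, r#"{"state_size":{"1":1024}}"#);
}
```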
pub type FlowStateManagerRef = Arc<FlowStateManager>;
/// The manager of [FlowStateKey]. Since state sizes change frequently, they are stored in memory.
///
/// This is only used in distributed mode: metasrv updates the flow stat report from heartbeats,
/// and the frontend fetches the latest report with `get`.
pub struct FlowStateManager {
in_memory: KvBackendRef,
}
impl FlowStateManager {
pub fn new(in_memory: KvBackendRef) -> Self {
Self { in_memory }
}
pub async fn get(&self) -> Result<Option<FlowStateValue>> {
let key = FlowStateKey::new().to_bytes();
self.in_memory
.get(&key)
.await?
.map(|x| FlowStateValue::try_from_raw_value(&x.value))
.transpose()
}
pub async fn put(&self, value: FlowStateValue) -> Result<()> {
let key = FlowStateKey::new().to_bytes();
let value = value.try_as_raw_value()?;
let req = PutRequest::new().with_key(key).with_value(value);
self.in_memory.put(req).await?;
Ok(())
}
}
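A hedged put/get round trip, assuming `MemoryKvBackend` (from `common_meta::kv_backend::memory`, also touched in this diff) as the in-memory backend:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

use crate::kv_backend::memory::MemoryKvBackend;

async fn flow_state_round_trip() -> Result<()> {
    let in_memory = Arc::new(MemoryKvBackend::<crate::error::Error>::new()) as KvBackendRef;
    let manager = FlowStateManager::new(in_memory);

    // Write the per-flow state sizes, then read them back.
    manager
        .put(FlowStateValue::new(BTreeMap::from([(1, 1024)])))
        .await?;
    let got = manager.get().await?.expect("state was just written");
    assert_eq!(got.state_size.get(&1), Some(&1024));
    Ok(())
}
```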
/// Flow's state report, sent regularly through heartbeat messages.
#[derive(Debug, Clone)]
pub struct FlowStat {
/// For each flow id, the in-memory size of its state in bytes.
pub state_size: BTreeMap<u32, usize>,
}
impl From<FlowStateValue> for FlowStat {
fn from(value: FlowStateValue) -> Self {
Self {
state_size: value.state_size,
}
}
}
impl From<FlowStat> for FlowStateValue {
fn from(value: FlowStat) -> Self {
Self {
state_size: value.state_size,
}
}
}


@@ -36,7 +36,7 @@ pub mod postgres;
pub mod test;
pub mod txn;
pub type KvBackendRef = Arc<dyn KvBackend<Error = Error> + Send + Sync>;
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
#[async_trait]
pub trait KvBackend: TxnService
@@ -161,6 +161,9 @@ where
Self::Error: ErrorExt,
{
fn reset(&self);
/// Upcasts to `KvBackendRef`, since trait upcasting (https://github.com/rust-lang/rust/issues/65991) is not yet stable.
fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
}
pub type ResettableKvBackendRef = Arc<dyn ResettableKvBackend<Error = Error> + Send + Sync>;
pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;
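A short sketch of why the explicit upcast method is needed — stable Rust cannot cast a `ResettableKvBackend` trait object to a `KvBackend` one — using `MemoryKvBackend` as a stand-in concrete backend:

```rust
use std::sync::Arc;

use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;

fn upcast_sketch(mem: Arc<MemoryKvBackend<Error>>) {
    // The resettable trait object cannot be cast to `KvBackendRef`
    // directly on stable Rust...
    let _resettable: ResettableKvBackendRef = mem.clone();
    // ...so the explicit method performs the upcast instead.
    let _plain: KvBackendRef = mem.as_kv_backend_ref();
}
```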


@@ -16,13 +16,13 @@ use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use serde::Serializer;
use super::ResettableKvBackend;
use super::{KvBackendRef, ResettableKvBackend};
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
@@ -311,6 +311,10 @@ impl<T: ErrorExt + Send + Sync + 'static> ResettableKvBackend for MemoryKvBacken
fn reset(&self) {
self.clear();
}
fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<T> {
self
}
}
#[cfg(test)]


@@ -40,6 +40,8 @@ datatypes.workspace = true
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"
futures = "0.3"
get-size-derive2 = "0.1.2"
get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
# otherwise it is the same with upstream repo


@@ -60,6 +60,7 @@ use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
mod flownode_impl;
mod parse_expr;
mod stat;
#[cfg(test)]
mod tests;
mod util;
@@ -69,6 +70,7 @@ pub(crate) mod node_context;
mod table_source;
use crate::error::Error;
use crate::utils::StateReportHandler;
use crate::FrontendInvoker;
// `GREPTIME_TIMESTAMP` is not used to distinguish when table is created automatically by flow
@@ -137,6 +139,8 @@ pub struct FlowWorkerManager {
///
/// So that a series of event like `inserts -> flush` can be handled correctly
flush_lock: RwLock<()>,
/// Receives oneshot senders used to send state size reports back
state_report_handler: RwLock<Option<StateReportHandler>>,
}
/// Building FlownodeManager
@@ -170,9 +174,15 @@ impl FlowWorkerManager {
tick_manager,
node_id,
flush_lock: RwLock::new(()),
state_report_handler: RwLock::new(None),
}
}
pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
*self.state_report_handler.write().await = Some(handler);
self
}
/// Create a flownode manager with one worker
pub fn new_with_worker<'s>(
node_id: Option<u32>,
@@ -500,6 +510,27 @@ impl FlowWorkerManager {
/// Flow Runtime related methods
impl FlowWorkerManager {
/// Starts the state report handler, which receives senders from the `HeartbeatTask` and sends state size reports back.
///
/// If the heartbeat task shuts down, this future exits too.
async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
let state_report_handler = self.state_report_handler.write().await.take();
if let Some(mut handler) = state_report_handler {
let zelf = self.clone();
let handler = common_runtime::spawn_global(async move {
while let Some(ret_handler) = handler.recv().await {
let state_report = zelf.gen_state_report().await;
ret_handler.send(state_report).unwrap_or_else(|err| {
common_telemetry::error!(err; "Send state size report error");
});
}
});
Some(handler)
} else {
None
}
}
/// Runs in the `common_runtime` background runtime.
pub fn run_background(
self: Arc<Self>,
@@ -507,6 +538,7 @@ impl FlowWorkerManager {
) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
common_runtime::spawn_global(async move {
let _state_report_handler = self.clone().start_state_report_handler().await;
self.run(shutdown).await;
})
}


@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use common_meta::key::flow::flow_state::FlowStat;
use crate::FlowWorkerManager;
impl FlowWorkerManager {
pub async fn gen_state_report(&self) -> FlowStat {
let mut full_report = BTreeMap::new();
for worker in self.worker_handles.iter() {
let worker = worker.lock().await;
match worker.get_state_size().await {
Ok(state_size) => {
full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)))
}
Err(err) => {
common_telemetry::error!(err; "Get flow stat size error");
}
}
}
FlowStat {
state_size: full_report,
}
}
}


@@ -197,6 +197,21 @@ impl WorkerHandle {
.fail()
}
}
pub async fn get_state_size(&self) -> Result<BTreeMap<FlowId, usize>, Error> {
let ret = self
.itc_client
.call_with_resp(Request::QueryStateSize)
.await?;
ret.into_query_state_size().map_err(|ret| {
InternalSnafu {
reason: format!(
"Flow Node/Worker itc failed, expect Response::QueryStateSize, found {ret:?}"
),
}
.build()
})
}
}
impl Drop for WorkerHandle {
@@ -361,6 +376,13 @@ impl<'s> Worker<'s> {
Some(Response::ContainTask { result: ret })
}
Request::Shutdown => return Err(()),
Request::QueryStateSize => {
let mut ret = BTreeMap::new();
for (flow_id, task_state) in self.task_states.iter() {
ret.insert(*flow_id, task_state.state.get_state_size());
}
Some(Response::QueryStateSize { result: ret })
}
};
Ok(ret)
}
@@ -391,6 +413,7 @@ pub enum Request {
flow_id: FlowId,
},
Shutdown,
QueryStateSize,
}
#[derive(Debug, EnumAsInner)]
@@ -406,6 +429,10 @@ enum Response {
result: bool,
},
RunAvail,
QueryStateSize {
/// Each flow task's state size
result: BTreeMap<FlowId, usize>,
},
}
fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) {
@@ -423,10 +450,12 @@ struct InterThreadCallClient {
}
impl InterThreadCallClient {
/// Call without waiting for a response
fn call_no_resp(&self, req: Request) -> Result<(), Error> {
self.arg_sender.send((req, None)).map_err(from_send_error)
}
/// Call and wait for a response
async fn call_with_resp(&self, req: Request) -> Result<Response, Error> {
let (tx, rx) = oneshot::channel();
self.arg_sender
@@ -527,6 +556,7 @@ mod test {
);
tx.send(Batch::empty()).unwrap();
handle.run_available(0, true).await.unwrap();
assert_eq!(handle.get_state_size().await.unwrap().len(), 1);
assert_eq!(sink_rx.recv().await.unwrap(), Batch::empty());
drop(handle);
worker_thread_handle.join().unwrap();


@@ -16,6 +16,7 @@ use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use get_size2::GetSize;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::SubgraphId;
@@ -109,6 +110,10 @@ impl DataflowState {
pub fn expire_after(&self) -> Option<Timestamp> {
self.expire_after
}
pub fn get_state_size(&self) -> usize {
self.arrange_used.iter().map(|x| x.read().get_size()).sum()
}
}
#[derive(Debug, Clone)]


@@ -24,6 +24,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_meta::key::flow::flow_state::FlowStat;
use common_telemetry::{debug, error, info, warn};
use greptime_proto::v1::meta::NodeInfo;
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
@@ -34,8 +35,27 @@ use tokio::sync::mpsc;
use tokio::time::Duration;
use crate::error::ExternalSnafu;
use crate::utils::SizeReportSender;
use crate::{Error, FlownodeOptions};
async fn query_flow_state(
query_stat_size: &Option<SizeReportSender>,
timeout: Duration,
) -> Option<FlowStat> {
if let Some(report_requester) = query_stat_size.as_ref() {
let ret = report_requester.query(timeout).await;
match ret {
Ok(latest) => Some(latest),
Err(err) => {
error!(err; "Failed to get query stat size");
None
}
}
} else {
None
}
}
/// The flownode heartbeat task, which periodically sends [`HeartbeatRequest`]s to metasrv in the background.
#[derive(Clone)]
pub struct HeartbeatTask {
@@ -47,9 +67,14 @@ pub struct HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
}
impl HeartbeatTask {
pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
self.query_stat_size = Some(query_stat_size);
self
}
pub fn new(
opts: &FlownodeOptions,
meta_client: Arc<MetaClient>,
@@ -65,6 +90,7 @@ impl HeartbeatTask {
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
}
}
@@ -112,6 +138,7 @@ impl HeartbeatTask {
message: Option<OutgoingMessage>,
peer: Option<Peer>,
start_time_ms: u64,
latest_report: &Option<FlowStat>,
) -> Option<HeartbeatRequest> {
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
Some(Ok(message)) => Some(message),
@@ -121,11 +148,22 @@ impl HeartbeatTask {
}
None => None,
};
let flow_stat = latest_report
.as_ref()
.map(|report| {
report
.state_size
.iter()
.map(|(k, v)| (*k, *v as u64))
.collect()
})
.map(|f| api::v1::meta::FlowStat { flow_stat_size: f });
Some(HeartbeatRequest {
mailbox_message,
peer,
info: Self::build_node_info(start_time_ms),
flow_stat,
..Default::default()
})
}
@@ -151,24 +189,27 @@ impl HeartbeatTask {
addr: self.peer_addr.clone(),
});
let query_stat_size = self.query_stat_size.clone();
common_runtime::spawn_hb(async move {
// note that `interval` fires immediately on its first tick, so a
// heartbeat is sent right away
let mut interval = tokio::time::interval(report_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut latest_report = None;
loop {
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms)
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms, &latest_report)
} else {
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = interval.tick() => {
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms)
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms, &latest_report)
}
};
@@ -180,6 +221,10 @@ impl HeartbeatTask {
debug!("Send a heartbeat request to metasrv, content: {:?}", req);
}
}
// After sending a heartbeat, try to fetch the latest report.
// TODO(discord9): consider a better place to update the size report
// The timeout is half the report interval so that a stuck query can't delay the heartbeat.
latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
}
});
}


@@ -22,12 +22,14 @@ use api::v1::Row as ProtoRow;
use datatypes::data_type::ConcreteDataType;
use datatypes::types::cast;
use datatypes::value::Value;
use get_size2::GetSize;
use itertools::Itertools;
pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::expr::error::{CastValueSnafu, EvalError, InvalidArgumentSnafu};
use crate::utils::get_value_heap_size;
/// System-wide Record count difference type. Useful for capture data change
///
@@ -105,6 +107,12 @@ pub struct Row {
pub inner: Vec<Value>,
}
impl GetSize for Row {
fn get_heap_size(&self) -> usize {
self.inner.iter().map(get_value_heap_size).sum()
}
}
impl Row {
/// Create an empty row
pub fn empty() -> Self {


@@ -55,6 +55,7 @@ use crate::error::{
};
use crate::heartbeat::HeartbeatTask;
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowWorkerManager, FlownodeOptions};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
@@ -236,6 +237,8 @@ pub struct FlownodeBuilder {
catalog_manager: CatalogManagerRef,
flow_metadata_manager: FlowMetadataManagerRef,
heartbeat_task: Option<HeartbeatTask>,
/// Receives oneshot senders used to send state size reports back
state_report_handler: Option<StateReportHandler>,
}
impl FlownodeBuilder {
@@ -254,17 +257,20 @@ impl FlownodeBuilder {
catalog_manager,
flow_metadata_manager,
heartbeat_task: None,
state_report_handler: None,
}
}
pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
let (sender, receiver) = SizeReportSender::new();
Self {
heartbeat_task: Some(heartbeat_task),
heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
state_report_handler: Some(receiver),
..self
}
}
pub async fn build(self) -> Result<FlownodeInstance, Error> {
pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
// TODO(discord9): does this query engine need those?
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in flownode is only used for translate plan with resolved table source.
@@ -383,7 +389,7 @@ impl FlownodeBuilder {
/// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
/// nor does it actually start running the worker.
async fn build_manager(
&self,
&mut self,
query_engine: Arc<dyn QueryEngine>,
) -> Result<FlowWorkerManager, Error> {
let table_meta = self.table_meta.clone();
@@ -402,12 +408,15 @@ impl FlownodeBuilder {
info!("Flow Worker started in new thread");
worker.run();
});
let man = rx.await.map_err(|_e| {
let mut man = rx.await.map_err(|_e| {
UnexpectedSnafu {
reason: "sender is dropped, failed to create flow node manager",
}
.build()
})?;
if let Some(handler) = self.state_report_handler.take() {
man = man.with_state_report_handler(handler).await;
}
info!("Flow Node Manager started");
Ok(man)
}


@@ -18,16 +18,73 @@ use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound;
use std::sync::Arc;
use common_meta::key::flow::flow_state::FlowStat;
use common_telemetry::trace;
use datatypes::value::Value;
use get_size2::GetSize;
use smallvec::{smallvec, SmallVec};
use tokio::sync::RwLock;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::time::Instant;
use crate::error::InternalSnafu;
use crate::expr::{EvalError, ScalarExpr};
use crate::repr::{value_to_internal_ts, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};
/// A batch of updates, arranged by key
pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
/// Get an estimate of the heap size of a value
pub fn get_value_heap_size(v: &Value) -> usize {
match v {
Value::Binary(bin) => bin.len(),
Value::String(s) => s.len(),
Value::List(list) => list.items().iter().map(get_value_heap_size).sum(),
_ => 0,
}
}
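The estimate only counts heap payloads, so inline scalars contribute zero; a small illustrative check (assuming `Value`'s usual `From` conversions):

```rust
#[test]
fn value_heap_size_sketch() {
    // Strings count their payload bytes...
    assert_eq!(get_value_heap_size(&Value::from("abcd")), 4);
    // ...while fixed-size scalars live inline and report 0.
    assert_eq!(get_value_heap_size(&Value::from(42i64)), 0);
}
```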
#[derive(Clone)]
pub struct SizeReportSender {
inner: mpsc::Sender<oneshot::Sender<FlowStat>>,
}
impl SizeReportSender {
pub fn new() -> (Self, StateReportHandler) {
let (tx, rx) = mpsc::channel(1);
let zelf = Self { inner: tx };
(zelf, rx)
}
/// Query the size report; times out after the given duration if there is no response
pub async fn query(&self, timeout: std::time::Duration) -> crate::Result<FlowStat> {
let (tx, rx) = oneshot::channel();
self.inner.send(tx).await.map_err(|_| {
InternalSnafu {
reason: "failed to send size report request due to receiver dropped",
}
.build()
})?;
let timeout = tokio::time::timeout(timeout, rx);
timeout
.await
.map_err(|_elapsed| {
InternalSnafu {
reason: "failed to receive size report after one second timeout",
}
.build()
})?
.map_err(|_| {
InternalSnafu {
reason: "failed to receive size report due to sender dropped",
}
.build()
})
}
}
/// Handles size report requests and sends the report back
pub type StateReportHandler = mpsc::Receiver<oneshot::Sender<FlowStat>>;
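How the two halves pair up, as a minimal sketch: the flownode side owns the `StateReportHandler` and answers each oneshot (as the worker manager's report loop does), while the heartbeat side calls `query`:

```rust
use std::collections::BTreeMap;
use std::time::Duration;

async fn size_report_round_trip_sketch() {
    let (sender, mut handler) = SizeReportSender::new();

    // Responder side (in the real code: FlowWorkerManager's report loop).
    tokio::spawn(async move {
        while let Some(reply_to) = handler.recv().await {
            let _ = reply_to.send(FlowStat {
                state_size: BTreeMap::from([(1, 1024)]),
            });
        }
    });

    // Requester side (in the real code: the flownode heartbeat task).
    let stat = sender.query(Duration::from_secs(1)).await.unwrap();
    assert_eq!(stat.state_size.get(&1), Some(&1024));
}
```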
/// A spine of batches, arranged by timestamp
/// TODO(discord9): consider internally index by key, value, and timestamp for faster lookup
pub type Spine = BTreeMap<Timestamp, Batch>;
@@ -49,6 +106,24 @@ pub struct KeyExpiryManager {
event_timestamp_from_row: Option<ScalarExpr>,
}
impl GetSize for KeyExpiryManager {
fn get_heap_size(&self) -> usize {
let row_size = if let Some(row_size) = &self
.event_ts_to_key
.first_key_value()
.map(|(_, v)| v.first().get_heap_size())
{
*row_size
} else {
0
};
self.event_ts_to_key
.values()
.map(|v| v.len() * row_size + std::mem::size_of::<i64>())
.sum::<usize>()
}
}
impl KeyExpiryManager {
pub fn new(
key_expiration_duration: Option<Duration>,
@@ -154,7 +229,7 @@ impl KeyExpiryManager {
///
/// Note the two-way arrow between the reduce operator and the arrangement: the reduce operator
/// needs to both query and update existing state.
#[derive(Debug, Clone, Default, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Arrangement {
/// A name or identifier for the arrangement which can be used for debugging or logging purposes.
/// This field is not critical to the functionality but aids in monitoring and management of arrangements.
@@ -196,6 +271,61 @@ pub struct Arrangement {
/// The time that the last compaction happened, also known as the current time.
last_compaction_time: Option<Timestamp>,
/// Estimated heap size of the arrangement.
estimated_size: usize,
last_size_update: Instant,
size_update_interval: tokio::time::Duration,
}
impl Arrangement {
fn compute_size(&self) -> usize {
self.spine
.values()
.map(|v| {
let per_entry_size = v
.first_key_value()
.map(|(k, v)| {
k.get_heap_size()
+ v.len() * v.first().map(|r| r.get_heap_size()).unwrap_or(0)
})
.unwrap_or(0);
std::mem::size_of::<i64>() + v.len() * per_entry_size
})
.sum::<usize>()
+ self.expire_state.get_heap_size()
+ self.name.get_heap_size()
}
fn update_and_fetch_size(&mut self) -> usize {
if self.last_size_update.elapsed() > self.size_update_interval {
self.estimated_size = self.compute_size();
self.last_size_update = Instant::now();
}
self.estimated_size
}
}
impl GetSize for Arrangement {
fn get_heap_size(&self) -> usize {
self.estimated_size
}
}
impl Default for Arrangement {
fn default() -> Self {
Self {
spine: Default::default(),
full_arrangement: false,
is_written: false,
expire_state: None,
last_compaction_time: None,
name: Vec::new(),
estimated_size: 0,
last_size_update: Instant::now(),
size_update_interval: tokio::time::Duration::from_secs(3),
}
}
}
impl Arrangement {
@@ -207,6 +337,9 @@ impl Arrangement {
expire_state: None,
last_compaction_time: None,
name,
estimated_size: 0,
last_size_update: Instant::now(),
size_update_interval: tokio::time::Duration::from_secs(3),
}
}
@@ -269,6 +402,7 @@ impl Arrangement {
// without changing the order of updates within same tick
key_updates.sort_by_key(|(_val, ts, _diff)| *ts);
}
self.update_and_fetch_size();
Ok(max_expired_by)
}
@@ -390,6 +524,7 @@ impl Arrangement {
// insert the compacted batch into spine with key being `now`
self.spine.insert(now, compacting_batch);
self.update_and_fetch_size();
Ok(max_expired_by)
}


@@ -25,6 +25,7 @@ use std::sync::Arc;
use api::v1::meta::{ProcedureDetailResponse, Role};
use cluster::Client as ClusterClient;
pub use cluster::ClusterKvBackend;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{
@@ -33,6 +34,8 @@ use common_meta::cluster::{
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{self as meta_error, ExternalSnafu, Result as MetaResult};
use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{
@@ -54,7 +57,8 @@ use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error::{
ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, NotStartedSnafu, Result,
ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
Result,
};
pub type Id = (u64, u64);
@@ -347,6 +351,15 @@ fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
}
impl MetaClient {
pub async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
let flow_state_manager = FlowStateManager::new(cluster_backend);
let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
Ok(res.map(|r| r.into()))
}
pub fn new(id: Id) -> Self {
Self {
id,


@@ -40,8 +40,8 @@ use tonic::Status;
use crate::client::ask_leader::AskLeader;
use crate::client::{util, Id};
use crate::error::{
ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu, Result,
RetryTimesExceededSnafu,
ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
};
#[derive(Clone, Debug)]
@@ -308,3 +308,75 @@ impl Inner {
.map(|res| (res.leader, res.followers))
}
}
/// A read-only client for cluster info, corresponding to the
/// `in_memory` kv backend in metasrv.
#[derive(Clone, Debug)]
pub struct ClusterKvBackend {
inner: Arc<Client>,
}
impl ClusterKvBackend {
pub fn new(client: Arc<Client>) -> Self {
Self { inner: client }
}
fn unimpl(&self) -> common_meta::error::Error {
let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu {
name: self.name().to_string(),
}
.fail()
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu);
ret.unwrap_err()
}
}
impl TxnService for ClusterKvBackend {
type Error = common_meta::error::Error;
}
#[async_trait::async_trait]
impl KvBackend for ClusterKvBackend {
fn name(&self) -> &str {
"ClusterKvBackend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> common_meta::error::Result<RangeResponse> {
self.inner
.range(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result<BatchGetResponse> {
Err(self.unimpl())
}
async fn put(&self, _: PutRequest) -> common_meta::error::Result<PutResponse> {
Err(self.unimpl())
}
async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result<BatchPutResponse> {
Err(self.unimpl())
}
async fn delete_range(
&self,
_: DeleteRangeRequest,
) -> common_meta::error::Result<DeleteRangeResponse> {
Err(self.unimpl())
}
async fn batch_delete(
&self,
_: BatchDeleteRequest,
) -> common_meta::error::Result<BatchDeleteResponse> {
Err(self.unimpl())
}
}


@@ -99,8 +99,22 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to get flow stat"))]
GetFlowStat {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
RetryTimesExceeded { times: usize, msg: String },
#[snafu(display("Trying to write to a read-only kv backend: {}", name))]
ReadOnlyKvBackend {
name: String,
#[snafu(implicit)]
location: Location,
},
}
#[allow(dead_code)]
@@ -120,13 +134,15 @@ impl ErrorExt for Error {
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. }
| Error::RetryTimesExceeded { .. } => StatusCode::Internal,
| Error::RetryTimesExceeded { .. }
| Error::ReadOnlyKvBackend { .. } => StatusCode::Internal,
Error::MetaServer { code, .. } => *code,
Error::InvalidResponseHeader { source, .. }
| Error::ConvertMetaRequest { source, .. }
| Error::ConvertMetaResponse { source, .. } => source.status_code(),
| Error::ConvertMetaResponse { source, .. }
| Error::GetFlowStat { source, .. } => source.status_code(),
}
}
}


@@ -716,6 +716,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Flow state handler error"))]
FlowStateHandler {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
}
impl Error {
@@ -761,7 +768,8 @@ impl ErrorExt for Error {
| Error::Join { .. }
| Error::PeerUnavailable { .. }
| Error::ExceededDeadline { .. }
| Error::ChooseItems { .. } => StatusCode::Internal,
| Error::ChooseItems { .. }
| Error::FlowStateHandler { .. } => StatusCode::Internal,
Error::Unsupported { .. } => StatusCode::Unsupported,


@@ -51,6 +51,7 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Notify, RwLock};
use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
use crate::handler::flow_state_handler::FlowStateHandler;
use crate::metasrv::Context;
use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
use crate::pubsub::PublisherRef;
@@ -64,6 +65,7 @@ pub mod collect_stats_handler;
pub mod extract_stat_handler;
pub mod failure_handler;
pub mod filter_inactive_region_stats;
pub mod flow_state_handler;
pub mod keep_lease_handler;
pub mod mailbox_handler;
pub mod on_leader_start_handler;
@@ -482,6 +484,8 @@ pub struct HeartbeatHandlerGroupBuilder {
/// based on the number of received heartbeats. When the number of heartbeats
/// reaches this factor, a flush operation is triggered.
flush_stats_factor: Option<usize>,
/// A simple handler for flow internal state reports
flow_state_handler: Option<FlowStateHandler>,
/// The plugins.
plugins: Option<Plugins>,
@@ -499,12 +503,18 @@ impl HeartbeatHandlerGroupBuilder {
region_failure_handler: None,
region_lease_handler: None,
flush_stats_factor: None,
flow_state_handler: None,
plugins: None,
pushers,
handlers: vec![],
}
}
pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
self.flow_state_handler = handler;
self
}
pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
self.region_lease_handler = handler;
self
@@ -564,6 +574,10 @@ impl HeartbeatHandlerGroupBuilder {
}
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
if let Some(flow_state_handler) = self.flow_state_handler.take() {
self.add_handler_last(flow_state_handler);
}
self
}


@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{FlowStat, HeartbeatRequest, Role};
use common_meta::key::flow::flow_state::{FlowStateManager, FlowStateValue};
use snafu::ResultExt;
use crate::error::{FlowStateHandlerSnafu, Result};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
pub struct FlowStateHandler {
flow_state_manager: FlowStateManager,
}
impl FlowStateHandler {
pub fn new(flow_state_manager: FlowStateManager) -> Self {
Self { flow_state_manager }
}
}
#[async_trait::async_trait]
impl HeartbeatHandler for FlowStateHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Flownode
}
async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
if let Some(FlowStat { flow_stat_size }) = &req.flow_stat {
let state_size = flow_stat_size
.iter()
.map(|(k, v)| (*k, *v as usize))
.collect();
let value = FlowStateValue::new(state_size);
self.flow_state_manager
.put(value)
.await
.context(FlowStateHandlerSnafu)?;
}
Ok(HandleControl::Continue)
}
}


@@ -26,6 +26,7 @@ use common_meta::ddl::{
};
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow::flow_state::FlowStateManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::maintenance::MaintenanceModeManager;
use common_meta::key::TableMetadataManager;
@@ -47,6 +48,7 @@ use crate::error::{self, Result};
use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::flow_state_handler::FlowStateHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
@@ -228,6 +230,7 @@ impl MetasrvBuilder {
peer_allocator,
))
});
let flow_metadata_allocator = {
// for now flownode just use round-robin selector
let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode);
@@ -248,6 +251,9 @@ impl MetasrvBuilder {
peer_allocator,
))
};
let flow_state_handler =
FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
let node_manager = node_manager.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
@@ -350,6 +356,7 @@ impl MetasrvBuilder {
.with_region_failure_handler(region_failover_handler)
.with_region_lease_handler(Some(region_lease_handler))
.with_flush_stats_factor(Some(options.flush_stats_factor))
.with_flow_state_handler(Some(flow_state_handler))
.add_default_handlers()
}
};


@@ -386,6 +386,10 @@ impl ResettableKvBackend for LeaderCachedKvBackend {
fn reset(&self) {
self.cache.reset()
}
fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error> {
self
}
}
#[cfg(test)]


@@ -1045,6 +1045,18 @@ FROM
| svc1 | 200 | 2024-10-18T19:01:31 |
+--------------+-----+---------------------+
-- Test that the FLOWS table works; the exact result doesn't matter since it varies across runs
SELECT
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
INFORMATION_SCHEMA.FLOWS;
+--------------+
| active_flows |
+--------------+
| 1 |
+--------------+
DROP FLOW requests_long_term;
Affected Rows: 0


@@ -569,6 +569,12 @@ SELECT
FROM
requests_without_ip;
-- Test that the FLOWS table works; the exact result doesn't matter since it varies across runs
SELECT
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
INFORMATION_SCHEMA.FLOWS;
DROP FLOW requests_long_term;
DROP TABLE requests_without_ip;


@@ -187,16 +187,17 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | files | update_count | 13 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | files | update_time | 34 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | No | datetime | | |
| greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | comment | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | expire_after | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
| greptime | information_schema | flows | flow_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | comment | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | expire_after | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
| greptime | information_schema | flows | flow_definition | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | flow_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | flows | flow_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | flownode_ids | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | sink_table_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | source_table_ids | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | table_catalog | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | flownode_ids | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | options | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | sink_table_name | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | source_table_ids | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | state_size | 3 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | flows | table_catalog | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |