diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 3456a17489..125c185a5a 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -16,7 +16,6 @@ mod client; pub mod client_manager; -#[cfg(feature = "testing")] mod database; pub mod error; pub mod flow; @@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use snafu::OptionExt; pub use self::client::Client; -#[cfg(feature = "testing")] pub use self::database::Database; pub use self::error::{Error, Result}; use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu}; diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index a186e57d89..8dcf29dee9 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -33,6 +33,7 @@ mod expr; pub mod heartbeat; mod metrics; mod plan; +mod recording_rules; mod repr; mod server; mod transform; diff --git a/src/flow/src/recording_rules.rs b/src/flow/src/recording_rules.rs new file mode 100644 index 0000000000..92f81cf8f6 --- /dev/null +++ b/src/flow/src/recording_rules.rs @@ -0,0 +1,23 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Run flow as recording rule which is time-window-aware normal query triggered when new data arrives + +use std::time::Duration; + +mod frontend_client; + +/// TODO(discord9): make these constants configurable +/// The default rule engine query timeout: 10 minutes (`10 * 60` seconds) +pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60); diff --git a/src/flow/src/recording_rules/frontend_client.rs b/src/flow/src/recording_rules/frontend_client.rs new file mode 100644 index 0000000000..6ca94181da --- /dev/null +++ b/src/flow/src/recording_rules/frontend_client.rs @@ -0,0 +1,148 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user + +use std::sync::Arc; + +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::ext::BoxedError; +use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::cluster::{NodeInfo, NodeInfoKey, Role}; +use common_meta::peer::Peer; +use common_meta::rpc::store::RangeRequest; +use meta_client::client::MetaClient; +use snafu::ResultExt; + +use crate::error::{ExternalSnafu, UnexpectedSnafu}; +use crate::recording_rules::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT; +use crate::Error; + +fn default_channel_mgr() -> ChannelManager { + let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT); + ChannelManager::with_config(cfg) +} + +fn client_from_urls(addrs: Vec) -> Client { + Client::with_manager_and_urls(default_channel_mgr(), addrs) +} + +/// A simple frontend client able to execute sql using grpc protocol +#[derive(Debug)] +pub enum FrontendClient { + Distributed { + meta_client: Arc, + }, + Standalone { + /// for the sake of simplicity still use grpc even in standalone mode + /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn + /// TODO(discord9): not use grpc under standalone mode + database_client: DatabaseWithPeer, + }, +} + +#[derive(Debug, Clone)] +pub struct DatabaseWithPeer { + pub database: Database, + pub peer: Peer, +} + +impl DatabaseWithPeer { + fn new(database: Database, peer: Peer) -> Self { + Self { database, peer } + } +} + +impl FrontendClient { + pub fn from_meta_client(meta_client: Arc) -> Self { + Self::Distributed { meta_client } + } + + pub fn from_static_grpc_addr(addr: String) -> Self { + let peer = Peer { + id: 0, + addr: addr.clone(), + }; + + let client = client_from_urls(vec![addr]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + Self::Standalone { + 
database_client: DatabaseWithPeer::new(database, peer), + } + } +} + +impl FrontendClient { + async fn scan_for_frontend(&self) -> Result, Error> { + let Self::Distributed { meta_client, .. } = self else { + return Ok(vec![]); + }; + let cluster_client = meta_client + .cluster_client() + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); + let req = RangeRequest::new().with_prefix(prefix); + let resp = cluster_client + .range(req) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let mut res = Vec::with_capacity(resp.kvs.len()); + for kv in resp.kvs { + let key = NodeInfoKey::try_from(kv.key) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let val = NodeInfo::try_from(kv.value) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + res.push((key, val)); + } + Ok(res) + } + + /// Get the database with max `last_activity_ts` + async fn get_last_active_frontend(&self) -> Result { + if let Self::Standalone { database_client } = self { + return Ok(database_client.clone()); + } + + let frontends = self.scan_for_frontend().await?; + let mut peer = None; + + if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) { + peer = Some(val.peer.clone()); + } + + let Some(peer) = peer else { + UnexpectedSnafu { + reason: format!("No frontend available: {:?}", frontends), + } + .fail()? + }; + let client = client_from_urls(vec![peer.addr.clone()]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + Ok(DatabaseWithPeer::new(database, peer)) + } + + /// Get a database client, and possibly update it before returning. + pub async fn get_database_client(&self) -> Result { + match self { + Self::Standalone { database_client } => Ok(database_client.clone()), + Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await, + } + } +}