From 339affea69a86df06fea67e2c95cc7266021c97e Mon Sep 17 00:00:00 2001 From: luofucong Date: Thu, 4 Dec 2025 19:50:34 +0800 Subject: [PATCH] feat: suspend frontend and datanode Signed-off-by: luofucong --- Cargo.lock | 22 +- Cargo.toml | 1 + src/cmd/src/frontend.rs | 47 ++-- src/common/error/src/lib.rs | 25 ++ src/common/error/src/status_code.rs | 11 +- src/common/function/Cargo.toml | 2 +- .../function/src/scalars/geo/relation.rs | 1 + src/common/meta/src/cluster.rs | 11 +- src/common/meta/src/error.rs | 9 +- src/common/meta/src/heartbeat/handler.rs | 1 + .../meta/src/heartbeat/handler/suspend.rs | 55 ++++ src/common/meta/src/heartbeat/mailbox.rs | 11 +- src/common/meta/src/instruction.rs | 2 + src/datanode/src/datanode.rs | 42 ++- src/datanode/src/heartbeat.rs | 4 +- src/datanode/src/heartbeat/handler.rs | 66 +++-- src/datanode/src/region_server.rs | 19 +- src/frontend/Cargo.toml | 3 + src/frontend/src/error.rs | 8 + src/frontend/src/frontend.rs | 260 ++++++++++++++++++ src/frontend/src/heartbeat.rs | 9 +- src/frontend/src/instance.rs | 28 +- src/frontend/src/instance/builder.rs | 29 ++ src/frontend/src/instance/grpc.rs | 4 +- src/frontend/src/instance/influxdb.rs | 7 +- src/frontend/src/instance/jaeger.rs | 29 +- src/frontend/src/instance/log_handler.rs | 10 +- src/frontend/src/instance/logs.rs | 6 +- src/frontend/src/instance/opentsdb.rs | 6 +- src/frontend/src/instance/otlp.rs | 10 +- src/frontend/src/instance/prom_store.rs | 8 +- src/meta-srv/src/handler.rs | 20 +- src/servers/src/error.rs | 11 +- src/servers/src/mysql/writer.rs | 1 + src/servers/src/postgres/types/error.rs | 5 + src/session/Cargo.toml | 2 +- tests-integration/src/cluster.rs | 27 +- 37 files changed, 650 insertions(+), 162 deletions(-) create mode 100644 src/common/meta/src/heartbeat/handler/suspend.rs diff --git a/Cargo.lock b/Cargo.lock index d93c7cd42e..71a3d49563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2845,6 +2845,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -4184,21 +4193,23 @@ dependencies = [ [[package]] name = "derive_more" -version = "1.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618" dependencies = [ "derive_more-impl", ] [[package]] name = "derive_more-impl" -version = "1.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b" dependencies = [ + "convert_case 0.10.0", "proc-macro2", "quote", + "rustc_version", "syn 2.0.106", "unicode-xid", ] @@ -4949,9 +4960,11 @@ dependencies = [ "hostname 0.4.1", "humantime", "humantime-serde", + "hyper-util", "lazy_static", "log-query", "meta-client", + "meta-srv", "num_cpus", "opentelemetry-proto", "operator", @@ -4963,6 +4976,7 @@ dependencies = [ "prost 0.13.5", "query", "rand 0.9.1", + "reqwest", "serde", "serde_json", "servers", diff --git a/Cargo.toml b/Cargo.toml index fdc23da7ed..63039ab1ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,6 +139,7 @@ datafusion-substrait = "50" deadpool = "0.12" deadpool-postgres = "0.14" derive_builder = "0.20" +derive_more = { version = "2.1", features = ["full"] } dotenv = "0.15" either = "1.15" etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62df834f0cffda355eba96691fe1a9a332b75a7", features = [ diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index d74b3cee5c..f1c7f137af 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -35,6 +35,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_meta::heartbeat::handler::suspend::SuspendHandler; use common_query::prelude::set_default_prefix; use common_stat::ResourceStatImpl; use common_telemetry::info; @@ -45,7 +46,7 @@ use frontend::frontend::Frontend; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; -use meta_client::{MetaClientOptions, MetaClientType}; +use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType}; use plugins::frontend::context::{ CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext, }; @@ -440,30 +441,13 @@ impl StartCommand { }; let catalog_manager = builder.build(); - let executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())), - ]); - - let mut resource_stat = ResourceStatImpl::default(); - resource_stat.start_collect_cpu_usage(); - - let heartbeat_task = HeartbeatTask::new( - &opts, - meta_client.clone(), - opts.heartbeat.clone(), - Arc::new(executor), - Arc::new(resource_stat), - ); - let heartbeat_task = Some(heartbeat_task); - let instance = FrontendBuilder::new( opts.clone(), cached_meta_backend.clone(), layered_cache_registry.clone(), catalog_manager, client, - meta_client, + meta_client.clone(), process_manager, ) .with_plugin(plugins.clone()) @@ -471,6 +455,9 @@ impl StartCommand { .try_build() .await .context(error::StartFrontendSnafu)?; + + let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance)); + let instance = Arc::new(instance); let servers = Services::new(opts, instance.clone(), plugins) @@ -487,6 +474,28 @@ impl StartCommand { } } +pub fn create_heartbeat_task( + options: &frontend::frontend::FrontendOptions, + meta_client: MetaClientRef, + instance: &frontend::instance::Instance, +) -> HeartbeatTask { + let executor = Arc::new(HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler), + Arc::new(SuspendHandler::new(instance.suspend())), + Arc::new(InvalidateCacheHandler::new( + instance.cache_invalidator().clone(), + )), + ])); + + let stat = { + let mut stat = ResourceStatImpl::default(); + stat.start_collect_cpu_usage(); + Arc::new(stat) + }; + + HeartbeatTask::new(options, meta_client, executor, stat) +} + #[cfg(test)] mod tests { use std::io::Write; diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 18e6a0c9ae..9b6facda2c 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -21,6 +21,8 @@ pub mod status_code; use http::{HeaderMap, HeaderValue}; pub use snafu; +use crate::status_code::StatusCode; + // HACK - these headers are here for shared in gRPC services. For common HTTP headers, // please define in `src/servers/src/http/header.rs`. pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code"; @@ -46,6 +48,29 @@ pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap { header } +/// Extract [StatusCode] and error message from [HeaderMap], if any. +/// +/// Note that if the [StatusCode] is illegal, for example, a random number that is not pre-defined +/// as a [StatusCode], the result is still `None`. +pub fn from_header_to_err_code_msg(headers: &HeaderMap) -> Option<(StatusCode, &str)> { + let code = headers + .get(GREPTIME_DB_HEADER_ERROR_CODE) + .and_then(|value| { + value + .to_str() + .ok() + .and_then(|x| x.parse::().ok()) + .and_then(StatusCode::from_u32) + }); + let msg = headers + .get(GREPTIME_DB_HEADER_ERROR_MSG) + .and_then(|x| x.to_str().ok()); + match (code, msg) { + (Some(code), Some(msg)) => Some((code, msg)), + _ => None, + } +} + /// Returns the external root cause of the source error (exclude the current error). pub fn root_source(err: &dyn std::error::Error) -> Option<&dyn std::error::Error> { // There are some divergence about the behavior of the `sources()` API diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 08f33af609..9044cb3057 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -42,6 +42,8 @@ pub enum StatusCode { External = 1007, /// The request is deadline exceeded (typically server-side). DeadlineExceeded = 1008, + /// Service got suspended for various reason. For example, resources exceed limit. + Suspend = 1009, // ====== End of common status code ================ // ====== Begin of SQL related status code ========= @@ -175,7 +177,8 @@ impl StatusCode { | StatusCode::AccessDenied | StatusCode::PermissionDenied | StatusCode::RequestOutdated - | StatusCode::External => false, + | StatusCode::External + | StatusCode::Suspend => false, } } @@ -223,7 +226,8 @@ impl StatusCode { | StatusCode::InvalidAuthHeader | StatusCode::AccessDenied | StatusCode::PermissionDenied - | StatusCode::RequestOutdated => false, + | StatusCode::RequestOutdated + | StatusCode::Suspend => false, } } @@ -347,7 +351,8 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code { | StatusCode::RegionNotReady => Code::Unavailable, StatusCode::RuntimeResourcesExhausted | StatusCode::RateLimited - | StatusCode::RegionBusy => Code::ResourceExhausted, + | StatusCode::RegionBusy + | StatusCode::Suspend => Code::ResourceExhausted, StatusCode::UnsupportedPasswordType | StatusCode::UserPasswordMismatch | StatusCode::AuthHeaderNotFound diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 34de004e79..557fbac6e0 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -39,7 +39,7 @@ datafusion-functions-aggregate-common.workspace = true datafusion-pg-catalog.workspace = true datafusion-physical-expr.workspace = true datatypes.workspace = true -derive_more = { version = "1", default-features = false, features = ["display"] } +derive_more.workspace = true geo = { version = "0.29", optional = true } geo-types = { version = "0.7", optional = true } geohash = { version = "0.13", optional = true } diff --git a/src/common/function/src/scalars/geo/relation.rs b/src/common/function/src/scalars/geo/relation.rs index 4567e56bb5..ccbbe53000 100644 --- a/src/common/function/src/scalars/geo/relation.rs +++ b/src/common/function/src/scalars/geo/relation.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Display; use std::sync::Arc; use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder}; diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 74485513e9..78af133e8f 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::{Display, Formatter}; use std::hash::{DefaultHasher, Hash, Hasher}; use std::str::FromStr; @@ -60,7 +61,7 @@ pub trait ClusterInfo { } /// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-0-{role}-{node_id}`. -#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)] pub struct NodeInfoKey { /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`. pub role: Role, @@ -135,7 +136,7 @@ pub struct NodeInfo { pub hostname: String, } -#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)] pub enum Role { Datanode, Frontend, @@ -241,6 +242,12 @@ impl From<&NodeInfoKey> for Vec { } } +impl Display for NodeInfoKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}-{}", self.role, self.node_id) + } +} + impl FromStr for NodeInfo { type Err = Error; diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4bac6450a2..5a79f806a8 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -272,13 +272,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to send message: {err_msg}"))] - SendMessage { - err_msg: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to serde json"))] SerdeJson { #[snafu(source)] @@ -1118,7 +1111,7 @@ impl ErrorExt for Error { | DeserializeFlexbuffers { .. } | ConvertTimeRanges { .. } => StatusCode::Unexpected, - SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, + GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists, diff --git a/src/common/meta/src/heartbeat/handler.rs b/src/common/meta/src/heartbeat/handler.rs index afa71f0edf..ecc735083b 100644 --- a/src/common/meta/src/heartbeat/handler.rs +++ b/src/common/meta/src/heartbeat/handler.rs @@ -23,6 +23,7 @@ use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef}; pub mod invalidate_table_cache; pub mod parse_mailbox_message; +pub mod suspend; #[cfg(test)] mod tests; diff --git a/src/common/meta/src/heartbeat/handler/suspend.rs b/src/common/meta/src/heartbeat/handler/suspend.rs new file mode 100644 index 0000000000..56034ded8d --- /dev/null +++ b/src/common/meta/src/heartbeat/handler/suspend.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicBool; +use std::sync::{Arc, atomic}; + +use async_trait::async_trait; +use common_telemetry::{info, warn}; + +use crate::error::Result; +use crate::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; +use crate::instruction::Instruction; + +/// A heartbeat response handler that handles special "suspend" error. +/// It will simply set or clear (if previously set) the inner suspend atomic state. +pub struct SuspendHandler { + suspend: Arc, +} + +impl SuspendHandler { + pub fn new(suspend: Arc) -> Self { + Self { suspend } + } +} + +#[async_trait] +impl HeartbeatResponseHandler for SuspendHandler { + fn is_acceptable(&self, _: &HeartbeatResponseHandlerContext) -> bool { + true + } + + async fn handle(&self, context: &mut HeartbeatResponseHandlerContext) -> Result { + if let Some((_, Instruction::Suspend)) = context.incoming_message.take() { + self.suspend.store(true, atomic::Ordering::Relaxed); + warn!("set suspend state"); + } else if self.suspend.load(atomic::Ordering::Relaxed) { + self.suspend.store(false, atomic::Ordering::Relaxed); + info!("clear suspend state"); + } + Ok(HandleControl::Continue) + } +} diff --git a/src/common/meta/src/heartbeat/mailbox.rs b/src/common/meta/src/heartbeat/mailbox.rs index 538a81b72c..5ee45436a0 100644 --- a/src/common/meta/src/heartbeat/mailbox.rs +++ b/src/common/meta/src/heartbeat/mailbox.rs @@ -15,8 +15,8 @@ use std::sync::Arc; use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::error::SendError; -use crate::error::{self, Result}; use crate::instruction::{Instruction, InstructionReply}; pub type IncomingMessage = (MessageMeta, Instruction); @@ -51,13 +51,8 @@ impl HeartbeatMailbox { Self { sender } } - pub async fn send(&self, message: OutgoingMessage) -> Result<()> { - self.sender.send(message).await.map_err(|e| { - error::SendMessageSnafu { - err_msg: e.to_string(), - } - .build() - }) + pub async fn send(&self, message: OutgoingMessage) -> Result<(), SendError> { + self.sender.send(message).await } } diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index c731c90490..230c076673 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -539,6 +539,8 @@ pub enum Instruction { GetFileRefs(GetFileRefs), /// Triggers garbage collection for a region. GcRegions(GcRegions), + /// Temporary suspend serving reads or writes + Suspend, } impl Instruction { diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5a1279db9b..e202ce9f2c 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -22,6 +22,7 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef}; +use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::datanode::TopicStatsReporter; use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; @@ -281,21 +282,11 @@ impl DatanodeBuilder { open_all_regions.await?; } - let mut resource_stat = ResourceStatImpl::default(); - resource_stat.start_collect_cpu_usage(); - let heartbeat_task = if let Some(meta_client) = meta_client { - Some( - HeartbeatTask::try_new( - &self.opts, - region_server.clone(), - meta_client, - cache_registry, - self.plugins.clone(), - Arc::new(resource_stat), - ) - .await?, - ) + let task = self + .create_heartbeat_task(®ion_server, meta_client, cache_registry) + .await?; + Some(task) } else { None }; @@ -324,6 +315,29 @@ impl DatanodeBuilder { }) } + async fn create_heartbeat_task( + &self, + region_server: &RegionServer, + meta_client: MetaClientRef, + cache_invalidator: CacheInvalidatorRef, + ) -> Result { + let stat = { + let mut stat = ResourceStatImpl::default(); + stat.start_collect_cpu_usage(); + Arc::new(stat) + }; + + HeartbeatTask::try_new( + &self.opts, + region_server.clone(), + meta_client, + cache_invalidator, + self.plugins.clone(), + stat, + ) + .await + } + /// Builds [ObjectStoreManager] from [StorageConfig]. pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result { let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?; diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 33ba648830..439527cd12 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -25,6 +25,7 @@ use common_meta::datanode::REGION_STATISTIC_KEY; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_meta::heartbeat::handler::suspend::SuspendHandler; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; @@ -89,8 +90,9 @@ impl HeartbeatTask { opts.heartbeat.interval.as_millis() as u64, )); let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![ - region_alive_keeper.clone(), Arc::new(ParseMailboxMessageHandler), + Arc::new(SuspendHandler::new(region_server.suspend())), + region_alive_keeper.clone(), Arc::new( RegionHeartbeatResponseHandler::new(region_server.clone()) .with_open_region_parallelism(opts.init_regions_parallelism), diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 8954513653..9accd138fd 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -99,26 +99,30 @@ impl RegionHeartbeatResponseHandler { self } - fn build_handler(&self, instruction: &Instruction) -> MetaResult> { + fn build_handler( + &self, + instruction: &Instruction, + ) -> MetaResult>> { match instruction { - Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())), - Instruction::OpenRegions(_) => Ok(Box::new( + Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))), + Instruction::OpenRegions(_) => Ok(Some(Box::new( OpenRegionsHandler { open_region_parallelism: self.open_region_parallelism, } .into(), - )), - Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())), - Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())), - Instruction::UpgradeRegions(_) => Ok(Box::new( + ))), + Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))), + Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))), + Instruction::UpgradeRegions(_) => Ok(Some(Box::new( UpgradeRegionsHandler { upgrade_region_parallelism: self.open_region_parallelism, } .into(), - )), - Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())), - Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())), + ))), + Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))), + Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))), Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), + Instruction::Suspend => Ok(None), } } } @@ -216,30 +220,24 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { .context(InvalidHeartbeatResponseSnafu)?; let mailbox = ctx.mailbox.clone(); - let region_server = self.region_server.clone(); - let downgrade_tasks = self.downgrade_tasks.clone(); - let flush_tasks = self.flush_tasks.clone(); - let gc_tasks = self.gc_tasks.clone(); - let handler = self.build_handler(&instruction)?; - let _handle = common_runtime::spawn_global(async move { - let reply = handler - .handle( - &HandlerContext { - region_server, - downgrade_tasks, - flush_tasks, - gc_tasks, - }, - instruction, - ) - .await; - - if let Some(reply) = reply - && let Err(e) = mailbox.send((meta, reply)).await - { - error!(e; "Failed to send reply to mailbox"); - } - }); + if let Some(handler) = self.build_handler(&instruction)? { + let context = HandlerContext { + region_server: self.region_server.clone(), + downgrade_tasks: self.downgrade_tasks.clone(), + flush_tasks: self.flush_tasks.clone(), + gc_tasks: self.gc_tasks.clone(), + }; + let _handle = common_runtime::spawn_global(async move { + let reply = handler.handle(&context, instruction).await; + if let Some(reply) = reply + && let Err(e) = mailbox.send((meta, reply)).await + { + let error = e.to_string(); + let (meta, reply) = e.0; + error!("Failed to send reply {reply} to {meta:?}: {error}"); + } + }); + } Ok(HandleControl::Continue) } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 88680ed195..1bba28f9f8 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -17,6 +17,7 @@ mod catalog; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -52,7 +53,9 @@ pub use query::dummy_catalog::{ DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef, }; use serde_json; -use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; +use servers::error::{ + self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu, +}; use servers::grpc::FlightCompression; use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; @@ -89,6 +92,7 @@ use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInj pub struct RegionServer { inner: Arc, flight_compression: FlightCompression, + suspend: Arc, } pub struct RegionStat { @@ -136,6 +140,7 @@ impl RegionServer { ), )), flight_compression, + suspend: Arc::new(AtomicBool::new(false)), } } @@ -595,11 +600,21 @@ impl RegionServer { .handle_sync_region(engine_with_status.engine(), region_id, manifest_info) .await } + + fn is_suspended(&self) -> bool { + self.suspend.load(Ordering::Relaxed) + } + + pub(crate) fn suspend(&self) -> Arc { + self.suspend.clone() + } } #[async_trait] impl RegionServerHandler for RegionServer { async fn handle(&self, request: region_request::Body) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT .with_label_values(&[request.as_ref()]); let response = match &request { @@ -644,6 +659,8 @@ impl FlightCraft for RegionServer { &self, request: Request, ) -> TonicResult>> { + ensure!(!self.is_suspended(), SuspendedSnafu); + let ticket = request.into_inner().ticket; let request = api::v1::region::QueryRequest::decode(ticket.as_ref()) .context(servers_error::InvalidFlightTicketSnafu)?; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 24d9c8c5ff..14eed2020c 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -85,6 +85,9 @@ common-test-util.workspace = true datanode.workspace = true datatypes.workspace = true futures.workspace = true +hyper-util = { workspace = true, features = ["tokio"] } +meta-srv.workspace = true +reqwest.workspace = true serde_json.workspace = true strfmt = "0.2" tower.workspace = true diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 710c855958..8c4c1b51fa 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -364,6 +364,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Service suspended"))] + Suspended { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -444,6 +450,8 @@ impl ErrorExt for Error { Error::StatementTimeout { .. } => StatusCode::Cancelled, Error::AcquireLimiter { .. } => StatusCode::Internal, + + Error::Suspended { .. } => StatusCode::Suspend, } } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 027f8a4254..a7bed2521a 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -141,7 +141,39 @@ impl Frontend { #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::time::Duration; + + use api::v1::meta::heartbeat_server::HeartbeatServer; + use api::v1::meta::mailbox_message::Payload; + use api::v1::meta::{ + AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage, + Peer, ResponseHeader, Role, heartbeat_server, + }; + use async_trait::async_trait; + use common_error::from_header_to_err_code_msg; + use common_error::status_code::StatusCode; + use common_grpc::channel_manager::ChannelManager; + use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; + use common_meta::heartbeat::handler::HandlerGroupExecutor; + use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; + use common_meta::heartbeat::handler::suspend::SuspendHandler; + use common_meta::instruction::Instruction; + use common_stat::ResourceStatImpl; + use meta_client::MetaClientRef; + use meta_client::client::MetaClientBuilder; + use meta_srv::service::GrpcStream; + use servers::http::HTTP_SERVER; + use servers::http::result::greptime_result_v1::GreptimedbV1Response; + use tokio::sync::mpsc; + use tonic::codec::CompressionEncoding; + use tonic::codegen::tokio_stream::StreamExt; + use tonic::codegen::tokio_stream::wrappers::ReceiverStream; + use tonic::{Request, Response, Status, Streaming}; + use super::*; + use crate::instance::builder::FrontendBuilder; + use crate::server::Services; #[test] fn test_toml() { @@ -149,4 +181,232 @@ mod tests { let toml_string = toml::to_string(&opts).unwrap(); let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap(); } + + struct SuspendableHeartbeatServer { + suspend: Arc, + } + + #[async_trait] + impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer { + type HeartbeatStream = GrpcStream; + + async fn heartbeat( + &self, + request: Request>, + ) -> std::result::Result, Status> { + let (tx, rx) = mpsc::channel(4); + + common_runtime::spawn_global({ + let mut requests = request.into_inner(); + let suspend = self.suspend.clone(); + async move { + while let Some(request) = requests.next().await { + if let Err(e) = request { + let _ = tx.send(Err(e)).await; + return; + } + + let mailbox_message = + suspend.load(Ordering::Relaxed).then(|| MailboxMessage { + payload: Some(Payload::Json( + serde_json::to_string(&Instruction::Suspend).unwrap(), + )), + ..Default::default() + }); + let response = HeartbeatResponse { + header: Some(ResponseHeader::success()), + mailbox_message, + ..Default::default() + }; + + let _ = tx.send(Ok(response)).await; + } + } + }); + + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) + } + + async fn ask_leader( + &self, + _: Request, + ) -> std::result::Result, Status> { + Ok(Response::new(AskLeaderResponse { + header: Some(ResponseHeader::success()), + leader: Some(Peer { + addr: "localhost:0".to_string(), + ..Default::default() + }), + })) + } + } + + async fn create_meta_client( + options: &MetaClientOptions, + heartbeat_server: Arc, + ) -> MetaClientRef { + let (client, server) = tokio::io::duplex(1024); + + // create the heartbeat server: + common_runtime::spawn_global(async move { + let mut router = tonic::transport::Server::builder(); + let router = router.add_service( + HeartbeatServer::from_arc(heartbeat_server) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Zstd), + ); + router + .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)])) + .await + }); + + // Move client to an option so we can _move_ the inner value + // on the first attempt to connect. All other attempts will fail. + let mut client = Some(client); + let connector = tower::service_fn(move |_| { + let client = client.take(); + async move { + if let Some(client) = client { + Ok(hyper_util::rt::TokioIo::new(client)) + } else { + Err(std::io::Error::other("client already taken")) + } + } + }); + let manager = ChannelManager::new(); + manager + .reset_with_connector("localhost:0", connector) + .unwrap(); + + // create the heartbeat client: + let mut client = MetaClientBuilder::new(0, Role::Frontend) + .enable_heartbeat() + .heartbeat_channel_manager(manager) + .build(); + client.start(&options.metasrv_addrs).await.unwrap(); + Arc::new(client) + } + + async fn create_frontend( + options: &FrontendOptions, + meta_client: MetaClientRef, + ) -> Result { + let instance = Arc::new( + FrontendBuilder::new_test(options, meta_client.clone()) + .try_build() + .await?, + ); + + let servers = + Services::new(options.clone(), instance.clone(), Default::default()).build()?; + + let executor = Arc::new(HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler), + Arc::new(SuspendHandler::new(instance.suspend())), + ])); + let heartbeat_task = Some(HeartbeatTask::new( + options, + meta_client, + executor, + Arc::new(ResourceStatImpl::default()), + )); + + let mut frontend = Frontend { + instance, + servers, + heartbeat_task, + }; + frontend.start().await?; + Ok(frontend) + } + + async fn verify_suspend_state( + frontend: &Frontend, + expected: std::result::Result<&str, (StatusCode, &str)>, + ) { + let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap(); + let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr)) + .await + .unwrap(); + + let headers = response.headers(); + let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) { + Err((code, error)) + } else { + Ok(response.text().await.unwrap()) + }; + + match (response, expected) { + (Ok(response), Ok(expected)) => { + let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap(); + let response = serde_json::to_string(response.output()).unwrap(); + assert_eq!(&response, expected); + } + (Err(actual), Err(expected)) => assert_eq!(actual, expected), + _ => unreachable!(), + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_suspend_frontend() -> Result<()> { + common_telemetry::init_default_ut_logging(); + + let meta_client_options = MetaClientOptions { + metasrv_addrs: vec!["localhost:0".to_string()], + ..Default::default() + }; + let options = FrontendOptions { + http: HttpOptions { + addr: "127.0.0.1:0".to_string(), + ..Default::default() + }, + grpc: GrpcOptions { + bind_addr: "127.0.0.1:0".to_string(), + ..Default::default() + }, + mysql: MysqlOptions { + enable: false, + ..Default::default() + }, + postgres: PostgresOptions { + enable: false, + ..Default::default() + }, + meta_client: Some(meta_client_options.clone()), + ..Default::default() + }; + + let server = Arc::new(SuspendableHeartbeatServer { + suspend: Arc::new(AtomicBool::new(false)), + }); + let meta_client = create_meta_client(&meta_client_options, server.clone()).await; + let frontend = create_frontend(&options, meta_client).await?; + + tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; + // initial state: not suspend: + assert!(!frontend.instance.is_suspended()); + verify_suspend_state(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await; + + // make heartbeat server returned "suspend" instruction, + server.suspend.store(true, Ordering::Relaxed); + tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; + // ... then the frontend is suspended: + assert!(frontend.instance.is_suspended()); + verify_suspend_state( + &frontend, + Err(( + StatusCode::Suspend, + "error: Service suspended, execution_time_ms: 0", + )), + ) + .await; + + // make heartbeat server NOT returned "suspend" instruction, + server.suspend.store(false, Ordering::Relaxed); + tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; + // ... then frontend's suspend state is cleared: + assert!(!frontend.instance.is_suspended()); + verify_suspend_state(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await; + Ok(()) + } } diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 9c3954b0c6..64680abfd4 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -27,7 +27,6 @@ use common_stat::ResourceStatRef; use common_telemetry::{debug, error, info, warn}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; -use servers::heartbeat_options::HeartbeatOptions; use snafu::ResultExt; use tokio::sync::mpsc; use tokio::sync::mpsc::Receiver; @@ -54,7 +53,6 @@ impl HeartbeatTask { pub fn new( opts: &FrontendOptions, meta_client: Arc, - heartbeat_opts: HeartbeatOptions, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, resource_stat: ResourceStatRef, ) -> Self { @@ -68,8 +66,8 @@ impl HeartbeatTask { addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)) }, meta_client, - report_interval: heartbeat_opts.interval, - retry_interval: heartbeat_opts.retry_interval, + report_interval: opts.heartbeat.interval, + retry_interval: opts.heartbeat.retry_interval, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, resource_stat, @@ -196,7 +194,8 @@ impl HeartbeatTask { let report_interval = self.report_interval; let start_time_ms = self.start_time_ms; let self_peer = Some(Peer { - // The peer id doesn't make sense for frontend, so we just set it 0. + // The node id will be actually calculated from its address (by hashing the address + // string) in the metasrv. So it can be set to 0 here, as a placeholder. id: 0, addr: self.peer_addr.clone(), }); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 062bc0cf95..8dba8af0ba 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -26,7 +26,8 @@ mod region_query; pub mod standalone; use std::pin::Pin; -use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::{Arc, atomic}; use std::time::{Duration, SystemTime}; use async_stream::stream; @@ -69,7 +70,7 @@ use query::query_engine::DescribeResult; use query::query_engine::options::{QueryOptions, validate_catalog_and_schema}; use servers::error::{ self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu, - OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu, + OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, SuspendedSnafu, UnexpectedResultSnafu, }; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, @@ -119,6 +120,7 @@ pub struct Instance { limiter: Option, process_manager: ProcessManagerRef, slow_query_options: SlowQueryOptions, + suspend: Arc, // cache for otlp metrics // first layer key: db-string @@ -171,6 +173,14 @@ impl Instance { pub fn procedure_executor(&self) -> &ProcedureExecutorRef { self.statement_executor.procedure_executor() } + + pub fn suspend(&self) -> Arc { + self.suspend.clone() + } + + pub(crate) fn is_suspended(&self) -> bool { + self.suspend.load(atomic::Ordering::Relaxed) + } } fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result> { @@ -513,6 +523,10 @@ impl SqlQueryHandler for Instance { #[tracing::instrument(skip_all)] async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { + if self.is_suspended() { + return vec![error::SuspendedSnafu {}.fail()]; + } + let query_interceptor_opt = self.plugins.get::>(); let query_interceptor = query_interceptor_opt.as_ref(); let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) { @@ -580,6 +594,8 @@ impl SqlQueryHandler for Instance { plan: LogicalPlan, query_ctx: QueryContextRef, ) -> Result { + ensure!(!self.is_suspended(), error::SuspendedSnafu); + if should_capture_statement(stmt.as_ref()) { // It's safe to unwrap here because we've already checked the type. let stmt = stmt.unwrap(); @@ -655,6 +671,8 @@ impl SqlQueryHandler for Instance { stmt: Statement, query_ctx: QueryContextRef, ) -> Result> { + ensure!(!self.is_suspended(), error::SuspendedSnafu); + if matches!( stmt, Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_) @@ -710,6 +728,8 @@ impl PrometheusHandler for Instance { query: &PromQuery, query_ctx: QueryContextRef, ) -> server_error::Result { + ensure!(!self.is_suspended(), SuspendedSnafu); + let interceptor = self .plugins .get::>(); @@ -797,6 +817,8 @@ impl PrometheusHandler for Instance { matchers: Vec, ctx: &QueryContextRef, ) -> server_error::Result> { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.handle_query_metric_names(matchers, ctx) .await .map_err(BoxedError::new) @@ -812,6 +834,8 @@ impl PrometheusHandler for Instance { end: SystemTime, ctx: &QueryContextRef, ) -> server_error::Result> { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.handle_query_label_values(metric, label_name, matchers, start, end, ctx) .await .map_err(BoxedError::new) diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index ff42ba53f2..bd3547b371 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::sync::atomic::AtomicBool; use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use catalog::CatalogManagerRef; @@ -87,6 +88,33 @@ impl FrontendBuilder { } } + #[cfg(test)] + pub(crate) fn new_test( + options: &FrontendOptions, + meta_client: meta_client::MetaClientRef, + ) -> Self { + let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new()); + + let layered_cache_registry = Arc::new( + common_meta::cache::LayeredCacheRegistryBuilder::default() + .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend.clone())) + .build(), + ); + + Self::new( + options.clone(), + kv_backend, + layered_cache_registry, + catalog::memory::MemoryCatalogManager::with_default_setup(), + Arc::new(client::client_manager::NodeClients::default()), + meta_client, + Arc::new(catalog::process_manager::ProcessManager::new( + "".to_string(), + None, + )), + ) + } + pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self { Self { local_cache_invalidator: Some(cache_invalidator), @@ -242,6 +270,7 @@ impl FrontendBuilder { process_manager, otlp_metrics_table_legacy_cache: DashMap::new(), slow_query_options: self.options.slow_query.clone(), + suspend: Arc::new(AtomicBool::new(false)), }) } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 09736c5c7f..aefcc3b968 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -49,7 +49,7 @@ use table::table_name::TableName; use crate::error::{ CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result, - SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu, + SubstraitDecodeLogicalPlanSnafu, SuspendedSnafu, TableNotFoundSnafu, TableOperationSnafu, }; use crate::instance::{Instance, attach_timer}; use crate::metrics::{ @@ -61,6 +61,8 @@ impl GrpcQueryHandler for Instance { type Error = Error; async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { + ensure!(!self.is_suspended(), SuspendedSnafu); + let interceptor_ref = self.plugins.get::>(); let interceptor = interceptor_ref.as_ref(); interceptor.pre_execute(&request, ctx.clone())?; diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 1b8e0c9f46..29bc7f9f3a 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -22,13 +22,14 @@ use common_error::ext::BoxedError; use common_time::Timestamp; use common_time::timestamp::TimeUnit; use servers::error::{ - AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu, + AuthSnafu, CatalogSnafu, Error, OtherSnafu, SuspendedSnafu, TimestampOverflowSnafu, + UnexpectedResultSnafu, }; use servers::influxdb::InfluxdbRequest; use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef}; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt, ensure}; use crate::instance::Instance; @@ -39,6 +40,8 @@ impl InfluxdbLineProtocolHandler for Instance { request: InfluxdbRequest, ctx: QueryContextRef, ) -> servers::error::Result { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index e7f9388538..d95d05b4ed 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -38,7 +38,7 @@ use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_n use query::QueryEngineRef; use serde_json::Value as JsonValue; use servers::error::{ - CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, + CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, SuspendedSnafu, TableNotFoundSnafu, }; use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent}; @@ -49,7 +49,7 @@ use servers::otlp::trace::{ }; use servers::query_handler::JaegerQueryHandler; use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt, ensure}; use table::TableRef; use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1}; use table::table::adapter::DfTableProviderAdapter; @@ -65,8 +65,7 @@ impl JaegerQueryHandler for Instance { // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`. Ok(query_trace_table( ctx, - self.catalog_manager(), - self.query_engine(), + self, vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))], vec![], vec![], @@ -107,8 +106,7 @@ impl JaegerQueryHandler for Instance { // ```. Ok(query_trace_table( ctx, - self.catalog_manager(), - self.query_engine(), + self, vec![ SelectExpr::from(col(SPAN_NAME_COLUMN)), SelectExpr::from(col(SPAN_KIND_COLUMN)), @@ -160,8 +158,7 @@ impl JaegerQueryHandler for Instance { Ok(query_trace_table( ctx, - self.catalog_manager(), - self.query_engine(), + self, selects, filters, vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order. @@ -220,8 +217,7 @@ impl JaegerQueryHandler for Instance { // ```. let output = query_trace_table( ctx.clone(), - self.catalog_manager(), - self.query_engine(), + self, vec![wildcard()], filters, vec![], @@ -285,8 +281,7 @@ impl JaegerQueryHandler for Instance { // query all spans Ok(query_trace_table( ctx, - self.catalog_manager(), - self.query_engine(), + self, vec![wildcard()], filters, vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order. @@ -303,8 +298,7 @@ impl JaegerQueryHandler for Instance { #[allow(clippy::too_many_arguments)] async fn query_trace_table( ctx: QueryContextRef, - catalog_manager: &CatalogManagerRef, - query_engine: &QueryEngineRef, + instance: &Instance, selects: Vec, filters: Vec, sorts: Vec, @@ -312,6 +306,8 @@ async fn query_trace_table( tags: Option>, distincts: Vec, ) -> ServerResult { + ensure!(!instance.is_suspended(), SuspendedSnafu); + let trace_table_name = ctx .extension(JAEGER_QUERY_TABLE_NAME_KEY) .unwrap_or(TRACE_TABLE_NAME); @@ -334,7 +330,8 @@ async fn query_trace_table( } }; - let table = catalog_manager + let table = instance + .catalog_manager() .table( ctx.current_catalog(), &ctx.current_schema(), @@ -367,7 +364,7 @@ async fn query_trace_table( .map(|s| format!("\"{}\"", s)) .collect::>(); - let df_context = create_df_context(query_engine)?; + let df_context = create_df_context(instance.query_engine())?; let dataframe = df_context .read_table(Arc::new(DfTableProviderAdapter::new(table))) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 946f121c37..a5d0180451 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -24,12 +24,12 @@ use pipeline::pipeline_operator::PipelineOperator; use pipeline::{Pipeline, PipelineInfo, PipelineVersion}; use servers::error::{ AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu, - Result as ServerResult, + Result as ServerResult, SuspendedSnafu, }; use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use servers::query_handler::PipelineHandler; use session::context::{QueryContext, QueryContextRef}; -use snafu::ResultExt; +use snafu::{ResultExt, ensure}; use table::Table; use crate::instance::Instance; @@ -37,6 +37,8 @@ use crate::instance::Instance; #[async_trait] impl PipelineHandler for Instance { async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() @@ -71,6 +73,8 @@ impl PipelineHandler for Instance { pipeline: &str, query_ctx: QueryContextRef, ) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.pipeline_operator .insert_pipeline(name, content_type, pipeline, query_ctx) .await @@ -83,6 +87,8 @@ impl PipelineHandler for Instance { version: PipelineVersion, ctx: QueryContextRef, ) -> ServerResult> { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.pipeline_operator .delete_pipeline(name, version, ctx) .await diff --git a/src/frontend/src/instance/logs.rs b/src/frontend/src/instance/logs.rs index 8aacb86f5d..74db1b4526 100644 --- a/src/frontend/src/instance/logs.rs +++ b/src/frontend/src/instance/logs.rs @@ -19,11 +19,11 @@ use client::Output; use common_error::ext::BoxedError; use log_query::LogQuery; use server_error::Result as ServerResult; -use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu}; +use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu, SuspendedSnafu}; use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef}; use servers::query_handler::LogQueryHandler; use session::context::{QueryContext, QueryContextRef}; -use snafu::ResultExt; +use snafu::{ResultExt, ensure}; use tonic::async_trait; use crate::instance::Instance; @@ -31,6 +31,8 @@ use crate::instance::Instance; #[async_trait] impl LogQueryHandler for Instance { async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + let interceptor = self .plugins .get::>(); diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index d7deb840cb..7d50adf8ac 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -16,7 +16,9 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; use common_telemetry::tracing; -use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu}; +use servers::error::{ + self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu, SuspendedSnafu, +}; use servers::opentsdb::codec::DataPoint; use servers::opentsdb::data_point_to_grpc_row_insert_requests; use servers::query_handler::OpentsdbProtocolHandler; @@ -33,6 +35,8 @@ impl OpentsdbProtocolHandler for Instance { data_points: Vec, ctx: QueryContextRef, ) -> server_error::Result { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index f0b152e08d..1240a0d331 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -24,13 +24,13 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; -use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult}; +use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult, SuspendedSnafu}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef}; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{ResultExt, ensure}; use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; use crate::instance::Instance; @@ -44,6 +44,8 @@ impl OpenTelemetryProtocolHandler for Instance { request: ExportMetricsServiceRequest, ctx: QueryContextRef, ) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() @@ -123,6 +125,8 @@ impl OpenTelemetryProtocolHandler for Instance { table_name: String, ctx: QueryContextRef, ) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() @@ -170,6 +174,8 @@ impl OpenTelemetryProtocolHandler for Instance { table_name: String, ctx: QueryContextRef, ) -> ServerResult> { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index e9c9499372..a9ba617340 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing}; use operator::insert::InserterRef; use operator::statement::StatementExecutor; use prost::Message; -use servers::error::{self, AuthSnafu, Result as ServerResult}; +use servers::error::{self, AuthSnafu, Result as ServerResult, SuspendedSnafu}; use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef}; @@ -39,7 +39,7 @@ use servers::query_handler::{ PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse, }; use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt, ensure}; use crate::error::{ CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result, @@ -165,6 +165,8 @@ impl PromStoreProtocolHandler for Instance { ctx: QueryContextRef, with_metric_engine: bool, ) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() @@ -211,6 +213,8 @@ impl PromStoreProtocolHandler for Instance { request: ReadRequest, ctx: QueryContextRef, ) -> ServerResult { + ensure!(!self.is_suspended(), SuspendedSnafu); + self.plugins .get::() .as_ref() diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 8f7aba2f92..778c124f7f 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -32,7 +32,7 @@ use collect_leader_region_handler::CollectLeaderRegionHandler; use collect_stats_handler::CollectStatsHandler; use common_base::Plugins; use common_meta::datanode::Stat; -use common_meta::instruction::{Instruction, InstructionReply}; +use common_meta::instruction::InstructionReply; use common_meta::sequence::Sequence; use common_telemetry::{debug, info, warn}; use dashmap::DashMap; @@ -114,16 +114,19 @@ pub enum HandleControl { #[derive(Debug, Default)] pub struct HeartbeatAccumulator { pub header: Option, - pub instructions: Vec, + mailbox_message: Option, pub stat: Option, pub inactive_region_ids: HashSet, pub region_lease: Option, } impl HeartbeatAccumulator { - pub fn into_mailbox_message(self) -> Option { - // TODO(jiachun): to HeartbeatResponse payload - None + pub(crate) fn take_mailbox_message(&mut self) -> Option { + self.mailbox_message.take() + } + + pub fn set_mailbox_message(&mut self, message: MailboxMessage) { + let _ = self.mailbox_message.insert(message); } } @@ -351,10 +354,11 @@ impl HeartbeatHandlerGroup { } } let header = std::mem::take(&mut acc.header); + let mailbox_message = acc.take_mailbox_message(); let res = HeartbeatResponse { header, region_lease: acc.region_lease, - ..Default::default() + mailbox_message, }; Ok(res) } @@ -382,7 +386,9 @@ impl HeartbeatMailbox { /// Parses the [Instruction] from [MailboxMessage]. #[cfg(test)] - pub fn json_instruction(msg: &MailboxMessage) -> Result { + pub(crate) fn json_instruction( + msg: &MailboxMessage, + ) -> Result { let Payload::Json(payload) = msg.payload .as_ref() diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 2e39f80c85..09d343f2c9 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -651,6 +651,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Service suspended"))] + Suspended { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -777,6 +783,8 @@ impl ErrorExt for Error { HandleOtelArrowRequest { .. } => StatusCode::Internal, Cancelled { .. } => StatusCode::Cancelled, + + Suspended { .. } => StatusCode::Suspend, } } @@ -857,7 +865,8 @@ pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode { | StatusCode::TableUnavailable | StatusCode::RegionBusy | StatusCode::StorageUnavailable - | StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE, + | StatusCode::External + | StatusCode::Suspend => HttpStatusCode::SERVICE_UNAVAILABLE, StatusCode::Internal | StatusCode::Unexpected diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 7165a893d7..b6d38b7394 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -508,5 +508,6 @@ fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind { StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE, StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR, StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE, + StatusCode::Suspend => ErrorKind::ER_SERVER_SHUTDOWN, } } diff --git a/src/servers/src/postgres/types/error.rs b/src/servers/src/postgres/types/error.rs index 143f02342a..2760de9c3f 100644 --- a/src/servers/src/postgres/types/error.rs +++ b/src/servers/src/postgres/types/error.rs @@ -295,6 +295,10 @@ pub enum PgErrorCode { /// operator_intervention #[snafu(display("operator_intervention"))] Ec57000 = 3600, + + /// cannot_connect_now + #[snafu(display("cannot_connect_now"))] + Ec57P03 = 3601, // === End of Class 57 — Operator Intervention ===== // === Begin of Class 58 — System Error (errors external to PostgreSQL itself) === @@ -374,6 +378,7 @@ impl From for PgErrorCode { StatusCode::Unsupported => PgErrorCode::Ec0A000, StatusCode::InvalidArguments => PgErrorCode::Ec22023, StatusCode::Cancelled => PgErrorCode::Ec57000, + StatusCode::Suspend => PgErrorCode::Ec57P03, StatusCode::DeadlineExceeded => PgErrorCode::Ec57000, StatusCode::External => PgErrorCode::Ec58000, diff --git a/src/session/Cargo.toml b/src/session/Cargo.toml index d6ee98650f..5b8b60f5ab 100644 --- a/src/session/Cargo.toml +++ b/src/session/Cargo.toml @@ -24,6 +24,6 @@ common-telemetry.workspace = true common-time.workspace = true datafusion-common.workspace = true derive_builder.workspace = true -derive_more = { version = "1", default-features = false, features = ["debug"] } +derive_more.workspace = true snafu.workspace = true sql.workspace = true diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index fc3803f32e..282ab110e7 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -30,13 +30,11 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, use catalog::process_manager::ProcessManager; use client::Client; use client::client_manager::NodeClients; +use cmd::frontend::create_heartbeat_task; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::DatanodeId; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; -use common_meta::heartbeat::handler::HandlerGroupExecutor; -use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; -use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; @@ -44,14 +42,12 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_runtime::Builder as RuntimeBuilder; use common_runtime::runtime::BuilderBuild; -use common_stat::ResourceStatImpl; use common_test_util::temp_dir::create_temp_dir; use common_time::util::DefaultSystemTimer; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig}; use frontend::frontend::{Frontend, FrontendOptions}; -use frontend::heartbeat::HeartbeatTask; use frontend::instance::Instance as FeInstance; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; @@ -68,7 +64,6 @@ use rand::Rng; use servers::grpc::GrpcOptions; use servers::grpc::flight::FlightCraftWrapper; use servers::grpc::region_server::RegionServerRequestHandler; -use servers::heartbeat_options::HeartbeatOptions; use servers::server::ServerHandlers; use tempfile::TempDir; use tonic::codec::CompressionEncoding; @@ -427,31 +422,15 @@ impl GreptimeDbClusterBuilder { ) .build(); - let handlers_executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateCacheHandler::new(cache_registry.clone())), - ]); - let fe_opts = self.build_frontend_options(); - let mut resource_stat = ResourceStatImpl::default(); - resource_stat.start_collect_cpu_usage(); - - let heartbeat_task = HeartbeatTask::new( - &fe_opts, - meta_client.clone(), - HeartbeatOptions::default(), - Arc::new(handlers_executor), - Arc::new(resource_stat), - ); - let instance = FrontendBuilder::new( fe_opts.clone(), cached_meta_backend.clone(), cache_registry.clone(), catalog_manager, datanode_clients, - meta_client, + meta_client.clone(), Arc::new(ProcessManager::new(fe_opts.grpc.server_addr.clone(), None)), ) .with_local_cache_invalidator(cache_registry) @@ -459,6 +438,8 @@ impl GreptimeDbClusterBuilder { .await .unwrap(); + let heartbeat_task = create_heartbeat_task(&fe_opts, meta_client, &instance); + let instance = Arc::new(instance); // Build the servers for the frontend.