feat: suspend frontend and datanode (#7370)

Signed-off-by: luofucong <luofc@foxmail.com>

Author: LFC
Date: 2025-12-10 20:18:24 +08:00 (committed by GitHub)
Parent: ab426cbf89 · Commit: f1abe5d215

34 changed files with 751 additions and 150 deletions

Cargo.lock (generated)

@@ -2846,6 +2846,15 @@ dependencies = [
  "unicode-segmentation",
 ]

+[[package]]
+name = "convert_case"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9"
+dependencies = [
+ "unicode-segmentation",
+]
+
 [[package]]
 name = "core-foundation"
 version = "0.9.4"
@@ -4185,21 +4194,23 @@ dependencies = [
 [[package]]
 name = "derive_more"
-version = "1.0.0"
+version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05"
+checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618"
 dependencies = [
  "derive_more-impl",
 ]

 [[package]]
 name = "derive_more-impl"
-version = "1.0.0"
+version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22"
+checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b"
 dependencies = [
+ "convert_case 0.10.0",
  "proc-macro2",
  "quote",
+ "rustc_version",
  "syn 2.0.106",
  "unicode-xid",
 ]
@@ -4916,6 +4927,7 @@ dependencies = [
  "async-stream",
  "async-trait",
  "auth",
+ "axum 0.8.4",
  "bytes",
  "cache",
  "catalog",
@@ -4950,9 +4962,11 @@ dependencies = [
  "hostname 0.4.1",
  "humantime",
  "humantime-serde",
+ "hyper-util",
  "lazy_static",
  "log-query",
  "meta-client",
+ "meta-srv",
  "num_cpus",
  "opentelemetry-proto",
  "operator",
@@ -4964,6 +4978,7 @@ dependencies = [
  "prost 0.13.5",
  "query",
  "rand 0.9.1",
+ "reqwest",
  "serde",
  "serde_json",
  "servers",


@@ -139,6 +139,7 @@ datafusion-substrait = "50"
deadpool = "0.12" deadpool = "0.12"
deadpool-postgres = "0.14" deadpool-postgres = "0.14"
derive_builder = "0.20" derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15" dotenv = "0.15"
either = "1.15" either = "1.15"
etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62df834f0cffda355eba96691fe1a9a332b75a7", features = [ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62df834f0cffda355eba96691fe1a9a332b75a7", features = [


@@ -35,6 +35,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
 use common_meta::heartbeat::handler::HandlerGroupExecutor;
 use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
+use common_meta::heartbeat::handler::suspend::SuspendHandler;
 use common_query::prelude::set_default_prefix;
 use common_stat::ResourceStatImpl;
 use common_telemetry::info;
@@ -45,7 +46,7 @@ use frontend::frontend::Frontend;
 use frontend::heartbeat::HeartbeatTask;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::server::Services;
-use meta_client::{MetaClientOptions, MetaClientType};
+use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
 use plugins::frontend::context::{
     CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
 };
@@ -440,30 +441,13 @@ impl StartCommand {
         };
         let catalog_manager = builder.build();

-        let executor = HandlerGroupExecutor::new(vec![
-            Arc::new(ParseMailboxMessageHandler),
-            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
-        ]);
-
-        let mut resource_stat = ResourceStatImpl::default();
-        resource_stat.start_collect_cpu_usage();
-
-        let heartbeat_task = HeartbeatTask::new(
-            &opts,
-            meta_client.clone(),
-            opts.heartbeat.clone(),
-            Arc::new(executor),
-            Arc::new(resource_stat),
-        );
-        let heartbeat_task = Some(heartbeat_task);
-
         let instance = FrontendBuilder::new(
             opts.clone(),
             cached_meta_backend.clone(),
             layered_cache_registry.clone(),
             catalog_manager,
             client,
-            meta_client,
+            meta_client.clone(),
             process_manager,
         )
         .with_plugin(plugins.clone())
@@ -471,6 +455,9 @@ impl StartCommand {
         .try_build()
         .await
         .context(error::StartFrontendSnafu)?;
+
+        let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
+
         let instance = Arc::new(instance);

         let servers = Services::new(opts, instance.clone(), plugins)
@@ -487,6 +474,28 @@ impl StartCommand {
     }
 }

+pub fn create_heartbeat_task(
+    options: &frontend::frontend::FrontendOptions,
+    meta_client: MetaClientRef,
+    instance: &frontend::instance::Instance,
+) -> HeartbeatTask {
+    let executor = Arc::new(HandlerGroupExecutor::new(vec![
+        Arc::new(ParseMailboxMessageHandler),
+        Arc::new(SuspendHandler::new(instance.suspend_state())),
+        Arc::new(InvalidateCacheHandler::new(
+            instance.cache_invalidator().clone(),
+        )),
+    ]));
+
+    let stat = {
+        let mut stat = ResourceStatImpl::default();
+        stat.start_collect_cpu_usage();
+        Arc::new(stat)
+    };
+
+    HeartbeatTask::new(options, meta_client, executor, stat)
+}
+
 #[cfg(test)]
 mod tests {
     use std::io::Write;


@@ -21,6 +21,8 @@ pub mod status_code;
 use http::{HeaderMap, HeaderValue};
 pub use snafu;

+use crate::status_code::StatusCode;
+
 // HACK - these headers are here for shared in gRPC services. For common HTTP headers,
 // please define in `src/servers/src/http/header.rs`.
 pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
@@ -46,6 +48,29 @@ pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
     header
 }

+/// Extracts the [StatusCode] and error message from the [HeaderMap], if any.
+///
+/// Note that if the code is illegal — for example, a random number that is not
+/// pre-defined as a [StatusCode] — the result is still `None`.
+pub fn from_header_to_err_code_msg(headers: &HeaderMap) -> Option<(StatusCode, &str)> {
+    let code = headers
+        .get(GREPTIME_DB_HEADER_ERROR_CODE)
+        .and_then(|value| {
+            value
+                .to_str()
+                .ok()
+                .and_then(|x| x.parse::<u32>().ok())
+                .and_then(StatusCode::from_u32)
+        });
+    let msg = headers
+        .get(GREPTIME_DB_HEADER_ERROR_MSG)
+        .and_then(|x| x.to_str().ok());
+    match (code, msg) {
+        (Some(code), Some(msg)) => Some((code, msg)),
+        _ => None,
+    }
+}
+
 /// Returns the external root cause of the source error (exclude the current error).
 pub fn root_source(err: &dyn std::error::Error) -> Option<&dyn std::error::Error> {
     // There are some divergence about the behavior of the `sources()` API
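
A round-trip sketch of this helper pair (a minimal example, assuming the crate is consumed as `common_error` and that `StatusCode` keeps its `PartialEq`/`Debug` derives):

```rust
use common_error::status_code::StatusCode;
use common_error::{from_err_code_msg_to_header, from_header_to_err_code_msg};

fn main() {
    // Server side: encode a status code and message into response headers.
    let headers = from_err_code_msg_to_header(StatusCode::Suspended as u32, "Service suspended");

    // Client side: decode them back; both headers must be present and valid.
    let (code, msg) = from_header_to_err_code_msg(&headers).unwrap();
    assert_eq!(code, StatusCode::Suspended);
    assert_eq!(msg, "Service suspended");

    // Missing headers (or an unrecognized numeric code) yield `None`.
    assert!(from_header_to_err_code_msg(&http::HeaderMap::new()).is_none());
}
```

The frontend suspend test below uses exactly this to assert that an HTTP response carries `StatusCode::Suspended`.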


@@ -42,6 +42,8 @@ pub enum StatusCode {
     External = 1007,
     /// The request is deadline exceeded (typically server-side).
     DeadlineExceeded = 1008,
+    /// Service got suspended for various reasons, for example, resources exceeding a limit.
+    Suspended = 1009,

     // ====== End of common status code ================
     // ====== Begin of SQL related status code =========
@@ -175,7 +177,8 @@ impl StatusCode {
             | StatusCode::AccessDenied
             | StatusCode::PermissionDenied
             | StatusCode::RequestOutdated
-            | StatusCode::External => false,
+            | StatusCode::External
+            | StatusCode::Suspended => false,
         }
     }
@@ -223,7 +226,8 @@ impl StatusCode {
             | StatusCode::InvalidAuthHeader
             | StatusCode::AccessDenied
             | StatusCode::PermissionDenied
-            | StatusCode::RequestOutdated => false,
+            | StatusCode::RequestOutdated
+            | StatusCode::Suspended => false,
         }
     }
@@ -347,7 +351,8 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
             | StatusCode::RegionNotReady => Code::Unavailable,
             StatusCode::RuntimeResourcesExhausted
             | StatusCode::RateLimited
-            | StatusCode::RegionBusy => Code::ResourceExhausted,
+            | StatusCode::RegionBusy
+            | StatusCode::Suspended => Code::ResourceExhausted,
             StatusCode::UnsupportedPasswordType
             | StatusCode::UserPasswordMismatch
             | StatusCode::AuthHeaderNotFound
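
The practical effect of the new variant, following the arms above (a small sketch; `status_code_to_http_status` in the servers crate, changed later in this diff, additionally maps it to HTTP 503):

```rust
use common_error::status_code::{StatusCode, status_to_tonic_code};

fn main() {
    // gRPC clients observe a suspended service as ResourceExhausted.
    assert_eq!(
        status_to_tonic_code(StatusCode::Suspended),
        tonic::Code::ResourceExhausted
    );

    // The numeric code 1009 round-trips through `from_u32`, which the new
    // header-parsing helper in common-error relies on.
    assert_eq!(StatusCode::from_u32(1009), Some(StatusCode::Suspended));
}
```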


@@ -39,7 +39,7 @@ datafusion-functions-aggregate-common.workspace = true
 datafusion-pg-catalog.workspace = true
 datafusion-physical-expr.workspace = true
 datatypes.workspace = true
-derive_more = { version = "1", default-features = false, features = ["display"] }
+derive_more.workspace = true
 geo = { version = "0.29", optional = true }
 geo-types = { version = "0.7", optional = true }
 geohash = { version = "0.13", optional = true }


@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::fmt::Display;
 use std::sync::Arc;

 use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};


@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::fmt::{Display, Formatter};
 use std::hash::{DefaultHasher, Hash, Hasher};
 use std::str::FromStr;
@@ -60,7 +61,7 @@ pub trait ClusterInfo {
 }

 /// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-0-{role}-{node_id}`.
-#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
 pub struct NodeInfoKey {
     /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
     pub role: Role,
@@ -135,7 +136,7 @@ pub struct NodeInfo {
     pub hostname: String,
 }

-#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
 pub enum Role {
     Datanode,
     Frontend,
@@ -241,6 +242,12 @@ impl From<&NodeInfoKey> for Vec<u8> {
     }
 }

+impl Display for NodeInfoKey {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}-{}", self.role, self.node_id)
+    }
+}
+
 impl FromStr for NodeInfo {
     type Err = Error;


@@ -272,13 +272,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("Failed to send message: {err_msg}"))]
-    SendMessage {
-        err_msg: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to serde json"))]
     SerdeJson {
         #[snafu(source)]
@@ -1118,7 +1111,7 @@ impl ErrorExt for Error {
             | DeserializeFlexbuffers { .. }
             | ConvertTimeRanges { .. } => StatusCode::Unexpected,

-            SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
+            GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,

             SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,


@@ -23,6 +23,7 @@ use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
 pub mod invalidate_table_cache;
 pub mod parse_mailbox_message;
+pub mod suspend;

 #[cfg(test)]
 mod tests;


@@ -0,0 +1,69 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use async_trait::async_trait;
+use common_telemetry::{info, warn};
+
+use crate::error::Result;
+use crate::heartbeat::handler::{
+    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
+};
+use crate::instruction::Instruction;
+
+/// A heartbeat response handler that handles the special "suspend" instruction.
+/// It simply sets or clears (if previously set) the inner suspend atomic state.
+pub struct SuspendHandler {
+    suspend: Arc<AtomicBool>,
+}
+
+impl SuspendHandler {
+    pub fn new(suspend: Arc<AtomicBool>) -> Self {
+        Self { suspend }
+    }
+}
+
+#[async_trait]
+impl HeartbeatResponseHandler for SuspendHandler {
+    fn is_acceptable(&self, context: &HeartbeatResponseHandlerContext) -> bool {
+        matches!(
+            context.incoming_message,
+            Some((_, Instruction::Suspend)) | None
+        )
+    }
+
+    async fn handle(&self, context: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
+        let flip_state = |expect: bool| {
+            self.suspend
+                .compare_exchange(expect, !expect, Ordering::Relaxed, Ordering::Relaxed)
+                .is_ok()
+        };
+        if let Some((_, Instruction::Suspend)) = context.incoming_message.take() {
+            if flip_state(false) {
+                warn!("Suspend instruction received from meta, entering suspension state");
+            }
+        } else {
+            // Suspended components should always be trying to get out of that state by
+            // themselves; we don't want a separate "un-suspend" instruction to resume
+            // them, as that can be error-prone. So if the "suspend" instruction is not
+            // found in the heartbeat, just clear the state.
+            if flip_state(true) {
+                info!("clear suspend state");
+            }
+        }
+        Ok(HandleControl::Continue)
+    }
+}
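
The `compare_exchange` inside `flip_state` makes both transitions idempotent: the warn/info lines fire only on the heartbeat that actually changes the state, not on every subsequent one. And because `is_acceptable` also matches `None`, the handler runs on instruction-free heartbeats, which is what clears the flag once the metasrv stops sending `Suspend`. The same state machine in isolation (a standalone sketch using only the standard library):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Flips `suspend` from `expect` to `!expect`; returns true only for the
/// call that actually changes the state, so logging happens exactly once.
fn flip_state(suspend: &AtomicBool, expect: bool) -> bool {
    suspend
        .compare_exchange(expect, !expect, Ordering::Relaxed, Ordering::Relaxed)
        .is_ok()
}

fn main() {
    let suspend = AtomicBool::new(false);

    // First heartbeat carrying `Instruction::Suspend`: the state flips.
    assert!(flip_state(&suspend, false));
    // Repeated suspend heartbeats are no-ops.
    assert!(!flip_state(&suspend, false));

    // A heartbeat without the instruction clears the state...
    assert!(flip_state(&suspend, true));
    // ...and further instruction-free heartbeats are no-ops again.
    assert!(!flip_state(&suspend, true));
}
```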


@@ -15,8 +15,8 @@
 use std::sync::Arc;

 use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::error::SendError;

-use crate::error::{self, Result};
 use crate::instruction::{Instruction, InstructionReply};

 pub type IncomingMessage = (MessageMeta, Instruction);
@@ -51,13 +51,8 @@ impl HeartbeatMailbox {
         Self { sender }
     }

-    pub async fn send(&self, message: OutgoingMessage) -> Result<()> {
-        self.sender.send(message).await.map_err(|e| {
-            error::SendMessageSnafu {
-                err_msg: e.to_string(),
-            }
-            .build()
-        })
+    pub async fn send(&self, message: OutgoingMessage) -> Result<(), SendError<OutgoingMessage>> {
+        self.sender.send(message).await
     }
 }
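
Returning tokio's `SendError<OutgoingMessage>` instead of a crate error (hence the now-deleted `SendMessage` variant) keeps the undelivered message available to the caller: `SendError` is a tuple struct whose `.0` field hands the value back. The datanode handler later in this diff uses exactly that to log the reply it failed to deliver; the pattern looks like this:

```rust
// Sketch: recover the undelivered message from a failed mailbox send.
if let Err(e) = mailbox.send((meta, reply)).await {
    let reason = e.to_string(); // stringify first; `e.0` moves the message out
    let (meta, reply) = e.0;
    common_telemetry::error!("Failed to send reply {reply} to {meta:?}: {reason}");
}
```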


@@ -539,6 +539,8 @@ pub enum Instruction {
     GetFileRefs(GetFileRefs),
     /// Triggers garbage collection for a region.
     GcRegions(GcRegions),
+    /// Temporarily suspends serving reads or writes.
+    Suspend,
 }

 impl Instruction {
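
On the wire, the instruction rides in the JSON payload of a heartbeat `MailboxMessage` (see the metasrv and test changes below). Assuming the enum keeps serde's default external tagging — no `#[serde(tag = ...)]` attribute is visible in this diff — the new unit variant serializes as a bare string:

```rust
use common_meta::instruction::Instruction;

fn main() {
    let payload = serde_json::to_string(&Instruction::Suspend).unwrap();
    // With external tagging, a unit variant is just its name.
    assert_eq!(payload, r#""Suspend""#);
}
```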


@@ -46,6 +46,22 @@ pub enum OutputData {
     Stream(SendableRecordBatchStream),
 }

+impl OutputData {
+    /// Consumes the data into a pretty-printed string.
+    pub async fn pretty_print(self) -> String {
+        match self {
+            OutputData::AffectedRows(x) => {
+                format!("Affected Rows: {x}")
+            }
+            OutputData::RecordBatches(x) => x.pretty_print().unwrap_or_else(|e| e.to_string()),
+            OutputData::Stream(x) => common_recordbatch::util::collect_batches(x)
+                .await
+                .and_then(|x| x.pretty_print())
+                .unwrap_or_else(|e| e.to_string()),
+        }
+    }
+}
+
 /// OutputMeta stores meta information produced/generated during the execution
 #[derive(Debug, Default)]
 pub struct OutputMeta {
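
A usage sketch: the suspend test later in this diff calls this on a gRPC query result to compare the rendered table against an expected literal.

```rust
// Sketch: render any OutputData uniformly; streams are collected first.
async fn render(output: common_query::OutputData) -> String {
    // For the stream produced by `SELECT 1`, this yields something like:
    // +----------+
    // | Int64(1) |
    // +----------+
    // | 1        |
    // +----------+
    output.pretty_print().await
}
```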


@@ -22,6 +22,7 @@ use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
+use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::datanode::TopicStatsReporter;
 use common_meta::key::runtime_switch::RuntimeSwitchManager;
 use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
@@ -281,21 +282,11 @@ impl DatanodeBuilder {
             open_all_regions.await?;
         }

-        let mut resource_stat = ResourceStatImpl::default();
-        resource_stat.start_collect_cpu_usage();
-
         let heartbeat_task = if let Some(meta_client) = meta_client {
-            Some(
-                HeartbeatTask::try_new(
-                    &self.opts,
-                    region_server.clone(),
-                    meta_client,
-                    cache_registry,
-                    self.plugins.clone(),
-                    Arc::new(resource_stat),
-                )
-                .await?,
-            )
+            let task = self
+                .create_heartbeat_task(&region_server, meta_client, cache_registry)
+                .await?;
+            Some(task)
         } else {
             None
         };
@@ -324,6 +315,29 @@ impl DatanodeBuilder {
         })
     }

+    async fn create_heartbeat_task(
+        &self,
+        region_server: &RegionServer,
+        meta_client: MetaClientRef,
+        cache_invalidator: CacheInvalidatorRef,
+    ) -> Result<HeartbeatTask> {
+        let stat = {
+            let mut stat = ResourceStatImpl::default();
+            stat.start_collect_cpu_usage();
+            Arc::new(stat)
+        };
+
+        HeartbeatTask::try_new(
+            &self.opts,
+            region_server.clone(),
+            meta_client,
+            cache_invalidator,
+            self.plugins.clone(),
+            stat,
+        )
+        .await
+    }
+
     /// Builds [ObjectStoreManager] from [StorageConfig].
     pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
         let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;


@@ -25,6 +25,7 @@ use common_meta::datanode::REGION_STATISTIC_KEY;
 use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
 use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
+use common_meta::heartbeat::handler::suspend::SuspendHandler;
 use common_meta::heartbeat::handler::{
     HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
 };
@@ -91,6 +92,7 @@ impl HeartbeatTask {
         let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
             region_alive_keeper.clone(),
             Arc::new(ParseMailboxMessageHandler),
+            Arc::new(SuspendHandler::new(region_server.suspend_state())),
             Arc::new(
                 RegionHeartbeatResponseHandler::new(region_server.clone())
                     .with_open_region_parallelism(opts.init_regions_parallelism),


@@ -99,26 +99,30 @@ impl RegionHeartbeatResponseHandler {
         self
     }

-    fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<InstructionHandlers>> {
+    fn build_handler(
+        &self,
+        instruction: &Instruction,
+    ) -> MetaResult<Option<Box<InstructionHandlers>>> {
         match instruction {
-            Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())),
-            Instruction::OpenRegions(_) => Ok(Box::new(
-                OpenRegionsHandler {
-                    open_region_parallelism: self.open_region_parallelism,
-                }
-                .into(),
-            )),
-            Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
-            Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
-            Instruction::UpgradeRegions(_) => Ok(Box::new(
-                UpgradeRegionsHandler {
-                    upgrade_region_parallelism: self.open_region_parallelism,
-                }
-                .into(),
-            )),
-            Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
-            Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
+            Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
+            Instruction::OpenRegions(_) => Ok(Some(Box::new(
+                OpenRegionsHandler {
+                    open_region_parallelism: self.open_region_parallelism,
+                }
+                .into(),
+            ))),
+            Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
+            Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
+            Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
+                UpgradeRegionsHandler {
+                    upgrade_region_parallelism: self.open_region_parallelism,
+                }
+                .into(),
+            ))),
+            Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
+            Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
             Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
+            Instruction::Suspend => Ok(None),
         }
     }
 }
@@ -216,30 +220,24 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
             .context(InvalidHeartbeatResponseSnafu)?;
         let mailbox = ctx.mailbox.clone();
-        let region_server = self.region_server.clone();
-        let downgrade_tasks = self.downgrade_tasks.clone();
-        let flush_tasks = self.flush_tasks.clone();
-        let gc_tasks = self.gc_tasks.clone();
-        let handler = self.build_handler(&instruction)?;
-        let _handle = common_runtime::spawn_global(async move {
-            let reply = handler
-                .handle(
-                    &HandlerContext {
-                        region_server,
-                        downgrade_tasks,
-                        flush_tasks,
-                        gc_tasks,
-                    },
-                    instruction,
-                )
-                .await;
-
-            if let Some(reply) = reply
-                && let Err(e) = mailbox.send((meta, reply)).await
-            {
-                error!(e; "Failed to send reply to mailbox");
-            }
-        });
+        if let Some(handler) = self.build_handler(&instruction)? {
+            let context = HandlerContext {
+                region_server: self.region_server.clone(),
+                downgrade_tasks: self.downgrade_tasks.clone(),
+                flush_tasks: self.flush_tasks.clone(),
+                gc_tasks: self.gc_tasks.clone(),
+            };
+            let _handle = common_runtime::spawn_global(async move {
+                let reply = handler.handle(&context, instruction).await;
+                if let Some(reply) = reply
+                    && let Err(e) = mailbox.send((meta, reply)).await
+                {
+                    let error = e.to_string();
+                    let (meta, reply) = e.0;
+                    error!("Failed to send reply {reply} to {meta:?}: {error}");
+                }
+            });
+        }

         Ok(HandleControl::Continue)
     }


@@ -17,6 +17,7 @@ mod catalog;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::ops::Deref;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
@@ -52,7 +53,9 @@ pub use query::dummy_catalog::{
     DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
 };
 use serde_json;
-use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
+use servers::error::{
+    self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
+};
 use servers::grpc::FlightCompression;
 use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
 use servers::grpc::region_server::RegionServerHandler;
@@ -89,6 +92,7 @@ use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInj
 pub struct RegionServer {
     inner: Arc<RegionServerInner>,
     flight_compression: FlightCompression,
+    suspend: Arc<AtomicBool>,
 }

 pub struct RegionStat {
@@ -136,6 +140,7 @@ impl RegionServer {
                 ),
             )),
             flight_compression,
+            suspend: Arc::new(AtomicBool::new(false)),
         }
     }
@@ -595,6 +600,14 @@ impl RegionServer {
             .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
             .await
     }
+
+    fn is_suspended(&self) -> bool {
+        self.suspend.load(Ordering::Relaxed)
+    }
+
+    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
+        self.suspend.clone()
+    }
 }

 #[async_trait]
@@ -644,6 +657,8 @@ impl FlightCraft for RegionServer {
         &self,
         request: Request<Ticket>,
     ) -> TonicResult<Response<TonicStream<FlightData>>> {
+        ensure!(!self.is_suspended(), SuspendedSnafu);
+
         let ticket = request.into_inner().ticket;
         let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
             .context(servers_error::InvalidFlightTicketSnafu)?;


@@ -17,6 +17,7 @@ arc-swap = "1.0"
 async-stream.workspace = true
 async-trait.workspace = true
 auth.workspace = true
+axum.workspace = true
 bytes.workspace = true
 cache.workspace = true
 catalog.workspace = true
@@ -85,6 +86,9 @@ common-test-util.workspace = true
 datanode.workspace = true
 datatypes.workspace = true
 futures.workspace = true
+hyper-util = { workspace = true, features = ["tokio"] }
+meta-srv.workspace = true
+reqwest.workspace = true
 serde_json.workspace = true
 strfmt = "0.2"
 tower.workspace = true


@@ -364,6 +364,12 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Service suspended"))]
+    Suspended {
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -444,6 +450,8 @@ impl ErrorExt for Error {
             Error::StatementTimeout { .. } => StatusCode::Cancelled,
             Error::AcquireLimiter { .. } => StatusCode::Internal,
+
+            Error::Suspended { .. } => StatusCode::Suspended,
         }
     }


@@ -141,7 +141,43 @@ impl Frontend {
 #[cfg(test)]
 mod tests {
+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::time::Duration;
+
+    use api::v1::meta::heartbeat_server::HeartbeatServer;
+    use api::v1::meta::mailbox_message::Payload;
+    use api::v1::meta::{
+        AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
+        Peer, ResponseHeader, Role, heartbeat_server,
+    };
+    use async_trait::async_trait;
+    use client::{Client, Database};
+    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+    use common_error::ext::ErrorExt;
+    use common_error::from_header_to_err_code_msg;
+    use common_error::status_code::StatusCode;
+    use common_grpc::channel_manager::ChannelManager;
+    use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
+    use common_meta::heartbeat::handler::HandlerGroupExecutor;
+    use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
+    use common_meta::heartbeat::handler::suspend::SuspendHandler;
+    use common_meta::instruction::Instruction;
+    use common_stat::ResourceStatImpl;
+    use meta_client::MetaClientRef;
+    use meta_client::client::MetaClientBuilder;
+    use meta_srv::service::GrpcStream;
+    use servers::grpc::{FlightCompression, GRPC_SERVER};
+    use servers::http::HTTP_SERVER;
+    use servers::http::result::greptime_result_v1::GreptimedbV1Response;
+    use tokio::sync::mpsc;
+    use tonic::codec::CompressionEncoding;
+    use tonic::codegen::tokio_stream::StreamExt;
+    use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
+    use tonic::{Request, Response, Status, Streaming};
+
     use super::*;
+    use crate::instance::builder::FrontendBuilder;
+    use crate::server::Services;
     #[test]
     fn test_toml() {
@@ -149,4 +185,277 @@ mod tests {
         let toml_string = toml::to_string(&opts).unwrap();
         let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
     }
+
+    struct SuspendableHeartbeatServer {
+        suspend: Arc<AtomicBool>,
+    }
+
+    #[async_trait]
+    impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
+        type HeartbeatStream = GrpcStream<HeartbeatResponse>;
+
+        async fn heartbeat(
+            &self,
+            request: Request<Streaming<HeartbeatRequest>>,
+        ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
+            let (tx, rx) = mpsc::channel(4);
+            common_runtime::spawn_global({
+                let mut requests = request.into_inner();
+                let suspend = self.suspend.clone();
+                async move {
+                    while let Some(request) = requests.next().await {
+                        if let Err(e) = request {
+                            let _ = tx.send(Err(e)).await;
+                            return;
+                        }
+
+                        let mailbox_message =
+                            suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
+                                payload: Some(Payload::Json(
+                                    serde_json::to_string(&Instruction::Suspend).unwrap(),
+                                )),
+                                ..Default::default()
+                            });
+                        let response = HeartbeatResponse {
+                            header: Some(ResponseHeader::success()),
+                            mailbox_message,
+                            ..Default::default()
+                        };
+                        let _ = tx.send(Ok(response)).await;
+                    }
+                }
+            });
+            Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
+        }
+
+        async fn ask_leader(
+            &self,
+            _: Request<AskLeaderRequest>,
+        ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
+            Ok(Response::new(AskLeaderResponse {
+                header: Some(ResponseHeader::success()),
+                leader: Some(Peer {
+                    addr: "localhost:0".to_string(),
+                    ..Default::default()
+                }),
+            }))
+        }
+    }
+
+    async fn create_meta_client(
+        options: &MetaClientOptions,
+        heartbeat_server: Arc<SuspendableHeartbeatServer>,
+    ) -> MetaClientRef {
+        let (client, server) = tokio::io::duplex(1024);
+
+        // create the heartbeat server:
+        common_runtime::spawn_global(async move {
+            let mut router = tonic::transport::Server::builder();
+            let router = router.add_service(
+                HeartbeatServer::from_arc(heartbeat_server)
+                    .accept_compressed(CompressionEncoding::Zstd)
+                    .send_compressed(CompressionEncoding::Zstd),
+            );
+            router
+                .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
+                .await
+        });
+
+        // Move the client into an `Option` so we can _move_ the inner value
+        // on the first attempt to connect. All other attempts will fail.
+        let mut client = Some(client);
+        let connector = tower::service_fn(move |_| {
+            let client = client.take();
+            async move {
+                if let Some(client) = client {
+                    Ok(hyper_util::rt::TokioIo::new(client))
+                } else {
+                    Err(std::io::Error::other("client already taken"))
+                }
+            }
+        });
+        let manager = ChannelManager::new();
+        manager
+            .reset_with_connector("localhost:0", connector)
+            .unwrap();
+
+        // create the heartbeat client:
+        let mut client = MetaClientBuilder::new(0, Role::Frontend)
+            .enable_heartbeat()
+            .heartbeat_channel_manager(manager)
+            .build();
+        client.start(&options.metasrv_addrs).await.unwrap();
+        Arc::new(client)
+    }
+
+    async fn create_frontend(
+        options: &FrontendOptions,
+        meta_client: MetaClientRef,
+    ) -> Result<Frontend> {
+        let instance = Arc::new(
+            FrontendBuilder::new_test(options, meta_client.clone())
+                .try_build()
+                .await?,
+        );
+        let servers =
+            Services::new(options.clone(), instance.clone(), Default::default()).build()?;
+
+        let executor = Arc::new(HandlerGroupExecutor::new(vec![
+            Arc::new(ParseMailboxMessageHandler),
+            Arc::new(SuspendHandler::new(instance.suspend_state())),
+        ]));
+        let heartbeat_task = Some(HeartbeatTask::new(
+            options,
+            meta_client,
+            executor,
+            Arc::new(ResourceStatImpl::default()),
+        ));
+
+        let mut frontend = Frontend {
+            instance,
+            servers,
+            heartbeat_task,
+        };
+        frontend.start().await?;
+        Ok(frontend)
+    }
+
+    async fn verify_suspend_state_by_http(
+        frontend: &Frontend,
+        expected: std::result::Result<&str, (StatusCode, &str)>,
+    ) {
+        let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
+        let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
+            .await
+            .unwrap();
+        let headers = response.headers();
+        let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
+            Err((code, error))
+        } else {
+            Ok(response.text().await.unwrap())
+        };
+        match (response, expected) {
+            (Ok(response), Ok(expected)) => {
+                let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
+                let response = serde_json::to_string(response.output()).unwrap();
+                assert_eq!(&response, expected);
+            }
+            (Err(actual), Err(expected)) => assert_eq!(actual, expected),
+            _ => unreachable!(),
+        }
+    }
+
+    async fn verify_suspend_state_by_grpc(
+        frontend: &Frontend,
+        expected: std::result::Result<&str, (StatusCode, &str)>,
+    ) {
+        let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
+        let client = Client::with_urls([addr.to_string()]);
+        let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+        let response = client.sql("SELECT 1").await;
+        match (response, expected) {
+            (Ok(response), Ok(expected)) => {
+                let response = response.data.pretty_print().await;
+                assert_eq!(&response, expected.trim());
+            }
+            (Err(actual), Err(expected)) => {
+                assert_eq!(actual.status_code(), expected.0);
+                assert_eq!(actual.output_msg(), expected.1);
+            }
+            _ => unreachable!(),
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn test_suspend_frontend() -> Result<()> {
+        common_telemetry::init_default_ut_logging();
+
+        let meta_client_options = MetaClientOptions {
+            metasrv_addrs: vec!["localhost:0".to_string()],
+            ..Default::default()
+        };
+        let options = FrontendOptions {
+            http: HttpOptions {
+                addr: "127.0.0.1:0".to_string(),
+                ..Default::default()
+            },
+            grpc: GrpcOptions {
+                bind_addr: "127.0.0.1:0".to_string(),
+                flight_compression: FlightCompression::None,
+                ..Default::default()
+            },
+            mysql: MysqlOptions {
+                enable: false,
+                ..Default::default()
+            },
+            postgres: PostgresOptions {
+                enable: false,
+                ..Default::default()
+            },
+            meta_client: Some(meta_client_options.clone()),
+            ..Default::default()
+        };
+
+        let server = Arc::new(SuspendableHeartbeatServer {
+            suspend: Arc::new(AtomicBool::new(false)),
+        });
+        let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
+        let frontend = create_frontend(&options, meta_client).await?;
+        tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
+
+        // initial state: not suspended:
+        assert!(!frontend.instance.is_suspended());
+        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
+        verify_suspend_state_by_grpc(
+            &frontend,
+            Ok(r#"
++----------+
+| Int64(1) |
++----------+
+| 1        |
++----------+"#),
+        )
+        .await;
+
+        // make the heartbeat server return the "suspend" instruction,
+        server.suspend.store(true, Ordering::Relaxed);
+        tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
+        // ... then the frontend is suspended:
+        assert!(frontend.instance.is_suspended());
+        verify_suspend_state_by_http(
+            &frontend,
+            Err((
+                StatusCode::Suspended,
+                "error: Service suspended, execution_time_ms: 0",
+            )),
+        )
+        .await;
+        verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
+            .await;
+
+        // make the heartbeat server NOT return the "suspend" instruction,
+        server.suspend.store(false, Ordering::Relaxed);
+        tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
+        // ... then the frontend's suspend state is cleared:
+        assert!(!frontend.instance.is_suspended());
+        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
+        verify_suspend_state_by_grpc(
+            &frontend,
+            Ok(r#"
++----------+
+| Int64(1) |
++----------+
+| 1        |
++----------+"#),
+        )
+        .await;
+
+        Ok(())
+    }
 }


@@ -27,7 +27,6 @@ use common_stat::ResourceStatRef;
 use common_telemetry::{debug, error, info, warn};
 use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
 use servers::addrs;
-use servers::heartbeat_options::HeartbeatOptions;
 use snafu::ResultExt;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::Receiver;
@@ -54,7 +53,6 @@ impl HeartbeatTask {
     pub fn new(
         opts: &FrontendOptions,
         meta_client: Arc<MetaClient>,
-        heartbeat_opts: HeartbeatOptions,
         resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
         resource_stat: ResourceStatRef,
     ) -> Self {
@@ -68,8 +66,8 @@ impl HeartbeatTask {
                 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
             },
             meta_client,
-            report_interval: heartbeat_opts.interval,
-            retry_interval: heartbeat_opts.retry_interval,
+            report_interval: opts.heartbeat.interval,
+            retry_interval: opts.heartbeat.retry_interval,
             resp_handler_executor,
             start_time_ms: common_time::util::current_time_millis() as u64,
             resource_stat,
@@ -196,7 +194,8 @@ impl HeartbeatTask {
         let report_interval = self.report_interval;
         let start_time_ms = self.start_time_ms;
         let self_peer = Some(Peer {
-            // The peer id doesn't make sense for frontend, so we just set it 0.
+            // The node id will actually be calculated from the address (by hashing the
+            // address string) in the metasrv, so it can be set to 0 here as a placeholder.
             id: 0,
             addr: self.peer_addr.clone(),
         });


@@ -26,7 +26,8 @@ mod region_query;
 pub mod standalone;

 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::sync::{Arc, atomic};
 use std::time::{Duration, SystemTime};

 use async_stream::stream;
@@ -119,6 +120,7 @@ pub struct Instance {
     limiter: Option<LimiterRef>,
     process_manager: ProcessManagerRef,
     slow_query_options: SlowQueryOptions,
+    suspend: Arc<AtomicBool>,

     // cache for otlp metrics
     // first layer key: db-string
@@ -171,6 +173,14 @@ impl Instance {
     pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
         self.statement_executor.procedure_executor()
     }
+
+    pub fn suspend_state(&self) -> Arc<AtomicBool> {
+        self.suspend.clone()
+    }
+
+    pub(crate) fn is_suspended(&self) -> bool {
+        self.suspend.load(atomic::Ordering::Relaxed)
+    }
 }

 fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
@@ -513,6 +523,10 @@ impl SqlQueryHandler for Instance {
     #[tracing::instrument(skip_all)]
     async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
+        if self.is_suspended() {
+            return vec![error::SuspendedSnafu {}.fail()];
+        }
+
         let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
         let query_interceptor = query_interceptor_opt.as_ref();
         let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
@@ -580,6 +594,8 @@ impl SqlQueryHandler for Instance {
         plan: LogicalPlan,
         query_ctx: QueryContextRef,
     ) -> Result<Output> {
+        ensure!(!self.is_suspended(), error::SuspendedSnafu);
+
         if should_capture_statement(stmt.as_ref()) {
             // It's safe to unwrap here because we've already checked the type.
             let stmt = stmt.unwrap();
@@ -641,6 +657,10 @@ impl SqlQueryHandler for Instance {
         query: &PromQuery,
         query_ctx: QueryContextRef,
     ) -> Vec<Result<Output>> {
+        if self.is_suspended() {
+            return vec![error::SuspendedSnafu {}.fail()];
+        }
+
         // check will be done in prometheus handler's do_query
         let result = PrometheusHandler::do_query(self, query, query_ctx)
             .await
@@ -655,6 +675,8 @@ impl SqlQueryHandler for Instance {
         stmt: Statement,
         query_ctx: QueryContextRef,
     ) -> Result<Option<DescribeResult>> {
+        ensure!(!self.is_suspended(), error::SuspendedSnafu);
+
         if matches!(
             stmt,
             Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)


@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::sync::Arc;
+use std::sync::atomic::AtomicBool;

 use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
 use catalog::CatalogManagerRef;
@@ -87,6 +88,33 @@ impl FrontendBuilder {
         }
     }

+    #[cfg(test)]
+    pub(crate) fn new_test(
+        options: &FrontendOptions,
+        meta_client: meta_client::MetaClientRef,
+    ) -> Self {
+        let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new());
+        let layered_cache_registry = Arc::new(
+            common_meta::cache::LayeredCacheRegistryBuilder::default()
+                .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend.clone()))
+                .build(),
+        );
+        Self::new(
+            options.clone(),
+            kv_backend,
+            layered_cache_registry,
+            catalog::memory::MemoryCatalogManager::with_default_setup(),
+            Arc::new(client::client_manager::NodeClients::default()),
+            meta_client,
+            Arc::new(catalog::process_manager::ProcessManager::new(
+                "".to_string(),
+                None,
+            )),
+        )
+    }
+
     pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
         Self {
             local_cache_invalidator: Some(cache_invalidator),
@@ -242,6 +270,7 @@ impl FrontendBuilder {
             process_manager,
             otlp_metrics_table_legacy_cache: DashMap::new(),
             slow_query_options: self.options.slow_query.clone(),
+            suspend: Arc::new(AtomicBool::new(false)),
         })
     }
 }


@@ -65,8 +65,7 @@ impl JaegerQueryHandler for Instance {
         // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
         Ok(query_trace_table(
             ctx,
-            self.catalog_manager(),
-            self.query_engine(),
+            self,
             vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
             vec![],
             vec![],
@@ -107,8 +106,7 @@ impl JaegerQueryHandler for Instance {
         // ```.
         Ok(query_trace_table(
             ctx,
-            self.catalog_manager(),
-            self.query_engine(),
+            self,
             vec![
                 SelectExpr::from(col(SPAN_NAME_COLUMN)),
                 SelectExpr::from(col(SPAN_KIND_COLUMN)),
@@ -160,8 +158,7 @@ impl JaegerQueryHandler for Instance {
         Ok(query_trace_table(
             ctx,
-            self.catalog_manager(),
-            self.query_engine(),
+            self,
             selects,
             filters,
             vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
@@ -220,8 +217,7 @@ impl JaegerQueryHandler for Instance {
         // ```.
         let output = query_trace_table(
             ctx.clone(),
-            self.catalog_manager(),
-            self.query_engine(),
+            self,
             vec![wildcard()],
             filters,
             vec![],
@@ -285,8 +281,7 @@ impl JaegerQueryHandler for Instance {
         // query all spans
         Ok(query_trace_table(
             ctx,
-            self.catalog_manager(),
-            self.query_engine(),
+            self,
             vec![wildcard()],
             filters,
             vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
@@ -303,8 +298,7 @@ impl JaegerQueryHandler for Instance {
 #[allow(clippy::too_many_arguments)]
 async fn query_trace_table(
     ctx: QueryContextRef,
-    catalog_manager: &CatalogManagerRef,
-    query_engine: &QueryEngineRef,
+    instance: &Instance,
     selects: Vec<SelectExpr>,
     filters: Vec<Expr>,
     sorts: Vec<SortExpr>,
@@ -334,7 +328,8 @@ async fn query_trace_table(
         }
     };

-    let table = catalog_manager
+    let table = instance
+        .catalog_manager()
         .table(
             ctx.current_catalog(),
             &ctx.current_schema(),
@@ -367,7 +362,7 @@ async fn query_trace_table(
         .map(|s| format!("\"{}\"", s))
         .collect::<HashSet<String>>();

-    let df_context = create_df_context(query_engine)?;
+    let df_context = create_df_context(instance.query_engine())?;

     let dataframe = df_context
         .read_table(Arc::new(DfTableProviderAdapter::new(table)))


@@ -16,6 +16,9 @@ use std::net::SocketAddr;
 use std::sync::Arc;

 use auth::UserProviderRef;
+use axum::extract::{Request, State};
+use axum::middleware::Next;
+use axum::response::IntoResponse;
 use common_base::Plugins;
 use common_config::Configurable;
 use common_telemetry::info;
@@ -27,6 +30,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{GrpcOptions, GrpcServer};
 use servers::http::event::LogValidatorRef;
+use servers::http::result::error_result::ErrorResponse;
 use servers::http::utils::router::RouterConfigurator;
 use servers::http::{HttpServer, HttpServerBuilder};
 use servers::interceptor::LogIngestInterceptorRef;
@@ -39,6 +43,7 @@ use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
 use servers::server::{Server, ServerHandlers};
 use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
 use snafu::ResultExt;
+use tonic::Status;

 use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
 use crate::frontend::FrontendOptions;
@@ -125,7 +130,16 @@ where
             builder = builder.with_extra_router(configurator.router());
         }

-        builder
+        builder.add_layer(axum::middleware::from_fn_with_state(
+            self.instance.clone(),
+            async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
+                if state.is_suspended() {
+                    return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
+                        .into_response();
+                }
+                next.run(request).await
+            },
+        ))
     }

     pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
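
The same gating pattern in isolation (a minimal axum 0.8 sketch: `AppState` stands in for the frontend `Instance`, and a plain 503 stands in for the structured `ErrorResponse`):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use axum::Router;
use axum::extract::{Request, State};
use axum::http::StatusCode;
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::routing::get;

#[derive(Clone)]
struct AppState {
    suspend: Arc<AtomicBool>,
}

// Reject every request with 503 while the suspend flag is set; otherwise
// pass it through to the inner router untouched.
async fn suspend_gate(State(state): State<AppState>, request: Request, next: Next) -> Response {
    if state.suspend.load(Ordering::Relaxed) {
        return (StatusCode::SERVICE_UNAVAILABLE, "service suspended").into_response();
    }
    next.run(request).await
}

fn router(state: AppState) -> Router {
    Router::new()
        .route("/v1/sql", get(|| async { "ok" }))
        .layer(middleware::from_fn_with_state(state, suspend_gate))
}
```

Putting the check in a layer, rather than in each handler, is what lets the HTTP and gRPC servers share the same suspension semantics.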
@@ -197,7 +211,17 @@ where
                 self.instance.clone(),
                 user_provider.clone(),
             ))
-            .flight_handler(flight_handler);
+            .flight_handler(flight_handler)
+            .add_layer(axum::middleware::from_fn_with_state(
+                self.instance.clone(),
+                async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
+                    if state.is_suspended() {
+                        let status = Status::from(servers::error::SuspendedSnafu.build());
+                        return status.into_http();
+                    }
+                    next.run(request).await
+                },
+            ));

         let grpc_server = if !external {
             let frontend_grpc_handler =


@@ -32,7 +32,7 @@ use collect_leader_region_handler::CollectLeaderRegionHandler;
 use collect_stats_handler::CollectStatsHandler;
 use common_base::Plugins;
 use common_meta::datanode::Stat;
-use common_meta::instruction::{Instruction, InstructionReply};
+use common_meta::instruction::InstructionReply;
 use common_meta::sequence::Sequence;
 use common_telemetry::{debug, info, warn};
 use dashmap::DashMap;
@@ -114,16 +114,19 @@ pub enum HandleControl {
 #[derive(Debug, Default)]
 pub struct HeartbeatAccumulator {
     pub header: Option<ResponseHeader>,
-    pub instructions: Vec<Instruction>,
+    mailbox_message: Option<MailboxMessage>,
     pub stat: Option<Stat>,
     pub inactive_region_ids: HashSet<RegionId>,
     pub region_lease: Option<RegionLease>,
 }

 impl HeartbeatAccumulator {
-    pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
-        // TODO(jiachun): to HeartbeatResponse payload
-        None
+    pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
+        self.mailbox_message.take()
+    }
+
+    pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
+        let _ = self.mailbox_message.insert(message);
     }
 }
@@ -359,10 +362,11 @@ impl HeartbeatHandlerGroup {
             }
         }

         let header = std::mem::take(&mut acc.header);
+        let mailbox_message = acc.take_mailbox_message();
         let res = HeartbeatResponse {
             header,
             region_lease: acc.region_lease,
-            ..Default::default()
+            mailbox_message,
         };
         Ok(res)
     }
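
With the accumulator now carrying a real `MailboxMessage`, a metasrv-side heartbeat handler can attach at most one instruction per heartbeat response. A hypothetical sketch of how a suspension policy (not part of this commit) could deliver the instruction, mirroring what the test server in the frontend crate does:

```rust
use api::v1::meta::MailboxMessage;
use api::v1::meta::mailbox_message::Payload;
use common_meta::instruction::Instruction;

// `acc` is the `&mut HeartbeatAccumulator` that metasrv passes to its
// heartbeat handlers; `suspend_via_heartbeat` is a made-up helper name.
fn suspend_via_heartbeat(acc: &mut HeartbeatAccumulator) {
    acc.set_mailbox_message(MailboxMessage {
        payload: Some(Payload::Json(
            serde_json::to_string(&Instruction::Suspend).unwrap(),
        )),
        ..Default::default()
    });
}
```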
@@ -390,7 +394,9 @@ impl HeartbeatMailbox {
     /// Parses the [Instruction] from [MailboxMessage].
     #[cfg(test)]
-    pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
+    pub(crate) fn json_instruction(
+        msg: &MailboxMessage,
+    ) -> Result<common_meta::instruction::Instruction> {
         let Payload::Json(payload) =
             msg.payload
                 .as_ref()


@@ -651,6 +651,12 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Service suspended"))]
+    Suspended {
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -777,6 +783,8 @@ impl ErrorExt for Error {
             HandleOtelArrowRequest { .. } => StatusCode::Internal,

             Cancelled { .. } => StatusCode::Cancelled,
+
+            Suspended { .. } => StatusCode::Suspended,
         }
     }
@@ -857,7 +865,8 @@ pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode {
         | StatusCode::TableUnavailable
         | StatusCode::RegionBusy
         | StatusCode::StorageUnavailable
-        | StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE,
+        | StatusCode::External
+        | StatusCode::Suspended => HttpStatusCode::SERVICE_UNAVAILABLE,

         StatusCode::Internal
         | StatusCode::Unexpected

@@ -12,21 +12,28 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::convert::Infallible;
+
 use api::v1::frontend::frontend_server::FrontendServer;
 use api::v1::greptime_database_server::GreptimeDatabaseServer;
 use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
 use api::v1::region::region_server::RegionServer;
 use arrow_flight::flight_service_server::FlightServiceServer;
 use auth::UserProviderRef;
+use axum::extract::Request;
+use axum::response::IntoResponse;
+use axum::routing::Route;
 use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
 use common_runtime::Runtime;
 use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
 use snafu::ResultExt;
 use tokio::sync::Mutex;
 use tonic::codec::CompressionEncoding;
+use tonic::codegen::Service;
 use tonic::service::RoutesBuilder;
 use tonic::service::interceptor::InterceptedService;
 use tonic::transport::{Identity, ServerTlsConfig};
+use tower::Layer;

 use crate::grpc::database::DatabaseService;
 use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
@@ -206,6 +213,23 @@ impl GrpcServerBuilder {
         Ok(self)
     }
 
+    pub fn add_layer<L>(self, layer: L) -> Self
+    where
+        L: Layer<Route> + Clone + Send + Sync + 'static,
+        L::Service: Service<Request> + Clone + Send + Sync + 'static,
+        <L::Service as Service<Request>>::Response: IntoResponse + 'static,
+        <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
+        <L::Service as Service<Request>>::Future: Send + 'static,
+    {
+        let routes = self.routes_builder.routes();
+        let router = routes.into_axum_router();
+        let router = router.layer(layer);
+        Self {
+            routes_builder: RoutesBuilder::from(router),
+            ..self
+        }
+    }
+
     pub fn build(self) -> GrpcServer {
         GrpcServer {
             routes: Mutex::new(Some(self.routes_builder.routes())),
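add_layer flattens the accumulated tonic routes into an axum Router, applies the layer, and rebuilds the RoutesBuilder, so one middleware can front every gRPC service. A usage sketch under stated assumptions (is_suspended() is a stand-in for the real suspend-state check; the middleware is illustrative, not part of this commit):

use axum::extract::Request;
use axum::http::StatusCode;
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};

// Illustrative gate rejecting all traffic while the node is suspended.
async fn suspend_gate(req: Request, next: Next) -> Response {
    if is_suspended() {
        return StatusCode::SERVICE_UNAVAILABLE.into_response();
    }
    next.run(req).await
}

// let grpc_builder = grpc_builder.add_layer(middleware::from_fn(suspend_gate));

The Into<Infallible> bound on the service error matches axum's middleware model, where services are infallible and encode failures as responses.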

View File

@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::convert::Infallible;
 use std::fmt::Display;
 use std::net::SocketAddr;
 use std::sync::Mutex as StdMutex;
@@ -20,9 +21,10 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use auth::UserProviderRef;
-use axum::extract::DefaultBodyLimit;
+use axum::extract::{DefaultBodyLimit, Request};
 use axum::http::StatusCode as HttpStatusCode;
 use axum::response::{IntoResponse, Response};
+use axum::routing::Route;
 use axum::serve::ListenerExt;
 use axum::{Router, middleware, routing};
 use common_base::Plugins;
@@ -42,7 +44,8 @@ use serde_json::Value;
 use snafu::{ResultExt, ensure};
 use tokio::sync::Mutex;
 use tokio::sync::oneshot::{self, Sender};
-use tower::ServiceBuilder;
+use tonic::codegen::Service;
+use tower::{Layer, ServiceBuilder};
 use tower_http::compression::CompressionLayer;
 use tower_http::cors::{AllowOrigin, Any, CorsLayer};
 use tower_http::decompression::RequestDecompressionLayer;
@@ -732,6 +735,20 @@ impl HttpServerBuilder {
         }
     }
 
+    pub fn add_layer<L>(self, layer: L) -> Self
+    where
+        L: Layer<Route> + Clone + Send + Sync + 'static,
+        L::Service: Service<Request> + Clone + Send + Sync + 'static,
+        <L::Service as Service<Request>>::Response: IntoResponse + 'static,
+        <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
+        <L::Service as Service<Request>>::Future: Send + 'static,
+    {
+        Self {
+            router: self.router.layer(layer),
+            ..self
+        }
+    }
+
     pub fn build(self) -> HttpServer {
         let memory_limiter =
             RequestMemoryLimiter::new(self.options.max_total_body_memory.as_bytes() as usize);
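The HTTP variant is simpler because the builder already holds an axum Router, so the layer is applied directly. The same illustrative middleware from the gRPC sketch would slot in here:

// let http_builder = http_builder.add_layer(middleware::from_fn(suspend_gate));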

View File

@@ -508,5 +508,6 @@ fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
         StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
         StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
         StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
+        StatusCode::Suspended => ErrorKind::ER_SERVER_SHUTDOWN,
     }
 }
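MySQL has no dedicated "suspended" error, so ER_SERVER_SHUTDOWN is the nearest wire-level signal that clients already treat as a reconnect-and-retry condition. A quick check of the mapping above (matches! avoids assuming ErrorKind implements PartialEq):

assert!(matches!(
    mysql_error_kind(&StatusCode::Suspended),
    ErrorKind::ER_SERVER_SHUTDOWN
));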

View File

@@ -295,6 +295,10 @@ pub enum PgErrorCode {
     /// operator_intervention
     #[snafu(display("operator_intervention"))]
     Ec57000 = 3600,
+
+    /// cannot_connect_now
+    #[snafu(display("cannot_connect_now"))]
+    Ec57P03 = 3601,
     // === End of Class 57 — Operator Intervention =====
 
     // === Begin of Class 58 — System Error (errors external to PostgreSQL itself) ===
@@ -374,6 +378,7 @@ impl From<StatusCode> for PgErrorCode {
             StatusCode::Unsupported => PgErrorCode::Ec0A000,
             StatusCode::InvalidArguments => PgErrorCode::Ec22023,
             StatusCode::Cancelled => PgErrorCode::Ec57000,
+            StatusCode::Suspended => PgErrorCode::Ec57P03,
             StatusCode::DeadlineExceeded => PgErrorCode::Ec57000,
             StatusCode::External => PgErrorCode::Ec58000,
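Postgres reserves SQLSTATE 57P03 (cannot_connect_now) for a server that is running but not accepting sessions, which fits a suspended frontend; drivers generally surface it as a retryable connection error. A corresponding check of the new conversion (again via matches!):

assert!(matches!(
    PgErrorCode::from(StatusCode::Suspended),
    PgErrorCode::Ec57P03
));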

View File

@@ -24,6 +24,6 @@ common-telemetry.workspace = true
 common-time.workspace = true
 datafusion-common.workspace = true
 derive_builder.workspace = true
-derive_more = { version = "1", default-features = false, features = ["debug"] }
+derive_more.workspace = true
 snafu.workspace = true
 sql.workspace = true

View File

@@ -30,13 +30,11 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder,
 use catalog::process_manager::ProcessManager;
 use client::Client;
 use client::client_manager::NodeClients;
+use cmd::frontend::create_heartbeat_task;
 use common_base::Plugins;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_meta::DatanodeId;
 use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
-use common_meta::heartbeat::handler::HandlerGroupExecutor;
-use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
-use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::kv_backend::chroot::ChrootKvBackend;
 use common_meta::kv_backend::etcd::EtcdStore;
@@ -44,14 +42,12 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::peer::Peer;
 use common_runtime::Builder as RuntimeBuilder;
 use common_runtime::runtime::BuilderBuild;
-use common_stat::ResourceStatImpl;
 use common_test_util::temp_dir::create_temp_dir;
 use common_time::util::DefaultSystemTimer;
 use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
 use datanode::config::DatanodeOptions;
 use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
 use frontend::frontend::{Frontend, FrontendOptions};
-use frontend::heartbeat::HeartbeatTask;
 use frontend::instance::Instance as FeInstance;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::server::Services;
@@ -68,7 +64,6 @@ use rand::Rng;
 use servers::grpc::GrpcOptions;
 use servers::grpc::flight::FlightCraftWrapper;
 use servers::grpc::region_server::RegionServerRequestHandler;
-use servers::heartbeat_options::HeartbeatOptions;
 use servers::server::ServerHandlers;
 use tempfile::TempDir;
 use tonic::codec::CompressionEncoding;
@@ -427,31 +422,15 @@ impl GreptimeDbClusterBuilder {
         )
         .build();
 
-        let handlers_executor = HandlerGroupExecutor::new(vec![
-            Arc::new(ParseMailboxMessageHandler),
-            Arc::new(InvalidateCacheHandler::new(cache_registry.clone())),
-        ]);
-
         let fe_opts = self.build_frontend_options();
-        let mut resource_stat = ResourceStatImpl::default();
-        resource_stat.start_collect_cpu_usage();
-        let heartbeat_task = HeartbeatTask::new(
-            &fe_opts,
-            meta_client.clone(),
-            HeartbeatOptions::default(),
-            Arc::new(handlers_executor),
-            Arc::new(resource_stat),
-        );
 
         let instance = FrontendBuilder::new(
             fe_opts.clone(),
             cached_meta_backend.clone(),
             cache_registry.clone(),
             catalog_manager,
             datanode_clients,
-            meta_client,
+            meta_client.clone(),
            Arc::new(ProcessManager::new(fe_opts.grpc.server_addr.clone(), None)),
         )
         .with_local_cache_invalidator(cache_registry)
@@ -459,6 +438,8 @@ impl GreptimeDbClusterBuilder {
         .await
         .unwrap();
 
+        let heartbeat_task = create_heartbeat_task(&fe_opts, meta_client, &instance);
+
         let instance = Arc::new(instance);
 
         // Build the servers for the frontend.