feat: suspend frontend and datanode

Signed-off-by: luofucong <luofc@foxmail.com>
luofucong
2025-12-04 19:50:34 +08:00
parent 84e4e42ee7
commit 339affea69
37 changed files with 650 additions and 162 deletions

Cargo.lock (generated, 22 lines changed)
View File

@@ -2845,6 +2845,15 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "convert_case"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@@ -4184,21 +4193,23 @@ dependencies = [
[[package]]
name = "derive_more"
version = "1.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05"
checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "1.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22"
checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b"
dependencies = [
"convert_case 0.10.0",
"proc-macro2",
"quote",
"rustc_version",
"syn 2.0.106",
"unicode-xid",
]
@@ -4949,9 +4960,11 @@ dependencies = [
"hostname 0.4.1",
"humantime",
"humantime-serde",
"hyper-util",
"lazy_static",
"log-query",
"meta-client",
"meta-srv",
"num_cpus",
"opentelemetry-proto",
"operator",
@@ -4963,6 +4976,7 @@ dependencies = [
"prost 0.13.5",
"query",
"rand 0.9.1",
"reqwest",
"serde",
"serde_json",
"servers",

View File

@@ -139,6 +139,7 @@ datafusion-substrait = "50"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15"
either = "1.15"
etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62df834f0cffda355eba96691fe1a9a332b75a7", features = [

View File

@@ -35,6 +35,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
@@ -45,7 +46,7 @@ use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
use plugins::frontend::context::{
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
};
@@ -440,30 +441,13 @@ impl StartCommand {
};
let catalog_manager = builder.build();
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);
let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();
let heartbeat_task = HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
Arc::new(resource_stat),
);
let heartbeat_task = Some(heartbeat_task);
let instance = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
catalog_manager,
client,
meta_client,
meta_client.clone(),
process_manager,
)
.with_plugin(plugins.clone())
@@ -471,6 +455,9 @@ impl StartCommand {
.try_build()
.await
.context(error::StartFrontendSnafu)?;
let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
let instance = Arc::new(instance);
let servers = Services::new(opts, instance.clone(), plugins)
@@ -487,6 +474,28 @@ impl StartCommand {
}
}
pub fn create_heartbeat_task(
options: &frontend::frontend::FrontendOptions,
meta_client: MetaClientRef,
instance: &frontend::instance::Instance,
) -> HeartbeatTask {
let executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(SuspendHandler::new(instance.suspend())),
Arc::new(InvalidateCacheHandler::new(
instance.cache_invalidator().clone(),
)),
]));
let stat = {
let mut stat = ResourceStatImpl::default();
stat.start_collect_cpu_usage();
Arc::new(stat)
};
HeartbeatTask::new(options, meta_client, executor, stat)
}
#[cfg(test)]
mod tests {
use std::io::Write;

View File

@@ -21,6 +21,8 @@ pub mod status_code;
use http::{HeaderMap, HeaderValue};
pub use snafu;
use crate::status_code::StatusCode;
// HACK - these headers live here so that they can be shared by gRPC services. For common
// HTTP headers, please define them in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
@@ -46,6 +48,29 @@ pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
header
}
/// Extract [StatusCode] and error message from [HeaderMap], if any.
///
/// Note that if the status code is invalid, for example a random number that is not
/// pre-defined as a [StatusCode], the result is `None`.
pub fn from_header_to_err_code_msg(headers: &HeaderMap) -> Option<(StatusCode, &str)> {
let code = headers
.get(GREPTIME_DB_HEADER_ERROR_CODE)
.and_then(|value| {
value
.to_str()
.ok()
.and_then(|x| x.parse::<u32>().ok())
.and_then(StatusCode::from_u32)
});
let msg = headers
.get(GREPTIME_DB_HEADER_ERROR_MSG)
.and_then(|x| x.to_str().ok());
match (code, msg) {
(Some(code), Some(msg)) => Some((code, msg)),
_ => None,
}
}
/// Returns the external root cause of the source error (exclude the current error).
pub fn root_source(err: &dyn std::error::Error) -> Option<&dyn std::error::Error> {
// There is some divergence about the behavior of the `sources()` API
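A minimal round-trip sketch for the two header helpers above, assuming the `common_error` module paths shown in this diff and that `StatusCode` derives `PartialEq`:

use common_error::status_code::StatusCode;
use common_error::{from_err_code_msg_to_header, from_header_to_err_code_msg};

fn main() {
    let headers = from_err_code_msg_to_header(StatusCode::Suspend as u32, "Service suspended");
    // An unknown numeric code (or a missing header) would yield `None` instead.
    assert_eq!(
        from_header_to_err_code_msg(&headers),
        Some((StatusCode::Suspend, "Service suspended"))
    );
}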

View File

@@ -42,6 +42,8 @@ pub enum StatusCode {
External = 1007,
/// The request is deadline exceeded (typically server-side).
DeadlineExceeded = 1008,
/// Service is suspended for some reason; for example, resource usage exceeds a limit.
Suspend = 1009,
// ====== End of common status code ================
// ====== Begin of SQL related status code =========
@@ -175,7 +177,8 @@ impl StatusCode {
| StatusCode::AccessDenied
| StatusCode::PermissionDenied
| StatusCode::RequestOutdated
| StatusCode::External => false,
| StatusCode::External
| StatusCode::Suspend => false,
}
}
@@ -223,7 +226,8 @@ impl StatusCode {
| StatusCode::InvalidAuthHeader
| StatusCode::AccessDenied
| StatusCode::PermissionDenied
| StatusCode::RequestOutdated => false,
| StatusCode::RequestOutdated
| StatusCode::Suspend => false,
}
}
@@ -347,7 +351,8 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
| StatusCode::RegionNotReady => Code::Unavailable,
StatusCode::RuntimeResourcesExhausted
| StatusCode::RateLimited
| StatusCode::RegionBusy => Code::ResourceExhausted,
| StatusCode::RegionBusy
| StatusCode::Suspend => Code::ResourceExhausted,
StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::AuthHeaderNotFound
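A small sketch of how the new code surfaces over gRPC, per the `status_to_tonic_code` arm above (assuming the function is importable from this `status_code` module):

use common_error::status_code::{StatusCode, status_to_tonic_code};
use tonic::Code;

fn main() {
    // Suspend is grouped with RateLimited and RegionBusy: a gRPC client sees
    // ResourceExhausted, while the HTTP layer (see the servers crate below)
    // maps it to 503 SERVICE_UNAVAILABLE.
    assert_eq!(status_to_tonic_code(StatusCode::Suspend), Code::ResourceExhausted);
}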

View File

@@ -39,7 +39,7 @@ datafusion-functions-aggregate-common.workspace = true
datafusion-pg-catalog.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
derive_more.workspace = true
geo = { version = "0.29", optional = true }
geo-types = { version = "0.7", optional = true }
geohash = { version = "0.13", optional = true }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
@@ -60,7 +61,7 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-0-{role}-{node_id}`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub struct NodeInfoKey {
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
@@ -135,7 +136,7 @@ pub struct NodeInfo {
pub hostname: String,
}
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub enum Role {
Datanode,
Frontend,
@@ -241,6 +242,12 @@ impl From<&NodeInfoKey> for Vec<u8> {
}
}
impl Display for NodeInfoKey {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}-{}", self.role, self.node_id)
}
}
impl FromStr for NodeInfo {
type Err = Error;
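A quick sketch of the new `Display` output for `NodeInfoKey`, assuming the struct has exactly the two fields used by the impl (`role` and `node_id`):

let key = NodeInfoKey { role: Role::Frontend, node_id: 7 };
// `write!(f, "{:?}-{}", ...)` renders the Debug form of the role plus the id:
assert_eq!(key.to_string(), "Frontend-7");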

View File

@@ -272,13 +272,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to send message: {err_msg}"))]
SendMessage {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serde json"))]
SerdeJson {
#[snafu(source)]
@@ -1118,7 +1111,7 @@ impl ErrorExt for Error {
| DeserializeFlexbuffers { .. }
| ConvertTimeRanges { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,

View File

@@ -23,6 +23,7 @@ use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
pub mod invalidate_table_cache;
pub mod parse_mailbox_message;
pub mod suspend;
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use async_trait::async_trait;
use common_telemetry::{info, warn};
use crate::error::Result;
use crate::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use crate::instruction::Instruction;
/// A heartbeat response handler that handles the special "suspend" instruction.
/// It simply sets, or clears (if previously set), the inner suspend atomic state.
pub struct SuspendHandler {
suspend: Arc<AtomicBool>,
}
impl SuspendHandler {
pub fn new(suspend: Arc<AtomicBool>) -> Self {
Self { suspend }
}
}
#[async_trait]
impl HeartbeatResponseHandler for SuspendHandler {
fn is_acceptable(&self, _: &HeartbeatResponseHandlerContext) -> bool {
true
}
async fn handle(&self, context: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
if let Some((_, Instruction::Suspend)) = context.incoming_message.take() {
self.suspend.store(true, atomic::Ordering::Relaxed);
warn!("set suspend state");
} else if self.suspend.load(atomic::Ordering::Relaxed) {
self.suspend.store(false, atomic::Ordering::Relaxed);
info!("clear suspend state");
}
Ok(HandleControl::Continue)
}
}
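A wiring sketch for the new handler: the atomic flag is shared between the component that owns it and the handler, so a single heartbeat round trip flips visibility everywhere (this mirrors the executors built in the `cmd` and `datanode` crates in this commit):

use std::sync::Arc;
use std::sync::atomic::AtomicBool;

let suspend = Arc::new(AtomicBool::new(false));
// The flag stays with the instance / region server, which checks it on every
// request; the handler goes into the HandlerGroupExecutor chain.
let handler = SuspendHandler::new(suspend.clone());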

View File

@@ -15,8 +15,8 @@
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::error::SendError;
use crate::error::{self, Result};
use crate::instruction::{Instruction, InstructionReply};
pub type IncomingMessage = (MessageMeta, Instruction);
@@ -51,13 +51,8 @@ impl HeartbeatMailbox {
Self { sender }
}
pub async fn send(&self, message: OutgoingMessage) -> Result<()> {
self.sender.send(message).await.map_err(|e| {
error::SendMessageSnafu {
err_msg: e.to_string(),
}
.build()
})
pub async fn send(&self, message: OutgoingMessage) -> Result<(), SendError<OutgoingMessage>> {
self.sender.send(message).await
}
}
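Returning tokio's `SendError` instead of the removed stringly `SendMessage` variant hands the unsent message back to the caller. A caller-side sketch, mirroring the datanode region handler later in this commit:

if let Err(e) = mailbox.send((meta, reply)).await {
    let error = e.to_string();
    // tokio's SendError is a tuple struct wrapping the value that failed to
    // send, so the concrete reply can be logged instead of a pre-rendered string.
    let (meta, reply) = e.0;
    error!("Failed to send reply {reply} to {meta:?}: {error}");
}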

View File

@@ -539,6 +539,8 @@ pub enum Instruction {
GetFileRefs(GetFileRefs),
/// Triggers garbage collection for a region.
GcRegions(GcRegions),
/// Temporarily suspend serving reads or writes.
Suspend,
}
impl Instruction {
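An encoding sketch for the new unit variant: it travels as a JSON mailbox payload, exactly as the test heartbeat server later in this commit builds it (imports from `api::v1::meta` as in that test):

use api::v1::meta::MailboxMessage;
use api::v1::meta::mailbox_message::Payload;

let message = MailboxMessage {
    payload: Some(Payload::Json(
        serde_json::to_string(&Instruction::Suspend).unwrap(),
    )),
    ..Default::default()
};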

View File

@@ -22,6 +22,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
@@ -281,21 +282,11 @@ impl DatanodeBuilder {
open_all_regions.await?;
}
let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();
let heartbeat_task = if let Some(meta_client) = meta_client {
Some(
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cache_registry,
self.plugins.clone(),
Arc::new(resource_stat),
)
.await?,
)
let task = self
.create_heartbeat_task(&region_server, meta_client, cache_registry)
.await?;
Some(task)
} else {
None
};
@@ -324,6 +315,29 @@ impl DatanodeBuilder {
})
}
async fn create_heartbeat_task(
&self,
region_server: &RegionServer,
meta_client: MetaClientRef,
cache_invalidator: CacheInvalidatorRef,
) -> Result<HeartbeatTask> {
let stat = {
let mut stat = ResourceStatImpl::default();
stat.start_collect_cpu_usage();
Arc::new(stat)
};
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cache_invalidator,
self.plugins.clone(),
stat,
)
.await
}
/// Builds [ObjectStoreManager] from [StorageConfig].
pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;

View File

@@ -25,6 +25,7 @@ use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
@@ -89,8 +90,9 @@ impl HeartbeatTask {
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(SuspendHandler::new(region_server.suspend())),
region_alive_keeper.clone(),
Arc::new(
RegionHeartbeatResponseHandler::new(region_server.clone())
.with_open_region_parallelism(opts.init_regions_parallelism),

View File

@@ -99,26 +99,30 @@ impl RegionHeartbeatResponseHandler {
self
}
fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<InstructionHandlers>> {
fn build_handler(
&self,
instruction: &Instruction,
) -> MetaResult<Option<Box<InstructionHandlers>>> {
match instruction {
Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())),
Instruction::OpenRegions(_) => Ok(Box::new(
Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
Instruction::OpenRegions(_) => Ok(Some(Box::new(
OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
}
.into(),
)),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
Instruction::UpgradeRegions(_) => Ok(Box::new(
))),
Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
UpgradeRegionsHandler {
upgrade_region_parallelism: self.open_region_parallelism,
}
.into(),
)),
Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
))),
Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::Suspend => Ok(None),
}
}
}
@@ -216,30 +220,24 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
.context(InvalidHeartbeatResponseSnafu)?;
let mailbox = ctx.mailbox.clone();
let region_server = self.region_server.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let gc_tasks = self.gc_tasks.clone();
let handler = self.build_handler(&instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler
.handle(
&HandlerContext {
region_server,
downgrade_tasks,
flush_tasks,
gc_tasks,
},
instruction,
)
.await;
if let Some(reply) = reply
&& let Err(e) = mailbox.send((meta, reply)).await
{
error!(e; "Failed to send reply to mailbox");
}
});
if let Some(handler) = self.build_handler(&instruction)? {
let context = HandlerContext {
region_server: self.region_server.clone(),
downgrade_tasks: self.downgrade_tasks.clone(),
flush_tasks: self.flush_tasks.clone(),
gc_tasks: self.gc_tasks.clone(),
};
let _handle = common_runtime::spawn_global(async move {
let reply = handler.handle(&context, instruction).await;
if let Some(reply) = reply
&& let Err(e) = mailbox.send((meta, reply)).await
{
let error = e.to_string();
let (meta, reply) = e.0;
error!("Failed to send reply {reply} to {meta:?}: {error}");
}
});
}
Ok(HandleControl::Continue)
}
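Why `Ok(None)` is enough for `Suspend`: in the datanode executor built earlier in this commit, `SuspendHandler` sits before this handler and consumes the instruction, so the `None` arm is a defensive no-op rather than a dropped message. A sketch of that ordering:

let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
    Arc::new(ParseMailboxMessageHandler),
    // Consumes Instruction::Suspend before the region handler can see it.
    Arc::new(SuspendHandler::new(region_server.suspend())),
    region_alive_keeper.clone(),
    // RegionHeartbeatResponseHandler::new(...) comes after.
]));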

View File

@@ -17,6 +17,7 @@ mod catalog;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
@@ -52,7 +53,9 @@ pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use serde_json;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::error::{
self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
};
use servers::grpc::FlightCompression;
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
@@ -89,6 +92,7 @@ use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInj
pub struct RegionServer {
inner: Arc<RegionServerInner>,
flight_compression: FlightCompression,
suspend: Arc<AtomicBool>,
}
pub struct RegionStat {
@@ -136,6 +140,7 @@ impl RegionServer {
),
)),
flight_compression,
suspend: Arc::new(AtomicBool::new(false)),
}
}
@@ -595,11 +600,21 @@ impl RegionServer {
.handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
.await
}
fn is_suspended(&self) -> bool {
self.suspend.load(Ordering::Relaxed)
}
pub(crate) fn suspend(&self) -> Arc<AtomicBool> {
self.suspend.clone()
}
}
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
ensure!(!self.is_suspended(), SuspendedSnafu);
let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
.with_label_values(&[request.as_ref()]);
let response = match &request {
@@ -644,6 +659,8 @@ impl FlightCraft for RegionServer {
&self,
request: Request<Ticket>,
) -> TonicResult<Response<TonicStream<FlightData>>> {
ensure!(!self.is_suspended(), SuspendedSnafu);
let ticket = request.into_inner().ticket;
let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
.context(servers_error::InvalidFlightTicketSnafu)?;
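The gate used at both entry points above is snafu's `ensure!`, which roughly desugars to an early return: a suspended server rejects work before touching metrics or engines, and the flag clears automatically once heartbeats stop carrying the instruction.

// Roughly what `ensure!(!self.is_suspended(), SuspendedSnafu)` expands to:
if self.is_suspended() {
    return SuspendedSnafu {}.fail();
}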

View File

@@ -85,6 +85,9 @@ common-test-util.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
meta-srv.workspace = true
reqwest.workspace = true
serde_json.workspace = true
strfmt = "0.2"
tower.workspace = true

View File

@@ -364,6 +364,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Service suspended"))]
Suspended {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -444,6 +450,8 @@ impl ErrorExt for Error {
Error::StatementTimeout { .. } => StatusCode::Cancelled,
Error::AcquireLimiter { .. } => StatusCode::Internal,
Error::Suspended { .. } => StatusCode::Suspend,
}
}

View File

@@ -141,7 +141,39 @@ impl Frontend {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{
AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
Peer, ResponseHeader, Role, heartbeat_server,
};
use async_trait::async_trait;
use common_error::from_header_to_err_code_msg;
use common_error::status_code::StatusCode;
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler;
use common_meta::instruction::Instruction;
use common_stat::ResourceStatImpl;
use meta_client::MetaClientRef;
use meta_client::client::MetaClientBuilder;
use meta_srv::service::GrpcStream;
use servers::http::HTTP_SERVER;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use tokio::sync::mpsc;
use tonic::codec::CompressionEncoding;
use tonic::codegen::tokio_stream::StreamExt;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use super::*;
use crate::instance::builder::FrontendBuilder;
use crate::server::Services;
#[test]
fn test_toml() {
@@ -149,4 +181,232 @@ mod tests {
let toml_string = toml::to_string(&opts).unwrap();
let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
}
struct SuspendableHeartbeatServer {
suspend: Arc<AtomicBool>,
}
#[async_trait]
impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
type HeartbeatStream = GrpcStream<HeartbeatResponse>;
async fn heartbeat(
&self,
request: Request<Streaming<HeartbeatRequest>>,
) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
let (tx, rx) = mpsc::channel(4);
common_runtime::spawn_global({
let mut requests = request.into_inner();
let suspend = self.suspend.clone();
async move {
while let Some(request) = requests.next().await {
if let Err(e) = request {
let _ = tx.send(Err(e)).await;
return;
}
let mailbox_message =
suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
payload: Some(Payload::Json(
serde_json::to_string(&Instruction::Suspend).unwrap(),
)),
..Default::default()
});
let response = HeartbeatResponse {
header: Some(ResponseHeader::success()),
mailbox_message,
..Default::default()
};
let _ = tx.send(Ok(response)).await;
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
async fn ask_leader(
&self,
_: Request<AskLeaderRequest>,
) -> std::result::Result<Response<AskLeaderResponse>, Status> {
Ok(Response::new(AskLeaderResponse {
header: Some(ResponseHeader::success()),
leader: Some(Peer {
addr: "localhost:0".to_string(),
..Default::default()
}),
}))
}
}
async fn create_meta_client(
options: &MetaClientOptions,
heartbeat_server: Arc<SuspendableHeartbeatServer>,
) -> MetaClientRef {
let (client, server) = tokio::io::duplex(1024);
// create the heartbeat server:
common_runtime::spawn_global(async move {
let mut router = tonic::transport::Server::builder();
let router = router.add_service(
HeartbeatServer::from_arc(heartbeat_server)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd),
);
router
.serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
.await
});
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
let connector = tower::service_fn(move |_| {
let client = client.take();
async move {
if let Some(client) = client {
Ok(hyper_util::rt::TokioIo::new(client))
} else {
Err(std::io::Error::other("client already taken"))
}
}
});
let manager = ChannelManager::new();
manager
.reset_with_connector("localhost:0", connector)
.unwrap();
// create the heartbeat client:
let mut client = MetaClientBuilder::new(0, Role::Frontend)
.enable_heartbeat()
.heartbeat_channel_manager(manager)
.build();
client.start(&options.metasrv_addrs).await.unwrap();
Arc::new(client)
}
async fn create_frontend(
options: &FrontendOptions,
meta_client: MetaClientRef,
) -> Result<Frontend> {
let instance = Arc::new(
FrontendBuilder::new_test(options, meta_client.clone())
.try_build()
.await?,
);
let servers =
Services::new(options.clone(), instance.clone(), Default::default()).build()?;
let executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(SuspendHandler::new(instance.suspend())),
]));
let heartbeat_task = Some(HeartbeatTask::new(
options,
meta_client,
executor,
Arc::new(ResourceStatImpl::default()),
));
let mut frontend = Frontend {
instance,
servers,
heartbeat_task,
};
frontend.start().await?;
Ok(frontend)
}
async fn verify_suspend_state(
frontend: &Frontend,
expected: std::result::Result<&str, (StatusCode, &str)>,
) {
let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
.await
.unwrap();
let headers = response.headers();
let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
Err((code, error))
} else {
Ok(response.text().await.unwrap())
};
match (response, expected) {
(Ok(response), Ok(expected)) => {
let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
let response = serde_json::to_string(response.output()).unwrap();
assert_eq!(&response, expected);
}
(Err(actual), Err(expected)) => assert_eq!(actual, expected),
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_suspend_frontend() -> Result<()> {
common_telemetry::init_default_ut_logging();
let meta_client_options = MetaClientOptions {
metasrv_addrs: vec!["localhost:0".to_string()],
..Default::default()
};
let options = FrontendOptions {
http: HttpOptions {
addr: "127.0.0.1:0".to_string(),
..Default::default()
},
grpc: GrpcOptions {
bind_addr: "127.0.0.1:0".to_string(),
..Default::default()
},
mysql: MysqlOptions {
enable: false,
..Default::default()
},
postgres: PostgresOptions {
enable: false,
..Default::default()
},
meta_client: Some(meta_client_options.clone()),
..Default::default()
};
let server = Arc::new(SuspendableHeartbeatServer {
suspend: Arc::new(AtomicBool::new(false)),
});
let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
let frontend = create_frontend(&options, meta_client).await?;
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
// initial state: not suspended:
assert!(!frontend.instance.is_suspended());
verify_suspend_state(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
// make the heartbeat server return the "suspend" instruction,
server.suspend.store(true, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
// ... then the frontend is suspended:
assert!(frontend.instance.is_suspended());
verify_suspend_state(
&frontend,
Err((
StatusCode::Suspend,
"error: Service suspended, execution_time_ms: 0",
)),
)
.await;
// make the heartbeat server NOT return the "suspend" instruction,
server.suspend.store(false, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
// ... then frontend's suspend state is cleared:
assert!(!frontend.instance.is_suspended());
verify_suspend_state(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
Ok(())
}
}

View File

@@ -27,7 +27,6 @@ use common_stat::ResourceStatRef;
use common_telemetry::{debug, error, info, warn};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
@@ -54,7 +53,6 @@ impl HeartbeatTask {
pub fn new(
opts: &FrontendOptions,
meta_client: Arc<MetaClient>,
heartbeat_opts: HeartbeatOptions,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
resource_stat: ResourceStatRef,
) -> Self {
@@ -68,8 +66,8 @@ impl HeartbeatTask {
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
},
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
report_interval: opts.heartbeat.interval,
retry_interval: opts.heartbeat.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
resource_stat,
@@ -196,7 +194,8 @@ impl HeartbeatTask {
let report_interval = self.report_interval;
let start_time_ms = self.start_time_ms;
let self_peer = Some(Peer {
// The peer id doesn't make sense for frontend, so we just set it 0.
// The node id will actually be calculated from the frontend's address (by hashing
// the address string) in the metasrv, so it can be set to 0 here as a placeholder.
id: 0,
addr: self.peer_addr.clone(),
});

View File

@@ -26,7 +26,8 @@ mod region_query;
pub mod standalone;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, SystemTime};
use async_stream::stream;
@@ -69,7 +70,7 @@ use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, SuspendedSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
@@ -119,6 +120,7 @@ pub struct Instance {
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
slow_query_options: SlowQueryOptions,
suspend: Arc<AtomicBool>,
// cache for otlp metrics
// first layer key: db-string
@@ -171,6 +173,14 @@ impl Instance {
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
self.statement_executor.procedure_executor()
}
pub fn suspend(&self) -> Arc<AtomicBool> {
self.suspend.clone()
}
pub(crate) fn is_suspended(&self) -> bool {
self.suspend.load(atomic::Ordering::Relaxed)
}
}
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
@@ -513,6 +523,10 @@ impl SqlQueryHandler for Instance {
#[tracing::instrument(skip_all)]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
if self.is_suspended() {
return vec![error::SuspendedSnafu {}.fail()];
}
let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor_opt.as_ref();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
@@ -580,6 +594,8 @@ impl SqlQueryHandler for Instance {
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
ensure!(!self.is_suspended(), error::SuspendedSnafu);
if should_capture_statement(stmt.as_ref()) {
// It's safe to unwrap here because we've already checked the type.
let stmt = stmt.unwrap();
@@ -655,6 +671,8 @@ impl SqlQueryHandler for Instance {
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Option<DescribeResult>> {
ensure!(!self.is_suspended(), error::SuspendedSnafu);
if matches!(
stmt,
Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
@@ -710,6 +728,8 @@ impl PrometheusHandler for Instance {
query: &PromQuery,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
let interceptor = self
.plugins
.get::<PromQueryInterceptorRef<server_error::Error>>();
@@ -797,6 +817,8 @@ impl PrometheusHandler for Instance {
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> server_error::Result<Vec<String>> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.handle_query_metric_names(matchers, ctx)
.await
.map_err(BoxedError::new)
@@ -812,6 +834,8 @@ impl PrometheusHandler for Instance {
end: SystemTime,
ctx: &QueryContextRef,
) -> server_error::Result<Vec<String>> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
.await
.map_err(BoxedError::new)
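A note on the memory ordering used by these gates: the flag is advisory and publishes no other data, so `Ordering::Relaxed` is sufficient; at worst a request racing with a heartbeat sees the stale value once. A standalone sketch of the sharing pattern:

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let suspend = Arc::new(AtomicBool::new(false));
    let heartbeat_side = suspend.clone();
    heartbeat_side.store(true, Ordering::Relaxed); // SuspendHandler sets it
    assert!(suspend.load(Ordering::Relaxed));      // request path now rejects
}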

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
@@ -87,6 +88,33 @@ impl FrontendBuilder {
}
}
#[cfg(test)]
pub(crate) fn new_test(
options: &FrontendOptions,
meta_client: meta_client::MetaClientRef,
) -> Self {
let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new());
let layered_cache_registry = Arc::new(
common_meta::cache::LayeredCacheRegistryBuilder::default()
.add_cache_registry(cache::build_fundamental_cache_registry(kv_backend.clone()))
.build(),
);
Self::new(
options.clone(),
kv_backend,
layered_cache_registry,
catalog::memory::MemoryCatalogManager::with_default_setup(),
Arc::new(client::client_manager::NodeClients::default()),
meta_client,
Arc::new(catalog::process_manager::ProcessManager::new(
"".to_string(),
None,
)),
)
}
pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
Self {
local_cache_invalidator: Some(cache_invalidator),
@@ -242,6 +270,7 @@ impl FrontendBuilder {
process_manager,
otlp_metrics_table_legacy_cache: DashMap::new(),
slow_query_options: self.options.slow_query.clone(),
suspend: Arc::new(AtomicBool::new(false)),
})
}
}

View File

@@ -49,7 +49,7 @@ use table::table_name::TableName;
use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
SubstraitDecodeLogicalPlanSnafu, SuspendedSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{Instance, attach_timer};
use crate::metrics::{
@@ -61,6 +61,8 @@ impl GrpcQueryHandler for Instance {
type Error = Error;
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_execute(&request, ctx.clone())?;

View File

@@ -22,13 +22,14 @@ use common_error::ext::BoxedError;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use servers::error::{
AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu,
AuthSnafu, CatalogSnafu, Error, OtherSnafu, SuspendedSnafu, TimestampOverflowSnafu,
UnexpectedResultSnafu,
};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt, ensure};
use crate::instance::Instance;
@@ -39,6 +40,8 @@ impl InfluxdbLineProtocolHandler for Instance {
request: InfluxdbRequest,
ctx: QueryContextRef,
) -> servers::error::Result<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()

View File

@@ -38,7 +38,7 @@ use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_n
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, SuspendedSnafu,
TableNotFoundSnafu,
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
@@ -49,7 +49,7 @@ use servers::otlp::trace::{
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;
@@ -65,8 +65,7 @@ impl JaegerQueryHandler for Instance {
// It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
vec![],
vec![],
@@ -107,8 +106,7 @@ impl JaegerQueryHandler for Instance {
// ```.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
vec![
SelectExpr::from(col(SPAN_NAME_COLUMN)),
SelectExpr::from(col(SPAN_KIND_COLUMN)),
@@ -160,8 +158,7 @@ impl JaegerQueryHandler for Instance {
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
selects,
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
@@ -220,8 +217,7 @@ impl JaegerQueryHandler for Instance {
// ```.
let output = query_trace_table(
ctx.clone(),
self.catalog_manager(),
self.query_engine(),
self,
vec![wildcard()],
filters,
vec![],
@@ -285,8 +281,7 @@ impl JaegerQueryHandler for Instance {
// query all spans
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
vec![wildcard()],
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
@@ -303,8 +298,7 @@ impl JaegerQueryHandler for Instance {
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
ctx: QueryContextRef,
catalog_manager: &CatalogManagerRef,
query_engine: &QueryEngineRef,
instance: &Instance,
selects: Vec<SelectExpr>,
filters: Vec<Expr>,
sorts: Vec<SortExpr>,
@@ -312,6 +306,8 @@ async fn query_trace_table(
tags: Option<HashMap<String, JsonValue>>,
distincts: Vec<Expr>,
) -> ServerResult<Output> {
ensure!(!instance.is_suspended(), SuspendedSnafu);
let trace_table_name = ctx
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
.unwrap_or(TRACE_TABLE_NAME);
@@ -334,7 +330,8 @@ async fn query_trace_table(
}
};
let table = catalog_manager
let table = instance
.catalog_manager()
.table(
ctx.current_catalog(),
&ctx.current_schema(),
@@ -367,7 +364,7 @@ async fn query_trace_table(
.map(|s| format!("\"{}\"", s))
.collect::<HashSet<String>>();
let df_context = create_df_context(query_engine)?;
let df_context = create_df_context(instance.query_engine())?;
let dataframe = df_context
.read_table(Arc::new(DfTableProviderAdapter::new(table)))

View File

@@ -24,12 +24,12 @@ use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu,
Result as ServerResult,
Result as ServerResult, SuspendedSnafu,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use snafu::{ResultExt, ensure};
use table::Table;
use crate::instance::Instance;
@@ -37,6 +37,8 @@ use crate::instance::Instance;
#[async_trait]
impl PipelineHandler for Instance {
async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
@@ -71,6 +73,8 @@ impl PipelineHandler for Instance {
pipeline: &str,
query_ctx: QueryContextRef,
) -> ServerResult<PipelineInfo> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.pipeline_operator
.insert_pipeline(name, content_type, pipeline, query_ctx)
.await
@@ -83,6 +87,8 @@ impl PipelineHandler for Instance {
version: PipelineVersion,
ctx: QueryContextRef,
) -> ServerResult<Option<()>> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.pipeline_operator
.delete_pipeline(name, version, ctx)
.await

View File

@@ -19,11 +19,11 @@ use client::Output;
use common_error::ext::BoxedError;
use log_query::LogQuery;
use server_error::Result as ServerResult;
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu, SuspendedSnafu};
use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
use servers::query_handler::LogQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use snafu::{ResultExt, ensure};
use tonic::async_trait;
use crate::instance::Instance;
@@ -31,6 +31,8 @@ use crate::instance::Instance;
#[async_trait]
impl LogQueryHandler for Instance {
async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
let interceptor = self
.plugins
.get::<LogQueryInterceptorRef<server_error::Error>>();

View File

@@ -16,7 +16,9 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu};
use servers::error::{
self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu, SuspendedSnafu,
};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
@@ -33,6 +35,8 @@ impl OpentsdbProtocolHandler for Instance {
data_points: Vec<DataPoint>,
ctx: QueryContextRef,
) -> server_error::Result<usize> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()

View File

@@ -24,13 +24,13 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult, SuspendedSnafu};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{ResultExt, ensure};
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use crate::instance::Instance;
@@ -44,6 +44,8 @@ impl OpenTelemetryProtocolHandler for Instance {
request: ExportMetricsServiceRequest,
ctx: QueryContextRef,
) -> ServerResult<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
@@ -123,6 +125,8 @@ impl OpenTelemetryProtocolHandler for Instance {
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
@@ -170,6 +174,8 @@ impl OpenTelemetryProtocolHandler for Instance {
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Vec<Output>> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()

View File

@@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, Result as ServerResult, SuspendedSnafu};
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
@@ -39,7 +39,7 @@ use servers::query_handler::{
PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
@@ -165,6 +165,8 @@ impl PromStoreProtocolHandler for Instance {
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<Output> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
@@ -211,6 +213,8 @@ impl PromStoreProtocolHandler for Instance {
request: ReadRequest,
ctx: QueryContextRef,
) -> ServerResult<PromStoreResponse> {
ensure!(!self.is_suspended(), SuspendedSnafu);
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()

View File

@@ -32,7 +32,7 @@ use collect_leader_region_handler::CollectLeaderRegionHandler;
use collect_stats_handler::CollectStatsHandler;
use common_base::Plugins;
use common_meta::datanode::Stat;
use common_meta::instruction::{Instruction, InstructionReply};
use common_meta::instruction::InstructionReply;
use common_meta::sequence::Sequence;
use common_telemetry::{debug, info, warn};
use dashmap::DashMap;
@@ -114,16 +114,19 @@ pub enum HandleControl {
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub instructions: Vec<Instruction>,
mailbox_message: Option<MailboxMessage>,
pub stat: Option<Stat>,
pub inactive_region_ids: HashSet<RegionId>,
pub region_lease: Option<RegionLease>,
}
impl HeartbeatAccumulator {
pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
// TODO(jiachun): to HeartbeatResponse payload
None
pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
self.mailbox_message.take()
}
pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
let _ = self.mailbox_message.insert(message);
}
}
@@ -351,10 +354,11 @@ impl HeartbeatHandlerGroup {
}
}
let header = std::mem::take(&mut acc.header);
let mailbox_message = acc.take_mailbox_message();
let res = HeartbeatResponse {
header,
region_lease: acc.region_lease,
..Default::default()
mailbox_message,
};
Ok(res)
}
@@ -382,7 +386,9 @@ impl HeartbeatMailbox {
/// Parses the [Instruction] from [MailboxMessage].
#[cfg(test)]
pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
pub(crate) fn json_instruction(
msg: &MailboxMessage,
) -> Result<common_meta::instruction::Instruction> {
let Payload::Json(payload) =
msg.payload
.as_ref()
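The producing side is not shown in this diff, but a metasrv handler can now attach an instruction to the outgoing heartbeat via the accumulator, replacing the old always-`None` `into_mailbox_message()`. A hypothetical sketch, reusing the `Payload::Json` encoding from the tests above:

let mut acc = HeartbeatAccumulator::default();
acc.set_mailbox_message(MailboxMessage {
    payload: Some(Payload::Json(
        serde_json::to_string(&common_meta::instruction::Instruction::Suspend).unwrap(),
    )),
    ..Default::default()
});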

View File

@@ -651,6 +651,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Service suspended"))]
Suspended {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -777,6 +783,8 @@ impl ErrorExt for Error {
HandleOtelArrowRequest { .. } => StatusCode::Internal,
Cancelled { .. } => StatusCode::Cancelled,
Suspended { .. } => StatusCode::Suspend,
}
}
@@ -857,7 +865,8 @@ pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode {
| StatusCode::TableUnavailable
| StatusCode::RegionBusy
| StatusCode::StorageUnavailable
| StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE,
| StatusCode::External
| StatusCode::Suspend => HttpStatusCode::SERVICE_UNAVAILABLE,
StatusCode::Internal
| StatusCode::Unexpected

View File

@@ -508,5 +508,6 @@ fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
StatusCode::Suspend => ErrorKind::ER_SERVER_SHUTDOWN,
}
}

View File

@@ -295,6 +295,10 @@ pub enum PgErrorCode {
/// operator_intervention
#[snafu(display("operator_intervention"))]
Ec57000 = 3600,
/// cannot_connect_now
#[snafu(display("cannot_connect_now"))]
Ec57P03 = 3601,
// === End of Class 57 — Operator Intervention =====
// === Begin of Class 58 — System Error (errors external to PostgreSQL itself) ===
@@ -374,6 +378,7 @@ impl From<StatusCode> for PgErrorCode {
StatusCode::Unsupported => PgErrorCode::Ec0A000,
StatusCode::InvalidArguments => PgErrorCode::Ec22023,
StatusCode::Cancelled => PgErrorCode::Ec57000,
StatusCode::Suspend => PgErrorCode::Ec57P03,
StatusCode::DeadlineExceeded => PgErrorCode::Ec57000,
StatusCode::External => PgErrorCode::Ec58000,

View File

@@ -24,6 +24,6 @@ common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
derive_builder.workspace = true
derive_more = { version = "1", default-features = false, features = ["debug"] }
derive_more.workspace = true
snafu.workspace = true
sql.workspace = true

View File

@@ -30,13 +30,11 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder,
use catalog::process_manager::ProcessManager;
use client::Client;
use client::client_manager::NodeClients;
use cmd::frontend::create_heartbeat_task;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::DatanodeId;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
@@ -44,14 +42,12 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_runtime::Builder as RuntimeBuilder;
use common_runtime::runtime::BuilderBuild;
use common_stat::ResourceStatImpl;
use common_test_util::temp_dir::create_temp_dir;
use common_time::util::DefaultSystemTimer;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::Instance as FeInstance;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
@@ -68,7 +64,6 @@ use rand::Rng;
use servers::grpc::GrpcOptions;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::ServerHandlers;
use tempfile::TempDir;
use tonic::codec::CompressionEncoding;
@@ -427,31 +422,15 @@ impl GreptimeDbClusterBuilder {
)
.build();
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateCacheHandler::new(cache_registry.clone())),
]);
let fe_opts = self.build_frontend_options();
let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();
let heartbeat_task = HeartbeatTask::new(
&fe_opts,
meta_client.clone(),
HeartbeatOptions::default(),
Arc::new(handlers_executor),
Arc::new(resource_stat),
);
let instance = FrontendBuilder::new(
fe_opts.clone(),
cached_meta_backend.clone(),
cache_registry.clone(),
catalog_manager,
datanode_clients,
meta_client,
meta_client.clone(),
Arc::new(ProcessManager::new(fe_opts.grpc.server_addr.clone(), None)),
)
.with_local_cache_invalidator(cache_registry)
@@ -459,6 +438,8 @@ impl GreptimeDbClusterBuilder {
.await
.unwrap();
let heartbeat_task = create_heartbeat_task(&fe_opts, meta_client, &instance);
let instance = Arc::new(instance);
// Build the servers for the frontend.