feat: pull meta config in frontend during startup (#7727)

* chore: init impl for meta config

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: use ser string for config

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add metasrv config ser trait

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: use trait

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor fix

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change to vec options

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: rename mod name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add comment

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: rename and add comment

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: tmp save

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add log and error in server

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: introduce deserializer

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add blank line

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: impl config service

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fmt

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update proto commit

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-03-02 17:11:10 +08:00
committed by GitHub
parent f4b4d61651
commit 6d170a40b4
19 changed files with 341 additions and 6 deletions

7
Cargo.lock generated
View File

@@ -2655,6 +2655,7 @@ dependencies = [
"common-grpc",
"humantime-serde",
"serde",
"serde_json",
]
[[package]]
@@ -5724,7 +5725,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=19a5104851a86a27b92a5e00e69c60465c7e5d83#19a5104851a86a27b92a5e00e69c60465c7e5d83"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4010e25ea09bf0a3ef359b946376da67d0323c5d#4010e25ea09bf0a3ef359b946376da67d0323c5d"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",
@@ -7768,6 +7769,7 @@ dependencies = [
"common-grpc",
"common-macro",
"common-meta",
"common-options",
"common-telemetry",
"datatypes",
"futures",
@@ -7776,6 +7778,7 @@ dependencies = [
"meta-srv",
"rand 0.9.1",
"serde",
"serde_json",
"session",
"snafu 0.8.6",
"tokio",
@@ -9936,12 +9939,14 @@ dependencies = [
"common-base",
"common-error",
"common-meta",
"common-options",
"datanode",
"flow",
"frontend",
"meta-client",
"meta-srv",
"serde",
"serde_json",
"snafu 0.8.6",
"standalone",
]

View File

@@ -154,7 +154,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "19a5104851a86a27b92a5e00e69c60465c7e5d83" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4010e25ea09bf0a3ef359b946376da67d0323c5d" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -48,9 +48,12 @@ use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
use plugins::PluginOptions;
use plugins::frontend::context::{
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
};
use plugins::frontend::setup_frontend_dynamic_plugins;
use plugins::options::PluginOptionsDeserializerImpl;
use servers::addrs;
use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
@@ -370,6 +373,14 @@ impl StartCommand {
.await
.context(error::MetaClientInitSnafu)?;
let meta_config: Vec<PluginOptions> = meta_client
.pull_config(PluginOptionsDeserializerImpl)
.await
.context(error::MetaClientInitSnafu)?;
setup_frontend_dynamic_plugins(meta_config, &mut plugins)
.await
.context(error::StartFrontendSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))

View File

@@ -8,6 +8,7 @@ license.workspace = true
common-grpc.workspace = true
humantime-serde.workspace = true
serde.workspace = true
serde_json.workspace = true
[lints]
workspace = true

View File

@@ -14,3 +14,4 @@
pub mod datanode;
pub mod memory;
pub mod plugin_options;

View File

@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use serde::de::DeserializeOwned;
/// A trait for serializing Metasrv config to a JSON string.
/// So it can be used in the metasrv's crate instead of depending on the plugins' crate.
pub trait PluginOptionsSerializer: Send + Sync {
fn serialize(&self) -> Result<String, serde_json::Error>;
}
pub type PluginOptionsSerializerRef = Arc<dyn PluginOptionsSerializer>;
/// A trait for deserializing Metasrv config from a JSON string.
pub trait PluginOptionsDeserializer<T: DeserializeOwned>: Send + Sync {
fn deserialize(&self, payload: &str) -> Result<T, serde_json::Error>;
}

View File

@@ -15,12 +15,14 @@ common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-telemetry.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-stream = { workspace = true, features = ["net"] }

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod ask_leader;
mod config;
pub mod heartbeat;
mod load_balance;
mod procedure;
@@ -56,18 +57,21 @@ use common_meta::rpc::store::{
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_options::plugin_options::PluginOptionsDeserializer;
use common_telemetry::info;
use config::Client as ConfigClient;
use futures::TryStreamExt;
use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
use procedure::Client as ProcedureClient;
use serde::de::DeserializeOwned;
use snafu::{OptionExt, ResultExt};
use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
use crate::error::{
ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
Result,
ConvertMetaConfigSnafu, ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error,
GetFlowStatSnafu, NotStartedSnafu, Result,
};
pub type Id = u64;
@@ -205,6 +209,9 @@ impl MetaClientBuilder {
HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
});
let config = self
.enable_heartbeat
.then(|| ConfigClient::new(self.id, self.role, mgr.clone()));
let store = self
.enable_store
.then(|| StoreClient::new(self.id, self.role, mgr.clone()));
@@ -233,6 +240,7 @@ impl MetaClientBuilder {
heartbeat_channel_manager,
)),
heartbeat,
config,
store,
procedure,
cluster,
@@ -247,6 +255,7 @@ pub struct MetaClient {
channel_manager: ChannelManager,
leader_provider_factory: LeaderProviderFactoryRef,
heartbeat: Option<HeartbeatClient>,
config: Option<ConfigClient>,
store: Option<StoreClient>,
procedure: Option<ProcedureClient>,
cluster: Option<ClusterClient>,
@@ -265,6 +274,7 @@ impl MetaClient {
ChannelManager::default(),
)),
heartbeat: None,
config: None,
store: None,
procedure: None,
cluster: None,
@@ -561,6 +571,11 @@ impl MetaClient {
client.start_with(leader_provider.clone()).await?;
}
if let Some(client) = &self.config {
info!("Starting config client ...");
client.start_with(leader_provider.clone()).await?;
}
if let Some(client) = &mut self.store {
info!("Starting store client ...");
client.start(peers.clone()).await?;
@@ -584,6 +599,18 @@ impl MetaClient {
self.heartbeat_client()?.ask_leader().await
}
pub async fn pull_config<T, U>(&self, deserializer: T) -> Result<U>
where
T: PluginOptionsDeserializer<U>,
U: DeserializeOwned,
{
let res = self.config_client()?.pull_config().await?;
let v = deserializer
.deserialize(&res.payload)
.context(ConvertMetaConfigSnafu)?;
Ok(v)
}
/// Returns a heartbeat bidirectional streaming: (sender, receiver), the
/// other end is the leader of `metasrv`.
///
@@ -709,6 +736,12 @@ impl MetaClient {
})
}
pub fn config_client(&self) -> Result<ConfigClient> {
self.config.clone().context(NotStartedSnafu {
name: "config_client",
})
}
pub fn store_client(&self) -> Result<StoreClient> {
self.store.clone().context(NotStartedSnafu {
name: "store_client",

View File

@@ -0,0 +1,145 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::config_client::ConfigClient;
use api::v1::meta::{PullConfigRequest, PullConfigResponse, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::util;
use common_telemetry::tracing_context::TracingContext;
use snafu::{OptionExt, ResultExt, ensure};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use crate::client::{Id, LeaderProviderRef};
use crate::error;
use crate::error::{InvalidResponseHeaderSnafu, Result};
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
Self { inner }
}
pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
let mut inner = self.inner.write().await;
inner.start_with(leader_provider)
}
pub async fn pull_config(&self) -> Result<PullConfigResponse> {
let inner = self.inner.read().await;
inner.ask_leader().await?;
inner.pull_config().await
}
}
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
leader_provider: Option<LeaderProviderRef>,
}
impl Inner {
fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
Self {
id,
role,
channel_manager,
leader_provider: None,
}
}
fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Config client already started"
}
);
self.leader_provider = Some(leader_provider);
Ok(())
}
async fn ask_leader(&self) -> Result<String> {
let Some(leader_provider) = self.leader_provider.as_ref() else {
return error::IllegalGrpcClientStateSnafu {
err_msg: "not started",
}
.fail();
};
leader_provider.ask_leader().await
}
async fn pull_config(&self) -> Result<PullConfigResponse> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Config client not start"
}
);
let leader_addr = self
.leader_provider
.as_ref()
.unwrap()
.leader()
.context(error::NoLeaderSnafu)?;
let mut client = self.make_client(&leader_addr)?;
let header = RequestHeader::new(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let req = PullConfigRequest {
header: Some(header),
};
let res = client
.pull_config(req)
.await
.map_err(error::Error::from)?
.into_inner();
util::check_response_header(res.header.as_ref()).context(InvalidResponseHeaderSnafu)?;
Ok(res)
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<ConfigClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(ConfigClient::new(channel)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]
fn is_started(&self) -> bool {
self.leader_provider.is_some()
}
}

View File

@@ -116,6 +116,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert meta config"))]
ConvertMetaConfig {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: serde_json::Error,
},
}
#[allow(dead_code)]
@@ -136,7 +144,8 @@ impl ErrorExt for Error {
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. }
| Error::RetryTimesExceeded { .. }
| Error::ReadOnlyKvBackend { .. } => StatusCode::Internal,
| Error::ReadOnlyKvBackend { .. }
| Error::ConvertMetaConfig { .. } => StatusCode::Internal,
Error::MetaServer { code, .. } => *code,

View File

@@ -16,6 +16,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::config_server::ConfigServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
@@ -255,6 +256,7 @@ pub fn router(metasrv: Arc<Metasrv>) -> Router {
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ConfigServer::from_arc(metasrv.clone()));
router.add_service(admin::make_admin_service(metasrv))
}

View File

@@ -376,6 +376,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to serialize config"))]
SerializeConfig {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse number: {}", err_msg))]
ParseNum {
err_msg: String,
@@ -1143,6 +1151,7 @@ impl ErrorExt for Error {
| Error::ConnectEtcd { .. }
| Error::FileIo { .. }
| Error::TcpBind { .. }
| Error::SerializeConfig { .. }
| Error::SerializeToJson { .. }
| Error::DeserializeFromJson { .. }
| Error::NoLeader { .. }

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::config_server::ConfigServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
@@ -125,6 +126,7 @@ pub async fn mock(
let router =
add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
let router = add_compressed_service!(router, ConfigServer::from_arc(service.clone()));
router
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await

View File

@@ -19,6 +19,7 @@ use tonic::{Response, Status};
pub mod admin;
pub mod cluster;
mod config;
mod heartbeat;
pub mod mailbox;
pub mod procedure;

View File

@@ -0,0 +1,46 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{PullConfigRequest, PullConfigResponse, ResponseHeader, config_server};
use common_options::plugin_options::PluginOptionsSerializerRef;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use tonic::{Request, Response};
use crate::error::SerializeConfigSnafu;
use crate::metasrv::Metasrv;
use crate::service::GrpcResult;
#[async_trait::async_trait]
impl config_server::Config for Metasrv {
async fn pull_config(&self, req: Request<PullConfigRequest>) -> GrpcResult<PullConfigResponse> {
let payload = match self.plugins().get::<PluginOptionsSerializerRef>() {
Some(p) => p
.serialize()
.inspect_err(|e| warn!(e; "Failed to serialize plugin options"))
.context(SerializeConfigSnafu)?,
None => String::new(),
};
let res = PullConfigResponse {
header: Some(ResponseHeader::success()),
payload,
};
let member_id = req.into_inner().header.as_ref().map(|h| h.member_id);
info!("Sending meta config to member: {member_id:?}");
Ok(Response::new(res))
}
}

View File

@@ -15,11 +15,13 @@ cli.workspace = true
common-base.workspace = true
common-error.workspace = true
common-meta.workspace = true
common-options.workspace = true
datanode.workspace = true
flow.workspace = true
frontend.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
standalone.workspace = true

View File

@@ -37,6 +37,19 @@ pub async fn setup_frontend_plugins(
Ok(())
}
/// Setup dynamic plugins based on the meta config in frontend.
/// This is called after the `setup_frontend_plugins` because the meta client needs to be created first.
///
/// For those configs/plugins which are corresponding with the metasrv's config,
/// we pull from metasrv first, then create/override the current config/plugin.
/// Note: make sure the override works as expected.
pub async fn setup_frontend_dynamic_plugins(
_meta_config: Vec<PluginOptions>,
_plugins: &mut Plugins,
) -> Result<()> {
Ok(())
}
pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}

View File

@@ -17,7 +17,7 @@ pub mod datanode;
pub mod flownode;
pub mod frontend;
mod meta_srv;
mod options;
pub mod options;
pub mod standalone;
pub use cli::SubCommand;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_options::plugin_options::{PluginOptionsDeserializer, PluginOptionsSerializer};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -21,3 +22,26 @@ pub struct DummyOptions;
pub enum PluginOptions {
Dummy(DummyOptions),
}
pub struct PluginOptionsList(pub Vec<PluginOptions>);
impl PluginOptionsSerializer for PluginOptionsList {
fn serialize(&self) -> Result<String, serde_json::Error> {
if self.0.is_empty() {
Ok(String::new())
} else {
serde_json::to_string(&self.0)
}
}
}
pub struct PluginOptionsDeserializerImpl;
impl PluginOptionsDeserializer<Vec<PluginOptions>> for PluginOptionsDeserializerImpl {
fn deserialize(&self, payload: &str) -> Result<Vec<PluginOptions>, serde_json::Error> {
if payload.is_empty() {
Ok(vec![])
} else {
serde_json::from_str(payload)
}
}
}