From 1629435888c633992c06ca3079d39d0792775bd3 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Tue, 9 Apr 2024 11:03:26 +0800 Subject: [PATCH] chore: unify name metasrv (#3671) chore: unify name --- .github/ISSUE_TEMPLATE/bug_report.yml | 2 +- docs/rfcs/2023-07-06-table-engine-refactor.md | 8 +- docs/rfcs/2024-01-17-dataflow-framework.md | 2 +- src/catalog/src/error.rs | 4 +- src/cmd/src/metasrv.rs | 28 +++---- src/cmd/src/options.rs | 8 +- src/common/meta/src/wal_options_allocator.rs | 14 ++-- .../kafka/topic_manager.rs | 8 +- src/common/wal/src/config.rs | 24 +++--- src/common/wal/src/config/kafka.rs | 2 +- src/common/wal/src/config/kafka/metasrv.rs | 4 +- src/meta-srv/src/bootstrap.rs | 74 +++++++++---------- src/meta-srv/src/election.rs | 2 +- src/meta-srv/src/error.rs | 2 +- src/meta-srv/src/handler/failure_handler.rs | 4 +- .../src/handler/on_leader_start_handler.rs | 2 +- .../src/handler/region_lease_handler.rs | 6 +- src/meta-srv/src/metasrv.rs | 32 ++++---- src/meta-srv/src/metasrv/builder.rs | 22 +++--- src/meta-srv/src/mocks.rs | 18 ++--- src/meta-srv/src/service/admin.rs | 26 +++---- src/meta-srv/src/service/cluster.rs | 6 +- src/meta-srv/src/service/heartbeat.rs | 12 +-- src/meta-srv/src/service/lock.rs | 4 +- src/meta-srv/src/service/procedure.rs | 4 +- src/meta-srv/src/service/store.rs | 40 +++++----- src/plugins/src/lib.rs | 2 +- src/plugins/src/meta_srv.rs | 6 +- tests-integration/src/cluster.rs | 42 +++++------ tests-integration/src/standalone.rs | 8 +- tests-integration/src/tests.rs | 2 +- tests-integration/src/tests/test_util.rs | 8 +- tests-integration/tests/region_failover.rs | 22 +++--- tests-integration/tests/region_migration.rs | 38 +++++----- 34 files changed, 242 insertions(+), 244 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 68e9108d57..d88aa4b664 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -39,7 +39,7 @@ body: - Query Engine - Table Engine - Write Protocols - - MetaSrv + - Metasrv - Frontend - Datanode - Other diff --git a/docs/rfcs/2023-07-06-table-engine-refactor.md b/docs/rfcs/2023-07-06-table-engine-refactor.md index 2477a5e2e9..33df3eed08 100644 --- a/docs/rfcs/2023-07-06-table-engine-refactor.md +++ b/docs/rfcs/2023-07-06-table-engine-refactor.md @@ -27,8 +27,8 @@ subgraph Frontend["Frontend"] end end -MyTable --> MetaSrv -MetaSrv --> ETCD +MyTable --> Metasrv +Metasrv --> ETCD MyTable-->TableEngine0 MyTable-->TableEngine1 @@ -95,8 +95,8 @@ subgraph Frontend["Frontend"] end end -MyTable --> MetaSrv -MetaSrv --> ETCD +MyTable --> Metasrv +Metasrv --> ETCD MyTable-->RegionEngine MyTable-->RegionEngine1 diff --git a/docs/rfcs/2024-01-17-dataflow-framework.md b/docs/rfcs/2024-01-17-dataflow-framework.md index 3d62deba42..46da2175e1 100644 --- a/docs/rfcs/2024-01-17-dataflow-framework.md +++ b/docs/rfcs/2024-01-17-dataflow-framework.md @@ -36,7 +36,7 @@ Hence, we choose the third option, and use a simple logical plan that's anagonis ## Deploy mode and protocol - Greptime Flow is an independent streaming compute component. It can be used either within a standalone node or as a dedicated node at the same level as frontend in distributed mode. - It accepts insert request Rows, which is used between frontend and datanode. -- New flow job is submitted in the format of modified SQL query like snowflake do, like: `CREATE TASK avg_over_5m WINDOW_SIZE = "5m" AS SELECT avg(value) FROM table WHERE time > now() - 5m GROUP BY time(1m)`. Flow job then got stored in MetaSrv. +- New flow job is submitted in the format of modified SQL query like snowflake do, like: `CREATE TASK avg_over_5m WINDOW_SIZE = "5m" AS SELECT avg(value) FROM table WHERE time > now() - 5m GROUP BY time(1m)`. Flow job then got stored in Metasrv. - It also persists results in the format of Rows to frontend. - The query plan uses Substrait as codec format. It's the same with GreptimeDB's query engine. - Greptime Flow needs a WAL for recovering. It's possible to reuse datanode's. diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 3e8f204ed3..8391dab045 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -216,7 +216,7 @@ pub enum Error { }, #[snafu(display("Failed to perform metasrv operation"))] - MetaSrv { + Metasrv { location: Location, source: meta_client::error::Error, }, @@ -304,7 +304,7 @@ impl ErrorExt for Error { | Error::CreateTable { source, .. } | Error::TableSchemaMismatch { source, .. } => source.status_code(), - Error::MetaSrv { source, .. } => source.status_code(), + Error::Metasrv { source, .. } => source.status_code(), Error::SystemCatalogTableScan { source, .. } => source.status_code(), Error::SystemCatalogTableScanExec { source, .. } => source.status_code(), Error::InvalidTableInfoInCatalog { source, .. } => source.status_code(), diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index c7affc93c6..29b0f517de 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -17,8 +17,8 @@ use std::time::Duration; use async_trait::async_trait; use clap::Parser; use common_telemetry::logging; -use meta_srv::bootstrap::MetaSrvInstance; -use meta_srv::metasrv::MetaSrvOptions; +use meta_srv::bootstrap::MetasrvInstance; +use meta_srv::metasrv::MetasrvOptions; use snafu::ResultExt; use crate::error::{self, Result, StartMetaServerSnafu}; @@ -26,11 +26,11 @@ use crate::options::{CliOptions, Options}; use crate::App; pub struct Instance { - instance: MetaSrvInstance, + instance: MetasrvInstance, } impl Instance { - fn new(instance: MetaSrvInstance) -> Self { + fn new(instance: MetasrvInstance) -> Self { Self { instance } } } @@ -42,7 +42,7 @@ impl App for Instance { } async fn start(&mut self) -> Result<()> { - plugins::start_meta_srv_plugins(self.instance.plugins()) + plugins::start_metasrv_plugins(self.instance.plugins()) .await .context(StartMetaServerSnafu)?; @@ -64,7 +64,7 @@ pub struct Command { } impl Command { - pub async fn build(self, opts: MetaSrvOptions) -> Result { + pub async fn build(self, opts: MetasrvOptions) -> Result { self.subcmd.build(opts).await } @@ -79,7 +79,7 @@ enum SubCommand { } impl SubCommand { - async fn build(self, opts: MetaSrvOptions) -> Result { + async fn build(self, opts: MetasrvOptions) -> Result { match self { SubCommand::Start(cmd) => cmd.build(opts).await, } @@ -127,10 +127,10 @@ struct StartCommand { impl StartCommand { fn load_options(&self, cli_options: &CliOptions) -> Result { - let mut opts: MetaSrvOptions = Options::load_layered_options( + let mut opts: MetasrvOptions = Options::load_layered_options( self.config_file.as_deref(), self.env_prefix.as_ref(), - MetaSrvOptions::env_list_keys(), + MetasrvOptions::env_list_keys(), )?; if let Some(dir) = &cli_options.log_dir { @@ -193,20 +193,20 @@ impl StartCommand { Ok(Options::Metasrv(Box::new(opts))) } - async fn build(self, mut opts: MetaSrvOptions) -> Result { - let plugins = plugins::setup_meta_srv_plugins(&mut opts) + async fn build(self, mut opts: MetasrvOptions) -> Result { + let plugins = plugins::setup_metasrv_plugins(&mut opts) .await .context(StartMetaServerSnafu)?; - logging::info!("MetaSrv start command: {:#?}", self); - logging::info!("MetaSrv options: {:#?}", opts); + logging::info!("Metasrv start command: {:#?}", self); + logging::info!("Metasrv options: {:#?}", opts); let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None) .await .context(error::BuildMetaServerSnafu)?; let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?; - let instance = MetaSrvInstance::new(opts, plugins, metasrv) + let instance = MetasrvInstance::new(opts, plugins, metasrv) .await .context(error::BuildMetaServerSnafu)?; diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 9bc652d44a..8fd974b939 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -15,12 +15,12 @@ use clap::ArgMatches; use common_config::KvBackendConfig; use common_telemetry::logging::{LoggingOptions, TracingOptions}; -use common_wal::config::MetaSrvWalConfig; +use common_wal::config::MetasrvWalConfig; use config::{Config, Environment, File, FileFormat}; use datanode::config::{DatanodeOptions, ProcedureConfig}; use frontend::error::{Result as FeResult, TomlFormatSnafu}; use frontend::frontend::{FrontendOptions, TomlSerializable}; -use meta_srv::metasrv::MetaSrvOptions; +use meta_srv::metasrv::MetasrvOptions; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -38,7 +38,7 @@ pub struct MixOptions { pub frontend: FrontendOptions, pub datanode: DatanodeOptions, pub logging: LoggingOptions, - pub wal_meta: MetaSrvWalConfig, + pub wal_meta: MetasrvWalConfig, } impl From for FrontendOptions { @@ -56,7 +56,7 @@ impl TomlSerializable for MixOptions { pub enum Options { Datanode(Box), Frontend(Box), - Metasrv(Box), + Metasrv(Box), Standalone(Box), Cli(Box), } diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 8116773415..202b2958ba 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -17,7 +17,7 @@ pub mod kafka; use std::collections::HashMap; use std::sync::Arc; -use common_wal::config::MetaSrvWalConfig; +use common_wal::config::MetasrvWalConfig; use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY}; use snafu::ResultExt; use store_api::storage::{RegionId, RegionNumber}; @@ -39,10 +39,10 @@ pub type WalOptionsAllocatorRef = Arc; impl WalOptionsAllocator { /// Creates a WalOptionsAllocator. - pub fn new(config: MetaSrvWalConfig, kv_backend: KvBackendRef) -> Self { + pub fn new(config: MetasrvWalConfig, kv_backend: KvBackendRef) -> Self { match config { - MetaSrvWalConfig::RaftEngine => Self::RaftEngine, - MetaSrvWalConfig::Kafka(kafka_config) => { + MetasrvWalConfig::RaftEngine => Self::RaftEngine, + MetasrvWalConfig::Kafka(kafka_config) => { Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend)) } } @@ -118,7 +118,7 @@ pub fn prepare_wal_options( #[cfg(test)] mod tests { - use common_wal::config::kafka::MetaSrvKafkaConfig; + use common_wal::config::kafka::MetasrvKafkaConfig; use common_wal::test_util::run_test_with_kafka_wal; use super::*; @@ -129,7 +129,7 @@ mod tests { #[tokio::test] async fn test_allocator_with_raft_engine() { let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let wal_config = MetaSrvWalConfig::RaftEngine; + let wal_config = MetasrvWalConfig::RaftEngine; let allocator = WalOptionsAllocator::new(wal_config, kv_backend); allocator.start().await.unwrap(); @@ -155,7 +155,7 @@ mod tests { .collect::>(); // Creates a topic manager. - let config = MetaSrvKafkaConfig { + let config = MetasrvKafkaConfig { replication_factor: broker_endpoints.len() as i16, broker_endpoints, ..Default::default() diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index d6ee3e7746..ea2f89554b 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use common_telemetry::{error, info}; -use common_wal::config::kafka::MetaSrvKafkaConfig; +use common_wal::config::kafka::MetasrvKafkaConfig; use common_wal::TopicSelectorType; use rskafka::client::controller::ControllerClient; use rskafka::client::error::Error as RsKafkaError; @@ -46,7 +46,7 @@ const DEFAULT_PARTITION: i32 = 0; /// Manages topic initialization and selection. pub struct TopicManager { - config: MetaSrvKafkaConfig, + config: MetasrvKafkaConfig, pub(crate) topic_pool: Vec, pub(crate) topic_selector: TopicSelectorRef, kv_backend: KvBackendRef, @@ -54,7 +54,7 @@ pub struct TopicManager { impl TopicManager { /// Creates a new topic manager. - pub fn new(config: MetaSrvKafkaConfig, kv_backend: KvBackendRef) -> Self { + pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self { // Topics should be created. let topics = (0..config.num_topics) .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) @@ -283,7 +283,7 @@ mod tests { .collect::>(); // Creates a topic manager. - let config = MetaSrvKafkaConfig { + let config = MetasrvKafkaConfig { replication_factor: broker_endpoints.len() as i16, broker_endpoints, ..Default::default() diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index a51335c199..6c4993b835 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -17,16 +17,16 @@ pub mod raft_engine; use serde::{Deserialize, Serialize}; -use crate::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig}; +use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig}; use crate::config::raft_engine::RaftEngineConfig; /// Wal configurations for metasrv. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] #[serde(tag = "provider", rename_all = "snake_case")] -pub enum MetaSrvWalConfig { +pub enum MetasrvWalConfig { #[default] RaftEngine, - Kafka(MetaSrvKafkaConfig), + Kafka(MetasrvKafkaConfig), } /// Wal configurations for datanode. @@ -57,11 +57,11 @@ impl Default for StandaloneWalConfig { } } -impl From for MetaSrvWalConfig { +impl From for MetasrvWalConfig { fn from(config: StandaloneWalConfig) -> Self { match config { StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine, - StandaloneWalConfig::Kafka(config) => Self::Kafka(MetaSrvKafkaConfig { + StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig { broker_endpoints: config.broker_endpoints, num_topics: config.num_topics, selector_type: config.selector_type, @@ -100,7 +100,7 @@ mod tests { use super::*; use crate::config::kafka::common::BackoffConfig; - use crate::config::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig}; + use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig}; use crate::TopicSelectorType; #[test] @@ -109,8 +109,8 @@ mod tests { let toml_str = r#" provider = "raft_engine" "#; - let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap(); - assert_eq!(metasrv_wal_config, MetaSrvWalConfig::RaftEngine); + let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap(); + assert_eq!(metasrv_wal_config, MetasrvWalConfig::RaftEngine); let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); assert_eq!( @@ -166,9 +166,9 @@ mod tests { backoff_deadline = "5mins" "#; - // Deserialized to MetaSrvWalConfig. - let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap(); - let expected = MetaSrvKafkaConfig { + // Deserialized to MetasrvWalConfig. + let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap(); + let expected = MetasrvKafkaConfig { broker_endpoints: vec!["127.0.0.1:9092".to_string()], num_topics: 32, selector_type: TopicSelectorType::RoundRobin, @@ -183,7 +183,7 @@ mod tests { deadline: Some(Duration::from_secs(60 * 5)), }, }; - assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected)); + assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); // Deserialized to DatanodeWalConfig. let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); diff --git a/src/common/wal/src/config/kafka.rs b/src/common/wal/src/config/kafka.rs index 586d2182a0..f47e444521 100644 --- a/src/common/wal/src/config/kafka.rs +++ b/src/common/wal/src/config/kafka.rs @@ -18,5 +18,5 @@ pub mod metasrv; pub mod standalone; pub use datanode::DatanodeKafkaConfig; -pub use metasrv::MetaSrvKafkaConfig; +pub use metasrv::MetasrvKafkaConfig; pub use standalone::StandaloneKafkaConfig; diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index a8989275f4..99efe762fb 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -22,7 +22,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; /// Kafka wal configurations for metasrv. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] -pub struct MetaSrvKafkaConfig { +pub struct MetasrvKafkaConfig { /// The broker endpoints of the Kafka cluster. pub broker_endpoints: Vec, /// The number of topics to be created upon start. @@ -43,7 +43,7 @@ pub struct MetaSrvKafkaConfig { pub backoff: BackoffConfig, } -impl Default for MetaSrvKafkaConfig { +impl Default for MetasrvKafkaConfig { fn default() -> Self { let broker_endpoints = vec![BROKER_ENDPOINT.to_string()]; let replication_factor = broker_endpoints.len() as i16; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 858b62a8d2..e6cf40bb4d 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -41,8 +41,8 @@ use crate::election::etcd::EtcdElection; use crate::error::InitExportMetricsTaskSnafu; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; -use crate::metasrv::builder::MetaSrvBuilder; -use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; +use crate::metasrv::builder::MetasrvBuilder; +use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::SelectorType; @@ -50,12 +50,12 @@ use crate::service::admin; use crate::{error, Result}; #[derive(Clone)] -pub struct MetaSrvInstance { - meta_srv: MetaSrv, +pub struct MetasrvInstance { + metasrv: Metasrv, - http_srv: Arc, + httpsrv: Arc, - opts: MetaSrvOptions, + opts: MetasrvOptions, signal_sender: Option>, @@ -64,25 +64,25 @@ pub struct MetaSrvInstance { export_metrics_task: Option, } -impl MetaSrvInstance { +impl MetasrvInstance { pub async fn new( - opts: MetaSrvOptions, + opts: MetasrvOptions, plugins: Plugins, - meta_srv: MetaSrv, - ) -> Result { - let http_srv = Arc::new( + metasrv: Metasrv, + ) -> Result { + let httpsrv = Arc::new( HttpServerBuilder::new(opts.http.clone()) .with_metrics_handler(MetricsHandler) .with_greptime_config_options(opts.to_toml_string()) .build(), ); - // put meta_srv into plugins for later use - plugins.insert::>(Arc::new(meta_srv.clone())); + // put metasrv into plugins for later use + plugins.insert::>(Arc::new(metasrv.clone())); let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins)) .context(InitExportMetricsTaskSnafu)?; - Ok(MetaSrvInstance { - meta_srv, - http_srv, + Ok(MetasrvInstance { + metasrv, + httpsrv, opts, signal_sender: None, plugins, @@ -91,7 +91,7 @@ impl MetaSrvInstance { } pub async fn start(&mut self) -> Result<()> { - self.meta_srv.try_start().await?; + self.metasrv.try_start().await?; if let Some(t) = self.export_metrics_task.as_ref() { t.start(None).context(InitExportMetricsTaskSnafu)? @@ -101,23 +101,23 @@ impl MetaSrvInstance { self.signal_sender = Some(tx); - let mut router = router(self.meta_srv.clone()); - if let Some(configurator) = self.meta_srv.plugins().get::() { + let mut router = router(self.metasrv.clone()); + if let Some(configurator) = self.metasrv.plugins().get::() { router = configurator.config_grpc(router); } - let meta_srv = bootstrap_meta_srv_with_router(&self.opts.bind_addr, router, rx); + let metasrv = bootstrap_metasrv_with_router(&self.opts.bind_addr, router, rx); let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu { addr: &self.opts.http.addr, })?; let http_srv = async { - self.http_srv + self.httpsrv .start(addr) .await .map(|_| ()) .context(error::StartHttpSnafu) }; - future::try_join(meta_srv, http_srv).await?; + future::try_join(metasrv, http_srv).await?; Ok(()) } @@ -128,12 +128,12 @@ impl MetaSrvInstance { .await .context(error::SendShutdownSignalSnafu)?; } - self.meta_srv.shutdown().await?; - self.http_srv + self.metasrv.shutdown().await?; + self.httpsrv .shutdown() .await .context(error::ShutdownServerSnafu { - server: self.http_srv.name(), + server: self.httpsrv.name(), })?; Ok(()) } @@ -143,7 +143,7 @@ impl MetaSrvInstance { } } -pub async fn bootstrap_meta_srv_with_router( +pub async fn bootstrap_metasrv_with_router( bind_addr: &str, router: Router, mut signal: Receiver<()>, @@ -167,22 +167,22 @@ pub async fn bootstrap_meta_srv_with_router( Ok(()) } -pub fn router(meta_srv: MetaSrv) -> Router { +pub fn router(metasrv: Metasrv) -> Router { tonic::transport::Server::builder() .accept_http1(true) // for admin services - .add_service(HeartbeatServer::new(meta_srv.clone())) - .add_service(StoreServer::new(meta_srv.clone())) - .add_service(ClusterServer::new(meta_srv.clone())) - .add_service(LockServer::new(meta_srv.clone())) - .add_service(ProcedureServiceServer::new(meta_srv.clone())) - .add_service(admin::make_admin_service(meta_srv)) + .add_service(HeartbeatServer::new(metasrv.clone())) + .add_service(StoreServer::new(metasrv.clone())) + .add_service(ClusterServer::new(metasrv.clone())) + .add_service(LockServer::new(metasrv.clone())) + .add_service(ProcedureServiceServer::new(metasrv.clone())) + .add_service(admin::make_admin_service(metasrv)) } pub async fn metasrv_builder( - opts: &MetaSrvOptions, + opts: &MetasrvOptions, plugins: Plugins, kv_backend: Option, -) -> Result { +) -> Result { let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) { (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)), (None, true) => ( @@ -229,7 +229,7 @@ pub async fn metasrv_builder( SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, }; - Ok(MetaSrvBuilder::new() + Ok(MetasrvBuilder::new() .options(opts.clone()) .kv_backend(kv_backend) .in_memory(in_memory) @@ -239,7 +239,7 @@ pub async fn metasrv_builder( .plugins(plugins)) } -async fn create_etcd_client(opts: &MetaSrvOptions) -> Result { +async fn create_etcd_client(opts: &MetasrvOptions) -> Result { let etcd_endpoints = opts .store_addr .split(',') diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index cdd434068c..fb5c9296e2 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -22,7 +22,7 @@ use tokio::sync::broadcast::Receiver; use crate::error::Result; -pub const ELECTION_KEY: &str = "__meta_srv_election"; +pub const ELECTION_KEY: &str = "__metasrv_election"; #[derive(Debug, Clone)] pub enum LeaderChangeMessage { diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 5db066c34d..c643528ddd 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -363,7 +363,7 @@ pub enum Error { location: Location, }, - #[snafu(display("MetaSrv has no leader at this moment"))] + #[snafu(display("Metasrv has no leader at this moment"))] NoLeader { location: Location }, #[snafu(display("Table {} not found", name))] diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 9748737fae..004d32e4a2 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -115,7 +115,7 @@ mod tests { use super::*; use crate::handler::node_stat::{RegionStat, Stat}; - use crate::metasrv::builder::MetaSrvBuilder; + use crate::metasrv::builder::MetasrvBuilder; use crate::test_util::create_region_failover_manager; #[tokio::test(flavor = "multi_thread")] @@ -129,7 +129,7 @@ mod tests { let req = &HeartbeatRequest::default(); - let builder = MetaSrvBuilder::new(); + let builder = MetasrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let mut ctx = metasrv.new_ctx(); ctx.is_infancy = false; diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index 58751833d1..dccb8d3d60 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -38,7 +38,7 @@ impl HeartbeatHandler for OnLeaderStartHandler { if election.in_infancy() { ctx.is_infancy = true; - // TODO(weny): Unifies the multiple leader state between Context and MetaSrv. + // TODO(weny): Unifies the multiple leader state between Context and Metasrv. // we can't ensure the in-memory kv has already been reset in the outside loop. // We still use heartbeat requests to trigger resetting in-memory kv. ctx.reset_in_memory(); diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 0a8f917bf6..83d190ef8a 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -116,7 +116,7 @@ mod test { use super::*; use crate::handler::node_stat::{RegionStat, Stat}; - use crate::metasrv::builder::MetaSrvBuilder; + use crate::metasrv::builder::MetasrvBuilder; fn new_test_keeper() -> RegionLeaseKeeper { let store = Arc::new(MemoryKvBackend::new()); @@ -170,7 +170,7 @@ mod test { .await .unwrap(); - let builder = MetaSrvBuilder::new(); + let builder = MetasrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let ctx = &mut metasrv.new_ctx(); @@ -317,7 +317,7 @@ mod test { .await .unwrap(); - let builder = MetaSrvBuilder::new(); + let builder = MetasrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let ctx = &mut metasrv.new_ctx(); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index fa76903694..4b84fcb9e3 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -33,7 +33,7 @@ use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; use common_telemetry::{error, info, warn}; -use common_wal::config::MetaSrvWalConfig; +use common_wal::config::MetasrvWalConfig; use serde::{Deserialize, Serialize}; use servers::export_metrics::ExportMetricsOption; use servers::http::HttpOptions; @@ -63,7 +63,7 @@ pub const METASRV_HOME: &str = "/tmp/metasrv"; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] -pub struct MetaSrvOptions { +pub struct MetasrvOptions { pub bind_addr: String, pub server_addr: String, pub store_addr: String, @@ -77,7 +77,7 @@ pub struct MetaSrvOptions { pub datanode: DatanodeOptions, pub enable_telemetry: bool, pub data_home: String, - pub wal: MetaSrvWalConfig, + pub wal: MetasrvWalConfig, pub export_metrics: ExportMetricsOption, pub store_key_prefix: String, /// The max operations per txn @@ -93,13 +93,13 @@ pub struct MetaSrvOptions { pub max_txn_ops: usize, } -impl MetaSrvOptions { +impl MetasrvOptions { pub fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["wal.broker_endpoints"]) } } -impl Default for MetaSrvOptions { +impl Default for MetasrvOptions { fn default() -> Self { Self { bind_addr: "127.0.0.1:3002".to_string(), @@ -124,7 +124,7 @@ impl Default for MetaSrvOptions { datanode: DatanodeOptions::default(), enable_telemetry: true, data_home: METASRV_HOME.to_string(), - wal: MetaSrvWalConfig::default(), + wal: MetasrvWalConfig::default(), export_metrics: ExportMetricsOption::default(), store_key_prefix: String::new(), max_txn_ops: 128, @@ -132,7 +132,7 @@ impl Default for MetaSrvOptions { } } -impl MetaSrvOptions { +impl MetasrvOptions { pub fn to_toml_string(&self) -> String { toml::to_string(&self).unwrap() } @@ -253,10 +253,10 @@ impl MetaStateHandler { } #[derive(Clone)] -pub struct MetaSrv { +pub struct Metasrv { state: StateRef, started: Arc, - options: MetaSrvOptions, + options: MetasrvOptions, // It is only valid at the leader node and is used to temporarily // store some data that will not be persisted. in_memory: ResettableKvBackendRef, @@ -279,14 +279,14 @@ pub struct MetaSrv { plugins: Plugins, } -impl MetaSrv { +impl Metasrv { pub async fn try_start(&self) -> Result<()> { if self .started .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) .is_err() { - warn!("MetaSrv already started"); + warn!("Metasrv already started"); return Ok(()); } @@ -347,11 +347,11 @@ impl MetaSrv { while started.load(Ordering::Relaxed) { let res = election.campaign().await; if let Err(e) = res { - warn!("MetaSrv election error: {}", e); + warn!("Metasrv election error: {}", e); } - info!("MetaSrv re-initiate election"); + info!("Metasrv re-initiate election"); } - info!("MetaSrv stopped"); + info!("Metasrv stopped"); }); } else { if let Err(e) = self.wal_options_allocator.start().await { @@ -368,7 +368,7 @@ impl MetaSrv { .context(StartProcedureManagerSnafu)?; } - info!("MetaSrv started"); + info!("Metasrv started"); Ok(()) } @@ -403,7 +403,7 @@ impl MetaSrv { } #[inline] - pub fn options(&self) -> &MetaSrvOptions { + pub fn options(&self) -> &MetasrvOptions { &self.options } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index d406589599..003d8ba5c6 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -57,7 +57,7 @@ use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers}; use crate::lock::memory::MemLock; use crate::lock::DistLockRef; use crate::metasrv::{ - ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, + ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::region_migration::manager::RegionMigrationManager; @@ -70,8 +70,8 @@ use crate::state::State; use crate::table_meta_alloc::MetasrvPeerAllocator; // TODO(fys): try use derive_builder macro -pub struct MetaSrvBuilder { - options: Option, +pub struct MetasrvBuilder { + options: Option, kv_backend: Option, in_memory: Option, selector: Option, @@ -84,7 +84,7 @@ pub struct MetaSrvBuilder { table_metadata_allocator: Option, } -impl MetaSrvBuilder { +impl MetasrvBuilder { pub fn new() -> Self { Self { kv_backend: None, @@ -101,7 +101,7 @@ impl MetaSrvBuilder { } } - pub fn options(mut self, options: MetaSrvOptions) -> Self { + pub fn options(mut self, options: MetasrvOptions) -> Self { self.options = Some(options); self } @@ -159,10 +159,10 @@ impl MetaSrvBuilder { self } - pub async fn build(self) -> Result { + pub async fn build(self) -> Result { let started = Arc::new(AtomicBool::new(false)); - let MetaSrvBuilder { + let MetasrvBuilder { election, meta_peer_client, options, @@ -320,7 +320,7 @@ impl MetaSrvBuilder { let enable_telemetry = options.enable_telemetry; let metasrv_home = options.data_home.to_string(); - Ok(MetaSrv { + Ok(Metasrv { state, started, options, @@ -373,7 +373,7 @@ fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef { } fn build_procedure_manager( - options: &MetaSrvOptions, + options: &MetasrvOptions, kv_backend: &KvBackendRef, ) -> ProcedureManagerRef { let manager_config = ManagerConfig { @@ -391,7 +391,7 @@ fn build_procedure_manager( } fn build_ddl_manager( - options: &MetaSrvOptions, + options: &MetasrvOptions, datanode_clients: Option, procedure_manager: &ProcedureManagerRef, mailbox: &MailboxRef, @@ -431,7 +431,7 @@ fn build_ddl_manager( )) } -impl Default for MetaSrvBuilder { +impl Default for MetasrvBuilder { fn default() -> Self { Self::new() } diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 2228a2dc5a..53e22ce6d4 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -26,14 +26,14 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use tower::service_fn; -use crate::metasrv::builder::MetaSrvBuilder; -use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; +use crate::metasrv::builder::MetasrvBuilder; +use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; #[derive(Clone)] pub struct MockInfo { pub server_addr: String, pub channel_manager: ChannelManager, - pub meta_srv: MetaSrv, + pub metasrv: Metasrv, } pub async fn mock_with_memstore() -> MockInfo { @@ -52,7 +52,7 @@ pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo } pub async fn mock( - opts: MetaSrvOptions, + opts: MetasrvOptions, kv_backend: KvBackendRef, selector: Option, datanode_clients: Option>, @@ -62,7 +62,7 @@ pub async fn mock( table_metadata_manager.init().await.unwrap(); - let builder = MetaSrvBuilder::new().options(opts).kv_backend(kv_backend); + let builder = MetasrvBuilder::new().options(opts).kv_backend(kv_backend); let builder = match selector { Some(s) => builder.selector(s), @@ -74,11 +74,11 @@ pub async fn mock( None => builder, }; - let meta_srv = builder.build().await.unwrap(); - meta_srv.try_start().await.unwrap(); + let metasrv = builder.build().await.unwrap(); + metasrv.try_start().await.unwrap(); let (client, server) = tokio::io::duplex(1024); - let service = meta_srv.clone(); + let service = metasrv.clone(); let _handle = tokio::spawn(async move { tonic::transport::Server::builder() .add_service(HeartbeatServer::new(service.clone())) @@ -119,6 +119,6 @@ pub async fn mock( MockInfo { server_addr, channel_manager, - meta_srv, + metasrv, } } diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index bf376b1c80..6e38fc22fc 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -31,20 +31,20 @@ use tonic::body::BoxBody; use tonic::codegen::{empty_body, http, BoxFuture, Service}; use tonic::transport::NamedService; -use crate::metasrv::MetaSrv; +use crate::metasrv::Metasrv; -pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { +pub fn make_admin_service(metasrv: Metasrv) -> Admin { let router = Router::new().route("/health", health::HealthHandler); let router = router.route( "/node-lease", node_lease::NodeLeaseHandler { - meta_peer_client: meta_srv.meta_peer_client().clone(), + meta_peer_client: metasrv.meta_peer_client().clone(), }, ); let handler = heartbeat::HeartBeatHandler { - meta_peer_client: meta_srv.meta_peer_client().clone(), + meta_peer_client: metasrv.meta_peer_client().clone(), }; let router = router .route("/heartbeat", handler.clone()) @@ -53,26 +53,26 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let router = router.route( "/catalogs", meta::CatalogsHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), + table_metadata_manager: metasrv.table_metadata_manager().clone(), }, ); let handler = meta::SchemasHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), + table_metadata_manager: metasrv.table_metadata_manager().clone(), }; let router = router .route("/schemas", handler.clone()) .route("/schemas/help", handler); let handler = meta::TablesHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), + table_metadata_manager: metasrv.table_metadata_manager().clone(), }; let router = router .route("/tables", handler.clone()) .route("/tables/help", handler); let handler = meta::TableHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), + table_metadata_manager: metasrv.table_metadata_manager().clone(), }; let router = router .route("/table", handler.clone()) @@ -81,27 +81,27 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let router = router.route( "/leader", leader::LeaderHandler { - election: meta_srv.election().cloned(), + election: metasrv.election().cloned(), }, ); let handler = route::RouteHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), + table_metadata_manager: metasrv.table_metadata_manager().clone(), }; let router = router .route("/route", handler.clone()) .route("/route/help", handler); let handler = region_migration::SubmitRegionMigrationTaskHandler { - region_migration_manager: meta_srv.region_migration_manager().clone(), - meta_peer_client: meta_srv.meta_peer_client().clone(), + region_migration_manager: metasrv.region_migration_manager().clone(), + meta_peer_client: metasrv.meta_peer_client().clone(), }; let router = router.route("/region-migration", handler); let router = router.route( "/maintenance", maintenance::MaintenanceHandler { - kv_backend: meta_srv.kv_backend().clone(), + kv_backend: metasrv.kv_backend().clone(), }, ); let router = Router::nest("/admin", router); diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index 5b8c38aaca..049b34269d 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -21,11 +21,11 @@ use snafu::ResultExt; use tonic::{Request, Response}; use crate::error; -use crate::metasrv::MetaSrv; +use crate::metasrv::Metasrv; use crate::service::GrpcResult; #[async_trait::async_trait] -impl cluster_server::Cluster for MetaSrv { +impl cluster_server::Cluster for Metasrv { async fn batch_get(&self, req: Request) -> GrpcResult { if !self.is_leader() { let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader()); @@ -73,7 +73,7 @@ impl cluster_server::Cluster for MetaSrv { } } -impl MetaSrv { +impl Metasrv { pub fn is_leader(&self) -> bool { self.election().map(|x| x.is_leader()).unwrap_or(false) } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 598edf2ca7..542793b128 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -30,11 +30,11 @@ use tonic::{Request, Response, Streaming}; use crate::error; use crate::error::Result; use crate::handler::Pusher; -use crate::metasrv::{Context, MetaSrv}; +use crate::metasrv::{Context, Metasrv}; use crate::service::{GrpcResult, GrpcStream}; #[async_trait::async_trait] -impl heartbeat_server::Heartbeat for MetaSrv { +impl heartbeat_server::Heartbeat for Metasrv { type HeartbeatStream = GrpcStream; async fn heartbeat( @@ -179,13 +179,13 @@ mod tests { use tonic::IntoRequest; use super::get_node_id; - use crate::metasrv::builder::MetaSrvBuilder; + use crate::metasrv::builder::MetasrvBuilder; #[tokio::test] async fn test_ask_leader() { let kv_backend = Arc::new(MemoryKvBackend::new()); - let meta_srv = MetaSrvBuilder::new() + let metasrv = MetasrvBuilder::new() .kv_backend(kv_backend) .build() .await @@ -195,10 +195,10 @@ mod tests { header: Some(RequestHeader::new((1, 1), Role::Datanode, W3cTrace::new())), }; - let res = meta_srv.ask_leader(req.into_request()).await.unwrap(); + let res = metasrv.ask_leader(req.into_request()).await.unwrap(); let res = res.into_inner(); assert_eq!(1, res.header.unwrap().cluster_id); - assert_eq!(meta_srv.options().bind_addr, res.leader.unwrap().addr); + assert_eq!(metasrv.options().bind_addr, res.leader.unwrap().addr); } #[test] diff --git a/src/meta-srv/src/service/lock.rs b/src/meta-srv/src/service/lock.rs index 81f218027e..4334bdfc37 100644 --- a/src/meta-srv/src/service/lock.rs +++ b/src/meta-srv/src/service/lock.rs @@ -17,10 +17,10 @@ use tonic::{Request, Response}; use super::GrpcResult; use crate::lock::Opts; -use crate::metasrv::MetaSrv; +use crate::metasrv::Metasrv; #[async_trait::async_trait] -impl lock_server::Lock for MetaSrv { +impl lock_server::Lock for Metasrv { async fn lock(&self, request: Request) -> GrpcResult { let LockRequest { name, expire_secs, .. diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index a45e538a36..0a410875fa 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -27,11 +27,11 @@ use tonic::{Request, Response}; use super::GrpcResult; use crate::error; -use crate::metasrv::MetaSrv; +use crate::metasrv::Metasrv; use crate::procedure::region_migration::manager::RegionMigrationProcedureTask; #[async_trait::async_trait] -impl procedure_service_server::ProcedureService for MetaSrv { +impl procedure_service_server::ProcedureService for Metasrv { async fn query( &self, request: Request, diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 1a291ada1f..f9a970bca7 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -32,12 +32,12 @@ use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; use crate::error::{self, MissingRequestHeaderSnafu}; -use crate::metasrv::MetaSrv; +use crate::metasrv::Metasrv; use crate::metrics::METRIC_META_KV_REQUEST_ELAPSED; use crate::service::GrpcResult; #[async_trait::async_trait] -impl store_server::Store for MetaSrv { +impl store_server::Store for Metasrv { async fn range(&self, req: Request) -> GrpcResult { let req = req.into_inner(); @@ -260,11 +260,11 @@ mod tests { use common_telemetry::tracing_context::W3cTrace; use tonic::IntoRequest; - use crate::metasrv::builder::MetaSrvBuilder; - use crate::metasrv::MetaSrv; + use crate::metasrv::builder::MetasrvBuilder; + use crate::metasrv::Metasrv; - async fn new_meta_srv() -> MetaSrv { - MetaSrvBuilder::new() + async fn new_metasrv() -> Metasrv { + MetasrvBuilder::new() .kv_backend(Arc::new(MemoryKvBackend::new())) .build() .await @@ -273,77 +273,77 @@ mod tests { #[tokio::test] async fn test_range() { - let meta_srv = new_meta_srv().await; + let metasrv = new_metasrv().await; let mut req = RangeRequest::default(); req.set_header((1, 1), Role::Datanode, W3cTrace::new()); - let res = meta_srv.range(req.into_request()).await; + let res = metasrv.range(req.into_request()).await; let _ = res.unwrap(); } #[tokio::test] async fn test_put() { - let meta_srv = new_meta_srv().await; + let metasrv = new_metasrv().await; let mut req = PutRequest::default(); req.set_header((1, 1), Role::Datanode, W3cTrace::new()); - let res = meta_srv.put(req.into_request()).await; + let res = metasrv.put(req.into_request()).await; let _ = res.unwrap(); } #[tokio::test] async fn test_batch_get() { - let meta_srv = new_meta_srv().await; + let metasrv = new_metasrv().await; let mut req = BatchGetRequest::default(); req.set_header((1, 1), Role::Datanode, W3cTrace::new()); - let res = meta_srv.batch_get(req.into_request()).await; + let res = metasrv.batch_get(req.into_request()).await; let _ = res.unwrap(); } #[tokio::test] async fn test_batch_put() { - let meta_srv = new_meta_srv().await; + let metasrv = new_metasrv().await; let mut req = BatchPutRequest::default(); req.set_header((1, 1), Role::Datanode, W3cTrace::new()); - let res = meta_srv.batch_put(req.into_request()).await; + let res = metasrv.batch_put(req.into_request()).await; let _ = res.unwrap(); } #[tokio::test] async fn test_batch_delete() { - let meta_srv = new_meta_srv().await; + let metasrv = new_metasrv().await; let mut req = BatchDeleteRequest::default(); req.set_header((1, 1), Role::Datanode, W3cTrace::new()); - let res = meta_srv.batch_delete(req.into_request()).await; + let res = metasrv.batch_delete(req.into_request()).await; let _ = res.unwrap(); } #[tokio::test] async fn test_compare_and_put() { - let meta_srv = new_meta_srv().await; + let metasrv = new_metasrv().await; let mut req = CompareAndPutRequest::default(); req.set_header((1, 1), Role::Datanode, W3cTrace::new()); - let res = meta_srv.compare_and_put(req.into_request()).await; + let res = metasrv.compare_and_put(req.into_request()).await; let _ = res.unwrap(); } #[tokio::test] async fn test_delete_range() { - let meta_srv = new_meta_srv().await; + let metasrv = new_metasrv().await; let mut req = DeleteRangeRequest::default(); req.set_header((1, 1), Role::Datanode, W3cTrace::new()); - let res = meta_srv.delete_range(req.into_request()).await; + let res = metasrv.delete_range(req.into_request()).await; let _ = res.unwrap(); } diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index a80ff8ff5f..f0be0bd763 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -18,4 +18,4 @@ mod meta_srv; pub use datanode::{setup_datanode_plugins, start_datanode_plugins}; pub use frontend::{setup_frontend_plugins, start_frontend_plugins}; -pub use meta_srv::{setup_meta_srv_plugins, start_meta_srv_plugins}; +pub use meta_srv::{setup_metasrv_plugins, start_metasrv_plugins}; diff --git a/src/plugins/src/meta_srv.rs b/src/plugins/src/meta_srv.rs index 80ac6d8c7f..2974494be5 100644 --- a/src/plugins/src/meta_srv.rs +++ b/src/plugins/src/meta_srv.rs @@ -14,12 +14,12 @@ use common_base::Plugins; use meta_srv::error::Result; -use meta_srv::metasrv::MetaSrvOptions; +use meta_srv::metasrv::MetasrvOptions; -pub async fn setup_meta_srv_plugins(_opts: &mut MetaSrvOptions) -> Result { +pub async fn setup_metasrv_plugins(_opts: &mut MetasrvOptions) -> Result { Ok(Plugins::new()) } -pub async fn start_meta_srv_plugins(_plugins: Plugins) -> Result<()> { +pub async fn start_metasrv_plugins(_plugins: Plugins) -> Result<()> { Ok(()) } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index bc28e8e130..f4c3756ead 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -36,7 +36,7 @@ use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; -use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig}; +use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::config::{DatanodeOptions, ObjectStoreConfig}; use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig}; use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; @@ -45,7 +45,7 @@ use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; -use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; +use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; use servers::grpc::flight::FlightCraftWrapper; use servers::grpc::region_server::RegionServerRequestHandler; @@ -68,7 +68,7 @@ pub struct GreptimeDbCluster { pub datanode_instances: HashMap, pub kv_backend: KvBackendRef, - pub meta_srv: MetaSrv, + pub metasrv: Metasrv, pub frontend: Arc, } @@ -79,7 +79,7 @@ pub struct GreptimeDbClusterBuilder { store_providers: Option>, datanodes: Option, datanode_wal_config: DatanodeWalConfig, - metasrv_wal_config: MetaSrvWalConfig, + metasrv_wal_config: MetasrvWalConfig, shared_home_dir: Option>, meta_selector: Option, } @@ -110,7 +110,7 @@ impl GreptimeDbClusterBuilder { store_providers: None, datanodes: None, datanode_wal_config: DatanodeWalConfig::default(), - metasrv_wal_config: MetaSrvWalConfig::default(), + metasrv_wal_config: MetasrvWalConfig::default(), shared_home_dir: None, meta_selector: None, } @@ -141,7 +141,7 @@ impl GreptimeDbClusterBuilder { } #[must_use] - pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self { + pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetasrvWalConfig) -> Self { self.metasrv_wal_config = metasrv_wal_config; self } @@ -168,7 +168,7 @@ impl GreptimeDbClusterBuilder { let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20)); let datanode_clients = Arc::new(DatanodeClients::new(channel_config)); - let opt = MetaSrvOptions { + let opt = MetasrvOptions { procedure: ProcedureConfig { // Due to large network delay during cross data-center. // We only make max_retry_times and retry_delay large than the default in tests. @@ -180,7 +180,7 @@ impl GreptimeDbClusterBuilder { ..Default::default() }; - let meta_srv = meta_srv::mocks::mock( + let metasrv = meta_srv::mocks::mock( opt, self.kv_backend.clone(), self.meta_selector.clone(), @@ -189,17 +189,15 @@ impl GreptimeDbClusterBuilder { .await; let datanode_instances = self - .build_datanodes_with_options(&meta_srv, &datanode_options) + .build_datanodes_with_options(&metasrv, &datanode_options) .await; build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await; - self.wait_datanodes_alive(meta_srv.meta_srv.meta_peer_client(), datanodes) + self.wait_datanodes_alive(metasrv.metasrv.meta_peer_client(), datanodes) .await; - let frontend = self - .build_frontend(meta_srv.clone(), datanode_clients) - .await; + let frontend = self.build_frontend(metasrv.clone(), datanode_clients).await; test_util::prepare_another_catalog_and_schema(frontend.as_ref()).await; @@ -211,7 +209,7 @@ impl GreptimeDbClusterBuilder { dir_guards, datanode_instances, kv_backend: self.kv_backend.clone(), - meta_srv: meta_srv.meta_srv, + metasrv: metasrv.metasrv, frontend, } } @@ -280,13 +278,13 @@ impl GreptimeDbClusterBuilder { async fn build_datanodes_with_options( &self, - meta_srv: &MockInfo, + metasrv: &MockInfo, options: &[DatanodeOptions], ) -> HashMap { let mut instances = HashMap::with_capacity(options.len()); for opts in options { - let datanode = self.create_datanode(opts.clone(), meta_srv.clone()).await; + let datanode = self.create_datanode(opts.clone(), metasrv.clone()).await; instances.insert(opts.node_id.unwrap(), datanode); } @@ -312,14 +310,14 @@ impl GreptimeDbClusterBuilder { panic!("Some Datanodes are not alive in 10 seconds!") } - async fn create_datanode(&self, opts: DatanodeOptions, meta_srv: MockInfo) -> Datanode { + async fn create_datanode(&self, opts: DatanodeOptions, metasrv: MockInfo) -> Datanode { let mut meta_client = MetaClientBuilder::new(1000, opts.node_id.unwrap(), Role::Datanode) .enable_router() .enable_store() .enable_heartbeat() - .channel_manager(meta_srv.channel_manager) + .channel_manager(metasrv.channel_manager) .build(); - meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); + meta_client.start(&[&metasrv.server_addr]).await.unwrap(); let meta_backend = Arc::new(MetaKvBackend { client: Arc::new(meta_client.clone()), @@ -339,18 +337,18 @@ impl GreptimeDbClusterBuilder { async fn build_frontend( &self, - meta_srv: MockInfo, + metasrv: MockInfo, datanode_clients: Arc, ) -> Arc { let mut meta_client = MetaClientBuilder::new(1000, 0, Role::Frontend) .enable_router() .enable_store() .enable_heartbeat() - .channel_manager(meta_srv.channel_manager) + .channel_manager(metasrv.channel_manager) .enable_procedure() .enable_access_cluster_info() .build(); - meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); + meta_client.start(&[&metasrv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client); let cached_meta_backend = diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 6b4340db27..72b36dad5d 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -30,7 +30,7 @@ use common_meta::wal_options_allocator::WalOptionsAllocator; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; -use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig}; +use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; @@ -51,7 +51,7 @@ pub struct GreptimeDbStandalone { pub struct GreptimeDbStandaloneBuilder { instance_name: String, datanode_wal_config: DatanodeWalConfig, - metasrv_wal_config: MetaSrvWalConfig, + metasrv_wal_config: MetasrvWalConfig, store_providers: Option>, default_store: Option, plugin: Option, @@ -65,7 +65,7 @@ impl GreptimeDbStandaloneBuilder { plugin: None, default_store: None, datanode_wal_config: DatanodeWalConfig::default(), - metasrv_wal_config: MetaSrvWalConfig::default(), + metasrv_wal_config: MetasrvWalConfig::default(), } } @@ -102,7 +102,7 @@ impl GreptimeDbStandaloneBuilder { } #[must_use] - pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self { + pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetasrvWalConfig) -> Self { self.metasrv_wal_config = metasrv_wal_config; self } diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 22c0d591de..65f91e7f5e 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -38,7 +38,7 @@ impl MockDistributedInstance { } pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { - self.0.meta_srv.table_metadata_manager() + self.0.metasrv.table_metadata_manager() } } diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 43f3981fee..01ca50f29f 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -20,8 +20,8 @@ use common_query::Output; use common_recordbatch::util; use common_telemetry::warn; use common_test_util::find_workspace_path; -use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig}; -use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig}; +use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; +use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use frontend::instance::Instance; use rstest_reuse::{self, template}; @@ -227,7 +227,7 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option Option RegionDistribution { - let manager = cluster.meta_srv.table_metadata_manager(); + let manager = cluster.metasrv.table_metadata_manager(); let region_distribution = manager .table_route_manager() .get_region_distribution(table_id) @@ -343,27 +343,27 @@ async fn run_region_failover_procedure( failed_region: RegionIdent, selector: SelectorRef, ) { - let meta_srv = &cluster.meta_srv; - let procedure_manager = meta_srv.procedure_manager(); + let metasrv = &cluster.metasrv; + let procedure_manager = metasrv.procedure_manager(); let procedure = RegionFailoverProcedure::new( "greptime".into(), "public".into(), failed_region.clone(), RegionFailoverContext { region_lease_secs: 10, - in_memory: meta_srv.in_memory().clone(), - kv_backend: meta_srv.kv_backend().clone(), - mailbox: meta_srv.mailbox().clone(), + in_memory: metasrv.in_memory().clone(), + kv_backend: metasrv.kv_backend().clone(), + mailbox: metasrv.mailbox().clone(), selector, selector_ctx: SelectorContext { datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS, - server_addr: meta_srv.options().server_addr.clone(), - kv_backend: meta_srv.kv_backend().clone(), - meta_peer_client: meta_srv.meta_peer_client().clone(), + server_addr: metasrv.options().server_addr.clone(), + kv_backend: metasrv.kv_backend().clone(), + meta_peer_client: metasrv.meta_peer_client().clone(), table_id: None, }, - dist_lock: meta_srv.lock().clone(), - table_metadata_manager: meta_srv.table_metadata_manager().clone(), + dist_lock: metasrv.lock().clone(), + table_metadata_manager: metasrv.table_metadata_manager().clone(), }, ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index 79f7a3d38a..1ceb249978 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -23,8 +23,8 @@ use common_recordbatch::RecordBatches; use common_telemetry::info; use common_test_util::recordbatch::check_output_stream; use common_test_util::temp_dir::create_temp_dir; -use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig}; -use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig}; +use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; +use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datatypes::prelude::ScalarVector; use datatypes::value::Value; use datatypes::vectors::{Helper, UInt64Vector}; @@ -116,7 +116,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec