chore: unify name metasrv (#3671)

chore: unify name
This commit is contained in:
JeremyHi
2024-04-09 11:03:26 +08:00
committed by GitHub
parent b3c94a303b
commit 1629435888
34 changed files with 242 additions and 244 deletions

View File

@@ -39,7 +39,7 @@ body:
- Query Engine
- Table Engine
- Write Protocols
- MetaSrv
- Metasrv
- Frontend
- Datanode
- Other

View File

@@ -27,8 +27,8 @@ subgraph Frontend["Frontend"]
end
end
MyTable --> MetaSrv
MetaSrv --> ETCD
MyTable --> Metasrv
Metasrv --> ETCD
MyTable-->TableEngine0
MyTable-->TableEngine1
@@ -95,8 +95,8 @@ subgraph Frontend["Frontend"]
end
end
MyTable --> MetaSrv
MetaSrv --> ETCD
MyTable --> Metasrv
Metasrv --> ETCD
MyTable-->RegionEngine
MyTable-->RegionEngine1

View File

@@ -36,7 +36,7 @@ Hence, we choose the third option, and use a simple logical plan that's anagonis
## Deploy mode and protocol
- Greptime Flow is an independent streaming compute component. It can be used either within a standalone node or as a dedicated node at the same level as frontend in distributed mode.
- It accepts insert request Rows, which is used between frontend and datanode.
- New flow job is submitted in the format of modified SQL query like snowflake do, like: `CREATE TASK avg_over_5m WINDOW_SIZE = "5m" AS SELECT avg(value) FROM table WHERE time > now() - 5m GROUP BY time(1m)`. Flow job then got stored in MetaSrv.
- New flow job is submitted in the format of modified SQL query like snowflake do, like: `CREATE TASK avg_over_5m WINDOW_SIZE = "5m" AS SELECT avg(value) FROM table WHERE time > now() - 5m GROUP BY time(1m)`. Flow job then got stored in Metasrv.
- It also persists results in the format of Rows to frontend.
- The query plan uses Substrait as codec format. It's the same with GreptimeDB's query engine.
- Greptime Flow needs a WAL for recovering. It's possible to reuse datanode's.

View File

@@ -216,7 +216,7 @@ pub enum Error {
},
#[snafu(display("Failed to perform metasrv operation"))]
MetaSrv {
Metasrv {
location: Location,
source: meta_client::error::Error,
},
@@ -304,7 +304,7 @@ impl ErrorExt for Error {
| Error::CreateTable { source, .. }
| Error::TableSchemaMismatch { source, .. } => source.status_code(),
Error::MetaSrv { source, .. } => source.status_code(),
Error::Metasrv { source, .. } => source.status_code(),
Error::SystemCatalogTableScan { source, .. } => source.status_code(),
Error::SystemCatalogTableScanExec { source, .. } => source.status_code(),
Error::InvalidTableInfoInCatalog { source, .. } => source.status_code(),

View File

@@ -17,8 +17,8 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_telemetry::logging;
use meta_srv::bootstrap::MetaSrvInstance;
use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
use crate::error::{self, Result, StartMetaServerSnafu};
@@ -26,11 +26,11 @@ use crate::options::{CliOptions, Options};
use crate::App;
pub struct Instance {
instance: MetaSrvInstance,
instance: MetasrvInstance,
}
impl Instance {
fn new(instance: MetaSrvInstance) -> Self {
fn new(instance: MetasrvInstance) -> Self {
Self { instance }
}
}
@@ -42,7 +42,7 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
plugins::start_meta_srv_plugins(self.instance.plugins())
plugins::start_metasrv_plugins(self.instance.plugins())
.await
.context(StartMetaServerSnafu)?;
@@ -64,7 +64,7 @@ pub struct Command {
}
impl Command {
pub async fn build(self, opts: MetaSrvOptions) -> Result<Instance> {
pub async fn build(self, opts: MetasrvOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
@@ -79,7 +79,7 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, opts: MetaSrvOptions) -> Result<Instance> {
async fn build(self, opts: MetasrvOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
@@ -127,10 +127,10 @@ struct StartCommand {
impl StartCommand {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut opts: MetaSrvOptions = Options::load_layered_options(
let mut opts: MetasrvOptions = Options::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
MetaSrvOptions::env_list_keys(),
MetasrvOptions::env_list_keys(),
)?;
if let Some(dir) = &cli_options.log_dir {
@@ -193,20 +193,20 @@ impl StartCommand {
Ok(Options::Metasrv(Box::new(opts)))
}
async fn build(self, mut opts: MetaSrvOptions) -> Result<Instance> {
let plugins = plugins::setup_meta_srv_plugins(&mut opts)
async fn build(self, mut opts: MetasrvOptions) -> Result<Instance> {
let plugins = plugins::setup_metasrv_plugins(&mut opts)
.await
.context(StartMetaServerSnafu)?;
logging::info!("MetaSrv start command: {:#?}", self);
logging::info!("MetaSrv options: {:#?}", opts);
logging::info!("Metasrv start command: {:#?}", self);
logging::info!("Metasrv options: {:#?}", opts);
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
.await
.context(error::BuildMetaServerSnafu)?;
let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
let instance = MetaSrvInstance::new(opts, plugins, metasrv)
let instance = MetasrvInstance::new(opts, plugins, metasrv)
.await
.context(error::BuildMetaServerSnafu)?;

View File

@@ -15,12 +15,12 @@
use clap::ArgMatches;
use common_config::KvBackendConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::MetaSrvWalConfig;
use common_wal::config::MetasrvWalConfig;
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
use frontend::error::{Result as FeResult, TomlFormatSnafu};
use frontend::frontend::{FrontendOptions, TomlSerializable};
use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::metasrv::MetasrvOptions;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -38,7 +38,7 @@ pub struct MixOptions {
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
pub wal_meta: MetaSrvWalConfig,
pub wal_meta: MetasrvWalConfig,
}
impl From<MixOptions> for FrontendOptions {
@@ -56,7 +56,7 @@ impl TomlSerializable for MixOptions {
pub enum Options {
Datanode(Box<DatanodeOptions>),
Frontend(Box<FrontendOptions>),
Metasrv(Box<MetaSrvOptions>),
Metasrv(Box<MetasrvOptions>),
Standalone(Box<MixOptions>),
Cli(Box<LoggingOptions>),
}

View File

@@ -17,7 +17,7 @@ pub mod kafka;
use std::collections::HashMap;
use std::sync::Arc;
use common_wal::config::MetaSrvWalConfig;
use common_wal::config::MetasrvWalConfig;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
@@ -39,10 +39,10 @@ pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
impl WalOptionsAllocator {
/// Creates a WalOptionsAllocator.
pub fn new(config: MetaSrvWalConfig, kv_backend: KvBackendRef) -> Self {
pub fn new(config: MetasrvWalConfig, kv_backend: KvBackendRef) -> Self {
match config {
MetaSrvWalConfig::RaftEngine => Self::RaftEngine,
MetaSrvWalConfig::Kafka(kafka_config) => {
MetasrvWalConfig::RaftEngine => Self::RaftEngine,
MetasrvWalConfig::Kafka(kafka_config) => {
Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend))
}
}
@@ -118,7 +118,7 @@ pub fn prepare_wal_options(
#[cfg(test)]
mod tests {
use common_wal::config::kafka::MetaSrvKafkaConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
@@ -129,7 +129,7 @@ mod tests {
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = MetaSrvWalConfig::RaftEngine;
let wal_config = MetasrvWalConfig::RaftEngine;
let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
allocator.start().await.unwrap();
@@ -155,7 +155,7 @@ mod tests {
.collect::<Vec<_>>();
// Creates a topic manager.
let config = MetaSrvKafkaConfig {
let config = MetasrvKafkaConfig {
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use common_telemetry::{error, info};
use common_wal::config::kafka::MetaSrvKafkaConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::TopicSelectorType;
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
@@ -46,7 +46,7 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: MetaSrvKafkaConfig,
config: MetasrvKafkaConfig,
pub(crate) topic_pool: Vec<String>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
@@ -54,7 +54,7 @@ pub struct TopicManager {
impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: MetaSrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
let topics = (0..config.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
@@ -283,7 +283,7 @@ mod tests {
.collect::<Vec<_>>();
// Creates a topic manager.
let config = MetaSrvKafkaConfig {
let config = MetasrvKafkaConfig {
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()

View File

@@ -17,16 +17,16 @@ pub mod raft_engine;
use serde::{Deserialize, Serialize};
use crate::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
/// Wal configurations for metasrv.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum MetaSrvWalConfig {
pub enum MetasrvWalConfig {
#[default]
RaftEngine,
Kafka(MetaSrvKafkaConfig),
Kafka(MetasrvKafkaConfig),
}
/// Wal configurations for datanode.
@@ -57,11 +57,11 @@ impl Default for StandaloneWalConfig {
}
}
impl From<StandaloneWalConfig> for MetaSrvWalConfig {
impl From<StandaloneWalConfig> for MetasrvWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
match config {
StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine,
StandaloneWalConfig::Kafka(config) => Self::Kafka(MetaSrvKafkaConfig {
StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
broker_endpoints: config.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
@@ -100,7 +100,7 @@ mod tests {
use super::*;
use crate::config::kafka::common::BackoffConfig;
use crate::config::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
use crate::TopicSelectorType;
#[test]
@@ -109,8 +109,8 @@ mod tests {
let toml_str = r#"
provider = "raft_engine"
"#;
let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(metasrv_wal_config, MetaSrvWalConfig::RaftEngine);
let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(metasrv_wal_config, MetasrvWalConfig::RaftEngine);
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(
@@ -166,9 +166,9 @@ mod tests {
backoff_deadline = "5mins"
"#;
// Deserialized to MetaSrvWalConfig.
let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap();
let expected = MetaSrvKafkaConfig {
// Deserialized to MetasrvWalConfig.
let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
let expected = MetasrvKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
@@ -183,7 +183,7 @@ mod tests {
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected));
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
// Deserialized to DatanodeWalConfig.
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();

View File

@@ -18,5 +18,5 @@ pub mod metasrv;
pub mod standalone;
pub use datanode::DatanodeKafkaConfig;
pub use metasrv::MetaSrvKafkaConfig;
pub use metasrv::MetasrvKafkaConfig;
pub use standalone::StandaloneKafkaConfig;

View File

@@ -22,7 +22,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetaSrvKafkaConfig {
pub struct MetasrvKafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// The number of topics to be created upon start.
@@ -43,7 +43,7 @@ pub struct MetaSrvKafkaConfig {
pub backoff: BackoffConfig,
}
impl Default for MetaSrvKafkaConfig {
impl Default for MetasrvKafkaConfig {
fn default() -> Self {
let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
let replication_factor = broker_endpoints.len() as i16;

View File

@@ -41,8 +41,8 @@ use crate::election::etcd::EtcdElection;
use crate::error::InitExportMetricsTaskSnafu;
use crate::lock::etcd::EtcdLock;
use crate::lock::memory::MemLock;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::SelectorType;
@@ -50,12 +50,12 @@ use crate::service::admin;
use crate::{error, Result};
#[derive(Clone)]
pub struct MetaSrvInstance {
meta_srv: MetaSrv,
pub struct MetasrvInstance {
metasrv: Metasrv,
http_srv: Arc<HttpServer>,
httpsrv: Arc<HttpServer>,
opts: MetaSrvOptions,
opts: MetasrvOptions,
signal_sender: Option<Sender<()>>,
@@ -64,25 +64,25 @@ pub struct MetaSrvInstance {
export_metrics_task: Option<ExportMetricsTask>,
}
impl MetaSrvInstance {
impl MetasrvInstance {
pub async fn new(
opts: MetaSrvOptions,
opts: MetasrvOptions,
plugins: Plugins,
meta_srv: MetaSrv,
) -> Result<MetaSrvInstance> {
let http_srv = Arc::new(
metasrv: Metasrv,
) -> Result<MetasrvInstance> {
let httpsrv = Arc::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml_string())
.build(),
);
// put meta_srv into plugins for later use
plugins.insert::<Arc<MetaSrv>>(Arc::new(meta_srv.clone()));
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(Arc::new(metasrv.clone()));
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(InitExportMetricsTaskSnafu)?;
Ok(MetaSrvInstance {
meta_srv,
http_srv,
Ok(MetasrvInstance {
metasrv,
httpsrv,
opts,
signal_sender: None,
plugins,
@@ -91,7 +91,7 @@ impl MetaSrvInstance {
}
pub async fn start(&mut self) -> Result<()> {
self.meta_srv.try_start().await?;
self.metasrv.try_start().await?;
if let Some(t) = self.export_metrics_task.as_ref() {
t.start(None).context(InitExportMetricsTaskSnafu)?
@@ -101,23 +101,23 @@ impl MetaSrvInstance {
self.signal_sender = Some(tx);
let mut router = router(self.meta_srv.clone());
if let Some(configurator) = self.meta_srv.plugins().get::<ConfiguratorRef>() {
let mut router = router(self.metasrv.clone());
if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
router = configurator.config_grpc(router);
}
let meta_srv = bootstrap_meta_srv_with_router(&self.opts.bind_addr, router, rx);
let metasrv = bootstrap_metasrv_with_router(&self.opts.bind_addr, router, rx);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
let http_srv = async {
self.http_srv
self.httpsrv
.start(addr)
.await
.map(|_| ())
.context(error::StartHttpSnafu)
};
future::try_join(meta_srv, http_srv).await?;
future::try_join(metasrv, http_srv).await?;
Ok(())
}
@@ -128,12 +128,12 @@ impl MetaSrvInstance {
.await
.context(error::SendShutdownSignalSnafu)?;
}
self.meta_srv.shutdown().await?;
self.http_srv
self.metasrv.shutdown().await?;
self.httpsrv
.shutdown()
.await
.context(error::ShutdownServerSnafu {
server: self.http_srv.name(),
server: self.httpsrv.name(),
})?;
Ok(())
}
@@ -143,7 +143,7 @@ impl MetaSrvInstance {
}
}
pub async fn bootstrap_meta_srv_with_router(
pub async fn bootstrap_metasrv_with_router(
bind_addr: &str,
router: Router,
mut signal: Receiver<()>,
@@ -167,22 +167,22 @@ pub async fn bootstrap_meta_srv_with_router(
Ok(())
}
pub fn router(meta_srv: MetaSrv) -> Router {
pub fn router(metasrv: Metasrv) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::new(meta_srv.clone()))
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(ClusterServer::new(meta_srv.clone()))
.add_service(LockServer::new(meta_srv.clone()))
.add_service(ProcedureServiceServer::new(meta_srv.clone()))
.add_service(admin::make_admin_service(meta_srv))
.add_service(HeartbeatServer::new(metasrv.clone()))
.add_service(StoreServer::new(metasrv.clone()))
.add_service(ClusterServer::new(metasrv.clone()))
.add_service(LockServer::new(metasrv.clone()))
.add_service(ProcedureServiceServer::new(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
}
pub async fn metasrv_builder(
opts: &MetaSrvOptions,
opts: &MetasrvOptions,
plugins: Plugins,
kv_backend: Option<KvBackendRef>,
) -> Result<MetaSrvBuilder> {
) -> Result<MetasrvBuilder> {
let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) {
(Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)),
(None, true) => (
@@ -229,7 +229,7 @@ pub async fn metasrv_builder(
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
};
Ok(MetaSrvBuilder::new()
Ok(MetasrvBuilder::new()
.options(opts.clone())
.kv_backend(kv_backend)
.in_memory(in_memory)
@@ -239,7 +239,7 @@ pub async fn metasrv_builder(
.plugins(plugins))
}
async fn create_etcd_client(opts: &MetaSrvOptions) -> Result<Client> {
async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
let etcd_endpoints = opts
.store_addr
.split(',')

View File

@@ -22,7 +22,7 @@ use tokio::sync::broadcast::Receiver;
use crate::error::Result;
pub const ELECTION_KEY: &str = "__meta_srv_election";
pub const ELECTION_KEY: &str = "__metasrv_election";
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {

View File

@@ -363,7 +363,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("MetaSrv has no leader at this moment"))]
#[snafu(display("Metasrv has no leader at this moment"))]
NoLeader { location: Location },
#[snafu(display("Table {} not found", name))]

View File

@@ -115,7 +115,7 @@ mod tests {
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::builder::MetasrvBuilder;
use crate::test_util::create_region_failover_manager;
#[tokio::test(flavor = "multi_thread")]
@@ -129,7 +129,7 @@ mod tests {
let req = &HeartbeatRequest::default();
let builder = MetaSrvBuilder::new();
let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
ctx.is_infancy = false;

View File

@@ -38,7 +38,7 @@ impl HeartbeatHandler for OnLeaderStartHandler {
if election.in_infancy() {
ctx.is_infancy = true;
// TODO(weny): Unifies the multiple leader state between Context and MetaSrv.
// TODO(weny): Unifies the multiple leader state between Context and Metasrv.
// we can't ensure the in-memory kv has already been reset in the outside loop.
// We still use heartbeat requests to trigger resetting in-memory kv.
ctx.reset_in_memory();

View File

@@ -116,7 +116,7 @@ mod test {
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::builder::MetasrvBuilder;
fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
@@ -170,7 +170,7 @@ mod test {
.await
.unwrap();
let builder = MetaSrvBuilder::new();
let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();
@@ -317,7 +317,7 @@ mod test {
.await
.unwrap();
let builder = MetaSrvBuilder::new();
let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();

View File

@@ -33,7 +33,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::{error, info, warn};
use common_wal::config::MetaSrvWalConfig;
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
@@ -63,7 +63,7 @@ pub const METASRV_HOME: &str = "/tmp/metasrv";
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetaSrvOptions {
pub struct MetasrvOptions {
pub bind_addr: String,
pub server_addr: String,
pub store_addr: String,
@@ -77,7 +77,7 @@ pub struct MetaSrvOptions {
pub datanode: DatanodeOptions,
pub enable_telemetry: bool,
pub data_home: String,
pub wal: MetaSrvWalConfig,
pub wal: MetasrvWalConfig,
pub export_metrics: ExportMetricsOption,
pub store_key_prefix: String,
/// The max operations per txn
@@ -93,13 +93,13 @@ pub struct MetaSrvOptions {
pub max_txn_ops: usize,
}
impl MetaSrvOptions {
impl MetasrvOptions {
pub fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
}
impl Default for MetaSrvOptions {
impl Default for MetasrvOptions {
fn default() -> Self {
Self {
bind_addr: "127.0.0.1:3002".to_string(),
@@ -124,7 +124,7 @@ impl Default for MetaSrvOptions {
datanode: DatanodeOptions::default(),
enable_telemetry: true,
data_home: METASRV_HOME.to_string(),
wal: MetaSrvWalConfig::default(),
wal: MetasrvWalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: String::new(),
max_txn_ops: 128,
@@ -132,7 +132,7 @@ impl Default for MetaSrvOptions {
}
}
impl MetaSrvOptions {
impl MetasrvOptions {
pub fn to_toml_string(&self) -> String {
toml::to_string(&self).unwrap()
}
@@ -253,10 +253,10 @@ impl MetaStateHandler {
}
#[derive(Clone)]
pub struct MetaSrv {
pub struct Metasrv {
state: StateRef,
started: Arc<AtomicBool>,
options: MetaSrvOptions,
options: MetasrvOptions,
// It is only valid at the leader node and is used to temporarily
// store some data that will not be persisted.
in_memory: ResettableKvBackendRef,
@@ -279,14 +279,14 @@ pub struct MetaSrv {
plugins: Plugins,
}
impl MetaSrv {
impl Metasrv {
pub async fn try_start(&self) -> Result<()> {
if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
warn!("MetaSrv already started");
warn!("Metasrv already started");
return Ok(());
}
@@ -347,11 +347,11 @@ impl MetaSrv {
while started.load(Ordering::Relaxed) {
let res = election.campaign().await;
if let Err(e) = res {
warn!("MetaSrv election error: {}", e);
warn!("Metasrv election error: {}", e);
}
info!("MetaSrv re-initiate election");
info!("Metasrv re-initiate election");
}
info!("MetaSrv stopped");
info!("Metasrv stopped");
});
} else {
if let Err(e) = self.wal_options_allocator.start().await {
@@ -368,7 +368,7 @@ impl MetaSrv {
.context(StartProcedureManagerSnafu)?;
}
info!("MetaSrv started");
info!("Metasrv started");
Ok(())
}
@@ -403,7 +403,7 @@ impl MetaSrv {
}
#[inline]
pub fn options(&self) -> &MetaSrvOptions {
pub fn options(&self) -> &MetasrvOptions {
&self.options
}

View File

@@ -57,7 +57,7 @@ use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers};
use crate::lock::memory::MemLock;
use crate::lock::DistLockRef;
use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
@@ -70,8 +70,8 @@ use crate::state::State;
use crate::table_meta_alloc::MetasrvPeerAllocator;
// TODO(fys): try use derive_builder macro
pub struct MetaSrvBuilder {
options: Option<MetaSrvOptions>,
pub struct MetasrvBuilder {
options: Option<MetasrvOptions>,
kv_backend: Option<KvBackendRef>,
in_memory: Option<ResettableKvBackendRef>,
selector: Option<SelectorRef>,
@@ -84,7 +84,7 @@ pub struct MetaSrvBuilder {
table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
impl MetaSrvBuilder {
impl MetasrvBuilder {
pub fn new() -> Self {
Self {
kv_backend: None,
@@ -101,7 +101,7 @@ impl MetaSrvBuilder {
}
}
pub fn options(mut self, options: MetaSrvOptions) -> Self {
pub fn options(mut self, options: MetasrvOptions) -> Self {
self.options = Some(options);
self
}
@@ -159,10 +159,10 @@ impl MetaSrvBuilder {
self
}
pub async fn build(self) -> Result<MetaSrv> {
pub async fn build(self) -> Result<Metasrv> {
let started = Arc::new(AtomicBool::new(false));
let MetaSrvBuilder {
let MetasrvBuilder {
election,
meta_peer_client,
options,
@@ -320,7 +320,7 @@ impl MetaSrvBuilder {
let enable_telemetry = options.enable_telemetry;
let metasrv_home = options.data_home.to_string();
Ok(MetaSrv {
Ok(Metasrv {
state,
started,
options,
@@ -373,7 +373,7 @@ fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
}
fn build_procedure_manager(
options: &MetaSrvOptions,
options: &MetasrvOptions,
kv_backend: &KvBackendRef,
) -> ProcedureManagerRef {
let manager_config = ManagerConfig {
@@ -391,7 +391,7 @@ fn build_procedure_manager(
}
fn build_ddl_manager(
options: &MetaSrvOptions,
options: &MetasrvOptions,
datanode_clients: Option<DatanodeManagerRef>,
procedure_manager: &ProcedureManagerRef,
mailbox: &MailboxRef,
@@ -431,7 +431,7 @@ fn build_ddl_manager(
))
}
impl Default for MetaSrvBuilder {
impl Default for MetasrvBuilder {
fn default() -> Self {
Self::new()
}

View File

@@ -26,14 +26,14 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use tower::service_fn;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
#[derive(Clone)]
pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
pub meta_srv: MetaSrv,
pub metasrv: Metasrv,
}
pub async fn mock_with_memstore() -> MockInfo {
@@ -52,7 +52,7 @@ pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo
}
pub async fn mock(
opts: MetaSrvOptions,
opts: MetasrvOptions,
kv_backend: KvBackendRef,
selector: Option<SelectorRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
@@ -62,7 +62,7 @@ pub async fn mock(
table_metadata_manager.init().await.unwrap();
let builder = MetaSrvBuilder::new().options(opts).kv_backend(kv_backend);
let builder = MetasrvBuilder::new().options(opts).kv_backend(kv_backend);
let builder = match selector {
Some(s) => builder.selector(s),
@@ -74,11 +74,11 @@ pub async fn mock(
None => builder,
};
let meta_srv = builder.build().await.unwrap();
meta_srv.try_start().await.unwrap();
let metasrv = builder.build().await.unwrap();
metasrv.try_start().await.unwrap();
let (client, server) = tokio::io::duplex(1024);
let service = meta_srv.clone();
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::new(service.clone()))
@@ -119,6 +119,6 @@ pub async fn mock(
MockInfo {
server_addr,
channel_manager,
meta_srv,
metasrv,
}
}

View File

@@ -31,20 +31,20 @@ use tonic::body::BoxBody;
use tonic::codegen::{empty_body, http, BoxFuture, Service};
use tonic::transport::NamedService;
use crate::metasrv::MetaSrv;
use crate::metasrv::Metasrv;
pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
pub fn make_admin_service(metasrv: Metasrv) -> Admin {
let router = Router::new().route("/health", health::HealthHandler);
let router = router.route(
"/node-lease",
node_lease::NodeLeaseHandler {
meta_peer_client: meta_srv.meta_peer_client().clone(),
meta_peer_client: metasrv.meta_peer_client().clone(),
},
);
let handler = heartbeat::HeartBeatHandler {
meta_peer_client: meta_srv.meta_peer_client().clone(),
meta_peer_client: metasrv.meta_peer_client().clone(),
};
let router = router
.route("/heartbeat", handler.clone())
@@ -53,26 +53,26 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
let router = router.route(
"/catalogs",
meta::CatalogsHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
table_metadata_manager: metasrv.table_metadata_manager().clone(),
},
);
let handler = meta::SchemasHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
table_metadata_manager: metasrv.table_metadata_manager().clone(),
};
let router = router
.route("/schemas", handler.clone())
.route("/schemas/help", handler);
let handler = meta::TablesHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
table_metadata_manager: metasrv.table_metadata_manager().clone(),
};
let router = router
.route("/tables", handler.clone())
.route("/tables/help", handler);
let handler = meta::TableHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
table_metadata_manager: metasrv.table_metadata_manager().clone(),
};
let router = router
.route("/table", handler.clone())
@@ -81,27 +81,27 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
let router = router.route(
"/leader",
leader::LeaderHandler {
election: meta_srv.election().cloned(),
election: metasrv.election().cloned(),
},
);
let handler = route::RouteHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
table_metadata_manager: metasrv.table_metadata_manager().clone(),
};
let router = router
.route("/route", handler.clone())
.route("/route/help", handler);
let handler = region_migration::SubmitRegionMigrationTaskHandler {
region_migration_manager: meta_srv.region_migration_manager().clone(),
meta_peer_client: meta_srv.meta_peer_client().clone(),
region_migration_manager: metasrv.region_migration_manager().clone(),
meta_peer_client: metasrv.meta_peer_client().clone(),
};
let router = router.route("/region-migration", handler);
let router = router.route(
"/maintenance",
maintenance::MaintenanceHandler {
kv_backend: meta_srv.kv_backend().clone(),
kv_backend: metasrv.kv_backend().clone(),
},
);
let router = Router::nest("/admin", router);

View File

@@ -21,11 +21,11 @@ use snafu::ResultExt;
use tonic::{Request, Response};
use crate::error;
use crate::metasrv::MetaSrv;
use crate::metasrv::Metasrv;
use crate::service::GrpcResult;
#[async_trait::async_trait]
impl cluster_server::Cluster for MetaSrv {
impl cluster_server::Cluster for Metasrv {
async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
if !self.is_leader() {
let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader());
@@ -73,7 +73,7 @@ impl cluster_server::Cluster for MetaSrv {
}
}
impl MetaSrv {
impl Metasrv {
pub fn is_leader(&self) -> bool {
self.election().map(|x| x.is_leader()).unwrap_or(false)
}

View File

@@ -30,11 +30,11 @@ use tonic::{Request, Response, Streaming};
use crate::error;
use crate::error::Result;
use crate::handler::Pusher;
use crate::metasrv::{Context, MetaSrv};
use crate::metasrv::{Context, Metasrv};
use crate::service::{GrpcResult, GrpcStream};
#[async_trait::async_trait]
impl heartbeat_server::Heartbeat for MetaSrv {
impl heartbeat_server::Heartbeat for Metasrv {
type HeartbeatStream = GrpcStream<HeartbeatResponse>;
async fn heartbeat(
@@ -179,13 +179,13 @@ mod tests {
use tonic::IntoRequest;
use super::get_node_id;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::builder::MetasrvBuilder;
#[tokio::test]
async fn test_ask_leader() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let meta_srv = MetaSrvBuilder::new()
let metasrv = MetasrvBuilder::new()
.kv_backend(kv_backend)
.build()
.await
@@ -195,10 +195,10 @@ mod tests {
header: Some(RequestHeader::new((1, 1), Role::Datanode, W3cTrace::new())),
};
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();
let res = metasrv.ask_leader(req.into_request()).await.unwrap();
let res = res.into_inner();
assert_eq!(1, res.header.unwrap().cluster_id);
assert_eq!(meta_srv.options().bind_addr, res.leader.unwrap().addr);
assert_eq!(metasrv.options().bind_addr, res.leader.unwrap().addr);
}
#[test]

View File

@@ -17,10 +17,10 @@ use tonic::{Request, Response};
use super::GrpcResult;
use crate::lock::Opts;
use crate::metasrv::MetaSrv;
use crate::metasrv::Metasrv;
#[async_trait::async_trait]
impl lock_server::Lock for MetaSrv {
impl lock_server::Lock for Metasrv {
async fn lock(&self, request: Request<LockRequest>) -> GrpcResult<LockResponse> {
let LockRequest {
name, expire_secs, ..

View File

@@ -27,11 +27,11 @@ use tonic::{Request, Response};
use super::GrpcResult;
use crate::error;
use crate::metasrv::MetaSrv;
use crate::metasrv::Metasrv;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTask;
#[async_trait::async_trait]
impl procedure_service_server::ProcedureService for MetaSrv {
impl procedure_service_server::ProcedureService for Metasrv {
async fn query(
&self,
request: Request<QueryProcedureRequest>,

View File

@@ -32,12 +32,12 @@ use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
use crate::error::{self, MissingRequestHeaderSnafu};
use crate::metasrv::MetaSrv;
use crate::metasrv::Metasrv;
use crate::metrics::METRIC_META_KV_REQUEST_ELAPSED;
use crate::service::GrpcResult;
#[async_trait::async_trait]
impl store_server::Store for MetaSrv {
impl store_server::Store for Metasrv {
async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
let req = req.into_inner();
@@ -260,11 +260,11 @@ mod tests {
use common_telemetry::tracing_context::W3cTrace;
use tonic::IntoRequest;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::MetaSrv;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::Metasrv;
async fn new_meta_srv() -> MetaSrv {
MetaSrvBuilder::new()
async fn new_metasrv() -> Metasrv {
MetasrvBuilder::new()
.kv_backend(Arc::new(MemoryKvBackend::new()))
.build()
.await
@@ -273,77 +273,77 @@ mod tests {
#[tokio::test]
async fn test_range() {
let meta_srv = new_meta_srv().await;
let metasrv = new_metasrv().await;
let mut req = RangeRequest::default();
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.range(req.into_request()).await;
let res = metasrv.range(req.into_request()).await;
let _ = res.unwrap();
}
#[tokio::test]
async fn test_put() {
let meta_srv = new_meta_srv().await;
let metasrv = new_metasrv().await;
let mut req = PutRequest::default();
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.put(req.into_request()).await;
let res = metasrv.put(req.into_request()).await;
let _ = res.unwrap();
}
#[tokio::test]
async fn test_batch_get() {
let meta_srv = new_meta_srv().await;
let metasrv = new_metasrv().await;
let mut req = BatchGetRequest::default();
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_get(req.into_request()).await;
let res = metasrv.batch_get(req.into_request()).await;
let _ = res.unwrap();
}
#[tokio::test]
async fn test_batch_put() {
let meta_srv = new_meta_srv().await;
let metasrv = new_metasrv().await;
let mut req = BatchPutRequest::default();
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_put(req.into_request()).await;
let res = metasrv.batch_put(req.into_request()).await;
let _ = res.unwrap();
}
#[tokio::test]
async fn test_batch_delete() {
let meta_srv = new_meta_srv().await;
let metasrv = new_metasrv().await;
let mut req = BatchDeleteRequest::default();
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_delete(req.into_request()).await;
let res = metasrv.batch_delete(req.into_request()).await;
let _ = res.unwrap();
}
#[tokio::test]
async fn test_compare_and_put() {
let meta_srv = new_meta_srv().await;
let metasrv = new_metasrv().await;
let mut req = CompareAndPutRequest::default();
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.compare_and_put(req.into_request()).await;
let res = metasrv.compare_and_put(req.into_request()).await;
let _ = res.unwrap();
}
#[tokio::test]
async fn test_delete_range() {
let meta_srv = new_meta_srv().await;
let metasrv = new_metasrv().await;
let mut req = DeleteRangeRequest::default();
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.delete_range(req.into_request()).await;
let res = metasrv.delete_range(req.into_request()).await;
let _ = res.unwrap();
}

View File

@@ -18,4 +18,4 @@ mod meta_srv;
pub use datanode::{setup_datanode_plugins, start_datanode_plugins};
pub use frontend::{setup_frontend_plugins, start_frontend_plugins};
pub use meta_srv::{setup_meta_srv_plugins, start_meta_srv_plugins};
pub use meta_srv::{setup_metasrv_plugins, start_metasrv_plugins};

View File

@@ -14,12 +14,12 @@
use common_base::Plugins;
use meta_srv::error::Result;
use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::metasrv::MetasrvOptions;
pub async fn setup_meta_srv_plugins(_opts: &mut MetaSrvOptions) -> Result<Plugins> {
pub async fn setup_metasrv_plugins(_opts: &mut MetasrvOptions) -> Result<Plugins> {
Ok(Plugins::new())
}
pub async fn start_meta_srv_plugins(_plugins: Plugins) -> Result<()> {
pub async fn start_metasrv_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}

View File

@@ -36,7 +36,7 @@ use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
@@ -45,7 +45,7 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
@@ -68,7 +68,7 @@ pub struct GreptimeDbCluster {
pub datanode_instances: HashMap<DatanodeId, Datanode>,
pub kv_backend: KvBackendRef,
pub meta_srv: MetaSrv,
pub metasrv: Metasrv,
pub frontend: Arc<FeInstance>,
}
@@ -79,7 +79,7 @@ pub struct GreptimeDbClusterBuilder {
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
datanode_wal_config: DatanodeWalConfig,
metasrv_wal_config: MetaSrvWalConfig,
metasrv_wal_config: MetasrvWalConfig,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}
@@ -110,7 +110,7 @@ impl GreptimeDbClusterBuilder {
store_providers: None,
datanodes: None,
datanode_wal_config: DatanodeWalConfig::default(),
metasrv_wal_config: MetaSrvWalConfig::default(),
metasrv_wal_config: MetasrvWalConfig::default(),
shared_home_dir: None,
meta_selector: None,
}
@@ -141,7 +141,7 @@ impl GreptimeDbClusterBuilder {
}
#[must_use]
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self {
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetasrvWalConfig) -> Self {
self.metasrv_wal_config = metasrv_wal_config;
self
}
@@ -168,7 +168,7 @@ impl GreptimeDbClusterBuilder {
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let datanode_clients = Arc::new(DatanodeClients::new(channel_config));
let opt = MetaSrvOptions {
let opt = MetasrvOptions {
procedure: ProcedureConfig {
// Due to large network delay during cross data-center.
// We only make max_retry_times and retry_delay large than the default in tests.
@@ -180,7 +180,7 @@ impl GreptimeDbClusterBuilder {
..Default::default()
};
let meta_srv = meta_srv::mocks::mock(
let metasrv = meta_srv::mocks::mock(
opt,
self.kv_backend.clone(),
self.meta_selector.clone(),
@@ -189,17 +189,15 @@ impl GreptimeDbClusterBuilder {
.await;
let datanode_instances = self
.build_datanodes_with_options(&meta_srv, &datanode_options)
.build_datanodes_with_options(&metasrv, &datanode_options)
.await;
build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;
self.wait_datanodes_alive(meta_srv.meta_srv.meta_peer_client(), datanodes)
self.wait_datanodes_alive(metasrv.metasrv.meta_peer_client(), datanodes)
.await;
let frontend = self
.build_frontend(meta_srv.clone(), datanode_clients)
.await;
let frontend = self.build_frontend(metasrv.clone(), datanode_clients).await;
test_util::prepare_another_catalog_and_schema(frontend.as_ref()).await;
@@ -211,7 +209,7 @@ impl GreptimeDbClusterBuilder {
dir_guards,
datanode_instances,
kv_backend: self.kv_backend.clone(),
meta_srv: meta_srv.meta_srv,
metasrv: metasrv.metasrv,
frontend,
}
}
@@ -280,13 +278,13 @@ impl GreptimeDbClusterBuilder {
async fn build_datanodes_with_options(
&self,
meta_srv: &MockInfo,
metasrv: &MockInfo,
options: &[DatanodeOptions],
) -> HashMap<DatanodeId, Datanode> {
let mut instances = HashMap::with_capacity(options.len());
for opts in options {
let datanode = self.create_datanode(opts.clone(), meta_srv.clone()).await;
let datanode = self.create_datanode(opts.clone(), metasrv.clone()).await;
instances.insert(opts.node_id.unwrap(), datanode);
}
@@ -312,14 +310,14 @@ impl GreptimeDbClusterBuilder {
panic!("Some Datanodes are not alive in 10 seconds!")
}
async fn create_datanode(&self, opts: DatanodeOptions, meta_srv: MockInfo) -> Datanode {
async fn create_datanode(&self, opts: DatanodeOptions, metasrv: MockInfo) -> Datanode {
let mut meta_client = MetaClientBuilder::new(1000, opts.node_id.unwrap(), Role::Datanode)
.enable_router()
.enable_store()
.enable_heartbeat()
.channel_manager(meta_srv.channel_manager)
.channel_manager(metasrv.channel_manager)
.build();
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_backend = Arc::new(MetaKvBackend {
client: Arc::new(meta_client.clone()),
@@ -339,18 +337,18 @@ impl GreptimeDbClusterBuilder {
async fn build_frontend(
&self,
meta_srv: MockInfo,
metasrv: MockInfo,
datanode_clients: Arc<DatanodeClients>,
) -> Arc<FeInstance> {
let mut meta_client = MetaClientBuilder::new(1000, 0, Role::Frontend)
.enable_router()
.enable_store()
.enable_heartbeat()
.channel_manager(meta_srv.channel_manager)
.channel_manager(metasrv.channel_manager)
.enable_procedure()
.enable_access_cluster_info()
.build();
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let cached_meta_backend =

View File

@@ -30,7 +30,7 @@ use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
@@ -51,7 +51,7 @@ pub struct GreptimeDbStandalone {
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
datanode_wal_config: DatanodeWalConfig,
metasrv_wal_config: MetaSrvWalConfig,
metasrv_wal_config: MetasrvWalConfig,
store_providers: Option<Vec<StorageType>>,
default_store: Option<StorageType>,
plugin: Option<Plugins>,
@@ -65,7 +65,7 @@ impl GreptimeDbStandaloneBuilder {
plugin: None,
default_store: None,
datanode_wal_config: DatanodeWalConfig::default(),
metasrv_wal_config: MetaSrvWalConfig::default(),
metasrv_wal_config: MetasrvWalConfig::default(),
}
}
@@ -102,7 +102,7 @@ impl GreptimeDbStandaloneBuilder {
}
#[must_use]
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self {
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetasrvWalConfig) -> Self {
self.metasrv_wal_config = metasrv_wal_config;
self
}

View File

@@ -38,7 +38,7 @@ impl MockDistributedInstance {
}
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
self.0.meta_srv.table_metadata_manager()
self.0.metasrv.table_metadata_manager()
}
}

View File

@@ -20,8 +20,8 @@ use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
use common_test_util::find_workspace_path;
use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use frontend::instance::Instance;
use rstest_reuse::{self, template};
@@ -227,7 +227,7 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMoc
broker_endpoints: endpoints.clone(),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
topic_name_prefix: test_name.to_string(),
num_topics: 3,
@@ -257,7 +257,7 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
broker_endpoints: endpoints.clone(),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
topic_name_prefix: test_name.to_string(),
num_topics: 3,

View File

@@ -258,7 +258,7 @@ async fn find_region_distribution(
cluster: &GreptimeDbCluster,
table_id: TableId,
) -> RegionDistribution {
let manager = cluster.meta_srv.table_metadata_manager();
let manager = cluster.metasrv.table_metadata_manager();
let region_distribution = manager
.table_route_manager()
.get_region_distribution(table_id)
@@ -343,27 +343,27 @@ async fn run_region_failover_procedure(
failed_region: RegionIdent,
selector: SelectorRef,
) {
let meta_srv = &cluster.meta_srv;
let procedure_manager = meta_srv.procedure_manager();
let metasrv = &cluster.metasrv;
let procedure_manager = metasrv.procedure_manager();
let procedure = RegionFailoverProcedure::new(
"greptime".into(),
"public".into(),
failed_region.clone(),
RegionFailoverContext {
region_lease_secs: 10,
in_memory: meta_srv.in_memory().clone(),
kv_backend: meta_srv.kv_backend().clone(),
mailbox: meta_srv.mailbox().clone(),
in_memory: metasrv.in_memory().clone(),
kv_backend: metasrv.kv_backend().clone(),
mailbox: metasrv.mailbox().clone(),
selector,
selector_ctx: SelectorContext {
datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS,
server_addr: meta_srv.options().server_addr.clone(),
kv_backend: meta_srv.kv_backend().clone(),
meta_peer_client: meta_srv.meta_peer_client().clone(),
server_addr: metasrv.options().server_addr.clone(),
kv_backend: metasrv.kv_backend().clone(),
meta_peer_client: metasrv.meta_peer_client().clone(),
table_id: None,
},
dist_lock: meta_srv.lock().clone(),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
dist_lock: metasrv.lock().clone(),
table_metadata_manager: metasrv.table_metadata_manager().clone(),
},
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

View File

@@ -23,8 +23,8 @@ use common_recordbatch::RecordBatches;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
use datatypes::vectors::{Helper, UInt64Vector};
@@ -116,7 +116,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
linger: Duration::from_millis(25),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -127,7 +127,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
.build()
.await;
let mut logical_timer = 1685508715000;
let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
@@ -143,7 +143,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await;
// Selecting target of region migration.
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let region_migration_manager = cluster.metasrv.region_migration_manager();
let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
info!(
"Selecting from peer: {from_peer_id}, and regions: {:?}",
@@ -243,7 +243,7 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
linger: Duration::from_millis(25),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -271,7 +271,7 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
let old_distribution = distribution.clone();
// Selecting target of region migration.
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let region_migration_manager = cluster.metasrv.region_migration_manager();
let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
info!(
"Selecting from peer: {from_peer_id}, and regions: {:?}",
@@ -365,7 +365,7 @@ pub async fn test_region_migration_multiple_regions(
linger: Duration::from_millis(25),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -376,7 +376,7 @@ pub async fn test_region_migration_multiple_regions(
.build()
.await;
let mut logical_timer = 1685508715000;
let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
@@ -393,7 +393,7 @@ pub async fn test_region_migration_multiple_regions(
assert_eq!(distribution.len(), 2);
// Selecting target of region migration.
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let region_migration_manager = cluster.metasrv.region_migration_manager();
let (peer_1, peer_1_regions) = distribution.pop_first().unwrap();
let (peer_2, peer_2_regions) = distribution.pop_first().unwrap();
@@ -502,7 +502,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
linger: Duration::from_millis(25),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -513,7 +513,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
.build()
.await;
let mut logical_timer = 1685508715000;
let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
@@ -530,7 +530,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
assert_eq!(distribution.len(), 1);
// Selecting target of region migration.
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let region_migration_manager = cluster.metasrv.region_migration_manager();
let (from_peer_id, mut from_regions) = distribution.pop_first().unwrap();
let to_peer_id = 1;
let mut to_regions = Vec::new();
@@ -634,7 +634,7 @@ pub async fn test_region_migration_incorrect_from_peer(
linger: Duration::from_millis(25),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -645,7 +645,7 @@ pub async fn test_region_migration_incorrect_from_peer(
.build()
.await;
let logical_timer = 1685508715000;
let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
@@ -659,7 +659,7 @@ pub async fn test_region_migration_incorrect_from_peer(
// The region distribution
let distribution = find_region_distribution(&table_metadata_manager, table_id).await;
assert_eq!(distribution.len(), 3);
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let region_migration_manager = cluster.metasrv.region_migration_manager();
let region_id = RegionId::new(table_id, 1);
@@ -709,7 +709,7 @@ pub async fn test_region_migration_incorrect_region_id(
linger: Duration::from_millis(25),
..Default::default()
}))
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -720,7 +720,7 @@ pub async fn test_region_migration_incorrect_region_id(
.build()
.await;
let logical_timer = 1685508715000;
let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
@@ -734,7 +734,7 @@ pub async fn test_region_migration_incorrect_region_id(
// The region distribution
let distribution = find_region_distribution(&table_metadata_manager, table_id).await;
assert_eq!(distribution.len(), 3);
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let region_migration_manager = cluster.metasrv.region_migration_manager();
let region_id = RegionId::new(table_id, 5);