mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 19:00:39 +00:00
refactor: datanode instance builder (#6034)
remove another piece of REPL codes
This commit is contained in:
@@ -15,9 +15,11 @@
|
||||
#![doc = include_str!("../../../../README.md")]
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use cmd::datanode::builder::InstanceBuilder;
|
||||
use cmd::error::{InitTlsProviderSnafu, Result};
|
||||
use cmd::options::GlobalOptions;
|
||||
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
|
||||
use common_base::Plugins;
|
||||
use common_version::version;
|
||||
use servers::install_ring_crypto_provider;
|
||||
|
||||
@@ -102,10 +104,10 @@ async fn main_body() -> Result<()> {
|
||||
async fn start(cli: Command) -> Result<()> {
|
||||
match cli.subcmd {
|
||||
SubCommand::Datanode(cmd) => {
|
||||
cmd.build(cmd.load_options(&cli.global_options)?)
|
||||
.await?
|
||||
.run()
|
||||
.await
|
||||
let opts = cmd.load_options(&cli.global_options)?;
|
||||
let plugins = Plugins::new();
|
||||
let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
|
||||
cmd.build_with(builder).await?.run().await
|
||||
}
|
||||
SubCommand::Flownode(cmd) => {
|
||||
cmd.build(cmd.load_options(&cli.global_options)?)
|
||||
|
||||
@@ -12,33 +12,27 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub mod builder;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use cache::build_datanode_cache_registry;
|
||||
use catalog::kvbackend::MetaKvBackend;
|
||||
use clap::Parser;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_meta::cache::LayeredCacheRegistryBuilder;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::{info, warn};
|
||||
use common_version::{short_version, version};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::datanode::{Datanode, DatanodeBuilder};
|
||||
use datanode::service::DatanodeServiceBuilder;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::Mode;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use datanode::datanode::Datanode;
|
||||
use meta_client::MetaClientOptions;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::datanode::builder::InstanceBuilder;
|
||||
use crate::error::{
|
||||
LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu,
|
||||
StartDatanodeSnafu,
|
||||
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
|
||||
};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::App;
|
||||
|
||||
pub const APP_NAME: &str = "greptime-datanode";
|
||||
|
||||
@@ -98,8 +92,8 @@ pub struct Command {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
|
||||
self.subcmd.build(opts).await
|
||||
pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
|
||||
self.subcmd.build_with(builder).await
|
||||
}
|
||||
|
||||
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
|
||||
@@ -115,9 +109,12 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
|
||||
async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
|
||||
match self {
|
||||
SubCommand::Start(cmd) => cmd.build(opts).await,
|
||||
SubCommand::Start(cmd) => {
|
||||
info!("Building datanode with {:#?}", cmd);
|
||||
builder.build().await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -263,74 +260,6 @@ impl StartCommand {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
|
||||
common_runtime::init_global_runtimes(&opts.runtime);
|
||||
|
||||
let guard = common_telemetry::init_global_logging(
|
||||
APP_NAME,
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
info!("Datanode start command: {:#?}", self);
|
||||
info!("Datanode options: {:#?}", opts);
|
||||
|
||||
let plugin_opts = opts.plugins;
|
||||
let mut opts = opts.component;
|
||||
opts.grpc.detect_server_addr();
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
let member_id = opts
|
||||
.node_id
|
||||
.context(MissingConfigSnafu { msg: "'node_id'" })?;
|
||||
|
||||
let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
|
||||
msg: "'meta_client_options'",
|
||||
})?;
|
||||
|
||||
let meta_client = meta_client::create_meta_client(
|
||||
MetaClientType::Datanode { member_id },
|
||||
meta_config,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
|
||||
let meta_backend = Arc::new(MetaKvBackend {
|
||||
client: meta_client.clone(),
|
||||
});
|
||||
|
||||
// Builds cache registry for datanode.
|
||||
let layered_cache_registry = Arc::new(
|
||||
LayeredCacheRegistryBuilder::default()
|
||||
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
|
||||
.build(),
|
||||
);
|
||||
|
||||
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed)
|
||||
.with_meta_client(meta_client)
|
||||
.with_kv_backend(meta_backend)
|
||||
.with_cache_registry(layered_cache_registry)
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
let services = DatanodeServiceBuilder::new(&opts)
|
||||
.with_default_grpc_server(&datanode.region_server())
|
||||
.enable_http_service()
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
datanode.setup_services(services);
|
||||
|
||||
Ok(Instance::new(datanode, guard))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
138
src/cmd/src/datanode/builder.rs
Normal file
138
src/cmd/src/datanode/builder.rs
Normal file
@@ -0,0 +1,138 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use cache::build_datanode_cache_registry;
|
||||
use catalog::kvbackend::MetaKvBackend;
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache::LayeredCacheRegistryBuilder;
|
||||
use common_telemetry::info;
|
||||
use common_version::{short_version, version};
|
||||
use datanode::datanode::DatanodeBuilder;
|
||||
use datanode::service::DatanodeServiceBuilder;
|
||||
use meta_client::MetaClientType;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
|
||||
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
|
||||
use crate::log_versions;
|
||||
|
||||
/// Builder for Datanode instance.
|
||||
pub struct InstanceBuilder {
|
||||
guard: Vec<WorkerGuard>,
|
||||
opts: DatanodeOptions,
|
||||
datanode_builder: DatanodeBuilder,
|
||||
}
|
||||
|
||||
impl InstanceBuilder {
|
||||
/// Try to create a new [InstanceBuilder], and do some initialization work like allocating
|
||||
/// runtime resources, setting up global logging and plugins, etc.
|
||||
pub async fn try_new_with_init(
|
||||
mut opts: DatanodeOptions,
|
||||
mut plugins: Plugins,
|
||||
) -> Result<Self> {
|
||||
let guard = Self::init(&mut opts, &mut plugins).await?;
|
||||
|
||||
let datanode_builder = Self::datanode_builder(&opts, plugins).await?;
|
||||
|
||||
Ok(Self {
|
||||
guard,
|
||||
opts,
|
||||
datanode_builder,
|
||||
})
|
||||
}
|
||||
|
||||
async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result<Vec<WorkerGuard>> {
|
||||
common_runtime::init_global_runtimes(&opts.runtime);
|
||||
|
||||
let dn_opts = &mut opts.component;
|
||||
let guard = common_telemetry::init_global_logging(
|
||||
APP_NAME,
|
||||
&dn_opts.logging,
|
||||
&dn_opts.tracing,
|
||||
dn_opts.node_id.map(|x| x.to_string()),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
dn_opts.grpc.detect_server_addr();
|
||||
|
||||
info!("Initialized Datanode instance with {:#?}", opts);
|
||||
Ok(guard)
|
||||
}
|
||||
|
||||
async fn datanode_builder(opts: &DatanodeOptions, plugins: Plugins) -> Result<DatanodeBuilder> {
|
||||
let dn_opts = &opts.component;
|
||||
|
||||
let member_id = dn_opts
|
||||
.node_id
|
||||
.context(MissingConfigSnafu { msg: "'node_id'" })?;
|
||||
let meta_client_options = dn_opts.meta_client.as_ref().context(MissingConfigSnafu {
|
||||
msg: "meta client options",
|
||||
})?;
|
||||
let client = meta_client::create_meta_client(
|
||||
MetaClientType::Datanode { member_id },
|
||||
meta_client_options,
|
||||
Some(&plugins),
|
||||
)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
|
||||
let backend = Arc::new(MetaKvBackend {
|
||||
client: client.clone(),
|
||||
});
|
||||
let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());
|
||||
|
||||
let registry = Arc::new(
|
||||
LayeredCacheRegistryBuilder::default()
|
||||
.add_cache_registry(build_datanode_cache_registry(backend))
|
||||
.build(),
|
||||
);
|
||||
builder
|
||||
.with_cache_registry(registry)
|
||||
.with_meta_client(client.clone());
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Get the mutable builder for Datanode, in case you want to change some fields before the
|
||||
/// final construction.
|
||||
pub fn mut_datanode_builder(&mut self) -> &mut DatanodeBuilder {
|
||||
&mut self.datanode_builder
|
||||
}
|
||||
|
||||
/// Try to build the Datanode instance.
|
||||
pub async fn build(self) -> Result<Instance> {
|
||||
let mut datanode = self
|
||||
.datanode_builder
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
let services = DatanodeServiceBuilder::new(&self.opts.component)
|
||||
.with_default_grpc_server(&datanode.region_server())
|
||||
.enable_http_service()
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
datanode.setup_services(services);
|
||||
|
||||
Ok(Instance::new(datanode, self.guard))
|
||||
}
|
||||
}
|
||||
@@ -177,9 +177,6 @@ pub enum Error {
|
||||
source: meta_srv::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid REPL command: {reason}"))]
|
||||
InvalidReplCommand { reason: String },
|
||||
|
||||
#[snafu(display("Failed to parse SQL: {}", sql))]
|
||||
ParseSql {
|
||||
sql: String,
|
||||
@@ -331,7 +328,6 @@ impl ErrorExt for Error {
|
||||
Error::MissingConfig { .. }
|
||||
| Error::LoadLayeredConfig { .. }
|
||||
| Error::IllegalConfig { .. }
|
||||
| Error::InvalidReplCommand { .. }
|
||||
| Error::InitTimezone { .. }
|
||||
| Error::ConnectEtcd { .. }
|
||||
| Error::CreateDir { .. }
|
||||
|
||||
@@ -75,7 +75,6 @@ use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use servers::Mode;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
@@ -497,12 +496,9 @@ impl StartCommand {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
|
||||
.with_kv_backend(kv_backend.clone())
|
||||
.with_cache_registry(layered_cache_registry.clone())
|
||||
.build()
|
||||
.await
|
||||
.context(error::StartDatanodeSnafu)?;
|
||||
let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
|
||||
builder.with_cache_registry(layered_cache_registry.clone());
|
||||
let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
|
||||
|
||||
let information_extension = Arc::new(StandaloneInformationExtension::new(
|
||||
datanode.region_server(),
|
||||
|
||||
Reference in New Issue
Block a user