Skip to main content

cmd/datanode/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use cache::build_datanode_cache_registry;
18use catalog::kvbackend::new_read_only_meta_kv_backend;
19use common_base::Plugins;
20use common_meta::cache::LayeredCacheRegistryBuilder;
21use common_telemetry::info;
22use common_version::{short_version, verbose_version};
23use datanode::datanode::DatanodeBuilder;
24use datanode::service::DatanodeServiceBuilder;
25use meta_client::MetaClientType;
26use snafu::{OptionExt, ResultExt};
27use tracing_appender::non_blocking::WorkerGuard;
28
29use crate::datanode::{APP_NAME, DatanodeOptions, Instance};
30use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
31use crate::{create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
32
33/// Builder for Datanode instance.
34pub struct InstanceBuilder {
35    guard: Vec<WorkerGuard>,
36    opts: DatanodeOptions,
37    datanode_builder: DatanodeBuilder,
38}
39
40impl InstanceBuilder {
41    /// Try to create a new [InstanceBuilder], and do some initialization work like allocating
42    /// runtime resources, setting up global logging and plugins, etc.
43    pub async fn try_new_with_init(
44        mut opts: DatanodeOptions,
45        mut plugins: Plugins,
46    ) -> Result<Self> {
47        let guard = Self::init(&mut opts, &mut plugins).await?;
48
49        let mut datanode_builder = Self::datanode_builder(&opts, &plugins).await?;
50
51        plugins::setup_datanode_plugins_post_build(&mut plugins, &opts.plugins, &datanode_builder)
52            .await
53            .context(StartDatanodeSnafu)?;
54        datanode_builder.set_plugins(plugins);
55
56        Ok(Self {
57            guard,
58            opts,
59            datanode_builder,
60        })
61    }
62
63    async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result<Vec<WorkerGuard>> {
64        common_runtime::init_global_runtimes(&opts.runtime);
65        common_runtime::init_datanode_runtimes(&opts.runtime);
66
67        let dn_opts = &mut opts.component;
68        let guard = common_telemetry::init_global_logging(
69            APP_NAME,
70            &dn_opts.logging,
71            &dn_opts.tracing,
72            dn_opts.node_id.map(|x| x.to_string()),
73            None,
74        );
75
76        log_versions(verbose_version(), short_version(), APP_NAME);
77        maybe_activate_heap_profile(&dn_opts.memory);
78        create_resource_limit_metrics(APP_NAME);
79
80        plugins::setup_datanode_plugins_pre_build(plugins, &opts.plugins, dn_opts)
81            .await
82            .context(StartDatanodeSnafu)?;
83
84        dn_opts.grpc.detect_server_addr();
85
86        info!("Initialized Datanode instance with {:#?}", opts);
87        Ok(guard)
88    }
89
90    async fn datanode_builder(
91        opts: &DatanodeOptions,
92        plugins: &Plugins,
93    ) -> Result<DatanodeBuilder> {
94        let dn_opts = &opts.component;
95
96        let member_id = dn_opts
97            .node_id
98            .context(MissingConfigSnafu { msg: "'node_id'" })?;
99        let meta_client_options = dn_opts.meta_client.as_ref().context(MissingConfigSnafu {
100            msg: "meta client options",
101        })?;
102        let client = meta_client::create_meta_client(
103            MetaClientType::Datanode { member_id },
104            meta_client_options,
105            Some(plugins),
106            None,
107        )
108        .await
109        .context(MetaClientInitSnafu)?;
110
111        let backend = new_read_only_meta_kv_backend(client.clone());
112        let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());
113
114        let registry = Arc::new(
115            LayeredCacheRegistryBuilder::default()
116                .add_cache_registry(build_datanode_cache_registry(backend))
117                .build(),
118        );
119        builder
120            .with_cache_registry(registry)
121            .with_meta_client(client.clone());
122        Ok(builder)
123    }
124
125    /// Get the mutable builder for Datanode, in case you want to change some fields before the
126    /// final construction.
127    pub fn mut_datanode_builder(&mut self) -> &mut DatanodeBuilder {
128        &mut self.datanode_builder
129    }
130
131    /// Try to build the Datanode instance.
132    pub async fn build(self) -> Result<Instance> {
133        let mut datanode = self
134            .datanode_builder
135            .build()
136            .await
137            .context(StartDatanodeSnafu)?;
138
139        let services = DatanodeServiceBuilder::new(&self.opts.component)
140            .with_default_grpc_server(&datanode.region_server())
141            .enable_http_service()
142            .build()
143            .context(StartDatanodeSnafu)?;
144        datanode.setup_services(services);
145
146        Ok(Instance::new(datanode, self.guard))
147    }
148}