// frontend/instance/builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::AtomicBool;
17
18use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
19use catalog::CatalogManagerRef;
20use catalog::process_manager::ProcessManagerRef;
21use common_base::Plugins;
22use common_event_recorder::EventRecorderImpl;
23use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
24use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
25use common_meta::key::TableMetadataManager;
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::node_manager::NodeManagerRef;
29use common_meta::procedure_executor::ProcedureExecutorRef;
30use dashmap::DashMap;
31use operator::delete::Deleter;
32use operator::flow::FlowServiceOperator;
33use operator::insert::Inserter;
34use operator::procedure::ProcedureServiceOperator;
35use operator::request::Requester;
36use operator::statement::{
37    ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
38    StatementExecutorRef,
39};
40use operator::table::TableMutationOperator;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use pipeline::pipeline_operator::PipelineOperator;
44use query::QueryEngineFactory;
45use query::region_query::RegionQueryHandlerFactoryRef;
46use snafu::{OptionExt, ResultExt};
47
48use crate::error::{self, ExternalSnafu, Result};
49use crate::events::EventHandlerImpl;
50use crate::frontend::FrontendOptions;
51use crate::heartbeat::frontend_peer_addr;
52use crate::instance::Instance;
53use crate::instance::region_query::FrontendRegionQueryHandler;
54
55/// The frontend [`Instance`] builder.
56pub struct FrontendBuilder {
57    options: FrontendOptions,
58    kv_backend: KvBackendRef,
59    layered_cache_registry: LayeredCacheRegistryRef,
60    local_cache_invalidator: Option<CacheInvalidatorRef>,
61    catalog_manager: CatalogManagerRef,
62    node_manager: NodeManagerRef,
63    plugins: Option<Plugins>,
64    procedure_executor: ProcedureExecutorRef,
65    process_manager: ProcessManagerRef,
66}
67
68impl FrontendBuilder {
69    #[allow(clippy::too_many_arguments)]
70    pub fn new(
71        options: FrontendOptions,
72        kv_backend: KvBackendRef,
73        layered_cache_registry: LayeredCacheRegistryRef,
74        catalog_manager: CatalogManagerRef,
75        node_manager: NodeManagerRef,
76        procedure_executor: ProcedureExecutorRef,
77        process_manager: ProcessManagerRef,
78    ) -> Self {
79        Self {
80            options,
81            kv_backend,
82            layered_cache_registry,
83            local_cache_invalidator: None,
84            catalog_manager,
85            node_manager,
86            plugins: None,
87            procedure_executor,
88            process_manager,
89        }
90    }
91
92    #[cfg(test)]
93    pub(crate) fn new_test(
94        options: &FrontendOptions,
95        meta_client: meta_client::MetaClientRef,
96    ) -> Self {
97        use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
98        use common_meta::cache::LayeredCacheRegistryBuilder;
99        use common_meta::kv_backend::memory::MemoryKvBackend;
100
101        let kv_backend = Arc::new(MemoryKvBackend::new());
102
103        // Builds cache registry
104        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
105        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
106        let layered_cache_registry = Arc::new(
107            with_default_composite_cache_registry(
108                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
109            )
110            .unwrap()
111            .build(),
112        );
113
114        Self::new(
115            options.clone(),
116            kv_backend,
117            layered_cache_registry,
118            catalog::memory::MemoryCatalogManager::with_default_setup(),
119            Arc::new(client::client_manager::NodeClients::default()),
120            meta_client,
121            Arc::new(catalog::process_manager::ProcessManager::new(
122                "".to_string(),
123                None,
124            )),
125        )
126    }
127
128    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
129        Self {
130            local_cache_invalidator: Some(cache_invalidator),
131            ..self
132        }
133    }
134
135    pub fn with_plugin(self, plugins: Plugins) -> Self {
136        Self {
137            plugins: Some(plugins),
138            ..self
139        }
140    }
141
142    pub async fn try_build(self) -> Result<Instance> {
143        let kv_backend = self.kv_backend;
144        let node_manager = self.node_manager;
145        let plugins = self.plugins.unwrap_or_default();
146        let process_manager = self.process_manager;
147        let table_route_cache: TableRouteCacheRef =
148            self.layered_cache_registry
149                .get()
150                .context(error::CacheRequiredSnafu {
151                    name: TABLE_ROUTE_CACHE_NAME,
152                })?;
153        let partition_info_cache: PartitionInfoCacheRef = self
154            .layered_cache_registry
155            .get()
156            .context(error::CacheRequiredSnafu {
157                name: PARTITION_INFO_CACHE_NAME,
158            })?;
159        let partition_manager = Arc::new(PartitionRuleManager::new(
160            kv_backend.clone(),
161            table_route_cache.clone(),
162            partition_info_cache.clone(),
163        ));
164
165        let local_cache_invalidator = self
166            .local_cache_invalidator
167            .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
168
169        let region_query_handler =
170            if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
171                factory.build(partition_manager.clone(), node_manager.clone())
172            } else {
173                FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
174            };
175
176        let table_flownode_cache =
177            self.layered_cache_registry
178                .get()
179                .context(error::CacheRequiredSnafu {
180                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
181                })?;
182
183        let inserter = Arc::new(Inserter::new(
184            self.catalog_manager.clone(),
185            partition_manager.clone(),
186            node_manager.clone(),
187            table_flownode_cache,
188        ));
189        let deleter = Arc::new(Deleter::new(
190            self.catalog_manager.clone(),
191            partition_manager.clone(),
192            node_manager.clone(),
193        ));
194        let requester = Arc::new(Requester::new(
195            self.catalog_manager.clone(),
196            partition_manager.clone(),
197            node_manager.clone(),
198        ));
199        let table_mutation_handler = Arc::new(TableMutationOperator::new(
200            inserter.clone(),
201            deleter.clone(),
202            requester,
203        ));
204
205        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
206            self.procedure_executor.clone(),
207            self.catalog_manager.clone(),
208        ));
209
210        let flow_metadata_manager: Arc<FlowMetadataManager> =
211            Arc::new(FlowMetadataManager::new(kv_backend.clone()));
212        let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
213
214        let query_engine = QueryEngineFactory::new_with_plugins(
215            self.catalog_manager.clone(),
216            Some(partition_manager.clone()),
217            Some(region_query_handler.clone()),
218            Some(table_mutation_handler),
219            Some(procedure_service_handler),
220            Some(Arc::new(flow_service)),
221            true,
222            plugins.clone(),
223            self.options.query.clone(),
224        )
225        .query_engine();
226
227        let frontend_peer_addr = frontend_peer_addr(&self.options);
228        let statement_executor = StatementExecutor::new(
229            self.catalog_manager.clone(),
230            query_engine.clone(),
231            self.procedure_executor,
232            kv_backend.clone(),
233            local_cache_invalidator,
234            inserter.clone(),
235            partition_manager,
236            Some(process_manager.clone()),
237            frontend_peer_addr.clone(),
238        );
239
240        let statement_executor =
241            if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
242                let ctx = ExecutorConfigureContext {
243                    kv_backend: kv_backend.clone(),
244                };
245                configurator
246                    .configure(statement_executor, ctx)
247                    .await
248                    .context(ExternalSnafu)?
249            } else {
250                statement_executor
251            };
252
253        let statement_executor = Arc::new(statement_executor);
254
255        let pipeline_operator = Arc::new(PipelineOperator::new(
256            inserter.clone(),
257            statement_executor.clone(),
258            self.catalog_manager.clone(),
259            query_engine.clone(),
260        ));
261
262        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
263
264        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
265            statement_executor.clone(),
266            self.options.slow_query.ttl,
267            self.options.event_recorder.ttl,
268        ))));
269
270        Ok(Instance {
271            frontend_peer_addr,
272            catalog_manager: self.catalog_manager,
273            pipeline_operator,
274            statement_executor,
275            query_engine,
276            plugins,
277            inserter,
278            deleter,
279            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
280            event_recorder: Some(event_recorder),
281            process_manager,
282            otlp_metrics_table_legacy_cache: DashMap::new(),
283            slow_query_options: self.options.slow_query.clone(),
284            suspend: Arc::new(AtomicBool::new(false)),
285        })
286    }
287}