1use std::sync::Arc;
16use std::sync::atomic::AtomicBool;
17
18use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
19use catalog::CatalogManagerRef;
20use catalog::process_manager::ProcessManagerRef;
21use common_base::Plugins;
22use common_event_recorder::EventRecorderImpl;
23use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
24use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
25use common_meta::key::TableMetadataManager;
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::node_manager::NodeManagerRef;
29use common_meta::procedure_executor::ProcedureExecutorRef;
30use dashmap::DashMap;
31use operator::delete::Deleter;
32use operator::flow::FlowServiceOperator;
33use operator::insert::Inserter;
34use operator::procedure::ProcedureServiceOperator;
35use operator::request::Requester;
36use operator::statement::{
37 ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
38 StatementExecutorRef,
39};
40use operator::table::TableMutationOperator;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use pipeline::pipeline_operator::PipelineOperator;
44use query::QueryEngineFactory;
45use query::region_query::RegionQueryHandlerFactoryRef;
46use snafu::{OptionExt, ResultExt};
47
48use crate::error::{self, ExternalSnafu, Result};
49use crate::events::EventHandlerImpl;
50use crate::frontend::FrontendOptions;
51use crate::heartbeat::frontend_peer_addr;
52use crate::instance::Instance;
53use crate::instance::region_query::FrontendRegionQueryHandler;
54
55pub struct FrontendBuilder {
57 options: FrontendOptions,
58 kv_backend: KvBackendRef,
59 layered_cache_registry: LayeredCacheRegistryRef,
60 local_cache_invalidator: Option<CacheInvalidatorRef>,
61 catalog_manager: CatalogManagerRef,
62 node_manager: NodeManagerRef,
63 plugins: Option<Plugins>,
64 procedure_executor: ProcedureExecutorRef,
65 process_manager: ProcessManagerRef,
66}
67
68impl FrontendBuilder {
69 #[allow(clippy::too_many_arguments)]
70 pub fn new(
71 options: FrontendOptions,
72 kv_backend: KvBackendRef,
73 layered_cache_registry: LayeredCacheRegistryRef,
74 catalog_manager: CatalogManagerRef,
75 node_manager: NodeManagerRef,
76 procedure_executor: ProcedureExecutorRef,
77 process_manager: ProcessManagerRef,
78 ) -> Self {
79 Self {
80 options,
81 kv_backend,
82 layered_cache_registry,
83 local_cache_invalidator: None,
84 catalog_manager,
85 node_manager,
86 plugins: None,
87 procedure_executor,
88 process_manager,
89 }
90 }
91
92 #[cfg(test)]
93 pub(crate) fn new_test(
94 options: &FrontendOptions,
95 meta_client: meta_client::MetaClientRef,
96 ) -> Self {
97 use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
98 use common_meta::cache::LayeredCacheRegistryBuilder;
99 use common_meta::kv_backend::memory::MemoryKvBackend;
100
101 let kv_backend = Arc::new(MemoryKvBackend::new());
102
103 let layered_cache_builder = LayeredCacheRegistryBuilder::default();
105 let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
106 let layered_cache_registry = Arc::new(
107 with_default_composite_cache_registry(
108 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
109 )
110 .unwrap()
111 .build(),
112 );
113
114 Self::new(
115 options.clone(),
116 kv_backend,
117 layered_cache_registry,
118 catalog::memory::MemoryCatalogManager::with_default_setup(),
119 Arc::new(client::client_manager::NodeClients::default()),
120 meta_client,
121 Arc::new(catalog::process_manager::ProcessManager::new(
122 "".to_string(),
123 None,
124 )),
125 )
126 }
127
128 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
129 Self {
130 local_cache_invalidator: Some(cache_invalidator),
131 ..self
132 }
133 }
134
135 pub fn options(&self) -> &FrontendOptions {
136 &self.options
137 }
138
139 pub fn kv_backend(&self) -> &KvBackendRef {
140 &self.kv_backend
141 }
142
143 pub fn layered_cache_registry(&self) -> &LayeredCacheRegistryRef {
144 &self.layered_cache_registry
145 }
146
147 pub fn catalog_manager(&self) -> &CatalogManagerRef {
148 &self.catalog_manager
149 }
150
151 pub fn node_manager(&self) -> &NodeManagerRef {
152 &self.node_manager
153 }
154
155 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
156 &self.procedure_executor
157 }
158
159 pub fn process_manager(&self) -> &ProcessManagerRef {
160 &self.process_manager
161 }
162
163 pub fn with_plugin(self, plugins: Plugins) -> Self {
164 Self {
165 plugins: Some(plugins),
166 ..self
167 }
168 }
169
170 pub async fn try_build(self) -> Result<Instance> {
171 let kv_backend = self.kv_backend;
172 let node_manager = self.node_manager;
173 let plugins = self.plugins.unwrap_or_default();
174 let process_manager = self.process_manager;
175 let table_route_cache: TableRouteCacheRef =
176 self.layered_cache_registry
177 .get()
178 .context(error::CacheRequiredSnafu {
179 name: TABLE_ROUTE_CACHE_NAME,
180 })?;
181 let partition_info_cache: PartitionInfoCacheRef = self
182 .layered_cache_registry
183 .get()
184 .context(error::CacheRequiredSnafu {
185 name: PARTITION_INFO_CACHE_NAME,
186 })?;
187 let partition_manager = Arc::new(PartitionRuleManager::new(
188 kv_backend.clone(),
189 table_route_cache.clone(),
190 partition_info_cache.clone(),
191 ));
192
193 let local_cache_invalidator = self
194 .local_cache_invalidator
195 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
196
197 let region_query_handler =
198 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
199 factory.build(partition_manager.clone(), node_manager.clone())
200 } else {
201 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
202 };
203
204 let table_flownode_cache =
205 self.layered_cache_registry
206 .get()
207 .context(error::CacheRequiredSnafu {
208 name: TABLE_FLOWNODE_SET_CACHE_NAME,
209 })?;
210
211 let inserter = Arc::new(Inserter::new(
212 self.catalog_manager.clone(),
213 partition_manager.clone(),
214 node_manager.clone(),
215 table_flownode_cache,
216 self.options.auto_create_table,
217 ));
218 let deleter = Arc::new(Deleter::new(
219 self.catalog_manager.clone(),
220 partition_manager.clone(),
221 node_manager.clone(),
222 ));
223 let requester = Arc::new(Requester::new(
224 self.catalog_manager.clone(),
225 partition_manager.clone(),
226 node_manager.clone(),
227 ));
228 let table_mutation_handler = Arc::new(TableMutationOperator::new(
229 inserter.clone(),
230 deleter.clone(),
231 requester,
232 ));
233
234 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
235 self.procedure_executor.clone(),
236 self.catalog_manager.clone(),
237 ));
238
239 let flow_metadata_manager: Arc<FlowMetadataManager> =
240 Arc::new(FlowMetadataManager::new(kv_backend.clone()));
241 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
242
243 let query_engine = QueryEngineFactory::new_with_plugins(
244 self.catalog_manager.clone(),
245 Some(partition_manager.clone()),
246 Some(region_query_handler.clone()),
247 Some(table_mutation_handler),
248 Some(procedure_service_handler),
249 Some(Arc::new(flow_service)),
250 true,
251 plugins.clone(),
252 self.options.query.clone(),
253 )
254 .query_engine();
255
256 let frontend_peer_addr = frontend_peer_addr(&self.options);
257 let statement_executor = StatementExecutor::new(
258 self.catalog_manager.clone(),
259 query_engine.clone(),
260 self.procedure_executor,
261 kv_backend.clone(),
262 local_cache_invalidator,
263 inserter.clone(),
264 partition_manager,
265 Some(process_manager.clone()),
266 frontend_peer_addr.clone(),
267 );
268
269 let statement_executor =
270 if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
271 let ctx = ExecutorConfigureContext {
272 kv_backend: kv_backend.clone(),
273 };
274 configurator
275 .configure(statement_executor, ctx)
276 .await
277 .context(ExternalSnafu)?
278 } else {
279 statement_executor
280 };
281
282 let statement_executor = Arc::new(statement_executor);
283
284 let pipeline_operator = Arc::new(PipelineOperator::new(
285 inserter.clone(),
286 statement_executor.clone(),
287 self.catalog_manager.clone(),
288 query_engine.clone(),
289 ));
290
291 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
292
293 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
294 statement_executor.clone(),
295 self.options.slow_query.ttl,
296 self.options.event_recorder.ttl,
297 ))));
298
299 Ok(Instance {
300 frontend_peer_addr,
301 catalog_manager: self.catalog_manager,
302 pipeline_operator,
303 statement_executor,
304 query_engine,
305 plugins,
306 inserter,
307 deleter,
308 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
309 event_recorder: Some(event_recorder),
310 process_manager,
311 otlp_metrics_table_legacy_cache: DashMap::new(),
312 slow_query_options: self.options.slow_query.clone(),
313 influxdb_default_merge_mode: self.options.influxdb.default_merge_mode,
314 suspend: Arc::new(AtomicBool::new(false)),
315 })
316 }
317}