1use std::sync::Arc;
16use std::sync::atomic::AtomicBool;
17
18use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
19use catalog::CatalogManagerRef;
20use catalog::process_manager::ProcessManagerRef;
21use common_base::Plugins;
22use common_event_recorder::EventRecorderImpl;
23use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
24use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
25use common_meta::key::TableMetadataManager;
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::node_manager::NodeManagerRef;
29use common_meta::procedure_executor::ProcedureExecutorRef;
30use dashmap::DashMap;
31use operator::delete::Deleter;
32use operator::flow::FlowServiceOperator;
33use operator::insert::Inserter;
34use operator::procedure::ProcedureServiceOperator;
35use operator::request::Requester;
36use operator::statement::{
37 ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
38 StatementExecutorRef,
39};
40use operator::table::TableMutationOperator;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use pipeline::pipeline_operator::PipelineOperator;
44use query::QueryEngineFactory;
45use query::region_query::RegionQueryHandlerFactoryRef;
46use snafu::{OptionExt, ResultExt};
47
48use crate::error::{self, ExternalSnafu, Result};
49use crate::events::EventHandlerImpl;
50use crate::frontend::FrontendOptions;
51use crate::heartbeat::frontend_peer_addr;
52use crate::instance::Instance;
53use crate::instance::region_query::FrontendRegionQueryHandler;
54
55pub struct FrontendBuilder {
57 options: FrontendOptions,
58 kv_backend: KvBackendRef,
59 layered_cache_registry: LayeredCacheRegistryRef,
60 local_cache_invalidator: Option<CacheInvalidatorRef>,
61 catalog_manager: CatalogManagerRef,
62 node_manager: NodeManagerRef,
63 plugins: Option<Plugins>,
64 procedure_executor: ProcedureExecutorRef,
65 process_manager: ProcessManagerRef,
66}
67
68impl FrontendBuilder {
69 #[allow(clippy::too_many_arguments)]
70 pub fn new(
71 options: FrontendOptions,
72 kv_backend: KvBackendRef,
73 layered_cache_registry: LayeredCacheRegistryRef,
74 catalog_manager: CatalogManagerRef,
75 node_manager: NodeManagerRef,
76 procedure_executor: ProcedureExecutorRef,
77 process_manager: ProcessManagerRef,
78 ) -> Self {
79 Self {
80 options,
81 kv_backend,
82 layered_cache_registry,
83 local_cache_invalidator: None,
84 catalog_manager,
85 node_manager,
86 plugins: None,
87 procedure_executor,
88 process_manager,
89 }
90 }
91
92 #[cfg(test)]
93 pub(crate) fn new_test(
94 options: &FrontendOptions,
95 meta_client: meta_client::MetaClientRef,
96 ) -> Self {
97 use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
98 use common_meta::cache::LayeredCacheRegistryBuilder;
99 use common_meta::kv_backend::memory::MemoryKvBackend;
100
101 let kv_backend = Arc::new(MemoryKvBackend::new());
102
103 let layered_cache_builder = LayeredCacheRegistryBuilder::default();
105 let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
106 let layered_cache_registry = Arc::new(
107 with_default_composite_cache_registry(
108 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
109 )
110 .unwrap()
111 .build(),
112 );
113
114 Self::new(
115 options.clone(),
116 kv_backend,
117 layered_cache_registry,
118 catalog::memory::MemoryCatalogManager::with_default_setup(),
119 Arc::new(client::client_manager::NodeClients::default()),
120 meta_client,
121 Arc::new(catalog::process_manager::ProcessManager::new(
122 "".to_string(),
123 None,
124 )),
125 )
126 }
127
128 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
129 Self {
130 local_cache_invalidator: Some(cache_invalidator),
131 ..self
132 }
133 }
134
135 pub fn with_plugin(self, plugins: Plugins) -> Self {
136 Self {
137 plugins: Some(plugins),
138 ..self
139 }
140 }
141
142 pub async fn try_build(self) -> Result<Instance> {
143 let kv_backend = self.kv_backend;
144 let node_manager = self.node_manager;
145 let plugins = self.plugins.unwrap_or_default();
146 let process_manager = self.process_manager;
147 let table_route_cache: TableRouteCacheRef =
148 self.layered_cache_registry
149 .get()
150 .context(error::CacheRequiredSnafu {
151 name: TABLE_ROUTE_CACHE_NAME,
152 })?;
153 let partition_info_cache: PartitionInfoCacheRef = self
154 .layered_cache_registry
155 .get()
156 .context(error::CacheRequiredSnafu {
157 name: PARTITION_INFO_CACHE_NAME,
158 })?;
159 let partition_manager = Arc::new(PartitionRuleManager::new(
160 kv_backend.clone(),
161 table_route_cache.clone(),
162 partition_info_cache.clone(),
163 ));
164
165 let local_cache_invalidator = self
166 .local_cache_invalidator
167 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
168
169 let region_query_handler =
170 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
171 factory.build(partition_manager.clone(), node_manager.clone())
172 } else {
173 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
174 };
175
176 let table_flownode_cache =
177 self.layered_cache_registry
178 .get()
179 .context(error::CacheRequiredSnafu {
180 name: TABLE_FLOWNODE_SET_CACHE_NAME,
181 })?;
182
183 let inserter = Arc::new(Inserter::new(
184 self.catalog_manager.clone(),
185 partition_manager.clone(),
186 node_manager.clone(),
187 table_flownode_cache,
188 ));
189 let deleter = Arc::new(Deleter::new(
190 self.catalog_manager.clone(),
191 partition_manager.clone(),
192 node_manager.clone(),
193 ));
194 let requester = Arc::new(Requester::new(
195 self.catalog_manager.clone(),
196 partition_manager.clone(),
197 node_manager.clone(),
198 ));
199 let table_mutation_handler = Arc::new(TableMutationOperator::new(
200 inserter.clone(),
201 deleter.clone(),
202 requester,
203 ));
204
205 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
206 self.procedure_executor.clone(),
207 self.catalog_manager.clone(),
208 ));
209
210 let flow_metadata_manager: Arc<FlowMetadataManager> =
211 Arc::new(FlowMetadataManager::new(kv_backend.clone()));
212 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
213
214 let query_engine = QueryEngineFactory::new_with_plugins(
215 self.catalog_manager.clone(),
216 Some(partition_manager.clone()),
217 Some(region_query_handler.clone()),
218 Some(table_mutation_handler),
219 Some(procedure_service_handler),
220 Some(Arc::new(flow_service)),
221 true,
222 plugins.clone(),
223 self.options.query.clone(),
224 )
225 .query_engine();
226
227 let frontend_peer_addr = frontend_peer_addr(&self.options);
228 let statement_executor = StatementExecutor::new(
229 self.catalog_manager.clone(),
230 query_engine.clone(),
231 self.procedure_executor,
232 kv_backend.clone(),
233 local_cache_invalidator,
234 inserter.clone(),
235 partition_manager,
236 Some(process_manager.clone()),
237 frontend_peer_addr.clone(),
238 );
239
240 let statement_executor =
241 if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
242 let ctx = ExecutorConfigureContext {
243 kv_backend: kv_backend.clone(),
244 };
245 configurator
246 .configure(statement_executor, ctx)
247 .await
248 .context(ExternalSnafu)?
249 } else {
250 statement_executor
251 };
252
253 let statement_executor = Arc::new(statement_executor);
254
255 let pipeline_operator = Arc::new(PipelineOperator::new(
256 inserter.clone(),
257 statement_executor.clone(),
258 self.catalog_manager.clone(),
259 query_engine.clone(),
260 ));
261
262 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
263
264 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
265 statement_executor.clone(),
266 self.options.slow_query.ttl,
267 self.options.event_recorder.ttl,
268 ))));
269
270 Ok(Instance {
271 frontend_peer_addr,
272 catalog_manager: self.catalog_manager,
273 pipeline_operator,
274 statement_executor,
275 query_engine,
276 plugins,
277 inserter,
278 deleter,
279 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
280 event_recorder: Some(event_recorder),
281 process_manager,
282 otlp_metrics_table_legacy_cache: DashMap::new(),
283 slow_query_options: self.options.slow_query.clone(),
284 suspend: Arc::new(AtomicBool::new(false)),
285 })
286 }
287}