1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::consts::FILE_ENGINE;
22use common_error::ext::BoxedError;
23use common_recordbatch::SendableRecordBatchStream;
24use common_telemetry::{error, info};
25use object_store::ObjectStore;
26use snafu::{OptionExt, ensure};
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_engine::{
29 RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
30 RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
31 SettableRegionRoleState, SinglePartitionScanner, SyncRegionFromRequest, SyncRegionFromResponse,
32};
33use store_api::region_request::{
34 AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
35 RegionRequest,
36};
37use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
38use tokio::sync::Mutex;
39
40use crate::config::EngineConfig;
41use crate::error::{
42 RegionNotFoundSnafu, Result as EngineResult, UnexpectedEngineSnafu, UnsupportedSnafu,
43};
44use crate::region::{FileRegion, FileRegionRef};
45
46pub struct FileRegionEngine {
47 inner: EngineInnerRef,
48}
49
50impl FileRegionEngine {
51 pub fn new(_config: EngineConfig, object_store: ObjectStore) -> Self {
52 Self {
53 inner: Arc::new(EngineInner::new(object_store)),
54 }
55 }
56
57 async fn handle_query(
58 &self,
59 region_id: RegionId,
60 request: ScanRequest,
61 ) -> Result<SendableRecordBatchStream, BoxedError> {
62 self.inner
63 .get_region(region_id)
64 .await
65 .context(RegionNotFoundSnafu { region_id })
66 .map_err(BoxedError::new)?
67 .query(request)
68 .map_err(BoxedError::new)
69 }
70}
71
72#[async_trait]
73impl RegionEngine for FileRegionEngine {
74 fn name(&self) -> &str {
75 FILE_ENGINE
76 }
77
78 async fn handle_request(
79 &self,
80 region_id: RegionId,
81 request: RegionRequest,
82 ) -> Result<RegionResponse, BoxedError> {
83 self.inner
84 .handle_request(region_id, request)
85 .await
86 .map_err(BoxedError::new)
87 }
88
89 async fn handle_query(
90 &self,
91 region_id: RegionId,
92 request: ScanRequest,
93 ) -> Result<RegionScannerRef, BoxedError> {
94 let stream = self.handle_query(region_id, request).await?;
95 let metadata = self.get_metadata(region_id).await?;
96 let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
98 Ok(scanner)
99 }
100
101 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
102 self.inner
103 .get_region(region_id)
104 .await
105 .map(|r| r.metadata())
106 .context(RegionNotFoundSnafu { region_id })
107 .map_err(BoxedError::new)
108 }
109
110 async fn stop(&self) -> Result<(), BoxedError> {
111 self.inner.stop().await.map_err(BoxedError::new)
112 }
113
114 fn region_statistic(&self, _: RegionId) -> Option<RegionStatistic> {
115 None
116 }
117
118 async fn get_committed_sequence(&self, _: RegionId) -> Result<SequenceNumber, BoxedError> {
119 Ok(Default::default())
120 }
121
122 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
123 self.inner
124 .set_region_role(region_id, role)
125 .map_err(BoxedError::new)
126 }
127
128 async fn set_region_role_state_gracefully(
129 &self,
130 region_id: RegionId,
131 _region_role_state: SettableRegionRoleState,
132 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
133 let exists = self.inner.get_region(region_id).await.is_some();
134
135 if exists {
136 Ok(SetRegionRoleStateResponse::success(
137 SetRegionRoleStateSuccess::file(),
138 ))
139 } else {
140 Ok(SetRegionRoleStateResponse::NotFound)
141 }
142 }
143
144 async fn sync_region(
145 &self,
146 _region_id: RegionId,
147 _request: SyncRegionFromRequest,
148 ) -> Result<SyncRegionFromResponse, BoxedError> {
149 Ok(SyncRegionFromResponse::NotSupported)
151 }
152
153 async fn remap_manifests(
154 &self,
155 _request: RemapManifestsRequest,
156 ) -> Result<RemapManifestsResponse, BoxedError> {
157 Err(BoxedError::new(
158 UnsupportedSnafu {
159 operation: "remap_manifests",
160 }
161 .build(),
162 ))
163 }
164
165 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
166 self.inner.state(region_id)
167 }
168
169 fn as_any(&self) -> &dyn Any {
170 self
171 }
172}
173
174struct EngineInner {
175 regions: RwLock<HashMap<RegionId, FileRegionRef>>,
179
180 region_mutex: Mutex<()>,
183
184 object_store: ObjectStore,
185}
186
187type EngineInnerRef = Arc<EngineInner>;
188
189impl EngineInner {
190 fn new(object_store: ObjectStore) -> Self {
191 Self {
192 regions: RwLock::new(HashMap::new()),
193 region_mutex: Mutex::new(()),
194 object_store,
195 }
196 }
197
198 async fn handle_request(
199 &self,
200 region_id: RegionId,
201 request: RegionRequest,
202 ) -> EngineResult<RegionResponse> {
203 let result = match request {
204 RegionRequest::Create(req) => self.handle_create(region_id, req).await,
205 RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
206 RegionRequest::Open(req) => self.handle_open(region_id, req).await,
207 RegionRequest::Close(req) => self.handle_close(region_id, req).await,
208 _ => UnsupportedSnafu {
209 operation: request.to_string(),
210 }
211 .fail(),
212 };
213 result.map(RegionResponse::new)
214 }
215
216 async fn stop(&self) -> EngineResult<()> {
217 let _lock = self.region_mutex.lock().await;
218 self.regions.write().unwrap().clear();
219 Ok(())
220 }
221
222 fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> {
223 Ok(())
225 }
226
227 fn state(&self, region_id: RegionId) -> Option<RegionRole> {
228 if self.regions.read().unwrap().get(®ion_id).is_some() {
229 Some(RegionRole::Leader)
230 } else {
231 None
232 }
233 }
234}
235
236impl EngineInner {
237 async fn handle_create(
238 &self,
239 region_id: RegionId,
240 request: RegionCreateRequest,
241 ) -> EngineResult<AffectedRows> {
242 ensure!(
243 request.engine == FILE_ENGINE,
244 UnexpectedEngineSnafu {
245 engine: request.engine
246 }
247 );
248
249 if self.exists(region_id).await {
250 return Ok(0);
251 }
252
253 info!("Try to create region, region_id: {}", region_id);
254
255 let _lock = self.region_mutex.lock().await;
256 if self.exists(region_id).await {
258 return Ok(0);
259 }
260
261 let res = FileRegion::create(region_id, request, &self.object_store).await;
262 let region = res.inspect_err(|err| {
263 error!(
264 err;
265 "Failed to create region, region_id: {}",
266 region_id
267 );
268 })?;
269 self.regions.write().unwrap().insert(region_id, region);
270
271 info!("A new region is created, region_id: {}", region_id);
272 Ok(0)
273 }
274
275 async fn handle_open(
276 &self,
277 region_id: RegionId,
278 request: RegionOpenRequest,
279 ) -> EngineResult<AffectedRows> {
280 if self.exists(region_id).await {
281 return Ok(0);
282 }
283
284 info!("Try to open region, region_id: {}", region_id);
285
286 let _lock = self.region_mutex.lock().await;
287 if self.exists(region_id).await {
289 return Ok(0);
290 }
291
292 let res = FileRegion::open(region_id, request, &self.object_store).await;
293 let region = res.inspect_err(|err| {
294 error!(
295 err;
296 "Failed to open region, region_id: {}",
297 region_id
298 );
299 })?;
300 self.regions.write().unwrap().insert(region_id, region);
301
302 info!("Region opened, region_id: {}", region_id);
303 Ok(0)
304 }
305
306 async fn handle_close(
307 &self,
308 region_id: RegionId,
309 _request: RegionCloseRequest,
310 ) -> EngineResult<AffectedRows> {
311 let _lock = self.region_mutex.lock().await;
312
313 let mut regions = self.regions.write().unwrap();
314 if regions.remove(®ion_id).is_some() {
315 info!("Region closed, region_id: {}", region_id);
316 }
317
318 Ok(0)
319 }
320
321 async fn handle_drop(
322 &self,
323 region_id: RegionId,
324 _request: RegionDropRequest,
325 ) -> EngineResult<AffectedRows> {
326 if !self.exists(region_id).await {
327 return RegionNotFoundSnafu { region_id }.fail();
328 }
329
330 info!("Try to drop region, region_id: {}", region_id);
331
332 let _lock = self.region_mutex.lock().await;
333
334 let region = self.get_region(region_id).await;
335 if let Some(region) = region {
336 let res = FileRegion::drop(®ion, &self.object_store).await;
337 res.inspect_err(|err| {
338 error!(
339 err;
340 "Failed to drop region, region_id: {}",
341 region_id
342 );
343 })?;
344 }
345 let _ = self.regions.write().unwrap().remove(®ion_id);
346
347 info!("Region dropped, region_id: {}", region_id);
348 Ok(0)
349 }
350
351 async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
352 self.regions.read().unwrap().get(®ion_id).cloned()
353 }
354
355 async fn exists(&self, region_id: RegionId) -> bool {
356 self.regions.read().unwrap().contains_key(®ion_id)
357 }
358}