use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};

use crate::access_layer::{
    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::cache::manifest_cache::ManifestCache;
use crate::error::{self, Result};
use crate::metrics::UPLOAD_BYTES_TOTAL;
use crate::region::opener::RegionLoadCacheTask;
use crate::sst::file::RegionFileId;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

/// A cache for uploading files to remote object stores.
///
/// It keeps files on a local store and uploads them to the remote store.
pub struct WriteCache {
    /// Local file cache.
    file_cache: FileCacheRef,
    /// Factory that builds puffin managers for writing index files.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager for intermediate files used while building indexes.
    intermediate_manager: IntermediateManager,
    /// Sender of tasks that preload region files into the cache.
    task_sender: UnboundedSender<RegionLoadCacheTask>,
    /// Optional cache for manifest files.
    manifest_cache: Option<ManifestCache>,
}

pub type WriteCacheRef = Arc<WriteCache>;

impl WriteCache {
    /// Creates a write cache backed by the given local store.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        local_store: ObjectStore,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        manifest_cache: Option<ManifestCache>,
    ) -> Result<Self> {
        let (task_sender, task_receiver) = unbounded_channel();

        let file_cache = Arc::new(FileCache::new(
            local_store,
            cache_capacity,
            ttl,
            index_cache_percent,
            enable_background_worker,
        ));
        file_cache.recover(false, Some(task_receiver)).await;

        Ok(Self {
            file_cache,
            puffin_manager_factory,
            intermediate_manager,
            task_sender,
            manifest_cache,
        })
    }

    /// Creates a write cache backed by a local file system directory.
    #[allow(clippy::too_many_arguments)]
    pub async fn new_fs(
        cache_dir: &str,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        manifest_cache_capacity: ReadableSize,
    ) -> Result<Self> {
        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");

        let local_store = new_fs_cache_store(cache_dir).await?;

        // Only creates the manifest cache when a non-zero capacity is configured.
        let manifest_cache = if manifest_cache_capacity.as_bytes() > 0 {
            Some(ManifestCache::new(local_store.clone(), manifest_cache_capacity, ttl).await)
        } else {
            None
        };

        Self::new(
            local_store,
            cache_capacity,
            ttl,
            index_cache_percent,
            enable_background_worker,
            puffin_manager_factory,
            intermediate_manager,
            manifest_cache,
        )
        .await
    }
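
    // A minimal usage sketch for `new_fs`, assuming an already-built
    // `PuffinManagerFactory` and `IntermediateManager` (the two variables
    // below are hypothetical; only the call shape mirrors the signature above):
    //
    // let write_cache = WriteCache::new_fs(
    //     "/tmp/write_cache",     // cache_dir
    //     ReadableSize::mb(512),  // cache_capacity
    //     None,                   // ttl
    //     None,                   // index_cache_percent
    //     true,                   // enable_background_worker
    //     puffin_manager_factory,
    //     intermediate_manager,
    //     ReadableSize::mb(32),   // manifest_cache_capacity
    // )
    // .await?;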

    /// Returns the local file cache.
    pub(crate) fn file_cache(&self) -> FileCacheRef {
        self.file_cache.clone()
    }

    /// Returns the manifest cache, if it is enabled.
    pub(crate) fn manifest_cache(&self) -> Option<ManifestCache> {
        self.manifest_cache.clone()
    }

    /// Builds a puffin manager that reads and writes through the file cache.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
        self.puffin_manager_factory.build(store, path_provider)
    }

    /// Writes the encoded SST `data` to the local cache and then uploads it
    /// to the remote store.
    pub(crate) async fn put_and_upload_sst(
        &self,
        data: &bytes::Bytes,
        region_id: RegionId,
        sst_info: &SstInfo,
        upload_request: SstUploadRequest,
    ) -> Result<Metrics> {
        let file_id = sst_info.file_id;
        let mut metrics = Metrics::new(WriteType::Flush);

        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);

        // Writes the data to the local cache first.
        let cache_start = Instant::now();
        let cache_path = self.file_cache.cache_file_path(parquet_key);
        let store = self.file_cache.local_store();
        let cleaner = TempFileCleaner::new(region_id, store.clone());
        let write_res = store
            .write(&cache_path, data.clone())
            .await
            .context(error::OpenDalSnafu);
        if let Err(e) = write_res {
            // Cleans the temporary file on failure.
            cleaner.clean_by_file_id(file_id).await;
            return Err(e);
        }

        metrics.write_batch = cache_start.elapsed();

        // Then uploads the cached file to the remote store.
        let upload_start = Instant::now();
        let region_file_id = RegionFileId::new(region_id, file_id);
        let remote_path = upload_request
            .dest_path_provider
            .build_sst_file_path(region_file_id);

        if let Err(e) = self
            .upload(parquet_key, &remote_path, &upload_request.remote_store)
            .await
        {
            // Removes the cached file if the upload fails.
            self.remove(parquet_key).await;
            return Err(e);
        }

        metrics.upload_parquet = upload_start.elapsed();
        Ok(metrics)
    }
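
    // A hedged sketch of a caller of `put_and_upload_sst`; `encoded`,
    // `sst_info`, and `request` are hypothetical, already-prepared values:
    //
    // let metrics = write_cache
    //     .put_and_upload_sst(&encoded, region_id, &sst_info, request)
    //     .await?;
    // debug!("cache write took {:?}", metrics.write_batch);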

    /// Returns the intermediate manager used for index building.
    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Writes SST files to the local cache and then uploads them and their
    /// index files to the remote store.
    pub(crate) async fn write_and_upload_sst(
        &self,
        write_request: SstWriteRequest,
        upload_request: SstUploadRequest,
        write_opts: &WriteOptions,
        metrics: &mut Metrics,
    ) -> Result<SstInfoArray> {
        let region_id = write_request.metadata.region_id;

        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
        let indexer = IndexerBuilderImpl {
            build_type: write_request.op_type.into(),
            metadata: write_request.metadata.clone(),
            row_group_size: write_opts.row_group_size,
            puffin_manager: self
                .puffin_manager_factory
                .build(store.clone(), path_provider.clone()),
            write_cache_enabled: true,
            intermediate_manager: self.intermediate_manager.clone(),
            index_options: write_request.index_options,
            inverted_index_config: write_request.inverted_index_config,
            fulltext_index_config: write_request.fulltext_index_config,
            bloom_filter_index_config: write_request.bloom_filter_index_config,
        };

        let cleaner = TempFileCleaner::new(region_id, store.clone());
        // Writes the SST to the local cache; the cleaner removes temporary
        // files if the writer aborts.
        let mut writer = ParquetWriter::new_with_object_store(
            store.clone(),
            write_request.metadata,
            write_request.index_config,
            indexer,
            path_provider.clone(),
            metrics,
        )
        .await
        .with_file_cleaner(cleaner);

        let sst_info = match write_request.source {
            either::Left(source) => {
                writer
                    .write_all(source, write_request.max_sequence, write_opts)
                    .await?
            }
            either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
        };

        // Nothing to upload if no SST was written.
        if sst_info.is_empty() {
            return Ok(sst_info);
        }

        // Uploads each SST (and its index file, if any) to the remote store.
        let mut upload_tracker = UploadTracker::new(region_id);
        let mut err = None;
        let remote_store = &upload_request.remote_store;
        for sst in &sst_info {
            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
            let parquet_path = upload_request
                .dest_path_provider
                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
            let start = Instant::now();
            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
                err = Some(e);
                break;
            }
            metrics.upload_parquet += start.elapsed();
            upload_tracker.push_uploaded_file(parquet_path);

            if sst.index_metadata.file_size > 0 {
                // Uses the index version from the SST metadata so this key
                // matches the one `UploadTracker::clean` removes on failure.
                let puffin_key = IndexKey::new(
                    region_id,
                    sst.file_id,
                    FileType::Puffin(sst.index_metadata.version),
                );
                let puffin_path = upload_request
                    .dest_path_provider
                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
                let start = Instant::now();
                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
                    err = Some(e);
                    break;
                }
                metrics.upload_puffin += start.elapsed();
                upload_tracker.push_uploaded_file(puffin_path);
            }
        }

        if let Some(err) = err {
            // Cleans up cached and uploaded files on failure.
            upload_tracker
                .clean(&sst_info, &self.file_cache, remote_store)
                .await;
            return Err(err);
        }

        Ok(sst_info)
    }
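
    // A hedged sketch of how a flush might drive `write_and_upload_sst`,
    // mirroring the tests at the bottom of this file (`write_request`,
    // `upload_request`, and `write_opts` are assumed to be prepared by the
    // caller):
    //
    // let mut metrics = Metrics::new(WriteType::Flush);
    // let sst_infos = write_cache
    //     .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
    //     .await?;
    // // On success, every SST is in both the local file cache and the
    // // remote store.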

    /// Removes a file from the cache by its index key.
    pub(crate) async fn remove(&self, index_key: IndexKey) {
        self.file_cache.remove(index_key).await
    }

    /// Downloads a file from the remote store to the local cache.
    ///
    /// Delegates to the file cache, which stores the file under `index_key`.
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        self.file_cache
            .download(index_key, remote_path, remote_store, file_size)
            .await
    }

    /// Uploads the file cached under `index_key` to `upload_path` in the
    /// remote store, then registers it in the file cache index.
    pub(crate) async fn upload(
        &self,
        index_key: IndexKey,
        upload_path: &str,
        remote_store: &ObjectStore,
    ) -> Result<()> {
        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let file_type = index_key.file_type;
        let cache_path = self.file_cache.cache_file_path(index_key);

        let start = Instant::now();
        let cached_value = self
            .file_cache
            .local_store()
            .stat(&cache_path)
            .await
            .context(error::OpenDalSnafu)?;
        let reader = self
            .file_cache
            .local_store()
            .reader(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..cached_value.content_length())
            .await
            .context(error::OpenDalSnafu)?;

        // Writes to the remote store with a buffered, concurrent writer.
        let mut writer = remote_store
            .writer_with(upload_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::UploadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;

        // Must close the writer to complete the upload.
        writer.close().await.context(error::UploadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

        debug!(
            "Successfully uploaded file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
            region_id,
            file_id,
            upload_path,
            start.elapsed(),
        );

        // Registers the file in the cache index so readers can find it.
        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;

        Ok(())
    }
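
    // The copy above streams the cached file instead of buffering it in
    // memory. A hedged, standalone sketch of the same OpenDAL pattern
    // (paths, stores, and the tuning numbers are hypothetical):
    //
    // let meta = local_store.stat("cache/file.parquet").await?;
    // let reader = local_store
    //     .reader("cache/file.parquet")
    //     .await?
    //     .into_futures_async_read(0..meta.content_length())
    //     .await?;
    // let mut writer = remote_store
    //     .writer_with("region/file.parquet")
    //     .chunk(8 * 1024 * 1024) // buffer size per uploaded part
    //     .concurrent(8)          // parallel multipart uploads
    //     .await?
    //     .into_futures_async_write();
    // let n = futures::io::copy(reader, &mut writer).await?;
    // writer.close().await?;     // closing completes the upload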

    /// Sends a task that loads a region's files into the write cache.
    ///
    /// The send fails only when the receiver has been dropped; in that case
    /// the task is silently discarded.
    pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
        let _ = self.task_sender.send(task);
    }
}

/// Request to upload an SST file (and its index) to the remote store.
pub struct SstUploadRequest {
    /// Provider of destination file paths in the remote store.
    pub dest_path_provider: RegionFilePathFactory,
    /// The remote object store to upload to.
    pub remote_store: ObjectStore,
}
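
// Building a request pairs a destination path factory with the remote store,
// as in the tests below (a sketch; `remote_store` is assumed to be an
// already-configured ObjectStore):
//
// let upload_request = SstUploadRequest {
//     dest_path_provider: RegionFilePathFactory::new("data".to_string(), PathType::Bare),
//     remote_store: remote_store.clone(),
// };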

/// Tracks files uploaded to the remote store so that both cached and
/// uploaded copies can be cleaned up when an upload fails.
pub(crate) struct UploadTracker {
    /// Id of the region the files belong to.
    region_id: RegionId,
    /// Paths of files already uploaded successfully.
    files_uploaded: Vec<String>,
}

impl UploadTracker {
    /// Creates a new tracker for a region.
    pub(crate) fn new(region_id: RegionId) -> Self {
        Self {
            region_id,
            files_uploaded: Vec::new(),
        }
    }

    /// Records a successfully uploaded file path.
    pub(crate) fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

    /// Removes the given SSTs from the file cache and deletes every file
    /// already uploaded to the remote store.
    pub(crate) async fn clean(
        &self,
        sst_info: &SstInfoArray,
        file_cache: &FileCacheRef,
        remote_store: &ObjectStore,
    ) {
        common_telemetry::info!(
            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
            self.region_id,
            sst_info.len()
        );

        // Removes the SSTs and their index files from the local file cache.
        for sst in sst_info {
            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
            file_cache.remove(parquet_key).await;

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(
                    self.region_id,
                    sst.file_id,
                    FileType::Puffin(sst.index_metadata.version),
                );
                file_cache.remove(puffin_key).await;
            }
        }

        // Deletes uploaded files from the remote store; failures are logged
        // but do not abort the cleanup.
        for file_path in &self.files_uploaded {
            if let Err(e) = remote_store.delete(file_path).await {
                common_telemetry::error!(e; "Failed to delete file {}", file_path);
            }
        }
    }
}
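
// The tracker implements a simple record-then-roll-back pattern. A hedged
// sketch of the contract (`upload_failed` and the surrounding values are
// hypothetical):
//
// let mut tracker = UploadTracker::new(region_id);
// // ... upload files one by one, calling `tracker.push_uploaded_file(path)`
// // after each success ...
// if upload_failed {
//     // Remove cached copies and delete everything uploaded so far.
//     tracker.clean(&sst_info, &file_cache, &remote_store).await;
// }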

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };

    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // Creates a mock remote store and a local write cache.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Creates the source and the write request.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Writes the SST and uploads it to the mock remote store.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // Checks that the parquet file is cached and matches the remote copy.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Checks the puffin index file as well.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removing the entries should evict them from the cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Creates a cache manager that wraps the write cache.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Writes an SST to the cache and uploads it.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Builds a reader with the cache enabled; the parquet metadata should
        // be served from the write cache.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        // The metadata read back must equal the metadata produced at write time.
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        // A source that yields one batch and then fails, aborting the writer.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap_err();

        // The atomic write dir must not contain leftover temporary files.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}