1use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27
28use crate::access_layer::{
29 FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
30 TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
31};
32use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
33use crate::cache::manifest_cache::ManifestCache;
34use crate::error::{self, Result};
35use crate::metrics::UPLOAD_BYTES_TOTAL;
36use crate::region::opener::RegionLoadCacheTask;
37use crate::sst::file::RegionFileId;
38use crate::sst::index::IndexerBuilderImpl;
39use crate::sst::index::intermediate::IntermediateManager;
40use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
41use crate::sst::parquet::writer::ParquetWriter;
42use crate::sst::parquet::{SstInfo, WriteOptions};
43use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
44
/// Local write-through cache for SST files.
///
/// SST (parquet) and index (puffin) files are written to a local [`FileCache`] first and
/// then uploaded from there to a remote object store.
pub struct WriteCache {
    /// Local file cache holding parquet and puffin files.
    file_cache: FileCacheRef,
    /// Factory used to build puffin managers over the local cache store.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate-file manager handed to the index builders.
    intermediate_manager: IntermediateManager,
    /// Sender of region cache-loading tasks; the receiver is handed to `FileCache::recover`.
    task_sender: UnboundedSender<RegionLoadCacheTask>,
    /// Optional manifest cache; `None` when disabled.
    manifest_cache: Option<ManifestCache>,
}

/// Shared reference to a [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;
62
63impl WriteCache {
64 #[allow(clippy::too_many_arguments)]
67 pub async fn new(
68 local_store: ObjectStore,
69 cache_capacity: ReadableSize,
70 ttl: Option<Duration>,
71 index_cache_percent: Option<u8>,
72 enable_background_worker: bool,
73 puffin_manager_factory: PuffinManagerFactory,
74 intermediate_manager: IntermediateManager,
75 manifest_cache: Option<ManifestCache>,
76 ) -> Result<Self> {
77 let (task_sender, task_receiver) = unbounded_channel();
78
79 let file_cache = Arc::new(FileCache::new(
80 local_store,
81 cache_capacity,
82 ttl,
83 index_cache_percent,
84 enable_background_worker,
85 ));
86 file_cache.recover(false, Some(task_receiver)).await;
87
88 Ok(Self {
89 file_cache,
90 puffin_manager_factory,
91 intermediate_manager,
92 task_sender,
93 manifest_cache,
94 })
95 }
96
97 #[allow(clippy::too_many_arguments)]
99 pub async fn new_fs(
100 cache_dir: &str,
101 cache_capacity: ReadableSize,
102 ttl: Option<Duration>,
103 index_cache_percent: Option<u8>,
104 enable_background_worker: bool,
105 puffin_manager_factory: PuffinManagerFactory,
106 intermediate_manager: IntermediateManager,
107 manifest_cache_capacity: ReadableSize,
108 ) -> Result<Self> {
109 info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
110
111 let local_store = new_fs_cache_store(cache_dir).await?;
112
113 let manifest_cache = if manifest_cache_capacity.as_bytes() > 0 {
115 Some(ManifestCache::new(local_store.clone(), manifest_cache_capacity, ttl, false).await)
116 } else {
117 None
118 };
119
120 Self::new(
121 local_store,
122 cache_capacity,
123 ttl,
124 index_cache_percent,
125 enable_background_worker,
126 puffin_manager_factory,
127 intermediate_manager,
128 manifest_cache,
129 )
130 .await
131 }
132
133 pub(crate) fn file_cache(&self) -> FileCacheRef {
135 self.file_cache.clone()
136 }
137
138 pub(crate) fn manifest_cache(&self) -> Option<ManifestCache> {
140 self.manifest_cache.clone()
141 }
142
143 pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
145 let store = self.file_cache.local_store();
146 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
147 self.puffin_manager_factory.build(store, path_provider)
148 }
149
150 pub(crate) async fn put_and_upload_sst(
152 &self,
153 data: &bytes::Bytes,
154 region_id: RegionId,
155 sst_info: &SstInfo,
156 upload_request: SstUploadRequest,
157 ) -> Result<Metrics> {
158 let file_id = sst_info.file_id;
159 let mut metrics = Metrics::new(WriteType::Flush);
160
161 let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
163
164 let cache_start = Instant::now();
166 let cache_path = self.file_cache.cache_file_path(parquet_key);
167 let store = self.file_cache.local_store();
168 let cleaner = TempFileCleaner::new(region_id, store.clone());
169 let write_res = store
170 .write(&cache_path, data.clone())
171 .await
172 .context(crate::error::OpenDalSnafu);
173 if let Err(e) = write_res {
174 cleaner.clean_by_file_id(file_id).await;
175 return Err(e);
176 }
177
178 metrics.write_batch = cache_start.elapsed();
179
180 let upload_start = Instant::now();
182 let region_file_id = RegionFileId::new(region_id, file_id);
183 let remote_path = upload_request
184 .dest_path_provider
185 .build_sst_file_path(region_file_id);
186
187 if let Err(e) = self
188 .upload(parquet_key, &remote_path, &upload_request.remote_store)
189 .await
190 {
191 self.remove(parquet_key).await;
193 return Err(e);
194 }
195
196 metrics.upload_parquet = upload_start.elapsed();
197 Ok(metrics)
198 }
199
200 pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
202 &self.intermediate_manager
203 }
204
205 pub(crate) async fn write_and_upload_sst(
207 &self,
208 write_request: SstWriteRequest,
209 upload_request: SstUploadRequest,
210 write_opts: &WriteOptions,
211 metrics: &mut Metrics,
212 ) -> Result<SstInfoArray> {
213 let region_id = write_request.metadata.region_id;
214
215 let store = self.file_cache.local_store();
216 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
217 let indexer = IndexerBuilderImpl {
218 build_type: write_request.op_type.into(),
219 metadata: write_request.metadata.clone(),
220 row_group_size: write_opts.row_group_size,
221 puffin_manager: self
222 .puffin_manager_factory
223 .build(store.clone(), path_provider.clone()),
224 write_cache_enabled: true,
225 intermediate_manager: self.intermediate_manager.clone(),
226 index_options: write_request.index_options,
227 inverted_index_config: write_request.inverted_index_config,
228 fulltext_index_config: write_request.fulltext_index_config,
229 bloom_filter_index_config: write_request.bloom_filter_index_config,
230 #[cfg(feature = "vector_index")]
231 vector_index_config: write_request.vector_index_config,
232 };
233
234 let cleaner = TempFileCleaner::new(region_id, store.clone());
235 let mut writer = ParquetWriter::new_with_object_store(
237 store.clone(),
238 write_request.metadata,
239 write_request.index_config,
240 indexer,
241 path_provider.clone(),
242 metrics,
243 )
244 .await
245 .with_file_cleaner(cleaner);
246
247 let sst_info = match write_request.source {
248 either::Left(source) => {
249 writer
250 .write_all(source, write_request.max_sequence, write_opts)
251 .await?
252 }
253 either::Right(flat_source) => {
254 writer
255 .write_all_flat(flat_source, write_request.max_sequence, write_opts)
256 .await?
257 }
258 };
259
260 if sst_info.is_empty() {
262 return Ok(sst_info);
263 }
264
265 let mut upload_tracker = UploadTracker::new(region_id);
266 let mut err = None;
267 let remote_store = &upload_request.remote_store;
268 for sst in &sst_info {
269 let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
270 let parquet_path = upload_request
271 .dest_path_provider
272 .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
273 let start = Instant::now();
274 if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
275 err = Some(e);
276 break;
277 }
278 metrics.upload_parquet += start.elapsed();
279 upload_tracker.push_uploaded_file(parquet_path);
280
281 if sst.index_metadata.file_size > 0 {
282 let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
283 let puffin_path = upload_request
284 .dest_path_provider
285 .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
286 let start = Instant::now();
287 if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
288 err = Some(e);
289 break;
290 }
291 metrics.upload_puffin += start.elapsed();
292 upload_tracker.push_uploaded_file(puffin_path);
293 }
294 }
295
296 if let Some(err) = err {
297 upload_tracker
299 .clean(&sst_info, &self.file_cache, remote_store)
300 .await;
301 return Err(err);
302 }
303
304 Ok(sst_info)
305 }
306
307 pub(crate) async fn remove(&self, index_key: IndexKey) {
309 self.file_cache.remove(index_key).await
310 }
311
312 pub(crate) async fn download(
315 &self,
316 index_key: IndexKey,
317 remote_path: &str,
318 remote_store: &ObjectStore,
319 file_size: u64,
320 ) -> Result<()> {
321 self.file_cache
322 .download(index_key, remote_path, remote_store, file_size)
323 .await
324 }
325
326 pub(crate) async fn upload(
328 &self,
329 index_key: IndexKey,
330 upload_path: &str,
331 remote_store: &ObjectStore,
332 ) -> Result<()> {
333 let region_id = index_key.region_id;
334 let file_id = index_key.file_id;
335 let file_type = index_key.file_type;
336 let cache_path = self.file_cache.cache_file_path(index_key);
337
338 let start = Instant::now();
339 let cached_value = self
340 .file_cache
341 .local_store()
342 .stat(&cache_path)
343 .await
344 .context(error::OpenDalSnafu)?;
345 let reader = self
346 .file_cache
347 .local_store()
348 .reader(&cache_path)
349 .await
350 .context(error::OpenDalSnafu)?
351 .into_futures_async_read(0..cached_value.content_length())
352 .await
353 .context(error::OpenDalSnafu)?;
354
355 let mut writer = remote_store
356 .writer_with(upload_path)
357 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
358 .concurrent(DEFAULT_WRITE_CONCURRENCY)
359 .await
360 .context(error::OpenDalSnafu)?
361 .into_futures_async_write();
362
363 let bytes_written =
364 futures::io::copy(reader, &mut writer)
365 .await
366 .context(error::UploadSnafu {
367 region_id,
368 file_id,
369 file_type,
370 })?;
371
372 writer.close().await.context(error::UploadSnafu {
374 region_id,
375 file_id,
376 file_type,
377 })?;
378
379 UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
380
381 debug!(
382 "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
383 region_id,
384 file_id,
385 upload_path,
386 start.elapsed(),
387 );
388
389 let index_value = IndexValue {
390 file_size: bytes_written as _,
391 };
392 self.file_cache.put(index_key, index_value).await;
394
395 Ok(())
396 }
397
398 pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
402 let _ = self.task_sender.send(task);
403 }
404}
405
/// Destination of an SST upload.
pub struct SstUploadRequest {
    /// Builds the remote paths of the SST and index files.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store the files are uploaded to.
    pub remote_store: ObjectStore,
}
413
/// Records the remote files uploaded so far, so they can be deleted if a later upload fails.
pub(crate) struct UploadTracker {
    /// Region the tracked files belong to.
    region_id: RegionId,
    /// Remote paths of the files uploaded successfully.
    files_uploaded: Vec<String>,
}
421
422impl UploadTracker {
423 pub(crate) fn new(region_id: RegionId) -> Self {
425 Self {
426 region_id,
427 files_uploaded: Vec::new(),
428 }
429 }
430
431 pub(crate) fn push_uploaded_file(&mut self, path: String) {
433 self.files_uploaded.push(path);
434 }
435
436 pub(crate) async fn clean(
438 &self,
439 sst_info: &SstInfoArray,
440 file_cache: &FileCacheRef,
441 remote_store: &ObjectStore,
442 ) {
443 common_telemetry::info!(
444 "Start cleaning files on upload failure, region: {}, num_ssts: {}",
445 self.region_id,
446 sst_info.len()
447 );
448
449 for sst in sst_info {
451 let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
452 file_cache.remove(parquet_key).await;
453
454 if sst.index_metadata.file_size > 0 {
455 let puffin_key = IndexKey::new(
456 self.region_id,
457 sst.file_id,
458 FileType::Puffin(sst.index_metadata.version),
459 );
460 file_cache.remove(puffin_key).await;
461 }
462 }
463
464 for file_path in &self.files_uploaded {
466 if let Err(e) = remote_store.delete(file_path).await {
467 common_telemetry::error!(e; "Failed to delete file {}", file_path);
468 }
469 }
470 }
471}
472
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use parquet::file::metadata::PageIndexPolicy;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store};
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata,
    };

    /// Writes an SST through the write cache, then checks four things:
    /// - the parquet and puffin files are registered in the file cache;
    /// - the uploaded parquet file matches its cached copy byte for byte;
    /// - the uploaded puffin file matches its cached copy byte for byte;
    /// - both entries can be removed from the cache.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        let mut env = TestEnv::new().await;
        // The mock store stands in for the remote object store.
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        // Local directory backing the write cache.
        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Build the SST source.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Write into the local cache and upload to the mock remote store.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // The parquet file is registered in the write cache.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // The remote parquet file matches the cached copy byte for byte.
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // The puffin (index) file is registered in the write cache.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
        assert!(write_cache.file_cache.contains_key(&index_key));

        // The remote puffin file matches the cached copy byte for byte.
        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removing the entries takes them out of the cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    /// Writes an SST through the write cache. It then reads the file back via a
    /// `ParquetReaderBuilder` that uses the cache manager, and checks that the parquet
    /// metadata matches what the writer produced.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create the write cache and a cache manager that uses it.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Build the SST source.
        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Write the SST through the write cache.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read the file back through a reader that uses the cache manager.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()))
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap();

        // The metadata read back must equal the metadata produced by the writer.
        assert_parquet_metadata_equal(write_parquet_metadata, reader.parquet_metadata());
    }

    /// Makes the writer fail mid-write and checks that no temporary file is left in the
    /// write cache's atomic write directory.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Create a cache manager that uses the write cache.
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Build the SST source.
        let metadata = Arc::new(sst_region_metadata());

        // The second item is an error, so the write aborts after the first batch.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        // Write the SST through the write cache; this is expected to fail.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap_err();
        // Scan the atomic write directory; only subdirectories may remain, not files.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}