mito2/access_layer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_time::Timestamp;
use either::Either;
use futures::{Stream, TryStreamExt};
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
use object_store::{ATOMIC_WRITE_DIR, ErrorKind, OLD_ATOMIC_WRITE_DIR, ObjectStore};
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::sst_entry::StorageSstEntry;
use store_api::storage::{FileId, RegionId, SequenceNumber};

use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::SstUploadRequest;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;

/// Write operation type.
#[derive(Eq, PartialEq, Debug)]
pub enum WriteType {
    /// Writes from flush
    Flush,
    /// Writes from compaction.
    Compaction,
}

#[derive(Debug)]
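/// Timing metrics collected while writing an SST, accumulated across write phases and
/// reported once via `observe`.
///
/// A minimal usage sketch (hypothetical caller; `phase_a` and `phase_b` stand for metrics
/// returned by earlier write steps):
///
/// ```ignore
/// let total = Metrics::new(WriteType::Flush).merge(phase_a).merge(phase_b);
/// // Reports every duration to the flush histograms.
/// total.observe();
/// ```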
pub struct Metrics {
    pub(crate) write_type: WriteType,
    pub(crate) iter_source: Duration,
    pub(crate) write_batch: Duration,
    pub(crate) update_index: Duration,
    pub(crate) upload_parquet: Duration,
    pub(crate) upload_puffin: Duration,
    pub(crate) compact_memtable: Duration,
}

impl Metrics {
    pub fn new(write_type: WriteType) -> Self {
        Self {
            write_type,
            iter_source: Default::default(),
            write_batch: Default::default(),
            update_index: Default::default(),
            upload_parquet: Default::default(),
            upload_puffin: Default::default(),
            compact_memtable: Default::default(),
        }
    }

    pub(crate) fn merge(mut self, other: Self) -> Self {
        assert_eq!(self.write_type, other.write_type);
        self.iter_source += other.iter_source;
        self.write_batch += other.write_batch;
        self.update_index += other.update_index;
        self.upload_parquet += other.upload_parquet;
        self.upload_puffin += other.upload_puffin;
        self.compact_memtable += other.compact_memtable;
        self
    }

    pub(crate) fn observe(self) {
        match self.write_type {
            WriteType::Flush => {
                FLUSH_ELAPSED
                    .with_label_values(&["iter_source"])
                    .observe(self.iter_source.as_secs_f64());
                FLUSH_ELAPSED
                    .with_label_values(&["write_batch"])
                    .observe(self.write_batch.as_secs_f64());
                FLUSH_ELAPSED
                    .with_label_values(&["update_index"])
                    .observe(self.update_index.as_secs_f64());
                FLUSH_ELAPSED
                    .with_label_values(&["upload_parquet"])
                    .observe(self.upload_parquet.as_secs_f64());
                FLUSH_ELAPSED
                    .with_label_values(&["upload_puffin"])
                    .observe(self.upload_puffin.as_secs_f64());
                if !self.compact_memtable.is_zero() {
                    FLUSH_ELAPSED
                        .with_label_values(&["compact_memtable"])
                        .observe(self.compact_memtable.as_secs_f64());
                }
            }
            WriteType::Compaction => {
                COMPACTION_STAGE_ELAPSED
                    .with_label_values(&["iter_source"])
                    .observe(self.iter_source.as_secs_f64());
                COMPACTION_STAGE_ELAPSED
                    .with_label_values(&["write_batch"])
                    .observe(self.write_batch.as_secs_f64());
                COMPACTION_STAGE_ELAPSED
                    .with_label_values(&["update_index"])
                    .observe(self.update_index.as_secs_f64());
                COMPACTION_STAGE_ELAPSED
                    .with_label_values(&["upload_parquet"])
                    .observe(self.upload_parquet.as_secs_f64());
                COMPACTION_STAGE_ELAPSED
                    .with_label_values(&["upload_puffin"])
                    .observe(self.upload_puffin.as_secs_f64());
            }
        };
    }
}

/// A layer to access SST files under the same directory.
pub struct AccessLayer {
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    /// Target object store.
    object_store: ObjectStore,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
}

impl std::fmt::Debug for AccessLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AccessLayer")
            .field("table_dir", &self.table_dir)
            .finish()
    }
}

impl AccessLayer {
    /// Returns a new [AccessLayer] for the specific `table_dir`.
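    ///
    /// A construction sketch; `path_type`, `object_store`, `puffin_manager_factory` and
    /// `intermediate_manager` are assumed to be built by the caller, and the table dir
    /// string is illustrative:
    ///
    /// ```ignore
    /// let layer = AccessLayer::new(
    ///     "data/my_table",
    ///     path_type,
    ///     object_store,
    ///     puffin_manager_factory,
    ///     intermediate_manager,
    /// );
    /// let region_dir = layer.build_region_dir(region_id);
    /// ```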
    pub fn new(
        table_dir: impl Into<String>,
        path_type: PathType,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> AccessLayer {
        AccessLayer {
            table_dir: table_dir.into(),
            path_type,
            object_store,
            puffin_manager_factory,
            intermediate_manager,
        }
    }

    /// Returns the directory of the table.
    pub fn table_dir(&self) -> &str {
        &self.table_dir
    }

    /// Returns the object store of the layer.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }

    /// Returns the path type of the layer.
    pub fn path_type(&self) -> PathType {
        self.path_type
    }

    /// Returns the puffin manager factory.
    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        &self.puffin_manager_factory
    }

    /// Returns the intermediate manager.
    pub fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Build the puffin manager.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.object_store.clone();
        let path_provider =
            RegionFilePathFactory::new(self.table_dir().to_string(), self.path_type());
        self.puffin_manager_factory.build(store, path_provider)
    }

    /// Deletes a SST file (and all versions of its index file, if any) with the given file id.
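    ///
    /// A hypothetical call; `region_file_id` and `index_id` come from the file handle being
    /// removed:
    ///
    /// ```ignore
    /// // Removes the `.parquet` file and index versions `0..=index_id.version`.
    /// layer.delete_sst(&region_file_id, &index_id).await?;
    /// ```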
    pub(crate) async fn delete_sst(
        &self,
        region_file_id: &RegionFileId,
        index_file_id: &RegionIndexId,
    ) -> Result<()> {
        let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
        self.object_store
            .delete(&path)
            .await
            .context(DeleteSstSnafu {
                file_id: region_file_id.file_id(),
            })?;

        // Delete all versions of the index file.
        for version in 0..=index_file_id.version {
            let index_id = RegionIndexId::new(*region_file_id, version);
            self.delete_index(index_id).await?;
        }

        Ok(())
    }

    pub(crate) async fn delete_index(
        &self,
        index_file_id: RegionIndexId,
    ) -> Result<(), crate::error::Error> {
        let path = location::index_file_path(
            &self.table_dir,
            RegionIndexId::new(index_file_id.file_id, index_file_id.version),
            self.path_type,
        );
        self.object_store
            .delete(&path)
            .await
            .context(DeleteIndexSnafu {
                file_id: index_file_id.file_id(),
            })?;
        Ok(())
    }

    /// Returns the directory of the region in the table.
    pub fn build_region_dir(&self, region_id: RegionId) -> String {
        region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
    }

    /// Returns a reader builder for specific `file`.
    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
        ParquetReaderBuilder::new(
            self.table_dir.clone(),
            self.path_type,
            file,
            self.object_store.clone(),
        )
    }

    /// Writes a SST with specific `file_id` and `metadata` to the layer.
    ///
    /// Returns the info of the written SSTs. If no data is written, the returned array is empty.
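    ///
    /// A minimal calling sketch; `request` (a `SstWriteRequest`) and `write_opts` are assumed
    /// to be prepared by the caller:
    ///
    /// ```ignore
    /// let mut metrics = Metrics::new(WriteType::Flush);
    /// let ssts = layer.write_sst(request, &write_opts, &mut metrics).await?;
    /// // `ssts` is empty if the source produced no rows.
    /// metrics.observe();
    /// ```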
    pub async fn write_sst(
        &self,
        request: SstWriteRequest,
        write_opts: &WriteOptions,
        metrics: &mut Metrics,
    ) -> Result<SstInfoArray> {
        let region_id = request.metadata.region_id;
        let cache_manager = request.cache_manager.clone();

        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
            // Write to the write cache.
            write_cache
                .write_and_upload_sst(
                    request,
                    SstUploadRequest {
                        dest_path_provider: RegionFilePathFactory::new(
                            self.table_dir.clone(),
                            self.path_type,
                        ),
                        remote_store: self.object_store.clone(),
                    },
                    write_opts,
                    metrics,
                )
                .await?
        } else {
            // Write cache is disabled.
            let store = self.object_store.clone();
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let indexer_builder = IndexerBuilderImpl {
                build_type: request.op_type.into(),
                metadata: request.metadata.clone(),
                row_group_size: write_opts.row_group_size,
                puffin_manager: self
                    .puffin_manager_factory
                    .build(store, path_provider.clone()),
                write_cache_enabled: false,
                intermediate_manager: self.intermediate_manager.clone(),
                index_options: request.index_options,
                inverted_index_config: request.inverted_index_config,
                fulltext_index_config: request.fulltext_index_config,
                bloom_filter_index_config: request.bloom_filter_index_config,
                #[cfg(feature = "vector_index")]
                vector_index_config: request.vector_index_config,
            };
            // The write cache is disabled on file systems, but we still use atomic writes.
            // TODO(yingwen): If we support other non-fs stores without the write cache, then
            // we may have to find a way to check whether we need the cleaner.
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let mut writer = ParquetWriter::new_with_object_store(
                self.object_store.clone(),
                request.metadata,
                request.index_config,
                indexer_builder,
                path_provider,
                metrics,
            )
            .await
            .with_file_cleaner(cleaner);
            match request.source {
                Either::Left(source) => {
                    writer
                        .write_all(source, request.max_sequence, write_opts)
                        .await?
                }
                Either::Right(flat_source) => {
                    writer.write_all_flat(flat_source, write_opts).await?
                }
            }
        };

        // Put parquet metadata to cache manager.
        if !sst_info.is_empty() {
            for sst in &sst_info {
                if let Some(parquet_metadata) = &sst.file_metadata {
                    cache_manager.put_parquet_meta_data(
                        RegionFileId::new(region_id, sst.file_id),
                        parquet_metadata.clone(),
                    )
                }
            }
        }

        Ok(sst_info)
    }

    /// Puts encoded SST bytes to the write cache (if enabled) and uploads it to the object store.
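    ///
    /// A hypothetical call; `data` holds already-encoded parquet bytes described by `sst_info`:
    ///
    /// ```ignore
    /// let metrics = layer.put_sst(&data, region_id, &sst_info, &cache_manager).await?;
    /// metrics.observe();
    /// ```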
    pub(crate) async fn put_sst(
        &self,
        data: &bytes::Bytes,
        region_id: RegionId,
        sst_info: &SstInfo,
        cache_manager: &CacheManagerRef,
    ) -> Result<Metrics> {
        if let Some(write_cache) = cache_manager.write_cache() {
            // Write to cache and upload to remote store
            let upload_request = SstUploadRequest {
                dest_path_provider: RegionFilePathFactory::new(
                    self.table_dir.clone(),
                    self.path_type,
                ),
                remote_store: self.object_store.clone(),
            };
            write_cache
                .put_and_upload_sst(data, region_id, sst_info, upload_request)
                .await
        } else {
            let start = Instant::now();
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let sst_file_path =
                path_provider.build_sst_file_path(RegionFileId::new(region_id, sst_info.file_id));
            let mut writer = self
                .object_store
                .writer_with(&sst_file_path)
                .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .context(OpenDalSnafu)?;
            if let Err(err) = writer.write(data.clone()).await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            if let Err(err) = writer.close().await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            let mut metrics = Metrics::new(WriteType::Flush);
            metrics.write_batch = start.elapsed();
            Ok(metrics)
        }
    }

    /// Lists the SST entries from the storage layer in the table directory.
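    ///
    /// The returned stream owns its state and borrows nothing from `self`. A consumption
    /// sketch (assuming an async caller):
    ///
    /// ```ignore
    /// let entries = layer.storage_sst_entries();
    /// futures::pin_mut!(entries);
    /// while let Some(entry) = entries.try_next().await? {
    ///     // Only `.parquet` and `.puffin` files are yielded.
    ///     println!("{} {:?}", entry.file_path, entry.file_size);
    /// }
    /// ```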
    pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<StorageSstEntry>> + use<> {
        let object_store = self.object_store.clone();
        let table_dir = self.table_dir.clone();

        try_stream! {
            let mut lister = object_store
                .lister_with(table_dir.as_str())
                .recursive(true)
                .await
                .context(OpenDalSnafu)?;

            while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
                let metadata = entry.metadata();
                if metadata.is_dir() {
                    continue;
                }

                let path = entry.path();
                if !path.ends_with(".parquet") && !path.ends_with(".puffin") {
                    continue;
                }

                let file_size = metadata.content_length();
                let file_size = if file_size == 0 { None } else { Some(file_size) };
                let last_modified_ms = metadata
                    .last_modified()
                    .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));

                let entry = StorageSstEntry {
                    file_path: path.to_string(),
                    file_size,
                    last_modified_ms,
                    node_id: None,
                };

                yield entry;
            }
        }
    }
}

/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    Flush,
    Compact,
}

/// Contents to build a SST.
pub struct SstWriteRequest {
    pub op_type: OperationType,
    pub metadata: RegionMetadataRef,
    pub source: Either<Source, FlatSource>,
    pub cache_manager: CacheManagerRef,
    #[allow(dead_code)]
    pub storage: Option<String>,
    pub max_sequence: Option<SequenceNumber>,

    /// Configs for index
    pub index_options: IndexOptions,
    pub index_config: IndexConfig,
    pub inverted_index_config: InvertedIndexConfig,
    pub fulltext_index_config: FulltextIndexConfig,
    pub bloom_filter_index_config: BloomFilterConfig,
    #[cfg(feature = "vector_index")]
    pub vector_index_config: crate::config::VectorIndexConfig,
}

/// Cleaner to remove temp files on the atomic write dir.
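///
/// A sketch of the intended pattern (mirroring `put_sst` above): create the cleaner before
/// writing and invoke it only when a write fails; `writer`, `data` and `file_id` are
/// illustrative names.
///
/// ```ignore
/// let cleaner = TempFileCleaner::new(region_id, object_store.clone());
/// if let Err(err) = writer.write(data).await {
///     cleaner.clean_by_file_id(file_id).await;
///     return Err(err);
/// }
/// ```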
pub(crate) struct TempFileCleaner {
    region_id: RegionId,
    object_store: ObjectStore,
}

impl TempFileCleaner {
    /// Constructs the cleaner for the region and store.
    pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
        Self {
            region_id,
            object_store,
        }
    }

    /// Removes the SST and index file from the local atomic dir by the file id.
    /// This only removes the initial index: since the index version is always 0 for a new SST,
    /// it is safe to hardcode version 0 here.
    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();

        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
    }

    /// Removes the files from the local atomic dir by their names.
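    ///
    /// A hypothetical call with cache keys used as the name prefixes to match:
    ///
    /// ```ignore
    /// TempFileCleaner::clean_atomic_dir_files(&local_store, &[&sst_key, &index_key]).await;
    /// ```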
    pub(crate) async fn clean_atomic_dir_files(
        local_store: &ObjectStore,
        names_to_remove: &[&str],
    ) {
        // We don't know the actual suffix of the file under the atomic dir, so we have
        // to list the dir. The cost should be acceptable as there won't be too many files.
        let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
            if e.kind() != ErrorKind::NotFound {
                common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
            }
        }) else {
            return;
        };

        // In our case, we can ensure the file id is unique so it is safe to remove all files
        // with the same file id under the atomic write dir.
        let actual_files: Vec<_> = entries
            .into_iter()
            .filter_map(|entry| {
                if entry.metadata().is_dir() {
                    return None;
                }

                // Select entries whose names match `names_to_remove`.
                let should_remove = names_to_remove
                    .iter()
                    .any(|file| entry.name().starts_with(file));
                if should_remove {
                    Some(entry.path().to_string())
                } else {
                    None
                }
            })
            .collect();

        common_telemetry::warn!(
            "Clean files {:?} under atomic write dir for {:?}",
            actual_files,
            names_to_remove
        );

        if let Err(e) = local_store.delete_iter(actual_files).await {
            common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
        }
    }
}

pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
    let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
    clean_dir(&atomic_write_dir).await?;

    // Compatibility code. Remove this after a major release.
    let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
    clean_dir(&old_atomic_temp_dir).await?;

    let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
    let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();

    Ok(with_instrument_layers(store, false))
}

/// Clean the directory.
async fn clean_dir(dir: &str) -> Result<()> {
    if tokio::fs::try_exists(dir)
        .await
        .context(CleanDirSnafu { dir })?
    {
        tokio::fs::remove_dir_all(dir)
            .await
            .context(CleanDirSnafu { dir })?;
    }

    Ok(())
}

/// Path provider for SST file and index file.
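///
/// Implemented by `WriteCachePathProvider` (paths inside the local write cache) and
/// `RegionFilePathFactory` (paths under the region storage directory). A usage sketch, with
/// `provider` standing for either implementation:
///
/// ```ignore
/// let sst_path = provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
/// let index_path = provider.build_index_file_path_with_version(index_id);
/// ```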
pub trait FilePathProvider: Send + Sync {
    /// Creates the index file path of the given file id. The version defaults to 0 and is not
    /// shown in the path.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String;

    /// Creates the index file path of the given index id (with version support).
    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;

    /// Creates the SST file path of the given file id.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}

/// Path provider that builds paths in local write cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
    file_cache: FileCacheRef,
}

impl WriteCachePathProvider {
    /// Creates a new `WriteCachePathProvider` instance.
    pub fn new(file_cache: FileCacheRef) -> Self {
        Self { file_cache }
    }
}

impl FilePathProvider for WriteCachePathProvider {
    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
        let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
        self.file_cache.cache_file_path(puffin_key)
    }

    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
        let puffin_key = IndexKey::new(
            index_id.region_id(),
            index_id.file_id(),
            FileType::Puffin(index_id.version),
        );
        self.file_cache.cache_file_path(puffin_key)
    }

    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
        let parquet_file_key =
            IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
        self.file_cache.cache_file_path(parquet_file_key)
    }
}

/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
    pub(crate) table_dir: String,
    pub(crate) path_type: PathType,
}

impl RegionFilePathFactory {
    /// Creates a new `RegionFilePathFactory` instance.
    pub fn new(table_dir: String, path_type: PathType) -> Self {
        Self {
            table_dir,
            path_type,
        }
    }
}

impl FilePathProvider for RegionFilePathFactory {
    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
        location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
    }

    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
        location::index_file_path(&self.table_dir, index_id, self.path_type)
    }

    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
        location::sst_file_path(&self.table_dir, file_id, self.path_type)
    }
}