mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 16:10:02 +00:00
Compare commits
7 Commits
chore/test
...
feat/index
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
122dfbf9f8 | ||
|
|
3c7fa84442 | ||
|
|
0cd368d1f6 | ||
|
|
c82208dbc0 | ||
|
|
bbbe91e97a | ||
|
|
987e1b5a15 | ||
|
|
9cac640b41 |
@@ -565,6 +565,7 @@ fn new_noop_file_purger() -> FilePurgerRef {
|
||||
struct Noop;
|
||||
impl FilePurger for Noop {
|
||||
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
|
||||
fn update_index(&self, _file_meta: FileMeta, _version: store_api::storage::IndexVersion) {}
|
||||
}
|
||||
Arc::new(Noop)
|
||||
}
|
||||
|
||||
@@ -228,22 +228,25 @@ impl AccessLayer {
|
||||
|
||||
// Delete all versions of the index file.
|
||||
for version in 0..=index_file_id.version {
|
||||
let path = location::index_file_path(
|
||||
&self.table_dir,
|
||||
RegionIndexId::new(index_file_id.file_id, version),
|
||||
self.path_type,
|
||||
);
|
||||
self.object_store
|
||||
.delete(&path)
|
||||
.await
|
||||
.context(DeleteIndexSnafu {
|
||||
file_id: region_file_id.file_id(),
|
||||
})?;
|
||||
self.delete_index(&RegionIndexId::new(index_file_id.file_id, version))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_index(&self, region_index_id: &RegionIndexId) -> Result<()> {
|
||||
let path = location::index_file_path(&self.table_dir, *region_index_id, self.path_type);
|
||||
self.object_store
|
||||
.delete(&path)
|
||||
.await
|
||||
.context(DeleteIndexSnafu {
|
||||
file_id: region_index_id.file_id(),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the directory of the region in the table.
|
||||
pub fn build_region_dir(&self, region_id: RegionId) -> String {
|
||||
region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
|
||||
|
||||
@@ -21,11 +21,10 @@ use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use index::bloom_filter::error::Result;
|
||||
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
|
||||
use store_api::storage::{ColumnId, FileId};
|
||||
use store_api::storage::{ColumnId, FileId, IndexVersion};
|
||||
|
||||
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
|
||||
use crate::metrics::{CACHE_HIT, CACHE_MISS};
|
||||
use crate::sst::file::IndexVersion;
|
||||
|
||||
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
|
||||
|
||||
|
||||
3
src/mito2/src/cache/index/inverted_index.rs
vendored
3
src/mito2/src/cache/index/inverted_index.rs
vendored
@@ -22,11 +22,10 @@ use bytes::Bytes;
|
||||
use index::inverted_index::error::Result;
|
||||
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
|
||||
use prost::Message;
|
||||
use store_api::storage::FileId;
|
||||
use store_api::storage::{FileId, IndexVersion};
|
||||
|
||||
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
|
||||
use crate::metrics::{CACHE_HIT, CACHE_MISS};
|
||||
use crate::sst::file::IndexVersion;
|
||||
|
||||
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
|
||||
use smallvec::SmallVec;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::{ColumnId, FileId, RegionId};
|
||||
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
|
||||
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::CacheManagerRef;
|
||||
@@ -82,8 +82,6 @@ pub type Level = u8;
|
||||
pub const MAX_LEVEL: Level = 2;
|
||||
/// Type to store index types for a column.
|
||||
pub type IndexTypes = SmallVec<[IndexType; 4]>;
|
||||
/// Index version
|
||||
pub type IndexVersion = u64;
|
||||
|
||||
/// Cross-region file id.
|
||||
///
|
||||
@@ -199,7 +197,7 @@ pub struct FileMeta {
|
||||
/// Version of the index file.
|
||||
/// Used to generate the index file name: "{file_id}.{index_version}.puffin".
|
||||
/// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
|
||||
pub index_version: u64,
|
||||
pub index_version: IndexVersion,
|
||||
/// Number of rows in the file.
|
||||
///
|
||||
/// For historical reasons, this field might be missing in old files. Thus
|
||||
@@ -540,38 +538,89 @@ pub async fn delete_files(
|
||||
);
|
||||
|
||||
for (file_id, index_version) in file_ids {
|
||||
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
|
||||
// Removes index file from the cache.
|
||||
if delete_index {
|
||||
write_cache
|
||||
.remove(IndexKey::new(
|
||||
region_id,
|
||||
*file_id,
|
||||
FileType::Puffin(*index_version),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
purge_index_write_cache_stager(
|
||||
region_id,
|
||||
delete_index,
|
||||
access_layer,
|
||||
cache_manager,
|
||||
file_id,
|
||||
index_version,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Remove the SST file from the cache.
|
||||
/// Delete index file for a given SST file&index version.
|
||||
pub async fn delete_index(
|
||||
region_id: RegionId,
|
||||
file_id: FileId,
|
||||
index_version: u64,
|
||||
access_layer: &AccessLayerRef,
|
||||
cache_manager: &Option<CacheManagerRef>,
|
||||
) -> crate::error::Result<()> {
|
||||
if let Err(err) = access_layer
|
||||
.delete_index(&RegionIndexId::new(
|
||||
RegionFileId::new(region_id, file_id),
|
||||
index_version,
|
||||
))
|
||||
.await
|
||||
{
|
||||
error!(err; "Failed to delete index file for {}/{}.{}",
|
||||
region_id, file_id, index_version);
|
||||
}
|
||||
|
||||
purge_index_write_cache_stager(
|
||||
region_id,
|
||||
true,
|
||||
access_layer,
|
||||
cache_manager,
|
||||
&file_id,
|
||||
&index_version,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn purge_index_write_cache_stager(
|
||||
region_id: RegionId,
|
||||
delete_index: bool,
|
||||
access_layer: &Arc<crate::access_layer::AccessLayer>,
|
||||
cache_manager: &Option<Arc<crate::cache::CacheManager>>,
|
||||
file_id: &FileId,
|
||||
index_version: &u64,
|
||||
) {
|
||||
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
|
||||
// Removes index file from the cache.
|
||||
if delete_index {
|
||||
write_cache
|
||||
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
|
||||
.remove(IndexKey::new(
|
||||
region_id,
|
||||
*file_id,
|
||||
FileType::Puffin(*index_version),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Purges index content in the stager.
|
||||
if let Err(e) = access_layer
|
||||
.puffin_manager_factory()
|
||||
.purge_stager(RegionIndexId::new(
|
||||
RegionFileId::new(region_id, *file_id),
|
||||
*index_version,
|
||||
))
|
||||
.await
|
||||
{
|
||||
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
|
||||
file_id, index_version, region_id);
|
||||
}
|
||||
// Remove the SST file from the cache.
|
||||
write_cache
|
||||
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Purges index content in the stager.
|
||||
if let Err(e) = access_layer
|
||||
.puffin_manager_factory()
|
||||
.purge_stager(RegionIndexId::new(
|
||||
RegionFileId::new(region_id, *file_id),
|
||||
*index_version,
|
||||
))
|
||||
.await
|
||||
{
|
||||
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
|
||||
file_id, index_version, region_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -132,7 +132,11 @@ impl FileReferenceManager {
|
||||
let region_id = file_meta.region_id;
|
||||
let mut is_new = false;
|
||||
{
|
||||
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
|
||||
let file_ref = FileRef::new(
|
||||
file_meta.region_id,
|
||||
file_meta.file_id,
|
||||
file_meta.index_version,
|
||||
);
|
||||
self.files_per_region
|
||||
.entry(region_id)
|
||||
.and_modify(|refs| {
|
||||
@@ -157,7 +161,7 @@ impl FileReferenceManager {
|
||||
/// If the reference count reaches zero, the file reference will be removed from the manager.
|
||||
pub fn remove_file(&self, file_meta: &FileMeta) {
|
||||
let region_id = file_meta.region_id;
|
||||
let file_ref = FileRef::new(region_id, file_meta.file_id);
|
||||
let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version);
|
||||
|
||||
let mut remove_table_entry = false;
|
||||
let mut remove_file_ref = false;
|
||||
@@ -246,13 +250,13 @@ mod tests {
|
||||
.get(&file_meta.region_id)
|
||||
.unwrap()
|
||||
.files,
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
|
||||
);
|
||||
|
||||
file_ref_mgr.add_file(&file_meta);
|
||||
|
||||
let expected_region_ref_manifest =
|
||||
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
|
||||
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id, 0)]);
|
||||
|
||||
assert_eq!(
|
||||
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
|
||||
@@ -265,7 +269,7 @@ mod tests {
|
||||
.get(&file_meta.region_id)
|
||||
.unwrap()
|
||||
.files,
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 2)])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
@@ -281,7 +285,7 @@ mod tests {
|
||||
.get(&file_meta.region_id)
|
||||
.unwrap()
|
||||
.files,
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
|
||||
@@ -778,6 +778,7 @@ impl IndexBuildTask {
|
||||
self.file_meta.available_indexes = output.build_available_indexes();
|
||||
self.file_meta.indexes = output.build_indexes();
|
||||
self.file_meta.index_file_size = output.file_size;
|
||||
let old_index_version = self.file_meta.index_version;
|
||||
self.file_meta.index_version = new_index_version;
|
||||
let edit = RegionEdit {
|
||||
files_to_add: vec![self.file_meta.clone()],
|
||||
@@ -801,6 +802,11 @@ impl IndexBuildTask {
|
||||
self.file_meta.region_id,
|
||||
self.reason.as_str()
|
||||
);
|
||||
// notify the file purger to remove the old index files if any
|
||||
if new_index_version > 0 {
|
||||
self.file_purger
|
||||
.update_index(self.file_meta.clone(), old_index_version);
|
||||
}
|
||||
Ok(edit)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,6 @@ pub use datatypes::schema::{
|
||||
};
|
||||
|
||||
pub use self::descriptors::*;
|
||||
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
|
||||
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError};
|
||||
pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
pub use self::types::{SequenceNumber, SequenceRange};
|
||||
|
||||
@@ -24,6 +24,9 @@ use uuid::Uuid;
|
||||
use crate::ManifestVersion;
|
||||
use crate::storage::RegionId;
|
||||
|
||||
/// Index version, incremented when the index file is rebuilt.
|
||||
pub type IndexVersion = u64;
|
||||
|
||||
#[derive(Debug, Snafu, PartialEq)]
|
||||
pub struct ParseIdError {
|
||||
source: uuid::Error,
|
||||
@@ -70,15 +73,21 @@ impl FromStr for FileId {
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicates that a `FileHandle` reference is held for a specific file and index version in a region.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct FileRef {
|
||||
pub region_id: RegionId,
|
||||
pub file_id: FileId,
|
||||
pub index_version: IndexVersion,
|
||||
}
|
||||
|
||||
impl FileRef {
|
||||
pub fn new(region_id: RegionId, file_id: FileId) -> Self {
|
||||
Self { region_id, file_id }
|
||||
pub fn new(region_id: RegionId, file_id: FileId, index_version: u64) -> Self {
|
||||
Self {
|
||||
region_id,
|
||||
file_id,
|
||||
index_version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user