refactor: use versioned index for index file (#7309)

* refactor: use versioned index for index file

Signed-off-by: discord9 <discord9@163.com>

* fix: sst entry table

Signed-off-by: discord9 <discord9@163.com>

* update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: unit type

Signed-off-by: discord9 <discord9@163.com>

* fix: missing version

Signed-off-by: discord9 <discord9@163.com>

* more fix build index

Signed-off-by: discord9 <discord9@163.com>

* fix: use proper index id

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* test: update

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* test: test_list_ssts fixed

Signed-off-by: discord9 <discord9@163.com>

* test: fix test

Signed-off-by: discord9 <discord9@163.com>

* feat: stuff

Signed-off-by: discord9 <discord9@163.com>

* fix: clean temp index file on abort & delete all index versions when deleting a file

Signed-off-by: discord9 <discord9@163.com>

* docs: explain

Signed-off-by: discord9 <discord9@163.com>

* fix: actually clean up tmp dir

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* clean tmp dir only when write cache enabled

Signed-off-by: discord9 <discord9@163.com>

* refactor: add version to index cache

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* test: update size

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-09 15:31:12 +08:00
committed by GitHub
parent 36d89c3baf
commit 9197e818ec
47 changed files with 649 additions and 323 deletions

View File

@@ -163,7 +163,7 @@ impl ObjbenchCommand {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows,
num_row_groups,
sequence: None,

View File

@@ -119,7 +119,7 @@ mod tests {
.index_file_path
.map(|path| path.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
e.index_version = 0;
format!("\n{:?}", e)
})
.sorted()
@@ -128,12 +128,12 @@ mod tests {
assert_eq!(
debug_format,
r#"
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
);
// list from storage
let storage_entries = mito

View File

@@ -37,7 +37,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, RegionFileId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
@@ -216,7 +216,7 @@ impl AccessLayer {
pub(crate) async fn delete_sst(
&self,
region_file_id: &RegionFileId,
index_file_id: &RegionFileId,
index_file_id: &RegionIndexId,
) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
self.object_store
@@ -226,13 +226,20 @@ impl AccessLayer {
file_id: region_file_id.file_id(),
})?;
let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: region_file_id.file_id(),
})?;
// Delete all versions of the index file.
for version in 0..=index_file_id.version {
let path = location::index_file_path(
&self.table_dir,
RegionIndexId::new(index_file_id.file_id, version),
self.path_type,
);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: region_file_id.file_id(),
})?;
}
Ok(())
}
@@ -291,6 +298,7 @@ impl AccessLayer {
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
write_cache_enabled: false,
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
@@ -468,9 +476,10 @@ impl TempFileCleaner {
}
/// Removes the SST and index file from the local atomic dir by the file id.
/// This only removes the initial index file. The index version is always 0 for a new SST,
/// so it is safe for this method to use version 0.
pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();
Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
}
@@ -553,9 +562,12 @@ async fn clean_dir(dir: &str) -> Result<()> {
/// Path provider for SST file and index file.
pub trait FilePathProvider: Send + Sync {
/// Creates index file path of given file id.
/// Creates the index file path of the given file id. The version defaults to 0 and is not shown in the path.
fn build_index_file_path(&self, file_id: RegionFileId) -> String;
/// Creates index file path of given index id (with version support).
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;
/// Creates SST file path of given file id.
fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
@@ -575,7 +587,16 @@ impl WriteCachePathProvider {
impl FilePathProvider for WriteCachePathProvider {
fn build_index_file_path(&self, file_id: RegionFileId) -> String {
let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
self.file_cache.cache_file_path(puffin_key)
}
/// Returns the write-cache path of the puffin index file identified by `index_id`.
///
/// The cache key is built from the region id, the file id and a
/// `FileType::Puffin` carrying the index version.
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
    let file_type = FileType::Puffin(index_id.version);
    let cache_key = IndexKey::new(index_id.region_id(), index_id.file_id(), file_type);
    self.file_cache.cache_file_path(cache_key)
}
@@ -605,7 +626,11 @@ impl RegionFilePathFactory {
impl FilePathProvider for RegionFilePathFactory {
fn build_index_file_path(&self, file_id: RegionFileId) -> String {
location::index_file_path(&self.table_dir, file_id, self.path_type)
location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
}
/// Returns the path of the index file identified by `index_id`, resolved by
/// `location::index_file_path` from this factory's table dir and path type.
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
    let table_dir = &self.table_dir;
    let path_type = self.path_type;
    location::index_file_path(table_dir, index_id, path_type)
}
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {

View File

@@ -44,7 +44,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
@@ -180,7 +180,7 @@ impl CacheStrategy {
}
/// Calls [CacheManager::evict_puffin_cache()].
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.evict_puffin_cache(file_id).await
@@ -400,7 +400,7 @@ impl CacheManager {
}
/// Evicts every puffin-related cache entry for the given file.
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
if let Some(cache) = &self.bloom_filter_index_cache {
cache.invalidate_file(file_id.file_id());
}
@@ -422,7 +422,7 @@ impl CacheManager {
.remove(IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin,
FileType::Puffin(file_id.version),
))
.await;
}
@@ -949,7 +949,7 @@ mod tests {
let cache = Arc::new(cache);
let region_id = RegionId::new(1, 1);
let region_file_id = RegionFileId::new(region_id, FileId::random());
let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
let column_id: ColumnId = 1;
let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
@@ -957,16 +957,21 @@ mod tests {
let result_cache = cache.index_result_cache().unwrap();
let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping);
let bloom_key = (
index_id.file_id(),
index_id.version,
column_id,
Tag::Skipping,
);
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
(index_id.file_id(), index_id.version),
Arc::new(InvertedIndexMetas::default()),
);
let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
let selection = Arc::new(RowGroupSelection::default());
result_cache.put(predicate.clone(), region_file_id.file_id(), selection);
let file_id_str = region_file_id.to_string();
result_cache.put(predicate.clone(), index_id.file_id(), selection);
let file_id_str = index_id.to_string();
let metadata = Arc::new(FileMetadata {
blobs: Vec::new(),
properties: HashMap::new(),
@@ -976,40 +981,32 @@ mod tests {
assert!(bloom_cache.get_metadata(bloom_key).is_some());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_some()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_some()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
cache.evict_puffin_cache(region_file_id).await;
cache.evict_puffin_cache(index_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_none()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
// Refill caches and evict via CacheStrategy to ensure delegation works.
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
(index_id.file_id(), index_id.version),
Arc::new(InvertedIndexMetas::default()),
);
result_cache.put(
predicate.clone(),
region_file_id.file_id(),
index_id.file_id(),
Arc::new(RowGroupSelection::default()),
);
puffin_metadata_cache.put_metadata(
@@ -1021,19 +1018,15 @@ mod tests {
);
let strategy = CacheStrategy::EnableAll(cache.clone());
strategy.evict_puffin_cache(region_file_id).await;
strategy.evict_puffin_cache(index_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_none()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
}
}

View File

@@ -71,7 +71,7 @@ impl FileCacheInner {
fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
match file_type {
FileType::Parquet => &self.parquet_index,
FileType::Puffin => &self.puffin_index,
FileType::Puffin { .. } => &self.puffin_index,
}
}
@@ -130,7 +130,7 @@ impl FileCacheInner {
// Track sizes separately for each file type
match key.file_type {
FileType::Parquet => parquet_size += size,
FileType::Puffin => puffin_size += size,
FileType::Puffin { .. } => puffin_size += size,
}
}
// The metric is a signed int gauge, so we can update it at the end.
@@ -178,7 +178,7 @@ impl FileCacheInner {
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
FileType::Puffin { .. } => "download_puffin",
}])
.start_timer();
@@ -607,7 +607,7 @@ impl fmt::Display for IndexKey {
"{}.{}.{}",
self.region_id.as_u64(),
self.file_id,
self.file_type.as_str()
self.file_type
)
}
}
@@ -618,7 +618,16 @@ pub enum FileType {
/// Parquet file.
Parquet,
/// Puffin file.
Puffin,
Puffin(u64),
}
/// Formats the file type as the suffix used in an index key string:
/// `parquet` for parquet files and `<version>.puffin` for puffin files.
impl fmt::Display for FileType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let FileType::Puffin(version) = self {
            // The version always precedes the extension, e.g. `0.puffin`.
            return write!(f, "{version}.puffin");
        }
        f.write_str("parquet")
    }
}
impl FileType {
@@ -626,16 +635,16 @@ impl FileType {
fn parse(s: &str) -> Option<FileType> {
match s {
"parquet" => Some(FileType::Parquet),
"puffin" => Some(FileType::Puffin),
_ => None,
}
}
/// Converts the file type to string.
fn as_str(&self) -> &'static str {
match self {
FileType::Parquet => "parquet",
FileType::Puffin => "puffin",
"puffin" => Some(FileType::Puffin(0)),
_ => {
// If the string ends with `.puffin`, try to parse the part before it as the version.
if let Some(version_str) = s.strip_suffix(".puffin") {
let version = version_str.parse::<u64>().ok()?;
Some(FileType::Puffin(version))
} else {
None
}
}
}
}
@@ -643,7 +652,7 @@ impl FileType {
fn metric_label(&self) -> &'static str {
match self {
FileType::Parquet => FILE_TYPE,
FileType::Puffin => INDEX_TYPE,
FileType::Puffin(_) => INDEX_TYPE,
}
}
}
@@ -921,6 +930,15 @@ mod tests {
IndexKey::new(region_id, file_id, FileType::Parquet),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
);
assert_eq!(
IndexKey::new(region_id, file_id, FileType::Puffin(0)),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
);
assert_eq!(
IndexKey::new(region_id, file_id, FileType::Puffin(42)),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
.unwrap()
);
assert!(parse_index_key("").is_none());
assert!(parse_index_key(".").is_none());
assert!(parse_index_key("5299989643269").is_none());

View File

@@ -25,6 +25,7 @@ use store_api::storage::{ColumnId, FileId};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
@@ -35,8 +36,10 @@ pub enum Tag {
Fulltext,
}
pub type BloomFilterIndexKey = (FileId, IndexVersion, ColumnId, Tag);
/// Cache for bloom filter index.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
pub type BloomFilterIndexCache = IndexCache<BloomFilterIndexKey, BloomFilterMeta>;
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
impl BloomFilterIndexCache {
@@ -59,11 +62,9 @@ impl BloomFilterIndexCache {
}
/// Calculates weight for bloom filter index metadata.
fn bloom_filter_index_metadata_weight(
k: &(FileId, ColumnId, Tag),
meta: &Arc<BloomFilterMeta>,
) -> u32 {
fn bloom_filter_index_metadata_weight(k: &BloomFilterIndexKey, meta: &Arc<BloomFilterMeta>) -> u32 {
let base = k.0.as_bytes().len()
+ std::mem::size_of::<IndexVersion>()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();
@@ -75,16 +76,14 @@ fn bloom_filter_index_metadata_weight(
}
/// Calculates weight for bloom filter index content.
fn bloom_filter_index_content_weight(
(k, _): &((FileId, ColumnId, Tag), PageKey),
v: &Bytes,
) -> u32 {
fn bloom_filter_index_content_weight((k, _): &(BloomFilterIndexKey, PageKey), v: &Bytes) -> u32 {
(k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}
/// Bloom filter index blob reader with cache.
pub struct CachedBloomFilterIndexBlobReader<R> {
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
tag: Tag,
blob_size: u64,
@@ -96,6 +95,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
/// Creates a new bloom filter index blob reader with cache.
pub fn new(
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
tag: Tag,
blob_size: u64,
@@ -104,6 +104,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
) -> Self {
Self {
file_id,
index_version,
column_id,
tag,
blob_size,
@@ -126,7 +127,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
let (result, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
self.blob_size,
offset,
size,
@@ -161,7 +162,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
let (page, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
self.blob_size,
range.start,
(range.end - range.start) as u32,
@@ -191,9 +192,9 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
if let Some(cached) = self
.cache
.get_metadata((self.file_id, self.column_id, self.tag))
if let Some(cached) =
self.cache
.get_metadata((self.file_id, self.index_version, self.column_id, self.tag))
{
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
@@ -203,7 +204,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
} else {
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
Arc::new(meta.clone()),
);
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
@@ -223,6 +224,7 @@ mod test {
#[test]
fn bloom_filter_metadata_weight_counts_vec_contents() {
let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
let version = 0;
let column_id: ColumnId = 42;
let tag = Tag::Skipping;
@@ -246,10 +248,13 @@ mod test {
],
};
let weight =
bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));
let weight = bloom_filter_index_metadata_weight(
&(file_id, version, column_id, tag),
&Arc::new(meta.clone()),
);
let base = file_id.as_bytes().len()
+ std::mem::size_of::<IndexVersion>()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();

View File

@@ -26,11 +26,12 @@ use store_api::storage::FileId;
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Cache for inverted index.
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
pub type InvertedIndexCache = IndexCache<(FileId, IndexVersion), InvertedIndexMetas>;
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
impl InvertedIndexCache {
@@ -48,23 +49,24 @@ impl InvertedIndexCache {
/// Removes all cached entries for the given `file_id`.
pub fn invalidate_file(&self, file_id: FileId) {
self.invalidate_if(move |key| *key == file_id);
self.invalidate_if(move |key| key.0 == file_id);
}
}
/// Calculates weight for inverted index metadata.
fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
(k.as_bytes().len() + v.encoded_len()) as u32
fn inverted_index_metadata_weight(k: &(FileId, IndexVersion), v: &Arc<InvertedIndexMetas>) -> u32 {
    // Key part: raw bytes of the file id plus the fixed-size index version.
    let (file_id, _) = k;
    let key_size = file_id.as_bytes().len() + size_of::<IndexVersion>();
    // Value part: encoded length of the metadata.
    let weight = key_size + v.encoded_len();
    weight as u32
}
/// Calculates weight for inverted index content.
fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
(k.as_bytes().len() + v.len()) as u32
fn inverted_index_content_weight((k, _): &((FileId, IndexVersion), PageKey), v: &Bytes) -> u32 {
    // Key part: raw bytes of the file id plus the fixed-size index version.
    let (file_id, _) = k;
    let key_size = file_id.as_bytes().len() + size_of::<IndexVersion>();
    // Value part: length of the cached page payload.
    let weight = key_size + v.len();
    weight as u32
}
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
file_id: FileId,
index_version: IndexVersion,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
@@ -72,9 +74,16 @@ pub struct CachedInvertedIndexBlobReader<R> {
impl<R> CachedInvertedIndexBlobReader<R> {
/// Creates a new inverted index blob reader with cache.
pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
pub fn new(
file_id: FileId,
index_version: IndexVersion,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
) -> Self {
Self {
file_id,
index_version,
blob_size,
inner,
cache,
@@ -96,7 +105,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let (result, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
(self.file_id, self.index_version),
self.blob_size,
offset,
size,
@@ -129,7 +138,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let (page, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
(self.file_id, self.index_version),
self.blob_size,
range.start,
(range.end - range.start) as u32,
@@ -156,7 +165,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
if let Some(cached) = self.cache.get_metadata(self.file_id) {
if let Some(cached) = self.cache.get_metadata((self.file_id, self.index_version)) {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
@@ -164,7 +173,8 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
Ok(cached)
} else {
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(self.file_id, meta.clone());
self.cache
.put_metadata((self.file_id, self.index_version), meta.clone());
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
Ok(meta)
}
@@ -299,6 +309,7 @@ mod test {
// Init a test range reader in local fs.
let mut env = TestEnv::new().await;
let file_size = blob.len() as u64;
let index_version = 0;
let store = env.init_object_store_manager();
let temp_path = "data";
store.write(temp_path, blob).await.unwrap();
@@ -314,6 +325,7 @@ mod test {
let reader = InvertedIndexBlobReader::new(range_reader);
let cached_reader = CachedInvertedIndexBlobReader::new(
FileId::random(),
index_version,
file_size,
reader,
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
@@ -450,7 +462,7 @@ mod test {
let (read, _cache_metrics) = cached_reader
.cache
.get_or_load(
cached_reader.file_id,
(cached_reader.file_id, cached_reader.index_version),
file_size,
offset,
size,

View File

@@ -215,6 +215,7 @@ impl WriteCache {
puffin_manager: self
.puffin_manager_factory
.build(store.clone(), path_provider.clone()),
write_cache_enabled: true,
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
@@ -266,7 +267,7 @@ impl WriteCache {
upload_tracker.push_uploaded_file(parquet_path);
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
let puffin_path = upload_request
.dest_path_provider
.build_index_file_path(RegionFileId::new(region_id, sst.file_id));
@@ -439,7 +440,11 @@ impl UploadTracker {
file_cache.remove(parquet_key).await;
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
let puffin_key = IndexKey::new(
self.region_id,
sst.file_id,
FileType::Puffin(sst.index_metadata.version),
);
file_cache.remove(puffin_key).await;
}
}
@@ -548,7 +553,7 @@ mod tests {
assert_eq!(remote_data.to_vec(), cache_data.to_vec());
// Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
assert!(write_cache.file_cache.contains_key(&index_key));
let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();

View File

@@ -399,7 +399,7 @@ impl DefaultCompactor {
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,

View File

@@ -77,7 +77,7 @@ pub fn new_file_handle_with_size_and_sequence(
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -135,7 +135,7 @@ use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
@@ -541,22 +541,23 @@ impl MitoEngine {
return Vec::new();
};
let Some(index_file_id) = entry.index_file_id.as_ref() else {
return Vec::new();
};
let file_id = match FileId::parse_str(index_file_id) {
let index_version = entry.index_version;
let file_id = match FileId::parse_str(&entry.file_id) {
Ok(file_id) => file_id,
Err(err) => {
warn!(
err;
"Failed to parse puffin index file id, table_dir: {}, file_id: {}",
entry.table_dir,
index_file_id
entry.file_id
);
return Vec::new();
}
};
let region_file_id = RegionFileId::new(entry.region_id, file_id);
let region_index_id = RegionIndexId::new(
RegionFileId::new(entry.region_id, file_id),
index_version,
);
let context = IndexEntryContext {
table_dir: &entry.table_dir,
index_file_path: index_file_path.as_str(),
@@ -565,7 +566,7 @@ impl MitoEngine {
region_number: entry.region_number,
region_group: entry.region_group,
region_sequence: entry.region_sequence,
file_id: index_file_id,
file_id: &entry.file_id,
index_file_size: entry.index_file_size,
node_id,
};
@@ -576,7 +577,7 @@ impl MitoEngine {
collect_index_entries_from_puffin(
manager,
region_file_id,
region_index_id,
context,
bloom_filter_cache,
inverted_index_cache,

View File

@@ -861,9 +861,10 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
#[tokio::test]
async fn test_list_ssts() {
test_list_ssts_with_format(false, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -871,9 +872,10 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
test_list_ssts_with_format(true, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -945,13 +947,13 @@ async fn test_list_ssts_with_format(
.index_file_path
.map(|p| p.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
e.index_version = 0;
format!("\n{:?}", e)
})
.sorted()
.collect::<Vec<_>>()
.join("");
assert_eq!(debug_format, expected_manifest_ssts,);
assert_eq!(debug_format, expected_manifest_ssts, "{}", debug_format);
// list from storage
let storage_entries = engine
@@ -969,7 +971,7 @@ async fn test_list_ssts_with_format(
.sorted()
.collect::<Vec<_>>()
.join("");
assert_eq!(debug_format, expected_storage_ssts,);
assert_eq!(debug_format, expected_storage_ssts, "{}", debug_format);
}
#[tokio::test]

View File

@@ -55,10 +55,10 @@ async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: R
return 0;
}
let mut index_files_count: usize = 0;
for region_file_id in scanner.file_ids() {
for region_index_id in scanner.index_ids() {
let index_path = location::index_file_path(
access_layer.table_dir(),
region_file_id,
region_index_id,
access_layer.path_type(),
);
if access_layer

View File

@@ -32,7 +32,7 @@ use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
use crate::sst::index::fulltext_index::{
INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
@@ -66,14 +66,14 @@ pub(crate) struct IndexEntryContext<'a> {
/// Collect index metadata entries present in the SST puffin file.
pub(crate) async fn collect_index_entries_from_puffin(
manager: SstPuffinManager,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
context: IndexEntryContext<'_>,
bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
inverted_index_cache: Option<InvertedIndexCacheRef>,
) -> Vec<PuffinIndexMetaEntry> {
let mut entries = Vec::new();
let reader = match manager.reader(&region_file_id).await {
let reader = match manager.reader(&region_index_id).await {
Ok(reader) => reader,
Err(err) => {
warn!(
@@ -104,7 +104,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
region_index_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
@@ -130,7 +130,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
region_index_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
@@ -172,7 +172,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::Inverted) => {
let mut inverted_entries = collect_inverted_entries(
&reader,
region_file_id,
region_index_id,
inverted_index_cache.as_ref(),
&context,
)
@@ -188,12 +188,12 @@ pub(crate) async fn collect_index_entries_from_puffin(
async fn collect_inverted_entries(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
cache: Option<&InvertedIndexCacheRef>,
context: &IndexEntryContext<'_>,
) -> Vec<PuffinIndexMetaEntry> {
// Read the inverted index blob and surface its per-column metadata entries.
let file_id = region_file_id.file_id();
let file_id = region_index_id.file_id();
let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
Ok(guard) => guard,
@@ -229,6 +229,7 @@ async fn collect_inverted_entries(
let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
let reader = CachedInvertedIndexBlobReader::new(
file_id,
region_index_id.version,
blob_size,
InvertedIndexBlobReader::new(blob_reader),
cache.clone(),
@@ -289,7 +290,7 @@ fn build_inverted_entries(
async fn try_read_bloom_meta(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
blob_type: &str,
target_key: &str,
cache: Option<&BloomFilterIndexCacheRef>,
@@ -311,7 +312,8 @@ async fn try_read_bloom_meta(
let result = match (cache, column_id, blob_size) {
(Some(cache), Some(column_id), Some(blob_size)) => {
CachedBloomFilterIndexBlobReader::new(
region_file_id.file_id(),
region_index_id.file_id(),
region_index_id.version,
column_id,
tag,
blob_size,

View File

@@ -643,7 +643,7 @@ impl RegionFlushTask {
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),

View File

@@ -330,10 +330,9 @@ impl LocalGcWorker {
// TODO(discord9): for now, ignore async index files as their design is not stable; this needs to be improved once
// the index file design is stable
let file_pairs: Vec<(FileId, FileId)> = unused_files
.iter()
.map(|file_id| (*file_id, *file_id))
.collect();
let file_pairs: Vec<(FileId, u64)> =
unused_files.iter().map(|file_id| (*file_id, 0)).collect();
// TODO(discord9): the gc worker needs another major refactor to support versioned index files
debug!(
"Found {} unused index files to delete for region {}",
@@ -354,7 +353,7 @@ impl LocalGcWorker {
Ok(unused_files)
}
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
delete_files(
region_id,
file_ids,

View File

@@ -247,7 +247,7 @@ async fn checkpoint_with_different_compression_types() {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -312,7 +312,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,

View File

@@ -135,6 +135,14 @@ impl Scanner {
}
}
pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
match self {
Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
Scanner::Series(series_scan) => series_scan.input().index_ids(),
}
}
/// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
@@ -1162,6 +1170,10 @@ impl ScanInput {
/// Returns the file id of every file in this input, in iteration order.
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
    let mut ids = Vec::new();
    for handle in self.files.iter() {
        ids.push(handle.file_id());
    }
    ids
}
/// Returns the index id of every file in this input, in iteration order.
pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
    let mut ids = Vec::new();
    for handle in self.files.iter() {
        ids.push(handle.index_id());
    }
    ids
}
}
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {

View File

@@ -617,17 +617,16 @@ impl MitoRegion {
.map(|meta| {
let region_id = self.region_id;
let origin_region_id = meta.region_id;
let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
{
let index_file_path =
index_file_path(table_dir, meta.index_file_id(), path_type);
let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
(
Some(meta.index_file_id().file_id().to_string()),
meta.index_version,
Some(index_file_path),
Some(meta.index_file_size),
)
} else {
(None, None, None)
(0, None, None)
};
let visible = visible_ssts.contains(&meta.file_id);
ManifestSstEntry {
@@ -638,7 +637,7 @@ impl MitoRegion {
region_group: region_id.region_group(),
region_sequence: region_id.region_sequence(),
file_id: meta.file_id.to_string(),
index_file_id,
index_version,
level: meta.level,
file_path: sst_file_path(table_dir, meta.file_id(), path_type),
file_size: meta.file_size,

View File

@@ -63,7 +63,7 @@ use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
@@ -867,8 +867,8 @@ impl RegionLoadCacheTask {
if file_meta.exists_index() {
let puffin_key = IndexKey::new(
file_meta.region_id,
file_meta.index_file_id().file_id(),
FileType::Puffin,
file_meta.file_id,
FileType::Puffin(file_meta.index_version),
);
if !file_cache.contains_key(&puffin_key) {
@@ -925,12 +925,18 @@ impl RegionLoadCacheTask {
break;
}
let index_remote_path = location::index_file_path(
table_dir,
let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
version
} else {
unreachable!("`files_to_download` should only contains Puffin files");
};
let index_id = RegionIndexId::new(
RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
path_type,
index_version,
);
let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
match file_cache
.download(puffin_key, &index_remote_path, object_store, file_size)
.await

View File

@@ -428,7 +428,7 @@ mod tests {
available_indexes: SmallVec::new(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 100,
num_row_groups: 1,
sequence: NonZeroU64::new(1),

View File

@@ -82,6 +82,8 @@ pub type Level = u8;
pub const MAX_LEVEL: Level = 2;
/// Type to store index types for a column.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
/// Version number of an index file.
///
/// Paired with a region file id in `RegionIndexId` to identify one index file.
pub type IndexVersion = u64;
/// Cross-region file id.
///
@@ -117,6 +119,41 @@ impl fmt::Display for RegionFileId {
}
}
/// Unique identifier for an index file, combining the SST file ID and the index version.
///
/// Equality and hashing are derived, so two ids match only when both the
/// region file id and the version are equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
/// Region-qualified id of the SST file.
pub file_id: RegionFileId,
/// Version of the index file.
pub version: IndexVersion,
}
impl RegionIndexId {
pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
Self { file_id, version }
}
pub fn region_id(&self) -> RegionId {
self.file_id.region_id
}
pub fn file_id(&self) -> FileId {
self.file_id.file_id
}
}
impl fmt::Display for RegionIndexId {
    /// Formats the id as `{region_id}/{file_id}` when the version is 0 and as
    /// `{region_id}/{file_id}.{version}` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let region = &self.file_id.region_id;
        let file = &self.file_id.file_id;
        match self.version {
            // Version 0 is printed without a version suffix.
            0 => write!(f, "{}/{}", region, file),
            version => write!(f, "{}/{}.{}", region, file, version),
        }
    }
}
/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
@@ -159,12 +196,10 @@ pub struct FileMeta {
pub indexes: Vec<ColumnIndexMetadata>,
/// Size of the index file.
pub index_file_size: u64,
/// File ID of the index file.
///
/// When this field is None, it means the index file id is the same as the file id.
/// Only meaningful when index_file_size > 0.
/// Used for rebuilding index files.
pub index_file_id: Option<FileId>,
/// Version of the index file.
/// Used to generate the index file name: "{file_id}.{index_version}.puffin".
/// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
pub index_version: u64,
/// Number of rows in the file.
///
/// For historical reasons, this field might be missing in old files. Thus
@@ -332,14 +367,9 @@ impl FileMeta {
RegionFileId::new(self.region_id, self.file_id)
}
/// Returns the cross-region index file id.
/// If the index file id is not set, returns the file id.
pub fn index_file_id(&self) -> RegionFileId {
if let Some(index_file_id) = self.index_file_id {
RegionFileId::new(self.region_id, index_file_id)
} else {
self.file_id()
}
/// Returns the [`RegionIndexId`] of this file: its region file id combined
/// with the index version recorded in the metadata.
pub fn index_id(&self) -> RegionIndexId {
    let region_file_id = self.file_id();
    let version = self.index_version;
    RegionIndexId::new(region_file_id, version)
}
}
@@ -376,14 +406,9 @@ impl FileHandle {
RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
}
/// Returns the cross-region index file id.
/// If the index file id is not set, returns the file id.
pub fn index_file_id(&self) -> RegionFileId {
if let Some(index_file_id) = self.inner.meta.index_file_id {
RegionFileId::new(self.inner.meta.region_id, index_file_id)
} else {
self.file_id()
}
/// Returns the [`RegionIndexId`] of this file: its region file id combined
/// with the index version stored in the handle's file metadata.
pub fn index_id(&self) -> RegionIndexId {
    let region_file_id = self.file_id();
    let version = self.inner.meta.index_version;
    RegionIndexId::new(region_file_id, version)
}
/// Returns the complete file path of the file.
@@ -468,10 +493,15 @@ impl FileHandleInner {
}
}
/// Delete
/// Delete files for a region.
/// - `region_id`: Region id.
/// - `file_ids`: List of (file id, index version) tuples to delete.
/// - `delete_index`: Whether to delete the index file from the cache.
/// - `access_layer`: Access layer to delete files.
/// - `cache_manager`: Cache manager to remove files from cache.
pub async fn delete_files(
region_id: RegionId,
file_ids: &[(FileId, FileId)],
file_ids: &[(FileId, u64)],
delete_index: bool,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
@@ -484,12 +514,12 @@ pub async fn delete_files(
}
let mut deleted_files = Vec::with_capacity(file_ids.len());
for (file_id, index_file_id) in file_ids {
for (file_id, index_version) in file_ids {
let region_file_id = RegionFileId::new(region_id, *file_id);
match access_layer
.delete_sst(
&RegionFileId::new(region_id, *file_id),
&RegionFileId::new(region_id, *index_file_id),
&region_file_id,
&RegionIndexId::new(region_file_id, *index_version),
)
.await
{
@@ -509,12 +539,16 @@ pub async fn delete_files(
deleted_files
);
for (file_id, index_file_id) in file_ids {
for (file_id, index_version) in file_ids {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin))
.remove(IndexKey::new(
region_id,
*file_id,
FileType::Puffin(*index_version),
))
.await;
}
@@ -527,11 +561,14 @@ pub async fn delete_files(
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionFileId::new(region_id, *index_file_id))
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, *file_id),
*index_version,
))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
index_file_id, region_id);
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
file_id, index_version, region_id);
}
}
Ok(())
@@ -563,7 +600,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -614,7 +651,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,

View File

@@ -129,7 +129,7 @@ impl LocalFilePurger {
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = delete_files(
file_meta.region_id,
&[(file_meta.file_id, file_meta.index_file_id().file_id())],
&[(file_meta.file_id, file_meta.index_id().version)],
file_meta.exists_index(),
&sst_layer,
&cache_manager,
@@ -187,6 +187,7 @@ mod tests {
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{
ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
RegionIndexId,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -237,7 +238,7 @@ mod tests {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -263,6 +264,7 @@ mod tests {
let dir_path = dir.path().display().to_string();
let builder = Fs::default().root(&dir_path);
let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
let index_file_id = RegionIndexId::new(sst_file_id, 0);
let sst_dir = "table1";
let index_aux_path = dir.path().join("index_aux");
@@ -285,7 +287,7 @@ mod tests {
let path = location::sst_file_path(sst_dir, sst_file_id, layer.path_type());
object_store.write(&path, vec![0; 4096]).await.unwrap();
let index_path = location::index_file_path(sst_dir, sst_file_id, layer.path_type());
let index_path = location::index_file_path(sst_dir, index_file_id, layer.path_type());
object_store
.write(&index_path, vec![0; 4096])
.await
@@ -309,7 +311,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 4096,
index_file_id: None,
index_version: 0,
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),

View File

@@ -230,7 +230,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 4096,
index_file_id: None,
index_version: 0,
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),

View File

@@ -61,7 +61,7 @@ use crate::request::{
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{
ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId,
ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
@@ -81,6 +81,8 @@ pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
pub struct IndexOutput {
/// Size of the file.
pub file_size: u64,
/// Index version.
pub version: u64,
/// Inverted index output.
pub inverted_index: InvertedIndexOutput,
/// Fulltext index output.
@@ -163,7 +165,9 @@ pub type BloomFilterOutput = IndexBaseOutput;
pub struct Indexer {
file_id: FileId,
region_id: RegionId,
index_version: u64,
puffin_manager: Option<SstPuffinManager>,
write_cache_enabled: bool,
inverted_indexer: Option<InvertedIndexer>,
last_mem_inverted_index: usize,
fulltext_indexer: Option<FulltextIndexer>,
@@ -236,7 +240,7 @@ impl Indexer {
#[async_trait::async_trait]
pub trait IndexerBuilder {
/// Builds indexer of given file id to [index_file_path].
async fn build(&self, file_id: FileId) -> Indexer;
async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
}
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
@@ -244,6 +248,7 @@ pub(crate) struct IndexerBuilderImpl {
pub(crate) metadata: RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) puffin_manager: SstPuffinManager,
pub(crate) write_cache_enabled: bool,
pub(crate) intermediate_manager: IntermediateManager,
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
@@ -254,10 +259,12 @@ pub(crate) struct IndexerBuilderImpl {
#[async_trait::async_trait]
impl IndexerBuilder for IndexerBuilderImpl {
/// Sanity check for arguments and create a new [Indexer] if arguments are valid.
async fn build(&self, file_id: FileId) -> Indexer {
async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
let mut indexer = Indexer {
file_id,
region_id: self.metadata.region_id,
index_version,
write_cache_enabled: self.write_cache_enabled,
..Default::default()
};
@@ -611,13 +618,20 @@ impl IndexBuildTask {
&mut self,
version_control: VersionControlRef,
) -> Result<IndexBuildOutcome> {
let index_file_id = if self.file_meta.index_file_size > 0 {
// Generate new file ID if index file exists to avoid overwrite.
FileId::random()
// Determine the new index version
let new_index_version = if self.file_meta.index_file_size > 0 {
// Increment version if index file exists to avoid overwrite.
self.file_meta.index_version + 1
} else {
self.file_meta.file_id
0 // Default version for new index files
};
let mut indexer = self.indexer_builder.build(index_file_id).await;
// Use the same file_id but with new version for index file
let index_file_id = self.file_meta.file_id;
let mut indexer = self
.indexer_builder
.build(index_file_id, new_index_version)
.await;
// Check SST file existence before building index to avoid failure of parquet reader.
if !self.check_sst_file_exists(&version_control).await {
@@ -677,10 +691,10 @@ impl IndexBuildTask {
}
// Upload index file if write cache is enabled.
self.maybe_upload_index_file(index_output.clone(), index_file_id)
self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
.await?;
let worker_request = match self.update_manifest(index_output, index_file_id).await {
let worker_request = match self.update_manifest(index_output, new_index_version).await {
Ok(edit) => {
let index_build_finished = IndexBuildFinished {
region_id: self.file_meta.region_id,
@@ -712,6 +726,7 @@ impl IndexBuildTask {
&self,
output: IndexOutput,
index_file_id: FileId,
index_version: u64,
) -> Result<()> {
if let Some(write_cache) = &self.write_cache {
let file_id = self.file_meta.file_id;
@@ -719,12 +734,14 @@ impl IndexBuildTask {
let remote_store = self.access_layer.object_store();
let mut upload_tracker = UploadTracker::new(region_id);
let mut err = None;
let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
let puffin_key =
IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
let puffin_path = RegionFilePathFactory::new(
self.access_layer.table_dir().to_string(),
self.access_layer.path_type(),
)
.build_index_file_path(RegionFileId::new(region_id, file_id));
.build_index_file_path_with_version(index_id);
if let Err(e) = write_cache
.upload(puffin_key, &puffin_path, remote_store)
.await
@@ -756,12 +773,12 @@ impl IndexBuildTask {
async fn update_manifest(
&mut self,
output: IndexOutput,
index_file_id: FileId,
new_index_version: u64,
) -> Result<RegionEdit> {
self.file_meta.available_indexes = output.build_available_indexes();
self.file_meta.indexes = output.build_indexes();
self.file_meta.index_file_size = output.file_size;
self.file_meta.index_file_id = Some(index_file_id);
self.file_meta.index_version = new_index_version;
let edit = RegionEdit {
files_to_add: vec![self.file_meta.clone()],
files_to_remove: vec![],
@@ -1163,6 +1180,10 @@ mod tests {
unreachable!()
}
/// Test stub: panics via `unreachable!()` if it is ever invoked.
// NOTE(review): presumably the tests using this provider never resolve a versioned index path; confirm.
fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
unreachable!()
}
/// Test stub: panics via `unreachable!()` if it is ever invoked.
// NOTE(review): presumably the tests using this provider never resolve an SST path; confirm.
fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
unreachable!()
}
@@ -1236,6 +1257,7 @@ mod tests {
metadata,
row_group_size: 1024,
puffin_manager,
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1260,13 +1282,14 @@ mod tests {
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1290,6 +1313,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig {
@@ -1299,7 +1323,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -1311,6 +1335,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1320,7 +1345,7 @@ mod tests {
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1332,6 +1357,7 @@ mod tests {
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1341,7 +1367,7 @@ mod tests {
..Default::default()
},
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1365,13 +1391,14 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -1388,13 +1415,14 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1411,13 +1439,14 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1441,13 +1470,14 @@ mod tests {
metadata,
row_group_size: 0,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -1619,7 +1649,7 @@ mod tests {
let puffin_path = location::index_file_path(
env.access_layer.table_dir(),
RegionFileId::new(region_id, file_meta.file_id),
RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
env.access_layer.path_type(),
);
@@ -1761,6 +1791,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: write_cache.build_puffin_manager().clone(),
write_cache_enabled: true,
intermediate_manager: write_cache.intermediate_manager().clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1812,7 +1843,11 @@ mod tests {
}
// The write cache should contain the uploaded index file.
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
let index_key = IndexKey::new(
region_id,
file_meta.file_id,
FileType::Puffin(sst_info.index_metadata.version),
);
assert!(write_cache.file_cache().contains_key(&index_key));
}

View File

@@ -44,7 +44,7 @@ use crate::error::{
Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
@@ -200,7 +200,7 @@ impl BloomFilterIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
@@ -242,6 +242,7 @@ impl BloomFilterIndexApplier {
}
let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(),
file_id.version,
*column_id,
Tag::Skipping,
blob_size,
@@ -286,7 +287,7 @@ impl BloomFilterIndexApplier {
/// Returns `None` if the column does not have an index.
async fn blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
file_size_hint: Option<u64>,
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
@@ -328,7 +329,7 @@ impl BloomFilterIndexApplier {
/// Creates a blob reader from the cached index file
async fn cached_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
@@ -336,7 +337,11 @@ impl BloomFilterIndexApplier {
return Ok(None);
};
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
@@ -369,7 +374,7 @@ impl BloomFilterIndexApplier {
/// Creates a blob reader from the remote index file
async fn remote_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
@@ -446,6 +451,7 @@ mod tests {
use store_api::storage::FileId;
use super::*;
use crate::sst::file::RegionFileId;
use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
use crate::sst::index::bloom_filter::creator::tests::{
mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
@@ -457,7 +463,7 @@ mod tests {
object_store: ObjectStore,
metadata: &RegionMetadata,
puffin_manager_factory: PuffinManagerFactory,
file_id: RegionFileId,
file_id: RegionIndexId,
) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
+ use<'_> {
move |exprs, row_groups| {
@@ -514,6 +520,7 @@ mod tests {
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
let memory_usage_threshold = Some(1024);
let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
let file_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let mut indexer = BloomFilterIndexer::new(

View File

@@ -481,7 +481,7 @@ pub(crate) mod tests {
use super::*;
use crate::access_layer::FilePathProvider;
use crate::read::BatchColumn;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
pub fn mock_object_store() -> ObjectStore {
@@ -499,6 +499,10 @@ pub(crate) mod tests {
file_id.file_id().to_string()
}
// The index version is ignored here: the returned path is just the string form of the file id.
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
index_id.file_id.file_id().to_string()
}
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
file_id.file_id().to_string()
}
@@ -621,6 +625,7 @@ pub(crate) mod tests {
let puffin_manager = factory.build(object_store, TestPathProvider);
let file_id = RegionFileId::new(region_metadata.region_id, file_id);
let file_id = RegionIndexId::new(file_id, 0);
let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
assert_eq!(row_count, 20);

View File

@@ -44,7 +44,7 @@ use crate::error::{
PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
@@ -221,7 +221,7 @@ impl FulltextIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_fine(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> {
@@ -275,7 +275,7 @@ impl FulltextIndexApplier {
async fn apply_fine_one_column(
&self,
file_size_hint: Option<u64>,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
request: &FulltextRequest,
metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -356,7 +356,7 @@ impl FulltextIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_coarse(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -405,7 +405,7 @@ impl FulltextIndexApplier {
async fn apply_coarse_one_column(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
column_id: ColumnId,
terms: &[FulltextTerm],
@@ -440,6 +440,7 @@ impl FulltextIndexApplier {
.content_length;
let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(),
file_id.version,
column_id,
Tag::Fulltext,
blob_size,
@@ -611,7 +612,7 @@ impl IndexSource {
/// Returns `None` if the blob is not found.
async fn blob(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
key: &str,
file_size_hint: Option<u64>,
metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -649,7 +650,7 @@ impl IndexSource {
/// Returns `None` if the directory is not found.
async fn dir(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
key: &str,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -699,7 +700,7 @@ impl IndexSource {
/// Returns the reader and whether it fell back to the remote store.
async fn ensure_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<(SstPuffinReader, bool)> {
match self.build_local_cache(file_id, file_size_hint).await {
@@ -711,14 +712,18 @@ impl IndexSource {
async fn build_local_cache(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
@@ -740,7 +745,7 @@ impl IndexSource {
async fn build_remote(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<SstPuffinReader> {
let puffin_manager = self

View File

@@ -481,7 +481,7 @@ mod tests {
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::read::{Batch, BatchColumn};
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::fulltext_index::applier::builder::{
FulltextQuery, FulltextRequest, FulltextTerm,
@@ -672,7 +672,8 @@ mod tests {
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
let mut writer = puffin_manager.writer(&region_file_id).await.unwrap();
let index_id = RegionIndexId::new(region_file_id, 0);
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
let _ = indexer.finish(&mut writer).await.unwrap();
writer.finish().await.unwrap();
@@ -723,16 +724,15 @@ mod tests {
let backend = backend.clone();
async move {
match backend {
FulltextBackend::Tantivy => applier
.apply_fine(region_file_id, None, None)
.await
.unwrap(),
FulltextBackend::Tantivy => {
applier.apply_fine(index_id, None, None).await.unwrap()
}
FulltextBackend::Bloom => {
let coarse_mask = coarse_mask.unwrap_or_default();
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
// row group id == row id
let resp = applier
.apply_coarse(region_file_id, None, row_groups, None)
.apply_coarse(index_id, None, row_groups, None)
.await
.unwrap();
resp.map(|r| {

View File

@@ -14,6 +14,8 @@
use common_telemetry::warn;
use crate::access_layer::TempFileCleaner;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::Indexer;
impl Indexer {
@@ -22,6 +24,9 @@ impl Indexer {
self.do_abort_fulltext_index().await;
self.do_abort_bloom_filter().await;
self.do_prune_intm_sst_dir().await;
if self.write_cache_enabled {
self.do_abort_clean_fs_temp_dir().await;
}
self.puffin_manager = None;
}
@@ -87,4 +92,18 @@ impl Indexer {
);
}
}
/// Calls `TempFileCleaner::clean_atomic_dir_files` on the object store behind the
/// puffin file accessor, passing the string form of this indexer's `RegionIndexId`
/// (region id, file id and index version).
///
/// Does nothing when the indexer holds no puffin manager.
async fn do_abort_clean_fs_temp_dir(&mut self) {
    let accessor = match &self.puffin_manager {
        Some(manager) => manager.file_accessor(),
        None => return,
    };

    let region_file_id = RegionFileId::new(self.region_id, self.file_id);
    let handle = RegionIndexId::new(region_file_id, self.index_version).to_string();

    let object_store = accessor.store().store();
    TempFileCleaner::clean_atomic_dir_files(object_store, &[&handle]).await;
}
}

View File

@@ -16,7 +16,7 @@ use common_telemetry::{debug, warn};
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use store_api::storage::ColumnId;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{
@@ -56,14 +56,18 @@ impl Indexer {
self.do_prune_intm_sst_dir().await;
output.file_size = self.do_finish_puffin_writer(writer).await;
output.version = self.index_version;
output
}
async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
let puffin_manager = self.puffin_manager.take()?;
let puffin_manager = self.puffin_manager.clone()?;
let err = match puffin_manager
.writer(&RegionFileId::new(self.region_id, self.file_id))
.writer(&RegionIndexId::new(
RegionFileId::new(self.region_id, self.file_id),
self.index_version,
))
.await
{
Ok(writer) => return Some(writer),

View File

@@ -40,7 +40,7 @@ use crate::error::{
ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
@@ -194,7 +194,7 @@ impl InvertedIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> {
@@ -222,6 +222,7 @@ impl InvertedIndexApplier {
let result = if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id.file_id(),
file_id.version,
blob_size,
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
@@ -268,14 +269,18 @@ impl InvertedIndexApplier {
/// Creates a blob reader from the cached index file.
async fn cached_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
@@ -303,7 +308,7 @@ impl InvertedIndexApplier {
/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let puffin_manager = self
@@ -349,6 +354,7 @@ mod tests {
use store_api::storage::FileId;
use super::*;
use crate::sst::index::RegionFileId;
#[tokio::test]
async fn test_index_applier_apply_basic() {
@@ -356,13 +362,14 @@ mod tests {
PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = RegionFileId::new(0.into(), FileId::random());
let index_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
writer
.put_blob(
INDEX_BLOB_TYPE,
@@ -392,7 +399,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
let output = sst_index_applier.apply(index_id, None, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
@@ -410,13 +417,14 @@ mod tests {
.await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = RegionFileId::new(0.into(), FileId::random());
let index_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
writer
.put_blob(
"invalid_blob_type",
@@ -440,7 +448,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let res = sst_index_applier.apply(file_id, None, None).await;
let res = sst_index_applier.apply(index_id, None, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}

View File

@@ -466,7 +466,7 @@ mod tests {
use crate::cache::index::inverted_index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -591,7 +591,8 @@ mod tests {
);
let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
let index_id = RegionIndexId::new(sst_file_id, 0);
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
let (row_count, _) = creator.finish(&mut writer).await.unwrap();
assert_eq!(row_count, rows.len() * segment_row_count);
writer.finish().await.unwrap();
@@ -615,7 +616,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id, None, None)
.apply(index_id, None, None)
.await
.unwrap()
.matched_segment_ids

View File

@@ -32,14 +32,14 @@ use crate::metrics::{
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, StagerMetrics,
};
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager<RegionFileId>>, ObjectStorePuffinFileAccessor>;
FsPuffinManager<Arc<BoundedStager<RegionIndexId>>, ObjectStorePuffinFileAccessor>;
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
@@ -52,7 +52,7 @@ const STAGING_DIR: &str = "staging";
#[derive(Clone)]
pub struct PuffinManagerFactory {
/// The stager used by the puffin manager.
stager: Arc<BoundedStager<RegionFileId>>,
stager: Arc<BoundedStager<RegionIndexId>>,
/// The size of the write buffer used to create object store.
write_buffer_size: Option<usize>,
@@ -92,7 +92,7 @@ impl PuffinManagerFactory {
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
pub(crate) async fn purge_stager(&self, file_id: RegionFileId) -> Result<()> {
pub(crate) async fn purge_stager(&self, file_id: RegionIndexId) -> Result<()> {
self.stager
.purge(&file_id)
.await
@@ -136,16 +136,22 @@ impl ObjectStorePuffinFileAccessor {
path_provider,
}
}
/// Returns a reference to the `InstrumentedStore` held by this accessor.
pub fn store(&self) -> &InstrumentedStore {
&self.object_store
}
}
#[async_trait]
impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
type Reader = InstrumentedRangeReader;
type Writer = InstrumentedAsyncWrite;
type FileHandle = RegionFileId;
type FileHandle = RegionIndexId;
async fn reader(&self, handle: &RegionFileId) -> PuffinResult<Self::Reader> {
let file_path = self.path_provider.build_index_file_path(*handle);
async fn reader(&self, handle: &RegionIndexId) -> PuffinResult<Self::Reader> {
let file_path = self
.path_provider
.build_index_file_path_with_version(*handle);
self.object_store
.range_reader(
&file_path,
@@ -157,8 +163,10 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
.context(puffin_error::ExternalSnafu)
}
async fn writer(&self, handle: &RegionFileId) -> PuffinResult<Self::Writer> {
let file_path = self.path_provider.build_index_file_path(*handle);
async fn writer(&self, handle: &RegionIndexId) -> PuffinResult<Self::Writer> {
let file_path = self
.path_provider
.build_index_file_path_with_version(*handle);
self.object_store
.writer(
&file_path,
@@ -184,7 +192,7 @@ mod tests {
use store_api::storage::FileId;
use super::*;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
struct TestFilePathProvider;
@@ -193,6 +201,10 @@ mod tests {
file_id.file_id().to_string()
}
// The index version is ignored here: the returned path is just the string form of the file id.
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
index_id.file_id.file_id().to_string()
}
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
file_id.file_id().to_string()
}
@@ -206,7 +218,7 @@ mod tests {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let manager = factory.build(object_store, TestFilePathProvider);
let file_id = RegionFileId::new(0.into(), FileId::random());
let file_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0);
let blob_key = "blob-key";
let dir_key = "dir-key";
let raw_data = b"hello world!";

View File

@@ -49,6 +49,10 @@ impl InstrumentedStore {
}
}
/// Returns a reference to the `ObjectStore` held in the `object_store` field.
pub fn store(&self) -> &ObjectStore {
&self.object_store
}
/// Set the size of the write buffer.
pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);

View File

@@ -20,7 +20,7 @@ use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
use crate::error::UnexpectedSnafu;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
/// Generate region dir from table_dir, region_id and path_type
pub fn region_dir_from_table_dir(
@@ -46,14 +46,68 @@ pub fn sst_file_path(table_dir: &str, region_file_id: RegionFileId, path_type: P
)
}
pub fn index_file_path(
/// Builds the path of an index file inside the `index` directory of its region.
///
/// The file name is `{file_id}.puffin` when `index_id.version` is 0, and
/// `{file_id}.{version}.puffin` for any other version.
pub fn index_file_path(table_dir: &str, index_id: RegionIndexId, path_type: PathType) -> String {
    let region_file_id = &index_id.file_id;
    let file_id = region_file_id.file_id();

    // Version 0 keeps the un-versioned file name.
    let filename = match index_id.version {
        0 => format!("{}.puffin", file_id),
        version => format!("{}.{}.puffin", file_id, version),
    };

    let region_dir = region_dir_from_table_dir(table_dir, region_file_id.region_id(), path_type);
    util::join_path(&util::join_dir(&region_dir, "index"), &filename)
}
/// Legacy function for backward compatibility - creates index file path using RegionFileId with version 0
pub fn index_file_path_legacy(
table_dir: &str,
region_file_id: RegionFileId,
path_type: PathType,
) -> String {
let region_dir = region_dir_from_table_dir(table_dir, region_file_id.region_id(), path_type);
let index_dir = util::join_dir(&region_dir, "index");
util::join_path(&index_dir, &format!("{}.puffin", region_file_id.file_id()))
let index_id = RegionIndexId::new(region_file_id, 0);
index_file_path(table_dir, index_id, path_type)
}
/// Parses the file ID and index version out of an index file path.
///
/// Only the last `/`-separated component of `filepath` is inspected. Two file
/// name layouts are accepted:
/// - `{file_id}.puffin`: reported as version 0.
/// - `{file_id}.{version}.puffin`: `version` is parsed as a `u64`.
///
/// Returns an error built from `UnexpectedSnafu` when the name has any other
/// layout, when the version does not parse as `u64`, or when `FileId::parse_str`
/// rejects the file ID.
pub fn parse_index_file_info(filepath: &str) -> crate::error::Result<(FileId, u64)> {
    let filename = filepath.rsplit('/').next().context(UnexpectedSnafu {
        reason: format!("invalid file path: {}", filepath),
    })?;

    // Shared by both layouts: turn the leading name component into a `FileId`.
    let to_file_id = |raw: &str| {
        FileId::parse_str(raw).map_err(|e| {
            UnexpectedSnafu {
                reason: format!("invalid file id: {}, err: {}", raw, e),
            }
            .build()
        })
    };

    let parts: Vec<&str> = filename.split('.').collect();
    match parts.as_slice() {
        // Legacy format: {file_id}.puffin (version 0)
        &[raw_id, "puffin"] => Ok((to_file_id(raw_id)?, 0)),
        // New format: {file_id}.{version}.puffin
        &[raw_id, raw_version, "puffin"] => {
            // The version is checked before the file id, so a bad version is reported first.
            let version = raw_version.parse::<u64>().map_err(|_| {
                UnexpectedSnafu {
                    reason: format!("invalid version in file name: {}", filename),
                }
                .build()
            })?;
            Ok((to_file_id(raw_id)?, version))
        }
        _ => UnexpectedSnafu {
            reason: format!("invalid index file name: {}", filename),
        }
        .fail(),
    }
}
/// Get RegionFileId from sst or index filename
@@ -111,17 +165,59 @@ mod tests {
fn test_index_file_path() {
let file_id = FileId::random();
let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id);
let index_id = RegionIndexId::new(region_file_id, 0);
assert_eq!(
index_file_path("table_dir", region_file_id, PathType::Bare),
index_file_path("table_dir", index_id, PathType::Bare),
format!("table_dir/1_0000000002/index/{}.puffin", file_id)
);
assert_eq!(
index_file_path("table_dir", region_file_id, PathType::Data),
index_file_path("table_dir", index_id, PathType::Data),
format!("table_dir/1_0000000002/data/index/{}.puffin", file_id)
);
assert_eq!(
index_file_path("table_dir", region_file_id, PathType::Metadata),
index_file_path("table_dir", index_id, PathType::Metadata),
format!("table_dir/1_0000000002/metadata/index/{}.puffin", file_id)
);
}
#[test]
fn test_index_file_path_versioned() {
    let file_id = FileId::random();
    let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id);

    // (index version, expected path) for non-zero versions.
    let cases = [
        (
            1,
            format!("table_dir/1_0000000002/index/{}.1.puffin", file_id),
        ),
        (
            2,
            format!("table_dir/1_0000000002/index/{}.2.puffin", file_id),
        ),
    ];

    for (version, expected_path) in cases {
        let index_id = RegionIndexId::new(region_file_id, version);
        assert_eq!(
            index_file_path("table_dir", index_id, PathType::Bare),
            expected_path
        );
    }
}
#[test]
fn test_parse_index_file_info() {
    let file_id = FileId::random();

    // (index file path, expected version); the first entry is the legacy,
    // un-versioned name, the others are versioned names.
    let cases = [
        (format!("table_dir/1_0000000002/index/{file_id}.puffin"), 0),
        (format!("table_dir/1_0000000002/index/{file_id}.1.puffin"), 1),
        (format!("table_dir/1_0000000002/index/{file_id}.42.puffin"), 42),
    ];

    for (path, expected_version) in cases {
        let (parsed_id, parsed_version) = parse_index_file_info(&path).unwrap();
        assert_eq!(parsed_id.to_string(), file_id.to_string());
        assert_eq!(parsed_version, expected_version);
    }
}
}

View File

@@ -117,7 +117,7 @@ mod tests {
use crate::config::IndexConfig;
use crate::read::{BatchBuilder, BatchReader, FlatSource};
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
@@ -144,7 +144,11 @@ mod tests {
impl FilePathProvider for FixedPathProvider {
fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
}
// Resolves the versioned index path under `FILE_DIR` with `PathType::Bare`, using the
// `index_id` supplied by the caller.
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
location::index_file_path(FILE_DIR, index_id, PathType::Bare)
}
fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
@@ -156,7 +160,7 @@ mod tests {
#[async_trait::async_trait]
impl IndexerBuilder for NoopIndexBuilder {
async fn build(&self, _file_id: FileId) -> Indexer {
async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
Indexer::default()
}
}
@@ -711,6 +715,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size,
puffin_manager,
write_cache_enabled: false,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
@@ -769,7 +774,7 @@ mod tests {
available_indexes: info.index_metadata.build_available_indexes(),
indexes: info.index_metadata.build_indexes(),
index_file_size: info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_row_groups: info.num_row_groups,
num_rows: info.num_rows as u64,
sequence: None,
@@ -1090,6 +1095,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size,
puffin_manager,
write_cache_enabled: false,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {

View File

@@ -558,7 +558,7 @@ impl ParquetReaderBuilder {
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply_fine(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
metrics.fulltext_index_apply_metrics.as_mut(),
)
@@ -630,7 +630,7 @@ impl ParquetReaderBuilder {
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
metrics.inverted_index_apply_metrics.as_mut(),
)
@@ -709,7 +709,7 @@ impl ParquetReaderBuilder {
});
let apply_res = index_applier
.apply(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
@@ -792,7 +792,7 @@ impl ParquetReaderBuilder {
});
let apply_res = index_applier
.apply_coarse(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
rgs,
metrics.fulltext_index_apply_metrics.as_mut(),

View File

@@ -153,7 +153,7 @@ where
metrics: &'a mut Metrics,
) -> ParquetWriter<'a, F, I, P> {
let init_file = FileId::random();
let indexer = indexer_builder.build(init_file).await;
let indexer = indexer_builder.build(init_file, 0).await;
ParquetWriter {
path_provider,
@@ -482,7 +482,7 @@ where
.context(WriteParquetSnafu)?;
self.writer = Some(arrow_writer);
let indexer = self.indexer_builder.build(self.current_file).await;
let indexer = self.indexer_builder.build(self.current_file, 0).await;
self.current_indexer = Some(indexer);
// safety: self.writer is assigned above

View File

@@ -126,7 +126,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -104,7 +104,7 @@ impl VersionControlBuilder {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,
@@ -195,7 +195,7 @@ pub(crate) fn apply_edit(
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -481,12 +481,12 @@ async fn edit_region(
let index_file_index_key = IndexKey::new(
region_id,
file_meta.index_file_id().file_id(),
FileType::Puffin,
file_meta.index_id().file_id.file_id(),
FileType::Puffin(file_meta.index_version),
);
let index_remote_path = location::index_file_path(
layer.table_dir(),
file_meta.file_id(),
file_meta.index_id(),
layer.path_type(),
);

View File

@@ -28,7 +28,7 @@ use crate::region::MitoRegionRef;
use crate::request::{
BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
};
use crate::sst::file::{FileHandle, RegionFileId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::{
IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
};
@@ -68,6 +68,7 @@ impl<S> RegionWorkerLoop<S> {
row_group_size: WriteOptions::default().row_group_size,
intermediate_manager,
puffin_manager,
write_cache_enabled: self.cache_manager.write_cache().is_some(),
});
IndexBuildTask {
@@ -216,7 +217,8 @@ impl<S> RegionWorkerLoop<S> {
let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
for file_meta in &request.edit.files_to_add {
let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
cache_strategy.evict_puffin_cache(region_file_id).await;
let index_id = RegionIndexId::new(region_file_id, file_meta.index_version);
cache_strategy.evict_puffin_cache(index_id).await;
}
region.version_control.apply_edit(

View File

@@ -56,6 +56,10 @@ impl<S, F> FsPuffinManager<S, F> {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
/// Returns a shared reference to the puffin file accessor held by this manager.
pub fn file_accessor(&self) -> &F {
&self.puffin_file_accessor
}
}
#[async_trait]

View File

@@ -47,8 +47,8 @@ pub struct ManifestSstEntry {
pub region_sequence: RegionSeq,
/// Engine-specific file identifier (string form).
pub file_id: String,
/// Engine-specific index file identifier (string form).
pub index_file_id: Option<String>,
/// Index version, incremented each time the index file is rebuilt.
pub index_version: u64,
/// SST level.
pub level: u8,
/// Full path of the SST file in object store.
@@ -91,7 +91,7 @@ impl ManifestSstEntry {
ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
ColumnSchema::new("file_id", Ty::string_datatype(), false),
ColumnSchema::new("index_file_id", Ty::string_datatype(), true),
ColumnSchema::new("index_version", Ty::uint64_datatype(), false),
ColumnSchema::new("level", Ty::uint8_datatype(), false),
ColumnSchema::new("file_path", Ty::string_datatype(), false),
ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
@@ -119,7 +119,7 @@ impl ManifestSstEntry {
let region_groups = entries.iter().map(|e| e.region_group);
let region_sequences = entries.iter().map(|e| e.region_sequence);
let file_ids = entries.iter().map(|e| e.file_id.as_str());
let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref());
let index_versions = entries.iter().map(|e| e.index_version);
let levels = entries.iter().map(|e| e.level);
let file_paths = entries.iter().map(|e| e.file_path.as_str());
let file_sizes = entries.iter().map(|e| e.file_size);
@@ -151,7 +151,7 @@ impl ManifestSstEntry {
Arc::new(UInt8Array::from_iter_values(region_groups)),
Arc::new(UInt32Array::from_iter_values(region_sequences)),
Arc::new(StringArray::from_iter_values(file_ids)),
Arc::new(StringArray::from_iter(index_file_ids)),
Arc::new(UInt64Array::from_iter(index_versions)),
Arc::new(UInt8Array::from_iter_values(levels)),
Arc::new(StringArray::from_iter_values(file_paths)),
Arc::new(UInt64Array::from_iter_values(file_sizes)),
@@ -437,7 +437,7 @@ mod tests {
region_group: region_group1,
region_sequence: region_seq1,
file_id: "f1".to_string(),
index_file_id: None,
index_version: 0,
level: 1,
file_path: "/p1".to_string(),
file_size: 100,
@@ -461,7 +461,7 @@ mod tests {
region_group: region_group2,
region_sequence: region_seq2,
file_id: "f2".to_string(),
index_file_id: Some("idx".to_string()),
index_version: 1,
level: 3,
file_path: "/p2".to_string(),
file_size: 200,
@@ -548,13 +548,13 @@ mod tests {
assert_eq!("f1", file_ids.value(0));
assert_eq!("f2", file_ids.value(1));
let index_file_ids = batch
let index_versions = batch
.column(7)
.as_any()
.downcast_ref::<StringArray>()
.downcast_ref::<UInt64Array>()
.unwrap();
assert!(index_file_ids.is_null(0));
assert_eq!("idx", index_file_ids.value(1));
assert_eq!(0, index_versions.value(0));
assert_eq!(1, index_versions.value(1));
let levels = batch
.column(8)

View File

@@ -10,7 +10,7 @@ DESC TABLE information_schema.ssts_manifest;
| region_group | UInt8 | | NO | | FIELD |
| region_sequence | UInt32 | | NO | | FIELD |
| file_id | String | | NO | | FIELD |
| index_file_id | String | | YES | | FIELD |
| index_version | UInt64 | | NO | | FIELD |
| level | UInt8 | | NO | | FIELD |
| file_path | String | | NO | | FIELD |
| file_size | UInt64 | | NO | | FIELD |
@@ -97,13 +97,13 @@ ADMIN FLUSH_TABLE('sst_case');
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
SELECT * FROM information_schema.ssts_manifest order by file_path;
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
@@ -165,15 +165,15 @@ ADMIN FLUSH_TABLE('sst_case');
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
SELECT * FROM information_schema.ssts_manifest order by file_path;
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>

View File

@@ -400,9 +400,9 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | ssts_manifest | file_id | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | file_path | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | file_size | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | index_file_id | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | ssts_manifest | index_file_path | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | ssts_manifest | index_file_size | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | index_version | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | level | 9 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
| greptime | information_schema | ssts_manifest | max_ts | 18 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
| greptime | information_schema | ssts_manifest | min_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |