Compare commits

...

14 Commits

Author SHA1 Message Date
discord9
122dfbf9f8 reviewing tests
Signed-off-by: discord9 <discord9@163.com>
2025-12-10 14:23:33 +08:00
discord9
3c7fa84442 chore
Signed-off-by: discord9 <discord9@163.com>
2025-12-10 14:17:51 +08:00
discord9
0cd368d1f6 feat: track index?
Signed-off-by: discord9 <discord9@163.com>
2025-12-10 14:09:10 +08:00
discord9
c82208dbc0 chore: unused
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
discord9
bbbe91e97a feat: better method
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
discord9
987e1b5a15 noop
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
discord9
9cac640b41 feat: delete index files properly
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
discord9
9197e818ec refactor: use versioned index for index file (#7309)
* refactor: use versioned index for index file

Signed-off-by: discord9 <discord9@163.com>

* fix: sst entry table

Signed-off-by: discord9 <discord9@163.com>

* update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: unit type

Signed-off-by: discord9 <discord9@163.com>

* fix: missing version

Signed-off-by: discord9 <discord9@163.com>

* more fix build index

Signed-off-by: discord9 <discord9@163.com>

* fix: use proper index id

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* test: update

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* test: test_list_ssts fixed

Signed-off-by: discord9 <discord9@163.com>

* test: fix test

Signed-off-by: discord9 <discord9@163.com>

* feat: stuff

Signed-off-by: discord9 <discord9@163.com>

* fix: clean temp index file on abort&delete all index version when delete file

Signed-off-by: discord9 <discord9@163.com>

* docs: explain

Signed-off-by: discord9 <discord9@163.com>

* fix: actually clean up tmp dir

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* clean tmp dir only when write cache enabled

Signed-off-by: discord9 <discord9@163.com>

* refactor: add version to index cache

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* test: update size

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-12-09 07:31:12 +00:00
discord9
36d89c3baf fix: use saturating in gc tracker (#7369)
chore: use saturating

Signed-off-by: discord9 <discord9@163.com>
2025-12-09 06:38:59 +00:00
Ruihang Xia
0ebfd161d8 feat: allow publishing new nightly release when some platforms are absent (#7354)
* feat: allow publishing new nightly release when some platforms are absent

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* unify linux platforms

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* always evaluate conditions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-09 04:59:50 +00:00
ZonaHe
8b26a98c3b feat: update dashboard to v0.11.9 (#7364)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2025-12-09 02:37:44 +00:00
discord9
7199823be9 chore: rename to avoid git reserved name (#7359)
rename to avoid reserved name

Signed-off-by: discord9 <discord9@163.com>
2025-12-08 04:01:25 +00:00
Ruihang Xia
60f752d306 feat: run histogram quantile in safe mode for incomplete data (#7297)
* initial impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test and fix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* correct sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refine code and comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-05 09:19:21 +00:00
Ruihang Xia
edb1f6086f feat: decode pk eagerly (#7350)
* feat: decode pk eagerly

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* merge primary_key_codec and decode_primary_key_values

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-05 09:11:51 +00:00
59 changed files with 2414 additions and 448 deletions

View File

@@ -49,14 +49,9 @@ on:
description: Do not run integration tests during the build
type: boolean
default: true
build_linux_amd64_artifacts:
build_linux_artifacts:
type: boolean
description: Build linux-amd64 artifacts
required: false
default: false
build_linux_arm64_artifacts:
type: boolean
description: Build linux-arm64 artifacts
description: Build linux artifacts (both amd64 and arm64)
required: false
default: false
build_macos_artifacts:
@@ -144,7 +139,7 @@ jobs:
./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"
- name: Allocate linux-amd64 runner
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
uses: ./.github/actions/start-runner
id: start-linux-amd64-runner
with:
@@ -158,7 +153,7 @@ jobs:
subnet-id: ${{ vars.EC2_RUNNER_SUBNET_ID }}
- name: Allocate linux-arm64 runner
if: ${{ inputs.build_linux_arm64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
uses: ./.github/actions/start-runner
id: start-linux-arm64-runner
with:
@@ -173,7 +168,7 @@ jobs:
build-linux-amd64-artifacts:
name: Build linux-amd64 artifacts
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
allocate-runners,
]
@@ -195,7 +190,7 @@ jobs:
build-linux-arm64-artifacts:
name: Build linux-arm64 artifacts
if: ${{ inputs.build_linux_arm64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
allocate-runners,
]
@@ -217,7 +212,7 @@ jobs:
run-multi-lang-tests:
name: Run Multi-language SDK Tests
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -386,7 +381,18 @@ jobs:
publish-github-release:
name: Create GitHub release and upload artifacts
if: ${{ inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule' }}
# Use always() to run even when optional jobs (macos, windows) are skipped.
# Then check that required jobs succeeded and optional jobs didn't fail.
if: |
always() &&
(inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule') &&
needs.allocate-runners.result == 'success' &&
(needs.build-linux-amd64-artifacts.result == 'success' || needs.build-linux-amd64-artifacts.result == 'skipped') &&
(needs.build-linux-arm64-artifacts.result == 'success' || needs.build-linux-arm64-artifacts.result == 'skipped') &&
(needs.build-macos-artifacts.result == 'success' || needs.build-macos-artifacts.result == 'skipped') &&
(needs.build-windows-artifacts.result == 'success' || needs.build-windows-artifacts.result == 'skipped') &&
(needs.release-images-to-dockerhub.result == 'success' || needs.release-images-to-dockerhub.result == 'skipped') &&
(needs.run-multi-lang-tests.result == 'success' || needs.run-multi-lang-tests.result == 'skipped')
needs: [ # The job has to wait until all the artifacts are built.
allocate-runners,
build-linux-amd64-artifacts,

View File

@@ -163,7 +163,7 @@ impl ObjbenchCommand {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows,
num_row_groups,
sequence: None,
@@ -565,6 +565,7 @@ fn new_noop_file_purger() -> FilePurgerRef {
struct Noop;
impl FilePurger for Noop {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
fn update_index(&self, _file_meta: FileMeta, _version: store_api::storage::IndexVersion) {}
}
Arc::new(Noop)
}

View File

@@ -14,7 +14,7 @@
mod basic;
mod candidate_select;
mod con;
mod concurrent;
mod config;
mod err_handle;
mod full_list;

View File

@@ -50,7 +50,7 @@ impl GcScheduler {
let now = Instant::now();
// Check if enough time has passed since last cleanup
if now.duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
if now.saturating_duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
return Ok(());
}
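
A minimal standalone sketch of why the saturating form is used in the check above (the helper below is hypothetical, not part of GcScheduler): saturating_duration_since clamps to zero when the recorded last_cleanup instant is not earlier than now, so the comparison stays well-behaved even on an odd clock reading.

use std::time::{Duration, Instant};

// Hypothetical helper mirroring the cleanup-interval check above.
fn should_skip_cleanup(now: Instant, last_cleanup: Instant, interval: Duration) -> bool {
    // Clamps to Duration::ZERO if `last_cleanup` is somehow ahead of `now`.
    now.saturating_duration_since(last_cleanup) < interval
}

fn main() {
    let now = Instant::now();
    let in_the_future = now + Duration::from_secs(5);
    // Even with a "future" last_cleanup, the elapsed time saturates to zero,
    // so the cleanup is skipped rather than computed from an underflowed duration.
    assert!(should_skip_cleanup(now, in_the_future, Duration::from_secs(60)));
}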

View File

@@ -119,7 +119,7 @@ mod tests {
.index_file_path
.map(|path| path.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
e.index_version = 0;
format!("\n{:?}", e)
})
.sorted()
@@ -128,12 +128,12 @@ mod tests {
assert_eq!(
debug_format,
r#"
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
);
// list from storage
let storage_entries = mito

View File

@@ -37,7 +37,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, RegionFileId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
@@ -216,7 +216,7 @@ impl AccessLayer {
pub(crate) async fn delete_sst(
&self,
region_file_id: &RegionFileId,
index_file_id: &RegionFileId,
index_file_id: &RegionIndexId,
) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
self.object_store
@@ -226,12 +226,22 @@ impl AccessLayer {
file_id: region_file_id.file_id(),
})?;
let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type);
// Delete all versions of the index file.
for version in 0..=index_file_id.version {
self.delete_index(&RegionIndexId::new(index_file_id.file_id, version))
.await?;
}
Ok(())
}
pub(crate) async fn delete_index(&self, region_index_id: &RegionIndexId) -> Result<()> {
let path = location::index_file_path(&self.table_dir, *region_index_id, self.path_type);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: region_file_id.file_id(),
file_id: region_index_id.file_id(),
})?;
Ok(())
@@ -291,6 +301,7 @@ impl AccessLayer {
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
write_cache_enabled: false,
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
@@ -468,9 +479,10 @@ impl TempFileCleaner {
}
/// Removes the SST and index file from the local atomic dir by the file id.
/// This only removes the initial index; since the index version is always 0 for a new SST, it is safe to pass 0 here.
pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();
Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
}
@@ -553,9 +565,12 @@ async fn clean_dir(dir: &str) -> Result<()> {
/// Path provider for SST file and index file.
pub trait FilePathProvider: Send + Sync {
/// Creates index file path of given file id.
/// Creates index file path of given file id. The version defaults to 0 and is not shown in the path.
fn build_index_file_path(&self, file_id: RegionFileId) -> String;
/// Creates index file path of given index id (with version support).
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;
/// Creates SST file path of given file id.
fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
@@ -575,7 +590,16 @@ impl WriteCachePathProvider {
impl FilePathProvider for WriteCachePathProvider {
fn build_index_file_path(&self, file_id: RegionFileId) -> String {
let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
self.file_cache.cache_file_path(puffin_key)
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
let puffin_key = IndexKey::new(
index_id.region_id(),
index_id.file_id(),
FileType::Puffin(index_id.version),
);
self.file_cache.cache_file_path(puffin_key)
}
@@ -605,7 +629,11 @@ impl RegionFilePathFactory {
impl FilePathProvider for RegionFilePathFactory {
fn build_index_file_path(&self, file_id: RegionFileId) -> String {
location::index_file_path(&self.table_dir, file_id, self.path_type)
location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
location::index_file_path(&self.table_dir, index_id, self.path_type)
}
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {

View File

@@ -44,7 +44,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
@@ -180,7 +180,7 @@ impl CacheStrategy {
}
/// Calls [CacheManager::evict_puffin_cache()].
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.evict_puffin_cache(file_id).await
@@ -400,7 +400,7 @@ impl CacheManager {
}
/// Evicts every puffin-related cache entry for the given file.
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
if let Some(cache) = &self.bloom_filter_index_cache {
cache.invalidate_file(file_id.file_id());
}
@@ -422,7 +422,7 @@ impl CacheManager {
.remove(IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin,
FileType::Puffin(file_id.version),
))
.await;
}
@@ -949,7 +949,7 @@ mod tests {
let cache = Arc::new(cache);
let region_id = RegionId::new(1, 1);
let region_file_id = RegionFileId::new(region_id, FileId::random());
let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
let column_id: ColumnId = 1;
let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
@@ -957,16 +957,21 @@ mod tests {
let result_cache = cache.index_result_cache().unwrap();
let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping);
let bloom_key = (
index_id.file_id(),
index_id.version,
column_id,
Tag::Skipping,
);
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
(index_id.file_id(), index_id.version),
Arc::new(InvertedIndexMetas::default()),
);
let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
let selection = Arc::new(RowGroupSelection::default());
result_cache.put(predicate.clone(), region_file_id.file_id(), selection);
let file_id_str = region_file_id.to_string();
result_cache.put(predicate.clone(), index_id.file_id(), selection);
let file_id_str = index_id.to_string();
let metadata = Arc::new(FileMetadata {
blobs: Vec::new(),
properties: HashMap::new(),
@@ -976,40 +981,32 @@ mod tests {
assert!(bloom_cache.get_metadata(bloom_key).is_some());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_some()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_some()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
cache.evict_puffin_cache(region_file_id).await;
cache.evict_puffin_cache(index_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_none()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
// Refill caches and evict via CacheStrategy to ensure delegation works.
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
(index_id.file_id(), index_id.version),
Arc::new(InvertedIndexMetas::default()),
);
result_cache.put(
predicate.clone(),
region_file_id.file_id(),
index_id.file_id(),
Arc::new(RowGroupSelection::default()),
);
puffin_metadata_cache.put_metadata(
@@ -1021,19 +1018,15 @@ mod tests {
);
let strategy = CacheStrategy::EnableAll(cache.clone());
strategy.evict_puffin_cache(region_file_id).await;
strategy.evict_puffin_cache(index_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_none()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
}
}

View File

@@ -71,7 +71,7 @@ impl FileCacheInner {
fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
match file_type {
FileType::Parquet => &self.parquet_index,
FileType::Puffin => &self.puffin_index,
FileType::Puffin { .. } => &self.puffin_index,
}
}
@@ -130,7 +130,7 @@ impl FileCacheInner {
// Track sizes separately for each file type
match key.file_type {
FileType::Parquet => parquet_size += size,
FileType::Puffin => puffin_size += size,
FileType::Puffin { .. } => puffin_size += size,
}
}
// The metric is a signed int gauge, so we can update it once at the end.
@@ -178,7 +178,7 @@ impl FileCacheInner {
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
FileType::Puffin { .. } => "download_puffin",
}])
.start_timer();
@@ -607,7 +607,7 @@ impl fmt::Display for IndexKey {
"{}.{}.{}",
self.region_id.as_u64(),
self.file_id,
self.file_type.as_str()
self.file_type
)
}
}
@@ -618,7 +618,16 @@ pub enum FileType {
/// Parquet file.
Parquet,
/// Puffin file.
Puffin,
Puffin(u64),
}
impl fmt::Display for FileType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FileType::Parquet => write!(f, "parquet"),
FileType::Puffin(version) => write!(f, "{}.puffin", version),
}
}
}
impl FileType {
@@ -626,16 +635,16 @@ impl FileType {
fn parse(s: &str) -> Option<FileType> {
match s {
"parquet" => Some(FileType::Parquet),
"puffin" => Some(FileType::Puffin),
_ => None,
}
}
/// Converts the file type to string.
fn as_str(&self) -> &'static str {
match self {
FileType::Parquet => "parquet",
FileType::Puffin => "puffin",
"puffin" => Some(FileType::Puffin(0)),
_ => {
// if suffixed with .puffin, try to parse the version
if let Some(version_str) = s.strip_suffix(".puffin") {
let version = version_str.parse::<u64>().ok()?;
Some(FileType::Puffin(version))
} else {
None
}
}
}
}
@@ -643,7 +652,7 @@ impl FileType {
fn metric_label(&self) -> &'static str {
match self {
FileType::Parquet => FILE_TYPE,
FileType::Puffin => INDEX_TYPE,
FileType::Puffin(_) => INDEX_TYPE,
}
}
}
@@ -921,6 +930,15 @@ mod tests {
IndexKey::new(region_id, file_id, FileType::Parquet),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
);
assert_eq!(
IndexKey::new(region_id, file_id, FileType::Puffin(0)),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
);
assert_eq!(
IndexKey::new(region_id, file_id, FileType::Puffin(42)),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
.unwrap()
);
assert!(parse_index_key("").is_none());
assert!(parse_index_key(".").is_none());
assert!(parse_index_key("5299989643269").is_none());

View File

@@ -21,7 +21,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use store_api::storage::{ColumnId, FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
@@ -35,8 +35,10 @@ pub enum Tag {
Fulltext,
}
pub type BloomFilterIndexKey = (FileId, IndexVersion, ColumnId, Tag);
/// Cache for bloom filter index.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
pub type BloomFilterIndexCache = IndexCache<BloomFilterIndexKey, BloomFilterMeta>;
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
impl BloomFilterIndexCache {
@@ -59,11 +61,9 @@ impl BloomFilterIndexCache {
}
/// Calculates weight for bloom filter index metadata.
fn bloom_filter_index_metadata_weight(
k: &(FileId, ColumnId, Tag),
meta: &Arc<BloomFilterMeta>,
) -> u32 {
fn bloom_filter_index_metadata_weight(k: &BloomFilterIndexKey, meta: &Arc<BloomFilterMeta>) -> u32 {
let base = k.0.as_bytes().len()
+ std::mem::size_of::<IndexVersion>()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();
@@ -75,16 +75,14 @@ fn bloom_filter_index_metadata_weight(
}
/// Calculates weight for bloom filter index content.
fn bloom_filter_index_content_weight(
(k, _): &((FileId, ColumnId, Tag), PageKey),
v: &Bytes,
) -> u32 {
fn bloom_filter_index_content_weight((k, _): &(BloomFilterIndexKey, PageKey), v: &Bytes) -> u32 {
(k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}
/// Bloom filter index blob reader with cache.
pub struct CachedBloomFilterIndexBlobReader<R> {
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
tag: Tag,
blob_size: u64,
@@ -96,6 +94,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
/// Creates a new bloom filter index blob reader with cache.
pub fn new(
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
tag: Tag,
blob_size: u64,
@@ -104,6 +103,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
) -> Self {
Self {
file_id,
index_version,
column_id,
tag,
blob_size,
@@ -126,7 +126,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
let (result, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
self.blob_size,
offset,
size,
@@ -161,7 +161,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
let (page, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
self.blob_size,
range.start,
(range.end - range.start) as u32,
@@ -191,9 +191,9 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
if let Some(cached) = self
.cache
.get_metadata((self.file_id, self.column_id, self.tag))
if let Some(cached) =
self.cache
.get_metadata((self.file_id, self.index_version, self.column_id, self.tag))
{
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
@@ -203,7 +203,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
} else {
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
Arc::new(meta.clone()),
);
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
@@ -223,6 +223,7 @@ mod test {
#[test]
fn bloom_filter_metadata_weight_counts_vec_contents() {
let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
let version = 0;
let column_id: ColumnId = 42;
let tag = Tag::Skipping;
@@ -246,10 +247,13 @@ mod test {
],
};
let weight =
bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));
let weight = bloom_filter_index_metadata_weight(
&(file_id, version, column_id, tag),
&Arc::new(meta.clone()),
);
let base = file_id.as_bytes().len()
+ std::mem::size_of::<IndexVersion>()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();

View File

@@ -22,7 +22,7 @@ use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::FileId;
use store_api::storage::{FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
@@ -30,7 +30,7 @@ use crate::metrics::{CACHE_HIT, CACHE_MISS};
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Cache for inverted index.
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
pub type InvertedIndexCache = IndexCache<(FileId, IndexVersion), InvertedIndexMetas>;
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
impl InvertedIndexCache {
@@ -48,23 +48,24 @@ impl InvertedIndexCache {
/// Removes all cached entries for the given `file_id`.
pub fn invalidate_file(&self, file_id: FileId) {
self.invalidate_if(move |key| *key == file_id);
self.invalidate_if(move |key| key.0 == file_id);
}
}
/// Calculates weight for inverted index metadata.
fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
(k.as_bytes().len() + v.encoded_len()) as u32
fn inverted_index_metadata_weight(k: &(FileId, IndexVersion), v: &Arc<InvertedIndexMetas>) -> u32 {
(k.0.as_bytes().len() + size_of::<IndexVersion>() + v.encoded_len()) as u32
}
/// Calculates weight for inverted index content.
fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
(k.as_bytes().len() + v.len()) as u32
fn inverted_index_content_weight((k, _): &((FileId, IndexVersion), PageKey), v: &Bytes) -> u32 {
(k.0.as_bytes().len() + size_of::<IndexVersion>() + v.len()) as u32
}
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
file_id: FileId,
index_version: IndexVersion,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
@@ -72,9 +73,16 @@ pub struct CachedInvertedIndexBlobReader<R> {
impl<R> CachedInvertedIndexBlobReader<R> {
/// Creates a new inverted index blob reader with cache.
pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
pub fn new(
file_id: FileId,
index_version: IndexVersion,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
) -> Self {
Self {
file_id,
index_version,
blob_size,
inner,
cache,
@@ -96,7 +104,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let (result, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
(self.file_id, self.index_version),
self.blob_size,
offset,
size,
@@ -129,7 +137,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let (page, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
(self.file_id, self.index_version),
self.blob_size,
range.start,
(range.end - range.start) as u32,
@@ -156,7 +164,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
if let Some(cached) = self.cache.get_metadata(self.file_id) {
if let Some(cached) = self.cache.get_metadata((self.file_id, self.index_version)) {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
@@ -164,7 +172,8 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
Ok(cached)
} else {
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(self.file_id, meta.clone());
self.cache
.put_metadata((self.file_id, self.index_version), meta.clone());
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
Ok(meta)
}
@@ -299,6 +308,7 @@ mod test {
// Init a test range reader in local fs.
let mut env = TestEnv::new().await;
let file_size = blob.len() as u64;
let index_version = 0;
let store = env.init_object_store_manager();
let temp_path = "data";
store.write(temp_path, blob).await.unwrap();
@@ -314,6 +324,7 @@ mod test {
let reader = InvertedIndexBlobReader::new(range_reader);
let cached_reader = CachedInvertedIndexBlobReader::new(
FileId::random(),
index_version,
file_size,
reader,
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
@@ -450,7 +461,7 @@ mod test {
let (read, _cache_metrics) = cached_reader
.cache
.get_or_load(
cached_reader.file_id,
(cached_reader.file_id, cached_reader.index_version),
file_size,
offset,
size,
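
To illustrate why the cache key gains an IndexVersion component, here is a toy sketch under simplifying assumptions (the types and cache below are stand-ins, not the real IndexCache): keying entries by (file id, index version) lets several index versions coexist, while invalidating by file id alone evicts every version at once, matching the `key.0 == file_id` predicate in `invalidate_file` above.

use std::collections::HashMap;

type FileId = u64;       // stand-in for store_api's FileId
type IndexVersion = u64; // stand-in for store_api's IndexVersion

struct ToyIndexCache {
    entries: HashMap<(FileId, IndexVersion), Vec<u8>>,
}

impl ToyIndexCache {
    // Drops every cached version of the given file, like `invalidate_file` above.
    fn invalidate_file(&mut self, file_id: FileId) {
        self.entries.retain(|key, _| key.0 != file_id);
    }
}

fn main() {
    let mut cache = ToyIndexCache { entries: HashMap::new() };
    cache.entries.insert((1, 0), vec![1]);
    cache.entries.insert((1, 1), vec![2]); // a rebuilt index for the same file
    cache.entries.insert((2, 0), vec![3]);
    cache.invalidate_file(1);
    assert_eq!(cache.entries.len(), 1); // only file 2's entry remains
}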

View File

@@ -215,6 +215,7 @@ impl WriteCache {
puffin_manager: self
.puffin_manager_factory
.build(store.clone(), path_provider.clone()),
write_cache_enabled: true,
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
@@ -266,7 +267,7 @@ impl WriteCache {
upload_tracker.push_uploaded_file(parquet_path);
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
let puffin_path = upload_request
.dest_path_provider
.build_index_file_path(RegionFileId::new(region_id, sst.file_id));
@@ -439,7 +440,11 @@ impl UploadTracker {
file_cache.remove(parquet_key).await;
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
let puffin_key = IndexKey::new(
self.region_id,
sst.file_id,
FileType::Puffin(sst.index_metadata.version),
);
file_cache.remove(puffin_key).await;
}
}
@@ -548,7 +553,7 @@ mod tests {
assert_eq!(remote_data.to_vec(), cache_data.to_vec());
// Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
assert!(write_cache.file_cache.contains_key(&index_key));
let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();

View File

@@ -399,7 +399,7 @@ impl DefaultCompactor {
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,

View File

@@ -77,7 +77,7 @@ pub fn new_file_handle_with_size_and_sequence(
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -135,7 +135,7 @@ use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
@@ -541,22 +541,23 @@ impl MitoEngine {
return Vec::new();
};
let Some(index_file_id) = entry.index_file_id.as_ref() else {
return Vec::new();
};
let file_id = match FileId::parse_str(index_file_id) {
let index_version = entry.index_version;
let file_id = match FileId::parse_str(&entry.file_id) {
Ok(file_id) => file_id,
Err(err) => {
warn!(
err;
"Failed to parse puffin index file id, table_dir: {}, file_id: {}",
entry.table_dir,
index_file_id
entry.file_id
);
return Vec::new();
}
};
let region_file_id = RegionFileId::new(entry.region_id, file_id);
let region_index_id = RegionIndexId::new(
RegionFileId::new(entry.region_id, file_id),
index_version,
);
let context = IndexEntryContext {
table_dir: &entry.table_dir,
index_file_path: index_file_path.as_str(),
@@ -565,7 +566,7 @@ impl MitoEngine {
region_number: entry.region_number,
region_group: entry.region_group,
region_sequence: entry.region_sequence,
file_id: index_file_id,
file_id: &entry.file_id,
index_file_size: entry.index_file_size,
node_id,
};
@@ -576,7 +577,7 @@ impl MitoEngine {
collect_index_entries_from_puffin(
manager,
region_file_id,
region_index_id,
context,
bloom_filter_cache,
inverted_index_cache,

View File

@@ -861,9 +861,10 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
#[tokio::test]
async fn test_list_ssts() {
test_list_ssts_with_format(false, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -871,9 +872,10 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
test_list_ssts_with_format(true, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -945,13 +947,13 @@ async fn test_list_ssts_with_format(
.index_file_path
.map(|p| p.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
e.index_version = 0;
format!("\n{:?}", e)
})
.sorted()
.collect::<Vec<_>>()
.join("");
assert_eq!(debug_format, expected_manifest_ssts,);
assert_eq!(debug_format, expected_manifest_ssts, "{}", debug_format);
// list from storage
let storage_entries = engine
@@ -969,7 +971,7 @@ async fn test_list_ssts_with_format(
.sorted()
.collect::<Vec<_>>()
.join("");
assert_eq!(debug_format, expected_storage_ssts,);
assert_eq!(debug_format, expected_storage_ssts, "{}", debug_format);
}
#[tokio::test]

View File

@@ -55,10 +55,10 @@ async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: R
return 0;
}
let mut index_files_count: usize = 0;
for region_file_id in scanner.file_ids() {
for region_index_id in scanner.index_ids() {
let index_path = location::index_file_path(
access_layer.table_dir(),
region_file_id,
region_index_id,
access_layer.path_type(),
);
if access_layer

View File

@@ -32,7 +32,7 @@ use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
use crate::sst::index::fulltext_index::{
INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
@@ -66,14 +66,14 @@ pub(crate) struct IndexEntryContext<'a> {
/// Collect index metadata entries present in the SST puffin file.
pub(crate) async fn collect_index_entries_from_puffin(
manager: SstPuffinManager,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
context: IndexEntryContext<'_>,
bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
inverted_index_cache: Option<InvertedIndexCacheRef>,
) -> Vec<PuffinIndexMetaEntry> {
let mut entries = Vec::new();
let reader = match manager.reader(&region_file_id).await {
let reader = match manager.reader(&region_index_id).await {
Ok(reader) => reader,
Err(err) => {
warn!(
@@ -104,7 +104,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
region_index_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
@@ -130,7 +130,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
region_index_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
@@ -172,7 +172,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::Inverted) => {
let mut inverted_entries = collect_inverted_entries(
&reader,
region_file_id,
region_index_id,
inverted_index_cache.as_ref(),
&context,
)
@@ -188,12 +188,12 @@ pub(crate) async fn collect_index_entries_from_puffin(
async fn collect_inverted_entries(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
cache: Option<&InvertedIndexCacheRef>,
context: &IndexEntryContext<'_>,
) -> Vec<PuffinIndexMetaEntry> {
// Read the inverted index blob and surface its per-column metadata entries.
let file_id = region_file_id.file_id();
let file_id = region_index_id.file_id();
let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
Ok(guard) => guard,
@@ -229,6 +229,7 @@ async fn collect_inverted_entries(
let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
let reader = CachedInvertedIndexBlobReader::new(
file_id,
region_index_id.version,
blob_size,
InvertedIndexBlobReader::new(blob_reader),
cache.clone(),
@@ -289,7 +290,7 @@ fn build_inverted_entries(
async fn try_read_bloom_meta(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
blob_type: &str,
target_key: &str,
cache: Option<&BloomFilterIndexCacheRef>,
@@ -311,7 +312,8 @@ async fn try_read_bloom_meta(
let result = match (cache, column_id, blob_size) {
(Some(cache), Some(column_id), Some(blob_size)) => {
CachedBloomFilterIndexBlobReader::new(
region_file_id.file_id(),
region_index_id.file_id(),
region_index_id.version,
column_id,
tag,
blob_size,

View File

@@ -643,7 +643,7 @@ impl RegionFlushTask {
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),

View File

@@ -330,10 +330,9 @@ impl LocalGcWorker {
// TODO(discord9): for now, ignore async index files as their design is not stable; revisit once
// the index file design is stable
let file_pairs: Vec<(FileId, FileId)> = unused_files
.iter()
.map(|file_id| (*file_id, *file_id))
.collect();
let file_pairs: Vec<(FileId, u64)> =
unused_files.iter().map(|file_id| (*file_id, 0)).collect();
// TODO(discord9): gc worker needs another major refactor to support versioned index files
debug!(
"Found {} unused index files to delete for region {}",
@@ -354,7 +353,7 @@ impl LocalGcWorker {
Ok(unused_files)
}
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
delete_files(
region_id,
file_ids,

View File

@@ -247,7 +247,7 @@ async fn checkpoint_with_different_compression_types() {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -312,7 +312,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,

View File

@@ -84,6 +84,14 @@ impl ProjectionMapper {
}
}
/// Returns true if the projection includes any tag columns.
pub(crate) fn has_tags(&self) -> bool {
match self {
ProjectionMapper::PrimaryKey(m) => m.has_tags(),
ProjectionMapper::Flat(_) => false,
}
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
@@ -257,6 +265,11 @@ impl PrimaryKeyProjectionMapper {
&self.metadata
}
/// Returns true if the projection includes any tag columns.
pub(crate) fn has_tags(&self) -> bool {
self.has_tags
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {

View File

@@ -135,6 +135,14 @@ impl Scanner {
}
}
pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
match self {
Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
Scanner::Series(series_scan) => series_scan.input().index_ids(),
}
}
/// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
@@ -958,6 +966,7 @@ impl ScanInput {
) -> Result<FileRangeBuilder> {
let predicate = self.predicate_for_file(file);
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
let decode_pk_values = !self.compaction && self.mapper.has_tags();
let res = self
.access_layer
.read_sst(file.clone())
@@ -971,6 +980,7 @@ impl ScanInput {
.flat_format(self.flat_format)
.compaction(self.compaction)
.pre_filter_mode(filter_mode)
.decode_primary_key_values(decode_pk_values)
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, selection) = match res {
@@ -1160,6 +1170,10 @@ impl ScanInput {
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
self.files.iter().map(|file| file.file_id()).collect()
}
pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
self.files.iter().map(|file| file.index_id()).collect()
}
}
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {

View File

@@ -617,17 +617,16 @@ impl MitoRegion {
.map(|meta| {
let region_id = self.region_id;
let origin_region_id = meta.region_id;
let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
{
let index_file_path =
index_file_path(table_dir, meta.index_file_id(), path_type);
let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
(
Some(meta.index_file_id().file_id().to_string()),
meta.index_version,
Some(index_file_path),
Some(meta.index_file_size),
)
} else {
(None, None, None)
(0, None, None)
};
let visible = visible_ssts.contains(&meta.file_id);
ManifestSstEntry {
@@ -638,7 +637,7 @@ impl MitoRegion {
region_group: region_id.region_group(),
region_sequence: region_id.region_sequence(),
file_id: meta.file_id.to_string(),
index_file_id,
index_version,
level: meta.level,
file_path: sst_file_path(table_dir, meta.file_id(), path_type),
file_size: meta.file_size,

View File

@@ -63,7 +63,7 @@ use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
@@ -867,8 +867,8 @@ impl RegionLoadCacheTask {
if file_meta.exists_index() {
let puffin_key = IndexKey::new(
file_meta.region_id,
file_meta.index_file_id().file_id(),
FileType::Puffin,
file_meta.file_id,
FileType::Puffin(file_meta.index_version),
);
if !file_cache.contains_key(&puffin_key) {
@@ -925,12 +925,18 @@ impl RegionLoadCacheTask {
break;
}
let index_remote_path = location::index_file_path(
table_dir,
let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
version
} else {
unreachable!("`files_to_download` should only contain Puffin files");
};
let index_id = RegionIndexId::new(
RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
path_type,
index_version,
);
let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
match file_cache
.download(puffin_key, &index_remote_path, object_store, file_size)
.await
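
The cache key change above (FileType::Puffin now carries the index version) means different index versions of the same SST map to distinct cache entries, so a rebuilt index never collides with a stale cached copy. A minimal standalone sketch of that behavior, using simplified key types rather than the crate's actual IndexKey/FileType:

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum FileType {
    Parquet,
    Puffin(u64), // index version
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct IndexKey {
    region_id: u64,
    file_id: u64,
    file_type: FileType,
}

fn main() {
    let mut cache: HashMap<IndexKey, &str> = HashMap::new();
    let v0 = IndexKey { region_id: 1, file_id: 7, file_type: FileType::Puffin(0) };
    let v1 = IndexKey { region_id: 1, file_id: 7, file_type: FileType::Puffin(1) };
    cache.insert(v0, "old index file");
    cache.insert(v1, "rebuilt index file");
    // Both versions are distinct entries; evicting the old one leaves the new intact.
    assert_eq!(cache.len(), 2);
    cache.remove(&v0);
    assert!(cache.contains_key(&v1));
    let _ = FileType::Parquet; // the SST entry itself is keyed separately
}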

View File

@@ -428,7 +428,7 @@ mod tests {
available_indexes: SmallVec::new(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 100,
num_row_groups: 1,
sequence: NonZeroU64::new(1),

View File

@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, RegionId};
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
@@ -117,6 +117,41 @@ impl fmt::Display for RegionFileId {
}
}
/// Unique identifier for an index file, combining the SST file ID and the index version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
pub file_id: RegionFileId,
pub version: IndexVersion,
}
impl RegionIndexId {
pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
Self { file_id, version }
}
pub fn region_id(&self) -> RegionId {
self.file_id.region_id
}
pub fn file_id(&self) -> FileId {
self.file_id.file_id
}
}
impl fmt::Display for RegionIndexId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.version == 0 {
write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
} else {
write!(
f,
"{}/{}.{}",
self.file_id.region_id, self.file_id.file_id, self.version
)
}
}
}
/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
@@ -159,12 +194,10 @@ pub struct FileMeta {
pub indexes: Vec<ColumnIndexMetadata>,
/// Size of the index file.
pub index_file_size: u64,
/// File ID of the index file.
///
/// When this field is None, it means the index file id is the same as the file id.
/// Only meaningful when index_file_size > 0.
/// Used for rebuilding index files.
pub index_file_id: Option<FileId>,
/// Version of the index file.
/// Used to generate the index file name: "{file_id}.{index_version}.puffin".
/// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
pub index_version: IndexVersion,
/// Number of rows in the file.
///
/// For historical reasons, this field might be missing in old files. Thus
@@ -332,14 +365,9 @@ impl FileMeta {
RegionFileId::new(self.region_id, self.file_id)
}
/// Returns the cross-region index file id.
/// If the index file id is not set, returns the file id.
pub fn index_file_id(&self) -> RegionFileId {
if let Some(index_file_id) = self.index_file_id {
RegionFileId::new(self.region_id, index_file_id)
} else {
self.file_id()
}
/// Returns the RegionIndexId for this file.
pub fn index_id(&self) -> RegionIndexId {
RegionIndexId::new(self.file_id(), self.index_version)
}
}
@@ -376,14 +404,9 @@ impl FileHandle {
RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
}
/// Returns the cross-region index file id.
/// If the index file id is not set, returns the file id.
pub fn index_file_id(&self) -> RegionFileId {
if let Some(index_file_id) = self.inner.meta.index_file_id {
RegionFileId::new(self.inner.meta.region_id, index_file_id)
} else {
self.file_id()
}
/// Returns the RegionIndexId for this file.
pub fn index_id(&self) -> RegionIndexId {
RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
}
/// Returns the complete file path of the file.
@@ -468,10 +491,15 @@ impl FileHandleInner {
}
}
/// Delete
/// Delete files for a region.
/// - `region_id`: Region id.
/// - `file_ids`: List of (file id, index version) tuples to delete.
/// - `delete_index`: Whether to delete the index file from the cache.
/// - `access_layer`: Access layer to delete files.
/// - `cache_manager`: Cache manager to remove files from cache.
pub async fn delete_files(
region_id: RegionId,
file_ids: &[(FileId, FileId)],
file_ids: &[(FileId, u64)],
delete_index: bool,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
@@ -484,12 +512,12 @@ pub async fn delete_files(
}
let mut deleted_files = Vec::with_capacity(file_ids.len());
for (file_id, index_file_id) in file_ids {
for (file_id, index_version) in file_ids {
let region_file_id = RegionFileId::new(region_id, *file_id);
match access_layer
.delete_sst(
&RegionFileId::new(region_id, *file_id),
&RegionFileId::new(region_id, *index_file_id),
&region_file_id,
&RegionIndexId::new(region_file_id, *index_version),
)
.await
{
@@ -509,32 +537,90 @@ pub async fn delete_files(
deleted_files
);
for (file_id, index_file_id) in file_ids {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin))
.await;
}
for (file_id, index_version) in file_ids {
purge_index_write_cache_stager(
region_id,
delete_index,
access_layer,
cache_manager,
file_id,
index_version,
)
.await;
}
Ok(())
}
// Remove the SST file from the cache.
/// Delete the index file for a given SST file and index version.
pub async fn delete_index(
region_id: RegionId,
file_id: FileId,
index_version: u64,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
if let Err(err) = access_layer
.delete_index(&RegionIndexId::new(
RegionFileId::new(region_id, file_id),
index_version,
))
.await
{
error!(err; "Failed to delete index file for {}/{}.{}",
region_id, file_id, index_version);
}
purge_index_write_cache_stager(
region_id,
true,
access_layer,
cache_manager,
&file_id,
&index_version,
)
.await;
Ok(())
}
/// Removes the index and SST entries for the given file from the write cache (the index
/// entry only when `delete_index` is true) and purges staged index content for the given
/// index version.
pub async fn purge_index_write_cache_stager(
region_id: RegionId,
delete_index: bool,
access_layer: &Arc<crate::access_layer::AccessLayer>,
cache_manager: &Option<Arc<crate::cache::CacheManager>>,
file_id: &FileId,
index_version: &u64,
) {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.remove(IndexKey::new(
region_id,
*file_id,
FileType::Puffin(*index_version),
))
.await;
}
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionFileId::new(region_id, *index_file_id))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
index_file_id, region_id);
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.await;
}
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, *file_id),
*index_version,
))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
file_id, index_version, region_id);
}
Ok(())
}
#[cfg(test)]
@@ -563,7 +649,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -614,7 +700,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
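
A standalone sketch of the display rule introduced by `RegionIndexId` above: version 0 keeps the plain "region/file" form so existing identifiers stay unchanged, while later versions append ".{version}" (the same convention the `index_version` field uses for "{file_id}.{index_version}.puffin" file names). Plain strings stand in for the crate's `RegionId` and `FileId` types.

use std::fmt;

struct IndexId<'a> {
    region: &'a str,
    file: &'a str,
    version: u64,
}

impl fmt::Display for IndexId<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.version == 0 {
            // Legacy form: no version suffix for compatibility.
            write!(f, "{}/{}", self.region, self.file)
        } else {
            write!(f, "{}/{}.{}", self.region, self.file, self.version)
        }
    }
}

fn main() {
    let v0 = IndexId { region: "r1", file: "f1", version: 0 };
    let v2 = IndexId { region: "r1", file: "f1", version: 2 };
    assert_eq!(v0.to_string(), "r1/f1");
    assert_eq!(v2.to_string(), "r1/f1.2");
}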

File diff suppressed because it is too large

View File

@@ -132,7 +132,11 @@ impl FileReferenceManager {
let region_id = file_meta.region_id;
let mut is_new = false;
{
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
let file_ref = FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version,
);
self.files_per_region
.entry(region_id)
.and_modify(|refs| {
@@ -157,7 +161,7 @@ impl FileReferenceManager {
/// If the reference count reaches zero, the file reference will be removed from the manager.
pub fn remove_file(&self, file_meta: &FileMeta) {
let region_id = file_meta.region_id;
let file_ref = FileRef::new(region_id, file_meta.file_id);
let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version);
let mut remove_table_entry = false;
let mut remove_file_ref = false;
@@ -230,7 +234,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 4096,
index_file_id: None,
index_version: 0,
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),
@@ -246,13 +250,13 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
);
file_ref_mgr.add_file(&file_meta);
let expected_region_ref_manifest =
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id, 0)]);
assert_eq!(
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
@@ -265,7 +269,7 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 2)])
);
assert_eq!(
@@ -281,7 +285,7 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
);
assert_eq!(

View File

@@ -61,7 +61,7 @@ use crate::request::{
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{
ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId,
ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
@@ -81,6 +81,8 @@ pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
pub struct IndexOutput {
/// Size of the file.
pub file_size: u64,
/// Index version.
pub version: u64,
/// Inverted index output.
pub inverted_index: InvertedIndexOutput,
/// Fulltext index output.
@@ -163,7 +165,9 @@ pub type BloomFilterOutput = IndexBaseOutput;
pub struct Indexer {
file_id: FileId,
region_id: RegionId,
index_version: u64,
puffin_manager: Option<SstPuffinManager>,
write_cache_enabled: bool,
inverted_indexer: Option<InvertedIndexer>,
last_mem_inverted_index: usize,
fulltext_indexer: Option<FulltextIndexer>,
@@ -236,7 +240,7 @@ impl Indexer {
#[async_trait::async_trait]
pub trait IndexerBuilder {
/// Builds an indexer for the given file id, writing to [index_file_path].
async fn build(&self, file_id: FileId) -> Indexer;
async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
}
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
@@ -244,6 +248,7 @@ pub(crate) struct IndexerBuilderImpl {
pub(crate) metadata: RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) puffin_manager: SstPuffinManager,
pub(crate) write_cache_enabled: bool,
pub(crate) intermediate_manager: IntermediateManager,
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
@@ -254,10 +259,12 @@ pub(crate) struct IndexerBuilderImpl {
#[async_trait::async_trait]
impl IndexerBuilder for IndexerBuilderImpl {
/// Sanity checks the arguments and creates a new [Indexer] if they are valid.
async fn build(&self, file_id: FileId) -> Indexer {
async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
let mut indexer = Indexer {
file_id,
region_id: self.metadata.region_id,
index_version,
write_cache_enabled: self.write_cache_enabled,
..Default::default()
};
@@ -611,13 +618,20 @@ impl IndexBuildTask {
&mut self,
version_control: VersionControlRef,
) -> Result<IndexBuildOutcome> {
let index_file_id = if self.file_meta.index_file_size > 0 {
// Generate new file ID if index file exists to avoid overwrite.
FileId::random()
// Determine the new index version
let new_index_version = if self.file_meta.index_file_size > 0 {
// Increment version if index file exists to avoid overwrite.
self.file_meta.index_version + 1
} else {
self.file_meta.file_id
0 // Default version for new index files
};
let mut indexer = self.indexer_builder.build(index_file_id).await;
// Use the same file_id but with a new version for the index file
let index_file_id = self.file_meta.file_id;
let mut indexer = self
.indexer_builder
.build(index_file_id, new_index_version)
.await;
// Check SST file existence before building index to avoid failure of parquet reader.
if !self.check_sst_file_exists(&version_control).await {
@@ -677,10 +691,10 @@ impl IndexBuildTask {
}
// Upload index file if write cache is enabled.
self.maybe_upload_index_file(index_output.clone(), index_file_id)
self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
.await?;
let worker_request = match self.update_manifest(index_output, index_file_id).await {
let worker_request = match self.update_manifest(index_output, new_index_version).await {
Ok(edit) => {
let index_build_finished = IndexBuildFinished {
region_id: self.file_meta.region_id,
@@ -712,6 +726,7 @@ impl IndexBuildTask {
&self,
output: IndexOutput,
index_file_id: FileId,
index_version: u64,
) -> Result<()> {
if let Some(write_cache) = &self.write_cache {
let file_id = self.file_meta.file_id;
@@ -719,12 +734,14 @@ impl IndexBuildTask {
let remote_store = self.access_layer.object_store();
let mut upload_tracker = UploadTracker::new(region_id);
let mut err = None;
let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
let puffin_key =
IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
let puffin_path = RegionFilePathFactory::new(
self.access_layer.table_dir().to_string(),
self.access_layer.path_type(),
)
.build_index_file_path(RegionFileId::new(region_id, file_id));
.build_index_file_path_with_version(index_id);
if let Err(e) = write_cache
.upload(puffin_key, &puffin_path, remote_store)
.await
@@ -756,12 +773,13 @@ impl IndexBuildTask {
async fn update_manifest(
&mut self,
output: IndexOutput,
index_file_id: FileId,
new_index_version: u64,
) -> Result<RegionEdit> {
self.file_meta.available_indexes = output.build_available_indexes();
self.file_meta.indexes = output.build_indexes();
self.file_meta.index_file_size = output.file_size;
self.file_meta.index_file_id = Some(index_file_id);
let old_index_version = self.file_meta.index_version;
self.file_meta.index_version = new_index_version;
let edit = RegionEdit {
files_to_add: vec![self.file_meta.clone()],
files_to_remove: vec![],
@@ -784,6 +802,11 @@ impl IndexBuildTask {
self.file_meta.region_id,
self.reason.as_str()
);
// Notify the file purger to remove the old index files, if any
if new_index_version > 0 {
self.file_purger
.update_index(self.file_meta.clone(), old_index_version);
}
Ok(edit)
}
}
@@ -1163,6 +1186,10 @@ mod tests {
unreachable!()
}
fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
unreachable!()
}
fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
unreachable!()
}
@@ -1236,6 +1263,7 @@ mod tests {
metadata,
row_group_size: 1024,
puffin_manager,
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1260,13 +1288,14 @@ mod tests {
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1290,6 +1319,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig {
@@ -1299,7 +1329,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -1311,6 +1341,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1320,7 +1351,7 @@ mod tests {
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1332,6 +1363,7 @@ mod tests {
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1341,7 +1373,7 @@ mod tests {
..Default::default()
},
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1365,13 +1397,14 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -1388,13 +1421,14 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1411,13 +1445,14 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -1441,13 +1476,14 @@ mod tests {
metadata,
row_group_size: 0,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
write_cache_enabled: false,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.build(FileId::random(), 0)
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -1619,7 +1655,7 @@ mod tests {
let puffin_path = location::index_file_path(
env.access_layer.table_dir(),
RegionFileId::new(region_id, file_meta.file_id),
RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
env.access_layer.path_type(),
);
@@ -1761,6 +1797,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: write_cache.build_puffin_manager().clone(),
write_cache_enabled: true,
intermediate_manager: write_cache.intermediate_manager().clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -1812,7 +1849,11 @@ mod tests {
}
// The write cache should contain the uploaded index file.
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
let index_key = IndexKey::new(
region_id,
file_meta.file_id,
FileType::Puffin(sst_info.index_metadata.version),
);
assert!(write_cache.file_cache().contains_key(&index_key));
}
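
A minimal sketch of the version selection in the index build task above, assuming only that a zero `index_file_size` means no index has been built yet: a rebuild keeps the same SST file id and bumps the index version so the previous index file is never overwritten.

// Sketch of how the rebuild path picks the next index version: keep version 0 for a
// first-time build, otherwise increment so the index file readers may still be using
// is not overwritten.
fn next_index_version(index_file_size: u64, current_version: u64) -> u64 {
    if index_file_size > 0 {
        current_version + 1
    } else {
        0
    }
}

fn main() {
    // No index built yet: the first build writes "{file_id}.puffin" (version 0).
    assert_eq!(next_index_version(0, 0), 0);
    // An index already exists: the rebuild writes "{file_id}.1.puffin" and the old
    // version is handed to the file purger once the manifest is updated.
    assert_eq!(next_index_version(4096, 0), 1);
    assert_eq!(next_index_version(4096, 1), 2);
}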

View File

@@ -44,7 +44,7 @@ use crate::error::{
Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
@@ -200,7 +200,7 @@ impl BloomFilterIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
@@ -242,6 +242,7 @@ impl BloomFilterIndexApplier {
}
let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(),
file_id.version,
*column_id,
Tag::Skipping,
blob_size,
@@ -286,7 +287,7 @@ impl BloomFilterIndexApplier {
/// Returns `None` if the column does not have an index.
async fn blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
file_size_hint: Option<u64>,
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
@@ -328,7 +329,7 @@ impl BloomFilterIndexApplier {
/// Creates a blob reader from the cached index file
async fn cached_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
@@ -336,7 +337,11 @@ impl BloomFilterIndexApplier {
return Ok(None);
};
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
@@ -369,7 +374,7 @@ impl BloomFilterIndexApplier {
/// Creates a blob reader from the remote index file
async fn remote_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
@@ -446,6 +451,7 @@ mod tests {
use store_api::storage::FileId;
use super::*;
use crate::sst::file::RegionFileId;
use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
use crate::sst::index::bloom_filter::creator::tests::{
mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
@@ -457,7 +463,7 @@ mod tests {
object_store: ObjectStore,
metadata: &RegionMetadata,
puffin_manager_factory: PuffinManagerFactory,
file_id: RegionFileId,
file_id: RegionIndexId,
) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
+ use<'_> {
move |exprs, row_groups| {
@@ -514,6 +520,7 @@ mod tests {
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
let memory_usage_threshold = Some(1024);
let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
let file_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let mut indexer = BloomFilterIndexer::new(

View File

@@ -481,7 +481,7 @@ pub(crate) mod tests {
use super::*;
use crate::access_layer::FilePathProvider;
use crate::read::BatchColumn;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
pub fn mock_object_store() -> ObjectStore {
@@ -499,6 +499,10 @@ pub(crate) mod tests {
file_id.file_id().to_string()
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
index_id.file_id.file_id().to_string()
}
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
file_id.file_id().to_string()
}
@@ -621,6 +625,7 @@ pub(crate) mod tests {
let puffin_manager = factory.build(object_store, TestPathProvider);
let file_id = RegionFileId::new(region_metadata.region_id, file_id);
let file_id = RegionIndexId::new(file_id, 0);
let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
assert_eq!(row_count, 20);

View File

@@ -44,7 +44,7 @@ use crate::error::{
PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
@@ -221,7 +221,7 @@ impl FulltextIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_fine(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> {
@@ -275,7 +275,7 @@ impl FulltextIndexApplier {
async fn apply_fine_one_column(
&self,
file_size_hint: Option<u64>,
file_id: RegionFileId,
file_id: RegionIndexId,
column_id: ColumnId,
request: &FulltextRequest,
metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -356,7 +356,7 @@ impl FulltextIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_coarse(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -405,7 +405,7 @@ impl FulltextIndexApplier {
async fn apply_coarse_one_column(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
column_id: ColumnId,
terms: &[FulltextTerm],
@@ -440,6 +440,7 @@ impl FulltextIndexApplier {
.content_length;
let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(),
file_id.version,
column_id,
Tag::Fulltext,
blob_size,
@@ -611,7 +612,7 @@ impl IndexSource {
/// Returns `None` if the blob is not found.
async fn blob(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
key: &str,
file_size_hint: Option<u64>,
metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -649,7 +650,7 @@ impl IndexSource {
/// Returns `None` if the directory is not found.
async fn dir(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
key: &str,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
@@ -699,7 +700,7 @@ impl IndexSource {
/// Returns the reader and whether it fell back to the remote store.
async fn ensure_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<(SstPuffinReader, bool)> {
match self.build_local_cache(file_id, file_size_hint).await {
@@ -711,14 +712,18 @@ impl IndexSource {
async fn build_local_cache(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
@@ -740,7 +745,7 @@ impl IndexSource {
async fn build_remote(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<SstPuffinReader> {
let puffin_manager = self

View File

@@ -481,7 +481,7 @@ mod tests {
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::read::{Batch, BatchColumn};
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::fulltext_index::applier::builder::{
FulltextQuery, FulltextRequest, FulltextTerm,
@@ -672,7 +672,8 @@ mod tests {
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
let mut writer = puffin_manager.writer(&region_file_id).await.unwrap();
let index_id = RegionIndexId::new(region_file_id, 0);
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
let _ = indexer.finish(&mut writer).await.unwrap();
writer.finish().await.unwrap();
@@ -723,16 +724,15 @@ mod tests {
let backend = backend.clone();
async move {
match backend {
FulltextBackend::Tantivy => applier
.apply_fine(region_file_id, None, None)
.await
.unwrap(),
FulltextBackend::Tantivy => {
applier.apply_fine(index_id, None, None).await.unwrap()
}
FulltextBackend::Bloom => {
let coarse_mask = coarse_mask.unwrap_or_default();
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
// row group id == row id
let resp = applier
.apply_coarse(region_file_id, None, row_groups, None)
.apply_coarse(index_id, None, row_groups, None)
.await
.unwrap();
resp.map(|r| {

View File

@@ -14,6 +14,8 @@
use common_telemetry::warn;
use crate::access_layer::TempFileCleaner;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::Indexer;
impl Indexer {
@@ -22,6 +24,9 @@ impl Indexer {
self.do_abort_fulltext_index().await;
self.do_abort_bloom_filter().await;
self.do_prune_intm_sst_dir().await;
if self.write_cache_enabled {
self.do_abort_clean_fs_temp_dir().await;
}
self.puffin_manager = None;
}
@@ -87,4 +92,18 @@ impl Indexer {
);
}
}
async fn do_abort_clean_fs_temp_dir(&mut self) {
let Some(puffin_manager) = &self.puffin_manager else {
return;
};
let fs_accessor = puffin_manager.file_accessor();
let fs_handle = RegionIndexId::new(
RegionFileId::new(self.region_id, self.file_id),
self.index_version,
)
.to_string();
TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await;
}
}

View File

@@ -16,7 +16,7 @@ use common_telemetry::{debug, warn};
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use store_api::storage::ColumnId;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{
@@ -56,14 +56,18 @@ impl Indexer {
self.do_prune_intm_sst_dir().await;
output.file_size = self.do_finish_puffin_writer(writer).await;
output.version = self.index_version;
output
}
async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
let puffin_manager = self.puffin_manager.take()?;
let puffin_manager = self.puffin_manager.clone()?;
let err = match puffin_manager
.writer(&RegionFileId::new(self.region_id, self.file_id))
.writer(&RegionIndexId::new(
RegionFileId::new(self.region_id, self.file_id),
self.index_version,
))
.await
{
Ok(writer) => return Some(writer),

View File

@@ -40,7 +40,7 @@ use crate::error::{
ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
@@ -194,7 +194,7 @@ impl InvertedIndexApplier {
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> {
@@ -222,6 +222,7 @@ impl InvertedIndexApplier {
let result = if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id.file_id(),
file_id.version,
blob_size,
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
@@ -268,14 +269,18 @@ impl InvertedIndexApplier {
/// Creates a blob reader from the cached index file.
async fn cached_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
@@ -303,7 +308,7 @@ impl InvertedIndexApplier {
/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(
&self,
file_id: RegionFileId,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let puffin_manager = self
@@ -349,6 +354,7 @@ mod tests {
use store_api::storage::FileId;
use super::*;
use crate::sst::index::RegionFileId;
#[tokio::test]
async fn test_index_applier_apply_basic() {
@@ -356,13 +362,14 @@ mod tests {
PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = RegionFileId::new(0.into(), FileId::random());
let index_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
writer
.put_blob(
INDEX_BLOB_TYPE,
@@ -392,7 +399,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
let output = sst_index_applier.apply(index_id, None, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
@@ -410,13 +417,14 @@ mod tests {
.await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = RegionFileId::new(0.into(), FileId::random());
let index_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
writer
.put_blob(
"invalid_blob_type",
@@ -440,7 +448,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let res = sst_index_applier.apply(file_id, None, None).await;
let res = sst_index_applier.apply(index_id, None, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}

View File

@@ -466,7 +466,7 @@ mod tests {
use crate::cache::index::inverted_index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -591,7 +591,8 @@ mod tests {
);
let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
let index_id = RegionIndexId::new(sst_file_id, 0);
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
let (row_count, _) = creator.finish(&mut writer).await.unwrap();
assert_eq!(row_count, rows.len() * segment_row_count);
writer.finish().await.unwrap();
@@ -615,7 +616,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id, None, None)
.apply(index_id, None, None)
.await
.unwrap()
.matched_segment_ids

View File

@@ -32,14 +32,14 @@ use crate::metrics::{
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, StagerMetrics,
};
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager<RegionFileId>>, ObjectStorePuffinFileAccessor>;
FsPuffinManager<Arc<BoundedStager<RegionIndexId>>, ObjectStorePuffinFileAccessor>;
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
@@ -52,7 +52,7 @@ const STAGING_DIR: &str = "staging";
#[derive(Clone)]
pub struct PuffinManagerFactory {
/// The stager used by the puffin manager.
stager: Arc<BoundedStager<RegionFileId>>,
stager: Arc<BoundedStager<RegionIndexId>>,
/// The size of the write buffer used to create object store.
write_buffer_size: Option<usize>,
@@ -92,7 +92,7 @@ impl PuffinManagerFactory {
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
pub(crate) async fn purge_stager(&self, file_id: RegionFileId) -> Result<()> {
pub(crate) async fn purge_stager(&self, file_id: RegionIndexId) -> Result<()> {
self.stager
.purge(&file_id)
.await
@@ -136,16 +136,22 @@ impl ObjectStorePuffinFileAccessor {
path_provider,
}
}
pub fn store(&self) -> &InstrumentedStore {
&self.object_store
}
}
#[async_trait]
impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
type Reader = InstrumentedRangeReader;
type Writer = InstrumentedAsyncWrite;
type FileHandle = RegionFileId;
type FileHandle = RegionIndexId;
async fn reader(&self, handle: &RegionFileId) -> PuffinResult<Self::Reader> {
let file_path = self.path_provider.build_index_file_path(*handle);
async fn reader(&self, handle: &RegionIndexId) -> PuffinResult<Self::Reader> {
let file_path = self
.path_provider
.build_index_file_path_with_version(*handle);
self.object_store
.range_reader(
&file_path,
@@ -157,8 +163,10 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
.context(puffin_error::ExternalSnafu)
}
async fn writer(&self, handle: &RegionFileId) -> PuffinResult<Self::Writer> {
let file_path = self.path_provider.build_index_file_path(*handle);
async fn writer(&self, handle: &RegionIndexId) -> PuffinResult<Self::Writer> {
let file_path = self
.path_provider
.build_index_file_path_with_version(*handle);
self.object_store
.writer(
&file_path,
@@ -184,7 +192,7 @@ mod tests {
use store_api::storage::FileId;
use super::*;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
struct TestFilePathProvider;
@@ -193,6 +201,10 @@ mod tests {
file_id.file_id().to_string()
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
index_id.file_id.file_id().to_string()
}
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
file_id.file_id().to_string()
}
@@ -206,7 +218,7 @@ mod tests {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let manager = factory.build(object_store, TestFilePathProvider);
let file_id = RegionFileId::new(0.into(), FileId::random());
let file_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0);
let blob_key = "blob-key";
let dir_key = "dir-key";
let raw_data = b"hello world!";

View File

@@ -49,6 +49,10 @@ impl InstrumentedStore {
}
}
pub fn store(&self) -> &ObjectStore {
&self.object_store
}
/// Set the size of the write buffer.
pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);

View File

@@ -20,7 +20,7 @@ use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
use crate::error::UnexpectedSnafu;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
/// Generate region dir from table_dir, region_id and path_type
pub fn region_dir_from_table_dir(
@@ -46,14 +46,68 @@ pub fn sst_file_path(table_dir: &str, region_file_id: RegionFileId, path_type: P
)
}
pub fn index_file_path(
pub fn index_file_path(table_dir: &str, index_id: RegionIndexId, path_type: PathType) -> String {
let region_dir = region_dir_from_table_dir(table_dir, index_id.file_id.region_id(), path_type);
let index_dir = util::join_dir(&region_dir, "index");
let filename = if index_id.version == 0 {
format!("{}.puffin", index_id.file_id.file_id())
} else {
format!("{}.{}.puffin", index_id.file_id.file_id(), index_id.version)
};
util::join_path(&index_dir, &filename)
}
/// Legacy function for backward compatibility: creates the index file path from a `RegionFileId` with version 0.
pub fn index_file_path_legacy(
table_dir: &str,
region_file_id: RegionFileId,
path_type: PathType,
) -> String {
let region_dir = region_dir_from_table_dir(table_dir, region_file_id.region_id(), path_type);
let index_dir = util::join_dir(&region_dir, "index");
util::join_path(&index_dir, &format!("{}.puffin", region_file_id.file_id()))
let index_id = RegionIndexId::new(region_file_id, 0);
index_file_path(table_dir, index_id, path_type)
}
/// Parse file ID and version from index filename
pub fn parse_index_file_info(filepath: &str) -> crate::error::Result<(FileId, u64)> {
let filename = filepath.rsplit('/').next().context(UnexpectedSnafu {
reason: format!("invalid file path: {}", filepath),
})?;
let parts: Vec<&str> = filename.split('.').collect();
if parts.len() == 2 && parts[1] == "puffin" {
// Legacy format: {file_id}.puffin (version 0)
let file_id = parts[0];
FileId::parse_str(file_id).map(|id| (id, 0)).map_err(|e| {
UnexpectedSnafu {
reason: format!("invalid file id: {}, err: {}", file_id, e),
}
.build()
})
} else if parts.len() == 3 && parts[2] == "puffin" {
// New format: {file_id}.{version}.puffin
let file_id = parts[0];
let version = parts[1].parse::<u64>().map_err(|_| {
UnexpectedSnafu {
reason: format!("invalid version in file name: {}", filename),
}
.build()
})?;
FileId::parse_str(file_id)
.map(|id| (id, version))
.map_err(|e| {
UnexpectedSnafu {
reason: format!("invalid file id: {}, err: {}", file_id, e),
}
.build()
})
} else {
UnexpectedSnafu {
reason: format!("invalid index file name: {}", filename),
}
.fail()
}
}
/// Get RegionFileId from sst or index filename
@@ -111,17 +165,59 @@ mod tests {
fn test_index_file_path() {
let file_id = FileId::random();
let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id);
let index_id = RegionIndexId::new(region_file_id, 0);
assert_eq!(
index_file_path("table_dir", region_file_id, PathType::Bare),
index_file_path("table_dir", index_id, PathType::Bare),
format!("table_dir/1_0000000002/index/{}.puffin", file_id)
);
assert_eq!(
index_file_path("table_dir", region_file_id, PathType::Data),
index_file_path("table_dir", index_id, PathType::Data),
format!("table_dir/1_0000000002/data/index/{}.puffin", file_id)
);
assert_eq!(
index_file_path("table_dir", region_file_id, PathType::Metadata),
index_file_path("table_dir", index_id, PathType::Metadata),
format!("table_dir/1_0000000002/metadata/index/{}.puffin", file_id)
);
}
#[test]
fn test_index_file_path_versioned() {
let file_id = FileId::random();
let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id);
let index_id_v1 = RegionIndexId::new(region_file_id, 1);
let index_id_v2 = RegionIndexId::new(region_file_id, 2);
assert_eq!(
index_file_path("table_dir", index_id_v1, PathType::Bare),
format!("table_dir/1_0000000002/index/{}.1.puffin", file_id)
);
assert_eq!(
index_file_path("table_dir", index_id_v2, PathType::Bare),
format!("table_dir/1_0000000002/index/{}.2.puffin", file_id)
);
}
#[test]
fn test_parse_index_file_info() {
// Test legacy format
let file_id = FileId::random();
let result =
parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.puffin"))
.unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, 0);
// Test versioned format
let result =
parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.1.puffin"))
.unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, 1);
let result =
parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.42.puffin"))
.unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, 42);
}
}
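
The tests above exercise `index_file_path` and `parse_index_file_info` separately; the round-trip property they rely on can be sketched standalone as below, using plain strings instead of the crate's `FileId` and only the file-name portion of the path.

// Build a versioned index file name, then split it back into (file_id, version).
fn build_name(file_id: &str, version: u64) -> String {
    if version == 0 {
        format!("{file_id}.puffin")
    } else {
        format!("{file_id}.{version}.puffin")
    }
}

fn parse_name(name: &str) -> Option<(String, u64)> {
    let parts: Vec<&str> = name.split('.').collect();
    match parts.as_slice() {
        // Legacy format: "{file_id}.puffin" maps to version 0.
        [id, "puffin"] => Some((id.to_string(), 0)),
        // Versioned format: "{file_id}.{version}.puffin".
        [id, version, "puffin"] => version.parse().ok().map(|v| (id.to_string(), v)),
        _ => None,
    }
}

fn main() {
    for version in [0u64, 1, 42] {
        let name = build_name("abcd", version);
        assert_eq!(parse_name(&name), Some(("abcd".to_string(), version)));
    }
    assert_eq!(parse_name("abcd.parquet"), None);
}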

View File

@@ -117,7 +117,7 @@ mod tests {
use crate::config::IndexConfig;
use crate::read::{BatchBuilder, BatchReader, FlatSource};
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
@@ -144,7 +144,11 @@ mod tests {
impl FilePathProvider for FixedPathProvider {
fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
location::index_file_path(FILE_DIR, index_id, PathType::Bare)
}
fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
@@ -156,7 +160,7 @@ mod tests {
#[async_trait::async_trait]
impl IndexerBuilder for NoopIndexBuilder {
async fn build(&self, _file_id: FileId) -> Indexer {
async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
Indexer::default()
}
}
@@ -711,6 +715,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size,
puffin_manager,
write_cache_enabled: false,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
@@ -769,7 +774,7 @@ mod tests {
available_indexes: info.index_metadata.build_available_indexes(),
indexes: info.index_metadata.build_indexes(),
index_file_size: info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_row_groups: info.num_row_groups,
num_rows: info.num_rows as u64,
sequence: None,
@@ -1090,6 +1095,7 @@ mod tests {
metadata: metadata.clone(),
row_group_size,
puffin_manager,
write_cache_enabled: false,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {

View File

@@ -40,7 +40,10 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{SortField, build_primary_key_codec_with_fields};
use mito_codec::row_converter::{
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
build_primary_key_codec_with_fields,
};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
@@ -48,7 +51,8 @@ use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
@@ -386,6 +390,13 @@ impl ReadFormat {
}
}
/// Enables or disables eager decoding of primary key values into batches.
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
if let ReadFormat::PrimaryKey(format) = self {
format.set_decode_primary_key_values(decode);
}
}
/// Creates a sequence array to override.
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
match self {
@@ -411,6 +422,8 @@ pub struct PrimaryKeyReadFormat {
field_id_to_projected_index: HashMap<ColumnId, usize>,
/// Sequence number to override the sequence read from the SST.
override_sequence: Option<SequenceNumber>,
/// Codec used to decode primary key values if eager decoding is enabled.
primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}
impl PrimaryKeyReadFormat {
@@ -439,6 +452,7 @@ impl PrimaryKeyReadFormat {
projection_indices: format_projection.projection_indices,
field_id_to_projected_index: format_projection.column_id_to_projected_index,
override_sequence: None,
primary_key_codec: None,
}
}
@@ -447,6 +461,15 @@ impl PrimaryKeyReadFormat {
self.override_sequence = sequence;
}
/// Enables or disables eager decoding of primary key values into batches.
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
self.primary_key_codec = if decode {
Some(build_primary_key_codec(&self.metadata))
} else {
None
};
}
/// Gets the arrow schema of the SST file.
///
/// This schema is computed from the region metadata but should be the same
@@ -561,7 +584,12 @@ impl PrimaryKeyReadFormat {
});
}
let batch = builder.build()?;
let mut batch = builder.build()?;
if let Some(codec) = &self.primary_key_codec {
let pk_values: CompositeValues =
codec.decode(batch.primary_key()).context(DecodeSnafu)?;
batch.set_pk_values(pk_values);
}
batches.push_back(batch);
}
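
A standalone sketch of the eager primary-key decoding added above: when a codec is configured (the `decode_primary_key_values(true)` path), the encoded key is decoded into values as each batch is built; otherwise the batch keeps only the encoded bytes. The codec and batch types here are simplified stand-ins, not the crate's actual `PrimaryKeyCodec` or `Batch`.

trait PkCodec {
    fn decode(&self, encoded: &[u8]) -> Vec<String>;
}

// Hypothetical codec that treats the key as comma-separated tag values.
struct CommaCodec;

impl PkCodec for CommaCodec {
    fn decode(&self, encoded: &[u8]) -> Vec<String> {
        String::from_utf8_lossy(encoded)
            .split(',')
            .map(|s| s.to_string())
            .collect()
    }
}

struct Batch {
    primary_key: Vec<u8>,
    pk_values: Option<Vec<String>>,
}

fn finish_batch(primary_key: Vec<u8>, codec: Option<&dyn PkCodec>) -> Batch {
    // Decode eagerly only when a codec was configured for this read.
    let pk_values = codec.map(|c| c.decode(&primary_key));
    Batch { primary_key, pk_values }
}

fn main() {
    let with_codec = finish_batch(b"host-1,region-a".to_vec(), Some(&CommaCodec));
    assert_eq!(
        with_codec.pk_values,
        Some(vec!["host-1".to_string(), "region-a".to_string()])
    );

    let without_codec = finish_batch(b"host-1,region-a".to_vec(), None);
    assert!(without_codec.pk_values.is_none());
    assert_eq!(without_codec.primary_key, b"host-1,region-a".to_vec());
}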

View File

@@ -127,6 +127,8 @@ pub struct ParquetReaderBuilder {
compaction: bool,
/// Mode to pre-filter columns.
pre_filter_mode: PreFilterMode,
/// Whether to decode primary key values eagerly when reading primary key format SSTs.
decode_primary_key_values: bool,
}
impl ParquetReaderBuilder {
@@ -152,6 +154,7 @@ impl ParquetReaderBuilder {
flat_format: false,
compaction: false,
pre_filter_mode: PreFilterMode::All,
decode_primary_key_values: false,
}
}
@@ -236,6 +239,13 @@ impl ParquetReaderBuilder {
self
}
/// Decodes primary key values eagerly when reading primary key format SSTs.
#[must_use]
pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
self.decode_primary_key_values = decode;
self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -292,6 +302,9 @@ impl ParquetReaderBuilder {
self.compaction,
)?
};
if self.decode_primary_key_values {
read_format.set_decode_primary_key_values(true);
}
if need_override_sequence(&parquet_meta) {
read_format
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
@@ -545,7 +558,7 @@ impl ParquetReaderBuilder {
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply_fine(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
metrics.fulltext_index_apply_metrics.as_mut(),
)
@@ -617,7 +630,7 @@ impl ParquetReaderBuilder {
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
metrics.inverted_index_apply_metrics.as_mut(),
)
@@ -696,7 +709,7 @@ impl ParquetReaderBuilder {
});
let apply_res = index_applier
.apply(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
@@ -779,7 +792,7 @@ impl ParquetReaderBuilder {
});
let apply_res = index_applier
.apply_coarse(
self.file_handle.file_id(),
self.file_handle.index_id(),
Some(file_size_hint),
rgs,
metrics.fulltext_index_apply_metrics.as_mut(),

View File

@@ -153,7 +153,7 @@ where
metrics: &'a mut Metrics,
) -> ParquetWriter<'a, F, I, P> {
let init_file = FileId::random();
let indexer = indexer_builder.build(init_file).await;
let indexer = indexer_builder.build(init_file, 0).await;
ParquetWriter {
path_provider,
@@ -482,7 +482,7 @@ where
.context(WriteParquetSnafu)?;
self.writer = Some(arrow_writer);
let indexer = self.indexer_builder.build(self.current_file).await;
let indexer = self.indexer_builder.build(self.current_file, 0).await;
self.current_indexer = Some(indexer);
// safety: self.writer is assigned above

View File

@@ -126,7 +126,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -104,7 +104,7 @@ impl VersionControlBuilder {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,
@@ -195,7 +195,7 @@ pub(crate) fn apply_edit(
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -481,12 +481,12 @@ async fn edit_region(
let index_file_index_key = IndexKey::new(
region_id,
file_meta.index_file_id().file_id(),
FileType::Puffin,
file_meta.index_id().file_id.file_id(),
FileType::Puffin(file_meta.index_version),
);
let index_remote_path = location::index_file_path(
layer.table_dir(),
file_meta.file_id(),
file_meta.index_id(),
layer.path_type(),
);

View File

@@ -28,7 +28,7 @@ use crate::region::MitoRegionRef;
use crate::request::{
BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
};
use crate::sst::file::{FileHandle, RegionFileId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::{
IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
};
@@ -68,6 +68,7 @@ impl<S> RegionWorkerLoop<S> {
row_group_size: WriteOptions::default().row_group_size,
intermediate_manager,
puffin_manager,
write_cache_enabled: self.cache_manager.write_cache().is_some(),
});
IndexBuildTask {
@@ -216,7 +217,8 @@ impl<S> RegionWorkerLoop<S> {
let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
for file_meta in &request.edit.files_to_add {
let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
cache_strategy.evict_puffin_cache(region_file_id).await;
let index_id = RegionIndexId::new(region_file_id, file_meta.index_version);
cache_strategy.evict_puffin_cache(index_id).await;
}
region.version_control.apply_edit(

View File

@@ -13,14 +13,15 @@
// limitations under the License.
use std::any::Any;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;
use common_telemetry::warn;
use datafusion::arrow::array::AsArray;
use datafusion::arrow::compute::{self, SortOptions, concat_batches};
use datafusion::arrow::array::{Array, AsArray, StringArray};
use datafusion::arrow::compute::{SortOptions, concat_batches};
use datafusion::arrow::datatypes::{DataType, Float64Type, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::stats::Precision;
@@ -40,8 +41,8 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::{Column, Expr};
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
use datatypes::value::{OrderedF64, ValueRef};
use datatypes::vectors::{Helper, MutableVector};
use datatypes::value::{OrderedF64, Value, ValueRef};
use datatypes::vectors::{Helper, MutableVector, VectorRef};
use futures::{Stream, StreamExt, ready};
/// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later
@@ -358,6 +359,9 @@ impl ExecutionPlan for HistogramFoldExec {
input_buffer: vec![],
input,
output_schema,
input_schema: self.input.schema(),
mode: FoldMode::Optimistic,
safe_group: None,
metric: baseline_metric,
batch_size,
input_buffered_rows: 0,
@@ -430,6 +434,12 @@ impl DisplayAs for HistogramFoldExec {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FoldMode {
Optimistic,
Safe,
}
pub struct HistogramFoldStream {
// internal states
le_column_index: usize,
@@ -441,6 +451,9 @@ pub struct HistogramFoldStream {
/// Expected output batch size
batch_size: usize,
output_schema: SchemaRef,
input_schema: SchemaRef,
mode: FoldMode,
safe_group: Option<SafeGroup>,
// buffers
input_buffer: Vec<RecordBatch>,
@@ -453,6 +466,13 @@ pub struct HistogramFoldStream {
metric: BaselineMetrics,
}
#[derive(Debug, Default)]
struct SafeGroup {
tag_values: Vec<Value>,
buckets: Vec<f64>,
counters: Vec<f64>,
}
impl RecordBatchStream for HistogramFoldStream {
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
@@ -478,7 +498,10 @@ impl Stream for HistogramFoldStream {
self.metric.elapsed_compute().add_elapsed(timer);
break Poll::Ready(Some(result));
}
None => break Poll::Ready(self.take_output_buf()?.map(Ok)),
None => {
self.flush_remaining()?;
break Poll::Ready(self.take_output_buf()?.map(Ok));
}
}
};
self.metric.record_poll(poll)
@@ -491,22 +514,28 @@ impl HistogramFoldStream {
&mut self,
input: RecordBatch,
) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
let Some(bucket_num) = self.calculate_bucket_num(&input)? else {
return Ok(None);
};
match self.mode {
FoldMode::Safe => {
self.push_input_buf(input);
self.process_safe_mode_buffer()?;
}
FoldMode::Optimistic => {
self.push_input_buf(input);
let Some(bucket_num) = self.calculate_bucket_num_from_buffer()? else {
return Ok(None);
};
self.bucket_size = Some(bucket_num);
if self.input_buffered_rows + input.num_rows() < bucket_num {
// not enough rows to fold
self.push_input_buf(input);
return Ok(None);
if self.input_buffered_rows < bucket_num {
// not enough rows to fold
return Ok(None);
}
self.fold_buf(bucket_num)?;
}
}
self.fold_buf(bucket_num, input)?;
if self.output_buffered_rows >= self.batch_size {
return Ok(self.take_output_buf()?.map(Ok));
}
Ok(None)
self.maybe_take_output()
}
/// Generate a group of empty [MutableVector]s from the output schema.
@@ -532,55 +561,100 @@ impl HistogramFoldStream {
Ok(builders)
}
fn calculate_bucket_num(&mut self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
/// Determines bucket count using buffered batches, concatenating them to
/// detect the first complete bucket that may span batch boundaries.
fn calculate_bucket_num_from_buffer(&mut self) -> DataFusionResult<Option<usize>> {
if let Some(size) = self.bucket_size {
return Ok(Some(size));
}
let inf_pos = self.find_positive_inf(batch)?;
if inf_pos == batch.num_rows() {
// no positive inf found, append to buffer and wait for next batch
self.push_input_buf(batch.clone());
if self.input_buffer.is_empty() {
return Ok(None);
}
// else we found the positive inf.
// calculate the bucket size
let bucket_size = inf_pos + self.input_buffered_rows + 1;
Ok(Some(bucket_size))
let batch_refs: Vec<&RecordBatch> = self.input_buffer.iter().collect();
let batch = concat_batches(&self.input_schema, batch_refs)?;
self.find_first_complete_bucket(&batch)
}
fn find_first_complete_bucket(&self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
if batch.num_rows() == 0 {
return Ok(None);
}
let vectors = Helper::try_into_vectors(batch.columns())
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let le_array = batch.column(self.le_column_index).as_string::<i32>();
let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
self.collect_tag_values(&vectors, 0, &mut tag_values_buf);
let mut group_start = 0usize;
for row in 0..batch.num_rows() {
if !self.is_same_group(&vectors, row, &tag_values_buf) {
// new group begins
self.collect_tag_values(&vectors, row, &mut tag_values_buf);
group_start = row;
}
if Self::is_positive_infinity(le_array, row) {
return Ok(Some(row - group_start + 1));
}
}
Ok(None)
}
/// Fold record batches from input buffer and put to output buffer
fn fold_buf(&mut self, bucket_num: usize, input: RecordBatch) -> DataFusionResult<()> {
self.push_input_buf(input);
// TODO(ruihang): this concat is avoidable.
let batch = concat_batches(&self.input.schema(), self.input_buffer.drain(..).as_ref())?;
fn fold_buf(&mut self, bucket_num: usize) -> DataFusionResult<()> {
let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
let mut remaining_rows = self.input_buffered_rows;
let mut cursor = 0;
// TODO(LFC): Try to get rid of the Arrow array to vector conversion here.
let vectors = Helper::try_into_vectors(batch.columns())
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let le_array = batch.column(self.le_column_index);
let le_array = le_array.as_string::<i32>();
let field_array = batch.column(self.field_column_index);
let field_array = field_array.as_primitive::<Float64Type>();
let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
while remaining_rows >= bucket_num && self.mode == FoldMode::Optimistic {
self.collect_tag_values(&vectors, cursor, &mut tag_values_buf);
if !self.validate_optimistic_group(
&vectors,
le_array,
cursor,
bucket_num,
&tag_values_buf,
) {
let remaining_input_batch = batch.slice(cursor, remaining_rows);
self.switch_to_safe_mode(remaining_input_batch)?;
return Ok(());
}
while remaining_rows >= bucket_num {
// "sample" normal columns
for normal_index in &self.normal_indices {
let val = vectors[*normal_index].get(cursor);
self.output_buffer[*normal_index].push_value_ref(&val.as_value_ref());
for (idx, value) in self.normal_indices.iter().zip(tag_values_buf.iter()) {
self.output_buffer[*idx].push_value_ref(value);
}
// "fold" `le` and field columns
let le_array = batch.column(self.le_column_index);
let le_array = le_array.as_string::<i32>();
let field_array = batch.column(self.field_column_index);
let field_array = field_array.as_primitive::<Float64Type>();
let mut bucket = vec![];
let mut counters = vec![];
let mut bucket = Vec::with_capacity(bucket_num);
let mut counters = Vec::with_capacity(bucket_num);
for bias in 0..bucket_num {
let le_str = le_array.value(cursor + bias);
let le = le_str.parse::<f64>().unwrap();
let position = cursor + bias;
let le = if le_array.is_valid(position) {
le_array.value(position).parse::<f64>().unwrap_or(f64::NAN)
} else {
f64::NAN
};
bucket.push(le);
let counter = field_array.value(cursor + bias);
let counter = if field_array.is_valid(position) {
field_array.value(position)
} else {
f64::NAN
};
counters.push(counter);
}
// ignore invalid data
@@ -593,7 +667,9 @@ impl HistogramFoldStream {
let remaining_input_batch = batch.slice(cursor, remaining_rows);
self.input_buffered_rows = remaining_input_batch.num_rows();
self.input_buffer.push(remaining_input_batch);
if self.input_buffered_rows > 0 {
self.input_buffer.push(remaining_input_batch);
}
Ok(())
}
@@ -603,6 +679,170 @@ impl HistogramFoldStream {
self.input_buffer.push(batch);
}
fn maybe_take_output(&mut self) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
if self.output_buffered_rows >= self.batch_size {
return Ok(self.take_output_buf()?.map(Ok));
}
Ok(None)
}
fn switch_to_safe_mode(&mut self, remaining_batch: RecordBatch) -> DataFusionResult<()> {
self.mode = FoldMode::Safe;
self.bucket_size = None;
self.input_buffer.clear();
self.input_buffered_rows = remaining_batch.num_rows();
if self.input_buffered_rows > 0 {
self.input_buffer.push(remaining_batch);
self.process_safe_mode_buffer()?;
}
Ok(())
}
fn collect_tag_values<'a>(
&self,
vectors: &'a [VectorRef],
row: usize,
tag_values: &mut Vec<ValueRef<'a>>,
) {
tag_values.clear();
for idx in self.normal_indices.iter() {
tag_values.push(vectors[*idx].get_ref(row));
}
}
fn validate_optimistic_group(
&self,
vectors: &[VectorRef],
le_array: &StringArray,
cursor: usize,
bucket_num: usize,
tag_values: &[ValueRef<'_>],
) -> bool {
let inf_index = cursor + bucket_num - 1;
if !Self::is_positive_infinity(le_array, inf_index) {
return false;
}
for offset in 1..bucket_num {
let row = cursor + offset;
for (idx, expected) in self.normal_indices.iter().zip(tag_values.iter()) {
if vectors[*idx].get_ref(row) != *expected {
return false;
}
}
}
true
}
/// Checks whether a row belongs to the current group (same series).
fn is_same_group(
&self,
vectors: &[VectorRef],
row: usize,
tag_values: &[ValueRef<'_>],
) -> bool {
self.normal_indices
.iter()
.zip(tag_values.iter())
.all(|(idx, expected)| vectors[*idx].get_ref(row) == *expected)
}
fn push_output_row(&mut self, tag_values: &[ValueRef<'_>], result: f64) {
debug_assert_eq!(self.normal_indices.len(), tag_values.len());
for (idx, value) in self.normal_indices.iter().zip(tag_values.iter()) {
self.output_buffer[*idx].push_value_ref(value);
}
self.output_buffer[self.field_column_index].push_value_ref(&ValueRef::from(result));
self.output_buffered_rows += 1;
}
fn finalize_safe_group(&mut self) -> DataFusionResult<()> {
if let Some(group) = self.safe_group.take() {
if group.tag_values.is_empty() {
return Ok(());
}
let has_inf = group
.buckets
.last()
.map(|v| v.is_infinite() && v.is_sign_positive())
.unwrap_or(false);
let result = if group.buckets.len() < 2 || !has_inf {
f64::NAN
} else {
Self::evaluate_row(self.quantile, &group.buckets, &group.counters)
.unwrap_or(f64::NAN)
};
let mut tag_value_refs = Vec::with_capacity(group.tag_values.len());
tag_value_refs.extend(group.tag_values.iter().map(|v| v.as_value_ref()));
self.push_output_row(&tag_value_refs, result);
}
Ok(())
}
fn process_safe_mode_buffer(&mut self) -> DataFusionResult<()> {
if self.input_buffer.is_empty() {
self.input_buffered_rows = 0;
return Ok(());
}
let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
self.input_buffered_rows = 0;
let vectors = Helper::try_into_vectors(batch.columns())
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let le_array = batch.column(self.le_column_index).as_string::<i32>();
let field_array = batch
.column(self.field_column_index)
.as_primitive::<Float64Type>();
let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
for row in 0..batch.num_rows() {
self.collect_tag_values(&vectors, row, &mut tag_values_buf);
let should_start_new_group = self
.safe_group
.as_ref()
.is_none_or(|group| !Self::tag_values_equal(&group.tag_values, &tag_values_buf));
if should_start_new_group {
self.finalize_safe_group()?;
self.safe_group = Some(SafeGroup {
tag_values: tag_values_buf.iter().cloned().map(Value::from).collect(),
buckets: Vec::new(),
counters: Vec::new(),
});
}
let Some(group) = self.safe_group.as_mut() else {
continue;
};
let bucket = if le_array.is_valid(row) {
le_array.value(row).parse::<f64>().unwrap_or(f64::NAN)
} else {
f64::NAN
};
let counter = if field_array.is_valid(row) {
field_array.value(row)
} else {
f64::NAN
};
group.buckets.push(bucket);
group.counters.push(counter);
}
Ok(())
}
fn tag_values_equal(group_values: &[Value], current: &[ValueRef<'_>]) -> bool {
group_values.len() == current.len()
&& group_values
.iter()
.zip(current.iter())
.all(|(group, now)| group.as_value_ref() == *now)
}
/// Compute result from output buffer
fn take_output_buf(&mut self) -> DataFusionResult<Option<RecordBatch>> {
if self.output_buffered_rows == 0 {
@@ -630,41 +870,31 @@ impl HistogramFoldStream {
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
}
/// Find the first `+Inf` which indicates the end of the bucket group
///
/// If the return value equals to batch's num_rows means the it's not found
/// in this batch
fn find_positive_inf(&self, batch: &RecordBatch) -> DataFusionResult<usize> {
// fuse this function. It should not be called when the
// bucket size is already know.
if let Some(bucket_size) = self.bucket_size {
return Ok(bucket_size);
}
let string_le_array = batch.column(self.le_column_index);
let float_le_array = compute::cast(&string_le_array, &DataType::Float64).map_err(|e| {
DataFusionError::Execution(format!(
"cannot cast {} array to float64 array: {:?}",
string_le_array.data_type(),
e
))
})?;
let le_as_f64_array = float_le_array
.as_primitive_opt::<Float64Type>()
.ok_or_else(|| {
DataFusionError::Execution(format!(
"expect a float64 array, but found {}",
float_le_array.data_type()
))
})?;
for (i, v) in le_as_f64_array.iter().enumerate() {
if let Some(v) = v
&& v == f64::INFINITY
{
return Ok(i);
fn flush_remaining(&mut self) -> DataFusionResult<()> {
if self.mode == FoldMode::Optimistic && self.input_buffered_rows > 0 {
let buffered_batches: Vec<_> = self.input_buffer.drain(..).collect();
if !buffered_batches.is_empty() {
let batch = concat_batches(&self.input_schema, buffered_batches.as_slice())?;
self.switch_to_safe_mode(batch)?;
} else {
self.input_buffered_rows = 0;
}
}
Ok(batch.num_rows())
if self.mode == FoldMode::Safe {
self.process_safe_mode_buffer()?;
self.finalize_safe_group()?;
}
Ok(())
}
fn is_positive_infinity(le_array: &StringArray, index: usize) -> bool {
le_array.is_valid(index)
&& matches!(
le_array.value(index).parse::<f64>(),
Ok(value) if value.is_infinite() && value.is_sign_positive()
)
}
/// Evaluate the field column and return the result
@@ -693,8 +923,28 @@ impl HistogramFoldStream {
}
// check input value
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
if !bucket.windows(2).all(|w| w[0] <= w[1]) {
return Ok(f64::NAN);
}
let counter = {
let needs_fix =
counter.iter().any(|v| !v.is_finite()) || !counter.windows(2).all(|w| w[0] <= w[1]);
if !needs_fix {
Cow::Borrowed(counter)
} else {
let mut fixed = Vec::with_capacity(counter.len());
let mut prev = 0.0;
for (idx, &v) in counter.iter().enumerate() {
let mut val = if v.is_finite() { v } else { prev };
if idx > 0 && val < prev {
val = prev;
}
fixed.push(val);
prev = val;
}
Cow::Owned(fixed)
}
};
let total = *counter.last().unwrap();
let expected_pos = total * quantile;
@@ -713,6 +963,9 @@ impl HistogramFoldStream {
lower_bound = bucket[fit_bucket_pos - 1];
lower_count = counter[fit_bucket_pos - 1];
}
if (upper_count - lower_count).abs() < 1e-10 {
return Ok(f64::NAN);
}
Ok(lower_bound
+ (upper_bound - lower_bound) / (upper_count - lower_count)
* (expected_pos - lower_count))
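To see what the counter repair and the new zero-denominator guard above do in practice, here is a worked pass over the non-monotonic case exercised by the evaluate_non_monotonic_counter test added near the end of this file (my own arithmetic, not program output):

buckets  = [0.0, 1.0, 2.0, 3.0, +Inf]
counters = [0.1, 0.2, 0.4, 0.17, 0.5]    (0.17 breaks monotonicity)
repaired = [0.1, 0.2, 0.4, 0.4,  0.5]    (each value clamped to at least the previous one)
total = 0.5, expected_pos = 0.5 * 0.5 = 0.25
first bucket with counter >= 0.25 is index 2: upper_bound = 2.0, upper_count = 0.4
lower_bound = 1.0, lower_count = 0.2
result = 1.0 + (2.0 - 1.0) / (0.4 - 0.2) * (0.25 - 0.2) = 1.25

which matches the 1.25 the test asserts.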
@@ -724,8 +977,8 @@ impl HistogramFoldStream {
mod test {
use std::sync::Arc;
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{Field, Schema, SchemaRef, TimeUnit};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
@@ -792,6 +1045,43 @@ mod test {
))
}
fn build_fold_exec_from_batches(
batches: Vec<RecordBatch>,
schema: SchemaRef,
quantile: f64,
ts_column_index: usize,
) -> Arc<HistogramFoldExec> {
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
)));
let output_schema: SchemaRef = Arc::new(
HistogramFold::convert_schema(
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
"le",
)
.unwrap()
.as_arrow()
.clone(),
);
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Arc::new(HistogramFoldExec {
le_column_index: 1,
field_column_index: 2,
quantile,
ts_column_index,
input: memory_exec,
output_schema,
metric: ExecutionPlanMetricsSet::new(),
properties,
})
}
#[tokio::test]
async fn fold_overall() {
let memory_exec = Arc::new(prepare_test_data());
@@ -863,6 +1153,187 @@ mod test {
assert_eq!(actual, expected_output_schema)
}
#[tokio::test]
async fn fallback_to_safe_mode_on_missing_inf() {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let host_column = Arc::new(StringArray::from(vec!["a", "a", "a", "a", "b", "b"])) as _;
let le_column = Arc::new(StringArray::from(vec![
"0.1", "+Inf", "0.1", "1.0", "0.1", "+Inf",
])) as _;
let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 3.0, 1.0, 5.0])) as _;
let batch =
RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();
let expected = String::from(
"+------+-----+
| host | val |
+------+-----+
| a | 0.1 |
| a | NaN |
| b | 0.1 |
+------+-----+",
);
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn emit_nan_when_no_inf_present() {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let host_column = Arc::new(StringArray::from(vec!["c", "c"])) as _;
let le_column = Arc::new(StringArray::from(vec!["0.1", "1.0"])) as _;
let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0])) as _;
let batch =
RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.9, 0);
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();
let expected = String::from(
"+------+-----+
| host | val |
+------+-----+
| c | NaN |
+------+-----+",
);
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn safe_mode_handles_misaligned_groups() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
2900000, 2900000, 2900000, 3000000, 3000000, 3000000, 3000000, 3005000, 3005000,
3010000, 3010000, 3010000, 3010000, 3010000,
])) as _;
let le_column = Arc::new(StringArray::from(vec![
"0.1", "1", "5", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
])) as _;
let val_column = Arc::new(Float64Array::from(vec![
0.0, 0.0, 0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
])) as _;
let batch =
RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
let mut values = Vec::new();
for batch in result {
let array = batch.column(1).as_primitive::<Float64Type>();
values.extend(array.iter().map(|v| v.unwrap()));
}
assert_eq!(values.len(), 4);
assert!(values[0].is_nan());
assert!((values[1] - 0.55).abs() < 1e-10);
assert!((values[2] - 0.1).abs() < 1e-10);
assert!((values[3] - 2.0).abs() < 1e-10);
}
#[tokio::test]
async fn missing_buckets_at_first_timestamp() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
2_900_000, 3_000_000, 3_000_000, 3_000_000, 3_000_000, 3_005_000, 3_005_000, 3_010_000,
3_010_000, 3_010_000, 3_010_000, 3_010_000,
])) as _;
let le_column = Arc::new(StringArray::from(vec![
"0.1", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
])) as _;
let val_column = Arc::new(Float64Array::from(vec![
0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
])) as _;
let batch =
RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
let mut values = Vec::new();
for batch in result {
let array = batch.column(1).as_primitive::<Float64Type>();
values.extend(array.iter().map(|v| v.unwrap()));
}
assert_eq!(values.len(), 4);
assert!(values[0].is_nan());
assert!((values[1] - 0.55).abs() < 1e-10);
assert!((values[2] - 0.1).abs() < 1e-10);
assert!((values[3] - 2.0).abs() < 1e-10);
}
#[tokio::test]
async fn missing_inf_in_first_group() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
1000, 1000, 1000, 2000, 2000, 2000, 2000,
])) as _;
let le_column = Arc::new(StringArray::from(vec![
"0.1", "1", "5", "0.1", "1", "5", "+Inf",
])) as _;
let val_column = Arc::new(Float64Array::from(vec![
0.0, 0.0, 0.0, 10.0, 20.0, 30.0, 30.0,
])) as _;
let batch =
RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
let mut values = Vec::new();
for batch in result {
let array = batch.column(1).as_primitive::<Float64Type>();
values.extend(array.iter().map(|v| v.unwrap()));
}
assert_eq!(values.len(), 2);
assert!(values[0].is_nan());
assert!((values[1] - 0.55).abs() < 1e-10, "{values:?}");
}
#[test]
fn evaluate_row_normal_case() {
let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
@@ -935,11 +1406,11 @@ mod test {
}
#[test]
#[should_panic]
fn evaluate_out_of_order_input() {
let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
let counters = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0];
HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
assert_eq!(0.0, result);
}
#[test]
@@ -957,4 +1428,20 @@ mod test {
let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
assert_eq!(3.0, result);
}
#[test]
fn evaluate_non_monotonic_counter() {
let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
let counters = [0.1, 0.2, 0.4, 0.17, 0.5];
let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
assert!((result - 1.25).abs() < 1e-10, "{result}");
}
#[test]
fn evaluate_nan_counter() {
let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
let counters = [f64::NAN, 1.0, 2.0, 3.0, 3.0];
let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
assert!((result - 1.5).abs() < 1e-10, "{result}");
}
}
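Reading the new fallback_to_safe_mode_on_missing_inf test above as a trace (my interpretation of the code path, grounded only in the expected table the test asserts): the first complete bucket ends at the first "+Inf", so bucket_num is 2 and the first "a" group ([0.1, +Inf] with counters [1, 2]) folds optimistically to 0.1; the second "a" group ends in "1.0" rather than "+Inf", so validation fails and the remaining rows are replayed through safe mode, where the incomplete "a" group finalizes to NaN and the "b" group ([0.1, +Inf] with counters [1, 5]) yields 0.1, exactly the three rows in the expected output.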

View File

@@ -56,6 +56,10 @@ impl<S, F> FsPuffinManager<S, F> {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
pub fn file_accessor(&self) -> &F {
&self.puffin_file_accessor
}
}
#[async_trait]

View File

@@ -1 +1 @@
v0.11.8
v0.11.9

View File

@@ -47,8 +47,8 @@ pub struct ManifestSstEntry {
pub region_sequence: RegionSeq,
/// Engine-specific file identifier (string form).
pub file_id: String,
/// Engine-specific index file identifier (string form).
pub index_file_id: Option<String>,
/// Index version, incremented when the index file is rebuilt.
pub index_version: u64,
/// SST level.
pub level: u8,
/// Full path of the SST file in object store.
@@ -91,7 +91,7 @@ impl ManifestSstEntry {
ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
ColumnSchema::new("file_id", Ty::string_datatype(), false),
ColumnSchema::new("index_file_id", Ty::string_datatype(), true),
ColumnSchema::new("index_version", Ty::uint64_datatype(), false),
ColumnSchema::new("level", Ty::uint8_datatype(), false),
ColumnSchema::new("file_path", Ty::string_datatype(), false),
ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
@@ -119,7 +119,7 @@ impl ManifestSstEntry {
let region_groups = entries.iter().map(|e| e.region_group);
let region_sequences = entries.iter().map(|e| e.region_sequence);
let file_ids = entries.iter().map(|e| e.file_id.as_str());
let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref());
let index_versions = entries.iter().map(|e| e.index_version);
let levels = entries.iter().map(|e| e.level);
let file_paths = entries.iter().map(|e| e.file_path.as_str());
let file_sizes = entries.iter().map(|e| e.file_size);
@@ -151,7 +151,7 @@ impl ManifestSstEntry {
Arc::new(UInt8Array::from_iter_values(region_groups)),
Arc::new(UInt32Array::from_iter_values(region_sequences)),
Arc::new(StringArray::from_iter_values(file_ids)),
Arc::new(StringArray::from_iter(index_file_ids)),
Arc::new(UInt64Array::from_iter(index_versions)),
Arc::new(UInt8Array::from_iter_values(levels)),
Arc::new(StringArray::from_iter_values(file_paths)),
Arc::new(UInt64Array::from_iter_values(file_sizes)),
@@ -437,7 +437,7 @@ mod tests {
region_group: region_group1,
region_sequence: region_seq1,
file_id: "f1".to_string(),
index_file_id: None,
index_version: 0,
level: 1,
file_path: "/p1".to_string(),
file_size: 100,
@@ -461,7 +461,7 @@ mod tests {
region_group: region_group2,
region_sequence: region_seq2,
file_id: "f2".to_string(),
index_file_id: Some("idx".to_string()),
index_version: 1,
level: 3,
file_path: "/p2".to_string(),
file_size: 200,
@@ -548,13 +548,13 @@ mod tests {
assert_eq!("f1", file_ids.value(0));
assert_eq!("f2", file_ids.value(1));
let index_file_ids = batch
let index_versions = batch
.column(7)
.as_any()
.downcast_ref::<StringArray>()
.downcast_ref::<UInt64Array>()
.unwrap();
assert!(index_file_ids.is_null(0));
assert_eq!("idx", index_file_ids.value(1));
assert_eq!(0, index_versions.value(0));
assert_eq!(1, index_versions.value(1));
let levels = batch
.column(8)

View File

@@ -26,6 +26,6 @@ pub use datatypes::schema::{
};
pub use self::descriptors::*;
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError};
pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
pub use self::types::{SequenceNumber, SequenceRange};

View File

@@ -24,6 +24,9 @@ use uuid::Uuid;
use crate::ManifestVersion;
use crate::storage::RegionId;
/// Index version, incremented when the index file is rebuilt.
pub type IndexVersion = u64;
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
source: uuid::Error,
@@ -70,15 +73,21 @@ impl FromStr for FileId {
}
}
/// Indicates that a `FileHandle` reference is held for a specific file & index in a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileRef {
pub region_id: RegionId,
pub file_id: FileId,
pub index_version: IndexVersion,
}
impl FileRef {
pub fn new(region_id: RegionId, file_id: FileId) -> Self {
Self { region_id, file_id }
pub fn new(region_id: RegionId, file_id: FileId, index_version: u64) -> Self {
Self {
region_id,
file_id,
index_version,
}
}
}
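A small usage sketch for the widened constructor above (hedged; the literal 0 mirrors the fresh-SST default used elsewhere in this change set, and region_id / file_id stand in for whatever the caller already holds):

// FileRef now pins both the data file and the index version it was opened against.
let file_ref = FileRef::new(region_id, file_id, 0);
assert_eq!(file_ref.index_version, 0);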

View File

@@ -10,7 +10,7 @@ DESC TABLE information_schema.ssts_manifest;
| region_group | UInt8 | | NO | | FIELD |
| region_sequence | UInt32 | | NO | | FIELD |
| file_id | String | | NO | | FIELD |
| index_file_id | String | | YES | | FIELD |
| index_version | UInt64 | | NO | | FIELD |
| level | UInt8 | | NO | | FIELD |
| file_path | String | | NO | | FIELD |
| file_size | UInt64 | | NO | | FIELD |
@@ -97,13 +97,13 @@ ADMIN FLUSH_TABLE('sst_case');
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
SELECT * FROM information_schema.ssts_manifest order by file_path;
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
@@ -165,15 +165,15 @@ ADMIN FLUSH_TABLE('sst_case');
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
SELECT * FROM information_schema.ssts_manifest order by file_path;
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>

View File

@@ -363,3 +363,52 @@ drop table greptime_servers_postgres_query_elapsed_no_le;
Affected Rows: 0
-- test case with some missing buckets
create table histogram5_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
Affected Rows: 0
insert into histogram5_bucket values
(3000000, "0.1", "a", 0),
-- (3000000, "1", "a", 0),
-- (3000000, "5", "a", 0),
-- (3000000, "+Inf", "a", 0),
(3005000, "0.1", "a", 50),
(3005000, "1", "a", 70),
(3005000, "5", "a", 110),
(3005000, "+Inf", "a", 120),
(3010000, "0.1", "a", 10),
-- (3010000, "1", "a", 20),
-- (3010000, "5", "a", 20),
(3010000, "+Inf", "a", 30),
(3015000, "0.1", "a", 10),
(3015000, "1", "a", 10),
(3015000, "3", "a", 20), --
(3015000, "5", "a", 30),
(3015000, "+Inf", "a", 50);
Affected Rows: 12
tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
+---------------------+---+--------------------+
| ts | s | val |
+---------------------+---+--------------------+
| 1970-01-01T00:50:00 | a | NaN |
| 1970-01-01T00:50:03 | a | NaN |
| 1970-01-01T00:50:06 | a | 0.5499999999999999 |
| 1970-01-01T00:50:09 | a | 0.5499999999999999 |
| 1970-01-01T00:50:12 | a | 0.775 |
| 1970-01-01T00:50:15 | a | 4.0 |
+---------------------+---+--------------------+
drop table histogram5_bucket;
Affected Rows: 0
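A hedged sanity check of one non-obvious row above, the 0.775 at 1970-01-01T00:50:12 (my own arithmetic under standard PromQL lookback, not program output): at that instant the freshest samples are le=0.1 -> 10 and le=+Inf -> 30 (from t=3010000), plus le=1 -> 70 and le=5 -> 110 still visible from t=3005000; that non-monotonic series is repaired to [10, 70, 110, 110], the 0.5 quantile position is 55, which lands in the le=1 bucket, and the interpolation 0.1 + (1 - 0.1) / (70 - 10) * (55 - 10) gives 0.775.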

View File

@@ -204,3 +204,36 @@ tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, le) (rate(g
tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fbf) (rate(greptime_servers_postgres_query_elapsed_no_le{instance=~"xxx"}[1m])));
drop table greptime_servers_postgres_query_elapsed_no_le;
-- test case with some missing buckets
create table histogram5_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
insert into histogram5_bucket values
(3000000, "0.1", "a", 0),
-- (3000000, "1", "a", 0),
-- (3000000, "5", "a", 0),
-- (3000000, "+Inf", "a", 0),
(3005000, "0.1", "a", 50),
(3005000, "1", "a", 70),
(3005000, "5", "a", 110),
(3005000, "+Inf", "a", 120),
(3010000, "0.1", "a", 10),
-- (3010000, "1", "a", 20),
-- (3010000, "5", "a", 20),
(3010000, "+Inf", "a", 30),
(3015000, "0.1", "a", 10),
(3015000, "1", "a", 10),
(3015000, "3", "a", 20), --
(3015000, "5", "a", 30),
(3015000, "+Inf", "a", 50);
tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
drop table histogram5_bucket;

View File

@@ -400,9 +400,9 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | ssts_manifest | file_id | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | file_path | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | file_size | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | index_file_id | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | ssts_manifest | index_file_path | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | ssts_manifest | index_file_size | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | index_version | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | level | 9 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
| greptime | information_schema | ssts_manifest | max_ts | 18 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
| greptime | information_schema | ssts_manifest | min_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |