Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 22:49:58 +00:00)

Compare commits: docs/vecto... → feat/index

14 commits:

| SHA1 |
|---|
| 122dfbf9f8 |
| 3c7fa84442 |
| 0cd368d1f6 |
| c82208dbc0 |
| bbbe91e97a |
| 987e1b5a15 |
| 9cac640b41 |
| 9197e818ec |
| 36d89c3baf |
| 0ebfd161d8 |
| 8b26a98c3b |
| 7199823be9 |
| 60f752d306 |
| edb1f6086f |
.github/workflows/release.yml (vendored, 32 changed lines)
@@ -49,14 +49,9 @@ on:
        description: Do not run integration tests during the build
        type: boolean
        default: true
-      build_linux_amd64_artifacts:
+      build_linux_artifacts:
        type: boolean
-        description: Build linux-amd64 artifacts
-        required: false
-        default: false
-      build_linux_arm64_artifacts:
-        type: boolean
-        description: Build linux-arm64 artifacts
+        description: Build linux artifacts (both amd64 and arm64)
        required: false
        default: false
      build_macos_artifacts:

@@ -144,7 +139,7 @@ jobs:
          ./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"

      - name: Allocate linux-amd64 runner
-        if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
+        if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
        uses: ./.github/actions/start-runner
        id: start-linux-amd64-runner
        with:

@@ -158,7 +153,7 @@ jobs:
          subnet-id: ${{ vars.EC2_RUNNER_SUBNET_ID }}

      - name: Allocate linux-arm64 runner
-        if: ${{ inputs.build_linux_arm64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
+        if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
        uses: ./.github/actions/start-runner
        id: start-linux-arm64-runner
        with:

@@ -173,7 +168,7 @@ jobs:

  build-linux-amd64-artifacts:
    name: Build linux-amd64 artifacts
-    if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
+    if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
    needs: [
      allocate-runners,
    ]

@@ -195,7 +190,7 @@ jobs:

  build-linux-arm64-artifacts:
    name: Build linux-arm64 artifacts
-    if: ${{ inputs.build_linux_arm64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
+    if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
    needs: [
      allocate-runners,
    ]

@@ -217,7 +212,7 @@ jobs:

  run-multi-lang-tests:
    name: Run Multi-language SDK Tests
-    if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
+    if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
    needs: [
      allocate-runners,
      build-linux-amd64-artifacts,

@@ -386,7 +381,18 @@ jobs:

  publish-github-release:
    name: Create GitHub release and upload artifacts
-    if: ${{ inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule' }}
+    # Use always() to run even when optional jobs (macos, windows) are skipped.
+    # Then check that required jobs succeeded and optional jobs didn't fail.
+    if: |
+      always() &&
+      (inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule') &&
+      needs.allocate-runners.result == 'success' &&
+      (needs.build-linux-amd64-artifacts.result == 'success' || needs.build-linux-amd64-artifacts.result == 'skipped') &&
+      (needs.build-linux-arm64-artifacts.result == 'success' || needs.build-linux-arm64-artifacts.result == 'skipped') &&
+      (needs.build-macos-artifacts.result == 'success' || needs.build-macos-artifacts.result == 'skipped') &&
+      (needs.build-windows-artifacts.result == 'success' || needs.build-windows-artifacts.result == 'skipped') &&
+      (needs.release-images-to-dockerhub.result == 'success' || needs.release-images-to-dockerhub.result == 'skipped') &&
+      (needs.run-multi-lang-tests.result == 'success' || needs.run-multi-lang-tests.result == 'skipped')
    needs: [ # The job has to wait until all the artifacts are built.
      allocate-runners,
      build-linux-amd64-artifacts,
@@ -163,7 +163,7 @@ impl ObjbenchCommand {
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
-            index_file_id: None,
+            index_version: 0,
            num_rows,
            num_row_groups,
            sequence: None,

@@ -565,6 +565,7 @@ fn new_noop_file_purger() -> FilePurgerRef {
    struct Noop;
    impl FilePurger for Noop {
        fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
+        fn update_index(&self, _file_meta: FileMeta, _version: store_api::storage::IndexVersion) {}
    }
    Arc::new(Noop)
}

@@ -14,7 +14,7 @@

mod basic;
mod candidate_select;
-mod con;
+mod concurrent;
mod config;
mod err_handle;
mod full_list;
@@ -50,7 +50,7 @@ impl GcScheduler {
        let now = Instant::now();

        // Check if enough time has passed since last cleanup
-        if now.duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
+        if now.saturating_duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
            return Ok(());
        }
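For context on the one-line change above: `Instant::duration_since` panicked on older toolchains (and now silently clamps) when the supposedly earlier instant is actually later, which can happen around clock quirks; `saturating_duration_since` makes the clamp-to-zero behavior explicit. A minimal, self-contained sketch of the throttling check, with a stand-in interval in place of `config.tracker_cleanup_interval`:

```rust
use std::time::{Duration, Instant};

// Returns true when the cleanup should be skipped because not enough time has
// passed since the last one. Clamping to zero means a "future" timestamp is
// treated as zero elapsed time instead of causing a panic or an underflow.
fn should_skip_cleanup(now: Instant, last_cleanup: Instant, interval: Duration) -> bool {
    now.saturating_duration_since(last_cleanup) < interval
}

fn main() {
    let interval = Duration::from_secs(600);

    // Immediately after a cleanup, not enough time has passed: skip.
    let last_cleanup = Instant::now();
    assert!(should_skip_cleanup(Instant::now(), last_cleanup, interval));

    // A timestamp slightly in the future clamps to zero elapsed time and is also skipped.
    let future = Instant::now() + Duration::from_secs(5);
    assert!(should_skip_cleanup(Instant::now(), future, interval));
}
```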
@@ -119,7 +119,7 @@ mod tests {
                .index_file_path
                .map(|path| path.replace(&e.file_id, "<file_id>"));
            e.file_id = "<file_id>".to_string();
-            e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
+            e.index_version = 0;
            format!("\n{:?}", e)
        })
        .sorted()

@@ -128,12 +128,12 @@
    assert_eq!(
        debug_format,
        r#"
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
    );
    // list from storage
    let storage_entries = mito
@@ -37,7 +37,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
-use crate::sst::file::{FileHandle, RegionFileId};
+use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};

@@ -216,7 +216,7 @@ impl AccessLayer {
    pub(crate) async fn delete_sst(
        &self,
        region_file_id: &RegionFileId,
-        index_file_id: &RegionFileId,
+        index_file_id: &RegionIndexId,
    ) -> Result<()> {
        let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
        self.object_store

@@ -226,12 +226,22 @@ impl AccessLayer {
                file_id: region_file_id.file_id(),
            })?;

-        let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type);
+        // Delete all versions of the index file.
+        for version in 0..=index_file_id.version {
+            self.delete_index(&RegionIndexId::new(index_file_id.file_id, version))
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn delete_index(&self, region_index_id: &RegionIndexId) -> Result<()> {
+        let path = location::index_file_path(&self.table_dir, *region_index_id, self.path_type);
        self.object_store
            .delete(&path)
            .await
            .context(DeleteIndexSnafu {
-                file_id: region_file_id.file_id(),
+                file_id: region_index_id.file_id(),
            })?;

        Ok(())
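A minimal sketch of the version sweep the new `delete_sst` performs: every index version from 0 up to the current one is removed, so older index files are cleaned up together with the SST. The types below are simplified stand-ins for `RegionIndexId` and the object store, not the real mito2 API.

```rust
// Stand-in for the versioned index id: a file id plus its current index version.
#[derive(Clone, Copy, Debug, PartialEq)]
struct IndexId {
    file_id: u64,
    version: u64,
}

// Stand-in for the access layer; the real one issues async object-store deletes.
struct AccessLayer {
    deleted: Vec<IndexId>,
}

impl AccessLayer {
    fn delete_index(&mut self, id: IndexId) {
        // Stand-in for `object_store.delete(index_file_path(...))`.
        self.deleted.push(id);
    }

    fn delete_sst_indexes(&mut self, current: IndexId) {
        // Delete all versions of the index file, oldest first.
        for version in 0..=current.version {
            self.delete_index(IndexId { version, ..current });
        }
    }
}

fn main() {
    let mut layer = AccessLayer { deleted: Vec::new() };
    layer.delete_sst_indexes(IndexId { file_id: 42, version: 2 });
    assert_eq!(layer.deleted.len(), 3); // versions 0, 1 and 2
}
```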
@@ -291,6 +301,7 @@ impl AccessLayer {
                puffin_manager: self
                    .puffin_manager_factory
                    .build(store, path_provider.clone()),
+                write_cache_enabled: false,
                intermediate_manager: self.intermediate_manager.clone(),
                index_options: request.index_options,
                inverted_index_config: request.inverted_index_config,

@@ -468,9 +479,10 @@ impl TempFileCleaner {
    }

    /// Removes the SST and index file from the local atomic dir by the file id.
+    /// This only removes the initial index; since the index version is always 0 for a new SST, it is safe to pass 0 here.
    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
-        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
+        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();

        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
    }

@@ -553,9 +565,12 @@ async fn clean_dir(dir: &str) -> Result<()> {

/// Path provider for SST file and index file.
pub trait FilePathProvider: Send + Sync {
-    /// Creates index file path of given file id.
+    /// Creates index file path of given file id. The version defaults to 0 and is not shown in the path.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String;

+    /// Creates index file path of given index id (with version support).
+    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;
+
    /// Creates SST file path of given file id.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
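The trait split above keeps the legacy, version-less path builder while adding a version-aware one. A sketch of that shape under an assumed naming scheme (`<file_id>.puffin` for version 0, `<file_id>.<version>.puffin` otherwise); the actual on-disk layout produced by `location::index_file_path` may differ:

```rust
#[derive(Clone, Copy)]
struct FileId(u64);

#[derive(Clone, Copy)]
struct IndexId {
    file_id: FileId,
    version: u64,
}

trait PathProvider {
    /// Legacy path; the version defaults to 0 and is not encoded in the name.
    fn index_path(&self, file_id: FileId) -> String {
        self.index_path_with_version(IndexId { file_id, version: 0 })
    }

    /// Versioned path.
    fn index_path_with_version(&self, id: IndexId) -> String;
}

struct TableDirProvider {
    table_dir: String,
}

impl PathProvider for TableDirProvider {
    fn index_path_with_version(&self, id: IndexId) -> String {
        if id.version == 0 {
            format!("{}/index/{}.puffin", self.table_dir, id.file_id.0)
        } else {
            format!("{}/index/{}.{}.puffin", self.table_dir, id.file_id.0, id.version)
        }
    }
}

fn main() {
    let p = TableDirProvider { table_dir: "test/11_0000000001".into() };
    assert_eq!(p.index_path(FileId(7)), "test/11_0000000001/index/7.puffin");
    assert_eq!(
        p.index_path_with_version(IndexId { file_id: FileId(7), version: 3 }),
        "test/11_0000000001/index/7.3.puffin"
    );
}
```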
@@ -575,7 +590,16 @@ impl WriteCachePathProvider {

impl FilePathProvider for WriteCachePathProvider {
    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
-        let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
+        let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
        self.file_cache.cache_file_path(puffin_key)
    }
+
+    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
+        let puffin_key = IndexKey::new(
+            index_id.region_id(),
+            index_id.file_id(),
+            FileType::Puffin(index_id.version),
+        );
+        self.file_cache.cache_file_path(puffin_key)
+    }

@@ -605,7 +629,11 @@ impl RegionFilePathFactory {

impl FilePathProvider for RegionFilePathFactory {
    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
-        location::index_file_path(&self.table_dir, file_id, self.path_type)
+        location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
    }
+
+    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
+        location::index_file_path(&self.table_dir, index_id, self.path_type)
+    }

    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
@@ -44,7 +44,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
-use crate::sst::file::RegionFileId;
+use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::reader::MetadataCacheMetrics;

/// Metrics type key for sst meta.

@@ -180,7 +180,7 @@ impl CacheStrategy {
    }

    /// Calls [CacheManager::evict_puffin_cache()].
-    pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
+    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
        match self {
            CacheStrategy::EnableAll(cache_manager) => {
                cache_manager.evict_puffin_cache(file_id).await

@@ -400,7 +400,7 @@ impl CacheManager {
    }

    /// Evicts every puffin-related cache entry for the given file.
-    pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
+    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
        if let Some(cache) = &self.bloom_filter_index_cache {
            cache.invalidate_file(file_id.file_id());
        }

@@ -422,7 +422,7 @@ impl CacheManager {
            .remove(IndexKey::new(
                file_id.region_id(),
                file_id.file_id(),
-                FileType::Puffin,
+                FileType::Puffin(file_id.version),
            ))
            .await;
    }
@@ -949,7 +949,7 @@ mod tests {
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
-        let region_file_id = RegionFileId::new(region_id, FileId::random());
+        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();

@@ -957,16 +957,21 @@
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

-        let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping);
+        let bloom_key = (
+            index_id.file_id(),
+            index_id.version,
+            column_id,
+            Tag::Skipping,
+        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
-            region_file_id.file_id(),
+            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
-        result_cache.put(predicate.clone(), region_file_id.file_id(), selection);
-        let file_id_str = region_file_id.to_string();
+        result_cache.put(predicate.clone(), index_id.file_id(), selection);
+        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),

@@ -976,40 +981,32 @@
        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
-                .get_metadata(region_file_id.file_id())
-                .is_some()
-        );
-        assert!(
-            result_cache
-                .get(&predicate, region_file_id.file_id())
+                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
+        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

-        cache.evict_puffin_cache(region_file_id).await;
+        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
-                .get_metadata(region_file_id.file_id())
-                .is_none()
-        );
-        assert!(
-            result_cache
-                .get(&predicate, region_file_id.file_id())
+                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
+        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
-            region_file_id.file_id(),
+            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
-            region_file_id.file_id(),
+            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(

@@ -1021,19 +1018,15 @@
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
-        strategy.evict_puffin_cache(region_file_id).await;
+        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
-                .get_metadata(region_file_id.file_id())
-                .is_none()
-        );
-        assert!(
-            result_cache
-                .get(&predicate, region_file_id.file_id())
+                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
+        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }
}
src/mito2/src/cache/file_cache.rs (vendored, 50 changed lines)
@@ -71,7 +71,7 @@ impl FileCacheInner {
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
-            FileType::Puffin => &self.puffin_index,
+            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

@@ -130,7 +130,7 @@ impl FileCacheInner {
            // Track sizes separately for each file type
            match key.file_type {
                FileType::Parquet => parquet_size += size,
-                FileType::Puffin => puffin_size += size,
+                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        // The metric is a signed int gauge so we can update it at the end.

@@ -178,7 +178,7 @@ impl FileCacheInner {
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
-                FileType::Puffin => "download_puffin",
+                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

@@ -607,7 +607,7 @@ impl fmt::Display for IndexKey {
            "{}.{}.{}",
            self.region_id.as_u64(),
            self.file_id,
-            self.file_type.as_str()
+            self.file_type
        )
    }
}

@@ -618,7 +618,16 @@ pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file.
-    Puffin,
+    Puffin(u64),
}

+impl fmt::Display for FileType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            FileType::Parquet => write!(f, "parquet"),
+            FileType::Puffin(version) => write!(f, "{}.puffin", version),
+        }
+    }
+}
+
impl FileType {

@@ -626,16 +635,16 @@ impl FileType {
    fn parse(s: &str) -> Option<FileType> {
        match s {
            "parquet" => Some(FileType::Parquet),
-            "puffin" => Some(FileType::Puffin),
-            _ => None,
-        }
-    }
-
-    /// Converts the file type to string.
-    fn as_str(&self) -> &'static str {
-        match self {
-            FileType::Parquet => "parquet",
-            FileType::Puffin => "puffin",
+            "puffin" => Some(FileType::Puffin(0)),
+            _ => {
+                // If suffixed with `.puffin`, try to parse the version.
+                if let Some(version_str) = s.strip_suffix(".puffin") {
+                    let version = version_str.parse::<u64>().ok()?;
+                    Some(FileType::Puffin(version))
+                } else {
+                    None
+                }
+            }
        }
    }

@@ -643,7 +652,7 @@ impl FileType {
    fn metric_label(&self) -> &'static str {
        match self {
            FileType::Parquet => FILE_TYPE,
-            FileType::Puffin => INDEX_TYPE,
+            FileType::Puffin(_) => INDEX_TYPE,
        }
    }
}

@@ -921,6 +930,15 @@ mod tests {
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
+        assert_eq!(
+            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
+            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
+        );
+        assert_eq!(
+            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
+            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
+                .unwrap()
+        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
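The cache key suffix now encodes the index version. The sketch below mirrors the `FileType` changes in this diff in a self-contained form (without `IndexKey` and the surrounding cache machinery): `Display` renders `Puffin(v)` as `<v>.puffin`, and parsing accepts the legacy `puffin` suffix as version 0.

```rust
use std::fmt;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FileType {
    Parquet,
    Puffin(u64),
}

impl fmt::Display for FileType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FileType::Parquet => write!(f, "parquet"),
            FileType::Puffin(version) => write!(f, "{}.puffin", version),
        }
    }
}

impl FileType {
    fn parse(s: &str) -> Option<FileType> {
        match s {
            "parquet" => Some(FileType::Parquet),
            // Legacy keys have no version component: treat them as version 0.
            "puffin" => Some(FileType::Puffin(0)),
            _ => {
                // Otherwise expect "<version>.puffin".
                let version_str = s.strip_suffix(".puffin")?;
                Some(FileType::Puffin(version_str.parse().ok()?))
            }
        }
    }
}

fn main() {
    assert_eq!(FileType::parse("puffin"), Some(FileType::Puffin(0)));
    assert_eq!(FileType::parse("42.puffin"), Some(FileType::Puffin(42)));
    assert_eq!(FileType::Puffin(42).to_string(), "42.puffin");
    assert_eq!(FileType::parse("not-an-index"), None);
}
```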
src/mito2/src/cache/index/bloom_filter_index.rs (vendored, 40 changed lines)
@@ -21,7 +21,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
-use store_api::storage::{ColumnId, FileId};
+use store_api::storage::{ColumnId, FileId, IndexVersion};

use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};

@@ -35,8 +35,10 @@ pub enum Tag {
    Fulltext,
}

+pub type BloomFilterIndexKey = (FileId, IndexVersion, ColumnId, Tag);
+
/// Cache for bloom filter index.
-pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
+pub type BloomFilterIndexCache = IndexCache<BloomFilterIndexKey, BloomFilterMeta>;
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;

impl BloomFilterIndexCache {

@@ -59,11 +61,9 @@ impl BloomFilterIndexCache {
}

/// Calculates weight for bloom filter index metadata.
-fn bloom_filter_index_metadata_weight(
-    k: &(FileId, ColumnId, Tag),
-    meta: &Arc<BloomFilterMeta>,
-) -> u32 {
+fn bloom_filter_index_metadata_weight(k: &BloomFilterIndexKey, meta: &Arc<BloomFilterMeta>) -> u32 {
    let base = k.0.as_bytes().len()
+        + std::mem::size_of::<IndexVersion>()
        + std::mem::size_of::<ColumnId>()
        + std::mem::size_of::<Tag>()
        + std::mem::size_of::<BloomFilterMeta>();

@@ -75,16 +75,14 @@ fn bloom_filter_index_metadata_weight(
}

/// Calculates weight for bloom filter index content.
-fn bloom_filter_index_content_weight(
-    (k, _): &((FileId, ColumnId, Tag), PageKey),
-    v: &Bytes,
-) -> u32 {
+fn bloom_filter_index_content_weight((k, _): &(BloomFilterIndexKey, PageKey), v: &Bytes) -> u32 {
    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}

/// Bloom filter index blob reader with cache.
pub struct CachedBloomFilterIndexBlobReader<R> {
    file_id: FileId,
+    index_version: IndexVersion,
    column_id: ColumnId,
    tag: Tag,
    blob_size: u64,

@@ -96,6 +94,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
    /// Creates a new bloom filter index blob reader with cache.
    pub fn new(
        file_id: FileId,
+        index_version: IndexVersion,
        column_id: ColumnId,
        tag: Tag,
        blob_size: u64,

@@ -104,6 +103,7 @@
    ) -> Self {
        Self {
            file_id,
+            index_version,
            column_id,
            tag,
            blob_size,

@@ -126,7 +126,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
        let (result, cache_metrics) = self
            .cache
            .get_or_load(
-                (self.file_id, self.column_id, self.tag),
+                (self.file_id, self.index_version, self.column_id, self.tag),
                self.blob_size,
                offset,
                size,

@@ -161,7 +161,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
        let (page, cache_metrics) = self
            .cache
            .get_or_load(
-                (self.file_id, self.column_id, self.tag),
+                (self.file_id, self.index_version, self.column_id, self.tag),
                self.blob_size,
                range.start,
                (range.end - range.start) as u32,

@@ -191,9 +191,9 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
        &self,
        metrics: Option<&mut BloomFilterReadMetrics>,
    ) -> Result<BloomFilterMeta> {
-        if let Some(cached) = self
-            .cache
-            .get_metadata((self.file_id, self.column_id, self.tag))
+        if let Some(cached) =
+            self.cache
+                .get_metadata((self.file_id, self.index_version, self.column_id, self.tag))
        {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            if let Some(m) = metrics {

@@ -203,7 +203,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
        } else {
            let meta = self.inner.metadata(metrics).await?;
            self.cache.put_metadata(
-                (self.file_id, self.column_id, self.tag),
+                (self.file_id, self.index_version, self.column_id, self.tag),
                Arc::new(meta.clone()),
            );
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
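The metadata path above is a read-through cache keyed by `(file_id, index_version, column_id, tag)`: a hit bumps `CACHE_HIT`, a miss loads from the inner reader, stores the result, and bumps `CACHE_MISS`. A compact stand-in sketch of that pattern, with a plain `HashMap` and counters in place of the real `IndexCache` and Prometheus metrics:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Key shape mirroring the diff: (file id, index version, column id, tag).
type Key = (u64, u64, u32, u8);

#[derive(Default)]
struct MetaCache {
    entries: HashMap<Key, Arc<Vec<u8>>>,
    hits: usize,
    misses: usize,
}

impl MetaCache {
    fn get_or_load<F>(&mut self, key: Key, load: F) -> Arc<Vec<u8>>
    where
        F: FnOnce() -> Vec<u8>,
    {
        if let Some(meta) = self.entries.get(&key) {
            self.hits += 1;
            return meta.clone();
        }
        self.misses += 1;
        let meta = Arc::new(load());
        self.entries.insert(key, meta.clone());
        meta
    }
}

fn main() {
    let mut cache = MetaCache::default();
    let key = (1, 0, 42, 0);
    // First read misses and loads from the underlying reader (stubbed here).
    let _ = cache.get_or_load(key, || vec![1, 2, 3]);
    // Second read for the same file id + index version hits the cache.
    let _ = cache.get_or_load(key, || unreachable!("already cached"));
    assert_eq!((cache.hits, cache.misses), (1, 1));
}
```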
@@ -223,6 +223,7 @@ mod test {
    #[test]
    fn bloom_filter_metadata_weight_counts_vec_contents() {
        let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
+        let version = 0;
        let column_id: ColumnId = 42;
        let tag = Tag::Skipping;

@@ -246,10 +247,13 @@
            ],
        };

-        let weight =
-            bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));
+        let weight = bloom_filter_index_metadata_weight(
+            &(file_id, version, column_id, tag),
+            &Arc::new(meta.clone()),
+        );

        let base = file_id.as_bytes().len()
+            + std::mem::size_of::<IndexVersion>()
            + std::mem::size_of::<ColumnId>()
            + std::mem::size_of::<Tag>()
            + std::mem::size_of::<BloomFilterMeta>();
src/mito2/src/cache/index/inverted_index.rs (vendored, 37 changed lines)
@@ -22,7 +22,7 @@ use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
-use store_api::storage::FileId;
+use store_api::storage::{FileId, IndexVersion};

use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};

@@ -30,7 +30,7 @@ use crate::metrics::{CACHE_HIT, CACHE_MISS};
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

/// Cache for inverted index.
-pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
+pub type InvertedIndexCache = IndexCache<(FileId, IndexVersion), InvertedIndexMetas>;
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;

impl InvertedIndexCache {

@@ -48,23 +48,24 @@ impl InvertedIndexCache {

    /// Removes all cached entries for the given `file_id`.
    pub fn invalidate_file(&self, file_id: FileId) {
-        self.invalidate_if(move |key| *key == file_id);
+        self.invalidate_if(move |key| key.0 == file_id);
    }
}

/// Calculates weight for inverted index metadata.
-fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
-    (k.as_bytes().len() + v.encoded_len()) as u32
+fn inverted_index_metadata_weight(k: &(FileId, IndexVersion), v: &Arc<InvertedIndexMetas>) -> u32 {
+    (k.0.as_bytes().len() + size_of::<IndexVersion>() + v.encoded_len()) as u32
}

/// Calculates weight for inverted index content.
-fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
-    (k.as_bytes().len() + v.len()) as u32
+fn inverted_index_content_weight((k, _): &((FileId, IndexVersion), PageKey), v: &Bytes) -> u32 {
+    (k.0.as_bytes().len() + size_of::<IndexVersion>() + v.len()) as u32
}

/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
    file_id: FileId,
+    index_version: IndexVersion,
    blob_size: u64,
    inner: R,
    cache: InvertedIndexCacheRef,
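With the cache keyed by `(FileId, IndexVersion)`, `invalidate_file` above drops every version of a file by matching only the first element of the key. A stand-in sketch using a `HashMap` in place of the real `IndexCache`:

```rust
use std::collections::HashMap;

type FileId = u64;
type IndexVersion = u64;

struct InvertedIndexCache {
    metas: HashMap<(FileId, IndexVersion), String>,
}

impl InvertedIndexCache {
    fn invalidate_file(&mut self, file_id: FileId) {
        // Equivalent of `invalidate_if(move |key| key.0 == file_id)`:
        // keep only entries that belong to other files.
        self.metas.retain(|key, _| key.0 != file_id);
    }
}

fn main() {
    let mut cache = InvertedIndexCache {
        metas: HashMap::from([
            ((1, 0), "file 1, version 0".to_string()),
            ((1, 3), "file 1, version 3".to_string()),
            ((2, 0), "file 2, version 0".to_string()),
        ]),
    };
    cache.invalidate_file(1);
    // Both versions of file 1 are gone; file 2 is untouched.
    assert_eq!(cache.metas.len(), 1);
    assert!(cache.metas.contains_key(&(2, 0)));
}
```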
@@ -72,9 +73,16 @@ pub struct CachedInvertedIndexBlobReader<R> {

impl<R> CachedInvertedIndexBlobReader<R> {
    /// Creates a new inverted index blob reader with cache.
-    pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
+    pub fn new(
+        file_id: FileId,
+        index_version: IndexVersion,
+        blob_size: u64,
+        inner: R,
+        cache: InvertedIndexCacheRef,
+    ) -> Self {
        Self {
            file_id,
+            index_version,
            blob_size,
            inner,
            cache,

@@ -96,7 +104,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
        let (result, cache_metrics) = self
            .cache
            .get_or_load(
-                self.file_id,
+                (self.file_id, self.index_version),
                self.blob_size,
                offset,
                size,

@@ -129,7 +137,7 @@
        let (page, cache_metrics) = self
            .cache
            .get_or_load(
-                self.file_id,
+                (self.file_id, self.index_version),
                self.blob_size,
                range.start,
                (range.end - range.start) as u32,

@@ -156,7 +164,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
        &self,
        metrics: Option<&'a mut InvertedIndexReadMetrics>,
    ) -> Result<Arc<InvertedIndexMetas>> {
-        if let Some(cached) = self.cache.get_metadata(self.file_id) {
+        if let Some(cached) = self.cache.get_metadata((self.file_id, self.index_version)) {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            if let Some(m) = metrics {
                m.cache_hit += 1;

@@ -164,7 +172,8 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
            Ok(cached)
        } else {
            let meta = self.inner.metadata(metrics).await?;
-            self.cache.put_metadata(self.file_id, meta.clone());
+            self.cache
+                .put_metadata((self.file_id, self.index_version), meta.clone());
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }

@@ -299,6 +308,7 @@ mod test {
        // Init a test range reader in local fs.
        let mut env = TestEnv::new().await;
        let file_size = blob.len() as u64;
+        let index_version = 0;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        store.write(temp_path, blob).await.unwrap();

@@ -314,6 +324,7 @@
        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
+            index_version,
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),

@@ -450,7 +461,7 @@ mod test {
        let (read, _cache_metrics) = cached_reader
            .cache
            .get_or_load(
-                cached_reader.file_id,
+                (cached_reader.file_id, cached_reader.index_version),
                file_size,
                offset,
                size,
src/mito2/src/cache/write_cache.rs (vendored, 11 changed lines)
@@ -215,6 +215,7 @@ impl WriteCache {
                puffin_manager: self
                    .puffin_manager_factory
                    .build(store.clone(), path_provider.clone()),
+                write_cache_enabled: true,
                intermediate_manager: self.intermediate_manager.clone(),
                index_options: write_request.index_options,
                inverted_index_config: write_request.inverted_index_config,

@@ -266,7 +267,7 @@ impl WriteCache {
        upload_tracker.push_uploaded_file(parquet_path);

        if sst.index_metadata.file_size > 0 {
-            let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
+            let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
            let puffin_path = upload_request
                .dest_path_provider
                .build_index_file_path(RegionFileId::new(region_id, sst.file_id));

@@ -439,7 +440,11 @@ impl UploadTracker {
        file_cache.remove(parquet_key).await;

        if sst.index_metadata.file_size > 0 {
-            let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
+            let puffin_key = IndexKey::new(
+                self.region_id,
+                sst.file_id,
+                FileType::Puffin(sst.index_metadata.version),
+            );
            file_cache.remove(puffin_key).await;
        }
    }

@@ -548,7 +553,7 @@ mod tests {
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Check write cache contains the index key
-        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
+        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
@@ -399,7 +399,7 @@ impl DefaultCompactor {
                available_indexes: sst_info.index_metadata.build_available_indexes(),
                indexes: sst_info.index_metadata.build_indexes(),
                index_file_size: sst_info.index_metadata.file_size,
-                index_file_id: None,
+                index_version: 0,
                num_rows: sst_info.num_rows as u64,
                num_row_groups: sst_info.num_row_groups,
                sequence: max_sequence,

@@ -77,7 +77,7 @@ pub fn new_file_handle_with_size_and_sequence(
        available_indexes: Default::default(),
        indexes: Default::default(),
        index_file_size: 0,
-        index_file_id: None,
+        index_version: 0,
        num_rows: 0,
        num_row_groups: 0,
        num_series: 0,

@@ -135,7 +135,7 @@ use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
-use crate::sst::file::{FileMeta, RegionFileId};
+use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
@@ -541,22 +541,23 @@ impl MitoEngine {
            return Vec::new();
        };

-        let Some(index_file_id) = entry.index_file_id.as_ref() else {
-            return Vec::new();
-        };
-        let file_id = match FileId::parse_str(index_file_id) {
+        let index_version = entry.index_version;
+        let file_id = match FileId::parse_str(&entry.file_id) {
            Ok(file_id) => file_id,
            Err(err) => {
                warn!(
                    err;
                    "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                    entry.table_dir,
-                    index_file_id
+                    entry.file_id
                );
                return Vec::new();
            }
        };
-        let region_file_id = RegionFileId::new(entry.region_id, file_id);
+        let region_index_id = RegionIndexId::new(
+            RegionFileId::new(entry.region_id, file_id),
+            index_version,
+        );
        let context = IndexEntryContext {
            table_dir: &entry.table_dir,
            index_file_path: index_file_path.as_str(),

@@ -565,7 +566,7 @@ impl MitoEngine {
            region_number: entry.region_number,
            region_group: entry.region_group,
            region_sequence: entry.region_sequence,
-            file_id: index_file_id,
+            file_id: &entry.file_id,
            index_file_size: entry.index_file_size,
            node_id,
        };

@@ -576,7 +577,7 @@ impl MitoEngine {

        collect_index_entries_from_puffin(
            manager,
-            region_file_id,
+            region_index_id,
            context,
            bloom_filter_cache,
            inverted_index_cache,
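The engine-side change above stops reading a separate `index_file_id` from the manifest entry and instead pairs the SST's own `file_id` with the entry's `index_version`, skipping the entry when the id fails to parse. A simplified stand-in sketch (hex parsing in place of `FileId::parse_str`, and hypothetical types in place of `RegionFileId`/`RegionIndexId`):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct RegionIndexId {
    region_id: u64,
    file_id: u128,
    version: u64,
}

// Build the versioned index id for a manifest entry, or return None if the
// file id does not parse (the real code logs a warning and skips the entry).
fn index_id_for_entry(region_id: u64, file_id: &str, index_version: u64) -> Option<RegionIndexId> {
    let file_id = u128::from_str_radix(&file_id.replace('-', ""), 16).ok()?;
    Some(RegionIndexId { region_id, file_id, version: index_version })
}

fn main() {
    let id = index_id_for_entry(47244640257, "3368731b-a556-42b8-a5df-9c31ce155095", 0);
    assert!(id.is_some());
    assert!(index_id_for_entry(1, "not-a-uuid", 0).is_none());
}
```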
@@ -861,9 +861,10 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
#[tokio::test]
async fn test_list_ssts() {
    test_list_ssts_with_format(false, r#"
-ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
+ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
+r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }

@@ -871,9 +872,10 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
    test_list_ssts_with_format(true, r#"
-ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
+ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
+r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }

@@ -945,13 +947,13 @@ async fn test_list_ssts_with_format(
            .index_file_path
            .map(|p| p.replace(&e.file_id, "<file_id>"));
        e.file_id = "<file_id>".to_string();
-        e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
+        e.index_version = 0;
        format!("\n{:?}", e)
    })
    .sorted()
    .collect::<Vec<_>>()
    .join("");
-    assert_eq!(debug_format, expected_manifest_ssts,);
+    assert_eq!(debug_format, expected_manifest_ssts, "{}", debug_format);

    // list from storage
    let storage_entries = engine

@@ -969,7 +971,7 @@ async fn test_list_ssts_with_format(
    .sorted()
    .collect::<Vec<_>>()
    .join("");
-    assert_eq!(debug_format, expected_storage_ssts,);
+    assert_eq!(debug_format, expected_storage_ssts, "{}", debug_format);
}

#[tokio::test]
@@ -55,10 +55,10 @@ async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: R
        return 0;
    }
    let mut index_files_count: usize = 0;
-    for region_file_id in scanner.file_ids() {
+    for region_index_id in scanner.index_ids() {
        let index_path = location::index_file_path(
            access_layer.table_dir(),
-            region_file_id,
+            region_index_id,
            access_layer.path_type(),
        );
        if access_layer
@@ -32,7 +32,7 @@ use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
-use crate::sst::file::RegionFileId;
+use crate::sst::file::RegionIndexId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
use crate::sst::index::fulltext_index::{
    INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,

@@ -66,14 +66,14 @@ pub(crate) struct IndexEntryContext<'a> {
/// Collect index metadata entries present in the SST puffin file.
pub(crate) async fn collect_index_entries_from_puffin(
    manager: SstPuffinManager,
-    region_file_id: RegionFileId,
+    region_index_id: RegionIndexId,
    context: IndexEntryContext<'_>,
    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
    inverted_index_cache: Option<InvertedIndexCacheRef>,
) -> Vec<PuffinIndexMetaEntry> {
    let mut entries = Vec::new();

-    let reader = match manager.reader(&region_file_id).await {
+    let reader = match manager.reader(&region_index_id).await {
        Ok(reader) => reader,
        Err(err) => {
            warn!(

@@ -104,7 +104,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
            Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
                let bloom_meta = try_read_bloom_meta(
                    &reader,
-                    region_file_id,
+                    region_index_id,
                    blob.blob_type.as_str(),
                    target_key,
                    bloom_filter_cache.as_ref(),

@@ -130,7 +130,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
            Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
                let bloom_meta = try_read_bloom_meta(
                    &reader,
-                    region_file_id,
+                    region_index_id,
                    blob.blob_type.as_str(),
                    target_key,
                    bloom_filter_cache.as_ref(),

@@ -172,7 +172,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
            Some(BlobIndexTypeTargetKey::Inverted) => {
                let mut inverted_entries = collect_inverted_entries(
                    &reader,
-                    region_file_id,
+                    region_index_id,
                    inverted_index_cache.as_ref(),
                    &context,
                )

@@ -188,12 +188,12 @@

async fn collect_inverted_entries(
    reader: &SstPuffinReader,
-    region_file_id: RegionFileId,
+    region_index_id: RegionIndexId,
    cache: Option<&InvertedIndexCacheRef>,
    context: &IndexEntryContext<'_>,
) -> Vec<PuffinIndexMetaEntry> {
    // Read the inverted index blob and surface its per-column metadata entries.
-    let file_id = region_file_id.file_id();
+    let file_id = region_index_id.file_id();

    let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
        Ok(guard) => guard,

@@ -229,6 +229,7 @@ async fn collect_inverted_entries(
    let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
        let reader = CachedInvertedIndexBlobReader::new(
            file_id,
+            region_index_id.version,
            blob_size,
            InvertedIndexBlobReader::new(blob_reader),
            cache.clone(),

@@ -289,7 +290,7 @@ fn build_inverted_entries(

async fn try_read_bloom_meta(
    reader: &SstPuffinReader,
-    region_file_id: RegionFileId,
+    region_index_id: RegionIndexId,
    blob_type: &str,
    target_key: &str,
    cache: Option<&BloomFilterIndexCacheRef>,

@@ -311,7 +312,8 @@ async fn try_read_bloom_meta(
    let result = match (cache, column_id, blob_size) {
        (Some(cache), Some(column_id), Some(blob_size)) => {
            CachedBloomFilterIndexBlobReader::new(
-                region_file_id.file_id(),
+                region_index_id.file_id(),
+                region_index_id.version,
                column_id,
                tag,
                blob_size,
@@ -643,7 +643,7 @@ impl RegionFlushTask {
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            indexes: sst_info.index_metadata.build_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
-            index_file_id: None,
+            index_version: 0,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
@@ -330,10 +330,9 @@ impl LocalGcWorker {
|
||||
|
||||
// TODO(discord9): for now, ignore async index file as it's design is not stable, need to be improved once
|
||||
// index file design is stable
|
||||
let file_pairs: Vec<(FileId, FileId)> = unused_files
|
||||
.iter()
|
||||
.map(|file_id| (*file_id, *file_id))
|
||||
.collect();
|
||||
let file_pairs: Vec<(FileId, u64)> =
|
||||
unused_files.iter().map(|file_id| (*file_id, 0)).collect();
|
||||
// TODO(discord9): gc worker need another major refactor to support versioned index files
|
||||
|
||||
debug!(
|
||||
"Found {} unused index files to delete for region {}",
|
||||
@@ -354,7 +353,7 @@ impl LocalGcWorker {
|
||||
Ok(unused_files)
|
||||
}
|
||||
|
||||
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
|
||||
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
|
||||
delete_files(
|
||||
region_id,
|
||||
file_ids,
|
||||
|
||||
@@ -247,7 +247,7 @@ async fn checkpoint_with_different_compression_types() {
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
@@ -312,7 +312,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
|
||||
@@ -84,6 +84,14 @@ impl ProjectionMapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the projection includes any tag columns.
|
||||
pub(crate) fn has_tags(&self) -> bool {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(m) => m.has_tags(),
|
||||
ProjectionMapper::Flat(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns ids of projected columns that we need to read
|
||||
/// from memtables and SSTs.
|
||||
pub(crate) fn column_ids(&self) -> &[ColumnId] {
|
||||
@@ -257,6 +265,11 @@ impl PrimaryKeyProjectionMapper {
|
||||
&self.metadata
|
||||
}
|
||||
|
||||
/// Returns true if the projection includes any tag columns.
|
||||
pub(crate) fn has_tags(&self) -> bool {
|
||||
self.has_tags
|
||||
}
|
||||
|
||||
/// Returns ids of projected columns that we need to read
|
||||
/// from memtables and SSTs.
|
||||
pub(crate) fn column_ids(&self) -> &[ColumnId] {
|
||||
|
||||
@@ -135,6 +135,14 @@ impl Scanner {
        }
    }

    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};
@@ -958,6 +966,7 @@ impl ScanInput {
|
||||
) -> Result<FileRangeBuilder> {
|
||||
let predicate = self.predicate_for_file(file);
|
||||
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
|
||||
let decode_pk_values = !self.compaction && self.mapper.has_tags();
|
||||
let res = self
|
||||
.access_layer
|
||||
.read_sst(file.clone())
|
||||
@@ -971,6 +980,7 @@ impl ScanInput {
|
||||
.flat_format(self.flat_format)
|
||||
.compaction(self.compaction)
|
||||
.pre_filter_mode(filter_mode)
|
||||
.decode_primary_key_values(decode_pk_values)
|
||||
.build_reader_input(reader_metrics)
|
||||
.await;
|
||||
let (mut file_range_ctx, selection) = match res {
|
||||
@@ -1160,6 +1170,10 @@ impl ScanInput {
|
||||
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
|
||||
self.files.iter().map(|file| file.file_id()).collect()
|
||||
}
|
||||
|
||||
pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
|
||||
self.files.iter().map(|file| file.index_id()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
|
||||
|
||||
@@ -617,17 +617,16 @@ impl MitoRegion {
|
||||
.map(|meta| {
|
||||
let region_id = self.region_id;
|
||||
let origin_region_id = meta.region_id;
|
||||
let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
|
||||
let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
|
||||
{
|
||||
let index_file_path =
|
||||
index_file_path(table_dir, meta.index_file_id(), path_type);
|
||||
let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
|
||||
(
|
||||
Some(meta.index_file_id().file_id().to_string()),
|
||||
meta.index_version,
|
||||
Some(index_file_path),
|
||||
Some(meta.index_file_size),
|
||||
)
|
||||
} else {
|
||||
(None, None, None)
|
||||
(0, None, None)
|
||||
};
|
||||
let visible = visible_ssts.contains(&meta.file_id);
|
||||
ManifestSstEntry {
|
||||
@@ -638,7 +637,7 @@ impl MitoRegion {
|
||||
region_group: region_id.region_group(),
|
||||
region_sequence: region_id.region_sequence(),
|
||||
file_id: meta.file_id.to_string(),
|
||||
index_file_id,
|
||||
index_version,
|
||||
level: meta.level,
|
||||
file_path: sst_file_path(table_dir, meta.file_id(), path_type),
|
||||
file_size: meta.file_size,
|
||||
|
||||
@@ -63,7 +63,7 @@ use crate::region_write_ctx::RegionWriteCtx;
|
||||
use crate::request::OptionOutputTx;
|
||||
use crate::schedule::scheduler::SchedulerRef;
|
||||
use crate::sst::FormatType;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
|
||||
use crate::sst::file_ref::FileReferenceManagerRef;
|
||||
use crate::sst::index::intermediate::IntermediateManager;
|
||||
@@ -867,8 +867,8 @@ impl RegionLoadCacheTask {
|
||||
if file_meta.exists_index() {
|
||||
let puffin_key = IndexKey::new(
|
||||
file_meta.region_id,
|
||||
file_meta.index_file_id().file_id(),
|
||||
FileType::Puffin,
|
||||
file_meta.file_id,
|
||||
FileType::Puffin(file_meta.index_version),
|
||||
);
|
||||
|
||||
if !file_cache.contains_key(&puffin_key) {
|
||||
@@ -925,12 +925,18 @@ impl RegionLoadCacheTask {
|
||||
break;
|
||||
}
|
||||
|
||||
let index_remote_path = location::index_file_path(
|
||||
table_dir,
|
||||
let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
|
||||
version
|
||||
} else {
|
||||
unreachable!("`files_to_download` should only contain Puffin files");
|
||||
};
|
||||
let index_id = RegionIndexId::new(
|
||||
RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
|
||||
path_type,
|
||||
index_version,
|
||||
);
|
||||
|
||||
let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
|
||||
|
||||
match file_cache
|
||||
.download(puffin_key, &index_remote_path, object_store, file_size)
|
||||
.await
|
||||
|
||||
@@ -428,7 +428,7 @@ mod tests {
|
||||
available_indexes: SmallVec::new(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 100,
|
||||
num_row_groups: 1,
|
||||
sequence: NonZeroU64::new(1),
|
||||
|
||||
@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, RegionId};
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;

@@ -117,6 +117,41 @@ impl fmt::Display for RegionFileId {
    }
}

/// Unique identifier for an index file, combining the SST file ID and the index version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
    pub file_id: RegionFileId,
    pub version: IndexVersion,
}

impl RegionIndexId {
    pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
        Self { file_id, version }
    }

    pub fn region_id(&self) -> RegionId {
        self.file_id.region_id
    }

    pub fn file_id(&self) -> FileId {
        self.file_id.file_id
    }
}

impl fmt::Display for RegionIndexId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.version == 0 {
            write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
        } else {
            write!(
                f,
                "{}/{}.{}",
                self.file_id.region_id, self.file_id.file_id, self.version
            )
        }
    }
}

/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);

@@ -159,12 +194,10 @@ pub struct FileMeta {
    pub indexes: Vec<ColumnIndexMetadata>,
    /// Size of the index file.
    pub index_file_size: u64,
    /// File ID of the index file.
    ///
    /// When this field is None, it means the index file id is the same as the file id.
    /// Only meaningful when index_file_size > 0.
    /// Used for rebuilding index files.
    pub index_file_id: Option<FileId>,
    /// Version of the index file.
    /// Used to generate the index file name: "{file_id}.{index_version}.puffin".
    /// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
    pub index_version: IndexVersion,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus

@@ -332,14 +365,9 @@ impl FileMeta {
        RegionFileId::new(self.region_id, self.file_id)
    }

    /// Returns the cross-region index file id.
    /// If the index file id is not set, returns the file id.
    pub fn index_file_id(&self) -> RegionFileId {
        if let Some(index_file_id) = self.index_file_id {
            RegionFileId::new(self.region_id, index_file_id)
        } else {
            self.file_id()
        }
    /// Returns the RegionIndexId for this file.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.index_version)
    }
}

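Editor's note: the `Display` impl above keeps the legacy form for version 0 and only appends the version otherwise. A minimal in-crate sketch of that behavior follows (illustrative only, not part of the patch; the ids are placeholders and the assertions merely restate the two `fmt` branches shown in the hunk):

// Illustrative sketch; assumes the `RegionIndexId` introduced in this patch.
use store_api::storage::{FileId, RegionId};

use crate::sst::file::{RegionFileId, RegionIndexId};

fn region_index_id_display_sketch() {
    let file_id = RegionFileId::new(RegionId::new(1, 2), FileId::random());

    // Version 0 renders exactly like a plain `RegionFileId`: "{region_id}/{file_id}".
    let v0 = RegionIndexId::new(file_id, 0);
    assert_eq!(
        v0.to_string(),
        format!("{}/{}", file_id.region_id(), file_id.file_id())
    );

    // A rebuilt index (version > 0) appends ".{version}".
    let v2 = RegionIndexId::new(file_id, 2);
    assert_eq!(
        v2.to_string(),
        format!("{}/{}.2", file_id.region_id(), file_id.file_id())
    );
}
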
@@ -376,14 +404,9 @@ impl FileHandle {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the cross-region index file id.
    /// If the index file id is not set, returns the file id.
    pub fn index_file_id(&self) -> RegionFileId {
        if let Some(index_file_id) = self.inner.meta.index_file_id {
            RegionFileId::new(self.inner.meta.region_id, index_file_id)
        } else {
            self.file_id()
        }
    /// Returns the RegionIndexId for this file.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
    }

    /// Returns the complete file path of the file.
@@ -468,10 +491,15 @@ impl FileHandleInner {
    }
}

/// Delete
/// Delete files for a region.
/// - `region_id`: Region id.
/// - `file_ids`: List of (file id, index version) tuples to delete.
/// - `delete_index`: Whether to delete the index file from the cache.
/// - `access_layer`: Access layer to delete files.
/// - `cache_manager`: Cache manager to remove files from cache.
pub async fn delete_files(
    region_id: RegionId,
    file_ids: &[(FileId, FileId)],
    file_ids: &[(FileId, u64)],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,

@@ -484,12 +512,12 @@ pub async fn delete_files(
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

    for (file_id, index_file_id) in file_ids {
    for (file_id, index_version) in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
        match access_layer
            .delete_sst(
                &RegionFileId::new(region_id, *file_id),
                &RegionFileId::new(region_id, *index_file_id),
                &region_file_id,
                &RegionIndexId::new(region_file_id, *index_version),
            )
            .await
        {

@@ -509,32 +537,90 @@ pub async fn delete_files(
        deleted_files
    );

    for (file_id, index_file_id) in file_ids {
        if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
            // Removes index file from the cache.
            if delete_index {
                write_cache
                    .remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin))
                    .await;
            }
    for (file_id, index_version) in file_ids {
        purge_index_write_cache_stager(
            region_id,
            delete_index,
            access_layer,
            cache_manager,
            file_id,
            index_version,
        )
        .await;
    }
    Ok(())
}

    // Remove the SST file from the cache.
/// Delete the index file for a given SST file and index version.
pub async fn delete_index(
    region_id: RegionId,
    file_id: FileId,
    index_version: u64,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    if let Err(err) = access_layer
        .delete_index(&RegionIndexId::new(
            RegionFileId::new(region_id, file_id),
            index_version,
        ))
        .await
    {
        error!(err; "Failed to delete index file for {}/{}.{}",
            region_id, file_id, index_version);
    }

    purge_index_write_cache_stager(
        region_id,
        true,
        access_layer,
        cache_manager,
        &file_id,
        &index_version,
    )
    .await;

    Ok(())
}

pub async fn purge_index_write_cache_stager(
|
||||
region_id: RegionId,
|
||||
delete_index: bool,
|
||||
access_layer: &Arc<crate::access_layer::AccessLayer>,
|
||||
cache_manager: &Option<Arc<crate::cache::CacheManager>>,
|
||||
file_id: &FileId,
|
||||
index_version: &u64,
|
||||
) {
|
||||
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
|
||||
// Removes index file from the cache.
|
||||
if delete_index {
|
||||
write_cache
|
||||
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
|
||||
.remove(IndexKey::new(
|
||||
region_id,
|
||||
*file_id,
|
||||
FileType::Puffin(*index_version),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Purges index content in the stager.
|
||||
if let Err(e) = access_layer
|
||||
.puffin_manager_factory()
|
||||
.purge_stager(RegionFileId::new(region_id, *index_file_id))
|
||||
.await
|
||||
{
|
||||
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
|
||||
index_file_id, region_id);
|
||||
}
|
||||
// Remove the SST file from the cache.
|
||||
write_cache
|
||||
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Purges index content in the stager.
|
||||
if let Err(e) = access_layer
|
||||
.puffin_manager_factory()
|
||||
.purge_stager(RegionIndexId::new(
|
||||
RegionFileId::new(region_id, *file_id),
|
||||
*index_version,
|
||||
))
|
||||
.await
|
||||
{
|
||||
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
|
||||
file_id, index_version, region_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -563,7 +649,7 @@ mod tests {
|
||||
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
|
||||
}],
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
@@ -614,7 +700,7 @@ mod tests {
|
||||
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
|
||||
}],
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -132,7 +132,11 @@ impl FileReferenceManager {
|
||||
let region_id = file_meta.region_id;
|
||||
let mut is_new = false;
|
||||
{
|
||||
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
|
||||
let file_ref = FileRef::new(
|
||||
file_meta.region_id,
|
||||
file_meta.file_id,
|
||||
file_meta.index_version,
|
||||
);
|
||||
self.files_per_region
|
||||
.entry(region_id)
|
||||
.and_modify(|refs| {
|
||||
@@ -157,7 +161,7 @@ impl FileReferenceManager {
|
||||
/// If the reference count reaches zero, the file reference will be removed from the manager.
|
||||
pub fn remove_file(&self, file_meta: &FileMeta) {
|
||||
let region_id = file_meta.region_id;
|
||||
let file_ref = FileRef::new(region_id, file_meta.file_id);
|
||||
let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version);
|
||||
|
||||
let mut remove_table_entry = false;
|
||||
let mut remove_file_ref = false;
|
||||
@@ -230,7 +234,7 @@ mod tests {
|
||||
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
|
||||
}],
|
||||
index_file_size: 4096,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 1024,
|
||||
num_row_groups: 1,
|
||||
sequence: NonZeroU64::new(4096),
|
||||
@@ -246,13 +250,13 @@ mod tests {
|
||||
.get(&file_meta.region_id)
|
||||
.unwrap()
|
||||
.files,
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
|
||||
);
|
||||
|
||||
file_ref_mgr.add_file(&file_meta);
|
||||
|
||||
let expected_region_ref_manifest =
|
||||
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
|
||||
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id, 0)]);
|
||||
|
||||
assert_eq!(
|
||||
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
|
||||
@@ -265,7 +269,7 @@ mod tests {
|
||||
.get(&file_meta.region_id)
|
||||
.unwrap()
|
||||
.files,
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 2)])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
@@ -281,7 +285,7 @@ mod tests {
|
||||
.get(&file_meta.region_id)
|
||||
.unwrap()
|
||||
.files,
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
|
||||
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
|
||||
@@ -61,7 +61,7 @@ use crate::request::{
|
||||
};
|
||||
use crate::schedule::scheduler::{Job, SchedulerRef};
|
||||
use crate::sst::file::{
|
||||
ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId,
|
||||
ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
|
||||
};
|
||||
use crate::sst::file_purger::FilePurgerRef;
|
||||
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
|
||||
@@ -81,6 +81,8 @@ pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
|
||||
pub struct IndexOutput {
|
||||
/// Size of the file.
|
||||
pub file_size: u64,
|
||||
/// Index version.
|
||||
pub version: u64,
|
||||
/// Inverted index output.
|
||||
pub inverted_index: InvertedIndexOutput,
|
||||
/// Fulltext index output.
|
||||
@@ -163,7 +165,9 @@ pub type BloomFilterOutput = IndexBaseOutput;
|
||||
pub struct Indexer {
|
||||
file_id: FileId,
|
||||
region_id: RegionId,
|
||||
index_version: u64,
|
||||
puffin_manager: Option<SstPuffinManager>,
|
||||
write_cache_enabled: bool,
|
||||
inverted_indexer: Option<InvertedIndexer>,
|
||||
last_mem_inverted_index: usize,
|
||||
fulltext_indexer: Option<FulltextIndexer>,
|
||||
@@ -236,7 +240,7 @@ impl Indexer {
|
||||
#[async_trait::async_trait]
|
||||
pub trait IndexerBuilder {
|
||||
/// Builds indexer of given file id to [index_file_path].
|
||||
async fn build(&self, file_id: FileId) -> Indexer;
|
||||
async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
|
||||
}
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct IndexerBuilderImpl {
|
||||
@@ -244,6 +248,7 @@ pub(crate) struct IndexerBuilderImpl {
|
||||
pub(crate) metadata: RegionMetadataRef,
|
||||
pub(crate) row_group_size: usize,
|
||||
pub(crate) puffin_manager: SstPuffinManager,
|
||||
pub(crate) write_cache_enabled: bool,
|
||||
pub(crate) intermediate_manager: IntermediateManager,
|
||||
pub(crate) index_options: IndexOptions,
|
||||
pub(crate) inverted_index_config: InvertedIndexConfig,
|
||||
@@ -254,10 +259,12 @@ pub(crate) struct IndexerBuilderImpl {
|
||||
#[async_trait::async_trait]
|
||||
impl IndexerBuilder for IndexerBuilderImpl {
|
||||
/// Sanity check for arguments and create a new [Indexer] if arguments are valid.
|
||||
async fn build(&self, file_id: FileId) -> Indexer {
|
||||
async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
|
||||
let mut indexer = Indexer {
|
||||
file_id,
|
||||
region_id: self.metadata.region_id,
|
||||
index_version,
|
||||
write_cache_enabled: self.write_cache_enabled,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -611,13 +618,20 @@ impl IndexBuildTask {
        &mut self,
        version_control: VersionControlRef,
    ) -> Result<IndexBuildOutcome> {
        let index_file_id = if self.file_meta.index_file_size > 0 {
            // Generate new file ID if index file exists to avoid overwrite.
            FileId::random()
        // Determine the new index version
        let new_index_version = if self.file_meta.index_file_size > 0 {
            // Increment version if index file exists to avoid overwrite.
            self.file_meta.index_version + 1
        } else {
            self.file_meta.file_id
            0 // Default version for new index files
        };
        let mut indexer = self.indexer_builder.build(index_file_id).await;

        // Use the same file_id but with new version for index file
        let index_file_id = self.file_meta.file_id;
        let mut indexer = self
            .indexer_builder
            .build(index_file_id, new_index_version)
            .await;

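Editor's note: the version-selection rule above is small enough to restate as a pure function. The sketch below is only an illustrative restatement of the hunk's logic; the helper name is made up and is not part of the patch.

/// Illustrative restatement of the rule above (not part of the patch):
/// a rebuild of an existing index bumps the version, a first build starts at 0.
fn next_index_version(current_index_file_size: u64, current_version: u64) -> u64 {
    if current_index_file_size > 0 {
        current_version + 1 // an index file already exists; avoid overwriting it
    } else {
        0 // first index build for this SST
    }
}

// e.g. next_index_version(0, 0) == 0, next_index_version(4096, 0) == 1
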
        // Check SST file existence before building index to avoid failure of parquet reader.
        if !self.check_sst_file_exists(&version_control).await {
@@ -677,10 +691,10 @@ impl IndexBuildTask {
|
||||
}
|
||||
|
||||
// Upload index file if write cache is enabled.
|
||||
self.maybe_upload_index_file(index_output.clone(), index_file_id)
|
||||
self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
|
||||
.await?;
|
||||
|
||||
let worker_request = match self.update_manifest(index_output, index_file_id).await {
|
||||
let worker_request = match self.update_manifest(index_output, new_index_version).await {
|
||||
Ok(edit) => {
|
||||
let index_build_finished = IndexBuildFinished {
|
||||
region_id: self.file_meta.region_id,
|
||||
@@ -712,6 +726,7 @@ impl IndexBuildTask {
|
||||
&self,
|
||||
output: IndexOutput,
|
||||
index_file_id: FileId,
|
||||
index_version: u64,
|
||||
) -> Result<()> {
|
||||
if let Some(write_cache) = &self.write_cache {
|
||||
let file_id = self.file_meta.file_id;
|
||||
@@ -719,12 +734,14 @@ impl IndexBuildTask {
|
||||
let remote_store = self.access_layer.object_store();
|
||||
let mut upload_tracker = UploadTracker::new(region_id);
|
||||
let mut err = None;
|
||||
let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
|
||||
let puffin_key =
|
||||
IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
|
||||
let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
|
||||
let puffin_path = RegionFilePathFactory::new(
|
||||
self.access_layer.table_dir().to_string(),
|
||||
self.access_layer.path_type(),
|
||||
)
|
||||
.build_index_file_path(RegionFileId::new(region_id, file_id));
|
||||
.build_index_file_path_with_version(index_id);
|
||||
if let Err(e) = write_cache
|
||||
.upload(puffin_key, &puffin_path, remote_store)
|
||||
.await
|
||||
@@ -756,12 +773,13 @@ impl IndexBuildTask {
|
||||
async fn update_manifest(
|
||||
&mut self,
|
||||
output: IndexOutput,
|
||||
index_file_id: FileId,
|
||||
new_index_version: u64,
|
||||
) -> Result<RegionEdit> {
|
||||
self.file_meta.available_indexes = output.build_available_indexes();
|
||||
self.file_meta.indexes = output.build_indexes();
|
||||
self.file_meta.index_file_size = output.file_size;
|
||||
self.file_meta.index_file_id = Some(index_file_id);
|
||||
let old_index_version = self.file_meta.index_version;
|
||||
self.file_meta.index_version = new_index_version;
|
||||
let edit = RegionEdit {
|
||||
files_to_add: vec![self.file_meta.clone()],
|
||||
files_to_remove: vec![],
|
||||
@@ -784,6 +802,11 @@ impl IndexBuildTask {
|
||||
self.file_meta.region_id,
|
||||
self.reason.as_str()
|
||||
);
|
||||
// notify the file purger to remove the old index files if any
|
||||
if new_index_version > 0 {
|
||||
self.file_purger
|
||||
.update_index(self.file_meta.clone(), old_index_version);
|
||||
}
|
||||
Ok(edit)
|
||||
}
|
||||
}
|
||||
@@ -1163,6 +1186,10 @@ mod tests {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
|
||||
unreachable!()
|
||||
}
|
||||
@@ -1236,6 +1263,7 @@ mod tests {
|
||||
metadata,
|
||||
row_group_size: 1024,
|
||||
puffin_manager,
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager,
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
@@ -1260,13 +1288,14 @@ mod tests {
|
||||
metadata,
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager,
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_some());
|
||||
@@ -1290,6 +1319,7 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager.clone(),
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig {
|
||||
@@ -1299,7 +1329,7 @@ mod tests {
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_none());
|
||||
@@ -1311,6 +1341,7 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager.clone(),
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
@@ -1320,7 +1351,7 @@ mod tests {
|
||||
},
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_some());
|
||||
@@ -1332,6 +1363,7 @@ mod tests {
|
||||
metadata,
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager,
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
@@ -1341,7 +1373,7 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_some());
|
||||
@@ -1365,13 +1397,14 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager.clone(),
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_none());
|
||||
@@ -1388,13 +1421,14 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager.clone(),
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_some());
|
||||
@@ -1411,13 +1445,14 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager,
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_some());
|
||||
@@ -1441,13 +1476,14 @@ mod tests {
|
||||
metadata,
|
||||
row_group_size: 0,
|
||||
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager,
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
}
|
||||
.build(FileId::random())
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.inverted_indexer.is_none());
|
||||
@@ -1619,7 +1655,7 @@ mod tests {
|
||||
|
||||
let puffin_path = location::index_file_path(
|
||||
env.access_layer.table_dir(),
|
||||
RegionFileId::new(region_id, file_meta.file_id),
|
||||
RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
|
||||
env.access_layer.path_type(),
|
||||
);
|
||||
|
||||
@@ -1761,6 +1797,7 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size: 1024,
|
||||
puffin_manager: write_cache.build_puffin_manager().clone(),
|
||||
write_cache_enabled: true,
|
||||
intermediate_manager: write_cache.intermediate_manager().clone(),
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
@@ -1812,7 +1849,11 @@ mod tests {
|
||||
}
|
||||
|
||||
// The write cache should contain the uploaded index file.
|
||||
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
|
||||
let index_key = IndexKey::new(
|
||||
region_id,
|
||||
file_meta.file_id,
|
||||
FileType::Puffin(sst_info.index_metadata.version),
|
||||
);
|
||||
assert!(write_cache.file_cache().contains_key(&index_key));
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ use crate::error::{
|
||||
Result,
|
||||
};
|
||||
use crate::metrics::INDEX_APPLY_ELAPSED;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::RegionIndexId;
|
||||
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
|
||||
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
|
||||
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
|
||||
@@ -200,7 +200,7 @@ impl BloomFilterIndexApplier {
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
row_groups: impl Iterator<Item = (usize, bool)>,
|
||||
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
|
||||
@@ -242,6 +242,7 @@ impl BloomFilterIndexApplier {
|
||||
}
|
||||
let reader = CachedBloomFilterIndexBlobReader::new(
|
||||
file_id.file_id(),
|
||||
file_id.version,
|
||||
*column_id,
|
||||
Tag::Skipping,
|
||||
blob_size,
|
||||
@@ -286,7 +287,7 @@ impl BloomFilterIndexApplier {
|
||||
/// Returns `None` if the column does not have an index.
|
||||
async fn blob_reader(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
column_id: ColumnId,
|
||||
file_size_hint: Option<u64>,
|
||||
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
|
||||
@@ -328,7 +329,7 @@ impl BloomFilterIndexApplier {
|
||||
/// Creates a blob reader from the cached index file
|
||||
async fn cached_blob_reader(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
column_id: ColumnId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<Option<BlobReader>> {
|
||||
@@ -336,7 +337,11 @@ impl BloomFilterIndexApplier {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
|
||||
let index_key = IndexKey::new(
|
||||
file_id.region_id(),
|
||||
file_id.file_id(),
|
||||
FileType::Puffin(file_id.version),
|
||||
);
|
||||
if file_cache.get(index_key).await.is_none() {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -369,7 +374,7 @@ impl BloomFilterIndexApplier {
|
||||
/// Creates a blob reader from the remote index file
|
||||
async fn remote_blob_reader(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
column_id: ColumnId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<BlobReader> {
|
||||
@@ -446,6 +451,7 @@ mod tests {
|
||||
use store_api::storage::FileId;
|
||||
|
||||
use super::*;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
|
||||
use crate::sst::index::bloom_filter::creator::tests::{
|
||||
mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
|
||||
@@ -457,7 +463,7 @@ mod tests {
|
||||
object_store: ObjectStore,
|
||||
metadata: &RegionMetadata,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
|
||||
+ use<'_> {
|
||||
move |exprs, row_groups| {
|
||||
@@ -514,6 +520,7 @@ mod tests {
|
||||
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
|
||||
let memory_usage_threshold = Some(1024);
|
||||
let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
|
||||
let file_id = RegionIndexId::new(file_id, 0);
|
||||
let table_dir = "table_dir".to_string();
|
||||
|
||||
let mut indexer = BloomFilterIndexer::new(
|
||||
|
||||
@@ -481,7 +481,7 @@ pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::access_layer::FilePathProvider;
|
||||
use crate::read::BatchColumn;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
|
||||
pub fn mock_object_store() -> ObjectStore {
|
||||
@@ -499,6 +499,10 @@ pub(crate) mod tests {
|
||||
file_id.file_id().to_string()
|
||||
}
|
||||
|
||||
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
|
||||
index_id.file_id.file_id().to_string()
|
||||
}
|
||||
|
||||
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
|
||||
file_id.file_id().to_string()
|
||||
}
|
||||
@@ -621,6 +625,7 @@ pub(crate) mod tests {
|
||||
let puffin_manager = factory.build(object_store, TestPathProvider);
|
||||
|
||||
let file_id = RegionFileId::new(region_metadata.region_id, file_id);
|
||||
let file_id = RegionIndexId::new(file_id, 0);
|
||||
let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
|
||||
let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
|
||||
assert_eq!(row_count, 20);
|
||||
|
||||
@@ -44,7 +44,7 @@ use crate::error::{
|
||||
PuffinReadBlobSnafu, Result,
|
||||
};
|
||||
use crate::metrics::INDEX_APPLY_ELAPSED;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::RegionIndexId;
|
||||
use crate::sst::index::TYPE_FULLTEXT_INDEX;
|
||||
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
|
||||
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
|
||||
@@ -221,7 +221,7 @@ impl FulltextIndexApplier {
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply_fine(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
) -> Result<Option<BTreeSet<RowId>>> {
|
||||
@@ -275,7 +275,7 @@ impl FulltextIndexApplier {
|
||||
async fn apply_fine_one_column(
|
||||
&self,
|
||||
file_size_hint: Option<u64>,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
column_id: ColumnId,
|
||||
request: &FulltextRequest,
|
||||
metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
@@ -356,7 +356,7 @@ impl FulltextIndexApplier {
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply_coarse(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
row_groups: impl Iterator<Item = (usize, bool)>,
|
||||
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
@@ -405,7 +405,7 @@ impl FulltextIndexApplier {
|
||||
|
||||
async fn apply_coarse_one_column(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
column_id: ColumnId,
|
||||
terms: &[FulltextTerm],
|
||||
@@ -440,6 +440,7 @@ impl FulltextIndexApplier {
|
||||
.content_length;
|
||||
let reader = CachedBloomFilterIndexBlobReader::new(
|
||||
file_id.file_id(),
|
||||
file_id.version,
|
||||
column_id,
|
||||
Tag::Fulltext,
|
||||
blob_size,
|
||||
@@ -611,7 +612,7 @@ impl IndexSource {
|
||||
/// Returns `None` if the blob is not found.
|
||||
async fn blob(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
key: &str,
|
||||
file_size_hint: Option<u64>,
|
||||
metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
@@ -649,7 +650,7 @@ impl IndexSource {
|
||||
/// Returns `None` if the directory is not found.
|
||||
async fn dir(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
key: &str,
|
||||
file_size_hint: Option<u64>,
|
||||
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
@@ -699,7 +700,7 @@ impl IndexSource {
|
||||
/// Returns the reader and whether it fell back to the remote store.
|
||||
async fn ensure_reader(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<(SstPuffinReader, bool)> {
|
||||
match self.build_local_cache(file_id, file_size_hint).await {
|
||||
@@ -711,14 +712,18 @@ impl IndexSource {
|
||||
|
||||
async fn build_local_cache(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<Option<SstPuffinReader>> {
|
||||
let Some(file_cache) = &self.file_cache else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
|
||||
let index_key = IndexKey::new(
|
||||
file_id.region_id(),
|
||||
file_id.file_id(),
|
||||
FileType::Puffin(file_id.version),
|
||||
);
|
||||
if file_cache.get(index_key).await.is_none() {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -740,7 +745,7 @@ impl IndexSource {
|
||||
|
||||
async fn build_remote(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<SstPuffinReader> {
|
||||
let puffin_manager = self
|
||||
|
||||
@@ -481,7 +481,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::access_layer::RegionFilePathFactory;
|
||||
use crate::read::{Batch, BatchColumn};
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
|
||||
use crate::sst::index::fulltext_index::applier::builder::{
|
||||
FulltextQuery, FulltextRequest, FulltextTerm,
|
||||
@@ -672,7 +672,8 @@ mod tests {
|
||||
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
|
||||
);
|
||||
let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
|
||||
let mut writer = puffin_manager.writer(®ion_file_id).await.unwrap();
|
||||
let index_id = RegionIndexId::new(region_file_id, 0);
|
||||
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
|
||||
let _ = indexer.finish(&mut writer).await.unwrap();
|
||||
writer.finish().await.unwrap();
|
||||
|
||||
@@ -723,16 +724,15 @@ mod tests {
|
||||
let backend = backend.clone();
|
||||
async move {
|
||||
match backend {
|
||||
FulltextBackend::Tantivy => applier
|
||||
.apply_fine(region_file_id, None, None)
|
||||
.await
|
||||
.unwrap(),
|
||||
FulltextBackend::Tantivy => {
|
||||
applier.apply_fine(index_id, None, None).await.unwrap()
|
||||
}
|
||||
FulltextBackend::Bloom => {
|
||||
let coarse_mask = coarse_mask.unwrap_or_default();
|
||||
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
|
||||
// row group id == row id
|
||||
let resp = applier
|
||||
.apply_coarse(region_file_id, None, row_groups, None)
|
||||
.apply_coarse(index_id, None, row_groups, None)
|
||||
.await
|
||||
.unwrap();
|
||||
resp.map(|r| {
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
use common_telemetry::warn;
|
||||
|
||||
use crate::access_layer::TempFileCleaner;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
use crate::sst::index::Indexer;
|
||||
|
||||
impl Indexer {
|
||||
@@ -22,6 +24,9 @@ impl Indexer {
|
||||
self.do_abort_fulltext_index().await;
|
||||
self.do_abort_bloom_filter().await;
|
||||
self.do_prune_intm_sst_dir().await;
|
||||
if self.write_cache_enabled {
|
||||
self.do_abort_clean_fs_temp_dir().await;
|
||||
}
|
||||
self.puffin_manager = None;
|
||||
}
|
||||
|
||||
@@ -87,4 +92,18 @@ impl Indexer {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_abort_clean_fs_temp_dir(&mut self) {
|
||||
let Some(puffin_manager) = &self.puffin_manager else {
|
||||
return;
|
||||
};
|
||||
let fs_accessor = puffin_manager.file_accessor();
|
||||
|
||||
let fs_handle = RegionIndexId::new(
|
||||
RegionFileId::new(self.region_id, self.file_id),
|
||||
self.index_version,
|
||||
)
|
||||
.to_string();
|
||||
TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ use common_telemetry::{debug, warn};
|
||||
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
use crate::sst::index::puffin_manager::SstPuffinWriter;
|
||||
use crate::sst::index::statistics::{ByteCount, RowCount};
|
||||
use crate::sst::index::{
|
||||
@@ -56,14 +56,18 @@ impl Indexer {
|
||||
|
||||
self.do_prune_intm_sst_dir().await;
|
||||
output.file_size = self.do_finish_puffin_writer(writer).await;
|
||||
output.version = self.index_version;
|
||||
output
|
||||
}
|
||||
|
||||
async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
|
||||
let puffin_manager = self.puffin_manager.take()?;
|
||||
let puffin_manager = self.puffin_manager.clone()?;
|
||||
|
||||
let err = match puffin_manager
|
||||
.writer(&RegionFileId::new(self.region_id, self.file_id))
|
||||
.writer(&RegionIndexId::new(
|
||||
RegionFileId::new(self.region_id, self.file_id),
|
||||
self.index_version,
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(writer) => return Some(writer),
|
||||
|
||||
@@ -40,7 +40,7 @@ use crate::error::{
|
||||
ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
|
||||
};
|
||||
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::RegionIndexId;
|
||||
use crate::sst::index::TYPE_INVERTED_INDEX;
|
||||
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
|
||||
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
|
||||
@@ -194,7 +194,7 @@ impl InvertedIndexApplier {
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
|
||||
) -> Result<ApplyOutput> {
|
||||
@@ -222,6 +222,7 @@ impl InvertedIndexApplier {
|
||||
let result = if let Some(index_cache) = &self.inverted_index_cache {
|
||||
let mut index_reader = CachedInvertedIndexBlobReader::new(
|
||||
file_id.file_id(),
|
||||
file_id.version,
|
||||
blob_size,
|
||||
InvertedIndexBlobReader::new(blob),
|
||||
index_cache.clone(),
|
||||
@@ -268,14 +269,18 @@ impl InvertedIndexApplier {
|
||||
/// Creates a blob reader from the cached index file.
|
||||
async fn cached_blob_reader(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<Option<BlobReader>> {
|
||||
let Some(file_cache) = &self.file_cache else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
|
||||
let index_key = IndexKey::new(
|
||||
file_id.region_id(),
|
||||
file_id.file_id(),
|
||||
FileType::Puffin(file_id.version),
|
||||
);
|
||||
if file_cache.get(index_key).await.is_none() {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -303,7 +308,7 @@ impl InvertedIndexApplier {
|
||||
/// Creates a blob reader from the remote index file.
|
||||
async fn remote_blob_reader(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<BlobReader> {
|
||||
let puffin_manager = self
|
||||
@@ -349,6 +354,7 @@ mod tests {
|
||||
use store_api::storage::FileId;
|
||||
|
||||
use super::*;
|
||||
use crate::sst::index::RegionFileId;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_index_applier_apply_basic() {
|
||||
@@ -356,13 +362,14 @@ mod tests {
|
||||
PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let file_id = RegionFileId::new(0.into(), FileId::random());
|
||||
let index_id = RegionIndexId::new(file_id, 0);
|
||||
let table_dir = "table_dir".to_string();
|
||||
|
||||
let puffin_manager = puffin_manager_factory.build(
|
||||
object_store.clone(),
|
||||
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
|
||||
);
|
||||
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
|
||||
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
|
||||
writer
|
||||
.put_blob(
|
||||
INDEX_BLOB_TYPE,
|
||||
@@ -392,7 +399,7 @@ mod tests {
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
|
||||
let output = sst_index_applier.apply(index_id, None, None).await.unwrap();
|
||||
assert_eq!(
|
||||
output,
|
||||
ApplyOutput {
|
||||
@@ -410,13 +417,14 @@ mod tests {
|
||||
.await;
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let file_id = RegionFileId::new(0.into(), FileId::random());
|
||||
let index_id = RegionIndexId::new(file_id, 0);
|
||||
let table_dir = "table_dir".to_string();
|
||||
|
||||
let puffin_manager = puffin_manager_factory.build(
|
||||
object_store.clone(),
|
||||
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
|
||||
);
|
||||
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
|
||||
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
|
||||
writer
|
||||
.put_blob(
|
||||
"invalid_blob_type",
|
||||
@@ -440,7 +448,7 @@ mod tests {
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let res = sst_index_applier.apply(file_id, None, None).await;
|
||||
let res = sst_index_applier.apply(index_id, None, None).await;
|
||||
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -466,7 +466,7 @@ mod tests {
|
||||
use crate::cache::index::inverted_index::InvertedIndexCache;
|
||||
use crate::metrics::CACHE_BYTES;
|
||||
use crate::read::BatchColumn;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
|
||||
@@ -591,7 +591,8 @@ mod tests {
|
||||
);
|
||||
|
||||
let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
|
||||
let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
|
||||
let index_id = RegionIndexId::new(sst_file_id, 0);
|
||||
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
|
||||
let (row_count, _) = creator.finish(&mut writer).await.unwrap();
|
||||
assert_eq!(row_count, rows.len() * segment_row_count);
|
||||
writer.finish().await.unwrap();
|
||||
@@ -615,7 +616,7 @@ mod tests {
|
||||
.unwrap();
|
||||
Box::pin(async move {
|
||||
applier
|
||||
.apply(sst_file_id, None, None)
|
||||
.apply(index_id, None, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.matched_segment_ids
|
||||
|
||||
@@ -32,14 +32,14 @@ use crate::metrics::{
|
||||
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
|
||||
INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, StagerMetrics,
|
||||
};
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::RegionIndexId;
|
||||
use crate::sst::index::store::{self, InstrumentedStore};
|
||||
|
||||
type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
|
||||
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
|
||||
|
||||
pub(crate) type SstPuffinManager =
|
||||
FsPuffinManager<Arc<BoundedStager<RegionFileId>>, ObjectStorePuffinFileAccessor>;
|
||||
FsPuffinManager<Arc<BoundedStager<RegionIndexId>>, ObjectStorePuffinFileAccessor>;
|
||||
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
|
||||
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
|
||||
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
|
||||
@@ -52,7 +52,7 @@ const STAGING_DIR: &str = "staging";
|
||||
#[derive(Clone)]
|
||||
pub struct PuffinManagerFactory {
|
||||
/// The stager used by the puffin manager.
|
||||
stager: Arc<BoundedStager<RegionFileId>>,
|
||||
stager: Arc<BoundedStager<RegionIndexId>>,
|
||||
|
||||
/// The size of the write buffer used to create object store.
|
||||
write_buffer_size: Option<usize>,
|
||||
@@ -92,7 +92,7 @@ impl PuffinManagerFactory {
|
||||
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
|
||||
}
|
||||
|
||||
pub(crate) async fn purge_stager(&self, file_id: RegionFileId) -> Result<()> {
|
||||
pub(crate) async fn purge_stager(&self, file_id: RegionIndexId) -> Result<()> {
|
||||
self.stager
|
||||
.purge(&file_id)
|
||||
.await
|
||||
@@ -136,16 +136,22 @@ impl ObjectStorePuffinFileAccessor {
|
||||
path_provider,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn store(&self) -> &InstrumentedStore {
|
||||
&self.object_store
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
|
||||
type Reader = InstrumentedRangeReader;
|
||||
type Writer = InstrumentedAsyncWrite;
|
||||
type FileHandle = RegionFileId;
|
||||
type FileHandle = RegionIndexId;
|
||||
|
||||
async fn reader(&self, handle: &RegionFileId) -> PuffinResult<Self::Reader> {
|
||||
let file_path = self.path_provider.build_index_file_path(*handle);
|
||||
async fn reader(&self, handle: &RegionIndexId) -> PuffinResult<Self::Reader> {
|
||||
let file_path = self
|
||||
.path_provider
|
||||
.build_index_file_path_with_version(*handle);
|
||||
self.object_store
|
||||
.range_reader(
|
||||
&file_path,
|
||||
@@ -157,8 +163,10 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
|
||||
.context(puffin_error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn writer(&self, handle: &RegionFileId) -> PuffinResult<Self::Writer> {
|
||||
let file_path = self.path_provider.build_index_file_path(*handle);
|
||||
async fn writer(&self, handle: &RegionIndexId) -> PuffinResult<Self::Writer> {
|
||||
let file_path = self
|
||||
.path_provider
|
||||
.build_index_file_path_with_version(*handle);
|
||||
self.object_store
|
||||
.writer(
|
||||
&file_path,
|
||||
@@ -184,7 +192,7 @@ mod tests {
|
||||
use store_api::storage::FileId;
|
||||
|
||||
use super::*;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
|
||||
struct TestFilePathProvider;
|
||||
|
||||
@@ -193,6 +201,10 @@ mod tests {
|
||||
file_id.file_id().to_string()
|
||||
}
|
||||
|
||||
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
|
||||
index_id.file_id.file_id().to_string()
|
||||
}
|
||||
|
||||
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
|
||||
file_id.file_id().to_string()
|
||||
}
|
||||
@@ -206,7 +218,7 @@ mod tests {
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let manager = factory.build(object_store, TestFilePathProvider);
|
||||
|
||||
let file_id = RegionFileId::new(0.into(), FileId::random());
|
||||
let file_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0);
|
||||
let blob_key = "blob-key";
|
||||
let dir_key = "dir-key";
|
||||
let raw_data = b"hello world!";
|
||||
|
||||
@@ -49,6 +49,10 @@ impl InstrumentedStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn store(&self) -> &ObjectStore {
|
||||
&self.object_store
|
||||
}
|
||||
|
||||
/// Set the size of the write buffer.
|
||||
pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
|
||||
self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);
|
||||
|
||||
@@ -20,7 +20,7 @@ use store_api::region_request::PathType;
|
||||
use store_api::storage::{FileId, RegionId};
|
||||
|
||||
use crate::error::UnexpectedSnafu;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
|
||||
/// Generate region dir from table_dir, region_id and path_type
|
||||
pub fn region_dir_from_table_dir(
|
||||
@@ -46,14 +46,68 @@ pub fn sst_file_path(table_dir: &str, region_file_id: RegionFileId, path_type: P
|
||||
)
|
||||
}
|
||||
|
||||
pub fn index_file_path(
pub fn index_file_path(table_dir: &str, index_id: RegionIndexId, path_type: PathType) -> String {
    let region_dir = region_dir_from_table_dir(table_dir, index_id.file_id.region_id(), path_type);
    let index_dir = util::join_dir(&region_dir, "index");

    let filename = if index_id.version == 0 {
        format!("{}.puffin", index_id.file_id.file_id())
    } else {
        format!("{}.{}.puffin", index_id.file_id.file_id(), index_id.version)
    };

    util::join_path(&index_dir, &filename)
}

/// Legacy function for backward compatibility - creates index file path using RegionFileId with version 0
pub fn index_file_path_legacy(
    table_dir: &str,
    region_file_id: RegionFileId,
    path_type: PathType,
) -> String {
    let region_dir = region_dir_from_table_dir(table_dir, region_file_id.region_id(), path_type);
    let index_dir = util::join_dir(&region_dir, "index");
    util::join_path(&index_dir, &format!("{}.puffin", region_file_id.file_id()))
    let index_id = RegionIndexId::new(region_file_id, 0);
    index_file_path(table_dir, index_id, path_type)
}
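For reference, the naming rule implemented above can be restated as a tiny standalone sketch (hypothetical helper and file-id string, not part of this change): version 0 keeps the legacy `{file_id}.puffin` name, later versions embed the version number.

```rust
// Illustrative only: mirrors the filename rule used by index_file_path.
fn index_file_name(file_id: &str, version: u64) -> String {
    if version == 0 {
        format!("{file_id}.puffin")
    } else {
        format!("{file_id}.{version}.puffin")
    }
}

fn main() {
    let id = "0191-aaaa"; // hypothetical file id string
    assert_eq!(index_file_name(id, 0), "0191-aaaa.puffin");
    assert_eq!(index_file_name(id, 2), "0191-aaaa.2.puffin");
}
```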

/// Parse file ID and version from index filename
pub fn parse_index_file_info(filepath: &str) -> crate::error::Result<(FileId, u64)> {
    let filename = filepath.rsplit('/').next().context(UnexpectedSnafu {
        reason: format!("invalid file path: {}", filepath),
    })?;
    let parts: Vec<&str> = filename.split('.').collect();

    if parts.len() == 2 && parts[1] == "puffin" {
        // Legacy format: {file_id}.puffin (version 0)
        let file_id = parts[0];
        FileId::parse_str(file_id).map(|id| (id, 0)).map_err(|e| {
            UnexpectedSnafu {
                reason: format!("invalid file id: {}, err: {}", file_id, e),
            }
            .build()
        })
    } else if parts.len() == 3 && parts[2] == "puffin" {
        // New format: {file_id}.{version}.puffin
        let file_id = parts[0];
        let version = parts[1].parse::<u64>().map_err(|_| {
            UnexpectedSnafu {
                reason: format!("invalid version in file name: {}", filename),
            }
            .build()
        })?;
        FileId::parse_str(file_id)
            .map(|id| (id, version))
            .map_err(|e| {
                UnexpectedSnafu {
                    reason: format!("invalid file id: {}, err: {}", file_id, e),
                }
                .build()
            })
    } else {
        UnexpectedSnafu {
            reason: format!("invalid index file name: {}", filename),
        }
        .fail()
    }
}

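A condensed, hypothetical restatement of the two filename shapes accepted above (not the crate's own test, just an illustration of the split-on-dot logic):

```rust
// Illustrative sketch: classify an index filename into (file id, version).
fn classify(filename: &str) -> Option<(String, u64)> {
    let parts: Vec<&str> = filename.split('.').collect();
    if parts.len() == 2 && parts[1] == "puffin" {
        Some((parts[0].to_string(), 0)) // legacy: {file_id}.puffin
    } else if parts.len() == 3 && parts[2] == "puffin" {
        Some((parts[0].to_string(), parts[1].parse().ok()?)) // new: {file_id}.{version}.puffin
    } else {
        None
    }
}

fn main() {
    assert_eq!(classify("abc.puffin"), Some(("abc".to_string(), 0)));
    assert_eq!(classify("abc.3.puffin"), Some(("abc".to_string(), 3)));
    assert_eq!(classify("abc.parquet"), None);
}
```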
/// Get RegionFileId from sst or index filename
@@ -111,17 +165,59 @@ mod tests {
    fn test_index_file_path() {
        let file_id = FileId::random();
        let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id);
        let index_id = RegionIndexId::new(region_file_id, 0);
        assert_eq!(
            index_file_path("table_dir", region_file_id, PathType::Bare),
            index_file_path("table_dir", index_id, PathType::Bare),
            format!("table_dir/1_0000000002/index/{}.puffin", file_id)
        );
        assert_eq!(
            index_file_path("table_dir", region_file_id, PathType::Data),
            index_file_path("table_dir", index_id, PathType::Data),
            format!("table_dir/1_0000000002/data/index/{}.puffin", file_id)
        );
        assert_eq!(
            index_file_path("table_dir", region_file_id, PathType::Metadata),
            index_file_path("table_dir", index_id, PathType::Metadata),
            format!("table_dir/1_0000000002/metadata/index/{}.puffin", file_id)
        );
    }

    #[test]
    fn test_index_file_path_versioned() {
        let file_id = FileId::random();
        let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id);
        let index_id_v1 = RegionIndexId::new(region_file_id, 1);
        let index_id_v2 = RegionIndexId::new(region_file_id, 2);

        assert_eq!(
            index_file_path("table_dir", index_id_v1, PathType::Bare),
            format!("table_dir/1_0000000002/index/{}.1.puffin", file_id)
        );
        assert_eq!(
            index_file_path("table_dir", index_id_v2, PathType::Bare),
            format!("table_dir/1_0000000002/index/{}.2.puffin", file_id)
        );
    }

    #[test]
    fn test_parse_index_file_info() {
        // Test legacy format
        let file_id = FileId::random();
        let result =
            parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.puffin"))
                .unwrap();
        assert_eq!(result.0.to_string(), file_id.to_string());
        assert_eq!(result.1, 0);

        // Test versioned format
        let result =
            parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.1.puffin"))
                .unwrap();
        assert_eq!(result.0.to_string(), file_id.to_string());
        assert_eq!(result.1, 1);

        let result =
            parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.42.puffin"))
                .unwrap();
        assert_eq!(result.0.to_string(), file_id.to_string());
        assert_eq!(result.1, 42);
    }
}
|
||||
@@ -117,7 +117,7 @@ mod tests {
|
||||
use crate::config::IndexConfig;
|
||||
use crate::read::{BatchBuilder, BatchReader, FlatSource};
|
||||
use crate::region::options::{IndexOptions, InvertedIndexOptions};
|
||||
use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
|
||||
use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
|
||||
use crate::sst::file_purger::NoopFilePurger;
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
|
||||
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
|
||||
@@ -144,7 +144,11 @@ mod tests {
|
||||
|
||||
impl FilePathProvider for FixedPathProvider {
|
||||
fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
|
||||
location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
|
||||
location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
|
||||
}
|
||||
|
||||
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
|
||||
location::index_file_path(FILE_DIR, index_id, PathType::Bare)
|
||||
}
|
||||
|
||||
fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
|
||||
@@ -156,7 +160,7 @@ mod tests {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl IndexerBuilder for NoopIndexBuilder {
|
||||
async fn build(&self, _file_id: FileId) -> Indexer {
|
||||
async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
|
||||
Indexer::default()
|
||||
}
|
||||
}
|
||||
@@ -711,6 +715,7 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size,
|
||||
puffin_manager,
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager,
|
||||
index_options: IndexOptions {
|
||||
inverted_index: InvertedIndexOptions {
|
||||
@@ -769,7 +774,7 @@ mod tests {
|
||||
available_indexes: info.index_metadata.build_available_indexes(),
|
||||
indexes: info.index_metadata.build_indexes(),
|
||||
index_file_size: info.index_metadata.file_size,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_row_groups: info.num_row_groups,
|
||||
num_rows: info.num_rows as u64,
|
||||
sequence: None,
|
||||
@@ -1090,6 +1095,7 @@ mod tests {
|
||||
metadata: metadata.clone(),
|
||||
row_group_size,
|
||||
puffin_manager,
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager,
|
||||
index_options: IndexOptions {
|
||||
inverted_index: InvertedIndexOptions {
|
||||
|
||||
@@ -40,7 +40,10 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::vectors::{Helper, Vector};
|
||||
use mito_codec::row_converter::{SortField, build_primary_key_codec_with_fields};
|
||||
use mito_codec::row_converter::{
|
||||
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
|
||||
build_primary_key_codec_with_fields,
|
||||
};
|
||||
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
|
||||
use parquet::file::statistics::Statistics;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
@@ -48,7 +51,8 @@ use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
|
||||
use crate::error::{
|
||||
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
|
||||
ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
|
||||
NewRecordBatchSnafu, Result,
|
||||
};
|
||||
use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::sst::file::{FileMeta, FileTimeRange};
|
||||
@@ -386,6 +390,13 @@ impl ReadFormat {
|
||||
}
|
||||
}
|
||||
|
||||
/// Enables or disables eager decoding of primary key values into batches.
|
||||
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
|
||||
if let ReadFormat::PrimaryKey(format) = self {
|
||||
format.set_decode_primary_key_values(decode);
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a sequence array to override.
|
||||
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
|
||||
match self {
|
||||
@@ -411,6 +422,8 @@ pub struct PrimaryKeyReadFormat {
|
||||
field_id_to_projected_index: HashMap<ColumnId, usize>,
|
||||
/// Sequence number to override the sequence read from the SST.
|
||||
override_sequence: Option<SequenceNumber>,
|
||||
/// Codec used to decode primary key values if eager decoding is enabled.
|
||||
primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
|
||||
}
|
||||
|
||||
impl PrimaryKeyReadFormat {
|
||||
@@ -439,6 +452,7 @@ impl PrimaryKeyReadFormat {
|
||||
projection_indices: format_projection.projection_indices,
|
||||
field_id_to_projected_index: format_projection.column_id_to_projected_index,
|
||||
override_sequence: None,
|
||||
primary_key_codec: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -447,6 +461,15 @@ impl PrimaryKeyReadFormat {
|
||||
self.override_sequence = sequence;
|
||||
}
|
||||
|
||||
/// Enables or disables eager decoding of primary key values into batches.
|
||||
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
|
||||
self.primary_key_codec = if decode {
|
||||
Some(build_primary_key_codec(&self.metadata))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
/// Gets the arrow schema of the SST file.
|
||||
///
|
||||
/// This schema is computed from the region metadata but should be the same
|
||||
@@ -561,7 +584,12 @@ impl PrimaryKeyReadFormat {
|
||||
});
|
||||
}
|
||||
|
||||
let batch = builder.build()?;
|
||||
let mut batch = builder.build()?;
|
||||
if let Some(codec) = &self.primary_key_codec {
|
||||
let pk_values: CompositeValues =
|
||||
codec.decode(batch.primary_key()).context(DecodeSnafu)?;
|
||||
batch.set_pk_values(pk_values);
|
||||
}
|
||||
batches.push_back(batch);
|
||||
}
|
||||
|
||||
|
||||
@@ -127,6 +127,8 @@ pub struct ParquetReaderBuilder {
|
||||
compaction: bool,
|
||||
/// Mode to pre-filter columns.
|
||||
pre_filter_mode: PreFilterMode,
|
||||
/// Whether to decode primary key values eagerly when reading primary key format SSTs.
|
||||
decode_primary_key_values: bool,
|
||||
}
|
||||
|
||||
impl ParquetReaderBuilder {
|
||||
@@ -152,6 +154,7 @@ impl ParquetReaderBuilder {
|
||||
flat_format: false,
|
||||
compaction: false,
|
||||
pre_filter_mode: PreFilterMode::All,
|
||||
decode_primary_key_values: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,6 +239,13 @@ impl ParquetReaderBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Decodes primary key values eagerly when reading primary key format SSTs.
|
||||
#[must_use]
|
||||
pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
|
||||
self.decode_primary_key_values = decode;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds a [ParquetReader].
|
||||
///
|
||||
/// This needs to perform IO operation.
|
||||
@@ -292,6 +302,9 @@ impl ParquetReaderBuilder {
|
||||
self.compaction,
|
||||
)?
|
||||
};
|
||||
if self.decode_primary_key_values {
|
||||
read_format.set_decode_primary_key_values(true);
|
||||
}
|
||||
if need_override_sequence(&parquet_meta) {
|
||||
read_format
|
||||
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
|
||||
@@ -545,7 +558,7 @@ impl ParquetReaderBuilder {
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = index_applier
|
||||
.apply_fine(
|
||||
self.file_handle.file_id(),
|
||||
self.file_handle.index_id(),
|
||||
Some(file_size_hint),
|
||||
metrics.fulltext_index_apply_metrics.as_mut(),
|
||||
)
|
||||
@@ -617,7 +630,7 @@ impl ParquetReaderBuilder {
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = index_applier
|
||||
.apply(
|
||||
self.file_handle.file_id(),
|
||||
self.file_handle.index_id(),
|
||||
Some(file_size_hint),
|
||||
metrics.inverted_index_apply_metrics.as_mut(),
|
||||
)
|
||||
@@ -696,7 +709,7 @@ impl ParquetReaderBuilder {
|
||||
});
|
||||
let apply_res = index_applier
|
||||
.apply(
|
||||
self.file_handle.file_id(),
|
||||
self.file_handle.index_id(),
|
||||
Some(file_size_hint),
|
||||
rgs,
|
||||
metrics.bloom_filter_apply_metrics.as_mut(),
|
||||
@@ -779,7 +792,7 @@ impl ParquetReaderBuilder {
|
||||
});
|
||||
let apply_res = index_applier
|
||||
.apply_coarse(
|
||||
self.file_handle.file_id(),
|
||||
self.file_handle.index_id(),
|
||||
Some(file_size_hint),
|
||||
rgs,
|
||||
metrics.fulltext_index_apply_metrics.as_mut(),
|
||||
|
||||
@@ -153,7 +153,7 @@ where
|
||||
metrics: &'a mut Metrics,
|
||||
) -> ParquetWriter<'a, F, I, P> {
|
||||
let init_file = FileId::random();
|
||||
let indexer = indexer_builder.build(init_file).await;
|
||||
let indexer = indexer_builder.build(init_file, 0).await;
|
||||
|
||||
ParquetWriter {
|
||||
path_provider,
|
||||
@@ -482,7 +482,7 @@ where
|
||||
.context(WriteParquetSnafu)?;
|
||||
self.writer = Some(arrow_writer);
|
||||
|
||||
let indexer = self.indexer_builder.build(self.current_file).await;
|
||||
let indexer = self.indexer_builder.build(self.current_file, 0).await;
|
||||
self.current_indexer = Some(indexer);
|
||||
|
||||
// safety: self.writer is assigned above
|
||||
|
||||
@@ -126,7 +126,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
num_series: 0,
|
||||
|
||||
@@ -104,7 +104,7 @@ impl VersionControlBuilder {
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
num_series: 0,
|
||||
@@ -195,7 +195,7 @@ pub(crate) fn apply_edit(
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
num_series: 0,
|
||||
|
||||
@@ -481,12 +481,12 @@ async fn edit_region(
|
||||
|
||||
let index_file_index_key = IndexKey::new(
|
||||
region_id,
|
||||
file_meta.index_file_id().file_id(),
|
||||
FileType::Puffin,
|
||||
file_meta.index_id().file_id.file_id(),
|
||||
FileType::Puffin(file_meta.index_version),
|
||||
);
|
||||
let index_remote_path = location::index_file_path(
|
||||
layer.table_dir(),
|
||||
file_meta.file_id(),
|
||||
file_meta.index_id(),
|
||||
layer.path_type(),
|
||||
);
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ use crate::region::MitoRegionRef;
|
||||
use crate::request::{
|
||||
BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
|
||||
};
|
||||
use crate::sst::file::{FileHandle, RegionFileId};
|
||||
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
|
||||
use crate::sst::index::{
|
||||
IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
|
||||
};
|
||||
@@ -68,6 +68,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
row_group_size: WriteOptions::default().row_group_size,
|
||||
intermediate_manager,
|
||||
puffin_manager,
|
||||
write_cache_enabled: self.cache_manager.write_cache().is_some(),
|
||||
});
|
||||
|
||||
IndexBuildTask {
|
||||
@@ -216,7 +217,8 @@ impl<S> RegionWorkerLoop<S> {
|
||||
let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
|
||||
for file_meta in &request.edit.files_to_add {
|
||||
let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
|
||||
cache_strategy.evict_puffin_cache(region_file_id).await;
|
||||
let index_id = RegionIndexId::new(region_file_id, file_meta.index_version);
|
||||
cache_strategy.evict_puffin_cache(index_id).await;
|
||||
}
|
||||
|
||||
region.version_control.apply_edit(
|
||||
|
||||
@@ -13,14 +13,15 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::warn;
|
||||
use datafusion::arrow::array::AsArray;
|
||||
use datafusion::arrow::compute::{self, SortOptions, concat_batches};
|
||||
use datafusion::arrow::array::{Array, AsArray, StringArray};
|
||||
use datafusion::arrow::compute::{SortOptions, concat_batches};
|
||||
use datafusion::arrow::datatypes::{DataType, Float64Type, SchemaRef};
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::common::stats::Precision;
|
||||
@@ -40,8 +41,8 @@ use datafusion::physical_plan::{
|
||||
};
|
||||
use datafusion::prelude::{Column, Expr};
|
||||
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
|
||||
use datatypes::value::{OrderedF64, ValueRef};
|
||||
use datatypes::vectors::{Helper, MutableVector};
|
||||
use datatypes::value::{OrderedF64, Value, ValueRef};
|
||||
use datatypes::vectors::{Helper, MutableVector, VectorRef};
|
||||
use futures::{Stream, StreamExt, ready};
|
||||
|
||||
/// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later
|
||||
@@ -358,6 +359,9 @@ impl ExecutionPlan for HistogramFoldExec {
|
||||
input_buffer: vec![],
|
||||
input,
|
||||
output_schema,
|
||||
input_schema: self.input.schema(),
|
||||
mode: FoldMode::Optimistic,
|
||||
safe_group: None,
|
||||
metric: baseline_metric,
|
||||
batch_size,
|
||||
input_buffered_rows: 0,
|
||||
@@ -430,6 +434,12 @@ impl DisplayAs for HistogramFoldExec {
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FoldMode {
    Optimistic,
    Safe,
}

pub struct HistogramFoldStream {
|
||||
// internal states
|
||||
le_column_index: usize,
|
||||
@@ -441,6 +451,9 @@ pub struct HistogramFoldStream {
|
||||
/// Expected output batch size
|
||||
batch_size: usize,
|
||||
output_schema: SchemaRef,
|
||||
input_schema: SchemaRef,
|
||||
mode: FoldMode,
|
||||
safe_group: Option<SafeGroup>,
|
||||
|
||||
// buffers
|
||||
input_buffer: Vec<RecordBatch>,
|
||||
@@ -453,6 +466,13 @@ pub struct HistogramFoldStream {
|
||||
metric: BaselineMetrics,
|
||||
}
|
||||

#[derive(Debug, Default)]
struct SafeGroup {
    tag_values: Vec<Value>,
    buckets: Vec<f64>,
    counters: Vec<f64>,
}

impl RecordBatchStream for HistogramFoldStream {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.output_schema.clone()
|
||||
@@ -478,7 +498,10 @@ impl Stream for HistogramFoldStream {
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
break Poll::Ready(Some(result));
|
||||
}
|
||||
None => break Poll::Ready(self.take_output_buf()?.map(Ok)),
|
||||
None => {
|
||||
self.flush_remaining()?;
|
||||
break Poll::Ready(self.take_output_buf()?.map(Ok));
|
||||
}
|
||||
}
|
||||
};
|
||||
self.metric.record_poll(poll)
|
||||
@@ -491,22 +514,28 @@ impl HistogramFoldStream {
|
||||
&mut self,
|
||||
input: RecordBatch,
|
||||
) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
|
||||
let Some(bucket_num) = self.calculate_bucket_num(&input)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
match self.mode {
|
||||
FoldMode::Safe => {
|
||||
self.push_input_buf(input);
|
||||
self.process_safe_mode_buffer()?;
|
||||
}
|
||||
FoldMode::Optimistic => {
|
||||
self.push_input_buf(input);
|
||||
let Some(bucket_num) = self.calculate_bucket_num_from_buffer()? else {
|
||||
return Ok(None);
|
||||
};
|
||||
self.bucket_size = Some(bucket_num);
|
||||
|
||||
if self.input_buffered_rows + input.num_rows() < bucket_num {
|
||||
// not enough rows to fold
|
||||
self.push_input_buf(input);
|
||||
return Ok(None);
|
||||
if self.input_buffered_rows < bucket_num {
|
||||
// not enough rows to fold
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
self.fold_buf(bucket_num)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.fold_buf(bucket_num, input)?;
|
||||
if self.output_buffered_rows >= self.batch_size {
|
||||
return Ok(self.take_output_buf()?.map(Ok));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
self.maybe_take_output()
|
||||
}
|
||||
|
||||
/// Generate a group of empty [MutableVector]s from the output schema.
|
||||
@@ -532,55 +561,100 @@ impl HistogramFoldStream {
|
||||
Ok(builders)
|
||||
}
|
||||
|
||||
fn calculate_bucket_num(&mut self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
|
||||
/// Determines bucket count using buffered batches, concatenating them to
|
||||
/// detect the first complete bucket that may span batch boundaries.
|
||||
fn calculate_bucket_num_from_buffer(&mut self) -> DataFusionResult<Option<usize>> {
|
||||
if let Some(size) = self.bucket_size {
|
||||
return Ok(Some(size));
|
||||
}
|
||||
|
||||
let inf_pos = self.find_positive_inf(batch)?;
|
||||
if inf_pos == batch.num_rows() {
|
||||
// no positive inf found, append to buffer and wait for next batch
|
||||
self.push_input_buf(batch.clone());
|
||||
if self.input_buffer.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// else we found the positive inf.
|
||||
// calculate the bucket size
|
||||
let bucket_size = inf_pos + self.input_buffered_rows + 1;
|
||||
Ok(Some(bucket_size))
|
||||
let batch_refs: Vec<&RecordBatch> = self.input_buffer.iter().collect();
|
||||
let batch = concat_batches(&self.input_schema, batch_refs)?;
|
||||
self.find_first_complete_bucket(&batch)
|
||||
}
|
||||
|
||||
fn find_first_complete_bucket(&self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
|
||||
if batch.num_rows() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let vectors = Helper::try_into_vectors(batch.columns())
|
||||
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
|
||||
let le_array = batch.column(self.le_column_index).as_string::<i32>();
|
||||
|
||||
let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
|
||||
self.collect_tag_values(&vectors, 0, &mut tag_values_buf);
|
||||
let mut group_start = 0usize;
|
||||
|
||||
for row in 0..batch.num_rows() {
|
||||
if !self.is_same_group(&vectors, row, &tag_values_buf) {
|
||||
// new group begins
|
||||
self.collect_tag_values(&vectors, row, &mut tag_values_buf);
|
||||
group_start = row;
|
||||
}
|
||||
|
||||
if Self::is_positive_infinity(le_array, row) {
|
||||
return Ok(Some(row - group_start + 1));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Fold record batches from input buffer and put to output buffer
|
||||
fn fold_buf(&mut self, bucket_num: usize, input: RecordBatch) -> DataFusionResult<()> {
|
||||
self.push_input_buf(input);
|
||||
// TODO(ruihang): this concat is avoidable.
|
||||
let batch = concat_batches(&self.input.schema(), self.input_buffer.drain(..).as_ref())?;
|
||||
fn fold_buf(&mut self, bucket_num: usize) -> DataFusionResult<()> {
|
||||
let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
|
||||
let mut remaining_rows = self.input_buffered_rows;
|
||||
let mut cursor = 0;
|
||||
|
||||
// TODO(LFC): Try to get rid of the Arrow array to vector conversion here.
|
||||
let vectors = Helper::try_into_vectors(batch.columns())
|
||||
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
|
||||
let le_array = batch.column(self.le_column_index);
|
||||
let le_array = le_array.as_string::<i32>();
|
||||
let field_array = batch.column(self.field_column_index);
|
||||
let field_array = field_array.as_primitive::<Float64Type>();
|
||||
let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
|
||||
|
||||
while remaining_rows >= bucket_num && self.mode == FoldMode::Optimistic {
|
||||
self.collect_tag_values(&vectors, cursor, &mut tag_values_buf);
|
||||
if !self.validate_optimistic_group(
|
||||
&vectors,
|
||||
le_array,
|
||||
cursor,
|
||||
bucket_num,
|
||||
&tag_values_buf,
|
||||
) {
|
||||
let remaining_input_batch = batch.slice(cursor, remaining_rows);
|
||||
self.switch_to_safe_mode(remaining_input_batch)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
while remaining_rows >= bucket_num {
|
||||
// "sample" normal columns
|
||||
for normal_index in &self.normal_indices {
|
||||
let val = vectors[*normal_index].get(cursor);
|
||||
self.output_buffer[*normal_index].push_value_ref(&val.as_value_ref());
|
||||
for (idx, value) in self.normal_indices.iter().zip(tag_values_buf.iter()) {
|
||||
self.output_buffer[*idx].push_value_ref(value);
|
||||
}
|
||||
// "fold" `le` and field columns
|
||||
let le_array = batch.column(self.le_column_index);
|
||||
let le_array = le_array.as_string::<i32>();
|
||||
let field_array = batch.column(self.field_column_index);
|
||||
let field_array = field_array.as_primitive::<Float64Type>();
|
||||
let mut bucket = vec![];
|
||||
let mut counters = vec![];
|
||||
let mut bucket = Vec::with_capacity(bucket_num);
|
||||
let mut counters = Vec::with_capacity(bucket_num);
|
||||
for bias in 0..bucket_num {
|
||||
let le_str = le_array.value(cursor + bias);
|
||||
let le = le_str.parse::<f64>().unwrap();
|
||||
let position = cursor + bias;
|
||||
let le = if le_array.is_valid(position) {
|
||||
le_array.value(position).parse::<f64>().unwrap_or(f64::NAN)
|
||||
} else {
|
||||
f64::NAN
|
||||
};
|
||||
bucket.push(le);
|
||||
|
||||
let counter = field_array.value(cursor + bias);
|
||||
let counter = if field_array.is_valid(position) {
|
||||
field_array.value(position)
|
||||
} else {
|
||||
f64::NAN
|
||||
};
|
||||
counters.push(counter);
|
||||
}
|
||||
// ignore invalid data
|
||||
@@ -593,7 +667,9 @@ impl HistogramFoldStream {
|
||||
|
||||
let remaining_input_batch = batch.slice(cursor, remaining_rows);
|
||||
self.input_buffered_rows = remaining_input_batch.num_rows();
|
||||
self.input_buffer.push(remaining_input_batch);
|
||||
if self.input_buffered_rows > 0 {
|
||||
self.input_buffer.push(remaining_input_batch);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -603,6 +679,170 @@ impl HistogramFoldStream {
|
||||
self.input_buffer.push(batch);
|
||||
}
|
||||
|
||||
fn maybe_take_output(&mut self) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
|
||||
if self.output_buffered_rows >= self.batch_size {
|
||||
return Ok(self.take_output_buf()?.map(Ok));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn switch_to_safe_mode(&mut self, remaining_batch: RecordBatch) -> DataFusionResult<()> {
|
||||
self.mode = FoldMode::Safe;
|
||||
self.bucket_size = None;
|
||||
self.input_buffer.clear();
|
||||
self.input_buffered_rows = remaining_batch.num_rows();
|
||||
|
||||
if self.input_buffered_rows > 0 {
|
||||
self.input_buffer.push(remaining_batch);
|
||||
self.process_safe_mode_buffer()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn collect_tag_values<'a>(
|
||||
&self,
|
||||
vectors: &'a [VectorRef],
|
||||
row: usize,
|
||||
tag_values: &mut Vec<ValueRef<'a>>,
|
||||
) {
|
||||
tag_values.clear();
|
||||
for idx in self.normal_indices.iter() {
|
||||
tag_values.push(vectors[*idx].get_ref(row));
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_optimistic_group(
|
||||
&self,
|
||||
vectors: &[VectorRef],
|
||||
le_array: &StringArray,
|
||||
cursor: usize,
|
||||
bucket_num: usize,
|
||||
tag_values: &[ValueRef<'_>],
|
||||
) -> bool {
|
||||
let inf_index = cursor + bucket_num - 1;
|
||||
if !Self::is_positive_infinity(le_array, inf_index) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for offset in 1..bucket_num {
|
||||
let row = cursor + offset;
|
||||
for (idx, expected) in self.normal_indices.iter().zip(tag_values.iter()) {
|
||||
if vectors[*idx].get_ref(row) != *expected {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
/// Checks whether a row belongs to the current group (same series).
|
||||
fn is_same_group(
|
||||
&self,
|
||||
vectors: &[VectorRef],
|
||||
row: usize,
|
||||
tag_values: &[ValueRef<'_>],
|
||||
) -> bool {
|
||||
self.normal_indices
|
||||
.iter()
|
||||
.zip(tag_values.iter())
|
||||
.all(|(idx, expected)| vectors[*idx].get_ref(row) == *expected)
|
||||
}
|
||||
|
||||
fn push_output_row(&mut self, tag_values: &[ValueRef<'_>], result: f64) {
|
||||
debug_assert_eq!(self.normal_indices.len(), tag_values.len());
|
||||
for (idx, value) in self.normal_indices.iter().zip(tag_values.iter()) {
|
||||
self.output_buffer[*idx].push_value_ref(value);
|
||||
}
|
||||
self.output_buffer[self.field_column_index].push_value_ref(&ValueRef::from(result));
|
||||
self.output_buffered_rows += 1;
|
||||
}
|
||||
|
||||
fn finalize_safe_group(&mut self) -> DataFusionResult<()> {
|
||||
if let Some(group) = self.safe_group.take() {
|
||||
if group.tag_values.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let has_inf = group
|
||||
.buckets
|
||||
.last()
|
||||
.map(|v| v.is_infinite() && v.is_sign_positive())
|
||||
.unwrap_or(false);
|
||||
let result = if group.buckets.len() < 2 || !has_inf {
|
||||
f64::NAN
|
||||
} else {
|
||||
Self::evaluate_row(self.quantile, &group.buckets, &group.counters)
|
||||
.unwrap_or(f64::NAN)
|
||||
};
|
||||
let mut tag_value_refs = Vec::with_capacity(group.tag_values.len());
|
||||
tag_value_refs.extend(group.tag_values.iter().map(|v| v.as_value_ref()));
|
||||
self.push_output_row(&tag_value_refs, result);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_safe_mode_buffer(&mut self) -> DataFusionResult<()> {
|
||||
if self.input_buffer.is_empty() {
|
||||
self.input_buffered_rows = 0;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
|
||||
self.input_buffered_rows = 0;
|
||||
let vectors = Helper::try_into_vectors(batch.columns())
|
||||
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
|
||||
let le_array = batch.column(self.le_column_index).as_string::<i32>();
|
||||
let field_array = batch
|
||||
.column(self.field_column_index)
|
||||
.as_primitive::<Float64Type>();
|
||||
let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
|
||||
|
||||
for row in 0..batch.num_rows() {
|
||||
self.collect_tag_values(&vectors, row, &mut tag_values_buf);
|
||||
let should_start_new_group = self
|
||||
.safe_group
|
||||
.as_ref()
|
||||
.is_none_or(|group| !Self::tag_values_equal(&group.tag_values, &tag_values_buf));
|
||||
if should_start_new_group {
|
||||
self.finalize_safe_group()?;
|
||||
self.safe_group = Some(SafeGroup {
|
||||
tag_values: tag_values_buf.iter().cloned().map(Value::from).collect(),
|
||||
buckets: Vec::new(),
|
||||
counters: Vec::new(),
|
||||
});
|
||||
}
|
||||
|
||||
let Some(group) = self.safe_group.as_mut() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let bucket = if le_array.is_valid(row) {
|
||||
le_array.value(row).parse::<f64>().unwrap_or(f64::NAN)
|
||||
} else {
|
||||
f64::NAN
|
||||
};
|
||||
let counter = if field_array.is_valid(row) {
|
||||
field_array.value(row)
|
||||
} else {
|
||||
f64::NAN
|
||||
};
|
||||
|
||||
group.buckets.push(bucket);
|
||||
group.counters.push(counter);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn tag_values_equal(group_values: &[Value], current: &[ValueRef<'_>]) -> bool {
|
||||
group_values.len() == current.len()
|
||||
&& group_values
|
||||
.iter()
|
||||
.zip(current.iter())
|
||||
.all(|(group, now)| group.as_value_ref() == *now)
|
||||
}
|
||||
|
||||
/// Compute result from output buffer
|
||||
fn take_output_buf(&mut self) -> DataFusionResult<Option<RecordBatch>> {
|
||||
if self.output_buffered_rows == 0 {
|
||||
@@ -630,41 +870,31 @@ impl HistogramFoldStream {
|
||||
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
|
||||
}
|
||||
|
||||
/// Find the first `+Inf` which indicates the end of the bucket group
|
||||
///
|
||||
/// If the return value equals to batch's num_rows means the it's not found
|
||||
/// in this batch
|
||||
fn find_positive_inf(&self, batch: &RecordBatch) -> DataFusionResult<usize> {
|
||||
// fuse this function. It should not be called when the
|
||||
// bucket size is already know.
|
||||
if let Some(bucket_size) = self.bucket_size {
|
||||
return Ok(bucket_size);
|
||||
}
|
||||
let string_le_array = batch.column(self.le_column_index);
|
||||
let float_le_array = compute::cast(&string_le_array, &DataType::Float64).map_err(|e| {
|
||||
DataFusionError::Execution(format!(
|
||||
"cannot cast {} array to float64 array: {:?}",
|
||||
string_le_array.data_type(),
|
||||
e
|
||||
))
|
||||
})?;
|
||||
let le_as_f64_array = float_le_array
|
||||
.as_primitive_opt::<Float64Type>()
|
||||
.ok_or_else(|| {
|
||||
DataFusionError::Execution(format!(
|
||||
"expect a float64 array, but found {}",
|
||||
float_le_array.data_type()
|
||||
))
|
||||
})?;
|
||||
for (i, v) in le_as_f64_array.iter().enumerate() {
|
||||
if let Some(v) = v
|
||||
&& v == f64::INFINITY
|
||||
{
|
||||
return Ok(i);
|
||||
fn flush_remaining(&mut self) -> DataFusionResult<()> {
|
||||
if self.mode == FoldMode::Optimistic && self.input_buffered_rows > 0 {
|
||||
let buffered_batches: Vec<_> = self.input_buffer.drain(..).collect();
|
||||
if !buffered_batches.is_empty() {
|
||||
let batch = concat_batches(&self.input_schema, buffered_batches.as_slice())?;
|
||||
self.switch_to_safe_mode(batch)?;
|
||||
} else {
|
||||
self.input_buffered_rows = 0;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(batch.num_rows())
|
||||
if self.mode == FoldMode::Safe {
|
||||
self.process_safe_mode_buffer()?;
|
||||
self.finalize_safe_group()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||

    fn is_positive_infinity(le_array: &StringArray, index: usize) -> bool {
        le_array.is_valid(index)
            && matches!(
                le_array.value(index).parse::<f64>(),
                Ok(value) if value.is_infinite() && value.is_sign_positive()
            )
    }
|
||||
/// Evaluate the field column and return the result
|
||||
@@ -693,8 +923,28 @@ impl HistogramFoldStream {
|
||||
}
|
||||
|
||||
        // check input value
        debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
        debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
        if !bucket.windows(2).all(|w| w[0] <= w[1]) {
            return Ok(f64::NAN);
        }
        let counter = {
            let needs_fix =
                counter.iter().any(|v| !v.is_finite()) || !counter.windows(2).all(|w| w[0] <= w[1]);
            if !needs_fix {
                Cow::Borrowed(counter)
            } else {
                let mut fixed = Vec::with_capacity(counter.len());
                let mut prev = 0.0;
                for (idx, &v) in counter.iter().enumerate() {
                    let mut val = if v.is_finite() { v } else { prev };
                    if idx > 0 && val < prev {
                        val = prev;
                    }
                    fixed.push(val);
                    prev = val;
                }
                Cow::Owned(fixed)
            }
        };

        let total = *counter.last().unwrap();
        let expected_pos = total * quantile;
@@ -713,6 +963,9 @@ impl HistogramFoldStream {
            lower_bound = bucket[fit_bucket_pos - 1];
            lower_count = counter[fit_bucket_pos - 1];
        }
        if (upper_count - lower_count).abs() < 1e-10 {
            return Ok(f64::NAN);
        }
        Ok(lower_bound
            + (upper_bound - lower_bound) / (upper_count - lower_count)
                * (expected_pos - lower_count))
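The new guard returns NaN instead of dividing by a (near) zero count delta; otherwise the result comes from linear interpolation inside the bucket that contains the target rank. A standalone sketch of that interpolation with made-up numbers (names mirror the locals above, values are not from this change):

```rust
// Linear interpolation within the target bucket, matching the formula used by evaluate_row.
fn interpolate(
    lower_bound: f64,
    upper_bound: f64,
    lower_count: f64,
    upper_count: f64,
    expected_pos: f64,
) -> f64 {
    lower_bound + (upper_bound - lower_bound) / (upper_count - lower_count) * (expected_pos - lower_count)
}

fn main() {
    // Bucket [1.0, 2.0) covers cumulative counts 2..6; a target rank of 4 lands halfway in.
    assert_eq!(interpolate(1.0, 2.0, 2.0, 6.0, 4.0), 1.5);
}
```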
@@ -724,8 +977,8 @@ impl HistogramFoldStream {
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::arrow::array::Float64Array;
|
||||
use datafusion::arrow::datatypes::{Field, Schema};
|
||||
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
|
||||
use datafusion::arrow::datatypes::{Field, Schema, SchemaRef, TimeUnit};
|
||||
use datafusion::common::ToDFSchema;
|
||||
use datafusion::datasource::memory::MemorySourceConfig;
|
||||
use datafusion::datasource::source::DataSourceExec;
|
||||
@@ -792,6 +1045,43 @@ mod test {
|
||||
))
|
||||
}
|
||||
|
||||
fn build_fold_exec_from_batches(
|
||||
batches: Vec<RecordBatch>,
|
||||
schema: SchemaRef,
|
||||
quantile: f64,
|
||||
ts_column_index: usize,
|
||||
) -> Arc<HistogramFoldExec> {
|
||||
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
|
||||
MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
|
||||
)));
|
||||
let output_schema: SchemaRef = Arc::new(
|
||||
HistogramFold::convert_schema(
|
||||
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
|
||||
"le",
|
||||
)
|
||||
.unwrap()
|
||||
.as_arrow()
|
||||
.clone(),
|
||||
);
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
|
||||
Arc::new(HistogramFoldExec {
|
||||
le_column_index: 1,
|
||||
field_column_index: 2,
|
||||
quantile,
|
||||
ts_column_index,
|
||||
input: memory_exec,
|
||||
output_schema,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
properties,
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fold_overall() {
|
||||
let memory_exec = Arc::new(prepare_test_data());
|
||||
@@ -863,6 +1153,187 @@ mod test {
|
||||
assert_eq!(actual, expected_output_schema)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fallback_to_safe_mode_on_missing_inf() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
Field::new("le", DataType::Utf8, true),
|
||||
Field::new("val", DataType::Float64, true),
|
||||
]));
|
||||
let host_column = Arc::new(StringArray::from(vec!["a", "a", "a", "a", "b", "b"])) as _;
|
||||
let le_column = Arc::new(StringArray::from(vec![
|
||||
"0.1", "+Inf", "0.1", "1.0", "0.1", "+Inf",
|
||||
])) as _;
|
||||
let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 3.0, 1.0, 5.0])) as _;
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
|
||||
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
|
||||
let session_context = SessionContext::default();
|
||||
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
let expected = String::from(
|
||||
"+------+-----+
|
||||
| host | val |
|
||||
+------+-----+
|
||||
| a | 0.1 |
|
||||
| a | NaN |
|
||||
| b | 0.1 |
|
||||
+------+-----+",
|
||||
);
|
||||
assert_eq!(result_literal, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn emit_nan_when_no_inf_present() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
Field::new("le", DataType::Utf8, true),
|
||||
Field::new("val", DataType::Float64, true),
|
||||
]));
|
||||
let host_column = Arc::new(StringArray::from(vec!["c", "c"])) as _;
|
||||
let le_column = Arc::new(StringArray::from(vec!["0.1", "1.0"])) as _;
|
||||
let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0])) as _;
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
|
||||
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.9, 0);
|
||||
let session_context = SessionContext::default();
|
||||
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
let expected = String::from(
|
||||
"+------+-----+
|
||||
| host | val |
|
||||
+------+-----+
|
||||
| c | NaN |
|
||||
+------+-----+",
|
||||
);
|
||||
assert_eq!(result_literal, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn safe_mode_handles_misaligned_groups() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
|
||||
Field::new("le", DataType::Utf8, true),
|
||||
Field::new("val", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
|
||||
2900000, 2900000, 2900000, 3000000, 3000000, 3000000, 3000000, 3005000, 3005000,
|
||||
3010000, 3010000, 3010000, 3010000, 3010000,
|
||||
])) as _;
|
||||
let le_column = Arc::new(StringArray::from(vec![
|
||||
"0.1", "1", "5", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
|
||||
])) as _;
|
||||
let val_column = Arc::new(Float64Array::from(vec![
|
||||
0.0, 0.0, 0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
|
||||
])) as _;
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
|
||||
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
|
||||
let session_context = SessionContext::default();
|
||||
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut values = Vec::new();
|
||||
for batch in result {
|
||||
let array = batch.column(1).as_primitive::<Float64Type>();
|
||||
values.extend(array.iter().map(|v| v.unwrap()));
|
||||
}
|
||||
|
||||
assert_eq!(values.len(), 4);
|
||||
assert!(values[0].is_nan());
|
||||
assert!((values[1] - 0.55).abs() < 1e-10);
|
||||
assert!((values[2] - 0.1).abs() < 1e-10);
|
||||
assert!((values[3] - 2.0).abs() < 1e-10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn missing_buckets_at_first_timestamp() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
|
||||
Field::new("le", DataType::Utf8, true),
|
||||
Field::new("val", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
|
||||
2_900_000, 3_000_000, 3_000_000, 3_000_000, 3_000_000, 3_005_000, 3_005_000, 3_010_000,
|
||||
3_010_000, 3_010_000, 3_010_000, 3_010_000,
|
||||
])) as _;
|
||||
let le_column = Arc::new(StringArray::from(vec![
|
||||
"0.1", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
|
||||
])) as _;
|
||||
let val_column = Arc::new(Float64Array::from(vec![
|
||||
0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
|
||||
])) as _;
|
||||
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
|
||||
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
|
||||
let session_context = SessionContext::default();
|
||||
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut values = Vec::new();
|
||||
for batch in result {
|
||||
let array = batch.column(1).as_primitive::<Float64Type>();
|
||||
values.extend(array.iter().map(|v| v.unwrap()));
|
||||
}
|
||||
|
||||
assert_eq!(values.len(), 4);
|
||||
assert!(values[0].is_nan());
|
||||
assert!((values[1] - 0.55).abs() < 1e-10);
|
||||
assert!((values[2] - 0.1).abs() < 1e-10);
|
||||
assert!((values[3] - 2.0).abs() < 1e-10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn missing_inf_in_first_group() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
|
||||
Field::new("le", DataType::Utf8, true),
|
||||
Field::new("val", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
|
||||
1000, 1000, 1000, 2000, 2000, 2000, 2000,
|
||||
])) as _;
|
||||
let le_column = Arc::new(StringArray::from(vec![
|
||||
"0.1", "1", "5", "0.1", "1", "5", "+Inf",
|
||||
])) as _;
|
||||
let val_column = Arc::new(Float64Array::from(vec![
|
||||
0.0, 0.0, 0.0, 10.0, 20.0, 30.0, 30.0,
|
||||
])) as _;
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
|
||||
let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
|
||||
let session_context = SessionContext::default();
|
||||
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut values = Vec::new();
|
||||
for batch in result {
|
||||
let array = batch.column(1).as_primitive::<Float64Type>();
|
||||
values.extend(array.iter().map(|v| v.unwrap()));
|
||||
}
|
||||
|
||||
assert_eq!(values.len(), 2);
|
||||
assert!(values[0].is_nan());
|
||||
assert!((values[1] - 0.55).abs() < 1e-10, "{values:?}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn evaluate_row_normal_case() {
|
||||
let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
|
||||
@@ -935,11 +1406,11 @@
    }

    #[test]
    #[should_panic]
    fn evaluate_out_of_order_input() {
        let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
        let counters = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0];
        HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert_eq!(0.0, result);
    }

    #[test]
@@ -957,4 +1428,20 @@
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert_eq!(3.0, result);
    }

    #[test]
    fn evaluate_non_monotonic_counter() {
        let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
        let counters = [0.1, 0.2, 0.4, 0.17, 0.5];
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert!((result - 1.25).abs() < 1e-10, "{result}");
    }

    #[test]
    fn evaluate_nan_counter() {
        let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
        let counters = [f64::NAN, 1.0, 2.0, 3.0, 3.0];
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert!((result - 1.5).abs() < 1e-10, "{result}");
    }
}

@@ -56,6 +56,10 @@ impl<S, F> FsPuffinManager<S, F> {
        self.puffin_metadata_cache = puffin_metadata_cache;
        self
    }

    pub fn file_accessor(&self) -> &F {
        &self.puffin_file_accessor
    }
}

#[async_trait]

@@ -1 +1 @@
v0.11.8
v0.11.9

@@ -47,8 +47,8 @@ pub struct ManifestSstEntry {
    pub region_sequence: RegionSeq,
    /// Engine-specific file identifier (string form).
    pub file_id: String,
    /// Engine-specific index file identifier (string form).
    pub index_file_id: Option<String>,
    /// Index version, incremented when the index file is rebuilt.
    pub index_version: u64,
    /// SST level.
    pub level: u8,
    /// Full path of the SST file in object store.
@@ -91,7 +91,7 @@ impl ManifestSstEntry {
|
||||
ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
|
||||
ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
|
||||
ColumnSchema::new("file_id", Ty::string_datatype(), false),
|
||||
ColumnSchema::new("index_file_id", Ty::string_datatype(), true),
|
||||
ColumnSchema::new("index_version", Ty::uint64_datatype(), false),
|
||||
ColumnSchema::new("level", Ty::uint8_datatype(), false),
|
||||
ColumnSchema::new("file_path", Ty::string_datatype(), false),
|
||||
ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
|
||||
@@ -119,7 +119,7 @@ impl ManifestSstEntry {
|
||||
let region_groups = entries.iter().map(|e| e.region_group);
|
||||
let region_sequences = entries.iter().map(|e| e.region_sequence);
|
||||
let file_ids = entries.iter().map(|e| e.file_id.as_str());
|
||||
let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref());
|
||||
let index_versions = entries.iter().map(|e| e.index_version);
|
||||
let levels = entries.iter().map(|e| e.level);
|
||||
let file_paths = entries.iter().map(|e| e.file_path.as_str());
|
||||
let file_sizes = entries.iter().map(|e| e.file_size);
|
||||
@@ -151,7 +151,7 @@ impl ManifestSstEntry {
|
||||
Arc::new(UInt8Array::from_iter_values(region_groups)),
|
||||
Arc::new(UInt32Array::from_iter_values(region_sequences)),
|
||||
Arc::new(StringArray::from_iter_values(file_ids)),
|
||||
Arc::new(StringArray::from_iter(index_file_ids)),
|
||||
Arc::new(UInt64Array::from_iter(index_versions)),
|
||||
Arc::new(UInt8Array::from_iter_values(levels)),
|
||||
Arc::new(StringArray::from_iter_values(file_paths)),
|
||||
Arc::new(UInt64Array::from_iter_values(file_sizes)),
|
||||
@@ -437,7 +437,7 @@ mod tests {
|
||||
region_group: region_group1,
|
||||
region_sequence: region_seq1,
|
||||
file_id: "f1".to_string(),
|
||||
index_file_id: None,
|
||||
index_version: 0,
|
||||
level: 1,
|
||||
file_path: "/p1".to_string(),
|
||||
file_size: 100,
|
||||
@@ -461,7 +461,7 @@ mod tests {
|
||||
region_group: region_group2,
|
||||
region_sequence: region_seq2,
|
||||
file_id: "f2".to_string(),
|
||||
index_file_id: Some("idx".to_string()),
|
||||
index_version: 1,
|
||||
level: 3,
|
||||
file_path: "/p2".to_string(),
|
||||
file_size: 200,
|
||||
@@ -548,13 +548,13 @@ mod tests {
|
||||
assert_eq!("f1", file_ids.value(0));
|
||||
assert_eq!("f2", file_ids.value(1));
|
||||
|
||||
let index_file_ids = batch
|
||||
let index_versions = batch
|
||||
.column(7)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.downcast_ref::<UInt64Array>()
|
||||
.unwrap();
|
||||
assert!(index_file_ids.is_null(0));
|
||||
assert_eq!("idx", index_file_ids.value(1));
|
||||
assert_eq!(0, index_versions.value(0));
|
||||
assert_eq!(1, index_versions.value(1));
|
||||
|
||||
let levels = batch
|
||||
.column(8)
|
||||
|
||||
@@ -26,6 +26,6 @@ pub use datatypes::schema::{
};

pub use self::descriptors::*;
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError};
pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
pub use self::types::{SequenceNumber, SequenceRange};

@@ -24,6 +24,9 @@ use uuid::Uuid;
use crate::ManifestVersion;
use crate::storage::RegionId;

/// Index version, incremented when the index file is rebuilt.
pub type IndexVersion = u64;

#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    source: uuid::Error,
@@ -70,15 +73,21 @@ impl FromStr for FileId {
    }
}

/// Indicating holding a `FileHandle` reference for a specific file&index in a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileRef {
    pub region_id: RegionId,
    pub file_id: FileId,
    pub index_version: IndexVersion,
}

impl FileRef {
    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
        Self { region_id, file_id }
    pub fn new(region_id: RegionId, file_id: FileId, index_version: u64) -> Self {
        Self {
            region_id,
            file_id,
            index_version,
        }
    }
}
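Callers now pass the index version explicitly when building a file reference. A minimal sketch, assuming the re-exports shown in this diff (`store_api::storage::{FileId, FileRef, RegionId}`) and placeholder values:

```rust
use store_api::storage::{FileId, FileRef, RegionId};

// Hypothetical illustration: reference the first rebuilt index (version 1)
// of a random file in region (table 1024, region number 0).
fn example_ref() -> FileRef {
    FileRef::new(RegionId::new(1024, 0), FileId::random(), 1)
}
```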

@@ -10,7 +10,7 @@ DESC TABLE information_schema.ssts_manifest;
| region_group | UInt8 | | NO | | FIELD |
| region_sequence | UInt32 | | NO | | FIELD |
| file_id | String | | NO | | FIELD |
| index_file_id | String | | YES | | FIELD |
| index_version | UInt64 | | NO | | FIELD |
| level | UInt8 | | NO | | FIELD |
| file_path | String | | NO | | FIELD |
| file_size | UInt64 | | NO | | FIELD |
@@ -97,13 +97,13 @@ ADMIN FLUSH_TABLE('sst_case');
|
||||
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
|
||||
SELECT * FROM information_schema.ssts_manifest order by file_path;
|
||||
|
||||
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
|
||||
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
|
||||
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
|
||||
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
|
||||
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
|
||||
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
|
||||
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
|
||||
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
|
||||
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
|
||||
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
@@ -165,15 +165,15 @@ ADMIN FLUSH_TABLE('sst_case');
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
SELECT * FROM information_schema.ssts_manifest order by file_path;
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>|<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
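In the ssts_manifest hunks above, the index_file_id column of information_schema.ssts_manifest, previously a nullable UUID string, is replaced by a non-null numeric index_version column; index_file_path and index_file_size are unchanged. A hedged sketch of how the new column might be inspected (this query is illustrative only and is not part of the diff):

-- Illustrative query, assuming the ssts_manifest schema shown in this diff.
SELECT file_id, index_version, index_file_path, index_file_size
FROM information_schema.ssts_manifest
ORDER BY file_path;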
@@ -363,3 +363,52 @@ drop table greptime_servers_postgres_query_elapsed_no_le;

Affected Rows: 0

-- test case with some missing buckets
create table histogram5_bucket (
    ts timestamp time index,
    le string,
    s string,
    val double,
    primary key (s, le),
);

Affected Rows: 0

insert into histogram5_bucket values
    (3000000, "0.1", "a", 0),
    -- (3000000, "1", "a", 0),
    -- (3000000, "5", "a", 0),
    -- (3000000, "+Inf", "a", 0),
    (3005000, "0.1", "a", 50),
    (3005000, "1", "a", 70),
    (3005000, "5", "a", 110),
    (3005000, "+Inf", "a", 120),
    (3010000, "0.1", "a", 10),
    -- (3010000, "1", "a", 20),
    -- (3010000, "5", "a", 20),
    (3010000, "+Inf", "a", 30),
    (3015000, "0.1", "a", 10),
    (3015000, "1", "a", 10),
    (3015000, "3", "a", 20), --
    (3015000, "5", "a", 30),
    (3015000, "+Inf", "a", 50);

Affected Rows: 12

tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);

+---------------------+---+--------------------+
| ts                  | s | val                |
+---------------------+---+--------------------+
| 1970-01-01T00:50:00 | a | NaN                |
| 1970-01-01T00:50:03 | a | NaN                |
| 1970-01-01T00:50:06 | a | 0.5499999999999999 |
| 1970-01-01T00:50:09 | a | 0.5499999999999999 |
| 1970-01-01T00:50:12 | a | 0.775              |
| 1970-01-01T00:50:15 | a | 4.0                |
+---------------------+---+--------------------+

drop table histogram5_bucket;

Affected Rows: 0

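The fractional values in the expected output come from linear interpolation within the bucket that contains the target rank, as in PromQL's histogram_quantile. Checked against the inserted data: the samples at t=3005s (seen by the 00:50:06 and 00:50:09 steps through lookback) give cumulative buckets 0.1 -> 50, 1 -> 70, 5 -> 110, +Inf -> 120; the 0.5 rank is 0.5 * 120 = 60, which falls in the (0.1, 1] bucket and interpolates to 0.1 + (1 - 0.1) * (60 - 50) / (70 - 50) = 0.55, printed as 0.5499999999999999 due to floating point. The t=3015s samples give 3 + (5 - 3) * (25 - 20) / (30 - 20) = 4.0. A minimal sketch of that arithmetic as a standalone query (illustrative only, not part of the test):

-- Reproduces the 00:50:06 value by hand from the t=3005s buckets.
SELECT 0.1 + (1 - 0.1) * ((0.5 * 120) - 50) / (70 - 50) AS approx_p50;
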
@@ -204,3 +204,36 @@ tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, le) (rate(g
tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fbf) (rate(greptime_servers_postgres_query_elapsed_no_le{instance=~"xxx"}[1m])));

drop table greptime_servers_postgres_query_elapsed_no_le;

-- test case with some missing buckets
create table histogram5_bucket (
    ts timestamp time index,
    le string,
    s string,
    val double,
    primary key (s, le),
);

insert into histogram5_bucket values
    (3000000, "0.1", "a", 0),
    -- (3000000, "1", "a", 0),
    -- (3000000, "5", "a", 0),
    -- (3000000, "+Inf", "a", 0),
    (3005000, "0.1", "a", 50),
    (3005000, "1", "a", 70),
    (3005000, "5", "a", 110),
    (3005000, "+Inf", "a", 120),
    (3010000, "0.1", "a", 10),
    -- (3010000, "1", "a", 20),
    -- (3010000, "5", "a", 20),
    (3010000, "+Inf", "a", 30),
    (3015000, "0.1", "a", 10),
    (3015000, "1", "a", 10),
    (3015000, "3", "a", 20), --
    (3015000, "5", "a", 30),
    (3015000, "+Inf", "a", 50);

tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);

drop table histogram5_bucket;

@@ -400,9 +400,9 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | ssts_manifest | file_id | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | file_path | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | file_size | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | index_file_id | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | ssts_manifest | index_file_path | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | ssts_manifest | index_file_size | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | index_version | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | level | 9 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
| greptime | information_schema | ssts_manifest | max_ts | 18 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
| greptime | information_schema | ssts_manifest | min_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |