mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 07:20:41 +00:00
feat: add visible to sst entry for staging mode (#6964)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -72,7 +72,7 @@ impl RegionServer {
|
||||
})?
|
||||
};
|
||||
|
||||
let entries = mito.all_ssts_from_manifest().collect::<Vec<_>>();
|
||||
let entries = mito.all_ssts_from_manifest().await;
|
||||
let schema = ManifestSstEntry::schema().arrow_schema().clone();
|
||||
let batch = ManifestSstEntry::to_record_batch(&entries)
|
||||
.map_err(DataFusionError::from)
|
||||
|
||||
@@ -111,6 +111,8 @@ mod tests {
|
||||
let mito = env.mito();
|
||||
let debug_format = mito
|
||||
.all_ssts_from_manifest()
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|mut e| {
|
||||
e.file_path = e.file_path.replace(&e.file_id, "<file_id>");
|
||||
e.index_file_path = e
|
||||
@@ -125,12 +127,12 @@ mod tests {
|
||||
assert_eq!(
|
||||
debug_format,
|
||||
r#"
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None }"#
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
|
||||
);
|
||||
|
||||
// list from storage
|
||||
|
||||
@@ -414,16 +414,20 @@ impl MitoEngine {
|
||||
}
|
||||
|
||||
/// Lists all SSTs from the manifest of all regions in the engine.
|
||||
pub fn all_ssts_from_manifest(&self) -> impl Iterator<Item = ManifestSstEntry> + use<'_> {
|
||||
pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
|
||||
let node_id = self.inner.workers.file_ref_manager().node_id();
|
||||
self.inner
|
||||
.workers
|
||||
.all_regions()
|
||||
.flat_map(|region| region.manifest_sst_entries())
|
||||
.map(move |mut entry| {
|
||||
entry.node_id = node_id;
|
||||
entry
|
||||
})
|
||||
let regions = self.inner.workers.all_regions();
|
||||
|
||||
let mut results = Vec::new();
|
||||
for region in regions {
|
||||
let mut entries = region.manifest_sst_entries().await;
|
||||
for e in &mut entries {
|
||||
e.node_id = node_id;
|
||||
}
|
||||
results.extend(entries);
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
|
||||
/// Lists all SSTs from the storage layer of all regions in the engine.
|
||||
|
||||
@@ -771,6 +771,8 @@ async fn test_list_ssts() {
|
||||
// list from manifest
|
||||
let debug_format = engine
|
||||
.all_ssts_from_manifest()
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|mut e| {
|
||||
e.file_path = e.file_path.replace(&e.file_id, "<file_id>");
|
||||
e.index_file_path = e
|
||||
@@ -785,9 +787,9 @@ async fn test_list_ssts() {
|
||||
assert_eq!(
|
||||
debug_format,
|
||||
r#"
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None }"#
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#
|
||||
);
|
||||
|
||||
// list from storage
|
||||
|
||||
@@ -369,6 +369,11 @@ async fn test_staging_exit_success_with_manifests() {
|
||||
"No data should be readable before exit staging mode"
|
||||
);
|
||||
|
||||
// Inspect SSTs from manifest
|
||||
let sst_entries = engine.all_ssts_from_manifest().await;
|
||||
assert_eq!(sst_entries.len(), 2);
|
||||
assert!(sst_entries.iter().all(|e| !e.visible));
|
||||
|
||||
// Exit staging mode successfully
|
||||
engine
|
||||
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
|
||||
@@ -424,4 +429,9 @@ async fn test_staging_exit_success_with_manifests() {
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
|
||||
assert_eq!(total_rows, 10, "Expected to read all staged rows");
|
||||
|
||||
// Inspect SSTs from manifest
|
||||
let sst_entries = engine.all_ssts_from_manifest().await;
|
||||
assert_eq!(sst_entries.len(), 2);
|
||||
assert!(sst_entries.iter().all(|e| e.visible));
|
||||
}
|
||||
|
||||
@@ -18,8 +18,8 @@ pub mod opener;
|
||||
pub mod options;
|
||||
pub(crate) mod version;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
@@ -545,46 +545,55 @@ impl MitoRegion {
|
||||
}
|
||||
|
||||
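/// Returns one entry per SST file recorded in the region's manifest; `visible` is set only for files that are also part of the region's current version.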
|
||||
pub fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
|
||||
pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
|
||||
let table_dir = self.table_dir();
|
||||
let path_type = self.access_layer.path_type();
|
||||
self.version()
|
||||
|
||||
let visible_ssts = self
|
||||
.version()
|
||||
.ssts
|
||||
.levels()
|
||||
.iter()
|
||||
.flat_map(|level| {
|
||||
level.files().map(|file| {
|
||||
let meta = file.meta_ref();
|
||||
let region_id = self.region_id;
|
||||
let origin_region_id = meta.region_id;
|
||||
let (index_file_path, index_file_size) = if meta.index_file_size > 0 {
|
||||
let index_file_path = index_file_path(table_dir, meta.file_id(), path_type);
|
||||
(Some(index_file_path), Some(meta.index_file_size))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
ManifestSstEntry {
|
||||
table_dir: table_dir.to_string(),
|
||||
region_id,
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
region_group: region_id.region_group(),
|
||||
region_sequence: region_id.region_sequence(),
|
||||
file_id: meta.file_id.to_string(),
|
||||
level: meta.level,
|
||||
file_path: sst_file_path(table_dir, meta.file_id(), path_type),
|
||||
file_size: meta.file_size,
|
||||
index_file_path,
|
||||
index_file_size,
|
||||
num_rows: meta.num_rows,
|
||||
num_row_groups: meta.num_row_groups,
|
||||
min_ts: meta.time_range.0,
|
||||
max_ts: meta.time_range.1,
|
||||
sequence: meta.sequence.map(|s| s.get()),
|
||||
origin_region_id,
|
||||
node_id: None,
|
||||
}
|
||||
})
|
||||
.flat_map(|level| level.files().map(|file| file.file_id().file_id()))
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
self.manifest_ctx
|
||||
.manifest()
|
||||
.await
|
||||
.files
|
||||
.values()
|
||||
.map(|meta| {
|
||||
let region_id = self.region_id;
|
||||
let origin_region_id = meta.region_id;
|
||||
let (index_file_path, index_file_size) = if meta.index_file_size > 0 {
|
||||
let index_file_path = index_file_path(table_dir, meta.file_id(), path_type);
|
||||
(Some(index_file_path), Some(meta.index_file_size))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
let visible = visible_ssts.contains(&meta.file_id);
|
||||
ManifestSstEntry {
|
||||
table_dir: table_dir.to_string(),
|
||||
region_id,
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
region_group: region_id.region_group(),
|
||||
region_sequence: region_id.region_sequence(),
|
||||
file_id: meta.file_id.to_string(),
|
||||
level: meta.level,
|
||||
file_path: sst_file_path(table_dir, meta.file_id(), path_type),
|
||||
file_size: meta.file_size,
|
||||
index_file_path,
|
||||
index_file_size,
|
||||
num_rows: meta.num_rows,
|
||||
num_row_groups: meta.num_row_groups,
|
||||
min_ts: meta.time_range.0,
|
||||
max_ts: meta.time_range.1,
|
||||
sequence: meta.sequence.map(|s| s.get()),
|
||||
origin_region_id,
|
||||
node_id: None,
|
||||
visible,
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -20,8 +20,8 @@ use common_time::timestamp::TimeUnit;
|
||||
use datafusion_common::DataFusionError;
|
||||
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
|
||||
use datatypes::arrow::array::{
|
||||
ArrayRef, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
|
||||
UInt64Array,
|
||||
ArrayRef, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array,
|
||||
UInt32Array, UInt64Array,
|
||||
};
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::arrow_array::StringArray;
|
||||
@@ -71,6 +71,8 @@ pub struct ManifestSstEntry {
|
||||
pub origin_region_id: RegionId,
|
||||
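/// The node id. It is not stored in the manifest: the engine fills it in from its file reference manager after listing the entries, and it may be `None`.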
|
||||
pub node_id: Option<u64>,
|
||||
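/// Whether this file is part of the region's current version. Files recorded in the manifest but absent from the current version (e.g. written while the region is in staging mode) have `visible == false`.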
|
||||
pub visible: bool,
|
||||
}
|
||||
|
||||
impl ManifestSstEntry {
|
||||
@@ -97,6 +99,7 @@ impl ManifestSstEntry {
|
||||
ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
|
||||
ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
|
||||
ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
|
||||
ColumnSchema::new("visible", Ty::boolean_datatype(), false),
|
||||
]))
|
||||
}
|
||||
|
||||
@@ -130,6 +133,7 @@ impl ManifestSstEntry {
|
||||
let sequences = entries.iter().map(|e| e.sequence);
|
||||
let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
|
||||
let node_ids = entries.iter().map(|e| e.node_id);
|
||||
let visible_flags = entries.iter().map(|e| Some(e.visible));
|
||||
|
||||
let columns: Vec<ArrayRef> = vec![
|
||||
Arc::new(StringArray::from_iter_values(table_dirs)),
|
||||
@@ -151,6 +155,7 @@ impl ManifestSstEntry {
|
||||
Arc::new(UInt64Array::from_iter(sequences)),
|
||||
Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
|
||||
Arc::new(UInt64Array::from_iter(node_ids)),
|
||||
Arc::new(BooleanArray::from_iter(visible_flags)),
|
||||
];
|
||||
|
||||
DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
|
||||
@@ -316,6 +321,7 @@ mod tests {
|
||||
sequence: None,
|
||||
origin_region_id: region_id1,
|
||||
node_id: Some(1),
|
||||
visible: false,
|
||||
},
|
||||
ManifestSstEntry {
|
||||
table_dir: "tdir2".to_string(),
|
||||
@@ -337,6 +343,7 @@ mod tests {
|
||||
sequence: Some(9),
|
||||
origin_region_id: region_id2,
|
||||
node_id: None,
|
||||
visible: true,
|
||||
},
|
||||
];
|
||||
|
||||
@@ -504,6 +511,14 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(1, node_ids.value(0));
|
||||
assert!(node_ids.is_null(1));
|
||||
|
||||
let visible = batch
|
||||
.column(19)
|
||||
.as_any()
|
||||
.downcast_ref::<BooleanArray>()
|
||||
.unwrap();
|
||||
assert!(!visible.value(0));
|
||||
assert!(visible.value(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user