Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
feat(mito): Optimize async index building with priority-based batching (#7034)
* feat: add priority-based batching to IndexBuildScheduler
* fix: clean old puffin-related cache
* test: add test for IndexBuildScheduler
* feat: different index file id for read and async write
* feat: different index file id for delete
* chore: clippy
* fix: apply suggestions
* fix: apply comments
* combine files and index files
* feat: add index_file_id into ManifestSstEntry
* Update src/mito2/src/gc.rs
* resolve conflicts
* fix: sqlness
* chore: fmt

---------

Signed-off-by: SNC123 <sinhco@outlook.com>
@@ -162,6 +162,7 @@ impl ObjbenchCommand {
        file_size,
        available_indexes: Default::default(),
        index_file_size: 0,
+       index_file_id: None,
        num_rows,
        num_row_groups,
        sequence: None,
@@ -119,6 +119,7 @@ mod tests {
                .index_file_path
                .map(|path| path.replace(&e.file_id, "<file_id>"));
            e.file_id = "<file_id>".to_string();
+           e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
            format!("\n{:?}", e)
        })
        .sorted()
@@ -127,12 +128,12 @@ mod tests {
        assert_eq!(
            debug_format,
            r#"
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
        );
        // list from storage
        let storage_entries = mito
@@ -213,7 +213,11 @@ impl AccessLayer {
    }

    /// Deletes a SST file (and its index file if it has one) with given file id.
-   pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
+   pub(crate) async fn delete_sst(
+       &self,
+       region_file_id: &RegionFileId,
+       index_file_id: &RegionFileId,
+   ) -> Result<()> {
        let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
        self.object_store
            .delete(&path)
@@ -222,7 +226,7 @@ impl AccessLayer {
                file_id: region_file_id.file_id(),
            })?;

-       let path = location::index_file_path(&self.table_dir, *region_file_id, self.path_type);
+       let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type);
        self.object_store
            .delete(&path)
            .await
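The two-parameter `delete_sst` keeps working for SSTs whose index was never rebuilt: callers simply pass the same id for both arguments, so the co-named puffin file is removed. A self-contained sketch of that caller-side rule (plain `u64` ids stand in for the real `RegionFileId`; this is illustrative, not the repository's code):

fn index_or_data_id(data_id: u64, index_id: Option<u64>) -> u64 {
    // Mirrors the fallback introduced later in this commit: a missing
    // index file id resolves to the data file id.
    index_id.unwrap_or(data_id)
}

fn main() {
    // No separately rebuilt index: delete_sst(&id, &id).
    assert_eq!(index_or_data_id(7, None), 7);
    // Rebuilt index: delete_sst(&data_id, &index_id) with two distinct ids.
    assert_eq!(index_or_data_id(7, Some(9)), 9);
}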
@@ -432,6 +432,7 @@ impl Compactor for DefaultCompactor {
        file_size: sst_info.file_size,
        available_indexes: sst_info.index_metadata.build_available_indexes(),
        index_file_size: sst_info.index_metadata.file_size,
+       index_file_id: None,
        num_rows: sst_info.num_rows as u64,
        num_row_groups: sst_info.num_row_groups,
        sequence: max_sequence,
@@ -76,6 +76,7 @@ pub fn new_file_handle_with_size_and_sequence(
        file_size,
        available_indexes: Default::default(),
        index_file_size: 0,
+       index_file_id: None,
        num_rows: 0,
        num_row_groups: 0,
        num_series: 0,
@@ -500,19 +500,21 @@ impl MitoEngine {
            return Vec::new();
        };

-       let file_id = match FileId::parse_str(&entry.file_id) {
+       let Some(index_file_id) = entry.index_file_id.as_ref() else {
+           return Vec::new();
+       };
+       let file_id = match FileId::parse_str(index_file_id) {
            Ok(file_id) => file_id,
            Err(err) => {
                warn!(
                    err;
                    "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                    entry.table_dir,
-                   entry.file_id
+                   index_file_id
                );
                return Vec::new();
            }
        };

        let region_file_id = RegionFileId::new(entry.region_id, file_id);
        let context = IndexEntryContext {
            table_dir: &entry.table_dir,
@@ -522,7 +524,7 @@ impl MitoEngine {
            region_number: entry.region_number,
            region_group: entry.region_group,
            region_sequence: entry.region_sequence,
-           file_id: &entry.file_id,
+           file_id: index_file_id,
            index_file_size: entry.index_file_size,
            node_id,
        };
@@ -859,9 +859,9 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
    #[tokio::test]
    async fn test_list_ssts() {
        test_list_ssts_with_format(false, r#"
-ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
+ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -869,9 +869,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
        test_list_ssts_with_format(true, r#"
-ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
+ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -943,6 +943,7 @@ async fn test_list_ssts_with_format(
                .index_file_path
                .map(|p| p.replace(&e.file_id, "<file_id>"));
            e.file_id = "<file_id>".to_string();
+           e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
            format!("\n{:?}", e)
        })
        .sorted()
@@ -638,6 +638,7 @@ impl RegionFlushTask {
        file_size: sst_info.file_size,
        available_indexes: sst_info.index_metadata.build_available_indexes(),
        index_file_size: sst_info.index_metadata.file_size,
+       index_file_id: None,
        num_rows: sst_info.num_rows as u64,
        num_row_groups: sst_info.num_row_groups,
        sequence: NonZeroU64::new(max_sequence),
@@ -365,7 +365,22 @@ impl LocalGcWorker {
            unused_len, region_id
        );

-       self.delete_files(region_id, &unused_files).await?;
+       let file_pairs: Vec<(FileId, FileId)> = unused_files
+           .iter()
+           .filter_map(|file_id| {
+               current_files
+                   .get(file_id)
+                   .map(|meta| (meta.file_id().file_id(), meta.index_file_id().file_id()))
+           })
+           .collect();
+
+       info!(
+           "Found {} unused index files to delete for region {}",
+           file_pairs.len(),
+           region_id
+       );
+
+       self.delete_files(region_id, &file_pairs).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
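The pairing step above yields a (data_file_id, index_file_id) tuple only for unused files still tracked in `current_files`; anything already gone from the manifest map is silently skipped. A toy, self-contained version of that filter (with `u64` standing in for `FileId`):

use std::collections::HashMap;

fn main() {
    // Maps data file id -> index file id, as the manifest metadata would.
    let current_files: HashMap<u64, u64> = HashMap::from([(1, 10), (2, 2)]);
    let unused_files = [1u64, 2, 3]; // 3 is no longer tracked, so it is skipped
    let mut file_pairs: Vec<(u64, u64)> = unused_files
        .iter()
        .filter_map(|id| current_files.get(id).map(|idx| (*id, *idx)))
        .collect();
    file_pairs.sort();
    assert_eq!(file_pairs, vec![(1, 10), (2, 2)]);
}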
@@ -375,7 +390,7 @@ impl LocalGcWorker {
        Ok(unused_files)
    }

-   async fn delete_files(&self, region_id: RegionId, file_ids: &[FileId]) -> Result<()> {
+   async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
@@ -265,6 +265,7 @@ async fn checkpoint_with_different_compression_types() {
        file_size: 1024000,
        available_indexes: Default::default(),
        index_file_size: 0,
+       index_file_id: None,
        num_rows: 0,
        num_row_groups: 0,
        sequence: None,
@@ -331,6 +332,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
        file_size: 1024000,
        available_indexes: Default::default(),
        index_file_size: 0,
+       index_file_id: None,
        num_rows: 0,
        num_row_groups: 0,
        sequence: None,
@@ -590,11 +590,17 @@ impl MitoRegion {
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
-               let (index_file_path, index_file_size) = if meta.index_file_size > 0 {
-                   let index_file_path = index_file_path(table_dir, meta.file_id(), path_type);
-                   (Some(index_file_path), Some(meta.index_file_size))
+               let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
+               {
+                   let index_file_path =
+                       index_file_path(table_dir, meta.index_file_id(), path_type);
+                   (
+                       Some(meta.index_file_id().file_id().to_string()),
+                       Some(index_file_path),
+                       Some(meta.index_file_size),
+                   )
                } else {
-                   (None, None)
+                   (None, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
@@ -605,6 +611,7 @@ impl MitoRegion {
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
+                   index_file_id,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
@@ -427,6 +427,7 @@ mod tests {
        file_size: 1024,
        available_indexes: SmallVec::new(),
        index_file_size: 0,
+       index_file_id: None,
        num_rows: 100,
        num_row_groups: 1,
        sequence: NonZeroU64::new(1),
@@ -39,7 +39,7 @@ use store_api::region_request::{
    RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
    RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
-use store_api::storage::RegionId;
+use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};

use crate::error::{
@@ -780,8 +780,9 @@ pub(crate) enum BackgroundNotify {
    FlushFailed(FlushFailed),
    /// Index build has finished.
    IndexBuildFinished(IndexBuildFinished),
+   /// Index build has been stopped (aborted or succeeded).
+   IndexBuildStopped(IndexBuildStopped),
    /// Index build has failed.
    #[allow(dead_code)]
    IndexBuildFailed(IndexBuildFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
@@ -846,10 +847,17 @@ pub(crate) struct IndexBuildFinished {
    pub(crate) edit: RegionEdit,
}

+/// Notifies an index build job has been stopped.
+#[derive(Debug)]
+pub(crate) struct IndexBuildStopped {
+    #[allow(dead_code)]
+    pub(crate) region_id: RegionId,
+    pub(crate) file_id: FileId,
+}
+
/// Notifies an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    #[allow(dead_code)]
    pub(crate) err: Arc<Error>,
}
@@ -146,6 +146,12 @@ pub struct FileMeta {
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file.
    pub index_file_size: u64,
+   /// File ID of the index file.
+   ///
+   /// When this field is None, it means the index file id is the same as the file id.
+   /// Only meaningful when index_file_size > 0.
+   /// Used for rebuilding index files.
+   pub index_file_id: Option<FileId>,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
@@ -259,6 +265,16 @@ impl FileMeta {
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.region_id, self.file_id)
    }

+   /// Returns the cross-region index file id.
+   /// If the index file id is not set, returns the file id.
+   pub fn index_file_id(&self) -> RegionFileId {
+       if let Some(index_file_id) = self.index_file_id {
+           RegionFileId::new(self.region_id, index_file_id)
+       } else {
+           self.file_id()
+       }
+   }
}

/// Handle to a SST file.
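Both the new `Option<FileId>` field and this accessor encode one rule: a missing id means the index file shares the data file's id. A minimal runnable sketch of that rule with stub types (not the real mito2 ones):

#[derive(Clone, Copy, PartialEq, Debug)]
struct FileId(u64);

struct FileMeta {
    file_id: FileId,
    index_file_id: Option<FileId>,
}

impl FileMeta {
    // Same fallback as the method in the hunk above, minus RegionFileId.
    fn index_file_id(&self) -> FileId {
        self.index_file_id.unwrap_or(self.file_id)
    }
}

fn main() {
    let fresh = FileMeta { file_id: FileId(1), index_file_id: None };
    assert_eq!(fresh.index_file_id(), FileId(1)); // falls back to the data file id

    let rebuilt = FileMeta { file_id: FileId(1), index_file_id: Some(FileId(2)) };
    assert_eq!(rebuilt.index_file_id(), FileId(2)); // rebuilt index has its own id
}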
@@ -294,6 +310,16 @@ impl FileHandle {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

+   /// Returns the cross-region index file id.
+   /// If the index file id is not set, returns the file id.
+   pub fn index_file_id(&self) -> RegionFileId {
+       if let Some(index_file_id) = self.inner.meta.index_file_id {
+           RegionFileId::new(self.inner.meta.region_id, index_file_id)
+       } else {
+           self.file_id()
+       }
+   }

    /// Returns the complete file path of the file.
    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(table_dir, self.file_id(), path_type)
@@ -379,22 +405,28 @@ impl FileHandleInner {
/// Delete
pub async fn delete_files(
    region_id: RegionId,
-   file_ids: &[FileId],
+   file_ids: &[(FileId, FileId)],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    // Remove meta of the file from cache.
    if let Some(cache) = &cache_manager {
-       for file_id in file_ids {
+       for (file_id, _) in file_ids {
            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
        }
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

-   for file_id in file_ids {
+   for (file_id, index_file_id) in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
-       match access_layer.delete_sst(&region_file_id).await {
+       match access_layer
+           .delete_sst(
+               &RegionFileId::new(region_id, *file_id),
+               &RegionFileId::new(region_id, *index_file_id),
+           )
+           .await
+       {
            Ok(_) => {
                deleted_files.push(*file_id);
            }
@@ -411,14 +443,12 @@ pub async fn delete_files(
        deleted_files
    );

-   for file_id in file_ids {
-       let region_file_id = RegionFileId::new(region_id, *file_id);
-
+   for (file_id, index_file_id) in file_ids {
        if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
            // Removes index file from the cache.
            if delete_index {
                write_cache
-                   .remove(IndexKey::new(region_id, *file_id, FileType::Puffin))
+                   .remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin))
                    .await;
            }
@@ -431,11 +461,11 @@ pub async fn delete_files(
        // Purges index content in the stager.
        if let Err(e) = access_layer
            .puffin_manager_factory()
-           .purge_stager(region_file_id)
+           .purge_stager(RegionFileId::new(region_id, *index_file_id))
            .await
        {
            error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
-               file_id, region_id);
+               index_file_id, region_id);
        }
    }
    Ok(())
@@ -459,6 +489,7 @@ mod tests {
        file_size: 0,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        index_file_size: 0,
+       index_file_id: None,
        num_rows: 0,
        num_row_groups: 0,
        sequence: None,
@@ -505,6 +536,7 @@ mod tests {
        file_size: 0,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        index_file_size: 0,
+       index_file_id: None,
        num_rows: 0,
        num_row_groups: 0,
        sequence: None,
@@ -128,7 +128,7 @@ impl LocalFilePurger {
        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
            if let Err(e) = delete_files(
                file_meta.region_id,
-               &[file_meta.file_id],
+               &[(file_meta.file_id, file_meta.index_file_id().file_id())],
                file_meta.exists_index(),
                &sst_layer,
                &cache_manager,
@@ -233,6 +233,7 @@ mod tests {
        file_size: 4096,
        available_indexes: Default::default(),
        index_file_size: 0,
+       index_file_id: None,
        num_rows: 0,
        num_row_groups: 0,
        sequence: None,
@@ -300,6 +301,7 @@ mod tests {
        file_size: 4096,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        index_file_size: 4096,
+       index_file_id: None,
        num_rows: 1024,
        num_row_groups: 1,
        sequence: NonZeroU64::new(4096),
@@ -235,6 +235,7 @@ mod tests {
        file_size: 4096,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        index_file_size: 4096,
+       index_file_id: None,
        num_rows: 1024,
        num_row_groups: 1,
        sequence: NonZeroU64::new(4096),
@@ -21,11 +21,13 @@ pub mod puffin_manager;
mod statistics;
pub(crate) mod store;

+use std::cmp::Ordering;
+use std::collections::{BinaryHeap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;

use bloom_filter::creator::BloomFilterIndexer;
-use common_telemetry::{debug, info, warn};
+use common_telemetry::{debug, error, info, warn};
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::index::IndexValuesCodec;
@@ -43,7 +45,10 @@ use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, Regio
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
-use crate::error::{BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, Result};
+use crate::error::{
+    BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
+    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
+};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::{Batch, BatchReader};
@@ -51,7 +56,8 @@ use crate::region::options::IndexOptions;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
-    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime,
+    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
+    WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
@@ -410,7 +416,7 @@ impl IndexerBuilderImpl {
}

/// Type of an index build task.
-#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
+#[derive(Debug, Clone, IntoStaticStr)]
pub enum IndexBuildType {
    /// Build index when schema change.
    SchemaChange,
@@ -426,6 +432,16 @@ impl IndexBuildType {
    fn as_str(&self) -> &'static str {
        self.into()
    }

+   // Higher value means higher priority.
+   fn priority(&self) -> u8 {
+       match self {
+           IndexBuildType::Manual => 3,
+           IndexBuildType::SchemaChange => 2,
+           IndexBuildType::Flush => 1,
+           IndexBuildType::Compact => 0,
+       }
+   }
}

impl From<OperationType> for IndexBuildType {
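These priorities feed the `BinaryHeap<IndexBuildTask>` introduced further down, whose `Ord` impl compares only `reason.priority()`. Since Rust's `BinaryHeap` is a max-heap, the highest-priority task pops first. A runnable sketch with a stand-in enum (not the real `IndexBuildType`):

use std::collections::BinaryHeap;

// Declaration order matches the priority() table above, so the derived
// Ord gives Compact < Flush < SchemaChange < Manual.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Reason { Compact, Flush, SchemaChange, Manual }

fn main() {
    let mut pending = BinaryHeap::new();
    pending.push(Reason::Flush);
    pending.push(Reason::Manual);
    pending.push(Reason::Compact);
    // Max-heap: Manual (highest priority) comes out first.
    assert_eq!(pending.pop(), Some(Reason::Manual));
    assert_eq!(pending.pop(), Some(Reason::Flush));
    assert_eq!(pending.pop(), Some(Reason::Compact));
}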
@@ -447,6 +463,7 @@ pub enum IndexBuildOutcome {
/// Mpsc output result sender.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;

+#[derive(Clone)]
pub struct IndexBuildTask {
    /// The file meta to build index for.
    pub file_meta: FileMeta,
@@ -465,14 +482,24 @@ pub struct IndexBuildTask {
    pub(crate) result_sender: ResultMpscSender,
}

+impl std::fmt::Debug for IndexBuildTask {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("IndexBuildTask")
+            .field("region_id", &self.file_meta.region_id)
+            .field("file_id", &self.file_meta.file_id)
+            .field("reason", &self.reason)
+            .finish()
+    }
+}
+
impl IndexBuildTask {
    /// Notify the caller the job is success.
-   pub async fn on_success(&mut self, outcome: IndexBuildOutcome) {
+   pub async fn on_success(&self, outcome: IndexBuildOutcome) {
        let _ = self.result_sender.send(Ok(outcome)).await;
    }

    /// Send index build error to waiter.
-   pub async fn on_failure(&mut self, err: Arc<Error>) {
+   pub async fn on_failure(&self, err: Arc<Error>) {
        let _ = self
            .result_sender
            .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
@@ -503,7 +530,18 @@ impl IndexBuildTask {
                );
                self.on_failure(e.into()).await
            }
        }
+       let worker_request = WorkerRequest::Background {
+           region_id: self.file_meta.region_id,
+           notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
+               region_id: self.file_meta.region_id,
+               file_id: self.file_meta.file_id,
+           }),
+       };
+       let _ = self
+           .request_sender
+           .send(WorkerRequestWithTime::new(worker_request))
+           .await;
    }

    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
@@ -542,7 +580,13 @@ impl IndexBuildTask {
        &mut self,
        version_control: VersionControlRef,
    ) -> Result<IndexBuildOutcome> {
-       let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await;
+       let index_file_id = if self.file_meta.index_file_size > 0 {
+           // Generate new file ID if index file exists to avoid overwrite.
+           FileId::random()
+       } else {
+           self.file_meta.file_id
+       };
+       let mut indexer = self.indexer_builder.build(index_file_id).await;

        // Check SST file existence before building index to avoid failure of parquet reader.
        if !self.check_sst_file_exists(&version_control).await {
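This id choice is the crux of the "different index file id" changes: a first-time build reuses the data file's id (matching the `None` fallback in `FileMeta::index_file_id()`), while a rebuild gets a fresh id so the puffin file readers may still be using is not overwritten in place. A self-contained sketch of the rule (`u64` ids; `random_id` stands in for `FileId::random()`):

fn pick_index_file_id(data_file_id: u64, index_file_size: u64, random_id: u64) -> u64 {
    if index_file_size > 0 {
        // An index file already exists: write the rebuilt index under a new id.
        random_id
    } else {
        // First build: reuse the data file id.
        data_file_id
    }
}

fn main() {
    assert_eq!(pick_index_file_id(1, 0, 99), 1); // first build keeps the data id
    assert_eq!(pick_index_file_id(1, 235, 99), 99); // rebuild gets a fresh id
}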
@@ -602,9 +646,10 @@ impl IndexBuildTask {
        }

        // Upload index file if write cache is enabled.
-       self.maybe_upload_index_file(index_output.clone()).await?;
+       self.maybe_upload_index_file(index_output.clone(), index_file_id)
+           .await?;

-       let worker_request = match self.update_manifest(index_output).await {
+       let worker_request = match self.update_manifest(index_output, index_file_id).await {
            Ok(edit) => {
                let index_build_finished = IndexBuildFinished {
                    region_id: self.file_meta.region_id,
@@ -632,14 +677,18 @@ impl IndexBuildTask {
        Ok(IndexBuildOutcome::Finished)
    }

-   async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> {
+   async fn maybe_upload_index_file(
+       &self,
+       output: IndexOutput,
+       index_file_id: FileId,
+   ) -> Result<()> {
        if let Some(write_cache) = &self.write_cache {
            let file_id = self.file_meta.file_id;
            let region_id = self.file_meta.region_id;
            let remote_store = self.access_layer.object_store();
            let mut upload_tracker = UploadTracker::new(region_id);
            let mut err = None;
-           let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
+           let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
            let puffin_path = RegionFilePathFactory::new(
                self.access_layer.table_dir().to_string(),
                self.access_layer.path_type(),
@@ -673,9 +722,14 @@ impl IndexBuildTask {
        Ok(())
    }

-   async fn update_manifest(&mut self, output: IndexOutput) -> Result<RegionEdit> {
+   async fn update_manifest(
+       &mut self,
+       output: IndexOutput,
+       index_file_id: FileId,
+   ) -> Result<RegionEdit> {
        self.file_meta.available_indexes = output.build_available_indexes();
        self.file_meta.index_file_size = output.file_size;
+       self.file_meta.index_file_id = Some(index_file_id);
        let edit = RegionEdit {
            files_to_add: vec![self.file_meta.clone()],
            files_to_remove: vec![],
@@ -701,26 +755,205 @@ impl IndexBuildTask {
    }
}

-#[derive(Clone)]
+impl PartialEq for IndexBuildTask {
+    fn eq(&self, other: &Self) -> bool {
+        self.reason.priority() == other.reason.priority()
+    }
+}
+
+impl Eq for IndexBuildTask {}
+
+impl PartialOrd for IndexBuildTask {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for IndexBuildTask {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.reason.priority().cmp(&other.reason.priority())
+    }
+}
+
+/// Tracks the index build status of a region scheduled by the [IndexBuildScheduler].
+pub struct IndexBuildStatus {
+    pub region_id: RegionId,
+    pub building_files: HashSet<FileId>,
+    pub pending_tasks: BinaryHeap<IndexBuildTask>,
+}
+
+impl IndexBuildStatus {
+    pub fn new(region_id: RegionId) -> Self {
+        IndexBuildStatus {
+            region_id,
+            building_files: HashSet::new(),
+            pending_tasks: BinaryHeap::new(),
+        }
+    }
+
+    async fn on_failure(self, err: Arc<Error>) {
+        for task in self.pending_tasks {
+            task.on_failure(err.clone()).await;
+        }
+    }
+}
+
pub struct IndexBuildScheduler {
    /// Background job scheduler.
    scheduler: SchedulerRef,
+   /// Tracks regions need to build index.
+   region_status: HashMap<RegionId, IndexBuildStatus>,
+   /// Limit of files allowed to build index concurrently for a region.
+   files_limit: usize,
}

/// Manager background index build tasks of a worker.
impl IndexBuildScheduler {
-   pub fn new(scheduler: SchedulerRef) -> Self {
-       IndexBuildScheduler { scheduler }
+   pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
+       IndexBuildScheduler {
+           scheduler,
+           region_status: HashMap::new(),
+           files_limit,
+       }
    }

-   pub(crate) fn schedule_build(
+   pub(crate) async fn schedule_build(
        &mut self,
        version_control: &VersionControlRef,
        task: IndexBuildTask,
    ) -> Result<()> {
-       // We should clone version control to expand the lifetime.
-       let job = task.into_index_build_job(version_control.clone());
-       self.scheduler.schedule(job)?;
+       let status = self
+           .region_status
+           .entry(task.file_meta.region_id)
+           .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
+
+       if status.building_files.contains(&task.file_meta.file_id) {
+           let region_file_id =
+               RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
+           debug!(
+               "Aborting index build task since index is already being built for region file {:?}",
+               region_file_id
+           );
+           task.on_success(IndexBuildOutcome::Aborted(format!(
+               "Index is already being built for region file {:?}",
+               region_file_id
+           )))
+           .await;
+           task.listener.on_index_build_abort(region_file_id).await;
+           return Ok(());
+       }
+
+       status.pending_tasks.push(task);
+
+       self.schedule_next_build_batch(version_control);
        Ok(())
    }
+
+   /// Schedule tasks until reaching the files limit or no more tasks.
+   fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
+       let mut building_count = 0;
+       for status in self.region_status.values() {
+           building_count += status.building_files.len();
+       }
+
+       while building_count < self.files_limit {
+           if let Some(task) = self.find_next_task() {
+               let region_id = task.file_meta.region_id;
+               let file_id = task.file_meta.file_id;
+               let job = task.into_index_build_job(version_control.clone());
+               if self.scheduler.schedule(job).is_ok() {
+                   if let Some(status) = self.region_status.get_mut(&region_id) {
+                       status.building_files.insert(file_id);
+                       building_count += 1;
+                       status
+                           .pending_tasks
+                           .retain(|t| t.file_meta.file_id != file_id);
+                   } else {
+                       error!(
+                           "Region status not found when scheduling index build task, region: {}",
+                           region_id
+                       );
+                   }
+               } else {
+                   error!(
+                       "Failed to schedule index build job, region: {}, file_id: {}",
+                       region_id, file_id
+                   );
+               }
+           } else {
+               // No more tasks to schedule.
+               break;
+           }
+       }
+   }
+
+   /// Find the next task which has the highest priority to run.
+   fn find_next_task(&self) -> Option<IndexBuildTask> {
+       self.region_status
+           .iter()
+           .filter_map(|(_, status)| status.pending_tasks.peek())
+           .max()
+           .cloned()
+   }
+
+   pub(crate) fn on_task_stopped(
+       &mut self,
+       region_id: RegionId,
+       file_id: FileId,
+       version_control: &VersionControlRef,
+   ) {
+       if let Some(status) = self.region_status.get_mut(&region_id) {
+           status.building_files.remove(&file_id);
+           if status.building_files.is_empty() && status.pending_tasks.is_empty() {
+               // No more tasks for this region, remove it.
+               self.region_status.remove(&region_id);
+           }
+       }
+
+       self.schedule_next_build_batch(version_control);
+   }
+
+   pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
+       error!(
+           err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
+           region_id
+       );
+       let Some(status) = self.region_status.remove(&region_id) else {
+           return;
+       };
+       status.on_failure(err).await;
+   }
+
+   /// Notifies the scheduler that the region is dropped.
+   pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
+       self.remove_region_on_failure(
+           region_id,
+           Arc::new(RegionDroppedSnafu { region_id }.build()),
+       )
+       .await;
+   }
+
+   /// Notifies the scheduler that the region is closed.
+   pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
+       self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
+           .await;
+   }
+
+   /// Notifies the scheduler that the region is truncated.
+   pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
+       self.remove_region_on_failure(
+           region_id,
+           Arc::new(RegionTruncatedSnafu { region_id }.build()),
+       )
+       .await;
+   }
+
+   async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
+       let Some(status) = self.region_status.remove(&region_id) else {
+           return;
+       };
+       status.on_failure(err).await;
+   }
}

/// Decodes primary keys from a flat format RecordBatch.
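Putting the scheduler pieces together: `schedule_next_build_batch` counts building files across all regions and drains the per-region max-heaps until `files_limit` is reached. A toy, runnable model of that loop, using bare priority values in place of tasks:

use std::collections::BinaryHeap;

// Pop pending priorities (max-heap) into the building set until the limit.
fn drain_batch(pending: &mut BinaryHeap<u8>, building: &mut Vec<u8>, files_limit: usize) {
    while building.len() < files_limit {
        match pending.pop() {
            Some(priority) => building.push(priority),
            None => break, // no more tasks to schedule
        }
    }
}

fn main() {
    let mut pending = BinaryHeap::from([1u8, 3, 0, 2]);
    let mut building = Vec::new();
    drain_batch(&mut pending, &mut building, 2);
    // Only the two highest priorities start; the rest stay pending.
    assert_eq!(building, vec![3, 2]);
    assert_eq!(pending.len(), 2);
}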
@@ -1192,7 +1425,7 @@ mod tests {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
-       let mut scheduler = env.mock_index_build_scheduler();
+       let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
@@ -1222,7 +1455,10 @@ mod tests {
        };

        // Schedule the build task and check result.
-       scheduler.schedule_build(&version_control, task).unwrap();
+       scheduler
+           .schedule_build(&version_control, task)
+           .await
+           .unwrap();
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                if outcome == IndexBuildOutcome::Finished {
@@ -1236,7 +1472,7 @@ mod tests {
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        let env = SchedulerEnv::new().await;
-       let mut scheduler = env.mock_index_build_scheduler();
+       let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
@@ -1272,7 +1508,10 @@ mod tests {
            result_sender: result_tx,
        };

-       scheduler.schedule_build(&version_control, task).unwrap();
+       scheduler
+           .schedule_build(&version_control, task)
+           .await
+           .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
@@ -1304,7 +1543,7 @@ mod tests {

    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
        let env = SchedulerEnv::new().await;
-       let mut scheduler = env.mock_index_build_scheduler();
+       let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
@@ -1340,7 +1579,10 @@ mod tests {
            result_sender: result_tx,
        };

-       scheduler.schedule_build(&version_control, task).unwrap();
+       scheduler
+           .schedule_build(&version_control, task)
+           .await
+           .unwrap();

        let puffin_path = location::index_file_path(
            env.access_layer.table_dir(),
@@ -1395,7 +1637,7 @@ mod tests {
    #[tokio::test]
    async fn test_index_build_task_no_index() {
        let env = SchedulerEnv::new().await;
-       let mut scheduler = env.mock_index_build_scheduler();
+       let mut scheduler = env.mock_index_build_scheduler(4);
        let mut metadata = sst_region_metadata();
        // Unset indexes in metadata to simulate no index scenario.
        metadata.column_metadatas.iter_mut().for_each(|col| {
@@ -1437,7 +1679,10 @@ mod tests {
            result_sender: result_tx,
        };

-       scheduler.schedule_build(&version_control, task).unwrap();
+       scheduler
+           .schedule_build(&version_control, task)
+           .await
+           .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
@@ -1454,7 +1699,7 @@ mod tests {
    #[tokio::test]
    async fn test_index_build_task_with_write_cache() {
        let env = SchedulerEnv::new().await;
-       let mut scheduler = env.mock_index_build_scheduler();
+       let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
@@ -1518,7 +1763,10 @@ mod tests {
            result_sender: result_tx,
        };

-       scheduler.schedule_build(&version_control, task).unwrap();
+       scheduler
+           .schedule_build(&version_control, task)
+           .await
+           .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
@@ -1532,4 +1780,188 @@ mod tests {
|
||||
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
|
||||
assert!(write_cache.file_cache().contains_key(&index_key));
|
||||
}
|
||||
|
||||
async fn create_mock_task_for_schedule(
|
||||
env: &SchedulerEnv,
|
||||
file_id: FileId,
|
||||
region_id: RegionId,
|
||||
reason: IndexBuildType,
|
||||
) -> IndexBuildTask {
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
|
||||
let file_purger = Arc::new(NoopFilePurger {});
|
||||
let indexer_builder = mock_indexer_builder(metadata, env).await;
|
||||
let (tx, _rx) = mpsc::channel(4);
|
||||
let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
|
||||
|
||||
IndexBuildTask {
|
||||
file_meta: FileMeta {
|
||||
region_id,
|
||||
file_id,
|
||||
file_size: 100,
|
||||
..Default::default()
|
||||
},
|
||||
reason,
|
||||
access_layer: env.access_layer.clone(),
|
||||
listener: WorkerListener::default(),
|
||||
manifest_ctx,
|
||||
write_cache: None,
|
||||
file_purger,
|
||||
indexer_builder,
|
||||
request_sender: tx,
|
||||
result_sender: result_tx,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scheduler_comprehensive() {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let mut scheduler = env.mock_index_build_scheduler(2);
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
let region_id = metadata.region_id;
|
||||
let file_purger = Arc::new(NoopFilePurger {});
|
||||
|
||||
// Prepare multiple files for testing
|
||||
let file_id1 = FileId::random();
|
||||
let file_id2 = FileId::random();
|
||||
let file_id3 = FileId::random();
|
||||
let file_id4 = FileId::random();
|
||||
let file_id5 = FileId::random();
|
||||
|
||||
let mut files = HashMap::new();
|
||||
for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
|
||||
files.insert(
|
||||
file_id,
|
||||
FileMeta {
|
||||
region_id,
|
||||
file_id,
|
||||
file_size: 100,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let version_control = mock_version_control(metadata, file_purger, files).await;
|
||||
|
||||
// Test 1: Basic scheduling
|
||||
let task1 =
|
||||
create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
|
||||
assert!(
|
||||
scheduler
|
||||
.schedule_build(&version_control, task1)
|
||||
.await
|
||||
.is_ok()
|
||||
);
|
||||
assert!(scheduler.region_status.contains_key(®ion_id));
|
||||
let status = scheduler.region_status.get(®ion_id).unwrap();
|
||||
assert_eq!(status.building_files.len(), 1);
|
||||
assert!(status.building_files.contains(&file_id1));
|
||||
|
||||
// Test 2: Duplicate file scheduling (should be skipped)
|
||||
let task1_dup =
|
||||
create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
|
||||
scheduler
|
||||
.schedule_build(&version_control, task1_dup)
|
||||
.await
|
||||
.unwrap();
|
||||
let status = scheduler.region_status.get(®ion_id).unwrap();
|
||||
assert_eq!(status.building_files.len(), 1); // Still only one
|
||||
|
||||
// Test 3: Fill up to limit (2 building tasks)
|
||||
let task2 =
|
||||
create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
|
||||
scheduler
|
||||
.schedule_build(&version_control, task2)
|
||||
.await
|
||||
.unwrap();
|
||||
let status = scheduler.region_status.get(®ion_id).unwrap();
|
||||
assert_eq!(status.building_files.len(), 2); // Reached limit
|
||||
assert_eq!(status.pending_tasks.len(), 0);
|
||||
|
||||
// Test 4: Add tasks with different priorities to pending queue
|
||||
// Now all new tasks will be pending since we reached the limit
|
||||
let task3 =
|
||||
create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
|
let task4 =
create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
.await;
let task5 =
create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;

scheduler
.schedule_build(&version_control, task3)
.await
.unwrap();
scheduler
.schedule_build(&version_control, task4)
.await
.unwrap();
scheduler
.schedule_build(&version_control, task5)
.await
.unwrap();

let status = scheduler.region_status.get(&region_id).unwrap();
assert_eq!(status.building_files.len(), 2); // Still at limit
assert_eq!(status.pending_tasks.len(), 3); // Three pending

// Test 5: Task completion triggers scheduling next highest priority task (Manual)
scheduler.on_task_stopped(region_id, file_id1, &version_control);
let status = scheduler.region_status.get(&region_id).unwrap();
assert!(!status.building_files.contains(&file_id1));
assert_eq!(status.building_files.len(), 2); // Should schedule next task
assert_eq!(status.pending_tasks.len(), 2); // One less pending
// The highest priority task (Manual) should now be building
assert!(status.building_files.contains(&file_id5));

// Test 6: Complete another task, should schedule SchemaChange (second highest priority)
scheduler.on_task_stopped(region_id, file_id2, &version_control);
let status = scheduler.region_status.get(&region_id).unwrap();
assert_eq!(status.building_files.len(), 2);
assert_eq!(status.pending_tasks.len(), 1); // One less pending
assert!(status.building_files.contains(&file_id4)); // SchemaChange should be building

// Test 7: Complete remaining tasks and cleanup
scheduler.on_task_stopped(region_id, file_id5, &version_control);
scheduler.on_task_stopped(region_id, file_id4, &version_control);

let status = scheduler.region_status.get(&region_id).unwrap();
assert_eq!(status.building_files.len(), 1); // Last task (Compact) should be building
assert_eq!(status.pending_tasks.len(), 0);
assert!(status.building_files.contains(&file_id3));

scheduler.on_task_stopped(region_id, file_id3, &version_control);

// Region should be removed when all tasks complete
assert!(!scheduler.region_status.contains_key(&region_id));

// Test 8: Region dropped with pending tasks
let task6 =
create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
let task7 =
create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
let task8 =
create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;

scheduler
.schedule_build(&version_control, task6)
.await
.unwrap();
scheduler
.schedule_build(&version_control, task7)
.await
.unwrap();
scheduler
.schedule_build(&version_control, task8)
.await
.unwrap();

assert!(scheduler.region_status.contains_key(&region_id));
let status = scheduler.region_status.get(&region_id).unwrap();
assert_eq!(status.building_files.len(), 2);
assert_eq!(status.pending_tasks.len(), 1);

scheduler.on_region_dropped(region_id).await;
assert!(!scheduler.region_status.contains_key(&region_id));
}
}
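The bookkeeping those assertions exercise is small: each region keeps a capped set of in-flight builds plus a priority queue of pending tasks, and every completion promotes the best pending task. A minimal, self-contained sketch of that shape follows; it is not the PR's actual code, and `PendingTask`, `RegionStatus`, the `u64` file ids, and Flush's exact rank are illustrative assumptions (the test above only fixes Manual > SchemaChange > Compact).

use std::collections::{BinaryHeap, HashSet};

/// Build-task priority, lowest to highest; declaration order drives `Ord`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum IndexBuildType {
    Flush,
    Compact,
    SchemaChange,
    Manual,
}

#[derive(PartialEq, Eq)]
struct PendingTask {
    build_type: IndexBuildType,
    file_id: u64, // stand-in for the real FileId
}

// Compare by build type first so BinaryHeap::pop yields the highest priority;
// lower (earlier) file ids win ties, keeping the ordering consistent with Eq.
impl Ord for PendingTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.build_type
            .cmp(&other.build_type)
            .then_with(|| other.file_id.cmp(&self.file_id))
    }
}

impl PartialOrd for PendingTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

struct RegionStatus {
    building_files: HashSet<u64>,
    pending_tasks: BinaryHeap<PendingTask>,
    files_limit: usize,
}

impl RegionStatus {
    /// Start the task if a slot is free; otherwise park it in the heap.
    fn schedule(&mut self, task: PendingTask) {
        if self.building_files.len() < self.files_limit {
            self.building_files.insert(task.file_id);
        } else {
            self.pending_tasks.push(task);
        }
    }

    /// Free the slot and promote the highest-priority pending task, if any.
    fn on_task_stopped(&mut self, file_id: u64) {
        self.building_files.remove(&file_id);
        if let Some(next) = self.pending_tasks.pop() {
            self.building_files.insert(next.file_id);
        }
    }
}

fn main() {
    let mut status = RegionStatus {
        building_files: HashSet::new(),
        pending_tasks: BinaryHeap::new(),
        files_limit: 2,
    };
    // Two Flush tasks fill both slots; Compact, SchemaChange and Manual queue up.
    for (file_id, build_type) in [
        (1, IndexBuildType::Flush),
        (2, IndexBuildType::Flush),
        (3, IndexBuildType::Compact),
        (4, IndexBuildType::SchemaChange),
        (5, IndexBuildType::Manual),
    ] {
        status.schedule(PendingTask { build_type, file_id });
    }
    status.on_task_stopped(1);
    assert!(status.building_files.contains(&5)); // Manual overtakes earlier arrivals
}

Popping the heap on every completion, rather than draining arrivals in FIFO order, is what lets a manual rebuild overtake background tasks that were queued first.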
@@ -768,6 +768,7 @@ mod tests {
file_size: info.file_size,
available_indexes: info.index_metadata.build_available_indexes(),
index_file_size: info.index_metadata.file_size,
+index_file_id: None,
num_row_groups: info.num_row_groups,
num_rows: info.num_rows as u64,
sequence: None,
@@ -111,10 +111,10 @@ impl SchedulerEnv {
}

/// Creates a new index build scheduler.
-pub(crate) fn mock_index_build_scheduler(&self) -> IndexBuildScheduler {
+pub(crate) fn mock_index_build_scheduler(&self, files_limit: usize) -> IndexBuildScheduler {
let scheduler = self.get_scheduler();

-IndexBuildScheduler::new(scheduler)
+IndexBuildScheduler::new(scheduler, files_limit)
}

/// Creates a new manifest context.
@@ -125,6 +125,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
file_size: 0,
available_indexes: Default::default(),
index_file_size: 0,
+index_file_id: None,
num_rows: 0,
num_row_groups: 0,
num_series: 0,
@@ -103,6 +103,7 @@ impl VersionControlBuilder {
file_size: 0, // We don't care file size.
available_indexes: Default::default(),
index_file_size: 0,
+index_file_id: None,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

@@ -192,6 +193,7 @@ pub(crate) fn apply_edit(
file_size: 0, // We don't care file size.
available_indexes: Default::default(),
index_file_size: 0,
+index_file_id: None,
num_rows: 0,
num_row_groups: 0,
num_series: 0,
@@ -509,7 +509,10 @@ impl<S: LogStore> WorkerStarter<S> {
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
-index_build_scheduler: IndexBuildScheduler::new(self.index_build_job_pool),
+index_build_scheduler: IndexBuildScheduler::new(
+self.index_build_job_pool,
+self.config.max_background_index_builds,
+),
flush_scheduler: FlushScheduler::new(self.flush_job_pool),
compaction_scheduler: CompactionScheduler::new(
self.compact_job_pool,
@@ -1059,6 +1062,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::IndexBuildFinished(req) => {
self.handle_index_build_finished(region_id, req).await
}
+BackgroundNotify::IndexBuildStopped(req) => {
+self.handle_index_build_stopped(region_id, req).await
+}
BackgroundNotify::IndexBuildFailed(req) => {
self.handle_index_build_failed(region_id, req).await
}
@@ -38,6 +38,8 @@ impl<S> RegionWorkerLoop<S> {
self.flush_scheduler.on_region_closed(region_id);
// Clean compaction status.
self.compaction_scheduler.on_region_closed(region_id);
+// Clean index build status.
+self.index_build_scheduler.on_region_closed(region_id).await;

info!("Region {} closed, worker: {}", region_id, self.id);
@@ -83,6 +83,10 @@ where
self.flush_scheduler.on_region_dropped(region_id);
// Notifies compaction scheduler.
self.compaction_scheduler.on_region_dropped(region_id);
+// Notifies index build scheduler.
+self.index_build_scheduler
+.on_region_dropped(region_id)
+.await;

// Marks region version as dropped
region
@@ -22,9 +22,12 @@ use store_api::region_request::RegionBuildIndexRequest;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc;

+use crate::cache::CacheStrategy;
use crate::error::Result;
use crate::region::MitoRegionRef;
-use crate::request::{BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, OptionOutputTx};
+use crate::request::{
+BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
+};
use crate::sst::file::{FileHandle, RegionFileId};
use crate::sst::index::{
IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
@@ -172,7 +175,8 @@
);
let _ = self
.index_build_scheduler
-.schedule_build(&region.version_control, task);
+.schedule_build(&region.version_control, task)
+.await;
}
// Wait for all index build tasks to finish and notify the caller.
common_runtime::spawn_global(async move {
@@ -203,11 +207,19 @@
}
};

+// Clean old puffin-related cache for all rebuilt files.
+let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
+for file_meta in &request.edit.files_to_add {
+let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
+cache_strategy.evict_puffin_cache(region_file_id).await;
+}
+
region.version_control.apply_edit(
Some(request.edit.clone()),
&[],
region.file_purger.clone(),
);

for file_meta in &request.edit.files_to_add {
self.listener
.on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
@@ -221,6 +233,27 @@
request: IndexBuildFailed,
) {
error!(request.err; "Index build failed for region: {}", region_id);
-// TODO(SNC123): Implement error handling logic after IndexBuildScheduler optimization.
+self.index_build_scheduler
+.on_failure(region_id, request.err.clone())
+.await;
}

+pub(crate) async fn handle_index_build_stopped(
+&mut self,
+region_id: RegionId,
+request: IndexBuildStopped,
+) {
+let Some(region) = self.regions.get_region(region_id) else {
+warn!(
+"Region not found for index build stopped, region_id: {}",
+region_id
+);
+return;
+};
+self.index_build_scheduler.on_task_stopped(
+region_id,
+request.file_id,
+&region.version_control,
+);
+}
}
@@ -142,6 +142,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.flush_scheduler.on_region_truncated(region_id);
// Notifies compaction scheduler.
self.compaction_scheduler.on_region_truncated(region_id);
+// Notifies index build scheduler.
+self.index_build_scheduler
+.on_region_truncated(region_id)
+.await;

if let TruncateKind::All {
truncated_entry_id,
@@ -47,6 +47,8 @@ pub struct ManifestSstEntry {
pub region_sequence: RegionSeq,
/// Engine-specific file identifier (string form).
pub file_id: String,
+/// Engine-specific index file identifier (string form).
+pub index_file_id: Option<String>,
/// SST level.
pub level: u8,
/// Full path of the SST file in object store.
@@ -89,6 +91,7 @@ impl ManifestSstEntry {
ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
ColumnSchema::new("file_id", Ty::string_datatype(), false),
+ColumnSchema::new("index_file_id", Ty::string_datatype(), true),
ColumnSchema::new("level", Ty::uint8_datatype(), false),
ColumnSchema::new("file_path", Ty::string_datatype(), false),
ColumnSchema::new("file_size", Ty::uint64_datatype(), false),

@@ -116,6 +119,7 @@ impl ManifestSstEntry {
let region_groups = entries.iter().map(|e| e.region_group);
let region_sequences = entries.iter().map(|e| e.region_sequence);
let file_ids = entries.iter().map(|e| e.file_id.as_str());
+let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref());
let levels = entries.iter().map(|e| e.level);
let file_paths = entries.iter().map(|e| e.file_path.as_str());
let file_sizes = entries.iter().map(|e| e.file_size);

@@ -147,6 +151,7 @@ impl ManifestSstEntry {
Arc::new(UInt8Array::from_iter_values(region_groups)),
Arc::new(UInt32Array::from_iter_values(region_sequences)),
Arc::new(StringArray::from_iter_values(file_ids)),
+Arc::new(StringArray::from_iter(index_file_ids)),
Arc::new(UInt8Array::from_iter_values(levels)),
Arc::new(StringArray::from_iter_values(file_paths)),
Arc::new(UInt64Array::from_iter_values(file_sizes)),
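The `from_iter` vs `from_iter_values` split above is what carries nullability end-to-end: `file_id` feeds plain values into a non-null column, while `index_file_id` feeds `Option`s so a missing index file becomes a null slot. A standalone sketch of the same pattern, assuming the arrow crate (the values are made up; only the two constructors mirror the code above):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};

fn main() {
    // Non-nullable column: every entry has a value.
    let file_ids: ArrayRef = Arc::new(StringArray::from_iter_values(["f1", "f2"]));

    // Nullable column: a None turns into a null slot in the array.
    let index_file_ids: ArrayRef =
        Arc::new(StringArray::from_iter(vec![None, Some("idx".to_string())]));

    assert_eq!(file_ids.null_count(), 0);
    assert!(index_file_ids.is_null(0));
    assert!(!index_file_ids.is_null(1));
}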
@@ -432,6 +437,7 @@ mod tests {
region_group: region_group1,
region_sequence: region_seq1,
file_id: "f1".to_string(),
+index_file_id: None,
level: 1,
file_path: "/p1".to_string(),
file_size: 100,

@@ -455,6 +461,7 @@ mod tests {
region_group: region_group2,
region_sequence: region_seq2,
file_id: "f2".to_string(),
+index_file_id: Some("idx".to_string()),
level: 3,
file_path: "/p2".to_string(),
file_size: 200,
@@ -541,16 +548,24 @@ mod tests {
assert_eq!("f1", file_ids.value(0));
assert_eq!("f2", file_ids.value(1));

-let levels = batch
+let index_file_ids = batch
.column(7)
.as_any()
+.downcast_ref::<StringArray>()
+.unwrap();
+assert!(index_file_ids.is_null(0));
+assert_eq!("idx", index_file_ids.value(1));
+
+let levels = batch
+.column(8)
+.as_any()
.downcast_ref::<UInt8Array>()
.unwrap();
assert_eq!(1, levels.value(0));
assert_eq!(3, levels.value(1));

let file_paths = batch
-.column(8)
+.column(9)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

@@ -558,7 +573,7 @@ mod tests {
assert_eq!("/p2", file_paths.value(1));

let file_sizes = batch
-.column(9)
+.column(10)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -566,7 +581,7 @@ mod tests {
assert_eq!(200, file_sizes.value(1));

let index_file_paths = batch
-.column(10)
+.column(11)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

@@ -574,7 +589,7 @@ mod tests {
assert_eq!("idx", index_file_paths.value(1));

let index_file_sizes = batch
-.column(11)
+.column(12)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -582,7 +597,7 @@ mod tests {
assert_eq!(11, index_file_sizes.value(1));

let num_rows = batch
-.column(12)
+.column(13)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -590,7 +605,7 @@ mod tests {
assert_eq!(20, num_rows.value(1));

let num_row_groups = batch
-.column(13)
+.column(14)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -598,7 +613,7 @@ mod tests {
assert_eq!(4, num_row_groups.value(1));

let num_series = batch
-.column(14)
+.column(15)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -606,7 +621,7 @@ mod tests {
assert!(num_series.is_null(1));

let min_ts = batch
-.column(15)
+.column(16)
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();

@@ -614,7 +629,7 @@ mod tests {
assert_eq!(5, min_ts.value(1));

let max_ts = batch
-.column(16)
+.column(17)
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();

@@ -622,7 +637,7 @@ mod tests {
assert_eq!(2_000_000, max_ts.value(1));

let sequences = batch
-.column(17)
+.column(18)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -630,7 +645,7 @@ mod tests {
assert_eq!(9, sequences.value(1));

let origin_region_ids = batch
-.column(18)
+.column(19)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -638,7 +653,7 @@ mod tests {
assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));

let node_ids = batch
-.column(19)
+.column(20)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();

@@ -646,7 +661,7 @@ mod tests {
assert!(node_ids.is_null(1));

let visible = batch
-.column(20)
+.column(21)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
@@ -10,6 +10,7 @@ DESC TABLE information_schema.ssts_manifest;
| region_group | UInt8 | | NO | | FIELD |
| region_sequence | UInt32 | | NO | | FIELD |
| file_id | String | | NO | | FIELD |
+| index_file_id | String | | YES | | FIELD |
| level | UInt8 | | NO | | FIELD |
| file_path | String | | NO | | FIELD |
| file_size | UInt64 | | NO | | FIELD |
@@ -96,13 +97,13 @@ ADMIN FLUSH_TABLE('sst_case');
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
SELECT * FROM information_schema.ssts_manifest order by file_path;

-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+

-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
@@ -164,15 +165,15 @@ ADMIN FLUSH_TABLE('sst_case');
-- SQLNESS REPLACE (/public/\d+) /public/<TABLE_ID>
SELECT * FROM information_schema.ssts_manifest order by file_path;

-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
+| data/greptime/public/<TABLE_ID>/ |<NUM>|<NUM>|<NUM>|<NUM>|<NUM>| <UUID> | <UUID> |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>|<NUM>|<NUM>|<NUM>| <DATETIME> | <DATETIME> |<NUM>|<NUM>|<NUM>| true |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+

-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
@@ -406,26 +406,27 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | ssts_index_meta | target_key | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_index_meta | target_type | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | file_id | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
-| greptime | information_schema | ssts_manifest | file_path | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
-| greptime | information_schema | ssts_manifest | file_size | 10 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
-| greptime | information_schema | ssts_manifest | index_file_path | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
-| greptime | information_schema | ssts_manifest | index_file_size | 12 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
-| greptime | information_schema | ssts_manifest | level | 8 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
-| greptime | information_schema | ssts_manifest | max_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
-| greptime | information_schema | ssts_manifest | min_ts | 16 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
-| greptime | information_schema | ssts_manifest | node_id | 20 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
-| greptime | information_schema | ssts_manifest | num_row_groups | 14 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
-| greptime | information_schema | ssts_manifest | num_rows | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
-| greptime | information_schema | ssts_manifest | num_series | 15 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
-| greptime | information_schema | ssts_manifest | origin_region_id | 19 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | file_path | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
+| greptime | information_schema | ssts_manifest | file_size | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | index_file_id | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
+| greptime | information_schema | ssts_manifest | index_file_path | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
+| greptime | information_schema | ssts_manifest | index_file_size | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | level | 9 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
+| greptime | information_schema | ssts_manifest | max_ts | 18 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
+| greptime | information_schema | ssts_manifest | min_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
+| greptime | information_schema | ssts_manifest | node_id | 21 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | num_row_groups | 15 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | num_rows | 14 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | num_series | 16 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | origin_region_id | 20 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | region_group | 5 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
| greptime | information_schema | ssts_manifest | region_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | region_number | 4 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | ssts_manifest | region_sequence | 6 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
-| greptime | information_schema | ssts_manifest | sequence | 18 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | sequence | 19 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | ssts_manifest | table_dir | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_manifest | table_id | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
-| greptime | information_schema | ssts_manifest | visible | 21 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | |
+| greptime | information_schema | ssts_manifest | visible | 22 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | |
| greptime | information_schema | ssts_storage | file_path | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | ssts_storage | file_size | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | ssts_storage | last_modified_ms | 3 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |