fix: do not remove deletion markers when window time range overlaps (#3773)

* fix: do not remove deletion markers when window time range overlaps

* chore: fix some minor issues; add compaction test

* chore: add more test

* fix: nitpick master's nitpick
Author: Lei, HUANG
Date: 2024-04-23 16:05:16 +08:00 (committed by GitHub)
parent f764fd5847
commit 778e195f07
3 changed files with 342 additions and 18 deletions
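
In short: a compaction output may drop deletion markers (`filter_deleted = true`) only when no file in its time window overlaps any other window; otherwise an older put stored in an overlapping window could resurface after the marker is gone. A minimal toy sketch of that failure mode (plain Rust; these are not the engine's actual types, merge logic, or read path):

// Toy model of compaction with and without deletion-marker filtering.
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Put(i64),
    Delete,
}

/// Merges files (later files win per timestamp) and optionally drops
/// deletion markers, mimicking what `filter_deleted = true` does.
fn merge(files: &[Vec<(i64, Op)>], filter_deleted: bool) -> Vec<(i64, Op)> {
    let mut merged: BTreeMap<i64, Op> = BTreeMap::new();
    for file in files {
        for &(ts, op) in file {
            merged.insert(ts, op);
        }
    }
    merged
        .into_iter()
        .filter(|(_, op)| !(filter_deleted && matches!(op, Op::Delete)))
        .collect()
}

fn main() {
    let older_put = vec![(100, Op::Put(1))]; // lives in an overlapping window
    let newer_delete = vec![(100, Op::Delete)]; // window being compacted

    // Dropping the marker while compacting only the newer window...
    let compacted = merge(&[newer_delete.clone()], true);
    assert!(compacted.is_empty());
    // ...resurrects the deleted row once both windows are read together.
    assert_eq!(
        vec![(100, Op::Put(1))],
        merge(&[older_put.clone(), compacted], true)
    );

    // Keeping the marker (filter_deleted = false) preserves the delete.
    let compacted = merge(&[newer_delete], false);
    assert!(merge(&[older_put, compacted], true).is_empty());
}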

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -84,35 +85,41 @@ impl TwcsPicker {
/// fragmentation. For other windows, we allow at most 1 file at each window.
fn build_output(
&self,
time_windows: &BTreeMap<i64, Vec<FileHandle>>,
time_windows: &BTreeMap<i64, Window>,
active_window: Option<i64>,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
let files_in_window = &files.files;
// We only remove deletion markers when no file in the current window overlaps with any other window.
let filter_deleted = !files.overlapping;
if let Some(active_window) = active_window
&& *window == active_window
{
if files.len() > self.max_active_window_files {
if files_in_window.len() > self.max_active_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1, // we only have two levels and always compact to l1
inputs: files.clone(),
inputs: files_in_window.clone(),
filter_deleted,
});
} else {
debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window);
}
} else {
// not active writing window
if files.len() > self.max_inactive_window_files {
if files_in_window.len() > self.max_inactive_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
inputs: files.clone(),
inputs: files_in_window.clone(),
filter_deleted,
});
} else {
debug!(
"No enough files, current: {}, max_inactive_window_files: {}",
files.len(),
files_in_window.len(),
self.max_inactive_window_files
)
}
@@ -195,24 +202,99 @@ impl Picker for TwcsPicker {
}
}
struct Window {
start: Timestamp,
end: Timestamp,
files: Vec<FileHandle>,
time_window: i64,
overlapping: bool,
}
impl Window {
/// Creates a new [Window] with given file.
fn new_with_file(file: FileHandle) -> Self {
let (start, end) = file.time_range();
Self {
start,
end,
files: vec![file],
time_window: 0,
overlapping: false,
}
}
/// Returns the time range of all files in current window (inclusive).
fn range(&self) -> (Timestamp, Timestamp) {
(self.start, self.end)
}
/// Adds a new file to window and updates time range.
fn add_file(&mut self, file: FileHandle) {
let (start, end) = file.time_range();
self.start = self.start.min(start);
self.end = self.end.max(end);
self.files.push(file);
}
}
/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
fn assign_to_windows<'a>(
files: impl Iterator<Item = &'a FileHandle>,
time_window_size: i64,
) -> BTreeMap<i64, Vec<FileHandle>> {
let mut windows: BTreeMap<i64, Vec<FileHandle>> = BTreeMap::new();
) -> BTreeMap<i64, Window> {
let mut windows: HashMap<i64, Window> = HashMap::new();
// Iterate over all files and assign them to time windows according to their max timestamps.
for file in files {
let (_, end) = file.time_range();
for f in files {
let (_, end) = f.time_range();
let time_window = end
.convert_to(TimeUnit::Second)
.unwrap()
.value()
.align_to_ceil_by_bucket(time_window_size)
.unwrap_or(i64::MIN);
windows.entry(time_window).or_default().push(file.clone());
match windows.entry(time_window) {
Entry::Occupied(mut e) => {
e.get_mut().add_file(f.clone());
}
Entry::Vacant(e) => {
let mut window = Window::new_with_file(f.clone());
window.time_window = time_window;
e.insert(window);
}
}
}
windows
if windows.is_empty() {
return BTreeMap::new();
}
let mut windows = windows.into_values().collect::<Vec<_>>();
windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.
for idx in 1..windows.len() {
let next_range = windows[idx].range();
if overlaps(&current_range, &next_range) {
windows[idx - 1].overlapping = true;
windows[idx].overlapping = true;
}
current_range = (
current_range.0.min(next_range.0),
current_range.1.max(next_range.1),
);
}
windows.into_iter().map(|w| (w.time_window, w)).collect()
}
/// Checks if two inclusive timestamp ranges overlap with each other.
fn overlaps(l: &(Timestamp, Timestamp), r: &(Timestamp, Timestamp)) -> bool {
let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
let (_, l_end) = l;
let (r_start, _) = r;
r_start <= l_end
}
/// Finds the latest active writing window among all files.
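A note on the `overlaps` helper above: both endpoints are inclusive, so ranges that merely touch count as overlapping. A small sketch of the same check on plain i64 pairs (the real helper takes `Timestamp` values):

// Inclusive-range overlap, mirroring `overlaps` but on i64 pairs for brevity.
fn overlaps_i64(l: (i64, i64), r: (i64, i64)) -> bool {
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    r.0 <= l.1
}

fn main() {
    assert!(overlaps_i64((0, 999), (999, 2000))); // touching endpoints overlap
    assert!(overlaps_i64((100, 2999), (0, 999))); // argument order is irrelevant
    assert!(!overlaps_i64((0, 999), (1000, 1999))); // adjacent windows do not
}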
@@ -344,6 +426,7 @@ impl TwcsCompactionTask {
sst_layer.clone(),
&output.inputs,
append_mode,
output.filter_deleted,
)
.await?;
let file_meta_opt = sst_layer
@@ -572,6 +655,8 @@ pub(crate) struct CompactionOutput {
pub output_level: Level,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
/// Whether to remove deletion markers.
pub filter_deleted: bool,
}
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
@@ -580,10 +665,12 @@ async fn build_sst_reader(
sst_layer: AccessLayerRef,
inputs: &[FileHandle],
append_mode: bool,
filter_deleted: bool,
) -> error::Result<BoxedBatchReader> {
let scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?)
.with_files(inputs.to_vec())
.with_append_mode(append_mode)
.with_filter_deleted(filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true);
SeqScan::new(scan_input).build_reader().await
@@ -642,7 +729,7 @@ mod tests {
.iter(),
3,
);
assert_eq!(5, windows.get(&0).unwrap().len());
assert_eq!(5, windows.get(&0).unwrap().files.len());
let files = [FileId::random(); 3];
let windows = assign_to_windows(
@@ -656,15 +743,148 @@ mod tests {
);
assert_eq!(
files[0],
windows.get(&0).unwrap().first().unwrap().file_id()
windows.get(&0).unwrap().files.first().unwrap().file_id()
);
assert_eq!(
files[1],
windows.get(&3).unwrap().first().unwrap().file_id()
windows.get(&3).unwrap().files.first().unwrap().file_id()
);
assert_eq!(
files[2],
windows.get(&12).unwrap().first().unwrap().file_id()
windows.get(&12).unwrap().files.first().unwrap().file_id()
);
}
/// (Window value, overlapping, files' time ranges in window)
type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
fn check_assign_to_windows_with_overlapping(
file_time_ranges: &[(i64, i64)],
time_window: i64,
expected_files: &[ExpectedWindowSpec],
) {
let files: Vec<_> = (0..file_time_ranges.len())
.map(|_| FileId::random())
.collect();
let file_handles = files
.iter()
.zip(file_time_ranges.iter())
.map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
.collect::<Vec<_>>();
let windows = assign_to_windows(file_handles.iter(), time_window);
for (expected_window, overlapping, window_files) in expected_files {
let actual_window = windows.get(expected_window).unwrap();
assert_eq!(*overlapping, actual_window.overlapping);
let mut file_ranges = actual_window
.files
.iter()
.map(|f| {
let (s, e) = f.time_range();
(s.value(), e.value())
})
.collect::<Vec<_>>();
file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
assert_eq!(window_files, &file_ranges);
}
}
#[test]
fn test_assign_to_windows_with_overlapping() {
check_assign_to_windows_with_overlapping(
&[(0, 999), (1000, 1999), (2000, 2999)],
2,
&[
(0, false, vec![(0, 999)]),
(2, false, vec![(1000, 1999), (2000, 2999)]),
],
);
check_assign_to_windows_with_overlapping(
&[(0, 1), (0, 999), (100, 2999)],
2,
&[
(0, true, vec![(0, 1), (0, 999)]),
(2, true, vec![(100, 2999)]),
],
);
check_assign_to_windows_with_overlapping(
&[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
2,
&[
(0, false, vec![(0, 999)]),
(2, false, vec![(1000, 1999), (2000, 2999)]),
(4, false, vec![(3000, 3999)]),
],
);
check_assign_to_windows_with_overlapping(
&[
(0, 999),
(1000, 1999),
(2000, 2999),
(3000, 3999),
(0, 3999),
],
2,
&[
(0, true, vec![(0, 999)]),
(2, true, vec![(1000, 1999), (2000, 2999)]),
(4, true, vec![(0, 3999), (3000, 3999)]),
],
);
check_assign_to_windows_with_overlapping(
&[
(0, 999),
(1000, 1999),
(2000, 2999),
(3000, 3999),
(1999, 3999),
],
2,
&[
(0, false, vec![(0, 999)]),
(2, true, vec![(1000, 1999), (2000, 2999)]),
(4, true, vec![(1999, 3999), (3000, 3999)]),
],
);
check_assign_to_windows_with_overlapping(
&[
(0, 999), // window 0
(1000, 1999), // window 2
(2000, 2999), // window 2
(3000, 3999), // window 4
(2999, 3999), // window 4
],
2,
&[
// window 2 overlaps with window 4
(0, false, vec![(0, 999)]),
(2, true, vec![(1000, 1999), (2000, 2999)]),
(4, true, vec![(2999, 3999), (3000, 3999)]),
],
);
check_assign_to_windows_with_overlapping(
&[
(0, 999), // window 0
(1000, 1999), // window 2
(2000, 2999), // window 2
(3000, 3999), // window 4
(0, 1000), // window 2
],
2,
&[
// only window 0 overlaps with window 2.
(0, true, vec![(0, 999)]),
(2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
(4, false, vec![(3000, 3999)]),
],
);
}

@@ -149,6 +149,102 @@ async fn test_compaction_region() {
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
#[tokio::test]
async fn test_compaction_region_with_overlapping() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 4 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 3600..10800).await; // window 10800
delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
let result = engine
.handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((3600..10800).map(|i| { i * 1000 }).collect::<Vec<_>>(), vec);
}
#[tokio::test]
async fn test_compaction_region_with_overlapping_delete_all() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 4 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800
let result = engine
.handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
4,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert!(vec.is_empty());
}
// For issue https://github.com/GreptimeTeam/greptimedb/issues/3633
#[tokio::test]
async fn test_readonly_during_compaction() {

@@ -396,6 +396,7 @@ pub struct CreateRequestBuilder {
primary_key: Option<Vec<ColumnId>>,
all_not_null: bool,
engine: String,
ts_type: ConcreteDataType,
}
impl Default for CreateRequestBuilder {
@@ -408,6 +409,7 @@ impl Default for CreateRequestBuilder {
primary_key: None,
all_not_null: false,
engine: MITO_ENGINE_NAME.to_string(),
ts_type: ConcreteDataType::timestamp_millisecond_datatype(),
}
}
}
@@ -454,6 +456,12 @@ impl CreateRequestBuilder {
self
}
#[must_use]
pub fn with_ts_type(mut self, ty: ConcreteDataType) -> Self {
self.ts_type = ty;
self
}
pub fn build(&self) -> RegionCreateRequest {
let mut column_id = 0;
let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1);
@@ -487,7 +495,7 @@ impl CreateRequestBuilder {
column_metadatas.push(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
self.ts_type.clone(),
// Time index is always not null.
false,
),
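
For reference, a hedged sketch of how a test might use the new `with_ts_type` knob together with the existing builder options (surrounding imports and test scaffolding are assumed, as is the `timestamp_second_datatype` constructor on `ConcreteDataType`):

// Hypothetical snippet: create a region whose time index stores
// second-precision timestamps instead of the default milliseconds.
let request = CreateRequestBuilder::new()
    .insert_option("compaction.type", "twcs")
    .insert_option("compaction.twcs.time_window", "1h")
    .with_ts_type(ConcreteDataType::timestamp_second_datatype())
    .build();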