diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index e1ce9572c4..d8ac5ce46b 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -494,36 +494,80 @@ impl Batch { /// Checks the batch is monotonic by timestamps. #[cfg(debug_assertions)] - pub(crate) fn check_monotonic(&self) -> bool { + pub(crate) fn check_monotonic(&self) -> Result<(), String> { + use std::cmp::Ordering; if self.timestamps_native().is_none() { - return true; + return Ok(()); } let timestamps = self.timestamps_native().unwrap(); let sequences = self.sequences.as_arrow().values(); - timestamps.windows(2).enumerate().all(|(i, window)| { + for (i, window) in timestamps.windows(2).enumerate() { let current = window[0]; let next = window[1]; let current_sequence = sequences[i]; let next_sequence = sequences[i + 1]; - if current == next { - current_sequence >= next_sequence - } else { - current < next + match current.cmp(&next) { + Ordering::Less => { + // The current timestamp is less than the next timestamp. + continue; + } + Ordering::Equal => { + // The current timestamp is equal to the next timestamp. + if current_sequence < next_sequence { + return Err(format!( + "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}", + current, next, current_sequence, next_sequence, i + )); + } + } + Ordering::Greater => { + // The current timestamp is greater than the next timestamp. + return Err(format!( + "timestamps are not monotonic: {} > {}, index: {}", + current, next, i + )); + } } - }) + } + + Ok(()) } - /// Returns true if the given batch is behind the current batch. + /// Returns Ok if the given batch is behind the current batch. #[cfg(debug_assertions)] - pub(crate) fn check_next_batch(&self, other: &Batch) -> bool { - // Checks the primary key and then the timestamp. - use std::cmp::Ordering; - self.primary_key() - .cmp(other.primary_key()) - .then_with(|| self.last_timestamp().cmp(&other.first_timestamp())) - .then_with(|| other.first_sequence().cmp(&self.last_sequence())) - <= Ordering::Equal + pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> { + // Checks the primary key + if self.primary_key() < other.primary_key() { + return Ok(()); + } + if self.primary_key() > other.primary_key() { + return Err(format!( + "primary key is not monotonic: {:?} > {:?}", + self.primary_key(), + other.primary_key() + )); + } + // Checks the timestamp. + if self.last_timestamp() < other.first_timestamp() { + return Ok(()); + } + if self.last_timestamp() > other.first_timestamp() { + return Err(format!( + "timestamps are not monotonic: {:?} > {:?}", + self.last_timestamp(), + other.first_timestamp() + )); + } + // Checks the sequence. + if self.last_sequence() >= other.first_sequence() { + return Ok(()); + } + Err(format!( + "sequences are not monotonic: {:?} < {:?}", + self.last_sequence(), + other.first_sequence() + )) } } @@ -552,31 +596,35 @@ impl BatchChecker { /// Returns true if the given batch is monotonic and behind /// the last batch. - pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> bool { - if !batch.check_monotonic() { - return false; - } + pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> { + batch.check_monotonic()?; if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) { if start > first { - return false; + return Err(format!( + "batch's first timestamp is before the start timestamp: {:?} > {:?}", + start, first + )); } } if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) { if end <= last { - return false; + return Err(format!( + "batch's last timestamp is after the end timestamp: {:?} <= {:?}", + end, last + )); } } // Checks the batch is behind the last batch. // Then Updates the last batch. - let is_behind = self + let res = self .last_batch .as_ref() .map(|last| last.check_next_batch(batch)) - .unwrap_or(true); + .unwrap_or(Ok(())); self.last_batch = Some(batch.clone()); - is_behind + res } /// Formats current batch and last batch for debug. @@ -615,15 +663,14 @@ impl BatchChecker { part_range: store_api::region_engine::PartitionRange, batch: &Batch, ) { - if !self.check_monotonic(batch) { - panic!( - "{}: batch is not sorted, region_id: {}, partition: {}, part_range: {:?}, {}", - scanner, - region_id, - partition, - part_range, - self.format_batch(batch), + if let Err(e) = self.check_monotonic(batch) { + let err_msg = format!( + "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}", + scanner, e, region_id, partition, part_range, ); + common_telemetry::error!("{err_msg}, {}", self.format_batch(batch)); + // Only print the number of row in the panic message. + panic!("{err_msg}, batch rows: {}", batch.num_rows()); } } } diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index 271f04c143..d8fc102da4 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -218,7 +218,7 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result< } in &migrations { let procedure_id = - migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await; + migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await; info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}"); procedure_ids.push(procedure_id); } @@ -228,7 +228,7 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result< info!("Waits for migration: {migration:?}"); let region_id = migration.region_id.as_u64(); wait_condition_fn( - Duration::from_secs(120), + Duration::from_secs(240), || { let greptime = ctx.greptime.clone(); let procedure_id = procedure_id.to_string();