mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
chore: provide more info in check batch message (#4906)
* chore: provide more info in check message * chore: set timeout to 240s --------- Co-authored-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -494,36 +494,80 @@ impl Batch {
|
||||
|
||||
/// Checks the batch is monotonic by timestamps.
|
||||
#[cfg(debug_assertions)]
|
||||
pub(crate) fn check_monotonic(&self) -> bool {
|
||||
pub(crate) fn check_monotonic(&self) -> Result<(), String> {
|
||||
use std::cmp::Ordering;
|
||||
if self.timestamps_native().is_none() {
|
||||
return true;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timestamps = self.timestamps_native().unwrap();
|
||||
let sequences = self.sequences.as_arrow().values();
|
||||
timestamps.windows(2).enumerate().all(|(i, window)| {
|
||||
for (i, window) in timestamps.windows(2).enumerate() {
|
||||
let current = window[0];
|
||||
let next = window[1];
|
||||
let current_sequence = sequences[i];
|
||||
let next_sequence = sequences[i + 1];
|
||||
if current == next {
|
||||
current_sequence >= next_sequence
|
||||
} else {
|
||||
current < next
|
||||
match current.cmp(&next) {
|
||||
Ordering::Less => {
|
||||
// The current timestamp is less than the next timestamp.
|
||||
continue;
|
||||
}
|
||||
Ordering::Equal => {
|
||||
// The current timestamp is equal to the next timestamp.
|
||||
if current_sequence < next_sequence {
|
||||
return Err(format!(
|
||||
"sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
|
||||
current, next, current_sequence, next_sequence, i
|
||||
));
|
||||
}
|
||||
}
|
||||
Ordering::Greater => {
|
||||
// The current timestamp is greater than the next timestamp.
|
||||
return Err(format!(
|
||||
"timestamps are not monotonic: {} > {}, index: {}",
|
||||
current, next, i
|
||||
));
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if the given batch is behind the current batch.
|
||||
/// Returns Ok if the given batch is behind the current batch.
|
||||
#[cfg(debug_assertions)]
|
||||
pub(crate) fn check_next_batch(&self, other: &Batch) -> bool {
|
||||
// Checks the primary key and then the timestamp.
|
||||
use std::cmp::Ordering;
|
||||
self.primary_key()
|
||||
.cmp(other.primary_key())
|
||||
.then_with(|| self.last_timestamp().cmp(&other.first_timestamp()))
|
||||
.then_with(|| other.first_sequence().cmp(&self.last_sequence()))
|
||||
<= Ordering::Equal
|
||||
pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
|
||||
// Checks the primary key
|
||||
if self.primary_key() < other.primary_key() {
|
||||
return Ok(());
|
||||
}
|
||||
if self.primary_key() > other.primary_key() {
|
||||
return Err(format!(
|
||||
"primary key is not monotonic: {:?} > {:?}",
|
||||
self.primary_key(),
|
||||
other.primary_key()
|
||||
));
|
||||
}
|
||||
// Checks the timestamp.
|
||||
if self.last_timestamp() < other.first_timestamp() {
|
||||
return Ok(());
|
||||
}
|
||||
if self.last_timestamp() > other.first_timestamp() {
|
||||
return Err(format!(
|
||||
"timestamps are not monotonic: {:?} > {:?}",
|
||||
self.last_timestamp(),
|
||||
other.first_timestamp()
|
||||
));
|
||||
}
|
||||
// Checks the sequence.
|
||||
if self.last_sequence() >= other.first_sequence() {
|
||||
return Ok(());
|
||||
}
|
||||
Err(format!(
|
||||
"sequences are not monotonic: {:?} < {:?}",
|
||||
self.last_sequence(),
|
||||
other.first_sequence()
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -552,31 +596,35 @@ impl BatchChecker {
|
||||
|
||||
/// Returns true if the given batch is monotonic and behind
|
||||
/// the last batch.
|
||||
pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> bool {
|
||||
if !batch.check_monotonic() {
|
||||
return false;
|
||||
}
|
||||
pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
|
||||
batch.check_monotonic()?;
|
||||
|
||||
if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
|
||||
if start > first {
|
||||
return false;
|
||||
return Err(format!(
|
||||
"batch's first timestamp is before the start timestamp: {:?} > {:?}",
|
||||
start, first
|
||||
));
|
||||
}
|
||||
}
|
||||
if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
|
||||
if end <= last {
|
||||
return false;
|
||||
return Err(format!(
|
||||
"batch's last timestamp is after the end timestamp: {:?} <= {:?}",
|
||||
end, last
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Checks the batch is behind the last batch.
|
||||
// Then Updates the last batch.
|
||||
let is_behind = self
|
||||
let res = self
|
||||
.last_batch
|
||||
.as_ref()
|
||||
.map(|last| last.check_next_batch(batch))
|
||||
.unwrap_or(true);
|
||||
.unwrap_or(Ok(()));
|
||||
self.last_batch = Some(batch.clone());
|
||||
is_behind
|
||||
res
|
||||
}
|
||||
|
||||
/// Formats current batch and last batch for debug.
|
||||
@@ -615,15 +663,14 @@ impl BatchChecker {
|
||||
part_range: store_api::region_engine::PartitionRange,
|
||||
batch: &Batch,
|
||||
) {
|
||||
if !self.check_monotonic(batch) {
|
||||
panic!(
|
||||
"{}: batch is not sorted, region_id: {}, partition: {}, part_range: {:?}, {}",
|
||||
scanner,
|
||||
region_id,
|
||||
partition,
|
||||
part_range,
|
||||
self.format_batch(batch),
|
||||
if let Err(e) = self.check_monotonic(batch) {
|
||||
let err_msg = format!(
|
||||
"{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
|
||||
scanner, e, region_id, partition, part_range,
|
||||
);
|
||||
common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
|
||||
// Only print the number of row in the panic message.
|
||||
panic!("{err_msg}, batch rows: {}", batch.num_rows());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,7 +218,7 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
|
||||
} in &migrations
|
||||
{
|
||||
let procedure_id =
|
||||
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
|
||||
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await;
|
||||
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
|
||||
procedure_ids.push(procedure_id);
|
||||
}
|
||||
@@ -228,7 +228,7 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
|
||||
info!("Waits for migration: {migration:?}");
|
||||
let region_id = migration.region_id.as_u64();
|
||||
wait_condition_fn(
|
||||
Duration::from_secs(120),
|
||||
Duration::from_secs(240),
|
||||
|| {
|
||||
let greptime = ctx.greptime.clone();
|
||||
let procedure_id = procedure_id.to_string();
|
||||
|
||||
Reference in New Issue
Block a user