feat: update log

This commit is contained in:
evenyag
2024-11-05 17:47:32 +08:00
parent 8bf795d88c
commit f44862aaac
2 changed files with 19 additions and 17 deletions

View File

@@ -501,12 +501,12 @@ impl Batch {
}
let timestamps = self.timestamps_native().unwrap();
let sequences = self.sequences.as_arrow().values();
// let sequences = self.sequences.as_arrow().values();
for (i, window) in timestamps.windows(2).enumerate() {
let current = window[0];
let next = window[1];
let current_sequence = sequences[i];
let next_sequence = sequences[i + 1];
// let current_sequence = sequences[i];
// let next_sequence = sequences[i + 1];
match current.cmp(&next) {
Ordering::Less => {
// The current timestamp is less than the next timestamp.
@@ -514,12 +514,12 @@ impl Batch {
}
Ordering::Equal => {
// The current timestamp is equal to the next timestamp.
if current_sequence < next_sequence {
return Err(format!(
"sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
current, next, current_sequence, next_sequence, i
));
}
// if current_sequence < next_sequence {
// return Err(format!(
// "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
// current, next, current_sequence, next_sequence, i
// ));
// }
}
Ordering::Greater => {
// The current timestamp is greater than the next timestamp.
@@ -560,14 +560,15 @@ impl Batch {
));
}
// Checks the sequence.
if self.last_sequence() >= other.first_sequence() {
return Ok(());
}
Err(format!(
"sequences are not monotonic: {:?} < {:?}",
self.last_sequence(),
other.first_sequence()
))
Ok(())
// if self.last_sequence() >= other.first_sequence() {
// return Ok(());
// }
// Err(format!(
// "sequences are not monotonic: {:?} < {:?}",
// self.last_sequence(),
// other.first_sequence()
// ))
}
}

View File

@@ -517,6 +517,7 @@ impl PartSortStream {
}
self.buffer.push(batch);
// keep polling until boundary(a empty RecordBatch) is reached
continue;
}