fix: ignore incomplete WAL entries during read (#6251)

* fix: ignore incomplete entry

* fix: fix unit tests
Weny Xu
2025-06-04 19:16:42 +08:00
committed by GitHub
parent 7afb77fd35
commit 80c5af0ecf
6 changed files with 78 additions and 40 deletions
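For orientation before the per-file diffs: the change makes both WAL entry readers skip entries that were only partially persisted (for example, a multi-part entry whose trailing parts never made it to the log) instead of failing the whole read with a `CorruptedEntry` error. Below is a minimal, self-contained sketch of that behavior; the types are simplified stand-ins, not the crate's real `Entry`/`WalEntry`.

```rust
// Simplified stand-ins; the real readers work on `store_api::logstore::entry::Entry`
// over an async stream, but the skip-and-warn behavior is the same idea.
#[derive(Debug)]
struct Entry {
    entry_id: u64,
    complete: bool,
    data: Vec<u8>,
}

impl Entry {
    fn is_complete(&self) -> bool {
        self.complete
    }
}

/// Decodes only complete entries; incomplete ones are logged and skipped
/// rather than turned into a hard error.
fn decode_entries(entries: Vec<Entry>) -> Vec<(u64, Vec<u8>)> {
    let mut decoded = Vec::new();
    for entry in entries {
        if entry.is_complete() {
            decoded.push((entry.entry_id, entry.data));
        } else {
            eprintln!("Ignoring incomplete entry: {:?}", entry);
        }
    }
    decoded
}

fn main() {
    let entries = vec![
        Entry { entry_id: 1, complete: true, data: vec![1, 2, 3] },
        // Tail entry that was cut off mid-write.
        Entry { entry_id: 2, complete: false, data: vec![0; 4] },
    ];
    assert_eq!(decode_entries(entries).len(), 1);
}
```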


@@ -169,7 +169,6 @@ fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry
Entry::Naive(NaiveEntry {
provider: Provider::Kafka(provider),
region_id,
// TODO(weny): should be the offset in the topic
entry_id: record.meta.entry_id,
data: record.data,
})
@@ -182,6 +181,7 @@ fn convert_to_multiple_entry(
) -> Entry {
let mut headers = Vec::with_capacity(records.len());
let mut parts = Vec::with_capacity(records.len());
let entry_id = records.last().map(|r| r.meta.entry_id).unwrap_or_default();
for record in records {
let header = match record.meta.tp {
@@ -197,8 +197,7 @@ fn convert_to_multiple_entry(
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider),
region_id,
// TODO(weny): should be the offset in the topic
entry_id: 0,
entry_id,
headers,
parts,
})
@@ -369,8 +368,7 @@ mod tests {
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider.clone()),
region_id,
// TODO(weny): always be 0.
entry_id: 0,
entry_id: 1,
headers: vec![MultiplePartHeader::First],
parts: vec![vec![1; 100]],
})
@@ -388,8 +386,7 @@ mod tests {
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider.clone()),
region_id,
// TODO(weny): always be 0.
entry_id: 0,
entry_id: 1,
headers: vec![MultiplePartHeader::Last],
parts: vec![vec![1; 100]],
})
@@ -411,8 +408,7 @@ mod tests {
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider),
region_id,
// TODO(weny): always be 0.
entry_id: 0,
entry_id: 1,
headers: vec![MultiplePartHeader::Middle(0)],
parts: vec![vec![1; 100]],
})
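The other change in this file gives multi-part entries a real entry id: instead of the hard-coded `0`, the id of the last record in the group is used, falling back to the default when the group is empty. A small sketch of that derivation, with `Record` as a hypothetical stand-in for the Kafka record type:

```rust
// Hypothetical stand-in for the Kafka record; only the field used here is modeled.
struct Record {
    entry_id: u64,
}

/// Mirrors `records.last().map(|r| r.meta.entry_id).unwrap_or_default()` from the diff.
fn multiple_part_entry_id(records: &[Record]) -> u64 {
    records.last().map(|r| r.entry_id).unwrap_or_default()
}

fn main() {
    let records = [Record { entry_id: 7 }, Record { entry_id: 8 }, Record { entry_id: 9 }];
    assert_eq!(multiple_part_entry_id(&records), 9);
    assert_eq!(multiple_part_entry_id(&[]), 0); // empty group falls back to the default id
}
```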


@@ -657,13 +657,6 @@ pub enum Error {
unexpected_entry_id: u64,
},
#[snafu(display("Read the corrupted log entry, region_id: {}", region_id))]
CorruptedEntry {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to download file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
@@ -1106,7 +1099,6 @@ impl ErrorExt for Error {
| EncodeMemtable { .. }
| CreateDir { .. }
| ReadDataPart { .. }
| CorruptedEntry { .. }
| BuildEntry { .. }
| Metadata { .. }
| MitoManifestInfo { .. } => StatusCode::Internal,


@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_stream::stream;
use common_telemetry::{debug, error};
use common_telemetry::{debug, error, warn};
use futures::future::join_all;
use snafu::OptionExt;
use store_api::logstore::entry::Entry;
@@ -133,11 +133,15 @@ impl WalEntryReader for WalEntryReceiver {
}
let stream = stream! {
let mut buffered_entry = None;
let mut buffered_entry: Option<Entry> = None;
while let Some(next_entry) = entry_receiver.recv().await {
match buffered_entry.take() {
Some(entry) => {
yield decode_raw_entry(entry);
if entry.is_complete() {
yield decode_raw_entry(entry);
} else {
warn!("Ignoring incomplete entry: {}", entry);
}
buffered_entry = Some(next_entry);
},
None => {
@@ -149,6 +153,8 @@ impl WalEntryReader for WalEntryReceiver {
// Ignores tail corrupted data.
if entry.is_complete() {
yield decode_raw_entry(entry);
} else {
warn!("Ignoring incomplete entry: {}", entry);
}
}
};
@@ -213,7 +219,6 @@ pub fn build_wal_entry_distributor_and_receivers(
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use api::v1::{Mutation, OpType, WalEntry};
use futures::{stream, TryStreamExt};
@@ -385,6 +390,7 @@ mod tests {
#[tokio::test]
async fn test_tail_corrupted_stream() {
common_telemetry::init_default_ut_logging();
let mut entries = vec![];
let region1 = RegionId::new(1, 1);
let region1_expected_wal_entry = WalEntry {
@@ -484,6 +490,7 @@ mod tests {
#[tokio::test]
async fn test_part_corrupted_stream() {
common_telemetry::init_default_ut_logging();
let mut entries = vec![];
let region1 = RegionId::new(1, 1);
let region1_expected_wal_entry = WalEntry {
@@ -504,7 +511,7 @@ mod tests {
3,
));
entries.extend(vec![
// The corrupted data.
// The incomplete entry.
Entry::MultiplePart(MultiplePartEntry {
provider: provider.clone(),
region_id: region2,
@@ -512,6 +519,7 @@ mod tests {
headers: vec![MultiplePartHeader::First],
parts: vec![vec![1; 100]],
}),
// The incomplete entry.
Entry::MultiplePart(MultiplePartEntry {
provider: provider.clone(),
region_id: region2,
@@ -545,14 +553,14 @@ mod tests {
vec![(0, region1_expected_wal_entry)]
);
assert_matches!(
assert_eq!(
streams
.get_mut(1)
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap_err(),
error::Error::CorruptedEntry { .. }
.unwrap(),
vec![]
);
}
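The stream above holds one entry back in `buffered_entry` so that the final, possibly truncated entry can be handled after the channel closes; previously only that tail entry was checked for completeness, and now every entry is. The updated test reflects the new behavior: a stream containing only incomplete entries now collects to an empty `Vec` instead of a `CorruptedEntry` error. A synchronous sketch of the hold-one-back shape, with a predicate standing in for `is_complete`:

```rust
/// Hold-one-back buffering over an iterator: each buffered item is emitted only
/// if `is_complete` says so; the tail item gets the same treatment after the
/// input ends. This mirrors the shape of the async `stream!` blocks in the diff.
fn emit_complete<T>(
    items: impl IntoIterator<Item = T>,
    is_complete: impl Fn(&T) -> bool,
    mut emit: impl FnMut(T),
) {
    let mut buffered: Option<T> = None;
    for next in items {
        if let Some(item) = buffered.take() {
            if is_complete(&item) {
                emit(item);
            } // else: skipped; the real code logs a warning here
        }
        buffered = Some(next);
    }
    if let Some(item) = buffered {
        // Ignores tail-corrupted data, same as before; the warning is new.
        if is_complete(&item) {
            emit(item);
        }
    }
}

fn main() {
    let mut seen = Vec::new();
    // (id, complete) pairs standing in for WAL entries.
    emit_complete(
        [(1, true), (2, false), (3, true)],
        |&(_, complete)| complete,
        |(id, _)| seen.push(id),
    );
    assert_eq!(seen, vec![1, 3]);
}
```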


@@ -14,21 +14,25 @@
use api::v1::WalEntry;
use async_stream::stream;
use common_telemetry::tracing::warn;
use futures::StreamExt;
use object_store::Buffer;
use prost::Message;
use snafu::{ensure, ResultExt};
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result};
use crate::error::{DecodeWalSnafu, Result};
use crate::wal::raw_entry_reader::RawEntryReader;
use crate::wal::{EntryId, WalEntryStream};
/// Decodes the [Entry] into [WalEntry].
///
/// The caller must ensure the [Entry] is complete.
pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> {
let entry_id = raw_entry.entry_id();
let region_id = raw_entry.region_id();
ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id });
debug_assert!(raw_entry.is_complete());
let buffer = into_buffer(raw_entry);
let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
Ok((entry_id, wal_entry))
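With the completeness check moved into the callers, `decode_raw_entry` no longer reports a `CorruptedEntry` error; it documents the precondition and keeps only a debug-build assertion. A tiny sketch of that contract shift, using stand-in types rather than the real function:

```rust
/// Stand-in for a raw WAL entry.
struct Entry {
    complete: bool,
    data: Vec<u8>,
}

/// The caller must ensure the entry is complete; in debug builds an incomplete
/// entry panics here, while in release builds the check compiles away.
fn decode_raw_entry(entry: Entry) -> Vec<u8> {
    debug_assert!(entry.complete);
    entry.data // the real code decodes protobuf here and can still fail with DecodeWal
}

fn main() {
    let entry = Entry { complete: true, data: vec![42] };
    assert_eq!(decode_raw_entry(entry), vec![42]);
}
```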
@@ -58,7 +62,7 @@ impl WalEntryReader for NoopEntryReader {
}
}
/// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry].
/// A Reader reads the [Entry] from [RawEntryReader] and decodes [Entry] into [WalEntry].
pub struct LogStoreEntryReader<R> {
reader: R,
}
@@ -75,11 +79,15 @@ impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
let mut stream = reader.read(ns, start_id)?;
let stream = stream! {
let mut buffered_entry = None;
let mut buffered_entry: Option<Entry> = None;
while let Some(next_entry) = stream.next().await {
match buffered_entry.take() {
Some(entry) => {
yield decode_raw_entry(entry);
if entry.is_complete() {
yield decode_raw_entry(entry);
} else {
warn!("Ignoring incomplete entry: {}", entry);
}
buffered_entry = Some(next_entry?);
},
None => {
@@ -91,6 +99,8 @@ impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
// Ignores tail corrupted data.
if entry.is_complete() {
yield decode_raw_entry(entry);
} else {
warn!("Ignoring incomplete entry: {}", entry);
}
}
};
@@ -101,7 +111,6 @@ impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use api::v1::{Mutation, OpType, WalEntry};
use futures::TryStreamExt;
@@ -110,7 +119,6 @@ mod tests {
use store_api::logstore::provider::Provider;
use store_api::storage::RegionId;
use crate::error;
use crate::test_util::wal_util::MockRawEntryStream;
use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
@@ -141,7 +149,7 @@ mod tests {
headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
parts,
}),
// The tail corrupted data.
// The tail incomplete entry.
Entry::MultiplePart(MultiplePartEntry {
provider: provider.clone(),
region_id: RegionId::new(1, 1),
@@ -171,6 +179,7 @@ mod tests {
let provider = Provider::kafka_provider("my_topic".to_string());
let raw_entry_stream = MockRawEntryStream {
entries: vec![
// The incomplete entry.
Entry::MultiplePart(MultiplePartEntry {
provider: provider.clone(),
region_id: RegionId::new(1, 1),
@@ -189,12 +198,12 @@ mod tests {
};
let mut reader = LogStoreEntryReader::new(raw_entry_stream);
let err = reader
let entries = reader
.read(&provider, 0)
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap_err();
assert_matches!(err, error::Error::CorruptedEntry { .. });
.unwrap();
assert!(entries.is_empty());
}
}


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::mem::size_of;
use crate::logstore::provider::Provider;
@@ -30,6 +31,15 @@ pub enum Entry {
MultiplePart(MultiplePartEntry),
}
impl Display for Entry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Entry::Naive(entry) => write!(f, "{}", entry),
Entry::MultiplePart(entry) => write!(f, "{}", entry),
}
}
}
impl Entry {
/// Into [NaiveEntry] if it's type of [Entry::Naive].
pub fn into_naive_entry(self) -> Option<NaiveEntry> {
@@ -56,6 +66,16 @@ pub struct NaiveEntry {
pub data: Vec<u8>,
}
impl Display for NaiveEntry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"NaiveEntry(provider={:?}, region_id={}, entry_id={})",
self.provider, self.region_id, self.entry_id,
)
}
}
impl NaiveEntry {
/// Estimates the persisted size of the entry.
fn estimated_size(&self) -> usize {
@@ -79,6 +99,19 @@ pub struct MultiplePartEntry {
pub parts: Vec<Vec<u8>>,
}
impl Display for MultiplePartEntry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"MultiplePartEntry(provider={:?}, region_id={}, entry_id={}, len={})",
self.provider,
self.region_id,
self.entry_id,
self.parts.len()
)
}
}
impl MultiplePartEntry {
fn is_complete(&self) -> bool {
self.headers.contains(&MultiplePartHeader::First)
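The diff cuts off after the `First` header test in `is_complete`. Based on the tests above, where an entry carrying only a `First` header is labeled incomplete, a reasonable reading is that completeness requires both a `First` and a `Last` header; the sketch below encodes that assumption and is not the crate's actual implementation.

```rust
// Hypothetical mirror of the completeness check, assuming both First and Last
// headers must be present for a multi-part entry to be complete.
#[allow(dead_code)]
#[derive(PartialEq)]
enum MultiplePartHeader {
    First,
    Middle(usize),
    Last,
}

struct MultiplePartEntry {
    headers: Vec<MultiplePartHeader>,
}

impl MultiplePartEntry {
    fn is_complete(&self) -> bool {
        self.headers.contains(&MultiplePartHeader::First)
            && self.headers.contains(&MultiplePartHeader::Last)
    }
}

fn main() {
    let only_first = MultiplePartEntry { headers: vec![MultiplePartHeader::First] };
    let full = MultiplePartEntry {
        headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
    };
    assert!(!only_first.is_complete());
    assert!(full.is_complete());
}
```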


@@ -69,10 +69,10 @@ impl Display for Provider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
Provider::RaftEngine(provider) => {
write!(f, "region: {}", RegionId::from_u64(provider.id))
write!(f, "RaftEngine(region={})", RegionId::from_u64(provider.id))
}
Provider::Kafka(provider) => write!(f, "topic: {}", provider.topic),
Provider::Noop => write!(f, "noop"),
Provider::Kafka(provider) => write!(f, "Kafka(topic={})", provider.topic),
Provider::Noop => write!(f, "Noop"),
}
}
}
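For reference, a hypothetical before/after of the reworded `Display` output, using a stand-in type with the same format strings; `my_topic` is just an example topic name taken from the tests.

```rust
// Stand-in reproducing the new format strings so the sketch is runnable; the
// real type is `store_api::logstore::provider::Provider`.
use std::fmt::{self, Display, Formatter};

enum Provider {
    Kafka { topic: String },
    Noop,
}

impl Display for Provider {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Provider::Kafka { topic } => write!(f, "Kafka(topic={})", topic),
            Provider::Noop => write!(f, "Noop"),
        }
    }
}

fn main() {
    let provider = Provider::Kafka { topic: "my_topic".to_string() };
    // Before this change the same provider rendered as "topic: my_topic".
    assert_eq!(provider.to_string(), "Kafka(topic=my_topic)");
    assert_eq!(Provider::Noop.to_string(), "Noop");
}
```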