diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs
index 0f4e794a73..fd38766954 100644
--- a/src/log-store/src/kafka/util/record.rs
+++ b/src/log-store/src/kafka/util/record.rs
@@ -169,7 +169,6 @@ fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry
     Entry::Naive(NaiveEntry {
         provider: Provider::Kafka(provider),
         region_id,
-        // TODO(weny): should be the offset in the topic
         entry_id: record.meta.entry_id,
         data: record.data,
     })
@@ -182,6 +181,7 @@ fn convert_to_multiple_entry(
 ) -> Entry {
     let mut headers = Vec::with_capacity(records.len());
     let mut parts = Vec::with_capacity(records.len());
+    let entry_id = records.last().map(|r| r.meta.entry_id).unwrap_or_default();
 
     for record in records {
         let header = match record.meta.tp {
@@ -197,8 +197,7 @@ fn convert_to_multiple_entry(
     Entry::MultiplePart(MultiplePartEntry {
         provider: Provider::Kafka(provider),
         region_id,
-        // TODO(weny): should be the offset in the topic
-        entry_id: 0,
+        entry_id,
         headers,
         parts,
     })
@@ -369,8 +368,7 @@ mod tests {
             Entry::MultiplePart(MultiplePartEntry {
                 provider: Provider::Kafka(provider.clone()),
                 region_id,
-                // TODO(weny): always be 0.
-                entry_id: 0,
+                entry_id: 1,
                 headers: vec![MultiplePartHeader::First],
                 parts: vec![vec![1; 100]],
             })
@@ -388,8 +386,7 @@ mod tests {
             Entry::MultiplePart(MultiplePartEntry {
                 provider: Provider::Kafka(provider.clone()),
                 region_id,
-                // TODO(weny): always be 0.
-                entry_id: 0,
+                entry_id: 1,
                 headers: vec![MultiplePartHeader::Last],
                 parts: vec![vec![1; 100]],
             })
@@ -411,8 +408,7 @@ mod tests {
             Entry::MultiplePart(MultiplePartEntry {
                 provider: Provider::Kafka(provider),
                 region_id,
-                // TODO(weny): always be 0.
-                entry_id: 0,
+                entry_id: 1,
                 headers: vec![MultiplePartHeader::Middle(0)],
                 parts: vec![vec![1; 100]],
             })
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index dd09e164a6..5685a4453a 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -657,13 +657,6 @@ pub enum Error {
         unexpected_entry_id: u64,
     },
 
-    #[snafu(display("Read the corrupted log entry, region_id: {}", region_id))]
-    CorruptedEntry {
-        region_id: RegionId,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display(
         "Failed to download file, region_id: {}, file_id: {}, file_type: {:?}",
         region_id,
@@ -1106,7 +1099,6 @@ impl ErrorExt for Error {
             | EncodeMemtable { .. }
             | CreateDir { .. }
             | ReadDataPart { .. }
-            | CorruptedEntry { .. }
             | BuildEntry { .. }
             | Metadata { .. }
             | MitoManifestInfo { .. } => StatusCode::Internal,
diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs
index 2f7d08e0b8..70d040c527 100644
--- a/src/mito2/src/wal/entry_distributor.rs
+++ b/src/mito2/src/wal/entry_distributor.rs
@@ -16,7 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_stream::stream;
-use common_telemetry::{debug, error};
+use common_telemetry::{debug, error, warn};
 use futures::future::join_all;
 use snafu::OptionExt;
 use store_api::logstore::entry::Entry;
@@ -133,11 +133,15 @@ impl WalEntryReader for WalEntryReceiver {
         }
 
         let stream = stream! {
-            let mut buffered_entry = None;
+            let mut buffered_entry: Option<Entry> = None;
             while let Some(next_entry) = entry_receiver.recv().await {
                 match buffered_entry.take() {
                     Some(entry) => {
-                        yield decode_raw_entry(entry);
+                        if entry.is_complete() {
+                            yield decode_raw_entry(entry);
+                        } else {
+                            warn!("Ignoring incomplete entry: {}", entry);
+                        }
                         buffered_entry = Some(next_entry);
                     },
                     None => {
@@ -149,6 +153,8 @@ impl WalEntryReader for WalEntryReceiver {
                 // Ignores tail corrupted data.
                 if entry.is_complete() {
                     yield decode_raw_entry(entry);
+                } else {
+                    warn!("Ignoring incomplete entry: {}", entry);
                 }
             }
         };
@@ -213,7 +219,6 @@ pub fn build_wal_entry_distributor_and_receivers(
 
 #[cfg(test)]
 mod tests {
-    use std::assert_matches::assert_matches;
 
     use api::v1::{Mutation, OpType, WalEntry};
     use futures::{stream, TryStreamExt};
@@ -385,6 +390,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_tail_corrupted_stream() {
+        common_telemetry::init_default_ut_logging();
         let mut entries = vec![];
         let region1 = RegionId::new(1, 1);
         let region1_expected_wal_entry = WalEntry {
@@ -484,6 +490,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_part_corrupted_stream() {
+        common_telemetry::init_default_ut_logging();
         let mut entries = vec![];
         let region1 = RegionId::new(1, 1);
         let region1_expected_wal_entry = WalEntry {
@@ -504,7 +511,7 @@ mod tests {
             3,
         ));
         entries.extend(vec![
-            // The corrupted data.
+            // The incomplete entry.
             Entry::MultiplePart(MultiplePartEntry {
                 provider: provider.clone(),
                 region_id: region2,
@@ -512,6 +519,7 @@ mod tests {
                 headers: vec![MultiplePartHeader::First],
                 parts: vec![vec![1; 100]],
             }),
+            // The incomplete entry.
             Entry::MultiplePart(MultiplePartEntry {
                 provider: provider.clone(),
                 region_id: region2,
@@ -545,14 +553,14 @@ mod tests {
             vec![(0, region1_expected_wal_entry)]
        );
 
-        assert_matches!(
+        assert_eq!(
             streams
                 .get_mut(1)
                 .unwrap()
                 .try_collect::<Vec<_>>()
                 .await
-                .unwrap_err(),
-            error::Error::CorruptedEntry { .. }
+                .unwrap(),
+            vec![]
         );
     }
 
diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs
index 6c8c30aedf..272aa25173 100644
--- a/src/mito2/src/wal/entry_reader.rs
+++ b/src/mito2/src/wal/entry_reader.rs
@@ -14,21 +14,25 @@
 
 use api::v1::WalEntry;
 use async_stream::stream;
+use common_telemetry::tracing::warn;
 use futures::StreamExt;
 use object_store::Buffer;
 use prost::Message;
-use snafu::{ensure, ResultExt};
+use snafu::ResultExt;
 use store_api::logstore::entry::Entry;
 use store_api::logstore::provider::Provider;
 
-use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result};
+use crate::error::{DecodeWalSnafu, Result};
 use crate::wal::raw_entry_reader::RawEntryReader;
 use crate::wal::{EntryId, WalEntryStream};
 
+/// Decodes the [Entry] into [WalEntry].
+///
+/// The caller must ensure the [Entry] is complete.
 pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> {
     let entry_id = raw_entry.entry_id();
     let region_id = raw_entry.region_id();
-    ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id });
+    debug_assert!(raw_entry.is_complete());
     let buffer = into_buffer(raw_entry);
     let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
     Ok((entry_id, wal_entry))
@@ -58,7 +62,7 @@ impl WalEntryReader for NoopEntryReader {
     }
 }
 
-/// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry].
+/// A Reader reads the [Entry] from [RawEntryReader] and decodes [Entry] into [WalEntry].
 pub struct LogStoreEntryReader<R> {
     reader: R,
 }
@@ -75,11 +79,15 @@ impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
         let mut stream = reader.read(ns, start_id)?;
 
         let stream = stream! {
-            let mut buffered_entry = None;
+            let mut buffered_entry: Option<Entry> = None;
             while let Some(next_entry) = stream.next().await {
                 match buffered_entry.take() {
                     Some(entry) => {
-                        yield decode_raw_entry(entry);
+                        if entry.is_complete() {
+                            yield decode_raw_entry(entry);
+                        } else {
+                            warn!("Ignoring incomplete entry: {}", entry);
+                        }
                         buffered_entry = Some(next_entry?);
                     },
                     None => {
@@ -91,6 +99,8 @@ impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
                 // Ignores tail corrupted data.
                 if entry.is_complete() {
                     yield decode_raw_entry(entry);
+                } else {
+                    warn!("Ignoring incomplete entry: {}", entry);
                 }
             }
         };
@@ -101,7 +111,6 @@
 
 #[cfg(test)]
 mod tests {
-    use std::assert_matches::assert_matches;
 
     use api::v1::{Mutation, OpType, WalEntry};
     use futures::TryStreamExt;
@@ -110,7 +119,6 @@
     use store_api::logstore::provider::Provider;
     use store_api::storage::RegionId;
 
-    use crate::error;
     use crate::test_util::wal_util::MockRawEntryStream;
     use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
 
@@ -141,7 +149,7 @@
                 headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
                 parts,
             }),
-            // The tail corrupted data.
+            // The tail incomplete entry.
             Entry::MultiplePart(MultiplePartEntry {
                 provider: provider.clone(),
                 region_id: RegionId::new(1, 1),
@@ -171,6 +179,7 @@
         let provider = Provider::kafka_provider("my_topic".to_string());
         let raw_entry_stream = MockRawEntryStream {
            entries: vec![
+                // The incomplete entry.
                 Entry::MultiplePart(MultiplePartEntry {
                     provider: provider.clone(),
                     region_id: RegionId::new(1, 1),
@@ -189,12 +198,12 @@
         };
 
         let mut reader = LogStoreEntryReader::new(raw_entry_stream);
-        let err = reader
+        let entries = reader
             .read(&provider, 0)
             .unwrap()
             .try_collect::<Vec<_>>()
             .await
-            .unwrap_err();
-        assert_matches!(err, error::Error::CorruptedEntry { .. });
+            .unwrap();
+        assert!(entries.is_empty());
     }
 }
diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs
index f4cfe18b3e..30ee4b3efe 100644
--- a/src/store-api/src/logstore/entry.rs
+++ b/src/store-api/src/logstore/entry.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt::{Display, Formatter};
 use std::mem::size_of;
 
 use crate::logstore::provider::Provider;
@@ -30,6 +31,15 @@ pub enum Entry {
     MultiplePart(MultiplePartEntry),
 }
 
+impl Display for Entry {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Entry::Naive(entry) => write!(f, "{}", entry),
+            Entry::MultiplePart(entry) => write!(f, "{}", entry),
+        }
+    }
+}
+
 impl Entry {
     /// Into [NaiveEntry] if it's type of [Entry::Naive].
     pub fn into_naive_entry(self) -> Option<NaiveEntry> {
@@ -56,6 +66,16 @@ pub struct NaiveEntry {
     pub data: Vec<u8>,
 }
 
+impl Display for NaiveEntry {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "NaiveEntry(provider={:?}, region_id={}, entry_id={})",
+            self.provider, self.region_id, self.entry_id,
+        )
+    }
+}
+
 impl NaiveEntry {
     /// Estimates the persisted size of the entry.
     fn estimated_size(&self) -> usize {
@@ -79,6 +99,19 @@ pub struct MultiplePartEntry {
     pub parts: Vec<Vec<u8>>,
 }
 
+impl Display for MultiplePartEntry {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "MultiplePartEntry(provider={:?}, region_id={}, entry_id={}, len={})",
+            self.provider,
+            self.region_id,
+            self.entry_id,
+            self.parts.len()
+        )
+    }
+}
+
 impl MultiplePartEntry {
     fn is_complete(&self) -> bool {
         self.headers.contains(&MultiplePartHeader::First)
diff --git a/src/store-api/src/logstore/provider.rs b/src/store-api/src/logstore/provider.rs
index 11ec580f79..1c62f7479a 100644
--- a/src/store-api/src/logstore/provider.rs
+++ b/src/store-api/src/logstore/provider.rs
@@ -69,10 +69,10 @@ impl Display for Provider {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match &self {
             Provider::RaftEngine(provider) => {
-                write!(f, "region: {}", RegionId::from_u64(provider.id))
+                write!(f, "RaftEngine(region={})", RegionId::from_u64(provider.id))
             }
-            Provider::Kafka(provider) => write!(f, "topic: {}", provider.topic),
-            Provider::Noop => write!(f, "noop"),
+            Provider::Kafka(provider) => write!(f, "Kafka(topic={})", provider.topic),
+            Provider::Noop => write!(f, "Noop"),
         }
     }
 }