From 1a06a7be88d4e18e563c8e29b78413d9b1f60e5b Mon Sep 17 00:00:00 2001 From: Jiachun Feng Date: Tue, 2 Aug 2022 17:52:00 +0800 Subject: [PATCH] feat: decode WAL entry (#123) * feat: decode wal entry * chore: todo message --- src/storage/src/codec.rs | 2 +- src/storage/src/error.rs | 10 +++- src/storage/src/region/writer.rs | 2 +- src/storage/src/wal.rs | 97 ++++++++++++++++++++++++++------ src/storage/src/write_batch.rs | 82 ++++++++++++++------------- 5 files changed, 135 insertions(+), 58 deletions(-) diff --git a/src/storage/src/codec.rs b/src/storage/src/codec.rs index 3a99b4a85e..691c69aa3d 100644 --- a/src/storage/src/codec.rs +++ b/src/storage/src/codec.rs @@ -15,5 +15,5 @@ pub trait Decoder { type Error: ErrorExt; /// Decodes a message from the bytes buffer. - fn decode(&self, src: &[u8]) -> Result, Self::Error>; + fn decode(&self, src: &[u8]) -> Result; } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index ae02e4119f..cba5e67fa7 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -203,6 +203,13 @@ pub enum Error { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("WAL data corrupted, name: {}, message: {}", name, message))] + WalDataCorrupted { + name: String, + message: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -227,7 +234,8 @@ impl ErrorExt for Error { | DecodeRegionMetaActionList { .. } | Readline { .. } | InvalidParquetSchema { .. } - | SequenceColumnNotFound { .. } => StatusCode::Unexpected, + | SequenceColumnNotFound { .. } + | WalDataCorrupted { .. } => StatusCode::Unexpected, FlushIo { .. } | InitBackend { .. } diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 773284dc79..1d98bd6ff1 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -216,7 +216,7 @@ impl WriterInner { let mut stream = writer_ctx.wal.read_from_wal(start_sequence).await?; - while let Some((_header, _write_batch)) = stream.try_next().await? { + while let Some((_seq_num, _header, _write_batch)) = stream.try_next().await? { // TODO(yingwen): [open_region] 1. Split write batch and insert into memtables. 2. Need to update // (recover) committed_sequence. } diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index e4184cffa7..01f1c5343c 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use common_error::prelude::BoxedError; use futures::{stream, Stream, TryStreamExt}; use prost::Message; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use store_api::{ logstore::{entry::Entry, namespace::Namespace, AppendResponse, LogStore}, storage::SequenceNumber, @@ -14,7 +14,10 @@ use crate::{ codec::{Decoder, Encoder}, error::{self, Error, Result}, proto::{self, PayloadType, WalHeader}, - write_batch::{codec::WriteBatchArrowEncoder, WriteBatch}, + write_batch::{ + codec::{WriteBatchArrowDecoder, WriteBatchArrowEncoder}, + WriteBatch, + }, }; #[derive(Debug)] @@ -23,8 +26,9 @@ pub struct Wal { store: Arc, } -pub type WriteBatchStream<'a> = - Pin> + Send + 'a>>; +pub type WriteBatchStream<'a> = Pin< + Box)>> + Send + 'a>, +>; // wal should be cheap to clone impl Clone for Wal { @@ -111,7 +115,7 @@ impl Wal { source: BoxedError::new(e), }) .and_then(|entries| async { - let iter = entries.into_iter().map(decode_entry); + let iter = entries.into_iter().map(|x| self.decode_entry(x)); Ok(stream::iter(iter)) }) @@ -121,24 +125,63 @@ impl Wal { } async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> { - let ns = self.namespace.clone(); let mut e = self.store.entry(bytes); e.set_id(seq); let res = self .store - .append(&ns, e) + .append(&self.namespace, e) .await .map_err(BoxedError::new) .context(error::WriteWalSnafu { name: self.name() })?; Ok((res.entry_id(), res.offset())) } -} -fn decode_entry(_entry: E) -> Result<(WalHeader, WriteBatch)> { - // TODO(yingwen): [open_region] Decode entry into write batch. - unimplemented!() + fn decode_entry( + &self, + entry: E, + ) -> Result<(SequenceNumber, WalHeader, Option)> { + let seq_num = entry.id(); + let input = entry.data(); + + let wal_header_decoder = WalHeaderDecoder {}; + let (data_pos, mut header) = wal_header_decoder.decode(input)?; + + ensure!( + data_pos <= input.len(), + error::WalDataCorruptedSnafu { + name: self.name(), + message: format!( + "Not enough input buffer, expected data position={}, actual buffer length={}", + data_pos, + input.len() + ), + } + ); + + match PayloadType::from_i32(header.payload_type) { + Some(PayloadType::None) => Ok((seq_num, header, None)), + Some(PayloadType::WriteBatchArrow) => { + let mutation_extras = std::mem::take(&mut header.mutation_extras); + let decoder = WriteBatchArrowDecoder::new(mutation_extras); + let write_batch = decoder + .decode(&input[data_pos..]) + .map_err(BoxedError::new) + .context(error::ReadWalSnafu { name: self.name() })?; + + Ok((seq_num, header, Some(write_batch))) + } + Some(PayloadType::WriteBatchProto) => { + todo!("protobuf decoder") + } + _ => error::WalDataCorruptedSnafu { + name: self.name(), + message: format!("invalid payload type={}", header.payload_type), + } + .fail(), + } + } } pub enum Payload<'a> { @@ -177,7 +220,7 @@ impl Decoder for WalHeaderDecoder { type Item = (usize, WalHeader); type Error = Error; - fn decode(&self, src: &[u8]) -> Result> { + fn decode(&self, src: &[u8]) -> Result<(usize, WalHeader)> { let mut data_pos = prost::decode_length_delimiter(src) .map_err(|err| err.into()) .context(error::DecodeWalHeaderSnafu)?; @@ -187,7 +230,7 @@ impl Decoder for WalHeaderDecoder { .map_err(|err| err.into()) .context(error::DecodeWalHeaderSnafu)?; - Ok(Some((data_pos, wal_header))) + Ok((data_pos, wal_header)) } } @@ -214,6 +257,30 @@ mod tests { assert_eq!(29, res.1); } + #[tokio::test] + pub async fn test_read_wal_only_header() -> Result<()> { + let (log_store, _tmp) = + test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await; + let wal = Wal::new("test_region", Arc::new(log_store)); + let header = WalHeader::with_last_manifest_version(111); + let (seq_num, _) = wal.write_to_wal(3, header, Payload::None).await?; + + assert_eq!(0, seq_num); + + let mut stream = wal.read_from_wal(seq_num).await?; + let mut data = vec![]; + while let Some((seq_num, header, write_batch)) = stream.try_next().await? { + data.push((seq_num, header, write_batch)); + } + + assert_eq!(1, data.len()); + assert_eq!(seq_num, data[0].0); + assert_eq!(111, data[0].1.last_manifest_version); + assert!(data[0].2.is_none()); + + Ok(()) + } + #[test] pub fn test_wal_header_codec() { let wal_header = WalHeader { @@ -233,9 +300,7 @@ mod tests { let decoder = WalHeaderDecoder {}; let res = decoder.decode(&buf).unwrap(); - assert!(res.is_some()); - - let data_pos = res.unwrap().0; + let data_pos = res.0; assert_eq!(buf.len() - 3, data_pos); } } diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 10ea62e38b..4bc6d32614 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -91,8 +91,8 @@ pub enum Error { #[snafu(display("Failed to decode, in stream waiting state"))] StreamWaiting, - #[snafu(display("Failed to decode, data corruption {}", message))] - DataCorruption { + #[snafu(display("Failed to decode, corrupted data {}", message))] + DataCorrupted { message: String, backtrace: Backtrace, }, @@ -422,7 +422,7 @@ pub mod codec { use store_api::storage::WriteRequest; use super::{ - DataCorruptionSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu, + DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu, Error as WriteBatchError, Mutation, ParseSchemaSnafu, Result, WriteBatch, }; use crate::{ @@ -510,7 +510,6 @@ pub mod codec { } impl WriteBatchArrowDecoder { - #[allow(dead_code)] pub fn new(mutation_extras: Vec) -> Self { Self { mutation_extras } } @@ -520,7 +519,7 @@ pub mod codec { type Item = WriteBatch; type Error = WriteBatchError; - fn decode(&self, src: &[u8]) -> Result> { + fn decode(&self, src: &[u8]) -> Result { let mut reader = Cursor::new(src); let metadata = read::read_stream_metadata(&mut reader).context(DecodeArrowSnafu)?; let mut reader = ArrowStreamReader::new(reader, metadata); @@ -539,7 +538,7 @@ pub mod codec { // check if exactly finished ensure!( reader.check_exactly_finished().context(DecodeArrowSnafu)?, - DataCorruptionSnafu { + DataCorruptedSnafu { message: "Impossible, the num of data chunks is different than expected." } ); @@ -571,8 +570,8 @@ pub mod codec { ensure!( chunks.len() == self.mutation_extras.len(), - DataCorruptionSnafu { - message: &format!( + DataCorruptedSnafu { + message: format!( "expected {} mutations, but got {}", self.mutation_extras.len(), chunks.len() @@ -592,36 +591,41 @@ pub mod codec { .mutation_extras .iter() .zip(chunks.iter()) - .map(|(ext, mtn)| match ext.mutation_type { - x if x == MutationType::Put as i32 => { - let valid_column_names = if ext.column_null_mask.is_empty() { - column_names.clone() - } else { - bit_vec::BitVec::from_bytes(&ext.column_null_mask) + .map( + |(ext, mtn)| match MutationType::from_i32(ext.mutation_type) { + Some(MutationType::Put) => { + let valid_column_names = if ext.column_null_mask.is_empty() { + column_names.clone() + } else { + bit_vec::BitVec::from_bytes(&ext.column_null_mask) + .iter() + .zip(column_names.iter()) + .filter(|(mask, _)| !*mask) + .map(|(_, column_name)| column_name.clone()) + .collect::>() + }; + + let mut put_data = PutData::with_num_columns(valid_column_names.len()); + + let res = valid_column_names .iter() - .zip(column_names.iter()) - .filter(|(mask, _)| !*mask) - .map(|(_, column_name)| column_name.clone()) - .collect::>() - }; + .zip(mtn) + .map(|(name, vector)| { + put_data.add_column_by_name(name, vector.clone()) + }) + .collect::>>(); - let mut put_data = PutData::with_num_columns(valid_column_names.len()); - - let res = valid_column_names - .iter() - .zip(mtn) - .map(|(name, vector)| put_data.add_column_by_name(name, vector.clone())) - .collect::>>(); - - res.map(|_| Mutation::Put(put_data)) - } - x if x == MutationType::Delete as i32 => { - todo!() - } - _ => { - unreachable!() - } - }) + res.map(|_| Mutation::Put(put_data)) + } + Some(MutationType::Delete) => { + todo!("delete mutation") + } + _ => DataCorruptedSnafu { + message: format!("Unexpceted mutation type: {}", ext.mutation_type), + } + .fail(), + }, + ) .collect::>>()?; let mut write_batch = WriteBatch::new(Arc::new(schema)); @@ -632,7 +636,7 @@ pub mod codec { Mutation::Put(put_data) => write_batch.put(put_data), })?; - Ok(Some(write_batch)) + Ok(write_batch) } } } @@ -893,7 +897,7 @@ mod tests { let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch)); let result = decoder.decode(&dst); - let batch2 = result?.unwrap(); + let batch2 = result?; assert_eq!(batch.num_rows, batch2.num_rows); Ok(()) @@ -921,7 +925,7 @@ mod tests { let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch)); let result = decoder.decode(&dst); - let batch2 = result?.unwrap(); + let batch2 = result?; assert_eq!(batch.num_rows, batch2.num_rows); Ok(())