feat: decode WAL entry (#123)

* feat: decode wal entry

* chore: todo message
This commit is contained in:
Jiachun Feng
2022-08-02 17:52:00 +08:00
committed by GitHub
parent cd42f308a8
commit 1a06a7be88
5 changed files with 135 additions and 58 deletions

View File

@@ -15,5 +15,5 @@ pub trait Decoder {
type Error: ErrorExt;
/// Decodes a message from the bytes buffer.
fn decode(&self, src: &[u8]) -> Result<Option<Self::Item>, Self::Error>;
fn decode(&self, src: &[u8]) -> Result<Self::Item, Self::Error>;
}

View File

@@ -203,6 +203,13 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("WAL data corrupted, name: {}, message: {}", name, message))]
WalDataCorrupted {
name: String,
message: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -227,7 +234,8 @@ impl ErrorExt for Error {
| DecodeRegionMetaActionList { .. }
| Readline { .. }
| InvalidParquetSchema { .. }
| SequenceColumnNotFound { .. } => StatusCode::Unexpected,
| SequenceColumnNotFound { .. }
| WalDataCorrupted { .. } => StatusCode::Unexpected,
FlushIo { .. }
| InitBackend { .. }

View File

@@ -216,7 +216,7 @@ impl WriterInner {
let mut stream = writer_ctx.wal.read_from_wal(start_sequence).await?;
while let Some((_header, _write_batch)) = stream.try_next().await? {
while let Some((_seq_num, _header, _write_batch)) = stream.try_next().await? {
// TODO(yingwen): [open_region] 1. Split write batch and insert into memtables. 2. Need to update
// (recover) committed_sequence.
}

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
use common_error::prelude::BoxedError;
use futures::{stream, Stream, TryStreamExt};
use prost::Message;
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::{
logstore::{entry::Entry, namespace::Namespace, AppendResponse, LogStore},
storage::SequenceNumber,
@@ -14,7 +14,10 @@ use crate::{
codec::{Decoder, Encoder},
error::{self, Error, Result},
proto::{self, PayloadType, WalHeader},
write_batch::{codec::WriteBatchArrowEncoder, WriteBatch},
write_batch::{
codec::{WriteBatchArrowDecoder, WriteBatchArrowEncoder},
WriteBatch,
},
};
#[derive(Debug)]
@@ -23,8 +26,9 @@ pub struct Wal<S: LogStore> {
store: Arc<S>,
}
pub type WriteBatchStream<'a> =
Pin<Box<dyn Stream<Item = Result<(WalHeader, WriteBatch)>> + Send + 'a>>;
pub type WriteBatchStream<'a> = Pin<
Box<dyn Stream<Item = Result<(SequenceNumber, WalHeader, Option<WriteBatch>)>> + Send + 'a>,
>;
// wal should be cheap to clone
impl<S: LogStore> Clone for Wal<S> {
@@ -111,7 +115,7 @@ impl<S: LogStore> Wal<S> {
source: BoxedError::new(e),
})
.and_then(|entries| async {
let iter = entries.into_iter().map(decode_entry);
let iter = entries.into_iter().map(|x| self.decode_entry(x));
Ok(stream::iter(iter))
})
@@ -121,24 +125,63 @@ impl<S: LogStore> Wal<S> {
}
/// Appends `bytes` to the log store as a single entry whose id is set to `seq`.
///
/// Returns the `(entry_id, offset)` pair reported by the store's append response.
///
/// # Errors
///
/// A failed append is boxed and reported as a `WriteWal` error tagged with
/// this WAL's name.
async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> {
let ns = self.namespace.clone();
let mut e = self.store.entry(bytes);
e.set_id(seq);
let res = self
.store
.append(&ns, e)
.append(&self.namespace, e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
Ok((res.entry_id(), res.offset()))
}
}
fn decode_entry<E: Entry>(_entry: E) -> Result<(WalHeader, WriteBatch)> {
// TODO(yingwen): [open_region] Decode entry into write batch.
unimplemented!()
fn decode_entry<E: Entry>(
&self,
entry: E,
) -> Result<(SequenceNumber, WalHeader, Option<WriteBatch>)> {
let seq_num = entry.id();
let input = entry.data();
let wal_header_decoder = WalHeaderDecoder {};
let (data_pos, mut header) = wal_header_decoder.decode(input)?;
ensure!(
data_pos <= input.len(),
error::WalDataCorruptedSnafu {
name: self.name(),
message: format!(
"Not enough input buffer, expected data position={}, actual buffer length={}",
data_pos,
input.len()
),
}
);
match PayloadType::from_i32(header.payload_type) {
Some(PayloadType::None) => Ok((seq_num, header, None)),
Some(PayloadType::WriteBatchArrow) => {
let mutation_extras = std::mem::take(&mut header.mutation_extras);
let decoder = WriteBatchArrowDecoder::new(mutation_extras);
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { name: self.name() })?;
Ok((seq_num, header, Some(write_batch)))
}
Some(PayloadType::WriteBatchProto) => {
todo!("protobuf decoder")
}
_ => error::WalDataCorruptedSnafu {
name: self.name(),
message: format!("invalid payload type={}", header.payload_type),
}
.fail(),
}
}
}
pub enum Payload<'a> {
@@ -177,7 +220,7 @@ impl Decoder for WalHeaderDecoder {
type Item = (usize, WalHeader);
type Error = Error;
fn decode(&self, src: &[u8]) -> Result<Option<(usize, WalHeader)>> {
fn decode(&self, src: &[u8]) -> Result<(usize, WalHeader)> {
let mut data_pos = prost::decode_length_delimiter(src)
.map_err(|err| err.into())
.context(error::DecodeWalHeaderSnafu)?;
@@ -187,7 +230,7 @@ impl Decoder for WalHeaderDecoder {
.map_err(|err| err.into())
.context(error::DecodeWalHeaderSnafu)?;
Ok(Some((data_pos, wal_header)))
Ok((data_pos, wal_header))
}
}
@@ -214,6 +257,30 @@ mod tests {
assert_eq!(29, res.1);
}
#[tokio::test]
pub async fn test_read_wal_only_header() -> Result<()> {
let (log_store, _tmp) =
test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
let wal = Wal::new("test_region", Arc::new(log_store));
let header = WalHeader::with_last_manifest_version(111);
let (seq_num, _) = wal.write_to_wal(3, header, Payload::None).await?;
assert_eq!(0, seq_num);
let mut stream = wal.read_from_wal(seq_num).await?;
let mut data = vec![];
while let Some((seq_num, header, write_batch)) = stream.try_next().await? {
data.push((seq_num, header, write_batch));
}
assert_eq!(1, data.len());
assert_eq!(seq_num, data[0].0);
assert_eq!(111, data[0].1.last_manifest_version);
assert!(data[0].2.is_none());
Ok(())
}
#[test]
pub fn test_wal_header_codec() {
let wal_header = WalHeader {
@@ -233,9 +300,7 @@ mod tests {
let decoder = WalHeaderDecoder {};
let res = decoder.decode(&buf).unwrap();
assert!(res.is_some());
let data_pos = res.unwrap().0;
let data_pos = res.0;
assert_eq!(buf.len() - 3, data_pos);
}
}

View File

@@ -91,8 +91,8 @@ pub enum Error {
#[snafu(display("Failed to decode, in stream waiting state"))]
StreamWaiting,
#[snafu(display("Failed to decode, data corruption {}", message))]
DataCorruption {
#[snafu(display("Failed to decode, corrupted data {}", message))]
DataCorrupted {
message: String,
backtrace: Backtrace,
},
@@ -422,7 +422,7 @@ pub mod codec {
use store_api::storage::WriteRequest;
use super::{
DataCorruptionSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
Error as WriteBatchError, Mutation, ParseSchemaSnafu, Result, WriteBatch,
};
use crate::{
@@ -510,7 +510,6 @@ pub mod codec {
}
impl WriteBatchArrowDecoder {
#[allow(dead_code)]
/// Creates a decoder that keeps the given `mutation_extras` for use while
/// decoding.
///
/// During `decode`, the number of decoded chunks is checked against
/// `mutation_extras.len()`, and each extra supplies the `mutation_type` and
/// `column_null_mask` for the chunk it is paired with.
pub fn new(mutation_extras: Vec<MutationExtra>) -> Self {
Self { mutation_extras }
}
@@ -520,7 +519,7 @@ pub mod codec {
type Item = WriteBatch;
type Error = WriteBatchError;
fn decode(&self, src: &[u8]) -> Result<Option<WriteBatch>> {
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let mut reader = Cursor::new(src);
let metadata = read::read_stream_metadata(&mut reader).context(DecodeArrowSnafu)?;
let mut reader = ArrowStreamReader::new(reader, metadata);
@@ -539,7 +538,7 @@ pub mod codec {
// check if exactly finished
ensure!(
reader.check_exactly_finished().context(DecodeArrowSnafu)?,
DataCorruptionSnafu {
DataCorruptedSnafu {
message: "Impossible, the num of data chunks is different than expected."
}
);
@@ -571,8 +570,8 @@ pub mod codec {
ensure!(
chunks.len() == self.mutation_extras.len(),
DataCorruptionSnafu {
message: &format!(
DataCorruptedSnafu {
message: format!(
"expected {} mutations, but got {}",
self.mutation_extras.len(),
chunks.len()
@@ -592,36 +591,41 @@ pub mod codec {
.mutation_extras
.iter()
.zip(chunks.iter())
.map(|(ext, mtn)| match ext.mutation_type {
x if x == MutationType::Put as i32 => {
let valid_column_names = if ext.column_null_mask.is_empty() {
column_names.clone()
} else {
bit_vec::BitVec::from_bytes(&ext.column_null_mask)
.map(
|(ext, mtn)| match MutationType::from_i32(ext.mutation_type) {
Some(MutationType::Put) => {
let valid_column_names = if ext.column_null_mask.is_empty() {
column_names.clone()
} else {
bit_vec::BitVec::from_bytes(&ext.column_null_mask)
.iter()
.zip(column_names.iter())
.filter(|(mask, _)| !*mask)
.map(|(_, column_name)| column_name.clone())
.collect::<Vec<_>>()
};
let mut put_data = PutData::with_num_columns(valid_column_names.len());
let res = valid_column_names
.iter()
.zip(column_names.iter())
.filter(|(mask, _)| !*mask)
.map(|(_, column_name)| column_name.clone())
.collect::<Vec<_>>()
};
.zip(mtn)
.map(|(name, vector)| {
put_data.add_column_by_name(name, vector.clone())
})
.collect::<Result<Vec<_>>>();
let mut put_data = PutData::with_num_columns(valid_column_names.len());
let res = valid_column_names
.iter()
.zip(mtn)
.map(|(name, vector)| put_data.add_column_by_name(name, vector.clone()))
.collect::<Result<Vec<_>>>();
res.map(|_| Mutation::Put(put_data))
}
x if x == MutationType::Delete as i32 => {
todo!()
}
_ => {
unreachable!()
}
})
res.map(|_| Mutation::Put(put_data))
}
Some(MutationType::Delete) => {
todo!("delete mutation")
}
_ => DataCorruptedSnafu {
message: format!("Unexpceted mutation type: {}", ext.mutation_type),
}
.fail(),
},
)
.collect::<Result<Vec<_>>>()?;
let mut write_batch = WriteBatch::new(Arc::new(schema));
@@ -632,7 +636,7 @@ pub mod codec {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;
Ok(Some(write_batch))
Ok(write_batch)
}
}
}
@@ -893,7 +897,7 @@ mod tests {
let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch));
let result = decoder.decode(&dst);
let batch2 = result?.unwrap();
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
@@ -921,7 +925,7 @@ mod tests {
let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch));
let result = decoder.decode(&dst);
let batch2 = result?.unwrap();
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())