feat: Implement the Buf to avoid extra memory allocation (#4585)

* feat: Implement the Buf to avoid extra memory allocation

* fmt toml

* fmt code

* mv entry.into_buffer to raw_entry_buffer

* less reuse opendal

* remove todo #4065

* Update src/mito2/src/wal/entry_reader.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fmt code

---------

Co-authored-by: ozewr <l19ht@google.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
ozewr
2024-08-19 20:11:08 +08:00
committed by GitHub
parent 8de11a0e34
commit 30af78700f
2 changed files with 13 additions and 5 deletions

View File

@@ -15,6 +15,7 @@
use api::v1::WalEntry;
use async_stream::stream;
use futures::StreamExt;
use object_store::Buffer;
use prost::Message;
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Entry;
@@ -28,13 +29,20 @@ pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)>
let entry_id = raw_entry.entry_id();
let region_id = raw_entry.region_id();
ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id });
// TODO(weny): implement the [Buf] for return value, avoid extra memory allocation.
let bytes = raw_entry.into_bytes();
let wal_entry = WalEntry::decode(bytes.as_slice()).context(DecodeWalSnafu { region_id })?;
let buffer = into_buffer(raw_entry);
let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
Ok((entry_id, wal_entry))
}
fn into_buffer(raw_entry: Entry) -> Buffer {
match raw_entry {
Entry::Naive(entry) => Buffer::from(entry.data),
Entry::MultiplePart(entry) => {
Buffer::from_iter(entry.parts.into_iter().map(bytes::Bytes::from))
}
}
}
/// [WalEntryReader] provides the ability to read and decode entries from the underlying store.
///
/// Notes: It will consume the inner stream and only allow invoking the `read` at once.

View File

@@ -14,7 +14,7 @@
pub use opendal::raw::{normalize_path as raw_normalize_path, Access, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
services, Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader,
Result, Writer,
};