feat: add RawEntryReader and OneshotWalEntryReader trait (#4027)

* feat: add `RawEntryReader` and `OneShotWalEntryReader` trait

* chore: rename `OneShot` to `Oneshot`

* refactor: remove `region_id` from `OneshotWalEntryReader`
This commit is contained in:
Weny Xu
2024-05-24 15:30:50 +09:00
committed by GitHub
parent 0101657649
commit 466f7c6448
8 changed files with 140 additions and 12 deletions

View File

@@ -20,10 +20,9 @@ pub(crate) mod util;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::Namespace;
use crate::error::Error;
use store_api::storage::RegionId;
/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
@@ -56,7 +55,13 @@ pub struct EntryImpl {
}
impl Entry for EntryImpl {
type Error = Error;
/// Consumes this entry and repackages it as a [RawEntry].
///
/// The region id and entry id are read through the accessors first; the
/// payload buffer is then moved out of `self` without copying.
fn into_raw_entry(self) -> RawEntry {
    let region_id = self.region_id();
    let entry_id = self.id();
    let data = self.data;
    RawEntry {
        region_id,
        entry_id,
        data,
    }
}
fn data(&self) -> &[u8] {
&self.data
@@ -66,6 +71,10 @@ impl Entry for EntryImpl {
self.id
}
/// Returns the [RegionId] built from the raw `region_id` stored in this entry's namespace.
fn region_id(&self) -> RegionId {
    let raw_region_id = self.ns.region_id;
    RegionId::from_u64(raw_region_id)
}
/// Estimates the size of this entry in bytes.
///
/// Sums the inline size of the struct, the capacity of the data buffer and
/// the capacity of the namespace's topic string.
fn estimated_size(&self) -> usize {
    let inline_bytes = size_of::<Self>();
    let data_bytes = self.data.capacity() * size_of::<u8>();
    let topic_bytes = self.ns.topic.capacity();
    inline_bytes + data_bytes + topic_bytes
}

View File

@@ -13,9 +13,10 @@
// limitations under the License.
use common_wal::options::WalOptions;
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
use store_api::storage::RegionId;
use crate::error::{Error, Result};
@@ -36,7 +37,13 @@ impl Namespace for NamespaceImpl {
}
impl Entry for EntryImpl {
type Error = Error;
/// Consumes this entry and converts it into a [RawEntry].
///
/// The ids come from this impl's accessors and the payload is always empty.
fn into_raw_entry(self) -> RawEntry {
    let region_id = self.region_id();
    let entry_id = self.id();
    RawEntry {
        region_id,
        entry_id,
        data: Vec::new(),
    }
}
fn data(&self) -> &[u8] {
&[]
@@ -46,6 +53,10 @@ impl Entry for EntryImpl {
0
}
/// Returns a fixed [RegionId] built from the raw value `0`.
fn region_id(&self) -> RegionId {
    const RAW_REGION_ID: u64 = 0;
    RegionId::from_u64(RAW_REGION_ID)
}
/// Reports an estimated size of zero for every entry of this type.
fn estimated_size(&self) -> usize {
    const ESTIMATED_SIZE: usize = 0;
    ESTIMATED_SIZE
}

View File

@@ -15,10 +15,10 @@
use std::hash::{Hash, Hasher};
use std::mem::size_of;
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::storage::RegionId;
use crate::error::Error;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
mod backend;
@@ -67,7 +67,13 @@ impl Namespace for NamespaceImpl {
}
impl Entry for EntryImpl {
type Error = Error;
/// Consumes this entry and repackages it as a [RawEntry].
///
/// Both ids are read through the accessors before the payload buffer is
/// moved out of `self`.
fn into_raw_entry(self) -> RawEntry {
    let region_id = self.region_id();
    let entry_id = self.id();
    let data = self.data;
    RawEntry {
        region_id,
        entry_id,
        data,
    }
}
fn data(&self) -> &[u8] {
self.data.as_slice()
@@ -77,6 +83,10 @@ impl Entry for EntryImpl {
self.id
}
/// Returns the [RegionId] built from this entry's `id` field.
// NOTE(review): `self.id` looks like the same field the `id()` accessor of this impl
// returns (the entry id), so the region id and the entry id would always be numerically
// equal. Confirm whether the region should instead come from the entry's namespace.
fn region_id(&self) -> RegionId {
RegionId::from_u64(self.id)
}
/// Estimates the size of this entry in bytes.
///
/// Adds the current length of the data buffer to the size of two `u64` values.
fn estimated_size(&self) -> usize {
    let payload_len = self.data.len();
    let fixed_overhead = size_of::<u64>() + size_of::<u64>();
    payload_len + fixed_overhead
}

View File

@@ -14,6 +14,13 @@
//! Write ahead log of the engine.
// TODO(weny): remove it.
// NOTE(review): "it" presumably refers to the `#[allow(unused)]` below, to be dropped
// once this module has callers — confirm. Kept as a plain comment so the TODO does not
// end up in the generated docs for the module.
#[allow(unused)]
pub(crate) mod raw_entry_reader;
// TODO(weny): remove it.
// NOTE(review): "it" presumably refers to the `#[allow(unused)]` below, to be dropped
// once this module has callers — confirm. Kept as a plain comment so the TODO does not
// end up in the generated docs for the module.
#[allow(unused)]
pub(crate) mod wal_entry_reader;
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

View File

@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::stream::BoxStream;
use store_api::logstore::entry::RawEntry;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::wal::EntryId;
/// A boxed stream that yields [RawEntry] items, each wrapped in this crate's [Result].
///
/// The lifetime `'a` bounds any data borrowed by the underlying stream.
pub type RawEntryStream<'a> = BoxStream<'a, Result<RawEntry>>;
/// The namespace of the Kafka log store.
pub struct KafkaNamespace<'a> {
/// The topic name, borrowed for `'a`.
// NOTE(review): this field is private and no constructor is visible in this file, so the
// struct cannot be built from outside this module yet — confirm that is intended.
topic: &'a str,
}
/// The namespace of the raft engine log store.
pub struct RaftEngineNamespace {
/// The [RegionId] carried by this namespace.
// NOTE(review): this field is private and no constructor is visible in this file, so the
// struct cannot be built from outside this module yet — confirm that is intended.
region_id: RegionId,
}
/// The namespace passed to [RawEntryReader], with one variant per log store kind.
pub(crate) enum LogStoreNamespace<'a> {
/// Namespace of the raft engine log store.
RaftEngine(RaftEngineNamespace),
/// Namespace of the Kafka log store; borrows its topic name for `'a`.
Kafka(KafkaNamespace<'a>),
}
/// [RawEntryReader] provides the ability to read [RawEntry] from the underlying `LogStore`.
pub(crate) trait RawEntryReader: Send + Sync {
/// Returns a stream of [RawEntry] for the namespace `ctx`, given the entry id `start_id`.
///
/// The returned stream is `'static`, so it cannot borrow from `self` or from `ctx`.
// NOTE(review): presumably `start_id` is the first entry id to yield — confirm whether
// the bound is inclusive.
fn read(&self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<RawEntryStream<'static>>;
}

View File

@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::storage::RegionId;
use crate::error::Result;
use crate::wal::raw_entry_reader::LogStoreNamespace;
use crate::wal::{EntryId, WalEntryStream};
/// [OneshotWalEntryReader] provides the ability to read and decode entries from the underlying store.
///
/// `read` takes `self` by value, so each reader can serve a single read only.
pub(crate) trait OneshotWalEntryReader: Send + Sync {
/// Consumes the reader and returns a [WalEntryStream] for the namespace `ctx`, given the
/// entry id `start_id`.
// NOTE(review): presumably `start_id` is the first entry id to yield — confirm whether
// the bound is inclusive.
fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<WalEntryStream>;
}

View File

@@ -12,16 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use crate::storage::RegionId;
/// An entry's id.
/// Different log store implementations may interpret the id differently.
pub type Id = u64;
/// The raw WAL entry: an entry's ids together with its owned data bytes.
pub struct RawEntry {
/// The [RegionId] associated with the entry.
pub region_id: RegionId,
/// The entry's id.
pub entry_id: Id,
/// The entry's data, as owned bytes.
pub data: Vec<u8>,
}
/// Entry is the minimal data storage unit through which users interact with the log store.
/// The log store implementation may have larger or smaller data storage unit than an entry.
pub trait Entry: Send + Sync {
type Error: ErrorExt + Send + Sync;
/// Consumes the [Entry] and converts it into a [RawEntry].
fn into_raw_entry(self) -> RawEntry;
/// Returns the data contained in the entry as a borrowed byte slice.
fn data(&self) -> &[u8];
@@ -30,6 +38,9 @@ pub trait Entry: Send + Sync {
/// Usually the namespace id is identical with the region id.
fn id(&self) -> Id;
/// Returns the [RegionId] associated with the entry.
fn region_id(&self) -> RegionId;
/// Computes the estimated encoded size of the entry.
// NOTE(review): presumably measured in bytes — confirm against the implementations.
fn estimated_size(&self) -> usize;
}

View File

@@ -39,6 +39,8 @@ mod tests {
use super::*;
pub use crate::logstore::entry::Id;
use crate::logstore::entry::RawEntry;
use crate::storage::RegionId;
pub struct SimpleEntry {
/// Binary data of current entry
@@ -64,7 +66,13 @@ mod tests {
}
impl Entry for SimpleEntry {
type Error = Error;
/// Consumes the entry and converts it into a [RawEntry].
///
/// The region id and entry id come from this impl's own accessors, and the
/// payload is moved out of `self.data`. Previously the payload was dropped and
/// an empty buffer returned, which disagreed with what `data()` exposes for the
/// same entry.
fn into_raw_entry(self) -> RawEntry {
    RawEntry {
        region_id: self.region_id(),
        entry_id: self.id(),
        // Move the buffer rather than cloning it: `self` is consumed here anyway.
        data: self.data,
    }
}
fn data(&self) -> &[u8] {
&self.data
@@ -74,6 +82,10 @@ mod tests {
0u64
}
/// Returns a fixed [RegionId] built from the raw value `0`.
fn region_id(&self) -> RegionId {
    const RAW_REGION_ID: u64 = 0;
    RegionId::from_u64(RAW_REGION_ID)
}
/// Uses the current length of the entry's data as its estimated size.
fn estimated_size(&self) -> usize {
    let payload: &[u8] = &self.data;
    payload.len()
}