diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 07a21596cb..dc068f3b4b 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -20,10 +20,9 @@ pub(crate) mod util; use std::fmt::Display; use serde::{Deserialize, Serialize}; -use store_api::logstore::entry::{Entry, Id as EntryId}; +use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry}; use store_api::logstore::namespace::Namespace; - -use crate::error::Error; +use store_api::storage::RegionId; /// Kafka Namespace implementation. #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] @@ -56,7 +55,13 @@ pub struct EntryImpl { } impl Entry for EntryImpl { - type Error = Error; + fn into_raw_entry(self) -> RawEntry { + RawEntry { + region_id: self.region_id(), + entry_id: self.id(), + data: self.data, + } + } fn data(&self) -> &[u8] { &self.data @@ -66,6 +71,10 @@ impl Entry for EntryImpl { self.id } + fn region_id(&self) -> RegionId { + RegionId::from_u64(self.ns.region_id) + } + fn estimated_size(&self) -> usize { size_of::() + self.data.capacity() * size_of::() + self.ns.topic.capacity() } diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index ded005ec79..e5ed7fd66b 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -13,9 +13,10 @@ // limitations under the License. 
use common_wal::options::WalOptions; -use store_api::logstore::entry::{Entry, Id as EntryId}; +use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry}; use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; +use store_api::storage::RegionId; use crate::error::{Error, Result}; @@ -36,7 +37,13 @@ impl Namespace for NamespaceImpl { } impl Entry for EntryImpl { - type Error = Error; + fn into_raw_entry(self) -> RawEntry { + RawEntry { + region_id: self.region_id(), + entry_id: self.id(), + data: vec![], + } + } fn data(&self) -> &[u8] { &[] @@ -46,6 +53,10 @@ impl Entry for EntryImpl { 0 } + fn region_id(&self) -> RegionId { + RegionId::from_u64(0) + } + fn estimated_size(&self) -> usize { 0 } diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs index e7a6f6b0ca..cdb600249c 100644 --- a/src/log-store/src/raft_engine.rs +++ b/src/log-store/src/raft_engine.rs @@ -15,10 +15,10 @@ use std::hash::{Hash, Hasher}; use std::mem::size_of; -use store_api::logstore::entry::{Entry, Id as EntryId}; +use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry}; use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; +use store_api::storage::RegionId; -use crate::error::Error; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl}; mod backend; @@ -67,7 +67,13 @@ impl Namespace for NamespaceImpl { } impl Entry for EntryImpl { - type Error = Error; + fn into_raw_entry(self) -> RawEntry { + RawEntry { + region_id: self.region_id(), + entry_id: self.id(), + data: self.data, + } + } fn data(&self) -> &[u8] { self.data.as_slice() @@ -77,6 +83,10 @@ impl Entry for EntryImpl { self.id } + fn region_id(&self) -> RegionId { + RegionId::from_u64(self.id) + } + fn estimated_size(&self) -> usize { self.data.len() + size_of::() + size_of::() } diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 06e38f36f9..a36493f300 100644 --- 
a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -14,6 +14,13 @@ //! Write ahead log of the engine. +/// TODO(weny): remove it +#[allow(unused)] +pub(crate) mod raw_entry_reader; +/// TODO(weny): remove it +#[allow(unused)] +pub(crate) mod wal_entry_reader; + use std::collections::HashMap; use std::mem; use std::sync::Arc; diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs new file mode 100644 index 0000000000..aa4d5ea0e4 --- /dev/null +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::stream::BoxStream; +use store_api::logstore::entry::RawEntry; +use store_api::storage::RegionId; + +use crate::error::Result; +use crate::wal::EntryId; + +/// A stream that yields [RawEntry]. +pub type RawEntryStream<'a> = BoxStream<'a, Result>; + +/// The namespace of the Kafka log store. +pub struct KafkaNamespace<'a> { + topic: &'a str, +} + +/// The namespace of the raft-engine log store. +pub struct RaftEngineNamespace { + region_id: RegionId, +} + +/// The namespace of [RawEntryReader]. +pub(crate) enum LogStoreNamespace<'a> { + RaftEngine(RaftEngineNamespace), + Kafka(KafkaNamespace<'a>), +} + +/// [RawEntryReader] provides the ability to read [RawEntry] from the underlying [LogStore].
+pub(crate) trait RawEntryReader: Send + Sync { + fn read(&self, ctx: LogStoreNamespace, start_id: EntryId) -> Result>; +} diff --git a/src/mito2/src/wal/wal_entry_reader.rs b/src/mito2/src/wal/wal_entry_reader.rs new file mode 100644 index 0000000000..8c3e161222 --- /dev/null +++ b/src/mito2/src/wal/wal_entry_reader.rs @@ -0,0 +1,24 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use store_api::storage::RegionId; + +use crate::error::Result; +use crate::wal::raw_entry_reader::LogStoreNamespace; +use crate::wal::{EntryId, WalEntryStream}; + +/// [OneshotWalEntryReader] provides the ability to read and decode entries from the underlying store. +pub(crate) trait OneshotWalEntryReader: Send + Sync { + fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result; +} diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index daac2df4c9..50e58a38fe 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -12,16 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_error::ext::ErrorExt; +use crate::storage::RegionId; /// An entry's id. /// Different log store implementations may interpret the id to different meanings. pub type Id = u64; +/// The raw Wal entry. 
+pub struct RawEntry { + pub region_id: RegionId, + pub entry_id: Id, + pub data: Vec, +} + /// Entry is the minimal data storage unit through which users interact with the log store. /// The log store implementation may have larger or smaller data storage unit than an entry. pub trait Entry: Send + Sync { - type Error: ErrorExt + Send + Sync; + /// Consumes [Entry] and converts to [RawEntry]. + fn into_raw_entry(self) -> RawEntry; /// Returns the contained data of the entry. fn data(&self) -> &[u8]; @@ -30,6 +38,9 @@ pub trait Entry: Send + Sync { /// Usually the namespace id is identical with the region id. fn id(&self) -> Id; + /// Returns the [RegionId]. + fn region_id(&self) -> RegionId; + /// Computes the estimated encoded size. fn estimated_size(&self) -> usize; } diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs index 5f26133ada..6a5886b0b5 100644 --- a/src/store-api/src/logstore/entry_stream.rs +++ b/src/store-api/src/logstore/entry_stream.rs @@ -39,6 +39,8 @@ mod tests { use super::*; pub use crate::logstore::entry::Id; + use crate::logstore::entry::RawEntry; + use crate::storage::RegionId; pub struct SimpleEntry { /// Binary data of current entry @@ -64,7 +66,13 @@ mod tests { } impl Entry for SimpleEntry { - type Error = Error; + fn into_raw_entry(self) -> RawEntry { + RawEntry { + region_id: RegionId::from_u64(0), + entry_id: 0, + data: vec![], + } + } fn data(&self) -> &[u8] { &self.data @@ -74,6 +82,10 @@ mod tests { 0u64 } + fn region_id(&self) -> RegionId { + RegionId::from_u64(0) + } + fn estimated_size(&self) -> usize { self.data.len() }