diff --git a/src/log-store/src/fs/entry.rs b/src/log-store/src/fs/entry.rs index 39a67984e2..de0163634a 100644 --- a/src/log-store/src/fs/entry.rs +++ b/src/log-store/src/fs/entry.rs @@ -7,20 +7,23 @@ use futures::Stream; use snafu::{ensure, ResultExt}; use store_api::logstore::entry::{Encode, Entry, Epoch, Id, Offset}; use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream}; +use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; use crate::error::{CorruptedSnafu, DecodeAgainSnafu, DecodeSnafu, EncodeSnafu, Error}; use crate::fs::crc; +use crate::fs::namespace::LocalNamespace; -// length + offset + epoch + crc -const ENTRY_MIN_LEN: usize = 4 + 8 + 8 + 4; -// length + offset + epoch -const HEADER_LENGTH: usize = 4 + 8 + 8; +// length + offset + namespace id + epoch + crc +const ENTRY_MIN_LEN: usize = HEADER_LENGTH + 4; +// length + offset + namespace id + epoch +const HEADER_LENGTH: usize = 4 + 8 + 8 + 8; #[derive(Debug, PartialEq, Clone)] pub struct EntryImpl { pub data: Vec, pub offset: Offset, pub id: Id, + pub namespace_id: NamespaceId, pub epoch: Epoch, } @@ -36,15 +39,18 @@ impl Encode for EntryImpl { /// Entry binary format (Little endian): /// - /// +--------+--------+--------+--------+--------+ - // |entry id| epoch | length | data | CRC | - // +--------+--------+--------+--------+--------+ - // | 8 bytes| 8 bytes| 4 bytes|| 4 bytes| - // +--------+--------+--------+--------+--------+ + // ```text + // +--------+--------------+-------+--------+--------+--------+ + // |entry id| namespace id | epoch | length | data | CRC | + // +--------+--------------+-------+--------+--------+--------+ + // | 8 bytes| 8 bytes |8 bytes| 4 bytes|| 4 bytes| + // +--------+--------------+-------+--------+--------+--------+ + // ``` /// fn encode_to(&self, buf: &mut T) -> Result { let data_length = self.data.len(); buf.write_u64_le(self.id).context(EncodeSnafu)?; + buf.write_u64_le(self.namespace_id).context(EncodeSnafu)?; buf.write_u64_le(self.epoch).context(EncodeSnafu)?; buf.write_u32_le(data_length as u32).context(EncodeSnafu)?; buf.write_from_slice(self.data.as_slice()) @@ -76,6 +82,9 @@ impl Encode for EntryImpl { let id = header.read_u64_le().unwrap(); // unwrap here is safe because header bytes must be present digest.update(&id.to_le_bytes()); + let namespace_id = header.read_u64_le().unwrap(); + digest.update(&namespace_id.to_le_bytes()); + let epoch = header.read_u64_le().unwrap(); digest.update(&epoch.to_le_bytes()); @@ -115,6 +124,7 @@ impl Encode for EntryImpl { data, epoch, offset: 0, + namespace_id, }) } @@ -124,18 +134,20 @@ impl Encode for EntryImpl { } impl EntryImpl { - pub(crate) fn new(data: impl AsRef<[u8]>, id: Id) -> EntryImpl { + pub(crate) fn new(data: impl AsRef<[u8]>, id: Id, namespace: LocalNamespace) -> EntryImpl { EntryImpl { id, data: data.as_ref().to_vec(), offset: 0, epoch: 0, + namespace_id: namespace.id(), } } } impl Entry for EntryImpl { type Error = Error; + type Namespace = LocalNamespace; fn data(&self) -> &[u8] { &self.data @@ -164,6 +176,10 @@ impl Entry for EntryImpl { fn is_empty(&self) -> bool { self.data.is_empty() } + + fn namespace(&self) -> Self::Namespace { + LocalNamespace::new(self.namespace_id) + } } impl TryFrom for EntryImpl { @@ -220,7 +236,7 @@ mod tests { #[test] pub fn test_entry_deser() { let data = "hello, world"; - let mut entry = EntryImpl::new(data.as_bytes(), 8); + let mut entry = EntryImpl::new(data.as_bytes(), 8, LocalNamespace::new(42)); entry.epoch = 9; let mut buf = BytesMut::with_capacity(entry.encoded_size()); entry.encode_to(&mut buf).unwrap(); @@ -232,7 +248,7 @@ mod tests { #[test] pub fn test_rewrite_entry_id() { let data = "hello, world"; - let entry = EntryImpl::new(data.as_bytes(), 123); + let entry = EntryImpl::new(data.as_bytes(), 123, LocalNamespace::new(42)); let mut buffer = BytesMut::with_capacity(entry.encoded_size()); entry.encode_to(&mut buffer).unwrap(); assert_eq!(123, entry.id()); @@ -248,7 +264,7 @@ mod tests { } fn prepare_entry_bytes(data: &str, id: Id) -> Bytes { - let mut entry = EntryImpl::new(data.as_bytes(), id); + let mut entry = EntryImpl::new(data.as_bytes(), id, LocalNamespace::new(42)); entry.set_id(123); entry.set_offset(456); let mut buffer = BytesMut::with_capacity(entry.encoded_size()); @@ -299,7 +315,7 @@ mod tests { let data = "hello, world"; let bytes = prepare_entry_bytes(data, 42); assert_eq!( - hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F") + hex::decode("7B000000000000002A0000000000000000000000000000000C00000068656C6C6F2C20776F726C64E8EE2E57") .unwrap() .as_slice(), &bytes[..] @@ -324,7 +340,7 @@ mod tests { let data = "hello, world"; let bytes = prepare_entry_bytes(data, 42); assert_eq!( - hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F") + hex::decode("7b000000000000002a0000000000000000000000000000000c00000068656c6c6f2c20776f726c64e8ee2e57") .unwrap() .as_slice(), &bytes[..] diff --git a/src/log-store/src/fs/file.rs b/src/log-store/src/fs/file.rs index a127399c3d..c1fa764295 100644 --- a/src/log-store/src/fs/file.rs +++ b/src/log-store/src/fs/file.rs @@ -653,25 +653,33 @@ mod tests { assert_eq!( 10, - file.append(&mut EntryImpl::new("test1".as_bytes(), 10)) - .await - .expect("Failed to append entry 1") - .entry_id + file.append(&mut EntryImpl::new( + "test1".as_bytes(), + 10, + LocalNamespace::new(42) + )) + .await + .expect("Failed to append entry 1") + .entry_id ); assert_eq!( 11, - file.append(&mut EntryImpl::new("test-2".as_bytes(), 11)) - .await - .expect("Failed to append entry 2") - .entry_id + file.append(&mut EntryImpl::new( + "test-2".as_bytes(), + 11, + LocalNamespace::new(42) + )) + .await + .expect("Failed to append entry 2") + .entry_id ); let mut log_file = std::fs::File::open(path.clone()).expect("Test log file does not exist"); let metadata = log_file.metadata().expect("Failed to read file metadata"); info!("Log file metadata: {:?}", metadata); - assert_eq!(59, metadata.len()); // 24+5+24+6 + assert_eq!(75, metadata.len()); // 32+5+32+6 let mut content = vec![0; metadata.len() as usize]; log_file .read_exact(&mut content) @@ -684,7 +692,7 @@ mod tests { metadata.len() ); - let ns = LocalNamespace::default(); + let ns = LocalNamespace::new(42); let mut stream = file.create_stream(&ns, 0); let mut data = vec![]; diff --git a/src/log-store/src/fs/log.rs b/src/log-store/src/fs/log.rs index 2ad05eb252..3cadb10f32 100644 --- a/src/log-store/src/fs/log.rs +++ b/src/log-store/src/fs/log.rs @@ -8,8 +8,9 @@ use common_telemetry::{error, info, warn}; use futures::pin_mut; use futures::StreamExt; use snafu::{OptionExt, ResultExt}; -use store_api::logstore::entry::{Encode, Id}; +use store_api::logstore::entry::{Encode, Entry, Id}; use store_api::logstore::entry_stream::SendableEntryStream; +use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; use store_api::logstore::LogStore; use tokio::sync::RwLock; @@ -178,11 +179,7 @@ impl LogStore for LocalFileLogStore { type Entry = EntryImpl; type AppendResponse = AppendResponseImpl; - async fn append( - &self, - _ns: &Self::Namespace, - mut entry: Self::Entry, - ) -> Result { + async fn append(&self, mut entry: Self::Entry) -> Result { // TODO(hl): configurable retry times for _ in 0..3 { let current_active_file = self.active_file(); @@ -226,6 +223,7 @@ impl LogStore for LocalFileLogStore { ) -> Result> { let files = self.files.read().await; let ns = ns.clone(); + let s = stream!({ for (start_id, file) in files.iter() { // TODO(hl): Use index to lookup file @@ -233,7 +231,15 @@ impl LogStore for LocalFileLogStore { let s = file.create_stream(&ns, *start_id); pin_mut!(s); while let Some(entries) = s.next().await { - yield entries; + match entries { + Ok(entries) => { + yield Ok(entries + .into_iter() + .filter(|e| e.namespace().id() == ns.id()) + .collect::>()) + } + Err(e) => yield Err(e), + } } } } @@ -254,12 +260,12 @@ impl LogStore for LocalFileLogStore { todo!() } - fn entry>(&self, data: D, id: Id) -> Self::Entry { - EntryImpl::new(data, id) + fn entry>(&self, data: D, id: Id, namespace: Self::Namespace) -> Self::Entry { + EntryImpl::new(data, id, namespace) } - fn namespace(&self, name: &str) -> Self::Namespace { - LocalNamespace::new(name) + fn namespace(&self, id: NamespaceId) -> Self::Namespace { + LocalNamespace::new(id) } } @@ -283,11 +289,14 @@ mod tests { }; let logstore = LocalFileLogStore::open(&config).await.unwrap(); - let ns = LocalNamespace::default(); assert_eq!( 0, logstore - .append(&ns, EntryImpl::new(generate_data(100), 0),) + .append(EntryImpl::new( + generate_data(96), + 0, + LocalNamespace::new(42) + ),) .await .unwrap() .entry_id @@ -296,7 +305,11 @@ mod tests { assert_eq!( 1, logstore - .append(&ns, EntryImpl::new(generate_data(100), 1)) + .append(EntryImpl::new( + generate_data(96), + 1, + LocalNamespace::new(42) + )) .await .unwrap() .entry_id @@ -326,9 +339,13 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), }; let logstore = LocalFileLogStore::open(&config).await.unwrap(); - let ns = LocalNamespace::default(); + let ns = LocalNamespace::new(42); let id = logstore - .append(&ns, EntryImpl::new(generate_data(100), 0)) + .append(EntryImpl::new( + generate_data(96), + 0, + LocalNamespace::new(42), + )) .await .unwrap() .entry_id; @@ -340,5 +357,59 @@ mod tests { let entries = stream.next().await.unwrap().unwrap(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].id(), 0); + assert_eq!(42, entries[0].namespace_id); + } + + #[tokio::test] + pub async fn test_namespace() { + common_telemetry::logging::init_default_ut_logging(); + let dir = TempDir::new("greptimedb").unwrap(); + let config = LogConfig { + append_buffer_size: 128, + max_log_file_size: 1024 * 1024, + log_file_dir: dir.path().to_str().unwrap().to_string(), + }; + let logstore = LocalFileLogStore::open(&config).await.unwrap(); + assert_eq!( + 0, + logstore + .append(EntryImpl::new( + generate_data(96), + 0, + LocalNamespace::new(42), + )) + .await + .unwrap() + .entry_id + ); + + assert_eq!( + 1, + logstore + .append(EntryImpl::new( + generate_data(96), + 1, + LocalNamespace::new(43), + )) + .await + .unwrap() + .entry_id + ); + + let stream = logstore.read(&LocalNamespace::new(42), 0).await.unwrap(); + tokio::pin!(stream); + + let entries = stream.next().await.unwrap().unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].id(), 0); + assert_eq!(42, entries[0].namespace_id); + + let stream = logstore.read(&LocalNamespace::new(43), 0).await.unwrap(); + tokio::pin!(stream); + + let entries = stream.next().await.unwrap().unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].id(), 1); + assert_eq!(43, entries[0].namespace_id); } } diff --git a/src/log-store/src/fs/namespace.rs b/src/log-store/src/fs/namespace.rs index 75da1e3817..a0f71ea7f8 100644 --- a/src/log-store/src/fs/namespace.rs +++ b/src/log-store/src/fs/namespace.rs @@ -1,34 +1,24 @@ -use std::sync::Arc; - -use store_api::logstore::namespace::Namespace; +use store_api::logstore::namespace::{Id, Namespace}; #[derive(Clone, Debug)] pub struct LocalNamespace { - inner: Arc, + pub(crate) id: Id, } impl Default for LocalNamespace { fn default() -> Self { - LocalNamespace::new("") + LocalNamespace::new(0) } } -#[derive(Debug)] -struct LocalNamespaceInner { - name: String, -} - impl LocalNamespace { - pub(crate) fn new(name: &str) -> Self { - let inner = Arc::new(LocalNamespaceInner { - name: name.to_string(), - }); - Self { inner } + pub(crate) fn new(id: Id) -> Self { + Self { id } } } impl Namespace for LocalNamespace { - fn name(&self) -> &str { - self.inner.name.as_str() + fn id(&self) -> Id { + self.id } } diff --git a/src/log-store/src/fs/noop.rs b/src/log-store/src/fs/noop.rs index 6c723c997b..2d55d845f5 100644 --- a/src/log-store/src/fs/noop.rs +++ b/src/log-store/src/fs/noop.rs @@ -1,3 +1,4 @@ +use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{entry::Id, LogStore}; use crate::error::{Error, Result}; @@ -15,11 +16,7 @@ impl LogStore for NoopLogStore { type Entry = EntryImpl; type AppendResponse = AppendResponseImpl; - async fn append( - &self, - _ns: &Self::Namespace, - mut _e: Self::Entry, - ) -> Result { + async fn append(&self, mut _e: Self::Entry) -> Result { Ok(AppendResponseImpl { entry_id: 0, offset: 0, @@ -51,11 +48,11 @@ impl LogStore for NoopLogStore { todo!() } - fn entry>(&self, data: D, id: Id) -> Self::Entry { - EntryImpl::new(data, id) + fn entry>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry { + EntryImpl::new(data, id, ns) } - fn namespace(&self, name: &str) -> Self::Namespace { - LocalNamespace::new(name) + fn namespace(&self, id: NamespaceId) -> Self::Namespace { + LocalNamespace::new(id) } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 09a30c2bda..84788f7ad4 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -120,7 +120,7 @@ impl RegionImpl { let id = metadata.id(); let name = metadata.name().to_string(); let version_control = VersionControl::with_version(version); - let wal = Wal::new(name.clone(), store_config.log_store); + let wal = Wal::new(id, store_config.log_store); let inner = Arc::new(RegionInner { shared: Arc::new(SharedData { @@ -160,7 +160,7 @@ impl RegionImpl { let metadata = version.metadata().clone(); let version_control = Arc::new(VersionControl::with_version(version)); - let wal = Wal::new(name.clone(), store_config.log_store); + let wal = Wal::new(metadata.id(), store_config.log_store); let shared = Arc::new(SharedData { id: metadata.id(), name, diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 61a5fc6d67..637ad4d79b 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -5,8 +5,9 @@ use common_error::prelude::BoxedError; use futures::{stream, Stream, TryStreamExt}; use prost::Message; use snafu::{ensure, ResultExt}; +use store_api::storage::RegionId; use store_api::{ - logstore::{entry::Entry, namespace::Namespace, AppendResponse, LogStore}, + logstore::{entry::Entry, AppendResponse, LogStore}, storage::SequenceNumber, }; @@ -25,6 +26,7 @@ use crate::{ #[derive(Debug)] pub struct Wal { + name: String, namespace: S::Namespace, store: Arc, } @@ -37,6 +39,7 @@ pub type WriteBatchStream<'a> = Pin< impl Clone for Wal { fn clone(&self) -> Self { Self { + name: self.name.clone(), namespace: self.namespace.clone(), store: self.store.clone(), } @@ -44,16 +47,18 @@ impl Clone for Wal { } impl Wal { - pub fn new(region_name: impl Into, store: Arc) -> Self { - let region_name = region_name.into(); - let namespace = store.namespace(®ion_name); - - Self { namespace, store } + pub fn new(region_id: RegionId, store: Arc) -> Self { + let namespace = store.namespace(region_id); + Self { + name: region_id.to_string(), + namespace, + store, + } } #[inline] pub fn name(&self) -> &str { - self.namespace.name() + &self.name } } @@ -72,11 +77,11 @@ impl Wal { /// +---------------------+----------------------------------------------------+--------------+-------------+--------------+ /// ``` /// - pub async fn write_to_wal<'a>( + pub async fn write_to_wal( &self, seq: SequenceNumber, mut header: WalHeader, - payload: Payload<'a>, + payload: Payload<'_>, ) -> Result<(u64, usize)> { header.payload_type = payload.payload_type(); @@ -134,11 +139,11 @@ impl Wal { } async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> { - let e = self.store.entry(bytes, seq); + let e = self.store.entry(bytes, seq, self.namespace.clone()); let res = self .store - .append(&self.namespace, e) + .append(e) .await .map_err(BoxedError::new) .context(error::WriteWalSnafu { name: self.name() })?; @@ -259,7 +264,7 @@ mod tests { pub async fn test_write_wal() { let (log_store, _tmp) = test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await; - let wal = Wal::new("test_region", Arc::new(log_store)); + let wal = Wal::new(0, Arc::new(log_store)); let res = wal.write(0, b"test1").await.unwrap(); @@ -269,7 +274,7 @@ mod tests { let res = wal.write(1, b"test2").await.unwrap(); assert_eq!(1, res.0); - assert_eq!(29, res.1); + assert_eq!(5 + 32, res.1); } #[tokio::test] @@ -277,7 +282,7 @@ mod tests { common_telemetry::init_default_ut_logging(); let (log_store, _tmp) = test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await; - let wal = Wal::new("test_region", Arc::new(log_store)); + let wal = Wal::new(0, Arc::new(log_store)); let header = WalHeader::with_last_manifest_version(111); let (seq_num, _) = wal.write_to_wal(3, header, Payload::None).await?; diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index a1974485b5..390b33ab2c 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -19,11 +19,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { type AppendResponse: AppendResponse; /// Append an `Entry` to WAL with given namespace - async fn append( - &self, - ns: &Self::Namespace, - mut e: Self::Entry, - ) -> Result; + async fn append(&self, mut e: Self::Entry) -> Result; /// Append a batch of entries atomically and return the offset of first entry. async fn append_batch( @@ -50,11 +46,11 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { async fn list_namespaces(&self) -> Result, Self::Error>; /// Create an entry of the associate Entry type - fn entry>(&self, data: D, id: Id) -> Self::Entry; + fn entry>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry; /// Create a namespace of the associate Namespace type // TODO(sunng87): confusion with `create_namespace` - fn namespace(&self, name: &str) -> Self::Namespace; + fn namespace(&self, id: namespace::Id) -> Self::Namespace; } pub trait AppendResponse: Send + Sync { diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index b1d9fe1aeb..4ed9e8f565 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -1,6 +1,8 @@ use common_base::buffer::{Buffer, BufferMut}; use common_error::ext::ErrorExt; +use crate::logstore::namespace::Namespace; + pub type Offset = usize; pub type Epoch = u64; pub type Id = u64; @@ -8,6 +10,8 @@ pub type Id = u64; /// Entry is the minimal data storage unit in `LogStore`. pub trait Entry: Encode + Send + Sync { type Error: ErrorExt + Send + Sync; + type Namespace: Namespace; + /// Return contained data of entry. fn data(&self) -> &[u8]; @@ -25,6 +29,8 @@ pub trait Entry: Encode + Send + Sync { fn len(&self) -> usize; fn is_empty(&self) -> bool; + + fn namespace(&self) -> Self::Namespace; } pub trait Encode: Sized { diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs index 6d67f2873f..fb96393713 100644 --- a/src/store-api/src/logstore/entry_stream.rs +++ b/src/store-api/src/logstore/entry_stream.rs @@ -41,6 +41,15 @@ mod tests { #[snafu(visibility(pub))] pub struct Error {} + #[derive(Debug, Clone)] + pub struct Namespace {} + + impl crate::logstore::Namespace for Namespace { + fn id(&self) -> crate::logstore::namespace::Id { + 0 + } + } + impl ErrorExt for Error { fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) @@ -70,6 +79,7 @@ mod tests { impl Entry for SimpleEntry { type Error = Error; + type Namespace = Namespace; fn data(&self) -> &[u8] { &self.data @@ -96,6 +106,10 @@ mod tests { fn is_empty(&self) -> bool { self.data.is_empty() } + + fn namespace(&self) -> Self::Namespace { + Namespace {} + } } impl SimpleEntry { diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs index 8464c1a5e3..be12aa43ea 100644 --- a/src/store-api/src/logstore/namespace.rs +++ b/src/store-api/src/logstore/namespace.rs @@ -1,3 +1,5 @@ +pub type Id = u64; + pub trait Namespace: Send + Sync + Clone + std::fmt::Debug { - fn name(&self) -> &str; + fn id(&self) -> Id; }