fix: logstore read supports namespace isolation (#163)

* logstore read supports namespace isolation

* add namespace isolation test

* update

* revert unexpected changes

* Update log.rs

remove unnecessary info log

* reformat code
This commit is contained in:
Lei, Huang
2022-08-15 11:43:48 +08:00
committed by GitHub
parent 28b7a7cf35
commit b695881c6a
11 changed files with 196 additions and 91 deletions

View File

@@ -7,20 +7,23 @@ use futures::Stream;
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::{Encode, Entry, Epoch, Id, Offset};
use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use crate::error::{CorruptedSnafu, DecodeAgainSnafu, DecodeSnafu, EncodeSnafu, Error};
use crate::fs::crc;
use crate::fs::namespace::LocalNamespace;
// length + offset + epoch + crc
const ENTRY_MIN_LEN: usize = 4 + 8 + 8 + 4;
// length + offset + epoch
const HEADER_LENGTH: usize = 4 + 8 + 8;
// length + offset + namespace id + epoch + crc
const ENTRY_MIN_LEN: usize = HEADER_LENGTH + 4;
// length + offset + namespace id + epoch
const HEADER_LENGTH: usize = 4 + 8 + 8 + 8;
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
pub data: Vec<u8>,
pub offset: Offset,
pub id: Id,
pub namespace_id: NamespaceId,
pub epoch: Epoch,
}
@@ -36,15 +39,18 @@ impl Encode for EntryImpl {
/// Entry binary format (Little endian):
///
/// +--------+--------+--------+--------+--------+
// |entry id| epoch | length | data | CRC |
// +--------+--------+--------+--------+--------+
// | 8 bytes| 8 bytes| 4 bytes|<length>| 4 bytes|
// +--------+--------+--------+--------+--------+
// ```text
// +--------+--------------+-------+--------+--------+--------+
// |entry id| namespace id | epoch | length | data | CRC |
// +--------+--------------+-------+--------+--------+--------+
// | 8 bytes| 8 bytes |8 bytes| 4 bytes|<length>| 4 bytes|
// +--------+--------------+-------+--------+--------+--------+
// ```
///
fn encode_to<T: BufferMut>(&self, buf: &mut T) -> Result<usize, Self::Error> {
let data_length = self.data.len();
buf.write_u64_le(self.id).context(EncodeSnafu)?;
buf.write_u64_le(self.namespace_id).context(EncodeSnafu)?;
buf.write_u64_le(self.epoch).context(EncodeSnafu)?;
buf.write_u32_le(data_length as u32).context(EncodeSnafu)?;
buf.write_from_slice(self.data.as_slice())
@@ -76,6 +82,9 @@ impl Encode for EntryImpl {
let id = header.read_u64_le().unwrap(); // unwrap here is safe because header bytes must be present
digest.update(&id.to_le_bytes());
let namespace_id = header.read_u64_le().unwrap();
digest.update(&namespace_id.to_le_bytes());
let epoch = header.read_u64_le().unwrap();
digest.update(&epoch.to_le_bytes());
@@ -115,6 +124,7 @@ impl Encode for EntryImpl {
data,
epoch,
offset: 0,
namespace_id,
})
}
@@ -124,18 +134,20 @@ impl Encode for EntryImpl {
}
impl EntryImpl {
pub(crate) fn new(data: impl AsRef<[u8]>, id: Id) -> EntryImpl {
pub(crate) fn new(data: impl AsRef<[u8]>, id: Id, namespace: LocalNamespace) -> EntryImpl {
EntryImpl {
id,
data: data.as_ref().to_vec(),
offset: 0,
epoch: 0,
namespace_id: namespace.id(),
}
}
}
impl Entry for EntryImpl {
type Error = Error;
type Namespace = LocalNamespace;
fn data(&self) -> &[u8] {
&self.data
@@ -164,6 +176,10 @@ impl Entry for EntryImpl {
fn is_empty(&self) -> bool {
self.data.is_empty()
}
fn namespace(&self) -> Self::Namespace {
LocalNamespace::new(self.namespace_id)
}
}
impl TryFrom<Bytes> for EntryImpl {
@@ -220,7 +236,7 @@ mod tests {
#[test]
pub fn test_entry_deser() {
let data = "hello, world";
let mut entry = EntryImpl::new(data.as_bytes(), 8);
let mut entry = EntryImpl::new(data.as_bytes(), 8, LocalNamespace::new(42));
entry.epoch = 9;
let mut buf = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buf).unwrap();
@@ -232,7 +248,7 @@ mod tests {
#[test]
pub fn test_rewrite_entry_id() {
let data = "hello, world";
let entry = EntryImpl::new(data.as_bytes(), 123);
let entry = EntryImpl::new(data.as_bytes(), 123, LocalNamespace::new(42));
let mut buffer = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buffer).unwrap();
assert_eq!(123, entry.id());
@@ -248,7 +264,7 @@ mod tests {
}
fn prepare_entry_bytes(data: &str, id: Id) -> Bytes {
let mut entry = EntryImpl::new(data.as_bytes(), id);
let mut entry = EntryImpl::new(data.as_bytes(), id, LocalNamespace::new(42));
entry.set_id(123);
entry.set_offset(456);
let mut buffer = BytesMut::with_capacity(entry.encoded_size());
@@ -299,7 +315,7 @@ mod tests {
let data = "hello, world";
let bytes = prepare_entry_bytes(data, 42);
assert_eq!(
hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F")
hex::decode("7B000000000000002A0000000000000000000000000000000C00000068656C6C6F2C20776F726C64E8EE2E57")
.unwrap()
.as_slice(),
&bytes[..]
@@ -324,7 +340,7 @@ mod tests {
let data = "hello, world";
let bytes = prepare_entry_bytes(data, 42);
assert_eq!(
hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F")
hex::decode("7b000000000000002a0000000000000000000000000000000c00000068656c6c6f2c20776f726c64e8ee2e57")
.unwrap()
.as_slice(),
&bytes[..]

View File

@@ -653,25 +653,33 @@ mod tests {
assert_eq!(
10,
file.append(&mut EntryImpl::new("test1".as_bytes(), 10))
.await
.expect("Failed to append entry 1")
.entry_id
file.append(&mut EntryImpl::new(
"test1".as_bytes(),
10,
LocalNamespace::new(42)
))
.await
.expect("Failed to append entry 1")
.entry_id
);
assert_eq!(
11,
file.append(&mut EntryImpl::new("test-2".as_bytes(), 11))
.await
.expect("Failed to append entry 2")
.entry_id
file.append(&mut EntryImpl::new(
"test-2".as_bytes(),
11,
LocalNamespace::new(42)
))
.await
.expect("Failed to append entry 2")
.entry_id
);
let mut log_file = std::fs::File::open(path.clone()).expect("Test log file does not exist");
let metadata = log_file.metadata().expect("Failed to read file metadata");
info!("Log file metadata: {:?}", metadata);
assert_eq!(59, metadata.len()); // 24+5+24+6
assert_eq!(75, metadata.len()); // 32+5+32+6
let mut content = vec![0; metadata.len() as usize];
log_file
.read_exact(&mut content)
@@ -684,7 +692,7 @@ mod tests {
metadata.len()
);
let ns = LocalNamespace::default();
let ns = LocalNamespace::new(42);
let mut stream = file.create_stream(&ns, 0);
let mut data = vec![];

View File

@@ -8,8 +8,9 @@ use common_telemetry::{error, info, warn};
use futures::pin_mut;
use futures::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{Encode, Id};
use store_api::logstore::entry::{Encode, Entry, Id};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::LogStore;
use tokio::sync::RwLock;
@@ -178,11 +179,7 @@ impl LogStore for LocalFileLogStore {
type Entry = EntryImpl;
type AppendResponse = AppendResponseImpl;
async fn append(
&self,
_ns: &Self::Namespace,
mut entry: Self::Entry,
) -> Result<Self::AppendResponse> {
async fn append(&self, mut entry: Self::Entry) -> Result<Self::AppendResponse> {
// TODO(hl): configurable retry times
for _ in 0..3 {
let current_active_file = self.active_file();
@@ -226,6 +223,7 @@ impl LogStore for LocalFileLogStore {
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
let files = self.files.read().await;
let ns = ns.clone();
let s = stream!({
for (start_id, file) in files.iter() {
// TODO(hl): Use index to lookup file
@@ -233,7 +231,15 @@ impl LogStore for LocalFileLogStore {
let s = file.create_stream(&ns, *start_id);
pin_mut!(s);
while let Some(entries) = s.next().await {
yield entries;
match entries {
Ok(entries) => {
yield Ok(entries
.into_iter()
.filter(|e| e.namespace().id() == ns.id())
.collect::<Vec<_>>())
}
Err(e) => yield Err(e),
}
}
}
}
@@ -254,12 +260,12 @@ impl LogStore for LocalFileLogStore {
todo!()
}
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id) -> Self::Entry {
EntryImpl::new(data, id)
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, namespace: Self::Namespace) -> Self::Entry {
EntryImpl::new(data, id, namespace)
}
fn namespace(&self, name: &str) -> Self::Namespace {
LocalNamespace::new(name)
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
LocalNamespace::new(id)
}
}
@@ -283,11 +289,14 @@ mod tests {
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::default();
assert_eq!(
0,
logstore
.append(&ns, EntryImpl::new(generate_data(100), 0),)
.append(EntryImpl::new(
generate_data(96),
0,
LocalNamespace::new(42)
),)
.await
.unwrap()
.entry_id
@@ -296,7 +305,11 @@ mod tests {
assert_eq!(
1,
logstore
.append(&ns, EntryImpl::new(generate_data(100), 1))
.append(EntryImpl::new(
generate_data(96),
1,
LocalNamespace::new(42)
))
.await
.unwrap()
.entry_id
@@ -326,9 +339,13 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::default();
let ns = LocalNamespace::new(42);
let id = logstore
.append(&ns, EntryImpl::new(generate_data(100), 0))
.append(EntryImpl::new(
generate_data(96),
0,
LocalNamespace::new(42),
))
.await
.unwrap()
.entry_id;
@@ -340,5 +357,59 @@ mod tests {
let entries = stream.next().await.unwrap().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id(), 0);
assert_eq!(42, entries[0].namespace_id);
}
#[tokio::test]
pub async fn test_namespace() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 1024 * 1024,
log_file_dir: dir.path().to_str().unwrap().to_string(),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
assert_eq!(
0,
logstore
.append(EntryImpl::new(
generate_data(96),
0,
LocalNamespace::new(42),
))
.await
.unwrap()
.entry_id
);
assert_eq!(
1,
logstore
.append(EntryImpl::new(
generate_data(96),
1,
LocalNamespace::new(43),
))
.await
.unwrap()
.entry_id
);
let stream = logstore.read(&LocalNamespace::new(42), 0).await.unwrap();
tokio::pin!(stream);
let entries = stream.next().await.unwrap().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id(), 0);
assert_eq!(42, entries[0].namespace_id);
let stream = logstore.read(&LocalNamespace::new(43), 0).await.unwrap();
tokio::pin!(stream);
let entries = stream.next().await.unwrap().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id(), 1);
assert_eq!(43, entries[0].namespace_id);
}
}

View File

@@ -1,34 +1,24 @@
use std::sync::Arc;
use store_api::logstore::namespace::Namespace;
use store_api::logstore::namespace::{Id, Namespace};
#[derive(Clone, Debug)]
pub struct LocalNamespace {
inner: Arc<LocalNamespaceInner>,
pub(crate) id: Id,
}
impl Default for LocalNamespace {
fn default() -> Self {
LocalNamespace::new("")
LocalNamespace::new(0)
}
}
#[derive(Debug)]
struct LocalNamespaceInner {
name: String,
}
impl LocalNamespace {
pub(crate) fn new(name: &str) -> Self {
let inner = Arc::new(LocalNamespaceInner {
name: name.to_string(),
});
Self { inner }
pub(crate) fn new(id: Id) -> Self {
Self { id }
}
}
impl Namespace for LocalNamespace {
fn name(&self) -> &str {
self.inner.name.as_str()
fn id(&self) -> Id {
self.id
}
}

View File

@@ -1,3 +1,4 @@
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::{entry::Id, LogStore};
use crate::error::{Error, Result};
@@ -15,11 +16,7 @@ impl LogStore for NoopLogStore {
type Entry = EntryImpl;
type AppendResponse = AppendResponseImpl;
async fn append(
&self,
_ns: &Self::Namespace,
mut _e: Self::Entry,
) -> Result<Self::AppendResponse> {
async fn append(&self, mut _e: Self::Entry) -> Result<Self::AppendResponse> {
Ok(AppendResponseImpl {
entry_id: 0,
offset: 0,
@@ -51,11 +48,11 @@ impl LogStore for NoopLogStore {
todo!()
}
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id) -> Self::Entry {
EntryImpl::new(data, id)
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
EntryImpl::new(data, id, ns)
}
fn namespace(&self, name: &str) -> Self::Namespace {
LocalNamespace::new(name)
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
LocalNamespace::new(id)
}
}

View File

@@ -120,7 +120,7 @@ impl<S: LogStore> RegionImpl<S> {
let id = metadata.id();
let name = metadata.name().to_string();
let version_control = VersionControl::with_version(version);
let wal = Wal::new(name.clone(), store_config.log_store);
let wal = Wal::new(id, store_config.log_store);
let inner = Arc::new(RegionInner {
shared: Arc::new(SharedData {
@@ -160,7 +160,7 @@ impl<S: LogStore> RegionImpl<S> {
let metadata = version.metadata().clone();
let version_control = Arc::new(VersionControl::with_version(version));
let wal = Wal::new(name.clone(), store_config.log_store);
let wal = Wal::new(metadata.id(), store_config.log_store);
let shared = Arc::new(SharedData {
id: metadata.id(),
name,

View File

@@ -5,8 +5,9 @@ use common_error::prelude::BoxedError;
use futures::{stream, Stream, TryStreamExt};
use prost::Message;
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use store_api::{
logstore::{entry::Entry, namespace::Namespace, AppendResponse, LogStore},
logstore::{entry::Entry, AppendResponse, LogStore},
storage::SequenceNumber,
};
@@ -25,6 +26,7 @@ use crate::{
#[derive(Debug)]
pub struct Wal<S: LogStore> {
name: String,
namespace: S::Namespace,
store: Arc<S>,
}
@@ -37,6 +39,7 @@ pub type WriteBatchStream<'a> = Pin<
impl<S: LogStore> Clone for Wal<S> {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
namespace: self.namespace.clone(),
store: self.store.clone(),
}
@@ -44,16 +47,18 @@ impl<S: LogStore> Clone for Wal<S> {
}
impl<S: LogStore> Wal<S> {
pub fn new(region_name: impl Into<String>, store: Arc<S>) -> Self {
let region_name = region_name.into();
let namespace = store.namespace(&region_name);
Self { namespace, store }
pub fn new(region_id: RegionId, store: Arc<S>) -> Self {
let namespace = store.namespace(region_id);
Self {
name: region_id.to_string(),
namespace,
store,
}
}
#[inline]
pub fn name(&self) -> &str {
self.namespace.name()
&self.name
}
}
@@ -72,11 +77,11 @@ impl<S: LogStore> Wal<S> {
/// +---------------------+----------------------------------------------------+--------------+-------------+--------------+
/// ```
///
pub async fn write_to_wal<'a>(
pub async fn write_to_wal(
&self,
seq: SequenceNumber,
mut header: WalHeader,
payload: Payload<'a>,
payload: Payload<'_>,
) -> Result<(u64, usize)> {
header.payload_type = payload.payload_type();
@@ -134,11 +139,11 @@ impl<S: LogStore> Wal<S> {
}
async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> {
let e = self.store.entry(bytes, seq);
let e = self.store.entry(bytes, seq, self.namespace.clone());
let res = self
.store
.append(&self.namespace, e)
.append(e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
@@ -259,7 +264,7 @@ mod tests {
pub async fn test_write_wal() {
let (log_store, _tmp) =
test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
let wal = Wal::new("test_region", Arc::new(log_store));
let wal = Wal::new(0, Arc::new(log_store));
let res = wal.write(0, b"test1").await.unwrap();
@@ -269,7 +274,7 @@ mod tests {
let res = wal.write(1, b"test2").await.unwrap();
assert_eq!(1, res.0);
assert_eq!(29, res.1);
assert_eq!(5 + 32, res.1);
}
#[tokio::test]
@@ -277,7 +282,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let (log_store, _tmp) =
test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
let wal = Wal::new("test_region", Arc::new(log_store));
let wal = Wal::new(0, Arc::new(log_store));
let header = WalHeader::with_last_manifest_version(111);
let (seq_num, _) = wal.write_to_wal(3, header, Payload::None).await?;

View File

@@ -19,11 +19,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
type AppendResponse: AppendResponse;
/// Append an `Entry` to WAL with given namespace
async fn append(
&self,
ns: &Self::Namespace,
mut e: Self::Entry,
) -> Result<Self::AppendResponse, Self::Error>;
async fn append(&self, mut e: Self::Entry) -> Result<Self::AppendResponse, Self::Error>;
/// Append a batch of entries atomically and return the offset of first entry.
async fn append_batch(
@@ -50,11 +46,11 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
/// Create an entry of the associate Entry type
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id) -> Self::Entry;
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry;
/// Create a namespace of the associate Namespace type
// TODO(sunng87): confusion with `create_namespace`
fn namespace(&self, name: &str) -> Self::Namespace;
fn namespace(&self, id: namespace::Id) -> Self::Namespace;
}
pub trait AppendResponse: Send + Sync {

View File

@@ -1,6 +1,8 @@
use common_base::buffer::{Buffer, BufferMut};
use common_error::ext::ErrorExt;
use crate::logstore::namespace::Namespace;
pub type Offset = usize;
pub type Epoch = u64;
pub type Id = u64;
@@ -8,6 +10,8 @@ pub type Id = u64;
/// Entry is the minimal data storage unit in `LogStore`.
pub trait Entry: Encode + Send + Sync {
type Error: ErrorExt + Send + Sync;
type Namespace: Namespace;
/// Return contained data of entry.
fn data(&self) -> &[u8];
@@ -25,6 +29,8 @@ pub trait Entry: Encode + Send + Sync {
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn namespace(&self) -> Self::Namespace;
}
pub trait Encode: Sized {

View File

@@ -41,6 +41,15 @@ mod tests {
#[snafu(visibility(pub))]
pub struct Error {}
#[derive(Debug, Clone)]
pub struct Namespace {}
impl crate::logstore::Namespace for Namespace {
fn id(&self) -> crate::logstore::namespace::Id {
0
}
}
impl ErrorExt for Error {
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
@@ -70,6 +79,7 @@ mod tests {
impl Entry for SimpleEntry {
type Error = Error;
type Namespace = Namespace;
fn data(&self) -> &[u8] {
&self.data
@@ -96,6 +106,10 @@ mod tests {
fn is_empty(&self) -> bool {
self.data.is_empty()
}
fn namespace(&self) -> Self::Namespace {
Namespace {}
}
}
impl SimpleEntry {

View File

@@ -1,3 +1,5 @@
pub type Id = u64;
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug {
fn name(&self) -> &str;
fn id(&self) -> Id;
}