Compare commits

...

5 Commits

Author       SHA1         Message                                                      Date
Lei, HUANG   a640872cda   fix: parquet native row group pruning support                2023-01-07 21:34:08 +08:00
Lei, HUANG   7e3c59fb51   fix: remove start from LogStore; fix error message (#837)    2023-01-06 15:20:04 +08:00
                          (cherry picked from commit 627d444723)
Lei, HUANG   7bbc679c76   fix: revert script dependenciex                               2023-01-06 15:15:41 +08:00
Lei, HUANG   0b3a2cbcda   fix: revert cargo workspace dependencies                      2023-01-06 15:10:04 +08:00
Lei, HUANG   53ee85cdad   feat: use raft-engine crate to reimplement logstore (#799)   2023-01-06 15:05:55 +08:00
                          (cherry picked from commit 8f5ecefc90)
46 changed files with 1213 additions and 3021 deletions

Cargo.lock (generated), 161 changed lines
View File

@@ -2488,6 +2488,17 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de853764b47027c2e862a995c34978ffa63c1501f2e15f987ba11bd4f9bba193"
[[package]]
name = "fail"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe5e43d0f78a42ad591453aedb1d7ae631ce7ee445c7643691055a9ed8d3b01c"
dependencies = [
"log",
"once_cell",
"rand 0.8.5",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@@ -2698,6 +2709,16 @@ dependencies = [
"syn",
]
[[package]]
name = "fs2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@@ -3165,6 +3186,12 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "if_chain"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed"
[[package]]
name = "indexmap"
version = "1.9.2"
@@ -3532,6 +3559,9 @@ dependencies = [
"futures",
"futures-util",
"hex",
"protobuf",
"protobuf-build",
"raft-engine",
"rand 0.8.5",
"snafu",
"store-api",
@@ -4133,6 +4163,20 @@ dependencies = [
"memoffset 0.6.5",
]
[[package]]
name = "nix"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
dependencies = [
"autocfg",
"bitflags",
"cfg-if 1.0.0",
"libc",
"memoffset 0.6.5",
"pin-utils",
]
[[package]]
name = "nom"
version = "7.1.1"
@@ -4189,6 +4233,17 @@ dependencies = [
"serde",
]
[[package]]
name = "num-derive"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -4313,9 +4368,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.16.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
[[package]]
name = "oorandom"
@@ -5037,6 +5092,33 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
dependencies = [
"cfg-if 1.0.0",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror",
]
[[package]]
name = "prometheus-static-metric"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8f30cdb09c39930b8fa5e0f23cbb895ab3f766b187403a0ba0956fc1ef4f0e5"
dependencies = [
"lazy_static",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "promql"
version = "0.1.0"
@@ -5166,6 +5248,36 @@ dependencies = [
"prost 0.11.3",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
dependencies = [
"bytes",
]
[[package]]
name = "protobuf-build"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb3c02f54ecaf12572c1a60dbdb36b1f8f713a16105881143f2be84cca5bbe3"
dependencies = [
"bitflags",
"protobuf",
"protobuf-codegen",
"regex",
]
[[package]]
name = "protobuf-codegen"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6"
dependencies = [
"protobuf",
]
[[package]]
name = "ptr_meta"
version = "0.1.4"
@@ -5299,6 +5411,39 @@ dependencies = [
"nibble_vec",
]
[[package]]
name = "raft-engine"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b66e735395b7ff12f3ebbb4794006aecb365c4c9a82141279b58b227ac3a8b"
dependencies = [
"byteorder",
"crc32fast",
"crossbeam",
"fail",
"fs2",
"hashbrown 0.12.3",
"hex",
"if_chain",
"lazy_static",
"libc",
"log",
"lz4-sys",
"nix 0.25.1",
"num-derive",
"num-traits",
"parking_lot",
"prometheus",
"prometheus-static-metric",
"protobuf",
"rayon",
"scopeguard",
"serde",
"serde_repr",
"strum",
"thiserror",
]
[[package]]
name = "rand"
version = "0.4.6"
@@ -6230,6 +6375,7 @@ dependencies = [
"snafu",
"sql",
"storage",
"store-api",
"table",
"tempdir",
"tokio",
@@ -6369,6 +6515,17 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_repr"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a5ec9fa74a20ebbe5d9ac23dac1fc96ba0ecfe9f50f2843b52e537b10fbcb4e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"

View File

@@ -367,7 +367,7 @@ pub struct TableEntryValue {
#[cfg(test)]
mod tests {
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::ObjectStore;

View File

@@ -1,10 +1,10 @@
// Copyright 2022 Greptime Team
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -144,12 +144,6 @@ pub enum Error {
source: log_store::error::Error,
},
#[snafu(display("Failed to star log store gc task, source: {}", source))]
StartLogStore {
#[snafu(backtrace)]
source: log_store::error::Error,
},
#[snafu(display("Failed to storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
@@ -367,7 +361,6 @@ impl ErrorExt for Error {
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::StartLogStore { source, .. } => source.status_code(),
}
}

View File

@@ -21,8 +21,8 @@ use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
@@ -36,13 +36,12 @@ use servers::Mode;
use snafu::prelude::*;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, Result, StartLogStoreSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -52,7 +51,7 @@ mod grpc;
mod script;
mod sql;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
// An abstraction to read/write services.
pub struct Instance {
@@ -62,7 +61,6 @@ pub struct Instance {
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
pub(crate) logstore: Arc<LocalFileLogStore>,
}
pub type InstanceRef = Arc<Instance>;
@@ -70,7 +68,7 @@ pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
let meta_client = match opts.mode {
Mode::Standalone => None,
@@ -160,7 +158,6 @@ impl Instance {
script_executor,
heartbeat_task,
table_id_provider,
logstore,
})
}
@@ -169,7 +166,6 @@ impl Instance {
.start()
.await
.context(NewCatalogSnafu)?;
self.logstore.start().await.context(StartLogStoreSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
@@ -275,9 +271,7 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
Ok(meta_client)
}
pub(crate) async fn create_local_file_log_store(
path: impl AsRef<str>,
) -> Result<LocalFileLogStore> {
pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngineLogStore> {
let path = path.as_ref();
// create WAL directory
fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
@@ -289,9 +283,8 @@ pub(crate) async fn create_local_file_log_store(
..Default::default()
};
let log_store = LocalFileLogStore::open(&log_config)
let logstore = RaftEngineLogStore::try_new(log_config)
.await
.context(error::OpenLogStoreSnafu)?;
Ok(log_store)
.context(OpenLogStoreSnafu)?;
Ok(logstore)
}
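
For orientation, the net effect of this hunk: the raft-engine backed store is usable as soon as it is constructed, so the datanode no longer keeps a logstore field around just to call start() on it. Below is a minimal sketch of the new call path using only names shown in this diff; open_wal is a hypothetical helper, and directory creation plus the datanode-side OpenLogStoreSnafu wrapping are omitted for brevity.

use std::sync::Arc;

use log_store::error::Error;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;

// Hypothetical helper mirroring create_log_store() above.
async fn open_wal(wal_dir: &str) -> Result<Arc<RaftEngineLogStore>, Error> {
    let config = LogConfig {
        log_file_dir: wal_dir.to_string(),
        ..Default::default()
    };
    // Unlike the old LocalFileLogStore, no separate start() call is needed afterwards.
    Ok(Arc::new(RaftEngineLogStore::try_new(config).await?))
}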

View File

@@ -29,7 +29,7 @@ use table::table::TableIdProvider;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance};
use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance};
use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;
@@ -41,7 +41,7 @@ impl Instance {
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
@@ -83,7 +83,6 @@ impl Instance {
script_executor,
table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
heartbeat_task: Some(heartbeat_task),
logstore,
})
}
}

View File

@@ -125,7 +125,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::services::fs::Builder;

View File

@@ -4,10 +4,15 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[build-dependencies]
protobuf-build = { version = "0.14", default-features = false, features = [
"protobuf-codec",
] }
[dependencies]
arc-swap = "1.5"
async-stream = "0.3"
async-trait = "0.1"
async-stream="0.3"
async-trait="0.1"
base64 = "0.13"
byteorder = "1.4"
bytes = "1.1"
@@ -16,9 +21,11 @@ common-error = { path = "../common/error" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
crc = "3.0"
futures = "0.3"
futures-util = "0.3"
futures="0.3"
hex = "0.4"
protobuf = { version = "2", features = ["bytes"] }
raft-engine = "0.3"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tempdir = "0.3"

View File

@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(all(unix, not(miri)))]
mod unix;
// todo(hl): maybe support windows seek_write/seek_read
#[cfg(any(not(unix), miri))]
mod fallback;
use protobuf_build::Builder;
#[cfg(any(all(not(unix), not(windows)), miri))]
pub use fallback::{pread_exact, pwrite_all};
#[cfg(all(unix, not(miri)))]
pub use unix::{pread_exact, pwrite_all};
fn main() {
let base = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
Builder::new()
.search_dir_for_protos(&format!("{base}/proto"))
.includes(&[format!("{base}/include"), format!("{base}/proto")])
.include_google_protos()
.generate()
}

View File

@@ -0,0 +1,17 @@
syntax = "proto3";
package logstore;
message EntryImpl {
uint64 id = 1;
uint64 namespace_id = 2;
bytes data = 3;
}
message LogStoreState {
uint64 last_index = 1;
}
message NamespaceImpl {
uint64 id = 1;
}
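
These messages replace the hand-rolled binary entry format that is deleted further down. A hedged sketch of building an entry with the generated type, assuming rust-protobuf 2.x's default codegen for the build.rs step shown above; the import path is hypothetical, and set_data may take bytes::Bytes instead of Vec<u8> depending on codegen options.

use crate::proto::logstore::EntryImpl; // hypothetical path to the generated module

fn make_entry(id: u64, namespace_id: u64, data: Vec<u8>) -> EntryImpl {
    let mut entry = EntryImpl::new();
    entry.set_id(id);
    entry.set_namespace_id(namespace_id);
    entry.set_data(data);
    entry
}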

View File

@@ -20,6 +20,9 @@ pub struct LogConfig {
pub max_log_file_size: usize,
pub log_file_dir: String,
pub gc_interval: Duration,
pub purge_threshold: usize,
pub read_batch_size: usize,
pub sync_write: bool,
}
impl Default for LogConfig {
@@ -31,6 +34,9 @@ impl Default for LogConfig {
max_log_file_size: 1024 * 1024 * 1024,
log_file_dir: "/tmp/greptimedb".to_string(),
gc_interval: Duration::from_secs(10 * 60),
purge_threshold: 1024 * 1024 * 1024 * 50,
read_batch_size: 128,
sync_write: false,
}
}
}
@@ -49,5 +55,8 @@ mod tests {
assert_eq!(1024 * 1024 * 1024, default.max_log_file_size);
assert_eq!(128, default.append_buffer_size);
assert_eq!(Duration::from_secs(600), default.gc_interval);
assert_eq!(1024 * 1024 * 1024 * 50, default.purge_threshold);
assert_eq!(128, default.read_batch_size);
assert!(!default.sync_write);
}
}
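
The three new fields presumably map onto raft-engine's purge, read-batch, and sync knobs; their exact semantics are inferred from the names here, not from documentation. A short sketch of overriding them (values are illustrative, not recommendations; tuned_wal_config is a hypothetical helper):

use std::time::Duration;

use log_store::LogConfig;

// Hypothetical helper; field meanings inferred from their names.
fn tuned_wal_config(dir: &str) -> LogConfig {
    LogConfig {
        log_file_dir: dir.to_string(),
        purge_threshold: 10 * 1024 * 1024 * 1024, // purge once the WAL grows past ~10 GiB
        read_batch_size: 256,                     // entries fetched per read batch
        sync_write: true,                         // fsync on every append
        gc_interval: Duration::from_secs(600),
        ..Default::default()
    }
}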

View File

@@ -14,7 +14,6 @@
use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::{ErrorExt, Snafu};
use snafu::{Backtrace, ErrorCompat};
use tokio::task::JoinError;
@@ -22,83 +21,46 @@ use tokio::task::JoinError;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to encode entry, source: {}", source))]
Encode { source: common_base::buffer::Error },
#[snafu(display("Failed to decode entry, remain size: {}", size))]
Decode { size: usize, backtrace: Backtrace },
#[snafu(display("No enough data to decode, try again"))]
DecodeAgain,
#[snafu(display("Failed to append entry, source: {}", source))]
Append {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to wait for log file write complete, source: {}", source))]
Write { source: tokio::task::JoinError },
#[snafu(display("Entry corrupted, msg: {}", msg))]
Corrupted { msg: String, backtrace: Backtrace },
#[snafu(display("IO error, source: {}", source))]
Io {
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to create path {}, source: {}", path, source))]
CreateDir {
path: String,
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to read path {}, source: {}", path, source))]
ReadPath {
path: String,
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to open log file {}, source: {}", file_name, source))]
OpenLog {
file_name: String,
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("File name {} illegal", file_name))]
FileNameIllegal {
file_name: String,
backtrace: Backtrace,
},
#[snafu(display("Internal error, msg: {}", msg))]
Internal { msg: String, backtrace: Backtrace },
#[snafu(display("End of LogFile"))]
Eof,
#[snafu(display("File duplicate on start: {}", msg))]
DuplicateFile { msg: String },
#[snafu(display("Log file suffix is illegal: {}", suffix))]
SuffixIllegal { suffix: String },
#[snafu(display("Failed while waiting for write to finish, source: {}", source))]
WaitWrite { source: tokio::task::JoinError },
#[snafu(display("Invalid logstore status, msg: {}", msg))]
InvalidState { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to wait for gc task to stop, source: {}", source))]
WaitGcTaskStop {
source: JoinError,
backtrace: Backtrace,
},
#[snafu(display("Failed to add entry to LogBatch, source: {}", source))]
AddEntryLogBatch {
source: raft_engine::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to perform raft-engine operation, source: {}", source))]
RaftEngine {
source: raft_engine::Error,
backtrace: Backtrace,
},
#[snafu(display("Log store not started yet"))]
IllegalState { backtrace: Backtrace },
#[snafu(display("Namespace is illegal: {}", ns))]
IllegalNamespace { ns: u64, backtrace: Backtrace },
#[snafu(display(
"Failed to fetch entries from namespace: {}, start: {}, end: {}, max size: {}, source: {}",
ns,
start,
end,
max_size,
source
))]
FetchEntry {
ns: u64,
start: u64,
end: u64,
max_size: usize,
source: raft_engine::Error,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {

View File

@@ -1,46 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::logstore::entry::{Id, Offset};
use store_api::logstore::AppendResponse;
mod chunk;
pub mod config;
mod crc;
mod entry;
mod file;
mod file_name;
mod index;
mod io;
pub mod log;
mod namespace;
pub mod noop;
#[derive(Debug, PartialEq, Eq)]
pub struct AppendResponseImpl {
entry_id: Id,
offset: Offset,
}
impl AppendResponse for AppendResponseImpl {
#[inline]
fn entry_id(&self) -> Id {
self.entry_id
}
#[inline]
fn offset(&self) -> Offset {
self.offset
}
}

View File

@@ -1,233 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::LinkedList;
use common_base::buffer::{Buffer, UnderflowSnafu};
use snafu::ensure;
pub const DEFAULT_CHUNK_SIZE: usize = 4096;
#[derive(Debug)]
pub(crate) struct Chunk {
// internal data
pub data: Box<[u8]>,
// read offset
pub read_offset: usize,
// write offset
pub write_offset: usize,
}
impl Default for Chunk {
fn default() -> Self {
let data = vec![0u8; DEFAULT_CHUNK_SIZE].into_boxed_slice();
Self {
write_offset: 0,
read_offset: 0,
data,
}
}
}
impl Chunk {
#[cfg(test)]
pub fn copy_from_slice(s: &[u8]) -> Self {
let src_len = s.len();
// before [box syntax](https://github.com/rust-lang/rust/issues/49733) becomes stable,
// we can only initialize an array on heap like this.
let mut data = vec![0u8; src_len].into_boxed_slice();
data[0..src_len].copy_from_slice(s);
Self {
read_offset: 0,
write_offset: src_len,
data,
}
}
pub fn new(data: Box<[u8]>, write: usize) -> Self {
Self {
write_offset: write,
read_offset: 0,
data,
}
}
pub fn len(&self) -> usize {
self.write_offset - self.read_offset
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// allows short read.
/// Calling read **will not** advance read cursor, must call `advance` manually.
pub fn read(&self, dst: &mut [u8]) -> usize {
let size = self.len().min(dst.len());
let range = self.read_offset..(self.read_offset + size);
dst[0..size].copy_from_slice(&self.data[range]);
size
}
pub fn advance(&mut self, by: usize) -> usize {
assert!(
self.write_offset >= self.read_offset,
"Illegal chunk state, read: {}, write: {}",
self.read_offset,
self.write_offset
);
let step = by.min(self.write_offset - self.read_offset);
self.read_offset += step;
step
}
}
pub struct ChunkList {
chunks: LinkedList<Chunk>,
}
impl ChunkList {
pub fn new() -> Self {
Self {
chunks: LinkedList::new(),
}
}
pub(crate) fn push(&mut self, chunk: Chunk) {
self.chunks.push_back(chunk);
}
}
impl Buffer for ChunkList {
fn remaining_size(&self) -> usize {
self.chunks.iter().map(|c| c.len()).sum()
}
fn peek_to_slice(&self, mut dst: &mut [u8]) -> common_base::buffer::Result<()> {
ensure!(self.remaining_size() >= dst.len(), UnderflowSnafu);
for c in &self.chunks {
if dst.is_empty() {
break;
}
let read = c.read(dst);
dst = &mut dst[read..];
}
ensure!(dst.is_empty(), UnderflowSnafu);
Ok(())
}
fn read_to_slice(&mut self, dst: &mut [u8]) -> common_base::buffer::Result<()> {
self.peek_to_slice(dst)?;
self.advance_by(dst.len());
Ok(())
}
fn advance_by(&mut self, by: usize) {
let mut left = by;
while left > 0 {
if let Some(c) = self.chunks.front_mut() {
let actual = c.advance(left);
if c.is_empty() {
self.chunks.pop_front(); // remove first chunk
}
left -= actual;
} else {
panic!("Advance step [{by}] exceeds max readable bytes");
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_chunk() {
let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes());
assert_eq!(5, chunk.write_offset);
assert_eq!(0, chunk.read_offset);
assert_eq!(5, chunk.len());
let mut dst = [0u8; 3];
assert_eq!(3, chunk.read(&mut dst));
assert_eq!(5, chunk.write_offset);
assert_eq!(0, chunk.read_offset);
assert_eq!(5, chunk.len());
}
#[test]
pub fn test_chunk_short_read() {
let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes());
let mut dst = vec![0u8; 8];
let read = chunk.read(&mut dst);
assert_eq!(5, read);
assert_eq!(vec![b'h', b'e', b'l', b'l', b'o', 0x0, 0x0, 0x0], dst);
}
#[test]
pub fn test_chunk_advance() {
let mut chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes());
let mut dst = vec![0u8; 8];
assert_eq!(5, chunk.read(&mut dst));
assert_eq!(0, chunk.read_offset);
assert_eq!(5, chunk.write_offset);
assert_eq!(1, chunk.advance(1));
assert_eq!(1, chunk.read_offset);
assert_eq!(5, chunk.write_offset);
assert_eq!(4, chunk.advance(5));
assert_eq!(5, chunk.read_offset);
assert_eq!(5, chunk.write_offset);
}
#[test]
pub fn test_composite_chunk_read() {
let mut chunks = ChunkList {
chunks: LinkedList::new(),
};
chunks.push(Chunk::copy_from_slice("abcd".as_bytes()));
chunks.push(Chunk::copy_from_slice("12345".as_bytes()));
assert_eq!(9, chunks.remaining_size());
let mut dst = [0u8; 2];
chunks.peek_to_slice(&mut dst).unwrap();
chunks.advance_by(2);
assert_eq!([b'a', b'b'], dst);
assert_eq!(2, chunks.chunks.len());
let mut dst = [0u8; 3];
chunks.peek_to_slice(&mut dst).unwrap();
chunks.advance_by(3);
assert_eq!([b'c', b'd', b'1'], dst);
assert_eq!(4, chunks.remaining_size());
assert_eq!(1, chunks.chunks.len());
let mut dst = [0u8; 4];
chunks.peek_to_slice(&mut dst).unwrap();
chunks.advance_by(4);
assert_eq!([b'2', b'3', b'4', b'5'], dst);
assert_eq!(0, chunks.remaining_size());
assert_eq!(0, chunks.chunks.len());
chunks.push(Chunk::copy_from_slice("uvwxyz".as_bytes()));
assert_eq!(6, chunks.remaining_size());
assert_eq!(1, chunks.chunks.len());
}
}

View File

@@ -1,17 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crc::{Crc, CRC_32_ISCSI};
pub const CRC_ALGO: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);

View File

@@ -1,394 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut};
use common_base::buffer::{Buffer, BufferMut};
use futures::Stream;
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::{Encode, Entry, Epoch, Id, Offset};
use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use crate::error::{CorruptedSnafu, DecodeAgainSnafu, DecodeSnafu, EncodeSnafu, Error};
use crate::fs::crc;
use crate::fs::namespace::LocalNamespace;
// length + offset + namespace id + epoch + crc
const ENTRY_MIN_LEN: usize = HEADER_LENGTH + 4;
// length + offset + namespace id + epoch
const HEADER_LENGTH: usize = 4 + 8 + 8 + 8;
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct EntryImpl {
pub data: Vec<u8>,
pub offset: Offset,
pub id: Id,
pub namespace_id: NamespaceId,
pub epoch: Epoch,
}
impl EntryImpl {
#[cfg(test)]
fn set_offset(&mut self, offset: Offset) {
self.offset = offset;
}
}
impl Encode for EntryImpl {
type Error = Error;
/// Entry binary format (Little endian):
///
// ```text
// +--------+--------------+-------+--------+--------+--------+
// |entry id| namespace id | epoch | length | data | CRC |
// +--------+--------------+-------+--------+--------+--------+
// | 8 bytes| 8 bytes |8 bytes| 4 bytes|<length>| 4 bytes|
// +--------+--------------+-------+--------+--------+--------+
// ```
///
fn encode_to<T: BufferMut>(&self, buf: &mut T) -> Result<usize, Self::Error> {
let data_length = self.data.len();
buf.write_u64_le(self.id).context(EncodeSnafu)?;
buf.write_u64_le(self.namespace_id).context(EncodeSnafu)?;
buf.write_u64_le(self.epoch).context(EncodeSnafu)?;
buf.write_u32_le(data_length as u32).context(EncodeSnafu)?;
buf.write_from_slice(self.data.as_slice())
.context(EncodeSnafu)?;
let checksum = crc::CRC_ALGO.checksum(buf.as_slice());
buf.write_u32_le(checksum).context(EncodeSnafu)?;
Ok(data_length + ENTRY_MIN_LEN)
}
fn decode<T: Buffer>(buf: &mut T) -> Result<Self, Self::Error> {
ensure!(buf.remaining_size() >= HEADER_LENGTH, DecodeAgainSnafu);
macro_rules! map_err {
($stmt: expr, $var: ident) => {
$stmt.map_err(|_| {
DecodeSnafu {
size: $var.remaining_size(),
}
.build()
})
};
}
let mut digest = crc::CRC_ALGO.digest();
let mut header = [0u8; HEADER_LENGTH];
buf.peek_to_slice(&mut header).unwrap();
let mut header = &header[..];
let id = header.read_u64_le().unwrap(); // unwrap here is safe because header bytes must be present
digest.update(&id.to_le_bytes());
let namespace_id = header.read_u64_le().unwrap();
digest.update(&namespace_id.to_le_bytes());
let epoch = header.read_u64_le().unwrap();
digest.update(&epoch.to_le_bytes());
let data_len = header.read_u32_le().unwrap();
digest.update(&data_len.to_le_bytes());
ensure!(
buf.remaining_size() >= ENTRY_MIN_LEN + data_len as usize,
DecodeAgainSnafu
);
buf.advance_by(HEADER_LENGTH);
let mut data = vec![0u8; data_len as usize];
map_err!(buf.peek_to_slice(&mut data), buf)?;
digest.update(&data);
buf.advance_by(data_len as usize);
let crc_read = map_err!(buf.peek_u32_le(), buf)?;
let crc_calc = digest.finalize();
ensure!(
crc_read == crc_calc,
CorruptedSnafu {
msg: format!(
"CRC mismatch while decoding entry, read: {}, calc: {}",
hex::encode_upper(crc_read.to_le_bytes()),
hex::encode_upper(crc_calc.to_le_bytes())
)
}
);
buf.advance_by(4);
Ok(Self {
id,
data,
epoch,
offset: 0,
namespace_id,
})
}
fn encoded_size(&self) -> usize {
self.data.len() + ENTRY_MIN_LEN
}
}
impl EntryImpl {
pub(crate) fn new(data: impl AsRef<[u8]>, id: Id, namespace: LocalNamespace) -> EntryImpl {
EntryImpl {
id,
data: data.as_ref().to_vec(),
offset: 0,
epoch: 0,
namespace_id: namespace.id(),
}
}
}
impl Entry for EntryImpl {
type Error = Error;
type Namespace = LocalNamespace;
fn data(&self) -> &[u8] {
&self.data
}
fn id(&self) -> Id {
self.id
}
fn offset(&self) -> Offset {
self.offset
}
fn set_id(&mut self, id: Id) {
self.id = id;
}
fn epoch(&self) -> Epoch {
self.epoch
}
fn len(&self) -> usize {
ENTRY_MIN_LEN + self.data.len()
}
fn is_empty(&self) -> bool {
self.data.is_empty()
}
fn namespace(&self) -> Self::Namespace {
LocalNamespace::new(self.namespace_id)
}
}
impl TryFrom<Bytes> for EntryImpl {
type Error = Error;
fn try_from(mut value: Bytes) -> Result<Self, Self::Error> {
EntryImpl::decode(&mut value)
}
}
impl From<&EntryImpl> for BytesMut {
fn from(e: &EntryImpl) -> Self {
let size = e.encoded_size();
let mut res = BytesMut::with_capacity(size);
e.encode_to(&mut res).unwrap(); // buffer is pre-allocated, so won't fail
res
}
}
pub struct StreamImpl<'a> {
pub inner: SendableEntryStream<'a, EntryImpl, Error>,
pub start_entry_id: Id,
}
impl<'a> Stream for StreamImpl<'a> {
type Item = Result<Vec<EntryImpl>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<'a> EntryStream for StreamImpl<'a> {
type Error = Error;
type Entry = EntryImpl;
fn start_id(&self) -> u64 {
self.start_entry_id
}
}
#[cfg(test)]
mod tests {
use async_stream::stream;
use byteorder::{ByteOrder, LittleEndian};
use futures::pin_mut;
use futures_util::StreamExt;
use tokio::time::Duration;
use super::*;
use crate::fs::chunk::{Chunk, ChunkList};
use crate::fs::crc::CRC_ALGO;
#[test]
pub fn test_entry_deser() {
let data = "hello, world";
let mut entry = EntryImpl::new(data.as_bytes(), 8, LocalNamespace::new(42));
entry.epoch = 9;
let mut buf = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buf).unwrap();
assert_eq!(ENTRY_MIN_LEN + data.as_bytes().len(), buf.len());
let decoded: EntryImpl = EntryImpl::decode(&mut buf.as_slice()).unwrap();
assert_eq!(entry, decoded);
}
#[test]
pub fn test_rewrite_entry_id() {
let data = "hello, world";
let entry = EntryImpl::new(data.as_bytes(), 123, LocalNamespace::new(42));
let mut buffer = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buffer).unwrap();
assert_eq!(123, entry.id());
// rewrite entry id.
LittleEndian::write_u64(&mut buffer[0..8], 333);
let len = buffer.len();
let checksum = CRC_ALGO.checksum(&buffer[0..len - 4]);
LittleEndian::write_u32(&mut buffer[len - 4..], checksum);
let entry_impl = EntryImpl::decode(&mut buffer.freeze()).expect("Failed to deserialize");
assert_eq!(333, entry_impl.id());
}
fn prepare_entry_bytes(data: &str, id: Id) -> Bytes {
let mut entry = EntryImpl::new(data.as_bytes(), id, LocalNamespace::new(42));
entry.set_id(123);
entry.set_offset(456);
let mut buffer = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buffer).unwrap();
let len = buffer.len();
let checksum = CRC_ALGO.checksum(&buffer[0..len - 4]);
LittleEndian::write_u32(&mut buffer[len - 4..], checksum);
buffer.freeze()
}
/// Test decode entry from a composite buffer.
#[test]
pub fn test_composite_buffer() {
let data_1 = "hello, world";
let bytes = prepare_entry_bytes(data_1, 0);
EntryImpl::decode(&mut bytes.clone()).unwrap();
let c1 = Chunk::copy_from_slice(&bytes);
let data_2 = "LoremIpsumDolor";
let bytes = prepare_entry_bytes(data_2, 1);
EntryImpl::decode(&mut bytes.clone()).unwrap();
let c2 = Chunk::copy_from_slice(&bytes);
let mut chunks = ChunkList::new();
chunks.push(c1);
chunks.push(c2);
assert_eq!(
ENTRY_MIN_LEN * 2 + data_2.len() + data_1.len(),
chunks.remaining_size()
);
let mut decoded = vec![];
while chunks.remaining_size() > 0 {
let entry_impl = EntryImpl::decode(&mut chunks).unwrap();
decoded.push(entry_impl.data);
}
assert_eq!(
vec![data_1.as_bytes().to_vec(), data_2.as_bytes().to_vec()],
decoded
);
}
// split an encoded entry to two different chunk and try decode from this composite chunk
#[test]
pub fn test_decode_split_data_from_composite_chunk() {
let data = "hello, world";
let bytes = prepare_entry_bytes(data, 42);
assert_eq!(
hex::decode("7B000000000000002A0000000000000000000000000000000C00000068656C6C6F2C20776F726C64E8EE2E57")
.unwrap()
.as_slice(),
&bytes[..]
);
let original = EntryImpl::decode(&mut bytes.clone()).unwrap();
let split_point = bytes.len() / 2;
let (left, right) = bytes.split_at(split_point);
let mut chunks = ChunkList::new();
chunks.push(Chunk::copy_from_slice(left));
chunks.push(Chunk::copy_from_slice(right));
assert_eq!(bytes.len(), chunks.remaining_size());
let decoded = EntryImpl::decode(&mut chunks).unwrap();
assert_eq!(original, decoded);
}
// Tests decode entry from encoded entry data as two chunks
#[tokio::test]
pub async fn test_decode_from_chunk_stream() {
// prepare entry
let data = "hello, world";
let bytes = prepare_entry_bytes(data, 42);
assert_eq!(
hex::decode("7b000000000000002a0000000000000000000000000000000c00000068656c6c6f2c20776f726c64e8ee2e57")
.unwrap()
.as_slice(),
&bytes[..]
);
let original = EntryImpl::decode(&mut bytes.clone()).unwrap();
let split_point = bytes.len() / 2;
let (left, right) = bytes.split_at(split_point);
// prepare chunk stream
let chunk_stream = stream!({
yield Chunk::copy_from_slice(left);
tokio::time::sleep(Duration::from_millis(10)).await;
yield Chunk::copy_from_slice(right);
});
pin_mut!(chunk_stream);
let mut chunks = ChunkList::new();
let mut decoded = vec![];
while let Some(c) = chunk_stream.next().await {
chunks.push(c);
match EntryImpl::decode(&mut chunks) {
Ok(e) => {
decoded.push(e);
}
Err(Error::DecodeAgain { .. }) => {
continue;
}
_ => {
panic!()
}
}
}
assert_eq!(1, decoded.len());
assert_eq!(original, decoded.into_iter().next().unwrap());
}
}

View File

@@ -1,884 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Debug, Formatter};
use std::fs::{File, OpenOptions};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use async_stream::stream;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use common_error::ext::BoxedError;
use common_telemetry::logging::{error, info};
use common_telemetry::{debug, trace};
use futures::Stream;
use futures_util::StreamExt;
use snafu::ResultExt;
use store_api::logstore::entry::{Encode, Entry, Id, Offset};
use store_api::logstore::entry_stream::EntryStream;
use store_api::logstore::namespace::Namespace;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{Receiver, Sender as MpscSender};
use tokio::sync::oneshot::Sender as OneshotSender;
use tokio::sync::{oneshot, Notify};
use tokio::task::JoinHandle;
use tokio::time;
use crate::error::Error::Eof;
use crate::error::{
AppendSnafu, Error, InternalSnafu, IoSnafu, OpenLogSnafu, Result, WaitWriteSnafu, WriteSnafu,
};
use crate::fs::chunk::{Chunk, ChunkList};
use crate::fs::config::LogConfig;
use crate::fs::crc::CRC_ALGO;
use crate::fs::entry::{EntryImpl, StreamImpl};
use crate::fs::file_name::FileName;
use crate::fs::namespace::LocalNamespace;
use crate::fs::AppendResponseImpl;
pub const CHUNK_SIZE: usize = 4096;
const LOG_WRITER_BATCH_SIZE: usize = 16;
/// Wraps File operation to get rid of `&mut self` requirements
struct FileWriter {
inner: Arc<File>,
path: String,
}
impl FileWriter {
pub fn new(file: Arc<File>, path: String) -> Self {
Self { inner: file, path }
}
pub async fn write(&self, data: Bytes, offset: u64) -> Result<()> {
let file = self.inner.clone();
let handle = common_runtime::spawn_blocking_write(move || {
crate::fs::io::pwrite_all(&file, &data, offset)
});
handle.await.context(WriteSnafu)?
}
/// Writes a batch of `AppendRequest` to file.
pub async fn write_batch(self: &Arc<Self>, batch: &Vec<AppendRequest>) -> Result<usize> {
let mut futures = Vec::with_capacity(batch.len());
let mut max_offset = 0;
for req in batch {
let offset = req.offset;
let end = req.data.len() + offset;
max_offset = max_offset.max(end);
let future = self.write(req.data.clone(), offset as u64);
futures.push(future);
}
debug!(
"Write batch, size: {}, max offset: {}",
batch.len(),
max_offset
);
futures::future::join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
.map(|_| max_offset)
}
pub async fn flush(&self) -> Result<()> {
let file = self.inner.clone();
common_runtime::spawn_blocking_write(move || file.sync_all().context(IoSnafu))
.await
.context(WaitWriteSnafu)?
}
pub async fn destroy(&self) -> Result<()> {
tokio::fs::remove_file(&self.path).await.context(IoSnafu)?;
Ok(())
}
}
pub type LogFileRef = Arc<LogFile>;
pub struct LogFile {
// name of log file
name: FileName,
// file writer
writer: Arc<FileWriter>,
// append request channel
pending_request_tx: Option<MpscSender<AppendRequest>>,
// flush task notifier
notify: Arc<Notify>,
// flush task join handle
join_handle: Mutex<Option<JoinHandle<Result<()>>>>,
// internal state(offset, id counter...)
state: Arc<State>,
// the start entry id of current log file
start_entry_id: u64,
// max file size of current log file
max_file_size: usize,
// buffer size for append request channel. read from config on start.
append_buffer_size: usize,
}
impl Drop for LogFile {
fn drop(&mut self) {
self.state.stopped.store(true, Ordering::Relaxed);
info!("Dropping log file {}", self.name);
}
}
impl LogFile {
/// Opens a file in path with given log config.
pub async fn open(path: impl Into<String>, config: &LogConfig) -> Result<Self> {
let path = path.into();
let file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(path.clone())
.context(OpenLogSnafu { file_name: &path })?;
let file_name = FileName::try_from(path.as_str())?;
let start_entry_id = file_name.entry_id();
let mut log = Self {
name: file_name,
writer: Arc::new(FileWriter::new(Arc::new(file), path.clone())),
start_entry_id,
pending_request_tx: None,
notify: Arc::new(Notify::new()),
max_file_size: config.max_log_file_size,
join_handle: Mutex::new(None),
state: Arc::new(State::default()),
append_buffer_size: config.append_buffer_size,
};
let metadata = log.writer.inner.metadata().context(IoSnafu)?;
let expect_length = metadata.len() as usize;
log.state
.write_offset
.store(expect_length, Ordering::Relaxed);
log.state
.flush_offset
.store(expect_length, Ordering::Relaxed);
let replay_start_time = time::Instant::now();
let (actual_offset, next_entry_id) = log.replay().await?;
info!(
"Log file {} replay finished, last offset: {}, file start entry id: {}, elapsed time: {}ms",
path, actual_offset, start_entry_id,
time::Instant::now().duration_since(replay_start_time).as_millis()
);
log.state
.write_offset
.store(actual_offset, Ordering::Relaxed);
log.state
.flush_offset
.store(actual_offset, Ordering::Relaxed);
log.state
.last_entry_id
.store(next_entry_id, Ordering::Relaxed);
Ok(log)
}
/// Returns the persisted size of current log file.
#[inline]
pub fn persisted_size(&self) -> usize {
self.state.flush_offset()
}
/// Starts log file and it's internal components(including flush task, etc.).
pub async fn start(&mut self) -> Result<()> {
let notify = self.notify.clone();
let writer = self.writer.clone();
let state = self.state.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(self.append_buffer_size);
let handle = tokio::spawn(async move {
while !state.is_stopped() {
let batch = Self::recv_batch(&mut rx, &state, &notify, true).await;
debug!("Receive write request, size: {}", batch.len());
if !batch.is_empty() {
Self::handle_batch(batch, &state, &writer).await;
}
}
// log file stopped
let batch = Self::recv_batch(&mut rx, &state, &notify, false).await;
if !batch.is_empty() {
Self::handle_batch(batch, &state, &writer).await;
}
info!("Writer task finished");
Ok(())
});
self.pending_request_tx = Some(tx);
*self.join_handle.lock().unwrap() = Some(handle);
info!("Flush task started: {}", self.name);
Ok(())
}
/// Stops log file.
/// # Panics
/// Panics when a log file is stopped while not being started ever.
pub async fn stop(&self) -> Result<()> {
self.state.stopped.store(true, Ordering::Release);
let join_handle = self
.join_handle
.lock()
.unwrap()
.take()
.expect("Join handle should present");
self.notify.notify_waiters();
let res = join_handle.await.unwrap();
info!("LogFile task finished: {:?}", res);
res
}
pub async fn destroy(&self) -> Result<()> {
self.writer.destroy().await?;
Ok(())
}
async fn handle_batch(
mut batch: Vec<AppendRequest>,
state: &Arc<State>,
writer: &Arc<FileWriter>,
) {
// preserve previous write offset
let prev_write_offset = state.write_offset();
let mut last_id = 0;
for mut req in &mut batch {
req.offset = state
.write_offset
.fetch_add(req.data.len(), Ordering::AcqRel);
last_id = req.id;
debug!("Entry id: {}, offset: {}", req.id, req.offset,);
}
match writer.write_batch(&batch).await {
Ok(max_offset) => match writer.flush().await {
Ok(_) => {
let prev_ofs = state.flush_offset.swap(max_offset, Ordering::Acquire);
let prev_id = state.last_entry_id.swap(last_id, Ordering::Acquire);
debug!(
"Flush offset: {} -> {}, max offset in batch: {}, entry id: {}->{}",
prev_ofs,
state.flush_offset.load(Ordering::Acquire),
max_offset,
prev_id,
state.last_entry_id.load(Ordering::Acquire),
);
batch.into_iter().for_each(AppendRequest::complete);
}
Err(e) => {
error!(e; "Failed to flush log file");
batch.into_iter().for_each(|r| r.fail());
state
.write_offset
.store(prev_write_offset, Ordering::Release);
}
},
Err(e) => {
error!(e; "Failed to write append requests");
batch.into_iter().for_each(|r| r.fail());
state
.write_offset
.store(prev_write_offset, Ordering::Release);
}
}
}
async fn recv_batch(
rx: &mut Receiver<AppendRequest>,
state: &Arc<State>,
notify: &Arc<Notify>,
wait_on_empty: bool,
) -> Vec<AppendRequest> {
let mut batch: Vec<AppendRequest> = Vec::with_capacity(LOG_WRITER_BATCH_SIZE);
for _ in 0..LOG_WRITER_BATCH_SIZE {
match rx.try_recv() {
Ok(req) => {
batch.push(req);
}
Err(e) => match e {
TryRecvError::Empty => {
if batch.is_empty() && wait_on_empty {
notify.notified().await;
if state.is_stopped() {
break;
}
} else {
break;
}
}
TryRecvError::Disconnected => {
error!("Channel unexpectedly disconnected!");
break;
}
},
}
}
batch
}
#[inline]
pub fn start_entry_id(&self) -> Id {
self.start_entry_id
}
/// Replays current file til last entry read
pub async fn replay(&mut self) -> Result<(usize, Id)> {
let log_name = self.name.to_string();
let previous_offset = self.state.flush_offset();
let ns = LocalNamespace::default();
let mut stream = self.create_stream(
// TODO(hl): LocalNamespace should be filled
&ns, 0,
);
let mut last_offset = 0usize;
let mut last_entry_id: Option<Id> = None;
while let Some(res) = stream.next().await {
match res {
Ok(entries) => {
for e in entries {
last_offset += e.len();
last_entry_id = Some(e.id());
}
}
Err(e) => {
error!(e; "Error while replay log {}", log_name);
break;
}
}
}
info!(
"Replay log {} finished, offset: {} -> {}, last entry id: {:?}",
log_name, previous_offset, last_offset, last_entry_id
);
Ok((last_offset, last_entry_id.unwrap_or(self.start_entry_id)))
}
/// Creates a reader stream that asynchronously generates entries start from given entry id.
/// ### Notice
/// If the entry with start entry id is not present, the first generated entry will start with
/// the first entry with an id greater than `start_entry_id`.
pub fn create_stream(
&self,
_ns: &impl Namespace,
start_entry_id: u64,
) -> impl EntryStream<Entry = EntryImpl, Error = Error> + '_ {
let length = self.state.flush_offset.load(Ordering::Relaxed);
let mut chunk_stream = file_chunk_stream(self.writer.inner.clone(), 0, length, 0);
let entry_stream = stream!({
let mut chunks = ChunkList::new();
while let Some(chunk) = chunk_stream.next().await {
let chunk = chunk.unwrap();
chunks.push(chunk);
let mut batch = vec![];
loop {
match EntryImpl::decode(&mut chunks) {
Ok(e) => {
if e.id() >= start_entry_id {
batch.push(e);
}
}
Err(Error::DecodeAgain { .. }) => {
// no more data for decoding
break;
}
Err(e) => {
yield Err(e);
break;
}
}
}
trace!("Yield batch size: {}", batch.len());
yield Ok(batch);
}
});
StreamImpl {
inner: Box::pin(entry_stream),
start_entry_id,
}
}
/// Appends an entry to `LogFile` and return a `Result` containing the id of entry appended.
pub async fn append<T: Entry>(&self, e: &mut T) -> Result<AppendResponseImpl>
where
T: Encode<Error = Error>,
{
if self.state.is_stopped() {
return Err(Error::Eof);
}
let entry_id = e.id();
let mut serialized = BytesMut::with_capacity(e.encoded_size());
e.encode_to(&mut serialized)
.map_err(BoxedError::new)
.context(AppendSnafu)?;
let size = serialized.len();
if size + self.state.write_offset() > self.max_file_size {
return Err(Error::Eof);
}
// rewrite encoded data
LittleEndian::write_u64(&mut serialized[0..8], entry_id);
let checksum = CRC_ALGO.checksum(&serialized[0..size - 4]);
LittleEndian::write_u32(&mut serialized[size - 4..], checksum);
let (tx, rx) = oneshot::channel();
self.pending_request_tx
.as_ref()
.expect("Call start before write to LogFile!")
.send(AppendRequest {
data: serialized.freeze(),
tx,
offset: 0,
id: entry_id,
})
.await
.map_err(|_| {
InternalSnafu {
msg: "Send append request",
}
.build()
})?;
self.notify.notify_one(); // notify write thread.
rx.await
.expect("Sender dropped while waiting for append result")
.map_err(|_| {
InternalSnafu {
msg: "Failed to write request".to_string(),
}
.build()
})
}
#[inline]
pub fn try_seal(&self) -> bool {
self.state
.sealed
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
#[inline]
pub fn is_seal(&self) -> bool {
self.state.sealed.load(Ordering::Acquire)
}
#[inline]
pub fn is_stopped(&self) -> bool {
self.state.stopped.load(Ordering::Acquire)
}
#[inline]
pub fn unseal(&self) {
self.state.sealed.store(false, Ordering::Release);
}
#[inline]
pub fn file_name(&self) -> String {
self.name.to_string()
}
#[inline]
pub fn last_entry_id(&self) -> Id {
self.state.last_entry_id.load(Ordering::Acquire)
}
}
impl Debug for LogFile {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LogFile")
.field("name", &self.name)
.field("start_entry_id", &self.start_entry_id)
.field("max_file_size", &self.max_file_size)
.field("state", &self.state)
.finish()
}
}
#[derive(Debug)]
pub(crate) struct AppendRequest {
tx: OneshotSender<std::result::Result<AppendResponseImpl, ()>>,
offset: Offset,
id: Id,
data: Bytes,
}
impl AppendRequest {
#[inline]
pub fn complete(self) {
let _ = self.tx.send(Ok(AppendResponseImpl {
offset: self.offset,
entry_id: self.id,
}));
}
#[inline]
pub fn fail(self) {
let _ = self.tx.send(Err(()));
}
}
#[derive(Default, Debug)]
struct State {
write_offset: AtomicUsize,
flush_offset: AtomicUsize,
last_entry_id: AtomicU64,
sealed: AtomicBool,
stopped: AtomicBool,
}
impl State {
#[inline]
pub fn is_stopped(&self) -> bool {
self.stopped.load(Ordering::Acquire)
}
#[inline]
pub fn write_offset(&self) -> usize {
self.write_offset.load(Ordering::Acquire)
}
#[inline]
pub fn flush_offset(&self) -> usize {
self.flush_offset.load(Ordering::Acquire)
}
}
type SendableChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk>> + Send>>;
/// Creates a stream of chunks of data from file. If `buffer_size` is not 0, the returned stream
/// will have a bounded buffer and a background thread will do prefetching. When consumer cannot
/// catch up with spawned prefetch loop, the prefetch thread will be blocked and wait until buffer
/// has enough capacity.
///
/// If the `buffer_size` is 0, there will not be a prefetching thread. File chunks will not be read
/// until stream consumer asks for next chunk.
fn file_chunk_stream(
file: Arc<File>,
mut offset: usize,
file_size: usize,
buffer_size: usize,
) -> SendableChunkStream {
if buffer_size == 0 {
return file_chunk_stream_sync(file, offset, file_size);
}
let (tx, mut rx) = tokio::sync::mpsc::channel(buffer_size);
common_runtime::spawn_blocking_read(move || loop {
if offset >= file_size {
return;
}
match read_at(&file, offset, file_size) {
Ok(data) => {
let data_len = data.len();
if tx.blocking_send(Ok(data)).is_err() {
break;
}
offset += data_len;
continue;
}
Err(e) => {
error!(e; "Failed to read file chunk");
// we're going to break any way so just forget the join result.
let _ = tx.blocking_send(Err(e));
break;
}
}
});
Box::pin(stream!({
while let Some(v) = rx.recv().await {
yield v;
}
}))
}
fn file_chunk_stream_sync(
file: Arc<File>,
mut offset: usize,
file_size: usize,
) -> SendableChunkStream {
let s = stream!({
loop {
if offset >= file_size {
return;
}
match read_at(&file, offset, file_size) {
Ok(data) => {
let data_len = data.len();
yield Ok(data);
offset += data_len;
continue;
}
Err(e) => {
error!(e; "Failed to read file chunk");
yield Err(e);
break;
}
}
}
});
Box::pin(s)
}
/// Reads a chunk of data from file in a blocking manner.
/// The file may not contain enough data to fulfill the whole chunk so only data available
/// is read into chunk. The `write` field of `Chunk` indicates the end of valid data.
fn read_at(file: &Arc<File>, offset: usize, file_length: usize) -> Result<Chunk> {
if offset > file_length {
return Err(Eof);
}
let size = CHUNK_SIZE.min(file_length - offset);
let mut data = Box::new([0u8; CHUNK_SIZE]);
crate::fs::io::pread_exact(file.as_ref(), &mut data[0..size], offset as u64)?;
Ok(Chunk::new(data, size))
}
#[cfg(test)]
mod tests {
use std::io::Read;
use common_telemetry::logging;
use futures::pin_mut;
use futures_util::StreamExt;
use tempdir::TempDir;
use tokio::io::AsyncWriteExt;
use super::*;
use crate::fs::namespace::LocalNamespace;
#[tokio::test]
pub async fn test_create_entry_stream() {
logging::init_default_ut_logging();
let config = LogConfig::default();
let dir = TempDir::new("greptimedb-store-test").unwrap();
let path_buf = dir.path().join("0010.log");
let path = path_buf.to_str().unwrap().to_string();
File::create(path.as_str()).unwrap();
let mut file = LogFile::open(path.clone(), &config)
.await
.unwrap_or_else(|_| panic!("Failed to open file: {path}"));
file.start().await.expect("Failed to start log file");
assert_eq!(
10,
file.append(&mut EntryImpl::new(
"test1".as_bytes(),
10,
LocalNamespace::new(42)
))
.await
.expect("Failed to append entry 1")
.entry_id
);
assert_eq!(
11,
file.append(&mut EntryImpl::new(
"test-2".as_bytes(),
11,
LocalNamespace::new(42)
))
.await
.expect("Failed to append entry 2")
.entry_id
);
let mut log_file = std::fs::File::open(path.clone()).expect("Test log file does not exist");
let metadata = log_file.metadata().expect("Failed to read file metadata");
info!("Log file metadata: {:?}", metadata);
assert_eq!(75, metadata.len()); // 32+5+32+6
let mut content = vec![0; metadata.len() as usize];
log_file
.read_exact(&mut content)
.expect("Read log file failed");
info!(
"Log file {:?} content: {}, size:{}",
dir,
hex::encode(content),
metadata.len()
);
let ns = LocalNamespace::new(42);
let mut stream = file.create_stream(&ns, 0);
let mut data = vec![];
while let Some(v) = stream.next().await {
let entries = v.unwrap();
for e in entries {
let vec = e.data().to_vec();
info!("Read entry: {}", String::from_utf8_lossy(&vec));
data.push(String::from_utf8(vec).unwrap());
}
}
assert_eq!(vec!["test1".to_string(), "test-2".to_string()], data);
drop(stream);
let result = file.stop().await;
info!("Stop file res: {:?}", result);
}
#[tokio::test]
pub async fn test_read_at() {
let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap();
let file_path = dir.path().join("chunk-stream-file-test");
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&file_path)
.await
.unwrap();
file.write_all("1234567890ab".as_bytes()).await.unwrap();
file.flush().await.unwrap();
let file = Arc::new(file.into_std().await);
let result = read_at(&file, 0, 12).unwrap();
assert_eq!(12, result.len());
assert_eq!("1234567890ab".as_bytes(), &result.data[0..result.len()]);
}
#[tokio::test]
pub async fn test_read_at_center() {
let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap();
let file_path = dir.path().join("chunk-stream-file-test-center");
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&file_path)
.await
.unwrap();
file.write_all("1234567890ab".as_bytes()).await.unwrap();
file.flush().await.unwrap();
let file_len = file.metadata().await.unwrap().len();
let file = Arc::new(file.into_std().await);
let result = read_at(&file, 8, file_len as usize).unwrap();
assert_eq!(4, result.len());
assert_eq!("90ab".as_bytes(), &result.data[0..result.len()]);
}
#[tokio::test]
pub async fn test_file_chunk_stream() {
let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap();
let file_path = dir.path().join("chunk-stream-file-test");
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&file_path)
.await
.unwrap();
file.write_all(&vec![42].repeat(4096 + 1024)).await.unwrap();
file.flush().await.unwrap();
let file_size = file.metadata().await.unwrap().len();
let file = Arc::new(file.into_std().await);
let stream = file_chunk_stream(file, 0, file_size as usize, 1024);
pin_mut!(stream);
let mut chunks = vec![];
while let Some(r) = stream.next().await {
chunks.push(r.unwrap());
}
assert_eq!(
vec![4096, 1024],
chunks.iter().map(|c| c.write_offset).collect::<Vec<_>>()
);
assert_eq!(
vec![vec![42].repeat(4096), vec![42].repeat(1024)],
chunks
.iter()
.map(|c| &c.data[0..c.write_offset])
.collect::<Vec<_>>()
);
}
#[tokio::test]
pub async fn test_sync_chunk_stream() {
let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap();
let file_path = dir.path().join("chunk-stream-file-test");
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&file_path)
.await
.unwrap();
file.write_all(&vec![42].repeat(4096 + 1024)).await.unwrap();
file.flush().await.unwrap();
let file_size = file.metadata().await.unwrap().len();
let file = Arc::new(file.into_std().await);
let stream = file_chunk_stream_sync(file, 0, file_size as usize);
pin_mut!(stream);
let mut chunks = vec![];
while let Some(r) = stream.next().await {
chunks.push(r.unwrap());
}
assert_eq!(
vec![4096, 1024],
chunks.iter().map(|c| c.write_offset).collect::<Vec<_>>()
);
assert_eq!(
vec![vec![42].repeat(4096), vec![42].repeat(1024)],
chunks
.iter()
.map(|c| &c.data[0..c.write_offset])
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn test_shutdown() {
logging::init_default_ut_logging();
let config = LogConfig::default();
let dir = TempDir::new("greptimedb-store-test").unwrap();
let path_buf = dir.path().join("0010.log");
let path = path_buf.to_str().unwrap().to_string();
File::create(path.as_str()).unwrap();
let mut file = LogFile::open(path.clone(), &config)
.await
.unwrap_or_else(|_| panic!("Failed to open file: {path}"));
let state = file.state.clone();
file.start().await.unwrap();
drop(file);
assert!(state.stopped.load(Ordering::Relaxed));
}
}

View File

@@ -1,125 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::path::Path;
use snafu::OptionExt;
use store_api::logstore::entry::Id;
use crate::error::{Error, FileNameIllegalSnafu, SuffixIllegalSnafu};
use crate::fs::file_name::FileName::Log;
/// FileName represents the file name with padded leading zeros.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum FileName {
// File name with .log as suffix.
Log(Id),
}
impl TryFrom<&str> for FileName {
type Error = Error;
fn try_from(p: &str) -> Result<Self, Self::Error> {
let path = Path::new(p);
let extension =
path.extension()
.and_then(|s| s.to_str())
.with_context(|| FileNameIllegalSnafu {
file_name: path.to_string_lossy(),
})?;
let id: u64 = path
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| s.parse::<u64>().ok())
.with_context(|| FileNameIllegalSnafu {
file_name: p.to_string(),
})?;
Self::new_with_suffix(id, extension)
}
}
impl From<u64> for FileName {
fn from(entry_id: u64) -> Self {
Self::log(entry_id)
}
}
impl FileName {
pub fn log(entry_id: Id) -> Self {
Log(entry_id)
}
pub fn new_with_suffix(entry_id: Id, suffix: &str) -> Result<Self, Error> {
match suffix {
"log" => Ok(Log(entry_id)),
_ => SuffixIllegalSnafu { suffix }.fail(),
}
}
pub fn entry_id(&self) -> Id {
match self {
Log(id) => *id,
}
}
fn suffix(&self) -> &str {
match self {
Log(_) => ".log",
}
}
}
impl Display for FileName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:020}{}", self.entry_id(), self.suffix())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_padding_file_name() {
let id = u64::MIN;
assert_eq!("00000000000000000000", format!("{id:020}"));
let id = 123u64;
assert_eq!("00000000000000000123", format!("{id:020}"));
let id = 123123123123u64;
assert_eq!("00000000123123123123", format!("{id:020}"));
let id = u64::MAX;
assert_eq!(u64::MAX.to_string(), format!("{id:020}"));
}
#[test]
pub fn test_file_name_to_string() {
assert_eq!("00000000000000000000.log", FileName::log(0).to_string());
assert_eq!(
u64::MAX.to_string() + ".log",
FileName::log(u64::MAX).to_string()
);
}
#[test]
pub fn test_parse_file_name() {
let path = "/path/to/any/01010010000.log";
let parsed = FileName::try_from(path).expect("Failed to parse file name");
assert_eq!(1010010000u64, parsed.entry_id());
assert_eq!(".log", parsed.suffix());
}
}
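For reference, parsing and formatting in the module above are inverse up to zero padding: a parsed name re-renders as the 20-digit padded form with the same entry id. A small round-trip sketch that could sit inside this module (the `roundtrip` helper is hypothetical; `Error` is the crate error imported at the top of the file):

fn roundtrip(path: &str) -> Result<(), Error> {
    // e.g. "/path/to/any/123.log" parses to Log(123).
    let parsed = FileName::try_from(path)?;
    // Renders as the zero-padded form, "00000000000000000123.log".
    let rendered = parsed.to_string();
    // Re-parsing the padded form yields the same entry id.
    let reparsed = FileName::try_from(rendered.as_str())?;
    assert_eq!(parsed.entry_id(), reparsed.entry_id());
    Ok(())
}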

View File

@@ -1,82 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::RwLock;
use store_api::logstore::entry::{Id, Offset};
use crate::error::Result;
use crate::fs::file_name::FileName;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Location {
pub file_name: FileName,
pub offset: Offset,
}
#[allow(dead_code)]
impl Location {
pub fn new(file_name: FileName, offset: Offset) -> Self {
Self { file_name, offset }
}
}
/// In-memory entry id to offset index.
pub trait EntryIndex {
/// Add entry id to offset mapping.
fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location>;
/// Find offset by entry id.
fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>>;
}
pub struct MemoryIndex {
map: RwLock<BTreeMap<Id, Location>>,
}
#[allow(dead_code)]
impl MemoryIndex {
pub fn new() -> Self {
Self {
map: RwLock::new(BTreeMap::new()),
}
}
}
impl EntryIndex for MemoryIndex {
fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location> {
self.map.write().unwrap().insert(id, loc)
}
fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>> {
Ok(self.map.read().unwrap().get(&id).cloned())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_entry() {
let index = MemoryIndex::new();
index.add_entry_id(1, Location::new(FileName::log(0), 1));
assert_eq!(
Location::new(FileName::log(0), 1),
index.find_offset_by_id(1).unwrap().unwrap()
);
assert_eq!(None, index.find_offset_by_id(2).unwrap());
}
}
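The index above answers exact entry-id lookups; the ordering of the `BTreeMap` is what also makes "which log file could contain entry id N" cheap, since that is the greatest file start id not exceeding N. A standalone sketch of that range query over plain std types (illustrative only, not part of the deleted module):

use std::collections::BTreeMap;

// Given files keyed by their start entry id, find the one that could contain `id`:
// the greatest start id that is <= id.
fn owning_file(files: &BTreeMap<u64, String>, id: u64) -> Option<&String> {
    files.range(..=id).next_back().map(|(_, name)| name)
}

fn main() {
    let mut files = BTreeMap::new();
    files.insert(0u64, "00000000000000000000.log".to_string());
    files.insert(100u64, "00000000000000000100.log".to_string());
    assert_eq!(owning_file(&files, 57).unwrap(), "00000000000000000000.log");
    assert_eq!(owning_file(&files, 100).unwrap(), "00000000000000000100.log");
}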

View File

@@ -1,29 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::convert::TryFrom;
use std::fs::File;
use snafu::ResultExt;
use crate::error::Error;
// TODO(hl): Implement pread/pwrite for non-Unix platforms
pub fn pread_exact(file: &File, _buf: &mut [u8], _offset: u64) -> Result<(), Error> {
unimplemented!()
}
pub fn pwrite_all(file: &File, _buf: &[u8], _offset: u64) -> Result<(), Error> {
unimplemented!()
}

View File

@@ -1,28 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use std::os::unix::fs::FileExt;
use snafu::ResultExt;
use crate::error::{Error, IoSnafu};
pub fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> Result<(), Error> {
file.read_exact_at(buf, offset).context(IoSnafu)
}
pub fn pwrite_all(file: &File, buf: &[u8], offset: u64) -> Result<(), Error> {
file.write_all_at(buf, offset).context(IoSnafu)
}
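The Unix implementation above is a thin wrapper over std's `FileExt`, which reads and writes at an absolute offset without moving the file cursor. A self-contained sketch of the underlying std calls (assumes a Unix target; the temp-file path is illustrative):

use std::fs::OpenOptions;
use std::os::unix::fs::FileExt;

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("pread-pwrite-demo");
    let file = OpenOptions::new().create(true).read(true).write(true).open(&path)?;
    // Positioned write: does not move the file cursor.
    file.write_all_at(b"hello", 8)?;
    // Positioned read from the same offset.
    let mut buf = [0u8; 5];
    file.read_exact_at(&mut buf, 8)?;
    assert_eq!(&buf, b"hello");
    std::fs::remove_file(&path)?;
    Ok(())
}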

View File

@@ -1,770 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
use std::sync::Arc;
use arc_swap::ArcSwap;
use async_stream::stream;
use common_telemetry::{debug, error, info, warn};
use futures::{pin_mut, StreamExt};
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{Encode, Entry, Id};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::LogStore;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error::{
CreateDirSnafu, DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu,
InvalidStateSnafu, IoSnafu, ReadPathSnafu, Result, WaitGcTaskStopSnafu,
};
use crate::fs::config::LogConfig;
use crate::fs::entry::EntryImpl;
use crate::fs::file::{LogFile, LogFileRef};
use crate::fs::file_name::FileName;
use crate::fs::namespace::LocalNamespace;
use crate::fs::AppendResponseImpl;
type FileMap = BTreeMap<u64, LogFileRef>;
#[derive(Debug)]
pub struct LocalFileLogStore {
files: Arc<RwLock<FileMap>>,
active: ArcSwap<LogFile>,
config: LogConfig,
obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>,
cancel_token: Mutex<Option<CancellationToken>>,
gc_task_handle: Mutex<Option<JoinHandle<()>>>,
}
impl LocalFileLogStore {
/// Opens a directory as the log store directory, initializing it if it is empty.

pub async fn open(config: &LogConfig) -> Result<Self> {
// Create the log directory if missing.
tokio::fs::create_dir_all(&config.log_file_dir)
.await
.context(CreateDirSnafu {
path: &config.log_file_dir,
})?;
let mut files = Self::load_dir(&config.log_file_dir, config).await?;
if files.is_empty() {
Self::init_on_empty(&mut files, config).await?;
info!("Initialized log store directory: {}", config.log_file_dir)
}
let id = *files.keys().max().context(InternalSnafu {
msg: format!(
"log store directory is empty after initialization: {}",
config.log_file_dir
),
})?;
info!(
"Successfully loaded log store directory, files: {:?}",
files
);
let active_file = files
.get_mut(&id)
.expect("Not expected to fail when initing log store");
active_file.unseal();
let active_file_name = active_file.file_name();
info!("Log store active log file: {}", active_file_name);
// Start active log file
Arc::get_mut(active_file)
.with_context(|| InternalSnafu {
msg: format!(
"Concurrent modification on log store {active_file_name} start is not allowed"
),
})?
.start()
.await?;
info!(
"Successfully started current active file: {}",
active_file_name
);
let active_file_cloned = active_file.clone();
Ok(Self {
files: Arc::new(RwLock::new(files)),
active: ArcSwap::new(active_file_cloned),
config: config.clone(),
obsolete_ids: Arc::new(Default::default()),
cancel_token: Mutex::new(None),
gc_task_handle: Mutex::new(None),
})
}
pub async fn init_on_empty(files: &mut FileMap, config: &LogConfig) -> Result<()> {
let path = Path::new(&config.log_file_dir).join(FileName::log(0).to_string());
let file_path = path.to_str().context(FileNameIllegalSnafu {
file_name: config.log_file_dir.clone(),
})?;
let file = LogFile::open(file_path, config).await?;
files.insert(0, Arc::new(file));
Ok(())
}
pub async fn load_dir(path: impl AsRef<str>, config: &LogConfig) -> Result<FileMap> {
let mut map = FileMap::new();
let mut dir = tokio::fs::read_dir(Path::new(path.as_ref()))
.await
.context(ReadPathSnafu {
path: path.as_ref(),
})?;
while let Some(f) = dir.next_entry().await.context(IoSnafu)? {
let path_buf = f.path();
let path = path_buf.to_str().context(FileNameIllegalSnafu {
file_name: path.as_ref().to_string(),
})?;
let file_name = FileName::try_from(path)?;
let start_id = file_name.entry_id();
let file = LogFile::open(path, config).await?;
info!("Load log store file {}: {:?}", start_id, file);
if map.contains_key(&start_id) {
error!("Log file with start entry id: {start_id} already exists");
return DuplicateFileSnafu {
msg: format!("File with start id: {start_id} duplicates on start"),
}
.fail();
}
file.try_seal();
map.insert(start_id, Arc::new(file));
}
Ok(map)
}
/// Mark current active file as closed and create a new log file for writing.
async fn roll_next(&self, active: LogFileRef) -> Result<()> {
// acquires lock
let mut files = self.files.write().await;
// if active is already sealed, then just return.
if active.is_seal() {
return Ok(());
}
// create and start a new log file
let next_entry_id = active.last_entry_id() + 1;
let path_buf =
Path::new(&self.config.log_file_dir).join(FileName::log(next_entry_id).to_string());
let path = path_buf.to_str().context(FileNameIllegalSnafu {
file_name: self.config.log_file_dir.clone(),
})?;
let mut new_file = LogFile::open(path, &self.config).await?;
new_file.start().await?;
let new_file = Arc::new(new_file);
files.insert(new_file.start_entry_id(), new_file.clone());
self.active.swap(new_file);
active.try_seal();
tokio::spawn(async move {
active.stop().await.unwrap();
info!("Sealed log file {} stopped.", active.file_name());
});
Ok(()) // release lock
}
pub fn active_file(&self) -> Arc<LogFile> {
self.active.load().clone()
}
}
async fn gc(
files: Arc<RwLock<FileMap>>,
obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>,
) -> Result<()> {
if let Some(lowest) = find_lowest_id(obsolete_ids).await {
gc_inner(files, lowest).await
} else {
Ok(())
}
}
async fn find_lowest_id(obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>) -> Option<u64> {
let mut lowest_obsolete = None;
{
let obsolete_ids = obsolete_ids.read().await;
for (ns, id) in obsolete_ids.iter() {
if *id <= *lowest_obsolete.get_or_insert(*id) {
lowest_obsolete = Some(*id);
debug!("Current lowest obsolete id: {}, namespace: {:?}", *id, ns);
}
}
}
lowest_obsolete
}
async fn gc_inner(files: Arc<RwLock<FileMap>>, obsolete_id: u64) -> Result<()> {
let mut files = files.write().await;
let files_to_delete = find_files_to_delete(&files, obsolete_id);
info!(
"Compacting log file up to entry id: {}, files to delete: {:?}",
obsolete_id, files_to_delete
);
for entry_id in files_to_delete {
if let Some(f) = files.remove(&entry_id) {
if !f.is_stopped() {
f.stop().await?;
}
f.destroy().await?;
info!("Destroyed log file: {}", f.file_name());
}
}
Ok(())
}
fn find_files_to_delete<T>(offset_map: &BTreeMap<u64, T>, entry_id: u64) -> Vec<u64> {
let mut res = vec![];
for (cur, next) in offset_map.keys().zip(offset_map.keys().skip(1)) {
if *cur < entry_id && *next <= entry_id {
res.push(*cur);
}
}
res
}
#[async_trait::async_trait]
impl LogStore for LocalFileLogStore {
type Error = Error;
type Namespace = LocalNamespace;
type Entry = EntryImpl;
type AppendResponse = AppendResponseImpl;
async fn start(&self) -> Result<()> {
let files = self.files.clone();
let obsolete_ids = self.obsolete_ids.clone();
let interval = self.config.gc_interval;
let token = tokio_util::sync::CancellationToken::new();
let child = token.child_token();
let handle = common_runtime::spawn_bg(async move {
loop {
if let Err(e) = gc(files.clone(), obsolete_ids.clone()).await {
error!(e; "Failed to gc log store");
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = child.cancelled() => {
info!("LogStore gc task has been cancelled");
return;
}
}
}
});
*self.gc_task_handle.lock().await = Some(handle);
*self.cancel_token.lock().await = Some(token);
Ok(())
}
async fn stop(&self) -> Result<()> {
let handle = self
.gc_task_handle
.lock()
.await
.take()
.context(InvalidStateSnafu {
msg: "Logstore gc task not spawned",
})?;
let token = self
.cancel_token
.lock()
.await
.take()
.context(InvalidStateSnafu {
msg: "Logstore gc task not spawned",
})?;
token.cancel();
Ok(handle.await.context(WaitGcTaskStopSnafu)?)
}
async fn append(&self, mut entry: Self::Entry) -> Result<Self::AppendResponse> {
// TODO(hl): configurable retry times
for _ in 0..3 {
let current_active_file = self.active_file();
match current_active_file.append(&mut entry).await {
Ok(r) => return Ok(r),
Err(e) => match e {
Error::Eof => {
self.roll_next(current_active_file.clone()).await?;
info!(
"Rolled to next file, retry append, entry size: {}",
entry.encoded_size()
);
continue;
}
Error::Internal { .. } => {
warn!("File closed, try new file");
continue;
}
_ => {
error!(e; "Failed to roll to next log file");
return Err(e);
}
},
}
}
return InternalSnafu {
msg: "Failed to append entry with max retry time exceeds",
}
.fail();
}
async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
todo!()
}
async fn read(
&self,
ns: &Self::Namespace,
id: Id,
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
let files = self.files.read().await;
let ns = ns.clone();
let s = stream!({
for (start_id, file) in files.iter() {
// TODO(hl): Use index to lookup file
if *start_id <= id {
let s = file.create_stream(&ns, id);
pin_mut!(s);
while let Some(entries) = s.next().await {
match entries {
Ok(entries) => {
yield Ok(entries
.into_iter()
.filter(|e| e.namespace().id() == ns.id())
.collect::<Vec<_>>())
}
Err(e) => yield Err(e),
}
}
}
}
});
Ok(Box::pin(s))
}
async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}
async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
todo!()
}
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, namespace: Self::Namespace) -> Self::Entry {
EntryImpl::new(data, id, namespace)
}
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
LocalNamespace::new(id)
}
async fn obsolete(
&self,
namespace: Self::Namespace,
id: Id,
) -> std::result::Result<(), Self::Error> {
info!("Mark namespace obsolete entry id, {:?}:{}", namespace, id);
let mut map = self.obsolete_ids.write().await;
let prev = map.insert(namespace, id);
info!("Prev: {:?}", prev);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::time::Duration;
use futures_util::StreamExt;
use rand::distributions::Alphanumeric;
use rand::Rng;
use store_api::logstore::entry::Entry;
use tempdir::TempDir;
use super::*;
#[tokio::test]
pub async fn test_roll_file() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb1").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 128,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
assert_eq!(
0,
logstore
.append(EntryImpl::new(
generate_data(96),
0,
LocalNamespace::new(42)
),)
.await
.unwrap()
.entry_id
);
assert_eq!(
1,
logstore
.append(EntryImpl::new(
generate_data(96),
1,
LocalNamespace::new(42)
))
.await
.unwrap()
.entry_id
);
}
fn generate_data(size: usize) -> Vec<u8> {
let s: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(size)
.map(char::from)
.collect();
s.into_bytes()
}
#[tokio::test]
pub async fn test_write_and_read_data() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb2").unwrap();
let dir_str = dir.path().to_string_lossy().to_string();
info!("dir: {}", dir_str);
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 128,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::new(42);
let id = logstore
.append(EntryImpl::new(
generate_data(96),
0,
LocalNamespace::new(42),
))
.await
.unwrap()
.entry_id;
assert_eq!(0, id);
let stream = logstore.read(&ns, 0).await.unwrap();
tokio::pin!(stream);
let entries = stream.next().await.unwrap().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id(), 0);
assert_eq!(42, entries[0].namespace_id);
}
#[tokio::test]
pub async fn test_namespace() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 1024 * 1024,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
assert_eq!(
0,
logstore
.append(EntryImpl::new(
generate_data(96),
0,
LocalNamespace::new(42),
))
.await
.unwrap()
.entry_id
);
assert_eq!(
1,
logstore
.append(EntryImpl::new(
generate_data(96),
1,
LocalNamespace::new(43),
))
.await
.unwrap()
.entry_id
);
let stream = logstore.read(&LocalNamespace::new(42), 0).await.unwrap();
tokio::pin!(stream);
let entries = stream.next().await.unwrap().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id(), 0);
assert_eq!(42, entries[0].namespace_id);
let stream = logstore.read(&LocalNamespace::new(43), 0).await.unwrap();
tokio::pin!(stream);
let entries = stream.next().await.unwrap().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id(), 1);
assert_eq!(43, entries[0].namespace_id);
}
#[test]
fn test_find_files_to_delete() {
let file_map = vec![(1u64, ()), (11u64, ()), (21u64, ()), (31u64, ())]
.into_iter()
.collect::<BTreeMap<u64, ()>>();
assert!(find_files_to_delete(&file_map, 0).is_empty());
assert!(find_files_to_delete(&file_map, 1).is_empty());
assert!(find_files_to_delete(&file_map, 2).is_empty());
assert!(find_files_to_delete(&file_map, 10).is_empty());
assert_eq!(vec![1], find_files_to_delete(&file_map, 11));
assert_eq!(vec![1], find_files_to_delete(&file_map, 20));
assert_eq!(vec![1, 11], find_files_to_delete(&file_map, 21));
assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 31));
assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 100));
}
#[tokio::test]
async fn test_find_lowest_id() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb-log-compact").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 4096,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
assert!(find_lowest_id(logstore.obsolete_ids.clone())
.await
.is_none());
logstore
.obsolete(LocalNamespace::new(1), 100)
.await
.unwrap();
assert_eq!(
Some(100),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(2), 200)
.await
.unwrap();
assert_eq!(
Some(100),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(1), 101)
.await
.unwrap();
assert_eq!(
Some(101),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(2), 202)
.await
.unwrap();
assert_eq!(
Some(101),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(1), 300)
.await
.unwrap();
assert_eq!(
Some(202),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
}
#[tokio::test]
async fn test_compact_log_file() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb-log-compact").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 4096,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
for id in 0..50 {
logstore
.append(EntryImpl::new(
generate_data(990),
id,
LocalNamespace::new(42),
))
.await
.unwrap();
}
assert_eq!(
vec![0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
gc_inner(logstore.files.clone(), 10).await.unwrap();
assert_eq!(
vec![8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
gc_inner(logstore.files.clone(), 28).await.unwrap();
assert_eq!(
vec![28, 32, 36, 40, 44, 48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
gc_inner(logstore.files.clone(), 50).await.unwrap();
assert_eq!(
vec![48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn test_gc_task() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb-log-compact").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 4096,
log_file_dir: dir.path().to_str().unwrap().to_string(),
gc_interval: Duration::from_millis(100),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
logstore.start().await.unwrap();
for id in 0..50 {
logstore
.append(EntryImpl::new(
generate_data(990),
id,
LocalNamespace::new(42),
))
.await
.unwrap();
}
logstore
.obsolete(LocalNamespace::new(42), 30)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
let file_ids = logstore
.files
.read()
.await
.keys()
.cloned()
.collect::<Vec<_>>();
assert_eq!(vec![28, 32, 36, 40, 44, 48], file_ids);
let mut files = vec![];
let mut readir = tokio::fs::read_dir(dir.path()).await.unwrap();
while let Some(r) = readir.next_entry().await.transpose() {
let entry = r.unwrap();
files.push(entry.file_name().to_str().unwrap().to_string());
}
assert_eq!(
vec![
"00000000000000000028.log".to_string(),
"00000000000000000048.log".to_string(),
"00000000000000000040.log".to_string(),
"00000000000000000044.log".to_string(),
"00000000000000000036.log".to_string(),
"00000000000000000032.log".to_string()
]
.into_iter()
.collect::<HashSet<String>>(),
files.into_iter().collect::<HashSet<String>>()
);
}
}
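The deletion rule exercised by test_find_files_to_delete above can be stated as: a file starting at `cur` is removable only if the next file also starts at or before the obsolete id, so the obsolete entry itself can never live in a deleted file. An equivalent formulation over a plain sorted list of start ids (a sketch, not the deleted implementation):

// Equivalent retention rule over a sorted list of file start ids.
fn files_to_delete(starts: &[u64], obsolete_id: u64) -> Vec<u64> {
    starts
        .windows(2)
        .filter(|w| w[0] < obsolete_id && w[1] <= obsolete_id)
        .map(|w| w[0])
        .collect()
}

fn main() {
    let starts = [1u64, 11, 21, 31];
    // Entry 10 still lives in the file starting at 1, so nothing is removable.
    assert!(files_to_delete(&starts, 10).is_empty());
    assert_eq!(vec![1], files_to_delete(&starts, 11));
    assert_eq!(vec![1, 11, 21], files_to_delete(&starts, 100));
}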

View File

@@ -1,38 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::logstore::namespace::{Id, Namespace};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct LocalNamespace {
pub(crate) id: Id,
}
impl Default for LocalNamespace {
fn default() -> Self {
LocalNamespace::new(0)
}
}
impl LocalNamespace {
pub(crate) fn new(id: Id) -> Self {
Self { id }
}
}
impl Namespace for LocalNamespace {
fn id(&self) -> Id {
self.id
}
}

View File

@@ -1,93 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::logstore::entry::Id;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::LogStore;
use crate::error::{Error, Result};
use crate::fs::entry::EntryImpl;
use crate::fs::namespace::LocalNamespace;
use crate::fs::AppendResponseImpl;
/// A no-op log store, used only for tests
// TODO: Add a test feature
#[derive(Debug, Default)]
pub struct NoopLogStore;
#[async_trait::async_trait]
impl LogStore for NoopLogStore {
type Error = Error;
type Namespace = LocalNamespace;
type Entry = EntryImpl;
type AppendResponse = AppendResponseImpl;
async fn start(&self) -> Result<()> {
Ok(())
}
async fn stop(&self) -> Result<()> {
Ok(())
}
async fn append(&self, mut _e: Self::Entry) -> Result<Self::AppendResponse> {
Ok(AppendResponseImpl {
entry_id: 0,
offset: 0,
})
}
async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
todo!()
}
async fn read(
&self,
_ns: &Self::Namespace,
_id: Id,
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
{
todo!()
}
async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}
async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
todo!()
}
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
EntryImpl::new(data, id, ns)
}
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
LocalNamespace::new(id)
}
async fn obsolete(
&self,
namespace: Self::Namespace,
id: Id,
) -> std::result::Result<(), Self::Error> {
let _ = namespace;
let _ = id;
Ok(())
}
}

View File

@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod config;
pub mod error;
pub mod fs;
mod noop;
pub mod raft_engine;
pub mod test_util;
pub use config::LogConfig;
pub use noop::NoopLogStore;

148
src/log-store/src/noop.rs Normal file
View File

@@ -0,0 +1,148 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::logstore::entry::{Entry, Id};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::{AppendResponse, LogStore};
use crate::error::{Error, Result};
/// A no-op log store, used only for tests.
#[derive(Debug, Default)]
pub struct NoopLogStore;
#[derive(Debug, Default, Clone, PartialEq)]
pub struct EntryImpl;
#[derive(Debug, Clone, Default, Hash, PartialEq)]
pub struct NamespaceImpl;
impl Namespace for NamespaceImpl {
fn id(&self) -> NamespaceId {
0
}
}
impl Entry for EntryImpl {
type Error = Error;
type Namespace = NamespaceImpl;
fn data(&self) -> &[u8] {
&[]
}
fn id(&self) -> Id {
0
}
fn namespace(&self) -> Self::Namespace {
Default::default()
}
}
#[async_trait::async_trait]
impl LogStore for NoopLogStore {
type Error = Error;
type Namespace = NamespaceImpl;
type Entry = EntryImpl;
async fn stop(&self) -> Result<()> {
Ok(())
}
async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
Ok(AppendResponse { entry_id: 0 })
}
async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Vec<Id>> {
Ok(vec![])
}
async fn read(
&self,
_ns: &Self::Namespace,
_id: Id,
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
{
todo!()
}
async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}
async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
Ok(vec![])
}
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
let _ = data;
let _ = id;
let _ = ns;
EntryImpl::default()
}
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
let _ = id;
NamespaceImpl::default()
}
async fn obsolete(
&self,
namespace: Self::Namespace,
id: Id,
) -> std::result::Result<(), Self::Error> {
let _ = namespace;
let _ = id;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mock_entry() {
let e = EntryImpl::default();
assert_eq!(0, e.data().len());
assert_eq!(0, e.id());
}
#[tokio::test]
async fn test_noop_logstore() {
let mut store = NoopLogStore::default();
let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
store.append(e.clone()).await.unwrap();
store
.append_batch(&NamespaceImpl::default(), vec![e])
.await
.unwrap();
store
.create_namespace(&NamespaceImpl::default())
.await
.unwrap();
assert_eq!(0, store.list_namespaces().await.unwrap().len());
store
.delete_namespace(&NamespaceImpl::default())
.await
.unwrap();
assert_eq!(NamespaceImpl::default(), store.namespace(0));
store.obsolete(NamespaceImpl::default(), 1).await.unwrap();
}
}
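The point of the new `NoopLogStore` is to satisfy `LogStore` type parameters in engine tests (as the later hunks switching `log_store::fs::noop::NoopLogStore` to `log_store::NoopLogStore` show). A minimal sketch of using it as such a placeholder (the `describe` helper is hypothetical):

use log_store::NoopLogStore;
use store_api::logstore::LogStore;

// Generic component that only needs *some* LogStore implementation.
fn describe<S: LogStore>(_store: &S) -> &'static str {
    std::any::type_name::<S>()
}

fn main() {
    let store = NoopLogStore::default();
    println!("{}", describe(&store));
}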

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::{Hash, Hasher};
use store_api::logstore::entry::{Entry, Id};
use store_api::logstore::namespace::Namespace;
use crate::error::Error;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
pub mod log_store;
pub mod protos {
include!(concat!(env!("OUT_DIR"), concat!("/", "protos/", "mod.rs")));
}
impl EntryImpl {
pub fn create(id: u64, ns: u64, data: Vec<u8>) -> Self {
Self {
id,
namespace_id: ns,
data,
..Default::default()
}
}
}
impl NamespaceImpl {
pub fn with_id(id: Id) -> Self {
Self {
id,
..Default::default()
}
}
}
#[allow(clippy::derive_hash_xor_eq)]
impl Hash for NamespaceImpl {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
impl Namespace for NamespaceImpl {
fn id(&self) -> store_api::logstore::namespace::Id {
self.id
}
}
impl Entry for EntryImpl {
type Error = Error;
type Namespace = NamespaceImpl;
fn data(&self) -> &[u8] {
self.data.as_slice()
}
fn id(&self) -> Id {
self.id
}
fn namespace(&self) -> Self::Namespace {
NamespaceImpl {
id: self.namespace_id,
..Default::default()
}
}
}
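The impls above put the store-api traits onto the protobuf-generated `EntryImpl` and `NamespaceImpl`. A short sketch of constructing an entry with the `create` helper defined here and reading it back through the traits; the function is hypothetical and assumes it sits in this module, so the `Entry` and `Namespace` imports at the top of the file apply:

// Illustrative only: exercises the trait surface defined above.
fn demo_entry() {
    // EntryImpl::create(id, namespace_id, data)
    let entry = EntryImpl::create(7, 42, b"payload".to_vec());
    assert_eq!(7, entry.id());
    assert_eq!(&b"payload"[..], entry.data());
    assert_eq!(42, entry.namespace().id());
}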

View File

@@ -0,0 +1,553 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_stream::stream;
use common_telemetry::{error, info};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::Id;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Namespace as NamespaceTrait;
use store_api::logstore::{AppendResponse, LogStore};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::config::LogConfig;
use crate::error::{
AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu,
RaftEngineSnafu, WaitGcTaskStopSnafu,
};
use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace};
const NAMESPACE_PREFIX: &str = "__sys_namespace_";
const SYSTEM_NAMESPACE: u64 = 0;
pub struct RaftEngineLogStore {
config: LogConfig,
engine: Arc<Engine>,
cancel_token: Mutex<Option<CancellationToken>>,
gc_task_handle: Mutex<Option<JoinHandle<()>>>,
started: AtomicBool,
}
impl RaftEngineLogStore {
pub async fn try_new(config: LogConfig) -> Result<Self, Error> {
// TODO(hl): set according to available disk space
let raft_engine_config = Config {
dir: config.log_file_dir.clone(),
purge_threshold: ReadableSize(config.purge_threshold as u64),
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(config.max_log_file_size as u64),
..Default::default()
};
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
let log_store = Self {
config,
engine,
cancel_token: Mutex::new(None),
gc_task_handle: Mutex::new(None),
started: AtomicBool::new(false),
};
log_store.start().await?;
Ok(log_store)
}
pub fn started(&self) -> bool {
self.started.load(Ordering::Relaxed)
}
async fn start(&self) -> Result<(), Error> {
let engine_clone = self.engine.clone();
let interval = self.config.gc_interval;
let token = CancellationToken::new();
let child = token.child_token();
// TODO(hl): Maybe spawn to a blocking runtime.
let handle = common_runtime::spawn_bg(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = child.cancelled() => {
info!("LogStore gc task has been cancelled");
return;
}
}
match engine_clone.purge_expired_files().context(RaftEngineSnafu) {
Ok(res) => {
// TODO(hl): the return value of purge_expired_files indicates the namespaces that need compaction,
// which is useful for monitoring regions that failed to flush their memtables to SSTs.
info!(
"Successfully purged logstore files, namespaces need compaction: {:?}",
res
);
}
Err(e) => {
error!(e; "Failed to purge files in logstore");
}
}
}
});
*self.cancel_token.lock().await = Some(token);
*self.gc_task_handle.lock().await = Some(handle);
self.started.store(true, Ordering::Relaxed);
info!("RaftEngineLogStore started with config: {:?}", self.config);
Ok(())
}
}
impl Debug for RaftEngineLogStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RaftEngineLogsStore")
.field("config", &self.config)
.field("started", &self.started.load(Ordering::Relaxed))
.finish()
}
}
#[async_trait::async_trait]
impl LogStore for RaftEngineLogStore {
type Error = Error;
type Namespace = Namespace;
type Entry = Entry;
async fn stop(&self) -> Result<(), Self::Error> {
ensure!(
self.started
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok(),
IllegalStateSnafu
);
let handle = self
.gc_task_handle
.lock()
.await
.take()
.context(IllegalStateSnafu)?;
let token = self
.cancel_token
.lock()
.await
.take()
.context(IllegalStateSnafu)?;
token.cancel();
handle.await.context(WaitGcTaskStopSnafu)?;
info!("RaftEngineLogStore stopped");
Ok(())
}
/// Appends an entry to the log store. Currently the existence of the entry's namespace is not checked.
async fn append(&self, e: Self::Entry) -> Result<AppendResponse, Self::Error> {
ensure!(self.started(), IllegalStateSnafu);
let entry_id = e.id;
let mut batch = LogBatch::with_capacity(1);
batch
.add_entries::<MessageType>(e.namespace_id, &[e])
.context(AddEntryLogBatchSnafu)?;
self.engine
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
Ok(AppendResponse { entry_id })
}
/// Appends a batch of entries to the log store. `RaftEngineLogStore` ensures the atomicity of
/// the batch append.
async fn append_batch(
&self,
ns: &Self::Namespace,
entries: Vec<Self::Entry>,
) -> Result<Vec<Id>, Self::Error> {
ensure!(self.started(), IllegalStateSnafu);
let entry_ids = entries.iter().map(Entry::get_id).collect::<Vec<_>>();
let mut batch = LogBatch::with_capacity(entries.len());
batch
.add_entries::<MessageType>(ns.id, &entries)
.context(AddEntryLogBatchSnafu)?;
self.engine
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
Ok(entry_ids)
}
/// Creates a stream of entries from the log store in the given namespace. The end of the stream
/// is determined by the current "last index" of the namespace.
async fn read(
&self,
ns: &Self::Namespace,
id: Id,
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>, Self::Error> {
ensure!(self.started(), IllegalStateSnafu);
let engine = self.engine.clone();
let last_index = engine.last_index(ns.id).unwrap_or(0);
let mut start_index = id.max(engine.first_index(ns.id).unwrap_or(last_index + 1));
let max_batch_size = self.config.read_batch_size;
let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
let ns = ns.clone();
common_runtime::spawn_read(async move {
while start_index <= last_index {
let mut vec = Vec::with_capacity(max_batch_size);
match engine
.fetch_entries_to::<MessageType>(
ns.id,
start_index,
last_index + 1,
Some(max_batch_size),
&mut vec,
)
.context(FetchEntrySnafu {
ns: ns.id,
start: start_index,
end: last_index,
max_size: max_batch_size,
}) {
Ok(_) => {
if let Some(last_entry) = vec.last() {
start_index = last_entry.id + 1;
}
// reader side closed, cancel following reads
if tx.send(Ok(vec)).await.is_err() {
break;
}
}
Err(e) => {
let _ = tx.send(Err(e)).await;
break;
}
}
}
});
let s = stream!({
while let Some(res) = rx.recv().await {
yield res;
}
});
Ok(Box::pin(s))
}
async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
ensure!(
ns.id != SYSTEM_NAMESPACE,
IllegalNamespaceSnafu { ns: ns.id }
);
ensure!(self.started(), IllegalStateSnafu);
let key = format!("{}{}", NAMESPACE_PREFIX, ns.id).as_bytes().to_vec();
let mut batch = LogBatch::with_capacity(1);
batch
.put_message::<Namespace>(SYSTEM_NAMESPACE, key, ns)
.context(RaftEngineSnafu)?;
self.engine
.write(&mut batch, true)
.context(RaftEngineSnafu)?;
Ok(())
}
async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
ensure!(
ns.id != SYSTEM_NAMESPACE,
IllegalNamespaceSnafu { ns: ns.id }
);
ensure!(self.started(), IllegalStateSnafu);
let key = format!("{}{}", NAMESPACE_PREFIX, ns.id).as_bytes().to_vec();
let mut batch = LogBatch::with_capacity(1);
batch.delete(SYSTEM_NAMESPACE, key);
self.engine
.write(&mut batch, true)
.context(RaftEngineSnafu)?;
Ok(())
}
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error> {
ensure!(self.started(), IllegalStateSnafu);
let mut namespaces: Vec<Namespace> = vec![];
self.engine
.scan_messages::<Namespace, _>(
SYSTEM_NAMESPACE,
Some(NAMESPACE_PREFIX.as_bytes()),
None,
false,
|_, v| {
namespaces.push(v);
true
},
)
.context(RaftEngineSnafu)?;
Ok(namespaces)
}
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
Entry {
id,
data: data.as_ref().to_vec(),
namespace_id: ns.id(),
..Default::default()
}
}
fn namespace(&self, id: store_api::logstore::namespace::Id) -> Self::Namespace {
Namespace {
id,
..Default::default()
}
}
async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error> {
ensure!(self.started(), IllegalStateSnafu);
let obsoleted = self.engine.compact_to(namespace.id(), id + 1);
info!(
"Namespace {} obsoleted {} entries",
namespace.id(),
obsoleted
);
Ok(())
}
}
#[derive(Debug, Clone)]
struct MessageType;
impl MessageExt for MessageType {
type Entry = Entry;
fn index(e: &Self::Entry) -> u64 {
e.id
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::time::Duration;
use common_telemetry::debug;
use futures_util::StreamExt;
use raft_engine::ReadableSize;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Namespace as NamespaceTrait;
use store_api::logstore::LogStore;
use tempdir::TempDir;
use crate::config::LogConfig;
use crate::error::Error;
use crate::raft_engine::log_store::RaftEngineLogStore;
use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace};
#[tokio::test]
async fn test_open_logstore() {
let dir = TempDir::new("raft-engine-logstore-test").unwrap();
let logstore = RaftEngineLogStore::try_new(LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
let namespaces = logstore.list_namespaces().await.unwrap();
assert_eq!(0, namespaces.len());
}
#[tokio::test]
async fn test_manage_namespace() {
let dir = TempDir::new("raft-engine-logstore-test").unwrap();
let mut logstore = RaftEngineLogStore::try_new(LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
assert!(logstore.list_namespaces().await.unwrap().is_empty());
logstore
.create_namespace(&Namespace::with_id(42))
.await
.unwrap();
let namespaces = logstore.list_namespaces().await.unwrap();
assert_eq!(1, namespaces.len());
assert_eq!(Namespace::with_id(42), namespaces[0]);
logstore
.delete_namespace(&Namespace::with_id(42))
.await
.unwrap();
assert!(logstore.list_namespaces().await.unwrap().is_empty());
}
#[tokio::test]
async fn test_append_and_read() {
let dir = TempDir::new("raft-engine-logstore-test").unwrap();
let logstore = RaftEngineLogStore::try_new(LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
let namespace = Namespace::with_id(1);
let cnt = 1024;
for i in 0..cnt {
let response = logstore
.append(Entry::create(
i,
namespace.id,
i.to_string().as_bytes().to_vec(),
))
.await
.unwrap();
assert_eq!(i, response.entry_id);
}
let mut entries = HashSet::with_capacity(1024);
let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap();
while let Some(r) = s.next().await {
let vec = r.unwrap();
entries.extend(vec.into_iter().map(|e| e.id));
}
assert_eq!((0..cnt).into_iter().collect::<HashSet<_>>(), entries);
}
async fn collect_entries(mut s: SendableEntryStream<'_, Entry, Error>) -> Vec<Entry> {
let mut res = vec![];
while let Some(r) = s.next().await {
res.extend(r.unwrap());
}
res
}
#[tokio::test]
async fn test_reopen() {
let dir = TempDir::new("raft-engine-logstore-reopen-test").unwrap();
{
let logstore = RaftEngineLogStore::try_new(LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore
.append(Entry::create(1, 1, "1".as_bytes().to_vec()))
.await
.unwrap();
let entries = logstore
.read(&Namespace::with_id(1), 1)
.await
.unwrap()
.collect::<Vec<_>>()
.await;
assert_eq!(1, entries.len());
logstore.stop().await.unwrap();
}
let logstore = RaftEngineLogStore::try_new(LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
let entries =
collect_entries(logstore.read(&Namespace::with_id(1), 1).await.unwrap()).await;
assert_eq!(1, entries.len());
assert_eq!(1, entries[0].id);
assert_eq!(1, entries[0].namespace_id);
}
async fn wal_dir_usage(path: impl AsRef<str>) -> usize {
let mut size: usize = 0;
let mut read_dir = tokio::fs::read_dir(path.as_ref()).await.unwrap();
while let Ok(dir_entry) = read_dir.next_entry().await {
let Some(entry) = dir_entry else {
break;
};
if entry.file_type().await.unwrap().is_file() {
let file_name = entry.file_name();
let file_size = entry.metadata().await.unwrap().len() as usize;
debug!("File: {file_name:?}, size: {file_size}");
size += file_size;
}
}
size
}
#[tokio::test]
async fn test_compaction() {
common_telemetry::init_default_ut_logging();
let dir = TempDir::new("raft-engine-logstore-test").unwrap();
let config = LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
max_log_file_size: ReadableSize::mb(2).0 as usize,
purge_threshold: ReadableSize::mb(4).0 as usize,
gc_interval: Duration::from_secs(5),
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
logstore.start().await.unwrap();
let namespace = Namespace::with_id(42);
for id in 0..4096 {
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
logstore.append(entry).await.unwrap();
}
let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
logstore.obsolete(namespace, 4000).await.unwrap();
tokio::time::sleep(Duration::from_secs(6)).await;
let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
debug!(
"Before purge: {}, after purge: {}",
before_purge, after_purge
);
assert!(before_purge > after_purge);
}
#[tokio::test]
async fn test_obsolete() {
common_telemetry::init_default_ut_logging();
let dir = TempDir::new("raft-engine-logstore-test").unwrap();
let config = LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
max_log_file_size: ReadableSize::mb(2).0 as usize,
purge_threshold: ReadableSize::mb(4).0 as usize,
gc_interval: Duration::from_secs(5),
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
logstore.start().await.unwrap();
let namespace = Namespace::with_id(42);
for id in 0..1024 {
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
logstore.append(entry).await.unwrap();
}
logstore.obsolete(namespace.clone(), 100).await.unwrap();
assert_eq!(101, logstore.engine.first_index(namespace.id).unwrap());
let res = logstore.read(&namespace, 100).await.unwrap();
let mut vec = collect_entries(res).await;
vec.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
assert_eq!(101, vec.first().unwrap().id);
}
}
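Taken together, the tests above amount to the following lifecycle for the new store. A condensed usage sketch based on the API shown in this file (the directory path is illustrative; `try_new` already starts the background purge task):

use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use store_api::logstore::LogStore;

#[tokio::main]
async fn main() {
    // Any writable directory works here.
    let config = LogConfig {
        log_file_dir: "/tmp/raft-engine-logstore-demo".to_string(),
        ..Default::default()
    };
    let logstore = RaftEngineLogStore::try_new(config).await.unwrap();

    // Append one entry to namespace 42, then mark everything up to it obsolete.
    let ns = logstore.namespace(42);
    let entry = logstore.entry("payload".as_bytes(), 0, ns.clone());
    let resp = logstore.append(entry).await.unwrap();
    logstore.obsolete(ns, resp.entry_id).await.unwrap();

    logstore.stop().await.unwrap();
}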

View File

@@ -14,12 +14,12 @@
use tempdir::TempDir;
use crate::fs::config::LogConfig;
use crate::fs::log::LocalFileLogStore;
use crate::raft_engine::log_store::RaftEngineLogStore;
use crate::LogConfig;
/// Creates a log store backed by a temporary directory, for use in tests.
// TODO: Add a test feature
pub async fn create_tmp_local_file_log_store(dir: &str) -> (LocalFileLogStore, TempDir) {
pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, TempDir) {
let dir = TempDir::new(dir).unwrap();
let cfg = LogConfig {
append_buffer_size: 128,
@@ -28,5 +28,6 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (LocalFileLogStore, T
..Default::default()
};
(LocalFileLogStore::open(&cfg).await.unwrap(), dir)
let logstore = RaftEngineLogStore::try_new(cfg).await.unwrap();
(logstore, dir)
}

View File

@@ -527,7 +527,7 @@ mod tests {
use datatypes::vectors::{
Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
};
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::manifest::Manifest;

View File

@@ -20,7 +20,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use datatypes::vectors::VectorRef;
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use storage::config::EngineConfig as StorageEngineConfig;

View File

@@ -68,6 +68,7 @@ mito = { path = "../mito", features = ["test"] }
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
storage = { path = "../storage" }
store-api = { path = "../store-api" }
tempdir = "0.3"
tokio = { version = "1.18", features = ["full"] }
tokio-test = "0.4"

View File

@@ -101,9 +101,11 @@ mod tests {
use query::QueryEngineFactory;
use super::*;
type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use mito::engine::MitoEngine;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -121,8 +123,7 @@ mod tests {
..Default::default()
};
let log_store = LocalFileLogStore::open(&log_config).await.unwrap();
let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap();
let mock_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(

View File

@@ -205,11 +205,11 @@ pub enum Error {
},
#[snafu(display(
"Failed to mark WAL as stable, region id: {}, source: {}",
"Failed to mark WAL as obsolete, region id: {}, source: {}",
region_id,
source
))]
MarkWalStable {
MarkWalObsolete {
region_id: u64,
#[snafu(backtrace)]
source: BoxedError,
@@ -504,7 +504,7 @@ impl ErrorExt for Error {
PushBatch { source, .. } => source.status_code(),
CreateDefault { source, .. } => source.status_code(),
ConvertChunk { source, .. } => source.status_code(),
MarkWalStable { source, .. } => source.status_code(),
MarkWalObsolete { source, .. } => source.status_code(),
}
}

View File

@@ -245,7 +245,7 @@ impl<S: LogStore> Job for FlushJob<S> {
#[cfg(test)]
mod tests {
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use regex::Regex;
use super::*;

View File

@@ -26,8 +26,8 @@ use datatypes::prelude::{ScalarVector, WrapperType};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::fs::log::LocalFileLogStore;
use log_store::fs::noop::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::NoopLogStore;
use object_store::backend::fs;
use object_store::ObjectStore;
use store_api::storage::{
@@ -67,6 +67,10 @@ impl<S: LogStore> TesterBase<S> {
}
}
pub async fn close(&self) {
self.region.inner.wal.close().await.unwrap();
}
/// Put without version specified.
///
/// Format of data: (timestamp, v0), timestamp is key, v0 is value.
@@ -126,7 +130,7 @@ impl<S: LogStore> TesterBase<S> {
}
}
pub type FileTesterBase = TesterBase<LocalFileLogStore>;
pub type FileTesterBase = TesterBase<RaftEngineLogStore>;
fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
if enable_version_column {

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use datatypes::prelude::*;
use datatypes::timestamp::TimestampMillisecond;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::fs::log::LocalFileLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
ColumnDescriptorBuilder, ColumnId, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
@@ -34,7 +34,7 @@ use crate::test_util::descriptor_util::RegionDescBuilder;
const REGION_NAME: &str = "region-alter-0";
async fn create_region_for_alter(store_dir: &str) -> RegionImpl<LocalFileLogStore> {
async fn create_region_for_alter(store_dir: &str) -> RegionImpl<RaftEngineLogStore> {
// Always disable version column in this test.
let metadata = tests::new_metadata(REGION_NAME, false);
@@ -103,6 +103,9 @@ impl AlterTester {
async fn reopen(&mut self) {
// Close the old region.
if let Some(base) = self.base.as_ref() {
base.close().await;
}
self.base = None;
// Reopen the region.
let store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;

View File

@@ -1,10 +1,10 @@
// Copyright 2022 Greptime Team
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,7 +14,8 @@
//! Region read/write tests.
use log_store::fs::log::LocalFileLogStore;
use common_telemetry::info;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{OpenOptions, SequenceNumber, WriteResponse};
use tempdir::TempDir;
@@ -30,7 +31,7 @@ async fn create_region_for_basic(
region_name: &str,
store_dir: &str,
enable_version_column: bool,
) -> RegionImpl<LocalFileLogStore> {
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(region_name, enable_version_column);
let store_config = config_util::new_store_config(region_name, store_dir).await;
@@ -70,6 +71,11 @@ impl Tester {
async fn try_reopen(&mut self) -> Result<bool> {
// Close the old region.
if let Some(base) = self.base.as_ref() {
info!("Reopen tester base");
base.close().await;
}
self.base = None;
// Reopen the region.
let store_config = config_util::new_store_config(&self.region_name, &self.store_dir).await;

View File

@@ -17,7 +17,7 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use log_store::fs::log::LocalFileLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{OpenOptions, WriteResponse};
use tempdir::TempDir;
@@ -34,7 +34,7 @@ async fn create_region_for_flush(
store_dir: &str,
enable_version_column: bool,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<LocalFileLogStore> {
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(REGION_NAME, enable_version_column);
let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
@@ -63,6 +63,9 @@ impl FlushTester {
async fn reopen(&mut self) {
// Close the old region.
if let Some(base) = self.base.as_ref() {
base.close().await;
}
self.base = None;
// Reopen the region.
let mut store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;

View File

@@ -19,7 +19,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::fs::log::LocalFileLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::logstore::LogStore;
use store_api::storage::{
Chunk, ChunkReader, ReadContext, Region, ScanRequest, Snapshot, WriteContext, WriteRequest,
@@ -166,7 +166,7 @@ impl<S: LogStore> ProjectionTester<S> {
const REGION_NAME: &str = "region-projection-0";
async fn new_tester(store_dir: &str) -> ProjectionTester<LocalFileLogStore> {
async fn new_tester(store_dir: &str) -> ProjectionTester<RaftEngineLogStore> {
let metadata = new_metadata(REGION_NAME);
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;

View File

@@ -157,30 +157,34 @@ impl<'a> ParquetReader<'a> {
let adapter = ReadAdapter::new(store_schema.clone(), self.projected_schema.clone())?;
let pruned_row_groups = self.predicate.prune_row_groups(
store_schema.schema().clone(),
builder.metadata().row_groups(),
);
let pruned_row_groups = self
.predicate
.prune_row_groups(
store_schema.schema().clone(),
builder.metadata().row_groups(),
)
.into_iter()
.enumerate()
.flat_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect::<Vec<_>>();
let projection = ProjectionMask::roots(
builder.metadata().file_metadata().schema_descr(),
adapter.fields_to_read(),
);
let mut masked_stream = builder
let mut stream = builder
.with_projection(projection)
.with_row_groups(pruned_row_groups)
.build()
.context(ReadParquetSnafu {
file: self.file_path,
})?
.zip(futures_util::stream::iter(pruned_row_groups.into_iter()));
})?;
let file_name = self.file_path.to_string();
let chunk_stream = try_stream!({
while let Some((record_batch, valid)) = masked_stream.next().await {
if valid {
yield record_batch.context(ReadParquetSnafu { file: &file_name })?
}
while let Some(res) = stream.next().await {
yield res.context(ReadParquetSnafu { file: &file_name })?
}
});
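The reshaped pruning logic above moves row-group filtering ahead of decoding: `prune_row_groups` still returns one boolean per row group, but the booleans are now converted into the indices of the groups to keep and handed to the builder via `with_row_groups`, so pruned groups are never read at all (previously the mask was zipped against the already-decoded batch stream). A minimal, std-only sketch of the mask-to-indices step (the function name is illustrative):

fn selected_row_groups(mask: &[bool]) -> Vec<usize> {
    // Keep the index of every row group whose predicate evaluation was true.
    mask.iter()
        .enumerate()
        .filter_map(|(idx, &keep)| if keep { Some(idx) } else { None })
        .collect()
}

fn main() {
    // Row groups 0 and 2 survive pruning; row group 1 is skipped before any IO.
    assert_eq!(vec![0, 2], selected_row_groups(&[true, false, true]));
}

The diff itself uses `flat_map` over an `Option`, which behaves the same as `filter_map` here.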
@@ -330,6 +334,65 @@ mod tests {
);
}
#[tokio::test]
async fn test_parquet_read_large_batch() {
common_telemetry::init_default_ut_logging();
let schema = memtable_tests::schema_for_test();
let memtable = DefaultMemtableBuilder::default().build(schema.clone());
let rows_total = 4096 * 4;
let mut keys_vec = Vec::with_capacity(rows_total);
let mut values_vec = Vec::with_capacity(rows_total);
for i in 0..rows_total {
keys_vec.push((i as i64, i as u64));
values_vec.push((Some(i as u64), Some(i as u64)));
}
memtable_tests::write_kvs(
&*memtable,
10, // sequence
OpType::Put,
&keys_vec, // keys
&values_vec, // values
);
let dir = TempDir::new("write_parquet").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let sst_file_name = "test-read-large.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());
writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap();
let operator = ObjectStore::new(
object_store::backend::fs::Builder::default()
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
);
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let reader = ParquetReader::new(
"test-read-large.parquet",
operator,
projected_schema,
Predicate::empty(),
);
let mut rows_fetched = 0;
let mut stream = reader.chunk_stream().await.unwrap();
while let Some(res) = stream.next_batch().await.unwrap() {
rows_fetched += res.num_rows();
}
assert_eq!(rows_total, rows_fetched);
}
#[tokio::test]
async fn test_parquet_reader() {
common_telemetry::init_default_ut_logging();


@@ -14,8 +14,8 @@
use std::sync::Arc;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use object_store::backend::fs::Builder;
use object_store::ObjectStore;
@@ -35,7 +35,7 @@ fn log_store_dir(store_dir: &str) -> String {
pub async fn new_store_config(
region_name: &str,
store_dir: &str,
) -> StoreConfig<LocalFileLogStore> {
) -> StoreConfig<RaftEngineLogStore> {
let parent_dir = "";
let sst_dir = engine::region_sst_dir(parent_dir, region_name);
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
@@ -50,7 +50,7 @@ pub async fn new_store_config(
log_file_dir: log_store_dir(store_dir),
..Default::default()
};
let log_store = Arc::new(LocalFileLogStore::open(&log_config).await.unwrap());
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
StoreConfig {
log_store,

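For the test configuration above, the only call-site difference is the constructor: `LocalFileLogStore::open(&log_config)` borrowed the config, while `RaftEngineLogStore::try_new(log_config)` consumes it. A hedged sketch of the new wiring, using only the paths and fields visible in this diff (the helper name and the `wal_dir` argument are illustrative):

use std::sync::Arc;

use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;

// Open a raft-engine backed WAL rooted at `wal_dir` for a test; `try_new`
// creates the engine files on disk, so the directory must be writable.
async fn open_test_log_store(wal_dir: &str) -> Arc<RaftEngineLogStore> {
    let log_config = LogConfig {
        log_file_dir: wal_dir.to_string(),
        ..Default::default()
    };
    Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap())
}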

@@ -1,10 +1,10 @@
// Copyright 2022 Greptime Team
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,12 +19,15 @@ use common_error::prelude::BoxedError;
use futures::{stream, Stream, TryStreamExt};
use prost::Message;
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Entry;
use store_api::logstore::{AppendResponse, LogStore};
use store_api::logstore::entry::{Entry, Id};
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use crate::codec::{Decoder, Encoder};
use crate::error::{self, Error, MarkWalStableSnafu, Result};
use crate::error::{
DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalObsoleteSnafu, ReadWalSnafu, Result,
WalDataCorruptedSnafu, WriteWalSnafu,
};
use crate::proto::wal::{self, WalHeader};
use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder};
use crate::write_batch::Payload;
@@ -65,7 +68,7 @@ impl<S: LogStore> Wal<S> {
.obsolete(self.namespace.clone(), seq)
.await
.map_err(BoxedError::new)
.context(MarkWalStableSnafu {
.context(MarkWalObsoleteSnafu {
region_id: self.region_id,
})
}
@@ -74,6 +77,12 @@ impl<S: LogStore> Wal<S> {
pub fn region_id(&self) -> RegionId {
self.region_id
}
#[cfg(test)]
pub async fn close(&self) -> Result<()> {
let _ = self.store.stop().await;
Ok(())
}
}
impl<S: LogStore> Wal<S> {
@@ -96,7 +105,7 @@ impl<S: LogStore> Wal<S> {
seq: SequenceNumber,
mut header: WalHeader,
payload: Option<&Payload>,
) -> Result<(u64, usize)> {
) -> Result<Id> {
if let Some(p) = payload {
header.mutation_types = wal::gen_mutation_types(p);
}
@@ -114,7 +123,7 @@ impl<S: LogStore> Wal<S> {
encoder
.encode(p, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
.context(WriteWalSnafu {
region_id: self.region_id(),
})?;
}
@@ -129,7 +138,7 @@ impl<S: LogStore> Wal<S> {
.read(&self.namespace, start_seq)
.await
.map_err(BoxedError::new)
.context(error::ReadWalSnafu {
.context(ReadWalSnafu {
region_id: self.region_id(),
})?
// Handle the error when reading from the stream.
@@ -147,19 +156,19 @@ impl<S: LogStore> Wal<S> {
Ok(Box::pin(stream))
}
async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> {
async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<u64> {
let e = self.store.entry(bytes, seq, self.namespace.clone());
let res = self
let response = self
.store
.append(e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
.context(WriteWalSnafu {
region_id: self.region_id(),
})?;
Ok((res.entry_id(), res.offset()))
Ok(response.entry_id)
}
fn decode_entry<E: Entry>(
@@ -174,7 +183,7 @@ impl<S: LogStore> Wal<S> {
ensure!(
data_pos <= input.len(),
error::WalDataCorruptedSnafu {
WalDataCorruptedSnafu {
region_id: self.region_id(),
message: format!(
"Not enough input buffer, expected data position={}, actual buffer length={}",
@@ -192,7 +201,7 @@ impl<S: LogStore> Wal<S> {
let payload = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu {
.context(ReadWalSnafu {
region_id: self.region_id(),
})?;
@@ -209,7 +218,7 @@ impl Encoder for WalHeaderEncoder {
fn encode(&self, item: &WalHeader, dst: &mut Vec<u8>) -> Result<()> {
item.encode_length_delimited(dst)
.map_err(|err| err.into())
.context(error::EncodeWalHeaderSnafu)
.context(EncodeWalHeaderSnafu)
}
}
@@ -222,12 +231,12 @@ impl Decoder for WalHeaderDecoder {
fn decode(&self, src: &[u8]) -> Result<(usize, WalHeader)> {
let mut data_pos = prost::decode_length_delimiter(src)
.map_err(|err| err.into())
.context(error::DecodeWalHeaderSnafu)?;
.context(DecodeWalHeaderSnafu)?;
data_pos += prost::length_delimiter_len(data_pos);
let wal_header = WalHeader::decode_length_delimited(src)
.map_err(|err| err.into())
.context(error::DecodeWalHeaderSnafu)?;
.context(DecodeWalHeaderSnafu)?;
Ok((data_pos, wal_header))
}
@@ -247,13 +256,9 @@ mod tests {
let res = wal.write(0, b"test1").await.unwrap();
assert_eq!(0, res.0);
assert_eq!(0, res.1);
assert_eq!(0, res);
let res = wal.write(1, b"test2").await.unwrap();
assert_eq!(1, res.0);
assert_eq!(5 + 32, res.1);
assert_eq!(1, res);
}
#[tokio::test]
@@ -263,18 +268,15 @@ mod tests {
test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
let wal = Wal::new(0, Arc::new(log_store));
let header = WalHeader::with_last_manifest_version(111);
let (seq_num, _) = wal.write_to_wal(3, header, None).await?;
assert_eq!(3, seq_num);
let seq_num = 3;
wal.write_to_wal(seq_num, header, None).await?;
let mut stream = wal.read_from_wal(seq_num).await?;
let mut data = vec![];
while let Some((seq_num, header, write_batch)) = stream.try_next().await? {
data.push((seq_num, header, write_batch));
}
assert_eq!(1, data.len());
assert_eq!(seq_num, data[0].0);
assert_eq!(111, data[0].1.last_manifest_version);
assert!(data[0].2.is_none());


@@ -16,7 +16,7 @@
use common_error::prelude::ErrorExt;
use crate::logstore::entry::{Entry, Id, Offset};
use crate::logstore::entry::{Entry, Id};
use crate::logstore::entry_stream::SendableEntryStream;
use crate::logstore::namespace::Namespace;
@@ -30,21 +30,20 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
type Error: ErrorExt + Send + Sync + 'static;
type Namespace: Namespace;
type Entry: Entry;
type AppendResponse: AppendResponse;
async fn start(&self) -> Result<(), Self::Error>;
/// Stop components of logstore.
async fn stop(&self) -> Result<(), Self::Error>;
/// Append an `Entry` to WAL with given namespace
async fn append(&self, mut e: Self::Entry) -> Result<Self::AppendResponse, Self::Error>;
/// Append an `Entry` to WAL with given namespace and return append response containing
/// the entry id.
async fn append(&self, mut e: Self::Entry) -> Result<AppendResponse, Self::Error>;
/// Append a batch of entries atomically and return the ids of the appended entries.
async fn append_batch(
&self,
ns: &Self::Namespace,
e: Vec<Self::Entry>,
) -> Result<Id, Self::Error>;
) -> Result<Vec<Id>, Self::Error>;
/// Create a new `EntryStream` to asynchronously generate `Entry` with ids
/// starting from `id`.
@@ -76,8 +75,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error>;
}
pub trait AppendResponse: Send + Sync {
fn entry_id(&self) -> Id;
fn offset(&self) -> Offset;
#[derive(Debug)]
pub struct AppendResponse {
pub entry_id: Id,
}
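`AppendResponse` changes from a trait to a plain struct because a raft-engine backed store has no meaningful file offset to report; the id assigned to the appended entry is all callers need, and `append_batch` likewise now returns the ids of every appended entry. A std-only toy showing the simplified caller side (a stand-in, not the real `store_api` types):

// Stand-in for store_api::logstore::AppendResponse after the change: a plain
// data struct instead of a trait with entry_id()/offset() accessors.
#[derive(Debug)]
pub struct AppendResponse {
    pub entry_id: u64,
}

fn main() {
    let response = AppendResponse { entry_id: 42 };
    // Callers that previously invoked `res.entry_id()` and `res.offset()`
    // now simply read the public field.
    assert_eq!(42, response.entry_id);
}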


@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::buffer::{Buffer, BufferMut};
use common_error::ext::ErrorExt;
use crate::logstore::namespace::Namespace;
@@ -22,7 +21,7 @@ pub type Epoch = u64;
pub type Id = u64;
/// Entry is the minimal data storage unit in `LogStore`.
pub trait Entry: Encode + Send + Sync {
pub trait Entry: Send + Sync {
type Error: ErrorExt + Send + Sync;
type Namespace: Namespace;
@@ -32,32 +31,5 @@ pub trait Entry: Encode + Send + Sync {
/// Return entry id that monotonically increments.
fn id(&self) -> Id;
/// Return file offset of entry.
fn offset(&self) -> Offset;
fn set_id(&mut self, id: Id);
/// Returns epoch of entry.
fn epoch(&self) -> Epoch;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn namespace(&self) -> Self::Namespace;
}
pub trait Encode: Sized {
type Error: ErrorExt + Send + Sync + 'static;
/// Encodes item to given byte slice and return encoded size on success;
/// # Panics
/// If given buffer is not large enough to hold the encoded item.
fn encode_to<T: BufferMut>(&self, buf: &mut T) -> Result<usize, Self::Error>;
/// Decodes item from given buffer.
fn decode<T: Buffer>(buf: &mut T) -> Result<Self, Self::Error>;
/// Return the size in bytes of the encoded item;
fn encoded_size(&self) -> usize;
}
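With raft-engine owning serialization and physical placement, the `Encode` trait is deleted and `Entry` loses its offset, epoch and length accessors; what remains is essentially the payload, an id the store assigns on append (hence the new `set_id`), and the namespace. A std-only toy of the slimmed-down shape (not the real trait; its error and namespace associated types are omitted here):

trait Entry: Send + Sync {
    fn data(&self) -> &[u8];
    fn id(&self) -> u64;
    fn set_id(&mut self, id: u64);
}

#[derive(Debug)]
struct MemEntry {
    id: u64,
    data: Vec<u8>,
}

impl Entry for MemEntry {
    fn data(&self) -> &[u8] {
        &self.data
    }
    fn id(&self) -> u64 {
        self.id
    }
    fn set_id(&mut self, id: u64) {
        self.id = id;
    }
}

fn main() {
    let mut entry = MemEntry { id: 0, data: b"mutation".to_vec() };
    // The log store, not the caller, decides the final id when appending.
    entry.set_id(7);
    assert_eq!(7, entry.id());
    assert_eq!(b"mutation".as_slice(), entry.data());
}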


@@ -36,18 +36,13 @@ mod tests {
use futures::StreamExt;
use super::*;
use crate::logstore::entry::{Encode, Epoch, Id, Offset};
pub use crate::logstore::entry::Id;
pub struct SimpleEntry {
/// Offset of current entry
offset: Offset,
/// Epoch of current entry
epoch: Epoch,
/// Binary data of current entry
data: Vec<u8>,
}
use common_base::buffer::{Buffer, BufferMut};
use common_error::prelude::{ErrorExt, Snafu};
use snafu::{Backtrace, ErrorCompat};
@@ -74,23 +69,6 @@ mod tests {
}
}
impl Encode for SimpleEntry {
type Error = Error;
fn encode_to<T: BufferMut>(&self, buf: &mut T) -> Result<usize, Self::Error> {
buf.write_from_slice(self.data.as_slice()).unwrap();
Ok(self.data.as_slice().len())
}
fn decode<T: Buffer>(_buf: &mut T) -> Result<Self, Self::Error> {
unimplemented!()
}
fn encoded_size(&self) -> usize {
self.data.as_slice().len()
}
}
impl Entry for SimpleEntry {
type Error = Error;
type Namespace = Namespace;
@@ -103,37 +81,15 @@ mod tests {
0u64
}
fn offset(&self) -> Offset {
self.offset
}
fn set_id(&mut self, _id: Id) {}
fn epoch(&self) -> Epoch {
self.epoch
}
fn len(&self) -> usize {
self.data.len()
}
fn is_empty(&self) -> bool {
self.data.is_empty()
}
fn namespace(&self) -> Self::Namespace {
Namespace {}
}
}
impl SimpleEntry {
pub fn new(data: impl AsRef<[u8]>, offset: Offset, epoch: u64) -> Self {
pub fn new(data: impl AsRef<[u8]>) -> Self {
let data = data.as_ref().to_vec();
Self {
data,
offset,
epoch,
}
Self { data }
}
}
@@ -165,9 +121,8 @@ mod tests {
#[tokio::test]
pub async fn test_entry_stream() {
let stream = async_stream::stream!({
yield Ok(vec![SimpleEntry::new("test_entry".as_bytes(), 0, 128)])
});
let stream =
async_stream::stream!({ yield Ok(vec![SimpleEntry::new("test_entry".as_bytes())]) });
let mut stream_impl = EntryStreamImpl {
inner: Box::pin(stream),


@@ -16,6 +16,6 @@ use std::hash::Hash;
pub type Id = u64;
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + Eq {
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq {
fn id(&self) -> Id;
}
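Relaxing the namespace bound from `Eq` to `PartialEq` matches what the raft-engine backed namespace type derives. A std-only toy implementation satisfying the new bound (the struct name is illustrative; the real trait lives in `store_api`):

use std::hash::Hash;

pub type Id = u64;

pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq {
    fn id(&self) -> Id;
}

#[derive(Debug, Clone, Hash, PartialEq)]
struct TestNamespace {
    id: Id,
}

impl Namespace for TestNamespace {
    fn id(&self) -> Id {
        self.id
    }
}

fn main() {
    let ns = TestNamespace { id: 42 };
    assert_eq!(42, ns.id());
}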