diff --git a/Cargo.lock b/Cargo.lock index ef8adf7e1b..758126a274 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2488,6 +2488,17 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de853764b47027c2e862a995c34978ffa63c1501f2e15f987ba11bd4f9bba193" +[[package]] +name = "fail" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe5e43d0f78a42ad591453aedb1d7ae631ce7ee445c7643691055a9ed8d3b01c" +dependencies = [ + "log", + "once_cell", + "rand 0.8.5", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -2691,11 +2702,21 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a802d974cc18ee7fe1a7868fc9ce31086294fd96ba62f8da64ecb44e92a2653" dependencies = [ - "frunk_core", - "frunk_proc_macro_helpers", - "proc-macro-hack", - "quote", - "syn", + "frunk_core", + "frunk_proc_macro_helpers", + "proc-macro-hack", + "quote", + "syn", +] + +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", ] [[package]] @@ -3161,19 +3182,25 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "unicode-bidi", + "unicode-normalization", ] +[[package]] +name = "if_chain" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed" + [[package]] name = "indexmap" version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ - "autocfg", - "hashbrown 0.12.3", - "serde", + "autocfg", + "hashbrown 0.12.3", + "serde", ] [[package]] @@ -3522,22 +3549,25 @@ dependencies = [ "async-stream", "async-trait", "base64 0.13.1", - "byteorder", - "bytes", - "common-base", - "common-error", - "common-runtime", - "common-telemetry", - "crc", - "futures", - "futures-util", - "hex", - "rand 0.8.5", - "snafu", - "store-api", - "tempdir", - "tokio", - "tokio-util", + "byteorder", + "bytes", + "common-base", + "common-error", + "common-runtime", + "common-telemetry", + "crc", + "futures", + "futures-util", + "hex", + "protobuf", + "protobuf-build", + "raft-engine", + "rand 0.8.5", + "snafu", + "store-api", + "tempdir", + "tokio", + "tokio-util", ] [[package]] @@ -4127,10 +4157,24 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa52e972a9a719cecb6864fb88568781eb706bac2cd1d4f04a648542dbf78069" dependencies = [ - "bitflags", - "cfg-if 1.0.0", - "libc", - "memoffset 0.6.5", + "bitflags", + "cfg-if 1.0.0", + "libc", + "memoffset 0.6.5", +] + +[[package]] +name = "nix" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if 1.0.0", + "libc", + "memoffset 0.6.5", + "pin-utils", ] [[package]] @@ -4139,8 +4183,8 @@ version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" dependencies = [ - "memchr", - "minimal-lexical", + "memchr", + 
"minimal-lexical", ] [[package]] @@ -4185,8 +4229,19 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ae39348c8bc5fbd7f40c727a9925f03517afd2ab27d46702108b6a7e5414c19" dependencies = [ - "num-traits", - "serde", + "num-traits", + "serde", +] + +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -4195,8 +4250,8 @@ version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ - "autocfg", - "num-traits", + "autocfg", + "num-traits", ] [[package]] @@ -5034,16 +5089,43 @@ version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" dependencies = [ - "unicode-ident", + "unicode-ident", +] + +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + +[[package]] +name = "prometheus-static-metric" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8f30cdb09c39930b8fa5e0f23cbb895ab3f766b187403a0ba0956fc1ef4f0e5" +dependencies = [ + "lazy_static", + "proc-macro2", + "quote", + "syn", ] [[package]] name = "promql" version = "0.1.0" dependencies = [ - "common-error", - "promql-parser", - "snafu", + "common-error", + "promql-parser", + "snafu", ] [[package]] @@ -5162,8 +5244,38 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "747761bc3dc48f9a34553bf65605cf6cb6288ba219f3450b4275dbd81539551a" dependencies = [ - "bytes", - "prost 0.11.3", + "bytes", + "prost 0.11.3", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +dependencies = [ + "bytes", +] + +[[package]] +name = "protobuf-build" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb3c02f54ecaf12572c1a60dbdb36b1f8f713a16105881143f2be84cca5bbe3" +dependencies = [ + "bitflags", + "protobuf", + "protobuf-codegen", + "regex", +] + +[[package]] +name = "protobuf-codegen" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6" +dependencies = [ + "protobuf", ] [[package]] @@ -5172,7 +5284,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" dependencies = [ - "ptr_meta_derive", + "ptr_meta_derive", ] [[package]] @@ -5295,8 +5407,41 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" dependencies = [ - "endian-type", - "nibble_vec", + "endian-type", + "nibble_vec", +] + +[[package]] +name = "raft-engine" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "67b66e735395b7ff12f3ebbb4794006aecb365c4c9a82141279b58b227ac3a8b" +dependencies = [ + "byteorder", + "crc32fast", + "crossbeam", + "fail", + "fs2", + "hashbrown 0.12.3", + "hex", + "if_chain", + "lazy_static", + "libc", + "log", + "lz4-sys", + "nix 0.25.1", + "num-derive", + "num-traits", + "parking_lot", + "prometheus", + "prometheus-static-metric", + "protobuf", + "rayon", + "scopeguard", + "serde", + "serde_repr", + "strum", + "thiserror", ] [[package]] @@ -5305,9 +5450,9 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" dependencies = [ - "fuchsia-cprng", - "libc", - "rand_core 0.3.1", + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", "rdrand", "winapi", ] @@ -6220,20 +6365,21 @@ dependencies = [ "rustpython-ast", "rustpython-codegen", "rustpython-compiler", - "rustpython-compiler-core", - "rustpython-parser", - "rustpython-pylib", - "rustpython-stdlib", - "rustpython-vm", - "serde", - "session", - "snafu", - "sql", - "storage", - "table", - "tempdir", - "tokio", - "tokio-test", + "rustpython-compiler-core", + "rustpython-parser", + "rustpython-pylib", + "rustpython-stdlib", + "rustpython-vm", + "serde", + "session", + "snafu", + "sql", + "storage", + "store-api", + "table", + "tempdir", + "tokio", + "tokio-test", ] [[package]] @@ -6366,7 +6512,18 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b04f22b563c91331a10074bda3dd5492e3cc39d56bd557e91c0af42b6c7341" dependencies = [ - "serde", + "serde", +] + +[[package]] +name = "serde_repr" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a5ec9fa74a20ebbe5d9ac23dac1fc96ba0ecfe9f50f2843b52e537b10fbcb4e" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -6375,9 +6532,9 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ - "form_urlencoded", - "itoa 1.0.5", - "ryu", + "form_urlencoded", + "itoa 1.0.5", + "ryu", "serde", ] diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index df39d3a5ab..9fd68d1434 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -367,7 +367,7 @@ pub struct TableEntryValue { #[cfg(test)] mod tests { - use log_store::fs::noop::NoopLogStore; + use log_store::NoopLogStore; use mito::config::EngineConfig; use mito::engine::MitoEngine; use object_store::ObjectStore; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 697d931301..fa19cecf77 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -21,8 +21,8 @@ use catalog::remote::MetaKvBackend; use catalog::CatalogManagerRef; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_telemetry::logging::info; -use log_store::fs::config::LogConfig; -use log_store::fs::log::LocalFileLogStore; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use log_store::LogConfig; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use mito::config::EngineConfig as TableEngineConfig; @@ -42,7 +42,7 @@ use table::table::TableIdProviderRef; use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, - NewCatalogSnafu, Result, StartLogStoreSnafu, + NewCatalogSnafu, 
OpenLogStoreSnafu, Result, StartLogStoreSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; @@ -52,7 +52,7 @@ mod grpc; mod script; mod sql; -pub(crate) type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>; +pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>; // An abstraction to read/write services. pub struct Instance { @@ -62,7 +62,7 @@ pub struct Instance { pub(crate) script_executor: ScriptExecutor, pub(crate) table_id_provider: Option<TableIdProviderRef>, pub(crate) heartbeat_task: Option<HeartbeatTask>, - pub(crate) logstore: Arc<LocalFileLogStore>, + pub(crate) logstore: Arc<RaftEngineLogStore>, } pub type InstanceRef = Arc<Instance>; @@ -70,7 +70,7 @@ pub type InstanceRef = Arc<Instance>; impl Instance { pub async fn new(opts: &DatanodeOptions) -> Result<Self> { let object_store = new_object_store(&opts.storage).await?; - let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?); + let logstore = Arc::new(create_log_store(&opts.wal_dir).await?); let meta_client = match opts.mode { Mode::Standalone => None, @@ -165,11 +165,11 @@ impl Instance { } pub async fn start(&self) -> Result<()> { + self.logstore.start().await.context(StartLogStoreSnafu)?; self.catalog_manager .start() .await .context(NewCatalogSnafu)?; - self.logstore.start().await.context(StartLogStoreSnafu)?; if let Some(task) = &self.heartbeat_task { task.start().await?; } @@ -275,9 +275,7 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul Ok(meta_client) } -pub(crate) async fn create_local_file_log_store( - path: impl AsRef<str>, -) -> Result<LocalFileLogStore> { +pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngineLogStore> { let path = path.as_ref(); // create WAL directory fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?; @@ -289,9 +287,6 @@ pub(crate) async fn create_local_file_log_store( ..Default::default() }; - let log_store = LocalFileLogStore::open(&log_config) - .await - .context(error::OpenLogStoreSnafu)?; - - Ok(log_store) + let logstore = RaftEngineLogStore::try_new(log_config).context(OpenLogStoreSnafu)?; + Ok(logstore) } diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index cff79afad1..0e3c804d35 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -29,7 +29,7 @@ use table::table::TableIdProvider; use crate::datanode::DatanodeOptions; use crate::error::Result; use crate::heartbeat::HeartbeatTask; -use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance}; +use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance}; use crate::script::ScriptExecutor; use crate::sql::SqlHandler; @@ -41,7 +41,7 @@ impl Instance { pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> { let object_store = new_object_store(&opts.storage).await?; - let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?); + let logstore = Arc::new(create_log_store(&opts.wal_dir).await?); let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index aa41e8a255..da3fde35c9 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -125,7 +125,7 @@ mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use datatypes::value::Value; - use log_store::fs::noop::NoopLogStore; + use log_store::NoopLogStore; use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine;
use object_store::services::fs::Builder; diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 76540fafe8..3fbac5838d 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -4,10 +4,15 @@ version.workspace = true edition.workspace = true license.workspace = true +[build-dependencies] +protobuf-build = { version = "0.14", default-features = false, features = [ + "protobuf-codec", +] } + [dependencies] arc-swap = "1.5" -async-stream = "0.3" -async-trait = "0.1" +async-stream.workspace = true +async-trait.workspace = true base64 = "0.13" byteorder = "1.4" bytes = "1.1" @@ -16,9 +21,11 @@ common-error = { path = "../common/error" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } crc = "3.0" -futures = "0.3" futures-util = "0.3" +futures.workspace = true hex = "0.4" +protobuf = { version = "2", features = ["bytes"] } +raft-engine = "0.3" snafu = { version = "0.7", features = ["backtraces"] } store-api = { path = "../store-api" } tempdir = "0.3" diff --git a/src/log-store/src/fs/io.rs b/src/log-store/build.rs similarity index 63% rename from src/log-store/src/fs/io.rs rename to src/log-store/build.rs index 0d4bb2dd55..54cc5cb559 100644 --- a/src/log-store/src/fs/io.rs +++ b/src/log-store/build.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(all(unix, not(miri)))] -mod unix; -// todo(hl): maybe support windows seek_write/seek_read -#[cfg(any(not(unix), miri))] -mod fallback; +use protobuf_build::Builder; -#[cfg(any(all(not(unix), not(windows)), miri))] -pub use fallback::{pread_exact, pwrite_all}; -#[cfg(all(unix, not(miri)))] -pub use unix::{pread_exact, pwrite_all}; +fn main() { + let base = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string()); + Builder::new() + .search_dir_for_protos(&format!("{base}/proto")) + .includes(&[format!("{base}/include"), format!("{base}/proto")]) + .include_google_protos() + .generate() +} diff --git a/src/log-store/proto/logstore.proto b/src/log-store/proto/logstore.proto new file mode 100644 index 0000000000..f7c2c4acc3 --- /dev/null +++ b/src/log-store/proto/logstore.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package logstore; + +message EntryImpl { + uint64 id = 1; + uint64 namespace_id = 2; + bytes data = 3; +} + +message LogStoreState { + uint64 last_index = 1; +} + +message NamespaceImpl { + uint64 id = 1; +} diff --git a/src/log-store/src/fs/config.rs b/src/log-store/src/config.rs similarity index 82% rename from src/log-store/src/fs/config.rs rename to src/log-store/src/config.rs index 3a6c84a2ee..d6e0063237 100644 --- a/src/log-store/src/fs/config.rs +++ b/src/log-store/src/config.rs @@ -20,6 +20,9 @@ pub struct LogConfig { pub max_log_file_size: usize, pub log_file_dir: String, pub gc_interval: Duration, + pub purge_threshold: usize, + pub read_batch_size: usize, + pub sync_write: bool, } impl Default for LogConfig { @@ -31,6 +34,9 @@ impl Default for LogConfig { max_log_file_size: 1024 * 1024 * 1024, log_file_dir: "/tmp/greptimedb".to_string(), gc_interval: Duration::from_secs(10 * 60), + purge_threshold: 1024 * 1024 * 1024 * 50, + read_batch_size: 128, + sync_write: false, } } } @@ -49,5 +55,8 @@ mod tests { assert_eq!(1024 * 1024 * 1024, default.max_log_file_size); assert_eq!(128, default.append_buffer_size); assert_eq!(Duration::from_secs(600), default.gc_interval); + assert_eq!(1024 * 1024 * 1024 * 50, default.purge_threshold); + assert_eq!(128, 
default.read_batch_size); + assert!(!default.sync_write); } } diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index e510038ddb..fe4cb40c4e 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -14,7 +14,6 @@ use std::any::Any; -use common_error::ext::BoxedError; use common_error::prelude::{ErrorExt, Snafu}; use snafu::{Backtrace, ErrorCompat}; use tokio::task::JoinError; @@ -22,83 +21,46 @@ use tokio::task::JoinError; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to encode entry, source: {}", source))] - Encode { source: common_base::buffer::Error }, - - #[snafu(display("Failed to decode entry, remain size: {}", size))] - Decode { size: usize, backtrace: Backtrace }, - - #[snafu(display("No enough data to decode, try again"))] - DecodeAgain, - - #[snafu(display("Failed to append entry, source: {}", source))] - Append { - #[snafu(backtrace)] - source: BoxedError, - }, - - #[snafu(display("Failed to wait for log file write complete, source: {}", source))] - Write { source: tokio::task::JoinError }, - - #[snafu(display("Entry corrupted, msg: {}", msg))] - Corrupted { msg: String, backtrace: Backtrace }, - - #[snafu(display("IO error, source: {}", source))] - Io { - source: std::io::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to create path {}, source: {}", path, source))] - CreateDir { - path: String, - source: std::io::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to read path {}, source: {}", path, source))] - ReadPath { - path: String, - source: std::io::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to open log file {}, source: {}", file_name, source))] - OpenLog { - file_name: String, - source: std::io::Error, - backtrace: Backtrace, - }, - - #[snafu(display("File name {} illegal", file_name))] - FileNameIllegal { - file_name: String, - backtrace: Backtrace, - }, - - #[snafu(display("Internal error, msg: {}", msg))] - Internal { msg: String, backtrace: Backtrace }, - - #[snafu(display("End of LogFile"))] - Eof, - - #[snafu(display("File duplicate on start: {}", msg))] - DuplicateFile { msg: String }, - - #[snafu(display("Log file suffix is illegal: {}", suffix))] - SuffixIllegal { suffix: String }, - - #[snafu(display("Failed while waiting for write to finish, source: {}", source))] - WaitWrite { source: tokio::task::JoinError }, - - #[snafu(display("Invalid logstore status, msg: {}", msg))] - InvalidState { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to wait for gc task to stop, source: {}", source))] WaitGcTaskStop { source: JoinError, backtrace: Backtrace, }, + + #[snafu(display("Failed to add entry to LogBatch, source: {}", source))] + AddEntryLogBatch { + source: raft_engine::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to perform raft-engine operation, source: {}", source))] + RaftEngine { + source: raft_engine::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Log store not started yet"))] + IllegalState { backtrace: Backtrace }, + + #[snafu(display("Namespace is illegal: {}", ns))] + IllegalNamespace { ns: u64, backtrace: Backtrace }, + + #[snafu(display( + "Failed to fetch entries from namespace: {}, start: {}, end: {}, max size: {}, source: {}", + ns, + start, + end, + max_size, + source + ))] + FetchEntry { + ns: u64, + start: u64, + end: u64, + max_size: usize, + source: raft_engine::Error, + backtrace: Backtrace, + }, } impl ErrorExt for Error { diff --git a/src/log-store/src/fs.rs
b/src/log-store/src/fs.rs deleted file mode 100644 index e8f334be49..0000000000 --- a/src/log-store/src/fs.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use store_api::logstore::entry::{Id, Offset}; -use store_api::logstore::AppendResponse; - -mod chunk; -pub mod config; -mod crc; -mod entry; -mod file; -mod file_name; -mod index; -mod io; -pub mod log; -mod namespace; -pub mod noop; - -#[derive(Debug, PartialEq, Eq)] -pub struct AppendResponseImpl { - entry_id: Id, - offset: Offset, -} - -impl AppendResponse for AppendResponseImpl { - #[inline] - fn entry_id(&self) -> Id { - self.entry_id - } - - #[inline] - fn offset(&self) -> Offset { - self.offset - } -} diff --git a/src/log-store/src/fs/chunk.rs b/src/log-store/src/fs/chunk.rs deleted file mode 100644 index a59b34e557..0000000000 --- a/src/log-store/src/fs/chunk.rs +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::LinkedList; - -use common_base::buffer::{Buffer, UnderflowSnafu}; -use snafu::ensure; - -pub const DEFAULT_CHUNK_SIZE: usize = 4096; - -#[derive(Debug)] -pub(crate) struct Chunk { - // internal data - pub data: Box<[u8]>, - // read offset - pub read_offset: usize, - // write offset - pub write_offset: usize, -} - -impl Default for Chunk { - fn default() -> Self { - let data = vec![0u8; DEFAULT_CHUNK_SIZE].into_boxed_slice(); - Self { - write_offset: 0, - read_offset: 0, - data, - } - } -} - -impl Chunk { - #[cfg(test)] - pub fn copy_from_slice(s: &[u8]) -> Self { - let src_len = s.len(); - // before [box syntax](https://github.com/rust-lang/rust/issues/49733) becomes stable, - // we can only initialize an array on heap like this. - let mut data = vec![0u8; src_len].into_boxed_slice(); - data[0..src_len].copy_from_slice(s); - Self { - read_offset: 0, - write_offset: src_len, - data, - } - } - - pub fn new(data: Box<[u8]>, write: usize) -> Self { - Self { - write_offset: write, - read_offset: 0, - data, - } - } - - pub fn len(&self) -> usize { - self.write_offset - self.read_offset - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// allows short read. - /// Calling read **will not** advance read cursor, must call `advance` manually. 
- pub fn read(&self, dst: &mut [u8]) -> usize { - let size = self.len().min(dst.len()); - let range = self.read_offset..(self.read_offset + size); - dst[0..size].copy_from_slice(&self.data[range]); - size - } - - pub fn advance(&mut self, by: usize) -> usize { - assert!( - self.write_offset >= self.read_offset, - "Illegal chunk state, read: {}, write: {}", - self.read_offset, - self.write_offset - ); - let step = by.min(self.write_offset - self.read_offset); - self.read_offset += step; - step - } -} - -pub struct ChunkList { - chunks: LinkedList<Chunk>, -} - -impl ChunkList { - pub fn new() -> Self { - Self { - chunks: LinkedList::new(), - } - } - - pub(crate) fn push(&mut self, chunk: Chunk) { - self.chunks.push_back(chunk); - } -} - -impl Buffer for ChunkList { - fn remaining_size(&self) -> usize { - self.chunks.iter().map(|c| c.len()).sum() - } - - fn peek_to_slice(&self, mut dst: &mut [u8]) -> common_base::buffer::Result<()> { - ensure!(self.remaining_size() >= dst.len(), UnderflowSnafu); - - for c in &self.chunks { - if dst.is_empty() { - break; - } - let read = c.read(dst); - dst = &mut dst[read..]; - } - - ensure!(dst.is_empty(), UnderflowSnafu); - Ok(()) - } - - fn read_to_slice(&mut self, dst: &mut [u8]) -> common_base::buffer::Result<()> { - self.peek_to_slice(dst)?; - self.advance_by(dst.len()); - Ok(()) - } - - fn advance_by(&mut self, by: usize) { - let mut left = by; - while left > 0 { - if let Some(c) = self.chunks.front_mut() { - let actual = c.advance(left); - if c.is_empty() { - self.chunks.pop_front(); // remove first chunk - } - left -= actual; - } else { - panic!("Advance step [{by}] exceeds max readable bytes"); - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - pub fn test_chunk() { - let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes()); - assert_eq!(5, chunk.write_offset); - assert_eq!(0, chunk.read_offset); - assert_eq!(5, chunk.len()); - - let mut dst = [0u8; 3]; - assert_eq!(3, chunk.read(&mut dst)); - assert_eq!(5, chunk.write_offset); - assert_eq!(0, chunk.read_offset); - assert_eq!(5, chunk.len()); - } - - #[test] - pub fn test_chunk_short_read() { - let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes()); - - let mut dst = vec![0u8; 8]; - let read = chunk.read(&mut dst); - assert_eq!(5, read); - assert_eq!(vec![b'h', b'e', b'l', b'l', b'o', 0x0, 0x0, 0x0], dst); - } - - #[test] - pub fn test_chunk_advance() { - let mut chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes()); - let mut dst = vec![0u8; 8]; - assert_eq!(5, chunk.read(&mut dst)); - assert_eq!(0, chunk.read_offset); - assert_eq!(5, chunk.write_offset); - - assert_eq!(1, chunk.advance(1)); - assert_eq!(1, chunk.read_offset); - assert_eq!(5, chunk.write_offset); - - assert_eq!(4, chunk.advance(5)); - assert_eq!(5, chunk.read_offset); - assert_eq!(5, chunk.write_offset); - } - - #[test] - pub fn test_composite_chunk_read() { - let mut chunks = ChunkList { - chunks: LinkedList::new(), - }; - - chunks.push(Chunk::copy_from_slice("abcd".as_bytes())); - chunks.push(Chunk::copy_from_slice("12345".as_bytes())); - assert_eq!(9, chunks.remaining_size()); - - let mut dst = [0u8; 2]; - chunks.peek_to_slice(&mut dst).unwrap(); - chunks.advance_by(2); - assert_eq!([b'a', b'b'], dst); - assert_eq!(2, chunks.chunks.len()); - - let mut dst = [0u8; 3]; - chunks.peek_to_slice(&mut dst).unwrap(); - chunks.advance_by(3); - assert_eq!([b'c', b'd', b'1'], dst); - assert_eq!(4, chunks.remaining_size()); - assert_eq!(1, chunks.chunks.len()); - - let mut dst = [0u8; 4]; - chunks.peek_to_slice(&mut dst).unwrap(); - chunks.advance_by(4); - assert_eq!([b'2', b'3', b'4', b'5'], dst); - assert_eq!(0, chunks.remaining_size()); - assert_eq!(0, chunks.chunks.len()); - - chunks.push(Chunk::copy_from_slice("uvwxyz".as_bytes())); - assert_eq!(6, chunks.remaining_size()); - assert_eq!(1, chunks.chunks.len()); - } -} diff --git a/src/log-store/src/fs/crc.rs b/src/log-store/src/fs/crc.rs deleted file mode 100644 index 7410d21616..0000000000 --- a/src/log-store/src/fs/crc.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2022 Greptime Team - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crc::{Crc, CRC_32_ISCSI}; - -pub const CRC_ALGO: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI); diff --git a/src/log-store/src/fs/entry.rs b/src/log-store/src/fs/entry.rs deleted file mode 100644 index 5bc92cadbb..0000000000 --- a/src/log-store/src/fs/entry.rs +++ /dev/null @@ -1,394 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License.
- -use std::pin::Pin; -use std::task::{Context, Poll}; - -use bytes::{Bytes, BytesMut}; -use common_base::buffer::{Buffer, BufferMut}; -use futures::Stream; -use snafu::{ensure, ResultExt}; -use store_api::logstore::entry::{Encode, Entry, Epoch, Id, Offset}; -use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream}; -use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; - -use crate::error::{CorruptedSnafu, DecodeAgainSnafu, DecodeSnafu, EncodeSnafu, Error}; -use crate::fs::crc; -use crate::fs::namespace::LocalNamespace; - -// length + offset + namespace id + epoch + crc -const ENTRY_MIN_LEN: usize = HEADER_LENGTH + 4; -// length + offset + namespace id + epoch -const HEADER_LENGTH: usize = 4 + 8 + 8 + 8; - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct EntryImpl { - pub data: Vec<u8>, - pub offset: Offset, - pub id: Id, - pub namespace_id: NamespaceId, - pub epoch: Epoch, -} - -impl EntryImpl { - #[cfg(test)] - fn set_offset(&mut self, offset: Offset) { - self.offset = offset; - } -} - -impl Encode for EntryImpl { - type Error = Error; - - /// Entry binary format (Little endian): - /// - // ```text - // +--------+--------------+-------+--------+--------+--------+ - // |entry id| namespace id | epoch | length | data | CRC | - // +--------+--------------+-------+--------+--------+--------+ - // | 8 bytes| 8 bytes |8 bytes| 4 bytes|<length>| 4 bytes| - // +--------+--------------+-------+--------+--------+--------+ - // ``` - /// - fn encode_to<T: BufferMut>(&self, buf: &mut T) -> Result<usize, Self::Error> { - let data_length = self.data.len(); - buf.write_u64_le(self.id).context(EncodeSnafu)?; - buf.write_u64_le(self.namespace_id).context(EncodeSnafu)?; - buf.write_u64_le(self.epoch).context(EncodeSnafu)?; - buf.write_u32_le(data_length as u32).context(EncodeSnafu)?; - buf.write_from_slice(self.data.as_slice()) - .context(EncodeSnafu)?; - let checksum = crc::CRC_ALGO.checksum(buf.as_slice()); - buf.write_u32_le(checksum).context(EncodeSnafu)?; - Ok(data_length + ENTRY_MIN_LEN) - } - - fn decode<T: Buffer>(buf: &mut T) -> Result<Self, Self::Error> { - ensure!(buf.remaining_size() >= HEADER_LENGTH, DecodeAgainSnafu); - - macro_rules! map_err { - ($stmt: expr, $var: ident) => { - $stmt.map_err(|_| { - DecodeSnafu { - size: $var.remaining_size(), - } - .build() - }) - }; - } - - let mut digest = crc::CRC_ALGO.digest(); - let mut header = [0u8; HEADER_LENGTH]; - buf.peek_to_slice(&mut header).unwrap(); - - let mut header = &header[..]; - let id = header.read_u64_le().unwrap(); // unwrap here is safe because header bytes must be present - digest.update(&id.to_le_bytes()); - - let namespace_id = header.read_u64_le().unwrap(); - digest.update(&namespace_id.to_le_bytes()); - - let epoch = header.read_u64_le().unwrap(); - digest.update(&epoch.to_le_bytes()); - - let data_len = header.read_u32_le().unwrap(); - digest.update(&data_len.to_le_bytes()); - - ensure!( - buf.remaining_size() >= ENTRY_MIN_LEN + data_len as usize, - DecodeAgainSnafu - ); - - buf.advance_by(HEADER_LENGTH); - - let mut data = vec![0u8; data_len as usize]; - map_err!(buf.peek_to_slice(&mut data), buf)?; - digest.update(&data); - buf.advance_by(data_len as usize); - - let crc_read = map_err!(buf.peek_u32_le(), buf)?; - let crc_calc = digest.finalize(); - - ensure!( - crc_read == crc_calc, - CorruptedSnafu { - msg: format!( - "CRC mismatch while decoding entry, read: {}, calc: {}", - hex::encode_upper(crc_read.to_le_bytes()), - hex::encode_upper(crc_calc.to_le_bytes()) - ) - } - ); - - buf.advance_by(4); - - Ok(Self { - id, - data, - epoch, - offset: 0, - namespace_id, - }) - } - - fn encoded_size(&self) -> usize { - self.data.len() + ENTRY_MIN_LEN - } -} - -impl EntryImpl { - pub(crate) fn new(data: impl AsRef<[u8]>, id: Id, namespace: LocalNamespace) -> EntryImpl { - EntryImpl { - id, - data: data.as_ref().to_vec(), - offset: 0, - epoch: 0, - namespace_id: namespace.id(), - } - } -} - -impl Entry for EntryImpl { - type Error = Error; - type Namespace = LocalNamespace; - - fn data(&self) -> &[u8] { - &self.data - } - - fn id(&self) -> Id { - self.id - } - - fn offset(&self) -> Offset { - self.offset - } - - fn set_id(&mut self, id: Id) { - self.id = id; - } - - fn epoch(&self) -> Epoch { - self.epoch - } - - fn len(&self) -> usize { - ENTRY_MIN_LEN + self.data.len() - } - - fn is_empty(&self) -> bool { - self.data.is_empty() - } - - fn namespace(&self) -> Self::Namespace { - LocalNamespace::new(self.namespace_id) - } -} - -impl TryFrom<Bytes> for EntryImpl { - type Error = Error; - - fn try_from(mut value: Bytes) -> Result<Self, Self::Error> { - EntryImpl::decode(&mut value) - } -} - -impl From<&EntryImpl> for BytesMut { - fn from(e: &EntryImpl) -> Self { - let size = e.encoded_size(); - let mut res = BytesMut::with_capacity(size); - e.encode_to(&mut res).unwrap(); // buffer is pre-allocated, so won't fail - res - } -} - -pub struct StreamImpl<'a> { - pub inner: SendableEntryStream<'a, EntryImpl, Error>, - pub start_entry_id: Id, -} - -impl<'a> Stream for StreamImpl<'a> { - type Item = Result<Vec<EntryImpl>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - Pin::new(&mut self.inner).poll_next(cx) - } -} - -impl<'a> EntryStream for StreamImpl<'a> { - type Error = Error; - type Entry = EntryImpl; - - fn start_id(&self) -> u64 { - self.start_entry_id - } -} - -#[cfg(test)] -mod tests { - use async_stream::stream; - use byteorder::{ByteOrder, LittleEndian}; - use futures::pin_mut; - use futures_util::StreamExt; - use tokio::time::Duration; - - use super::*; - use crate::fs::chunk::{Chunk, ChunkList}; - use crate::fs::crc::CRC_ALGO; - - #[test] - pub fn test_entry_deser() { - let data = "hello, world"; - let mut entry = EntryImpl::new(data.as_bytes(), 8,
LocalNamespace::new(42)); - entry.epoch = 9; - let mut buf = BytesMut::with_capacity(entry.encoded_size()); - entry.encode_to(&mut buf).unwrap(); - assert_eq!(ENTRY_MIN_LEN + data.as_bytes().len(), buf.len()); - let decoded: EntryImpl = EntryImpl::decode(&mut buf.as_slice()).unwrap(); - assert_eq!(entry, decoded); - } - - #[test] - pub fn test_rewrite_entry_id() { - let data = "hello, world"; - let entry = EntryImpl::new(data.as_bytes(), 123, LocalNamespace::new(42)); - let mut buffer = BytesMut::with_capacity(entry.encoded_size()); - entry.encode_to(&mut buffer).unwrap(); - assert_eq!(123, entry.id()); - - // rewrite entry id. - LittleEndian::write_u64(&mut buffer[0..8], 333); - let len = buffer.len(); - let checksum = CRC_ALGO.checksum(&buffer[0..len - 4]); - LittleEndian::write_u32(&mut buffer[len - 4..], checksum); - - let entry_impl = EntryImpl::decode(&mut buffer.freeze()).expect("Failed to deserialize"); - assert_eq!(333, entry_impl.id()); - } - - fn prepare_entry_bytes(data: &str, id: Id) -> Bytes { - let mut entry = EntryImpl::new(data.as_bytes(), id, LocalNamespace::new(42)); - entry.set_id(123); - entry.set_offset(456); - let mut buffer = BytesMut::with_capacity(entry.encoded_size()); - entry.encode_to(&mut buffer).unwrap(); - let len = buffer.len(); - let checksum = CRC_ALGO.checksum(&buffer[0..len - 4]); - LittleEndian::write_u32(&mut buffer[len - 4..], checksum); - buffer.freeze() - } - - /// Test decode entry from a composite buffer. - #[test] - pub fn test_composite_buffer() { - let data_1 = "hello, world"; - let bytes = prepare_entry_bytes(data_1, 0); - EntryImpl::decode(&mut bytes.clone()).unwrap(); - let c1 = Chunk::copy_from_slice(&bytes); - - let data_2 = "LoremIpsumDolor"; - let bytes = prepare_entry_bytes(data_2, 1); - EntryImpl::decode(&mut bytes.clone()).unwrap(); - let c2 = Chunk::copy_from_slice(&bytes); - - let mut chunks = ChunkList::new(); - chunks.push(c1); - chunks.push(c2); - - assert_eq!( - ENTRY_MIN_LEN * 2 + data_2.len() + data_1.len(), - chunks.remaining_size() - ); - - let mut decoded = vec![]; - while chunks.remaining_size() > 0 { - let entry_impl = EntryImpl::decode(&mut chunks).unwrap(); - decoded.push(entry_impl.data); - } - - assert_eq!( - vec![data_1.as_bytes().to_vec(), data_2.as_bytes().to_vec()], - decoded - ); - } - - // split an encoded entry to two different chunk and try decode from this composite chunk - #[test] - pub fn test_decode_split_data_from_composite_chunk() { - let data = "hello, world"; - let bytes = prepare_entry_bytes(data, 42); - assert_eq!( - hex::decode("7B000000000000002A0000000000000000000000000000000C00000068656C6C6F2C20776F726C64E8EE2E57") - .unwrap() - .as_slice(), - &bytes[..] - ); - let original = EntryImpl::decode(&mut bytes.clone()).unwrap(); - let split_point = bytes.len() / 2; - let (left, right) = bytes.split_at(split_point); - - let mut chunks = ChunkList::new(); - chunks.push(Chunk::copy_from_slice(left)); - chunks.push(Chunk::copy_from_slice(right)); - - assert_eq!(bytes.len(), chunks.remaining_size()); - let decoded = EntryImpl::decode(&mut chunks).unwrap(); - assert_eq!(original, decoded); - } - - // Tests decode entry from encoded entry data as two chunks - #[tokio::test] - pub async fn test_decode_from_chunk_stream() { - // prepare entry - let data = "hello, world"; - let bytes = prepare_entry_bytes(data, 42); - assert_eq!( - hex::decode("7b000000000000002a0000000000000000000000000000000c00000068656c6c6f2c20776f726c64e8ee2e57") - .unwrap() - .as_slice(), - &bytes[..] 
- ); - let original = EntryImpl::decode(&mut bytes.clone()).unwrap(); - let split_point = bytes.len() / 2; - let (left, right) = bytes.split_at(split_point); - - // prepare chunk stream - let chunk_stream = stream!({ - yield Chunk::copy_from_slice(left); - tokio::time::sleep(Duration::from_millis(10)).await; - yield Chunk::copy_from_slice(right); - }); - - pin_mut!(chunk_stream); - - let mut chunks = ChunkList::new(); - let mut decoded = vec![]; - while let Some(c) = chunk_stream.next().await { - chunks.push(c); - match EntryImpl::decode(&mut chunks) { - Ok(e) => { - decoded.push(e); - } - Err(Error::DecodeAgain { .. }) => { - continue; - } - _ => { - panic!() - } - } - } - assert_eq!(1, decoded.len()); - assert_eq!(original, decoded.into_iter().next().unwrap()); - } -} diff --git a/src/log-store/src/fs/file.rs b/src/log-store/src/fs/file.rs deleted file mode 100644 index 132fbd337e..0000000000 --- a/src/log-store/src/fs/file.rs +++ /dev/null @@ -1,884 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::fmt::{Debug, Formatter}; -use std::fs::{File, OpenOptions}; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; - -use async_stream::stream; -use byteorder::{ByteOrder, LittleEndian}; -use bytes::{Bytes, BytesMut}; -use common_error::ext::BoxedError; -use common_telemetry::logging::{error, info}; -use common_telemetry::{debug, trace}; -use futures::Stream; -use futures_util::StreamExt; -use snafu::ResultExt; -use store_api::logstore::entry::{Encode, Entry, Id, Offset}; -use store_api::logstore::entry_stream::EntryStream; -use store_api::logstore::namespace::Namespace; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::{Receiver, Sender as MpscSender}; -use tokio::sync::oneshot::Sender as OneshotSender; -use tokio::sync::{oneshot, Notify}; -use tokio::task::JoinHandle; -use tokio::time; - -use crate::error::Error::Eof; -use crate::error::{ - AppendSnafu, Error, InternalSnafu, IoSnafu, OpenLogSnafu, Result, WaitWriteSnafu, WriteSnafu, -}; -use crate::fs::chunk::{Chunk, ChunkList}; -use crate::fs::config::LogConfig; -use crate::fs::crc::CRC_ALGO; -use crate::fs::entry::{EntryImpl, StreamImpl}; -use crate::fs::file_name::FileName; -use crate::fs::namespace::LocalNamespace; -use crate::fs::AppendResponseImpl; - -pub const CHUNK_SIZE: usize = 4096; -const LOG_WRITER_BATCH_SIZE: usize = 16; - -/// Wraps File operation to get rid of `&mut self` requirements -struct FileWriter { - inner: Arc, - path: String, -} - -impl FileWriter { - pub fn new(file: Arc, path: String) -> Self { - Self { inner: file, path } - } - - pub async fn write(&self, data: Bytes, offset: u64) -> Result<()> { - let file = self.inner.clone(); - let handle = common_runtime::spawn_blocking_write(move || { - crate::fs::io::pwrite_all(&file, &data, offset) - }); - handle.await.context(WriteSnafu)? - } - - /// Writes a batch of `AppendRequest` to file. 
- pub async fn write_batch(self: &Arc, batch: &Vec) -> Result { - let mut futures = Vec::with_capacity(batch.len()); - - let mut max_offset = 0; - for req in batch { - let offset = req.offset; - let end = req.data.len() + offset; - max_offset = max_offset.max(end); - let future = self.write(req.data.clone(), offset as u64); - futures.push(future); - } - debug!( - "Write batch, size: {}, max offset: {}", - batch.len(), - max_offset - ); - futures::future::join_all(futures) - .await - .into_iter() - .collect::>>() - .map(|_| max_offset) - } - - pub async fn flush(&self) -> Result<()> { - let file = self.inner.clone(); - common_runtime::spawn_blocking_write(move || file.sync_all().context(IoSnafu)) - .await - .context(WaitWriteSnafu)? - } - - pub async fn destroy(&self) -> Result<()> { - tokio::fs::remove_file(&self.path).await.context(IoSnafu)?; - Ok(()) - } -} - -pub type LogFileRef = Arc; - -pub struct LogFile { - // name of log file - name: FileName, - // file writer - writer: Arc, - // append request channel - pending_request_tx: Option>, - // flush task notifier - notify: Arc, - // flush task join handle - join_handle: Mutex>>>, - // internal state(offset, id counter...) - state: Arc, - // the start entry id of current log file - start_entry_id: u64, - // max file size of current log file - max_file_size: usize, - // buffer size for append request channel. read from config on start. - append_buffer_size: usize, -} - -impl Drop for LogFile { - fn drop(&mut self) { - self.state.stopped.store(true, Ordering::Relaxed); - info!("Dropping log file {}", self.name); - } -} - -impl LogFile { - /// Opens a file in path with given log config. - pub async fn open(path: impl Into, config: &LogConfig) -> Result { - let path = path.into(); - let file = OpenOptions::new() - .write(true) - .read(true) - .create(true) - .open(path.clone()) - .context(OpenLogSnafu { file_name: &path })?; - - let file_name = FileName::try_from(path.as_str())?; - let start_entry_id = file_name.entry_id(); - - let mut log = Self { - name: file_name, - writer: Arc::new(FileWriter::new(Arc::new(file), path.clone())), - start_entry_id, - pending_request_tx: None, - notify: Arc::new(Notify::new()), - max_file_size: config.max_log_file_size, - join_handle: Mutex::new(None), - state: Arc::new(State::default()), - append_buffer_size: config.append_buffer_size, - }; - - let metadata = log.writer.inner.metadata().context(IoSnafu)?; - let expect_length = metadata.len() as usize; - log.state - .write_offset - .store(expect_length, Ordering::Relaxed); - log.state - .flush_offset - .store(expect_length, Ordering::Relaxed); - - let replay_start_time = time::Instant::now(); - let (actual_offset, next_entry_id) = log.replay().await?; - - info!( - "Log file {} replay finished, last offset: {}, file start entry id: {}, elapsed time: {}ms", - path, actual_offset, start_entry_id, - time::Instant::now().duration_since(replay_start_time).as_millis() - ); - - log.state - .write_offset - .store(actual_offset, Ordering::Relaxed); - log.state - .flush_offset - .store(actual_offset, Ordering::Relaxed); - log.state - .last_entry_id - .store(next_entry_id, Ordering::Relaxed); - Ok(log) - } - - /// Returns the persisted size of current log file. - #[inline] - pub fn persisted_size(&self) -> usize { - self.state.flush_offset() - } - - /// Starts log file and it's internal components(including flush task, etc.). 
- pub async fn start(&mut self) -> Result<()> { - let notify = self.notify.clone(); - let writer = self.writer.clone(); - let state = self.state.clone(); - - let (tx, mut rx) = tokio::sync::mpsc::channel(self.append_buffer_size); - - let handle = tokio::spawn(async move { - while !state.is_stopped() { - let batch = Self::recv_batch(&mut rx, &state, ¬ify, true).await; - debug!("Receive write request, size: {}", batch.len()); - if !batch.is_empty() { - Self::handle_batch(batch, &state, &writer).await; - } - } - - // log file stopped - let batch = Self::recv_batch(&mut rx, &state, ¬ify, false).await; - if !batch.is_empty() { - Self::handle_batch(batch, &state, &writer).await; - } - info!("Writer task finished"); - Ok(()) - }); - - self.pending_request_tx = Some(tx); - *self.join_handle.lock().unwrap() = Some(handle); - info!("Flush task started: {}", self.name); - Ok(()) - } - - /// Stops log file. - /// # Panics - /// Panics when a log file is stopped while not being started ever. - pub async fn stop(&self) -> Result<()> { - self.state.stopped.store(true, Ordering::Release); - let join_handle = self - .join_handle - .lock() - .unwrap() - .take() - .expect("Join handle should present"); - self.notify.notify_waiters(); - let res = join_handle.await.unwrap(); - info!("LogFile task finished: {:?}", res); - res - } - - pub async fn destroy(&self) -> Result<()> { - self.writer.destroy().await?; - Ok(()) - } - - async fn handle_batch( - mut batch: Vec, - state: &Arc, - writer: &Arc, - ) { - // preserve previous write offset - let prev_write_offset = state.write_offset(); - - let mut last_id = 0; - for mut req in &mut batch { - req.offset = state - .write_offset - .fetch_add(req.data.len(), Ordering::AcqRel); - last_id = req.id; - debug!("Entry id: {}, offset: {}", req.id, req.offset,); - } - - match writer.write_batch(&batch).await { - Ok(max_offset) => match writer.flush().await { - Ok(_) => { - let prev_ofs = state.flush_offset.swap(max_offset, Ordering::Acquire); - let prev_id = state.last_entry_id.swap(last_id, Ordering::Acquire); - debug!( - "Flush offset: {} -> {}, max offset in batch: {}, entry id: {}->{}", - prev_ofs, - state.flush_offset.load(Ordering::Acquire), - max_offset, - prev_id, - state.last_entry_id.load(Ordering::Acquire), - ); - batch.into_iter().for_each(AppendRequest::complete); - } - Err(e) => { - error!(e; "Failed to flush log file"); - batch.into_iter().for_each(|r| r.fail()); - state - .write_offset - .store(prev_write_offset, Ordering::Release); - } - }, - Err(e) => { - error!(e; "Failed to write append requests"); - batch.into_iter().for_each(|r| r.fail()); - state - .write_offset - .store(prev_write_offset, Ordering::Release); - } - } - } - - async fn recv_batch( - rx: &mut Receiver, - state: &Arc, - notify: &Arc, - wait_on_empty: bool, - ) -> Vec { - let mut batch: Vec = Vec::with_capacity(LOG_WRITER_BATCH_SIZE); - for _ in 0..LOG_WRITER_BATCH_SIZE { - match rx.try_recv() { - Ok(req) => { - batch.push(req); - } - Err(e) => match e { - TryRecvError::Empty => { - if batch.is_empty() && wait_on_empty { - notify.notified().await; - if state.is_stopped() { - break; - } - } else { - break; - } - } - TryRecvError::Disconnected => { - error!("Channel unexpectedly disconnected!"); - break; - } - }, - } - } - batch - } - - #[inline] - pub fn start_entry_id(&self) -> Id { - self.start_entry_id - } - - /// Replays current file til last entry read - pub async fn replay(&mut self) -> Result<(usize, Id)> { - let log_name = self.name.to_string(); - let previous_offset = 
self.state.flush_offset(); - let ns = LocalNamespace::default(); - let mut stream = self.create_stream( - // TODO(hl): LocalNamespace should be filled - &ns, 0, - ); - - let mut last_offset = 0usize; - let mut last_entry_id: Option = None; - while let Some(res) = stream.next().await { - match res { - Ok(entries) => { - for e in entries { - last_offset += e.len(); - last_entry_id = Some(e.id()); - } - } - Err(e) => { - error!(e; "Error while replay log {}", log_name); - break; - } - } - } - info!( - "Replay log {} finished, offset: {} -> {}, last entry id: {:?}", - log_name, previous_offset, last_offset, last_entry_id - ); - Ok((last_offset, last_entry_id.unwrap_or(self.start_entry_id))) - } - - /// Creates a reader stream that asynchronously generates entries start from given entry id. - /// ### Notice - /// If the entry with start entry id is not present, the first generated entry will start with - /// the first entry with an id greater than `start_entry_id`. - pub fn create_stream( - &self, - _ns: &impl Namespace, - start_entry_id: u64, - ) -> impl EntryStream + '_ { - let length = self.state.flush_offset.load(Ordering::Relaxed); - - let mut chunk_stream = file_chunk_stream(self.writer.inner.clone(), 0, length, 0); - let entry_stream = stream!({ - let mut chunks = ChunkList::new(); - while let Some(chunk) = chunk_stream.next().await { - let chunk = chunk.unwrap(); - chunks.push(chunk); - let mut batch = vec![]; - loop { - match EntryImpl::decode(&mut chunks) { - Ok(e) => { - if e.id() >= start_entry_id { - batch.push(e); - } - } - Err(Error::DecodeAgain { .. }) => { - // no more data for decoding - break; - } - Err(e) => { - yield Err(e); - break; - } - } - } - trace!("Yield batch size: {}", batch.len()); - yield Ok(batch); - } - }); - - StreamImpl { - inner: Box::pin(entry_stream), - start_entry_id, - } - } - - /// Appends an entry to `LogFile` and return a `Result` containing the id of entry appended. - pub async fn append(&self, e: &mut T) -> Result - where - T: Encode, - { - if self.state.is_stopped() { - return Err(Error::Eof); - } - let entry_id = e.id(); - let mut serialized = BytesMut::with_capacity(e.encoded_size()); - e.encode_to(&mut serialized) - .map_err(BoxedError::new) - .context(AppendSnafu)?; - let size = serialized.len(); - - if size + self.state.write_offset() > self.max_file_size { - return Err(Error::Eof); - } - - // rewrite encoded data - LittleEndian::write_u64(&mut serialized[0..8], entry_id); - let checksum = CRC_ALGO.checksum(&serialized[0..size - 4]); - LittleEndian::write_u32(&mut serialized[size - 4..], checksum); - - let (tx, rx) = oneshot::channel(); - self.pending_request_tx - .as_ref() - .expect("Call start before write to LogFile!") - .send(AppendRequest { - data: serialized.freeze(), - tx, - offset: 0, - id: entry_id, - }) - .await - .map_err(|_| { - InternalSnafu { - msg: "Send append request", - } - .build() - })?; - - self.notify.notify_one(); // notify write thread. 
- - rx.await - .expect("Sender dropped while waiting for append result") - .map_err(|_| { - InternalSnafu { - msg: "Failed to write request".to_string(), - } - .build() - }) - } - - #[inline] - pub fn try_seal(&self) -> bool { - self.state - .sealed - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - } - - #[inline] - pub fn is_seal(&self) -> bool { - self.state.sealed.load(Ordering::Acquire) - } - - #[inline] - pub fn is_stopped(&self) -> bool { - self.state.stopped.load(Ordering::Acquire) - } - - #[inline] - pub fn unseal(&self) { - self.state.sealed.store(false, Ordering::Release); - } - - #[inline] - pub fn file_name(&self) -> String { - self.name.to_string() - } - - #[inline] - pub fn last_entry_id(&self) -> Id { - self.state.last_entry_id.load(Ordering::Acquire) - } -} - -impl Debug for LogFile { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LogFile") - .field("name", &self.name) - .field("start_entry_id", &self.start_entry_id) - .field("max_file_size", &self.max_file_size) - .field("state", &self.state) - .finish() - } -} - -#[derive(Debug)] -pub(crate) struct AppendRequest { - tx: OneshotSender>, - offset: Offset, - id: Id, - data: Bytes, -} - -impl AppendRequest { - #[inline] - pub fn complete(self) { - let _ = self.tx.send(Ok(AppendResponseImpl { - offset: self.offset, - entry_id: self.id, - })); - } - - #[inline] - pub fn fail(self) { - let _ = self.tx.send(Err(())); - } -} - -#[derive(Default, Debug)] -struct State { - write_offset: AtomicUsize, - flush_offset: AtomicUsize, - last_entry_id: AtomicU64, - sealed: AtomicBool, - stopped: AtomicBool, -} - -impl State { - #[inline] - pub fn is_stopped(&self) -> bool { - self.stopped.load(Ordering::Acquire) - } - - #[inline] - pub fn write_offset(&self) -> usize { - self.write_offset.load(Ordering::Acquire) - } - - #[inline] - pub fn flush_offset(&self) -> usize { - self.flush_offset.load(Ordering::Acquire) - } -} - -type SendableChunkStream = Pin> + Send>>; - -/// Creates a stream of chunks of data from file. If `buffer_size` is not 0, the returned stream -/// will have a bounded buffer and a background thread will do prefetching. When consumer cannot -/// catch up with spawned prefetch loop, the prefetch thread will be blocked and wait until buffer -/// has enough capacity. -/// -/// If the `buffer_size` is 0, there will not be a prefetching thread. File chunks will not be read -/// until stream consumer asks for next chunk. -fn file_chunk_stream( - file: Arc, - mut offset: usize, - file_size: usize, - buffer_size: usize, -) -> SendableChunkStream { - if buffer_size == 0 { - return file_chunk_stream_sync(file, offset, file_size); - } - - let (tx, mut rx) = tokio::sync::mpsc::channel(buffer_size); - common_runtime::spawn_blocking_read(move || loop { - if offset >= file_size { - return; - } - match read_at(&file, offset, file_size) { - Ok(data) => { - let data_len = data.len(); - if tx.blocking_send(Ok(data)).is_err() { - break; - } - offset += data_len; - continue; - } - Err(e) => { - error!(e; "Failed to read file chunk"); - // we're going to break any way so just forget the join result. 
- let _ = tx.blocking_send(Err(e)); - break; - } - } - }); - Box::pin(stream!({ - while let Some(v) = rx.recv().await { - yield v; - } - })) -} - -fn file_chunk_stream_sync( - file: Arc, - mut offset: usize, - file_size: usize, -) -> SendableChunkStream { - let s = stream!({ - loop { - if offset >= file_size { - return; - } - match read_at(&file, offset, file_size) { - Ok(data) => { - let data_len = data.len(); - yield Ok(data); - offset += data_len; - continue; - } - Err(e) => { - error!(e; "Failed to read file chunk"); - yield Err(e); - break; - } - } - } - }); - - Box::pin(s) -} - -/// Reads a chunk of data from file in a blocking manner. -/// The file may not contain enough data to fulfill the whole chunk so only data available -/// is read into chunk. The `write` field of `Chunk` indicates the end of valid data. -fn read_at(file: &Arc, offset: usize, file_length: usize) -> Result { - if offset > file_length { - return Err(Eof); - } - let size = CHUNK_SIZE.min(file_length - offset); - let mut data = Box::new([0u8; CHUNK_SIZE]); - crate::fs::io::pread_exact(file.as_ref(), &mut data[0..size], offset as u64)?; - Ok(Chunk::new(data, size)) -} - -#[cfg(test)] -mod tests { - use std::io::Read; - - use common_telemetry::logging; - use futures::pin_mut; - use futures_util::StreamExt; - use tempdir::TempDir; - use tokio::io::AsyncWriteExt; - - use super::*; - use crate::fs::namespace::LocalNamespace; - - #[tokio::test] - pub async fn test_create_entry_stream() { - logging::init_default_ut_logging(); - let config = LogConfig::default(); - - let dir = TempDir::new("greptimedb-store-test").unwrap(); - let path_buf = dir.path().join("0010.log"); - let path = path_buf.to_str().unwrap().to_string(); - File::create(path.as_str()).unwrap(); - - let mut file = LogFile::open(path.clone(), &config) - .await - .unwrap_or_else(|_| panic!("Failed to open file: {path}")); - file.start().await.expect("Failed to start log file"); - - assert_eq!( - 10, - file.append(&mut EntryImpl::new( - "test1".as_bytes(), - 10, - LocalNamespace::new(42) - )) - .await - .expect("Failed to append entry 1") - .entry_id - ); - - assert_eq!( - 11, - file.append(&mut EntryImpl::new( - "test-2".as_bytes(), - 11, - LocalNamespace::new(42) - )) - .await - .expect("Failed to append entry 2") - .entry_id - ); - - let mut log_file = std::fs::File::open(path.clone()).expect("Test log file does not exist"); - let metadata = log_file.metadata().expect("Failed to read file metadata"); - info!("Log file metadata: {:?}", metadata); - - assert_eq!(75, metadata.len()); // 32+5+32+6 - let mut content = vec![0; metadata.len() as usize]; - log_file - .read_exact(&mut content) - .expect("Read log file failed"); - - info!( - "Log file {:?} content: {}, size:{}", - dir, - hex::encode(content), - metadata.len() - ); - - let ns = LocalNamespace::new(42); - let mut stream = file.create_stream(&ns, 0); - let mut data = vec![]; - - while let Some(v) = stream.next().await { - let entries = v.unwrap(); - for e in entries { - let vec = e.data().to_vec(); - info!("Read entry: {}", String::from_utf8_lossy(&vec)); - data.push(String::from_utf8(vec).unwrap()); - } - } - - assert_eq!(vec!["test1".to_string(), "test-2".to_string()], data); - drop(stream); - - let result = file.stop().await; - info!("Stop file res: {:?}", result); - } - - #[tokio::test] - pub async fn test_read_at() { - let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap(); - let file_path = dir.path().join("chunk-stream-file-test"); - let mut file = tokio::fs::OpenOptions::new() - 
-            .create(true)
-            .write(true)
-            .read(true)
-            .open(&file_path)
-            .await
-            .unwrap();
-        file.write_all("1234567890ab".as_bytes()).await.unwrap();
-        file.flush().await.unwrap();
-
-        let file = Arc::new(file.into_std().await);
-        let result = read_at(&file, 0, 12).unwrap();
-
-        assert_eq!(12, result.len());
-        assert_eq!("1234567890ab".as_bytes(), &result.data[0..result.len()]);
-    }
-
-    #[tokio::test]
-    pub async fn test_read_at_center() {
-        let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap();
-        let file_path = dir.path().join("chunk-stream-file-test-center");
-        let mut file = tokio::fs::OpenOptions::new()
-            .create(true)
-            .write(true)
-            .read(true)
-            .open(&file_path)
-            .await
-            .unwrap();
-        file.write_all("1234567890ab".as_bytes()).await.unwrap();
-        file.flush().await.unwrap();
-
-        let file_len = file.metadata().await.unwrap().len();
-        let file = Arc::new(file.into_std().await);
-        let result = read_at(&file, 8, file_len as usize).unwrap();
-        assert_eq!(4, result.len());
-        assert_eq!("90ab".as_bytes(), &result.data[0..result.len()]);
-    }
-
-    #[tokio::test]
-    pub async fn test_file_chunk_stream() {
-        let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap();
-        let file_path = dir.path().join("chunk-stream-file-test");
-        let mut file = tokio::fs::OpenOptions::new()
-            .create(true)
-            .write(true)
-            .read(true)
-            .open(&file_path)
-            .await
-            .unwrap();
-        file.write_all(&vec![42].repeat(4096 + 1024)).await.unwrap();
-        file.flush().await.unwrap();
-
-        let file_size = file.metadata().await.unwrap().len();
-        let file = Arc::new(file.into_std().await);
-        let stream = file_chunk_stream(file, 0, file_size as usize, 1024);
-        pin_mut!(stream);
-
-        let mut chunks = vec![];
-        while let Some(r) = stream.next().await {
-            chunks.push(r.unwrap());
-        }
-        assert_eq!(
-            vec![4096, 1024],
-            chunks.iter().map(|c| c.write_offset).collect::<Vec<_>>()
-        );
-        assert_eq!(
-            vec![vec![42].repeat(4096), vec![42].repeat(1024)],
-            chunks
-                .iter()
-                .map(|c| &c.data[0..c.write_offset])
-                .collect::<Vec<_>>()
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_sync_chunk_stream() {
-        let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap();
-        let file_path = dir.path().join("chunk-stream-file-test");
-        let mut file = tokio::fs::OpenOptions::new()
-            .create(true)
-            .write(true)
-            .read(true)
-            .open(&file_path)
-            .await
-            .unwrap();
-        file.write_all(&vec![42].repeat(4096 + 1024)).await.unwrap();
-        file.flush().await.unwrap();
-
-        let file_size = file.metadata().await.unwrap().len();
-        let file = Arc::new(file.into_std().await);
-        let stream = file_chunk_stream_sync(file, 0, file_size as usize);
-        pin_mut!(stream);
-
-        let mut chunks = vec![];
-        while let Some(r) = stream.next().await {
-            chunks.push(r.unwrap());
-        }
-        assert_eq!(
-            vec![4096, 1024],
-            chunks.iter().map(|c| c.write_offset).collect::<Vec<_>>()
-        );
-        assert_eq!(
-            vec![vec![42].repeat(4096), vec![42].repeat(1024)],
-            chunks
-                .iter()
-                .map(|c| &c.data[0..c.write_offset])
-                .collect::<Vec<_>>()
-        );
-    }
-
-    #[tokio::test]
-    async fn test_shutdown() {
-        logging::init_default_ut_logging();
-        let config = LogConfig::default();
-        let dir = TempDir::new("greptimedb-store-test").unwrap();
-        let path_buf = dir.path().join("0010.log");
-        let path = path_buf.to_str().unwrap().to_string();
-        File::create(path.as_str()).unwrap();
-
-        let mut file = LogFile::open(path.clone(), &config)
-            .await
-            .unwrap_or_else(|_| panic!("Failed to open file: {path}"));
-
-        let state = file.state.clone();
-        file.start().await.unwrap();
-        drop(file);
-
-        assert!(state.stopped.load(Ordering::Relaxed));
-    }
-}
diff --git a/src/log-store/src/fs/file_name.rs b/src/log-store/src/fs/file_name.rs
deleted file mode 100644
index 5555138039..0000000000
--- a/src/log-store/src/fs/file_name.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright 2022 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::fmt::{Display, Formatter};
-use std::path::Path;
-
-use snafu::OptionExt;
-use store_api::logstore::entry::Id;
-
-use crate::error::{Error, FileNameIllegalSnafu, SuffixIllegalSnafu};
-use crate::fs::file_name::FileName::Log;
-
-/// FileName represents the file name with padded leading zeros.
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum FileName {
-    // File name with .log as suffix.
-    Log(Id),
-}
-
-impl TryFrom<&str> for FileName {
-    type Error = Error;
-
-    fn try_from(p: &str) -> Result<Self, Self::Error> {
-        let path = Path::new(p);
-
-        let extension =
-            path.extension()
-                .and_then(|s| s.to_str())
-                .with_context(|| FileNameIllegalSnafu {
-                    file_name: path.to_string_lossy(),
-                })?;
-
-        let id: u64 = path
-            .file_stem()
-            .and_then(|s| s.to_str())
-            .and_then(|s| s.parse::<u64>().ok())
-            .with_context(|| FileNameIllegalSnafu {
-                file_name: p.to_string(),
-            })?;
-
-        Self::new_with_suffix(id, extension)
-    }
-}
-
-impl From<u64> for FileName {
-    fn from(entry_id: u64) -> Self {
-        Self::log(entry_id)
-    }
-}
-
-impl FileName {
-    pub fn log(entry_id: Id) -> Self {
-        Log(entry_id)
-    }
-
-    pub fn new_with_suffix(entry_id: Id, suffix: &str) -> Result<Self, Error> {
-        match suffix {
-            "log" => Ok(Log(entry_id)),
-            _ => SuffixIllegalSnafu { suffix }.fail(),
-        }
-    }
-
-    pub fn entry_id(&self) -> Id {
-        match self {
-            Log(id) => *id,
-        }
-    }
-
-    fn suffix(&self) -> &str {
-        match self {
-            Log(_) => ".log",
-        }
-    }
-}
-
-impl Display for FileName {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{:020}{}", self.entry_id(), self.suffix())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    pub fn test_padding_file_name() {
-        let id = u64::MIN;
-        assert_eq!("00000000000000000000", format!("{id:020}"));
-        let id = 123u64;
-        assert_eq!("00000000000000000123", format!("{id:020}"));
-        let id = 123123123123u64;
-        assert_eq!("00000000123123123123", format!("{id:020}"));
-        let id = u64::MAX;
-        assert_eq!(u64::MAX.to_string(), format!("{id:020}"));
-    }
-
-    #[test]
-    pub fn test_file_name_to_string() {
-        assert_eq!("00000000000000000000.log", FileName::log(0).to_string());
-        assert_eq!(
-            u64::MAX.to_string() + ".log",
-            FileName::log(u64::MAX).to_string()
-        );
-    }
-
-    #[test]
-    pub fn test_parse_file_name() {
-        let path = "/path/to/any/01010010000.log";
-        let parsed = FileName::try_from(path).expect("Failed to parse file name");
-        assert_eq!(1010010000u64, parsed.entry_id());
-        assert_eq!(".log", parsed.suffix());
-    }
-}
diff --git a/src/log-store/src/fs/index.rs b/src/log-store/src/fs/index.rs
deleted file mode 100644
index 2d1a0da580..0000000000
--- a/src/log-store/src/fs/index.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2022 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::BTreeMap;
-use std::sync::RwLock;
-
-use store_api::logstore::entry::{Id, Offset};
-
-use crate::error::Result;
-use crate::fs::file_name::FileName;
-
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub struct Location {
-    pub file_name: FileName,
-    pub offset: Offset,
-}
-
-#[allow(dead_code)]
-impl Location {
-    pub fn new(file_name: FileName, offset: Offset) -> Self {
-        Self { file_name, offset }
-    }
-}
-
-/// In-memory entry id to offset index.
-pub trait EntryIndex {
-    /// Add entry id to offset mapping.
-    fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location>;
-
-    /// Find offset by entry id.
-    fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>>;
-}
-
-pub struct MemoryIndex {
-    map: RwLock<BTreeMap<Id, Location>>,
-}
-
-#[allow(dead_code)]
-impl MemoryIndex {
-    pub fn new() -> Self {
-        Self {
-            map: RwLock::new(BTreeMap::new()),
-        }
-    }
-}
-
-impl EntryIndex for MemoryIndex {
-    fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location> {
-        self.map.write().unwrap().insert(id, loc)
-    }
-
-    fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>> {
-        Ok(self.map.read().unwrap().get(&id).cloned())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    pub fn test_entry() {
-        let index = MemoryIndex::new();
-        index.add_entry_id(1, Location::new(FileName::log(0), 1));
-        assert_eq!(
-            Location::new(FileName::log(0), 1),
-            index.find_offset_by_id(1).unwrap().unwrap()
-        );
-        assert_eq!(None, index.find_offset_by_id(2).unwrap());
-    }
-}
diff --git a/src/log-store/src/fs/io/fallback.rs b/src/log-store/src/fs/io/fallback.rs
deleted file mode 100644
index a56ca4538a..0000000000
--- a/src/log-store/src/fs/io/fallback.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2022 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::convert::TryFrom;
-use std::fs::File;
-
-use snafu::ResultExt;
-
-use crate::error::Error;
-
-// TODO(hl): Implement pread/pwrite for non-Unix platforms
-pub fn pread_exact(file: &File, _buf: &mut [u8], _offset: u64) -> Result<(), Error> {
-    unimplemented!()
-}
-
-pub fn pwrite_all(file: &File, _buf: &[u8], _offset: u64) -> Result<(), Error> {
-    unimplemented!()
-}
diff --git a/src/log-store/src/fs/io/unix.rs b/src/log-store/src/fs/io/unix.rs
deleted file mode 100644
index f0936ada2c..0000000000
--- a/src/log-store/src/fs/io/unix.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright 2022 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::fs::File;
-use std::os::unix::fs::FileExt;
-
-use snafu::ResultExt;
-
-use crate::error::{Error, IoSnafu};
-
-pub fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> Result<(), Error> {
-    file.read_exact_at(buf, offset).context(IoSnafu)
-}
-
-pub fn pwrite_all(file: &File, buf: &[u8], offset: u64) -> Result<(), Error> {
-    file.write_all_at(buf, offset).context(IoSnafu)
-}
diff --git a/src/log-store/src/fs/log.rs b/src/log-store/src/fs/log.rs
deleted file mode 100644
index 96576b330d..0000000000
--- a/src/log-store/src/fs/log.rs
+++ /dev/null
@@ -1,770 +0,0 @@
-// Copyright 2022 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
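
Aside on the `fs::io` module removed above: its Unix implementation was a thin wrapper over positional I/O, which is what let a log file serve concurrent readers without sharing a seek cursor. A minimal, self-contained sketch of the same pattern (Unix-only; the helper name, temp path, and demo bytes are illustrative, mirroring the `test_read_at_center` case deleted earlier):

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

/// Read exactly `buf.len()` bytes at `offset` without moving the file cursor.
fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
    file.read_exact_at(buf, offset)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("pread-demo");
    std::fs::write(&path, b"1234567890ab")?;
    let file = File::open(&path)?;
    let mut buf = [0u8; 4];
    // Read 4 bytes starting at offset 8; the cursor is never touched.
    pread_exact(&file, &mut buf, 8)?;
    assert_eq!(b"90ab", &buf); // same slice the old test checked
    Ok(())
}
```
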
-
-use std::collections::{BTreeMap, HashMap};
-use std::path::Path;
-use std::sync::Arc;
-
-use arc_swap::ArcSwap;
-use async_stream::stream;
-use common_telemetry::{debug, error, info, warn};
-use futures::{pin_mut, StreamExt};
-use snafu::{OptionExt, ResultExt};
-use store_api::logstore::entry::{Encode, Entry, Id};
-use store_api::logstore::entry_stream::SendableEntryStream;
-use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
-use store_api::logstore::LogStore;
-use tokio::sync::{Mutex, RwLock};
-use tokio::task::JoinHandle;
-use tokio_util::sync::CancellationToken;
-
-use crate::error::{
-    CreateDirSnafu, DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu,
-    InvalidStateSnafu, IoSnafu, ReadPathSnafu, Result, WaitGcTaskStopSnafu,
-};
-use crate::fs::config::LogConfig;
-use crate::fs::entry::EntryImpl;
-use crate::fs::file::{LogFile, LogFileRef};
-use crate::fs::file_name::FileName;
-use crate::fs::namespace::LocalNamespace;
-use crate::fs::AppendResponseImpl;
-
-type FileMap = BTreeMap<u64, LogFileRef>;
-
-#[derive(Debug)]
-pub struct LocalFileLogStore {
-    files: Arc<RwLock<FileMap>>,
-    active: ArcSwap<LogFile>,
-    config: LogConfig,
-    obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, Id>>>,
-    cancel_token: Mutex<Option<CancellationToken>>,
-    gc_task_handle: Mutex<Option<JoinHandle<()>>>,
-}
-
-impl LocalFileLogStore {
-    /// Opens a directory as log store directory, initialize directory if it is empty.
-    pub async fn open(config: &LogConfig) -> Result<Self> {
-        // Create the log directory if missing.
-        tokio::fs::create_dir_all(&config.log_file_dir)
-            .await
-            .context(CreateDirSnafu {
-                path: &config.log_file_dir,
-            })?;
-
-        let mut files = Self::load_dir(&config.log_file_dir, config).await?;
-
-        if files.is_empty() {
-            Self::init_on_empty(&mut files, config).await?;
-            info!("Initialized log store directory: {}", config.log_file_dir)
-        }
-
-        let id = *files.keys().max().context(InternalSnafu {
-            msg: format!(
-                "log store directory is empty after initialization: {}",
-                config.log_file_dir
-            ),
-        })?;
-
-        info!(
-            "Successfully loaded log store directory, files: {:?}",
-            files
-        );
-
-        let active_file = files
-            .get_mut(&id)
-            .expect("Not expected to fail when initing log store");
-
-        active_file.unseal();
-        let active_file_name = active_file.file_name();
-        info!("Log store active log file: {}", active_file_name);
-
-        // Start active log file
-        Arc::get_mut(active_file)
-            .with_context(|| InternalSnafu {
-                msg: format!(
-                    "Concurrent modification on log store {active_file_name} start is not allowed"
-                ),
-            })?
-            .start()
-            .await?;
-        info!(
-            "Successfully started current active file: {}",
-            active_file_name
-        );
-
-        let active_file_cloned = active_file.clone();
-        Ok(Self {
-            files: Arc::new(RwLock::new(files)),
-            active: ArcSwap::new(active_file_cloned),
-            config: config.clone(),
-            obsolete_ids: Arc::new(Default::default()),
-            cancel_token: Mutex::new(None),
-            gc_task_handle: Mutex::new(None),
-        })
-    }
-
-    pub async fn init_on_empty(files: &mut FileMap, config: &LogConfig) -> Result<()> {
-        let path = Path::new(&config.log_file_dir).join(FileName::log(0).to_string());
-        let file_path = path.to_str().context(FileNameIllegalSnafu {
-            file_name: config.log_file_dir.clone(),
-        })?;
-        let file = LogFile::open(file_path, config).await?;
-        files.insert(0, Arc::new(file));
-        Ok(())
-    }
-
-    pub async fn load_dir(path: impl AsRef<str>, config: &LogConfig) -> Result<FileMap> {
-        let mut map = FileMap::new();
-        let mut dir = tokio::fs::read_dir(Path::new(path.as_ref()))
-            .await
-            .context(ReadPathSnafu {
-                path: path.as_ref(),
-            })?;
-
-        while let Some(f) = dir.next_entry().await.context(IoSnafu)? {
-            let path_buf = f.path();
-            let path = path_buf.to_str().context(FileNameIllegalSnafu {
-                file_name: path.as_ref().to_string(),
-            })?;
-            let file_name = FileName::try_from(path)?;
-            let start_id = file_name.entry_id();
-            let file = LogFile::open(path, config).await?;
-            info!("Load log store file {}: {:?}", start_id, file);
-            if map.contains_key(&start_id) {
-                error!("Log file with start entry id: {start_id} already exists");
-                return DuplicateFileSnafu {
-                    msg: format!("File with start id: {start_id} duplicates on start"),
-                }
-                .fail();
-            }
-            file.try_seal();
-            map.insert(start_id, Arc::new(file));
-        }
-        Ok(map)
-    }
-
-    /// Mark current active file as closed and create a new log file for writing.
-    async fn roll_next(&self, active: LogFileRef) -> Result<()> {
-        // acquires lock
-        let mut files = self.files.write().await;
-
-        // if active is already sealed, then just return.
-        if active.is_seal() {
-            return Ok(());
-        }
-
-        // create and start a new log file
-        let next_entry_id = active.last_entry_id() + 1;
-        let path_buf =
-            Path::new(&self.config.log_file_dir).join(FileName::log(next_entry_id).to_string());
-        let path = path_buf.to_str().context(FileNameIllegalSnafu {
-            file_name: self.config.log_file_dir.clone(),
-        })?;
-
-        let mut new_file = LogFile::open(path, &self.config).await?;
-        new_file.start().await?;
-
-        let new_file = Arc::new(new_file);
-        files.insert(new_file.start_entry_id(), new_file.clone());
-
-        self.active.swap(new_file);
-        active.try_seal();
-        tokio::spawn(async move {
-            active.stop().await.unwrap();
-            info!("Sealed log file {} stopped.", active.file_name());
-        });
-        Ok(()) // release lock
-    }
-
-    pub fn active_file(&self) -> Arc<LogFile> {
-        self.active.load().clone()
-    }
-}
-
-async fn gc(
-    files: Arc<RwLock<FileMap>>,
-    obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, Id>>>,
-) -> Result<()> {
-    if let Some(lowest) = find_lowest_id(obsolete_ids).await {
-        gc_inner(files, lowest).await
-    } else {
-        Ok(())
-    }
-}
-
-async fn find_lowest_id(obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, Id>>>) -> Option<u64> {
-    let mut lowest_obsolete = None;
-    {
-        let obsolete_ids = obsolete_ids.read().await;
-        for (ns, id) in obsolete_ids.iter() {
-            if *id <= *lowest_obsolete.get_or_insert(*id) {
-                lowest_obsolete = Some(*id);
-                debug!("Current lowest obsolete id: {}, namespace: {:?}", *id, ns);
-            }
-        }
-    }
-    lowest_obsolete
-}
-
-async fn gc_inner(files: Arc<RwLock<FileMap>>, obsolete_id: u64) -> Result<()> {
-    let mut files = files.write().await;
-    let files_to_delete = find_files_to_delete(&files, obsolete_id);
-    info!(
-        "Compacting log file up to entry id: {}, files to delete: {:?}",
-        obsolete_id, files_to_delete
-    );
-    for entry_id in files_to_delete {
-        if let Some(f) = files.remove(&entry_id) {
-            if !f.is_stopped() {
-                f.stop().await?;
-            }
-            f.destroy().await?;
-            info!("Destroyed log file: {}", f.file_name());
-        }
-    }
-    Ok(())
-}
-
-fn find_files_to_delete<T>(offset_map: &BTreeMap<u64, T>, entry_id: u64) -> Vec<u64> {
-    let mut res = vec![];
-    for (cur, next) in offset_map.keys().zip(offset_map.keys().skip(1)) {
-        if *cur < entry_id && *next <= entry_id {
-            res.push(*cur);
-        }
-    }
-    res
-}
-
-#[async_trait::async_trait]
-impl LogStore for LocalFileLogStore {
-    type Error = Error;
-    type Namespace = LocalNamespace;
-    type Entry = EntryImpl;
-    type AppendResponse = AppendResponseImpl;
-
-    async fn start(&self) -> Result<()> {
-        let files = self.files.clone();
-        let obsolete_ids = self.obsolete_ids.clone();
-        let interval = self.config.gc_interval;
-        let token = tokio_util::sync::CancellationToken::new();
-        let child = token.child_token();
-
-        let handle = common_runtime::spawn_bg(async move {
-            loop {
-                if let Err(e) = gc(files.clone(), obsolete_ids.clone()).await {
-                    error!(e; "Failed to gc log store");
-                }
-
-                tokio::select! {
-                    _ = tokio::time::sleep(interval) => {}
-                    _ = child.cancelled() => {
-                        info!("LogStore gc task has been cancelled");
-                        return;
-                    }
-                }
-            }
-        });
-
-        *self.gc_task_handle.lock().await = Some(handle);
-        *self.cancel_token.lock().await = Some(token);
-        Ok(())
-    }
-
-    async fn stop(&self) -> Result<()> {
-        let handle = self
-            .gc_task_handle
-            .lock()
-            .await
-            .take()
-            .context(InvalidStateSnafu {
-                msg: "Logstore gc task not spawned",
-            })?;
-        let token = self
-            .cancel_token
-            .lock()
-            .await
-            .take()
-            .context(InvalidStateSnafu {
-                msg: "Logstore gc task not spawned",
-            })?;
-        token.cancel();
-        Ok(handle.await.context(WaitGcTaskStopSnafu)?)
-    }
-
-    async fn append(&self, mut entry: Self::Entry) -> Result<Self::AppendResponse> {
-        // TODO(hl): configurable retry times
-        for _ in 0..3 {
-            let current_active_file = self.active_file();
-            match current_active_file.append(&mut entry).await {
-                Ok(r) => return Ok(r),
-                Err(e) => match e {
-                    Error::Eof => {
-                        self.roll_next(current_active_file.clone()).await?;
-                        info!(
-                            "Rolled to next file, retry append, entry size: {}",
-                            entry.encoded_size()
-                        );
-                        continue;
-                    }
-                    Error::Internal { .. } => {
-                        warn!("File closed, try new file");
-                        continue;
-                    }
-                    _ => {
-                        error!(e; "Failed to roll to next log file");
-                        return Err(e);
-                    }
-                },
-            }
-        }
-
-        return InternalSnafu {
-            msg: "Failed to append entry with max retry time exceeds",
-        }
-        .fail();
-    }
-
-    async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
-        todo!()
-    }
-
-    async fn read(
-        &self,
-        ns: &Self::Namespace,
-        id: Id,
-    ) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
-        let files = self.files.read().await;
-        let ns = ns.clone();
-
-        let s = stream!({
-            for (start_id, file) in files.iter() {
-                // TODO(hl): Use index to lookup file
-                if *start_id <= id {
-                    let s = file.create_stream(&ns, id);
-                    pin_mut!(s);
-                    while let Some(entries) = s.next().await {
-                        match entries {
-                            Ok(entries) => {
-                                yield Ok(entries
-                                    .into_iter()
-                                    .filter(|e| e.namespace().id() == ns.id())
-                                    .collect::<Vec<_>>())
-                            }
-                            Err(e) => yield Err(e),
-                        }
-                    }
-                }
-            }
-        });
-
-        Ok(Box::pin(s))
-    }
-
-    async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
-        todo!()
-    }
-
-    async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
-        todo!()
-    }
-
-    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
-        todo!()
-    }
-
-    fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, namespace: Self::Namespace) -> Self::Entry {
-        EntryImpl::new(data, id, namespace)
-    }
-
-    fn namespace(&self, id: NamespaceId) -> Self::Namespace {
-        LocalNamespace::new(id)
-    }
-
-    async fn obsolete(
-        &self,
-        namespace: Self::Namespace,
-        id: Id,
-    ) -> std::result::Result<(), Self::Error> {
-        info!("Mark namespace obsolete entry id, {:?}:{}", namespace, id);
-        let mut map = self.obsolete_ids.write().await;
-        let prev = map.insert(namespace, id);
-        info!("Prev: {:?}", prev);
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::HashSet;
-    use std::time::Duration;
-
-    use futures_util::StreamExt;
-    use rand::distributions::Alphanumeric;
-    use rand::Rng;
-    use store_api::logstore::entry::Entry;
-    use tempdir::TempDir;
-
-    use super::*;
-
-    #[tokio::test]
-    pub async fn test_roll_file() {
-        common_telemetry::logging::init_default_ut_logging();
-        let dir = TempDir::new("greptimedb1").unwrap();
-        let config = LogConfig {
-            append_buffer_size: 128,
-            max_log_file_size: 128,
-            log_file_dir: dir.path().to_str().unwrap().to_string(),
-            ..Default::default()
-        };
-
-        let logstore = LocalFileLogStore::open(&config).await.unwrap();
-        assert_eq!(
-            0,
-            logstore
-                .append(EntryImpl::new(
-                    generate_data(96),
-                    0,
-                    LocalNamespace::new(42)
-                ),)
-                .await
-                .unwrap()
-                .entry_id
-        );
-
-        assert_eq!(
-            1,
-            logstore
-                .append(EntryImpl::new(
-                    generate_data(96),
-                    1,
-                    LocalNamespace::new(42)
-                ))
-                .await
-                .unwrap()
-                .entry_id
-        );
-    }
-
-    fn generate_data(size: usize) -> Vec<u8> {
-        let s: String = rand::thread_rng()
-            .sample_iter(&Alphanumeric)
-            .take(size)
-            .map(char::from)
-            .collect();
-        s.into_bytes()
-    }
-
-    #[tokio::test]
-    pub async fn test_write_and_read_data() {
-        common_telemetry::logging::init_default_ut_logging();
-        let dir = TempDir::new("greptimedb2").unwrap();
-
-        let dir_str = dir.path().to_string_lossy().to_string();
-        info!("dir: {}", dir_str);
-
-        let config = LogConfig {
-            append_buffer_size: 128,
-            max_log_file_size: 128,
-            log_file_dir: dir.path().to_str().unwrap().to_string(),
-            ..Default::default()
-        };
-        let logstore = LocalFileLogStore::open(&config).await.unwrap();
-        let ns = LocalNamespace::new(42);
-        let id = logstore
-            .append(EntryImpl::new(
-                generate_data(96),
-                0,
-                LocalNamespace::new(42),
-            ))
-            .await
-            .unwrap()
-            .entry_id;
-        assert_eq!(0, id);
-
-        let stream = logstore.read(&ns, 0).await.unwrap();
-        tokio::pin!(stream);
-
-        let entries = stream.next().await.unwrap().unwrap();
-        assert_eq!(entries.len(), 1);
-        assert_eq!(entries[0].id(), 0);
-        assert_eq!(42, entries[0].namespace_id);
-    }
-
-    #[tokio::test]
-    pub async fn test_namespace() {
-        common_telemetry::logging::init_default_ut_logging();
-        let dir = TempDir::new("greptimedb").unwrap();
-        let config = LogConfig {
-            append_buffer_size: 128,
-            max_log_file_size: 1024 * 1024,
-            log_file_dir: dir.path().to_str().unwrap().to_string(),
-            ..Default::default()
-        };
-        let logstore = LocalFileLogStore::open(&config).await.unwrap();
-        assert_eq!(
-            0,
-            logstore
-                .append(EntryImpl::new(
-                    generate_data(96),
-                    0,
-                    LocalNamespace::new(42),
-                ))
-                .await
-                .unwrap()
-                .entry_id
-        );
-
-        assert_eq!(
-            1,
-            logstore
-                .append(EntryImpl::new(
-                    generate_data(96),
-                    1,
-                    LocalNamespace::new(43),
-                ))
-                .await
-                .unwrap()
-                .entry_id
-        );
-
-        let stream = logstore.read(&LocalNamespace::new(42), 0).await.unwrap();
-        tokio::pin!(stream);
-
-        let entries = stream.next().await.unwrap().unwrap();
-        assert_eq!(entries.len(), 1);
-        assert_eq!(entries[0].id(), 0);
-        assert_eq!(42, entries[0].namespace_id);
-
-        let stream = logstore.read(&LocalNamespace::new(43), 0).await.unwrap();
-        tokio::pin!(stream);
-
-        let entries = stream.next().await.unwrap().unwrap();
-        assert_eq!(entries.len(), 1);
-        assert_eq!(entries[0].id(), 1);
-        assert_eq!(43, entries[0].namespace_id);
-    }
-
-    #[test]
-    fn test_find_files_to_delete() {
-        let file_map = vec![(1u64, ()), (11u64, ()), (21u64, ()), (31u64, ())]
-            .into_iter()
-            .collect::<BTreeMap<_, _>>();
-
-        assert!(find_files_to_delete(&file_map, 0).is_empty());
-        assert!(find_files_to_delete(&file_map, 1).is_empty());
-        assert!(find_files_to_delete(&file_map, 2).is_empty());
-        assert!(find_files_to_delete(&file_map, 10).is_empty());
-
-        assert_eq!(vec![1], find_files_to_delete(&file_map, 11));
-        assert_eq!(vec![1], find_files_to_delete(&file_map, 20));
-        assert_eq!(vec![1, 11], find_files_to_delete(&file_map, 21));
-
-        assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 31));
-        assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 100));
-    }
-
-    #[tokio::test]
-    async fn test_find_lowest_id() {
-        common_telemetry::logging::init_default_ut_logging();
-        let dir = TempDir::new("greptimedb-log-compact").unwrap();
-        let config = LogConfig {
-            append_buffer_size: 128,
-            max_log_file_size: 4096,
-            log_file_dir: dir.path().to_str().unwrap().to_string(),
-            ..Default::default()
-        };
-        let logstore = LocalFileLogStore::open(&config).await.unwrap();
-        assert!(find_lowest_id(logstore.obsolete_ids.clone())
-            .await
-            .is_none());
-
-        logstore
-            .obsolete(LocalNamespace::new(1), 100)
-            .await
-            .unwrap();
-        assert_eq!(
-            Some(100),
-            find_lowest_id(logstore.obsolete_ids.clone()).await
-        );
-
-        logstore
-            .obsolete(LocalNamespace::new(2), 200)
-            .await
-            .unwrap();
-        assert_eq!(
-            Some(100),
-            find_lowest_id(logstore.obsolete_ids.clone()).await
-        );
-
-        logstore
-            .obsolete(LocalNamespace::new(1), 101)
-            .await
-            .unwrap();
-        assert_eq!(
-            Some(101),
-            find_lowest_id(logstore.obsolete_ids.clone()).await
-        );
-
-        logstore
-            .obsolete(LocalNamespace::new(2), 202)
-            .await
-            .unwrap();
-        assert_eq!(
-            Some(101),
-            find_lowest_id(logstore.obsolete_ids.clone()).await
-        );
-
-        logstore
-            .obsolete(LocalNamespace::new(1), 300)
-            .await
-            .unwrap();
-        assert_eq!(
-            Some(202),
-            find_lowest_id(logstore.obsolete_ids.clone()).await
-        );
-    }
-
-    #[tokio::test]
-    async fn test_compact_log_file() {
-        common_telemetry::logging::init_default_ut_logging();
-        let dir = TempDir::new("greptimedb-log-compact").unwrap();
-        let config = LogConfig {
-            append_buffer_size: 128,
-            max_log_file_size: 4096,
-            log_file_dir: dir.path().to_str().unwrap().to_string(),
-            ..Default::default()
-        };
-        let logstore = LocalFileLogStore::open(&config).await.unwrap();
-
-        for id in 0..50 {
-            logstore
-                .append(EntryImpl::new(
-                    generate_data(990),
-                    id,
-                    LocalNamespace::new(42),
-                ))
-                .await
-                .unwrap();
-        }
-
-        assert_eq!(
-            vec![0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
-            logstore
-                .files
-                .read()
-                .await
-                .keys()
-                .copied()
-                .collect::<Vec<_>>()
-        );
-
-        gc_inner(logstore.files.clone(), 10).await.unwrap();
-
-        assert_eq!(
-            vec![8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
-            logstore
-                .files
-                .read()
-                .await
-                .keys()
-                .copied()
-                .collect::<Vec<_>>()
-        );
-
-        gc_inner(logstore.files.clone(), 28).await.unwrap();
-
-        assert_eq!(
-            vec![28, 32, 36, 40, 44, 48],
-            logstore
-                .files
-                .read()
-                .await
-                .keys()
-                .copied()
-                .collect::<Vec<_>>()
-        );
-
-        gc_inner(logstore.files.clone(), 50).await.unwrap();
-
-        assert_eq!(
-            vec![48],
-            logstore
-                .files
-                .read()
-                .await
-                .keys()
-                .copied()
-                .collect::<Vec<_>>()
-        );
-    }
-
-    #[tokio::test]
-    async fn test_gc_task() {
-        common_telemetry::logging::init_default_ut_logging();
-        let dir = TempDir::new("greptimedb-log-compact").unwrap();
-        let config = LogConfig {
-            append_buffer_size: 128,
-            max_log_file_size: 4096,
-            log_file_dir: dir.path().to_str().unwrap().to_string(),
-            gc_interval: Duration::from_millis(100),
-        };
-        let logstore = LocalFileLogStore::open(&config).await.unwrap();
-        logstore.start().await.unwrap();
-
-        for id in 0..50 {
-            logstore
-                .append(EntryImpl::new(
-                    generate_data(990),
-                    id,
-                    LocalNamespace::new(42),
-                ))
-                .await
-                .unwrap();
-        }
-        logstore
-            .obsolete(LocalNamespace::new(42), 30)
-            .await
-            .unwrap();
-        tokio::time::sleep(Duration::from_millis(150)).await;
-        let file_ids = logstore
-            .files
-            .read()
-            .await
-            .keys()
-            .cloned()
-            .collect::<Vec<_>>();
-        assert_eq!(vec![28, 32, 36, 40, 44, 48], file_ids);
-
-        let mut files = vec![];
-        let mut readir = tokio::fs::read_dir(dir.path()).await.unwrap();
-        while let Some(r) = readir.next_entry().await.transpose() {
-            let entry = r.unwrap();
-            files.push(entry.file_name().to_str().unwrap().to_string());
-        }
-
-        assert_eq!(
-            vec![
-                "00000000000000000028.log".to_string(),
-                "00000000000000000048.log".to_string(),
-                "00000000000000000040.log".to_string(),
-                "00000000000000000044.log".to_string(),
-                "00000000000000000036.log".to_string(),
-                "00000000000000000032.log".to_string()
-            ]
-            .into_iter()
-            .collect::<HashSet<_>>(),
-            files.into_iter().collect::<HashSet<_>>()
-        );
-    }
-}
diff --git a/src/log-store/src/fs/namespace.rs b/src/log-store/src/fs/namespace.rs
deleted file mode 100644
index 05203903b7..0000000000
--- a/src/log-store/src/fs/namespace.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright 2022 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use store_api::logstore::namespace::{Id, Namespace};
-
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
-pub struct LocalNamespace {
-    pub(crate) id: Id,
-}
-
-impl Default for LocalNamespace {
-    fn default() -> Self {
-        LocalNamespace::new(0)
-    }
-}
-
-impl LocalNamespace {
-    pub(crate) fn new(id: Id) -> Self {
-        Self { id }
-    }
-}
-
-impl Namespace for LocalNamespace {
-    fn id(&self) -> Id {
-        self.id
-    }
-}
diff --git a/src/log-store/src/fs/noop.rs b/src/log-store/src/fs/noop.rs
deleted file mode 100644
index 099d3a9ce9..0000000000
--- a/src/log-store/src/fs/noop.rs
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright 2022 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
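
Before the old `fs::log` store disappears entirely, it is worth pinning down its gc contract: every namespace records the highest entry id it no longer needs, and `find_lowest_id` takes the minimum across namespaces, so a single slow namespace holds back file deletion for everyone. A self-contained worked sketch of that policy (hypothetical helper name, not from the codebase; the values mirror the deleted `test_find_lowest_id`):

```rust
use std::collections::HashMap;

/// Smallest obsolete id across all namespaces; gc must not delete past it.
fn lowest_obsolete(obsolete_ids: &HashMap<u64, u64>) -> Option<u64> {
    obsolete_ids.values().copied().min()
}

fn main() {
    let mut ids = HashMap::new();
    ids.insert(1, 100); // namespace 1 no longer needs entries <= 100
    ids.insert(2, 200); // namespace 2 no longer needs entries <= 200
    // gc is bounded by the slowest namespace.
    assert_eq!(Some(100), lowest_obsolete(&ids));
}
```
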
-
-use store_api::logstore::entry::Id;
-use store_api::logstore::namespace::Id as NamespaceId;
-use store_api::logstore::LogStore;
-
-use crate::error::{Error, Result};
-use crate::fs::entry::EntryImpl;
-use crate::fs::namespace::LocalNamespace;
-use crate::fs::AppendResponseImpl;
-
-/// A noop log store which only for test
-// TODO: Add a test feature
-#[derive(Debug, Default)]
-pub struct NoopLogStore;
-
-#[async_trait::async_trait]
-impl LogStore for NoopLogStore {
-    type Error = Error;
-    type Namespace = LocalNamespace;
-    type Entry = EntryImpl;
-    type AppendResponse = AppendResponseImpl;
-
-    async fn start(&self) -> Result<()> {
-        Ok(())
-    }
-
-    async fn stop(&self) -> Result<()> {
-        Ok(())
-    }
-
-    async fn append(&self, mut _e: Self::Entry) -> Result<Self::AppendResponse> {
-        Ok(AppendResponseImpl {
-            entry_id: 0,
-            offset: 0,
-        })
-    }
-
-    async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
-        todo!()
-    }
-
-    async fn read(
-        &self,
-        _ns: &Self::Namespace,
-        _id: Id,
-    ) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
-    {
-        todo!()
-    }
-
-    async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
-        todo!()
-    }
-
-    async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
-        todo!()
-    }
-
-    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
-        todo!()
-    }
-
-    fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
-        EntryImpl::new(data, id, ns)
-    }
-
-    fn namespace(&self, id: NamespaceId) -> Self::Namespace {
-        LocalNamespace::new(id)
-    }
-
-    async fn obsolete(
-        &self,
-        namespace: Self::Namespace,
-        id: Id,
-    ) -> std::result::Result<(), Self::Error> {
-        let _ = namespace;
-        let _ = id;
-        Ok(())
-    }
-}
diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs
index 0cd3815d07..2d12d124f6 100644
--- a/src/log-store/src/lib.rs
+++ b/src/log-store/src/lib.rs
@@ -12,7 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod config;
 pub mod error;
-pub mod fs;
-
+mod noop;
+pub mod raft_engine;
 pub mod test_util;
+
+pub use config::LogConfig;
+pub use noop::NoopLogStore;
diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs
new file mode 100644
index 0000000000..38c2fe6aab
--- /dev/null
+++ b/src/log-store/src/noop.rs
@@ -0,0 +1,153 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
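
The replacement `noop.rs` below keeps the same idea as the file just deleted: a test double whose every `LogStore` method returns immediately with a default value, so storage-engine tests can run without a real WAL. A sketch of the intended usage (assumed, based on the unit tests at the end of the new file; runs inside a `#[tokio::test]` body):

```rust
// Inside an async test:
let store = NoopLogStore::default();
store.start().await.unwrap();
let entry = store.entry(b"ignored".as_slice(), 1, store.namespace(0));
let resp = store.append(entry).await.unwrap();
assert_eq!(0, resp.entry_id); // appends are swallowed; the reported id is always 0
```
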
+
+use store_api::logstore::entry::{Entry, Id};
+use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
+use store_api::logstore::{AppendResponse, LogStore};
+
+use crate::error::{Error, Result};
+
+/// A noop log store which only for test
+#[derive(Debug, Default)]
+pub struct NoopLogStore;
+
+#[derive(Debug, Default, Clone, PartialEq)]
+pub struct EntryImpl;
+
+#[derive(Debug, Clone, Default, Hash, PartialEq)]
+pub struct NamespaceImpl;
+
+impl Namespace for NamespaceImpl {
+    fn id(&self) -> NamespaceId {
+        0
+    }
+}
+
+impl Entry for EntryImpl {
+    type Error = Error;
+    type Namespace = NamespaceImpl;
+
+    fn data(&self) -> &[u8] {
+        &[]
+    }
+
+    fn id(&self) -> Id {
+        0
+    }
+
+    fn namespace(&self) -> Self::Namespace {
+        Default::default()
+    }
+}
+
+#[async_trait::async_trait]
+impl LogStore for NoopLogStore {
+    type Error = Error;
+    type Namespace = NamespaceImpl;
+    type Entry = EntryImpl;
+
+    async fn start(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn stop(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
+        Ok(AppendResponse { entry_id: 0 })
+    }
+
+    async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Vec<Id>> {
+        Ok(vec![])
+    }
+
+    async fn read(
+        &self,
+        _ns: &Self::Namespace,
+        _id: Id,
+    ) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'static, Self::Entry, Self::Error>>
+    {
+        todo!()
+    }
+
+    async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
+        Ok(())
+    }
+
+    async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
+        Ok(())
+    }
+
+    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
+        Ok(vec![])
+    }
+
+    fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
+        let _ = data;
+        let _ = id;
+        let _ = ns;
+        EntryImpl::default()
+    }
+
+    fn namespace(&self, id: NamespaceId) -> Self::Namespace {
+        let _ = id;
+        NamespaceImpl::default()
+    }
+
+    async fn obsolete(
+        &self,
+        namespace: Self::Namespace,
+        id: Id,
+    ) -> std::result::Result<(), Self::Error> {
+        let _ = namespace;
+        let _ = id;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_mock_entry() {
+        let e = EntryImpl::default();
+        assert_eq!(0, e.data().len());
+        assert_eq!(0, e.id());
+    }
+
+    #[tokio::test]
+    async fn test_noop_logstore() {
+        let mut store = NoopLogStore::default();
+        store.start().await.unwrap();
+        let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
+        store.append(e.clone()).await.unwrap();
+        store
+            .append_batch(&NamespaceImpl::default(), vec![e])
+            .await
+            .unwrap();
+        store
+            .create_namespace(&NamespaceImpl::default())
+            .await
+            .unwrap();
+        assert_eq!(0, store.list_namespaces().await.unwrap().len());
+        store
+            .delete_namespace(&NamespaceImpl::default())
+            .await
+            .unwrap();
+        assert_eq!(NamespaceImpl::default(), store.namespace(0));
+        store.obsolete(NamespaceImpl::default(), 1).await.unwrap();
+    }
+}
diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs
new file mode 100644
index 0000000000..251bccbcf0
--- /dev/null
+++ b/src/log-store/src/raft_engine.rs
@@ -0,0 +1,80 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::hash::{Hash, Hasher};
+
+use store_api::logstore::entry::{Entry, Id};
+use store_api::logstore::namespace::Namespace;
+
+use crate::error::Error;
+use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
+
+pub mod log_store;
+
+pub mod protos {
+    include!(concat!(env!("OUT_DIR"), concat!("/", "protos/", "mod.rs")));
+}
+
+impl EntryImpl {
+    pub fn create(id: u64, ns: u64, data: Vec<u8>) -> Self {
+        Self {
+            id,
+            namespace_id: ns,
+            data,
+            ..Default::default()
+        }
+    }
+}
+
+impl NamespaceImpl {
+    pub fn with_id(id: Id) -> Self {
+        Self {
+            id,
+            ..Default::default()
+        }
+    }
+}
+
+#[allow(clippy::derive_hash_xor_eq)]
+impl Hash for NamespaceImpl {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.id.hash(state);
+    }
+}
+
+impl Namespace for NamespaceImpl {
+    fn id(&self) -> store_api::logstore::namespace::Id {
+        self.id
+    }
+}
+
+impl Entry for EntryImpl {
+    type Error = Error;
+    type Namespace = NamespaceImpl;
+
+    fn data(&self) -> &[u8] {
+        self.data.as_slice()
+    }
+
+    fn id(&self) -> Id {
+        self.id
+    }
+
+    fn namespace(&self) -> Self::Namespace {
+        NamespaceImpl {
+            id: self.namespace_id,
+            ..Default::default()
+        }
+    }
+}
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
new file mode 100644
index 0000000000..4231767f6d
--- /dev/null
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -0,0 +1,547 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::{Debug, Formatter};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use async_stream::stream;
+use common_telemetry::{error, info};
+use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
+use snafu::{ensure, OptionExt, ResultExt};
+use store_api::logstore::entry::Id;
+use store_api::logstore::entry_stream::SendableEntryStream;
+use store_api::logstore::namespace::Namespace as NamespaceTrait;
+use store_api::logstore::{AppendResponse, LogStore};
+use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+
+use crate::config::LogConfig;
+use crate::error::{
+    AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu,
+    RaftEngineSnafu, WaitGcTaskStopSnafu,
+};
+use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace};
+
+const NAMESPACE_PREFIX: &str = "__sys_namespace_";
+const SYSTEM_NAMESPACE: u64 = 0;
+
+pub struct RaftEngineLogStore {
+    config: LogConfig,
+    engine: Arc<Engine>,
+    cancel_token: Mutex<Option<CancellationToken>>,
+    gc_task_handle: Mutex<Option<JoinHandle<()>>>,
+    started: AtomicBool,
+}
+
+impl RaftEngineLogStore {
+    pub fn try_new(config: LogConfig) -> Result<Self, Error> {
+        // TODO(hl): set according to available disk space
+        let raft_engine_config = Config {
+            dir: config.log_file_dir.clone(),
+            purge_threshold: ReadableSize(config.purge_threshold as u64),
+            recovery_mode: RecoveryMode::TolerateTailCorruption,
+            batch_compression_threshold: ReadableSize::kb(8),
+            target_file_size: ReadableSize(config.max_log_file_size as u64),
+            ..Default::default()
+        };
+        let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
+        Ok(Self {
+            config,
+            engine,
+            cancel_token: Mutex::new(None),
+            gc_task_handle: Mutex::new(None),
+            started: AtomicBool::new(false),
+        })
+    }
+
+    pub fn started(&self) -> bool {
+        self.started.load(Ordering::Relaxed)
+    }
+}
+
+impl Debug for RaftEngineLogStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RaftEngineLogsStore")
+            .field("config", &self.config)
+            .field("started", &self.started.load(Ordering::Relaxed))
+            .finish()
+    }
+}
+
+#[async_trait::async_trait]
+impl LogStore for RaftEngineLogStore {
+    type Error = Error;
+    type Namespace = Namespace;
+    type Entry = Entry;
+
+    async fn start(&self) -> Result<(), Self::Error> {
+        let engine_clone = self.engine.clone();
+        let interval = self.config.gc_interval;
+        let token = CancellationToken::new();
+        let child = token.child_token();
+        // TODO(hl): Maybe spawn to a blocking runtime.
+        let handle = common_runtime::spawn_bg(async move {
+            loop {
+                tokio::select! {
+                    _ = tokio::time::sleep(interval) => {}
+                    _ = child.cancelled() => {
+                        info!("LogStore gc task has been cancelled");
+                        return;
+                    }
+                }
+                match engine_clone.purge_expired_files().context(RaftEngineSnafu) {
+                    Ok(res) => {
+                        // TODO(hl): the retval of purge_expired_files indicates the namespaces need to be compact,
+                        // which is useful when monitoring regions failed to flush it's memtable to SSTs.
+                        info!(
+                            "Successfully purged logstore files, namespaces need compaction: {:?}",
+                            res
+                        );
+                    }
+                    Err(e) => {
+                        error!(e; "Failed to purge files in logstore");
+                    }
+                }
+            }
+        });
+        *self.cancel_token.lock().await = Some(token);
+        *self.gc_task_handle.lock().await = Some(handle);
+        self.started.store(true, Ordering::Relaxed);
+        info!("RaftEngineLogStore started with config: {:?}", self.config);
+        Ok(())
+    }
+
+    async fn stop(&self) -> Result<(), Self::Error> {
+        ensure!(
+            self.started
+                .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
+                .is_ok(),
+            IllegalStateSnafu
+        );
+        let handle = self
+            .gc_task_handle
+            .lock()
+            .await
+            .take()
+            .context(IllegalStateSnafu)?;
+        let token = self
+            .cancel_token
+            .lock()
+            .await
+            .take()
+            .context(IllegalStateSnafu)?;
+        token.cancel();
+        handle.await.context(WaitGcTaskStopSnafu)?;
+        info!("RaftEngineLogStore stopped");
+        Ok(())
+    }
+
+    /// Append an entry to logstore. Currently of existence of entry's namespace is not checked.
+    async fn append(&self, e: Self::Entry) -> Result<AppendResponse, Self::Error> {
+        ensure!(self.started(), IllegalStateSnafu);
+        let entry_id = e.id;
+        let mut batch = LogBatch::with_capacity(1);
+        batch
+            .add_entries::<MessageType>(e.namespace_id, &[e])
+            .context(AddEntryLogBatchSnafu)?;
+
+        self.engine
+            .write(&mut batch, self.config.sync_write)
+            .context(RaftEngineSnafu)?;
+        Ok(AppendResponse { entry_id })
+    }
+
+    /// Append a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of
+    /// batch append.
+    async fn append_batch(
+        &self,
+        ns: &Self::Namespace,
+        entries: Vec<Self::Entry>,
+    ) -> Result<Vec<Id>, Self::Error> {
+        ensure!(self.started(), IllegalStateSnafu);
+        let entry_ids = entries.iter().map(Entry::get_id).collect::<Vec<_>>();
+        let mut batch = LogBatch::with_capacity(entries.len());
+        batch
+            .add_entries::<MessageType>(ns.id, &entries)
+            .context(AddEntryLogBatchSnafu)?;
+        self.engine
+            .write(&mut batch, self.config.sync_write)
+            .context(RaftEngineSnafu)?;
+        Ok(entry_ids)
+    }
+
+    /// Create a stream of entries from logstore in the given namespace. The end of stream is
+    /// determined by the current "last index" of the namespace.
+    async fn read(
+        &self,
+        ns: &Self::Namespace,
+        id: Id,
+    ) -> Result<SendableEntryStream<'static, Self::Entry, Self::Error>, Self::Error> {
+        ensure!(self.started(), IllegalStateSnafu);
+        let engine = self.engine.clone();
+
+        let last_index = engine.last_index(ns.id).unwrap_or(0);
+        let mut start_index = id.max(engine.first_index(ns.id).unwrap_or(last_index + 1));
+
+        let max_batch_size = self.config.read_batch_size;
+        let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
+        let ns = ns.clone();
+        common_runtime::spawn_read(async move {
+            while start_index <= last_index {
+                let mut vec = Vec::with_capacity(max_batch_size);
+                match engine
+                    .fetch_entries_to::<MessageType>(
+                        ns.id,
+                        start_index,
+                        last_index + 1,
+                        Some(max_batch_size),
+                        &mut vec,
+                    )
+                    .context(FetchEntrySnafu {
+                        ns: ns.id,
+                        start: start_index,
+                        end: last_index,
+                        max_size: max_batch_size,
+                    }) {
+                    Ok(_) => {
+                        if let Some(last_entry) = vec.last() {
+                            start_index = last_entry.id + 1;
+                        }
+                        // reader side closed, cancel following reads
+                        if tx.send(Ok(vec)).await.is_err() {
+                            break;
+                        }
+                    }
+                    Err(e) => {
+                        let _ = tx.send(Err(e)).await;
+                        break;
+                    }
+                }
+            }
+        });
+
+        let s = stream!({
+            while let Some(res) = rx.recv().await {
+                yield res;
+            }
+        });
+        Ok(Box::pin(s))
+    }
+
+    async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
+        ensure!(
+            ns.id != SYSTEM_NAMESPACE,
+            IllegalNamespaceSnafu { ns: ns.id }
+        );
+        ensure!(self.started(), IllegalStateSnafu);
+        let key = format!("{}{}", NAMESPACE_PREFIX, ns.id).as_bytes().to_vec();
+        let mut batch = LogBatch::with_capacity(1);
+        batch
+            .put_message::<Namespace>(SYSTEM_NAMESPACE, key, ns)
+            .context(RaftEngineSnafu)?;
+        self.engine
+            .write(&mut batch, true)
+            .context(RaftEngineSnafu)?;
+        Ok(())
+    }
+
+    async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
+        ensure!(
+            ns.id != SYSTEM_NAMESPACE,
+            IllegalNamespaceSnafu { ns: ns.id }
+        );
+        ensure!(self.started(), IllegalStateSnafu);
+        let key = format!("{}{}", NAMESPACE_PREFIX, ns.id).as_bytes().to_vec();
+        let mut batch = LogBatch::with_capacity(1);
+        batch.delete(SYSTEM_NAMESPACE, key);
+        self.engine
+            .write(&mut batch, true)
+            .context(RaftEngineSnafu)?;
+        Ok(())
+    }
+
+    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error> {
+        ensure!(self.started(), IllegalStateSnafu);
+        let mut namespaces: Vec<Namespace> = vec![];
+        self.engine
+            .scan_messages::<Namespace, _>(
+                SYSTEM_NAMESPACE,
+                Some(NAMESPACE_PREFIX.as_bytes()),
+                None,
+                false,
+                |_, v| {
+                    namespaces.push(v);
+                    true
+                },
+            )
+            .context(RaftEngineSnafu)?;
+        Ok(namespaces)
+    }
+
+    fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
+        Entry {
+            id,
+            data: data.as_ref().to_vec(),
+            namespace_id: ns.id(),
+            ..Default::default()
+        }
+    }
+
+    fn namespace(&self, id: store_api::logstore::namespace::Id) -> Self::Namespace {
+        Namespace {
+            id,
+            ..Default::default()
+        }
+    }
+
+    async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error> {
+        ensure!(self.started(), IllegalStateSnafu);
+        let obsoleted = self.engine.compact_to(namespace.id(), id + 1);
+        info!(
+            "Namespace {} obsoleted {} entries",
+            namespace.id(),
+            obsoleted
+        );
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone)]
+struct MessageType;
+
+impl MessageExt for MessageType {
+    type Entry = Entry;
+
+    fn index(e: &Self::Entry) -> u64 {
+        e.id
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+    use std::time::Duration;
+
+    use common_telemetry::debug;
+    use futures_util::StreamExt;
+    use raft_engine::ReadableSize;
+    use store_api::logstore::entry_stream::SendableEntryStream;
+    use store_api::logstore::namespace::Namespace as NamespaceTrait;
+    use store_api::logstore::LogStore;
+    use tempdir::TempDir;
+
+    use crate::config::LogConfig;
+    use crate::error::Error;
+    use crate::raft_engine::log_store::RaftEngineLogStore;
+    use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace};
+
+    #[tokio::test]
+    async fn test_open_logstore() {
+        let dir = TempDir::new("raft-engine-logstore-test").unwrap();
+        let logstore = RaftEngineLogStore::try_new(LogConfig {
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
+        })
+        .unwrap();
+        logstore.start().await.unwrap();
+        let namespaces = logstore.list_namespaces().await.unwrap();
+        assert_eq!(0, namespaces.len());
+    }
+
+    #[tokio::test]
+    async fn test_manage_namespace() {
+        let dir = TempDir::new("raft-engine-logstore-test").unwrap();
+        let mut logstore = RaftEngineLogStore::try_new(LogConfig {
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
+        })
+        .unwrap();
+        logstore.start().await.unwrap();
+        assert!(logstore.list_namespaces().await.unwrap().is_empty());
+
+        logstore
+            .create_namespace(&Namespace::with_id(42))
+            .await
+            .unwrap();
+        let namespaces = logstore.list_namespaces().await.unwrap();
+        assert_eq!(1, namespaces.len());
+        assert_eq!(Namespace::with_id(42), namespaces[0]);
+
+        logstore
+            .delete_namespace(&Namespace::with_id(42))
+            .await
+            .unwrap();
+        assert!(logstore.list_namespaces().await.unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_append_and_read() {
+        let dir = TempDir::new("raft-engine-logstore-test").unwrap();
+        let logstore = RaftEngineLogStore::try_new(LogConfig {
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
+        })
+        .unwrap();
+        logstore.start().await.unwrap();
+
+        let namespace = Namespace::with_id(1);
+        let cnt = 1024;
+        for i in 0..cnt {
+            let response = logstore
+                .append(Entry::create(
+                    i,
+                    namespace.id,
+                    i.to_string().as_bytes().to_vec(),
+                ))
+                .await
+                .unwrap();
+            assert_eq!(i, response.entry_id);
+        }
+        let mut entries = HashSet::with_capacity(1024);
+        let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap();
+        while let Some(r) = s.next().await {
+            let vec = r.unwrap();
+            entries.extend(vec.into_iter().map(|e| e.id));
+        }
+        assert_eq!((0..cnt).into_iter().collect::<HashSet<_>>(), entries);
+    }
+
+    async fn collect_entries(mut s: SendableEntryStream<'_, Entry, Error>) -> Vec<Entry> {
+        let mut res = vec![];
+        while let Some(r) = s.next().await {
+            res.extend(r.unwrap());
+        }
+        res
+    }
+
+    #[tokio::test]
+    async fn test_reopen() {
+        let dir = TempDir::new("raft-engine-logstore-reopen-test").unwrap();
+        {
+            let logstore = RaftEngineLogStore::try_new(LogConfig {
+                log_file_dir: dir.path().to_str().unwrap().to_string(),
+                ..Default::default()
+            })
+            .unwrap();
+            logstore.start().await.unwrap();
+            logstore
+                .append(Entry::create(1, 1, "1".as_bytes().to_vec()))
+                .await
+                .unwrap();
+            let entries = logstore
+                .read(&Namespace::with_id(1), 1)
+                .await
+                .unwrap()
+                .collect::<Vec<_>>()
+                .await;
+            assert_eq!(1, entries.len());
+            logstore.stop().await.unwrap();
+        }
+
+        let logstore = RaftEngineLogStore::try_new(LogConfig {
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
+        })
+        .unwrap();
+        logstore.start().await.unwrap();
+
+        let entries =
+            collect_entries(logstore.read(&Namespace::with_id(1), 1).await.unwrap()).await;
+        assert_eq!(1, entries.len());
+        assert_eq!(1, entries[0].id);
+        assert_eq!(1, entries[0].namespace_id);
+    }
+
+    async fn wal_dir_usage(path: impl AsRef<str>) -> usize {
+        let mut size: usize = 0;
+        let mut read_dir = tokio::fs::read_dir(path.as_ref()).await.unwrap();
+        while let Ok(dir_entry) = read_dir.next_entry().await {
+            let Some(entry) = dir_entry else {
+                break;
+            };
+            if entry.file_type().await.unwrap().is_file() {
+                let file_name = entry.file_name();
+                let file_size = entry.metadata().await.unwrap().len() as usize;
+                debug!("File: {file_name:?}, size: {file_size}");
+                size += file_size;
+            }
+        }
+        size
+    }
+
+    #[tokio::test]
+    async fn test_compaction() {
+        common_telemetry::init_default_ut_logging();
+        let dir = TempDir::new("raft-engine-logstore-test").unwrap();
+
+        let config = LogConfig {
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            max_log_file_size: ReadableSize::mb(2).0 as usize,
+            purge_threshold: ReadableSize::mb(4).0 as usize,
+            gc_interval: Duration::from_secs(5),
+            ..Default::default()
+        };
+
+        let logstore = RaftEngineLogStore::try_new(config).unwrap();
+        logstore.start().await.unwrap();
+        let namespace = Namespace::with_id(42);
+        for id in 0..4096 {
+            let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
+            logstore.append(entry).await.unwrap();
+        }
+
+        let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
+        logstore.obsolete(namespace, 4000).await.unwrap();
+
+        tokio::time::sleep(Duration::from_secs(6)).await;
+        let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
+        debug!(
+            "Before purge: {}, after purge: {}",
+            before_purge, after_purge
+        );
+        assert!(before_purge > after_purge);
+    }
+
+    #[tokio::test]
+    async fn test_obsolete() {
+        common_telemetry::init_default_ut_logging();
+        let dir = TempDir::new("raft-engine-logstore-test").unwrap();
+
+        let config = LogConfig {
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            max_log_file_size: ReadableSize::mb(2).0 as usize,
+            purge_threshold: ReadableSize::mb(4).0 as usize,
+            gc_interval: Duration::from_secs(5),
+            ..Default::default()
+        };
+
+        let logstore = RaftEngineLogStore::try_new(config).unwrap();
+        logstore.start().await.unwrap();
+        let namespace = Namespace::with_id(42);
+        for id in 0..1024 {
+            let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
+            logstore.append(entry).await.unwrap();
+        }
+
+        logstore.obsolete(namespace.clone(), 100).await.unwrap();
+        assert_eq!(101, logstore.engine.first_index(namespace.id).unwrap());
+
+        let res = logstore.read(&namespace, 100).await.unwrap();
+        let mut vec = collect_entries(res).await;
+        vec.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
+        assert_eq!(101, vec.first().unwrap().id);
+    }
+}
diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs
index a8d4d24f88..06143712e6 100644
--- a/src/log-store/src/test_util/log_store_util.rs
+++ b/src/log-store/src/test_util/log_store_util.rs
@@ -12,14 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use store_api::logstore::LogStore;
 use tempdir::TempDir;
 
-use crate::fs::config::LogConfig;
-use crate::fs::log::LocalFileLogStore;
+use crate::raft_engine::log_store::RaftEngineLogStore;
+use crate::LogConfig;
 
 /// Create a tmp directory for write log, used for test.
 // TODO: Add a test feature
-pub async fn create_tmp_local_file_log_store(dir: &str) -> (LocalFileLogStore, TempDir) {
+pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, TempDir) {
     let dir = TempDir::new(dir).unwrap();
     let cfg = LogConfig {
         append_buffer_size: 128,
@@ -28,5 +29,7 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (LocalFileLogStore, TempDir) {
         ..Default::default()
     };
 
-    (LocalFileLogStore::open(&cfg).await.unwrap(), dir)
+    let logstore = RaftEngineLogStore::try_new(cfg).unwrap();
+    logstore.start().await.unwrap();
+    (logstore, dir)
 }
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index cab3d527dc..4ad5da8416 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -527,7 +527,7 @@ mod tests {
     use datatypes::vectors::{
         Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
     };
-    use log_store::fs::noop::NoopLogStore;
+    use log_store::NoopLogStore;
     use storage::config::EngineConfig as StorageEngineConfig;
     use storage::EngineImpl;
     use store_api::manifest::Manifest;
diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs
index 80f736c26d..e35cd686fe 100644
--- a/src/mito/src/table/test_util.rs
+++ b/src/mito/src/table/test_util.rs
@@ -20,7 +20,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
 use datatypes::vectors::VectorRef;
-use log_store::fs::noop::NoopLogStore;
+use log_store::NoopLogStore;
 use object_store::services::fs::Builder;
 use object_store::ObjectStore;
 use storage::config::EngineConfig as StorageEngineConfig;
diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml
index f084aab74b..4494624ac4 100644
--- a/src/script/Cargo.toml
+++ b/src/script/Cargo.toml
@@ -23,7 +23,7 @@ python = [
 ]
 
 [dependencies]
-async-trait = "0.1"
+async-trait.workspace = true
 catalog = { path = "../catalog" }
 common-catalog = { path = "../common/catalog" }
 common-error = { path = "../common/error" }
@@ -33,26 +33,28 @@ common-recordbatch = { path = "../common/recordbatch" }
 common-telemetry = { path = "../common/telemetry" }
 common-time = { path = "../common/time" }
 console = "0.15"
+crossbeam-utils = "0.8.14"
 datafusion = { workspace = true, optional = true }
 datafusion-common = { workspace = true, optional = true }
 datafusion-expr = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
 datatypes = { path = "../datatypes" }
-futures = "0.3"
 futures-util = "0.3"
-paste = { version = "1.0", optional = true }
+futures.workspace = true
+once_cell = "1.17.0"
+paste = { workspace = true, optional = true }
 query = { path = "../query" }
 # TODO(discord9): This is a forked and tweaked version of RustPython, please update it to newest original RustPython After Update toolchain to 1.65
-rustpython-ast = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" }
-rustpython-codegen = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" }
-rustpython-compiler = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" }
-rustpython-compiler-core = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" }
-rustpython-parser = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" }
-rustpython-pylib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab", features = [
= "183e8dab", features = [ +rustpython-ast = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" } +rustpython-codegen = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" } +rustpython-compiler = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" } +rustpython-compiler-core = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" } +rustpython-parser = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" } +rustpython-pylib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537", features = [ "freeze-stdlib", ] } -rustpython-stdlib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" } -rustpython-vm = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab", features = [ +rustpython-stdlib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" } +rustpython-vm = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537", features = [ "default", "codegen", ] } @@ -68,6 +70,7 @@ mito = { path = "../mito", features = ["test"] } ron = "0.7" serde = { version = "1.0", features = ["derive"] } storage = { path = "../storage" } +store-api = { path = "../store-api" } tempdir = "0.3" tokio = { version = "1.18", features = ["full"] } tokio-test = "0.4" diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 1949c77233..d01d1f961f 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -101,12 +101,15 @@ mod tests { use query::QueryEngineFactory; use super::*; - type DefaultEngine = MitoEngine>; - use log_store::fs::config::LogConfig; - use log_store::fs::log::LocalFileLogStore; + + type DefaultEngine = MitoEngine>; + + use log_store::raft_engine::log_store::RaftEngineLogStore; + use log_store::LogConfig; use mito::engine::MitoEngine; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; + use store_api::logstore::LogStore; use tempdir::TempDir; #[tokio::test] @@ -121,7 +124,8 @@ mod tests { ..Default::default() }; - let log_store = LocalFileLogStore::open(&log_config).await.unwrap(); + let log_store = RaftEngineLogStore::try_new(log_config).unwrap(); + log_store.start().await.unwrap(); let mock_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index bc8fa292af..5c88600156 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -245,7 +245,7 @@ impl Job for FlushJob { #[cfg(test)] mod tests { - use log_store::fs::noop::NoopLogStore; + use log_store::NoopLogStore; use regex::Regex; use super::*; diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index d14040ecdf..67d112c714 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -26,8 +26,8 @@ use datatypes::prelude::{ScalarVector, WrapperType}; use datatypes::timestamp::TimestampMillisecond; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef}; -use log_store::fs::log::LocalFileLogStore; -use log_store::fs::noop::NoopLogStore; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use log_store::NoopLogStore; use object_store::backend::fs; use object_store::ObjectStore; use store_api::storage::{ @@ -67,6 +67,10 @@ impl TesterBase { } } + pub async fn close(&self) { + 
diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs
index d14040ecdf..67d112c714 100644
--- a/src/storage/src/region/tests.rs
+++ b/src/storage/src/region/tests.rs
@@ -26,8 +26,8 @@ use datatypes::prelude::{ScalarVector, WrapperType};
 use datatypes::timestamp::TimestampMillisecond;
 use datatypes::type_id::LogicalTypeId;
 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
-use log_store::fs::log::LocalFileLogStore;
-use log_store::fs::noop::NoopLogStore;
+use log_store::raft_engine::log_store::RaftEngineLogStore;
+use log_store::NoopLogStore;
 use object_store::backend::fs;
 use object_store::ObjectStore;
 use store_api::storage::{
@@ -67,6 +67,10 @@ impl<S: LogStore> TesterBase<S> {
         }
     }
 
+    pub async fn close(&self) {
+        self.region.inner.wal.close().await.unwrap();
+    }
+
     /// Put without version specified.
     ///
     /// Format of data: (timestamp, v0), timestamp is key, v0 is value.
@@ -126,7 +130,7 @@ impl<S: LogStore> TesterBase<S> {
     }
 }
 
-pub type FileTesterBase = TesterBase<LocalFileLogStore>;
+pub type FileTesterBase = TesterBase<RaftEngineLogStore>;
 
 fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
     if enable_version_column {
diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs
index 238f2bd094..cc3898025a 100644
--- a/src/storage/src/region/tests/alter.rs
+++ b/src/storage/src/region/tests/alter.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 use datatypes::prelude::*;
 use datatypes::timestamp::TimestampMillisecond;
 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
-use log_store::fs::log::LocalFileLogStore;
+use log_store::raft_engine::log_store::RaftEngineLogStore;
 use store_api::storage::{
     AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
     ColumnDescriptorBuilder, ColumnId, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
@@ -34,7 +34,7 @@ use crate::test_util::descriptor_util::RegionDescBuilder;
 
 const REGION_NAME: &str = "region-alter-0";
 
-async fn create_region_for_alter(store_dir: &str) -> RegionImpl<LocalFileLogStore> {
+async fn create_region_for_alter(store_dir: &str) -> RegionImpl<RaftEngineLogStore> {
     // Always disable version column in this test.
     let metadata = tests::new_metadata(REGION_NAME, false);
 
@@ -103,6 +103,9 @@ impl AlterTester {
     async fn reopen(&mut self) {
         // Close the old region.
+        if let Some(base) = self.base.as_ref() {
+            base.close().await;
+        }
         self.base = None;
         // Reopen the region.
         let store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;
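Note: the close-before-reopen dance added to each tester is new behavior. A raft-engine backed store owns background tasks (and presumably an exclusive claim on its directory; that rationale is an assumption, not stated in the patch), so the old instance must be shut down before the same directory is reopened. Every reopen method now follows this shape:

    // Sketch mirroring the reopen methods above; `base` holds the patch's
    // FileTesterBase (TesterBase<RaftEngineLogStore>).
    async fn reopen_pattern(base: &mut Option<FileTesterBase>) {
        if let Some(b) = base.as_ref() {
            b.close().await; // stops the underlying log store via Wal::close
        }
        *base = None;
        // ... rebuild the store config and reopen the region here ...
    }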
diff --git a/src/storage/src/region/tests/basic.rs b/src/storage/src/region/tests/basic.rs
index c5352669a5..a878437788 100644
--- a/src/storage/src/region/tests/basic.rs
+++ b/src/storage/src/region/tests/basic.rs
@@ -1,10 +1,10 @@
-// Copyright 2022 Greptime Team
+// Copyright 2023 Greptime Team
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-// http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,7 +14,8 @@
 
 //! Region read/write tests.
 
-use log_store::fs::log::LocalFileLogStore;
+use common_telemetry::info;
+use log_store::raft_engine::log_store::RaftEngineLogStore;
 use store_api::storage::{OpenOptions, SequenceNumber, WriteResponse};
 use tempdir::TempDir;
 
@@ -30,7 +31,7 @@ async fn create_region_for_basic(
     region_name: &str,
     store_dir: &str,
     enable_version_column: bool,
-) -> RegionImpl<LocalFileLogStore> {
+) -> RegionImpl<RaftEngineLogStore> {
     let metadata = tests::new_metadata(region_name, enable_version_column);
 
     let store_config = config_util::new_store_config(region_name, store_dir).await;
@@ -70,6 +71,11 @@ impl Tester {
     async fn try_reopen(&mut self) -> Result<bool> {
         // Close the old region.
+        if let Some(base) = self.base.as_ref() {
+            info!("Reopen tester base");
+            base.close().await;
+        }
+
         self.base = None;
         // Reopen the region.
         let store_config = config_util::new_store_config(&self.region_name, &self.store_dir).await;
diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs
index eadb4d5b5b..7fe84c51ff 100644
--- a/src/storage/src/region/tests/flush.rs
+++ b/src/storage/src/region/tests/flush.rs
@@ -17,7 +17,7 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
-use log_store::fs::log::LocalFileLogStore;
+use log_store::raft_engine::log_store::RaftEngineLogStore;
 use store_api::storage::{OpenOptions, WriteResponse};
 use tempdir::TempDir;
 
@@ -34,7 +34,7 @@ async fn create_region_for_flush(
     store_dir: &str,
     enable_version_column: bool,
     flush_strategy: FlushStrategyRef,
-) -> RegionImpl<LocalFileLogStore> {
+) -> RegionImpl<RaftEngineLogStore> {
     let metadata = tests::new_metadata(REGION_NAME, enable_version_column);
 
     let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
@@ -63,6 +63,9 @@ impl FlushTester {
     async fn reopen(&mut self) {
         // Close the old region.
+        if let Some(base) = self.base.as_ref() {
+            base.close().await;
+        }
         self.base = None;
         // Reopen the region.
         let mut store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;
diff --git a/src/storage/src/region/tests/projection.rs b/src/storage/src/region/tests/projection.rs
index 8a9536f5c2..3799b0075d 100644
--- a/src/storage/src/region/tests/projection.rs
+++ b/src/storage/src/region/tests/projection.rs
@@ -19,7 +19,7 @@ use datatypes::data_type::ConcreteDataType;
 use datatypes::prelude::ScalarVector;
 use datatypes::type_id::LogicalTypeId;
 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
-use log_store::fs::log::LocalFileLogStore;
+use log_store::raft_engine::log_store::RaftEngineLogStore;
 use store_api::logstore::LogStore;
 use store_api::storage::{
     Chunk, ChunkReader, ReadContext, Region, ScanRequest, Snapshot, WriteContext, WriteRequest,
@@ -166,7 +166,7 @@ impl<S: LogStore> ProjectionTester<S> {
 
 const REGION_NAME: &str = "region-projection-0";
 
-async fn new_tester(store_dir: &str) -> ProjectionTester<LocalFileLogStore> {
+async fn new_tester(store_dir: &str) -> ProjectionTester<RaftEngineLogStore> {
     let metadata = new_metadata(REGION_NAME);
 
     let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs
index 41d994a5b8..38730bba5d 100644
--- a/src/storage/src/test_util/config_util.rs
+++ b/src/storage/src/test_util/config_util.rs
@@ -14,10 +14,11 @@
 
 use std::sync::Arc;
 
-use log_store::fs::config::LogConfig;
-use log_store::fs::log::LocalFileLogStore;
+use log_store::raft_engine::log_store::RaftEngineLogStore;
+use log_store::LogConfig;
 use object_store::backend::fs::Builder;
 use object_store::ObjectStore;
+use store_api::logstore::LogStore;
 
 use crate::background::JobPoolImpl;
 use crate::engine;
@@ -35,7 +36,7 @@ fn log_store_dir(store_dir: &str) -> String {
 pub async fn new_store_config(
     region_name: &str,
     store_dir: &str,
-) -> StoreConfig<LocalFileLogStore> {
+) -> StoreConfig<RaftEngineLogStore> {
     let parent_dir = "";
     let sst_dir = engine::region_sst_dir(parent_dir, region_name);
     let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
@@ -50,7 +51,8 @@ pub async fn new_store_config(
         log_file_dir: log_store_dir(store_dir),
         ..Default::default()
     };
-    let log_store = Arc::new(LocalFileLogStore::open(&log_config).await.unwrap());
+    let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).unwrap());
+    log_store.start().await.unwrap();
 
     StoreConfig {
         log_store,
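Note: `start()` is a trait method, which is why config_util.rs now imports `store_api::logstore::LogStore`. A hypothetical generic helper (not in the patch) makes the requirement explicit:

    use std::sync::Arc;
    use store_api::logstore::LogStore;

    // Any LogStore implementation must be started before use; encoding that
    // in one helper avoids forgetting the call at individual sites.
    async fn start_store<S: LogStore>(store: S) -> Arc<S> {
        store.start().await.expect("failed to start log store");
        Arc::new(store)
    }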
diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs
index 575bd23808..5ecf3a1a37 100644
--- a/src/storage/src/wal.rs
+++ b/src/storage/src/wal.rs
@@ -19,8 +19,8 @@ use common_error::prelude::BoxedError;
 use futures::{stream, Stream, TryStreamExt};
 use prost::Message;
 use snafu::{ensure, ResultExt};
-use store_api::logstore::entry::Entry;
-use store_api::logstore::{AppendResponse, LogStore};
+use store_api::logstore::entry::{Entry, Id};
+use store_api::logstore::LogStore;
 use store_api::storage::{RegionId, SequenceNumber};
 
 use crate::codec::{Decoder, Encoder};
@@ -74,6 +74,12 @@ impl<S: LogStore> Wal<S> {
     pub fn region_id(&self) -> RegionId {
         self.region_id
     }
+
+    #[cfg(test)]
+    pub async fn close(&self) -> Result<()> {
+        let _ = self.store.stop().await;
+        Ok(())
+    }
 }
 
 impl<S: LogStore> Wal<S> {
@@ -96,7 +102,7 @@ impl<S: LogStore> Wal<S> {
         seq: SequenceNumber,
         mut header: WalHeader,
         payload: Option<&Payload>,
-    ) -> Result<(u64, usize)> {
+    ) -> Result<Id> {
         if let Some(p) = payload {
             header.mutation_types = wal::gen_mutation_types(p);
         }
@@ -147,10 +153,10 @@ impl<S: LogStore> Wal<S> {
         Ok(Box::pin(stream))
     }
 
-    async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> {
+    async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<Id> {
         let e = self.store.entry(bytes, seq, self.namespace.clone());
 
-        let res = self
+        let response = self
             .store
             .append(e)
             .await
@@ -159,7 +165,7 @@ impl<S: LogStore> Wal<S> {
                 region_id: self.region_id(),
             })?;
 
-        Ok((res.entry_id(), res.offset()))
+        Ok(response.entry_id)
     }
 
     fn decode_entry(
@@ -247,13 +253,9 @@ mod tests {
 
         let res = wal.write(0, b"test1").await.unwrap();
-        assert_eq!(0, res.0);
-        assert_eq!(0, res.1);
-
+        assert_eq!(0, res);
         let res = wal.write(1, b"test2").await.unwrap();
-
-        assert_eq!(1, res.0);
-        assert_eq!(5 + 32, res.1);
+        assert_eq!(1, res);
     }
 
     #[tokio::test]
@@ -263,18 +265,15 @@
             test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
         let wal = Wal::new(0, Arc::new(log_store));
         let header = WalHeader::with_last_manifest_version(111);
-        let (seq_num, _) = wal.write_to_wal(3, header, None).await?;
-
-        assert_eq!(3, seq_num);
+        let seq_num = 3;
+        wal.write_to_wal(seq_num, header, None).await?;
 
         let mut stream = wal.read_from_wal(seq_num).await?;
         let mut data = vec![];
         while let Some((seq_num, header, write_batch)) = stream.try_next().await? {
             data.push((seq_num, header, write_batch));
         }
-        assert_eq!(1, data.len());
-        assert_eq!(seq_num, data[0].0);
         assert_eq!(111, data[0].1.last_manifest_version);
         assert!(data[0].2.is_none());
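Note: the WAL write path now surfaces only the entry id; the `(id, offset)` tuple is gone, presumably because the raft-engine backend does not expose file offsets. A test-style sketch mirroring the in-module tests above (`write` is private to wal.rs, so code like this can only live in its `mod tests`; `Wal`, `WalHeader` and the helper are all from this patch):

    use std::sync::Arc;

    async fn wal_write_demo() {
        let (log_store, _dir) =
            test_util::log_store_util::create_tmp_local_file_log_store("wal_demo").await;
        let wal = Wal::new(0, Arc::new(log_store));
        // `write` now returns the bare entry Id instead of (id, offset).
        let id = wal.write(0, b"payload").await.unwrap();
        assert_eq!(0, id);
    }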
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 80be436ea0..60e58e8fd1 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -16,7 +16,7 @@
 
 use common_error::prelude::ErrorExt;
 
-use crate::logstore::entry::{Entry, Id, Offset};
+use crate::logstore::entry::{Entry, Id};
 use crate::logstore::entry_stream::SendableEntryStream;
 use crate::logstore::namespace::Namespace;
 
@@ -30,21 +30,23 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
     type Error: ErrorExt + Send + Sync + 'static;
     type Namespace: Namespace;
     type Entry: Entry;
-    type AppendResponse: AppendResponse;
 
+    /// Start the components and background tasks of logstore.
     async fn start(&self) -> Result<(), Self::Error>;
 
+    /// Stop components of logstore.
     async fn stop(&self) -> Result<(), Self::Error>;
 
-    /// Append an `Entry` to WAL with given namespace
-    async fn append(&self, mut e: Self::Entry) -> Result<Self::AppendResponse, Self::Error>;
+    /// Append an `Entry` to WAL with given namespace and return append response containing
+    /// the entry id.
+    async fn append(&self, mut e: Self::Entry) -> Result<AppendResponse, Self::Error>;
 
     /// Append a batch of entries atomically and return the offset of first entry.
     async fn append_batch(
         &self,
         ns: &Self::Namespace,
         e: Vec<Self::Entry>,
-    ) -> Result<Id, Self::Error>;
+    ) -> Result<Vec<Id>, Self::Error>;
 
     /// Create a new `EntryStream` to asynchronously generates `Entry` with ids
     /// starting from `id`.
@@ -76,8 +78,7 @@
     async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error>;
 }
 
-pub trait AppendResponse: Send + Sync {
-    fn entry_id(&self) -> Id;
-
-    fn offset(&self) -> Offset;
+#[derive(Debug)]
+pub struct AppendResponse {
+    pub entry_id: Id,
 }
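Note: `AppendResponse` changes from a per-implementation trait (with `entry_id()` and `offset()` accessors) to one shared concrete struct carrying just the entry id, so the associated type on `LogStore` disappears. A hypothetical sketch of what the tail of some `LogStore::append` implementation would now return:

    use store_api::logstore::entry::Id;
    use store_api::logstore::AppendResponse;

    // Hypothetical: once an entry has been durably written and assigned `id`,
    // the implementation constructs the response directly.
    fn make_response(id: Id) -> AppendResponse {
        AppendResponse { entry_id: id }
    }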
diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs
index 3fa7aad06d..ea8a4c8244 100644
--- a/src/store-api/src/logstore/entry.rs
+++ b/src/store-api/src/logstore/entry.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_base::buffer::{Buffer, BufferMut};
 use common_error::ext::ErrorExt;
 
 use crate::logstore::namespace::Namespace;
@@ -22,7 +21,7 @@ pub type Epoch = u64;
 pub type Id = u64;
 
 /// Entry is the minimal data storage unit in `LogStore`.
-pub trait Entry: Encode + Send + Sync {
+pub trait Entry: Send + Sync {
     type Error: ErrorExt + Send + Sync;
     type Namespace: Namespace;
 
@@ -32,32 +31,5 @@
     /// Return entry id that monotonically increments.
     fn id(&self) -> Id;
 
-    /// Return file offset of entry.
-    fn offset(&self) -> Offset;
-
-    fn set_id(&mut self, id: Id);
-
-    /// Returns epoch of entry.
-    fn epoch(&self) -> Epoch;
-
-    fn len(&self) -> usize;
-
-    fn is_empty(&self) -> bool;
-
     fn namespace(&self) -> Self::Namespace;
 }
-
-pub trait Encode: Sized {
-    type Error: ErrorExt + Send + Sync + 'static;
-
-    /// Encodes item to given byte slice and return encoded size on success;
-    /// # Panics
-    /// If given buffer is not large enough to hold the encoded item.
-    fn encode_to<T: BufferMut>(&self, buf: &mut T) -> Result<usize, Self::Error>;
-
-    /// Decodes item from given buffer.
-    fn decode<T: Buffer>(buf: &mut T) -> Result<Self, Self::Error>;
-
-    /// Return the size in bytes of the encoded item;
-    fn encoded_size(&self) -> usize;
-}
diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs
index ac8c574bb1..3e655a5a49 100644
--- a/src/store-api/src/logstore/entry_stream.rs
+++ b/src/store-api/src/logstore/entry_stream.rs
@@ -36,18 +36,13 @@ mod tests {
     use futures::StreamExt;
 
     use super::*;
-    use crate::logstore::entry::{Encode, Epoch, Id, Offset};
+    pub use crate::logstore::entry::Id;
 
     pub struct SimpleEntry {
-        /// Offset of current entry
-        offset: Offset,
-        /// Epoch of current entry
-        epoch: Epoch,
         /// Binary data of current entry
         data: Vec<u8>,
     }
 
-    use common_base::buffer::{Buffer, BufferMut};
     use common_error::prelude::{ErrorExt, Snafu};
     use snafu::{Backtrace, ErrorCompat};
 
@@ -74,23 +69,6 @@ mod tests {
         }
     }
 
-    impl Encode for SimpleEntry {
-        type Error = Error;
-
-        fn encode_to<T: BufferMut>(&self, buf: &mut T) -> Result<usize, Self::Error> {
-            buf.write_from_slice(self.data.as_slice()).unwrap();
-            Ok(self.data.as_slice().len())
-        }
-
-        fn decode<T: Buffer>(_buf: &mut T) -> Result<Self, Self::Error> {
-            unimplemented!()
-        }
-
-        fn encoded_size(&self) -> usize {
-            self.data.as_slice().len()
-        }
-    }
-
     impl Entry for SimpleEntry {
         type Error = Error;
         type Namespace = Namespace;
@@ -103,37 +81,15 @@ mod tests {
             0u64
         }
 
-        fn offset(&self) -> Offset {
-            self.offset
-        }
-
-        fn set_id(&mut self, _id: Id) {}
-
-        fn epoch(&self) -> Epoch {
-            self.epoch
-        }
-
-        fn len(&self) -> usize {
-            self.data.len()
-        }
-
-        fn is_empty(&self) -> bool {
-            self.data.is_empty()
-        }
-
         fn namespace(&self) -> Self::Namespace {
             Namespace {}
         }
     }
 
     impl SimpleEntry {
-        pub fn new(data: impl AsRef<[u8]>, offset: Offset, epoch: u64) -> Self {
+        pub fn new(data: impl AsRef<[u8]>) -> Self {
             let data = data.as_ref().to_vec();
-            Self {
-                data,
-                offset,
-                epoch,
-            }
+            Self { data }
         }
     }
 
@@ -165,9 +121,8 @@ mod tests {
     #[tokio::test]
     pub async fn test_entry_stream() {
-        let stream = async_stream::stream!({
-            yield Ok(vec![SimpleEntry::new("test_entry".as_bytes(), 0, 128)])
-        });
+        let stream =
+            async_stream::stream!({ yield Ok(vec![SimpleEntry::new("test_entry".as_bytes())]) });
 
         let mut stream_impl = EntryStreamImpl {
             inner: Box::pin(stream),
diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs
index d056939c95..e40d0c6bfa 100644
--- a/src/store-api/src/logstore/namespace.rs
+++ b/src/store-api/src/logstore/namespace.rs
@@ -16,6 +16,6 @@ use std::hash::Hash;
 
 pub type Id = u64;
 
-pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + Eq {
+pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq {
     fn id(&self) -> Id;
 }
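Note: after this patch an `Entry` is just bytes plus an id and a namespace; the offset, epoch and length accessors and the whole `Encode` trait are gone, leaving serialization to the log store itself, and `Namespace` is relaxed from `Eq` to `PartialEq`. A minimal implementor sketch; `MyNamespace` and `MyError` are hypothetical placeholders, and `fn data` is assumed to be the one remaining accessor not shown in the hunks above:

    use store_api::logstore::entry::{Entry, Id};

    struct BytesEntry {
        id: Id,
        data: Vec<u8>,
        ns: MyNamespace, // hypothetical Namespace implementor
    }

    impl Entry for BytesEntry {
        type Error = MyError; // hypothetical ErrorExt implementor
        type Namespace = MyNamespace;

        fn data(&self) -> &[u8] {
            &self.data
        }

        fn id(&self) -> Id {
            self.id
        }

        fn namespace(&self) -> Self::Namespace {
            self.ns.clone()
        }
    }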