feat(mito): Implement mito2 Wal (#2103)

* feat: define wal struct

* feat: Implement Wal read/write

* feat: obsolete wal

* test: test wal

* refactor: use try_stream and remove async from scan
This commit is contained in:
Yingwen
2023-08-04 20:04:25 +09:00
committed by GitHub
parent 9139962070
commit cb4dd89754
8 changed files with 433 additions and 13 deletions

5
Cargo.lock generated
View File

@@ -4114,7 +4114,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c#ec4b84931378004db60d168e2604bc3fb9735e9c"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032#10c349c033dded29097d0dc933fbc2f89f658032"
dependencies = [
"prost",
"serde",
@@ -5510,12 +5510,13 @@ dependencies = [
"datafusion-common",
"datatypes",
"futures",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c)",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032)",
"lazy_static",
"log-store",
"metrics",
"object-store",
"parquet",
"prost",
"regex",
"serde",
"serde_json",

View File

@@ -33,12 +33,13 @@ datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged.
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec4b84931378004db60d168e2604bc3fb9735e9c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "10c349c033dded29097d0dc933fbc2f89f658032" }
lazy_static = "1.4"
log-store = { path = "../log-store" }
metrics.workspace = true
object-store = { path = "../object-store" }
parquet = { workspace = true, features = ["async"] }
prost.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@@ -52,6 +52,9 @@ impl MitoEngine {
}
/// Stop the engine.
///
/// Stopping the engine doesn't stop the underlying log store as other components might
/// still use it.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}

View File

@@ -15,9 +15,10 @@
use std::any::Any;
use common_datasource::compression::CompressionType;
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use datatypes::arrow::error::ArrowError;
use prost::{DecodeError, EncodeError};
use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
@@ -205,6 +206,60 @@ pub enum Error {
column: String,
source: datatypes::Error,
},
#[snafu(display(
"Failed to encode WAL entry, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
EncodeWal {
region_id: RegionId,
location: Location,
source: EncodeError,
},
#[snafu(display("Failed to write WAL, location: {}, source: {}", location, source))]
WriteWal {
location: Location,
source: BoxedError,
},
#[snafu(display(
"Failed to read WAL, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
ReadWal {
region_id: RegionId,
location: Location,
source: BoxedError,
},
#[snafu(display(
"Failed to decode WAL entry, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
DecodeWal {
region_id: RegionId,
location: Location,
source: DecodeError,
},
#[snafu(display(
"Failed to delete WAL, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
DeleteWal {
region_id: RegionId,
location: Location,
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -214,9 +269,12 @@ impl ErrorExt for Error {
use Error::*;
match self {
OpenDal { .. } | WriteParquet { .. } | ReadParquet { .. } => {
StatusCode::StorageUnavailable
}
OpenDal { .. }
| WriteParquet { .. }
| ReadParquet { .. }
| WriteWal { .. }
| ReadWal { .. }
| DeleteWal { .. } => StatusCode::StorageUnavailable,
CompressObject { .. }
| DecompressObject { .. }
| SerdeJson { .. }
@@ -231,9 +289,12 @@ impl ErrorExt for Error {
| InvalidSchema { .. }
| InvalidRequest { .. }
| FillDefault { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}
RegionMetadataNotFound { .. }
| Join { .. }
| WorkerStopped { .. }
| Recv { .. }
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
WriteBuffer { source, .. } => source.status_code(),
}
}

View File

@@ -39,6 +39,7 @@ mod region;
pub mod request;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
#[allow(dead_code)]
mod worker;

View File

@@ -40,6 +40,7 @@ use crate::worker::WorkerGroup;
pub struct TestEnv {
/// Path to store data.
data_home: TempDir,
// TODO(yingwen): Maybe provide a way to close the log store.
}
impl Default for TestEnv {

351
src/mito2/src/wal.rs Normal file
View File

@@ -0,0 +1,351 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Write ahead log of the engine.
use std::mem;
use std::sync::Arc;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use futures::stream::BoxStream;
use futures::StreamExt;
use greptime_proto::v1::mito::WalEntry;
use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::{
DecodeWalSnafu, DeleteWalSnafu, EncodeWalSnafu, ReadWalSnafu, Result, WriteWalSnafu,
};
/// WAL entry id.
pub type EntryId = store_api::logstore::entry::Id;
/// A stream that yields tuple of WAL entry id and corresponding entry.
pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
/// Write ahead log.
///
/// All regions in the engine shares the same WAL instance.
#[derive(Debug)]
pub struct Wal<S> {
/// The underlying log store.
store: Arc<S>,
}
impl<S> Wal<S> {
/// Creates a new [Wal] from the log store.
pub fn new(store: Arc<S>) -> Self {
Self { store }
}
}
impl<S: LogStore> Wal<S> {
/// Returns a writer to write to the WAL.
pub fn writer(&self) -> WalWriter<S> {
WalWriter {
store: self.store.clone(),
entries: Vec::new(),
entry_encode_buf: Vec::new(),
}
}
/// Scan entries of specific region starting from `start_id` (inclusive).
pub fn scan(&self, region_id: RegionId, start_id: EntryId) -> Result<WalEntryStream> {
let stream = try_stream!({
let namespace = self.store.namespace(region_id.into());
let mut stream = self
.store
.read(&namespace, start_id)
.await
.map_err(BoxedError::new)
.context(ReadWalSnafu { region_id })?;
while let Some(entries) = stream.next().await {
let entries = entries
.map_err(BoxedError::new)
.context(ReadWalSnafu { region_id })?;
for entry in entries {
yield decode_entry(region_id, entry)?;
}
}
});
Ok(Box::pin(stream))
}
/// Mark entries whose ids `<= last_id` as deleted.
pub async fn obsolete(&self, region_id: RegionId, last_id: EntryId) -> Result<()> {
let namespace = self.store.namespace(region_id.into());
self.store
.obsolete(namespace, last_id)
.await
.map_err(BoxedError::new)
.context(DeleteWalSnafu { region_id })
}
}
/// Decode Wal entry from log store.
fn decode_entry<E: Entry>(region_id: RegionId, entry: E) -> Result<(EntryId, WalEntry)> {
let entry_id = entry.id();
let data = entry.data();
let wal_entry = WalEntry::decode(data).context(DecodeWalSnafu { region_id })?;
Ok((entry_id, wal_entry))
}
/// WAL batch writer.
pub struct WalWriter<S: LogStore> {
/// Log store of the WAL.
store: Arc<S>,
/// Entries to write.
entries: Vec<S::Entry>,
/// Buffer to encode WAL entry.
entry_encode_buf: Vec<u8>,
}
impl<S: LogStore> WalWriter<S> {
/// Add an wal entry for specific region to the writer's buffer.
pub fn add_entry(
&mut self,
region_id: RegionId,
entry_id: EntryId,
wal_entry: &WalEntry,
) -> Result<()> {
let namespace = self.store.namespace(region_id.into());
// Encode wal entry to log store entry.
self.entry_encode_buf.clear();
wal_entry
.encode(&mut self.entry_encode_buf)
.context(EncodeWalSnafu { region_id })?;
let entry = self
.store
.entry(&self.entry_encode_buf, entry_id, namespace);
self.entries.push(entry);
Ok(())
}
/// Write all buffered entries to the WAL.
pub async fn write_to_wal(&mut self) -> Result<()> {
// TODO(yingwen): metrics.
let entries = mem::take(&mut self.entries);
self.store
.append_batch(entries)
.await
.map_err(BoxedError::new)
.context(WriteWalSnafu)
}
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use futures::TryStreamExt;
use greptime_proto::v1::mito::{Mutation, OpType};
use greptime_proto::v1::{value, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use store_api::storage::SequenceNumber;
use super::*;
struct WalEnv {
_wal_dir: TempDir,
log_store: Option<Arc<RaftEngineLogStore>>,
}
impl WalEnv {
async fn new() -> WalEnv {
let wal_dir = create_temp_dir("");
let log_store =
log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
.await;
WalEnv {
_wal_dir: wal_dir,
log_store: Some(Arc::new(log_store)),
}
}
fn new_wal(&self) -> Wal<RaftEngineLogStore> {
let log_store = self.log_store.clone().unwrap();
Wal::new(log_store)
}
}
/// Create a new mutation from rows.
///
/// The row format is (string, i64).
fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
let rows = rows
.iter()
.map(|(str_col, int_col)| {
let values = vec![
Value {
value: Some(value::Value::StringValue(str_col.to_string())),
},
Value {
value: Some(value::Value::TsMillisecondValue(*int_col)),
},
];
Row { values }
})
.collect();
let schema = vec![
ColumnSchema {
column_name: "tag".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
},
ColumnSchema {
column_name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
},
];
Mutation {
op_type: op_type as i32,
sequence,
rows: Some(Rows { schema, rows }),
}
}
#[tokio::test]
async fn test_write_wal() {
let env = WalEnv::new().await;
let wal = env.new_wal();
let entry = WalEntry {
mutations: vec![
new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
],
};
let mut writer = wal.writer();
// Region 1 entry 1.
writer.add_entry(RegionId::new(1, 1), 1, &entry).unwrap();
// Region 2 entry 1.
writer.add_entry(RegionId::new(1, 2), 1, &entry).unwrap();
// Region 1 entry 2.
writer.add_entry(RegionId::new(1, 1), 2, &entry).unwrap();
// Test writing multiple region to wal.
writer.write_to_wal().await.unwrap();
}
fn sample_entries() -> Vec<WalEntry> {
vec![
WalEntry {
mutations: vec![
new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
],
},
WalEntry {
mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
},
WalEntry {
mutations: vec![
new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
],
},
WalEntry {
mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
},
]
}
fn check_entries(
expect: &[WalEntry],
expect_start_id: EntryId,
actual: &[(EntryId, WalEntry)],
) {
for (idx, (expect_entry, (actual_id, actual_entry))) in
expect.iter().zip(actual.iter()).enumerate()
{
let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
assert_eq!(expect_id_entry, (*actual_id, actual_entry));
}
assert_eq!(expect.len(), actual.len());
}
#[tokio::test]
async fn test_scan_wal() {
let env = WalEnv::new().await;
let wal = env.new_wal();
let entries = sample_entries();
let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
let mut writer = wal.writer();
writer.add_entry(id1, 1, &entries[0]).unwrap();
// Insert one entry into region2. Scan should not return this entry.
writer.add_entry(id2, 1, &entries[0]).unwrap();
writer.add_entry(id1, 2, &entries[1]).unwrap();
writer.add_entry(id1, 3, &entries[2]).unwrap();
writer.add_entry(id1, 4, &entries[3]).unwrap();
writer.write_to_wal().await.unwrap();
// Scan all contents region1
let stream = wal.scan(id1, 1).unwrap();
let actual: Vec<_> = stream.try_collect().await.unwrap();
check_entries(&entries, 1, &actual);
// Scan parts of contents
let stream = wal.scan(id1, 2).unwrap();
let actual: Vec<_> = stream.try_collect().await.unwrap();
check_entries(&entries[1..], 2, &actual);
// Scan out of range
let stream = wal.scan(id1, 5).unwrap();
let actual: Vec<_> = stream.try_collect().await.unwrap();
assert!(actual.is_empty());
}
#[tokio::test]
async fn test_obsolete_wal() {
let env = WalEnv::new().await;
let wal = env.new_wal();
let entries = sample_entries();
let mut writer = wal.writer();
let region_id = RegionId::new(1, 1);
writer.add_entry(region_id, 1, &entries[0]).unwrap();
writer.add_entry(region_id, 2, &entries[1]).unwrap();
writer.add_entry(region_id, 3, &entries[2]).unwrap();
writer.write_to_wal().await.unwrap();
// Delete 1, 2.
wal.obsolete(region_id, 2).await.unwrap();
// Put 4.
let mut writer = wal.writer();
writer.add_entry(region_id, 4, &entries[3]).unwrap();
writer.write_to_wal().await.unwrap();
// Scan all
let stream = wal.scan(region_id, 1).unwrap();
let actual: Vec<_> = stream.try_collect().await.unwrap();
check_entries(&entries[2..], 3, &actual);
}
}

View File

@@ -39,6 +39,7 @@ use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::region::{RegionMap, RegionMapRef};
use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest};
use crate::wal::Wal;
/// Identifier for a worker.
pub(crate) type WorkerId = u32;
@@ -179,7 +180,7 @@ impl RegionWorker {
config,
regions: regions.clone(),
receiver,
log_store,
wal: Wal::new(log_store),
object_store,
running: running.clone(),
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
@@ -274,8 +275,8 @@ struct RegionWorkerLoop<S> {
regions: RegionMapRef,
/// Request receiver.
receiver: Receiver<WorkerRequest>,
// TODO(yingwen): Replaced by Wal.
log_store: Arc<S>,
/// WAL of the engine.
wal: Wal<S>,
/// Object store for manifest and SSTs.
object_store: ObjectStore,
/// Whether the worker thread is still running.