feat: Implement ProcedureStore (#927)

* test: Add more tests for ProcedureId

* feat: Add ObjectStore based state store

* feat: Implement ProcedureStore

* test: Add tests for ParsedKey

* refactor: Rename list to walk_top_down

* fix: Test ProcedureStore and handles unordered key values.

* style: Fix clippy

* docs: Update comment

* chore: Adjust log level for printing invalid key
This commit is contained in:
Yingwen
2023-02-02 17:49:31 +08:00
committed by GitHub
parent 54c7a8be02
commit 74adb077bc
7 changed files with 639 additions and 13 deletions

3
Cargo.lock generated
View File

@@ -1518,9 +1518,12 @@ dependencies = [
"common-error",
"common-runtime",
"common-telemetry",
"futures",
"object-store",
"serde",
"serde_json",
"snafu",
"tempdir",
"tokio",
"uuid",
]

View File

@@ -9,8 +9,13 @@ async-trait.workspace = true
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }
futures.workspace = true
object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true
[dev-dependencies]
tempdir = "0.3"

View File

@@ -45,6 +45,30 @@ pub enum Error {
procedure_id: ProcedureId,
backtrace: Backtrace,
},
#[snafu(display("Failed to put {}, source: {}", key, source))]
PutState {
key: String,
source: object_store::Error,
},
#[snafu(display("Failed to delete {}, source: {}", key, source))]
DeleteState {
key: String,
source: object_store::Error,
},
#[snafu(display("Failed to list {}, source: {}", path, source))]
ListState {
path: String,
source: object_store::Error,
},
#[snafu(display("Failed to read {}, source: {}", key, source))]
ReadState {
key: String,
source: object_store::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -53,7 +77,11 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::External { source } => source.status_code(),
Error::ToJson { .. } => StatusCode::Internal,
Error::ToJson { .. }
| Error::PutState { .. }
| Error::DeleteState { .. }
| Error::ListState { .. }
| Error::ReadState { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}

View File

@@ -16,6 +16,9 @@
pub mod error;
mod procedure;
// TODO(yingwen): Remove this attribute once ProcedureManager is implemented.
#[allow(dead_code)]
mod store;
pub use crate::error::{Error, Result};
pub use crate::procedure::{

View File

@@ -197,18 +197,6 @@ pub trait ProcedureManager: Send + Sync + 'static {
/// Ref-counted pointer to the [ProcedureManager].
pub type ProcedureManagerRef = Arc<dyn ProcedureManager>;
/// Serialized data of a procedure.
#[derive(Debug, Serialize, Deserialize)]
struct ProcedureMessage {
/// Type name of the procedure. The procedure framework also use the type name to
/// find a loader to load the procedure.
type_name: String,
/// The data of the procedure.
data: String,
/// Parent procedure id.
parent_id: Option<ProcedureId>,
}
#[cfg(test)]
mod tests {
use super::*;
@@ -252,5 +240,17 @@ mod tests {
let parsed = ProcedureId::parse_str(&uuid_str).unwrap();
assert_eq!(id, parsed);
let parsed = uuid_str.parse().unwrap();
assert_eq!(id, parsed);
}
#[test]
fn test_procedure_id_serialization() {
let id = ProcedureId::random();
let json = serde_json::to_string(&id).unwrap();
assert_eq!(format!("\"{id}\""), json);
let parsed = serde_json::from_str(&json).unwrap();
assert_eq!(id, parsed);
}
}

View File

@@ -0,0 +1,399 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt;
use common_telemetry::logging;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{Result, ToJsonSnafu};
use crate::store::state_store::StateStoreRef;
use crate::{BoxedProcedure, ProcedureId};
mod state_store;
/// Serialized data of a procedure.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ProcedureMessage {
/// Type name of the procedure. The procedure framework also use the type name to
/// find a loader to load the procedure.
type_name: String,
/// The data of the procedure.
data: String,
/// Parent procedure id.
parent_id: Option<ProcedureId>,
}
/// Procedure storage layer.
#[derive(Clone)]
struct ProcedureStore(StateStoreRef);
impl ProcedureStore {
/// Dump the `procedure` to the storage.
async fn store_procedure(
&self,
procedure_id: ProcedureId,
step: u32,
procedure: &BoxedProcedure,
parent_id: Option<ProcedureId>,
) -> Result<()> {
let type_name = procedure.type_name();
let data = procedure.dump()?;
let message = ProcedureMessage {
type_name: type_name.to_string(),
data,
parent_id,
};
let key = ParsedKey {
procedure_id,
step,
is_committed: false,
}
.to_string();
let value = serde_json::to_string(&message).context(ToJsonSnafu)?;
self.0.put(&key, value.into_bytes()).await?;
Ok(())
}
/// Write commit flag to the storage.
async fn commit_procedure(&self, procedure_id: ProcedureId, step: u32) -> Result<()> {
let key = ParsedKey {
procedure_id,
step,
is_committed: true,
}
.to_string();
self.0.put(&key, Vec::new()).await?;
Ok(())
}
/// Load uncommitted procedures from the storage.
async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
let mut messages = HashMap::new();
// Track the key-value pair by procedure id.
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
// Scan all procedures.
let mut key_values = self.0.walk_top_down("/").await?;
while let Some((key, value)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
continue;
};
if let Some(entry) = procedure_key_values.get_mut(&curr_key.procedure_id) {
if entry.0.step < curr_key.step {
entry.0 = curr_key;
entry.1 = value;
}
} else {
procedure_key_values.insert(curr_key.procedure_id, (curr_key, value));
}
}
for (procedure_id, (parsed_key, value)) in procedure_key_values {
if !parsed_key.is_committed {
let Some(message) = self.load_one_message(&parsed_key, &value) else {
// We don't abort the loading process and just ignore errors to ensure all remaining
// procedures are loaded.
continue;
};
messages.insert(procedure_id, message);
}
}
Ok(messages)
}
fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
serde_json::from_slice(value)
.map_err(|e| {
// `e` doesn't impl ErrorExt so we print it as normal error.
logging::error!("Failed to parse value, key: {:?}, source: {}", key, e);
e
})
.ok()
}
}
/// Key to refer the procedure in the [ProcedureStore].
#[derive(Debug, PartialEq, Eq)]
struct ParsedKey {
procedure_id: ProcedureId,
step: u32,
is_committed: bool,
}
impl fmt::Display for ParsedKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}/{:010}.{}",
self.procedure_id,
self.step,
if self.is_committed { "commit" } else { "step" }
)
}
}
impl ParsedKey {
/// Try to parse the key from specific `input`.
fn parse_str(input: &str) -> Option<ParsedKey> {
let mut iter = input.rsplit('/');
let name = iter.next()?;
let id_str = iter.next()?;
let procedure_id = ProcedureId::parse_str(id_str).ok()?;
let mut parts = name.split('.');
let step_str = parts.next()?;
let suffix = parts.next()?;
let is_committed = match suffix {
"commit" => true,
"step" => false,
_ => return None,
};
let step = step_str.parse().ok()?;
Some(ParsedKey {
procedure_id,
step,
is_committed,
})
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use async_trait::async_trait;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use tempdir::TempDir;
use super::*;
use crate::store::state_store::ObjectStateStore;
use crate::{Context, LockKey, Procedure, Status};
#[test]
fn test_parsed_key() {
let procedure_id = ProcedureId::random();
let key = ParsedKey {
procedure_id,
step: 2,
is_committed: false,
};
assert_eq!(format!("{procedure_id}/0000000002.step"), key.to_string());
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
let key = ParsedKey {
procedure_id,
step: 2,
is_committed: true,
};
assert_eq!(format!("{procedure_id}/0000000002.commit"), key.to_string());
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
}
#[test]
fn test_parse_invalid_key() {
assert!(ParsedKey::parse_str("").is_none());
let procedure_id = ProcedureId::random();
let input = format!("{procedure_id}");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/0000000003");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/0000000003.");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/0000000003.other");
assert!(ParsedKey::parse_str(&input).is_none());
assert!(ParsedKey::parse_str("12345/0000000003.step").is_none());
let input = format!("{procedure_id}-0000000003.commit");
assert!(ParsedKey::parse_str(&input).is_none());
}
#[test]
fn test_procedure_message() {
let mut message = ProcedureMessage {
type_name: "TestMessage".to_string(),
data: "no parent id".to_string(),
parent_id: None,
};
let json = serde_json::to_string(&message).unwrap();
assert_eq!(
json,
r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null}"#
);
let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap();
message.parent_id = Some(procedure_id);
let json = serde_json::to_string(&message).unwrap();
assert_eq!(
json,
r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1"}"#
);
}
struct MockProcedure {
data: String,
}
impl MockProcedure {
fn new(data: impl Into<String>) -> MockProcedure {
MockProcedure { data: data.into() }
}
}
#[async_trait]
impl Procedure for MockProcedure {
fn type_name(&self) -> &str {
"MockProcedure"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
unimplemented!()
}
fn dump(&self) -> Result<String> {
Ok(self.data.clone())
}
fn lock_key(&self) -> Option<LockKey> {
None
}
}
fn new_procedure_store(dir: &TempDir) -> ProcedureStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let state_store = ObjectStateStore::new(object_store);
ProcedureStore(Arc::new(state_store))
}
#[tokio::test]
async fn test_store_procedure() {
let dir = TempDir::new("store_procedure").unwrap();
let store = new_procedure_store(&dir);
let procedure_id = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
store
.store_procedure(procedure_id, 0, &procedure, None)
.await
.unwrap();
let messages = store.load_messages().await.unwrap();
assert_eq!(1, messages.len());
let msg = messages.get(&procedure_id).unwrap();
let expect = ProcedureMessage {
type_name: "MockProcedure".to_string(),
data: "test store procedure".to_string(),
parent_id: None,
};
assert_eq!(expect, *msg);
}
#[tokio::test]
async fn test_commit_procedure() {
let dir = TempDir::new("store_procedure").unwrap();
let store = new_procedure_store(&dir);
let procedure_id = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
store
.store_procedure(procedure_id, 0, &procedure, None)
.await
.unwrap();
store.commit_procedure(procedure_id, 1).await.unwrap();
let messages = store.load_messages().await.unwrap();
assert!(messages.is_empty());
}
#[tokio::test]
async fn test_load_messages() {
let dir = TempDir::new("store_procedure").unwrap();
let store = new_procedure_store(&dir);
// store 3 steps
let id0 = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-0"));
store
.store_procedure(id0, 0, &procedure, None)
.await
.unwrap();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-1"));
store
.store_procedure(id0, 1, &procedure, None)
.await
.unwrap();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-2"));
store
.store_procedure(id0, 2, &procedure, None)
.await
.unwrap();
// store 2 steps and then commit
let id1 = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-0"));
store
.store_procedure(id1, 0, &procedure, None)
.await
.unwrap();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-1"));
store
.store_procedure(id1, 1, &procedure, None)
.await
.unwrap();
store.commit_procedure(id1, 2).await.unwrap();
// store 1 step
let id2 = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("id2-0"));
store
.store_procedure(id2, 0, &procedure, None)
.await
.unwrap();
let messages = store.load_messages().await.unwrap();
assert_eq!(2, messages.len());
let msg = messages.get(&id0).unwrap();
assert_eq!("id0-2", msg.data);
let msg = messages.get(&id2).unwrap();
assert_eq!("id2-0", msg.data);
}
}

View File

@@ -0,0 +1,188 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use futures::{Stream, TryStreamExt};
use object_store::{ObjectMode, ObjectStore};
use snafu::ResultExt;
use crate::error::{
DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, ReadStateSnafu, Result,
};
/// Key value from state store.
type KeyValue = (String, Vec<u8>);
/// Stream that yields [KeyValue].
type KeyValueStream = Pin<Box<dyn Stream<Item = Result<KeyValue>> + Send>>;
/// Storage layer for persisting procedure's state.
#[async_trait]
pub(crate) trait StateStore: Send + Sync {
/// Puts `key` and `value` into the store.
async fn put(&self, key: &str, value: Vec<u8>) -> Result<()>;
/// Returns the key-value pairs under `path` in top down way.
///
/// # Note
/// - There is no guarantee about the order of the keys in the stream.
/// - The `path` must ends with `/`.
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream>;
/// Deletes key-value pairs by `keys`.
async fn delete(&self, keys: &[String]) -> Result<()>;
}
/// Reference counted pointer to [StateStore].
pub(crate) type StateStoreRef = Arc<dyn StateStore>;
/// [StateStore] based on [ObjectStore].
#[derive(Debug)]
pub(crate) struct ObjectStateStore {
store: ObjectStore,
}
impl ObjectStateStore {
/// Returns a new [ObjectStateStore] with specific `store`.
pub(crate) fn new(store: ObjectStore) -> ObjectStateStore {
ObjectStateStore { store }
}
}
#[async_trait]
impl StateStore for ObjectStateStore {
async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
let object = self.store.object(key);
object.write(value).await.context(PutStateSnafu { key })
}
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let path_string = path.to_string();
let op = self.store.batch();
// Note that there is no guarantee about the order between files and dirs
// at the same level.
// See https://docs.rs/opendal/0.25.2/opendal/raw/struct.TopDownWalker.html#note
let stream = op
.walk_top_down(path)
.context(ListStateSnafu { path })?
.map_err(move |e| Error::ListState {
path: path_string.clone(),
source: e,
})
.try_filter_map(|entry| async move {
let key = entry.path();
let key_value = match entry.mode().await.context(ReadStateSnafu { key })? {
ObjectMode::FILE => {
let value = entry.read().await.context(ReadStateSnafu { key })?;
Some((key.to_string(), value))
}
ObjectMode::DIR | ObjectMode::Unknown => None,
};
Ok(key_value)
});
Ok(Box::pin(stream))
}
async fn delete(&self, keys: &[String]) -> Result<()> {
for key in keys {
let object = self.store.object(key);
object.delete().await.context(DeleteStateSnafu { key })?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use object_store::services::fs::Builder;
use tempdir::TempDir;
use super::*;
#[tokio::test]
async fn test_object_state_store() {
let dir = TempDir::new("state_store").unwrap();
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let state_store = ObjectStateStore::new(object_store);
let data: Vec<_> = state_store
.walk_top_down("/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
assert!(data.is_empty());
state_store.put("a/1", b"v1".to_vec()).await.unwrap();
state_store.put("a/2", b"v2".to_vec()).await.unwrap();
state_store.put("b/1", b"v3".to_vec()).await.unwrap();
let mut data: Vec<_> = state_store
.walk_top_down("/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
("b/1".to_string(), b"v3".to_vec())
],
data
);
let mut data: Vec<_> = state_store
.walk_top_down("a/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
],
data
);
state_store
.delete(&["a/2".to_string(), "b/1".to_string()])
.await
.unwrap();
let mut data: Vec<_> = state_store
.walk_top_down("a/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data);
}
}