diff --git a/Cargo.lock b/Cargo.lock index 21eaf6c62e..2b9b8f6384 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1518,9 +1518,12 @@ dependencies = [ "common-error", "common-runtime", "common-telemetry", + "futures", + "object-store", "serde", "serde_json", "snafu", + "tempdir", "tokio", "uuid", ] diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index 106e81d940..23f40ce307 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -9,8 +9,13 @@ async-trait.workspace = true common-error = { path = "../error" } common-runtime = { path = "../runtime" } common-telemetry = { path = "../telemetry" } +futures.workspace = true +object-store = { path = "../../object-store" } serde.workspace = true serde_json = "1.0" snafu.workspace = true tokio.workspace = true uuid.workspace = true + +[dev-dependencies] +tempdir = "0.3" diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 4fbfec5f51..0bea4becec 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -45,6 +45,30 @@ pub enum Error { procedure_id: ProcedureId, backtrace: Backtrace, }, + + #[snafu(display("Failed to put {}, source: {}", key, source))] + PutState { + key: String, + source: object_store::Error, + }, + + #[snafu(display("Failed to delete {}, source: {}", key, source))] + DeleteState { + key: String, + source: object_store::Error, + }, + + #[snafu(display("Failed to list {}, source: {}", path, source))] + ListState { + path: String, + source: object_store::Error, + }, + + #[snafu(display("Failed to read {}, source: {}", key, source))] + ReadState { + key: String, + source: object_store::Error, + }, } pub type Result = std::result::Result; @@ -53,7 +77,11 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::External { source } => source.status_code(), - Error::ToJson { .. } => StatusCode::Internal, + Error::ToJson { .. } + | Error::PutState { .. } + | Error::DeleteState { .. } + | Error::ListState { .. } + | Error::ReadState { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 6ff0888759..a8d1628bb0 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -16,6 +16,9 @@ pub mod error; mod procedure; +// TODO(yingwen): Remove this attribute once ProcedureManager is implemented. +#[allow(dead_code)] +mod store; pub use crate::error::{Error, Result}; pub use crate::procedure::{ diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 150a67201b..0c9de92142 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -197,18 +197,6 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Ref-counted pointer to the [ProcedureManager]. pub type ProcedureManagerRef = Arc; -/// Serialized data of a procedure. -#[derive(Debug, Serialize, Deserialize)] -struct ProcedureMessage { - /// Type name of the procedure. The procedure framework also use the type name to - /// find a loader to load the procedure. - type_name: String, - /// The data of the procedure. - data: String, - /// Parent procedure id. - parent_id: Option, -} - #[cfg(test)] mod tests { use super::*; @@ -252,5 +240,17 @@ mod tests { let parsed = ProcedureId::parse_str(&uuid_str).unwrap(); assert_eq!(id, parsed); + let parsed = uuid_str.parse().unwrap(); + assert_eq!(id, parsed); + } + + #[test] + fn test_procedure_id_serialization() { + let id = ProcedureId::random(); + let json = serde_json::to_string(&id).unwrap(); + assert_eq!(format!("\"{id}\""), json); + + let parsed = serde_json::from_str(&json).unwrap(); + assert_eq!(id, parsed); } } diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs new file mode 100644 index 0000000000..0f306d9195 --- /dev/null +++ b/src/common/procedure/src/store.rs @@ -0,0 +1,399 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt; + +use common_telemetry::logging; +use futures::TryStreamExt; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{Result, ToJsonSnafu}; +use crate::store::state_store::StateStoreRef; +use crate::{BoxedProcedure, ProcedureId}; + +mod state_store; + +/// Serialized data of a procedure. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +struct ProcedureMessage { + /// Type name of the procedure. The procedure framework also use the type name to + /// find a loader to load the procedure. + type_name: String, + /// The data of the procedure. + data: String, + /// Parent procedure id. + parent_id: Option, +} + +/// Procedure storage layer. +#[derive(Clone)] +struct ProcedureStore(StateStoreRef); + +impl ProcedureStore { + /// Dump the `procedure` to the storage. + async fn store_procedure( + &self, + procedure_id: ProcedureId, + step: u32, + procedure: &BoxedProcedure, + parent_id: Option, + ) -> Result<()> { + let type_name = procedure.type_name(); + let data = procedure.dump()?; + + let message = ProcedureMessage { + type_name: type_name.to_string(), + data, + parent_id, + }; + let key = ParsedKey { + procedure_id, + step, + is_committed: false, + } + .to_string(); + let value = serde_json::to_string(&message).context(ToJsonSnafu)?; + + self.0.put(&key, value.into_bytes()).await?; + + Ok(()) + } + + /// Write commit flag to the storage. + async fn commit_procedure(&self, procedure_id: ProcedureId, step: u32) -> Result<()> { + let key = ParsedKey { + procedure_id, + step, + is_committed: true, + } + .to_string(); + self.0.put(&key, Vec::new()).await?; + + Ok(()) + } + + /// Load uncommitted procedures from the storage. + async fn load_messages(&self) -> Result> { + let mut messages = HashMap::new(); + // Track the key-value pair by procedure id. + let mut procedure_key_values: HashMap<_, (ParsedKey, Vec)> = HashMap::new(); + + // Scan all procedures. + let mut key_values = self.0.walk_top_down("/").await?; + while let Some((key, value)) = key_values.try_next().await? { + let Some(curr_key) = ParsedKey::parse_str(&key) else { + logging::warn!("Unknown key while loading procedures, key: {}", key); + continue; + }; + + if let Some(entry) = procedure_key_values.get_mut(&curr_key.procedure_id) { + if entry.0.step < curr_key.step { + entry.0 = curr_key; + entry.1 = value; + } + } else { + procedure_key_values.insert(curr_key.procedure_id, (curr_key, value)); + } + } + + for (procedure_id, (parsed_key, value)) in procedure_key_values { + if !parsed_key.is_committed { + let Some(message) = self.load_one_message(&parsed_key, &value) else { + // We don't abort the loading process and just ignore errors to ensure all remaining + // procedures are loaded. + continue; + }; + messages.insert(procedure_id, message); + } + } + + Ok(messages) + } + + fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option { + serde_json::from_slice(value) + .map_err(|e| { + // `e` doesn't impl ErrorExt so we print it as normal error. + logging::error!("Failed to parse value, key: {:?}, source: {}", key, e); + e + }) + .ok() + } +} + +/// Key to refer the procedure in the [ProcedureStore]. +#[derive(Debug, PartialEq, Eq)] +struct ParsedKey { + procedure_id: ProcedureId, + step: u32, + is_committed: bool, +} + +impl fmt::Display for ParsedKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}/{:010}.{}", + self.procedure_id, + self.step, + if self.is_committed { "commit" } else { "step" } + ) + } +} + +impl ParsedKey { + /// Try to parse the key from specific `input`. + fn parse_str(input: &str) -> Option { + let mut iter = input.rsplit('/'); + let name = iter.next()?; + let id_str = iter.next()?; + + let procedure_id = ProcedureId::parse_str(id_str).ok()?; + + let mut parts = name.split('.'); + let step_str = parts.next()?; + let suffix = parts.next()?; + let is_committed = match suffix { + "commit" => true, + "step" => false, + _ => return None, + }; + let step = step_str.parse().ok()?; + + Some(ParsedKey { + procedure_id, + step, + is_committed, + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use async_trait::async_trait; + use object_store::services::fs::Builder; + use object_store::ObjectStore; + use tempdir::TempDir; + + use super::*; + use crate::store::state_store::ObjectStateStore; + use crate::{Context, LockKey, Procedure, Status}; + + #[test] + fn test_parsed_key() { + let procedure_id = ProcedureId::random(); + let key = ParsedKey { + procedure_id, + step: 2, + is_committed: false, + }; + assert_eq!(format!("{procedure_id}/0000000002.step"), key.to_string()); + assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); + + let key = ParsedKey { + procedure_id, + step: 2, + is_committed: true, + }; + assert_eq!(format!("{procedure_id}/0000000002.commit"), key.to_string()); + assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); + } + + #[test] + fn test_parse_invalid_key() { + assert!(ParsedKey::parse_str("").is_none()); + + let procedure_id = ProcedureId::random(); + let input = format!("{procedure_id}"); + assert!(ParsedKey::parse_str(&input).is_none()); + + let input = format!("{procedure_id}/"); + assert!(ParsedKey::parse_str(&input).is_none()); + + let input = format!("{procedure_id}/0000000003"); + assert!(ParsedKey::parse_str(&input).is_none()); + + let input = format!("{procedure_id}/0000000003."); + assert!(ParsedKey::parse_str(&input).is_none()); + + let input = format!("{procedure_id}/0000000003.other"); + assert!(ParsedKey::parse_str(&input).is_none()); + + assert!(ParsedKey::parse_str("12345/0000000003.step").is_none()); + + let input = format!("{procedure_id}-0000000003.commit"); + assert!(ParsedKey::parse_str(&input).is_none()); + } + + #[test] + fn test_procedure_message() { + let mut message = ProcedureMessage { + type_name: "TestMessage".to_string(), + data: "no parent id".to_string(), + parent_id: None, + }; + + let json = serde_json::to_string(&message).unwrap(); + assert_eq!( + json, + r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null}"# + ); + + let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap(); + message.parent_id = Some(procedure_id); + let json = serde_json::to_string(&message).unwrap(); + assert_eq!( + json, + r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1"}"# + ); + } + + struct MockProcedure { + data: String, + } + + impl MockProcedure { + fn new(data: impl Into) -> MockProcedure { + MockProcedure { data: data.into() } + } + } + + #[async_trait] + impl Procedure for MockProcedure { + fn type_name(&self) -> &str { + "MockProcedure" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + unimplemented!() + } + + fn dump(&self) -> Result { + Ok(self.data.clone()) + } + + fn lock_key(&self) -> Option { + None + } + } + + fn new_procedure_store(dir: &TempDir) -> ProcedureStore { + let store_dir = dir.path().to_str().unwrap(); + let accessor = Builder::default().root(store_dir).build().unwrap(); + let object_store = ObjectStore::new(accessor); + let state_store = ObjectStateStore::new(object_store); + + ProcedureStore(Arc::new(state_store)) + } + + #[tokio::test] + async fn test_store_procedure() { + let dir = TempDir::new("store_procedure").unwrap(); + let store = new_procedure_store(&dir); + + let procedure_id = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); + + store + .store_procedure(procedure_id, 0, &procedure, None) + .await + .unwrap(); + + let messages = store.load_messages().await.unwrap(); + assert_eq!(1, messages.len()); + let msg = messages.get(&procedure_id).unwrap(); + let expect = ProcedureMessage { + type_name: "MockProcedure".to_string(), + data: "test store procedure".to_string(), + parent_id: None, + }; + assert_eq!(expect, *msg); + } + + #[tokio::test] + async fn test_commit_procedure() { + let dir = TempDir::new("store_procedure").unwrap(); + let store = new_procedure_store(&dir); + + let procedure_id = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); + + store + .store_procedure(procedure_id, 0, &procedure, None) + .await + .unwrap(); + store.commit_procedure(procedure_id, 1).await.unwrap(); + + let messages = store.load_messages().await.unwrap(); + assert!(messages.is_empty()); + } + + #[tokio::test] + async fn test_load_messages() { + let dir = TempDir::new("store_procedure").unwrap(); + let store = new_procedure_store(&dir); + + // store 3 steps + let id0 = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-0")); + store + .store_procedure(id0, 0, &procedure, None) + .await + .unwrap(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-1")); + store + .store_procedure(id0, 1, &procedure, None) + .await + .unwrap(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-2")); + store + .store_procedure(id0, 2, &procedure, None) + .await + .unwrap(); + + // store 2 steps and then commit + let id1 = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-0")); + store + .store_procedure(id1, 0, &procedure, None) + .await + .unwrap(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-1")); + store + .store_procedure(id1, 1, &procedure, None) + .await + .unwrap(); + store.commit_procedure(id1, 2).await.unwrap(); + + // store 1 step + let id2 = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("id2-0")); + store + .store_procedure(id2, 0, &procedure, None) + .await + .unwrap(); + + let messages = store.load_messages().await.unwrap(); + assert_eq!(2, messages.len()); + + let msg = messages.get(&id0).unwrap(); + assert_eq!("id0-2", msg.data); + let msg = messages.get(&id2).unwrap(); + assert_eq!("id2-0", msg.data); + } +} diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs new file mode 100644 index 0000000000..052d26a45a --- /dev/null +++ b/src/common/procedure/src/store/state_store.rs @@ -0,0 +1,188 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{Stream, TryStreamExt}; +use object_store::{ObjectMode, ObjectStore}; +use snafu::ResultExt; + +use crate::error::{ + DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, ReadStateSnafu, Result, +}; + +/// Key value from state store. +type KeyValue = (String, Vec); + +/// Stream that yields [KeyValue]. +type KeyValueStream = Pin> + Send>>; + +/// Storage layer for persisting procedure's state. +#[async_trait] +pub(crate) trait StateStore: Send + Sync { + /// Puts `key` and `value` into the store. + async fn put(&self, key: &str, value: Vec) -> Result<()>; + + /// Returns the key-value pairs under `path` in top down way. + /// + /// # Note + /// - There is no guarantee about the order of the keys in the stream. + /// - The `path` must ends with `/`. + async fn walk_top_down(&self, path: &str) -> Result; + + /// Deletes key-value pairs by `keys`. + async fn delete(&self, keys: &[String]) -> Result<()>; +} + +/// Reference counted pointer to [StateStore]. +pub(crate) type StateStoreRef = Arc; + +/// [StateStore] based on [ObjectStore]. +#[derive(Debug)] +pub(crate) struct ObjectStateStore { + store: ObjectStore, +} + +impl ObjectStateStore { + /// Returns a new [ObjectStateStore] with specific `store`. + pub(crate) fn new(store: ObjectStore) -> ObjectStateStore { + ObjectStateStore { store } + } +} + +#[async_trait] +impl StateStore for ObjectStateStore { + async fn put(&self, key: &str, value: Vec) -> Result<()> { + let object = self.store.object(key); + object.write(value).await.context(PutStateSnafu { key }) + } + + async fn walk_top_down(&self, path: &str) -> Result { + let path_string = path.to_string(); + let op = self.store.batch(); + // Note that there is no guarantee about the order between files and dirs + // at the same level. + // See https://docs.rs/opendal/0.25.2/opendal/raw/struct.TopDownWalker.html#note + let stream = op + .walk_top_down(path) + .context(ListStateSnafu { path })? + .map_err(move |e| Error::ListState { + path: path_string.clone(), + source: e, + }) + .try_filter_map(|entry| async move { + let key = entry.path(); + let key_value = match entry.mode().await.context(ReadStateSnafu { key })? { + ObjectMode::FILE => { + let value = entry.read().await.context(ReadStateSnafu { key })?; + + Some((key.to_string(), value)) + } + ObjectMode::DIR | ObjectMode::Unknown => None, + }; + + Ok(key_value) + }); + + Ok(Box::pin(stream)) + } + + async fn delete(&self, keys: &[String]) -> Result<()> { + for key in keys { + let object = self.store.object(key); + object.delete().await.context(DeleteStateSnafu { key })?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use object_store::services::fs::Builder; + use tempdir::TempDir; + + use super::*; + + #[tokio::test] + async fn test_object_state_store() { + let dir = TempDir::new("state_store").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let accessor = Builder::default().root(store_dir).build().unwrap(); + let object_store = ObjectStore::new(accessor); + let state_store = ObjectStateStore::new(object_store); + + let data: Vec<_> = state_store + .walk_top_down("/") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + assert!(data.is_empty()); + + state_store.put("a/1", b"v1".to_vec()).await.unwrap(); + state_store.put("a/2", b"v2".to_vec()).await.unwrap(); + state_store.put("b/1", b"v3".to_vec()).await.unwrap(); + + let mut data: Vec<_> = state_store + .walk_top_down("/") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + assert_eq!( + vec![ + ("a/1".to_string(), b"v1".to_vec()), + ("a/2".to_string(), b"v2".to_vec()), + ("b/1".to_string(), b"v3".to_vec()) + ], + data + ); + + let mut data: Vec<_> = state_store + .walk_top_down("a/") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + assert_eq!( + vec![ + ("a/1".to_string(), b"v1".to_vec()), + ("a/2".to_string(), b"v2".to_vec()), + ], + data + ); + + state_store + .delete(&["a/2".to_string(), "b/1".to_string()]) + .await + .unwrap(); + let mut data: Vec<_> = state_store + .walk_top_down("a/") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data); + } +}