diff --git a/Cargo.lock b/Cargo.lock index f2fcbd6f9a..77a91da067 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1517,6 +1517,21 @@ dependencies = [ "table", ] +[[package]] +name = "common-procedure" +version = "0.1.0" +dependencies = [ + "async-trait", + "common-error", + "common-runtime", + "common-telemetry", + "serde", + "serde_json", + "snafu", + "tokio", + "uuid", +] + [[package]] name = "common-query" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d97faffda6..8eb6855006 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "src/common/function-macro", "src/common/grpc", "src/common/grpc-expr", + "src/common/procedure", "src/common/query", "src/common/recordbatch", "src/common/runtime", @@ -67,6 +68,7 @@ snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.28" tokio = { version = "1.24.2", features = ["full"] } tonic = "0.8" +uuid = { version = "1", features = ["serde", "v4", "fast-rng"] } [profile.release] debug = true diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml index e29203c07d..e7c6abfc1a 100644 --- a/src/common/error/Cargo.toml +++ b/src/common/error/Cargo.toml @@ -5,5 +5,5 @@ edition.workspace = true license.workspace = true [dependencies] -strum = { version = "0.24", features = ["std", "derive"] } snafu = { version = "0.7", features = ["backtraces"] } +strum = { version = "0.24", features = ["std", "derive"] } diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml new file mode 100644 index 0000000000..106e81d940 --- /dev/null +++ b/src/common/procedure/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "common-procedure" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +common-error = { path = "../error" } +common-runtime = { path = "../runtime" } +common-telemetry = { path = "../telemetry" } +serde.workspace = true +serde_json = "1.0" +snafu.workspace = true +tokio.workspace = true +uuid.workspace = true diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs new file mode 100644 index 0000000000..4fbfec5f51 --- /dev/null +++ b/src/common/procedure/src/error.rs @@ -0,0 +1,79 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::prelude::*; + +use crate::procedure::ProcedureId; + +/// Procedure error. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display( + "Failed to execute procedure due to external error, source: {}", + source + ))] + External { + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Loader {} is already registered", name))] + LoaderConflict { name: String, backtrace: Backtrace }, + + #[snafu(display("Failed to serialize to json, source: {}", source))] + ToJson { + source: serde_json::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Procedure {} already exists", procedure_id))] + DuplicateProcedure { + procedure_id: ProcedureId, + backtrace: Backtrace, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::External { source } => source.status_code(), + Error::ToJson { .. } => StatusCode::Internal, + Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { + StatusCode::InvalidArguments + } + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl Error { + /// Creates a new [Error::External] error from source `err`. + pub fn external(err: E) -> Error { + Error::External { + source: BoxedError::new(err), + } + } +} diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs new file mode 100644 index 0000000000..6ff0888759 --- /dev/null +++ b/src/common/procedure/src/lib.rs @@ -0,0 +1,24 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Common traits and structures for the procedure framework. + +pub mod error; +mod procedure; + +pub use crate::error::{Error, Result}; +pub use crate::procedure::{ + BoxedProcedure, Context, LockKey, Procedure, ProcedureId, ProcedureManager, + ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, +}; diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs new file mode 100644 index 0000000000..150a67201b --- /dev/null +++ b/src/common/procedure/src/procedure.rs @@ -0,0 +1,256 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::str::FromStr; +use std::sync::Arc; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; +use uuid::Uuid; + +use crate::error::Result; + +/// Procedure execution status. +pub enum Status { + /// The procedure is still executing. + Executing { + /// Whether the framework needs to persist the procedure. + persist: bool, + }, + /// The procedure has suspended itself and is waiting for subprocedures. + Suspended { + subprocedures: Vec, + /// Whether the framework needs to persist the procedure. + persist: bool, + }, + /// the procedure is done. + Done, +} + +impl Status { + /// Returns a [Status::Executing] with given `persist` flag. + pub fn executing(persist: bool) -> Status { + Status::Executing { persist } + } + + /// Returns `true` if the procedure needs the framework to persist its intermediate state. + pub fn need_persist(&self) -> bool { + // If the procedure is done, the framework doesn't need to persist the procedure + // anymore. It only needs to mark the procedure as committed. + match self { + Status::Executing { persist } | Status::Suspended { persist, .. } => *persist, + Status::Done => false, + } + } +} + +/// Procedure execution context. +#[derive(Debug)] +pub struct Context { + /// Id of the procedure. + pub procedure_id: ProcedureId, +} + +/// A `Procedure` represents an operation or a set of operations to be performed step-by-step. +#[async_trait] +pub trait Procedure: Send + Sync { + /// Type name of the procedure. + fn type_name(&self) -> &str; + + /// Execute the procedure. + /// + /// The implementation must be idempotent. + async fn execute(&mut self, ctx: &Context) -> Result; + + /// Dump the state of the procedure to a string. + fn dump(&self) -> Result; + + /// Returns the [LockKey] if this procedure needs to acquire lock. + fn lock_key(&self) -> Option; +} + +/// A key to identify the lock. +// We might hold multiple keys in this struct. When there are multiple keys, we need to sort the +// keys lock all the keys in order to avoid dead lock. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LockKey(String); + +impl LockKey { + /// Returns a new [LockKey]. + pub fn new(key: impl Into) -> LockKey { + LockKey(key.into()) + } + + /// Returns the lock key. + pub fn key(&self) -> &str { + &self.0 + } +} + +/// Boxed [Procedure]. +pub type BoxedProcedure = Box; + +/// A procedure with specific id. +pub struct ProcedureWithId { + /// Id of the procedure. + pub id: ProcedureId, + pub procedure: BoxedProcedure, +} + +impl ProcedureWithId { + /// Returns a new [ProcedureWithId] that holds specific `procedure` + /// and a random [ProcedureId]. + pub fn with_random_id(procedure: BoxedProcedure) -> ProcedureWithId { + ProcedureWithId { + id: ProcedureId::random(), + procedure, + } + } +} + +#[derive(Debug, Snafu)] +pub struct ParseIdError { + source: uuid::Error, +} + +/// Unique id for [Procedure]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ProcedureId(Uuid); + +impl ProcedureId { + /// Returns a new unique [ProcedureId] randomly. + pub fn random() -> ProcedureId { + ProcedureId(Uuid::new_v4()) + } + + /// Parses id from string. + pub fn parse_str(input: &str) -> std::result::Result { + Uuid::parse_str(input) + .map(ProcedureId) + .context(ParseIdSnafu) + } +} + +impl fmt::Display for ProcedureId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl FromStr for ProcedureId { + type Err = ParseIdError; + + fn from_str(s: &str) -> std::result::Result { + ProcedureId::parse_str(s) + } +} + +/// Loader to recover the [Procedure] instance from serialized data. +pub type BoxedProcedureLoader = Box Result + Send>; + +// TODO(yingwen): Find a way to return the error message if the procedure is failed. +/// State of a submitted procedure. +#[derive(Debug)] +pub enum ProcedureState { + /// The procedure is running. + Running, + /// The procedure is finished. + Done, + /// The procedure is failed and cannot proceed anymore. + Failed, +} + +// TODO(yingwen): Shutdown +/// `ProcedureManager` executes [Procedure] submitted to it. +#[async_trait] +pub trait ProcedureManager: Send + Sync + 'static { + /// Registers loader for specific procedure type `name`. + fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>; + + /// Submits a procedure to execute. + async fn submit(&self, procedure: ProcedureWithId) -> Result<()>; + + /// Recovers unfinished procedures and reruns them. + /// + /// Callers should ensure all loaders are registered. + async fn recover(&self) -> Result<()>; + + /// Query the procedure state. + /// + /// Returns `Ok(None)` if the procedure doesn't exist. + async fn procedure_state(&self, procedure_id: ProcedureId) -> Result>; +} + +/// Ref-counted pointer to the [ProcedureManager]. +pub type ProcedureManagerRef = Arc; + +/// Serialized data of a procedure. +#[derive(Debug, Serialize, Deserialize)] +struct ProcedureMessage { + /// Type name of the procedure. The procedure framework also use the type name to + /// find a loader to load the procedure. + type_name: String, + /// The data of the procedure. + data: String, + /// Parent procedure id. + parent_id: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_status() { + let status = Status::Executing { persist: false }; + assert!(!status.need_persist()); + + let status = Status::Executing { persist: true }; + assert!(status.need_persist()); + + let status = Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }; + assert!(!status.need_persist()); + + let status = Status::Suspended { + subprocedures: Vec::new(), + persist: true, + }; + assert!(status.need_persist()); + + let status = Status::Done; + assert!(!status.need_persist()); + } + + #[test] + fn test_lock_key() { + let entity = "catalog.schema.my_table"; + let key = LockKey::new(entity); + assert_eq!(entity, key.key()); + } + + #[test] + fn test_procedure_id() { + let id = ProcedureId::random(); + let uuid_str = id.to_string(); + assert_eq!(id.0.to_string(), uuid_str); + + let parsed = ProcedureId::parse_str(&uuid_str).unwrap(); + assert_eq!(id, parsed); + } +} diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index bf03f7e501..37d78ea391 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -16,4 +16,4 @@ tokio.workspace = true anyhow = "1.0" common-telemetry = { path = "../common/telemetry" } tempdir = "0.3" -uuid = { version = "1", features = ["serde", "v4"] } +uuid.workspace = true diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 6cc5cb10c9..bdda0431a9 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -35,7 +35,7 @@ store-api = { path = "../store-api" } table = { path = "../table" } tokio.workspace = true tonic.workspace = true -uuid = { version = "1.1", features = ["v4"] } +uuid.workspace = true [dev-dependencies] atomic_float = "0.1" diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 5ce976e489..3e10363ad4 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -32,7 +32,7 @@ sql = { path = "../src/sql" } table = { path = "../src/table" } tempdir = "0.3" tokio.workspace = true -uuid = { version = "1", features = ["serde", "v4"] } +uuid.workspace = true [dev-dependencies] paste.workspace = true