feat: improve observability for procedure (#4675)

* feat: improve observability for procedure

* fix: test error

* test: add sqlness test for information_schema.procedure_info

* fix: sqlness test error

* fix: cr comment

* chore: update proto version

* fix: apply cr comment

* update version

* fix: cr comment

* optimize procedure type output format

* upgrade dep version

* fix: clippy error

* fix: `procedure` borrowed error

* fix: optimize code
This commit is contained in:
taobo
2024-09-20 14:07:53 +08:00
committed by GitHub
parent 75c6fad1a3
commit 0c9b8eb0d2
33 changed files with 662 additions and 20 deletions

4
Cargo.lock generated
View File

@@ -1379,6 +1379,7 @@ dependencies = [
"common-error",
"common-macro",
"common-meta",
"common-procedure",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -2179,6 +2180,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",
"futures",
"futures-util",
"humantime-serde",
@@ -4394,7 +4396,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=973f49cde88a582fb65755cc572ebcf6fb93ccf7#973f49cde88a582fb65755cc572ebcf6fb93ccf7"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=796ce9b003c6689e853825f649e03543c81ede99#796ce9b003c6689e853825f649e03543c81ede99"
dependencies = [
"prost 0.12.6",
"serde",

View File

@@ -120,7 +120,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "973f49cde88a582fb65755cc572ebcf6fb93ccf7" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "796ce9b003c6689e853825f649e03543c81ede99" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -22,6 +22,7 @@ common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true

View File

@@ -82,6 +82,33 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to get procedure client in {mode} mode"))]
GetProcedureClient {
mode: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to list procedures"))]
ListProcedures {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Procedure id not found"))]
ProcedureIdNotFound {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("convert proto data error"))]
ConvertProtoData {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to re-compile script due to internal error"))]
CompileScriptInternal {
#[snafu(implicit)]
@@ -266,7 +293,9 @@ impl ErrorExt for Error {
| Error::FindRegionRoutes { .. }
| Error::CacheNotFound { .. }
| Error::CastManager { .. }
| Error::Json { .. } => StatusCode::Unexpected,
| Error::Json { .. }
| Error::GetProcedureClient { .. }
| Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected,
Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
@@ -283,7 +312,9 @@ impl ErrorExt for Error {
| Error::ListNodes { source, .. }
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. }
| Error::ListFlows { source, .. } => source.status_code(),
| Error::ListFlows { source, .. }
| Error::ListProcedures { source, .. }
| Error::ConvertProtoData { source, .. } => source.status_code(),
Error::CreateTable { source, .. } => source.status_code(),

View File

@@ -31,6 +31,7 @@ use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use meta_client::client::MetaClient;
@@ -69,6 +70,7 @@ pub struct KvBackendCatalogManager {
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
}
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
@@ -79,6 +81,7 @@ impl KvBackendCatalogManager {
meta_client: Option<Arc<MetaClient>>,
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
mode,
@@ -106,6 +109,7 @@ impl KvBackendCatalogManager {
backend,
},
cache_registry,
procedure_manager,
})
}
@@ -132,6 +136,10 @@ impl KvBackendCatalogManager {
pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
/// Returns a new handle to the procedure manager stored in this catalog
/// manager, or `None` when it holds none.
pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
    self.procedure_manager.as_ref().cloned()
}
}
#[async_trait::async_trait]

View File

@@ -18,6 +18,7 @@ pub mod flows;
mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod region_peers;
mod runtime_metrics;
pub mod schemata;
@@ -188,6 +189,11 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_name.clone(),
self.flow_metadata_manager.clone(),
)) as _),
PROCEDURE_INFO => Some(
Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
self.catalog_manager.clone(),
)) as _,
),
_ => None,
}
}
@@ -250,7 +256,10 @@ impl InformationSchemaProvider {
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
tables.insert(
PROCEDURE_INFO.to_string(),
self.build_table(PROCEDURE_INFO).unwrap(),
);
// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).expect(name));

View File

@@ -0,0 +1,310 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Weak};
use api::v1::meta::{ProcedureMeta, ProcedureStatus};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID;
use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, TimestampMillisecondVectorBuilder};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use super::PROCEDURE_INFO;
use crate::error::{
ConvertProtoDataSnafu, CreateRecordBatchSnafu, GetProcedureClientSnafu, InternalSnafu,
ListProceduresSnafu, ProcedureIdNotFoundSnafu, Result,
};
use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::system_schema::utils;
use crate::CatalogManager;
// Column names of the `procedure_info` table, in schema order.
const PROCEDURE_ID: &str = "procedure_id";
const PROCEDURE_TYPE: &str = "procedure_type";
const START_TIME: &str = "start_time";
const END_TIME: &str = "end_time";
const STATUS: &str = "status";
const LOCK_KEYS: &str = "lock_keys";
/// Initial capacity reserved in every column builder.
const INIT_CAPACITY: usize = 42;
/// The `PROCEDURE_INFO` table provides information about the procedures of the cluster.
///
/// - `procedure_id`: the unique identifier of the procedure.
/// - `procedure_type`: the type name of the procedure.
/// - `start_time`: the starting execution time of the procedure.
/// - `end_time`: the ending execution time of the procedure.
/// - `status`: the status of the procedure.
/// - `lock_keys`: the lock keys of the procedure, joined with `,`.
///
pub(super) struct InformationSchemaProcedureInfo {
    /// Schema of the table; also handed to every builder created for a scan.
    schema: SchemaRef,
    /// Weak handle to the catalog manager the procedure source is looked up from.
    catalog_manager: Weak<dyn CatalogManager>,
}
impl InformationSchemaProcedureInfo {
    /// Creates the table from the fixed schema and the given catalog manager handle.
    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
        let schema = Self::schema();
        Self {
            schema,
            catalog_manager,
        }
    }

    /// Builds the six-column schema of `information_schema.procedure_info`.
    ///
    /// The id, type and status columns pass `false` as the last
    /// `ColumnSchema::new` argument, the two timestamps and the lock keys pass
    /// `true` (presumably the nullable flag).
    pub(crate) fn schema() -> SchemaRef {
        let string = ConcreteDataType::string_datatype;
        let timestamp = ConcreteDataType::timestamp_millisecond_datatype;

        let columns = vec![
            ColumnSchema::new(PROCEDURE_ID, string(), false),
            ColumnSchema::new(PROCEDURE_TYPE, string(), false),
            ColumnSchema::new(START_TIME, timestamp(), true),
            ColumnSchema::new(END_TIME, timestamp(), true),
            ColumnSchema::new(STATUS, string(), false),
            ColumnSchema::new(LOCK_KEYS, string(), true),
        ];
        Arc::new(Schema::new(columns))
    }

    /// Creates a fresh, empty row builder sharing this table's schema and
    /// catalog manager handle.
    fn builder(&self) -> InformationSchemaProcedureInfoBuilder {
        let schema = self.schema.clone();
        let catalog_manager = self.catalog_manager.clone();
        InformationSchemaProcedureInfoBuilder::new(schema, catalog_manager)
    }
}
impl InformationTable for InformationSchemaProcedureInfo {
    /// Returns the fixed table id reserved for `procedure_info`.
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID
    }

    /// Returns the table name, `procedure_info`.
    fn table_name(&self) -> &'static str {
        PROCEDURE_INFO
    }

    /// Returns a handle to the table schema.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Produces a stream that yields the whole table as one record batch,
    /// passing `request` on to the builder so its predicates can filter rows.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let arrow_schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();

        // The batch is only computed when the stream is first polled.
        let single_batch = futures::stream::once(async move {
            builder
                .make_procedure_info(Some(request))
                .await
                .map(|batch| batch.into_df_record_batch())
                .map_err(Into::into)
        });
        let df_stream = Box::pin(DfRecordBatchStreamAdapter::new(arrow_schema, single_batch));

        let adapter = RecordBatchStreamAdapter::try_new(df_stream)
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;
        Ok(Box::pin(adapter))
    }
}
/// Accumulates the rows of `information_schema.procedure_info` column by column
/// and turns them into a single record batch.
struct InformationSchemaProcedureInfoBuilder {
    /// Schema the finished record batch is created with.
    schema: SchemaRef,
    /// Weak handle to the catalog manager used to find the running mode and
    /// the procedure source.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Values of the `procedure_id` column.
    procedure_ids: StringVectorBuilder,
    /// Values of the `procedure_type` column.
    procedure_types: StringVectorBuilder,
    /// Values of the `start_time` column.
    start_times: TimestampMillisecondVectorBuilder,
    /// Values of the `end_time` column.
    end_times: TimestampMillisecondVectorBuilder,
    /// Values of the `status` column.
    statuses: StringVectorBuilder,
    /// Values of the `lock_keys` column.
    lock_keys: StringVectorBuilder,
}
impl InformationSchemaProcedureInfoBuilder {
/// Creates an empty builder whose six column builders each reserve
/// `INIT_CAPACITY` slots.
fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
    let string_column = || StringVectorBuilder::with_capacity(INIT_CAPACITY);
    let timestamp_column = || TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY);

    Self {
        schema,
        catalog_manager,
        procedure_ids: string_column(),
        procedure_types: string_column(),
        start_times: timestamp_column(),
        end_times: timestamp_column(),
        statuses: string_column(),
        lock_keys: string_column(),
    }
}
/// Construct the `information_schema.procedure_info` virtual table.
///
/// In standalone mode the procedures come from the local procedure manager;
/// in distributed mode they are fetched through the meta client. A missing
/// source yields a `GetProcedureClient` error naming the mode. Rows rejected
/// by the predicates of `request` are left out.
async fn make_procedure_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
    let predicates = Predicates::from_scan_request(&request);
    // No reported mode is treated as standalone.
    let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone);

    match mode {
        Mode::Standalone => {
            let procedure_manager = match utils::procedure_manager(&self.catalog_manager)? {
                Some(manager) => manager,
                None => return GetProcedureClientSnafu { mode: "standalone" }.fail(),
            };
            let procedures = procedure_manager
                .list_procedures()
                .await
                .map_err(BoxedError::new)
                .context(ListProceduresSnafu)?;
            for procedure in procedures {
                // Render the status first: `procedure` is moved into the call below.
                let status = procedure.state.as_str_name().to_string();
                self.add_procedure(&predicates, status, procedure);
            }
        }
        Mode::Distributed => {
            let meta_client = match utils::meta_client(&self.catalog_manager)? {
                Some(client) => client,
                None => {
                    return GetProcedureClientSnafu {
                        mode: "distributed",
                    }
                    .fail()
                }
            };
            let response = meta_client
                .list_procedures(&ExecutorContext::default())
                .await
                .map_err(BoxedError::new)
                .context(ListProceduresSnafu)?;
            for procedure in response.procedures {
                self.add_procedure_info(&predicates, procedure)?;
            }
        }
    }

    self.finish()
}
/// Appends one row built from `procedure_info` and `status`, unless the
/// predicates reject it.
///
/// The `state` field of `procedure_info` is not read here; the displayed
/// status is the separate `status` argument. Lock keys are joined with `,`.
fn add_procedure(
    &mut self,
    predicates: &Predicates,
    status: String,
    procedure_info: ProcedureInfo,
) {
    let pid = procedure_info.id.to_string();
    let type_name = procedure_info.type_name;
    let start_time =
        TimestampMillisecond(Timestamp::new_millisecond(procedure_info.start_time_ms));
    let end_time = TimestampMillisecond(Timestamp::new_millisecond(procedure_info.end_time_ms));
    let lock_keys = procedure_info.lock_keys.join(",");

    // The predicates are evaluated against `Value`s, so each cell gets its own copy.
    let pid_value = Value::from(pid.clone());
    let type_value = Value::from(type_name.clone());
    let start_value = Value::from(start_time);
    let end_value = Value::from(end_time);
    let status_value = Value::from(status.clone());
    let lock_keys_value = Value::from(lock_keys.clone());
    let row = [
        (PROCEDURE_ID, &pid_value),
        (PROCEDURE_TYPE, &type_value),
        (START_TIME, &start_value),
        (END_TIME, &end_value),
        (STATUS, &status_value),
        (LOCK_KEYS, &lock_keys_value),
    ];

    if predicates.eval(&row) {
        self.procedure_ids.push(Some(&pid));
        self.procedure_types.push(Some(&type_name));
        self.start_times.push(Some(start_time));
        self.end_times.push(Some(end_time));
        self.statuses.push(Some(&status));
        self.lock_keys.push(Some(&lock_keys));
    }
}
/// Converts a protobuf `ProcedureMeta` into a row and appends it through
/// `add_procedure`.
///
/// Fails with `ProcedureIdNotFound` when the message has no id and with
/// `ConvertProtoData` when the id cannot be converted. A status value that
/// does not map to a `ProcedureStatus` is shown as `"Unknown"`.
fn add_procedure_info(
    &mut self,
    predicates: &Predicates,
    procedure: ProcedureMeta,
) -> Result<()> {
    let pb_pid = procedure
        .id
        .ok_or_else(|| ProcedureIdNotFoundSnafu {}.build())?;
    let id = procedure::pb_pid_to_pid(&pb_pid)
        .map_err(BoxedError::new)
        .context(ConvertProtoDataSnafu)?;

    let status = match ProcedureStatus::try_from(procedure.status) {
        Ok(known) => known.as_str_name(),
        Err(_) => "Unknown",
    }
    .to_string();

    let procedure_info = ProcedureInfo {
        id,
        type_name: procedure.type_name,
        start_time_ms: procedure.start_time_ms,
        end_time_ms: procedure.end_time_ms,
        // Filler only: `add_procedure` ignores `state` and uses `status` instead.
        state: ProcedureState::Running,
        lock_keys: procedure.lock_keys,
    };
    self.add_procedure(predicates, status, procedure_info);
    Ok(())
}
/// Finishes every column builder and assembles the columns, in schema
/// order, into one record batch.
fn finish(&mut self) -> Result<RecordBatch> {
    let procedure_ids: VectorRef = Arc::new(self.procedure_ids.finish());
    let procedure_types: VectorRef = Arc::new(self.procedure_types.finish());
    let start_times: VectorRef = Arc::new(self.start_times.finish());
    let end_times: VectorRef = Arc::new(self.end_times.finish());
    let statuses: VectorRef = Arc::new(self.statuses.finish());
    let lock_keys: VectorRef = Arc::new(self.lock_keys.finish());

    let columns = vec![
        procedure_ids,
        procedure_types,
        start_times,
        end_times,
        statuses,
        lock_keys,
    ];
    RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
impl DfPartitionStream for InformationSchemaProcedureInfo {
    /// Returns the Arrow form of the table schema.
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    /// Produces a stream that yields the whole, unfiltered table as one
    /// record batch; no scan request is passed to the builder.
    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let arrow_schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();

        let single_batch = futures::stream::once(async move {
            builder
                .make_procedure_info(None)
                .await
                .map(|batch| batch.into_df_record_batch())
                .map_err(Into::into)
        });
        Box::pin(DfRecordBatchStreamAdapter::new(arrow_schema, single_batch))
    }
}

View File

@@ -45,3 +45,4 @@ pub const TABLE_CONSTRAINTS: &str = "table_constraints";
pub const CLUSTER_INFO: &str = "cluster_info";
pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";

View File

@@ -18,6 +18,7 @@ use std::sync::{Arc, Weak};
use common_config::Mode;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use meta_client::client::MetaClient;
use snafu::OptionExt;
@@ -68,3 +69,17 @@ pub fn table_meta_manager(
.downcast_ref::<KvBackendCatalogManager>()
.map(|manager| manager.table_metadata_manager_ref().clone()))
}
/// Try to get the `[ProcedureManagerRef]` from `[CatalogManager]` weak reference.
pub fn procedure_manager(
catalog_manager: &Weak<dyn CatalogManager>,
) -> Result<Option<ProcedureManagerRef>> {
let catalog_manager = catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
Ok(catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.and_then(|manager| manager.procedure_manager()))
}

View File

@@ -327,6 +327,7 @@ mod tests {
None,
backend.clone(),
layered_cache_registry,
None,
);
let table_metadata_manager = TableMetadataManager::new(backend);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);

View File

@@ -280,6 +280,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
Some(meta_client.clone()),
cached_meta_backend.clone(),
layered_cache_registry,
None,
);
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(

View File

@@ -274,6 +274,7 @@ impl StartCommand {
Some(meta_client.clone()),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
);
let table_metadata_manager =

View File

@@ -320,6 +320,7 @@ impl StartCommand {
Some(meta_client.clone()),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
);
let executor = HandlerGroupExecutor::new(vec![

View File

@@ -482,6 +482,7 @@ impl StartCommand {
None,
kv_backend.clone(),
layered_cache_registry.clone(),
Some(procedure_manager.clone()),
);
let table_metadata_manager =

View File

@@ -98,6 +98,8 @@ pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
/// id for information_schema.FLOWS
pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
/// id for information_schema.procedure_info
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
/// ----- End of information_schema tables -----
/// ----- Begin of pg_catalog tables -----

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::ProcedureDetailResponse;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionId, RegionNumber, TableId};
@@ -82,6 +83,8 @@ pub trait ProcedureExecutor: Send + Sync {
ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse>;
async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
}
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use api::v1::meta::ProcedureDetailResponse;
use common_procedure::{
watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
};
@@ -825,6 +826,15 @@ impl ProcedureExecutor for DdlManager {
Ok(procedure::procedure_state_to_pb_response(&state))
}
/// Lists the procedures known to the procedure manager and converts them
/// into the protobuf detail response. The executor context is unused.
async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
    self.procedure_manager
        .list_procedures()
        .await
        .context(QueryProcedureSnafu)
        .map(procedure::procedure_details_to_pb_response)
}
}
#[cfg(test)]

View File

@@ -16,10 +16,11 @@ use std::time::Duration;
pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
use api::v1::meta::{
ProcedureId as PbProcedureId, ProcedureStateResponse as PbProcedureStateResponse,
ProcedureDetailResponse as PbProcedureDetailResponse, ProcedureId as PbProcedureId,
ProcedureMeta as PbProcedureMeta, ProcedureStateResponse as PbProcedureStateResponse,
ProcedureStatus as PbProcedureStatus,
};
use common_procedure::{ProcedureId, ProcedureState};
use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
use snafu::ResultExt;
use crate::error::{ParseProcedureIdSnafu, Result};
@@ -49,9 +50,9 @@ pub fn pid_to_pb_pid(pid: ProcedureId) -> PbProcedureId {
}
}
/// Cast the common [`ProcedureState`] to pb [`ProcedureStateResponse`].
pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse {
let (status, error) = match state {
/// Cast the [`ProcedureState`] to protobuf [`PbProcedureStatus`].
pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus, String) {
match state {
ProcedureState::Running => (PbProcedureStatus::Running, String::default()),
ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()),
ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.to_string()),
@@ -62,8 +63,12 @@ pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStat
ProcedureState::RollingBack { error } => {
(PbProcedureStatus::RollingBack, error.to_string())
}
};
}
}
/// Cast the common [`ProcedureState`] to pb [`ProcedureStateResponse`].
pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse {
let (status, error) = procedure_state_to_pb_state(state);
PbProcedureStateResponse {
status: status.into(),
error,
@@ -71,6 +76,28 @@ pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStat
}
}
/// Cast a list of [`ProcedureInfo`] to the pb [`PbProcedureDetailResponse`].
///
/// The status and error text of every entry come from
/// [`procedure_state_to_pb_state`]. All response fields other than
/// `procedures` are left at their default values.
pub fn procedure_details_to_pb_response(metas: Vec<ProcedureInfo>) -> PbProcedureDetailResponse {
    let procedures = metas
        .into_iter()
        .map(|meta| {
            let (status, error) = procedure_state_to_pb_state(&meta.state);
            PbProcedureMeta {
                id: Some(pid_to_pb_pid(meta.id)),
                // `meta` is owned here, so the name is moved rather than copied.
                type_name: meta.type_name,
                status: status.into(),
                start_time_ms: meta.start_time_ms,
                end_time_ms: meta.end_time_ms,
                lock_keys: meta.lock_keys,
                error,
            }
        })
        .collect();
    PbProcedureDetailResponse {
        procedures,
        ..Default::default()
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -19,6 +19,7 @@ common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
futures.workspace = true
humantime-serde.workspace = true
object-store.workspace = true

View File

@@ -26,7 +26,7 @@ pub mod watcher;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
Procedure, ProcedureId, ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId,
Status, StringKey,
Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef, ProcedureState,
ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;

View File

@@ -16,7 +16,7 @@ mod runner;
mod rwlock;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
@@ -35,7 +35,7 @@ use crate::error::{
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState};
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo};
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
@@ -57,6 +57,8 @@ const META_TTL: Duration = Duration::from_secs(60 * 10);
pub(crate) struct ProcedureMeta {
/// Id of this procedure.
id: ProcedureId,
/// Type name of this procedure.
type_name: String,
/// Parent procedure id.
parent_id: Option<ProcedureId>,
/// Notify to wait for subprocedures.
@@ -69,6 +71,10 @@ pub(crate) struct ProcedureMeta {
state_receiver: Receiver<ProcedureState>,
/// Id of child procedures.
children: Mutex<Vec<ProcedureId>>,
/// Start execution time of this procedure.
start_time_ms: AtomicI64,
/// End execution time of this procedure.
end_time_ms: AtomicI64,
}
impl ProcedureMeta {
@@ -77,6 +83,7 @@ impl ProcedureMeta {
procedure_state: ProcedureState,
parent_id: Option<ProcedureId>,
lock_key: LockKey,
type_name: &str,
) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(procedure_state);
ProcedureMeta {
@@ -87,6 +94,9 @@ impl ProcedureMeta {
state_sender,
state_receiver,
children: Mutex::new(Vec::new()),
start_time_ms: AtomicI64::new(0),
end_time_ms: AtomicI64::new(0),
type_name: type_name.to_string(),
}
}
@@ -117,6 +127,18 @@ impl ProcedureMeta {
fn num_children(&self) -> usize {
self.children.lock().unwrap().len()
}
/// Stores the value of `common_time::util::current_time_millis()` as the
/// start time of the procedure.
fn set_start_time_ms(&self) {
    let now_ms = common_time::util::current_time_millis();
    self.start_time_ms.store(now_ms, Ordering::Relaxed);
}
/// Stores the value of `common_time::util::current_time_millis()` as the
/// end time of the procedure.
fn set_end_time_ms(&self) {
    let now_ms = common_time::util::current_time_millis();
    self.end_time_ms.store(now_ms, Ordering::Relaxed);
}
}
/// Reference counted pointer to [ProcedureMeta].
@@ -210,6 +232,22 @@ impl ManagerContext {
procedures.get(&procedure_id).map(|meta| meta.state())
}
/// Returns a [ProcedureInfo] snapshot of every procedure currently tracked.
///
/// The read lock on the procedure map is held while the snapshots are built.
fn list_procedure(&self) -> Vec<ProcedureInfo> {
    let procedures = self.procedures.read().unwrap();
    let mut infos = Vec::with_capacity(procedures.len());
    for meta in procedures.values() {
        infos.push(ProcedureInfo {
            id: meta.id,
            type_name: meta.type_name.clone(),
            start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
            end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
            state: meta.state(),
            lock_keys: meta.lock_key.get_keys(),
        });
    }
    infos
}
/// Returns the [Watcher] of specific `procedure_id`.
fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
let procedures = self.procedures.read().unwrap();
@@ -438,6 +476,7 @@ impl LocalManager {
procedure_state,
None,
procedure.lock_key(),
procedure.type_name(),
));
let runner = Runner {
meta: meta.clone(),
@@ -641,6 +680,10 @@ impl ProcedureManager for LocalManager {
fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
self.manager_ctx.watcher(procedure_id)
}
/// Returns the details of all procedures tracked by the manager context.
/// This implementation never fails.
async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
    let infos = self.manager_ctx.list_procedure();
    Ok(infos)
}
}
struct RemoveOutdatedMetaFunction {
@@ -675,6 +718,7 @@ pub(crate) mod test_util {
ProcedureState::Running,
None,
LockKey::default(),
"ProcedureAdapter",
)
}

View File

@@ -27,7 +27,9 @@ use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::{Output, StringKey};
use crate::store::{ProcedureMessage, ProcedureStore};
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};
use crate::{
BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
};
/// A guard to cleanup procedure state.
struct ProcedureGuard {
@@ -129,7 +131,9 @@ impl Runner {
// Execute the procedure. We need to release the lock whenever the execution
// is successful or fail.
self.meta.set_start_time_ms();
self.execute_procedure_in_loop().await;
self.meta.set_end_time_ms();
// We can't remove the metadata of the procedure now as users and its parent might
// need to query its state.
@@ -368,6 +372,7 @@ impl Runner {
procedure_state,
Some(self.meta.id),
procedure.lock_key(),
procedure.type_name(),
));
let runner = Runner {
meta: meta.clone(),

View File

@@ -159,6 +159,14 @@ impl<T: Procedure + ?Sized> Procedure for Box<T> {
(**self).execute(ctx).await
}
async fn rollback(&mut self, ctx: &Context) -> Result<()> {
(**self).rollback(ctx).await
}
fn rollback_supported(&self) -> bool {
(**self).rollback_supported()
}
fn dump(&self) -> Result<String> {
(**self).dump()
}
@@ -227,6 +235,11 @@ impl LockKey {
pub fn keys_to_lock(&self) -> impl Iterator<Item = &StringKey> {
self.0.iter()
}
/// Returns the `Debug` rendering of every key to lock, in the order
/// `keys_to_lock` yields them.
pub fn get_keys(&self) -> Vec<String> {
    self.keys_to_lock()
        .map(|key| format!("{:?}", key))
        .collect()
}
}
/// Boxed [Procedure].
@@ -374,6 +387,18 @@ impl ProcedureState {
_ => None,
}
}
/// Returns the name of the state's variant, e.g. `"Running"` or `"Failed"`.
///
/// Payloads such as the error of a failed state are not part of the name.
/// The result is a string literal, so it is `'static` and does not keep
/// `self` borrowed.
pub fn as_str_name(&self) -> &'static str {
    match self {
        ProcedureState::Running => "Running",
        ProcedureState::Done { .. } => "Done",
        ProcedureState::Retrying { .. } => "Retrying",
        ProcedureState::Failed { .. } => "Failed",
        ProcedureState::PrepareRollback { .. } => "PrepareRollback",
        ProcedureState::RollingBack { .. } => "RollingBack",
    }
}
}
/// The initial procedure state.
@@ -412,11 +437,30 @@ pub trait ProcedureManager: Send + Sync + 'static {
/// Returns a [Watcher] to watch [ProcedureState] of specific procedure.
fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher>;
/// Returns the details of the procedure.
async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>>;
}
/// Ref-counted pointer to the [ProcedureManager].
pub type ProcedureManagerRef = Arc<dyn ProcedureManager>;
/// A snapshot of one procedure's metadata, as returned by
/// [`ProcedureManager::list_procedures`].
#[derive(Debug, Clone)]
pub struct ProcedureInfo {
    /// Id of this procedure.
    pub id: ProcedureId,
    /// Type name of this procedure.
    pub type_name: String,
    /// Start execution time of this procedure, in milliseconds.
    pub start_time_ms: i64,
    /// End execution time of this procedure, in milliseconds.
    pub end_time_ms: i64,
    /// State of this procedure.
    pub state: ProcedureState,
    /// Lock keys of this procedure.
    pub lock_keys: Vec<String>,
}
#[cfg(test)]
mod tests {
use common_error::mock::MockError;

View File

@@ -22,7 +22,7 @@ mod cluster;
mod store;
mod util;
use api::v1::meta::Role;
use api::v1::meta::{ProcedureDetailResponse, Role};
use cluster::Client as ClusterClient;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -259,6 +259,16 @@ impl ProcedureExecutor for MetaClient {
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
/// Lists procedures through the procedure client. The executor context is
/// unused. Both a missing client and a failed call are reported as an
/// external error.
async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
    let client = self
        .procedure_client()
        .map_err(BoxedError::new)
        .context(meta_error::ExternalSnafu)?;

    client
        .list_procedures()
        .await
        .map_err(BoxedError::new)
        .context(meta_error::ExternalSnafu)
}
}
#[async_trait::async_trait]

View File

@@ -18,8 +18,9 @@ use std::time::Duration;
use api::v1::meta::procedure_service_client::ProcedureServiceClient;
use api::v1::meta::{
DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureId,
ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role,
DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
QueryProcedureRequest, ResponseHeader, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
@@ -89,6 +90,11 @@ impl Client {
.migrate_region(region_id, from_peer, to_peer, replay_timeout)
.await
}
/// Lists the details of all procedures, holding the read lock on the inner
/// client for the duration of the call.
pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
    self.inner.read().await.list_procedures().await
}
}
#[derive(Debug)]
@@ -279,4 +285,23 @@ impl Inner {
)
.await
}
/// Requests the details of all procedures through the `details` RPC.
///
/// The request carries only a header (this client's id and role plus the
/// W3C trace context of the current span) and is sent via `with_retry`.
async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
    let mut req = ProcedureDetailRequest::default();
    req.set_header(
        self.id,
        self.role,
        TracingContext::from_current_span().to_w3c(),
    );
    self.with_retry(
        "list procedure",
        move |mut client| {
            // Every invocation of the closure sends its own copy of the request.
            let req = req.clone();
            async move { client.details(req).await.map(|res| res.into_inner()) }
        },
        // Hands the response header to `with_retry`.
        |resp: &ProcedureDetailResponse| &resp.header,
    )
    .await
}
}

View File

@@ -18,7 +18,7 @@ use std::time::Duration;
use api::v1::meta::{
procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse, QueryProcedureRequest,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
};
use common_meta::ddl::ExecutorContext;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
@@ -146,4 +146,20 @@ impl procedure_service_server::ProcedureService for Metasrv {
Ok(Response::new(resp))
}
async fn details(
&self,
request: Request<ProcedureDetailRequest>,
) -> GrpcResult<ProcedureDetailResponse> {
let ProcedureDetailRequest { header } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let metas = self
.procedure_manager()
.list_procedures()
.await
.context(error::QueryProcedureSnafu)?;
Ok(Response::new(procedure::procedure_details_to_pb_response(
metas,
)))
}
}

View File

@@ -370,6 +370,7 @@ impl GreptimeDbClusterBuilder {
Some(meta_client.clone()),
cached_meta_backend.clone(),
cache_registry.clone(),
None,
);
let handlers_executor = HandlerGroupExecutor::new(vec![

View File

@@ -149,6 +149,7 @@ impl GreptimeDbStandaloneBuilder {
None,
kv_backend.clone(),
cache_registry.clone(),
Some(procedure_manager.clone()),
);
let flow_builder = FlownodeBuilder::new(

View File

@@ -0,0 +1,40 @@
--- test information_schema.procedure_info ----
USE public;
Affected Rows: 0
CREATE TABLE procedure_info_for_sql_test1(
ts TIMESTAMP TIME INDEX,
temperature DOUBLE DEFAULT 10,
) engine=mito with('append_mode'='true');
Affected Rows: 0
CREATE TABLE procedure_info_for_sql_test2(
ts TIMESTAMP TIME INDEX,
temperature DOUBLE DEFAULT 10,
) engine=mito with('append_mode'='true');
Affected Rows: 0
use INFORMATION_SCHEMA;
Affected Rows: 0
select procedure_type from procedure_info where lock_keys like '%procedure_info_for_sql_test%';
+--------------------------------+
| procedure_type |
+--------------------------------+
| metasrv-procedure::CreateTable |
| metasrv-procedure::CreateTable |
+--------------------------------+
use public;
Affected Rows: 0
DROP TABLE procedure_info_for_sql_test1, procedure_info_for_sql_test2;
Affected Rows: 0

View File

@@ -0,0 +1,20 @@
--- test information_schema.procedure_info ----
USE public;
CREATE TABLE procedure_info_for_sql_test1(
ts TIMESTAMP TIME INDEX,
temperature DOUBLE DEFAULT 10,
) engine=mito with('append_mode'='true');
CREATE TABLE procedure_info_for_sql_test2(
ts TIMESTAMP TIME INDEX,
temperature DOUBLE DEFAULT 10,
) engine=mito with('append_mode'='true');
use INFORMATION_SCHEMA;
select procedure_type from procedure_info where lock_keys like '%procedure_info_for_sql_test%';
use public;
DROP TABLE procedure_info_for_sql_test1, procedure_info_for_sql_test2;

View File

@@ -45,6 +45,7 @@ SHOW TABLES;
| optimizer_trace |
| parameters |
| partitions |
| procedure_info |
| profiling |
| referential_constraints |
| region_peers |
@@ -91,6 +92,7 @@ SHOW FULL TABLES;
| optimizer_trace | LOCAL TEMPORARY |
| parameters | LOCAL TEMPORARY |
| partitions | LOCAL TEMPORARY |
| procedure_info | LOCAL TEMPORARY |
| profiling | LOCAL TEMPORARY |
| referential_constraints | LOCAL TEMPORARY |
| region_peers | LOCAL TEMPORARY |
@@ -131,6 +133,7 @@ SHOW TABLE STATUS;
|optimizer_trace||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0|||
|parameters||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0|||
|partitions||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0|||
|procedure_info||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0|||
|profiling||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0|||
|referential_constraints||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0|||
|region_peers||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0|||

View File

@@ -32,6 +32,7 @@ order by table_schema, table_name;
|greptime|information_schema|optimizer_trace|LOCALTEMPORARY|17|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y|
|greptime|information_schema|parameters|LOCALTEMPORARY|18|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y|
|greptime|information_schema|partitions|LOCALTEMPORARY|28|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y|
|greptime|information_schema|procedure_info|LOCALTEMPORARY|34|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y|
|greptime|information_schema|profiling|LOCALTEMPORARY|19|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y|
|greptime|information_schema|referential_constraints|LOCALTEMPORARY|20|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y|
|greptime|information_schema|region_peers|LOCALTEMPORARY|29|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y|
@@ -256,6 +257,12 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | partitions | table_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | partitions | tablespace_name | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | partitions | update_time | 20 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | Yes | datetime | | |
| greptime | information_schema | procedure_info | end_time | 4 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |
| greptime | information_schema | procedure_info | lock_keys | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | procedure_info | procedure_id | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | procedure_info | procedure_type | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | procedure_info | start_time | 3 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |
| greptime | information_schema | procedure_info | status | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | profiling | block_ops_in | 9 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | profiling | block_ops_out | 10 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | profiling | context_involuntary | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |

View File

@@ -106,6 +106,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
|greptime|information_schema|optimizer_trace|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|
|greptime|information_schema|parameters|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|
|greptime|information_schema|partitions|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|
|greptime|information_schema|procedure_info|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|
|greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|
|greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|
|greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|