From b3d413258d8caf4ed277e03b6d139a8ad65b226a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 22 Sep 2025 19:21:04 +0800 Subject: [PATCH] feat: extract standalone functionality and introduce plugin-based router configuration (#7002) * feat: extract standalone functionality and introduce plugin-based router configuration Signed-off-by: WenyXu * fix: ensure dump file does not exist Signed-off-by: WenyXu * chore: introduce `External` error Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 38 ++- Cargo.toml | 2 + src/cmd/Cargo.toml | 5 +- src/cmd/src/error.rs | 16 + src/cmd/src/standalone.rs | 323 ++------------------ src/cmd/tests/load_config_test.rs | 2 +- src/common/meta/src/snapshot.rs | 27 +- src/frontend/Cargo.toml | 1 - src/frontend/src/error.rs | 9 - src/frontend/src/instance.rs | 44 +-- src/frontend/src/server.rs | 7 + src/plugins/Cargo.toml | 2 + src/plugins/src/lib.rs | 2 + src/plugins/src/standalone.rs | 35 +++ src/servers/src/http.rs | 1 + src/servers/src/http/utils.rs | 15 + src/servers/src/http/utils/router.rs | 37 +++ src/standalone/Cargo.toml | 37 +++ src/standalone/src/error.rs | 54 ++++ src/standalone/src/information_extension.rs | 166 ++++++++++ src/standalone/src/lib.rs | 23 ++ src/standalone/src/metadata.rs | 37 +++ src/standalone/src/options.rs | 158 ++++++++++ src/standalone/src/procedure.rs | 45 +++ tests-integration/Cargo.toml | 1 + tests-integration/src/standalone.rs | 9 +- 26 files changed, 731 insertions(+), 365 deletions(-) create mode 100644 src/plugins/src/standalone.rs create mode 100644 src/servers/src/http/utils.rs create mode 100644 src/servers/src/http/utils/router.rs create mode 100644 src/standalone/Cargo.toml create mode 100644 src/standalone/src/error.rs create mode 100644 src/standalone/src/information_extension.rs create mode 100644 src/standalone/src/lib.rs create mode 100644 src/standalone/src/metadata.rs create mode 100644 src/standalone/src/options.rs create mode 100644 src/standalone/src/procedure.rs diff --git a/Cargo.lock b/Cargo.lock index c3724b7d72..c5ae7dd2d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1951,6 +1951,7 @@ dependencies = [ "session", "similar-asserts", "snafu 0.8.6", + "standalone", "stat", "store-api", "substrait 0.18.0", @@ -4900,7 +4901,6 @@ dependencies = [ "humantime-serde", "lazy_static", "log-query", - "log-store", "meta-client", "num_cpus", "opentelemetry-proto", @@ -9457,12 +9457,14 @@ dependencies = [ "cli", "common-base", "common-error", + "common-meta", "datanode", "flow", "frontend", "meta-srv", "serde", "snafu 0.8.6", + "standalone", ] [[package]] @@ -12310,6 +12312,39 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "standalone" +version = "0.18.0" +dependencies = [ + "async-trait", + "catalog", + "client", + "common-base", + "common-config", + "common-error", + "common-macro", + "common-meta", + "common-options", + "common-procedure", + "common-query", + "common-telemetry", + "common-time", + "common-version", + "common-wal", + "datanode", + "file-engine", + "flow", + "frontend", + "log-store", + "mito2", + "query", + "serde", + "servers", + "snafu 0.8.6", + "store-api", + "tokio", +] + [[package]] name = "stat" version = "0.18.0" @@ -13025,6 +13060,7 @@ dependencies = [ "snafu 0.8.6", "sql", "sqlx", + "standalone", "store-api", "substrait 0.18.0", "table", diff --git a/Cargo.toml b/Cargo.toml index 7b9cb4e41c..65b44da10c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ members = [ "src/promql", "src/puffin", "src/query", + "src/standalone", "src/servers", "src/session", "src/sql", @@ -308,6 +309,7 @@ query = { path = "src/query" } servers = { path = "src/servers" } session = { path = "src/session" } sql = { path = "src/sql" } +standalone = { path = "src/standalone" } stat = { path = "src/common/stat" } store-api = { path = "src/store-api" } substrait = { path = "src/common/substrait" } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 4f7f1874f8..0cc88769eb 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -54,7 +54,6 @@ common-wal.workspace = true datanode.workspace = true datatypes.workspace = true etcd-client.workspace = true -file-engine.workspace = true flow.workspace = true frontend = { workspace = true, default-features = false } futures.workspace = true @@ -64,7 +63,6 @@ lazy_static.workspace = true meta-client.workspace = true meta-srv.workspace = true metric-engine.workspace = true -mito2.workspace = true moka.workspace = true nu-ansi-term = "0.46" object-store.workspace = true @@ -75,6 +73,7 @@ query.workspace = true rand.workspace = true regex.workspace = true reqwest.workspace = true +standalone.workspace = true serde.workspace = true serde_json.workspace = true servers.workspace = true @@ -100,6 +99,8 @@ common-version.workspace = true serde.workspace = true temp-env = "0.3" tempfile.workspace = true +file-engine.workspace = true +mito2.workspace = true [target.'cfg(not(windows))'.dev-dependencies] rexpect = "0.5" diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index ed9dda22d3..3b9ec93353 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -302,6 +302,20 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, + + #[snafu(display("Failed to build metadata kvbackend"))] + BuildMetadataKvbackend { + #[snafu(implicit)] + location: Location, + source: standalone::error::Error, + }, + + #[snafu(display("Failed to setup standalone plugins"))] + SetupStandalonePlugins { + #[snafu(implicit)] + location: Location, + source: standalone::error::Error, + }, } pub type Result = std::result::Result; @@ -320,6 +334,8 @@ impl ErrorExt for Error { Error::UnsupportedSelectorType { source, .. } => source.status_code(), Error::BuildCli { source, .. } => source.status_code(), Error::StartCli { source, .. } => source.status_code(), + Error::BuildMetadataKvbackend { source, .. } => source.status_code(), + Error::SetupStandalonePlugins { source, .. } => source.status_code(), Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { source.status_code() diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 9ee7d6b728..fa4c4e4320 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -19,71 +19,47 @@ use std::{fs, path}; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; -use catalog::information_schema::{DatanodeInspectRequest, InformationExtension}; use catalog::kvbackend::KvBackendCatalogManagerBuilder; use catalog::process_manager::ProcessManager; use clap::Parser; -use client::SendableRecordBatchStream; -use client::api::v1::meta::RegionRole; use common_base::Plugins; -use common_base::readable_size::ReadableSize; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; -use common_config::{Configurable, KvBackendConfig, metadata_store_dir}; +use common_config::{Configurable, metadata_store_dir}; use common_error::ext::BoxedError; use common_meta::cache::LayeredCacheRegistryBuilder; -use common_meta::cluster::{NodeInfo, NodeStatus}; -use common_meta::datanode::RegionStat; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use common_meta::ddl_manager::DdlManager; use common_meta::key::flow::FlowMetadataManager; -use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::peer::Peer; use common_meta::procedure_executor::LocalProcedureExecutor; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator}; -use common_options::memory::MemoryOptions; -use common_procedure::{ProcedureInfo, ProcedureManagerRef}; -use common_query::request::QueryRequest; +use common_procedure::ProcedureManagerRef; use common_telemetry::info; -use common_telemetry::logging::{ - DEFAULT_LOGGING_DIR, LoggingOptions, SlowQueryOptions, TracingOptions, -}; +use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_time::timezone::set_default_timezone; use common_version::{short_version, verbose_version}; -use common_wal::config::DatanodeWalConfig; -use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; +use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; -use datanode::region_server::RegionServer; -use file_engine::config::EngineConfig as FileEngineConfig; use flow::{ - FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, - FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine, + FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker, + GrpcQueryHandlerWithBoxedError, }; -use frontend::frontend::{Frontend, FrontendOptions}; +use frontend::frontend::Frontend; +use frontend::instance::StandaloneDatanodeManager; use frontend::instance::builder::FrontendBuilder; -use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager}; use frontend::server::Services; -use frontend::service_config::{ - InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, - PromStoreOptions, -}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; -use mito2::config::MitoConfig; -use query::options::QueryOptions; -use serde::{Deserialize, Serialize}; -use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; -use servers::grpc::GrpcOptions; -use servers::http::HttpOptions; +use servers::export_metrics::ExportMetricsTask; use servers::tls::{TlsMode, TlsOption}; use snafu::ResultExt; -use store_api::storage::RegionId; -use tokio::sync::RwLock; +use standalone::StandaloneInformationExtension; +use standalone::options::StandaloneOptions; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{Result, StartFlownodeSnafu}; @@ -133,131 +109,6 @@ impl SubCommand { } } -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -#[serde(default)] -pub struct StandaloneOptions { - pub enable_telemetry: bool, - pub default_timezone: Option, - pub http: HttpOptions, - pub grpc: GrpcOptions, - pub mysql: MysqlOptions, - pub postgres: PostgresOptions, - pub opentsdb: OpentsdbOptions, - pub influxdb: InfluxdbOptions, - pub jaeger: JaegerOptions, - pub prom_store: PromStoreOptions, - pub wal: DatanodeWalConfig, - pub storage: StorageConfig, - pub metadata_store: KvBackendConfig, - pub procedure: ProcedureConfig, - pub flow: FlowConfig, - pub logging: LoggingOptions, - pub user_provider: Option, - /// Options for different store engines. - pub region_engine: Vec, - pub export_metrics: ExportMetricsOption, - pub tracing: TracingOptions, - pub init_regions_in_background: bool, - pub init_regions_parallelism: usize, - pub max_in_flight_write_bytes: Option, - pub slow_query: SlowQueryOptions, - pub query: QueryOptions, - pub memory: MemoryOptions, -} - -impl Default for StandaloneOptions { - fn default() -> Self { - Self { - enable_telemetry: true, - default_timezone: None, - http: HttpOptions::default(), - grpc: GrpcOptions::default(), - mysql: MysqlOptions::default(), - postgres: PostgresOptions::default(), - opentsdb: OpentsdbOptions::default(), - influxdb: InfluxdbOptions::default(), - jaeger: JaegerOptions::default(), - prom_store: PromStoreOptions::default(), - wal: DatanodeWalConfig::default(), - storage: StorageConfig::default(), - metadata_store: KvBackendConfig::default(), - procedure: ProcedureConfig::default(), - flow: FlowConfig::default(), - logging: LoggingOptions::default(), - export_metrics: ExportMetricsOption::default(), - user_provider: None, - region_engine: vec![ - RegionEngineConfig::Mito(MitoConfig::default()), - RegionEngineConfig::File(FileEngineConfig::default()), - ], - tracing: TracingOptions::default(), - init_regions_in_background: false, - init_regions_parallelism: 16, - max_in_flight_write_bytes: None, - slow_query: SlowQueryOptions::default(), - query: QueryOptions::default(), - memory: MemoryOptions::default(), - } - } -} - -impl Configurable for StandaloneOptions { - fn env_list_keys() -> Option<&'static [&'static str]> { - Some(&["wal.broker_endpoints"]) - } -} - -/// The [`StandaloneOptions`] is only defined in cmd crate, -/// we don't want to make `frontend` depends on it, so impl [`Into`] -/// rather than [`From`]. -#[allow(clippy::from_over_into)] -impl Into for StandaloneOptions { - fn into(self) -> FrontendOptions { - self.frontend_options() - } -} - -impl StandaloneOptions { - pub fn frontend_options(&self) -> FrontendOptions { - let cloned_opts = self.clone(); - FrontendOptions { - default_timezone: cloned_opts.default_timezone, - http: cloned_opts.http, - grpc: cloned_opts.grpc, - mysql: cloned_opts.mysql, - postgres: cloned_opts.postgres, - opentsdb: cloned_opts.opentsdb, - influxdb: cloned_opts.influxdb, - jaeger: cloned_opts.jaeger, - prom_store: cloned_opts.prom_store, - meta_client: None, - logging: cloned_opts.logging, - user_provider: cloned_opts.user_provider, - // Handle the export metrics task run by standalone to frontend for execution - export_metrics: cloned_opts.export_metrics, - max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes, - slow_query: cloned_opts.slow_query, - ..Default::default() - } - } - - pub fn datanode_options(&self) -> DatanodeOptions { - let cloned_opts = self.clone(); - DatanodeOptions { - node_id: Some(0), - enable_telemetry: cloned_opts.enable_telemetry, - wal: cloned_opts.wal, - storage: cloned_opts.storage, - region_engine: cloned_opts.region_engine, - grpc: cloned_opts.grpc, - init_regions_in_background: cloned_opts.init_regions_in_background, - init_regions_parallelism: cloned_opts.init_regions_parallelism, - query: cloned_opts.query, - ..Default::default() - } - } -} - pub struct Instance { datanode: Datanode, frontend: Frontend, @@ -523,13 +374,14 @@ impl StartCommand { .context(error::CreateDirSnafu { dir: data_home })?; let metadata_dir = metadata_store_dir(data_home); - let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components( - metadata_dir, - opts.metadata_store, - opts.procedure, - ) - .await - .context(error::StartFrontendSnafu)?; + let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store) + .context(error::BuildMetadataKvbackendSnafu)?; + let procedure_manager = + standalone::build_procedure_manager(kv_backend.clone(), opts.procedure); + + plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone()) + .await + .context(error::SetupStandalonePluginsSnafu)?; // Builds cache registry let layered_cache_builder = LayeredCacheRegistryBuilder::default(); @@ -745,141 +597,6 @@ impl StartCommand { } } -pub struct StandaloneInformationExtension { - region_server: RegionServer, - procedure_manager: ProcedureManagerRef, - start_time_ms: u64, - flow_streaming_engine: RwLock>>, -} - -impl StandaloneInformationExtension { - pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self { - Self { - region_server, - procedure_manager, - start_time_ms: common_time::util::current_time_millis() as u64, - flow_streaming_engine: RwLock::new(None), - } - } - - /// Set the flow streaming engine for the standalone instance. - pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc) { - let mut guard = self.flow_streaming_engine.write().await; - *guard = Some(flow_streaming_engine); - } -} - -#[async_trait::async_trait] -impl InformationExtension for StandaloneInformationExtension { - type Error = catalog::error::Error; - - async fn nodes(&self) -> std::result::Result, Self::Error> { - let build_info = common_version::build_info(); - let node_info = NodeInfo { - // For the standalone: - // - id always 0 - // - empty string for peer_addr - peer: Peer { - id: 0, - addr: "".to_string(), - }, - last_activity_ts: -1, - status: NodeStatus::Standalone, - version: build_info.version.to_string(), - git_commit: build_info.commit_short.to_string(), - // Use `self.start_time_ms` instead. - // It's not precise but enough. - start_time_ms: self.start_time_ms, - cpus: common_config::utils::get_cpus() as u32, - memory_bytes: common_config::utils::get_sys_total_memory() - .unwrap_or_default() - .as_bytes(), - }; - Ok(vec![node_info]) - } - - async fn procedures(&self) -> std::result::Result, Self::Error> { - self.procedure_manager - .list_procedures() - .await - .map_err(BoxedError::new) - .map(|procedures| { - procedures - .into_iter() - .map(|procedure| { - let status = procedure.state.as_str_name().to_string(); - (status, procedure) - }) - .collect::>() - }) - .context(catalog::error::ListProceduresSnafu) - } - - async fn region_stats(&self) -> std::result::Result, Self::Error> { - let stats = self - .region_server - .reportable_regions() - .into_iter() - .map(|stat| { - let region_stat = self - .region_server - .region_statistic(stat.region_id) - .unwrap_or_default(); - RegionStat { - id: stat.region_id, - rcus: 0, - wcus: 0, - approximate_bytes: region_stat.estimated_disk_size(), - engine: stat.engine, - role: RegionRole::from(stat.role).into(), - num_rows: region_stat.num_rows, - memtable_size: region_stat.memtable_size, - manifest_size: region_stat.manifest_size, - sst_size: region_stat.sst_size, - sst_num: region_stat.sst_num, - index_size: region_stat.index_size, - region_manifest: region_stat.manifest.into(), - data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, - metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, - written_bytes: region_stat.written_bytes, - } - }) - .collect::>(); - Ok(stats) - } - - async fn flow_stats(&self) -> std::result::Result, Self::Error> { - Ok(Some( - self.flow_streaming_engine - .read() - .await - .as_ref() - .unwrap() - .gen_state_report() - .await, - )) - } - - async fn inspect_datanode( - &self, - request: DatanodeInspectRequest, - ) -> std::result::Result { - let req = QueryRequest { - plan: request - .build_plan() - .context(catalog::error::DatafusionSnafu)?, - region_id: RegionId::default(), - header: None, - }; - - self.region_server - .handle_read(req) - .await - .map_err(BoxedError::new) - .context(catalog::error::InternalSnafu) - } -} - #[cfg(test)] mod tests { use std::default::Default; @@ -891,7 +608,9 @@ mod tests { use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; use common_wal::config::DatanodeWalConfig; + use frontend::frontend::FrontendOptions; use object_store::config::{FileConfig, GcsConfig}; + use servers::grpc::GrpcOptions; use super::*; use crate::options::GlobalOptions; diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index da2dfd456c..827fd017a1 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -15,7 +15,6 @@ use std::time::Duration; use cmd::options::GreptimeOptions; -use cmd::standalone::StandaloneOptions; use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_options::datanode::{ClientOptions, DatanodeClientOptions}; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions}; @@ -35,6 +34,7 @@ use servers::export_metrics::ExportMetricsOption; use servers::grpc::GrpcOptions; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; +use standalone::options::StandaloneOptions; use store_api::path_utils::WAL_DIR; #[allow(deprecated)] diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs index c342716f2e..54d5dcf224 100644 --- a/src/common/meta/src/snapshot.rs +++ b/src/common/meta/src/snapshot.rs @@ -23,12 +23,12 @@ use common_telemetry::info; use file::{Metadata, MetadataContent}; use futures::{TryStreamExt, future}; use object_store::ObjectStore; -use snafu::{OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt, ensure}; use strum::Display; use crate::error::{ Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu, - Result, WriteObjectSnafu, + Result, UnexpectedSnafu, WriteObjectSnafu, }; use crate::key::{CANDIDATES_ROOT, ELECTION_KEY}; use crate::kv_backend::KvBackendRef; @@ -247,6 +247,20 @@ impl MetadataSnapshotManager { let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu { reason: format!("Invalid file path: {}, filename: {}", path, filename_str), })?; + // Ensure the file does not exist + ensure!( + !self + .object_store + .exists(file_path) + .await + .context(ReadObjectSnafu { file_path })?, + UnexpectedSnafu { + err_msg: format!( + "The file '{}' already exists. Please choose a different name or remove the existing file before proceeding.", + file_path + ), + } + ); let now = Instant::now(); let req = RangeRequest::new().with_range(vec![0], vec![0]); let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| { @@ -403,6 +417,15 @@ mod tests { .unwrap(); // Clean up the kv backend kv_backend.clear(); + let err = manager + .dump( + &dump_path.as_path().display().to_string(), + "metadata_snapshot", + ) + .await + .unwrap_err(); + assert_matches!(err, Error::Unexpected { .. }); + assert!(err.to_string().contains("already exists")); let restore_path = dump_path .join("metadata_snapshot.metadata.fb") diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 69aad70a93..bf6b6bd25d 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -50,7 +50,6 @@ humantime.workspace = true humantime-serde.workspace = true lazy_static.workspace = true log-query.workspace = true -log-store.workspace = true meta-client.workspace = true num_cpus.workspace = true opentelemetry-proto.workspace = true diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 286e633eeb..710c855958 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -36,13 +36,6 @@ pub enum Error { source: common_meta::error::Error, }, - #[snafu(display("Failed to open raft engine backend"))] - OpenRaftEngineBackend { - #[snafu(implicit)] - location: Location, - source: BoxedError, - }, - #[snafu(display("Failed to handle heartbeat response"))] HandleHeartbeatResponse { #[snafu(implicit)] @@ -420,8 +413,6 @@ impl ErrorExt for Error { Error::Table { source, .. } | Error::Insert { source, .. } => source.status_code(), - Error::OpenRaftEngineBackend { .. } => StatusCode::StorageUnavailable, - Error::RequestQuery { source, .. } => source.status_code(), Error::CacheRequired { .. } => StatusCode::Internal, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index bbc12ae876..7d4642f6c2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -39,30 +39,22 @@ use catalog::process_manager::{ use client::OutputData; use common_base::Plugins; use common_base::cancellation::CancellableFuture; -use common_config::KvBackendConfig; use common_error::ext::{BoxedError, ErrorExt}; use common_event_recorder::EventRecorderRef; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::key::TableMetadataManagerRef; -use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::key::table_name::TableNameKey; -use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; use common_meta::procedure_executor::ProcedureExecutorRef; -use common_meta::state_store::KvStateStore; -use common_procedure::ProcedureManagerRef; -use common_procedure::local::{LocalManager, ManagerConfig}; -use common_procedure::options::ProcedureConfig; use common_query::Output; use common_recordbatch::RecordBatchStreamWrapper; use common_recordbatch::error::StreamTimeoutSnafu; use common_telemetry::logging::SlowQueryOptions; -use common_telemetry::{debug, error, info, tracing}; +use common_telemetry::{debug, error, tracing}; use dashmap::DashMap; use datafusion_expr::LogicalPlan; use futures::{Stream, StreamExt}; use lazy_static::lazy_static; -use log_store::raft_engine::RaftEngineBackend; use operator::delete::DeleterRef; use operator::insert::InserterRef; use operator::statement::{StatementExecutor, StatementExecutorRef}; @@ -136,40 +128,6 @@ pub struct Instance { } impl Instance { - pub async fn try_build_standalone_components( - dir: String, - kv_backend_config: KvBackendConfig, - procedure_config: ProcedureConfig, - ) -> Result<(KvBackendRef, ProcedureManagerRef)> { - info!( - "Creating metadata kvbackend with config: {:?}", - kv_backend_config - ); - let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config) - .map_err(BoxedError::new) - .context(error::OpenRaftEngineBackendSnafu)?; - - let kv_backend = Arc::new(kv_backend); - let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone())); - - let manager_config = ManagerConfig { - max_retry_times: procedure_config.max_retry_times, - retry_delay: procedure_config.retry_delay, - max_running_procedures: procedure_config.max_running_procedures, - ..Default::default() - }; - let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone())); - let procedure_manager = Arc::new(LocalManager::new( - manager_config, - kv_state_store.clone(), - kv_state_store, - Some(runtime_switch_manager), - None, - )); - - Ok((kv_backend, procedure_manager)) - } - pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index cdd2c01c60..6c19109ab2 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use auth::UserProviderRef; use common_base::Plugins; use common_config::Configurable; +use common_telemetry::info; use meta_client::MetaClientOptions; use servers::error::Error as ServerError; use servers::grpc::builder::GrpcServerBuilder; @@ -25,6 +26,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler; use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{GrpcOptions, GrpcServer}; use servers::http::event::LogValidatorRef; +use servers::http::utils::router::RouterConfigurator; use servers::http::{HttpServer, HttpServerBuilder}; use servers::interceptor::LogIngestInterceptorRef; use servers::metrics_handler::MetricsHandler; @@ -115,6 +117,11 @@ where builder = builder.with_jaeger_handler(self.instance.clone()); } + if let Some(configurator) = self.plugins.get::() { + info!("Adding extra router from plugins"); + builder = builder.with_extra_router(configurator.router()); + } + builder } diff --git a/src/plugins/Cargo.toml b/src/plugins/Cargo.toml index f888d85173..14df62c4fa 100644 --- a/src/plugins/Cargo.toml +++ b/src/plugins/Cargo.toml @@ -13,9 +13,11 @@ clap.workspace = true cli.workspace = true common-base.workspace = true common-error.workspace = true +common-meta.workspace = true datanode.workspace = true flow.workspace = true frontend.workspace = true meta-srv.workspace = true serde.workspace = true snafu.workspace = true +standalone.workspace = true diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index fc1887ba7d..9a979a23a1 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -18,6 +18,7 @@ mod flownode; mod frontend; mod meta_srv; mod options; +mod standalone; pub use cli::SubCommand; pub use datanode::{setup_datanode_plugins, start_datanode_plugins}; @@ -25,3 +26,4 @@ pub use flownode::{setup_flownode_plugins, start_flownode_plugins}; pub use frontend::{setup_frontend_plugins, start_frontend_plugins}; pub use meta_srv::{setup_metasrv_plugins, start_metasrv_plugins}; pub use options::PluginOptions; +pub use standalone::{setup_standalone_plugins, start_standalone_plugins}; diff --git a/src/plugins/src/standalone.rs b/src/plugins/src/standalone.rs new file mode 100644 index 0000000000..97b1c22aa7 --- /dev/null +++ b/src/plugins/src/standalone.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::Plugins; +use common_meta::kv_backend::KvBackendRef; +use standalone::error::Result; +use standalone::options::StandaloneOptions; + +use crate::options::PluginOptions; + +#[allow(unused_variables)] +#[allow(unused_mut)] +pub async fn setup_standalone_plugins( + plugins: &mut Plugins, + plugin_options: &[PluginOptions], + standalone_opts: &StandaloneOptions, + metadata_kvbackend: KvBackendRef, +) -> Result<()> { + Ok(()) +} + +pub async fn start_standalone_plugins(_plugins: Plugins) -> Result<()> { + Ok(()) +} diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 596f6cee08..0e06cd1ad7 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -103,6 +103,7 @@ pub mod prom_store; pub mod prometheus; pub mod result; mod timeout; +pub mod utils; pub(crate) use timeout::DynamicTimeoutLayer; diff --git a/src/servers/src/http/utils.rs b/src/servers/src/http/utils.rs new file mode 100644 index 0000000000..d2acdbfa5b --- /dev/null +++ b/src/servers/src/http/utils.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod router; diff --git a/src/servers/src/http/utils/router.rs b/src/servers/src/http/utils/router.rs new file mode 100644 index 0000000000..9c5132c88e --- /dev/null +++ b/src/servers/src/http/utils/router.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Mutex}; + +use axum::Router; + +/// A thread-safe wrapper for an Axum [`Router`] that allows for dynamic configuration. +#[derive(Default, Clone)] +pub struct RouterConfigurator(Arc>); + +impl RouterConfigurator { + /// Returns a clone of the current [`Router`]. + pub fn router(&self) -> Router { + self.0.lock().unwrap().clone() + } + + /// Applies a configuration function to the current [`Router`]. + /// + /// The provided closure receives the current router (cloned) and should return a new [`Router`]. + /// The internal router is then replaced with the result. + pub fn configure(&self, f: impl FnOnce(Router) -> Router) { + let mut router = self.0.lock().unwrap(); + *router = f(router.clone()); + } +} diff --git a/src/standalone/Cargo.toml b/src/standalone/Cargo.toml new file mode 100644 index 0000000000..035b1dc06a --- /dev/null +++ b/src/standalone/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "standalone" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +async-trait.workspace = true +catalog.workspace = true +client.workspace = true +common-base.workspace = true +common-config.workspace = true +common-error.workspace = true +common-macro.workspace = true +common-meta.workspace = true +common-options.workspace = true +common-procedure.workspace = true +common-query.workspace = true +common-telemetry.workspace = true +common-time.workspace = true +common-version.workspace = true +common-wal.workspace = true +datanode.workspace = true +file-engine.workspace = true +flow.workspace = true +frontend.workspace = true +log-store.workspace = true +mito2.workspace = true +query.workspace = true +serde.workspace = true +servers.workspace = true +snafu.workspace = true +store-api.workspace = true +tokio.workspace = true diff --git a/src/standalone/src/error.rs b/src/standalone/src/error.rs new file mode 100644 index 0000000000..352271762c --- /dev/null +++ b/src/standalone/src/error.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to open metadata kvbackend"))] + OpenMetadataKvBackend { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + + #[snafu(display("External error"))] + External { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::OpenMetadataKvBackend { source, .. } => source.status_code(), + Error::External { source, .. } => source.status_code(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/standalone/src/information_extension.rs b/src/standalone/src/information_extension.rs new file mode 100644 index 0000000000..347955cfab --- /dev/null +++ b/src/standalone/src/information_extension.rs @@ -0,0 +1,166 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use catalog::information_schema::{DatanodeInspectRequest, InformationExtension}; +use client::SendableRecordBatchStream; +use client::api::v1::meta::RegionRole; +use common_error::ext::BoxedError; +use common_meta::cluster::{NodeInfo, NodeStatus}; +use common_meta::datanode::RegionStat; +use common_meta::key::flow::flow_state::FlowStat; +use common_meta::peer::Peer; +use common_procedure::{ProcedureInfo, ProcedureManagerRef}; +use common_query::request::QueryRequest; +use datanode::region_server::RegionServer; +use flow::StreamingEngine; +use snafu::ResultExt; +use store_api::storage::RegionId; +use tokio::sync::RwLock; + +pub struct StandaloneInformationExtension { + region_server: RegionServer, + procedure_manager: ProcedureManagerRef, + start_time_ms: u64, + flow_streaming_engine: RwLock>>, +} + +impl StandaloneInformationExtension { + pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self { + Self { + region_server, + procedure_manager, + start_time_ms: common_time::util::current_time_millis() as u64, + flow_streaming_engine: RwLock::new(None), + } + } + + /// Set the flow streaming engine for the standalone instance. + pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc) { + let mut guard = self.flow_streaming_engine.write().await; + *guard = Some(flow_streaming_engine); + } +} + +#[async_trait::async_trait] +impl InformationExtension for StandaloneInformationExtension { + type Error = catalog::error::Error; + + async fn nodes(&self) -> std::result::Result, Self::Error> { + let build_info = common_version::build_info(); + let node_info = NodeInfo { + // For the standalone: + // - id always 0 + // - empty string for peer_addr + peer: Peer { + id: 0, + addr: "".to_string(), + }, + last_activity_ts: -1, + status: NodeStatus::Standalone, + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + // Use `self.start_time_ms` instead. + // It's not precise but enough. + start_time_ms: self.start_time_ms, + cpus: common_config::utils::get_cpus() as u32, + memory_bytes: common_config::utils::get_sys_total_memory() + .unwrap_or_default() + .as_bytes(), + }; + Ok(vec![node_info]) + } + + async fn procedures(&self) -> std::result::Result, Self::Error> { + self.procedure_manager + .list_procedures() + .await + .map_err(BoxedError::new) + .map(|procedures| { + procedures + .into_iter() + .map(|procedure| { + let status = procedure.state.as_str_name().to_string(); + (status, procedure) + }) + .collect::>() + }) + .context(catalog::error::ListProceduresSnafu) + } + + async fn region_stats(&self) -> std::result::Result, Self::Error> { + let stats = self + .region_server + .reportable_regions() + .into_iter() + .map(|stat| { + let region_stat = self + .region_server + .region_statistic(stat.region_id) + .unwrap_or_default(); + RegionStat { + id: stat.region_id, + rcus: 0, + wcus: 0, + approximate_bytes: region_stat.estimated_disk_size(), + engine: stat.engine, + role: RegionRole::from(stat.role).into(), + num_rows: region_stat.num_rows, + memtable_size: region_stat.memtable_size, + manifest_size: region_stat.manifest_size, + sst_size: region_stat.sst_size, + sst_num: region_stat.sst_num, + index_size: region_stat.index_size, + region_manifest: region_stat.manifest.into(), + data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, + metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, + written_bytes: region_stat.written_bytes, + } + }) + .collect::>(); + Ok(stats) + } + + async fn flow_stats(&self) -> std::result::Result, Self::Error> { + Ok(Some( + self.flow_streaming_engine + .read() + .await + .as_ref() + .unwrap() + .gen_state_report() + .await, + )) + } + + async fn inspect_datanode( + &self, + request: DatanodeInspectRequest, + ) -> std::result::Result { + let req = QueryRequest { + plan: request + .build_plan() + .context(catalog::error::DatafusionSnafu)?, + region_id: RegionId::default(), + header: None, + }; + + self.region_server + .handle_read(req) + .await + .map_err(BoxedError::new) + .context(catalog::error::InternalSnafu) + } +} diff --git a/src/standalone/src/lib.rs b/src/standalone/src/lib.rs new file mode 100644 index 0000000000..8c6f041fe3 --- /dev/null +++ b/src/standalone/src/lib.rs @@ -0,0 +1,23 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod error; +pub mod information_extension; +pub mod metadata; +pub mod options; +pub mod procedure; + +pub use information_extension::StandaloneInformationExtension; +pub use metadata::build_metadata_kvbackend; +pub use procedure::build_procedure_manager; diff --git a/src/standalone/src/metadata.rs b/src/standalone/src/metadata.rs new file mode 100644 index 0000000000..9432949c4f --- /dev/null +++ b/src/standalone/src/metadata.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_config::KvBackendConfig; +use common_error::ext::BoxedError; +use common_meta::kv_backend::KvBackendRef; +use common_telemetry::info; +use log_store::raft_engine::RaftEngineBackend; +use snafu::ResultExt; + +use crate::error::{OpenMetadataKvBackendSnafu, Result}; + +/// Builds the metadata kvbackend. +pub fn build_metadata_kvbackend(dir: String, config: KvBackendConfig) -> Result { + info!( + "Creating metadata kvbackend with dir: {}, config: {:?}", + dir, config + ); + let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &config) + .map_err(BoxedError::new) + .context(OpenMetadataKvBackendSnafu)?; + + Ok(Arc::new(kv_backend)) +} diff --git a/src/standalone/src/options.rs b/src/standalone/src/options.rs new file mode 100644 index 0000000000..34e4598e61 --- /dev/null +++ b/src/standalone/src/options.rs @@ -0,0 +1,158 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::readable_size::ReadableSize; +use common_config::{Configurable, KvBackendConfig}; +use common_options::memory::MemoryOptions; +use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions}; +use common_wal::config::DatanodeWalConfig; +use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; +use file_engine::config::EngineConfig as FileEngineConfig; +use flow::FlowConfig; +use frontend::frontend::FrontendOptions; +use frontend::service_config::{ + InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, + PromStoreOptions, +}; +use mito2::config::MitoConfig; +use query::options::QueryOptions; +use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; +use servers::grpc::GrpcOptions; +use servers::http::HttpOptions; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(default)] +pub struct StandaloneOptions { + pub enable_telemetry: bool, + pub default_timezone: Option, + pub http: HttpOptions, + pub grpc: GrpcOptions, + pub mysql: MysqlOptions, + pub postgres: PostgresOptions, + pub opentsdb: OpentsdbOptions, + pub influxdb: InfluxdbOptions, + pub jaeger: JaegerOptions, + pub prom_store: PromStoreOptions, + pub wal: DatanodeWalConfig, + pub storage: StorageConfig, + pub metadata_store: KvBackendConfig, + pub procedure: ProcedureConfig, + pub flow: FlowConfig, + pub logging: LoggingOptions, + pub user_provider: Option, + /// Options for different store engines. + pub region_engine: Vec, + pub export_metrics: ExportMetricsOption, + pub tracing: TracingOptions, + pub init_regions_in_background: bool, + pub init_regions_parallelism: usize, + pub max_in_flight_write_bytes: Option, + pub slow_query: SlowQueryOptions, + pub query: QueryOptions, + pub memory: MemoryOptions, +} + +impl Default for StandaloneOptions { + fn default() -> Self { + Self { + enable_telemetry: true, + default_timezone: None, + http: HttpOptions::default(), + grpc: GrpcOptions::default(), + mysql: MysqlOptions::default(), + postgres: PostgresOptions::default(), + opentsdb: OpentsdbOptions::default(), + influxdb: InfluxdbOptions::default(), + jaeger: JaegerOptions::default(), + prom_store: PromStoreOptions::default(), + wal: DatanodeWalConfig::default(), + storage: StorageConfig::default(), + metadata_store: KvBackendConfig::default(), + procedure: ProcedureConfig::default(), + flow: FlowConfig::default(), + logging: LoggingOptions::default(), + export_metrics: ExportMetricsOption::default(), + user_provider: None, + region_engine: vec![ + RegionEngineConfig::Mito(MitoConfig::default()), + RegionEngineConfig::File(FileEngineConfig::default()), + ], + tracing: TracingOptions::default(), + init_regions_in_background: false, + init_regions_parallelism: 16, + max_in_flight_write_bytes: None, + slow_query: SlowQueryOptions::default(), + query: QueryOptions::default(), + memory: MemoryOptions::default(), + } + } +} + +impl Configurable for StandaloneOptions { + fn env_list_keys() -> Option<&'static [&'static str]> { + Some(&["wal.broker_endpoints"]) + } +} + +/// The [`StandaloneOptions`] is only defined in `standalone` crate, +/// we don't want to make `frontend` depends on it, so impl [`Into`] +/// rather than [`From`]. +#[allow(clippy::from_over_into)] +impl Into for StandaloneOptions { + fn into(self) -> FrontendOptions { + self.frontend_options() + } +} + +impl StandaloneOptions { + pub fn frontend_options(&self) -> FrontendOptions { + let cloned_opts = self.clone(); + FrontendOptions { + default_timezone: cloned_opts.default_timezone, + http: cloned_opts.http, + grpc: cloned_opts.grpc, + mysql: cloned_opts.mysql, + postgres: cloned_opts.postgres, + opentsdb: cloned_opts.opentsdb, + influxdb: cloned_opts.influxdb, + jaeger: cloned_opts.jaeger, + prom_store: cloned_opts.prom_store, + meta_client: None, + logging: cloned_opts.logging, + user_provider: cloned_opts.user_provider, + // Handle the export metrics task run by standalone to frontend for execution + export_metrics: cloned_opts.export_metrics, + max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes, + slow_query: cloned_opts.slow_query, + ..Default::default() + } + } + + pub fn datanode_options(&self) -> DatanodeOptions { + let cloned_opts = self.clone(); + DatanodeOptions { + node_id: Some(0), + enable_telemetry: cloned_opts.enable_telemetry, + wal: cloned_opts.wal, + storage: cloned_opts.storage, + region_engine: cloned_opts.region_engine, + grpc: cloned_opts.grpc, + init_regions_in_background: cloned_opts.init_regions_in_background, + init_regions_parallelism: cloned_opts.init_regions_parallelism, + query: cloned_opts.query, + ..Default::default() + } + } +} diff --git a/src/standalone/src/procedure.rs b/src/standalone/src/procedure.rs new file mode 100644 index 0000000000..2443abf39f --- /dev/null +++ b/src/standalone/src/procedure.rs @@ -0,0 +1,45 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::key::runtime_switch::RuntimeSwitchManager; +use common_meta::kv_backend::KvBackendRef; +use common_meta::state_store::KvStateStore; +use common_procedure::ProcedureManagerRef; +use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::options::ProcedureConfig; + +/// Builds the procedure manager. +pub fn build_procedure_manager( + kv_backend: KvBackendRef, + procedure_config: ProcedureConfig, +) -> ProcedureManagerRef { + let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone())); + + let manager_config = ManagerConfig { + max_retry_times: procedure_config.max_retry_times, + retry_delay: procedure_config.retry_delay, + max_running_procedures: procedure_config.max_running_procedures, + ..Default::default() + }; + let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend)); + Arc::new(LocalManager::new( + manager_config, + kv_state_store.clone(), + kv_state_store, + Some(runtime_switch_manager), + None, + )) +} diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 9be5578aca..3112ff866e 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -77,6 +77,7 @@ sqlx = { version = "0.8", features = [ "postgres", "chrono", ] } +standalone.workspace = true substrait.workspace = true table.workspace = true tempfile.workspace = true diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 2c1c296d55..29a308875c 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -23,7 +23,6 @@ use catalog::information_schema::NoopInformationExtension; use catalog::kvbackend::KvBackendCatalogManagerBuilder; use catalog::process_manager::ProcessManager; use cmd::error::StartFlownodeSnafu; -use cmd::standalone::StandaloneOptions; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::KvBackendConfig; @@ -53,6 +52,7 @@ use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use servers::grpc::GrpcOptions; use servers::server::ServerHandlers; use snafu::ResultExt; +use standalone::options::StandaloneOptions; use crate::test_util::{self, StorageType, TestGuard, create_tmp_dir_and_datanode_opts}; @@ -315,13 +315,14 @@ impl GreptimeDbStandaloneBuilder { let kv_backend_config = KvBackendConfig::default(); let procedure_config = ProcedureConfig::default(); - let (kv_backend, procedure_manager) = Instance::try_build_standalone_components( + + let kv_backend = standalone::build_metadata_kvbackend( format!("{}/kv", &opts.storage.data_home), kv_backend_config, - procedure_config, ) - .await .unwrap(); + let procedure_manager = + standalone::build_procedure_manager(kv_backend.clone(), procedure_config); let standalone_opts = StandaloneOptions { storage: opts.storage,