feat: extract standalone functionality and introduce plugin-based router configuration (#7002)

* feat: extract standalone functionality and introduce plugin-based router configuration

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: ensure dump file does not exist

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: introduce `External` error

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-09-22 19:21:04 +08:00
committed by GitHub
parent 03954e8b3b
commit b3d413258d
26 changed files with 731 additions and 365 deletions

Cargo.lock (generated)

@@ -1951,6 +1951,7 @@ dependencies = [
"session",
"similar-asserts",
"snafu 0.8.6",
"standalone",
"stat",
"store-api",
"substrait 0.18.0",
@@ -4900,7 +4901,6 @@ dependencies = [
"humantime-serde",
"lazy_static",
"log-query",
"log-store",
"meta-client",
"num_cpus",
"opentelemetry-proto",
@@ -9457,12 +9457,14 @@ dependencies = [
"cli",
"common-base",
"common-error",
"common-meta",
"datanode",
"flow",
"frontend",
"meta-srv",
"serde",
"snafu 0.8.6",
"standalone",
]
[[package]]
@@ -12310,6 +12312,39 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "standalone"
version = "0.18.0"
dependencies = [
"async-trait",
"catalog",
"client",
"common-base",
"common-config",
"common-error",
"common-macro",
"common-meta",
"common-options",
"common-procedure",
"common-query",
"common-telemetry",
"common-time",
"common-version",
"common-wal",
"datanode",
"file-engine",
"flow",
"frontend",
"log-store",
"mito2",
"query",
"serde",
"servers",
"snafu 0.8.6",
"store-api",
"tokio",
]
[[package]]
name = "stat"
version = "0.18.0"
@@ -13025,6 +13060,7 @@ dependencies = [
"snafu 0.8.6",
"sql",
"sqlx",
"standalone",
"store-api",
"substrait 0.18.0",
"table",


@@ -61,6 +61,7 @@ members = [
"src/promql",
"src/puffin",
"src/query",
"src/standalone",
"src/servers",
"src/session",
"src/sql",
@@ -308,6 +309,7 @@ query = { path = "src/query" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
standalone = { path = "src/standalone" }
stat = { path = "src/common/stat" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }


@@ -54,7 +54,6 @@ common-wal.workspace = true
datanode.workspace = true
datatypes.workspace = true
etcd-client.workspace = true
file-engine.workspace = true
flow.workspace = true
frontend = { workspace = true, default-features = false }
futures.workspace = true
@@ -64,7 +63,6 @@ lazy_static.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
metric-engine.workspace = true
mito2.workspace = true
moka.workspace = true
nu-ansi-term = "0.46"
object-store.workspace = true
@@ -75,6 +73,7 @@ query.workspace = true
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
standalone.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
@@ -100,6 +99,8 @@ common-version.workspace = true
serde.workspace = true
temp-env = "0.3"
tempfile.workspace = true
file-engine.workspace = true
mito2.workspace = true
[target.'cfg(not(windows))'.dev-dependencies]
rexpect = "0.5"


@@ -302,6 +302,20 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to build metadata kvbackend"))]
BuildMetadataKvbackend {
#[snafu(implicit)]
location: Location,
source: standalone::error::Error,
},
#[snafu(display("Failed to setup standalone plugins"))]
SetupStandalonePlugins {
#[snafu(implicit)]
location: Location,
source: standalone::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -320,6 +334,8 @@ impl ErrorExt for Error {
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::BuildCli { source, .. } => source.status_code(),
Error::StartCli { source, .. } => source.status_code(),
Error::BuildMetadataKvbackend { source, .. } => source.status_code(),
Error::SetupStandalonePlugins { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()


@@ -19,71 +19,47 @@ use std::{fs, path};
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::SendableRecordBatchStream;
use client::api::v1::meta::RegionRole;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{Configurable, KvBackendConfig, metadata_store_dir};
use common_config::{Configurable, metadata_store_dir};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::datanode::RegionStat;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_options::memory::MemoryOptions;
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_query::request::QueryRequest;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::{
DEFAULT_LOGGING_DIR, LoggingOptions, SlowQueryOptions, TracingOptions,
};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{
FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
GrpcQueryHandlerWithBoxedError,
};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::frontend::Frontend;
use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
use frontend::server::Services;
use frontend::service_config::{
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::RwLock;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{Result, StartFlownodeSnafu};
@@ -133,131 +109,6 @@ impl SubCommand {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
pub enable_telemetry: bool,
pub default_timezone: Option<String>,
pub http: HttpOptions,
pub grpc: GrpcOptions,
pub mysql: MysqlOptions,
pub postgres: PostgresOptions,
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub jaeger: JaegerOptions,
pub prom_store: PromStoreOptions,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
pub flow: FlowConfig,
pub logging: LoggingOptions,
pub user_provider: Option<String>,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: SlowQueryOptions,
pub query: QueryOptions,
pub memory: MemoryOptions,
}
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
enable_telemetry: true,
default_timezone: None,
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
mysql: MysqlOptions::default(),
postgres: PostgresOptions::default(),
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
jaeger: JaegerOptions::default(),
prom_store: PromStoreOptions::default(),
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
flow: FlowConfig::default(),
logging: LoggingOptions::default(),
export_metrics: ExportMetricsOption::default(),
user_provider: None,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
],
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
slow_query: SlowQueryOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
}
}
}
impl Configurable for StandaloneOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
}
/// The [`StandaloneOptions`] is only defined in the cmd crate;
/// we don't want to make `frontend` depend on it, so we impl [`Into`]
/// rather than [`From`].
#[allow(clippy::from_over_into)]
impl Into<FrontendOptions> for StandaloneOptions {
fn into(self) -> FrontendOptions {
self.frontend_options()
}
}
impl StandaloneOptions {
pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone();
FrontendOptions {
default_timezone: cloned_opts.default_timezone,
http: cloned_opts.http,
grpc: cloned_opts.grpc,
mysql: cloned_opts.mysql,
postgres: cloned_opts.postgres,
opentsdb: cloned_opts.opentsdb,
influxdb: cloned_opts.influxdb,
jaeger: cloned_opts.jaeger,
prom_store: cloned_opts.prom_store,
meta_client: None,
logging: cloned_opts.logging,
user_provider: cloned_opts.user_provider,
// In standalone mode, hand the export metrics task over to the frontend for execution
export_metrics: cloned_opts.export_metrics,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
slow_query: cloned_opts.slow_query,
..Default::default()
}
}
pub fn datanode_options(&self) -> DatanodeOptions {
let cloned_opts = self.clone();
DatanodeOptions {
node_id: Some(0),
enable_telemetry: cloned_opts.enable_telemetry,
wal: cloned_opts.wal,
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
grpc: cloned_opts.grpc,
init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism,
query: cloned_opts.query,
..Default::default()
}
}
}
pub struct Instance {
datanode: Datanode,
frontend: Frontend,
@@ -523,13 +374,14 @@ impl StartCommand {
.context(error::CreateDirSnafu { dir: data_home })?;
let metadata_dir = metadata_store_dir(data_home);
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
metadata_dir,
opts.metadata_store,
opts.procedure,
)
let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
.context(error::BuildMetadataKvbackendSnafu)?;
let procedure_manager =
standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);
plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
.await
.context(error::StartFrontendSnafu)?;
.context(error::SetupStandalonePluginsSnafu)?;
// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default();
@@ -745,141 +597,6 @@ impl StartCommand {
}
}
pub struct StandaloneInformationExtension {
region_server: RegionServer,
procedure_manager: ProcedureManagerRef,
start_time_ms: u64,
flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}
impl StandaloneInformationExtension {
pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
Self {
region_server,
procedure_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
flow_streaming_engine: RwLock::new(None),
}
}
/// Set the flow streaming engine for the standalone instance.
pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
let mut guard = self.flow_streaming_engine.write().await;
*guard = Some(flow_streaming_engine);
}
}
#[async_trait::async_trait]
impl InformationExtension for StandaloneInformationExtension {
type Error = catalog::error::Error;
async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
let build_info = common_version::build_info();
let node_info = NodeInfo {
// For standalone mode:
// - id is always 0
// - peer_addr is an empty string
peer: Peer {
id: 0,
addr: "".to_string(),
},
last_activity_ts: -1,
status: NodeStatus::Standalone,
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
// Use `self.start_time_ms` instead.
// It's not precise, but good enough.
start_time_ms: self.start_time_ms,
cpus: common_config::utils::get_cpus() as u32,
memory_bytes: common_config::utils::get_sys_total_memory()
.unwrap_or_default()
.as_bytes(),
};
Ok(vec![node_info])
}
async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
self.procedure_manager
.list_procedures()
.await
.map_err(BoxedError::new)
.map(|procedures| {
procedures
.into_iter()
.map(|procedure| {
let status = procedure.state.as_str_name().to_string();
(status, procedure)
})
.collect::<Vec<_>>()
})
.context(catalog::error::ListProceduresSnafu)
}
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
let stats = self
.region_server
.reportable_regions()
.into_iter()
.map(|stat| {
let region_stat = self
.region_server
.region_statistic(stat.region_id)
.unwrap_or_default();
RegionStat {
id: stat.region_id,
rcus: 0,
wcus: 0,
approximate_bytes: region_stat.estimated_disk_size(),
engine: stat.engine,
role: RegionRole::from(stat.role).into(),
num_rows: region_stat.num_rows,
memtable_size: region_stat.memtable_size,
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
sst_num: region_stat.sst_num,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
written_bytes: region_stat.written_bytes,
}
})
.collect::<Vec<_>>();
Ok(stats)
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(Some(
self.flow_streaming_engine
.read()
.await
.as_ref()
.unwrap()
.gen_state_report()
.await,
))
}
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
let req = QueryRequest {
plan: request
.build_plan()
.context(catalog::error::DatafusionSnafu)?,
region_id: RegionId::default(),
header: None,
};
self.region_server
.handle_read(req)
.await
.map_err(BoxedError::new)
.context(catalog::error::InternalSnafu)
}
}
#[cfg(test)]
mod tests {
use std::default::Default;
@@ -891,7 +608,9 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use frontend::frontend::FrontendOptions;
use object_store::config::{FileConfig, GcsConfig};
use servers::grpc::GrpcOptions;
use super::*;
use crate::options::GlobalOptions;


@@ -15,7 +15,6 @@
use std::time::Duration;
use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions};
@@ -35,6 +34,7 @@ use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use standalone::options::StandaloneOptions;
use store_api::path_utils::WAL_DIR;
#[allow(deprecated)]


@@ -23,12 +23,12 @@ use common_telemetry::info;
use file::{Metadata, MetadataContent};
use futures::{TryStreamExt, future};
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt, ensure};
use strum::Display;
use crate::error::{
Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
Result, WriteObjectSnafu,
Result, UnexpectedSnafu, WriteObjectSnafu,
};
use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
use crate::kv_backend::KvBackendRef;
@@ -247,6 +247,20 @@ impl MetadataSnapshotManager {
let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
})?;
// Ensure the dump file does not already exist
ensure!(
!self
.object_store
.exists(file_path)
.await
.context(ReadObjectSnafu { file_path })?,
UnexpectedSnafu {
err_msg: format!(
"The file '{}' already exists. Please choose a different name or remove the existing file before proceeding.",
file_path
),
}
);
let now = Instant::now();
let req = RangeRequest::new().with_range(vec![0], vec![0]);
let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
@@ -403,6 +417,15 @@ mod tests {
.unwrap();
// Clean up the kv backend
kv_backend.clear();
let err = manager
.dump(
&dump_path.as_path().display().to_string(),
"metadata_snapshot",
)
.await
.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
assert!(err.to_string().contains("already exists"));
let restore_path = dump_path
.join("metadata_snapshot.metadata.fb")


@@ -50,7 +50,6 @@ humantime.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
num_cpus.workspace = true
opentelemetry-proto.workspace = true


@@ -36,13 +36,6 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to open raft engine backend"))]
OpenRaftEngineBackend {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to handle heartbeat response"))]
HandleHeartbeatResponse {
#[snafu(implicit)]
@@ -420,8 +413,6 @@ impl ErrorExt for Error {
Error::Table { source, .. } | Error::Insert { source, .. } => source.status_code(),
Error::OpenRaftEngineBackend { .. } => StatusCode::StorageUnavailable,
Error::RequestQuery { source, .. } => source.status_code(),
Error::CacheRequired { .. } => StatusCode::Internal,


@@ -39,30 +39,22 @@ use catalog::process_manager::{
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::table_name::TableNameKey;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_meta::state_store::KvStateStore;
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, info, tracing};
use common_telemetry::{debug, error, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
@@ -136,40 +128,6 @@ pub struct Instance {
}
impl Instance {
pub async fn try_build_standalone_components(
dir: String,
kv_backend_config: KvBackendConfig,
procedure_config: ProcedureConfig,
) -> Result<(KvBackendRef, ProcedureManagerRef)> {
info!(
"Creating metadata kvbackend with config: {:?}",
kv_backend_config
);
let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
.map_err(BoxedError::new)
.context(error::OpenRaftEngineBackendSnafu)?;
let kv_backend = Arc::new(kv_backend);
let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
max_running_procedures: procedure_config.max_running_procedures,
..Default::default()
};
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager),
None,
));
Ok((kv_backend, procedure_manager))
}
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}


@@ -18,6 +18,7 @@ use std::sync::Arc;
use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use meta_client::MetaClientOptions;
use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
@@ -25,6 +26,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer};
use servers::http::event::LogValidatorRef;
use servers::http::utils::router::RouterConfigurator;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef;
use servers::metrics_handler::MetricsHandler;
@@ -115,6 +117,11 @@ where
builder = builder.with_jaeger_handler(self.instance.clone());
}
if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
info!("Adding extra router from plugins");
builder = builder.with_extra_router(configurator.router());
}
builder
}


@@ -13,9 +13,11 @@ clap.workspace = true
cli.workspace = true
common-base.workspace = true
common-error.workspace = true
common-meta.workspace = true
datanode.workspace = true
flow.workspace = true
frontend.workspace = true
meta-srv.workspace = true
serde.workspace = true
snafu.workspace = true
standalone.workspace = true


@@ -18,6 +18,7 @@ mod flownode;
mod frontend;
mod meta_srv;
mod options;
mod standalone;
pub use cli::SubCommand;
pub use datanode::{setup_datanode_plugins, start_datanode_plugins};
@@ -25,3 +26,4 @@ pub use flownode::{setup_flownode_plugins, start_flownode_plugins};
pub use frontend::{setup_frontend_plugins, start_frontend_plugins};
pub use meta_srv::{setup_metasrv_plugins, start_metasrv_plugins};
pub use options::PluginOptions;
pub use standalone::{setup_standalone_plugins, start_standalone_plugins};


@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::Plugins;
use common_meta::kv_backend::KvBackendRef;
use standalone::error::Result;
use standalone::options::StandaloneOptions;
use crate::options::PluginOptions;
#[allow(unused_variables)]
#[allow(unused_mut)]
pub async fn setup_standalone_plugins(
plugins: &mut Plugins,
plugin_options: &[PluginOptions],
standalone_opts: &StandaloneOptions,
metadata_kvbackend: KvBackendRef,
) -> Result<()> {
Ok(())
}
pub async fn start_standalone_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}


@@ -103,6 +103,7 @@ pub mod prom_store;
pub mod prometheus;
pub mod result;
mod timeout;
pub mod utils;
pub(crate) use timeout::DynamicTimeoutLayer;


@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod router;


@@ -0,0 +1,37 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Mutex};
use axum::Router;
/// A thread-safe wrapper for an Axum [`Router`] that allows for dynamic configuration.
#[derive(Default, Clone)]
pub struct RouterConfigurator(Arc<Mutex<Router>>);
impl RouterConfigurator {
/// Returns a clone of the current [`Router`].
pub fn router(&self) -> Router {
self.0.lock().unwrap().clone()
}
/// Applies a configuration function to the current [`Router`].
///
/// The provided closure receives the current router (cloned) and should return a new [`Router`].
/// The internal router is then replaced with the result.
pub fn configure(&self, f: impl FnOnce(Router) -> Router) {
let mut router = self.0.lock().unwrap();
*router = f(router.clone());
}
}
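To use this from a plugin, create a `RouterConfigurator`, add routes to it, and store it in `Plugins`; the frontend builder shown above then fetches it with `plugins.get::<RouterConfigurator>()` and merges the result via `with_extra_router`. A minimal sketch, assuming `Plugins::insert` is available and using a purely illustrative `/my-plugin/ping` route:

use common_base::Plugins;
use servers::http::utils::router::RouterConfigurator;

fn register_plugin_routes(plugins: &Plugins) {
    let configurator = RouterConfigurator::default();
    // `configure` receives the current router and must return the new one.
    configurator.configure(|router| {
        router.route("/my-plugin/ping", axum::routing::get(|| async { "pong" }))
    });
    plugins.insert(configurator);
}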

src/standalone/Cargo.toml (new file)

@@ -0,0 +1,37 @@
[package]
name = "standalone"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
async-trait.workspace = true
catalog.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
datanode.workspace = true
file-engine.workspace = true
flow.workspace = true
frontend.workspace = true
log-store.workspace = true
mito2.workspace = true
query.workspace = true
serde.workspace = true
servers.workspace = true
snafu.workspace = true
store-api.workspace = true
tokio.workspace = true


@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to open metadata kvbackend"))]
OpenMetadataKvBackend {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::OpenMetadataKvBackend { source, .. } => source.status_code(),
Error::External { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
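The new `External` variant gives plugin and embedder code a catch-all for surfacing their own failures through this error type while preserving the status code. A minimal sketch, where `try_init_my_plugin` is a hypothetical initializer whose error type implements `ErrorExt`:

use common_error::ext::BoxedError;
use snafu::ResultExt;
use standalone::error::{ExternalSnafu, Result};

fn init_plugin() -> Result<()> {
    // `try_init_my_plugin` is hypothetical; any `ErrorExt` error can be
    // boxed and wrapped into `Error::External` the same way.
    try_init_my_plugin()
        .map_err(BoxedError::new)
        .context(ExternalSnafu)
}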


@@ -0,0 +1,166 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
use client::SendableRecordBatchStream;
use client::api::v1::meta::RegionRole;
use common_error::ext::BoxedError;
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::peer::Peer;
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_query::request::QueryRequest;
use datanode::region_server::RegionServer;
use flow::StreamingEngine;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::RwLock;
pub struct StandaloneInformationExtension {
region_server: RegionServer,
procedure_manager: ProcedureManagerRef,
start_time_ms: u64,
flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}
impl StandaloneInformationExtension {
pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
Self {
region_server,
procedure_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
flow_streaming_engine: RwLock::new(None),
}
}
/// Set the flow streaming engine for the standalone instance.
pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
let mut guard = self.flow_streaming_engine.write().await;
*guard = Some(flow_streaming_engine);
}
}
#[async_trait::async_trait]
impl InformationExtension for StandaloneInformationExtension {
type Error = catalog::error::Error;
async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
let build_info = common_version::build_info();
let node_info = NodeInfo {
// For standalone mode:
// - id is always 0
// - peer_addr is an empty string
peer: Peer {
id: 0,
addr: "".to_string(),
},
last_activity_ts: -1,
status: NodeStatus::Standalone,
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
// Use `self.start_time_ms` instead.
// It's not precise, but good enough.
start_time_ms: self.start_time_ms,
cpus: common_config::utils::get_cpus() as u32,
memory_bytes: common_config::utils::get_sys_total_memory()
.unwrap_or_default()
.as_bytes(),
};
Ok(vec![node_info])
}
async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
self.procedure_manager
.list_procedures()
.await
.map_err(BoxedError::new)
.map(|procedures| {
procedures
.into_iter()
.map(|procedure| {
let status = procedure.state.as_str_name().to_string();
(status, procedure)
})
.collect::<Vec<_>>()
})
.context(catalog::error::ListProceduresSnafu)
}
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
let stats = self
.region_server
.reportable_regions()
.into_iter()
.map(|stat| {
let region_stat = self
.region_server
.region_statistic(stat.region_id)
.unwrap_or_default();
RegionStat {
id: stat.region_id,
rcus: 0,
wcus: 0,
approximate_bytes: region_stat.estimated_disk_size(),
engine: stat.engine,
role: RegionRole::from(stat.role).into(),
num_rows: region_stat.num_rows,
memtable_size: region_stat.memtable_size,
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
sst_num: region_stat.sst_num,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
written_bytes: region_stat.written_bytes,
}
})
.collect::<Vec<_>>();
Ok(stats)
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(Some(
self.flow_streaming_engine
.read()
.await
.as_ref()
.unwrap()
.gen_state_report()
.await,
))
}
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
let req = QueryRequest {
plan: request
.build_plan()
.context(catalog::error::DatafusionSnafu)?,
region_id: RegionId::default(),
header: None,
};
self.region_server
.handle_read(req)
.await
.map_err(BoxedError::new)
.context(catalog::error::InternalSnafu)
}
}

src/standalone/src/lib.rs (new file)

@@ -0,0 +1,23 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod error;
pub mod information_extension;
pub mod metadata;
pub mod options;
pub mod procedure;
pub use information_extension::StandaloneInformationExtension;
pub use metadata::build_metadata_kvbackend;
pub use procedure::build_procedure_manager;


@@ -0,0 +1,37 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_config::KvBackendConfig;
use common_error::ext::BoxedError;
use common_meta::kv_backend::KvBackendRef;
use common_telemetry::info;
use log_store::raft_engine::RaftEngineBackend;
use snafu::ResultExt;
use crate::error::{OpenMetadataKvBackendSnafu, Result};
/// Builds the metadata kvbackend.
pub fn build_metadata_kvbackend(dir: String, config: KvBackendConfig) -> Result<KvBackendRef> {
info!(
"Creating metadata kvbackend with dir: {}, config: {:?}",
dir, config
);
let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &config)
.map_err(BoxedError::new)
.context(OpenMetadataKvBackendSnafu)?;
Ok(Arc::new(kv_backend))
}


@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, KvBackendConfig};
use common_options::memory::MemoryOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlowConfig;
use frontend::frontend::FrontendOptions;
use frontend::service_config::{
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
PromStoreOptions,
};
use mito2::config::MitoConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
pub enable_telemetry: bool,
pub default_timezone: Option<String>,
pub http: HttpOptions,
pub grpc: GrpcOptions,
pub mysql: MysqlOptions,
pub postgres: PostgresOptions,
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub jaeger: JaegerOptions,
pub prom_store: PromStoreOptions,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
pub flow: FlowConfig,
pub logging: LoggingOptions,
pub user_provider: Option<String>,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: SlowQueryOptions,
pub query: QueryOptions,
pub memory: MemoryOptions,
}
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
enable_telemetry: true,
default_timezone: None,
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
mysql: MysqlOptions::default(),
postgres: PostgresOptions::default(),
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
jaeger: JaegerOptions::default(),
prom_store: PromStoreOptions::default(),
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
flow: FlowConfig::default(),
logging: LoggingOptions::default(),
export_metrics: ExportMetricsOption::default(),
user_provider: None,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
],
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
slow_query: SlowQueryOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
}
}
}
impl Configurable for StandaloneOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
}
/// The [`StandaloneOptions`] is only defined in the `standalone` crate;
/// we don't want to make `frontend` depend on it, so we impl [`Into`]
/// rather than [`From`].
#[allow(clippy::from_over_into)]
impl Into<FrontendOptions> for StandaloneOptions {
fn into(self) -> FrontendOptions {
self.frontend_options()
}
}
impl StandaloneOptions {
pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone();
FrontendOptions {
default_timezone: cloned_opts.default_timezone,
http: cloned_opts.http,
grpc: cloned_opts.grpc,
mysql: cloned_opts.mysql,
postgres: cloned_opts.postgres,
opentsdb: cloned_opts.opentsdb,
influxdb: cloned_opts.influxdb,
jaeger: cloned_opts.jaeger,
prom_store: cloned_opts.prom_store,
meta_client: None,
logging: cloned_opts.logging,
user_provider: cloned_opts.user_provider,
// In standalone mode, hand the export metrics task over to the frontend for execution
export_metrics: cloned_opts.export_metrics,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
slow_query: cloned_opts.slow_query,
..Default::default()
}
}
pub fn datanode_options(&self) -> DatanodeOptions {
let cloned_opts = self.clone();
DatanodeOptions {
node_id: Some(0),
enable_telemetry: cloned_opts.enable_telemetry,
wal: cloned_opts.wal,
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
grpc: cloned_opts.grpc,
init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism,
query: cloned_opts.query,
..Default::default()
}
}
}
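A single parsed `StandaloneOptions` value thus fans out into the per-component configurations. A minimal sketch of the conversions:

use frontend::frontend::FrontendOptions;
use standalone::options::StandaloneOptions;

fn main() {
    let opts = StandaloneOptions::default();
    let frontend_opts: FrontendOptions = opts.frontend_options();
    let datanode_opts = opts.datanode_options();
    // Shared settings such as the gRPC options are configured once on
    // `opts` and propagated to both views.
    assert_eq!(frontend_opts.grpc, datanode_opts.grpc);
}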


@@ -0,0 +1,45 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
/// Builds the procedure manager.
pub fn build_procedure_manager(
kv_backend: KvBackendRef,
procedure_config: ProcedureConfig,
) -> ProcedureManagerRef {
let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
max_running_procedures: procedure_config.max_running_procedures,
..Default::default()
};
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend));
Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager),
None,
))
}
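Together with `build_metadata_kvbackend`, this mirrors the standalone start path above: the kv backend is built first, then shared with the procedure manager. A minimal sketch (the metadata directory is illustrative):

use common_config::KvBackendConfig;
use common_procedure::options::ProcedureConfig;
use standalone::error::Result;

fn build_metadata_components() -> Result<()> {
    let kv_backend = standalone::build_metadata_kvbackend(
        "/tmp/greptimedb/metadata".to_string(),
        KvBackendConfig::default(),
    )?;
    // The backend is typically reused elsewhere (cache registry, catalog
    // manager), hence the clone.
    let _procedure_manager =
        standalone::build_procedure_manager(kv_backend.clone(), ProcedureConfig::default());
    Ok(())
}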


@@ -77,6 +77,7 @@ sqlx = { version = "0.8", features = [
"postgres",
"chrono",
] }
standalone.workspace = true
substrait.workspace = true
table.workspace = true
tempfile.workspace = true


@@ -23,7 +23,6 @@ use catalog::information_schema::NoopInformationExtension;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use cmd::error::StartFlownodeSnafu;
use cmd::standalone::StandaloneOptions;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::KvBackendConfig;
@@ -53,6 +52,7 @@ use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::grpc::GrpcOptions;
use servers::server::ServerHandlers;
use snafu::ResultExt;
use standalone::options::StandaloneOptions;
use crate::test_util::{self, StorageType, TestGuard, create_tmp_dir_and_datanode_opts};
@@ -315,13 +315,14 @@ impl GreptimeDbStandaloneBuilder {
let kv_backend_config = KvBackendConfig::default();
let procedure_config = ProcedureConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
let kv_backend = standalone::build_metadata_kvbackend(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config,
procedure_config,
)
.await
.unwrap();
let procedure_manager =
standalone::build_procedure_manager(kv_backend.clone(), procedure_config);
let standalone_opts = StandaloneOptions {
storage: opts.storage,