chore: add http metrics server in datanode node when greptime start in distributed mode (#1256)

* chore: add http metrics server in datanode node when greptime start in distributed mode

* chore: add some docs and license

* chore: change metrics_addr to resolve address already in use error

* chore add metrics for meta service

* chore: replace metrics exporter http server from hyper to axum

* chore: format

* fix: datanode mode branching error

* fix: sqlness test address already in use and start metrics in defualt config

* chore: change metrics location

* chore: use builder pattern to builder httpserver

* chore: remove useless debug_assert macro in httpserver builder

* chore: resolve conflicting build error

* chore: format code
This commit is contained in:
localhost
2023-03-31 18:37:52 +08:00
committed by GitHub
parent 972f64c3d7
commit a2b262ebc0
34 changed files with 342 additions and 151 deletions

1
Cargo.lock generated
View File

@@ -4136,6 +4136,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"servers",
"snafu",
"table",
"tokio",

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{
@@ -86,6 +88,10 @@ struct StartCommand {
wal_dir: Option<String>,
#[clap(long)]
procedure_dir: Option<String>,
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
http_timeout: Option<u64>,
}
impl StartCommand {
@@ -155,6 +161,12 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}
if let Some(http_addr) = cmd.http_addr {
opts.http_opts.addr = http_addr
}
if let Some(http_timeout) = cmd.http_timeout {
opts.http_opts.timeout = Duration::from_secs(http_timeout)
}
Ok(opts)
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use clap::Parser;
use common_telemetry::{info, logging, warn};
use meta_srv::bootstrap::MetaSrvInstance;
@@ -80,6 +82,10 @@ struct StartCommand {
selector: Option<String>,
#[clap(long)]
use_memory_store: bool,
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
http_timeout: Option<u64>,
}
impl StartCommand {
@@ -128,6 +134,13 @@ impl TryFrom<StartCommand> for MetaSrvOptions {
opts.use_memory_store = true;
}
if let Some(http_addr) = cmd.http_addr {
opts.http_opts.addr = http_addr;
}
if let Some(http_timeout) = cmd.http_timeout {
opts.http_opts.timeout = Duration::from_secs(http_timeout);
}
Ok(opts)
}
}
@@ -150,6 +163,8 @@ mod tests {
config_file: None,
selector: Some("LoadBased".to_string()),
use_memory_store: false,
http_addr: None,
http_timeout: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
@@ -178,6 +193,8 @@ mod tests {
selector: None,
config_file: Some(file.path().to_str().unwrap().to_string()),
use_memory_store: false,
http_addr: None,
http_timeout: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);

View File

@@ -14,7 +14,7 @@
pub mod error;
mod global;
pub mod metric;
mod metrics;
mod repeated_task;
pub mod runtime;

View File

@@ -24,7 +24,7 @@ use tokio::sync::oneshot;
pub use tokio::task::{JoinError, JoinHandle};
use crate::error::*;
use crate::metric::*;
use crate::metrics::*;
/// A runtime to run future tasks
#[derive(Clone, Debug)]

View File

@@ -12,7 +12,7 @@ deadlock_detection = ["parking_lot"]
backtrace = "0.3"
common-error = { path = "../error" }
console-subscriber = { version = "0.1", optional = true }
metrics = "0.20"
metrics = "0.20.1"
metrics-exporter-prometheus = { version = "0.11", default-features = false }
once_cell = "1.10"
opentelemetry = { version = "0.17", default-features = false, features = [

View File

@@ -19,6 +19,7 @@ use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::SchedulerConfig;
@@ -224,6 +225,7 @@ pub struct DatanodeOptions {
pub rpc_runtime_size: usize,
pub mysql_addr: String,
pub mysql_runtime_size: usize,
pub http_opts: HttpOptions,
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
@@ -241,6 +243,7 @@ impl Default for DatanodeOptions {
rpc_runtime_size: 8,
mysql_addr: "127.0.0.1:4406".to_string(),
mysql_runtime_size: 2,
http_opts: HttpOptions::default(),
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
@@ -252,14 +255,17 @@ impl Default for DatanodeOptions {
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
services: Services,
services: Option<Services>,
instance: InstanceRef,
}
impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::new(&opts).await?);
let services = Services::try_new(instance.clone(), &opts).await?;
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
Mode::Standalone => None,
};
Ok(Self {
opts,
services,
@@ -280,7 +286,11 @@ impl Datanode {
/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
self.services.start(&self.opts).await
if let Some(service) = self.services.as_mut() {
service.start(&self.opts).await
} else {
Ok(())
}
}
pub fn get_instance(&self) -> InstanceRef {
@@ -292,7 +302,11 @@ impl Datanode {
}
async fn shutdown_services(&self) -> Result<()> {
self.services.shutdown().await
if let Some(service) = self.services.as_ref() {
service.shutdown().await
} else {
Ok(())
}
}
pub async fn shutdown(&self) -> Result<()> {

View File

@@ -20,7 +20,7 @@ use common_telemetry::timer;
use servers::query_handler::ScriptHandler;
use crate::instance::Instance;
use crate::metric;
use crate::metrics;
#[async_trait]
impl ScriptHandler for Instance {
@@ -30,7 +30,7 @@ impl ScriptHandler for Instance {
name: &str,
script: &str,
) -> servers::error::Result<()> {
let _timer = timer!(metric::METRIC_HANDLE_SCRIPTS_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED);
self.script_executor
.insert_script(schema, name, script)
.await
@@ -42,7 +42,7 @@ impl ScriptHandler for Instance {
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_RUN_SCRIPT_ELAPSED);
let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED);
self.script_executor
.execute_script(schema, name, params)
.await

View File

@@ -37,7 +37,7 @@ use crate::error::{
TableIdProviderNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metric;
use crate::metrics;
use crate::sql::{SqlHandler, SqlRequest};
impl Instance {
@@ -190,7 +190,7 @@ impl Instance {
promql: &PromQuery,
query_ctx: QueryContextRef,
) -> Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED);
let stmt = QueryLanguageParser::parse_promql(promql).context(ExecuteSqlSnafu)?;
@@ -294,7 +294,7 @@ impl StatementHandler for Instance {
#[async_trait]
impl PromHandler for Instance {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED);
self.execute_promql(query, QueryContext::arc())
.await

View File

@@ -19,7 +19,7 @@ pub mod datanode;
pub mod error;
mod heartbeat;
pub mod instance;
pub mod metric;
pub mod metrics;
mod mock;
mod script;
pub mod server;

View File

@@ -18,9 +18,12 @@ use std::sync::Arc;
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::server::Server;
use snafu::ResultExt;
use tokio::select;
use crate::datanode::DatanodeOptions;
use crate::error::{
@@ -33,6 +36,7 @@ pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
http_server: HttpServer,
}
impl Services {
@@ -51,6 +55,9 @@ impl Services {
None,
grpc_runtime,
),
http_server: HttpServerBuilder::new(opts.http_opts.clone())
.with_metrics_handler(MetricsHandler)
.build(),
})
}
@@ -58,10 +65,15 @@ impl Services {
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
self.grpc_server
.start(grpc_addr)
.await
.context(StartServerSnafu)?;
let http_addr = opts.http_opts.addr.parse().context(ParseAddrSnafu {
addr: &opts.http_opts.addr,
})?;
let grpc = self.grpc_server.start(grpc_addr);
let http = self.http_server.start(http_addr);
select!(
v = grpc => v.context(StartServerSnafu)?,
v = http => v.context(StartServerSnafu)?,
);
Ok(())
}
@@ -69,6 +81,11 @@ impl Services {
self.grpc_server
.shutdown()
.await
.context(ShutdownServerSnafu)
.context(ShutdownServerSnafu)?;
self.http_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
}

View File

@@ -40,7 +40,7 @@ use common_telemetry::timer;
use datafusion::sql::sqlparser::ast::ObjectName;
use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use datanode::metric;
use datanode::metrics;
use datatypes::schema::Schema;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
@@ -532,7 +532,7 @@ impl SqlQueryHandler for Instance {
type Error = Error;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_SQL_ELAPSED);
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {

View File

@@ -22,7 +22,7 @@ use common_telemetry::info;
use servers::auth::UserProviderRef;
use servers::error::Error::InternalIo;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::http::HttpServerBuilder;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
@@ -150,33 +150,33 @@ impl Services {
if let Some(http_options) = &opts.http_options {
let http_addr = parse_addr(&http_options.addr)?;
let mut http_server = HttpServer::new(
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
http_options.clone(),
);
let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
http_server_builder
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()));
if let Some(user_provider) = user_provider.clone() {
http_server.set_user_provider(user_provider);
http_server_builder.with_user_provider(user_provider);
}
if set_opentsdb_handler {
http_server.set_opentsdb_handler(instance.clone());
http_server_builder.with_opentsdb_handler(instance.clone());
}
if matches!(
opts.influxdb_options,
Some(InfluxdbOptions { enable: true })
) {
http_server.set_influxdb_handler(instance.clone());
http_server_builder.with_influxdb_handler(instance.clone());
}
if matches!(
opts.prometheus_options,
Some(PrometheusOptions { enable: true })
) {
http_server.set_prom_handler(instance.clone());
http_server_builder.with_prom_handler(instance.clone());
}
http_server.set_script_handler(instance.clone());
http_server_builder.with_script_handler(instance.clone());
let http_server = http_server_builder.build();
result.push((Box::new(http_server), http_addr));
}

View File

@@ -41,6 +41,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic.workspace = true
tower = "0.4"
url = "2.3"
servers = { path = "../servers" }
[dev-dependencies]
tracing = "0.1"

View File

@@ -20,8 +20,12 @@ use api::v1::meta::lock_server::LockServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use etcd_client::Client;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Router;
@@ -44,6 +48,8 @@ use crate::{error, Result};
pub struct MetaSrvInstance {
meta_srv: MetaSrv,
http_srv: Arc<HttpServer>,
opts: MetaSrvOptions,
signal_sender: Option<Sender<()>>,
@@ -52,9 +58,14 @@ pub struct MetaSrvInstance {
impl MetaSrvInstance {
pub async fn new(opts: MetaSrvOptions) -> Result<MetaSrvInstance> {
let meta_srv = build_meta_srv(&opts).await?;
let http_srv = Arc::new(
HttpServerBuilder::new(opts.http_opts.clone())
.with_metrics_handler(MetricsHandler)
.build(),
);
Ok(MetaSrvInstance {
meta_srv,
http_srv,
opts,
signal_sender: None,
})
@@ -67,12 +78,24 @@ impl MetaSrvInstance {
self.signal_sender = Some(tx);
bootstrap_meta_srv_with_router(
let meta_srv = bootstrap_meta_srv_with_router(
&self.opts.bind_addr,
router(self.meta_srv.clone()),
&mut rx,
)
.await?;
);
let addr = self
.opts
.http_opts
.addr
.parse()
.context(error::ParseAddrSnafu {
addr: &self.opts.http_opts.addr,
})?;
let http_srv = self.http_srv.start(addr);
select! {
v = meta_srv => v?,
v = http_srv => v.map(|_| ()).context(error::StartMetricsExportSnafu)?,
}
Ok(())
}
@@ -86,7 +109,12 @@ impl MetaSrvInstance {
}
self.meta_srv.shutdown();
self.http_srv
.shutdown()
.await
.context(error::ShutdownServerSnafu {
server: self.http_srv.name(),
})?;
Ok(())
}
}

View File

@@ -25,6 +25,13 @@ pub enum Error {
#[snafu(display("Failed to send shutdown signal"))]
SendShutdownSignal { source: SendError<()> },
#[snafu(display("Failed to shutdown {} server, source: {}", server, source))]
ShutdownServer {
#[snafu(backtrace)]
source: servers::error::Error,
server: String,
},
#[snafu(display("Error stream request next is None"))]
StreamNone { backtrace: Backtrace },
@@ -55,7 +62,16 @@ pub enum Error {
source: tonic::transport::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to start gRPC server, source: {}", source))]
StartMetricsExport {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Failed to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
source: std::net::AddrParseError,
},
#[snafu(display("Empty table name"))]
EmptyTableName { backtrace: Backtrace },
@@ -323,6 +339,7 @@ impl ErrorExt for Error {
| Error::LockNotConfig { .. }
| Error::ExceededRetryLimit { .. }
| Error::SendShutdownSignal { .. }
| Error::ParseAddr { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
@@ -348,6 +365,9 @@ impl ErrorExt for Error {
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::MetaInternal { source } => source.status_code(),
Error::RecoverProcedure { source } => source.status_code(),
Error::ShutdownServer { source, .. } | Error::StartMetricsExport { source } => {
source.status_code()
}
}
}
}

View File

@@ -21,6 +21,7 @@ use api::v1::meta::Peer;
use common_procedure::ProcedureManagerRef;
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use snafu::ResultExt;
use tokio::sync::broadcast::error::RecvError;
@@ -44,6 +45,7 @@ pub struct MetaSrvOptions {
pub datanode_lease_secs: i64,
pub selector: SelectorType,
pub use_memory_store: bool,
pub http_opts: HttpOptions,
}
impl Default for MetaSrvOptions {
@@ -55,6 +57,7 @@ impl Default for MetaSrvOptions {
datanode_lease_secs: 15,
selector: SelectorType::default(),
use_memory_store: false,
http_opts: HttpOptions::default(),
}
}
}

View File

@@ -58,7 +58,7 @@ use crate::physical_planner::PhysicalPlanner;
use crate::plan::LogicalPlan;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{QueryEngineContext, QueryEngineState};
use crate::{metric, QueryEngine};
use crate::{metrics, QueryEngine};
pub struct DatafusionQueryEngine {
state: Arc<QueryEngineState>,
@@ -254,7 +254,7 @@ impl QueryEngine for DatafusionQueryEngine {
impl LogicalOptimizer for DatafusionQueryEngine {
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let _timer = timer!(metric::METRIC_OPTIMIZE_LOGICAL_ELAPSED);
let _timer = timer!(metrics::METRIC_OPTIMIZE_LOGICAL_ELAPSED);
match plan {
LogicalPlan::DfPlan(df_plan) => {
let optimized_plan = self
@@ -280,7 +280,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn PhysicalPlan>> {
let _timer = timer!(metric::METRIC_CREATE_PHYSICAL_ELAPSED);
let _timer = timer!(metrics::METRIC_CREATE_PHYSICAL_ELAPSED);
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let state = ctx.state();
@@ -315,7 +315,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
ctx: &mut QueryEngineContext,
plan: Arc<dyn PhysicalPlan>,
) -> Result<Arc<dyn PhysicalPlan>> {
let _timer = timer!(metric::METRIC_OPTIMIZE_PHYSICAL_ELAPSED);
let _timer = timer!(metrics::METRIC_OPTIMIZE_PHYSICAL_ELAPSED);
let mut new_plan = plan
.as_any()
@@ -342,7 +342,7 @@ impl QueryExecutor for DatafusionQueryEngine {
ctx: &QueryEngineContext,
plan: &Arc<dyn PhysicalPlan>,
) -> Result<SendableRecordBatchStream> {
let _timer = timer!(metric::METRIC_EXEC_PLAN_ELAPSED);
let _timer = timer!(metrics::METRIC_EXEC_PLAN_ELAPSED);
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => Ok(plan

View File

@@ -16,7 +16,7 @@ pub mod datafusion;
pub mod error;
pub mod executor;
pub mod logical_optimizer;
mod metric;
mod metrics;
mod optimizer;
pub mod parser;
pub mod physical_optimizer;

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! query engine metrics
pub static METRIC_PARSE_SQL_ELAPSED: &str = "query.parse_sql_elapsed";
pub static METRIC_PARSE_PROMQL_ELAPSED: &str = "query.parse_promql_elapsed";
pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_elapsed";

View File

@@ -28,7 +28,7 @@ use sql::statements::statement::Statement;
use crate::error::{
MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result,
};
use crate::metric::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};
use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};
const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m

View File

@@ -60,6 +60,7 @@ use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
use crate::auth::UserProviderRef;
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
use crate::http::admin::flush;
use crate::metrics_handler::MetricsHandler;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
@@ -99,9 +100,10 @@ pub const HTTP_API_PREFIX: &str = "/v1/";
// TODO(fys): This is a temporary workaround, it will be improved later
pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
#[derive(Default)]
pub struct HttpServer {
sql_handler: ServerSqlQueryHandlerRef,
grpc_handler: ServerGrpcQueryHandlerRef,
sql_handler: Option<ServerSqlQueryHandlerRef>,
grpc_handler: Option<ServerGrpcQueryHandlerRef>,
options: HttpOptions,
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
@@ -109,9 +111,11 @@ pub struct HttpServer {
script_handler: Option<ScriptHandlerRef>,
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
metrics_handler: Option<MetricsHandler>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpOptions {
pub addr: String,
#[serde(with = "humantime_serde")]
@@ -354,65 +358,74 @@ pub struct ApiState {
pub script_handler: Option<ScriptHandlerRef>,
}
impl HttpServer {
pub fn new(
sql_handler: ServerSqlQueryHandlerRef,
grpc_handler: ServerGrpcQueryHandlerRef,
options: HttpOptions,
) -> Self {
#[derive(Default)]
pub struct HttpServerBuilder {
inner: HttpServer,
}
impl HttpServerBuilder {
pub fn new(options: HttpOptions) -> Self {
Self {
sql_handler,
grpc_handler,
options,
opentsdb_handler: None,
influxdb_handler: None,
prom_handler: None,
user_provider: None,
script_handler: None,
shutdown_tx: Mutex::new(None),
inner: HttpServer {
sql_handler: None,
grpc_handler: None,
options,
opentsdb_handler: None,
influxdb_handler: None,
prom_handler: None,
user_provider: None,
script_handler: None,
metrics_handler: None,
shutdown_tx: Mutex::new(None),
},
}
}
pub fn set_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) {
debug_assert!(
self.opentsdb_handler.is_none(),
"OpenTSDB handler can be set only once!"
);
self.opentsdb_handler.get_or_insert(handler);
pub fn with_sql_handler(&mut self, handler: ServerSqlQueryHandlerRef) -> &mut Self {
self.inner.sql_handler.get_or_insert(handler);
self
}
pub fn set_script_handler(&mut self, handler: ScriptHandlerRef) {
debug_assert!(
self.script_handler.is_none(),
"Script handler can be set only once!"
);
self.script_handler.get_or_insert(handler);
pub fn with_grpc_handler(&mut self, handler: ServerGrpcQueryHandlerRef) -> &mut Self {
self.inner.grpc_handler.get_or_insert(handler);
self
}
pub fn set_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) {
debug_assert!(
self.influxdb_handler.is_none(),
"Influxdb line protocol handler can be set only once!"
);
self.influxdb_handler.get_or_insert(handler);
pub fn with_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) -> &mut Self {
self.inner.opentsdb_handler.get_or_insert(handler);
self
}
pub fn set_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) {
debug_assert!(
self.prom_handler.is_none(),
"Prometheus protocol handler can be set only once!"
);
self.prom_handler.get_or_insert(handler);
pub fn with_script_handler(&mut self, handler: ScriptHandlerRef) -> &mut Self {
self.inner.script_handler.get_or_insert(handler);
self
}
pub fn set_user_provider(&mut self, user_provider: UserProviderRef) {
debug_assert!(
self.user_provider.is_none(),
"User provider can be set only once!"
);
self.user_provider.get_or_insert(user_provider);
pub fn with_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) -> &mut Self {
self.inner.influxdb_handler.get_or_insert(handler);
self
}
pub fn with_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) -> &mut Self {
self.inner.prom_handler.get_or_insert(handler);
self
}
pub fn with_user_provider(&mut self, user_provider: UserProviderRef) -> &mut Self {
self.inner.user_provider.get_or_insert(user_provider);
self
}
pub fn with_metrics_handler(&mut self, handler: MetricsHandler) -> &mut Self {
self.inner.metrics_handler.get_or_insert(handler);
self
}
pub fn build(&mut self) -> HttpServer {
std::mem::take(self).inner
}
}
impl HttpServer {
pub fn make_app(&self) -> Router {
let mut api = OpenApi {
info: Info {
@@ -428,19 +441,25 @@ impl HttpServer {
..OpenApi::default()
};
let sql_router = self
.route_sql(ApiState {
sql_handler: self.sql_handler.clone(),
script_handler: self.script_handler.clone(),
})
.finish_api(&mut api)
.layer(Extension(api));
let mut router = Router::new();
let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router);
router = router.nest(
&format!("/{HTTP_API_VERSION}/admin"),
self.route_admin(self.grpc_handler.clone()),
);
if let Some(sql_handler) = self.sql_handler.clone() {
let sql_router = self
.route_sql(ApiState {
sql_handler,
script_handler: self.script_handler.clone(),
})
.finish_api(&mut api)
.layer(Extension(api));
router = router.nest(&format!("/{HTTP_API_VERSION}"), sql_router);
}
if let Some(grpc_handler) = self.grpc_handler.clone() {
router = router.nest(
&format!("/{HTTP_API_VERSION}/admin"),
self.route_admin(grpc_handler.clone()),
);
}
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
router = router.nest(
@@ -472,7 +491,9 @@ impl HttpServer {
);
}
router = router.route("/metrics", routing::get(handler::metrics));
if let Some(metrics_handler) = self.metrics_handler {
router = router.nest("", self.route_metrics(metrics_handler));
}
router = router.route(
"/health",
@@ -498,6 +519,12 @@ impl HttpServer {
)
}
fn route_metrics<S>(&self, metrics_handler: MetricsHandler) -> Router<S> {
Router::new()
.route("/metrics", routing::get(handler::metrics))
.with_state(metrics_handler)
}
fn route_sql<S>(&self, api_state: ApiState) -> ApiRouter<S> {
ApiRouter::new()
.api_route(
@@ -680,8 +707,10 @@ mod test {
let instance = Arc::new(DummyInstance { _tx: tx });
let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone());
let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance);
let server = HttpServer::new(sql_instance, grpc_instance, HttpOptions::default());
let server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(sql_instance)
.with_grpc_handler(grpc_instance)
.build();
server.make_app().route(
"/test/timeout",
get(forever.layer(

View File

@@ -19,13 +19,13 @@ use aide::transform::TransformOperation;
use axum::extract::{Json, Query, State};
use axum::{Extension, Form};
use common_error::status_code::StatusCode;
use common_telemetry::metric;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session::context::UserInfo;
use crate::http::{ApiState, JsonResponse};
use crate::metrics_handler::MetricsHandler;
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct SqlQuery {
@@ -114,12 +114,11 @@ pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation {
/// Handler to export metrics
#[axum_macros::debug_handler]
pub async fn metrics(Query(_params): Query<HashMap<String, String>>) -> String {
if let Some(handle) = metric::try_handle() {
handle.render()
} else {
"Prometheus handle not initialized.".to_owned()
}
pub async fn metrics(
State(state): State<MetricsHandler>,
Query(_params): Query<HashMap<String, String>>,
) -> String {
state.render()
}
#[derive(Debug, Serialize, Deserialize, JsonSchema)]

View File

@@ -25,6 +25,7 @@ pub mod http;
pub mod influxdb;
pub mod interceptor;
pub mod line_writer;
pub mod metrics_handler;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;

View File

@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::metric;
/// a server that serves metrics
/// only start when datanode starts in distributed mode
#[derive(Copy, Clone)]
pub struct MetricsHandler;
impl MetricsHandler {
pub fn render(&self) -> String {
if let Some(handle) = metric::try_handle() {
handle.render()
} else {
"Prometheus handle not initialized.".to_owned()
}
}
}

View File

@@ -20,6 +20,7 @@ use axum::Form;
use common_telemetry::metric;
use metrics::counter;
use servers::http::{handler as http_handler, script as script_handler, ApiState, JsonOutput};
use servers::metrics_handler::MetricsHandler;
use session::context::UserInfo;
use table::test_util::MemTable;
@@ -146,8 +147,8 @@ async fn test_metrics() {
metric::init_default_metrics_recorder();
counter!("test_metrics", 1);
let text = http_handler::metrics(Query(HashMap::default())).await;
let stats = MetricsHandler;
let text = http_handler::metrics(axum::extract::State(stats), Query(HashMap::default())).await;
assert!(text.contains("test_metrics counter"));
}

View File

@@ -14,17 +14,20 @@
use axum::Router;
use axum_test_helper::TestClient;
use servers::http::{HttpOptions, HttpServer};
use servers::http::{HttpOptions, HttpServerBuilder};
use table::test_util::MemTable;
use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler};
fn make_test_app() -> Router {
let server = HttpServer::new(
create_testing_sql_query_handler(MemTable::default_numbers_table()),
create_testing_grpc_query_handler(MemTable::default_numbers_table()),
HttpOptions::default(),
);
let server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(create_testing_sql_query_handler(
MemTable::default_numbers_table(),
))
.with_grpc_handler(create_testing_grpc_query_handler(
MemTable::default_numbers_table(),
))
.build();
server.make_app()
}

View File

@@ -23,7 +23,7 @@ use common_query::Output;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use servers::error::{Error, Result};
use servers::http::{HttpOptions, HttpServer};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
@@ -94,7 +94,9 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default());
let mut server_builder = HttpServerBuilder::new(HttpOptions::default());
server_builder.with_sql_handler(instance.clone());
server_builder.with_grpc_handler(instance.clone());
let mut user_provider = MockUserProvider::default();
if let Some(name) = db_name {
user_provider.set_authorization_info(DatabaseAuthInfo {
@@ -103,9 +105,10 @@ fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>)
username: "greptime",
})
}
server.set_user_provider(Arc::new(user_provider));
server_builder.with_user_provider(Arc::new(user_provider));
server.set_influxdb_handler(instance);
server_builder.with_influxdb_handler(instance);
let server = server_builder.build();
server.make_app()
}

View File

@@ -22,7 +22,7 @@ use common_query::Output;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use servers::error::{self, Result};
use servers::http::{HttpOptions, HttpServer};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
@@ -92,8 +92,11 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default());
server.set_opentsdb_handler(instance);
let server = HttpServerBuilder::new(HttpOptions::default())
.with_grpc_handler(instance.clone())
.with_sql_handler(instance.clone())
.with_opentsdb_handler(instance)
.build();
server.make_app()
}

View File

@@ -26,7 +26,7 @@ use datatypes::schema::Schema;
use prost::Message;
use query::parser::PromQuery;
use servers::error::{Error, Result};
use servers::http::{HttpOptions, HttpServer};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::prometheus;
use servers::prometheus::{snappy_compress, Metrics};
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -117,8 +117,11 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default());
server.set_prom_handler(instance);
let server = HttpServerBuilder::new(HttpOptions::default())
.with_grpc_handler(instance.clone())
.with_sql_handler(instance.clone())
.with_prom_handler(instance)
.build();
server.make_app()
}

View File

@@ -38,7 +38,8 @@ use object_store::ObjectStore;
use once_cell::sync::OnceCell;
use rand::Rng;
use servers::grpc::GrpcServer;
use servers::http::{HttpOptions, HttpServer};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::prom::PromServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
@@ -271,11 +272,13 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
)
.await
.unwrap();
let http_server = HttpServer::new(
ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance.clone()))),
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
HttpOptions::default(),
);
let http_server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new(
build_frontend_instance(instance.clone()),
)))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()))
.with_metrics_handler(MetricsHandler)
.build();
(http_server.make_app(), guard)
}
@@ -295,12 +298,11 @@ pub async fn setup_test_http_app_with_frontend(
.await
.unwrap();
let frontend_ref = Arc::new(frontend);
let mut http_server = HttpServer::new(
ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()),
ServerGrpcQueryHandlerAdaptor::arc(frontend_ref),
HttpOptions::default(),
);
http_server.set_script_handler(instance.clone());
let http_server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref))
.with_script_handler(instance.clone())
.build();
let app = http_server.make_app();
(app, guard)
}

View File

@@ -161,10 +161,16 @@ impl Env {
"datanode" | "standalone" => {
args.push("-c".to_string());
args.push(Self::generate_config_file(subcommand, db_ctx));
args.push("--http-addr=0.0.0.0:5001".to_string());
}
"frontend" => {
args.push("--metasrv-addr=0.0.0.0:3002".to_string());
args.push("--http-addr=0.0.0.0:5000".to_string());
}
"metasrv" => {
args.push("--use-memory-store".to_string());
args.push("--http-addr=0.0.0.0:5002".to_string());
}
"frontend" => args.push("--metasrv-addr=0.0.0.0:3002".to_string()),
"metasrv" => args.push("--use-memory-store".to_string()),
_ => panic!("Unexpected subcommand: {subcommand}"),
}