feat: specify prom server start addr (#1111)

* feat: specify promql server start addr

* refactor: rename promql to prom in Prometheus API server scenario
This commit is contained in:
yuanbohan
2023-03-06 11:07:21 +08:00
committed by GitHub
parent b022556b79
commit d4e0dc3685
15 changed files with 97 additions and 73 deletions

View File

@@ -23,6 +23,7 @@ use frontend::instance::Instance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prom::PromOptions;
use meta_client::MetaClientOptions;
use servers::auth::UserProviderRef;
use servers::http::HttpOptions;
@@ -67,6 +68,8 @@ pub struct StartCommand {
#[clap(long)]
mysql_addr: Option<String>,
#[clap(long)]
prom_addr: Option<String>,
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
opentsdb_addr: Option<String>,
@@ -141,6 +144,9 @@ impl TryFrom<StartCommand> for FrontendOptions {
..Default::default()
});
}
if let Some(addr) = cmd.prom_addr {
opts.prom_options = Some(PromOptions { addr });
}
if let Some(addr) = cmd.postgres_addr {
opts.postgres_options = Some(PostgresOptions {
addr,
@@ -186,6 +192,7 @@ mod tests {
let command = StartCommand {
http_addr: Some("127.0.0.1:1234".to_string()),
grpc_addr: None,
prom_addr: Some("127.0.0.1:4444".to_string()),
mysql_addr: Some("127.0.0.1:5678".to_string()),
postgres_addr: Some("127.0.0.1:5432".to_string()),
opentsdb_addr: Some("127.0.0.1:4321".to_string()),
@@ -209,6 +216,7 @@ mod tests {
opts.opentsdb_options.as_ref().unwrap().addr,
"127.0.0.1:4321"
);
assert_eq!(opts.prom_options.as_ref().unwrap().addr, "127.0.0.1:4444");
let default_opts = FrontendOptions::default();
assert_eq!(
@@ -247,6 +255,7 @@ mod tests {
http_addr: None,
grpc_addr: None,
mysql_addr: None,
prom_addr: None,
postgres_addr: None,
opentsdb_addr: None,
influxdb_enable: None,
@@ -276,6 +285,7 @@ mod tests {
http_addr: None,
grpc_addr: None,
mysql_addr: None,
prom_addr: None,
postgres_addr: None,
opentsdb_addr: None,
influxdb_enable: None,

View File

@@ -28,8 +28,8 @@ use frontend::instance::Instance as FeInstance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prom::PromOptions;
use frontend::prometheus::PrometheusOptions;
use frontend::promql::PromqlOptions;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -77,7 +77,7 @@ pub struct StandaloneOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub prom_options: Option<PromOptions>,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
@@ -96,7 +96,7 @@ impl Default for StandaloneOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
prom_options: Some(PromOptions::default()),
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
@@ -116,7 +116,7 @@ impl StandaloneOptions {
opentsdb_options: self.opentsdb_options,
influxdb_options: self.influxdb_options,
prometheus_options: self.prometheus_options,
promql_options: self.promql_options,
prom_options: self.prom_options,
meta_client_options: None,
}
}
@@ -142,6 +142,8 @@ struct StartCommand {
#[clap(long)]
mysql_addr: Option<String>,
#[clap(long)]
prom_addr: Option<String>,
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
opentsdb_addr: Option<String>,
@@ -254,6 +256,11 @@ impl TryFrom<StartCommand> for FrontendOptions {
..Default::default()
})
}
if let Some(addr) = cmd.prom_addr {
opts.prom_options = Some(PromOptions { addr })
}
if let Some(addr) = cmd.postgres_addr {
opts.postgres_options = Some(PostgresOptions {
addr,
@@ -302,6 +309,7 @@ mod tests {
http_addr: None,
rpc_addr: None,
mysql_addr: None,
prom_addr: None,
postgres_addr: None,
opentsdb_addr: None,
config_file: Some(format!(
@@ -347,6 +355,7 @@ mod tests {
let command = StartCommand {
http_addr: None,
rpc_addr: None,
prom_addr: None,
mysql_addr: None,
postgres_addr: None,
opentsdb_addr: None,

View File

@@ -24,7 +24,7 @@ use datatypes::schema::Schema;
use futures::StreamExt;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
use servers::prom::PromHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
@@ -366,7 +366,7 @@ impl SqlQueryHandler for Instance {
}
#[async_trait]
impl PromqlHandler for Instance {
impl PromHandler for Instance {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);

View File

@@ -28,8 +28,8 @@ use crate::instance::FrontendInstance;
use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prom::PromOptions;
use crate::prometheus::PrometheusOptions;
use crate::promql::PromqlOptions;
use crate::server::Services;
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -43,7 +43,7 @@ pub struct FrontendOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub prom_options: Option<PromOptions>,
pub meta_client_options: Option<MetaClientOptions>,
}
@@ -58,7 +58,7 @@ impl Default for FrontendOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
prom_options: Some(PromOptions::default()),
meta_client_options: None,
}
}

View File

@@ -49,7 +49,7 @@ use query::parser::PromQuery;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use servers::error as server_error;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::promql::{PromqlHandler, PromqlHandlerRef};
use servers::prom::{PromHandler, PromHandlerRef};
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef};
use servers::query_handler::{
@@ -81,7 +81,7 @@ pub trait FrontendInstance:
+ InfluxdbLineProtocolHandler
+ PrometheusProtocolHandler
+ ScriptHandler
+ PromqlHandler
+ PromHandler
+ Send
+ Sync
+ 'static
@@ -99,7 +99,7 @@ pub struct Instance {
script_handler: Option<ScriptHandlerRef>,
sql_handler: SqlQueryHandlerRef<Error>,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
promql_handler: Option<PromqlHandlerRef>,
promql_handler: Option<PromHandlerRef>,
create_expr_factory: CreateExprFactoryRef,
@@ -539,7 +539,7 @@ impl ScriptHandler for Instance {
}
#[async_trait]
impl PromqlHandler for Instance {
impl PromHandler for Instance {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
if let Some(promql_handler) = &self.promql_handler {
promql_handler.do_query(query).await

View File

@@ -25,8 +25,8 @@ pub mod instance;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prometheus;
pub mod promql;
mod server;
mod sql;
mod table;

View File

@@ -15,11 +15,11 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromqlOptions {
pub struct PromOptions {
pub addr: String,
}
impl Default for PromqlOptions {
impl Default for PromOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4004".to_string(),
@@ -29,11 +29,11 @@ impl Default for PromqlOptions {
#[cfg(test)]
mod tests {
use super::PromqlOptions;
use super::PromOptions;
#[test]
fn test_prometheus_options() {
let default = PromqlOptions::default();
let default = PromOptions::default();
assert_eq!(default.addr, "127.0.0.1:4004".to_string());
}
}

View File

@@ -25,7 +25,7 @@ use servers::http::HttpServer;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::promql::PromqlServer;
use servers::prom::PromServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
@@ -183,15 +183,15 @@ impl Services {
None
};
let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options {
let promql_addr = parse_addr(&promql_options.addr)?;
let prom_server_and_addr = if let Some(prom_options) = &opts.prom_options {
let prom_addr = parse_addr(&prom_options.addr)?;
let mut promql_server = PromqlServer::create_server(instance.clone());
let mut prom_server = PromServer::create_server(instance.clone());
if let Some(user_provider) = user_provider {
promql_server.set_user_provider(user_provider);
prom_server.set_user_provider(user_provider);
}
Some((promql_server as _, promql_addr))
Some((prom_server as _, prom_addr))
} else {
None
};
@@ -202,7 +202,7 @@ impl Services {
start_server(mysql_server_and_addr),
start_server(postgres_server_and_addr),
start_server(opentsdb_server_and_addr),
start_server(promql_server_and_addr),
start_server(prom_server_and_addr),
)
.context(error::StartServerSnafu)?;
Ok(())

View File

@@ -28,8 +28,8 @@ pub mod line_writer;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prometheus;
pub mod promql;
pub mod query_handler;
pub mod server;
mod shutdown;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! prom supply the prometheus HTTP API Server compliance
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -52,24 +53,25 @@ use crate::error::{
use crate::http::authorize::HttpAuth;
use crate::server::Server;
pub const PROMQL_API_VERSION: &str = "v1";
pub const PROM_API_VERSION: &str = "v1";
pub type PromqlHandlerRef = Arc<dyn PromqlHandler + Send + Sync>;
pub type PromHandlerRef = Arc<dyn PromHandler + Send + Sync>;
#[async_trait]
pub trait PromqlHandler {
pub trait PromHandler {
async fn do_query(&self, query: &PromQuery) -> Result<Output>;
}
pub struct PromqlServer {
query_handler: PromqlHandlerRef,
/// PromServer represents PrometheusServer which handles the compliance with prometheus HTTP API
pub struct PromServer {
query_handler: PromHandlerRef,
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
}
impl PromqlServer {
pub fn create_server(query_handler: PromqlHandlerRef) -> Box<Self> {
Box::new(PromqlServer {
impl PromServer {
pub fn create_server(query_handler: PromHandlerRef) -> Box<Self> {
Box::new(PromServer {
query_handler,
shutdown_tx: Mutex::new(None),
user_provider: None,
@@ -90,7 +92,7 @@ impl PromqlServer {
.with_state(self.query_handler.clone());
Router::new()
.nest(&format!("/api/{PROMQL_API_VERSION}"), router)
.nest(&format!("/api/{PROM_API_VERSION}"), router)
// middlewares
.layer(
ServiceBuilder::new()
@@ -105,15 +107,15 @@ impl PromqlServer {
}
#[async_trait]
impl Server for PromqlServer {
impl Server for PromServer {
async fn shutdown(&self) -> Result<()> {
let mut shutdown_tx = self.shutdown_tx.lock().await;
if let Some(tx) = shutdown_tx.take() {
if tx.send(()).is_err() {
info!("Receiver dropped, the PromQl server has already existed");
info!("Receiver dropped, the Prometheus API server has already existed");
}
}
info!("Shutdown PromQL server");
info!("Shutdown Prometheus API server");
Ok(())
}
@@ -124,7 +126,9 @@ impl Server for PromqlServer {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
shutdown_tx.is_none(),
AlreadyStartedSnafu { server: "PromQL" }
AlreadyStartedSnafu {
server: "Prometheus"
}
);
let app = self.make_app();
@@ -135,7 +139,7 @@ impl Server for PromqlServer {
server
};
let listening = server.local_addr();
info!("PromQL server is bound to {}", listening);
info!("Prometheus API server is bound to {}", listening);
let graceful = server.with_graceful_shutdown(rx.map(drop));
graceful.await.context(StartHttpSnafu)?;
@@ -145,22 +149,22 @@ impl Server for PromqlServer {
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlSeries {
pub struct PromSeries {
metric: HashMap<String, String>,
values: Vec<(f64, String)>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlData {
pub struct PromData {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<PromqlSeries>,
result: Vec<PromSeries>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlJsonResponse {
pub struct PromJsonResponse {
status: String,
data: PromqlData,
data: PromData,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -170,23 +174,23 @@ pub struct PromqlJsonResponse {
warnings: Option<Vec<String>>,
}
impl PromqlJsonResponse {
impl PromJsonResponse {
pub fn error<S1, S2>(error_type: S1, reason: S2) -> Json<Self>
where
S1: Into<String>,
S2: Into<String>,
{
Json(PromqlJsonResponse {
Json(PromJsonResponse {
status: "error".to_string(),
data: PromqlData::default(),
data: PromData::default(),
error: Some(reason.into()),
error_type: Some(error_type.into()),
warnings: None,
})
}
pub fn success(data: PromqlData) -> Json<Self> {
Json(PromqlJsonResponse {
pub fn success(data: PromData) -> Json<Self> {
Json(PromJsonResponse {
status: "success".to_string(),
data,
error: None,
@@ -224,7 +228,7 @@ impl PromqlJsonResponse {
if err.status_code() == StatusCode::TableNotFound
|| err.status_code() == StatusCode::TableColumnNotFound
{
Self::success(PromqlData {
Self::success(PromData {
result_type: "matrix".to_string(),
..Default::default()
})
@@ -235,7 +239,7 @@ impl PromqlJsonResponse {
}
}
fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result<PromqlData> {
fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result<PromData> {
// infer semantic type of each column from schema.
// TODO(ruihang): wish there is a better way to do this.
let mut timestamp_column_index = None;
@@ -322,13 +326,13 @@ impl PromqlJsonResponse {
let result = buffer
.into_iter()
.map(|(tags, values)| PromqlSeries {
.map(|(tags, values)| PromSeries {
metric: tags.into_iter().collect(),
values,
})
.collect();
let data = PromqlData {
let data = PromData {
result_type: "matrix".to_string(),
result,
};
@@ -346,10 +350,10 @@ pub struct InstantQuery {
#[axum_macros::debug_handler]
pub async fn instant_query(
State(_handler): State<PromqlHandlerRef>,
State(_handler): State<PromHandlerRef>,
Query(_params): Query<InstantQuery>,
) -> Json<PromqlJsonResponse> {
PromqlJsonResponse::error(
) -> Json<PromJsonResponse> {
PromJsonResponse::error(
"not implemented",
"instant query api `/query` is not implemented. Use `/query_range` instead.",
)
@@ -366,10 +370,10 @@ pub struct RangeQuery {
#[axum_macros::debug_handler]
pub async fn range_query(
State(handler): State<PromqlHandlerRef>,
State(handler): State<PromHandlerRef>,
Query(params): Query<RangeQuery>,
Form(form_params): Form<RangeQuery>,
) -> Json<PromqlJsonResponse> {
) -> Json<PromJsonResponse> {
let prom_query = PromQuery {
query: params.query.or(form_params.query).unwrap_or_default(),
start: params.start.or(form_params.start).unwrap_or_default(),
@@ -378,7 +382,7 @@ pub async fn range_query(
};
let result = handler.do_query(&prom_query).await;
let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default();
PromqlJsonResponse::from_query_result(result, metric_name).await
PromJsonResponse::from_query_result(result, metric_name).await
}
fn retrieve_metric_name(promql: &str) -> Option<String> {

View File

@@ -13,6 +13,7 @@
// limitations under the License.
//! prometheus protocol supportings
//! handles prometheus remote_write, remote_read logic
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};