mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 04:12:55 +00:00
feat(config-endpoint): add initial implementation (#1896)
* feat(config-endpoint): add initial implementation * feat: add initial handler implementation * fix: apply clippy suggestions, use axum response instead of string * feat: address CR suggestions * fix: minor adjustments in formatting * fix: add a test * feat: add to_toml_string method to options * fix: adjust the assertion for the integration test * fix: adjust expected indents * fix: adjust assertion for the integration test * fix: improve according to clippy
This commit is contained in:
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -5,7 +5,7 @@ on:
|
||||
schedule:
|
||||
# At 00:00 on Monday.
|
||||
- cron: '0 0 * * 1'
|
||||
# Mannually trigger only builds binaries.
|
||||
# Manually trigger only builds binaries.
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
dry_run:
|
||||
|
||||
@@ -61,6 +61,7 @@ table = { path = "../table" }
|
||||
table-procedure = { path = "../table-procedure" }
|
||||
tokio.workspace = true
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
toml = "0.5"
|
||||
tonic.workspace = true
|
||||
tower = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.3", features = ["full"] }
|
||||
@@ -72,4 +73,3 @@ client = { path = "../client" }
|
||||
common-test-util = { path = "../common/test-util" }
|
||||
common-query = { path = "../common/query" }
|
||||
datafusion-common.workspace = true
|
||||
toml = "0.5"
|
||||
|
||||
@@ -405,6 +405,10 @@ impl DatanodeOptions {
|
||||
pub fn env_list_keys() -> Option<&'static [&'static str]> {
|
||||
Some(&["meta_client_options.metasrv_addrs"])
|
||||
}
|
||||
|
||||
pub fn to_toml_string(&self) -> String {
|
||||
toml::to_string(&self).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// Datanode service.
|
||||
|
||||
@@ -59,6 +59,7 @@ impl Services {
|
||||
),
|
||||
http_server: HttpServerBuilder::new(opts.http_opts.clone())
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_greptime_config_options(opts.to_toml_string())
|
||||
.build(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ store-api = { path = "../store-api" }
|
||||
substrait = { path = "../common/substrait" }
|
||||
table = { path = "../table" }
|
||||
tokio.workspace = true
|
||||
toml = "0.5"
|
||||
tonic.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
@@ -72,6 +73,5 @@ datanode = { path = "../datanode" }
|
||||
futures = "0.3"
|
||||
meta-srv = { path = "../meta-srv", features = ["mock"] }
|
||||
strfmt = "0.2"
|
||||
toml = "0.5"
|
||||
tower = "0.4"
|
||||
uuid.workspace = true
|
||||
|
||||
@@ -65,6 +65,10 @@ impl FrontendOptions {
|
||||
pub fn env_list_keys() -> Option<&'static [&'static str]> {
|
||||
Some(&["meta_client_options.metasrv_addrs"])
|
||||
}
|
||||
|
||||
pub fn to_toml_string(&self) -> String {
|
||||
toml::to_string(&self).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -182,6 +182,7 @@ impl Services {
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_script_handler(instance.clone())
|
||||
.with_configurator(plugins.get::<ConfiguratorRef>())
|
||||
.with_greptime_config_options(opts.to_toml_string())
|
||||
.build();
|
||||
result.push((Box::new(http_server), http_addr));
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ store-api = { path = "../store-api" }
|
||||
table = { path = "../table" }
|
||||
tokio.workspace = true
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
toml = "0.5"
|
||||
tonic.workspace = true
|
||||
tower = "0.4"
|
||||
typetag = "0.2"
|
||||
|
||||
@@ -62,6 +62,7 @@ impl MetaSrvInstance {
|
||||
let http_srv = Arc::new(
|
||||
HttpServerBuilder::new(opts.http_opts.clone())
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_greptime_config_options(opts.to_toml_string())
|
||||
.build(),
|
||||
);
|
||||
Ok(MetaSrvInstance {
|
||||
|
||||
@@ -71,6 +71,12 @@ impl Default for MetaSrvOptions {
|
||||
}
|
||||
}
|
||||
|
||||
impl MetaSrvOptions {
|
||||
pub fn to_toml_string(&self) -> String {
|
||||
toml::to_string(&self).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Context {
|
||||
pub server_addr: String,
|
||||
|
||||
@@ -124,6 +124,7 @@ pub struct HttpServer {
|
||||
user_provider: Option<UserProviderRef>,
|
||||
metrics_handler: Option<MetricsHandler>,
|
||||
configurator: Option<ConfiguratorRef>,
|
||||
greptime_config_options: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -378,6 +379,11 @@ pub struct ApiState {
|
||||
pub script_handler: Option<ScriptHandlerRef>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GreptimeOptionsConfigState {
|
||||
pub greptime_config_options: String,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct HttpServerBuilder {
|
||||
inner: HttpServer,
|
||||
@@ -398,6 +404,7 @@ impl HttpServerBuilder {
|
||||
metrics_handler: None,
|
||||
shutdown_tx: Mutex::new(None),
|
||||
configurator: None,
|
||||
greptime_config_options: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -447,6 +454,11 @@ impl HttpServerBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_greptime_config_options(&mut self, opts: String) -> &mut Self {
|
||||
self.inner.greptime_config_options = Some(opts);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(&mut self) -> HttpServer {
|
||||
std::mem::take(self).inner
|
||||
}
|
||||
@@ -477,7 +489,7 @@ impl HttpServer {
|
||||
script_handler: self.script_handler.clone(),
|
||||
})
|
||||
.finish_api(&mut api)
|
||||
.layer(Extension(api));
|
||||
.layer(Extension(api.clone()));
|
||||
router = router.nest(&format!("/{HTTP_API_VERSION}"), sql_router);
|
||||
}
|
||||
|
||||
@@ -518,6 +530,17 @@ impl HttpServer {
|
||||
routing::get(handler::health).post(handler::health),
|
||||
);
|
||||
|
||||
let config_router = self
|
||||
.route_config(GreptimeOptionsConfigState {
|
||||
greptime_config_options: self
|
||||
.greptime_config_options
|
||||
.clone()
|
||||
.unwrap_or("".to_string()),
|
||||
})
|
||||
.finish_api(&mut api);
|
||||
|
||||
router = router.nest("", config_router);
|
||||
|
||||
router = router.route("/status", routing::get(handler::status));
|
||||
|
||||
#[cfg(feature = "dashboard")]
|
||||
@@ -629,6 +652,12 @@ impl HttpServer {
|
||||
.route("/flush", routing::post(flush))
|
||||
.with_state(grpc_handler)
|
||||
}
|
||||
|
||||
fn route_config<S>(&self, state: GreptimeOptionsConfigState) -> ApiRouter<S> {
|
||||
ApiRouter::new()
|
||||
.route("/config", apirouting::get(handler::config))
|
||||
.with_state(state)
|
||||
}
|
||||
}
|
||||
|
||||
/// A middleware to record metrics for HTTP.
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::time::Instant;
|
||||
|
||||
use aide::transform::TransformOperation;
|
||||
use axum::extract::{Json, Query, State};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Extension, Form};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_telemetry::{error, timer};
|
||||
@@ -26,7 +27,7 @@ use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::UserInfo;
|
||||
|
||||
use crate::http::{ApiState, JsonResponse};
|
||||
use crate::http::{ApiState, GreptimeOptionsConfigState, JsonResponse};
|
||||
use crate::metrics::JEMALLOC_COLLECTOR;
|
||||
use crate::metrics_handler::MetricsHandler;
|
||||
|
||||
@@ -184,3 +185,9 @@ pub async fn status() -> Json<StatusResponse<'static>> {
|
||||
version: env!("CARGO_PKG_VERSION"),
|
||||
})
|
||||
}
|
||||
|
||||
/// Handler to expose configuration information info about runtime, build, etc.
|
||||
#[axum_macros::debug_handler]
|
||||
pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
|
||||
(axum::http::StatusCode::OK, state.greptime_config_options).into_response()
|
||||
}
|
||||
|
||||
@@ -14,12 +14,17 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::body::Body;
|
||||
use axum::body::{Body, Bytes};
|
||||
use axum::extract::{Json, Query, RawBody, State};
|
||||
use axum::Form;
|
||||
use common_telemetry::metric;
|
||||
use http_body::combinators::UnsyncBoxBody;
|
||||
use hyper::Response;
|
||||
use metrics::counter;
|
||||
use servers::http::{handler as http_handler, script as script_handler, ApiState, JsonOutput};
|
||||
use servers::http::{
|
||||
handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState,
|
||||
JsonOutput,
|
||||
};
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use session::context::UserInfo;
|
||||
use table::test_util::MemTable;
|
||||
@@ -380,3 +385,29 @@ async fn test_status() {
|
||||
let Json(json) = http_handler::status().await;
|
||||
assert_eq!(json, expected_json);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_config() {
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
|
||||
[http_options]
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "30s"
|
||||
body_limit = "2GB"
|
||||
|
||||
[logging]
|
||||
level = "debug"
|
||||
dir = "/tmp/greptimedb/test/logs"
|
||||
"#;
|
||||
let rs = http_handler::config(State(GreptimeOptionsConfigState {
|
||||
greptime_config_options: toml_str.to_string(),
|
||||
}))
|
||||
.await;
|
||||
assert_eq!(200_u16, rs.status().as_u16());
|
||||
assert_eq!(get_body(rs).await, toml_str);
|
||||
}
|
||||
|
||||
async fn get_body(response: Response<UnsyncBoxBody<Bytes, axum::Error>>) -> Bytes {
|
||||
hyper::body::to_bytes(response.into_body()).await.unwrap()
|
||||
}
|
||||
|
||||
@@ -381,6 +381,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
|
||||
)))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()))
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_greptime_config_options(opts.to_toml_string())
|
||||
.build();
|
||||
(http_server.build(http_server.make_app()), guard)
|
||||
}
|
||||
@@ -417,6 +418,7 @@ pub async fn setup_test_http_app_with_frontend(
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
|
||||
.with_script_handler(frontend_ref)
|
||||
.with_greptime_config_options(opts.to_toml_string())
|
||||
.build();
|
||||
let app = http_server.build(http_server.make_app());
|
||||
(app, guard)
|
||||
@@ -504,6 +506,7 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
|
||||
.with_script_handler(frontend_ref.clone())
|
||||
.with_prom_handler(frontend_ref.clone())
|
||||
.with_greptime_config_options(opts.to_toml_string())
|
||||
.build();
|
||||
let prom_server = PromServer::create_server(frontend_ref);
|
||||
let app = http_server.build(http_server.make_app());
|
||||
|
||||
@@ -59,6 +59,7 @@ macro_rules! http_tests {
|
||||
test_metrics_api,
|
||||
test_scripts_api,
|
||||
test_health_api,
|
||||
test_config_api,
|
||||
test_dashboard_path,
|
||||
);
|
||||
)*
|
||||
@@ -495,6 +496,78 @@ pub async fn test_health_api(store_type: StorageType) {
|
||||
assert_eq!(body, HealthResponse {});
|
||||
}
|
||||
|
||||
pub async fn test_config_api(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, _guard) = setup_test_http_app_with_frontend(store_type, "config_api").await;
|
||||
let client = TestClient::new(app);
|
||||
|
||||
let res_get = client.get("/config").send().await;
|
||||
assert_eq!(res_get.status(), StatusCode::OK);
|
||||
let expected_toml_str = r#"
|
||||
mode = "standalone"
|
||||
enable_memory_catalog = false
|
||||
rpc_addr = "127.0.0.1:3001"
|
||||
rpc_runtime_size = 8
|
||||
heartbeat_interval_millis = 5000
|
||||
|
||||
[http_opts]
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "30s"
|
||||
body_limit = "64MiB"
|
||||
|
||||
[wal]
|
||||
file_size = "256MiB"
|
||||
purge_threshold = "4GiB"
|
||||
purge_interval = "10m"
|
||||
read_batch_size = 128
|
||||
sync_write = false
|
||||
|
||||
[storage]
|
||||
type = "File"
|
||||
|
||||
[storage.compaction]
|
||||
max_inflight_tasks = 4
|
||||
max_files_in_level0 = 8
|
||||
max_purge_tasks = 32
|
||||
sst_write_buffer_size = "8MiB"
|
||||
|
||||
[storage.manifest]
|
||||
checkpoint_margin = 10
|
||||
gc_duration = "10m"
|
||||
checkpoint_on_startup = false
|
||||
compress = false
|
||||
|
||||
[storage.flush]
|
||||
max_flush_tasks = 8
|
||||
region_write_buffer_size = "32MiB"
|
||||
picker_schedule_interval = "5m"
|
||||
auto_flush_interval = "1h"
|
||||
|
||||
[procedure]
|
||||
max_retry_times = 3
|
||||
retry_delay = "500ms"
|
||||
|
||||
[logging]
|
||||
enable_jaeger_tracing = false"#;
|
||||
let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
|
||||
assert_eq!(
|
||||
normalize_str(body_text.as_str()),
|
||||
normalize_str(expected_toml_str)
|
||||
);
|
||||
}
|
||||
|
||||
fn drop_lines_with_inconsistent_results(input: String) -> String {
|
||||
input
|
||||
.lines()
|
||||
.filter(|line| !line.trim().starts_with("dir =") && !line.trim().starts_with("data_home ="))
|
||||
.collect::<Vec<&str>>()
|
||||
.join("\n")
|
||||
}
|
||||
|
||||
fn normalize_str(s: &str) -> String {
|
||||
s.replace([' ', '\n'], "")
|
||||
}
|
||||
|
||||
#[cfg(feature = "dashboard")]
|
||||
pub async fn test_dashboard_path(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
Reference in New Issue
Block a user