feat: support influxdb ping and health endpoint (#1027)

* feat: support influxdb ping and health endpoint

* add some unit tests

* ping and health api no need auth

* cr
This commit is contained in:
fys
2023-02-20 10:31:51 +08:00
committed by GitHub
parent af1f8d6101
commit 4e88a01638
4 changed files with 67 additions and 4 deletions

View File

@@ -50,7 +50,7 @@ use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::trace::TraceLayer;
use self::authorize::HttpAuth;
use self::influxdb::influxdb_write;
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
use crate::auth::UserProviderRef;
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
@@ -88,6 +88,9 @@ pub(crate) fn query_context_from_db(
pub const HTTP_API_VERSION: &str = "v1";
pub const HTTP_API_PREFIX: &str = "/v1/";
// TODO(fys): This is a temporary workaround, it will be improved later
pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
pub struct HttpServer {
sql_handler: ServerSqlQueryHandlerRef,
options: HttpOptions,
@@ -492,6 +495,8 @@ impl HttpServer {
fn route_influxdb<S>(&self, influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/write", routing::post(influxdb_write))
.route("/ping", routing::get(influxdb_ping))
.route("/health", routing::get(influxdb_health))
.with_state(influxdb_handler)
}

View File

@@ -1,4 +1,3 @@
use std::collections::HashMap;
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +11,8 @@ use std::collections::HashMap;
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::marker::PhantomData;
use axum::http::{self, Request, StatusCode};
@@ -23,6 +24,7 @@ use session::context::UserInfo;
use snafu::{OptionExt, ResultExt};
use tower_http::auth::AsyncAuthorizeRequest;
use super::PUBLIC_APIS;
use crate::auth::Error::IllegalParam;
use crate::auth::{Identity, IllegalParamSnafu, InternalStateSnafu, UserProviderRef};
use crate::error::{self, Result};
@@ -63,7 +65,8 @@ where
fn authorize(&mut self, mut request: Request<B>) -> Self::Future {
let user_provider = self.user_provider.clone();
Box::pin(async move {
let need_auth = request.uri().path().starts_with(HTTP_API_PREFIX);
let need_auth = need_auth(&request);
let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) {
user_provider
} else {
@@ -209,10 +212,46 @@ fn decode_basic(credential: Credential) -> Result<(Username, Password)> {
error::InvalidAuthorizationHeaderSnafu {}.fail()
}
fn need_auth<B>(req: &Request<B>) -> bool {
let path = req.uri().path();
for api in PUBLIC_APIS {
if path.starts_with(api) {
return false;
}
}
path.starts_with(HTTP_API_PREFIX)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_need_auth() {
let req = Request::builder()
.uri("http://127.0.0.1/v1/influxdb/ping")
.body(())
.unwrap();
assert!(!need_auth(&req));
let req = Request::builder()
.uri("http://127.0.0.1/v1/influxdb/health")
.body(())
.unwrap();
assert!(!need_auth(&req));
let req = Request::builder()
.uri("http://127.0.0.1/v1/influxdb/write")
.body(())
.unwrap();
assert!(need_auth(&req));
}
#[test]
fn test_decode_basic() {
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::writer::Precision;
use session::context::QueryContext;
@@ -25,12 +26,24 @@ use crate::error::{Result, TimePrecisionSnafu};
use crate::influxdb::InfluxdbRequest;
use crate::query_handler::InfluxdbLineProtocolHandlerRef;
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#ping-http-endpoint
#[axum_macros::debug_handler]
pub async fn influxdb_ping() -> Result<impl IntoResponse> {
Ok(StatusCode::NO_CONTENT)
}
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#health-http-endpoint
#[axum_macros::debug_handler]
pub async fn influxdb_health() -> Result<impl IntoResponse> {
Ok(StatusCode::OK)
}
#[axum_macros::debug_handler]
pub async fn influxdb_write(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(mut params): Query<HashMap<String, String>>,
lines: String,
) -> Result<(StatusCode, ())> {
) -> Result<impl IntoResponse> {
let db = params
.remove("db")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());

View File

@@ -110,6 +110,12 @@ async fn test_influxdb_write() {
let app = make_test_app(tx.clone(), None);
let client = TestClient::new(app);
let result = client.get("/v1/influxdb/health").send().await;
assert_eq!(result.status(), 200);
let result = client.get("/v1/influxdb/ping").send().await;
assert_eq!(result.status(), 204);
// right request
let result = client
.post("/v1/influxdb/write?db=public")