feat: improve influxdb v2 api compability (#1831)

* feat: support influxdb v2 api

* cr
This commit is contained in:
fys
2023-06-27 18:21:51 +08:00
committed by GitHub
parent 313121f2ae
commit 99f0479bd2
2 changed files with 49 additions and 11 deletions

View File

@@ -59,7 +59,7 @@ use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::trace::TraceLayer;
use self::authorize::HttpAuth;
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::auth::UserProviderRef;
use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
@@ -598,7 +598,8 @@ impl HttpServer {
fn route_influxdb<S>(&self, influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/write", routing::post(influxdb_write))
.route("/write", routing::post(influxdb_write_v1))
.route("/api/v2/write", routing::post(influxdb_write_v2))
.route("/ping", routing::get(influxdb_ping))
.route("/health", routing::get(influxdb_health))
.with_state(influxdb_handler)

View File

@@ -41,7 +41,7 @@ pub async fn influxdb_health() -> Result<impl IntoResponse> {
}
#[axum_macros::debug_handler]
pub async fn influxdb_write(
pub async fn influxdb_write_v1(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(mut params): Query<HashMap<String, String>>,
lines: String,
@@ -49,27 +49,62 @@ pub async fn influxdb_write(
let db = params
.remove("db")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let _timer = timer!(
crate::metrics::METRIC_HTTP_INFLUXDB_WRITE_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, db.clone())]
);
let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
let ctx = Arc::new(QueryContext::with(catalog, schema));
let precision = params
.get("precision")
.map(|val| parse_time_precision(val))
.transpose()?;
influxdb_write(&db, precision, lines, handler).await
}
#[axum_macros::debug_handler]
pub async fn influxdb_write_v2(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(mut params): Query<HashMap<String, String>>,
lines: String,
) -> Result<impl IntoResponse> {
let db = params
.remove("bucket")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let precision = params
.get("precision")
.map(|val| parse_time_precision(val))
.transpose()?;
influxdb_write(&db, precision, lines, handler).await
}
pub async fn influxdb_write(
db: &str,
precision: Option<Precision>,
lines: String,
handler: InfluxdbLineProtocolHandlerRef,
) -> Result<impl IntoResponse> {
let _timer = timer!(
crate::metrics::METRIC_HTTP_INFLUXDB_WRITE_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, db.to_string())]
);
let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(db);
let ctx = Arc::new(QueryContext::with(catalog, schema));
let request = InfluxdbRequest { precision, lines };
handler.exec(&request, ctx).await?;
Ok((StatusCode::NO_CONTENT, ()))
}
fn parse_time_precision(value: &str) -> Result<Precision> {
// Precision conversion needs to be compatible with influxdb v1 v2 api.
// For details, see the Influxdb documents.
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#apiv2write-http-endpoint
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
match value {
"n" => Ok(Precision::Nanosecond),
"u" => Ok(Precision::Microsecond),
"n" | "ns" => Ok(Precision::Nanosecond),
"u" | "us" => Ok(Precision::Microsecond),
"ms" => Ok(Precision::Millisecond),
"s" => Ok(Precision::Second),
"m" => Ok(Precision::Minute),
@@ -90,7 +125,9 @@ mod tests {
#[test]
fn test_parse_time_precision() {
assert_eq!(Precision::Nanosecond, parse_time_precision("n").unwrap());
assert_eq!(Precision::Nanosecond, parse_time_precision("ns").unwrap());
assert_eq!(Precision::Microsecond, parse_time_precision("u").unwrap());
assert_eq!(Precision::Microsecond, parse_time_precision("us").unwrap());
assert_eq!(Precision::Millisecond, parse_time_precision("ms").unwrap());
assert_eq!(Precision::Second, parse_time_precision("s").unwrap());
assert_eq!(Precision::Minute, parse_time_precision("m").unwrap());