From c994e0de88cdf62a444cfe81a9fb6f0798ed473f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Mar 2023 16:13:57 +0800 Subject: [PATCH] fix: format --- src/datanode/src/server/grpc.rs | 2 +- src/datanode/src/sql/flush_table.rs | 41 +++++++++++++++++----- src/frontend/src/server.rs | 1 - src/servers/src/error.rs | 11 +++--- src/servers/src/http.rs | 32 ++++++++++------- src/servers/src/http/admin.rs | 42 +++++++++++++++-------- src/servers/tests/http/influxdb_test.rs | 16 +++++---- src/servers/tests/http/opentsdb_test.rs | 12 ++++--- src/servers/tests/http/prometheus_test.rs | 16 +++++---- 9 files changed, 110 insertions(+), 63 deletions(-) diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 8170d96f67..a38093dc98 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -247,7 +247,7 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), false, ) - .with_time_index(true), + .with_time_index(true), ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index 7b12ef36ca..ab3b459311 100644 --- a/src/datanode/src/sql/flush_table.rs +++ b/src/datanode/src/sql/flush_table.rs @@ -23,23 +23,46 @@ use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result { if let Some(table) = &req.table_name { - self.flush_table_inner(&req.catalog_name, &req.schema_name, table, req.region_number).await?; + self.flush_table_inner( + &req.catalog_name, + &req.schema_name, + table, + req.region_number, + ) + .await?; } else { - let schema = self.catalog_manager.schema(&req.catalog_name, &req.schema_name).context(CatalogSnafu)?.context(DatabaseNotFoundSnafu { - catalog: &req.catalog_name, - schema: &req.schema_name, - })?; + let schema = self + .catalog_manager + .schema(&req.catalog_name, &req.schema_name) + .context(CatalogSnafu)? + .context(DatabaseNotFoundSnafu { + catalog: &req.catalog_name, + schema: &req.schema_name, + })?; let all_table_names = schema.table_names().context(CatalogSnafu)?; futures::future::join_all(all_table_names.iter().map(|table| { - self.flush_table_inner(&req.catalog_name, &req.schema_name,table, req.region_number) - }) - ).await.into_iter().collect::>>()?; + self.flush_table_inner( + &req.catalog_name, + &req.schema_name, + table, + req.region_number, + ) + })) + .await + .into_iter() + .collect::>>()?; } Ok(Output::AffectedRows(0)) } - async fn flush_table_inner(&self, catalog:&str, schema:&str, table: &str, region: Option) -> Result<()> { + async fn flush_table_inner( + &self, + catalog: &str, + schema: &str, + table: &str, + region: Option, + ) -> Result<()> { let table_ref = TableReference { catalog, schema, diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 395a91d979..d4ab41dee8 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -150,7 +150,6 @@ impl Services { if let Some(http_options) = &opts.http_options { let http_addr = parse_addr(&http_options.addr)?; - let mut http_server = HttpServer::new( ServerSqlQueryHandlerAdaptor::arc(instance.clone()), ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 8e7c19f76e..92735341df 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -160,9 +160,9 @@ pub enum Error { }, #[snafu(display( - "Failed to put OpenTSDB data point: {:?}, source: {}", - data_point, - source + "Failed to put OpenTSDB data point: {:?}, source: {}", + data_point, + source ))] PutOpentsdbDataPoint { data_point: String, @@ -268,10 +268,7 @@ pub enum Error { InvalidPrepareStatement { err_msg: String }, #[snafu(display("Invalid flush argument: {}", err_msg))] - InvalidFlushArgument { - err_msg: String, - }, - + InvalidFlushArgument { err_msg: String }, // #[snafu(display("Failed to flush table, catalog: {}, schema: {}, table: {}, region: {:?}, source: {}", // catalog, schema, table, region_id, source // ))] diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 0c049fd4eb..0f7b9da563 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -19,9 +19,9 @@ pub mod opentsdb; pub mod prometheus; pub mod script; +mod admin; #[cfg(feature = "mem-prof")] pub mod mem_prof; -mod admin; use std::net::SocketAddr; use std::sync::Arc; @@ -58,12 +58,12 @@ use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write}; use crate::auth::UserProviderRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; use crate::http::admin::flush; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, ScriptHandlerRef, }; -use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::server::Server; /// create query context from database name information, catalog and schema are @@ -353,9 +353,11 @@ pub struct ApiState { } impl HttpServer { - pub fn new(sql_handler: ServerSqlQueryHandlerRef, - grpc_handler: ServerGrpcQueryHandlerRef, - options: HttpOptions) -> Self { + pub fn new( + sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, + options: HttpOptions, + ) -> Self { Self { sql_handler, grpc_handler, @@ -433,7 +435,10 @@ impl HttpServer { .layer(Extension(api)); let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router); - router = router.nest(&format!("/{HTTP_API_VERSION}/admin"), self.route_admin(self.grpc_handler.clone())); + router = router.nest( + &format!("/{HTTP_API_VERSION}/admin"), + self.route_admin(self.grpc_handler.clone()), + ); if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { router = router.nest( @@ -527,7 +532,8 @@ impl HttpServer { } fn route_admin(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router { - Router::new().route("/flush", routing::post(flush)) + Router::new() + .route("/flush", routing::post(flush)) .with_state(grpc_handler) } } @@ -591,6 +597,7 @@ mod test { use std::future::pending; use std::sync::Arc; + use api::v1::greptime_request::Request; use axum::handler::Handler; use axum::http::StatusCode; use axum::routing::get; @@ -602,7 +609,6 @@ mod test { use query::parser::PromQuery; use session::context::QueryContextRef; use tokio::sync::mpsc; - use api::v1::greptime_request::Request; use super::*; use crate::error::Error; @@ -617,7 +623,11 @@ mod test { impl GrpcQueryHandler for DummyInstance { type Error = Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -664,9 +674,7 @@ mod test { let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance); - let server = HttpServer::new(sql_instance, - grpc_instance, - HttpOptions::default()); + let server = HttpServer::new(sql_instance, grpc_instance, HttpOptions::default()); server.make_app().route( "/test/timeout", get(forever.layer( diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs index 466bb313fb..d801b1703c 100644 --- a/src/servers/src/http/admin.rs +++ b/src/servers/src/http/admin.rs @@ -12,20 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use api::v1::ddl_request::Expr; +use api::v1::greptime_request::Request; +use api::v1::{DdlRequest, FlushTableExpr}; use axum::extract::{Query, RawBody, State}; use axum::http::StatusCode as HttpStatusCode; use axum::Json; -use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; -use std::collections::HashMap; -use snafu::OptionExt; -use api::v1::greptime_request::Request; -use api::v1::{DdlRequest, FlushTableExpr}; -use api::v1::ddl_request::Expr; use session::context::QueryContext; -use crate::error; +use snafu::OptionExt; -use crate::error::{Result}; +use crate::error; +use crate::error::Result; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; #[axum_macros::debug_handler] pub async fn flush( @@ -33,15 +33,27 @@ pub async fn flush( Query(params): Query>, RawBody(_): RawBody, ) -> Result<(HttpStatusCode, Json)> { - let catalog_name = params.get("catalog_name").cloned().unwrap_or("greptime".to_string()); - let schema_name = params.get("schema_name").cloned().context(error::InvalidFlushArgumentSnafu { - err_msg: "schema_name is not present", - })?; + let catalog_name = params + .get("catalog_name") + .cloned() + .unwrap_or("greptime".to_string()); + let schema_name = + params + .get("schema_name") + .cloned() + .context(error::InvalidFlushArgumentSnafu { + err_msg: "schema_name is not present", + })?; // if table name is not present, flush all tables inside schema let table_name = params.get("table_name").cloned().unwrap_or_default(); - let region_id: Option = params.get("region").map(|v| v.parse()).transpose().ok().flatten(); + let region_id: Option = params + .get("region") + .map(|v| v.parse()) + .transpose() + .ok() + .flatten(); let request = Request::Ddl(DdlRequest { expr: Some(Expr::FlushTable(FlushTableExpr { @@ -49,9 +61,9 @@ pub async fn flush( schema_name: schema_name.clone(), table_name: table_name.clone(), region_id, - })) + })), }); grpc_handler.do_query(request, QueryContext::arc()).await?; Ok((HttpStatusCode::OK, Json::from("hello, world".to_string()))) -} \ No newline at end of file +} diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 3f1e2ab7fd..086c6403c5 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::greptime_request::Request; use api::v1::InsertRequest; use async_trait::async_trait; use axum::{http, Router}; @@ -24,12 +25,11 @@ use query::parser::PromQuery; use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::influxdb::InfluxdbRequest; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; -use api::v1::greptime_request::Request; -use servers::query_handler::grpc::GrpcQueryHandler; use crate::auth::{DatabaseAuthInfo, MockUserProvider}; @@ -39,9 +39,13 @@ struct DummyInstance { #[async_trait] impl GrpcQueryHandler for DummyInstance { - type Error =Error; + type Error = Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -90,9 +94,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: Arc>, db_name: Option<&str>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), - instance.clone(), - HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); let mut user_provider = MockUserProvider::default(); if let Some(name) = db_name { user_provider.set_authorization_info(DatabaseAuthInfo { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 453cb2f7ee..694751635e 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::greptime_request::Request; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -23,12 +24,11 @@ use query::parser::PromQuery; use servers::error::{self, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::opentsdb::codec::DataPoint; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; -use api::v1::greptime_request::Request; -use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender, @@ -38,7 +38,11 @@ struct DummyInstance { impl GrpcQueryHandler for DummyInstance { type Error = crate::Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -88,7 +92,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(),instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); server.set_opentsdb_handler(instance); server.make_app() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 5677fb6c8d..173382b1e4 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use api::prometheus::remote::{ LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest, }; +use api::v1::greptime_request::Request; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -28,12 +29,11 @@ use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::prometheus; use servers::prometheus::{snappy_compress, Metrics}; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use session::context::QueryContextRef; use tokio::sync::mpsc; -use api::v1::greptime_request::Request; -use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender<(String, Vec)>, @@ -41,9 +41,13 @@ struct DummyInstance { #[async_trait] impl GrpcQueryHandler for DummyInstance { - type Error =Error; + type Error = Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -113,9 +117,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), - instance.clone(), - HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); server.set_prom_handler(instance); server.make_app() }