mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 20:00:36 +00:00
fix: format
This commit is contained in:
@@ -247,7 +247,7 @@ mod tests {
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
.with_time_index(true),
|
||||
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
|
||||
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
|
||||
];
|
||||
|
||||
@@ -23,23 +23,46 @@ use crate::sql::SqlHandler;
|
||||
impl SqlHandler {
|
||||
pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result<Output> {
|
||||
if let Some(table) = &req.table_name {
|
||||
self.flush_table_inner(&req.catalog_name, &req.schema_name, table, req.region_number).await?;
|
||||
self.flush_table_inner(
|
||||
&req.catalog_name,
|
||||
&req.schema_name,
|
||||
table,
|
||||
req.region_number,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
let schema = self.catalog_manager.schema(&req.catalog_name, &req.schema_name).context(CatalogSnafu)?.context(DatabaseNotFoundSnafu {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
})?;
|
||||
let schema = self
|
||||
.catalog_manager
|
||||
.schema(&req.catalog_name, &req.schema_name)
|
||||
.context(CatalogSnafu)?
|
||||
.context(DatabaseNotFoundSnafu {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
})?;
|
||||
|
||||
let all_table_names = schema.table_names().context(CatalogSnafu)?;
|
||||
futures::future::join_all(all_table_names.iter().map(|table| {
|
||||
self.flush_table_inner(&req.catalog_name, &req.schema_name,table, req.region_number)
|
||||
})
|
||||
).await.into_iter().collect::<Result<Vec<_>>>()?;
|
||||
self.flush_table_inner(
|
||||
&req.catalog_name,
|
||||
&req.schema_name,
|
||||
table,
|
||||
req.region_number,
|
||||
)
|
||||
}))
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
}
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
|
||||
async fn flush_table_inner(&self, catalog:&str, schema:&str, table: &str, region: Option<u32>) -> Result<()> {
|
||||
async fn flush_table_inner(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
region: Option<u32>,
|
||||
) -> Result<()> {
|
||||
let table_ref = TableReference {
|
||||
catalog,
|
||||
schema,
|
||||
|
||||
@@ -150,7 +150,6 @@ impl Services {
|
||||
if let Some(http_options) = &opts.http_options {
|
||||
let http_addr = parse_addr(&http_options.addr)?;
|
||||
|
||||
|
||||
let mut http_server = HttpServer::new(
|
||||
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
|
||||
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
|
||||
|
||||
@@ -160,9 +160,9 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to put OpenTSDB data point: {:?}, source: {}",
|
||||
data_point,
|
||||
source
|
||||
"Failed to put OpenTSDB data point: {:?}, source: {}",
|
||||
data_point,
|
||||
source
|
||||
))]
|
||||
PutOpentsdbDataPoint {
|
||||
data_point: String,
|
||||
@@ -268,10 +268,7 @@ pub enum Error {
|
||||
InvalidPrepareStatement { err_msg: String },
|
||||
|
||||
#[snafu(display("Invalid flush argument: {}", err_msg))]
|
||||
InvalidFlushArgument {
|
||||
err_msg: String,
|
||||
},
|
||||
|
||||
InvalidFlushArgument { err_msg: String },
|
||||
// #[snafu(display("Failed to flush table, catalog: {}, schema: {}, table: {}, region: {:?}, source: {}",
|
||||
// catalog, schema, table, region_id, source
|
||||
// ))]
|
||||
|
||||
@@ -19,9 +19,9 @@ pub mod opentsdb;
|
||||
pub mod prometheus;
|
||||
pub mod script;
|
||||
|
||||
mod admin;
|
||||
#[cfg(feature = "mem-prof")]
|
||||
pub mod mem_prof;
|
||||
mod admin;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
@@ -58,12 +58,12 @@ use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
|
||||
use crate::auth::UserProviderRef;
|
||||
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
|
||||
use crate::http::admin::flush;
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
|
||||
use crate::query_handler::{
|
||||
InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef,
|
||||
ScriptHandlerRef,
|
||||
};
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
use crate::server::Server;
|
||||
|
||||
/// create query context from database name information, catalog and schema are
|
||||
@@ -353,9 +353,11 @@ pub struct ApiState {
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub fn new(sql_handler: ServerSqlQueryHandlerRef,
|
||||
grpc_handler: ServerGrpcQueryHandlerRef,
|
||||
options: HttpOptions) -> Self {
|
||||
pub fn new(
|
||||
sql_handler: ServerSqlQueryHandlerRef,
|
||||
grpc_handler: ServerGrpcQueryHandlerRef,
|
||||
options: HttpOptions,
|
||||
) -> Self {
|
||||
Self {
|
||||
sql_handler,
|
||||
grpc_handler,
|
||||
@@ -433,7 +435,10 @@ impl HttpServer {
|
||||
.layer(Extension(api));
|
||||
|
||||
let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router);
|
||||
router = router.nest(&format!("/{HTTP_API_VERSION}/admin"), self.route_admin(self.grpc_handler.clone()));
|
||||
router = router.nest(
|
||||
&format!("/{HTTP_API_VERSION}/admin"),
|
||||
self.route_admin(self.grpc_handler.clone()),
|
||||
);
|
||||
|
||||
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
|
||||
router = router.nest(
|
||||
@@ -527,7 +532,8 @@ impl HttpServer {
|
||||
}
|
||||
|
||||
fn route_admin<S>(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router<S> {
|
||||
Router::new().route("/flush", routing::post(flush))
|
||||
Router::new()
|
||||
.route("/flush", routing::post(flush))
|
||||
.with_state(grpc_handler)
|
||||
}
|
||||
}
|
||||
@@ -591,6 +597,7 @@ mod test {
|
||||
use std::future::pending;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use axum::handler::Handler;
|
||||
use axum::http::StatusCode;
|
||||
use axum::routing::get;
|
||||
@@ -602,7 +609,6 @@ mod test {
|
||||
use query::parser::PromQuery;
|
||||
use session::context::QueryContextRef;
|
||||
use tokio::sync::mpsc;
|
||||
use api::v1::greptime_request::Request;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
@@ -617,7 +623,11 @@ mod test {
|
||||
impl GrpcQueryHandler for DummyInstance {
|
||||
type Error = Error;
|
||||
|
||||
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
_ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
@@ -664,9 +674,7 @@ mod test {
|
||||
let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone());
|
||||
let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance);
|
||||
|
||||
let server = HttpServer::new(sql_instance,
|
||||
grpc_instance,
|
||||
HttpOptions::default());
|
||||
let server = HttpServer::new(sql_instance, grpc_instance, HttpOptions::default());
|
||||
server.make_app().route(
|
||||
"/test/timeout",
|
||||
get(forever.layer(
|
||||
|
||||
@@ -12,20 +12,20 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::ddl_request::Expr;
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::{DdlRequest, FlushTableExpr};
|
||||
use axum::extract::{Query, RawBody, State};
|
||||
use axum::http::StatusCode as HttpStatusCode;
|
||||
use axum::Json;
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
use std::collections::HashMap;
|
||||
use snafu::OptionExt;
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::{DdlRequest, FlushTableExpr};
|
||||
use api::v1::ddl_request::Expr;
|
||||
use session::context::QueryContext;
|
||||
use crate::error;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::error::{Result};
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
|
||||
#[axum_macros::debug_handler]
|
||||
pub async fn flush(
|
||||
@@ -33,15 +33,27 @@ pub async fn flush(
|
||||
Query(params): Query<HashMap<String, String>>,
|
||||
RawBody(_): RawBody,
|
||||
) -> Result<(HttpStatusCode, Json<String>)> {
|
||||
let catalog_name = params.get("catalog_name").cloned().unwrap_or("greptime".to_string());
|
||||
let schema_name = params.get("schema_name").cloned().context(error::InvalidFlushArgumentSnafu {
|
||||
err_msg: "schema_name is not present",
|
||||
})?;
|
||||
let catalog_name = params
|
||||
.get("catalog_name")
|
||||
.cloned()
|
||||
.unwrap_or("greptime".to_string());
|
||||
let schema_name =
|
||||
params
|
||||
.get("schema_name")
|
||||
.cloned()
|
||||
.context(error::InvalidFlushArgumentSnafu {
|
||||
err_msg: "schema_name is not present",
|
||||
})?;
|
||||
|
||||
// if table name is not present, flush all tables inside schema
|
||||
let table_name = params.get("table_name").cloned().unwrap_or_default();
|
||||
|
||||
let region_id: Option<u32> = params.get("region").map(|v| v.parse()).transpose().ok().flatten();
|
||||
let region_id: Option<u32> = params
|
||||
.get("region")
|
||||
.map(|v| v.parse())
|
||||
.transpose()
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
let request = Request::Ddl(DdlRequest {
|
||||
expr: Some(Expr::FlushTable(FlushTableExpr {
|
||||
@@ -49,9 +61,9 @@ pub async fn flush(
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
region_id,
|
||||
}))
|
||||
})),
|
||||
});
|
||||
|
||||
grpc_handler.do_query(request, QueryContext::arc()).await?;
|
||||
Ok((HttpStatusCode::OK, Json::from("hello, world".to_string())))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::InsertRequest;
|
||||
use async_trait::async_trait;
|
||||
use axum::{http, Router};
|
||||
@@ -24,12 +25,11 @@ use query::parser::PromQuery;
|
||||
use servers::error::{Error, Result};
|
||||
use servers::http::{HttpOptions, HttpServer};
|
||||
use servers::influxdb::InfluxdbRequest;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use servers::query_handler::InfluxdbLineProtocolHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use tokio::sync::mpsc;
|
||||
use api::v1::greptime_request::Request;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
|
||||
use crate::auth::{DatabaseAuthInfo, MockUserProvider};
|
||||
|
||||
@@ -39,9 +39,13 @@ struct DummyInstance {
|
||||
|
||||
#[async_trait]
|
||||
impl GrpcQueryHandler for DummyInstance {
|
||||
type Error =Error;
|
||||
type Error = Error;
|
||||
|
||||
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
_ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
@@ -90,9 +94,7 @@ impl SqlQueryHandler for DummyInstance {
|
||||
|
||||
fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>) -> Router {
|
||||
let instance = Arc::new(DummyInstance { tx });
|
||||
let mut server = HttpServer::new(instance.clone(),
|
||||
instance.clone(),
|
||||
HttpOptions::default());
|
||||
let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default());
|
||||
let mut user_provider = MockUserProvider::default();
|
||||
if let Some(name) = db_name {
|
||||
user_provider.set_authorization_info(DatabaseAuthInfo {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use async_trait::async_trait;
|
||||
use axum::Router;
|
||||
use axum_test_helper::TestClient;
|
||||
@@ -23,12 +24,11 @@ use query::parser::PromQuery;
|
||||
use servers::error::{self, Result};
|
||||
use servers::http::{HttpOptions, HttpServer};
|
||||
use servers::opentsdb::codec::DataPoint;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use servers::query_handler::OpentsdbProtocolHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use tokio::sync::mpsc;
|
||||
use api::v1::greptime_request::Request;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
|
||||
struct DummyInstance {
|
||||
tx: mpsc::Sender<String>,
|
||||
@@ -38,7 +38,11 @@ struct DummyInstance {
|
||||
impl GrpcQueryHandler for DummyInstance {
|
||||
type Error = crate::Error;
|
||||
|
||||
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
_ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
@@ -88,7 +92,7 @@ impl SqlQueryHandler for DummyInstance {
|
||||
|
||||
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
|
||||
let instance = Arc::new(DummyInstance { tx });
|
||||
let mut server = HttpServer::new(instance.clone(),instance.clone(), HttpOptions::default());
|
||||
let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default());
|
||||
server.set_opentsdb_handler(instance);
|
||||
server.make_app()
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::sync::Arc;
|
||||
use api::prometheus::remote::{
|
||||
LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest,
|
||||
};
|
||||
use api::v1::greptime_request::Request;
|
||||
use async_trait::async_trait;
|
||||
use axum::Router;
|
||||
use axum_test_helper::TestClient;
|
||||
@@ -28,12 +29,11 @@ use servers::error::{Error, Result};
|
||||
use servers::http::{HttpOptions, HttpServer};
|
||||
use servers::prometheus;
|
||||
use servers::prometheus::{snappy_compress, Metrics};
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
|
||||
use session::context::QueryContextRef;
|
||||
use tokio::sync::mpsc;
|
||||
use api::v1::greptime_request::Request;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
|
||||
struct DummyInstance {
|
||||
tx: mpsc::Sender<(String, Vec<u8>)>,
|
||||
@@ -41,9 +41,13 @@ struct DummyInstance {
|
||||
|
||||
#[async_trait]
|
||||
impl GrpcQueryHandler for DummyInstance {
|
||||
type Error =Error;
|
||||
type Error = Error;
|
||||
|
||||
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
_ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
@@ -113,9 +117,7 @@ impl SqlQueryHandler for DummyInstance {
|
||||
|
||||
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
|
||||
let instance = Arc::new(DummyInstance { tx });
|
||||
let mut server = HttpServer::new(instance.clone(),
|
||||
instance.clone(),
|
||||
HttpOptions::default());
|
||||
let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default());
|
||||
server.set_prom_handler(instance);
|
||||
server.make_app()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user