mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 04:12:55 +00:00
feat: manual flush http API
This commit is contained in:
@@ -84,12 +84,19 @@ impl Instance {
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
|
||||
let table_name = if expr.table_name.trim().is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(expr.table_name)
|
||||
};
|
||||
|
||||
let req = FlushTableRequest {
|
||||
catalog_name: expr.catalog_name,
|
||||
schema_name: expr.schema_name,
|
||||
table_name: expr.table_name,
|
||||
table_name,
|
||||
region_number: expr.region_id,
|
||||
};
|
||||
println!("req: {:?}\n", req);
|
||||
self.sql_handler()
|
||||
.execute(SqlRequest::FlushTable(req), QueryContext::arc())
|
||||
.await
|
||||
@@ -148,7 +155,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
fn test_create_column_schema() {
|
||||
let column_def = ColumnDef {
|
||||
name: "a".to_string(),
|
||||
@@ -242,7 +248,7 @@ mod tests {
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
.with_time_index(true),
|
||||
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
|
||||
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
|
||||
];
|
||||
|
||||
@@ -13,28 +13,43 @@
|
||||
// limitations under the License.
|
||||
|
||||
use common_query::Output;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::engine::TableReference;
|
||||
use table::requests::FlushTableRequest;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::error::{self, CatalogSnafu, DatabaseNotFoundSnafu, Result};
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
impl SqlHandler {
|
||||
pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result<Output> {
|
||||
if let Some(table) = &req.table_name {
|
||||
self.flush_table_inner(&req.catalog_name, &req.schema_name, table, req.region_number).await?;
|
||||
} else {
|
||||
let schema = self.catalog_manager.schema(&req.catalog_name, &req.schema_name).context(CatalogSnafu)?.context(DatabaseNotFoundSnafu {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
})?;
|
||||
|
||||
let all_table_names = schema.table_names().context(CatalogSnafu)?;
|
||||
futures::future::join_all(all_table_names.iter().map(|table| {
|
||||
self.flush_table_inner(&req.catalog_name, &req.schema_name,table, req.region_number)
|
||||
})
|
||||
).await.into_iter().collect::<Result<Vec<_>>>()?;
|
||||
}
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
|
||||
async fn flush_table_inner(&self, catalog:&str, schema:&str, table: &str, region: Option<u32>) -> Result<()> {
|
||||
let table_ref = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
table: &req.table_name,
|
||||
catalog,
|
||||
schema,
|
||||
table,
|
||||
};
|
||||
|
||||
let full_table_name = table_ref.to_string();
|
||||
|
||||
let table = self.get_table(&table_ref)?;
|
||||
|
||||
table.flush(req).await.context(error::FlushTableSnafu {
|
||||
table.flush(region).await.context(error::FlushTableSnafu {
|
||||
table_name: full_table_name,
|
||||
})?;
|
||||
Ok(Output::AffectedRows(0))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,8 +150,10 @@ impl Services {
|
||||
if let Some(http_options) = &opts.http_options {
|
||||
let http_addr = parse_addr(&http_options.addr)?;
|
||||
|
||||
|
||||
let mut http_server = HttpServer::new(
|
||||
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
|
||||
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
|
||||
http_options.clone(),
|
||||
);
|
||||
if let Some(user_provider) = user_provider.clone() {
|
||||
|
||||
@@ -44,7 +44,7 @@ use table::metadata::{
|
||||
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
|
||||
};
|
||||
use table::requests::{
|
||||
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
|
||||
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest,
|
||||
};
|
||||
use table::table::scan::SimpleTableScan;
|
||||
use table::table::{AlterContext, Table};
|
||||
@@ -323,9 +323,9 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
Ok(rows_deleted)
|
||||
}
|
||||
|
||||
async fn flush(&self, request: FlushTableRequest) -> TableResult<()> {
|
||||
if let Some(region_id) = request.region_number {
|
||||
if let Some(region) = self.regions.get(®ion_id) {
|
||||
async fn flush(&self, region_number: Option<RegionNumber>) -> TableResult<()> {
|
||||
if let Some(region_number) = region_number {
|
||||
if let Some(region) = self.regions.get(®ion_number) {
|
||||
region
|
||||
.flush()
|
||||
.await
|
||||
|
||||
@@ -160,9 +160,9 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to put OpenTSDB data point: {:?}, source: {}",
|
||||
data_point,
|
||||
source
|
||||
"Failed to put OpenTSDB data point: {:?}, source: {}",
|
||||
data_point,
|
||||
source
|
||||
))]
|
||||
PutOpentsdbDataPoint {
|
||||
data_point: String,
|
||||
@@ -263,8 +263,26 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: common_mem_prof::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid prepare statement: {}", err_msg))]
|
||||
InvalidPrepareStatement { err_msg: String },
|
||||
|
||||
#[snafu(display("Invalid flush argument: {}", err_msg))]
|
||||
InvalidFlushArgument {
|
||||
err_msg: String,
|
||||
},
|
||||
|
||||
// #[snafu(display("Failed to flush table, catalog: {}, schema: {}, table: {}, region: {:?}, source: {}",
|
||||
// catalog, schema, table, region_id, source
|
||||
// ))]
|
||||
// FlushTable {
|
||||
// catalog: String,
|
||||
// schema: String,
|
||||
// table: String,
|
||||
// region_id: Option<u32>,
|
||||
// #[snafu(backtrace)]
|
||||
// source: Error,
|
||||
// },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -327,6 +345,7 @@ impl ErrorExt for Error {
|
||||
DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,
|
||||
#[cfg(feature = "mem-prof")]
|
||||
DumpProfileData { source, .. } => source.status_code(),
|
||||
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ pub mod script;
|
||||
|
||||
#[cfg(feature = "mem-prof")]
|
||||
pub mod mem_prof;
|
||||
mod admin;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
@@ -56,11 +57,13 @@ use self::authorize::HttpAuth;
|
||||
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
|
||||
use crate::auth::UserProviderRef;
|
||||
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
|
||||
use crate::http::admin::flush;
|
||||
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
|
||||
use crate::query_handler::{
|
||||
InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef,
|
||||
ScriptHandlerRef,
|
||||
};
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
use crate::server::Server;
|
||||
|
||||
/// create query context from database name information, catalog and schema are
|
||||
@@ -96,6 +99,7 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"]
|
||||
|
||||
pub struct HttpServer {
|
||||
sql_handler: ServerSqlQueryHandlerRef,
|
||||
grpc_handler: ServerGrpcQueryHandlerRef,
|
||||
options: HttpOptions,
|
||||
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
|
||||
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
|
||||
@@ -345,13 +349,17 @@ async fn serve_docs() -> Html<String> {
|
||||
#[derive(Clone)]
|
||||
pub struct ApiState {
|
||||
pub sql_handler: ServerSqlQueryHandlerRef,
|
||||
pub grpc_handler: ServerGrpcQueryHandlerRef,
|
||||
pub script_handler: Option<ScriptHandlerRef>,
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub fn new(sql_handler: ServerSqlQueryHandlerRef, options: HttpOptions) -> Self {
|
||||
pub fn new(sql_handler: ServerSqlQueryHandlerRef,
|
||||
grpc_handler: ServerGrpcQueryHandlerRef,
|
||||
options: HttpOptions) -> Self {
|
||||
Self {
|
||||
sql_handler,
|
||||
grpc_handler,
|
||||
options,
|
||||
opentsdb_handler: None,
|
||||
influxdb_handler: None,
|
||||
@@ -420,12 +428,14 @@ impl HttpServer {
|
||||
let sql_router = self
|
||||
.route_sql(ApiState {
|
||||
sql_handler: self.sql_handler.clone(),
|
||||
grpc_handler: self.grpc_handler.clone(),
|
||||
script_handler: self.script_handler.clone(),
|
||||
})
|
||||
.finish_api(&mut api)
|
||||
.layer(Extension(api));
|
||||
|
||||
let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router);
|
||||
router = router.nest(&format!("/{HTTP_API_VERSION}/admin"), self.route_admin(self.grpc_handler.clone()));
|
||||
|
||||
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
|
||||
router = router.nest(
|
||||
@@ -517,6 +527,11 @@ impl HttpServer {
|
||||
.route("/api/put", routing::post(opentsdb::put))
|
||||
.with_state(opentsdb_handler)
|
||||
}
|
||||
|
||||
fn route_admin<S>(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router<S> {
|
||||
Router::new().route("/flush", routing::post(flush))
|
||||
.with_state(grpc_handler)
|
||||
}
|
||||
}
|
||||
|
||||
pub const HTTP_SERVER: &str = "HTTP_SERVER";
|
||||
@@ -637,8 +652,10 @@ mod test {
|
||||
|
||||
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
|
||||
let instance = Arc::new(DummyInstance { _tx: tx });
|
||||
let instance = ServerSqlQueryHandlerAdaptor::arc(instance);
|
||||
let server = HttpServer::new(instance, HttpOptions::default());
|
||||
let instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone());
|
||||
let server = HttpServer::new(instance.clone(),
|
||||
instance,
|
||||
HttpOptions::default());
|
||||
server.make_app().route(
|
||||
"/test/timeout",
|
||||
get(forever.layer(
|
||||
|
||||
57
src/servers/src/http/admin.rs
Normal file
57
src/servers/src/http/admin.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
|
||||
use axum::extract::{Query, RawBody, State};
|
||||
use axum::http::StatusCode as HttpStatusCode;
|
||||
use axum::Json;
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
use std::collections::HashMap;
|
||||
use snafu::OptionExt;
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::{DdlRequest, FlushTableExpr};
|
||||
use api::v1::ddl_request::Expr;
|
||||
use session::context::QueryContext;
|
||||
use crate::error;
|
||||
|
||||
use crate::error::{Result};
|
||||
|
||||
#[axum_macros::debug_handler]
|
||||
pub async fn flush(
|
||||
State(grpc_handler): State<ServerGrpcQueryHandlerRef>,
|
||||
Query(params): Query<HashMap<String, String>>,
|
||||
RawBody(_): RawBody,
|
||||
) -> Result<(HttpStatusCode, Json<String>)> {
|
||||
let catalog_name = params.get("catalog_name").cloned().unwrap_or("greptime".to_string());
|
||||
let schema_name = params.get("schema_name").cloned().context(error::InvalidFlushArgumentSnafu {
|
||||
err_msg: "schema_name is not present",
|
||||
})?;
|
||||
|
||||
// if table name is not present, flush all tables inside schema
|
||||
let table_name = params.get("table_name").cloned().unwrap_or_default();
|
||||
|
||||
let region_id: Option<u32> = params.get("region_id").map(|v| v.parse()).transpose().ok().flatten();
|
||||
|
||||
let request = Request::Ddl(DdlRequest {
|
||||
expr: Some(Expr::FlushTable(FlushTableExpr {
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
region_id,
|
||||
}))
|
||||
});
|
||||
|
||||
grpc_handler.do_query(request, QueryContext::arc()).await?;
|
||||
Ok((HttpStatusCode::OK, Json::from("hello, world".to_string())))
|
||||
}
|
||||
@@ -43,6 +43,7 @@ pub async fn sql(
|
||||
Form(form_params): Form<SqlQuery>,
|
||||
) -> Json<JsonResponse> {
|
||||
let sql_handler = &state.sql_handler;
|
||||
|
||||
let start = Instant::now();
|
||||
let sql = query_params.sql.or(form_params.sql);
|
||||
let db = query_params.db.or(form_params.db);
|
||||
|
||||
@@ -213,7 +213,7 @@ pub struct CopyTableFromRequest {
|
||||
pub struct FlushTableRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub table_name: Option<String>,
|
||||
pub region_number: Option<RegionNumber>,
|
||||
}
|
||||
|
||||
|
||||
@@ -23,10 +23,11 @@ use async_trait::async_trait;
|
||||
use common_query::logical_plan::Expr;
|
||||
use common_query::physical_plan::PhysicalPlanRef;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
use crate::error::{Result, UnsupportedSnafu};
|
||||
use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};
|
||||
use crate::requests::{AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
|
||||
use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
|
||||
|
||||
pub type AlterContext = anymap::Map<dyn Any + Send + Sync>;
|
||||
|
||||
@@ -95,7 +96,8 @@ pub trait Table: Send + Sync {
|
||||
}
|
||||
|
||||
/// Flush table.
|
||||
async fn flush(&self, _request: FlushTableRequest) -> Result<()> {
|
||||
async fn flush(&self, region_number: Option<RegionNumber>) -> Result<()> {
|
||||
let _ = region_number;
|
||||
UnsupportedSnafu { operation: "FLUSH" }.fail()?
|
||||
}
|
||||
|
||||
|
||||
@@ -275,7 +275,8 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
|
||||
.await
|
||||
.unwrap();
|
||||
let http_server = HttpServer::new(
|
||||
ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance))),
|
||||
ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance.clone()))),
|
||||
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
|
||||
HttpOptions::default(),
|
||||
);
|
||||
(http_server.make_app(), guard)
|
||||
@@ -296,8 +297,10 @@ pub async fn setup_test_http_app_with_frontend(
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let frontend_ref = Arc::new(frontend);
|
||||
let mut http_server = HttpServer::new(
|
||||
ServerSqlQueryHandlerAdaptor::arc(Arc::new(frontend)),
|
||||
ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()),
|
||||
ServerGrpcQueryHandlerAdaptor::arc(frontend_ref),
|
||||
HttpOptions::default(),
|
||||
);
|
||||
http_server.set_script_handler(instance.clone());
|
||||
|
||||
Reference in New Issue
Block a user