diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index f15f248f5e..8170d96f67 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -96,7 +96,6 @@ impl Instance { table_name, region_number: expr.region_id, }; - println!("req: {:?}\n", req); self.sql_handler() .execute(SqlRequest::FlushTable(req), QueryContext::arc()) .await diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 9771cb668e..c252d4f0c1 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -793,10 +793,10 @@ async fn test_flush_table_all_regions() { assert!(!has_parquet_file(®ion_dir)); // Trigger flush all region - table.flush(FlushTableRequest::default()).await.unwrap(); + table.flush(None).await.unwrap(); // Trigger again, wait for the previous task finished - table.flush(FlushTableRequest::default()).await.unwrap(); + table.flush(None).await.unwrap(); assert!(has_parquet_file(®ion_dir)); } @@ -832,10 +832,10 @@ async fn test_flush_table_with_region_id() { }; // Trigger flush all region - table.flush(req.clone()).await.unwrap(); + table.flush(req.region_number).await.unwrap(); // Trigger again, wait for the previous task finished - table.flush(req).await.unwrap(); + table.flush(req.region_number).await.unwrap(); assert!(has_parquet_file(®ion_dir)); } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index b731eb0542..0c049fd4eb 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -349,7 +349,6 @@ async fn serve_docs() -> Html { #[derive(Clone)] pub struct ApiState { pub sql_handler: ServerSqlQueryHandlerRef, - pub grpc_handler: ServerGrpcQueryHandlerRef, pub script_handler: Option, } @@ -428,7 +427,6 @@ impl HttpServer { let sql_router = self .route_sql(ApiState { sql_handler: self.sql_handler.clone(), - grpc_handler: self.grpc_handler.clone(), script_handler: self.script_handler.clone(), }) .finish_api(&mut api) @@ -604,15 +602,26 @@ mod test { use query::parser::PromQuery; use session::context::QueryContextRef; use tokio::sync::mpsc; + use api::v1::greptime_request::Request; use super::*; use crate::error::Error; + use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdaptor}; use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler}; struct DummyInstance { _tx: mpsc::Sender<(String, Vec)>, } + #[async_trait] + impl GrpcQueryHandler for DummyInstance { + type Error = Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } + } + #[async_trait] impl SqlQueryHandler for DummyInstance { type Error = Error; @@ -652,9 +661,11 @@ mod test { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); - let instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); - let server = HttpServer::new(instance.clone(), - instance, + let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); + let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance); + + let server = HttpServer::new(sql_instance, + grpc_instance, HttpOptions::default()); server.make_app().route( "/test/timeout", diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs index ea60c7da38..466bb313fb 100644 --- a/src/servers/src/http/admin.rs +++ b/src/servers/src/http/admin.rs @@ -41,7 +41,7 @@ pub async fn flush( // if table name is not present, flush all tables inside schema let table_name = params.get("table_name").cloned().unwrap_or_default(); - let region_id: Option = params.get("region_id").map(|v| v.parse()).transpose().ok().flatten(); + let region_id: Option = params.get("region").map(|v| v.parse()).transpose().ok().flatten(); let request = Request::Ddl(DdlRequest { expr: Some(Expr::FlushTable(FlushTableExpr { diff --git a/src/servers/tests/http/http_test.rs b/src/servers/tests/http/http_test.rs index cf4ca38b07..c6398022b0 100644 --- a/src/servers/tests/http/http_test.rs +++ b/src/servers/tests/http/http_test.rs @@ -17,11 +17,12 @@ use axum_test_helper::TestClient; use servers::http::{HttpOptions, HttpServer}; use table::test_util::MemTable; -use crate::create_testing_sql_query_handler; +use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler}; fn make_test_app() -> Router { let server = HttpServer::new( create_testing_sql_query_handler(MemTable::default_numbers_table()), + create_testing_grpc_query_handler(MemTable::default_numbers_table()), HttpOptions::default(), ); server.make_app() diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 123d9e0d02..3f1e2ab7fd 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -28,6 +28,8 @@ use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; +use api::v1::greptime_request::Request; +use servers::query_handler::grpc::GrpcQueryHandler; use crate::auth::{DatabaseAuthInfo, MockUserProvider}; @@ -35,6 +37,15 @@ struct DummyInstance { tx: Arc>, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error =Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl InfluxdbLineProtocolHandler for DummyInstance { async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> { @@ -79,7 +90,9 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: Arc>, db_name: Option<&str>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), + instance.clone(), + HttpOptions::default()); let mut user_provider = MockUserProvider::default(); if let Some(name) = db_name { user_provider.set_authorization_info(DatabaseAuthInfo { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 96e40d8fd0..453cb2f7ee 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -27,11 +27,22 @@ use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; +use api::v1::greptime_request::Request; +use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error = crate::Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl OpentsdbProtocolHandler for DummyInstance { async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { @@ -77,7 +88,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(),instance.clone(), HttpOptions::default()); server.set_opentsdb_handler(instance); server.make_app() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 2a40af43b0..5677fb6c8d 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -32,11 +32,22 @@ use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use session::context::QueryContextRef; use tokio::sync::mpsc; +use api::v1::greptime_request::Request; +use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender<(String, Vec)>, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error =Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl PrometheusProtocolHandler for DummyInstance { async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> { @@ -102,7 +113,9 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), + instance.clone(), + HttpOptions::default()); server.set_prom_handler(instance); server.make_app() } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 789b952c5b..785614d570 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -47,7 +47,7 @@ mod py_script; const LOCALHOST_WITH_0: &str = "127.0.0.1:0"; -struct DummyInstance { +pub struct DummyInstance { query_engine: QueryEngineRef, py_engine: Arc, scripts: RwLock>>,