fix: unit tests

This commit is contained in:
Lei, HUANG
2023-03-14 16:11:25 +08:00
parent 8b9671f376
commit d1ba9ca126
9 changed files with 64 additions and 16 deletions

View File

@@ -96,7 +96,6 @@ impl Instance {
table_name,
region_number: expr.region_id,
};
println!("req: {:?}\n", req);
self.sql_handler()
.execute(SqlRequest::FlushTable(req), QueryContext::arc())
.await

View File

@@ -793,10 +793,10 @@ async fn test_flush_table_all_regions() {
assert!(!has_parquet_file(&region_dir));
// Trigger flush all region
table.flush(FlushTableRequest::default()).await.unwrap();
table.flush(None).await.unwrap();
// Trigger again, wait for the previous task finished
table.flush(FlushTableRequest::default()).await.unwrap();
table.flush(None).await.unwrap();
assert!(has_parquet_file(&region_dir));
}
@@ -832,10 +832,10 @@ async fn test_flush_table_with_region_id() {
};
// Trigger flush all region
table.flush(req.clone()).await.unwrap();
table.flush(req.region_number).await.unwrap();
// Trigger again, wait for the previous task finished
table.flush(req).await.unwrap();
table.flush(req.region_number).await.unwrap();
assert!(has_parquet_file(&region_dir));
}

View File

@@ -349,7 +349,6 @@ async fn serve_docs() -> Html<String> {
#[derive(Clone)]
pub struct ApiState {
pub sql_handler: ServerSqlQueryHandlerRef,
pub grpc_handler: ServerGrpcQueryHandlerRef,
pub script_handler: Option<ScriptHandlerRef>,
}
@@ -428,7 +427,6 @@ impl HttpServer {
let sql_router = self
.route_sql(ApiState {
sql_handler: self.sql_handler.clone(),
grpc_handler: self.grpc_handler.clone(),
script_handler: self.script_handler.clone(),
})
.finish_api(&mut api)
@@ -604,15 +602,26 @@ mod test {
use query::parser::PromQuery;
use session::context::QueryContextRef;
use tokio::sync::mpsc;
use api::v1::greptime_request::Request;
use super::*;
use crate::error::Error;
use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdaptor};
use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler};
struct DummyInstance {
_tx: mpsc::Sender<(String, Vec<u8>)>,
}
#[async_trait]
impl GrpcQueryHandler for DummyInstance {
type Error = Error;
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
unimplemented!()
}
}
#[async_trait]
impl SqlQueryHandler for DummyInstance {
type Error = Error;
@@ -652,9 +661,11 @@ mod test {
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { _tx: tx });
let instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone());
let server = HttpServer::new(instance.clone(),
instance,
let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone());
let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance);
let server = HttpServer::new(sql_instance,
grpc_instance,
HttpOptions::default());
server.make_app().route(
"/test/timeout",

View File

@@ -41,7 +41,7 @@ pub async fn flush(
// if table name is not present, flush all tables inside schema
let table_name = params.get("table_name").cloned().unwrap_or_default();
let region_id: Option<u32> = params.get("region_id").map(|v| v.parse()).transpose().ok().flatten();
let region_id: Option<u32> = params.get("region").map(|v| v.parse()).transpose().ok().flatten();
let request = Request::Ddl(DdlRequest {
expr: Some(Expr::FlushTable(FlushTableExpr {

View File

@@ -17,11 +17,12 @@ use axum_test_helper::TestClient;
use servers::http::{HttpOptions, HttpServer};
use table::test_util::MemTable;
use crate::create_testing_sql_query_handler;
use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler};
fn make_test_app() -> Router {
let server = HttpServer::new(
create_testing_sql_query_handler(MemTable::default_numbers_table()),
create_testing_grpc_query_handler(MemTable::default_numbers_table()),
HttpOptions::default(),
);
server.make_app()

View File

@@ -28,6 +28,8 @@ use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use tokio::sync::mpsc;
use api::v1::greptime_request::Request;
use servers::query_handler::grpc::GrpcQueryHandler;
use crate::auth::{DatabaseAuthInfo, MockUserProvider};
@@ -35,6 +37,15 @@ struct DummyInstance {
tx: Arc<mpsc::Sender<(String, String)>>,
}
#[async_trait]
impl GrpcQueryHandler for DummyInstance {
type Error =Error;
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
unimplemented!()
}
}
#[async_trait]
impl InfluxdbLineProtocolHandler for DummyInstance {
async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
@@ -79,7 +90,9 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone(), HttpOptions::default());
let mut server = HttpServer::new(instance.clone(),
instance.clone(),
HttpOptions::default());
let mut user_provider = MockUserProvider::default();
if let Some(name) = db_name {
user_provider.set_authorization_info(DatabaseAuthInfo {

View File

@@ -27,11 +27,22 @@ use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use tokio::sync::mpsc;
use api::v1::greptime_request::Request;
use servers::query_handler::grpc::GrpcQueryHandler;
struct DummyInstance {
tx: mpsc::Sender<String>,
}
#[async_trait]
impl GrpcQueryHandler for DummyInstance {
type Error = crate::Error;
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
unimplemented!()
}
}
#[async_trait]
impl OpentsdbProtocolHandler for DummyInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
@@ -77,7 +88,7 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone(), HttpOptions::default());
let mut server = HttpServer::new(instance.clone(),instance.clone(), HttpOptions::default());
server.set_opentsdb_handler(instance);
server.make_app()
}

View File

@@ -32,11 +32,22 @@ use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use session::context::QueryContextRef;
use tokio::sync::mpsc;
use api::v1::greptime_request::Request;
use servers::query_handler::grpc::GrpcQueryHandler;
struct DummyInstance {
tx: mpsc::Sender<(String, Vec<u8>)>,
}
#[async_trait]
impl GrpcQueryHandler for DummyInstance {
type Error =Error;
async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result<Output, Self::Error> {
unimplemented!()
}
}
#[async_trait]
impl PrometheusProtocolHandler for DummyInstance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> {
@@ -102,7 +113,9 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone(), HttpOptions::default());
let mut server = HttpServer::new(instance.clone(),
instance.clone(),
HttpOptions::default());
server.set_prom_handler(instance);
server.make_app()
}

View File

@@ -47,7 +47,7 @@ mod py_script;
const LOCALHOST_WITH_0: &str = "127.0.0.1:0";
struct DummyInstance {
pub struct DummyInstance {
query_engine: QueryEngineRef,
py_engine: Arc<PyEngine>,
scripts: RwLock<HashMap<String, Arc<PyScript>>>,