refactor(frontend): minor changes around FrontendInstance constructor (#748)

* refactor: minor changes in some testing codes

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-12-15 14:34:40 +08:00
committed by GitHub
parent e3785fca70
commit 61d8bc2ea1
9 changed files with 88 additions and 142 deletions

View File

@@ -193,7 +193,6 @@ async fn build_frontend(
datanode_instance: InstanceRef,
) -> Result<Frontend<FeInstance>> {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
frontend_instance.set_script_handler(datanode_instance);
Ok(Frontend::new(fe_opts, frontend_instance, plugins))
}

View File

@@ -24,7 +24,7 @@ use query::QueryEngineFactory;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
use table::table::{TableIdProvider, TableIdProviderRef};
use table::table::TableIdProvider;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
@@ -34,57 +34,6 @@ use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;
impl Instance {
// This method is used in other crate's testing codes, so move it out of "cfg(test)".
// TODO(LFC): Delete it when callers no longer need it.
pub async fn new_mock() -> Result<Self> {
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
let mock_info = meta_srv::mocks::mock_with_memstore().await;
let meta_client = Arc::new(mock_meta_client(mock_info, 0).await);
let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let logstore = Arc::new(create_local_file_log_store(dir.path().to_str().unwrap()).await?);
let mock_engine = Arc::new(MockMitoEngine::new(
TableEngineConfig::default(),
MockEngine::default(),
object_store,
));
let catalog_manager = Arc::new(
catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(
mock_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
);
let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
.unwrap();
let heartbeat_task = Some(HeartbeatTask::new(
0,
"127.0.0.1:3302".to_string(),
meta_client,
));
let table_id_provider = Some(catalog_manager.clone() as TableIdProviderRef);
Ok(Self {
query_engine,
sql_handler,
catalog_manager,
script_executor,
heartbeat_task,
table_id_provider,
logstore,
})
}
pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result<Self> {
let mock_info = meta_srv::mocks::mock_with_memstore().await;
Self::with_mock_meta_server(opts, mock_info).await

View File

@@ -173,10 +173,10 @@ async fn assert_query_result(instance: &Instance, sql: &str, ts: i64, host: &str
}
}
async fn setup_test_instance() -> Instance {
async fn setup_test_instance(test_name: &str) -> Instance {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_insert");
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(test_name);
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
@@ -193,7 +193,7 @@ async fn setup_test_instance() -> Instance {
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
let instance = setup_test_instance().await;
let instance = setup_test_instance("test_execute_insert").await;
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
@@ -409,18 +409,10 @@ async fn check_output_stream(output: Output, expected: Vec<&str>) {
assert_eq!(pretty_print, expected);
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_alter_table() {
let instance = Instance::new_mock().await.unwrap();
instance.start().await.unwrap();
let instance = setup_test_instance("test_alter_table").await;
test_util::create_test_table(
instance.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millis_datatype(),
)
.await
.unwrap();
// make sure table insertion is ok before altering table
execute_sql(
&instance,

View File

@@ -56,13 +56,13 @@ use sql::parser::ParserContext;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterTableOnInsertionSnafu, CatalogNotFoundSnafu, CatalogSnafu, CreateDatabaseSnafu,
CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
SchemaNotFoundSnafu,
self, AlterTableOnInsertionSnafu, CatalogSnafu, CreateDatabaseSnafu, CreateTableSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
@@ -90,8 +90,7 @@ pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
#[derive(Clone)]
pub struct Instance {
/// catalog manager is None in standalone mode, datanode will keep their own
catalog_manager: Option<CatalogManagerRef>,
catalog_manager: CatalogManagerRef,
/// Script handler is None in distributed mode, only works on standalone mode.
script_handler: Option<ScriptHandlerRef>,
create_expr_factory: CreateExprFactoryRef,
@@ -128,7 +127,7 @@ impl Instance {
let dist_instance_ref = Arc::new(dist_instance.clone());
Ok(Instance {
catalog_manager: Some(catalog_manager),
catalog_manager,
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Distributed,
@@ -171,7 +170,7 @@ impl Instance {
pub fn new_standalone(dn_instance: DnInstanceRef) -> Self {
Instance {
catalog_manager: None,
catalog_manager: dn_instance.catalog_manager().clone(),
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Standalone,
@@ -182,18 +181,10 @@ impl Instance {
}
}
pub fn catalog_manager(&self) -> &Option<CatalogManagerRef> {
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
pub fn set_catalog_manager(&mut self, catalog_manager: CatalogManagerRef) {
debug_assert!(
self.catalog_manager.is_none(),
"Catalog manager can be set only once!"
);
self.catalog_manager = Some(catalog_manager);
}
pub fn set_script_handler(&mut self, handler: ScriptHandlerRef) {
debug_assert!(
self.script_handler.is_none(),
@@ -293,21 +284,7 @@ impl Instance {
table_name: &str,
columns: &[Column],
) -> Result<()> {
match self
.catalog_manager
.as_ref()
.expect("catalog manager cannot be None")
.catalog(catalog_name)
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu { catalog_name })?
.schema(schema_name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: schema_name,
})?
.table(table_name)
.context(CatalogSnafu)?
{
match self.find_table(catalog_name, schema_name, table_name)? {
None => {
info!(
"Table {}.{}.{} does not exist, try create table",
@@ -403,8 +380,6 @@ impl Instance {
fn get_catalog(&self, catalog_name: &str) -> Result<CatalogProviderRef> {
self.catalog_manager
.as_ref()
.context(error::CatalogManagerSnafu)?
.catalog(catalog_name)
.context(error::CatalogSnafu)?
.context(error::CatalogNotFoundSnafu { catalog_name })
@@ -419,6 +394,12 @@ impl Instance {
})
}
fn find_table(&self, catalog: &str, schema: &str, table: &str) -> Result<Option<TableRef>> {
self.catalog_manager
.table(catalog, schema, table)
.context(CatalogSnafu)
}
async fn sql_dist_insert(&self, insert: Box<Insert>) -> Result<usize> {
let (catalog, schema, table) = insert.full_table_name().context(error::ParseSqlSnafu)?;
@@ -458,23 +439,17 @@ impl Instance {
}
fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
let catalog_manager = &self.catalog_manager;
if let Some(catalog_manager) = catalog_manager {
ensure!(
catalog_manager
.schema(DEFAULT_CATALOG_NAME, &db)
.context(error::CatalogSnafu)?
.is_some(),
error::SchemaNotFoundSnafu { schema_info: &db }
);
ensure!(
self.catalog_manager
.schema(DEFAULT_CATALOG_NAME, &db)
.context(error::CatalogSnafu)?
.is_some(),
error::SchemaNotFoundSnafu { schema_info: &db }
);
query_ctx.set_current_schema(&db);
query_ctx.set_current_schema(&db);
Ok(Output::RecordBatches(RecordBatches::empty()))
} else {
// TODO(LFC): Handle "use" stmt here.
unimplemented!()
}
Ok(Output::RecordBatches(RecordBatches::empty()))
}
}
@@ -679,11 +654,11 @@ mod tests {
use super::*;
use crate::tests;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_sql() {
let query_ctx = Arc::new(QueryContext::new());
let instance = tests::create_frontend_instance().await;
let (instance, _guard) = tests::create_frontend_instance("test_execute_sql").await;
let sql = r#"CREATE TABLE demo(
host STRING,
@@ -761,9 +736,9 @@ mod tests {
};
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_grpc() {
let instance = tests::create_frontend_instance().await;
let (instance, _guard) = tests::create_frontend_instance("test_execute_grpc").await;
// testing data:
let expected_host_col = Column {

View File

@@ -70,9 +70,9 @@ mod tests {
use super::*;
use crate::tests;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_exec() {
let instance = tests::create_frontend_instance().await;
let (instance, _guard) = tests::create_frontend_instance("test_exec").await;
instance
.exec(
&DataPoint::try_create(
@@ -88,9 +88,10 @@ mod tests {
.unwrap();
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_opentsdb_metric() {
let instance = tests::create_frontend_instance().await;
let (instance, _guard) =
tests::create_frontend_instance("test_insert_opentsdb_metric").await;
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
@@ -124,7 +125,10 @@ mod tests {
assert!(result.is_ok());
let output = instance
.do_query("select * from my_metric_1", Arc::new(QueryContext::new()))
.do_query(
"select * from my_metric_1 order by greptime_timestamp",
Arc::new(QueryContext::new()),
)
.await
.unwrap();
match output {

View File

@@ -182,10 +182,11 @@ mod tests {
use super::*;
use crate::tests;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_prometheus_remote_write_and_read() {
common_telemetry::init_default_ut_logging();
let instance = tests::create_frontend_instance().await;
let (instance, _guard) =
tests::create_frontend_instance("test_prometheus_remote_write_and_read").await;
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),

View File

@@ -29,6 +29,7 @@ use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::memory::MemStore;
use servers::grpc::GrpcServer;
use servers::Mode;
use tempdir::TempDir;
use tonic::transport::Server;
use tower::service_fn;
@@ -39,21 +40,42 @@ use crate::instance::distributed::DistInstance;
use crate::instance::Instance;
use crate::table::route::TableRoutes;
async fn create_datanode_instance() -> Arc<DatanodeInstance> {
// TODO(LFC) Use real Mito engine when we can alter its region schema,
// and delete the `new_mock` method.
let instance = Arc::new(DatanodeInstance::new_mock().await.unwrap());
instance.start().await.unwrap();
instance
/// Guard against the `TempDir`s that used in unit tests.
/// (The `TempDir` will be deleted once it goes out of scope.)
pub struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
}
pub(crate) async fn create_frontend_instance() -> Arc<Instance> {
let datanode_instance: Arc<DatanodeInstance> = create_datanode_instance().await;
let dn_catalog_manager = datanode_instance.catalog_manager().clone();
pub(crate) async fn create_frontend_instance(test_name: &str) -> (Arc<Instance>, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(test_name);
let datanode_instance = DatanodeInstance::with_mock_meta_client(&opts)
.await
.unwrap();
datanode_instance.start().await.unwrap();
let mut frontend_instance = Instance::new_standalone(datanode_instance);
frontend_instance.set_catalog_manager(dn_catalog_manager);
Arc::new(frontend_instance)
let frontend_instance = Instance::new_standalone(Arc::new(datanode_instance));
(Arc::new(frontend_instance), guard)
}
fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{}", name)).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
mode: Mode::Standalone,
..Default::default()
};
(
opts,
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
},
)
}
pub(crate) async fn create_datanode_client(

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::error;
use common_telemetry::{debug, warn};
use futures::StreamExt;
use pgwire::tokio::process_socket;
use tokio;
@@ -79,6 +80,11 @@ impl PostgresServer {
match tcp_stream {
Err(error) => error!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt.
Ok(io_stream) => {
match io_stream.peer_addr() {
Ok(addr) => debug!("PostgreSQL client coming from {}", addr),
Err(e) => warn!("Failed to get PostgreSQL client addr, err: {}", e),
}
io_runtime.spawn(process_socket(
io_stream,
tls_acceptor.clone(),
@@ -102,6 +108,7 @@ impl Server for PostgresServer {
async fn start(&self, listening: SocketAddr) -> Result<SocketAddr> {
let (stream, addr) = self.base_server.bind(listening).await?;
debug!("Starting PostgreSQL with TLS option: {:?}", self.tls);
let tls_acceptor = self
.tls
.setup()?

View File

@@ -214,7 +214,6 @@ pub async fn create_test_table(
async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance
}
@@ -243,7 +242,7 @@ pub async fn setup_test_app_with_frontend(
let mut frontend = build_frontend_instance(instance.clone()).await;
instance.start().await.unwrap();
create_test_table(
frontend.catalog_manager().as_ref().unwrap(),
frontend.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millis_datatype(),
)
@@ -276,9 +275,7 @@ pub async fn setup_grpc_server(
let fe_grpc_addr = format!("127.0.0.1:{}", get_port());
let mut fe_instance = frontend::instance::Instance::new_standalone(instance.clone());
fe_instance.set_catalog_manager(instance.catalog_manager().clone());
let fe_instance = frontend::instance::Instance::new_standalone(instance.clone());
let fe_instance_ref = Arc::new(fe_instance);
let fe_grpc_server = Arc::new(GrpcServer::new(
fe_instance_ref.clone(),