diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index f8c6af218a..75779690d2 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -2,6 +2,7 @@ use clap::Parser; use frontend::frontend::{Frontend, FrontendOptions}; use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; +use frontend::instance::Instance; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; @@ -56,7 +57,7 @@ struct StartCommand { impl StartCommand { async fn run(self) -> Result<()> { let opts = self.try_into()?; - let mut frontend = Frontend::new(opts); + let mut frontend = Frontend::new(opts, Instance::new()); frontend.start().await.context(error::StartFrontendSnafu) } } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 47fe7bf405..306f92a1a6 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -6,7 +6,7 @@ use snafu::prelude::*; use crate::error::{self, Result}; use crate::grpc::GrpcOptions; use crate::influxdb::InfluxdbOptions; -use crate::instance::Instance; +use crate::instance::FrontendInstance; use crate::mysql::MysqlOptions; use crate::opentsdb::OpentsdbOptions; use crate::postgres::PostgresOptions; @@ -45,14 +45,19 @@ impl FrontendOptions { } } -pub struct Frontend { +pub struct Frontend +where + T: FrontendInstance, +{ opts: FrontendOptions, - instance: Option, + instance: Option, } -impl Frontend { - pub fn new(opts: FrontendOptions) -> Self { - let instance = Instance::new(); +impl Frontend +where + T: FrontendInstance, +{ + pub fn new(opts: FrontendOptions, instance: T) -> Self { Self { opts, instance: Some(instance), diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 5d72af5099..d840222a59 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -3,7 +3,6 @@ mod opentsdb; mod prometheus; use std::collections::HashMap; -use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{ @@ -17,7 +16,10 @@ use common_error::prelude::BoxedError; use common_query::Output; use datatypes::schema::ColumnSchema; use servers::error as server_error; -use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler}; +use servers::query_handler::{ + GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, + PrometheusProtocolHandler, SqlQueryHandler, +}; use snafu::prelude::*; use sql::ast::{ColumnDef, TableConstraint}; use sql::statements::create_table::{CreateTable, TIME_INDEX}; @@ -28,7 +30,20 @@ use sql::{dialect::GenericDialect, parser::ParserContext}; use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result}; use crate::frontend::FrontendOptions; -pub(crate) type InstanceRef = Arc; +#[async_trait] +pub trait FrontendInstance: + GrpcAdminHandler + + GrpcQueryHandler + + SqlQueryHandler + + OpentsdbProtocolHandler + + InfluxdbLineProtocolHandler + + PrometheusProtocolHandler + + Send + + Sync + + 'static +{ + async fn start(&mut self, opts: &FrontendOptions) -> Result<()>; +} #[derive(Default)] pub struct Instance { @@ -36,16 +51,10 @@ pub struct Instance { } impl Instance { - pub(crate) fn new() -> Self { + pub fn new() -> Self { Default::default() } - pub(crate) async fn start(&mut self, opts: &FrontendOptions) -> Result<()> { - let addr = opts.datanode_grpc_addr(); - self.client.start(vec![addr]); - Ok(()) - } - // TODO(fys): temporarily hard code pub fn database(&self) -> Database { Database::new("greptime", self.client.clone()) @@ -57,6 +66,15 @@ impl Instance { } } +#[async_trait] +impl FrontendInstance for Instance { + async fn start(&mut self, opts: &FrontendOptions) -> Result<()> { + let addr = opts.datanode_grpc_addr(); + self.client.start(vec![addr]); + Ok(()) + } +} + #[cfg(test)] impl Instance { pub fn with_client(client: Client) -> Self { diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 882c671bb4..72a0eb31de 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -14,13 +14,16 @@ use tokio::try_join; use crate::error::{self, Result}; use crate::frontend::FrontendOptions; use crate::influxdb::InfluxdbOptions; -use crate::instance::InstanceRef; +use crate::instance::FrontendInstance; use crate::prometheus::PrometheusOptions; pub(crate) struct Services; impl Services { - pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> { + pub(crate) async fn start(opts: &FrontendOptions, instance: Arc) -> Result<()> + where + T: FrontendInstance, + { let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options { let grpc_addr = parse_addr(&opts.addr)?;