feat: frontend start with instance param (#385)

* chore: fix conflict

* chore: remove unused import
This commit is contained in:
shuiyisong
2022-11-03 18:05:01 +08:00
committed by GitHub
parent 9fd2d4e8db
commit 750310c648
4 changed files with 46 additions and 19 deletions

View File

@@ -2,6 +2,7 @@ use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
@@ -56,7 +57,7 @@ struct StartCommand {
impl StartCommand {
async fn run(self) -> Result<()> {
let opts = self.try_into()?;
let mut frontend = Frontend::new(opts);
let mut frontend = Frontend::new(opts, Instance::new());
frontend.start().await.context(error::StartFrontendSnafu)
}
}

View File

@@ -6,7 +6,7 @@ use snafu::prelude::*;
use crate::error::{self, Result};
use crate::grpc::GrpcOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::Instance;
use crate::instance::FrontendInstance;
use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
@@ -45,14 +45,19 @@ impl FrontendOptions {
}
}
pub struct Frontend {
pub struct Frontend<T>
where
T: FrontendInstance,
{
opts: FrontendOptions,
instance: Option<Instance>,
instance: Option<T>,
}
impl Frontend {
pub fn new(opts: FrontendOptions) -> Self {
let instance = Instance::new();
impl<T> Frontend<T>
where
T: FrontendInstance,
{
pub fn new(opts: FrontendOptions, instance: T) -> Self {
Self {
opts,
instance: Some(instance),

View File

@@ -3,7 +3,6 @@ mod opentsdb;
mod prometheus;
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
@@ -17,7 +16,10 @@ use common_error::prelude::BoxedError;
use common_query::Output;
use datatypes::schema::ColumnSchema;
use servers::error as server_error;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler};
use servers::query_handler::{
GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
PrometheusProtocolHandler, SqlQueryHandler,
};
use snafu::prelude::*;
use sql::ast::{ColumnDef, TableConstraint};
use sql::statements::create_table::{CreateTable, TIME_INDEX};
@@ -28,7 +30,20 @@ use sql::{dialect::GenericDialect, parser::ParserContext};
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
use crate::frontend::FrontendOptions;
pub(crate) type InstanceRef = Arc<Instance>;
#[async_trait]
pub trait FrontendInstance:
GrpcAdminHandler
+ GrpcQueryHandler
+ SqlQueryHandler
+ OpentsdbProtocolHandler
+ InfluxdbLineProtocolHandler
+ PrometheusProtocolHandler
+ Send
+ Sync
+ 'static
{
async fn start(&mut self, opts: &FrontendOptions) -> Result<()>;
}
#[derive(Default)]
pub struct Instance {
@@ -36,16 +51,10 @@ pub struct Instance {
}
impl Instance {
pub(crate) fn new() -> Self {
pub fn new() -> Self {
Default::default()
}
pub(crate) async fn start(&mut self, opts: &FrontendOptions) -> Result<()> {
let addr = opts.datanode_grpc_addr();
self.client.start(vec![addr]);
Ok(())
}
// TODO(fys): temporarily hard code
pub fn database(&self) -> Database {
Database::new("greptime", self.client.clone())
@@ -57,6 +66,15 @@ impl Instance {
}
}
#[async_trait]
impl FrontendInstance for Instance {
async fn start(&mut self, opts: &FrontendOptions) -> Result<()> {
let addr = opts.datanode_grpc_addr();
self.client.start(vec![addr]);
Ok(())
}
}
#[cfg(test)]
impl Instance {
pub fn with_client(client: Client) -> Self {

View File

@@ -14,13 +14,16 @@ use tokio::try_join;
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::InstanceRef;
use crate::instance::FrontendInstance;
use crate::prometheus::PrometheusOptions;
pub(crate) struct Services;
impl Services {
pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> {
pub(crate) async fn start<T>(opts: &FrontendOptions, instance: Arc<T>) -> Result<()>
where
T: FrontendInstance,
{
let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options {
let grpc_addr = parse_addr(&opts.addr)?;