refactor: add builder for Frontend (#2849)

LFC authored on 2023-12-01 12:39:47 +08:00, committed by GitHub
parent c0df2b9086, commit 9ce9421850
15 changed files with 403 additions and 362 deletions

View File

@@ -19,7 +19,6 @@ use std::sync::{Arc, Weak};
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
-use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
@@ -55,7 +54,6 @@ pub struct KvBackendCatalogManager {
cache_invalidator: CacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
-    datanode_manager: DatanodeManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
}
@@ -76,16 +74,11 @@ impl CacheInvalidator for KvBackendCatalogManager {
}
impl KvBackendCatalogManager {
-    pub fn new(
-        backend: KvBackendRef,
-        cache_invalidator: CacheInvalidatorRef,
-        datanode_manager: DatanodeManagerRef,
-    ) -> Arc<Self> {
+    pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
cache_invalidator,
-            datanode_manager,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
},
@@ -99,10 +92,6 @@ impl KvBackendCatalogManager {
pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
-    pub fn datanode_manager(&self) -> DatanodeManagerRef {
-        self.datanode_manager.clone()
-    }
}
#[async_trait::async_trait]
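Net effect of the hunks above: KvBackendCatalogManager is now a pure metadata component and no longer carries a DatanodeManagerRef. A minimal construction sketch under that reading (illustrative, not code from this commit):

use std::sync::Arc;

use catalog::kvbackend::KvBackendCatalogManager;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::kv_backend::KvBackendRef;

// Sketch: the constructor is down to two arguments; datanode wiring
// now happens in the new frontend::instance::builder::FrontendBuilder.
fn build_catalog_manager(kv_backend: KvBackendRef) -> Arc<KvBackendCatalogManager> {
    KvBackendCatalogManager::new(kv_backend, Arc::new(DummyCacheInvalidator))
}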

View File

@@ -17,7 +17,6 @@ use std::sync::Arc;
use std::time::Instant;
use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
-use client::client_manager::DatanodeClients;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
@@ -250,13 +249,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
-    let datanode_clients = Arc::new(DatanodeClients::default());
-    let catalog_list = KvBackendCatalogManager::new(
-        cached_meta_backend.clone(),
-        cached_meta_backend.clone(),
-        datanode_clients,
-    );
+    let catalog_list =
+        KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend);
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,

View File

@@ -37,6 +37,12 @@ pub enum Error {
source: common_meta::error::Error,
},
+    #[snafu(display("Failed to init DDL manager"))]
+    InitDdlManager {
+        location: Location,
+        source: common_meta::error::Error,
+    },
#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
location: Location,
@@ -225,13 +231,6 @@ pub enum Error {
#[snafu(source)]
error: std::io::Error,
},
-    #[snafu(display("Failed to parse address {}", addr))]
-    ParseAddr {
-        addr: String,
-        #[snafu(source)]
-        error: std::net::AddrParseError,
-    },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -247,9 +246,11 @@ impl ErrorExt for Error {
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
-            Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => {
-                source.status_code()
-            }
+            Error::IterStream { source, .. }
+            | Error::InitMetadata { source, .. }
+            | Error::InitDdlManager { source, .. } => source.status_code(),
Error::ConnectServer { source, .. } => source.status_code(),
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
@@ -259,8 +260,7 @@ impl ErrorExt for Error {
| Error::NotDataFromOutput { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
-            | Error::InvalidDatabaseName { .. }
-            | Error::ParseAddr { .. } => StatusCode::InvalidArguments,
+            | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
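For orientation, a sketch (not part of the diff) of how the new InitDdlManager variant is attached at its call site via the context selector that #[derive(Snafu)] generates, mirroring the neighboring variants:

use snafu::ResultExt;

// Sketch: wrap a common_meta error into cmd's Error; the standalone
// start path below does exactly this around DdlManager::try_new.
let ddl_manager = DdlManager::try_new(/* ... */).context(InitDdlManagerSnafu)?;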

View File

@@ -12,18 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
+use catalog::kvbackend::CachedMetaKvBackend;
use clap::Parser;
+use client::client_manager::DatanodeClients;
+use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
+use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::logging;
use frontend::frontend::FrontendOptions;
+use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
+use frontend::heartbeat::HeartbeatTask;
+use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::MetaClientOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
-use crate::error::{self, Result, StartFrontendSnafu};
+use crate::error::{self, MissingConfigSnafu, Result, StartFrontendSnafu};
use crate::options::{Options, TopLevelOptions};
pub struct Instance {
@@ -196,10 +204,38 @@ impl StartCommand {
logging::info!("Frontend start command: {:#?}", self);
logging::info!("Frontend options: {:#?}", opts);
-        let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone())
+        let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
+            msg: "'meta_client'",
+        })?;
+        let meta_client = FeInstance::create_meta_client(meta_client_options)
+            .await
+            .context(StartFrontendSnafu)?;
+        let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
+        let executor = HandlerGroupExecutor::new(vec![
+            Arc::new(ParseMailboxMessageHandler),
+            Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
+        ]);
+        let heartbeat_task = HeartbeatTask::new(
+            meta_client.clone(),
+            opts.heartbeat.clone(),
+            Arc::new(executor),
+        );
+        let mut instance = FrontendBuilder::new(
+            meta_backend.clone(),
+            Arc::new(DatanodeClients::default()),
+            meta_client,
+        )
+        .with_cache_invalidator(meta_backend)
+        .with_plugin(plugins)
+        .with_heartbeat_task(heartbeat_task)
+        .try_build()
.await
.context(StartFrontendSnafu)?;
instance
.build_servers(opts)
.await
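Read as a whole, the start path above no longer delegates to FeInstance::try_new_distributed; it wires the pieces itself. A consolidating sketch of the added lines (names as in the diff; error handling reduced to ?):

let meta_client = FeInstance::create_meta_client(meta_client_options).await?;
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));

// Heartbeat responses run through a fixed handler pipeline: parse the
// mailbox message first, then invalidate stale table-cache entries.
let executor = HandlerGroupExecutor::new(vec![
    Arc::new(ParseMailboxMessageHandler),
    Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
]);
let heartbeat_task = HeartbeatTask::new(meta_client.clone(), opts.heartbeat.clone(), Arc::new(executor));

// In distributed mode the meta client is handed to the builder as its
// DDL task executor argument; the meta backend serves as both the kv
// backend and the cache invalidator.
let mut instance = FrontendBuilder::new(
    meta_backend.clone(),
    Arc::new(DatanodeClients::default()),
    meta_client,
)
.with_cache_invalidator(meta_backend)
.with_plugin(plugins)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await?;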

View File

@@ -173,7 +173,12 @@ impl StartCommand {
logging::info!("MetaSrv start command: {:#?}", self);
logging::info!("MetaSrv options: {:#?}", opts);
-        let instance = MetaSrvInstance::new(opts, plugins)
+        let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
+            .await
+            .context(error::BuildMetaServerSnafu)?;
+        let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
+        let instance = MetaSrvInstance::new(opts, plugins, metasrv)
.await
.context(error::BuildMetaServerSnafu)?;

View File

@@ -15,21 +15,23 @@
use std::sync::Arc;
use std::{fs, path};
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
-use common_meta::cache_invalidator::DummyKvCacheInvalidator;
+use common_meta::cache_invalidator::DummyCacheInvalidator;
+use common_meta::datanode_manager::DatanodeManagerRef;
+use common_meta::ddl::DdlTaskExecutorRef;
+use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::frontend::FrontendOptions;
+use frontend::instance::builder::FrontendBuilder;
+use frontend::instance::standalone::StandaloneTableMetadataCreator;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
@@ -42,9 +44,9 @@ use servers::Mode;
use snafu::ResultExt;
use crate::error::{
-    CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu,
-    ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
-    StopProcedureManagerSnafu,
+    CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result,
+    ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
+    StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::options::{MixOptions, Options, TopLevelOptions};
@@ -156,6 +158,7 @@ impl StandaloneOptions {
wal: self.wal,
storage: self.storage,
region_engine: self.region_engine,
+            rpc_addr: self.grpc.addr,
..Default::default()
}
}
@@ -347,36 +350,25 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
-        let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
-            .with_kv_backend(kv_backend.clone())
-            .build()
-            .await
-            .context(StartDatanodeSnafu)?;
-        let region_server = datanode.region_server();
-        let catalog_manager = KvBackendCatalogManager::new(
-            kv_backend.clone(),
-            Arc::new(DummyKvCacheInvalidator),
-            Arc::new(StandaloneDatanodeManager(region_server.clone())),
-        );
-        catalog_manager
-            .table_metadata_manager_ref()
-            .init()
-            .await
-            .context(InitMetadataSnafu)?;
-        // TODO: build frontend instance like in distributed mode
-        let mut frontend = build_frontend(
-            fe_plugins,
-            kv_backend,
-            procedure_manager.clone(),
-            catalog_manager,
-            region_server,
-        )
-        .await?;
+        let builder =
+            DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
+        let datanode = builder.build().await.context(StartDatanodeSnafu)?;
+        let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
+        let ddl_task_executor = Self::create_ddl_task_executor(
+            kv_backend.clone(),
+            procedure_manager.clone(),
+            datanode_manager.clone(),
+        )
+        .await?;
+        let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
+            .with_plugin(fe_plugins)
+            .try_build()
+            .await
+            .context(StartFrontendSnafu)?;
frontend
.build_servers(opts)
.await
@@ -388,26 +380,41 @@ impl StartCommand {
procedure_manager,
})
}
-}
-
-/// Build frontend instance in standalone mode
-async fn build_frontend(
-    plugins: Plugins,
-    kv_backend: KvBackendRef,
-    procedure_manager: ProcedureManagerRef,
-    catalog_manager: CatalogManagerRef,
-    region_server: RegionServer,
-) -> Result<FeInstance> {
-    let frontend_instance = FeInstance::try_new_standalone(
-        kv_backend,
-        procedure_manager,
-        catalog_manager,
-        plugins,
-        region_server,
-    )
-    .await
-    .context(StartFrontendSnafu)?;
-    Ok(frontend_instance)
-}
+    async fn create_ddl_task_executor(
+        kv_backend: KvBackendRef,
+        procedure_manager: ProcedureManagerRef,
+        datanode_manager: DatanodeManagerRef,
+    ) -> Result<DdlTaskExecutorRef> {
+        let table_metadata_manager =
+            Self::create_table_metadata_manager(kv_backend.clone()).await?;
+        let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
+            DdlManager::try_new(
+                procedure_manager,
+                datanode_manager,
+                Arc::new(DummyCacheInvalidator),
+                table_metadata_manager,
+                Arc::new(StandaloneTableMetadataCreator::new(kv_backend)),
+            )
+            .context(InitDdlManagerSnafu)?,
+        );
+        Ok(ddl_task_executor)
+    }
+
+    async fn create_table_metadata_manager(
+        kv_backend: KvBackendRef,
+    ) -> Result<TableMetadataManagerRef> {
+        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
+        table_metadata_manager
+            .init()
+            .await
+            .context(InitMetadataSnafu)?;
+        Ok(table_metadata_manager)
+    }
+}
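The standalone path mirrors the distributed one: the same FrontendBuilder, but fed a locally built DDL executor instead of a meta client, and no heartbeat task. Condensed from the hunks above (a sketch; error handling elided):

let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
let ddl_task_executor = Self::create_ddl_task_executor(
    kv_backend.clone(),
    procedure_manager.clone(),
    datanode_manager.clone(),
)
.await?;
// Everything runs in one process, so there is no remote cache to
// invalidate: create_ddl_task_executor uses the no-op
// DummyCacheInvalidator and the builder keeps its defaults.
let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
    .with_plugin(fe_plugins)
    .try_build()
    .await?;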
#[cfg(test)]

View File

@@ -182,9 +182,6 @@ pub enum Error {
source: servers::error::Error,
},
-    #[snafu(display("Missing meta_client_options section in config"))]
-    MissingMetasrvOpts { location: Location },
#[snafu(display("Failed to find leaders when altering table, table: {}", table))]
LeaderNotFound { table: String, location: Location },
@@ -299,7 +296,6 @@ impl ErrorExt for Error {
| Error::IllegalPrimaryKeysDef { .. }
| Error::SchemaExists { .. }
| Error::ColumnNotFound { .. }
-            | Error::MissingMetasrvOpts { .. }
| Error::UnsupportedFormat { .. }
| Error::IllegalAuthConfig { .. }
| Error::EmptyData { .. }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+pub mod builder;
mod grpc;
mod influxdb;
mod opentsdb;
@@ -21,24 +22,16 @@ mod region_query;
mod script;
pub mod standalone;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::Role;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
-use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use catalog::CatalogManagerRef;
-use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
-use common_meta::cache_invalidator::DummyCacheInvalidator;
-use common_meta::ddl_manager::DdlManager;
-use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
-use common_meta::heartbeat::handler::HandlerGroupExecutor;
-use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
@@ -47,19 +40,18 @@ use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
use common_telemetry::logging::info;
-use datanode::region_server::RegionServer;
use log_store::raft_engine::RaftEngineBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
-use operator::delete::{Deleter, DeleterRef};
-use operator::insert::{Inserter, InserterRef};
+use meta_client::MetaClientOptions;
+use operator::delete::DeleterRef;
+use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
-use operator::table::{table_idents_to_full_name, TableMutationOperator};
-use partition::manager::PartitionRuleManager;
+use operator::table::table_idents_to_full_name;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
-use query::{QueryEngineFactory, QueryEngineRef};
+use query::QueryEngineRef;
use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
@@ -83,15 +75,11 @@ use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
-use self::region_query::FrontendRegionQueryHandler;
-use self::standalone::StandaloneTableMetadataCreator;
use crate::error::{
-    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
-    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
-    TableOperationSnafu,
+    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
+    PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, TableOperationSnafu,
};
use crate::frontend::{FrontendOptions, TomlSerializable};
-use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use crate::heartbeat::HeartbeatTask;
use crate::metrics;
use crate::script::ScriptExecutor;
@@ -131,99 +119,9 @@ pub struct Instance {
}
impl Instance {
-    pub async fn try_new_distributed(opts: &FrontendOptions, plugins: Plugins) -> Result<Self> {
-        let meta_client = Self::create_meta_client(opts).await?;
-        let datanode_clients = Arc::new(DatanodeClients::default());
-        Self::try_new_distributed_with(meta_client, datanode_clients, plugins, opts).await
-    }
-
-    pub async fn try_new_distributed_with(
-        meta_client: Arc<MetaClient>,
-        datanode_clients: Arc<DatanodeClients>,
-        plugins: Plugins,
-        opts: &FrontendOptions,
-    ) -> Result<Self> {
-        let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
-        let catalog_manager = KvBackendCatalogManager::new(
-            meta_backend.clone(),
-            meta_backend.clone(),
-            datanode_clients.clone(),
-        );
-        let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));
-        let region_query_handler = FrontendRegionQueryHandler::arc(
-            partition_manager.clone(),
-            catalog_manager.datanode_manager().clone(),
-        );
-        let inserter = Arc::new(Inserter::new(
-            catalog_manager.clone(),
-            partition_manager.clone(),
-            datanode_clients.clone(),
-        ));
-        let deleter = Arc::new(Deleter::new(
-            catalog_manager.clone(),
-            partition_manager,
-            datanode_clients,
-        ));
-        let table_mutation_handler = Arc::new(TableMutationOperator::new(
-            inserter.clone(),
-            deleter.clone(),
-        ));
-        let query_engine = QueryEngineFactory::new_with_plugins(
-            catalog_manager.clone(),
-            Some(region_query_handler.clone()),
-            Some(table_mutation_handler),
-            true,
-            plugins.clone(),
-        )
-        .query_engine();
-        let statement_executor = Arc::new(StatementExecutor::new(
-            catalog_manager.clone(),
-            query_engine.clone(),
-            meta_client.clone(),
-            meta_backend.clone(),
-            catalog_manager.clone(),
-            inserter.clone(),
-        ));
-        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
-        let script_executor =
-            Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
-        let handlers_executor = HandlerGroupExecutor::new(vec![
-            Arc::new(ParseMailboxMessageHandler),
-            Arc::new(InvalidateTableCacheHandler::new(meta_backend)),
-        ]);
-        let heartbeat_task = Some(HeartbeatTask::new(
-            meta_client.clone(),
-            opts.heartbeat.clone(),
-            Arc::new(handlers_executor),
-        ));
-        Ok(Instance {
-            catalog_manager,
-            script_executor,
-            statement_executor,
-            query_engine,
-            plugins: plugins.clone(),
-            servers: Arc::new(HashMap::new()),
-            heartbeat_task,
-            inserter,
-            deleter,
-        })
-    }
-
-    async fn create_meta_client(opts: &FrontendOptions) -> Result<Arc<MetaClient>> {
-        let meta_client_options = opts.meta_client.as_ref().context(MissingMetasrvOptsSnafu)?;
+    pub async fn create_meta_client(
+        meta_client_options: &MetaClientOptions,
+    ) -> Result<Arc<MetaClient>> {
info!(
"Creating Frontend instance in distributed mode with Meta server addr {:?}",
meta_client_options.metasrv_addrs
@@ -285,82 +183,6 @@ impl Instance {
Ok((kv_backend, procedure_manager))
}
-    pub async fn try_new_standalone(
-        kv_backend: KvBackendRef,
-        procedure_manager: ProcedureManagerRef,
-        catalog_manager: CatalogManagerRef,
-        plugins: Plugins,
-        region_server: RegionServer,
-    ) -> Result<Self> {
-        let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
-        let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server));
-        let region_query_handler =
-            FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone());
-        let inserter = Arc::new(Inserter::new(
-            catalog_manager.clone(),
-            partition_manager.clone(),
-            datanode_manager.clone(),
-        ));
-        let deleter = Arc::new(Deleter::new(
-            catalog_manager.clone(),
-            partition_manager,
-            datanode_manager.clone(),
-        ));
-        let table_mutation_handler = Arc::new(TableMutationOperator::new(
-            inserter.clone(),
-            deleter.clone(),
-        ));
-        let query_engine = QueryEngineFactory::new_with_plugins(
-            catalog_manager.clone(),
-            Some(region_query_handler),
-            Some(table_mutation_handler),
-            true,
-            plugins.clone(),
-        )
-        .query_engine();
-        let script_executor =
-            Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
-        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
-        let cache_invalidator = Arc::new(DummyCacheInvalidator);
-        let ddl_executor = Arc::new(
-            DdlManager::try_new(
-                procedure_manager,
-                datanode_manager,
-                cache_invalidator.clone(),
-                table_metadata_manager.clone(),
-                Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
-            )
-            .context(error::InitDdlManagerSnafu)?,
-        );
-        let statement_executor = Arc::new(StatementExecutor::new(
-            catalog_manager.clone(),
-            query_engine.clone(),
-            ddl_executor,
-            kv_backend.clone(),
-            cache_invalidator,
-            inserter.clone(),
-        ));
-        Ok(Instance {
-            catalog_manager: catalog_manager.clone(),
-            script_executor,
-            statement_executor,
-            query_engine,
-            plugins,
-            servers: Arc::new(HashMap::new()),
-            heartbeat_task: None,
-            inserter,
-            deleter,
-        })
-    }
pub async fn build_servers(
&mut self,
opts: impl Into<FrontendOptions> + TomlSerializable,
@@ -400,10 +222,13 @@ impl FrontendInstance for Instance {
self.script_executor.start(self)?;
-        futures::future::try_join_all(self.servers.values().map(start_server))
-            .await
-            .context(error::StartServerSnafu)
-            .map(|_| ())
+        futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
+            info!("Starting service: {name}");
+            start_server(handler).await
+        }))
+        .await
+        .context(error::StartServerSnafu)
+        .map(|_| ())
}
}

View File

@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use catalog::kvbackend::KvBackendCatalogManager;
use common_base::Plugins;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::kv_backend::KvBackendRef;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::statement::StatementExecutor;
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use query::QueryEngineFactory;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::{Instance, StatementExecutorRef};
use crate::script::ScriptExecutor;
pub struct FrontendBuilder {
kv_backend: KvBackendRef,
cache_invalidator: Option<CacheInvalidatorRef>,
datanode_manager: DatanodeManagerRef,
plugins: Option<Plugins>,
ddl_task_executor: DdlTaskExecutorRef,
heartbeat_task: Option<HeartbeatTask>,
}
impl FrontendBuilder {
pub fn new(
kv_backend: KvBackendRef,
datanode_manager: DatanodeManagerRef,
ddl_task_executor: DdlTaskExecutorRef,
) -> Self {
Self {
kv_backend,
cache_invalidator: None,
datanode_manager,
plugins: None,
ddl_task_executor,
heartbeat_task: None,
}
}
pub fn with_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
Self {
cache_invalidator: Some(cache_invalidator),
..self
}
}
pub fn with_plugin(self, plugins: Plugins) -> Self {
Self {
plugins: Some(plugins),
..self
}
}
pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
Self {
heartbeat_task: Some(heartbeat_task),
..self
}
}
pub async fn try_build(self) -> Result<Instance> {
let kv_backend = self.kv_backend;
let datanode_manager = self.datanode_manager;
let plugins = self.plugins.unwrap_or_default();
let catalog_manager = KvBackendCatalogManager::new(
kv_backend.clone(),
self.cache_invalidator
.unwrap_or_else(|| Arc::new(DummyCacheInvalidator)),
);
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
let region_query_handler =
FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone());
let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_manager.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager,
datanode_manager.clone(),
));
let table_mutation_handler = Arc::new(TableMutationOperator::new(
inserter.clone(),
deleter.clone(),
));
let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler.clone()),
Some(table_mutation_handler),
true,
plugins.clone(),
)
.query_engine();
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
self.ddl_task_executor,
kv_backend,
catalog_manager.clone(),
inserter.clone(),
));
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
Ok(Instance {
catalog_manager,
script_executor,
statement_executor,
query_engine,
plugins,
servers: Arc::new(HashMap::new()),
heartbeat_task: self.heartbeat_task,
inserter,
deleter,
})
}
}
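A minimal use of the new builder, leaning on its defaults; a sketch, assuming frontend::error::Result is the crate's public Result alias (the three constructor arguments are the only required dependencies):

use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::kv_backend::KvBackendRef;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::Instance;

// Sketch: everything optional is defaulted by try_build():
//   cache_invalidator -> DummyCacheInvalidator
//   plugins           -> Plugins::default()
//   heartbeat_task    -> None
async fn minimal_frontend(
    kv_backend: KvBackendRef,
    datanode_manager: DatanodeManagerRef,
    ddl_task_executor: DdlTaskExecutorRef,
) -> frontend::error::Result<Instance> {
    FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
        .try_build()
        .await
}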

View File

@@ -107,7 +107,7 @@ impl Datanode for RegionInvoker {
}
}
-pub(crate) struct StandaloneTableMetadataCreator {
+pub struct StandaloneTableMetadataCreator {
table_id_sequence: SequenceRef,
}

View File

@@ -22,7 +22,7 @@ use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
-use common_meta::kv_backend::ResettableKvBackendRef;
+use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
use etcd_client::Client;
use servers::configurator::ConfiguratorRef;
@@ -60,8 +60,11 @@ pub struct MetaSrvInstance {
}
impl MetaSrvInstance {
-    pub async fn new(opts: MetaSrvOptions, plugins: Plugins) -> Result<MetaSrvInstance> {
-        let meta_srv = build_meta_srv(&opts, plugins.clone()).await?;
+    pub async fn new(
+        opts: MetaSrvOptions,
+        plugins: Plugins,
+        meta_srv: MetaSrv,
+    ) -> Result<MetaSrvInstance> {
let http_srv = Arc::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
@@ -161,28 +164,26 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(admin::make_admin_service(meta_srv))
}
-pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result<MetaSrv> {
-    let (kv_backend, election, lock) = if opts.use_memory_store {
-        (
-            Arc::new(MemoryKvBackend::new()) as _,
-            None,
-            Some(Arc::new(MemLock::default()) as _),
-        )
-    } else {
-        let etcd_endpoints = opts
-            .store_addr
-            .split(',')
-            .map(|x| x.trim())
-            .filter(|x| !x.is_empty())
-            .collect::<Vec<_>>();
-        let etcd_client = Client::connect(&etcd_endpoints, None)
-            .await
-            .context(error::ConnectEtcdSnafu)?;
-        (
-            EtcdStore::with_etcd_client(etcd_client.clone()),
-            Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?),
-            Some(EtcdLock::with_etcd_client(etcd_client)?),
-        )
-    };
+pub async fn metasrv_builder(
+    opts: &MetaSrvOptions,
+    plugins: Plugins,
+    kv_backend: Option<KvBackendRef>,
+) -> Result<MetaSrvBuilder> {
+    let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) {
+        (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)),
+        (None, true) => (
+            Arc::new(MemoryKvBackend::new()) as _,
+            None,
+            Some(Arc::new(MemLock::default()) as _),
+        ),
+        (None, false) => {
+            let etcd_client = create_etcd_client(opts).await?;
+            (
+                EtcdStore::with_etcd_client(etcd_client.clone()),
+                Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?),
+                Some(EtcdLock::with_etcd_client(etcd_client)?),
+            )
+        }
+    };
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
@@ -192,14 +193,24 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result<M
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
};
-    MetaSrvBuilder::new()
+    Ok(MetaSrvBuilder::new()
        .options(opts.clone())
        .kv_backend(kv_backend)
        .in_memory(in_memory)
        .selector(selector)
        .election(election)
        .lock(lock)
-        .plugins(plugins)
-        .build()
-        .await
+        .plugins(plugins))
}
+async fn create_etcd_client(opts: &MetaSrvOptions) -> Result<Client> {
+    let etcd_endpoints = opts
+        .store_addr
+        .split(',')
+        .map(|x| x.trim())
+        .filter(|x| !x.is_empty())
+        .collect::<Vec<_>>();
+    Client::connect(&etcd_endpoints, None)
+        .await
+        .context(error::ConnectEtcdSnafu)
+}
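Since metasrv_builder now returns the unbuilt MetaSrvBuilder, callers can adjust it before build(), and passing Some(kv_backend) skips store selection entirely: per the match above there is then no etcd connection and no election (None), only an in-process MemLock. A sketch of a test-style caller, where kv_backend is a hypothetical prebuilt KvBackendRef:

// Sketch: inject a kv backend, then build as cmd's start path does.
let builder = metasrv_builder(&opts, plugins.clone(), Some(kv_backend)).await?;
let metasrv = builder.build().await?;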

View File

@@ -19,12 +19,14 @@ use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_grpc::channel_manager::ChannelConfig;
+use common_meta::datanode_manager::DatanodeManagerRef;
+use common_meta::ddl::TableMetadataAllocatorRef;
use common_meta::ddl_manager::{DdlManager, DdlManagerRef};
use common_meta::distributed_time_constants;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
-use common_meta::sequence::{Sequence, SequenceRef};
+use common_meta::sequence::Sequence;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
@@ -70,8 +72,9 @@ pub struct MetaSrvBuilder {
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClientRef>,
lock: Option<DistLockRef>,
-    datanode_clients: Option<Arc<DatanodeClients>>,
+    datanode_manager: Option<DatanodeManagerRef>,
    plugins: Option<Plugins>,
+    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
impl MetaSrvBuilder {
@@ -85,8 +88,9 @@ impl MetaSrvBuilder {
election: None,
options: None,
lock: None,
-            datanode_clients: None,
+            datanode_manager: None,
            plugins: None,
+            table_metadata_allocator: None,
}
}
@@ -130,8 +134,8 @@ impl MetaSrvBuilder {
self
}
-    pub fn datanode_clients(mut self, clients: Arc<DatanodeClients>) -> Self {
-        self.datanode_clients = Some(clients);
+    pub fn datanode_manager(mut self, datanode_manager: DatanodeManagerRef) -> Self {
+        self.datanode_manager = Some(datanode_manager);
self
}
@@ -140,6 +144,14 @@ impl MetaSrvBuilder {
self
}
+    pub fn table_metadata_allocator(
+        mut self,
+        table_metadata_allocator: TableMetadataAllocatorRef,
+    ) -> Self {
+        self.table_metadata_allocator = Some(table_metadata_allocator);
+        self
+    }
pub async fn build(self) -> Result<MetaSrv> {
let started = Arc::new(AtomicBool::new(false));
@@ -152,8 +164,9 @@ impl MetaSrvBuilder {
selector,
handler_group,
lock,
-            datanode_clients,
+            datanode_manager,
            plugins,
+            table_metadata_allocator,
} = self;
let options = options.unwrap_or_default();
@@ -189,14 +202,22 @@ impl MetaSrvBuilder {
meta_peer_client: meta_peer_client.clone(),
table_id: None,
};
+        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
+            Arc::new(MetaSrvTableMetadataAllocator::new(
+                selector_ctx.clone(),
+                selector.clone(),
+                table_id_sequence.clone(),
+            ))
+        });
let ddl_manager = build_ddl_manager(
&options,
-            datanode_clients,
+            datanode_manager,
&procedure_manager,
&mailbox,
&table_metadata_manager,
-            (&selector, &selector_ctx),
-            &table_id_sequence,
+            table_metadata_allocator,
)?;
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());
@@ -324,12 +345,11 @@ fn build_procedure_manager(
fn build_ddl_manager(
options: &MetaSrvOptions,
-    datanode_clients: Option<Arc<DatanodeClients>>,
+    datanode_clients: Option<DatanodeManagerRef>,
procedure_manager: &ProcedureManagerRef,
mailbox: &MailboxRef,
table_metadata_manager: &TableMetadataManagerRef,
-    (selector, selector_ctx): (&SelectorRef, &SelectorContext),
-    table_id_sequence: &SequenceRef,
+    table_metadata_allocator: TableMetadataAllocatorRef,
) -> Result<DdlManagerRef> {
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
@@ -349,19 +369,13 @@ fn build_ddl_manager(
},
));
-    let table_meta_allocator = Arc::new(MetaSrvTableMetadataAllocator::new(
-        selector_ctx.clone(),
-        selector.clone(),
-        table_id_sequence.clone(),
-    ));
Ok(Arc::new(
DdlManager::try_new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
-            table_meta_allocator,
+            table_metadata_allocator,
)
.context(error::InitDdlManagerSnafu)?,
))
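Taken together, datanode_manager and table_metadata_allocator become injectable like the builder's other components, with build() falling back to the gRPC-backed DatanodeClients and the MetaSrvTableMetadataAllocator shown above when they are unset. A sketch with hypothetical my_manager / my_allocator overrides:

// Sketch: both setters are optional; omit either to get the default.
let metasrv = MetaSrvBuilder::new()
    .options(opts.clone())
    .kv_backend(kv_backend)
    .datanode_manager(my_manager)           // a DatanodeManagerRef
    .table_metadata_allocator(my_allocator) // a TableMetadataAllocatorRef
    .build()
    .await?;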

View File

@@ -70,7 +70,7 @@ pub async fn mock(
};
let builder = match datanode_clients {
-        Some(clients) => builder.datanode_clients(clients),
+        Some(clients) => builder.datanode_manager(clients),
None => builder,
};

View File

@@ -18,11 +18,13 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
-use catalog::kvbackend::MetaKvBackend;
+use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend};
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
+use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
+use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -33,13 +35,16 @@ use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
-use frontend::frontend::FrontendOptions;
+use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
+use frontend::heartbeat::HeartbeatTask;
+use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
use meta_srv::mocks::MockInfo;
use servers::grpc::GrpcServer;
+use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tonic::transport::Server;
use tower::service_fn;
@@ -252,18 +257,26 @@ impl GreptimeDbClusterBuilder {
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
-        let frontend_opts = FrontendOptions::default();
-        Arc::new(
-            FeInstance::try_new_distributed_with(
-                meta_client,
-                datanode_clients,
-                Plugins::default(),
-                &frontend_opts,
-            )
-            .await
-            .unwrap(),
-        )
+        let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
+        let handlers_executor = HandlerGroupExecutor::new(vec![
+            Arc::new(ParseMailboxMessageHandler),
+            Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
+        ]);
+        let heartbeat_task = HeartbeatTask::new(
+            meta_client.clone(),
+            HeartbeatOptions::default(),
+            Arc::new(handlers_executor),
+        );
+        let instance = FrontendBuilder::new(meta_backend, datanode_clients, meta_client)
+            .with_heartbeat_task(heartbeat_task)
+            .try_build()
+            .await
+            .unwrap();
+        Arc::new(instance)
}
}

View File

@@ -14,16 +14,19 @@
use std::sync::Arc;
use catalog::kvbackend::KvBackendCatalogManager;
use cmd::options::MixOptions;
use common_base::Plugins;
use common_config::KvBackendConfig;
-use common_meta::cache_invalidator::DummyKvCacheInvalidator;
+use common_meta::cache_invalidator::DummyCacheInvalidator;
+use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
+use frontend::instance::builder::FrontendBuilder;
+use frontend::instance::standalone::StandaloneTableMetadataCreator;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
@@ -88,29 +91,28 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();
-        let catalog_manager = KvBackendCatalogManager::new(
-            kv_backend.clone(),
-            Arc::new(DummyKvCacheInvalidator),
-            Arc::new(StandaloneDatanodeManager(datanode.region_server())),
-        );
-        catalog_manager
-            .table_metadata_manager_ref()
-            .init()
-            .await
-            .unwrap();
-        let instance = Instance::try_new_standalone(
-            kv_backend,
-            procedure_manager.clone(),
-            catalog_manager,
-            plugins,
-            datanode.region_server(),
-        )
-        .await
-        .unwrap();
+        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
+        table_metadata_manager.init().await.unwrap();
+        let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
+        let ddl_task_executor = Arc::new(
+            DdlManager::try_new(
+                procedure_manager.clone(),
+                datanode_manager.clone(),
+                Arc::new(DummyCacheInvalidator),
+                table_metadata_manager,
+                Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
+            )
+            .unwrap(),
+        );
+        let instance = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
+            .with_plugin(plugins)
+            .try_build()
+            .await
+            .unwrap();
// Ensures all loaders are registered.
procedure_manager.start().await.unwrap();
test_util::prepare_another_catalog_and_schema(&instance).await;