refactor: extract plugins crate (#2487)

* chore: move frontend plugins fn

* chore: move datanode plugins to fn

* chore: add opt plugins

* chore: add plugins to meta-srv

* chore: setup meta plugins, wait for router extension

* chore: try use configurator for grpc too

* chore: minor fix fmt

* chore: minor fix fmt

* chore: add start meta_srv for hook

* chore: merge develop

* chore: minor fix

* chore: replace Arc<Plugins> with PluginsRef

* chore: fix header

* chore: remove empty file

* chore: modify comments

* chore: remove PluginsRef type alias

* chore: remove `OptPlugins`
This commit is contained in:
shuiyisong
2023-10-09 12:54:27 +08:00
committed by GitHub
parent dfe68a7e0b
commit 007f7ba03c
34 changed files with 296 additions and 140 deletions

13
Cargo.lock generated
View File

@@ -1607,6 +1607,7 @@ dependencies = [
"mito2",
"nu-ansi-term",
"partition",
"plugins",
"prost",
"query",
"rand",
@@ -6766,6 +6767,18 @@ dependencies = [
"plotters-backend",
]
[[package]]
name = "plugins"
version = "0.4.0-nightly"
dependencies = [
"auth",
"common-base",
"datanode",
"frontend",
"meta-srv",
"snafu",
]
[[package]]
name = "pmutil"
version = "0.5.3"

View File

@@ -39,6 +39,7 @@ members = [
"src/object-store",
"src/operator",
"src/partition",
"src/plugins",
"src/promql",
"src/query",
"src/script",
@@ -61,7 +62,6 @@ license = "Apache-2.0"
[workspace.dependencies]
aquamarine = "0.3"
arrow = { version = "43.0" }
etcd-client = "0.11"
arrow-array = "43.0"
arrow-flight = "43.0"
arrow-schema = { version = "43.0", features = ["serde"] }
@@ -76,18 +76,22 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" }
derive_builder = "0.12"
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1f1dd532a111e3834cc3019c5605e2993ffb9dc3" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
metrics = "0.20"
moka = "0.12"
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
parquet = "43.0"
paste = "1.0"
prost = "0.11"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
rand = "0.8"
regex = "1.8"
reqwest = { version = "0.11", default-features = false, features = [
@@ -109,8 +113,6 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.7"
tonic = { version = "0.9", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
metrics = "0.20"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
## workspaces members
api = { path = "src/api" }
auth = { path = "src/auth" }
@@ -123,19 +125,18 @@ common-config = { path = "src/common/config" }
common-datasource = { path = "src/common/datasource" }
common-error = { path = "src/common/error" }
common-function = { path = "src/common/function" }
common-macro = { path = "src/common/macro" }
common-greptimedb-telemetry = { path = "src/common/greptimedb-telemetry" }
common-grpc = { path = "src/common/grpc" }
common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
common-meta = { path = "src/common/meta" }
common-pprof = { path = "src/common/pprof" }
common-procedure = { path = "src/common/procedure" }
common-procedure-test = { path = "src/common/procedure-test" }
common-pprof = { path = "src/common/pprof" }
common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
substrait = { path = "src/common/substrait" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }
@@ -149,20 +150,20 @@ meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
mito = { path = "src/mito" }
mito2 = { path = "src/mito2" }
operator = { path = "src/operator" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
partition = { path = "src/partition" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
query = { path = "src/query" }
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
script = { path = "src/script" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
storage = { path = "src/storage" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
table-procedure = { path = "src/table-procedure" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"

View File

@@ -4,8 +4,6 @@ version.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
testing = []

View File

@@ -49,6 +49,7 @@ metrics.workspace = true
mito2 = { workspace = true }
nu-ansi-term = "0.46"
partition = { workspace = true }
plugins.workspace = true
prost.workspace = true
query = { workspace = true }
rand.workspace = true

View File

@@ -257,7 +257,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
cached_meta_backend.clone(),
datanode_clients,
);
let plugins: Arc<Plugins> = Default::default();
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
None,

View File

@@ -31,6 +31,10 @@ pub struct Instance {
impl Instance {
pub async fn start(&mut self) -> Result<()> {
plugins::start_datanode_plugins(self.datanode.plugins())
.await
.context(StartDatanodeSnafu)?;
self.datanode.start().await.context(StartDatanodeSnafu)
}
@@ -159,11 +163,15 @@ impl StartCommand {
Ok(Options::Datanode(Box::new(opts)))
}
async fn build(self, opts: DatanodeOptions) -> Result<Instance> {
async fn build(self, mut opts: DatanodeOptions) -> Result<Instance> {
let plugins = plugins::setup_datanode_plugins(&mut opts)
.await
.context(StartDatanodeSnafu)?;
logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);
let datanode = DatanodeBuilder::new(opts, None, Default::default())
let datanode = DatanodeBuilder::new(opts, None, plugins)
.build()
.await
.context(StartDatanodeSnafu)?;

View File

@@ -85,12 +85,6 @@ pub enum Error {
#[snafu(display("Illegal config: {}", msg))]
IllegalConfig { msg: String, location: Location },
#[snafu(display("Illegal auth config"))]
IllegalAuthConfig {
location: Location,
source: auth::error::Error,
},
#[snafu(display("Unsupported selector type: {}", selector_type))]
UnsupportedSelectorType {
selector_type: String,
@@ -208,7 +202,6 @@ impl ErrorExt for Error {
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::IllegalAuthConfig { .. }
| Error::ConnectEtcd { .. } => StatusCode::InvalidArguments,
Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,

View File

@@ -12,11 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use auth::UserProviderRef;
use clap::Parser;
use common_base::Plugins;
use common_telemetry::logging;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
@@ -25,7 +21,7 @@ use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use crate::error::{self, IllegalAuthConfigSnafu, Result};
use crate::error::{self, Result, StartFrontendSnafu};
use crate::options::{Options, TopLevelOptions};
pub struct Instance {
@@ -34,10 +30,11 @@ pub struct Instance {
impl Instance {
pub async fn start(&mut self) -> Result<()> {
self.frontend
.start()
plugins::start_frontend_plugins(self.frontend.plugins().clone())
.await
.context(error::StartFrontendSnafu)
.context(StartFrontendSnafu)?;
self.frontend.start().await.context(StartFrontendSnafu)
}
pub async fn stop(&self) -> Result<()> {
@@ -177,38 +174,32 @@ impl StartCommand {
opts.mode = Mode::Distributed;
}
opts.user_provider = self.user_provider.clone();
Ok(Options::Frontend(Box::new(opts)))
}
async fn build(self, opts: FrontendOptions) -> Result<Instance> {
async fn build(self, mut opts: FrontendOptions) -> Result<Instance> {
let plugins = plugins::setup_frontend_plugins(&mut opts)
.await
.context(StartFrontendSnafu)?;
logging::info!("Frontend start command: {:#?}", self);
logging::info!("Frontend options: {:#?}", opts);
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone())
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
instance
.build_servers(&opts)
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
Ok(Instance { frontend: instance })
}
}
pub fn load_frontend_plugins(user_provider: &Option<String>) -> Result<Plugins> {
let plugins = Plugins::new();
if let Some(provider) = user_provider {
let provider = auth::user_provider_from_option(provider).context(IllegalAuthConfigSnafu)?;
plugins.insert::<UserProviderRef>(provider);
}
Ok(plugins)
}
#[cfg(test)]
mod tests {
use std::io::Write;
@@ -218,6 +209,7 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use frontend::service_config::GrpcOptions;
use servers::http::HttpOptions;
use super::*;
use crate::options::ENV_VAR_SEP;
@@ -303,14 +295,17 @@ mod tests {
#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
let command = StartCommand {
let mut fe_opts = FrontendOptions {
http: HttpOptions {
disable_dashboard: false,
..Default::default()
},
user_provider: Some("static_user_provider:cmd:test=test".to_string()),
disable_dashboard: Some(false),
..Default::default()
};
let plugins = load_frontend_plugins(&command.user_provider);
let plugins = plugins.unwrap();
let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap();
let provider = plugins.get::<UserProviderRef>().unwrap();
let result = provider
.authenticate(

View File

@@ -20,7 +20,7 @@ use meta_srv::bootstrap::MetaSrvInstance;
use meta_srv::metasrv::MetaSrvOptions;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::error::{self, Result, StartMetaServerSnafu};
use crate::options::{Options, TopLevelOptions};
pub struct Instance {
@@ -29,10 +29,10 @@ pub struct Instance {
impl Instance {
pub async fn start(&mut self) -> Result<()> {
self.instance
.start()
plugins::start_meta_srv_plugins(self.instance.plugins())
.await
.context(error::StartMetaServerSnafu)
.context(StartMetaServerSnafu)?;
self.instance.start().await.context(StartMetaServerSnafu)
}
pub async fn stop(&self) -> Result<()> {
@@ -158,12 +158,15 @@ impl StartCommand {
Ok(Options::Metasrv(Box::new(opts)))
}
async fn build(self, opts: MetaSrvOptions) -> Result<Instance> {
logging::info!("MetaSrv start command: {:#?}", self);
async fn build(self, mut opts: MetaSrvOptions) -> Result<Instance> {
let plugins = plugins::setup_meta_srv_plugins(&mut opts)
.await
.context(StartMetaServerSnafu)?;
logging::info!("MetaSrv start command: {:#?}", self);
logging::info!("MetaSrv options: {:#?}", opts);
let instance = MetaSrvInstance::new(opts)
let instance = MetaSrvInstance::new(opts, plugins)
.await
.context(error::BuildMetaServerSnafu)?;

View File

@@ -44,7 +44,6 @@ use crate::error::{
IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFrontendSnafu,
};
use crate::frontend::load_frontend_plugins;
use crate::options::{MixOptions, Options, TopLevelOptions};
#[derive(Parser)]
@@ -298,8 +297,11 @@ impl StartCommand {
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(self, opts: MixOptions) -> Result<Instance> {
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let fe_opts = opts.frontend;
let mut fe_opts = opts.frontend;
let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts)
.await
.context(StartFrontendSnafu)?;
let dn_opts = opts.datanode;
info!("Standalone start command: {:#?}", self);
@@ -315,7 +317,7 @@ impl StartCommand {
.context(StartFrontendSnafu)?;
let datanode =
DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), plugins.clone())
DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), Default::default())
.build()
.await
.context(StartDatanodeSnafu)?;
@@ -335,7 +337,7 @@ impl StartCommand {
// TODO: build frontend instance like in distributed mode
let mut frontend = build_frontend(
plugins,
fe_plugins,
kv_store,
procedure_manager,
catalog_manager,
@@ -354,7 +356,7 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
plugins: Arc<Plugins>,
plugins: Plugins,
kv_store: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
@@ -388,13 +390,13 @@ mod tests {
#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
let command = StartCommand {
let mut fe_opts = FrontendOptions {
user_provider: Some("static_user_provider:cmd:test=test".to_string()),
..Default::default()
};
let plugins = load_frontend_plugins(&command.user_provider);
let plugins = plugins.unwrap();
let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap();
let provider = plugins.get::<UserProviderRef>().unwrap();
let result = provider
.authenticate(

View File

@@ -23,6 +23,8 @@ use std::sync::{Arc, Mutex, MutexGuard};
pub use bit_vec::BitVec;
/// [`Plugins`] is a wrapper of Arc contents.
/// Make it Cloneable and we can treat it like an Arc struct.
#[derive(Default, Clone)]
pub struct Plugins {
inner: Arc<Mutex<anymap::Map<dyn Any + Send + Sync>>>,

View File

@@ -72,6 +72,7 @@ pub struct Datanode {
region_server: RegionServer,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leases_notifier: Option<Arc<Notify>>,
plugins: Plugins,
}
impl Datanode {
@@ -137,11 +138,15 @@ impl Datanode {
pub fn region_server(&self) -> RegionServer {
self.region_server.clone()
}
pub fn plugins(&self) -> Plugins {
self.plugins.clone()
}
}
pub struct DatanodeBuilder {
opts: DatanodeOptions,
plugins: Arc<Plugins>,
plugins: Plugins,
meta_client: Option<MetaClient>,
kv_backend: Option<KvBackendRef>,
}
@@ -149,11 +154,7 @@ pub struct DatanodeBuilder {
impl DatanodeBuilder {
/// `kv_backend` is optional. If absent, the builder will try to build one
/// by using the given `opts`
pub fn new(
opts: DatanodeOptions,
kv_backend: Option<KvBackendRef>,
plugins: Arc<Plugins>,
) -> Self {
pub fn new(opts: DatanodeOptions, kv_backend: Option<KvBackendRef>, plugins: Plugins) -> Self {
Self {
opts,
plugins,
@@ -262,6 +263,7 @@ impl DatanodeBuilder {
greptimedb_telemetry_task,
region_event_receiver,
leases_notifier,
plugins: self.plugins.clone(),
})
}
@@ -327,7 +329,7 @@ impl DatanodeBuilder {
async fn new_region_server(
opts: &DatanodeOptions,
plugins: Arc<Plugins>,
plugins: Plugins,
log_store: Arc<RaftEngineLogStore>,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
@@ -360,6 +362,8 @@ impl DatanodeBuilder {
Ok(region_server)
}
// internal utils
/// Build [RaftEngineLogStore]
async fn build_log_store(opts: &DatanodeOptions) -> Result<Arc<RaftEngineLogStore>> {
let data_home = normalize_dir(&opts.storage.data_home);
@@ -454,7 +458,7 @@ mod tests {
..Default::default()
},
None,
Arc::new(Plugins::default()),
Plugins::default(),
);
let kv = Arc::new(MemoryKvBackend::default()) as _;

View File

@@ -272,6 +272,9 @@ pub enum Error {
source: operator::error::Error,
location: Location,
},
#[snafu(display("Invalid auth config"))]
IllegalAuthConfig { source: auth::error::Error },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -288,6 +291,7 @@ impl ErrorExt for Error {
| Error::ColumnNotFound { .. }
| Error::MissingMetasrvOpts { .. }
| Error::UnsupportedFormat { .. }
| Error::IllegalAuthConfig { .. }
| Error::EmptyData { .. }
| Error::ColumnNoneDefaultValue { .. }
| Error::IncompleteGrpcRequest { .. } => StatusCode::InvalidArguments,

View File

@@ -41,6 +41,7 @@ pub struct FrontendOptions {
pub meta_client: Option<MetaClientOptions>,
pub logging: LoggingOptions,
pub datanode: DatanodeOptions,
pub user_provider: Option<String>,
}
impl Default for FrontendOptions {
@@ -60,6 +61,7 @@ impl Default for FrontendOptions {
meta_client: None,
logging: LoggingOptions::default(),
datanode: DatanodeOptions::default(),
user_provider: None,
}
}
}

View File

@@ -122,9 +122,7 @@ pub struct Instance {
script_executor: Arc<ScriptExecutor>,
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
plugins: Arc<Plugins>,
plugins: Plugins,
servers: Arc<ServerHandlers>,
heartbeat_task: Option<HeartbeatTask>,
inserter: InserterRef,
@@ -132,10 +130,7 @@ pub struct Instance {
}
impl Instance {
pub async fn try_new_distributed(
opts: &FrontendOptions,
plugins: Arc<Plugins>,
) -> Result<Self> {
pub async fn try_new_distributed(opts: &FrontendOptions, plugins: Plugins) -> Result<Self> {
let meta_client = Self::create_meta_client(opts).await?;
let datanode_clients = Arc::new(DatanodeClients::default());
@@ -146,7 +141,7 @@ impl Instance {
pub async fn try_new_distributed_with(
meta_client: Arc<MetaClient>,
datanode_clients: Arc<DatanodeClients>,
plugins: Arc<Plugins>,
plugins: Plugins,
opts: &FrontendOptions,
) -> Result<Self> {
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
@@ -297,7 +292,7 @@ impl Instance {
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
plugins: Arc<Plugins>,
plugins: Plugins,
region_server: RegionServer,
) -> Result<Self> {
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
@@ -377,7 +372,7 @@ impl Instance {
&self.catalog_manager
}
pub fn plugins(&self) -> Arc<Plugins> {
pub fn plugins(&self) -> Plugins {
self.plugins.clone()
}
@@ -593,7 +588,7 @@ impl PrometheusHandler for Instance {
}
pub fn check_permission(
plugins: Arc<Plugins>,
plugins: Plugins,
stmt: &Statement,
query_ctx: &QueryContextRef,
) -> Result<()> {
@@ -664,6 +659,7 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()>
mod tests {
use std::collections::HashMap;
use common_base::Plugins;
use query::query_engine::options::QueryOptions;
use session::context::QueryContext;
use sql::dialect::GreptimeDbDialect;
@@ -674,11 +670,10 @@ mod tests {
#[test]
fn test_exec_validation() {
let query_ctx = QueryContext::arc();
let plugins = Plugins::new();
let plugins: Plugins = Plugins::new();
plugins.insert(QueryOptions {
disallow_cross_schema_query: true,
});
let plugins = Arc::new(plugins);
let sql = r#"
SELECT * FROM demo;
@@ -704,7 +699,7 @@ mod tests {
re.unwrap();
}
fn replace_test(template_sql: &str, plugins: Arc<Plugins>, query_ctx: &QueryContextRef) {
fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
// test right
let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
for (catalog, schema) in right {
@@ -732,7 +727,7 @@ mod tests {
template.format(&vars).unwrap()
}
fn do_test(sql: &str, plugins: Arc<Plugins>, query_ctx: &QueryContextRef, is_ok: bool) {
fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
let re = check_permission(plugins, stmt, query_ctx);
if is_ok {

View File

@@ -20,7 +20,6 @@ use auth::UserProviderRef;
use common_base::Plugins;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::info;
use servers::configurator::ConfiguratorRef;
use servers::error::InternalIoSnafu;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
@@ -47,7 +46,7 @@ impl Services {
pub(crate) async fn build<T>(
opts: &FrontendOptions,
instance: Arc<T>,
plugins: Arc<Plugins>,
plugins: Plugins,
) -> Result<ServerHandlers>
where
T: FrontendInstance,
@@ -120,7 +119,7 @@ impl Services {
let http_server = http_server_builder
.with_metrics_handler(MetricsHandler)
.with_script_handler(instance.clone())
.with_configurator(plugins.get::<ConfiguratorRef>())
.with_plugins(plugins)
.with_greptime_config_options(opts.to_toml_string())
.build();
result.push((Box::new(http_server), http_addr));

View File

@@ -20,7 +20,9 @@ use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use etcd_client::Client;
use servers::configurator::ConfiguratorRef;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
@@ -53,22 +55,27 @@ pub struct MetaSrvInstance {
opts: MetaSrvOptions,
signal_sender: Option<Sender<()>>,
plugins: Plugins,
}
impl MetaSrvInstance {
pub async fn new(opts: MetaSrvOptions) -> Result<MetaSrvInstance> {
let meta_srv = build_meta_srv(&opts).await?;
pub async fn new(opts: MetaSrvOptions, plugins: Plugins) -> Result<MetaSrvInstance> {
let meta_srv = build_meta_srv(&opts, plugins.clone()).await?;
let http_srv = Arc::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml_string())
.build(),
);
// put meta_srv into plugins for later use
plugins.insert::<Arc<MetaSrv>>(Arc::new(meta_srv.clone()));
Ok(MetaSrvInstance {
meta_srv,
http_srv,
opts,
signal_sender: None,
plugins,
})
}
@@ -79,8 +86,12 @@ impl MetaSrvInstance {
self.signal_sender = Some(tx);
let meta_srv =
bootstrap_meta_srv_with_router(&self.opts.bind_addr, router(self.meta_srv.clone()), rx);
let mut router = router(self.meta_srv.clone());
if let Some(configurator) = self.meta_srv.plugins().get::<ConfiguratorRef>() {
router = configurator.config_grpc(router);
}
let meta_srv = bootstrap_meta_srv_with_router(&self.opts.bind_addr, router, rx);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
@@ -110,6 +121,10 @@ impl MetaSrvInstance {
})?;
Ok(())
}
pub fn plugins(&self) -> Plugins {
self.plugins.clone()
}
}
pub async fn bootstrap_meta_srv_with_router(
@@ -146,7 +161,7 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(admin::make_admin_service(meta_srv))
}
pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result<MetaSrv> {
let (kv_store, election, lock) = if opts.use_memory_store {
(
Arc::new(MemStore::new()) as _,
@@ -179,14 +194,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
.selector(selector)
.election(election)
.lock(lock)
.plugins(plugins)
.build()
.await
}
pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
let meta_srv = build_meta_srv(opts).await?;
meta_srv.try_start().await?;
Ok(meta_srv)
}

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Peer;
use common_base::Plugins;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
@@ -188,7 +189,8 @@ pub struct MetaSrv {
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
plugins: Plugins,
}
impl MetaSrv {
@@ -208,7 +210,7 @@ impl MetaSrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
let subscribe_manager = self.subscribe_manager().cloned();
let subscribe_manager = self.subscribe_manager();
let mut rx = election.subscribe_leader_change();
let task_handler = self.greptimedb_telemetry_task.clone();
let _handle = common_runtime::spawn_bg(async move {
@@ -350,12 +352,16 @@ impl MetaSrv {
&self.table_metadata_manager
}
pub fn publish(&self) -> Option<&PublishRef> {
self.pubsub.as_ref().map(|suite| &suite.0)
pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
}
pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> {
self.pubsub.as_ref().map(|suite| &suite.1)
pub fn subscribe_manager(&self) -> Option<SubscribeManagerRef> {
self.plugins.get::<SubscribeManagerRef>()
}
pub fn plugins(&self) -> &Plugins {
&self.plugins
}
#[inline]

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::ddl_manager::{DdlManager, DdlManagerRef};
use common_meta::distributed_time_constants;
@@ -48,7 +49,7 @@ use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::pubsub::PublishRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
@@ -67,7 +68,7 @@ pub struct MetaSrvBuilder {
meta_peer_client: Option<MetaPeerClientRef>,
lock: Option<DistLockRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
plugins: Option<Plugins>,
}
impl MetaSrvBuilder {
@@ -82,7 +83,7 @@ impl MetaSrvBuilder {
options: None,
lock: None,
datanode_clients: None,
pubsub: None,
plugins: None,
}
}
@@ -131,8 +132,8 @@ impl MetaSrvBuilder {
self
}
pub fn pubsub(mut self, publish: PublishRef, subscribe_manager: SubscribeManagerRef) -> Self {
self.pubsub = Some((publish, subscribe_manager));
pub fn plugins(mut self, plugins: Plugins) -> Self {
self.plugins = Some(plugins);
self
}
@@ -149,7 +150,7 @@ impl MetaSrvBuilder {
handler_group,
lock,
datanode_clients,
pubsub,
plugins,
} = self;
let options = options.unwrap_or_default();
@@ -206,11 +207,10 @@ impl MetaSrvBuilder {
None
};
let publish_heartbeat_handler = if let Some((publish, _)) = pubsub.as_ref() {
Some(PublishHeartbeatHandler::new(publish.clone()))
} else {
None
};
let publish_heartbeat_handler = plugins
.clone()
.and_then(|plugins| plugins.get::<PublishRef>())
.map(|publish| PublishHeartbeatHandler::new(publish.clone()));
let region_lease_handler =
RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS);
@@ -263,7 +263,7 @@ impl MetaSrvBuilder {
enable_telemetry,
)
.await,
pubsub,
plugins: plugins.unwrap_or_else(Plugins::default),
})
}
}

View File

@@ -4,8 +4,6 @@ version.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { workspace = true }
async-trait = "0.1"

13
src/plugins/Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "plugins"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
auth.workspace = true
common-base.workspace = true
datanode.workspace = true
frontend.workspace = true
meta-srv.workspace = true
snafu.workspace = true

View File

@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::Plugins;
use datanode::config::DatanodeOptions;
use datanode::error::Result;
pub async fn setup_datanode_plugins(_opts: &mut DatanodeOptions) -> Result<Plugins> {
Ok(Plugins::new())
}
pub async fn start_datanode_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use auth::UserProviderRef;
use common_base::Plugins;
use frontend::error::{IllegalAuthConfigSnafu, Result};
use frontend::frontend::FrontendOptions;
use snafu::ResultExt;
pub async fn setup_frontend_plugins(opts: &mut FrontendOptions) -> Result<Plugins> {
let plugins = Plugins::new();
if let Some(user_provider) = opts.user_provider.as_ref() {
let provider =
auth::user_provider_from_option(user_provider).context(IllegalAuthConfigSnafu)?;
plugins.insert::<UserProviderRef>(provider);
}
Ok(plugins)
}
pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}

21
src/plugins/src/lib.rs Normal file
View File

@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod datanode;
mod frontend;
mod meta_srv;
pub use datanode::{setup_datanode_plugins, start_datanode_plugins};
pub use frontend::{setup_frontend_plugins, start_frontend_plugins};
pub use meta_srv::{setup_meta_srv_plugins, start_meta_srv_plugins};

View File

@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::Plugins;
use meta_srv::error::Result;
use meta_srv::metasrv::MetaSrvOptions;
pub async fn setup_meta_srv_plugins(_opts: &mut MetaSrvOptions) -> Result<Plugins> {
Ok(Plugins::new())
}
pub async fn start_meta_srv_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}

View File

@@ -69,11 +69,11 @@ use crate::{metrics, QueryEngine};
pub struct DatafusionQueryEngine {
state: Arc<QueryEngineState>,
plugins: Arc<Plugins>,
plugins: Plugins,
}
impl DatafusionQueryEngine {
pub fn new(state: Arc<QueryEngineState>, plugins: Arc<Plugins>) -> Self {
pub fn new(state: Arc<QueryEngineState>, plugins: Plugins) -> Self {
Self { state, plugins }
}

View File

@@ -98,7 +98,7 @@ impl QueryEngineFactory {
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
with_dist_planner: bool,
plugins: Arc<Plugins>,
plugins: Plugins,
) -> Self {
let state = Arc::new(QueryEngineState::new(
catalog_manager,

View File

@@ -61,7 +61,7 @@ pub struct QueryEngineState {
catalog_manager: CatalogManagerRef,
table_mutation_handler: Option<TableMutationHandlerRef>,
aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
plugins: Arc<Plugins>,
plugins: Plugins,
}
impl fmt::Debug for QueryEngineState {
@@ -78,7 +78,7 @@ impl QueryEngineState {
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
with_dist_planner: bool,
plugins: Arc<Plugins>,
plugins: Plugins,
) -> Self {
let runtime_env = Arc::new(RuntimeEnv::default());
let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);

View File

@@ -127,7 +127,6 @@ async fn test_query_validate() -> Result<()> {
plugins.insert(QueryOptions {
disallow_cross_schema_query: true,
});
let plugins = Arc::new(plugins);
let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, None, false, plugins);
let engine = factory.query_engine();

View File

@@ -14,10 +14,15 @@
use std::sync::Arc;
use axum::Router;
use axum::Router as HttpRouter;
use tonic::transport::server::Router as GrpcRouter;
pub trait Configurator: Send + Sync {
fn config_http(&self, route: Router) -> Router {
fn config_http(&self, route: HttpRouter) -> HttpRouter {
route
}
fn config_grpc(&self, route: GrpcRouter) -> GrpcRouter {
route
}
}

View File

@@ -42,6 +42,7 @@ use axum::middleware::{self, Next};
use axum::response::{Html, IntoResponse, Json};
use axum::{routing, BoxError, Extension, Router};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
@@ -103,8 +104,8 @@ pub struct HttpServer {
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
metrics_handler: Option<MetricsHandler>,
configurator: Option<ConfiguratorRef>,
greptime_config_options: Option<String>,
plugins: Plugins,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -382,8 +383,8 @@ impl HttpServerBuilder {
script_handler: None,
metrics_handler: None,
shutdown_tx: Mutex::new(None),
configurator: None,
greptime_config_options: None,
plugins: Default::default(),
},
}
}
@@ -438,8 +439,8 @@ impl HttpServerBuilder {
self
}
pub fn with_configurator(&mut self, configurator: Option<ConfiguratorRef>) -> &mut Self {
self.inner.configurator = configurator;
pub fn with_plugins(&mut self, plugins: Plugins) -> &mut Self {
self.inner.plugins = plugins;
self
}
@@ -724,7 +725,7 @@ impl Server for HttpServer {
);
let mut app = self.make_app();
if let Some(configurator) = self.configurator.as_ref() {
if let Some(configurator) = self.plugins.get::<ConfiguratorRef>() {
app = configurator.config_http(app);
}
let app = self.build(app);

View File

@@ -32,5 +32,5 @@ allowed_blank_lines = 1
crlf = false
[[rule]]
keys = ["dependencies", "dev-dependencies", "build-dependencies"]
keys = ["build-dependencies", "dependencies", "dev-dependencies", "workspace.dependencies"]
formatting = { reorder_keys = true }

View File

@@ -202,7 +202,7 @@ impl GreptimeDbClusterBuilder {
.build();
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let mut datanode = DatanodeBuilder::new(opts, None, Arc::new(Plugins::default()))
let mut datanode = DatanodeBuilder::new(opts, None, Plugins::default())
.with_meta_client(meta_client)
.build()
.await
@@ -234,7 +234,7 @@ impl GreptimeDbClusterBuilder {
FeInstance::try_new_distributed_with(
meta_client,
datanode_clients,
Arc::new(Plugins::default()),
Plugins::default(),
&frontend_opts,
)
.await

View File

@@ -74,7 +74,7 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();
let plugins = Arc::new(self.plugin.unwrap_or_default());
let plugins = self.plugin.unwrap_or_default();
let datanode = DatanodeBuilder::new(opts.clone(), Some(kv_store.clone()), plugins.clone())
.build()