refactor: change how frontend grpc services are orchestrated (#3134)

Author: LFC
Date: 2024-01-11 10:26:44 +08:00
Committed by: GitHub
Parent: d521bc9dc5
Commit: 51a3fbc7bf
15 changed files with 141 additions and 71 deletions

Cargo.lock (generated, 3 additions)

@@ -1978,6 +1978,9 @@ dependencies = [
 name = "common-test-util"
 version = "0.5.1"
 dependencies = [
+ "client",
+ "common-query",
+ "common-recordbatch",
  "once_cell",
  "rand",
  "tempfile",


@@ -28,6 +28,7 @@ use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHa
 use frontend::heartbeat::HeartbeatTask;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{FrontendInstance, Instance as FeInstance};
+use frontend::server::Services;
 use meta_client::MetaClientOptions;
 use servers::tls::{TlsMode, TlsOption};
 use servers::Mode;
@@ -246,14 +247,18 @@ impl StartCommand {
             meta_client,
         )
         .with_cache_invalidator(meta_backend)
-        .with_plugin(plugins)
+        .with_plugin(plugins.clone())
         .with_heartbeat_task(heartbeat_task)
         .try_build()
         .await
         .context(StartFrontendSnafu)?;
+
+        let servers = Services::new(plugins)
+            .build(opts.clone(), Arc::new(instance.clone()))
+            .await
+            .context(StartFrontendSnafu)?;

         instance
-            .build_servers(opts)
+            .build_servers(opts, servers)
             .await
             .context(StartFrontendSnafu)?;
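
Note: the substance of this hunk (mirrored in the standalone start path below) is that server construction moves out of the frontend instance and becomes an explicit, caller-driven step. A minimal sketch of the new order, assuming `FrontendOptions` satisfies the `Into<FrontendOptions> + TomlSerializable + Clone` bounds required by `Services::build`:

    // Hedged sketch; all names come from this diff, error handling condensed.
    async fn start(
        plugins: Plugins,
        opts: FrontendOptions,
        mut instance: FeInstance,
    ) -> frontend::error::Result<()> {
        // 1. Build the ServerHandlers up front, outside the instance.
        let servers = Services::new(plugins)
            .build(opts.clone(), Arc::new(instance.clone()))
            .await?;
        // 2. Inject them; build_servers now only stores what it is given.
        instance.build_servers(opts, servers).await?;
        Ok(())
    }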


@@ -40,6 +40,7 @@ use file_engine::config::EngineConfig as FileEngineConfig;
 use frontend::frontend::FrontendOptions;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
+use frontend::server::Services;
 use frontend::service_config::{
     GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
 };
@@ -431,13 +432,17 @@ impl StartCommand {
         .await?;

         let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
-            .with_plugin(fe_plugins)
+            .with_plugin(fe_plugins.clone())
             .try_build()
             .await
             .context(StartFrontendSnafu)?;
+
+        let servers = Services::new(fe_plugins)
+            .build(opts.clone(), Arc::new(frontend.clone()))
+            .await
+            .context(StartFrontendSnafu)?;

         frontend
-            .build_servers(opts)
+            .build_servers(opts, servers)
             .await
             .context(StartFrontendSnafu)?;


@@ -5,6 +5,9 @@ edition.workspace = true
 license.workspace = true

 [dependencies]
+client.workspace = true
+common-query.workspace = true
+common-recordbatch.workspace = true
 once_cell.workspace = true
 rand.workspace = true
 tempfile.workspace = true


@@ -19,6 +19,7 @@ use std::process::Command;
 use std::sync::LazyLock;

 pub mod ports;
+pub mod recordbatch;
 pub mod temp_dir;

 // Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the


@@ -0,0 +1,46 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use client::Database;
+use common_query::Output;
+use common_recordbatch::util;
+
+pub enum ExpectedOutput<'a> {
+    AffectedRows(usize),
+    QueryResult(&'a str),
+}
+
+pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
+    let output = db.sql(sql).await.unwrap();
+    match (&output, expected) {
+        (Output::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
+            assert_eq!(*x, y, "actual: \n{}", x)
+        }
+        (Output::RecordBatches(_), ExpectedOutput::QueryResult(x))
+        | (Output::Stream(_), ExpectedOutput::QueryResult(x)) => {
+            check_output_stream(output, x).await
+        }
+        _ => panic!(),
+    }
+}
+
+pub async fn check_output_stream(output: Output, expected: &str) {
+    let recordbatches = match output {
+        Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
+        Output::RecordBatches(recordbatches) => recordbatches,
+        _ => unreachable!(),
+    };
+    let pretty_print = recordbatches.pretty_print().unwrap();
+    assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
+}
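
Note: this new module in `common-test-util` hoists output-checking helpers that previously lived in `tests-integration::test_util` (see the removal below), so unit and integration tests can share them. A hypothetical use, with the connection and SQL purely illustrative:

    use common_test_util::recordbatch::{execute_and_check_output, ExpectedOutput};

    // `db` is an already-connected client::Database; table and values are made up.
    async fn insert_one_row(db: &client::Database) {
        execute_and_check_output(
            db,
            "INSERT INTO demo (host, ts) VALUES ('h1', 0)",
            ExpectedOutput::AffectedRows(1),
        )
        .await;
    }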


@@ -86,7 +86,6 @@ use crate::frontend::{FrontendOptions, TomlSerializable};
 use crate::heartbeat::HeartbeatTask;
 use crate::metrics;
 use crate::script::ScriptExecutor;
-use crate::server::Services;

 #[async_trait]
 pub trait FrontendInstance:
@@ -190,12 +189,13 @@ impl Instance {
     pub async fn build_servers(
         &mut self,
         opts: impl Into<FrontendOptions> + TomlSerializable,
+        servers: ServerHandlers,
     ) -> Result<()> {
         let opts: FrontendOptions = opts.into();

         self.export_metrics_task =
             ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
                 .context(StartServerSnafu)?;
-        let servers = Services::build(opts, Arc::new(self.clone()), self.plugins.clone()).await?;
+
         self.servers = Arc::new(servers);
         Ok(())
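
Note: `ServerHandlers` is not defined in this diff. From its usage in server.rs below (handlers are collected with `result.push((Box::new(grpc_server), grpc_addr))`), its shape is roughly the following; this is inferred from usage, not copied from the servers crate:

    // Inferred, hypothetical shape of the injected type:
    // type ServerHandler = (Box<dyn Server>, SocketAddr);
    // type ServerHandlers = Vec<ServerHandler>;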


@@ -20,5 +20,5 @@ pub mod heartbeat;
 pub mod instance;
 pub(crate) mod metrics;
 mod script;
-mod server;
+pub mod server;
 pub mod service_config;


@@ -20,6 +20,7 @@ use common_base::Plugins;
 use common_runtime::Builder as RuntimeBuilder;
 use servers::error::InternalIoSnafu;
 use servers::grpc::builder::GrpcServerBuilder;
+use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::GrpcServerConfig;
 use servers::http::HttpServerBuilder;
 use servers::metrics_handler::MetricsHandler;
@@ -34,14 +35,49 @@ use snafu::ResultExt;
 use crate::error::{self, Result, StartServerSnafu};
 use crate::frontend::{FrontendOptions, TomlSerializable};
 use crate::instance::FrontendInstance;
+use crate::service_config::GrpcOptions;

-pub(crate) struct Services;
+pub struct Services {
+    plugins: Plugins,
+}

 impl Services {
-    pub(crate) async fn build<T, U>(
+    pub fn new(plugins: Plugins) -> Self {
+        Self { plugins }
+    }
+
+    pub fn grpc_server_builder(opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
+        let grpc_runtime = Arc::new(
+            RuntimeBuilder::default()
+                .worker_threads(opts.runtime_size)
+                .thread_name("grpc-handlers")
+                .build()
+                .context(error::RuntimeResourceSnafu)?,
+        );
+        let grpc_config = GrpcServerConfig {
+            max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
+            max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
+        };
+        Ok(GrpcServerBuilder::new(grpc_runtime).config(grpc_config))
+    }
+
+    pub async fn build<T, U>(&self, opts: T, instance: Arc<U>) -> Result<ServerHandlers>
+    where
+        T: Into<FrontendOptions> + TomlSerializable + Clone,
+        U: FrontendInstance,
+    {
+        let grpc_options = &opts.clone().into().grpc;
+        let builder = Self::grpc_server_builder(grpc_options)?;
+        self.build_with(opts, instance, builder).await
+    }
+
+    pub async fn build_with<T, U>(
+        &self,
         opts: T,
         instance: Arc<U>,
-        plugins: Plugins,
+        builder: GrpcServerBuilder,
     ) -> Result<ServerHandlers>
     where
         T: Into<FrontendOptions> + TomlSerializable,
@@ -49,33 +85,27 @@ impl Services {
     {
         let toml = opts.to_toml()?;
         let opts: FrontendOptions = opts.into();

-        let mut result = Vec::<ServerHandler>::with_capacity(plugins.len());
-        let user_provider = plugins.get::<UserProviderRef>();
+        let mut result = Vec::<ServerHandler>::new();
+        let user_provider = self.plugins.get::<UserProviderRef>();

         {
             // Always init GRPC server
             let opts = &opts.grpc;
             let grpc_addr = parse_addr(&opts.addr)?;
-            let grpc_runtime = Arc::new(
-                RuntimeBuilder::default()
-                    .worker_threads(opts.runtime_size)
-                    .thread_name("grpc-handlers")
-                    .build()
-                    .context(error::RuntimeResourceSnafu)?,
-            );
-            let grpc_config = GrpcServerConfig {
-                max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
-                max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
-            };
-            let grpc_server = GrpcServerBuilder::new(grpc_runtime)
-                .config(grpc_config)
-                .query_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone()))
+            let greptime_request_handler = GreptimeRequestHandler::new(
+                ServerGrpcQueryHandlerAdapter::arc(instance.clone()),
+                user_provider.clone(),
+                builder.runtime().clone(),
+            );
+            let grpc_server = builder
+                .database_handler(greptime_request_handler.clone())
                 .prometheus_handler(instance.clone())
                 .otlp_handler(instance.clone())
                 .user_provider(user_provider.clone())
+                .flight_handler(Arc::new(greptime_request_handler))
                 .build();
             result.push((Box::new(grpc_server), grpc_addr));
@@ -116,7 +146,7 @@ impl Services {
         let http_server = http_server_builder
             .with_metrics_handler(MetricsHandler)
             .with_script_handler(instance.clone())
-            .with_plugins(plugins)
+            .with_plugins(self.plugins.clone())
             .with_greptime_config_options(toml)
             .build();
         result.push((Box::new(http_server), http_addr));
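
Note: splitting `grpc_server_builder` out of `build` is what makes the orchestration pluggable: a caller can pre-build the `GrpcServerBuilder`, adjust it, and pass it to `build_with`. A sketch, assuming `plugins`, a `FrontendOptions` value `opts`, and an `instance: Arc<U>` are in scope; the message-size override is illustrative, not from this PR:

    let services = Services::new(plugins);
    let builder = Services::grpc_server_builder(&opts.grpc)?
        // Replace the config derived from opts.grpc (hypothetical 512 MiB caps).
        .config(GrpcServerConfig {
            max_recv_message_size: 512 << 20,
            max_send_message_size: 512 << 20,
        });
    let servers = services.build_with(opts, instance, builder).await?;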


@@ -249,15 +249,6 @@ impl Server for GrpcServer {
                 .max_decoding_message_size(max_recv_message_size)
                 .max_encoding_message_size(max_send_message_size),
             )
-        } else {
-            // TODO(ruihang): this is a temporary workaround before region server is ready.
-            builder = builder.add_service(
-                FlightServiceServer::new(FlightCraftWrapper(
-                    self.database_handler.clone().unwrap(),
-                ))
-                .max_decoding_message_size(max_recv_message_size)
-                .max_encoding_message_size(max_send_message_size),
-            )
         }

         if let Some(region_server_handler) = &self.region_server_handler {


@@ -23,12 +23,11 @@ use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
 use super::{GrpcServer, GrpcServerConfig};
 use crate::grpc::greptime_handler::GreptimeRequestHandler;
 use crate::prometheus_handler::PrometheusHandlerRef;
-use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
 use crate::query_handler::OpenTelemetryProtocolHandlerRef;

 pub struct GrpcServerBuilder {
     config: Option<GrpcServerConfig>,
-    query_handler: Option<ServerGrpcQueryHandlerRef>,
+    database_handler: Option<GreptimeRequestHandler>,
     prometheus_handler: Option<PrometheusHandlerRef>,
     flight_handler: Option<FlightCraftRef>,
     region_server_handler: Option<RegionServerHandlerRef>,
@@ -41,7 +40,7 @@ impl GrpcServerBuilder {
     pub fn new(runtime: Arc<Runtime>) -> Self {
         Self {
             config: None,
-            query_handler: None,
+            database_handler: None,
             prometheus_handler: None,
             flight_handler: None,
             region_server_handler: None,
@@ -61,8 +60,12 @@ impl GrpcServerBuilder {
         self
     }

-    pub fn query_handler(mut self, query_handler: ServerGrpcQueryHandlerRef) -> Self {
-        self.query_handler = Some(query_handler);
+    pub fn runtime(&self) -> &Arc<Runtime> {
+        &self.runtime
+    }
+
+    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
+        self.database_handler = Some(database_handler);
         self
     }
@@ -93,15 +96,7 @@ impl GrpcServerBuilder {
     pub fn build(self) -> GrpcServer {
         let config = self.config.unwrap_or_default();
-        let user_provider = self.user_provider.as_ref();
-        let runtime = self.runtime;
-        let database_handler = self.query_handler.map(|handler| {
-            GreptimeRequestHandler::new(handler, user_provider.cloned(), runtime.clone())
-        });
         let region_server_handler = self
             .region_server_handler
             .map(|handler| RegionServerRequestHandler::new(handler, runtime));
@@ -111,7 +106,7 @@ impl GrpcServerBuilder {
             prometheus_handler: self.prometheus_handler,
             flight_handler: self.flight_handler,
             region_server_handler,
-            database_handler,
+            database_handler: self.database_handler,
             otlp_handler: self.otlp_handler,
             user_provider: self.user_provider,
             shutdown_tx: Mutex::new(None),
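
Note: the builder no longer constructs a `GreptimeRequestHandler` from a query handler; callers build the handler once and register it for both gRPC entry points, which is also why the implicit Flight fallback in `GrpcServer::start` could be deleted above. Condensed from the test-utility change below:

    let handler = GreptimeRequestHandler::new(
        ServerGrpcQueryHandlerAdapter::arc(instance.clone()),
        user_provider.clone(),
        runtime.clone(),
    );
    let grpc_server = GrpcServerBuilder::new(runtime)
        .database_handler(handler.clone()) // GreptimeDatabase service
        .flight_handler(Arc::new(handler)) // Arrow Flight service, now explicit
        .build();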


@@ -24,8 +24,6 @@ use catalog::kvbackend::KvBackendCatalogManager;
 use common_config::WalConfig;
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
-use common_query::Output;
-use common_recordbatch::util;
 use common_runtime::Builder as RuntimeBuilder;
 use common_telemetry::warn;
 use common_test_util::ports;
@@ -503,16 +501,19 @@ pub async fn setup_grpc_server_with(
     );
     let fe_instance_ref = instance.instance.clone();
-    let flight_handler = Arc::new(GreptimeRequestHandler::new(
+    let greptime_request_handler = GreptimeRequestHandler::new(
         ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
         user_provider.clone(),
         runtime.clone(),
-    ));
+    );
+    let flight_handler = Arc::new(greptime_request_handler.clone());
     let fe_grpc_server = Arc::new(
         GrpcServerBuilder::new(runtime)
             .option_config(grpc_config)
-            .query_handler(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()))
+            .database_handler(greptime_request_handler)
             .flight_handler(flight_handler)
             .prometheus_handler(fe_instance_ref.clone())
             .user_provider(user_provider)
@@ -532,16 +533,6 @@ pub async fn setup_grpc_server_with(
     (fe_grpc_addr, instance.guard, fe_grpc_server)
 }

-pub async fn check_output_stream(output: Output, expected: &str) {
-    let recordbatches = match output {
-        Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
-        Output::RecordBatches(recordbatches) => recordbatches,
-        _ => unreachable!(),
-    };
-    let pretty_print = recordbatches.pretty_print().unwrap();
-    assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
-}

 pub async fn setup_mysql_server(
     store_type: StorageType,
     name: &str,


@@ -19,6 +19,7 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use common_query::Output;
 use common_recordbatch::util;
 use common_telemetry::logging;
+use common_test_util::recordbatch::check_output_stream;
 use common_test_util::temp_dir;
 use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
 use frontend::error::{Error, Result};
@@ -29,7 +30,6 @@ use rstest_reuse::apply;
 use servers::query_handler::sql::SqlQueryHandler;
 use session::context::{QueryContext, QueryContextRef};

-use crate::test_util::check_output_stream;
 use crate::tests::test_util::{
     both_instances_cases, both_instances_cases_with_custom_storages, check_unordered_output_stream,
     distributed, distributed_with_multiple_object_stores, find_testing_resource, prepare_path,


@@ -24,6 +24,7 @@ use common_meta::{distributed_time_constants, RegionIdent};
 use common_procedure::{watcher, ProcedureWithId};
 use common_query::Output;
 use common_telemetry::info;
+use common_test_util::recordbatch::check_output_stream;
 use frontend::error::Result as FrontendResult;
 use frontend::instance::Instance;
 use futures::TryStreamExt;
@@ -35,7 +36,7 @@ use servers::query_handler::sql::SqlQueryHandler;
 use session::context::{QueryContext, QueryContextRef};
 use table::metadata::TableId;
 use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
-use tests_integration::test_util::{check_output_stream, get_test_store_config, StorageType};
+use tests_integration::test_util::{get_test_store_config, StorageType};
 use tokio::time;

 #[macro_export]


@@ -24,6 +24,7 @@ use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
 use common_meta::wal::WalConfig as MetaWalConfig;
 use common_query::Output;
 use common_telemetry::info;
+use common_test_util::recordbatch::check_output_stream;
 use common_test_util::temp_dir::create_temp_dir;
 use frontend::error::Result as FrontendResult;
 use frontend::instance::Instance;
@@ -37,9 +38,7 @@ use session::context::{QueryContext, QueryContextRef};
 use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
-use tests_integration::test_util::{
-    check_output_stream, get_test_store_config, StorageType, PEER_PLACEHOLDER_ADDR,
-};
+use tests_integration::test_util::{get_test_store_config, StorageType, PEER_PLACEHOLDER_ADDR};
 use uuid::Uuid;

 const TEST_TABLE_NAME: &str = "migration_target";