From 51a3fbc7bf2f6b493031c85d402b85e5086de0b5 Mon Sep 17 00:00:00 2001
From: LFC <990479+MichaelScofield@users.noreply.github.com>
Date: Thu, 11 Jan 2024 10:26:44 +0800
Subject: [PATCH] refactor: change how frontend grpc services are orchestrated (#3134)

---
 Cargo.lock                                   |  3 +
 src/cmd/src/frontend.rs                      |  9 ++-
 src/cmd/src/standalone.rs                    |  9 ++-
 src/common/test-util/Cargo.toml              |  3 +
 src/common/test-util/src/lib.rs              |  1 +
 src/common/test-util/src/recordbatch.rs      | 46 +++++++++++++
 src/frontend/src/instance.rs                 |  4 +-
 src/frontend/src/lib.rs                      |  2 +-
 src/frontend/src/server.rs                   | 72 ++++++++++++++------
 src/servers/src/grpc.rs                      |  9 ---
 src/servers/src/grpc/builder.rs              | 23 +++----
 tests-integration/src/test_util.rs           | 21 ++----
 tests-integration/src/tests/instance_test.rs |  2 +-
 tests-integration/tests/region_failover.rs   |  3 +-
 tests-integration/tests/region_migration.rs  |  5 +-
 15 files changed, 141 insertions(+), 71 deletions(-)
 create mode 100644 src/common/test-util/src/recordbatch.rs

diff --git a/Cargo.lock b/Cargo.lock
index 3b93683d12..29a60c4f89 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1978,6 +1978,9 @@ dependencies = [
 name = "common-test-util"
 version = "0.5.1"
 dependencies = [
+ "client",
+ "common-query",
+ "common-recordbatch",
  "once_cell",
  "rand",
  "tempfile",
diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index 1f43c7ab08..94718630c6 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -28,6 +28,7 @@ use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHa
 use frontend::heartbeat::HeartbeatTask;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{FrontendInstance, Instance as FeInstance};
+use frontend::server::Services;
 use meta_client::MetaClientOptions;
 use servers::tls::{TlsMode, TlsOption};
 use servers::Mode;
@@ -246,14 +247,18 @@ impl StartCommand {
             meta_client,
         )
         .with_cache_invalidator(meta_backend)
-        .with_plugin(plugins)
+        .with_plugin(plugins.clone())
         .with_heartbeat_task(heartbeat_task)
         .try_build()
         .await
         .context(StartFrontendSnafu)?;
 
+        let servers = Services::new(plugins)
+            .build(opts.clone(), Arc::new(instance.clone()))
+            .await
+            .context(StartFrontendSnafu)?;
         instance
-            .build_servers(opts)
+            .build_servers(opts, servers)
             .await
             .context(StartFrontendSnafu)?;
 
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 8511dbf519..8df0158f54 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -40,6 +40,7 @@ use file_engine::config::EngineConfig as FileEngineConfig;
 use frontend::frontend::FrontendOptions;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
+use frontend::server::Services;
 use frontend::service_config::{
     GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
 };
@@ -431,13 +432,17 @@ impl StartCommand {
             .await?;
 
         let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
-            .with_plugin(fe_plugins)
+            .with_plugin(fe_plugins.clone())
             .try_build()
             .await
             .context(StartFrontendSnafu)?;
 
+        let servers = Services::new(fe_plugins)
+            .build(opts.clone(), Arc::new(frontend.clone()))
+            .await
+            .context(StartFrontendSnafu)?;
         frontend
-            .build_servers(opts)
+            .build_servers(opts, servers)
             .await
             .context(StartFrontendSnafu)?;
 
diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml
index 60e8547406..310fb45a7e 100644
--- a/src/common/test-util/Cargo.toml
+++ b/src/common/test-util/Cargo.toml
@@ -5,6 +5,9 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+client.workspace = true
+common-query.workspace = true
+common-recordbatch.workspace = true
 once_cell.workspace = true
 rand.workspace = true
 tempfile.workspace = true
diff --git a/src/common/test-util/src/lib.rs b/src/common/test-util/src/lib.rs
index ef6ff46968..08eeadc233 100644
--- a/src/common/test-util/src/lib.rs
+++ b/src/common/test-util/src/lib.rs
@@ -19,6 +19,7 @@ use std::process::Command;
 use std::sync::LazyLock;
 
 pub mod ports;
+pub mod recordbatch;
 pub mod temp_dir;
 
 // Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the
diff --git a/src/common/test-util/src/recordbatch.rs b/src/common/test-util/src/recordbatch.rs
new file mode 100644
index 0000000000..ad05965dc5
--- /dev/null
+++ b/src/common/test-util/src/recordbatch.rs
@@ -0,0 +1,46 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use client::Database;
+use common_query::Output;
+use common_recordbatch::util;
+
+pub enum ExpectedOutput<'a> {
+    AffectedRows(usize),
+    QueryResult(&'a str),
+}
+
+pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
+    let output = db.sql(sql).await.unwrap();
+    match (&output, expected) {
+        (Output::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
+            assert_eq!(*x, y, "actual: \n{}", x)
+        }
+        (Output::RecordBatches(_), ExpectedOutput::QueryResult(x))
+        | (Output::Stream(_), ExpectedOutput::QueryResult(x)) => {
+            check_output_stream(output, x).await
+        }
+        _ => panic!(),
+    }
+}
+
+pub async fn check_output_stream(output: Output, expected: &str) {
+    let recordbatches = match output {
+        Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
+        Output::RecordBatches(recordbatches) => recordbatches,
+        _ => unreachable!(),
+    };
+    let pretty_print = recordbatches.pretty_print().unwrap();
+    assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
+}
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index f0ee10046c..4470476772 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -86,7 +86,6 @@ use crate::frontend::{FrontendOptions, TomlSerializable};
 use crate::heartbeat::HeartbeatTask;
 use crate::metrics;
 use crate::script::ScriptExecutor;
-use crate::server::Services;
 
 #[async_trait]
 pub trait FrontendInstance:
@@ -190,12 +189,13 @@ impl Instance {
     pub async fn build_servers(
         &mut self,
         opts: impl Into<FrontendOptions> + TomlSerializable,
+        servers: ServerHandlers,
     ) -> Result<()> {
         let opts: FrontendOptions = opts.into();
         self.export_metrics_task =
             ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
                 .context(StartServerSnafu)?;
-        let servers = Services::build(opts, Arc::new(self.clone()), self.plugins.clone()).await?;
+
         self.servers = Arc::new(servers);
 
         Ok(())
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index 8761cc098c..de800b0b41 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -20,5 +20,5 @@ pub mod heartbeat;
 pub mod instance;
 pub(crate) mod metrics;
 mod script;
-mod server;
+pub mod server;
 pub mod service_config;
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index a3c047490a..42079aab99 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -20,6 +20,7 @@ use common_base::Plugins;
 use common_runtime::Builder as RuntimeBuilder;
 use servers::error::InternalIoSnafu;
 use servers::grpc::builder::GrpcServerBuilder;
+use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::GrpcServerConfig;
 use servers::http::HttpServerBuilder;
 use servers::metrics_handler::MetricsHandler;
@@ -34,14 +35,49 @@ use snafu::ResultExt;
 use crate::error::{self, Result, StartServerSnafu};
 use crate::frontend::{FrontendOptions, TomlSerializable};
 use crate::instance::FrontendInstance;
+use crate::service_config::GrpcOptions;
 
-pub(crate) struct Services;
+pub struct Services {
+    plugins: Plugins,
+}
 
 impl Services {
-    pub(crate) async fn build<T, U>(
+    pub fn new(plugins: Plugins) -> Self {
+        Self { plugins }
+    }
+
+    pub fn grpc_server_builder(opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
+        let grpc_runtime = Arc::new(
+            RuntimeBuilder::default()
+                .worker_threads(opts.runtime_size)
+                .thread_name("grpc-handlers")
+                .build()
+                .context(error::RuntimeResourceSnafu)?,
+        );
+
+        let grpc_config = GrpcServerConfig {
+            max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
+            max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
+        };
+
+        Ok(GrpcServerBuilder::new(grpc_runtime).config(grpc_config))
+    }
+
+    pub async fn build<T, U>(&self, opts: T, instance: Arc<U>) -> Result<ServerHandlers>
+    where
+        T: Into<FrontendOptions> + TomlSerializable + Clone,
+        U: FrontendInstance,
+    {
+        let grpc_options = &opts.clone().into().grpc;
+        let builder = Self::grpc_server_builder(grpc_options)?;
+        self.build_with(opts, instance, builder).await
+    }
+
+    pub async fn build_with<T, U>(
+        &self,
         opts: T,
         instance: Arc<U>,
-        plugins: Plugins,
+        builder: GrpcServerBuilder,
     ) -> Result<ServerHandlers>
     where
         T: Into<FrontendOptions> + TomlSerializable,
@@ -49,33 +85,27 @@ impl Services {
     {
         let toml = opts.to_toml()?;
         let opts: FrontendOptions = opts.into();
-        let mut result = Vec::<ServerHandler>::with_capacity(plugins.len());
-        let user_provider = plugins.get::<UserProviderRef>();
+
+        let mut result = Vec::<ServerHandler>::new();
+
+        let user_provider = self.plugins.get::<UserProviderRef>();
 
         {
             // Always init GRPC server
             let opts = &opts.grpc;
             let grpc_addr = parse_addr(&opts.addr)?;
 
-            let grpc_runtime = Arc::new(
-                RuntimeBuilder::default()
-                    .worker_threads(opts.runtime_size)
-                    .thread_name("grpc-handlers")
-                    .build()
-                    .context(error::RuntimeResourceSnafu)?,
+            let greptime_request_handler = GreptimeRequestHandler::new(
+                ServerGrpcQueryHandlerAdapter::arc(instance.clone()),
+                user_provider.clone(),
+                builder.runtime().clone(),
             );
-
-            let grpc_config = GrpcServerConfig {
-                max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
-                max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
-            };
-
-            let grpc_server = GrpcServerBuilder::new(grpc_runtime)
-                .config(grpc_config)
-                .query_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone()))
+            let grpc_server = builder
+                .database_handler(greptime_request_handler.clone())
                 .prometheus_handler(instance.clone())
                 .otlp_handler(instance.clone())
                 .user_provider(user_provider.clone())
+                .flight_handler(Arc::new(greptime_request_handler))
                 .build();
 
             result.push((Box::new(grpc_server), grpc_addr));
@@ -116,7 +146,7 @@ impl Services {
             let http_server = http_server_builder
                 .with_metrics_handler(MetricsHandler)
                 .with_script_handler(instance.clone())
-                .with_plugins(plugins)
+                .with_plugins(self.plugins.clone())
                 .with_greptime_config_options(toml)
                 .build();
             result.push((Box::new(http_server), http_addr));
diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs
index b2ba7e8f9d..d8bc6fc387 100644
--- a/src/servers/src/grpc.rs
+++ b/src/servers/src/grpc.rs
@@ -249,15 +249,6 @@ impl Server for GrpcServer {
                 .max_decoding_message_size(max_recv_message_size)
                 .max_encoding_message_size(max_send_message_size),
             )
-        } else {
-            // TODO(ruihang): this is a temporary workaround before region server is ready.
-            builder = builder.add_service(
-                FlightServiceServer::new(FlightCraftWrapper(
-                    self.database_handler.clone().unwrap(),
-                ))
-                .max_decoding_message_size(max_recv_message_size)
-                .max_encoding_message_size(max_send_message_size),
-            )
         }
         if let Some(region_server_handler) = &self.region_server_handler {
             builder = builder.add_service(
diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs
index 910fc67ee4..33e49f9be8 100644
--- a/src/servers/src/grpc/builder.rs
+++ b/src/servers/src/grpc/builder.rs
@@ -23,12 +23,11 @@ use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
 use super::{GrpcServer, GrpcServerConfig};
 use crate::grpc::greptime_handler::GreptimeRequestHandler;
 use crate::prometheus_handler::PrometheusHandlerRef;
-use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
 use crate::query_handler::OpenTelemetryProtocolHandlerRef;
 
 pub struct GrpcServerBuilder {
     config: Option<GrpcServerConfig>,
-    query_handler: Option<ServerGrpcQueryHandlerRef>,
+    database_handler: Option<GreptimeRequestHandler>,
     prometheus_handler: Option<PrometheusHandlerRef>,
     flight_handler: Option<FlightCraftRef>,
     region_server_handler: Option<RegionServerHandlerRef>,
@@ -41,7 +40,7 @@ impl GrpcServerBuilder {
     pub fn new(runtime: Arc<Runtime>) -> Self {
         Self {
             config: None,
-            query_handler: None,
+            database_handler: None,
             prometheus_handler: None,
             flight_handler: None,
             region_server_handler: None,
@@ -61,8 +60,12 @@ impl GrpcServerBuilder {
         self
     }
 
-    pub fn query_handler(mut self, query_handler: ServerGrpcQueryHandlerRef) -> Self {
-        self.query_handler = Some(query_handler);
+    pub fn runtime(&self) -> &Arc<Runtime> {
+        &self.runtime
+    }
+
+    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
+        self.database_handler = Some(database_handler);
         self
     }
 
@@ -93,15 +96,7 @@ impl GrpcServerBuilder {
 
     pub fn build(self) -> GrpcServer {
         let config = self.config.unwrap_or_default();
-
-        let user_provider = self.user_provider.as_ref();
-        let runtime = self.runtime;
-
-        let database_handler = self.query_handler.map(|handler| {
-            GreptimeRequestHandler::new(handler, user_provider.cloned(), runtime.clone())
-        });
-
         let region_server_handler = self
             .region_server_handler
             .map(|handler| RegionServerRequestHandler::new(handler, runtime));
@@ -111,7 +106,7 @@ impl GrpcServerBuilder {
             prometheus_handler: self.prometheus_handler,
             flight_handler: self.flight_handler,
             region_server_handler,
-            database_handler,
+            database_handler: self.database_handler,
             otlp_handler: self.otlp_handler,
             user_provider: self.user_provider,
             shutdown_tx: Mutex::new(None),
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 677300edae..830bb23fad 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -24,8 +24,6 @@ use catalog::kvbackend::KvBackendCatalogManager;
 use common_config::WalConfig;
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
-use common_query::Output;
-use common_recordbatch::util;
 use common_runtime::Builder as RuntimeBuilder;
 use common_telemetry::warn;
 use common_test_util::ports;
@@ -503,16 +501,19 @@ pub async fn setup_grpc_server_with(
     );
 
     let fe_instance_ref = instance.instance.clone();
-    let flight_handler = Arc::new(GreptimeRequestHandler::new(
+
+    let greptime_request_handler = GreptimeRequestHandler::new(
         ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
         user_provider.clone(),
         runtime.clone(),
-    ));
+    );
+
+    let flight_handler = Arc::new(greptime_request_handler.clone());
 
     let fe_grpc_server = Arc::new(
         GrpcServerBuilder::new(runtime)
             .option_config(grpc_config)
-            .query_handler(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()))
+            .database_handler(greptime_request_handler)
             .flight_handler(flight_handler)
             .prometheus_handler(fe_instance_ref.clone())
             .user_provider(user_provider)
@@ -532,16 +533,6 @@ pub async fn setup_grpc_server_with(
     (fe_grpc_addr, instance.guard, fe_grpc_server)
 }
 
-pub async fn check_output_stream(output: Output, expected: &str) {
-    let recordbatches = match output {
-        Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
-        Output::RecordBatches(recordbatches) => recordbatches,
-        _ => unreachable!(),
-    };
-    let pretty_print = recordbatches.pretty_print().unwrap();
-    assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
-}
-
 pub async fn setup_mysql_server(
     store_type: StorageType,
     name: &str,
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index 686b98a2ee..b1e796b796 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -19,6 +19,7 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use common_query::Output;
 use common_recordbatch::util;
 use common_telemetry::logging;
+use common_test_util::recordbatch::check_output_stream;
 use common_test_util::temp_dir;
 use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
 use frontend::error::{Error, Result};
@@ -29,7 +30,6 @@ use rstest_reuse::apply;
 use servers::query_handler::sql::SqlQueryHandler;
 use session::context::{QueryContext, QueryContextRef};
 
-use crate::test_util::check_output_stream;
 use crate::tests::test_util::{
     both_instances_cases, both_instances_cases_with_custom_storages, check_unordered_output_stream,
     distributed, distributed_with_multiple_object_stores, find_testing_resource, prepare_path,
diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs
index 73f1c4c331..c6874ccb40 100644
--- a/tests-integration/tests/region_failover.rs
+++ b/tests-integration/tests/region_failover.rs
@@ -24,6 +24,7 @@ use common_meta::{distributed_time_constants, RegionIdent};
 use common_procedure::{watcher, ProcedureWithId};
 use common_query::Output;
 use common_telemetry::info;
+use common_test_util::recordbatch::check_output_stream;
 use frontend::error::Result as FrontendResult;
 use frontend::instance::Instance;
 use futures::TryStreamExt;
@@ -35,7 +36,7 @@ use servers::query_handler::sql::SqlQueryHandler;
 use session::context::{QueryContext, QueryContextRef};
 use table::metadata::TableId;
 use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
-use tests_integration::test_util::{check_output_stream, get_test_store_config, StorageType};
+use tests_integration::test_util::{get_test_store_config, StorageType};
 use tokio::time;
 
 #[macro_export]
diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs
index 11a5595c16..59348d2b40 100644
--- a/tests-integration/tests/region_migration.rs
+++ b/tests-integration/tests/region_migration.rs
@@ -24,6 +24,7 @@ use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
 use common_meta::wal::WalConfig as MetaWalConfig;
 use common_query::Output;
 use common_telemetry::info;
+use common_test_util::recordbatch::check_output_stream;
 use common_test_util::temp_dir::create_temp_dir;
 use frontend::error::Result as FrontendResult;
 use frontend::instance::Instance;
@@ -37,9 +38,7 @@ use session::context::{QueryContext, QueryContextRef};
 use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
-use tests_integration::test_util::{
-    check_output_stream, get_test_store_config, StorageType, PEER_PLACEHOLDER_ADDR,
-};
+use tests_integration::test_util::{get_test_store_config, StorageType, PEER_PLACEHOLDER_ADDR};
 use uuid::Uuid;
 
 const TEST_TABLE_NAME: &str = "migration_target";
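
A minimal usage sketch of the refactored orchestration, condensing the call sequence that src/cmd/src/frontend.rs adopts in this patch. It is illustrative only: the `start_frontend_servers` function name is made up here, and it assumes the frontend instance, options, and plugins are already constructed as in the diffs above.

use std::sync::Arc;

use common_base::Plugins;
use frontend::error::Result;
use frontend::frontend::FrontendOptions;
use frontend::instance::Instance as FeInstance;
use frontend::server::Services;

// Servers are now assembled by `Services` outside the frontend `Instance`,
// then handed to `build_servers` as ready-made handlers.
async fn start_frontend_servers(
    plugins: Plugins,
    opts: FrontendOptions,
    mut instance: FeInstance,
) -> Result<()> {
    // Build the gRPC/HTTP servers from the options and plugins.
    let servers = Services::new(plugins)
        .build(opts.clone(), Arc::new(instance.clone()))
        .await?;
    // Hand the prebuilt server handlers to the instance.
    instance.build_servers(opts, servers).await?;
    Ok(())
}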