refactor: start datanode more flexibly (#2800)

* refactor: start datanode more flexibly

* Update src/datanode/src/datanode.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fix: resolve PR comments

* Apply suggestions from code review

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
LFC
2023-11-24 16:16:21 +08:00
committed by GitHub
parent 33566ea0f0
commit 64a36e9b36
15 changed files with 296 additions and 253 deletions

View File

@@ -12,15 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_telemetry::logging;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use crate::error::{MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu};
use crate::options::{Options, TopLevelOptions};
@@ -177,7 +179,27 @@ impl StartCommand {
logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);
let datanode = DatanodeBuilder::new(opts, None, plugins)
let node_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client_options'",
})?;
let meta_client = datanode::heartbeat::new_metasrv_client(node_id, meta_config)
.await
.context(StartDatanodeSnafu)?;
let meta_backend = Arc::new(MetaKvBackend {
client: Arc::new(meta_client.clone()),
});
let datanode = DatanodeBuilder::new(opts, plugins)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.enable_region_server_service()
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;

View File

@@ -225,6 +225,13 @@ pub enum Error {
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to parse address {}", addr))]
ParseAddr {
addr: String,
#[snafu(source)]
error: std::net::AddrParseError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -252,7 +259,9 @@ impl ErrorExt for Error {
| Error::NotDataFromOutput { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
| Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments,
| Error::InvalidDatabaseName { .. }
| Error::ParseAddr { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,

View File

@@ -28,7 +28,7 @@ pub const ENV_VAR_SEP: &str = "__";
pub const ENV_LIST_SEP: &str = ",";
/// Options mixed up from datanode, frontend and metasrv.
#[derive(Serialize)]
#[derive(Serialize, Debug)]
pub struct MixOptions {
pub data_home: String,
pub procedure: ProcedureConfig,

View File

@@ -169,9 +169,7 @@ pub struct Instance {
impl Instance {
pub async fn start(&mut self) -> Result<()> {
// Start datanode instance before starting services, to avoid requests come in before internal components are started.
self.datanode.start().await.context(StartDatanodeSnafu)?;
info!("Datanode instance started");
self.datanode.start_telemetry();
self.procedure_manager
.start()
@@ -325,10 +323,8 @@ impl StartCommand {
let dn_opts = opts.datanode.clone();
info!("Standalone start command: {:#?}", self);
info!(
"Standalone frontend options: {:#?}, datanode options: {:#?}",
fe_opts, dn_opts
);
info!("Building standalone instance with {opts:#?}");
// Ensure the data_home directory exists.
fs::create_dir_all(path::Path::new(&opts.data_home)).context(CreateDirSnafu {
@@ -344,14 +340,12 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
let datanode = DatanodeBuilder::new(
dn_opts.clone(),
Some(kv_backend.clone()),
Default::default(),
)
.build()
.await
.context(StartDatanodeSnafu)?;
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();
let catalog_manager = KvBackendCatalogManager::new(

View File

@@ -17,6 +17,7 @@ axum = "0.6"
axum-macros = "0.3"
bytes = "1.1"
catalog.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true

View File

@@ -14,10 +14,11 @@
//! Datanode implementation.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
@@ -26,8 +27,9 @@ use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
use futures_util::StreamExt;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -36,6 +38,10 @@ use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{start_server, ServerHandler, ServerHandlers};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
@@ -48,26 +54,26 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingMetaClientSnafu,
MissingMetasrvOptsSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu,
ShutdownInstanceSnafu,
CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu,
ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, ShutdownServerSnafu,
StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
RegionServerEventReceiver,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::{new_metasrv_client, HeartbeatTask};
use crate::region_server::RegionServer;
use crate::server::Services;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;
const OPEN_REGION_PARALLELISM: usize = 16;
const REGION_SERVER_SERVICE_NAME: &str = "REGION_SERVER_SERVICE";
const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE";
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
services: Option<Services>,
services: ServerHandlers,
heartbeat_task: Option<HeartbeatTask>,
region_event_receiver: Option<RegionServerEventReceiver>,
region_server: RegionServer,
@@ -83,10 +89,17 @@ impl Datanode {
self.start_heartbeat().await?;
self.wait_coordinated().await;
let _ = self.greptimedb_telemetry_task.start();
self.start_telemetry();
self.start_services().await
}
pub fn start_telemetry(&self) {
if let Err(e) = self.greptimedb_telemetry_task.start() {
warn!(e; "Failed to start telemetry task!");
}
}
pub async fn start_heartbeat(&mut self) -> Result<()> {
if let Some(task) = &self.heartbeat_task {
// Safety: The event_receiver must exist.
@@ -106,19 +119,17 @@ impl Datanode {
/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
if let Some(service) = self.services.as_mut() {
service.start(&self.opts).await
} else {
Ok(())
}
let _ = future::try_join_all(self.services.values().map(start_server))
.await
.context(StartServerSnafu)?;
Ok(())
}
async fn shutdown_services(&self) -> Result<()> {
if let Some(service) = self.services.as_ref() {
service.shutdown().await
} else {
Ok(())
}
let _ = future::try_join_all(self.services.values().map(|server| server.0.shutdown()))
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
@@ -150,17 +161,21 @@ pub struct DatanodeBuilder {
plugins: Plugins,
meta_client: Option<MetaClient>,
kv_backend: Option<KvBackendRef>,
enable_region_server_service: bool,
enable_http_service: bool,
}
impl DatanodeBuilder {
/// `kv_backend` is optional. If absent, the builder will try to build one
/// by using the given `opts`
pub fn new(opts: DatanodeOptions, kv_backend: Option<KvBackendRef>, plugins: Plugins) -> Self {
pub fn new(opts: DatanodeOptions, plugins: Plugins) -> Self {
Self {
opts,
plugins,
meta_client: None,
kv_backend,
kv_backend: None,
enable_region_server_service: false,
enable_http_service: false,
}
}
@@ -171,76 +186,63 @@ impl DatanodeBuilder {
}
}
pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
Self {
kv_backend: Some(kv_backend),
..self
}
}
pub fn enable_region_server_service(self) -> Self {
Self {
enable_region_server_service: true,
..self
}
}
pub fn enable_http_service(self) -> Self {
Self {
enable_http_service: true,
..self
}
}
pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
// build meta client
let meta_client = match mode {
Mode::Distributed => {
let meta_client = if let Some(meta_client) = self.meta_client.take() {
meta_client
} else {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let meta_client = self.meta_client.take();
let meta_config = self
.opts
.meta_client
.as_ref()
.context(MissingMetasrvOptsSnafu)?;
// If metasrv client is provided, we will use it to control the region server.
// Otherwise the region server is self-controlled, meaning no heartbeat and immediately
// writable upon open.
let controlled_by_metasrv = meta_client.is_some();
new_metasrv_client(node_id, meta_config).await?
};
Some(meta_client)
}
Mode::Standalone => None,
};
// build kv-backend
let kv_backend = match mode {
Mode::Distributed => Arc::new(MetaKvBackend {
client: Arc::new(meta_client.clone().context(MissingMetaClientSnafu)?),
}),
Mode::Standalone => self.kv_backend.clone().context(MissingKvBackendSnafu)?,
};
let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?;
// build and initialize region server
let log_store = Self::build_log_store(&self.opts).await?;
let (region_event_listener, region_event_receiver) = match mode {
Mode::Distributed => {
let (tx, rx) = new_region_server_event_channel();
(Box::new(tx) as RegionServerEventListenerRef, Some(rx))
}
Mode::Standalone => (
Box::new(NoopRegionServerEventListener) as RegionServerEventListenerRef,
None,
),
let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
let (tx, rx) = new_region_server_event_channel();
(Box::new(tx) as _, Some(rx))
} else {
(Box::new(NoopRegionServerEventListener) as _, None)
};
let region_server = Self::new_region_server(
&self.opts,
self.plugins.clone(),
log_store,
region_event_listener,
)
.await?;
self.initialize_region_server(&region_server, kv_backend, matches!(mode, Mode::Standalone))
let region_server = self
.new_region_server(log_store, region_event_listener)
.await?;
let heartbeat_task = match mode {
Mode::Distributed => {
let meta_client = meta_client.context(MissingMetaClientSnafu)?;
self.initialize_region_server(&region_server, kv_backend, !controlled_by_metasrv)
.await?;
let heartbeat_task =
HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?;
Some(heartbeat_task)
}
Mode::Standalone => None,
let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
} else {
None
};
let services = match mode {
Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?),
Mode::Standalone => None,
};
let services = self.create_datanode_services(&region_server)?;
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
@@ -257,7 +259,6 @@ impl DatanodeBuilder {
};
Ok(Datanode {
opts: self.opts,
services,
heartbeat_task,
region_server,
@@ -268,6 +269,68 @@ impl DatanodeBuilder {
})
}
fn create_datanode_services(&self, region_server: &RegionServer) -> Result<ServerHandlers> {
let mut services = HashMap::new();
if self.enable_region_server_service {
services.insert(
REGION_SERVER_SERVICE_NAME.to_string(),
self.create_region_server_service(region_server)?,
);
}
if self.enable_http_service {
services.insert(
DATANODE_HTTP_SERVICE_NAME.to_string(),
self.create_http_service()?,
);
}
Ok(services)
}
fn create_region_server_service(&self, region_server: &RegionServer) -> Result<ServerHandler> {
let opts = &self.opts;
let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};
let server = Box::new(GrpcServer::new(
Some(config),
None,
None,
Some(Arc::new(region_server.clone()) as _),
Some(Arc::new(region_server.clone()) as _),
None,
region_server.runtime(),
));
let addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
Ok((server, addr))
}
fn create_http_service(&self) -> Result<ServerHandler> {
let opts = &self.opts;
let server = Box::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml_string())
.build(),
);
let addr = opts.http.addr.parse().context(ParseAddrSnafu {
addr: &opts.http.addr,
})?;
Ok((server, addr))
}
/// Open all regions belong to this datanode.
async fn initialize_region_server(
&self,
@@ -329,18 +392,19 @@ impl DatanodeBuilder {
}
async fn new_region_server(
opts: &DatanodeOptions,
plugins: Plugins,
&self,
log_store: Arc<RaftEngineLogStore>,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let opts = &self.opts;
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
None,
None,
false,
plugins,
self.plugins.clone(),
);
let query_engine = query_engine_factory.query_engine();
@@ -352,8 +416,15 @@ impl DatanodeBuilder {
.context(RuntimeResourceSnafu)?,
);
let mut region_server =
RegionServer::new(query_engine.clone(), runtime.clone(), event_listener);
let table_provider_factory = Arc::new(DummyTableProviderFactory);
let mut region_server = RegionServer::with_table_provider(
query_engine,
runtime,
event_listener,
table_provider_factory,
);
let object_store = store::new_object_store(opts).await?;
let object_store_manager = ObjectStoreManager::new(
"default", // TODO: use a name which is set in the configuration when #919 is done.
@@ -472,7 +543,6 @@ mod tests {
node_id: Some(0),
..Default::default()
},
None,
Plugins::default(),
);

View File

@@ -212,9 +212,6 @@ pub enum Error {
#[snafu(display("Expect KvBackend but not found"))]
MissingKvBackend { location: Location },
#[snafu(display("Expect MetaClient but not found"))]
MissingMetaClient { location: Location },
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String },
@@ -295,9 +292,6 @@ pub enum Error {
#[snafu(display("Missing node id in Datanode config"))]
MissingNodeId { location: Location },
#[snafu(display("Missing node id option in distributed mode"))]
MissingMetasrvOpts { location: Location },
#[snafu(display("Missing required field: {}", name))]
MissingRequiredField { name: String, location: Location },
@@ -477,13 +471,11 @@ impl ErrorExt for Error {
| SchemaExists { .. }
| DatabaseNotFound { .. }
| MissingNodeId { .. }
| MissingMetasrvOpts { .. }
| ColumnNoneDefaultValue { .. }
| MissingWalDirConfig { .. }
| PrepareImmutableTable { .. }
| ColumnDataType { .. }
| MissingKvBackend { .. }
| MissingMetaClient { .. } => StatusCode::InvalidArguments,
| MissingKvBackend { .. } => StatusCode::InvalidArguments,
EncodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => {
StatusCode::Unexpected

View File

@@ -24,7 +24,6 @@ mod greptimedb_telemetry;
pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod server;
mod store;
#[cfg(test)]
#[allow(dead_code)]

View File

@@ -79,12 +79,27 @@ impl RegionServer {
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
event_listener: RegionServerEventListenerRef,
) -> Self {
Self::with_table_provider(
query_engine,
runtime,
event_listener,
Arc::new(DummyTableProviderFactory),
)
}
pub fn with_table_provider(
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
query_engine,
runtime,
event_listener,
table_provider_factory,
)),
}
}
@@ -233,6 +248,7 @@ struct RegionServerInner {
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
}
impl RegionServerInner {
@@ -240,6 +256,7 @@ impl RegionServerInner {
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
@@ -247,6 +264,7 @@ impl RegionServerInner {
query_engine,
runtime,
event_listener,
table_provider_factory,
}
}
@@ -346,7 +364,13 @@ impl RegionServerInner {
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?
.clone();
let catalog_list = Arc::new(DummyCatalogList::new(region_id, engine).await?);
let table_provider = self
.table_provider_factory
.create(region_id, engine)
.await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider));
// decode substrait plan to logical plan and execute it
let logical_plan = DFLogicalSubstraitConvertor
@@ -407,31 +431,16 @@ struct DummyCatalogList {
}
impl DummyCatalogList {
pub async fn new(region_id: RegionId, engine: RegionEngineRef) -> Result<Self> {
let metadata =
engine
.get_metadata(region_id)
.await
.with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
let table_provider = DummyTableProvider {
region_id,
engine,
metadata,
scan_request: Default::default(),
};
fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
let schema_provider = DummySchemaProvider {
table: table_provider,
};
let catalog_provider = DummyCatalogProvider {
schema: schema_provider,
};
let catalog_list = Self {
Self {
catalog: catalog_provider,
};
Ok(catalog_list)
}
}
}
@@ -480,7 +489,7 @@ impl CatalogProvider for DummyCatalogProvider {
/// For [DummyCatalogList].
#[derive(Clone)]
struct DummySchemaProvider {
table: DummyTableProvider,
table: Arc<dyn TableProvider>,
}
#[async_trait]
@@ -494,7 +503,7 @@ impl SchemaProvider for DummySchemaProvider {
}
async fn table(&self, _name: &str) -> Option<Arc<dyn TableProvider>> {
Some(Arc::new(self.table.clone()))
Some(self.table.clone())
}
fn table_exist(&self, _name: &str) -> bool {
@@ -555,3 +564,40 @@ impl TableProvider for DummyTableProvider {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
}
pub struct DummyTableProviderFactory;
#[async_trait]
impl TableProviderFactory for DummyTableProviderFactory {
async fn create(
&self,
region_id: RegionId,
engine: RegionEngineRef,
) -> Result<Arc<dyn TableProvider>> {
let metadata =
engine
.get_metadata(region_id)
.await
.with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
Ok(Arc::new(DummyTableProvider {
region_id,
engine,
metadata,
scan_request: Default::default(),
}))
}
}
#[async_trait]
pub trait TableProviderFactory: Send + Sync {
async fn create(
&self,
region_id: RegionId,
engine: RegionEngineRef,
) -> Result<Arc<dyn TableProvider>>;
}
pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;

View File

@@ -1,95 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::sync::Arc;
use futures::future;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use snafu::ResultExt;
use crate::config::DatanodeOptions;
use crate::error::{
ParseAddrSnafu, Result, ShutdownServerSnafu, StartServerSnafu, WaitForGrpcServingSnafu,
};
use crate::region_server::RegionServer;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
http_server: HttpServer,
}
impl Services {
pub async fn try_new(region_server: RegionServer, opts: &DatanodeOptions) -> Result<Self> {
let flight_handler = Some(Arc::new(region_server.clone()) as _);
let region_server_handler = Some(Arc::new(region_server.clone()) as _);
let runtime = region_server.runtime();
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};
Ok(Self {
grpc_server: GrpcServer::new(
Some(grpc_config),
None,
None,
flight_handler,
region_server_handler,
None,
runtime,
),
http_server: HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml_string())
.build(),
})
}
pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
let http_addr = opts.http.addr.parse().context(ParseAddrSnafu {
addr: &opts.http.addr,
})?;
let grpc = self.grpc_server.start(grpc_addr);
let http = self.http_server.start(http_addr);
let _ = future::try_join_all(vec![grpc, http])
.await
.context(StartServerSnafu)?;
self.grpc_server
.wait_for_serve()
.await
.context(WaitForGrpcServingSnafu)?;
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
self.grpc_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
self.http_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
}

View File

@@ -19,7 +19,8 @@ mod otlp;
mod prom_store;
mod region_query;
mod script;
mod standalone;
pub mod standalone;
use std::collections::HashMap;
use std::sync::Arc;
@@ -72,6 +73,7 @@ use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::{start_server, ServerHandlers};
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::dialect::Dialect;
@@ -93,7 +95,7 @@ use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandl
use crate::heartbeat::HeartbeatTask;
use crate::metrics;
use crate::script::ScriptExecutor;
use crate::server::{start_server, ServerHandlers, Services};
use crate::server::Services;
#[async_trait]
pub trait FrontendInstance:

View File

@@ -12,14 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use auth::UserProviderRef;
use common_base::Plugins;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::info;
use servers::error::InternalIoSnafu;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
@@ -29,7 +27,7 @@ use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
use servers::server::{Server, ServerHandler, ServerHandlers};
use snafu::ResultExt;
use crate::error::{self, Result, StartServerSnafu};
@@ -38,10 +36,6 @@ use crate::instance::FrontendInstance;
pub(crate) struct Services;
pub type ServerHandlers = HashMap<String, ServerHandler>;
pub type ServerHandler = (Box<dyn Server>, SocketAddr);
impl Services {
pub(crate) async fn build<T, U>(
opts: T,
@@ -210,11 +204,3 @@ impl Services {
fn parse_addr(addr: &str) -> Result<SocketAddr> {
addr.parse().context(error::ParseAddrSnafu { addr })
}
pub async fn start_server(
server_and_addr: &(Box<dyn Server>, SocketAddr),
) -> servers::error::Result<Option<SocketAddr>> {
let (server, addr) = server_and_addr;
info!("Starting {} at {}", server.name(), addr);
server.start(*addr).await.map(Some)
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -28,6 +29,16 @@ use crate::error::{self, Result};
pub(crate) type AbortableStream = Abortable<TcpListenerStream>;
pub type ServerHandlers = HashMap<String, ServerHandler>;
pub type ServerHandler = (Box<dyn Server>, SocketAddr);
pub async fn start_server(server_handler: &ServerHandler) -> Result<Option<SocketAddr>> {
let (server, addr) = server_handler;
info!("Starting {} at {}", server.name(), addr);
server.start(*addr).await.map(Some)
}
#[async_trait]
pub trait Server: Send + Sync {
/// Shutdown the server gracefully.

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use catalog::kvbackend::MetaKvBackend;
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
@@ -220,7 +221,12 @@ impl GreptimeDbClusterBuilder {
.build();
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let mut datanode = DatanodeBuilder::new(opts, None, Plugins::default())
let meta_backend = Arc::new(MetaKvBackend {
client: Arc::new(meta_client.clone()),
});
let mut datanode = DatanodeBuilder::new(opts, Plugins::default())
.with_kv_backend(meta_backend)
.with_meta_client(meta_client)
.build()
.await

View File

@@ -82,11 +82,11 @@ impl GreptimeDbStandaloneBuilder {
let plugins = self.plugin.unwrap_or_default();
let datanode =
DatanodeBuilder::new(opts.clone(), Some(kv_backend.clone()), plugins.clone())
.build()
.await
.unwrap();
let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.unwrap();
let catalog_manager = KvBackendCatalogManager::new(
kv_backend.clone(),