greptimedb/tests-integration/src/cluster.rs
Lei, HUANG 05b708ed2e feat: implement process manager and information_schema.process_list (#5865)
* ### Add Process List Management

 - **Error Handling Enhancements**:

* refactor: Update test IP addresses to include ports in ProcessKey

* feat/show-process-list:
 Refactor Process Management in Meta Module

 - Introduced `ProcessManager` for handling process registration and deregistration.
 - Added methods for managing and querying process states, including `register_query`, `deregister_query`, and `list_all_processes`.
 - Removed redundant process management code from the query module.
 - Updated error handling to reflect changes in process management.
 - Enhanced test coverage for process management functionalities.

* chore: rebase main

* add information schema process list table

* integrate process list table to system catalog

* build ProcessManager on frontend and standalone mode

* feat/show-process-list:
 **Add Process Management Enhancements**

 - **`manager.rs`**: Introduced `process_manager` to `SystemCatalog` and `KvBackendCatalogManager` for improved process handling.
 - **`information_schema.rs`**: Updated table insertion logic to conditionally include `PROCESS_LIST`.
 - **`frontend.rs`, `standalone.rs`**: Enhanced `StartCommand` to clone `process_manager` for better resource management.
 - **`instance.rs`, `builder.rs`**: Integrated `ProcessManager` into `Instance` and `FrontendBuilder` to manage query

* feat/show-process-list:
 ### Add Process Listing and Error Handling Enhancements

 - **Error Handling**: Introduced a new error variant `ListProcess` in `error.rs` to handle failures when listing running processes.
 - **Process List Implementation**: Enhanced `InformationSchemaProcessList` in `process_list.rs` to track running queries, including defining column names and implementing the `make_process_list` function to build the process list.
 - **Frontend Builder**: Added a `#[allow(clippy::too_many_arguments)]` attribute in `builder.rs` to suppress Clippy warnings for the `FrontendBuilder::new` function.

 These changes improve error handling and process tracking capabilities within the system.

* feat/show-process-list:
 Refactor imports in `process_list.rs`

 - Updated import paths for `Predicates` and `InformationTable` in `process_list.rs` to align with the new module structure.

* feat/show-process-list:
 Refactor process list generation in `process_list.rs`

 - Simplified the process list generation by removing intermediate row storage and directly building vectors.
 - Updated `process_to_row` function to use a mutable vector for current row data, improving memory efficiency.
 - Removed `rows_to_record_batch` function, integrating its logic directly into the main loop for streamlined processing.

* wip: move ProcessManager to catalog crate

* feat/show-process-list:
 - **Refactor Row Construction**: Updated row construction in multiple files to use references for `Value` objects, improving memory efficiency. Affected files include:
   - `cluster_info.rs`
   - `columns.rs`
   - `flows.rs`
   - `key_column_usage.rs`
   - `partitions.rs`
   - `procedure_info.rs`
   - `process_list.rs`
   - `region_peers.rs`
   - `region_statistics.rs`
   - `schemata.rs`
   - `table_constraints.rs`
   - `tables.rs`
   - `views.rs`
   - `pg_class.rs`
   - `pg_database.rs`
   - `pg_namespace.rs`
 - **Remove Unused Code**: Deleted unused functions and error variants related to process management in `process_list.rs` and `error.rs`.
 - **Predicate Evaluation Update**: Modified predicate evaluation functions in `predicate.rs` to work with references, enhancing performance.

* feat/show-process-list:
 ### Implement Process Management Enhancements

 - **Error Handling Enhancements**:
   - Added new error variants `BumpSequence`, `StartReportTask`, `ReportProcess`, and `BuildProcessManager` in `error.rs` to improve error handling for process management tasks.
   - Updated `ErrorExt` implementations to handle new error types.

 - **Process Manager Improvements**:
   - Introduced `ProcessManager` enhancements in `process_manager.rs` to manage process states using `ProcessWithState` and `ProcessState` enums.
   - Implemented periodic task `ReportTask` to report running queries to the KV backend.
   - Modified `register_query` and `deregister_query` methods to use the new state management system.

 - **Testing and Validation**:
   - Updated tests in `process_manager.rs` to validate new process management logic.
   - Replaced `dump` method with `list_all_processes` for listing processes.

 - **Integration with Frontend and Standalone**:
   - Updated `frontend.rs` and `standalone.rs` to handle `ProcessManager` initialization errors using `BuildProcessManager` error variant.

 - **Schema Adjustments**:
   - Modified `process_list.rs` in `system_schema/information_schema` to use the updated process listing method.

 - **Key-Value Conversion**:
   - Added `TryFrom` implementation for converting `Process` to `KeyValue` in `process_list.rs`.

* chore: remove register

* fix: sqlness tests

* merge main

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Refactor `ProcessManager`**: Simplified the `ProcessManager` implementation by removing the use of `KvBackendRef` and `SequenceRef`, and replaced them with `AtomicU64` and `RwLock` for managing process IDs and catalogs in `process_manager.rs`.
 - **Remove Process List Metadata**: Deleted the `process_list.rs` file and removed related metadata key definitions in `key.rs`.
 - **Update Process List Logic**: Modified the process list logic in `process_list.rs` to use the new `ProcessManager` structure.
 - **Adjust Frontend and Standalone Start Commands**: Updated `frontend.rs` and `standalone.rs` to use the new `ProcessManager` constructor.
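
The `AtomicU64` plus `RwLock` arrangement described in this item can be pictured with a minimal sketch. Everything in it (`SketchProcessManager`, `ProcessInfo`, the field names and the method signatures) is a hypothetical illustration under assumed names, not the actual code in `process_manager.rs`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

/// Hypothetical record describing one running query.
#[derive(Debug, Clone, PartialEq)]
pub struct ProcessInfo {
    pub id: u64,
    pub catalog: String,
    pub query: String,
}

/// Hypothetical in-memory manager: an atomic counter hands out IDs and a
/// lock-guarded map keeps the running queries grouped by catalog.
#[derive(Default)]
pub struct SketchProcessManager {
    next_id: AtomicU64,
    catalogs: RwLock<HashMap<String, HashMap<u64, ProcessInfo>>>,
}

impl SketchProcessManager {
    /// Registers a query and returns its freshly allocated ID.
    pub fn register_query(&self, catalog: &str, query: &str) -> u64 {
        // `fetch_add` yields a unique ID without taking the map lock.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let info = ProcessInfo {
            id,
            catalog: catalog.to_string(),
            query: query.to_string(),
        };
        self.catalogs
            .write()
            .unwrap()
            .entry(catalog.to_string())
            .or_default()
            .insert(id, info);
        id
    }

    /// Removes a query; returns whether it was registered.
    pub fn deregister_query(&self, catalog: &str, id: u64) -> bool {
        let mut catalogs = self.catalogs.write().unwrap();
        match catalogs.get_mut(catalog) {
            Some(processes) => processes.remove(&id).is_some(),
            None => false,
        }
    }

    /// Lists every registered query, sorted by ID.
    pub fn list_all_processes(&self) -> Vec<ProcessInfo> {
        let catalogs = self.catalogs.read().unwrap();
        let mut all: Vec<ProcessInfo> = catalogs
            .values()
            .flat_map(|processes| processes.values().cloned())
            .collect();
        all.sort_by_key(|p| p.id);
        all
    }
}
```

Readers share the `RwLock`, so listing does not block other listings; only registration and deregistration take the write lock.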

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency version in `Cargo.lock` and `Cargo.toml` to a new commit hash.
 - **Refactor Error Handling**: Removed unused error variants and added a new `ParseProcessId` error in `src/catalog/src/error.rs`.
 - **Enhance Process Management**: Introduced `DisplayProcessId` struct for better process ID representation and parsing in `src/catalog/src/process_manager.rs`.
 - **Revise Process List Schema**: Updated the schema and logic for process listing in `src/catalog/src/system_schema/information_schema/process_list.rs` to include new fields like `client` and `frontend`.
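
A displayable, parseable process ID of the kind this item mentions could look roughly like the sketch below. The `<address>/<id>` text layout, the field names and the `ParseProcessIdError` type are assumptions made for illustration; the commit message does not specify the actual format.

```rust
use std::fmt;
use std::str::FromStr;

/// Assumed layout: the frontend address plus a local numeric ID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DisplayProcessId {
    pub server_addr: String,
    pub id: u64,
}

impl fmt::Display for DisplayProcessId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.server_addr, self.id)
    }
}

/// Hypothetical error type standing in for a `ParseProcessId` variant.
#[derive(Debug, PartialEq, Eq)]
pub struct ParseProcessIdError(pub String);

impl FromStr for DisplayProcessId {
    type Err = ParseProcessIdError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Split on the last '/' so the address part may itself contain slashes.
        let (addr, id) = s
            .rsplit_once('/')
            .ok_or_else(|| ParseProcessIdError(s.to_string()))?;
        if addr.is_empty() {
            return Err(ParseProcessIdError(s.to_string()));
        }
        let id = id
            .parse::<u64>()
            .map_err(|_| ParseProcessIdError(s.to_string()))?;
        Ok(Self {
            server_addr: addr.to_string(),
            id,
        })
    }
}
```

Pairing `Display` with `FromStr` keeps the two directions in one place, so a value printed in a listing can be parsed back without a separate helper.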

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 **Enhancements and Refactoring**

 - **Process Management:**
   - Refactored `ProcessManager` to list local processes with an optional catalog filter in `process_manager.rs`.
   - Updated related tests in `process_manager.rs` and `process_list.rs`.

 - **Client Enhancements:**
   - Added `frontend_client` method in `client.rs` to support gRPC communication with the frontend.

 - **Error Handling:**
   - Extended error handling in `error.rs` to include gRPC and Meta errors.

 - **Frontend Module:**
   - Introduced `selector.rs` for frontend client selection and process listing.
   - Updated `Cargo.toml` to include new dependencies and dev-dependencies.

 - **gRPC Server:**
   - Integrated `FrontendServer` in `builder.rs` for enhanced gRPC server capabilities.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 **Refactor Process Management and Frontend Integration**

 - **Add `common-frontend` Dependency**:
   - Updated `Cargo.lock`, `Cargo.toml` files to include `common-frontend` as a dependency.

 - **Refactor Process Management**:
   - Moved `ProcessManager` trait and `DisplayProcessId` struct to `common-frontend`.
   - Updated `process_manager.rs` to use `MetaProcessManager` and `ProcessManagerRef`.
   - Removed `ParseProcessId` error variant from `error.rs` in `catalog` and `frontend`.

 - **Frontend gRPC Service**:
   - Added `frontend_grpc_handler.rs` to handle gRPC requests for frontend processes.
   - Updated `grpc.rs` and `builder.rs` to integrate `FrontendGrpcHandler`.

 - **Update Tests**:
   - Modified tests in `process_manager.rs` to align with new `ProcessManager` implementation.

 - **Remove Unused Code**:
   - Removed `DisplayProcessId` and related parsing logic from `process_manager.rs`.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 ### Add `MetaClientRef` to `MetaProcessManager` and Update Instantiation

 - **Files Modified**:
   - `src/catalog/src/process_manager.rs`
   - `src/cmd/src/frontend.rs`
   - `src/cmd/src/standalone.rs`

 - **Key Changes**:
   - Added `MetaClientRef` as an optional parameter to the `MetaProcessManager::new` method.
   - Updated instantiation of `MetaProcessManager` to include `MetaClientRef` where applicable.

 ### Update `ProcessManagerRef` Usage

 - **Files Modified**:
   - `src/catalog/src/kvbackend/manager.rs`
   - `src/catalog/src/system_schema/information_schema.rs`
   - `src/catalog/src/system_schema/information_schema/process_list.rs`
   - `src/frontend/src/instance.rs`
   - `src/frontend/src/instance/builder.rs`

 - **Key Changes**:
   - Ensured consistent usage of `ProcessManagerRef` across various modules.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 ## Refactor Process Management

 - **Unified Process Manager**:
   - Replaced `MetaProcessManager` with `ProcessManager` across the codebase.
   - Updated `ProcessManager` to use `Arc` for shared references and introduced a `Ticket` struct for query registration and deregistration.
   - Affected files: `manager.rs`, `process_manager.rs`, `frontend.rs`, `standalone.rs`, `frontend_grpc_handler.rs`, `instance.rs`, `builder.rs`, `cluster.rs`, `standalone.rs`.

 - **Stream Wrapper Implementation**:
   - Added `StreamWrapper` to handle record batch streams with process management.
   - Affected file: `stream_wrapper.rs`.

 - **Test Adjustments**:
   - Updated tests to align with the new `ProcessManager` implementation.
   - Affected files: `tests-integration/src/cluster.rs`, `tests-integration/src/standalone.rs`.
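
One plausible reading of a `Ticket` used for registration and deregistration is a guard value that removes its query when dropped. The sketch below illustrates that pattern only; `SketchManager`, its fields, and the drop-based deregistration are assumptions, not confirmed details of the real `ProcessManager`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical manager holding the currently running queries.
#[derive(Default)]
pub struct SketchManager {
    running: Mutex<HashMap<u64, String>>,
}

impl SketchManager {
    /// Registers a query under `id` and returns a ticket that keeps it listed.
    pub fn register_query(self: &Arc<Self>, id: u64, query: &str) -> Ticket {
        self.running.lock().unwrap().insert(id, query.to_string());
        Ticket {
            manager: Arc::clone(self),
            id,
        }
    }

    /// Number of queries that are still registered.
    pub fn running_count(&self) -> usize {
        self.running.lock().unwrap().len()
    }
}

/// Hypothetical ticket: the query stays registered for as long as it lives.
pub struct Ticket {
    manager: Arc<SketchManager>,
    id: u64,
}

impl Drop for Ticket {
    fn drop(&mut self) {
        // Deregister automatically, including on early returns.
        self.manager.running.lock().unwrap().remove(&self.id);
    }
}
```

A wrapper around a result stream could hold such a ticket as a field, so the query disappears from the list when the stream is dropped.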

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 ### Add Error Handling and Process Management

 - **Error Handling Enhancements**:
   - Added new error variants `ListProcess` and `CreateChannel` in `error.rs` to handle specific gRPC service invocation failures.
   - Updated error handling in `selector.rs` to use the new error variants for better context and error propagation.

 - **Process Management Integration**:
   - Introduced `process_manager` method in `instance.rs` to access the process manager.
   - Integrated `FrontendGrpcHandler` with process management in `server.rs` to handle gRPC requests related to process management.

 - **gRPC Server Enhancements**:
   - Made `frontend_grpc_handler` public in `grpc.rs` to allow external access and integration with other modules.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 Update `greptime-proto` dependency and enhance process management

 - **Dependency Update**: Updated `greptime-proto` in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Process Management**:
   - Modified `process_manager.rs` to include catalog filtering in `list_process`.
   - Updated `frontend_grpc_handler.rs` to handle catalog filtering in `list_process` requests.
 - **System Schema**: Added a TODO comment in `process_list.rs` for future user catalog filtering implementation.
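
Catalog filtering in a listing call can be expressed with an optional argument, as in this small sketch. The `Process` struct and the free function `list_process` are hypothetical stand-ins; the real method signature is not shown in this commit message.

```rust
/// Hypothetical process record carrying only what the filter needs.
#[derive(Debug, Clone, PartialEq)]
pub struct Process {
    pub id: u64,
    pub catalog: String,
}

/// Returns the processes of `catalog`, or all processes when `catalog` is `None`.
pub fn list_process<'a>(processes: &'a [Process], catalog: Option<&str>) -> Vec<&'a Process> {
    processes
        .iter()
        .filter(|p| catalog.map_or(true, |c| p.catalog == c))
        .collect()
}
```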

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 - **Update Workspace Dependencies**:
   - Modified `Cargo.toml` files in `src/catalog`, `src/common/frontend`, and `src/servers` to adjust workspace dependencies.

 - **Refactor `ProcessManager` Logic**:
   - Updated `process_manager.rs` to simplify the condition in the `select` method.

 - **Remove Unused Error Variants**:
   - Deleted `BuildProcessManager` error variant from `error.rs` in `src/cmd`.
   - Removed `InvalidProcessKey` error variant from `error.rs` in `src/common/meta`.

 - **Add License Header**:
   - Added Apache License header to `stream_wrapper.rs` in `src/frontend`.

 - **Update Test Results**:
   - Adjusted expected results in `information_schema.result` to reflect changes in the schema.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 ### Add Error Handling for Process Listing

 - **`src/catalog/src/error.rs`**: Introduced a new error variant `ListProcess` to handle failures in listing frontend nodes.
 - **`src/catalog/src/process_manager.rs`**: Updated `local_processes` and `list_all_processes` methods to return the new error type, adding context for error handling.
 - **`src/catalog/src/system_schema/information_schema/process_list.rs`**: Modified `make_process_list` to propagate errors using the new error handling mechanism.
 - **`src/servers/src/grpc/frontend_grpc_handler.rs`**: Enhanced error handling in the `list_process` method to log errors and return appropriate gRPC status codes.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 Update `greptime-proto` Dependency and Remove `frontend_client` Method

 - **Cargo.lock** and **Cargo.toml**: Updated the `greptime-proto` dependency to a new revision (`5f6119ac7952878d39dcde0343c4bf828d18ffc8`).
 - **src/client/src/client.rs**: Removed the `frontend_client` method from the `Client` implementation.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 ### Add Query Registration with Pre-Generated ID

 - **`process_manager.rs`**: Introduced `register_query_with_id` method to allow registering queries with a pre-generated ID. This includes creating a `ProcessInfo` instance and inserting it into the catalog. Added `next_id` method to generate the next process ID.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 ### Update Process List Retrieval Method

 - **File**: `process_list.rs`
   - Updated the method for retrieving process lists from `local_processes` to `list_all_processes` to support asynchronous operations.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* feat/show-process-list:
 ### Update error handling in `error.rs`

 - Refined status code handling for `CreateChannel` error by delegating to `source.status_code()`.
 - Separated `ListProcess` and `CreateChannel` error handling for clarity.
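
Delegating a status code to the wrapped error, with each variant in its own match arm, might look like the sketch below. The `StatusCode` values, the `ChannelError` type, the variant fields and the code returned for `ListProcess` are all assumptions for illustration.

```rust
/// Hypothetical status codes; the project defines its own richer type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatusCode {
    Internal,
    InvalidArguments,
}

/// Minimal stand-in for an error trait that exposes a status code.
pub trait ErrorExt {
    fn status_code(&self) -> StatusCode;
}

/// Hypothetical underlying error raised while creating a gRPC channel.
pub struct ChannelError {
    pub code: StatusCode,
}

impl ErrorExt for ChannelError {
    fn status_code(&self) -> StatusCode {
        self.code
    }
}

pub enum Error {
    CreateChannel { source: ChannelError },
    ListProcess { addr: String },
}

impl ErrorExt for Error {
    fn status_code(&self) -> StatusCode {
        match self {
            // Delegate: report whatever the wrapped error reports.
            Error::CreateChannel { source } => source.status_code(),
            // Separate arm; the code chosen here is an assumption.
            Error::ListProcess { .. } => StatusCode::Internal,
        }
    }
}
```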

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

---------

Signed-off-by: Lei, HUANG <lhuang@greptime.com>
2025-06-12 06:55:22 +00:00


// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use cache::{
    build_datanode_cache_registry, build_fundamental_cache_registry,
    with_default_composite_cache_registry,
};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::process_manager::ProcessManager;
use client::client_manager::NodeClients;
use client::Client;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::runtime::BuilderBuild;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::Instance as FeInstance;
use hyper_util::rt::TokioIo;
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::ServerHandlers;
use tempfile::TempDir;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tower::service_fn;
use uuid::Uuid;
use crate::test_util::{
    self, create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageType,
    TestGuard, PEER_PLACEHOLDER_ADDR,
};
pub struct GreptimeDbCluster {
    pub guards: Vec<TestGuard>,
    pub datanode_options: Vec<DatanodeOptions>,
    pub datanode_instances: HashMap<DatanodeId, Datanode>,
    pub kv_backend: KvBackendRef,
    pub metasrv: Arc<Metasrv>,
    pub frontend: Arc<Frontend>,
}
impl GreptimeDbCluster {
    pub fn fe_instance(&self) -> &Arc<FeInstance> {
        &self.frontend.instance
    }
}
pub struct GreptimeDbClusterBuilder {
    cluster_name: String,
    kv_backend: KvBackendRef,
    store_config: Option<ObjectStoreConfig>,
    store_providers: Option<Vec<StorageType>>,
    datanodes: Option<u32>,
    datanode_wal_config: DatanodeWalConfig,
    metasrv_wal_config: MetasrvWalConfig,
    shared_home_dir: Option<Arc<TempDir>>,
    meta_selector: Option<SelectorRef>,
}
impl GreptimeDbClusterBuilder {
    pub async fn new(cluster_name: &str) -> Self {
        let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        let kv_backend: KvBackendRef = if endpoints.is_empty() {
            Arc::new(MemoryKvBackend::new())
        } else {
            let endpoints = endpoints
                .split(',')
                .map(|s| s.to_string())
                .collect::<Vec<String>>();
            let backend = EtcdStore::with_endpoints(endpoints, 128)
                .await
                .expect("malformed endpoints");
            // Each retry requires a new isolation namespace.
            let chroot = format!("{}{}", cluster_name, Uuid::new_v4());
            Arc::new(ChrootKvBackend::new(chroot.into(), backend))
        };
        Self {
            cluster_name: cluster_name.to_string(),
            kv_backend,
            store_config: None,
            store_providers: None,
            datanodes: None,
            datanode_wal_config: DatanodeWalConfig::default(),
            metasrv_wal_config: MetasrvWalConfig::default(),
            shared_home_dir: None,
            meta_selector: None,
        }
    }
    #[must_use]
    pub fn with_store_config(mut self, store_config: ObjectStoreConfig) -> Self {
        self.store_config = Some(store_config);
        self
    }
    #[must_use]
    pub fn with_store_providers(mut self, store_providers: Vec<StorageType>) -> Self {
        self.store_providers = Some(store_providers);
        self
    }
    #[must_use]
    pub fn with_datanodes(mut self, datanodes: u32) -> Self {
        self.datanodes = Some(datanodes);
        self
    }
    #[must_use]
    pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self {
        self.datanode_wal_config = datanode_wal_config;
        self
    }
    #[must_use]
    pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetasrvWalConfig) -> Self {
        self.metasrv_wal_config = metasrv_wal_config;
        self
    }
    #[must_use]
    pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
        self.shared_home_dir = Some(shared_home_dir);
        self
    }
    #[must_use]
    pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self {
        self.meta_selector = Some(selector);
        self
    }
    pub async fn build_with(
        &self,
        datanode_options: Vec<DatanodeOptions>,
        guards: Vec<TestGuard>,
    ) -> GreptimeDbCluster {
        let datanodes = datanode_options.len();
        let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
        let datanode_clients = Arc::new(NodeClients::new(channel_config));
        let opt = MetasrvOptions {
            procedure: ProcedureConfig {
                // Cross-data-center network delay can be large, so (in tests only)
                // we make `max_retry_times` and `retry_delay` larger than the defaults.
                max_retry_times: 5,
                retry_delay: Duration::from_secs(1),
                max_metadata_value_size: None,
                max_running_procedures: 128,
            },
            wal: self.metasrv_wal_config.clone(),
            grpc: GrpcOptions {
                server_addr: "127.0.0.1:3002".to_string(),
                ..Default::default()
            },
            ..Default::default()
        };
        let metasrv = meta_srv::mocks::mock(
            opt,
            self.kv_backend.clone(),
            self.meta_selector.clone(),
            Some(datanode_clients.clone()),
            None,
        )
        .await;
        let datanode_instances = self
            .build_datanodes_with_options(&metasrv, &datanode_options)
            .await;
        build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;
        self.wait_datanodes_alive(metasrv.metasrv.meta_peer_client(), datanodes)
            .await;
        let mut frontend = self.build_frontend(metasrv.clone(), datanode_clients).await;
        test_util::prepare_another_catalog_and_schema(&frontend.instance).await;
        frontend.start().await.unwrap();
        GreptimeDbCluster {
            datanode_options,
            guards,
            datanode_instances,
            kv_backend: self.kv_backend.clone(),
            metasrv: metasrv.metasrv,
            frontend: Arc::new(frontend),
        }
    }
    pub async fn build(&self) -> GreptimeDbCluster {
        let datanodes = self.datanodes.unwrap_or(4);
        let (datanode_options, guards) = self.build_datanode_options_and_guards(datanodes).await;
        self.build_with(datanode_options, guards).await
    }
    async fn build_datanode_options_and_guards(
        &self,
        datanodes: u32,
    ) -> (Vec<DatanodeOptions>, Vec<TestGuard>) {
        let mut options = Vec::with_capacity(datanodes as usize);
        let mut guards = Vec::with_capacity(datanodes as usize);
        for i in 0..datanodes {
            let datanode_id = i as u64 + 1;
            let mut opts = if let Some(store_config) = &self.store_config {
                let home_dir = if let Some(home_dir) = &self.shared_home_dir {
                    home_dir.path().to_str().unwrap().to_string()
                } else {
                    let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
                    let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
                    guards.push(TestGuard {
                        home_guard: FileDirGuard::new(home_tmp_dir),
                        storage_guards: Vec::new(),
                    });
                    home_dir
                };
                create_datanode_opts(
                    store_config.clone(),
                    vec![],
                    home_dir,
                    self.datanode_wal_config.clone(),
                )
            } else {
                let (opts, guard) = create_tmp_dir_and_datanode_opts(
                    StorageType::File,
                    self.store_providers.clone().unwrap_or_default(),
                    &format!("{}-dn-{}", self.cluster_name, datanode_id),
                    self.datanode_wal_config.clone(),
                );
                guards.push(guard);
                opts
            };
            opts.node_id = Some(datanode_id);
            options.push(opts);
        }
        (options, guards)
    }
    async fn build_datanodes_with_options(
        &self,
        metasrv: &MockInfo,
        options: &[DatanodeOptions],
    ) -> HashMap<DatanodeId, Datanode> {
        let mut instances = HashMap::with_capacity(options.len());
        for opts in options {
            let datanode = self.create_datanode(opts.clone(), metasrv.clone()).await;
            instances.insert(opts.node_id.unwrap(), datanode);
        }
        instances
    }
    async fn wait_datanodes_alive(
        &self,
        meta_peer_client: &MetaPeerClientRef,
        expected_datanodes: usize,
    ) {
        for _ in 0..10 {
            let alive_datanodes = meta_srv::lease::alive_datanodes(meta_peer_client, u64::MAX)
                .await
                .unwrap()
                .len();
            if alive_datanodes == expected_datanodes {
                return;
            }
            tokio::time::sleep(Duration::from_secs(1)).await
        }
        panic!("Some Datanodes are not alive in 10 seconds!")
    }
    async fn create_datanode(&self, opts: DatanodeOptions, metasrv: MockInfo) -> Datanode {
        let mut meta_client = MetaClientBuilder::datanode_default_options(opts.node_id.unwrap())
            .channel_manager(metasrv.channel_manager)
            .build();
        meta_client.start(&[&metasrv.server_addr]).await.unwrap();
        let meta_client = Arc::new(meta_client);
        let meta_backend = Arc::new(MetaKvBackend {
            client: meta_client.clone(),
        });
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
                .build(),
        );
        let mut builder = DatanodeBuilder::new(opts, Plugins::default(), meta_backend);
        builder
            .with_cache_registry(layered_cache_registry)
            .with_meta_client(meta_client);
        let mut datanode = builder.build().await.unwrap();
        datanode.start_heartbeat().await.unwrap();
        datanode
    }
    async fn build_frontend(
        &self,
        metasrv: MockInfo,
        datanode_clients: Arc<NodeClients>,
    ) -> Frontend {
        let mut meta_client = MetaClientBuilder::frontend_default_options()
            .channel_manager(metasrv.channel_manager)
            .enable_access_cluster_info()
            .build();
        meta_client.start(&[&metasrv.server_addr]).await.unwrap();
        let meta_client = Arc::new(meta_client);
        let cached_meta_backend = Arc::new(
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
        );
        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .unwrap()
            .build(),
        );
        let information_extension =
            Arc::new(DistributedInformationExtension::new(meta_client.clone()));
        let catalog_manager = KvBackendCatalogManager::new(
            information_extension,
            cached_meta_backend.clone(),
            cache_registry.clone(),
            None,
            None,
        );
        let handlers_executor = HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(InvalidateCacheHandler::new(cache_registry.clone())),
        ]);
        let options = FrontendOptions::default();
        let heartbeat_task = HeartbeatTask::new(
            &options,
            meta_client.clone(),
            HeartbeatOptions::default(),
            Arc::new(handlers_executor),
        );
        let server_addr = options.grpc.server_addr.clone();
        let instance = FrontendBuilder::new(
            options,
            cached_meta_backend.clone(),
            cache_registry.clone(),
            catalog_manager,
            datanode_clients,
            meta_client,
            Arc::new(ProcessManager::new(server_addr, None)),
        )
        .with_local_cache_invalidator(cache_registry)
        .try_build()
        .await
        .unwrap();
        let instance = Arc::new(instance);
        Frontend {
            instance,
            servers: ServerHandlers::default(),
            heartbeat_task: Some(heartbeat_task),
            export_metrics_task: None,
        }
    }
}
async fn build_datanode_clients(
    clients: Arc<NodeClients>,
    instances: &HashMap<DatanodeId, Datanode>,
    datanodes: usize,
) {
    for i in 0..datanodes {
        let datanode_id = i as u64 + 1;
        let instance = instances.get(&datanode_id).unwrap();
        let (addr, client) = create_datanode_client(instance).await;
        clients
            .insert_client(Peer::new(datanode_id, addr), client)
            .await;
    }
}
async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
    let (client, server) = tokio::io::duplex(1024);
    let runtime = RuntimeBuilder::default()
        .worker_threads(2)
        .thread_name("grpc-handlers")
        .build()
        .unwrap();
    let flight_handler = FlightCraftWrapper(datanode.region_server());
    let region_server_handler =
        RegionServerRequestHandler::new(Arc::new(datanode.region_server()), runtime);
    let _handle = tokio::spawn(async move {
        Server::builder()
            .add_service(
                FlightServiceServer::new(flight_handler)
                    .accept_compressed(CompressionEncoding::Gzip)
                    .accept_compressed(CompressionEncoding::Zstd)
                    .send_compressed(CompressionEncoding::Gzip)
                    .send_compressed(CompressionEncoding::Zstd),
            )
            .add_service(
                RegionServer::new(region_server_handler)
                    .accept_compressed(CompressionEncoding::Gzip)
                    .accept_compressed(CompressionEncoding::Zstd)
                    .send_compressed(CompressionEncoding::Gzip)
                    .send_compressed(CompressionEncoding::Zstd),
            )
            .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
            .await
    });
    // Move client to an option so we can _move_ the inner value
    // on the first attempt to connect. All other attempts will fail.
    let mut client = Some(client);
    // `PEER_PLACEHOLDER_ADDR` is only a placeholder; no connection is actually made to it.
    let addr = PEER_PLACEHOLDER_ADDR;
    let channel_manager = ChannelManager::new();
    let _ = channel_manager
        .reset_with_connector(
            addr,
            service_fn(move |_| {
                let client = client.take();
                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::other("Client already taken"))
                    }
                }
            }),
        )
        .unwrap();
    (
        addr.to_string(),
        Client::with_manager_and_urls(channel_manager, vec![addr]),
    )
}
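
The connector in `create_datanode_client` relies on `Option::take` inside a closure so that the in-memory stream is handed out exactly once. A standalone, synchronous sketch of that pattern follows; `take_once` is a hypothetical helper written for illustration and does not exist in this file.

```rust
/// Returns a closure that yields `value` on the first call and an error on
/// every later call, mirroring the `client.take()` step in the connector.
pub fn take_once<T>(value: T) -> impl FnMut() -> Result<T, std::io::Error> {
    // Wrapping the value in `Option` lets the closure move it out exactly once.
    let mut slot = Some(value);
    move || {
        slot.take()
            .ok_or_else(|| std::io::Error::other("already taken"))
    }
}
```

Because the closure mutates its captured `Option`, it is an `FnMut`; any second connection attempt observes `None` and fails instead of reusing the stream.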