mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 09:50:40 +00:00
* ### Add Process List Management
  - **Error Handling Enhancements**:
* refactor: Update test IP addresses to include ports in ProcessKey
* feat/show-process-list: Refactor Process Management in Meta Module
  - Introduced `ProcessManager` for handling process registration and deregistration.
  - Added methods for managing and querying process states, including `register_query`, `deregister_query`, and `list_all_processes`.
  - Removed redundant process management code from the query module.
  - Updated error handling to reflect changes in process management.
  - Enhanced test coverage for process management functionalities.
* chore: rebase main
* add information schema process list table
* integrate process list table to system catalog
* build ProcessManager on frontend and standalone mode
* feat/show-process-list: **Add Process Management Enhancements**
  - **`manager.rs`**: Introduced `process_manager` to `SystemCatalog` and `KvBackendCatalogManager` for improved process handling.
  - **`information_schema.rs`**: Updated table insertion logic to conditionally include `PROCESS_LIST`.
  - **`frontend.rs`, `standalone.rs`**: Enhanced `StartCommand` to clone `process_manager` for better resource management.
  - **`instance.rs`, `builder.rs`**: Integrated `ProcessManager` into `Instance` and `FrontendBuilder` to manage query
* feat/show-process-list: ### Add Process Listing and Error Handling Enhancements
  - **Error Handling**: Introduced a new error variant `ListProcess` in `error.rs` to handle failures when listing running processes.
  - **Process List Implementation**: Enhanced `InformationSchemaProcessList` in `process_list.rs` to track running queries, including defining column names and implementing the `make_process_list` function to build the process list.
  - **Frontend Builder**: Added a `#[allow(clippy::too_many_arguments)]` attribute in `builder.rs` to suppress Clippy warnings for the `FrontendBuilder::new` function.
  These changes improve error handling and process tracking capabilities within the system.
* feat/show-process-list: Refactor imports in `process_list.rs`
  - Updated import paths for `Predicates` and `InformationTable` in `process_list.rs` to align with the new module structure.
* feat/show-process-list: Refactor process list generation in `process_list.rs`
  - Simplified the process list generation by removing intermediate row storage and directly building vectors.
  - Updated `process_to_row` function to use a mutable vector for current row data, improving memory efficiency.
  - Removed `rows_to_record_batch` function, integrating its logic directly into the main loop for streamlined processing.
* wip: move ProcessManager to catalog crate
* feat/show-process-list:
  - **Refactor Row Construction**: Updated row construction in multiple files to use references for `Value` objects, improving memory efficiency. Affected files include:
    - `cluster_info.rs`
    - `columns.rs`
    - `flows.rs`
    - `key_column_usage.rs`
    - `partitions.rs`
    - `procedure_info.rs`
    - `process_list.rs`
    - `region_peers.rs`
    - `region_statistics.rs`
    - `schemata.rs`
    - `table_constraints.rs`
    - `tables.rs`
    - `views.rs`
    - `pg_class.rs`
    - `pg_database.rs`
    - `pg_namespace.rs`
  - **Remove Unused Code**: Deleted unused functions and error variants related to process management in `process_list.rs` and `error.rs`.
  - **Predicate Evaluation Update**: Modified predicate evaluation functions in `predicate.rs` to work with references, enhancing performance.
* feat/show-process-list: ### Implement Process Management Enhancements
  - **Error Handling Enhancements**:
    - Added new error variants `BumpSequence`, `StartReportTask`, `ReportProcess`, and `BuildProcessManager` in `error.rs` to improve error handling for process management tasks.
    - Updated `ErrorExt` implementations to handle new error types.
  - **Process Manager Improvements**:
    - Introduced `ProcessManager` enhancements in `process_manager.rs` to manage process states using `ProcessWithState` and `ProcessState` enums.
    - Implemented periodic task `ReportTask` to report running queries to the KV backend.
    - Modified `register_query` and `deregister_query` methods to use the new state management system.
  - **Testing and Validation**:
    - Updated tests in `process_manager.rs` to validate new process management logic.
    - Replaced `dump` method with `list_all_processes` for listing processes.
  - **Integration with Frontend and Standalone**:
    - Updated `frontend.rs` and `standalone.rs` to handle `ProcessManager` initialization errors using `BuildProcessManager` error variant.
  - **Schema Adjustments**:
    - Modified `process_list.rs` in `system_schema/information_schema` to use the updated process listing method.
  - **Key-Value Conversion**:
    - Added `TryFrom` implementation for converting `Process` to `KeyValue` in `process_list.rs`.
* chore: remove register
* fix: sqlness tests
* merge main
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list:
  - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
  - **Refactor `ProcessManager`**: Simplified the `ProcessManager` implementation by removing the use of `KvBackendRef` and `SequenceRef`, and replaced them with `AtomicU64` and `RwLock` for managing process IDs and catalogs in `process_manager.rs`.
  - **Remove Process List Metadata**: Deleted the `process_list.rs` file and removed related metadata key definitions in `key.rs`.
  - **Update Process List Logic**: Modified the process list logic in `process_list.rs` to use the new `ProcessManager` structure.
  - **Adjust Frontend and Standalone Start Commands**: Updated `frontend.rs` and `standalone.rs` to use the new `ProcessManager` constructor.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list:
  - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency version in `Cargo.lock` and `Cargo.toml` to a new commit hash.
  - **Refactor Error Handling**: Removed unused error variants and added a new `ParseProcessId` error in `src/catalog/src/error.rs`.
  - **Enhance Process Management**: Introduced `DisplayProcessId` struct for better process ID representation and parsing in `src/catalog/src/process_manager.rs`.
  - **Revise Process List Schema**: Updated the schema and logic for process listing in `src/catalog/src/system_schema/information_schema/process_list.rs` to include new fields like `client` and `frontend`.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Commit Message
  **Enhancements and Refactoring**
  - **Process Management:**
    - Refactored `ProcessManager` to list local processes with an optional catalog filter in `process_manager.rs`.
    - Updated related tests in `process_manager.rs` and `process_list.rs`.
  - **Client Enhancements:**
    - Added `frontend_client` method in `client.rs` to support gRPC communication with the frontend.
  - **Error Handling:**
    - Extended error handling in `error.rs` to include gRPC and Meta errors.
  - **Frontend Module:**
    - Introduced `selector.rs` for frontend client selection and process listing.
    - Updated `Cargo.toml` to include new dependencies and dev-dependencies.
  - **gRPC Server:**
    - Integrated `FrontendServer` in `builder.rs` for enhanced gRPC server capabilities.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Commit Message
  **Refactor Process Management and Frontend Integration**
  - **Add `common-frontend` Dependency**:
    - Updated `Cargo.lock`, `Cargo.toml` files to include `common-frontend` as a dependency.
  - **Refactor Process Management**:
    - Moved `ProcessManager` trait and `DisplayProcessId` struct to `common-frontend`.
    - Updated `process_manager.rs` to use `MetaProcessManager` and `ProcessManagerRef`.
    - Removed `ParseProcessId` error variant from `error.rs` in `catalog` and `frontend`.
  - **Frontend gRPC Service**:
    - Added `frontend_grpc_handler.rs` to handle gRPC requests for frontend processes.
    - Updated `grpc.rs` and `builder.rs` to integrate `FrontendGrpcHandler`.
  - **Update Tests**:
    - Modified tests in `process_manager.rs` to align with new `ProcessManager` implementation.
  - **Remove Unused Code**:
    - Removed `DisplayProcessId` and related parsing logic from `process_manager.rs`.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Add `MetaClientRef` to `MetaProcessManager` and Update Instantiation
  - **Files Modified**:
    - `src/catalog/src/process_manager.rs`
    - `src/cmd/src/frontend.rs`
    - `src/cmd/src/standalone.rs`
  - **Key Changes**:
    - Added `MetaClientRef` as an optional parameter to the `MetaProcessManager::new` method.
    - Updated instantiation of `MetaProcessManager` to include `MetaClientRef` where applicable.
  ### Update `ProcessManagerRef` Usage
  - **Files Modified**:
    - `src/catalog/src/kvbackend/manager.rs`
    - `src/catalog/src/system_schema/information_schema.rs`
    - `src/catalog/src/system_schema/information_schema/process_list.rs`
    - `src/frontend/src/instance.rs`
    - `src/frontend/src/instance/builder.rs`
  - **Key Changes**:
    - Ensured consistent usage of `ProcessManagerRef` across various modules.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ## Refactor Process Management
  - **Unified Process Manager**:
    - Replaced `MetaProcessManager` with `ProcessManager` across the codebase.
    - Updated `ProcessManager` to use `Arc` for shared references and introduced a `Ticket` struct for query registration and deregistration.
    - Affected files: `manager.rs`, `process_manager.rs`, `frontend.rs`, `standalone.rs`, `frontend_grpc_handler.rs`, `instance.rs`, `builder.rs`, `cluster.rs`, `standalone.rs`.
  - **Stream Wrapper Implementation**:
    - Added `StreamWrapper` to handle record batch streams with process management.
    - Affected file: `stream_wrapper.rs`.
  - **Test Adjustments**:
    - Updated tests to align with the new `ProcessManager` implementation.
    - Affected file: `tests-integration/src/cluster.rs`, `tests-integration/src/standalone.rs`.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Add Error Handling and Process Management
  - **Error Handling Enhancements**:
    - Added new error variants `ListProcess` and `CreateChannel` in `error.rs` to handle specific gRPC service invocation failures.
    - Updated error handling in `selector.rs` to use the new error variants for better context and error propagation.
  - **Process Management Integration**:
    - Introduced `process_manager` method in `instance.rs` to access the process manager.
    - Integrated `FrontendGrpcHandler` with process management in `server.rs` to handle gRPC requests related to process management.
  - **gRPC Server Enhancements**:
    - Made `frontend_grpc_handler` public in `grpc.rs` to allow external access and integration with other modules.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: Update `greptime-proto` dependency and enhance process management
  - **Dependency Update**: Updated `greptime-proto` in `Cargo.lock` and `Cargo.toml` to a new revision.
  - **Process Management**:
    - Modified `process_manager.rs` to include catalog filtering in `list_process`.
    - Updated `frontend_grpc_handler.rs` to handle catalog filtering in `list_process` requests.
  - **System Schema**: Added a TODO comment in `process_list.rs` for future user catalog filtering implementation.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list:
  - **Update Workspace Dependencies**:
    - Modified `Cargo.toml` files in `src/catalog`, `src/common/frontend`, and `src/servers` to adjust workspace dependencies.
  - **Refactor `ProcessManager` Logic**:
    - Updated `process_manager.rs` to simplify the condition in the `select` method.
  - **Remove Unused Error Variants**:
    - Deleted `BuildProcessManager` error variant from `error.rs` in `src/cmd`.
    - Removed `InvalidProcessKey` error variant from `error.rs` in `src/common/meta`.
  - **Add License Header**:
    - Added Apache License header to `stream_wrapper.rs` in `src/frontend`.
  - **Update Test Results**:
    - Adjusted expected results in `information_schema.result` to reflect changes in the schema.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Add Error Handling for Process Listing
  - **`src/catalog/src/error.rs`**: Introduced a new error variant `ListProcess` to handle failures in listing frontend nodes.
  - **`src/catalog/src/process_manager.rs`**: Updated `local_processes` and `list_all_processes` methods to return the new error type, adding context for error handling.
  - **`src/catalog/src/system_schema/information_schema/process_list.rs`**: Modified `make_process_list` to propagate errors using the new error handling mechanism.
  - **`src/servers/src/grpc/frontend_grpc_handler.rs`**: Enhanced error handling in the `list_process` method to log errors and return appropriate gRPC status codes.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: Update `greptime-proto` Dependency and Remove `frontend_client` Method
  - **Cargo.lock** and **Cargo.toml**: Updated the `greptime-proto` dependency to a new revision (`5f6119ac7952878d39dcde0343c4bf828d18ffc8`).
  - **src/client/src/client.rs**: Removed the `frontend_client` method from the `Client` implementation.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Add Query Registration with Pre-Generated ID
  - **`process_manager.rs`**: Introduced `register_query_with_id` method to allow registering queries with a pre-generated ID. This includes creating a `ProcessInfo` instance and inserting it into the catalog.
    Added `next_id` method to generate the next process ID.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Update Process List Retrieval Method
  - **File**: `process_list.rs`
    - Updated the method for retrieving process lists from `local_processes` to `list_all_processes` to support asynchronous operations.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
* feat/show-process-list: ### Update error handling in `error.rs`
  - Refined status code handling for `CreateChannel` error by delegating to `source.status_code()`.
  - Separated `ListProcess` and `CreateChannel` error handling for clarity.
  Signed-off-by: Lei, HUANG <lhuang@greptime.com>
---------
Signed-off-by: Lei, HUANG <lhuang@greptime.com>
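The later commits describe a `ProcessManager` that takes process IDs from an `AtomicU64`, keeps running queries per catalog behind an `RwLock`, and hands back a `Ticket` on registration. A minimal sketch of that shape, using only the standard library, might look like the following. The field names, the `ProcessInfo` layout, the method signatures, and the drop-based deregistration are assumptions made for illustration; they are not taken from `process_manager.rs`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// Hypothetical, simplified record of one running query.
#[derive(Clone, Debug, PartialEq)]
pub struct ProcessInfo {
    pub id: u64,
    pub catalog: String,
    pub query: String,
    pub frontend: String,
}

/// Sketch: IDs come from an atomic counter, and running queries are
/// grouped by catalog behind a read-write lock.
pub struct ProcessManager {
    server_addr: String,
    next_id: AtomicU64,
    catalogs: RwLock<HashMap<String, HashMap<u64, ProcessInfo>>>,
}

/// Handle returned on registration. Dropping it deregisters the query
/// (an assumption of this sketch).
pub struct Ticket {
    manager: Arc<ProcessManager>,
    catalog: String,
    id: u64,
}

impl ProcessManager {
    pub fn new(server_addr: String) -> Self {
        Self {
            server_addr,
            next_id: AtomicU64::new(0),
            catalogs: RwLock::new(HashMap::new()),
        }
    }

    /// Registers a query under `catalog` and returns its ticket.
    pub fn register_query(self: &Arc<Self>, catalog: &str, query: &str) -> Ticket {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let info = ProcessInfo {
            id,
            catalog: catalog.to_string(),
            query: query.to_string(),
            frontend: self.server_addr.clone(),
        };
        self.catalogs
            .write()
            .unwrap()
            .entry(catalog.to_string())
            .or_default()
            .insert(id, info);
        Ticket { manager: self.clone(), catalog: catalog.to_string(), id }
    }

    /// Removes a query; drops the catalog entry once it is empty.
    pub fn deregister_query(&self, catalog: &str, id: u64) {
        let mut catalogs = self.catalogs.write().unwrap();
        let now_empty = match catalogs.get_mut(catalog) {
            Some(processes) => {
                processes.remove(&id);
                processes.is_empty()
            }
            None => false,
        };
        if now_empty {
            catalogs.remove(catalog);
        }
    }

    /// Lists local processes, optionally restricted to one catalog,
    /// sorted by ID so the output is deterministic.
    pub fn local_processes(&self, catalog: Option<&str>) -> Vec<ProcessInfo> {
        let catalogs = self.catalogs.read().unwrap();
        let mut out: Vec<ProcessInfo> = match catalog {
            Some(name) => catalogs
                .get(name)
                .map(|p| p.values().cloned().collect())
                .unwrap_or_default(),
            None => catalogs.values().flat_map(|p| p.values().cloned()).collect(),
        };
        out.sort_by_key(|p| p.id);
        out
    }
}

impl Drop for Ticket {
    fn drop(&mut self) {
        self.manager.deregister_query(&self.catalog, self.id);
    }
}
```

In this sketch a caller holds the `Ticket` for as long as the query runs, so the entry disappears from `local_processes` when the ticket goes out of scope, including on early returns.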
334 lines
12 KiB
Rust
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use cache::{
    build_datanode_cache_registry, build_fundamental_cache_registry,
    with_default_composite_cache_registry,
};
use catalog::information_schema::NoopInformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManager;
use cmd::error::StartFlownodeSnafu;
use cmd::standalone::StandaloneOptions;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::KvBackendConfig;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
use frontend::frontend::Frontend;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::grpc::GrpcOptions;
use servers::server::ServerHandlers;
use snafu::ResultExt;

use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};

pub struct GreptimeDbStandalone {
    pub frontend: Arc<Frontend>,
    pub opts: StandaloneOptions,
    pub guard: TestGuard,
    // Used in rebuild.
    pub kv_backend: KvBackendRef,
    pub procedure_manager: ProcedureManagerRef,
}

impl GreptimeDbStandalone {
    pub fn fe_instance(&self) -> &Arc<Instance> {
        &self.frontend.instance
    }
}

pub struct GreptimeDbStandaloneBuilder {
    instance_name: String,
    datanode_wal_config: DatanodeWalConfig,
    metasrv_wal_config: MetasrvWalConfig,
    store_providers: Option<Vec<StorageType>>,
    default_store: Option<StorageType>,
    plugin: Option<Plugins>,
}

impl GreptimeDbStandaloneBuilder {
    pub fn new(instance_name: &str) -> Self {
        Self {
            instance_name: instance_name.to_string(),
            store_providers: None,
            plugin: None,
            default_store: None,
            datanode_wal_config: DatanodeWalConfig::default(),
            metasrv_wal_config: MetasrvWalConfig::default(),
        }
    }

    #[must_use]
    pub fn with_default_store_type(self, store_type: StorageType) -> Self {
        Self {
            default_store: Some(store_type),
            ..self
        }
    }

    #[cfg(test)]
    #[must_use]
    pub fn with_store_providers(self, store_providers: Vec<StorageType>) -> Self {
        Self {
            store_providers: Some(store_providers),
            ..self
        }
    }

    #[cfg(test)]
    #[must_use]
    pub fn with_plugin(self, plugin: Plugins) -> Self {
        Self {
            plugin: Some(plugin),
            ..self
        }
    }

    #[must_use]
    pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self {
        self.datanode_wal_config = datanode_wal_config;
        self
    }

    #[must_use]
    pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetasrvWalConfig) -> Self {
        self.metasrv_wal_config = metasrv_wal_config;
        self
    }

    pub async fn build_with(
        &self,
        kv_backend: KvBackendRef,
        guard: TestGuard,
        opts: StandaloneOptions,
        procedure_manager: ProcedureManagerRef,
        register_procedure_loaders: bool,
    ) -> GreptimeDbStandalone {
        let plugins = self.plugin.clone().unwrap_or_default();

        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder =
            DatanodeBuilder::new(opts.datanode_options(), plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry);
        let datanode = builder.build().await.unwrap();

        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        table_metadata_manager.init().await.unwrap();

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));

        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .unwrap()
            .build(),
        );

        let catalog_manager = KvBackendCatalogManager::new(
            Arc::new(NoopInformationExtension),
            kv_backend.clone(),
            cache_registry.clone(),
            Some(procedure_manager.clone()),
            None,
        );

        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler();
        let flow_builder = FlownodeBuilder::new(
            Default::default(),
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            Arc::new(frontend_client),
        );
        let flownode = Arc::new(flow_builder.build().await.unwrap());

        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts.wal.clone().into();
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .unwrap();
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        let ddl_task_executor = Arc::new(
            DdlManager::try_new(
                DdlContext {
                    node_manager: node_manager.clone(),
                    cache_invalidator: cache_registry.clone(),
                    memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                    leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                    table_metadata_manager,
                    table_metadata_allocator,
                    flow_metadata_manager,
                    flow_metadata_allocator,
                    region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
                },
                procedure_manager.clone(),
                register_procedure_loaders,
                #[cfg(feature = "enterprise")]
                None,
            )
            .unwrap(),
        );

        let server_addr = opts.frontend_options().grpc.server_addr.clone();

        let instance = FrontendBuilder::new(
            opts.frontend_options(),
            kv_backend.clone(),
            cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            ddl_task_executor.clone(),
            Arc::new(ProcessManager::new(server_addr, None)),
        )
        .with_plugin(plugins)
        .try_build()
        .await
        .unwrap();
        let instance = Arc::new(instance);

        // set the frontend client for flownode
        let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        let invoker = flow::FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            cache_registry.clone(),
            ddl_task_executor.clone(),
            node_manager.clone(),
        )
        .await
        .context(StartFlownodeSnafu)
        .unwrap();

        flow_streaming_engine.set_frontend_invoker(invoker).await;

        procedure_manager.start().await.unwrap();
        wal_options_allocator.start().await.unwrap();

        test_util::prepare_another_catalog_and_schema(&instance).await;

        let mut frontend = Frontend {
            instance,
            servers: ServerHandlers::default(),
            heartbeat_task: None,
            export_metrics_task: None,
        };

        frontend.start().await.unwrap();

        GreptimeDbStandalone {
            frontend: Arc::new(frontend),
            opts,
            guard,
            kv_backend,
            procedure_manager,
        }
    }

    pub async fn build(&self) -> GreptimeDbStandalone {
        let default_store_type = self.default_store.unwrap_or(StorageType::File);
        let store_types = self.store_providers.clone().unwrap_or_default();

        let (opts, guard) = create_tmp_dir_and_datanode_opts(
            default_store_type,
            store_types,
            &self.instance_name,
            self.datanode_wal_config.clone(),
        );

        let kv_backend_config = KvBackendConfig::default();
        let procedure_config = ProcedureConfig::default();
        let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
            format!("{}/kv", &opts.storage.data_home),
            kv_backend_config,
            procedure_config,
        )
        .await
        .unwrap();

        let standalone_opts = StandaloneOptions {
            storage: opts.storage,
            procedure: procedure_config,
            metadata_store: kv_backend_config,
            wal: self.metasrv_wal_config.clone().into(),
            grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"),
            ..StandaloneOptions::default()
        };

        self.build_with(kv_backend, guard, standalone_opts, procedure_manager, true)
            .await
    }
}
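
In `build_with`, the flownode needs a frontend client before the frontend instance exists, so the client is created with an empty handler slot and a `Weak` reference is stored into that slot once the instance has been built. A minimal standard-library sketch of this late-binding pattern follows. `QueryHandler`, `LateBoundClient`, `from_empty_handler`, and `EchoInstance` are hypothetical names invented for the example; they are not the `flow` crate's types or API.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical stand-in for the handler trait the client calls into.
pub trait QueryHandler: Send + Sync {
    fn handle(&self, sql: &str) -> String;
}

/// Shared slot that is empty at construction time and filled in later.
pub type HandlerSlot = Arc<Mutex<Option<Weak<dyn QueryHandler>>>>;

/// Client created before the handler exists; it only holds the slot.
pub struct LateBoundClient {
    slot: HandlerSlot,
}

impl LateBoundClient {
    /// Returns the client together with the slot to fill in later.
    pub fn from_empty_handler() -> (Self, HandlerSlot) {
        let slot: HandlerSlot = Arc::new(Mutex::new(None));
        (Self { slot: slot.clone() }, slot)
    }

    /// Returns `None` if the handler was never set or has been dropped.
    pub fn query(&self, sql: &str) -> Option<String> {
        // Clone the weak handle so the lock is released before the call.
        let weak = self.slot.lock().unwrap().clone()?;
        let handler = weak.upgrade()?;
        Some(handler.handle(sql))
    }
}

/// Hypothetical handler used to exercise the sketch.
pub struct EchoInstance;

impl QueryHandler for EchoInstance {
    fn handle(&self, sql: &str) -> String {
        format!("handled: {sql}")
    }
}
```

Storing a `Weak` rather than an `Arc` means the client does not keep the handler alive, which avoids a reference cycle when the handler in turn owns the component that holds the client.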