fix: make test-integration able to compile (#2384)
* fix: make test-integration able to compile
* chore: fmt toml
---------
Co-authored-by: WenyXu <wenymedia@gmail.com>
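In short: callers no longer construct a `Datanode` directly; they go through the new `datanode::datanode::builder::DatanodeBuilder`. A minimal before/after sketch (assuming `opts: DatanodeOptions` and `plugins: Arc<Plugins>` are already at hand; error handling abbreviated):

// before this commit
let datanode = Datanode::new(opts, plugins).await?;

// after this commit
let datanode = DatanodeBuilder::new(opts, plugins)
    .build()
    .await?;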
@@ -16,6 +16,7 @@ use std::time::Duration;

 use clap::Parser;
 use common_telemetry::logging;
+use datanode::datanode::builder::DatanodeBuilder;
 use datanode::datanode::{Datanode, DatanodeOptions};
 use meta_client::MetaClientOptions;
 use servers::Mode;
@@ -162,7 +163,8 @@ impl StartCommand {
         logging::info!("Datanode start command: {:#?}", self);
         logging::info!("Datanode options: {:#?}", opts);

-        let datanode = Datanode::new(opts, Default::default())
+        let datanode = DatanodeBuilder::new(opts, Default::default())
+            .build()
             .await
             .context(StartDatanodeSnafu)?;
@@ -23,6 +23,7 @@ use common_meta::kv_backend::KvBackendRef;
 use common_procedure::ProcedureManagerRef;
 use common_telemetry::info;
 use common_telemetry::logging::LoggingOptions;
+use datanode::datanode::builder::DatanodeBuilder;
 use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
 use datanode::region_server::RegionServer;
 use frontend::catalog::FrontendCatalogManager;
@@ -306,7 +307,8 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;

-        let datanode = Datanode::new(dn_opts.clone(), plugins.clone())
+        let datanode = DatanodeBuilder::new(dn_opts.clone(), plugins.clone())
+            .build()
             .await
             .context(StartDatanodeSnafu)?;
         let region_server = datanode.region_server();
@@ -4,6 +4,9 @@ version.workspace = true
 edition.workspace = true
 license.workspace = true

+[features]
+testing = []
+
 [dependencies]
 api = { workspace = true }
 arrow-flight.workspace = true
@@ -202,6 +202,11 @@ impl TableMetadataManager {
         &self.table_route_manager
     }

+    #[cfg(feature = "testing")]
+    pub fn kv_backend(&self) -> &KvBackendRef {
+        &self.kv_backend
+    }
+
     pub async fn get_full_table_info(
         &self,
         table_id: TableId,
@@ -14,6 +14,8 @@

 //! Datanode configurations

+pub mod builder;
+
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
@@ -33,7 +35,7 @@ use meta_client::MetaClientOptions;
 use mito2::config::MitoConfig;
 use mito2::engine::MitoEngine;
 use object_store::util::normalize_dir;
-use query::{QueryEngineFactory, QueryEngineRef};
+use query::QueryEngineFactory;
 use secrecy::SecretString;
 use serde::{Deserialize, Serialize};
 use servers::heartbeat_options::HeartbeatOptions;
@@ -53,7 +55,6 @@ use tokio::fs;
 use crate::error::{
     CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
 };
 use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
 use crate::heartbeat::HeartbeatTask;
 use crate::region_server::RegionServer;
 use crate::server::Services;
@@ -401,12 +402,14 @@ pub struct Datanode {
     services: Option<Services>,
     heartbeat_task: Option<HeartbeatTask>,
     region_server: RegionServer,
-    query_engine: QueryEngineRef,
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
 }

 impl Datanode {
-    pub async fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Result<Datanode> {
+    async fn new_region_server(
+        opts: &DatanodeOptions,
+        plugins: Arc<Plugins>,
+    ) -> Result<RegionServer> {
         let query_engine_factory = QueryEngineFactory::new_with_plugins(
             // query engine in datanode only executes plan with resolved table source.
             MemoryCatalogManager::with_default_setup(),
@@ -425,46 +428,30 @@ impl Datanode {
         );

         let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone());
-        let log_store = Self::build_log_store(&opts).await?;
-        let object_store = store::new_object_store(&opts).await?;
-        let engines = Self::build_store_engines(&opts, log_store, object_store).await?;
+        let log_store = Self::build_log_store(opts).await?;
+        let object_store = store::new_object_store(opts).await?;
+        let engines = Self::build_store_engines(opts, log_store, object_store).await?;
         for engine in engines {
             region_server.register_engine(engine);
         }

-        // build optional things with different modes
-        let services = match opts.mode {
-            Mode::Distributed => Some(Services::try_new(region_server.clone(), &opts).await?),
-            Mode::Standalone => None,
-        };
-        let heartbeat_task = match opts.mode {
-            Mode::Distributed => Some(HeartbeatTask::try_new(&opts, region_server.clone()).await?),
-            Mode::Standalone => None,
-        };
-        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
-            Some(opts.storage.data_home.clone()),
-            &opts.mode,
-            opts.enable_telemetry,
-        )
-        .await;
-
-        Ok(Self {
-            opts,
-            services,
-            heartbeat_task,
-            region_server,
-            query_engine,
-            greptimedb_telemetry_task,
-        })
+        Ok(region_server)
     }

     pub async fn start(&mut self) -> Result<()> {
         info!("Starting datanode instance...");

         self.start_heartbeat().await?;

+        let _ = self.greptimedb_telemetry_task.start();
+        self.start_services().await
     }

     pub async fn start_heartbeat(&self) -> Result<()> {
         if let Some(task) = &self.heartbeat_task {
             task.start().await?;
         }
-        let _ = self.greptimedb_telemetry_task.start();
-        self.start_services().await
+        Ok(())
     }

     /// Start services of datanode. This method call will block until services are shutdown.
@@ -503,10 +490,6 @@ impl Datanode {
         self.region_server.clone()
     }

-    pub fn query_engine(&self) -> QueryEngineRef {
-        self.query_engine.clone()
-    }
-
     // internal utils

     /// Build [RaftEngineLogStore]
src/datanode/src/datanode/builder.rs (new file, 98 lines)
@@ -0,0 +1,98 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_base::Plugins;
+use meta_client::client::MetaClient;
+use servers::Mode;
+use snafu::OptionExt;
+
+use crate::datanode::{Datanode, DatanodeOptions};
+use crate::error::{MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result};
+use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
+use crate::heartbeat::{new_metasrv_client, HeartbeatTask};
+use crate::server::Services;
+
+pub struct DatanodeBuilder {
+    opts: DatanodeOptions,
+    plugins: Arc<Plugins>,
+    meta_client: Option<MetaClient>,
+}
+
+impl DatanodeBuilder {
+    pub fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Self {
+        Self {
+            opts,
+            plugins,
+            meta_client: None,
+        }
+    }
+
+    pub fn with_meta_client(self, meta_client: MetaClient) -> Self {
+        Self {
+            meta_client: Some(meta_client),
+            ..self
+        }
+    }
+
+    pub async fn build(mut self) -> Result<Datanode> {
+        let region_server = Datanode::new_region_server(&self.opts, self.plugins.clone()).await?;
+
+        let mode = &self.opts.mode;
+
+        let heartbeat_task = match mode {
+            Mode::Distributed => {
+                let meta_client = if let Some(meta_client) = self.meta_client.take() {
+                    meta_client
+                } else {
+                    let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
+
+                    let meta_config = self
+                        .opts
+                        .meta_client_options
+                        .as_ref()
+                        .context(MissingMetasrvOptsSnafu)?;
+
+                    new_metasrv_client(node_id, meta_config).await?
+                };
+
+                let heartbeat_task =
+                    HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?;
+                Some(heartbeat_task)
+            }
+            Mode::Standalone => None,
+        };
+
+        let services = match mode {
+            Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?),
+            Mode::Standalone => None,
+        };
+
+        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
+            Some(self.opts.storage.data_home.clone()),
+            mode,
+            self.opts.enable_telemetry,
+        )
+        .await;
+
+        Ok(Datanode {
+            opts: self.opts,
+            services,
+            heartbeat_task,
+            region_server,
+            greptimedb_telemetry_task,
+        })
+    }
+}
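The builder also lets tests inject a pre-started `MetaClient` instead of having `build()` derive one from `node_id` and `meta_client_options`; a sketch of that path (mirroring the cluster test further below):

let datanode = DatanodeBuilder::new(opts, Arc::new(Plugins::default()))
    .with_meta_client(meta_client) // skips the internal new_metasrv_client() fallback
    .build()
    .await?;
datanode.start_heartbeat().await?;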
@@ -27,16 +27,14 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
 use common_telemetry::{debug, error, info, trace, warn};
 use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
 use meta_client::MetaClientOptions;
-use snafu::{OptionExt, ResultExt};
+use snafu::ResultExt;
 use tokio::sync::mpsc;
 use tokio::time::Instant;

 use self::handler::RegionHeartbeatResponseHandler;
 use crate::alive_keeper::RegionAliveKeeper;
 use crate::datanode::DatanodeOptions;
-use crate::error::{
-    self, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result,
-};
+use crate::error::{self, MetaClientInitSnafu, Result};
 use crate::region_server::RegionServer;

 pub(crate) mod handler;
@@ -62,15 +60,11 @@ impl Drop for HeartbeatTask {

 impl HeartbeatTask {
     /// Create a new heartbeat task instance.
-    pub async fn try_new(opts: &DatanodeOptions, region_server: RegionServer) -> Result<Self> {
-        let meta_client = new_metasrv_client(
-            opts.node_id.context(MissingNodeIdSnafu)?,
-            opts.meta_client_options
-                .as_ref()
-                .context(MissingMetasrvOptsSnafu)?,
-        )
-        .await?;
-
+    pub async fn try_new(
+        opts: &DatanodeOptions,
+        region_server: RegionServer,
+        meta_client: MetaClient,
+    ) -> Result<Self> {
         let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
             region_server.clone(),
             opts.heartbeat.interval_millis,
@@ -194,7 +194,11 @@ impl StatementExecutor {
         }
     }

-    async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
+    pub async fn plan(
+        &self,
+        stmt: QueryStatement,
+        query_ctx: QueryContextRef,
+    ) -> Result<LogicalPlan> {
         self.query_engine
             .planner()
             .plan(stmt, query_ctx)
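Making `plan` public lets integration tests obtain a `LogicalPlan` straight from the frontend instead of going through a datanode query engine; roughly (as the rewritten tests below do, with `frontend` standing in for the test's frontend instance):

let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap();
let LogicalPlan::DfPlan(plan) = frontend
    .statement_executor()
    .plan(stmt, QueryContext::arc())
    .await
    .unwrap();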
@@ -21,11 +21,11 @@ pub mod region_server;
 use std::net::SocketAddr;
 use std::sync::Arc;

-#[cfg(feature = "testing")]
-use api::v1::greptime_database_server::GreptimeDatabase;
-use api::v1::greptime_database_server::GreptimeDatabaseServer;
 use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
 use api::v1::prometheus_gateway_server::{PrometheusGateway, PrometheusGatewayServer};
+#[cfg(feature = "testing")]
+use api::v1::region::region_server::Region;
+use api::v1::region::region_server::RegionServer;
 use api::v1::{HealthCheckRequest, HealthCheckResponse};
 #[cfg(feature = "testing")]
@@ -123,12 +123,12 @@ impl GrpcServer {

     #[cfg(feature = "testing")]
     pub fn create_flight_service(&self) -> FlightServiceServer<impl FlightService> {
-        FlightServiceServer::new(FlightCraftWrapper(self.database_handler.clone().unwrap()))
+        FlightServiceServer::new(FlightCraftWrapper(self.flight_handler.clone().unwrap()))
     }

     #[cfg(feature = "testing")]
-    pub fn create_database_service(&self) -> GreptimeDatabaseServer<impl GreptimeDatabase> {
-        GreptimeDatabaseServer::new(DatabaseService::new(self.database_handler.clone().unwrap()))
+    pub fn create_region_service(&self) -> RegionServer<impl Region> {
+        RegionServer::new(self.region_server_handler.clone().unwrap())
     }

     pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
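These testing-gated services plug straight into a tonic `Server`, as the cluster test further below does; a sketch (`incoming` is a stand-in for whatever connection stream the test provides):

Server::builder()
    .add_service(grpc_server.create_flight_service())
    .add_service(grpc_server.create_region_service())
    .serve_with_incoming(incoming)
    .await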
@@ -18,9 +18,11 @@ chrono.workspace = true
 client = { workspace = true, features = ["testing"] }
 common-base = { workspace = true }
 common-catalog = { workspace = true }
+common-config = { workspace = true }
 common-error = { workspace = true }
 common-grpc = { workspace = true }
-common-meta = { workspace = true }
+common-meta = { workspace = true, features = ["testing"] }
+common-procedure = { workspace = true }
 common-query = { workspace = true }
 common-recordbatch = { workspace = true }
 common-runtime = { workspace = true }
@@ -53,6 +55,7 @@ sqlx = { version = "0.6", features = [
     "postgres",
     "chrono",
 ] }
+substrait = { workspace = true }
 table = { workspace = true }
 tempfile.workspace = true
 tokio.workspace = true
@@ -61,7 +64,6 @@ tower = "0.4"
 uuid.workspace = true

 [dev-dependencies]
-common-procedure = { workspace = true }
 datafusion-expr.workspace = true
 datafusion.workspace = true
 itertools.workspace = true
@@ -1,82 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#[cfg(test)]
-mod tests {
-    use catalog::RegisterSystemTableRequest;
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
-    use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME};
-    use table::requests::{CreateTableRequest, TableOptions};
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_register_system_table() {
-        let instance =
-            crate::tests::create_distributed_instance("test_register_system_table").await;
-
-        let catalog_name = DEFAULT_CATALOG_NAME;
-        let schema_name = DEFAULT_SCHEMA_NAME;
-        let table_name = SCRIPTS_TABLE_NAME;
-        let request = CreateTableRequest {
-            id: 1,
-            catalog_name: catalog_name.to_string(),
-            schema_name: schema_name.to_string(),
-            table_name: table_name.to_string(),
-            desc: Some("Scripts table".to_string()),
-            schema: build_scripts_schema(),
-            region_numbers: vec![0],
-            primary_key_indices: vec![0, 1],
-            create_if_not_exists: true,
-            table_options: TableOptions::default(),
-            engine: MITO_ENGINE.to_string(),
-        };
-
-        instance
-            .frontend()
-            .catalog_manager()
-            .register_system_table(RegisterSystemTableRequest {
-                create_table_request: request,
-                open_hook: None,
-            })
-            .await
-            .unwrap();
-
-        assert!(
-            instance
-                .frontend()
-                .catalog_manager()
-                .table(catalog_name, schema_name, table_name)
-                .await
-                .unwrap()
-                .is_some(),
-            "the registered system table cannot be found in catalog"
-        );
-
-        let mut actually_created_table_in_datanode = 0;
-        for datanode in instance.datanodes().values() {
-            if datanode
-                .catalog_manager()
-                .table(catalog_name, schema_name, table_name)
-                .await
-                .unwrap()
-                .is_some()
-            {
-                actually_created_table_in_datanode += 1;
-            }
-        }
-        assert_eq!(
-            actually_created_table_in_datanode, 1,
-            "system table should be actually created at one and only one datanode"
-        )
-    }
-}
@@ -21,26 +21,21 @@ use client::client_manager::DatanodeClients;
 use client::Client;
 use common_base::Plugins;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
-use common_meta::key::TableMetadataManager;
 use common_meta::peer::Peer;
 use common_meta::DatanodeId;
 use common_runtime::Builder as RuntimeBuilder;
 use common_test_util::temp_dir::create_temp_dir;
-use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig};
-use datanode::heartbeat::HeartbeatTask;
-use datanode::instance::Instance as DatanodeInstance;
+use datanode::datanode::builder::DatanodeBuilder;
+use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig};
 use frontend::frontend::FrontendOptions;
 use frontend::instance::{FrontendInstance, Instance as FeInstance};
 use meta_client::client::MetaClientBuilder;
 use meta_srv::cluster::MetaPeerClientRef;
-use meta_srv::metadata_service::{DefaultMetadataService, MetadataService};
 use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
 use meta_srv::mocks::MockInfo;
-use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef};
+use meta_srv::service::store::kv::KvStoreRef;
 use meta_srv::service::store::memory::MemStore;
-use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::GrpcServer;
-use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
 use servers::Mode;
 use tonic::transport::Server;
 use tower::service_fn;
@@ -53,8 +48,7 @@ pub struct GreptimeDbCluster {
     pub storage_guards: Vec<StorageGuard>,
     pub _dir_guards: Vec<FileDirGuard>,

-    pub datanode_instances: HashMap<DatanodeId, Arc<DatanodeInstance>>,
-    pub datanode_heartbeat_tasks: HashMap<DatanodeId, Option<HeartbeatTask>>,
+    pub datanode_instances: HashMap<DatanodeId, Datanode>,
     pub kv_store: KvStoreRef,
     pub meta_srv: MetaSrv,
     pub frontend: Arc<FeInstance>,
@@ -95,7 +89,7 @@ impl GreptimeDbClusterBuilder {

         let meta_srv = self.build_metasrv(datanode_clients.clone()).await;

-        let (datanode_instances, heartbeat_tasks, storage_guards, dir_guards) =
+        let (datanode_instances, storage_guards, dir_guards) =
             self.build_datanodes(meta_srv.clone(), datanodes).await;

         build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;
@@ -113,7 +107,6 @@ impl GreptimeDbClusterBuilder {
             storage_guards,
             _dir_guards: dir_guards,
             datanode_instances,
-            datanode_heartbeat_tasks: heartbeat_tasks,
             kv_store: self.kv_store.clone(),
             meta_srv: meta_srv.meta_srv,
             frontend,
@@ -131,18 +124,7 @@ impl GreptimeDbClusterBuilder {
             ..Default::default()
         };

-        let mock =
-            meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await;
-        let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
-            mock.meta_srv.kv_store().clone(),
-        )));
-        let metadata_service = DefaultMetadataService::new(table_metadata_manager);
-        metadata_service
-            .create_schema("another_catalog", "another_schema", true)
-            .await
-            .unwrap();
-
-        mock
+        meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await
     }

     async fn build_datanodes(
@@ -150,13 +132,11 @@ impl GreptimeDbClusterBuilder {
         meta_srv: MockInfo,
         datanodes: u32,
     ) -> (
-        HashMap<DatanodeId, Arc<DatanodeInstance>>,
-        HashMap<DatanodeId, Option<HeartbeatTask>>,
+        HashMap<DatanodeId, Datanode>,
         Vec<StorageGuard>,
         Vec<FileDirGuard>,
     ) {
         let mut instances = HashMap::with_capacity(datanodes as usize);
-        let mut heartbeat_tasks = HashMap::with_capacity(datanodes as usize);
         let mut storage_guards = Vec::with_capacity(datanodes as usize);
         let mut dir_guards = Vec::with_capacity(datanodes as usize);
@@ -167,12 +147,9 @@ impl GreptimeDbClusterBuilder {
                 let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
                 let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();

-                let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{}", &self.cluster_name));
-                let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string();
-                dir_guards.push(FileDirGuard::new(home_tmp_dir, false));
-                dir_guards.push(FileDirGuard::new(wal_tmp_dir, true));
+                dir_guards.push(FileDirGuard::new(home_tmp_dir));

-                create_datanode_opts(store_config.clone(), home_dir, wal_dir)
+                create_datanode_opts(store_config.clone(), home_dir)
             } else {
                 let (opts, guard) = create_tmp_dir_and_datanode_opts(
                     StorageType::File,
@@ -181,19 +158,17 @@ impl GreptimeDbClusterBuilder {

                 storage_guards.push(guard.storage_guard);
                 dir_guards.push(guard.home_guard);
-                dir_guards.push(guard.wal_guard);

                 opts
             };
             opts.node_id = Some(datanode_id);
             opts.mode = Mode::Distributed;

-            let dn_instance = self.create_datanode(&opts, meta_srv.clone()).await;
+            let datanode = self.create_datanode(opts, meta_srv.clone()).await;

-            let _ = instances.insert(datanode_id, dn_instance.0.clone());
-            let _ = heartbeat_tasks.insert(datanode_id, dn_instance.1);
+            instances.insert(datanode_id, datanode);
         }
-        (instances, heartbeat_tasks, storage_guards, dir_guards)
+        (instances, storage_guards, dir_guards)
     }

     async fn wait_datanodes_alive(
@@ -215,19 +190,24 @@ impl GreptimeDbClusterBuilder {
         panic!("Some Datanodes are not alive in 10 seconds!")
     }

-    async fn create_datanode(
-        &self,
-        opts: &DatanodeOptions,
-        meta_srv: MockInfo,
-    ) -> (Arc<DatanodeInstance>, Option<HeartbeatTask>) {
-        let (instance, heartbeat) = DatanodeInstance::with_mock_meta_server(opts, meta_srv)
+    async fn create_datanode(&self, opts: DatanodeOptions, meta_srv: MockInfo) -> Datanode {
+        let mut meta_client = MetaClientBuilder::new(1000, opts.node_id.unwrap(), Role::Datanode)
+            .enable_router()
+            .enable_store()
+            .enable_heartbeat()
+            .channel_manager(meta_srv.channel_manager)
+            .build();
+        meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
+
+        let datanode = DatanodeBuilder::new(opts, Arc::new(Plugins::default()))
+            .with_meta_client(meta_client)
+            .build()
             .await
             .unwrap();
-        instance.start().await.unwrap();
-        if let Some(heartbeat) = heartbeat.as_ref() {
-            heartbeat.start().await.unwrap();
-        }
-        (instance, heartbeat)
+
+        datanode.start_heartbeat().await.unwrap();
+
+        datanode
     }

     async fn build_frontend(
@@ -262,12 +242,12 @@ impl GreptimeDbClusterBuilder {

 async fn build_datanode_clients(
     clients: Arc<DatanodeClients>,
-    instances: &HashMap<DatanodeId, Arc<DatanodeInstance>>,
+    instances: &HashMap<DatanodeId, Datanode>,
     datanodes: u32,
 ) {
     for i in 0..datanodes {
         let datanode_id = i as u64 + 1;
-        let instance = instances.get(&datanode_id).cloned().unwrap();
+        let instance = instances.get(&datanode_id).unwrap();
         let (addr, client) = create_datanode_client(instance).await;
         clients
             .insert_client(Peer::new(datanode_id, addr), client)
@@ -275,7 +255,7 @@ async fn build_datanode_clients(
     }
 }

-async fn create_datanode_client(datanode_instance: Arc<DatanodeInstance>) -> (String, Client) {
+async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
     let (client, server) = tokio::io::duplex(1024);

     let runtime = Arc::new(
@@ -286,25 +266,20 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
             .unwrap(),
     );

     // create a mock datanode grpc service, see example here:
     // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
-    let query_handler = Arc::new(GreptimeRequestHandler::new(
-        ServerGrpcQueryHandlerAdaptor::arc(datanode_instance.clone()),
-        None,
-        runtime.clone(),
-    ));
+    let flight_handler = Some(Arc::new(datanode.region_server()) as _);
+    let region_server_handler = Some(Arc::new(datanode.region_server()) as _);
     let grpc_server = GrpcServer::new(
-        Some(ServerGrpcQueryHandlerAdaptor::arc(datanode_instance)),
-        None,
-        Some(query_handler),
+        None,
         None,
+        flight_handler,
+        region_server_handler,
         None,
         runtime,
     );
     let _handle = tokio::spawn(async move {
         Server::builder()
             .add_service(grpc_server.create_flight_service())
-            .add_service(grpc_server.create_database_service())
+            .add_service(grpc_server.create_region_service())
             .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
             .await
     });
@@ -20,6 +20,7 @@ mod test {
     use api::v1::ddl_request::Expr as DdlExpr;
     use api::v1::greptime_request::Request;
     use api::v1::query_request::Query;
+    use api::v1::region::QueryRequest as RegionQueryRequest;
     use api::v1::{
         alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
         CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DeleteRequests,
@@ -31,9 +32,13 @@ mod test {
     use common_recordbatch::RecordBatches;
     use frontend::instance::Instance;
     use query::parser::QueryLanguageParser;
+    use query::plan::LogicalPlan;
     use servers::query_handler::grpc::GrpcQueryHandler;
     use session::context::QueryContext;
+    use store_api::storage::RegionId;
+    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

+    use crate::standalone::GreptimeDbStandaloneBuilder;
     use crate::tests;
     use crate::tests::MockDistributedInstance;
@@ -49,8 +54,9 @@ mod test {

     #[tokio::test(flavor = "multi_thread")]
     async fn test_standalone_handle_ddl_request() {
-        let standalone =
-            tests::create_standalone_instance("test_standalone_handle_ddl_request").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_handle_ddl_request")
+            .build()
+            .await;
         let instance = &standalone.instance;

         test_handle_ddl_request(instance.as_ref()).await;
|
||||
column_defs: vec![
|
||||
ColumnDef {
|
||||
name: "a".to_string(),
|
||||
datatype: ColumnDataType::String as _,
|
||||
data_type: ColumnDataType::String as _,
|
||||
is_nullable: true,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
},
|
||||
ColumnDef {
|
||||
name: "ts".to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond as _,
|
||||
data_type: ColumnDataType::TimestampMillisecond as _,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
},
|
||||
],
|
||||
time_index: "ts".to_string(),
|
||||
@@ -109,15 +117,14 @@ mod test {
                 add_columns: vec![AddColumn {
                     column_def: Some(ColumnDef {
                         name: "b".to_string(),
-                        datatype: ColumnDataType::Int32 as _,
+                        data_type: ColumnDataType::Int32 as _,
                         is_nullable: true,
                         default_constraint: vec![],
+                        semantic_type: SemanticType::Field as i32,
                     }),
                     is_key: false,
                     location: None,
                 }],
             })),
+            ..Default::default()
         })),
     });
     let output = query(instance, request).await;
@@ -161,18 +168,17 @@ mod test {
     }

     async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
-        for (_, dn) in instance.datanodes().iter() {
-            assert!(dn
-                .catalog_manager()
-                .table(
-                    "greptime",
-                    "database_created_through_grpc",
-                    "table_created_through_grpc"
-                )
-                .await
-                .unwrap()
-                .is_none());
-        }
         assert!(instance
             .frontend()
             .catalog_manager()
             .table(
                 "greptime",
                 "database_created_through_grpc",
                 "table_created_through_grpc"
             )
             .await
             .unwrap()
             .is_none());
     }

     #[tokio::test(flavor = "multi_thread")]
@@ -277,10 +283,9 @@ CREATE TABLE {table_name} (

     #[tokio::test(flavor = "multi_thread")]
     async fn test_standalone_insert_and_query() {
-        common_telemetry::init_default_ut_logging();
-
-        let standalone =
-            tests::create_standalone_instance("test_standalone_insert_and_query").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_insert_and_query")
+            .build()
+            .await;
         let instance = &standalone.instance;

         let table_name = "my_table";
@@ -518,22 +523,31 @@ CREATE TABLE {table_name} (
             .collect::<HashMap<u32, u64>>();
         assert_eq!(region_to_dn_map.len(), expected_distribution.len());

-        for (region, dn) in region_to_dn_map.iter() {
-            let stmt = QueryLanguageParser::parse_sql(&format!(
-                "SELECT ts, a, b FROM {table_name} ORDER BY ts"
-            ))
-            .unwrap();
-            let dn = instance.datanodes().get(dn).unwrap();
-            let engine = dn.query_engine();
-            let plan = engine
-                .planner()
-                .plan(stmt, QueryContext::arc())
-                .await
-                .unwrap();
-            let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
-            let Output::Stream(stream) = output else {
-                unreachable!()
-            };
+        let stmt = QueryLanguageParser::parse_sql(&format!(
+            "SELECT ts, a, b FROM {table_name} ORDER BY ts"
+        ))
+        .unwrap();
+        let LogicalPlan::DfPlan(plan) = instance
+            .frontend()
+            .statement_executor()
+            .plan(stmt, QueryContext::arc())
+            .await
+            .unwrap();
+        let plan = DFLogicalSubstraitConvertor.encode(&plan).unwrap();

+        for (region, dn) in region_to_dn_map.iter() {
+            let region_server = instance.datanodes().get(dn).unwrap().region_server();
+
+            let region_id = RegionId::new(table_id, *region);
+
+            let stream = region_server
+                .handle_read(RegionQueryRequest {
+                    region_id: region_id.as_u64(),
+                    plan: plan.to_vec(),
+                })
+                .await
+                .unwrap();

             let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
             let actual = recordbatches.pretty_print().unwrap();
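Condensed, the rewritten read path in these tests is: plan once on the frontend, encode the plan to substrait bytes, then issue a `handle_read` against each datanode's region server (a sketch of the loop above; `region_server` and `region_id` as in the surrounding test):

let plan = DFLogicalSubstraitConvertor.encode(&plan).unwrap(); // substrait bytes
let stream = region_server
    .handle_read(RegionQueryRequest {
        region_id: region_id.as_u64(),
        plan: plan.to_vec(),
    })
    .await
    .unwrap();
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();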
@@ -680,9 +694,9 @@ CREATE TABLE {table_name} (

     #[tokio::test(flavor = "multi_thread")]
     async fn test_promql_query() {
-        common_telemetry::init_default_ut_logging();
-
-        let standalone = tests::create_standalone_instance("test_standalone_promql_query").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_promql_query")
+            .build()
+            .await;
         let instance = &standalone.instance;

         let table_name = "my_table";
@@ -24,12 +24,14 @@ mod test {
     use servers::query_handler::InfluxdbLineProtocolHandler;
     use session::context::QueryContext;

+    use crate::standalone::GreptimeDbStandaloneBuilder;
     use crate::tests;

     #[tokio::test(flavor = "multi_thread")]
     async fn test_standalone_put_influxdb_lines() {
-        let standalone =
-            tests::create_standalone_instance("test_standalone_put_influxdb_lines").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_put_influxdb_lines")
+            .build()
+            .await;
         let instance = &standalone.instance;

         test_put_influxdb_lines(instance).await;
@@ -44,8 +46,11 @@ mod test {

     #[tokio::test(flavor = "multi_thread")]
     async fn test_standalone_put_influxdb_lines_without_time_column() {
-        let standalone =
-            tests::create_standalone_instance("test_standalone_put_influxdb_lines").await;
+        let standalone = GreptimeDbStandaloneBuilder::new(
+            "test_standalone_put_influxdb_lines_without_time_column",
+        )
+        .build()
+        .await;
         test_put_influxdb_lines_without_time_column(&standalone.instance).await;
     }
@@ -19,6 +19,7 @@ mod tests {
     use std::sync::atomic::AtomicU32;
     use std::sync::Arc;

+    use api::v1::region::QueryRequest;
     use common_base::Plugins;
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_meta::key::table_name::TableNameKey;
@@ -29,17 +30,23 @@ mod tests {
     use frontend::error::{self, Error, Result};
    use frontend::instance::Instance;
     use query::parser::QueryLanguageParser;
+    use query::plan::LogicalPlan;
     use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
     use servers::query_handler::sql::SqlQueryHandler;
     use session::context::{QueryContext, QueryContextRef};
     use sql::statements::statement::Statement;
+    use store_api::storage::RegionId;
+    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

+    use crate::standalone::GreptimeDbStandaloneBuilder;
     use crate::tests;
     use crate::tests::MockDistributedInstance;

     #[tokio::test(flavor = "multi_thread")]
     async fn test_standalone_exec_sql() {
-        let standalone = tests::create_standalone_instance("test_standalone_exec_sql").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
+            .build()
+            .await;
         let instance = standalone.instance.as_ref();

         let sql = r#"
@@ -216,18 +223,27 @@ mod tests {
         assert_eq!(region_to_dn_map.len(), expected_distribution.len());

         let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap();
+        let LogicalPlan::DfPlan(plan) = instance
+            .frontend()
+            .statement_executor()
+            .plan(stmt, QueryContext::arc())
+            .await
+            .unwrap();
+        let plan = DFLogicalSubstraitConvertor.encode(&plan).unwrap();

         for (region, dn) in region_to_dn_map.iter() {
-            let dn = instance.datanodes().get(dn).unwrap();
-            let engine = dn.query_engine();
-            let plan = engine
-                .planner()
-                .plan(stmt.clone(), QueryContext::arc())
-                .await
-                .unwrap();
-            let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
-            let Output::Stream(stream) = output else {
-                unreachable!()
-            };
+            let region_server = instance.datanodes().get(dn).unwrap().region_server();
+
+            let region_id = RegionId::new(table_id, *region);
+
+            let stream = region_server
+                .handle_read(QueryRequest {
+                    region_id: region_id.as_u64(),
+                    plan: plan.to_vec(),
+                })
+                .await
+                .unwrap();

             let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
             let actual = recordbatches.pretty_print().unwrap();
@@ -246,14 +262,13 @@ mod tests {
     }

     async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
-        for (_, dn) in instance.datanodes().iter() {
-            assert!(dn
-                .catalog_manager()
-                .table("greptime", "public", "demo")
-                .await
-                .unwrap()
-                .is_none())
-        }
         assert!(instance
             .frontend()
             .catalog_manager()
             .table("greptime", "public", "demo")
             .await
             .unwrap()
             .is_none())
     }

     #[tokio::test(flavor = "multi_thread")]
@@ -314,13 +329,15 @@ mod tests {
         }
     }

-        let standalone = tests::create_standalone_instance("test_hook").await;
-        let mut instance = standalone.instance;
-
         let plugins = Plugins::new();
         let counter_hook = Arc::new(AssertionHook::default());
         plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());
-        Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
+
+        let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
+            .with_plugin(plugins)
+            .build()
+            .await;
+        let instance = standalone.instance;

         let sql = r#"CREATE TABLE demo(
             host STRING,
@@ -374,13 +391,15 @@ mod tests {

         let query_ctx = QueryContext::arc();

-        let standalone = tests::create_standalone_instance("test_db_hook").await;
-        let mut instance = standalone.instance;
-
         let plugins = Plugins::new();
         let hook = Arc::new(DisableDBOpHook);
         plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
-        Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
+
+        let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
+            .with_plugin(plugins)
+            .build()
+            .await;
+        let instance = standalone.instance;

         let sql = r#"CREATE TABLE demo(
             host STRING,
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-mod catalog;
 pub mod cluster;
 mod grpc;
 mod influxdb;
@@ -22,7 +21,7 @@ mod otlp;
 mod prom_store;
 pub mod test_util;

-// TODO(LFC): Refactor: move instance structs out of mod "tests", like the `GreptimeDbCluster`.
+mod standalone;
 #[cfg(test)]
 mod tests;
@@ -25,11 +25,14 @@ mod tests {
     use servers::query_handler::OpentsdbProtocolHandler;
     use session::context::QueryContext;

+    use crate::standalone::GreptimeDbStandaloneBuilder;
     use crate::tests;

     #[tokio::test(flavor = "multi_thread")]
     async fn test_standalone_exec() {
-        let standalone = tests::create_standalone_instance("test_standalone_exec").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec")
+            .build()
+            .await;
         let instance = &standalone.instance;

         test_exec(instance).await;
@@ -30,11 +30,14 @@ mod test {
     use servers::query_handler::OpenTelemetryProtocolHandler;
     use session::context::QueryContext;

+    use crate::standalone::GreptimeDbStandaloneBuilder;
     use crate::tests;

     #[tokio::test(flavor = "multi_thread")]
     pub async fn test_otlp_on_standalone() {
-        let standalone = tests::create_standalone_instance("test_standalone_otlp").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_otlp")
+            .build()
+            .await;
         let instance = &standalone.instance;

         test_otlp(instance).await;
@@ -28,12 +28,14 @@ mod tests {
     use servers::query_handler::PromStoreProtocolHandler;
     use session::context::QueryContext;

+    use crate::standalone::GreptimeDbStandaloneBuilder;
     use crate::tests;

     #[tokio::test(flavor = "multi_thread")]
     async fn test_standalone_prom_store_remote_rw() {
-        let standalone =
-            tests::create_standalone_instance("test_standalone_prom_store_remote_rw").await;
+        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_prom_store_remote_rw")
+            .build()
+            .await;
         let instance = &standalone.instance;

         test_prom_store_remote_rw(instance).await;
tests-integration/src/standalone.rs (new file, 112 lines)
@@ -0,0 +1,112 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use catalog::remote::DummyKvCacheInvalidator;
+use common_base::Plugins;
+use common_config::KvStoreConfig;
+use common_procedure::options::ProcedureConfig;
+use datanode::datanode::builder::DatanodeBuilder;
+use datanode::datanode::DatanodeOptions;
+use frontend::catalog::FrontendCatalogManager;
+use frontend::instance::{Instance, StandaloneDatanodeManager};
+
+use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
+
+pub struct GreptimeDbStandalone {
+    pub instance: Arc<Instance>,
+    pub datanode_opts: DatanodeOptions,
+    pub guard: TestGuard,
+}
+
+pub struct GreptimeDbStandaloneBuilder {
+    instance_name: String,
+    store_type: Option<StorageType>,
+    plugin: Option<Plugins>,
+}
+
+impl GreptimeDbStandaloneBuilder {
+    pub fn new(instance_name: &str) -> Self {
+        Self {
+            instance_name: instance_name.to_string(),
+            store_type: None,
+            plugin: None,
+        }
+    }
+
+    pub fn with_store_type(self, store_type: StorageType) -> Self {
+        Self {
+            store_type: Some(store_type),
+            ..self
+        }
+    }
+
+    #[cfg(test)]
+    pub fn with_plugin(self, plugin: Plugins) -> Self {
+        Self {
+            plugin: Some(plugin),
+            ..self
+        }
+    }
+
+    pub async fn build(self) -> GreptimeDbStandalone {
+        let store_type = self.store_type.unwrap_or(StorageType::File);
+
+        let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, &self.instance_name);
+
+        let (kv_store, procedure_manager) = Instance::try_build_standalone_components(
+            format!("{}/kv", &opts.storage.data_home),
+            KvStoreConfig::default(),
+            ProcedureConfig::default(),
+        )
+        .await
+        .unwrap();
+
+        let plugins = Arc::new(self.plugin.unwrap_or_default());
+
+        let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone())
+            .build()
+            .await
+            .unwrap();
+
+        let catalog_manager = FrontendCatalogManager::new(
+            kv_store.clone(),
+            Arc::new(DummyKvCacheInvalidator),
+            Arc::new(StandaloneDatanodeManager(datanode.region_server())),
+        );
+
+        catalog_manager
+            .table_metadata_manager_ref()
+            .init()
+            .await
+            .unwrap();
+
+        let instance = Instance::try_new_standalone(
+            kv_store,
+            procedure_manager,
+            catalog_manager,
+            plugins,
+            datanode.region_server(),
+        )
+        .await
+        .unwrap();
+
+        GreptimeDbStandalone {
+            instance: Arc::new(instance),
+            datanode_opts: opts,
+            guard,
+        }
+    }
+}
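Typical use in the test suites above and below (sketch):

let standalone = GreptimeDbStandaloneBuilder::new("my_test")
    .with_store_type(StorageType::File) // optional; File is the default
    .build()
    .await;
let instance = &standalone.instance;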
@@ -21,29 +21,21 @@ use std::time::Duration;

 use auth::UserProviderRef;
 use axum::Router;
-use catalog::{CatalogManagerRef, RegisterTableRequest};
-use common_catalog::consts::{
-    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
-};
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_query::Output;
 use common_recordbatch::util;
 use common_runtime::Builder as RuntimeBuilder;
 use common_test_util::ports;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
 use datanode::datanode::{
-    AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig,
-    ProcedureConfig, S3Config, StorageConfig, WalConfig,
+    AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
+    StorageConfig,
 };
-use datanode::error::{CreateTableSnafu, Result};
-use datanode::instance::Instance;
-use datanode::sql::SqlHandler;
-use datatypes::data_type::ConcreteDataType;
 use datatypes::scalars::ScalarVectorBuilder;
-use datatypes::schema::{ColumnSchema, RawSchema};
 use datatypes::vectors::{
     Float64VectorBuilder, MutableVector, StringVectorBuilder, TimestampMillisecondVectorBuilder,
 };
-use frontend::instance::Instance as FeInstance;
+use frontend::instance::Instance;
 use frontend::service_config::{MysqlOptions, PostgresOptions};
 use object_store::services::{Azblob, Gcs, Oss, S3};
 use object_store::test_util::TempFolder;
@@ -56,12 +48,13 @@ use servers::metrics_handler::MetricsHandler;
 use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
 use servers::postgres::PostgresServer;
 use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
-use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
+use servers::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler};
 use servers::server::Server;
 use servers::Mode;
-use snafu::ResultExt;
-use table::engine::{EngineContext, TableEngineRef};
-use table::requests::{CreateTableRequest, InsertRequest, TableOptions};
+use session::context::QueryContext;
+use table::requests::InsertRequest;
+
+use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};

 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub enum StorageType {
@@ -254,18 +247,16 @@ pub enum TempDirGuard {

 pub struct TestGuard {
     pub home_guard: FileDirGuard,
-    pub wal_guard: FileDirGuard,
     pub storage_guard: StorageGuard,
 }

 pub struct FileDirGuard {
     pub temp_dir: TempDir,
-    pub is_wal: bool,
 }

 impl FileDirGuard {
-    pub fn new(temp_dir: TempDir, is_wal: bool) -> Self {
-        Self { temp_dir, is_wal }
+    pub fn new(temp_dir: TempDir) -> Self {
+        Self { temp_dir }
     }
 }
@@ -288,124 +279,77 @@ pub fn create_tmp_dir_and_datanode_opts(
     name: &str,
 ) -> (DatanodeOptions, TestGuard) {
     let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
-    let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
     let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
-    let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string();

     let (store, data_tmp_dir) = get_test_store_config(&store_type);
-    let opts = create_datanode_opts(store, home_dir, wal_dir);
+    let opts = create_datanode_opts(store, home_dir);

     (
         opts,
         TestGuard {
-            home_guard: FileDirGuard::new(home_tmp_dir, false),
-            wal_guard: FileDirGuard::new(wal_tmp_dir, true),
+            home_guard: FileDirGuard::new(home_tmp_dir),
             storage_guard: StorageGuard(data_tmp_dir),
         },
     )
 }

-pub fn create_datanode_opts(
-    store: ObjectStoreConfig,
-    home_dir: String,
-    wal_dir: String,
-) -> DatanodeOptions {
+pub(crate) fn create_datanode_opts(store: ObjectStoreConfig, home_dir: String) -> DatanodeOptions {
     DatanodeOptions {
-        wal: WalConfig {
-            dir: Some(wal_dir),
-            ..Default::default()
-        },
         storage: StorageConfig {
             data_home: home_dir,
             store,
             ..Default::default()
         },
         mode: Mode::Standalone,
-        procedure: ProcedureConfig::default(),
         ..Default::default()
     }
 }

-pub async fn create_test_table(
-    catalog_manager: &CatalogManagerRef,
-    sql_handler: &SqlHandler,
-    ts_type: ConcreteDataType,
-    table_name: &str,
-) -> Result<()> {
-    let column_schemas = vec![
-        ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
-        ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
-        ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
-        ColumnSchema::new("ts", ts_type, true).with_time_index(true),
-    ];
-    let table_engine: TableEngineRef = sql_handler
-        .table_engine_manager()
-        .engine(MITO_ENGINE)
-        .unwrap();
-    let table = table_engine
-        .create_table(
-            &EngineContext::default(),
-            CreateTableRequest {
-                id: MIN_USER_TABLE_ID,
-                catalog_name: "greptime".to_string(),
-                schema_name: "public".to_string(),
-                table_name: table_name.to_string(),
-                desc: Some(" a test table".to_string()),
-                schema: RawSchema::new(column_schemas),
-                create_if_not_exists: true,
-                primary_key_indices: vec![0], // "host" is in primary keys
-                table_options: TableOptions::default(),
-                region_numbers: vec![0],
-                engine: MITO_ENGINE.to_string(),
-            },
-        )
-        .await
-        .context(CreateTableSnafu { table_name })?;
-
-    let req = RegisterTableRequest {
-        catalog: DEFAULT_CATALOG_NAME.to_string(),
-        schema: DEFAULT_SCHEMA_NAME.to_string(),
-        table_name: table_name.to_string(),
-        table_id: table.table_info().ident.table_id,
-        table,
-    };
-    let _ = catalog_manager.register_table(req).await.unwrap();
-    Ok(())
+pub(crate) async fn create_test_table(instance: &Instance, table_name: &str) {
+    let sql = format!(
+        r#"
+CREATE TABLE IF NOT EXISTS {table_name} (
+    host String NOT NULL PRIMARY KEY,
+    cpu DOUBLE NULL,
+    memory DOUBLE NULL,
+    ts TIMESTAMP NULL TIME INDEX
+)
+"#
+    );
+    let _ = instance.do_query(&sql, QueryContext::arc()).await;
 }

+async fn setup_standalone_instance(
+    test_name: &str,
+    store_type: StorageType,
+) -> GreptimeDbStandalone {
+    let instance = GreptimeDbStandaloneBuilder::new(test_name)
+        .with_store_type(store_type)
+        .build()
+        .await;
+
+    create_test_table(instance.instance.as_ref(), "demo").await;
+
+    instance
+}

 pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
-    let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
-    let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
-    instance.start().await.unwrap();
-
-    create_test_table(
-        instance.catalog_manager(),
-        instance.sql_handler(),
-        ConcreteDataType::timestamp_millisecond_datatype(),
-        "demo",
-    )
-    .await
-    .unwrap();
-    let frontend_instance = FeInstance::try_new_standalone(instance.clone())
-        .await
-        .unwrap();
-    if let Some(heartbeat) = heartbeat {
-        heartbeat.start().await.unwrap();
-    }
+    let instance = setup_standalone_instance(name, store_type).await;

     let http_opts = HttpOptions {
         addr: format!("127.0.0.1:{}", ports::get_port()),
         ..Default::default()
     };
     let http_server = HttpServerBuilder::new(http_opts)
-        .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new(
-            frontend_instance,
-        )))
-        .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()))
+        .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone()))
+        .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(
+            instance.instance.clone(),
+        ))
         .with_metrics_handler(MetricsHandler)
-        .with_greptime_config_options(opts.to_toml_string())
+        .with_greptime_config_options(instance.datanode_opts.to_toml_string())
         .build();
-    (http_server.build(http_server.make_app()), guard)
+    (http_server.build(http_server.make_app()), instance.guard)
 }

 pub async fn setup_test_http_app_with_frontend(
@@ -420,37 +364,22 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
     name: &str,
     user_provider: Option<UserProviderRef>,
 ) -> (Router, TestGuard) {
-    let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
-    let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
-    let frontend = FeInstance::try_new_standalone(instance.clone())
-        .await
-        .unwrap();
-    instance.start().await.unwrap();
-    if let Some(heartbeat) = heartbeat {
-        heartbeat.start().await.unwrap();
-    }
-    create_test_table(
-        frontend.catalog_manager(),
-        instance.sql_handler(),
-        ConcreteDataType::timestamp_millisecond_datatype(),
-        "demo",
-    )
-    .await
-    .unwrap();
+    let instance = setup_standalone_instance(name, store_type).await;

     let http_opts = HttpOptions {
         addr: format!("127.0.0.1:{}", ports::get_port()),
         ..Default::default()
     };

-    let frontend_ref = Arc::new(frontend);
     let mut http_server = HttpServerBuilder::new(http_opts);

     http_server
-        .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
-        .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
-        .with_script_handler(frontend_ref)
-        .with_greptime_config_options(opts.to_toml_string());
+        .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone()))
+        .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(
+            instance.instance.clone(),
+        ))
+        .with_script_handler(instance.instance.clone())
+        .with_greptime_config_options(instance.datanode_opts.to_toml_string());

     if let Some(user_provider) = user_provider {
         http_server.with_user_provider(user_provider);
@@ -459,7 +388,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
     let http_server = http_server.build();

     let app = http_server.build(http_server.make_app());
-    (app, guard)
+    (app, instance.guard)
 }

 fn mock_insert_request(host: &str, cpu: f64, memory: f64, ts: i64) -> InsertRequest {
@@ -500,25 +429,11 @@ pub async fn setup_test_prom_app_with_frontend(
     name: &str,
 ) -> (Router, TestGuard) {
     std::env::set_var("TZ", "UTC");
-    let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
-    let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
-    let frontend = FeInstance::try_new_standalone(instance.clone())
-        .await
-        .unwrap();
-    instance.start().await.unwrap();
-    if let Some(heartbeat) = heartbeat {
-        heartbeat.start().await.unwrap();
-    }
-
-    create_test_table(
-        frontend.catalog_manager(),
-        instance.sql_handler(),
-        ConcreteDataType::timestamp_millisecond_datatype(),
-        "demo",
-    )
-    .await
-    .unwrap();
-    let demo = frontend
+    let instance = setup_standalone_instance(name, store_type).await;
+
+    let demo = instance
+        .instance
         .catalog_manager()
         .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "demo")
         .await
@@ -538,17 +453,17 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
addr: format!("127.0.0.1:{}", ports::get_port()),
|
||||
..Default::default()
|
||||
};
|
||||
let frontend_ref = Arc::new(frontend);
|
||||
let frontend_ref = instance.instance.clone();
|
||||
let http_server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
|
||||
.with_script_handler(frontend_ref.clone())
|
||||
.with_prom_handler(frontend_ref.clone())
|
||||
.with_prometheus_handler(frontend_ref)
|
||||
.with_greptime_config_options(opts.to_toml_string())
|
||||
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
|
||||
.build();
|
||||
let app = http_server.build(http_server.make_app());
|
||||
(app, guard)
|
||||
(app, instance.guard)
|
||||
}
|
||||
|
||||
pub async fn setup_grpc_server(
|
||||
@@ -563,10 +478,7 @@ pub async fn setup_grpc_server_with_user_provider(
|
||||
name: &str,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
) -> (String, TestGuard, Arc<GrpcServer>) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
|
||||
let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
|
||||
let instance = setup_standalone_instance(name, store_type).await;
|
||||
|
||||
let runtime = Arc::new(
|
||||
RuntimeBuilder::default()
|
||||
@@ -576,14 +488,7 @@ pub async fn setup_grpc_server_with_user_provider(
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let fe_instance = FeInstance::try_new_standalone(instance.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
instance.start().await.unwrap();
|
||||
if let Some(heartbeat) = heartbeat {
|
||||
heartbeat.start().await.unwrap();
|
||||
}
|
||||
let fe_instance_ref = Arc::new(fe_instance);
|
||||
let fe_instance_ref = instance.instance.clone();
|
||||
let flight_handler = Arc::new(GreptimeRequestHandler::new(
|
||||
ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()),
|
||||
user_provider.clone(),
|
||||
@@ -608,7 +513,7 @@ pub async fn setup_grpc_server_with_user_provider(
|
||||
// wait for GRPC server to start
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
(fe_grpc_addr, guard, fe_grpc_server)
|
||||
(fe_grpc_addr, instance.guard, fe_grpc_server)
|
||||
}
|
||||
|
||||
pub async fn check_output_stream(output: Output, expected: &str) {
|
||||
@@ -633,10 +538,7 @@ pub async fn setup_mysql_server_with_user_provider(
|
||||
name: &str,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
) -> (String, TestGuard, Arc<Box<dyn Server>>) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
|
||||
let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
|
||||
let instance = setup_standalone_instance(name, store_type).await;
|
||||
|
||||
let runtime = Arc::new(
|
||||
RuntimeBuilder::default()
|
||||
@@ -648,14 +550,7 @@ pub async fn setup_mysql_server_with_user_provider(
|
||||
|
||||
let fe_mysql_addr = format!("127.0.0.1:{}", ports::get_port());
|
||||
|
||||
let fe_instance = FeInstance::try_new_standalone(instance.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
instance.start().await.unwrap();
|
||||
if let Some(heartbeat) = heartbeat {
|
||||
heartbeat.start().await.unwrap();
|
||||
}
|
||||
let fe_instance_ref = Arc::new(fe_instance);
|
||||
let fe_instance_ref = instance.instance.clone();
|
||||
let opts = MysqlOptions {
|
||||
addr: fe_mysql_addr.clone(),
|
||||
..Default::default()
|
||||
@@ -682,7 +577,7 @@ pub async fn setup_mysql_server_with_user_provider(
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
(fe_mysql_addr, guard, fe_mysql_server)
|
||||
(fe_mysql_addr, instance.guard, fe_mysql_server)
|
||||
}
|
||||
|
||||
pub async fn setup_pg_server(
|
||||
@@ -697,10 +592,7 @@ pub async fn setup_pg_server_with_user_provider(
|
||||
name: &str,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
) -> (String, TestGuard, Arc<Box<dyn Server>>) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
|
||||
let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
|
||||
let instance = setup_standalone_instance(name, store_type).await;
|
||||
|
||||
let runtime = Arc::new(
|
||||
RuntimeBuilder::default()
|
||||
@@ -712,14 +604,7 @@ pub async fn setup_pg_server_with_user_provider(
|
||||
|
||||
let fe_pg_addr = format!("127.0.0.1:{}", ports::get_port());
|
||||
|
||||
let fe_instance = FeInstance::try_new_standalone(instance.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
instance.start().await.unwrap();
|
||||
if let Some(heartbeat) = heartbeat {
|
||||
heartbeat.start().await.unwrap();
|
||||
}
|
||||
let fe_instance_ref = Arc::new(fe_instance);
|
||||
let fe_instance_ref = instance.instance.clone();
|
||||
let opts = PostgresOptions {
|
||||
addr: fe_pg_addr.clone(),
|
||||
..Default::default()
|
||||
@@ -740,5 +625,5 @@ pub async fn setup_pg_server_with_user_provider(
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
(fe_pg_addr, guard, fe_pg_server)
|
||||
(fe_pg_addr, instance.guard, fe_pg_server)
|
||||
}
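All of the setup helpers above now follow a single pattern: build the standalone instance once via setup_standalone_instance, then take the frontend handle, the datanode options, and the temp-dir guard from the returned bundle. A minimal sketch of that bundle, inferred only from the fields the call sites access (instance.instance, instance.datanode_opts, instance.guard) and plausibly the GreptimeDbStandalone type that appears later in this diff; the actual definition in the repo may differ:

// Sketch only: the shape implied by the call sites above, not the repo's definition.
pub struct GreptimeDbStandalone {
    pub instance: Arc<Instance>,        // frontend instance wired into every protocol handler
    pub datanode_opts: DatanodeOptions, // surfaced through the HTTP config endpoint via to_toml_string()
    pub guard: TestGuard,               // keeps the temporary storage directory alive for the test
}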

@@ -19,13 +19,11 @@ mod test_util
use std::collections::HashMap;
use std::sync::Arc;

use catalog::RegisterSchemaRequest;
use common_meta::key::TableMetadataManagerRef;
use datanode::instance::Instance as DatanodeInstance;
use datanode::datanode::Datanode;
use frontend::instance::Instance;

use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard};

pub struct MockDistributedInstance(GreptimeDbCluster);

@@ -34,7 +32,7 @@ impl MockDistributedInstance {
self.0.frontend.clone()
}

pub fn datanodes(&self) -> &HashMap<u64, Arc<DatanodeInstance>> {
pub fn datanodes(&self) -> &HashMap<u64, Datanode> {
&self.0.datanode_instances
}

@@ -43,48 +41,6 @@ impl MockDistributedInstance {
}
}

pub struct MockStandaloneInstance {
pub instance: Arc<Instance>,
_guard: TestGuard,
}

pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts, Default::default())
.await
.unwrap();

let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
.await
.unwrap();

dn_instance.start().await.unwrap();

assert!(dn_instance
.catalog_manager()
.clone()
.register_catalog("another_catalog".to_string())
.await
.is_ok());
let req = RegisterSchemaRequest {
catalog: "another_catalog".to_string(),
schema: "another_schema".to_string(),
};
assert!(dn_instance
.catalog_manager()
.register_schema(req)
.await
.is_ok());

if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
};
MockStandaloneInstance {
instance: Arc::new(frontend_instance),
_guard: guard,
}
}

pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance {
let cluster = GreptimeDbClusterBuilder::new(test_name).build().await;
MockDistributedInstance(cluster)

@@ -20,10 +20,8 @@ use common_test_util::find_workspace_path
use frontend::instance::Instance;
use rstest_reuse::{self, template};

use crate::tests::{
create_distributed_instance, create_standalone_instance, MockDistributedInstance,
MockStandaloneInstance,
};
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
use crate::tests::{create_distributed_instance, MockDistributedInstance};

pub(crate) trait MockInstance {
fn frontend(&self) -> Arc<Instance>;
@@ -31,7 +29,7 @@ pub(crate) trait MockInstance {
fn is_distributed_mode(&self) -> bool;
}

impl MockInstance for MockStandaloneInstance {
impl MockInstance for GreptimeDbStandalone {
fn frontend(&self) -> Arc<Instance> {
self.instance.clone()
}
@@ -53,7 +51,7 @@ impl MockInstance for MockDistributedInstance {

pub(crate) async fn standalone() -> Arc<dyn MockInstance> {
let test_name = uuid::Uuid::new_v4().to_string();
let instance = create_standalone_instance(&test_name).await;
let instance = GreptimeDbStandaloneBuilder::new(&test_name).build().await;
Arc::new(instance)
}
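The removed create_standalone_instance hand-wired a datanode and a frontend and registered extra catalogs inline; the builder collapses all of that into a single call. A hedged usage sketch, based only on the two call-site lines visible in this diff (any builder methods beyond new and build are not shown here):

// Assumed usage, reconstructed from the call sites in this diff.
let standalone = GreptimeDbStandaloneBuilder::new("my_test").build().await;
let frontend: Arc<Instance> = standalone.instance.clone(); // the same handle MockInstance::frontend returns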

@@ -250,14 +250,14 @@ pub async fn test_insert_and_select(store_type: StorageType) {
//alter
let add_column = ColumnDef {
name: "test_column".to_string(),
datatype: ColumnDataType::Int64.into(),
data_type: ColumnDataType::Int64.into(),
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
};
let kind = Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(add_column),
is_key: false,
location: None,
}],
});
@@ -266,7 +266,6 @@ pub async fn test_insert_and_select(store_type: StorageType) {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
kind: Some(kind),
..Default::default()
};
let result = db.alter(expr).await.unwrap();
assert!(matches!(result, Output::AffectedRows(0)));
@@ -342,27 +341,31 @@ fn testing_create_expr() -> CreateTableExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
data_type: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float64 as i32,
data_type: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
data_type: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32, // timestamp
data_type: ColumnDataType::TimestampMillisecond as i32, // timestamp
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
},
];
CreateTableExpr {

@@ -19,10 +19,9 @@ use api::v1::meta::Peer
use catalog::remote::CachedMetaKvBackend;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::ident::TableIdent;
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::rpc::router::TableRoute;
use common_meta::rpc::KeyValue;
use common_meta::RegionIdent;
use common_procedure::{watcher, ProcedureWithId};
use common_query::Output;
@@ -109,12 +108,12 @@ pub async fn test_region_failover(store_type: StorageType) {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
}

let cache_key = TableNameKey::new("greptime", "public", "my_table").as_raw_key();

let cache = get_table_cache(&frontend, &cache_key).unwrap();
let table_name_value = TableNameValue::try_from_raw_value(&cache.unwrap().value).unwrap();
let table_id = table_name_value.table_id();
assert!(get_route_cache(&frontend, table_id).is_some());
let table_id = get_table_id(
&frontend,
TableNameKey::new("greptime", "public", "my_table"),
)
.await;
assert!(has_route_cache(&frontend, table_id).await);

let distribution = find_region_distribution(&cluster, table_id).await;
info!("Find region distribution: {distribution:?}");
@@ -145,8 +144,7 @@ pub async fn test_region_failover(store_type: StorageType) {
// Waits for invalidating table cache
time::sleep(Duration::from_millis(100)).await;

let route_cache = get_route_cache(&frontend, table_id);
assert!(route_cache.is_none());
assert!(!has_route_cache(&frontend, table_id).await);

// Inserts data to each datanode after failover
let frontend = cluster.frontend.clone();
@@ -167,33 +165,41 @@ pub async fn test_region_failover(store_type: StorageType) {
assert!(success)
}

fn get_table_cache(instance: &Arc<Instance>, key: &[u8]) -> Option<Option<KeyValue>> {
async fn get_table_id(instance: &Arc<Instance>, key: TableNameKey<'_>) -> TableId {
let catalog_manager = instance
.catalog_manager()
.as_any()
.downcast_ref::<FrontendCatalogManager>()
.unwrap();

let kvbackend = catalog_manager.backend();

let kvbackend = kvbackend
.as_any()
.downcast_ref::<CachedMetaKvBackend>()
.unwrap();
let cache = kvbackend.cache();

Some(cache.get(key))
catalog_manager
.table_metadata_manager_ref()
.table_name_manager()
.get(key)
.await
.unwrap()
.unwrap()
.table_id()
}

fn get_route_cache(instance: &Arc<Instance>, table_id: TableId) -> Option<Arc<TableRoute>> {
async fn has_route_cache(instance: &Arc<Instance>, table_id: TableId) -> bool {
let catalog_manager = instance
.catalog_manager()
.as_any()
.downcast_ref::<FrontendCatalogManager>()
.unwrap();
let pm = catalog_manager.partition_manager();
let cache = pm.table_routes().cache();
cache.get(&table_id)

let kv_backend = catalog_manager.table_metadata_manager_ref().kv_backend();

let cache = kv_backend
.as_any()
.downcast_ref::<CachedMetaKvBackend>()
.unwrap()
.cache();

cache
.get(TableRouteKey::new(table_id).as_raw_key().as_slice())
.is_some()
}

async fn write_datas(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
@@ -354,6 +360,7 @@ async fn run_region_failover_procedure(
let procedure = RegionFailoverProcedure::new(
failed_region.clone(),
RegionFailoverContext {
region_lease_secs: 10,
in_memory: meta_srv.in_memory().clone(),
mailbox: meta_srv.mailbox().clone(),
selector,