diff --git a/.env.example b/.env.example new file mode 100644 index 0000000000..9117d0d4c6 --- /dev/null +++ b/.env.example @@ -0,0 +1,4 @@ +# Settings for s3 test +GT_S3_BUCKET=S3 bucket +GT_S3_ACCESS_KEY_ID=S3 access key id +GT_S3_ACCESS_KEY=S3 secret access key diff --git a/.gitignore b/.gitignore index 842b7967d6..1cb44bbdf1 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,6 @@ logs/ # Benchmark dataset benchmarks/data + +# dotenv +.env diff --git a/Cargo.lock b/Cargo.lock index f9de198cc2..84c7b25a4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1984,6 +1984,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dyn-clone" version = "1.0.9" @@ -3660,6 +3666,7 @@ dependencies = [ "opendal", "tempdir", "tokio", + "uuid", ] [[package]] @@ -6199,8 +6206,13 @@ dependencies = [ "common-telemetry", "datanode", "datatypes", + "dotenv", "frontend", "mito", + "object-store", + "once_cell", + "paste", + "rand 0.8.5", "serde", "serde_json", "servers", @@ -6209,6 +6221,7 @@ dependencies = [ "table", "tempdir", "tokio", + "uuid", ] [[package]] @@ -6985,6 +6998,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" dependencies = [ "getrandom 0.2.7", + "serde", ] [[package]] diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index d386bfa64e..4afc1ea619 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -170,6 +170,7 @@ mod tests { ObjectStoreConfig::File { data_dir } => { assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir) } + ObjectStoreConfig::S3 { .. } => unreachable!(), }; } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index af62014697..ccc8b0d3c6 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -26,7 +26,15 @@ use crate::server::Services; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum ObjectStoreConfig { - File { data_dir: String }, + File { + data_dir: String, + }, + S3 { + bucket: String, + root: String, + access_key_id: String, + secret_access_key: String, + }, } impl Default for ObjectStoreConfig { diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 7a97abcad2..fa5fb8c4b4 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -18,6 +18,8 @@ use common_error::prelude::*; use storage::error::Error as StorageError; use table::error::Error as TableError; +use crate::datanode::ObjectStoreConfig; + /// Business error of datanode. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -142,9 +144,9 @@ pub enum Error { #[snafu(display("Failed to storage engine, source: {}", source))] OpenStorageEngine { source: StorageError }, - #[snafu(display("Failed to init backend, dir: {}, source: {}", dir, source))] + #[snafu(display("Failed to init backend, config: {:#?}, source: {}", config, source))] InitBackend { - dir: String, + config: ObjectStoreConfig, source: object_store::Error, backtrace: Backtrace, }, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index cd7c123572..27cd13e12e 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -28,7 +28,8 @@ use meta_client::MetaClientOpts; use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; -use object_store::services::fs::Builder; +use object_store::services::fs::Builder as FsBuilder; +use object_store::services::s3::Builder as S3Builder; use object_store::{util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; use servers::Mode; @@ -187,35 +188,64 @@ impl Instance { } pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { - // TODO(dennis): supports other backend - let data_dir = util::normalize_dir(match store_config { - ObjectStoreConfig::File { data_dir } => data_dir, - }); + let object_store = match store_config { + ObjectStoreConfig::File { data_dir } => new_fs_object_store(data_dir).await, + ObjectStoreConfig::S3 { .. } => new_s3_object_store(store_config).await, + }; + object_store.map(|object_store| { + object_store + .layer(RetryLayer::new(ExponentialBackoff::default().with_jitter())) + .layer(MetricsLayer) + .layer(LoggingLayer) + .layer(TracingLayer) + }) +} + +pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result { + let (root, secret_key, key_id, bucket) = match store_config { + ObjectStoreConfig::S3 { + bucket, + root, + access_key_id, + secret_access_key, + } => (root, secret_access_key, access_key_id, bucket), + _ => unreachable!(), + }; + + let root = util::normalize_dir(root); + info!("The s3 storage bucket is: {}, root is: {}", bucket, &root); + + let accessor = S3Builder::default() + .root(&root) + .bucket(bucket) + .access_key_id(key_id) + .secret_access_key(secret_key) + .build() + .with_context(|_| error::InitBackendSnafu { + config: store_config.clone(), + })?; + + Ok(ObjectStore::new(accessor)) +} + +pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result { + let data_dir = util::normalize_dir(data_dir); fs::create_dir_all(path::Path::new(&data_dir)) .context(error::CreateDirSnafu { dir: &data_dir })?; - - info!("The storage directory is: {}", &data_dir); + info!("The file storage directory is: {}", &data_dir); let atomic_write_dir = format!("{}/.tmp/", data_dir); - let accessor = Builder::default() + let accessor = FsBuilder::default() .root(&data_dir) .atomic_write_dir(&atomic_write_dir) .build() - .context(error::InitBackendSnafu { dir: &data_dir })?; + .context(error::InitBackendSnafu { + config: ObjectStoreConfig::File { data_dir }, + })?; - let object_store = ObjectStore::new(accessor) - // Add retry - .layer(RetryLayer::new(ExponentialBackoff::default().with_jitter())) - // Add metrics - .layer(MetricsLayer) - // Add logging - .layer(LoggingLayer) - // Add tracing - .layer(TracingLayer); - - Ok(object_store) + Ok(ObjectStore::new(accessor)) } /// Create metasrv client instance and spawn heartbeat loop. diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 7bbaabdc5b..b7423bbc41 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -13,3 +13,4 @@ tokio = { version = "1.0", features = ["full"] } anyhow = "1.0" common-telemetry = { path = "../common/telemetry" } tempdir = "0.3" +uuid = { version = "1", features = ["serde", "v4"] } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 94155be6fe..7d6673d647 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -15,7 +15,8 @@ pub use opendal::raw::SeekableReader; pub use opendal::{ layers, services, Error, ErrorKind, Layer, Object, ObjectLister, ObjectMetadata, ObjectMode, - Operator as ObjectStore, + Operator as ObjectStore, Result, }; pub mod backend; +pub mod test_util; pub mod util; diff --git a/src/object-store/src/test_util.rs b/src/object-store/src/test_util.rs new file mode 100644 index 0000000000..d443aaf005 --- /dev/null +++ b/src/object-store/src/test_util.rs @@ -0,0 +1,35 @@ +// Copyright 2022 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ObjectStore, Result}; + +pub struct TempFolder { + store: ObjectStore, + // The path under root. + path: String, +} + +impl TempFolder { + pub fn new(store: &ObjectStore, path: &str) -> Self { + Self { + store: store.clone(), + path: path.to_string(), + } + } + + pub async fn remove_all(&mut self) -> Result<()> { + let batch = self.store.batch(); + batch.remove_all(&self.path).await + } +} diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 21f704a503..33cba429fd 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -17,6 +17,7 @@ use std::env; use anyhow::Result; use common_telemetry::logging; use object_store::backend::{fs, s3}; +use object_store::test_util::TempFolder; use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore}; use tempdir::TempDir; @@ -110,15 +111,21 @@ async fn test_s3_backend() -> Result<()> { if !bucket.is_empty() { logging::info!("Running s3 test."); + let root = uuid::Uuid::new_v4().to_string(); + let accessor = s3::Builder::default() + .root(&root) .access_key_id(&env::var("GT_S3_ACCESS_KEY_ID")?) .secret_access_key(&env::var("GT_S3_ACCESS_KEY")?) .bucket(&bucket) .build()?; let store = ObjectStore::new(accessor); + + let mut guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; + guard.remove_all().await?; } } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index c2cc5fb9af..1a7107fc8f 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -15,8 +15,12 @@ common-runtime = { path = "../src/common/runtime" } common-telemetry = { path = "../src/common/telemetry" } datanode = { path = "../src/datanode" } datatypes = { path = "../src/datatypes" } +dotenv = "0.15" frontend = { path = "../src/frontend" } mito = { path = "../src/mito", features = ["test"] } +object-store = { path = "../src/object-store" } +once_cell = "1.16" +rand = "0.8" serde = "1.0" serde_json = "1.0" servers = { path = "../src/servers" } @@ -25,3 +29,7 @@ sql = { path = "../src/sql" } table = { path = "../src/table" } tempdir = "0.3" tokio = { version = "1.20", features = ["full"] } +uuid = { version = "1", features = ["serde", "v4"] } + +[dev-dependencies] +paste = "1.0" diff --git a/tests-integration/README.md b/tests-integration/README.md new file mode 100644 index 0000000000..ec0905504a --- /dev/null +++ b/tests-integration/README.md @@ -0,0 +1,26 @@ +## Setup + +To run the integration test, please copy `.env.example` to `.env` in the project root folder and change the values on need. + +Take `s3` for example. You need to set your S3 bucket, access key id and secret key: + +```sh +# Settings for s3 test +GT_S3_BUCKET=S3 bucket +GT_S3_ACCESS_KEY_ID=S3 access key id +GT_S3_ACCESS_KEY=S3 secret access key +``` + +## Run + +Execute the following command in the project root folder: + +``` +cargo test integration +``` + +Test s3 storage: + +``` +cargo test s3 +``` diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 9ae6ac7751..ea4d3e12db 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -13,36 +13,144 @@ // limitations under the License. use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::time::Duration; +use axum::Router; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_runtime::Builder as RuntimeBuilder; use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::error::{CreateTableSnafu, Result}; +use datanode::instance::{Instance, InstanceRef}; use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use frontend::frontend::FrontendOptions; +use frontend::grpc::GrpcOptions; +use frontend::instance::{FrontendInstance, Instance as FeInstance}; +use object_store::backend::s3; +use object_store::test_util::TempFolder; +use object_store::ObjectStore; +use once_cell::sync::OnceCell; +use rand::Rng; +use servers::grpc::GrpcServer; +use servers::http::{HttpOptions, HttpServer}; +use servers::server::Server; use servers::Mode; use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; use tempdir::TempDir; +static PORTS: OnceCell = OnceCell::new(); + +fn get_port() -> usize { + PORTS + .get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3500..3900))) + .fetch_add(1, Ordering::Relaxed) +} + +pub enum StorageType { + S3, + File, +} + +impl StorageType { + pub fn test_on(&self) -> bool { + let _ = dotenv::dotenv(); + + match self { + StorageType::File => true, // always test file + StorageType::S3 => { + if let Ok(b) = env::var("GT_S3_BUCKET") { + !b.is_empty() + } else { + false + } + } + } + } +} + +fn get_test_store_config( + store_type: &StorageType, + name: &str, +) -> (ObjectStoreConfig, Option) { + let _ = dotenv::dotenv(); + + match store_type { + StorageType::S3 => { + let root = uuid::Uuid::new_v4().to_string(); + let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap(); + let secret_key = env::var("GT_S3_ACCESS_KEY").unwrap(); + let bucket = env::var("GT_S3_BUCKET").unwrap(); + + let accessor = s3::Builder::default() + .root(&root) + .access_key_id(&key_id) + .secret_access_key(&secret_key) + .bucket(&bucket) + .build() + .unwrap(); + + let config = ObjectStoreConfig::S3 { + root, + bucket, + access_key_id: key_id, + secret_access_key: secret_key, + }; + + let store = ObjectStore::new(accessor); + + (config, Some(TempDirGuard::S3(TempFolder::new(&store, "/")))) + } + StorageType::File => { + let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap(); + + ( + ObjectStoreConfig::File { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }, + Some(TempDirGuard::File(data_tmp_dir)), + ) + } + } +} + +enum TempDirGuard { + File(TempDir), + S3(TempFolder), +} /// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`, /// Only for test. pub struct TestGuard { _wal_tmp_dir: TempDir, - _data_tmp_dir: TempDir, + data_tmp_dir: Option, } -pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) { +impl TestGuard { + pub async fn remove_all(&mut self) { + if let Some(TempDirGuard::S3(mut guard)) = self.data_tmp_dir.take() { + guard.remove_all().await.unwrap(); + } + } +} + +pub fn create_tmp_dir_and_datanode_opts( + store_type: StorageType, + name: &str, +) -> (DatanodeOptions, TestGuard) { let wal_tmp_dir = TempDir::new(&format!("gt_wal_{}", name)).unwrap(); - let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap(); + + let (storage, data_tmp_dir) = get_test_store_config(&store_type, name); + let opts = DatanodeOptions { wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), - storage: ObjectStoreConfig::File { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, + storage, mode: Mode::Standalone, ..Default::default() }; @@ -50,7 +158,7 @@ pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGua opts, TestGuard { _wal_tmp_dir: wal_tmp_dir, - _data_tmp_dir: data_tmp_dir, + data_tmp_dir, }, ) } @@ -105,3 +213,121 @@ pub async fn create_test_table( .unwrap(); Ok(()) } + +async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance { + let fe_opts = FrontendOptions::default(); + let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap(); + frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone()); + frontend_instance.set_script_handler(datanode_instance); + frontend_instance +} + +pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { + let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); + let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + instance.start().await.unwrap(); + create_test_table( + instance.catalog_manager(), + instance.sql_handler(), + ConcreteDataType::timestamp_millis_datatype(), + ) + .await + .unwrap(); + let http_server = HttpServer::new(instance, HttpOptions::default()); + (http_server.make_app(), guard) +} + +pub async fn setup_test_app_with_frontend( + store_type: StorageType, + name: &str, +) -> (Router, TestGuard) { + let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); + let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let mut frontend = build_frontend_instance(instance.clone()).await; + instance.start().await.unwrap(); + create_test_table( + frontend.catalog_manager().as_ref().unwrap(), + instance.sql_handler(), + ConcreteDataType::timestamp_millis_datatype(), + ) + .await + .unwrap(); + frontend.start().await.unwrap(); + let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default()); + http_server.set_script_handler(instance.clone()); + let app = http_server.make_app(); + (app, guard) +} + +pub async fn setup_grpc_server( + store_type: StorageType, + name: &str, +) -> (String, TestGuard, Arc, Arc) { + common_telemetry::init_default_ut_logging(); + + let datanode_port = get_port(); + let frontend_port = get_port(); + + let (mut opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); + let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port); + opts.rpc_addr = datanode_grpc_addr.clone(); + let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + instance.start().await.unwrap(); + + let datanode_grpc_addr = datanode_grpc_addr.clone(); + let runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(2) + .thread_name("grpc-handlers") + .build() + .unwrap(), + ); + + let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port); + let fe_opts = FrontendOptions { + mode: Mode::Standalone, + datanode_rpc_addr: datanode_grpc_addr.clone(), + grpc_options: Some(GrpcOptions { + addr: fe_grpc_addr.clone(), + runtime_size: 8, + }), + ..Default::default() + }; + + let datanode_grpc_server = Arc::new(GrpcServer::new( + instance.clone(), + instance.clone(), + runtime.clone(), + )); + + let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts) + .await + .unwrap(); + fe_instance.set_catalog_manager(instance.catalog_manager().clone()); + + let fe_instance_ref = Arc::new(fe_instance); + let fe_grpc_server = Arc::new(GrpcServer::new( + fe_instance_ref.clone(), + fe_instance_ref, + runtime, + )); + let grpc_server_clone = fe_grpc_server.clone(); + + let fe_grpc_addr_clone = fe_grpc_addr.clone(); + tokio::spawn(async move { + let addr = fe_grpc_addr_clone.parse::().unwrap(); + grpc_server_clone.start(addr).await.unwrap() + }); + + let dn_grpc_addr_clone = datanode_grpc_addr.clone(); + let dn_grpc_server_clone = datanode_grpc_server.clone(); + tokio::spawn(async move { + let addr = dn_grpc_addr_clone.parse::().unwrap(); + dn_grpc_server_clone.start(addr).await.unwrap() + }); + + // wait for GRPC server to start + tokio::time::sleep(Duration::from_secs(1)).await; + + (fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server) +} diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 0fc41addae..026f3b5321 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -11,13 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#![feature(assert_matches)] - -use std::assert_matches::assert_matches; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; - use api::v1::alter_expr::Kind; use api::v1::column::SemanticType; use api::v1::{ @@ -27,96 +20,56 @@ use api::v1::{ use client::admin::Admin; use client::{Client, Database, ObjectResult}; use common_catalog::consts::MIN_USER_TABLE_ID; -use common_runtime::Builder as RuntimeBuilder; -use datanode::instance::Instance; -use frontend::frontend::FrontendOptions; -use frontend::grpc::GrpcOptions; -use servers::grpc::GrpcServer; use servers::server::Server; -use servers::Mode; -use tests_integration::test_util::{self, TestGuard}; +use tests_integration::test_util::{setup_grpc_server, StorageType}; -async fn setup_grpc_server( - name: &str, - datanode_port: usize, - frontend_port: usize, -) -> (String, TestGuard, Arc, Arc) { - common_telemetry::init_default_ut_logging(); +#[macro_export] +macro_rules! grpc_test { + ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => { + paste::item! { + mod [] { + $( + #[tokio::test(flavor = "multi_thread")] + $( + #[$meta] + )* + async fn [< $test >]() { + let store_type = tests_integration::test_util::StorageType::$service; + if store_type.test_on() { + let _ = $crate::grpc::$test(store_type).await; + } - let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name); - let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port); - opts.rpc_addr = datanode_grpc_addr.clone(); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - instance.start().await.unwrap(); - - let datanode_grpc_addr = datanode_grpc_addr.clone(); - let runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(2) - .thread_name("grpc-handlers") - .build() - .unwrap(), - ); - - let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port); - let fe_opts = FrontendOptions { - mode: Mode::Standalone, - datanode_rpc_addr: datanode_grpc_addr.clone(), - grpc_options: Some(GrpcOptions { - addr: fe_grpc_addr.clone(), - runtime_size: 8, - }), - ..Default::default() + } + )* + } + } }; - - let datanode_grpc_server = Arc::new(GrpcServer::new( - instance.clone(), - instance.clone(), - runtime.clone(), - )); - - let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts) - .await - .unwrap(); - fe_instance.set_catalog_manager(instance.catalog_manager().clone()); - - let fe_instance_ref = Arc::new(fe_instance); - let fe_grpc_server = Arc::new(GrpcServer::new( - fe_instance_ref.clone(), - fe_instance_ref, - runtime, - )); - let grpc_server_clone = fe_grpc_server.clone(); - - let fe_grpc_addr_clone = fe_grpc_addr.clone(); - tokio::spawn(async move { - let addr = fe_grpc_addr_clone.parse::().unwrap(); - grpc_server_clone.start(addr).await.unwrap() - }); - - let dn_grpc_addr_clone = datanode_grpc_addr.clone(); - let dn_grpc_server_clone = datanode_grpc_server.clone(); - tokio::spawn(async move { - let addr = dn_grpc_addr_clone.parse::().unwrap(); - dn_grpc_server_clone.start(addr).await.unwrap() - }); - - // wait for GRPC server to start - tokio::time::sleep(Duration::from_secs(1)).await; - - (fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server) } -#[tokio::test(flavor = "multi_thread")] -async fn test_auto_create_table() { - let (addr, _guard, fe_grpc_server, dn_grpc_server) = - setup_grpc_server("auto_create_table", 3992, 3993).await; +#[macro_export] +macro_rules! grpc_tests { + ($($service:ident),*) => { + $( + grpc_test!( + $service, + + test_auto_create_table, + test_insert_and_select, + ); + )* + }; +} + +pub async fn test_auto_create_table(store_type: StorageType) { + let (addr, mut guard, fe_grpc_server, dn_grpc_server) = + setup_grpc_server(store_type, "auto_create_table").await; let grpc_client = Client::with_urls(vec![addr]); let db = Database::new("greptime", grpc_client); insert_and_assert(&db).await; let _ = fe_grpc_server.shutdown().await; let _ = dn_grpc_server.shutdown().await; + guard.remove_all().await; } fn expect_data() -> (Column, Column, Column, Column) { @@ -173,11 +126,10 @@ fn expect_data() -> (Column, Column, Column, Column) { ) } -#[tokio::test(flavor = "multi_thread")] -async fn test_insert_and_select() { +pub async fn test_insert_and_select(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (addr, _guard, fe_grpc_server, dn_grpc_server) = - setup_grpc_server("insert_and_select", 3990, 3991).await; + let (addr, mut guard, fe_grpc_server, dn_grpc_server) = + setup_grpc_server(store_type, "insert_and_select").await; let grpc_client = Client::with_urls(vec![addr]); @@ -187,13 +139,13 @@ async fn test_insert_and_select() { // create let expr = testing_create_expr(); let result = admin.create(expr).await.unwrap(); - assert_matches!( + assert!(matches!( result.result, Some(admin_result::Result::Mutate(MutateResult { success: 1, failure: 0 })) - ); + )); //alter let add_column = ColumnDef { @@ -222,6 +174,7 @@ async fn test_insert_and_select() { let _ = fe_grpc_server.shutdown().await; let _ = dn_grpc_server.shutdown().await; + guard.remove_all().await; } async fn insert_and_assert(db: &Database) { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 712900a809..37c2ba7393 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -12,66 +12,52 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use axum::http::StatusCode; -use axum::Router; use axum_test_helper::TestClient; -use datanode::instance::{Instance, InstanceRef}; -use datatypes::prelude::ConcreteDataType; -use frontend::frontend::FrontendOptions; -use frontend::instance::{FrontendInstance, Instance as FeInstance}; use serde_json::json; -use servers::http::{HttpOptions, HttpServer, JsonOutput, JsonResponse}; -use test_util::TestGuard; -use tests_integration::test_util; +use servers::http::{JsonOutput, JsonResponse}; +use tests_integration::test_util::{setup_test_app, setup_test_app_with_frontend, StorageType}; -async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance { - let fe_opts = FrontendOptions::default(); - let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap(); - frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone()); - frontend_instance.set_script_handler(datanode_instance); - frontend_instance +#[macro_export] +macro_rules! http_test { + ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => { + paste::item! { + mod [] { + $( + #[tokio::test(flavor = "multi_thread")] + $( + #[$meta] + )* + async fn [< $test >]() { + let store_type = tests_integration::test_util::StorageType::$service; + if store_type.test_on() { + let _ = $crate::http::$test(store_type).await; + } + } + )* + } + } + }; } -async fn make_test_app(name: &str) -> (Router, TestGuard) { - let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - instance.start().await.unwrap(); - test_util::create_test_table( - instance.catalog_manager(), - instance.sql_handler(), - ConcreteDataType::timestamp_millis_datatype(), - ) - .await - .unwrap(); - let http_server = HttpServer::new(instance, HttpOptions::default()); - (http_server.make_app(), guard) +#[macro_export] +macro_rules! http_tests { + ($($service:ident),*) => { + $( + http_test!( + $service, + + test_sql_api, + test_metrics_api, + test_scripts_api, + ); + )* + }; } -async fn make_test_app_with_frontend(name: &str) -> (Router, TestGuard) { - let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - let mut frontend = build_frontend_instance(instance.clone()).await; - instance.start().await.unwrap(); - test_util::create_test_table( - frontend.catalog_manager().as_ref().unwrap(), - instance.sql_handler(), - ConcreteDataType::timestamp_millis_datatype(), - ) - .await - .unwrap(); - frontend.start().await.unwrap(); - let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default()); - http_server.set_script_handler(instance.clone()); - let app = http_server.make_app(); - (app, guard) -} - -#[tokio::test(flavor = "multi_thread")] -async fn test_sql_api() { +pub async fn test_sql_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, _guard) = make_test_app("sql_api").await; + let (app, mut guard) = setup_test_app(store_type, "sql_api").await; let client = TestClient::new(app); let res = client.get("/v1/sql").send().await; assert_eq!(res.status(), StatusCode::OK); @@ -174,13 +160,14 @@ async fn test_sql_api() { "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]} })).unwrap() ); + + guard.remove_all().await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_metrics_api() { +pub async fn test_metrics_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); common_telemetry::init_default_metrics_recorder(); - let (app, _guard) = make_test_app("metrics_api").await; + let (app, mut guard) = setup_test_app(store_type, "metrics_api").await; let client = TestClient::new(app); // Send a sql @@ -195,12 +182,12 @@ async fn test_metrics_api() { assert_eq!(res.status(), StatusCode::OK); let body = res.text().await; assert!(body.contains("datanode_handle_sql_elapsed")); + guard.remove_all().await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_scripts_api() { +pub async fn test_scripts_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, _guard) = make_test_app_with_frontend("script_api").await; + let (app, mut guard) = setup_test_app_with_frontend(store_type, "script_api").await; let client = TestClient::new(app); let res = client @@ -238,4 +225,6 @@ def test(n): "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]} })).unwrap() ); + + guard.remove_all().await; } diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs new file mode 100644 index 0000000000..b664e03ad9 --- /dev/null +++ b/tests-integration/tests/main.rs @@ -0,0 +1,21 @@ +// Copyright 2022 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[macro_use] +mod grpc; +#[macro_use] +mod http; + +grpc_tests!(File, S3); +http_tests!(File, S3);