feat: supports s3 storage (#656)

* feat: adds s3 object storage configuration

* feat: adds s3 integration test

* chore: use map

* fix: forgot license header

* fix: checking if bucket is empty in test_on

* chore: address CR issues

* refactor: run s3 test with dotenv

* chore: randomize grpc port for test

* fix: README in tests-integration

* chore: remove redundant comments
This commit is contained in:
dennis zhuang
2022-12-01 10:59:14 +08:00
committed by GitHub
parent b0cbfa7ffb
commit 2e17e9c4b5
17 changed files with 508 additions and 179 deletions

4
.env.example Normal file
View File

@@ -0,0 +1,4 @@
# Settings for s3 test
GT_S3_BUCKET=S3 bucket
GT_S3_ACCESS_KEY_ID=S3 access key id
GT_S3_ACCESS_KEY=S3 secret access key

3
.gitignore vendored
View File

@@ -32,3 +32,6 @@ logs/
# Benchmark dataset
benchmarks/data
# dotenv
.env

14
Cargo.lock generated
View File

@@ -1984,6 +1984,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "dyn-clone"
version = "1.0.9"
@@ -3660,6 +3666,7 @@ dependencies = [
"opendal",
"tempdir",
"tokio",
"uuid",
]
[[package]]
@@ -6199,8 +6206,13 @@ dependencies = [
"common-telemetry",
"datanode",
"datatypes",
"dotenv",
"frontend",
"mito",
"object-store",
"once_cell",
"paste",
"rand 0.8.5",
"serde",
"serde_json",
"servers",
@@ -6209,6 +6221,7 @@ dependencies = [
"table",
"tempdir",
"tokio",
"uuid",
]
[[package]]
@@ -6985,6 +6998,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
dependencies = [
"getrandom 0.2.7",
"serde",
]
[[package]]

View File

@@ -170,6 +170,7 @@ mod tests {
ObjectStoreConfig::File { data_dir } => {
assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir)
}
ObjectStoreConfig::S3 { .. } => unreachable!(),
};
}

View File

@@ -26,7 +26,15 @@ use crate::server::Services;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File { data_dir: String },
File {
data_dir: String,
},
S3 {
bucket: String,
root: String,
access_key_id: String,
secret_access_key: String,
},
}
impl Default for ObjectStoreConfig {

View File

@@ -18,6 +18,8 @@ use common_error::prelude::*;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
use crate::datanode::ObjectStoreConfig;
/// Business error of datanode.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -142,9 +144,9 @@ pub enum Error {
#[snafu(display("Failed to storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
#[snafu(display("Failed to init backend, dir: {}, source: {}", dir, source))]
#[snafu(display("Failed to init backend, config: {:#?}, source: {}", config, source))]
InitBackend {
dir: String,
config: ObjectStoreConfig,
source: object_store::Error,
backtrace: Backtrace,
},

View File

@@ -28,7 +28,8 @@ use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::fs::Builder;
use object_store::services::fs::Builder as FsBuilder;
use object_store::services::s3::Builder as S3Builder;
use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
@@ -187,35 +188,64 @@ impl Instance {
}
pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
// TODO(dennis): supports other backend
let data_dir = util::normalize_dir(match store_config {
ObjectStoreConfig::File { data_dir } => data_dir,
});
let object_store = match store_config {
ObjectStoreConfig::File { data_dir } => new_fs_object_store(data_dir).await,
ObjectStoreConfig::S3 { .. } => new_s3_object_store(store_config).await,
};
object_store.map(|object_store| {
object_store
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
.layer(MetricsLayer)
.layer(LoggingLayer)
.layer(TracingLayer)
})
}
pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let (root, secret_key, key_id, bucket) = match store_config {
ObjectStoreConfig::S3 {
bucket,
root,
access_key_id,
secret_access_key,
} => (root, secret_access_key, access_key_id, bucket),
_ => unreachable!(),
};
let root = util::normalize_dir(root);
info!("The s3 storage bucket is: {}, root is: {}", bucket, &root);
let accessor = S3Builder::default()
.root(&root)
.bucket(bucket)
.access_key_id(key_id)
.secret_access_key(secret_key)
.build()
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?;
Ok(ObjectStore::new(accessor))
}
pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result<ObjectStore> {
let data_dir = util::normalize_dir(data_dir);
fs::create_dir_all(path::Path::new(&data_dir))
.context(error::CreateDirSnafu { dir: &data_dir })?;
info!("The storage directory is: {}", &data_dir);
info!("The file storage directory is: {}", &data_dir);
let atomic_write_dir = format!("{}/.tmp/", data_dir);
let accessor = Builder::default()
let accessor = FsBuilder::default()
.root(&data_dir)
.atomic_write_dir(&atomic_write_dir)
.build()
.context(error::InitBackendSnafu { dir: &data_dir })?;
.context(error::InitBackendSnafu {
config: ObjectStoreConfig::File { data_dir },
})?;
let object_store = ObjectStore::new(accessor)
// Add retry
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
// Add metrics
.layer(MetricsLayer)
// Add logging
.layer(LoggingLayer)
// Add tracing
.layer(TracingLayer);
Ok(object_store)
Ok(ObjectStore::new(accessor))
}
/// Create metasrv client instance and spawn heartbeat loop.

View File

@@ -13,3 +13,4 @@ tokio = { version = "1.0", features = ["full"] }
anyhow = "1.0"
common-telemetry = { path = "../common/telemetry" }
tempdir = "0.3"
uuid = { version = "1", features = ["serde", "v4"] }

View File

@@ -15,7 +15,8 @@
pub use opendal::raw::SeekableReader;
pub use opendal::{
layers, services, Error, ErrorKind, Layer, Object, ObjectLister, ObjectMetadata, ObjectMode,
Operator as ObjectStore,
Operator as ObjectStore, Result,
};
pub mod backend;
pub mod test_util;
pub mod util;

View File

@@ -0,0 +1,35 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{ObjectStore, Result};
/// Test helper that tracks a path inside an `ObjectStore` so a test can
/// wipe everything it wrote in one call. Cleanup is NOT automatic on drop;
/// callers must invoke [`TempFolder::remove_all`] explicitly.
pub struct TempFolder {
    store: ObjectStore,
    // The path under root.
    path: String,
}

impl TempFolder {
    /// Creates a folder guard over `path`, cloning the store handle.
    pub fn new(store: &ObjectStore, path: &str) -> Self {
        Self {
            store: store.clone(),
            path: path.to_string(),
        }
    }

    /// Removes all objects under `self.path` via the store's batch API,
    /// propagating any backend error to the caller.
    pub async fn remove_all(&mut self) -> Result<()> {
        let batch = self.store.batch();
        batch.remove_all(&self.path).await
    }
}

View File

@@ -17,6 +17,7 @@ use std::env;
use anyhow::Result;
use common_telemetry::logging;
use object_store::backend::{fs, s3};
use object_store::test_util::TempFolder;
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
use tempdir::TempDir;
@@ -110,15 +111,21 @@ async fn test_s3_backend() -> Result<()> {
if !bucket.is_empty() {
logging::info!("Running s3 test.");
let root = uuid::Uuid::new_v4().to_string();
let accessor = s3::Builder::default()
.root(&root)
.access_key_id(&env::var("GT_S3_ACCESS_KEY_ID")?)
.secret_access_key(&env::var("GT_S3_ACCESS_KEY")?)
.bucket(&bucket)
.build()?;
let store = ObjectStore::new(accessor);
let mut guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
test_object_list(&store).await?;
guard.remove_all().await?;
}
}

View File

@@ -15,8 +15,12 @@ common-runtime = { path = "../src/common/runtime" }
common-telemetry = { path = "../src/common/telemetry" }
datanode = { path = "../src/datanode" }
datatypes = { path = "../src/datatypes" }
dotenv = "0.15"
frontend = { path = "../src/frontend" }
mito = { path = "../src/mito", features = ["test"] }
object-store = { path = "../src/object-store" }
once_cell = "1.16"
rand = "0.8"
serde = "1.0"
serde_json = "1.0"
servers = { path = "../src/servers" }
@@ -25,3 +29,7 @@ sql = { path = "../src/sql" }
table = { path = "../src/table" }
tempdir = "0.3"
tokio = { version = "1.20", features = ["full"] }
uuid = { version = "1", features = ["serde", "v4"] }
[dev-dependencies]
paste = "1.0"

View File

@@ -0,0 +1,26 @@
## Setup
To run the integration test, please copy `.env.example` to `.env` in the project root folder and change the values as needed.
Take `s3` for example. You need to set your S3 bucket, access key id and secret key:
```sh
# Settings for s3 test
GT_S3_BUCKET=S3 bucket
GT_S3_ACCESS_KEY_ID=S3 access key id
GT_S3_ACCESS_KEY=S3 secret access key
```
## Run
Execute the following command in the project root folder:
```
cargo test integration
```
Test s3 storage:
```
cargo test s3
```

View File

@@ -13,36 +13,144 @@
// limitations under the License.
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use axum::Router;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_runtime::Builder as RuntimeBuilder;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::{Instance, InstanceRef};
use datanode::sql::SqlHandler;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use object_store::backend::s3;
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use once_cell::sync::OnceCell;
use rand::Rng;
use servers::grpc::GrpcServer;
use servers::http::{HttpOptions, HttpServer};
use servers::server::Server;
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
use tempdir::TempDir;
// Monotonic port counter, lazily seeded with a random base in [3500, 3900)
// so concurrently-running test binaries are unlikely to pick the same range.
static PORTS: OnceCell<AtomicUsize> = OnceCell::new();

/// Hands out a fresh port number for a test server.
/// NOTE(review): uniqueness only holds within this process; another process
/// could still bind the same port — confirm this is acceptable for CI.
fn get_port() -> usize {
    PORTS
        .get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3500..3900)))
        .fetch_add(1, Ordering::Relaxed)
}
/// Storage backends the integration tests can run against.
pub enum StorageType {
    S3,
    File,
}

impl StorageType {
    /// Returns whether tests for this backend should run in the current
    /// environment. `File` always runs; `S3` runs only when `GT_S3_BUCKET`
    /// is set to a non-empty value (values are loaded from `.env` when the
    /// file exists — a missing `.env` is ignored).
    pub fn test_on(&self) -> bool {
        let _ = dotenv::dotenv();

        match self {
            // Local file storage needs no credentials, so it is always tested.
            StorageType::File => true,
            // S3 is opt-in: enabled only when a bucket is configured.
            StorageType::S3 => env::var("GT_S3_BUCKET")
                .map(|bucket| !bucket.is_empty())
                .unwrap_or(false),
        }
    }
}
/// Builds an `ObjectStoreConfig` for the requested backend, paired with a
/// guard that owns the test data location (a `TempDir` for `File`, a
/// `TempFolder` for `S3`). `name` is only used to label the local temp dir.
///
/// # Panics
/// For `S3`, panics if `GT_S3_ACCESS_KEY_ID`, `GT_S3_ACCESS_KEY` or
/// `GT_S3_BUCKET` is unset, or if the accessor fails to build — callers are
/// expected to gate on `StorageType::test_on` first.
fn get_test_store_config(
    store_type: &StorageType,
    name: &str,
) -> (ObjectStoreConfig, Option<TempDirGuard>) {
    // Pull S3 credentials from `.env` when present; ignore a missing file.
    let _ = dotenv::dotenv();

    match store_type {
        StorageType::S3 => {
            // Random root so concurrent runs don't collide in the shared bucket.
            let root = uuid::Uuid::new_v4().to_string();

            let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
            let secret_key = env::var("GT_S3_ACCESS_KEY").unwrap();
            let bucket = env::var("GT_S3_BUCKET").unwrap();

            let accessor = s3::Builder::default()
                .root(&root)
                .access_key_id(&key_id)
                .secret_access_key(&secret_key)
                .bucket(&bucket)
                .build()
                .unwrap();

            let config = ObjectStoreConfig::S3 {
                root,
                bucket,
                access_key_id: key_id,
                secret_access_key: secret_key,
            };

            let store = ObjectStore::new(accessor);

            // Guard over "/" wipes everything under the random root on cleanup.
            (config, Some(TempDirGuard::S3(TempFolder::new(&store, "/"))))
        }
        StorageType::File => {
            let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap();

            (
                ObjectStoreConfig::File {
                    data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
                },
                Some(TempDirGuard::File(data_tmp_dir)),
            )
        }
    }
}
// Cleanup handle for per-test data: either a local temp directory (deleted
// on drop) or a remote S3 folder wrapper (requires explicit remove_all).
enum TempDirGuard {
    File(TempDir),
    S3(TempFolder),
}
/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
/// Only for test.
pub struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
data_tmp_dir: Option<TempDirGuard>,
}
pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
impl TestGuard {
    /// Removes remote (S3) test data. Local `File` data lives in a `TempDir`
    /// that deletes itself on drop, so only the S3 variant needs explicit
    /// cleanup here.
    // NOTE: `take()` removes the guard unconditionally, so a `File` variant's
    // TempDir is dropped (and deleted) here rather than when the whole
    // TestGuard drops — fine, since this is only called at teardown.
    pub async fn remove_all(&mut self) {
        if let Some(TempDirGuard::S3(mut guard)) = self.data_tmp_dir.take() {
            guard.remove_all().await.unwrap();
        }
    }
}
pub fn create_tmp_dir_and_datanode_opts(
store_type: StorageType,
name: &str,
) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{}", name)).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap();
let (storage, data_tmp_dir) = get_test_store_config(&store_type, name);
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
storage,
mode: Mode::Standalone,
..Default::default()
};
@@ -50,7 +158,7 @@ pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGua
opts,
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
data_tmp_dir,
},
)
}
@@ -105,3 +213,121 @@ pub async fn create_test_table(
.unwrap();
Ok(())
}
/// Builds a frontend instance with default options that shares the given
/// datanode's catalog manager and uses it as the script handler.
async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
    let fe_opts = FrontendOptions::default();
    let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap();
    frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
    frontend_instance.set_script_handler(datanode_instance);
    frontend_instance
}
/// Spins up a datanode (with a mock meta client) on the given storage
/// backend, creates the standard test table, and returns the datanode's
/// HTTP router plus the guard that owns the test data.
pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
    let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
    let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
    instance.start().await.unwrap();
    create_test_table(
        instance.catalog_manager(),
        instance.sql_handler(),
        ConcreteDataType::timestamp_millis_datatype(),
    )
    .await
    .unwrap();
    let http_server = HttpServer::new(instance, HttpOptions::default());
    (http_server.make_app(), guard)
}
/// Like `setup_test_app`, but routes HTTP through a frontend instance layered
/// over the datanode; the datanode is still set as the script handler so
/// script APIs keep working.
pub async fn setup_test_app_with_frontend(
    store_type: StorageType,
    name: &str,
) -> (Router, TestGuard) {
    let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
    let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
    let mut frontend = build_frontend_instance(instance.clone()).await;
    instance.start().await.unwrap();
    // The test table is created through the frontend's catalog manager
    // (shared with the datanode) so both views stay consistent.
    create_test_table(
        frontend.catalog_manager().as_ref().unwrap(),
        instance.sql_handler(),
        ConcreteDataType::timestamp_millis_datatype(),
    )
    .await
    .unwrap();
    frontend.start().await.unwrap();
    let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default());
    http_server.set_script_handler(instance.clone());
    let app = http_server.make_app();
    (app, guard)
}
/// Starts a datanode gRPC server and a frontend gRPC server (standalone
/// mode, frontend proxying to the datanode) on randomized localhost ports.
///
/// Returns the frontend's gRPC address, the storage guard, and both server
/// handles so the caller can shut them down and clean up test data.
pub async fn setup_grpc_server(
    store_type: StorageType,
    name: &str,
) -> (String, TestGuard, Arc<GrpcServer>, Arc<GrpcServer>) {
    common_telemetry::init_default_ut_logging();

    // Randomized ports (see get_port) so parallel tests don't collide.
    let datanode_port = get_port();
    let frontend_port = get_port();

    let (mut opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
    let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port);
    opts.rpc_addr = datanode_grpc_addr.clone();
    let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
    instance.start().await.unwrap();

    // NOTE(review): this self-clone is redundant — `datanode_grpc_addr` is
    // already owned above; consider dropping this line.
    let datanode_grpc_addr = datanode_grpc_addr.clone();
    let runtime = Arc::new(
        RuntimeBuilder::default()
            .worker_threads(2)
            .thread_name("grpc-handlers")
            .build()
            .unwrap(),
    );

    let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
    let fe_opts = FrontendOptions {
        mode: Mode::Standalone,
        datanode_rpc_addr: datanode_grpc_addr.clone(),
        grpc_options: Some(GrpcOptions {
            addr: fe_grpc_addr.clone(),
            runtime_size: 8,
        }),
        ..Default::default()
    };

    // Datanode's own gRPC server shares the handler runtime with the frontend.
    let datanode_grpc_server = Arc::new(GrpcServer::new(
        instance.clone(),
        instance.clone(),
        runtime.clone(),
    ));

    let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts)
        .await
        .unwrap();
    fe_instance.set_catalog_manager(instance.catalog_manager().clone());
    let fe_instance_ref = Arc::new(fe_instance);
    let fe_grpc_server = Arc::new(GrpcServer::new(
        fe_instance_ref.clone(),
        fe_instance_ref,
        runtime,
    ));
    let grpc_server_clone = fe_grpc_server.clone();

    // Both servers run detached; the returned Arcs are used for shutdown.
    let fe_grpc_addr_clone = fe_grpc_addr.clone();
    tokio::spawn(async move {
        let addr = fe_grpc_addr_clone.parse::<SocketAddr>().unwrap();
        grpc_server_clone.start(addr).await.unwrap()
    });

    let dn_grpc_addr_clone = datanode_grpc_addr.clone();
    let dn_grpc_server_clone = datanode_grpc_server.clone();
    tokio::spawn(async move {
        let addr = dn_grpc_addr_clone.parse::<SocketAddr>().unwrap();
        dn_grpc_server_clone.start(addr).await.unwrap()
    });

    // wait for GRPC server to start
    // NOTE(review): a fixed 1s sleep is a race-prone way to await readiness;
    // a readiness signal/retry loop would be more robust.
    tokio::time::sleep(Duration::from_secs(1)).await;
    (fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server)
}

View File

@@ -11,13 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
use std::assert_matches::assert_matches;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::alter_expr::Kind;
use api::v1::column::SemanticType;
use api::v1::{
@@ -27,96 +20,56 @@ use api::v1::{
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_runtime::Builder as RuntimeBuilder;
use datanode::instance::Instance;
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
use servers::grpc::GrpcServer;
use servers::server::Server;
use servers::Mode;
use tests_integration::test_util::{self, TestGuard};
use tests_integration::test_util::{setup_grpc_server, StorageType};
async fn setup_grpc_server(
name: &str,
datanode_port: usize,
frontend_port: usize,
) -> (String, TestGuard, Arc<GrpcServer>, Arc<GrpcServer>) {
common_telemetry::init_default_ut_logging();
#[macro_export]
macro_rules! grpc_test {
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
paste::item! {
mod [<integration_grpc_ $service:lower _test>] {
$(
#[tokio::test(flavor = "multi_thread")]
$(
#[$meta]
)*
async fn [< $test >]() {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
let _ = $crate::grpc::$test(store_type).await;
}
let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port);
opts.rpc_addr = datanode_grpc_addr.clone();
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
let datanode_grpc_addr = datanode_grpc_addr.clone();
let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("grpc-handlers")
.build()
.unwrap(),
);
let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
let fe_opts = FrontendOptions {
mode: Mode::Standalone,
datanode_rpc_addr: datanode_grpc_addr.clone(),
grpc_options: Some(GrpcOptions {
addr: fe_grpc_addr.clone(),
runtime_size: 8,
}),
..Default::default()
}
)*
}
}
};
let datanode_grpc_server = Arc::new(GrpcServer::new(
instance.clone(),
instance.clone(),
runtime.clone(),
));
let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts)
.await
.unwrap();
fe_instance.set_catalog_manager(instance.catalog_manager().clone());
let fe_instance_ref = Arc::new(fe_instance);
let fe_grpc_server = Arc::new(GrpcServer::new(
fe_instance_ref.clone(),
fe_instance_ref,
runtime,
));
let grpc_server_clone = fe_grpc_server.clone();
let fe_grpc_addr_clone = fe_grpc_addr.clone();
tokio::spawn(async move {
let addr = fe_grpc_addr_clone.parse::<SocketAddr>().unwrap();
grpc_server_clone.start(addr).await.unwrap()
});
let dn_grpc_addr_clone = datanode_grpc_addr.clone();
let dn_grpc_server_clone = datanode_grpc_server.clone();
tokio::spawn(async move {
let addr = dn_grpc_addr_clone.parse::<SocketAddr>().unwrap();
dn_grpc_server_clone.start(addr).await.unwrap()
});
// wait for GRPC server to start
tokio::time::sleep(Duration::from_secs(1)).await;
(fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_auto_create_table() {
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
setup_grpc_server("auto_create_table", 3992, 3993).await;
/// Expands `grpc_test!` once per storage backend, instantiating the full
/// gRPC integration suite for each.
#[macro_export]
macro_rules! grpc_tests {
    ($($service:ident),*) => {
        $(
            grpc_test!(
                $service,
                test_auto_create_table,
                test_insert_and_select,
            );
        )*
    };
}
/// Exercises insert-with-implicit-table-creation over gRPC against the given
/// storage backend, then shuts down both servers and removes test data.
pub async fn test_auto_create_table(store_type: StorageType) {
    let (addr, mut guard, fe_grpc_server, dn_grpc_server) =
        setup_grpc_server(store_type, "auto_create_table").await;

    let grpc_client = Client::with_urls(vec![addr]);
    let db = Database::new("greptime", grpc_client);
    insert_and_assert(&db).await;
    // Shutdown results are deliberately ignored: the assertions above are the
    // point of the test, and cleanup failures shouldn't mask them.
    let _ = fe_grpc_server.shutdown().await;
    let _ = dn_grpc_server.shutdown().await;
    guard.remove_all().await;
}
fn expect_data() -> (Column, Column, Column, Column) {
@@ -173,11 +126,10 @@ fn expect_data() -> (Column, Column, Column, Column) {
)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_and_select() {
pub async fn test_insert_and_select(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
setup_grpc_server("insert_and_select", 3990, 3991).await;
let (addr, mut guard, fe_grpc_server, dn_grpc_server) =
setup_grpc_server(store_type, "insert_and_select").await;
let grpc_client = Client::with_urls(vec![addr]);
@@ -187,13 +139,13 @@ async fn test_insert_and_select() {
// create
let expr = testing_create_expr();
let result = admin.create(expr).await.unwrap();
assert_matches!(
assert!(matches!(
result.result,
Some(admin_result::Result::Mutate(MutateResult {
success: 1,
failure: 0
}))
);
));
//alter
let add_column = ColumnDef {
@@ -222,6 +174,7 @@ async fn test_insert_and_select() {
let _ = fe_grpc_server.shutdown().await;
let _ = dn_grpc_server.shutdown().await;
guard.remove_all().await;
}
async fn insert_and_assert(db: &Database) {

View File

@@ -12,66 +12,52 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use axum::http::StatusCode;
use axum::Router;
use axum_test_helper::TestClient;
use datanode::instance::{Instance, InstanceRef};
use datatypes::prelude::ConcreteDataType;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use serde_json::json;
use servers::http::{HttpOptions, HttpServer, JsonOutput, JsonResponse};
use test_util::TestGuard;
use tests_integration::test_util;
use servers::http::{JsonOutput, JsonResponse};
use tests_integration::test_util::{setup_test_app, setup_test_app_with_frontend, StorageType};
async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
let fe_opts = FrontendOptions::default();
let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap();
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance
/// Generates one `mod integration_http_<service>_test` containing a
/// `#[tokio::test]` wrapper per listed test function. Each wrapper checks
/// `StorageType::test_on()` at runtime and silently skips when the backend
/// (e.g. S3) is not configured in the environment.
#[macro_export]
macro_rules! http_test {
    ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
        paste::item! {
            mod [<integration_http_ $service:lower _test>] {
                $(
                    #[tokio::test(flavor = "multi_thread")]
                    $(
                        #[$meta]
                    )*
                    async fn [< $test >]() {
                        let store_type = tests_integration::test_util::StorageType::$service;
                        if store_type.test_on() {
                            let _ = $crate::http::$test(store_type).await;
                        }
                    }
                )*
            }
        }
    };
}
async fn make_test_app(name: &str) -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
test_util::create_test_table(
instance.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millis_datatype(),
)
.await
.unwrap();
let http_server = HttpServer::new(instance, HttpOptions::default());
(http_server.make_app(), guard)
/// Expands `http_test!` once per storage backend, instantiating the full
/// HTTP integration suite for each.
#[macro_export]
macro_rules! http_tests {
    ($($service:ident),*) => {
        $(
            http_test!(
                $service,
                test_sql_api,
                test_metrics_api,
                test_scripts_api,
            );
        )*
    };
}
async fn make_test_app_with_frontend(name: &str) -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
let mut frontend = build_frontend_instance(instance.clone()).await;
instance.start().await.unwrap();
test_util::create_test_table(
frontend.catalog_manager().as_ref().unwrap(),
instance.sql_handler(),
ConcreteDataType::timestamp_millis_datatype(),
)
.await
.unwrap();
frontend.start().await.unwrap();
let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default());
http_server.set_script_handler(instance.clone());
let app = http_server.make_app();
(app, guard)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sql_api() {
pub async fn test_sql_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, _guard) = make_test_app("sql_api").await;
let (app, mut guard) = setup_test_app(store_type, "sql_api").await;
let client = TestClient::new(app);
let res = client.get("/v1/sql").send().await;
assert_eq!(res.status(), StatusCode::OK);
@@ -174,13 +160,14 @@ async fn test_sql_api() {
"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}
})).unwrap()
);
guard.remove_all().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_metrics_api() {
pub async fn test_metrics_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
common_telemetry::init_default_metrics_recorder();
let (app, _guard) = make_test_app("metrics_api").await;
let (app, mut guard) = setup_test_app(store_type, "metrics_api").await;
let client = TestClient::new(app);
// Send a sql
@@ -195,12 +182,12 @@ async fn test_metrics_api() {
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert!(body.contains("datanode_handle_sql_elapsed"));
guard.remove_all().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_scripts_api() {
pub async fn test_scripts_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, _guard) = make_test_app_with_frontend("script_api").await;
let (app, mut guard) = setup_test_app_with_frontend(store_type, "script_api").await;
let client = TestClient::new(app);
let res = client
@@ -238,4 +225,6 @@ def test(n):
"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}
})).unwrap()
);
guard.remove_all().await;
}

View File

@@ -0,0 +1,21 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[macro_use]
mod grpc;
#[macro_use]
mod http;

// Instantiate the gRPC and HTTP integration suites once per storage backend.
// Each invocation expands to a module of #[tokio::test] functions that skip
// themselves at runtime when the backend is not configured (test_on()).
grpc_tests!(File, S3);
http_tests!(File, S3);