build: bump rust edition to 2024 (#6920)

* bump edition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* gen keyword

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* lifetime and env var

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* one more gen fix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* lifetime of temporaries in tail expressions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clippy nested if

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clippy let and return

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-09-07 19:37:18 -07:00
committed by GitHub
parent 658d07bfc8
commit c9377e7c5a
1014 changed files with 6268 additions and 5540 deletions

View File

@@ -28,30 +28,30 @@ use cache::{
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::process_manager::ProcessManager;
use client::client_manager::NodeClients;
use client::Client;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::DatanodeId;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::runtime::BuilderBuild;
use common_runtime::Builder as RuntimeBuilder;
use common_runtime::runtime::BuilderBuild;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::Instance as FeInstance;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use hyper_util::rt::TokioIo;
use meta_client::client::MetaClientBuilder;
@@ -60,9 +60,9 @@ use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use object_store::config::ObjectStoreConfig;
use rand::Rng;
use servers::grpc::GrpcOptions;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::ServerHandlers;
use tempfile::TempDir;
@@ -72,8 +72,8 @@ use tower::service_fn;
use uuid::Uuid;
use crate::test_util::{
self, create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageType,
TestGuard, PEER_PLACEHOLDER_ADDR,
self, FileDirGuard, PEER_PLACEHOLDER_ADDR, StorageType, TestGuard, create_datanode_opts,
create_tmp_dir_and_datanode_opts,
};
pub struct GreptimeDbCluster {

View File

@@ -14,9 +14,9 @@
mod flight;
use api::v1::QueryRequest;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::QueryRequest;
use common_query::OutputData;
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
@@ -51,10 +51,10 @@ mod test {
use api::v1::query_request::Query;
use api::v1::region::QueryRequest as RegionQueryRequest;
use api::v1::{
alter_table_expr, AddColumn, AddColumns, AlterTableExpr, Column, ColumnDataType,
ColumnDataTypeExtension, ColumnDef, CreateDatabaseExpr, CreateTableExpr, DdlRequest,
DeleteRequest, DeleteRequests, DropTableExpr, InsertRequest, InsertRequests, QueryRequest,
SemanticType, VectorTypeExtension,
AddColumn, AddColumns, AlterTableExpr, Column, ColumnDataType, ColumnDataTypeExtension,
ColumnDef, CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DeleteRequests,
DropTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType,
VectorTypeExtension, alter_table_expr,
};
use client::OutputData;
use common_catalog::consts::MITO_ENGINE;
@@ -76,10 +76,10 @@ mod test {
use super::*;
use crate::standalone::GreptimeDbStandaloneBuilder;
use crate::tests;
use crate::tests::test_util::{
both_instances_cases, distributed, execute_sql, standalone, MockInstance,
};
use crate::tests::MockDistributedInstance;
use crate::tests::test_util::{
MockInstance, both_instances_cases, distributed, execute_sql, standalone,
};
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_ddl_request() {
@@ -371,18 +371,20 @@ mod test {
}
async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
assert!(instance
.frontend()
.catalog_manager()
.table(
"greptime",
"database_created_through_grpc",
"table_created_through_grpc",
None,
)
.await
.unwrap()
.is_none());
assert!(
instance
.frontend()
.catalog_manager()
.table(
"greptime",
"database_created_through_grpc",
"table_created_through_grpc",
None,
)
.await
.unwrap()
.is_none()
);
}
#[tokio::test(flavor = "multi_thread")]
@@ -497,7 +499,9 @@ CREATE TABLE {table_name} (
let instance = standalone.fe_instance();
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, c JSON, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b, c))");
let sql = format!(
"CREATE TABLE {table_name} (a INT, b STRING, c JSON, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b, c))"
);
create_table(instance, sql).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
@@ -1075,7 +1079,9 @@ CREATE TABLE {table_name} (
let instance = standalone.fe_instance();
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))");
let sql = format!(
"CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))"
);
create_table(instance, sql).await;
let insert = InsertRequest {

View File

@@ -40,7 +40,7 @@ mod test {
use crate::cluster::GreptimeDbClusterBuilder;
use crate::grpc::query_and_expect;
use crate::test_util::{setup_grpc_server, StorageType};
use crate::test_util::{StorageType, setup_grpc_server};
use crate::tests::test_util::MockInstance;
#[tokio::test(flavor = "multi_thread")]

View File

@@ -21,11 +21,11 @@ mod test {
use rstest::rstest;
use rstest_reuse::apply;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::InfluxdbLineProtocolHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use crate::tests::test_util::{both_instances_cases, distributed, standalone, MockInstance};
use crate::tests::test_util::{MockInstance, both_instances_cases, distributed, standalone};
#[apply(both_instances_cases)]
async fn test_put_influxdb_lines_without_time_column(instance: Arc<dyn MockInstance>) {

View File

@@ -16,8 +16,8 @@
mod tests {
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use api::v1::region::QueryRequest;
use client::OutputData;
@@ -278,13 +278,15 @@ mod tests {
}
async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
assert!(instance
.frontend()
.catalog_manager()
.table("greptime", "public", "demo", None)
.await
.unwrap()
.is_none())
assert!(
instance
.frontend()
.catalog_manager()
.table("greptime", "public", "demo", None)
.await
.unwrap()
.is_none()
)
}
#[tokio::test(flavor = "multi_thread")]
@@ -395,7 +397,7 @@ mod tests {
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
return Err(Error::NotSupported {
feat: "Database operations".to_owned(),
})
});
}
_ => {}
}

View File

@@ -21,8 +21,8 @@ mod tests {
use frontend::instance::Instance;
use itertools::Itertools;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::OpentsdbProtocolHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use crate::standalone::GreptimeDbStandaloneBuilder;

View File

@@ -16,7 +16,7 @@
mod test {
use std::sync::Arc;
use client::{OutputData, DEFAULT_CATALOG_NAME};
use client::{DEFAULT_CATALOG_NAME, OutputData};
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
@@ -26,12 +26,12 @@ mod test {
};
use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
metric, Gauge, Histogram, HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics,
ScopeMetrics,
Gauge, Histogram, HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics,
ScopeMetrics, metric,
};
use otel_arrow_rust::proto::opentelemetry::resource::v1::Resource;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::OpenTelemetryProtocolHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use crate::standalone::GreptimeDbStandaloneBuilder;
@@ -59,15 +59,17 @@ mod test {
let db = "otlp";
let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
assert!(SqlQueryHandler::do_query(
instance.as_ref(),
&format!("CREATE DATABASE IF NOT EXISTS {db}"),
ctx.clone(),
)
.await
.first()
.unwrap()
.is_ok());
assert!(
SqlQueryHandler::do_query(
instance.as_ref(),
&format!("CREATE DATABASE IF NOT EXISTS {db}"),
ctx.clone(),
)
.await
.first()
.unwrap()
.is_ok()
);
let resp = instance.metrics(req, ctx.clone()).await;
assert!(resp.is_ok());

View File

@@ -26,8 +26,8 @@ mod tests {
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::prom_store;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::PromStoreProtocolHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use crate::standalone::GreptimeDbStandaloneBuilder;
@@ -98,15 +98,17 @@ mod tests {
}
let ctx = Arc::new(ctx);
assert!(SqlQueryHandler::do_query(
instance.as_ref(),
"CREATE DATABASE IF NOT EXISTS prometheus",
ctx.clone(),
)
.await
.first()
.unwrap()
.is_ok());
assert!(
SqlQueryHandler::do_query(
instance.as_ref(),
"CREATE DATABASE IF NOT EXISTS prometheus",
ctx.clone(),
)
.await
.first()
.unwrap()
.is_ok()
);
let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
instance

View File

@@ -32,16 +32,16 @@ use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::SlowQueryOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
@@ -54,7 +54,7 @@ use servers::grpc::GrpcOptions;
use servers::server::ServerHandlers;
use snafu::ResultExt;
use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
use crate::test_util::{self, StorageType, TestGuard, create_tmp_dir_and_datanode_opts};
pub struct GreptimeDbStandalone {
pub frontend: Arc<Frontend>,

View File

@@ -27,17 +27,17 @@ use common_meta::key::schema_name::SchemaNameKey;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_test_util::temp_dir::{TempDir, create_temp_dir};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, StorageConfig};
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use object_store::ObjectStore;
use object_store::config::{
AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
};
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
@@ -83,10 +83,10 @@ impl StorageType {
pub fn build_storage_types_based_on_env() -> Vec<StorageType> {
let mut storage_types = Vec::with_capacity(4);
storage_types.push(StorageType::File);
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
storage_types.push(StorageType::S3);
}
if let Ok(bucket) = env::var("GT_S3_BUCKET")
&& !bucket.is_empty()
{
storage_types.push(StorageType::S3);
}
if env::var("GT_OSS_BUCKET").is_ok() {
storage_types.push(StorageType::Oss);
@@ -309,10 +309,9 @@ impl Drop for TestGuard {
| TempDirGuard::Oss(guard)
| TempDirGuard::Azblob(guard)
| TempDirGuard::Gcs(guard) = guard.0
&& let Err(e) = guard.remove_all().await
{
if let Err(e) = guard.remove_all().await {
errors.push(e);
}
errors.push(e);
}
}
if errors.is_empty() {
@@ -483,7 +482,9 @@ pub async fn setup_test_prom_app_with_frontend(
store_type: StorageType,
name: &str,
) -> (Router, TestGuard) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = setup_standalone_instance(name, store_type).await;

View File

@@ -13,16 +13,16 @@
// limitations under the License.
use std::assert_matches::assert_matches;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use client::DEFAULT_CATALOG_NAME;
use common_query::{Output, OutputData};
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::instance::Instance;
use itertools::Itertools;
use rand::rngs::ThreadRng;
use rand::Rng;
use rand::rngs::ThreadRng;
use rstest::rstest;
use rstest_reuse::apply;
use servers::query_handler::sql::SqlQueryHandler;

View File

@@ -16,7 +16,7 @@ use std::assert_matches::assert_matches;
use std::env;
use std::sync::Arc;
use client::{OutputData, DEFAULT_SCHEMA_NAME};
use client::{DEFAULT_SCHEMA_NAME, OutputData};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::Output;
use common_recordbatch::util;
@@ -32,9 +32,10 @@ use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use crate::tests::test_util::{
both_instances_cases, both_instances_cases_with_custom_storages, check_unordered_output_stream,
distributed, distributed_with_multiple_object_stores, find_testing_resource, prepare_path,
standalone, standalone_instance_case, standalone_with_multiple_object_stores, MockInstance,
MockInstance, both_instances_cases, both_instances_cases_with_custom_storages,
check_unordered_output_stream, distributed, distributed_with_multiple_object_stores,
find_testing_resource, prepare_path, standalone, standalone_instance_case,
standalone_with_multiple_object_stores,
};
#[apply(both_instances_cases)]
@@ -180,7 +181,9 @@ async fn test_extra_external_table_options(instance: Arc<dyn MockInstance>) {
#[apply(both_instances_cases)]
async fn test_show_create_external_table(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let fe_instance = instance.frontend();
let format = "csv";
@@ -675,7 +678,9 @@ async fn test_execute_external_create_with_invalid_ts(instance: Arc<dyn MockInst
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "parquet";
@@ -748,7 +753,9 @@ async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstanc
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_orc(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "orc";
@@ -831,7 +838,9 @@ async fn test_execute_query_external_table_orc(instance: Arc<dyn MockInstance>)
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_orc_with_schema(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "orc";
@@ -887,7 +896,9 @@ async fn test_execute_query_external_table_orc_with_schema(instance: Arc<dyn Moc
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "csv";
@@ -940,7 +951,9 @@ async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>)
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "json";
@@ -1000,7 +1013,9 @@ async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>)
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_json_with_schema(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "json";
@@ -1069,7 +1084,9 @@ async fn test_execute_query_external_table_json_with_schema(instance: Arc<dyn Mo
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_json_type_cast(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "json";
@@ -1142,7 +1159,9 @@ async fn test_execute_query_external_table_json_type_cast(instance: Arc<dyn Mock
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_json_default_ts_column(instance: Arc<dyn MockInstance>) {
std::env::set_var("TZ", "UTC");
unsafe {
std::env::set_var("TZ", "UTC");
}
let instance = instance.frontend();
let format = "json";
@@ -1596,39 +1615,42 @@ async fn test_delete(instance: Arc<dyn MockInstance>) {
#[apply(both_instances_cases)]
async fn test_execute_copy_to_s3(instance: Arc<dyn MockInstance>) {
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
let instance = instance.frontend();
if let Ok(bucket) = env::var("GT_S3_BUCKET")
&& !bucket.is_empty()
{
let instance = instance.frontend();
// setups
assert!(matches!(execute_sql(
// setups
assert!(matches!(execute_sql(
&instance,
"create table demo(host string, cpu double, memory double, ts timestamp time index);",
)
.await.data, OutputData::AffectedRows(0)));
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
let key = env::var("GT_S3_ACCESS_KEY").unwrap();
let region = env::var("GT_S3_REGION").unwrap();
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
let key = env::var("GT_S3_ACCESS_KEY").unwrap();
let region = env::var("GT_S3_REGION").unwrap();
let root = uuid::Uuid::new_v4().to_string();
let root = uuid::Uuid::new_v4().to_string();
// exports
let copy_to_stmt = format!("Copy demo TO 's3://{}/{}/export/demo.parquet' CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')", bucket, root, key_id, key, region);
// exports
let copy_to_stmt = format!(
"Copy demo TO 's3://{}/{}/export/demo.parquet' CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')",
bucket, root, key_id, key, region
);
let output = execute_sql(&instance, &copy_to_stmt).await.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
}
let output = execute_sql(&instance, &copy_to_stmt).await.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
}
}
@@ -1637,67 +1659,71 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
use common_telemetry::info;
common_telemetry::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
let instance = instance.frontend();
if let Ok(bucket) = env::var("GT_S3_BUCKET")
&& !bucket.is_empty()
{
let instance = instance.frontend();
// setups
assert!(matches!(execute_sql(
// setups
assert!(matches!(execute_sql(
&instance,
"create table demo(host string, cpu double, memory double, ts timestamp time index);",
)
.await.data, OutputData::AffectedRows(0)));
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
// export
let root = uuid::Uuid::new_v4().to_string();
let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
let key = env::var("GT_S3_ACCESS_KEY").unwrap();
let region = env::var("GT_S3_REGION").unwrap();
// export
let root = uuid::Uuid::new_v4().to_string();
let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
let key = env::var("GT_S3_ACCESS_KEY").unwrap();
let region = env::var("GT_S3_REGION").unwrap();
let copy_to_stmt = format!("Copy demo TO 's3://{}/{}/export/demo.parquet' CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')", bucket, root, key_id, key, region);
let copy_to_stmt = format!(
"Copy demo TO 's3://{}/{}/export/demo.parquet' CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')",
bucket, root, key_id, key, region
);
let output = execute_sql(&instance, &copy_to_stmt).await.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let output = execute_sql(&instance, &copy_to_stmt).await.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
struct Test<'a> {
sql: &'a str,
table_name: &'a str,
}
let tests = [
Test {
sql: &format!(
"Copy with_filename FROM 's3://{}/{}/export/demo.parquet'",
bucket, root
),
table_name: "with_filename",
},
Test {
sql: &format!("Copy with_path FROM 's3://{}/{}/export/'", bucket, root),
table_name: "with_path",
},
Test {
sql: &format!(
"Copy with_pattern FROM 's3://{}/{}/export/' WITH (PATTERN = 'demo.*')",
bucket, root
),
table_name: "with_pattern",
},
];
struct Test<'a> {
sql: &'a str,
table_name: &'a str,
}
let tests = [
Test {
sql: &format!(
"Copy with_filename FROM 's3://{}/{}/export/demo.parquet'",
bucket, root
),
table_name: "with_filename",
},
Test {
sql: &format!("Copy with_path FROM 's3://{}/{}/export/'", bucket, root),
table_name: "with_path",
},
Test {
sql: &format!(
"Copy with_pattern FROM 's3://{}/{}/export/' WITH (PATTERN = 'demo.*')",
bucket, root
),
table_name: "with_pattern",
},
];
for test in tests {
// import
assert!(matches!(
for test in tests {
// import
assert!(matches!(
execute_sql(
&instance,
&format!(
@@ -1709,30 +1735,29 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
.data,
OutputData::AffectedRows(0)
));
let sql = format!(
"{} CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')",
test.sql, key_id, key, region,
);
info!("Running sql: {}", sql);
let sql = format!(
"{} CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')",
test.sql, key_id, key, region,
);
info!("Running sql: {}", sql);
let output = execute_sql(&instance, &sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let output = execute_sql(&instance, &sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let output = execute_sql(
&instance,
&format!("select * from {} order by ts", test.table_name),
)
.await
.data;
let expected = "\
let output = execute_sql(
&instance,
&format!("select * from {} order by ts", test.table_name),
)
.await
.data;
let expected = "\
+-------+------+--------+---------------------+
| host | cpu | memory | ts |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+";
check_output_stream(output, expected).await;
}
check_output_stream(output, expected).await;
}
}
}

View File

@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use client::{OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, OutputData};
use common_meta::reconciliation::ResolveStrategy;
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_procedure::ProcedureManagerRef;
use common_recordbatch::util::collect_batches;
use common_test_util::recordbatch::check_output_stream;
@@ -22,8 +22,8 @@ use table::table_reference::TableReference;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::tests::test_util::{
dump_kvbackend, execute_sql, restore_kvbackend, try_execute_sql, wait_procedure, MockInstance,
MockInstanceBuilder, RebuildableMockInstance, TestContext,
MockInstance, MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend,
execute_sql, restore_kvbackend, try_execute_sql, wait_procedure,
};
const CREATE_MONITOR_TABLE_SQL: &str = r#"

View File

@@ -18,10 +18,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use client::OutputData;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::{BatchPutRequest, DeleteRangeRequest, RangeRequest};
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::KeyValue;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef};
use common_meta::rpc::store::{BatchPutRequest, DeleteRangeRequest, RangeRequest};
use common_procedure::{ProcedureId, ProcedureManagerRef, watcher};
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::tracing::info;
@@ -41,7 +41,7 @@ use session::context::{QueryContext, QueryContextRef};
use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
use crate::test_util::StorageType;
use crate::tests::{create_distributed_instance, MockDistributedInstance};
use crate::tests::{MockDistributedInstance, create_distributed_instance};
#[async_trait::async_trait]
pub(crate) trait RebuildableMockInstance: MockInstance {