diff --git a/.cargo/config.toml b/.cargo/config.toml index bf125a3eda..3bd4874fad 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -12,5 +12,9 @@ rustflags = [ "-Wclippy::print_stdout", "-Wclippy::print_stderr", "-Wclippy::implicit_clone", - "-Aclippy::items_after_test_module", + + # Clippy seems to raise a false positive here after upgrading the Rust toolchain to + # nightly-2023-08-07; we do need the reference to be borrowed mutably. + # Allow the lint for now; try disallowing it again when the toolchain is next upgraded. + "-Aclippy::needless_pass_by_ref_mut", ] diff --git a/.github/workflows/apidoc.yml b/.github/workflows/apidoc.yml index 89ac4f25b3..05653d9911 100644 --- a/.github/workflows/apidoc.yml +++ b/.github/workflows/apidoc.yml @@ -13,7 +13,7 @@ on: name: Build API docs env: - RUST_TOOLCHAIN: nightly-2023-05-03 + RUST_TOOLCHAIN: nightly-2023-08-07 jobs: apidoc: diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index bf27e6d002..f249087f41 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -25,7 +25,7 @@ on: name: CI env: - RUST_TOOLCHAIN: nightly-2023-05-03 + RUST_TOOLCHAIN: nightly-2023-08-07 jobs: typos: @@ -62,11 +62,11 @@ jobs: - uses: actions/checkout@v3 - uses: dtolnay/rust-toolchain@master with: - toolchain: ${{ env.RUST_TOOLCHAIN }} + toolchain: stable - name: Rust Cache uses: Swatinem/rust-cache@v2 - name: Install taplo - run: cargo install taplo-cli --version ^0.8 --locked + run: cargo +stable install taplo-cli --version ^0.8 --locked - name: Run taplo run: taplo format --check diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7b1e2448ff..3b3450b814 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -82,7 +82,7 @@ on: # Use env variables to control all the release process. env: # The arguments of building greptime. - RUST_TOOLCHAIN: nightly-2023-05-03 + RUST_TOOLCHAIN: nightly-2023-08-07 CARGO_PROFILE: nightly # Controls whether to run tests, include unit-test, integration-test and sqlness. diff --git a/Cargo.toml b/Cargo.toml index 00936e93df..0ebc528d33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ members = [ "tests-integration", "tests/runner", ] +resolver = "2" [workspace.package] version = "0.3.2" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 423d5fc911..e67b8a0caa 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly-2023-05-03" +channel = "nightly-2023-08-07" diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 2bcf476497..30c0d1a795 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -153,7 +153,12 @@ impl InformationSchemaColumnsBuilder { .table_names(&catalog_name, &schema_name) .await? { - let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue }; + let Some(table) = catalog_manager + .table(&catalog_name, &schema_name, &table_name) + .await?
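For context, a minimal sketch of what the newly allowed `needless_pass_by_ref_mut` lint flags (hypothetical names; this is not code from the repo):

```rust
// On nightly-2023-08-07, clippy flags `buf` here: the body never writes
// through it, so `&Vec<u8>` would suffice. The allow in .cargo/config.toml
// exists because the lint also fired on parameters this codebase
// genuinely needs to keep as `&mut`.
fn observe(buf: &mut Vec<u8>) -> usize {
    buf.len()
}

fn main() {
    let mut data = vec![1u8, 2, 3];
    assert_eq!(observe(&mut data), 3);
}
```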
+ else { + continue; + }; let keys = &table.table_info().meta.primary_key_indices; let schema = table.schema(); for (idx, column) in schema.column_schemas().iter().enumerate() { diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 2382e9fcf0..36b6f198ad 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -151,7 +151,12 @@ impl InformationSchemaTablesBuilder { .table_names(&catalog_name, &schema_name) .await? { - let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue }; + let Some(table) = catalog_manager + .table(&catalog_name, &schema_name, &table_name) + .await? + else { + continue; + }; let table_info = table.table_info(); self.add_table( &catalog_name, diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 02d8dfdd1d..d8c46ac8f3 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -218,13 +218,27 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec let primary_key_columns = build_primary_key_columns(entry_type, key); let mut columns_values = HashMap::with_capacity(6); - columns_values.extend(primary_key_columns.into_iter()); + columns_values.extend(primary_key_columns); let _ = columns_values.insert( "value".to_string(), @@ -523,7 +523,7 @@ mod tests { EngineConfig::default(), EngineImpl::new( StorageEngineConfig::default(), - Arc::new(NoopLogStore::default()), + Arc::new(NoopLogStore), object_store.clone(), noop_compaction_scheduler, ) @@ -574,9 +574,15 @@ mod tests { assert_eq!(batch.num_rows(), 1); let row = batch.rows().next().unwrap(); - let Value::UInt8(entry_type) = row[0] else { unreachable!() }; - let Value::Binary(key) = row[1].clone() else { unreachable!() }; - let Value::Binary(value) = row[3].clone() else { unreachable!() }; + let Value::UInt8(entry_type) = row[0] else { + unreachable!() + }; + let Value::Binary(key) = row[1].clone() else { + unreachable!() + }; + let Value::Binary(value) = row[3].clone() else { + unreachable!() + }; let entry = decode_system_catalog(Some(entry_type), Some(&*key), Some(&*value)).unwrap(); let expected = Entry::Table(TableEntry { catalog_name: DEFAULT_CATALOG_NAME.to_string(), diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 075234da3e..dc6005cd49 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -313,17 +313,11 @@ mod tests { .await .is_ok()); assert_eq!( - HashSet::::from_iter( - vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter() - ), - HashSet::from_iter( - components - .catalog_manager - .catalog_names() - .await - .unwrap() - .into_iter() - ) + HashSet::::from_iter(vec![ + DEFAULT_CATALOG_NAME.to_string(), + catalog_name.clone() + ]), + HashSet::from_iter(components.catalog_manager.catalog_names().await.unwrap()) ); let table_to_register = components diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 279a8d0086..ad5e6840a6 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -243,8 +243,10 @@ mod tests { ..Default::default() }; - let Options::Datanode(options) = - cmd.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() }; + let Options::Datanode(options) = cmd.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr); assert_eq!(Some(42), 
options.node_id); @@ -397,10 +399,10 @@ mod tests { let env_prefix = "DATANODE_UT"; temp_env::with_vars( - vec![ + [ ( // storage.manifest.gc_duration = 9s - vec![ + [ env_prefix.to_string(), "storage".to_uppercase(), "manifest".to_uppercase(), @@ -411,7 +413,7 @@ mod tests { ), ( // storage.compaction.max_purge_tasks = 99 - vec![ + [ env_prefix.to_string(), "storage".to_uppercase(), "compaction".to_uppercase(), @@ -422,7 +424,7 @@ mod tests { ), ( // meta_client_options.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003 - vec![ + [ env_prefix.to_string(), "meta_client_options".to_uppercase(), "metasrv_addrs".to_uppercase(), @@ -440,7 +442,10 @@ mod tests { }; let Options::Datanode(opts) = - command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()}; + command.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; // Should be read from env, env > default values. assert_eq!( diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 4d25c17247..bce8496db3 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -257,8 +257,10 @@ mod tests { ..Default::default() }; - let Options::Frontend(opts) = - command.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() }; + let Options::Frontend(opts) = command.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; assert_eq!(opts.http_options.as_ref().unwrap().addr, "127.0.0.1:1234"); assert_eq!( @@ -323,8 +325,10 @@ mod tests { ..Default::default() }; - let Options::Frontend(fe_opts) = - command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()}; + let Options::Frontend(fe_opts) = command.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; assert_eq!(Mode::Distributed, fe_opts.mode); assert_eq!( "127.0.0.1:4000".to_string(), @@ -404,10 +408,10 @@ mod tests { let env_prefix = "FRONTEND_UT"; temp_env::with_vars( - vec![ + [ ( // mysql_options.addr = 127.0.0.1:14002 - vec![ + [ env_prefix.to_string(), "mysql_options".to_uppercase(), "addr".to_uppercase(), @@ -417,7 +421,7 @@ mod tests { ), ( // mysql_options.runtime_size = 11 - vec![ + [ env_prefix.to_string(), "mysql_options".to_uppercase(), "runtime_size".to_uppercase(), @@ -427,7 +431,7 @@ mod tests { ), ( // http_options.addr = 127.0.0.1:24000 - vec![ + [ env_prefix.to_string(), "http_options".to_uppercase(), "addr".to_uppercase(), @@ -437,7 +441,7 @@ mod tests { ), ( // meta_client_options.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003 - vec![ + [ env_prefix.to_string(), "meta_client_options".to_uppercase(), "metasrv_addrs".to_uppercase(), @@ -458,8 +462,10 @@ mod tests { log_dir: None, log_level: Some("error".to_string()), }; - let Options::Frontend(fe_opts) = - command.load_options(top_level_opts).unwrap() else {unreachable!()}; + let Options::Frontend(fe_opts) = command.load_options(top_level_opts).unwrap() + else { + unreachable!() + }; // Should be read from env, env > default values. 
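Most hunks in this patch are mechanical: rustfmt on nightly-2023-08-07 learned to format `let ... else`, so the `else` now lands on its own line. A hedged sketch of the construct with hypothetical names:

```rust
// The else arm of let-else must diverge (return/continue/break/panic),
// which is why every reformatted arm in this diff ends with one of those.
// On the happy path, `options` is bound with no extra nesting.
fn port_of(options: Option<&str>) -> u16 {
    let Some(options) = options else {
        return 4000; // hypothetical default port
    };
    options.parse().unwrap_or(4000)
}

fn main() {
    assert_eq!(port_of(Some("3001")), 3001);
    assert_eq!(port_of(None), 4000);
}
```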
assert_eq!(fe_opts.mysql_options.as_ref().unwrap().runtime_size, 11); diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 9b1ac37824..3490ba7e4b 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -187,8 +187,10 @@ mod tests { ..Default::default() }; - let Options::Metasrv(options) = - cmd.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() }; + let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("127.0.0.1:2380".to_string(), options.store_addr); assert_eq!(SelectorType::LoadBased, options.selector); @@ -216,8 +218,10 @@ mod tests { ..Default::default() }; - let Options::Metasrv(options) = - cmd.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() }; + let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.server_addr); assert_eq!("127.0.0.1:2379".to_string(), options.store_addr); @@ -269,20 +273,20 @@ mod tests { let env_prefix = "METASRV_UT"; temp_env::with_vars( - vec![ + [ ( // bind_addr = 127.0.0.1:14002 - vec![env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP), + [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP), Some("127.0.0.1:14002"), ), ( // server_addr = 127.0.0.1:13002 - vec![env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP), + [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP), Some("127.0.0.1:13002"), ), ( // http_options.addr = 127.0.0.1:24000 - vec![ + [ env_prefix.to_string(), "http_options".to_uppercase(), "addr".to_uppercase(), @@ -300,7 +304,10 @@ mod tests { }; let Options::Metasrv(opts) = - command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()}; + command.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; // Should be read from env, env > default values. assert_eq!(opts.bind_addr, "127.0.0.1:14002"); diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index d633b851c7..9dc4add536 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -158,10 +158,10 @@ mod tests { let env_prefix = "DATANODE_UT"; temp_env::with_vars( // The following environment variables will be used to override the values in the config file. 
- vec![ + [ ( // storage.manifest.checkpoint_margin = 99 - vec![ + [ env_prefix.to_string(), "storage".to_uppercase(), "manifest".to_uppercase(), @@ -172,7 +172,7 @@ mod tests { ), ( // storage.type = S3 - vec![ + [ env_prefix.to_string(), "storage".to_uppercase(), "type".to_uppercase(), @@ -182,7 +182,7 @@ mod tests { ), ( // storage.bucket = mybucket - vec![ + [ env_prefix.to_string(), "storage".to_uppercase(), "bucket".to_uppercase(), @@ -192,7 +192,7 @@ mod tests { ), ( // storage.manifest.gc_duration = 42s - vec![ + [ env_prefix.to_string(), "storage".to_uppercase(), "manifest".to_uppercase(), @@ -203,7 +203,7 @@ mod tests { ), ( // storage.manifest.checkpoint_on_startup = true - vec![ + [ env_prefix.to_string(), "storage".to_uppercase(), "manifest".to_uppercase(), @@ -214,7 +214,7 @@ mod tests { ), ( // wal.dir = /other/wal/dir - vec![ + [ env_prefix.to_string(), "wal".to_uppercase(), "dir".to_uppercase(), @@ -224,7 +224,7 @@ mod tests { ), ( // meta_client_options.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003 - vec![ + [ env_prefix.to_string(), "meta_client_options".to_uppercase(), "metasrv_addrs".to_uppercase(), diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f607a3b796..076ce37848 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -423,7 +423,10 @@ mod tests { ..Default::default() }; - let Options::Standalone(options) = cmd.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()}; + let Options::Standalone(options) = cmd.load_options(TopLevelOptions::default()).unwrap() + else { + unreachable!() + }; let fe_opts = options.fe_opts; let dn_opts = options.dn_opts; let logging_opts = options.logging; @@ -484,7 +487,8 @@ mod tests { log_dir: Some("/tmp/greptimedb/test/logs".to_string()), log_level: Some("debug".to_string()), }) - .unwrap() else { + .unwrap() + else { unreachable!() }; @@ -508,10 +512,10 @@ mod tests { let env_prefix = "STANDALONE_UT"; temp_env::with_vars( - vec![ + [ ( // logging.dir = /other/log/dir - vec![ + [ env_prefix.to_string(), "logging".to_uppercase(), "dir".to_uppercase(), @@ -521,7 +525,7 @@ mod tests { ), ( // logging.level = info - vec![ + [ env_prefix.to_string(), "logging".to_uppercase(), "level".to_uppercase(), @@ -531,7 +535,7 @@ mod tests { ), ( // http_options.addr = 127.0.0.1:24000 - vec![ + [ env_prefix.to_string(), "http_options".to_uppercase(), "addr".to_uppercase(), @@ -552,8 +556,10 @@ mod tests { log_dir: None, log_level: None, }; - let Options::Standalone(opts) = - command.load_options(top_level_opts).unwrap() else {unreachable!()}; + let Options::Standalone(opts) = command.load_options(top_level_opts).unwrap() + else { + unreachable!() + }; // Should be read from env, env > default values. 
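The test hunks around here also swap `vec![...]` for plain arrays wherever the value is only iterated or joined; clippy's `useless_vec` lint now catches these. A small before/after sketch (`ENV_VAR_SEP` is a stand-in for the separator constant these tests use):

```rust
const ENV_VAR_SEP: &str = "__"; // assumed value, for illustration only

fn main() {
    // Before: clippy::useless_vec fires -- the Vec is allocated just to be joined.
    let key = vec!["DATANODE_UT".to_string(), "BIND_ADDR".to_string()].join(ENV_VAR_SEP);
    // After: a fixed-size array joins identically, with no container allocation.
    let key2 = ["DATANODE_UT".to_string(), "BIND_ADDR".to_string()].join(ENV_VAR_SEP);
    assert_eq!(key, key2);
}
```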
assert_eq!(opts.logging.dir, "/other/log/dir"); diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs index 350fdd9cad..245e24e643 100644 --- a/src/common/datasource/src/file_format/orc.rs +++ b/src/common/datasource/src/file_format/orc.rs @@ -198,9 +198,8 @@ mod tests { #[tokio::test] async fn test_orc_infer_schema() { - let orc = OrcFormat::default(); let store = test_store(&test_data_root()); - let schema = orc.infer_schema(&store, "test.orc").await.unwrap(); + let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap(); let formatted: Vec<_> = format_schema(schema); assert_eq!( diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index 51f2fd4a90..e95b74bb65 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -181,7 +181,7 @@ async fn test_parquet_exec() { .await; assert_batches_eq!( - vec![ + [ "+-----+-------+", "| num | str |", "+-----+-------+", @@ -198,8 +198,7 @@ async fn test_parquet_exec() { async fn test_orc_opener() { let root = test_util::get_data_dir("tests/orc").display().to_string(); let store = test_store(&root); - let orc = OrcFormat::default(); - let schema = orc.infer_schema(&store, "test.orc").await.unwrap(); + let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap(); let schema = Arc::new(schema); let orc_opener = OrcOpener::new(store.clone(), schema.clone(), None); diff --git a/src/common/function-macro/Cargo.toml b/src/common/function-macro/Cargo.toml index 2b3a19844a..f76e6f8f57 100644 --- a/src/common/function-macro/Cargo.toml +++ b/src/common/function-macro/Cargo.toml @@ -10,7 +10,7 @@ proc-macro = true [dependencies] backtrace = "0.3" common-telemetry = { path = "../telemetry" } -proc-macro2 = "1.0" +proc-macro2 = "1.0.66" quote = "1.0" syn = "1.0" diff --git a/src/common/function-macro/src/lib.rs b/src/common/function-macro/src/lib.rs index 790b6ac5b0..c0cc670454 100644 --- a/src/common/function-macro/src/lib.rs +++ b/src/common/function-macro/src/lib.rs @@ -146,7 +146,9 @@ pub fn print_caller(args: TokenStream, input: TokenStream) -> TokenStream { .expect("Expected an ident!") .to_string(); if ident == "depth" { - let Lit::Int(i) = &name_value.lit else { panic!("Expected 'depth' to be a valid int!") }; + let Lit::Int(i) = &name_value.lit else { + panic!("Expected 'depth' to be a valid int!") + }; depth = i.base10_parse::().expect("Invalid 'depth' value"); break; } diff --git a/src/common/function/src/scalars/function_registry.rs b/src/common/function/src/scalars/function_registry.rs index dd9de0b651..1fd536982e 100644 --- a/src/common/function/src/scalars/function_registry.rs +++ b/src/common/function/src/scalars/function_registry.rs @@ -89,7 +89,7 @@ mod tests { #[test] fn test_function_registry() { let registry = FunctionRegistry::default(); - let func = Arc::new(TestAndFunction::default()); + let func = Arc::new(TestAndFunction); assert!(registry.get_function("test_and").is_none()); assert!(registry.functions().is_empty()); diff --git a/src/common/function/src/scalars/math.rs b/src/common/function/src/scalars/math.rs index f15d782bf7..9329bc9448 100644 --- a/src/common/function/src/scalars/math.rs +++ b/src/common/function/src/scalars/math.rs @@ -26,7 +26,7 @@ pub(crate) struct MathFunction; impl MathFunction { pub fn register(registry: &FunctionRegistry) { - registry.register(Arc::new(PowFunction::default())); - 
registry.register(Arc::new(RateFunction::default())) + registry.register(Arc::new(PowFunction)); + registry.register(Arc::new(RateFunction)) } } diff --git a/src/common/function/src/scalars/math/pow.rs b/src/common/function/src/scalars/math/pow.rs index c6a554d596..5a1922a4fc 100644 --- a/src/common/function/src/scalars/math/pow.rs +++ b/src/common/function/src/scalars/math/pow.rs @@ -85,7 +85,7 @@ mod tests { use super::*; #[test] fn test_pow_function() { - let pow = PowFunction::default(); + let pow = PowFunction; assert_eq!("pow", pow.name()); assert_eq!( diff --git a/src/common/function/src/scalars/math/rate.rs b/src/common/function/src/scalars/math/rate.rs index 9d668d39d5..6696ea1c00 100644 --- a/src/common/function/src/scalars/math/rate.rs +++ b/src/common/function/src/scalars/math/rate.rs @@ -80,7 +80,7 @@ mod tests { use super::*; #[test] fn test_rate_function() { - let rate = RateFunction::default(); + let rate = RateFunction; assert_eq!("prom_rate", rate.name()); assert_eq!( ConcreteDataType::float64_datatype(), diff --git a/src/common/function/src/scalars/numpy.rs b/src/common/function/src/scalars/numpy.rs index 67c42ffb56..7e1c1145bd 100644 --- a/src/common/function/src/scalars/numpy.rs +++ b/src/common/function/src/scalars/numpy.rs @@ -25,6 +25,6 @@ pub(crate) struct NumpyFunction; impl NumpyFunction { pub fn register(registry: &FunctionRegistry) { - registry.register(Arc::new(ClipFunction::default())); + registry.register(Arc::new(ClipFunction)); } } diff --git a/src/common/function/src/scalars/numpy/clip.rs b/src/common/function/src/scalars/numpy/clip.rs index a8d098e5cd..cdcc4d5622 100644 --- a/src/common/function/src/scalars/numpy/clip.rs +++ b/src/common/function/src/scalars/numpy/clip.rs @@ -156,7 +156,7 @@ mod tests { #[test] fn test_clip_signature() { - let clip = ClipFunction::default(); + let clip = ClipFunction; assert_eq!("clip", clip.name()); assert_eq!( @@ -202,8 +202,6 @@ mod tests { #[test] fn test_clip_fn_signed() { - let clip = ClipFunction::default(); - // eval with signed integers let args: Vec = vec![ Arc::new(Int32Vector::from_values(0..10)), @@ -217,7 +215,9 @@ mod tests { )), ]; - let vector = clip.eval(FunctionContext::default(), &args).unwrap(); + let vector = ClipFunction + .eval(FunctionContext::default(), &args) + .unwrap(); assert_eq!(10, vector.len()); // clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6] @@ -234,8 +234,6 @@ mod tests { #[test] fn test_clip_fn_unsigned() { - let clip = ClipFunction::default(); - // eval with unsigned integers let args: Vec = vec![ Arc::new(UInt8Vector::from_values(0..10)), @@ -249,7 +247,9 @@ mod tests { )), ]; - let vector = clip.eval(FunctionContext::default(), &args).unwrap(); + let vector = ClipFunction + .eval(FunctionContext::default(), &args) + .unwrap(); assert_eq!(10, vector.len()); // clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6] @@ -266,8 +266,6 @@ mod tests { #[test] fn test_clip_fn_float() { - let clip = ClipFunction::default(); - // eval with floats let args: Vec = vec![ Arc::new(Int8Vector::from_values(0..10)), @@ -281,7 +279,9 @@ mod tests { )), ]; - let vector = clip.eval(FunctionContext::default(), &args).unwrap(); + let vector = ClipFunction + .eval(FunctionContext::default(), &args) + .unwrap(); assert_eq!(10, vector.len()); // clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6] diff --git a/src/common/function/src/scalars/numpy/interp.rs b/src/common/function/src/scalars/numpy/interp.rs index 
9d20fc3f9c..ed7345a601 100644 --- a/src/common/function/src/scalars/numpy/interp.rs +++ b/src/common/function/src/scalars/numpy/interp.rs @@ -291,7 +291,7 @@ mod tests { ]; let vector = interp(&args).unwrap(); assert_eq!(4, vector.len()); - let res = vec![3.0, 3.0, 2.5, 0.0]; + let res = [3.0, 3.0, 2.5, 0.0]; for (i, item) in res.iter().enumerate().take(vector.len()) { assert!(matches!(vector.get(i),Value::Float64(v) if v==*item)); } @@ -305,7 +305,7 @@ mod tests { let left = vec![-1]; let right = vec![2]; - let expect = vec![-1.0, 3.0, 2.5, 2.0, 0.0, 2.0]; + let expect = [-1.0, 3.0, 2.5, 2.0, 0.0, 2.0]; let args: Vec = vec![ Arc::new(Float64Vector::from_vec(x)), diff --git a/src/common/function/src/scalars/timestamp.rs b/src/common/function/src/scalars/timestamp.rs index 475e1c150f..eb0e1afb1c 100644 --- a/src/common/function/src/scalars/timestamp.rs +++ b/src/common/function/src/scalars/timestamp.rs @@ -22,6 +22,6 @@ pub(crate) struct TimestampFunction; impl TimestampFunction { pub fn register(registry: &FunctionRegistry) { - registry.register(Arc::new(ToUnixtimeFunction::default())); + registry.register(Arc::new(ToUnixtimeFunction)); } } diff --git a/src/common/function/src/scalars/timestamp/to_unixtime.rs b/src/common/function/src/scalars/timestamp/to_unixtime.rs index e07adc793d..9970565d34 100644 --- a/src/common/function/src/scalars/timestamp/to_unixtime.rs +++ b/src/common/function/src/scalars/timestamp/to_unixtime.rs @@ -162,7 +162,7 @@ mod tests { #[test] fn test_string_to_unixtime() { - let f = ToUnixtimeFunction::default(); + let f = ToUnixtimeFunction; assert_eq!("to_unixtime", f.name()); assert_eq!( ConcreteDataType::int64_datatype(), @@ -190,7 +190,7 @@ mod tests { Some("2022-06-30T23:59:60Z"), Some("invalid_time_stamp"), ]; - let results = vec![Some(1677652502), None, Some(1656633600), None]; + let results = [Some(1677652502), None, Some(1656633600), None]; let args: Vec = vec![Arc::new(StringVector::from(times.clone()))]; let vector = f.eval(FunctionContext::default(), &args).unwrap(); assert_eq!(4, vector.len()); @@ -211,7 +211,7 @@ mod tests { #[test] fn test_int_to_unixtime() { - let f = ToUnixtimeFunction::default(); + let f = ToUnixtimeFunction; assert_eq!("to_unixtime", f.name()); assert_eq!( ConcreteDataType::int64_datatype(), @@ -234,7 +234,7 @@ mod tests { )); let times = vec![Some(3_i64), None, Some(5_i64), None]; - let results = vec![Some(3), None, Some(5), None]; + let results = [Some(3), None, Some(5), None]; let args: Vec = vec![Arc::new(Int64Vector::from(times.clone()))]; let vector = f.eval(FunctionContext::default(), &args).unwrap(); assert_eq!(4, vector.len()); @@ -255,7 +255,7 @@ mod tests { #[test] fn test_timestamp_to_unixtime() { - let f = ToUnixtimeFunction::default(); + let f = ToUnixtimeFunction; assert_eq!("to_unixtime", f.name()); assert_eq!( ConcreteDataType::int64_datatype(), @@ -283,7 +283,7 @@ mod tests { Some(TimestampSecond::new(42)), None, ]; - let results = vec![Some(123), None, Some(42), None]; + let results = [Some(123), None, Some(42), None]; let ts_vector: TimestampSecondVector = build_vector_from_slice(×); let args: Vec = vec![Arc::new(ts_vector)]; let vector = f.eval(FunctionContext::default(), &args).unwrap(); diff --git a/src/common/function/src/scalars/udf.rs b/src/common/function/src/scalars/udf.rs index cd0ac48935..bf332720a2 100644 --- a/src/common/function/src/scalars/udf.rs +++ b/src/common/function/src/scalars/udf.rs @@ -77,7 +77,7 @@ mod tests { #[test] fn test_create_udf() { - let f = Arc::new(TestAndFunction::default()); 
+ let f = Arc::new(TestAndFunction); let args: Vec = vec![ Arc::new(ConstantVector::new( diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 09928d1efc..9ecaaa6ae8 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -249,7 +249,9 @@ mod test { ) .unwrap(); assert_eq!(flight_data.len(), 3); - let [d1, d2, d3] = flight_data.as_slice() else { unreachable!() }; + let [d1, d2, d3] = flight_data.as_slice() else { + unreachable!() + }; let decoder = &mut FlightDecoder::default(); assert!(decoder.schema.is_none()); @@ -263,19 +265,25 @@ mod test { let message = decoder.try_decode(d1.clone()).unwrap(); assert!(matches!(message, FlightMessage::Schema(_))); - let FlightMessage::Schema(decoded_schema) = message else { unreachable!() }; + let FlightMessage::Schema(decoded_schema) = message else { + unreachable!() + }; assert_eq!(decoded_schema, schema); let _ = decoder.schema.as_ref().unwrap(); let message = decoder.try_decode(d2.clone()).unwrap(); assert!(matches!(message, FlightMessage::Recordbatch(_))); - let FlightMessage::Recordbatch(actual_batch) = message else { unreachable!() }; + let FlightMessage::Recordbatch(actual_batch) = message else { + unreachable!() + }; assert_eq!(actual_batch, batch1); let message = decoder.try_decode(d3.clone()).unwrap(); assert!(matches!(message, FlightMessage::Recordbatch(_))); - let FlightMessage::Recordbatch(actual_batch) = message else { unreachable!() }; + let FlightMessage::Recordbatch(actual_batch) = message else { + unreachable!() + }; assert_eq!(actual_batch, batch2); } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index f4d4954d08..3f621800ba 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -123,7 +123,8 @@ impl DatanodeTableManager { let Some(curr) = resp .prev_kv .map(|kv| DatanodeTableValue::try_from_raw_value(kv.value)) - .transpose()? else { + .transpose()? + else { return UnexpectedSnafu { err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, val: {val:?}"), }.fail(); diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index c7650931f2..a86b007cb3 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -81,7 +81,7 @@ impl TableInfoManager { let Some(curr) = curr else { return UnexpectedSnafu { err_msg: format!("compare_and_put expect None but failed with current value None, table_id: {table_id}, table_info: {table_info:?}"), - }.fail() + }.fail(); }; ensure!( &curr.table_info == table_info, diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 19dbd9c772..3b729ab20e 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -163,10 +163,11 @@ impl TableNameManager { let Some(curr) = result .prev_kv .map(|x| TableNameValue::try_from_raw_value(x.value)) - .transpose()? else { + .transpose()? + else { return UnexpectedSnafu { err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, value: {value:?}"), - }.fail() + }.fail(); }; ensure!( curr.table_id == table_id, @@ -226,7 +227,8 @@ impl TableNameManager { // name, then the table must not exist at the first place. 
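Many of the hunks above replace `Foo::default()` with the bare unit struct `Foo`, following the new `clippy::default_constructed_unit_structs` lint. A minimal sketch using one of the touched names as a stand-in:

```rust
#[derive(Debug, Default, PartialEq)]
struct PowFunction; // unit struct: its only value is `PowFunction` itself

fn main() {
    // Before: routed through Default for no benefit.
    let a = PowFunction::default();
    // After: the literal is the value; this is what the diff migrates to.
    let b = PowFunction;
    assert_eq!(a, b);
}
```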
return TableNotExistSnafu { table_name: TableName::from(key).to_string(), - }.fail(); + } + .fail(); }; ensure!( diff --git a/src/common/meta/src/key/table_region.rs b/src/common/meta/src/key/table_region.rs index 05f9caaf4b..c69ed96cd2 100644 --- a/src/common/meta/src/key/table_region.rs +++ b/src/common/meta/src/key/table_region.rs @@ -91,7 +91,7 @@ impl TableRegionManager { let Some(curr) = curr else { return UnexpectedSnafu { err_msg: format!("compare_and_put expect None but failed with current value None, table_id: {table_id}, region_distribution: {region_distribution:?}"), - }.fail() + }.fail(); }; ensure!( &curr.region_distribution == region_distribution, diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 719403e441..02560add57 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -236,7 +236,7 @@ impl KvBackend for MemoryKvBackend { start: key, end: range_end, }; - kvs.drain_filter(|key, _| range.contains(key)) + kvs.extract_if(|key, _| range.contains(key)) .map(Into::into) .collect::>() }; diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 70fe566b5c..e393ec5145 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(btree_drain_filter)] +#![feature(btree_extract_if)] pub mod error; pub mod heartbeat; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index d32efdb494..0848c7029d 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -474,7 +474,10 @@ impl ProcedureManager for LocalManager { if message.parent_id.is_none() { // This is the root procedure. We only submit the root procedure as it will // submit sub-procedures to the manager. - let Some(loaded_procedure) = self.manager_ctx.load_one_procedure_from_message(*procedure_id, message) else { + let Some(loaded_procedure) = self + .manager_ctx + .load_one_procedure_from_message(*procedure_id, message) + else { // Try to load other procedures. 
continue; }; diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs index 5744b4f486..e5abddc4d2 100644 --- a/src/common/query/src/logical_plan/expr.rs +++ b/src/common/query/src/logical_plan/expr.rs @@ -44,7 +44,9 @@ pub fn build_filter_from_timestamp( ts_col_name: &str, time_range: Option<&TimestampRange>, ) -> Option { - let Some(time_range) = time_range else { return None; }; + let Some(time_range) = time_range else { + return None; + }; let ts_col_expr = DfExpr::Column(Column { relation: None, name: ts_col_name.to_string(), diff --git a/src/common/substrait/src/extension_serializer.rs b/src/common/substrait/src/extension_serializer.rs index 926912230f..813c525843 100644 --- a/src/common/substrait/src/extension_serializer.rs +++ b/src/common/substrait/src/extension_serializer.rs @@ -60,7 +60,7 @@ impl SerializerRegistry for ExtensionSerializer { name if name == EmptyMetric::name() => Err(DataFusionError::Substrait( "EmptyMetric should not be serialized".to_string(), )), - name if name == "MergeScan" => Ok(vec![]), + "MergeScan" => Ok(vec![]), other => Err(DataFusionError::NotImplemented(format!( "Serizlize logical plan for {}", other diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index aa3f9a5706..b77d11aaff 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -50,7 +50,8 @@ impl HeartbeatResponseHandler for CloseRegionHandler { } async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { - let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else { + let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() + else { unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'"); }; diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index a038b85254..c2bbc6848d 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -50,7 +50,8 @@ impl HeartbeatResponseHandler for OpenRegionHandler { } async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { - let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else { + let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() + else { unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'"); }; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 4a8f0f5f97..9c654f9b3f 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -131,7 +131,7 @@ impl Instance { state: "region_alive_keepers is not provided when building heartbeat task", })?; let handlers_executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler::default()), + Arc::new(ParseMailboxMessageHandler), Arc::new(OpenRegionHandler::new( catalog_manager.clone(), engine_manager.clone(), diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index f02ff845ea..86f291b1c0 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -721,7 +721,9 @@ mod test { let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql( "INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)", - ) else { unreachable!() }; + ) else { + unreachable!() + }; let 
output = instance .execute_sql(stmt, QueryContext::arc()) .await @@ -729,7 +731,9 @@ mod test { assert!(matches!(output, Output::AffectedRows(1))); let output = exec_selection(instance, "SELECT * FROM my_database.my_table").await; - let Output::Stream(stream) = output else { unreachable!() }; + let Output::Stream(stream) = output else { + unreachable!() + }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +---+---+---+---------------------+---+ @@ -855,7 +859,9 @@ mod test { assert!(matches!(output, Output::AffectedRows(3))); let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await; - let Output::Stream(stream) = output else { unreachable!() }; + let Output::Stream(stream) = output else { + unreachable!() + }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +---------------------+-------+-----+ @@ -925,7 +931,9 @@ mod test { assert!(matches!(output, Output::AffectedRows(1))); let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await; - let Output::Stream(stream) = output else { unreachable!() }; + let Output::Stream(stream) = output else { + unreachable!() + }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +---------------------+-------+------+ @@ -965,7 +973,9 @@ mod test { )), }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); - let Output::Stream(stream) = output else { unreachable!() }; + let Output::Stream(stream) = output else { + unreachable!() + }; let recordbatch = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +---------------------+-------+------+--------+ diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index b5e7fb3b7d..57b90cddb6 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -474,7 +474,9 @@ mod tests { TIME INDEX (ts), PRIMARY KEY(host) ) engine=mito with(regions=1);"#; - let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() }; + let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { + unreachable!() + }; let output = instance .inner() .execute_sql(stmt, QueryContext::arc()) @@ -491,7 +493,9 @@ mod tests { TIME INDEX (ts), PRIMARY KEY(host) ) engine=mito with(regions=1);"#; - let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() }; + let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { + unreachable!() + }; let output = instance .inner() .execute_sql(stmt, QueryContext::arc()) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index d0c4e23d66..3de50a8e6f 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -154,7 +154,9 @@ async fn test_open_region_handler() { ])); let instruction = open_region_instruction(); - let Instruction::OpenRegion(region_ident) = instruction.clone() else { unreachable!() }; + let Instruction::OpenRegion(region_ident) = instruction.clone() else { + unreachable!() + }; let table_ident = ®ion_ident.table_ident; let table = prepare_table(instance.inner()).await; diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index c7cb67a7fa..56137ae393 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -278,23 +278,19 @@ impl_new_concrete_type_functions!( impl ConcreteDataType { pub fn timestamp_second_datatype() -> Self { - 
ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType::default())) + ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)) } pub fn timestamp_millisecond_datatype() -> Self { - ConcreteDataType::Timestamp(TimestampType::Millisecond( - TimestampMillisecondType::default(), - )) + ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)) } pub fn timestamp_microsecond_datatype() -> Self { - ConcreteDataType::Timestamp(TimestampType::Microsecond( - TimestampMicrosecondType::default(), - )) + ConcreteDataType::Timestamp(TimestampType::Microsecond(TimestampMicrosecondType)) } pub fn timestamp_nanosecond_datatype() -> Self { - ConcreteDataType::Timestamp(TimestampType::Nanosecond(TimestampNanosecondType::default())) + ConcreteDataType::Timestamp(TimestampType::Nanosecond(TimestampNanosecondType)) } /// Returns the time data type with `TimeUnit`. @@ -323,17 +319,15 @@ impl ConcreteDataType { } pub fn interval_month_day_nano_datatype() -> Self { - ConcreteDataType::Interval(IntervalType::MonthDayNano( - IntervalMonthDayNanoType::default(), - )) + ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)) } pub fn interval_year_month_datatype() -> Self { - ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType::default())) + ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType)) } pub fn interval_day_time_datatype() -> Self { - ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType::default())) + ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType)) } pub fn timestamp_datatype(unit: TimeUnit) -> Self { diff --git a/src/datatypes/src/types/time_type.rs b/src/datatypes/src/types/time_type.rs index 30548b73bd..8f60473546 100644 --- a/src/datatypes/src/types/time_type.rs +++ b/src/datatypes/src/types/time_type.rs @@ -202,19 +202,19 @@ mod tests { fn test_as_arrow_datatype() { assert_eq!( ArrowDataType::Time32(ArrowTimeUnit::Second), - TimeSecondType::default().as_arrow_type() + TimeSecondType.as_arrow_type() ); assert_eq!( ArrowDataType::Time32(ArrowTimeUnit::Millisecond), - TimeMillisecondType::default().as_arrow_type() + TimeMillisecondType.as_arrow_type() ); assert_eq!( ArrowDataType::Time64(ArrowTimeUnit::Microsecond), - TimeMicrosecondType::default().as_arrow_type() + TimeMicrosecondType.as_arrow_type() ); assert_eq!( ArrowDataType::Time64(ArrowTimeUnit::Nanosecond), - TimeNanosecondType::default().as_arrow_type() + TimeNanosecondType.as_arrow_type() ); } } diff --git a/src/datatypes/src/types/timestamp_type.rs b/src/datatypes/src/types/timestamp_type.rs index 3afc2e3f49..d0458073ea 100644 --- a/src/datatypes/src/types/timestamp_type.rs +++ b/src/datatypes/src/types/timestamp_type.rs @@ -66,16 +66,10 @@ impl TryFrom for TimestampType { /// - 9: nanosecond fn try_from(value: u64) -> Result { match value { - SECOND_VARIATION => Ok(TimestampType::Second(TimestampSecondType::default())), - MILLISECOND_VARIATION => Ok(TimestampType::Millisecond( - TimestampMillisecondType::default(), - )), - MICROSECOND_VARIATION => Ok(TimestampType::Microsecond( - TimestampMicrosecondType::default(), - )), - NANOSECOND_VARIATION => { - Ok(TimestampType::Nanosecond(TimestampNanosecondType::default())) - } + SECOND_VARIATION => Ok(TimestampType::Second(TimestampSecondType)), + MILLISECOND_VARIATION => Ok(TimestampType::Millisecond(TimestampMillisecondType)), + MICROSECOND_VARIATION => Ok(TimestampType::Microsecond(TimestampMicrosecondType)), + NANOSECOND_VARIATION => 
Ok(TimestampType::Nanosecond(TimestampNanosecondType)), _ => InvalidTimestampPrecisionSnafu { precision: value }.fail(), } } diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index 5b9ed3b442..cd53838d62 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -342,7 +342,7 @@ mod tests { fn test_binary_vector_builder() { let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]); - let mut builder = BinaryType::default().create_mutable_vector(3); + let mut builder = BinaryType.create_mutable_vector(3); builder.push_value_ref(ValueRef::Binary("hello".as_bytes())); assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index a1714b2549..6aa483cf37 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -360,7 +360,7 @@ mod tests { fn test_boolean_vector_builder() { let input = BooleanVector::from_slice(&[true, false, true]); - let mut builder = BooleanType::default().create_mutable_vector(3); + let mut builder = BooleanType.create_mutable_vector(3); builder.push_value_ref(ValueRef::Boolean(true)); assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); diff --git a/src/datatypes/src/vectors/date.rs b/src/datatypes/src/vectors/date.rs index dda3d8777c..31d81e2d3b 100644 --- a/src/datatypes/src/vectors/date.rs +++ b/src/datatypes/src/vectors/date.rs @@ -68,7 +68,7 @@ mod tests { fn test_date_vector_builder() { let input = DateVector::from_slice([1, 2, 3]); - let mut builder = DateType::default().create_mutable_vector(3); + let mut builder = DateType.create_mutable_vector(3); builder.push_value_ref(ValueRef::Date(Date::new(5))); assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); diff --git a/src/datatypes/src/vectors/datetime.rs b/src/datatypes/src/vectors/datetime.rs index e30dda3372..55cd901561 100644 --- a/src/datatypes/src/vectors/datetime.rs +++ b/src/datatypes/src/vectors/datetime.rs @@ -88,7 +88,7 @@ mod tests { DateTime::new(3), ]); - let mut builder = DateTimeType::default().create_mutable_vector(3); + let mut builder = DateTimeType.create_mutable_vector(3); builder.push_value_ref(ValueRef::DateTime(DateTime::new(5))); assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index c10a1b3dc5..88c6185eef 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -58,7 +58,7 @@ impl From for NullVector { impl Vector for NullVector { fn data_type(&self) -> ConcreteDataType { - ConcreteDataType::Null(NullType::default()) + ConcreteDataType::Null(NullType) } fn vector_type_name(&self) -> String { @@ -269,7 +269,7 @@ mod tests { #[test] fn test_null_vector_builder() { - let mut builder = NullType::default().create_mutable_vector(3); + let mut builder = NullType.create_mutable_vector(3); builder.push_null(); assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 54ba053c08..1c7228974e 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -407,18 +407,26 @@ impl CatalogManager for FrontendCatalogManager { } let key = 
TableNameKey::new(catalog, schema, table_name); - let Some(table_name_value) = self.table_metadata_manager + let Some(table_name_value) = self + .table_metadata_manager .table_name_manager() .get(key) .await - .context(TableMetadataManagerSnafu)? else { return Ok(None) }; + .context(TableMetadataManagerSnafu)? + else { + return Ok(None); + }; let table_id = table_name_value.table_id(); - let Some(table_info_value) = self.table_metadata_manager + let Some(table_info_value) = self + .table_metadata_manager .table_info_manager() .get(table_id) .await - .context(TableMetadataManagerSnafu)? else { return Ok(None) }; + .context(TableMetadataManagerSnafu)? + else { + return Ok(None); + }; let table_info = Arc::new( table_info_value .table_info diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index cf528578b5..23c0869876 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -254,7 +254,7 @@ pub(crate) fn column_schemas_to_defs( column_schemas .iter() - .zip(column_datatypes.into_iter()) + .zip(column_datatypes) .map(|(schema, datatype)| { Ok(api::v1::ColumnDef { name: schema.name.clone(), @@ -341,7 +341,9 @@ mod tests { .pop() .unwrap(); - let Statement::CreateTable(create_table) = stmt else { unreachable!() }; + let Statement::CreateTable(create_table) = stmt else { + unreachable!() + }; let expr = create_to_expr(&create_table, QueryContext::arc()).unwrap(); assert_eq!("3days", expr.table_options.get("ttl").unwrap()); assert_eq!( diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index cd937bfc73..433e97ea0a 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -44,7 +44,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { // TODO(weny): considers introducing a macro - let Some((meta, Instruction::InvalidateTableCache(table_ident))) = ctx.incoming_message.take() else { + let Some((meta, Instruction::InvalidateTableCache(table_ident))) = + ctx.incoming_message.take() + else { unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"); }; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1d835d1814..f2c81a6e6f 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -193,7 +193,7 @@ impl Instance { plugins.insert::(statement_executor.clone()); let handlers_executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler::default()), + Arc::new(ParseMailboxMessageHandler), Arc::new(InvalidateTableCacheHandler::new( meta_backend, partition_manager, diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index d7814023e2..1ea44b2a9f 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -300,7 +300,9 @@ impl DistInstance { .iter() .filter(|route| { route.region_routes.iter().any(|r| { - let Some(n) = region_number else { return true; }; + let Some(n) = region_number else { + return true; + }; n == r.region.id.region_number() }) }) diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index e07242df9d..9127e4d8ad 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -67,7 +67,9 @@ fn 
negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult ServerResult { - let Output::Stream(stream) = output else { unreachable!() }; + let Output::Stream(stream) = output else { + unreachable!() + }; let recordbatches = RecordBatches::try_collect(stream) .await .context(error::CollectRecordbatchSnafu)?; diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index f64c038f85..148f3cf80e 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -315,7 +315,7 @@ impl StatementExecutor { let columns_values = fields .iter() .cloned() - .zip(vectors.into_iter()) + .zip(vectors) .collect::>(); pending.push(table.insert(InsertRequest { diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index cf40a8c204..493148157e 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -218,7 +218,9 @@ impl Table for DistTable { .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; - let Output::AffectedRows(rows) = output else { unreachable!() }; + let Output::AffectedRows(rows) = output else { + unreachable!() + }; Ok(rows) } } diff --git a/src/frontend/src/table/delete.rs b/src/frontend/src/table/delete.rs index 6936165df6..c8a3a709ec 100644 --- a/src/frontend/src/table/delete.rs +++ b/src/frontend/src/table/delete.rs @@ -30,7 +30,7 @@ impl DistTable { let regions = requests.iter().map(|x| x.region_number).collect::>(); let instances = self.find_datanode_instances(®ions).await?; - let results = future::try_join_all(instances.into_iter().zip(requests.into_iter()).map( + let results = future::try_join_all(instances.into_iter().zip(requests).map( |(instance, request)| { common_runtime::spawn_write(async move { instance diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index 510b1379b1..55b0015185 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -63,7 +63,9 @@ impl DatanodeInstance { .logical_plan(substrait_plan.to_vec(), None) .await .context(error::RequestDatanodeSnafu)?; - let Output::RecordBatches(record_batches) = result else { unreachable!() }; + let Output::RecordBatches(record_batches) = result else { + unreachable!() + }; Ok(record_batches) } diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 330911e70a..97b6f6ef66 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -96,12 +96,12 @@ impl LogStore for NoopLogStore { let _ = data; let _ = id; let _ = ns; - EntryImpl::default() + EntryImpl } fn namespace(&self, id: NamespaceId) -> Self::Namespace { let _ = id; - NamespaceImpl::default() + NamespaceImpl } async fn obsolete( @@ -121,27 +121,21 @@ mod tests { #[test] fn test_mock_entry() { - let e = EntryImpl::default(); + let e = EntryImpl; assert_eq!(0, e.data().len()); assert_eq!(0, e.id()); } #[tokio::test] async fn test_noop_logstore() { - let store = NoopLogStore::default(); - let e = store.entry("".as_bytes(), 1, NamespaceImpl::default()); + let store = NoopLogStore; + let e = store.entry("".as_bytes(), 1, NamespaceImpl); let _ = store.append(e.clone()).await.unwrap(); assert!(store.append_batch(vec![e]).await.is_ok()); - store - .create_namespace(&NamespaceImpl::default()) - .await - .unwrap(); + store.create_namespace(&NamespaceImpl).await.unwrap(); assert_eq!(0, store.list_namespaces().await.unwrap().len()); - store - .delete_namespace(&NamespaceImpl::default()) - .await - .unwrap(); - assert_eq!(NamespaceImpl::default(), 
store.namespace(0)); - store.obsolete(NamespaceImpl::default(), 1).await.unwrap(); + store.delete_namespace(&NamespaceImpl).await.unwrap(); + assert_eq!(NamespaceImpl, store.namespace(0)); + store.obsolete(NamespaceImpl, 1).await.unwrap(); } } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 611a0d5b18..9546827297 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -481,11 +481,11 @@ mod tests { #[tokio::test] async fn test_handler_name() { let group = HeartbeatHandlerGroup::default(); - group.add_handler(ResponseHeaderHandler::default()).await; - group.add_handler(CheckLeaderHandler::default()).await; - group.add_handler(OnLeaderStartHandler::default()).await; - group.add_handler(CollectStatsHandler::default()).await; - group.add_handler(MailboxHandler::default()).await; + group.add_handler(ResponseHeaderHandler).await; + group.add_handler(CheckLeaderHandler).await; + group.add_handler(OnLeaderStartHandler).await; + group.add_handler(CollectStatsHandler).await; + group.add_handler(MailboxHandler).await; group.add_handler(PersistStatsHandler::default()).await; let handlers = group.handlers.read().await; diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 72bbf14113..2939d8790f 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -73,7 +73,9 @@ impl HeartbeatHandler for RegionFailureHandler { .await; } - let Some(stat) = acc.stat.as_ref() else { return Ok(()) }; + let Some(stat) = acc.stat.as_ref() else { + return Ok(()); + }; let heartbeat = DatanodeHeartbeat { region_idents: stat diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs index da2ecc8a12..1f3fb63257 100644 --- a/src/meta-srv/src/handler/failure_handler/runner.rs +++ b/src/meta-srv/src/handler/failure_handler/runner.rs @@ -88,8 +88,12 @@ impl FailureDetectRunner { } async fn start_with(&mut self, failure_detectors: Arc) { - let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else { return }; - let Some(mut control_rx) = self.control_rx.take() else { return }; + let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else { + return; + }; + let Some(mut control_rx) = self.control_rx.take() else { + return; + }; let container = failure_detectors.clone(); let receiver_handle = common_runtime::spawn_bg(async move { @@ -218,9 +222,7 @@ impl FailureDetectorContainer { &self, ident: RegionIdent, ) -> impl DerefMut + '_ { - self.0 - .entry(ident) - .or_insert_with(PhiAccrualFailureDetector::default) + self.0.entry(ident).or_default() } pub(crate) fn iter(&self) -> Box + '_> { diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index 814d7f4828..779ab2ab91 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -38,7 +38,9 @@ impl HeartbeatHandler for KeepLeaseHandler { _acc: &mut HeartbeatAccumulator, ) -> Result<()> { let HeartbeatRequest { header, peer, .. 
} = req; - let Some(peer) = &peer else { return Ok(()); }; + let Some(peer) = &peer else { + return Ok(()); + }; let key = LeaseKey { cluster_id: header.as_ref().map_or(0, |h| h.cluster_id), diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index db8d73362a..e22bcec34a 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -82,13 +82,12 @@ impl HeartbeatHandler for PersistStatsHandler { ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { - let Some(current_stat) = acc.stat.take() else { return Ok(()) }; + let Some(current_stat) = acc.stat.take() else { + return Ok(()); + }; let key = current_stat.stat_key(); - let mut entry = self - .stats_cache - .entry(key) - .or_insert_with(EpochStats::default); + let mut entry = self.stats_cache.entry(key).or_default(); let key: Vec = key.into(); let epoch_stats = entry.value_mut(); diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index ef8ed381da..058e6a760f 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -43,7 +43,9 @@ impl HeartbeatHandler for RegionLeaseHandler { ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { - let Some(stat) = acc.stat.as_ref() else { return Ok(()) }; + let Some(stat) = acc.stat.as_ref() else { + return Ok(()); + }; let mut table_region_leases = HashMap::new(); stat.region_stats.iter().for_each(|region_stat| { @@ -133,7 +135,6 @@ mod test { let builder = MetaSrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let ctx = &mut metasrv.new_ctx(); - let handler = RegionLeaseHandler::default(); let acc = &mut HeartbeatAccumulator::default(); let new_region_stat = |region_number: RegionNumber| -> RegionStat { @@ -183,7 +184,7 @@ mod test { .await .unwrap(); - handler.handle(&req, ctx, acc).await.unwrap(); + RegionLeaseHandler.handle(&req, ctx, acc).await.unwrap(); assert_eq!(acc.region_leases.len(), 1); let lease = acc.region_leases.remove(0); diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 3b23152a67..e081d3fb8e 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(async_closure)] -#![feature(btree_drain_filter)] #![feature(result_flattening)] pub mod bootstrap; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index a662903007..ebfbb5da31 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -211,19 +211,19 @@ impl MetaSrvBuilder { }; let group = HeartbeatHandlerGroup::new(pushers); - group.add_handler(ResponseHeaderHandler::default()).await; + group.add_handler(ResponseHeaderHandler).await; // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, // because even if the current meta-server node is no longer the leader it can // still help the datanode to keep lease. 
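The `common-meta` hunks earlier track a nightly rename: `BTreeMap::drain_filter` became `extract_if`, and the `btree_drain_filter` feature gate became `btree_extract_if`. A nightly-only sketch matching this toolchain's single-argument signature:

```rust
#![feature(btree_extract_if)]
use std::collections::BTreeMap;

fn main() {
    let mut kvs = BTreeMap::from([(1, "a"), (2, "b"), (3, "c")]);
    // Entries matching the predicate are removed from the map and yielded
    // to the caller -- the same contract drain_filter had before the rename.
    let drained: Vec<_> = kvs.extract_if(|key, _| *key >= 2).collect();
    assert_eq!(drained, [(2, "b"), (3, "c")]);
    assert_eq!(kvs.len(), 1);
}
```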
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index a662903007..ebfbb5da31 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -211,19 +211,19 @@ impl MetaSrvBuilder {
         };

         let group = HeartbeatHandlerGroup::new(pushers);
-        group.add_handler(ResponseHeaderHandler::default()).await;
+        group.add_handler(ResponseHeaderHandler).await;
         // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
         // because even if the current meta-server node is no longer the leader it can
         // still help the datanode to keep lease.
-        group.add_handler(KeepLeaseHandler::default()).await;
-        group.add_handler(CheckLeaderHandler::default()).await;
-        group.add_handler(OnLeaderStartHandler::default()).await;
-        group.add_handler(CollectStatsHandler::default()).await;
-        group.add_handler(MailboxHandler::default()).await;
+        group.add_handler(KeepLeaseHandler).await;
+        group.add_handler(CheckLeaderHandler).await;
+        group.add_handler(OnLeaderStartHandler).await;
+        group.add_handler(CollectStatsHandler).await;
+        group.add_handler(MailboxHandler).await;
         if let Some(region_failover_handler) = region_failover_handler {
             group.add_handler(region_failover_handler).await;
         }
-        group.add_handler(RegionLeaseHandler::default()).await;
+        group.add_handler(RegionLeaseHandler).await;
         group.add_handler(PersistStatsHandler::default()).await;
         if let Some((publish, _)) = pubsub.as_ref() {
             group
diff --git a/src/meta-srv/src/procedure/alter_table.rs b/src/meta-srv/src/procedure/alter_table.rs
index 644649d85f..9de37f5ff6 100644
--- a/src/meta-srv/src/procedure/alter_table.rs
+++ b/src/meta-srv/src/procedure/alter_table.rs
@@ -223,7 +223,7 @@ impl AlterTableProcedure {
             .context(error::TableSnafu)?
             .build()
             .with_context(|_| error::BuildTableMetaSnafu {
-                table_name: table_ref.table.clone(),
+                table_name: table_ref.table,
             })?;

         let mut new_info = table_info.clone();
diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs
index 8b07f6acb6..b7b4dc7bc0 100644
--- a/src/meta-srv/src/procedure/region_failover/activate_region.rs
+++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs
@@ -98,7 +98,8 @@ impl ActivateRegion {
             return UnexpectedInstructionReplySnafu {
                 mailbox_message: msg.to_string(),
                 reason: "expect open region reply",
-            }.fail();
+            }
+            .fail();
         };
         if result {
             Ok(Box::new(UpdateRegionMetadata::new(self.candidate)))
@@ -114,7 +115,7 @@ impl ActivateRegion {
                     RetryLaterSnafu { reason }.fail()
                 }
             }
-            Err(e) if matches!(e, Error::MailboxTimeout { .. }) => {
+            Err(Error::MailboxTimeout { .. }) => {
                 let reason = format!(
                     "Mailbox received timeout for activate failed region {failed_region:?} on Datanode {:?}",
                     self.candidate,
diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs
index c6c1dc23a7..9f09de0da4 100644
--- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs
+++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs
@@ -89,8 +89,9 @@ impl DeactivateRegion {
         let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
             return UnexpectedInstructionReplySnafu {
                 mailbox_message: msg.to_string(),
-                reason: "expect close region reply"
-            }.fail();
+                reason: "expect close region reply",
+            }
+            .fail();
         };
         if result {
             InactiveNodeManager::new(&ctx.in_memory)
@@ -108,7 +109,7 @@ impl DeactivateRegion {
                     RetryLaterSnafu { reason }.fail()
                 }
             }
-            Err(e) if matches!(e, Error::MailboxTimeout { .. }) => {
+            Err(Error::MailboxTimeout { .. }) => {
                 // Since we are in a region failover situation, the Datanode that the failed region
                 // resides might be unreachable. So we wait for the region lease to expire. The
                 // region would be closed by its own [RegionAliveKeeper].
@@ -140,7 +141,7 @@ impl State for DeactivateRegion {
             .await;
         let mailbox_receiver = match result {
             Ok(mailbox_receiver) => mailbox_receiver,
-            Err(e) if matches!(e, Error::PusherNotFound { .. }) => {
+            Err(Error::PusherNotFound { .. }) => {
                 // See the mailbox received timeout situation comments above.
                 self.wait_for_region_lease_expiry().await;
                 return Ok(Box::new(ActivateRegion::new(self.candidate)));
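The `Err(e) if matches!(e, ...)` arms above become direct variant patterns; newer clippy flags the guard form as redundant, and matching the variant inside `Err` reads better in any case. A sketch with a hypothetical error enum (names are illustrative, not the project's):

    enum Error {
        MailboxTimeout { elapsed_ms: u64 },
        PusherNotFound { id: u64 },
    }

    fn describe(res: Result<(), Error>) -> &'static str {
        match res {
            Ok(()) => "ok",
            // was: Err(e) if matches!(e, Error::MailboxTimeout { .. }) => ...
            Err(Error::MailboxTimeout { .. }) => "mailbox timed out",
            Err(Error::PusherNotFound { .. }) => "pusher not found",
        }
    }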
diff --git a/src/meta-srv/src/pubsub/subscribe_manager.rs b/src/meta-srv/src/pubsub/subscribe_manager.rs
index 743562e8aa..d1fa1f2c73 100644
--- a/src/meta-srv/src/pubsub/subscribe_manager.rs
+++ b/src/meta-srv/src/pubsub/subscribe_manager.rs
@@ -64,7 +64,7 @@ where
         self.topic2sub
             .get(topic)
             .map(|list_ref| list_ref.clone())
-            .unwrap_or_else(Vec::new)
+            .unwrap_or_default()
     }
 }

@@ -88,7 +88,7 @@ where
         let subscriber = Arc::new(subscriber);

         for topic in topic_list {
-            let mut entry = self.topic2sub.entry(topic).or_insert_with(Vec::new);
+            let mut entry = self.topic2sub.entry(topic).or_default();
             entry.push(subscriber.clone());
         }

diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs
index 63f1a21019..0aae00b260 100644
--- a/src/meta-srv/src/service/admin.rs
+++ b/src/meta-srv/src/service/admin.rs
@@ -145,7 +145,7 @@ where
                     .into_owned()
                     .collect()
             })
-            .unwrap_or_else(HashMap::new);
+            .unwrap_or_default();
         let path = req.uri().path().to_owned();
         Box::pin(async move { router.call(&path, query_params).await })
     }
diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs
index 6799bae848..1680dd923e 100644
--- a/src/meta-srv/src/service/router.rs
+++ b/src/meta-srv/src/service/router.rs
@@ -72,10 +72,7 @@ pub(crate) fn create_region_distribution(
                 violated: "region should have been set",
             })?
             .id as u32;
-        regions_id_map
-            .entry(node_id)
-            .or_insert_with(Vec::new)
-            .push(region_id);
+        regions_id_map.entry(node_id).or_default().push(region_id);
     }
     Ok(regions_id_map)
 }
diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs
index 9f5e76fdbd..731d276063 100644
--- a/src/meta-srv/src/table_routes.rs
+++ b/src/meta-srv/src/table_routes.rs
@@ -61,7 +61,8 @@ pub(crate) async fn fetch_table(
         .table_info_manager()
         .get(table_id)
         .await
-        .context(TableMetadataManagerSnafu)? else {
+        .context(TableMetadataManagerSnafu)?
+    else {
         return Ok(None);
     };
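Several call sites here collapse `.entry(k).or_insert_with(Vec::new)` (and friends) into `.entry(k).or_default()`, which newer clippy suggests whenever the value type implements `Default`. A self-contained sketch of the same shape as the router change:

    use std::collections::HashMap;

    // or_default() is equivalent to or_insert_with(Default::default):
    // it creates the empty Vec only when the key is missing.
    fn index_by_node(pairs: &[(u64, u32)]) -> HashMap<u64, Vec<u32>> {
        let mut map: HashMap<u64, Vec<u32>> = HashMap::new();
        for (node, region) in pairs {
            map.entry(*node).or_default().push(*region);
        }
        map
    }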
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index 5e83015e09..a4f8a27b48 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -422,9 +422,13 @@ impl MitoEngineInner {
         let table_dir = table_dir(catalog_name, schema_name, table_id);

         let Some((manifest, table_info)) = self
-            .recover_table_manifest_and_info(table_name, &table_dir)
-            .await.map_err(BoxedError::new)
-            .context(table_error::TableOperationSnafu)? else { return Ok(None) };
+            .recover_table_manifest_and_info(table_name, &table_dir)
+            .await
+            .map_err(BoxedError::new)
+            .context(table_error::TableOperationSnafu)?
+        else {
+            return Ok(None);
+        };

         let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options);
         let opts = OpenOptions {
@@ -628,9 +632,12 @@ impl MitoEngineInner {
             self.object_store.clone(),
             self.compress_type,
         );
-        let Some(table_info) =
+        let Some(table_info) =
             MitoTable::<<S as StorageEngine>::Region>::recover_table_info(table_name, &manifest)
-                .await? else { return Ok(None) };
+                .await?
+        else {
+            return Ok(None);
+        };

         Ok(Some((manifest, table_info)))
     }
diff --git a/src/mito/src/engine/procedure.rs b/src/mito/src/engine/procedure.rs
index a8627b6164..1ffdfcc2a5 100644
--- a/src/mito/src/engine/procedure.rs
+++ b/src/mito/src/engine/procedure.rs
@@ -63,7 +63,7 @@ mod procedure_test_util {
         let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
         let storage_engine = EngineImpl::new(
             StorageEngineConfig::default(),
-            Arc::new(NoopLogStore::default()),
+            Arc::new(NoopLogStore),
             object_store.clone(),
             compaction_scheduler,
         )
diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs
index ae9f450b35..f5b4387bfe 100644
--- a/src/mito/src/engine/procedure/alter.rs
+++ b/src/mito/src/engine/procedure/alter.rs
@@ -202,9 +202,8 @@ impl AlterMitoTable {
     /// Alter regions.
     async fn alter_regions(&mut self) -> Result<()> {
         let Some(alter_op) = &self.alter_op else {
-            // Don't need to alter the region.
-            return Ok(());
-        };
+            return Ok(());
+        };

         let table_name = &self.data.request.table_name;
         let table_version = self.data.table_version;
diff --git a/src/mito/src/engine/procedure/drop.rs b/src/mito/src/engine/procedure/drop.rs
index 13cdcf1aa4..dd314484a8 100644
--- a/src/mito/src/engine/procedure/drop.rs
+++ b/src/mito/src/engine/procedure/drop.rs
@@ -62,7 +62,9 @@ impl Procedure for DropMitoTable {
     fn lock_key(&self) -> LockKey {
         let table_ref = self.data.table_ref();
-        let Some(table) = &self.table else { return LockKey::default() };
+        let Some(table) = &self.table else {
+            return LockKey::default();
+        };
         let info = table.table_info();
         let keys = info
             .meta
diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs
index 6be3715ae6..7ef3946d87 100644
--- a/src/mito/src/engine/tests.rs
+++ b/src/mito/src/engine/tests.rs
@@ -78,7 +78,7 @@ async fn setup_table_with_column_default_constraint() -> (TempDir, String, Table
         EngineConfig::default(),
         EngineImpl::new(
             StorageEngineConfig::default(),
-            Arc::new(NoopLogStore::default()),
+            Arc::new(NoopLogStore),
             object_store.clone(),
             compaction_scheduler,
         )
diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs
index 3a2d2e8c31..6eb4ac7430 100644
--- a/src/mito/src/table/test_util.rs
+++ b/src/mito/src/table/test_util.rs
@@ -166,7 +166,7 @@ pub async fn setup_test_engine_and_table() -> TestEngineComponents {
     let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
     let storage_engine = EngineImpl::new(
         StorageEngineConfig::default(),
-        Arc::new(NoopLogStore::default()),
+        Arc::new(NoopLogStore),
         object_store.clone(),
         compaction_scheduler,
     )
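The repeated `NoopLogStore::default()` to `NoopLogStore` edits come from clippy's `default_constructed_unit_structs` lint, which is new to this toolchain window: for a fieldless struct, the name is already the value, so `::default()` adds nothing. A sketch with a stand-in type:

    // Stand-in for a fieldless marker type like the test log store.
    #[derive(Default, Clone, Copy)]
    struct NoopStore;

    fn shared_noop() -> std::sync::Arc<NoopStore> {
        // was: Arc::new(NoopStore::default()); the lint asks for the value itself.
        std::sync::Arc::new(NoopStore)
    }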
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index 09da51716e..cc6b599ef6 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -40,9 +40,7 @@ impl RegionWorkerLoop {
         if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
             let Some(region) = self.regions.get_region(region_id) else {
                 // No such region.
-                send_result(sender_req.sender, RegionNotFoundSnafu {
-                    region_id,
-                }.fail());
+                send_result(sender_req.sender, RegionNotFoundSnafu { region_id }.fail());

                 continue;
             };
diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs
index 8eaf5da592..7ff422b257 100644
--- a/src/partition/src/splitter.rs
+++ b/src/partition/src/splitter.rs
@@ -175,10 +175,7 @@ impl WriteSplitter {
                 return FindRegionSnafu { reason }.fail();
             }
         };
-        region_map
-            .entry(region_id)
-            .or_insert_with(Vec::default)
-            .push(idx);
+        region_map.entry(region_id).or_default().push(idx);
     }
     Ok(region_map)
 }
diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs
index f35ceb2508..e6e93d8b50 100644
--- a/src/promql/src/extension_plan/range_manipulate.rs
+++ b/src/promql/src/extension_plan/range_manipulate.rs
@@ -115,7 +115,11 @@ impl RangeManipulate {
         // process time index column
         // the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
         let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else {
-            return Err(datafusion::common::field_not_found(None::<String>, time_index, input_schema.as_ref()))
+            return Err(datafusion::common::field_not_found(
+                None::<String>,
+                time_index,
+                input_schema.as_ref(),
+            ));
         };
         let ts_col_field = columns[ts_col_index].field();
         let timestamp_range_field = Field::new(
@@ -128,7 +132,11 @@ impl RangeManipulate {
         // process value columns
         for name in field_columns {
             let Some(index) = input_schema.index_of_column_by_name(None, name)? else {
-                return Err(datafusion::common::field_not_found(None::<String>, name, input_schema.as_ref()))
+                return Err(datafusion::common::field_not_found(
+                    None::<String>,
+                    name,
+                    input_schema.as_ref(),
+                ));
             };
             columns[index] = DFField::from(RangeArray::convert_field(columns[index].field()));
         }
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index 667f03af49..1b9c0ff37a 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -604,7 +604,7 @@ impl PromPlanner {
         let exprs = result_set
             .into_iter()
             .map(|col| DfExpr::Column(col.into()))
-            .chain(self.create_tag_column_exprs()?.into_iter())
+            .chain(self.create_tag_column_exprs()?)
             .chain(Some(self.create_time_index_column_expr()?))
             .collect::<Vec<_>>();

         // reuse this variable for simplicity
@@ -1040,7 +1040,7 @@ impl PromPlanner {
             exprs.push(expr);
         }

-        utils::conjunction(exprs.into_iter()).context(ValueNotFoundSnafu {
+        utils::conjunction(exprs).context(ValueNotFoundSnafu {
             table: self.ctx.table_name.clone().unwrap(),
         })
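The dropped `.into_iter()` calls above (on `chain`, `conjunction`, and later `extend` and `Vec::from_iter`) follow from these helpers taking `impl IntoIterator`: a `Vec` can be passed as-is, and the newer clippy's `useless_conversion` now catches the redundant call. A tiny sketch:

    // chain() accepts any IntoIterator, so the Vec needs no explicit conversion.
    fn merge(a: Vec<i32>, b: Vec<i32>) -> Vec<i32> {
        // was: a.into_iter().chain(b.into_iter()).collect()
        a.into_iter().chain(b).collect()
    }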
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index 1a249bc8fe..e537518594 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -654,7 +654,9 @@ mod tests {
         let columns = vec![builder.to_vector()];
         let record_batch = RecordBatch::new(schema, columns).unwrap();
         let output = execute_show_with_filter(record_batch, None).await.unwrap();
-        let Output::RecordBatches(record_batches) = output else {unreachable!()};
+        let Output::RecordBatches(record_batches) = output else {
+            unreachable!()
+        };
         let expected = "\
+----------------+
| Tables |
@@ -682,12 +684,18 @@
         )
         .unwrap()[0]
         .clone();
-        let Statement::ShowTables(ShowTables { kind, .. }) = statement else {unreachable!()};
-        let ShowKind::Where(filter) = kind else {unreachable!()};
+        let Statement::ShowTables(ShowTables { kind, .. }) = statement else {
+            unreachable!()
+        };
+        let ShowKind::Where(filter) = kind else {
+            unreachable!()
+        };
         let output = execute_show_with_filter(record_batch, Some(filter))
             .await
             .unwrap();
-        let Output::RecordBatches(record_batches) = output else {unreachable!()};
+        let Output::RecordBatches(record_batches) = output else {
+            unreachable!()
+        };
         let expected = "\
+---------+
| Tables |
diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs
index b131cdd808..7454084d9c 100644
--- a/src/query/src/dist_plan/planner.rs
+++ b/src/query/src/dist_plan/planner.rs
@@ -86,7 +86,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
             .create_physical_plan(input_plan, session_state)
             .await?;
         let Some(table_name) = self.get_table_name(input_plan)? else {
-            // no relation found in input plan, going to execute them locally
+            // no relation found in input plan, going to execute them locally
             return Ok(Some(input_physical_plan));
         };
diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs
index 09d92e6d01..79d5fe755d 100644
--- a/src/query/src/tests.rs
+++ b/src/query/src/tests.rs
@@ -44,10 +44,9 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
         .plan(stmt, QueryContext::arc())
         .await
         .unwrap();
-    let Output::Stream(stream) = engine
-        .execute(plan, QueryContext::arc())
-        .await
-        .unwrap() else { unreachable!() };
+    let Output::Stream(stream) = engine.execute(plan, QueryContext::arc()).await.unwrap() else {
+        unreachable!()
+    };
     util::collect(stream).await.unwrap()
 }
diff --git a/src/query/src/tests/mean_test.rs b/src/query/src/tests/mean_test.rs
index 604ad33f67..288d25ba2a 100644
--- a/src/query/src/tests/mean_test.rs
+++ b/src/query/src/tests/mean_test.rs
@@ -57,7 +57,9 @@ where
         function::get_numbers_from_table::<T>(column_name, table_name, engine.clone()).await;
     let numbers = numbers.iter().map(|&n| n.as_()).collect::<Vec<f64>>();
     let expected = numbers.iter().sum::<f64>() / (numbers.len() as f64);
-    let Value::Float64(OrderedFloat(value)) = value else { unreachable!() };
+    let Value::Float64(OrderedFloat(value)) = value else {
+        unreachable!()
+    };
     assert!(
         (value - expected).abs() < 1e-3,
         "expected {expected}, actual {value}"
diff --git a/src/script/src/python/rspython/builtins.rs b/src/script/src/python/rspython/builtins.rs
index e75426271f..ba4fc953ec 100644
--- a/src/script/src/python/rspython/builtins.rs
+++ b/src/script/src/python/rspython/builtins.rs
@@ -654,7 +654,7 @@ pub(crate) mod greptime_builtin {
     ) -> PyResult<PyVector> {
         let percent =
             expressions::Literal::new(datafusion_common::ScalarValue::Float64(Some(percent)));
-        return eval_aggr_fn(
+        eval_aggr_fn(
             expressions::ApproxPercentileCont::new(
                 vec![
                     Arc::new(expressions::Column::new("expr0", 0)) as _,
@@ -666,7 +666,7 @@ pub(crate) mod greptime_builtin {
             .map_err(|err| from_df_err(err, vm))?,
             &[values.to_arrow_array()],
             vm,
-        );
+        )
     }

     /// effectively equals to `list(vector)`
@@ -857,7 +857,7 @@ pub(crate) mod greptime_builtin {
         // pyfunction can return PyResult<...>, args can be like PyObjectRef or anything
        // impl IntoPyNativeFunc, see rustpython-vm function for more details
         let args = vec![base.as_vector_ref(), arg_pow];
-        let res = PowFunction::default()
+        let res = PowFunction
             .eval(FunctionContext::default(), &args)
             .map_err(|err| {
                 vm.new_runtime_error(format!(
diff --git a/src/script/src/python/rspython/copr_impl.rs b/src/script/src/python/rspython/copr_impl.rs
index e68906f313..7420ead142 100644
--- a/src/script/src/python/rspython/copr_impl.rs
+++ b/src/script/src/python/rspython/copr_impl.rs
@@ -14,8 +14,8 @@
 use std::cell::RefCell;
 use std::collections::{HashMap, HashSet};
+use std::rc::Rc;
 use std::result::Result as StdResult;
-use std::sync::Arc;

 use common_recordbatch::RecordBatch;
 use common_telemetry::{info, timer};
@@ -37,7 +37,7 @@
 use crate::python::rspython::dataframe_impl::data_frame::set_dataframe_in_scope;
 use crate::python::rspython::dataframe_impl::init_data_frame;
 use crate::python::rspython::utils::{format_py_error, is_instance, py_obj_to_vec};

-thread_local!(static INTERPRETER: RefCell<Option<Arc<vm::Interpreter>>> = RefCell::new(None));
+thread_local!(static INTERPRETER: RefCell<Option<Rc<vm::Interpreter>>> = RefCell::new(None));

 /// Using `RustPython` to run a parsed `Coprocessor` struct as input to execute python code
 pub(crate) fn rspy_exec_parsed(
@@ -48,7 +48,7 @@ pub(crate) fn rspy_exec_parsed(
     let _t = timer!(metric::METRIC_RSPY_EXEC_TOTAL_ELAPSED);
     // 3. get args from `rb`, and cast them into PyVector
     let args: Vec<PyVector> = if let Some(rb) = rb {
-        let arg_names = copr.deco_args.arg_names.clone().unwrap_or(vec![]);
+        let arg_names = copr.deco_args.arg_names.clone().unwrap_or_default();
         let args = select_from_rb(rb, &arg_names)?;
         check_args_anno_real_type(&arg_names, &args, copr, rb)?;
         args
@@ -99,7 +99,7 @@ pub(crate) fn exec_with_cached_vm(
     rb: &Option<RecordBatch>,
     args: Vec<PyVector>,
     params: &HashMap<String, String>,
-    vm: &Arc<vm::Interpreter>,
+    vm: &Rc<vm::Interpreter>,
 ) -> Result<RecordBatch> {
     vm.enter(|vm| -> Result<RecordBatch> {
         let _t = timer!(metric::METRIC_RSPY_EXEC_ELAPSED);
@@ -188,7 +188,7 @@
 }

 /// init interpreter with type PyVector and Module: greptime
-pub(crate) fn init_interpreter() -> Arc<vm::Interpreter> {
+pub(crate) fn init_interpreter() -> Rc<vm::Interpreter> {
     let _t = timer!(metric::METRIC_RSPY_INIT_ELAPSED);
     INTERPRETER.with(|i| {
         i.borrow_mut()
@@ -202,7 +202,7 @@ pub(crate) fn init_interpreter() -> Rc<vm::Interpreter> {
             let mut settings = vm::Settings::default();
             // disable SIG_INT handler so our own binary can take ctrl_c handler
             settings.no_sig_int = true;
-            let interpreter = Arc::new(vm::Interpreter::with_init(settings, |vm| {
+            let interpreter = Rc::new(vm::Interpreter::with_init(settings, |vm| {
                 // not using full stdlib to prevent security issue, instead filter out a few simple util module
                 vm.add_native_modules(
                     rustpython_stdlib::get_module_inits()
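The Arc-to-Rc switch above deserves a note: the RustPython interpreter is cached in a `thread_local!`, so the handle never crosses threads, and the clippy on this toolchain adds an `arc_with_non_send_sync` lint that objects to `Arc` around a non-`Send`/`Sync` value. A minimal sketch of the same caching pattern with a stand-in type (names here are illustrative only):

    use std::cell::RefCell;
    use std::rc::Rc;

    struct Interp; // stand-in for the non-Send/Sync interpreter type

    thread_local!(static CACHE: RefCell<Option<Rc<Interp>>> = RefCell::new(None));

    // Lazily fills the per-thread slot and hands out cheap Rc clones;
    // Rc suffices because the value never leaves its thread.
    fn get_interp() -> Rc<Interp> {
        CACHE.with(|slot| {
            slot.borrow_mut()
                .get_or_insert_with(|| Rc::new(Interp))
                .clone()
        })
    }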
diff --git a/src/script/src/python/rspython/utils.rs b/src/script/src/python/rspython/utils.rs
index e1ee104b46..e24b3005c3 100644
--- a/src/script/src/python/rspython/utils.rs
+++ b/src/script/src/python/rspython/utils.rs
@@ -136,8 +136,8 @@ pub fn py_obj_to_vec(
     match columnar_value {
         DFColValue::Scalar(ScalarValue::List(scalars, _datatype)) => match scalars {
             Some(scalars) => {
-                let array = ScalarValue::iter_to_array(scalars.into_iter())
-                    .context(error::DataFusionSnafu)?;
+                let array =
+                    ScalarValue::iter_to_array(scalars).context(error::DataFusionSnafu)?;

                 Helper::try_into_vector(array).context(error::TypeCastSnafu)
             }
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 0652993464..31d636a1e8 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -15,7 +15,7 @@ api = { path = "../api" }
 arrow-flight.workspace = true
 async-trait = "0.1"
 axum = { version = "0.6", features = ["headers"] }
-axum-macros = "0.3"
+axum-macros = "0.3.8"
 base64 = "0.13"
 bytes = "1.2"
 catalog = { path = "../catalog" }
@@ -102,7 +102,6 @@ rand.workspace = true
 rustls = { version = "0.21", features = ["dangerous_configuration"] }
 script = { path = "../script", features = ["python"] }
 serde_json = "1.0"
-session = { path = "../session", features = ["testing"] }
 table = { path = "../table" }
 tokio-postgres = "0.7"
 tokio-postgres-rustls = "0.10"
diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs
index 75dab865fe..36fe992e6b 100644
--- a/src/servers/src/grpc/handler.rs
+++ b/src/servers/src/grpc/handler.rs
@@ -111,7 +111,9 @@ impl GreptimeRequestHandler {
         header: Option<&RequestHeader>,
         query_ctx: &QueryContextRef,
     ) -> TonicResult> {
-        let Some(user_provider) = self.user_provider.as_ref() else { return Ok(Ok(())) };
+        let Some(user_provider) = self.user_provider.as_ref() else {
+            return Ok(Ok(()));
+        };

         let auth_scheme = header
             .and_then(|header| {
diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs
index 3974a13ca7..b447aaf852 100644
--- a/src/servers/src/http/authorize.rs
+++ b/src/servers/src/http/authorize.rs
@@ -184,7 +184,9 @@ fn get_influxdb_credentials(
         Ok(Some((username.to_string(), password.to_string().into())))
     } else {
         // try v1
-        let Some(query_str) = request.uri().query() else { return Ok(None) };
+        let Some(query_str) = request.uri().query() else {
+            return Ok(None);
+        };

         match extract_influxdb_user_from_query(query_str) {
             (None, None) => Ok(None),
diff --git a/src/servers/src/mysql/federated.rs b/src/servers/src/mysql/federated.rs
index 2ed84d54d8..d4007a896a 100644
--- a/src/servers/src/mysql/federated.rs
+++ b/src/servers/src/mysql/federated.rs
@@ -239,7 +239,7 @@ fn select_variable(query: &str, query_context: QueryContextRef) -> Option Option {
-    if vec![&SELECT_VAR_PATTERN, &MYSQL_CONN_JAVA_PATTERN]
+    if [&SELECT_VAR_PATTERN, &MYSQL_CONN_JAVA_PATTERN]
         .iter()
         .any(|r| r.is_match(query))
     {
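Turning `vec![&SELECT_VAR_PATTERN, &MYSQL_CONN_JAVA_PATTERN]` into an array literal is clippy's `useless_vec` at work: a fixed set of items that is only iterated does not need a heap allocation. A sketch of the same shape (the probe strings are made up):

    fn is_meta_probe(query: &str) -> bool {
        // was: vec!["@@version", "connection_id()"] ...
        ["@@version", "connection_id()"]
            .iter()
            .any(|p| query.contains(p))
    }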
diff --git a/src/servers/src/mysql/helper.rs b/src/servers/src/mysql/helper.rs
index 2119acba7c..eff7eeb40c 100644
--- a/src/servers/src/mysql/helper.rs
+++ b/src/servers/src/mysql/helper.rs
@@ -218,21 +218,27 @@ mod tests {
     #[test]
     fn test_transform_placeholders() {
         let insert = parse_sql("insert into demo values(?,?,?)");
-        let Statement::Insert(insert) = transform_placeholders(insert) else { unreachable!()};
+        let Statement::Insert(insert) = transform_placeholders(insert) else {
+            unreachable!()
+        };
         assert_eq!(
             "INSERT INTO demo VALUES ($1, $2, $3)",
             insert.inner.to_string()
         );

         let delete = parse_sql("delete from demo where host=? and idc=?");
-        let Statement::Delete(delete) = transform_placeholders(delete) else { unreachable!()};
+        let Statement::Delete(delete) = transform_placeholders(delete) else {
+            unreachable!()
+        };
         assert_eq!(
             "DELETE FROM demo WHERE host = $1 AND idc = $2",
             delete.inner.to_string()
         );

         let select = parse_sql("select from demo where host=? and idc in (select idc from idcs where name=?) and cpu>?");
-        let Statement::Query(select) = transform_placeholders(select) else { unreachable!()};
+        let Statement::Query(select) = transform_placeholders(select) else {
+            unreachable!()
+        };
         assert_eq!("SELECT from AS demo WHERE host = $1 AND idc IN (SELECT idc FROM idcs WHERE name = $2) AND cpu > $3", select.inner.to_string());
     }
 }
diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs
index c1cf5c5a9d..da8355603f 100644
--- a/src/servers/src/postgres/handler.rs
+++ b/src/servers/src/postgres/handler.rs
@@ -113,7 +113,7 @@ where
             Ok(rb) => stream::iter(
                 // collect rows from a single recordbatch into vector to avoid
                 // borrowing it
-                rb.rows().map(Ok).collect::<Vec<_>>().into_iter(),
+                rb.rows().map(Ok).collect::<Vec<_>>(),
             )
             .boxed(),
             Err(e) => stream::once(future::err(PgWireError::ApiError(Box::new(e)))).boxed(),
diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs
index 89ecec05b4..e511cebe4c 100644
--- a/src/servers/src/postgres/types.rs
+++ b/src/servers/src/postgres/types.rs
@@ -257,7 +257,9 @@ pub(super) fn parameters_to_scalar_values(
     }

     for (idx, client_type) in client_param_types.iter().enumerate() {
-        let Some(Some(server_type)) = param_types.get(&format!("${}", idx + 1)) else { continue };
+        let Some(Some(server_type)) = param_types.get(&format!("${}", idx + 1)) else {
+            continue;
+        };

         let value = match client_type {
             &Type::VARCHAR | &Type::TEXT => {
                 let data = portal.parameter::<String>(idx)?;
diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs
index b0cc355ef2..9f7e81fb13 100644
--- a/src/servers/src/prometheus.rs
+++ b/src/servers/src/prometheus.rs
@@ -618,7 +618,9 @@ async fn get_all_column_names(
     let mut labels = HashSet::new();

     for table_name in table_names {
-        let Some(table) = manager.table(catalog, schema, &table_name).await? else { continue };
+        let Some(table) = manager.table(catalog, schema, &table_name).await? else {
+            continue;
+        };
         let schema = table.schema();
         for column in schema.column_schemas() {
             labels.insert(column.name.to_string());
diff --git a/src/session/src/context.rs b/src/session/src/context.rs
index 54e755c353..4a2d2da2a4 100644
--- a/src/session/src/context.rs
+++ b/src/session/src/context.rs
@@ -49,7 +49,6 @@ impl Display for QueryContext {
 }

 impl QueryContext {
-    #[cfg(any(test, feature = "testing"))]
     pub fn arc() -> QueryContextRef {
         QueryContextBuilder::default().build()
     }
diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs
index 8d6bbd1541..fc3573fa19 100644
--- a/src/sql/src/parsers/copy_parser.rs
+++ b/src/sql/src/parsers/copy_parser.rs
@@ -191,14 +191,17 @@ mod tests {
         let result0 = ParserContext::create_with_dialect(sql0, &GreptimeDbDialect {}).unwrap();
         let result1 = ParserContext::create_with_dialect(sql1, &GreptimeDbDialect {}).unwrap();

-        for mut result in vec![result0, result1] {
+        for mut result in [result0, result1] {
             assert_eq!(1, result.len());
             let statement = result.remove(0);
             assert_matches!(statement, Statement::Copy { .. });
             match statement {
                 Copy(copy) => {
-                    let crate::statements::copy::Copy::CopyTable(CopyTable::To(copy_table)) = copy else { unreachable!() };
+                    let crate::statements::copy::Copy::CopyTable(CopyTable::To(copy_table)) = copy
+                    else {
+                        unreachable!()
+                    };

                     let (catalog, schema, table) =
                         if let [catalog, schema, table] = &copy_table.table_name.0[..] {
                             (
@@ -360,7 +363,9 @@ mod tests {
             .pop()
             .unwrap();

-        let Statement::Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { unreachable!() };
+        let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else {
+            unreachable!()
+        };
         assert_eq!(
             ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]),
             stmt.database_name
diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs
index 4fa987a9ff..05b479b65f 100644
--- a/src/sql/src/statements/create.rs
+++ b/src/sql/src/statements/create.rs
@@ -78,7 +78,9 @@ impl CreateTable {
             .iter()
             .map(|c| {
                 if is_time_index(c) {
-                    let TableConstraint::Unique { columns, ..} = c else { unreachable!() };
+                    let TableConstraint::Unique { columns, .. } = c else {
+                        unreachable!()
+                    };

                     format_indent!("{}TIME INDEX ({})", format_list_comma!(columns))
                 } else {
diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs
index 1f0dccada7..8cef41dea6 100644
--- a/src/storage/src/chunk.rs
+++ b/src/storage/src/chunk.rs
@@ -346,7 +346,9 @@ impl ChunkReaderBuilder {
     /// Build time range predicate from schema and filters.
     fn build_time_range_predicate(&self) -> TimestampRange {
-        let Some(ts_col) = self.schema.user_schema().timestamp_column() else { return TimestampRange::min_to_max() };
+        let Some(ts_col) = self.schema.user_schema().timestamp_column() else {
+            return TimestampRange::min_to_max();
+        };
         let unit = ts_col
             .data_type
             .as_timestamp()
@@ -361,7 +363,9 @@ impl ChunkReaderBuilder {
             return true;
         }
         // end_timestamp of sst file is inclusive.
-        let Some((start, end)) = *file.time_range() else { return true; };
+        let Some((start, end)) = *file.time_range() else {
+            return true;
+        };
         let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
         file_ts_range.intersects(predicate)
     }
diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs
index 794789d71d..52ac32c838 100644
--- a/src/storage/src/compaction/picker.rs
+++ b/src/storage/src/compaction/picker.rs
@@ -46,7 +46,9 @@ pub(crate) fn get_expired_ssts(
     ttl: Option<Duration>,
     now: Timestamp,
 ) -> Result<Vec<FileHandle>> {
-    let Some(ttl) = ttl else { return Ok(vec![]); };
+    let Some(ttl) = ttl else {
+        return Ok(vec![]);
+    };

     let expire_time = now.sub_duration(ttl).context(TtlCalculationSnafu)?;
diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs
index ec3e1091a5..43a236c3e5 100644
--- a/src/storage/src/compaction/task.rs
+++ b/src/storage/src/compaction/task.rs
@@ -131,8 +131,8 @@ impl CompactionTaskImpl {
         let edit = RegionEdit {
             region_version,
             flushed_sequence: None,
-            files_to_add: Vec::from_iter(output.into_iter()),
-            files_to_remove: Vec::from_iter(input.into_iter()),
+            files_to_add: Vec::from_iter(output),
+            files_to_remove: Vec::from_iter(input),
             compaction_time_window: self.compaction_time_window,
         };
         debug!(
diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs
index c067eef293..9d1aaa1a8a 100644
--- a/src/storage/src/memtable/btree.rs
+++ b/src/storage/src/memtable/btree.rs
@@ -162,7 +162,10 @@ impl Memtable for BTreeMemtable {
         let Some(timestamp_type) = ts_meta.desc.data_type.as_timestamp() else {
             // safety: timestamp column always has timestamp type, otherwise it's a bug.
-            panic!("Timestamp column is not a valid timestamp type: {:?}", self.schema);
+            panic!(
+                "Timestamp column is not a valid timestamp type: {:?}",
+                self.schema
+            );
         };

         MemtableStats {
@@ -472,7 +475,9 @@ impl InnerKey {
     #[inline]
     fn is_in_time_range(&self, range: &Option<TimestampRange>) -> bool {
-        let Some(range) = range else { return true; };
+        let Some(range) = range else {
+            return true;
+        };
         range.contains(
             &self
                 .timestamp()
diff --git a/src/storage/src/read/windowed.rs b/src/storage/src/read/windowed.rs
index 83e8716e7b..76e18ffd1d 100644
--- a/src/storage/src/read/windowed.rs
+++ b/src/storage/src/read/windowed.rs
@@ -63,7 +63,9 @@ where
 {
     async fn next_batch(&mut self) -> Result<Option<Batch>> {
         let _window_scan_elapsed = timer!(crate::metrics::WINDOW_SCAN_ELAPSED);
-        let Some(mut reader) = self.readers.pop() else { return Ok(None); };
+        let Some(mut reader) = self.readers.pop() else {
+            return Ok(None);
+        };
         let store_schema = self.schema.schema_to_read();
         let mut batches = vec![];

@@ -78,11 +80,13 @@ where
         }

         let Some(num_columns) = batches.get(0).map(|b| b.len()) else {
-            // the reader does not yield data, a batch of empty vectors must be returned instead of
+            // the reader does not yield data, a batch of empty vectors must be returned instead of
             // an empty batch without any column.
-            let empty_columns = store_schema.columns().iter().map(|s| {
-                s.desc.data_type.create_mutable_vector(0).to_vector()
-            }).collect();
+            let empty_columns = store_schema
+                .columns()
+                .iter()
+                .map(|s| s.desc.data_type.create_mutable_vector(0).to_vector())
+                .collect();
             return Ok(Some(Batch::new(empty_columns)));
         };
         let mut vectors_in_batch = Vec::with_capacity(num_columns);
diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs
index 36025e0097..1db8484b74 100644
--- a/src/storage/src/region/tests.rs
+++ b/src/storage/src/region/tests.rs
@@ -542,7 +542,7 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig Result> {
-    let (Some(ts_col_idx), Some(ts_col)) = (schema.timestamp_index(), schema.timestamp_column()) else { return Ok(None); };
+    let (Some(ts_col_idx), Some(ts_col)) = (schema.timestamp_index(), schema.timestamp_column())
+    else {
+        return Ok(None);
+    };
     let ts_datatype = &ts_col.data_type;
     decode_timestamp_range_inner(file_meta, ts_col_idx, ts_datatype)
 }
@@ -176,9 +179,16 @@ fn decode_timestamp_range_inner(
         .context(DecodeParquetTimeRangeSnafu {
             msg: format!("Cannot find ts column by index: {ts_index}"),
         })?
-        .meta_data else { return Ok(None) };
-    let Some(stats) = &metadata.statistics else { return Ok(None) };
-    let (Some(min_value), Some(max_value)) = (&stats.min_value, &stats.max_value) else { return Ok(None); };
+        .meta_data
+    else {
+        return Ok(None);
+    };
+    let Some(stats) = &metadata.statistics else {
+        return Ok(None);
+    };
+    let (Some(min_value), Some(max_value)) = (&stats.min_value, &stats.max_value) else {
+        return Ok(None);
+    };
     // according to [parquet's spec](https://parquet.apache.org/docs/file-format/data-pages/encodings/), min/max value in stats uses plain encoding with little endian.
     // also see https://github.com/apache/arrow-rs/blob/5fb337db04a1a19f7d40da46f19b7b5fd4051593/parquet/src/file/statistics.rs#L172
diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs
index cada401c13..92cd6808db 100644
--- a/src/store-api/src/storage/engine.rs
+++ b/src/store-api/src/storage/engine.rs
@@ -157,7 +157,9 @@ impl Default for TwcsOptions {
 impl From<&HashMap<String, String>> for CompactionStrategy {
     fn from(opts: &HashMap<String, String>) -> Self {
-        let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else { return CompactionStrategy::default() };
+        let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else {
+            return CompactionStrategy::default();
+        };
         if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE) {
             CompactionStrategy::LeveledTimeWindow
         } else if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) {
diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs
index 487457ac16..45ed0a2e1c 100644
--- a/src/table-procedure/src/test_util.rs
+++ b/src/table-procedure/src/test_util.rs
@@ -59,7 +59,7 @@ impl TestEnv {
         let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
         let storage_engine = EngineImpl::new(
             StorageEngineConfig::default(),
-            Arc::new(NoopLogStore::default()),
+            Arc::new(NoopLogStore),
             object_store.clone(),
             compaction_scheduler,
         )
diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs
index 92e1552bbd..e11117f806 100644
--- a/src/table/src/predicate.rs
+++ b/src/table/src/predicate.rs
@@ -257,7 +257,9 @@ impl<'a> TimeRangePredicateBuilder<'a> {
         low: &DfExpr,
         high: &DfExpr,
     ) -> Option<TimestampRange> {
-        let DfExpr::Column(col) = expr else { return None; };
+        let DfExpr::Column(col) = expr else {
+            return None;
+        };
         if col.name != self.ts_col_name {
             return None;
         }
@@ -288,7 +290,9 @@ impl<'a> TimeRangePredicateBuilder<'a> {
         if negated {
             return None;
         }
-        let DfExpr::Column(col) = expr else { return None; };
+        let DfExpr::Column(col) = expr else {
+            return None;
+        };
         if col.name != self.ts_col_name {
             return None;
         }
diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs
index 6ea72b8804..1ea5573afa 100644
--- a/tests-integration/src/grpc.rs
+++ b/tests-integration/src/grpc.rs
@@ -138,7 +138,9 @@ mod test {
             )),
         });
         let output = query(instance, request).await;
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
+---------------------+---+---+
@@ -498,7 +500,9 @@ CREATE TABLE {table_name} (
             ))),
         });
         let output = query(instance, request.clone()).await;
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
+---------------------+----+-------------------+
@@ -574,7 +578,9 @@ CREATE TABLE {table_name} (
         assert!(matches!(output, Output::AffectedRows(4)));

         let output = query(instance, request).await;
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
+---------------------+----+-------------------+
@@ -638,7 +644,9 @@ CREATE TABLE {table_name} (
             .await
             .unwrap();
         let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let actual = recordbatches.pretty_print().unwrap();

@@ -724,7 +732,9 @@ CREATE TABLE {table_name} (
             )),
         });
         let output = query(instance, request.clone()).await;
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
+---------------------+---+---+
@@ -759,7 +769,9 @@ CREATE TABLE {table_name} (
         assert!(matches!(output, Output::AffectedRows(2)));

         let output = query(instance, request).await;
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
+---------------------+---+---+
@@ -855,7 +867,9 @@ CREATE TABLE {table_name} (
             })),
         });
         let output = query(instance, request).await;
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
+---+------+---------------------+
diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs
index b96591edfe..59d81457d7 100644
--- a/tests-integration/src/influxdb.rs
+++ b/tests-integration/src/influxdb.rs
@@ -73,7 +73,9 @@ monitor1,host=host2 memory=1027";
         )
         .await;
         let output = output.remove(0).unwrap();
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let recordbatches: Vec<_> = recordbatches.iter().collect();

@@ -100,7 +102,9 @@ monitor1,host=host2 memory=1027 1663840496400340001";
         )
         .await;
         let output = output.remove(0).unwrap();
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         assert_eq!(
             recordbatches.pretty_print().unwrap(),
diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs
index af2d6db39d..1496c7c526 100644
--- a/tests-integration/src/instance.rs
+++ b/tests-integration/src/instance.rs
@@ -141,7 +141,9 @@ mod tests {

     async fn create_table(instance: &Instance, sql: &str) {
         let output = query(instance, sql).await;
-        let Output::AffectedRows(x) = output else { unreachable!() };
+        let Output::AffectedRows(x) = output else {
+            unreachable!()
+        };
         assert_eq!(x, 0);
     }

@@ -153,12 +155,16 @@ mod tests {
                            ('MOSS', 100000000, 10000000000, 2335190400000)
                            "#;
         let output = query(instance, sql).await;
-        let Output::AffectedRows(x) = output else { unreachable!() };
+        let Output::AffectedRows(x) = output else {
+            unreachable!()
+        };
         assert_eq!(x, 4);

         let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition
         let output = query(instance, sql).await;
-        let Output::Stream(s) = output else { unreachable!() };
+        let Output::Stream(s) = output else {
+            unreachable!()
+        };
         let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
         let pretty_print = batches.pretty_print().unwrap();
         let expected = "\
@@ -213,7 +219,9 @@ mod tests {
             .await
             .unwrap();
         let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         let actual = recordbatches.pretty_print().unwrap();

@@ -225,7 +233,9 @@ mod tests {
     async fn drop_table(instance: &Instance) {
         let sql = "DROP TABLE demo";
         let output = query(instance, sql).await;
-        let Output::AffectedRows(x) = output else { unreachable!() };
+        let Output::AffectedRows(x) = output else {
+            unreachable!()
+        };
         assert_eq!(x, 1);
     }

@@ -362,7 +372,7 @@ mod tests {
         let mut instance = standalone.instance;

         let plugins = Plugins::new();
-        let hook = Arc::new(DisableDBOpHook::default());
+        let hook = Arc::new(DisableDBOpHook);
         plugins.insert::>(hook.clone());

         Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs
index 89ff45ab4b..1f5683006c 100644
--- a/tests-integration/src/otlp.rs
+++ b/tests-integration/src/otlp.rs
@@ -72,7 +72,9 @@ mod test {
         )
         .await;
         let output = output.remove(0).unwrap();
-        let Output::Stream(stream) = output else { unreachable!() };
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
         let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
         assert_eq!(
             recordbatches.pretty_print().unwrap(),
diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs
index 7d4d508eaa..6abf5104dd 100644
--- a/tests-integration/src/tests.rs
+++ b/tests-integration/src/tests.rs
@@ -37,7 +37,9 @@ impl MockDistributedInstance {
             .storage_guards
             .iter()
             .map(|g| {
-                let TempDirGuard::File(dir) = &g.0 else { unreachable!() };
+                let TempDirGuard::File(dir) = &g.0 else {
+                    unreachable!()
+                };
                 dir
             })
             .collect()
@@ -63,7 +65,9 @@ pub struct MockStandaloneInstance {

 impl MockStandaloneInstance {
     pub fn data_tmp_dir(&self) -> &TempDir {
-        let TempDirGuard::File(dir) = &self._guard.storage_guard.0 else { unreachable!() };
+        let TempDirGuard::File(dir) = &self._guard.storage_guard.0 else {
+            unreachable!()
+        };
         dir
     }
 }
diff --git a/tests-integration/src/tests/promql_test.rs b/tests-integration/src/tests/promql_test.rs
index d4533c17de..4bf6d85067 100644
--- a/tests-integration/src/tests/promql_test.rs
+++ b/tests-integration/src/tests/promql_test.rs
@@ -44,7 +44,10 @@ async fn create_insert_query_assert(
         query: promql.to_string(),
         ..PromQuery::default()
     };
-    let QueryStatement::Promql(mut eval_stmt) = QueryLanguageParser::parse_promql(&query).unwrap() else { unreachable!() };
+    let QueryStatement::Promql(mut eval_stmt) = QueryLanguageParser::parse_promql(&query).unwrap()
+    else {
+        unreachable!()
+    };
     eval_stmt.start = start;
     eval_stmt.end = end;
     eval_stmt.interval = interval;