chore: upgrade rust toolchain to latest nightly (#2049)

* chore: upgrade rust toolchain to latest nightly

* rebase develop

* update rust toolchain in ci
This commit is contained in:
LFC
2023-08-08 15:17:51 +08:00
committed by GitHub
parent 7d0d8dc6e3
commit 46fa3eb629
127 changed files with 551 additions and 339 deletions

View File

@@ -12,5 +12,9 @@ rustflags = [
"-Wclippy::print_stdout",
"-Wclippy::print_stderr",
"-Wclippy::implicit_clone",
"-Aclippy::items_after_test_module",
# It seems clippy has made a false positive decision here when upgrading rust toolchain to
# nightly-2023-08-07, we do need it to be borrowed mutably.
# Allow it for now; try disallow it when the toolchain is upgraded in the future.
"-Aclippy::needless_pass_by_ref_mut",
]

View File

@@ -13,7 +13,7 @@ on:
name: Build API docs
env:
RUST_TOOLCHAIN: nightly-2023-05-03
RUST_TOOLCHAIN: nightly-2023-08-07
jobs:
apidoc:

View File

@@ -25,7 +25,7 @@ on:
name: CI
env:
RUST_TOOLCHAIN: nightly-2023-05-03
RUST_TOOLCHAIN: nightly-2023-08-07
jobs:
typos:
@@ -62,11 +62,11 @@ jobs:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
toolchain: stable
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Install taplo
run: cargo install taplo-cli --version ^0.8 --locked
run: cargo +stable install taplo-cli --version ^0.8 --locked
- name: Run taplo
run: taplo format --check

View File

@@ -82,7 +82,7 @@ on:
# Use env variables to control all the release process.
env:
# The arguments of building greptime.
RUST_TOOLCHAIN: nightly-2023-05-03
RUST_TOOLCHAIN: nightly-2023-08-07
CARGO_PROFILE: nightly
# Controls whether to run tests, include unit-test, integration-test and sqlness.

View File

@@ -49,6 +49,7 @@ members = [
"tests-integration",
"tests/runner",
]
resolver = "2"
[workspace.package]
version = "0.3.2"

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2023-05-03"
channel = "nightly-2023-08-07"

View File

@@ -153,7 +153,12 @@ impl InformationSchemaColumnsBuilder {
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
else {
continue;
};
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
for (idx, column) in schema.column_schemas().iter().enumerate() {

View File

@@ -151,7 +151,12 @@ impl InformationSchemaTablesBuilder {
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
else {
continue;
};
let table_info = table.table_info();
self.add_table(
&catalog_name,

View File

@@ -218,13 +218,27 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<Reg
let mut region_number: u64 = 0;
let mut region_stats = Vec::new();
let Ok(catalog_names) = catalog_manager.catalog_names().await else { return (region_number, region_stats) };
let Ok(catalog_names) = catalog_manager.catalog_names().await else {
return (region_number, region_stats);
};
for catalog_name in catalog_names {
let Ok(schema_names) = catalog_manager.schema_names(&catalog_name).await else { continue };
let Ok(schema_names) = catalog_manager.schema_names(&catalog_name).await else {
continue;
};
for schema_name in schema_names {
let Ok(table_names) = catalog_manager.table_names(&catalog_name,&schema_name).await else { continue };
let Ok(table_names) = catalog_manager
.table_names(&catalog_name, &schema_name)
.await
else {
continue;
};
for table_name in table_names {
let Ok(Some(table)) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await else { continue };
let Ok(Some(table)) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await
else {
continue;
};
let table_info = table.table_info();
let region_numbers = &table_info.meta.region_numbers;

View File

@@ -254,7 +254,10 @@ impl CatalogManager for RemoteCatalogManager {
let Some(table) = self
.memory_catalog_manager
.table(&request.catalog, &request.schema, &request.table_name)
.await? else { return Ok(()) };
.await?
else {
return Ok(());
};
let table_info = table.table_info();
let table_ident = TableIdent {

View File

@@ -264,7 +264,7 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
let primary_key_columns = build_primary_key_columns(entry_type, key);
let mut columns_values = HashMap::with_capacity(6);
columns_values.extend(primary_key_columns.into_iter());
columns_values.extend(primary_key_columns);
let _ = columns_values.insert(
"value".to_string(),
@@ -523,7 +523,7 @@ mod tests {
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
Arc::new(NoopLogStore),
object_store.clone(),
noop_compaction_scheduler,
)
@@ -574,9 +574,15 @@ mod tests {
assert_eq!(batch.num_rows(), 1);
let row = batch.rows().next().unwrap();
let Value::UInt8(entry_type) = row[0] else { unreachable!() };
let Value::Binary(key) = row[1].clone() else { unreachable!() };
let Value::Binary(value) = row[3].clone() else { unreachable!() };
let Value::UInt8(entry_type) = row[0] else {
unreachable!()
};
let Value::Binary(key) = row[1].clone() else {
unreachable!()
};
let Value::Binary(value) = row[3].clone() else {
unreachable!()
};
let entry = decode_system_catalog(Some(entry_type), Some(&*key), Some(&*value)).unwrap();
let expected = Entry::Table(TableEntry {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),

View File

@@ -313,17 +313,11 @@ mod tests {
.await
.is_ok());
assert_eq!(
HashSet::<String>::from_iter(
vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter()
),
HashSet::from_iter(
components
.catalog_manager
.catalog_names()
.await
.unwrap()
.into_iter()
)
HashSet::<String>::from_iter(vec![
DEFAULT_CATALOG_NAME.to_string(),
catalog_name.clone()
]),
HashSet::from_iter(components.catalog_manager.catalog_names().await.unwrap())
);
let table_to_register = components

View File

@@ -243,8 +243,10 @@ mod tests {
..Default::default()
};
let Options::Datanode(options) =
cmd.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() };
let Options::Datanode(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!(Some(42), options.node_id);
@@ -397,10 +399,10 @@ mod tests {
let env_prefix = "DATANODE_UT";
temp_env::with_vars(
vec![
[
(
// storage.manifest.gc_duration = 9s
vec![
[
env_prefix.to_string(),
"storage".to_uppercase(),
"manifest".to_uppercase(),
@@ -411,7 +413,7 @@ mod tests {
),
(
// storage.compaction.max_purge_tasks = 99
vec![
[
env_prefix.to_string(),
"storage".to_uppercase(),
"compaction".to_uppercase(),
@@ -422,7 +424,7 @@ mod tests {
),
(
// meta_client_options.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
vec![
[
env_prefix.to_string(),
"meta_client_options".to_uppercase(),
"metasrv_addrs".to_uppercase(),
@@ -440,7 +442,10 @@ mod tests {
};
let Options::Datanode(opts) =
command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()};
command.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
// Should be read from env, env > default values.
assert_eq!(

View File

@@ -257,8 +257,10 @@ mod tests {
..Default::default()
};
let Options::Frontend(opts) =
command.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() };
let Options::Frontend(opts) = command.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
assert_eq!(opts.http_options.as_ref().unwrap().addr, "127.0.0.1:1234");
assert_eq!(
@@ -323,8 +325,10 @@ mod tests {
..Default::default()
};
let Options::Frontend(fe_opts) =
command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()};
let Options::Frontend(fe_opts) = command.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
assert_eq!(Mode::Distributed, fe_opts.mode);
assert_eq!(
"127.0.0.1:4000".to_string(),
@@ -404,10 +408,10 @@ mod tests {
let env_prefix = "FRONTEND_UT";
temp_env::with_vars(
vec![
[
(
// mysql_options.addr = 127.0.0.1:14002
vec![
[
env_prefix.to_string(),
"mysql_options".to_uppercase(),
"addr".to_uppercase(),
@@ -417,7 +421,7 @@ mod tests {
),
(
// mysql_options.runtime_size = 11
vec![
[
env_prefix.to_string(),
"mysql_options".to_uppercase(),
"runtime_size".to_uppercase(),
@@ -427,7 +431,7 @@ mod tests {
),
(
// http_options.addr = 127.0.0.1:24000
vec![
[
env_prefix.to_string(),
"http_options".to_uppercase(),
"addr".to_uppercase(),
@@ -437,7 +441,7 @@ mod tests {
),
(
// meta_client_options.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
vec![
[
env_prefix.to_string(),
"meta_client_options".to_uppercase(),
"metasrv_addrs".to_uppercase(),
@@ -458,8 +462,10 @@ mod tests {
log_dir: None,
log_level: Some("error".to_string()),
};
let Options::Frontend(fe_opts) =
command.load_options(top_level_opts).unwrap() else {unreachable!()};
let Options::Frontend(fe_opts) = command.load_options(top_level_opts).unwrap()
else {
unreachable!()
};
// Should be read from env, env > default values.
assert_eq!(fe_opts.mysql_options.as_ref().unwrap().runtime_size, 11);

View File

@@ -187,8 +187,10 @@ mod tests {
..Default::default()
};
let Options::Metasrv(options) =
cmd.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() };
let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
assert_eq!(SelectorType::LoadBased, options.selector);
@@ -216,8 +218,10 @@ mod tests {
..Default::default()
};
let Options::Metasrv(options) =
cmd.load_options(TopLevelOptions::default()).unwrap() else { unreachable!() };
let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2379".to_string(), options.store_addr);
@@ -269,20 +273,20 @@ mod tests {
let env_prefix = "METASRV_UT";
temp_env::with_vars(
vec![
[
(
// bind_addr = 127.0.0.1:14002
vec![env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
[env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
Some("127.0.0.1:14002"),
),
(
// server_addr = 127.0.0.1:13002
vec![env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
[env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
Some("127.0.0.1:13002"),
),
(
// http_options.addr = 127.0.0.1:24000
vec![
[
env_prefix.to_string(),
"http_options".to_uppercase(),
"addr".to_uppercase(),
@@ -300,7 +304,10 @@ mod tests {
};
let Options::Metasrv(opts) =
command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()};
command.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
// Should be read from env, env > default values.
assert_eq!(opts.bind_addr, "127.0.0.1:14002");

View File

@@ -158,10 +158,10 @@ mod tests {
let env_prefix = "DATANODE_UT";
temp_env::with_vars(
// The following environment variables will be used to override the values in the config file.
vec![
[
(
// storage.manifest.checkpoint_margin = 99
vec![
[
env_prefix.to_string(),
"storage".to_uppercase(),
"manifest".to_uppercase(),
@@ -172,7 +172,7 @@ mod tests {
),
(
// storage.type = S3
vec![
[
env_prefix.to_string(),
"storage".to_uppercase(),
"type".to_uppercase(),
@@ -182,7 +182,7 @@ mod tests {
),
(
// storage.bucket = mybucket
vec![
[
env_prefix.to_string(),
"storage".to_uppercase(),
"bucket".to_uppercase(),
@@ -192,7 +192,7 @@ mod tests {
),
(
// storage.manifest.gc_duration = 42s
vec![
[
env_prefix.to_string(),
"storage".to_uppercase(),
"manifest".to_uppercase(),
@@ -203,7 +203,7 @@ mod tests {
),
(
// storage.manifest.checkpoint_on_startup = true
vec![
[
env_prefix.to_string(),
"storage".to_uppercase(),
"manifest".to_uppercase(),
@@ -214,7 +214,7 @@ mod tests {
),
(
// wal.dir = /other/wal/dir
vec![
[
env_prefix.to_string(),
"wal".to_uppercase(),
"dir".to_uppercase(),
@@ -224,7 +224,7 @@ mod tests {
),
(
// meta_client_options.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
vec![
[
env_prefix.to_string(),
"meta_client_options".to_uppercase(),
"metasrv_addrs".to_uppercase(),

View File

@@ -423,7 +423,10 @@ mod tests {
..Default::default()
};
let Options::Standalone(options) = cmd.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()};
let Options::Standalone(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
unreachable!()
};
let fe_opts = options.fe_opts;
let dn_opts = options.dn_opts;
let logging_opts = options.logging;
@@ -484,7 +487,8 @@ mod tests {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
})
.unwrap() else {
.unwrap()
else {
unreachable!()
};
@@ -508,10 +512,10 @@ mod tests {
let env_prefix = "STANDALONE_UT";
temp_env::with_vars(
vec![
[
(
// logging.dir = /other/log/dir
vec![
[
env_prefix.to_string(),
"logging".to_uppercase(),
"dir".to_uppercase(),
@@ -521,7 +525,7 @@ mod tests {
),
(
// logging.level = info
vec![
[
env_prefix.to_string(),
"logging".to_uppercase(),
"level".to_uppercase(),
@@ -531,7 +535,7 @@ mod tests {
),
(
// http_options.addr = 127.0.0.1:24000
vec![
[
env_prefix.to_string(),
"http_options".to_uppercase(),
"addr".to_uppercase(),
@@ -552,8 +556,10 @@ mod tests {
log_dir: None,
log_level: None,
};
let Options::Standalone(opts) =
command.load_options(top_level_opts).unwrap() else {unreachable!()};
let Options::Standalone(opts) = command.load_options(top_level_opts).unwrap()
else {
unreachable!()
};
// Should be read from env, env > default values.
assert_eq!(opts.logging.dir, "/other/log/dir");

View File

@@ -198,9 +198,8 @@ mod tests {
#[tokio::test]
async fn test_orc_infer_schema() {
let orc = OrcFormat::default();
let store = test_store(&test_data_root());
let schema = orc.infer_schema(&store, "test.orc").await.unwrap();
let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();
let formatted: Vec<_> = format_schema(schema);
assert_eq!(

View File

@@ -181,7 +181,7 @@ async fn test_parquet_exec() {
.await;
assert_batches_eq!(
vec![
[
"+-----+-------+",
"| num | str |",
"+-----+-------+",
@@ -198,8 +198,7 @@ async fn test_parquet_exec() {
async fn test_orc_opener() {
let root = test_util::get_data_dir("tests/orc").display().to_string();
let store = test_store(&root);
let orc = OrcFormat::default();
let schema = orc.infer_schema(&store, "test.orc").await.unwrap();
let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();
let schema = Arc::new(schema);
let orc_opener = OrcOpener::new(store.clone(), schema.clone(), None);

View File

@@ -10,7 +10,7 @@ proc-macro = true
[dependencies]
backtrace = "0.3"
common-telemetry = { path = "../telemetry" }
proc-macro2 = "1.0"
proc-macro2 = "1.0.66"
quote = "1.0"
syn = "1.0"

View File

@@ -146,7 +146,9 @@ pub fn print_caller(args: TokenStream, input: TokenStream) -> TokenStream {
.expect("Expected an ident!")
.to_string();
if ident == "depth" {
let Lit::Int(i) = &name_value.lit else { panic!("Expected 'depth' to be a valid int!") };
let Lit::Int(i) = &name_value.lit else {
panic!("Expected 'depth' to be a valid int!")
};
depth = i.base10_parse::<usize>().expect("Invalid 'depth' value");
break;
}

View File

@@ -89,7 +89,7 @@ mod tests {
#[test]
fn test_function_registry() {
let registry = FunctionRegistry::default();
let func = Arc::new(TestAndFunction::default());
let func = Arc::new(TestAndFunction);
assert!(registry.get_function("test_and").is_none());
assert!(registry.functions().is_empty());

View File

@@ -26,7 +26,7 @@ pub(crate) struct MathFunction;
impl MathFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(PowFunction::default()));
registry.register(Arc::new(RateFunction::default()))
registry.register(Arc::new(PowFunction));
registry.register(Arc::new(RateFunction))
}
}

View File

@@ -85,7 +85,7 @@ mod tests {
use super::*;
#[test]
fn test_pow_function() {
let pow = PowFunction::default();
let pow = PowFunction;
assert_eq!("pow", pow.name());
assert_eq!(

View File

@@ -80,7 +80,7 @@ mod tests {
use super::*;
#[test]
fn test_rate_function() {
let rate = RateFunction::default();
let rate = RateFunction;
assert_eq!("prom_rate", rate.name());
assert_eq!(
ConcreteDataType::float64_datatype(),

View File

@@ -25,6 +25,6 @@ pub(crate) struct NumpyFunction;
impl NumpyFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(ClipFunction::default()));
registry.register(Arc::new(ClipFunction));
}
}

View File

@@ -156,7 +156,7 @@ mod tests {
#[test]
fn test_clip_signature() {
let clip = ClipFunction::default();
let clip = ClipFunction;
assert_eq!("clip", clip.name());
assert_eq!(
@@ -202,8 +202,6 @@ mod tests {
#[test]
fn test_clip_fn_signed() {
let clip = ClipFunction::default();
// eval with signed integers
let args: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from_values(0..10)),
@@ -217,7 +215,9 @@ mod tests {
)),
];
let vector = clip.eval(FunctionContext::default(), &args).unwrap();
let vector = ClipFunction
.eval(FunctionContext::default(), &args)
.unwrap();
assert_eq!(10, vector.len());
// clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6]
@@ -234,8 +234,6 @@ mod tests {
#[test]
fn test_clip_fn_unsigned() {
let clip = ClipFunction::default();
// eval with unsigned integers
let args: Vec<VectorRef> = vec![
Arc::new(UInt8Vector::from_values(0..10)),
@@ -249,7 +247,9 @@ mod tests {
)),
];
let vector = clip.eval(FunctionContext::default(), &args).unwrap();
let vector = ClipFunction
.eval(FunctionContext::default(), &args)
.unwrap();
assert_eq!(10, vector.len());
// clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6]
@@ -266,8 +266,6 @@ mod tests {
#[test]
fn test_clip_fn_float() {
let clip = ClipFunction::default();
// eval with floats
let args: Vec<VectorRef> = vec![
Arc::new(Int8Vector::from_values(0..10)),
@@ -281,7 +279,9 @@ mod tests {
)),
];
let vector = clip.eval(FunctionContext::default(), &args).unwrap();
let vector = ClipFunction
.eval(FunctionContext::default(), &args)
.unwrap();
assert_eq!(10, vector.len());
// clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6]

View File

@@ -291,7 +291,7 @@ mod tests {
];
let vector = interp(&args).unwrap();
assert_eq!(4, vector.len());
let res = vec![3.0, 3.0, 2.5, 0.0];
let res = [3.0, 3.0, 2.5, 0.0];
for (i, item) in res.iter().enumerate().take(vector.len()) {
assert!(matches!(vector.get(i),Value::Float64(v) if v==*item));
}
@@ -305,7 +305,7 @@ mod tests {
let left = vec![-1];
let right = vec![2];
let expect = vec![-1.0, 3.0, 2.5, 2.0, 0.0, 2.0];
let expect = [-1.0, 3.0, 2.5, 2.0, 0.0, 2.0];
let args: Vec<VectorRef> = vec![
Arc::new(Float64Vector::from_vec(x)),

View File

@@ -22,6 +22,6 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(ToUnixtimeFunction::default()));
registry.register(Arc::new(ToUnixtimeFunction));
}
}

View File

@@ -162,7 +162,7 @@ mod tests {
#[test]
fn test_string_to_unixtime() {
let f = ToUnixtimeFunction::default();
let f = ToUnixtimeFunction;
assert_eq!("to_unixtime", f.name());
assert_eq!(
ConcreteDataType::int64_datatype(),
@@ -190,7 +190,7 @@ mod tests {
Some("2022-06-30T23:59:60Z"),
Some("invalid_time_stamp"),
];
let results = vec![Some(1677652502), None, Some(1656633600), None];
let results = [Some(1677652502), None, Some(1656633600), None];
let args: Vec<VectorRef> = vec![Arc::new(StringVector::from(times.clone()))];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
@@ -211,7 +211,7 @@ mod tests {
#[test]
fn test_int_to_unixtime() {
let f = ToUnixtimeFunction::default();
let f = ToUnixtimeFunction;
assert_eq!("to_unixtime", f.name());
assert_eq!(
ConcreteDataType::int64_datatype(),
@@ -234,7 +234,7 @@ mod tests {
));
let times = vec![Some(3_i64), None, Some(5_i64), None];
let results = vec![Some(3), None, Some(5), None];
let results = [Some(3), None, Some(5), None];
let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
@@ -255,7 +255,7 @@ mod tests {
#[test]
fn test_timestamp_to_unixtime() {
let f = ToUnixtimeFunction::default();
let f = ToUnixtimeFunction;
assert_eq!("to_unixtime", f.name());
assert_eq!(
ConcreteDataType::int64_datatype(),
@@ -283,7 +283,7 @@ mod tests {
Some(TimestampSecond::new(42)),
None,
];
let results = vec![Some(123), None, Some(42), None];
let results = [Some(123), None, Some(42), None];
let ts_vector: TimestampSecondVector = build_vector_from_slice(&times);
let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();

View File

@@ -77,7 +77,7 @@ mod tests {
#[test]
fn test_create_udf() {
let f = Arc::new(TestAndFunction::default());
let f = Arc::new(TestAndFunction);
let args: Vec<VectorRef> = vec![
Arc::new(ConstantVector::new(

View File

@@ -249,7 +249,9 @@ mod test {
)
.unwrap();
assert_eq!(flight_data.len(), 3);
let [d1, d2, d3] = flight_data.as_slice() else { unreachable!() };
let [d1, d2, d3] = flight_data.as_slice() else {
unreachable!()
};
let decoder = &mut FlightDecoder::default();
assert!(decoder.schema.is_none());
@@ -263,19 +265,25 @@ mod test {
let message = decoder.try_decode(d1.clone()).unwrap();
assert!(matches!(message, FlightMessage::Schema(_)));
let FlightMessage::Schema(decoded_schema) = message else { unreachable!() };
let FlightMessage::Schema(decoded_schema) = message else {
unreachable!()
};
assert_eq!(decoded_schema, schema);
let _ = decoder.schema.as_ref().unwrap();
let message = decoder.try_decode(d2.clone()).unwrap();
assert!(matches!(message, FlightMessage::Recordbatch(_)));
let FlightMessage::Recordbatch(actual_batch) = message else { unreachable!() };
let FlightMessage::Recordbatch(actual_batch) = message else {
unreachable!()
};
assert_eq!(actual_batch, batch1);
let message = decoder.try_decode(d3.clone()).unwrap();
assert!(matches!(message, FlightMessage::Recordbatch(_)));
let FlightMessage::Recordbatch(actual_batch) = message else { unreachable!() };
let FlightMessage::Recordbatch(actual_batch) = message else {
unreachable!()
};
assert_eq!(actual_batch, batch2);
}

View File

@@ -123,7 +123,8 @@ impl DatanodeTableManager {
let Some(curr) = resp
.prev_kv
.map(|kv| DatanodeTableValue::try_from_raw_value(kv.value))
.transpose()? else {
.transpose()?
else {
return UnexpectedSnafu {
err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, val: {val:?}"),
}.fail();

View File

@@ -81,7 +81,7 @@ impl TableInfoManager {
let Some(curr) = curr else {
return UnexpectedSnafu {
err_msg: format!("compare_and_put expect None but failed with current value None, table_id: {table_id}, table_info: {table_info:?}"),
}.fail()
}.fail();
};
ensure!(
&curr.table_info == table_info,

View File

@@ -163,10 +163,11 @@ impl TableNameManager {
let Some(curr) = result
.prev_kv
.map(|x| TableNameValue::try_from_raw_value(x.value))
.transpose()? else {
.transpose()?
else {
return UnexpectedSnafu {
err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, value: {value:?}"),
}.fail()
}.fail();
};
ensure!(
curr.table_id == table_id,
@@ -226,7 +227,8 @@ impl TableNameManager {
// name, then the table must not exist at the first place.
return TableNotExistSnafu {
table_name: TableName::from(key).to_string(),
}.fail();
}
.fail();
};
ensure!(

View File

@@ -91,7 +91,7 @@ impl TableRegionManager {
let Some(curr) = curr else {
return UnexpectedSnafu {
err_msg: format!("compare_and_put expect None but failed with current value None, table_id: {table_id}, region_distribution: {region_distribution:?}"),
}.fail()
}.fail();
};
ensure!(
&curr.region_distribution == region_distribution,

View File

@@ -236,7 +236,7 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
start: key,
end: range_end,
};
kvs.drain_filter(|key, _| range.contains(key))
kvs.extract_if(|key, _| range.contains(key))
.map(Into::into)
.collect::<Vec<_>>()
};

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(btree_drain_filter)]
#![feature(btree_extract_if)]
pub mod error;
pub mod heartbeat;

View File

@@ -474,7 +474,10 @@ impl ProcedureManager for LocalManager {
if message.parent_id.is_none() {
// This is the root procedure. We only submit the root procedure as it will
// submit sub-procedures to the manager.
let Some(loaded_procedure) = self.manager_ctx.load_one_procedure_from_message(*procedure_id, message) else {
let Some(loaded_procedure) = self
.manager_ctx
.load_one_procedure_from_message(*procedure_id, message)
else {
// Try to load other procedures.
continue;
};

View File

@@ -44,7 +44,9 @@ pub fn build_filter_from_timestamp(
ts_col_name: &str,
time_range: Option<&TimestampRange>,
) -> Option<Expr> {
let Some(time_range) = time_range else { return None; };
let Some(time_range) = time_range else {
return None;
};
let ts_col_expr = DfExpr::Column(Column {
relation: None,
name: ts_col_name.to_string(),

View File

@@ -60,7 +60,7 @@ impl SerializerRegistry for ExtensionSerializer {
name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
"EmptyMetric should not be serialized".to_string(),
)),
name if name == "MergeScan" => Ok(vec![]),
"MergeScan" => Ok(vec![]),
other => Err(DataFusionError::NotImplemented(format!(
"Serizlize logical plan for {}",
other

View File

@@ -50,7 +50,8 @@ impl HeartbeatResponseHandler for CloseRegionHandler {
}
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else {
let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take()
else {
unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'");
};

View File

@@ -50,7 +50,8 @@ impl HeartbeatResponseHandler for OpenRegionHandler {
}
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else {
let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take()
else {
unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'");
};

View File

@@ -131,7 +131,7 @@ impl Instance {
state: "region_alive_keepers is not provided when building heartbeat task",
})?;
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(ParseMailboxMessageHandler),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),

View File

@@ -721,7 +721,9 @@ mod test {
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(
"INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)",
) else { unreachable!() };
) else {
unreachable!()
};
let output = instance
.execute_sql(stmt, QueryContext::arc())
.await
@@ -729,7 +731,9 @@ mod test {
assert!(matches!(output, Output::AffectedRows(1)));
let output = exec_selection(instance, "SELECT * FROM my_database.my_table").await;
let Output::Stream(stream) = output else { unreachable!() };
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---+---+---+---------------------+---+
@@ -855,7 +859,9 @@ mod test {
assert!(matches!(output, Output::AffectedRows(3)));
let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await;
let Output::Stream(stream) = output else { unreachable!() };
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+-------+-----+
@@ -925,7 +931,9 @@ mod test {
assert!(matches!(output, Output::AffectedRows(1)));
let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await;
let Output::Stream(stream) = output else { unreachable!() };
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+-------+------+
@@ -965,7 +973,9 @@ mod test {
)),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatch = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+-------+------+--------+

View File

@@ -474,7 +474,9 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() };
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else {
unreachable!()
};
let output = instance
.inner()
.execute_sql(stmt, QueryContext::arc())
@@ -491,7 +493,9 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() };
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else {
unreachable!()
};
let output = instance
.inner()
.execute_sql(stmt, QueryContext::arc())

View File

@@ -154,7 +154,9 @@ async fn test_open_region_handler() {
]));
let instruction = open_region_instruction();
let Instruction::OpenRegion(region_ident) = instruction.clone() else { unreachable!() };
let Instruction::OpenRegion(region_ident) = instruction.clone() else {
unreachable!()
};
let table_ident = &region_ident.table_ident;
let table = prepare_table(instance.inner()).await;

View File

@@ -278,23 +278,19 @@ impl_new_concrete_type_functions!(
impl ConcreteDataType {
pub fn timestamp_second_datatype() -> Self {
ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType::default()))
ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType))
}
pub fn timestamp_millisecond_datatype() -> Self {
ConcreteDataType::Timestamp(TimestampType::Millisecond(
TimestampMillisecondType::default(),
))
ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType))
}
pub fn timestamp_microsecond_datatype() -> Self {
ConcreteDataType::Timestamp(TimestampType::Microsecond(
TimestampMicrosecondType::default(),
))
ConcreteDataType::Timestamp(TimestampType::Microsecond(TimestampMicrosecondType))
}
pub fn timestamp_nanosecond_datatype() -> Self {
ConcreteDataType::Timestamp(TimestampType::Nanosecond(TimestampNanosecondType::default()))
ConcreteDataType::Timestamp(TimestampType::Nanosecond(TimestampNanosecondType))
}
/// Returns the time data type with `TimeUnit`.
@@ -323,17 +319,15 @@ impl ConcreteDataType {
}
pub fn interval_month_day_nano_datatype() -> Self {
ConcreteDataType::Interval(IntervalType::MonthDayNano(
IntervalMonthDayNanoType::default(),
))
ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType))
}
pub fn interval_year_month_datatype() -> Self {
ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType::default()))
ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType))
}
pub fn interval_day_time_datatype() -> Self {
ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType::default()))
ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType))
}
pub fn timestamp_datatype(unit: TimeUnit) -> Self {

View File

@@ -202,19 +202,19 @@ mod tests {
fn test_as_arrow_datatype() {
assert_eq!(
ArrowDataType::Time32(ArrowTimeUnit::Second),
TimeSecondType::default().as_arrow_type()
TimeSecondType.as_arrow_type()
);
assert_eq!(
ArrowDataType::Time32(ArrowTimeUnit::Millisecond),
TimeMillisecondType::default().as_arrow_type()
TimeMillisecondType.as_arrow_type()
);
assert_eq!(
ArrowDataType::Time64(ArrowTimeUnit::Microsecond),
TimeMicrosecondType::default().as_arrow_type()
TimeMicrosecondType.as_arrow_type()
);
assert_eq!(
ArrowDataType::Time64(ArrowTimeUnit::Nanosecond),
TimeNanosecondType::default().as_arrow_type()
TimeNanosecondType.as_arrow_type()
);
}
}

View File

@@ -66,16 +66,10 @@ impl TryFrom<u64> for TimestampType {
/// - 9: nanosecond
fn try_from(value: u64) -> Result<Self, Self::Error> {
match value {
SECOND_VARIATION => Ok(TimestampType::Second(TimestampSecondType::default())),
MILLISECOND_VARIATION => Ok(TimestampType::Millisecond(
TimestampMillisecondType::default(),
)),
MICROSECOND_VARIATION => Ok(TimestampType::Microsecond(
TimestampMicrosecondType::default(),
)),
NANOSECOND_VARIATION => {
Ok(TimestampType::Nanosecond(TimestampNanosecondType::default()))
}
SECOND_VARIATION => Ok(TimestampType::Second(TimestampSecondType)),
MILLISECOND_VARIATION => Ok(TimestampType::Millisecond(TimestampMillisecondType)),
MICROSECOND_VARIATION => Ok(TimestampType::Microsecond(TimestampMicrosecondType)),
NANOSECOND_VARIATION => Ok(TimestampType::Nanosecond(TimestampNanosecondType)),
_ => InvalidTimestampPrecisionSnafu { precision: value }.fail(),
}
}

View File

@@ -342,7 +342,7 @@ mod tests {
fn test_binary_vector_builder() {
let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]);
let mut builder = BinaryType::default().create_mutable_vector(3);
let mut builder = BinaryType.create_mutable_vector(3);
builder.push_value_ref(ValueRef::Binary("hello".as_bytes()));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();

View File

@@ -360,7 +360,7 @@ mod tests {
fn test_boolean_vector_builder() {
let input = BooleanVector::from_slice(&[true, false, true]);
let mut builder = BooleanType::default().create_mutable_vector(3);
let mut builder = BooleanType.create_mutable_vector(3);
builder.push_value_ref(ValueRef::Boolean(true));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();

View File

@@ -68,7 +68,7 @@ mod tests {
fn test_date_vector_builder() {
let input = DateVector::from_slice([1, 2, 3]);
let mut builder = DateType::default().create_mutable_vector(3);
let mut builder = DateType.create_mutable_vector(3);
builder.push_value_ref(ValueRef::Date(Date::new(5)));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();

View File

@@ -88,7 +88,7 @@ mod tests {
DateTime::new(3),
]);
let mut builder = DateTimeType::default().create_mutable_vector(3);
let mut builder = DateTimeType.create_mutable_vector(3);
builder.push_value_ref(ValueRef::DateTime(DateTime::new(5)));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();

View File

@@ -58,7 +58,7 @@ impl From<NullArray> for NullVector {
impl Vector for NullVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::Null(NullType::default())
ConcreteDataType::Null(NullType)
}
fn vector_type_name(&self) -> String {
@@ -269,7 +269,7 @@ mod tests {
#[test]
fn test_null_vector_builder() {
let mut builder = NullType::default().create_mutable_vector(3);
let mut builder = NullType.create_mutable_vector(3);
builder.push_null();
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());

View File

@@ -407,18 +407,26 @@ impl CatalogManager for FrontendCatalogManager {
}
let key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self.table_metadata_manager
let Some(table_name_value) = self
.table_metadata_manager
.table_name_manager()
.get(key)
.await
.context(TableMetadataManagerSnafu)? else { return Ok(None) };
.context(TableMetadataManagerSnafu)?
else {
return Ok(None);
};
let table_id = table_name_value.table_id();
let Some(table_info_value) = self.table_metadata_manager
let Some(table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)? else { return Ok(None) };
.context(TableMetadataManagerSnafu)?
else {
return Ok(None);
};
let table_info = Arc::new(
table_info_value
.table_info

View File

@@ -254,7 +254,7 @@ pub(crate) fn column_schemas_to_defs(
column_schemas
.iter()
.zip(column_datatypes.into_iter())
.zip(column_datatypes)
.map(|(schema, datatype)| {
Ok(api::v1::ColumnDef {
name: schema.name.clone(),
@@ -341,7 +341,9 @@ mod tests {
.pop()
.unwrap();
let Statement::CreateTable(create_table) = stmt else { unreachable!() };
let Statement::CreateTable(create_table) = stmt else {
unreachable!()
};
let expr = create_to_expr(&create_table, QueryContext::arc()).unwrap();
assert_eq!("3days", expr.table_options.get("ttl").unwrap());
assert_eq!(

View File

@@ -44,7 +44,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
// TODO(weny): considers introducing a macro
let Some((meta, Instruction::InvalidateTableCache(table_ident))) = ctx.incoming_message.take() else {
let Some((meta, Instruction::InvalidateTableCache(table_ident))) =
ctx.incoming_message.take()
else {
unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'");
};

View File

@@ -193,7 +193,7 @@ impl Instance {
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
meta_backend,
partition_manager,

View File

@@ -300,7 +300,9 @@ impl DistInstance {
.iter()
.filter(|route| {
route.region_routes.iter().any(|r| {
let Some(n) = region_number else { return true; };
let Some(n) = region_number else {
return true;
};
n == r.region.id.region_number()
})
})

View File

@@ -67,7 +67,9 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
}
async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
let Output::Stream(stream) = output else { unreachable!() };
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream)
.await
.context(error::CollectRecordbatchSnafu)?;

View File

@@ -315,7 +315,7 @@ impl StatementExecutor {
let columns_values = fields
.iter()
.cloned()
.zip(vectors.into_iter())
.zip(vectors)
.collect::<HashMap<_, _>>();
pending.push(table.insert(InsertRequest {

View File

@@ -218,7 +218,9 @@ impl Table for DistTable {
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let Output::AffectedRows(rows) = output else { unreachable!() };
let Output::AffectedRows(rows) = output else {
unreachable!()
};
Ok(rows)
}
}

View File

@@ -30,7 +30,7 @@ impl DistTable {
let regions = requests.iter().map(|x| x.region_number).collect::<Vec<_>>();
let instances = self.find_datanode_instances(&regions).await?;
let results = future::try_join_all(instances.into_iter().zip(requests.into_iter()).map(
let results = future::try_join_all(instances.into_iter().zip(requests).map(
|(instance, request)| {
common_runtime::spawn_write(async move {
instance

View File

@@ -63,7 +63,9 @@ impl DatanodeInstance {
.logical_plan(substrait_plan.to_vec(), None)
.await
.context(error::RequestDatanodeSnafu)?;
let Output::RecordBatches(record_batches) = result else { unreachable!() };
let Output::RecordBatches(record_batches) = result else {
unreachable!()
};
Ok(record_batches)
}

View File

@@ -96,12 +96,12 @@ impl LogStore for NoopLogStore {
let _ = data;
let _ = id;
let _ = ns;
EntryImpl::default()
EntryImpl
}
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
let _ = id;
NamespaceImpl::default()
NamespaceImpl
}
async fn obsolete(
@@ -121,27 +121,21 @@ mod tests {
#[test]
fn test_mock_entry() {
let e = EntryImpl::default();
let e = EntryImpl;
assert_eq!(0, e.data().len());
assert_eq!(0, e.id());
}
#[tokio::test]
async fn test_noop_logstore() {
let store = NoopLogStore::default();
let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
let store = NoopLogStore;
let e = store.entry("".as_bytes(), 1, NamespaceImpl);
let _ = store.append(e.clone()).await.unwrap();
assert!(store.append_batch(vec![e]).await.is_ok());
store
.create_namespace(&NamespaceImpl::default())
.await
.unwrap();
store.create_namespace(&NamespaceImpl).await.unwrap();
assert_eq!(0, store.list_namespaces().await.unwrap().len());
store
.delete_namespace(&NamespaceImpl::default())
.await
.unwrap();
assert_eq!(NamespaceImpl::default(), store.namespace(0));
store.obsolete(NamespaceImpl::default(), 1).await.unwrap();
store.delete_namespace(&NamespaceImpl).await.unwrap();
assert_eq!(NamespaceImpl, store.namespace(0));
store.obsolete(NamespaceImpl, 1).await.unwrap();
}
}

View File

@@ -481,11 +481,11 @@ mod tests {
#[tokio::test]
async fn test_handler_name() {
let group = HeartbeatHandlerGroup::default();
group.add_handler(ResponseHeaderHandler::default()).await;
group.add_handler(CheckLeaderHandler::default()).await;
group.add_handler(OnLeaderStartHandler::default()).await;
group.add_handler(CollectStatsHandler::default()).await;
group.add_handler(MailboxHandler::default()).await;
group.add_handler(ResponseHeaderHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(PersistStatsHandler::default()).await;
let handlers = group.handlers.read().await;

View File

@@ -73,7 +73,9 @@ impl HeartbeatHandler for RegionFailureHandler {
.await;
}
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };
let Some(stat) = acc.stat.as_ref() else {
return Ok(());
};
let heartbeat = DatanodeHeartbeat {
region_idents: stat

View File

@@ -88,8 +88,12 @@ impl FailureDetectRunner {
}
async fn start_with(&mut self, failure_detectors: Arc<FailureDetectorContainer>) {
let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else { return };
let Some(mut control_rx) = self.control_rx.take() else { return };
let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else {
return;
};
let Some(mut control_rx) = self.control_rx.take() else {
return;
};
let container = failure_detectors.clone();
let receiver_handle = common_runtime::spawn_bg(async move {
@@ -218,9 +222,7 @@ impl FailureDetectorContainer {
&self,
ident: RegionIdent,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.0
.entry(ident)
.or_insert_with(PhiAccrualFailureDetector::default)
self.0.entry(ident).or_default()
}
pub(crate) fn iter(&self) -> Box<dyn Iterator<Item = FailureDetectorEntry> + '_> {

View File

@@ -38,7 +38,9 @@ impl HeartbeatHandler for KeepLeaseHandler {
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let HeartbeatRequest { header, peer, .. } = req;
let Some(peer) = &peer else { return Ok(()); };
let Some(peer) = &peer else {
return Ok(());
};
let key = LeaseKey {
cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),

View File

@@ -82,13 +82,12 @@ impl HeartbeatHandler for PersistStatsHandler {
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let Some(current_stat) = acc.stat.take() else { return Ok(()) };
let Some(current_stat) = acc.stat.take() else {
return Ok(());
};
let key = current_stat.stat_key();
let mut entry = self
.stats_cache
.entry(key)
.or_insert_with(EpochStats::default);
let mut entry = self.stats_cache.entry(key).or_default();
let key: Vec<u8> = key.into();
let epoch_stats = entry.value_mut();

View File

@@ -43,7 +43,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };
let Some(stat) = acc.stat.as_ref() else {
return Ok(());
};
let mut table_region_leases = HashMap::new();
stat.region_stats.iter().for_each(|region_stat| {
@@ -133,7 +135,6 @@ mod test {
let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();
let handler = RegionLeaseHandler::default();
let acc = &mut HeartbeatAccumulator::default();
let new_region_stat = |region_number: RegionNumber| -> RegionStat {
@@ -183,7 +184,7 @@ mod test {
.await
.unwrap();
handler.handle(&req, ctx, acc).await.unwrap();
RegionLeaseHandler.handle(&req, ctx, acc).await.unwrap();
assert_eq!(acc.region_leases.len(), 1);
let lease = acc.region_leases.remove(0);

View File

@@ -13,7 +13,6 @@
// limitations under the License.
#![feature(async_closure)]
#![feature(btree_drain_filter)]
#![feature(result_flattening)]
pub mod bootstrap;

View File

@@ -211,19 +211,19 @@ impl MetaSrvBuilder {
};
let group = HeartbeatHandlerGroup::new(pushers);
group.add_handler(ResponseHeaderHandler::default()).await;
group.add_handler(ResponseHeaderHandler).await;
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
// because even if the current meta-server node is no longer the leader it can
// still help the datanode to keep lease.
group.add_handler(KeepLeaseHandler::default()).await;
group.add_handler(CheckLeaderHandler::default()).await;
group.add_handler(OnLeaderStartHandler::default()).await;
group.add_handler(CollectStatsHandler::default()).await;
group.add_handler(MailboxHandler::default()).await;
group.add_handler(KeepLeaseHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(MailboxHandler).await;
if let Some(region_failover_handler) = region_failover_handler {
group.add_handler(region_failover_handler).await;
}
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(RegionLeaseHandler).await;
group.add_handler(PersistStatsHandler::default()).await;
if let Some((publish, _)) = pubsub.as_ref() {
group

View File

@@ -223,7 +223,7 @@ impl AlterTableProcedure {
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
table_name: table_ref.table.clone(),
table_name: table_ref.table,
})?;
let mut new_info = table_info.clone();

View File

@@ -98,7 +98,8 @@ impl ActivateRegion {
return UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect open region reply",
}.fail();
}
.fail();
};
if result {
Ok(Box::new(UpdateRegionMetadata::new(self.candidate)))
@@ -114,7 +115,7 @@ impl ActivateRegion {
RetryLaterSnafu { reason }.fail()
}
}
Err(e) if matches!(e, Error::MailboxTimeout { .. }) => {
Err(Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for activate failed region {failed_region:?} on Datanode {:?}",
self.candidate,

View File

@@ -89,8 +89,9 @@ impl DeactivateRegion {
let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
return UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect close region reply"
}.fail();
reason: "expect close region reply",
}
.fail();
};
if result {
InactiveNodeManager::new(&ctx.in_memory)
@@ -108,7 +109,7 @@ impl DeactivateRegion {
RetryLaterSnafu { reason }.fail()
}
}
Err(e) if matches!(e, Error::MailboxTimeout { .. }) => {
Err(Error::MailboxTimeout { .. }) => {
// Since we are in a region failover situation, the Datanode that the failed region
// resides might be unreachable. So we wait for the region lease to expire. The
// region would be closed by its own [RegionAliveKeeper].
@@ -140,7 +141,7 @@ impl State for DeactivateRegion {
.await;
let mailbox_receiver = match result {
Ok(mailbox_receiver) => mailbox_receiver,
Err(e) if matches!(e, Error::PusherNotFound { .. }) => {
Err(Error::PusherNotFound { .. }) => {
// See the mailbox received timeout situation comments above.
self.wait_for_region_lease_expiry().await;
return Ok(Box::new(ActivateRegion::new(self.candidate)));

View File

@@ -64,7 +64,7 @@ where
self.topic2sub
.get(topic)
.map(|list_ref| list_ref.clone())
.unwrap_or_else(Vec::new)
.unwrap_or_default()
}
}
@@ -88,7 +88,7 @@ where
let subscriber = Arc::new(subscriber);
for topic in topic_list {
let mut entry = self.topic2sub.entry(topic).or_insert_with(Vec::new);
let mut entry = self.topic2sub.entry(topic).or_default();
entry.push(subscriber.clone());
}

View File

@@ -145,7 +145,7 @@ where
.into_owned()
.collect()
})
.unwrap_or_else(HashMap::new);
.unwrap_or_default();
let path = req.uri().path().to_owned();
Box::pin(async move { router.call(&path, query_params).await })
}

View File

@@ -72,10 +72,7 @@ pub(crate) fn create_region_distribution(
violated: "region should have been set",
})?
.id as u32;
regions_id_map
.entry(node_id)
.or_insert_with(Vec::new)
.push(region_id);
regions_id_map.entry(node_id).or_default().push(region_id);
}
Ok(regions_id_map)
}

View File

@@ -61,7 +61,8 @@ pub(crate) async fn fetch_table(
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)? else {
.context(TableMetadataManagerSnafu)?
else {
return Ok(None);
};

View File

@@ -422,9 +422,13 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_dir = table_dir(catalog_name, schema_name, table_id);
let Some((manifest, table_info)) = self
.recover_table_manifest_and_info(table_name, &table_dir)
.await.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)? else { return Ok(None) };
.recover_table_manifest_and_info(table_name, &table_dir)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?
else {
return Ok(None);
};
let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options);
let opts = OpenOptions {
@@ -628,9 +632,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
self.object_store.clone(),
self.compress_type,
);
let Some(table_info) =
let Some(table_info) =
MitoTable::<<S as StorageEngine>::Region>::recover_table_info(table_name, &manifest)
.await? else { return Ok(None) };
.await?
else {
return Ok(None);
};
Ok(Some((manifest, table_info)))
}

View File

@@ -63,7 +63,7 @@ mod procedure_test_util {
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let storage_engine = EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
Arc::new(NoopLogStore),
object_store.clone(),
compaction_scheduler,
)

View File

@@ -202,9 +202,8 @@ impl<S: StorageEngine> AlterMitoTable<S> {
/// Alter regions.
async fn alter_regions(&mut self) -> Result<()> {
let Some(alter_op) = &self.alter_op else {
// Don't need to alter the region.
return Ok(());
};
return Ok(());
};
let table_name = &self.data.request.table_name;
let table_version = self.data.table_version;

View File

@@ -62,7 +62,9 @@ impl<S: StorageEngine> Procedure for DropMitoTable<S> {
fn lock_key(&self) -> LockKey {
let table_ref = self.data.table_ref();
let Some(table) = &self.table else { return LockKey::default() };
let Some(table) = &self.table else {
return LockKey::default();
};
let info = table.table_info();
let keys = info
.meta

View File

@@ -78,7 +78,7 @@ async fn setup_table_with_column_default_constraint() -> (TempDir, String, Table
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
Arc::new(NoopLogStore),
object_store.clone(),
compaction_scheduler,
)

View File

@@ -166,7 +166,7 @@ pub async fn setup_test_engine_and_table() -> TestEngineComponents {
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let storage_engine = EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
Arc::new(NoopLogStore),
object_store.clone(),
compaction_scheduler,
)

View File

@@ -40,9 +40,7 @@ impl<S> RegionWorkerLoop<S> {
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let Some(region) = self.regions.get_region(region_id) else {
// No such region.
send_result(sender_req.sender, RegionNotFoundSnafu {
region_id,
}.fail());
send_result(sender_req.sender, RegionNotFoundSnafu { region_id }.fail());
continue;
};

View File

@@ -175,10 +175,7 @@ impl WriteSplitter {
return FindRegionSnafu { reason }.fail();
}
};
region_map
.entry(region_id)
.or_insert_with(Vec::default)
.push(idx);
region_map.entry(region_id).or_default().push(idx);
}
Ok(region_map)
}

View File

@@ -115,7 +115,11 @@ impl RangeManipulate {
// process time index column
// the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else {
return Err(datafusion::common::field_not_found(None::<TableReference>, time_index, input_schema.as_ref()))
return Err(datafusion::common::field_not_found(
None::<TableReference>,
time_index,
input_schema.as_ref(),
));
};
let ts_col_field = columns[ts_col_index].field();
let timestamp_range_field = Field::new(
@@ -128,7 +132,11 @@ impl RangeManipulate {
// process value columns
for name in field_columns {
let Some(index) = input_schema.index_of_column_by_name(None, name)? else {
return Err(datafusion::common::field_not_found(None::<TableReference>, name, input_schema.as_ref()))
return Err(datafusion::common::field_not_found(
None::<TableReference>,
name,
input_schema.as_ref(),
));
};
columns[index] = DFField::from(RangeArray::convert_field(columns[index].field()));
}

View File

@@ -604,7 +604,7 @@ impl PromPlanner {
let exprs = result_set
.into_iter()
.map(|col| DfExpr::Column(col.into()))
.chain(self.create_tag_column_exprs()?.into_iter())
.chain(self.create_tag_column_exprs()?)
.chain(Some(self.create_time_index_column_expr()?))
.collect::<Vec<_>>();
// reuse this variable for simplicity
@@ -1040,7 +1040,7 @@ impl PromPlanner {
exprs.push(expr);
}
utils::conjunction(exprs.into_iter()).context(ValueNotFoundSnafu {
utils::conjunction(exprs).context(ValueNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap(),
})
}

View File

@@ -654,7 +654,9 @@ mod tests {
let columns = vec![builder.to_vector()];
let record_batch = RecordBatch::new(schema, columns).unwrap();
let output = execute_show_with_filter(record_batch, None).await.unwrap();
let Output::RecordBatches(record_batches) = output else {unreachable!()};
let Output::RecordBatches(record_batches) = output else {
unreachable!()
};
let expected = "\
+----------------+
| Tables |
@@ -682,12 +684,18 @@ mod tests {
)
.unwrap()[0]
.clone();
let Statement::ShowTables(ShowTables { kind, .. }) = statement else {unreachable!()};
let ShowKind::Where(filter) = kind else {unreachable!()};
let Statement::ShowTables(ShowTables { kind, .. }) = statement else {
unreachable!()
};
let ShowKind::Where(filter) = kind else {
unreachable!()
};
let output = execute_show_with_filter(record_batch, Some(filter))
.await
.unwrap();
let Output::RecordBatches(record_batches) = output else {unreachable!()};
let Output::RecordBatches(record_batches) = output else {
unreachable!()
};
let expected = "\
+---------+
| Tables |

View File

@@ -86,7 +86,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
.create_physical_plan(input_plan, session_state)
.await?;
let Some(table_name) = self.get_table_name(input_plan)? else {
// no relation found in input plan, going to execute them locally
// no relation found in input plan, going to execute them locally
return Ok(Some(input_physical_plan));
};

View File

@@ -44,10 +44,9 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let Output::Stream(stream) = engine
.execute(plan, QueryContext::arc())
.await
.unwrap() else { unreachable!() };
let Output::Stream(stream) = engine.execute(plan, QueryContext::arc()).await.unwrap() else {
unreachable!()
};
util::collect(stream).await.unwrap()
}

View File

@@ -57,7 +57,9 @@ where
function::get_numbers_from_table::<T>(column_name, table_name, engine.clone()).await;
let numbers = numbers.iter().map(|&n| n.as_()).collect::<Vec<f64>>();
let expected = numbers.iter().sum::<f64>() / (numbers.len() as f64);
let Value::Float64(OrderedFloat(value)) = value else { unreachable!() };
let Value::Float64(OrderedFloat(value)) = value else {
unreachable!()
};
assert!(
(value - expected).abs() < 1e-3,
"expected {expected}, actual {value}"

View File

@@ -654,7 +654,7 @@ pub(crate) mod greptime_builtin {
) -> PyResult<PyObjectRef> {
let percent =
expressions::Literal::new(datafusion_common::ScalarValue::Float64(Some(percent)));
return eval_aggr_fn(
eval_aggr_fn(
expressions::ApproxPercentileCont::new(
vec![
Arc::new(expressions::Column::new("expr0", 0)) as _,
@@ -666,7 +666,7 @@ pub(crate) mod greptime_builtin {
.map_err(|err| from_df_err(err, vm))?,
&[values.to_arrow_array()],
vm,
);
)
}
/// effectively equals to `list(vector)`
@@ -857,7 +857,7 @@ pub(crate) mod greptime_builtin {
// pyfunction can return PyResult<...>, args can be like PyObjectRef or anything
// impl IntoPyNativeFunc, see rustpython-vm function for more details
let args = vec![base.as_vector_ref(), arg_pow];
let res = PowFunction::default()
let res = PowFunction
.eval(FunctionContext::default(), &args)
.map_err(|err| {
vm.new_runtime_error(format!(

View File

@@ -14,8 +14,8 @@
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
use std::result::Result as StdResult;
use std::sync::Arc;
use common_recordbatch::RecordBatch;
use common_telemetry::{info, timer};
@@ -37,7 +37,7 @@ use crate::python::rspython::dataframe_impl::data_frame::set_dataframe_in_scope;
use crate::python::rspython::dataframe_impl::init_data_frame;
use crate::python::rspython::utils::{format_py_error, is_instance, py_obj_to_vec};
thread_local!(static INTERPRETER: RefCell<Option<Arc<Interpreter>>> = RefCell::new(None));
thread_local!(static INTERPRETER: RefCell<Option<Rc<Interpreter>>> = RefCell::new(None));
/// Using `RustPython` to run a parsed `Coprocessor` struct as input to execute python code
pub(crate) fn rspy_exec_parsed(
@@ -48,7 +48,7 @@ pub(crate) fn rspy_exec_parsed(
let _t = timer!(metric::METRIC_RSPY_EXEC_TOTAL_ELAPSED);
// 3. get args from `rb`, and cast them into PyVector
let args: Vec<PyVector> = if let Some(rb) = rb {
let arg_names = copr.deco_args.arg_names.clone().unwrap_or(vec![]);
let arg_names = copr.deco_args.arg_names.clone().unwrap_or_default();
let args = select_from_rb(rb, &arg_names)?;
check_args_anno_real_type(&arg_names, &args, copr, rb)?;
args
@@ -99,7 +99,7 @@ pub(crate) fn exec_with_cached_vm(
rb: &Option<RecordBatch>,
args: Vec<PyVector>,
params: &HashMap<String, String>,
vm: &Arc<Interpreter>,
vm: &Rc<Interpreter>,
) -> Result<RecordBatch> {
vm.enter(|vm| -> Result<RecordBatch> {
let _t = timer!(metric::METRIC_RSPY_EXEC_ELAPSED);
@@ -188,7 +188,7 @@ fn try_into_columns(
}
/// init interpreter with type PyVector and Module: greptime
pub(crate) fn init_interpreter() -> Arc<Interpreter> {
pub(crate) fn init_interpreter() -> Rc<Interpreter> {
let _t = timer!(metric::METRIC_RSPY_INIT_ELAPSED);
INTERPRETER.with(|i| {
i.borrow_mut()
@@ -202,7 +202,7 @@ pub(crate) fn init_interpreter() -> Arc<Interpreter> {
let mut settings = vm::Settings::default();
// disable SIG_INT handler so our own binary can take ctrl_c handler
settings.no_sig_int = true;
let interpreter = Arc::new(vm::Interpreter::with_init(settings, |vm| {
let interpreter = Rc::new(vm::Interpreter::with_init(settings, |vm| {
// not using full stdlib to prevent security issue, instead filter out a few simple util module
vm.add_native_modules(
rustpython_stdlib::get_module_inits()

View File

@@ -136,8 +136,8 @@ pub fn py_obj_to_vec(
match columnar_value {
DFColValue::Scalar(ScalarValue::List(scalars, _datatype)) => match scalars {
Some(scalars) => {
let array = ScalarValue::iter_to_array(scalars.into_iter())
.context(error::DataFusionSnafu)?;
let array =
ScalarValue::iter_to_array(scalars).context(error::DataFusionSnafu)?;
Helper::try_into_vector(array).context(error::TypeCastSnafu)
}

View File

@@ -15,7 +15,7 @@ api = { path = "../api" }
arrow-flight.workspace = true
async-trait = "0.1"
axum = { version = "0.6", features = ["headers"] }
axum-macros = "0.3"
axum-macros = "0.3.8"
base64 = "0.13"
bytes = "1.2"
catalog = { path = "../catalog" }
@@ -102,7 +102,6 @@ rand.workspace = true
rustls = { version = "0.21", features = ["dangerous_configuration"] }
script = { path = "../script", features = ["python"] }
serde_json = "1.0"
session = { path = "../session", features = ["testing"] }
table = { path = "../table" }
tokio-postgres = "0.7"
tokio-postgres-rustls = "0.10"

View File

@@ -111,7 +111,9 @@ impl GreptimeRequestHandler {
header: Option<&RequestHeader>,
query_ctx: &QueryContextRef,
) -> TonicResult<InternalResult<()>> {
let Some(user_provider) = self.user_provider.as_ref() else { return Ok(Ok(())) };
let Some(user_provider) = self.user_provider.as_ref() else {
return Ok(Ok(()));
};
let auth_scheme = header
.and_then(|header| {

Some files were not shown because too many files have changed in this diff Show More