chore: new format

This commit is contained in:
Ning Sun
2026-03-23 21:47:05 +08:00
parent 1d95a042e1
commit 422dc22c2a
26 changed files with 76 additions and 83 deletions

View File

@@ -148,7 +148,9 @@ impl CatalogManager for MemoryCatalogManager {
Ok(self
.catalogs
.read()
.unwrap().values().flat_map(|schema_entries| schema_entries.values())
.unwrap()
.values()
.flat_map(|schema_entries| schema_entries.values())
.flat_map(|tables| tables.values())
.find(|t| t.table_info().ident.table_id == table_id)
.map(|t| t.table_info()))

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::vec;
use std::{assert_matches, vec};
use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;

View File

@@ -794,15 +794,13 @@ impl Tokenizer {
is_quote_present = true;
break;
}
' '
if !is_quoted => {
break;
}
'(' | ')' | '+' | '-'
if !is_quoted => {
self.rewind_one();
break;
}
' ' if !is_quoted => {
break;
}
'(' | ')' | '+' | '-' if !is_quoted => {
self.rewind_one();
break;
}
'\\' => {
let Some(next) = self.consume_next(pattern) else {
return InvalidFuncArgsSnafu {

View File

@@ -380,9 +380,8 @@ impl PoisonStore for KvStateStore {
#[cfg(test)]
mod tests {
use std::assert_matches;
use std::env;
use std::sync::Arc;
use std::{assert_matches, env};
use common_procedure::store::state_store::KeyValue;
use common_telemetry::info;

View File

@@ -14,7 +14,6 @@
//! Common traits and structures for the procedure framework.
pub mod error;
pub mod event;
pub mod local;

View File

@@ -21,10 +21,12 @@ mod panic_hook;
pub mod tracing_context;
mod tracing_sampler;
pub use common_error;
pub use logging::{
LOG_RELOAD_HANDLE, TRACE_RELOAD_HANDLE, get_or_init_tracer, init_default_ut_logging,
init_global_logging,
};
pub use metric::dump_metrics;
pub use panic_hook::set_panic_hook;
pub use {common_error, tracing, tracing_subscriber};
pub use tracing;
pub use tracing_subscriber;

View File

@@ -183,9 +183,10 @@ impl UpgradeRegionsHandler {
.await
{
Ok(responses) => {
replies.extend(
Self::convert_responses_to_replies(responses, &catchup_regions),
);
replies.extend(Self::convert_responses_to_replies(
responses,
&catchup_regions,
));
}
Err(_) => {
replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {

View File

@@ -307,10 +307,11 @@ macro_rules! impl_extend_for_builder {
}};
}
pub(crate) use {
impl_extend_for_builder, impl_get_for_vector, impl_get_ref_for_vector,
impl_try_from_arrow_array_for_vector, impl_validity_for_vector,
};
pub(crate) use impl_extend_for_builder;
pub(crate) use impl_get_for_vector;
pub(crate) use impl_get_ref_for_vector;
pub(crate) use impl_try_from_arrow_array_for_vector;
pub(crate) use impl_validity_for_vector;
#[cfg(test)]
pub mod tests {

View File

@@ -408,7 +408,9 @@ impl Arrangement {
pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
// iter over batches that only have updates of `timestamp>now` and find the first non empty batch, then get the minimum timestamp in that batch
for (_ts, batch) in self.spine.range((Bound::Excluded(now), Bound::Unbounded)) {
let min_ts = batch.values().flat_map(|v| v.iter().map(|(_, ts, _)| *ts).min())
let min_ts = batch
.values()
.flat_map(|v| v.iter().map(|(_, ts, _)| *ts).min())
.min();
if min_ts.is_some() {

View File

@@ -61,9 +61,7 @@ impl RegionWalRange {
fn next_batch_size(&self) -> Option<u64> {
if self.current_entry_id < self.end_entry_id {
Some(
self.end_entry_id.saturating_sub(self.current_entry_id),
)
Some(self.end_entry_id.saturating_sub(self.current_entry_id))
} else {
None
}

View File

@@ -984,8 +984,7 @@ impl MySqlElection {
#[cfg(test)]
mod tests {
use std::assert_matches;
use std::env;
use std::{assert_matches, env};
use common_meta::maybe_skip_mysql_integration_test;
use common_telemetry::init_default_ut_logging;

View File

@@ -828,8 +828,7 @@ impl PgElection {
#[cfg(test)]
mod tests {
use std::assert_matches;
use std::env;
use std::{assert_matches, env};
use common_meta::maybe_skip_postgres_integration_test;

View File

@@ -18,11 +18,11 @@ use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use memcomparable::Deserializer;
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
use crate::row_converter::{COLUMN_ID_ENCODE_SIZE, SortField};
/// Key value view of a mutation.
#[derive(Debug)]
@@ -360,7 +360,7 @@ mod tests {
use api::v1::{self, ColumnDataType, SemanticType};
use super::*;
use crate::test_util::{i64_value, TestRegionMetadataBuilder};
use crate::test_util::{TestRegionMetadataBuilder, i64_value};
const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;

View File

@@ -614,9 +614,7 @@ async fn find_dynamic_options(
region_options: &crate::region::options::RegionOptions,
schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
if let (true, Some(ttl)) =
(region_options.compaction_override, region_options.ttl)
{
if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
debug!(
"Use region options directly for table {}: compaction={:?}, ttl={:?}",
table_id, region_options.compaction, region_options.ttl

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches;
use std::fs;
use std::sync::Arc;
use std::{assert_matches, fs};
use api::v1::Rows;
use common_function::utils::partition_expr_version;

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches;
use std::fs;
use std::sync::Arc;
use std::{assert_matches, fs};
use api::v1::Rows;
use common_error::ext::ErrorExt;

View File

@@ -14,10 +14,9 @@
//! Integration tests for staging state functionality.
use std::assert_matches;
use std::fs;
use std::sync::Arc;
use std::time::Duration;
use std::{assert_matches, fs};
use api::v1::Rows;
use common_error::ext::ErrorExt;

View File

@@ -1082,7 +1082,9 @@ impl IndexBuildScheduler {
/// Find the next task which has the highest priority to run.
fn find_next_task(&self) -> Option<IndexBuildTask> {
self.region_status.values().filter_map(|status| status.pending_tasks.peek())
self.region_status
.values()
.filter_map(|status| status.pending_tasks.peek())
.max()
.cloned()
}

View File

@@ -343,8 +343,7 @@ impl Inserter {
.convert(request)
.await?;
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
self.do_request(inserts, &table_infos, &ctx).await
}
@@ -360,8 +359,7 @@ impl Inserter {
.convert(insert, ctx, statement_executor)
.await?;
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
self.do_request(inserts, &table_infos, ctx).await
}

View File

@@ -30,8 +30,8 @@ use crate::error::{
};
use crate::etl::field::Fields;
use crate::etl::processor::{
FIELD_NAME, FIELDS_NAME, IGNORE_MISSING_NAME, PATTERN_NAME, PATTERNS_NAME, Processor,
yaml_bool, yaml_new_field, yaml_new_fields, yaml_parse_string, yaml_parse_strings, yaml_string,
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
};
pub(crate) const PROCESSOR_DISSECT: &str = "dissect";

View File

@@ -1826,14 +1826,10 @@ impl PromPlanner {
.iter()
.map(|tag| DfExpr::Column(Column::from_name(tag))),
)
.chain(
self.ctx
.use_tsid
.then_some(DfExpr::Column(Column::new(
Some(table_ref.clone()),
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
))),
)
.chain(self.ctx.use_tsid.then_some(DfExpr::Column(Column::new(
Some(table_ref.clone()),
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
))))
.chain(Some(DfExpr::Alias(Alias {
expr: Box::new(DfExpr::Cast(Cast {
expr: Box::new(self.create_time_index_column_expr()?),

View File

@@ -216,9 +216,10 @@ fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to
let _ = visit_expressions_mut(expr, |e| {
match e {
sqlparser::ast::Expr::Identifier(ident)
if ident.value.eq_ignore_ascii_case(from_column) => {
ident.value = to_column.to_string();
}
if ident.value.eq_ignore_ascii_case(from_column) =>
{
ident.value = to_column.to_string();
}
sqlparser::ast::Expr::CompoundIdentifier(idents) => {
if let Some(last) = idents.last_mut()
&& last.value.eq_ignore_ascii_case(from_column)

View File

@@ -25,8 +25,7 @@ use arrow_flight::{
HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket,
};
use async_trait::async_trait;
use bytes;
use bytes::Bytes;
use bytes::{self, Bytes};
use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse};
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_memory_manager::MemoryGuard;

View File

@@ -170,13 +170,12 @@ pub fn collect_plan_metrics(plan: &Arc<dyn ExecutionPlan>, maps: &mut [&mut Hash
MetricValue::Gauge { name, gauge } => {
collect_into_maps(name, gauge.value() as u64, maps);
}
MetricValue::Time { name, time }
if name.starts_with(GREPTIME_EXEC_PREFIX) => {
// override
maps.iter_mut().for_each(|map| {
map.insert(name.to_string(), time.value() as u64);
});
}
MetricValue::Time { name, time } if name.starts_with(GREPTIME_EXEC_PREFIX) => {
// override
maps.iter_mut().for_each(|map| {
map.insert(name.to_string(), time.value() as u64);
});
}
_ => {}
});
}

View File

@@ -1119,15 +1119,17 @@ fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
match modifier {
Some(LabelModifier::Include(labels))
if !labels.labels.contains(&METRIC_NAME.to_string()) => {
metric_names.clear();
return;
}
if !labels.labels.contains(&METRIC_NAME.to_string()) =>
{
metric_names.clear();
return;
}
Some(LabelModifier::Exclude(labels))
if labels.labels.contains(&METRIC_NAME.to_string()) => {
metric_names.clear();
return;
}
if labels.labels.contains(&METRIC_NAME.to_string()) =>
{
metric_names.clear();
return;
}
_ => {}
}
collect_metric_names(expr, metric_names)

View File

@@ -205,9 +205,10 @@ impl PrometheusJsonResponse {
for (i, column) in batches.schema().column_schemas().iter().enumerate() {
match column.data_type {
ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_))
if timestamp_column_index.is_none() => {
timestamp_column_index = Some(i);
}
if timestamp_column_index.is_none() =>
{
timestamp_column_index = Some(i);
}
// Treat all value types as field
ConcreteDataType::Float32(_)
| ConcreteDataType::Float64(_)
@@ -219,9 +220,10 @@ impl PrometheusJsonResponse {
| ConcreteDataType::UInt16(_)
| ConcreteDataType::UInt32(_)
| ConcreteDataType::UInt64(_)
if first_field_column_index.is_none() => {
first_field_column_index = Some(i);
}
if first_field_column_index.is_none() =>
{
first_field_column_index = Some(i);
}
ConcreteDataType::String(_) => {
tag_column_indices.push(i);
num_label_columns += 1;