chore: upgrade toolchain to nightly-2024-08-07 (#4549)

* chore: upgrade toolchain to `nightly-2024-08-07`

* chore(ci): upgrade toolchain

* fix: fix unit test
Author: Weny Xu
Date: 2024-08-22 19:02:18 +08:00
Committed by: GitHub
Parent commit: 3517c13192
This commit: 25cd61b310
44 changed files with 90 additions and 108 deletions

View File

@@ -13,7 +13,7 @@ on:
 name: Build API docs
 env:
-RUST_TOOLCHAIN: nightly-2024-04-20
+RUST_TOOLCHAIN: nightly-2024-08-07
 jobs:
 apidoc:

View File

@@ -30,7 +30,7 @@ concurrency:
 cancel-in-progress: true
 env:
-RUST_TOOLCHAIN: nightly-2024-04-20
+RUST_TOOLCHAIN: nightly-2024-08-07
 jobs:
 check-typos-and-docs:

View File

@@ -10,7 +10,7 @@ concurrency:
 cancel-in-progress: true
 env:
-RUST_TOOLCHAIN: nightly-2024-04-20
+RUST_TOOLCHAIN: nightly-2024-08-07
 permissions:
 issues: write

View File

@@ -82,7 +82,7 @@ on:
 # Use env variables to control all the release process.
 env:
 # The arguments of building greptime.
-RUST_TOOLCHAIN: nightly-2024-04-20
+RUST_TOOLCHAIN: nightly-2024-08-07
 CARGO_PROFILE: nightly
 # Controls whether to run tests, include unit-test, integration-test and sqlness.

Cargo.lock (generated)
View File

@@ -4568,9 +4568,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
 [[package]]
 name = "human-panic"
-version = "1.2.3"
+version = "2.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4f016c89920bbb30951a8405ecacbb4540db5524313b9445736e7e1855cf370"
+checksum = "1c5a08ed290eac04006e21e63d32e90086b6182c7cd0452d10f4264def1fec9a"
 dependencies = [
 "anstream",
 "anstyle",

View File

@@ -77,6 +77,7 @@ clippy.readonly_write_lock = "allow"
 rust.unknown_lints = "deny"
 # Remove this after https://github.com/PyO3/pyo3/issues/4094
 rust.non_local_definitions = "allow"
+rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
 [workspace.dependencies]
 # We turn off default-features for some dependencies here so the workspaces which inherit them can
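Context for the new lint entry: newer toolchains enable rustc's `unexpected_cfgs` lint, which flags `#[cfg(...)]` conditions Cargo does not know about, and `check-cfg` is the documented way to declare a custom cfg such as `tokio_unstable`. A minimal sketch (hypothetical code, not from this repository) of the kind of cfg-gated item this setting keeps warning-free:

```rust
// Compiled only when built with `RUSTFLAGS="--cfg tokio_unstable"`;
// without the check-cfg declaration above, `unexpected_cfgs` would warn here.
#[cfg(tokio_unstable)]
fn install_tokio_console() {
    // tokio-console wiring would go here in an unstable-tokio build.
}

#[cfg(not(tokio_unstable))]
fn install_tokio_console() {
    // Fallback used in normal builds.
}
```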

View File

@@ -1,2 +1,3 @@
 [toolchain]
-channel = "nightly-2024-04-20"
+channel = "nightly-2024-08-07"

View File

@@ -91,7 +91,7 @@ impl Database {
 ///
 /// - the name of database when using GreptimeDB standalone or cluster
 /// - the name provided by GreptimeCloud or other multi-tenant GreptimeDB
-/// environment
+///   environment
 pub fn new_with_dbname(dbname: impl Into<String>, client: Client) -> Self {
 Self {
 catalog: String::default(),

View File

@@ -51,7 +51,7 @@ file-engine.workspace = true
 flow.workspace = true
 frontend = { workspace = true, default-features = false }
 futures.workspace = true
-human-panic = "1.2.2"
+human-panic = "2.0"
 lazy_static.workspace = true
 meta-client.workspace = true
 meta-srv.workspace = true

View File

@@ -139,13 +139,10 @@ async fn start(cli: Command) -> Result<()> {
 }
 fn setup_human_panic() {
-let metadata = human_panic::Metadata {
-version: env!("CARGO_PKG_VERSION").into(),
-name: "GreptimeDB".into(),
-authors: Default::default(),
-homepage: "https://github.com/GreptimeTeam/greptimedb/discussions".into(),
-};
-human_panic::setup_panic!(metadata);
+human_panic::setup_panic!(
+human_panic::Metadata::new("GreptimeDB", env!("CARGO_PKG_VERSION"))
+.homepage("https://github.com/GreptimeTeam/greptimedb/discussions")
+);
 common_telemetry::set_panic_hook();
 }
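For reference, human-panic 2.x drops the public `Metadata` struct literal in favor of a `Metadata::new(name, version)` constructor with builder methods, and the metadata expression is passed straight into `setup_panic!`. A minimal, properly indented sketch of the call shape used above:

```rust
// Sketch of the human-panic 2.x API as used in the new code above.
fn setup_human_panic() {
    human_panic::setup_panic!(
        human_panic::Metadata::new("GreptimeDB", env!("CARGO_PKG_VERSION"))
            .homepage("https://github.com/GreptimeTeam/greptimedb/discussions")
    );
}
```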

View File

@@ -48,19 +48,19 @@ pub fn build_db_string(catalog: &str, schema: &str) -> String {
 /// The database name may come from different sources:
 ///
 /// - MySQL `schema` name in MySQL protocol login request: it's optional and user
-/// and switch database using `USE` command
+///   and switch database using `USE` command
 /// - Postgres `database` parameter in Postgres wire protocol, required
 /// - HTTP RESTful API: the database parameter, optional
 /// - gRPC: the dbname field in header, optional but has a higher priority than
-/// original catalog/schema
+///   original catalog/schema
 ///
 /// When database name is provided, we attempt to parse catalog and schema from
 /// it. We assume the format `[<catalog>-]<schema>`:
 ///
 /// - If `[<catalog>-]` part is not provided, we use whole database name as
-/// schema name
+///   schema name
 /// - if `[<catalog>-]` is provided, we split database name with `-` and use
-/// `<catalog>` and `<schema>`.
+///   `<catalog>` and `<schema>`.
 pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (String, String) {
 match parse_optional_catalog_and_schema_from_db_string(db) {
 (Some(catalog), schema) => (catalog, schema),

View File

@@ -32,7 +32,7 @@ pub struct FunctionContext {
 impl FunctionContext {
 /// Create a mock [`FunctionContext`] for test.
-#[cfg(any(test, feature = "testing"))]
+#[cfg(test)]
 pub fn mock() -> Self {
 Self {
 query_ctx: QueryContextBuilder::default().build().into(),

View File

@@ -75,7 +75,7 @@ where
 // to keep the not_greater length == floor+1
 // so to ensure the peek of the not_greater is array[floor]
 // and the peek of the greater is array[floor+1]
-let p = if let Some(p) = self.p { p } else { 0.0_f64 };
+let p = self.p.unwrap_or(0.0_f64);
 let floor = (((self.n - 1) as f64) * p / (100_f64)).floor();
 if value <= *self.not_greater.peek().unwrap() {
 self.not_greater.push(value);
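The hunk above is a clippy-driven cleanup: `Option::unwrap_or` replaces the manual `if let Some(..) .. else ..` default (the `manual_unwrap_or` style of lint). A standalone illustration with a hypothetical function, not repository code:

```rust
// Equivalent forms; the second is what the newer clippy suggests.
fn percentile_or_default(p: Option<f64>) -> f64 {
    // if let Some(p) = p { p } else { 0.0_f64 }
    p.unwrap_or(0.0_f64)
}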

View File

@@ -245,7 +245,7 @@ mod test {
 ];
 scipy_stats_norm_pdf.update_batch(&v).unwrap();
 assert_eq!(
-Value::from(0.17843340219081558),
+Value::from(0.17843340219081552),
 scipy_stats_norm_pdf.evaluate().unwrap()
 );

View File

@@ -28,7 +28,7 @@ pub struct FunctionState {
 impl FunctionState {
 /// Create a mock [`FunctionState`] for test.
-#[cfg(any(test, feature = "testing"))]
+#[cfg(test)]
 pub fn mock() -> Self {
 use std::sync::Arc;

View File

@@ -76,6 +76,7 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
 /// - `&ProcedureServiceHandlerRef` or `&TableMutationHandlerRef` or `FlowServiceHandlerRef` as the first argument,
 /// - `&QueryContextRef` as the second argument, and
 /// - `&[ValueRef<'_>]` as the third argument which is SQL function input values in each row.
+///
 /// Return type must be `common_query::error::Result<Value>`.
 ///
 /// # Example see `common/function/src/system/procedure_state.rs`.

View File

@@ -172,8 +172,8 @@ impl From<TableLock> for StringKey {
 ///
 /// Note:
 /// - Allows modification the corresponding region's [TableRouteValue](crate::key::table_route::TableRouteValue),
-/// [TableDatanodeValue](crate::key::datanode_table::DatanodeTableValue) even if
-/// it acquires the [RegionLock::Write] only without acquiring the [TableLock::Write].
+///   [TableDatanodeValue](crate::key::datanode_table::DatanodeTableValue) even if
+///   it acquires the [RegionLock::Write] only without acquiring the [TableLock::Write].
 ///
 /// - Should acquire [TableLock] of the table at same procedure.
 ///

View File

@@ -51,7 +51,7 @@ const META_TTL: Duration = Duration::from_secs(60 * 10);
 /// [Notify] is not a condition variable, we can't guarantee the waiters are notified
 /// if they didn't call `notified()` before we signal the notify. So we
 /// 1. use dedicated notify for each condition, such as waiting for a lock, waiting
-/// for children;
+///    for children;
 /// 2. always use `notify_one` and ensure there are only one waiter.
 #[derive(Debug)]
 pub(crate) struct ProcedureMeta {

View File

@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-#![feature(lazy_cell)]
 use std::path::{Path, PathBuf};
 use std::process::Command;
 use std::sync::LazyLock;
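Background for the removed attribute: `std::sync::LazyLock` and `LazyCell` were stabilized in Rust 1.80, so on nightly-2024-08-07 the `lazy_cell` feature gate is unnecessary and trips the `stable_features` warning. A minimal sketch of the gate-free usage (hypothetical static, not taken from the repository):

```rust
// LazyLock works without any feature gate on Rust 1.80+.
use std::sync::LazyLock;

static CACHE_DIR: LazyLock<String> =
    LazyLock::new(|| std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string()));

fn main() {
    println!("cache dir: {}", *CACHE_DIR);
}
```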

View File

@@ -17,7 +17,6 @@
 //! It also contains definition of expression, adapter and plan, and internal state management.
 #![feature(let_chains)]
-#![feature(duration_abs_diff)]
 #![allow(dead_code)]
 #![warn(clippy::missing_docs_in_private_items)]
 #![warn(clippy::too_many_lines)]

View File

@@ -40,8 +40,9 @@ pub(crate) const ESTIMATED_META_SIZE: usize = 256;
 /// - If the entry is able to fit into a Kafka record, it's converted into a Full record.
 ///
 /// - If the entry is too large to fit into a Kafka record, it's converted into a collection of records.
+///
 /// Those records must contain exactly one First record and one Last record, and potentially several
 /// Middle records. There may be no Middle record.
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
 pub enum RecordType {
 /// The record is self-contained, i.e. an entry's data is fully stored into this record.

View File

@@ -23,6 +23,7 @@ use store_api::logstore::entry::{Entry, NaiveEntry};
 use store_api::logstore::provider::Provider;
 use store_api::storage::RegionId;
+#[allow(renamed_and_removed_lints)]
 pub mod protos {
 include!(concat!(env!("OUT_DIR"), concat!("/", "protos/", "mod.rs")));
 }

View File

@@ -15,7 +15,6 @@
 #![feature(async_closure)]
 #![feature(result_flattening)]
 #![feature(assert_matches)]
-#![feature(option_take_if)]
 #![feature(extract_if)]
 pub mod bootstrap;

View File

@@ -33,8 +33,8 @@ impl UpdateMetadata {
 /// About the failure of updating the [TableRouteValue](common_meta::key::table_region::TableRegionValue):
 ///
 /// - There may be another [RegionMigrationProcedure](crate::procedure::region_migration::RegionMigrationProcedure)
-/// that is executed concurrently for **other region**.
-/// It will only update **other region** info. Therefore, It's safe to retry after failure.
+///   that is executed concurrently for **other region**.
+///   It will only update **other region** info. Therefore, It's safe to retry after failure.
 ///
 /// - There is no other DDL procedure executed concurrently for the current table.
 pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> {

View File

@@ -122,7 +122,7 @@ impl UpdateMetadata {
 ///
 /// Abort(non-retry):
 /// - TableRoute or RegionRoute is not found.
-/// Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table.
+///   Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table.
 ///
 /// Retry:
 /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).

View File

@@ -27,6 +27,7 @@ pub trait WeightedChoose<Item>: Send + Sync {
 /// Note:
 /// 1. make sure weight_array is not empty.
 /// 2. the total weight is greater than 0.
+///
 /// Otherwise an error will be returned.
 fn set_weight_array(&mut self, weight_array: Vec<WeightedItem<Item>>) -> Result<()>;

View File

@@ -61,9 +61,9 @@ impl CheckLeader for RwLock<State> {
 /// To use this cache, the following constraints must be followed:
 /// 1. The leader node can create this metadata.
 /// 2. The follower node can create this metadata. The leader node can lazily retrieve
-/// the corresponding data through the caching loading mechanism.
+///    the corresponding data through the caching loading mechanism.
 /// 3. Only the leader node can update this metadata, as the cache cannot detect
-/// modifications made to the data on the follower node.
+///    modifications made to the data on the follower node.
 /// 4. Only the leader node can delete this metadata for the same reason mentioned above.
 pub struct LeaderCachedKvBackend {
 check_leader: CheckLeaderRef,

View File

@@ -31,7 +31,7 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
 ///
 /// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
 /// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
-/// ignore op type as sequence is already unique).
+///    ignore op type as sequence is already unique).
 /// 2. Batches from sources **must** not be empty.
 ///
 /// The reader won't concatenate batches. Each batch returned by the reader also doesn't

View File

@@ -960,6 +960,7 @@ pub fn build_rows(start: usize, end: usize) -> Vec<Row> {
 /// - `key`: A string key that is common across all rows.
 /// - `timestamps`: Array of timestamp values.
 /// - `fields`: Array of tuples where each tuple contains two optional i64 values, representing two optional float fields.
+///
 /// Returns a vector of `Row` each containing the key, two optional float fields, and a timestamp.
 pub fn build_rows_with_fields(
 key: &str,

View File

@@ -159,9 +159,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let create_res = self.inner.create_dir(path, args).await;
 timer.observe_duration();
-create_res.map_err(|e| {
+create_res.inspect_err(|e| {
 increment_errors_total(Operation::CreateDir, e.kind());
-e
 })
 }
@@ -175,9 +174,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
 .start_timer();
-let (rp, r) = self.inner.read(path, args).await.map_err(|e| {
+let (rp, r) = self.inner.read(path, args).await.inspect_err(|e| {
 increment_errors_total(Operation::Read, e.kind());
-e
 })?;
 Ok((
@@ -205,9 +203,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
 .start_timer();
-let (rp, r) = self.inner.write(path, args).await.map_err(|e| {
+let (rp, r) = self.inner.write(path, args).await.inspect_err(|e| {
 increment_errors_total(Operation::Write, e.kind());
-e
 })?;
 Ok((
@@ -236,9 +233,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let stat_res = self.inner.stat(path, args).await;
 timer.observe_duration();
-stat_res.map_err(|e| {
+stat_res.inspect_err(|e| {
 increment_errors_total(Operation::Stat, e.kind());
-e
 })
 }
@@ -254,9 +250,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let delete_res = self.inner.delete(path, args).await;
 timer.observe_duration();
-delete_res.map_err(|e| {
+delete_res.inspect_err(|e| {
 increment_errors_total(Operation::Delete, e.kind());
-e
 })
 }
@@ -273,9 +268,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let list_res = self.inner.list(path, args).await;
 timer.observe_duration();
-list_res.map_err(|e| {
+list_res.inspect_err(|e| {
 increment_errors_total(Operation::List, e.kind());
-e
 })
 }
@@ -290,9 +284,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let result = self.inner.batch(args).await;
 timer.observe_duration();
-result.map_err(|e| {
+result.inspect_err(|e| {
 increment_errors_total(Operation::Batch, e.kind());
-e
 })
 }
@@ -308,9 +301,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let result = self.inner.presign(path, args).await;
 timer.observe_duration();
-result.map_err(|e| {
+result.inspect_err(|e| {
 increment_errors_total(Operation::Presign, e.kind());
-e
 })
 }
@@ -335,9 +327,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 timer.observe_duration();
-result.map_err(|e| {
+result.inspect_err(|e| {
 increment_errors_total(Operation::BlockingCreateDir, e.kind());
-e
 })
 }
@@ -376,9 +367,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 ),
 )
 })
-.map_err(|e| {
+.inspect_err(|e| {
 increment_errors_total(Operation::BlockingRead, e.kind());
-e
 })
 }
@@ -417,9 +407,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 ),
 )
 })
-.map_err(|e| {
+.inspect_err(|e| {
 increment_errors_total(Operation::BlockingWrite, e.kind());
-e
 })
 }
@@ -442,9 +431,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 .start_timer();
 let result = self.inner.blocking_stat(path, args);
 timer.observe_duration();
-result.map_err(|e| {
+result.inspect_err(|e| {
 increment_errors_total(Operation::BlockingStat, e.kind());
-e
 })
 }
@@ -468,9 +456,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let result = self.inner.blocking_delete(path, args);
 timer.observe_duration();
-result.map_err(|e| {
+result.inspect_err(|e| {
 increment_errors_total(Operation::BlockingDelete, e.kind());
-e
 })
 }
@@ -494,9 +481,8 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
 let result = self.inner.blocking_list(path, args);
 timer.observe_duration();
-result.map_err(|e| {
+result.inspect_err(|e| {
 increment_errors_total(Operation::BlockingList, e.kind());
-e
 })
 }
 }
@@ -535,18 +521,16 @@ impl<R> PrometheusMetricWrapper<R> {
 impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
 async fn read(&mut self) -> Result<Buffer> {
-self.inner.read().await.map_err(|err| {
+self.inner.read().await.inspect_err(|err| {
 increment_errors_total(self.op, err.kind());
-err
 })
 }
 }
 impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
 fn read(&mut self) -> opendal::Result<Buffer> {
-self.inner.read().map_err(|err| {
+self.inner.read().inspect_err(|err| {
 increment_errors_total(self.op, err.kind());
-err
 })
 }
 }
@@ -567,16 +551,14 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
 }
 async fn close(&mut self) -> Result<()> {
-self.inner.close().await.map_err(|err| {
+self.inner.close().await.inspect_err(|err| {
 increment_errors_total(self.op, err.kind());
-err
 })
 }
 async fn abort(&mut self) -> Result<()> {
-self.inner.close().await.map_err(|err| {
+self.inner.close().await.inspect_err(|err| {
 increment_errors_total(self.op, err.kind());
-err
 })
 }
 }
@@ -589,16 +571,14 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
 .map(|_| {
 self.bytes += bytes as u64;
 })
-.map_err(|err| {
+.inspect_err(|err| {
 increment_errors_total(self.op, err.kind());
-err
 })
 }
 fn close(&mut self) -> Result<()> {
-self.inner.close().map_err(|err| {
+self.inner.close().inspect_err(|err| {
 increment_errors_total(self.op, err.kind());
-err
 })
 }
 }
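Most of the churn in this file is the same mechanical rewrite: `Result::inspect_err` (stable since Rust 1.76) runs a side effect on the error and passes the `Result` through unchanged, so the closure no longer has to hand the error back. A self-contained illustration of the before/after shape, using a hypothetical helper rather than opendal's types:

```rust
// New style: inspect_err keeps the Result intact after logging.
fn read_config(path: &str) -> std::io::Result<String> {
    std::fs::read_to_string(path).inspect_err(|e| {
        eprintln!("failed to read {path}: {e}");
    })
}

// Old style: map_err had to return the error explicitly.
fn read_config_old(path: &str) -> std::io::Result<String> {
    std::fs::read_to_string(path).map_err(|e| {
        eprintln!("failed to read {path}: {e}");
        e
    })
}
```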

View File

@@ -47,7 +47,7 @@ use crate::extension_plan::Millisecond;
 /// Empty source plan that generate record batch with two columns:
 /// - time index column, computed from start, end and interval
 /// - value column, generated by the input expr. The expr should not
-/// reference any column except the time index column.
+///   reference any column except the time index column.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct EmptyMetric {
 start: Millisecond,

View File

@@ -32,11 +32,12 @@ use crate::range_array::RangeArray;
 /// There are 3 variants of smoothing functions:
 /// 1) "Simple exponential smoothing": only the `level` component (the weighted average of the observations) is used to make forecasts.
 /// This method is applied for time-series data that does not exhibit trend or seasonality.
 /// 2) "Holt's linear method" (a.k.a. "double exponential smoothing"): `level` and `trend` components are used to make forecasts.
 /// This method is applied for time-series data that exhibits trend but not seasonality.
 /// 3) "Holt-Winter's method" (a.k.a. "triple exponential smoothing"): `level`, `trend`, and `seasonality` are used to make forecasts.
-/// This method is applied for time-series data that exhibits both trend and seasonality.
+///
+/// This method is applied for time-series data that exhibits both trend and seasonality.
 ///
 /// In order to keep the parity with the Prometheus functions we had to follow the same naming ("HoltWinters"), however
 /// the "Holt's linear"("double exponential smoothing") suits better and reflects implementation.

View File

@@ -34,7 +34,7 @@
 //! - bit 0 (lowest bit): whether `FooterPayload` is compressed
 //! - all other bits are reserved for future use and should be set to 0 on write
 //! * all other bytes are reserved for future use and should be set to 0 on write
-//! A 4 byte integer is always signed, in a twos complement representation, stored little-endian.
+//!   A 4 byte integer is always signed, in a twos complement representation, stored little-endian.
 //!
 //! ## Footer Payload
 //!

View File

@@ -666,6 +666,7 @@ impl PromPlanner {
 /// Name rule:
 /// - if `name` is some, then the matchers MUST NOT contain `__name__` matcher.
 /// - if `name` is none, then the matchers MAY contain NONE OR MULTIPLE `__name__` matchers.
+#[allow(clippy::mutable_key_type)]
 fn preprocess_label_matchers(
 &mut self,
 label_matchers: &Matchers,

View File

@@ -70,7 +70,7 @@ impl QueryEngineContext {
 }
 /// Mock an engine context for unit tests.
-#[cfg(any(test, feature = "test"))]
+#[cfg(test)]
 pub fn mock() -> Self {
 use common_base::Plugins;
 use session::context::QueryContext;

View File

@@ -1018,9 +1018,9 @@ pub fn file_column_schemas_to_table(
 ///
 /// More specifically, for each column seen in the table schema,
 /// - If the same column does exist in the file schema, it checks if the data
-/// type of the file column can be casted into the form of the table column.
+///   type of the file column can be casted into the form of the table column.
 /// - If the same column does not exist in the file schema, it checks if the
-/// table column is nullable or has a default constraint.
+///   table column is nullable or has a default constraint.
 pub fn check_file_to_table_schema_compatibility(
 file_column_schemas: &[ColumnSchema],
 table_column_schemas: &[ColumnSchema],

View File

@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -31,8 +32,9 @@ use tokio::runtime::Runtime;
 static SCRIPT_ENGINE: Lazy<PyEngine> = Lazy::new(sample_script_engine);
 static LOCAL_RUNTIME: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
 fn get_local_runtime() -> std::thread::Result<&'static Runtime> {
-let rt = LOCAL_RUNTIME
-.get_or_try_init(|| tokio::runtime::Runtime::new().map_err(|e| Box::new(e) as _))?;
+let rt = LOCAL_RUNTIME.get_or_try_init(|| {
+tokio::runtime::Runtime::new().map_err(|e| Box::new(e) as Box<dyn Any + Send + 'static>)
+})?;
 Ok(rt)
 }
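Note on the explicit cast: `std::thread::Result<T>` is an alias for `Result<T, Box<dyn Any + Send + 'static>>`, and in the reformulated closure the `as _` coercion target apparently can no longer be inferred, so the trait-object type is spelled out. A standalone sketch of the same idea (hypothetical function):

```rust
use std::any::Any;

// Boxing an arbitrary error into the payload type used by std::thread::Result.
fn parse_port(s: &str) -> std::thread::Result<u16> {
    s.parse::<u16>()
        .map_err(|e| Box::new(e) as Box<dyn Any + Send + 'static>)
}
```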
/// a terrible hack to call async from sync by: /// a terrible hack to call async from sync by:

View File

@@ -175,7 +175,7 @@ coprocessor = copr
 /// constants will be broadcast to length of `col_len`
 /// accept and convert if obj is of two types:
 /// 1. tuples of PyVector/PyList of literals/single literal of same type
-/// or a mixed tuple of PyVector and PyList of same type Literals
+///    or a mixed tuple of PyVector and PyList of same type Literals
 /// 2. a single PyVector
 /// 3. a PyList of same type Literals
 /// 4. a single constant, will be expanded to a PyVector of length of `col_len`

View File

@@ -110,9 +110,8 @@ impl GreptimeRequestHandler {
 .spawn(result_future)
 .await
 .context(JoinTaskSnafu)
-.map_err(|e| {
+.inspect_err(|e| {
 timer.record(e.status_code());
-e
 })?
 }
 None => result_future.await,
@@ -160,11 +159,10 @@ pub(crate) async fn auth(
 name: "Token AuthScheme".to_string(),
 }),
 }
-.map_err(|e| {
+.inspect_err(|e| {
 METRIC_AUTH_FAILURE
 .with_label_values(&[e.status_code().as_ref()])
 .inc();
-e
 })
 }

View File

@@ -42,11 +42,10 @@ pub(crate) static JEMALLOC_COLLECTOR: Lazy<Option<JemallocCollector>> = Lazy::ne
 e
 })
 .ok();
-collector.map(|c| {
+collector.inspect(|c| {
 if let Err(e) = c.update() {
 error!(e; "Failed to update jemalloc metrics");
 };
-c
 })
 });
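Same idea as `inspect_err`, but for `Option`: `Option::inspect` (also stable since Rust 1.76) runs a side effect on the contained value and returns the option unchanged, which replaces the `map(|c| { ...; c })` idiom. A quick hypothetical example:

```rust
// Option::inspect peeks at the value without consuming or remapping it.
fn first_even(values: &[i32]) -> Option<i32> {
    values.iter().copied().find(|v| v % 2 == 0).inspect(|v| {
        println!("found even value: {v}");
    })
}
```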

View File

@@ -196,7 +196,7 @@ fn select_variable(query: &str, query_context: QueryContextRef) -> Option<Output
 // @@aa
 // field is '@@aa'
 fields.push(ColumnSchema::new(
-&format!("@@{}", var_as[0]),
+format!("@@{}", var_as[0]),
 ConcreteDataType::string_datatype(),
 true,
 ));

View File

@@ -29,10 +29,10 @@ const APPROXIMATE_COLUMN_COUNT: usize = 8;
 ///
 /// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
 /// - since the name are case-insensitive, we transform them to lowercase for
-/// better sql usability
+///   better sql usability
 /// - replace `.` and `-` with `_`
 fn normalize_otlp_name(name: &str) -> String {
-name.to_lowercase().replace(|c| c == '.' || c == '-', "_")
+name.to_lowercase().replace(['.', '-'], "_")
 }
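Aside on the `replace` call: `str::replace` accepts anything that implements the `Pattern` trait, so an array of chars can stand in for the closure-based match, which is shorter and what newer clippy nudges toward. A tiny hypothetical example:

```rust
// An array of chars is a valid Pattern: every '.' or '-' becomes '_'.
fn sanitize_metric_name(name: &str) -> String {
    name.to_lowercase().replace(['.', '-'], "_")
}

fn main() {
    assert_eq!(sanitize_metric_name("http.server-duration"), "http_server_duration");
}
```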
/// Convert OpenTelemetry metrics to GreptimeDB insert requests /// Convert OpenTelemetry metrics to GreptimeDB insert requests
@@ -174,7 +174,7 @@ fn encode_gauge(
 scope_attrs: Option<&Vec<KeyValue>>,
 ) -> Result<()> {
 let table = table_writer.get_or_default_table_data(
-&normalize_otlp_name(name),
+normalize_otlp_name(name),
 APPROXIMATE_COLUMN_COUNT,
 gauge.data_points.len(),
 );
@@ -208,7 +208,7 @@ fn encode_sum(
 scope_attrs: Option<&Vec<KeyValue>>,
 ) -> Result<()> {
 let table = table_writer.get_or_default_table_data(
-&normalize_otlp_name(name),
+normalize_otlp_name(name),
 APPROXIMATE_COLUMN_COUNT,
 sum.data_points.len(),
 );
@@ -237,7 +237,7 @@ const HISTOGRAM_LE_COLUMN: &str = "le";
 /// The implementation has been following Prometheus histogram table format:
 ///
 /// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper
-/// limit, and `greptime_value` for bucket count
+///   limit, and `greptime_value` for bucket count
 /// - A `%metric%_sum` table storing sum of samples
 /// - A `%metric%_count` table storing count of samples.
 ///
@@ -358,7 +358,7 @@ fn encode_summary(
 scope_attrs: Option<&Vec<KeyValue>>,
 ) -> Result<()> {
 let table = table_writer.get_or_default_table_data(
-&normalize_otlp_name(name),
+normalize_otlp_name(name),
 APPROXIMATE_COLUMN_COUNT,
 summary.data_points.len(),
 );
@@ -377,7 +377,7 @@ fn encode_summary(
 for quantile in &data_point.quantile_values {
 row_writer::write_f64(
 table,
-&format!("greptime_p{:02}", quantile.quantile * 100f64),
+format!("greptime_p{:02}", quantile.quantile * 100f64),
 quantile.value,
 &mut row,
 )?;

View File

@@ -15,7 +15,6 @@
 #![feature(box_patterns)]
 #![feature(assert_matches)]
 #![feature(let_chains)]
-#![feature(lazy_cell)]
 pub mod ast;
 pub mod dialect;

View File

@@ -57,8 +57,9 @@ lazy_static! {
 /// - `ms` for `milliseconds`
 /// - `us` for `microseconds`
 /// - `ns` for `nanoseconds`
+///
 /// Required for scenarios that use the shortened version of `INTERVAL`,
 /// f.e `SELECT INTERVAL '1h'` or `SELECT INTERVAL '3w2d'`
 pub(crate) struct ExpandIntervalTransformRule;
 impl TransformRule for ExpandIntervalTransformRule {
@@ -145,10 +146,11 @@ fn update_existing_interval_with_value(interval: &Interval, value: Box<Expr>) ->
 /// Normalizes an interval expression string into the sql-compatible format.
 /// This function handles 2 types of input:
 /// 1. Abbreviated interval strings (e.g., "1y2mo3d")
 /// Returns an interval's full name (e.g., "years", "hours", "minutes") according to the `INTERVAL_ABBREVIATION_MAPPING`
 /// If the `interval_str` contains whitespaces, the interval name is considered to be in a full form.
 /// 2. ISO 8601 format strings (e.g., "P1Y2M3D"), case/sign independent
 /// Returns a number of milliseconds corresponding to ISO 8601 (e.g., "36525000 milliseconds")
+///
 /// Note: Hybrid format "1y 2 days 3h" is not supported.
 fn normalize_interval_name(interval_str: &str) -> Option<String> {
 if interval_str.contains(char::is_whitespace) {