From 752bf5a22f8b53a163102820d845c87bf848cb55 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Tue, 5 Mar 2024 12:14:37 +0200 Subject: [PATCH] build: clippy disallow futures::pin_mut macro (#7016) `std` has had `pin!` macro for some time, there is no need for us to use the older alternatives. Cannot disallow `tokio::pin` because tokio macros use that. --- clippy.toml | 7 +++++++ control_plane/src/pageserver.rs | 2 +- libs/postgres_backend/src/lib.rs | 4 +--- proxy/src/serverless/sql_over_http.rs | 4 +--- s3_scrubber/src/checks.rs | 5 ++--- s3_scrubber/src/garbage.rs | 14 +++++++------- s3_scrubber/src/scan_metadata.rs | 5 ++--- safekeeper/src/wal_service.rs | 2 +- 8 files changed, 22 insertions(+), 21 deletions(-) diff --git a/clippy.toml b/clippy.toml index d788afc84d..5f7dc66152 100644 --- a/clippy.toml +++ b/clippy.toml @@ -3,3 +3,10 @@ disallowed-methods = [ # Allow this for now, to deny it later once we stop using Handle::block_on completely # "tokio::runtime::Handle::block_on", ] + +disallowed-macros = [ + # use std::pin::pin + "futures::pin_mut", + # cannot disallow this, because clippy finds used from tokio macros + #"tokio::pin", +] diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 642f153f2d..7d0c07a938 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -605,7 +605,7 @@ impl PageServerNode { eprintln!("connection error: {}", e); } }); - tokio::pin!(client); + let client = std::pin::pin!(client); // Init base reader let (start_lsn, base_tarfile_path) = base; diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 73d25619c3..260018ad89 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -6,7 +6,6 @@ #![deny(clippy::undocumented_unsafe_blocks)] use anyhow::Context; use bytes::Bytes; -use futures::pin_mut; use serde::{Deserialize, Serialize}; use std::io::ErrorKind; use std::net::SocketAddr; @@ -378,8 +377,7 @@ impl PostgresBackend { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll> { - let flush_fut = self.flush(); - pin_mut!(flush_fut); + let flush_fut = std::pin::pin!(self.flush()); flush_fut.poll(cx) } diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 7f51ba82cc..74af985211 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use anyhow::bail; -use futures::pin_mut; use futures::StreamExt; use hyper::body::HttpBody; use hyper::header; @@ -531,13 +530,12 @@ async fn query_to_json( ) -> anyhow::Result<(ReadyForQueryStatus, Value)> { info!("executing query"); let query_params = data.params; - let row_stream = client.query_raw_txt(&data.query, query_params).await?; + let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?); info!("finished executing query"); // Manually drain the stream into a vector to leave row_stream hanging // around to get a command tag. Also check that the response is not too // big. - pin_mut!(row_stream); let mut rows: Vec = Vec::new(); while let Some(row) = row_stream.next().await { let row = row?; diff --git a/s3_scrubber/src/checks.rs b/s3_scrubber/src/checks.rs index 7b9f96dce3..7c0f699958 100644 --- a/s3_scrubber/src/checks.rs +++ b/s3_scrubber/src/checks.rs @@ -11,7 +11,7 @@ use utils::id::TimelineId; use crate::cloud_admin_api::BranchData; use crate::metadata_stream::stream_listing; use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId}; -use futures_util::{pin_mut, StreamExt}; +use futures_util::StreamExt; use pageserver::tenant::remote_timeline_client::parse_remote_index_path; use pageserver::tenant::storage_layer::LayerFileName; use pageserver::tenant::IndexPart; @@ -285,8 +285,7 @@ pub(crate) async fn list_timeline_blobs( let mut index_parts: Vec = Vec::new(); let mut initdb_archive: bool = false; - let stream = stream_listing(s3_client, &timeline_dir_target); - pin_mut!(stream); + let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target)); while let Some(obj) = stream.next().await { let obj = obj?; let key = obj.key(); diff --git a/s3_scrubber/src/garbage.rs b/s3_scrubber/src/garbage.rs index 93bb115883..7a08dffc66 100644 --- a/s3_scrubber/src/garbage.rs +++ b/s3_scrubber/src/garbage.rs @@ -12,7 +12,7 @@ use aws_sdk_s3::{ types::{Delete, ObjectIdentifier}, Client, }; -use futures_util::{pin_mut, TryStreamExt}; +use futures_util::TryStreamExt; use pageserver_api::shard::TenantShardId; use serde::{Deserialize, Serialize}; use tokio_stream::StreamExt; @@ -199,12 +199,12 @@ async fn find_garbage_inner( } } }); - let tenants_checked = tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY); + let mut tenants_checked = + std::pin::pin!(tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY)); // Process the results of Tenant checks. If a Tenant is garbage, it goes into // the `GarbageList`. Else it goes into `active_tenants` for more detailed timeline // checks if they are enabled by the `depth` parameter. - pin_mut!(tenants_checked); let mut garbage = GarbageList::new(node_kind, bucket_config); let mut active_tenants: Vec = vec![]; let mut counter = 0; @@ -267,10 +267,10 @@ async fn find_garbage_inner( .map(|r| (ttid, r)) } }); - let timelines_checked = timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY); + let mut timelines_checked = + std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY)); // Update the GarbageList with any timelines which appear not to exist. - pin_mut!(timelines_checked); while let Some(result) = timelines_checked.next().await { let (ttid, console_result) = result?; if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) { @@ -425,9 +425,9 @@ pub async fn purge_garbage( } } }); - let get_objects_results = get_objects_results.try_buffer_unordered(S3_CONCURRENCY); + let mut get_objects_results = + std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY)); - pin_mut!(get_objects_results); let mut objects_to_delete = Vec::new(); while let Some(result) = get_objects_results.next().await { let mut object_list = result?; diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs index 4b63bb3884..6ff9783875 100644 --- a/s3_scrubber/src/scan_metadata.rs +++ b/s3_scrubber/src/scan_metadata.rs @@ -7,7 +7,7 @@ use crate::checks::{ use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId}; use aws_sdk_s3::Client; -use futures_util::{pin_mut, StreamExt, TryStreamExt}; +use futures_util::{StreamExt, TryStreamExt}; use histogram::Histogram; use pageserver::tenant::remote_timeline_client::remote_layer_path; use pageserver::tenant::IndexPart; @@ -226,7 +226,7 @@ pub async fn scan_metadata( Ok((ttid, data)) } let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid)); - let timelines = timelines.try_buffered(CONCURRENCY); + let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different // shards in the same tenant might refer to one anothers' keys if a shard split has happened. @@ -309,7 +309,6 @@ pub async fn scan_metadata( // all results for the same tenant will be adjacent. We accumulate these, // and then call `analyze_tenant` to flush, when we see the next tenant ID. let mut summary = MetadataSummary::new(); - pin_mut!(timelines); while let Some(i) = timelines.next().await { let (ttid, data) = i?; summary.update_data(&data); diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index bceaad1e16..4a97eb3993 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -68,7 +68,7 @@ async fn handle_socket( // is not Unpin, and all pgbackend/framed/tokio dependencies require stream // to be Unpin. Which is reasonable, as indeed something like TimeoutReader // shouldn't be moved. - tokio::pin!(socket); + let socket = std::pin::pin!(socket); let traffic_metrics = TrafficMetrics::new(); if let Some(current_az) = conf.availability_zone.as_deref() {