build: clippy disallow futures::pin_mut macro (#7016)
`std` has had the `pin!` macro for some time, so there is no need for us to keep using the older alternatives. We cannot disallow `tokio::pin!`, because tokio's own macros use it.
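As a minimal sketch of the mechanical rewrite applied at each call site (the function and names are illustrative, not from the diff): `futures::pin_mut!` is a statement macro that shadows its argument with a stack-pinned binding, while `std::pin::pin!` is an expression, so the pinned value gets an ordinary `let`.

```rust
use std::future::Future;
use std::pin::pin;

async fn call_site(fut: impl Future<Output = u32>) -> u32 {
    // Before: futures::pin_mut!(fut);  // statement macro, shadows `fut`
    // After: an expression macro from std (stable since Rust 1.68):
    let fut = pin!(fut);
    fut.await
}
```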
```diff
@@ -3,3 +3,10 @@ disallowed-methods = [
     # Allow this for now, to deny it later once we stop using Handle::block_on completely
     # "tokio::runtime::Handle::block_on",
 ]
+
+disallowed-macros = [
+    # use std::pin::pin
+    "futures::pin_mut",
+    # cannot disallow this, because clippy finds used from tokio macros
+    #"tokio::pin",
+]
```
```diff
@@ -605,7 +605,7 @@ impl PageServerNode {
                 eprintln!("connection error: {}", e);
             }
         });
-        tokio::pin!(client);
+        let client = std::pin::pin!(client);
 
         // Init base reader
         let (start_lsn, base_tarfile_path) = base;
```
```diff
@@ -6,7 +6,6 @@
 #![deny(clippy::undocumented_unsafe_blocks)]
 use anyhow::Context;
 use bytes::Bytes;
-use futures::pin_mut;
 use serde::{Deserialize, Serialize};
 use std::io::ErrorKind;
 use std::net::SocketAddr;
```
```diff
@@ -378,8 +377,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Result<(), std::io::Error>> {
-        let flush_fut = self.flush();
-        pin_mut!(flush_fut);
+        let flush_fut = std::pin::pin!(self.flush());
         flush_fut.poll(cx)
     }
 
```
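The hunk above also folds two statements into one. The pin is required because `Future::poll` takes `Pin<&mut Self>`; a self-contained sketch of that mechanic (`poll_once` is a hypothetical helper, not part of the codebase):

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll};

// Hypothetical helper: poll any future exactly once. `pin!` pins the future
// to this stack frame and yields the `Pin<&mut F>` that `Future::poll` needs.
fn poll_once<F: Future>(fut: F, cx: &mut Context<'_>) -> Poll<F::Output> {
    let fut = pin!(fut);
    fut.poll(cx)
}
```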
```diff
@@ -1,7 +1,6 @@
 use std::sync::Arc;
 
 use anyhow::bail;
-use futures::pin_mut;
 use futures::StreamExt;
 use hyper::body::HttpBody;
 use hyper::header;
```
```diff
@@ -531,13 +530,12 @@ async fn query_to_json<T: GenericClient>(
 ) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
     info!("executing query");
     let query_params = data.params;
-    let row_stream = client.query_raw_txt(&data.query, query_params).await?;
+    let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?);
     info!("finished executing query");
 
     // Manually drain the stream into a vector to leave row_stream hanging
     // around to get a command tag. Also check that the response is not too
     // big.
-    pin_mut!(row_stream);
     let mut rows: Vec<tokio_postgres::Row> = Vec::new();
     while let Some(row) = row_stream.next().await {
         let row = row?;
```
```diff
@@ -11,7 +11,7 @@ use utils::id::TimelineId;
 use crate::cloud_admin_api::BranchData;
 use crate::metadata_stream::stream_listing;
 use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
-use futures_util::{pin_mut, StreamExt};
+use futures_util::StreamExt;
 use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
 use pageserver::tenant::storage_layer::LayerFileName;
 use pageserver::tenant::IndexPart;
```
```diff
@@ -285,8 +285,7 @@ pub(crate) async fn list_timeline_blobs(
     let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
     let mut initdb_archive: bool = false;
 
-    let stream = stream_listing(s3_client, &timeline_dir_target);
-    pin_mut!(stream);
+    let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
     while let Some(obj) = stream.next().await {
         let obj = obj?;
         let key = obj.key();
```
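Most of the remaining call sites pin streams rather than futures. `StreamExt::next` requires `Unpin`, and generator-style streams (as `stream_listing` evidently returns, since the original code pinned it) are not `Unpin`. A standalone sketch of the pattern:

```rust
use std::pin::pin;

use futures_util::{stream, StreamExt};

// `stream::unfold` returns a stream that is not Unpin, because it stores an
// async block across polls; `next()` therefore needs it pinned first.
async fn drain() -> Vec<u32> {
    let s = stream::unfold(0u32, |n| async move { (n < 3).then_some((n, n + 1)) });
    let mut s = pin!(s);

    let mut out = Vec::new();
    while let Some(n) = s.next().await {
        out.push(n);
    }
    out // [0, 1, 2]
}
```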
```diff
@@ -12,7 +12,7 @@ use aws_sdk_s3::{
     types::{Delete, ObjectIdentifier},
     Client,
 };
-use futures_util::{pin_mut, TryStreamExt};
+use futures_util::TryStreamExt;
 use pageserver_api::shard::TenantShardId;
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
```
```diff
@@ -199,12 +199,12 @@ async fn find_garbage_inner(
             }
         }
     });
-    let tenants_checked = tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
+    let mut tenants_checked =
+        std::pin::pin!(tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
 
     // Process the results of Tenant checks. If a Tenant is garbage, it goes into
     // the `GarbageList`. Else it goes into `active_tenants` for more detailed timeline
     // checks if they are enabled by the `depth` parameter.
-    pin_mut!(tenants_checked);
     let mut garbage = GarbageList::new(node_kind, bucket_config);
     let mut active_tenants: Vec<TenantShardId> = vec![];
     let mut counter = 0;
```
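The `find_garbage_inner` and `purge_garbage` hunks all share one shape: build a stream of fallible futures, run them concurrently with `try_buffer_unordered`, pin the resulting non-`Unpin` stream, and drain it. A self-contained sketch with stand-in names (`check` and the concurrency limit of 8 are assumptions, not values from the diff):

```rust
use anyhow::Result;
use futures_util::{stream, StreamExt, TryStreamExt};

// Stand-in for the per-item console check in the real code.
async fn check(id: u32) -> Result<bool> {
    Ok(id % 2 == 0)
}

async fn check_all(ids: Vec<u32>) -> Result<()> {
    // A stream of not-yet-started futures; `try_buffer_unordered(8)` polls up
    // to eight at a time and yields results in completion order.
    let checked = stream::iter(ids).map(|id| Ok::<_, anyhow::Error>(check(id)));
    let mut checked = std::pin::pin!(checked.try_buffer_unordered(8));

    while let Some(result) = checked.next().await {
        let is_garbage = result?;
        println!("{is_garbage}");
    }
    Ok(())
}
```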
```diff
@@ -267,10 +267,10 @@ async fn find_garbage_inner(
                 .map(|r| (ttid, r))
         }
     });
-    let timelines_checked = timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
+    let mut timelines_checked =
+        std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
 
     // Update the GarbageList with any timelines which appear not to exist.
-    pin_mut!(timelines_checked);
     while let Some(result) = timelines_checked.next().await {
         let (ttid, console_result) = result?;
         if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
```
```diff
@@ -425,9 +425,9 @@ pub async fn purge_garbage(
             }
         }
     });
-    let get_objects_results = get_objects_results.try_buffer_unordered(S3_CONCURRENCY);
+    let mut get_objects_results =
+        std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));
 
-    pin_mut!(get_objects_results);
     let mut objects_to_delete = Vec::new();
     while let Some(result) = get_objects_results.next().await {
         let mut object_list = result?;
```
```diff
@@ -7,7 +7,7 @@ use crate::checks::{
 use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
 use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use aws_sdk_s3::Client;
-use futures_util::{pin_mut, StreamExt, TryStreamExt};
+use futures_util::{StreamExt, TryStreamExt};
 use histogram::Histogram;
 use pageserver::tenant::remote_timeline_client::remote_layer_path;
 use pageserver::tenant::IndexPart;
```
```diff
@@ -226,7 +226,7 @@ pub async fn scan_metadata(
         Ok((ttid, data))
     }
     let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
-    let timelines = timelines.try_buffered(CONCURRENCY);
+    let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
 
     // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
     // shards in the same tenant might refer to one anothers' keys if a shard split has happened.
```
```diff
@@ -309,7 +309,6 @@ pub async fn scan_metadata(
     // all results for the same tenant will be adjacent. We accumulate these,
     // and then call `analyze_tenant` to flush, when we see the next tenant ID.
     let mut summary = MetadataSummary::new();
-    pin_mut!(timelines);
     while let Some(i) = timelines.next().await {
         let (ttid, data) = i?;
         summary.update_data(&data);
```
```diff
@@ -68,7 +68,7 @@ async fn handle_socket(
     // is not Unpin, and all pgbackend/framed/tokio dependencies require stream
     // to be Unpin. Which is reasonable, as indeed something like TimeoutReader
     // shouldn't be moved.
-    tokio::pin!(socket);
+    let socket = std::pin::pin!(socket);
 
     let traffic_metrics = TrafficMetrics::new();
     if let Some(current_az) = conf.availability_zone.as_deref() {
```