Replace pin! from tokio to the std one (#3903)

With fresh rustc brought by
https://github.com/neondatabase/neon/pull/3902, we can use
`std::pin::pin!` macro instead of the tokio one.
One place did not need the macro at all, other places were adjusted.
This commit is contained in:
Kirill Bulatov
2023-03-29 14:14:56 +03:00
committed by GitHub
parent f1b174dc6a
commit ac9c7e8c4a
4 changed files with 10 additions and 13 deletions

View File

@@ -27,6 +27,7 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::io;
use std::net::TcpListener;
use std::pin::pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -466,8 +467,7 @@ impl PageServerHandler {
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let copyin_reader = StreamReader::new(copyin_stream(pgb));
tokio::pin!(copyin_reader);
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
.await?;
@@ -512,8 +512,7 @@ impl PageServerHandler {
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let copyin_reader = StreamReader::new(copyin_stream(pgb));
tokio::pin!(copyin_reader);
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");

View File

@@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
@@ -677,8 +678,7 @@ impl Timeline {
let mut failed = 0;
let cancelled = task_mgr::shutdown_watcher();
tokio::pin!(cancelled);
let mut cancelled = pin!(task_mgr::shutdown_watcher());
loop {
tokio::select! {
@@ -1837,13 +1837,13 @@ impl Timeline {
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
let calculation = async {
let mut calculation = pin!(async {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
self_calculation
.calculate_logical_size(lsn, cancel, &ctx)
.await
};
});
let timeline_state_cancellation = async {
loop {
match timeline_state_updates.changed().await {
@@ -1872,7 +1872,6 @@ impl Timeline {
"aborted because task_mgr shutdown requested".to_string()
};
tokio::pin!(calculation);
loop {
tokio::select! {
res = &mut calculation => { return res }

View File

@@ -399,7 +399,6 @@ impl Timeline {
let mut throwaway_cache = HashMap::new();
let gather =
crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx);
tokio::pin!(gather);
tokio::select! {
_ = cancel.cancelled() => {}

View File

@@ -2,6 +2,7 @@
use std::{
error::Error,
pin::pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
@@ -17,7 +18,7 @@ use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio::{select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
@@ -187,8 +188,7 @@ pub async fn handle_walreceiver_connection(
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let copy_stream = replication_client.copy_both_simple(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream);
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);