diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b63ee31d5e..c0e4a2a9cf 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -27,6 +27,7 @@ use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::io; use std::net::TcpListener; +use std::pin::pin; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -466,8 +467,7 @@ impl PageServerHandler { pgb.write_message_noflush(&BeMessage::CopyInResponse)?; pgb.flush().await?; - let copyin_reader = StreamReader::new(copyin_stream(pgb)); - tokio::pin!(copyin_reader); + let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb))); timeline .import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx) .await?; @@ -512,8 +512,7 @@ impl PageServerHandler { info!("importing wal"); pgb.write_message_noflush(&BeMessage::CopyInResponse)?; pgb.flush().await?; - let copyin_reader = StreamReader::new(copyin_stream(pgb)); - tokio::pin!(copyin_reader); + let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb))); import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?; info!("wal import complete"); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 611c2c27d3..e1db34ec1b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -24,6 +24,7 @@ use std::collections::HashMap; use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; +use std::pin::pin; use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; @@ -677,8 +678,7 @@ impl Timeline { let mut failed = 0; - let cancelled = task_mgr::shutdown_watcher(); - tokio::pin!(cancelled); + let mut cancelled = pin!(task_mgr::shutdown_watcher()); loop { tokio::select! { @@ -1837,13 +1837,13 @@ impl Timeline { let mut timeline_state_updates = self.subscribe_for_state_updates(); let self_calculation = Arc::clone(self); - let calculation = async { + let mut calculation = pin!(async { let cancel = cancel.child_token(); let ctx = ctx.attached_child(); self_calculation .calculate_logical_size(lsn, cancel, &ctx) .await - }; + }); let timeline_state_cancellation = async { loop { match timeline_state_updates.changed().await { @@ -1872,7 +1872,6 @@ impl Timeline { "aborted because task_mgr shutdown requested".to_string() }; - tokio::pin!(calculation); loop { tokio::select! { res = &mut calculation => { return res } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 3ec8c30d70..107cd89b90 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -399,7 +399,6 @@ impl Timeline { let mut throwaway_cache = HashMap::new(); let gather = crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx); - tokio::pin!(gather); tokio::select! { _ = cancel.cancelled() => {} diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 7194a4f3ed..9398a7bee9 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -2,6 +2,7 @@ use std::{ error::Error, + pin::pin, str::FromStr, sync::Arc, time::{Duration, SystemTime}, @@ -17,7 +18,7 @@ use postgres_ffi::v14::xlog_utils::normalize_lsn; use postgres_ffi::WAL_SEGMENT_SIZE; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; -use tokio::{pin, select, sync::watch, time}; +use tokio::{select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; @@ -187,8 +188,7 @@ pub async fn handle_walreceiver_connection( let query = format!("START_REPLICATION PHYSICAL {startpoint}"); let copy_stream = replication_client.copy_both_simple(&query).await?; - let physical_stream = ReplicationStream::new(copy_stream); - pin!(physical_stream); + let mut physical_stream = pin!(ReplicationStream::new(copy_stream)); let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);