diff --git a/.cargo/config.toml b/.cargo/config.toml index 5e452974ad..8178384675 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -2,6 +2,7 @@ # This is only present for local builds, as it will be overridden # by the RUSTDOCFLAGS env var in CI. rustdocflags = ["-Arustdoc::private_intra_doc_links"] +rustflags = ["--cfg", "tokio_unstable"] [alias] build_testing = ["build", "--features", "testing"] diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index d16ca2bc45..246b27bb9c 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -534,7 +534,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { ) .unwrap(), )); - tokio::spawn(locks.garbage_collect_worker()); + tokio::task::Builder::new() + .name("wake compute lock gc") + .spawn(locks.garbage_collect_worker()) + .unwrap(); let url = args.auth_endpoint.parse()?; let endpoint = http::Endpoint::new(url, http::new_client()); diff --git a/proxy/src/console/mgmt.rs b/proxy/src/console/mgmt.rs index c7a2d467c0..31ca210860 100644 --- a/proxy/src/console/mgmt.rs +++ b/proxy/src/console/mgmt.rs @@ -40,28 +40,31 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result { let span = info_span!("mgmt", peer = %peer_addr); - tokio::task::spawn( - async move { - info!("serving a new console management API connection"); + tokio::task::Builder::new() + .name("mgmt handler") + .spawn( + async move { + info!("serving a new console management API connection"); - // these might be long running connections, have a separate logging for cancelling - // on shutdown and other ways of stopping. - let cancelled = scopeguard::guard(tracing::Span::current(), |span| { - let _e = span.entered(); - info!("console management API task cancelled"); - }); + // these might be long running connections, have a separate logging for cancelling + // on shutdown and other ways of stopping. + let cancelled = scopeguard::guard(tracing::Span::current(), |span| { + let _e = span.entered(); + info!("console management API task cancelled"); + }); - if let Err(e) = handle_connection(socket).await { - error!("serving failed with an error: {e}"); - } else { - info!("serving completed"); + if let Err(e) = handle_connection(socket).await { + error!("serving failed with an error: {e}"); + } else { + info!("serving completed"); + } + + // we can no longer get dropped + scopeguard::ScopeGuard::into_inner(cancelled); } - - // we can no longer get dropped - scopeguard::ScopeGuard::into_inner(cancelled); - } - .instrument(span), - ); + .instrument(span), + ) + .unwrap(); } } diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index 60d488c973..2bfbf33f79 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -63,7 +63,10 @@ impl Api { let (client, connection) = tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?; - tokio::spawn(connection); + tokio::task::Builder::new() + .name("mock conn") + .spawn(connection) + .unwrap(); let secret = match get_execute_postgres_query( &client, "select rolpassword from pg_catalog.pg_authid where rolname = $1", diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index e061216d15..571518211f 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -141,12 +141,15 @@ pub async fn worker( LOG_CHAN.set(tx.downgrade()).unwrap(); // setup row stream that will close on cancellation - tokio::spawn(async move { - cancellation_token.cancelled().await; - // dropping this sender will cause the channel to close only once - // all the remaining inflight requests have been completed. - drop(tx); - }); + tokio::task::Builder::new() + .name("drop parquet conn") + .spawn(async move { + cancellation_token.cancelled().await; + // dropping this sender will cause the channel to close only once + // all the remaining inflight requests have been completed. + drop(tx); + }) + .unwrap(); let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx)); let rx = rx.map(RequestData::from); diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 2f88c5db5d..9db7649003 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -296,13 +296,9 @@ pub async fn handle_client( } }; - let mut node = connect_to_compute( - ctx, - &TcpMechanism { params: ¶ms }, - &user_info, - ) - .or_else(|e| stream.throw_error(e)) - .await?; + let mut node = connect_to_compute(ctx, &TcpMechanism { params: ¶ms }, &user_info) + .or_else(|e| stream.throw_error(e)) + .await?; let session = cancellation_handler.get_session(); prepare_client_connection(&node, &session, &mut stream).await?; diff --git a/proxy/src/redis/connection_with_credentials_provider.rs b/proxy/src/redis/connection_with_credentials_provider.rs index 3a90d911c2..a239f26e16 100644 --- a/proxy/src/redis/connection_with_credentials_provider.rs +++ b/proxy/src/redis/connection_with_credentials_provider.rs @@ -108,10 +108,12 @@ impl ConnectionWithCredentialsProvider { if let Credentials::Dynamic(credentials_provider, _) = &self.credentials { let credentials_provider = credentials_provider.clone(); let con2 = con.clone(); - let f = tokio::spawn(async move { - let _ = Self::keep_connection(con2, credentials_provider).await; - }); - self.refresh_token_task = Some(f); + let f = tokio::task::Builder::new() + .name("redis keep connection") + .spawn(async move { + let _ = Self::keep_connection(con2, credentials_provider).await; + }); + self.refresh_token_task = Some(f.unwrap()); } match Self::ping(&mut con).await { Ok(()) => { diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 5a38530faf..15fdb5107e 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -142,10 +142,13 @@ impl MessageHandler { // To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds. // TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message. let cache = self.cache.clone(); - tokio::spawn(async move { - tokio::time::sleep(INVALIDATION_LAG).await; - invalidate_cache(cache, msg); - }); + tokio::task::Builder::new() + .name("invalidate cache lazy") + .spawn(async move { + tokio::time::sleep(INVALIDATION_LAG).await; + invalidate_cache(cache, msg); + }) + .unwrap(); } } diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index b0f4026c76..4a56cff887 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -61,22 +61,28 @@ pub async fn task_main( let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config); { let conn_pool = Arc::clone(&conn_pool); - tokio::spawn(async move { - conn_pool.gc_worker(StdRng::from_entropy()).await; - }); + tokio::task::Builder::new() + .name("serverless pool gc") + .spawn(async move { + conn_pool.gc_worker(StdRng::from_entropy()).await; + }) + .unwrap(); } // shutdown the connection pool - tokio::spawn({ - let cancellation_token = cancellation_token.clone(); - let conn_pool = conn_pool.clone(); - async move { - cancellation_token.cancelled().await; - tokio::task::spawn_blocking(move || conn_pool.shutdown()) - .await - .unwrap(); - } - }); + tokio::task::Builder::new() + .name("serverless pool shutdown") + .spawn({ + let cancellation_token = cancellation_token.clone(); + let conn_pool = conn_pool.clone(); + async move { + cancellation_token.cancelled().await; + tokio::task::spawn_blocking(move || conn_pool.shutdown()) + .await + .unwrap(); + } + }) + .unwrap(); let backend = Arc::new(PoolingBackend { pool: Arc::clone(&conn_pool), diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 798e488509..0fe766ce32 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -492,7 +492,7 @@ pub fn poll_client( let cancel = CancellationToken::new(); let cancelled = cancel.clone().cancelled_owned(); - tokio::spawn( + tokio::task::Builder::new().name("pooled conn").spawn( async move { let _conn_gauge = conn_gauge; let mut idle_timeout = pin!(tokio::time::sleep(idle)); @@ -565,7 +565,7 @@ pub fn poll_client( }).await; } - .instrument(span)); + .instrument(span)).unwrap(); let inner = ClientInner { inner: client, session: tx,