add task names

This commit is contained in:
Conrad Ludgate
2024-04-19 15:11:43 +01:00
parent 4d1b5992eb
commit 39345e3f57
10 changed files with 77 additions and 57 deletions

View File

@@ -2,6 +2,7 @@
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
rustdocflags = ["-Arustdoc::private_intra_doc_links"]
rustflags = ["--cfg", "tokio_unstable"]
[alias]
build_testing = ["build", "--features", "testing"]

View File

@@ -534,7 +534,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
)
.unwrap(),
));
tokio::spawn(locks.garbage_collect_worker());
tokio::task::Builder::new()
.name("wake compute lock gc")
.spawn(locks.garbage_collect_worker())
.unwrap();
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());

View File

@@ -40,28 +40,31 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<Infallible> {
let span = info_span!("mgmt", peer = %peer_addr);
tokio::task::spawn(
async move {
info!("serving a new console management API connection");
tokio::task::Builder::new()
.name("mgmt handler")
.spawn(
async move {
info!("serving a new console management API connection");
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
}
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
}
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
}
.instrument(span),
);
.instrument(span),
)
.unwrap();
}
}

View File

@@ -63,7 +63,10 @@ impl Api {
let (client, connection) =
tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?;
tokio::spawn(connection);
tokio::task::Builder::new()
.name("mock conn")
.spawn(connection)
.unwrap();
let secret = match get_execute_postgres_query(
&client,
"select rolpassword from pg_catalog.pg_authid where rolname = $1",

View File

@@ -141,12 +141,15 @@ pub async fn worker(
LOG_CHAN.set(tx.downgrade()).unwrap();
// setup row stream that will close on cancellation
tokio::spawn(async move {
cancellation_token.cancelled().await;
// dropping this sender will cause the channel to close only once
// all the remaining inflight requests have been completed.
drop(tx);
});
tokio::task::Builder::new()
.name("drop parquet conn")
.spawn(async move {
cancellation_token.cancelled().await;
// dropping this sender will cause the channel to close only once
// all the remaining inflight requests have been completed.
drop(tx);
})
.unwrap();
let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
let rx = rx.map(RequestData::from);

View File

@@ -296,13 +296,9 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
}
};
let mut node = connect_to_compute(
ctx,
&TcpMechanism { params: &params },
&user_info,
)
.or_else(|e| stream.throw_error(e))
.await?;
let mut node = connect_to_compute(ctx, &TcpMechanism { params: &params }, &user_info)
.or_else(|e| stream.throw_error(e))
.await?;
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;

View File

@@ -108,10 +108,12 @@ impl ConnectionWithCredentialsProvider {
if let Credentials::Dynamic(credentials_provider, _) = &self.credentials {
let credentials_provider = credentials_provider.clone();
let con2 = con.clone();
let f = tokio::spawn(async move {
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f);
let f = tokio::task::Builder::new()
.name("redis keep connection")
.spawn(async move {
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f.unwrap());
}
match Self::ping(&mut con).await {
Ok(()) => {

View File

@@ -142,10 +142,13 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
let cache = self.cache.clone();
tokio::spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg);
});
tokio::task::Builder::new()
.name("invalidate cache lazy")
.spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg);
})
.unwrap();
}
}

View File

@@ -61,22 +61,28 @@ pub async fn task_main(
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
{
let conn_pool = Arc::clone(&conn_pool);
tokio::spawn(async move {
conn_pool.gc_worker(StdRng::from_entropy()).await;
});
tokio::task::Builder::new()
.name("serverless pool gc")
.spawn(async move {
conn_pool.gc_worker(StdRng::from_entropy()).await;
})
.unwrap();
}
// shutdown the connection pool
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
});
tokio::task::Builder::new()
.name("serverless pool shutdown")
.spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
})
.unwrap();
let backend = Arc::new(PoolingBackend {
pool: Arc::clone(&conn_pool),

View File

@@ -492,7 +492,7 @@ pub fn poll_client<C: ClientInnerExt>(
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::spawn(
tokio::task::Builder::new().name("pooled conn").spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
@@ -565,7 +565,7 @@ pub fn poll_client<C: ClientInnerExt>(
}).await;
}
.instrument(span));
.instrument(span)).unwrap();
let inner = ClientInner {
inner: client,
session: tx,