From 7c9bd542a679c38149014e2137ec41207aea6b29 Mon Sep 17 00:00:00 2001 From: Elizabeth Murray Date: Mon, 26 May 2025 06:30:48 -0700 Subject: [PATCH] Fix compile warnings, minor cleanup. --- pageserver/client_grpc/Cargo.toml | 2 +- pageserver/client_grpc/src/client_cache.rs | 31 ++----------------- pageserver/client_grpc/src/lib.rs | 2 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 2 +- 4 files changed, 6 insertions(+), 31 deletions(-) diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index 3955ef579f..d0e162fbbe 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -12,7 +12,7 @@ tonic.workspace = true tracing.workspace = true tokio = { version = "1.43.1", features = ["full", "macros", "net", "io-util", "rt", "rt-multi-thread"] } uuid = { version = "1", features = ["v4"] } -tower = { version = "0.4", features = ["timeout"] } +tower = { version = "0.4", features = ["timeout", "util"] } rand = "0.8" tokio-util = { version = "0.7", features = ["compat"] } hyper-util = "0.1.9" diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs index bb8befa1ce..cb496148ee 100644 --- a/pageserver/client_grpc/src/client_cache.rs +++ b/pageserver/client_grpc/src/client_cache.rs @@ -1,8 +1,7 @@ use std::{ - collections::{BinaryHeap, HashMap}, + collections::{HashMap}, sync::{ Arc, - atomic::{AtomicUsize, Ordering}, }, time::{Duration, Instant}, io::{self, Error, ErrorKind}, @@ -11,7 +10,7 @@ use std::{ use priority_queue::PriorityQueue; use tokio::{ - sync::{Mutex, mpsc, watch, Semaphore, OwnedSemaphorePermit}, + sync::{Mutex, Semaphore, OwnedSemaphorePermit}, time::sleep, net::TcpStream, io::{AsyncRead, AsyncWrite, ReadBuf}, @@ -32,31 +31,10 @@ use rand::{ SeedableRng }; -use tower::service_fn; use http::Uri; use hyper_util::rt::TokioIo; use bytes::BytesMut; -use futures::future; -use http::Uri; -use hyper_util::rt::TokioIo; -use rand::{Rng, SeedableRng, rngs::StdRng}; -use std::io::{self, Error, ErrorKind}; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::net::TcpStream; use tower::service_fn; -use uuid; - -use metrics::{ - {Encoder, TextEncoder}, - proto::MetricFamily, -}; - -// use info -use tracing::info; use tokio_util::sync::CancellationToken; @@ -240,7 +218,6 @@ pub struct PooledClient { pool: Arc, id: uuid::Uuid, permit: OwnedSemaphorePermit, - is_ok: bool, } impl ConnectionPool { @@ -356,7 +333,7 @@ impl ConnectionPool { // Pop the highest-active-consumers connection. There are no connections // in the heap that have more than max_consumers active consumers. - if let Some((id, cons)) = inner.pq.pop() { + if let Some((id, _cons)) = inner.pq.pop() { let entry = inner.entries.get_mut(&id) .expect("pq and entries got out of sync"); @@ -369,7 +346,6 @@ impl ConnectionPool { pool: Arc::clone(&self), id, permit: permit, - is_ok: true, }; // re‐insert with updated priority @@ -437,7 +413,6 @@ impl ConnectionPool { let mut inner = self_clone.inner.lock().await; inner.waiters += 1; if inner.waiters >= (inner.in_progress * self_clone.max_consumers) { - semaphore = Arc::clone(&self_clone.channel_semaphore); let self_clone_spawn = Arc::clone(&self_clone); tokio::task::spawn(async move { self_clone_spawn.create_connection().await; diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 2a15e59786..4c1a4a5185 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -22,7 +22,7 @@ use utils::shard::ShardIndex; use std::fmt::Debug; mod client_cache; -use metrics::{IntCounter, IntCounterVec, core::Collector}; +use metrics::{IntCounterVec, core::Collector}; #[derive(Error, Debug)] pub enum PageserverClientError { diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 071c952e0d..f49ad68b2c 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -171,7 +171,7 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> { }) } async fn get_metrics(State(state): State>) -> Response { - use metrics::core::Collector; + let metrics = state.collect(); info!("metrics: {metrics:?}");