Files
neon/storage_broker/src/lib.rs
Conrad Ludgate 72832b3214 chore: fix clippy lints from nightly-2025-03-16 (#11273)
I like to run nightly clippy every so often to make our future rust
upgrades easier. Some notable changes:

* Prefer `next_back()` over `last()`. Generic iterators will implement
`last()` to run forward through the iterator until the end.

* Prefer `io::Error::other()`.

* Use implicit returns

One case where I haven't dealt with the issues is the now
[more-sensitive "large enum variant"
lint](https://github.com/rust-lang/rust-clippy/pull/13833). I chose not
to take any decisions around it here, and simply marked them as allow
for now.
2025-04-09 15:04:42 +00:00

97 lines
3.8 KiB
Rust

use std::time::Duration;
use proto::TenantTimelineId as ProtoTenantTimelineId;
use proto::broker_service_client::BrokerServiceClient;
use tonic::Status;
use tonic::codegen::StdError;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
// Code generated by protobuf.
pub mod proto {
// Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
// we don't use these types for anything but broker data transmission,
// so it's ok to ignore this one.
#![allow(clippy::derive_partial_eq_without_eq)]
tonic::include_proto!("storage_broker");
}
pub mod metrics;
// Re-exports to avoid direct tonic dependency in user crates.
pub use hyper::Uri;
pub use tonic::{Code, Request, Streaming};
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
// BrokerServiceClient charged with tonic provided Channel transport; helps to
// avoid depending on tonic directly in user crates.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
// validated here.
//
// NB: this function is not async, but still must be run on a tokio runtime thread
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
where
U: std::convert::TryInto<Uri>,
U::Error: std::error::Error + Send + Sync + 'static,
{
let uri: Uri = endpoint.try_into()?;
let mut tonic_endpoint: Endpoint = uri.into();
// If schema starts with https, start encrypted connection; do plain text
// otherwise.
if let Some("https") = tonic_endpoint.uri().scheme_str() {
// if there's no default provider and both ring+aws-lc-rs are enabled
// this the tls settings on tonic will not work.
// erroring is ok.
rustls::crypto::ring::default_provider()
.install_default()
.ok();
let tls = ClientTlsConfig::new();
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
}
tonic_endpoint = tonic_endpoint
.http2_keep_alive_interval(keepalive_interval)
.keep_alive_while_idle(true)
.connect_timeout(DEFAULT_CONNECT_TIMEOUT);
// keep_alive_timeout is 20s by default on both client and server side
let channel = tonic_endpoint.connect_lazy();
Ok(BrokerClientChannel::new(channel))
}
impl BrokerClientChannel {
/// Create a new client to the given endpoint, but don't actually connect until the first request.
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
Ok(Self::new(conn))
}
}
// parse variable length bytes from protobuf
#[allow(clippy::result_large_err, reason = "TODO")]
pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
.map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
Status::new(
Code::InvalidArgument,
format!("malformed timeline_id: {}", e),
)
})?;
Ok(TenantTimelineId {
tenant_id,
timeline_id,
})
}