mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
Adds the storage broker, which ought to replace etcd. This patch only adds the binary and adjusts the Dockerfile to include it; subsequent ones will add deployment of the Helm chart and the actual replacement. It is a simple and fast pub-sub message bus. In this patch only the safekeeper message is supported, but others can be easily added. Compilation now requires protoc to be installed; installing the protobuf-compiler package is sufficient on Debian/Ubuntu. ref https://github.com/neondatabase/neon/pull/2733 https://github.com/neondatabase/neon/issues/2394
109 lines
3.4 KiB
Rust
109 lines
3.4 KiB
Rust
use hyper::body::HttpBody;
|
|
use std::pin::Pin;
|
|
use std::task::{Context, Poll};
|
|
use tonic::codegen::StdError;
|
|
use tonic::{transport::Channel, Code, Status};
|
|
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
|
|
|
use proto::{
|
|
broker_service_client::BrokerServiceClient, TenantTimelineId as ProtoTenantTimelineId,
|
|
};
|
|
|
|
/// Code generated by protobuf: the storage broker message types and gRPC client,
/// textually included from the generated `storage_broker.rs`.
pub mod proto {
    include!("../proto/storage_broker.rs");
}
|
|
|
|
pub mod metrics;
|
|
|
|
// Re-exports to avoid direct tonic dependency in user crates.
|
|
pub use tonic::Request;
|
|
pub use tonic::Streaming;
|
|
|
|
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
|
|
|
/// [`BrokerServiceClient`] specialized to the tonic-provided [`Channel`] transport; helps to
/// avoid depending on tonic directly in user crates.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
|
|
|
impl BrokerClientChannel {
|
|
/// Create a new client to the given endpoint, but don't actually connect until the first request.
|
|
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
|
where
|
|
D: std::convert::TryInto<tonic::transport::Endpoint>,
|
|
D::Error: Into<StdError>,
|
|
{
|
|
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
|
|
Ok(Self::new(conn))
|
|
}
|
|
}
|
|
|
|
// parse variable length bytes from protobuf
|
|
pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
|
|
let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
|
|
.map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
|
|
let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
|
|
Status::new(
|
|
Code::InvalidArgument,
|
|
format!("malformed timeline_id: {}", e),
|
|
)
|
|
})?;
|
|
Ok(TenantTimelineId {
|
|
tenant_id,
|
|
timeline_id,
|
|
})
|
|
}
|
|
|
|
// Boxed dynamic error that is `Send + Sync + 'static`. The several usages here don't
// justify an `anyhow` dependency, though it would work as well.
type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
|
|
|
/// A value that is one of two body types.
///
/// Provides `impl HttpBody` for two different types implementing it. Inspired by
/// <https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs>
// `Debug` is derived so the type can be logged and embedded in other `Debug` types;
// the derive only applies when both `A` and `B` are themselves `Debug`.
#[derive(Debug)]
pub enum EitherBody<A, B> {
    /// Holds a body of the first type.
    Left(A),
    /// Holds a body of the second type.
    Right(B),
}
|
|
|
|
impl<A, B> HttpBody for EitherBody<A, B>
|
|
where
|
|
A: HttpBody + Send + Unpin,
|
|
B: HttpBody<Data = A::Data> + Send + Unpin,
|
|
A::Error: Into<AnyError>,
|
|
B::Error: Into<AnyError>,
|
|
{
|
|
type Data = A::Data;
|
|
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
|
|
|
fn is_end_stream(&self) -> bool {
|
|
match self {
|
|
EitherBody::Left(b) => b.is_end_stream(),
|
|
EitherBody::Right(b) => b.is_end_stream(),
|
|
}
|
|
}
|
|
|
|
fn poll_data(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
|
match self.get_mut() {
|
|
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
|
|
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
|
|
}
|
|
}
|
|
|
|
fn poll_trailers(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
|
|
match self.get_mut() {
|
|
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
|
|
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
|
|
}
|
|
}
|
|
}
|
|
|
|
// Converts the error of an optional result into `AnyError`; `None` and `Ok`
// values pass through untouched.
fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
    let item = err?;
    Some(item.map_err(Into::into))
}
|