mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
feat: adjust the tonic features to remove axum dependency (#10348)
To help facilitate an upgrade to axum 0.8 (https://github.com/neondatabase/neon/pull/10332#pullrequestreview-2541989619) this massages the tonic dependency features so that tonic does not depend on axum.
This commit is contained in:
@@ -32,7 +32,6 @@ use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::time;
|
||||
use tonic::body::{self, empty_body, BoxBody};
|
||||
use tonic::codegen::Service;
|
||||
use tonic::transport::server::Connected;
|
||||
use tonic::Code;
|
||||
use tonic::{Request, Response, Status};
|
||||
use tracing::*;
|
||||
@@ -459,9 +458,10 @@ impl BrokerService for Broker {
|
||||
&self,
|
||||
request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
|
||||
) -> Result<Response<()>, Status> {
|
||||
let remote_addr = request
|
||||
.remote_addr()
|
||||
.expect("TCPConnectInfo inserted by handler");
|
||||
let &RemoteAddr(remote_addr) = request
|
||||
.extensions()
|
||||
.get()
|
||||
.expect("RemoteAddr inserted by handler");
|
||||
let mut publisher = self.registry.register_publisher(remote_addr);
|
||||
|
||||
let mut stream = request.into_inner();
|
||||
@@ -484,9 +484,10 @@ impl BrokerService for Broker {
|
||||
&self,
|
||||
request: Request<SubscribeSafekeeperInfoRequest>,
|
||||
) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
|
||||
let remote_addr = request
|
||||
.remote_addr()
|
||||
.expect("TCPConnectInfo inserted by handler");
|
||||
let &RemoteAddr(remote_addr) = request
|
||||
.extensions()
|
||||
.get()
|
||||
.expect("RemoteAddr inserted by handler");
|
||||
let proto_key = request
|
||||
.into_inner()
|
||||
.subscription_key
|
||||
@@ -537,9 +538,10 @@ impl BrokerService for Broker {
|
||||
&self,
|
||||
request: Request<SubscribeByFilterRequest>,
|
||||
) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
|
||||
let remote_addr = request
|
||||
.remote_addr()
|
||||
.expect("TCPConnectInfo inserted by handler");
|
||||
let &RemoteAddr(remote_addr) = request
|
||||
.extensions()
|
||||
.get()
|
||||
.expect("RemoteAddr inserted by handler");
|
||||
let proto_filter = request.into_inner();
|
||||
let ttid_filter = proto_filter.tenant_timeline_id.as_ref();
|
||||
|
||||
@@ -628,6 +630,9 @@ async fn http1_handler(
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct RemoteAddr(SocketAddr);
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = Args::parse();
|
||||
@@ -687,13 +692,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.max_concurrent_streams(None);
|
||||
|
||||
let storage_broker_server_cloned = storage_broker_server.clone();
|
||||
let connect_info = stream.connect_info();
|
||||
let remote_addr = RemoteAddr(addr);
|
||||
let service_fn_ = async move {
|
||||
service_fn(move |mut req| {
|
||||
// That's what tonic's MakeSvc.call does to pass conninfo to
|
||||
// the request handler (and where its request.remote_addr()
|
||||
// expects it to find).
|
||||
req.extensions_mut().insert(connect_info.clone());
|
||||
req.extensions_mut().insert(remote_addr);
|
||||
|
||||
// Technically this second clone is not needed, but consume
|
||||
// by async block is apparently unavoidable. BTW, error
|
||||
|
||||
Reference in New Issue
Block a user