mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
Implement prototype
This commit is contained in:
@@ -23,10 +23,11 @@ use anyhow::Context;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use pageserver_api::models::TimelineState;
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use storage_broker::{BrokerClientChannel, Code, Streaming};
|
||||
use storage_broker::proto::{DiscoveryRequest, SafekeeperTimelineInfo};
|
||||
use storage_broker::{BrokerClientChannel, Code};
|
||||
use storage_broker::Streaming;
|
||||
use tokio::select;
|
||||
use tracing::*;
|
||||
|
||||
@@ -198,6 +199,13 @@ pub(super) async fn connection_manager_loop_step(
|
||||
},
|
||||
None => {
|
||||
debug!("No candidates to retry, waiting indefinitely for the broker events");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
let tenant_timeline_id = Some(ProtoTenantTimelineId {
|
||||
tenant_id: id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: id.timeline_id.as_ref().to_owned(),
|
||||
});
|
||||
let request = DiscoveryRequest { tenant_timeline_id };
|
||||
let _ = broker_client.send_discovery(request).await;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,6 +82,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
|
||||
/// Subscribe and fetch all the interesting data from the broker.
|
||||
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
let discovery_conf = conf.clone();
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
@@ -105,11 +106,23 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
|
||||
let ttid = parse_proto_ttid(proto_ttid)?;
|
||||
|
||||
if let Ok(tli) = GlobalTimelines::get(ttid) {
|
||||
// Note that we also receive *our own* info. That's
|
||||
// important, as it is used as an indication of live
|
||||
// connection to the broker.
|
||||
|
||||
if msg.is_discovery {
|
||||
let mut client = client.clone();
|
||||
let sk_info = tli.get_safekeeper_info(&discovery_conf).await;
|
||||
let stream = async_stream::stream! {
|
||||
yield sk_info;
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
let _ = client.publish_safekeeper_info(stream).await;
|
||||
});
|
||||
}
|
||||
|
||||
// note: there are blocking operations below, but it's considered fine for now
|
||||
let res = tli.record_safekeeper_info(msg).await;
|
||||
if res.is_ok() {
|
||||
|
||||
@@ -405,7 +405,7 @@ impl BrokerService for Broker {
|
||||
|
||||
async fn subscribe_discovery(
|
||||
&self,
|
||||
request: Request<SubscribeDiscoveryRequest>,
|
||||
_request: Request<SubscribeDiscoveryRequest>,
|
||||
) -> Result<Response<Self::SubscribeDiscoveryStream>, Status> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user