Compare commits

...

7 Commits

Author SHA1 Message Date
Christian Schwarz
4a1b52c12e WIP 2025-05-05 12:45:45 +02:00
Christian Schwarz
257693e4f2 WIP 2025-05-05 10:59:45 +02:00
Christian Schwarz
7aa9beaefd make sk compile 2025-05-04 17:06:46 +02:00
Christian Schwarz
35dbbbaf60 move discovery request mechanism into that type as well
Can't move the policy of when we send discovery requests because that's
tied to connection_manager loop state.
2025-05-04 16:50:23 +02:00
Christian Schwarz
6380c9674c move subscription code into new client struct 2025-05-04 16:22:46 +02:00
Christian Schwarz
1f53688189 Revert "rip out broker binary target & launch of it in cplane & mention of it in docs"
This reverts commit 8f201b1580.
2025-05-04 14:40:44 +02:00
Christian Schwarz
8f201b1580 rip out broker binary target & launch of it in cplane & mention of it in docs 2025-05-04 14:38:52 +02:00
26 changed files with 502 additions and 212 deletions

Cargo.lock generated
View File

@@ -6634,6 +6634,7 @@ dependencies = [
"rustls 0.23.18",
"tokio",
"tokio-rustls 0.26.0",
"tokio-util",
"tonic",
"tonic-build",
"tracing",

View File

@@ -6,9 +6,11 @@ use pageserver_api::shard::ShardIdentity;
use postgres_ffi::TimestampTz;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::shard::TenantShardId;
use crate::membership::Configuration;
use crate::{ServerInfo, Term};
@@ -308,3 +310,14 @@ pub struct PullTimelineResponse {
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PutTenantPageserverLocationRequest {
pub pageserver_locations: Vec<TenantShardPageserverLocation>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantShardPageserverLocation {
pub generation: Generation,
pub pageserver_node_id: NodeId,
}
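// Illustration (hypothetical values): building the new request body on the
// caller's side. A non-authoritative sketch; `Generation::new` and the
// tuple-struct `NodeId` constructor are assumed from their uses elsewhere
// in this diff.
fn example_put_body() -> PutTenantPageserverLocationRequest {
    use utils::generation::Generation;
    use utils::id::NodeId;
    PutTenantPageserverLocationRequest {
        // One entry per pageserver currently attached to this tenant shard;
        // the generation lets the safekeeper ignore stale attachments.
        pageserver_locations: vec![TenantShardPageserverLocation {
            generation: Generation::new(42),
            pageserver_node_id: NodeId(1),
        }],
    }
}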

View File

@@ -46,6 +46,8 @@ pub struct ConnectionConfigArgs<'a> {
pub auth_token: Option<&'a str>,
pub availability_zone: Option<&'a str>,
pub pageserver_generation: Option<u32>,
}
impl<'a> ConnectionConfigArgs<'a> {
@@ -72,6 +74,10 @@ impl<'a> ConnectionConfigArgs<'a> {
));
}
if let Some(pageserver_generation) = self.pageserver_generation {
options.push(format!("pageserver_generation={pageserver_generation}"));
}
options
}
}
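// Wire-level effect: one extra key=value pair in the startup options.
// A self-contained sketch (hypothetical value) of just the appended entry:
fn pageserver_generation_option_sketch() {
    let pageserver_generation: Option<u32> = Some(42);
    let mut options: Vec<String> = Vec::new();
    if let Some(pageserver_generation) = pageserver_generation {
        options.push(format!("pageserver_generation={pageserver_generation}"));
    }
    assert_eq!(options, ["pageserver_generation=42"]);
}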

View File

@@ -423,11 +423,14 @@ fn start_pageserver(
.map(storage_broker::Certificate::from_pem),
);
// Note: we do not attempt to connect here (but we do validate endpoint sanity).
storage_broker::connect(
let service_client = storage_broker::connect(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
tls_config,
)
)?;
anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new(
service_client,
))
})
.with_context(|| {
format!(

View File

@@ -100,7 +100,7 @@ pub struct State {
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: &'static [&'static str],
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
@@ -114,7 +114,7 @@ impl State {
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,

View File

@@ -48,7 +48,6 @@ use remote_timeline_client::{
download_tenant_manifest,
};
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::offload::{OffloadError, offload_timeline};
use timeline::{
@@ -152,7 +151,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub broker_client: storage_broker::TimelineUpdatesSubscriber,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
@@ -2122,7 +2121,7 @@ impl TenantShard {
async fn unoffload_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: RequestContext,
) -> Result<Arc<Timeline>, TimelineArchivalError> {
info!("unoffloading timeline");
@@ -2257,7 +2256,7 @@ impl TenantShard {
self: &Arc<Self>,
timeline_id: TimelineId,
new_state: TimelineArchivalState,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: RequestContext,
) -> Result<(), TimelineArchivalError> {
info!("setting timeline archival config");
@@ -2586,7 +2585,7 @@ impl TenantShard {
pub(crate) async fn create_timeline(
self: &Arc<TenantShard>,
params: CreateTimelineParams,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
if !self.is_active() {
@@ -3321,7 +3320,7 @@ impl TenantShard {
/// to delay background jobs. Background jobs can be started right away when None is given.
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
@@ -3951,7 +3950,7 @@ where
enum ActivateTimelineArgs {
Yes {
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
},
No,
}

View File

@@ -61,7 +61,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
use rand::Rng;
use remote_storage::DownloadError;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Notify, oneshot, watch};
@@ -2070,7 +2069,7 @@ impl Timeline {
pub(crate) fn activate(
self: &Arc<Self>,
parent: Arc<crate::tenant::TenantShard>,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
@@ -3112,7 +3111,7 @@ impl Timeline {
fn launch_wal_receiver(
self: &Arc<Self>,
ctx: &RequestContext,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
) {
info!(
"launching WAL receiver for timeline {} of tenant {}",

View File

@@ -161,7 +161,7 @@ impl<'t> UninitializedTimeline<'t> {
tenant: Arc<TenantShard>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
self.write(|raw_timeline| async move {

View File

@@ -28,7 +28,6 @@ use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -69,7 +68,7 @@ impl WalReceiver {
pub fn start(
timeline: Arc<Timeline>,
conf: WalReceiverConf,
mut broker_client: BrokerClientChannel,
mut broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> Self {
let tenant_shard_id = timeline.tenant_shard_id;

View File

@@ -17,19 +17,12 @@ use std::time::Duration;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use futures::StreamExt;
use pageserver_api::models::TimelineState;
use postgres_connection::PgConnectionConfig;
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
TypedMessage,
};
use storage_broker::{BrokerClientChannel, Code, Streaming};
use storage_broker::proto::SafekeeperDiscoveryResponse;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
};
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::{
@@ -56,7 +49,7 @@ pub(crate) struct Cancelled;
///
/// Not cancellation-safe. Use `cancel` token to request cancellation.
pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
broker_client: &mut storage_broker::TimelineUpdatesSubscriber,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
cancel: &CancellationToken,
@@ -81,11 +74,6 @@ pub(super) async fn connection_manager_loop_step(
WALRECEIVER_ACTIVE_MANAGERS.dec();
}
let id = TenantTimelineId {
tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
};
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
@@ -101,7 +89,12 @@ pub(super) async fn connection_manager_loop_step(
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
let (timeline_updates, mut discovery_requester) = broker_client.subscribe(
connection_manager_state.timeline.tenant_shard_id,
connection_manager_state.timeline.timeline_id,
cancel,
);
let mut timeline_updates = Box::pin(timeline_updates);
debug!("Subscribed for broker timeline updates");
loop {
@@ -155,29 +148,10 @@ pub(super) async fn connection_manager_loop_step(
}
},
// Got a new update from the broker
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
match broker_update {
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(status) => {
match status.code() {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
// tonic's error handling doesn't provide a clear code for disconnections: we get
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
// => https://github.com/neondatabase/neon/issues/9562
info!("broker disconnected: {status}");
},
_ => {
warn!("broker subscription failed: {status}");
}
}
return Ok(());
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
return Ok(());
}
}
// Got a new update from the broker.
// The stream ends with None if and only if `cancel` is cancelled.
Some(timeline_update) = timeline_updates.next() => {
connection_manager_state.register_timeline_update(timeline_update)
},
new_event = async {
@@ -258,32 +232,11 @@ pub(super) async fn connection_manager_loop_step(
tokio::time::sleep(next_discovery_ts - now).await;
}
let tenant_timeline_id = Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
});
let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
let msg = TypedMessage {
r#type: MessageType::SafekeeperDiscoveryRequest as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: Some(request),
safekeeper_discovery_response: None,
};
info!("No active connection and no candidates, sending discovery request to the broker");
discovery_requester.request().await;
last_discovery_ts = Some(std::time::Instant::now());
info!("No active connection and no candidates, sending discovery request to the broker");
// Cancellation safety: we want to send a message to the broker, but the publish_one()
// call can get cancelled by the other select! arm. This is absolutely fine, because
// we just want to receive broker updates, and discovery is not important if we already
// receive updates.
//
// It is possible that `last_discovery_ts` will be updated but the message will not be sent.
// This is totally fine for the same reason.
// This is a fire-and-forget request; we don't care about the response.
let _ = broker_client.publish_one(msg).await;
debug!("Discovery request sent to the broker");
None
} => {}
}
@@ -298,63 +251,6 @@ pub(super) async fn connection_manager_loop_step(
}
}
/// Endlessly try to subscribe for broker updates for a given timeline.
async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel,
id: TenantTimelineId,
cancel: &CancellationToken,
) -> Result<Streaming<TypedMessage>, Cancelled> {
let mut attempt = 0;
loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
attempt += 1;
// subscribe to the specific timeline
let request = SubscribeByFilterRequest {
types: vec![
TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo as i32,
},
TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
},
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
}),
}),
};
match {
tokio::select! {
r = broker_client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { return Err(Cancelled); }
}
} {
Ok(resp) => {
return Ok(resp.into_inner());
}
Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker when no new writes happen and
// the entire WAL has been streamed. Keep this noticeable with logging, but do not warn/error.
info!(
"Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
);
continue;
}
}
}
}
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
@@ -695,44 +591,14 @@ impl ConnectionManagerState {
}
/// Adds another broker timeline update into the state, if it's more recent than the one already recorded for the same key.
fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
let mut is_discovery = false;
let timeline_update = match typed_msg.r#type() {
MessageType::SafekeeperTimelineInfo => {
let info = match typed_msg.safekeeper_timeline_info {
Some(info) => info,
None => {
warn!("bad proto message from broker: no safekeeper_timeline_info");
return;
}
};
SafekeeperDiscoveryResponse {
safekeeper_id: info.safekeeper_id,
tenant_timeline_id: info.tenant_timeline_id,
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
is_discovery = true;
match typed_msg.safekeeper_discovery_response {
Some(response) => response,
None => {
warn!("bad proto message from broker: no safekeeper_discovery_response");
return;
}
}
}
_ => {
// unexpected message
return;
}
};
fn register_timeline_update(&mut self, timeline_update: storage_broker::TimelineShardUpdate) {
WALRECEIVER_BROKER_UPDATES.inc();
let storage_broker::TimelineShardUpdate {
is_discovery,
inner: timeline_update,
} = timeline_update;
trace!(
"safekeeper info update: standby_horizon(cutoff)={}",
timeline_update.standby_horizon
@@ -1013,7 +879,9 @@ impl ConnectionManagerState {
shard_stripe_size,
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
availability_zone: self.conf.availability_zone.as_deref()
availability_zone: self.conf.availability_zone.as_deref(),
// TODO: do we still have the emergency mode that runs without generations? If so, this expect would panic in that mode.
pageserver_generation: Some(self.timeline.generation.into().expect("attachments always have a generation number nowadays")),
};
match wal_stream_connection_config(connection_conf_args) {
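// End to end, the generation travels like this (all names taken from hunks
// in this diff):
//
//   pageserver walreceiver:  options.push(format!("pageserver_generation={n}"))
//   safekeeper handler:      value.parse::<u32>().map(Generation::new)
//   safekeeper walsender:    tli.wal_advertiser.get_pageserver_timeline(ttid, shard.shard_index(), generation)
//
// so the safekeeper can attribute pageserver feedback to a concrete
// (tenant shard, timeline, generation) attachment.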

View File

@@ -24,7 +24,7 @@ use safekeeper::defaults::{
};
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, wal_backup, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};

View File

@@ -50,7 +50,8 @@ async fn push_loop(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
make_tls_config(&conf),
)?;
)?
.into_raw_grpc_client();
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
let outbound = async_stream::stream! {
@@ -97,7 +98,8 @@ async fn pull_loop(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
make_tls_config(&conf),
)?;
)?
.into_raw_grpc_client();
// TODO: subscribe only to local timelines instead of all
let request = SubscribeSafekeeperInfoRequest {
@@ -153,7 +155,8 @@ async fn discover_loop(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
make_tls_config(&conf),
)?;
)?
.into_raw_grpc_client();
let request = SubscribeByFilterRequest {
types: vec![TypeSubscription {

View File

@@ -18,6 +18,7 @@ use safekeeper_api::models::ConnectionId;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{Instrument, debug, info, info_span};
use utils::auth::{Claims, JwtAuth, Scope};
use utils::generation::{self, Generation};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::PostgresClientProtocol;
@@ -37,6 +38,7 @@ pub struct SafekeeperPostgresHandler {
pub timeline_id: Option<TimelineId>,
pub ttid: TenantTimelineId,
pub shard: Option<ShardIdentity>,
pub pageserver_generation: Option<Generation>,
pub protocol: Option<PostgresClientProtocol>,
/// Unique connection id is logged in spans for observability.
pub conn_id: ConnectionId,
@@ -159,6 +161,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
let mut shard_count: Option<u8> = None;
let mut shard_number: Option<u8> = None;
let mut shard_stripe_size: Option<u32> = None;
let mut pageserver_generation: Option<Generation> = None;
for opt in options {
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
@@ -201,6 +204,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
format!("Failed to parse {value} as shard stripe size")
})?);
}
Some(("pageserver_generation", value)) => {
self.pageserver_generation =
Some(value.parse::<u32>().map(Generation::new).with_context(
|| format!("Failed to parse {value} as generation"),
)?);
}
_ => continue,
}
}
@@ -259,6 +268,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
tracing::Span::current().record("shard", tracing::field::display(slug));
}
}
if let Some(pageserver_generation) = self.pageserver_generation {
tracing::Span::current().record(
"pageserver_generation",
tracing::field::display(pageserver_generation.get_suffix()),
);
}
Ok(())
} else {
@@ -370,6 +385,7 @@ impl SafekeeperPostgresHandler {
timeline_id: None,
ttid: TenantTimelineId::empty(),
shard: None,
pageserver_generation: None,
protocol: None,
conn_id,
claims: None,

View File

@@ -5,7 +5,7 @@ pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
pub async fn task_main_http(
conf: Arc<SafeKeeperConf>,

View File

@@ -17,9 +17,10 @@ use hyper::{Body, Request, Response, StatusCode};
use pem::Pem;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
TimelineStatus, TimelineTermBumpRequest,
AcceptorStateStatus, PullTimelineRequest, PutTenantPageserverLocationRequest, SafekeeperStatus,
SkTimelineInfo, TenantDeleteResult, TenantShardPageserverLocation, TermSwitchApiEntry,
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
TimelineTermBumpRequest,
};
use safekeeper_api::{ServerInfo, membership, models};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -31,12 +32,15 @@ use tracing::{Instrument, info_span};
use utils::auth::SwappableJwtAuth;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use crate::debug_dump::TimelineDigestRequest;
use crate::safekeeper::TermLsn;
use crate::timelines_global_map::DeleteOrExclude;
use crate::wal_advertiser::advmap::UpdatePageserverAttachmentsArg;
use crate::{
GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
wal_advertiser,
};
/// Healthcheck handler.
@@ -91,6 +95,39 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, response_body)
}
async fn tenant_put_pageserver_attachments(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let PutTenantPageserverLocationRequest {
pageserver_locations,
}: PutTenantPageserverLocationRequest = json_request(&mut request).await?;
let global_timelines = get_global_timelines(&request);
let wal_advertiser = global_timelines.get_wal_advertiser();
wal_advertiser
.update_pageserver_attachments(
tenant_shard_id,
pageserver_locations
.into_iter()
.map(
|TenantShardPageserverLocation {
generation,
pageserver_node_id,
}| UpdatePageserverAttachmentsArg {
generation,
pageserver_node_id,
},
)
.collect(),
)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
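// The route for this handler is registered below as
//   PUT /v1/tenant/:tenant_shard_id/pageserver_attachments
// A hypothetical JSON body, assuming `Generation` and the `NodeId` newtype
// serialize as plain integers (not verified against their serde impls):
//   {"pageserver_locations": [{"generation": 42, "pageserver_node_id": 1}]}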
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
@@ -714,6 +751,9 @@ pub fn make_router(
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
.put("/v1/tenant/:tenant_shard_id/pageserver_attachments", |r| {
request_span(r, tenant_put_pageserver_attachments)
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
request_span(r, timeline_create_handler)

View File

@@ -38,6 +38,7 @@ pub mod timeline_eviction;
pub mod timeline_guard;
pub mod timeline_manager;
pub mod timelines_set;
pub mod wal_advertiser;
pub mod wal_backup;
pub mod wal_backup_partial;
pub mod wal_reader_stream;

View File

@@ -360,6 +360,7 @@ async fn recovery_stream(
listen_pg_addr_str: &donor.pg_connstr,
auth_token: None,
availability_zone: None,
pageserver_generation: None,
};
let cfg = wal_stream_connection_config(connection_conf_args)?;
let mut cfg = cfg.to_tokio_postgres_config();

View File

@@ -37,6 +37,7 @@ use crate::send_interpreted_wal::{
Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender,
};
use crate::timeline::WalResidentTimeline;
use crate::wal_advertiser;
use crate::wal_reader_stream::StreamingWalReader;
use crate::wal_storage::WalReader;
@@ -657,10 +658,29 @@ impl SafekeeperPostgresHandler {
let tli_cancel = tli.cancel.clone();
let wal_advertiser = match (self.shard, self.pageserver_generation) {
(Some(shard), Some(pageserver_generation)) => {
Some(tli.wal_advertiser.get_pageserver_timeline(
self.ttid,
shard.shard_index(),
pageserver_generation,
))
}
(shard, pageserver_generation) => {
debug!(
?shard,
?pageserver_generation,
"cannot feedback last_record_lsn to wal_advertiser subsystem, client must specify shard and pageserver_generation"
);
None
}
};
let mut reply_reader = ReplyReader {
reader,
ws_guard: ws_guard.clone(),
tli,
wal_advertiser,
};
let res = tokio::select! {
@@ -977,6 +997,7 @@ struct ReplyReader<IO> {
reader: PostgresBackendReader<IO>,
ws_guard: Arc<WalSenderGuard>,
tli: WalResidentTimeline,
wal_advertiser: Option<Arc<wal_advertiser::advmap::PageserverTimeline>>,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
@@ -1023,6 +1044,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
self.tli
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
.await;
if let Some(wal_advertiser) = &self.wal_advertiser {
wal_advertiser.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn);
}
// In principle, a new remote_consistent_lsn could allow us to
// deactivate the timeline, but we check that regularly through
// broker updates; no need to do it here.

View File

@@ -106,6 +106,7 @@ impl Env {
&timeline_dir,
&remote_path,
shared_state,
todo!(),
conf.clone(),
);
timeline.bootstrap(

View File

@@ -24,6 +24,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use utils::sync::gate::Gate;
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
@@ -38,7 +39,9 @@ use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
use crate::{
SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_advertiser, wal_storage,
};
fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
@@ -446,6 +449,7 @@ pub struct Timeline {
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
mutex: RwLock<SharedState>,
pub(crate) wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
walsenders: Arc<WalSenders>,
walreceivers: Arc<WalReceivers>,
timeline_dir: Utf8PathBuf,
@@ -475,6 +479,7 @@ impl Timeline {
timeline_dir: &Utf8Path,
remote_path: &RemotePath,
shared_state: SharedState,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
conf: Arc<SafeKeeperConf>,
) -> Arc<Self> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
@@ -498,6 +503,7 @@ impl Timeline {
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(shared_state),
wal_advertiser,
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
gate: Default::default(),
@@ -516,6 +522,7 @@ impl Timeline {
pub fn load_timeline(
conf: Arc<SafeKeeperConf>,
ttid: TenantTimelineId,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
@@ -528,6 +535,7 @@ impl Timeline {
&timeline_dir,
&remote_path,
shared_state,
wal_advertiser,
conf,
))
}

View File

@@ -2,6 +2,7 @@
//! All timelines should always be present in this map, this is done by loading them
//! all from the disk on startup and keeping them in memory.
use std::any;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
@@ -11,13 +12,16 @@ use anyhow::{Context, Result, bail};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::membership::Configuration;
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
use safekeeper_api::models::{
SafekeeperUtilization, TenantShardPageserverLocation, TimelineDeleteResult,
};
use safekeeper_api::{ServerInfo, membership};
use tokio::fs;
use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::http::routes::DeleteOrExcludeError;
@@ -26,7 +30,7 @@ use crate::state::TimelinePersistentState;
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
use crate::timelines_set::TimelinesSet;
use crate::wal_storage::Storage;
use crate::{SafeKeeperConf, control_file, wal_storage};
use crate::{SafeKeeperConf, control_file, wal_advertiser, wal_storage};
// Timeline entry in the global map: either a ready timeline, or mark that it is
// being created.
@@ -46,16 +50,25 @@ struct GlobalTimelinesState {
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
wal_advertisement: Arc<wal_advertiser::advmap::World>,
global_rate_limiter: RateLimiter,
}
impl GlobalTimelinesState {
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
fn get_dependencies(
&self,
) -> (
Arc<SafeKeeperConf>,
Arc<TimelinesSet>,
RateLimiter,
Arc<wal_advertiser::advmap::World>,
) {
(
self.conf.clone(),
self.broker_active_set.clone(),
self.global_rate_limiter.clone(),
self.wal_advertisement.clone(),
)
}
@@ -91,6 +104,7 @@ impl GlobalTimelines {
tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
wal_advertisement: Arc::new(wal_advertiser::advmap::World::default()),
global_rate_limiter: RateLimiter::new(1, 1),
}),
}
@@ -147,12 +161,13 @@ impl GlobalTimelines {
/// just lock and unlock it for each timeline -- this function is called
/// during init when nothing else is running, so this is fine.
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
let state = self.state.lock().unwrap();
state.get_dependencies()
};
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
{
@@ -162,7 +177,8 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(conf.clone(), ttid) {
let wal_advertiser = wal_advertiser.load_timeline(ttid);
match Timeline::load_timeline(conf.clone(), ttid, wal_advertiser) {
Ok(tli) => {
let mut shared_state = tli.write_shared_state().await;
self.state
@@ -222,7 +238,7 @@ impl GlobalTimelines {
start_lsn: Lsn,
commit_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, _, _) = {
let (conf, _, _, _) = {
let state = self.state.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -267,7 +283,7 @@ impl GlobalTimelines {
check_tombstone: bool,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
let mut state = self.state.lock().unwrap();
match state.timelines.get(&ttid) {
Some(GlobalMapTimeline::CreationInProgress) => {
@@ -296,7 +312,10 @@ impl GlobalTimelines {
};
// Do the actual move and reflect the result in the map.
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
let wal_advertiser = wal_advertiser.load_timeline(ttid);
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, wal_advertiser, conf.clone())
.await
{
Ok(timeline) => {
let mut timeline_shared_state = timeline.write_shared_state().await;
let mut state = self.state.lock().unwrap();
@@ -335,6 +354,7 @@ impl GlobalTimelines {
async fn install_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
conf: Arc<SafeKeeperConf>,
) -> Result<Arc<Timeline>> {
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
@@ -377,7 +397,7 @@ impl GlobalTimelines {
// Do the move.
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
Timeline::load_timeline(conf, ttid)
Timeline::load_timeline(conf, ttid, wal_advertiser)
}
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
@@ -565,6 +585,10 @@ impl GlobalTimelines {
Ok(deleted)
}
pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::advmap::World> {
self.state.lock().unwrap().wal_advertisement.clone()
}
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
let mut state = self.state.lock().unwrap();

View File

@@ -0,0 +1,20 @@
//! Advertise pending WAL to all pageservers that might be interested in it.
use std::{collections::HashSet, sync::Arc, time::Duration};
use crate::{GlobalTimelines, SafeKeeperConf};
pub(crate) mod advmap;
pub(crate) async fn wal_advertiser_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
todo!();
node_loop().await;
Ok(())
}
pub(crate) async fn node_loop() {
loop {}
}

View File

@@ -0,0 +1,78 @@
//! The data structures that track advertisement state.
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use serde::Serialize;
use utils::{
generation::Generation,
id::{NodeId, TenantTimelineId, TimelineId},
lsn::Lsn,
shard::{ShardIndex, TenantShardId},
};
#[derive(Default)]
pub struct World {
pageservers: RwLock<HashMap<NodeId, Arc<Pageserver>>>,
}
pub struct Pageserver {
node_id: NodeId,
attachments: RwLock<HashMap<TenantShardId, Arc<PageserverAttachment>>>,
}
pub struct PageserverAttachment {
pageserver: NodeId,
tenant_shard_id: TenantShardId,
generation: Generation,
timelines: RwLock<HashMap<TimelineId, Arc<PageserverTimeline>>>,
}
pub struct PageserverTimeline {
pageserver: NodeId,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
remote_consistent_lsn: RwLock<Lsn>,
}
pub struct SafekeeperTimeline {}
pub struct UpdatePageserverAttachmentsArg {
pub generation: Generation,
pub pageserver_node_id: NodeId,
}
impl World {
pub fn housekeeping(&self) {}
pub fn load_timeline(&self, ttid: TenantTimelineId) -> Arc<SafekeeperTimeline> {
todo!()
}
pub fn update_pageserver_attachments(
&self,
tenant_shard_id: TenantShardId,
arg: Vec<UpdatePageserverAttachmentsArg>,
) -> anyhow::Result<()> {
todo!()
}
}
impl SafekeeperTimeline {
pub fn get_pageserver_timeline(
&self,
ttid: TenantTimelineId,
shard: ShardIndex,
pageserver_generation: Generation,
) -> Arc<PageserverTimeline> {
assert!(!pageserver_generation.is_none());
todo!()
}
}
impl PageserverTimeline {
pub fn update_remote_consistent_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
todo!()
}
}
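// The bodies above are still todo!(). As a non-authoritative sketch of the
// intended invariant, the LSN update plausibly keeps a monotonic watermark,
// matching how remote_consistent_lsn behaves elsewhere in the codebase:
impl PageserverTimeline {
    fn update_remote_consistent_lsn_sketch(&self, lsn: Lsn) {
        let mut guard = self.remote_consistent_lsn.write().unwrap();
        // Only move forward; feedback can arrive out of order.
        *guard = std::cmp::max(*guard, lsn);
    }
}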

View File

@@ -18,7 +18,7 @@ use utils::measured_stream::MeasuredStream;
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::TrafficMetrics;
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
/// Accept incoming TCP connections and spawn them into a background thread.
///
@@ -51,7 +51,7 @@ pub async fn task_main(
error!("connection handler exited: {}", err);
}
}
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty)),
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty, pageserver_generation = field::Empty)),
);
}
}

View File

@@ -27,6 +27,7 @@ parking_lot.workspace = true
prost.workspace = true
tonic.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
metrics.workspace = true

View File

@@ -1,10 +1,14 @@
use std::time::Duration;
use futures::Stream;
use proto::TenantTimelineId as ProtoTenantTimelineId;
use proto::broker_service_client::BrokerServiceClient;
use tokio_util::sync::CancellationToken;
use tonic::Status;
use tonic::codegen::StdError;
use tonic::transport::{Channel, Endpoint};
use tonic::transport::Endpoint;
use tracing::{debug, error, info, warn};
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
// Code generated by protobuf.
@@ -22,6 +26,7 @@ pub mod metrics;
pub use hyper::Uri;
pub use tonic::transport::{Certificate, ClientTlsConfig};
pub use tonic::{Code, Request, Streaming};
use utils::shard::TenantShardId;
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
@@ -29,9 +34,199 @@ pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LIST
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
// BrokerServiceClient charged with tonic provided Channel transport; helps to
// avoid depending on tonic directly in user crates.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
#[derive(Clone)]
pub struct TimelineUpdatesSubscriber {
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
/// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code.
pub struct BrokerClientChannel {
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
impl BrokerClientChannel {
pub fn into_raw_grpc_client(
self,
) -> proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel> {
self.client
}
}
pub struct TimelineShardUpdate {
pub is_discovery: bool,
pub inner: proto::SafekeeperDiscoveryResponse,
}
pub struct DiscoveryRequester {
id: ProtoTenantTimelineId,
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
impl TimelineUpdatesSubscriber {
pub fn new(service_client: BrokerClientChannel) -> Self {
Self {
client: service_client.client.clone(),
}
}
pub fn subscribe(
&mut self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
cancel: &CancellationToken,
) -> (impl Stream<Item = TimelineShardUpdate>, DiscoveryRequester) {
let id = ProtoTenantTimelineId {
tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(),
timeline_id: timeline_id.as_ref().to_owned(),
};
let discovery_requester = DiscoveryRequester {
id: id.clone(),
client: self.client.clone(),
};
let stream = async_stream::stream! {
let mut attempt = 0;
'resubscribe: loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
attempt += 1;
use proto::*;
// subscribe to the specific timeline
let request = SubscribeByFilterRequest {
types: vec![
TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo as i32,
},
TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
},
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(id.clone()),
}),
};
let res = tokio::select! {
r = self.client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { return; }
};
let mut update_stream = match res
{
Ok(resp) => {
resp.into_inner()
}
Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker when no new writes happen and
// the entire WAL has been streamed. Keep this noticeable with logging, but do not warn/error.
info!(
attempt, "failed to subscribe: {e:#}"
);
continue 'resubscribe;
}
};
loop {
let broker_update = tokio::select!{
_ = cancel.cancelled() => {
return;
}
update = update_stream.message() => { update }
};
match broker_update {
Ok(Some(typed_msg)) => {
let mut is_discovery = false;
let timeline_update = match typed_msg.r#type() {
MessageType::SafekeeperTimelineInfo => {
let info = match typed_msg.safekeeper_timeline_info {
Some(info) => info,
None => {
warn!("bad proto message from broker: no safekeeper_timeline_info");
continue 'resubscribe;
}
};
SafekeeperDiscoveryResponse {
safekeeper_id: info.safekeeper_id,
tenant_timeline_id: info.tenant_timeline_id,
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
is_discovery = true;
match typed_msg.safekeeper_discovery_response {
Some(response) => response,
None => {
warn!("bad proto message from broker: no safekeeper_discovery_response");
continue 'resubscribe;
}
}
}
_ => {
// unexpected message
warn!("unexpected message from broker: {typed_msg:?}");
continue 'resubscribe;
}
};
attempt = 0; // reset backoff iff we received a valid update
yield TimelineShardUpdate{is_discovery, inner: timeline_update };
},
Err(status) => {
match status.code() {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
// tonic's error handling doesn't provide a clear code for disconnections: we get
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
// => https://github.com/neondatabase/neon/issues/9562
info!("broker disconnected: {status}");
},
_ => {
warn!("broker subscription failed: {status}");
}
}
continue 'resubscribe;
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
continue 'resubscribe;
}
}
}
}
};
(stream, discovery_requester)
}
}
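// Consumer-side sketch (non-authoritative): mirrors how the walreceiver hunk
// above drives this API. Assumes a `TimelineUpdatesSubscriber` built as in
// the pageserver binary hunk; never called, for illustration only.
async fn consume_updates_sketch(
    mut subscriber: TimelineUpdatesSubscriber,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    cancel: CancellationToken,
) {
    use futures::StreamExt;
    let (updates, mut discovery) = subscriber.subscribe(tenant_shard_id, timeline_id, &cancel);
    let mut updates = Box::pin(updates); // the stream is !Unpin
    // Fire-and-forget: ask safekeepers to publish info for this timeline.
    discovery.request().await;
    // Yields None only once `cancel` fires; resubscription happens internally.
    while let Some(update) = updates.next().await {
        let _ = update.is_discovery; // discovery responses are flagged
    }
}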
impl DiscoveryRequester {
pub async fn request(&mut self) {
let request = proto::SafekeeperDiscoveryRequest {
tenant_timeline_id: Some(self.id.clone()),
};
let msg = proto::TypedMessage {
r#type: proto::MessageType::SafekeeperDiscoveryRequest as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: Some(request),
safekeeper_discovery_response: None,
};
// Cancellation safety: we want to send a message to the broker, but the publish_one()
// call can get cancelled by another select! arm in the caller. This is absolutely fine,
// because we just want to receive broker updates, and discovery is not important if we
// already receive updates.
//
// It is possible that the caller's `last_discovery_ts` gets updated while the message
// is never sent; that is fine for the same reason.
// This is a fire-and-forget request; we don't care about the response.
let _ = self.client.publish_one(msg).await;
debug!("Discovery request sent to the broker");
}
}
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
@@ -67,19 +262,9 @@ where
.connect_timeout(DEFAULT_CONNECT_TIMEOUT);
// keep_alive_timeout is 20s by default on both client and server side
let channel = tonic_endpoint.connect_lazy();
Ok(BrokerClientChannel::new(channel))
}
impl BrokerClientChannel {
/// Create a new client to the given endpoint, but don't actually connect until the first request.
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
Ok(Self::new(conn))
}
Ok(BrokerClientChannel {
client: proto::broker_service_client::BrokerServiceClient::new(channel),
})
}
// parse variable length bytes from protobuf