mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
7 Commits
add_audit_
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4a1b52c12e | ||
|
|
257693e4f2 | ||
|
|
7aa9beaefd | ||
|
|
35dbbbaf60 | ||
|
|
6380c9674c | ||
|
|
1f53688189 | ||
|
|
8f201b1580 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -6634,6 +6634,7 @@ dependencies = [
|
||||
"rustls 0.23.18",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-util",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tracing",
|
||||
|
||||
@@ -168,35 +168,6 @@ pub fn write_postgres_conf(
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
}
|
||||
|
||||
// Always add pgaudit to shared_preload_libraries.
|
||||
//
|
||||
// This is needed to handle the downgrade scenario.
|
||||
// pgaudit extension creates event triggers that require library to be loaded.
|
||||
// so, once extension was installed it must always be present in shared_preload_libraries.
|
||||
let mut extra_shared_preload_libraries = String::new();
|
||||
|
||||
let libs = {
|
||||
// We don't distribute pgaudit in the testing image,
|
||||
// and don't pass shared_preload_libraries via spec,
|
||||
// so disable this logic there.
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
String::new()
|
||||
}
|
||||
#[cfg(not(feature = "testing"))]
|
||||
{
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("shared_preload_libraries")
|
||||
.expect("shared_preload_libraries setting is missing in the spec")
|
||||
}
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "testing"))]
|
||||
if !libs.contains("pgaudit") {
|
||||
extra_shared_preload_libraries.push_str(",pgaudit");
|
||||
};
|
||||
|
||||
// If base audit logging is enabled, configure it.
|
||||
// In this setup, the audit log will be written to the standard postgresql log.
|
||||
//
|
||||
@@ -206,22 +177,29 @@ pub fn write_postgres_conf(
|
||||
// This way we always override the settings from the spec
|
||||
// and don't allow the user or the control plane admin to change them.
|
||||
match spec.audit_log_level {
|
||||
ComputeAudit::Disabled => {
|
||||
// this is the default, but let's be explicit
|
||||
writeln!(file, "pgaudit.log='none'")?;
|
||||
}
|
||||
ComputeAudit::Disabled => {}
|
||||
ComputeAudit::Log | ComputeAudit::Base => {
|
||||
writeln!(file, "# Managed by compute_ctl base audit settings: start")?;
|
||||
writeln!(file, "pgaudit.log='ddl,role'")?;
|
||||
// Disable logging of catalog queries to reduce the noise
|
||||
writeln!(file, "pgaudit.log_catalog=off")?;
|
||||
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='{}{}'",
|
||||
libs, extra_shared_preload_libraries
|
||||
)?;
|
||||
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
let mut extra_shared_preload_libraries = String::new();
|
||||
if !libs.contains("pgaudit") {
|
||||
extra_shared_preload_libraries.push_str(",pgaudit");
|
||||
}
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='{}{}'",
|
||||
libs, extra_shared_preload_libraries
|
||||
)?;
|
||||
} else {
|
||||
// Typically, this should be unreacheable,
|
||||
// because we always set at least some shared_preload_libraries in the spec
|
||||
// but let's handle it explicitly anyway.
|
||||
writeln!(file, "shared_preload_libraries='neon,pgaudit'")?;
|
||||
}
|
||||
writeln!(file, "# Managed by compute_ctl base audit settings: end")?;
|
||||
}
|
||||
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
|
||||
@@ -250,15 +228,28 @@ pub fn write_postgres_conf(
|
||||
// The caller who sets the flag is responsible for ensuring that the necessary
|
||||
// shared_preload_libraries are present in the compute image,
|
||||
// otherwise the compute start will fail.
|
||||
if !libs.contains("pgauditlogtofile") {
|
||||
extra_shared_preload_libraries.push_str(",pgauditlogtofile");
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
let mut extra_shared_preload_libraries = String::new();
|
||||
if !libs.contains("pgaudit") {
|
||||
extra_shared_preload_libraries.push_str(",pgaudit");
|
||||
}
|
||||
if !libs.contains("pgauditlogtofile") {
|
||||
extra_shared_preload_libraries.push_str(",pgauditlogtofile");
|
||||
}
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='{}{}'",
|
||||
libs, extra_shared_preload_libraries
|
||||
)?;
|
||||
} else {
|
||||
// Typically, this should be unreacheable,
|
||||
// because we always set at least some shared_preload_libraries in the spec
|
||||
// but let's handle it explicitly anyway.
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='neon,pgaudit,pgauditlogtofile'"
|
||||
)?;
|
||||
}
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='{}{}'",
|
||||
libs, extra_shared_preload_libraries
|
||||
)?;
|
||||
|
||||
writeln!(
|
||||
file,
|
||||
"# Managed by compute_ctl compliance audit settings: end"
|
||||
|
||||
@@ -6,9 +6,11 @@ use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::TimestampTz;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use crate::membership::Configuration;
|
||||
use crate::{ServerInfo, Term};
|
||||
@@ -308,3 +310,14 @@ pub struct PullTimelineResponse {
|
||||
pub safekeeper_host: Option<String>,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PutTenantPageserverLocationRequest {
|
||||
pub pageserver_locations: Vec<TenantShardPageserverLocation>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct TenantShardPageserverLocation {
|
||||
pub generation: Generation,
|
||||
pub pageserver_node_id: NodeId,
|
||||
}
|
||||
|
||||
@@ -46,6 +46,8 @@ pub struct ConnectionConfigArgs<'a> {
|
||||
|
||||
pub auth_token: Option<&'a str>,
|
||||
pub availability_zone: Option<&'a str>,
|
||||
|
||||
pub pageserver_generation: Option<u32>,
|
||||
}
|
||||
|
||||
impl<'a> ConnectionConfigArgs<'a> {
|
||||
@@ -72,6 +74,10 @@ impl<'a> ConnectionConfigArgs<'a> {
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(pageserver_generation) = self.pageserver_generation {
|
||||
options.push(format!("pageserver_generation={pageserver_generation}"));
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
}
|
||||
|
||||
@@ -423,11 +423,14 @@ fn start_pageserver(
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
);
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
storage_broker::connect(
|
||||
let service_client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
tls_config,
|
||||
)
|
||||
)?;
|
||||
anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new(
|
||||
service_client,
|
||||
))
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -100,7 +100,7 @@ pub struct State {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
allowlist_routes: &'static [&'static str],
|
||||
remote_storage: GenericRemoteStorage,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
@@ -114,7 +114,7 @@ impl State {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
|
||||
@@ -48,7 +48,6 @@ use remote_timeline_client::{
|
||||
download_tenant_manifest,
|
||||
};
|
||||
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
|
||||
use timeline::offload::{OffloadError, offload_timeline};
|
||||
use timeline::{
|
||||
@@ -152,7 +151,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
|
||||
/// as the shared remote storage client and process initialization state.
|
||||
#[derive(Clone)]
|
||||
pub struct TenantSharedResources {
|
||||
pub broker_client: storage_broker::BrokerClientChannel,
|
||||
pub broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
pub remote_storage: GenericRemoteStorage,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub l0_flush_global_state: L0FlushGlobalState,
|
||||
@@ -2122,7 +2121,7 @@ impl TenantShard {
|
||||
async fn unoffload_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: RequestContext,
|
||||
) -> Result<Arc<Timeline>, TimelineArchivalError> {
|
||||
info!("unoffloading timeline");
|
||||
@@ -2257,7 +2256,7 @@ impl TenantShard {
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
new_state: TimelineArchivalState,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: RequestContext,
|
||||
) -> Result<(), TimelineArchivalError> {
|
||||
info!("setting timeline archival config");
|
||||
@@ -2586,7 +2585,7 @@ impl TenantShard {
|
||||
pub(crate) async fn create_timeline(
|
||||
self: &Arc<TenantShard>,
|
||||
params: CreateTimelineParams,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
if !self.is_active() {
|
||||
@@ -3321,7 +3320,7 @@ impl TenantShard {
|
||||
/// to delay background jobs. Background jobs can be started right away when None is given.
|
||||
fn activate(
|
||||
self: &Arc<Self>,
|
||||
broker_client: BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
@@ -3951,7 +3950,7 @@ where
|
||||
|
||||
enum ActivateTimelineArgs {
|
||||
Yes {
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
},
|
||||
No,
|
||||
}
|
||||
|
||||
@@ -61,7 +61,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
|
||||
use rand::Rng;
|
||||
use remote_storage::DownloadError;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::{Notify, oneshot, watch};
|
||||
@@ -2070,7 +2069,7 @@ impl Timeline {
|
||||
pub(crate) fn activate(
|
||||
self: &Arc<Self>,
|
||||
parent: Arc<crate::tenant::TenantShard>,
|
||||
broker_client: BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
@@ -3112,7 +3111,7 @@ impl Timeline {
|
||||
fn launch_wal_receiver(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
broker_client: BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
) {
|
||||
info!(
|
||||
"launching WAL receiver for timeline {} of tenant {}",
|
||||
|
||||
@@ -161,7 +161,7 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
tenant: Arc<TenantShard>,
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
self.write(|raw_timeline| async move {
|
||||
|
||||
@@ -28,7 +28,6 @@ use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -69,7 +68,7 @@ impl WalReceiver {
|
||||
pub fn start(
|
||||
timeline: Arc<Timeline>,
|
||||
conf: WalReceiverConf,
|
||||
mut broker_client: BrokerClientChannel,
|
||||
mut broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: &RequestContext,
|
||||
) -> Self {
|
||||
let tenant_shard_id = timeline.tenant_shard_id;
|
||||
|
||||
@@ -17,19 +17,12 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use storage_broker::proto::{
|
||||
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
|
||||
SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
|
||||
TypedMessage,
|
||||
};
|
||||
use storage_broker::{BrokerClientChannel, Code, Streaming};
|
||||
use storage_broker::proto::SafekeeperDiscoveryResponse;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::backoff::{
|
||||
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
|
||||
};
|
||||
use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::postgres_client::{
|
||||
@@ -56,7 +49,7 @@ pub(crate) struct Cancelled;
|
||||
///
|
||||
/// Not cancellation-safe. Use `cancel` token to request cancellation.
|
||||
pub(super) async fn connection_manager_loop_step(
|
||||
broker_client: &mut BrokerClientChannel,
|
||||
broker_client: &mut storage_broker::TimelineUpdatesSubscriber,
|
||||
connection_manager_state: &mut ConnectionManagerState,
|
||||
ctx: &RequestContext,
|
||||
cancel: &CancellationToken,
|
||||
@@ -81,11 +74,6 @@ pub(super) async fn connection_manager_loop_step(
|
||||
WALRECEIVER_ACTIVE_MANAGERS.dec();
|
||||
}
|
||||
|
||||
let id = TenantTimelineId {
|
||||
tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
|
||||
timeline_id: connection_manager_state.timeline.timeline_id,
|
||||
};
|
||||
|
||||
let mut timeline_state_updates = connection_manager_state
|
||||
.timeline
|
||||
.subscribe_for_state_updates();
|
||||
@@ -101,7 +89,12 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// Subscribe to the broker updates. Stream shares underlying TCP connection
|
||||
// with other streams on this client (other connection managers). When
|
||||
// object goes out of scope, stream finishes in drop() automatically.
|
||||
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
|
||||
let (timeline_updates, mut discovery_requester) = broker_client.subscribe(
|
||||
connection_manager_state.timeline.tenant_shard_id,
|
||||
connection_manager_state.timeline.timeline_id,
|
||||
cancel,
|
||||
);
|
||||
let mut timeline_updates = Box::pin(timeline_updates);
|
||||
debug!("Subscribed for broker timeline updates");
|
||||
|
||||
loop {
|
||||
@@ -155,29 +148,10 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
},
|
||||
|
||||
// Got a new update from the broker
|
||||
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
|
||||
match broker_update {
|
||||
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
|
||||
Err(status) => {
|
||||
match status.code() {
|
||||
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
|
||||
// tonic's error handling doesn't provide a clear code for disconnections: we get
|
||||
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
|
||||
// => https://github.com/neondatabase/neon/issues/9562
|
||||
info!("broker disconnected: {status}");
|
||||
},
|
||||
_ => {
|
||||
warn!("broker subscription failed: {status}");
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("broker subscription stream ended"); // can't happen
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
// Got a new update from the broker.
|
||||
// The stream ends with None if and only if `cancel` is cancelled.
|
||||
Some(timeline_update) = timeline_updates.next() => {
|
||||
connection_manager_state.register_timeline_update(timeline_update)
|
||||
},
|
||||
|
||||
new_event = async {
|
||||
@@ -258,32 +232,11 @@ pub(super) async fn connection_manager_loop_step(
|
||||
tokio::time::sleep(next_discovery_ts - now).await;
|
||||
}
|
||||
|
||||
let tenant_timeline_id = Some(ProtoTenantTimelineId {
|
||||
tenant_id: id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: id.timeline_id.as_ref().to_owned(),
|
||||
});
|
||||
let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
|
||||
let msg = TypedMessage {
|
||||
r#type: MessageType::SafekeeperDiscoveryRequest as i32,
|
||||
safekeeper_timeline_info: None,
|
||||
safekeeper_discovery_request: Some(request),
|
||||
safekeeper_discovery_response: None,
|
||||
};
|
||||
info!("No active connection and no candidates, sending discovery request to the broker");
|
||||
discovery_requester.request().await;
|
||||
|
||||
last_discovery_ts = Some(std::time::Instant::now());
|
||||
info!("No active connection and no candidates, sending discovery request to the broker");
|
||||
|
||||
// Cancellation safety: we want to send a message to the broker, but publish_one()
|
||||
// function can get cancelled by the other select! arm. This is absolutely fine, because
|
||||
// we just want to receive broker updates and discovery is not important if we already
|
||||
// receive updates.
|
||||
//
|
||||
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
|
||||
// This is totally fine because of the reason above.
|
||||
|
||||
// This is a fire-and-forget request, we don't care about the response
|
||||
let _ = broker_client.publish_one(msg).await;
|
||||
debug!("Discovery request sent to the broker");
|
||||
None
|
||||
} => {}
|
||||
}
|
||||
@@ -298,63 +251,6 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
}
|
||||
|
||||
/// Endlessly try to subscribe for broker updates for a given timeline.
|
||||
async fn subscribe_for_timeline_updates(
|
||||
broker_client: &mut BrokerClientChannel,
|
||||
id: TenantTimelineId,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Streaming<TypedMessage>, Cancelled> {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
exponential_backoff(
|
||||
attempt,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
attempt += 1;
|
||||
|
||||
// subscribe to the specific timeline
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperTimelineInfo as i32,
|
||||
},
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
|
||||
},
|
||||
],
|
||||
tenant_timeline_id: Some(FilterTenantTimelineId {
|
||||
enabled: true,
|
||||
tenant_timeline_id: Some(ProtoTenantTimelineId {
|
||||
tenant_id: id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: id.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
}),
|
||||
};
|
||||
|
||||
match {
|
||||
tokio::select! {
|
||||
r = broker_client.subscribe_by_filter(request) => { r }
|
||||
_ = cancel.cancelled() => { return Err(Cancelled); }
|
||||
}
|
||||
} {
|
||||
Ok(resp) => {
|
||||
return Ok(resp.into_inner());
|
||||
}
|
||||
Err(e) => {
|
||||
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
|
||||
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
|
||||
info!(
|
||||
"Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
|
||||
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
|
||||
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
|
||||
@@ -695,44 +591,14 @@ impl ConnectionManagerState {
|
||||
}
|
||||
|
||||
/// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
|
||||
fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
|
||||
let mut is_discovery = false;
|
||||
let timeline_update = match typed_msg.r#type() {
|
||||
MessageType::SafekeeperTimelineInfo => {
|
||||
let info = match typed_msg.safekeeper_timeline_info {
|
||||
Some(info) => info,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_timeline_info");
|
||||
return;
|
||||
}
|
||||
};
|
||||
SafekeeperDiscoveryResponse {
|
||||
safekeeper_id: info.safekeeper_id,
|
||||
tenant_timeline_id: info.tenant_timeline_id,
|
||||
commit_lsn: info.commit_lsn,
|
||||
safekeeper_connstr: info.safekeeper_connstr,
|
||||
availability_zone: info.availability_zone,
|
||||
standby_horizon: info.standby_horizon,
|
||||
}
|
||||
}
|
||||
MessageType::SafekeeperDiscoveryResponse => {
|
||||
is_discovery = true;
|
||||
match typed_msg.safekeeper_discovery_response {
|
||||
Some(response) => response,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_discovery_response");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// unexpected message
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
fn register_timeline_update(&mut self, timeline_update: storage_broker::TimelineShardUpdate) {
|
||||
WALRECEIVER_BROKER_UPDATES.inc();
|
||||
|
||||
let storage_broker::TimelineShardUpdate {
|
||||
is_discovery,
|
||||
inner: timeline_update,
|
||||
} = timeline_update;
|
||||
|
||||
trace!(
|
||||
"safekeeper info update: standby_horizon(cutoff)={}",
|
||||
timeline_update.standby_horizon
|
||||
@@ -1013,7 +879,9 @@ impl ConnectionManagerState {
|
||||
shard_stripe_size,
|
||||
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
|
||||
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
|
||||
availability_zone: self.conf.availability_zone.as_deref()
|
||||
availability_zone: self.conf.availability_zone.as_deref(),
|
||||
// TODO: do we still have the emergency mode that runs without generations? If so, this expect would panic in that mode.
|
||||
pageserver_generation: Some(self.timeline.generation.into().expect("attachments always have a generation number nowadays")),
|
||||
};
|
||||
|
||||
match wal_stream_connection_config(connection_conf_args) {
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||
|
||||
@@ -97,7 +99,7 @@ impl VirtualFile {
|
||||
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let mode = get_io_mode();
|
||||
@@ -110,16 +112,21 @@ impl VirtualFile {
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::DirectRw, _) => true,
|
||||
};
|
||||
if set_o_direct {
|
||||
let open_options = open_options.clone();
|
||||
let open_options = if set_o_direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
let mut open_options = open_options;
|
||||
open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
open_options
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
unreachable!(
|
||||
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
open_options
|
||||
};
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile { inner, _mode: mode })
|
||||
}
|
||||
@@ -523,7 +530,7 @@ impl VirtualFileInner {
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
@@ -551,11 +558,10 @@ impl VirtualFileInner {
|
||||
// It would perhaps be nicer to check just for the read and write flags
|
||||
// explicitly, but OpenOptions doesn't contain any functions to read flags,
|
||||
// only to set them.
|
||||
let reopen_options = open_options
|
||||
.clone()
|
||||
.create(false)
|
||||
.create_new(false)
|
||||
.truncate(false);
|
||||
let mut reopen_options = open_options.clone();
|
||||
reopen_options.create(false);
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
@@ -1301,7 +1307,7 @@ mod tests {
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
|
||||
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
|
||||
Ok(MaybeVirtualFile::VirtualFile(vf))
|
||||
}
|
||||
}
|
||||
@@ -1368,7 +1374,7 @@ mod tests {
|
||||
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a
|
||||
@@ -1387,7 +1393,8 @@ mod tests {
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true),
|
||||
.truncate(true)
|
||||
.to_owned(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1405,7 +1412,12 @@ mod tests {
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
|
||||
let mut vfile = A::open(
|
||||
path_b.clone(),
|
||||
OpenOptions::new().read(true).to_owned(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
@@ -1454,7 +1466,7 @@ mod tests {
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true),
|
||||
OpenOptions::new().read(true).clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use std::os::fd::OwnedFd;
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::path::Path;
|
||||
|
||||
use super::io_engine::IoEngine;
|
||||
@@ -44,7 +43,7 @@ impl OpenOptions {
|
||||
self.write
|
||||
}
|
||||
|
||||
pub fn read(mut self, read: bool) -> Self {
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
@@ -57,7 +56,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn write(mut self, write: bool) -> Self {
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
self.write = write;
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
@@ -71,7 +70,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create(mut self, create: bool) -> Self {
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
@@ -84,7 +83,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_new(mut self, create_new: bool) -> Self {
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
@@ -97,7 +96,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn truncate(mut self, truncate: bool) -> Self {
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
@@ -125,8 +124,10 @@ impl OpenOptions {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mode(mut self, mode: u32) -> Self {
|
||||
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
@@ -139,7 +140,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn custom_flags(mut self, flags: i32) -> Self {
|
||||
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
|
||||
@@ -32,6 +32,12 @@ pub(crate) enum ComputeUserInfoParseError {
|
||||
option: EndpointId,
|
||||
},
|
||||
|
||||
#[error(
|
||||
"Common name inferred from SNI ('{}') is not known",
|
||||
.cn,
|
||||
)]
|
||||
UnknownCommonName { cn: String },
|
||||
|
||||
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
|
||||
MalformedProjectName(EndpointId),
|
||||
}
|
||||
@@ -60,15 +66,22 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<EndpointId> {
|
||||
let (subdomain, common_name) = sni.split_once('.')?;
|
||||
pub(crate) fn endpoint_sni(
|
||||
sni: &str,
|
||||
common_names: &HashSet<String>,
|
||||
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
|
||||
let Some((subdomain, common_name)) = sni.split_once('.') else {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
|
||||
};
|
||||
if !common_names.contains(common_name) {
|
||||
return None;
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName {
|
||||
cn: common_name.into(),
|
||||
});
|
||||
}
|
||||
if subdomain == SERVERLESS_DRIVER_SNI {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
Some(EndpointId::from(subdomain))
|
||||
Ok(Some(EndpointId::from(subdomain)))
|
||||
}
|
||||
|
||||
impl ComputeUserInfoMaybeEndpoint {
|
||||
@@ -100,8 +113,15 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
})
|
||||
.map(|name| name.into());
|
||||
|
||||
let endpoint_from_domain =
|
||||
sni.and_then(|sni_str| common_names.and_then(|cn| endpoint_sni(sni_str, cn)));
|
||||
let endpoint_from_domain = if let Some(sni_str) = sni {
|
||||
if let Some(cn) = common_names {
|
||||
endpoint_sni(sni_str, cn)?
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let endpoint = match (endpoint_option, endpoint_from_domain) {
|
||||
// Invariant: if we have both project name variants, they should match.
|
||||
@@ -404,34 +424,21 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_unknown_sni() {
|
||||
fn parse_inconsistent_sni() {
|
||||
let options = StartupMessageParams::new([("user", "john_doe")]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert!(info.endpoint_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_unknown_sni_with_options() {
|
||||
let options = StartupMessageParams::new([
|
||||
("user", "john_doe"),
|
||||
("options", "endpoint=foo-bar-baz-1234"),
|
||||
]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(info.endpoint_id.as_deref(), Some("foo-bar-baz-1234"));
|
||||
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.expect_err("should fail");
|
||||
match err {
|
||||
UnknownCommonName { cn } => {
|
||||
assert_eq!(cn, "localhost");
|
||||
}
|
||||
_ => panic!("bad error: {err:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -24,6 +24,9 @@ pub(crate) enum HandshakeError {
|
||||
#[error("protocol violation")]
|
||||
ProtocolViolation,
|
||||
|
||||
#[error("missing certificate")]
|
||||
MissingCertificate,
|
||||
|
||||
#[error("{0}")]
|
||||
StreamUpgradeError(#[from] StreamUpgradeError),
|
||||
|
||||
@@ -39,6 +42,10 @@ impl ReportableError for HandshakeError {
|
||||
match self {
|
||||
HandshakeError::EarlyData => crate::error::ErrorKind::User,
|
||||
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
|
||||
// This error should not happen, but will if we have no default certificate and
|
||||
// the client sends no SNI extension.
|
||||
// If they provide SNI then we can be sure there is a certificate that matches.
|
||||
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
|
||||
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
|
||||
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
|
||||
StreamUpgradeError::Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
@@ -139,7 +146,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
// try parse endpoint
|
||||
let ep = conn_info
|
||||
.server_name()
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names));
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
|
||||
if let Some(ep) = ep {
|
||||
ctx.set_endpoint_id(ep);
|
||||
}
|
||||
@@ -154,8 +161,10 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
let (_, tls_server_end_point) =
|
||||
tls.cert_resolver.resolve(conn_info.server_name());
|
||||
let (_, tls_server_end_point) = tls
|
||||
.cert_resolver
|
||||
.resolve(conn_info.server_name())
|
||||
.ok_or(HandshakeError::MissingCertificate)?;
|
||||
|
||||
stream = PqStream {
|
||||
framed: Framed {
|
||||
|
||||
@@ -98,7 +98,8 @@ fn generate_tls_config<'a>(
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(vec![cert.clone()], key.clone_key())?;
|
||||
|
||||
let cert_resolver = CertResolver::new(key, vec![cert])?;
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
cert_resolver.add_cert(key, vec![cert], true)?;
|
||||
|
||||
let common_names = cert_resolver.get_common_names();
|
||||
|
||||
|
||||
@@ -199,7 +199,8 @@ fn get_conn_info(
|
||||
let endpoint = match connection_url.host() {
|
||||
Some(url::Host::Domain(hostname)) => {
|
||||
if let Some(tls) = tls {
|
||||
endpoint_sni(hostname, &tls.common_names).ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
endpoint_sni(hostname, &tls.common_names)?
|
||||
.ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
} else {
|
||||
hostname
|
||||
.split_once('.')
|
||||
|
||||
@@ -5,7 +5,6 @@ use anyhow::{Context, bail};
|
||||
use itertools::Itertools;
|
||||
use rustls::crypto::ring::{self, sign};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use rustls::sign::CertifiedKey;
|
||||
use x509_cert::der::{Reader, SliceReader};
|
||||
|
||||
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
|
||||
@@ -26,8 +25,10 @@ pub fn configure_tls(
|
||||
certs_dir: Option<&String>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
let mut cert_resolver = CertResolver::parse_new(key_path, cert_path)?;
|
||||
cert_resolver.add_cert_path(key_path, cert_path, true)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
@@ -39,8 +40,11 @@ pub fn configure_tls(
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver
|
||||
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
|
||||
cert_resolver.add_cert_path(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -79,42 +83,92 @@ pub fn configure_tls(
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint),
|
||||
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
Self::new(priv_key, cert_chain)
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
fn add_cert_path(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let mut certs = HashMap::new();
|
||||
let default = (cert.clone(), tls_server_end_point);
|
||||
certs.insert(common_name, (cert, tls_server_end_point));
|
||||
Ok(Self { certs, default })
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
|
||||
})?
|
||||
};
|
||||
|
||||
self.add_cert(priv_key, cert_chain, is_default)
|
||||
}
|
||||
|
||||
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
self.add_cert(priv_key, cert_chain)
|
||||
}
|
||||
|
||||
fn add_cert(
|
||||
pub fn add_cert(
|
||||
&mut self,
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some((cert.clone(), tls_server_end_point));
|
||||
}
|
||||
|
||||
self.certs.insert(common_name, (cert, tls_server_end_point));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -123,82 +177,12 @@ impl CertResolver {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_key_cert(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
Ok((priv_key, cert_chain))
|
||||
}
|
||||
|
||||
fn process_key_cert(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<(String, Arc<CertifiedKey>, TlsServerEndPoint)> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
Ok((common_name, cert, tls_server_end_point))
|
||||
}
|
||||
|
||||
impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
fn resolve(
|
||||
&self,
|
||||
client_hello: rustls::server::ClientHello<'_>,
|
||||
) -> Option<Arc<rustls::sign::CertifiedKey>> {
|
||||
Some(self.resolve(client_hello.server_name()).0)
|
||||
self.resolve(client_hello.server_name()).map(|x| x.0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,7 +190,7 @@ impl CertResolver {
|
||||
pub fn resolve(
|
||||
&self,
|
||||
server_name: Option<&str>,
|
||||
) -> (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint) {
|
||||
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
|
||||
// loop here and cut off more and more subdomains until we find
|
||||
// a match to get a proper wildcard support. OTOH, we now do not
|
||||
// use nested domains, so keep this simple for now.
|
||||
@@ -216,17 +200,12 @@ impl CertResolver {
|
||||
if let Some(mut sni_name) = server_name {
|
||||
loop {
|
||||
if let Some(cert) = self.certs.get(sni_name) {
|
||||
return cert.clone();
|
||||
return Some(cert.clone());
|
||||
}
|
||||
if let Some((_, rest)) = sni_name.split_once('.') {
|
||||
sni_name = rest;
|
||||
} else {
|
||||
// The customer has some custom DNS mapping - just return
|
||||
// a default certificate.
|
||||
//
|
||||
// This will error if the customer uses anything stronger
|
||||
// than sslmode=require. That's a choice they can make.
|
||||
return self.default.clone();
|
||||
return None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -24,7 +24,7 @@ use safekeeper::defaults::{
|
||||
};
|
||||
use safekeeper::{
|
||||
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
|
||||
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
|
||||
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, wal_backup, wal_service,
|
||||
};
|
||||
use sd_notify::NotifyState;
|
||||
use storage_broker::{DEFAULT_ENDPOINT, Uri};
|
||||
|
||||
@@ -50,7 +50,8 @@ async fn push_loop(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
)?
|
||||
.into_raw_grpc_client();
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
|
||||
let outbound = async_stream::stream! {
|
||||
@@ -97,7 +98,8 @@ async fn pull_loop(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
)?
|
||||
.into_raw_grpc_client();
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
@@ -153,7 +155,8 @@ async fn discover_loop(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
)?
|
||||
.into_raw_grpc_client();
|
||||
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![TypeSubscription {
|
||||
|
||||
@@ -18,6 +18,7 @@ use safekeeper_api::models::ConnectionId;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{Instrument, debug, info, info_span};
|
||||
use utils::auth::{Claims, JwtAuth, Scope};
|
||||
use utils::generation::{self, Generation};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
@@ -37,6 +38,7 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard: Option<ShardIdentity>,
|
||||
pub pageserver_generation: Option<Generation>,
|
||||
pub protocol: Option<PostgresClientProtocol>,
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
@@ -159,6 +161,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
let mut shard_count: Option<u8> = None;
|
||||
let mut shard_number: Option<u8> = None;
|
||||
let mut shard_stripe_size: Option<u32> = None;
|
||||
let mut pageserver_generation: Option<Generation> = None;
|
||||
|
||||
for opt in options {
|
||||
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
|
||||
@@ -201,6 +204,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
format!("Failed to parse {value} as shard stripe size")
|
||||
})?);
|
||||
}
|
||||
Some(("pageserver_generation", value)) => {
|
||||
self.pageserver_generation =
|
||||
Some(value.parse::<u32>().map(Generation::new).with_context(
|
||||
|| format!("Failed to parse {value} as generation"),
|
||||
)?);
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
@@ -259,6 +268,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
tracing::Span::current().record("shard", tracing::field::display(slug));
|
||||
}
|
||||
}
|
||||
if let Some(pageserver_generation) = self.pageserver_generation {
|
||||
tracing::Span::current().record(
|
||||
"pageserver_generation",
|
||||
tracing::field::display(pageserver_generation.get_suffix()),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
@@ -370,6 +385,7 @@ impl SafekeeperPostgresHandler {
|
||||
timeline_id: None,
|
||||
ttid: TenantTimelineId::empty(),
|
||||
shard: None,
|
||||
pageserver_generation: None,
|
||||
protocol: None,
|
||||
conn_id,
|
||||
claims: None,
|
||||
|
||||
@@ -5,7 +5,7 @@ pub use routes::make_router;
|
||||
pub use safekeeper_api::models;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
|
||||
|
||||
pub async fn task_main_http(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
|
||||
@@ -17,9 +17,10 @@ use hyper::{Body, Request, Response, StatusCode};
|
||||
use pem::Pem;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
|
||||
TimelineStatus, TimelineTermBumpRequest,
|
||||
AcceptorStateStatus, PullTimelineRequest, PutTenantPageserverLocationRequest, SafekeeperStatus,
|
||||
SkTimelineInfo, TenantDeleteResult, TenantShardPageserverLocation, TermSwitchApiEntry,
|
||||
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
|
||||
TimelineTermBumpRequest,
|
||||
};
|
||||
use safekeeper_api::{ServerInfo, membership, models};
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
|
||||
@@ -31,12 +32,15 @@ use tracing::{Instrument, info_span};
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use crate::debug_dump::TimelineDigestRequest;
|
||||
use crate::safekeeper::TermLsn;
|
||||
use crate::timelines_global_map::DeleteOrExclude;
|
||||
use crate::wal_advertiser::advmap::UpdatePageserverAttachmentsArg;
|
||||
use crate::{
|
||||
GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
|
||||
wal_advertiser,
|
||||
};
|
||||
|
||||
/// Healthcheck handler.
|
||||
@@ -91,6 +95,39 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
json_response(StatusCode::OK, response_body)
|
||||
}
|
||||
|
||||
async fn tenant_put_pageserver_attachments(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let PutTenantPageserverLocationRequest {
|
||||
pageserver_locations,
|
||||
}: PutTenantPageserverLocationRequest = json_request(&mut request).await?;
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let wal_advertiser = global_timelines.get_wal_advertiser();
|
||||
wal_advertiser
|
||||
.update_pageserver_attachments(
|
||||
tenant_shard_id,
|
||||
pageserver_locations
|
||||
.into_iter()
|
||||
.map(
|
||||
|TenantShardPageserverLocation {
|
||||
generation,
|
||||
pageserver_node_id,
|
||||
}| UpdatePageserverAttachmentsArg {
|
||||
generation,
|
||||
pageserver_node_id,
|
||||
},
|
||||
)
|
||||
.collect(),
|
||||
)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
|
||||
|
||||
@@ -714,6 +751,9 @@ pub fn make_router(
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_shard_id/pageserver_attachments", |r| {
|
||||
request_span(r, tenant_put_pageserver_attachments)
|
||||
})
|
||||
// Will be used in the future instead of implicit timeline creation
|
||||
.post("/v1/tenant/timeline", |r| {
|
||||
request_span(r, timeline_create_handler)
|
||||
|
||||
@@ -38,6 +38,7 @@ pub mod timeline_eviction;
|
||||
pub mod timeline_guard;
|
||||
pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_advertiser;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_reader_stream;
|
||||
|
||||
@@ -360,6 +360,7 @@ async fn recovery_stream(
|
||||
listen_pg_addr_str: &donor.pg_connstr,
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
pageserver_generation: None,
|
||||
};
|
||||
let cfg = wal_stream_connection_config(connection_conf_args)?;
|
||||
let mut cfg = cfg.to_tokio_postgres_config();
|
||||
|
||||
@@ -37,6 +37,7 @@ use crate::send_interpreted_wal::{
|
||||
Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender,
|
||||
};
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_advertiser;
|
||||
use crate::wal_reader_stream::StreamingWalReader;
|
||||
use crate::wal_storage::WalReader;
|
||||
|
||||
@@ -657,10 +658,29 @@ impl SafekeeperPostgresHandler {
|
||||
|
||||
let tli_cancel = tli.cancel.clone();
|
||||
|
||||
let wal_advertiser = match (self.shard, self.pageserver_generation) {
|
||||
(Some(shard), Some(pageserver_generation)) => {
|
||||
Some(tli.wal_advertiser.get_pageserver_timeline(
|
||||
self.ttid,
|
||||
shard.shard_index(),
|
||||
pageserver_generation,
|
||||
))
|
||||
}
|
||||
(shard, pageserver_generation) => {
|
||||
debug!(
|
||||
?shard,
|
||||
?pageserver_generation,
|
||||
"cannot feedback last_record_lsn to wal_advertiser subsystem, client must specify shard and pageserver_generation"
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
ws_guard: ws_guard.clone(),
|
||||
tli,
|
||||
wal_advertiser,
|
||||
};
|
||||
|
||||
let res = tokio::select! {
|
||||
@@ -977,6 +997,7 @@ struct ReplyReader<IO> {
|
||||
reader: PostgresBackendReader<IO>,
|
||||
ws_guard: Arc<WalSenderGuard>,
|
||||
tli: WalResidentTimeline,
|
||||
wal_advertiser: Option<Arc<wal_advertiser::advmap::PageserverTimeline>>,
|
||||
}
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
@@ -1023,6 +1044,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
self.tli
|
||||
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
|
||||
.await;
|
||||
if let Some(wal_advertiser) = &self.wal_advertiser {
|
||||
wal_advertiser.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn);
|
||||
}
|
||||
// in principle new remote_consistent_lsn could allow to
|
||||
// deactivate the timeline, but we check that regularly through
|
||||
// broker updated, not need to do it here
|
||||
|
||||
@@ -106,6 +106,7 @@ impl Env {
|
||||
&timeline_dir,
|
||||
&remote_path,
|
||||
shared_state,
|
||||
todo!(),
|
||||
conf.clone(),
|
||||
);
|
||||
timeline.bootstrap(
|
||||
|
||||
@@ -24,6 +24,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
|
||||
@@ -38,7 +39,9 @@ use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::{self, remote_timeline_path};
|
||||
use crate::wal_backup_partial::PartialRemoteSegment;
|
||||
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
|
||||
use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
|
||||
use crate::{
|
||||
SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_advertiser, wal_storage,
|
||||
};
|
||||
|
||||
fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
|
||||
PeerInfo {
|
||||
@@ -446,6 +449,7 @@ pub struct Timeline {
|
||||
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
|
||||
/// while holding it, ensuring that consensus checks are in order.
|
||||
mutex: RwLock<SharedState>,
|
||||
pub(crate) wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
|
||||
walsenders: Arc<WalSenders>,
|
||||
walreceivers: Arc<WalReceivers>,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
@@ -475,6 +479,7 @@ impl Timeline {
|
||||
timeline_dir: &Utf8Path,
|
||||
remote_path: &RemotePath,
|
||||
shared_state: SharedState,
|
||||
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
) -> Arc<Self> {
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||
@@ -498,6 +503,7 @@ impl Timeline {
|
||||
shared_state_version_tx,
|
||||
shared_state_version_rx,
|
||||
mutex: RwLock::new(shared_state),
|
||||
wal_advertiser,
|
||||
walsenders: WalSenders::new(walreceivers.clone()),
|
||||
walreceivers,
|
||||
gate: Default::default(),
|
||||
@@ -516,6 +522,7 @@ impl Timeline {
|
||||
pub fn load_timeline(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
ttid: TenantTimelineId,
|
||||
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
@@ -528,6 +535,7 @@ impl Timeline {
|
||||
&timeline_dir,
|
||||
&remote_path,
|
||||
shared_state,
|
||||
wal_advertiser,
|
||||
conf,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//! All timelines should always be present in this map, this is done by loading them
|
||||
//! all from the disk on startup and keeping them in memory.
|
||||
|
||||
use std::any;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -11,13 +12,16 @@ use anyhow::{Context, Result, bail};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
|
||||
use safekeeper_api::models::{
|
||||
SafekeeperUtilization, TenantShardPageserverLocation, TimelineDeleteResult,
|
||||
};
|
||||
use safekeeper_api::{ServerInfo, membership};
|
||||
use tokio::fs;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::{durable_rename, fsync_async_opt};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
|
||||
use crate::http::routes::DeleteOrExcludeError;
|
||||
@@ -26,7 +30,7 @@ use crate::state::TimelinePersistentState;
|
||||
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_storage::Storage;
|
||||
use crate::{SafeKeeperConf, control_file, wal_storage};
|
||||
use crate::{SafeKeeperConf, control_file, wal_advertiser, wal_storage};
|
||||
|
||||
// Timeline entry in the global map: either a ready timeline, or mark that it is
|
||||
// being created.
|
||||
@@ -46,16 +50,25 @@ struct GlobalTimelinesState {
|
||||
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
wal_advertisement: Arc<wal_advertiser::advmap::World>,
|
||||
global_rate_limiter: RateLimiter,
|
||||
}
|
||||
|
||||
impl GlobalTimelinesState {
|
||||
/// Get dependencies for a timeline constructor.
|
||||
fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
|
||||
fn get_dependencies(
|
||||
&self,
|
||||
) -> (
|
||||
Arc<SafeKeeperConf>,
|
||||
Arc<TimelinesSet>,
|
||||
RateLimiter,
|
||||
Arc<wal_advertiser::advmap::World>,
|
||||
) {
|
||||
(
|
||||
self.conf.clone(),
|
||||
self.broker_active_set.clone(),
|
||||
self.global_rate_limiter.clone(),
|
||||
self.wal_advertisement.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -91,6 +104,7 @@ impl GlobalTimelines {
|
||||
tombstones: HashMap::new(),
|
||||
conf,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
wal_advertisement: Arc::new(wal_advertiser::advmap::World::default()),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
}),
|
||||
}
|
||||
@@ -147,12 +161,13 @@ impl GlobalTimelines {
|
||||
/// just lock and unlock it for each timeline -- this function is called
|
||||
/// during init when nothing else is running, so this is fine.
|
||||
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.get_dependencies()
|
||||
};
|
||||
|
||||
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
|
||||
|
||||
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
|
||||
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
|
||||
{
|
||||
@@ -162,7 +177,8 @@ impl GlobalTimelines {
|
||||
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
match Timeline::load_timeline(conf.clone(), ttid) {
|
||||
let wal_advertiser = wal_advertiser.load_timeline(ttid);
|
||||
match Timeline::load_timeline(conf.clone(), ttid, wal_advertiser) {
|
||||
Ok(tli) => {
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
self.state
|
||||
@@ -222,7 +238,7 @@ impl GlobalTimelines {
|
||||
start_lsn: Lsn,
|
||||
commit_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, _, _) = {
|
||||
let (conf, _, _, _) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
// Timeline already exists, return it.
|
||||
@@ -267,7 +283,7 @@ impl GlobalTimelines {
|
||||
check_tombstone: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
// Check for existence and mark that we're creating it.
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
match state.timelines.get(&ttid) {
|
||||
Some(GlobalMapTimeline::CreationInProgress) => {
|
||||
@@ -296,7 +312,10 @@ impl GlobalTimelines {
|
||||
};
|
||||
|
||||
// Do the actual move and reflect the result in the map.
|
||||
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
|
||||
let wal_advertiser = wal_advertiser.load_timeline(ttid);
|
||||
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, wal_advertiser, conf.clone())
|
||||
.await
|
||||
{
|
||||
Ok(timeline) => {
|
||||
let mut timeline_shared_state = timeline.write_shared_state().await;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
@@ -335,6 +354,7 @@ impl GlobalTimelines {
|
||||
async fn install_temp_timeline(
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
|
||||
@@ -377,7 +397,7 @@ impl GlobalTimelines {
|
||||
// Do the move.
|
||||
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
|
||||
|
||||
Timeline::load_timeline(conf, ttid)
|
||||
Timeline::load_timeline(conf, ttid, wal_advertiser)
|
||||
}
|
||||
|
||||
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
|
||||
@@ -565,6 +585,10 @@ impl GlobalTimelines {
|
||||
Ok(deleted)
|
||||
}
|
||||
|
||||
pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::advmap::World> {
|
||||
self.state.lock().unwrap().wal_advertisement.clone()
|
||||
}
|
||||
|
||||
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
|
||||
20
safekeeper/src/wal_advertiser.rs
Normal file
20
safekeeper/src/wal_advertiser.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
//! Advertise pending WAL to all pageservers that might be interested in it.
|
||||
|
||||
use std::{collections::HashSet, sync::Arc, time::Duration};
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
pub(crate) mod advmap;
|
||||
|
||||
pub(crate) async fn wal_advertiser_loop(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
todo!();
|
||||
node_loop().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn node_loop() {
|
||||
loop {}
|
||||
}
|
||||
78
safekeeper/src/wal_advertiser/advmap.rs
Normal file
78
safekeeper/src/wal_advertiser/advmap.rs
Normal file
@@ -0,0 +1,78 @@
|
||||
//! The data structure that track advertisement state.
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
use serde::Serialize;
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{NodeId, TenantTimelineId, TimelineId},
|
||||
lsn::Lsn,
|
||||
shard::{ShardIndex, TenantShardId},
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct World {
|
||||
pageservers: RwLock<HashMap<NodeId, Arc<Pageserver>>>,
|
||||
}
|
||||
|
||||
pub struct Pageserver {
|
||||
node_id: NodeId,
|
||||
attachments: RwLock<HashMap<TenantShardId, Arc<PageserverAttachment>>>,
|
||||
}
|
||||
|
||||
pub struct PageserverAttachment {
|
||||
pageserver: NodeId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
generation: Generation,
|
||||
remote_consistent_lsn: RwLock<HashMap<TimelineId, Arc<PageserverTimeline>>>,
|
||||
}
|
||||
|
||||
pub struct PageserverTimeline {
|
||||
pageserver: NodeId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
remote_consistent_lsn: RwLock<Lsn>,
|
||||
}
|
||||
|
||||
pub struct SafekeeperTimeline {}
|
||||
|
||||
pub struct UpdatePageserverAttachmentsArg {
|
||||
pub generation: Generation,
|
||||
pub pageserver_node_id: NodeId,
|
||||
}
|
||||
|
||||
impl World {
|
||||
pub fn housekeeping(&self) {}
|
||||
pub fn load_timeline(&self, ttid: TenantTimelineId) -> Arc<SafekeeperTimeline> {
|
||||
todo!()
|
||||
}
|
||||
pub fn update_pageserver_attachments(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
arg: Vec<UpdatePageserverAttachmentsArg>,
|
||||
) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl SafekeeperTimeline {
|
||||
pub fn get_pageserver_timeline(
|
||||
&self,
|
||||
ttld: TenantTimelineId,
|
||||
shard: ShardIndex,
|
||||
pageserver_generation: Generation,
|
||||
) -> Arc<PageserverTimeline> {
|
||||
assert!(!pageserver_generation.is_none());
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl PageserverTimeline {
|
||||
pub fn update_remote_consistent_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,7 @@ use utils::measured_stream::MeasuredStream;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
///
|
||||
@@ -51,7 +51,7 @@ pub async fn task_main(
|
||||
error!("connection handler exited: {}", err);
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty)),
|
||||
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty, pageserver_generation = field::Empty)),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ parking_lot.workspace = true
|
||||
prost.workspace = true
|
||||
tonic.workspace = true
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
tokio-util.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tracing.workspace = true
|
||||
metrics.workspace = true
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::Stream;
|
||||
use proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use proto::broker_service_client::BrokerServiceClient;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::Status;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use tonic::transport::Endpoint;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use utils::backoff::{
|
||||
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
|
||||
};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
// Code generated by protobuf.
|
||||
@@ -22,6 +26,7 @@ pub mod metrics;
|
||||
pub use hyper::Uri;
|
||||
pub use tonic::transport::{Certificate, ClientTlsConfig};
|
||||
pub use tonic::{Code, Request, Streaming};
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
|
||||
@@ -29,9 +34,199 @@ pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LIST
|
||||
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
|
||||
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
|
||||
// BrokerServiceClient charged with tonic provided Channel transport; helps to
|
||||
// avoid depending on tonic directly in user crates.
|
||||
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
#[derive(Clone)]
|
||||
pub struct TimelineUpdatesSubscriber {
|
||||
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
|
||||
}
|
||||
|
||||
/// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code.
|
||||
pub struct BrokerClientChannel {
|
||||
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
|
||||
}
|
||||
|
||||
impl BrokerClientChannel {
|
||||
pub fn into_raw_grpc_client(
|
||||
self,
|
||||
) -> proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel> {
|
||||
self.client
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TimelineShardUpdate {
|
||||
pub is_discovery: bool,
|
||||
pub inner: proto::SafekeeperDiscoveryResponse,
|
||||
}
|
||||
|
||||
pub struct DiscoveryRequester {
|
||||
id: ProtoTenantTimelineId,
|
||||
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
|
||||
}
|
||||
|
||||
impl TimelineUpdatesSubscriber {
|
||||
pub fn new(service_client: BrokerClientChannel) -> Self {
|
||||
Self {
|
||||
client: service_client.client.clone(),
|
||||
}
|
||||
}
|
||||
pub fn subscribe(
|
||||
&mut self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
cancel: &CancellationToken,
|
||||
) -> (impl Stream<Item = TimelineShardUpdate>, DiscoveryRequester) {
|
||||
let id = ProtoTenantTimelineId {
|
||||
tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: timeline_id.as_ref().to_owned(),
|
||||
};
|
||||
let discovery_requester = DiscoveryRequester {
|
||||
id: id.clone(),
|
||||
client: self.client.clone(),
|
||||
};
|
||||
let stream = async_stream::stream! {
|
||||
let mut attempt = 0;
|
||||
'resubscribe: loop {
|
||||
exponential_backoff(
|
||||
attempt,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
attempt += 1;
|
||||
|
||||
use proto::*;
|
||||
// subscribe to the specific timeline
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperTimelineInfo as i32,
|
||||
},
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
|
||||
},
|
||||
],
|
||||
tenant_timeline_id: Some(FilterTenantTimelineId {
|
||||
enabled: true,
|
||||
tenant_timeline_id: Some(id.clone()),
|
||||
}),
|
||||
};
|
||||
|
||||
let res = tokio::select! {
|
||||
r = self.client.subscribe_by_filter(request) => { r }
|
||||
_ = cancel.cancelled() => { return; }
|
||||
};
|
||||
let mut update_stream = match res
|
||||
{
|
||||
Ok(resp) => {
|
||||
resp.into_inner()
|
||||
}
|
||||
Err(e) => {
|
||||
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
|
||||
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
|
||||
info!(
|
||||
attempt, "failed to subscribe: {e:#}"
|
||||
);
|
||||
continue 'resubscribe;
|
||||
}
|
||||
};
|
||||
loop {
|
||||
let broker_update = tokio::select!{
|
||||
_ = cancel.cancelled() => {
|
||||
return;
|
||||
}
|
||||
update = update_stream.message() => { update }
|
||||
};
|
||||
match broker_update {
|
||||
Ok(Some(typed_msg)) => {
|
||||
let mut is_discovery = false;
|
||||
let timeline_update = match typed_msg.r#type() {
|
||||
MessageType::SafekeeperTimelineInfo => {
|
||||
let info = match typed_msg.safekeeper_timeline_info {
|
||||
Some(info) => info,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_timeline_info");
|
||||
continue 'resubscribe;
|
||||
}
|
||||
};
|
||||
SafekeeperDiscoveryResponse {
|
||||
safekeeper_id: info.safekeeper_id,
|
||||
tenant_timeline_id: info.tenant_timeline_id,
|
||||
commit_lsn: info.commit_lsn,
|
||||
safekeeper_connstr: info.safekeeper_connstr,
|
||||
availability_zone: info.availability_zone,
|
||||
standby_horizon: info.standby_horizon,
|
||||
}
|
||||
}
|
||||
MessageType::SafekeeperDiscoveryResponse => {
|
||||
is_discovery = true;
|
||||
match typed_msg.safekeeper_discovery_response {
|
||||
Some(response) => response,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_discovery_response");
|
||||
continue 'resubscribe;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// unexpected message
|
||||
warn!("unexpected message from broker: {typed_msg:?}");
|
||||
continue 'resubscribe;
|
||||
}
|
||||
};
|
||||
attempt = 0; // reset backoff iff we received a valid update
|
||||
yield TimelineShardUpdate{is_discovery, inner: timeline_update };
|
||||
},
|
||||
Err(status) => {
|
||||
match status.code() {
|
||||
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
|
||||
// tonic's error handling doesn't provide a clear code for disconnections: we get
|
||||
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
|
||||
// => https://github.com/neondatabase/neon/issues/9562
|
||||
info!("broker disconnected: {status}");
|
||||
},
|
||||
_ => {
|
||||
warn!("broker subscription failed: {status}");
|
||||
}
|
||||
}
|
||||
continue 'resubscribe;
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("broker subscription stream ended"); // can't happen
|
||||
continue 'resubscribe;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
(stream, discovery_requester)
|
||||
}
|
||||
}
|
||||
|
||||
impl DiscoveryRequester {
|
||||
pub async fn request(&mut self) {
|
||||
let request = proto::SafekeeperDiscoveryRequest {
|
||||
tenant_timeline_id: Some(self.id.clone()),
|
||||
};
|
||||
let msg = proto::TypedMessage {
|
||||
r#type: proto::MessageType::SafekeeperDiscoveryRequest as i32,
|
||||
safekeeper_timeline_info: None,
|
||||
safekeeper_discovery_request: Some(request),
|
||||
safekeeper_discovery_response: None,
|
||||
};
|
||||
|
||||
// Cancellation safety: we want to send a message to the broker, but publish_one()
|
||||
// function can get cancelled by the other select! arm. This is absolutely fine, because
|
||||
// we just want to receive broker updates and discovery is not important if we already
|
||||
// receive updates.
|
||||
//
|
||||
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
|
||||
// This is totally fine because of the reason above.
|
||||
|
||||
// This is a fire-and-forget request, we don't care about the response
|
||||
let _ = self.client.publish_one(msg).await;
|
||||
debug!("Discovery request sent to the broker");
|
||||
}
|
||||
}
|
||||
|
||||
// Create connection object configured to run TLS if schema starts with https://
|
||||
// and plain text otherwise. Connection is lazy, only endpoint sanity is
|
||||
@@ -67,19 +262,9 @@ where
|
||||
.connect_timeout(DEFAULT_CONNECT_TIMEOUT);
|
||||
// keep_alive_timeout is 20s by default on both client and server side
|
||||
let channel = tonic_endpoint.connect_lazy();
|
||||
Ok(BrokerClientChannel::new(channel))
|
||||
}
|
||||
|
||||
impl BrokerClientChannel {
|
||||
/// Create a new client to the given endpoint, but don't actually connect until the first request.
|
||||
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
||||
where
|
||||
D: std::convert::TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<StdError>,
|
||||
{
|
||||
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
|
||||
Ok(Self::new(conn))
|
||||
}
|
||||
Ok(BrokerClientChannel {
|
||||
client: proto::broker_service_client::BrokerServiceClient::new(channel),
|
||||
})
|
||||
}
|
||||
|
||||
// parse variable length bytes from protobuf
|
||||
|
||||
@@ -202,8 +202,6 @@ def test_pageserver_gc_compaction_preempt(
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
|
||||
env.pageserver.allowed_errors.append(".*flush task cancelled.*")
|
||||
env.pageserver.allowed_errors.append(".*failed to pipe.*")
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
Reference in New Issue
Block a user