rip out broker binary target & launch of it in cplane & mention of it in docs

Christian Schwarz
2025-05-04 14:38:52 +02:00
parent 4b9087651c
commit 8f201b1580
21 changed files with 7 additions and 1309 deletions

View File

@@ -86,7 +86,6 @@ RUN set -e \
--bin pageserver \
--bin pagectl \
--bin safekeeper \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin endpoint_storage \
@@ -119,7 +118,6 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin
@@ -136,8 +134,7 @@ COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
# Now, when `docker run ... pageserver` is run, it can start without errors, yet will have some default dummy values.
RUN mkdir -p /data/.neon/ && \
echo "id=1234" > "/data/.neon/identity.toml" && \
echo "broker_endpoint='http://storage_broker:50051'\n" \
"pg_distrib_dir='/usr/local/'\n" \
echo "pg_distrib_dir='/usr/local/'\n" \
"listen_pg_addr='0.0.0.0:6400'\n" \
"listen_http_addr='0.0.0.0:9898'\n" \
"availability_zone='local'\n" \

View File

@@ -145,10 +145,8 @@ Python (3.11 or higher), and install the python3 packages using `./scripts/pysyn
> cargo neon init
Initializing pageserver node 1 at '127.0.0.1:64000' in ".neon"
# start pageserver, safekeeper, and broker for their intercommunication
# start pageserver and safekeeper for their intercommunication
> cargo neon start
Starting neon broker at 127.0.0.1:50051.
storage_broker started, pid: 2918372
Starting pageserver node 1 at '127.0.0.1:64000' in ".neon".
pageserver started, pid: 2918386
Starting safekeeper at '127.0.0.1:5454' in '.neon/safekeepers/sk1'.

View File

@@ -1,16 +0,0 @@
# Minimal neon environment with one safekeeper. This is equivalent to the built-in
# defaults that you get with no --config
[[pageservers]]
id=1
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
pg_auth_type = 'Trust'
http_auth_type = 'Trust'
[[safekeepers]]
id = 1
pg_port = 5454
http_port = 7676
[broker]
listen_addr = '127.0.0.1:50051'

View File

@@ -17,13 +17,12 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
EndpointStorageConf, InitForceMode, LocalEnv, NeonLocalInitConf, NeonLocalInitPageserverConf,
SafekeeperConf,
};
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -49,7 +48,6 @@ use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use tokio::task::JoinSet;
use url::Host;
use utils::auth::{Claims, Scope};
@@ -88,9 +86,6 @@ enum NeonLocalCmd {
#[clap(alias = "storage_controller")]
StorageController(StorageControllerCmd),
#[command(subcommand)]
#[clap(alias = "storage_broker")]
StorageBroker(StorageBrokerCmd),
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
EndpointStorage(EndpointStorageCmd),
@@ -424,32 +419,6 @@ struct StorageControllerStopCmdArgs {
instance_id: u8,
}
#[derive(clap::Subcommand)]
#[clap(about = "Manage storage broker")]
enum StorageBrokerCmd {
Start(StorageBrokerStartCmdArgs),
Stop(StorageBrokerStopCmdArgs),
}
#[derive(clap::Args)]
#[clap(about = "Start broker")]
struct StorageBrokerStartCmdArgs {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
#[derive(clap::Args)]
#[clap(about = "stop broker")]
struct StorageBrokerStopCmdArgs {
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
#[arg(value_enum, default_value = "fast")]
stop_mode: StopMode,
}
#[derive(clap::Subcommand)]
#[clap(about = "Manage safekeepers")]
enum SafekeeperCmd {
@@ -795,7 +764,6 @@ fn main() -> Result<()> {
NeonLocalCmd::StorageController(subcmd) => {
rt.block_on(handle_storage_controller(&subcmd, env))
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::EndpointStorage(subcmd) => {
rt.block_on(handle_endpoint_storage(&subcmd, env))
@@ -988,10 +956,6 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
// User (likely interactive) did not provide a description of the environment, give them the default
NeonLocalInitConf {
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
broker: NeonBroker {
listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
listen_https_addr: None,
},
safekeepers: vec![SafekeeperConf {
id: DEFAULT_SAFEKEEPER_ID,
pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
@@ -1776,28 +1740,6 @@ async fn handle_endpoint_storage(
Ok(())
}
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
match subcmd {
StorageBrokerCmd::Start(args) => {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.start(&args.start_timeout).await {
eprintln!("broker start failed: {e}");
exit(1);
}
}
StorageBrokerCmd::Stop(_args) => {
// FIXME: stop_mode unused
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
eprintln!("broker stop failed: {e}");
exit(1);
}
}
}
Ok(())
}
async fn handle_start_all(
args: &StartCmdArgs,
env: &'static local_env::LocalEnv,
@@ -1838,14 +1780,6 @@ async fn handle_start_all_impl(
// force infallibility through closure
#[allow(clippy::redundant_closure_call)]
(|| {
js.spawn(async move {
let storage_broker = StorageBroker::from_env(env);
storage_broker
.start(&retry_timeout)
.await
.map_err(|e| e.context("start storage_broker"))
});
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1998,11 +1932,6 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
eprintln!("neon broker stop failed: {e:#}");
}
// Stop all storage controller instances. In the most common case there's only one,
// but iterate though the base data directory in order to discover the instances.
let storcon_instances = env

View File

@@ -1,88 +0,0 @@
//! Code to manage the storage broker
//!
//! In the local test environment, the storage broker stores its data directly in
//!
//! ```text
//! .neon/storage_broker
//! ```
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env::LocalEnv};
pub struct StorageBroker {
env: LocalEnv,
}
impl StorageBroker {
/// Create a new `StorageBroker` instance from the environment.
pub fn from_env(env: &LocalEnv) -> Self {
Self { env: env.clone() }
}
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
&self.env.storage_broker_data_dir().join("server.crt"),
&self.env.storage_broker_data_dir().join("server.key"),
)?;
}
Ok(())
}
/// Start the storage broker process.
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
print!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();
if let Some(addr) = &broker.listen_addr {
args.push(format!("--listen-addr={addr}"));
}
if let Some(addr) = &broker.listen_https_addr {
args.push(format!("--listen-https-addr={addr}"));
}
let client = self.env.create_http_client();
background_process::start_process(
"storage_broker",
&self.env.storage_broker_data_dir(),
&self.env.storage_broker_bin(),
args,
[],
background_process::InitialPidFile::Create(self.pid_file_path()),
retry_timeout,
|| async {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}")
})?;
let request = client.get(status_url).build().with_context(|| {
format!("Failed to construct request to broker endpoint {url}")
})?;
match client.execute(request).await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
},
)
.await
.context("Failed to spawn storage_broker subprocess")?;
Ok(())
}
/// Stop the storage broker process.
pub fn stop(&self) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &self.pid_file_path())
}
/// Get the path to the PID file for the storage broker.
fn pid_file_path(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid"))
.expect("non-Unicode path")
}
}

View File

@@ -7,7 +7,6 @@
#![deny(clippy::undocumented_unsafe_blocks)]
mod background_process;
pub mod broker;
pub mod endpoint;
pub mod endpoint_storage;
pub mod local_env;

View File

@@ -19,7 +19,6 @@ use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::broker::StorageBroker;
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -61,8 +60,6 @@ pub struct LocalEnv {
/// Path to environment's public key
pub public_key_path: PathBuf,
pub broker: NeonBroker,
// Configuration for the storage controller (1 per neon_local environment)
pub storage_controller: NeonStorageControllerConf,
@@ -103,7 +100,6 @@ pub struct OnDiskConfig {
pub default_tenant_id: Option<TenantId>,
pub private_key_path: PathBuf,
pub public_key_path: PathBuf,
pub broker: NeonBroker,
pub storage_controller: NeonStorageControllerConf,
#[serde(
skip_serializing,
@@ -141,7 +137,6 @@ pub struct NeonLocalInitConf {
// TODO: do we need this? Seems unused
pub neon_distrib_dir: Option<PathBuf>,
pub default_tenant_id: TenantId,
pub broker: NeonBroker,
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
@@ -157,19 +152,6 @@ pub struct EndpointStorageConf {
pub port: u16,
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)]
#[serde(default)]
pub struct NeonBroker {
/// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
pub listen_addr: Option<SocketAddr>,
/// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
/// listen_https_addr is preferred over listen_addr in neon_local.
pub listen_https_addr: Option<SocketAddr>,
}
/// A part of storage controller's config the neon_local knows about.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
@@ -241,22 +223,6 @@ impl Default for NeonStorageControllerConf {
}
}
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {
format!("https://{}", addr)
} else {
format!(
"http://{}",
self.listen_addr
.expect("at least one address should be set")
)
};
Url::parse(&url).expect("failed to construct url")
}
}
// neon_local needs to know this subset of pageserver configuration.
// For legacy reasons, this information is duplicated from `pageserver.toml` into `.neon/config`.
// It can get stale if `pageserver.toml` is changed.
@@ -440,18 +406,10 @@ impl LocalEnv {
self.neon_distrib_dir.join("safekeeper")
}
pub fn storage_broker_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("storage_broker")
}
pub fn endpoints_path(&self) -> PathBuf {
self.base_data_dir.join("endpoints")
}
pub fn storage_broker_data_dir(&self) -> PathBuf {
self.base_data_dir.join("storage_broker")
}
pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
self.base_data_dir
.join(format!("pageserver_{pageserver_id}"))
@@ -634,7 +592,6 @@ impl LocalEnv {
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
safekeepers,
@@ -652,7 +609,6 @@ impl LocalEnv {
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
safekeepers,
@@ -761,7 +717,6 @@ impl LocalEnv {
default_tenant_id: self.default_tenant_id,
private_key_path: self.private_key_path.clone(),
public_key_path: self.public_key_path.clone(),
broker: self.broker.clone(),
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
safekeepers: self.safekeepers.clone(),
@@ -870,7 +825,6 @@ impl LocalEnv {
pg_distrib_dir,
neon_distrib_dir,
default_tenant_id,
broker,
storage_controller,
pageservers,
safekeepers,
@@ -921,7 +875,6 @@ impl LocalEnv {
default_tenant_id: Some(default_tenant_id),
private_key_path,
public_key_path,
broker,
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
safekeepers,
@@ -939,12 +892,6 @@ impl LocalEnv {
// create endpoints dir
fs::create_dir_all(env.endpoints_path())?;
// create storage broker dir
fs::create_dir_all(env.storage_broker_data_dir())?;
StorageBroker::from_env(&env)
.initialize()
.context("storage broker init failed")?;
// create safekeeper dirs
for safekeeper in &env.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;

View File

@@ -104,9 +104,7 @@ impl PageServerNode {
self.env.pg_distrib_dir_raw().display()
);
let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
let mut overrides = vec![pg_distrib_dir_param, broker_endpoint_param];
let mut overrides = vec![pg_distrib_dir_param];
overrides.push(format!(
"control_plane_api='{}'",

View File

@@ -182,9 +182,6 @@ impl SafekeeperNode {
args.push("--no-sync".to_owned());
}
let broker_endpoint = format!("{}", self.env.broker.client_url());
args.extend(["--broker-endpoint".to_owned(), broker_endpoint]);
let mut backup_threads = String::new();
if let Some(threads) = self.conf.backup_threads {
backup_threads = threads.to_string();

View File

@@ -40,7 +40,6 @@ services:
volumes:
- ./pageserver_config:/data/.neon/
depends_on:
- storage_broker
- minio_create_buckets
safekeeper1:
@@ -49,7 +48,6 @@ services:
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper1:5454
- SAFEKEEPER_ID=1
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
@@ -70,7 +68,6 @@ services:
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- storage_broker
- minio_create_buckets
safekeeper2:
@@ -79,7 +76,6 @@ services:
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper2:5454
- SAFEKEEPER_ID=2
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
@@ -100,7 +96,6 @@ services:
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- storage_broker
- minio_create_buckets
safekeeper3:
@@ -109,7 +104,6 @@ services:
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper3:5454
- SAFEKEEPER_ID=3
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
@@ -130,17 +124,8 @@ services:
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- storage_broker
- minio_create_buckets
storage_broker:
restart: always
image: ${REPOSITORY:-ghcr.io/neondatabase}/neon:${TAG:-latest}
ports:
- 50051:50051
command:
- "storage_broker"
- "--listen-addr=0.0.0.0:50051"
compute:
restart: always

View File

@@ -2,7 +2,6 @@
### Overview
We use JWT tokens in communication between almost all components (compute, pageserver, safekeeper, CLI) regardless of the protocol used (HTTP/PostgreSQL).
storage_broker currently has no authentication.
Authentication is optional and is disabled by default for easier debugging.
It is used in some tests, though.
Note that we do not cover authentication with `pg.neon.tech` here.
@@ -24,7 +23,7 @@ because configs may be parsed and dumped into logs.
#### Tokens generation and validation
JWT tokens are signed using a private key.
Compute/pageserver/safekeeper use the private key's public counterpart to validate JWT tokens.
These components should not have access to the private key and may only get tokens from their configuration or external clients.
These components should not have access to the private key and may only get tokens from their configuration or external clients.
The key pair is generated once for an installation of compute/pageserver/safekeeper, e.g. by `neon_local init`.
There is currently no way to rotate the key without bringing down all components.
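To make the flow concrete, here is a minimal sketch using the `jsonwebtoken` crate. It assumes an Ed25519 (EdDSA) PEM key pair like the one `neon_local init` generates and a deliberately simplified claims shape; the real claim set and helpers live in `utils::auth` (`Claims`, `Scope`, `encode_from_key_file`).

```rust
use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};

/// Hypothetical, simplified stand-in for `utils::auth::Claims`.
#[derive(Serialize, Deserialize)]
struct Claims {
    scope: String,             // assumed scope name, e.g. "tenant"
    tenant_id: Option<String>, // assumed: set for tenant-scoped tokens
    exp: u64,                  // expiry; jsonwebtoken validates this by default
}

/// Minting happens wherever the private key lives (e.g. the CLI after init).
fn sign(private_key_pem: &[u8], claims: &Claims) -> jsonwebtoken::errors::Result<String> {
    encode(
        &Header::new(Algorithm::EdDSA),
        claims,
        &EncodingKey::from_ed_pem(private_key_pem)?,
    )
}

/// Compute/pageserver/safekeeper hold only the public key: they can validate
/// tokens handed to them, but never mint new ones.
fn verify(public_key_pem: &[u8], token: &str) -> jsonwebtoken::errors::Result<Claims> {
    decode::<Claims>(
        token,
        &DecodingKey::from_ed_pem(public_key_pem)?,
        &Validation::new(Algorithm::EdDSA),
    )
    .map(|data| data.claims)
}
```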
@@ -130,9 +129,6 @@ uses `$NEON_AUTH_TOKEN` as the password for the connection.
#### Overview
Pageserver keeps track of multiple tenants, each having multiple timelines.
For each timeline, it connects to the corresponding Safekeeper.
Information about "corresponding Safekeeper" is published by Safekeepers
in the storage_broker, but they do not publish access tokens; otherwise, what would be
the point of authentication.
Pageserver keeps a connection to some set of Safekeepers, which
may or may not correspond to active Computes.

View File

@@ -21,7 +21,6 @@ You can see a [docker compose](https://docs.docker.com/compose/) example to crea
- pageserver x 1
- safekeeper x 3
- storage_broker x 1
- compute x 1
- MinIO x 1 # This is Amazon S3 compatible object storage
@@ -37,7 +36,6 @@ $ cd docker-compose/
$ docker-compose down # remove the containers if they exist
$ PG_VERSION=16 TAG=latest docker-compose up --build -d # You can specify the postgres and image version
Creating network "dockercompose_default" with the default driver
Creating docker-compose_storage_broker_1 ... done
(...omit...)
```

View File

@@ -24,13 +24,10 @@ max_file_descriptors = '100'
# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'cloud_admin'
broker_endpoint = 'http://127.0.0.1:50051'
# [remote_storage]
```
The config above shows default values for all basic pageserver settings, besides `broker_endpoint`: that one has to be set by the user,
see the corresponding section below.
The config above shows default values for all basic pageserver settings.
Pageserver uses default values for all settings that are missing in the config, so it's not a hard error to leave the config blank.
Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start.
@@ -48,11 +45,6 @@ Example: `${PAGESERVER_BIN} -c "checkpoint_timeout = '10 m'" -c "remote_storage=
Note that TOML distinguishes between strings and integers; the former require single or double quotes around them.
#### broker_endpoint
A storage broker endpoint to connect and pull the information from. Default is
`'http://127.0.0.1:50051'`.
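Like any other setting, this could be overridden on the command line using the `-c` form shown above; a hypothetical invocation, before this change, would have been:

```
${PAGESERVER_BIN} -c "broker_endpoint='http://127.0.0.1:50051'"
```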
#### checkpoint_distance
`checkpoint_distance` is the amount of incoming WAL that is held in

View File

@@ -2,11 +2,6 @@
Below you will find a brief overview of each subdir in the source tree in alphabetical order.
`storage_broker`:
Neon storage broker, providing messaging between safekeepers and pageservers.
[storage_broker.md](./storage_broker.md)
`storage_controller`:
Neon storage controller, manages a cluster of pageservers and exposes an API that enables

View File

@@ -1,27 +0,0 @@
# Storage broker
Storage broker targets two issues:
- Allowing safekeepers and pageservers to learn which nodes also hold their
timelines, and the timeline statuses there.
- Avoiding O(n^2) connections between storage nodes while doing so.
This is used
- By pageservers to determine the most advanced and alive safekeeper to pull WAL from.
- By safekeepers to synchronize on the timeline: advance
`remote_consistent_lsn`, `backup_lsn`, choose who offloads WAL to s3.
Technically, it is a simple stateless pub-sub message broker based on tonic
(gRPC), which makes multiplexing easy. Since it is stateless, fault tolerance can be
provided by k8s; there is no built-in replication support, though it would not be hard
to add.
Currently, the only message is `SafekeeperTimelineInfo`. Each safekeeper, for
each active timeline, once in a while pushes timeline status to the broker.
Other nodes subscribe and receive this info, using it per above.
Broker serves /metrics on the same port as grpc service.
grpcurl can be used to check which values are currently being pushed:
```
grpcurl -proto broker/proto/broker.proto -d '{"all":{}}' -plaintext localhost:50051 storage_broker.BrokerService/SubscribeSafekeeperInfo
```
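The broker also served plain HTTP/1 on the same port for observability (see the `/metrics` and `/status` routes in the binary's `http1_handler` further down this diff), so a quick health check was a curl away:

```
curl http://localhost:50051/status   # 200 when the broker is up
curl http://localhost:50051/metrics  # Prometheus text-format metrics
```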

View File

@@ -1,912 +0,0 @@
//! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
//! nodes messaging.
//!
//! Subscriptions to 1) single timeline 2) all timelines are possible. We could
//! add subscription to the set of timelines to save grpc streams, but testing
//! shows many individual streams is also ok.
//!
//! Message is dropped if subscriber can't consume it, not affecting other
//! subscribers.
//!
//! Only safekeeper message is supported, but it is not hard to add something
//! else with generics.
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use camino::Utf8PathBuf;
use clap::{Parser, command};
use futures::future::OptionFuture;
use futures_core::Stream;
use futures_util::StreamExt;
use http_body_util::Full;
use http_utils::tls_certs::ReloadingCertificateResolver;
use hyper::body::Incoming;
use hyper::header::CONTENT_TYPE;
use hyper::service::service_fn;
use hyper::{Method, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use metrics::{Encoder, TextEncoder};
use parking_lot::RwLock;
use storage_broker::metrics::{
BROADCAST_DROPPED_MESSAGES_TOTAL, BROADCASTED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid};
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time;
use tonic::body::{self, BoxBody, empty_body};
use tonic::codegen::Service;
use tonic::{Code, Request, Response, Status};
use tracing::*;
use utils::id::TenantTimelineId;
use utils::logging::{self, LogFormat};
use utils::sentry_init::init_sentry;
use utils::signals::ShutdownSignals;
use utils::{project_build_tag, project_git_version};
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
const DEFAULT_CHAN_SIZE: usize = 32;
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
const DEFAULT_SSL_KEY_FILE: &str = "server.key";
const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
#[clap(group(
clap::ArgGroup::new("listen-addresses")
.required(true)
.multiple(true)
.args(&["listen_addr", "listen_https_addr"]),
))]
struct Args {
/// Endpoint to listen HTTP on.
#[arg(short, long)]
listen_addr: Option<SocketAddr>,
/// Endpoint to listen HTTPS on.
#[arg(long)]
listen_https_addr: Option<SocketAddr>,
/// Size of the queue to the per timeline subscriber.
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
timeline_chan_size: usize,
/// Size of the queue to the all keys subscriber.
#[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
all_keys_chan_size: usize,
/// HTTP/2 keepalive interval.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
http2_keepalive_interval: Duration,
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
log_format: String,
/// Path to a file with certificate's private key for https API.
#[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
ssl_key_file: Utf8PathBuf,
/// Path to a file with a X509 certificate for https API.
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: Duration,
}
/// Id of publisher for registering in maps
type PubId = u64;
/// Id of subscriber for registering in maps
type SubId = u64;
/// Single enum type for all messages.
#[derive(Clone, Debug, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum Message {
SafekeeperTimelineInfo(SafekeeperTimelineInfo),
SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
}
impl Message {
/// Convert proto message to internal message.
#[allow(clippy::result_large_err, reason = "TODO")]
pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
match proto_msg.r#type() {
MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
proto_msg.safekeeper_timeline_info.ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
})?,
)),
MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
proto_msg.safekeeper_discovery_request.ok_or_else(|| {
Status::new(
Code::InvalidArgument,
"missing safekeeper_discovery_request",
)
})?,
)),
MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
proto_msg.safekeeper_discovery_response.ok_or_else(|| {
Status::new(
Code::InvalidArgument,
"missing safekeeper_discovery_response",
)
})?,
)),
MessageType::Unknown => Err(Status::new(
Code::InvalidArgument,
format!("invalid message type: {:?}", proto_msg.r#type),
)),
}
}
/// Get the tenant_timeline_id from the message.
#[allow(clippy::result_large_err, reason = "TODO")]
pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
match self {
Message::SafekeeperTimelineInfo(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
}
}
/// Convert internal message to the protobuf struct.
pub fn as_typed_message(&self) -> TypedMessage {
let mut res = TypedMessage {
r#type: self.message_type() as i32,
..Default::default()
};
match self {
Message::SafekeeperTimelineInfo(msg) => {
res.safekeeper_timeline_info = Some(msg.clone())
}
Message::SafekeeperDiscoveryRequest(msg) => {
res.safekeeper_discovery_request = Some(msg.clone())
}
Message::SafekeeperDiscoveryResponse(msg) => {
res.safekeeper_discovery_response = Some(msg.clone())
}
}
res
}
/// Get the message type.
pub fn message_type(&self) -> MessageType {
match self {
Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
}
}
}
#[derive(Copy, Clone, Debug)]
enum SubscriptionKey {
All,
Timeline(TenantTimelineId),
}
impl SubscriptionKey {
/// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
#[allow(clippy::result_large_err, reason = "TODO")]
pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
match key {
ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
}
}
}
/// Parse from FilterTenantTimelineId
#[allow(clippy::result_large_err, reason = "TODO")]
pub fn from_proto_filter_tenant_timeline_id(
opt: Option<&FilterTenantTimelineId>,
) -> Result<Self, Status> {
if opt.is_none() {
return Ok(SubscriptionKey::All);
}
let f = opt.unwrap();
if !f.enabled {
return Ok(SubscriptionKey::All);
}
let ttid =
parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
})?)?;
Ok(SubscriptionKey::Timeline(ttid))
}
}
/// Channel to timeline subscribers.
struct ChanToTimelineSub {
chan: broadcast::Sender<Message>,
/// Tracked separately to know when to delete the shmem entry. receiver_count()
/// is unhandy for that, as unregistering and dropping the receiver side
/// happen at different moments.
num_subscribers: u64,
}
struct SharedState {
next_pub_id: PubId,
num_pubs: i64,
next_sub_id: SubId,
num_subs_to_timelines: i64,
chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
num_subs_to_all: i64,
chan_to_all_subs: broadcast::Sender<Message>,
}
impl SharedState {
pub fn new(all_keys_chan_size: usize) -> Self {
SharedState {
next_pub_id: 0,
num_pubs: 0,
next_sub_id: 0,
num_subs_to_timelines: 0,
chans_to_timeline_subs: HashMap::new(),
num_subs_to_all: 0,
chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
}
}
// Register new publisher.
pub fn register_publisher(&mut self) -> PubId {
let pub_id = self.next_pub_id;
self.next_pub_id += 1;
self.num_pubs += 1;
NUM_PUBS.set(self.num_pubs);
pub_id
}
// Unregister publisher.
pub fn unregister_publisher(&mut self) {
self.num_pubs -= 1;
NUM_PUBS.set(self.num_pubs);
}
// Register new subscriber.
pub fn register_subscriber(
&mut self,
sub_key: SubscriptionKey,
timeline_chan_size: usize,
) -> (SubId, broadcast::Receiver<Message>) {
let sub_id = self.next_sub_id;
self.next_sub_id += 1;
let sub_rx = match sub_key {
SubscriptionKey::All => {
self.num_subs_to_all += 1;
NUM_SUBS_ALL.set(self.num_subs_to_all);
self.chan_to_all_subs.subscribe()
}
SubscriptionKey::Timeline(ttid) => {
self.num_subs_to_timelines += 1;
NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
// Create a new broadcast channel for this key, or subscribe to
// the existing one.
let chan_to_timeline_sub =
self.chans_to_timeline_subs
.entry(ttid)
.or_insert(ChanToTimelineSub {
chan: broadcast::channel(timeline_chan_size).0,
num_subscribers: 0,
});
chan_to_timeline_sub.num_subscribers += 1;
chan_to_timeline_sub.chan.subscribe()
}
};
(sub_id, sub_rx)
}
// Unregister the subscriber.
pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
match sub_key {
SubscriptionKey::All => {
self.num_subs_to_all -= 1;
NUM_SUBS_ALL.set(self.num_subs_to_all);
}
SubscriptionKey::Timeline(ttid) => {
self.num_subs_to_timelines -= 1;
NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
// Remove from the map, destroying the channel, if we are the
// last subscriber to this timeline.
// Missing entry is a bug; we must have registered.
let chan_to_timeline_sub = self
.chans_to_timeline_subs
.get_mut(&ttid)
.expect("failed to find sub entry in shmem during unregister");
chan_to_timeline_sub.num_subscribers -= 1;
if chan_to_timeline_sub.num_subscribers == 0 {
self.chans_to_timeline_subs.remove(&ttid);
}
}
}
}
}
// SharedState wrapper.
#[derive(Clone)]
struct Registry {
shared_state: Arc<RwLock<SharedState>>,
timeline_chan_size: usize,
}
impl Registry {
// Register new publisher in shared state.
pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
let pub_id = self.shared_state.write().register_publisher();
info!("publication started id={} addr={:?}", pub_id, remote_addr);
Publisher {
id: pub_id,
registry: self.clone(),
remote_addr,
}
}
pub fn unregister_publisher(&self, publisher: &Publisher) {
self.shared_state.write().unregister_publisher();
info!(
"publication ended id={} addr={:?}",
publisher.id, publisher.remote_addr
);
}
// Register new subscriber in shared state.
pub fn register_subscriber(
&self,
sub_key: SubscriptionKey,
remote_addr: SocketAddr,
) -> Subscriber {
let (sub_id, sub_rx) = self
.shared_state
.write()
.register_subscriber(sub_key, self.timeline_chan_size);
info!(
"subscription started id={}, key={:?}, addr={:?}",
sub_id, sub_key, remote_addr
);
Subscriber {
id: sub_id,
key: sub_key,
sub_rx,
registry: self.clone(),
remote_addr,
}
}
// Unregister the subscriber
pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
self.shared_state
.write()
.unregister_subscriber(subscriber.key);
info!(
"subscription ended id={}, key={:?}, addr={:?}",
subscriber.id, subscriber.key, subscriber.remote_addr
);
}
/// Send msg to relevant subscribers.
#[allow(clippy::result_large_err, reason = "TODO")]
pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
PROCESSED_MESSAGES_TOTAL.inc();
// send message to subscribers for everything
let shared_state = self.shared_state.read();
// Err means there are no subscribers; that is fine.
shared_state.chan_to_all_subs.send(msg.clone()).ok();
// send message to per timeline subscribers, if there is ttid
let ttid = msg.tenant_timeline_id()?;
if let Some(ttid) = ttid {
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
// Err can't happen here, as tx is destroyed only after the last
// subscriber is removed from the map along with tx.
subs.chan
.send(msg.clone())
.expect("rx is still in the map with zero subscribers");
}
}
Ok(())
}
}
// Private subscriber state.
struct Subscriber {
id: SubId,
key: SubscriptionKey,
// Subscriber receives messages from publishers here.
sub_rx: broadcast::Receiver<Message>,
// to unregister itself from shared state in Drop
registry: Registry,
// for logging
remote_addr: SocketAddr,
}
impl Drop for Subscriber {
fn drop(&mut self) {
self.registry.unregister_subscriber(self);
}
}
// Private publisher state
struct Publisher {
id: PubId,
registry: Registry,
// for logging
remote_addr: SocketAddr,
}
impl Publisher {
/// Send msg to relevant subscribers.
#[allow(clippy::result_large_err, reason = "TODO")]
pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
self.registry.send_msg(msg)
}
}
impl Drop for Publisher {
fn drop(&mut self) {
self.registry.unregister_publisher(self);
}
}
struct Broker {
registry: Registry,
}
#[tonic::async_trait]
impl BrokerService for Broker {
async fn publish_safekeeper_info(
&self,
request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
) -> Result<Response<()>, Status> {
let &RemoteAddr(remote_addr) = request
.extensions()
.get()
.expect("RemoteAddr inserted by handler");
let mut publisher = self.registry.register_publisher(remote_addr);
let mut stream = request.into_inner();
loop {
match stream.next().await {
Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
Some(Err(e)) => return Err(e), // grpc error from the stream
None => break, // closed stream
}
}
Ok(Response::new(()))
}
type SubscribeSafekeeperInfoStream =
Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
async fn subscribe_safekeeper_info(
&self,
request: Request<SubscribeSafekeeperInfoRequest>,
) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
let &RemoteAddr(remote_addr) = request
.extensions()
.get()
.expect("RemoteAddr inserted by handler");
let proto_key = request
.into_inner()
.subscription_key
.ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
// transform rx into stream with item = Result, as method result demands
let output = async_stream::try_stream! {
let mut warn_interval = time::interval(Duration::from_millis(1000));
let mut missed_msgs: u64 = 0;
loop {
match subscriber.sub_rx.recv().await {
Ok(info) => {
match info {
Message::SafekeeperTimelineInfo(info) => yield info,
_ => {},
}
BROADCASTED_MESSAGES_TOTAL.inc();
},
Err(RecvError::Lagged(skipped_msg)) => {
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
missed_msgs = 0;
}
}
Err(RecvError::Closed) => {
// can't happen, we never drop the channel while there is a subscriber
Err(Status::new(Code::Internal, "channel unexpectedly closed"))?;
}
}
}
};
Ok(Response::new(
Box::pin(output) as Self::SubscribeSafekeeperInfoStream
))
}
type SubscribeByFilterStream =
Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
/// Subscribe to all messages, limited by a filter.
async fn subscribe_by_filter(
&self,
request: Request<SubscribeByFilterRequest>,
) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
let &RemoteAddr(remote_addr) = request
.extensions()
.get()
.expect("RemoteAddr inserted by handler");
let proto_filter = request.into_inner();
let ttid_filter = proto_filter.tenant_timeline_id.as_ref();
let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
let types_set = proto_filter
.types
.iter()
.map(|t| t.r#type)
.collect::<std::collections::HashSet<_>>();
let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
// transform rx into stream with item = Result, as method result demands
let output = async_stream::try_stream! {
let mut warn_interval = time::interval(Duration::from_millis(1000));
let mut missed_msgs: u64 = 0;
loop {
match subscriber.sub_rx.recv().await {
Ok(msg) => {
let msg_type = msg.message_type() as i32;
if types_set.contains(&msg_type) {
yield msg.as_typed_message();
BROADCASTED_MESSAGES_TOTAL.inc();
}
},
Err(RecvError::Lagged(skipped_msg)) => {
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
missed_msgs = 0;
}
}
Err(RecvError::Closed) => {
// can't happen, we never drop the channel while there is a subscriber
Err(Status::new(Code::Internal, "channel unexpectedly closed"))?;
}
}
}
};
Ok(Response::new(
Box::pin(output) as Self::SubscribeByFilterStream
))
}
/// Publish one message.
async fn publish_one(
&self,
request: Request<TypedMessage>,
) -> std::result::Result<Response<()>, Status> {
let msg = Message::from(request.into_inner())?;
PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
self.registry.send_msg(&msg)?;
Ok(Response::new(()))
}
}
// We serve only metrics and healthcheck through http1.
async fn http1_handler(
req: hyper::Request<Incoming>,
) -> Result<hyper::Response<BoxBody>, Infallible> {
let resp = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
let metrics = metrics::gather();
let encoder = TextEncoder::new();
encoder.encode(&metrics, &mut buffer).unwrap();
hyper::Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(body::boxed(Full::new(bytes::Bytes::from(buffer))))
.unwrap()
}
(&Method::GET, "/status") => hyper::Response::builder()
.status(StatusCode::OK)
.body(empty_body())
.unwrap(),
_ => hyper::Response::builder()
.status(StatusCode::NOT_FOUND)
.body(empty_body())
.unwrap(),
};
Ok(resp)
}
#[derive(Clone, Copy)]
struct RemoteAddr(SocketAddr);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
// important to keep the order of:
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("version: {GIT_VERSION} build_tag: {BUILD_TAG}");
metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
// On any shutdown signal, log its receipt and exit.
std::thread::spawn(move || {
ShutdownSignals::handle(|signal| {
info!("received {}, terminating", signal.name());
std::process::exit(0);
})
});
let registry = Registry {
shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
timeline_chan_size: args.timeline_chan_size,
};
let storage_broker_impl = Broker {
registry: registry.clone(),
};
let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
let http_listener = match &args.listen_addr {
Some(addr) => {
info!("listening HTTP on {}", addr);
Some(TcpListener::bind(addr).await?)
}
None => None,
};
let (https_listener, tls_acceptor) = match &args.listen_https_addr {
Some(addr) => {
let listener = TcpListener::bind(addr).await?;
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&args.ssl_key_file,
&args.ssl_cert_file,
args.ssl_cert_reload_period,
)
.await?;
let mut tls_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
// Tonic is HTTP/2 only and it negotiates it with ALPN.
tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
info!("listening HTTPS on {}", addr);
(Some(listener), Some(acceptor))
}
None => (None, None),
};
// grpc is served along with http1 for metrics on a single port, hence we
// don't use tonic's Server.
loop {
let (conn, is_https) = tokio::select! {
Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false),
Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true),
};
let (tcp_stream, addr) = match conn {
Ok(v) => v,
Err(e) => {
info!("couldn't accept connection: {e}");
continue;
}
};
let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
builder.http1().timer(TokioTimer::new());
builder
.http2()
.timer(TokioTimer::new())
.keep_alive_interval(Some(args.http2_keepalive_interval))
// This matches the tonic server default. It allows us to support production-like workloads.
.max_concurrent_streams(None);
let storage_broker_server_cloned = storage_broker_server.clone();
let remote_addr = RemoteAddr(addr);
let service_fn_ = async move {
service_fn(move |mut req| {
// That's what tonic's MakeSvc.call does to pass conninfo to
// the request handler (and where its request.remote_addr()
// expects to find it).
req.extensions_mut().insert(remote_addr);
// Technically this second clone is not needed, but consumption
// by the async block is apparently unavoidable. BTW, the error
// message is enigmatic; see
// https://github.com/rust-lang/rust/issues/68119
//
// We could get away without an async block at all, but then we
// would need to resort to futures::Either to merge the results,
// which doesn't read as nicely.
let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
async move {
if req.headers().get("content-type").map(|x| x.as_bytes())
== Some(b"application/grpc")
{
let res_resp = storage_broker_server_svc.call(req).await;
// Grpc and http1 handlers have slightly different
// Response types: it is UnsyncBoxBody for the
// former one (not sure why) and plain hyper::Body
// for the latter. Both implement HttpBody though,
// and `Either` is used to merge them.
res_resp.map(|resp| resp.map(http_body_util::Either::Left))
} else {
let res_resp = http1_handler(req).await;
res_resp.map(|resp| resp.map(http_body_util::Either::Right))
}
}
})
}
.await;
let tls_acceptor = tls_acceptor.clone();
tokio::task::spawn(async move {
let res = if is_https {
let tls_acceptor =
tls_acceptor.expect("tls_acceptor is set together with https_listener");
let tls_stream = match tls_acceptor.accept(tcp_stream).await {
Ok(tls_stream) => tls_stream,
Err(e) => {
info!("error accepting TLS connection from {addr}: {e}");
return;
}
};
builder
.serve_connection(TokioIo::new(tls_stream), service_fn_)
.await
} else {
builder
.serve_connection(TokioIo::new(tcp_stream), service_fn_)
.await
};
if let Err(e) = res {
info!(%is_https, "error serving connection from {addr}: {e}");
}
});
}
}
#[cfg(test)]
mod tests {
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::sync::broadcast::error::TryRecvError;
use utils::id::{TenantId, TimelineId};
use super::*;
fn msg(timeline_id: Vec<u8>) -> Message {
Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
safekeeper_id: 1,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: vec![0x00; 16],
timeline_id,
}),
term: 0,
last_log_term: 0,
flush_lsn: 1,
commit_lsn: 2,
backup_lsn: 3,
remote_consistent_lsn: 4,
peer_horizon_lsn: 5,
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
local_start_lsn: 0,
availability_zone: None,
standby_horizon: 0,
})
}
fn tli_from_u64(i: u64) -> Vec<u8> {
let mut timeline_id = vec![0xFF; 8];
timeline_id.extend_from_slice(&i.to_be_bytes());
timeline_id
}
fn mock_addr() -> SocketAddr {
"127.0.0.1:8080".parse().unwrap()
}
#[tokio::test]
async fn test_registry() {
let registry = Registry {
shared_state: Arc::new(RwLock::new(SharedState::new(16))),
timeline_chan_size: 16,
};
// subscribe to timeline 2
let ttid_2 = TenantTimelineId {
tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
};
let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());
// send two messages with different keys
let msg_1 = msg(tli_from_u64(1));
let msg_2 = msg(tli_from_u64(2));
let mut publisher = registry.register_publisher(mock_addr());
publisher.send_msg(&msg_1).expect("failed to send msg");
publisher.send_msg(&msg_2).expect("failed to send msg");
// msg with key 2 should arrive to subscriber_2
assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);
// but nothing more
assert_eq!(
subscriber_2.sub_rx.try_recv().unwrap_err(),
TryRecvError::Empty
);
// subscriber_all should receive both messages
assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
assert_eq!(
subscriber_all.sub_rx.try_recv().unwrap_err(),
TryRecvError::Empty
);
}
}

View File

@@ -475,18 +475,6 @@ class NeonLocalCli(AbstractNeonCli):
args.extend(["-m", "immediate"])
return self.raw_cli(args)
def storage_broker_start(
self, timeout_in_seconds: int | None = None
) -> subprocess.CompletedProcess[str]:
cmd = ["storage_broker", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
return self.raw_cli(cmd)
def storage_broker_stop(self) -> subprocess.CompletedProcess[str]:
cmd = ["storage_broker", "stop"]
return self.raw_cli(cmd)
def endpoint_create(
self,
branch_name: str,

View File

@@ -501,9 +501,6 @@ class NeonEnvBuilder:
# Flag to use https listener in storage controller, generate local ssl certs,
# and force pageservers and neon_local to use https for storage controller api.
self.use_https_storage_controller_api: bool = False
# Flag to use https listener in storage broker, generate local ssl certs,
# and force pageservers and safekeepers to use https for storage broker api.
self.use_https_storage_broker_api: bool = False
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_concurrent_io: str | None = (
@@ -1030,8 +1027,6 @@ class NeonEnvBuilder:
self.env.storage_controller.assert_no_errors()
self.env.broker.assert_no_errors()
self.env.endpoint_storage.assert_no_errors()
try:
@@ -1089,7 +1084,6 @@ class NeonEnv:
self.safekeepers: list[Safekeeper] = []
self.pageservers: list[NeonPageserver] = []
self.num_azs = config.num_azs
self.broker = NeonBroker(self, config.use_https_storage_broker_api)
self.pageserver_remote_storage = config.pageserver_remote_storage
self.safekeepers_remote_storage = config.safekeepers_remote_storage
self.pg_version = config.pg_version
@@ -1109,7 +1103,6 @@ class NeonEnv:
config.use_https_pageserver_api
or config.use_https_safekeeper_api
or config.use_https_storage_controller_api
or config.use_https_storage_broker_api
)
self.ssl_ca_file = (
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
@@ -1189,11 +1182,6 @@ class NeonEnv:
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
if config.use_https_storage_broker_api:
cfg["broker"]["listen_https_addr"] = self.broker.listen_addr()
else:
cfg["broker"]["listen_addr"] = self.broker.listen_addr()
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_hooks_api is not None:
@@ -1388,8 +1376,6 @@ class NeonEnv:
with concurrent.futures.ThreadPoolExecutor(
max_workers=2 + len(self.pageservers) + len(self.safekeepers)
) as executor:
futs.append(executor.submit(lambda: self.broker.start()))
for pageserver in self.pageservers:
futs.append(
executor.submit(
@@ -1478,7 +1464,6 @@ class NeonEnv:
pageserver.stop(immediate=immediate)
except RuntimeError:
stop_later.append(pageserver)
self.broker.stop()
# TODO: for nice logging we need python 3.11 ExceptionGroup
for ps in stop_later:
@@ -4937,41 +4922,6 @@ class Safekeeper(LogUtils):
sk.http_client().timeline_create(create_r)
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""
def __init__(self, env: NeonEnv, use_https: bool):
super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log")
self.env = env
self.scheme = "https" if use_https else "http"
self.port: int = self.env.port_distributor.get_port()
self.running = False
def start(
self,
timeout_in_seconds: int | None = None,
) -> Self:
assert not self.running
self.env.neon_cli.storage_broker_start(timeout_in_seconds)
self.running = True
return self
def stop(self) -> Self:
if self.running:
self.env.neon_cli.storage_broker_stop()
self.running = False
return self
def listen_addr(self):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"{self.scheme}://{self.listen_addr()}"
def assert_no_errors(self):
assert_no_errors(self.logfile, "storage_controller", [])
class NodeKind(StrEnum):
PAGESERVER = "pageserver"
SAFEKEEPER = "safekeeper"

View File

@@ -139,7 +139,6 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
env.neon_cli.safekeeper_stop()
env.neon_cli.storage_controller_stop(False)
env.neon_cli.endpoint_storage_stop(False)
env.neon_cli.storage_broker_stop()
# Keep NeonEnv state up to date, it usually owns starting/stopping services
env.pageserver.running = False
@@ -189,7 +188,6 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
# Stop this to get out of the way of the following `start`
env.neon_cli.storage_controller_stop(False)
env.neon_cli.storage_broker_stop()
# Default start
res = env.neon_cli.raw_cli(["start"])

View File

@@ -214,23 +214,3 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
wait_until(reload_failed)
def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder):
"""
Test HTTPS storage broker API.
1. Make /status request to HTTPS API to ensure it's appropriately configured.
2. Generate simple workload to ensure that SK -> broker -> PS communication works well.
"""
neon_env_builder.use_https_storage_broker_api = True
env = neon_env_builder.init_start()
# 1. Simple check that HTTPS is enabled and works.
url = env.broker.client_url() + "/status"
assert url.startswith("https://")
requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status()
# 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS.
workload = Workload(env, env.initial_tenant, env.initial_timeline)
workload.init()
workload.write_rows(10)
workload.validate()

View File

@@ -89,7 +89,6 @@ def test_storage_controller_smoke(
env = neon_env_builder.init_configs()
# Start services by hand so that we can skip a pageserver (this will start + register later)
env.broker.start()
env.storage_controller.start()
env.pageservers[0].start()
env.pageservers[1].start()
@@ -345,7 +344,6 @@ def prepare_onboarding_env(
# Start services by hand so that we can skip registration on one of the pageservers
env = neon_env_builder.init_configs()
env.broker.start()
env.storage_controller.start()
env.endpoint_storage.start()
@@ -2776,10 +2774,6 @@ def start_env(env: NeonEnv, storage_controller_port: int):
with concurrent.futures.ThreadPoolExecutor(
max_workers=2 + len(env.pageservers) + len(env.safekeepers)
) as executor:
futs.append(
executor.submit(lambda: env.broker.start() or None)
) # The `or None` is for the linter
for pageserver in env.pageservers:
futs.append(
executor.submit(