Add one more WAL service port allowing only tenant scoped auth tokens.

It will make it easier to limit access at network level, with e.g. k8s network
policies.

ref https://github.com/neondatabase/neon/issues/4730
This commit is contained in:
Arseny Sher
2023-07-17 12:18:04 +03:00
committed by Arseny Sher
parent b4d36f572d
commit 1e7db5458f
5 changed files with 75 additions and 9 deletions

View File

@@ -16,7 +16,7 @@ use crate::id::TenantId;
/// Algorithm to use. We require EdDSA.
const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
// Provides access to all data for a specific tenant (specified in `struct Claims` below)

View File

@@ -37,7 +37,7 @@ use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::JwtAuth;
use utils::auth::{JwtAuth, Scope};
use utils::{
id::NodeId,
logging::{self, LogFormat},
@@ -72,6 +72,10 @@ struct Args {
/// Listen endpoint for receiving/sending WAL in the form host:port.
#[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)]
listen_pg: String,
/// Listen endpoint for receiving/sending WAL in the form host:port allowing
/// only tenant scoped auth tokens. Pointless if auth is disabled.
#[arg(long, default_value = None, verbatim_doc_comment)]
listen_pg_tenant_only: Option<String>,
/// Listen http endpoint for management and metrics in the form host:port.
#[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
listen_http: String,
@@ -94,7 +98,7 @@ struct Args {
broker_keepalive_interval: Duration,
/// Peer safekeeper is considered dead after not receiving heartbeats from
/// it during this period passed as a human readable duration.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)]
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
heartbeat_timeout: Duration,
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
/// inline table, e.g.
@@ -179,6 +183,7 @@ async fn main() -> anyhow::Result<()> {
workdir,
my_id: id,
listen_pg_addr: args.listen_pg,
listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
listen_http_addr: args.listen_http,
availability_zone: args.availability_zone,
no_sync: args.no_sync,
@@ -222,6 +227,21 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
e
})?;
let pg_listener_tenant_only =
if let Some(listen_pg_addr_tenant_only) = &conf.listen_pg_addr_tenant_only {
info!(
"starting safekeeper tenant scoped WAL service on {}",
listen_pg_addr_tenant_only
);
let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
e
})?;
Some(listener)
} else {
None
};
info!(
"starting safekeeper HTTP service on {}",
conf.listen_http_addr
@@ -253,14 +273,34 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(conf_, pg_listener))
.spawn(wal_service::task_main(
conf_,
pg_listener,
Some(Scope::SafekeeperData),
))
// wrap with task name for error reporting
.map(|res| ("WAL service main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let conf_ = conf.clone();
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(
conf_,
pg_listener_tenant_only,
Some(Scope::Tenant),
))
// wrap with task name for error reporting
.map(|res| ("WAL service tenant only main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
}
let conf_ = conf.clone();
let http_handle = current_thread_rt
.as_ref()

View File

@@ -34,6 +34,8 @@ pub struct SafekeeperPostgresHandler {
pub ttid: TenantTimelineId,
/// Unique connection id is logged in spans for observability.
pub conn_id: ConnectionId,
/// Auth scope allowed on the connections. None if auth is not configured.
allowed_auth_scope: Option<Scope>,
claims: Option<Claims>,
io_metrics: Option<TrafficMetrics>,
}
@@ -147,6 +149,16 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
.unwrap()
.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
let scope = self
.allowed_auth_scope
.expect("auth is enabled but scope is not configured");
// The handler might be configured to allow only tenant scope tokens.
if matches!(scope, Scope::Tenant) && !matches!(data.claims.scope, Scope::Tenant) {
return Err(QueryError::Other(anyhow::anyhow!(
"passed JWT token is for full access, but only tenant scope is allowed"
)));
}
if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
return Err(QueryError::Other(anyhow::anyhow!(
"jwt token scope is Tenant, but tenant id is missing"
@@ -215,7 +227,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
}
impl SafekeeperPostgresHandler {
pub fn new(conf: SafeKeeperConf, conn_id: u32, io_metrics: Option<TrafficMetrics>) -> Self {
pub fn new(
conf: SafeKeeperConf,
conn_id: u32,
io_metrics: Option<TrafficMetrics>,
allowed_auth_scope: Option<Scope>,
) -> Self {
SafekeeperPostgresHandler {
conf,
appname: None,
@@ -224,6 +241,7 @@ impl SafekeeperPostgresHandler {
ttid: TenantTimelineId::empty(),
conn_id,
claims: None,
allowed_auth_scope,
io_metrics,
}
}

View File

@@ -53,6 +53,7 @@ pub struct SafeKeeperConf {
pub workdir: PathBuf,
pub my_id: NodeId,
pub listen_pg_addr: String,
pub listen_pg_addr_tenant_only: Option<String>,
pub listen_http_addr: String,
pub availability_zone: Option<String>,
pub no_sync: bool,
@@ -85,6 +86,7 @@ impl SafeKeeperConf {
workdir: PathBuf::from("./"),
no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_pg_addr_tenant_only: None,
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
availability_zone: None,
remote_storage: None,

View File

@@ -8,7 +8,7 @@ use std::{future, time::Duration};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader;
use tracing::*;
use utils::measured_stream::MeasuredStream;
use utils::{auth::Scope, measured_stream::MeasuredStream};
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::TrafficMetrics;
@@ -19,6 +19,7 @@ use postgres_backend::{AuthType, PostgresBackend};
pub async fn task_main(
conf: SafeKeeperConf,
pg_listener: std::net::TcpListener,
allowed_auth_scope: Option<Scope>,
) -> anyhow::Result<()> {
// Tokio's from_std won't do this for us, per its comment.
pg_listener.set_nonblocking(true)?;
@@ -33,7 +34,7 @@ pub async fn task_main(
let conn_id = issue_connection_id(&mut connection_count);
tokio::spawn(async move {
if let Err(err) = handle_socket(socket, conf, conn_id)
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope)
.instrument(info_span!("", cid = %conn_id))
.await
{
@@ -49,6 +50,7 @@ async fn handle_socket(
socket: TcpStream,
conf: SafeKeeperConf,
conn_id: ConnectionId,
allowed_auth_scope: Option<Scope>,
) -> Result<(), QueryError> {
socket.set_nodelay(true)?;
let peer_addr = socket.peer_addr()?;
@@ -84,8 +86,12 @@ async fn handle_socket(
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
};
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let mut conn_handler = SafekeeperPostgresHandler::new(
conf,
conn_id,
Some(traffic_metrics.clone()),
allowed_auth_scope,
);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.