Add authentication between Safekeeper and Pageserver/Compute

* Fix https://github.com/neondatabase/neon/issues/1854
* Never log Safekeeper::conninfo in walproposer as it now contains a secret token
* control_panel, test_runner: generate and pass JWT tokens for Safekeeper to compute and pageserver
* Compute: load JWT token for Safekepeer from the environment variable. Do not reuse the token from
  pageserver_connstring because it's embedded in there weirdly.
* Pageserver: load JWT token for Safekeeper from the environment variable.
* Rewrite docs/authentication.md
This commit is contained in:
Egor Suvorov
2022-11-11 13:25:40 +02:00
committed by Egor Suvorov
parent 1ca76776d0
commit ae53dc3326
21 changed files with 438 additions and 50 deletions

View File

@@ -49,11 +49,16 @@ pub enum InitialPidFile<'t> {
}
/// Start a background child process using the parameters given.
pub fn start_process<F, S: AsRef<OsStr>>(
pub fn start_process<
F,
S: AsRef<OsStr>,
EI: IntoIterator<Item = (String, String)>, // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
>(
process_name: &str,
datadir: &Path,
command: &Path,
args: &[S],
envs: EI,
initial_pid_file: InitialPidFile,
process_status_check: F,
) -> anyhow::Result<Child>
@@ -79,6 +84,7 @@ where
.stderr(same_file_for_stderr)
.args(args);
let filled_cmd = fill_aws_secrets_vars(fill_rust_env_vars(background_command));
filled_cmd.envs(envs);
let mut spawned_process = filled_cmd.spawn().with_context(|| {
format!("Could not spawn {process_name}, see console output and log files for details.")

View File

@@ -322,6 +322,9 @@ impl PostgresNode {
conf.append("shared_preload_libraries", "neon");
conf.append_line("");
conf.append("neon.pageserver_connstring", &pageserver_connstr);
if let AuthType::NeonJWT = auth_type {
conf.append("neon.safekeeper_token_env", "$ZENITH_AUTH_TOKEN");
}
conf.append("neon.tenant_id", &self.tenant_id.to_string());
conf.append("neon.timeline_id", &self.timeline_id.to_string());
if let Some(lsn) = self.lsn {

View File

@@ -39,6 +39,7 @@ pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
&etcd_data_dir,
&etcd_broker.etcd_binary_path,
&args,
[],
background_process::InitialPidFile::Create(&pid_file_path),
|| {
for broker_endpoint in &etcd_broker.broker_endpoints {

View File

@@ -14,6 +14,7 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
@@ -253,11 +254,21 @@ impl PageServerNode {
args.extend(["-c", config_override]);
}
let envs = if self.env.pageserver.auth_type != AuthType::Trust {
// Generate a token to connect from the pageserver to a safekeeper
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
vec![("ZENITH_AUTH_TOKEN".to_owned(), token)]
} else {
vec![]
};
background_process::start_process(
"pageserver",
datadir,
&self.env.pageserver_bin(),
&args,
envs,
background_process::InitialPidFile::Expect(&self.pid_file()),
|| match self.check_status() {
Ok(()) => Ok(true),

View File

@@ -166,6 +166,7 @@ impl SafekeeperNode {
&datadir,
&self.env.safekeeper_bin(),
&args,
[],
background_process::InitialPidFile::Expect(&self.pid_file()),
|| match self.check_status() {
Ok(()) => Ok(true),

View File

@@ -1,30 +1,154 @@
## Authentication
### Overview
We use JWT tokens in communication between almost all components (compute, pageserver, safekeeper, CLI) regardless of the protocol used (HTTP/PostgreSQL).
Etcd currently has no authentication.
Authentication is optional and is disabled by default for easier debugging.
It is used in some tests, though.
Note that we do not cover authentication with `pg.neon.tech` here.
Current state of authentication includes usage of JWT tokens in communication between compute and pageserver and between CLI and pageserver. JWT token is signed using RSA keys. CLI generates a key pair during call to `neon_local init`. Using following openssl commands:
For HTTP connections we use the Bearer authentication scheme.
For PostgreSQL connections we expect the token to be passed as a password.
There is a caveat for `psql`: it silently truncates passwords to 100 symbols, so to correctly pass JWT via `psql` you have to either use `PGPASSWORD` environment variable, or store password in `psql`'s config file.
Current token scopes are described in `utils::auth::Scope`.
There are no expiration or rotation schemes.
_TODO_: some scopes allow both access to server management API and to the data.
These probably should be split into multiple scopes.
Tokens should not occur in logs.
They may sometimes occur in configuration files, although this is discouraged
because configs may be parsed and dumped into logs.
#### Tokens generation and validation
JWT tokens are signed using a private key.
Compute/pageserver/safekeeper use the private key's public counterpart to validate JWT tokens.
These components should not have access to the private key and may only get tokens from their configuration or external clients.
The key pair is generated once for an installation of compute/pageserver/safekeeper, e.g. by `neon_local init`.
There is currently no way to rotate the key without bringing down all components.
### CLI
CLI generates a key pair during call to `neon_local init` with the following commands:
```bash
openssl genrsa -out private_key.pem 2048
openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem
openssl genrsa -out auth_private_key.pem 2048
openssl rsa -in auth_private_key.pem -pubout -outform PEM -out auth_public_key.pem
```
CLI also generates signed token and saves it in the config for later access to pageserver. Now authentication is optional. Pageserver has two variables in config: `auth_validation_public_key_path` and `auth_type`, so when auth type present and set to `NeonJWT` pageserver will require authentication for connections. Actual JWT is passed in password field of connection string. There is a caveat for psql, it silently truncates passwords to 100 symbols, so to correctly pass JWT via psql you have to either use PGPASSWORD environment variable, or store password in psql config file.
Configuration files for all components point to `public_key.pem` for JWT validation.
However, authentication is disabled by default.
There is no way to automatically enable it everywhere, you have to configure each component individually.
Currently there is no authentication between compute and safekeepers, because this communication layer is under heavy refactoring. After this refactoring support for authentication will be added there too. Now safekeeper supports "hardcoded" token passed via environment variable to be able to use callmemaybe command in pageserver.
CLI also generates signed token (full access to Pageserver) and saves it in
the CLI's `config` file under `pageserver.auth_token`.
Note that pageserver's config does not have any similar parameter.
CLI is the only component which accesses that token.
Technically it could generate it from the private key on each run,
but it does not do that for some reason (_TODO_).
Compute uses token passed via environment variable to communicate to pageserver and in the future to the safekeeper too.
### Compute
#### Overview
Compute is a per-timeline PostgreSQL instance, so it should not have
any access to data of other tenants.
All tokens used by a compute are restricted to a specific tenant.
There is no auth isolation from other timelines of the same tenant,
but a non-rogue client never accesses another timeline even by an accident:
timeline IDs are random and hard to guess.
JWT authentication now supports two scopes: tenant and pageserverapi. Tenant scope is intended for use in tenant related api calls, e.g. create_branch. Compute launched for particular tenant also uses this scope. Scope pageserver api is intended to be used by console to manage pageserver. For now we have only one management operation - create tenant.
#### Incoming connections
All incoming connections are from PostgreSQL clients.
Their authentication is just plain PostgreSQL authentication and out of scope for this document.
Examples for token generation in python:
There is no administrative API except those provided by PostgreSQL.
#### Outgoing connections
Compute connects to Pageserver for getting pages.
The connection string is configured by the `neon.pageserver_connstring` PostgreSQL GUC, e.g. `postgresql://no_user:$ZENITH_AUTH_TOKEN@localhost:15028`.
The environment variable inside the connection string is substituted with
the JWT token.
Compute connects to Safekeepers to write and commit data.
The token is the same for all safekeepers.
It's stored in an environment variable, whose name is configured
by the `neon.safekeeper_token_env` PostgreSQL GUC.
If the GUC is unset, no token is passed.
Note that both tokens can be (and typically are) the same;
the scope is the tenant and the token is usually passed through the
`$ZENITH_AUTH_TOKEN` environment variable.
### Pageserver
#### Overview
Pageserver keeps track of multiple tenants, each having multiple timelines.
For each timeline, it connects to the corresponding Safekeeper.
Information about "corresponding Safekeeper" is published by Safekeepers
in the Etcd, but they do not publish access tokens, otherwise what is
the point of authentication.
Pageserver keeps a connection to some set of Safekeepers, which
may or may not correspond to active Computes.
Hence, we cannot obtain a per-timeline access token from a Compute.
E.g. if the timeline's Compute terminates before all WAL is
consumed by the Pageserver, the Pageserver continues consuming WAL.
Pageserver replicas' authentication is the same as the main's.
#### Incoming connections
Pageserver listens for connections from computes.
Each compute should present a token valid for the timeline's tenant.
Pageserver also has HTTP API: some parts are per-tenant,
some parts are server-wide, these are different scopes.
The `auth_type` configuration variable in Pageserver's config may have
either of three values:
* `Trust` removes all authentication. The outdated `MD5` value does likewise
* `NeonJWT` enables JWT validation.
Tokens are validated using the public key which lies in a PEM file
specified in the `auth_validation_public_key_path` config.
#### Outgoing connections
Pageserver makes a connection to a Safekeeper for each active timeline.
As Pageserver may want to access any timeline it has on the disk,
it is given a blanket JWT token to access any data on any Safekeeper.
This token is passed through an environment variable called `ZENITH_AUTH_TOKEN`
(non-configurable as of writing this text).
A better way _may be_ to store JWT token for each timeline next to it,
but may be not.
### Safekeeper
#### Overview
Safekeeper keeps track of multiple tenants, each having multiple timelines.
#### Incoming connections
Safekeeper accepts connections from Compute/Pageserver, each
connection corresponds to a specific timeline and requires
a corresponding JWT token.
Safekeeper also has HTTP API: some parts are per-tenant,
some parts are server-wide, these are different scopes.
The `auth-validation-public-key-path` command line options controls
the authentication mode:
* If the option is missing, there is no authentication or JWT token validation.
* If the option is present, it should be a path to the public key PEM file used for JWT token validation.
#### Outgoing connections
No connections are initiated by a Safekeeper.
### In the source code
Tests do not use authentication by default.
If you need it, you can enable it by configuring the test's environment:
```python
# generate pageserverapi token
management_token = jwt.encode({"scope": "pageserverapi"}, auth_keys.priv, algorithm="RS256")
# generate tenant token
tenant_token = jwt.encode({"scope": "tenant", "tenant_id": ps.initial_tenant}, auth_keys.priv, algorithm="RS256")
neon_env_builder.auth_enabled = True
```
Utility functions to work with jwts in rust are located in libs/utils/src/auth.rs
You will have to generate tokens if you want to access components inside the test directly,
use `AuthKeys.generate_*_token` methods for that.
If you create a new scope, please create a new method to prevent mistypes in scope's name.

View File

@@ -21,8 +21,16 @@ const JWT_ALGORITHM: Algorithm = Algorithm::RS256;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
// Provides access to all data for a specific tenant (specified in `struct Claims` below)
// TODO: join these two?
Tenant,
// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
// Should only be used e.g. for status check/tenant creation/list.
PageServerApi,
// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs.
// Should only be used e.g. for status check.
// Currently also used for connection from any pageserver to any safekeeper.
SafekeeperData,
}
#[serde_as]

View File

@@ -15,5 +15,8 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
}
(Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope
(Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope
(Scope::SafekeeperData, _) => {
bail!("SafekeeperData scope makes no sense for Pageserver")
}
}
}

View File

@@ -1,5 +1,7 @@
//! Main entry point for the Page Server executable.
use std::env::{var, VarError};
use std::sync::Arc;
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
use anyhow::{anyhow, Context};
@@ -270,6 +272,23 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
};
info!("Using auth: {:#?}", conf.auth_type);
match var("ZENITH_AUTH_TOKEN") {
Ok(v) => {
info!("Loaded JWT token for authentication with Safekeeper");
pageserver::config::SAFEKEEPER_AUTH_TOKEN
.set(Arc::new(v))
.map_err(|_| anyhow!("Could not initialize SAFEKEEPER_AUTH_TOKEN"))?;
}
Err(VarError::NotPresent) => {
info!("No JWT token for authentication with Safekeeper detected");
}
Err(e) => {
return Err(e).with_context(|| {
"Failed to either load to detect non-present ZENITH_AUTH_TOKEN environment variable"
})
}
};
let remote_storage = conf
.remote_storage_config
.as_ref()

View File

@@ -10,9 +10,11 @@ use std::env;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
use once_cell::sync::OnceCell;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use toml_edit;
use toml_edit::{Document, Item};
@@ -141,6 +143,15 @@ pub struct PageServerConf {
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
/// the future, more tokens and auth may arrive for etcd and/or its rewrite (see
/// https://github.com/neondatabase/neon/issues/2394), completely changing the logic.
/// Hence, we resort to a global variable for now instead of passing the token from the
/// startup code to the connection code through a dozen layers.
pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProfilingConfig {
Disabled,

View File

@@ -876,6 +876,7 @@ impl Timeline {
walreceiver_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
);
}

View File

@@ -50,6 +50,7 @@ pub fn spawn_connection_manager_task(
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
) {
let mut etcd_client = get_etcd_client().clone();
@@ -70,6 +71,7 @@ pub fn spawn_connection_manager_task(
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
auth_token,
);
loop {
select! {
@@ -360,6 +362,7 @@ struct WalreceiverState {
wal_connection_retries: HashMap<NodeId, RetryInfo>,
/// Data about all timelines, available for connection, fetched from etcd, grouped by their corresponding safekeeper node id.
wal_stream_candidates: HashMap<NodeId, EtcdSkTimeline>,
auth_token: Option<Arc<String>>,
}
/// Current connection data.
@@ -408,6 +411,7 @@ impl WalreceiverState {
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
) -> Self {
let id = TenantTimelineId {
tenant_id: timeline.tenant_id,
@@ -422,6 +426,7 @@ impl WalreceiverState {
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token,
}
}
@@ -762,6 +767,10 @@ impl WalreceiverState {
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_deref()?,
match &self.auth_token {
None => None,
Some(x) => Some(x),
},
) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {
@@ -841,17 +850,18 @@ fn wal_stream_connection_config(
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(&listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
Ok(
PgConnectionConfig::new_host_port(host, port).extend_options([
Ok(PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
]),
)
])
.set_password(auth_token.map(|s| s.to_owned())))
}
#[cfg(test)]
@@ -1498,6 +1508,7 @@ mod tests {
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token: None,
}
}
}

View File

@@ -44,6 +44,7 @@ PGconn *pageserver_conn = NULL;
WaitEventSet *pageserver_conn_wes = NULL;
char *page_server_connstring_raw;
char *safekeeper_token_env;
int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
@@ -418,6 +419,15 @@ pg_init_libpagestore(void)
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomStringVariable("neon.safekeeper_token_env",
"the environment variable containing JWT token for authentication with Safekeepers, the convention is to either unset or set to $ZENITH_AUTH_TOKEN",
NULL,
&safekeeper_token_env,
NULL,
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
NULL,
@@ -481,6 +491,24 @@ pg_init_libpagestore(void)
neon_timeline_walproposer = neon_timeline;
neon_tenant_walproposer = neon_tenant;
/* retrieve the token for Safekeeper, if present */
if (safekeeper_token_env != NULL) {
if (safekeeper_token_env[0] != '$') {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("expected safekeeper auth token environment variable's name starting with $ but found: %s",
safekeeper_token_env)));
}
neon_safekeeper_token_walproposer = getenv(&safekeeper_token_env[1]);
if (!neon_safekeeper_token_walproposer) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("cannot get safekeeper auth token, environment variable %s is not set",
&safekeeper_token_env[1])));
}
neon_log(LOG, "using safekeeper auth token from environment variable");
}
if (page_server_connstring && page_server_connstring[0])
{
neon_log(PageStoreTrace, "set neon_smgr hook");

View File

@@ -80,6 +80,7 @@ bool am_wal_proposer;
char *neon_timeline_walproposer = NULL;
char *neon_tenant_walproposer = NULL;
char *neon_safekeeper_token_walproposer = NULL;
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
@@ -513,19 +514,17 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
Safekeeper *sk = &safekeeper[n_safekeepers];
int written = 0;
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
sk->host, sk->port, neon_timeline_walproposer, neon_tenant_walproposer);
if (neon_safekeeper_token_walproposer != NULL) {
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
"host=%s port=%s password=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
sk->host, sk->port, neon_safekeeper_token_walproposer, neon_timeline_walproposer,
neon_tenant_walproposer);
} else {
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
sk->host, sk->port, neon_timeline_walproposer, neon_tenant_walproposer);
}
/*
* currently connection string is not that long, but once we pass
* something like jwt we might overflow the buffer,
*/
/*
* so it is better to be defensive and check that everything aligns
* well
*/
if (written > MAXCONNINFO || written < 0)
elog(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
}
@@ -721,12 +720,13 @@ ResetConnection(Safekeeper *sk)
* According to libpq docs:
* "If the result is CONNECTION_BAD, the connection attempt has already failed,
* typically because of invalid connection parameters."
* We should report this failure.
* We should report this failure. Do not print the exact `conninfo` as it may
* contain e.g. password. The error message should already provide enough information.
*
* https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS
*/
elog(WARNING, "Immediate failure to connect with node:\n\t%s\n\terror: %s",
sk->conninfo, walprop_error_message(sk->conn));
elog(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s",
sk->host, sk->port, walprop_error_message(sk->conn));
/*
* Even though the connection failed, we still need to clean up the

View File

@@ -41,6 +41,7 @@ typedef struct WalMessage WalMessage;
extern char *neon_timeline_walproposer;
extern char *neon_tenant_walproposer;
extern char *neon_safekeeper_token_walproposer;
/* Possible return values from ReadPGAsync */
typedef enum
@@ -337,8 +338,13 @@ typedef struct Safekeeper
{
char const *host;
char const *port;
char conninfo[MAXCONNINFO]; /* connection info for*
* connecting/reconnecting */
/*
* connection string for connecting/reconnecting.
*
* May contain private information like password and should not be logged.
*/
char conninfo[MAXCONNINFO];
/*
* postgres protocol connection to the WAL acceptor

View File

@@ -14,5 +14,6 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
Ok(())
}
(Scope::PageServerApi, _) => bail!("PageServerApi scope makes no sense for Safekeeper"),
(Scope::SafekeeperData, _) => Ok(()),
}
}

View File

@@ -208,11 +208,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
let conf_ = conf.clone();
let auth_ = auth.clone();
threads.push(
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
let router = http::make_router(conf_, auth);
let router = http::make_router(conf_, auth_);
endpoint::serve_thread_main(
router,
http_listener,
@@ -226,8 +227,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let safekeeper_thread = thread::Builder::new()
.name("Safekeeper thread".into())
.spawn(|| {
// TODO: add auth
if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) {
if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener, auth) {
info!("safekeeper thread terminated: {e}");
}
})
@@ -254,7 +254,6 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
thread::Builder::new()
.name("WAL removal thread".into())
.spawn(|| {
// TODO: add auth?
remove_wal::thread_main(conf_);
})?,
);
@@ -264,7 +263,6 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
thread::Builder::new()
.name("wal backup launcher thread".into())
.spawn(move || {
// TODO: add auth?
wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx);
})?,
);

View File

@@ -1,19 +1,23 @@
//! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
//! protocol commands.
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::receive_wal::ReceiveWalConn;
use crate::send_wal::ReplicationConn;
use crate::{GlobalTimelines, SafeKeeperConf};
use anyhow::{bail, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use postgres_ffi::PG_TLI;
use regex::Regex;
use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
use std::str;
use std::sync::Arc;
use tracing::info;
use utils::auth::{Claims, JwtAuth, Scope};
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
@@ -28,6 +32,8 @@ pub struct SafekeeperPostgresHandler {
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub ttid: TenantTimelineId,
auth: Option<Arc<JwtAuth>>,
claims: Option<Claims>,
}
/// Parsed Postgres command.
@@ -93,7 +99,44 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
}
}
fn check_auth_jwt(
&mut self,
_pgb: &mut PostgresBackend,
jwt_response: &[u8],
) -> anyhow::Result<()> {
// this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
// which requires auth to be present
let data = self
.auth
.as_ref()
.unwrap()
.decode(str::from_utf8(jwt_response)?)?;
if matches!(data.claims.scope, Scope::Tenant) {
ensure!(
data.claims.tenant_id.is_some(),
"jwt token scope is Tenant, but tenant id is missing"
)
}
info!(
"jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
data.claims.scope, data.claims.tenant_id,
);
self.claims = Some(data.claims);
Ok(())
}
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()> {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
}
let cmd = parse_cmd(query_string)?;
info!(
@@ -103,6 +146,7 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
@@ -122,16 +166,35 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
}
impl SafekeeperPostgresHandler {
pub fn new(conf: SafeKeeperConf) -> Self {
pub fn new(conf: SafeKeeperConf, auth: Option<Arc<JwtAuth>>) -> Self {
SafekeeperPostgresHandler {
conf,
appname: None,
tenant_id: None,
timeline_id: None,
ttid: TenantTimelineId::empty(),
auth,
claims: None,
}
}
// when accessing management api supply None as an argument
// when using to authorize tenant pass corresponding tenant id
fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<()> {
if self.auth.is_none() {
// auth is set to Trust, nothing to check so just return ok
return Ok(());
}
// auth is some, just checked above, when auth is some
// then claims are always present because of checks during connection init
// so this expect won't trigger
let claims = self
.claims
.as_ref()
.expect("claims presence already checked");
check_permission(claims, tenant_id)
}
///
/// Handle IDENTIFY_SYSTEM replication command
///

View File

@@ -5,25 +5,32 @@
use anyhow::Result;
use regex::Regex;
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
use tracing::*;
use utils::auth::JwtAuth;
use crate::handler::SafekeeperPostgresHandler;
use crate::SafeKeeperConf;
use utils::postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> {
pub fn thread_main(
conf: SafeKeeperConf,
listener: TcpListener,
auth: Option<Arc<JwtAuth>>,
) -> Result<()> {
loop {
match listener.accept() {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
let conf = conf.clone();
let auth = auth.clone();
let _ = thread::Builder::new()
.name("WAL service thread".into())
.spawn(move || {
if let Err(err) = handle_socket(socket, conf) {
if let Err(err) = handle_socket(socket, conf, auth) {
error!("connection handler exited: {}", err);
}
})
@@ -44,13 +51,25 @@ fn get_tid() -> u64 {
/// This is run by `thread_main` above, inside a background thread.
///
fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> {
fn handle_socket(
socket: TcpStream,
conf: SafeKeeperConf,
auth: Option<Arc<JwtAuth>>,
) -> Result<()> {
let _enter = info_span!("", tid = ?get_tid()).entered();
socket.set_nodelay(true)?;
let mut conn_handler = SafekeeperPostgresHandler::new(conf);
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?;
let mut conn_handler = SafekeeperPostgresHandler::new(conf, auth.clone());
let pgbackend = PostgresBackend::new(
socket,
match auth {
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
},
None,
false,
)?;
// libpq replication protocol between safekeeper and replicas/pagers
pgbackend.run(&mut conn_handler)?;

View File

@@ -435,6 +435,9 @@ class AuthKeys:
def generate_pageserver_token(self) -> str:
return self.generate_token(scope="pageserverapi")
def generate_safekeeper_token(self) -> str:
return self.generate_token(scope="safekeeperdata")
def generate_tenant_token(self, tenant_id: TenantId) -> str:
return self.generate_token(scope="tenant", tenant_id=str(tenant_id))
@@ -2497,7 +2500,8 @@ class Safekeeper:
# "replication=0" hacks psycopg not to send additional queries
# on startup, see https://github.com/psycopg/psycopg2/pull/482
connstr = f"host=localhost port={self.port.pg} replication=0 options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
token = self.env.auth_keys.generate_tenant_token(tenant_id)
connstr = f"host=localhost port={self.port.pg} password={token} replication=0 options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
with closing(psycopg2.connect(connstr)) as conn:
# server doesn't support transactions

View File

@@ -1,7 +1,7 @@
from contextlib import closing
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException, PgProtocol
from fixtures.types import TenantId
@@ -73,3 +73,73 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (5000050000,)
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
branch = f"test_auth_failures_auth_enabled_{auth_enabled}"
timeline_id = env.neon_cli.create_branch(branch)
env.postgres.create_start(branch)
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())
pageserver_token = env.auth_keys.generate_pageserver_token()
safekeeper_token = env.auth_keys.generate_safekeeper_token()
def check_connection(
pg_protocol: PgProtocol, command: str, expect_success: bool, **conn_kwargs
):
def op():
with closing(pg_protocol.connect(**conn_kwargs)) as conn:
with conn.cursor() as cur:
cur.execute(command)
if expect_success:
op()
else:
with pytest.raises(Exception):
op()
def check_pageserver(expect_success: bool, **conn_kwargs):
check_connection(
env.pageserver,
f"get_last_record_rlsn {env.initial_tenant} {timeline_id}",
expect_success,
**conn_kwargs,
)
check_pageserver(not auth_enabled)
if auth_enabled:
check_pageserver(True, password=tenant_token)
env.pageserver.allowed_errors.append(".*Tenant id mismatch. Permission denied.*")
check_pageserver(False, password=invalid_tenant_token)
check_pageserver(True, password=pageserver_token)
env.pageserver.allowed_errors.append(
".*SafekeeperData scope makes no sense for Pageserver.*"
)
check_pageserver(False, password=safekeeper_token)
def check_safekeeper(expect_success: bool, **conn_kwargs):
check_connection(
PgProtocol(
host="localhost",
port=env.safekeepers[0].port.pg,
options=f"ztenantid={env.initial_tenant} ztimelineid={timeline_id}",
),
"IDENTIFY_SYSTEM",
expect_success,
**conn_kwargs,
)
check_safekeeper(not auth_enabled)
if auth_enabled:
check_safekeeper(True, password=tenant_token)
check_safekeeper(False, password=invalid_tenant_token)
check_safekeeper(False, password=pageserver_token)
check_safekeeper(True, password=safekeeper_token)