mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-30 04:40:37 +00:00
Use safekeeper tenant only port in all tests and actually test it.
Compute now uses special safekeeper WAL service port allowing auth tokens with only tenant scope. Adds understanding of this port to neon_local and fixtures, as well as test of both ports behaviour with different tokens. ref https://github.com/neondatabase/neon/issues/4730
This commit is contained in:
@@ -289,7 +289,7 @@ impl Endpoint {
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|sk| format!("localhost:{}", sk.pg_port))
|
||||
.map(|sk| format!("localhost:{}", sk.get_compute_port()))
|
||||
.collect::<Vec<String>>()
|
||||
.join(",");
|
||||
conf.append("neon.safekeepers", &safekeepers);
|
||||
@@ -318,7 +318,7 @@ impl Endpoint {
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|x| x.pg_port.to_string())
|
||||
.map(|x| x.get_compute_port().to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
|
||||
@@ -463,7 +463,7 @@ impl Endpoint {
|
||||
.iter()
|
||||
.find(|node| node.id == sk_id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.pg_port));
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -137,6 +137,7 @@ impl Default for PageServerConf {
|
||||
pub struct SafekeeperConf {
|
||||
pub id: NodeId,
|
||||
pub pg_port: u16,
|
||||
pub pg_tenant_only_port: Option<u16>,
|
||||
pub http_port: u16,
|
||||
pub sync: bool,
|
||||
pub remote_storage: Option<String>,
|
||||
@@ -149,6 +150,7 @@ impl Default for SafekeeperConf {
|
||||
Self {
|
||||
id: NodeId(0),
|
||||
pg_port: 0,
|
||||
pg_tenant_only_port: None,
|
||||
http_port: 0,
|
||||
sync: true,
|
||||
remote_storage: None,
|
||||
@@ -158,6 +160,14 @@ impl Default for SafekeeperConf {
|
||||
}
|
||||
}
|
||||
|
||||
impl SafekeeperConf {
|
||||
/// Compute is served by port on which only tenant scoped tokens allowed, if
|
||||
/// it is configured.
|
||||
pub fn get_compute_port(&self) -> u16 {
|
||||
self.pg_tenant_only_port.unwrap_or(self.pg_port)
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalEnv {
|
||||
pub fn pg_distrib_dir_raw(&self) -> PathBuf {
|
||||
self.pg_distrib_dir.clone()
|
||||
|
||||
@@ -120,45 +120,55 @@ impl SafekeeperNode {
|
||||
let availability_zone = format!("sk-{}", id_string);
|
||||
|
||||
let mut args = vec![
|
||||
"-D",
|
||||
datadir.to_str().with_context(|| {
|
||||
format!("Datadir path {datadir:?} cannot be represented as a unicode string")
|
||||
})?,
|
||||
"--id",
|
||||
&id_string,
|
||||
"--listen-pg",
|
||||
&listen_pg,
|
||||
"--listen-http",
|
||||
&listen_http,
|
||||
"--availability-zone",
|
||||
&availability_zone,
|
||||
"-D".to_owned(),
|
||||
datadir
|
||||
.to_str()
|
||||
.with_context(|| {
|
||||
format!("Datadir path {datadir:?} cannot be represented as a unicode string")
|
||||
})?
|
||||
.to_owned(),
|
||||
"--id".to_owned(),
|
||||
id_string,
|
||||
"--listen-pg".to_owned(),
|
||||
listen_pg,
|
||||
"--listen-http".to_owned(),
|
||||
listen_http,
|
||||
"--availability-zone".to_owned(),
|
||||
availability_zone,
|
||||
];
|
||||
if let Some(pg_tenant_only_port) = self.conf.pg_tenant_only_port {
|
||||
let listen_pg_tenant_only = format!("127.0.0.1:{}", pg_tenant_only_port);
|
||||
args.extend(["--listen-pg-tenant-only".to_owned(), listen_pg_tenant_only]);
|
||||
}
|
||||
if !self.conf.sync {
|
||||
args.push("--no-sync");
|
||||
args.push("--no-sync".to_owned());
|
||||
}
|
||||
|
||||
let broker_endpoint = format!("{}", self.env.broker.client_url());
|
||||
args.extend(["--broker-endpoint", &broker_endpoint]);
|
||||
args.extend(["--broker-endpoint".to_owned(), broker_endpoint]);
|
||||
|
||||
let mut backup_threads = String::new();
|
||||
if let Some(threads) = self.conf.backup_threads {
|
||||
backup_threads = threads.to_string();
|
||||
args.extend(["--backup-threads", &backup_threads]);
|
||||
args.extend(["--backup-threads".to_owned(), backup_threads]);
|
||||
} else {
|
||||
drop(backup_threads);
|
||||
}
|
||||
|
||||
if let Some(ref remote_storage) = self.conf.remote_storage {
|
||||
args.extend(["--remote-storage", remote_storage]);
|
||||
args.extend(["--remote-storage".to_owned(), remote_storage.clone()]);
|
||||
}
|
||||
|
||||
let key_path = self.env.base_data_dir.join("auth_public_key.pem");
|
||||
if self.conf.auth_enabled {
|
||||
args.extend([
|
||||
"--auth-validation-public-key-path",
|
||||
key_path.to_str().with_context(|| {
|
||||
format!("Key path {key_path:?} cannot be represented as a unicode string")
|
||||
})?,
|
||||
"--auth-validation-public-key-path".to_owned(),
|
||||
key_path
|
||||
.to_str()
|
||||
.with_context(|| {
|
||||
format!("Key path {key_path:?} cannot be represented as a unicode string")
|
||||
})?
|
||||
.to_owned(),
|
||||
]);
|
||||
}
|
||||
|
||||
|
||||
@@ -459,6 +459,7 @@ class AuthKeys:
|
||||
def generate_safekeeper_token(self) -> str:
|
||||
return self.generate_token(scope="safekeeperdata")
|
||||
|
||||
# generate token giving access to only one tenant
|
||||
def generate_tenant_token(self, tenant_id: TenantId) -> str:
|
||||
return self.generate_token(scope="tenant", tenant_id=str(tenant_id))
|
||||
|
||||
@@ -965,6 +966,7 @@ class NeonEnv:
|
||||
for i in range(1, config.num_safekeepers + 1):
|
||||
port = SafekeeperPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
pg_tenant_only=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
id = config.safekeepers_id_start + i # assign ids sequentially
|
||||
@@ -973,6 +975,7 @@ class NeonEnv:
|
||||
[[safekeepers]]
|
||||
id = {id}
|
||||
pg_port = {port.pg}
|
||||
pg_tenant_only_port = {port.pg_tenant_only}
|
||||
http_port = {port.http}
|
||||
sync = {'true' if config.safekeepers_enable_fsync else 'false'}"""
|
||||
)
|
||||
@@ -2608,6 +2611,7 @@ class EndpointFactory:
|
||||
@dataclass
|
||||
class SafekeeperPort:
|
||||
pg: int
|
||||
pg_tenant_only: int
|
||||
http: int
|
||||
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -866,6 +867,41 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
assert debug_dump_1["config"]["id"] == env.safekeepers[0].id
|
||||
|
||||
|
||||
# Test auth on WAL service (postgres protocol) ports.
|
||||
def test_sk_auth(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_sk_auth")
|
||||
endpoint = env.endpoints.create_start("test_sk_auth")
|
||||
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# learn neon timeline from compute
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
tenant_token = env.auth_keys.generate_tenant_token(tenant_id)
|
||||
full_token = env.auth_keys.generate_safekeeper_token()
|
||||
|
||||
conn_opts = {
|
||||
"host": "127.0.0.1",
|
||||
"options": f"-c timeline_id={timeline_id} tenant_id={tenant_id}",
|
||||
}
|
||||
connector = PgProtocol(**conn_opts)
|
||||
# no password, should fail
|
||||
with pytest.raises(psycopg2.OperationalError):
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg)
|
||||
# giving password, should be ok with either token on main pg port
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg, password=tenant_token)
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg, password=full_token)
|
||||
# on tenant only port tenant only token should work
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only, password=tenant_token)
|
||||
# but full token should fail
|
||||
with pytest.raises(psycopg2.OperationalError):
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only, password=full_token)
|
||||
|
||||
|
||||
class SafekeeperEnv:
|
||||
def __init__(
|
||||
self,
|
||||
@@ -912,6 +948,7 @@ class SafekeeperEnv:
|
||||
def start_safekeeper(self, i):
|
||||
port = SafekeeperPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
pg_tenant_only=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user