Compare commits

..

18 Commits

Author SHA1 Message Date
Bojan Serafimov
13cddbb10d WIP 2022-06-15 18:19:21 -04:00
Anastasia Lubennikova
d11c9f9fcb Use random ports for the proxy and local pg in tests
Fixes #1931
Author: Dmitry Ivanov
2022-06-15 20:21:58 +03:00
Kirill Bulatov
d8a37452c8 Rename ZenithFeedback (#1912) 2022-06-11 00:44:05 +03:00
chaitanya sharma
e1336f451d renamed .zenith data-dir to .neon. 2022-06-09 18:19:18 +02:00
Arseny Sher
a4d8261390 Save Postgres log in test_find_end_of_wal_* tests. 2022-06-09 19:16:43 +04:00
Egor Suvorov
e2a5a31595 Safekeeper HTTP router: add comment about /v1/timeline 2022-06-09 17:14:46 +02:00
Egor Suvorov
0ac0fba77a test_runner: test Safekeeper HTTP API Auth
All endpoints except for POST /v1/timeline are tested, this one is not tested in any way yet.
Three attempts for each endpoint: correctly authenticated, badly authenticated, unauthenticated.
2022-06-09 17:14:46 +02:00
Egor Suvorov
a001052cdd test_runner: SafekeeperHttpClient: support auth 2022-06-09 17:14:46 +02:00
Egor Suvorov
1f1d852204 ZenithEnvBuilder: rename pageserver_auth_enabled --> auth_enabled 2022-06-09 17:14:46 +02:00
Egor Suvorov
f7b878611a Implement JWT authentication in Safekeeper HTTP API (#1753)
* `control_plane` crate (used by `neon_local`) now parses an `auth_enabled` bool for each Safekeeper
* If auth is enabled, a Safekeeper is passed a path to a public key via a new command line argument
* Added TODO comments to other places needing auth
2022-06-09 17:14:46 +02:00
Arseny Sher
a51b2dac9a Don't s3 offload from newly joined safekeeper not having required WAL.
I made the check at launcher level with the perspective of generally moving
election (decision who offloads) there.

Also log timeline 'active' changes.
2022-06-09 18:30:16 +04:00
Thang Pham
e22d9cee3a fix ZeroDivisionError in scripts/generate_perf_report_page (#1906)
Fixes the `ZeroDivisionError` error by adding `EPS=1e-6` when doing the calculation.
2022-06-08 09:15:12 -04:00
Arthur Petukhovsky
a01999bc4a Replace most common remote logs with metrics (#1909) 2022-06-08 13:36:49 +03:00
chaitanya sharma
32e64afd54 Use better parallel build instructions in readme.md (#1908) 2022-06-08 11:25:37 +03:00
Kirill Bulatov
8a53472e4f Force etcd broker keys to not to intersect 2022-06-08 11:21:05 +03:00
Dmitry Rodionov
6e26588d17 Allow to customize shutdown condition in PostgresBackend
Use it in PageServerHandler to check per thread shutdown condition
from thread_mgr which takes into account tenants and timelines
2022-06-07 22:11:54 +03:00
Arseny Sher
0b93253b3c Fix leaked keepalive task in s3 offloading leader election.
I still don't like the surroundings and feel we'd better get away without using
election API at all, but this is a quick fix to keep CI green.

ref #1815
2022-06-07 15:17:57 +04:00
Dmitry Rodionov
7dc6beacbd make it possible to associate thread with a tenant after thread start 2022-06-07 12:59:35 +03:00
48 changed files with 1083 additions and 727 deletions

View File

@@ -57,7 +57,7 @@
args:
creates: "/storage/pageserver/data/tenants"
environment:
ZENITH_REPO_DIR: "/storage/pageserver/data"
NEON_REPO_DIR: "/storage/pageserver/data"
LD_LIBRARY_PATH: "/usr/local/lib"
become: true
tags:
@@ -131,7 +131,7 @@
args:
creates: "/storage/safekeeper/data/safekeeper.id"
environment:
ZENITH_REPO_DIR: "/storage/safekeeper/data"
NEON_REPO_DIR: "/storage/safekeeper/data"
LD_LIBRARY_PATH: "/usr/local/lib"
become: true
tags:

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="wal"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -9,8 +9,8 @@ tmp_install
tmp_check_cli
test_output
.vscode
.zenith
integration_tests/.zenith
.neon
integration_tests/.neon
.mypy_cache
Dockerfile

4
.gitignore vendored
View File

@@ -6,8 +6,8 @@ __pycache__/
test_output/
.vscode
.idea
/.zenith
/integration_tests/.zenith
/.neon
/integration_tests/.neon
# Coverage
*.profraw

View File

@@ -6,5 +6,5 @@ target/
tmp_install/
__pycache__/
test_output/
.zenith/
.neon/
.git/

View File

@@ -80,7 +80,7 @@ brew link --force libpq
```sh
git clone --recursive https://github.com/neondatabase/neon.git
cd neon
make -j5
make -j`nproc`
```
#### dependency installation notes
@@ -93,7 +93,7 @@ Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (r
#### running neon database
1. Start pageserver and postgres on top of it (should be called from repo root):
```sh
# Create repository in .zenith with proper paths to binaries and data
# Create repository in .neon with proper paths to binaries and data
# Later that would be responsibility of a package install script
> ./target/debug/neon_local init
initializing tenantid 9ef87a5bf0d92544f6fafeeb3239695c
@@ -103,16 +103,16 @@ pageserver init succeeded
# start pageserver and safekeeper
> ./target/debug/neon_local start
Starting pageserver at '127.0.0.1:64000' in '.zenith'
Starting pageserver at '127.0.0.1:64000' in '.neon'
Pageserver started
initializing for sk 1 for 7676
Starting safekeeper at '127.0.0.1:5454' in '.zenith/safekeepers/sk1'
Starting safekeeper at '127.0.0.1:5454' in '.neon/safekeepers/sk1'
Safekeeper started
# start postgres compute node
> ./target/debug/neon_local pg start main
Starting new postgres main on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
# check list of running postgres instances
@@ -149,7 +149,7 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
# start postgres on that branch
> ./target/debug/neon_local pg start migration_check --branch-name migration_check
Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
# check the new list of running postgres instances

View File

@@ -21,9 +21,9 @@ use utils::{
use crate::safekeeper::SafekeeperNode;
//
// This data structures represents zenith CLI config
// This data structures represents neon_local CLI config
//
// It is deserialized from the .zenith/config file, or the config file passed
// It is deserialized from the .neon/config file, or the config file passed
// to 'zenith init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
@@ -34,8 +34,8 @@ pub struct LocalEnv {
// compute nodes).
//
// This is not stored in the config file. Rather, this is the path where the
// config file itself is. It is read from the ZENITH_REPO_DIR env variable or
// '.zenith' if not given.
// config file itself is. It is read from the NEON_REPO_DIR env variable or
// '.neon' if not given.
#[serde(skip)]
pub base_data_dir: PathBuf,
@@ -177,6 +177,7 @@ pub struct SafekeeperConf {
pub sync: bool,
pub remote_storage: Option<String>,
pub backup_threads: Option<u32>,
pub auth_enabled: bool,
}
impl Default for SafekeeperConf {
@@ -188,6 +189,7 @@ impl Default for SafekeeperConf {
sync: true,
remote_storage: None,
backup_threads: None,
auth_enabled: false,
}
}
}
@@ -337,7 +339,7 @@ impl LocalEnv {
pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> {
// Currently, the user first passes a config file with 'zenith init --config=<path>'
// We read that in, in `create_config`, and fill any missing defaults. Then it's saved
// to .zenith/config. TODO: We lose any formatting and comments along the way, which is
// to .neon/config. TODO: We lose any formatting and comments along the way, which is
// a bit sad.
let mut conf_content = r#"# This file describes a locale deployment of the page server
# and safekeeeper node. It is read by the 'zenith' command-line
@@ -481,9 +483,9 @@ impl LocalEnv {
}
fn base_path() -> PathBuf {
match std::env::var_os("ZENITH_REPO_DIR") {
match std::env::var_os("NEON_REPO_DIR") {
Some(val) => PathBuf::from(val),
None => PathBuf::from(".zenith"),
None => PathBuf::from(".neon"),
}
}

View File

@@ -149,6 +149,11 @@ impl SafekeeperNode {
if let Some(ref remote_storage) = self.conf.remote_storage {
cmd.args(&["--remote-storage", remote_storage]);
}
if self.conf.auth_enabled {
cmd.arg("--auth-validation-public-key-path");
// PathBuf is better be passed as is, not via `String`.
cmd.arg(self.env.base_data_dir.join("auth_public_key.pem"));
}
fill_aws_secrets_vars(&mut cmd);

View File

@@ -36,12 +36,12 @@ This is how the `LOGICAL_TIMELINE_SIZE` metric is implemented in the pageserver.
Alternatively, we could count only relation data. As in pg_database_size().
This approach is somewhat more user-friendly because it is the data that is really affected by the user.
On the other hand, it puts us in a weaker position than other services, i.e., RDS.
We will need to refactor the timeline_size counter or add another counter to implement it.
We will need to refactor the timeline_size counter or add another counter to implement it.
Timeline size is updated during wal digestion. It is not versioned and is valid at the last_received_lsn moment.
Then this size should be reported to compute node.
`current_timeline_size` value is included in the walreceiver's custom feedback message: `ZenithFeedback.`
`current_timeline_size` value is included in the walreceiver's custom feedback message: `ReplicationFeedback.`
(PR about protocol changes https://github.com/zenithdb/zenith/pull/1037).
@@ -64,11 +64,11 @@ We should warn users if the limit is soon to be reached.
### **Reliability, failure modes and corner cases**
1. `current_timeline_size` is valid at the last received and digested by pageserver lsn.
If pageserver lags behind compute node, `current_timeline_size` will lag too. This lag can be tuned using backpressure, but it is not expected to be 0 all the time.
So transactions that happen in this lsn range may cause limit overflow. Especially operations that generate (i.e., CREATE DATABASE) or free (i.e., TRUNCATE) a lot of data pages while generating a small amount of WAL. Are there other operations like this?
Currently, CREATE DATABASE operations are restricted in the console. So this is not an issue.

View File

@@ -154,7 +154,7 @@ The default distrib dir is `./tmp_install/`.
#### workdir (-D)
A directory in the file system, where pageserver will store its files.
The default is `./.zenith/`.
The default is `./.neon/`.
This parameter has a special CLI alias (`-D`) and can not be overridden with regular `-c` way.

View File

@@ -1,91 +1,43 @@
//! A set of primitives to access a shared data/updates, propagated via etcd broker (not persistent).
//! Intended to connect services to each other, not to store their data.
/// All broker keys, that are used when dealing with etcd.
pub mod subscription_key;
/// All broker values, possible to use when dealing with etcd.
pub mod subscription_value;
use std::{
collections::{hash_map, HashMap},
fmt::Display,
str::FromStr,
};
use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
pub use etcd_client::*;
use serde::de::DeserializeOwned;
use subscription_key::SubscriptionKey;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use utils::{
lsn::Lsn,
zid::{NodeId, ZTenantId, ZTenantTimelineId},
};
use utils::zid::{NodeId, ZTenantTimelineId};
use crate::subscription_key::SubscriptionFullKey;
pub use etcd_client::*;
/// Default value to use for prefixing to all etcd keys with.
/// This way allows isolating safekeeper/pageserver groups in the same etcd cluster.
pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
#[derive(Debug, Deserialize, Serialize)]
struct SafekeeperTimeline {
safekeeper_id: NodeId,
info: SkTimelineInfo,
}
/// Published data about safekeeper's timeline. Fields made optional for easy migrations.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub flush_lsn: Option<Lsn>,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub backup_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub remote_consistent_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)]
pub safekeeper_connstr: Option<String>,
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
EtcdClient(etcd_client::Error, String),
#[error("Error during parsing etcd key: {0}")]
InvalidKey(String),
#[error("Error during parsing etcd value: {0}")]
ParsingError(String),
#[error("Internal error: {0}")]
InternalError(String),
}
/// A way to control the data retrieval from a certain subscription.
pub struct SkTimelineSubscription {
safekeeper_timeline_updates:
mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>>>,
kind: SkTimelineSubscriptionKind,
pub struct BrokerSubscription<V> {
value_updates: mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>>,
key: SubscriptionKey,
watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher: Watcher,
}
impl SkTimelineSubscription {
impl<V> BrokerSubscription<V> {
/// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
pub async fn fetch_data(
&mut self,
) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>>> {
self.safekeeper_timeline_updates.recv().await
pub async fn fetch_data(&mut self) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>> {
self.value_updates.recv().await
}
/// Cancels the subscription, stopping the data poller and waiting for it to shut down.
@@ -93,117 +45,90 @@ impl SkTimelineSubscription {
self.watcher.cancel().await.map_err(|e| {
BrokerError::EtcdClient(
e,
format!(
"Failed to cancel timeline subscription, kind: {:?}",
self.kind
),
format!("Failed to cancel broker subscription, kind: {:?}", self.key),
)
})?;
self.watcher_handle.await.map_err(|e| {
BrokerError::InternalError(format!(
"Failed to join the timeline updates task, kind: {:?}, error: {e}",
self.kind
"Failed to join the broker value updates task, kind: {:?}, error: {e}",
self.key
))
})?
}
}
/// The subscription kind to the timeline updates from safekeeper.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SkTimelineSubscriptionKind {
broker_etcd_prefix: String,
kind: SubscriptionKind,
}
impl SkTimelineSubscriptionKind {
pub fn all(broker_etcd_prefix: String) -> Self {
Self {
broker_etcd_prefix,
kind: SubscriptionKind::All,
}
}
pub fn tenant(broker_etcd_prefix: String, tenant: ZTenantId) -> Self {
Self {
broker_etcd_prefix,
kind: SubscriptionKind::Tenant(tenant),
}
}
pub fn timeline(broker_etcd_prefix: String, timeline: ZTenantTimelineId) -> Self {
Self {
broker_etcd_prefix,
kind: SubscriptionKind::Timeline(timeline),
}
}
/// Etcd key to use for watching a certain timeline updates from safekeepers.
pub fn watch_key(&self) -> String {
match self.kind {
SubscriptionKind::All => self.broker_etcd_prefix.to_string(),
SubscriptionKind::Tenant(tenant_id) => {
format!("{}/{tenant_id}/safekeeper", self.broker_etcd_prefix)
}
SubscriptionKind::Timeline(ZTenantTimelineId {
tenant_id,
timeline_id,
}) => format!(
"{}/{tenant_id}/{timeline_id}/safekeeper",
self.broker_etcd_prefix
),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum SubscriptionKind {
/// Get every timeline update.
All,
/// Get certain tenant timelines' updates.
Tenant(ZTenantId),
/// Get certain timeline updates.
Timeline(ZTenantTimelineId),
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
EtcdClient(etcd_client::Error, String),
#[error("Error during parsing etcd key: {0}")]
KeyNotParsed(String),
#[error("Internal error: {0}")]
InternalError(String),
}
/// Creates a background task to poll etcd for timeline updates from safekeepers.
/// Stops and returns `Err` on any error during etcd communication.
/// Watches the key changes until either the watcher is cancelled via etcd or the subscription cancellation handle,
/// exiting normally in such cases.
pub async fn subscribe_to_safekeeper_timeline_updates(
/// Etcd values are parsed as json fukes into a type, specified in the generic patameter.
pub async fn subscribe_for_json_values<V>(
client: &mut Client,
subscription: SkTimelineSubscriptionKind,
) -> Result<SkTimelineSubscription, BrokerError> {
info!("Subscribing to timeline updates, subscription kind: {subscription:?}");
let kind = subscription.clone();
key: SubscriptionKey,
) -> Result<BrokerSubscription<V>, BrokerError>
where
V: DeserializeOwned + Send + 'static,
{
subscribe_for_values(client, key, |_, value_str| {
match serde_json::from_str::<V>(value_str) {
Ok(value) => Some(value),
Err(e) => {
error!("Failed to parse value str '{value_str}': {e}");
None
}
}
})
.await
}
/// Same as [`subscribe_for_json_values`], but allows to specify a custom parser of a etcd value string.
pub async fn subscribe_for_values<P, V>(
client: &mut Client,
key: SubscriptionKey,
value_parser: P,
) -> Result<BrokerSubscription<V>, BrokerError>
where
V: Send + 'static,
P: Fn(SubscriptionFullKey, &str) -> Option<V> + Send + 'static,
{
info!("Subscribing to broker value updates, key: {key:?}");
let subscription_key = key.clone();
let (watcher, mut stream) = client
.watch(
subscription.watch_key(),
Some(WatchOptions::new().with_prefix()),
)
.watch(key.watch_key(), Some(WatchOptions::new().with_prefix()))
.await
.map_err(|e| {
BrokerError::EtcdClient(
e,
format!("Failed to init the watch for subscription {subscription:?}"),
format!("Failed to init the watch for subscription {key:?}"),
)
})?;
let (timeline_updates_sender, safekeeper_timeline_updates) = mpsc::unbounded_channel();
let (value_updates_sender, value_updates_receiver) = mpsc::unbounded_channel();
let watcher_handle = tokio::spawn(async move {
while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!(
"Failed to get messages from the subscription stream, kind: {:?}, error: {e}", subscription.kind
"Failed to get messages from the subscription stream, kind: {:?}, error: {e}", key.kind
)))? {
if resp.canceled() {
info!("Watch for timeline updates subscription was canceled, exiting");
break;
}
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>> = HashMap::new();
let mut value_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, V>> = HashMap::new();
// Keep track that the timeline data updates from etcd arrive in the right order.
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
let mut timeline_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
let mut value_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
let events = resp.events();
@@ -213,182 +138,78 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
if EventType::Put == event.event_type() {
if let Some(new_etcd_kv) = event.kv() {
let new_kv_version = new_etcd_kv.version();
let (key_str, value_str) = match extract_key_value_str(new_etcd_kv) {
Ok(strs) => strs,
Err(e) => {
error!("Failed to represent etcd KV {new_etcd_kv:?} as pair of str: {e}");
continue;
},
};
match parse_safekeeper_timeline(&subscription, key_str, value_str) {
Ok((zttid, timeline)) => {
match timeline_updates
.entry(zttid)
.or_default()
.entry(timeline.safekeeper_id)
{
hash_map::Entry::Occupied(mut o) => {
let old_etcd_kv_version = timeline_etcd_versions.get(&zttid).copied().unwrap_or(i64::MIN);
if old_etcd_kv_version < new_kv_version {
o.insert(timeline.info);
timeline_etcd_versions.insert(zttid,new_kv_version);
} else {
debug!("Skipping etcd timeline update due to older version compared to one that's already stored");
match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) {
Ok(Some((key, value))) => match value_updates
.entry(key.id)
.or_default()
.entry(key.node_id)
{
hash_map::Entry::Occupied(mut o) => {
let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN);
if old_etcd_kv_version < new_kv_version {
o.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
} else {
debug!("Skipping etcd timeline update due to older version compared to one that's already stored");
}
}
}
hash_map::Entry::Vacant(v) => {
v.insert(timeline.info);
timeline_etcd_versions.insert(zttid,new_kv_version);
}
}
}
// it is normal to get other keys when we subscribe to everything
Err(BrokerError::InvalidKey(e)) => debug!("Unexpected key for timeline update: {e}"),
Err(e) => error!("Failed to parse timeline update: {e}"),
hash_map::Entry::Vacant(v) => {
v.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
}
},
Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"),
Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"),
Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"),
};
}
}
}
if let Err(e) = timeline_updates_sender.send(timeline_updates) {
info!("Timeline updates sender got dropped, exiting: {e}");
break;
if !value_updates.is_empty() {
if let Err(e) = value_updates_sender.send(value_updates) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
}
}
}
Ok(())
}.instrument(info_span!("etcd_broker")));
Ok(SkTimelineSubscription {
kind,
safekeeper_timeline_updates,
Ok(BrokerSubscription {
key: subscription_key,
value_updates: value_updates_receiver,
watcher_handle,
watcher,
})
}
fn extract_key_value_str(kv: &KeyValue) -> Result<(&str, &str), BrokerError> {
let key = kv.key_str().map_err(|e| {
fn parse_etcd_kv<P, V>(
kv: &KeyValue,
value_parser: &P,
cluster_prefix: &str,
) -> Result<Option<(SubscriptionFullKey, V)>, BrokerError>
where
P: Fn(SubscriptionFullKey, &str) -> Option<V>,
{
let key_str = kv.key_str().map_err(|e| {
BrokerError::EtcdClient(e, "Failed to extract key str out of etcd KV".to_string())
})?;
let value = kv.value_str().map_err(|e| {
let value_str = kv.value_str().map_err(|e| {
BrokerError::EtcdClient(e, "Failed to extract value str out of etcd KV".to_string())
})?;
Ok((key, value))
}
static SK_TIMELINE_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$")
.expect("wrong regex for safekeeper timeline etcd key")
});
fn parse_safekeeper_timeline(
subscription: &SkTimelineSubscriptionKind,
key_str: &str,
value_str: &str,
) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> {
let broker_prefix = subscription.broker_etcd_prefix.as_str();
if !key_str.starts_with(broker_prefix) {
return Err(BrokerError::InvalidKey(format!(
"KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}"
if !key_str.starts_with(cluster_prefix) {
return Err(BrokerError::KeyNotParsed(format!(
"KV has unexpected key '{key_str}' that does not start with cluster prefix {cluster_prefix}"
)));
}
let key_part = &key_str[broker_prefix.len()..];
let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) {
Some(captures) => captures,
None => {
return Err(BrokerError::InvalidKey(format!(
"KV has unexpected key part '{key_part}' that does not match required regex {}",
SK_TIMELINE_KEY_REGEX.as_str()
)));
}
};
let info = serde_json::from_str(value_str).map_err(|e| {
BrokerError::ParsingError(format!(
"Failed to parse '{value_str}' as safekeeper timeline info: {e}"
))
let key = SubscriptionFullKey::from_str(&key_str[cluster_prefix.len()..]).map_err(|e| {
BrokerError::KeyNotParsed(format!("Failed to parse KV key '{key_str}': {e}"))
})?;
let zttid = ZTenantTimelineId::new(
parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?,
);
let safekeeper_id = NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?);
Ok((
zttid,
SafekeeperTimeline {
safekeeper_id,
info,
},
))
}
fn parse_capture<T>(caps: &Captures, index: usize) -> Result<T, String>
where
T: FromStr,
<T as FromStr>::Err: Display,
{
let capture_match = caps
.get(index)
.ok_or_else(|| format!("Failed to get capture match at index {index}"))?
.as_str();
capture_match.parse().map_err(|e| {
format!(
"Failed to parse {} from {capture_match}: {e}",
std::any::type_name::<T>()
)
})
}
#[cfg(test)]
mod tests {
use utils::zid::ZTimelineId;
use super::*;
#[test]
fn typical_etcd_prefix_should_be_parsed() {
let prefix = "neon";
let tenant_id = ZTenantId::generate();
let timeline_id = ZTimelineId::generate();
let all_subscription = SkTimelineSubscriptionKind {
broker_etcd_prefix: prefix.to_string(),
kind: SubscriptionKind::All,
};
let tenant_subscription = SkTimelineSubscriptionKind {
broker_etcd_prefix: prefix.to_string(),
kind: SubscriptionKind::Tenant(tenant_id),
};
let timeline_subscription = SkTimelineSubscriptionKind {
broker_etcd_prefix: prefix.to_string(),
kind: SubscriptionKind::Timeline(ZTenantTimelineId::new(tenant_id, timeline_id)),
};
let typical_etcd_kv_strs = [
(
format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/1"),
r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#,
),
(
format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/13"),
r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#,
),
];
for (key_string, value_str) in typical_etcd_kv_strs {
for subscription in [
&all_subscription,
&tenant_subscription,
&timeline_subscription,
] {
let (id, _timeline) =
parse_safekeeper_timeline(subscription, &key_string, value_str)
.unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}"));
assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id));
}
}
}
Ok(value_parser(key, value_str).map(|value| (key, value)))
}

View File

@@ -0,0 +1,310 @@
//! Etcd broker keys, used in the project and shared between instances.
//! The keys are split into two categories:
//!
//! * [`SubscriptionFullKey`] full key format: `<cluster_prefix>/<tenant>/<timeline>/<node_kind>/<operation>/<node_id>`
//! Always returned from etcd in this form, always start with the user key provided.
//!
//! * [`SubscriptionKey`] user input key format: always partial, since it's unknown which `node_id`'s are available.
//! Full key always starts with the user input one, due to etcd subscription properties.
use std::{fmt::Display, str::FromStr};
use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId};
/// The subscription kind to the timeline updates from safekeeper.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SubscriptionKey {
/// Generic cluster prefix, allowing to use the same etcd instance by multiple logic groups.
pub cluster_prefix: String,
/// The subscription kind.
pub kind: SubscriptionKind,
}
/// All currently possible key kinds of a etcd broker subscription.
/// Etcd works so, that every key that starts with the subbscription key given is considered matching and
/// returned as part of the subscrption.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SubscriptionKind {
/// Get every update in etcd.
All,
/// Get etcd updates for any timeiline of a certain tenant, affected by any operation from any node kind.
TenantTimelines(ZTenantId),
/// Get etcd updates for a certain timeline of a tenant, affected by any operation from any node kind.
Timeline(ZTenantTimelineId),
/// Get etcd timeline updates, specific to a certain node kind.
Node(ZTenantTimelineId, NodeKind),
/// Get etcd timeline updates for a certain operation on specific nodes.
Operation(ZTenantTimelineId, NodeKind, OperationKind),
}
/// All kinds of nodes, able to write into etcd.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeKind {
Safekeeper,
Pageserver,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationKind {
Safekeeper(SkOperationKind),
}
/// Current operations, running inside the safekeeper node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SkOperationKind {
TimelineInfo,
WalBackup,
}
static SUBSCRIPTION_FULL_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/([^/]+)/([^/]+)/([[:digit:]]+)$")
.expect("wrong subscription full etcd key regex")
});
/// Full key, received from etcd during any of the component's work.
/// No other etcd keys are considered during system's work.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionFullKey {
pub id: ZTenantTimelineId,
pub node_kind: NodeKind,
pub operation: OperationKind,
pub node_id: NodeId,
}
impl SubscriptionKey {
/// Subscribes for all etcd updates.
pub fn all(cluster_prefix: String) -> Self {
SubscriptionKey {
cluster_prefix,
kind: SubscriptionKind::All,
}
}
/// Subscribes to a given timeline info updates from safekeepers.
pub fn sk_timeline_info(cluster_prefix: String, timeline: ZTenantTimelineId) -> Self {
Self {
cluster_prefix,
kind: SubscriptionKind::Operation(
timeline,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::TimelineInfo),
),
}
}
/// Subscribes to all timeine updates during specific operations, running on the corresponding nodes.
pub fn operation(
cluster_prefix: String,
timeline: ZTenantTimelineId,
node_kind: NodeKind,
operation: OperationKind,
) -> Self {
Self {
cluster_prefix,
kind: SubscriptionKind::Operation(timeline, node_kind, operation),
}
}
/// Etcd key to use for watching a certain timeline updates from safekeepers.
pub fn watch_key(&self) -> String {
let cluster_prefix = &self.cluster_prefix;
match self.kind {
SubscriptionKind::All => cluster_prefix.to_string(),
SubscriptionKind::TenantTimelines(tenant_id) => {
format!("{cluster_prefix}/{tenant_id}")
}
SubscriptionKind::Timeline(id) => {
format!("{cluster_prefix}/{id}")
}
SubscriptionKind::Node(id, node_kind) => {
format!("{cluster_prefix}/{id}/{node_kind}")
}
SubscriptionKind::Operation(id, node_kind, operation_kind) => {
format!("{cluster_prefix}/{id}/{node_kind}/{operation_kind}")
}
}
}
}
impl Display for OperationKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OperationKind::Safekeeper(o) => o.fmt(f),
}
}
}
impl FromStr for OperationKind {
type Err = String;
fn from_str(operation_kind_str: &str) -> Result<Self, Self::Err> {
match operation_kind_str {
"timeline_info" => Ok(OperationKind::Safekeeper(SkOperationKind::TimelineInfo)),
"wal_backup" => Ok(OperationKind::Safekeeper(SkOperationKind::WalBackup)),
_ => Err(format!("Unknown operation kind: {operation_kind_str}")),
}
}
}
impl Display for SubscriptionFullKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
id,
node_kind,
operation,
node_id,
} = self;
write!(f, "{id}/{node_kind}/{operation}/{node_id}")
}
}
impl FromStr for SubscriptionFullKey {
type Err = String;
fn from_str(subscription_kind_str: &str) -> Result<Self, Self::Err> {
let key_captures = match SUBSCRIPTION_FULL_KEY_REGEX.captures(subscription_kind_str) {
Some(captures) => captures,
None => {
return Err(format!(
"Subscription kind str does not match a subscription full key regex {}",
SUBSCRIPTION_FULL_KEY_REGEX.as_str()
));
}
};
Ok(Self {
id: ZTenantTimelineId::new(
parse_capture(&key_captures, 1)?,
parse_capture(&key_captures, 2)?,
),
node_kind: parse_capture(&key_captures, 3)?,
operation: parse_capture(&key_captures, 4)?,
node_id: NodeId(parse_capture(&key_captures, 5)?),
})
}
}
fn parse_capture<T>(caps: &Captures, index: usize) -> Result<T, String>
where
T: FromStr,
<T as FromStr>::Err: Display,
{
let capture_match = caps
.get(index)
.ok_or_else(|| format!("Failed to get capture match at index {index}"))?
.as_str();
capture_match.parse().map_err(|e| {
format!(
"Failed to parse {} from {capture_match}: {e}",
std::any::type_name::<T>()
)
})
}
impl Display for NodeKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Safekeeper => write!(f, "safekeeper"),
Self::Pageserver => write!(f, "pageserver"),
}
}
}
impl FromStr for NodeKind {
type Err = String;
fn from_str(node_kind_str: &str) -> Result<Self, Self::Err> {
match node_kind_str {
"safekeeper" => Ok(Self::Safekeeper),
"pageserver" => Ok(Self::Pageserver),
_ => Err(format!("Invalid node kind: {node_kind_str}")),
}
}
}
impl Display for SkOperationKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TimelineInfo => write!(f, "timeline_info"),
Self::WalBackup => write!(f, "wal_backup"),
}
}
}
impl FromStr for SkOperationKind {
type Err = String;
fn from_str(operation_str: &str) -> Result<Self, Self::Err> {
match operation_str {
"timeline_info" => Ok(Self::TimelineInfo),
"wal_backup" => Ok(Self::WalBackup),
_ => Err(format!("Invalid operation: {operation_str}")),
}
}
}
#[cfg(test)]
mod tests {
use utils::zid::ZTimelineId;
use super::*;
#[test]
fn full_cluster_key_parsing() {
let prefix = "neon";
let node_kind = NodeKind::Safekeeper;
let operation_kind = OperationKind::Safekeeper(SkOperationKind::WalBackup);
let tenant_id = ZTenantId::generate();
let timeline_id = ZTimelineId::generate();
let id = ZTenantTimelineId::new(tenant_id, timeline_id);
let node_id = NodeId(1);
let timeline_subscription_keys = [
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::All,
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::TenantTimelines(tenant_id),
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::Timeline(id),
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::Node(id, node_kind),
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::Operation(id, node_kind, operation_kind),
},
];
let full_key_string = format!(
"{}/{node_id}",
timeline_subscription_keys.last().unwrap().watch_key()
);
for key in timeline_subscription_keys {
assert!(full_key_string.starts_with(&key.watch_key()), "Full key '{full_key_string}' should start with any of the keys, keys, but {key:?} did not match");
}
let full_key = SubscriptionFullKey::from_str(&full_key_string).unwrap_or_else(|e| {
panic!("Failed to parse {full_key_string} as a subscription full key: {e}")
});
assert_eq!(
full_key,
SubscriptionFullKey {
id,
node_kind,
operation: operation_kind,
node_id
}
)
}
}

View File

@@ -0,0 +1,35 @@
//! Module for the values to put into etcd.
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::lsn::Lsn;
/// Data about safekeeper's timeline. Fields made optional for easy migrations.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub flush_lsn: Option<Lsn>,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub backup_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub remote_consistent_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,
}

View File

@@ -4,6 +4,7 @@ use log::*;
use postgres::types::PgLsn;
use postgres::Client;
use std::cmp::Ordering;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Instant;
@@ -69,6 +70,12 @@ impl Conf {
pub fn start_server(&self) -> Result<PostgresServer> {
info!("Starting Postgres server in {:?}", self.datadir);
let log_file = fs::File::create(self.datadir.join("pg.log")).with_context(|| {
format!(
"Failed to create pg.log file in directory {}",
self.datadir.display()
)
})?;
let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
let unix_socket_dir_path = unix_socket_dir.path().to_owned();
let server_process = self
@@ -84,7 +91,7 @@ impl Conf {
// Disable background processes as much as possible
.args(&["-c", "wal_writer_delay=10s"])
.args(&["-c", "autovacuum=off"])
.stderr(Stdio::null())
.stderr(Stdio::from(log_file))
.spawn()?;
let server = PostgresServer {
process: server_process,

View File

@@ -13,13 +13,10 @@ use std::fmt;
use std::io::{self, Write};
use std::net::{Shutdown, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
static PGBACKEND_SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
pub trait Handler {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
@@ -45,6 +42,10 @@ pub trait Handler {
fn check_auth_jwt(&mut self, _pgb: &mut PostgresBackend, _jwt_response: &[u8]) -> Result<()> {
bail!("JWT auth failed")
}
fn is_shutdown_requested(&self) -> bool {
false
}
}
/// PostgresBackend protocol state.
@@ -274,7 +275,7 @@ impl PostgresBackend {
let mut unnamed_query_string = Bytes::new();
while !PGBACKEND_SHUTDOWN_REQUESTED.load(Ordering::Relaxed) {
while !handler.is_shutdown_requested() {
match self.read_message() {
Ok(message) => {
if let Some(msg) = message {
@@ -493,8 +494,3 @@ impl PostgresBackend {
Ok(ProcessMsgResult::Continue)
}
}
// Set the flag to inform connections to cancel
pub fn set_pgbackend_shutdown_requested() {
PGBACKEND_SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed);
}

View File

@@ -926,10 +926,10 @@ impl<'a> BeMessage<'a> {
}
}
// Zenith extension of postgres replication protocol
// See ZENITH_STATUS_UPDATE_TAG_BYTE
// Neon extension of postgres replication protocol
// See NEON_STATUS_UPDATE_TAG_BYTE
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ZenithFeedback {
pub struct ReplicationFeedback {
// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
// Parts of StandbyStatusUpdate we resend to compute via safekeeper
@@ -939,13 +939,13 @@ pub struct ZenithFeedback {
pub ps_replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback.
// NOTE: Do not forget to increment this number when adding new fields to ReplicationFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const ZENITH_FEEDBACK_FIELDS_NUMBER: u8 = 5;
pub const REPLICATION_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl ZenithFeedback {
pub fn empty() -> ZenithFeedback {
ZenithFeedback {
impl ReplicationFeedback {
pub fn empty() -> ReplicationFeedback {
ReplicationFeedback {
current_timeline_size: 0,
ps_writelsn: 0,
ps_applylsn: 0,
@@ -954,7 +954,7 @@ impl ZenithFeedback {
}
}
// Serialize ZenithFeedback using custom format
// Serialize ReplicationFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
@@ -965,7 +965,7 @@ impl ZenithFeedback {
// uint32 - value length in bytes
// value itself
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
buf.put_u8(ZENITH_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_u8(REPLICATION_FEEDBACK_FIELDS_NUMBER); // # of keys
write_cstr(&Bytes::from("current_timeline_size"), buf)?;
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
@@ -992,9 +992,9 @@ impl ZenithFeedback {
Ok(())
}
// Deserialize ZenithFeedback message
pub fn parse(mut buf: Bytes) -> ZenithFeedback {
let mut zf = ZenithFeedback::empty();
// Deserialize ReplicationFeedback message
pub fn parse(mut buf: Bytes) -> ReplicationFeedback {
let mut zf = ReplicationFeedback::empty();
let nfields = buf.get_u8();
let mut i = 0;
while i < nfields {
@@ -1035,14 +1035,14 @@ impl ZenithFeedback {
_ => {
let len = buf.get_i32();
warn!(
"ZenithFeedback parse. unknown key {} of len {}. Skip it.",
"ReplicationFeedback parse. unknown key {} of len {}. Skip it.",
key, len
);
buf.advance(len as usize);
}
}
}
trace!("ZenithFeedback parsed is {:?}", zf);
trace!("ReplicationFeedback parsed is {:?}", zf);
zf
}
}
@@ -1052,8 +1052,8 @@ mod tests {
use super::*;
#[test]
fn test_zenithfeedback_serialization() {
let mut zf = ZenithFeedback::empty();
fn test_replication_feedback_serialization() {
let mut zf = ReplicationFeedback::empty();
// Fill zf with some values
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
@@ -1062,13 +1062,13 @@ mod tests {
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();
let zf_parsed = ZenithFeedback::parse(data.freeze());
let zf_parsed = ReplicationFeedback::parse(data.freeze());
assert_eq!(zf, zf_parsed);
}
#[test]
fn test_zenithfeedback_unknown_key() {
let mut zf = ZenithFeedback::empty();
fn test_replication_feedback_unknown_key() {
let mut zf = ReplicationFeedback::empty();
// Fill zf with some values
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
@@ -1079,7 +1079,7 @@ mod tests {
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = ZENITH_FEEDBACK_FIELDS_NUMBER + 1;
*first = REPLICATION_FEEDBACK_FIELDS_NUMBER + 1;
}
write_cstr(&Bytes::from("new_field_one"), &mut data).unwrap();
@@ -1087,7 +1087,7 @@ mod tests {
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let zf_parsed = ZenithFeedback::parse(data.freeze());
let zf_parsed = ReplicationFeedback::parse(data.freeze());
assert_eq!(zf, zf_parsed);
}

View File

@@ -69,7 +69,7 @@ Repository
The repository stores all the page versions, or WAL records needed to
reconstruct them. Each tenant has a separate Repository, which is
stored in the .zenith/tenants/<tenantid> directory.
stored in the .neon/tenants/<tenantid> directory.
Repository is an abstract trait, defined in `repository.rs`. It is
implemented by the LayeredRepository object in
@@ -92,7 +92,7 @@ Each repository also has a WAL redo manager associated with it, see
records, whenever we need to reconstruct a page version from WAL to
satisfy a GetPage@LSN request, or to avoid accumulating too much WAL
for a page. The WAL redo manager uses a Postgres process running in
special zenith wal-redo mode to do the actual WAL redo, and
special Neon wal-redo mode to do the actual WAL redo, and
communicates with the process using a pipe.

View File

@@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".neon"));
let workdir = workdir
.canonicalize()
.with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;

View File

@@ -22,8 +22,6 @@ use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;
use postgres::CopyOutReader;
///
/// Import all relation data pages from local disk into the repository.
///
@@ -405,126 +403,3 @@ fn import_wal<R: Repository>(
Ok(())
}
pub fn import_timeline_from_tar<R: Repository>(
tline: &mut DatadirTimeline<R>,
copyreader: CopyOutReader,
lsn: Lsn,
) -> Result<()> {
let mut ar = tar::Archive::new(copyreader);
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
for e in ar.entries().unwrap() {
let mut entry = e.unwrap();
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
let mut buffer = Vec::new();
// read the whole entry
entry.read_to_end(&mut buffer).unwrap();
import_file(&mut modification, &file_path.as_ref(), &buffer)?;
}
tar::EntryType::Directory => {
println!("tar::EntryType::Directory {}", file_path.display());
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
}
Ok(())
}
pub fn import_file<R: Repository>(
modification: &mut DatadirModification<R>,
file_path: &Path,
buffer: &[u8],
) -> Result<()> {
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_control" => {
println!("pg_control file {}", file_path.display());
// Import it as ControlFile
modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&buffer)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
}
"pg_filenode.map" => {
("pg_filenode.map file {}", file_path.display());
}
_ => {
println!("global relfile {}", file_path.display());
//TODO
}
}
} else if file_path.starts_with("base") {
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
let dbnode: u32 = file_path
.iter()
.skip(1)
.next()
.unwrap()
.to_string_lossy()
.parse()
.unwrap();
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_filenode.map" => {
println!(
"dbnode {} pg_filenode.map file {}",
dbnode,
file_path.display()
);
modification.put_relmap_file(
spcnode,
dbnode,
Bytes::copy_from_slice(&buffer[..]),
)?;
}
_ => {
println!("dbnode {} relfile {}", dbnode, file_path.display());
//TODO
}
}
} else if file_path.starts_with("pg_xact") {
println!(
"pg_xact {} ",
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
// TODO
} else if file_path.starts_with("pg_multixact/offset") {
println!(
"pg_multixact/offset {}",
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
// TODO
} else if file_path.starts_with("pg_multixact/members") {
println!(
"pg_multixact/members {}",
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
// TODO
} else if file_path.starts_with("pg_twophase") {
let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
println!(
"xid {} pg_twophase {}",
xid,
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
}
Ok(())
}

View File

@@ -4,7 +4,7 @@
//! The functions here are responsible for locating the correct layer for the
//! get/put call, tracing timeline branching history as needed.
//!
//! The files are stored in the .zenith/tenants/<tenantid>/timelines/<timelineid>
//! The files are stored in the .neon/tenants/<tenantid>/timelines/<timelineid>
//! directory. See layered_repository/README for how the files are managed.
//! In addition to the layer files, there is a metadata file in the same
//! directory that contains information about the timeline, in particular its
@@ -148,7 +148,7 @@ lazy_static! {
.expect("failed to define a metric");
}
/// Parts of the `.zenith/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
/// Parts of the `.neon/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
///

View File

@@ -123,7 +123,7 @@ The files are called "layer files". Each layer file covers a range of keys, and
a range of LSNs (or a single LSN, in case of image layers). You can think of it
as a rectangle in the two-dimensional key-LSN space. The layer files for each
timeline are stored in the timeline's subdirectory under
`.zenith/tenants/<tenantid>/timelines`.
`.neon/tenants/<tenantid>/timelines`.
There are two kind of layer files: images, and delta layers. An image file
contains a snapshot of all keys at a particular LSN, whereas a delta file
@@ -178,7 +178,7 @@ version, and how branching and GC works is still valid.
The full path of a delta file looks like this:
```
.zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000
.neon/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000
```
For simplicity, the examples below use a simplified notation for the

View File

@@ -24,7 +24,6 @@ pub mod walredo;
use lazy_static::lazy_static;
use tracing::info;
use utils::postgres_backend;
use crate::thread_mgr::ThreadKind;
use metrics::{register_int_gauge_vec, IntGaugeVec};
@@ -73,7 +72,6 @@ pub fn shutdown_pageserver(exit_code: i32) {
thread_mgr::shutdown_threads(Some(ThreadKind::LibpqEndpointListener), None, None);
// Shut down any page service threads.
postgres_backend::set_pgbackend_shutdown_requested();
thread_mgr::shutdown_threads(Some(ThreadKind::PageRequestHandler), None, None);
// Shut down all the tenants. This flushes everything to disk and kills

View File

@@ -370,6 +370,10 @@ impl PageServerHandler {
) -> anyhow::Result<()> {
let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered();
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
thread_mgr::associate_with(Some(tenantid), Some(timelineid));
// Check that the timeline exists
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
@@ -672,6 +676,10 @@ impl postgres_backend::Handler for PageServerHandler {
Ok(())
}
fn is_shutdown_requested(&self) -> bool {
thread_mgr::is_shutdown_requested()
}
fn process_query(
&mut self,
pgb: &mut PostgresBackend,
@@ -802,7 +810,6 @@ impl postgres_backend::Handler for PageServerHandler {
.map(|h| h.as_str().parse())
.unwrap_or_else(|| Ok(repo.get_gc_horizon()))?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// Use tenant's pitr setting
let pitr = repo.get_pitr_interval();
let result = repo.gc_iteration(Some(timelineid), gc_horizon, pitr, true)?;

View File

@@ -197,7 +197,7 @@ impl Display for TimelineSyncStatusUpdate {
}
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
type Timeline: Timeline;

View File

@@ -186,8 +186,8 @@ use crate::{
};
use metrics::{
register_histogram_vec, register_int_counter, register_int_gauge, HistogramVec, IntCounter,
IntGauge,
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
HistogramVec, IntCounter, IntCounterVec, IntGauge,
};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -208,14 +208,17 @@ lazy_static! {
static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!(
"pageserver_remote_storage_image_sync_seconds",
"Time took to synchronize (download or upload) a whole pageserver image. \
Grouped by `operation_kind` (upload|download) and `status` (success|failure)",
&["operation_kind", "status"],
vec![
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 12.5, 15.0, 17.5, 20.0
]
Grouped by tenant and timeline ids, `operation_kind` (upload|download) and `status` (success|failure)",
&["tenant_id", "timeline_id", "operation_kind", "status"],
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 3.0, 10.0, 20.0]
)
.expect("failed to register pageserver image sync time histogram vec");
static ref REMOTE_INDEX_UPLOAD: IntCounterVec = register_int_counter_vec!(
"pageserver_remote_storage_remote_index_uploads_total",
"Number of remote index uploads",
&["tenant_id", "timeline_id"],
)
.expect("failed to register pageserver remote index upload vec");
}
static SYNC_QUEUE: OnceCell<SyncQueue> = OnceCell::new();
@@ -1146,19 +1149,19 @@ where
.await
{
DownloadedTimeline::Abort => {
register_sync_status(sync_start, task_name, None);
register_sync_status(sync_id, sync_start, task_name, None);
if let Err(e) = index.write().await.set_awaits_download(&sync_id, false) {
error!("Timeline {sync_id} was expected to be in the remote index after a download attempt, but it's absent: {e:?}");
}
}
DownloadedTimeline::FailedAndRescheduled => {
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
DownloadedTimeline::Successful(mut download_data) => {
match update_local_metadata(conf, sync_id, current_remote_timeline).await {
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_start, task_name, Some(true));
register_sync_status(sync_id, sync_start, task_name, Some(true));
return Some(TimelineSyncStatusUpdate::Downloaded);
}
Err(e) => {
@@ -1169,7 +1172,7 @@ where
error!("Failed to update local timeline metadata: {e:?}");
download_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Download(download_data));
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
}
}
@@ -1265,14 +1268,14 @@ async fn delete_timeline_data<P, S>(
error!("Failed to update remote timeline {sync_id}: {e:?}");
new_delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(new_delete_data));
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
return;
}
}
timeline_delete.deletion_registered = true;
let sync_status = delete_timeline_layers(storage, sync_queue, sync_id, new_delete_data).await;
register_sync_status(sync_start, task_name, Some(sync_status));
register_sync_status(sync_id, sync_start, task_name, Some(sync_status));
}
async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result<TimelineMetadata> {
@@ -1306,7 +1309,7 @@ async fn upload_timeline_data<P, S>(
.await
{
UploadedTimeline::FailedAndRescheduled => {
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
return;
}
UploadedTimeline::Successful(upload_data) => upload_data,
@@ -1325,13 +1328,13 @@ async fn upload_timeline_data<P, S>(
.await
{
Ok(()) => {
register_sync_status(sync_start, task_name, Some(true));
register_sync_status(sync_id, sync_start, task_name, Some(true));
}
Err(e) => {
error!("Failed to update remote timeline {sync_id}: {e:?}");
uploaded_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
}
}
@@ -1421,7 +1424,14 @@ where
IndexPart::from_remote_timeline(&timeline_path, updated_remote_timeline)
.context("Failed to create an index part from the updated remote timeline")?;
info!("Uploading remote index for the timeline");
debug!("Uploading remote index for the timeline");
REMOTE_INDEX_UPLOAD
.with_label_values(&[
&sync_id.tenant_id.to_string(),
&sync_id.timeline_id.to_string(),
])
.inc();
upload_index_part(conf, storage, sync_id, new_index_part)
.await
.context("Failed to upload new index part")
@@ -1590,12 +1600,24 @@ fn compare_local_and_remote_timeline(
(initial_timeline_status, awaits_download)
}
fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option<bool>) {
fn register_sync_status(
sync_id: ZTenantTimelineId,
sync_start: Instant,
sync_name: &str,
sync_status: Option<bool>,
) {
let secs_elapsed = sync_start.elapsed().as_secs_f64();
info!("Processed a sync task in {secs_elapsed:.2} seconds");
debug!("Processed a sync task in {secs_elapsed:.2} seconds");
let tenant_id = sync_id.tenant_id.to_string();
let timeline_id = sync_id.timeline_id.to_string();
match sync_status {
Some(true) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "success"]),
Some(false) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "failure"]),
Some(true) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "success"])
}
Some(false) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "failure"])
}
None => return,
}
.observe(secs_elapsed)

View File

@@ -4,6 +4,7 @@ use std::{fmt::Debug, path::PathBuf};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use remote_storage::RemoteStorage;
use tokio::fs;
use tracing::{debug, error, info, warn};
@@ -17,6 +18,16 @@ use super::{
use crate::{
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
};
use metrics::{register_int_counter_vec, IntCounterVec};
lazy_static! {
static ref NO_LAYERS_UPLOAD: IntCounterVec = register_int_counter_vec!(
"pageserver_remote_storage_no_layers_uploads_total",
"Number of skipped uploads due to no layers",
&["tenant_id", "timeline_id"],
)
.expect("failed to register pageserver no layers upload vec");
}
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<P, S>(
@@ -102,7 +113,13 @@ where
.collect::<Vec<_>>();
if layers_to_upload.is_empty() {
info!("No layers to upload after filtering, aborting");
debug!("No layers to upload after filtering, aborting");
NO_LAYERS_UPLOAD
.with_label_values(&[
&sync_id.tenant_id.to_string(),
&sync_id.timeline_id.to_string(),
])
.inc();
return UploadedTimeline::Successful(upload_data);
}

View File

@@ -108,15 +108,21 @@ pub enum ThreadKind {
StorageSync,
}
struct MutableThreadState {
/// Tenant and timeline that this thread is associated with.
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
/// Handle for waiting for the thread to exit. It can be None, if the
/// the thread has already exited.
join_handle: Option<JoinHandle<()>>,
}
struct PageServerThread {
_thread_id: u64,
kind: ThreadKind,
/// Tenant and timeline that this thread is associated with.
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
name: String,
// To request thread shutdown, set the flag, and send a dummy message to the
@@ -124,9 +130,7 @@ struct PageServerThread {
shutdown_requested: AtomicBool,
shutdown_tx: watch::Sender<()>,
/// Handle for waiting for the thread to exit. It can be None, if the
/// the thread has already exited.
join_handle: Mutex<Option<JoinHandle<()>>>,
mutable: Mutex<MutableThreadState>,
}
/// Launch a new thread
@@ -145,29 +149,27 @@ where
{
let (shutdown_tx, shutdown_rx) = watch::channel(());
let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
let thread = PageServerThread {
let thread = Arc::new(PageServerThread {
_thread_id: thread_id,
kind,
tenant_id,
timeline_id,
name: name.to_string(),
shutdown_requested: AtomicBool::new(false),
shutdown_tx,
join_handle: Mutex::new(None),
};
let thread_rc = Arc::new(thread);
let mut jh_guard = thread_rc.join_handle.lock().unwrap();
mutable: Mutex::new(MutableThreadState {
tenant_id,
timeline_id,
join_handle: None,
}),
});
THREADS
.lock()
.unwrap()
.insert(thread_id, Arc::clone(&thread_rc));
.insert(thread_id, Arc::clone(&thread));
let thread_rc2 = Arc::clone(&thread_rc);
let mut thread_mut = thread.mutable.lock().unwrap();
let thread_cloned = Arc::clone(&thread);
let thread_name = name.to_string();
let join_handle = match thread::Builder::new()
.name(name.to_string())
@@ -175,7 +177,7 @@ where
thread_wrapper(
thread_name,
thread_id,
thread_rc2,
thread_cloned,
shutdown_rx,
shutdown_process_on_error,
f,
@@ -189,8 +191,8 @@ where
return Err(err);
}
};
*jh_guard = Some(join_handle);
drop(jh_guard);
thread_mut.join_handle = Some(join_handle);
drop(thread_mut);
// The thread is now running. Nothing more to do here
Ok(thread_id)
@@ -229,19 +231,20 @@ fn thread_wrapper<F>(
.remove(&thread_id)
.expect("no thread in registry");
let thread_mut = thread.mutable.lock().unwrap();
match result {
Ok(Ok(())) => debug!("Thread '{}' exited normally", thread_name),
Ok(Err(err)) => {
if shutdown_process_on_error {
error!(
"Shutting down: thread '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
shutdown_pageserver(1);
} else {
error!(
"Thread '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
}
}
@@ -249,19 +252,29 @@ fn thread_wrapper<F>(
if shutdown_process_on_error {
error!(
"Shutting down: thread '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
shutdown_pageserver(1);
} else {
error!(
"Thread '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
}
}
}
}
// expected to be called from the thread of the given id.
pub fn associate_with(tenant_id: Option<ZTenantId>, timeline_id: Option<ZTimelineId>) {
CURRENT_THREAD.with(|ct| {
let borrowed = ct.borrow();
let mut thread_mut = borrowed.as_ref().unwrap().mutable.lock().unwrap();
thread_mut.tenant_id = tenant_id;
thread_mut.timeline_id = timeline_id;
});
}
/// Is there a thread running that matches the criteria
/// Signal and wait for threads to shut down.
@@ -285,9 +298,10 @@ pub fn shutdown_threads(
let threads = THREADS.lock().unwrap();
for thread in threads.values() {
let thread_mut = thread.mutable.lock().unwrap();
if (kind.is_none() || Some(thread.kind) == kind)
&& (tenant_id.is_none() || thread.tenant_id == tenant_id)
&& (timeline_id.is_none() || thread.timeline_id == timeline_id)
&& (tenant_id.is_none() || thread_mut.tenant_id == tenant_id)
&& (timeline_id.is_none() || thread_mut.timeline_id == timeline_id)
{
thread.shutdown_requested.store(true, Ordering::Relaxed);
// FIXME: handle error?
@@ -298,8 +312,10 @@ pub fn shutdown_threads(
drop(threads);
for thread in victim_threads {
let mut thread_mut = thread.mutable.lock().unwrap();
info!("waiting for {} to shut down", thread.name);
if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {
if let Some(join_handle) = thread_mut.join_handle.take() {
drop(thread_mut);
let _ = join_handle.join();
} else {
// The thread had not even fully started yet. Or it was shut down

View File

@@ -50,7 +50,10 @@ use crate::thread_mgr::ThreadKind;
use crate::{thread_mgr, DatadirTimelineImpl};
use anyhow::{ensure, Context};
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{Client, SkTimelineInfo, SkTimelineSubscription, SkTimelineSubscriptionKind};
use etcd_broker::{
subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription,
Client,
};
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::cell::Cell;
@@ -68,7 +71,7 @@ use tokio::{
use tracing::*;
use url::Url;
use utils::lsn::Lsn;
use utils::pq_proto::ZenithFeedback;
use utils::pq_proto::ReplicationFeedback;
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId};
use self::connection_handler::{WalConnectionEvent, WalReceiverConnection};
@@ -403,7 +406,7 @@ async fn timeline_wal_broker_loop_step(
// Endlessly try to subscribe for broker updates for a given timeline.
// If there are no safekeepers to maintain the lease, the timeline subscription will be inavailable in the broker and the operation will fail constantly.
// This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway.
let mut broker_subscription: SkTimelineSubscription;
let mut broker_subscription: BrokerSubscription<SkTimelineInfo>;
let mut attempt = 0;
loop {
select! {
@@ -420,9 +423,9 @@ async fn timeline_wal_broker_loop_step(
info!("Broker subscription loop cancelled, shutting down");
return Ok(ControlFlow::Break(()));
},
new_subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates(
new_subscription = etcd_broker::subscribe_for_json_values(
etcd_client,
SkTimelineSubscriptionKind::timeline(broker_prefix.to_owned(), id),
SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id),
)
.instrument(info_span!("etcd_subscription")) => match new_subscription {
Ok(new_subscription) => {
@@ -518,7 +521,7 @@ struct WalConnectionData {
safekeeper_id: NodeId,
connection: WalReceiverConnection,
connection_init_time: NaiveDateTime,
last_wal_receiver_data: Option<(ZenithFeedback, NaiveDateTime)>,
last_wal_receiver_data: Option<(ReplicationFeedback, NaiveDateTime)>,
}
#[derive(Debug, PartialEq, Eq)]
@@ -843,7 +846,7 @@ mod tests {
.await;
let now = Utc::now().naive_utc();
dummy_connection_data.last_wal_receiver_data = Some((
ZenithFeedback {
ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: 1,
ps_applylsn: current_lsn,
@@ -1014,7 +1017,7 @@ mod tests {
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
dummy_connection_data.last_wal_receiver_data = Some((
ZenithFeedback {
ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: current_lsn.0,
ps_applylsn: 1,

View File

@@ -19,7 +19,7 @@ use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantTimelineId},
};
@@ -33,7 +33,7 @@ use crate::{
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ZenithFeedback),
NewWal(ReplicationFeedback),
End(Result<(), String>),
}
@@ -328,7 +328,7 @@ async fn handle_walreceiver_connection(
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.
let zenith_status_update = ZenithFeedback {
let zenith_status_update = ReplicationFeedback {
current_timeline_size: timeline.get_current_logical_size() as u64,
ps_writelsn: write_lsn,
ps_flushlsn: flush_lsn,

View File

@@ -10,6 +10,7 @@ use remote_storage::RemoteStorageConfig;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use tokio::sync::mpsc;
use toml_edit::Document;
@@ -27,6 +28,7 @@ use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::SafeKeeperConf;
use utils::auth::JwtAuth;
use utils::{
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
zid::NodeId,
@@ -132,6 +134,12 @@ fn main() -> anyhow::Result<()> {
.default_missing_value("true")
.help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."),
)
.arg(
Arg::new("auth-validation-public-key-path")
.long("auth-validation-public-key-path")
.takes_value(true)
.help("Path to an RSA .pem public key which is used to check JWT tokens")
)
.get_matches();
if let Some(addr) = arg_matches.value_of("dump-control-file") {
@@ -204,6 +212,10 @@ fn main() -> anyhow::Result<()> {
.parse()
.context("failed to parse bool enable-s3-offload bool")?;
conf.auth_validation_public_key_path = arg_matches
.value_of("auth-validation-public-key-path")
.map(PathBuf::from);
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
}
@@ -239,6 +251,19 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
e
})?;
let auth = match conf.auth_validation_public_key_path.as_ref() {
None => {
info!("Auth is disabled");
None
}
Some(path) => {
info!("Loading JWT auth key from {}", path.display());
Some(Arc::new(
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
))
}
};
// XXX: Don't spawn any threads before daemonizing!
if conf.daemonize {
info!("daemonizing...");
@@ -280,8 +305,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
// TODO authentication
let router = http::make_router(conf_);
let router = http::make_router(conf_, auth);
endpoint::serve_thread_main(
router,
http_listener,
@@ -295,6 +319,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let safekeeper_thread = thread::Builder::new()
.name("Safekeeper thread".into())
.spawn(|| {
// TODO: add auth
if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) {
info!("safekeeper thread terminated: {e}");
}
@@ -309,6 +334,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
// TODO: add auth?
broker::thread_main(conf_);
})?,
);
@@ -321,6 +347,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
thread::Builder::new()
.name("WAL removal thread".into())
.spawn(|| {
// TODO: add auth?
remove_wal::thread_main(conf_);
})?,
);
@@ -330,6 +357,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
thread::Builder::new()
.name("wal backup launcher thread".into())
.spawn(move || {
// TODO: add auth?
wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx);
})?,
);

View File

@@ -4,9 +4,7 @@ use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use etcd_broker::Client;
use etcd_broker::PutOptions;
use etcd_broker::SkTimelineSubscriptionKind;
use etcd_broker::subscription_value::SkTimelineInfo;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
@@ -15,6 +13,10 @@ use tracing::*;
use url::Url;
use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use etcd_broker::{
subscription_key::{OperationKind, SkOperationKind, SubscriptionKey},
Client, PutOptions,
};
use utils::zid::{NodeId, ZTenantTimelineId};
const RETRY_INTERVAL_MSEC: u64 = 1000;
@@ -43,7 +45,7 @@ fn timeline_safekeeper_path(
) -> String {
format!(
"{}/{sk_id}",
SkTimelineSubscriptionKind::timeline(broker_etcd_prefix, zttid).watch_key()
SubscriptionKey::sk_timeline_info(broker_etcd_prefix, zttid).watch_key()
)
}
@@ -90,7 +92,7 @@ impl ElectionLeader {
}
}
pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
pub async fn get_leader(req: &Election, leader: &mut Option<ElectionLeader>) -> Result<()> {
let mut client = Client::connect(req.broker_endpoints.clone(), None)
.await
.context("Could not connect to etcd")?;
@@ -102,22 +104,27 @@ pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
let lease_id = lease.map(|l| l.id()).unwrap();
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
// kill previous keepalive, if any
if let Some(l) = leader.take() {
l.give_up().await;
}
if let Err(e) = client
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
// immediately save handle to kill task if we get canceled below
*leader = Some(ElectionLeader {
client: client.clone(),
keep_alive,
});
client
.campaign(
req.election_name.clone(),
req.candidate_name.clone(),
lease_id,
)
.await
{
keep_alive.abort();
let _ = keep_alive.await;
return Err(e.into());
}
.await?;
Ok(ElectionLeader { client, keep_alive })
Ok(())
}
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
@@ -143,14 +150,6 @@ async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
}
}
pub fn get_campaign_name(
election_name: &str,
broker_prefix: &str,
id: ZTenantTimelineId,
) -> String {
format!("{broker_prefix}/{id}/{election_name}")
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{system_id}")
}
@@ -204,9 +203,20 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
let mut subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates(
let mut subscription = etcd_broker::subscribe_for_values(
&mut client,
SkTimelineSubscriptionKind::all(conf.broker_etcd_prefix.clone()),
SubscriptionKey::all(conf.broker_etcd_prefix.clone()),
|full_key, value_str| {
if full_key.operation == OperationKind::Safekeeper(SkOperationKind::TimelineInfo) {
match serde_json::from_str::<SkTimelineInfo>(value_str) {
Ok(new_info) => return Some(new_info),
Err(e) => {
error!("Failed to parse timeline info from value str '{value_str}': {e}")
}
}
}
None
},
)
.await
.context("failed to subscribe for safekeeper info")?;

View File

@@ -1,9 +1,9 @@
use etcd_broker::SkTimelineInfo;
use hyper::{Body, Request, Response, StatusCode};
use hyper::{Body, Request, Response, StatusCode, Uri};
use once_cell::sync::Lazy;
use serde::Serialize;
use serde::Serializer;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
@@ -11,9 +11,11 @@ use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult};
use crate::SafeKeeperConf;
use etcd_broker::subscription_value::SkTimelineInfo;
use utils::{
auth::JwtAuth,
http::{
endpoint,
endpoint::{self, auth_middleware, check_permission},
error::ApiError,
json::{json_request, json_response},
request::{ensure_no_body, parse_request_param},
@@ -32,6 +34,7 @@ struct SafekeeperStatus {
/// Healthcheck handler.
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let conf = get_conf(&request);
let status = SafekeeperStatus { id: conf.my_id };
json_response(StatusCode::OK, status)
@@ -91,6 +94,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(zttid.tenant_id))?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
let (inmem, state) = tli.get_state();
@@ -125,6 +129,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
tenant_id: request_data.tenant_id,
timeline_id: request_data.timeline_id,
};
check_permission(&request, Some(zttid.tenant_id))?;
GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
.map_err(ApiError::from_err)?;
@@ -145,6 +150,7 @@ async fn timeline_delete_force_handler(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(zttid.tenant_id))?;
ensure_no_body(&mut request).await?;
json_response(
StatusCode::OK,
@@ -160,6 +166,7 @@ async fn tenant_delete_force_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
json_response(
StatusCode::OK,
@@ -178,6 +185,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(zttid.tenant_id))?;
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
@@ -188,15 +196,33 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
}
/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
pub fn make_router(
conf: SafeKeeperConf,
auth: Option<Arc<JwtAuth>>,
) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router();
if auth.is_some() {
router = router.middleware(auth_middleware(|request| {
#[allow(clippy::mutable_key_type)]
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {
// Option<Arc<JwtAuth>> is always provided as data below, hence unwrap().
request.data::<Option<Arc<JwtAuth>>>().unwrap().as_deref()
}
}))
}
router
.data(Arc::new(conf))
.data(auth)
.get("/v1/status", status_handler)
.get(
"/v1/timeline/:tenant_id/:timeline_id",
timeline_status_handler,
)
// Will be used in the future instead of implicit timeline creation
.post("/v1/timeline", timeline_create_handler)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",

View File

@@ -57,6 +57,7 @@ pub struct SafeKeeperConf {
pub my_id: NodeId,
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub auth_validation_public_key_path: Option<PathBuf>,
}
impl SafeKeeperConf {
@@ -88,6 +89,7 @@ impl Default for SafeKeeperConf {
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
auth_validation_public_key_path: None,
}
}
}

View File

@@ -242,9 +242,9 @@ impl Collector for TimelineCollector {
let timeline_id = tli.zttid.timeline_id.to_string();
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
let mut most_advanced: Option<utils::pq_proto::ZenithFeedback> = None;
let mut most_advanced: Option<utils::pq_proto::ReplicationFeedback> = None;
for replica in tli.replicas.iter() {
if let Some(replica_feedback) = replica.zenith_feedback {
if let Some(replica_feedback) = replica.pageserver_feedback {
if let Some(current) = most_advanced {
if current.ps_writelsn < replica_feedback.ps_writelsn {
most_advanced = Some(replica_feedback);

View File

@@ -4,7 +4,7 @@ use anyhow::{bail, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use etcd_broker::SkTimelineInfo;
use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::xlog_utils::TimeLineID;
use postgres_ffi::xlog_utils::XLogSegNo;
@@ -23,7 +23,7 @@ use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use utils::{
bin_ser::LeSer,
lsn::Lsn,
pq_proto::{SystemId, ZenithFeedback},
pq_proto::{ReplicationFeedback, SystemId},
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
@@ -348,7 +348,7 @@ pub struct AppendResponse {
// a criterion for walproposer --sync mode exit
pub commit_lsn: Lsn,
pub hs_feedback: HotStandbyFeedback,
pub zenith_feedback: ZenithFeedback,
pub pageserver_feedback: ReplicationFeedback,
}
impl AppendResponse {
@@ -358,7 +358,7 @@ impl AppendResponse {
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
zenith_feedback: ZenithFeedback::empty(),
pageserver_feedback: ReplicationFeedback::empty(),
}
}
}
@@ -476,7 +476,7 @@ impl AcceptorProposerMessage {
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
msg.zenith_feedback.serialize(buf)?
msg.pageserver_feedback.serialize(buf)?
}
}
@@ -677,7 +677,7 @@ where
commit_lsn: self.state.commit_lsn,
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
zenith_feedback: ZenithFeedback::empty(),
pageserver_feedback: ReplicationFeedback::empty(),
};
trace!("formed AppendResponse {:?}", ar);
ar

View File

@@ -21,7 +21,7 @@ use utils::{
bin_ser::BeSer,
lsn::Lsn,
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback},
pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody},
sock_split::ReadStream,
};
@@ -29,7 +29,7 @@ use utils::{
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
// zenith extension of replication protocol
const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
type FullTransactionId = u64;
@@ -122,15 +122,15 @@ impl ReplicationConn {
warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet.");
// timeline.update_replica_state(replica_id, Some(state));
}
Some(ZENITH_STATUS_UPDATE_TAG_BYTE) => {
Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
// Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
let buf = Bytes::copy_from_slice(&m[9..]);
let reply = ZenithFeedback::parse(buf);
let reply = ReplicationFeedback::parse(buf);
trace!("ZenithFeedback is {:?}", reply);
// Only pageserver sends ZenithFeedback, so set the flag.
trace!("ReplicationFeedback is {:?}", reply);
// Only pageserver sends ReplicationFeedback, so set the flag.
// This replica is the source of information to resend to compute.
state.zenith_feedback = Some(reply);
state.pageserver_feedback = Some(reply);
timeline.update_replica_state(replica_id, state);
}

View File

@@ -3,7 +3,7 @@
use anyhow::{bail, Context, Result};
use etcd_broker::SkTimelineInfo;
use etcd_broker::subscription_value::SkTimelineInfo;
use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::XLogSegNo;
@@ -21,7 +21,7 @@ use tracing::*;
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantId, ZTenantTimelineId},
};
@@ -48,8 +48,8 @@ pub struct ReplicaState {
pub remote_consistent_lsn: Lsn,
/// combined hot standby feedback from all replicas
pub hs_feedback: HotStandbyFeedback,
/// Zenith specific feedback received from pageserver, if any
pub zenith_feedback: Option<ZenithFeedback>,
/// Replication specific feedback received from pageserver, if any
pub pageserver_feedback: Option<ReplicationFeedback>,
}
impl Default for ReplicaState {
@@ -68,7 +68,7 @@ impl ReplicaState {
xmin: u64::MAX,
catalog_xmin: u64::MAX,
},
zenith_feedback: None,
pageserver_feedback: None,
}
}
}
@@ -149,8 +149,12 @@ impl SharedState {
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action.
fn update_status(&mut self) -> bool {
self.active = self.is_active();
fn update_status(&mut self, ttid: ZTenantTimelineId) -> bool {
let is_active = self.is_active();
if self.active != is_active {
info!("timeline {} active={} now", ttid, is_active);
}
self.active = is_active;
self.is_wal_backup_action_pending()
}
@@ -187,6 +191,12 @@ impl SharedState {
self.wal_backup_active
}
// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
fn can_wal_backup(&self) -> bool {
self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -211,25 +221,25 @@ impl SharedState {
// we need to know which pageserver compute node considers to be main.
// See https://github.com/zenithdb/zenith/issues/1171
//
if let Some(zenith_feedback) = state.zenith_feedback {
if let Some(acc_feedback) = acc.zenith_feedback {
if acc_feedback.ps_writelsn < zenith_feedback.ps_writelsn {
if let Some(pageserver_feedback) = state.pageserver_feedback {
if let Some(acc_feedback) = acc.pageserver_feedback {
if acc_feedback.ps_writelsn < pageserver_feedback.ps_writelsn {
warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet.");
acc.zenith_feedback = Some(zenith_feedback);
acc.pageserver_feedback = Some(pageserver_feedback);
}
} else {
acc.zenith_feedback = Some(zenith_feedback);
acc.pageserver_feedback = Some(pageserver_feedback);
}
// last lsn received by pageserver
// FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver.
// See https://github.com/zenithdb/zenith/issues/1171
acc.last_received_lsn = Lsn::from(zenith_feedback.ps_writelsn);
acc.last_received_lsn = Lsn::from(pageserver_feedback.ps_writelsn);
// When at least one pageserver has preserved data up to remote_consistent_lsn,
// safekeeper is free to delete it, so choose max of all pageservers.
acc.remote_consistent_lsn = max(
Lsn::from(zenith_feedback.ps_applylsn),
Lsn::from(pageserver_feedback.ps_applylsn),
acc.remote_consistent_lsn,
);
}
@@ -291,7 +301,7 @@ impl Timeline {
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
is_wal_backup_action_pending = shared_state.update_status();
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
@@ -308,7 +318,7 @@ impl Timeline {
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
is_wal_backup_action_pending = shared_state.update_status();
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
}
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
@@ -327,7 +337,7 @@ impl Timeline {
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.update_status();
shared_state.update_status(self.zttid);
return Ok(true);
}
}
@@ -341,6 +351,12 @@ impl Timeline {
shared_state.wal_backup_attend()
}
// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
self.mutex.lock().unwrap().can_wal_backup()
}
/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
@@ -441,8 +457,8 @@ impl Timeline {
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback;
if let Some(zenith_feedback) = state.zenith_feedback {
resp.zenith_feedback = zenith_feedback;
if let Some(pageserver_feedback) = state.pageserver_feedback {
resp.pageserver_feedback = pageserver_feedback;
}
}
@@ -509,7 +525,7 @@ impl Timeline {
}
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
is_wal_backup_action_pending = shared_state.update_status();
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;

View File

@@ -1,4 +1,7 @@
use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::task::JoinHandle;
use std::cmp::min;
@@ -26,8 +29,6 @@ use crate::{broker, SafeKeeperConf};
use once_cell::sync::OnceCell;
const BACKUP_ELECTION_NAME: &str = "WAL_BACKUP";
const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
@@ -48,14 +49,10 @@ pub fn wal_backup_launcher_thread_main(
});
}
/// Check whether wal backup is required for timeline and mark that launcher is
/// aware of current status (if timeline exists).
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> bool {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli.wal_backup_attend()
} else {
false
}
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
}
struct WalBackupTaskHandle {
@@ -63,6 +60,56 @@ struct WalBackupTaskHandle {
handle: JoinHandle<()>,
}
struct WalBackupTimelineEntry {
timeline: Arc<Timeline>,
handle: Option<WalBackupTaskHandle>,
}
/// Start per timeline task, if it makes sense for this safekeeper to offload.
fn consider_start_task(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
task: &mut WalBackupTimelineEntry,
) {
if !task.timeline.can_wal_backup() {
return;
}
info!("starting WAL backup task for {}", zttid);
// TODO: decide who should offload right here by simply checking current
// state instead of running elections in offloading task.
let election_name = SubscriptionKey {
cluster_prefix: conf.broker_etcd_prefix.clone(),
kind: SubscriptionKind::Operation(
zttid,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::WalBackup),
),
}
.watch_key();
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&zttid);
let handle = tokio::spawn(
backup_task_main(zttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", zttid = %zttid)),
);
task.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
/// tasks. Having this in separate task simplifies locking, allows to reap
/// panics and separate elections from offloading itself.
@@ -71,7 +118,7 @@ async fn wal_backup_launcher_main_loop(
mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
info!(
"WAL backup launcher: started, remote config {:?}",
"WAL backup launcher started, remote config {:?}",
conf.remote_storage
);
@@ -82,60 +129,50 @@ async fn wal_backup_launcher_main_loop(
})
});
let mut tasks: HashMap<ZTenantTimelineId, WalBackupTaskHandle> = HashMap::new();
// Presense in this map means launcher is aware s3 offloading is needed for
// the timeline, but task is started only if it makes sense for to offload
// from this safekeeper.
let mut tasks: HashMap<ZTenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
loop {
// channel is never expected to get closed
let zttid = wal_backup_launcher_rx.recv().await.unwrap();
let is_wal_backup_required = is_wal_backup_required(zttid);
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */
}
// do we need to do anything at all?
if is_wal_backup_required != tasks.contains_key(&zttid) {
if is_wal_backup_required {
// need to start the task
info!("starting WAL backup task for {}", zttid);
tokio::select! {
zttid = wal_backup_launcher_rx.recv() => {
// channel is never expected to get closed
let zttid = zttid.unwrap();
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */
}
let timeline = is_wal_backup_required(zttid);
// do we need to do anything at all?
if timeline.is_some() != tasks.contains_key(&zttid) {
if let Some(timeline) = timeline {
// need to start the task
let entry = tasks.entry(zttid).or_insert(WalBackupTimelineEntry {
timeline,
handle: None,
});
consider_start_task(&conf, zttid, entry);
} else {
// need to stop the task
info!("stopping WAL backup task for {}", zttid);
// TODO: decide who should offload in launcher itself by simply checking current state
let election_name = broker::get_campaign_name(
BACKUP_ELECTION_NAME,
&conf.broker_etcd_prefix,
zttid,
);
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&zttid);
let handle = tokio::spawn(
backup_task_main(zttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", zttid = %zttid)),
);
tasks.insert(
zttid,
WalBackupTaskHandle {
shutdown_tx,
handle,
},
);
} else {
// need to stop the task
info!("stopping WAL backup task for {}", zttid);
let wb_handle = tasks.remove(&zttid).unwrap();
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
// Hm, why I can't await on reference to handle?
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", zttid, e);
let entry = tasks.remove(&zttid).unwrap();
if let Some(wb_handle) = entry.handle {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", zttid, e);
}
}
}
}
}
// Start known tasks, if needed and possible.
_ = ticker.tick() => {
for (zttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) {
consider_start_task(&conf, *zttid, entry);
}
}
}
@@ -200,20 +237,11 @@ impl WalBackupTask {
loop {
let mut retry_attempt = 0u32;
if let Some(l) = self.leader.take() {
l.give_up().await;
}
info!("acquiring leadership");
match broker::get_leader(&self.election).await {
Ok(l) => {
self.leader = Some(l);
}
Err(e) => {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
continue;
}
if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
continue;
}
info!("acquired leadership");

View File

@@ -26,6 +26,7 @@ KEY_EXCLUDE_FIELDS = frozenset({
})
NEGATIVE_COLOR = 'negative'
POSITIVE_COLOR = 'positive'
EPS = 1e-6
@dataclass
@@ -120,7 +121,8 @@ def get_row_values(columns: List[str], run_result: SuitRun,
# this might happen when new metric is added and there is no value for it in previous run
# let this be here, TODO add proper handling when this actually happens
raise ValueError(f'{column} not found in previous result')
ratio = float(value) / float(prev_value['value']) - 1
# adding `EPS` to each term to avoid ZeroDivisionError when the denominator is zero
ratio = (float(value) + EPS) / (float(prev_value['value']) + EPS) - 1
ratio_display, color = format_ratio(ratio, current_value['report'])
row_values.append(RowValue(value, color, ratio_display))
return row_values

View File

@@ -7,7 +7,7 @@ import pytest
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_auth_enabled = True
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
ps = env.pageserver
@@ -54,7 +54,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
neon_env_builder.pageserver_auth_enabled = True
neon_env_builder.auth_enabled = True
if with_safekeepers:
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

View File

@@ -42,8 +42,8 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
Repeat check for several tenants/timelines.
"""
env = neon_env_builder.init_start()
neon_env_builder.num_safekeepers = num_safekeepers
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
for _ in range(num_timelines):

View File

@@ -114,7 +114,7 @@ def test_pageserver_http_api_client(neon_simple_env: NeonEnv):
def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_auth_enabled = True
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
management_token = env.auth_keys.generate_management_token()

View File

@@ -10,7 +10,7 @@ from fixtures.log_helper import log
#
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_restart_compute(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
neon_env_builder.pageserver_auth_enabled = True
neon_env_builder.auth_enabled = True
if with_safekeepers:
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

View File

@@ -2,11 +2,14 @@ from contextlib import closing
from datetime import datetime
import os
import pytest
import time
from uuid import UUID
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.utils import lsn_to_hex
from fixtures.benchmark_fixture import MetricReport
@pytest.mark.parametrize('with_safekeepers', [False, True])
@@ -44,6 +47,56 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder, with_safekeepers:
assert cur.fetchone() == (5000050000, )
def test_tenant_threads(neon_env_builder, zenbenchmark):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
def get_num_threads() -> int:
metrics = env.pageserver.http_client().get_metrics()
parsed = parse_metrics(metrics)
threads = parsed.query_one("process_threads").value
return threads
threads_before = get_num_threads()
zenbenchmark.record("threads_before", threads_before, "", report=MetricReport.LOWER_IS_BETTER)
tenants = env.pageserver.http_client().tenant_list()
num_tenants = len(tenants)
num_active = len([t for t in tenants if t["state"] == "Active"])
zenbenchmark.record("tenants_before", num_tenants, "", report=MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("active_before", num_active, "", report=MetricReport.LOWER_IS_BETTER)
for i in range(20):
print(f"creating tenant {i}")
name = f"test_tenant_threads_{i}"
tenant, _ = env.neon_cli.create_tenant()
timeline = env.neon_cli.create_timeline(name, tenant_id=tenant)
pg = env.postgres.create_start(name, tenant_id=tenant)
pg.safe_psql("select 1;")
pg.stop()
env.pageserver.http_client().timeline_detach(tenant, timeline)
remaining_timelines = [
UUID(r["timeline_id"])
for r in env.pageserver.http_client().timeline_list(tenant)
]
for t in remaining_timelines:
env.pageserver.http_client().timeline_detach(tenant, t)
time.sleep(5)
threads_after = get_num_threads()
zenbenchmark.record("threads_before", threads_after, "", report=MetricReport.LOWER_IS_BETTER)
tenants = env.pageserver.http_client().tenant_list()
num_tenants = len(tenants)
num_active = len([t for t in tenants if t["state"] == "Active"])
zenbenchmark.record("tenants_after", num_tenants, "", report=MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("active_after", num_active, "", report=MetricReport.LOWER_IS_BETTER)
def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3

View File

@@ -16,6 +16,7 @@ from fixtures.neon_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Sa
from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
from uuid import uuid4
@dataclass
@@ -349,10 +350,12 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
def test_wal_removal(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('auth_enabled', [False, True])
def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.num_safekeepers = 2
# to advance remote_consistent_llsn
neon_env_builder.enable_local_fs_remote_storage()
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_wal_removal')
@@ -369,7 +372,10 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder):
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
# force checkpoint to advance remote_consistent_lsn
with closing(env.pageserver.connect()) as psconn:
pageserver_conn_options = {}
if auth_enabled:
pageserver_conn_options['password'] = env.auth_keys.generate_tenant_token(tenant_id)
with closing(env.pageserver.connect(**pageserver_conn_options)) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
@@ -380,9 +386,29 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder):
]
assert all(os.path.exists(p) for p in first_segments)
http_cli = env.safekeepers[0].http_client()
if not auth_enabled:
http_cli = env.safekeepers[0].http_client()
else:
http_cli = env.safekeepers[0].http_client(
auth_token=env.auth_keys.generate_tenant_token(tenant_id))
http_cli_other = env.safekeepers[0].http_client(
auth_token=env.auth_keys.generate_tenant_token(uuid4().hex))
http_cli_noauth = env.safekeepers[0].http_client()
# Pretend WAL is offloaded to s3.
if auth_enabled:
old_backup_lsn = http_cli.timeline_status(tenant_id=tenant_id,
timeline_id=timeline_id).backup_lsn
assert 'FFFFFFFF/FEFFFFFF' != old_backup_lsn
for cli in [http_cli_other, http_cli_noauth]:
with pytest.raises(cli.HTTPError, match='Forbidden|Unauthorized'):
cli.record_safekeeper_info(tenant_id,
timeline_id, {'backup_lsn': 'FFFFFFFF/FEFFFFFF'})
assert old_backup_lsn == http_cli.timeline_status(tenant_id=tenant_id,
timeline_id=timeline_id).backup_lsn
http_cli.record_safekeeper_info(tenant_id, timeline_id, {'backup_lsn': 'FFFFFFFF/FEFFFFFF'})
assert 'FFFFFFFF/FEFFFFFF' == http_cli.timeline_status(tenant_id=tenant_id,
timeline_id=timeline_id).backup_lsn
# wait till first segment is removed on all safekeepers
started_at = time.time()
@@ -596,25 +622,42 @@ def test_sync_safekeepers(neon_env_builder: NeonEnvBuilder,
assert all(lsn_after_sync == lsn for lsn in lsn_after_append)
def test_timeline_status(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('auth_enabled', [False, True])
def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_timeline_status')
pg = env.postgres.create_start('test_timeline_status')
wa = env.safekeepers[0]
wa_http_cli = wa.http_client()
wa_http_cli.check_status()
# learn neon timeline from compute
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
if not auth_enabled:
wa_http_cli = wa.http_client()
wa_http_cli.check_status()
else:
wa_http_cli = wa.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
wa_http_cli.check_status()
wa_http_cli_bad = wa.http_client(
auth_token=env.auth_keys.generate_tenant_token(uuid4().hex))
wa_http_cli_bad.check_status()
wa_http_cli_noauth = wa.http_client()
wa_http_cli_noauth.check_status()
# fetch something sensible from status
tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
epoch = tli_status.acceptor_epoch
timeline_start_lsn = tli_status.timeline_start_lsn
if auth_enabled:
for cli in [wa_http_cli_bad, wa_http_cli_noauth]:
with pytest.raises(cli.HTTPError, match='Forbidden|Unauthorized'):
cli.timeline_status(tenant_id, timeline_id)
pg.safe_psql("create table t(i int)")
# ensure epoch goes up after reboot
@@ -894,8 +937,10 @@ def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder):
assert wal_size_after_checkpoint < 16 * 2.5
def test_delete_force(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('auth_enabled', [False, True])
def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.num_safekeepers = 1
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
# Create two tenants: one will be deleted, other should be preserved.
@@ -921,7 +966,14 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder):
cur.execute('CREATE TABLE t(key int primary key)')
sk = env.safekeepers[0]
sk_data_dir = Path(sk.data_dir())
sk_http = sk.http_client()
if not auth_enabled:
sk_http = sk.http_client()
sk_http_other = sk_http
else:
sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
sk_http_other = sk.http_client(
auth_token=env.auth_keys.generate_tenant_token(tenant_id_other))
sk_http_noauth = sk.http_client()
assert (sk_data_dir / tenant_id / timeline_id_1).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
@@ -961,6 +1013,15 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder):
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
if auth_enabled:
# Ensure we cannot delete the other tenant
for sk_h in [sk_http, sk_http_noauth]:
with pytest.raises(sk_h.HTTPError, match='Forbidden|Unauthorized'):
assert sk_h.timeline_delete_force(tenant_id_other, timeline_id_other)
with pytest.raises(sk_h.HTTPError, match='Forbidden|Unauthorized'):
assert sk_h.tenant_delete_force(tenant_id_other)
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Remove initial tenant's br2 (inactive)
assert sk_http.timeline_delete_force(tenant_id, timeline_id_2) == {
"dir_existed": True,
@@ -1001,7 +1062,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder):
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Ensure the other tenant still works
sk_http.timeline_status(tenant_id_other, timeline_id_other)
sk_http_other.timeline_status(tenant_id_other, timeline_id_other)
with closing(pg_other.connect()) as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO t (key) VALUES (123)')

View File

@@ -29,7 +29,7 @@ from dataclasses import dataclass
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import make_dsn, parse_dsn
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
from typing_extensions import Literal
import requests
@@ -500,7 +500,7 @@ class NeonEnvBuilder:
num_safekeepers: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
pageserver_auth_enabled: bool = False,
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME):
self.repo_dir = repo_dir
@@ -513,7 +513,7 @@ class NeonEnvBuilder:
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.safekeepers_id_start = safekeepers_id_start
self.pageserver_auth_enabled = pageserver_auth_enabled
self.auth_enabled = auth_enabled
self.default_branch_name = default_branch_name
self.env: Optional[NeonEnv] = None
@@ -639,7 +639,7 @@ class NeonEnv:
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
pageserver_auth_type = "ZenithJWT" if config.pageserver_auth_enabled else "Trust"
pageserver_auth_type = "ZenithJWT" if config.auth_enabled else "Trust"
toml += textwrap.dedent(f"""
[pageserver]
@@ -667,6 +667,10 @@ class NeonEnv:
pg_port = {port.pg}
http_port = {port.http}
sync = false # Disable fsyncs to make the tests go faster""")
if config.auth_enabled:
toml += textwrap.dedent(f"""
auth_enabled = true
""")
if bool(self.remote_storage_users
& RemoteStorageUsers.SAFEKEEPER) and self.remote_storage is not None:
toml += textwrap.dedent(f"""
@@ -1197,7 +1201,7 @@ class NeonCli:
log.info(f'Running in "{self.env.repo_dir}"')
env_vars = os.environ.copy()
env_vars['ZENITH_REPO_DIR'] = str(self.env.repo_dir)
env_vars['NEON_REPO_DIR'] = str(self.env.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
if self.env.rust_log_override is not None:
env_vars['RUST_LOG'] = self.env.rust_log_override
@@ -1375,6 +1379,7 @@ class VanillaPostgres(PgProtocol):
self.pg_bin = pg_bin
self.running = False
self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
self.configure([f"port = {port}\n"])
def configure(self, options: List[str]):
"""Append lines into postgresql.conf file."""
@@ -1409,10 +1414,12 @@ class VanillaPostgres(PgProtocol):
@pytest.fixture(scope='function')
def vanilla_pg(test_output_dir: str) -> Iterator[VanillaPostgres]:
def vanilla_pg(test_output_dir: str,
port_distributor: PortDistributor) -> Iterator[VanillaPostgres]:
pgdatadir = os.path.join(test_output_dir, "pgdata-vanilla")
pg_bin = PgBin(test_output_dir)
with VanillaPostgres(pgdatadir, pg_bin, 5432) as vanilla_pg:
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
yield vanilla_pg
@@ -1458,7 +1465,7 @@ def remote_pg(test_output_dir: str) -> Iterator[RemotePostgres]:
class NeonProxy(PgProtocol):
def __init__(self, port: int):
def __init__(self, port: int, pg_port: int):
super().__init__(host="127.0.0.1",
user="proxy_user",
password="pytest2",
@@ -1467,9 +1474,10 @@ class NeonProxy(PgProtocol):
self.http_port = 7001
self.host = "127.0.0.1"
self.port = port
self.pg_port = pg_port
self._popen: Optional[subprocess.Popen[bytes]] = None
def start_static(self, addr="127.0.0.1:5432") -> None:
def start(self) -> None:
assert self._popen is None
# Start proxy
@@ -1478,7 +1486,8 @@ class NeonProxy(PgProtocol):
args.extend(["--http", f"{self.host}:{self.http_port}"])
args.extend(["--proxy", f"{self.host}:{self.port}"])
args.extend(["--auth-backend", "postgres"])
args.extend(["--auth-endpoint", "postgres://proxy_auth:pytest1@localhost:5432/postgres"])
args.extend(
["--auth-endpoint", f"postgres://proxy_auth:pytest1@localhost:{self.pg_port}/postgres"])
self._popen = subprocess.Popen(args)
self._wait_until_ready()
@@ -1497,14 +1506,16 @@ class NeonProxy(PgProtocol):
@pytest.fixture(scope='function')
def static_proxy(vanilla_pg) -> Iterator[NeonProxy]:
def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
"""Neon proxy that routes directly to vanilla postgres."""
vanilla_pg.start()
vanilla_pg.safe_psql("create user proxy_auth with password 'pytest1' superuser")
vanilla_pg.safe_psql("create user proxy_user with password 'pytest2'")
with NeonProxy(4432) as proxy:
proxy.start_static()
port = port_distributor.get_port()
pg_port = vanilla_pg.default_options['port']
with NeonProxy(port, pg_port) as proxy:
proxy.start()
yield proxy
@@ -1757,7 +1768,6 @@ class Safekeeper:
env: NeonEnv
port: SafekeeperPort
id: int
auth_token: Optional[str] = None
running: bool = False
def start(self) -> 'Safekeeper':
@@ -1813,8 +1823,8 @@ class Safekeeper:
assert isinstance(res, dict)
return res
def http_client(self) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http)
def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token)
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
@@ -1838,9 +1848,15 @@ class SafekeeperMetrics:
class SafekeeperHttpClient(requests.Session):
def __init__(self, port: int):
HTTPError = requests.HTTPError
def __init__(self, port: int, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token
if auth_token is not None:
self.headers['Authorization'] = f'Bearer {auth_token}'
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()

View File

@@ -3,7 +3,7 @@ import shutil
import subprocess
from pathlib import Path
from typing import Any, List, Optional
from typing import Any, List
from fixtures.log_helper import log