Compare commits

..

6 Commits

Author SHA1 Message Date
Arseny Sher
f0cbd5353a Use in wp custom WAL reader gracefully handling missing WAL.
and disable recovery on start.
2023-10-04 12:51:26 +03:00
Arseny Sher
8ea21686e1 Add safekeeper test_late_init. 2023-10-04 12:50:47 +03:00
Arseny Sher
a8e7eede2a Add check that WAL segments are identical after recovery. 2023-09-20 13:34:44 +03:00
Arseny Sher
2b91f507a8 Make test_pageserver_http_get_wal_receiver_success not wait for keepalive. 2023-09-18 17:44:39 +03:00
Arseny Sher
bb2c3253c6 Introduce safekeeper peer recovery.
Implements fetching of WAL by safekeeper from another safekeeper by imitating
behaviour of last elected leader. This allows to avoid WAL accumulation on
compute and facilitates faster compute startup as it doesn't need to download
any WAL. Actually removing WAL download in walproposer is a matter of another
patch though.

There is a per timeline task which always runs, checking regularly if it should
start recovery frome someone, meaning there is something to fetch and there is
no streaming compute. It then proceeds with fetching, finishing when there is
nothing more to receive.

Implements https://github.com/neondatabase/neon/pull/4875
2023-09-18 17:44:38 +03:00
Arseny Sher
bdf3769a2b Don't use AppenRequestHeader.epoch_start_lsn.
It is simpler to get it once from ProposerEelected.
2023-09-18 17:44:38 +03:00
38 changed files with 1431 additions and 825 deletions

View File

@@ -834,7 +834,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.17.10
VM_BUILDER_VERSION: v0.17.5
steps:
- name: Checkout
@@ -844,7 +844,7 @@ jobs:
- name: Downloading vm-builder
run: |
curl -fL https://workstation-ubuntu.tail888fb.ts.net/vm-builder -o vm-builder
curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder
chmod +x vm-builder
# Note: we need a separate pull step here because otherwise vm-builder will try to pull, and
@@ -1091,9 +1091,8 @@ jobs:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"

54
Cargo.lock generated
View File

@@ -636,7 +636,7 @@ dependencies = [
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite",
"tokio-tungstenite 0.20.0",
"tower",
"tower-layer",
"tower-service",
@@ -1941,15 +1941,15 @@ dependencies = [
[[package]]
name = "hyper-tungstenite"
version = "0.11.1"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9"
checksum = "880b8b1c98a5ec2a505c7c90db6d3f6f1f480af5655d9c5b55facc9382a5a5b5"
dependencies = [
"hyper",
"pin-project-lite",
"pin-project",
"tokio",
"tokio-tungstenite",
"tungstenite",
"tokio-tungstenite 0.18.0",
"tungstenite 0.18.0",
]
[[package]]
@@ -2908,9 +2908,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.13"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pin-utils"
@@ -4641,6 +4641,18 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-tungstenite"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.18.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.20.0"
@@ -4650,7 +4662,7 @@ dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
"tungstenite 0.20.0",
]
[[package]]
@@ -4965,9 +4977,28 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "tungstenite"
version = "0.20.1"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9"
checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
dependencies = [
"byteorder",
"bytes",
@@ -5617,7 +5648,6 @@ dependencies = [
"tower",
"tracing",
"tracing-core",
"tungstenite",
"url",
"uuid",
]

View File

@@ -78,7 +78,7 @@ hostname = "0.3.1"
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
hyper-tungstenite = "0.11"
hyper-tungstenite = "0.9"
inotify = "0.10.2"
itertools = "0.10"
jsonwebtoken = "8"

View File

@@ -589,7 +589,8 @@ RUN case "${PG_VERSION}" in \
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
mkdir pg_embedding-src && cd pg_embedding-src && tar xvzf ../pg_embedding.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/embedding.control
#########################################################################################
#

View File

@@ -153,6 +153,18 @@ neon-pg-ext-%: postgres-%
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
# pg_embedding was temporarily released as hnsw from this repo, when we only
# supported PostgreSQL 14 and 15
neon-pg-ext-v14: neon-pg-ext-hnsw-v14
neon-pg-ext-v15: neon-pg-ext-hnsw-v15
neon-pg-ext-hnsw-%: postgres-headers-% postgres-%
+@echo "Compiling hnsw $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/hnsw-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/hnsw-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/hnsw/Makefile install
.PHONY: neon-pg-ext-clean-%
neon-pg-ext-clean-%:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
@@ -167,6 +179,9 @@ neon-pg-ext-clean-%:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/hnsw-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/hnsw/Makefile clean
.PHONY: neon-pg-ext
neon-pg-ext: \

View File

@@ -29,13 +29,13 @@ See developer documentation in [SUMMARY.md](/docs/SUMMARY.md) for more informati
```bash
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev cmake postgresql-client protobuf-compiler \
libcurl4-openssl-dev openssl python-poetry lsof libicu-dev
libcurl4-openssl-dev openssl python-poetry lsof
```
* On Fedora, these packages are needed:
```bash
dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake postgresql postgresql-contrib protobuf-compiler \
protobuf-devel libcurl-devel openssl poetry lsof libicu-devel
protobuf-devel libcurl-devel openssl poetry lsof
```
* On Arch based systems, these packages are needed:
```bash

View File

@@ -3,7 +3,7 @@
//!
use chrono::{DateTime, Utc};
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde::Serialize;
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
@@ -54,8 +54,8 @@ impl EventType {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra, Metric> {
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra, Metric: Serialize> {
#[serde(flatten)]
#[serde(rename = "type")]
pub kind: EventType,

View File

@@ -561,7 +561,14 @@ impl CgroupWatcher {
/// Setting these values also affects the thresholds for receiving usage alerts.
#[derive(Debug)]
pub struct MemoryLimits {
pub high: u64,
high: u64,
max: u64,
}
impl MemoryLimits {
pub fn new(high: u64, max: u64) -> Self {
Self { max, high }
}
}
// Methods for manipulating the actual cgroup
@@ -638,7 +645,12 @@ impl CgroupWatcher {
/// Set cgroup memory.high and memory.max.
pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
info!(limits.high, path = self.path(), "writing new memory limits",);
info!(
limits.high,
limits.max,
path = self.path(),
"writing new memory limits",
);
self.memory()
.context("failed to get memory subsystem while setting memory limits")?
.set_mem(cgroups_rs::memory::SetMemory {
@@ -647,7 +659,7 @@ impl CgroupWatcher {
high: Some(MaxValue::Value(
u64::min(limits.high, i64::MAX as u64) as i64
)),
max: None,
max: Some(MaxValue::Value(u64::min(limits.max, i64::MAX as u64) as i64)),
})
.context("failed to set memory limits")
}
@@ -655,7 +667,7 @@ impl CgroupWatcher {
/// Given some amount of available memory, set the desired cgroup memory limits
pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
let new_high = self.config.calculate_memory_high_value(available_memory);
let limits = MemoryLimits { high: new_high };
let limits = MemoryLimits::new(new_high, available_memory);
info!(
path = self.path(),
memory = ?limits,

View File

@@ -257,11 +257,12 @@ impl Runner {
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
}
let limits = MemoryLimits {
let limits = MemoryLimits::new(
// new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
// since it is properly initialized in the previous cgroup if let block
high: new_cgroup_mem_high,
};
new_cgroup_mem_high,
available_memory,
);
cgroup
.set_limits(&limits)
.context("failed to set cgroup memory limits")?;
@@ -327,9 +328,7 @@ impl Runner {
name = cgroup.path(),
"updating cgroup memory.high",
);
let limits = MemoryLimits {
high: new_cgroup_mem_high,
};
let limits = MemoryLimits::new(new_cgroup_mem_high, available_memory);
cgroup
.set_limits(&limits)
.context("failed to set file cache size")?;

View File

@@ -14,7 +14,7 @@ use tracing::*;
use utils::id::NodeId;
mod metrics;
use metrics::MetricsKey;
use metrics::{Ids, MetricsKey};
mod disk_cache;
mod upload;
@@ -68,11 +68,10 @@ pub async fn collect_metrics(
},
);
let path: Arc<PathBuf> = Arc::new(local_disk_storage);
let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);
let cancel = task_mgr::shutdown_token();
let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);
let mut cached_metrics = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
@@ -109,14 +108,14 @@ pub async fn collect_metrics(
// already here, better to try to flush the new values.
let flush = async {
match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
match disk_cache::flush_metrics_to_disk(&metrics, &final_path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(e) => {
// idea here is that if someone creates a directory as our path, then they
// idea here is that if someone creates a directory as our final_path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {path:?}: {e:#}");
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
}
}
};
@@ -153,10 +152,12 @@ pub async fn collect_metrics(
///
/// Cancellation safe.
async fn restore_and_reschedule(
path: &Arc<PathBuf>,
final_path: &Arc<PathBuf>,
metric_collection_interval: Duration,
) -> Cache {
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(final_path.clone())
.await
{
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
@@ -174,11 +175,12 @@ async fn restore_and_reschedule(
use std::io::{Error, ErrorKind};
let root = e.root_cause();
let maybe_ioerr = root.downcast_ref::<Error>();
let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
if !is_not_found {
tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
}
(HashMap::new(), None)

View File

@@ -9,13 +9,6 @@ pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result
let span = tracing::Span::current();
tokio::task::spawn_blocking(move || {
let _e = span.entered();
if let Some(parent) = path.parent() {
if let Err(e) = scan_and_delete_with_same_prefix(&path) {
tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
}
}
let mut file = std::fs::File::open(&*path)?;
let reader = std::io::BufReader::new(&mut file);
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
@@ -25,68 +18,26 @@ pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result
.and_then(|x| x)
}
fn scan_and_delete_with_same_prefix(path: &std::path::Path) -> std::io::Result<()> {
let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
let prefix = path.file_name().expect("caller checked").to_string_lossy();
for entry in it {
let entry = entry?;
if !entry.metadata()?.is_file() {
continue;
}
let file_name = entry.file_name();
if path.file_name().unwrap() == file_name {
// do not remove our actual file
continue;
}
let file_name = file_name.to_string_lossy();
if !file_name.starts_with(&*prefix) {
continue;
}
let path = entry.path();
if let Err(e) = std::fs::remove_file(&path) {
tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
} else {
tracing::info!("cleaned up old tempfile {file_name:?}");
}
}
Ok(())
}
pub(super) async fn flush_metrics_to_disk(
current_metrics: &Arc<Vec<RawMetric>>,
path: &Arc<PathBuf>,
final_path: &Arc<PathBuf>,
) -> anyhow::Result<()> {
use std::io::Write;
anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
anyhow::ensure!(
path.file_name().is_some(),
"path must have filename: {path:?}"
final_path.parent().is_some(),
"path must have parent: {final_path:?}"
);
let span = tracing::Span::current();
tokio::task::spawn_blocking({
let current_metrics = current_metrics.clone();
let path = path.clone();
let final_path = final_path.clone();
move || {
let _e = span.entered();
let parent = path.parent().expect("existence checked");
let file_name = path.file_name().expect("existence checked");
let mut tempfile = tempfile::Builder::new()
.prefix(file_name)
.suffix(".tmp")
.tempfile_in(parent)?;
tracing::debug!("using tempfile {:?}", tempfile.path());
let mut tempfile =
tempfile::NamedTempFile::new_in(final_path.parent().expect("existence checked"))?;
// write out all of the raw metrics, to be read out later on restart as cached values
{
@@ -101,17 +52,15 @@ pub(super) async fn flush_metrics_to_disk(
tempfile.flush()?;
tempfile.as_file().sync_all()?;
fail::fail_point!("before-persist-last-metrics-collected");
drop(tempfile.persist(&*final_path)?);
drop(tempfile.persist(&*path).map_err(|e| e.error)?);
let f = std::fs::File::open(path.parent().unwrap())?;
let f = std::fs::File::open(final_path.parent().unwrap())?;
f.sync_all()?;
anyhow::Ok(())
}
})
.await
.with_context(|| format!("write metrics to {path:?} join error"))
.and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
.with_context(|| format!("write metrics to {final_path:?} join error"))
.and_then(|x| x.with_context(|| format!("write metrics to {final_path:?}")))
}

View File

@@ -1,21 +1,36 @@
use crate::context::RequestContext;
use anyhow::Context;
use crate::tenant::mgr;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use serde_with::serde_as;
use std::{sync::Arc, time::SystemTime};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use pageserver_api::models::TenantState;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use std::sync::Arc;
use std::time::SystemTime;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use anyhow::Context;
use super::{Cache, RawMetric};
// FIXME: all other consumption_metrics::Event stuff is over at uploading, maybe move?
#[serde_as]
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy)]
pub(super) struct Ids {
#[serde_as(as = "DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
}
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of static str.
// Do not rename any of these without first consulting with data team and partner
// management.
// FIXME: write those tests before refactoring to this!
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
/// Timeline last_record_lsn, absolute
@@ -44,7 +59,7 @@ pub(super) enum Name {
/// elsewhere.
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
pub(super) struct MetricsKey {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
@@ -68,7 +83,7 @@ impl MetricsKey {
struct AbsoluteValueFactory(MetricsKey);
impl AbsoluteValueFactory {
const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
let key = self.0;
(key, (EventType::Absolute { time }, val))
}
@@ -83,7 +98,7 @@ struct IncrementalValueFactory(MetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
const fn from_until(
fn from_previous_up_to(
self,
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
@@ -91,11 +106,16 @@ impl IncrementalValueFactory {
) -> RawMetric {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
let when = EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
};
(key, (when, val))
(
key,
(
EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
},
val,
),
)
}
fn key(&self) -> &MetricsKey {
@@ -189,11 +209,9 @@ pub(super) async fn collect_all_metrics(
cached_metrics: &Cache,
ctx: &RequestContext,
) -> Vec<RawMetric> {
use pageserver_api::models::TenantState;
let started_at = std::time::Instant::now();
let tenants = match crate::tenant::mgr::list_tenants().await {
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(err) => {
tracing::error!("failed to list tenants: {:?}", err);
@@ -205,7 +223,7 @@ pub(super) async fn collect_all_metrics(
if state != TenantState::Active {
None
} else {
crate::tenant::mgr::get_tenant(id, true)
mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
@@ -267,7 +285,7 @@ where
current_metrics
}
/// In-between abstraction to allow testing metrics without actual Tenants.
/// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
struct TenantSnapshot {
resident_size: u64,
remote_size: u64,
@@ -423,14 +441,14 @@ impl TimelineSnapshot {
let up_to = now;
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
let key_value = written_size_delta_key.from_previous_up_to(prev.0, up_to, delta);
// written_size_delta
metrics.push(key_value);
// written_size
metrics.push((key, written_size_now));
} else {
// the cached value was ahead of us, report zero until we've caught up
metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
metrics.push(written_size_delta_key.from_previous_up_to(prev.0, up_to, 0));
// the cached value was ahead of us, report the same until we've caught up
metrics.push((key, (written_size_now.0, prev.1)));
}
@@ -450,6 +468,3 @@ impl TimelineSnapshot {
#[cfg(test)]
mod tests;
#[cfg(test)]
pub(crate) use tests::metric_examples;

View File

@@ -1,7 +1,13 @@
use super::*;
use std::collections::HashMap;
use std::time::SystemTime;
use utils::lsn::Lsn;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use super::*;
use chrono::{DateTime, Utc};
#[test]
fn startup_collected_timeline_metrics_before_advancing() {
@@ -27,7 +33,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
snap.loaded_at.1.into(),
now,
0
@@ -67,7 +73,8 @@ fn startup_collected_timeline_metrics_second_round() {
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
@@ -93,7 +100,11 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
// at t=before was the last time the last_record_lsn changed
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
// end time of this event is used for the next ones
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before,
just_before,
0,
),
]);
let snap = TimelineSnapshot {
@@ -107,13 +118,81 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
just_before,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn metric_image_stability() {
// it is important that these strings stay as they are
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);
let now = DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z").unwrap();
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z").unwrap();
let [now, before] = [DateTime::<Utc>::from(now), DateTime::from(before)];
let examples = [
(
line!(),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, now, 0),
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
MetricsKey::resident_size(tenant_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
MetricsKey::synthetic_size(tenant_id).at(now, 1),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
),
];
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(now, "1", 0);
for (line, (key, (kind, value)), expected) in examples {
let e = consumption_metrics::Event {
kind,
metric: key.metric,
idempotency_key: idempotency_key.to_string(),
value,
extra: Ids {
tenant_id: key.tenant_id,
timeline_id: key.timeline_id,
},
};
let actual = serde_json::to_string(&e).unwrap();
assert_eq!(expected, actual, "example from line {line}");
}
}
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
@@ -141,7 +220,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
way_before,
before_restart,
// not taken into account, but the timestamps are important
@@ -155,7 +234,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before_restart,
now,
0
@@ -173,7 +252,8 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
]
);
@@ -279,19 +359,3 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
times
}
pub(crate) const fn metric_examples(
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [RawMetric; 6] {
[
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
MetricsKey::resident_size(tenant_id).at(now, 0),
MetricsKey::synthetic_size(tenant_id).at(now, 1),
]
}

View File

@@ -1,21 +1,8 @@
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use serde_with::serde_as;
use consumption_metrics::{idempotency_key, Event, EventChunk, CHUNK_SIZE};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing::*;
use super::{metrics::Name, Cache, MetricsKey, RawMetric};
use utils::id::{TenantId, TimelineId};
/// How the metrics from pageserver are identified.
#[serde_with::serde_as]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
}
use super::{Cache, Ids, RawMetric};
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics(
@@ -26,21 +13,44 @@ pub(super) async fn upload_metrics(
metrics: &[RawMetric],
cached_metrics: &mut Cache,
) -> anyhow::Result<()> {
use bytes::BufMut;
let mut uploaded = 0;
let mut failed = 0;
let started_at = std::time::Instant::now();
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
let mut buffer = bytes::BytesMut::new();
let mut chunk_to_send = Vec::new();
while let Some(res) = iter.next() {
let (chunk, body) = res?;
for chunk in metrics.chunks(CHUNK_SIZE) {
chunk_to_send.clear();
// FIXME: this should always overwrite and truncate to chunk.len()
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
// FIXME: finally write! this to the prev allocation
idempotency_key: idempotency_key(node_id),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
timeline_id: curr_key.timeline_id,
},
}));
serde_json::to_writer(
(&mut buffer).writer(),
&EventChunk {
events: (&chunk_to_send).into(),
},
)?;
let body = buffer.split().freeze();
let event_bytes = body.len();
let is_last = iter.len() == 0;
let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
let res = upload(client, metric_collection_endpoint, body, cancel)
.instrument(tracing::info_span!(
"upload",
%event_bytes,
@@ -78,150 +88,6 @@ pub(super) async fn upload_metrics(
Ok(())
}
// The return type is quite ugly, but we gain testability in isolation
fn serialize_in_chunks<'a, F>(
chunk_size: usize,
input: &'a [RawMetric],
factory: F,
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
where
F: KeyGen<'a> + 'a,
{
use bytes::BufMut;
struct Iter<'a, F> {
inner: std::slice::Chunks<'a, RawMetric>,
chunk_size: usize,
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
buffer: bytes::BytesMut,
// chunk amount of events are reused to produce the serialized document
scratch: Vec<Event<Ids, Name>>,
factory: F,
}
impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
fn next(&mut self) -> Option<Self::Item> {
let chunk = self.inner.next()?;
if self.scratch.is_empty() {
// first round: create events with N strings
self.scratch.extend(
chunk
.iter()
.map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
);
} else {
// next rounds: update_in_place to reuse allocations
assert_eq!(self.scratch.len(), self.chunk_size);
self.scratch
.iter_mut()
.zip(chunk.iter())
.for_each(|(slot, raw_metric)| {
raw_metric.update_in_place(slot, &self.factory.generate())
});
}
let res = serde_json::to_writer(
(&mut self.buffer).writer(),
&EventChunk {
events: (&self.scratch[..chunk.len()]).into(),
},
);
match res {
Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
Err(e) => Some(Err(e)),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
let buffer = bytes::BytesMut::new();
let inner = input.chunks(chunk_size);
let scratch = Vec::new();
Iter {
inner,
chunk_size,
buffer,
scratch,
factory,
}
}
trait RawMetricExt {
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
}
impl RawMetricExt for RawMetric {
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
let MetricsKey {
metric,
tenant_id,
timeline_id,
} = self.0;
let (kind, value) = self.1;
Event {
kind,
metric,
idempotency_key: key.to_string(),
value,
extra: Ids {
tenant_id,
timeline_id,
},
}
}
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
use std::fmt::Write;
let MetricsKey {
metric,
tenant_id,
timeline_id,
} = self.0;
let (kind, value) = self.1;
*event = Event {
kind,
metric,
idempotency_key: {
event.idempotency_key.clear();
write!(event.idempotency_key, "{key}").unwrap();
std::mem::take(&mut event.idempotency_key)
},
value,
extra: Ids {
tenant_id,
timeline_id,
},
};
}
}
trait KeyGen<'a>: Copy {
fn generate(&self) -> IdempotencyKey<'a>;
}
impl<'a> KeyGen<'a> for &'a str {
fn generate(&self) -> IdempotencyKey<'a> {
IdempotencyKey::generate(self)
}
}
enum UploadError {
Rejected(reqwest::StatusCode),
Reqwest(reqwest::Error),
@@ -253,16 +119,11 @@ impl UploadError {
}
}
// this is consumed by the test verifiers
static LAST_IN_BATCH: reqwest::header::HeaderName =
reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
async fn upload(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
body: bytes::Bytes,
cancel: &CancellationToken,
is_last: bool,
) -> Result<(), UploadError> {
let warn_after = 3;
let max_attempts = 10;
@@ -273,24 +134,17 @@ async fn upload(
let res = client
.post(metric_collection_endpoint.clone())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(
LAST_IN_BATCH.clone(),
if is_last { "true" } else { "false" },
)
.body(body)
.send()
.await;
let res = res.and_then(|res| res.error_for_status());
// 10 redirects are normally allowed, so we don't need worry about 3xx
match res {
Ok(_response) => Ok(()),
Err(e) => {
let status = e.status().filter(|s| s.is_client_error());
if let Some(status) = status {
// rejection used to be a thing when the server could reject a
// whole batch of metrics if one metric was bad.
Err(UploadError::Rejected(status))
} else {
Err(UploadError::Reqwest(e))
@@ -321,123 +175,3 @@ async fn upload(
res
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
#[test]
fn chunked_serialization() {
let examples = metric_samples();
assert!(examples.len() > 1);
let factory = FixedGen::new(Utc::now(), "1", 42);
// need to use Event here because serde_json::Value uses default hashmap, not linked
// hashmap
#[derive(serde::Deserialize)]
struct EventChunk {
events: Vec<Event<Ids, Name>>,
}
let correct = serialize_in_chunks(examples.len(), &examples, factory)
.map(|res| res.unwrap().1)
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
.collect::<Vec<_>>();
for chunk_size in 1..examples.len() {
let actual = serialize_in_chunks(chunk_size, &examples, factory)
.map(|res| res.unwrap().1)
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
.collect::<Vec<_>>();
// if these are equal, it means that multi-chunking version works as well
assert_eq!(correct, actual);
}
}
#[derive(Clone, Copy)]
struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
impl<'a> FixedGen<'a> {
fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
FixedGen(now, node_id, nonce)
}
}
impl<'a> KeyGen<'a> for FixedGen<'a> {
fn generate(&self) -> IdempotencyKey<'a> {
IdempotencyKey::for_tests(self.0, self.1, self.2)
}
}
static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
.unwrap()
.into()
});
#[test]
fn metric_image_stability() {
// it is important that these strings stay as they are
let examples = [
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
),
];
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
let examples = examples.into_iter().zip(metric_samples());
for ((line, expected), (key, (kind, value))) in examples {
let e = consumption_metrics::Event {
kind,
metric: key.metric,
idempotency_key: idempotency_key.to_string(),
value,
extra: Ids {
tenant_id: key.tenant_id,
timeline_id: key.timeline_id,
},
};
let actual = serde_json::to_string(&e).unwrap();
assert_eq!(expected, actual, "example for {kind:?} from line {line}");
}
}
fn metric_samples() -> [RawMetric; 6] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
.unwrap()
.into();
let [now, before] = [*SAMPLES_NOW, before];
super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
}
}

View File

@@ -129,8 +129,10 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
pub struct PageCacheMetrics {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_ephemeral: IntCounter,
pub read_accesses_immutable: IntCounter,
pub read_hits_ephemeral: IntCounter,
pub read_hits_immutable: IntCounter,
pub read_hits_materialized_page_exact: IntCounter,
pub read_hits_materialized_page_older_lsn: IntCounter,
@@ -161,12 +163,24 @@ pub static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
.unwrap()
},
read_accesses_ephemeral: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["ephemeral"])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["immutable"])
.unwrap()
},
read_hits_ephemeral: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["ephemeral", "-"])
.unwrap()
},
read_hits_immutable: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["immutable", "-"])

View File

@@ -2,3 +2,4 @@ comment = '** Deprecated ** Please use pg_embedding instead'
default_version = '0.1.0'
module_pathname = '$libdir/hnsw'
relocatable = true
trusted = true

View File

@@ -222,9 +222,8 @@ lfc_change_limit_hook(int newval, void *extra)
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
return;
/* Open cache file if not done yet */

View File

@@ -566,9 +566,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
}
initStringInfo(&safekeeper[n_safekeepers].outbuf);
safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
if (safekeeper[n_safekeepers].xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
safekeeper[n_safekeepers].xlogreader = NULL;
safekeeper[n_safekeepers].flushWrite = false;
safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr;
@@ -716,6 +714,12 @@ ShutdownConnection(Safekeeper *sk)
sk->voteResponse.termHistory.entries = NULL;
HackyRemoveWalProposerEvent(sk);
if (sk->xlogreader)
{
NeonWALReaderFree(sk->xlogreader);
sk->xlogreader = NULL;
}
}
/*
@@ -1238,8 +1242,8 @@ HandleElectedProposer(void)
LSN_FORMAT_ARGS(truncateLsn),
LSN_FORMAT_ARGS(propEpochStartLsn));
/* Perform recovery */
if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
elog(FATAL, "Failed to recover state");
// if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
// elog(FATAL, "Failed to recover state");
}
else if (syncSafekeepers)
{
@@ -1555,6 +1559,12 @@ SendProposerElected(Safekeeper *sk)
term_t lastCommonTerm;
int i;
/* It's a good moment to create WAL reader */
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, propEpochStartLsn);
if (!sk->xlogreader)
elog(FATAL, "failed to allocate xlog reader");
/*
* Determine start LSN by comparing safekeeper's log term switch history
* and proposer's, searching for the divergence point.
@@ -1834,19 +1844,24 @@ SendAppendRequests(Safekeeper *sk)
/* write the WAL itself */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
if (!WALRead(sk->xlogreader,
if (!NeonWALRead(sk->xlogreader,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn,
#if PG_VERSION_NUM >= 150000
/* FIXME don't use hardcoded timeline_id here */
1,
1
#else
ThisTimeLineID,
ThisTimeLineID
#endif
&errinfo))
))
{
WALReadRaiseError(&errinfo);
elog(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
sk->xlogreader->err_msg);
ShutdownConnection(sk);
return false;
}
sk->outbuf.len += req->endLsn - req->beginLsn;

View File

@@ -2,6 +2,7 @@
#define __NEON_WALPROPOSER_H__
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
#include "postgres.h"
#include "port.h"
#include "access/xlog_internal.h"
@@ -327,6 +328,24 @@ typedef struct AppendResponse
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
#define NEON_WALREADER_ERR_MSG_LEN 128
/*
* Like WALRead, but returns error instead of throwing ERROR when segment is
* missing + doesn't attempt to read WAL before specified horizon -- basebackup
* LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on
* demand WAL fetching from safekeepers should be implemented in NeonWALReader.
*/
typedef struct {
/* LSN before */
XLogRecPtr available_lsn;
WALSegmentContext segcxt;
WALOpenSegment seg;
int wre_errno;
/* Explains failure to read, static for simplicity. */
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
} NeonWALReader;
/*
* Descriptor of safekeeper
*/
@@ -358,7 +377,7 @@ typedef struct Safekeeper
/*
* WAL reader, allocated for each safekeeper.
*/
XLogReaderState *xlogreader;
NeonWALReader *xlogreader;
/*
* Streaming will start here; must be record boundary.
@@ -508,4 +527,9 @@ extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_
extern uint64 BackpressureThrottlingTime(void);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn);
extern void NeonWALReaderFree(NeonWALReader *state);
extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -12,6 +12,7 @@
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"
#include "utils/wait_event.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
@@ -657,3 +658,185 @@ XLogBroadcastWalProposer(void)
set_ps_display(activitymsg);
}
}
/* palloc and initialize NeonWALReader */
NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
{
NeonWALReader *reader;
reader = (NeonWALReader *)
palloc_extended(sizeof(NeonWALReader),
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
if (!reader)
return NULL;
reader->available_lsn = available_lsn;
reader->seg.ws_file = -1;
reader->seg.ws_segno = 0;
reader->seg.ws_tli = 0;
reader->segcxt.ws_segsize = wal_segment_size;
return reader;
}
static void neon_wal_segment_close(NeonWALReader *state);
void
NeonWALReaderFree(NeonWALReader *state)
{
if (state->seg.ws_file != -1)
neon_wal_segment_close(state);
pfree(state);
}
/*
* Copy of vanilla wal_segment_open, but returns false in case of error instead
* of ERROR, with errno set.
*
* XLogReaderRoutine->segment_open callback for local pg_wal files
*/
static bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (state->seg.ws_file >= 0)
return true;
return false;
}
/* copy of vanilla wal_segment_close with NeonWALReader */
void
neon_wal_segment_close(NeonWALReader *state)
{
close(state->seg.ws_file);
/* need to check errno? */
state->seg.ws_file = -1;
}
/*
* Mostly copy of vanilla WALRead, but 1) returns error if requested data before
* available_lsn 2) returns error is segment is missing instead of throwing
* ERROR.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'state->errno' shows whether it was missing WAL (ENOENT) or something else,
* and 'err' the desciption.
*/
bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
char *p;
XLogRecPtr recptr;
Size nbytes;
if (startptr < state->available_lsn)
{
state->wre_errno = 0;
snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X",
LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn));
return false;
}
p = buf;
recptr = startptr;
nbytes = count;
while (nbytes > 0)
{
uint32 startoff;
int segbytes;
int readbytes;
startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
/*
* If the data we want is not in a segment we have open, close what we
* have (if anything) and open the next one, using the caller's
* provided openSegment callback.
*/
if (state->seg.ws_file < 0 ||
!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
tli != state->seg.ws_tli)
{
XLogSegNo nextSegNo;
if (state->seg.ws_file >= 0)
neon_wal_segment_close(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
if (!neon_wal_segment_open(state, nextSegNo, &tli))
{
char fname[MAXFNAMELEN];
state->wre_errno = errno;
XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
return false;
}
/* This shouldn't happen -- indicates a bug in segment_open */
Assert(state->seg.ws_file >= 0);
/* Update the current segment info. */
state->seg.ws_tli = tli;
state->seg.ws_segno = nextSegNo;
}
/* How many bytes are within this segment? */
if (nbytes > (state->segcxt.ws_segsize - startoff))
segbytes = state->segcxt.ws_segsize - startoff;
else
segbytes = nbytes;
#ifndef FRONTEND
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
#endif
/* Reset errno first; eases reporting non-errno-affecting errors */
errno = 0;
readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
#ifndef FRONTEND
pgstat_report_wait_end();
#endif
if (readbytes <= 0)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);
if (readbytes < 0)
{
state->wre_errno = errno;
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: %s",
fname, startoff, strerror(state->wre_errno));
}
else
{
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: unexpected EOF",
fname, startoff);
}
return false;
}
/* Update state for read */
recptr += readbytes;
nbytes -= readbytes;
p += readbytes;
}
return true;
}

50
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -887,34 +887,34 @@ files = [
[[package]]
name = "cryptography"
version = "41.0.4"
version = "41.0.3"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
optional = false
python-versions = ">=3.7"
files = [
{file = "cryptography-41.0.4-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:80907d3faa55dc5434a16579952ac6da800935cd98d14dbd62f6f042c7f5e839"},
{file = "cryptography-41.0.4-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:35c00f637cd0b9d5b6c6bd11b6c3359194a8eba9c46d4e875a3660e3b400005f"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cecfefa17042941f94ab54f769c8ce0fe14beff2694e9ac684176a2535bf9714"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e40211b4923ba5a6dc9769eab704bdb3fbb58d56c5b336d30996c24fcf12aadb"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:23a25c09dfd0d9f28da2352503b23e086f8e78096b9fd585d1d14eca01613e13"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:2ed09183922d66c4ec5fdaa59b4d14e105c084dd0febd27452de8f6f74704143"},
{file = "cryptography-41.0.4-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:5a0f09cefded00e648a127048119f77bc2b2ec61e736660b5789e638f43cc397"},
{file = "cryptography-41.0.4-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:9eeb77214afae972a00dee47382d2591abe77bdae166bda672fb1e24702a3860"},
{file = "cryptography-41.0.4-cp37-abi3-win32.whl", hash = "sha256:3b224890962a2d7b57cf5eeb16ccaafba6083f7b811829f00476309bce2fe0fd"},
{file = "cryptography-41.0.4-cp37-abi3-win_amd64.whl", hash = "sha256:c880eba5175f4307129784eca96f4e70b88e57aa3f680aeba3bab0e980b0f37d"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:004b6ccc95943f6a9ad3142cfabcc769d7ee38a3f60fb0dddbfb431f818c3a67"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:86defa8d248c3fa029da68ce61fe735432b047e32179883bdb1e79ed9bb8195e"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:37480760ae08065437e6573d14be973112c9e6dcaf5f11d00147ee74f37a3829"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:b5f4dfe950ff0479f1f00eda09c18798d4f49b98f4e2006d644b3301682ebdca"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:7e53db173370dea832190870e975a1e09c86a879b613948f09eb49324218c14d"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:5b72205a360f3b6176485a333256b9bcd48700fc755fef51c8e7e67c4b63e3ac"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:93530900d14c37a46ce3d6c9e6fd35dbe5f5601bf6b3a5c325c7bffc030344d9"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:efc8ad4e6fc4f1752ebfb58aefece8b4e3c4cae940b0994d43649bdfce8d0d4f"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:c3391bd8e6de35f6f1140e50aaeb3e2b3d6a9012536ca23ab0d9c35ec18c8a91"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:0d9409894f495d465fe6fda92cb70e8323e9648af912d5b9141d616df40a87b8"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:8ac4f9ead4bbd0bc8ab2d318f97d85147167a488be0e08814a37eb2f439d5cf6"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:047c4603aeb4bbd8db2756e38f5b8bd7e94318c047cfe4efeb5d715e08b49311"},
{file = "cryptography-41.0.4.tar.gz", hash = "sha256:7febc3094125fc126a7f6fb1f420d0da639f3f32cb15c8ff0dc3997c4549f51a"},
{file = "cryptography-41.0.3-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:652627a055cb52a84f8c448185922241dd5217443ca194d5739b44612c5e6507"},
{file = "cryptography-41.0.3-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:8f09daa483aedea50d249ef98ed500569841d6498aa9c9f4b0531b9964658922"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4fd871184321100fb400d759ad0cddddf284c4b696568204d281c902fc7b0d81"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:84537453d57f55a50a5b6835622ee405816999a7113267739a1b4581f83535bd"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:3fb248989b6363906827284cd20cca63bb1a757e0a2864d4c1682a985e3dca47"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:42cb413e01a5d36da9929baa9d70ca90d90b969269e5a12d39c1e0d475010116"},
{file = "cryptography-41.0.3-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:aeb57c421b34af8f9fe830e1955bf493a86a7996cc1338fe41b30047d16e962c"},
{file = "cryptography-41.0.3-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6af1c6387c531cd364b72c28daa29232162010d952ceb7e5ca8e2827526aceae"},
{file = "cryptography-41.0.3-cp37-abi3-win32.whl", hash = "sha256:0d09fb5356f975974dbcb595ad2d178305e5050656affb7890a1583f5e02a306"},
{file = "cryptography-41.0.3-cp37-abi3-win_amd64.whl", hash = "sha256:a983e441a00a9d57a4d7c91b3116a37ae602907a7618b882c8013b5762e80574"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5259cb659aa43005eb55a0e4ff2c825ca111a0da1814202c64d28a985d33b087"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:67e120e9a577c64fe1f611e53b30b3e69744e5910ff3b6e97e935aeb96005858"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:7efe8041897fe7a50863e51b77789b657a133c75c3b094e51b5e4b5cec7bf906"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:ce785cf81a7bdade534297ef9e490ddff800d956625020ab2ec2780a556c313e"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:57a51b89f954f216a81c9d057bf1a24e2f36e764a1ca9a501a6964eb4a6800dd"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:4c2f0d35703d61002a2bbdcf15548ebb701cfdd83cdc12471d2bae80878a4207"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:23c2d778cf829f7d0ae180600b17e9fceea3c2ef8b31a99e3c694cbbf3a24b84"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:95dd7f261bb76948b52a5330ba5202b91a26fbac13ad0e9fc8a3ac04752058c7"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:41d7aa7cdfded09b3d73a47f429c298e80796c8e825ddfadc84c8a7f12df212d"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d0d651aa754ef58d75cec6edfbd21259d93810b73f6ec246436a21b7841908de"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:ab8de0d091acbf778f74286f4989cf3d1528336af1b59f3e5d2ebca8b5fe49e1"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:a74fbcdb2a0d46fe00504f571a2a540532f4c188e6ccf26f1f178480117b33c4"},
{file = "cryptography-41.0.3.tar.gz", hash = "sha256:6d192741113ef5e30d89dcb5b956ef4e1578f304708701b8b73d38e3e1461f34"},
]
[package.dependencies]

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.72.1"
channel = "1.72.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -2,7 +2,7 @@
// Main entry point for the safekeeper executable
//
use anyhow::{bail, Context, Result};
use clap::Parser;
use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
@@ -105,6 +105,9 @@ struct Args {
/// it during this period passed as a human readable duration.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
heartbeat_timeout: Duration,
/// Disable/enable peer recovery. Used for disabling it in tests.
#[arg(long, default_value = "true", action=ArgAction::Set)]
peer_recovery: bool,
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
/// inline table, e.g.
/// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
@@ -268,6 +271,7 @@ async fn main() -> anyhow::Result<()> {
broker_endpoint: args.broker_endpoint,
broker_keepalive_interval: args.broker_keepalive_interval,
heartbeat_timeout: args.heartbeat_timeout,
peer_recovery_enabled: args.peer_recovery,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,
wal_backup_enabled: !args.disable_wal_backup,

View File

@@ -372,6 +372,13 @@ impl SafekeeperPostgresHandler {
/// from a walproposer recovery function. This connection gets a special handling:
/// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
pub fn is_walproposer_recovery(&self) -> bool {
self.appname == Some("wal_proposer_recovery".to_string())
match &self.appname {
None => false,
Some(appname) => {
appname == "wal_proposer_recovery" ||
// set by safekeeper peer recovery
appname.starts_with("safekeeper")
}
}
}
}

View File

@@ -16,8 +16,8 @@ use tokio::io::AsyncReadExt;
use utils::http::endpoint::request_span;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{debug_dump, pull_timeline};
@@ -60,16 +60,25 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref()
}
/// Same as TermSwitchEntry, but serializes LSN using display serializer
/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
#[serde_as(as = "DisplayFromStr")]
pub lsn: Lsn,
}
impl From<TermSwitchApiEntry> for TermLsn {
fn from(api_val: TermSwitchApiEntry) -> Self {
TermLsn {
term: api_val.term,
lsn: api_val.lsn,
}
}
}
/// Augment AcceptorState with epoch for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {

View File

@@ -62,6 +62,7 @@ pub struct SafeKeeperConf {
pub broker_endpoint: Uri,
pub broker_keepalive_interval: Duration,
pub heartbeat_timeout: Duration,
pub peer_recovery_enabled: bool,
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
pub backup_parallel_jobs: usize,
@@ -100,6 +101,7 @@ impl SafeKeeperConf {
.parse()
.expect("failed to parse default broker endpoint"),
broker_keepalive_interval: Duration::from_secs(5),
peer_recovery_enabled: true,
wal_backup_enabled: true,
backup_parallel_jobs: 1,
pg_auth: None,

View File

@@ -55,9 +55,12 @@ impl WalReceivers {
/// Register new walreceiver. Returned guard provides access to the slot and
/// automatically deregisters in Drop.
pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
let slots = &mut self.mutex.lock().slots;
let walreceiver = WalReceiverState::Voting;
let walreceiver = WalReceiverState {
conn_id,
status: WalReceiverStatus::Voting,
};
// find empty slot or create new one
let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
slots[pos] = Some(walreceiver);
@@ -96,6 +99,18 @@ impl WalReceivers {
self.mutex.lock().slots.iter().flatten().cloned().collect()
}
/// Get number of streaming walreceivers (normally 0 or 1) from compute.
pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
self.mutex
.lock()
.slots
.iter()
.flatten()
// conn_id.is_none skips recovery which also registers here
.filter(|s| s.conn_id.is_none() && matches!(s.status, WalReceiverStatus::Streaming))
.count()
}
/// Unregister walsender.
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
let mut shared = self.mutex.lock();
@@ -108,10 +123,17 @@ struct WalReceiversShared {
slots: Vec<Option<WalReceiverState>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
pub conn_id: Option<ConnectionId>,
pub status: WalReceiverStatus,
}
/// Walreceiver status. Currently only whether it passed voting stage and
/// started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverState {
pub enum WalReceiverStatus {
Voting,
Streaming,
}
@@ -136,8 +158,8 @@ impl Drop for WalReceiverGuard {
}
}
const MSG_QUEUE_SIZE: usize = 256;
const REPLY_QUEUE_SIZE: usize = 16;
pub const MSG_QUEUE_SIZE: usize = 256;
pub const REPLY_QUEUE_SIZE: usize = 16;
impl SafekeeperPostgresHandler {
/// Wrapper around handle_start_wal_push_guts handling result. Error is
@@ -261,7 +283,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
tli.clone(),
msg_rx,
reply_tx,
self.conn_id,
Some(self.conn_id),
));
// Forward all messages to WalAcceptor
@@ -317,31 +339,41 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx; reading from socket and writing to disk in parallel is
/// beneficial for performance, this struct provides writing to disk part.
pub struct WalAcceptor {
tli: Arc<Timeline>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: Option<ConnectionId>,
}
impl WalAcceptor {
/// Spawn thread with WalAcceptor running, return handle to it.
fn spawn(
/// Spawn task with WalAcceptor running, return handle to it. Task returns
/// Ok(()) if either of channels has closed, and Err if any error during
/// message processing is encountered.
///
/// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
pub fn spawn(
tli: Arc<Timeline>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: ConnectionId,
conn_id: Option<ConnectionId>,
) -> JoinHandle<anyhow::Result<()>> {
task::spawn(async move {
let mut wa = WalAcceptor {
tli,
msg_rx,
reply_tx,
conn_id,
};
let span_ttid = wa.tli.ttid; // satisfy borrow checker
wa.run()
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
.instrument(
info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
)
.await
})
}
@@ -355,7 +387,7 @@ impl WalAcceptor {
let _compute_conn_guard = ComputeConnectionGuard {
timeline: Arc::clone(&self.tli),
};
let walreceiver_guard = self.tli.get_walreceivers().register();
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
self.tli.update_status_notify().await?;
// After this timestamp we will stop processing AppendRequests and send a response
@@ -372,7 +404,7 @@ impl WalAcceptor {
// Update walreceiver state in shmem for reporting.
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
*walreceiver_guard.get() = WalReceiverState::Streaming;
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
}
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {

View File

@@ -1,17 +1,41 @@
//! This module implements pulling WAL from peer safekeepers if compute can't
//! provide it, i.e. safekeeper lags too much.
use std::sync::Arc;
use std::time::SystemTime;
use std::{fmt, pin::pin, sync::Arc};
use tokio::{select, time::sleep, time::Duration};
use tracing::{info, instrument};
use anyhow::{bail, Context};
use futures::StreamExt;
use postgres_protocol::message::backend::ReplicationMessage;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::timeout;
use tokio::{
select,
time::sleep,
time::{self, Duration},
};
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::types::PgLsn;
use tracing::*;
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
use crate::{timeline::Timeline, SafeKeeperConf};
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
use crate::safekeeper::{AppendRequest, AppendRequestHeader};
use crate::{
http::routes::TimelineStatus,
receive_wal::MSG_QUEUE_SIZE,
safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory,
TermLsn, VoteRequest,
},
timeline::{PeerInfo, Timeline},
SafeKeeperConf,
};
/// Entrypoint for per timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
@@ -22,19 +46,387 @@ pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
};
select! {
_ = recovery_main_loop(tli) => { unreachable!() }
_ = recovery_main_loop(tli, conf) => { unreachable!() }
_ = cancellation_rx.changed() => {
info!("stopped");
}
}
}
/// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and
/// fields to explain the choice.
#[derive(Debug)]
pub struct RecoveryNeededInfo {
/// my term
pub term: Term,
/// my last_log_term
pub last_log_term: Term,
/// my flush_lsn
pub flush_lsn: Lsn,
/// peers from which we can fetch WAL, for observability.
pub peers: Vec<PeerInfo>,
/// for observability
pub num_streaming_computes: usize,
pub donors: Vec<Donor>,
}
// Custom to omit not important fields from PeerInfo.
impl fmt::Display for RecoveryNeededInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{{")?;
write!(
f,
"term: {}, last_log_term: {}, flush_lsn: {}, peers: {{",
self.term, self.last_log_term, self.flush_lsn
)?;
for p in self.peers.iter() {
write!(
f,
"PeerInfo {{ sk_id: {}, term: {}, last_log_term: {}, flush_lsn: {} }}, ",
p.sk_id, p.term, p.last_log_term, p.flush_lsn
)?;
}
write!(
f,
"}} num_streaming_computes: {}, donors: {:?}",
self.num_streaming_computes, self.donors
)
}
}
#[derive(Clone, Debug)]
pub struct Donor {
pub sk_id: NodeId,
/// equals to last_log_term
pub term: Term,
pub flush_lsn: Lsn,
pub pg_connstr: String,
pub http_connstr: String,
}
impl From<&PeerInfo> for Donor {
fn from(p: &PeerInfo) -> Self {
Donor {
sk_id: p.sk_id,
term: p.term,
flush_lsn: p.flush_lsn,
pg_connstr: p.pg_connstr.clone(),
http_connstr: p.http_connstr.clone(),
}
}
}
const CHECK_INTERVAL_MS: u64 = 2000;
/// Check regularly whether we need to start recovery.
async fn recovery_main_loop(_tli: Arc<Timeline>) {
async fn recovery_main_loop(tli: Arc<Timeline>, conf: SafeKeeperConf) {
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
loop {
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
match recovery_needed_info.donors.first() {
Some(donor) => {
info!(
"starting recovery from donor {}: {}",
donor.sk_id, recovery_needed_info
);
match recover(tli.clone(), donor, &conf).await {
// Note: 'write_wal rewrites WAL written before' error is
// expected here and might happen if compute and recovery
// concurrently write the same data. Eventually compute
// should win.
Err(e) => warn!("recovery failed: {:#}", e),
Ok(msg) => info!("recovery finished: {}", msg),
}
}
None => {
trace!(
"recovery not needed or not possible: {}",
recovery_needed_info
);
}
}
sleep(check_duration).await;
}
}
/// Recover from the specified donor. Returns message explaining normal finish
/// reason or error.
async fn recover(
tli: Arc<Timeline>,
donor: &Donor,
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// Learn donor term switch history to figure out starting point.
let client = reqwest::Client::new();
let timeline_info: TimelineStatus = client
.get(format!(
"http://{}/v1/tenant/{}/timeline/{}",
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
))
.send()
.await?
.json()
.await?;
if timeline_info.acceptor_state.term != donor.term {
bail!(
"donor term changed from {} to {}",
donor.term,
timeline_info.acceptor_state.term
);
}
// convert from API TermSwitchApiEntry into TermLsn.
let donor_th = TermHistory(
timeline_info
.acceptor_state
.term_history
.iter()
.map(|tl| Into::<TermLsn>::into(*tl))
.collect(),
);
// Now understand our term history.
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
let vote_response = match tli
.process_msg(&vote_request)
.await
.context("VoteRequest handling")?
{
Some(AcceptorProposerMessage::VoteResponse(vr)) => vr,
_ => {
bail!("unexpected VoteRequest response"); // unreachable
}
};
if vote_response.term != donor.term {
bail!(
"our term changed from {} to {}",
donor.term,
vote_response.term
);
}
let last_common_point = match TermHistory::find_highest_common_point(
&donor_th,
&vote_response.term_history,
vote_response.flush_lsn,
) {
None => bail!(
"couldn't find common point in histories, donor {:?}, sk {:?}",
donor_th,
vote_response.term_history,
),
Some(lcp) => lcp,
};
info!("found last common point at {:?}", last_common_point);
// truncate WAL locally
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
term: donor.term,
start_streaming_at: last_common_point.lsn,
term_history: donor_th,
timeline_start_lsn: Lsn::INVALID,
});
// Successful ProposerElected handling always returns None. If term changed,
// we'll find out that during the streaming. Note: it is expected to get
// 'refusing to overwrite correct WAL' here if walproposer reconnected
// concurrently, restart helps here.
tli.process_msg(&pe)
.await
.context("ProposerElected handling")?;
recovery_stream(tli, donor, last_common_point.lsn, conf).await
}
// Pull WAL from donor, assuming handshake is already done.
async fn recovery_stream(
tli: Arc<Timeline>,
donor: &Donor,
start_streaming_at: Lsn,
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// TODO: pass auth token
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
let mut cfg = cfg.to_tokio_postgres_config();
// It will make safekeeper give out not committed WAL (up to flush_lsn).
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
let connect_timeout = Duration::from_millis(10000);
let (client, connection) = match time::timeout(connect_timeout, cfg.connect(postgres::NoTls))
.await
{
Ok(client_and_conn) => client_and_conn?,
Err(_elapsed) => {
bail!("timed out while waiting {connect_timeout:?} for connection to peer safekeeper to open");
}
};
trace!("connected to {:?}", donor);
// The connection object performs the actual communication with the
// server, spawn it off to run on its own.
let ttid = tli.ttid;
tokio::spawn(async move {
if let Err(e) = connection
.instrument(info_span!("recovery task connection poll", ttid = %ttid))
.await
{
// This logging isn't very useful as error is anyway forwarded to client.
trace!(
"tokio_postgres connection object finished with error: {}",
e
);
}
});
let query = format!(
"START_REPLICATION PHYSICAL {} (term='{}')",
start_streaming_at, donor.term
);
let copy_stream = client.copy_both_simple(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
// As in normal walreceiver, do networking and writing to disk in parallel.
let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
let wa = WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, None);
let res = tokio::select! {
r = network_io(physical_stream, msg_tx, donor.clone(), tli.clone(), conf.clone()) => r,
r = read_replies(reply_rx, donor.term) => r.map(|()| None),
};
// Join the spawned WalAcceptor. At this point chans to/from it passed to
// network routines are dropped, so it will exit as soon as it touches them.
match wa.await {
Ok(Ok(())) => {
// WalAcceptor finished normally, termination reason is different
match res {
Ok(Some(success_desc)) => Ok(success_desc),
Ok(None) => bail!("unexpected recovery end without error/success"), // can't happen
Err(e) => Err(e), // network error or term change
}
}
Ok(Err(e)) => Err(e), // error while processing message
Err(e) => bail!("WalAcceptor panicked: {}", e),
}
}
// Perform network part of streaming: read data and push it to msg_tx, send KA
// to make sender hear from us. If there is nothing coming for a while, check
// for termination.
// Returns
// - Ok(None) if channel to WalAcceptor closed -- its task should return error.
// - Ok(Some(String)) if recovery successfully completed.
// - Err if error happened while reading/writing to socket.
async fn network_io(
physical_stream: ReplicationStream,
msg_tx: Sender<ProposerAcceptorMessage>,
donor: Donor,
tli: Arc<Timeline>,
conf: SafeKeeperConf,
) -> anyhow::Result<Option<String>> {
let mut physical_stream = pin!(physical_stream);
let mut last_received_lsn = Lsn::INVALID;
// tear down connection if no data arrives withing this period
let no_data_timeout = Duration::from_millis(30000);
loop {
let msg = match timeout(no_data_timeout, physical_stream.next()).await {
Ok(next) => match next {
None => bail!("unexpected end of replication stream"),
Some(msg) => msg.context("get replication message")?,
},
Err(_) => bail!("no message received within {:?}", no_data_timeout),
};
match msg {
ReplicationMessage::XLogData(xlog_data) => {
let ar_hdr = AppendRequestHeader {
term: donor.term,
epoch_start_lsn: Lsn::INVALID, // unused
begin_lsn: Lsn(xlog_data.wal_start()),
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
truncate_lsn: Lsn::INVALID, // do not attempt to advance
proposer_uuid: [0; 16],
};
let ar = AppendRequest {
h: ar_hdr,
wal_data: xlog_data.into_data(),
};
trace!(
"processing AppendRequest {}-{}, len {}",
ar.h.begin_lsn,
ar.h.end_lsn,
ar.wal_data.len()
);
last_received_lsn = ar.h.end_lsn;
if msg_tx
.send(ProposerAcceptorMessage::AppendRequest(ar))
.await
.is_err()
{
return Ok(None); // chan closed, WalAcceptor terminated
}
}
ReplicationMessage::PrimaryKeepAlive(_) => {
// keepalive means nothing is being streamed for a while. Check whether we need to stop.
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
// do current donors still contain one we currently connected to?
if !recovery_needed_info
.donors
.iter()
.any(|d| d.sk_id == donor.sk_id)
{
// Most likely it means we are caughtup.
// note: just exiting makes tokio_postgres send CopyFail to the far end.
return Ok(Some(format!(
"terminating at {} as connected safekeeper {} with term {} is not a donor anymore: {}",
last_received_lsn, donor.sk_id, donor.term, recovery_needed_info
)));
}
}
_ => {}
}
// Send reply to each message to keep connection alive. Ideally we
// should do that once in a while instead, but this again requires
// stream split or similar workaround, and recovery is anyway not that
// performance critical.
//
// We do not know here real write/flush LSNs (need to take mutex again
// or check replies which are read in different future), but neither
// sender much cares about them, so just send last received.
physical_stream
.as_mut()
.standby_status_update(
PgLsn::from(last_received_lsn.0),
PgLsn::from(last_received_lsn.0),
PgLsn::from(last_received_lsn.0),
SystemTime::now(),
0,
)
.await?;
}
}
// Read replies from WalAcceptor. We are not interested much in sending them to
// donor safekeeper, so don't route them anywhere. However, we should check if
// term changes and exit if it does.
// Returns Ok(()) if channel closed, Err in case of term change.
async fn read_replies(
mut reply_rx: Receiver<AcceptorProposerMessage>,
donor_term: Term,
) -> anyhow::Result<()> {
loop {
match reply_rx.recv().await {
Some(msg) => {
if let AcceptorProposerMessage::AppendResponse(ar) = msg {
if ar.term != donor_term {
bail!("donor term changed from {} to {}", donor_term, ar.term);
}
}
}
None => return Ok(()), // chan closed, WalAcceptor terminated
}
}
}

View File

@@ -91,6 +91,59 @@ impl TermHistory {
}
TermHistory(res)
}
/// Find point of divergence between leader (walproposer) term history and
/// safekeeper. Arguments are not symmetrics as proposer history ends at
/// +infinity while safekeeper at flush_lsn.
/// C version is at walproposer SendProposerElected.
pub fn find_highest_common_point(
prop_th: &TermHistory,
sk_th: &TermHistory,
sk_wal_end: Lsn,
) -> Option<TermLsn> {
let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
// find last common term, if any...
let mut last_common_idx = None;
for i in 0..min(sk_th.len(), prop_th.len()) {
if prop_th[i].term != sk_th[i].term {
break;
}
// If term is the same, LSN must be equal as well.
assert!(
prop_th[i].lsn == sk_th[i].lsn,
"same term {} has different start LSNs: prop {}, sk {}",
prop_th[i].term,
prop_th[i].lsn,
sk_th[i].lsn
);
last_common_idx = Some(i);
}
let last_common_idx = match last_common_idx {
None => return None, // no common point
Some(lci) => lci,
};
// Now find where it ends at both prop and sk and take min. End of
// (common) term is the start of the next except it is the last one;
// there it is flush_lsn in case of safekeeper or, in case of proposer
// +infinity, so we just take flush_lsn then.
if last_common_idx == prop_th.len() - 1 {
Some(TermLsn {
term: prop_th[last_common_idx].term,
lsn: sk_wal_end,
})
} else {
let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
sk_th[last_common_idx + 1].lsn
} else {
sk_wal_end
};
Some(TermLsn {
term: prop_th[last_common_idx].term,
lsn: min(prop_common_term_end, sk_common_term_end),
})
}
}
}
/// Display only latest entries for Debug.
@@ -305,19 +358,19 @@ pub struct AcceptorGreeting {
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
pub struct VoteRequest {
term: Term,
pub term: Term,
}
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
flush_lsn: Lsn,
pub flush_lsn: Lsn,
truncate_lsn: Lsn,
term_history: TermHistory,
pub term_history: TermHistory,
timeline_start_lsn: Lsn,
}
@@ -344,7 +397,8 @@ pub struct AppendRequest {
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
// LSN since the proposer appends WAL; determines epoch switch point.
// TODO: remove this field, it in unused -- LSN of term switch can be taken
// from ProposerElected (as well as from term history).
pub epoch_start_lsn: Lsn,
/// start position of message in WAL
pub begin_lsn: Lsn,
@@ -759,7 +813,7 @@ where
bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
msg.term, self.flush_lsn(), msg.start_streaming_at)
}
// Otherwise this shouldn't happen.
// Otherwise we must never attempt to truncate committed data.
assert!(
msg.start_streaming_at >= self.inmem.commit_lsn,
"attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
@@ -810,6 +864,14 @@ where
info!("start receiving WAL since {:?}", msg.start_streaming_at);
// Cache LSN where term starts to immediately fsync control file with
// commit_lsn once we reach it -- sync-safekeepers finishes when
// persisted commit_lsn on majority of safekeepers aligns.
self.epoch_start_lsn = match msg.term_history.0.last() {
None => bail!("proposer elected with empty term history"),
Some(term_lsn_start) => term_lsn_start.lsn,
};
Ok(None)
}
@@ -835,10 +897,7 @@ where
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
// otherwise commit_lsn might jump over epoch_start_lsn.
// Also note that commit_lsn can reach epoch_start_lsn earlier
// that we receive new epoch_start_lsn, and we still need to sync
// control file in this case.
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
self.persist_control_file(self.state.clone()).await?;
}
@@ -902,7 +961,6 @@ where
// Now we know that we are in the same term as the proposer,
// processing the message.
self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.inmem.proposer_uuid = msg.h.proposer_uuid;
// do the job
@@ -1185,4 +1243,65 @@ mod tests {
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
assert_eq!(sk.get_epoch(), 1);
}
#[test]
fn test_find_highest_common_point_none() {
let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
None
);
}
#[test]
fn test_find_highest_common_point_middle() {
let prop_th = TermHistory(vec![
(1, Lsn(10)).into(),
(2, Lsn(20)).into(),
(4, Lsn(40)).into(),
]);
let sk_th = TermHistory(vec![
(1, Lsn(10)).into(),
(2, Lsn(20)).into(),
(3, Lsn(30)).into(), // sk ends last common term 2 at 30
]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
Some(TermLsn {
term: 2,
lsn: Lsn(30),
})
);
}
#[test]
fn test_find_highest_common_point_sk_end() {
let prop_th = TermHistory(vec![
(1, Lsn(10)).into(),
(2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
(4, Lsn(40)).into(),
]);
let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
Some(TermLsn {
term: 2,
lsn: Lsn(32),
})
);
}
#[test]
fn test_find_highest_common_point_walprop() {
let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
Some(TermLsn {
term: 2,
lsn: Lsn(32),
})
);
}
}

View File

@@ -418,10 +418,11 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_))
matches!(end_watch, EndWatch::Flush(_)),
appname
);
// switch to copy
@@ -680,7 +681,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
}
}
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(5);
/// Wait until we have available WAL > start_pos or timeout expires. Returns
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;

View File

@@ -11,6 +11,7 @@ use serde_with::DisplayFromStr;
use std::cmp::max;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
sync::{mpsc::Sender, watch},
@@ -27,7 +28,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::receive_wal::WalReceivers;
use crate::recovery::recovery_main;
use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
@@ -45,11 +46,12 @@ use crate::{debug_dump, wal_storage};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
pub term: Term,
/// Term of the last entry.
_last_log_term: Term,
pub last_log_term: Term,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
_flush_lsn: Lsn,
pub flush_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
@@ -61,16 +63,21 @@ pub struct PeerInfo {
#[serde(skip)]
#[serde(default = "Instant::now")]
ts: Instant,
pub pg_connstr: String,
pub http_connstr: String,
}
impl PeerInfo {
fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id: NodeId(sk_info.safekeeper_id),
_last_log_term: sk_info.last_log_term,
_flush_lsn: Lsn(sk_info.flush_lsn),
term: sk_info.term,
last_log_term: sk_info.last_log_term,
flush_lsn: Lsn(sk_info.flush_lsn),
commit_lsn: Lsn(sk_info.commit_lsn),
local_start_lsn: Lsn(sk_info.local_start_lsn),
pg_connstr: sk_info.safekeeper_connstr.clone(),
http_connstr: sk_info.http_connstr.clone(),
ts,
}
}
@@ -265,6 +272,20 @@ impl SharedState {
availability_zone: conf.availability_zone.clone(),
}
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
let now = Instant::now();
self.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
.cloned()
.collect()
}
}
#[derive(Debug, thiserror::Error)]
@@ -446,7 +467,9 @@ impl Timeline {
/// Bootstrap new or existing timeline starting background stasks.
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
// Start recovery task which always runs on the timeline.
tokio::spawn(recovery_main(self.clone(), conf.clone()));
if conf.peer_recovery_enabled {
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}
}
/// Delete timeline from disk completely, by removing timeline directory. Background
@@ -680,20 +703,88 @@ impl Timeline {
Ok(())
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state().await;
let now = Instant::now();
shared_state
.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
.cloned()
.collect()
shared_state.get_peers(conf.heartbeat_timeout)
}
/// Should we start fetching WAL from a peer safekeeper, and if yes, from
/// which? Answer is yes, i.e. .donors is not empty if 1) there is something
/// to fetch, and we can do that without running elections; 2) there is no
/// actively streaming compute, as we don't want to compete with it.
///
/// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal
/// to its last_log_term so we are sure such a leader ever had been elected.
///
/// All possible donors are returned so that we could keep connection to the
/// current one if it is good even if it slightly lags behind.
///
/// Note that term conditions above might be not met, but safekeepers are
/// still not aligned on last flush_lsn. Generally in this case until
/// elections are run it is not possible to say which safekeeper should
/// recover from which one -- history which would be committed is different
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
/// Thus we don't try to predict it here.
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
let ss = self.write_shared_state().await;
let term = ss.sk.state.acceptor_state.term;
let last_log_term = ss.sk.get_epoch();
let flush_lsn = ss.sk.flush_lsn();
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
let mut peers = ss.get_peers(heartbeat_timeout);
// Sort by <last log term, lsn> pairs.
peers.sort_by(|p1, p2| {
let tl1 = TermLsn {
term: p1.last_log_term,
lsn: p1.flush_lsn,
};
let tl2 = TermLsn {
term: p2.last_log_term,
lsn: p2.flush_lsn,
};
tl2.cmp(&tl1) // desc
});
let num_streaming_computes = self.walreceivers.get_num_streaming();
let donors = if num_streaming_computes > 0 {
vec![] // If there is a streaming compute, don't try to recover to not intervene.
} else {
peers
.iter()
.filter_map(|candidate| {
// Are we interested in this candidate?
let candidate_tl = TermLsn {
term: candidate.last_log_term,
lsn: candidate.flush_lsn,
};
let my_tl = TermLsn {
term: last_log_term,
lsn: flush_lsn,
};
if my_tl < candidate_tl {
// Yes, we are interested. Can we pull from it without
// (re)running elections? It is possible if 1) his term
// is equal to his last_log_term so we could act on
// behalf of leader of this term (we must be sure he was
// ever elected) and 2) our term is not higher, or we'll refuse data.
if candidate.term == candidate.last_log_term && candidate.term >= term {
Some(Donor::from(candidate))
} else {
None
}
} else {
None
}
})
.collect()
};
RecoveryNeededInfo {
term,
last_log_term,
flush_lsn,
peers,
num_streaming_computes,
donors,
}
}
pub fn get_walsenders(&self) -> &Arc<WalSenders> {

View File

@@ -1,62 +0,0 @@
#!/usr/bin/env python3
#
# Script to download the basebackup from a pageserver to a tar file.
#
# This can be useful in disaster recovery.
#
import argparse
import psycopg2
from psycopg2.extensions import connection as PgConnection
def main(args: argparse.Namespace):
pageserver_connstr = args.pageserver_connstr
tenant_id = args.tenant
timeline_id = args.timeline
lsn = args.lsn
output_path = args.output_path
psconn: PgConnection = psycopg2.connect(pageserver_connstr)
psconn.autocommit = True
output = open(output_path, "wb")
with psconn.cursor() as pscur:
pscur.copy_expert(f"basebackup {tenant_id} {timeline_id} {lsn}", output)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--tenant-id",
dest="tenant",
required=True,
help="Id of the tenant",
)
parser.add_argument(
"--timeline-id",
dest="timeline",
required=True,
help="Id of the timeline",
)
parser.add_argument(
"--lsn",
dest="lsn",
required=True,
help="LSN to take the basebackup at",
)
parser.add_argument(
"--pageserver-connstr",
dest="pageserver_connstr",
required=True,
help="libpq connection string of the pageserver",
)
parser.add_argument(
"--output",
dest="output_path",
required=True,
help="output path to write the basebackup to",
)
args = parser.parse_args()
main(args)

View File

@@ -2691,6 +2691,20 @@ class Safekeeper:
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
def timeline_dir(self, tenant_id, timeline_id) -> str:
return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
Get list of segment names of the given timeline.
"""
tli_dir = self.timeline_dir(tenant_id, timeline_id)
segments = []
for _, _, filenames in os.walk(tli_dir):
segments.extend([f for f in filenames if f != "safekeeper.control"])
segments.sort()
return segments
@dataclass
class SafekeeperTimelineStatus:

View File

@@ -1,44 +0,0 @@
import threading
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
#
# Test branching, when a transaction is in prepared state
#
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_lfc_resize", "empty")
endpoint = env.endpoints.create_start(
"test_lfc_resize",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
],
)
n_resize = 10
scale = 10
log.info("postgres is running on 'test_lfc_resize' branch")
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()
conn = endpoint.connect()
cur = conn.cursor()
for i in range(n_resize):
cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'")
cur.execute("select pg_reload_conf()")
time.sleep(1)
thread.join()

View File

@@ -157,6 +157,8 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
# insert something to force sk -> ps message
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
# Wait to make sure that we get a latest WAL receiver data.
# We need to wait here because it's possible that we don't have access to
# the latest WAL yet, when the `timeline_detail` API is first called.
@@ -168,7 +170,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
)
# Make a DB modification then expect getting a new WAL receiver's data.
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
endpoint.safe_psql("INSERT INTO t VALUES (1, 'hey')")
wait_until(
number_of_iterations=5,
interval=1,

View File

@@ -1,7 +1,5 @@
import json
import time
from dataclasses import dataclass
from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set
@@ -30,7 +28,6 @@ def test_metric_collection(
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
@@ -38,9 +35,7 @@ def test_metric_collection(
return Response(status=400)
events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
uploads.put(events)
return Response(status=200)
# Require collecting metrics frequently, since we change
@@ -48,12 +43,15 @@ def test_metric_collection(
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
neon_env_builder.pageserver_config_override = (
f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -65,7 +63,7 @@ def test_metric_collection(
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
env = neon_env_builder.init_start()
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
@@ -126,20 +124,19 @@ def test_metric_collection(
events = uploads.get(timeout=timeout)
if events == "ready":
(events, is_last) = uploads.get(timeout=timeout)
v.ingest(events, is_last)
events = uploads.get(timeout=timeout)
v.ingest(events)
break
else:
(events, is_last) = events
v.ingest(events, is_last)
v.ingest(events)
if "synthetic_storage_size" not in v.accepted_event_names():
log.info("waiting for synthetic storage size to be calculated and uploaded...")
rounds = 0
while "synthetic_storage_size" not in v.accepted_event_names():
(events, is_last) = uploads.get(timeout=timeout)
v.ingest(events, is_last)
events = uploads.get(timeout=timeout)
v.ingest(events)
rounds += 1
assert rounds < 10, "did not get synthetic_storage_size in 10 uploads"
# once we have it in verifiers, it will assert that future batches will contain it
@@ -153,161 +150,17 @@ def test_metric_collection(
events = uploads.get(timeout=timeout)
if events == "ready":
(events, is_last) = uploads.get(timeout=timeout * 3)
v.ingest(events, is_last)
(events, is_last) = uploads.get(timeout=timeout)
v.ingest(events, is_last)
events = uploads.get(timeout=timeout * 3)
v.ingest(events)
events = uploads.get(timeout=timeout)
v.ingest(events)
break
else:
(events, is_last) = events
v.ingest(events, is_last)
v.ingest(events)
httpserver.check()
def test_metric_collection_cleans_up_tempfile(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
pageserver_http = env.pageserver.http_client()
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
timeout = 5
# these strings in the upload queue allow synchronizing with the uploads
# and the main test execution
uploads.put("ready")
while True:
events = uploads.get(timeout=timeout)
if events == "ready":
(events, _) = uploads.get(timeout=timeout)
break
# should really configure an env?
pageserver_http.configure_failpoints(("before-persist-last-metrics-collected", "exit"))
time.sleep(3)
env.pageserver.stop()
initially = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
assert (
len(initially.matching) == 2
), f"expecting actual file and tempfile, but not found: {initially.matching}"
uploads.put("ready")
env.pageserver.start()
while True:
events = uploads.get(timeout=timeout * 3)
if events == "ready":
(events, _) = uploads.get(timeout=timeout)
break
env.pageserver.stop()
later = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
# it is possible we shutdown the pageserver right at the correct time, so the old tempfile
# is gone, but we also have a new one.
only = set(["last_consumption_metrics.json"])
assert (
initially.matching.intersection(later.matching) == only
), "only initial tempfile should had been removed"
assert initially.other.issuperset(later.other), "no other files should had been removed"
@dataclass
class PrefixPartitionedFiles:
matching: Set[str]
other: Set[str]
def iterate_pageserver_workdir(path: Path, prefix: str) -> PrefixPartitionedFiles:
"""
Iterates the files in the workdir, returns two sets:
- files with the prefix
- files without the prefix
"""
matching = set()
other = set()
for entry in path.iterdir():
if not entry.is_file():
continue
if not entry.name.startswith(prefix):
other.add(entry.name)
else:
matching.add(entry.name)
return PrefixPartitionedFiles(matching, other)
class MetricsVerifier:
"""
A graph of per tenant per timeline verifiers, allowing one for each
@@ -318,7 +171,7 @@ class MetricsVerifier:
self.tenants: Dict[TenantId, TenantMetricsVerifier] = {}
pass
def ingest(self, events, is_last):
def ingest(self, events):
stringified = json.dumps(events, indent=2)
log.info(f"ingesting: {stringified}")
for event in events:
@@ -328,9 +181,8 @@ class MetricsVerifier:
self.tenants[id].ingest(event)
if is_last:
for t in self.tenants.values():
t.post_batch()
for t in self.tenants.values():
t.post_batch()
def accepted_event_names(self) -> Set[str]:
names: Set[str] = set()

View File

@@ -1,3 +1,4 @@
import filecmp
import os
import pathlib
import random
@@ -980,6 +981,137 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
endpoint.start()
# Test that we can create timeline with one safekeeper down and initialize it
# later when some data already had been written.
def test_late_init(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
sk1 = env.safekeepers[0]
sk1.stop()
# create and insert smth while safekeeper is down...
env.neon_cli.create_branch("test_late_init")
endpoint = env.endpoints.create_start("test_late_init")
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
log.info("insert with safekeeper down done")
endpoint.stop() # stop compute
# stop another safekeeper, and start one which missed timeline creation
sk2 = env.safekeepers[1]
sk2.stop()
sk1.start()
# insert some more
endpoint = env.endpoints.create_start("test_late_init")
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
# is timeline flush_lsn equal on provided safekeepers?
def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id):
return (
sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
== sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
)
# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that
# 1) walproposer can't recover node if it misses WAL written by previous computes, but
# still starts up and functions normally if two other sks are ok.
# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions
# normally if two other sks are ok.
# 3) Lagged safekeeper can still recover by peer recovery.
def test_one_sk_down(neon_env_builder: NeonEnvBuilder):
pass
# Smaller version of test_one_sk_down testing peer recovery in isolation: that
# it works without compute at all.
def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_peer_recovery")
endpoint = env.endpoints.create_start("test_peer_recovery")
endpoint.safe_psql("create table t(key int, value text)")
sk1 = env.safekeepers[0]
sk1.stop()
# roughly fills one segment
endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
endpoint.stop() # stop compute
# now start safekeeper, but with peer recovery disabled
sk1.start(extra_opts=["--peer-recovery=false"])
# it should lag for about a segment
sk1_http_cli = sk1.http_client()
sk2 = env.safekeepers[1]
sk2_http_cli = sk2.http_client()
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
f"flush_lsns after insertion: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
)
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
# wait a bit, lsns shouldn't change
# time.sleep(5)
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
f"flush_lsns after waiting: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
)
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
# now restart safekeeper with peer recovery enabled and wait for recovery
sk1.stop().start()
wait(
partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
"flush_lsn to get aligned",
wait_f=lambda sk1_http_cli=sk1_http_cli, sk2_http_cli=sk2_http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
f"waiting for flush_lsn alignment, sk1.flush_lsn={sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}, sk2.flush_lsn={sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}"
),
)
# check that WALs are identic after recovery
segs = sk1.list_segments(tenant_id, timeline_id)
log.info(f"segs are {segs}")
(_, mismatch, not_regular) = filecmp.cmpfiles(
sk1.timeline_dir(tenant_id, timeline_id),
sk2.timeline_dir(tenant_id, timeline_id),
segs,
shallow=False,
)
log.info(
f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
)
for f in mismatch:
f1 = os.path.join(sk1.timeline_dir(tenant_id, timeline_id), f)
f2 = os.path.join(sk2.timeline_dir(tenant_id, timeline_id), f)
stdout_filename = "{}.filediff".format(f2)
with open(stdout_filename, "w") as stdout_f:
subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
cmd = "diff {}.hex {}.hex".format(f1, f2)
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, not_regular) == ([], [])
# stop one of safekeepers which weren't recovering and insert a bit more to check we can commit
env.safekeepers[2].stop()
endpoint = env.endpoints.create_start("test_peer_recovery")
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
class SafekeeperEnv:
def __init__(
self,

View File

@@ -64,7 +64,6 @@ toml_edit = { version = "0.19", features = ["serde"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tungstenite = { version = "0.20" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4"] }