mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 03:20:36 +00:00
Compare commits
6 Commits
problame/b
...
sk-wp-grac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0cbd5353a | ||
|
|
8ea21686e1 | ||
|
|
a8e7eede2a | ||
|
|
2b91f507a8 | ||
|
|
bb2c3253c6 | ||
|
|
bdf3769a2b |
7
.github/workflows/build_and_test.yml
vendored
7
.github/workflows/build_and_test.yml
vendored
@@ -834,7 +834,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.17.10
|
||||
VM_BUILDER_VERSION: v0.17.5
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -844,7 +844,7 @@ jobs:
|
||||
|
||||
- name: Downloading vm-builder
|
||||
run: |
|
||||
curl -fL https://workstation-ubuntu.tail888fb.ts.net/vm-builder -o vm-builder
|
||||
curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder
|
||||
chmod +x vm-builder
|
||||
|
||||
# Note: we need a separate pull step here because otherwise vm-builder will try to pull, and
|
||||
@@ -1091,9 +1091,8 @@ jobs:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
|
||||
else
|
||||
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
|
||||
|
||||
54
Cargo.lock
generated
54
Cargo.lock
generated
@@ -636,7 +636,7 @@ dependencies = [
|
||||
"sha1",
|
||||
"sync_wrapper",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
"tokio-tungstenite 0.20.0",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
@@ -1941,15 +1941,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hyper-tungstenite"
|
||||
version = "0.11.1"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9"
|
||||
checksum = "880b8b1c98a5ec2a505c7c90db6d3f6f1f480af5655d9c5b55facc9382a5a5b5"
|
||||
dependencies = [
|
||||
"hyper",
|
||||
"pin-project-lite",
|
||||
"pin-project",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
"tungstenite",
|
||||
"tokio-tungstenite 0.18.0",
|
||||
"tungstenite 0.18.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2908,9 +2908,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.13"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
|
||||
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
|
||||
|
||||
[[package]]
|
||||
name = "pin-utils"
|
||||
@@ -4641,6 +4641,18 @@ dependencies = [
|
||||
"xattr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-tungstenite"
|
||||
version = "0.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"tokio",
|
||||
"tungstenite 0.18.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-tungstenite"
|
||||
version = "0.20.0"
|
||||
@@ -4650,7 +4662,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"tokio",
|
||||
"tungstenite",
|
||||
"tungstenite 0.20.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4965,9 +4977,28 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
|
||||
|
||||
[[package]]
|
||||
name = "tungstenite"
|
||||
version = "0.20.1"
|
||||
version = "0.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9"
|
||||
checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"http",
|
||||
"httparse",
|
||||
"log",
|
||||
"rand",
|
||||
"sha1",
|
||||
"thiserror",
|
||||
"url",
|
||||
"utf-8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tungstenite"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
@@ -5617,7 +5648,6 @@ dependencies = [
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tungstenite",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
@@ -78,7 +78,7 @@ hostname = "0.3.1"
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1.1"
|
||||
hyper = "0.14"
|
||||
hyper-tungstenite = "0.11"
|
||||
hyper-tungstenite = "0.9"
|
||||
inotify = "0.10.2"
|
||||
itertools = "0.10"
|
||||
jsonwebtoken = "8"
|
||||
|
||||
@@ -589,7 +589,8 @@ RUN case "${PG_VERSION}" in \
|
||||
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_embedding-src && cd pg_embedding-src && tar xvzf ../pg_embedding.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/embedding.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
|
||||
15
Makefile
15
Makefile
@@ -153,6 +153,18 @@ neon-pg-ext-%: postgres-%
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
|
||||
|
||||
# pg_embedding was temporarily released as hnsw from this repo, when we only
|
||||
# supported PostgreSQL 14 and 15
|
||||
neon-pg-ext-v14: neon-pg-ext-hnsw-v14
|
||||
neon-pg-ext-v15: neon-pg-ext-hnsw-v15
|
||||
|
||||
neon-pg-ext-hnsw-%: postgres-headers-% postgres-%
|
||||
+@echo "Compiling hnsw $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/hnsw-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/hnsw-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/hnsw/Makefile install
|
||||
|
||||
.PHONY: neon-pg-ext-clean-%
|
||||
neon-pg-ext-clean-%:
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
@@ -167,6 +179,9 @@ neon-pg-ext-clean-%:
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/hnsw-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/hnsw/Makefile clean
|
||||
|
||||
.PHONY: neon-pg-ext
|
||||
neon-pg-ext: \
|
||||
|
||||
@@ -29,13 +29,13 @@ See developer documentation in [SUMMARY.md](/docs/SUMMARY.md) for more informati
|
||||
```bash
|
||||
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
|
||||
libssl-dev clang pkg-config libpq-dev cmake postgresql-client protobuf-compiler \
|
||||
libcurl4-openssl-dev openssl python-poetry lsof libicu-dev
|
||||
libcurl4-openssl-dev openssl python-poetry lsof
|
||||
```
|
||||
* On Fedora, these packages are needed:
|
||||
```bash
|
||||
dnf install flex bison readline-devel zlib-devel openssl-devel \
|
||||
libseccomp-devel perl clang cmake postgresql postgresql-contrib protobuf-compiler \
|
||||
protobuf-devel libcurl-devel openssl poetry lsof libicu-devel
|
||||
protobuf-devel libcurl-devel openssl poetry lsof
|
||||
```
|
||||
* On Arch based systems, these packages are needed:
|
||||
```bash
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//!
|
||||
use chrono::{DateTime, Utc};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
|
||||
#[serde(tag = "type")]
|
||||
@@ -54,8 +54,8 @@ impl EventType {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
pub struct Event<Extra, Metric> {
|
||||
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
pub struct Event<Extra, Metric: Serialize> {
|
||||
#[serde(flatten)]
|
||||
#[serde(rename = "type")]
|
||||
pub kind: EventType,
|
||||
|
||||
@@ -561,7 +561,14 @@ impl CgroupWatcher {
|
||||
/// Setting these values also affects the thresholds for receiving usage alerts.
|
||||
#[derive(Debug)]
|
||||
pub struct MemoryLimits {
|
||||
pub high: u64,
|
||||
high: u64,
|
||||
max: u64,
|
||||
}
|
||||
|
||||
impl MemoryLimits {
|
||||
pub fn new(high: u64, max: u64) -> Self {
|
||||
Self { max, high }
|
||||
}
|
||||
}
|
||||
|
||||
// Methods for manipulating the actual cgroup
|
||||
@@ -638,7 +645,12 @@ impl CgroupWatcher {
|
||||
|
||||
/// Set cgroup memory.high and memory.max.
|
||||
pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
|
||||
info!(limits.high, path = self.path(), "writing new memory limits",);
|
||||
info!(
|
||||
limits.high,
|
||||
limits.max,
|
||||
path = self.path(),
|
||||
"writing new memory limits",
|
||||
);
|
||||
self.memory()
|
||||
.context("failed to get memory subsystem while setting memory limits")?
|
||||
.set_mem(cgroups_rs::memory::SetMemory {
|
||||
@@ -647,7 +659,7 @@ impl CgroupWatcher {
|
||||
high: Some(MaxValue::Value(
|
||||
u64::min(limits.high, i64::MAX as u64) as i64
|
||||
)),
|
||||
max: None,
|
||||
max: Some(MaxValue::Value(u64::min(limits.max, i64::MAX as u64) as i64)),
|
||||
})
|
||||
.context("failed to set memory limits")
|
||||
}
|
||||
@@ -655,7 +667,7 @@ impl CgroupWatcher {
|
||||
/// Given some amount of available memory, set the desired cgroup memory limits
|
||||
pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
|
||||
let new_high = self.config.calculate_memory_high_value(available_memory);
|
||||
let limits = MemoryLimits { high: new_high };
|
||||
let limits = MemoryLimits::new(new_high, available_memory);
|
||||
info!(
|
||||
path = self.path(),
|
||||
memory = ?limits,
|
||||
|
||||
@@ -257,11 +257,12 @@ impl Runner {
|
||||
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
|
||||
}
|
||||
|
||||
let limits = MemoryLimits {
|
||||
let limits = MemoryLimits::new(
|
||||
// new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
|
||||
// since it is properly initialized in the previous cgroup if let block
|
||||
high: new_cgroup_mem_high,
|
||||
};
|
||||
new_cgroup_mem_high,
|
||||
available_memory,
|
||||
);
|
||||
cgroup
|
||||
.set_limits(&limits)
|
||||
.context("failed to set cgroup memory limits")?;
|
||||
@@ -327,9 +328,7 @@ impl Runner {
|
||||
name = cgroup.path(),
|
||||
"updating cgroup memory.high",
|
||||
);
|
||||
let limits = MemoryLimits {
|
||||
high: new_cgroup_mem_high,
|
||||
};
|
||||
let limits = MemoryLimits::new(new_cgroup_mem_high, available_memory);
|
||||
cgroup
|
||||
.set_limits(&limits)
|
||||
.context("failed to set file cache size")?;
|
||||
|
||||
@@ -14,7 +14,7 @@ use tracing::*;
|
||||
use utils::id::NodeId;
|
||||
|
||||
mod metrics;
|
||||
use metrics::MetricsKey;
|
||||
use metrics::{Ids, MetricsKey};
|
||||
mod disk_cache;
|
||||
mod upload;
|
||||
|
||||
@@ -68,11 +68,10 @@ pub async fn collect_metrics(
|
||||
},
|
||||
);
|
||||
|
||||
let path: Arc<PathBuf> = Arc::new(local_disk_storage);
|
||||
let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);
|
||||
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
|
||||
let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
|
||||
let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);
|
||||
|
||||
let mut cached_metrics = tokio::select! {
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
@@ -109,14 +108,14 @@ pub async fn collect_metrics(
|
||||
// already here, better to try to flush the new values.
|
||||
|
||||
let flush = async {
|
||||
match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
|
||||
match disk_cache::flush_metrics_to_disk(&metrics, &final_path).await {
|
||||
Ok(()) => {
|
||||
tracing::debug!("flushed metrics to disk");
|
||||
}
|
||||
Err(e) => {
|
||||
// idea here is that if someone creates a directory as our path, then they
|
||||
// idea here is that if someone creates a directory as our final_path, then they
|
||||
// might notice it from the logs before shutdown and remove it
|
||||
tracing::error!("failed to persist metrics to {path:?}: {e:#}");
|
||||
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -153,10 +152,12 @@ pub async fn collect_metrics(
|
||||
///
|
||||
/// Cancellation safe.
|
||||
async fn restore_and_reschedule(
|
||||
path: &Arc<PathBuf>,
|
||||
final_path: &Arc<PathBuf>,
|
||||
metric_collection_interval: Duration,
|
||||
) -> Cache {
|
||||
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
|
||||
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(final_path.clone())
|
||||
.await
|
||||
{
|
||||
Ok(found_some) => {
|
||||
// there is no min needed because we write these sequentially in
|
||||
// collect_all_metrics
|
||||
@@ -174,11 +175,12 @@ async fn restore_and_reschedule(
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
let root = e.root_cause();
|
||||
|
||||
let maybe_ioerr = root.downcast_ref::<Error>();
|
||||
let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
|
||||
|
||||
if !is_not_found {
|
||||
tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
|
||||
tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
|
||||
}
|
||||
|
||||
(HashMap::new(), None)
|
||||
|
||||
@@ -9,13 +9,6 @@ pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result
|
||||
let span = tracing::Span::current();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _e = span.entered();
|
||||
|
||||
if let Some(parent) = path.parent() {
|
||||
if let Err(e) = scan_and_delete_with_same_prefix(&path) {
|
||||
tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
let mut file = std::fs::File::open(&*path)?;
|
||||
let reader = std::io::BufReader::new(&mut file);
|
||||
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
|
||||
@@ -25,68 +18,26 @@ pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result
|
||||
.and_then(|x| x)
|
||||
}
|
||||
|
||||
fn scan_and_delete_with_same_prefix(path: &std::path::Path) -> std::io::Result<()> {
|
||||
let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
|
||||
|
||||
let prefix = path.file_name().expect("caller checked").to_string_lossy();
|
||||
|
||||
for entry in it {
|
||||
let entry = entry?;
|
||||
if !entry.metadata()?.is_file() {
|
||||
continue;
|
||||
}
|
||||
let file_name = entry.file_name();
|
||||
|
||||
if path.file_name().unwrap() == file_name {
|
||||
// do not remove our actual file
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = file_name.to_string_lossy();
|
||||
|
||||
if !file_name.starts_with(&*prefix) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path = entry.path();
|
||||
|
||||
if let Err(e) = std::fs::remove_file(&path) {
|
||||
tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
|
||||
} else {
|
||||
tracing::info!("cleaned up old tempfile {file_name:?}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn flush_metrics_to_disk(
|
||||
current_metrics: &Arc<Vec<RawMetric>>,
|
||||
path: &Arc<PathBuf>,
|
||||
final_path: &Arc<PathBuf>,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::io::Write;
|
||||
|
||||
anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
|
||||
anyhow::ensure!(
|
||||
path.file_name().is_some(),
|
||||
"path must have filename: {path:?}"
|
||||
final_path.parent().is_some(),
|
||||
"path must have parent: {final_path:?}"
|
||||
);
|
||||
|
||||
let span = tracing::Span::current();
|
||||
tokio::task::spawn_blocking({
|
||||
let current_metrics = current_metrics.clone();
|
||||
let path = path.clone();
|
||||
let final_path = final_path.clone();
|
||||
move || {
|
||||
let _e = span.entered();
|
||||
|
||||
let parent = path.parent().expect("existence checked");
|
||||
let file_name = path.file_name().expect("existence checked");
|
||||
let mut tempfile = tempfile::Builder::new()
|
||||
.prefix(file_name)
|
||||
.suffix(".tmp")
|
||||
.tempfile_in(parent)?;
|
||||
|
||||
tracing::debug!("using tempfile {:?}", tempfile.path());
|
||||
let mut tempfile =
|
||||
tempfile::NamedTempFile::new_in(final_path.parent().expect("existence checked"))?;
|
||||
|
||||
// write out all of the raw metrics, to be read out later on restart as cached values
|
||||
{
|
||||
@@ -101,17 +52,15 @@ pub(super) async fn flush_metrics_to_disk(
|
||||
tempfile.flush()?;
|
||||
tempfile.as_file().sync_all()?;
|
||||
|
||||
fail::fail_point!("before-persist-last-metrics-collected");
|
||||
drop(tempfile.persist(&*final_path)?);
|
||||
|
||||
drop(tempfile.persist(&*path).map_err(|e| e.error)?);
|
||||
|
||||
let f = std::fs::File::open(path.parent().unwrap())?;
|
||||
let f = std::fs::File::open(final_path.parent().unwrap())?;
|
||||
f.sync_all()?;
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
})
|
||||
.await
|
||||
.with_context(|| format!("write metrics to {path:?} join error"))
|
||||
.and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
|
||||
.with_context(|| format!("write metrics to {final_path:?} join error"))
|
||||
.and_then(|x| x.with_context(|| format!("write metrics to {final_path:?}")))
|
||||
}
|
||||
|
||||
@@ -1,21 +1,36 @@
|
||||
use crate::context::RequestContext;
|
||||
use anyhow::Context;
|
||||
use crate::tenant::mgr;
|
||||
use chrono::{DateTime, Utc};
|
||||
use consumption_metrics::EventType;
|
||||
use futures::stream::StreamExt;
|
||||
use serde_with::serde_as;
|
||||
use std::{sync::Arc, time::SystemTime};
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
use pageserver_api::models::TenantState;
|
||||
use serde::Serialize;
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
use super::{Cache, RawMetric};
|
||||
|
||||
// FIXME: all other consumption_metrics::Event stuff is over at uploading, maybe move?
|
||||
#[serde_as]
|
||||
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy)]
|
||||
pub(super) struct Ids {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub(super) tenant_id: TenantId,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) timeline_id: Option<TimelineId>,
|
||||
}
|
||||
|
||||
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
|
||||
/// instead of static str.
|
||||
// Do not rename any of these without first consulting with data team and partner
|
||||
// management.
|
||||
// FIXME: write those tests before refactoring to this!
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||
pub(super) enum Name {
|
||||
/// Timeline last_record_lsn, absolute
|
||||
@@ -44,7 +59,7 @@ pub(super) enum Name {
|
||||
/// elsewhere.
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||
pub(crate) struct MetricsKey {
|
||||
pub(super) struct MetricsKey {
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub(super) tenant_id: TenantId,
|
||||
|
||||
@@ -68,7 +83,7 @@ impl MetricsKey {
|
||||
struct AbsoluteValueFactory(MetricsKey);
|
||||
|
||||
impl AbsoluteValueFactory {
|
||||
const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
|
||||
fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
|
||||
let key = self.0;
|
||||
(key, (EventType::Absolute { time }, val))
|
||||
}
|
||||
@@ -83,7 +98,7 @@ struct IncrementalValueFactory(MetricsKey);
|
||||
|
||||
impl IncrementalValueFactory {
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
const fn from_until(
|
||||
fn from_previous_up_to(
|
||||
self,
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
@@ -91,11 +106,16 @@ impl IncrementalValueFactory {
|
||||
) -> RawMetric {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
let when = EventType::Incremental {
|
||||
start_time: prev_end,
|
||||
stop_time: up_to,
|
||||
};
|
||||
(key, (when, val))
|
||||
(
|
||||
key,
|
||||
(
|
||||
EventType::Incremental {
|
||||
start_time: prev_end,
|
||||
stop_time: up_to,
|
||||
},
|
||||
val,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn key(&self) -> &MetricsKey {
|
||||
@@ -189,11 +209,9 @@ pub(super) async fn collect_all_metrics(
|
||||
cached_metrics: &Cache,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<RawMetric> {
|
||||
use pageserver_api::models::TenantState;
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let tenants = match crate::tenant::mgr::list_tenants().await {
|
||||
let tenants = match mgr::list_tenants().await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(err) => {
|
||||
tracing::error!("failed to list tenants: {:?}", err);
|
||||
@@ -205,7 +223,7 @@ pub(super) async fn collect_all_metrics(
|
||||
if state != TenantState::Active {
|
||||
None
|
||||
} else {
|
||||
crate::tenant::mgr::get_tenant(id, true)
|
||||
mgr::get_tenant(id, true)
|
||||
.await
|
||||
.ok()
|
||||
.map(|tenant| (id, tenant))
|
||||
@@ -267,7 +285,7 @@ where
|
||||
current_metrics
|
||||
}
|
||||
|
||||
/// In-between abstraction to allow testing metrics without actual Tenants.
|
||||
/// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
|
||||
struct TenantSnapshot {
|
||||
resident_size: u64,
|
||||
remote_size: u64,
|
||||
@@ -423,14 +441,14 @@ impl TimelineSnapshot {
|
||||
let up_to = now;
|
||||
|
||||
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
|
||||
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
|
||||
let key_value = written_size_delta_key.from_previous_up_to(prev.0, up_to, delta);
|
||||
// written_size_delta
|
||||
metrics.push(key_value);
|
||||
// written_size
|
||||
metrics.push((key, written_size_now));
|
||||
} else {
|
||||
// the cached value was ahead of us, report zero until we've caught up
|
||||
metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
|
||||
metrics.push(written_size_delta_key.from_previous_up_to(prev.0, up_to, 0));
|
||||
// the cached value was ahead of us, report the same until we've caught up
|
||||
metrics.push((key, (written_size_now.0, prev.1)));
|
||||
}
|
||||
@@ -450,6 +468,3 @@ impl TimelineSnapshot {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) use tests::metric_examples;
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use std::time::SystemTime;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
#[test]
|
||||
fn startup_collected_timeline_metrics_before_advancing() {
|
||||
@@ -27,7 +33,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
snap.loaded_at.1.into(),
|
||||
now,
|
||||
0
|
||||
@@ -67,7 +73,8 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_previous_up_to(before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
@@ -93,7 +100,11 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
// at t=before was the last time the last_record_lsn changed
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
|
||||
// end time of this event is used for the next ones
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
before,
|
||||
just_before,
|
||||
0,
|
||||
),
|
||||
]);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
@@ -107,13 +118,81 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
just_before,
|
||||
now,
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn metric_image_stability() {
|
||||
// it is important that these strings stay as they are
|
||||
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
let now = DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z").unwrap();
|
||||
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z").unwrap();
|
||||
|
||||
let [now, before] = [DateTime::<Utc>::from(now), DateTime::from(before)];
|
||||
|
||||
let examples = [
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_previous_up_to(before, now, 0),
|
||||
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::resident_size(tenant_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::synthetic_size(tenant_id).at(now, 1),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
];
|
||||
|
||||
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(now, "1", 0);
|
||||
|
||||
for (line, (key, (kind, value)), expected) in examples {
|
||||
let e = consumption_metrics::Event {
|
||||
kind,
|
||||
metric: key.metric,
|
||||
idempotency_key: idempotency_key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id: key.tenant_id,
|
||||
timeline_id: key.timeline_id,
|
||||
},
|
||||
};
|
||||
let actual = serde_json::to_string(&e).unwrap();
|
||||
assert_eq!(expected, actual, "example from line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
|
||||
@@ -141,7 +220,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
@@ -155,7 +234,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
before_restart,
|
||||
now,
|
||||
0
|
||||
@@ -173,7 +252,8 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_previous_up_to(now, later, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
|
||||
]
|
||||
);
|
||||
@@ -279,19 +359,3 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
|
||||
|
||||
times
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [RawMetric; 6] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
|
||||
MetricsKey::resident_size(tenant_id).at(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at(now, 1),
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1,21 +1,8 @@
|
||||
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
|
||||
use serde_with::serde_as;
|
||||
use consumption_metrics::{idempotency_key, Event, EventChunk, CHUNK_SIZE};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use tracing::*;
|
||||
|
||||
use super::{metrics::Name, Cache, MetricsKey, RawMetric};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// How the metrics from pageserver are identified.
|
||||
#[serde_with::serde_as]
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||
struct Ids {
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub(super) tenant_id: TenantId,
|
||||
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) timeline_id: Option<TimelineId>,
|
||||
}
|
||||
use super::{Cache, Ids, RawMetric};
|
||||
|
||||
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
|
||||
pub(super) async fn upload_metrics(
|
||||
@@ -26,21 +13,44 @@ pub(super) async fn upload_metrics(
|
||||
metrics: &[RawMetric],
|
||||
cached_metrics: &mut Cache,
|
||||
) -> anyhow::Result<()> {
|
||||
use bytes::BufMut;
|
||||
|
||||
let mut uploaded = 0;
|
||||
let mut failed = 0;
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
|
||||
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
|
||||
let mut buffer = bytes::BytesMut::new();
|
||||
let mut chunk_to_send = Vec::new();
|
||||
|
||||
while let Some(res) = iter.next() {
|
||||
let (chunk, body) = res?;
|
||||
for chunk in metrics.chunks(CHUNK_SIZE) {
|
||||
chunk_to_send.clear();
|
||||
|
||||
// FIXME: this should always overwrite and truncate to chunk.len()
|
||||
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
|
||||
kind: *when,
|
||||
metric: curr_key.metric,
|
||||
// FIXME: finally write! this to the prev allocation
|
||||
idempotency_key: idempotency_key(node_id),
|
||||
value: *curr_val,
|
||||
extra: Ids {
|
||||
tenant_id: curr_key.tenant_id,
|
||||
timeline_id: curr_key.timeline_id,
|
||||
},
|
||||
}));
|
||||
|
||||
serde_json::to_writer(
|
||||
(&mut buffer).writer(),
|
||||
&EventChunk {
|
||||
events: (&chunk_to_send).into(),
|
||||
},
|
||||
)?;
|
||||
|
||||
let body = buffer.split().freeze();
|
||||
let event_bytes = body.len();
|
||||
|
||||
let is_last = iter.len() == 0;
|
||||
|
||||
let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
|
||||
let res = upload(client, metric_collection_endpoint, body, cancel)
|
||||
.instrument(tracing::info_span!(
|
||||
"upload",
|
||||
%event_bytes,
|
||||
@@ -78,150 +88,6 @@ pub(super) async fn upload_metrics(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The return type is quite ugly, but we gain testability in isolation
|
||||
fn serialize_in_chunks<'a, F>(
|
||||
chunk_size: usize,
|
||||
input: &'a [RawMetric],
|
||||
factory: F,
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
where
|
||||
F: KeyGen<'a> + 'a,
|
||||
{
|
||||
use bytes::BufMut;
|
||||
|
||||
struct Iter<'a, F> {
|
||||
inner: std::slice::Chunks<'a, RawMetric>,
|
||||
chunk_size: usize,
|
||||
|
||||
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
|
||||
buffer: bytes::BytesMut,
|
||||
// chunk amount of events are reused to produce the serialized document
|
||||
scratch: Vec<Event<Ids, Name>>,
|
||||
factory: F,
|
||||
}
|
||||
|
||||
impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
|
||||
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let chunk = self.inner.next()?;
|
||||
|
||||
if self.scratch.is_empty() {
|
||||
// first round: create events with N strings
|
||||
self.scratch.extend(
|
||||
chunk
|
||||
.iter()
|
||||
.map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
|
||||
);
|
||||
} else {
|
||||
// next rounds: update_in_place to reuse allocations
|
||||
assert_eq!(self.scratch.len(), self.chunk_size);
|
||||
self.scratch
|
||||
.iter_mut()
|
||||
.zip(chunk.iter())
|
||||
.for_each(|(slot, raw_metric)| {
|
||||
raw_metric.update_in_place(slot, &self.factory.generate())
|
||||
});
|
||||
}
|
||||
|
||||
let res = serde_json::to_writer(
|
||||
(&mut self.buffer).writer(),
|
||||
&EventChunk {
|
||||
events: (&self.scratch[..chunk.len()]).into(),
|
||||
},
|
||||
);
|
||||
|
||||
match res {
|
||||
Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
|
||||
Err(e) => Some(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.inner.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
|
||||
|
||||
let buffer = bytes::BytesMut::new();
|
||||
let inner = input.chunks(chunk_size);
|
||||
let scratch = Vec::new();
|
||||
|
||||
Iter {
|
||||
inner,
|
||||
chunk_size,
|
||||
buffer,
|
||||
scratch,
|
||||
factory,
|
||||
}
|
||||
}
|
||||
|
||||
trait RawMetricExt {
|
||||
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
|
||||
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
|
||||
}
|
||||
|
||||
impl RawMetricExt for RawMetric {
|
||||
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.0;
|
||||
|
||||
let (kind, value) = self.1;
|
||||
|
||||
Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
|
||||
use std::fmt::Write;
|
||||
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.0;
|
||||
|
||||
let (kind, value) = self.1;
|
||||
|
||||
*event = Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: {
|
||||
event.idempotency_key.clear();
|
||||
write!(event.idempotency_key, "{key}").unwrap();
|
||||
std::mem::take(&mut event.idempotency_key)
|
||||
},
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
trait KeyGen<'a>: Copy {
|
||||
fn generate(&self) -> IdempotencyKey<'a>;
|
||||
}
|
||||
|
||||
impl<'a> KeyGen<'a> for &'a str {
|
||||
fn generate(&self) -> IdempotencyKey<'a> {
|
||||
IdempotencyKey::generate(self)
|
||||
}
|
||||
}
|
||||
|
||||
enum UploadError {
|
||||
Rejected(reqwest::StatusCode),
|
||||
Reqwest(reqwest::Error),
|
||||
@@ -253,16 +119,11 @@ impl UploadError {
|
||||
}
|
||||
}
|
||||
|
||||
// this is consumed by the test verifiers
|
||||
static LAST_IN_BATCH: reqwest::header::HeaderName =
|
||||
reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
|
||||
|
||||
async fn upload(
|
||||
client: &reqwest::Client,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
body: bytes::Bytes,
|
||||
cancel: &CancellationToken,
|
||||
is_last: bool,
|
||||
) -> Result<(), UploadError> {
|
||||
let warn_after = 3;
|
||||
let max_attempts = 10;
|
||||
@@ -273,24 +134,17 @@ async fn upload(
|
||||
let res = client
|
||||
.post(metric_collection_endpoint.clone())
|
||||
.header(reqwest::header::CONTENT_TYPE, "application/json")
|
||||
.header(
|
||||
LAST_IN_BATCH.clone(),
|
||||
if is_last { "true" } else { "false" },
|
||||
)
|
||||
.body(body)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let res = res.and_then(|res| res.error_for_status());
|
||||
|
||||
// 10 redirects are normally allowed, so we don't need worry about 3xx
|
||||
match res {
|
||||
Ok(_response) => Ok(()),
|
||||
Err(e) => {
|
||||
let status = e.status().filter(|s| s.is_client_error());
|
||||
if let Some(status) = status {
|
||||
// rejection used to be a thing when the server could reject a
|
||||
// whole batch of metrics if one metric was bad.
|
||||
Err(UploadError::Rejected(status))
|
||||
} else {
|
||||
Err(UploadError::Reqwest(e))
|
||||
@@ -321,123 +175,3 @@ async fn upload(
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
#[test]
|
||||
fn chunked_serialization() {
|
||||
let examples = metric_samples();
|
||||
assert!(examples.len() > 1);
|
||||
|
||||
let factory = FixedGen::new(Utc::now(), "1", 42);
|
||||
|
||||
// need to use Event here because serde_json::Value uses default hashmap, not linked
|
||||
// hashmap
|
||||
#[derive(serde::Deserialize)]
|
||||
struct EventChunk {
|
||||
events: Vec<Event<Ids, Name>>,
|
||||
}
|
||||
|
||||
let correct = serialize_in_chunks(examples.len(), &examples, factory)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for chunk_size in 1..examples.len() {
|
||||
let actual = serialize_in_chunks(chunk_size, &examples, factory)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// if these are equal, it means that multi-chunking version works as well
|
||||
assert_eq!(correct, actual);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
|
||||
|
||||
impl<'a> FixedGen<'a> {
|
||||
fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
|
||||
FixedGen(now, node_id, nonce)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> KeyGen<'a> for FixedGen<'a> {
|
||||
fn generate(&self) -> IdempotencyKey<'a> {
|
||||
IdempotencyKey::for_tests(self.0, self.1, self.2)
|
||||
}
|
||||
}
|
||||
|
||||
static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
|
||||
DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
|
||||
.unwrap()
|
||||
.into()
|
||||
});
|
||||
|
||||
#[test]
|
||||
fn metric_image_stability() {
|
||||
// it is important that these strings stay as they are
|
||||
|
||||
let examples = [
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
];
|
||||
|
||||
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
|
||||
let examples = examples.into_iter().zip(metric_samples());
|
||||
|
||||
for ((line, expected), (key, (kind, value))) in examples {
|
||||
let e = consumption_metrics::Event {
|
||||
kind,
|
||||
metric: key.metric,
|
||||
idempotency_key: idempotency_key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id: key.tenant_id,
|
||||
timeline_id: key.timeline_id,
|
||||
},
|
||||
};
|
||||
let actual = serde_json::to_string(&e).unwrap();
|
||||
assert_eq!(expected, actual, "example for {kind:?} from line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
fn metric_samples() -> [RawMetric; 6] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
|
||||
.unwrap()
|
||||
.into();
|
||||
let [now, before] = [*SAMPLES_NOW, before];
|
||||
|
||||
super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,8 +129,10 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
|
||||
pub struct PageCacheMetrics {
|
||||
pub read_accesses_materialized_page: IntCounter,
|
||||
pub read_accesses_ephemeral: IntCounter,
|
||||
pub read_accesses_immutable: IntCounter,
|
||||
|
||||
pub read_hits_ephemeral: IntCounter,
|
||||
pub read_hits_immutable: IntCounter,
|
||||
pub read_hits_materialized_page_exact: IntCounter,
|
||||
pub read_hits_materialized_page_older_lsn: IntCounter,
|
||||
@@ -161,12 +163,24 @@ pub static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_accesses_ephemeral: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["ephemeral"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_accesses_immutable: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["immutable"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_ephemeral: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["ephemeral", "-"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_immutable: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["immutable", "-"])
|
||||
|
||||
@@ -2,3 +2,4 @@ comment = '** Deprecated ** Please use pg_embedding instead'
|
||||
default_version = '0.1.0'
|
||||
module_pathname = '$libdir/hnsw'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -222,9 +222,8 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
/*
|
||||
* Stats collector detach shared memory, so we should not try to access shared memory here.
|
||||
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
|
||||
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
|
||||
*/
|
||||
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
|
||||
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
|
||||
return;
|
||||
|
||||
/* Open cache file if not done yet */
|
||||
|
||||
@@ -566,9 +566,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
|
||||
}
|
||||
|
||||
initStringInfo(&safekeeper[n_safekeepers].outbuf);
|
||||
safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
|
||||
if (safekeeper[n_safekeepers].xlogreader == NULL)
|
||||
elog(FATAL, "Failed to allocate xlog reader");
|
||||
safekeeper[n_safekeepers].xlogreader = NULL;
|
||||
safekeeper[n_safekeepers].flushWrite = false;
|
||||
safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
|
||||
safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr;
|
||||
@@ -716,6 +714,12 @@ ShutdownConnection(Safekeeper *sk)
|
||||
sk->voteResponse.termHistory.entries = NULL;
|
||||
|
||||
HackyRemoveWalProposerEvent(sk);
|
||||
|
||||
if (sk->xlogreader)
|
||||
{
|
||||
NeonWALReaderFree(sk->xlogreader);
|
||||
sk->xlogreader = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1238,8 +1242,8 @@ HandleElectedProposer(void)
|
||||
LSN_FORMAT_ARGS(truncateLsn),
|
||||
LSN_FORMAT_ARGS(propEpochStartLsn));
|
||||
/* Perform recovery */
|
||||
if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
|
||||
elog(FATAL, "Failed to recover state");
|
||||
// if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
|
||||
// elog(FATAL, "Failed to recover state");
|
||||
}
|
||||
else if (syncSafekeepers)
|
||||
{
|
||||
@@ -1555,6 +1559,12 @@ SendProposerElected(Safekeeper *sk)
|
||||
term_t lastCommonTerm;
|
||||
int i;
|
||||
|
||||
/* It's a good moment to create WAL reader */
|
||||
Assert(!sk->xlogreader);
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, propEpochStartLsn);
|
||||
if (!sk->xlogreader)
|
||||
elog(FATAL, "failed to allocate xlog reader");
|
||||
|
||||
/*
|
||||
* Determine start LSN by comparing safekeeper's log term switch history
|
||||
* and proposer's, searching for the divergence point.
|
||||
@@ -1834,19 +1844,24 @@ SendAppendRequests(Safekeeper *sk)
|
||||
|
||||
/* write the WAL itself */
|
||||
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
|
||||
if (!WALRead(sk->xlogreader,
|
||||
|
||||
if (!NeonWALRead(sk->xlogreader,
|
||||
&sk->outbuf.data[sk->outbuf.len],
|
||||
req->beginLsn,
|
||||
req->endLsn - req->beginLsn,
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
/* FIXME don't use hardcoded timeline_id here */
|
||||
1,
|
||||
1
|
||||
#else
|
||||
ThisTimeLineID,
|
||||
ThisTimeLineID
|
||||
#endif
|
||||
&errinfo))
|
||||
))
|
||||
{
|
||||
WALReadRaiseError(&errinfo);
|
||||
elog(WARNING, "WAL reading for node %s:%s failed: %s",
|
||||
sk->host, sk->port,
|
||||
sk->xlogreader->err_msg);
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
}
|
||||
sk->outbuf.len += req->endLsn - req->beginLsn;
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
#define __NEON_WALPROPOSER_H__
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "access/xlogreader.h"
|
||||
#include "postgres.h"
|
||||
#include "port.h"
|
||||
#include "access/xlog_internal.h"
|
||||
@@ -327,6 +328,24 @@ typedef struct AppendResponse
|
||||
/* Other fields are fixed part */
|
||||
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
|
||||
|
||||
#define NEON_WALREADER_ERR_MSG_LEN 128
|
||||
|
||||
/*
|
||||
* Like WALRead, but returns error instead of throwing ERROR when segment is
|
||||
* missing + doesn't attempt to read WAL before specified horizon -- basebackup
|
||||
* LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on
|
||||
* demand WAL fetching from safekeepers should be implemented in NeonWALReader.
|
||||
*/
|
||||
typedef struct {
|
||||
/* LSN before */
|
||||
XLogRecPtr available_lsn;
|
||||
WALSegmentContext segcxt;
|
||||
WALOpenSegment seg;
|
||||
int wre_errno;
|
||||
/* Explains failure to read, static for simplicity. */
|
||||
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
|
||||
} NeonWALReader;
|
||||
|
||||
/*
|
||||
* Descriptor of safekeeper
|
||||
*/
|
||||
@@ -358,7 +377,7 @@ typedef struct Safekeeper
|
||||
/*
|
||||
* WAL reader, allocated for each safekeeper.
|
||||
*/
|
||||
XLogReaderState *xlogreader;
|
||||
NeonWALReader *xlogreader;
|
||||
|
||||
/*
|
||||
* Streaming will start here; must be record boundary.
|
||||
@@ -508,4 +527,9 @@ extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_
|
||||
|
||||
extern uint64 BackpressureThrottlingTime(void);
|
||||
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn);
|
||||
extern void NeonWALReaderFree(NeonWALReader *state);
|
||||
extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||
|
||||
|
||||
#endif /* __NEON_WALPROPOSER_H__ */
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "replication/slot.h"
|
||||
#include "walproposer_utils.h"
|
||||
#include "replication/walsender_private.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
#include "storage/ipc.h"
|
||||
#include "utils/builtins.h"
|
||||
@@ -657,3 +658,185 @@ XLogBroadcastWalProposer(void)
|
||||
set_ps_display(activitymsg);
|
||||
}
|
||||
}
|
||||
|
||||
/* palloc and initialize NeonWALReader */
|
||||
NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
|
||||
{
|
||||
NeonWALReader *reader;
|
||||
|
||||
reader = (NeonWALReader *)
|
||||
palloc_extended(sizeof(NeonWALReader),
|
||||
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
|
||||
if (!reader)
|
||||
return NULL;
|
||||
|
||||
reader->available_lsn = available_lsn;
|
||||
reader->seg.ws_file = -1;
|
||||
reader->seg.ws_segno = 0;
|
||||
reader->seg.ws_tli = 0;
|
||||
reader->segcxt.ws_segsize = wal_segment_size;
|
||||
|
||||
return reader;
|
||||
}
|
||||
|
||||
static void neon_wal_segment_close(NeonWALReader *state);
|
||||
|
||||
void
|
||||
NeonWALReaderFree(NeonWALReader *state)
|
||||
{
|
||||
if (state->seg.ws_file != -1)
|
||||
neon_wal_segment_close(state);
|
||||
pfree(state);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Copy of vanilla wal_segment_open, but returns false in case of error instead
|
||||
* of ERROR, with errno set.
|
||||
*
|
||||
* XLogReaderRoutine->segment_open callback for local pg_wal files
|
||||
*/
|
||||
static bool
|
||||
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
|
||||
TimeLineID *tli_p)
|
||||
{
|
||||
TimeLineID tli = *tli_p;
|
||||
char path[MAXPGPATH];
|
||||
|
||||
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
|
||||
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
|
||||
if (state->seg.ws_file >= 0)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/* copy of vanilla wal_segment_close with NeonWALReader */
|
||||
void
|
||||
neon_wal_segment_close(NeonWALReader *state)
|
||||
{
|
||||
close(state->seg.ws_file);
|
||||
/* need to check errno? */
|
||||
state->seg.ws_file = -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Mostly copy of vanilla WALRead, but 1) returns error if requested data before
|
||||
* available_lsn 2) returns error is segment is missing instead of throwing
|
||||
* ERROR.
|
||||
*
|
||||
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
|
||||
* fetched from timeline 'tli'.
|
||||
*
|
||||
* Returns true if succeeded, false if an error occurs, in which case
|
||||
* 'state->errno' shows whether it was missing WAL (ENOENT) or something else,
|
||||
* and 'err' the desciption.
|
||||
*/
|
||||
bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
|
||||
{
|
||||
char *p;
|
||||
XLogRecPtr recptr;
|
||||
Size nbytes;
|
||||
|
||||
if (startptr < state->available_lsn)
|
||||
{
|
||||
state->wre_errno = 0;
|
||||
snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X",
|
||||
LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn));
|
||||
return false;
|
||||
}
|
||||
|
||||
p = buf;
|
||||
recptr = startptr;
|
||||
nbytes = count;
|
||||
|
||||
while (nbytes > 0)
|
||||
{
|
||||
uint32 startoff;
|
||||
int segbytes;
|
||||
int readbytes;
|
||||
|
||||
startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
|
||||
|
||||
/*
|
||||
* If the data we want is not in a segment we have open, close what we
|
||||
* have (if anything) and open the next one, using the caller's
|
||||
* provided openSegment callback.
|
||||
*/
|
||||
if (state->seg.ws_file < 0 ||
|
||||
!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
|
||||
tli != state->seg.ws_tli)
|
||||
{
|
||||
XLogSegNo nextSegNo;
|
||||
|
||||
if (state->seg.ws_file >= 0)
|
||||
neon_wal_segment_close(state);
|
||||
|
||||
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
|
||||
if (!neon_wal_segment_open(state, nextSegNo, &tli))
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
state->wre_errno = errno;
|
||||
|
||||
XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
|
||||
snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
|
||||
fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
/* This shouldn't happen -- indicates a bug in segment_open */
|
||||
Assert(state->seg.ws_file >= 0);
|
||||
|
||||
/* Update the current segment info. */
|
||||
state->seg.ws_tli = tli;
|
||||
state->seg.ws_segno = nextSegNo;
|
||||
}
|
||||
|
||||
/* How many bytes are within this segment? */
|
||||
if (nbytes > (state->segcxt.ws_segsize - startoff))
|
||||
segbytes = state->segcxt.ws_segsize - startoff;
|
||||
else
|
||||
segbytes = nbytes;
|
||||
|
||||
#ifndef FRONTEND
|
||||
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
|
||||
#endif
|
||||
|
||||
/* Reset errno first; eases reporting non-errno-affecting errors */
|
||||
errno = 0;
|
||||
readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
|
||||
|
||||
#ifndef FRONTEND
|
||||
pgstat_report_wait_end();
|
||||
#endif
|
||||
|
||||
if (readbytes <= 0)
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);
|
||||
|
||||
if (readbytes < 0)
|
||||
{
|
||||
state->wre_errno = errno;
|
||||
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: %s",
|
||||
fname, startoff, strerror(state->wre_errno));
|
||||
}
|
||||
else
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: unexpected EOF",
|
||||
fname, startoff);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Update state for read */
|
||||
recptr += readbytes;
|
||||
nbytes -= readbytes;
|
||||
p += readbytes;
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
}
|
||||
50
poetry.lock
generated
50
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp"
|
||||
@@ -887,34 +887,34 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "cryptography"
|
||||
version = "41.0.4"
|
||||
version = "41.0.3"
|
||||
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "cryptography-41.0.4-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:80907d3faa55dc5434a16579952ac6da800935cd98d14dbd62f6f042c7f5e839"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:35c00f637cd0b9d5b6c6bd11b6c3359194a8eba9c46d4e875a3660e3b400005f"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cecfefa17042941f94ab54f769c8ce0fe14beff2694e9ac684176a2535bf9714"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e40211b4923ba5a6dc9769eab704bdb3fbb58d56c5b336d30996c24fcf12aadb"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:23a25c09dfd0d9f28da2352503b23e086f8e78096b9fd585d1d14eca01613e13"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:2ed09183922d66c4ec5fdaa59b4d14e105c084dd0febd27452de8f6f74704143"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:5a0f09cefded00e648a127048119f77bc2b2ec61e736660b5789e638f43cc397"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:9eeb77214afae972a00dee47382d2591abe77bdae166bda672fb1e24702a3860"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-win32.whl", hash = "sha256:3b224890962a2d7b57cf5eeb16ccaafba6083f7b811829f00476309bce2fe0fd"},
|
||||
{file = "cryptography-41.0.4-cp37-abi3-win_amd64.whl", hash = "sha256:c880eba5175f4307129784eca96f4e70b88e57aa3f680aeba3bab0e980b0f37d"},
|
||||
{file = "cryptography-41.0.4-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:004b6ccc95943f6a9ad3142cfabcc769d7ee38a3f60fb0dddbfb431f818c3a67"},
|
||||
{file = "cryptography-41.0.4-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:86defa8d248c3fa029da68ce61fe735432b047e32179883bdb1e79ed9bb8195e"},
|
||||
{file = "cryptography-41.0.4-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:37480760ae08065437e6573d14be973112c9e6dcaf5f11d00147ee74f37a3829"},
|
||||
{file = "cryptography-41.0.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:b5f4dfe950ff0479f1f00eda09c18798d4f49b98f4e2006d644b3301682ebdca"},
|
||||
{file = "cryptography-41.0.4-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:7e53db173370dea832190870e975a1e09c86a879b613948f09eb49324218c14d"},
|
||||
{file = "cryptography-41.0.4-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:5b72205a360f3b6176485a333256b9bcd48700fc755fef51c8e7e67c4b63e3ac"},
|
||||
{file = "cryptography-41.0.4-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:93530900d14c37a46ce3d6c9e6fd35dbe5f5601bf6b3a5c325c7bffc030344d9"},
|
||||
{file = "cryptography-41.0.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:efc8ad4e6fc4f1752ebfb58aefece8b4e3c4cae940b0994d43649bdfce8d0d4f"},
|
||||
{file = "cryptography-41.0.4-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:c3391bd8e6de35f6f1140e50aaeb3e2b3d6a9012536ca23ab0d9c35ec18c8a91"},
|
||||
{file = "cryptography-41.0.4-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:0d9409894f495d465fe6fda92cb70e8323e9648af912d5b9141d616df40a87b8"},
|
||||
{file = "cryptography-41.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:8ac4f9ead4bbd0bc8ab2d318f97d85147167a488be0e08814a37eb2f439d5cf6"},
|
||||
{file = "cryptography-41.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:047c4603aeb4bbd8db2756e38f5b8bd7e94318c047cfe4efeb5d715e08b49311"},
|
||||
{file = "cryptography-41.0.4.tar.gz", hash = "sha256:7febc3094125fc126a7f6fb1f420d0da639f3f32cb15c8ff0dc3997c4549f51a"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:652627a055cb52a84f8c448185922241dd5217443ca194d5739b44612c5e6507"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:8f09daa483aedea50d249ef98ed500569841d6498aa9c9f4b0531b9964658922"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4fd871184321100fb400d759ad0cddddf284c4b696568204d281c902fc7b0d81"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:84537453d57f55a50a5b6835622ee405816999a7113267739a1b4581f83535bd"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:3fb248989b6363906827284cd20cca63bb1a757e0a2864d4c1682a985e3dca47"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:42cb413e01a5d36da9929baa9d70ca90d90b969269e5a12d39c1e0d475010116"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:aeb57c421b34af8f9fe830e1955bf493a86a7996cc1338fe41b30047d16e962c"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6af1c6387c531cd364b72c28daa29232162010d952ceb7e5ca8e2827526aceae"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-win32.whl", hash = "sha256:0d09fb5356f975974dbcb595ad2d178305e5050656affb7890a1583f5e02a306"},
|
||||
{file = "cryptography-41.0.3-cp37-abi3-win_amd64.whl", hash = "sha256:a983e441a00a9d57a4d7c91b3116a37ae602907a7618b882c8013b5762e80574"},
|
||||
{file = "cryptography-41.0.3-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5259cb659aa43005eb55a0e4ff2c825ca111a0da1814202c64d28a985d33b087"},
|
||||
{file = "cryptography-41.0.3-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:67e120e9a577c64fe1f611e53b30b3e69744e5910ff3b6e97e935aeb96005858"},
|
||||
{file = "cryptography-41.0.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:7efe8041897fe7a50863e51b77789b657a133c75c3b094e51b5e4b5cec7bf906"},
|
||||
{file = "cryptography-41.0.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:ce785cf81a7bdade534297ef9e490ddff800d956625020ab2ec2780a556c313e"},
|
||||
{file = "cryptography-41.0.3-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:57a51b89f954f216a81c9d057bf1a24e2f36e764a1ca9a501a6964eb4a6800dd"},
|
||||
{file = "cryptography-41.0.3-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:4c2f0d35703d61002a2bbdcf15548ebb701cfdd83cdc12471d2bae80878a4207"},
|
||||
{file = "cryptography-41.0.3-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:23c2d778cf829f7d0ae180600b17e9fceea3c2ef8b31a99e3c694cbbf3a24b84"},
|
||||
{file = "cryptography-41.0.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:95dd7f261bb76948b52a5330ba5202b91a26fbac13ad0e9fc8a3ac04752058c7"},
|
||||
{file = "cryptography-41.0.3-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:41d7aa7cdfded09b3d73a47f429c298e80796c8e825ddfadc84c8a7f12df212d"},
|
||||
{file = "cryptography-41.0.3-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d0d651aa754ef58d75cec6edfbd21259d93810b73f6ec246436a21b7841908de"},
|
||||
{file = "cryptography-41.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:ab8de0d091acbf778f74286f4989cf3d1528336af1b59f3e5d2ebca8b5fe49e1"},
|
||||
{file = "cryptography-41.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:a74fbcdb2a0d46fe00504f571a2a540532f4c188e6ccf26f1f178480117b33c4"},
|
||||
{file = "cryptography-41.0.3.tar.gz", hash = "sha256:6d192741113ef5e30d89dcb5b956ef4e1578f304708701b8b73d38e3e1461f34"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.72.1"
|
||||
channel = "1.72.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// Main entry point for the safekeeper executable
|
||||
//
|
||||
use anyhow::{bail, Context, Result};
|
||||
use clap::Parser;
|
||||
use clap::{ArgAction, Parser};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
@@ -105,6 +105,9 @@ struct Args {
|
||||
/// it during this period passed as a human readable duration.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
|
||||
heartbeat_timeout: Duration,
|
||||
/// Disable/enable peer recovery. Used for disabling it in tests.
|
||||
#[arg(long, default_value = "true", action=ArgAction::Set)]
|
||||
peer_recovery: bool,
|
||||
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
|
||||
/// inline table, e.g.
|
||||
/// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
|
||||
@@ -268,6 +271,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
broker_endpoint: args.broker_endpoint,
|
||||
broker_keepalive_interval: args.broker_keepalive_interval,
|
||||
heartbeat_timeout: args.heartbeat_timeout,
|
||||
peer_recovery_enabled: args.peer_recovery,
|
||||
remote_storage: args.remote_storage,
|
||||
max_offloader_lag_bytes: args.max_offloader_lag,
|
||||
wal_backup_enabled: !args.disable_wal_backup,
|
||||
|
||||
@@ -372,6 +372,13 @@ impl SafekeeperPostgresHandler {
|
||||
/// from a walproposer recovery function. This connection gets a special handling:
|
||||
/// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
|
||||
pub fn is_walproposer_recovery(&self) -> bool {
|
||||
self.appname == Some("wal_proposer_recovery".to_string())
|
||||
match &self.appname {
|
||||
None => false,
|
||||
Some(appname) => {
|
||||
appname == "wal_proposer_recovery" ||
|
||||
// set by safekeeper peer recovery
|
||||
appname.starts_with("safekeeper")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@ use tokio::io::AsyncReadExt;
|
||||
use utils::http::endpoint::request_span;
|
||||
|
||||
use crate::receive_wal::WalReceiverState;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{ServerInfo, TermLsn};
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::{debug_dump, pull_timeline};
|
||||
@@ -60,16 +60,25 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
|
||||
.as_ref()
|
||||
}
|
||||
|
||||
/// Same as TermSwitchEntry, but serializes LSN using display serializer
|
||||
/// Same as TermLsn, but serializes LSN using display serializer
|
||||
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct TermSwitchApiEntry {
|
||||
pub term: Term,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
impl From<TermSwitchApiEntry> for TermLsn {
|
||||
fn from(api_val: TermSwitchApiEntry) -> Self {
|
||||
TermLsn {
|
||||
term: api_val.term,
|
||||
lsn: api_val.lsn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Augment AcceptorState with epoch for convenience
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AcceptorStateStatus {
|
||||
|
||||
@@ -62,6 +62,7 @@ pub struct SafeKeeperConf {
|
||||
pub broker_endpoint: Uri,
|
||||
pub broker_keepalive_interval: Duration,
|
||||
pub heartbeat_timeout: Duration,
|
||||
pub peer_recovery_enabled: bool,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub max_offloader_lag_bytes: u64,
|
||||
pub backup_parallel_jobs: usize,
|
||||
@@ -100,6 +101,7 @@ impl SafeKeeperConf {
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint"),
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
peer_recovery_enabled: true,
|
||||
wal_backup_enabled: true,
|
||||
backup_parallel_jobs: 1,
|
||||
pg_auth: None,
|
||||
|
||||
@@ -55,9 +55,12 @@ impl WalReceivers {
|
||||
|
||||
/// Register new walreceiver. Returned guard provides access to the slot and
|
||||
/// automatically deregisters in Drop.
|
||||
pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
|
||||
pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
|
||||
let slots = &mut self.mutex.lock().slots;
|
||||
let walreceiver = WalReceiverState::Voting;
|
||||
let walreceiver = WalReceiverState {
|
||||
conn_id,
|
||||
status: WalReceiverStatus::Voting,
|
||||
};
|
||||
// find empty slot or create new one
|
||||
let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
|
||||
slots[pos] = Some(walreceiver);
|
||||
@@ -96,6 +99,18 @@ impl WalReceivers {
|
||||
self.mutex.lock().slots.iter().flatten().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get number of streaming walreceivers (normally 0 or 1) from compute.
|
||||
pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
|
||||
self.mutex
|
||||
.lock()
|
||||
.slots
|
||||
.iter()
|
||||
.flatten()
|
||||
// conn_id.is_none skips recovery which also registers here
|
||||
.filter(|s| s.conn_id.is_none() && matches!(s.status, WalReceiverStatus::Streaming))
|
||||
.count()
|
||||
}
|
||||
|
||||
/// Unregister walsender.
|
||||
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
|
||||
let mut shared = self.mutex.lock();
|
||||
@@ -108,10 +123,17 @@ struct WalReceiversShared {
|
||||
slots: Vec<Option<WalReceiverState>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WalReceiverState {
|
||||
/// None means it is recovery initiated by us (this safekeeper).
|
||||
pub conn_id: Option<ConnectionId>,
|
||||
pub status: WalReceiverStatus,
|
||||
}
|
||||
|
||||
/// Walreceiver status. Currently only whether it passed voting stage and
|
||||
/// started receiving the stream, but it is easy to add more if needed.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum WalReceiverState {
|
||||
pub enum WalReceiverStatus {
|
||||
Voting,
|
||||
Streaming,
|
||||
}
|
||||
@@ -136,8 +158,8 @@ impl Drop for WalReceiverGuard {
|
||||
}
|
||||
}
|
||||
|
||||
const MSG_QUEUE_SIZE: usize = 256;
|
||||
const REPLY_QUEUE_SIZE: usize = 16;
|
||||
pub const MSG_QUEUE_SIZE: usize = 256;
|
||||
pub const REPLY_QUEUE_SIZE: usize = 16;
|
||||
|
||||
impl SafekeeperPostgresHandler {
|
||||
/// Wrapper around handle_start_wal_push_guts handling result. Error is
|
||||
@@ -261,7 +283,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||
tli.clone(),
|
||||
msg_rx,
|
||||
reply_tx,
|
||||
self.conn_id,
|
||||
Some(self.conn_id),
|
||||
));
|
||||
|
||||
// Forward all messages to WalAcceptor
|
||||
@@ -317,31 +339,41 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
// even when it writes a steady stream of messages.
|
||||
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
|
||||
struct WalAcceptor {
|
||||
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
|
||||
/// replies to reply_tx; reading from socket and writing to disk in parallel is
|
||||
/// beneficial for performance, this struct provides writing to disk part.
|
||||
pub struct WalAcceptor {
|
||||
tli: Arc<Timeline>,
|
||||
msg_rx: Receiver<ProposerAcceptorMessage>,
|
||||
reply_tx: Sender<AcceptorProposerMessage>,
|
||||
conn_id: Option<ConnectionId>,
|
||||
}
|
||||
|
||||
impl WalAcceptor {
|
||||
/// Spawn thread with WalAcceptor running, return handle to it.
|
||||
fn spawn(
|
||||
/// Spawn task with WalAcceptor running, return handle to it. Task returns
|
||||
/// Ok(()) if either of channels has closed, and Err if any error during
|
||||
/// message processing is encountered.
|
||||
///
|
||||
/// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
|
||||
pub fn spawn(
|
||||
tli: Arc<Timeline>,
|
||||
msg_rx: Receiver<ProposerAcceptorMessage>,
|
||||
reply_tx: Sender<AcceptorProposerMessage>,
|
||||
conn_id: ConnectionId,
|
||||
conn_id: Option<ConnectionId>,
|
||||
) -> JoinHandle<anyhow::Result<()>> {
|
||||
task::spawn(async move {
|
||||
let mut wa = WalAcceptor {
|
||||
tli,
|
||||
msg_rx,
|
||||
reply_tx,
|
||||
conn_id,
|
||||
};
|
||||
|
||||
let span_ttid = wa.tli.ttid; // satisfy borrow checker
|
||||
wa.run()
|
||||
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
|
||||
.instrument(
|
||||
info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
|
||||
)
|
||||
.await
|
||||
})
|
||||
}
|
||||
@@ -355,7 +387,7 @@ impl WalAcceptor {
|
||||
let _compute_conn_guard = ComputeConnectionGuard {
|
||||
timeline: Arc::clone(&self.tli),
|
||||
};
|
||||
let walreceiver_guard = self.tli.get_walreceivers().register();
|
||||
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
|
||||
self.tli.update_status_notify().await?;
|
||||
|
||||
// After this timestamp we will stop processing AppendRequests and send a response
|
||||
@@ -372,7 +404,7 @@ impl WalAcceptor {
|
||||
|
||||
// Update walreceiver state in shmem for reporting.
|
||||
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
|
||||
*walreceiver_guard.get() = WalReceiverState::Streaming;
|
||||
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
|
||||
}
|
||||
|
||||
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
|
||||
@@ -1,17 +1,41 @@
|
||||
//! This module implements pulling WAL from peer safekeepers if compute can't
|
||||
//! provide it, i.e. safekeeper lags too much.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use std::{fmt, pin::pin, sync::Arc};
|
||||
|
||||
use tokio::{select, time::sleep, time::Duration};
|
||||
use tracing::{info, instrument};
|
||||
use anyhow::{bail, Context};
|
||||
use futures::StreamExt;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::time::timeout;
|
||||
use tokio::{
|
||||
select,
|
||||
time::sleep,
|
||||
time::{self, Duration},
|
||||
};
|
||||
use tokio_postgres::replication::ReplicationStream;
|
||||
use tokio_postgres::types::PgLsn;
|
||||
use tracing::*;
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
|
||||
|
||||
use crate::{timeline::Timeline, SafeKeeperConf};
|
||||
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
|
||||
use crate::safekeeper::{AppendRequest, AppendRequestHeader};
|
||||
use crate::{
|
||||
http::routes::TimelineStatus,
|
||||
receive_wal::MSG_QUEUE_SIZE,
|
||||
safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory,
|
||||
TermLsn, VoteRequest,
|
||||
},
|
||||
timeline::{PeerInfo, Timeline},
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
/// Entrypoint for per timeline task which always runs, checking whether
|
||||
/// recovery for this safekeeper is needed and starting it if so.
|
||||
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
|
||||
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
info!("started");
|
||||
let mut cancellation_rx = match tli.get_cancellation_rx() {
|
||||
Ok(rx) => rx,
|
||||
@@ -22,19 +46,387 @@ pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
|
||||
};
|
||||
|
||||
select! {
|
||||
_ = recovery_main_loop(tli) => { unreachable!() }
|
||||
_ = recovery_main_loop(tli, conf) => { unreachable!() }
|
||||
_ = cancellation_rx.changed() => {
|
||||
info!("stopped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and
|
||||
/// fields to explain the choice.
|
||||
#[derive(Debug)]
|
||||
pub struct RecoveryNeededInfo {
|
||||
/// my term
|
||||
pub term: Term,
|
||||
/// my last_log_term
|
||||
pub last_log_term: Term,
|
||||
/// my flush_lsn
|
||||
pub flush_lsn: Lsn,
|
||||
/// peers from which we can fetch WAL, for observability.
|
||||
pub peers: Vec<PeerInfo>,
|
||||
/// for observability
|
||||
pub num_streaming_computes: usize,
|
||||
pub donors: Vec<Donor>,
|
||||
}
|
||||
|
||||
// Custom to omit not important fields from PeerInfo.
|
||||
impl fmt::Display for RecoveryNeededInfo {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{{")?;
|
||||
write!(
|
||||
f,
|
||||
"term: {}, last_log_term: {}, flush_lsn: {}, peers: {{",
|
||||
self.term, self.last_log_term, self.flush_lsn
|
||||
)?;
|
||||
for p in self.peers.iter() {
|
||||
write!(
|
||||
f,
|
||||
"PeerInfo {{ sk_id: {}, term: {}, last_log_term: {}, flush_lsn: {} }}, ",
|
||||
p.sk_id, p.term, p.last_log_term, p.flush_lsn
|
||||
)?;
|
||||
}
|
||||
write!(
|
||||
f,
|
||||
"}} num_streaming_computes: {}, donors: {:?}",
|
||||
self.num_streaming_computes, self.donors
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Donor {
|
||||
pub sk_id: NodeId,
|
||||
/// equals to last_log_term
|
||||
pub term: Term,
|
||||
pub flush_lsn: Lsn,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
}
|
||||
|
||||
impl From<&PeerInfo> for Donor {
|
||||
fn from(p: &PeerInfo) -> Self {
|
||||
Donor {
|
||||
sk_id: p.sk_id,
|
||||
term: p.term,
|
||||
flush_lsn: p.flush_lsn,
|
||||
pg_connstr: p.pg_connstr.clone(),
|
||||
http_connstr: p.http_connstr.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const CHECK_INTERVAL_MS: u64 = 2000;
|
||||
|
||||
/// Check regularly whether we need to start recovery.
|
||||
async fn recovery_main_loop(_tli: Arc<Timeline>) {
|
||||
async fn recovery_main_loop(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
|
||||
loop {
|
||||
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
|
||||
match recovery_needed_info.donors.first() {
|
||||
Some(donor) => {
|
||||
info!(
|
||||
"starting recovery from donor {}: {}",
|
||||
donor.sk_id, recovery_needed_info
|
||||
);
|
||||
match recover(tli.clone(), donor, &conf).await {
|
||||
// Note: 'write_wal rewrites WAL written before' error is
|
||||
// expected here and might happen if compute and recovery
|
||||
// concurrently write the same data. Eventually compute
|
||||
// should win.
|
||||
Err(e) => warn!("recovery failed: {:#}", e),
|
||||
Ok(msg) => info!("recovery finished: {}", msg),
|
||||
}
|
||||
}
|
||||
None => {
|
||||
trace!(
|
||||
"recovery not needed or not possible: {}",
|
||||
recovery_needed_info
|
||||
);
|
||||
}
|
||||
}
|
||||
sleep(check_duration).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Recover from the specified donor. Returns message explaining normal finish
|
||||
/// reason or error.
|
||||
async fn recover(
|
||||
tli: Arc<Timeline>,
|
||||
donor: &Donor,
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// Learn donor term switch history to figure out starting point.
|
||||
let client = reqwest::Client::new();
|
||||
let timeline_info: TimelineStatus = client
|
||||
.get(format!(
|
||||
"http://{}/v1/tenant/{}/timeline/{}",
|
||||
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
if timeline_info.acceptor_state.term != donor.term {
|
||||
bail!(
|
||||
"donor term changed from {} to {}",
|
||||
donor.term,
|
||||
timeline_info.acceptor_state.term
|
||||
);
|
||||
}
|
||||
// convert from API TermSwitchApiEntry into TermLsn.
|
||||
let donor_th = TermHistory(
|
||||
timeline_info
|
||||
.acceptor_state
|
||||
.term_history
|
||||
.iter()
|
||||
.map(|tl| Into::<TermLsn>::into(*tl))
|
||||
.collect(),
|
||||
);
|
||||
|
||||
// Now understand our term history.
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
|
||||
let vote_response = match tli
|
||||
.process_msg(&vote_request)
|
||||
.await
|
||||
.context("VoteRequest handling")?
|
||||
{
|
||||
Some(AcceptorProposerMessage::VoteResponse(vr)) => vr,
|
||||
_ => {
|
||||
bail!("unexpected VoteRequest response"); // unreachable
|
||||
}
|
||||
};
|
||||
if vote_response.term != donor.term {
|
||||
bail!(
|
||||
"our term changed from {} to {}",
|
||||
donor.term,
|
||||
vote_response.term
|
||||
);
|
||||
}
|
||||
|
||||
let last_common_point = match TermHistory::find_highest_common_point(
|
||||
&donor_th,
|
||||
&vote_response.term_history,
|
||||
vote_response.flush_lsn,
|
||||
) {
|
||||
None => bail!(
|
||||
"couldn't find common point in histories, donor {:?}, sk {:?}",
|
||||
donor_th,
|
||||
vote_response.term_history,
|
||||
),
|
||||
Some(lcp) => lcp,
|
||||
};
|
||||
info!("found last common point at {:?}", last_common_point);
|
||||
|
||||
// truncate WAL locally
|
||||
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
term: donor.term,
|
||||
start_streaming_at: last_common_point.lsn,
|
||||
term_history: donor_th,
|
||||
timeline_start_lsn: Lsn::INVALID,
|
||||
});
|
||||
// Successful ProposerElected handling always returns None. If term changed,
|
||||
// we'll find out that during the streaming. Note: it is expected to get
|
||||
// 'refusing to overwrite correct WAL' here if walproposer reconnected
|
||||
// concurrently, restart helps here.
|
||||
tli.process_msg(&pe)
|
||||
.await
|
||||
.context("ProposerElected handling")?;
|
||||
|
||||
recovery_stream(tli, donor, last_common_point.lsn, conf).await
|
||||
}
|
||||
|
||||
// Pull WAL from donor, assuming handshake is already done.
|
||||
async fn recovery_stream(
|
||||
tli: Arc<Timeline>,
|
||||
donor: &Donor,
|
||||
start_streaming_at: Lsn,
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// TODO: pass auth token
|
||||
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
|
||||
let mut cfg = cfg.to_tokio_postgres_config();
|
||||
// It will make safekeeper give out not committed WAL (up to flush_lsn).
|
||||
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
|
||||
cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
|
||||
|
||||
let connect_timeout = Duration::from_millis(10000);
|
||||
let (client, connection) = match time::timeout(connect_timeout, cfg.connect(postgres::NoTls))
|
||||
.await
|
||||
{
|
||||
Ok(client_and_conn) => client_and_conn?,
|
||||
Err(_elapsed) => {
|
||||
bail!("timed out while waiting {connect_timeout:?} for connection to peer safekeeper to open");
|
||||
}
|
||||
};
|
||||
trace!("connected to {:?}", donor);
|
||||
|
||||
// The connection object performs the actual communication with the
|
||||
// server, spawn it off to run on its own.
|
||||
let ttid = tli.ttid;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection
|
||||
.instrument(info_span!("recovery task connection poll", ttid = %ttid))
|
||||
.await
|
||||
{
|
||||
// This logging isn't very useful as error is anyway forwarded to client.
|
||||
trace!(
|
||||
"tokio_postgres connection object finished with error: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
let query = format!(
|
||||
"START_REPLICATION PHYSICAL {} (term='{}')",
|
||||
start_streaming_at, donor.term
|
||||
);
|
||||
|
||||
let copy_stream = client.copy_both_simple(&query).await?;
|
||||
let physical_stream = ReplicationStream::new(copy_stream);
|
||||
|
||||
// As in normal walreceiver, do networking and writing to disk in parallel.
|
||||
let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
|
||||
let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
|
||||
let wa = WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, None);
|
||||
|
||||
let res = tokio::select! {
|
||||
r = network_io(physical_stream, msg_tx, donor.clone(), tli.clone(), conf.clone()) => r,
|
||||
r = read_replies(reply_rx, donor.term) => r.map(|()| None),
|
||||
};
|
||||
|
||||
// Join the spawned WalAcceptor. At this point chans to/from it passed to
|
||||
// network routines are dropped, so it will exit as soon as it touches them.
|
||||
match wa.await {
|
||||
Ok(Ok(())) => {
|
||||
// WalAcceptor finished normally, termination reason is different
|
||||
match res {
|
||||
Ok(Some(success_desc)) => Ok(success_desc),
|
||||
Ok(None) => bail!("unexpected recovery end without error/success"), // can't happen
|
||||
Err(e) => Err(e), // network error or term change
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => Err(e), // error while processing message
|
||||
Err(e) => bail!("WalAcceptor panicked: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// Perform network part of streaming: read data and push it to msg_tx, send KA
|
||||
// to make sender hear from us. If there is nothing coming for a while, check
|
||||
// for termination.
|
||||
// Returns
|
||||
// - Ok(None) if channel to WalAcceptor closed -- its task should return error.
|
||||
// - Ok(Some(String)) if recovery successfully completed.
|
||||
// - Err if error happened while reading/writing to socket.
|
||||
async fn network_io(
|
||||
physical_stream: ReplicationStream,
|
||||
msg_tx: Sender<ProposerAcceptorMessage>,
|
||||
donor: Donor,
|
||||
tli: Arc<Timeline>,
|
||||
conf: SafeKeeperConf,
|
||||
) -> anyhow::Result<Option<String>> {
|
||||
let mut physical_stream = pin!(physical_stream);
|
||||
let mut last_received_lsn = Lsn::INVALID;
|
||||
// tear down connection if no data arrives withing this period
|
||||
let no_data_timeout = Duration::from_millis(30000);
|
||||
|
||||
loop {
|
||||
let msg = match timeout(no_data_timeout, physical_stream.next()).await {
|
||||
Ok(next) => match next {
|
||||
None => bail!("unexpected end of replication stream"),
|
||||
Some(msg) => msg.context("get replication message")?,
|
||||
},
|
||||
Err(_) => bail!("no message received within {:?}", no_data_timeout),
|
||||
};
|
||||
|
||||
match msg {
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
let ar_hdr = AppendRequestHeader {
|
||||
term: donor.term,
|
||||
epoch_start_lsn: Lsn::INVALID, // unused
|
||||
begin_lsn: Lsn(xlog_data.wal_start()),
|
||||
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
|
||||
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
|
||||
truncate_lsn: Lsn::INVALID, // do not attempt to advance
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let ar = AppendRequest {
|
||||
h: ar_hdr,
|
||||
wal_data: xlog_data.into_data(),
|
||||
};
|
||||
trace!(
|
||||
"processing AppendRequest {}-{}, len {}",
|
||||
ar.h.begin_lsn,
|
||||
ar.h.end_lsn,
|
||||
ar.wal_data.len()
|
||||
);
|
||||
last_received_lsn = ar.h.end_lsn;
|
||||
if msg_tx
|
||||
.send(ProposerAcceptorMessage::AppendRequest(ar))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Ok(None); // chan closed, WalAcceptor terminated
|
||||
}
|
||||
}
|
||||
ReplicationMessage::PrimaryKeepAlive(_) => {
|
||||
// keepalive means nothing is being streamed for a while. Check whether we need to stop.
|
||||
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
|
||||
// do current donors still contain one we currently connected to?
|
||||
if !recovery_needed_info
|
||||
.donors
|
||||
.iter()
|
||||
.any(|d| d.sk_id == donor.sk_id)
|
||||
{
|
||||
// Most likely it means we are caughtup.
|
||||
// note: just exiting makes tokio_postgres send CopyFail to the far end.
|
||||
return Ok(Some(format!(
|
||||
"terminating at {} as connected safekeeper {} with term {} is not a donor anymore: {}",
|
||||
last_received_lsn, donor.sk_id, donor.term, recovery_needed_info
|
||||
)));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
// Send reply to each message to keep connection alive. Ideally we
|
||||
// should do that once in a while instead, but this again requires
|
||||
// stream split or similar workaround, and recovery is anyway not that
|
||||
// performance critical.
|
||||
//
|
||||
// We do not know here real write/flush LSNs (need to take mutex again
|
||||
// or check replies which are read in different future), but neither
|
||||
// sender much cares about them, so just send last received.
|
||||
physical_stream
|
||||
.as_mut()
|
||||
.standby_status_update(
|
||||
PgLsn::from(last_received_lsn.0),
|
||||
PgLsn::from(last_received_lsn.0),
|
||||
PgLsn::from(last_received_lsn.0),
|
||||
SystemTime::now(),
|
||||
0,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Read replies from WalAcceptor. We are not interested much in sending them to
|
||||
// donor safekeeper, so don't route them anywhere. However, we should check if
|
||||
// term changes and exit if it does.
|
||||
// Returns Ok(()) if channel closed, Err in case of term change.
|
||||
async fn read_replies(
|
||||
mut reply_rx: Receiver<AcceptorProposerMessage>,
|
||||
donor_term: Term,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
match reply_rx.recv().await {
|
||||
Some(msg) => {
|
||||
if let AcceptorProposerMessage::AppendResponse(ar) = msg {
|
||||
if ar.term != donor_term {
|
||||
bail!("donor term changed from {} to {}", donor_term, ar.term);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => return Ok(()), // chan closed, WalAcceptor terminated
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,6 +91,59 @@ impl TermHistory {
|
||||
}
|
||||
TermHistory(res)
|
||||
}
|
||||
|
||||
/// Find point of divergence between leader (walproposer) term history and
|
||||
/// safekeeper. Arguments are not symmetrics as proposer history ends at
|
||||
/// +infinity while safekeeper at flush_lsn.
|
||||
/// C version is at walproposer SendProposerElected.
|
||||
pub fn find_highest_common_point(
|
||||
prop_th: &TermHistory,
|
||||
sk_th: &TermHistory,
|
||||
sk_wal_end: Lsn,
|
||||
) -> Option<TermLsn> {
|
||||
let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
|
||||
// find last common term, if any...
|
||||
let mut last_common_idx = None;
|
||||
for i in 0..min(sk_th.len(), prop_th.len()) {
|
||||
if prop_th[i].term != sk_th[i].term {
|
||||
break;
|
||||
}
|
||||
// If term is the same, LSN must be equal as well.
|
||||
assert!(
|
||||
prop_th[i].lsn == sk_th[i].lsn,
|
||||
"same term {} has different start LSNs: prop {}, sk {}",
|
||||
prop_th[i].term,
|
||||
prop_th[i].lsn,
|
||||
sk_th[i].lsn
|
||||
);
|
||||
last_common_idx = Some(i);
|
||||
}
|
||||
let last_common_idx = match last_common_idx {
|
||||
None => return None, // no common point
|
||||
Some(lci) => lci,
|
||||
};
|
||||
// Now find where it ends at both prop and sk and take min. End of
|
||||
// (common) term is the start of the next except it is the last one;
|
||||
// there it is flush_lsn in case of safekeeper or, in case of proposer
|
||||
// +infinity, so we just take flush_lsn then.
|
||||
if last_common_idx == prop_th.len() - 1 {
|
||||
Some(TermLsn {
|
||||
term: prop_th[last_common_idx].term,
|
||||
lsn: sk_wal_end,
|
||||
})
|
||||
} else {
|
||||
let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
|
||||
let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
|
||||
sk_th[last_common_idx + 1].lsn
|
||||
} else {
|
||||
sk_wal_end
|
||||
};
|
||||
Some(TermLsn {
|
||||
term: prop_th[last_common_idx].term,
|
||||
lsn: min(prop_common_term_end, sk_common_term_end),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Display only latest entries for Debug.
|
||||
@@ -305,19 +358,19 @@ pub struct AcceptorGreeting {
|
||||
/// Vote request sent from proposer to safekeepers
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct VoteRequest {
|
||||
term: Term,
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
/// Vote itself, sent from safekeeper to proposer
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct VoteResponse {
|
||||
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
vote_given: u64, // fixme u64 due to padding
|
||||
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
|
||||
// proposer to choose the most advanced one.
|
||||
flush_lsn: Lsn,
|
||||
pub flush_lsn: Lsn,
|
||||
truncate_lsn: Lsn,
|
||||
term_history: TermHistory,
|
||||
pub term_history: TermHistory,
|
||||
timeline_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
@@ -344,7 +397,8 @@ pub struct AppendRequest {
|
||||
pub struct AppendRequestHeader {
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
// LSN since the proposer appends WAL; determines epoch switch point.
|
||||
// TODO: remove this field, it in unused -- LSN of term switch can be taken
|
||||
// from ProposerElected (as well as from term history).
|
||||
pub epoch_start_lsn: Lsn,
|
||||
/// start position of message in WAL
|
||||
pub begin_lsn: Lsn,
|
||||
@@ -759,7 +813,7 @@ where
|
||||
bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
|
||||
msg.term, self.flush_lsn(), msg.start_streaming_at)
|
||||
}
|
||||
// Otherwise this shouldn't happen.
|
||||
// Otherwise we must never attempt to truncate committed data.
|
||||
assert!(
|
||||
msg.start_streaming_at >= self.inmem.commit_lsn,
|
||||
"attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
|
||||
@@ -810,6 +864,14 @@ where
|
||||
|
||||
info!("start receiving WAL since {:?}", msg.start_streaming_at);
|
||||
|
||||
// Cache LSN where term starts to immediately fsync control file with
|
||||
// commit_lsn once we reach it -- sync-safekeepers finishes when
|
||||
// persisted commit_lsn on majority of safekeepers aligns.
|
||||
self.epoch_start_lsn = match msg.term_history.0.last() {
|
||||
None => bail!("proposer elected with empty term history"),
|
||||
Some(term_lsn_start) => term_lsn_start.lsn,
|
||||
};
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -835,10 +897,7 @@ where
|
||||
// file: walproposer in sync mode is very interested when this
|
||||
// happens. Note: this is for sync-safekeepers mode only, as
|
||||
// otherwise commit_lsn might jump over epoch_start_lsn.
|
||||
// Also note that commit_lsn can reach epoch_start_lsn earlier
|
||||
// that we receive new epoch_start_lsn, and we still need to sync
|
||||
// control file in this case.
|
||||
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
|
||||
if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
|
||||
self.persist_control_file(self.state.clone()).await?;
|
||||
}
|
||||
|
||||
@@ -902,7 +961,6 @@ where
|
||||
// Now we know that we are in the same term as the proposer,
|
||||
// processing the message.
|
||||
|
||||
self.epoch_start_lsn = msg.h.epoch_start_lsn;
|
||||
self.inmem.proposer_uuid = msg.h.proposer_uuid;
|
||||
|
||||
// do the job
|
||||
@@ -1185,4 +1243,65 @@ mod tests {
|
||||
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
|
||||
assert_eq!(sk.get_epoch(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_highest_common_point_none() {
|
||||
let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
|
||||
let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
|
||||
assert_eq!(
|
||||
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_highest_common_point_middle() {
|
||||
let prop_th = TermHistory(vec![
|
||||
(1, Lsn(10)).into(),
|
||||
(2, Lsn(20)).into(),
|
||||
(4, Lsn(40)).into(),
|
||||
]);
|
||||
let sk_th = TermHistory(vec![
|
||||
(1, Lsn(10)).into(),
|
||||
(2, Lsn(20)).into(),
|
||||
(3, Lsn(30)).into(), // sk ends last common term 2 at 30
|
||||
]);
|
||||
assert_eq!(
|
||||
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
|
||||
Some(TermLsn {
|
||||
term: 2,
|
||||
lsn: Lsn(30),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_highest_common_point_sk_end() {
|
||||
let prop_th = TermHistory(vec![
|
||||
(1, Lsn(10)).into(),
|
||||
(2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
|
||||
(4, Lsn(40)).into(),
|
||||
]);
|
||||
let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
|
||||
assert_eq!(
|
||||
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
|
||||
Some(TermLsn {
|
||||
term: 2,
|
||||
lsn: Lsn(32),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_highest_common_point_walprop() {
|
||||
let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
|
||||
let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
|
||||
assert_eq!(
|
||||
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
|
||||
Some(TermLsn {
|
||||
term: 2,
|
||||
lsn: Lsn(32),
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,10 +418,11 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}",
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
|
||||
start_pos,
|
||||
end_pos,
|
||||
matches!(end_watch, EndWatch::Flush(_))
|
||||
matches!(end_watch, EndWatch::Flush(_)),
|
||||
appname
|
||||
);
|
||||
|
||||
// switch to copy
|
||||
@@ -680,7 +681,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
}
|
||||
}
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Wait until we have available WAL > start_pos or timeout expires. Returns
|
||||
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
|
||||
|
||||
@@ -11,6 +11,7 @@ use serde_with::DisplayFromStr;
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use tokio::{
|
||||
sync::{mpsc::Sender, watch},
|
||||
@@ -27,7 +28,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::recovery::recovery_main;
|
||||
use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
|
||||
@@ -45,11 +46,12 @@ use crate::{debug_dump, wal_storage};
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PeerInfo {
|
||||
pub sk_id: NodeId,
|
||||
pub term: Term,
|
||||
/// Term of the last entry.
|
||||
_last_log_term: Term,
|
||||
pub last_log_term: Term,
|
||||
/// LSN of the last record.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
_flush_lsn: Lsn,
|
||||
pub flush_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub commit_lsn: Lsn,
|
||||
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
|
||||
@@ -61,16 +63,21 @@ pub struct PeerInfo {
|
||||
#[serde(skip)]
|
||||
#[serde(default = "Instant::now")]
|
||||
ts: Instant,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
}
|
||||
|
||||
impl PeerInfo {
|
||||
fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
|
||||
PeerInfo {
|
||||
sk_id: NodeId(sk_info.safekeeper_id),
|
||||
_last_log_term: sk_info.last_log_term,
|
||||
_flush_lsn: Lsn(sk_info.flush_lsn),
|
||||
term: sk_info.term,
|
||||
last_log_term: sk_info.last_log_term,
|
||||
flush_lsn: Lsn(sk_info.flush_lsn),
|
||||
commit_lsn: Lsn(sk_info.commit_lsn),
|
||||
local_start_lsn: Lsn(sk_info.local_start_lsn),
|
||||
pg_connstr: sk_info.safekeeper_connstr.clone(),
|
||||
http_connstr: sk_info.http_connstr.clone(),
|
||||
ts,
|
||||
}
|
||||
}
|
||||
@@ -265,6 +272,20 @@ impl SharedState {
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get our latest view of alive peers status on the timeline.
|
||||
/// We pass our own info through the broker as well, so when we don't have connection
|
||||
/// to the broker returned vec is empty.
|
||||
fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
|
||||
let now = Instant::now();
|
||||
self.peers_info
|
||||
.0
|
||||
.iter()
|
||||
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
|
||||
.filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -446,7 +467,9 @@ impl Timeline {
|
||||
/// Bootstrap new or existing timeline starting background stasks.
|
||||
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
|
||||
// Start recovery task which always runs on the timeline.
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
if conf.peer_recovery_enabled {
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory. Background
|
||||
@@ -680,20 +703,88 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get our latest view of alive peers status on the timeline.
|
||||
/// We pass our own info through the broker as well, so when we don't have connection
|
||||
/// to the broker returned vec is empty.
|
||||
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let now = Instant::now();
|
||||
shared_state
|
||||
.peers_info
|
||||
.0
|
||||
.iter()
|
||||
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
|
||||
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
|
||||
.cloned()
|
||||
.collect()
|
||||
shared_state.get_peers(conf.heartbeat_timeout)
|
||||
}
|
||||
|
||||
/// Should we start fetching WAL from a peer safekeeper, and if yes, from
|
||||
/// which? Answer is yes, i.e. .donors is not empty if 1) there is something
|
||||
/// to fetch, and we can do that without running elections; 2) there is no
|
||||
/// actively streaming compute, as we don't want to compete with it.
|
||||
///
|
||||
/// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal
|
||||
/// to its last_log_term so we are sure such a leader ever had been elected.
|
||||
///
|
||||
/// All possible donors are returned so that we could keep connection to the
|
||||
/// current one if it is good even if it slightly lags behind.
|
||||
///
|
||||
/// Note that term conditions above might be not met, but safekeepers are
|
||||
/// still not aligned on last flush_lsn. Generally in this case until
|
||||
/// elections are run it is not possible to say which safekeeper should
|
||||
/// recover from which one -- history which would be committed is different
|
||||
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
|
||||
/// Thus we don't try to predict it here.
|
||||
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
|
||||
let ss = self.write_shared_state().await;
|
||||
let term = ss.sk.state.acceptor_state.term;
|
||||
let last_log_term = ss.sk.get_epoch();
|
||||
let flush_lsn = ss.sk.flush_lsn();
|
||||
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
|
||||
let mut peers = ss.get_peers(heartbeat_timeout);
|
||||
// Sort by <last log term, lsn> pairs.
|
||||
peers.sort_by(|p1, p2| {
|
||||
let tl1 = TermLsn {
|
||||
term: p1.last_log_term,
|
||||
lsn: p1.flush_lsn,
|
||||
};
|
||||
let tl2 = TermLsn {
|
||||
term: p2.last_log_term,
|
||||
lsn: p2.flush_lsn,
|
||||
};
|
||||
tl2.cmp(&tl1) // desc
|
||||
});
|
||||
let num_streaming_computes = self.walreceivers.get_num_streaming();
|
||||
let donors = if num_streaming_computes > 0 {
|
||||
vec![] // If there is a streaming compute, don't try to recover to not intervene.
|
||||
} else {
|
||||
peers
|
||||
.iter()
|
||||
.filter_map(|candidate| {
|
||||
// Are we interested in this candidate?
|
||||
let candidate_tl = TermLsn {
|
||||
term: candidate.last_log_term,
|
||||
lsn: candidate.flush_lsn,
|
||||
};
|
||||
let my_tl = TermLsn {
|
||||
term: last_log_term,
|
||||
lsn: flush_lsn,
|
||||
};
|
||||
if my_tl < candidate_tl {
|
||||
// Yes, we are interested. Can we pull from it without
|
||||
// (re)running elections? It is possible if 1) his term
|
||||
// is equal to his last_log_term so we could act on
|
||||
// behalf of leader of this term (we must be sure he was
|
||||
// ever elected) and 2) our term is not higher, or we'll refuse data.
|
||||
if candidate.term == candidate.last_log_term && candidate.term >= term {
|
||||
Some(Donor::from(candidate))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
RecoveryNeededInfo {
|
||||
term,
|
||||
last_log_term,
|
||||
flush_lsn,
|
||||
peers,
|
||||
num_streaming_computes,
|
||||
donors,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_walsenders(&self) -> &Arc<WalSenders> {
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Script to download the basebackup from a pageserver to a tar file.
|
||||
#
|
||||
# This can be useful in disaster recovery.
|
||||
#
|
||||
import argparse
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
|
||||
|
||||
def main(args: argparse.Namespace):
|
||||
pageserver_connstr = args.pageserver_connstr
|
||||
tenant_id = args.tenant
|
||||
timeline_id = args.timeline
|
||||
lsn = args.lsn
|
||||
output_path = args.output_path
|
||||
|
||||
psconn: PgConnection = psycopg2.connect(pageserver_connstr)
|
||||
psconn.autocommit = True
|
||||
|
||||
output = open(output_path, "wb")
|
||||
|
||||
with psconn.cursor() as pscur:
|
||||
pscur.copy_expert(f"basebackup {tenant_id} {timeline_id} {lsn}", output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--tenant-id",
|
||||
dest="tenant",
|
||||
required=True,
|
||||
help="Id of the tenant",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--timeline-id",
|
||||
dest="timeline",
|
||||
required=True,
|
||||
help="Id of the timeline",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--lsn",
|
||||
dest="lsn",
|
||||
required=True,
|
||||
help="LSN to take the basebackup at",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pageserver-connstr",
|
||||
dest="pageserver_connstr",
|
||||
required=True,
|
||||
help="libpq connection string of the pageserver",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
dest="output_path",
|
||||
required=True,
|
||||
help="output path to write the basebackup to",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
main(args)
|
||||
@@ -2691,6 +2691,20 @@ class Safekeeper:
|
||||
def data_dir(self) -> str:
|
||||
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
|
||||
|
||||
def timeline_dir(self, tenant_id, timeline_id) -> str:
|
||||
return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
|
||||
|
||||
def list_segments(self, tenant_id, timeline_id) -> List[str]:
|
||||
"""
|
||||
Get list of segment names of the given timeline.
|
||||
"""
|
||||
tli_dir = self.timeline_dir(tenant_id, timeline_id)
|
||||
segments = []
|
||||
for _, _, filenames in os.walk(tli_dir):
|
||||
segments.extend([f for f in filenames if f != "safekeeper.control"])
|
||||
segments.sort()
|
||||
return segments
|
||||
|
||||
|
||||
@dataclass
|
||||
class SafekeeperTimelineStatus:
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
import threading
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
|
||||
|
||||
#
|
||||
# Test branching, when a transaction is in prepared state
|
||||
#
|
||||
@pytest.mark.timeout(600)
|
||||
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_lfc_resize", "empty")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_lfc_resize",
|
||||
config_lines=[
|
||||
"neon.file_cache_path='file.cache'",
|
||||
"neon.max_file_cache_size=1GB",
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
],
|
||||
)
|
||||
n_resize = 10
|
||||
scale = 10
|
||||
log.info("postgres is running on 'test_lfc_resize' branch")
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr])
|
||||
|
||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||
thread.start()
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
for i in range(n_resize):
|
||||
cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
time.sleep(1)
|
||||
|
||||
thread.join()
|
||||
@@ -157,6 +157,8 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
|
||||
|
||||
# insert something to force sk -> ps message
|
||||
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
# Wait to make sure that we get a latest WAL receiver data.
|
||||
# We need to wait here because it's possible that we don't have access to
|
||||
# the latest WAL yet, when the `timeline_detail` API is first called.
|
||||
@@ -168,7 +170,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
# Make a DB modification then expect getting a new WAL receiver's data.
|
||||
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
endpoint.safe_psql("INSERT INTO t VALUES (1, 'hey')")
|
||||
wait_until(
|
||||
number_of_iterations=5,
|
||||
interval=1,
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, Set
|
||||
|
||||
@@ -30,7 +28,6 @@ def test_metric_collection(
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
@@ -38,9 +35,7 @@ def test_metric_collection(
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
uploads.put(events)
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
@@ -48,12 +43,15 @@ def test_metric_collection(
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
"""
|
||||
+ "tenant_config={pitr_interval = '0 sec'}"
|
||||
)
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
@@ -65,7 +63,7 @@ def test_metric_collection(
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
env = neon_env_builder.init_start()
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
tenant_id = env.initial_tenant
|
||||
@@ -126,20 +124,19 @@ def test_metric_collection(
|
||||
events = uploads.get(timeout=timeout)
|
||||
|
||||
if events == "ready":
|
||||
(events, is_last) = uploads.get(timeout=timeout)
|
||||
v.ingest(events, is_last)
|
||||
events = uploads.get(timeout=timeout)
|
||||
v.ingest(events)
|
||||
break
|
||||
else:
|
||||
(events, is_last) = events
|
||||
v.ingest(events, is_last)
|
||||
v.ingest(events)
|
||||
|
||||
if "synthetic_storage_size" not in v.accepted_event_names():
|
||||
log.info("waiting for synthetic storage size to be calculated and uploaded...")
|
||||
|
||||
rounds = 0
|
||||
while "synthetic_storage_size" not in v.accepted_event_names():
|
||||
(events, is_last) = uploads.get(timeout=timeout)
|
||||
v.ingest(events, is_last)
|
||||
events = uploads.get(timeout=timeout)
|
||||
v.ingest(events)
|
||||
rounds += 1
|
||||
assert rounds < 10, "did not get synthetic_storage_size in 10 uploads"
|
||||
# once we have it in verifiers, it will assert that future batches will contain it
|
||||
@@ -153,161 +150,17 @@ def test_metric_collection(
|
||||
events = uploads.get(timeout=timeout)
|
||||
|
||||
if events == "ready":
|
||||
(events, is_last) = uploads.get(timeout=timeout * 3)
|
||||
v.ingest(events, is_last)
|
||||
(events, is_last) = uploads.get(timeout=timeout)
|
||||
v.ingest(events, is_last)
|
||||
events = uploads.get(timeout=timeout * 3)
|
||||
v.ingest(events)
|
||||
events = uploads.get(timeout=timeout)
|
||||
v.ingest(events)
|
||||
break
|
||||
else:
|
||||
(events, is_last) = events
|
||||
v.ingest(events, is_last)
|
||||
v.ingest(events)
|
||||
|
||||
httpserver.check()
|
||||
|
||||
|
||||
def test_metric_collection_cleans_up_tempfile(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO foo
|
||||
SELECT g, 0, 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
|
||||
# so give 5s we only want to get the following upload after "ready" value.
|
||||
timeout = 5
|
||||
|
||||
# these strings in the upload queue allow synchronizing with the uploads
|
||||
# and the main test execution
|
||||
uploads.put("ready")
|
||||
|
||||
while True:
|
||||
events = uploads.get(timeout=timeout)
|
||||
|
||||
if events == "ready":
|
||||
(events, _) = uploads.get(timeout=timeout)
|
||||
break
|
||||
|
||||
# should really configure an env?
|
||||
pageserver_http.configure_failpoints(("before-persist-last-metrics-collected", "exit"))
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
env.pageserver.stop()
|
||||
|
||||
initially = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
|
||||
|
||||
assert (
|
||||
len(initially.matching) == 2
|
||||
), f"expecting actual file and tempfile, but not found: {initially.matching}"
|
||||
|
||||
uploads.put("ready")
|
||||
env.pageserver.start()
|
||||
|
||||
while True:
|
||||
events = uploads.get(timeout=timeout * 3)
|
||||
|
||||
if events == "ready":
|
||||
(events, _) = uploads.get(timeout=timeout)
|
||||
break
|
||||
|
||||
env.pageserver.stop()
|
||||
|
||||
later = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
|
||||
|
||||
# it is possible we shutdown the pageserver right at the correct time, so the old tempfile
|
||||
# is gone, but we also have a new one.
|
||||
only = set(["last_consumption_metrics.json"])
|
||||
assert (
|
||||
initially.matching.intersection(later.matching) == only
|
||||
), "only initial tempfile should had been removed"
|
||||
assert initially.other.issuperset(later.other), "no other files should had been removed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrefixPartitionedFiles:
|
||||
matching: Set[str]
|
||||
other: Set[str]
|
||||
|
||||
|
||||
def iterate_pageserver_workdir(path: Path, prefix: str) -> PrefixPartitionedFiles:
|
||||
"""
|
||||
Iterates the files in the workdir, returns two sets:
|
||||
- files with the prefix
|
||||
- files without the prefix
|
||||
"""
|
||||
|
||||
matching = set()
|
||||
other = set()
|
||||
for entry in path.iterdir():
|
||||
if not entry.is_file():
|
||||
continue
|
||||
|
||||
if not entry.name.startswith(prefix):
|
||||
other.add(entry.name)
|
||||
else:
|
||||
matching.add(entry.name)
|
||||
|
||||
return PrefixPartitionedFiles(matching, other)
|
||||
|
||||
|
||||
class MetricsVerifier:
|
||||
"""
|
||||
A graph of per tenant per timeline verifiers, allowing one for each
|
||||
@@ -318,7 +171,7 @@ class MetricsVerifier:
|
||||
self.tenants: Dict[TenantId, TenantMetricsVerifier] = {}
|
||||
pass
|
||||
|
||||
def ingest(self, events, is_last):
|
||||
def ingest(self, events):
|
||||
stringified = json.dumps(events, indent=2)
|
||||
log.info(f"ingesting: {stringified}")
|
||||
for event in events:
|
||||
@@ -328,9 +181,8 @@ class MetricsVerifier:
|
||||
|
||||
self.tenants[id].ingest(event)
|
||||
|
||||
if is_last:
|
||||
for t in self.tenants.values():
|
||||
t.post_batch()
|
||||
for t in self.tenants.values():
|
||||
t.post_batch()
|
||||
|
||||
def accepted_event_names(self) -> Set[str]:
|
||||
names: Set[str] = set()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import filecmp
|
||||
import os
|
||||
import pathlib
|
||||
import random
|
||||
@@ -980,6 +981,137 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
|
||||
endpoint.start()
|
||||
|
||||
|
||||
# Test that we can create timeline with one safekeeper down and initialize it
|
||||
# later when some data already had been written.
|
||||
def test_late_init(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
sk1 = env.safekeepers[0]
|
||||
sk1.stop()
|
||||
|
||||
# create and insert smth while safekeeper is down...
|
||||
env.neon_cli.create_branch("test_late_init")
|
||||
endpoint = env.endpoints.create_start("test_late_init")
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
|
||||
log.info("insert with safekeeper down done")
|
||||
endpoint.stop() # stop compute
|
||||
|
||||
# stop another safekeeper, and start one which missed timeline creation
|
||||
sk2 = env.safekeepers[1]
|
||||
sk2.stop()
|
||||
sk1.start()
|
||||
|
||||
# insert some more
|
||||
endpoint = env.endpoints.create_start("test_late_init")
|
||||
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
|
||||
|
||||
|
||||
# is timeline flush_lsn equal on provided safekeepers?
|
||||
def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id):
|
||||
return (
|
||||
sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
|
||||
== sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
|
||||
)
|
||||
|
||||
|
||||
# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that
|
||||
# 1) walproposer can't recover node if it misses WAL written by previous computes, but
|
||||
# still starts up and functions normally if two other sks are ok.
|
||||
# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions
|
||||
# normally if two other sks are ok.
|
||||
# 3) Lagged safekeeper can still recover by peer recovery.
|
||||
def test_one_sk_down(neon_env_builder: NeonEnvBuilder):
|
||||
pass
|
||||
|
||||
|
||||
# Smaller version of test_one_sk_down testing peer recovery in isolation: that
|
||||
# it works without compute at all.
|
||||
def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_peer_recovery")
|
||||
endpoint = env.endpoints.create_start("test_peer_recovery")
|
||||
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
|
||||
sk1 = env.safekeepers[0]
|
||||
sk1.stop()
|
||||
|
||||
# roughly fills one segment
|
||||
endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
|
||||
|
||||
endpoint.stop() # stop compute
|
||||
|
||||
# now start safekeeper, but with peer recovery disabled
|
||||
sk1.start(extra_opts=["--peer-recovery=false"])
|
||||
# it should lag for about a segment
|
||||
sk1_http_cli = sk1.http_client()
|
||||
sk2 = env.safekeepers[1]
|
||||
sk2_http_cli = sk2.http_client()
|
||||
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(
|
||||
f"flush_lsns after insertion: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
|
||||
)
|
||||
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
|
||||
|
||||
# wait a bit, lsns shouldn't change
|
||||
# time.sleep(5)
|
||||
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(
|
||||
f"flush_lsns after waiting: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
|
||||
)
|
||||
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
|
||||
|
||||
# now restart safekeeper with peer recovery enabled and wait for recovery
|
||||
sk1.stop().start()
|
||||
wait(
|
||||
partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
|
||||
"flush_lsn to get aligned",
|
||||
wait_f=lambda sk1_http_cli=sk1_http_cli, sk2_http_cli=sk2_http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
|
||||
f"waiting for flush_lsn alignment, sk1.flush_lsn={sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}, sk2.flush_lsn={sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}"
|
||||
),
|
||||
)
|
||||
|
||||
# check that WALs are identic after recovery
|
||||
segs = sk1.list_segments(tenant_id, timeline_id)
|
||||
log.info(f"segs are {segs}")
|
||||
|
||||
(_, mismatch, not_regular) = filecmp.cmpfiles(
|
||||
sk1.timeline_dir(tenant_id, timeline_id),
|
||||
sk2.timeline_dir(tenant_id, timeline_id),
|
||||
segs,
|
||||
shallow=False,
|
||||
)
|
||||
log.info(
|
||||
f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
|
||||
)
|
||||
|
||||
for f in mismatch:
|
||||
f1 = os.path.join(sk1.timeline_dir(tenant_id, timeline_id), f)
|
||||
f2 = os.path.join(sk2.timeline_dir(tenant_id, timeline_id), f)
|
||||
stdout_filename = "{}.filediff".format(f2)
|
||||
|
||||
with open(stdout_filename, "w") as stdout_f:
|
||||
subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
|
||||
subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
|
||||
|
||||
cmd = "diff {}.hex {}.hex".format(f1, f2)
|
||||
subprocess.run([cmd], stdout=stdout_f, shell=True)
|
||||
|
||||
assert (mismatch, not_regular) == ([], [])
|
||||
|
||||
# stop one of safekeepers which weren't recovering and insert a bit more to check we can commit
|
||||
env.safekeepers[2].stop()
|
||||
endpoint = env.endpoints.create_start("test_peer_recovery")
|
||||
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
|
||||
|
||||
|
||||
class SafekeeperEnv:
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -64,7 +64,6 @@ toml_edit = { version = "0.19", features = ["serde"] }
|
||||
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
tungstenite = { version = "0.20" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
uuid = { version = "1", features = ["serde", "v4"] }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user