Compare commits

...

21 Commits

Author SHA1 Message Date
Alexander Bayandin
ae50e9600f Try to find the best parallelisation 2024-09-14 12:19:11 +01:00
Alexander Bayandin
cd613f5ab3 CI(check-codestyle-rust): use parallel and cargo hack --partition 2024-09-05 13:35:12 +01:00
Alexander Bayandin
539b3ad541 Revert "CI(check-codestyle-rust): use mold -run"
This reverts commit a0e923b70b.
2024-09-05 11:31:01 +01:00
Alexander Bayandin
79fa640058 Revert "CI(check-codestyle-rust): try to use -j$(nproc)"
This reverts commit c41f9870a5.
2024-09-05 11:30:21 +01:00
Alexander Bayandin
c41f9870a5 CI(check-codestyle-rust): try to use -j$(nproc) 2024-09-05 00:18:35 +01:00
Alexander Bayandin
a0e923b70b CI(check-codestyle-rust): use mold -run 2024-09-04 23:58:25 +01:00
Alexander Bayandin
38f6107534 CI(check-codestyle-rust): revert back arm64 & use large runners 2024-09-04 22:48:44 +01:00
Alexander Bayandin
f25e07893c CI(check-codestyle-rust): use -C debug-assertions=off instead of --release 2024-09-04 15:22:55 +01:00
Heikki Linnakangas
460d48437b Run fewer redundant rust formatting checks
We have no ARM64 or x64 specific code currently, so there's not much
need to run clippy and other rust codestyle checks on both
architectures. The check-codestyle-rust job took about 40 minutes on
arm64 in the CI, which is now avoided.

Also, skip running clippy in --release mode. It's pretty expensive to
run, and there is very little difference between debug and release
builds that could lead to different clippy warnings. The debug and
release clippy checks took about 6 minutes each, so this saves another
6 minutes of runtime on CI.

This doesn't make the overall CI runtime shorter, because other jobs
still take longer than thesee did. But the 40 minutes spent on arm64
was actually pretty close to being the slowest job, and spending less
on these tests is good anyway.
2024-09-04 14:12:07 +03:00
Joonas Koivunen
7a1397cf37 storcon: boilerplate to upsert safekeeper records on deploy (#8879)
We currently do not record safekeepers in the storage controller
database. We want to migrate timelines across safekeepers eventually, so
start recording the safekeepers on deploy.

Cc: #8698
2024-09-04 10:10:05 +00:00
Vlad Lazar
75310fe441 storcon: make hb interval an argument and speed up tests (#8880)
## Problem
Each test might wait for up to 5s in order to HB the pageserver.

## Summary of changes
Make the heartbeat interval configurable and use a really tight one for
neon local => startup quicker
2024-09-04 10:09:41 +01:00
Alex Chi Z.
ecfa3d9de9 fix(storage-scrubber): wrong trial condition (#8905)
ref https://github.com/neondatabase/neon/issues/8872

## Summary of changes

We saw stuck storage scrubber in staging caused by infinite retries. I
believe here we should use `min` instead of `max` to avoid getting
minutes or hours of retry backoff.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-03 21:39:56 +00:00
Alex Chi Z.
3d9001d83f fix(pageserver): is_archived should be optional (#8902)
Set the field to optional, otherwise there will be decode errors when
newer version of the storage controller receives the JSON from older
version of the pageservers.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-03 14:05:06 -04:00
dependabot[bot]
1a874a3e86 build(deps): bump flask-cors from 4.0.1 to 5.0.0 (#8899) 2024-09-03 17:31:42 +00:00
John Spray
c4fe6641c1 pageserver: separate metadata and data pages in DatadirModification (#8621)
## Problem

Currently, DatadirModification keeps a key-indexed map of all pending
writes, even though we (almost) never need to read back dirty pages for
anything other than metadata pages (e.g. relation sizes).

Related: https://github.com/neondatabase/neon/issues/6345

## Summary of changes

- commit() modifications before ingesting database creation wal records,
so that they are guaranteed to be able to get() everything they need
directly from the underlying Timeline.
- Split dirty pages in DatadirModification into pending_metadata_pages
and pending_data_pages. The data ones don't need to be in a
key-addressable format, so they just go in a Vec instead.
- Special case handling of zero-page writes in DatadirModification,
putting them in a map which is flushed on the end of a WAL record. This
handles the case where during ingest, we might first write a zero page,
and then ingest a postgres write to that page. We used to do this via
the key-indexed map of writes, but in this PR we change the data page
write path to not bother indexing these by key.

My least favorite thing about this PR is that I needed to change the
DatadirModification interface to add the on_record_end call. This is not
very invasive because there's really only one place we use it, but it
changes the object's behaviour from being clearly an aggregation of many
records to having some per-record state. I could avoid this by
implicitly doing the work when someone calls set_lsn or commit -- I'm
open to opinions on whether that's cleaner or dirtier.

## Performance

There may be some efficiency improvement here, but the primary
motivation is to enable an earlier stage of ingest to operate without
access to a Timeline. The `pending_data_pages` part is the "fast path"
bulk write data that can in principle be generated without a Timeline,
in parallel with other ingest batches, and ultimately on the safekeeper.

`test_bulk_insert` on AX102 shows approximately the same results as in
the previous PR #8591:

```
------------------------------ Benchmark results -------------------------------
test_bulk_insert[neon-release-pg16].insert: 23.577 s
test_bulk_insert[neon-release-pg16].pageserver_writes: 5,428 MB
test_bulk_insert[neon-release-pg16].peak_mem: 637 MB
test_bulk_insert[neon-release-pg16].size: 0 MB
test_bulk_insert[neon-release-pg16].data_uploaded: 1,922 MB
test_bulk_insert[neon-release-pg16].num_files_uploaded: 8 
test_bulk_insert[neon-release-pg16].wal_written: 1,382 MB
test_bulk_insert[neon-release-pg16].wal_recovery: 18.264 s
test_bulk_insert[neon-release-pg16].compaction: 0.052 s
```
2024-09-03 18:16:49 +01:00
Arseny Sher
c7187be8a1 safekeeper: check for non-consecutive writes in safekeeper.rs
wal_storage.rs already checks this, but since this is a quite legit scenario
check it at safekeeper.rs (consensus level) as well.

ref https://github.com/neondatabase/neon/issues/8212

This is a take 2; previous PR #8640 had been reverted because interplay
with another change broke test_last_log_term_switch.
2024-09-03 18:58:19 +03:00
Arseny Sher
83dd7f559c safekeeper: more consistent task naming.
Make all them snake case.
2024-09-03 17:21:36 +03:00
Arseny Sher
80512e2779 safekeeper: add endpoint resetting uploaded partial segment state.
Endpoint implementation sends msg to manager requesting to do the
reset. Manager stops current partial backup upload task if it exists and
performs the reset.

Also slightly tweak eviction condition: all full segments before
flush_lsn must be uploaded (and committed) and there must be only one
segment left on disk (partial). This allows to evict timelines which
started not on the first segment and didn't fill the whole
segment (previous condition wasn't good because last_removed_segno was
0).

ref https://github.com/neondatabase/neon/issues/8759
2024-09-03 17:21:36 +03:00
Arseny Sher
3916810f20 safekeeper: add remote_path to Timeline
It is used in many places, let's reduce number of ? on construction
results.
2024-09-03 17:21:36 +03:00
Vlad Lazar
c43e664ff5 storcon: provide an az id in metadata.json from neon local (#8897)
## Problem
Neon local set-up does not inject an az id in `metadata.json`. See real
change in https://github.com/neondatabase/neon/pull/8852.

## Summary of changes
We piggyback on the existing `availability_zone` pageserver
configuration in order to avoid making neon local even more complex.
2024-09-03 15:11:30 +01:00
Erik Grinaker
b37da32c6f pageserver: reuse idempotency keys across metrics sinks (#8876)
## Problem

Metrics event idempotency keys differ across S3 and Vector. The events
should be identical.

Resolves #8605.

## Summary of changes

Pre-generate the idempotency keys and pass the same set into both
metrics sinks.

Co-authored-by: John Spray <john@neon.tech>
2024-09-03 09:05:24 +01:00
41 changed files with 1151 additions and 252 deletions

View File

@@ -122,10 +122,12 @@ jobs:
check-codestyle-rust:
needs: [ check-permissions, build-build-tools-image ]
# There's no reason to expect clippy or code formatting to be different on different platforms,
# so it's enough to run these on x64 only.
strategy:
matrix:
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
@@ -166,15 +168,25 @@ jobs:
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
run: |
parallel --jobs 8 "cargo hack --feature-powerset --partition {}/8 clippy --target-dir target/partition-{} $CLIPPY_COMMON_ARGS" ::: 1 2 3 4 5 6 7 8
# instead of running the full release build, running debug build again,
# but with disabled `debug-assertions` to excersice release code paths
- name: Run cargo clippy (debug, with debug-assertions=false)
run: |
for N in 4 8 10 12 14 16 18 20; do
echo "Running clippy with debug-assertions=false for partition ${N}"
time parallel --jobs ${N} "cargo hack --feature-powerset --partition {}/${N} clippy --target-dir target/partition-{} $CLIPPY_COMMON_ARGS -C debug-assertions=off" ::: $(seq -s " " 1 ${N})
rm -rf target/partition-*
done
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items
env:
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting

View File

@@ -165,6 +165,9 @@ pub struct NeonStorageControllerConf {
pub split_threshold: Option<u64>,
pub max_secondary_lag_bytes: Option<u64>,
#[serde(with = "humantime_serde")]
pub heartbeat_interval: Duration,
}
impl NeonStorageControllerConf {
@@ -172,6 +175,9 @@ impl NeonStorageControllerConf {
const DEFAULT_MAX_OFFLINE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
// Very tight heartbeat interval to speed up tests
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
}
impl Default for NeonStorageControllerConf {
@@ -183,6 +189,7 @@ impl Default for NeonStorageControllerConf {
database_url: None,
split_threshold: None,
max_secondary_lag_bytes: None,
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
}
}
}

View File

@@ -181,6 +181,23 @@ impl PageServerNode {
);
io::stdout().flush()?;
// If the config file we got as a CLI argument includes the `availability_zone`
// config, then use that to populate the `metadata.json` file for the pageserver.
// In production the deployment orchestrator does this for us.
let az_id = conf
.other
.get("availability_zone")
.map(|toml| {
let az_str = toml.to_string();
// Trim the (") chars from the toml representation
if az_str.starts_with('"') && az_str.ends_with('"') {
az_str[1..az_str.len() - 1].to_string()
} else {
az_str
}
})
.unwrap_or("local".to_string());
let config = self
.pageserver_init_make_toml(conf)
.context("make pageserver toml")?;
@@ -216,6 +233,7 @@ impl PageServerNode {
let (_http_host, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(9898);
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
@@ -226,7 +244,10 @@ impl PageServerNode {
postgres_port: self.pg_connection_config.port(),
http_host: "localhost".to_string(),
http_port,
other: HashMap::new(),
other: HashMap::from([(
"availability_zone_id".to_string(),
serde_json::json!(az_id),
)]),
})
.unwrap(),
)

View File

@@ -437,6 +437,8 @@ impl StorageController {
&humantime::Duration::from(self.config.max_offline).to_string(),
"--max-warming-up-interval",
&humantime::Duration::from(self.config.max_warming_up).to_string(),
"--heartbeat-interval",
&humantime::Duration::from(self.config.heartbeat_interval).to_string(),
"--address-for-peers",
&address_for_peers.to_string(),
]

View File

@@ -716,12 +716,17 @@ pub struct TimelineInfo {
pub pg_version: u32,
pub state: TimelineState,
pub is_archived: bool,
pub walreceiver_status: String,
// ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
// Backward compatibility: you will get a JSON not containing the newly-added field.
// Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
// not deny unknown fields by default so it's safe to set the field to some value, though it won't be
// read.
/// The last aux file policy being used on this timeline
pub last_aux_file_policy: Option<AuxFilePolicy>,
pub is_archived: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -1,6 +1,8 @@
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
use crate::config::PageServerConf;
use crate::consumption_metrics::metrics::MetricsKey;
use crate::consumption_metrics::upload::KeyGen as _;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::size::CalculateSyntheticSizeError;
@@ -8,6 +10,7 @@ use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use itertools::Itertools as _;
use pageserver_api::models::TenantState;
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
use reqwest::Url;
@@ -19,9 +22,8 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::NodeId;
mod metrics;
use crate::consumption_metrics::metrics::MetricsKey;
mod disk_cache;
mod metrics;
mod upload;
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
@@ -143,6 +145,12 @@ async fn collect_metrics(
// these are point in time, with variable "now"
let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
// Pre-generate event idempotency keys, to reuse them across the bucket
// and HTTP sinks.
let idempotency_keys = std::iter::repeat_with(|| node_id.as_str().generate())
.take(metrics.len())
.collect_vec();
let metrics = Arc::new(metrics);
// why not race cancellation here? because we are one of the last tasks, and if we are
@@ -161,8 +169,14 @@ async fn collect_metrics(
}
if let Some(bucket_client) = &bucket_client {
let res =
upload::upload_metrics_bucket(bucket_client, &cancel, &node_id, &metrics).await;
let res = upload::upload_metrics_bucket(
bucket_client,
&cancel,
&node_id,
&metrics,
&idempotency_keys,
)
.await;
if let Err(e) = res {
tracing::error!("failed to upload to S3: {e:#}");
}
@@ -174,9 +188,9 @@ async fn collect_metrics(
&client,
metric_collection_endpoint,
&cancel,
&node_id,
&metrics,
&mut cached_metrics,
&idempotency_keys,
)
.await;
if let Err(e) = res {

View File

@@ -24,16 +24,16 @@ pub(super) async fn upload_metrics_http(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
cancel: &CancellationToken,
node_id: &str,
metrics: &[RawMetric],
cached_metrics: &mut Cache,
idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
let mut uploaded = 0;
let mut failed = 0;
let started_at = std::time::Instant::now();
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);
while let Some(res) = iter.next() {
let (chunk, body) = res?;
@@ -87,6 +87,7 @@ pub(super) async fn upload_metrics_bucket(
cancel: &CancellationToken,
node_id: &str,
metrics: &[RawMetric],
idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
if metrics.is_empty() {
// Skip uploads if we have no metrics, so that readers don't have to handle the edge case
@@ -106,7 +107,7 @@ pub(super) async fn upload_metrics_bucket(
// Serialize and write into compressed buffer
let started_at = std::time::Instant::now();
for res in serialize_in_chunks(CHUNK_SIZE, metrics, node_id) {
for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
let (_chunk, body) = res?;
gzip_writer.write_all(&body).await?;
}
@@ -134,29 +135,31 @@ pub(super) async fn upload_metrics_bucket(
Ok(())
}
// The return type is quite ugly, but we gain testability in isolation
fn serialize_in_chunks<'a, F>(
/// Serializes the input metrics as JSON in chunks of chunk_size. The provided
/// idempotency keys are injected into the corresponding metric events (reused
/// across different metrics sinks), and must have the same length as input.
fn serialize_in_chunks<'a>(
chunk_size: usize,
input: &'a [RawMetric],
factory: F,
idempotency_keys: &'a [IdempotencyKey<'a>],
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
where
F: KeyGen<'a> + 'a,
{
use bytes::BufMut;
struct Iter<'a, F> {
assert_eq!(input.len(), idempotency_keys.len());
struct Iter<'a> {
inner: std::slice::Chunks<'a, RawMetric>,
idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
chunk_size: usize,
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
buffer: bytes::BytesMut,
// chunk amount of events are reused to produce the serialized document
scratch: Vec<Event<Ids, Name>>,
factory: F,
}
impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
impl<'a> Iterator for Iter<'a> {
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
fn next(&mut self) -> Option<Self::Item> {
@@ -167,17 +170,14 @@ where
self.scratch.extend(
chunk
.iter()
.map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
.zip(&mut self.idempotency_keys)
.map(|(raw_metric, key)| raw_metric.as_event(key)),
);
} else {
// next rounds: update_in_place to reuse allocations
assert_eq!(self.scratch.len(), self.chunk_size);
self.scratch
.iter_mut()
.zip(chunk.iter())
.for_each(|(slot, raw_metric)| {
raw_metric.update_in_place(slot, &self.factory.generate())
});
itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
.for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
}
let res = serde_json::to_writer(
@@ -198,18 +198,19 @@ where
}
}
impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
impl<'a> ExactSizeIterator for Iter<'a> {}
let buffer = bytes::BytesMut::new();
let inner = input.chunks(chunk_size);
let idempotency_keys = idempotency_keys.iter();
let scratch = Vec::new();
Iter {
inner,
idempotency_keys,
chunk_size,
buffer,
scratch,
factory,
}
}
@@ -268,7 +269,7 @@ impl RawMetricExt for RawMetric {
}
}
trait KeyGen<'a>: Copy {
pub(crate) trait KeyGen<'a> {
fn generate(&self) -> IdempotencyKey<'a>;
}
@@ -389,7 +390,10 @@ mod tests {
let examples = metric_samples();
assert!(examples.len() > 1);
let factory = FixedGen::new(Utc::now(), "1", 42);
let now = Utc::now();
let idempotency_keys = (0..examples.len())
.map(|i| FixedGen::new(now, "1", i as u16).generate())
.collect::<Vec<_>>();
// need to use Event here because serde_json::Value uses default hashmap, not linked
// hashmap
@@ -398,13 +402,13 @@ mod tests {
events: Vec<Event<Ids, Name>>,
}
let correct = serialize_in_chunks(examples.len(), &examples, factory)
let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
.map(|res| res.unwrap().1)
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
.collect::<Vec<_>>();
for chunk_size in 1..examples.len() {
let actual = serialize_in_chunks(chunk_size, &examples, factory)
let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
.map(|res| res.unwrap().1)
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
.collect::<Vec<_>>();

View File

@@ -468,7 +468,7 @@ async fn build_timeline_info_common(
pg_version: timeline.pg_version,
state,
is_archived,
is_archived: Some(is_archived),
walreceiver_status,

View File

@@ -19,6 +19,7 @@ use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::decode_wal_record;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
@@ -310,11 +311,13 @@ async fn import_wal(
let mut nrecords = 0;
let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.ingest_record(decoded, lsn, &mut modification, ctx)
.await?;
WAL_INGEST.records_committed.inc();
@@ -449,11 +452,12 @@ pub async fn import_wal_from_tar(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.ingest_record(decoded, lsn, &mut modification, ctx)
.await?;
modification.commit(ctx).await?;
last_lsn = lsn;

View File

@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{bail, ensure, Context};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
@@ -168,7 +168,9 @@ impl Timeline {
DatadirModification {
tline: self,
pending_lsns: Vec::new(),
pending_updates: HashMap::new(),
pending_metadata_pages: HashMap::new(),
pending_data_pages: Vec::new(),
pending_zero_data_pages: Default::default(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
@@ -1031,10 +1033,24 @@ pub struct DatadirModification<'a> {
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_lsns: Vec<Lsn>,
pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
/// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
/// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
/// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
/// which keys are stored here.
pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,
// Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However,
// if we encounter a write from postgres in the same wal record, we will drop this entry.
//
// Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
// at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
pending_zero_data_pages: HashSet<CompactKey>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
@@ -1058,6 +1074,10 @@ impl<'a> DatadirModification<'a> {
self.pending_bytes
}
pub(crate) fn has_dirty_data_pages(&self) -> bool {
(!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
@@ -1066,6 +1086,10 @@ impl<'a> DatadirModification<'a> {
lsn,
self.lsn
);
// If we are advancing LSN, then state from previous wal record should have been flushed.
assert!(self.pending_zero_data_pages.is_empty());
if lsn > self.lsn {
self.pending_lsns.push(self.lsn);
self.lsn = lsn;
@@ -1073,6 +1097,17 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
/// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
/// keys that represent literal blocks that postgres can read. So data includes relation blocks and
/// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
///
/// The distinction is important because data keys are handled on a fast path where dirty writes are
/// not readable until this modification is committed, whereas metadata keys are visible for read
/// via [`Self::get`] as soon as their record has been ingested.
fn is_data_key(key: &Key) -> bool {
key.is_rel_block_key() || key.is_slru_block_key()
}
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -1180,6 +1215,31 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
self.pending_zero_data_pages
.insert(rel_block_to_key(rel, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
}
pub(crate) fn put_slru_page_image_zero(
&mut self,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) {
self.pending_zero_data_pages
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
}
/// Call this at the end of each WAL record.
pub(crate) fn on_record_end(&mut self) {
let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
for key in pending_zero_data_pages {
self.put_data(key, Value::Image(ZERO_PAGE.clone()));
}
}
/// Store a relmapper file (pg_filenode.map) in the repository
pub async fn put_relmap_file(
&mut self,
@@ -1778,7 +1838,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1789,31 +1849,11 @@ impl<'a> DatadirModification<'a> {
let mut writer = self.tline.writer().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
for (key, values) in self.pending_updates.drain() {
if !key.is_valid_key_on_write_path() {
bail!(
"the request contains data not supported by pageserver at TimelineWriter::put: {}", key
);
}
let mut write_batch = Vec::new();
for (lsn, value_ser_size, value) in values {
if key.is_rel_block_key() || key.is_slru_block_key() {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
write_batch.push((key.to_compact(), lsn, value_ser_size, value));
} else {
retained_pending_updates.entry(key).or_default().push((
lsn,
value_ser_size,
value,
));
}
}
writer.put_batch(write_batch, ctx).await?;
}
let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
self.pending_updates = retained_pending_updates;
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put_batch(pending_data_pages, ctx).await?;
self.pending_bytes = 0;
if pending_nblocks != 0 {
@@ -1834,29 +1874,31 @@ impl<'a> DatadirModification<'a> {
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
// Commit should never be called mid-wal-record
assert!(self.pending_zero_data_pages.is_empty());
let mut writer = self.tline.writer().await;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
if !self.pending_updates.is_empty() {
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
.pending_updates
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let mut write_batch = std::mem::take(&mut self.pending_data_pages);
write_batch.extend(
self.pending_metadata_pages
.drain()
.flat_map(|(key, values)| {
values.into_iter().map(move |(lsn, val_ser_size, value)| {
if !key.is_valid_key_on_write_path() {
bail!("the request contains data not supported by pageserver at TimelineWriter::put: {}", key);
}
Ok((key.to_compact(), lsn, val_ser_size, value))
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
values
.into_iter()
.map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
}),
);
writer.put_batch(batch, ctx).await?;
if !write_batch.is_empty() {
writer.put_batch(write_batch, ctx).await?;
}
if !self.pending_deletions.is_empty() {
@@ -1887,33 +1929,58 @@ impl<'a> DatadirModification<'a> {
}
pub(crate) fn len(&self) -> usize {
self.pending_updates.len() + self.pending_deletions.len()
self.pending_metadata_pages.len()
+ self.pending_data_pages.len()
+ self.pending_deletions.len()
}
// Internal helper functions to batch the modifications
/// Read a page from the Timeline we are writing to. For metadata pages, this passes through
/// a cache in Self, which makes writes earlier in this modification visible to WAL records later
/// in the modification.
///
/// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
/// page must ensure that the pages they read are already committed in Timeline, for example
/// DB create operations are always preceded by a call to commit(). This is special cased because
/// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
/// and not data pages.
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, _, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::Other(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
if !Self::is_data_key(&key) {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
if let Some((_, _, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::Other(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
}
}
} else {
// This is an expensive check, so we only do it in debug mode. If reading a data key,
// this key should never be present in pending_data_pages. We ensure this by committing
// modifications before ingesting DB create operations, which are the only kind that reads
// data pages during ingest.
if cfg!(debug_assertions) {
for (dirty_key, _, _, _) in &self.pending_data_pages {
debug_assert!(&key.to_compact() != dirty_key);
}
debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
}
}
// Metadata page cache miss, or we're reading a data page.
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn, ctx).await
}
@@ -1925,11 +1992,40 @@ impl<'a> DatadirModification<'a> {
}
fn put(&mut self, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
} else {
self.put_metadata(key.to_compact(), val)
}
}
fn put_data(&mut self, key: CompactKey, val: Value) {
let val_serialized_size = val.serialized_size().unwrap() as usize;
// If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This
// is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
// and the subsequent postgres-originating write
if self.pending_zero_data_pages.remove(&key) {
self.pending_bytes -= ZERO_PAGE.len();
}
self.pending_bytes += val_serialized_size;
self.pending_data_pages
.push((key, self.lsn, val_serialized_size, val))
}
fn put_metadata(&mut self, key: CompactKey, val: Value) {
let values = self.pending_metadata_pages.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
if *last_lsn == self.lsn {
// Update the pending_bytes contribution from this entry, and update the serialized size in place
self.pending_bytes -= *last_value_ser_size;
*last_value_ser_size = val.serialized_size().unwrap() as usize;
self.pending_bytes += *last_value_ser_size;
// Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
// have been generated by synthesized zero page writes prior to the first real write to a page.
*last_value = val;
return;
}

View File

@@ -692,8 +692,13 @@ impl InMemoryLayer {
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
// This should not break anything, but is unexpected: ingestion code aims to filter out
// multiple writes to the same key at the same LSN. This happens in cases where our
// ingenstion code generates some write like an empty page, and we see a write from postgres
// to the same key in the same wal record. If one such write makes it through, we
// index the most recent write, implicitly ignoring the earlier write. We log a warning
// because this case is unexpected, and we would like tests to fail if this happens.
warn!("Key {} at {} written twice at same LSN", key, lsn);
}
}

View File

@@ -31,7 +31,7 @@ use crate::{
task_mgr::{TaskKind, WALRECEIVER_RUNTIME},
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
walingest::WalIngest,
walrecord::DecodedWALRecord,
walrecord::{decode_wal_record, DecodedWALRecord},
};
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
@@ -312,10 +312,25 @@ pub(super) async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(startlsn);
let mut uncommitted_records = 0;
let mut filtered_records = 0;
async fn commit(
modification: &mut DatadirModification<'_>,
uncommitted: &mut u64,
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
modification.commit(ctx).await?;
*uncommitted = 0;
*filtered = 0;
Ok(())
}
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
@@ -324,9 +339,28 @@ pub(super) async fn handle_walreceiver_connection(
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}
// Deserialize WAL record
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version)?;
if decoded.is_dbase_create_copy(timeline.pg_version)
&& uncommitted_records > 0
{
// Special case: legacy PG database creations operate by reading pages from a 'template' database:
// these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
// all earlier writes of data blocks are visible by committing any modification in flight.
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
// Ingest the records without immediately committing them.
let ingested = walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.ingest_record(decoded, lsn, &mut modification, &ctx)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
if !ingested {
@@ -349,21 +383,25 @@ pub(super) async fn handle_walreceiver_connection(
|| modification.approx_pending_bytes()
> DatadirModification::MAX_PENDING_BYTES
{
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
uncommitted_records = 0;
filtered_records = 0;
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
}
// Commit the remaining records.
if uncommitted_records > 0 {
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
}

View File

@@ -57,6 +57,7 @@ use utils::lsn::Lsn;
pub struct WalIngest {
shard: ShardIdentity,
pg_version: u32,
checkpoint: CheckPoint,
checkpoint_modified: bool,
warn_ingest_lag: WarnIngestLag,
@@ -82,6 +83,7 @@ impl WalIngest {
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
pg_version: timeline.pg_version,
checkpoint,
checkpoint_modified: false,
warn_ingest_lag: WarnIngestLag {
@@ -104,10 +106,9 @@ impl WalIngest {
///
pub async fn ingest_record(
&mut self,
recdata: Bytes,
decoded: DecodedWALRecord,
lsn: Lsn,
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<bool> {
WAL_INGEST.records_received.inc();
@@ -115,7 +116,12 @@ impl WalIngest {
let prev_len = modification.len();
modification.set_lsn(lsn)?;
decode_wal_record(recdata, decoded, pg_version)?;
if decoded.is_dbase_create_copy(self.pg_version) {
// Records of this type should always be preceded by a commit(), as they
// rely on reading data pages back from the Timeline.
assert!(!modification.has_dirty_data_pages());
}
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -133,11 +139,11 @@ impl WalIngest {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages
// without registering them with the standard mechanism.
self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
.await?;
}
pg_constants::RM_NEON_ID => {
self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
.await?;
}
// Handle other special record types
@@ -325,7 +331,7 @@ impl WalIngest {
}
pg_constants::RM_RELMAP_ID => {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
.await?;
}
pg_constants::RM_XLOG_ID => {
@@ -470,7 +476,7 @@ impl WalIngest {
continue;
}
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
.await?;
}
@@ -486,6 +492,8 @@ impl WalIngest {
// until commit() is called to flush the data into the repository and update
// the latest LSN.
modification.on_record_end();
Ok(modification.len() > prev_len)
}
@@ -557,6 +565,7 @@ impl WalIngest {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
.await?;
} else {
@@ -1195,7 +1204,7 @@ impl WalIngest {
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
fsm_physical_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1217,7 +1226,7 @@ impl WalIngest {
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
// Tail of last remaining vm page has to be zeroed.
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
modification.put_rel_page_image_zero(rel, vm_page_no);
vm_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1687,7 +1696,7 @@ impl WalIngest {
continue;
}
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
modification.put_rel_page_image_zero(rel, gap_blknum);
}
}
Ok(())
@@ -1753,7 +1762,7 @@ impl WalIngest {
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
modification.put_slru_page_image_zero(kind, segno, gap_blknum);
}
}
Ok(())
@@ -1827,21 +1836,25 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1983,6 +1996,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
assert_eq!(
tline
@@ -2008,6 +2022,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
assert_eq!(
tline
@@ -2409,7 +2424,6 @@ mod tests {
.await
.unwrap();
let mut modification = tline.begin_modification(startpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
// Decode and ingest wal. We process the wal in chunks because
@@ -2417,8 +2431,10 @@ mod tests {
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.ingest_record(decoded, lsn, &mut modification, &ctx)
.instrument(span.clone())
.await
.unwrap();

View File

@@ -160,6 +160,30 @@ pub struct DecodedWALRecord {
pub origin_id: u16,
}
impl DecodedWALRecord {
/// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
/// by reading other existing relations' data blocks. This is more complex to apply than new-style database
/// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
if self.xl_rmid == pg_constants::RM_DBASE_ID {
let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
match pg_version {
14 => {
// Postgres 14 database creations are always the legacy kind
info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
}
15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
_ => {
panic!("Unsupported postgres version {pg_version}")
}
}
} else {
false
}
}
}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {

6
poetry.lock generated
View File

@@ -1110,13 +1110,13 @@ dotenv = ["python-dotenv"]
[[package]]
name = "flask-cors"
version = "4.0.1"
version = "5.0.0"
description = "A Flask extension adding a decorator for CORS support"
optional = false
python-versions = "*"
files = [
{file = "Flask_Cors-4.0.1-py2.py3-none-any.whl", hash = "sha256:f2a704e4458665580c074b714c4627dd5a306b333deb9074d0b1794dfa2fb677"},
{file = "flask_cors-4.0.1.tar.gz", hash = "sha256:eeb69b342142fdbf4766ad99357a7f3876a2ceb77689dc10ff912aac06c389e4"},
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
]
[package.dependencies]

View File

@@ -86,7 +86,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
}
/// Subscribe and fetch all the interesting data from the broker.
#[instrument(name = "broker pull", skip_all)]
#[instrument(name = "broker_pull", skip_all)]
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;

View File

@@ -389,6 +389,25 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, response)
}
/// Unevict timeline and remove uploaded partial segment(s) from the remote storage.
/// Successfull response returns list of segments existed before the deletion.
/// Aimed for one-off usage not normally needed.
async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.backup_partial_reset()
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -607,6 +626,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
request_span(r, timeline_digest_handler)
})
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
|r| request_span(r, timeline_backup_partial_reset),
)
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})

View File

@@ -183,10 +183,10 @@ impl WalResidentTimeline {
"Replacing uploaded partial segment in in-mem control file: {replace:?}"
);
let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?;
let remote_timeline_path = &self.tli.remote_path;
wal_backup::copy_partial_segment(
&replace.previous.remote_path(&remote_timeline_path),
&replace.current.remote_path(&remote_timeline_path),
&replace.previous.remote_path(remote_timeline_path),
&replace.current.remote_path(remote_timeline_path),
)
.await?;
}

View File

@@ -35,7 +35,7 @@ use crate::{
/// Entrypoint for per timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
#[instrument(name = "recovery", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: WalResidentTimeline, conf: SafeKeeperConf) {
info!("started");

View File

@@ -875,6 +875,29 @@ where
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
// Disallow any non-sequential writes, which can result in gaps or
// overwrites. If we need to move the pointer, ProposerElected message
// should have truncated WAL first accordingly. Note that the first
// condition (WAL rewrite) is quite expected in real world; it happens
// when walproposer reconnects to safekeeper and writes some more data
// while first connection still gets some packets later. It might be
// better to not log this as error! above.
let write_lsn = self.wal_store.write_lsn();
if write_lsn > msg.h.begin_lsn {
bail!(
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
write_lsn,
msg.h.begin_lsn
);
}
if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
bail!(
"append request creates gap in written WAL, write_lsn={}, msg lsn={}",
write_lsn,
msg.h.begin_lsn,
);
}
// Now we know that we are in the same term as the proposer,
// processing the message.
@@ -960,10 +983,7 @@ mod tests {
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
use super::*;
use crate::{
state::{EvictionState, PersistedPeers, TimelinePersistentState},
wal_storage::Storage,
};
use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};
use std::{ops::Deref, str::FromStr, time::Instant};
// fake storage for tests
@@ -1003,6 +1023,10 @@ mod tests {
}
impl wal_storage::Storage for DummyWalStore {
fn write_lsn(&self) -> Lsn {
self.lsn
}
fn flush_lsn(&self) -> Lsn {
self.lsn
}
@@ -1076,7 +1100,7 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {
term: 1,
term: 2,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
@@ -1090,24 +1114,29 @@ mod tests {
};
let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(3),
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(3),
}]),
timeline_start_lsn: Lsn(0),
term: 2,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![
TermLsn {
term: 1,
lsn: Lsn(1),
},
TermLsn {
term: 2,
lsn: Lsn(3),
},
]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();
// check that AppendRequest before term_start_lsn doesn't switch last_log_term.
let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
assert!(resp.is_ok());
assert_eq!(sk.get_last_log_term(), 0);
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
assert_eq!(sk.get_last_log_term(), 1);
// but record at term_start_lsn does the switch
ar_hdr.begin_lsn = Lsn(2);
@@ -1116,12 +1145,63 @@ mod tests {
h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
assert!(resp.is_ok());
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
assert_eq!(sk.get_last_log_term(), 1);
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
assert_eq!(sk.get_last_log_term(), 2);
}
#[tokio::test]
async fn test_non_consecutive_write() {
let storage = InMemoryState {
persisted_state: test_sk_state(),
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(1),
}]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();
let ar_hdr = AppendRequestHeader {
term: 1,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
h: ar_hdr.clone(),
wal_data: Bytes::from_static(b"b"),
};
// do write ending at 2, it should be ok
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
let mut ar_hrd2 = ar_hdr.clone();
ar_hrd2.begin_lsn = Lsn(4);
ar_hrd2.end_lsn = Lsn(5);
let append_request = AppendRequest {
h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
// and now starting at 4, it must fail
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap_err();
}
#[test]

View File

@@ -3,6 +3,7 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
@@ -36,7 +37,7 @@ use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, Tim
use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::wal_backup::{self, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
@@ -469,6 +470,7 @@ impl From<TimelineError> for ApiError {
/// It also holds SharedState and provides mutually exclusive access to it.
pub struct Timeline {
pub ttid: TenantTimelineId,
pub remote_path: RemotePath,
/// Used to broadcast commit_lsn updates to all background jobs.
commit_lsn_watch_tx: watch::Sender<Lsn>,
@@ -519,8 +521,10 @@ impl Timeline {
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let walreceivers = WalReceivers::new();
let remote_path = remote_timeline_path(&ttid)?;
Ok(Timeline {
ttid,
remote_path,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
@@ -557,8 +561,10 @@ impl Timeline {
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
let walreceivers = WalReceivers::new();
let remote_path = remote_timeline_path(&ttid)?;
Ok(Timeline {
ttid,
remote_path,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
@@ -902,6 +908,10 @@ impl Timeline {
Ok(WalResidentTimeline::new(self.clone(), guard))
}
pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
self.manager_ctl.backup_partial_reset().await
}
}
/// This is a guard that allows to read/write disk timeline state.

View File

@@ -28,28 +28,38 @@ impl Manager {
/// - control file is flushed (no next event scheduled)
/// - no WAL residence guards
/// - no pushes to the broker
/// - partial WAL backup is uploaded
/// - last partial WAL segment is uploaded
/// - all local segments before the uploaded partial are committed and uploaded
pub(crate) fn ready_for_eviction(
&self,
next_event: &Option<tokio::time::Instant>,
state: &StateSnapshot,
) -> bool {
self.backup_task.is_none()
let ready = self.backup_task.is_none()
&& self.recovery_task.is_none()
&& self.wal_removal_task.is_none()
&& self.partial_backup_task.is_none()
&& self.partial_backup_uploaded.is_some()
&& next_event.is_none()
&& self.access_service.is_empty()
&& !self.tli_broker_active.get()
// Partial segment of current flush_lsn is uploaded up to this flush_lsn.
&& !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
// And it is the next one after the last removed. Given that local
// WAL is removed only after it is uploaded to s3 (and pageserver
// advancing remote_consistent_lsn) which happens only after WAL is
// committed, true means all this is done.
//
// This also works for the first segment despite last_removed_segno
// being 0 on init because this 0 triggers run of wal_removal_task
// on success of which manager updates the horizon.
&& self
.partial_backup_uploaded
.as_ref()
.unwrap()
.flush_lsn
.segment_number(self.wal_seg_size)
== self.last_removed_segno + 1
== self.last_removed_segno + 1;
ready
}
/// Evict the timeline to remote storage.
@@ -83,7 +93,8 @@ impl Manager {
info!("successfully evicted timeline");
}
/// Restore evicted timeline from remote storage.
/// Attempt to restore evicted timeline from remote storage; it must be
/// offloaded.
#[instrument(name = "unevict_timeline", skip_all)]
pub(crate) async fn unevict_timeline(&mut self) {
assert!(self.is_offloaded);
@@ -167,7 +178,7 @@ async fn redownload_partial_segment(
partial: &PartialRemoteSegment,
) -> anyhow::Result<()> {
let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
let remote_segfile = remote_segment_path(mgr, partial)?;
let remote_segfile = remote_segment_path(mgr, partial);
debug!(
"redownloading partial segment: {} -> {}",
@@ -252,7 +263,7 @@ async fn do_validation(
);
}
let remote_segfile = remote_segment_path(mgr, partial)?;
let remote_segfile = remote_segment_path(mgr, partial);
let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
wal_backup::read_object(&remote_segfile, 0).await?;
@@ -279,12 +290,8 @@ fn local_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> Utf8Path
local_partial_segfile
}
fn remote_segment_path(
mgr: &Manager,
partial: &PartialRemoteSegment,
) -> anyhow::Result<RemotePath> {
let remote_timeline_path = wal_backup::remote_timeline_path(&mgr.tli.ttid)?;
Ok(partial.remote_path(&remote_timeline_path))
fn remote_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> RemotePath {
partial.remote_path(&mgr.tli.remote_path)
}
/// Compare first `n` bytes of two readers. If the bytes differ, return an error.

View File

@@ -11,12 +11,14 @@ use std::{
time::Duration,
};
use futures::channel::oneshot;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use tokio::{
task::{JoinError, JoinHandle},
time::Instant,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, instrument, warn, Instrument};
use utils::lsn::Lsn;
@@ -33,7 +35,7 @@ use crate::{
timeline_guard::{AccessService, GuardId, ResidenceGuard},
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},
wal_backup_partial::{self, PartialRemoteSegment},
wal_backup_partial::{self, PartialBackup, PartialRemoteSegment},
SafeKeeperConf,
};
@@ -96,6 +98,8 @@ pub enum ManagerCtlMessage {
GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
/// Request to drop the guard.
GuardDrop(GuardId),
/// Request to reset uploaded partial backup state.
BackupPartialReset(oneshot::Sender<anyhow::Result<Vec<String>>>),
}
impl std::fmt::Debug for ManagerCtlMessage {
@@ -103,6 +107,7 @@ impl std::fmt::Debug for ManagerCtlMessage {
match self {
ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
}
}
}
@@ -143,6 +148,19 @@ impl ManagerCtl {
.and_then(std::convert::identity)
}
/// Request timeline manager to reset uploaded partial segment state and
/// wait for the result.
pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {
let (tx, rx) = oneshot::channel();
self.manager_tx
.send(ManagerCtlMessage::BackupPartialReset(tx))
.expect("manager task is not running");
match rx.await {
Ok(res) => res,
Err(_) => anyhow::bail!("timeline manager is gone"),
}
}
/// Must be called exactly once to bootstrap the manager.
pub fn bootstrap_manager(
&self,
@@ -181,7 +199,8 @@ pub(crate) struct Manager {
pub(crate) wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>>,
// partial backup
pub(crate) partial_backup_task: Option<JoinHandle<Option<PartialRemoteSegment>>>,
pub(crate) partial_backup_task:
Option<(JoinHandle<Option<PartialRemoteSegment>>, CancellationToken)>,
pub(crate) partial_backup_uploaded: Option<PartialRemoteSegment>,
// misc
@@ -302,12 +321,12 @@ pub async fn main_task(
_ = sleep_until(&next_event) => {
// we were waiting for some event (e.g. cfile save)
}
res = await_task_finish(&mut mgr.wal_removal_task) => {
res = await_task_finish(mgr.wal_removal_task.as_mut()) => {
// WAL removal task finished
mgr.wal_removal_task = None;
mgr.update_wal_removal_end(res);
}
res = await_task_finish(&mut mgr.partial_backup_task) => {
res = await_task_finish(mgr.partial_backup_task.as_mut().map(|(handle, _)| handle)) => {
// partial backup task finished
mgr.partial_backup_task = None;
mgr.update_partial_backup_end(res);
@@ -335,8 +354,9 @@ pub async fn main_task(
}
}
if let Some(partial_backup_task) = &mut mgr.partial_backup_task {
if let Err(e) = partial_backup_task.await {
if let Some((handle, cancel)) = &mut mgr.partial_backup_task {
cancel.cancel();
if let Err(e) = handle.await {
warn!("partial backup task failed: {:?}", e);
}
}
@@ -560,11 +580,14 @@ impl Manager {
}
// Get WalResidentTimeline and start partial backup task.
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
let cancel = CancellationToken::new();
let handle = tokio::spawn(wal_backup_partial::main_task(
self.wal_resident_timeline(),
self.conf.clone(),
self.global_rate_limiter.clone(),
)));
cancel.clone(),
));
self.partial_backup_task = Some((handle, cancel));
}
/// Update the state after partial WAL backup task finished.
@@ -579,6 +602,39 @@ impl Manager {
}
}
/// Reset partial backup state and remove its remote storage data. Since it
/// might concurrently uploading something, cancel the task first.
async fn backup_partial_reset(&mut self) -> anyhow::Result<Vec<String>> {
info!("resetting partial backup state");
// Force unevict timeline if it is evicted before erasing partial backup
// state. The intended use of this function is to drop corrupted remote
// state; we haven't enabled local files deletion yet anywhere,
// so direct switch is safe.
if self.is_offloaded {
self.tli.switch_to_present().await?;
// switch manager state as soon as possible
self.is_offloaded = false;
}
if let Some((handle, cancel)) = &mut self.partial_backup_task {
cancel.cancel();
info!("cancelled partial backup task, awaiting it");
// we're going to reset .partial_backup_uploaded to None anyway, so ignore the result
handle.await.ok();
self.partial_backup_task = None;
}
let tli = self.wal_resident_timeline();
let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await;
// Reset might fail e.g. when cfile is already reset but s3 removal
// failed, so set manager state to None beforehand. In any case caller
// is expected to retry until success.
self.partial_backup_uploaded = None;
let res = partial_backup.reset().await?;
info!("reset is done");
Ok(res)
}
/// Handle message arrived from ManagerCtl.
async fn handle_message(&mut self, msg: Option<ManagerCtlMessage>) {
debug!("received manager message: {:?}", msg);
@@ -602,6 +658,16 @@ impl Manager {
Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
self.access_service.drop_guard(guard_id);
}
Some(ManagerCtlMessage::BackupPartialReset(tx)) => {
info!("resetting uploaded partial backup state");
let res = self.backup_partial_reset().await;
if let Err(ref e) = res {
warn!("failed to reset partial backup state: {:?}", e);
}
if tx.send(res).is_err() {
warn!("failed to send partial backup reset result, receiver dropped");
}
}
None => {
// can't happen, we're holding the sender
unreachable!();
@@ -619,7 +685,11 @@ async fn sleep_until(option: &Option<tokio::time::Instant>) {
}
}
async fn await_task_finish<T>(option: &mut Option<JoinHandle<T>>) -> Result<T, JoinError> {
/// Future that resolves when the task is finished or never if the task is None.
///
/// Note: it accepts Option<&mut> instead of &mut Option<> because mapping the
/// option to get the latter is hard.
async fn await_task_finish<T>(option: Option<&mut JoinHandle<T>>) -> Result<T, JoinError> {
if let Some(task) = option {
task.await
} else {

View File

@@ -203,7 +203,7 @@ struct WalBackupTask {
}
/// Offload single timeline.
#[instrument(name = "WAL backup", skip_all, fields(ttid = %tli.ttid))]
#[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))]
async fn backup_task_main(
tli: WalResidentTimeline,
parallel_jobs: usize,
@@ -315,7 +315,7 @@ async fn backup_lsn_range(
anyhow::bail!("parallel_jobs must be >= 1");
}
let remote_timeline_path = remote_timeline_path(&timeline.ttid)?;
let remote_timeline_path = &timeline.remote_path;
let start_lsn = *backup_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
@@ -328,11 +328,7 @@ async fn backup_lsn_range(
loop {
let added_task = match iter.next() {
Some(s) => {
uploads.push_back(backup_single_segment(
s,
timeline_dir,
&remote_timeline_path,
));
uploads.push_back(backup_single_segment(s, timeline_dir, remote_timeline_path));
true
}
None => false,

View File

@@ -22,6 +22,7 @@ use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use utils::{id::NodeId, lsn::Lsn};
@@ -31,7 +32,7 @@ use crate::{
safekeeper::Term,
timeline::WalResidentTimeline,
timeline_manager::StateSnapshot,
wal_backup::{self, remote_timeline_path},
wal_backup::{self},
SafeKeeperConf,
};
@@ -145,7 +146,7 @@ impl State {
}
}
struct PartialBackup {
pub struct PartialBackup {
wal_seg_size: usize,
tli: WalResidentTimeline,
conf: SafeKeeperConf,
@@ -155,8 +156,25 @@ struct PartialBackup {
state: State,
}
// Read-only methods for getting segment names
impl PartialBackup {
pub async fn new(tli: WalResidentTimeline, conf: SafeKeeperConf) -> PartialBackup {
let (_, persistent_state) = tli.get_state().await;
let wal_seg_size = tli.get_wal_seg_size().await;
let local_prefix = tli.get_timeline_dir();
let remote_timeline_path = tli.remote_path.clone();
PartialBackup {
wal_seg_size,
tli,
state: persistent_state.partial_backup,
conf,
local_prefix,
remote_timeline_path,
}
}
// Read-only methods for getting segment names
fn segno(&self, lsn: Lsn) -> XLogSegNo {
lsn.segment_number(self.wal_seg_size)
}
@@ -297,6 +315,18 @@ impl PartialBackup {
Ok(())
}
// Prepend to the given segments remote prefix and delete them from the
// remote storage.
async fn delete_segments(&self, segments_to_delete: &Vec<String>) -> anyhow::Result<()> {
info!("deleting objects: {:?}", segments_to_delete);
let mut objects_to_delete = vec![];
for seg in segments_to_delete.iter() {
let remote_path = self.remote_timeline_path.join(seg);
objects_to_delete.push(remote_path);
}
wal_backup::delete_objects(&objects_to_delete).await
}
/// Delete all non-Uploaded segments from the remote storage. There should be only one
/// Uploaded segment at a time.
#[instrument(name = "gc", skip_all)]
@@ -329,15 +359,8 @@ impl PartialBackup {
);
}
info!("deleting objects: {:?}", segments_to_delete);
let mut objects_to_delete = vec![];
for seg in segments_to_delete.iter() {
let remote_path = self.remote_timeline_path.join(seg);
objects_to_delete.push(remote_path);
}
// removing segments from remote storage
wal_backup::delete_objects(&objects_to_delete).await?;
// execute the deletion
self.delete_segments(&segments_to_delete).await?;
// now we can update the state on disk
let new_state = {
@@ -349,6 +372,27 @@ impl PartialBackup {
Ok(())
}
/// Remove uploaded segment(s) from the state and remote storage. Aimed for
/// manual intervention, not normally needed.
/// Returns list of segments which potentially existed in the remote storage.
pub async fn reset(&mut self) -> anyhow::Result<Vec<String>> {
let segments_to_delete = self
.state
.segments
.iter()
.map(|seg| seg.name.clone())
.collect();
// First reset cfile state, and only then objects themselves. If the
// later fails we might leave some garbage behind; that's ok for this
// single time usage.
let new_state = State { segments: vec![] };
self.commit_state(new_state).await?;
self.delete_segments(&segments_to_delete).await?;
Ok(segments_to_delete)
}
}
/// Check if everything is uploaded and partial backup task doesn't need to run.
@@ -372,38 +416,21 @@ pub(crate) fn needs_uploading(
///
/// When there is nothing more to do and the last segment was successfully uploaded, the task
/// returns PartialRemoteSegment, to signal readiness for offloading the timeline.
#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
#[instrument(name = "partial_backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
tli: WalResidentTimeline,
conf: SafeKeeperConf,
limiter: RateLimiter,
cancel: CancellationToken,
) -> Option<PartialRemoteSegment> {
debug!("started");
let await_duration = conf.partial_backup_timeout;
let mut first_iteration = true;
let (_, persistent_state) = tli.get_state().await;
let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
let wal_seg_size = tli.get_wal_seg_size().await;
let local_prefix = tli.get_timeline_dir();
let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
Ok(path) => path,
Err(e) => {
error!("failed to create remote path: {:?}", e);
return None;
}
};
let mut backup = PartialBackup {
wal_seg_size,
tli,
state: persistent_state.partial_backup,
conf,
local_prefix,
remote_timeline_path,
};
let mut backup = PartialBackup::new(tli, conf).await;
debug!("state: {:?}", backup.state);
@@ -433,6 +460,10 @@ pub async fn main_task(
&& flush_lsn_rx.borrow().term == seg.term
{
// we have nothing to do, the last segment is already uploaded
debug!(
"exiting, uploaded up to term={} flush_lsn={} commit_lsn={}",
seg.term, seg.flush_lsn, seg.commit_lsn
);
return Some(seg.clone());
}
}
@@ -444,6 +475,10 @@ pub async fn main_task(
info!("timeline canceled");
return None;
}
_ = cancel.cancelled() => {
info!("task canceled");
return None;
}
_ = flush_lsn_rx.changed() => {}
}
}
@@ -470,6 +505,10 @@ pub async fn main_task(
info!("timeline canceled");
return None;
}
_ = cancel.cancelled() => {
info!("task canceled");
return None;
}
_ = commit_lsn_rx.changed() => {}
_ = flush_lsn_rx.changed() => {
let segno = backup.segno(flush_lsn_rx.borrow().lsn);
@@ -492,7 +531,13 @@ pub async fn main_task(
}
// limit concurrent uploads
let _upload_permit = limiter.acquire_partial_backup().await;
let _upload_permit = tokio::select! {
acq = limiter.acquire_partial_backup() => acq,
_ = cancel.cancelled() => {
info!("task canceled");
return None;
}
};
let prepared = backup.prepare_upload().await;
if let Some(seg) = &uploaded_segment {

View File

@@ -37,6 +37,8 @@ use pq_proto::SystemId;
use utils::{id::TenantTimelineId, lsn::Lsn};
pub trait Storage {
// Last written LSN.
fn write_lsn(&self) -> Lsn;
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
@@ -329,6 +331,10 @@ impl PhysicalStorage {
}
impl Storage for PhysicalStorage {
// Last written LSN.
fn write_lsn(&self) -> Lsn {
self.write_lsn
}
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn

View File

@@ -175,6 +175,10 @@ impl DiskWALStorage {
}
impl wal_storage::Storage for DiskWALStorage {
// Last written LSN.
fn write_lsn(&self) -> Lsn {
self.write_lsn
}
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn

View File

@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE safekeepers;

View File

@@ -0,0 +1,15 @@
-- started out as a copy of cplane schema, removed the unnecessary columns.
CREATE TABLE safekeepers (
-- the surrogate identifier defined by control plane database sequence
id BIGINT PRIMARY KEY,
region_id TEXT NOT NULL,
version BIGINT NOT NULL,
-- the natural id on whatever cloud platform, not needed in storage controller
-- instance_id TEXT UNIQUE NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL,
active BOOLEAN NOT NULL DEFAULT false,
-- projects_count INTEGER NOT NULL DEFAULT 0,
http_port INTEGER NOT NULL,
availability_zone_id TEXT NOT NULL
);

View File

@@ -2,6 +2,7 @@ use crate::metrics::{
HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
METRICS_REGISTRY,
};
use crate::persistence::SafekeeperPersistence;
use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, Service, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
@@ -767,6 +768,55 @@ impl From<ReconcileError> for ApiError {
}
}
/// Return the safekeeper record by instance id, or 404.
///
/// Not used by anything except manual testing.
async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let id = parse_request_param::<i64>(&req, "id")?;
let state = get_state(&req);
let res = state.service.get_safekeeper(id).await;
match res {
Ok(b) => json_response(StatusCode::OK, b),
Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
Err(ApiError::NotFound("unknown instance_id".into()))
}
Err(other) => Err(other.into()),
}
}
/// Used as part of deployment scripts.
///
/// Assumes information is only relayed to storage controller after first selecting an unique id on
/// control plane database, which means we have an id field in the request and payload.
async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let body = json_request::<SafekeeperPersistence>(&mut req).await?;
let id = parse_request_param::<i64>(&req, "id")?;
if id != body.id {
// it should be repeated
return Err(ApiError::BadRequest(anyhow::anyhow!(
"id mismatch: url={id:?}, body={:?}",
body.id
)));
}
let state = get_state(&req);
state.service.upsert_safekeeper(body).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.unwrap())
}
/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
/// be allowed to run if Service has finished its initial reconciliation.
async fn tenant_service_handler<R, H>(
@@ -1127,6 +1177,13 @@ pub fn make_router(
.put("/control/v1/step_down", |r| {
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
})
.get("/control/v1/safekeeper/:id", |r| {
named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
})
.post("/control/v1/safekeeper/:id", |r| {
// id is in the body
named_request_span(r, handle_upsert_safekeeper, RequestName("v1_safekeeper"))
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.

View File

@@ -11,8 +11,8 @@ use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::{
Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
RECONCILER_CONCURRENCY_DEFAULT,
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, MAX_OFFLINE_INTERVAL_DEFAULT,
MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
@@ -104,6 +104,10 @@ struct Cli {
// a pageserver
#[arg(long)]
max_secondary_lag_bytes: Option<u64>,
// Period with which to send heartbeats to registered nodes
#[arg(long)]
heartbeat_interval: Option<humantime::Duration>,
}
enum StrictMode {
@@ -285,6 +289,10 @@ async fn async_main() -> anyhow::Result<()> {
split_threshold: args.split_threshold,
neon_local_repo_dir: args.neon_local_repo_dir,
max_secondary_lag_bytes: args.max_secondary_lag_bytes,
heartbeat_interval: args
.heartbeat_interval
.map(humantime::Duration::into)
.unwrap_or(HEARTBEAT_INTERVAL_DEFAULT),
address_for_peers: args.address_for_peers,
start_as_candidate: args.start_as_candidate,
http_service_port: args.listen.port() as i32,

View File

@@ -938,6 +938,48 @@ impl Persistence {
Ok(())
}
pub(crate) async fn safekeeper_get(
&self,
id: i64,
) -> Result<SafekeeperPersistence, DatabaseError> {
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
self.with_conn(move |conn| -> DatabaseResult<SafekeeperPersistence> {
Ok(safekeepers
.filter(id_column.eq(&id))
.select(SafekeeperPersistence::as_select())
.get_result(conn)?)
})
.await
}
pub(crate) async fn safekeeper_upsert(
&self,
record: SafekeeperPersistence,
) -> Result<(), DatabaseError> {
use crate::schema::safekeepers::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
let bind = record.as_insert_or_update();
let inserted_updated = diesel::insert_into(safekeepers)
.values(&bind)
.on_conflict(id)
.do_update()
.set(&bind)
.execute(conn)?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}
Ok(())
})
.await
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -1073,3 +1115,47 @@ pub(crate) struct ControllerPersistence {
pub(crate) address: String,
pub(crate) started_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Serialize, Deserialize, Queryable, Selectable, Eq, PartialEq, Debug, Clone)]
#[diesel(table_name = crate::schema::safekeepers)]
pub(crate) struct SafekeeperPersistence {
pub(crate) id: i64,
pub(crate) region_id: String,
/// 1 is special, it means just created (not currently posted to storcon).
/// Zero or negative is not really expected.
/// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
pub(crate) version: i64,
pub(crate) host: String,
pub(crate) port: i32,
pub(crate) active: bool,
pub(crate) http_port: i32,
pub(crate) availability_zone_id: String,
}
impl SafekeeperPersistence {
fn as_insert_or_update(&self) -> InsertUpdateSafekeeper<'_> {
InsertUpdateSafekeeper {
id: self.id,
region_id: &self.region_id,
version: self.version,
host: &self.host,
port: self.port,
active: self.active,
http_port: self.http_port,
availability_zone_id: &self.availability_zone_id,
}
}
}
#[derive(Insertable, AsChangeset)]
#[diesel(table_name = crate::schema::safekeepers)]
struct InsertUpdateSafekeeper<'a> {
id: i64,
region_id: &'a str,
version: i64,
host: &'a str,
port: i32,
active: bool,
http_port: i32,
availability_zone_id: &'a str,
}

View File

@@ -45,3 +45,17 @@ diesel::table! {
}
diesel::allow_tables_to_appear_in_same_query!(controllers, metadata_health, nodes, tenant_shards,);
diesel::table! {
safekeepers {
id -> Int8,
region_id -> Text,
version -> Int8,
instance_id -> Text,
host -> Text,
port -> Int4,
active -> Bool,
http_port -> Int4,
availability_zone_id -> Text,
}
}

View File

@@ -121,6 +121,9 @@ pub const MAX_OFFLINE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
/// being handled on the pageserver side.
pub const MAX_WARMING_UP_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
/// How often to send heartbeats to registered nodes?
pub const HEARTBEAT_INTERVAL_DEFAULT: Duration = Duration::from_secs(5);
#[derive(Clone, strum_macros::Display)]
enum TenantOperations {
Create,
@@ -326,6 +329,8 @@ pub struct Config {
// upgraded to primary.
pub max_secondary_lag_bytes: Option<u64>,
pub heartbeat_interval: Duration,
pub address_for_peers: Option<Uri>,
pub start_as_candidate: bool,
@@ -909,9 +914,7 @@ impl Service {
async fn spawn_heartbeat_driver(&self) {
self.startup_complete.clone().wait().await;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL);
let mut interval = tokio::time::interval(self.config.heartbeat_interval);
while !self.cancel.is_cancelled() {
tokio::select! {
_ = interval.tick() => { }
@@ -6473,4 +6476,18 @@ impl Service {
global_observed
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<crate::persistence::SafekeeperPersistence, DatabaseError> {
self.persistence.safekeeper_get(id).await
}
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperPersistence,
) -> Result<(), DatabaseError> {
self.persistence.safekeeper_upsert(record).await
}
}

View File

@@ -422,7 +422,7 @@ fn stream_objects_with_retries<'a>(
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
let backoff_time = 1 << trial.min(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
@@ -473,7 +473,7 @@ async fn list_objects_with_retries(
s3_target.delimiter,
DisplayErrorContext(e),
);
let backoff_time = 1 << trial.max(5);
let backoff_time = 1 << trial.min(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
}
}
@@ -492,7 +492,7 @@ async fn download_object_with_retries(
Ok(response) => response,
Err(e) => {
error!("Failed to download object for key {key}: {e}");
let backoff_time = 1 << trial.max(5);
let backoff_time = 1 << trial.min(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
continue;
}
@@ -508,7 +508,7 @@ async fn download_object_with_retries(
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");
let backoff_time = 1 << trial.max(5);
let backoff_time = 1 << trial.min(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
}
}

View File

@@ -1164,6 +1164,8 @@ class NeonEnv:
"listen_http_addr": f"localhost:{pageserver_port.http}",
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
# Default which can be overriden with `NeonEnvBuilder.pageserver_config_override`
"availability_zone": "us-east-2a",
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
@@ -1192,11 +1194,7 @@ class NeonEnv:
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(
self,
ps_id,
port=pageserver_port,
)
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
)
cfg["pageservers"].append(ps_cfg)
@@ -2400,6 +2398,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"listen_http_port": node.service_port.http,
"listen_pg_addr": "localhost",
"listen_pg_port": node.service_port.pg,
"availability_zone_id": node.az_id,
}
log.info(f"node_register({body})")
self.request(
@@ -2846,6 +2845,29 @@ class NeonStorageController(MetricsGetter, LogUtils):
raise AssertionError("unreachable")
def on_safekeeper_deploy(self, id: int, body: dict[str, Any]):
self.request(
"POST",
f"{self.api}/control/v1/safekeeper/{id}",
headers=self.headers(TokenScope.ADMIN),
json=body,
)
def get_safekeeper(self, id: int) -> Optional[dict[str, Any]]:
try:
response = self.request(
"GET",
f"{self.api}/control/v1/safekeeper/{id}",
headers=self.headers(TokenScope.ADMIN),
)
json = response.json()
assert isinstance(json, dict)
return json
except StorageControllerApiException as e:
if e.status_code == 404:
return None
raise e
def __enter__(self) -> "NeonStorageController":
return self
@@ -2923,10 +2945,11 @@ class NeonPageserver(PgProtocol, LogUtils):
TEMP_FILE_SUFFIX = "___temp"
def __init__(self, env: NeonEnv, id: int, port: PageserverPort):
def __init__(self, env: NeonEnv, id: int, port: PageserverPort, az_id: str):
super().__init__(host="localhost", port=port.pg, user="cloud_admin")
self.env = env
self.id = id
self.az_id = az_id
self.running = False
self.service_port = port
self.version = env.get_binary_version("pageserver")
@@ -4553,6 +4576,8 @@ class Safekeeper(LogUtils):
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)
# List partial uploaded segments of this safekeeper. Works only for
# RemoteStorageKind.LOCAL_FS.
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
tline_path = (
self.env.repo_dir
@@ -4562,9 +4587,11 @@ class Safekeeper(LogUtils):
/ str(timeline_id)
)
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
return self._list_segments_in_dir(
segs = self._list_segments_in_dir(
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
)
mysegs = [s for s in segs if f"sk{self.id}" in s]
return mysegs
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""

View File

@@ -109,9 +109,6 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
# controller's attempts to notify the endpoint).
".*reconciler.*neon_local notification hook failed.*",
".*reconciler.*neon_local error.*",
# Neon local does not provide pageserver with an AZ
# TODO: remove this once neon local does so
".*registering without specific availability zone id.*",
]

View File

@@ -174,6 +174,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def debug_dump_timeline(
self, timeline_id: TimelineId, params: Optional[Dict[str, str]] = None
) -> Any:
params = params or {}
params["timeline_id"] = str(timeline_id)
dump = self.debug_dump(params)
return dump["timelines"][0]
def get_partial_backup(self, timeline_id: TimelineId) -> Any:
dump = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"})
return dump["control_file"]["partial_backup"]
def get_eviction_state(self, timeline_id: TimelineId) -> Any:
dump = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"})
return dump["control_file"]["eviction_state"]
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
res.raise_for_status()
@@ -228,6 +244,14 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def backup_partial_reset(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/backup_partial_reset",
json={},
)
res.raise_for_status()
return res.json()
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",

View File

@@ -31,7 +31,7 @@ from fixtures.pageserver.utils import (
remote_storage_delete_key,
timeline_delete_wait_completed,
)
from fixtures.pg_version import PgVersion
from fixtures.pg_version import PgVersion, run_only_on_default_postgres
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.storage_controller_proxy import StorageControllerProxy
@@ -2330,3 +2330,69 @@ def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder)
connect=0, # Disable retries: we want to see the 503
)
).timeline_create(PgVersion.NOT_SET, tenant_id, create_timeline_id)
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()
env.start()
fake_id = 5
target = env.storage_controller
assert target.get_safekeeper(fake_id) is None
body = {
"active": True,
"id": fake_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "safekeeper-333.us-east-2.aws.neon.build",
"port": 6401,
"http_port": 7676,
"version": 5957,
"availability_zone_id": "us-east-2b",
}
target.on_safekeeper_deploy(fake_id, body)
inserted = target.get_safekeeper(fake_id)
assert inserted is not None
assert eq_safekeeper_records(body, inserted)
# error out if pk is changed (unexpected)
with pytest.raises(StorageControllerApiException) as exc:
different_pk = dict(body)
different_pk["id"] = 4
assert different_pk["id"] != body["id"]
target.on_safekeeper_deploy(fake_id, different_pk)
assert exc.value.status_code == 400
inserted_again = target.get_safekeeper(fake_id)
assert inserted_again is not None
assert eq_safekeeper_records(inserted, inserted_again)
# the most common case, version goes up:
assert isinstance(body["version"], int)
body["version"] += 1
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]
masked_keys = ["created_at", "updated_at"]
for d in compared:
# keep deleting these in case we are comparing the body as it will be uploaded by real scripts
for key in masked_keys:
if key in d:
del d[key]
return compared[0] == compared[1]

View File

@@ -372,8 +372,10 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
tenant_id: TenantId = env.initial_tenant
timeline_id = env.initial_timeline
# Multiple creation requests which race will generate this error
# Multiple creation requests which race will generate this error on the pageserver
# and storage controller respectively
env.pageserver.allowed_errors.append(".*Conflict: Tenant is already being modified.*")
env.storage_controller.allowed_errors.append(".*Conflict: Tenant is already being modified.*")
# Tenant creation requests which arrive out of order will generate complaints about
# generation nubmers out of order.

View File

@@ -72,6 +72,17 @@ def wait_lsn_force_checkpoint(
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at_sk(
safekeeper: Safekeeper,
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
pageserver_conn_options=None,
):
sk_flush_lsn = safekeeper.get_flush_lsn(tenant_id, timeline_id)
wait_lsn_force_checkpoint_at(sk_flush_lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at(
lsn: Lsn,
tenant_id: TenantId,
@@ -79,6 +90,10 @@ def wait_lsn_force_checkpoint_at(
ps: NeonPageserver,
pageserver_conn_options=None,
):
"""
Wait until pageserver receives given lsn, force checkpoint and wait for
upload, i.e. remote_consistent_lsn advancement.
"""
pageserver_conn_options = pageserver_conn_options or {}
auth_token = None
@@ -2330,6 +2345,77 @@ def test_s3_eviction(
assert event_metrics_seen
# Test resetting uploaded partial segment state.
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
# We want to upload/evict quickly, but not too quickly to check that s3 is
# empty before next round of upload happens.
# Note: this test fails with --delete-offloaded-wal, this is expected.
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--partial-backup-timeout",
"1s",
"--control-file-save-interval",
"1s",
"--eviction-min-resident=1s",
]
# XXX: pageserver currently connects to safekeeper as long as connection
# manager doesn't remove its entry (default lagging_wal_timeout is 10s),
# causing uneviction. It should be fixed to not reconnect if last
# remote_consistent_lsn is communicated and there is nothing to fetch. Make
# value lower to speed up the test.
initial_tenant_conf = {
"lagging_wal_timeout": "1s",
}
env = neon_env_builder.init_start(initial_tenant_conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create("main")
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.stop()
sk = env.safekeepers[0]
# eviction won't happen until remote_consistent_lsn catches up.
wait_lsn_force_checkpoint_at_sk(sk, tenant_id, timeline_id, env.pageserver)
http_cli = env.safekeepers[0].http_client()
# wait until eviction happens
def evicted():
eviction_state = http_cli.get_eviction_state(timeline_id)
log.info(f"eviction_state: {eviction_state}")
if isinstance(eviction_state, str) and eviction_state == "Present":
raise Exception("eviction didn't happen yet")
wait_until(30, 1, evicted)
# it must have uploaded something
uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
log.info(f"uploaded segments before reset: {uploaded_segs}")
assert len(uploaded_segs) > 0
reset_res = http_cli.backup_partial_reset(tenant_id, timeline_id)
log.info(f"reset res: {reset_res}")
# Backup_partial_reset must have reset the state and dropped s3 segment.
#
# Note: if listing takes more than --partial-backup-timeout test becomes
# flaky because file might be reuploaded. With local fs it shouldn't be an
# issue, but can add retry if this appears.
uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
log.info(f"uploaded segments after reset: {uploaded_segs}")
assert len(uploaded_segs) == 0
# calling second time should be ok
http_cli.backup_partial_reset(tenant_id, timeline_id)
# inserting data should be ok
endpoint.start()
endpoint.safe_psql("insert into t values(1, 'hehe')")
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
"""
Verify that pulling timeline from a SK with an uploaded partial segment
@@ -2357,7 +2443,16 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
"--eviction-min-resident=500ms",
]
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
# XXX: pageserver currently connects to safekeeper as long as connection
# manager doesn't remove its entry (default lagging_wal_timeout is 10s),
# causing uneviction. It should be fixed to not reconnect if last
# remote_consistent_lsn is communicated and there is nothing to fetch. Until
# this is fixed make value lower to speed up the test.
initial_tenant_conf = {
"lagging_wal_timeout": "1s",
"checkpoint_timeout": "100ms",
}
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -2421,7 +2516,7 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
endpoint.start(safekeepers=[2, 3])
def new_partial_segment_uploaded():
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
segs = dst_sk.list_uploaded_segments(tenant_id, timeline_id)
for seg in segs:
if "partial" in seg and "sk3" in seg:
return seg