mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 06:00:38 +00:00
Compare commits
21 Commits
skyzh/reti
...
run-clippy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae50e9600f | ||
|
|
cd613f5ab3 | ||
|
|
539b3ad541 | ||
|
|
79fa640058 | ||
|
|
c41f9870a5 | ||
|
|
a0e923b70b | ||
|
|
38f6107534 | ||
|
|
f25e07893c | ||
|
|
460d48437b | ||
|
|
7a1397cf37 | ||
|
|
75310fe441 | ||
|
|
ecfa3d9de9 | ||
|
|
3d9001d83f | ||
|
|
1a874a3e86 | ||
|
|
c4fe6641c1 | ||
|
|
c7187be8a1 | ||
|
|
83dd7f559c | ||
|
|
80512e2779 | ||
|
|
3916810f20 | ||
|
|
c43e664ff5 | ||
|
|
b37da32c6f |
22
.github/workflows/build_and_test.yml
vendored
22
.github/workflows/build_and_test.yml
vendored
@@ -122,10 +122,12 @@ jobs:
|
||||
|
||||
check-codestyle-rust:
|
||||
needs: [ check-permissions, build-build-tools-image ]
|
||||
# There's no reason to expect clippy or code formatting to be different on different platforms,
|
||||
# so it's enough to run these on x64 only.
|
||||
strategy:
|
||||
matrix:
|
||||
arch: [ x64, arm64 ]
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}
|
||||
@@ -166,15 +168,25 @@ jobs:
|
||||
exit 1
|
||||
fi
|
||||
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
|
||||
|
||||
- name: Run cargo clippy (debug)
|
||||
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
|
||||
- name: Run cargo clippy (release)
|
||||
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
|
||||
run: |
|
||||
parallel --jobs 8 "cargo hack --feature-powerset --partition {}/8 clippy --target-dir target/partition-{} $CLIPPY_COMMON_ARGS" ::: 1 2 3 4 5 6 7 8
|
||||
|
||||
# instead of running the full release build, running debug build again,
|
||||
# but with disabled `debug-assertions` to excersice release code paths
|
||||
- name: Run cargo clippy (debug, with debug-assertions=false)
|
||||
run: |
|
||||
for N in 4 8 10 12 14 16 18 20; do
|
||||
echo "Running clippy with debug-assertions=false for partition ${N}"
|
||||
time parallel --jobs ${N} "cargo hack --feature-powerset --partition {}/${N} clippy --target-dir target/partition-{} $CLIPPY_COMMON_ARGS -C debug-assertions=off" ::: $(seq -s " " 1 ${N})
|
||||
rm -rf target/partition-*
|
||||
done
|
||||
|
||||
- name: Check documentation generation
|
||||
run: cargo doc --workspace --no-deps --document-private-items
|
||||
env:
|
||||
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
|
||||
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
|
||||
|
||||
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
|
||||
- name: Check formatting
|
||||
|
||||
@@ -165,6 +165,9 @@ pub struct NeonStorageControllerConf {
|
||||
pub split_threshold: Option<u64>,
|
||||
|
||||
pub max_secondary_lag_bytes: Option<u64>,
|
||||
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heartbeat_interval: Duration,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -172,6 +175,9 @@ impl NeonStorageControllerConf {
|
||||
const DEFAULT_MAX_OFFLINE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
// Very tight heartbeat interval to speed up tests
|
||||
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
|
||||
}
|
||||
|
||||
impl Default for NeonStorageControllerConf {
|
||||
@@ -183,6 +189,7 @@ impl Default for NeonStorageControllerConf {
|
||||
database_url: None,
|
||||
split_threshold: None,
|
||||
max_secondary_lag_bytes: None,
|
||||
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,6 +181,23 @@ impl PageServerNode {
|
||||
);
|
||||
io::stdout().flush()?;
|
||||
|
||||
// If the config file we got as a CLI argument includes the `availability_zone`
|
||||
// config, then use that to populate the `metadata.json` file for the pageserver.
|
||||
// In production the deployment orchestrator does this for us.
|
||||
let az_id = conf
|
||||
.other
|
||||
.get("availability_zone")
|
||||
.map(|toml| {
|
||||
let az_str = toml.to_string();
|
||||
// Trim the (") chars from the toml representation
|
||||
if az_str.starts_with('"') && az_str.ends_with('"') {
|
||||
az_str[1..az_str.len() - 1].to_string()
|
||||
} else {
|
||||
az_str
|
||||
}
|
||||
})
|
||||
.unwrap_or("local".to_string());
|
||||
|
||||
let config = self
|
||||
.pageserver_init_make_toml(conf)
|
||||
.context("make pageserver toml")?;
|
||||
@@ -216,6 +233,7 @@ impl PageServerNode {
|
||||
let (_http_host, http_port) =
|
||||
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
|
||||
let http_port = http_port.unwrap_or(9898);
|
||||
|
||||
// Intentionally hand-craft JSON: this acts as an implicit format compat test
|
||||
// in case the pageserver-side structure is edited, and reflects the real life
|
||||
// situation: the metadata is written by some other script.
|
||||
@@ -226,7 +244,10 @@ impl PageServerNode {
|
||||
postgres_port: self.pg_connection_config.port(),
|
||||
http_host: "localhost".to_string(),
|
||||
http_port,
|
||||
other: HashMap::new(),
|
||||
other: HashMap::from([(
|
||||
"availability_zone_id".to_string(),
|
||||
serde_json::json!(az_id),
|
||||
)]),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
|
||||
@@ -437,6 +437,8 @@ impl StorageController {
|
||||
&humantime::Duration::from(self.config.max_offline).to_string(),
|
||||
"--max-warming-up-interval",
|
||||
&humantime::Duration::from(self.config.max_warming_up).to_string(),
|
||||
"--heartbeat-interval",
|
||||
&humantime::Duration::from(self.config.heartbeat_interval).to_string(),
|
||||
"--address-for-peers",
|
||||
&address_for_peers.to_string(),
|
||||
]
|
||||
|
||||
@@ -716,12 +716,17 @@ pub struct TimelineInfo {
|
||||
pub pg_version: u32,
|
||||
|
||||
pub state: TimelineState,
|
||||
pub is_archived: bool,
|
||||
|
||||
pub walreceiver_status: String,
|
||||
|
||||
// ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
|
||||
// Backward compatibility: you will get a JSON not containing the newly-added field.
|
||||
// Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
|
||||
// not deny unknown fields by default so it's safe to set the field to some value, though it won't be
|
||||
// read.
|
||||
/// The last aux file policy being used on this timeline
|
||||
pub last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
pub is_archived: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
//! Periodically collect consumption metrics for all active tenants
|
||||
//! and push them to a HTTP endpoint.
|
||||
use crate::config::PageServerConf;
|
||||
use crate::consumption_metrics::metrics::MetricsKey;
|
||||
use crate::consumption_metrics::upload::KeyGen as _;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::size::CalculateSyntheticSizeError;
|
||||
@@ -8,6 +10,7 @@ use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
|
||||
use camino::Utf8PathBuf;
|
||||
use consumption_metrics::EventType;
|
||||
use itertools::Itertools as _;
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
|
||||
use reqwest::Url;
|
||||
@@ -19,9 +22,8 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::id::NodeId;
|
||||
|
||||
mod metrics;
|
||||
use crate::consumption_metrics::metrics::MetricsKey;
|
||||
mod disk_cache;
|
||||
mod metrics;
|
||||
mod upload;
|
||||
|
||||
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
@@ -143,6 +145,12 @@ async fn collect_metrics(
|
||||
// these are point in time, with variable "now"
|
||||
let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
|
||||
|
||||
// Pre-generate event idempotency keys, to reuse them across the bucket
|
||||
// and HTTP sinks.
|
||||
let idempotency_keys = std::iter::repeat_with(|| node_id.as_str().generate())
|
||||
.take(metrics.len())
|
||||
.collect_vec();
|
||||
|
||||
let metrics = Arc::new(metrics);
|
||||
|
||||
// why not race cancellation here? because we are one of the last tasks, and if we are
|
||||
@@ -161,8 +169,14 @@ async fn collect_metrics(
|
||||
}
|
||||
|
||||
if let Some(bucket_client) = &bucket_client {
|
||||
let res =
|
||||
upload::upload_metrics_bucket(bucket_client, &cancel, &node_id, &metrics).await;
|
||||
let res = upload::upload_metrics_bucket(
|
||||
bucket_client,
|
||||
&cancel,
|
||||
&node_id,
|
||||
&metrics,
|
||||
&idempotency_keys,
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
tracing::error!("failed to upload to S3: {e:#}");
|
||||
}
|
||||
@@ -174,9 +188,9 @@ async fn collect_metrics(
|
||||
&client,
|
||||
metric_collection_endpoint,
|
||||
&cancel,
|
||||
&node_id,
|
||||
&metrics,
|
||||
&mut cached_metrics,
|
||||
&idempotency_keys,
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
|
||||
@@ -24,16 +24,16 @@ pub(super) async fn upload_metrics_http(
|
||||
client: &reqwest::Client,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
cancel: &CancellationToken,
|
||||
node_id: &str,
|
||||
metrics: &[RawMetric],
|
||||
cached_metrics: &mut Cache,
|
||||
idempotency_keys: &[IdempotencyKey<'_>],
|
||||
) -> anyhow::Result<()> {
|
||||
let mut uploaded = 0;
|
||||
let mut failed = 0;
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
|
||||
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);
|
||||
|
||||
while let Some(res) = iter.next() {
|
||||
let (chunk, body) = res?;
|
||||
@@ -87,6 +87,7 @@ pub(super) async fn upload_metrics_bucket(
|
||||
cancel: &CancellationToken,
|
||||
node_id: &str,
|
||||
metrics: &[RawMetric],
|
||||
idempotency_keys: &[IdempotencyKey<'_>],
|
||||
) -> anyhow::Result<()> {
|
||||
if metrics.is_empty() {
|
||||
// Skip uploads if we have no metrics, so that readers don't have to handle the edge case
|
||||
@@ -106,7 +107,7 @@ pub(super) async fn upload_metrics_bucket(
|
||||
|
||||
// Serialize and write into compressed buffer
|
||||
let started_at = std::time::Instant::now();
|
||||
for res in serialize_in_chunks(CHUNK_SIZE, metrics, node_id) {
|
||||
for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
|
||||
let (_chunk, body) = res?;
|
||||
gzip_writer.write_all(&body).await?;
|
||||
}
|
||||
@@ -134,29 +135,31 @@ pub(super) async fn upload_metrics_bucket(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The return type is quite ugly, but we gain testability in isolation
|
||||
fn serialize_in_chunks<'a, F>(
|
||||
/// Serializes the input metrics as JSON in chunks of chunk_size. The provided
|
||||
/// idempotency keys are injected into the corresponding metric events (reused
|
||||
/// across different metrics sinks), and must have the same length as input.
|
||||
fn serialize_in_chunks<'a>(
|
||||
chunk_size: usize,
|
||||
input: &'a [RawMetric],
|
||||
factory: F,
|
||||
idempotency_keys: &'a [IdempotencyKey<'a>],
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
where
|
||||
F: KeyGen<'a> + 'a,
|
||||
{
|
||||
use bytes::BufMut;
|
||||
|
||||
struct Iter<'a, F> {
|
||||
assert_eq!(input.len(), idempotency_keys.len());
|
||||
|
||||
struct Iter<'a> {
|
||||
inner: std::slice::Chunks<'a, RawMetric>,
|
||||
idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
|
||||
chunk_size: usize,
|
||||
|
||||
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
|
||||
buffer: bytes::BytesMut,
|
||||
// chunk amount of events are reused to produce the serialized document
|
||||
scratch: Vec<Event<Ids, Name>>,
|
||||
factory: F,
|
||||
}
|
||||
|
||||
impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
|
||||
impl<'a> Iterator for Iter<'a> {
|
||||
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
@@ -167,17 +170,14 @@ where
|
||||
self.scratch.extend(
|
||||
chunk
|
||||
.iter()
|
||||
.map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
|
||||
.zip(&mut self.idempotency_keys)
|
||||
.map(|(raw_metric, key)| raw_metric.as_event(key)),
|
||||
);
|
||||
} else {
|
||||
// next rounds: update_in_place to reuse allocations
|
||||
assert_eq!(self.scratch.len(), self.chunk_size);
|
||||
self.scratch
|
||||
.iter_mut()
|
||||
.zip(chunk.iter())
|
||||
.for_each(|(slot, raw_metric)| {
|
||||
raw_metric.update_in_place(slot, &self.factory.generate())
|
||||
});
|
||||
itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
|
||||
.for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
|
||||
}
|
||||
|
||||
let res = serde_json::to_writer(
|
||||
@@ -198,18 +198,19 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
|
||||
impl<'a> ExactSizeIterator for Iter<'a> {}
|
||||
|
||||
let buffer = bytes::BytesMut::new();
|
||||
let inner = input.chunks(chunk_size);
|
||||
let idempotency_keys = idempotency_keys.iter();
|
||||
let scratch = Vec::new();
|
||||
|
||||
Iter {
|
||||
inner,
|
||||
idempotency_keys,
|
||||
chunk_size,
|
||||
buffer,
|
||||
scratch,
|
||||
factory,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,7 +269,7 @@ impl RawMetricExt for RawMetric {
|
||||
}
|
||||
}
|
||||
|
||||
trait KeyGen<'a>: Copy {
|
||||
pub(crate) trait KeyGen<'a> {
|
||||
fn generate(&self) -> IdempotencyKey<'a>;
|
||||
}
|
||||
|
||||
@@ -389,7 +390,10 @@ mod tests {
|
||||
let examples = metric_samples();
|
||||
assert!(examples.len() > 1);
|
||||
|
||||
let factory = FixedGen::new(Utc::now(), "1", 42);
|
||||
let now = Utc::now();
|
||||
let idempotency_keys = (0..examples.len())
|
||||
.map(|i| FixedGen::new(now, "1", i as u16).generate())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// need to use Event here because serde_json::Value uses default hashmap, not linked
|
||||
// hashmap
|
||||
@@ -398,13 +402,13 @@ mod tests {
|
||||
events: Vec<Event<Ids, Name>>,
|
||||
}
|
||||
|
||||
let correct = serialize_in_chunks(examples.len(), &examples, factory)
|
||||
let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for chunk_size in 1..examples.len() {
|
||||
let actual = serialize_in_chunks(chunk_size, &examples, factory)
|
||||
let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -468,7 +468,7 @@ async fn build_timeline_info_common(
|
||||
pg_version: timeline.pg_version,
|
||||
|
||||
state,
|
||||
is_archived,
|
||||
is_archived: Some(is_archived),
|
||||
|
||||
walreceiver_status,
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walingest::WalIngest;
|
||||
use crate::walrecord::decode_wal_record;
|
||||
use crate::walrecord::DecodedWALRecord;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants;
|
||||
@@ -310,11 +311,13 @@ async fn import_wal(
|
||||
|
||||
let mut nrecords = 0;
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
|
||||
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, ctx)
|
||||
.await?;
|
||||
WAL_INGEST.records_committed.inc();
|
||||
|
||||
@@ -449,11 +452,12 @@ pub async fn import_wal_from_tar(
|
||||
waldecoder.feed_bytes(&bytes[offset..]);
|
||||
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, ctx)
|
||||
.await?;
|
||||
modification.commit(ctx).await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use crate::{aux_file, repository::*};
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
@@ -168,7 +168,9 @@ impl Timeline {
|
||||
DatadirModification {
|
||||
tline: self,
|
||||
pending_lsns: Vec::new(),
|
||||
pending_updates: HashMap::new(),
|
||||
pending_metadata_pages: HashMap::new(),
|
||||
pending_data_pages: Vec::new(),
|
||||
pending_zero_data_pages: Default::default(),
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
pending_directory_entries: Vec::new(),
|
||||
@@ -1031,10 +1033,24 @@ pub struct DatadirModification<'a> {
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
// underlying key-value store by the 'finish' function.
|
||||
pending_lsns: Vec<Lsn>,
|
||||
pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
|
||||
pending_deletions: Vec<(Range<Key>, Lsn)>,
|
||||
pending_nblocks: i64,
|
||||
|
||||
/// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
|
||||
/// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
|
||||
pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
|
||||
|
||||
/// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
|
||||
/// which keys are stored here.
|
||||
pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,
|
||||
|
||||
// Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However,
|
||||
// if we encounter a write from postgres in the same wal record, we will drop this entry.
|
||||
//
|
||||
// Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
|
||||
// at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
|
||||
pending_zero_data_pages: HashSet<CompactKey>,
|
||||
|
||||
/// For special "directory" keys that store key-value maps, track the size of the map
|
||||
/// if it was updated in this modification.
|
||||
pending_directory_entries: Vec<(DirectoryKind, usize)>,
|
||||
@@ -1058,6 +1074,10 @@ impl<'a> DatadirModification<'a> {
|
||||
self.pending_bytes
|
||||
}
|
||||
|
||||
pub(crate) fn has_dirty_data_pages(&self) -> bool {
|
||||
(!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
ensure!(
|
||||
@@ -1066,6 +1086,10 @@ impl<'a> DatadirModification<'a> {
|
||||
lsn,
|
||||
self.lsn
|
||||
);
|
||||
|
||||
// If we are advancing LSN, then state from previous wal record should have been flushed.
|
||||
assert!(self.pending_zero_data_pages.is_empty());
|
||||
|
||||
if lsn > self.lsn {
|
||||
self.pending_lsns.push(self.lsn);
|
||||
self.lsn = lsn;
|
||||
@@ -1073,6 +1097,17 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
|
||||
/// keys that represent literal blocks that postgres can read. So data includes relation blocks and
|
||||
/// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
|
||||
///
|
||||
/// The distinction is important because data keys are handled on a fast path where dirty writes are
|
||||
/// not readable until this modification is committed, whereas metadata keys are visible for read
|
||||
/// via [`Self::get`] as soon as their record has been ingested.
|
||||
fn is_data_key(key: &Key) -> bool {
|
||||
key.is_rel_block_key() || key.is_slru_block_key()
|
||||
}
|
||||
|
||||
/// Initialize a completely new repository.
|
||||
///
|
||||
/// This inserts the directory metadata entries that are assumed to
|
||||
@@ -1180,6 +1215,31 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
|
||||
self.pending_zero_data_pages
|
||||
.insert(rel_block_to_key(rel, blknum).to_compact());
|
||||
self.pending_bytes += ZERO_PAGE.len();
|
||||
}
|
||||
|
||||
pub(crate) fn put_slru_page_image_zero(
|
||||
&mut self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
) {
|
||||
self.pending_zero_data_pages
|
||||
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
|
||||
self.pending_bytes += ZERO_PAGE.len();
|
||||
}
|
||||
|
||||
/// Call this at the end of each WAL record.
|
||||
pub(crate) fn on_record_end(&mut self) {
|
||||
let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
|
||||
for key in pending_zero_data_pages {
|
||||
self.put_data(key, Value::Image(ZERO_PAGE.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Store a relmapper file (pg_filenode.map) in the repository
|
||||
pub async fn put_relmap_file(
|
||||
&mut self,
|
||||
@@ -1778,7 +1838,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -1789,31 +1849,11 @@ impl<'a> DatadirModification<'a> {
|
||||
let mut writer = self.tline.writer().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
|
||||
for (key, values) in self.pending_updates.drain() {
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
bail!(
|
||||
"the request contains data not supported by pageserver at TimelineWriter::put: {}", key
|
||||
);
|
||||
}
|
||||
let mut write_batch = Vec::new();
|
||||
for (lsn, value_ser_size, value) in values {
|
||||
if key.is_rel_block_key() || key.is_slru_block_key() {
|
||||
// This bails out on first error without modifying pending_updates.
|
||||
// That's Ok, cf this function's doc comment.
|
||||
write_batch.push((key.to_compact(), lsn, value_ser_size, value));
|
||||
} else {
|
||||
retained_pending_updates.entry(key).or_default().push((
|
||||
lsn,
|
||||
value_ser_size,
|
||||
value,
|
||||
));
|
||||
}
|
||||
}
|
||||
writer.put_batch(write_batch, ctx).await?;
|
||||
}
|
||||
let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
|
||||
|
||||
self.pending_updates = retained_pending_updates;
|
||||
// This bails out on first error without modifying pending_updates.
|
||||
// That's Ok, cf this function's doc comment.
|
||||
writer.put_batch(pending_data_pages, ctx).await?;
|
||||
self.pending_bytes = 0;
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
@@ -1834,29 +1874,31 @@ impl<'a> DatadirModification<'a> {
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
// Commit should never be called mid-wal-record
|
||||
assert!(self.pending_zero_data_pages.is_empty());
|
||||
|
||||
let mut writer = self.tline.writer().await;
|
||||
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
if !self.pending_updates.is_empty() {
|
||||
// Ordering: the items in this batch do not need to be in any global order, but values for
|
||||
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
|
||||
// this to do efficient updates to its index.
|
||||
let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
|
||||
.pending_updates
|
||||
// Ordering: the items in this batch do not need to be in any global order, but values for
|
||||
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
|
||||
// this to do efficient updates to its index.
|
||||
let mut write_batch = std::mem::take(&mut self.pending_data_pages);
|
||||
|
||||
write_batch.extend(
|
||||
self.pending_metadata_pages
|
||||
.drain()
|
||||
.flat_map(|(key, values)| {
|
||||
values.into_iter().map(move |(lsn, val_ser_size, value)| {
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
bail!("the request contains data not supported by pageserver at TimelineWriter::put: {}", key);
|
||||
}
|
||||
Ok((key.to_compact(), lsn, val_ser_size, value))
|
||||
})
|
||||
})
|
||||
.collect::<anyhow::Result<Vec<_>>>()?;
|
||||
values
|
||||
.into_iter()
|
||||
.map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
|
||||
}),
|
||||
);
|
||||
|
||||
writer.put_batch(batch, ctx).await?;
|
||||
if !write_batch.is_empty() {
|
||||
writer.put_batch(write_batch, ctx).await?;
|
||||
}
|
||||
|
||||
if !self.pending_deletions.is_empty() {
|
||||
@@ -1887,33 +1929,58 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> usize {
|
||||
self.pending_updates.len() + self.pending_deletions.len()
|
||||
self.pending_metadata_pages.len()
|
||||
+ self.pending_data_pages.len()
|
||||
+ self.pending_deletions.len()
|
||||
}
|
||||
|
||||
// Internal helper functions to batch the modifications
|
||||
|
||||
/// Read a page from the Timeline we are writing to. For metadata pages, this passes through
|
||||
/// a cache in Self, which makes writes earlier in this modification visible to WAL records later
|
||||
/// in the modification.
|
||||
///
|
||||
/// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
|
||||
/// page must ensure that the pages they read are already committed in Timeline, for example
|
||||
/// DB create operations are always preceded by a call to commit(). This is special cased because
|
||||
/// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
|
||||
/// and not data pages.
|
||||
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
//
|
||||
// Note: we don't check pending_deletions. It is an error to request a
|
||||
// value that has been removed, deletion only avoids leaking storage.
|
||||
if let Some(values) = self.pending_updates.get(&key) {
|
||||
if let Some((_, _, value)) = values.last() {
|
||||
return if let Value::Image(img) = value {
|
||||
Ok(img.clone())
|
||||
} else {
|
||||
// Currently, we never need to read back a WAL record that we
|
||||
// inserted in the same "transaction". All the metadata updates
|
||||
// work directly with Images, and we never need to read actual
|
||||
// data pages. We could handle this if we had to, by calling
|
||||
// the walredo manager, but let's keep it simple for now.
|
||||
Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
)))
|
||||
};
|
||||
if !Self::is_data_key(&key) {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
//
|
||||
// Note: we don't check pending_deletions. It is an error to request a
|
||||
// value that has been removed, deletion only avoids leaking storage.
|
||||
if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
|
||||
if let Some((_, _, value)) = values.last() {
|
||||
return if let Value::Image(img) = value {
|
||||
Ok(img.clone())
|
||||
} else {
|
||||
// Currently, we never need to read back a WAL record that we
|
||||
// inserted in the same "transaction". All the metadata updates
|
||||
// work directly with Images, and we never need to read actual
|
||||
// data pages. We could handle this if we had to, by calling
|
||||
// the walredo manager, but let's keep it simple for now.
|
||||
Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
)))
|
||||
};
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// This is an expensive check, so we only do it in debug mode. If reading a data key,
|
||||
// this key should never be present in pending_data_pages. We ensure this by committing
|
||||
// modifications before ingesting DB create operations, which are the only kind that reads
|
||||
// data pages during ingest.
|
||||
if cfg!(debug_assertions) {
|
||||
for (dirty_key, _, _, _) in &self.pending_data_pages {
|
||||
debug_assert!(&key.to_compact() != dirty_key);
|
||||
}
|
||||
|
||||
debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
|
||||
}
|
||||
}
|
||||
|
||||
// Metadata page cache miss, or we're reading a data page.
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
@@ -1925,11 +1992,40 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
let values = self.pending_updates.entry(key).or_default();
|
||||
if Self::is_data_key(&key) {
|
||||
self.put_data(key.to_compact(), val)
|
||||
} else {
|
||||
self.put_metadata(key.to_compact(), val)
|
||||
}
|
||||
}
|
||||
|
||||
fn put_data(&mut self, key: CompactKey, val: Value) {
|
||||
let val_serialized_size = val.serialized_size().unwrap() as usize;
|
||||
|
||||
// If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This
|
||||
// is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
|
||||
// and the subsequent postgres-originating write
|
||||
if self.pending_zero_data_pages.remove(&key) {
|
||||
self.pending_bytes -= ZERO_PAGE.len();
|
||||
}
|
||||
|
||||
self.pending_bytes += val_serialized_size;
|
||||
self.pending_data_pages
|
||||
.push((key, self.lsn, val_serialized_size, val))
|
||||
}
|
||||
|
||||
fn put_metadata(&mut self, key: CompactKey, val: Value) {
|
||||
let values = self.pending_metadata_pages.entry(key).or_default();
|
||||
// Replace the previous value if it exists at the same lsn
|
||||
if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
|
||||
if *last_lsn == self.lsn {
|
||||
// Update the pending_bytes contribution from this entry, and update the serialized size in place
|
||||
self.pending_bytes -= *last_value_ser_size;
|
||||
*last_value_ser_size = val.serialized_size().unwrap() as usize;
|
||||
self.pending_bytes += *last_value_ser_size;
|
||||
|
||||
// Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
|
||||
// have been generated by synthesized zero page writes prior to the first real write to a page.
|
||||
*last_value = val;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -692,8 +692,13 @@ impl InMemoryLayer {
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
|
||||
if old.is_some() {
|
||||
// We already had an entry for this LSN. That's odd..
|
||||
warn!("Key {} at {} already exists", key, lsn);
|
||||
// This should not break anything, but is unexpected: ingestion code aims to filter out
|
||||
// multiple writes to the same key at the same LSN. This happens in cases where our
|
||||
// ingenstion code generates some write like an empty page, and we see a write from postgres
|
||||
// to the same key in the same wal record. If one such write makes it through, we
|
||||
// index the most recent write, implicitly ignoring the earlier write. We log a warning
|
||||
// because this case is unexpected, and we would like tests to fail if this happens.
|
||||
warn!("Key {} at {} written twice at same LSN", key, lsn);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::{
|
||||
task_mgr::{TaskKind, WALRECEIVER_RUNTIME},
|
||||
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
|
||||
walingest::WalIngest,
|
||||
walrecord::DecodedWALRecord,
|
||||
walrecord::{decode_wal_record, DecodedWALRecord},
|
||||
};
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
@@ -312,10 +312,25 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
{
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
let mut modification = timeline.begin_modification(startlsn);
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
@@ -324,9 +339,28 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
|
||||
}
|
||||
|
||||
// Deserialize WAL record
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version)?;
|
||||
|
||||
if decoded.is_dbase_create_copy(timeline.pg_version)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
// Special case: legacy PG database creations operate by reading pages from a 'template' database:
|
||||
// these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
|
||||
// all earlier writes of data blocks are visible by committing any modification in flight.
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
let ingested = walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {lsn}"))?;
|
||||
if !ingested {
|
||||
@@ -349,21 +383,25 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(uncommitted_records - filtered_records);
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
filtered_records = 0;
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the remaining records.
|
||||
if uncommitted_records > 0 {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(uncommitted_records - filtered_records);
|
||||
modification.commit(&ctx).await?;
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ use utils::lsn::Lsn;
|
||||
|
||||
pub struct WalIngest {
|
||||
shard: ShardIdentity,
|
||||
pg_version: u32,
|
||||
checkpoint: CheckPoint,
|
||||
checkpoint_modified: bool,
|
||||
warn_ingest_lag: WarnIngestLag,
|
||||
@@ -82,6 +83,7 @@ impl WalIngest {
|
||||
|
||||
Ok(WalIngest {
|
||||
shard: *timeline.get_shard_identity(),
|
||||
pg_version: timeline.pg_version,
|
||||
checkpoint,
|
||||
checkpoint_modified: false,
|
||||
warn_ingest_lag: WarnIngestLag {
|
||||
@@ -104,10 +106,9 @@ impl WalIngest {
|
||||
///
|
||||
pub async fn ingest_record(
|
||||
&mut self,
|
||||
recdata: Bytes,
|
||||
decoded: DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
decoded: &mut DecodedWALRecord,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<bool> {
|
||||
WAL_INGEST.records_received.inc();
|
||||
@@ -115,7 +116,12 @@ impl WalIngest {
|
||||
let prev_len = modification.len();
|
||||
|
||||
modification.set_lsn(lsn)?;
|
||||
decode_wal_record(recdata, decoded, pg_version)?;
|
||||
|
||||
if decoded.is_dbase_create_copy(self.pg_version) {
|
||||
// Records of this type should always be preceded by a commit(), as they
|
||||
// rely on reading data pages back from the Timeline.
|
||||
assert!(!modification.has_dirty_data_pages());
|
||||
}
|
||||
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
@@ -133,11 +139,11 @@ impl WalIngest {
|
||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||
// Heap AM records need some special handling, because they modify VM pages
|
||||
// without registering them with the standard mechanism.
|
||||
self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
|
||||
self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
|
||||
.await?;
|
||||
}
|
||||
pg_constants::RM_NEON_ID => {
|
||||
self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
|
||||
self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
|
||||
.await?;
|
||||
}
|
||||
// Handle other special record types
|
||||
@@ -325,7 +331,7 @@ impl WalIngest {
|
||||
}
|
||||
pg_constants::RM_RELMAP_ID => {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
|
||||
self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
|
||||
.await?;
|
||||
}
|
||||
pg_constants::RM_XLOG_ID => {
|
||||
@@ -470,7 +476,7 @@ impl WalIngest {
|
||||
|
||||
continue;
|
||||
}
|
||||
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
|
||||
self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -486,6 +492,8 @@ impl WalIngest {
|
||||
// until commit() is called to flush the data into the repository and update
|
||||
// the latest LSN.
|
||||
|
||||
modification.on_record_end();
|
||||
|
||||
Ok(modification.len() > prev_len)
|
||||
}
|
||||
|
||||
@@ -557,6 +565,7 @@ impl WalIngest {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
assert_eq!(image.len(), BLCKSZ as usize);
|
||||
|
||||
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
|
||||
.await?;
|
||||
} else {
|
||||
@@ -1195,7 +1204,7 @@ impl WalIngest {
|
||||
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
|
||||
// Tail of last remaining FSM page has to be zeroed.
|
||||
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
|
||||
modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
@@ -1217,7 +1226,7 @@ impl WalIngest {
|
||||
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
|
||||
// Tail of last remaining vm page has to be zeroed.
|
||||
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
|
||||
modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
|
||||
modification.put_rel_page_image_zero(rel, vm_page_no);
|
||||
vm_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
@@ -1687,7 +1696,7 @@ impl WalIngest {
|
||||
continue;
|
||||
}
|
||||
|
||||
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
|
||||
modification.put_rel_page_image_zero(rel, gap_blknum);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -1753,7 +1762,7 @@ impl WalIngest {
|
||||
|
||||
// fill the gap with zeros
|
||||
for gap_blknum in old_nblocks..blknum {
|
||||
modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
|
||||
modification.put_slru_page_image_zero(kind, segno, gap_blknum);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -1827,21 +1836,25 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.on_record_end();
|
||||
m.commit(&ctx).await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
|
||||
.await?;
|
||||
m.on_record_end();
|
||||
m.commit(&ctx).await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
|
||||
.await?;
|
||||
m.on_record_end();
|
||||
m.commit(&ctx).await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x50));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
|
||||
.await?;
|
||||
m.on_record_end();
|
||||
m.commit(&ctx).await?;
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(0x50));
|
||||
@@ -1983,6 +1996,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
|
||||
.await?;
|
||||
m.on_record_end();
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
@@ -2008,6 +2022,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
|
||||
.await?;
|
||||
m.on_record_end();
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
@@ -2409,7 +2424,6 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
let mut modification = tline.begin_modification(startpoint);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
println!("decoding {} bytes", bytes.len() - xlogoff);
|
||||
|
||||
// Decode and ingest wal. We process the wal in chunks because
|
||||
@@ -2417,8 +2431,10 @@ mod tests {
|
||||
for chunk in bytes[xlogoff..].chunks(50) {
|
||||
decoder.feed_bytes(chunk);
|
||||
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -160,6 +160,30 @@ pub struct DecodedWALRecord {
|
||||
pub origin_id: u16,
|
||||
}
|
||||
|
||||
impl DecodedWALRecord {
|
||||
/// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
|
||||
/// by reading other existing relations' data blocks. This is more complex to apply than new-style database
|
||||
/// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
|
||||
pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
|
||||
if self.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||
let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
match pg_version {
|
||||
14 => {
|
||||
// Postgres 14 database creations are always the legacy kind
|
||||
info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
|
||||
}
|
||||
15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
|
||||
16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
|
||||
_ => {
|
||||
panic!("Unsupported postgres version {pg_version}")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct RelFileNode {
|
||||
|
||||
6
poetry.lock
generated
6
poetry.lock
generated
@@ -1110,13 +1110,13 @@ dotenv = ["python-dotenv"]
|
||||
|
||||
[[package]]
|
||||
name = "flask-cors"
|
||||
version = "4.0.1"
|
||||
version = "5.0.0"
|
||||
description = "A Flask extension adding a decorator for CORS support"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "Flask_Cors-4.0.1-py2.py3-none-any.whl", hash = "sha256:f2a704e4458665580c074b714c4627dd5a306b333deb9074d0b1794dfa2fb677"},
|
||||
{file = "flask_cors-4.0.1.tar.gz", hash = "sha256:eeb69b342142fdbf4766ad99357a7f3876a2ceb77689dc10ff912aac06c389e4"},
|
||||
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
|
||||
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
||||
@@ -86,7 +86,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
/// Subscribe and fetch all the interesting data from the broker.
|
||||
#[instrument(name = "broker pull", skip_all)]
|
||||
#[instrument(name = "broker_pull", skip_all)]
|
||||
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
|
||||
|
||||
|
||||
@@ -389,6 +389,25 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Unevict timeline and remove uploaded partial segment(s) from the remote storage.
|
||||
/// Successfull response returns list of segments existed before the deletion.
|
||||
/// Aimed for one-off usage not normally needed.
|
||||
async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
let response = tli
|
||||
.backup_partial_reset()
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Used only in tests to hand craft required data.
|
||||
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
@@ -607,6 +626,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
|
||||
request_span(r, timeline_digest_handler)
|
||||
})
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
|
||||
|r| request_span(r, timeline_backup_partial_reset),
|
||||
)
|
||||
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
|
||||
request_span(r, record_safekeeper_info)
|
||||
})
|
||||
|
||||
@@ -183,10 +183,10 @@ impl WalResidentTimeline {
|
||||
"Replacing uploaded partial segment in in-mem control file: {replace:?}"
|
||||
);
|
||||
|
||||
let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?;
|
||||
let remote_timeline_path = &self.tli.remote_path;
|
||||
wal_backup::copy_partial_segment(
|
||||
&replace.previous.remote_path(&remote_timeline_path),
|
||||
&replace.current.remote_path(&remote_timeline_path),
|
||||
&replace.previous.remote_path(remote_timeline_path),
|
||||
&replace.current.remote_path(remote_timeline_path),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::{
|
||||
|
||||
/// Entrypoint for per timeline task which always runs, checking whether
|
||||
/// recovery for this safekeeper is needed and starting it if so.
|
||||
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
|
||||
#[instrument(name = "recovery", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn recovery_main(tli: WalResidentTimeline, conf: SafeKeeperConf) {
|
||||
info!("started");
|
||||
|
||||
|
||||
@@ -875,6 +875,29 @@ where
|
||||
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
|
||||
}
|
||||
|
||||
// Disallow any non-sequential writes, which can result in gaps or
|
||||
// overwrites. If we need to move the pointer, ProposerElected message
|
||||
// should have truncated WAL first accordingly. Note that the first
|
||||
// condition (WAL rewrite) is quite expected in real world; it happens
|
||||
// when walproposer reconnects to safekeeper and writes some more data
|
||||
// while first connection still gets some packets later. It might be
|
||||
// better to not log this as error! above.
|
||||
let write_lsn = self.wal_store.write_lsn();
|
||||
if write_lsn > msg.h.begin_lsn {
|
||||
bail!(
|
||||
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
|
||||
write_lsn,
|
||||
msg.h.begin_lsn
|
||||
);
|
||||
}
|
||||
if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
|
||||
bail!(
|
||||
"append request creates gap in written WAL, write_lsn={}, msg lsn={}",
|
||||
write_lsn,
|
||||
msg.h.begin_lsn,
|
||||
);
|
||||
}
|
||||
|
||||
// Now we know that we are in the same term as the proposer,
|
||||
// processing the message.
|
||||
|
||||
@@ -960,10 +983,7 @@ mod tests {
|
||||
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
state::{EvictionState, PersistedPeers, TimelinePersistentState},
|
||||
wal_storage::Storage,
|
||||
};
|
||||
use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};
|
||||
use std::{ops::Deref, str::FromStr, time::Instant};
|
||||
|
||||
// fake storage for tests
|
||||
@@ -1003,6 +1023,10 @@ mod tests {
|
||||
}
|
||||
|
||||
impl wal_storage::Storage for DummyWalStore {
|
||||
fn write_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
@@ -1076,7 +1100,7 @@ mod tests {
|
||||
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
term: 2,
|
||||
term_start_lsn: Lsn(3),
|
||||
begin_lsn: Lsn(1),
|
||||
end_lsn: Lsn(2),
|
||||
@@ -1090,24 +1114,29 @@ mod tests {
|
||||
};
|
||||
|
||||
let pem = ProposerElected {
|
||||
term: 1,
|
||||
start_streaming_at: Lsn(3),
|
||||
term_history: TermHistory(vec![TermLsn {
|
||||
term: 1,
|
||||
lsn: Lsn(3),
|
||||
}]),
|
||||
timeline_start_lsn: Lsn(0),
|
||||
term: 2,
|
||||
start_streaming_at: Lsn(1),
|
||||
term_history: TermHistory(vec![
|
||||
TermLsn {
|
||||
term: 1,
|
||||
lsn: Lsn(1),
|
||||
},
|
||||
TermLsn {
|
||||
term: 2,
|
||||
lsn: Lsn(3),
|
||||
},
|
||||
]),
|
||||
timeline_start_lsn: Lsn(1),
|
||||
};
|
||||
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// check that AppendRequest before term_start_lsn doesn't switch last_log_term.
|
||||
let resp = sk
|
||||
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await;
|
||||
assert!(resp.is_ok());
|
||||
assert_eq!(sk.get_last_log_term(), 0);
|
||||
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(sk.get_last_log_term(), 1);
|
||||
|
||||
// but record at term_start_lsn does the switch
|
||||
ar_hdr.begin_lsn = Lsn(2);
|
||||
@@ -1116,12 +1145,63 @@ mod tests {
|
||||
h: ar_hdr,
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
let resp = sk
|
||||
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await;
|
||||
assert!(resp.is_ok());
|
||||
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
|
||||
assert_eq!(sk.get_last_log_term(), 1);
|
||||
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(sk.get_last_log_term(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_non_consecutive_write() {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: test_sk_state(),
|
||||
};
|
||||
let wal_store = DummyWalStore { lsn: Lsn(0) };
|
||||
|
||||
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
|
||||
|
||||
let pem = ProposerElected {
|
||||
term: 1,
|
||||
start_streaming_at: Lsn(1),
|
||||
term_history: TermHistory(vec![TermLsn {
|
||||
term: 1,
|
||||
lsn: Lsn(1),
|
||||
}]),
|
||||
timeline_start_lsn: Lsn(1),
|
||||
};
|
||||
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
term_start_lsn: Lsn(3),
|
||||
begin_lsn: Lsn(1),
|
||||
end_lsn: Lsn(2),
|
||||
commit_lsn: Lsn(0),
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
|
||||
// do write ending at 2, it should be ok
|
||||
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await
|
||||
.unwrap();
|
||||
let mut ar_hrd2 = ar_hdr.clone();
|
||||
ar_hrd2.begin_lsn = Lsn(4);
|
||||
ar_hrd2.end_lsn = Lsn(5);
|
||||
let append_request = AppendRequest {
|
||||
h: ar_hdr,
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
// and now starting at 4, it must fail
|
||||
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use remote_storage::RemotePath;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs::{self};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -36,7 +37,7 @@ use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, Tim
|
||||
use crate::timeline_guard::ResidenceGuard;
|
||||
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::{self};
|
||||
use crate::wal_backup::{self, remote_timeline_path};
|
||||
use crate::wal_backup_partial::PartialRemoteSegment;
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
@@ -469,6 +470,7 @@ impl From<TimelineError> for ApiError {
|
||||
/// It also holds SharedState and provides mutually exclusive access to it.
|
||||
pub struct Timeline {
|
||||
pub ttid: TenantTimelineId,
|
||||
pub remote_path: RemotePath,
|
||||
|
||||
/// Used to broadcast commit_lsn updates to all background jobs.
|
||||
commit_lsn_watch_tx: watch::Sender<Lsn>,
|
||||
@@ -519,8 +521,10 @@ impl Timeline {
|
||||
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
remote_path,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
term_flush_lsn_watch_tx,
|
||||
@@ -557,8 +561,10 @@ impl Timeline {
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
remote_path,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
term_flush_lsn_watch_tx,
|
||||
@@ -902,6 +908,10 @@ impl Timeline {
|
||||
|
||||
Ok(WalResidentTimeline::new(self.clone(), guard))
|
||||
}
|
||||
|
||||
pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
|
||||
self.manager_ctl.backup_partial_reset().await
|
||||
}
|
||||
}
|
||||
|
||||
/// This is a guard that allows to read/write disk timeline state.
|
||||
|
||||
@@ -28,28 +28,38 @@ impl Manager {
|
||||
/// - control file is flushed (no next event scheduled)
|
||||
/// - no WAL residence guards
|
||||
/// - no pushes to the broker
|
||||
/// - partial WAL backup is uploaded
|
||||
/// - last partial WAL segment is uploaded
|
||||
/// - all local segments before the uploaded partial are committed and uploaded
|
||||
pub(crate) fn ready_for_eviction(
|
||||
&self,
|
||||
next_event: &Option<tokio::time::Instant>,
|
||||
state: &StateSnapshot,
|
||||
) -> bool {
|
||||
self.backup_task.is_none()
|
||||
let ready = self.backup_task.is_none()
|
||||
&& self.recovery_task.is_none()
|
||||
&& self.wal_removal_task.is_none()
|
||||
&& self.partial_backup_task.is_none()
|
||||
&& self.partial_backup_uploaded.is_some()
|
||||
&& next_event.is_none()
|
||||
&& self.access_service.is_empty()
|
||||
&& !self.tli_broker_active.get()
|
||||
// Partial segment of current flush_lsn is uploaded up to this flush_lsn.
|
||||
&& !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
|
||||
// And it is the next one after the last removed. Given that local
|
||||
// WAL is removed only after it is uploaded to s3 (and pageserver
|
||||
// advancing remote_consistent_lsn) which happens only after WAL is
|
||||
// committed, true means all this is done.
|
||||
//
|
||||
// This also works for the first segment despite last_removed_segno
|
||||
// being 0 on init because this 0 triggers run of wal_removal_task
|
||||
// on success of which manager updates the horizon.
|
||||
&& self
|
||||
.partial_backup_uploaded
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.flush_lsn
|
||||
.segment_number(self.wal_seg_size)
|
||||
== self.last_removed_segno + 1
|
||||
== self.last_removed_segno + 1;
|
||||
ready
|
||||
}
|
||||
|
||||
/// Evict the timeline to remote storage.
|
||||
@@ -83,7 +93,8 @@ impl Manager {
|
||||
info!("successfully evicted timeline");
|
||||
}
|
||||
|
||||
/// Restore evicted timeline from remote storage.
|
||||
/// Attempt to restore evicted timeline from remote storage; it must be
|
||||
/// offloaded.
|
||||
#[instrument(name = "unevict_timeline", skip_all)]
|
||||
pub(crate) async fn unevict_timeline(&mut self) {
|
||||
assert!(self.is_offloaded);
|
||||
@@ -167,7 +178,7 @@ async fn redownload_partial_segment(
|
||||
partial: &PartialRemoteSegment,
|
||||
) -> anyhow::Result<()> {
|
||||
let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
|
||||
let remote_segfile = remote_segment_path(mgr, partial)?;
|
||||
let remote_segfile = remote_segment_path(mgr, partial);
|
||||
|
||||
debug!(
|
||||
"redownloading partial segment: {} -> {}",
|
||||
@@ -252,7 +263,7 @@ async fn do_validation(
|
||||
);
|
||||
}
|
||||
|
||||
let remote_segfile = remote_segment_path(mgr, partial)?;
|
||||
let remote_segfile = remote_segment_path(mgr, partial);
|
||||
let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
|
||||
wal_backup::read_object(&remote_segfile, 0).await?;
|
||||
|
||||
@@ -279,12 +290,8 @@ fn local_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> Utf8Path
|
||||
local_partial_segfile
|
||||
}
|
||||
|
||||
fn remote_segment_path(
|
||||
mgr: &Manager,
|
||||
partial: &PartialRemoteSegment,
|
||||
) -> anyhow::Result<RemotePath> {
|
||||
let remote_timeline_path = wal_backup::remote_timeline_path(&mgr.tli.ttid)?;
|
||||
Ok(partial.remote_path(&remote_timeline_path))
|
||||
fn remote_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> RemotePath {
|
||||
partial.remote_path(&mgr.tli.remote_path)
|
||||
}
|
||||
|
||||
/// Compare first `n` bytes of two readers. If the bytes differ, return an error.
|
||||
|
||||
@@ -11,12 +11,14 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{
|
||||
task::{JoinError, JoinHandle},
|
||||
time::Instant,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, info_span, instrument, warn, Instrument};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -33,7 +35,7 @@ use crate::{
|
||||
timeline_guard::{AccessService, GuardId, ResidenceGuard},
|
||||
timelines_set::{TimelineSetGuard, TimelinesSet},
|
||||
wal_backup::{self, WalBackupTaskHandle},
|
||||
wal_backup_partial::{self, PartialRemoteSegment},
|
||||
wal_backup_partial::{self, PartialBackup, PartialRemoteSegment},
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
@@ -96,6 +98,8 @@ pub enum ManagerCtlMessage {
|
||||
GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
|
||||
/// Request to drop the guard.
|
||||
GuardDrop(GuardId),
|
||||
/// Request to reset uploaded partial backup state.
|
||||
BackupPartialReset(oneshot::Sender<anyhow::Result<Vec<String>>>),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ManagerCtlMessage {
|
||||
@@ -103,6 +107,7 @@ impl std::fmt::Debug for ManagerCtlMessage {
|
||||
match self {
|
||||
ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
|
||||
ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
|
||||
ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -143,6 +148,19 @@ impl ManagerCtl {
|
||||
.and_then(std::convert::identity)
|
||||
}
|
||||
|
||||
/// Request timeline manager to reset uploaded partial segment state and
|
||||
/// wait for the result.
|
||||
pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.manager_tx
|
||||
.send(ManagerCtlMessage::BackupPartialReset(tx))
|
||||
.expect("manager task is not running");
|
||||
match rx.await {
|
||||
Ok(res) => res,
|
||||
Err(_) => anyhow::bail!("timeline manager is gone"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Must be called exactly once to bootstrap the manager.
|
||||
pub fn bootstrap_manager(
|
||||
&self,
|
||||
@@ -181,7 +199,8 @@ pub(crate) struct Manager {
|
||||
pub(crate) wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>>,
|
||||
|
||||
// partial backup
|
||||
pub(crate) partial_backup_task: Option<JoinHandle<Option<PartialRemoteSegment>>>,
|
||||
pub(crate) partial_backup_task:
|
||||
Option<(JoinHandle<Option<PartialRemoteSegment>>, CancellationToken)>,
|
||||
pub(crate) partial_backup_uploaded: Option<PartialRemoteSegment>,
|
||||
|
||||
// misc
|
||||
@@ -302,12 +321,12 @@ pub async fn main_task(
|
||||
_ = sleep_until(&next_event) => {
|
||||
// we were waiting for some event (e.g. cfile save)
|
||||
}
|
||||
res = await_task_finish(&mut mgr.wal_removal_task) => {
|
||||
res = await_task_finish(mgr.wal_removal_task.as_mut()) => {
|
||||
// WAL removal task finished
|
||||
mgr.wal_removal_task = None;
|
||||
mgr.update_wal_removal_end(res);
|
||||
}
|
||||
res = await_task_finish(&mut mgr.partial_backup_task) => {
|
||||
res = await_task_finish(mgr.partial_backup_task.as_mut().map(|(handle, _)| handle)) => {
|
||||
// partial backup task finished
|
||||
mgr.partial_backup_task = None;
|
||||
mgr.update_partial_backup_end(res);
|
||||
@@ -335,8 +354,9 @@ pub async fn main_task(
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(partial_backup_task) = &mut mgr.partial_backup_task {
|
||||
if let Err(e) = partial_backup_task.await {
|
||||
if let Some((handle, cancel)) = &mut mgr.partial_backup_task {
|
||||
cancel.cancel();
|
||||
if let Err(e) = handle.await {
|
||||
warn!("partial backup task failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
@@ -560,11 +580,14 @@ impl Manager {
|
||||
}
|
||||
|
||||
// Get WalResidentTimeline and start partial backup task.
|
||||
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
|
||||
let cancel = CancellationToken::new();
|
||||
let handle = tokio::spawn(wal_backup_partial::main_task(
|
||||
self.wal_resident_timeline(),
|
||||
self.conf.clone(),
|
||||
self.global_rate_limiter.clone(),
|
||||
)));
|
||||
cancel.clone(),
|
||||
));
|
||||
self.partial_backup_task = Some((handle, cancel));
|
||||
}
|
||||
|
||||
/// Update the state after partial WAL backup task finished.
|
||||
@@ -579,6 +602,39 @@ impl Manager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset partial backup state and remove its remote storage data. Since it
|
||||
/// might concurrently uploading something, cancel the task first.
|
||||
async fn backup_partial_reset(&mut self) -> anyhow::Result<Vec<String>> {
|
||||
info!("resetting partial backup state");
|
||||
// Force unevict timeline if it is evicted before erasing partial backup
|
||||
// state. The intended use of this function is to drop corrupted remote
|
||||
// state; we haven't enabled local files deletion yet anywhere,
|
||||
// so direct switch is safe.
|
||||
if self.is_offloaded {
|
||||
self.tli.switch_to_present().await?;
|
||||
// switch manager state as soon as possible
|
||||
self.is_offloaded = false;
|
||||
}
|
||||
|
||||
if let Some((handle, cancel)) = &mut self.partial_backup_task {
|
||||
cancel.cancel();
|
||||
info!("cancelled partial backup task, awaiting it");
|
||||
// we're going to reset .partial_backup_uploaded to None anyway, so ignore the result
|
||||
handle.await.ok();
|
||||
self.partial_backup_task = None;
|
||||
}
|
||||
|
||||
let tli = self.wal_resident_timeline();
|
||||
let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await;
|
||||
// Reset might fail e.g. when cfile is already reset but s3 removal
|
||||
// failed, so set manager state to None beforehand. In any case caller
|
||||
// is expected to retry until success.
|
||||
self.partial_backup_uploaded = None;
|
||||
let res = partial_backup.reset().await?;
|
||||
info!("reset is done");
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Handle message arrived from ManagerCtl.
|
||||
async fn handle_message(&mut self, msg: Option<ManagerCtlMessage>) {
|
||||
debug!("received manager message: {:?}", msg);
|
||||
@@ -602,6 +658,16 @@ impl Manager {
|
||||
Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
|
||||
self.access_service.drop_guard(guard_id);
|
||||
}
|
||||
Some(ManagerCtlMessage::BackupPartialReset(tx)) => {
|
||||
info!("resetting uploaded partial backup state");
|
||||
let res = self.backup_partial_reset().await;
|
||||
if let Err(ref e) = res {
|
||||
warn!("failed to reset partial backup state: {:?}", e);
|
||||
}
|
||||
if tx.send(res).is_err() {
|
||||
warn!("failed to send partial backup reset result, receiver dropped");
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// can't happen, we're holding the sender
|
||||
unreachable!();
|
||||
@@ -619,7 +685,11 @@ async fn sleep_until(option: &Option<tokio::time::Instant>) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn await_task_finish<T>(option: &mut Option<JoinHandle<T>>) -> Result<T, JoinError> {
|
||||
/// Future that resolves when the task is finished or never if the task is None.
|
||||
///
|
||||
/// Note: it accepts Option<&mut> instead of &mut Option<> because mapping the
|
||||
/// option to get the latter is hard.
|
||||
async fn await_task_finish<T>(option: Option<&mut JoinHandle<T>>) -> Result<T, JoinError> {
|
||||
if let Some(task) = option {
|
||||
task.await
|
||||
} else {
|
||||
|
||||
@@ -203,7 +203,7 @@ struct WalBackupTask {
|
||||
}
|
||||
|
||||
/// Offload single timeline.
|
||||
#[instrument(name = "WAL backup", skip_all, fields(ttid = %tli.ttid))]
|
||||
#[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))]
|
||||
async fn backup_task_main(
|
||||
tli: WalResidentTimeline,
|
||||
parallel_jobs: usize,
|
||||
@@ -315,7 +315,7 @@ async fn backup_lsn_range(
|
||||
anyhow::bail!("parallel_jobs must be >= 1");
|
||||
}
|
||||
|
||||
let remote_timeline_path = remote_timeline_path(&timeline.ttid)?;
|
||||
let remote_timeline_path = &timeline.remote_path;
|
||||
let start_lsn = *backup_lsn;
|
||||
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
|
||||
|
||||
@@ -328,11 +328,7 @@ async fn backup_lsn_range(
|
||||
loop {
|
||||
let added_task = match iter.next() {
|
||||
Some(s) => {
|
||||
uploads.push_back(backup_single_segment(
|
||||
s,
|
||||
timeline_dir,
|
||||
&remote_timeline_path,
|
||||
));
|
||||
uploads.push_back(backup_single_segment(s, timeline_dir, remote_timeline_path));
|
||||
true
|
||||
}
|
||||
None => false,
|
||||
|
||||
@@ -22,6 +22,7 @@ use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use remote_storage::RemotePath;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
|
||||
@@ -31,7 +32,7 @@ use crate::{
|
||||
safekeeper::Term,
|
||||
timeline::WalResidentTimeline,
|
||||
timeline_manager::StateSnapshot,
|
||||
wal_backup::{self, remote_timeline_path},
|
||||
wal_backup::{self},
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
@@ -145,7 +146,7 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
struct PartialBackup {
|
||||
pub struct PartialBackup {
|
||||
wal_seg_size: usize,
|
||||
tli: WalResidentTimeline,
|
||||
conf: SafeKeeperConf,
|
||||
@@ -155,8 +156,25 @@ struct PartialBackup {
|
||||
state: State,
|
||||
}
|
||||
|
||||
// Read-only methods for getting segment names
|
||||
impl PartialBackup {
|
||||
pub async fn new(tli: WalResidentTimeline, conf: SafeKeeperConf) -> PartialBackup {
|
||||
let (_, persistent_state) = tli.get_state().await;
|
||||
let wal_seg_size = tli.get_wal_seg_size().await;
|
||||
|
||||
let local_prefix = tli.get_timeline_dir();
|
||||
let remote_timeline_path = tli.remote_path.clone();
|
||||
|
||||
PartialBackup {
|
||||
wal_seg_size,
|
||||
tli,
|
||||
state: persistent_state.partial_backup,
|
||||
conf,
|
||||
local_prefix,
|
||||
remote_timeline_path,
|
||||
}
|
||||
}
|
||||
|
||||
// Read-only methods for getting segment names
|
||||
fn segno(&self, lsn: Lsn) -> XLogSegNo {
|
||||
lsn.segment_number(self.wal_seg_size)
|
||||
}
|
||||
@@ -297,6 +315,18 @@ impl PartialBackup {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Prepend to the given segments remote prefix and delete them from the
|
||||
// remote storage.
|
||||
async fn delete_segments(&self, segments_to_delete: &Vec<String>) -> anyhow::Result<()> {
|
||||
info!("deleting objects: {:?}", segments_to_delete);
|
||||
let mut objects_to_delete = vec![];
|
||||
for seg in segments_to_delete.iter() {
|
||||
let remote_path = self.remote_timeline_path.join(seg);
|
||||
objects_to_delete.push(remote_path);
|
||||
}
|
||||
wal_backup::delete_objects(&objects_to_delete).await
|
||||
}
|
||||
|
||||
/// Delete all non-Uploaded segments from the remote storage. There should be only one
|
||||
/// Uploaded segment at a time.
|
||||
#[instrument(name = "gc", skip_all)]
|
||||
@@ -329,15 +359,8 @@ impl PartialBackup {
|
||||
);
|
||||
}
|
||||
|
||||
info!("deleting objects: {:?}", segments_to_delete);
|
||||
let mut objects_to_delete = vec![];
|
||||
for seg in segments_to_delete.iter() {
|
||||
let remote_path = self.remote_timeline_path.join(seg);
|
||||
objects_to_delete.push(remote_path);
|
||||
}
|
||||
|
||||
// removing segments from remote storage
|
||||
wal_backup::delete_objects(&objects_to_delete).await?;
|
||||
// execute the deletion
|
||||
self.delete_segments(&segments_to_delete).await?;
|
||||
|
||||
// now we can update the state on disk
|
||||
let new_state = {
|
||||
@@ -349,6 +372,27 @@ impl PartialBackup {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove uploaded segment(s) from the state and remote storage. Aimed for
|
||||
/// manual intervention, not normally needed.
|
||||
/// Returns list of segments which potentially existed in the remote storage.
|
||||
pub async fn reset(&mut self) -> anyhow::Result<Vec<String>> {
|
||||
let segments_to_delete = self
|
||||
.state
|
||||
.segments
|
||||
.iter()
|
||||
.map(|seg| seg.name.clone())
|
||||
.collect();
|
||||
|
||||
// First reset cfile state, and only then objects themselves. If the
|
||||
// later fails we might leave some garbage behind; that's ok for this
|
||||
// single time usage.
|
||||
let new_state = State { segments: vec![] };
|
||||
self.commit_state(new_state).await?;
|
||||
|
||||
self.delete_segments(&segments_to_delete).await?;
|
||||
Ok(segments_to_delete)
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if everything is uploaded and partial backup task doesn't need to run.
|
||||
@@ -372,38 +416,21 @@ pub(crate) fn needs_uploading(
|
||||
///
|
||||
/// When there is nothing more to do and the last segment was successfully uploaded, the task
|
||||
/// returns PartialRemoteSegment, to signal readiness for offloading the timeline.
|
||||
#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
|
||||
#[instrument(name = "partial_backup", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn main_task(
|
||||
tli: WalResidentTimeline,
|
||||
conf: SafeKeeperConf,
|
||||
limiter: RateLimiter,
|
||||
cancel: CancellationToken,
|
||||
) -> Option<PartialRemoteSegment> {
|
||||
debug!("started");
|
||||
let await_duration = conf.partial_backup_timeout;
|
||||
let mut first_iteration = true;
|
||||
|
||||
let (_, persistent_state) = tli.get_state().await;
|
||||
let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
|
||||
let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
|
||||
let wal_seg_size = tli.get_wal_seg_size().await;
|
||||
|
||||
let local_prefix = tli.get_timeline_dir();
|
||||
let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
|
||||
Ok(path) => path,
|
||||
Err(e) => {
|
||||
error!("failed to create remote path: {:?}", e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let mut backup = PartialBackup {
|
||||
wal_seg_size,
|
||||
tli,
|
||||
state: persistent_state.partial_backup,
|
||||
conf,
|
||||
local_prefix,
|
||||
remote_timeline_path,
|
||||
};
|
||||
let mut backup = PartialBackup::new(tli, conf).await;
|
||||
|
||||
debug!("state: {:?}", backup.state);
|
||||
|
||||
@@ -433,6 +460,10 @@ pub async fn main_task(
|
||||
&& flush_lsn_rx.borrow().term == seg.term
|
||||
{
|
||||
// we have nothing to do, the last segment is already uploaded
|
||||
debug!(
|
||||
"exiting, uploaded up to term={} flush_lsn={} commit_lsn={}",
|
||||
seg.term, seg.flush_lsn, seg.commit_lsn
|
||||
);
|
||||
return Some(seg.clone());
|
||||
}
|
||||
}
|
||||
@@ -444,6 +475,10 @@ pub async fn main_task(
|
||||
info!("timeline canceled");
|
||||
return None;
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
info!("task canceled");
|
||||
return None;
|
||||
}
|
||||
_ = flush_lsn_rx.changed() => {}
|
||||
}
|
||||
}
|
||||
@@ -470,6 +505,10 @@ pub async fn main_task(
|
||||
info!("timeline canceled");
|
||||
return None;
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
info!("task canceled");
|
||||
return None;
|
||||
}
|
||||
_ = commit_lsn_rx.changed() => {}
|
||||
_ = flush_lsn_rx.changed() => {
|
||||
let segno = backup.segno(flush_lsn_rx.borrow().lsn);
|
||||
@@ -492,7 +531,13 @@ pub async fn main_task(
|
||||
}
|
||||
|
||||
// limit concurrent uploads
|
||||
let _upload_permit = limiter.acquire_partial_backup().await;
|
||||
let _upload_permit = tokio::select! {
|
||||
acq = limiter.acquire_partial_backup() => acq,
|
||||
_ = cancel.cancelled() => {
|
||||
info!("task canceled");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let prepared = backup.prepare_upload().await;
|
||||
if let Some(seg) = &uploaded_segment {
|
||||
|
||||
@@ -37,6 +37,8 @@ use pq_proto::SystemId;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
pub trait Storage {
|
||||
// Last written LSN.
|
||||
fn write_lsn(&self) -> Lsn;
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
|
||||
@@ -329,6 +331,10 @@ impl PhysicalStorage {
|
||||
}
|
||||
|
||||
impl Storage for PhysicalStorage {
|
||||
// Last written LSN.
|
||||
fn write_lsn(&self) -> Lsn {
|
||||
self.write_lsn
|
||||
}
|
||||
/// flush_lsn returns LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
|
||||
@@ -175,6 +175,10 @@ impl DiskWALStorage {
|
||||
}
|
||||
|
||||
impl wal_storage::Storage for DiskWALStorage {
|
||||
// Last written LSN.
|
||||
fn write_lsn(&self) -> Lsn {
|
||||
self.write_lsn
|
||||
}
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
-- This file should undo anything in `up.sql`
|
||||
DROP TABLE safekeepers;
|
||||
@@ -0,0 +1,15 @@
|
||||
-- started out as a copy of cplane schema, removed the unnecessary columns.
|
||||
CREATE TABLE safekeepers (
|
||||
-- the surrogate identifier defined by control plane database sequence
|
||||
id BIGINT PRIMARY KEY,
|
||||
region_id TEXT NOT NULL,
|
||||
version BIGINT NOT NULL,
|
||||
-- the natural id on whatever cloud platform, not needed in storage controller
|
||||
-- instance_id TEXT UNIQUE NOT NULL,
|
||||
host TEXT NOT NULL,
|
||||
port INTEGER NOT NULL,
|
||||
active BOOLEAN NOT NULL DEFAULT false,
|
||||
-- projects_count INTEGER NOT NULL DEFAULT 0,
|
||||
http_port INTEGER NOT NULL,
|
||||
availability_zone_id TEXT NOT NULL
|
||||
);
|
||||
@@ -2,6 +2,7 @@ use crate::metrics::{
|
||||
HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
|
||||
METRICS_REGISTRY,
|
||||
};
|
||||
use crate::persistence::SafekeeperPersistence;
|
||||
use crate::reconciler::ReconcileError;
|
||||
use crate::service::{LeadershipStatus, Service, STARTUP_RECONCILE_TIMEOUT};
|
||||
use anyhow::Context;
|
||||
@@ -767,6 +768,55 @@ impl From<ReconcileError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the safekeeper record by instance id, or 404.
|
||||
///
|
||||
/// Not used by anything except manual testing.
|
||||
async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let id = parse_request_param::<i64>(&req, "id")?;
|
||||
|
||||
let state = get_state(&req);
|
||||
|
||||
let res = state.service.get_safekeeper(id).await;
|
||||
|
||||
match res {
|
||||
Ok(b) => json_response(StatusCode::OK, b),
|
||||
Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
|
||||
Err(ApiError::NotFound("unknown instance_id".into()))
|
||||
}
|
||||
Err(other) => Err(other.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Used as part of deployment scripts.
|
||||
///
|
||||
/// Assumes information is only relayed to storage controller after first selecting an unique id on
|
||||
/// control plane database, which means we have an id field in the request and payload.
|
||||
async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let body = json_request::<SafekeeperPersistence>(&mut req).await?;
|
||||
let id = parse_request_param::<i64>(&req, "id")?;
|
||||
|
||||
if id != body.id {
|
||||
// it should be repeated
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"id mismatch: url={id:?}, body={:?}",
|
||||
body.id
|
||||
)));
|
||||
}
|
||||
|
||||
let state = get_state(&req);
|
||||
|
||||
state.service.upsert_safekeeper(body).await?;
|
||||
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(Body::empty())
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
|
||||
/// be allowed to run if Service has finished its initial reconciliation.
|
||||
async fn tenant_service_handler<R, H>(
|
||||
@@ -1127,6 +1177,13 @@ pub fn make_router(
|
||||
.put("/control/v1/step_down", |r| {
|
||||
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
|
||||
})
|
||||
.get("/control/v1/safekeeper/:id", |r| {
|
||||
named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
|
||||
})
|
||||
.post("/control/v1/safekeeper/:id", |r| {
|
||||
// id is in the body
|
||||
named_request_span(r, handle_upsert_safekeeper, RequestName("v1_safekeeper"))
|
||||
})
|
||||
// Tenant operations
|
||||
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
||||
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
||||
|
||||
@@ -11,8 +11,8 @@ use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::{
|
||||
Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
|
||||
RECONCILER_CONCURRENCY_DEFAULT,
|
||||
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, MAX_OFFLINE_INTERVAL_DEFAULT,
|
||||
MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
|
||||
};
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -104,6 +104,10 @@ struct Cli {
|
||||
// a pageserver
|
||||
#[arg(long)]
|
||||
max_secondary_lag_bytes: Option<u64>,
|
||||
|
||||
// Period with which to send heartbeats to registered nodes
|
||||
#[arg(long)]
|
||||
heartbeat_interval: Option<humantime::Duration>,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -285,6 +289,10 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
split_threshold: args.split_threshold,
|
||||
neon_local_repo_dir: args.neon_local_repo_dir,
|
||||
max_secondary_lag_bytes: args.max_secondary_lag_bytes,
|
||||
heartbeat_interval: args
|
||||
.heartbeat_interval
|
||||
.map(humantime::Duration::into)
|
||||
.unwrap_or(HEARTBEAT_INTERVAL_DEFAULT),
|
||||
address_for_peers: args.address_for_peers,
|
||||
start_as_candidate: args.start_as_candidate,
|
||||
http_service_port: args.listen.port() as i32,
|
||||
|
||||
@@ -938,6 +938,48 @@ impl Persistence {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeeper_get(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<SafekeeperPersistence, DatabaseError> {
|
||||
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
|
||||
self.with_conn(move |conn| -> DatabaseResult<SafekeeperPersistence> {
|
||||
Ok(safekeepers
|
||||
.filter(id_column.eq(&id))
|
||||
.select(SafekeeperPersistence::as_select())
|
||||
.get_result(conn)?)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeeper_upsert(
|
||||
&self,
|
||||
record: SafekeeperPersistence,
|
||||
) -> Result<(), DatabaseError> {
|
||||
use crate::schema::safekeepers::dsl::*;
|
||||
|
||||
self.with_conn(move |conn| -> DatabaseResult<()> {
|
||||
let bind = record.as_insert_or_update();
|
||||
|
||||
let inserted_updated = diesel::insert_into(safekeepers)
|
||||
.values(&bind)
|
||||
.on_conflict(id)
|
||||
.do_update()
|
||||
.set(&bind)
|
||||
.execute(conn)?;
|
||||
|
||||
if inserted_updated != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({})",
|
||||
inserted_updated
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
|
||||
@@ -1073,3 +1115,47 @@ pub(crate) struct ControllerPersistence {
|
||||
pub(crate) address: String,
|
||||
pub(crate) started_at: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Queryable, Selectable, Eq, PartialEq, Debug, Clone)]
|
||||
#[diesel(table_name = crate::schema::safekeepers)]
|
||||
pub(crate) struct SafekeeperPersistence {
|
||||
pub(crate) id: i64,
|
||||
pub(crate) region_id: String,
|
||||
/// 1 is special, it means just created (not currently posted to storcon).
|
||||
/// Zero or negative is not really expected.
|
||||
/// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
|
||||
pub(crate) version: i64,
|
||||
pub(crate) host: String,
|
||||
pub(crate) port: i32,
|
||||
pub(crate) active: bool,
|
||||
pub(crate) http_port: i32,
|
||||
pub(crate) availability_zone_id: String,
|
||||
}
|
||||
|
||||
impl SafekeeperPersistence {
|
||||
fn as_insert_or_update(&self) -> InsertUpdateSafekeeper<'_> {
|
||||
InsertUpdateSafekeeper {
|
||||
id: self.id,
|
||||
region_id: &self.region_id,
|
||||
version: self.version,
|
||||
host: &self.host,
|
||||
port: self.port,
|
||||
active: self.active,
|
||||
http_port: self.http_port,
|
||||
availability_zone_id: &self.availability_zone_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Insertable, AsChangeset)]
|
||||
#[diesel(table_name = crate::schema::safekeepers)]
|
||||
struct InsertUpdateSafekeeper<'a> {
|
||||
id: i64,
|
||||
region_id: &'a str,
|
||||
version: i64,
|
||||
host: &'a str,
|
||||
port: i32,
|
||||
active: bool,
|
||||
http_port: i32,
|
||||
availability_zone_id: &'a str,
|
||||
}
|
||||
|
||||
@@ -45,3 +45,17 @@ diesel::table! {
|
||||
}
|
||||
|
||||
diesel::allow_tables_to_appear_in_same_query!(controllers, metadata_health, nodes, tenant_shards,);
|
||||
|
||||
diesel::table! {
|
||||
safekeepers {
|
||||
id -> Int8,
|
||||
region_id -> Text,
|
||||
version -> Int8,
|
||||
instance_id -> Text,
|
||||
host -> Text,
|
||||
port -> Int4,
|
||||
active -> Bool,
|
||||
http_port -> Int4,
|
||||
availability_zone_id -> Text,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,6 +121,9 @@ pub const MAX_OFFLINE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
|
||||
/// being handled on the pageserver side.
|
||||
pub const MAX_WARMING_UP_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
|
||||
|
||||
/// How often to send heartbeats to registered nodes?
|
||||
pub const HEARTBEAT_INTERVAL_DEFAULT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[derive(Clone, strum_macros::Display)]
|
||||
enum TenantOperations {
|
||||
Create,
|
||||
@@ -326,6 +329,8 @@ pub struct Config {
|
||||
// upgraded to primary.
|
||||
pub max_secondary_lag_bytes: Option<u64>,
|
||||
|
||||
pub heartbeat_interval: Duration,
|
||||
|
||||
pub address_for_peers: Option<Uri>,
|
||||
|
||||
pub start_as_candidate: bool,
|
||||
@@ -909,9 +914,7 @@ impl Service {
|
||||
async fn spawn_heartbeat_driver(&self) {
|
||||
self.startup_complete.clone().wait().await;
|
||||
|
||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL);
|
||||
let mut interval = tokio::time::interval(self.config.heartbeat_interval);
|
||||
while !self.cancel.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => { }
|
||||
@@ -6473,4 +6476,18 @@ impl Service {
|
||||
|
||||
global_observed
|
||||
}
|
||||
|
||||
pub(crate) async fn get_safekeeper(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<crate::persistence::SafekeeperPersistence, DatabaseError> {
|
||||
self.persistence.safekeeper_get(id).await
|
||||
}
|
||||
|
||||
pub(crate) async fn upsert_safekeeper(
|
||||
&self,
|
||||
record: crate::persistence::SafekeeperPersistence,
|
||||
) -> Result<(), DatabaseError> {
|
||||
self.persistence.safekeeper_upsert(record).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -422,7 +422,7 @@ fn stream_objects_with_retries<'a>(
|
||||
let yield_err = if err.is_permanent() {
|
||||
true
|
||||
} else {
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
let backoff_time = 1 << trial.min(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
trial += 1;
|
||||
trial == MAX_RETRIES - 1
|
||||
@@ -473,7 +473,7 @@ async fn list_objects_with_retries(
|
||||
s3_target.delimiter,
|
||||
DisplayErrorContext(e),
|
||||
);
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
let backoff_time = 1 << trial.min(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
}
|
||||
}
|
||||
@@ -492,7 +492,7 @@ async fn download_object_with_retries(
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
error!("Failed to download object for key {key}: {e}");
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
let backoff_time = 1 << trial.min(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
continue;
|
||||
}
|
||||
@@ -508,7 +508,7 @@ async fn download_object_with_retries(
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to stream object body for key {key}: {e}");
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
let backoff_time = 1 << trial.min(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1164,6 +1164,8 @@ class NeonEnv:
|
||||
"listen_http_addr": f"localhost:{pageserver_port.http}",
|
||||
"pg_auth_type": pg_auth_type,
|
||||
"http_auth_type": http_auth_type,
|
||||
# Default which can be overriden with `NeonEnvBuilder.pageserver_config_override`
|
||||
"availability_zone": "us-east-2a",
|
||||
}
|
||||
if self.pageserver_virtual_file_io_engine is not None:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
@@ -1192,11 +1194,7 @@ class NeonEnv:
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
NeonPageserver(
|
||||
self,
|
||||
ps_id,
|
||||
port=pageserver_port,
|
||||
)
|
||||
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
|
||||
)
|
||||
cfg["pageservers"].append(ps_cfg)
|
||||
|
||||
@@ -2400,6 +2398,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
"listen_http_port": node.service_port.http,
|
||||
"listen_pg_addr": "localhost",
|
||||
"listen_pg_port": node.service_port.pg,
|
||||
"availability_zone_id": node.az_id,
|
||||
}
|
||||
log.info(f"node_register({body})")
|
||||
self.request(
|
||||
@@ -2846,6 +2845,29 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
|
||||
raise AssertionError("unreachable")
|
||||
|
||||
def on_safekeeper_deploy(self, id: int, body: dict[str, Any]):
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.api}/control/v1/safekeeper/{id}",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
json=body,
|
||||
)
|
||||
|
||||
def get_safekeeper(self, id: int) -> Optional[dict[str, Any]]:
|
||||
try:
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.api}/control/v1/safekeeper/{id}",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
json = response.json()
|
||||
assert isinstance(json, dict)
|
||||
return json
|
||||
except StorageControllerApiException as e:
|
||||
if e.status_code == 404:
|
||||
return None
|
||||
raise e
|
||||
|
||||
def __enter__(self) -> "NeonStorageController":
|
||||
return self
|
||||
|
||||
@@ -2923,10 +2945,11 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
|
||||
TEMP_FILE_SUFFIX = "___temp"
|
||||
|
||||
def __init__(self, env: NeonEnv, id: int, port: PageserverPort):
|
||||
def __init__(self, env: NeonEnv, id: int, port: PageserverPort, az_id: str):
|
||||
super().__init__(host="localhost", port=port.pg, user="cloud_admin")
|
||||
self.env = env
|
||||
self.id = id
|
||||
self.az_id = az_id
|
||||
self.running = False
|
||||
self.service_port = port
|
||||
self.version = env.get_binary_version("pageserver")
|
||||
@@ -4553,6 +4576,8 @@ class Safekeeper(LogUtils):
|
||||
def timeline_dir(self, tenant_id, timeline_id) -> Path:
|
||||
return self.data_dir / str(tenant_id) / str(timeline_id)
|
||||
|
||||
# List partial uploaded segments of this safekeeper. Works only for
|
||||
# RemoteStorageKind.LOCAL_FS.
|
||||
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
tline_path = (
|
||||
self.env.repo_dir
|
||||
@@ -4562,9 +4587,11 @@ class Safekeeper(LogUtils):
|
||||
/ str(timeline_id)
|
||||
)
|
||||
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
|
||||
return self._list_segments_in_dir(
|
||||
segs = self._list_segments_in_dir(
|
||||
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
|
||||
)
|
||||
mysegs = [s for s in segs if f"sk{self.id}" in s]
|
||||
return mysegs
|
||||
|
||||
def list_segments(self, tenant_id, timeline_id) -> List[str]:
|
||||
"""
|
||||
|
||||
@@ -109,9 +109,6 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
# controller's attempts to notify the endpoint).
|
||||
".*reconciler.*neon_local notification hook failed.*",
|
||||
".*reconciler.*neon_local error.*",
|
||||
# Neon local does not provide pageserver with an AZ
|
||||
# TODO: remove this once neon local does so
|
||||
".*registering without specific availability zone id.*",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -174,6 +174,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def debug_dump_timeline(
|
||||
self, timeline_id: TimelineId, params: Optional[Dict[str, str]] = None
|
||||
) -> Any:
|
||||
params = params or {}
|
||||
params["timeline_id"] = str(timeline_id)
|
||||
dump = self.debug_dump(params)
|
||||
return dump["timelines"][0]
|
||||
|
||||
def get_partial_backup(self, timeline_id: TimelineId) -> Any:
|
||||
dump = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"})
|
||||
return dump["control_file"]["partial_backup"]
|
||||
|
||||
def get_eviction_state(self, timeline_id: TimelineId) -> Any:
|
||||
dump = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"})
|
||||
return dump["control_file"]["eviction_state"]
|
||||
|
||||
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
|
||||
res.raise_for_status()
|
||||
@@ -228,6 +244,14 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def backup_partial_reset(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/backup_partial_reset",
|
||||
json={},
|
||||
)
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
|
||||
|
||||
@@ -31,7 +31,7 @@ from fixtures.pageserver.utils import (
|
||||
remote_storage_delete_key,
|
||||
timeline_delete_wait_completed,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.pg_version import PgVersion, run_only_on_default_postgres
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.storage_controller_proxy import StorageControllerProxy
|
||||
@@ -2330,3 +2330,69 @@ def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder)
|
||||
connect=0, # Disable retries: we want to see the 503
|
||||
)
|
||||
).timeline_create(PgVersion.NOT_SET, tenant_id, create_timeline_id)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
|
||||
def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
fake_id = 5
|
||||
|
||||
target = env.storage_controller
|
||||
|
||||
assert target.get_safekeeper(fake_id) is None
|
||||
|
||||
body = {
|
||||
"active": True,
|
||||
"id": fake_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "safekeeper-333.us-east-2.aws.neon.build",
|
||||
"port": 6401,
|
||||
"http_port": 7676,
|
||||
"version": 5957,
|
||||
"availability_zone_id": "us-east-2b",
|
||||
}
|
||||
|
||||
target.on_safekeeper_deploy(fake_id, body)
|
||||
|
||||
inserted = target.get_safekeeper(fake_id)
|
||||
assert inserted is not None
|
||||
assert eq_safekeeper_records(body, inserted)
|
||||
|
||||
# error out if pk is changed (unexpected)
|
||||
with pytest.raises(StorageControllerApiException) as exc:
|
||||
different_pk = dict(body)
|
||||
different_pk["id"] = 4
|
||||
assert different_pk["id"] != body["id"]
|
||||
target.on_safekeeper_deploy(fake_id, different_pk)
|
||||
assert exc.value.status_code == 400
|
||||
|
||||
inserted_again = target.get_safekeeper(fake_id)
|
||||
assert inserted_again is not None
|
||||
assert eq_safekeeper_records(inserted, inserted_again)
|
||||
|
||||
# the most common case, version goes up:
|
||||
assert isinstance(body["version"], int)
|
||||
body["version"] += 1
|
||||
target.on_safekeeper_deploy(fake_id, body)
|
||||
inserted_now = target.get_safekeeper(fake_id)
|
||||
assert inserted_now is not None
|
||||
|
||||
assert eq_safekeeper_records(body, inserted_now)
|
||||
|
||||
|
||||
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
|
||||
compared = [dict(a), dict(b)]
|
||||
|
||||
masked_keys = ["created_at", "updated_at"]
|
||||
|
||||
for d in compared:
|
||||
# keep deleting these in case we are comparing the body as it will be uploaded by real scripts
|
||||
for key in masked_keys:
|
||||
if key in d:
|
||||
del d[key]
|
||||
|
||||
return compared[0] == compared[1]
|
||||
|
||||
@@ -372,8 +372,10 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_id: TenantId = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# Multiple creation requests which race will generate this error
|
||||
# Multiple creation requests which race will generate this error on the pageserver
|
||||
# and storage controller respectively
|
||||
env.pageserver.allowed_errors.append(".*Conflict: Tenant is already being modified.*")
|
||||
env.storage_controller.allowed_errors.append(".*Conflict: Tenant is already being modified.*")
|
||||
|
||||
# Tenant creation requests which arrive out of order will generate complaints about
|
||||
# generation nubmers out of order.
|
||||
|
||||
@@ -72,6 +72,17 @@ def wait_lsn_force_checkpoint(
|
||||
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint_at_sk(
|
||||
safekeeper: Safekeeper,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options=None,
|
||||
):
|
||||
sk_flush_lsn = safekeeper.get_flush_lsn(tenant_id, timeline_id)
|
||||
wait_lsn_force_checkpoint_at(sk_flush_lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint_at(
|
||||
lsn: Lsn,
|
||||
tenant_id: TenantId,
|
||||
@@ -79,6 +90,10 @@ def wait_lsn_force_checkpoint_at(
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options=None,
|
||||
):
|
||||
"""
|
||||
Wait until pageserver receives given lsn, force checkpoint and wait for
|
||||
upload, i.e. remote_consistent_lsn advancement.
|
||||
"""
|
||||
pageserver_conn_options = pageserver_conn_options or {}
|
||||
|
||||
auth_token = None
|
||||
@@ -2330,6 +2345,77 @@ def test_s3_eviction(
|
||||
assert event_metrics_seen
|
||||
|
||||
|
||||
# Test resetting uploaded partial segment state.
|
||||
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
# We want to upload/evict quickly, but not too quickly to check that s3 is
|
||||
# empty before next round of upload happens.
|
||||
# Note: this test fails with --delete-offloaded-wal, this is expected.
|
||||
neon_env_builder.safekeeper_extra_opts = [
|
||||
"--enable-offload",
|
||||
"--partial-backup-timeout",
|
||||
"1s",
|
||||
"--control-file-save-interval",
|
||||
"1s",
|
||||
"--eviction-min-resident=1s",
|
||||
]
|
||||
# XXX: pageserver currently connects to safekeeper as long as connection
|
||||
# manager doesn't remove its entry (default lagging_wal_timeout is 10s),
|
||||
# causing uneviction. It should be fixed to not reconnect if last
|
||||
# remote_consistent_lsn is communicated and there is nothing to fetch. Make
|
||||
# value lower to speed up the test.
|
||||
initial_tenant_conf = {
|
||||
"lagging_wal_timeout": "1s",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.start()
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
endpoint.stop()
|
||||
sk = env.safekeepers[0]
|
||||
# eviction won't happen until remote_consistent_lsn catches up.
|
||||
wait_lsn_force_checkpoint_at_sk(sk, tenant_id, timeline_id, env.pageserver)
|
||||
|
||||
http_cli = env.safekeepers[0].http_client()
|
||||
|
||||
# wait until eviction happens
|
||||
def evicted():
|
||||
eviction_state = http_cli.get_eviction_state(timeline_id)
|
||||
log.info(f"eviction_state: {eviction_state}")
|
||||
if isinstance(eviction_state, str) and eviction_state == "Present":
|
||||
raise Exception("eviction didn't happen yet")
|
||||
|
||||
wait_until(30, 1, evicted)
|
||||
# it must have uploaded something
|
||||
uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
log.info(f"uploaded segments before reset: {uploaded_segs}")
|
||||
assert len(uploaded_segs) > 0
|
||||
|
||||
reset_res = http_cli.backup_partial_reset(tenant_id, timeline_id)
|
||||
log.info(f"reset res: {reset_res}")
|
||||
|
||||
# Backup_partial_reset must have reset the state and dropped s3 segment.
|
||||
#
|
||||
# Note: if listing takes more than --partial-backup-timeout test becomes
|
||||
# flaky because file might be reuploaded. With local fs it shouldn't be an
|
||||
# issue, but can add retry if this appears.
|
||||
uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
log.info(f"uploaded segments after reset: {uploaded_segs}")
|
||||
assert len(uploaded_segs) == 0
|
||||
|
||||
# calling second time should be ok
|
||||
http_cli.backup_partial_reset(tenant_id, timeline_id)
|
||||
|
||||
# inserting data should be ok
|
||||
endpoint.start()
|
||||
endpoint.safe_psql("insert into t values(1, 'hehe')")
|
||||
|
||||
|
||||
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Verify that pulling timeline from a SK with an uploaded partial segment
|
||||
@@ -2357,7 +2443,16 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
|
||||
"--eviction-min-resident=500ms",
|
||||
]
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
|
||||
# XXX: pageserver currently connects to safekeeper as long as connection
|
||||
# manager doesn't remove its entry (default lagging_wal_timeout is 10s),
|
||||
# causing uneviction. It should be fixed to not reconnect if last
|
||||
# remote_consistent_lsn is communicated and there is nothing to fetch. Until
|
||||
# this is fixed make value lower to speed up the test.
|
||||
initial_tenant_conf = {
|
||||
"lagging_wal_timeout": "1s",
|
||||
"checkpoint_timeout": "100ms",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
@@ -2421,7 +2516,7 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
|
||||
endpoint.start(safekeepers=[2, 3])
|
||||
|
||||
def new_partial_segment_uploaded():
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
segs = dst_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
for seg in segs:
|
||||
if "partial" in seg and "sk3" in seg:
|
||||
return seg
|
||||
|
||||
Reference in New Issue
Block a user