Merge pull request #4536 from neondatabase/releases/2023-06-20

Release 2023-06-20 (actually 2023-06-21)
This commit is contained in:
Christian Schwarz
2023-06-21 14:19:14 +03:00
committed by GitHub
10 changed files with 273 additions and 48 deletions

View File

@@ -914,6 +914,20 @@ jobs:
exit 1
fi
- name: Create tag "release-${{ needs.tag.outputs.build-tag }}"
if: github.ref_name == 'release'
uses: actions/github-script@v6
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}",
sha: context.sha,
})
promote-compatibility-data:
runs-on: [ self-hosted, gen3, small ]
container:

View File

@@ -3,6 +3,7 @@ name: Create Release Branch
on:
schedule:
- cron: '0 10 * * 2'
workflow_dispatch:
jobs:
create_release_branch:

View File

@@ -148,17 +148,17 @@ async fn import_rel(
// because there is no guarantee about the order in which we are processing segments.
// ignore "relation already exists" error
//
// FIXME: use proper error type for this, instead of parsing the error message.
// Or better yet, keep track of which relations we've already created
// FIXME: Keep track of which relations we've already created?
// https://github.com/neondatabase/neon/issues/3309
if let Err(e) = modification
.put_rel_creation(rel, nblocks as u32, ctx)
.await
{
if e.to_string().contains("already exists") {
debug!("relation {} already exists. we must be extending it", rel);
} else {
return Err(e);
match e {
RelationError::AlreadyExists => {
debug!("Relation {} already exist. We must be extending it.", rel)
}
_ => return Err(e.into()),
}
}

View File

@@ -43,6 +43,16 @@ pub enum CalculateLogicalSizeError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
AlreadyExists,
#[error("invalid relnode")]
InvalidRelnode,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
@@ -101,9 +111,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
@@ -148,9 +158,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
@@ -193,9 +203,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
// first try to lookup relation in cache
@@ -724,7 +734,7 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
Ok(())
}
@@ -751,7 +761,7 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -875,32 +885,38 @@ impl<'a> DatadirModification<'a> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
) -> Result<(), RelationError> {
if rel.relnode == 0 {
return Err(RelationError::AlreadyExists);
}
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
// Didn't exist. Update dbdir
dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
let buf = DbDirectory::ser(&dbdir)?;
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
anyhow::bail!("rel {rel} already exists");
return Err(RelationError::AlreadyExists);
}
self.put(
rel_dir_key,
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
// Put size
@@ -925,7 +941,7 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let last_lsn = self.tline.get_last_record_lsn();
if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
let size_key = rel_size_to_key(rel);
@@ -956,7 +972,7 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
// Put size
let size_key = rel_size_to_key(rel);
@@ -977,7 +993,7 @@ impl<'a> DatadirModification<'a> {
/// Drop a relation.
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
// Remove it from the directory entry
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);

View File

@@ -38,8 +38,8 @@ pub mod defaults {
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}

View File

@@ -15,6 +15,7 @@ use pageserver_api::models::{
TimelineState,
};
use remote_storage::GenericRemoteStorage;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
@@ -3003,7 +3004,7 @@ impl Timeline {
frozen_layer: &Arc<InMemoryLayer>,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
let span = tracing::info_span!("blocking");
let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({
let new_delta: DeltaLayer = tokio::task::spawn_blocking({
let _g = span.entered();
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
@@ -3015,29 +3016,31 @@ impl Timeline {
// Sync it to disk.
//
// We must also fsync the timeline dir to ensure the directory entries for
// new layer files are durable
// new layer files are durable.
//
// NB: timeline dir must be synced _after_ the file contents are durable.
// So, two separate fsyncs are required, they mustn't be batched.
//
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
// this with a single fsync in future refactors.
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
// Then sync the parent directory.
// files to flush, the fsync overhead can be reduces as follows:
// 1. write them all to temporary file names
// 2. fsync them
// 3. rename to the final name
// 4. fsync the parent directory.
// Note that (1),(2),(3) today happen inside write_to_disk().
par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
par_fsync::par_fsync(&[self_clone
.conf
.timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)])
.context("fsync of timeline dir")?;
let sz = new_delta_path.metadata()?.len();
anyhow::Ok((new_delta, sz))
anyhow::Ok(new_delta)
}
})
.await
.context("spawn_blocking")??;
let new_delta_name = new_delta.filename();
let sz = new_delta.desc.file_size;
// Add it to the layer map
let l = Arc::new(new_delta);
@@ -3051,9 +3054,8 @@ impl Timeline {
batch_updates.insert_historic(l.layer_desc().clone(), l);
batch_updates.flush();
// update the timeline's physical size
self.metrics.resident_physical_size_gauge.add(sz);
// update metrics
self.metrics.resident_physical_size_gauge.add(sz);
self.metrics.num_persistent_files_created.inc_by(1);
self.metrics.persistent_bytes_written.inc_by(sz);
@@ -3332,6 +3334,130 @@ impl From<anyhow::Error> for CompactionError {
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
#[derive(Default)]
enum DurationRecorder {
#[default]
NotStarted,
Recorded(RecordedDuration, tokio::time::Instant),
}
impl DurationRecorder {
pub fn till_now(&self) -> DurationRecorder {
match self {
DurationRecorder::NotStarted => {
panic!("must only call on recorded measurements")
}
DurationRecorder::Recorded(_, ended) => {
let now = tokio::time::Instant::now();
DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
}
}
}
pub fn into_recorded(self) -> Option<RecordedDuration> {
match self {
DurationRecorder::NotStarted => None,
DurationRecorder::Recorded(recorded, _) => Some(recorded),
}
}
}
#[derive(Default)]
struct CompactLevel0Phase1StatsBuilder {
version: Option<u64>,
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
first_read_lock_acquisition_micros: DurationRecorder,
get_level0_deltas_plus_drop_lock_micros: DurationRecorder,
level0_deltas_count: Option<usize>,
time_spent_between_locks: DurationRecorder,
second_read_lock_acquisition_micros: DurationRecorder,
second_read_lock_held_micros: DurationRecorder,
sort_holes_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
new_deltas_count: Option<usize>,
new_deltas_size: Option<u64>,
}
#[serde_as]
#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
version: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "serde_with::DisplayFromStr")]
timeline_id: TimelineId,
first_read_lock_acquisition_micros: RecordedDuration,
get_level0_deltas_plus_drop_lock_micros: RecordedDuration,
level0_deltas_count: usize,
time_spent_between_locks: RecordedDuration,
second_read_lock_acquisition_micros: RecordedDuration,
second_read_lock_held_micros: RecordedDuration,
sort_holes_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
new_deltas_count: usize,
new_deltas_size: u64,
}
impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
type Error = anyhow::Error;
fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
let CompactLevel0Phase1StatsBuilder {
version,
tenant_id,
timeline_id,
first_read_lock_acquisition_micros,
get_level0_deltas_plus_drop_lock_micros,
level0_deltas_count,
time_spent_between_locks,
second_read_lock_acquisition_micros,
second_read_lock_held_micros,
sort_holes_micros,
write_layer_files_micros,
new_deltas_count,
new_deltas_size,
} = value;
Ok(CompactLevel0Phase1Stats {
version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?,
tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?,
timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?,
first_read_lock_acquisition_micros: first_read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?,
get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros
.into_recorded()
.ok_or_else(|| {
anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set")
})?,
level0_deltas_count: level0_deltas_count
.ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?,
time_spent_between_locks: time_spent_between_locks
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?,
second_read_lock_acquisition_micros: second_read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?,
second_read_lock_held_micros: second_read_lock_held_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?,
sort_holes_micros: sort_holes_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?,
write_layer_files_micros: write_layer_files_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?,
new_deltas_count: new_deltas_count
.ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?,
new_deltas_size: new_deltas_size
.ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?,
})
}
}
impl Timeline {
/// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
///
@@ -3344,9 +3470,23 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let mut stats = CompactLevel0Phase1StatsBuilder {
version: Some(1),
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
..Default::default()
};
let begin = tokio::time::Instant::now();
let layers = self.layers.read().await;
let now = tokio::time::Instant::now();
stats.first_read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
stats.level0_deltas_count = Some(level0_deltas.len());
stats.get_level0_deltas_plus_drop_lock_micros =
stats.first_read_lock_acquisition_micros.till_now();
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
@@ -3467,7 +3607,9 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now();
let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here?
stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3501,9 +3643,11 @@ impl Timeline {
prev = Some(next_key.next());
}
drop(layers);
stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now();
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now();
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
@@ -3663,8 +3807,26 @@ impl Timeline {
layer_paths.pop().unwrap();
}
stats.write_layer_files_micros = stats.sort_holes_micros.till_now();
stats.new_deltas_count = Some(new_layers.len());
stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
.and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
{
Ok(stats_json) => {
info!(
stats_json = stats_json.as_str(),
"compact_level0_phase1 stats available"
)
}
Err(e) => {
warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
}
}
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,

View File

@@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::Result;
use anyhow::{Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
@@ -1082,7 +1082,10 @@ impl<'a> WalIngest<'a> {
.await?
{
// create it with 0 size initially, the logic below will extend it
modification.put_rel_creation(rel, 0, ctx).await?;
modification
.put_rel_creation(rel, 0, ctx)
.await
.context("Relation Error")?;
0
} else {
self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?

View File

@@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import List
from typing import Any, List, MutableMapping, cast
import pytest
from _pytest.config import Config
@@ -56,3 +56,15 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]):
# Rerun 3 times = 1 original run + 2 reruns
log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times")
item.add_marker(pytest.mark.flaky(reruns=2))
# pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns),
# we can workaround it by setting `timeout_func_only` to True[1].
# Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2],
# but we still can do it using pytest marker.
#
# - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99
# - [2] https://github.com/pytest-dev/pytest-timeout/issues/142
timeout_marker = item.get_closest_marker("timeout")
if timeout_marker is not None:
kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs)
kwargs["func_only"] = True

View File

@@ -575,11 +575,21 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
# Terminate first all safekeepers to prevent communication unexpectantly
# advancing peer_horizon_lsn.
for sk in env.safekeepers:
cli = sk.http_client()
cli.timeline_delete_force(tenant_id, timeline_id)
# restart safekeeper to clear its in-memory state
sk.stop().start()
sk.stop()
# wait all potenital in flight pushes to broker arrive before starting
# safekeepers (even without sleep, it is very unlikely they are not
# delivered yet).
time.sleep(1)
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
f_partial_path = (
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name

View File

@@ -1,3 +1,5 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
@@ -40,7 +42,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
# Kills one of the safekeepers and ensures that only the active ones are printed in the state.
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
# Trigger WAL wait timeout faster
neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
neon_env_builder.pageserver_config_override = """
wait_lsn_timeout = "1s"
tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"}
"""
# Have notable SK ids to ensure we check logs for their presence, not some other random numbers
neon_env_builder.safekeepers_id_start = 12345
neon_env_builder.num_safekeepers = 3
@@ -70,6 +75,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
stopped_safekeeper_id = stopped_safekeeper.id
log.info(f"Stopping safekeeper {stopped_safekeeper.id}")
stopped_safekeeper.stop()
# sleep until stopped safekeeper is removed from candidates
time.sleep(2)
# Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats.
insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert)