Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/layermap-ref-2

This commit is contained in:
Alex Chi
2023-06-20 09:12:15 -04:00
38 changed files with 872 additions and 410 deletions

View File

@@ -623,51 +623,6 @@ jobs:
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
neon-image-depot:
# For testing this will run side-by-side for a few merges.
# This action is not really optimized yet, but gets the job done
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
permissions:
contents: read
id-token: write
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Setup go
uses: actions/setup-go@v3
with:
go-version: '1.19'
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Install Crane & ECR helper
run: go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Build and push
uses: depot/build-push-action@v1
with:
# if no depot.json file is at the root of your repo, you must specify the project id
project: nrdv0s4kcs
push: true
tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}}
build-args: |
GIT_VERSION=${{ github.sha }}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
@@ -959,6 +914,20 @@ jobs:
exit 1
fi
- name: Create tag "release-${{ needs.tag.outputs.build-tag }}"
if: github.ref_name == 'release'
uses: actions/github-script@v6
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}",
sha: context.sha,
})
promote-compatibility-data:
runs-on: [ self-hosted, gen3, small ]
container:

View File

@@ -3,6 +3,7 @@ name: Create Release Branch
on:
schedule:
- cron: '0 10 * * 2'
workflow_dispatch:
jobs:
create_release_branch:

10
Cargo.lock generated
View File

@@ -2770,7 +2770,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -2783,7 +2783,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"native-tls",
"tokio",
@@ -2794,7 +2794,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -2812,7 +2812,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4272,7 +4272,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -140,11 +140,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
@@ -180,7 +180,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
# Changes the MAX_THREADS limit from 4096 to 32768.
# This is a temporary workaround for using tracing from many threads in safekeepers code,

View File

@@ -148,4 +148,14 @@ mod tests {
let file = File::open("tests/cluster_spec.json").unwrap();
let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
}
#[test]
fn parse_unknown_fields() {
// Forward compatibility test
let file = File::open("tests/cluster_spec.json").unwrap();
let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
let ob = json.as_object_mut().unwrap();
ob.insert("unknown_field_123123123".into(), "hello".into());
let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
}
}

View File

@@ -34,6 +34,8 @@ use crate::{
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
pub(super) mod metrics {
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
@@ -424,17 +426,33 @@ impl RemoteStorage for S3Bucket {
delete_objects.push(obj_id);
}
metrics::inc_delete_objects(paths.len() as u64);
self.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(Delete::builder().set_objects(Some(delete_objects)).build())
.send()
.await
.map_err(|e| {
metrics::inc_delete_objects_fail(paths.len() as u64);
e
})?;
for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
metrics::inc_delete_objects(chunk.len() as u64);
let resp = self
.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(Delete::builder().set_objects(Some(chunk.to_vec())).build())
.send()
.await;
match resp {
Ok(resp) => {
if let Some(errors) = resp.errors {
metrics::inc_delete_objects_fail(errors.len() as u64);
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
));
}
}
Err(e) => {
metrics::inc_delete_objects_fail(chunk.len() as u64);
return Err(e.into());
}
}
}
Ok(())
}

View File

@@ -24,6 +24,7 @@ enum RemoteOp {
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
DeleteObjects(Vec<RemotePath>),
}
impl UnreliableWrapper {
@@ -121,8 +122,18 @@ impl RemoteStorage for UnreliableWrapper {
}
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
let mut error_counter = 0;
for path in paths {
self.delete(path).await?
if (self.delete(path).await).is_err() {
error_counter += 1;
}
}
if error_counter > 0 {
return Err(anyhow::anyhow!(
"failed to delete {} objects",
error_counter
));
}
Ok(())
}

View File

@@ -148,17 +148,17 @@ async fn import_rel(
// because there is no guarantee about the order in which we are processing segments.
// ignore "relation already exists" error
//
// FIXME: use proper error type for this, instead of parsing the error message.
// Or better yet, keep track of which relations we've already created
// FIXME: Keep track of which relations we've already created?
// https://github.com/neondatabase/neon/issues/3309
if let Err(e) = modification
.put_rel_creation(rel, nblocks as u32, ctx)
.await
{
if e.to_string().contains("already exists") {
debug!("relation {} already exists. we must be extending it", rel);
} else {
return Err(e);
match e {
RelationError::AlreadyExists => {
debug!("Relation {} already exist. We must be extending it.", rel)
}
_ => return Err(e.into()),
}
}

View File

@@ -1,4 +1,3 @@
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
@@ -95,21 +94,19 @@ static READ_NUM_FS_LAYERS: Lazy<HistogramVec> = Lazy::new(|| {
});
// Metrics collected on operations on the storage repository.
static RECONSTRUCT_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
pub static RECONSTRUCT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value",
&["tenant_id", "timeline_id"],
"Time spent in reconstruct_value (reconstruct a page from deltas)",
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_direct_total",
"Number of cache hits from materialized page cache without redo",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
@@ -124,11 +121,10 @@ static GET_RECONSTRUCT_DATA_TIME: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
@@ -752,10 +748,7 @@ impl StorageTimeMetrics {
pub struct TimelineMetrics {
tenant_id: String,
timeline_id: String,
pub reconstruct_time_histo: Histogram,
pub get_reconstruct_data_time_histo: Histogram,
pub materialized_page_cache_hit_counter: GenericCounter<AtomicU64>,
pub materialized_page_cache_hit_upon_request_counter: GenericCounter<AtomicU64>,
pub flush_time_histo: StorageTimeMetrics,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
@@ -783,15 +776,9 @@ impl TimelineMetrics {
) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let flush_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
let compact_time_histo =
@@ -833,19 +820,18 @@ impl TimelineMetrics {
let read_num_fs_layers = READ_NUM_FS_LAYERS
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let materialized_page_cache_hit_upon_request_counter = MATERIALIZED_PAGE_CACHE_HIT_DIRECT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
// TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter
// and integration test will error.
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
MATERIALIZED_PAGE_CACHE_HIT.get();
TimelineMetrics {
tenant_id,
timeline_id,
reconstruct_time_histo,
get_reconstruct_data_time_histo,
materialized_page_cache_hit_counter,
materialized_page_cache_hit_upon_request_counter,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
@@ -872,10 +858,7 @@ impl Drop for TimelineMetrics {
fn drop(&mut self) {
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = MATERIALIZED_PAGE_CACHE_HIT.remove_label_values(&[tenant_id, timeline_id]);
let _ = MATERIALIZED_PAGE_CACHE_HIT_DIRECT.remove_label_values(&[tenant_id, timeline_id]);
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);

View File

@@ -43,6 +43,16 @@ pub enum CalculateLogicalSizeError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
AlreadyExists,
#[error("invalid relnode")]
InvalidRelnode,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
@@ -101,9 +111,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
@@ -148,9 +158,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
@@ -193,9 +203,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
// first try to lookup relation in cache
@@ -724,7 +734,7 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
Ok(())
}
@@ -751,7 +761,7 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -875,32 +885,38 @@ impl<'a> DatadirModification<'a> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
) -> Result<(), RelationError> {
if rel.relnode == 0 {
return Err(RelationError::AlreadyExists);
}
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
// Didn't exist. Update dbdir
dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
let buf = DbDirectory::ser(&dbdir)?;
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
anyhow::bail!("rel {rel} already exists");
return Err(RelationError::AlreadyExists);
}
self.put(
rel_dir_key,
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
// Put size
@@ -925,7 +941,7 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let last_lsn = self.tline.get_last_record_lsn();
if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
let size_key = rel_size_to_key(rel);
@@ -956,7 +972,7 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
// Put size
let size_key = rel_size_to_key(rel);
@@ -977,7 +993,7 @@ impl<'a> DatadirModification<'a> {
/// Drop a relation.
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
// Remove it from the directory entry
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);

View File

@@ -473,6 +473,14 @@ pub(crate) enum ShutdownError {
AlreadyStopping,
}
struct DeletionGuard(OwnedMutexGuard<bool>);
impl DeletionGuard {
fn is_deleted(&self) -> bool {
*self.0
}
}
impl Tenant {
/// Yet another helper for timeline initialization.
/// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -1139,7 +1147,11 @@ impl Tenant {
)
.context("create_timeline_struct")?;
let guard = Arc::clone(&timeline.delete_lock).lock_owned().await;
let guard = DeletionGuard(
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.expect("cannot happen because we're the only owner"),
);
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -1463,7 +1475,13 @@ impl Tenant {
let timelines = self.timelines.lock().unwrap();
let timelines_to_compact = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
.filter_map(|(timeline_id, timeline)| {
if timeline.is_active() {
Some((*timeline_id, timeline.clone()))
} else {
None
}
})
.collect::<Vec<_>>();
drop(timelines);
timelines_to_compact
@@ -1544,6 +1562,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
guard: DeletionGuard,
) -> anyhow::Result<()> {
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
@@ -1616,6 +1635,25 @@ impl Tenant {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
if let Some(remote_client) = &timeline.remote_client {
remote_client.delete_all().await.context("delete_all")?
};
// Have a failpoint that can use the `pause` failpoint action.
// We don't want to block the executor thread, hence, spawn_blocking + await.
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint in_progress_delete");
fail::fail_point!("in_progress_delete");
}
})
.await
.expect("spawn_blocking");
}
{
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
@@ -1636,12 +1674,7 @@ impl Tenant {
drop(timelines);
}
let remote_client = match &timeline.remote_client {
Some(remote_client) => remote_client,
None => return Ok(()),
};
remote_client.delete_all().await?;
drop(guard);
Ok(())
}
@@ -1689,23 +1722,18 @@ impl Tenant {
timeline = Arc::clone(timeline_entry.get());
// Prevent two tasks from trying to delete the timeline at the same time.
//
// XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller
// needs to poll until the operation has finished. But for now, we return an
// error, because the control plane knows to retry errors.
delete_lock_guard =
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.map_err(|_| {
DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err(
|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
})?;
},
)?);
// If another task finished the deletion just before we acquired the lock,
// return success.
if *delete_lock_guard {
if delete_lock_guard.is_deleted() {
return Ok(());
}
@@ -1779,7 +1807,7 @@ impl Tenant {
self: Arc<Self>,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
_guard: OwnedMutexGuard<bool>,
guard: DeletionGuard,
) {
let tenant_id = self.tenant_id;
let timeline_clone = Arc::clone(&timeline);
@@ -1792,7 +1820,7 @@ impl Tenant {
"timeline_delete",
false,
async move {
if let Err(err) = self.delete_timeline(timeline_id, timeline).await {
if let Err(err) = self.delete_timeline(timeline_id, timeline, guard).await {
error!("Error: {err:#}");
timeline_clone.set_broken(err.to_string())
};

View File

@@ -38,8 +38,8 @@ pub mod defaults {
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}

View File

@@ -753,22 +753,18 @@ impl RemoteTimelineClient {
// Have a failpoint that can use the `pause` failpoint action.
// We don't want to block the executor thread, hence, spawn_blocking + await.
#[cfg(feature = "testing")]
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!(
"at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
fail::fail_point!(
"persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
}
})
.await
.expect("spawn_blocking");
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint persist_deleted_index_part");
fail::fail_point!("persist_deleted_index_part");
}
})
.await
.expect("spawn_blocking");
}
upload::upload_index_part(
self.conf,
&self.storage_impl,

View File

@@ -15,6 +15,7 @@ use pageserver_api::models::{
TimelineState,
};
use remote_storage::GenericRemoteStorage;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
@@ -47,7 +48,10 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -616,9 +620,7 @@ impl Timeline {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => {
self.metrics
.materialized_page_cache_hit_upon_request_counter
.inc();
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
return Ok(cached_img); // exact LSN match, return the image
}
Ordering::Greater => {
@@ -640,8 +642,7 @@ impl Timeline {
.await?;
timer.stop_and_record();
self.metrics
.reconstruct_time_histo
RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
}
@@ -2476,7 +2477,7 @@ impl Timeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(());
}
if prev_lsn <= cont_lsn {
@@ -3097,7 +3098,7 @@ impl Timeline {
frozen_layer: &Arc<InMemoryLayer>,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
let span = tracing::info_span!("blocking");
let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({
let new_delta: DeltaLayer = tokio::task::spawn_blocking({
let _g = span.entered();
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
@@ -3109,29 +3110,31 @@ impl Timeline {
// Sync it to disk.
//
// We must also fsync the timeline dir to ensure the directory entries for
// new layer files are durable
// new layer files are durable.
//
// NB: timeline dir must be synced _after_ the file contents are durable.
// So, two separate fsyncs are required, they mustn't be batched.
//
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
// this with a single fsync in future refactors.
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
// Then sync the parent directory.
// files to flush, the fsync overhead can be reduces as follows:
// 1. write them all to temporary file names
// 2. fsync them
// 3. rename to the final name
// 4. fsync the parent directory.
// Note that (1),(2),(3) today happen inside write_to_disk().
par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
par_fsync::par_fsync(&[self_clone
.conf
.timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)])
.context("fsync of timeline dir")?;
let sz = new_delta_path.metadata()?.len();
anyhow::Ok((new_delta, sz))
anyhow::Ok(new_delta)
}
})
.await
.context("spawn_blocking")??;
let new_delta_name = new_delta.filename();
let sz = new_delta.desc.file_size;
// Add it to the layer map
let l = Arc::new(new_delta);
@@ -3147,9 +3150,8 @@ impl Timeline {
mapping.insert(l);
batch_updates.flush();
// update the timeline's physical size
self.metrics.resident_physical_size_gauge.add(sz);
// update metrics
self.metrics.resident_physical_size_gauge.add(sz);
self.metrics.num_persistent_files_created.inc_by(1);
self.metrics.persistent_bytes_written.inc_by(sz);
@@ -3431,6 +3433,130 @@ impl From<anyhow::Error> for CompactionError {
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
#[derive(Default)]
enum DurationRecorder {
#[default]
NotStarted,
Recorded(RecordedDuration, tokio::time::Instant),
}
impl DurationRecorder {
pub fn till_now(&self) -> DurationRecorder {
match self {
DurationRecorder::NotStarted => {
panic!("must only call on recorded measurements")
}
DurationRecorder::Recorded(_, ended) => {
let now = tokio::time::Instant::now();
DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
}
}
}
pub fn into_recorded(self) -> Option<RecordedDuration> {
match self {
DurationRecorder::NotStarted => None,
DurationRecorder::Recorded(recorded, _) => Some(recorded),
}
}
}
#[derive(Default)]
struct CompactLevel0Phase1StatsBuilder {
version: Option<u64>,
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
first_read_lock_acquisition_micros: DurationRecorder,
get_level0_deltas_plus_drop_lock_micros: DurationRecorder,
level0_deltas_count: Option<usize>,
time_spent_between_locks: DurationRecorder,
second_read_lock_acquisition_micros: DurationRecorder,
second_read_lock_held_micros: DurationRecorder,
sort_holes_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
new_deltas_count: Option<usize>,
new_deltas_size: Option<u64>,
}
#[serde_as]
#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
version: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "serde_with::DisplayFromStr")]
timeline_id: TimelineId,
first_read_lock_acquisition_micros: RecordedDuration,
get_level0_deltas_plus_drop_lock_micros: RecordedDuration,
level0_deltas_count: usize,
time_spent_between_locks: RecordedDuration,
second_read_lock_acquisition_micros: RecordedDuration,
second_read_lock_held_micros: RecordedDuration,
sort_holes_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
new_deltas_count: usize,
new_deltas_size: u64,
}
impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
type Error = anyhow::Error;
fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
let CompactLevel0Phase1StatsBuilder {
version,
tenant_id,
timeline_id,
first_read_lock_acquisition_micros,
get_level0_deltas_plus_drop_lock_micros,
level0_deltas_count,
time_spent_between_locks,
second_read_lock_acquisition_micros,
second_read_lock_held_micros,
sort_holes_micros,
write_layer_files_micros,
new_deltas_count,
new_deltas_size,
} = value;
Ok(CompactLevel0Phase1Stats {
version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?,
tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?,
timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?,
first_read_lock_acquisition_micros: first_read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?,
get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros
.into_recorded()
.ok_or_else(|| {
anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set")
})?,
level0_deltas_count: level0_deltas_count
.ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?,
time_spent_between_locks: time_spent_between_locks
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?,
second_read_lock_acquisition_micros: second_read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?,
second_read_lock_held_micros: second_read_lock_held_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?,
sort_holes_micros: sort_holes_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?,
write_layer_files_micros: write_layer_files_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?,
new_deltas_count: new_deltas_count
.ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?,
new_deltas_size: new_deltas_size
.ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?,
})
}
}
impl Timeline {
/// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
///
@@ -3443,11 +3569,26 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let mut stats = CompactLevel0Phase1StatsBuilder {
version: Some(1),
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
..Default::default()
};
let begin = tokio::time::Instant::now();
let guard = self.layers.read().await;
let now = tokio::time::Instant::now();
stats.first_read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
let (layers, mapping) = &*guard;
let mut level0_deltas = layers.get_level0_deltas()?;
drop(guard);
stats.level0_deltas_count = Some(level0_deltas.len());
stats.get_level0_deltas_plus_drop_lock_micros =
stats.first_read_lock_acquisition_micros.till_now();
// Only compact if enough layers have accumulated.
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
debug!(
@@ -3574,7 +3715,9 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now();
let guard = self.layers.read().await; // Is'n it better to hold original layers lock till here?
stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now();
let (layers, _) = &*guard;
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3609,9 +3752,11 @@ impl Timeline {
prev = Some(next_key.next());
}
drop_rlock(guard);
stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now();
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now();
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
@@ -3771,8 +3916,26 @@ impl Timeline {
layer_paths.pop().unwrap();
}
stats.write_layer_files_micros = stats.sort_holes_micros.till_now();
stats.new_deltas_count = Some(new_layers.len());
stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
.and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
{
Ok(stats_json) => {
info!(
stats_json = stats_json.as_str(),
"compact_level0_phase1 stats available"
)
}
Err(e) => {
warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
}
}
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
@@ -3901,6 +4064,7 @@ impl Timeline {
/// for example. The caller should hold `Tenant::gc_cs` lock to ensure
/// that.
///
#[instrument(skip_all, fields(timline_id=%self.timeline_id))]
pub(super) async fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,

View File

@@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::Result;
use anyhow::{Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
@@ -1082,7 +1082,10 @@ impl<'a> WalIngest<'a> {
.await?
{
// create it with 0 size initially, the logic below will extend it
modification.put_rel_creation(rel, 0, ctx).await?;
modification
.put_rel_creation(rel, 0, ctx)
.await
.context("Relation Error")?;
0
} else {
self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?

View File

@@ -257,7 +257,7 @@ nwp_register_gucs(void)
"Walproposer reconnects to offline safekeepers once in this interval.",
NULL,
&wal_acceptor_reconnect_timeout,
5000, 0, INT_MAX, /* default, min, max */
1000, 0, INT_MAX, /* default, min, max */
PGC_SIGHUP, /* context */
GUC_UNIT_MS, /* flags */
NULL, NULL, NULL);

View File

@@ -1,5 +1,6 @@
use futures::pin_mut;
use futures::StreamExt;
use futures::TryFutureExt;
use hyper::body::HttpBody;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
@@ -11,8 +12,13 @@ use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
use tracing::error;
use tracing::info;
use tracing::instrument;
use url::Url;
use crate::proxy::invalidate_cache;
use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
use crate::{auth, config::ProxyConfig, console};
#[derive(serde::Deserialize)]
@@ -90,10 +96,17 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
}
}
struct ConnInfo {
username: String,
dbname: String,
hostname: String,
password: String,
}
fn get_conn_info(
headers: &HeaderMap,
sni_hostname: Option<String>,
) -> Result<(String, String, String, String), anyhow::Error> {
) -> Result<ConnInfo, anyhow::Error> {
let connection_string = headers
.get("Neon-Connection-String")
.ok_or(anyhow::anyhow!("missing connection string"))?
@@ -146,12 +159,12 @@ fn get_conn_info(
}
}
Ok((
username.to_owned(),
dbname.to_owned(),
hostname.to_owned(),
password.to_owned(),
))
Ok(ConnInfo {
username: username.to_owned(),
dbname: dbname.to_owned(),
hostname: hostname.to_owned(),
password: password.to_owned(),
})
}
// TODO: return different http error codes
@@ -164,10 +177,10 @@ pub async fn handle(
// Determine the destination and connection params
//
let headers = request.headers();
let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?;
let conn_info = get_conn_info(headers, sni_hostname)?;
let credential_params = StartupMessageParams::new([
("user", &username),
("database", &dbname),
("user", &conn_info.username),
("database", &conn_info.dbname),
("application_name", APP_NAME),
]);
@@ -186,21 +199,20 @@ pub async fn handle(
let creds = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names))
.map(|_| {
auth::ClientCredentials::parse(
&credential_params,
Some(&conn_info.hostname),
common_names,
)
})
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
};
let node = creds.wake_compute(&extra).await?.expect("msg");
let conf = node.value.config;
let port = *conf.get_ports().first().expect("no port");
let host = match conf.get_hosts().first().expect("no host") {
tokio_postgres::config::Host::Tcp(host) => host,
tokio_postgres::config::Host::Unix(_) => {
return Err(anyhow::anyhow!("unix socket is not supported"));
}
};
let mut node_info = creds.wake_compute(&extra).await?.expect("msg");
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
@@ -220,28 +232,10 @@ pub async fn handle(
let QueryData { query, params } = serde_json::from_slice(&body)?;
let query_params = json_to_pg_text(params)?;
//
// Connenct to the destination
//
let (client, connection) = tokio_postgres::Config::new()
.host(host)
.port(port)
.user(&username)
.password(&password)
.dbname(&dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
//
// Now execute the query and return the result
//
let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?;
let row_stream = client.query_raw_txt(query, query_params).await?;
// Manually drain the stream into a vector to leave row_stream hanging
@@ -280,6 +274,11 @@ pub async fn handle(
json!({
"name": Value::String(c.name().to_owned()),
"dataTypeID": Value::Number(c.type_().oid().into()),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
})
})
.collect::<Vec<_>>()
@@ -303,6 +302,70 @@ pub async fn handle(
}))
}
/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with
/// the difference that it uses `tokio_postgres` for the connection.
#[instrument(skip_all)]
async fn connect_to_compute(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
loop {
match connect_to_compute_once(node_info, conn_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).await? {
// Update `node_info` and try one more time.
Some(new) => {
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
}
}
other => return other,
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
}
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut config = (*node_info.config).clone();
let (client, connection) = config
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.inspect_err(|e: &tokio_postgres::Error| {
error!(
"failed to connect to compute node hosts={:?} ports={:?}: {}",
node_info.config.get_hosts(),
node_info.config.get_ports(),
e
);
invalidate_cache(node_info)
})
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
Ok(client)
}
//
// Convert postgres row with text-encoded values to JSON object
//

View File

@@ -26,7 +26,6 @@ use tls_listener::TlsListener;
use tokio::{
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
net::TcpListener,
select,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
@@ -193,14 +192,9 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let result = select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
Err(anyhow::anyhow!("Query timed out"))
}
response = sql_over_http::handle(config, request, sni_hostname) => {
response
}
};
let result = sql_over_http::handle(config, request, sni_hostname)
.instrument(info_span!("sql-over-http"))
.await;
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,

View File

@@ -22,7 +22,7 @@ use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;
/// Number of times we should retry the `/proxy_wake_compute` http request.
const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -283,34 +283,35 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
/// If we couldn't connect, a cached connection info might be to blame
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(name = "invalidate_cache", skip_all)]
pub fn invalidate_cache(node_info: &console::CachedNodeInfo) {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
) -> Result<PostgresConnection, compute::ConnectionError> {
// If we couldn't connect, a cached connection info might be to blame
// (e.g. the compute node's address might've changed at the wrong time).
// Invalidate the cache entry (if any) to prevent subsequent errors.
let invalidate_cache = |_: &compute::ConnectionError| {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
};
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(allow_self_signed_compute)
.inspect_err(invalidate_cache)
.inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
.await
}

191
scripts/comment-test-report.js Normal file → Executable file
View File

@@ -1,3 +1,5 @@
#! /usr/bin/env node
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
//
@@ -19,7 +21,7 @@
// })
//
// Analog of Python's defaultdict.
// Equivalent of Python's defaultdict.
//
// const dm = new DefaultMap(() => new DefaultMap(() => []))
// dm["firstKey"]["secondKey"].push("value")
@@ -32,34 +34,7 @@ class DefaultMap extends Map {
}
}
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// If we run the script in the PR or in the branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// Commend body itself
let commentBody = `${startMarker}\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
owner: context.repo.owner,
repo: context.repo.repo,
}
const {reportUrl, reportJsonUrl} = report
if (!reportUrl || !reportJsonUrl) {
commentBody += `#### No tests were run or test report is not available\n`
commentBody += autoupdateNotice
return
}
const parseReportJson = async ({ reportJsonUrl, fetch }) => {
const suites = await (await fetch(reportJsonUrl)).json()
// Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests.
@@ -83,7 +58,7 @@ module.exports = async ({ github, context, fetch, report }) => {
let buildType, pgVersion
const match = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)?.groups
if (match) {
({buildType, pgVersion} = match)
({ buildType, pgVersion } = match)
} else {
// It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance).
console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`)
@@ -123,37 +98,68 @@ module.exports = async ({ github, context, fetch, report }) => {
}
}
return {
failedTests,
failedTestsCount,
passedTests,
passedTestsCount,
skippedTests,
skippedTestsCount,
flakyTests,
flakyTestsCount,
retriedTests,
pgVersions,
}
}
const reportSummary = async (params) => {
const {
failedTests,
failedTestsCount,
passedTests,
passedTestsCount,
skippedTests,
skippedTestsCount,
flakyTests,
flakyTestsCount,
retriedTests,
pgVersions,
reportUrl,
} = params
let summary = ""
const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount
commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n`
summary += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n`
// Print test resuls from the newest to the oldest Postgres version for release and debug builds.
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(failedTests[pgVersion]).length > 0) {
commentBody += `#### Failures on Posgres ${pgVersion}\n\n`
summary += `#### Failures on Posgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(failedTests[pgVersion])) {
const links = []
for (const test of tests) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}`
links.push(`[${test.buildType}](${allureLink})`)
}
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
summary += `- \`${testName}\`: ${links.join(", ")}\n`
}
const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name)
const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"`
commentBody += "```\n"
commentBody += `# Run failed on Postgres ${pgVersion} tests locally:\n`
commentBody += `${command}\n`
commentBody += "```\n"
summary += "```\n"
summary += `# Run failed on Postgres ${pgVersion} tests locally:\n`
summary += `${command}\n`
summary += "```\n"
}
}
if (flakyTestsCount > 0) {
commentBody += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
summary += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(flakyTests[pgVersion]).length > 0) {
commentBody += `#### Postgres ${pgVersion}\n\n`
summary += `#### Postgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) {
const links = []
for (const test of tests) {
@@ -161,11 +167,57 @@ module.exports = async ({ github, context, fetch, report }) => {
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
links.push(`[${status} ${test.buildType}](${allureLink})`)
}
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
summary += `- \`${testName}\`: ${links.join(", ")}\n`
}
}
}
commentBody += "\n</details>\n"
summary += "\n</details>\n"
}
return summary
}
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// If we run the script in the PR or in the branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// Commend body itself
let commentBody = `${startMarker}\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
owner: context.repo.owner,
repo: context.repo.repo,
}
const {reportUrl, reportJsonUrl} = report
if (!reportUrl || !reportJsonUrl) {
commentBody += `#### No tests were run or test report is not available\n`
commentBody += autoupdateNotice
return
}
try {
const parsed = await parseReportJson({ reportJsonUrl, fetch })
commentBody += await reportSummary({ ...parsed, reportUrl })
} catch (error) {
commentBody += `### [full report](${reportUrl})\n___\n`
commentBody += `#### Failed to create a summary for the test run: \n`
commentBody += "```\n"
commentBody += `${error.stack}\n`
commentBody += "```\n"
commentBody += "\nTo reproduce and debug the error locally run:\n"
commentBody += "```\n"
commentBody += `scripts/comment-test-report.js ${reportJsonUrl}`
commentBody += "\n```\n"
}
commentBody += autoupdateNotice
@@ -207,3 +259,60 @@ module.exports = async ({ github, context, fetch, report }) => {
})
}
}
// Equivalent of Python's `if __name__ == "__main__":`
// https://nodejs.org/docs/latest/api/modules.html#accessing-the-main-module
if (require.main === module) {
// Poor man's argument parsing: we expect the third argument is a JSON URL (0: node binary, 1: this script, 2: JSON url)
if (process.argv.length !== 3) {
console.error(`Unexpected number of arguments\nUsage: node ${process.argv[1]} <jsonUrl>`)
process.exit(1)
}
const jsonUrl = process.argv[2]
try {
new URL(jsonUrl)
} catch (error) {
console.error(`Invalid URL: ${jsonUrl}\nUsage: node ${process.argv[1]} <jsonUrl>`)
process.exit(1)
}
const htmlUrl = jsonUrl.replace("/data/suites.json", "/index.html")
const githubMock = {
rest: {
issues: {
createComment: console.log,
listComments: async () => ({ data: [] }),
updateComment: console.log
},
repos: {
createCommitComment: console.log,
listCommentsForCommit: async () => ({ data: [] }),
updateCommitComment: console.log
}
}
}
const contextMock = {
repo: {
owner: 'testOwner',
repo: 'testRepo'
},
payload: {
number: 42,
pull_request: null,
},
sha: '0000000000000000000000000000000000000000',
}
module.exports({
github: githubMock,
context: contextMock,
fetch: fetch,
report: {
reportUrl: htmlUrl,
reportJsonUrl: jsonUrl,
}
})
}

View File

@@ -1,12 +1,14 @@
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import sys
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
import backoff
import psycopg2
import psycopg2.extras
@@ -35,9 +37,20 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
with psycopg2.connect(connstr, connect_timeout=30) as conn:
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
def connect(connstr):
conn = psycopg2.connect(connstr, connect_timeout=30)
conn.autocommit = True
return conn
conn = connect(connstr)
try:
with conn.cursor() as cur:
yield cur
finally:
if conn is not None:
conn.close()
def create_table(cur):
@@ -115,6 +128,7 @@ def main():
parser.add_argument(
"--ingest",
type=Path,
required=True,
help="Path to perf test result file, or directory with perf test result files",
)
parser.add_argument("--initdb", action="store_true", help="Initialuze database")
@@ -140,4 +154,5 @@ def main():
if __name__ == "__main__":
logging.getLogger("backoff").addHandler(logging.StreamHandler())
main()

View File

@@ -1,11 +1,13 @@
#!/usr/bin/env python3
import argparse
import logging
import os
import re
import sys
from contextlib import contextmanager
from pathlib import Path
import backoff
import psycopg2
CREATE_TABLE = """
@@ -29,9 +31,20 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
with psycopg2.connect(connstr, connect_timeout=30) as conn:
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
def connect(connstr):
conn = psycopg2.connect(connstr, connect_timeout=30)
conn.autocommit = True
return conn
conn = connect(connstr)
try:
with conn.cursor() as cur:
yield cur
finally:
if conn is not None:
conn.close()
def create_table(cur):
@@ -101,4 +114,5 @@ def main():
if __name__ == "__main__":
logging.getLogger("backoff").addHandler(logging.StreamHandler())
main()

View File

@@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import List
from typing import Any, List, MutableMapping, cast
import pytest
from _pytest.config import Config
@@ -56,3 +56,15 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]):
# Rerun 3 times = 1 original run + 2 reruns
log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times")
item.add_marker(pytest.mark.flaky(reruns=2))
# pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns),
# we can workaround it by setting `timeout_func_only` to True[1].
# Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2],
# but we still can do it using pytest marker.
#
# - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99
# - [2] https://github.com/pytest-dev/pytest-timeout/issues/142
timeout_marker = item.get_closest_marker("timeout")
if timeout_marker is not None:
kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs)
kwargs["func_only"] = True

View File

@@ -57,14 +57,16 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_current_logical_size",
"pageserver_resident_physical_size",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
"pageserver_getpage_get_reconstruct_data_seconds_count",
"pageserver_getpage_get_reconstruct_data_seconds_sum",
@@ -73,8 +75,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_io_operations_seconds_count",
"pageserver_io_operations_seconds_sum",
"pageserver_last_record_lsn",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_read_num_fs_layers_bucket",
"pageserver_read_num_fs_layers_count",
"pageserver_read_num_fs_layers_sum",

View File

@@ -1631,6 +1631,8 @@ class NeonPageserver(PgProtocol):
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
]
def start(

View File

@@ -342,6 +342,11 @@ class PageserverHttpClient(requests.Session):
return res_json
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
"""
Note that deletion is not instant, it is scheduled and performed mostly in the background.
So if you need to wait for it to complete use `timeline_delete_wait_completed`.
For longer description consult with pageserver openapi spec.
"""
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs
)

View File

@@ -193,19 +193,30 @@ def wait_for_upload_queue_empty(
time.sleep(0.2)
def assert_timeline_detail_404(
def wait_timeline_detail_404(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
last_exc = None
for _ in range(2):
time.sleep(0.250)
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code == 404:
return
last_exc = e
raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}")
def timeline_delete_wait_completed(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
**delete_args,
):
"""Asserts that timeline_detail returns 404, or dumps the detail."""
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.error(e)
if e.status_code == 404:
return
else:
raise
raise Exception("detail succeeded (it should return 404)")
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id)

View File

@@ -15,7 +15,11 @@ from fixtures.neon_fixtures import (
PortDistributor,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
from pytest import FixtureRequest
@@ -417,7 +421,7 @@ def check_neon_works(
)
shutil.rmtree(repo_dir / "local_fs_remote_storage")
pageserver_http.timeline_delete(tenant_id, timeline_id)
timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
pageserver_http.timeline_create(pg_version, tenant_id, timeline_id)
pg_bin.run(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]

View File

@@ -14,7 +14,11 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import subprocess_capture
@@ -151,7 +155,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
".*files not bound to index_file.json, proceeding with their deletion.*"
)
client.timeline_delete(tenant, timeline)
timeline_delete_wait_completed(client, tenant, timeline)
# Importing correct backup works
import_tar(base_tar, wal_tar)

View File

@@ -24,7 +24,13 @@ def test_basic_eviction(
test_name="test_download_remote_layers_api",
)
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(
initial_tenant_conf={
# disable gc and compaction background loops because they perform on-demand downloads
"gc_period": "0s",
"compaction_period": "0s",
}
)
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
@@ -47,6 +53,11 @@ def test_basic_eviction(
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
# disable compute & sks to avoid on-demand downloads by walreceiver / getpage
endpoint.stop()
for sk in env.safekeepers:
sk.stop()
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))

View File

@@ -20,7 +20,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
wait_until_tenant_active,
@@ -597,14 +597,11 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
env.pageserver.allowed_errors.append(
".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
)
client.timeline_delete(tenant_id, timeline_id)
env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*")
env.pageserver.allowed_errors.append(
".*files not bound to index_file.json, proceeding with their deletion.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id))
timeline_delete_wait_completed(client, tenant_id, timeline_id)
assert not timeline_path.exists()

View File

@@ -11,6 +11,7 @@ from fixtures.neon_fixtures import (
wait_for_wal_insert_lsn,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.pg_version import PgVersion, xfail_on_postgres
from fixtures.types import Lsn, TenantId, TimelineId
@@ -628,12 +629,12 @@ def test_get_tenant_size_with_multiple_branches(
size_debug_file_before.write(size_debug)
# teardown, delete branches, and the size should be going down
http_client.timeline_delete(tenant_id, first_branch_timeline_id)
timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id)
size_after_deleting_first = http_client.tenant_size(tenant_id)
assert size_after_deleting_first < size_after_thinning_branch
http_client.timeline_delete(tenant_id, second_branch_timeline_id)
timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id)
size_after_deleting_second = http_client.tenant_size(tenant_id)
assert size_after_deleting_second < size_after_deleting_first

View File

@@ -1,6 +1,10 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active
from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
wait_until_tenant_active,
)
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
@@ -24,7 +28,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
def delete_all_timelines(tenant: TenantId):
timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)]
for t in timelines:
client.timeline_delete(tenant, t)
timeline_delete_wait_completed(client, tenant, t)
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()

View File

@@ -21,6 +21,7 @@ from fixtures.neon_fixtures import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -213,7 +214,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
# Test (a subset of) pageserver global metrics
for metric in PAGESERVER_GLOBAL_METRICS:
ps_samples = ps_metrics.query_all(metric, {})
assert len(ps_samples) > 0
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
for sample in ps_samples:
labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()])
log.info(f"{sample.name}{{{labels}}} {sample.value}")
@@ -318,9 +319,10 @@ def test_pageserver_with_empty_tenants(
client.tenant_create(tenant_with_empty_timelines)
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
timeline_delete_wait_completed(
client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(

View File

@@ -17,9 +17,10 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
wait_timeline_detail_404,
wait_until_tenant_active,
wait_until_timeline_state,
)
@@ -83,7 +84,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id),
func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id),
)
assert not timeline_path.exists()
@@ -94,16 +95,16 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
) as exc:
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
# FIXME leaves tenant without timelines, should we prevent deletion of root timeline?
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id),
)
assert exc.value.status_code == 404
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: timeline_delete_wait_completed(
ps_http, env.initial_tenant, parent_timeline_id
),
)
# Check that we didn't pick up the timeline again after restart.
# See https://github.com/neondatabase/neon/issues/3560
env.pageserver.stop(immediate=True)
@@ -143,7 +144,6 @@ def test_delete_timeline_post_rm_failure(
ps_http.configure_failpoints((failpoint_name, "return"))
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
@@ -165,13 +165,7 @@ def test_delete_timeline_post_rm_failure(
# this should succeed
# this also checks that delete can be retried even when timeline is in Broken state
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2)
with pytest.raises(PageserverApiException) as e:
ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert e.value.status_code == 404
env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*")
timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline)
env.pageserver.allowed_errors.append(
f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*"
)
@@ -247,13 +241,7 @@ def test_timeline_resurrection_on_attach(
pass
# delete new timeline
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id))
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id)
##### Stop the pageserver instance, erase all its data
env.endpoints.stop_all()
@@ -338,7 +326,6 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
@@ -357,12 +344,15 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
# Wait for tenant to finish loading.
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id)
)
try:
data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
log.debug(f"detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code != 404:
raise
else:
raise Exception("detail succeeded (it should return 404)")
assert (
not leaf_timeline_path.exists()
@@ -389,13 +379,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
assert env.initial_timeline is not None
for timeline_id in (intermediate_timeline_id, env.initial_timeline):
ps_http.timeline_delete(env.initial_tenant, timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id)
timeline_delete_wait_completed(
ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id
)
assert_prefix_empty(
@@ -419,23 +404,27 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
)
def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
neon_env_builder: NeonEnvBuilder,
@pytest.mark.parametrize(
"stuck_failpoint",
["persist_deleted_index_part", "in_progress_delete"],
)
def test_concurrent_timeline_delete_stuck_on(
neon_env_builder: NeonEnvBuilder, stuck_failpoint: str
):
"""
If we're stuck uploading the index file with the is_delete flag,
eventually console will hand up and retry.
If we're still stuck at the retry time, ensure that the retry
fails with status 500, signalling to console that it should retry
later.
Ideally, timeline_delete should return 202 Accepted and require
console to poll for completion, but, that would require changing
the API contract.
If delete is stuck console will eventually retry deletion.
So we need to be sure that these requests wont interleave with each other.
In this tests we check two places where we can spend a lot of time.
This is a regression test because there was a bug when DeletionGuard wasnt propagated
to the background task.
Ensure that when retry comes if we're still stuck request will get an immediate error response,
signalling to console that it should retry later.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload",
test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}",
)
env = neon_env_builder.init_start()
@@ -445,13 +434,14 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
ps_http = env.pageserver.http_client()
# make the first call sleep practically forever
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))
ps_http.configure_failpoints((stuck_failpoint, "pause"))
def first_call(result_queue):
try:
log.info("first call start")
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
timeline_delete_wait_completed(
ps_http, env.initial_tenant, child_timeline_id, timeout=10
)
log.info("first call success")
result_queue.put("success")
except Exception:
@@ -466,7 +456,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
def first_call_hit_failpoint():
assert env.pageserver.log_contains(
f".*{child_timeline_id}.*at failpoint {failpoint_name}"
f".*{child_timeline_id}.*at failpoint {stuck_failpoint}"
)
wait_until(50, 0.1, first_call_hit_failpoint)
@@ -484,8 +474,12 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
)
log.info("second call failed as expected")
# ensure it is not 404 and stopping
detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert detail["state"] == "Stopping"
# by now we know that the second call failed, let's ensure the first call will finish
ps_http.configure_failpoints((failpoint_name, "off"))
ps_http.configure_failpoints((stuck_failpoint, "off"))
result = first_call_result.get()
assert result == "success"
@@ -498,8 +492,10 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
"""
If the client hangs up before we start the index part upload but after we mark it
If the client hangs up before we start the index part upload but after deletion is scheduled
we mark it
deleted in local memory, a subsequent delete_timeline call should be able to do
another delete timeline operation.
This tests cancel safety up to the given failpoint.
@@ -515,12 +511,18 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
ps_http = env.pageserver.http_client()
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
failpoint_name = "persist_deleted_index_part"
ps_http.configure_failpoints((failpoint_name, "pause"))
with pytest.raises(requests.exceptions.Timeout):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*timeline deletion is already in progress.*"
)
with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# make sure the timeout was due to the failpoint
at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*"
@@ -552,12 +554,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found"
env.pageserver.allowed_errors.append(".*" + notfound_message)
with pytest.raises(PageserverApiException, match=notfound_message) as exc:
ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert exc.value.status_code == 404
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id)
@pytest.mark.parametrize(
@@ -616,12 +613,7 @@ def test_timeline_delete_works_for_remote_smoke(
for timeline_id in reversed(timeline_ids):
# note that we need to finish previous deletion before scheduling next one
# otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3)
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id)
assert_prefix_empty(
neon_env_builder,

View File

@@ -24,6 +24,7 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
wait_for_upload_queue_empty,
wait_until_tenant_active,
)
@@ -272,7 +273,7 @@ def test_timeline_initial_logical_size_calculation_cancellation(
if deletion_method == "tenant_detach":
client.tenant_detach(tenant_id)
elif deletion_method == "timeline_delete":
client.timeline_delete(tenant_id, timeline_id)
timeline_delete_wait_completed(client, tenant_id, timeline_id)
delete_timeline_success.put(True)
except PageserverApiException:
delete_timeline_success.put(False)

View File

@@ -31,7 +31,11 @@ from fixtures.neon_fixtures import (
SafekeeperPort,
available_remote_storages,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -548,15 +552,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
f"sk_id={sk.id} to flush {last_lsn}",
)
ps_cli = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
ps_http = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
lag = last_lsn - pageserver_lsn
log.info(
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
)
endpoint.stop_and_destroy()
ps_cli.timeline_delete(tenant_id, timeline_id)
timeline_delete_wait_completed(ps_http, tenant_id, timeline_id)
# Also delete and manually create timeline on safekeepers -- this tests
# scenario of manual recovery on different set of safekeepers.
@@ -571,11 +575,21 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
# Terminate first all safekeepers to prevent communication unexpectantly
# advancing peer_horizon_lsn.
for sk in env.safekeepers:
cli = sk.http_client()
cli.timeline_delete_force(tenant_id, timeline_id)
# restart safekeeper to clear its in-memory state
sk.stop().start()
sk.stop()
# wait all potenital in flight pushes to broker arrive before starting
# safekeepers (even without sleep, it is very unlikely they are not
# delivered yet).
time.sleep(1)
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
f_partial_path = (
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
@@ -583,7 +597,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
shutil.copy(f_partial_saved, f_partial_path)
# recreate timeline on pageserver from scratch
ps_cli.timeline_create(
ps_http.timeline_create(
pg_version=PgVersion(pg_version),
tenant_id=tenant_id,
new_timeline_id=timeline_id,
@@ -598,7 +612,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
if elapsed > wait_lsn_timeout:
raise RuntimeError("Timed out waiting for WAL redo")
tenant_status = ps_cli.tenant_status(tenant_id)
tenant_status = ps_http.tenant_status(tenant_id)
if tenant_status["state"]["slug"] == "Loading":
log.debug(f"Tenant {tenant_id} is still loading, retrying")
else:

View File

@@ -1,3 +1,5 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
@@ -40,7 +42,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
# Kills one of the safekeepers and ensures that only the active ones are printed in the state.
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
# Trigger WAL wait timeout faster
neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
neon_env_builder.pageserver_config_override = """
wait_lsn_timeout = "1s"
tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"}
"""
# Have notable SK ids to ensure we check logs for their presence, not some other random numbers
neon_env_builder.safekeepers_id_start = 12345
neon_env_builder.num_safekeepers = 3
@@ -70,6 +75,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
stopped_safekeeper_id = stopped_safekeeper.id
log.info(f"Stopping safekeeper {stopped_safekeeper.id}")
stopped_safekeeper.stop()
# sleep until stopped safekeeper is removed from candidates
time.sleep(2)
# Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats.
insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert)