Compare commits

..

17 Commits

Author SHA1 Message Date
Devin AI
8ec905239b Fix unused imports and pattern matching for CompactionError::Cancelled
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-07 09:07:04 +00:00
Devin AI
e32aceff16 Replace string comparison with pattern matching for error handling
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 19:28:51 +00:00
Devin AI
486d9f0c4d Address PR comments: restore maybe_fatal_err, use anyhow::Error::new, add TODO comments, fix string matching
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 19:17:57 +00:00
Devin AI
000503b38a Fix error handling in compaction.rs to use .into() for proper type conversion
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 18:53:08 +00:00
Devin AI
7a576d723c Extend error handling to put_image, put_value_*, and put_batch methods
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 16:29:57 +00:00
Devin AI
b419fa9e2f Fix InMemoryLayerError enum placement
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 16:04:52 +00:00
Devin AI
aa45bf998d Extend error handling to put_image, put_value_*, and put_batch methods
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 16:04:20 +00:00
Devin AI
f8dffb62cf Bubble up BlobWriterError to users of BlobWriter
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 14:56:28 +00:00
Devin AI
ee7479cb68 Replace anyhow::Result with typed BlobWriterError in BlobWriter error path
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 09:57:13 +00:00
Folke Behrens
62ac5b94b3 proxy: Include the exp/nbf timestamps in the errors (#11828)
## Problem

It's difficult to tell when the JWT expired from current logs and error
messages.

## Summary of changes

Add exp/nbf timestamps to the respective error variants.
Also use checked_add when deserializing a SystemTime from JWT.

Related to INC-509
2025-05-06 09:28:25 +00:00
Konstantin Knizhnik
f0e7b3e0ef Use unlogged build for gist_indexsortbuild_flush_ready_pages (#11753)
## Problem

See https://github.com/neondatabase/neon/issues/11718

GIST index can be constructed in two ways: GIST_SORTED_BUILD and
GIST_BUFFERING.
We used unlogged build in the second case but not in the first.

## Summary of changes

Use unlogged build in `gist_indexsortbuild_flush_ready_pages`

Correspondent Postgres PRsL:
https://github.com/neondatabase/postgres/pull/624
https://github.com/neondatabase/postgres/pull/625
https://github.com/neondatabase/postgres/pull/626

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-05-06 07:24:27 +00:00
Dmitrii Kovalkov
c6ff18affc cosmetics(pgxn/neon): WP code small clean up (#11824)
## Problem
Some small cosmetic changes I made while reading the code. Should not
affect anything.

## Summary of changes
- Remove `n_votes` field because it's not used anymore
- Explicitly initialize `safekeepers_generation` with
`INVALID_GENERATION` if the generation is not present (the struct is
zero-initialized anyway, but the explicit initialization is better IMHO)
- Access SafekeeperId via pointer `sk_id` created above
2025-05-06 06:51:51 +00:00
Heikki Linnakangas
16ca74a3f4 Add SAFETY comment on libc::sysconf() call (#11581)
I got an 'undocumented_unsafe_blocks' clippy warning about it. Not sure
why I got the warning now and not before, but in any case a comment is a
good idea.
2025-05-06 06:49:23 +00:00
Peter Bendel
cb67f9a651 delete orphan left over projects (#11826)
## Problem

sometimes our benchmarking GitHub workflow is terminated by side-effects
beyond our control (e.g. GitHub runner looses connection to server) and
then we have left-over Neon projects created during the workflow

[Example where GitHub runner lost connection and project was not
deleted](https://github.com/neondatabase/neon/actions/runs/14017400543/job/39244816485)

Fixes https://github.com/neondatabase/cloud/issues/28546

## Summary of changes

- Add a cleanup step that cleans up left-over projects
- also give each project created during workflows a name that references
the testcase and GitHub runid

## Example run (test of new job steps)


https://github.com/neondatabase/neon/actions/runs/14837092399/job/41650741922#step:6:63

---------

Co-authored-by: a-masterov <72613290+a-masterov@users.noreply.github.com>
2025-05-05 14:30:13 +00:00
devin-ai-integration[bot]
baf425a2cd [pageserver/virtual_file] impr: Improve OpenOptions API ergonomics (#11789)
# Improve OpenOptions API ergonomics

Closes #11787

This PR improves the OpenOptions API ergonomics by:

1. Making OpenOptions methods take and return owned Self instead of &mut
self
2. Changing VirtualFile::open_with_options_v2 to take an owned
OpenOptions
3. Removing unnecessary .clone() and .to_owned() calls

These changes make the API more idiomatic Rust by leveraging the builder
pattern with owned values, which is cleaner and more ergonomic than the
previous approach.

Link to Devin run:
https://app.devin.ai/sessions/c2a4b24f7aca40a3b3777f4259bf8ee1
Requested by: christian@neon.tech

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: christian@neon.tech <christian@neon.tech>
2025-05-05 13:06:37 +00:00
Alex Chi Z.
0b243242df fix(test): allow flush error in gc-compaction tests (#11822)
## Problem

Part of https://github.com/neondatabase/neon/issues/11762

## Summary of changes

While #11762 needs some work to refactor the error propagating thing, we
can do a hacky fix for the gc-compaction tests to allow flush error
during shutdown. It does not affect correctness.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-05 12:15:22 +00:00
Conrad Ludgate
6131d86ec9 proxy: allow invalid SNI (#11792)
## Problem

Some PrivateLink customers are unable to use Private DNS. As such they
use an invalid domain name to address Neon. We currently are rejecting
those connections because we cannot resolve the correct certificate.

## Summary of changes

1. Ensure a certificate is always returned.
2. If there is an SNI field, use endpoint fallback if it doesn't match.

I suggest reviewing each commit separately.
2025-05-05 11:18:55 +00:00
36 changed files with 838 additions and 673 deletions

View File

@@ -53,6 +53,77 @@ concurrency:
cancel-in-progress: true
jobs:
cleanup:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
env:
ORG_ID: org-solitary-dew-09443886
LIMIT: 100
SEARCH: "Created by actions/neon-project-create; GITHUB_RUN_ID"
BASE_URL: https://console-stage.neon.build/api/v2
DRY_RUN: "false" # Set to "true" to just test out the workflow
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Cleanup inactive Neon projects left over from prior runs
env:
API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
run: |
set -euo pipefail
NOW=$(date -u +%s)
DAYS_AGO=$((NOW - 5 * 86400))
REQUEST_URL="$BASE_URL/projects?limit=$LIMIT&search=$(printf '%s' "$SEARCH" | jq -sRr @uri)&org_id=$ORG_ID"
echo "Requesting project list from:"
echo "$REQUEST_URL"
response=$(curl -s -X GET "$REQUEST_URL" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}" )
echo "Response:"
echo "$response" | jq .
projects_to_delete=$(echo "$response" | jq --argjson cutoff "$DAYS_AGO" '
.projects[]
| select(.compute_last_active_at != null)
| select((.compute_last_active_at | fromdateiso8601) < $cutoff)
| {id, name, compute_last_active_at}
')
if [ -z "$projects_to_delete" ]; then
echo "No projects eligible for deletion."
exit 0
fi
echo "Projects that will be deleted:"
echo "$projects_to_delete" | jq -r '.id'
if [ "$DRY_RUN" = "false" ]; then
echo "$projects_to_delete" | jq -r '.id' | while read -r project_id; do
echo "Deleting project: $project_id"
curl -s -X DELETE "$BASE_URL/projects/$project_id" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
done
else
echo "Dry run enabled — no projects were deleted."
fi
bench:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

View File

@@ -16,6 +16,7 @@ pub struct Collector {
const NMETRICS: usize = 2;
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
// SAFETY: libc::sysconf is safe, it merely returns a value.
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");

View File

@@ -2469,6 +2469,7 @@ async fn timeline_checkpoint_handler(
.map_err(|e|
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Cancelled => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e),

View File

@@ -3198,6 +3198,7 @@ impl TenantShard {
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
CompactionError::Cancelled => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}

View File

@@ -90,10 +90,18 @@ impl Header {
}
}
#[derive(Debug, thiserror::Error)]
pub enum BlobWriterError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
Flush(BlobWriterError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
@@ -238,14 +246,16 @@ where
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> anyhow::Result<Self> {
) -> Result<Self, BlobWriterError> {
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
Ok(Self {
io_buf: Some(BytesMut::new()),
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate.enter()?,
gate_token,
cancel,
ctx,
flush_task_span,
@@ -265,13 +275,16 @@ where
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
) -> (FullSlice<Buf>, Result<(), BlobWriterError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})
.map(|len| {
self.offset += len as u64;
});
@@ -418,8 +431,10 @@ where
self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<W, FlushTaskError> {
let (_, file) = self.writer.shutdown(mode, ctx).await?;
) -> Result<W, BlobWriterError> {
let (_, file) = self.writer.shutdown(mode, ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})?;
Ok(file)
}
}
@@ -467,8 +482,11 @@ pub(crate) mod tests {
.await?,
gate.enter()?,
);
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
let mut wtr = BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test"))
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -490,7 +508,11 @@ pub(crate) mod tests {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
file.disarm_into_inner()
};
Ok((temp_dir, pathbuf, offsets))

View File

@@ -19,6 +19,14 @@ use crate::context::RequestContext;
use crate::tenant::Timeline;
use crate::tenant::storage_layer::Layer;
#[derive(Debug, thiserror::Error)]
pub enum BatchSplitWriterError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
Discarded(PersistentLayerKey),
@@ -97,7 +105,7 @@ impl BatchLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<ResidentLayer>> {
) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
let res = self
.finish_with_discard_fn(tline, ctx, |_| async { false })
.await?;
@@ -115,7 +123,7 @@ impl BatchLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -139,14 +147,14 @@ impl BatchLayerWriter {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
LayerWriterWrapper::Delta(writer) => writer
.finish(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
LayerWriterWrapper::Image(writer) => writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
};
let layer = match res {
Ok((desc, path)) => {
@@ -155,7 +163,7 @@ impl BatchLayerWriter {
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
return Err(BatchSplitWriterError::Other(e));
}
}
}
@@ -235,7 +243,7 @@ impl<'a> SplitImageLayerWriter<'a> {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), BatchSplitWriterError> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -253,7 +261,8 @@ impl<'a> SplitImageLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
@@ -262,7 +271,10 @@ impl<'a> SplitImageLayerWriter<'a> {
);
self.start_key = key;
}
self.inner.put_image(key, img, ctx).await
self.inner
.put_image(key, img, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -271,7 +283,7 @@ impl<'a> SplitImageLayerWriter<'a> {
ctx: &RequestContext,
end_key: Key,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -291,7 +303,7 @@ impl<'a> SplitImageLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<BatchWriterResult>> {
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
@@ -346,7 +358,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), BatchSplitWriterError> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -366,7 +378,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?,
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
@@ -386,7 +399,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
@@ -396,16 +410,19 @@ impl<'a> SplitDeltaLayerWriter<'a> {
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
);
)));
}
}
self.last_key_written = key;
let (_, inner) = self.inner.as_mut().unwrap();
inner.put_value(key, lsn, val, ctx).await
inner
.put_value(key, lsn, val, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -413,7 +430,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -439,7 +456,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<BatchWriterResult>> {
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}

View File

@@ -35,7 +35,9 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -76,7 +78,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -448,7 +450,11 @@ impl DeltaLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -477,15 +483,12 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), DeltaLayerWriterError> {
let val_ser =
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (_, res) = self
.put_value_bytes(
key,
lsn,
Value::ser(&val)?.slice_len(),
val.will_init(),
ctx,
)
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
.await;
res
}
@@ -497,25 +500,46 @@ impl DeltaLayerWriterInner {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
where
Buf: IoBuf + Send,
{
assert!(
self.lsn_range.start <= lsn,
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
);
if self.lsn_range.start > lsn {
return (
val,
Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
))),
);
}
// We don't want to use compression in delta layer creation
let compression = ImageCompressionAlgorithm::Disabled;
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let off = match res {
Ok((off, _)) => off,
Err(e) => return (val, Err(anyhow::anyhow!(e))),
Err(e) => {
return (
val,
Err(match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
DeltaLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
DeltaLayerWriterError::Other(err)
}
},
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
}),
);
}
};
let blob_ref = BlobRef::new(off, will_init);
@@ -525,7 +549,10 @@ impl DeltaLayerWriterInner {
self.num_keys += 1;
(val, res.map_err(|e| anyhow::anyhow!(e)))
(
val,
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
)
}
fn size(&self) -> u64 {
@@ -539,7 +566,7 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let file = self
@@ -548,17 +575,24 @@ impl DeltaLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
})?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -575,24 +609,27 @@ impl DeltaLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let buf = summary
.ser_into_page()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let metadata = file
.metadata()
.await
.context("get file metadata to determine size")?;
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
// 5GB limit for objects without multipart upload (which we don't want to use)
// Make it a little bit below to account for differing GB units
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
);
if metadata.len() > S3_UPLOAD_LIMIT {
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
)));
}
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
@@ -609,7 +646,7 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("delta_layer sync_all")?;
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
trace!("created delta layer {}", self.path);
@@ -694,7 +731,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), DeltaLayerWriterError> {
self.inner
.as_mut()
.unwrap()
@@ -709,7 +746,7 @@ impl DeltaLayerWriter {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
where
Buf: IoBuf + Send,
{
@@ -731,7 +768,7 @@ impl DeltaLayerWriter {
mut self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
self.inner.take().unwrap().finish(key_end, ctx).await
}
@@ -745,6 +782,14 @@ impl DeltaLayerWriter {
}
}
#[derive(Debug, thiserror::Error)]
pub enum DeltaLayerWriterError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -755,7 +800,7 @@ pub enum RewriteSummaryError {
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
Self::Other(anyhow::Error::new(e))
}
}

View File

@@ -33,7 +33,9 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -74,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -340,6 +342,14 @@ impl ImageLayer {
}
}
#[derive(Debug, thiserror::Error)]
pub enum ImageLayerWriterError {
#[error("flush task cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -814,7 +824,11 @@ impl ImageLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -850,8 +864,13 @@ impl ImageLayerWriterInner {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
@@ -861,7 +880,18 @@ impl ImageLayerWriterInner {
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res?;
let (off, compression_info) = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
@@ -873,7 +903,9 @@ impl ImageLayerWriterInner {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
self.tree
.append(&keybuf, off)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
#[cfg(feature = "testing")]
{
@@ -893,8 +925,12 @@ impl ImageLayerWriterInner {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
// NB: we don't update the (un)compressed metrics, since we can't determine them without
// decompressing the image. This seems okay.
@@ -904,11 +940,23 @@ impl ImageLayerWriterInner {
.blob_writer
.write_blob_raw(raw_with_header.slice_len(), ctx)
.await;
let offset = res?;
let offset = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, offset)?;
self.tree
.append(&keybuf, offset)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
#[cfg(feature = "testing")]
{
@@ -925,7 +973,7 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
// Calculate compression ratio
@@ -948,17 +996,24 @@ impl ImageLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
})?;
// Write out the index
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self.tree.finish()?;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
offset += PAGE_SZ as u64;
}
@@ -981,14 +1036,18 @@ impl ImageLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let buf = summary
.ser_into_page()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let metadata = file
.metadata()
.await
.context("get metadata to determine file size")?;
let metadata = file.metadata().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!(
"get metadata to determine file size: {}",
e
))
})?;
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
@@ -1011,9 +1070,9 @@ impl ImageLayerWriterInner {
// set inner.file here. The first read will have to re-open it.
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("image_layer sync_all")?;
file.sync_all().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
})?;
trace!("created image layer {}", self.path);
@@ -1093,7 +1152,7 @@ impl ImageLayerWriter {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), ImageLayerWriterError> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
@@ -1108,7 +1167,7 @@ impl ImageLayerWriter {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), ImageLayerWriterError> {
self.inner
.as_mut()
.unwrap()
@@ -1132,7 +1191,7 @@ impl ImageLayerWriter {
pub(crate) async fn finish(
mut self,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
self.inner.take().unwrap().finish(ctx, None).await
}
@@ -1141,7 +1200,7 @@ impl ImageLayerWriter {
mut self,
end_key: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
}
}

View File

@@ -26,6 +26,7 @@ use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
use super::delta_layer::DeltaLayerWriterError;
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
use crate::config::PageServerConf;
@@ -581,7 +582,17 @@ impl InMemoryLayer {
estimated_in_mem_size: AtomicU64::new(0),
})
}
}
#[derive(Debug, thiserror::Error)]
pub enum InMemoryLayerError {
#[error("flush task cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
impl InMemoryLayer {
/// Write path.
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
@@ -591,7 +602,7 @@ impl InMemoryLayer {
&self,
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), InMemoryLayerError> {
let mut inner = self.inner.write().await;
self.assert_writable();
@@ -605,7 +616,11 @@ impl InMemoryLayer {
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
inner
.file
.write_raw(&raw, ctx)
.await
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
let new_size = inner.file.len();
let expected_new_len = base_offset
@@ -637,7 +652,8 @@ impl InMemoryLayer {
batch_offset,
len,
will_init,
})?;
})
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
@@ -794,14 +810,25 @@ impl InMemoryLayer {
ctx,
)
.await;
res?;
res.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
DeltaLayerWriterError::Other(err) => err,
})?;
}
}
}
}
// MAX is used here because we identify L0 layers by full key range
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
let (desc, path) = delta_layer_writer
.finish(Key::MAX, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
DeltaLayerWriterError::Other(err) => err,
})?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
//

View File

@@ -300,6 +300,7 @@ pub(crate) fn log_compaction_error(
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Cancelled => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,

View File

@@ -119,6 +119,8 @@ use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
@@ -773,6 +775,21 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
}
}
impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
for CreateImageLayersError
{
fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
match err {
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Cancelled => {
Self::Cancelled
}
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Other(err) => {
Self::Other(err)
}
}
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -2041,6 +2058,9 @@ impl Timeline {
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::Cancelled) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::AlreadyRunning(_)) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
@@ -5232,7 +5252,17 @@ impl Timeline {
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
image_layer_writer
.put_image(img_key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
anyhow::anyhow!("flush task cancelled"),
),
ImageLayerWriterError::Other(err) => {
CreateImageLayersError::Other(err)
}
})?;
wrote_keys = true;
}
}
@@ -5329,7 +5359,15 @@ impl Timeline {
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
// on the normal data path either.
image_layer_writer.put_image(k, v, ctx).await?;
image_layer_writer
.put_image(k, v, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
}
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
})?;
}
if wrote_any_image {
@@ -5843,6 +5881,8 @@ pub(crate) enum CompactionError {
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
#[error("cancelled")]
Cancelled,
}
impl CompactionError {
@@ -5857,6 +5897,7 @@ impl CompactionError {
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
| Self::Cancelled
)
}
@@ -6922,9 +6963,22 @@ impl Timeline {
)
.await?;
for (key, img) in images {
image_layer_writer.put_image(key, img, ctx).await?;
image_layer_writer
.put_image(key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
}
let (desc, path) = image_layer_writer.finish(ctx).await?;
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
@@ -7378,7 +7432,10 @@ impl TimelineWriter<'_> {
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
}
res
res.map_err(|e| match e {
InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
InMemoryLayerError::Other(err) => err,
})
}
#[cfg(test)]

View File

@@ -1547,7 +1547,7 @@ impl Timeline {
ctx,
)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
// Safety of layer rewrites:
// - We are writing to a different local file path than we are reading from, so the old Layer
@@ -1572,7 +1572,7 @@ impl Timeline {
let (desc, path) = image_layer_writer
.finish(ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
@@ -2140,7 +2140,7 @@ impl Timeline {
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
@@ -2199,7 +2199,7 @@ impl Timeline {
.unwrap()
.put_value(key, lsn, value, ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
} else {
let owner = self.shard_identity.get_shard_number(&key);
@@ -2217,7 +2217,7 @@ impl Timeline {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
@@ -3682,8 +3682,7 @@ impl Timeline {
let (desc, path) = delta_writer_before
.finish(job_desc.compaction_key_range.start, ctx)
.await
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3693,8 +3692,7 @@ impl Timeline {
let (desc, path) = delta_writer_after
.finish(key.key_range.end, ctx)
.await
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3713,8 +3711,7 @@ impl Timeline {
writer
.finish_with_discard_fn(self, ctx, end_key, discard)
.await
.context("failed to finish image layer writer")
.map_err(CompactionError::Other)?
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
} else {
drop(writer);
Vec::new()
@@ -3727,8 +3724,7 @@ impl Timeline {
delta_layer_writer
.finish_with_discard_fn(self, ctx, discard)
.await
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
} else {
drop(delta_layer_writer);
Vec::new()
@@ -4253,7 +4249,10 @@ impl TimelineAdaptor {
unfinished_image_layer,
} = outcome
{
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
let (desc, path) = unfinished_image_layer
.finish(ctx)
.await
.map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_images.push(image_layer);

View File

@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
@@ -816,7 +817,10 @@ async fn copy_lsn_prefix(
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(Error::Prepare)?;
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
})?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(Error::Prepare)?;

View File

@@ -14,8 +14,6 @@
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
@@ -99,7 +97,7 @@ impl VirtualFile {
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let mode = get_io_mode();
@@ -112,21 +110,16 @@ impl VirtualFile {
#[cfg(target_os = "linux")]
(IoMode::DirectRw, _) => true,
};
let open_options = open_options.clone();
let open_options = if set_o_direct {
if set_o_direct {
#[cfg(target_os = "linux")]
{
let mut open_options = open_options;
open_options.custom_flags(nix::libc::O_DIRECT);
open_options
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
}
#[cfg(not(target_os = "linux"))]
unreachable!(
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
);
} else {
open_options
};
}
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile { inner, _mode: mode })
}
@@ -530,7 +523,7 @@ impl VirtualFileInner {
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
/// Open a file with given options.
@@ -558,10 +551,11 @@ impl VirtualFileInner {
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options.clone();
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let reopen_options = open_options
.clone()
.create(false)
.create_new(false)
.truncate(false);
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
@@ -1307,7 +1301,7 @@ mod tests {
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
@@ -1374,7 +1368,7 @@ mod tests {
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a
@@ -1393,8 +1387,7 @@ mod tests {
.read(true)
.write(true)
.create(true)
.truncate(true)
.to_owned(),
.truncate(true),
&ctx,
)
.await?;
@@ -1412,12 +1405,7 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = A::open(
path_b.clone(),
OpenOptions::new().read(true).to_owned(),
&ctx,
)
.await?;
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
vfiles.push(vfile);
}
@@ -1466,7 +1454,7 @@ mod tests {
for _ in 0..VIRTUAL_FILES {
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true).clone(),
OpenOptions::new().read(true),
&ctx,
)
.await?;

View File

@@ -1,6 +1,7 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use std::os::fd::OwnedFd;
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;
use super::io_engine::IoEngine;
@@ -43,7 +44,7 @@ impl OpenOptions {
self.write
}
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
pub fn read(mut self, read: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.read(read);
@@ -56,7 +57,7 @@ impl OpenOptions {
self
}
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
pub fn write(mut self, write: bool) -> Self {
self.write = write;
match &mut self.inner {
Inner::StdFs(x) => {
@@ -70,7 +71,7 @@ impl OpenOptions {
self
}
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
pub fn create(mut self, create: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create(create);
@@ -83,7 +84,7 @@ impl OpenOptions {
self
}
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
pub fn create_new(mut self, create_new: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create_new(create_new);
@@ -96,7 +97,7 @@ impl OpenOptions {
self
}
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
pub fn truncate(mut self, truncate: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.truncate(truncate);
@@ -124,10 +125,8 @@ impl OpenOptions {
}
}
}
}
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
pub fn mode(mut self, mode: u32) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.mode(mode);
@@ -140,7 +139,7 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
self
}
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
pub fn custom_flags(mut self, flags: i32) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);

View File

@@ -150,7 +150,7 @@ NeonWALReaderFree(NeonWALReader *state)
* fetched from timeline 'tli'.
*
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
* occurs, in which case 'err' has the desciption. Error always closes remote
* occurs, in which case 'err' has the description. Error always closes remote
* connection, if there was any, so socket subscription should be removed.
*
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with

View File

@@ -1989,8 +1989,14 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
#if PG_MAJORVERSION_NUM >= 17
/*
* We have to disable this check for pg14-16 because sorted build of GIST index requires
* to perform unlogged build several times
*/
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;

View File

@@ -124,6 +124,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
else
{
wp->safekeepers_generation = INVALID_GENERATION;
host = wp->config->safekeepers_list;
}
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
@@ -756,7 +757,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.members.m[i];
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
if (sk_id->node_id == sk->greetResponse.nodeId)
{
/*
* If mconf or list of safekeepers to connect to changed (the
@@ -781,7 +782,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
if (sk_id->node_id == sk->greetResponse.nodeId)
{
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
{
@@ -1071,7 +1072,6 @@ RecvVoteResponse(Safekeeper *sk)
/* ready for elected message */
sk->state = SS_WAIT_ELECTED;
wp->n_votes++;
/* Are we already elected? */
if (wp->state == WPS_CAMPAIGN)
{

View File

@@ -845,9 +845,6 @@ typedef struct WalProposer
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;

View File

@@ -409,14 +409,22 @@ impl JwkCacheEntryLock {
if let Some(exp) = payload.expiration {
if now >= exp + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired));
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
exp.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
)));
}
}
if let Some(nbf) = payload.not_before {
if nbf >= now + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(
JwtClaimsError::JwtTokenNotYetReadyToUse,
JwtClaimsError::JwtTokenNotYetReadyToUse(
nbf.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
),
));
}
}
@@ -534,10 +542,10 @@ struct JwtPayload<'a> {
#[serde(rename = "aud", default)]
audience: OneOrMany,
/// Expiration - Time after which the JWT expires
#[serde(deserialize_with = "numeric_date_opt", rename = "exp", default)]
#[serde(rename = "exp", deserialize_with = "numeric_date_opt", default)]
expiration: Option<SystemTime>,
/// Not before - Time after which the JWT expires
#[serde(deserialize_with = "numeric_date_opt", rename = "nbf", default)]
/// Not before - Time before which the JWT is not valid
#[serde(rename = "nbf", deserialize_with = "numeric_date_opt", default)]
not_before: Option<SystemTime>,
// the following entries are only extracted for the sake of debug logging.
@@ -609,8 +617,15 @@ impl<'de> Deserialize<'de> for OneOrMany {
}
fn numeric_date_opt<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
let d = <Option<u64>>::deserialize(d)?;
Ok(d.map(|n| SystemTime::UNIX_EPOCH + Duration::from_secs(n)))
<Option<u64>>::deserialize(d)?
.map(|t| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(t))
.ok_or_else(|| {
serde::de::Error::custom(format_args!("timestamp out of bounds: {t}"))
})
})
.transpose()
}
struct JwkRenewalPermit<'a> {
@@ -746,11 +761,11 @@ pub enum JwtClaimsError {
#[error("invalid JWT token audience")]
InvalidJwtTokenAudience,
#[error("JWT token has expired")]
JwtTokenHasExpired,
#[error("JWT token has expired (exp={0})")]
JwtTokenHasExpired(u64),
#[error("JWT token is not yet ready to use")]
JwtTokenNotYetReadyToUse,
#[error("JWT token is not yet ready to use (nbf={0})")]
JwtTokenNotYetReadyToUse(u64),
}
#[allow(dead_code, reason = "Debug use only")]
@@ -1233,14 +1248,14 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
"nbf": now + 60,
"aud": "neon",
}},
error: JwtClaimsError::JwtTokenNotYetReadyToUse,
error: JwtClaimsError::JwtTokenNotYetReadyToUse(now + 60),
},
Test {
body: json! {{
"exp": now - 60,
"aud": ["neon"],
}},
error: JwtClaimsError::JwtTokenHasExpired,
error: JwtClaimsError::JwtTokenHasExpired(now - 60),
},
Test {
body: json! {{

View File

@@ -32,12 +32,6 @@ pub(crate) enum ComputeUserInfoParseError {
option: EndpointId,
},
#[error(
"Common name inferred from SNI ('{}') is not known",
.cn,
)]
UnknownCommonName { cn: String },
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
MalformedProjectName(EndpointId),
}
@@ -66,22 +60,15 @@ impl ComputeUserInfoMaybeEndpoint {
}
}
pub(crate) fn endpoint_sni(
sni: &str,
common_names: &HashSet<String>,
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
let Some((subdomain, common_name)) = sni.split_once('.') else {
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
};
pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<EndpointId> {
let (subdomain, common_name) = sni.split_once('.')?;
if !common_names.contains(common_name) {
return Err(ComputeUserInfoParseError::UnknownCommonName {
cn: common_name.into(),
});
return None;
}
if subdomain == SERVERLESS_DRIVER_SNI {
return Ok(None);
return None;
}
Ok(Some(EndpointId::from(subdomain)))
Some(EndpointId::from(subdomain))
}
impl ComputeUserInfoMaybeEndpoint {
@@ -113,15 +100,8 @@ impl ComputeUserInfoMaybeEndpoint {
})
.map(|name| name.into());
let endpoint_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
endpoint_sni(sni_str, cn)?
} else {
None
}
} else {
None
};
let endpoint_from_domain =
sni.and_then(|sni_str| common_names.and_then(|cn| endpoint_sni(sni_str, cn)));
let endpoint = match (endpoint_option, endpoint_from_domain) {
// Invariant: if we have both project name variants, they should match.
@@ -424,21 +404,34 @@ mod tests {
}
#[test]
fn parse_inconsistent_sni() {
fn parse_unknown_sni() {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let ctx = RequestContext::test();
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.expect_err("should fail");
match err {
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
}
_ => panic!("bad error: {err:?}"),
}
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.unwrap();
assert!(info.endpoint_id.is_none());
}
#[test]
fn parse_unknown_sni_with_options() {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "endpoint=foo-bar-baz-1234"),
]);
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let ctx = RequestContext::test();
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.unwrap();
assert_eq!(info.endpoint_id.as_deref(), Some("foo-bar-baz-1234"));
}
#[test]

View File

@@ -24,9 +24,6 @@ pub(crate) enum HandshakeError {
#[error("protocol violation")]
ProtocolViolation,
#[error("missing certificate")]
MissingCertificate,
#[error("{0}")]
StreamUpgradeError(#[from] StreamUpgradeError),
@@ -42,10 +39,6 @@ impl ReportableError for HandshakeError {
match self {
HandshakeError::EarlyData => crate::error::ErrorKind::User,
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
// This error should not happen, but will if we have no default certificate and
// the client sends no SNI extension.
// If they provide SNI then we can be sure there is a certificate that matches.
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
StreamUpgradeError::Io(_) => crate::error::ErrorKind::ClientDisconnect,
@@ -146,7 +139,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
// try parse endpoint
let ep = conn_info
.server_name()
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
.and_then(|sni| endpoint_sni(sni, &tls.common_names));
if let Some(ep) = ep {
ctx.set_endpoint_id(ep);
}
@@ -161,10 +154,8 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
let (_, tls_server_end_point) = tls
.cert_resolver
.resolve(conn_info.server_name())
.ok_or(HandshakeError::MissingCertificate)?;
let (_, tls_server_end_point) =
tls.cert_resolver.resolve(conn_info.server_name());
stream = PqStream {
framed: Framed {

View File

@@ -98,8 +98,7 @@ fn generate_tls_config<'a>(
.with_no_client_auth()
.with_single_cert(vec![cert.clone()], key.clone_key())?;
let mut cert_resolver = CertResolver::new();
cert_resolver.add_cert(key, vec![cert], true)?;
let cert_resolver = CertResolver::new(key, vec![cert])?;
let common_names = cert_resolver.get_common_names();

View File

@@ -199,8 +199,7 @@ fn get_conn_info(
let endpoint = match connection_url.host() {
Some(url::Host::Domain(hostname)) => {
if let Some(tls) = tls {
endpoint_sni(hostname, &tls.common_names)?
.ok_or(ConnInfoError::MalformedEndpoint)?
endpoint_sni(hostname, &tls.common_names).ok_or(ConnInfoError::MalformedEndpoint)?
} else {
hostname
.split_once('.')

View File

@@ -5,6 +5,7 @@ use anyhow::{Context, bail};
use itertools::Itertools;
use rustls::crypto::ring::{self, sign};
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::sign::CertifiedKey;
use x509_cert::der::{Reader, SliceReader};
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
@@ -25,10 +26,8 @@ pub fn configure_tls(
certs_dir: Option<&String>,
allow_tls_keylogfile: bool,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
// add default certificate
cert_resolver.add_cert_path(key_path, cert_path, true)?;
let mut cert_resolver = CertResolver::parse_new(key_path, cert_path)?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
@@ -40,11 +39,8 @@ pub fn configure_tls(
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver.add_cert_path(
&key_path.to_string_lossy(),
&cert_path.to_string_lossy(),
false,
)?;
cert_resolver
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
}
}
}
@@ -83,92 +79,42 @@ pub fn configure_tls(
})
}
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct CertResolver {
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
default: (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint),
}
impl CertResolver {
pub fn new() -> Self {
Self::default()
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
Self::new(priv_key, cert_chain)
}
fn add_cert_path(
&mut self,
key_path: &str,
cert_path: &str,
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
pub fn new(
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
) -> anyhow::Result<Self> {
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
})?
};
self.add_cert(priv_key, cert_chain, is_default)
let mut certs = HashMap::new();
let default = (cert.clone(), tls_server_end_point);
certs.insert(common_name, (cert, tls_server_end_point));
Ok(Self { certs, default })
}
pub fn add_cert(
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
self.add_cert(priv_key, cert_chain)
}
fn add_cert(
&mut self,
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
is_default: bool,
) -> anyhow::Result<()> {
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let certificate = SliceReader::new(first_cert)
.context("Failed to parse cerficiate")?
.decode::<x509_cert::Certificate>()
.context("Failed to parse cerficiate")?;
let common_name = certificate.tbs_certificate.subject.to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so let's we can continue with any common-name
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
if is_default {
self.default = Some((cert.clone(), tls_server_end_point));
}
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
self.certs.insert(common_name, (cert, tls_server_end_point));
Ok(())
}
@@ -177,12 +123,82 @@ impl CertResolver {
}
}
fn parse_key_cert(
key_path: &str,
cert_path: &str,
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
)
})?
};
Ok((priv_key, cert_chain))
}
fn process_key_cert(
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
) -> anyhow::Result<(String, Arc<CertifiedKey>, TlsServerEndPoint)> {
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let certificate = SliceReader::new(first_cert)
.context("Failed to parse cerficiate")?
.decode::<x509_cert::Certificate>()
.context("Failed to parse cerficiate")?;
let common_name = certificate.tbs_certificate.subject.to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so let's we can continue with any common-name
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
Ok((common_name, cert, tls_server_end_point))
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
client_hello: rustls::server::ClientHello<'_>,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
self.resolve(client_hello.server_name()).map(|x| x.0)
Some(self.resolve(client_hello.server_name()).0)
}
}
@@ -190,7 +206,7 @@ impl CertResolver {
pub fn resolve(
&self,
server_name: Option<&str>,
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
) -> (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint) {
// loop here and cut off more and more subdomains until we find
// a match to get a proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
@@ -200,12 +216,17 @@ impl CertResolver {
if let Some(mut sni_name) = server_name {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return Some(cert.clone());
return cert.clone();
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
return None;
// The customer has some custom DNS mapping - just return
// a default certificate.
//
// This will error if the customer uses anything stronger
// than sslmode=require. That's a choice they can make.
return self.default.clone();
}
}
} else {

View File

@@ -1,5 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;
use futures_util::StreamExt;
@@ -56,7 +55,7 @@ impl TimelineAnalysis {
pub(crate) async fn branch_cleanup_and_check_errors(
remote_client: &GenericRemoteStorage,
id: &TenantShardTimelineId,
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
s3_data: Option<RemoteTimelineBlobData>,
@@ -151,11 +150,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
))
}
if !tenant_objects
.lock()
.await
.check_ref(id.timeline_id, &layer, &metadata)
{
if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
let path = remote_layer_path(
&id.tenant_shard_id.tenant_id,
&id.timeline_id,

View File

@@ -73,12 +73,8 @@ enum Command {
node_kind: NodeKind,
#[arg(short, long, default_value_t = false)]
json: bool,
/// If provided, only these tenants will be listed from the remote storage.
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
/// If provided, we will list all tenants, but then filter with the prefix.
#[arg(long = "tenant-id-prefix")]
tenant_id_prefix: Option<TenantId>,
#[arg(long = "post", default_value_t = false)]
post_to_storcon: bool,
#[arg(long, default_value = None)]
@@ -182,7 +178,6 @@ async fn main() -> anyhow::Result<()> {
Command::ScanMetadata {
json,
tenant_ids,
tenant_id_prefix,
node_kind,
post_to_storcon,
dump_db_connstr,
@@ -191,9 +186,6 @@ async fn main() -> anyhow::Result<()> {
verbose,
} => {
if let NodeKind::Safekeeper = node_kind {
if tenant_id_prefix.is_some() {
bail!("`tenant_id_prefix` is not supported for safekeeper node_kind");
}
let db_or_list = match (timeline_lsns, dump_db_connstr) {
(Some(timeline_lsns), _) => {
let timeline_lsns = serde_json::from_str(&timeline_lsns)
@@ -235,7 +227,6 @@ async fn main() -> anyhow::Result<()> {
bucket_config,
controller_client.as_ref(),
tenant_ids,
tenant_id_prefix,
json,
post_to_storcon,
verbose,
@@ -347,7 +338,6 @@ pub async fn run_cron_job(
bucket_config,
controller_client,
Vec::new(),
None,
true,
post_to_storcon,
false, // default to non-verbose mode
@@ -394,12 +384,10 @@ pub async fn pageserver_physical_gc_cmd(
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn scan_pageserver_metadata_cmd(
bucket_config: BucketConfig,
controller_client: Option<&control_api::Client>,
tenant_shard_ids: Vec<TenantShardId>,
tenant_id_prefix: Option<TenantId>,
json: bool,
post_to_storcon: bool,
verbose: bool,
@@ -410,14 +398,7 @@ pub async fn scan_pageserver_metadata_cmd(
"Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"
));
}
match scan_pageserver_metadata(
bucket_config.clone(),
tenant_shard_ids,
tenant_id_prefix,
verbose,
)
.await
{
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)

View File

@@ -1,7 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use futures::SinkExt;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
@@ -9,7 +7,6 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use tracing::{Instrument, info_span};
use utils::generation::Generation;
use utils::id::TenantId;
use utils::shard::ShardCount;
@@ -17,12 +14,10 @@ use crate::checks::{
BlobDataParseResult, RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
branch_cleanup_and_check_errors, list_timeline_blobs,
};
use crate::metadata_stream::{
stream_tenant_timelines, stream_tenants, stream_tenants_maybe_prefix,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, init_remote};
#[derive(Serialize, Default, Clone)]
#[derive(Serialize, Default)]
pub struct MetadataSummary {
tenant_count: usize,
timeline_count: usize,
@@ -107,13 +102,13 @@ impl MetadataSummary {
format!(
"Tenants: {}
Timelines: {}
Timeline-shards: {}
With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
",
Timelines: {}
Timeline-shards: {}
With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
",
self.tenant_count,
self.timeline_count,
self.timeline_shard_count,
@@ -143,243 +138,24 @@ impl MetadataSummary {
pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
tenant_id_prefix: Option<TenantId>,
verbose: bool,
) -> anyhow::Result<MetadataSummary> {
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
if !tenant_ids.is_empty() && tenant_id_prefix.is_some() {
anyhow::bail!("`tenant_id_prefix` is not supported when `tenant_ids` is provided");
}
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&remote_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
let (mut list_tenants_tx, list_tenants_rx) = futures::channel::mpsc::channel(1);
let remote_client_inner = remote_client.clone();
let target_inner = target.clone();
let list_tenants = tokio::spawn(async move {
let mut cnt = 0;
if tenant_ids.is_empty() {
if let Some(tenant_id_prefix) = tenant_id_prefix {
let stream = stream_tenants_maybe_prefix(
&remote_client_inner,
&target_inner,
Some(tenant_id_prefix.to_string()),
);
let mut stream = Box::pin(stream);
while let Some(tenant) = stream.next().await {
let tenant = tenant?;
list_tenants_tx.send(tenant).await?;
cnt += 1;
}
} else {
let stream = stream_tenants(&remote_client_inner, &target_inner);
let mut stream = Box::pin(stream);
while let Some(tenant) = stream.next().await {
let tenant = tenant?;
list_tenants_tx.send(tenant).await?;
cnt += 1;
}
}
} else {
for tenant_id in tenant_ids {
list_tenants_tx.send(tenant_id).await?;
cnt += 1;
}
}
tracing::info!("list_tenants: collected {} tenants", cnt);
Ok::<_, anyhow::Error>(())
});
// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
const CONCURRENCY: usize = 32;
let (mut list_timelines_tx, list_timelines_rx) = futures::channel::mpsc::channel(1);
let remote_client_inner = remote_client.clone();
let target_inner = target.clone();
let list_timelines = tokio::spawn(async move {
let stream = list_tenants_rx
.map(|tenant_id| {
stream_tenant_timelines(&remote_client_inner, &target_inner, tenant_id)
})
.buffered(8)
.try_flatten();
let mut stream = Box::pin(stream);
while let Some(item) = stream.next().await {
let item = item?;
list_timelines_tx.send(item).await?;
}
Ok::<_, anyhow::Error>(())
});
let (mut read_timelines_tx, read_timelines_rx) = futures::channel::mpsc::channel(1);
let remote_client_inner = remote_client.clone();
let target_inner = target.clone();
let read_timelines = tokio::spawn(async move {
let stream = list_timelines_rx
.map(|ttid| report_on_timeline(&remote_client_inner, &target_inner, ttid))
.buffered(32);
let mut stream = Box::pin(stream);
while let Some(item) = stream.next().await {
let item = item?;
read_timelines_tx.send(item).await?;
}
Ok::<_, anyhow::Error>(())
});
let summary = Arc::new(tokio::sync::Mutex::new(MetadataSummary::new()));
let summary_inner = summary.clone();
let (mut consolidate_tenants_tx, consolidate_tenants_rx) = futures::channel::mpsc::channel(32);
let consolidate_tenants = tokio::spawn(async move {
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
let mut tenant_id = None;
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();
// Iterate through all the timeline results. These are in key-order, so
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut highest_shard_count = ShardCount::MIN;
let mut read_timelines_rx = read_timelines_rx;
while let Some(i) = read_timelines_rx.next().await {
let (ttid, data) = i;
{
let mut guard = summary_inner.lock().await;
guard.update_data(&data);
}
match tenant_id {
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(
summary_inner.clone(),
Arc::new(tokio::sync::Mutex::new(tenant_objects)),
timelines,
highest_shard_count,
&mut consolidate_tenants_tx,
)
.await?;
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = ttid.tenant_shard_id.shard_count;
} else {
highest_shard_count =
highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _,
index_part_generation: _index_part_generation,
s3_layers,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
BlobDataParseResult::Relic => (),
BlobDataParseResult::Incorrect {
errors: _,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
}
tenant_timeline_results.push((ttid, data));
}
if !tenant_timeline_results.is_empty() {
analyze_tenant(
summary_inner.clone(),
Arc::new(tokio::sync::Mutex::new(tenant_objects)),
tenant_timeline_results,
highest_shard_count,
&mut consolidate_tenants_tx,
)
.await?;
}
Ok::<_, anyhow::Error>(())
});
let remote_client_inner = remote_client.clone();
let summary_inner = summary.clone();
let analyze_tenants = tokio::spawn(async move {
let stream = consolidate_tenants_rx
.map(|(ttid, tenant_objects, data)| {
let remote_client_inner = remote_client_inner.clone();
async move {
let generation = if let BlobDataParseResult::Parsed {
index_part: _,
index_part_generation,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} = &data.blob_data
{
Some(*index_part_generation)
} else {
None
};
let res = branch_cleanup_and_check_errors(
&remote_client_inner,
&ttid,
tenant_objects.clone(),
None,
None,
Some(data),
)
.await;
(ttid, tenant_objects.clone(), generation, res)
}
})
.buffered(32);
let mut last_tenant = None;
let mut last_tenant_objects = None;
let mut timeline_generations = HashMap::new();
let mut stream = Box::pin(stream);
while let Some((ttid, tenant_objects, generation, res)) = stream.next().await {
if last_tenant != Some(ttid) {
if let Some(tenant_id) = last_tenant {
let timeline_generations = std::mem::take(&mut timeline_generations);
identify_orphans(
tenant_id.tenant_shard_id.tenant_id,
last_tenant_objects.take().unwrap(),
summary_inner.clone(),
&timeline_generations,
)
.await;
}
last_tenant = Some(ttid);
last_tenant_objects = Some(tenant_objects);
}
if let Some(generation) = generation {
timeline_generations.insert(ttid, generation);
}
{
let mut guard = summary_inner.lock().await;
guard.update_analysis(&ttid, &res, verbose);
}
}
if let Some(tenant_id) = last_tenant {
identify_orphans(
tenant_id.tenant_shard_id.tenant_id,
last_tenant_objects.take().unwrap(),
summary_inner.clone(),
&timeline_generations,
)
.await;
}
Ok::<_, anyhow::Error>(())
});
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn report_on_timeline(
@@ -387,94 +163,93 @@ pub async fn scan_pageserver_metadata(
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
tracing::info!("listing blobs for timeline: {}", ttid);
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
let mut tenant_id = None;
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();
// DO NOT call any long-running tasks in this function; always route them through the channel and let
// other tokio tasks handle them.
async fn analyze_tenant(
summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
remote_client: &GenericRemoteStorage,
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
highest_shard_count: ShardCount,
output_tx: &mut futures::channel::mpsc::Sender<(
TenantShardTimelineId,
Arc<tokio::sync::Mutex<TenantObjectListing>>,
RemoteTimelineBlobData,
)>,
) -> anyhow::Result<()> {
{
let mut guard = summary.lock().await;
guard.tenant_count += 1;
}
verbose: bool,
) {
summary.tenant_count += 1;
let mut timeline_ids = HashSet::new();
let mut timeline_generations = HashMap::new();
for (ttid, data) in timelines {
async {
if ttid.tenant_shard_id.shard_count == highest_shard_count {
// Only analyze `TenantShardId`s with highest shard count.
if ttid.tenant_shard_id.shard_count == highest_shard_count {
// Only analyze `TenantShardId`s with highest shard count.
// Stash the generation of each timeline, for later use identifying orphan layers
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation: _,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} = &data.blob_data
{
if index_part.deleted_at.is_some() {
// skip deleted timeline.
tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
return Ok(());
// Stash the generation of each timeline, for later use identifying orphan layers
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} = &data.blob_data
{
if index_part.deleted_at.is_some() {
// skip deleted timeline.
tracing::info!(
"Skip analysis of {} b/c timeline is already deleted",
ttid
);
return;
}
timeline_generations.insert(ttid, *index_part_generation);
}
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis = branch_cleanup_and_check_errors(
remote_client,
&ttid,
&mut tenant_objects,
None,
None,
Some(data),
)
.await;
summary.update_analysis(&ttid, &analysis, verbose);
timeline_ids.insert(ttid.timeline_id);
} else {
tracing::info!(
"Skip analysis of {} b/c a lower shard count than {}",
ttid,
highest_shard_count.0,
);
}
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
output_tx.send((ttid, tenant_objects.clone(), data)).await?;
timeline_ids.insert(ttid.timeline_id);
} else {
tracing::info!(
"Skip analysis of {} b/c a lower shard count than {}",
ttid,
highest_shard_count.0,
);
}
Ok::<_, anyhow::Error>(())
}.instrument(
info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
)
.await?;
.instrument(
info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
)
.await
}
{
let mut guard = summary.lock().await;
guard.timeline_count += timeline_ids.len();
}
summary.timeline_count += timeline_ids.len();
Ok(())
}
async fn identify_orphans(
tenant_id: TenantId,
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
timeline_generations: &HashMap<TenantShardTimelineId, Generation>,
) {
// Identifying orphan layers must be done on a tenant-wide basis, because individual
// shards' layers may be referenced by other shards.
//
// Orphan layers are not a corruption, and not an indication of a problem. They are just
// consuming some space in remote storage, and may be cleaned up at leisure.
let orphans = { tenant_objects.lock().await.get_orphans() };
for (shard_index, timeline_id, layer_file, generation) in orphans {
for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
let ttid = TenantShardTimelineId {
tenant_shard_id: TenantShardId {
tenant_id,
@@ -504,20 +279,83 @@ pub async fn scan_pageserver_metadata(
tracing::info!("Orphan layer detected: {orphan_path}");
{
let mut guard = summary.lock().await;
guard.notify_timeline_orphan(&ttid);
}
summary.notify_timeline_orphan(&ttid);
}
}
// TODO: bail out early if any of the tasks fail
list_tenants.await??;
list_timelines.await??;
read_timelines.await??;
consolidate_tenants.await??;
analyze_tenants.await??;
// Iterate through all the timeline results. These are in key-order, so
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut summary = MetadataSummary::new();
let mut highest_shard_count = ShardCount::MIN;
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
summary.update_data(&data);
let summary = summary.lock().await;
Ok(summary.clone())
match tenant_id {
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(
&remote_client,
prev_tenant_id,
&mut summary,
tenant_objects,
timelines,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
.await;
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = ttid.tenant_shard_id.shard_count;
} else {
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _,
index_part_generation: _index_part_generation,
s3_layers,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
BlobDataParseResult::Relic => (),
BlobDataParseResult::Incorrect {
errors: _,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
}
tenant_timeline_results.push((ttid, data));
}
if !tenant_timeline_results.is_empty() {
let tenant_id = tenant_id.expect("Must be set if results are present");
analyze_tenant(
&remote_client,
tenant_id,
&mut summary,
tenant_objects,
tenant_timeline_results,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
.await;
}
Ok(summary)
}

View File

@@ -1,4 +1,5 @@
import math # Add this import
import os
import time
import traceback
from pathlib import Path
@@ -87,7 +88,10 @@ def test_cumulative_statistics_persistence(
- insert additional tuples that by itself are not enough to trigger auto-vacuum but in combination with the previous tuples are
- verify that autovacuum is triggered by the combination of tuples inserted before and after endpoint suspension
"""
project = neon_api.create_project(pg_version)
project = neon_api.create_project(
pg_version,
f"Test cumulative statistics persistence, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
endpoint_id = project["endpoints"][0]["id"]

View File

@@ -62,7 +62,9 @@ def test_ro_replica_lag(
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
project = neon_api.create_project(pg_version)
project = neon_api.create_project(
pg_version, f"Test readonly replica lag, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
@@ -195,7 +197,9 @@ def test_replication_start_stop(
pgbench_duration = f"-T{2**num_replicas * configuration_test_time_sec}"
error_occurred = False
project = neon_api.create_project(pg_version)
project = neon_api.create_project(
pg_version, f"Test replication start stop, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])

View File

@@ -202,6 +202,8 @@ def test_pageserver_gc_compaction_preempt(
env = neon_env_builder.init_start(initial_tenant_conf=conf)
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
env.pageserver.allowed_errors.append(".*flush task cancelled.*")
env.pageserver.allowed_errors.append(".*failed to pipe.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
#
# Test unlogged build for GIST index
#
def test_gist(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
con = endpoint.connect()
cur = con.cursor()
iterations = 100
for _ in range(iterations):
cur.execute(
"CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off)"
)
cur.execute(
"INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i"
)
cur.execute("CREATE INDEX gist_pvactst ON pvactst USING gist (p)")
cur.execute("VACUUM pvactst")
cur.execute("DROP TABLE pvactst")

View File

@@ -5,14 +5,14 @@
],
"v16": [
"16.8",
"37496f87b5324af53c56127e278ee5b1e8435253"
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
],
"v15": [
"15.12",
"8ecb12f21d862dfa39f7204b8f5e1c00a2a225b3"
"b838c8969b7c63f3e637a769656f5f36793b797c"
],
"v14": [
"14.17",
"d3c9d61fb7a362a165dac7060819dd9d6ad68c28"
"c8dab02bfc003ae7bd59096919042d7840f3c194"
]
}